リクエスト/レスポンスエンティティのストリーミングにおける性質

リクエスト/レスポンスエンティティのストリーミングにおける性質

Akka HTTPは全てストリームで通じています。これは、Akka Streamによって可能になったバックプレッシャー制御が、TCPレイヤからHTTPサーバ、そして、ユーザーが直接触れる「HttpRequest」、「HttpResponse」及び「HttpEntity」まで全てのレイヤーに対して公開されていることを意味しています。

もしあなたがノンストリーミング/ノンリアクティブなHTTPクライアントを慣れているのであれば、このことは驚くべき性質でしょう。具体的には、「HTTPエンティティが消費されないと、接続元へバックプレッシャーとして通知される」ということを意味します。これは単一のエンティティを消費するだけで、背圧の高いサーバやクライアントがアプリケーションを圧倒して、メモリ内のエンティティの不必要なバッファリングを引き起こす可能性がある機能です。

警告

リクエスト内のエンティティを消費(もしくは破棄)することは必須です。もし誤って消費も破棄もされない場合、Akka HTTPは入力データが背圧制御されるべきと仮定して、TCPのバックプレッシャー機構によって、入力データをストールさせます。クライアントは、HttpResponseのステータスに関わらずエンティティを消費しなければなりません。

クライアント側におけるHTTPエンティティのストリーム処理

HTTPレスポンスエンティティの消費(クライアント)

最も一般的なユースケースは勿論、``dataBytes``を持つSourceを元にしたレスポンスエンティティの消費です。これは、dataBytesを実行するのと同じくらい(もしくは、サーバーサイドで``BasicDirectives.extractDataBytes``ディレクティブを使うのと同じくらい)シンプルです。

インフラストラクチャを最大限に活用するには、さまざまなストリーミングテクニックを使用することをお勧めします。例えば、着信したチャンクをフレーミングし、ラインごとに解析し、ファイルや他のAkka Streamsのコネクタなどを宛先としたSinkに繋ぎこみます。

import java.io.File;
import akka.actor.ActorSystem;

import java.util.concurrent.TimeUnit;
import java.util.function.Function; 
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Framing;
import akka.http.javadsl.model.*;
import scala.concurrent.duration.FiniteDuration;
import scala.util.Try;

    final ActorSystem system = ActorSystem.create();
    final ExecutionContextExecutor dispatcher = system.dispatcher();
    final ActorMaterializer materializer = ActorMaterializer.create(system);

    final HttpResponse response = responseFromSomewhere();

    final Function<ByteString, ByteString> transformEachLine = line -> line /* some transformation here */;

    final int maximumFrameLength = 256;

    response.entity().getDataBytes()
      .via(Framing.delimiter(ByteString.fromString("\n"), maximumFrameLength, FramingTruncation.ALLOW))
      .map(transformEachLine::apply)
      .runWith(FileIO.toPath(new File("/tmp/example.out").toPath()), materializer);

しかしながら、エンティティ全体を``Strict``な(メモリ上に完全にロードされていることを意味します)エンティティとして消費する必要がある時もあります。Akka HTTPはエンティティを積極的に消費し、メモリ上に乗せる``toStrict(timeout, materializer)``メソッドを特別に用意しています。

final class ExamplePerson {
  final String name;
  public ExamplePerson(String name) { this.name = name; }
}

public ExamplePerson parse(ByteString line) { 
  return new ExamplePerson(line.utf8String()); 
}
  final ActorSystem system = ActorSystem.create();
  final ExecutionContextExecutor dispatcher = system.dispatcher();
  final ActorMaterializer materializer = ActorMaterializer.create(system);
  
  final HttpResponse response = responseFromSomewhere();
  
  // toStrict to enforce all data be loaded into memory from the connection
  final CompletionStage<HttpEntity.Strict> strictEntity = response.entity()
      .toStrict(FiniteDuration.create(3, TimeUnit.SECONDS).toMillis(), materializer);

  // while API remains the same to consume dataBytes, now they're in memory already:

  final CompletionStage<ExamplePerson> person = 
    strictEntity
      .thenCompose(strict ->
        strict.getDataBytes()
          .runFold(ByteString.empty(), (acc, b) -> acc.concat(b), materializer)
          .thenApply(this::parse)
      );

HTTPレスポンスエンティティを破棄する(クライアント)

私達はHTTPサービスを呼び出す時、レスポンスされるペイロードには関心がない場合(例. レスポンスコードにのみ関心がある場合)が時々あります。以前に説明した通り、エンティティは何かしらの方法で消費しなければならず、消費しない場合は、TCP接続を元にしたバックプレッシャーを用いることになります。

discardEntityBytes という便利なメソッドは、使い途の無いエンティティを簡単に破棄することを目的として提供されています。このメソッドは、入力バイトを直接 ``Sink.ignore``に繋ぎこみます。

この2つのコードの断片は等価であり、サーバーサイドで入力されたHTTPリクエストに対して同じように動作します。

final ActorSystem system = ActorSystem.create();
final ExecutionContextExecutor dispatcher = system.dispatcher();
final ActorMaterializer materializer = ActorMaterializer.create(system);

final HttpResponse response = responseFromSomewhere();

final HttpMessage.DiscardedEntity discarded = response.discardEntityBytes(materializer);

discarded.completionStage().whenComplete((done, ex) -> {
  System.out.println("Entity discarded completely!");
});

また、この低レベルAPIによるコードも同じ結果となります。

final ActorSystem system = ActorSystem.create();
final ExecutionContextExecutor dispatcher = system.dispatcher();
final ActorMaterializer materializer = ActorMaterializer.create(system);

final HttpResponse response = responseFromSomewhere();

final CompletionStage<Done> discardingComplete = response.entity().getDataBytes().runWith(Sink.ignore(), materializer);

discardingComplete.whenComplete((done, ex) -> {
  System.out.println("Entity discarded completely!");
});

サーバーサイドにおけるHTTPエンティティのストリーム処理

クライアント側と同様に、HTTPエンティティはTCP接続から供給されるストリームに直接リンクしています。従って、リクエストエンティティが消費されずに残る場合、ユーザーが受け取ったデータを処理することを期待して、TCP接続にバックプレッシャーをかけます。

entity(exampleUnmarshaller, example -> {}) などのいくつかのディレクティブは、強制的に``toStrict`` の実行を暗黙的に行うことに注意して下さい。

HTTPリクエストエンティティの消費(サーバー)

送信されてきたリクエストエンティティを消費する最も簡単な方法は、 entity ディレクティブを使ってそれを実際のドメインオブジェクトに変換することです。

class Bid {
  final String userId;
  final int bid;

  Bid(String userId, int bid) {
    this.userId = userId;
    this.bid = bid; 
  }
}
  final ActorSystem system = ActorSystem.create();
  final ExecutionContextExecutor dispatcher = system.dispatcher();
  final ActorMaterializer materializer = ActorMaterializer.create(system);
  
  final Unmarshaller<HttpEntity, Bid> asBid = Jackson.unmarshaller(Bid.class);

  final Route s = path("bid", () ->
    put(() ->
      entity(asBid, bid ->
        // incoming entity is fully consumed and converted into a Bid
        complete("The bid was: " + bid)
      )
    )
  );

例えば、ファイルIO Sink にパイプして全てのデータの書き込んだら CompletionStage<IoResult> を通じて完了を通知するなど、ストリーム上に流れている生のデータを触ることも勿論可能です。

final ActorSystem system = ActorSystem.create();
final ExecutionContextExecutor dispatcher = system.dispatcher();
final ActorMaterializer materializer = ActorMaterializer.create(system);

final Route s =
  put(() ->
    path("lines", () ->
      withoutSizeLimit(() ->
        extractDataBytes(bytes -> {
          final CompletionStage<IOResult> res = bytes.runWith(FileIO.toPath(new File("/tmp/example.out").toPath()), materializer);

          return onComplete(() -> res, ioResult ->
            // we only want to respond once the incoming data has been handled:
            complete("Finished writing data :" + ioResult));
        })
      )
    )
  );

HTTPリクエストエンティティを破棄する(サーバー)

何かしらの検証(例えば、ユーザーにアップロードの実行を許可するかしないかをチェックする)の結果によって、アップロードされたエンティティを破棄したい時があります。

破棄する、ということは、たとえあなたがサーバ上でストリームとしてデータを扱うことに関心が無いとしても、アップロードが完全に行われることを意味することに注意して下さい。これは、指定された受信したエンティティを使わない場合に便利ですが、同じ接続で保留中のリクエストがまだある可能性があるため、接続全体を中止する必要はありません(これもデモンストレーションします)。

HTTPRequest``の``discardEntityBytes を実行することによって、データバイトを明示的に破棄することが出来ます。

final ActorSystem system = ActorSystem.create();
final ExecutionContextExecutor dispatcher = system.dispatcher();
final ActorMaterializer materializer = ActorMaterializer.create(system);

final Route s =
  put(() ->
    path("lines", () ->
      withoutSizeLimit(() ->
        extractRequest(r -> {
          final CompletionStage<Done> res = r.discardEntityBytes(materializer).completionStage();

          return onComplete(() -> res, done ->
            // we only want to respond once the incoming data has been handled:
            complete("Finished writing data :" + done));
        })
      )
    )
  );

関連するコンセプトは、 entity.getDataBytes() ストリームのキャンセルにもあり、この場合、Akka HTTPはクライアントからの接続をます。このことはアップロードの許可の無いユーザを検知して、接続を切りたい時に(受信データを読んで無視するより)便利です。受信した entity.getDataBytes() をエンティティストリームをキャンセルする Sink.cancelled に繋げることによって可能であり、結果として接続がサーバーによってシャットダウンされ、着信要求を効果的に中断します。

final ActorSystem system = ActorSystem.create();
final ExecutionContextExecutor dispatcher = system.dispatcher();
final ActorMaterializer materializer = ActorMaterializer.create(system);

final Route s =
  put(() ->
    path("lines", () ->
      withoutSizeLimit(() ->
        extractDataBytes(bytes -> {
          // Closing connections, method 1 (eager):
          // we deem this request as illegal, and close the connection right away:
          bytes.runWith(Sink.cancelled(), materializer);  // "brutally" closes the connection

          // Closing connections, method 2 (graceful):
          // consider draining connection and replying with `Connection: Close` header
          // if you want the client to close after this request/reply cycle instead:
          return respondWithHeader(Connection.create("close"), () -> 
            complete(StatusCodes.FORBIDDEN, "Not allowed!")
          );
        })
      )
    )
  );

接続のクローズについては、ドキュメントの 接続を閉じる セクションで詳しく説明しています。

ペンディング: 使用されないエンティティの自動的な破棄

特定の条件下では、受信したリクエストのエンティティがユーザによって使用される可能性はほとんどないことを検出して、警告を出すか、エンティティを自動的に破棄することが可能です。 この高度な機能はまだ実装されていません。さらなる議論とアイデアについては、以下の注記とイシューを参照してください。

注釈

Akka HTTPの先進的な機能である「auto draining」は議論され、提案されており、それを実装したり、コミュニティが実装を支援してくれることを望んでいます。

issue#18716 <https://github.com/akka/akka/issues/18716> _や issue #18540 でさらに詳しく読むことができます。貢献はいつも大歓迎です!

Contents