リクエスト/レスポンスエンティティのストリーミングにおける性質
Akka HTTPは全てストリームで通じています。これは、Akka Streamによって可能になったバックプレッシャー制御が、TCPレイヤからHTTPサーバ、そして、ユーザーが直接触れる「HttpRequest」、「HttpResponse」及び「HttpEntity」まで全てのレイヤーに対して公開されていることを意味しています。
このことは、あなたがノンストリーミング/ノンリアクティブなHTTPクライアントを慣れているのであれば驚くべき性質でしょう。具体的には、「HTTPエンティティが消費されないと、接続元へバックプレッシャーとして通知される」ということを意味します。これは単一のエンティティを消費するだけで、背圧の高いサーバやクライアントがアプリケーションを圧倒して、メモリ内のエンティティの不必要なバッファリングを引き起こす可能性がある機能です。
警告
リクエスト内のエンティティは必ず消費(もしくは破棄)しなければなりません!もし誤って消費も破棄もされない場合、Akka HTTPは入力データが背圧制御されるべきと仮定して、TCPのバックプレッシャー機構によって、入力データをストールさせます。クライアントは、HttpResponseのステータスに関わらずエンティティを消費しなければなりません。
クライアント側におけるHTTPエンティティのストリーム処理
HTTPレスポンスエンティティを消費する(クライアント)
最も一般的なユースケースは勿論、'dataBytes'を持つSourceを元にしたレスポンスエンティティの消費です。これは、dataBytesを実行するのと同じくらい(もしくは、サーバーサイドで``BasicDirectives.extractDataBytes``ディレクティブを使うのと同じくらい)シンプルです。
インフラストラクチャを最大限に活用するには、さまざまなストリーミングテクニックを使用することをお勧めします。例えば、着信したチャンクをフレーミングし、ラインごとに解析し、ファイルや他のAkka Streamsのコネクタなどを宛先としたSinkに繋ぎこみます。
import java.io.File
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Framing
import akka.stream.scaladsl.FileIO
import akka.http.scaladsl.model._
implicit val system = ActorSystem()
implicit val dispatcher = system.dispatcher
implicit val materializer = ActorMaterializer()
val response: HttpResponse = ???
response.entity.dataBytes
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256))
.map(transformEachLine)
.runWith(FileIO.toPath(new File("/tmp/example.out").toPath))
def transformEachLine(line: ByteString): ByteString = ???
しかしながら、エンティティ全体を``Strict``な(メモリ上に完全にロードされていることを意味します)エンティティとして消費する必要がある時もあります。Akka HTTPはエンティティを積極的に消費し、メモリ上に乗せる``toStrict(timeout)``メソッドを特別に用意しています。
import java.io.File
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.http.scaladsl.model._
import scala.concurrent.duration._
implicit val system = ActorSystem()
implicit val dispatcher = system.dispatcher
implicit val materializer = ActorMaterializer()
case class ExamplePerson(name: String)
def parse(line: ByteString): ExamplePerson = ???
val response: HttpResponse = ???
// toStrict to enforce all data be loaded into memory from the connection
val strictEntity: Future[HttpEntity.Strict] = response.entity.toStrict(3.seconds)
// while API remains the same to consume dataBytes, now they're in memory already:
val transformedData: Future[ExamplePerson] =
strictEntity flatMap { e =>
e.dataBytes
.runFold(ByteString.empty) { case (acc, b) => acc ++ b }
.map(parse)
}
HTTPレスポンスエンティティを破棄する(クライアント)
私達はHTTPサービスを呼び出す時、レスポンスされるペイロードには関心がない場合(例. レスポンスされるコードにのみ関心がある場合)が時々あります。以前に説明した通り、エンティティは何かしらの方法で消費しなければならず、消費しない場合は、TCP接続を元にしたバックプレッシャーを用いることになります。
discardEntityBytes
という便利なメソッドが使い道の無いエンティティを簡単に破棄することを目的として提供されています。このメソッドは、入力バイトを直接 ``Sink.ignore``に繋ぎこみます。
この2つのコードの断片は等価であり、サーバーサイドで入力されたHTTPリクエストに対して同じように動作します。
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.http.scaladsl.model._
implicit val system = ActorSystem()
implicit val dispatcher = system.dispatcher
implicit val materializer = ActorMaterializer()
val response1: HttpResponse = ??? // obtained from an HTTP call (see examples below)
val discarded: DiscardedEntity = response1.discardEntityBytes()
discarded.future.onComplete { case done => println("Entity discarded completely!") }
また、この低レベルなAPIによるコードも同じ結果となります。
val response1: HttpResponse = ??? // obtained from an HTTP call (see examples below)
val discardingComplete: Future[Done] = response1.entity.dataBytes.runWith(Sink.ignore)
discardingComplete.onComplete { case done => println("Entity discarded completely!") }
サーバーサイドにおけるHTTPエンティティのストリーム処理
クライアント側と同様に、HTTPエンティティはTCP接続から供給されるストリームに直接リンクしています。従って、リクエストエンティティが消費されずに残る場合、ユーザーが受け取ったデータを処理することを期待して、TCP接続にバックプレッシャーをかけます。
entity(as[String])
などのいくつかのディレクティブは、強制的に``toStrict`` の実行を暗黙的に行うことに注意して下さい。
HTTPリクエストエンティティを消費する(サーバー)
送信されてきたリクエストエンティティを消費する最も簡単な方法は、 entity ディレクティブを使ってそれを実際のドメインオブジェクトに変換することです。
import akka.actor.ActorSystem
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.stream.ActorMaterializer
import spray.json.DefaultJsonProtocol._
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
// needed for the future flatMap/onComplete in the end
implicit val executionContext = system.dispatcher
final case class Bid(userId: String, bid: Int)
// these are from spray-json
implicit val bidFormat = jsonFormat2(Bid)
val route =
path("bid") {
put {
entity(as[Bid]) { bid =>
// incoming entity is fully consumed and converted into a Bid
complete("The bid was: " + bid)
}
}
}
例えば、ファイルIO Sink にパイプして全てのデータの書き込んだら``Future[IoResult]`` を通じて完了を通知するなど、ストリーム上に流れている生のデータを触ることも勿論可能です。
import akka.actor.ActorSystem
import akka.stream.scaladsl.FileIO
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import java.io.File
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
// needed for the future flatMap/onComplete in the end
implicit val executionContext = system.dispatcher
val route =
(put & path("lines")) {
withoutSizeLimit {
extractDataBytes { bytes =>
val finishedWriting = bytes.runWith(FileIO.toPath(new File("/tmp/example.out").toPath))
// we only want to respond once the incoming data has been handled:
onComplete(finishedWriting) { ioResult =>
complete("Finished writing data: " + ioResult)
}
}
}
}
HTTPリクエストエンティティを破棄する(サーバー)
何かしらの検証(例えば、ユーザーにアップロードの実行を許可するかしないかをチェックする)の結果によって、アップロードされたエンティティを破棄したい時があります。
破棄する、ということは、たとえあなたがサーバ上でストリームとしてデータを扱うことに関心が無いとしても、アップロードが完全に行われることを意味することに注意して下さい。これは、受信したエンティティを使わない場合に便利ですが、同じ接続で保留中のリクエストがまだある可能性があるため、接続全体を中断しません(これもデモンストレーションします)。
HTTPRequest``の``discardEntityBytes
を実行することによって、データバイトを明示的に破棄することが出来ます。
import akka.actor.ActorSystem
import akka.stream.scaladsl.FileIO
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import akka.http.scaladsl.model.HttpRequest
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
// needed for the future flatMap/onComplete in the end
implicit val executionContext = system.dispatcher
val route =
(put & path("lines")) {
withoutSizeLimit {
extractRequest { r: HttpRequest =>
val finishedWriting = r.discardEntityBytes().future
// we only want to respond once the incoming data has been handled:
onComplete(finishedWriting) { done =>
complete("Drained all data from connection... (" + done + ")")
}
}
}
}
関連するコンセプトは、entity.dataBytes``ストリームのキャンセルにもあり、この場合、Akka HTTPはクライアントからの接続をます。このことはアップロードの許可の無いユーザを検知して、接続を切りたい時に(受信データを読んで無視するより)便利です。受信した``entity.dataBytes
をエンティティストリームをキャンセルする``Sink.cancelled``に繋げることによって可能であり、結果として接続がサーバーによってシャットダウンされ、着信要求を効果的に中断します。
import akka.actor.ActorSystem
import akka.stream.scaladsl.Sink
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model.headers.Connection
import akka.stream.ActorMaterializer
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
// needed for the future flatMap/onComplete in the end
implicit val executionContext = system.dispatcher
val route =
(put & path("lines")) {
withoutSizeLimit {
extractDataBytes { data =>
// Closing connections, method 1 (eager):
// we deem this request as illegal, and close the connection right away:
data.runWith(Sink.cancelled) // "brutally" closes the connection
// Closing connections, method 2 (graceful):
// consider draining connection and replying with `Connection: Close` header
// if you want the client to close after this request/reply cycle instead:
respondWithHeader(Connection("close"))
complete(StatusCodes.Forbidden -> "Not allowed!")
}
}
}
接続のクローズについては、ドキュメントの:ref: `http-closing-connection-low-level`セクションで詳しく説明しています。
ペンディング: 使用されないエンティティの自動的な破棄
特定の条件下では、受信したリクエストのエンティティがユーザによって使用される可能性はほとんどないことを検出して、警告を出すか、エンティティを自動的に破棄することが可能です。 この先進的な機能はまだ実装されていません。さらなる議論とアイデアについては、以下の注記とイシューを参照してください。
注釈
Akka HTTPの先進的な機能である「auto draining」は議論され、提案されており、それを実装したり、コミュニティが実装を支援してくれることを望んでいます。
issue#18716 <https://github.com/akka/akka/issues/18716> _や `issue#18540 <https://github.com/akka/akka/ 'でさらに詳しく読むことができます。貢献はいつも大歓迎です!
Contents