低レベルサーバーサイドAPI

低レベルサーバーサイドAPI

Akka HTTPは HTTP Client とは別に、完全に非同期な Reactive-Streams ベースの、 Akka Stream 上に構築されたHTTP/1.1サーバーの実装を提供しています。

次のような機能をサポートしています。

  • パーシステント・コネクションの完全なサポート

  • HTTPパイプラインの完全なサポート

  • 自然なAPIを通じたチャンク転送形式を含む非同期なHTTPストリーミングの完全なサポート

  • オプショナルなSSL/TLS暗号化のサポート

  • WebSocketのサポート

Akka HTTPのサーバーサイドのコンポーネントは2つのレイヤに分けられます。

  1. 低レベルなサーバーの実装は akka-http-core モジュールにあります。

  2. 高レベルな機能は akka-http モジュールにあります。

低レベルなサーバ (1) はHTTP/1.1の基本的な機能に主眼を置いています。

  • 接続の管理

  • メッセージとヘッダーの解析とレンダリング

  • (リクエストと接続の)タイムアウト管理

  • (パイプライニングをサポートする為の)レスポンスの整列

典型的なHTTPサーバーのコアではない機能(例えば、リクエストルーティング、ファイル転送、圧縮など)は高レベルなレイヤにあり、 akka-http-core には実装されていません。これらの一般的なデザインから分けることによって、コアを小さく、軽量に保つのと同様に理解と保守を簡単にしています。

あなたの必要に応じて、低レベルなAPIを直接使うことも、複雑なサービスのロジックをより簡単に定義する為に高レベルなルーティングDSL Routing DSL に頼ることもできます。

注釈

リクエスト/レスポンスエンティティのストリーミングにおける性質 の節を読むことを推奨します。この節では、ストリーミングファーストではないHTTPサーバーから移行している人に向けたフルスタックのストリーミングの概念に則った説明をしています。

ストリームとHTTP

Akka HTTPサーバーは Akka Stream の上に実装されていて、その実装や全てのレベルのAPIも同様に強く依存しています。

コネクションレベルにおいて、Akka HTTPは Akka Stream IO: と基本的に同じインターフェースを提供しています。ソケットのバインディングは接続要求のストリームとして表現されています。アプリケーションはストリームから接続をプルし、それらを Flow[HttpRequest, HttpResponse, _] に供給し、リクエストをレスポンスへ変換します。

サーバサイドにおけるソケットのバインディングを Source[IncomingConnection] とし、接続の一つ一つを Source[HttpRequest]Sink[HttpResponse] としたストリームの抽象化は、単一のHTTPメッセージの中にも存在ます。HTTPリクエストとレスポンスのエンティティは一般的に Source[ByteString] としてモデル化されています。HTTPメッセージがAkka HTTPにおいてどのように表現されているかについての詳しい情報は HTTP Model も参照してください。

起動と停止

最も基本的なレベルのAkka HTTPサーバは akka.http.scaladsl.Httpbind メソッドを実行することによって起動します。

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val executionContext = system.dispatcher

val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
  Http().bind(interface = "localhost", port = 8080)
val bindingFuture: Future[Http.ServerBinding] =
  serverSource.to(Sink.foreach { connection => // foreach materializes the source
    println("Accepted new connection from " + connection.remoteAddress)
    // ... and then actually handle the connection
  }).run()

Http().bind メソッドに引数に、インターフェースとバインドするポートを指定します。そして、HTTP接続の要求をハンドリングする処理を登録します。さらに、必要に応じてソケットオプションを設定するのと同様に、数多くのサーバーを調整する為の設定を行うことができます。

bind メソッドの結果である Source[Http.IncomingConnection] は接続要求を受信することによってドレインしなければなりません。実際のバインディングはSourceがパイプラインプロセスの一部としてマテリアライズされる前には実行されません。バインドに失敗した場合(例.ポートが既に使用されていた場合)はマテリアライズされたストリームはそれぞれ例外を出し、直ぐに停止します。接続要求の消費側が消費をキャンセルした時に、バインディングは解放されます。(即ち、ソケットはアンバウンドされます。) 代替手段として、Sourceのマテリアライズプロセスの一部として生成された Http.ServerBinding インスタンスの unbind() メソッドを使うことができます。 Http.ServerBinding はまた、ソケットにバインドされた実際のローカルアドレスを取得する方法を提供します。これは、例えばポート0(有効なポートをOSに選択させる)にバインディングする時に便利です。

リクエスト-レスポンス サイクル

新しい接続を受信した時に、リモートアドレスとリクエストをハンドリグする Flow[HttpRequest, HttpResponse, _] を供給するメソッドで構成されている Http.IncomingConnection が出現します。

リクエストは handleWithXXX メソッドの一つを呼び出すことによって処理されます。

  • Flow[HttpRequest, HttpResponse, _] を処理する handleWith

  • 関数 HttpRequest => HttpResponse を処理する handleWithSyncHandler

  • 関数 HttpRequest => Future[HttpResponse] を処理する handleWithAsyncHandler

ここに完全な例があります。

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpMethods._
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val executionContext = system.dispatcher

val serverSource = Http().bind(interface = "localhost", port = 8080)

val requestHandler: HttpRequest => HttpResponse = {
  case HttpRequest(GET, Uri.Path("/"), _, _, _) =>
    HttpResponse(entity = HttpEntity(
      ContentTypes.`text/html(UTF-8)`,
      "<html><body>Hello world!</body></html>"))

  case HttpRequest(GET, Uri.Path("/ping"), _, _, _) =>
    HttpResponse(entity = "PONG!")

  case HttpRequest(GET, Uri.Path("/crash"), _, _, _) =>
    sys.error("BOOM!")

  case r: HttpRequest =>
    r.discardEntityBytes() // important to drain incoming HTTP Entity stream
    HttpResponse(404, entity = "Unknown resource!")
}

val bindingFuture: Future[Http.ServerBinding] =
  serverSource.to(Sink.foreach { connection =>
    println("Accepted new connection from " + connection.remoteAddress)

    connection handleWithSyncHandler requestHandler
    // this is equivalent to
    // connection handleWith { Flow[HttpRequest] map requestHandler }
  }).run()

この例では、リクエストはリクエストストリームを handleWithSyncHandler と関数 HttpRequest => HttpResponse を使って変換することによって処理されます。(Akka Streamの map 演算子を使っても同様です。)Akka Streamのコンビネータを使うことでユースケースに応じたリクエストを処理する方法はいくらでも考えられます。

アプリケーションがFlowを指定する場合、リクエスト毎に該当する唯一のレスポンスを生成し、レスポンスの順序を該当するリクエストの順序と一致させることはアプリケーションの責務です。(このことは、複数のリクエストが重複するHTTPパイプライニングが有効になっている場合に関係します。) handleWithSyncHandlerhandleWithAsyncHandler 、若しくは mapmapAsync を使用している場合は、この要件は自動的に満たされます。

ストリーミング リクエスト/レスポンス エンティティ

HTTPメッセージのストリーミングは、 HttpEntity のサブクラスを通じてサポートされます。アプリケーションはリクエストを受信した時に、ストリーミングされたエンティティを扱うことが出来るだけでなく、多くの場合は、レスポンスを生成する時にも扱うことが出来る必要があります。代替手段の説明については、 HttpEntity を参照して下さい。

Akka HTTPが提供している MarshallingUnmarshalling の機能は、カスタム型とストリーミングされたエンティティとの相互変換に非常に便利です。

接続を閉じる

HTTPの接続は Flow を処理中に上流の消費を中断したり、ピアが接続を閉じると閉じられます。明示的に Connection: close ヘッダを``HttpResponse`` に加えることが便利な場合もあります。このレスポンスは接続の最後のレスポンスになり、サーバーは送信が完了すると接続を閉じます。

接続はリクエストエンティティが中断された場合(例:Sink.cancelled に繋がれた場合)、若しくは、部分的に消費された場合(例:take コンビネータを使用した場合)にも閉じられます。この振る舞いを防ぐ為には、 Sink.ignore に繋ぐことによって明示的にドレインしなければなりません。

サーバー側のHTTPSを設定する

サーバーサイドでHTTPSを設定し使用する事に関する詳細なドキュメントは、 Server-Side HTTPS Support を参照して下さい。

スタンドアロンのHTTPレイヤの使用方法

リアクティブ・ストリームを基本としたの性質を持つAkka HTTPレイヤは、TCPインターフェースから完全に切り離す事ができます。この"特性"は殆どのアプリケーションでは重要ではありませんが、特定のケースにおいてはネットワークからではない他のソースによってはHTTPレイヤ(場合によっては、より高いレイヤ)を"実行"できる事が便利である場合があります。これが有用である可能性があるシナリオは、テストやデバッグ、低レベルのイベントソーシング(例:ネットワークトラフィックの再現)などがあります。

サーバーサイドにおけるスタンドアロンのHTTPレイヤは以下の様な BidiFlow を形成します。

Http.ServerLayer のインスタンスは Http().serverLayer をオーバーロードした二つのメソッドの内の一つを呼ぶことによって生成する事ができ、また、様々な段階の設定を可能にします。

サーバーの並列性を制御する

リクエストの処理は、幾つのかの接続を並列に処理する、またはHTTPパイプライニングを使って幾つかのリクエストをレスポンスを待つ事なく送信する、という二つの軸で並列化する事ができます。どちらのケースでもクライアントは送出中のリクエストの数を制御します。多くのリクエストを送信しすぎて過負荷となる事を防ぐために、Akka HTTPは並列して処理するリクエストの数を制限する事が出来ます。

同時接続数を制限するには akka.http.server.max-connections の設定を使用します。この設定は全ての Http.bindAndHandle* メソッドに適用されます。 Http.bind を使用しているのであれば、着信した接続は Source[IncomingConnection, ...] として表されます。着信した接続にバックプレシャー制御を適用してフローを制御する為に、 throttlemapAsync などのAkka Streamのコンビネータを使用して下さい。

HTTPパイプライニングは一般的には推奨されていませんが ( そして、多くのブラウザでは無効化されていますが)、Akka HTTPでは完全にサポートしています。制限は二段階で適用されます。一つは、 akka.http.server.pipeline-limit を設定する事で、未解決のリクエストがユーザが提供する処理フローに設定値以上に供給される事を防ぎます。もう一つは、処理のフローが自身にスロットリングを適用する事によるものです。 Http.bindAndHandleSyncHttp.bindAndHandleAsync を使用するのであれば、 parallelism パラメータで一つの接続に対して並行するリクエストの数を指定する事が出来ます。(デフォルトの値は1です。つまり、パイプライニングは無効化されています。) Http.bindAndHandleHttp.bind を使用するのであれば、幾つのリクエストを同時に受け付けるかを、ユーザが提供する処理フローによって完全に制御する事が出来ます。この場合、例えばAkka Streamの mapAsync コンビネータを使用して、並行して処理するリクエストの数を制限する事が出来ます。実際には、これらの二つの措置、つまり、構成の設定と手動によるフローの構成の制約の厳しい方が、一つの接続での並列リクエストの処理方法を決定します。

低レベルAPIでHTTPサーバの障害を処理する

Akka HTTPサーバの初期化時もしくは実行中に、障害が起こる可能性はいくつもあります。Akkaの既定の動作では、全ての障害のログを記録しますが、ログの記録に加えて、例えば、アクターシステムを停止したり、外部のモニタリング装置に明示的に通知したりなど、応答したい時があるかもしれません。

HTTPサーバを生成したり、マテリアライズする時に幾つもの障害が起こる可能性があります。(平素なストリーミング Tcp() サーバも同様です。) スタックのさまざまな層で発生する可能性のある失敗の種類には、サーバを起動できない事から、HttpRequestがアンマーシャリングできない事まで (最も外側のものから最も内側のものまで) 障害の例として含まれます。

  • 指定したアドレス/ポートの bind に失敗する場合

  • OSのファイルディスクリプタやメモリを使い果たし、新しい IncommingConnection の受信に失敗する場合

  • 受信した HttpRequest が不正であった場合など、接続の処理に失敗する場合

この節では、様々な障害の状況に応じた処理の仕方について、またどのような状況でこれらの障害が発生する可能性があるかについて説明します。

最初のタイプ障害はサーバーが指定されたポートをバインドできない場合です。例えば、ポートが既に他のアプリケーションに使用されている場合や ポートが特権を持っている場合 (即ち、ルートのみ使用する事ができる場合) などです。この場合、バインド処理のFutureは直ぐに失敗し、Futureの完了をリスニングしている場合は応答する事ができます。

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.ServerBinding
import akka.stream.ActorMaterializer

import scala.concurrent.Future

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
// needed for the future onFailure in the end
implicit val executionContext = system.dispatcher

// let's say the OS won't allow us to bind to 80.
val (host, port) = ("localhost", 80)
val serverSource = Http().bind(host, port)

val bindingFuture: Future[ServerBinding] = serverSource
  .to(handleConnections) // Sink[Http.IncomingConnection, _]
  .run()

bindingFuture.onFailure {
  case ex: Exception =>
    log.error(ex, "Failed to bind to {}:{}!", host, port)
}

一度サーバーがポートのバインドに成功すると、 Source[IncomingConnection, _] が起動し、新しく受信した接続を発行します。Sourceは技術的には障害を検知する事ができます。しかしながら、それはファイルディスクリプタやメモリが枯渇し接続の着信ができなくなった場合など、非常に劇的な状況に限られるべきです。Akka Streamsにおける障害の処理は非常に進歩しています。障害が発生したストリームからストリームを通って、最終ステージまですべての方法で通知されます。

以下の例では、ストリームの処理に失敗する GraphStage を追加しています。 ( Custom stream processing を参照して下さい。) ストリームがダウンした原因を failureMonitor アクターに通知し、残りの処理、つまりサーバーを再起動するのか、アクターシステムを停止するのかをアクターに依頼します。

import akka.actor.ActorSystem
import akka.actor.ActorRef
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Flow

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val executionContext = system.dispatcher

import Http._
val (host, port) = ("localhost", 8080)
val serverSource = Http().bind(host, port)

val failureMonitor: ActorRef = system.actorOf(MyExampleMonitoringActor.props)

val reactToTopLevelFailures = Flow[IncomingConnection]
  .watchTermination()((_, termination) => termination.onFailure {
    case cause => failureMonitor ! cause
  })

serverSource
  .via(reactToTopLevelFailures)
  .to(handleConnections) // Sink[Http.IncomingConnection, _]
  .run()

三つ目の障害は、接続が確立されたもののその後不意に停止した場合、例えばクライアントがTCP接続を中断した場合です。接続のFlowに以前のコードの断片のパターンを適用する事で、この障害を処理する事ができます。

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Flow

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val executionContext = system.dispatcher

val (host, port) = ("localhost", 8080)
val serverSource = Http().bind(host, port)

val reactToConnectionFailure = Flow[HttpRequest]
  .recover[HttpRequest] {
    case ex =>
      // handle the failure somehow
      throw ex
  }

val httpEcho = Flow[HttpRequest]
  .via(reactToConnectionFailure)
  .map { request =>
    // simple streaming (!) "echo" response:
    HttpResponse(entity = HttpEntity(ContentTypes.`text/plain(UTF-8)`, request.entity.dataBytes))
  }

serverSource
  .runForeach { con =>
    con.handleWith(httpEcho)
  }

これらの機能は多かれ少なかれ、接続やバインドに失敗したなど、基盤の機能に依存していると説明する事ができます。多くの場合、Akkaは単純にエラーログを出力ます。これは、このような問題に対する最も合理的な既定の動作であり、あなたはこれらの問題に深入りする必要はありません。

アプリケーションコードが実際のルーティングレイヤーで例外を処理する方法については、 Exception Handling を参照してください。これは、ルーティング処理で投げられた例外をどのように扱うことができるかを明確に説明するものであり、クラス HttpResponse に変換され、適切なエラーコードと人間が判読可能な障害の説明があります。

Contents