低レベルサーバーサイドAPI

低レベルサーバーサイドAPI

Akka HTTPは HTTP Client とは別に、完全に非同期な Reactive-Streams ベースの、 Akka Stream.上に構築されたHTTP/1.1サーバーの実装を提供しています。

次のような機能をサポートしています。

  • パーシステント・コネクションの完全なサポート

  • HTTPパイプラインの完全なサポート

  • 自然なAPIを通じたチャンク転送形式を含む非同期なHTTPストリーミングの完全なサポート

  • オプショナルなSSL/TLS暗号化のサポート

  • WebSocketのサポート

Akka HTTPのサーバーサイドのコンポーネントは2つのレイヤに分けられます。

  1. akka-http-core モジュールにある低レベルなサーバーの実装

  2. 高レベルな機能は akka-http モジュールにあります。

低レベルなサーバ (1) はHTTP/1.1の基本的な機能は

  • 接続の管理

  • メッセージとヘッダーの解析とレンダリング

  • (リクエストと接続の)タイムアウトの管理

  • (パイプライニングをサポートする為の)レスポンスの整列

典型的なHTTPサーバーの全ての核ではない機能(例えば、リクエストルーティング、ファイル転送、圧縮など)は高レベルなレイヤにあり、 akka-http-core には実装されていません。これらの一般的なデザインから分けることによって、コアを小さく、軽量に保つのと同様に理解と保守を簡単にしています。

あなたの必要に応じて、低レベルなAPIを直接使うことも、複雑なサービスのロジックをより簡単に定義する為に高レベルなルーティングDSL Routing DSL に頼ることもできます。

ストリームとHTTP

Akka HTTPサーバーは Akka Stream の上に実装されていて、その実装や全てのレベルのAPIも同様に強く依存しています。

コネクションレベルにおいて、Akka HTTPは Akka Stream IO: と基本的に同じインターフェースを提供しています。ソケットのバインディングは接続要求のストリームとして表現されています。アプリケーションはストリームから接続をプルし、それらの1つ1つを Flow<HttpRequest, HttpResponse, ?> に供給し、リクエストをレスポンスへ変換します。

サーバサイドにおけるソケットのバインディングを Source<IncomingConnection> とし、接続の1つ1つを Source<HttpRequest>Sink<HttpResponse> としたストリームの抽象化は、単一のHTTPメッセージの中にも存在ます。HTTPリクエストとレスポンスのエンティティは一般的に ``Source<ByteString>``としてモデル化されています。HTTPメッセージがAkka HTTPにおいてどのように表現されているかについての詳しい情報は HTTP Model も参照してください。

起動と停止

最も基本的なレベルのAkka HTTPサーバは akka.http.javadsl.Http extension: の``bind`` メソッドを実行することによって起動します。

ActorSystem system = ActorSystem.create();
Materializer materializer = ActorMaterializer.create(system);

Source<IncomingConnection, CompletionStage<ServerBinding>> serverSource =
  Http.get(system).bind(ConnectHttp.toHost("localhost", 8080), materializer);

CompletionStage<ServerBinding> serverBindingFuture =
  serverSource.to(Sink.foreach(connection -> {
      System.out.println("Accepted new connection from " + connection.remoteAddress());
      // ... and then actually handle the connection
    }
  )).run(materializer);

Arguments to the Http().bind method specify the interface and port to bind to and register interest in handling incoming HTTP connections. Additionally, the method also allows for the definition of socket options as well as a larger number of settings for configuring the server according to your needs.

The result of the bind method is a Source<Http.IncomingConnection> which must be drained by the application in order to accept incoming connections. The actual binding is not performed before this source is materialized as part of a processing pipeline. In case the bind fails (e.g. because the port is already busy) the materialized stream will immediately be terminated with a respective exception. The binding is released (i.e. the underlying socket unbound) when the subscriber of the incoming connection source has cancelled its subscription. Alternatively one can use the unbind() method of the Http.ServerBinding instance that is created as part of the connection source's materialization process. The Http.ServerBinding also provides a way to get a hold of the actual local address of the bound socket, which is useful for example when binding to port zero (and thus letting the OS pick an available port).

リクエスト-レスポンス サイクル

新しい接続を受信した時に、リモートアドレスとリクエストをハンドリグする Flow<HttpRequest, HttpResponse, ?> を供給するメソッドで構成されている Http.IncomingConnection が出現します。

リクエストは handleWithXXX メソッドの一つを呼び出すことによって処理されます。

  • a Flow<HttpRequest, HttpResponse, ?> for handleWith,
  • 関数 Function<HttpRequest, HttpResponse> を処理する handleWithSyncHandler

  • a function Function<HttpRequest, CompletionStage<HttpResponse>> for handleWithAsyncHandler.

ここに完全な例があります。

ActorSystem system = ActorSystem.create();
  final Materializer materializer = ActorMaterializer.create(system);

  Source<IncomingConnection, CompletionStage<ServerBinding>> serverSource =
    Http.get(system).bind(ConnectHttp.toHost("localhost", 8080), materializer);

  final Function<HttpRequest, HttpResponse> requestHandler =
    new Function<HttpRequest, HttpResponse>() {
      private final HttpResponse NOT_FOUND =
        HttpResponse.create()
          .withStatus(404)
          .withEntity("Unknown resource!");


      @Override
      public HttpResponse apply(HttpRequest request) throws Exception {
        Uri uri = request.getUri();
        if (request.method() == HttpMethods.GET) {
          if (uri.path().equals("/")) {
            return
              HttpResponse.create()
                .withEntity(ContentTypes.TEXT_HTML_UTF8,
                  "<html><body>Hello world!</body></html>");
          } else if (uri.path().equals("/hello")) {
            String name = uri.query().get("name").orElse("Mister X");

            return
              HttpResponse.create()
                .withEntity("Hello " + name + "!");
          } else if (uri.path().equals("/ping")) {
            return HttpResponse.create().withEntity("PONG!");
          } else {
            return NOT_FOUND;
          }
        } else {
          return NOT_FOUND;
        }
      }
    };

  CompletionStage<ServerBinding> serverBindingFuture =
    serverSource.to(Sink.foreach(connection -> {
      System.out.println("Accepted new connection from " + connection.remoteAddress());

      connection.handleWithSyncHandler(requestHandler, materializer);
      // this is equivalent to
      //connection.handleWith(Flow.of(HttpRequest.class).map(requestHandler), materializer);
    })).run(materializer);

この例では、リクエストはリクエストストリームを handleWithSyncHandler と関数 Function<HttpRequest, HttpResponse> を使って変換することによって処理されます。 (Akka Streamの map 演算子を使っても同様です。) Akka Streamのコンビネータを使うことでユースケースに応じたリクエストを処理する方法はいくらでも考えられます。

アプリケーションがFlowを指定する場合、リクエスト毎に該当する唯一のレスポンスを生成し、レスポンスの順序を該当するリクエストの順序と一致させることはアプリケーションの責務です。(このことは、複数のリクエストが重複するHTTPパイプライニングが有効になっている場合に関連します。) handleWithSyncHandlerhandleWithAsyncHandler 、若しくは mapmapAsync を使用している場合は、この要件は自動的に満たされます。

リクエストハンドラを作成するのに便利な高レベルDSLについては、 ルーティングDSLの概要 を参照して下さい。

ストリーミング リクエスト/レスポンス エンティティ

HTTPメッセージのストリーミングは、 HttpEntity のサブクラスを通じてサポートされます。アプリケーションはリクエストを受信した時に、ストリーミングされたエンティティを扱うことが出来るだけでなく、多くの場合は、レスポンスを生成する時にも扱うことが出来る必要があります。代替手段の説明については、 HttpEntity を参照して下さい。

接続を閉じる

HTTPの接続は Flow を処理中に上流の消費を中断したり、ピアが接続を閉じると閉じられます。明示的に Connection: close ヘッダを``HttpResponse`` に加えることが便利な場合もあります。このレスポンスは接続の最後のレスポンスになり、サーバーは送信が完了すると接続を閉じます。

接続はリクエストエンティティが中断された場合(例:Sink.cancelled() に繋がれた場合)、若しくは、部分的に消費された場合(例:take コンビネータを使用した場合)にも閉じられます。この振る舞いを防ぐ為には、 Sink.ignore() に繋ぐことによって明示的にドレインしなければなりません。

スタンドアロンのHTTPレイヤの使用方法

リアクティブ・ストリームを基本としたの性質を持つAkka HTTPレイヤは、TCPインターフェースから完全に切り離す事ができます。この"特性"は殆どのアプリケーションでは重要ではありませんが、特定のケースにおいてはネットワークからではない他のソースによって"実行"できる事が便利である場合があります。これが有用である可能性があるシナリオは、テストやデバッグ、低レベルのイベントソーシング(例:ネットワークトラフィックの再現)などがあります。

サーバー側では、スタンドアロンのHTTPレイヤは、暗号化された未処理の接続をHTTPレベルに「アップグレードする」段階である BidiFlow<HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest, NotUsed> を形成します。

レイヤーのインスタンスは Http.get(system).serverLayer をオーバーロードした二つのメソッドの内の一つを呼ぶことによって生成する事ができ、また、様々な段階の設定を可能にします。返されたインスタンスは再利用可能ではなく、一度しかマテリアライズされないことに注意してください。

サーバーの並列性を制御する

リクエストの処理は、幾つのかの接続を並列に処理する、またはHTTPパイプライニングを使って幾つかのリクエストをレスポンスを待つ事なく送信する、という二つの軸で並列化する事ができます。どちらのケースでもクライアントは送出中のリクエストの数を制御します。多くのリクエストを送信しすぎて過負荷となる事を防ぐために、Akka HTTPは並列して処理するリクエストの数を制限する事が出来ます。

同時接続数を制限するには akka.http.server.max-connections の設定を使用します。この設定は全ての Http.bindAndHandle* メソッドに適用されます。 Http.bind を使用しているのであれば、着信した接続は Source<IncomingConnection, ...> として表されます。着信した接続にバックプレシャー制御を適用してフローを制御する為に、 throttlemapAsync などのAkka Streamのコンビネータを使用して下さい。

HTTPパイプライニングは一般的には推奨されていませんが ( そして、多くのブラウザでは無効化されていますが)、Akka HTTPでは完全にサポートしています。制限は二段階で適用されます。一つは、 akka.http.server.pipeline-limit を設定する事で、未解決のリクエストがユーザが提供する処理フローに設定値以上に供給される事を防ぎます。もう一つは、処理のフローが自身にスロットリングを適用する事によるものです。 Http.bindAndHandleSyncHttp.bindAndHandleAsync を使用するのであれば、 parallelism パラメータで一つの接続に対して並行するリクエストの数を指定する事が出来ます。(デフォルトの値は1です。つまり、パイプライニングは無効化されています。) Http.bindAndHandleHttp.bind を使用するのであれば、幾つのリクエストを同時に受け付けるかを、ユーザが提供する処理フローによって完全に制御する事が出来ます。この場合、例えばAkka Streamの mapAsync コンビネータを使用して、並行して処理するリクエストの数を制限する事が出来ます。実際には、これらの二つの措置、つまり、構成の設定と手動によるフローの構成の制約の厳しい方が、一つの接続での並列リクエストの処理方法を決定します。

低レベルAPIでHTTPサーバの障害を処理する

Akka HTTPサーバの初期化もしくは実行中に、障害が起こる状況はいくつもあります。Akkaの既定の動作では、全ての障害のログを記録しますが、ログの記録に加えて、例えば、アクターシステムを停止したり、外部のモニタリング装置に明示的に通知したりなど、応答したい時があるかもしれません。

HTTPサーバを生成したり、マテリアライズする時に幾つもの障害が起こる可能性があります。(平素なストリーミング Tcp サーバも同様です。) スタックのさまざまな層で発生する可能性のある失敗の種類には、サーバを起動できない事から、HttpRequestがアンマーシャリングできない事まで (最も外側のものから最も内側のものまで) 障害の例として含まれます。

  • 指定したアドレス/ポートの bind に失敗する場合

  • OSのファイルディスクリプタやメモリを使い果たし、新しい IncommingConnection の受信に失敗する場合

  • 受信した HttpRequest が不正であった場合など、接続の処理に失敗する場合

この節では、様々な障害の状況に応じた処理の仕方について、またどのような状況でこれらの障害が発生する可能性があるかについて説明します。

最初のタイプ障害はサーバーが指定されたポートをバインドできない場合です。例えば、ポートが既に他のアプリケーションに使用されている場合や ポートが特権を持っている場合 (即ち、ルートのみ使用する事ができる場合) などです。この場合、バインド処理のフューチャーは直ぐに失敗し、CompletionStageの完了をリスニングしている場合は応答する事ができます。

ActorSystem system = ActorSystem.create();
Materializer materializer = ActorMaterializer.create(system);

Source<IncomingConnection, CompletionStage<ServerBinding>> serverSource =
  Http.get(system).bind(ConnectHttp.toHost("localhost", 80), materializer);

CompletionStage<ServerBinding> serverBindingFuture =
  serverSource.to(Sink.foreach(connection -> {
      System.out.println("Accepted new connection from " + connection.remoteAddress());
      // ... and then actually handle the connection
    }
  )).run(materializer);

serverBindingFuture.whenCompleteAsync((binding, failure) -> {
  // possibly report the failure somewhere...
}, system.dispatcher());

一度サーバーがポートのバインドに成功すると、 Source<IncomingConnection, ?> が起動し、新しく受信した接続を発行します。Sourceは技術的には障害を検知する事ができます。しかしながら、それはファイルディスクリプタやメモリが枯渇し接続の着信ができなくなった場合など、非常に劇的な状況に限られるべきです。Akka Streamsにおける障害の処理は非常に進歩しています。障害が発生したストリームからストリームを通って、最終ステージまですべての方法で通知されます。

以下の例では、ストリームの処理に失敗する GraphStage を追加しています。 ( Custom stream processing を参照して下さい。) ストリームがダウンした原因を failureMonitor アクターに通知し、残りの処理、つまりサーバーを再起動するのか、アクターシステムを停止するのかをアクターに依頼します。

ActorSystem system = ActorSystem.create();
Materializer materializer = ActorMaterializer.create(system);

Source<IncomingConnection, CompletionStage<ServerBinding>> serverSource =
  Http.get(system).bind(ConnectHttp.toHost("localhost", 8080), materializer);

Flow<IncomingConnection, IncomingConnection, NotUsed> failureDetection =
  Flow.of(IncomingConnection.class).watchTermination((notUsed, termination) -> {
    termination.whenComplete((done, cause) -> {
      if (cause != null) {
        // signal the failure to external monitoring service!
      }
    });
    return NotUsed.getInstance();
  });

CompletionStage<ServerBinding> serverBindingFuture =
  serverSource
    .via(failureDetection) // feed signals through our custom stage
    .to(Sink.foreach(connection -> {
      System.out.println("Accepted new connection from " + connection.remoteAddress());
      // ... and then actually handle the connection
    }))
    .run(materializer);

三つ目の障害は、接続が確立されたもののその後不意に停止した場合、例えばクライアントがTCP接続を中断した場合です。接続のFlowに以前のコードの断片のパターンを適用する事で、この障害を処理する事ができます。

ActorSystem system = ActorSystem.create();
Materializer materializer = ActorMaterializer.create(system);

Source<IncomingConnection, CompletionStage<ServerBinding>> serverSource =
  Http.get(system).bind(ConnectHttp.toHost("localhost", 8080), materializer);

Flow<HttpRequest, HttpRequest, NotUsed> failureDetection =
  Flow.of(HttpRequest.class)
    .watchTermination((notUsed, termination) -> {
      termination.whenComplete((done, cause) -> {
        if (cause != null) {
          // signal the failure to external monitoring service!
        }
      });
      return NotUsed.getInstance();
    });

Flow<HttpRequest, HttpResponse, NotUsed> httpEcho =
  Flow.of(HttpRequest.class)
    .via(failureDetection)
    .map(request -> {
      Source<ByteString, Object> bytes = request.entity().getDataBytes();
      HttpEntity.Chunked entity = HttpEntities.create(ContentTypes.TEXT_PLAIN_UTF8, bytes);

      return HttpResponse.create()
        .withEntity(entity);
    });

CompletionStage<ServerBinding> serverBindingFuture =
  serverSource.to(Sink.foreach(conn -> {
      System.out.println("Accepted new connection from " + conn.remoteAddress());
      conn.handleWith(httpEcho, materializer);
    }
  )).run(materializer);

これらの機能は多かれ少なかれ、接続やバインドに失敗したなど、インフラストラクチャの機能に依存していると説明する事ができます。多くの場合、Akkaは単純にエラーログを出力ます。これは、このような問題に対する最も合理的な既定の動作であり、あなたはこれらの問題に深入りする必要はありません。

Contents