バッファーとレートの操作

バッファーとレートの操作

アップストリームとダウンストリームのレートが異なる場合、特にスループットが急上昇する場合は、ストリームにバッファを導入すると便利です。 この章では、Akka ストリームでのバッファの使用方法について説明します。

非同期ステージ用のバッファー

このセクションでは、非同期ステージを使用する際に最適化として導入される内部バッファについて説明します。

ステージを非同期に実行するには、 `` .async`` メソッドを使用してステージを明示的にマークする必要があります。 非同期に実行されるということは、ステージが要素を下流のコンシューマに渡した後、次のメッセージをただちに処理できることを意味します。 これが何を意味するのかを実証するために、次の例を見てみましょう:

Source(1 to 3)
  .map { i => println(s"A: $i"); i }.async
  .map { i => println(s"B: $i"); i }.async
  .map { i => println(s"C: $i"); i }.async
  .runWith(Sink.ignore)

上記の例を実行すると、可能な出力の1つは次のようになります。

A: 1
A: 2
B: 1
A: 3
B: 2
C: 1
B: 3
C: 2
C: 3

順序は、フローの通常の融合同期実行モデルに対応する「A:1、B:1、C:1、A:2、B:2、C:2」では ない ことに注意してください。その場合の要素は、次の要素がフローに入る前に、処理パイプラインを完全に通過します。 実際は前の要素が発行されるとすぐに次の要素が非同期段階で処理されます。

パイプライン処理は一般にスループットを向上させるが、実際には非同期 (スレッド交差) 境界を通過する重大なコストがあります。 このコストを償却するために、Akka Streams はウィンドウ化されたバッチ処理のバックプレッシャー戦略を内部的に使用します。 ウィンドウ化されたのは、 Stop-And-Wait プロトコルとは異なり、要素の要求と同時に複数の要素が「飛行中」になる可能性があるためです。バッチ処理の理由は、要素がウィンドウバッファから排出された後、直ちに新しい要素が要求されるではなく、複数の要素が排出された後に複数の要素が要求されるです。 このバッチ処理戦略は、バックプレッシャ信号を非同期境界を通って伝播する通信コストを低減できます。

この内部プロトコルは、ユーザーにはほとんど見えませんが (スループットの増加以外) 、これらの詳細が公開される状況があります。 これまでの例では、バックプレッシャ信号によって処理チェーンの速度が厳密に調整され、すべてのステージが接続チェーンのスループットよりも速く処理されないと常に仮定していました。 しかし、Akka Streamsには、処理チェーンのさまざまなセグメントのレートを「切り離す」、または外部のタイミングソースを通るストリームの最大スループットを定義できるツールがあります。 これらの状況は、まさに内部バッチ処理のバッファリング戦略が突然不透明になる状況です。

内部バッファーとその効果

これまで説明したように、パフォーマンスのため、Akka Streamsはすべての非同期処理ステージにバッファを導入しています。 これらのバッファの目的は最適化のみです。スループットの改善が必要ない場合は、実際には 1 のサイズが最も自然な選択です。 したがって、これらのバッファサイズは小さくしておき、アプリケーションのスループット要件に適したレベルまで増加させることをお勧めします。 デフォルトのバッファサイズは、こちらの設定によって調整できます。

akka.stream.materializer.max-input-buffer-size = 16

あるいは、マテリアライザに ActorMaterializerSettings を渡すことで設定できます。

val materializer = ActorMaterializer(
  ActorMaterializerSettings(system)
    .withInputBuffer(
      initialSize = 64,
      maxSize = 64))

一つの Flow のセグメントのみにバッファサイズを設定する必要がある場合は、次の属性を持つ個別の Flow を定義することができます。

val section = Flow[Int].map(_ * 2).async
  .addAttributes(Attributes.inputBuffer(initial = 1, max = 1)) // the buffer size of this map is 1
val flow = section.via(Flow[Int].map(_ / 2)).async // the buffer size of this map is the default

内部バッファによって引き起こされるいくつかの問題を示すコードの例を次に示します。

import scala.concurrent.duration._
case class Tick()

RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._

  // this is the asynchronous stage in this graph
  val zipper = b.add(ZipWith[Tick, Int, Int]((tick, count) => count).async)

  Source.tick(initialDelay = 3.second, interval = 3.second, Tick()) ~> zipper.in0

  Source.tick(initialDelay = 1.second, interval = 1.second, "message!")
    .conflateWithSeed(seed = (_) => 1)((count, _) => count + 1) ~> zipper.in1

  zipper.out ~> Sink.foreach(println)
  ClosedShape
})

上記の例を実行すると、3 秒ごとに数字 3 が出力されることが期待されます (conflateWithSeed ステップは、下流の ZipWith がそれらを消費する前に受け取った要素の数を数えるように設定されています) 。印刷される内容は異なりますが、番号 1 が表示されます。その理由は、デフォルトでは16要素の大きさの内部バッファであり、 ZipWith がそれらを消費する前に要素をプリフェッチします。 ZipWith (またはグラフ全体) のバッファサイズを1に変更することでこの問題を解決することは可能です。 また、 ZipWith 要素の最初のプリフェッチが原因で、先頭に1が表示されます 。

注釈

一般に、時間またはレート駆動の処理ステージが異常な挙動を示す場合、試行する最初の解決策の1つは、影響を受ける要素の入力バッファを1に減らすことです。

Akka Streams のバッファー

このセクションでは、アプリケーションのストリーム処理パイプラインのドメインロジックの一部である 明示的 なユーザー定義バッファについて説明します。

以下の例では、1000のジョブ (ピッタリ) が外部システム (仮想) からデキューされ、ローカルメモリに格納され、外部システムを解放していることを確認します。

// Getting a stream of jobs from an imaginary external system as a Source
val jobs: Source[Job, NotUsed] = inboundJobsConnector()
jobs.buffer(1000, OverflowStrategy.backpressure)

次の例では、1000個のジョブをローカルにキューイングしますが、想像上の外部システムで待っているジョブがさらにある場合は、バッファの tail から1つの要素を削除して新しい要素の領域を確保します。 尾から落とすことは非常に一般的な戦略ですが、これは 最年少 の待機中の仕事を落とすことに注意する必要があります。 私たちが長い間待ち望んでいた仕事に親しみやすいという意味で、 公平性 が望まれる場合、このオプションは便利です。

jobs.buffer(1000, OverflowStrategy.dropTail)

バッファの末尾から最も若い要素を削除する代わりに、新しい要素をバッファにエンキューすることなく削除することができます。

jobs.buffer(1000, OverflowStrategy.dropNew)

1000ジョブのキューを持つもう1つの例ですが、バッファの head から1つの要素を削除することで、新しい要素のための領域を確保しています。 これは 最古 待っているジョブです。 これは、ジョブが一定期間内に処理されないと再送信されることが予想される場合に推奨される方法です。 最も古い要素はすぐに再送信されます (実際には再送信されたコピーがすでにキューに入っている可能性があります) 、だから最初に削除するのが理にかなっています。

jobs.buffer(1000, OverflowStrategy.dropHead)

上記のドロップ方法と比較して、dropBuffer は、バッファがいっぱいになると、それがエンキューした1000のジョブをすべてドロップします。 この積極的な戦略は、ジョブの削除がジョブの遅延に優先される場合に便利です。

jobs.buffer(1000, OverflowStrategy.dropBuffer)

仮想外部のジョブプロバイダーがAPIを使用するクライアントである場合、クライアントが1000を超えるジョブを待機できない場合は、フラッディングと接続の終了を考慮する必要があります。 これは、バッファがいっぱいになると単にストリームに失敗するエラー戦略によって簡単に達成できます。

jobs.buffer(1000, OverflowStrategy.fail)

レート変換

conflate を理解する

速いプロデューサーがバックプレッシャーまたは他の信号によって減速するよう通知できない場合、需要信号がコンシューマから来るまでプロデューサーからの要素を組み合わせるのに conflate が役立つかもしれません。

以下は、要素の統計が計算されている間に高速ストリームの到着した要素の標準偏差、平均、およびカウントを計算するコード例です。

val statsFlow = Flow[Double]
  .conflateWithSeed(Seq(_))(_ :+ _)
  .map { s =>
    val μ = s.sum / s.size
    val se = s.map(x => pow(x - μ, 2))
    val σ = sqrt(se.sum / se.size)
    (σ, μ, s.size)
  }

この例は、そのようなフローの速度が切り離されていることを示しています。 フローの開始時の要素レートは、フローの終了時の要素レートよりもはるかに高くなる可能性があります。

conflate の別の可能な使い方は、生産者があまりにも速くなっている場合、すべての要素の統計を考慮しないことです。 以下の例は、コンシューマがプロデューサに追いつくことができないときに、 conflate が要素のランダムなドロップを実装するためにどのように使用できるかを示しています。

val p = 0.01
val sampleFlow = Flow[Double]
  .conflateWithSeed(Seq(_)) {
    case (acc, elem) if Random.nextDouble < p => acc :+ elem
    case (acc, _)                             => acc
  }
  .mapConcat(identity)

expand を理解する

Expand は、コンシューマからの需要に追いつかない遅いプロデューサーに対処するのに役立ちます。 Expand を使用すると、要素としてコンシューマに送信される値を推定することができます。

ここで単純に expand を使うのは、プロデューサが新しい要素を送信しないときに同じ要素をコンシューマに送るフローです。

val lastFlow = Flow[Double]
  .expand(Iterator.continually(_))

また、Expandを使用すると、下流からのデマンド要求の間にある状態を維持することもできます。 これを利用して、ここでは、高速コンシューマと低速プロデューサの間のドリフトを追跡し報告するフローがあります。

val driftFlow = Flow[Double]
  .expand(i => Iterator.from(0).map(i -> _))

アップストリームから来るすべての要素は、少なくとも1回は expand されることに注意してください。 これは、このフローの出力が、プロデューサが十分に速い場合は0のドリフトを、それ以外の場合はより大きなドリフトを報告することを意味します。

Contents