クイックスタートガイド

クイックスタートガイド

ストリームは基本的にはソースから始まります。Akka Stream を始める場合も然りです。しかしまずは、ストリーミングのために必要なものをインポートしてみましょう。

import akka.stream._
import akka.stream.scaladsl._

このクイックスタートガイドを読み進めるなかでサンプルコードを実行する場合、以下のインポートも必要になります。

import akka.{ NotUsed, Done }
import akka.actor.ActorSystem
import akka.util.ByteString
import scala.concurrent._
import scala.concurrent.duration._
import java.nio.file.Paths

さて、まずは整数値を 1 から 100 まで発行するシンプルなソースから始めてゆきましょう。

val source: Source[Int, NotUsed] = Source(1 to 100)

Source 型は2つの型パラメータを取ります。一つ目がソースが発行する個別要素の型を示し、二つ目が、実行中のソースが生成する補助的な値を示します(例えばネットワークソースは、接続先のポートや通信先のアドレスに関する情報を提供します)。 補助的な情報が生成されない場合、ここには akka.NotUsed という型が付与されます。例えば整数の範囲をストリームとして扱うケースがこれに相当します。

このソースを作成したということは、1から100までの整数を発行するためのレシピが作成されたということです。しかしソースはまだアクティブにはなっていません。これらの整数を手にするには、以下のようなコードを実行する必要があります。

source.runForeach(i => println(i))(materializer)

この行では、ソースに対してそれを消費するための関数を与えています。例では単純に数値をコンソール上に出力しようとしており、Actorがこの小さなストリームのレシピを受け取って実行します。"run" という語を名前に含んだメソッドが、実行の引き金となります。Akka Stream を実行するためのメソッドは他にもありますが、それらは例外なくこの命名規則に従っています。

ところで、このストリームを実行する Actor はどこで生成されるのか、そしてこの materializer は一体何者なのでしょうか。これらの値を取得するには、まずアクターシステムを作る必要があります。

implicit val system = ActorSystem("QuickStart")
implicit val materializer = ActorMaterializer()

マテリアライザを生成する方法は他にもあります。例えばアクター内部でストリームを利用する際、 ActorContext から生成します。Materializer クラスは、ストリーム実行エンジンのためのファクトリであり、ストリームを走らせるために必要なものです。今の段階では詳細を理解する必要は全くありませんが、ただ Source が持っているあらゆる run メソッドを呼び出すために必要だということだけ覚えておいてください。run メソッドの引数に指定しなかった場合、マテリアライザは暗黙的に選択されます。次を御覧ください。

Source は、どのようなデータを実行するかの説明書に過ぎないのですが、これはAkka Streamの優れた点でもあります。ソースを建築家の青写真のように再利用し、より大きな設計に役立てることが可能なのです。先の例で言うと整数のソースを変形してファイルに書き込むこともできます。

val factorials = source.scan(BigInt(1))((acc, next) => acc * next)

val result: Future[IOResult] =
  factorials
    .map(num => ByteString(s"$num\n"))
    .runWith(FileIO.toPath(Paths.get("factorials.txt")))

まず、ストリーム全体に対して計算処理を適用するために scan コンビネータを用います。これは数値の1( BigInt(1) )から始めて、続く数値を次々と掛け合わせてゆき、初期値とそれに続く全ての計算結果を出力として返します。結果として階乗のリストを、後の再利用のために Source に隠蔽した形で手に入れることができました。ここで注意していただきたいのは、実際の計算はまだ何も行われていないということです。これはストリームを実行した際にどのような計算処理を行いたいかを記述したものに過ぎないのです。更にこの数値リストを、テキストファイル内の各行を表現した ByteString オブジェクトのストリームへと変換します。そしてデータの受信者としてファイルを指定することにより、ストリームを実行するのです。Akka Stream の用語では、これを Sink と呼びます。IOResult は Akka Stream で IO 操作が返却するタイプです。これはどのくらいのバイトあるいは要素が消費されたか、そしてストリームが正常終了あるいは異常終了したかに関する情報を保持しています。

再利用可能な部品

他のストリーム系ライブラリが持たない Akka Stream の優れた点として、ソースだけでなく、他の全ての要素も青写真のように再利用可能であるということが挙げられます。例えばファイル書込用の Sink は、入力文字列から ByteString 要素を取得する処理を挿入し、それを再利用可能な部品としてパッケージ化することもできます。(普段目にする英文のように)ストリームを書き込む言語は常に左から右へと流れてゆきます。従ってソースに似てはいるものの、「開いた」入力元となる開始地点も必要です。これを Akka Stream で Flow: と呼びます。

def lineSink(filename: String): Sink[String, Future[IOResult]] =
  Flow[String]
    .map(s => ByteString(s + "\n"))
    .toMat(FileIO.toPath(Paths.get(filename)))(Keep.right)

文字列のストリームから始まり、それぞれを ByteString へ変換し、そして先ほどのファイル書き込み用 Sink へと流し込んでゆきます。できあがった青写真は Sink[String, Future[IOResult]] という型になります。これは文字列を入力値として取り、マテリアル化する際に Future[IOResult] 型の補助情報を生成することを意味します(SourceFlow で処理をチェーンする場合、補助情報―「マテリアル化された値」とも呼びます―の型は最上流の開始地点で決まります。ここでは FileIO.toPath が提供するものを保持したいため、Keep.right と宣言する必要があります)。

このできあがったばかりのピカピカの Sink を、数値を文字列に変更するため少しだけ手を加えた factorials につなげてみましょう。

factorials.map(_.toString).runWith(lineSink("factorial2.txt"))

時間ベースの処理

ここからより複雑な例に入る前に、Akka Stream が実現するストリーミングの性質について見てゆきましょう。まず factorials ソースをスタート地点として、0 から 100 までの数値を発行する Source に対してそれぞれの要素でペアを作ってゆきます。factorials ソースから発行される 1 番目の値は 0 の階乗、2 番めが 1の階乗、といった具合に続いてゆきます。この数値の組み合わせで "3! = 6" のような文字列を作ってゆきます。

val done: Future[Done] =
  factorials
    .zipWith(Source(0 to 100))((num, idx) => s"$idx! = $num")
    .throttle(1, 1.second, 1, ThrottleMode.shaping)
    .runForeach(println)

ここまでの操作には時間という概念は考慮されていませんでした。このままでは通常のコレクションに対する操作と何ら変わりません。しかし次の行では、ストリームが一定の速度で流れるように処理が示されています。具体的には``throttle`` コンビネータを使って、秒間1要素でストリームの速度を絞っています。(引数に出てくる2つめの 1 は、データのバースト時に流しても良い最大の要素数を指定します。ここで 1 を渡しているのは、最初の要素が流れた直後は、後続の要素はそれぞれ1秒待たなければいけない、という意味です。)

このプログラムを実行すると、一秒ごとに一行が表示されるはずです。さて、直ちに役立つわけでもないですが、留意して欲しいことがあります。それは、ここでもし10億もの数値を生成するようなストリームを使ったとしても、JVMがOutOfMemoryErrorでクラッシュすることはないということです。また、ストリームがバックグラウンドで実行されることにも着目してください(これは補助情報が:class:`Future`として提供されていることの理由になります)。この仕組が動くのは、Akka Streamが暗黙に汎用的なフローコントロールを実装しており、全てのコンビネータがバックプレッシャーを意識している作りだからです。これによってthrottleコンビネータが、上流のデータソース全てに対して、要素を決められたレートでしか受け取ることができないということを通知することができます。つまり秒間1以上の入力レートに直面した際、throttleコンビネータは上流に対して*back-pressure*を通知します。

これがAkka Streamの簡単な要約となります。他にも数多くのソースとシンク、そして更に多くのストリーム変換のコンビネータが利用可能ですが、詳細につきましては:ref:stages-overview_scala もご参照ください。

リアクティブTweet

ストリーム処理の典型的なユースケースとして、ライブストリームデータを消費して、そこから有用なデータを抽出したりまとめたりする例を考えてみましょう。ここではツイートのストリームを消費して、そこからAkkaに関連した情報を抽出してみます。

また、「もし購読者がデータのライブストリームを消費する速度があまりにも遅い場合はどうするのか」 という、全てのノンブロッキングソリューションに内在する問題も考えてみます。概して要素をバッファリングするソリューションが取られることが多いですが、これは結果的にはバッファーオーバーフローを引き起こしたりシステムを不安定にさせる結果に終わります。一方でAkka Streamでは、このような状況においてどう振る舞うべきかをコントロールするための内部的なバックプレッシャーシグナル機構を備えています。

ここで、本クイックスタートの例を通じて扱ってゆくデータモデルを紹介します。

final case class Author(handle: String)

final case class Hashtag(name: String)

final case class Tweet(author: Author, timestamp: Long, body: String) {
  def hashtags: Set[Hashtag] =
    body.split(" ").collect { case t if t.startsWith("#") => Hashtag(t) }.toSet
}

val akka = Hashtag("#akka")

注釈

このままコード例に飛び込んでゆく前に使用される語彙の概観を理解したい場合、ドキュメンテーション内の Core conceptsDefining and running streams 節に目を通してみてください。クイックスタートに戻った際、それらの語彙がこのシンプルなアプリケーション例でどう利用されるか理解できることでしょう。

シンプルなストリームの変換と消費

ここで扱うアプリケーションの一例として、シンプルなTwitterフィードストリーに対して、例えば #akka についてツイートしている全てのユーザーのハンドル名を洗い出すなどの情報抽出を試してみましょう。

ActorSystemActorMaterializer はこれから作成するストリームをマテリアライズしたり実行するための責務を持っています。これらを生成して環境を整えるには次のように記述します

implicit val system = ActorSystem("reactive-tweets")
implicit val materializer = ActorMaterializer()

ActorMaterializer はオプションで ActorMaterializerSettings を取ります。これはデフォルトのバッファサイズ(非同期ステージ用のバッファー も参照ください)やパイプラインに使用されるディスパッチャなど、マテリアライズのためのプロパティを定義するのに利用します。これらの値は FlowSourceSinkGraph などが持つ withAttributes で上書きすることができます。

ここで、手元にツイートのストリームがあるとしましょう。Akkaにおいてこれは Source[Out, M]: として表現します。

val tweets: Source[Tweet, NotUsed]

ストリームは常に Source[Out,M1] から流れ始め、Flow[In,Out,M2] 要素やより応用的なグラフ要素を幾つか通って、最終的に Sink[In,M3] によって消費されます(今のところ MM2M3 などの型パラメータは無視してください。これらは各ストリームクラスが生産/消費する要素の型とは関係はありません。below でお話する通りこれらは「マテリアライズされた型」になります)。

Scalaのコレクションライブラリを使ったことのある方にとっては、各種の操作は見慣れたものに映るでしょう。しかしこれらはコレクションにではなくストリームに対して実行されます(ストリームの文脈では意味のある操作もコレクションの文脈ではそうとは限りませんし、逆もまたしかりです。この大きな違いに注意してください)。

val authors: Source[Author, NotUsed] =
  tweets
    .filter(_.hashtags.contains(akka))
    .map(_.author)

最後に、 materialize を行いストリーム処理を実行するために、FlowSink へ接続し、 Flow を実行可能な状態にする必要があります。最も簡単なやり方として SourcerunWith(sink) を呼び出すという手があります。幾つもの共通のシンクは利便性のために、Sinkコンパニオンオブジェクト のメソッドとして、あらかじめ定義されています。ここではシンプルに作者を表示してみましょう。

authors.runWith(Sink.foreach(println))

あるいは簡略な記法ではこうなります(これは Sink.foldSink.foreach などの最もポピュラーなシンクにしか定義されていません):

authors.runForeach(println)

ストリームをマテリアライズし実行するには、常に暗黙的なスコープに Materializer が必要です(あるいは .run(materializer) のように明示的に渡すことになります)。

コード全体は次のようになります

implicit val system = ActorSystem("reactive-tweets")
implicit val materializer = ActorMaterializer()

val authors: Source[Author, NotUsed] =
  tweets
    .filter(_.hashtags.contains(akka))
    .map(_.author)

authors.runWith(Sink.foreach(println))

ストリームのシーケンスをフラット化する

前節では、1:1の要素の関係に対して操作を行っていました。これは最も一般的なケースではありますが、時に一つの要素から複数の要素を受け取り、Scalaコレクションに対して flatMap を行う場合と同様に「フラット化」されたストリームを受け取りたいような場合もあると思います。

val hashtags: Source[Hashtag, NotUsed] = tweets.mapConcat(_.hashtags.toList)

注釈

for内包表記やモナド合成を連想させるため、flatMap という名前は意図的に避けられています。flatMap という名前は2つの理由から問題があります。まず連結を行ってフラット化することは(ストラテジとしてマージが優先されるため)デッドロックの危険性があるため、制限のかかったストリーム処理環境では望まれない挙動となります。もう一つは、(生存性の問題により)私たちの flatMap はモナド率を満たさないからです。

mapConcat に渡す関数は厳密なコレクションを返す必要がある (f:Out=>immutable.Seq[T]) のに対して、flatMap はストリームに対して操作が行われるということに注意してください。

ストリームをブロードキャストする

さてここで、あるライブストリームから全てのハッシュタグおよび投稿者名を永続化したいとします。例えば投稿者全員のハンドル名を一つのファイルに書き出し、全てのハッシュタグをディスク上の別のファイルに書き出すことを考えてみます。これはつまりソースストリームを2つのストリームに分け、それぞれ異なるファイルへの書き出しを扱うことになります。

「ファン・アウト」(あるいは「ファン・イン」)などの構造を作るための要素は、Akka Streamにおいては「ジャンクション」と呼ばれます。この例ではその中から Broadcast を使います。これはシンプルに入力ポートから全ての出力ポートに対して要素を発行します。

Akka Streamは線形的なストリーム構造(フロー)を、非線形的で分岐してゆくもの(グラフ)と意図的に区別して、それぞれのケースに対して最も便利なAPIを提供しています。グラフは、コレクションの変換のように容易には読めないですが、その代わりに複雑な任意のストリームの組み立てを表現することができます。

GraphDSL を用いると、グラフをこのように構築することができます:

val writeAuthors: Sink[Author, Unit] = ???
val writeHashtags: Sink[Hashtag, Unit] = ???
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
  import GraphDSL.Implicits._

  val bcast = b.add(Broadcast[Tweet](2))
  tweets ~> bcast.in
  bcast.out(0) ~> Flow[Tweet].map(_.author) ~> writeAuthors 
  bcast.out(1) ~> Flow[Tweet].mapConcat(_.hashtags.toList) ~> writeHashtags
  ClosedShape
})
g.run()

ご覧の通り GraphDSL の内部で暗黙的なグラフビルダーである b を使って、グラフをミュータブルに構築します。その際に ~> というエッジ演算子(あるいは「接続」、「経由」などと読みます)を利用します。演算子は GraphDSL.Implicits._ をインポートすることで暗黙的に読み込まれます。

GraphDSL.createGraph を返却します。この例では Graph[ClosedShape, Unit] となっていますが、ここで ClosedShape完全に接続されたグラフ あるいは「閉じた」グラフを意味します。つまり未接続の入力や出力が存在しないということです。このグラフは閉じているため、RunnableGraph.fromGraph を使って RunnableGraph に変換することができます。ランナブルなグラフは run() することでストリームをマテリアライズすることができます。

GraphRunnableGraphイミュータブルでスレッドセーフであり、自由に共有 することができます。

グラフは一つ以上の未接続のポートを持ち、それにより異なるシェイプをなすことがあります。グラフに未接続のポートがあるということは、そのグラフは 部分グラフ であると表現します。大きな構造のなかでグラフを合成したりネストする際の考え方に関しては、Modularity, Composition and Hierarchy で詳細に説明されています。また Constructing Sources, Sinks and Flows from Partial Graphs では、複雑な計算グラフをFlow,SinkやSourceとしてラップすることができるということを詳細に説明しています。

バックプレッシャーを実際に扱ってみる

Akka Streamが持つ強みの一つに、ストリームのシンク(購読者)からソース(発行者)に対して、いつでも バックプレッシャー情報を伝播できるという点があります。これはオプション機能ではなく、常に有効な機能として提供されています。Akka Streamが扱うバックプレッシャープロトコルやその他のReactive Stream互換の実装に関してさらに学びたい場合は、 Back-pressure explained を参照下さい。

このような(Akka Streamを使っていない)アプリケーションはしばしば、入力データを充分な速度で処理することができない、という問題に直面することがあります。たまたまなのか設計上そうなっているのかはともかく、処理しきれない入力データをバッファリングし始め、やがてバッファリングする余地もなくなって最終的には OutOfMemoryError が発生したり、サービスの応答性に対し深刻なデグレデーションが発生することとなってしまいます。Akka Streamは明示的にバッファリングを行うことができますし、またそうする必要があります。例えば「10要素をバッファリングした最新のツイート」にのみ興味がある場合は、buffer 要素を使ってこのように表現することができます:

tweets
  .buffer(10, OverflowStrategy.dropHead)
  .map(slowComputation)
  .runWith(Sink.ignore)

buffer 要素には明示的に OverflowStrategy を渡す必要があり、これはバッファが一杯の時に新しく要素を受け取ったらどのように応答すべきかを定義します。取りうる戦略としては最も古い要素をドロップする(dropHead)、バッファ全体をドロップする、エラーを発行する、などがあります。それぞれのユースケースに最も相応しい戦略を正しく選択して下さい。

マテリアライズされた値

ここまでFlowを使ってデータ処理を行い、コンソール表示であれ外部システムへの保存であれ、何らかの外部シンクへと消費する流れのみを見てきました。しかし時には、マテリアライズされたプロセスパイプラインで得られた値を扱いたいケースもあると思います。例えばツイートをいくつ処理したか知りたい場合はどうでしょう。無限に続くツイートストリームが相手では、この質問に答えるのは容易ではありません(一つの解答として「現時点で N個のツイートを処理した」ことを表すカウントストリームを作ることはできます)。しかし有限のストリームから要素の数を取得するといったケースは一般的にも充分考えられます。

まず、Sink.fold を使って要素カウンタを書いてみましょう。型はどのようになるでしょうか?

val count: Flow[Tweet, Int, NotUsed] = Flow[Tweet].map(_ => 1)

val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

val counterGraph: RunnableGraph[Future[Int]] =
  tweets
    .via(count)
    .toMat(sumSink)(Keep.right)

val sum: Future[Int] = counterGraph.run()

sum.foreach(c => println(s"Total tweets processed: $c"))

最初に、 流れ込むツイートを整数値の 1 へ変換する、再利用可能な Flow を用意します。そして Sink.fold を使ってストリーム上の全ての Int 要素を足し合わせ、結果を Future[Int] として利用するシンクを作ります。次に tweets ストリームを viacount と接続します。最後に toMat を使い、Flowを先ほど用意したシンクへと接続するのです。

Source[+Out, +Mat]Flow[-In, +Out, +Mat]Sink[-In, +Mat] に出てくる不可思議な型パラメータ Mat を覚えていますか?これはマテリアライズされた際に処理モジュールが返却する値の型を表します。モジュール同士を繋げるとき、それらのマテリアライズされた値も明示的に結合されることとなります。先の例では Keep.right という定義済みの関数を使っていますが、これは右側に追加されているステージの持つマテリアライズされた型のみ注目することを示します。sumSink のマテリアライズされた型は Future[Int] であり、また Keep.right を使うことにより、結果となる RunnableGraph もまた型パラメータ Future[Int] を持つことになります。

このステップではまだプロセスパイプラインをマテリアライズ しません。ここでやっていることは単純にFlowの説明を準備し、それをシンクと接続し、RunnableGraph[Future[Int]] という名前が示す通り run() できる状態にしただけです。次に、Flowをマテリアライズし実行するために、暗黙的な ActorMaterializer を使って run() を呼びだします。RunnableGraph[T]run() メソッドを呼び出した際の戻り値は T 型となります。この例では型は Future[Int] であり、完了時に tweets ストリームの長さを返します。ストリーム処理が失敗した場合、このFutureはFailureをもって完了します。

RunnableGraph はストリームの「青写真」であるため、再利用して何度もマテリアライズすることが可能です。これが何を意味するかといいますと、ストリームをマテリアライズする際、例えば1分以内のツイートのライブストリームを消費するケースで考えてみますと、この例で示すような2つのマテリアライズされた値は、互いに異なっていることがあります。

val sumSink = Sink.fold[Int, Int](0)(_ + _)
val counterRunnableGraph: RunnableGraph[Future[Int]] =
  tweetsInMinuteFromNow
    .filter(_.hashtags contains akka)
    .map(t => 1)
    .toMat(sumSink)(Keep.right)

// materialize the stream once in the morning
val morningTweetsCount: Future[Int] = counterRunnableGraph.run()
// and once in the evening, reusing the flow
val eveningTweetsCount: Future[Int] = counterRunnableGraph.run()

Akka Streamには、マテリアライズされた値を提供するための仕組みが多数備わっています。それらを使って計算結果を取得したり、他の仕組みと組み合わせることもできます。詳細については Stream Materialization にて説明いたします。このセクションを振り返ってみると、次のワンライナーを実行した時に背後で何が起きるかをもう理解できると思います。これは、上でお見せした複数行バージョンと同じものとなります。

val sum: Future[Int] = tweets.map(t => 1).runWith(sumSink)

注釈

runWith() は便利なメソッドで、自身が追加したもの以外の他の全てのステージ上のマテリアライズされた値を自動的に無視します。先ほどの例は、マテリアライズされた値に対して Keep.right をコンビネータとして使うという意味となります。

Contents