ディスパッチャ
Akkaの MessageDispatcher
はAkkaのアクターを "作動させる(tick)" もので、マシンのエンジンです。 すべての MessageDispatcher
の実装は ExecutionContext
です。これは、例えば Futures などの任意のコードを実行するために使用できることを意味します。
デフォルトディスパッチャ
ActorSystem
には、 Actor
に個別の設定がされていない場合に適用されるデフォルトのディスパッチャがあります。 デフォルトのディスパッチャは設定することができ、デフォルトは default-executor
を指定して Dispatcher
を実行します。 渡された ExecutionContext を使用して ActorSystem が作られると、この ExecutionContext は ActorSystem 内のすべてのディスパッチャのデフォルトエグゼキュータとして使用されます。 ExecutionContext が指定されていない場合、 akka.actor.default-dispatcher.default-executor.fallback
で指定された エグゼキュータにフォールバックします。 デフォルトは「fork-join-executor」で、ほとんどの場合、優れたパフォーマンスが得られます。
ディスパッチャをルックアップする
ディスパッチャは ExecutionContext
インタフェースを実装しているので、 Future
の呼び出しなどを実行するために使用できます。
// for use with Futures, Scheduler, etc.
implicit val executionContext = system.dispatchers.lookup("my-dispatcher")
アクターのためのディスパッチャの設定
Actor
にデフォルトとは別のディスパッチャを適用したい場合は、まずディスパッチャに2つの設定が必要です。
my-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 2
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 2.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 10
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 100
}
注釈
parallelism-max
は、ForkJoinPool によって割り当てられたスレッドの総数に上限を設定しないことに注意してください。これは、新しい着信タスクの制御によるレイテンシを削減するために、プールが実行している hot スレッド数を明確にする設定です。並列性の詳細については、JDKの ForkJoinPool documentation を参照してください。
"thread-pool-executor" を使用する別の例を次に示します。
my-thread-pool-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "thread-pool-executor"
# Configuration for the thread pool
thread-pool-executor {
# minimum number of threads to cap factor-based core number to
core-pool-size-min = 2
# No of core threads ... ceil(available processors * factor)
core-pool-size-factor = 2.0
# maximum number of threads to cap factor-based number to
core-pool-size-max = 10
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 100
}
注釈
スレッドプールエグゼキュータディスパッチャは java.util.concurrent.ThreadPoolExecutor
を使って実装されています。 詳しくはJDKの ThreadPoolExecutor documentation を参照してください。
その他のオプションについては、 環境設定 の default-dispatcher 節を参照してください。
次に、通常通りアクターを作成し、デプロイメント設定でディスパッチャを定義します。
import akka.actor.Props
val myActor = context.actorOf(Props[MyActor], "myactor")
akka.actor.deployment {
/myactor {
dispatcher = my-dispatcher
}
}
デプロイメント設定の代わりに、ソースコードでディスパッチャを定義することもできます。デプロイメント設定で dispatcher
を定義すると、プログラムで提供されたパラメータの代わりにこの値が使用されます。
import akka.actor.Props
val myActor =
context.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), "myactor1")
注釈
デプロイメント設定の withDispatcher
と dispatcher
プロパティで指定するディスパッチャは設定への実際のパスです。したがって、この例ではトップレベルのセクションですが、サブセクションとして使用することもでき、たとえば、 "foo.bar.my-dispatcher"
のようになります。
ディスパッチャの種類
3種類のメッセージディスパッチャがあります。
Dispatcher
アクターのセットをスレッドプールにバインドするイベントベースのディスパッチャです。ディスパッチャが指定されていない場合に使用されるデフォルトのディスパッチャです。
共有可能性: 無制限
メールボックス: 任意、アクターごとに1つ作成
ユースケース: デフォルトのディスパッチャー、隔壁
- 駆動者:
java.util.concurrent.ExecutorService
"fork-join-executor"、"thread-pool-executor" 、または
akka.dispatcher.ExecutorServiceConfigurator
のFQCNを使用する "executor" を使用して指定
- 駆動者:
PinnedDispatcher
使用する各アクターに固有のスレッドを割り当てます。すなわち、各アクターは、プール内に1つのスレッドのみを持つ独自のスレッドプールを有します。
共有可能性: なし
メールボックス: 任意、アクターごとに1つ作成
ユースケース: 隔壁
- 駆動者: 任意の
akka.dispatch.ThreadPoolExecutorConfigurator
デフォルトでは、 "thread-pool-executor"
- 駆動者: 任意の
BalancingDispatcher
エグゼキュータベースのイベント駆動型ディスパッチャで、ビジー状態のアクターからアイドル状態のアクターに作業を再分配しようとします。
すべてのアクターはメッセージを受け取る1つのメールボックスを共有します。
このディスパッチャの同じインスタンスを使用するすべてのアクターが、1つのアクターに送信されたすべてのメッセージを処理できると仮定します。すなわちアクターはアクターの1つのプールに属し、クライアントにはどのアクターインスタンスが実際に与えられたメッセージを処理するかの保証はありません。
共有可能性: 同じ種類のアクターのみ
メールボックス: 任意、すべてのアクターに対して1つ作成
ユースケース: ワークシェアリング
- 駆動者:
java.util.concurrent.ExecutorService
"fork-join-executor"、"thread-pool-executor" 、または
akka.dispatcher.ExecutorServiceConfigurator
のFQCNを使用する "executor" を使用して指定
- 駆動者:
Router Dispatcher として "BalancingDispatcher" を使用することは できません 。 (ただし、 Routees に使用することはできます)
CallingThreadDispatcher
カレントスレッドでのみで呼び出しを実行します。このディスパッチャは新しいスレッドを作成しませんが、同じアクターに対して異なるスレッドから同時に使用できます。詳細と制限については CallingThreadDispatcher を参照してください。
共有可能性: 無制限
メールボックス: 任意、スレッドごとに1つのアクタを作成します(要求に応じて)
ユースケース: テスト
駆動者: 呼び出し側スレッド(当然ですが)
その他のディスパッチャの設定例
スレッドプールサイズが固定されたディスパッチャの設定、例えばIOをブロックするアクターの場合:
blocking-io-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 32
}
throughput = 1
}
これを使用して:
val myActor =
context.actorOf(Props[MyActor].withDispatcher("blocking-io-dispatcher"), "myactor2")
PinnedDispatcher
を設定:
my-pinned-dispatcher {
executor = "thread-pool-executor"
type = PinnedDispatcher
}
これを使用して:
val myActor =
context.actorOf(Props[MyActor].withDispatcher("my-pinned-dispatcher"), "myactor3")
上記の my-thread-pool-dispatcher
の例にある thread-pool-executor
の設定は適用されません。これは、すべてのアクターが、 PinnedDispatcher
を使うときには独自のスレッドプールを持ち、そのプールにはスレッドが1つしかないからです。
コアプールタイムアウトは、 PinnedDispatcher
がアイドル状態のアクターの場合にリソースの使用を抑えるために使用されるので、 同じ スレッドが時間とともに使用されることは保証されません。同じスレッドを常に使用するには、 Thread-pool-executor.allow-core-timeout = off
を PinnedDispatcher
の設定に追加する必要があります。
Contents