ディスパッチャ

ディスパッチャ

Akkaの MessageDispatcher はAkkaのアクターを "作動させる(tick)" もので、マシンのエンジンです。 すべての MessageDispatcher の実装は ExecutionContext です。これは、例えば Futures などの任意のコードを実行するために使用できることを意味します。

デフォルトディスパッチャ

ActorSystem には、 Actor に個別の設定がされていない場合に適用されるデフォルトのディスパッチャがあります。 デフォルトのディスパッチャは設定することができ、デフォルトは default-executor を指定して Dispatcher を実行します。 渡された ExecutionContext を使用して ActorSystem が作られると、この ExecutionContext は ActorSystem 内のすべてのディスパッチャのデフォルトエグゼキュータとして使用されます。 ExecutionContext が指定されていない場合、 akka.actor.default-dispatcher.default-executor.fallback で指定された エグゼキュータにフォールバックします。 デフォルトは「fork-join-executor」で、ほとんどの場合、優れたパフォーマンスが得られます。

ディスパッチャをルックアップする

ディスパッチャは ExecutionContext インタフェースを実装しているので、 Future の呼び出しなどを実行するために使用できます。

// for use with Futures, Scheduler, etc.
implicit val executionContext = system.dispatchers.lookup("my-dispatcher")

アクターのためのディスパッチャの設定

Actor にデフォルトとは別のディスパッチャを適用したい場合は、まずディスパッチャに2つの設定が必要です。

my-dispatcher {
  # Dispatcher is the name of the event-based dispatcher
  type = Dispatcher
  # What kind of ExecutionService to use
  executor = "fork-join-executor"
  # Configuration for the fork join pool
  fork-join-executor {
    # Min number of threads to cap factor-based parallelism number to
    parallelism-min = 2
    # Parallelism (threads) ... ceil(available processors * factor)
    parallelism-factor = 2.0
    # Max number of threads to cap factor-based parallelism number to
    parallelism-max = 10
  }
  # Throughput defines the maximum number of messages to be
  # processed per actor before the thread jumps to the next actor.
  # Set to 1 for as fair as possible.
  throughput = 100
}

注釈

parallelism-max は、ForkJoinPool によって割り当てられたスレッドの総数に上限を設定しないことに注意してください。これは、新しい着信タスクの制御によるレイテンシを削減するために、プールが実行している hot スレッド数を明確にする設定です。並列性の詳細については、JDKの ForkJoinPool documentation を参照してください。

"thread-pool-executor" を使用する別の例を次に示します。

my-thread-pool-dispatcher {
  # Dispatcher is the name of the event-based dispatcher
  type = Dispatcher
  # What kind of ExecutionService to use
  executor = "thread-pool-executor"
  # Configuration for the thread pool
  thread-pool-executor {
    # minimum number of threads to cap factor-based core number to
    core-pool-size-min = 2
    # No of core threads ... ceil(available processors * factor)
    core-pool-size-factor = 2.0
    # maximum number of threads to cap factor-based number to
    core-pool-size-max = 10
  }
  # Throughput defines the maximum number of messages to be
  # processed per actor before the thread jumps to the next actor.
  # Set to 1 for as fair as possible.
  throughput = 100
}

注釈

スレッドプールエグゼキュータディスパッチャは java.util.concurrent.ThreadPoolExecutor を使って実装されています。 詳しくはJDKの ThreadPoolExecutor documentation を参照してください。

その他のオプションについては、 環境設定 の default-dispatcher 節を参照してください。

次に、通常通りアクターを作成し、デプロイメント設定でディスパッチャを定義します。

import akka.actor.Props
val myActor = context.actorOf(Props[MyActor], "myactor")
akka.actor.deployment {
  /myactor {
    dispatcher = my-dispatcher
  }
}

デプロイメント設定の代わりに、ソースコードでディスパッチャを定義することもできます。デプロイメント設定で dispatcher を定義すると、プログラムで提供されたパラメータの代わりにこの値が使用されます。

import akka.actor.Props
val myActor =
  context.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), "myactor1")

注釈

デプロイメント設定の withDispatcherdispatcher プロパティで指定するディスパッチャは設定への実際のパスです。したがって、この例ではトップレベルのセクションですが、サブセクションとして使用することもでき、たとえば、 "foo.bar.my-dispatcher" のようになります。

ディスパッチャの種類

3種類のメッセージディスパッチャがあります。

  • Dispatcher

    • アクターのセットをスレッドプールにバインドするイベントベースのディスパッチャです。ディスパッチャが指定されていない場合に使用されるデフォルトのディスパッチャです。

    • 共有可能性: 無制限

    • メールボックス: 任意、アクターごとに1つ作成

    • ユースケース: デフォルトのディスパッチャー、隔壁

    • 駆動者: java.util.concurrent.ExecutorService

      "fork-join-executor"、"thread-pool-executor" 、または akka.dispatcher.ExecutorServiceConfigurator のFQCNを使用する "executor" を使用して指定

  • PinnedDispatcher

    • 使用する各アクターに固有のスレッドを割り当てます。すなわち、各アクターは、プール内に1つのスレッドのみを持つ独自のスレッドプールを有します。

    • 共有可能性: なし

    • メールボックス: 任意、アクターごとに1つ作成

    • ユースケース: 隔壁

    • 駆動者: 任意の akka.dispatch.ThreadPoolExecutorConfigurator

      デフォルトでは、 "thread-pool-executor"

  • BalancingDispatcher

    • エグゼキュータベースのイベント駆動型ディスパッチャで、ビジー状態のアクターからアイドル状態のアクターに作業を再分配しようとします。

    • すべてのアクターはメッセージを受け取る1つのメールボックスを共有します。

    • このディスパッチャの同じインスタンスを使用するすべてのアクターが、1つのアクターに送信されたすべてのメッセージを処理できると仮定します。すなわちアクターはアクターの1つのプールに属し、クライアントにはどのアクターインスタンスが実際に与えられたメッセージを処理するかの保証はありません。

    • 共有可能性: 同じ種類のアクターのみ

    • メールボックス: 任意、すべてのアクターに対して1つ作成

    • ユースケース: ワークシェアリング

    • 駆動者: java.util.concurrent.ExecutorService

      "fork-join-executor"、"thread-pool-executor" 、または akka.dispatcher.ExecutorServiceConfigurator のFQCNを使用する "executor" を使用して指定

    • Router Dispatcher として "BalancingDispatcher" を使用することは できません 。 (ただし、 Routees に使用することはできます)

  • CallingThreadDispatcher

    • カレントスレッドでのみで呼び出しを実行します。このディスパッチャは新しいスレッドを作成しませんが、同じアクターに対して異なるスレッドから同時に使用できます。詳細と制限については CallingThreadDispatcher を参照してください。

    • 共有可能性: 無制限

    • メールボックス: 任意、スレッドごとに1つのアクタを作成します(要求に応じて)

    • ユースケース: テスト

    • 駆動者: 呼び出し側スレッド(当然ですが)

その他のディスパッチャの設定例

スレッドプールサイズが固定されたディスパッチャの設定、例えばIOをブロックするアクターの場合:

blocking-io-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 32
  }
  throughput = 1
}

これを使用して:

val myActor =
  context.actorOf(Props[MyActor].withDispatcher("blocking-io-dispatcher"), "myactor2")

PinnedDispatcher を設定:

my-pinned-dispatcher {
  executor = "thread-pool-executor"
  type = PinnedDispatcher
}

これを使用して:

val myActor =
  context.actorOf(Props[MyActor].withDispatcher("my-pinned-dispatcher"), "myactor3")

上記の my-thread-pool-dispatcher の例にある thread-pool-executor の設定は適用されません。これは、すべてのアクターが、 PinnedDispatcher を使うときには独自のスレッドプールを持ち、そのプールにはスレッドが1つしかないからです。

コアプールタイムアウトは、 PinnedDispatcher がアイドル状態のアクターの場合にリソースの使用を抑えるために使用されるので、 同じ スレッドが時間とともに使用されることは保証されません。同じスレッドを常に使用するには、 Thread-pool-executor.allow-core-timeout = offPinnedDispatcher の設定に追加する必要があります。

Contents