メールボックス
Akka のメールボックスは アクター
に宛てられたメッセージを保持します。通常それぞれの アクター
は自分自身のメールボックスを持ちますが、 BalancingPool
は例外的に単一のメールボックスのインスタンスを、全ての Routee が共有します。
メールボックスの選択
アクターのメッセージキューの型を要求する
パラメータ化した RequiresMessageQueue
トレイトを継承したアクターを定義することで、特定のアクターの型に対するメッセージキューに特定の型を要求することができます。こちらがその例です。
import akka.dispatch.RequiresMessageQueue
import akka.dispatch.BoundedMessageQueueSemantics
class MyBoundedActor extends MyActor
with RequiresMessageQueue[BoundedMessageQueueSemantics]
RequiresMessageQueue
に対する型パラメータは次のようなメールボックスの設定と対応付けられます。
bounded-mailbox {
mailbox-type = "akka.dispatch.BoundedMailbox"
mailbox-capacity = 1000
mailbox-push-timeout-time = 10s
}
akka.actor.mailbox.requirements {
"akka.dispatch.BoundedMessageQueueSemantics" = bounded-mailbox
}
これで、MyBoundedActor
型のアクターを生成するといつでも、 制限付きのメールボックスを取得しようとします。もしアクターに、直接またはディスパッチャー経由で特定のメールボックスの型を使って、設定とは異なるメールボックスをデプロイ時に定義した場合は、この対応付けを上書きします。
注釈
アクターに対して作成されたメールボックスのキューの型は、トレイトの要求型に反していないかチェックされます。そのキューが要求型を実装していない場合は、アクターの生成は失敗します。
ディスパッチャーに対するメッセージキューの型を要求する
ディスパッチャーも、その上でアクターが実行する際に使用するメールボックスの型を要求するかもしれません。1つの例は複数のコンシューマに対してスレッドセーフなメッセージキューを要求する BalancingDispatcher です。そのような要求は、次のようにディスパッチャーの設定の一部の中にまとめられます。
my-dispatcher {
mailbox-requirement = org.example.MyInterface
}
指定する要求にはメッセージキューの実装のスーパータイプとなることが保証されるクラスやインターフェース名を入れます。競合する場合 - 例えばアクターがこの要求を満たさないメールボックスの型を要求した場合 - アクターの生成は失敗します。
メールボックスの型の選択方法
アクターが生成されるとき、 ActorRefProvider
が自身を実行するディスパッチャーを最初に決定します。それから次のようにメールボックスが決定されます。
もしアクターのデプロイ設定のセクションに、
mailbox
キーを含む場合、それがメールボックスの型を記述した設定として扱われます。もし、アクターの
Props
が メールボックスの選択を含む場合、 -- 例えばwithMailbox
がそこで呼ばれる場合 -- それがメールボックスの型を記述した設定として扱われます。もしディスパッチャーの設定に、
mailbox-type
キーを含む場合、同じセクションがメールボックスの型の設定として扱われます。もしアクターが上記の方法でメールボックスの型を要求した場合、要求に対する対応がメールボックスの型を決定する方法として扱われます。もしそれが失敗した場合、ディスパッチャーの要求があった場合はそれを替わりに試します。
もし、ディスパッチャーが上記の方法でメールボックスの型を要求した場合、その要求に対する対応がメールボックスの型を決定する方法として扱われます。
デフォルトのメールボックスである
akka.actor.default-mailbox
が使用されます。
デフォルトのメールボックス
メールボックスが上記の内容のように明記されていない場合はデフォルトのメールボックスが使用されます。デフォルトではそれは、 java.util.concurrent.ConcurrentLinkedQueue
を基にした、制限のないメールボックスとなります。
SingleConsumerOnlyUnboundedMailbox
は非常に効率の良いメールボックスであり、これをデフォルトのメールボックスとして使用することができます。ただし、 BalancingDispatcher と一緒に使うことはできません。
デフォルトのメールボックスを SingleConsumerOnlyUnboundedMailbox
にする設定
akka.actor.default-mailbox {
mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
}
どの設定がメールボックスの型に渡されるか
各メールボックスの型は MailboxType
を継承したクラスによって実装され、2つのコンストラクタ引数、 ActorSystem.Settings
オブジェクトと Config
セクションを取ります。後者はアクターシステムの設定にある名前付きのセクションを取得してから、その Id
キーをメールボックスの型の設定パスで上書きし、デフォルトのメールボックスの設定セクションを後ろに追加したものになります。
組み込みのメールボックス実装
Akka はいくつかのメールボックスの実装が付属しています
UnboundedMailbox (デフォルト)
デフォルトのメールボックス
java.util.concurrent.ConcurrentLinkedQueue
に基づくブロッキング: しない
上限: なし
設定名称:
"unbounded"
または"akka.dispatch.UnboundedMailbox"
SingleConsumerOnlyUnboundedMailbox
このキューはユースケースによってはデフォルトのものより早くなるかもしれませんし、ならないかもしれません。適切にベンチーマークをしてください!
複数生産者-単一消費者 キューに基づき、
BalancingDispatcher
と併用できないブロッキング: しない
上限: なし
設定名称:
"akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
NonBlockingBoundedMailbox
非常に効率的な 複数生産者-単一消費者 キューに基づく
ブロッキング: しない (あふれたメッセージは deadLetter へ捨てます)
上限: あり
設定名称:
"akka.dispatch.NonBlockingBoundedMailbox"
UnboundedControlAwareMailbox
akka.dispatch.ControlMessage
を継承した高優先度のメッセージを配送します。2 つの
java.util.concurrent.ConcurrentLinkedQueue
に基づくブロッキング: しない
上限: なし
設定名称: "akka.dispatch.UnboundedControlAwareMailbox"
UnboundedPriorityMailbox
java.util.concurrent.PriorityBlockingQueue
に基づく優先度の等しいメッセージの配送順は未定義です。UnboundedStablePriorityMailbox とは対照的です。
ブロッキング: しない
上限: なし
設定名称: "akka.dispatch.UnboundedPriorityMailbox"
UnboundedStablePriorityMailbox
akka.util.PriorityQueueStabilizer
にラップされているjava.util.concurrent.PriorityBlockingQueue
に基づく同一の優先度のメッセージに対して、FIFO の順序を維持します。 UnboundedPriorityMailbox とは対照的です。
ブロッキング: しない
上限: なし
設定名称: "akka.dispatch.UnboundedStablePriorityMailbox"
上限ありのメールボックス実装は、容量がいっぱいになり、さらに mailbox-push-timeout-time
に 0 以外が設定してある場合は 送信者をブロックします。
注釈
以降のメールボックスでは mailbox-push-timeout-time
に 0 のみを指定するようにするべきです。
- BoundedMailbox
java.util.concurrent.LinkedBlockingQueue
に基づくブロッキング:
mailbox-push-timeout-time
が0以外ならする、そうでない場合しない上限: あり
設定名称: "bounded" または "akka.dispatch.BoundedMailbox"
- BoundedPriorityMailbox
akka.util.BoundedBlockingQueue
がラップしているjava.util.PriorityQueue
に基づく優先度の等しいメッセージの配送順は未定義です。``BoundedStablePriorityMailbox``とは対照的です。
ブロッキング:
mailbox-push-timeout-time
が0以外ならする、そうでない場合しない上限: あり
設定名称:
"akka.dispatch.BoundedPriorityMailbox"
- BoundedStablePriorityMailbox
akka.util.PriorityQueueStabilizer
とakka.util.BoundedBlockingQueue
がラップしているjava.util.PriorityQueue
に基づく同一優先度のメッセージは FIFO の順序になるように維持します。 BoundedPriorityMailbox とは対照的です。
ブロッキング:
mailbox-push-timeout-time
が0以外ならする、そうでない場合しない上限: あり
設定名称: "akka.dispatch.BoundedStablePriorityMailbox"
- BoundedControlAwareMailbox
akka.dispatch.ControlMessage
を継承した高優先度のメッセージを配送します。2つの
java.util.concurrent.ConcurrentLinkedQueue
に基づき、もし容量が上限に達している場合はエンキューをブロックします。ブロッキング:
mailbox-push-timeout-time
が0以外ならする、そうでない場合しない上限: あり
設定名称: "akka.dispatch.BoundedControlAwareMailbox"
メールボックスの設定例
PriorityMailbox
PriorityMailbox の作成方法
import akka.dispatch.PriorityGenerator
import akka.dispatch.UnboundedStablePriorityMailbox
import com.typesafe.config.Config
// We inherit, in this case, from UnboundedStablePriorityMailbox
// and seed it with the priority generator
class MyPrioMailbox(settings: ActorSystem.Settings, config: Config)
extends UnboundedStablePriorityMailbox(
// Create a new PriorityGenerator, lower prio means more important
PriorityGenerator {
// 'highpriority messages should be treated first if possible
case 'highpriority => 0
// 'lowpriority messages should be treated last if possible
case 'lowpriority => 2
// PoisonPill when no other left
case PoisonPill => 3
// We default to 1, which is in between high and low
case otherwise => 1
})
設定にメールボックスの追加を行います
prio-dispatcher {
mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"
//Other dispatcher configuration goes here
}
メールボックスの使用例です
// We create a new Actor that just prints out what it processes
class Logger extends Actor {
val log: LoggingAdapter = Logging(context.system, this)
self ! 'lowpriority
self ! 'lowpriority
self ! 'highpriority
self ! 'pigdog
self ! 'pigdog2
self ! 'pigdog3
self ! 'highpriority
self ! PoisonPill
def receive = {
case x => log.info(x.toString)
}
}
val a = system.actorOf(Props(classOf[Logger], this).withDispatcher(
"prio-dispatcher"))
/*
* Logs:
* 'highpriority
* 'highpriority
* 'pigdog
* 'pigdog2
* 'pigdog3
* 'lowpriority
* 'lowpriority
*/
このように直接メールボックスの型を設定することも可能です
prio-mailbox {
mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"
//Other mailbox configuration goes here
}
akka.actor.deployment {
/priomailboxactor {
mailbox = prio-mailbox
}
}
このようにデプロイメントからでも使うことができます
import akka.actor.Props
val myActor = context.actorOf(Props[MyActor], "priomailboxactor")
もしくは次のようなコードでもよいです
import akka.actor.Props
val myActor = context.actorOf(Props[MyActor].withMailbox("prio-mailbox"))
ControlAwareMailbox
ControlAwareMailbox
は メールボックス内にすでにいくつかのメッセージがある場合でも、コントロールメッセージを即座に受信したい場合に非常に便利です。
このように設定します
control-aware-dispatcher {
mailbox-type = "akka.dispatch.UnboundedControlAwareMailbox"
//Other dispatcher configuration goes here
}
コントロールメッセージは、 ControlMessage
トレイトを継承する必要があります。
import akka.dispatch.ControlMessage
case object MyControlMessage extends ControlMessage
メールボックスの使用例です
// We create a new Actor that just prints out what it processes
class Logger extends Actor {
val log: LoggingAdapter = Logging(context.system, this)
self ! 'foo
self ! 'bar
self ! MyControlMessage
self ! PoisonPill
def receive = {
case x => log.info(x.toString)
}
}
val a = system.actorOf(Props(classOf[Logger], this).withDispatcher(
"control-aware-dispatcher"))
/*
* Logs:
* MyControlMessage
* 'foo
* 'bar
*/
メールボックスの型を自作する
次の実装例自体には大した意味はありません。
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.dispatch.Envelope
import akka.dispatch.MailboxType
import akka.dispatch.MessageQueue
import akka.dispatch.ProducesMessageQueue
import com.typesafe.config.Config
import java.util.concurrent.ConcurrentLinkedQueue
import scala.Option
// Marker trait used for mailbox requirements mapping
trait MyUnboundedMessageQueueSemantics
object MyUnboundedMailbox {
// This is the MessageQueue implementation
class MyMessageQueue extends MessageQueue
with MyUnboundedMessageQueueSemantics {
private final val queue = new ConcurrentLinkedQueue[Envelope]()
// these should be implemented; queue used as example
def enqueue(receiver: ActorRef, handle: Envelope): Unit =
queue.offer(handle)
def dequeue(): Envelope = queue.poll()
def numberOfMessages: Int = queue.size
def hasMessages: Boolean = !queue.isEmpty
def cleanUp(owner: ActorRef, deadLetters: MessageQueue) {
while (hasMessages) {
deadLetters.enqueue(owner, dequeue())
}
}
}
}
// This is the Mailbox implementation
class MyUnboundedMailbox extends MailboxType
with ProducesMessageQueue[MyUnboundedMailbox.MyMessageQueue] {
import MyUnboundedMailbox._
// This constructor signature must exist, it will be called by Akka
def this(settings: ActorSystem.Settings, config: Config) = {
// put your initialization code here
this()
}
// The create method is called to create the MessageQueue
final override def create(
owner: Option[ActorRef],
system: Option[ActorSystem]): MessageQueue =
new MyMessageQueue()
}
次に、ディスパッチャーの設定かメールボックスの設定に、自作のメールボックス型の FQCN を "mailbox-type" の値として明記します。
注釈
akka.actor.ActorSystem.Settings
と com.typesafe.config.Config
を引数に取るコンストラクタを必ず作成してください。このコンストラクタは自作のメールボックスを構築するためにリフレクションから実行されます。第二引数に渡されるコンフィグは、このメールボックス型を使用するように設定されたディスパッチャーかメールボックスを記述した設定ファイルのセクションになります。メールボックス型は各ディスパッチャーかメールボックスの設定が使用される度に一度だけインスタンス化されます。
このようにディスパッチャー上の要求としてメールボックスを使用することもできます。
custom-dispatcher {
mailbox-requirement =
"docs.dispatcher.MyUnboundedJMessageQueueSemantics"
}
akka.actor.mailbox.requirements {
"docs.dispatcher.MyUnboundedJMessageQueueSemantics" =
custom-dispatcher-mailbox
}
custom-dispatcher-mailbox {
mailbox-type = "docs.dispatcher.MyUnboundedJMailbox"
}
または、このように自身のアクタークラス上に要求を定義することもできます。
class MySpecialActor extends Actor
with RequiresMessageQueue[MyUnboundedMessageQueueSemantics] {
// ...
}
system.actorOf
の特別な意味
system.actorOf
は ActorRef 型を返却している時に、同期性とノンブロッキングを両立させるため、(そして 参照の返却を完全に機能させるという意味を持たせるため) 、この場合に特別な処理を行っています。この背後では、中身の無いアクターの参照が構築が行われ、その参照は、実際にアクターとコンテキストを生成しそれを参照の内部に設定するシステムガーディアンに送られています。それが起きるまでに :class:`ActorRef
に送信されたメッセージは、一旦ローカルに蓄積されます。そして本当の内容への交換が起きたときにだけ、本当のメールボックスへ転送されます。
val props: Props = ...
// this actor uses MyCustomMailbox, which is assumed to be a singleton
system.actorOf(props.withDispatcher("myCustomMailbox")) ! "bang"
assert(MyCustomMailbox.instance.getLastEnqueuedMessage == "bang")
これはおそらく失敗します。TestKit.awaitCond
を使って、いくらかの時間が経過してから再度試行する必要があります。
Contents