メールボックス

メールボックス

Akka のメールボックスは アクター に宛てられたメッセージを保持します。通常それぞれの アクター は自分自身のメールボックスを持ちますが、 BalancingPool は例外的に単一のメールボックスのインスタンスを、全ての Routee が共有します。

メールボックスの選択

アクターのメッセージキューの型を要求する

パラメータ化した RequiresMessageQueue トレイトを継承したアクターを定義することで、特定のアクターの型に対するメッセージキューに特定の型を要求することができます。こちらがその例です。

import akka.dispatch.RequiresMessageQueue
import akka.dispatch.BoundedMessageQueueSemantics

class MyBoundedActor extends MyActor
  with RequiresMessageQueue[BoundedMessageQueueSemantics]

RequiresMessageQueue に対する型パラメータは次のようなメールボックスの設定と対応付けられます。

bounded-mailbox {
  mailbox-type = "akka.dispatch.BoundedMailbox"
  mailbox-capacity = 1000
  mailbox-push-timeout-time = 10s
}

akka.actor.mailbox.requirements {
  "akka.dispatch.BoundedMessageQueueSemantics" = bounded-mailbox
}

これで、MyBoundedActor 型のアクターを生成するといつでも、 制限付きのメールボックスを取得しようとします。もしアクターに、直接またはディスパッチャー経由で特定のメールボックスの型を使って、設定とは異なるメールボックスをデプロイ時に定義した場合は、この対応付けを上書きします。

注釈

アクターに対して作成されたメールボックスのキューの型は、トレイトの要求型に反していないかチェックされます。そのキューが要求型を実装していない場合は、アクターの生成は失敗します。

ディスパッチャーに対するメッセージキューの型を要求する

ディスパッチャーも、その上でアクターが実行する際に使用するメールボックスの型を要求するかもしれません。1つの例は複数のコンシューマに対してスレッドセーフなメッセージキューを要求する BalancingDispatcher です。そのような要求は、次のようにディスパッチャーの設定の一部の中にまとめられます。

my-dispatcher {
  mailbox-requirement = org.example.MyInterface
}

指定する要求にはメッセージキューの実装のスーパータイプとなることが保証されるクラスやインターフェース名を入れます。競合する場合 - 例えばアクターがこの要求を満たさないメールボックスの型を要求した場合 - アクターの生成は失敗します。

メールボックスの型の選択方法

アクターが生成されるとき、 ActorRefProvider が自身を実行するディスパッチャーを最初に決定します。それから次のようにメールボックスが決定されます。

  1. もしアクターのデプロイ設定のセクションに、 mailbox キーを含む場合、それがメールボックスの型を記述した設定として扱われます。

  2. もし、アクターの Props が メールボックスの選択を含む場合、 -- 例えば withMailbox がそこで呼ばれる場合 -- それがメールボックスの型を記述した設定として扱われます。

  3. もしディスパッチャーの設定に、mailbox-type キーを含む場合、同じセクションがメールボックスの型の設定として扱われます。

  4. もしアクターが上記の方法でメールボックスの型を要求した場合、要求に対する対応がメールボックスの型を決定する方法として扱われます。もしそれが失敗した場合、ディスパッチャーの要求があった場合はそれを替わりに試します。

  5. もし、ディスパッチャーが上記の方法でメールボックスの型を要求した場合、その要求に対する対応がメールボックスの型を決定する方法として扱われます。

  6. デフォルトのメールボックスである akka.actor.default-mailbox が使用されます。

デフォルトのメールボックス

メールボックスが上記の内容のように明記されていない場合はデフォルトのメールボックスが使用されます。デフォルトではそれは、 java.util.concurrent.ConcurrentLinkedQueue を基にした、制限のないメールボックスとなります。

SingleConsumerOnlyUnboundedMailbox は非常に効率の良いメールボックスであり、これをデフォルトのメールボックスとして使用することができます。ただし、 BalancingDispatcher と一緒に使うことはできません。

デフォルトのメールボックスを SingleConsumerOnlyUnboundedMailbox にする設定

akka.actor.default-mailbox {
  mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
}

どの設定がメールボックスの型に渡されるか

各メールボックスの型は MailboxType を継承したクラスによって実装され、2つのコンストラクタ引数、 ActorSystem.Settings オブジェクトと Config セクションを取ります。後者はアクターシステムの設定にある名前付きのセクションを取得してから、その Id キーをメールボックスの型の設定パスで上書きし、デフォルトのメールボックスの設定セクションを後ろに追加したものになります。

組み込みのメールボックス実装

Akka はいくつかのメールボックスの実装が付属しています

  • UnboundedMailbox (デフォルト)

    • デフォルトのメールボックス

    • java.util.concurrent.ConcurrentLinkedQueue に基づく

    • ブロッキング: しない

    • 上限: なし

    • 設定名称: "unbounded" または "akka.dispatch.UnboundedMailbox"

  • SingleConsumerOnlyUnboundedMailbox

    このキューはユースケースによってはデフォルトのものより早くなるかもしれませんし、ならないかもしれません。適切にベンチーマークをしてください!

    • 複数生産者-単一消費者 キューに基づき、 BalancingDispatcher と併用できない

    • ブロッキング: しない

    • 上限: なし

    • 設定名称: "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"

  • NonBlockingBoundedMailbox

    • 非常に効率的な 複数生産者-単一消費者 キューに基づく

    • ブロッキング: しない (あふれたメッセージは deadLetter へ捨てます)

    • 上限: あり

    • 設定名称: "akka.dispatch.NonBlockingBoundedMailbox"

  • UnboundedControlAwareMailbox

    • akka.dispatch.ControlMessage を継承した高優先度のメッセージを配送します。

    • 2 つの java.util.concurrent.ConcurrentLinkedQueue に基づく

    • ブロッキング: しない

    • 上限: なし

    • 設定名称: "akka.dispatch.UnboundedControlAwareMailbox"

  • UnboundedPriorityMailbox

    • java.util.concurrent.PriorityBlockingQueue に基づく

    • 優先度の等しいメッセージの配送順は未定義です。UnboundedStablePriorityMailbox とは対照的です。

    • ブロッキング: しない

    • 上限: なし

    • 設定名称: "akka.dispatch.UnboundedPriorityMailbox"

  • UnboundedStablePriorityMailbox

    • akka.util.PriorityQueueStabilizer にラップされている java.util.concurrent.PriorityBlockingQueue に基づく

    • 同一の優先度のメッセージに対して、FIFO の順序を維持します。 UnboundedPriorityMailbox とは対照的です。

    • ブロッキング: しない

    • 上限: なし

    • 設定名称: "akka.dispatch.UnboundedStablePriorityMailbox"

上限ありのメールボックス実装は、容量がいっぱいになり、さらに mailbox-push-timeout-time に 0 以外が設定してある場合は 送信者をブロックします。

注釈

以降のメールボックスでは mailbox-push-timeout-time に 0 のみを指定するようにするべきです。

  • BoundedMailbox
    • java.util.concurrent.LinkedBlockingQueue に基づく

    • ブロッキング: mailbox-push-timeout-time が0以外ならする、そうでない場合しない

    • 上限: あり

    • 設定名称: "bounded" または "akka.dispatch.BoundedMailbox"

  • BoundedPriorityMailbox
    • akka.util.BoundedBlockingQueue がラップしている java.util.PriorityQueue に基づく

    • 優先度の等しいメッセージの配送順は未定義です。``BoundedStablePriorityMailbox``とは対照的です。

    • ブロッキング: mailbox-push-timeout-time が0以外ならする、そうでない場合しない

    • 上限: あり

    • 設定名称: "akka.dispatch.BoundedPriorityMailbox"

  • BoundedStablePriorityMailbox
    • akka.util.PriorityQueueStabilizerakka.util.BoundedBlockingQueue がラップしている java.util.PriorityQueue に基づく

    • 同一優先度のメッセージは FIFO の順序になるように維持します。 BoundedPriorityMailbox とは対照的です。

    • ブロッキング: mailbox-push-timeout-time が0以外ならする、そうでない場合しない

    • 上限: あり

    • 設定名称: "akka.dispatch.BoundedStablePriorityMailbox"

  • BoundedControlAwareMailbox
    • akka.dispatch.ControlMessage を継承した高優先度のメッセージを配送します。

    • 2つの java.util.concurrent.ConcurrentLinkedQueue に基づき、もし容量が上限に達している場合はエンキューをブロックします。

    • ブロッキング: mailbox-push-timeout-time が0以外ならする、そうでない場合しない

    • 上限: あり

    • 設定名称: "akka.dispatch.BoundedControlAwareMailbox"

メールボックスの設定例

PriorityMailbox

PriorityMailbox の作成方法

import akka.dispatch.PriorityGenerator
import akka.dispatch.UnboundedStablePriorityMailbox
import com.typesafe.config.Config

// We inherit, in this case, from UnboundedStablePriorityMailbox
// and seed it with the priority generator
class MyPrioMailbox(settings: ActorSystem.Settings, config: Config)
  extends UnboundedStablePriorityMailbox(
    // Create a new PriorityGenerator, lower prio means more important
    PriorityGenerator {
      // 'highpriority messages should be treated first if possible
      case 'highpriority => 0

      // 'lowpriority messages should be treated last if possible
      case 'lowpriority  => 2

      // PoisonPill when no other left
      case PoisonPill    => 3

      // We default to 1, which is in between high and low
      case otherwise     => 1
    })

設定にメールボックスの追加を行います

prio-dispatcher {
  mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"
  //Other dispatcher configuration goes here
}

メールボックスの使用例です

// We create a new Actor that just prints out what it processes
class Logger extends Actor {
  val log: LoggingAdapter = Logging(context.system, this)

  self ! 'lowpriority
  self ! 'lowpriority
  self ! 'highpriority
  self ! 'pigdog
  self ! 'pigdog2
  self ! 'pigdog3
  self ! 'highpriority
  self ! PoisonPill

  def receive = {
    case x => log.info(x.toString)
  }
}
val a = system.actorOf(Props(classOf[Logger], this).withDispatcher(
  "prio-dispatcher"))

/*
 * Logs:
 * 'highpriority
 * 'highpriority
 * 'pigdog
 * 'pigdog2
 * 'pigdog3
 * 'lowpriority
 * 'lowpriority
 */

このように直接メールボックスの型を設定することも可能です

prio-mailbox {
  mailbox-type = "docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"
  //Other mailbox configuration goes here
}

akka.actor.deployment {
  /priomailboxactor {
    mailbox = prio-mailbox
  }
}

このようにデプロイメントからでも使うことができます

import akka.actor.Props
val myActor = context.actorOf(Props[MyActor], "priomailboxactor")

もしくは次のようなコードでもよいです

import akka.actor.Props
val myActor = context.actorOf(Props[MyActor].withMailbox("prio-mailbox"))

ControlAwareMailbox

ControlAwareMailbox は メールボックス内にすでにいくつかのメッセージがある場合でも、コントロールメッセージを即座に受信したい場合に非常に便利です。

このように設定します

control-aware-dispatcher {
  mailbox-type = "akka.dispatch.UnboundedControlAwareMailbox"
  //Other dispatcher configuration goes here
}

コントロールメッセージは、 ControlMessage トレイトを継承する必要があります。

import akka.dispatch.ControlMessage

case object MyControlMessage extends ControlMessage

メールボックスの使用例です

// We create a new Actor that just prints out what it processes
class Logger extends Actor {
  val log: LoggingAdapter = Logging(context.system, this)

  self ! 'foo
  self ! 'bar
  self ! MyControlMessage
  self ! PoisonPill

  def receive = {
    case x => log.info(x.toString)
  }
}
val a = system.actorOf(Props(classOf[Logger], this).withDispatcher(
  "control-aware-dispatcher"))

/*
 * Logs:
 * MyControlMessage
 * 'foo
 * 'bar
 */

メールボックスの型を自作する

次の実装例自体には大した意味はありません。

import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.dispatch.Envelope
import akka.dispatch.MailboxType
import akka.dispatch.MessageQueue
import akka.dispatch.ProducesMessageQueue
import com.typesafe.config.Config
import java.util.concurrent.ConcurrentLinkedQueue
import scala.Option

// Marker trait used for mailbox requirements mapping
trait MyUnboundedMessageQueueSemantics

object MyUnboundedMailbox {
  // This is the MessageQueue implementation
  class MyMessageQueue extends MessageQueue
    with MyUnboundedMessageQueueSemantics {

    private final val queue = new ConcurrentLinkedQueue[Envelope]()

    // these should be implemented; queue used as example
    def enqueue(receiver: ActorRef, handle: Envelope): Unit =
      queue.offer(handle)
    def dequeue(): Envelope = queue.poll()
    def numberOfMessages: Int = queue.size
    def hasMessages: Boolean = !queue.isEmpty
    def cleanUp(owner: ActorRef, deadLetters: MessageQueue) {
      while (hasMessages) {
        deadLetters.enqueue(owner, dequeue())
      }
    }
  }
}

// This is the Mailbox implementation
class MyUnboundedMailbox extends MailboxType
  with ProducesMessageQueue[MyUnboundedMailbox.MyMessageQueue] {

  import MyUnboundedMailbox._

  // This constructor signature must exist, it will be called by Akka
  def this(settings: ActorSystem.Settings, config: Config) = {
    // put your initialization code here
    this()
  }

  // The create method is called to create the MessageQueue
  final override def create(
    owner:  Option[ActorRef],
    system: Option[ActorSystem]): MessageQueue =
    new MyMessageQueue()
}

次に、ディスパッチャーの設定かメールボックスの設定に、自作のメールボックス型の FQCN を "mailbox-type" の値として明記します。

注釈

akka.actor.ActorSystem.Settingscom.typesafe.config.Config を引数に取るコンストラクタを必ず作成してください。このコンストラクタは自作のメールボックスを構築するためにリフレクションから実行されます。第二引数に渡されるコンフィグは、このメールボックス型を使用するように設定されたディスパッチャーかメールボックスを記述した設定ファイルのセクションになります。メールボックス型は各ディスパッチャーかメールボックスの設定が使用される度に一度だけインスタンス化されます。

このようにディスパッチャー上の要求としてメールボックスを使用することもできます。

custom-dispatcher {
  mailbox-requirement =
  "docs.dispatcher.MyUnboundedJMessageQueueSemantics"
}

akka.actor.mailbox.requirements {
  "docs.dispatcher.MyUnboundedJMessageQueueSemantics" =
  custom-dispatcher-mailbox
}

custom-dispatcher-mailbox {
  mailbox-type = "docs.dispatcher.MyUnboundedJMailbox"
}

または、このように自身のアクタークラス上に要求を定義することもできます。

class MySpecialActor extends Actor
  with RequiresMessageQueue[MyUnboundedMessageQueueSemantics] {
  // ...
}

system.actorOf の特別な意味

system.actorOfActorRef 型を返却している時に、同期性とノンブロッキングを両立させるため、(そして 参照の返却を完全に機能させるという意味を持たせるため) 、この場合に特別な処理を行っています。この背後では、中身の無いアクターの参照が構築が行われ、その参照は、実際にアクターとコンテキストを生成しそれを参照の内部に設定するシステムガーディアンに送られています。それが起きるまでに :class:`ActorRef に送信されたメッセージは、一旦ローカルに蓄積されます。そして本当の内容への交換が起きたときにだけ、本当のメールボックスへ転送されます。

val props: Props = ...
// this actor uses MyCustomMailbox, which is assumed to be a singleton
system.actorOf(props.withDispatcher("myCustomMailbox")) ! "bang"
assert(MyCustomMailbox.instance.getLastEnqueuedMessage == "bang")

これはおそらく失敗します。TestKit.awaitCond を使って、いくらかの時間が経過してから再度試行する必要があります。

Contents