アクター
`アクターモデル`は並行分散システムを作るための高度な抽象を提供します。アクターモデルを用いることで開発者は明示的なロックやスレッドの管理を行う労力を軽減した上で信頼性のある並行並列なシステムを作ることができます。アクターは Cal Herwitt が1973年に論文として発表したものですが、Erlangによって世に広められ、この成功例として Ericsson 社による高い並行性と信頼性を持つ電気通信システムの構築が挙げられます。
Akka のアクターは Erlang からシンタックスを拝借した Scala のアクターと似ています。
アクターの生成
注釈
Akka では全てのアクターが親となるアクターに監視され、同時に子供のアクターの supervisor になる(可能性がある)という方式をとっています。この方式については アクターシステム や スーパービジョン と モニタリング あるいは アクターの参照、パス、アドレス といった記事を参考に読んでみるとよいでしょう。
アクターのクラスを定義する
Actors in Java are implemented by extending the UntypedActor
class and implementing the
onReceive
method. This method takes the message as a parameter.
以下はこのサンプルコードです。
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
public class MyUntypedActor extends UntypedActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
public void onReceive(Object message) throws Exception {
if (message instanceof String) {
log.info("Received String message: {}", message);
getSender().tell(message, getSelf());
} else
unhandled(message);
}
}
Props
:class:`Propsはアクターを生成するときのオプションを指定するための設定クラスです。このクラスのインスタンスは不変なので気軽に共有することができるアクターを生成するためのレシピと考えることができます。また、このレシピにはデプロイに関係した情報(例えばどのdispatcherを使うかとかいったもの)も含めることができます。
import akka.actor.Props;
import akka.japi.Creator;
static class MyActorC implements Creator<MyActor> {
@Override public MyActor create() {
return new MyActor("...");
}
}
Props props1 = Props.create(MyUntypedActor.class);
Props props2 = Props.create(MyActor.class, "...");
Props props3 = Props.create(new MyActorC());
The second line shows how to pass constructor arguments to the Actor
being created. The presence of a matching constructor is verified during
construction of the Props
object, resulting in an
IllegalArgumentException
if no or multiple matching constructors are
found.
The third line demonstrates the use of a Creator
. The
creator class must be static, which is verified during Props
construction. The type parameter’s upper bound is used to determine the
produced actor class, falling back to Actor
if fully erased. An
example of a parametric factory could be:
static class ParametricCreator<T extends MyActor> implements Creator<T> {
@Override public T create() {
// ... fabricate actor here
}
}
注釈
In order for mailbox requirements—like using a deque-based mailbox for actors
using the stash—to be picked up, the actor type needs to be known before
creating it, which is what the Creator
type argument allows.
Therefore make sure to use the specific type for your actors wherever
possible.
推奨される手法
It is a good idea to provide static factory methods on the
UntypedActor
which help keeping the creation of suitable
Props
as close to the actor definition as possible. This also allows
usage of the Creator
-based methods which statically verify that the
used constructor actually exists instead relying only on a runtime check.
public class DemoActor extends UntypedActor {
/**
* Create Props for an actor of this type.
* @param magicNumber The magic number to be passed to this actor’s constructor.
* @return a Props for creating this actor, which can then be further configured
* (e.g. calling `.withDispatcher()` on it)
*/
public static Props props(final int magicNumber) {
return Props.create(new Creator<DemoActor>() {
private static final long serialVersionUID = 1L;
@Override
public DemoActor create() throws Exception {
return new DemoActor(magicNumber);
}
});
}
final int magicNumber;
public DemoActor(int magicNumber) {
this.magicNumber = magicNumber;
}
@Override
public void onReceive(Object msg) {
// some behavior here
}
}
system.actorOf(DemoActor.props(42), "demo");
Another good practice is to declare what messages an Actor can receive as close to the actor definition as possible (e.g. as static classes inside the Actor or using other suitable class), which makes it easier to know what it can receive.
public class DemoMessagesActor extends UntypedActor {
static public class Greeting {
private final String from;
public Greeting(String from) {
this.from = from;
}
public String getGreeter() {
return from;
}
}
public void onReceive(Object message) throws Exception {
if (message instanceof Greeting) {
getSender().tell("Hello " + ((Greeting) message).getGreeter(), getSelf());
} else
unhandled(message);
}
}
Propsを使ったアクターの生成
アクターは:class:`Props`のインスタンスを:class:`ActorSystem`や:class:`ActorContext`が持っている:meth:`actorOf`というファクトリメソッドに渡すことで生成できます。
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
// ActorSystem is a heavy object: create only one per application
final ActorSystem system = ActorSystem.create("MySystem");
final ActorRef myActor = system.actorOf(Props.create(MyUntypedActor.class),
"myactor");
:class:`ActorSystem`を使うとトップレベルのアクターを生成できます。トップレベルのアクターはアクターシステムが提供しているガーディアンアクターによって監視されます。一方でコンテクストから生成したアクターはそのアクターの子アクターになります。
class A extends UntypedActor {
final ActorRef child =
getContext().actorOf(Props.create(MyUntypedActor.class), "myChild");
// plus some behavior ...
}
子供から孫といった階層を作るようにしてください。そのようにすることでアプリケーションの論理的なエラー処理の構造を作ることができます。詳しくは:ref:`actor-systems`を参照してください。
:meth:`actorOf`を呼ぶことで:class:`ActorRef`のインスタンスを得ることができます。:class:`ActorRef`はアクターのインスタンスとつながっていて、アクターと対話するための唯一の手段となります。:class:`ActorRef`は不変でそれが表現しているアクターと一対一の関係を持っています。:class:`ActorRef`は永続化可能なのでネットワークを介することができます。つまり、シリアライズしたインスタンスをリモートのホストに送信した場合、元のノードのアクターをネットワークを超えて表現することができます。
The name parameter is optional, but you should preferably name your actors,
since that is used in log messages and for identifying actors. The name must
not be empty or start with $
, but it may contain URL encoded characters
(eg. %20
for a blank space). If the given name is already in use by
another child to the same parent an InvalidActorNameException is thrown.
アクターは生成されたら非同期に自動的に開始されます。
依存性の注入
If your UntypedActor has a constructor that takes parameters then those need to
be part of the Props
as well, as described above. But there
are cases when a factory method must be used, for example when the actual
constructor arguments are determined by a dependency injection framework.
import akka.actor.Actor;
import akka.actor.IndirectActorProducer;
class DependencyInjector implements IndirectActorProducer {
final Object applicationContext;
final String beanName;
public DependencyInjector(Object applicationContext, String beanName) {
this.applicationContext = applicationContext;
this.beanName = beanName;
}
@Override
public Class<? extends Actor> actorClass() {
return MyActor.class;
}
@Override
public MyActor produce() {
MyActor result;
// obtain fresh Actor instance from DI framework ...
return result;
}
}
final ActorRef myActor = getContext().actorOf(
Props.create(DependencyInjector.class, applicationContext, "MyActor"),
"myactor3");
警告
You might be tempted at times to offer an IndirectActorProducer
which always returns the same instance, e.g. by using a static field. This is
not supported, as it goes against the meaning of an actor restart, which is
described here: リスタートが意味するもの.
依存性の注入を行うフレームワークを使う場合、アクターのbeanはシングルトンスコープであることは*許可されません*。
依存性の注入のためのテクニックや依存性注入を行うフレームワークとの統合については`Using Akka with Dependency Injection <http://letitcrash.com/post/55958814293/akka-dependency-injection>`_ guidelineやLightbend Activatorの中の`Akka Java Spring <http://www.lightbend.com/activator/template/akka-java-spring>`_ tutorialにより詳しい情報があります。
Inbox
アクターの外側のコードからアクターと通信をする場合、``ask``パターン(後で出てきます)を使うのが一つの方法ですが、それを使えない場合が二つあります。一つは複数のメッセージを受け取るような場合(例えば通知サービスとして実装されている:class:`ActorRef`を購読する場合)で、もう一つはActorのライフサイクルをwatchしている場合です。こういった場合のために:class:`Inbox`というクラスが用意されています。
final Inbox inbox = Inbox.create(system);
inbox.send(target, "hello");
try {
assert inbox.receive(Duration.create(1, TimeUnit.SECONDS)).equals("world");
} catch (java.util.concurrent.TimeoutException e) {
// timeout
}
The send
method wraps a normal tell
and supplies the internal
actor’s reference as the sender. This allows the reply to be received on the
last line. Watching an actor is quite simple as well:
final Inbox inbox = Inbox.create(system);
inbox.watch(target);
target.tell(PoisonPill.getInstance(), ActorRef.noSender());
try {
assert inbox.receive(Duration.create(1, TimeUnit.SECONDS)) instanceof Terminated;
} catch (java.util.concurrent.TimeoutException e) {
// timeout
}
UntypedActor API
The UntypedActor
class defines only one abstract method, the above mentioned
onReceive(Object message)
, which implements the behavior of the actor.
If the current actor behavior does not match a received message, it's recommended that
you call the unhandled
method, which by default publishes a new
akka.actor.UnhandledMessage(message, sender, recipient)
on the actor system’s
event stream (set configuration item akka.actor.debug.unhandled
to on
to have them converted into actual Debug messages).
Actor
traitには他にも以下のようなメソッドがあります。
getSelf
reference to theActorRef
of the actorgetSender
reference sender Actor of the last received message, typically used as described in Reply to messagessupervisorStrategy
user overridable definition the strategy to use for supervising child actorsThis strategy is typically declared inside the actor in order to have access to the actor’s internal state within the decider function: since failure is communicated as a message sent to the supervisor and processed like other messages (albeit outside of the normal behavior), all values and variables within the actor are available, as is the
getSender()
reference (which will be the immediate child reporting the failure; if the original failure occurred within a distant descendant it is still reported one level up at a time).getContext
exposes contextual information for the actor and the current message, such as:子アクターを作るためのファクトリメソッド(
actorOf
)アクターが所属しているシステム
supervisorである親アクター
監視している子アクター
ライフサイクルの監視
- hotswap behavior stack as described in HotSwap
ここまででまだ残っているアクセス可能なメソッドはユーザがオーバーライドすることで以下の述べるアクターのライフサイクルにフックすることができるメソッドです。
public void preStart() {
}
public void preRestart(Throwable reason, scala.Option<Object> message) {
for (ActorRef each : getContext().getChildren()) {
getContext().unwatch(each);
getContext().stop(each);
}
postStop();
}
public void postRestart(Throwable reason) {
preStart();
}
public void postStop() {
}
The implementations shown above are the defaults provided by the UntypedActor
class.
アクターのライフサクル
アクターシステムにおけるパスは生存しているアクターによって占有されている"場所"を表現しています。始めは(システムによって初期化されたアクターを除き)パスは空になっています。 actorOf()
を呼びだすと Prop
で表現されたアクターの*インカーネーション*が与えられたパスに生成されます。アクターのインカーネーションはパスと UID によって識別されます。再起動が行われたときには Actor
のインスタンスは置き換えられますが、インカーネーションの方は置き換えられないので UID は同じものになります。
The lifecycle of an incarnation ends when the actor is stopped. At
that point the appropriate lifecycle events are called and watching actors
are notified of the termination. After the incarnation is stopped, the path can
be reused again by creating an actor with actorOf()
. In this case the
name of the new incarnation will be the same as the previous one but the
UIDs will differ. An actor can be stopped by the actor itself, another actor
or the ActorSystem
(see Stopping actors).
注釈
アクターは参照されなくなったとしても自動的に停止することはないという点は重要です。生成された全てのアクターは明示的に破棄する必要があります。ただし親のアクターを停止する場合、そのアクターが生成した全ての子供のアクターも停止されるのでこの点は単純です。
``ActorRef``はただ単に与えられたパスを表現しているのではなくいつでもインカーネーション(パスとUID)を表現しています。つまりアクターを停止して同じ名前のアクターを生成した場合、新しく生成した``ActorRef``は古いインカーネーションでなく新しいインカーネーションを指しています。
ActorSelection
on the other hand points to the path (or multiple paths
if wildcards are used) and is completely oblivious to which incarnation is currently
occupying it. ActorSelection
cannot be watched for this reason. It is
possible to resolve the current incarnation's ActorRef
living under the
path by sending an Identify
message to the ActorSelection
which
will be replied to with an ActorIdentity
containing the correct reference
(see Actor Selectionを使ったアクターの識別). This can also be done with the resolveOne
method of the ActorSelection
, which returns a Future
of the matching
ActorRef
.
ライフサイクルの監視、DeathWatch
他のアクターの停止を知るために(例えば、永久に停止された場合や一時的ではない障害によって再起動された場合など)、アクターは他のアクターが停止時に発する Terminated
メッセージを受け取るようにすることができます。(Stopping Actors`_も参照のこと)この機能はアクターシステムの :class:`DeathWatch というコンポーネントによって提供されています。
Registering a monitor is easy (see fourth line, the rest is for demonstrating the whole functionality):
import akka.actor.Terminated;
public class WatchActor extends UntypedActor {
final ActorRef child = this.getContext().actorOf(Props.empty(), "child");
{
this.getContext().watch(child); // <-- the only call needed for registration
}
ActorRef lastSender = getContext().system().deadLetters();
@Override
public void onReceive(Object message) {
if (message.equals("kill")) {
getContext().stop(child);
lastSender = getSender();
} else if (message instanceof Terminated) {
final Terminated t = (Terminated) message;
if (t.getActor() == child) {
lastSender.tell("finished", getSelf());
}
} else {
unhandled(message);
}
}
}
Terminated
メッセージは登録や停止がどのような順番で起きたかとは独立して生成されることに注意してください。典型的な例として、監視を行うアクターは例え監視の登録を行った時点ですでに監視対象のアクターが停止されていたとしても Treminated
メッセージを受け取ることになります。
監視の登録を複数回行うことが必ずしも複数のメッセージを作ることになるわけではありませんが、こうしたメッセージを正確に一度受け取ることができる保障はありません。監視対象のアクターの停止メッセージが作られてキューに入ってから、このメッセージが処理される前に他のところで登録が行われたら、二つ目のメッセージがキューに入ります。何故なら既に停止したアクターの監視を登録すると、直ちに: class:Terminated が生成されるためです。
It is also possible to deregister from watching another actor’s liveliness
using getContext().unwatch(target)
. This works even if the
Terminated
message has already been enqueued in the mailbox; after
calling unwatch
no Terminated
message for that actor will be
processed anymore.
Start Hook
アクターが正しく起動すると、 preStart
というメソッドが呼び出されます。
@Override
public void preStart() {
child = getContext().actorOf(Props.empty());
}
このメソッドはアクタがーが初めに生成されたときに呼び出されます。アクターが再起動したときには postRestart
のデフォルトの実装がこのメソッドの呼び出しを行いますが、このメソッドをオーバーライドすることによって初期化のコードを一度だけ呼び出されるようにするのか、再起動のたびに呼び出されるようにするのかを選択することができます。アクターのコンストラクタの中の初期化コードはアクターが生成された時や再起動したときに常に呼び出されます。
Restart Hook
全てのアクターは、例えば他のアクターによるエラー処理のストラテジーに紐づくことによって監視されています。アクターがメッセージを処理しているときに例外をスローした場合は再起動が行われます。(スーパービジョン と モニタリング を参照)この再起動は上記に挙げたフック処理のトリガになります。
The old actor is informed by calling
preRestart
with the exception which caused the restart and the message which triggered that exception; the latter may beNone
if the restart was not caused by processing a message, e.g. when a supervisor does not trap the exception and is restarted in turn by its supervisor, or if an actor is restarted due to a sibling’s failure. If the message is available, then that message’s sender is also accessible in the usual way (i.e. by callinggetSender()
).このメソッドは新しいアクターのインスタンスなどへの引き継ぎのためにクリーンアップを行うのにもっともよい場所です。デフォルトの実装では全ての子アクターを停止して
postStop
を呼び出すようになっています。新しいインスタンスを生成するために
actorOf
の呼び出し時から引き継いだ初期化のファクトリを利用します。新しく生成したアクターの
postRestart
に再起動の原因となった例外を受け渡して呼び出します。デフォルトの実装ではpreStart
の呼び出しを行って通常のアクターの開始と同じように振る舞います。
アクターの再起動では実際のアクターのオブジェクトが入れ替わるだけです。再起動がメールボックの内容に影響を及ぼすことはないので、メッセージの処理は :metho:`postRestart` の処理が終わった後に再開されます。例外の原因となったメッセージを再び受け取ることはありません。再起動中にアクターが受け取ったメッセージは通常メールボックスには入りません。
警告
障害の通知とユーザのメッセージの相対的な順番が決定的ではないことに注意してください。特に、親のアクターによって子アクターが障害が起きる前に受け取った最後のメッセージを処理する前に再起動してしまうことがあります。詳細については ディスカッション: メッセージの順序 を参照してください。
Stop Hook
アクターが停止した後には、そのアクターの postStop
フックが呼び出されるので、これを使って他のサービスからこのアクターを登録解除するといった処理を行うようにします。ここのフックはこのアクターに対するメッセージのキューイングが利用できなくなってから呼び出されることが保障されています。停止されたアクターに送信したメッセージは ActorSystem
の deadLetters
にリダイレクトされるようになります。
Actor Selectionを使ったアクターの識別
アクターの参照、パス、アドレス で述べたように、アクターはそれぞれ一意な論理パスを持っていて、子供から親を辿ってアクターシステムのルートにまで遡ることができることができます。また、アクターは物理的なパスも持っていますが、監視のチェーンの中にリモートのsupervisorが存在する場合には論理的なパスと異なることがあります。これらのパスはリモートからメッセージを受け取った時に受信者を検索するのに利用されたりしますが、もっと直接的な利点があります。アクターは絶対パスもしくは相対パスを--これらは論理的なものと物理的なものがあります--を使ってほかのアクターを検索し ActorSelection
を使ってその結果を受け取ることができます。
// will look up this absolute path
getContext().actorSelection("/user/serviceA/actor");
// will look up sibling beneath same supervisor
getContext().actorSelection("../joe");
注釈
通常、アクターの通信はActorSelectionに頼らずにActorRefを使う方が望ましいです。ただし、次の場合は例外です。
- sending messages using the At-Least-Once Delivery facility
リモートシステムとの始めの通信を行う場合
上記以外のケースでは、親から子にActorRefを渡したり、ActorRefへの参照を含メッセージを他のアクターに送信するなど、アクターの生成時や初期化時にActorRefを受け渡す方法があります。
The supplied path is parsed as a java.net.URI
, which basically means
that it is split on /
into path elements. If the path starts with /
, it
is absolute and the look-up starts at the root guardian (which is the parent of
"/user"
); otherwise it starts at the current actor. If a path element equals
..
, the look-up will take a step “up” towards the supervisor of the
currently traversed actor, otherwise it will step “down” to the named child.
It should be noted that the ..
in actor paths here always means the logical
structure, i.e. the supervisor.
The path elements of an actor selection may contain wildcard patterns allowing for broadcasting of messages to that section:
// will look all children to serviceB with names starting with worker
getContext().actorSelection("/user/serviceB/worker*");
// will look up all siblings beneath same supervisor
getContext().actorSelection("../*");
Messages can be sent via the ActorSelection
and the path of the
ActorSelection
is looked up when delivering each message. If the selection
does not match any actors the message will be dropped.
To acquire an ActorRef
for an ActorSelection
you need to send
a message to the selection and use the getSender
reference of the reply
from the actor. There is a built-in Identify
message that all Actors will
understand and automatically reply to with a ActorIdentity
message
containing the ActorRef
. This message is handled specially by the
actors which are traversed in the sense that if a concrete name lookup fails
(i.e. a non-wildcard path element does not correspond to a live actor) then a
negative result is generated. Please note that this does not mean that delivery
of that reply is guaranteed, it still is a normal message.
import akka.actor.ActorIdentity;
import akka.actor.ActorSelection;
import akka.actor.Identify;
public class Follower extends UntypedActor {
final String identifyId = "1";
{
ActorSelection selection =
getContext().actorSelection("/user/another");
selection.tell(new Identify(identifyId), getSelf());
}
ActorRef another;
final ActorRef probe;
public Follower(ActorRef probe) {
this.probe = probe;
}
@Override
public void onReceive(Object message) {
if (message instanceof ActorIdentity) {
ActorIdentity identity = (ActorIdentity) message;
if (identity.correlationId().equals(identifyId)) {
ActorRef ref = identity.getRef();
if (ref == null)
getContext().stop(getSelf());
else {
another = ref;
getContext().watch(another);
probe.tell(ref, getSelf());
}
}
} else if (message instanceof Terminated) {
final Terminated t = (Terminated) message;
if (t.getActor().equals(another)) {
getContext().stop(getSelf());
}
} else {
unhandled(message);
}
}
}
You can also acquire an ActorRef
for an ActorSelection
with
the resolveOne
method of the ActorSelection
. It returns a Future
of the matching ActorRef
if such an actor exists. It is completed with
failure [[akka.actor.ActorNotFound]] if no such actor exists or the identification
didn't complete within the supplied timeout.
Remote actor addresses may also be looked up, if remoting is enabled:
getContext().actorSelection("akka.tcp://app@otherhost:1234/user/serviceB");
An example demonstrating remote actor look-up is given in リモート処理のサンプル.
Messages and immutability
IMPORTANT: Messages can be any kind of object but have to be immutable. Akka can’t enforce immutability (yet) so this has to be by convention.
Here is an example of an immutable message:
public class ImmutableMessage {
private final int sequenceNumber;
private final List<String> values;
public ImmutableMessage(int sequenceNumber, List<String> values) {
this.sequenceNumber = sequenceNumber;
this.values = Collections.unmodifiableList(new ArrayList<String>(values));
}
public int getSequenceNumber() {
return sequenceNumber;
}
public List<String> getValues() {
return values;
}
}
Send messages
Messages are sent to an Actor through one of the following methods.
tell
means “fire-and-forget”, e.g. send a message asynchronously and return immediately.ask
sends a message asynchronously and returns aFuture
representing a possible reply.
Message ordering is guaranteed on a per-sender basis.
注釈
There are performance implications of using ask
since something needs to
keep track of when it times out, there needs to be something that bridges
a Promise
into an ActorRef
and it also needs to be reachable through
remoting. So always prefer tell
for performance, and only ask
if you must.
In all these methods you have the option of passing along your own ActorRef
.
Make it a practice of doing so because it will allow the receiver actors to be able to respond
to your message, since the sender reference is sent along with the message.
Tell: Fire-forget
This is the preferred way of sending messages. No blocking waiting for a message. This gives the best concurrency and scalability characteristics.
// don’t forget to think about who is the sender (2nd argument)
target.tell(message, getSelf());
The sender reference is passed along with the message and available within the
receiving actor via its getSender
method while processing this
message. Inside of an actor it is usually getSelf
who shall be the
sender, but there can be cases where replies shall be routed to some other
actor—e.g. the parent—in which the second argument to tell
would be a
different one. Outside of an actor and if no reply is needed the second
argument can be null
; if a reply is needed outside of an actor you can use
the ask-pattern described next..
Ask: Send-And-Receive-Future
The ask
pattern involves actors as well as futures, hence it is offered as
a use pattern rather than a method on ActorRef
:
import static akka.pattern.Patterns.ask;
import static akka.pattern.Patterns.pipe;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import akka.dispatch.Futures;
import akka.dispatch.Mapper;
import akka.util.Timeout;
final Timeout t = new Timeout(Duration.create(5, TimeUnit.SECONDS));
final ArrayList<Future<Object>> futures = new ArrayList<Future<Object>>();
futures.add(ask(actorA, "request", 1000)); // using 1000ms timeout
futures.add(ask(actorB, "another request", t)); // using timeout from
// above
final Future<Iterable<Object>> aggregate = Futures.sequence(futures,
system.dispatcher());
final Future<Result> transformed = aggregate.map(
new Mapper<Iterable<Object>, Result>() {
public Result apply(Iterable<Object> coll) {
final Iterator<Object> it = coll.iterator();
final String x = (String) it.next();
final String s = (String) it.next();
return new Result(x, s);
}
}, system.dispatcher());
pipe(transformed, system.dispatcher()).to(actorC);
This example demonstrates ask
together with the pipe
pattern on
futures, because this is likely to be a common combination. Please note that
all of the above is completely non-blocking and asynchronous: ask
produces
a Future
, two of which are composed into a new future using the
Futures.sequence
and map
methods and then pipe
installs
an onComplete
-handler on the future to effect the submission of the
aggregated Result
to another actor.
Using ask
will send a message to the receiving Actor as with tell
, and
the receiving actor must reply with getSender().tell(reply, getSelf())
in order to
complete the returned Future
with a value. The ask
operation
involves creating an internal actor for handling this reply, which needs to
have a timeout after which it is destroyed in order not to leak resources; see
more below.
注釈
A Java 8 variant of the ask
pattern that returns a CompletionStage
instead of a Scala Future
is available in the akka.pattern.PatternsCS
object.
警告
To complete the future with an exception you need send a Failure message to the sender. This is not done automatically when an actor throws an exception while processing a message.
try {
String result = operation();
getSender().tell(result, getSelf());
} catch (Exception e) {
getSender().tell(new akka.actor.Status.Failure(e), getSelf());
throw e;
}
If the actor does not complete the future, it will expire after the timeout period,
specified as parameter to the ask
method; this will complete the
Future
with an AskTimeoutException
.
See Futures for more information on how to await or query a future.
The onComplete
, onSuccess
, or onFailure
methods of the Future
can be
used to register a callback to get a notification when the Future completes.
Gives you a way to avoid blocking.
警告
When using future callbacks, inside actors you need to carefully avoid closing over the containing actor’s reference, i.e. do not call methods or access mutable state on the enclosing actor from within the callback. This would break the actor encapsulation and may introduce synchronization bugs and race conditions because the callback will be scheduled concurrently to the enclosing actor. Unfortunately there is not yet a way to detect these illegal accesses at compile time. See also: アクターと共有可変状態
Forward message
You can forward a message from one actor to another. This means that the original sender address/reference is maintained even though the message is going through a 'mediator'. This can be useful when writing actors that work as routers, load-balancers, replicators etc. You need to pass along your context variable as well.
target.forward(result, getContext());
Receive messages
When an actor receives a message it is passed into the onReceive
method, this is
an abstract method on the UntypedActor
base class that needs to be defined.
以下はこのサンプルコードです。
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
public class MyUntypedActor extends UntypedActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
public void onReceive(Object message) throws Exception {
if (message instanceof String) {
log.info("Received String message: {}", message);
getSender().tell(message, getSelf());
} else
unhandled(message);
}
}
An alternative to using if-instanceof checks is to use Apache Commons MethodUtils to invoke a named method whose parameter type matches the message type.
Reply to messages
If you want to have a handle for replying to a message, you can use
getSender()
, which gives you an ActorRef. You can reply by sending to
that ActorRef with getSender().tell(replyMsg, getSelf())
. You can also store the ActorRef
for replying later, or passing on to other actors. If there is no sender (a
message was sent without an actor or future context) then the sender
defaults to a 'dead-letter' actor ref.
@Override
public void onReceive(Object msg) {
Object result =
// calculate result ...
// do not forget the second argument!
getSender().tell(result, getSelf());
}
Receive timeout
The UntypedActorContext setReceiveTimeout
defines the inactivity timeout after which
the sending of a ReceiveTimeout message is triggered.
When specified, the receive function should be able to handle an akka.actor.ReceiveTimeout message.
1 millisecond is the minimum supported timeout.
Please note that the receive timeout might fire and enqueue the ReceiveTimeout message right after another message was enqueued; hence it is not guaranteed that upon reception of the receive timeout there must have been an idle period beforehand as configured via this method.
Once set, the receive timeout stays in effect (i.e. continues firing repeatedly after inactivity periods). Pass in Duration.Undefined to switch off this feature.
import akka.actor.ActorRef;
import akka.actor.ReceiveTimeout;
import akka.actor.UntypedActor;
import scala.concurrent.duration.Duration;
public class MyReceiveTimeoutUntypedActor extends UntypedActor {
public MyReceiveTimeoutUntypedActor() {
// To set an initial delay
getContext().setReceiveTimeout(Duration.create("30 seconds"));
}
public void onReceive(Object message) {
if (message.equals("Hello")) {
// To set in a response to a message
getContext().setReceiveTimeout(Duration.create("1 second"));
} else if (message instanceof ReceiveTimeout) {
// To turn it off
getContext().setReceiveTimeout(Duration.Undefined());
} else {
unhandled(message);
}
}
}
Messages marked with NotInfluenceReceiveTimeout
will not reset the timer. This can be useful when
ReceiveTimeout
should be fired by external inactivity but not influenced by internal activity,
e.g. scheduled tick messages.
Stopping actors
Actors are stopped by invoking the stop
method of a ActorRefFactory
,
i.e. ActorContext
or ActorSystem
. Typically the context is used for stopping
the actor itself or child actors and the system for stopping top level actors. The actual
termination of the actor is performed asynchronously, i.e. stop
may return before
the actor is stopped.
import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
public class MyStoppingActor extends UntypedActor {
ActorRef child = null;
// ... creation of child ...
public void onReceive(Object message) throws Exception {
if (message.equals("interrupt-child")) {
context().stop(child);
} else if (message.equals("done")) {
context().stop(getSelf());
} else {
unhandled(message);
}
}
}
Processing of the current message, if any, will continue before the actor is stopped,
but additional messages in the mailbox will not be processed. By default these
messages are sent to the deadLetters
of the ActorSystem
, but that
depends on the mailbox implementation.
Termination of an actor proceeds in two steps: first the actor suspends its
mailbox processing and sends a stop command to all its children, then it keeps
processing the internal termination notifications from its children until the last one is
gone, finally terminating itself (invoking postStop
, dumping mailbox,
publishing Terminated
on the DeathWatch, telling
its supervisor). This procedure ensures that actor system sub-trees terminate
in an orderly fashion, propagating the stop command to the leaves and
collecting their confirmation back to the stopped supervisor. If one of the
actors does not respond (i.e. processing a message for extended periods of time
and therefore not receiving the stop command), this whole process will be
stuck.
Upon ActorSystem.terminate
, the system guardian actors will be
stopped, and the aforementioned process will ensure proper termination of the
whole system.
The postStop
hook is invoked after an actor is fully stopped. This
enables cleaning up of resources:
@Override
public void postStop() {
// clean up resources here ...
}
注釈
Since stopping an actor is asynchronous, you cannot immediately reuse the
name of the child you just stopped; this will result in an
InvalidActorNameException
. Instead, watch
the terminating
actor and create its replacement in response to the Terminated
message which will eventually arrive.
PoisonPill
You can also send an actor the akka.actor.PoisonPill
message, which will
stop the actor when the message is processed. PoisonPill
is enqueued as
ordinary messages and will be handled after messages that were already queued
in the mailbox.
Use it like this:
myActor.tell(akka.actor.PoisonPill.getInstance(), sender);
Graceful Stop
gracefulStop
is useful if you need to wait for termination or compose ordered
termination of several actors:
import static akka.pattern.Patterns.gracefulStop;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import akka.pattern.AskTimeoutException;
try {
Future<Boolean> stopped =
gracefulStop(actorRef, Duration.create(5, TimeUnit.SECONDS), Manager.SHUTDOWN);
Await.result(stopped, Duration.create(6, TimeUnit.SECONDS));
// the actor has been stopped
} catch (AskTimeoutException e) {
// the actor wasn't stopped within 5 seconds
}
public class Manager extends UntypedActor {
public static final String SHUTDOWN = "shutdown";
ActorRef worker = getContext().watch(getContext().actorOf(
Props.create(Cruncher.class), "worker"));
public void onReceive(Object message) {
if (message.equals("job")) {
worker.tell("crunch", getSelf());
} else if (message.equals(SHUTDOWN)) {
worker.tell(PoisonPill.getInstance(), getSelf());
getContext().become(shuttingDown);
}
}
Procedure<Object> shuttingDown = new Procedure<Object>() {
@Override
public void apply(Object message) {
if (message.equals("job")) {
getSender().tell("service unavailable, shutting down", getSelf());
} else if (message instanceof Terminated) {
getContext().stop(getSelf());
}
}
};
}
When gracefulStop()
returns successfully, the actor’s postStop()
hook
will have been executed: there exists a happens-before edge between the end of
postStop()
and the return of gracefulStop()
.
In the above example a custom Manager.SHUTDOWN
message is sent to the target
actor to initiate the process of stopping the actor. You can use PoisonPill
for
this, but then you have limited possibilities to perform interactions with other actors
before stopping the target actor. Simple cleanup tasks can be handled in postStop
.
警告
Keep in mind that an actor stopping and its name being deregistered are
separate events which happen asynchronously from each other. Therefore it may
be that you will find the name still in use after gracefulStop()
returned. In order to guarantee proper deregistration, only reuse names from
within a supervisor you control and only in response to a Terminated
message, i.e. not for top-level actors.
HotSwap
Upgrade
Akka supports hotswapping the Actor’s message loop (e.g. its implementation) at
runtime. Use the getContext().become
method from within the Actor.
The hotswapped code is kept in a Stack which can be pushed (replacing or adding
at the top) and popped.
警告
Please note that the actor will revert to its original behavior when restarted by its Supervisor.
To hotswap the Actor using getContext().become
:
import akka.japi.Procedure;
public class HotSwapActor extends UntypedActor {
Procedure<Object> angry = new Procedure<Object>() {
@Override
public void apply(Object message) {
if (message.equals("bar")) {
getSender().tell("I am already angry?", getSelf());
} else if (message.equals("foo")) {
getContext().become(happy);
}
}
};
Procedure<Object> happy = new Procedure<Object>() {
@Override
public void apply(Object message) {
if (message.equals("bar")) {
getSender().tell("I am already happy :-)", getSelf());
} else if (message.equals("foo")) {
getContext().become(angry);
}
}
};
public void onReceive(Object message) {
if (message.equals("bar")) {
getContext().become(angry);
} else if (message.equals("foo")) {
getContext().become(happy);
} else {
unhandled(message);
}
}
}
This variant of the become
method is useful for many different things,
such as to implement a Finite State Machine (FSM). It will replace the current
behavior (i.e. the top of the behavior stack), which means that you do not use
unbecome
, instead always the next behavior is explicitly installed.
The other way of using become
does not replace but add to the top of
the behavior stack. In this case care must be taken to ensure that the number
of “pop” operations (i.e. unbecome
) matches the number of “push” ones
in the long run, otherwise this amounts to a memory leak (which is why this
behavior is not the default).
public class UntypedActorSwapper {
public static class Swap {
public static Swap SWAP = new Swap();
private Swap() {
}
}
public static class Swapper extends UntypedActor {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
public void onReceive(Object message) {
if (message == SWAP) {
log.info("Hi");
getContext().become(new Procedure<Object>() {
@Override
public void apply(Object message) {
log.info("Ho");
getContext().unbecome(); // resets the latest 'become'
}
}, false); // this signals stacking of the new behavior
} else {
unhandled(message);
}
}
}
public static void main(String... args) {
ActorSystem system = ActorSystem.create("MySystem");
ActorRef swap = system.actorOf(Props.create(Swapper.class));
swap.tell(SWAP, ActorRef.noSender()); // logs Hi
swap.tell(SWAP, ActorRef.noSender()); // logs Ho
swap.tell(SWAP, ActorRef.noSender()); // logs Hi
swap.tell(SWAP, ActorRef.noSender()); // logs Ho
swap.tell(SWAP, ActorRef.noSender()); // logs Hi
swap.tell(SWAP, ActorRef.noSender()); // logs Ho
}
}
Stash
The UntypedActorWithStash
class enables an actor to temporarily stash away messages
that can not or should not be handled using the actor's current
behavior. Upon changing the actor's message handler, i.e., right
before invoking getContext().become()
or getContext().unbecome()
, all
stashed messages can be "unstashed", thereby prepending them to the actor's
mailbox. This way, the stashed messages can be processed in the same
order as they have been received originally. An actor that extends
UntypedActorWithStash
will automatically get a deque-based mailbox.
注釈
The abstract class UntypedActorWithStash
implements the marker
interface RequiresMessageQueue<DequeBasedMessageQueueSemantics>
which requests the system to automatically choose a deque based
mailbox implementation for the actor. If you want more
control over the mailbox, see the documentation on mailboxes: Mailboxes.
Here is an example of the UntypedActorWithStash
class in action:
import akka.actor.UntypedActorWithStash;
public class ActorWithProtocol extends UntypedActorWithStash {
public void onReceive(Object msg) {
if (msg.equals("open")) {
unstashAll();
getContext().become(new Procedure<Object>() {
public void apply(Object msg) throws Exception {
if (msg.equals("write")) {
// do writing...
} else if (msg.equals("close")) {
unstashAll();
getContext().unbecome();
} else {
stash();
}
}
}, false); // add behavior on top instead of replacing
} else {
stash();
}
}
}
Invoking stash()
adds the current message (the message that the
actor received last) to the actor's stash. It is typically invoked
when handling the default case in the actor's message handler to stash
messages that aren't handled by the other cases. It is illegal to
stash the same message twice; to do so results in an
IllegalStateException
being thrown. The stash may also be bounded
in which case invoking stash()
may lead to a capacity violation,
which results in a StashOverflowException
. The capacity of the
stash can be configured using the stash-capacity
setting (an Int
) of the
mailbox's configuration.
Invoking unstashAll()
enqueues messages from the stash to the
actor's mailbox until the capacity of the mailbox (if any) has been
reached (note that messages from the stash are prepended to the
mailbox). In case a bounded mailbox overflows, a
MessageQueueAppendFailedException
is thrown.
The stash is guaranteed to be empty after calling unstashAll()
.
The stash is backed by a scala.collection.immutable.Vector
. As a
result, even a very large number of messages may be stashed without a
major impact on performance.
Note that the stash is part of the ephemeral actor state, unlike the
mailbox. Therefore, it should be managed like other parts of the
actor's state which have the same property. The UntypedActorWithStash
implementation of preRestart
will call unstashAll()
, which is
usually the desired behavior.
注釈
If you want to enforce that your actor can only work with an unbounded stash,
then you should use the UntypedActorWithUnboundedStash
class instead.
Killing an Actor
You can kill an actor by sending a Kill
message. This will cause the actor
to throw a ActorKilledException
, triggering a failure. The actor will
suspend operation and its supervisor will be asked how to handle the failure,
which may mean resuming the actor, restarting it or terminating it completely.
See スーパービジョンが意味するもの for more information.
Use Kill
like this:
victim.tell(akka.actor.Kill.getInstance(), ActorRef.noSender());
Actors and exceptions
It can happen that while a message is being processed by an actor, that some kind of exception is thrown, e.g. a database exception.
What happens to the Message
If an exception is thrown while a message is being processed (i.e. taken out of its mailbox and handed over to the current behavior), then this message will be lost. It is important to understand that it is not put back on the mailbox. So if you want to retry processing of a message, you need to deal with it yourself by catching the exception and retry your flow. Make sure that you put a bound on the number of retries since you don't want a system to livelock (so consuming a lot of cpu cycles without making progress). Another possibility would be to have a look at the PeekMailbox pattern.
What happens to the mailbox
If an exception is thrown while a message is being processed, nothing happens to the mailbox. If the actor is restarted, the same mailbox will be there. So all messages on that mailbox will be there as well.
What happens to the actor
If code within an actor throws an exception, that actor is suspended and the supervision process is started (see スーパービジョン と モニタリング). Depending on the supervisor’s decision the actor is resumed (as if nothing happened), restarted (wiping out its internal state and starting from scratch) or terminated.
Initialization patterns
The rich lifecycle hooks of Actors provide a useful toolkit to implement various initialization patterns. During the
lifetime of an ActorRef
, an actor can potentially go through several restarts, where the old instance is replaced by
a fresh one, invisibly to the outside observer who only sees the ActorRef
.
One may think about the new instances as "incarnations". Initialization might be necessary for every incarnation
of an actor, but sometimes one needs initialization to happen only at the birth of the first instance when the
ActorRef
is created. The following sections provide patterns for different initialization needs.
Initialization via constructor
Using the constructor for initialization has various benefits. First of all, it makes it possible to use val
fields to store
any state that does not change during the life of the actor instance, making the implementation of the actor more robust.
The constructor is invoked for every incarnation of the actor, therefore the internals of the actor can always assume
that proper initialization happened. This is also the drawback of this approach, as there are cases when one would
like to avoid reinitializing internals on restart. For example, it is often useful to preserve child actors across
restarts. The following section provides a pattern for this case.
Initialization via preStart
The method preStart()
of an actor is only called once directly during the initialization of the first instance, that
is, at creation of its ActorRef
. In the case of restarts, preStart()
is called from postRestart()
, therefore
if not overridden, preStart()
is called on every incarnation. However, overriding postRestart()
one can disable
this behavior, and ensure that there is only one call to preStart()
.
One useful usage of this pattern is to disable creation of new ActorRefs
for children during restarts. This can be
achieved by overriding preRestart()
:
@Override
public void preStart() {
// Initialize children here
}
// Overriding postRestart to disable the call to preStart()
// after restarts
@Override
public void postRestart(Throwable reason) {
}
// The default implementation of preRestart() stops all the children
// of the actor. To opt-out from stopping the children, we
// have to override preRestart()
@Override
public void preRestart(Throwable reason, Option<Object> message)
throws Exception {
// Keep the call to postStop(), but no stopping of children
postStop();
}
Please note, that the child actors are still restarted, but no new ActorRef
is created. One can recursively apply
the same principles for the children, ensuring that their preStart()
method is called only at the creation of their
refs.
For more information see リスタートが意味するもの.
Initialization via message passing
There are cases when it is impossible to pass all the information needed for actor initialization in the constructor,
for example in the presence of circular dependencies. In this case the actor should listen for an initialization message,
and use become()
or a finite state-machine state transition to encode the initialized and uninitialized states
of the actor.
private String initializeMe = null;
@Override
public void onReceive(Object message) throws Exception {
if (message.equals("init")) {
initializeMe = "Up and running";
getContext().become(new Procedure<Object>() {
@Override
public void apply(Object message) throws Exception {
if (message.equals("U OK?"))
getSender().tell(initializeMe, getSelf());
}
});
}
}
If the actor may receive messages before it has been initialized, a useful tool can be the Stash
to save messages
until the initialization finishes, and replaying them after the actor became initialized.
警告
This pattern should be used with care, and applied only when none of the patterns above are applicable. One of
the potential issues is that messages might be lost when sent to remote actors. Also, publishing an ActorRef
in
an uninitialized state might lead to the condition that it receives a user message before the initialization has been
done.
Contents