FSM

FSM

概要

FSM(Finite State Machine:有限状態マシン)はAkkaのアクターにミックスインとして利用することができ、Erlang design principles に詳しく記載されています。

FSMは、次の式の関係の集合として記述できます。

State(S) x Event(E) -> Actions (A), State(S')

これらの関係は、次のように解釈されます。

状態SでイベントEが発生した場合は、アクションAを実行して状態S'に遷移する必要があります。

簡単な例

FSM トレイトのほとんどの機能を実証するために、メッセージがバーストに到着している間にメッセージを受信しキューに入れ、バーストが終了した後、またはフラッシュ要求を受信した後にメッセージを送信するアクターを考えてみましょう。

まず、これらのインポート文を使用するには、以下のすべてを考慮してください。

import akka.actor.{ ActorRef, FSM }
import scala.concurrent.duration._

「Buncher」アクターの決め事として、以下のメッセージを受け入れるか、作り出します:

// received events
final case class SetTarget(ref: ActorRef)
final case class Queue(obj: Any)
case object Flush

// sent events
final case class Batch(obj: immutable.Seq[Any])

SetTarget is needed for starting it up, setting the destination for the Batches to be passed on; Queue will add to the internal queue while Flush will mark the end of a burst.

// states
sealed trait State
case object Idle extends State
case object Active extends State

sealed trait Data
case object Uninitialized extends Data
final case class Todo(target: ActorRef, queue: immutable.Seq[Any]) extends Data

アクターは2つの状態になることができます。キューにメッセージが存在しない状態(別名 Idle )と、存在する状態(別名 Active )です。メッセージが到着し続けてフラッシュが要求されない限り、Active状態に留まります。アクターの内部状態データは、バッチを送信するターゲットアクター参照とメッセージの実際のキューで構成されます。

FSMアクターのスケルトンを見てみましょう:

class Buncher extends FSM[State, Data] {

  startWith(Idle, Uninitialized)

  when(Idle) {
    case Event(SetTarget(ref), Uninitialized) =>
      stay using Todo(ref, Vector.empty)
  }

  // transition elided ...

  when(Active, stateTimeout = 1 second) {
    case Event(Flush | StateTimeout, t: Todo) =>
      goto(Idle) using t.copy(queue = Vector.empty)
  }

  // unhandled elided ...

  initialize()
}

基本的な戦略は、 FSM トレイトをミックスインし、可能な状態とデータ値を型パラメータとして指定することで、アクターを宣言することです。アクター本体では、ステートマシンを宣言するためにDSLが使用されます。

  • startWith は初期状態と初期データを定義します。

  • 処理される状態ごとに when(<state>) { ... } 宣言があります(潜在的に複数になる可能性があり、渡される PartialFunctionorElse を使用して連結されます`)

  • 最終的に、初期状態へ遷移させ(必要に応じて)タイマーを設定する initialize を使用して起動します。

この場合、 SetTarget() メッセージだけが処理される IdleUninitialized 状態から始めます。 stay は、このイベントの現在から遷移しないための処理を終了する準備をしていますが、 using 修飾子はFSMの内部状態(この時点では Uninitialized )をターゲットのアクター参照を含む新しい Todo() オブジェクト に置き換えます。Active 状態には、1秒間メッセージが受信されなければ FSM.StateTimeout メッセージが生成される状態のタイムアウトが宣言されています。この場合、Flush コマンドを受け取るのと同じ効果があります。 Idle 状態に戻り、内部キューを空のベクターにリセットします。 では、メッセージはどのようにしてキューに入れられるのでしょうか?これは両方の状態で同じように動作するので、 when() ブロックによって処理されないイベントは whenUnhandled() ブロックに渡されるという事実を利用します:

whenUnhandled {
  // common code for both states
  case Event(Queue(obj), t @ Todo(_, v)) =>
    goto(Active) using t.copy(queue = v :+ obj)

  case Event(e, s) =>
    log.warning("received unhandled request {} in state {}/{}", e, stateName, s)
    stay
}

ここで扱う最初のケースは、 Queue() 要求を内部キューに追加し、 Active 状態に移ることです(これが既にその状態にあるなら、もちろん Active 状態に留まります)。 Queue() イベントが受信されたときにFSMデータが Uninitialized でない場合に限ります。それ以外の他で処理されないケースでは、2つ目のケースで、警告ログを出力するだけで、内部状態は変更されません。

残りは Batches が実際にターゲットに送られる部分です。これは、 onTransition メカニズムを使用しています。このブロックを複数宣言することができ、状態遷移が起こった場合(つまり、状態が実際に変化した場合のみ)にすべての振る舞いをマッチングさせようとします。

onTransition {
  case Active -> Idle =>
    stateData match {
      case Todo(ref, queue) => ref ! Batch(queue)
      case _                => // nothing to do
    }
}

遷移コールバックは、入力として現在の状態と次の状態の2つの状態をとる部分関数です。FSMトレイトには、arrow演算子の形でこれらのための便利な抽出器が含まれており、状態遷移の方向でマッチングできます。状態遷移の間、古い状態データは stateData を介して、新しい状態データは nextStateData として利用できます。

注釈

同じ状態の遷移は goto(S)stay() を使って実装できます( S 状態にあるとき)。 この2つの違いは、 goto(S)onTransition で処理されるイベント S->S イベントを送出するのに対し、 stay() は処理されません 。

このバンチャーが実際に動作することを確認するには、 Testing Actor Systems を使用してテストを書くと簡単です。ScalaTestのトレイトを AkkaSpec に便利にバンドルしています:

import akka.actor.Props
import scala.collection.immutable

object FSMDocSpec {
  // messages and data types
}

class FSMDocSpec extends MyFavoriteTestFrameWorkPlusAkkaTestKit {
  import FSMDocSpec._

  // fsm code elided ...

  "simple finite state machine" must {

    "demonstrate NullFunction" in {
      class A extends FSM[Int, Null] {
        val SomeState = 0
        when(SomeState)(FSM.NullFunction)
      }
    }

    "batch correctly" in {
      val buncher = system.actorOf(Props(classOf[Buncher], this))
      buncher ! SetTarget(testActor)
      buncher ! Queue(42)
      buncher ! Queue(43)
      expectMsg(Batch(immutable.Seq(42, 43)))
      buncher ! Queue(44)
      buncher ! Flush
      buncher ! Queue(45)
      expectMsg(Batch(immutable.Seq(44)))
      expectMsg(Batch(immutable.Seq(45)))
    }

    "not batch if uninitialized" in {
      val buncher = system.actorOf(Props(classOf[Buncher], this))
      buncher ! Queue(42)
      expectNoMsg
    }
  }
}

リファレンス

FSMトレイトとオブジェクト

FSM トレイトは Actor を直接継承します。 FSM を拡張すると、アクターが実際に作成されたことに気づくでしょう:

class Buncher extends FSM[State, Data] {

  // fsm body ...

  initialize()
}

注釈

FSMトレイトは、内部メッセージを処理し、(現在の状態に応じて)他のすべてをFSMロジックに渡す receive メソッドを定義します。 receive メソッドをオーバーライドするときは、状態のタイムアウト処理は、FSMロジックを介してメッセージを実際に送信するかどうかによって決まるといったことに注意して下さい。

FSM トレイトは2つの型パラメータをとります:

  1. すべての状態名のスーパータイプ、通常それを継承するケースオブジェクトのシールドトレイト

  2. FSM モジュール自身によって追跡される状態データの型

注釈

状態データと状態名とは、状態マシンの内部状態を記述します。この決め事に従い、FSMクラスに変更可能なフィールドを追加しなければ、いくつかのよく知られた点で内部状態のすべての変更を明確にするという利点が得られます。

状態の定義

状態は1つ以上のメソッド呼び出しで定義されます

when(<name>[, stateTimeout = <timeout>])(stateFunction).

指定する名前は、 FSM トレイトに与えられた最初の型パラメータと型互換性のあるオブジェクトでなければなりません。このオブジェクトはハッシュキーとして使用されるため、正しく equalshashCode を確実に正しく実装する必要があります。特にそれは変更可能であってはなりません。これらの要件に最も簡単に適合するのは、ケースオブジェクトです。

stateTimeout パラメータが与えられた場合、そのままの滞在を含むこの状態へのすべての遷移は、デフォルトでこのタイムアウトを受け取ります。明示的なタイムアウトで遷移を開始すると、このデフォルトを上書きすることができます。詳しくは 遷移の開始 を参照してください。状態のタイムアウトは、アクション処理中に setStateTimeout(state, duration) を使って変更することもできます。これにより、外部メッセージなどによる実行時設定を可能にします。

stateFunction の引数は PartialFunction[Event, State] です。これは、以下に示すような部分関数のリテラル構文を使用すると便利です。

when(Idle) {
  case Event(SetTarget(ref), Uninitialized) =>
    stay using Todo(ref, Vector.empty)
}

when(Active, stateTimeout = 1 second) {
  case Event(Flush | StateTimeout, t: Todo) =>
    goto(Idle) using t.copy(queue = Vector.empty)
}

Event(msg: Any, data: D) ケースクラスは、パターンマッチングしやすくするためにFSMが保持するデータ型でパラメータ化されています。

警告

考えられるFSMの状態ごとにハンドラを定義する必要があります。定義しなければ、宣言されていない状態に切り替える際に障害が発生します。

It is recommended practice to declare the states as objects extending a sealed trait and then verify that there is a when clause for each of the states. If you want to leave the handling of a state “unhandled” (more below), it still needs to be declared like this:

when(SomeState)(FSM.NullFunction)

初期状態の定義

各FSMには出発点が必要で以下で宣言されます。

startWith(state, data[, timeout])

オプションで指定されたタイムアウト引数は、目的の初期状態に与えられた値で上書きします。デフォルトのタイムアウトを取り消したい場合は、None を使います。

処理されないイベント

状態が受信したイベントを処理しない場合、警告がログ出力されます。このとき何か別のことをしたい場合は、それを whenUnhandled(stateFunction) で指定することができます:

whenUnhandled {
  case Event(x: X, data) =>
    log.info("Received unhandled event: " + x)
    stay
  case Event(msg, _) =>
    log.warning("Received unknown event: " + msg)
    goto(Error)
}

このハンドラ内では、 stateName メソッドを使用してFSMの状態を問い合わせることができます。

重要: このハンドラはスタックされません。つまり、 whenUnhandled を呼び出すたびに前に設定されていたハンドラが置き換えられます。

遷移の開始

stateFunction の結果は、 Termination from Inside で説明されているFSMを終了しない限り、次の状態の定義でなければなりません。状態定義は、stay ディレクティブで記述されている現在の状態でも、 goto(state) によって与えられた状態と異なる状態でも構いません。結果として得られるオブジェクトは、次のような修飾子を使用してさらに修飾することができます。

  • forMax(duration)

    この修飾子は、次の状態で状態のタイムアウトを設定します。これは、終了時に StateTimeout メッセージをFSMに送信するタイマーが開始されることを意味する。このタイマーは、その間に他のメッセージを受信するとキャンセルされ、そのメッセージの後に StateTimeout メッセージは処理されません。

    この修飾子を使用して、ターゲットの状態に指定されているデフォルトのタイムアウトを無効にすることもできます。デフォルトのタイムアウトをキャンセルするには、 Duration.Inf を使用します。

  • using(data)

    この修飾子は、古い状態のデータを新しいデータで置き換えます。 のアドバイスに従えば、これが内部状態のデータが変更される唯一の場所です。

  • replying(msg)

    この修飾子は、現在処理中のメッセージに応答を送信し状態は変更しません。

すべての修飾子は、簡潔に記述に記述するためにチェーンすることができます:

when(SomeState) {
  case Event(msg, _) =>
    goto(Processing) using (newData) forMax (5 seconds) replying (WillDo)
}

実際にはかっこは必要ありませんが、修飾子とその引数を視覚的に区別しているため、外国人にとっては快適で読みやすくなります。

注釈

return は、 when ブロックなどには使用できないことに注意してください。これはScalaの制約なので、 if () ... else ... を使用してコードをリファクタリングするか、またはメソッド定義に移します。

遷移の監視

遷移は「状態と状態の間」で概念的に発生します。これは、イベント処理ブロックに入れたアクションの後であることを意味します。次の状態は、イベント処理ロジックによって返される値によってのみ定義されるため、これは明らかです。 FSMアクター内のすべてがシングルスレッドで実行されているため、内部状態変数の設定に関して正確な順序を心配する必要はありません。

内部監視

ここれまで、FSM DSLは状態とイベントを中心に扱われてきました。2つの視点はこれをひと続きの遷移として記述します。これは下記メソッドによって有効になります。

onTransition(handler)

アクションは状態とイベントではなく遷移に関連付けられます。ハンドラは、入力に状態のペアをとる部分関数です。進行中の遷移を変更することはできないので、結果としての状態は必要ありません。

onTransition {
  case Idle -> Active => setTimer("timeout", Tick, 1 second, repeat = true)
  case Active -> _    => cancelTimer("timeout")
  case x -> Idle      => log.info("entering Idle from " + x)
}

便利な抽出器 -> は、遷移の方向を明確に視覚的に示した状態のペアを分解することができます。パターンマッチではいつものように関係のない部分にアンダースコアを使用することがあります。あるいは、拘束されていない状態を変数にバインドすることもできます。例えば、最後のcaseに示されているようにロギングを行います。

また、遷移処理ロジックがメソッドとして実装されている場合、 onTransition に2つの状態を受け入れる関数オブジェクトを渡すこともできます。

onTransition(handler _)

def handler(from: StateType, to: StateType) {
  // handle it here ...
}

このメソッドに登録されたハンドラはスタックされているので、設計にあわせて when ブロックと onTransition ブロックに分散させることができます。ただし、最初に一致するときだけでなく、 すべてのハンドラは、遷移の度に呼び出される ことに注意してください。前の宣言を追いかけて後でも宣言することを気にすることなくすべての遷移処理を確実に一箇所に配置できるように特別に設計されています。アクションは宣言の順序で実行されます。

注釈

この種の内部モニタリングは、遷移に応じたFSMを構成するために使用される可能性があります。たとえば、新しい状態を追加する際に、ある状態を離れるとタイマーをキャンセルすることを忘れてはいけません。

外部監視

外部アクターは、 SubscribeTransitionCallBack(actorRef) というメッセージを送信することによって、状態遷移の通知を受けるように登録することができます。指定されたアクターは、 CurrentState(self, stateName) メッセージをすぐに送信され、状態の変更が発生するたびに Transition(actorRef, oldState, newState) メッセージを受け取ります。

状態の変更には、 goto(S) を実行するアクションが含まれていますが、すでに S という状態になってる場合があります。その場合、監視アクターは Transition(ref,S,S) メッセージで通知されます。 FSM が(同じ状態も含め)すべての遷移に反応する必要がある場合に便利です。同じ状態の遷移イベントを通知しないのであれば、 goto(S) の代わりに stay() を使います。

外部監視は、 UnsubscribeTransitionCallBack(actorRef)FSM アクターに送ることによって登録解除されます。

登録解除せずにリスナーを停止しても、リスナーは購読リストから削除されません。リスナーを停止する前に、 UnsubscribeTransitionCallback を使用してください。

状態の変換

when() ブロックの引数として提供される部分関数は、Scalaの関数型プログラミングツールの完全な補足を使用して変換できます。型推論を保持するために、いくつかの共通の処理ロジックが異なる節に適用される場合に使用されるヘルパー関数があります:

when(SomeState)(transform {
  case Event(bytes: ByteString, read) => stay using (read + bytes.length)
} using {
  case s @ FSM.State(state, read, timeout, stopReason, replies) if read > 1000 =>
    goto(Processing)
})

言うまでもありませんが、このメソッドへの引数は記憶され、いくつかの when() ブロックに同じ変換を適用すると、複数回使用されます。

val processingTrigger: PartialFunction[State, State] = {
  case s @ FSM.State(state, read, timeout, stopReason, replies) if read > 1000 =>
    goto(Processing)
}

when(SomeState)(transform {
  case Event(bytes: ByteString, read) => stay using (read + bytes.length)
} using processingTrigger)

タイマー

状態のタイムアウトに加えて、FSMは String の名前で識別されるタイマーを管理します。以下でタイマーをセットします。

setTimer(name, msg, interval, repeat)

 msg は、 interval が経過した後に送信されるメッセージオブジェクトです。 repeattrue の場合、タイマーは interval パラメータによって与えられた固定間隔でスケジュールされます。新しいタイマーを追加する前に、同じ名前の既存のタイマーが自動的に取り消されます。

以下でタイマーをキャンセルします。

cancelTimer(name)

これは、すぐに動作することが保証されています。つまり、タイマーがすでに起動してキューに入れられていても、これを呼び出した後にスケジュールされたメッセージは処理されません。以下でタイマーの状態を問合せます。

isTimerActive(name)

これらの名前付きタイマーは、他のメッセージの介在による影響を受けないため、状態のタイムアウトを補完します。

内部からの終了

FSMは、次のように最後の状態を指定することによって停止されます。

stop([reason[, data]])

理由は、 Normal (デフォルト)、 Shutdown または Failure(reason) のいずれかで、第2引数は終了処理中に利用可能な状態のデータを変更するために与えます。

注釈

stop はアクションを中断せず、すぐにFSMを停止させることに注意してください。停止アクションは、状態遷移と同じ方法でイベントハンドラから返さなければなりません(ただし、 return ステートメントは when ブロック内で使用できないことに注意してください)。

when(Error) {
  case Event("stop", _) =>
    // do cleanup ...
    stop()
}

onTermination(handler) を使って、FSMが停止したときに実行されるカスタムコードを指定することができます。ハンドラは引数として StopEvent(reason, stateName, stateData) を取る部分関数です:

onTermination {
  case StopEvent(FSM.Normal, state, data)         => // ...
  case StopEvent(FSM.Shutdown, state, data)       => // ...
  case StopEvent(FSM.Failure(cause), state, data) => // ...
}

whenUnhandled の場合、このハンドラはスタックされていないので、 onTermination を呼び出すたびに、以前に設定されたハンドラが置き換えられます。

外部からの終了

stop メソッドを使用してFSMに関連付けられた ActorRef を停止すると、 postStop フックが実行されます。 FSM トレイトによるデフォルト実装は、 StopEvent(Shutdown, ...) を処理する準備ができている場合に:meth:onTermination ハンドラを実行します。

警告

postStop をオーバーライドし、 onTermination ハンドラを呼ぶ場合は、 super.postStop を呼び出すことを忘れないようにしてください。

有限状態マシンのテストとデバッグ

開発中やトラブルシューティングの際、FSMは他のアクターと同じように処置を必要とします。 Testing Finite State Machines と以下に記述されているような特別なツールがあります。

イベントのトレース

環境設定akka.actor.debug.fsm を設定すると、 LoggingFSM インスタンスによって、イベントのトレースのロギングを有効にできます :

import akka.actor.LoggingFSM
class MyFSM extends LoggingFSM[StateType, Data] {
  // body elided ...
}

以下は、DEBUGレベル出力します。

  • StateTimeout とスケジュールされたタイマーメッセージを含むすべての処理済みイベント

  • 名前付きタイマーの設定とキャンセル

  • すべての状態遷移

ライフサイクルの変更と特別なメッセージは、 Actors の説明のとおりにロギングすることができます。

ローリングイベントログ

LoggingFSM トレイトは、FSMにもう一つの機能を追加します。デバッグ中(FSMがどのように特定のエラー状態に陥ったのかをトレースするため)や他のクリエイティブな用途のために使用されるローリングイベントログです。

import akka.actor.LoggingFSM
class MyFSM extends LoggingFSM[StateType, Data] {
  override def logDepth = 12
  onTermination {
    case StopEvent(FSM.Failure(_), state, data) =>
      val lastEvents = getLog.mkString("\n\t")
      log.warning("Failure in state " + state + " with data " + data + "\n" +
        "Events leading up to this point:\n\t" + lastEvents)
  }
  // ...
}

logDepth のデフォルトは0で、これはイベントログをオフにします。

警告

ログバッファはアクターの作成時に割り当てられます。仮想メソッド呼び出しを使用して設定が行われるのはこのためです。 val でオーバーライドする場合は、 LoggingFSM のイニシャライザが動作する前に初期化が行われていることを確認し、バッファが割り当てられた後に logDepth が返す値を変更しないでください。

イベントログの内容は、 getLog メソッドを使って取得できます。このメソッドは IndexedSeq[LogEntry] を返します。最も古いエントリはインデックス0になります。

アクターの become/unbecome に相当する、より大きなFSMの例は、 Lightbend ActivatorAkka FSM in Scala というテンプレートにあります。

Contents