耐障害性

耐障害性

アクターシステム で説明した通り、各アクターは自身の子供のスーパーバイザーであり、障害をハンドリングするためのスーパーバイザー戦略を定義しています。この戦略はアクターシステムに不可欠な部品であるため後から変更することはできません。

障害ハンドリングの実践

まず、実際のアプリケーションでの典型的な障害原因であるデータストアのエラーを処理する方法の 1 つを示すサンプルを見てみましょう。 もちろん、データストアが利用できないときに何ができるかは実際のアプリケーションに依存しますが、このサンプルでは、ベストエフォートの再接続アプローチを用います。

次のソースコードを読んでください。 インラインのコメントはさまざまな障害ハンドリングと、なぜそれらが追加されたのかを説明します。 実行時に何が起こっているかを理解するためにはログの出力を追うのが簡単なので、これらのサンプルを実行することを強くお勧めします。

スーパーバイザー戦略を作成する

以下のセクションでは、障害ハンドリングのメカニズムとその代替方法について詳しく説明します。

デモンストレーションのために、次の戦略を考えてみましょう。

import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._

override val supervisorStrategy =
  OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
    case _: ArithmeticException      => Resume
    case _: NullPointerException     => Restart
    case _: IllegalArgumentException => Stop
    case _: Exception                => Escalate
  }

私は、 スーパービジョン と モニタリング に記述されている障害ハンドリングのディレクティブの適用をデモするために、いくつかのよく知られた例外タイプを選びました。 まず、one-for-one 戦略については、それぞれの子が別々に扱われることを意味します (all-for-one 戦略は非常によく似ていますが、唯一の違いは障害が起きたものだけでなく、スーパーバイザーのすべての子に適用されるということです) 。 再起動の頻度には上限が設定されています。つまり、1 分あたり最大 10 回再起動します。 これらの設定は省略することができます。その場合はそれぞれの制限が適用されず、再起動時に上限の絶対値を指定するか、再起動を無期限に実行させることができます。上限を超えると、子アクターは停止します。

本体の大部分を形成する match 文は Decider 型で、 PartialFunction[Throwable, Directive] です。これは、子の障害のタイプから対応するディレクティブにマッピングする部分です。

注釈

戦略がスーパーバイザーのアクターの内部で宣言されている場合 (コンパニオンオブジェクト内ではなく) 、その決定者は、今障害が起きた子の参照を (障害メッセージの sender から) 取得することができ、スレッドセーフな方法でアクターの内部状態にアクセスすることもできます 。

デフォルトのスーパーバイザー戦略

Escalate は定義された戦略がスローされた例外を処理しない場合に使われます。

スーパーバイザー戦略がアクターに対して定義されていない場合、デフォルトで次の例外が処理されます。

  • ActorInitializationException は障害の起きた子アクターを停止します

  • ActorKilledException は障害の起きた子アクターを停止します

  • Exception は障害の起きた子アクターを再起動します

  • 他の型の Throwable は親のアクターにエスカレーションされます

例外がルートガーディアンにまでエスカレーションされると、上記で定義されたデフォルトの戦略と同じ方法で処理されます。

独自の戦略とデフォルトの戦略を組み合わせることもできます。

import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._

override val supervisorStrategy =
  OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
    case _: ArithmeticException => Resume
    case t =>
      super.supervisorStrategy.decider.applyOrElse(t, (_: Any) => Escalate)
  }

停止のスーパーバイザー戦略

Erlang に似た方法として、障害が起きた子を停止し、DeathWatch が子の損失を知らせたときにスーパーバイザーで是正措置をとるという戦略があります。この戦略は、 ''/user'' ガーディアンで適用したいときのために :obj:` SupervisorStrategy.stoppingStrategy` として StoppingSupervisorStrategy コンフィグレーターと一緒にあらかじめパッケージされて提供されています。

アクターの障害をロギングする

デフォルトでは、 SupervisorStrategy はエスカレーションされない限り障害を記録します。 エスカレーションされた障害は、階層内の上位レベルで処理され、潜在的にログに記録されることになっています。

SupervisorStrategy のデフォルトのログをミュートするには、インスタンス化するときに loggingEnabledfalse に設定します。 カスタマイズされたロギングは Decider の中で行うことができます。 障害が起きた子への参照は、 SupervisorStrategy がスーパーバイザーアクターの内部で宣言されたときに sender として利用できることを知っておいてください。

独自の SupervisorStrategy 実装で logFailure メソッドをオーバーライドすることで、ログをカスタマイズすることもできます。

トップレベルアクターのスーパービジョン

トップレベルアクターは、 system.actorOf() を使って作成されたアクターを意味し、 User Guardian の子です。 この場合、特別なルールは適用されません。ガーディアンは構成済みの戦略を適用するだけです。

アプリケーションをテストする

次のセクションでは、異なるディレクティブの効果を実際に示しますが、そのためにはテストのセットアップが必要です。 まず、適切なスーパーバイザーが必要です。

import akka.actor.Actor

class Supervisor extends Actor {
  import akka.actor.OneForOneStrategy
  import akka.actor.SupervisorStrategy._
  import scala.concurrent.duration._

  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
      case _: ArithmeticException      => Resume
      case _: NullPointerException     => Restart
      case _: IllegalArgumentException => Stop
      case _: Exception                => Escalate
    }

  def receive = {
    case p: Props => sender() ! context.actorOf(p)
  }
}

このスーパーバイザーは子を作るために使用され、これで実験ができます。

import akka.actor.Actor

class Child extends Actor {
  var state = 0
  def receive = {
    case ex: Exception => throw ex
    case x: Int        => state = x
    case "get"         => sender() ! state
  }
}

テストは、 Testing Actor Systems で説明されているユーティリティを使うと簡単にできます。

import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.{ FlatSpecLike, Matchers, BeforeAndAfterAll }
import akka.testkit.{ TestActors, TestKit, ImplicitSender, EventFilter }

class FaultHandlingDocSpec(_system: ActorSystem) extends TestKit(_system)
  with ImplicitSender with FlatSpecLike with Matchers with BeforeAndAfterAll {

  def this() = this(ActorSystem(
    "FaultHandlingDocSpec",
    ConfigFactory.parseString("""
      akka {
        loggers = ["akka.testkit.TestEventListener"]
        loglevel = "WARNING"
      }
      """)))

  override def afterAll {
    TestKit.shutdownActorSystem(system)
  }

  "A supervisor" must "apply the chosen strategy for its child" in {
    // code here
  }
}

アクターを作成してみましょう:

val supervisor = system.actorOf(Props[Supervisor], "supervisor")

supervisor ! Props[Child]
val child = expectMsgType[ActorRef] // retrieve answer from TestKit’s testActor

最初のテストでは Resume ディレクティブをデモするために、アクターの非初期状態をいくつか設定して障害を起こしてみましょう。

child ! 42 // set state to 42
child ! "get"
expectMsg(42)

child ! new ArithmeticException // crash it
child ! "get"
expectMsg(42)

ご覧のように、42 という値が障害ハンドリングのディレクティブに残っています。より深刻な NullPointerException に障害を変更すると、もはやそれはなくなります。

child ! new NullPointerException // crash it harder
child ! "get"
expectMsg(0)

最後に、致命的な IllegalArgumentException の場合、子はスーパーバイザによって終了されます。

watch(child) // have testActor watch “child”
child ! new IllegalArgumentException // break it
expectMsgPF() { case Terminated(`child`) => () }

今のところスーパーバイザーは、子の障害から全く影響を受けませんでした。 Exception の場合、これはもはや正しくなく、スーパーバイザーは障害をエスカレーションします。

supervisor ! Props[Child] // create new child
val child2 = expectMsgType[ActorRef]
watch(child2)
child2 ! "get" // verify it is alive
expectMsg(0)

child2 ! new Exception("CRASH") // escalate failure
expectMsgPF() {
  case t @ Terminated(`child2`) if t.existenceConfirmed => ()
}

スーパーバイザー自体は ActorSystem によって提供されるトップレベルのアクターによって監督されます。これのデフォルトのポリシーは、 Exception の全てのケースで再起動することです (例外的に ActorInitializationExceptionActorKilledException があります) 。 再起動の場合のデフォルトのディレクティブは不運な子を停止することで、この障害が残らないことを期待します。

これが望ましくない場合 (ユースケースに依存します)、この振る舞いを無効化する別のスーパーバイザーを使用する必要があります。

class Supervisor2 extends Actor {
  import akka.actor.OneForOneStrategy
  import akka.actor.SupervisorStrategy._
  import scala.concurrent.duration._

  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
      case _: ArithmeticException      => Resume
      case _: NullPointerException     => Restart
      case _: IllegalArgumentException => Stop
      case _: Exception                => Escalate
    }

  def receive = {
    case p: Props => sender() ! context.actorOf(p)
  }
  // override default to kill all children during restart
  override def preRestart(cause: Throwable, msg: Option[Any]) {}
}

この親を使用すると、最後のテストで示されているように、エスカレートされた再起動で子が生き残ります。

val supervisor2 = system.actorOf(Props[Supervisor2], "supervisor2")

supervisor2 ! Props[Child]
val child3 = expectMsgType[ActorRef]

child3 ! 23
child3 ! "get"
expectMsg(23)

child3 ! new Exception("CRASH")
child3 ! "get"
expectMsg(0)

Contents