クラスターシャーディング

クラスターシャーディング

クラスターシャーディングはクラスタのいくつかのノードにまたがってアクターを分散させ、時間とともに変化するであろうクラスタでの物理的な位置を気にすることなしに、それらを論理的な識別子を使って相互作用できるようにしたいときに便利です。

例として、アクターをドメイン駆動設計の用語で集約ルートとして表すこともできます。ここではアクターのことを "エンティティ" と呼びます。これらのアクターは一般的に永続化された (耐久性のある) 状態を持ちますが、この機能では状態が永続化されたアクターとは限りません。

クラスターシャーディングは一般的に、たくさんのステートフルなアクターがあり、一つのマシンにあるよりも多くのリソース (例えばメモリ) を消費するときに使います。もし少ない数のステートフルなアクターがあるだけの場合は、 Cluster Singleton のノードで動かすほうが簡単になる可能性があります。

この文脈においてシャーディングは識別子を持ったアクターを意味するので、エンティティと呼び、クラスターの復数のノードにまたっがって自動的に分散させることができるようになります。それぞれのエンティティのアクターは一箇所でのみ稼働し、メッセージは送り元が宛先のアクターの居場所を必要とせず送れるようになります。これはこの拡張から提供される、エンティティ ID が含まれているメッセージをどうルートすれば最終的な宛先に届くのかを知っている ShardRegion アクターを通じてメッセージを送ることで成し遂げられます。

クラスターシャーディングは、ステータスが WeaklyUp になる機能が有効になっていて、そのステータスのメンバがいるとアクティブになりません。

警告

クラスターシャーディングを自動ダウン機能と一緒に使わないでください。クラスタが2つの別のクラスタに分断するのを許してしまい、 復数のシャードとエンティティ がそれぞれ別のクラスタで起動してしまう結果になるからです!Downing を参照してください。

エンティティのアクターがどう見えるか:

上記のアクターは PersistentActor からサポートが提供されるイベントソーシングを使い、状態を保存します。永続アクターになる必要はありませんが、障害やノード間でのエンティティのマイグレーションといったケースでは、その状態が重要であるなら復元できる必要があります。

persistenceId がどう定義されているか注目してください。アクターの名前はエンティティの識別子 (URL エンコードされた utf-8) です。あなたは他の方法で定義するかもしれませんが、ユニークになる必要があります。

シャーディングの拡張を使うときはまず、一般的にはクラスターのそれぞれのノードのシステム起動時に、ClusterSharding.start メソッドでサポートされているエンティティの型を登録することになっています。ClusterSharding.start は参照を提供し、その参照を伝って渡すことができます。

extractEntityIdextractShardId は 2 つのアプリケーション固有の機能で、やってくるメッセージからエンティティの識別子とシャードの識別子を取り出します。

この例はメッセージにあるエンティティの識別子を定義する 2 つの異なる方法を示しています。

  • Get メッセージはそれ自身の識別子を含んでいます。

  • EntityEnvelope は識別子を保持しており、エンティティのアクターに送られる本当のメッセージはそのエンベロープに包まれています。

上記に示された extractEntityId 関数で 2 つのメッセージの型がどう処理されるか注目してください。エンティティのアクターに送ったメッセージは extractEntityId から返されるタプルの 2 番目で、必要であればエンベロープから取り出させることができます。

シャードはエンティティのグループで、一緒に管理されます。そのグループは上記に示した extractShardId 関数で定義されます。特定のエンティティの識別子に対するシャードの識別子はいつも同じである必要があります。

良いシャーディングアルゴリズムを作ることは、それ自体が興味深い挑戦です。均一な分散、すなわちそれぞれのシャードに同じ量のエンティティになるように作ってみてください。経験則として、シャードの数はクラスターノードの計画された最大数の 10 倍より大きい数にすべきです。ノード数よりも少ないシャードだと、いくつかのノードにはシャードが一つもホストされなくなります。多すぎるシャードはシャードによる管理効率を損ない、例えば、リバランスのオーバーヘッドと、レイテンシの増加に繋がります。コーディネーターがそれぞれのシャードへの最初のメッセージルーティングに関わっているからです。シャーディングのアルゴリズムは稼働しているクラスタの全てのノードで同じである必要があります。クラスタの全てのノードを停止させた後、変更することができるようになります。

ほとんどのケースにおいて、エンティティの識別子の hashCode の絶対値をシャードの数で余剰を取るシンプルなシャードのアルゴリズムで上手くいきます。これは利便性のために ShardRegion.HashCodeMessageExtractor で提供されています。

エンティティへのメッセージはいつもローカルの ShardRegion を通じて送信されます。名付けられた、あるエンティティの型のための ShardRegion アクターの参照は ClusterSharding.start から返され、ClusterSharding.shardRegion でも取り出すことができます。エンティティのシャードの位置をまだ知らないとき、ShardRegion は、その位置を見つけ出します。それはメッセージを正しいノードに委譲し、オンデマンドで、すなわち特定のエンティティに最初のメッセージが送られたときにエンティティのアクターを作ります。

さらに包括的なサンプルは Lightbend ActivatorAkka Cluster Sharding with Scala! と名付けられたチュートリアルにあります。

どのように動作するか

ShardRegion アクターはクラスターのそれぞれのノードか、特定のロールでタグ付けされたノードで開始されます。 ShardRegion は入ってくるメッセージからエンティティの識別子とシャードの識別子を取り出す、アプリケーションの特定の 2 つの関数によって作られます。シャードは一緒に管理されるエンティティのグループです。特定のシャードに最初のメッセージが来ると、 ShardRegion は中央のコーディネーターからシャードの位置をリクエストします。

ShardCoordinator はどの ShardRegionShard を所有するのかを決定し、そのことを ShardRegion に告げます。そのリージョンはこのリクエストを確認し、子アクターとして Shard スーパーバイザーを作ります。個別の EntitiesShard アクターに必要とされたときに作られます。入ってくるメッセージは ShardRegionShard 経由することでターゲットの Entity へ旅をします。

もしシャードが別の ShardRegion インスタンスに所属している場合、メッセージは代わりにその ShardRegion へ転送されます。シャードの場所を解決している間、そのシャードへ宛てて入ってくるメッセージはバッファされ、シャードの所属が解決された後に配送されます。解決したシャードに宛てた後続のメッセージは ShardCoordinator の関与なしに、即座にターゲットの宛先へ配送させることができます。

シナリオ 1:

  1. M1 メッセージが R1 ShardRegion インスタンスに宛てて入ってきます。

  2. M1 は S1 シャードにマッピングされます。R1 は S1 について知らないので、C コーディネーターに S1 の場所を尋ねます。

  3. S1 の所属は R1 だと C が答えます。

  4. R1 は E1 エンティティのための子アクターを作成し、バッファした E1 に宛てたメッセージを子の S1 に送ります。

  5. R1 に届く、S1 に向けて入ってくるメッセージは 全て C なしに R1 によって処理されます。それは必要に応じて子のエンティティを作成し、それらにメッセージを転送します。

シナリオ 2:

  1. M2 メッセージが R1 に向けて入ってきます。

  2. M2 は S2 にマッピングされています。R1 は S2 について知らないので、C に S2 の場所を問い合わせます。

  3. S2 の所属は R2 だと C が答えます。

  4. R1 はバッファした S2 に宛てたメッセージを R2 に送ります。

  5. R1 に届く、S2 に宛てて入ってくるメッセージは全て C なしに R1 によって処理されます。それはメッセージを R2 に転送します。

  6. R2 は S2 に宛てたメッセージを受け取り、C に尋ね、C は S2 の所属が R2 だと答え、(R2のための) シナリオ 1 に入ります。

特定のエンティティアクターのインスタンスは、高々一つだけクラスターのどこかで実行されています。全てのノードがどこにシャードが位置しているのかを同じように認識しているのは重要です。シャード割り当ての決定はクラスターシングルトンで実行されている中央の ShardCoordinator によって行われるからです。シングルトンとはすなわち、全てのクラスターノードで最も古いメンバか、特定のロールでタグ付けされたノードのグループ内でインスタンスが一つです。

シャードをどこに配置するかを決定するロジックは付け替えできるシャードのアロケーションストラテジーで定義されています。デフォルトの実装である ShardCoordinator.LeastShardAllocationStrategy はそれまでに割り当てたシャードが一番少ない ShardRegion に新しいシャードを割り当てます。このストラテジーはアプリケーション特有の実装に置き換えられます。

新たに追加されたメンバをクラスターで使用できるようにするために、コーディネータはシャードのリバランス、すなわち、あるノードから別のノードへエンティティを移行することをアシストします。リバランスプロセスでは、コーディネーターは最初にシャードのハンドオフが開始されたことをすべての ShardRegion アクターに通知します。つまり、シャードの位置がわからない場合と同じ方法で、そのシャードの受信メッセージをバッファリングし始めます。リバランス中、コーディネータはリバランスされているシャードの位置に対する要求には応答せず、すなわち、ハンドオフが完了するまでローカルのバッファリングが継続されます。リバランスされたシャードに対する責任を負う ShardRegion は、指定された handOffStopMessage (デフォルトは PoisonPill )をそれらのシャードに送ることによって、そのシャードのすべてのエンティティを停止します。すべてのエンティティが終了したとき、エンティティを所有する ShardRegion はコーディネータへハンドオフの完了確認を通知します。その後コーディネータは、シャードの位置に関するリクエストに応答し、それによってシャードの新しい位置を割り当て、次いで ShardRegion アクターにバッファされたメッセージが新しい位置に配信されます。これは、エンティティの状態が転送されたり、移行されたりしないことを意味します。エンティティの状態が重要である場合、エンティティは永続的(耐久性のある状態)でなければなりません。 永続化 を追加すると、新しい場所で状態を復元できます。

リバランスするシャードを決定するロジックは、付け替えできるシャードのアロケーションストラテジーで定義されています。 デフォルト実装の ShardCoordinator.LeastShardAllocationStrategy はハンドオフのために、以前に割り当てられたシャードを多く持つ ShardRegion から、シャードを選びます。 次に、以前に割り当てられたシャードの数が最も少ない ShardRegion 、すなわち、クラスターの新しいメンバに割り当てられます。 リバランスを開始するのに、差がどれだけ大きくなければならないかという、設定可能な閾値があります。 この戦略は、アプリケーション固有の実装に置き換えることができます。

ShardCoordinator のシャードロケーションの状態は、障害から生き残るために 永続化 を使って永続化されます (耐久性がある)。これはクラスタ内で実行されているため、 永続化 は分散されたジャーナルで構成する必要があります。 クラッシュするか、疎通できないコーディネータノードが (ダウン状態を経由して) クラスターから削除されると、新しい ShardCoordinator のシングルトンアクターが引き継ぎ、状態が回復します。 このような障害中には、既知の場所のシャードは引き続き使用できますが、新しい (未知の) シャードのメッセージは、新しい ShardCoordinator が利用可能になるまでバッファリングされます。

送信者が同じ ShardRegion アクターを使用してエンティティアクターにメッセージを配信する限り、メッセージの順序は維持されます。 バッファの上限に達していない限り、メッセージは通常のメッセージ送信と同じ方法で、at-least-once のセマンティクスを使いベストエフォートで配信されます。 永続化AtLeastOnceDelivery を使うことで、at-least-once のセマンティクスを持つ信頼性の高いエンドツーエンドメッセージングを実現できます。

新しいか、それまで使用されていないシャードを宛先とするメッセージには、コーディネーターへの往復のためのレイテンシが追加されます。 シャードのリバランスでもレイテンシが増える可能性があります。 これは、アプリケーション固有の分割粒度を設計するときに考慮しておく必要があります。 例えば、細かすぎる粒度のシャードにしないなどです。

分散されたデータモード

永続化 を使う代わりに、 Distributed Data モジュールをシャードコーディネーターの状態のストレージとして使うことができます。 そのような場合、 ShardCoordinator の状態は Distributed Data モジュールによって WriteMajority / ReadMajority の整合性を持ってクラスター内で複製されます。

このモードは、次の設定プロパティで有効にできます:

akka.cluster.sharding.state-store-mode = ddata

クラスター内のすべてのノードで実行されている必要があるDistributed Data 拡張機能を使用します。 したがって、すべてのノードに拡張機能の設定を追加して、起動する必要があります:

akka.extensions += "akka.cluster.ddata.DistributedData"

このモードを使用する場合は、ビルドへ明示的に akka-distributed-data-experimental の依存関係を追加する必要があります。もし akka-persistence がユーザーコードで使われておらず、 remember-entitiesoff であれば、 akka-persistence の依存関係をプロジェクトから削除できます。これを Remember Entities と一緒に使うと、リバランス後に再作成されますが、何もない状態からクラスタの開始した後には再作成されません。ddata モードを使用する場合、何もない状態からクラスターを起動した後のシャードコーディネーターの状態は空です。 Remember Entitieson のとき、シャードリージョンは、 State Store Mode がどのように設定されていても、常にデータの永続性を維持します。

警告

ddata モードは、Akka 2.4.0 で導入され、実験的な Distributed Data モジュールに依存するため、 "実験的" であるとみなされています。

プロキシ専用モード

ShardRegion アクターは、プロキシ専用モードで開始することもできます。つまり、エンティティを自身にホストせず、メッセージを適切な場所に委譲する方法を知っています。 ShardRegionClusterSharding.startProxy メソッドを使うとプロキシ専用モードで起動します。

不活性化

エンティティの状態が永続的な場合は、メモリ消費を減らすために使用されていないエンティティを停止することがあります。 これは、受信タイムアウト ( context.setReceiveTimeout ) を定義するなど、エンティティアクターのアプリケーション固有の実装によって行われます。 メッセージがエンティティにすでにエンキューされていて、エンティティ自体が停止した場合、メールボックスにエンキューされたメッセージは破棄されます。 このようにメッセージを失うことなくグレースフルな不活性化をサポートするために、エンティティのアクターは親の ShardShardRegion.Passivate を送ることができます。 Passivate にラップされた特定のメッセージはエンティティに返送され、エンティティは自分自身を停止するはずです。 Passivate を受信してからエンティティが終了するまでの間の受信メッセージは、 Shard によってバッファリングされます。 そのようにバッファされたメッセージは、その後、エンティティの新しいインスタンスに配信されます。

エンティティの再現

ClusterSharding.start を呼び出すとき ClusterShardingSettingsrememberEntities フラグを true に設定することで、各 Shard のエンティティのリストを永続化 (耐久性がある状態に) できます。 エンティティを覚えるように設定されている場合、 Shard が別のノードにリバランスされるか、クラッシュ後の復旧をするとき、それ以前にその Shard で起動していた全てのエンティティを再作成します。エンティティを永久に停止させるには、エンティティアクタの親に Passivate メッセージを送信しなければなりません。そうしないと、エンティティが撤退した後、設定で指定されたように自動的にそのエンティティが再起動します。

rememberEntities が false に設定されている場合、 Shard は、リバランス後やクラッシュからの復旧後にエンティティを自動的に再起動しません。 エンティティは、そのエンティティに宛てた最初のメッセージが Shard で受信されたときにのみ開始されます。 Passivate を使わずにエンティティを停止すると、エンティティは再起動されません。

エンティティ自身の状態は、 永続化 などで永続化されていなければ復元されないことに注意してください。

スーパービジョン

デフォルトの (再起動) 戦略の代わりに、エンティティアクターで別の supervisorStrategy を使う必要がある場合は、子のエンティティアクターに対する supervisorStrategy を定義する中間の親アクターを作成する必要があります。

そのようなスーパーバイザーを、エンティティアクターであるかのように開始します。

停止したエンティティは、新しいメッセージがそのエンティティに宛てられたときに再び開始されることに注意してください。

グレースフルシャットダウン

ShardRegion.GracefulShutdown メッセージを ShardRegion アクターに送って、 ShardRegion でホストされているすべてのシャードを切り離し、 ShardRegion アクターを止めることができます。 それがいつ完了したのかを知るために ShardRegion アクターを watch することができます。この期間中、コーディネーターによってリバランスがトリガーされたときと同じ方法で、他のリージョンがこれらのシャードのメッセージをバッファします。シャードが停止すると、コーディネーターはこれらのシャードを他の場所に割り当てます。

ShardRegion が終了したら、クラスターから leave し、 ActorSystem をシャットダウンするでしょう。

これを行う方法:

クラスターシャーディングの内部データの削除

クラスターシャーディングのコーディネーターは、Akka Persistence を使ってシャードの場所を保存します。このデータは、Akka のクラスタ全体を再起動するときに安全に削除できます。 削除するのはアプリケーションデータではないことに注意してください。

このデータを削除する akka.cluster.sharding.RemoveInternalClusterShardingData というユーティリティプログラムがあります。

警告

クラスターシャーディングを使用している Akka のクラスターノードを実行しているときは、このプログラムを決して使わないでください。 このプログラムを使用する前に、すべてのクラスターノードを停止してください。

例えば、auto-down を使ったためにネットワークの分断が発生した場合など、偶発的に 2 つのクラスターが同時に実行されてしまった場合に発生する可能性のある破損データによって、クラスターシャーディングのコーディネータが起動できない場合は、データを削除する必要があります。

警告

クラスターシャーディングと自動ダウン機能を一緒に使用しないでください 。クラスターが2つの別々のクラスターに分離してしまう可能性があります! その結果、 シャードとエンティティが復数 起動します。 Downing を参照してください。

このプログラムをスタンドアローンのJavaメインプログラムとして使用する:

java -classpath <jar files, including akka-cluster-sharding>
  akka.cluster.sharding.RemoveInternalClusterShardingData
    -2.3 entityType1 entityType2 entityType3

このプログラムは akka-cluster-sharding の JAR ファイルに含まれています。 通常のアプリケーションと同じクラスパスと設定で実行するのが最も簡単です。同様の方法で sbt または maven から実行できます。

エンティティタイプ名 ( ClusterShardingstart メソッドで使用するのと同じもの) を引数として指定します。

プログラムの最初の引数に -2.3 を指定すると、Akka 2.3.x の Cluster Sharding によって、異なる persistenceId で保存されたデータの削除を試みます。

依存関係

クラスターシャーディングを使うには、下記の依存関係をあなたのプロジェクトに追加する必要があります。

sbt:

"com.typesafe.akka" %% "akka-cluster-sharding" % "@version@" @crossString@

maven:

<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-cluster-sharding_@binVersion@</artifactId>
  <version>@version@</version>
</dependency>

設定

ClusterSharding の拡張機能は以下のプロパティで設定できます。 これらの設定プロパティは、 ActorSystem のパラメータで作成された場合、 ClusterShardingSettings を使って読み込まれます。 ClusterShardingSettings を修正することも、以下のようなレイアウトで別の設定セクションから作成することもできます。 ClusterShardingSettings は、 ClusterSharding 拡張の start メソッドに渡すパラメータです。つまり、それぞれのエンティティタイプを、必要に応じて異なる設定で構成できます。

独自のシャード割り当て戦略は、 ClusterSharding.start のオプションパラメータで定義することができます。 独自のシャード割り当て戦略の実装方法の詳細については、 ShardAllocationStrategy のAPIドキュメントを参照してください。

クラスターシャーディングの状態検査

クラスターの状態を検査するために、2 種類のリクエストが利用できます。

ShardRegion.GetShardRegionState は、リージョン内で実行されているシャードの識別子と、それぞれのシャードにどのエンティティが生存しているのかを示す ShardRegion.CurrentShardRegionState を返します。

ShardRegion.GetClusterShardingStats は、クラスター内のすべてのリージョンを照会し、各リージョンで実行されているシャードの識別子と、各シャード内に存在するエンティティの数を含んだ ShardRegion.ClusterShardingStats を返します。

これらのメッセージの目的は、テストと監視であり、メッセージを個々のエンティティに直接送信するための手段を提供するものではありません。

Contents