Migration Guide Akka Persistence (experimental) 2.3.3 to 2.3.4 (and 2.4.x)

Akka Persistence is an experimental module, which means that neither binary compatibility nor API stability is guaranteed for Persistence while it carries the experimental flag. The goal of this phase is to gather user feedback before the APIs are frozen in a major release.

defer renamed to deferAsync

The defer method in PersistentActor was renamed to deferAsync because its semantics match persistAsync more closely than persist, and the old name was causing confusion for users.

Its semantics remain unchanged.
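
As a rough illustration of why the new name fits, the sketch below uses deferAsync next to persistAsync, assuming a release in which the rename has already taken effect. The actor name, persistence id and event strings are made up for this example. The deferred handler runs after the handlers of the preceding persistAsync calls, but nothing is written to the journal for it.

```scala
import akka.persistence.PersistentActor

// Hypothetical actor, for illustration only.
class DeferExample extends PersistentActor {
  override def persistenceId = "defer-example"

  def receiveCommand = {
    case c: String =>
      sender() ! c
      persistAsync(s"evt-$c-1") { e => sender() ! e }
      persistAsync(s"evt-$c-2") { e => sender() ! e }
      // Not persisted; the handler is invoked after the two handlers above.
      deferAsync(s"evt-$c-3") { e => sender() ! e }
  }

  def receiveRecover = {
    case _ => // the deferred value is never replayed
  }
}
```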

Renamed EventsourcedProcessor to PersistentActor

EventsourcedProcessor is now deprecated and replaced by PersistentActor, which provides the same (and more) API. Migrating to 2.4.x is as simple as changing all your classes to extend PersistentActor.

Replace all classes like:

class DeprecatedProcessor extends EventsourcedProcessor {
  override def processorId = "id"
  /*...*/
}

To extend PersistentActor:

class NewPersistentProcessor extends PersistentActor {
  def persistenceId = "id"
  /*...*/
}

Read more about the persistent actor in the documentation for Scala and documentation for Java.

Changed processorId to (abstract) persistenceId

In Akka Persistence 2.3.3 and earlier, the main building blocks of applications were Processors. Persistent messages, as well as processors, implemented the processorId method to identify which persistent entity a message belonged to.

This concept remains the same in Akka 2.3.4, but processorId has been renamed to persistenceId because Processors will be removed, and persistent messages can be used from different classes, not only PersistentActor (Views, directly from Journals, etc.).

Please note that persistenceId is abstract in the new API classes (PersistentActor and PersistentView), and we do not provide a default (actor-path derived) value for it as we did for processorId. The rationale behind this change is a stricter decoupling of your Actor hierarchy from the logical question of which persistent entity an actor represents. A longer discussion on this subject can be found in issue #15436 on GitHub.

In case you want to preserve the old behavior of providing the actor's path as the default persistenceId, you can easily implement it yourself either as a helper trait or simply by overriding persistenceId as follows:

override def persistenceId = self.path.toStringWithoutAddress
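
The helper trait variant could look roughly like the following sketch. The trait name PathDerivedPersistenceId and the LegacyStyleActor class are hypothetical and not part of Akka; only the body of persistenceId comes from the line above.

```scala
import akka.persistence.PersistentActor

// Hypothetical helper trait (not provided by Akka): restores the old
// actor-path derived default for every actor that mixes it in.
trait PathDerivedPersistenceId extends PersistentActor {
  override def persistenceId: String = self.path.toStringWithoutAddress
}

// Example use: no explicit persistenceId is needed here.
class LegacyStyleActor extends PathDerivedPersistenceId {
  def receiveCommand = {
    case cmd => persist(cmd) { e => sender() ! e }
  }

  def receiveRecover = {
    case _ => // logic for handling replay
  }
}
```

Keep in mind that with this approach the persistenceId changes whenever the actor is moved or renamed in the hierarchy, which is the coupling the new API is trying to avoid.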

The renamed method is also provided on the already deprecated classes (Channels), so you can simply apply a global rename of processorId to persistenceId.

Removed Processor in favour of extending PersistentActor with persistAsync

Processor is deprecated since 2.3.4 and will be removed in 2.4.x. Its semantics are replicated in PersistentActor in the form of an additional persist method: persistAsync.

In essence, the difference between persist and persistAsync is that the former stashes all incoming commands until all persist callbacks have been processed, whereas the latter does not stash any commands. persistAsync should be used when responsiveness matters more than strict consistency: the Actor can keep processing incoming commands even though not all previous events have been handled.

When persist and persistAsync are used together in the same PersistentActor, the persist logic wins over the async version, so all guarantees concerning persist still hold. This will, however, lower the throughput.

Now deprecated code using Processor:

class OldProcessor extends Processor {
  override def processorId = "user-wallet-1337"

  def receive = {
    case Persistent(cmd) => sender() ! cmd
  }
}

Replacement code, with the same semantics, using PersistentActor:

class NewProcessor extends PersistentActor {
  override def persistenceId = "user-wallet-1337"

  def receiveCommand = {
    case cmd =>
      persistAsync(cmd) { e => sender() ! e }
  }

  def receiveRecover = {
    case _ => // logic for handling replay
  }
}

It is worth pointing out that using sender() inside the persistAsync callback block is valid, and does not suffer any of the problems Futures have when closing over the sender reference.

Using the PersistentActor instead of Processor also shifts the responsibility of deciding if a message should be persisted to the receiver instead of the sender of the message. Previously, using Processor, clients would have to wrap messages as Persistent(cmd) manually, as well as have to be aware of the receiver being a Processor, which didn't play well with transparency of the ActorRefs in general.

How to migrate data from Processor to PersistentActor

The recommended approach for migrating persisted messages from a Processor to events that can be replayed by a PersistentActor is to write a custom migration tool with a PersistentView and a PersistentActor. Connect the PersistentView to the persistenceId of the old Processor to replay the stored persistent messages. Send the messages from the view to a PersistentActor with another persistenceId. There you can transform the old messages to domain events that the real PersistentActor will be able to understand. Store the events with persistAsync.

Note that you can implement back-pressure between the writing PersistentActor and the reading PersistentView by turning off auto-update in the view and sending custom Update messages to the view with a limited replayMax value.
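
The approach described above might be sketched as follows. This is only an outline under stated assumptions: the class names MigrationView and MigrationWriter, the LegacyMessageMigrated event and the constructor parameters are hypothetical, and the transformation into real domain events is left as a placeholder.

```scala
import akka.actor.ActorRef
import akka.persistence.{ PersistentActor, PersistentView }

// Hypothetical domain event; replace with whatever the real
// PersistentActor expects to see during replay.
final case class LegacyMessageMigrated(payload: Any)

// Replays the messages stored by the old Processor and forwards them.
class MigrationView(oldProcessorId: String, writer: ActorRef) extends PersistentView {
  override def persistenceId: String = oldProcessorId
  override def viewId: String = oldProcessorId + "-migration-view"

  // Auto-update is turned off so that further replay is driven
  // by explicit Update messages sent to this view.
  override def autoUpdate: Boolean = false

  def receive = {
    case msg if isPersistent => writer ! msg // played back from the journal
  }
}

// Stores the transformed messages under a new persistenceId.
class MigrationWriter(newPersistenceId: String) extends PersistentActor {
  override def persistenceId: String = newPersistenceId

  def receiveCommand = {
    case oldMessage =>
      persistAsync(LegacyMessageMigrated(oldMessage)) { _ => () }
  }

  def receiveRecover = {
    case _ => // nothing to rebuild in the migration tool itself
  }
}
```

A driver would then send messages such as Update(replayMax = 100) to the view in batches, for example after the writer has reported progress. The batch size of 100 is an arbitrary illustration, and the sketch does not cover the replay the view performs when it starts.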

Removed deleteMessage

deleteMessage is deprecated and will be removed. With a command sourced Processor, the command was stored before it was received and could be validated, so there was a reason to remove faulty commands: to avoid repeating the error during replay. With PersistentActor you can always validate the command before persisting, and therefore the stored event (or command) should always be valid for replay.

deleteMessages can still be used for pruning of all messages up to a sequence number.
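
To illustrate the validate-before-persist idea, here is a minimal sketch of a wallet actor. The message and event types (Deposit, Withdraw, Deposited, Withdrawn, InsufficientFunds) are hypothetical names invented for this example.

```scala
import akka.persistence.PersistentActor

// Hypothetical protocol, for illustration only.
final case class Deposit(amount: Long)
final case class Withdraw(amount: Long)
final case class Deposited(amount: Long)
final case class Withdrawn(amount: Long)
case object InsufficientFunds

class Wallet extends PersistentActor {
  override def persistenceId = "user-wallet-1337"

  private var balance = 0L

  def receiveCommand = {
    case Deposit(amount) if amount > 0 =>
      persist(Deposited(amount)) { evt => balance += evt.amount }

    // The command is validated first; only a valid event reaches the journal.
    case Withdraw(amount) if amount > 0 && amount <= balance =>
      persist(Withdrawn(amount)) { evt =>
        balance -= evt.amount
        sender() ! evt
      }

    case Withdraw(_) =>
      sender() ! InsufficientFunds // rejected, nothing is stored
  }

  def receiveRecover = {
    case Deposited(amount) => balance += amount
    case Withdrawn(amount) => balance -= amount
  }
}
```

Because invalid commands never reach the journal, replay only sees events that were valid when they were stored, so there is nothing to delete afterwards.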

Renamed View to PersistentView, which receives plain messages (Persistent() wrapper is gone)

Views used to receive messages wrapped as Persistent(payload, seqNr). This is no longer the case: views now receive the payload as a message from the Journal directly. The rationale is that the wrapper approach was inconsistent with the other Akka Persistence APIs and was not easily "discoverable" (you had to know you would be getting this Persistent wrapper).

Instead, since 2.3.4, views get plain messages and can use additional methods provided by PersistentView to identify whether a message was sent from the Journal (that is, played back to the view). So if you had code like this:

class AverageView extends View {
  override def processorId = "average-view"

  def receive = {
    case Persistent(msg, seqNr) =>
      // from Journal

    case msg =>
      // from user-land
  }
}

You should update it to extend PersistentView instead:

class AverageView extends PersistentView {
  override def persistenceId = "persistence-sample"
  override def viewId = "persistence-sample-average"

  def receive = {
    case msg if isPersistent =>
      // from Journal
      val seqNr = lastSequenceNr // in case you require the sequence number

    case msg =>
      // from user-land
  }
}

If you need the sequence number the view is currently looking at, use the lastSequenceNr method. When isPersistent returns true, it is the sequence number of the current message; otherwise it yields the sequence number of the last persistent message that this view was updated with.

Removed Channel and PersistentChannel in favour of AtLeastOnceDelivery trait

One of the primary tasks of a Channel was to de-duplicate messages that were sent from a Processor during recovery. Performing external side effects during recovery is not encouraged with event sourcing and therefore the Channel is not needed for this purpose.

The Channel and PersistentChannel also performed at-least-once delivery of messages, but they did not free a sending actor from implementing retransmission or timeouts, since the acknowledgement from the channel is needed to guarantee safe hand-off. Therefore at-least-once delivery is now provided by a new AtLeastOnceDelivery trait that is mixed in to the persistent actor on the sending side.

Read more about at-least-once delivery in the documentation for Scala and documentation for Java.
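
As a rough sketch of what the sending side can look like with the trait mixed in, consider the following. The message and event types (Msg, Confirm, MsgSent, MsgConfirmed) and the actor name are illustrative, and the destination is assumed to reply with Confirm(deliveryId) once it has handled a Msg.

```scala
import akka.actor.ActorPath
import akka.persistence.{ AtLeastOnceDelivery, PersistentActor }

// Illustrative protocol between sender and destination.
final case class Msg(deliveryId: Long, s: String)
final case class Confirm(deliveryId: Long)

sealed trait Evt
final case class MsgSent(s: String) extends Evt
final case class MsgConfirmed(deliveryId: Long) extends Evt

class ReliableSender(destination: ActorPath)
  extends PersistentActor with AtLeastOnceDelivery {

  override def persistenceId = "reliable-sender"

  def receiveCommand = {
    case s: String           => persist(MsgSent(s))(updateState)
    case Confirm(deliveryId) => persist(MsgConfirmed(deliveryId))(updateState)
  }

  def receiveRecover = {
    case evt: Evt => updateState(evt)
  }

  def updateState(evt: Evt): Unit = evt match {
    // deliver keeps re-sending until the delivery is confirmed
    case MsgSent(s) =>
      deliver(destination, deliveryId => Msg(deliveryId, s))

    case MsgConfirmed(deliveryId) =>
      confirmDelivery(deliveryId)
  }
}
```

Both the intent to send and the confirmation are persisted as events, so unconfirmed deliveries can be resumed after recovery. The destination may see a message more than once and has to tolerate that.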

Default persistence plugins

Previously, the default akka.persistence.journal.plugin was set to the LevelDB journal akka.persistence.journal.leveldb, and the default akka.persistence.snapshot-store.plugin was set to the local file-system snapshot store akka.persistence.snapshot-store.local. Now the default journal and snapshot-store plugins are set to the empty string "" in the persistence extension reference.conf and require explicit configuration via an override in the user's application.conf. This change was needed to decouple the persistence extension from the LevelDB dependency and to support multiple plugin configurations. Please see the persistence extension reference.conf for details.
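
For an application that wants to keep the previous defaults, an override along the following lines in application.conf should be enough. This is a minimal fragment that uses only the plugin ids named above.

```hocon
# application.conf: restore the plugins that used to be the defaults
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
```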

Converted LevelDB to an optional dependency

The persistence extension uses LevelDB based plugins for its own development and keeps the related code in the published jar. Previously LevelDB was a compile scope dependency; it is now an optional;provided dependency. To continue using the LevelDB based persistence plugins, user projects must now include an additional explicit dependency declaration for the LevelDB artifacts. This change allows production Akka deployments to avoid provisioning LevelDB. Please see the persistence extension reference.conf for details.
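
In an sbt build, the explicit declaration might look like the fragment below. The version numbers are assumptions made for illustration; check which versions match your Akka release before copying them.

```scala
// build.sbt: LevelDB artifacts needed by the LevelDB based plugins
// (versions are assumptions, verify them against your Akka release)
libraryDependencies ++= Seq(
  "org.iq80.leveldb"          % "leveldb"        % "0.7",
  "org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8"
)
```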
