Vanilla Akka Persistence
Akka persistence involves three main concepts:
- Commands
- Events
- State
Consider the following two command objects:
case class WriteCommand(data: String)
case object PrintCommand
We are going to build an actor which responds to two sorts of commands: 'Write!' and 'Print!'. When it receives a WriteCommand, the data in the command is persisted; when it receives a PrintCommand, it prints all the saved data to the console.
On receipt of a WriteCommand our actor will generate and persist a DataWriteEvent.
case class DataWriteEvent(data: String)
The actor will maintain an internal representation of the data it has persisted ('state'). This is the data that will be printed to the console on receipt of a PrintCommand.
The state has an updated method which is called with a DataWriteEvent and returns the new state, leaving the old one untouched.
case class ExampleState(events: List[String] = Nil) {
  def updated(evt: DataWriteEvent): ExampleState = copy(evt.data :: events)
  def size: Int = events.length
  override def toString: String = events.reverse.toString
}
Wiring up the actor:
import akka.persistence.{PersistentActor, SnapshotOffer}

class ExamplePersistentActor extends PersistentActor {
  override def persistenceId = "example-id"

  var state = ExampleState()

  def updateState(event: DataWriteEvent): Unit =
    state = state.updated(event)

  def numberOfEvents: Int = state.size

  // Handles live commands.
  val receiveCommand: Receive = {
    case WriteCommand(data) ⇒
      persist(DataWriteEvent(s"$data-$numberOfEvents")) { event ⇒
        // Runs only after the event has been written to the journal.
        updateState(event)
        context.system.eventStream.publish(event)
      }
    case PrintCommand ⇒ println(state)
  }

  // Rebuilds the state from the journal (or a snapshot) on startup.
  val receiveRecover: Receive = {
    case evt: DataWriteEvent ⇒ updateState(evt)
    case SnapshotOffer(_, snapshot: ExampleState) ⇒ state = snapshot
  }
}
And finally to run:
import akka.actor.{ActorSystem, Props}

object Example extends App {
  val system = ActorSystem()
  val actor = system.actorOf(Props(new ExamplePersistentActor))

  actor ! WriteCommand("Ruby")
  actor ! PrintCommand
}
If I run it once I get the following:
List(Ruby-0)
And running it one more time:
List(Ruby-0, Ruby-1)
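The second run shows two entries because the event stored by the first run is replayed through receiveRecover before the new command is handled. Here is a minimal sketch of that replay in plain Scala, with no Akka involved. The ReplaySketch object and the in-memory journal list are hypothetical stand-ins for illustration, not how the real journal is stored.

```scala
// Plain-Scala sketch of event replay; no Akka involved.
// ReplaySketch, journal, recovered and afterSecondWrite are hypothetical names.
object ReplaySketch {
  case class DataWriteEvent(data: String)

  case class ExampleState(events: List[String] = Nil) {
    def updated(evt: DataWriteEvent): ExampleState = copy(evt.data :: events)
    def size: Int = events.length
    override def toString: String = events.reverse.toString
  }

  // Stand-in for what the journal holds after the first run.
  val journal: List[DataWriteEvent] = List(DataWriteEvent("Ruby-0"))

  // Recovery: fold every stored event into the empty state, oldest first.
  val recovered: ExampleState =
    journal.foldLeft(ExampleState())((state, evt) => state.updated(evt))

  // Second run: the new event is numbered from the recovered state's size.
  val afterSecondWrite: ExampleState =
    recovered.updated(DataWriteEvent(s"Ruby-${recovered.size}"))
}
```

Printing ReplaySketch.afterSecondWrite gives List(Ruby-0, Ruby-1), matching the output of the second run.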
Moving on...
Receive is a type alias for PartialFunction[Any, Unit], which is pretty damn generic: any object at all can be sent to the actor and the compiler will not complain. Akka Typed, the full motivation for which is beyond the scope of this post, basically gives compile-time feedback on the correctness of your actor interactions.
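To see how little the untyped signature promises, here is a small sketch in plain Scala that uses the same PartialFunction[Any, Unit] shape without any actor. The UntypedReceiveSketch object and its log variable are hypothetical names used only for this illustration.

```scala
// Plain-Scala sketch; UntypedReceiveSketch and log are hypothetical names.
object UntypedReceiveSketch {
  // Same shape as an untyped actor's Receive.
  type Receive = PartialFunction[Any, Unit]

  case class WriteCommand(data: String)
  case object PrintCommand

  var log: List[String] = Nil

  val receive: Receive = {
    case WriteCommand(data) => log = data :: log
    case PrintCommand       => println(log.reverse)
  }

  // Any value type-checks as a message; only a runtime check tells us
  // whether the handler actually knows what to do with it.
  val handlesWrite: Boolean = receive.isDefinedAt(WriteCommand("Ruby"))
  val handlesNumber: Boolean = receive.isDefinedAt(42)
}
```

Both isDefinedAt calls compile, even though the second message has nothing to do with our commands. That is the gap the typed API closes.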
Sexy Types
This time we define a 'behaviour' in terms of Command, Event and State. This means that we need our 'Write!' and 'Print!' commands to share a type hierarchy:
sealed trait TypedExampleCommand extends Serializable
case class TypedExampleWriteCommand(data: String) extends TypedExampleCommand
case object TypedExamplePrint extends TypedExampleCommand
Our event and state look pretty much the same as they did before:
case class TypedEvent(data: String)

case class TypedExampleState(events: List[String] = Nil) {
  def updated(evt: TypedEvent): TypedExampleState = copy(evt.data :: events)
  def size: Int = events.length
  override def toString: String = events.reverse.toString
}
Our behaviour is defined with four parameters:
1. An id for the actor
2. An initial state
3. A function for converting commands to events (wrapped in 'Effects') and handling side effects
4. A function for updating the state given an event
object TypedExample {
  def behavior: Behavior[TypedExampleCommand] =
    PersistentActor.immutable[TypedExampleCommand, TypedEvent, TypedExampleState](
      persistenceId = "example-id",
      initialState = new TypedExampleState,
      commandHandler = PersistentActor.CommandHandler { (_, state, cmd) ⇒
        cmd match {
          case TypedExampleWriteCommand(data) =>
            Effect.persist(TypedEvent(s"$data ${state.size}"))
          case TypedExamplePrint =>
            println(state)
            Effect.none
        }
      },
      eventHandler = (state, event) ⇒
        event match {
          case TypedEvent(_) => state.updated(event)
        }
    )
}
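The split between parameters 3 and 4 is easier to see with the Akka machinery stripped away. The sketch below models the two handlers as plain functions. Everything in it is hypothetical: the Effect trait is a toy stand-in written for this illustration, not Akka's Effect, and the run function only imitates what the library does between the two handlers.

```scala
// Plain-Scala model of the command handler / event handler split.
// All names here are hypothetical; Effect is NOT Akka's Effect type.
object HandlerSketch {
  sealed trait Command
  case class Write(data: String) extends Command
  case object Print extends Command

  case class Event(data: String)

  case class State(events: List[String] = Nil) {
    def size: Int = events.length
  }

  // Toy effect: either persist one event or do nothing.
  sealed trait Effect
  case class Persist(event: Event) extends Effect
  case object NoEffect extends Effect

  // Parameter 3: decide what should happen; the state is read, never changed.
  def commandHandler(state: State, cmd: Command): Effect = cmd match {
    case Write(data) => Persist(Event(s"$data ${state.size}"))
    case Print       => NoEffect
  }

  // Parameter 4: the only place where a new state is produced.
  def eventHandler(state: State, event: Event): State =
    state.copy(events = event.data :: state.events)

  // Imitates the glue between the handlers: apply an event only if one was persisted.
  def run(state: State, cmd: Command): State = commandHandler(state, cmd) match {
    case Persist(event) => eventHandler(state, event)
    case NoEffect       => state
  }
}
```

Because the event handler is the only function that produces a new state, the same function can be reused to rebuild the state from stored events on recovery.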
Now to run it:
object TypedExampleMain extends App {
  import akka.actor.typed.scaladsl.adapter._

  val system = akka.actor.ActorSystem("system")
  val actor = system.spawn(TypedExample.behavior, "example")

  actor ! TypedExampleWriteCommand("Hi Bella")
  actor ! TypedExamplePrint
}
Again running once gives:
List(Hi Bella 0)
And twice:
List(Hi Bella 0, Hi Bella 1)
Typed vs Untyped
Let's define a case object:
case object FluffyKittenFace
If we send a FluffyKittenFace message to our untyped persistent actor then the message gets swallowed: neither case in receiveCommand matches, so it is treated as unhandled. The original code continues to work but nothing appears to happen.
However if we try sending a FluffyKittenFace to our typed actor:
Red squiggle of doom!
We helpfully get a compile time error saying that our actor doesn't know what to do with all that fluff!
FluffyKittenFace is going to have to find another actor to process her message, or become a TypedExampleCommand.
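The same compile-time check can be reproduced without Akka. In the sketch below, TypedRef is a hypothetical stand-in for a typed actor reference that only accepts messages of type T; it is not part of the Akka API.

```scala
// Plain-Scala sketch; TypedRef and TypedSendSketch are hypothetical names.
object TypedSendSketch {
  sealed trait TypedExampleCommand
  case class TypedExampleWriteCommand(data: String) extends TypedExampleCommand
  case object TypedExamplePrint extends TypedExampleCommand

  case object FluffyKittenFace

  // Stand-in for a typed actor reference: tell only accepts a T.
  final class TypedRef[T] {
    private var received: List[T] = Nil
    def tell(msg: T): Unit = received = msg :: received
    def inbox: List[T] = received.reverse
  }

  val ref = new TypedRef[TypedExampleCommand]
  ref.tell(TypedExampleWriteCommand("Hi Bella"))
  ref.tell(TypedExamplePrint)

  // Uncommenting the next line stops the file from compiling (type mismatch),
  // because FluffyKittenFace is not a TypedExampleCommand.
  // ref.tell(FluffyKittenFace)
}
```

Making FluffyKittenFace extend TypedExampleCommand would let the commented-out line compile, at which point the command handler's match would need a case for it.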
Zee Cod:
https://github.com/polyglotpiglet/akka-typed-examples
Note that these examples were built against Akka master rather than a release build. They should work from 2.5.9 onwards.