Akka Persistence EventStore Plugin is a plugin for Akka Persistence
that provides the following components:
- write journal store
- snapshot store
- standard persistence queries
This plugin stores data in an EventStore database and is based on the EventStore.Client client library for .net45 and the EventStore.ClientAPI.NetCore client library for .netstandard2.0.
From NuGet Package Manager
Install-Package Akka.Persistence.EventStore
From .NET CLI
dotnet add package Akka.Persistence.EventStore
To activate the journal plugin, add the following line to your HOCON config:
akka.persistence.journal.plugin = "akka.persistence.journal.eventstore"
This will run the journal with its default settings. The default settings can be changed with the configuration properties defined in your HOCON config:
- connection-string - connection string, as described here: https://eventstore.org/docs/dotnet-api/connecting-to-a-server/index.html#connection-string
- connection-name - connection name to tell the EventStore server who is connecting
- read-batch-size - when reading back events, how many to bring back at a time
- adapter - controls how the event data and metadata is stored and retrieved. See Adapter section below for more information.
akka.persistence {
journal {
plugin = "akka.persistence.journal.eventstore"
eventstore {
connection-string = "ConnectTo=tcp://admin:changeit@localhost:1113; HeartBeatTimeout=500"
connection-name = "Akka"
}
}
}
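As a minimal sketch of how this configuration might be wired into an application, the example below loads the HOCON shown above and starts a persistent actor whose events go to the configured journal. CounterActor, Program, BuildConfig, the persistence id "counter-1" and the actor system name "example" are hypothetical names chosen for illustration; the sketch assumes the Akka, Akka.Persistence and Akka.Persistence.EventStore packages are installed and an EventStore server is reachable at the connection string.

```csharp
using System;
using Akka.Actor;
using Akka.Configuration;
using Akka.Persistence;

// Hypothetical actor for illustration; not part of the plugin.
public class CounterActor : ReceivePersistentActor
{
    private int _count;

    public override string PersistenceId => "counter-1";

    public CounterActor()
    {
        // Rebuild state from events replayed out of the journal.
        Recover<string>(evt => _count++);

        // Persist each incoming string as an event, then update state.
        Command<string>(cmd => Persist(cmd, evt =>
        {
            _count++;
            Console.WriteLine($"Persisted '{evt}', count = {_count}");
        }));
    }
}

public static class Program
{
    // Same settings as the HOCON above, embedded as a verbatim string.
    public static Config BuildConfig() => ConfigurationFactory.ParseString(@"
        akka.persistence.journal {
            plugin = ""akka.persistence.journal.eventstore""
            eventstore {
                connection-string = ""ConnectTo=tcp://admin:changeit@localhost:1113; HeartBeatTimeout=500""
                connection-name = ""Akka""
            }
        }");

    public static void Main()
    {
        using (var system = ActorSystem.Create("example", BuildConfig()))
        {
            var counter = system.ActorOf(Props.Create(() => new CounterActor()), "counter");
            counter.Tell("incremented");

            // Keep the process alive long enough for the write to complete.
            Console.ReadLine();
        }
    }
}
```

Restarting the program should replay the previously stored events into CounterActor before it handles new commands, assuming the server kept the stream.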
Akka Persistence EventStore Plugin supports changing how data is stored and retrieved.
By default, it will serialize the data using Akka.Serialization.NewtonSoftJsonSerializer and populate the Metadata with the following information:
{
"persistenceId": "p-14",
"occurredOn": "2018-05-03T10:28:06.3437687-06:00",
"manifest": "",
"senderPath": "",
"sequenceNr": 5,
"writerGuid": "f8706bba-52a7-4326-a760-990c7f657c46",
"clrEventType": "System.String, System.Private.CoreLib"
}
If you are happy with the default serialization and metadata and only want to augment the data or metadata, for example to:
- Inspect event to add metadata
- Encrypt data
- Change the "type" stored in event store
you can inherit from DefaultAdapter and override the ToBytes and ToEvent methods:
public class AltAdapter : DefaultAdapter
{
protected override byte[] ToBytes(object @event, JObject metadata, out string type, out bool isJson)
{
var bytes = base.ToBytes(@event, metadata, out type, out isJson);
// Add some additional metadata, such as the CLR type name to help with deserialization
metadata["additionalProp"] = true;
// Do something additional with bytes, or do something to produce bytes
return bytes;
}
protected override object ToEvent(byte[] bytes, JObject metadata)
{
// Use the metadata to determine how to convert it back to an event, such as using the CLR type captured
// in ToBytes.
// Do something additional with bytes before handing it off to persistent actors.
return base.ToEvent(bytes, metadata);
}
}
You also have the option of creating a new implementation of Akka.Persistence.EventStore.IAdapter.
Everything is DIY in this case, including correct handling of internal Akka types if they appear in
events. Make use of the supplied Akka.Serialization.Serialization to help with this.
public class CustomAdapter : IAdapter
{
public CustomAdapter(Akka.Serialization.Serialization serialization)
{
}
public EventData Adapt(IPersistentRepresentation persistentMessage)
{
// Implement
}
public IPersistentRepresentation Adapt(ResolvedEvent resolvedEvent)
{
// Implement
}
}
Whichever direction you go, you will need to override the HOCON to use your new adapter:
akka.persistence {
journal {
plugin = "akka.persistence.journal.eventstore"
eventstore {
class = "Akka.Persistence.EventStore.Journal.EventStoreJournal, Akka.Persistence.EventStore"
connection-string = "ConnectTo=tcp://admin:changeit@localhost:1113; HeartBeatTimeout=500"
connection-name = "Akka"
read-batch-size = 500
adapter = "Your.Namespace.YourAdapter, Your.Assembly"
}
}
}
- The DefaultEventAdapter did not support internal Akka types (e.g. actor refs), and thus specs were failing. It has been updated to use Akka.Serialization.NewtonSoftJsonSerializer. If this does not affect you, or you have projections that depend on the old configuration, use the following keys:
akka.persistence.journal.eventstore.adapter = legacy
akka.persistence.snapshot-store.eventstore.adapter = legacy
- Adapter API has been changed to more correctly serialize the Sender actor ref.
Derived event adapter classes require a minor interface change, namely the removal of the
Func<string, IActorRef> argument. The Akka built-in Persistent serialization mechanism
uses System.Provider.ResolveActorRef and Akka.Serialization.Serialization.SerializedActorPath
to accomplish this. Legacy behavior is preserved by using System.ActorSelection rather
than the journal's actor context.
From NuGet Package Manager
Install-Package Akka.Persistence.EventStore.Query
From .NET CLI
dotnet add package Akka.Persistence.EventStore.Query
Please note that you need to configure the write journal anyway, since EventStore
Persistence Query reuses the connection from that journal. Also, it uses
IEventStoreConnection.SubscribeToStreamFrom, known as a catch-up subscription,
which means that each subscription will start a new connection under the hood.
This is how the official EventStore client library works.
To activate the query journal plugin, add the following line to your HOCON config:
akka.persistence.query.journal.plugin = "akka.persistence.query.journal.eventstore"
This will run the journal with its default settings. The default settings can be changed with the configuration properties defined in your HOCON config:
- write-plugin - Absolute path to the write journal plugin configuration entry that this query journal will connect to. If undefined (or "") it will connect to the default journal as specified by the akka.persistence.journal.plugin property.
- max-buffer-size - How many events to fetch in one query (replay) and keep buffered until they are delivered downstream. Default value is 500.
- auto-ack - Whether the query journal should automatically acknowledge events delivered downstream. Default is false. This is reserved for the next release, for the competing consumers query. See the docs for more details.
akka.persistence.query.journal.eventstore {
write-plugin = ""
max-buffer-size = 500
auto-ack = false
}
To use standard queries, please refer to the documentation about Persistence Query on the getakka.net website.
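As a rough sketch of what a standard query could look like with this plugin, the example below streams the events of one persistent actor through the Akka.NET Persistence Query API. The read journal class name EventStoreReadJournal and its namespace Akka.Persistence.EventStore.Query are assumptions, not confirmed by this document; QueryExample and the persistence id "p-14" are illustrative. It also assumes the write journal is configured as described earlier and that the Akka.Streams and Akka.Persistence.Query packages are installed.

```csharp
using System;
using Akka.Actor;
using Akka.Persistence.Query;
using Akka.Persistence.EventStore.Query; // assumed namespace of the read journal
using Akka.Streams;

public static class QueryExample
{
    public static void Run(ActorSystem system)
    {
        // Streams need a materializer to run.
        var materializer = ActorMaterializer.Create(system);

        // EventStoreReadJournal is an assumed class name; the string is the
        // query plugin id from the HOCON configuration above.
        var readJournal = PersistenceQuery.Get(system)
            .ReadJournalFor<EventStoreReadJournal>("akka.persistence.query.journal.eventstore");

        // Standard query: all events of one persistence id, from the start.
        readJournal
            .EventsByPersistenceId("p-14", 0L, long.MaxValue)
            .RunForeach(
                envelope => Console.WriteLine($"{envelope.SequenceNr}: {envelope.Event}"),
                materializer);
    }
}
```

Because each subscription opens its own connection, as noted above, it is probably worth keeping the number of concurrently running live queries small.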
- CompetingBy(string groupName, string streamName)