lerna-stack / akka-entity-replication
Akka extension for fast recovery from failure with replicating stateful entity on multiple nodes in Cluster
License: Apache License 2.0
This is required for #201.

rollback-tool-cassandra (v2.2.0) deletes all tagged events (events of tag_view) and rebuilds them:
https://github.com/lerna-stack/akka-entity-replication/blob/v2.2.0/rollback-tool-cassandra/src/main/scala/lerna/akka/entityreplication/rollback/cassandra/CassandraPersistentActorRollback.scala#L76-L77. The old events deletion feature deletes source events, which are required to rebuild tag_view completely. The rollback of tag_view must therefore be achieved without deleting all tagged events. This rollback might be achieved by updating the tag-related tables (tag_view, tag_write_progress, tag_scanning): https://doc.akka.io/docs/akka-persistence-cassandra/1.0.5/journal.html#schema
From the Raft thesis (https://github.com/ongardie/dissertation), section 3.6.1 (Election Restriction), RaftActor should accept RequestVote(lastLogIndex < log.lastLogIndex, lastLogTerm > log.lastLogTerm).
The following is expected:
* RequestVote.lastLogTerm < log.lastLogTerm: deny
* RequestVote.lastLogTerm > log.lastLogTerm:
  * RequestVote.lastLogIndex > log.lastLogIndex: accept
  * RequestVote.lastLogIndex = log.lastLogIndex: accept
  * RequestVote.lastLogIndex < log.lastLogIndex: accept
* RequestVote.lastLogTerm = log.lastLogTerm:
  * RequestVote.lastLogIndex > log.lastLogIndex: accept
  * RequestVote.lastLogIndex = log.lastLogIndex: accept
  * RequestVote.lastLogIndex < log.lastLogIndex: deny
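The decision table above can be sketched as follows (a minimal sketch with hypothetical signatures, not the actual RaftActor code): a strictly newer lastLogTerm always wins, an equal term wins only if lastLogIndex is at least as large.

```scala
// Hypothetical sketch of the election-restriction check; the names and
// signature are illustrative, not the library's API.
object VoteDecision {
  def shouldGrantVote(
      requestLastLogTerm: Long,
      requestLastLogIndex: Long,
      logLastTerm: Long,
      logLastIndex: Long
  ): Boolean =
    if (requestLastLogTerm > logLastTerm) true // newer term: accept regardless of index
    else if (requestLastLogTerm == logLastTerm) requestLastLogIndex >= logLastIndex
    else false // older term: deny
}
```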
The current implementations are the following:
Because the Entity is expected to make each new decision (i.e., emit a new event) only after all previously emitted events have been applied, the Entity does not process commands while uncommitted events exist.

There are cases where the Entity processes a command even though uncommitted events exist, and emits a new event before all previously emitted events have been applied. If the Leader-side RaftActor crashes while an event is being replicated, an Entity belonging to the new Leader may process a ProcessCommand while uncommitted events still exist.

Example) (TODO: use mermaid)

This occurs when all of the following three conditions are met:
* A ProcessCommand reaches the Entity
* Replication of the event completes after the ProcessCommand has completed

A snapshot is saved atomically per entity and index by SnapshotStore.
InstallSnapshot and compaction are processes that save snapshots of multiple entities.
They can conflict, and that can cause inconsistency of entity state.

AppendEntries from RaftActor on node B

LeaderData.clients associates a ClientContext for entity X with index=5.

Similar to #155
[JVM-4] === [Follower] append AppendEntries(NormalizedShardId(146),Term(1),replica-group-2,0,Term(0),Vector(LogEntry(1, EntityEvent(None,NoOp), Term(1)), LogEntry(2, EntityEvent(Some(NormalizedEntityId(test)),Incremented(1)), Term(1)), LogEntry(3, EntityEvent(Some(NormalizedEntityId(test)),Incremented(1)), Term(1)), LogEntry(4, EntityEvent(Some(NormalizedEntityId(test)),Incremented(1)), Term(1)), LogEntry(5, EntityEvent(Some(NormalizedEntityId(test)),Incremented(1)), Term(1)), LogEntry(6, EntityEvent(Some(NormalizedEntityId(test)),Incremented(1)), Term(1)), LogEntry(7, EntityEvent(Some(NormalizedEntityId(test)),Incremented(1)), Term(1)), LogEntry(8, EntityEvent(Some(NormalizedEntityId(test)),Incremented(1)), Term(1)), LogEntry(9, EntityEvent(Some(NormalizedEntityId(test)),Incremented(1)), Term(1)), LogEntry(10, EntityEvent(Some(NormalizedEntityId(test)),Incremented(1)), Term(1)), LogEntry(11, EntityEvent(Some(NormalizedEntityId(test)),Incremented(1)), Term(1)), LogEntry(12, EntityEvent(Some(NormalizedEntityId(test)),Incremented(1)), Term(1))),12) ===
[JVM-4] [Follower] compaction started (logEntryIndex: 12, number of entities: 1)
[JVM-4] === [Follower] append AppendEntries(NormalizedShardId(146),Term(1),replica-group-2,12,Term(1),Vector(LogEntry(13, EntityEvent(Some(NormalizedEntityId(test)),Incremented(1)), Term(1)), LogEntry(14, EntityEvent(Some(NormalizedEntityId(test)),Incremented(1)), Term(1)), LogEntry(15, EntityEvent(Some(NormalizedEntityId(test)),Incremented(1)), Term(1))),14) ===
[JVM-4] === [Follower] applying Incremented(1) to ReplicationActor ===
[JVM-4] === [Follower] applying Incremented(1) to ReplicationActor ===
[JVM-4] === [Follower] append AppendEntries(NormalizedShardId(146),Term(1),replica-group-2,15,Term(1),Vector(),15) ===
[JVM-4] === [Follower] applying Incremented(1) to ReplicationActor ===
[JVM-4] state updated: State(1)
[JVM-4] state updated: State(2)
[JVM-4] state updated: State(3)
[JVM-4] state updated: State(4)
[JVM-4] state updated: State(5)
[JVM-4] state updated: State(6)
[JVM-4] state updated: State(7)
[JVM-4] state updated: State(8)
[JVM-4] state updated: State(9)
[JVM-4] state updated: State(10)
[JVM-4] state updated: State(11)
[JVM-4] state updated: State(12)
[JVM-4] state updated: State(13)
[JVM-4] state updated: State(14)
[JVM-4] save snapshot: EntitySnapshot(EntitySnapshotMetadata(NormalizedEntityId(test),12),EntityState(State(14)))
branch for reproduction: 6c10bb7...sandbox/consistency-leak-entity
* ReplicatedEntity manages its own recovery
* ReplicatedEntity can produce a snapshot to RaftActor for the first TakeSnapshot
* ReplicatedEntity drops events of RecoveryState
* ReplicatedEntity ignores TakeSnapshot while recovering
* ReplicatedEntity cannot produce a snapshot to RaftActor for the first TakeSnapshot
Replica or ReplicationSucceeded in its mailbox) runs its command handler and will send a Replicate message to the leader.

ReplicationFailed message in some cases:
* ReplicationFailed message against a Replicate message sent based on a non-latest state.
* ReplicationFailed message if the LogEntry for the previous entity's replicate message is uncommitted.
* ReplicationFailed message if the LogEntry for the previous entity's replicate message is committed, but not applied to the entity. There is a subtle timing in which the entity sends a Replicate based on a non-latest state after the leader has sent the LogEntry to the entity.

An Entity sends a Replicate message containing its lastAppliedLogEntryIndex. A leader will reply with ReplicationFailed if the Leader's Raft log (ReplicatedLog) contains an entry that satisfies all of the following conditions:
* Its entity ID (LogEntry.event.entityId) is equal to Replicate.entityId
* Its index is beyond the last applied index (lastAppliedLogEntryIndex) of the Replicate message.

#45 affects this issue.
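The leader-side check described above can be sketched roughly as follows. This is a hypothetical sketch, assuming the check is "an entry for the same entity exists with an index beyond the entity's lastAppliedLogEntryIndex"; the names are illustrative, not the library's API.

```scala
// Hypothetical sketch of the leader deciding to reply ReplicationFailed.
object ReplicationFailedCheck {
  final case class Entry(index: Long, entityId: String)

  // True if the leader's log already holds an entry for this entity that
  // the entity has not yet applied, i.e. the Replicate was sent from a
  // non-latest state.
  def shouldFail(log: Seq[Entry], entityId: String, lastAppliedLogEntryIndex: Long): Boolean =
    log.exists(e => e.entityId == entityId && e.index > lastAppliedLogEntryIndex)
}
```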
disabled is the default) since it only supports Akka Persistence Cassandra.

akka-entity-replication deletes old events and snapshots automatically (when it saves events or snapshots).

akka-entity-replication doesn't delete tagged events (tag_view):
TimeWindowCompactionStrategy
deleteMessages and deleteSnapshots APIs:
RaftActor doesn't track the progress of the event-sourcing feature. If CommitLogStore is not available for a long period (due to, for example, journal failures), the Event Sourcing feature will halt, and it won't recover automatically. This halt seldom happens since CommitLogStore retries saving the committed event. Users can recover from this halt by restarting each node (a rolling update is possible) unless compaction deletes such events.

If compaction deletes committed entries that have not been persisted to CommitLogStore, recovery is difficult since RaftActor doesn't know which committed entries to persist to CommitLogStore.
For event application to the entity actor, only the events existing in the RaftActor's memory are used. At that time, missing events were not checked for.
In Akka Persistence, even if the snapshot is lost, there is no problem as long as the event is persisted.
Committed log entries are not deleted except by compaction
Committed log entries can be deleted by snapshot synchronization
(It is assumed that the number of nodes is 3)

1. The leader (m1) stops (or crashes) while the follower (m2) is synchronizing snapshots
2. m2 completes the synchronization and then resets its log
3. Another follower (m3) stops (or crashes)
4. m1 becomes the new leader

When the entity processes a command as follows:
case TellAndReplicateAEventAndPassivate(inc: Int) =>
Effect.replicate(SerializableEvent(inc)).thenPassivate().thenNoReply()
java.lang.IllegalStateException: Shard received unexpected message is thrown:
[info] java.lang.IllegalStateException: Shard received unexpected message [Passivate(akka://ReplicatedEntityBehaviorTestKitSpec/system/test/$y,PoisonPill)]
[info] at lerna.akka.entityreplication.typed.internal.testkit.ReplicatedEntityBehaviorTestKitImpl.handleReplicatedEvent(ReplicatedEntityBehaviorTestKitImpl.scala:117)
[info] at lerna.akka.entityreplication.typed.internal.testkit.ReplicatedEntityBehaviorTestKitImpl.runCommand(ReplicatedEntityBehaviorTestKitImpl.scala:46)
[info] at lerna.akka.entityreplication.typed.testkit.ReplicatedEntityBehaviorTestKitSpec.$anonfun$new$9(ReplicatedEntityBehaviorTestKitSpec.scala:78)
[info] at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info] at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info] at org.scalatest.Transformer.apply(Transformer.scala:22)
[info] at org.scalatest.Transformer.apply(Transformer.scala:20)
[info] at org.scalatest.FlatSpecLike$$anon$5.apply(FlatSpecLike.scala:1682)
[info] at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
For #201

CassandraPersistenceQueries.currentEventsBefore iterates events on all partitions (from + 1 to 0) but can skip deleted partitions by using the highest deleted sequence number (called deleted_to), which reduces some read-events queries.
CassandraPersistenceQueries.currentEventsBefore:

CassandraPersistenceQueries can fetch the highest deleted sequence number (called deleted_to) from the metadata table (akka.metadata).

deleted_to is described as:
RaftActor sends NoOp to ReplicationActor; however, receiveReplica may not handle the event because users are not interested in it.

Users see a lot of warnings produced by the following method:

private[this] def innerApplyEvent(event: Any, logEntryIndex: LogEntryIndex): Unit = {
  if (logEntryIndex > lastAppliedLogEntryIndex) {
    receiveReplica.applyOrElse[Any, Unit](
      event,
      event => {
        log.warning("unhandled {} by receiveReplica", event)
      },
    )
    lastAppliedLogEntryIndex = logEntryIndex
  }
}

(akka-entity-replication/ReplicationActor.scala at v1.0.0 · lerna-stack/akka-entity-replication)
('1d = 6h)
extractShardId and extractEntityId)

For #201
CassandraPersistenceQueries doesn't yet support deleted partitions (which are empty). CassandraPersistenceQueries can handle one empty partition but not two or more consecutive empty partitions.

For example, CassandraPersistenceQueries.findHighestSequenceNr mistakenly returns a Future containing None in some cases if two or more consecutive empty partitions exist (all events on those partitions have been deleted).
Suppose that:
CassandraPersistenceQueries.findHighestSequenceNr(persistenceId=???, from=PartitionNr(0)) now returns a Future containing None but should return the highest sequence number, 26, instead.

Because LinearSequenceNrSearchStrategy (https://github.com/lerna-stack/akka-entity-replication/blob/v2.2.0/rollback-tool-cassandra/src/main/scala/lerna/akka/entityreplication/rollback/LinearSequenceNrSearchStrategy.scala) depends on CassandraPersistenceQueries.findHighestSequenceNr indirectly, a rollback might be impossible if two or more deleted partitions exist.

To address this issue, CassandraPersistenceQueries can fetch the highest deleted sequence number (called deleted_to) from the metadata table (akka.metadata) and then skip deleted partitions.
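The skipping step can be sketched as follows. This is a minimal sketch, assuming Akka Persistence Cassandra's layout where partition number is (sequenceNr - 1) / partitionSize; the object and function names are hypothetical, not the library's API.

```scala
// Hypothetical sketch: using deleted_to to skip fully deleted partitions.
object DeletedPartitionSkip {
  // Every partition that ends at or before deleted_to holds no events.
  // The partition containing deleted_to may still hold later events, so
  // scanning can start at deletedTo / partitionSize instead of 0.
  def firstPartitionToScan(deletedTo: Long, partitionSize: Long): Long =
    deletedTo / partitionSize
}
```

For the example above (events 1..26 with the first partitions deleted, partition size 10), scanning would start at partition 2 rather than iterating the empty partitions 0 and 1.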
Akka Persistence Cassandra Journal Schema is described as:
The following errors happened during some failure-injection tests (like one AZ going down):
ERROR akka.actor.OneForOneStrategy xxx-akka.actor.internal-dispatcher-4 akka://xxx/system/sharding/raft-shard-xxx-replica-group-2/1/1 - Term not found at lastApplied: 7317 java.lang.IllegalStateException: Term not found at lastApplied: 7317
at lerna.akka.entityreplication.raft.RaftMemberData.resolveSnapshotTargets(RaftMemberData.scala:416)
at lerna.akka.entityreplication.raft.RaftMemberData.resolveSnapshotTargets$(RaftMemberData.scala:405)
at lerna.akka.entityreplication.raft.RaftMemberDataImpl.resolveSnapshotTargets(RaftMemberData.scala:539)
at lerna.akka.entityreplication.raft.RaftActor.handleSnapshotTick(RaftActor.scala:496)
at lerna.akka.entityreplication.raft.Follower$$anonfun$followerBehavior$1.applyOrElse(Follower.scala:35)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at lerna.akka.entityreplication.raft.RaftActor.akka$persistence$Eventsourced$$super$aroundReceive(RaftActor.scala:124)
at akka.persistence.Eventsourced$$anon$4.stateReceive(Eventsourced.scala:923)
at akka.persistence.Eventsourced.aroundReceive$$original(Eventsourced.scala:251)
at akka.persistence.Eventsourced.aroundReceive(Eventsourced.scala:148)
at akka.persistence.Eventsourced.aroundReceive$(Eventsourced.scala:250)
at lerna.akka.entityreplication.raft.RaftActor.aroundReceive(RaftActor.scala:124)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke$$original(ActorCell.scala:548)
at akka.actor.ActorCell.invoke(ActorCell.scala:61)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run$$original(Mailbox.scala:231)
at akka.dispatch.Mailbox.run(Mailbox.scala:32)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
ERROR akka.actor.OneForOneStrategy xxx-akka.actor.internal-dispatcher-14 akka://xxx/system/sharding/raft-shard-xxx-replica-group-3/2/2 - requirement failed: Cannot select the entries (1-7610) unless RaftActor have applied the entries to the entities (lastApplied: 0) java.lang.IllegalArgumentException: requirement failed: Cannot select the entries (1-7610) unless RaftActor have applied the entries to the entities (lastApplied: 0)
at scala.Predef$.require(Predef.scala:337)
at lerna.akka.entityreplication.raft.RaftMemberData.selectEntityEntries(RaftMemberData.scala:375)
at lerna.akka.entityreplication.raft.RaftMemberData.selectEntityEntries$(RaftMemberData.scala:368)
at lerna.akka.entityreplication.raft.RaftMemberDataImpl.selectEntityEntries(RaftMemberData.scala:539)
at lerna.akka.entityreplication.raft.RaftActor.receiveFetchEntityEvents(RaftActor.scala:164)
at lerna.akka.entityreplication.raft.Follower$$anonfun$followerBehavior$1.applyOrElse(Follower.scala:32)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at lerna.akka.entityreplication.raft.RaftActor.akka$persistence$Eventsourced$$super$aroundReceive(RaftActor.scala:124)
at akka.persistence.Eventsourced$$anon$4.stateReceive(Eventsourced.scala:923)
at akka.persistence.Eventsourced.aroundReceive$$original(Eventsourced.scala:251)
at akka.persistence.Eventsourced.aroundReceive(Eventsourced.scala:148)
at akka.persistence.Eventsourced.aroundReceive$(Eventsourced.scala:250)
at lerna.akka.entityreplication.raft.RaftActor.aroundReceive(RaftActor.scala:124)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke$$original(ActorCell.scala:548)
at akka.actor.ActorCell.invoke(ActorCell.scala:61)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run$$original(Mailbox.scala:231)
at akka.dispatch.Mailbox.run(Mailbox.scala:32)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Replicate message to its RaftActor (suppose it is called RaftActor A), and then waits for a replication result (ReplicationSucceeded, ReplicationFailed, or Replica).

WaitForReplication since the entity doesn't receive any Replica.

ProcessCommand because its state is WaitForReplication.

RaftActor sends a ReplicationFailed message to an entity if the entity is waiting for a Raft log entry truncated by conflict. This might be possible to achieve in AppendEntries handling:
Conflicted entries are existing entries of the Raft log (called ReplicatedLog) with indices greater than or equal to the index of the head of newEntries.
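That definition can be illustrated with a small sketch (hypothetical types; not the actual ReplicatedLog API):

```scala
// Hypothetical sketch: selecting the conflicted entries as defined above —
// existing log entries whose index is >= the index of the head of newEntries.
object ConflictedEntries {
  final case class LogEntry(index: Long)

  def conflicted(log: Vector[LogEntry], newEntries: Vector[LogEntry]): Vector[LogEntry] =
    newEntries.headOption match {
      case Some(head) => log.filter(_.index >= head.index) // candidates for truncation
      case None       => Vector.empty                      // nothing appended, nothing conflicts
    }
}
```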
The member that received InstallSnapshot copies all snapshots that the sender has saved.

Snapshot synchronization copies only the snapshots recorded by the compactions that the InstallSnapshot sender executed.

Snapshots will also be updated by snapshot synchronization.

InstallSnapshot from the leader member

InstallSnapshot to the delayed follower before entity C is updated in member A

Example:
(This scenario will be reproduced in an integration test case)
version: 2.3.0
The following log was captured in a fault-injection test. The log suggests that an entity (running in replica-group-1) might be facing data inconsistency (detecting event number constraint violations is described in https://github.com/lerna-stack/akka-entity-replication/blob/v2.3.0/docs/typed/implementation_guide.md#detecting-data-inconsistencies-by-entity-implementation):
06:13:23.767 TRACE [Inactive] Received Activate: recoveryIndex=[9077], shardSnapshotStore=[Actor[akka://***/system/sharding/raft-shard-***-replica-group-1/1/1/ShardSnapshotStore:1#721724213]]
06:13:23.767 TRACE [Recovering] Sending FetchSnapshot: entityId=[0000000966], to=[Actor[akka://***/system/sharding/raft-shard-***-replica-group-1/1/1/ShardSnapshotStore:1#721724213]]
06:13:23.767 TRACE [Recovering] Starting single RecoveryTimeoutTimer: delay=[35 seconds]
06:13:23.767 TRACE [Recovering] Stashing Replica: index=[9078], term=[8], entityId=[Some(0000000966)], eventType=[***]
06:13:23.767 TRACE [Recovering] Received ApplySnapshot: index=[Some(2001)], entityId=[Some(0000000966)], stateType=[Some(***)]
06:13:23.767 TRACE [Recovering] Sending FetchEntityEvents: entityId=[0000000966], fromIndex=[2002], toIndex=[9077], replyTo=[Actor[akka://***/system/sharding/raft-shard-***-replica-group-1/1/1/0000000966/$$a-adapter#-394328591]], to=[Actor[akka://***/system/sharding/raft-shard-***-replica-group-1/1/1#-1477116895]]
06:13:23.768 TRACE [Recovering] Received RecoveryState: snapshot.index=[Some(2001)], snapshot.entityId=[Some(0000000966)], snapshot.stateType=[Some(***)], events.size=[0], events.head=[None], events.last=[None]
06:13:23.768 TRACE [Recovering] Recovering with initial state: index=[2001], stateType=[***]
06:13:23.768 TRACE [Recovering] Recovered with state: index=[2001], stateType=[***]
06:13:23.768 TRACE [Ready] Received Replica: index=[9078], term=[8], entityId=[Some(0000000966)], eventType=[***]
06:13:23.768 ERROR [EntityId=0000000966] Violated event number constraint: eventNo=[6] should be equal to state.nextEventNo: [3]; event=[***]
The above log also shows the entity recovered with a snapshot (index=2001) and no events.
However, RaftActor replicated the following events for that entity as below:
05:42:36.278 akka://***/system/sharding/raft-shard-***-replica-group-1/1/1/0000000966 [WaitForReplication] Received ReplicationSucceeded: index=[1464], instanceId=[Some(72848)], eventType=[***]
05:42:36.831 akka://***/system/sharding/raft-shard-***-replica-group-1/1/1/0000000966 [WaitForReplication] Received ReplicationSucceeded: index=[1470], instanceId=[Some(72949)], eventType=[***]
05:42:36.982 akka://***/system/sharding/raft-shard-***-replica-group-1/1/1/0000000966 [WaitForReplication] Received ReplicationSucceeded: index=[1471], instanceId=[Some(72979)], eventType=[***]
05:57:59.938 akka://***/system/sharding/raft-shard-***-replica-group-2/1/1/0000000966 [WaitForReplication] Received ReplicationSucceeded: index=[5298], instanceId=[Some(279516)], eventType=[***]
05:58:00.503 akka://***/system/sharding/raft-shard-***-replica-group-2/1/1/0000000966 [WaitForReplication] Received ReplicationSucceeded: index=[5304], instanceId=[Some(279621)], eventType=[***]
05:58:00.644 akka://***/system/sharding/raft-shard-***-replica-group-2/1/1/0000000966 [WaitForReplication] Received ReplicationSucceeded: index=[5305], instanceId=[Some(279646)], eventType=[***]
06:13:23.614 akka://***/system/sharding/raft-shard-***-replica-group-2/1/1/0000000966 [WaitForReplication] Received ReplicationSucceeded: index=[9078], instanceId=[Some(481219)], eventType=[***]
06:13:24.173 akka://***/system/sharding/raft-shard-***-replica-group-2/1/1/0000000966 [WaitForReplication] Received ReplicationSucceeded: index=[9083], instanceId=[Some(481322)], eventType=[***]
06:13:24.324 akka://***/system/sharding/raft-shard-***-replica-group-2/1/1/0000000966 [WaitForReplication] Received ReplicationSucceeded: index=[9085], instanceId=[Some(481351)], eventType=[***]
Before that happened, SnapshotSyncManager (running in replica-group-1) executed snapshot synchronizations as below:
05:54:25.283 INFO Recovery completed: state=[SyncProgress(NoOffset)]
05:54:25.283 INFO Snapshot synchronization started: (typeName: ***, memberIndex: replica-group-2, snapshotLastLogTerm: 8, snapshotLastLogIndex: 3875) -> (typeName: ***, memberIndex: replica-group-1, snapshotLastLogTerm: 2, snapshotLastLogIndex: 1013)
05:57:04.684 INFO Stopping itself.
05:57:04.847 INFO Recovery completed: state=[SyncProgress(NoOffset)]
05:57:04.847 INFO Snapshot synchronization started: (typeName: ***, memberIndex: replica-group-2, snapshotLastLogTerm: 8, snapshotLastLogIndex: 4802) -> (typeName: ***, memberIndex: replica-group-1, snapshotLastLogTerm: 2, snapshotLastLogIndex: 1013)
05:59:55.112 INFO Snapshot synchronization completed: (typeName: ***, memberIndex: replica-group-2) -> (typeName: ***, memberIndex: replica-group-1, snapshotLastLogTerm: 2, snapshotLastLogIndex: 1013)
05:59:55.113 INFO Snapshot synchronization started: (typeName: ***, memberIndex: replica-group-2, snapshotLastLogTerm: 8, snapshotLastLogIndex: 5741) -> (typeName: ***, memberIndex: replica-group-1, snapshotLastLogTerm: 2, snapshotLastLogIndex: 1013)
05:59:55.114 INFO Succeeded to saveSnapshot given metadata [SnapshotMetadata(SnapshotSyncManager:***:replica-group-2:replica-group-1:1, 3, 1697435995112, None)]
06:00:05.212 INFO Snapshot synchronization completed: (typeName: ***, memberIndex: replica-group-2) -> (typeName: ***, memberIndex: replica-group-1, snapshotLastLogTerm: 2, snapshotLastLogIndex: 1013)
06:00:05.213 INFO Succeeded to saveSnapshot given metadata [SnapshotMetadata(SnapshotSyncManager:***:replica-group-2:replica-group-1:1, 5, 1697436005212, None)]
06:00:05.213 INFO Stopping itself.
A SnapshotStore (for the entity, running in replica-group-2) wrote snapshots as below:
05:44:29.922 DEBUG Saved EntitySnapshot: entityId=[0000000966], logEntryIndex=[2001], stateType=[***]
05:59:48.773 DEBUG Saved EntitySnapshot: entityId=[0000000966], logEntryIndex=[5741], stateType=[***]
However, a SnapshotStore (for the entity, running in replica-group-1) didn't write the snapshot (index=5741) as below:
05:59:37.347 DEBUG Saved EntitySnapshot: entityId=[0000000966], logEntryIndex=[2001], stateType=[***]
The log below, which was newly added for this diagnosis, shows the EntitySnapshotsUpdated events (i.e., CompactionCompleted) that SnapshotSyncManager subscribed to:
05:57:04.652 DEBUG [entity-snapshots-updated-events] Element: EntitySnapshotsUpdated(snapshotLastLogTerm: 2, snapshotLastLogIndex: 1030, eventType: lerna.akka.entityreplication.raft.RaftActor$CompactionCompleted, persistenceId: raft:***:1:replica-group-2, sequenceNr: 1033, offset: TimeBasedUUID(90b42800-6be6-11ee-ad67-25a25fca1ba5))
05:57:04.657 DEBUG [entity-snapshots-updated-events] Element: EntitySnapshotsUpdated(snapshotLastLogTerm: 8, snapshotLastLogIndex: 2001, eventType: lerna.akka.entityreplication.raft.RaftActor$CompactionCompleted, persistenceId: raft:***:1:replica-group-2, sequenceNr: 2014, offset: TimeBasedUUID(1336f410-6be7-11ee-ad67-25a25fca1ba5))
05:57:04.663 DEBUG [entity-snapshots-updated-events] Element: EntitySnapshotsUpdated(snapshotLastLogTerm: 8, snapshotLastLogIndex: 2953, eventType: lerna.akka.entityreplication.raft.RaftActor$CompactionCompleted, persistenceId: raft:***:1:replica-group-2, sequenceNr: 2966, offset: TimeBasedUUID(a101c810-6be7-11ee-ad67-25a25fca1ba5))
05:57:04.670 DEBUG [entity-snapshots-updated-events] Element: EntitySnapshotsUpdated(snapshotLastLogTerm: 8, snapshotLastLogIndex: 3875, eventType: lerna.akka.entityreplication.raft.RaftActor$CompactionCompleted, persistenceId: raft:***:1:replica-group-2, sequenceNr: 3890, offset: TimeBasedUUID(2bc72440-6be8-11ee-ad67-25a25fca1ba5))
05:57:04.670 DEBUG [entity-snapshots-updated-events] Downstream finished, cause: SubscriptionWithCancelException$StageWasCompleted$: null
05:59:16.036 DEBUG [entity-snapshots-updated-events] Element: EntitySnapshotsUpdated(snapshotLastLogTerm: 2, snapshotLastLogIndex: 1030, eventType: lerna.akka.entityreplication.raft.RaftActor$CompactionCompleted, persistenceId: raft:***:1:replica-group-2, sequenceNr: 1033, offset: TimeBasedUUID(90b42800-6be6-11ee-ad67-25a25fca1ba5))
05:59:16.046 DEBUG [entity-snapshots-updated-events] Element: EntitySnapshotsUpdated(snapshotLastLogTerm: 8, snapshotLastLogIndex: 2001, eventType: lerna.akka.entityreplication.raft.RaftActor$CompactionCompleted, persistenceId: raft:***:1:replica-group-2, sequenceNr: 2014, offset: TimeBasedUUID(1336f410-6be7-11ee-ad67-25a25fca1ba5))
05:59:16.053 DEBUG [entity-snapshots-updated-events] Element: EntitySnapshotsUpdated(snapshotLastLogTerm: 8, snapshotLastLogIndex: 2953, eventType: lerna.akka.entityreplication.raft.RaftActor$CompactionCompleted, persistenceId: raft:***:1:replica-group-2, sequenceNr: 2966, offset: TimeBasedUUID(a101c810-6be7-11ee-ad67-25a25fca1ba5))
05:59:16.058 DEBUG [entity-snapshots-updated-events] Element: EntitySnapshotsUpdated(snapshotLastLogTerm: 8, snapshotLastLogIndex: 3875, eventType: lerna.akka.entityreplication.raft.RaftActor$CompactionCompleted, persistenceId: raft:***:1:replica-group-2, sequenceNr: 3890, offset: TimeBasedUUID(2bc72440-6be8-11ee-ad67-25a25fca1ba5))
05:59:16.059 DEBUG [entity-snapshots-updated-events] Downstream finished, cause: SubscriptionWithCancelException$StageWasCompleted$: null
05:59:41.724 DEBUG [entity-snapshots-updated-events] Element: EntitySnapshotsUpdated(snapshotLastLogTerm: 8, snapshotLastLogIndex: 3875, eventType: lerna.akka.entityreplication.raft.RaftActor$CompactionCompleted, persistenceId: raft:***:1:replica-group-2, sequenceNr: 3890, offset: TimeBasedUUID(2bc72440-6be8-11ee-ad67-25a25fca1ba5))
05:59:41.727 DEBUG [entity-snapshots-updated-events] Element: EntitySnapshotsUpdated(snapshotLastLogTerm: 8, snapshotLastLogIndex: 4802, eventType: lerna.akka.entityreplication.raft.RaftActor$CompactionCompleted, persistenceId: raft:***:1:replica-group-2, sequenceNr: 4819, offset: TimeBasedUUID(ae7c9910-6be8-11ee-ad67-25a25fca1ba5))
05:59:46.750 DEBUG [entity-snapshots-updated-events] Upstream finished.
05:59:55.123 DEBUG [entity-snapshots-updated-events] Element: EntitySnapshotsUpdated(snapshotLastLogTerm: 8, snapshotLastLogIndex: 5741, eventType: lerna.akka.entityreplication.raft.RaftActor$CompactionCompleted, persistenceId: raft:***:1:replica-group-2, sequenceNr: 5757, offset: TimeBasedUUID(36e567a0-6be9-11ee-ad67-25a25fca1ba5))
06:00:00.139 DEBUG [entity-snapshots-updated-events] Upstream finished.
Also, the log below, which was also newly added for this diagnosis, shows the entity snapshots that SnapshotSyncManager copied:
05:59:37.255 DEBUG Copying EntitySnapshot: entityId=[0000000966], from=[Actor[akka://***/system/sharding/raft-shard-***-replica-group-1/1/1/SnapshotSyncManager:***/$a#-2034853661]], to=[Actor[akka://***/system/sharding/raft-shard-***-replica-group-1/1/1/ShardSnapshotStore:1#721724213]]
05:59:37.348 DEBUG Copied EntitySnapshot: entityId=[0000000966], from=[Actor[akka://***/system/sharding/raft-shard-***-replica-group-1/1/1/SnapshotSyncManager:***/$a#-2034853661]], to=[Actor[akka://***/system/sharding/raft-shard-***-replica-group-1/1/1/ShardSnapshotStore:1#721724213]], sourceEntitySnapshotMetadata=[EntitySnapshotMetadata(NormalizedEntityId(0000000966),2001)], destinationEntitySnapshotMetadata=[EntitySnapshotMetadata(NormalizedEntityId(0000000966),2001)]
06:00:05.089 DEBUG Copying EntitySnapshot: entityId=[0000000966], from=[Actor[akka://***/system/sharding/raft-shard-***-replica-group-1/1/1/SnapshotSyncManager:***/$a#-2034853661]], to=[Actor[akka://***/system/sharding/raft-shard-***-replica-group-1/1/1/ShardSnapshotStore:1#721724213]]
06:00:05.089 DEBUG Copied EntitySnapshot: entityId=[0000000966], from=[Actor[akka://***/system/sharding/raft-shard-***-replica-group-1/1/1/SnapshotSyncManager:***/$a#-2034853661]], to=[Actor[akka://***/system/sharding/raft-shard-***-replica-group-1/1/1/ShardSnapshotStore:1#721724213]], sourceEntitySnapshotMetadata=[EntitySnapshotMetadata(NormalizedEntityId(0000000966),2001)], destinationEntitySnapshotMetadata=[EntitySnapshotMetadata(NormalizedEntityId(0000000966),2001)]
The above logs suggest that SnapshotSyncManager read an old entity snapshot from a source SnapshotStore.
Note that RaftActor (running in replica-group-2) executed Raft log compactions as below:
05:40:50.718 INFO [Follower] compaction started (logEntryIndex: 1030, number of entities: 339)
05:40:51.081 INFO [Follower] compaction completed (term: Term(2), logEntryIndex: 1030)
05:44:29.638 INFO [Leader] compaction started (logEntryIndex: 2001, number of entities: 299)
05:44:30.026 INFO [Leader] compaction completed (term: Term(8), logEntryIndex: 2001)
05:48:27.318 INFO [Leader] compaction started (logEntryIndex: 2953, number of entities: 336)
05:48:28.103 INFO [Leader] compaction completed (term: Term(8), logEntryIndex: 2953)
05:52:20.308 INFO [Leader] compaction started (logEntryIndex: 3875, number of entities: 306)
05:52:20.728 INFO [Leader] compaction completed (term: Term(8), logEntryIndex: 3875)
05:55:59.609 INFO [Leader] compaction started (logEntryIndex: 4802, number of entities: 309)
05:56:00.031 INFO [Leader] compaction completed (term: Term(8), logEntryIndex: 4802)
05:59:48.508 INFO [Leader] compaction started (logEntryIndex: 5741, number of entities: 314)
05:59:48.878 INFO [Leader] compaction completed (term: Term(8), logEntryIndex: 5741)
06:03:38.708 INFO [Leader] compaction started (logEntryIndex: 6691, number of entities: 317)
06:03:39.058 INFO [Leader] compaction completed (term: Term(8), logEntryIndex: 6691)
06:07:34.478 INFO [Leader] compaction started (logEntryIndex: 7621, number of entities: 311)
06:07:34.848 INFO [Leader] compaction completed (term: Term(8), logEntryIndex: 7621)
06:11:10.428 INFO [Leader] compaction started (logEntryIndex: 8522, number of entities: 300)
06:11:10.829 INFO [Leader] compaction completed (term: Term(8), logEntryIndex: 8522)
The log diagnosis suggests the following scenario happened. Suppose the following sequence:

If a new SnapshotSyncManager instance (different from the instance that ran the first synchronization) handles the new snapshot synchronization, the new synchronization succeeds. However, if the same SnapshotSyncManager instance runs these synchronizations, the new synchronization sometimes cannot synchronize new entity snapshots.
The following sequence diagram illustrates the details of this scenario:
sequenceDiagram
autonumber
%% Actors
participant RaftActor1 as RaftActor<br/>(replica-group-1)
participant SnapshotSyncManager as SnapshotSyncManager<br/>(replica-group-1)
participant ShardSnapshotStore1 as ShardSnapshotStore<br/>(replica-group-1)
participant SnapshotStore1 as SnapshotStore<br/>(replica-group-1, EntityId=1)
participant ShardSnapshotStore2ForSync as ShardSnapshotStore<br/>(replica-group-2)
participant SnapshotStore2ForSync as SnapshotStore<br/>(replica-group-2, EntityId=1)
participant PersistentDataStore as Persistent Data Store<br/>(i.e. Cassandra)
participant RaftActor2 as RaftActor<br/>(replica-group-2)
participant ShardSnapshotStore2 as ShardSnapshotStore<br/>(replica-group-2)
participant SnapshotStore2 as SnapshotStore<br/>(replica-group-2, EntityId=1)
%% Snapshot Synchronization
RaftActor1 --> RaftActor1: Attempt to start snapshot synchronization (source=replica-group-2)
activate RaftActor1
RaftActor1 --> SnapshotSyncManager: Spawn SnapshotSyncManager actor
activate SnapshotSyncManager
RaftActor1 ->> SnapshotSyncManager: Send SyncSnapshot
SnapshotSyncManager --> PersistentDataStore: Recovery from persisted data
SnapshotSyncManager --> SnapshotSyncManager: Start snapshot synchronization
activate SnapshotSyncManager
SnapshotSyncManager --> ShardSnapshotStore2ForSync: Spawn ShardSnapshotStore
SnapshotSyncManager ->> ShardSnapshotStore2ForSync: Send FetchSnapshot
ShardSnapshotStore2ForSync --> SnapshotStore2ForSync: Spawn SnapshotStore
ShardSnapshotStore2ForSync ->> SnapshotStore2ForSync: Forward FetchSnapshot
SnapshotStore2ForSync --> PersistentDataStore: Recovery from persisted data
SnapshotStore2ForSync ->> SnapshotSyncManager: Reply SnapshotFound
par Reading EntitySnapshot from the persistent data store
SnapshotSyncManager ->> ShardSnapshotStore1: Send SaveSnapshot
ShardSnapshotStore1 --> SnapshotStore1: Spawn SnapshotStore
SnapshotStore1 --> PersistentDataStore: Recovery from persisted data
ShardSnapshotStore1 ->> SnapshotStore1: Forward SaveSnapshot
SnapshotStore1 -->> PersistentDataStore: Persist EntitySnapshot
SnapshotStore1 ->> SnapshotSyncManager: Reply SaveSnapshotSuccess
SnapshotSyncManager -->> PersistentDataStore: Persist SyncCompleted
SnapshotSyncManager ->> RaftActor1: SyncSnapshotSucceeded
deactivate SnapshotSyncManager
deactivate RaftActor1
and Writing EntitySnapshot to the persistent data store
%% Compaction
RaftActor2 --> RaftActor2: Start compaction
activate RaftActor2
RaftActor2 ->> ShardSnapshotStore2: Send SaveSnapshot
ShardSnapshotStore2 ->> SnapshotStore2: Forward SaveSnapshot
SnapshotStore2 --> PersistentDataStore: Recovery from persisted data
SnapshotStore2 -->> PersistentDataStore: Persist EntitySnapshot
Note over SnapshotStore2, PersistentDataStore: Another SnapshotStore (replica-group-2, EntityId=1) doesn't read this persisted EntitySnapshot<br/>if it has already been recovered.
SnapshotStore2 ->> RaftActor2: Reply SaveSnapshotSuccess
RaftActor2 -->> PersistentDataStore: Persist CompactionCompleted
deactivate RaftActor2
RaftActor2 -->> RaftActor1: Send heartbeats, which tell RaftActor1 that it must start a new snapshot synchronization.
%% Snapshot Synchronization
RaftActor1 --> RaftActor1: Attempt to start snapshot synchronization (source=replica-group-2)
activate RaftActor1
RaftActor1 ->> SnapshotSyncManager: SyncSnapshot
end
SnapshotSyncManager --> SnapshotSyncManager: Start snapshot synchronization
activate SnapshotSyncManager
SnapshotSyncManager ->> ShardSnapshotStore2ForSync: Send FetchSnapshot
ShardSnapshotStore2ForSync ->> SnapshotStore2ForSync: Forward FetchSnapshot
Note over SnapshotStore2ForSync: SnapshotStore (replica-group-2, EntityId=1) returns the cached EntitySnapshot.<br/>It doesn't return newly written data that another one writes after that caching.
SnapshotStore2ForSync ->> SnapshotSyncManager: Reply SnapshotFound
SnapshotSyncManager ->> ShardSnapshotStore1: Send SaveSnapshot
ShardSnapshotStore1 ->> SnapshotStore1: Forward SaveSnapshot
SnapshotStore1 -->> SnapshotStore1: Skip the EntitySnapshot save if it has already saved the snapshot.
SnapshotStore1 ->> SnapshotSyncManager: Reply SaveSnapshotSuccess
SnapshotSyncManager -->> PersistentDataStore: Persist SyncCompleted
SnapshotSyncManager ->> RaftActor1: SyncSnapshotSucceeded
deactivate SnapshotSyncManager
deactivate SnapshotSyncManager
deactivate RaftActor1
The following figure shows the hierarchy of these actors. While the SnapshotSyncManager in replica-group-1 spawns a source SnapshotStore as its child for fetching an entity snapshot, RaftActor in replica-group-2 spawns another SnapshotStore with the same persistence ID for compaction.
flowchart
%% Actors
RaftActor1["RaftActor (replica-group-1)"]
SnapshotSyncManager["SnapshotSyncManager (replica-group-1)"]
ShardSnapshotStore1["ShardSnapshotStore (replica-group-1)"]
SnapshotStore1["SnapshotStore (replica-group-1, EntityId=1)"]
ShardSnapshotStore2ForSync["ShardSnapshotStore (replica-group-2)"]
SnapshotStore2ForSync["SnapshotStore (replica-group-2, EntityId=1)"]
RaftActor2["RaftActor (replica-group-2)"]
ShardSnapshotStore2["ShardSnapshotStore (replica-group-2)"]
SnapshotStore2["SnapshotStore (replica-group-2, EntityId=1)"]
%% Hierarchy
RaftActor1 --- SnapshotSyncManager
RaftActor1 --- ShardSnapshotStore1
ShardSnapshotStore1 --- SnapshotStore1
subgraph Run for Snapshot Synchronization
SnapshotSyncManager --- ShardSnapshotStore2ForSync
ShardSnapshotStore2ForSync --- SnapshotStore2ForSync
end
RaftActor2 --- ShardSnapshotStore2
ShardSnapshotStore2 --- SnapshotStore2
A SnapshotStore caches an entity snapshot in memory when it replays events from a persistent store. If a SnapshotSyncManager reuses the same SnapshotStore instance, it will read the cached entity snapshot, which could be stale, and then write that stale snapshot to a destination SnapshotStore.
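This stale-read behavior can be illustrated with a toy model (a deliberately simplified sketch, not the project's actual SnapshotStore API): each store instance reads persisted state once at recovery and serves only its in-memory cache afterwards, so it misses writes made later by another instance.

```scala
import scala.collection.mutable

// Shared "persistent store" standing in for Cassandra (hypothetical, for illustration).
object Journal { val snapshots = mutable.Map.empty[String, String] }

// Toy SnapshotStore: reads from the journal once on recovery, then serves
// fetches from its in-memory cache only.
final class CachedSnapshotStore(entityId: String) {
  private var cache: Option[String] = Journal.snapshots.get(entityId) // recovery
  def fetch: Option[String] = cache // never re-reads the journal
  def save(snapshot: String): Unit = {
    cache = Some(snapshot)
    Journal.snapshots(entityId) = snapshot
  }
}
```

A reader instance that recovered before a writer persists a snapshot keeps returning its stale (empty) cache, mirroring the notes in the sequence diagram above.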
Possible solutions might be the following:
I have confirmed this with lerna-stack/akka-entity-replication-sample: the logger name is not the expected one. The expected logger name is `BankAccountBehavior`, but the actual one is `Recovering`, as the following log output shows:
node3_1 | 06:49:13.598 INFO Recovering - [LEADER] Deposited(1623912553193,100) [balance: 1500, resent-transactions: 15]
Suppose the following scenario: `CommitLogStore` did not save the domain event A for some reason (for example, `CommitLogStore` was temporarily unavailable), and a full cluster restart was conducted before `CommitLogStore` saved the event.

After the full cluster restart, `CommitLogStore` will not save the domain event A until the cluster receives a user request against a `RaftActor` that should handle the domain event A. This is because a `RaftActor` does not start until the cluster receives a user request. This behavior means we cannot subscribe to the domain event A on the query side until the cluster receives a user request.

`CommitLogStore` should save domain events automatically after a cluster startup, without requiring any user request; this save enables us to subscribe to the event. To achieve this behavior, `RaftActor`s should start automatically after a cluster startup without any user requests.
`CommitLogStoreActor` behaves as an event-sourced actor.
Saving a snapshot makes the recovery process more efficient as the number of events grows.
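As a rough illustration (hypothetical names, not the actual `CommitLogStoreActor`), snapshotting every N persisted events bounds how many events a recovery has to replay:

```scala
// Toy event-sourced actor state: saves a snapshot every `snapshotEvery` events,
// so recovery replays at most `snapshotEvery` events after the latest snapshot.
final class SnapshotEveryCounter(snapshotEvery: Int) {
  private var lastSequenceNr = 0L
  var savedSnapshots = 0
  def onEventPersisted(): Unit = {
    lastSequenceNr += 1
    if (lastSequenceNr % snapshotEvery == 0)
      savedSnapshots += 1 // a real actor would call saveSnapshot(state) here
  }
  // Events that a recovery would have to replay after the latest snapshot:
  def eventsToReplayOnRecovery: Long = lastSequenceNr % snapshotEvery
}
```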
The team will address this issue as follows. Note that some of the details might change.
* Add a new setting `lerna.akka.entityreplication.raft.eventsourced.persistence.snapshot-store.plugin` to configure a snapshot store plugin ID. The default value is an empty string `""`.
* Add a new setting `lerna.akka.entityreplication.raft.eventsourced.persistence.snapshot-every`. The default value is `1000`.
* Add the following to `RaftSettings`:
  * `def eventSourcedSnapshotStorePluginId: String`
  * `def eventSourcedSnapshotEvery: Int`
* Add `def withEventSourcedSnapshotStorePluginId: ClusterReplicationSettings` to `ClusterReplicationSettings` (both classic and typed).
* `CommitLogStoreActor` saves a snapshot of its state (a `LogEntryIndex`).
* Update documents (`/docs/*`).
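Under these assumptions, the new settings might look like this in `application.conf` (a sketch based on the plan above; the key names and defaults are the ones listed there):

```hocon
lerna.akka.entityreplication.raft.eventsourced.persistence {
  # Snapshot store plugin ID for the event-sourced store ("" = default plugin)
  snapshot-store.plugin = ""
  # Save a snapshot every 1000 events
  snapshot-every = 1000
}
```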
`SnapshotStore` (~ v2.1.0) saves an entity's snapshot as a snapshot, not as an event. One of the reasons behind this is to reduce the storage space required for entities' snapshots.
Like other persistent actors such as `RaftActor`, `SnapshotStore` should be able to be rolled back to a certain point (a timestamp or sequence number). To support this rollback, `SnapshotStore` should save an entity's snapshot as an event instead. Note that this change will increase the storage space required to save snapshots.
Considerations:
* `SnapshotStore` maintains backward data compatibility: the new `SnapshotStore` should be able to read a snapshot that the old `SnapshotStore` saved.
* `SnapshotStore` should not change its message protocols, as far as possible, so as not to affect other actors such as `RaftActor` and `SnapshotSyncManager`.

The job logs the following messages:
https://github.com/lerna-stack/akka-entity-replication/actions/runs/5308022311/jobs/9607508359
[info] gpg: no default secret key: unusable secret key
[info] gpg: signing failed: unusable secret key
Error: java.lang.RuntimeException: Failure running 'gpg --batch --passphrase *** --detach-sign --armor --use-agent --output /home/runner/work/akka-entity-replication/akka-entity-replication/core/target/scala-2.13/akka-entity-replication_2.13-2.3.0.pom.asc /home/runner/work/akka-entity-replication/akka-entity-replication/core/target/scala-2.13/akka-entity-replication_2.13-2.3.0.pom'. Exit code: 2
Error: at scala.sys.package$.error(package.scala:30)
Error: at com.jsuereth.sbtpgp.CommandLineGpgSigner.sign(PgpSigner.scala:74)
Error: at com.jsuereth.sbtpgp.PgpSettings$.$anonfun$signingSettings$2(PgpSettings.scala:151)
Error: at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
Error: at scala.collection.immutable.Map$Map4.foreach(Map.scala:236)
Error: at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
Error: at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
Error: at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
Error: at com.jsuereth.sbtpgp.PgpSettings$.$anonfun$signingSettings$1(PgpSettings.scala:146)
Error: at scala.Function1.$anonfun$compose$1(Function1.scala:49)
Error: at sbt.internal.util.$tilde$greater.$anonfun$$u2219$1(TypeFunctions.scala:62)
Error: at sbt.std.Transform$$anon$4.work(Transform.scala:67)
Error: at sbt.Execute.$anonfun$submit$2(Execute.scala:281)
Error: at sbt.internal.util.ErrorHandling$.wideConvert(ErrorHandling.scala:19)
Error: at sbt.Execute.work(Execute.scala:290)
Error: at sbt.Execute.$anonfun$submit$1(Execute.scala:281)
Error: at sbt.ConcurrentRestrictions$$anon$4.$anonfun$submitValid$1(ConcurrentRestrictions.scala:178)
Error: at sbt.CompletionService$$anon$2.call(CompletionService.scala:37)
Error: at java.util.concurrent.FutureTask.run(FutureTask.java:266)
Error: at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
Error: at java.util.concurrent.FutureTask.run(FutureTask.java:266)
Error: at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
Error: at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
Error: at java.lang.Thread.run(Thread.java:748)
It happened in some fault injection tests.
An entity (called entity X below) on RaftActor (replica-group-2) got into a data inconsistency.

On the first recovery (at `08:05:24.836`), entity X received:
* an `ApplySnapshot(entitySnapshot=[None])` message
* a `FetchEntityEvents(..., from=[1], to=[4179], ...)` message
* a `RecoveryState(snapshot=[None], events=([11] entries))` message

On the second recovery (at `08:16:10.985`), entity X received:
* an `ApplySnapshot(entitySnapshot=[None])` message
* a `FetchEntityEvents(..., from=[1], to=[4179], ...)` message
* a `RecoveryState(snapshot=[None], events=([0] entries))` message

On the second recovery, entity X didn't receive events that the first recovery contained, which means that entity X got a data inconsistency.
On the other hand, RaftActor (replica-group-2) ran snapshot synchronization as follows.

At `08:05:03.392`, RaftActor (replica-group-2) started snapshot synchronization:

[Follower] Applying event [SnapshotSyncStarted], state diff: [lastSnapshotStatus: SnapshotStatus(Term(16),3730,Term(16),3730) -> SnapshotStatus(Term(16),3730,Term(17),3746)]

At `08:16:10.517`, RaftActor (replica-group-2) completed snapshot synchronization:

[Follower] Applying event [SnapshotSyncCompleted], state diff: [replicatedLog: ReplicatedLog(ancestorTerm=Term(14), ancestorIndex=3630, 549 entries with indices Some(3631)...Some(4179)) -> ReplicatedLog(ancestorTerm=Term(17), ancestorIndex=3746, 0 entries with indices None...None), lastSnapshotStatus: SnapshotStatus(Term(16),3730,Term(17),3746) -> SnapshotStatus(Term(17),3746,Term(17),3746)]

RaftActor (replica-group-2) committed entries (indices 3746 ~ 3748) at `08:02:27.449`.
The above snapshot synchronization removed committed log entries that were not included in snapshots.
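Using the indices from the logs above, a toy model (not the project's ReplicatedLog class) shows how this truncation loses committed entries beyond the snapshot index:

```scala
// Toy log model: `entries` holds the indices the log currently keeps
// after `ancestorIndex`.
final case class ToyReplicatedLog(ancestorIndex: Int, entries: Range) {
  // Observed behavior: after snapshot synchronization up to `snapshotIndex`,
  // the log is reset to empty instead of keeping entries after `snapshotIndex`.
  def resetAfterSnapshotSync(snapshotIndex: Int): ToyReplicatedLog =
    ToyReplicatedLog(snapshotIndex, Range.inclusive(snapshotIndex + 1, snapshotIndex))
}
```

Entries 3747 and 3748 were committed but are covered by neither the new snapshot (which ends at index 3746) nor the emptied log.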
RaftActor (replica-group-1) was the leader and updated indices for replica-group-2 as follows:

* `08:04:13.090`: Applying event [SucceededAppendEntries]: next index = 3952 -> 3953, match index = 3951 -> 3952
* `08:04:14.210`: Applying event [BecameLeader]: match index = 3953 -> None, match index = 3952 -> None
* `08:04:50.558`: Applying event [DeniedAppendEntries]: next index = None -> 4058
* `08:04:50.558`: Applying event [DeniedAppendEntries]: next index = 4058 -> 4057
* `08:05:05.632`: Applying event [DeniedAppendEntries]: next index = 3213 -> 3212

The next index was lower than expected, like the situation described in #165 (comment).
`SnapshotStore` (or `ShardSnapshotStore`) should reply with `SnapshotNotFound` if it has no `EntitySnapshot` and is in the middle of saving an `EntitySnapshot`. For example, when an entity (`ReplicatedEntityBehavior`) is recovering, the entity waits for a response from `ShardSnapshotStore`. If `ShardSnapshotStore` replies with `SnapshotNotFound`, the entity can continue its recovery. If `ShardSnapshotStore` replies with nothing, the entity's recovery might time out. This case might rarely happen because the entity might already be recovered by the time `SnapshotStore` receives a `SaveSnapshot` command.
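A minimal sketch of the desired reply logic (simplified signatures, not the actual protocol implementation): reply `SnapshotNotFound` whenever no snapshot is readable yet, even while a save is still in flight, so a recovering entity never waits indefinitely.

```scala
sealed trait SnapshotReply
case object SnapshotNotFound extends SnapshotReply
final case class SnapshotFound(snapshot: String) extends SnapshotReply

object FetchSnapshotLogic {
  // `saveInProgress` must not suppress the reply: even mid-save, a store with
  // no readable snapshot should answer SnapshotNotFound instead of staying silent.
  def reply(stored: Option[String], saveInProgress: Boolean): SnapshotReply =
    stored match {
      case Some(snapshot) => SnapshotFound(snapshot)
      case None           => SnapshotNotFound
    }
}
```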
Related code:
log message:
ERROR lerna.akka.entityreplication.ClusterReplicationGuardian - swallowing exception during message send akka.actor.dungeon.SerializationCheckFailedException: Failed to serialize and deserialize message of type lerna.akka.entityreplication.ClusterReplicationGuardian$Start for testing. To avoid this error, either disable 'akka.actor.serialize-messages', mark the message with 'akka.actor.NoSerializationVerificationNeeded', or configure serialization to support this message
`ClusterReplicationGuardian$Start` should mix in `akka.actor.NoSerializationVerificationNeeded` because the message is not sent to any remote actors.
We would like to support Java 8 and Java 11.
We have to configure GitHub Actions to do that.
actions/setup-java (Set up your GitHub Actions workflow with a specific version of Java) may be helpful for us.
Remembering entities, as in akka-cluster-sharding's Remembering Entities feature, is useful for implementing an orchestrator for distributed transactions such as the Saga pattern.
It probably currently exposes APIs that we don't need.
`akka-entity-replication` uses some internal APIs of Akka.
The internal APIs may change when the patch version changes.
For more details, see the following:
Mixed versioning is not allowed
2.0.0
Messages buffered by Akka Cluster Sharding may be causing the `SnapshotUpdateConflictException`.
Snapshot updates will not be rolled back, which may cause entities to recover with an incorrect state.
* [JVM-2]: Follower
* [JVM-3]: Leader
* [JVM-4]: An unreachable member

[JVM-2] === Transition: Recovering -> Follower ===
[JVM-2] === [Recovering] election-timeout after 1427 ms ===
[JVM-3] === Transition: Recovering -> Follower ===
[JVM-3] === [Recovering] election-timeout after 1179 ms ===
[JVM-2] [Follower] election timeout. Leader will be changed
[JVM-2] === [Follower] broadcast RequestVote(NormalizedShardId(146),Term(4),replica-group-1,13,Term(3)) ===
[JVM-2] === [Follower] persisting time: 5 ms ===
[JVM-2] === Transition: Follower -> Candidate ===
[JVM-2] === [Follower] election-timeout after 984 ms ===
[JVM-4] [Follower] election timeout. Leader will be changed
[JVM-4] === [Follower] broadcast RequestVote(NormalizedShardId(146),Term(4),replica-group-3,13,Term(3)) ===
[JVM-4] === [Follower] persisting time: 3 ms ===
[JVM-4] === Transition: Follower -> Candidate ===
[JVM-4] === [Follower] election-timeout after 1300 ms ===
[JVM-2] raft-committed-event-store-typeNam-1: Trying to register to coordinator at [ActorSelection[Anchor(akka://RaftActorCompactionSpec@localhost:51063/), Path(/system/sharding/raft-committed-event-store-typeNam-1Coordinator/singleton/coordinator)]], but no acknowledgement. Total [27] buffered messages. [Coordinator [Member(akka://RaftActorCompactionSpec@localhost:51063, Up)] is reachable.]
[JVM-3] === [Follower] election timeout ===
[JVM-3] === [Follower] broadcast RequestVote(NormalizedShardId(146),Term(1),replica-group-2,0,Term(0)) ===
[JVM-2] === [Follower] accept RequestVote(NormalizedShardId(146),Term(1),replica-group-2,0,Term(0)) ===
[JVM-3] === [Follower] persisting time: 9 ms ===
[JVM-3] === Transition: Follower -> Candidate ===
[JVM-3] === [Follower] election-timeout after 1233 ms ===
[JVM-3] === [Candidate] accept self RequestVote ===
[JVM-2] === [Follower] persisting time: 4 ms ===
[JVM-2] === [Follower] election-timeout after 1243 ms ===
[JVM-3] === [Candidate] persisting time: 7 ms ===
[JVM-3] raft-committed-event-store-typeNam-1: Trying to register to coordinator at [ActorSelection[Anchor(akka://RaftActorCompactionSpec@localhost:51063/), Path(/system/sharding/raft-committed-event-store-typeNam-1Coordinator/singleton/coordinator)]], but no acknowledgement. Total [28] buffered messages. [Coordinator [Member(akka://RaftActorCompactionSpec@localhost:51063, Up)] is reachable.]
[JVM-3] === [Candidate] accept for replica-group-1 ===
[JVM-3] === [Candidate] election-timeout after 1262 ms ===
[JVM-3] === [Candidate] accept for replica-group-2 ===
[JVM-3] === Transition: Candidate -> Leader ===
[JVM-3] === [Leader] Heartbeat after 100 ms ===
[JVM-3] [Leader] New leader was elected (term: Term(1), lastLogTerm: Term(0), lastLogIndex: 0)
[JVM-3] === [Leader] persisting time: 5 ms ===
[JVM-3] === [Leader] Heartbeat after 100 ms ===
[JVM-3] === [Leader] publish AppendEntries(NormalizedShardId(146),Term(1),replica-group-2,0,Term(0),List(LogEntry(1, EntityEvent(None,NoOp), Term(1))),0) to replica-group-1 ===
[JVM-3] === [Leader] publish AppendEntries(NormalizedShardId(146),Term(1),replica-group-2,0,Term(0),List(LogEntry(1, EntityEvent(None,NoOp), Term(1))),0) to replica-group-3 ===
[JVM-2] === [Follower] append AppendEntries(NormalizedShardId(146),Term(1),replica-group-2,0,Term(0),Vector(LogEntry(1, EntityEvent(None,NoOp), Term(1))),0) ===
[JVM-4] raft-committed-event-store-typeNam-1: Trying to register to coordinator at [ActorSelection[Anchor(akka://RaftActorCompactionSpec@localhost:51063/), Path(/system/sharding/raft-committed-event-store-typeNam-1Coordinator/singleton/coordinator)]], but no acknowledgement. Total [8] buffered messages. [Coordinator [Member(akka://RaftActorCompactionSpec@localhost:51063, Up)] is reachable.]
[JVM-2] === [Follower] persisting time: 4 ms ===
[JVM-2] === Transition: Follower -> Follower ===
[JVM-2] === [Follower] election-timeout after 837 ms ===
[JVM-3] === [Leader] committed LogEntry(1, EntityEvent(None,NoOp), Term(1)) and will notify it to ClientContext(Actor[akka://RaftActorCompactionSpec/system/sharding/raft-shard-typeNam-2-replica-group-2/146/146#579683721],None,None) ===
[JVM-3] === [Leader] broadcast TryCreateEntity(NormalizedShardId(146),NormalizedEntityId(test)) ===
[JVM-3] === [Leader] created an entity (NormalizedEntityId(test)) ===
[JVM-2] === [Follower] created an entity (NormalizedEntityId(test)) ===
[JVM-3] === [Leader] broadcast TryCreateEntity(NormalizedShardId(146),NormalizedEntityId(test)) ===
[JVM-3] === [Leader] broadcast TryCreateEntity(NormalizedShardId(146),NormalizedEntityId(test)) ===
[JVM-3] === [Leader] broadcast TryCreateEntity(NormalizedShardId(146),NormalizedEntityId(test)) ===
[JVM-3] === [Leader] persisting time: 6 ms ===
[JVM-3] === [Leader] Heartbeat after 100 ms ===
[JVM-3] === [Leader] publish AppendEntries(NormalizedShardId(146),Term(1),replica-group-2,1,Term(1),List(LogEntry(2, EntityEvent(Some(NormalizedEntityId(test)),Incremented(1)), Term(1))),1) to replica-group-1 ===
[JVM-3] === [Leader] publish AppendEntries(NormalizedShardId(146),Term(1),replica-group-2,0,Term(0),List(LogEntry(1, EntityEvent(None,NoOp), Term(1)), LogEntry(2, EntityEvent(Some(NormalizedEntityId(test)),Incremented(1)), Term(1))),1) to replica-group-3 ===
[JVM-2] === [Follower] append AppendEntries(NormalizedShardId(146),Term(1),replica-group-2,1,Term(1),Vector(LogEntry(2, EntityEvent(Some(NormalizedEntityId(test)),Incremented(1)), Term(1))),1) ===
[JVM-2] === [Follower] persisting time: 9 ms ===
[JVM-2] === Transition: Follower -> Follower ===
[JVM-2] === [Follower] election-timeout after 1378 ms ===
[JVM-3] === [Leader] committed LogEntry(2, EntityEvent(Some(NormalizedEntityId(test)),Incremented(1)), Term(1)) and will notify it to ClientContext(Actor[akka://RaftActorCompactionSpec/system/sharding/raft-shard-typeNam-2-replica-group-2/146/146/test#-930394426],Some(EntityInstanceId(2)),Some(Actor[akka://RaftActorCompactionSpec@localhost:51064/system/testActor-2#-1359332434])) ===
[JVM-3] state updated: State(1)
[JVM-2] [Follower] compaction started (logEntryIndex: 1, number of entities: 0)
[JVM-3] === [Leader] persisting time: 6 ms ===
[JVM-3] === [Leader] Heartbeat after 100 ms ===
[JVM-3] === [Leader] publish AppendEntries(NormalizedShardId(146),Term(1),replica-group-2,2,Term(1),List(LogEntry(3, EntityEvent(Some(NormalizedEntityId(test)),Incremented(1)), Term(1))),2) to replica-group-1 ===
[JVM-3] === [Leader] publish AppendEntries(NormalizedShardId(146),Term(1),replica-group-2,0,Term(0),List(LogEntry(1, EntityEvent(None,NoOp), Term(1)), LogEntry(2, EntityEvent(Some(NormalizedEntityId(test)),Incremented(1)), Term(1)), LogEntry(3, EntityEvent(Some(NormalizedEntityId(test)),Incremented(1)), Term(1))),2) to replica-group-3 ===
[JVM-2] === [Follower] append AppendEntries(NormalizedShardId(146),Term(1),replica-group-2,2,Term(1),Vector(LogEntry(3, EntityEvent(Some(NormalizedEntityId(test)),Incremented(1)), Term(1))),2) ===
[JVM-2] === [Follower] persisting time: 6 ms ===
[JVM-2] === [Follower] applying Incremented(1) to ReplicationActor ===
[JVM-2] state updated: State(1)
[JVM-2] === Transition: Follower -> Follower ===
[JVM-2] === [Follower] election-timeout after 977 ms ===
[JVM-3] === [Leader] committed LogEntry(3, EntityEvent(Some(NormalizedEntityId(test)),Incremented(1)), Term(1)) and will notify it to ClientContext(Actor[akka://RaftActorCompactionSpec/system/sharding/raft-shard-typeNam-2-replica-group-2/146/146/test#-930394426],Some(EntityInstanceId(2)),Some(Actor[akka://RaftActorCompactionSpec@localhost:51064/system/testActor-2#-1359332434])) ===
[JVM-3] state updated: State(2)
[JVM-3] === [Leader] persisting time: 5 ms ===
[JVM-3] === [Leader] Heartbeat after 100 ms ===
[JVM-3] === [Leader] publish AppendEntries(NormalizedShardId(146),Term(1),replica-group-2,3,Term(1),List(LogEntry(4, EntityEvent(Some(NormalizedEntityId(test)),Incremented(1)), Term(1))),3) to replica-group-1 ===
[JVM-3] === [Leader] publish AppendEntries(NormalizedShardId(146),Term(1),replica-group-2,0,Term(0),List(LogEntry(1, EntityEvent(None,NoOp), Term(1)), LogEntry(2, EntityEvent(Some(NormalizedEntityId(test)),Incremented(1)), Term(1)), LogEntry(3, EntityEvent(Some(NormalizedEntityId(test)),Incremented(1)), Term(1)), LogEntry(4, EntityEvent(Some(NormalizedEntityId(test)),Incremented(1)), Term(1))),3) to replica-group-3 ===
[JVM-2] === [Follower] append AppendEntries(NormalizedShardId(146),Term(1),replica-group-2,3,Term(1),Vector(LogEntry(4, EntityEvent(Some(NormalizedEntityId(test)),Incremented(1)), Term(1))),3) ===
[JVM-2] === [Follower] persisting time: 4 ms ===
[JVM-2] state updated: State(2)
[JVM-2] === [Follower] applying Incremented(1) to ReplicationActor ===
[JVM-2] === Transition: Follower -> Follower ===
[JVM-2] === [Follower] election-timeout after 913 ms ===
[JVM-3] === [Leader] committed LogEntry(4, EntityEvent(Some(NormalizedEntityId(test)),Incremented(1)), Term(1)) and will notify it to ClientContext(Actor[akka://RaftActorCompactionSpec/system/sharding/raft-shard-typeNam-2-replica-group-2/146/146/test#-930394426],Some(EntityInstanceId(2)),Some(Actor[akka://RaftActorCompactionSpec@localhost:51064/system/testActor-2#-1359332434])) ===
[JVM-3] state updated: State(3)
[JVM-3] [Leader] compaction started (logEntryIndex: 4, number of entities: 1)
[JVM-3] === [Leader] persisting time: 4 ms ===
[JVM-3] [Leader] compaction completed (term: Term(1), logEntryIndex: 4)
[JVM-2] [Follower] compaction started (logEntryIndex: 3, number of entities: 1)
[JVM-2] === [Follower] persisting time: 5 ms ===
[JVM-2] [Follower] compaction completed (term: Term(1), logEntryIndex: 3)
[JVM-3] === [Leader] Heartbeat after 100 ms ===
[JVM-2] [Candidate] Election timeout at Term(4). Retrying leader election.
[JVM-3] === [Leader] publish AppendEntries(NormalizedShardId(146),Term(1),replica-group-2,4,Term(1),List(),4) to replica-group-1 ===
[JVM-3] === [Leader] publish InstallSnapshot(NormalizedShardId(146),Term(1),replica-group-2,Term(1),4) to replica-group-3 ===
[JVM-2] === [Candidate] broadcast RequestVote(NormalizedShardId(146),Term(5),replica-group-1,13,Term(3)) ===
[JVM-2] === [Follower] append AppendEntries(NormalizedShardId(146),Term(1),replica-group-2,4,Term(1),Vector(),4) ===
[JVM-2] === [Follower] applying Incremented(1) to ReplicationActor ===
[JVM-2] state updated: State(3)
[JVM-2] === Transition: Follower -> Follower ===
[JVM-2] === [Follower] election-timeout after 787 ms ===
[JVM-2] === [Candidate] persisting time: 16 ms ===
[JVM-2] === Transition: Candidate -> Candidate ===
[JVM-2] === [Candidate] election-timeout after 1254 ms ===
[JVM-1] === releaseIsolation RoleName(node3) ===
[JVM-1] - should synchronize snapshot to recover a Follower even if the Follower could not receive all logs by compaction in a Leader
[JVM-4] - should synchronize snapshot to recover a Follower even if the Follower could not receive all logs by compaction in a Leader
[JVM-3] - should synchronize snapshot to recover a Follower even if the Follower could not receive all logs by compaction in a Leader
[JVM-3] === [Leader] broadcast TryCreateEntity(NormalizedShardId(146),NormalizedEntityId(test)) ===
[JVM-3] === [Leader] persisting time: 10 ms ===
[JVM-3] === [Leader] Heartbeat after 100 ms ===
[JVM-3] === [Leader] publish AppendEntries(NormalizedShardId(146),Term(1),replica-group-2,4,Term(1),List(LogEntry(5, EntityEvent(Some(NormalizedEntityId(test)),Incremented(0)), Term(1))),4) to replica-group-1 ===
[JVM-3] === [Leader] publish InstallSnapshot(NormalizedShardId(146),Term(1),replica-group-2,Term(1),4) to replica-group-3 ===
[JVM-2] === [Follower] append AppendEntries(NormalizedShardId(146),Term(1),replica-group-2,4,Term(1),Vector(LogEntry(5, EntityEvent(Some(NormalizedEntityId(test)),Incremented(0)), Term(1))),4) ===
[JVM-2] === [Follower] persisting time: 17 ms ===
[JVM-2] === Transition: Follower -> Follower ===
[JVM-2] === [Follower] election-timeout after 1437 ms ===
[JVM-3] === [Leader] committed LogEntry(5, EntityEvent(Some(NormalizedEntityId(test)),Incremented(0)), Term(1)) and will notify it to ClientContext(Actor[akka://RaftActorCompactionSpec/system/sharding/raft-shard-typeNam-2-replica-group-2/146/146/test#-930394426],Some(EntityInstanceId(2)),Some(Actor[akka://RaftActorCompactionSpec@localhost:51064/system/testActor-2#-1359332434])) ===
[JVM-3] state updated: State(3)
[JVM-2] [Follower] compaction started (logEntryIndex: 4, number of entities: 1)
[JVM-4] === Transition: Recovering -> Follower ===
[JVM-2] === [Follower] persisting time: 36 ms ===
[JVM-3] [Leader] compaction started (logEntryIndex: 5, number of entities: 1)
[JVM-2] [Follower] compaction completed (term: Term(1), logEntryIndex: 4)
[JVM-4] === [Recovering] election-timeout after 1285 ms ===
[JVM-4] === [Follower] accept RequestVote(NormalizedShardId(146),Term(1),replica-group-2,0,Term(0)) ===
[JVM-3] === [Leader] persisting time: 12 ms ===
[JVM-3] [Leader] compaction completed (term: Term(1), logEntryIndex: 5)
[JVM-4] === [Follower] persisting time: 10 ms ===
[JVM-4] === [Follower] election-timeout after 1021 ms ===
[JVM-4] === [Follower] append AppendEntries(NormalizedShardId(146),Term(1),replica-group-2,0,Term(0),Vector(LogEntry(1, EntityEvent(None,NoOp), Term(1))),0) ===
[JVM-3] === [Leader] Heartbeat after 100 ms ===
[JVM-3] === [Leader] publish AppendEntries(NormalizedShardId(146),Term(1),replica-group-2,5,Term(1),List(),5) to replica-group-1 ===
[JVM-2] === [Follower] append AppendEntries(NormalizedShardId(146),Term(1),replica-group-2,5,Term(1),Vector(),5) ===
[JVM-3] === [Leader] publish InstallSnapshot(NormalizedShardId(146),Term(1),replica-group-2,Term(1),5) to replica-group-3 ===
[JVM-2] === [Follower] applying Incremented(0) to ReplicationActor ===
[JVM-2] === Transition: Follower -> Follower ===
[JVM-2] state updated: State(3)
[JVM-4] === [Follower] persisting time: 16 ms ===
[JVM-2] === [Follower] election-timeout after 1393 ms ===
[JVM-4] === Transition: Follower -> Follower ===
[JVM-4] === [Follower] election-timeout after 1034 ms ===
[JVM-4] === [Follower] created an entity (NormalizedEntityId(test)) ===
[JVM-4] === [Follower] append AppendEntries(NormalizedShardId(146),Term(1),replica-group-2,0,Term(0),Vector(LogEntry(1, EntityEvent(None,NoOp), Term(1)), LogEntry(2, EntityEvent(Some(NormalizedEntityId(test)),Incremented(1)), Term(1))),1) ===
[JVM-4] === [Follower] persisting time: 12 ms ===
[JVM-4] === Transition: Follower -> Follower ===
[JVM-4] === [Follower] election-timeout after 1286 ms ===
[JVM-4] === [Follower] append AppendEntries(NormalizedShardId(146),Term(1),replica-group-2,0,Term(0),Vector(LogEntry(1, EntityEvent(None,NoOp), Term(1)), LogEntry(2, EntityEvent(Some(NormalizedEntityId(test)),Incremented(1)), Term(1)), LogEntry(3, EntityEvent(Some(NormalizedEntityId(test)),Incremented(1)), Term(1))),2) ===
[JVM-4] === [Follower] persisting time: 6 ms ===
[JVM-4] === [Follower] applying Incremented(1) to ReplicationActor ===
[JVM-4] === Transition: Follower -> Follower ===
[JVM-4] === [Follower] election-timeout after 813 ms ===
[JVM-4] === [Follower] append AppendEntries(NormalizedShardId(146),Term(1),replica-group-2,0,Term(0),Vector(LogEntry(1, EntityEvent(None,NoOp), Term(1)), LogEntry(2, EntityEvent(Some(NormalizedEntityId(test)),Incremented(1)), Term(1)), LogEntry(3, EntityEvent(Some(NormalizedEntityId(test)),Incremented(1)), Term(1)), LogEntry(4, EntityEvent(Some(NormalizedEntityId(test)),Incremented(1)), Term(1))),3) ===
[JVM-4] === [Follower] persisting time: 9 ms ===
[JVM-4] === [Follower] applying Incremented(1) to ReplicationActor ===
[JVM-4] === Transition: Follower -> Follower ===
[JVM-4] === [Follower] election-timeout after 1005 ms ===
[JVM-4] === Transition: Follower -> Follower ===
[JVM-4] === [Follower] election-timeout after 981 ms ===
[JVM-4] === Transition: Follower -> Follower ===
[JVM-4] === [Follower] election-timeout after 1251 ms ===
[JVM-4] === Transition: Follower -> Follower ===
[JVM-4] === [Follower] election-timeout after 1384 ms ===
[JVM-4] state updated: State(1)
[JVM-4] state updated: State(2)
[JVM-4] Snapshot synchronization started: (typeName: typeNam-2, memberIndex: replica-group-2, snapshotLastLogTerm: 1, snapshotLastLogIndex: 4) -> (typeName: typeNam-2, memberIndex: replica-group-3, snapshotLastLogTerm: 0, snapshotLastLogIndex: 0)
[JVM-4] [Follower] compaction started (logEntryIndex: 3, number of entities: 1)
[JVM-3] === [Leader] Heartbeat after 100 ms ===
[JVM-3] === [Leader] publish AppendEntries(NormalizedShardId(146),Term(1),replica-group-2,5,Term(1),List(),5) to replica-group-1 ===
[JVM-3] === [Leader] publish AppendEntries(NormalizedShardId(146),Term(1),replica-group-2,4,Term(1),List(LogEntry(5, EntityEvent(Some(NormalizedEntityId(test)),Incremented(0)), Term(1))),5) to replica-group-3 ===
[JVM-2] === [Follower] append AppendEntries(NormalizedShardId(146),Term(1),replica-group-2,5,Term(1),Vector(),5) ===
[JVM-4] === [Follower] append AppendEntries(NormalizedShardId(146),Term(1),replica-group-2,4,Term(1),Vector(LogEntry(5, EntityEvent(Some(NormalizedEntityId(test)),Incremented(0)), Term(1))),5) ===
[JVM-2] === Transition: Follower -> Follower ===
[JVM-2] === [Follower] election-timeout after 839 ms ===
[JVM-4] === [Follower] persisting time: 14 ms ===
[JVM-4] === [Follower] applying Incremented(1) to ReplicationActor ===
[JVM-4] state updated: State(3)
[JVM-4] === [Follower] applying Incremented(0) to ReplicationActor ===
[JVM-4] state updated: State(3)
[JVM-4] === Transition: Follower -> Follower ===
[JVM-4] === [Follower] election-timeout after 1224 ms ===
[JVM-4] Snapshot synchronization aborted: (typeName: typeNam-2, memberIndex: replica-group-2) -> (typeName: typeNam-2, memberIndex: replica-group-3, snapshotLastLogTerm: 0, snapshotLastLogIndex: 0) cause: lerna.akka.entityreplication.raft.snapshot.sync.SnapshotSyncManager$SnapshotUpdateConflictException: Newer (logEntryIndex: 5) snapshot found than expected (logEntryIndex: 4) in [typeName: typeNam-2, memberIndex: replica-group-2, entityId: NormalizedEntityId(test)]
The conflict was induced by `AppendEntries` commands that the follower received before receiving `InstallSnapshot`. The follower accepted the `AppendEntries` commands and merged log entries up to `LogEntryIndex(5)`. The root cause of this issue is that the follower accepts an `InstallSnapshot` command even though it has already become consistent with the leader.
```scala
protected def receiveInstallSnapshot(request: InstallSnapshot): Unit =
  request match {
    case installSnapshot if installSnapshot.term.isOlderThan(currentData.currentTerm) =>
    // ignore the message because this member knows another newer leader
    case installSnapshot =>
      if (installSnapshot.term == currentData.currentTerm) {
        applyDomainEvent(DetectedLeaderMember(installSnapshot.srcMemberIndex)) { _ =>
          startSyncSnapshot(installSnapshot)
          become(Follower)
        }
      } else {
        applyDomainEvent(DetectedNewTerm(installSnapshot.term)) { _ =>
          applyDomainEvent(DetectedLeaderMember(installSnapshot.srcMemberIndex)) { _ =>
            startSyncSnapshot(installSnapshot)
            become(Follower)
          }
        }
      }
  }
```
(akka-entity-replication/RaftActor.scala at v2.0.0)
```scala
protected def startSyncSnapshot(installSnapshot: InstallSnapshot): Unit = {
  val snapshotSyncManagerName = ActorIds.actorName(
    snapshotSyncManagerNamePrefix,
    typeName.underlying,
    installSnapshot.srcMemberIndex.role,
  )
  val snapshotSyncManager = context.child(snapshotSyncManagerName).getOrElse {
    context.actorOf(
      SnapshotSyncManager.props(
        typeName = typeName,
        srcMemberIndex = installSnapshot.srcMemberIndex,
        dstMemberIndex = selfMemberIndex,
        dstShardSnapshotStore = shardSnapshotStore,
        shardId,
        settings,
      ),
      snapshotSyncManagerName,
    )
  }
  snapshotSyncManager ! SnapshotSyncManager.SyncSnapshot(
    srcLatestSnapshotLastLogTerm = installSnapshot.srcLatestSnapshotLastLogTerm,
    srcLatestSnapshotLastLogIndex = installSnapshot.srcLatestSnapshotLastLogLogIndex,
    dstLatestSnapshotLastLogTerm = currentData.lastSnapshotStatus.snapshotLastTerm,
    dstLatestSnapshotLastLogIndex = currentData.lastSnapshotStatus.snapshotLastLogIndex,
    replyTo = self,
  )
}
```
(akka-entity-replication/RaftActor.scala at v2.0.0)
We should add a condition for accepting `InstallSnapshot`.
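As a minimal sketch (with simplified, hypothetical types — the real guard would live in `receiveInstallSnapshot` before `startSyncSnapshot` is called), such a condition could ignore a snapshot whose last log index the follower's log already covers:

```scala
// Hypothetical, simplified model of the proposed guard. The follower compares
// the snapshot's last log index with its own log state before starting a sync.
final case class InstallSnapshotMsg(srcLatestSnapshotLastLogTerm: Long, srcLatestSnapshotLastLogIndex: Long)
final case class FollowerLogState(lastLogTerm: Long, lastLogIndex: Long)

// Start the snapshot sync only if the snapshot is ahead of what the follower
// already has; otherwise the follower is already consistent with the leader
// and syncing would raise SnapshotUpdateConflictException.
def shouldStartSnapshotSync(msg: InstallSnapshotMsg, follower: FollowerLogState): Boolean =
  msg.srcLatestSnapshotLastLogIndex > follower.lastLogIndex
```

With the values from the log above (follower merged entries up to index 5, snapshot at index 4), the guard would skip the sync.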
* The leader receives a `Replicate` message from an entity and will replicate an entry of the message for the entity.
* The leader stores the client (`ClientContext` with `LogEntryIndex`) into `LeaderData.clients`.
* The leader sends `Replica` to the entity.
* However, the client (`ClientContext` with `LogEntryIndex`) of `LeaderData.clients` is not removed.

The following error occurred in some fault injection tests:
04:48:20.417 xxx ERROR akka.actor.OneForOneStrategy xxx akka://xxx/system/sharding/raft-shard-xxx-replica-group-3/30/30 - requirement failed: The entry with index [583] should not conflict with the committed entry (commitIndex [584]) java.lang.IllegalArgumentException: requirement failed: The entry with index [583] should not conflict with the committed entry (commitIndex [584])
at scala.Predef$.require(Predef.scala:337)
at lerna.akka.entityreplication.raft.FollowerData.resolveNewLogEntries(RaftMemberData.scala:147)
at lerna.akka.entityreplication.raft.FollowerData.resolveNewLogEntries$(RaftMemberData.scala:119)
at lerna.akka.entityreplication.raft.RaftMemberDataImpl.resolveNewLogEntries(RaftMemberData.scala:616)
at lerna.akka.entityreplication.raft.Follower.lerna$akka$entityreplication$raft$Follower$$receiveAppendEntries(Follower.scala:111)
at lerna.akka.entityreplication.raft.Follower$$anonfun$followerBehavior$1.applyOrElse(Follower.scala:25)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at lerna.akka.entityreplication.raft.RaftActor.akka$persistence$Eventsourced$$super$aroundReceive(RaftActor.scala:141)
at akka.persistence.Eventsourced$$anon$4.stateReceive(Eventsourced.scala:923)
at akka.persistence.Eventsourced.aroundReceive$$original(Eventsourced.scala:251)
at akka.persistence.Eventsourced.aroundReceive(Eventsourced.scala:148)
at akka.persistence.Eventsourced.aroundReceive$(Eventsourced.scala:250)
at lerna.akka.entityreplication.raft.RaftActor.aroundReceive(RaftActor.scala:141)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke$$original(ActorCell.scala:548)
at akka.actor.ActorCell.invoke(ActorCell.scala:61)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run$$original(Mailbox.scala:231)
at akka.dispatch.Mailbox.run(Mailbox.scala:32)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
The following log continued (up to 1700) in some fault injection tests:
[Leader] failed to replicate the event (type=[lerna.akka.entityreplication.raft.model.NoOp$]) since the entity (entityId=[0000059981], instanceId=[34156], lastAppliedIndex=[814]) must apply [1] entries to itself. The leader will replicate a new event after the entity applies these [1] non-applied entries to itself.
By diagnosing logs, the following situation happened:

1. The entity's `lastAppliedLogEntryIndex` was 814.
2. A `NoOp` replication succeeded with index 821 (`commitIndex=821`, `lastApplied=821`).
3. The `RaftActor` did not send `Replica` for index 821 to the entity since the associated event is `NoOp`.
4. The entity therefore did not update its `lastAppliedLogEntryIndex` to 821.
5. The entity received a `ProcessCommand` and then attempted to replicate an event: `Replicate(entityLastAppliedIndex=814, ...)`.
6. The leader refused the replication (`ReplicationFailed`) since the entity had a non-applied entry.

There are options to fix the issue that `Replica` for `EntityEvent(Some(entityId), NoOp)` is not sent:

* `RaftActor` will send `Replica` to an entity also if an `EntityEvent` contains `NoOp`.
* `Leader` will start replication if non-applied entries contain only `NoOp`.

Affected version: 2.0.0
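The first fix can be sketched with a minimal model (hypothetical, simplified types — not the library's actual API): when notifying committed entries, send `Replica` for every entry associated with an entity, including `NoOp`, so the entity's `lastAppliedLogEntryIndex` keeps advancing.

```scala
// Hypothetical, simplified model of the first fix: the commit notification no
// longer skips NoOp events that are associated with an entity.
sealed trait Event
case object NoOp extends Event

final case class EntityEvent(entityId: Option[String], event: Event)
final case class LogEntry(index: Long, entityEvent: EntityEvent)
final case class Replica(index: Long, event: Event)

// Produce a Replica for every committed entry that names an entity, even when
// the event is NoOp. Shard-level entries (entityId = None) are still skipped.
def replicasToSend(committed: Seq[LogEntry]): Seq[(String, Replica)] =
  committed.collect {
    case LogEntry(index, EntityEvent(Some(entityId), event)) =>
      entityId -> Replica(index, event)
  }
```

In the scenario above, the entity would then receive `Replica(821, NoOp)` and update `lastAppliedLogEntryIndex` to 821 before attempting further replications.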
The following log is taken from CI.
2021-11-29T01:40:07.6088032Z [JVM-4] === Transition: Recovering -> Follower ===
2021-11-29T01:40:07.6089055Z [JVM-4] === [Recovering] election-timeout after 828 ms ===
2021-11-29T01:40:07.6243011Z [JVM-3] === Transition: Recovering -> Follower ===
2021-11-29T01:40:07.6245596Z [JVM-3] === [Recovering] election-timeout after 1451 ms ===
2021-11-29T01:40:08.4506281Z [JVM-4] === [Follower] election timeout ===
2021-11-29T01:40:08.4509293Z [JVM-4] === [Follower] broadcast RequestVote(NormalizedShardId(118),Term(1),member-3,0,Term(0)) ===
2021-11-29T01:40:08.4568858Z [JVM-3] === [Follower] accept RequestVote(NormalizedShardId(118),Term(1),member-3,0,Term(0)) ===
2021-11-29T01:40:08.4642075Z [JVM-3] === [Follower] persisting time: 9 ms ===
2021-11-29T01:40:08.4645587Z [JVM-3] === [Follower] election-timeout after 1287 ms ===
2021-11-29T01:40:08.4705631Z [JVM-4] === [Follower] persisting time: 19 ms ===
2021-11-29T01:40:08.4706871Z [JVM-4] === Transition: Follower -> Candidate ===
2021-11-29T01:40:08.4708044Z [JVM-4] === [Follower] election-timeout after 1015 ms ===
2021-11-29T01:40:08.4709174Z [JVM-4] === [Candidate] accept self RequestVote ===
2021-11-29T01:40:08.4795805Z [JVM-4] === [Candidate] persisting time: 8 ms ===
2021-11-29T01:40:08.4797535Z [JVM-4] === [Candidate] accept for member-2 ===
2021-11-29T01:40:08.4798767Z [JVM-4] === [Candidate] election-timeout after 754 ms ===
2021-11-29T01:40:08.4799938Z [JVM-4] === [Candidate] accept for member-3 ===
2021-11-29T01:40:08.4801615Z [JVM-4] === Transition: Candidate -> Leader ===
2021-11-29T01:40:08.4802734Z [JVM-4] === [Leader] Heartbeat after 100 ms ===
2021-11-29T01:40:08.4804215Z [JVM-4] [Leader] New leader was elected (term: Term(1), lastLogTerm: Term(0), lastLogIndex: 0)
2021-11-29T01:40:08.4857115Z [JVM-4] === [Leader] persisting time: 5 ms ===
2021-11-29T01:40:08.4858429Z [JVM-4] === [Leader] Heartbeat after 100 ms ===
2021-11-29T01:40:08.4860135Z [JVM-4] === [Leader] publish AppendEntries(NormalizedShardId(118),Term(1),member-3,0,Term(0),List(LogEntry(1, EntityEvent(None,NoOp), Term(1))),0) to member-1 ===
2021-11-29T01:40:08.4862454Z [JVM-4] === [Leader] publish AppendEntries(NormalizedShardId(118),Term(1),member-3,0,Term(0),List(LogEntry(1, EntityEvent(None,NoOp), Term(1))),0) to member-2 ===
2021-11-29T01:40:08.4882041Z [JVM-3] === [Follower] append AppendEntries(NormalizedShardId(118),Term(1),member-3,0,Term(0),Vector(LogEntry(1, EntityEvent(None,NoOp), Term(1))),0) ===
2021-11-29T01:40:08.4965696Z [JVM-3] === [Follower] persisting time: 8 ms ===
2021-11-29T01:40:08.4970818Z [JVM-3] === Transition: Follower -> Follower ===
2021-11-29T01:40:08.4973102Z [JVM-3] === [Follower] election-timeout after 1018 ms ===
2021-11-29T01:40:08.5004947Z [JVM-4] === [Leader] committed LogEntry(1, EntityEvent(None,NoOp), Term(1)) and will notify it to ClientContext(Actor[akka://ReplicationRegionSpec/system/sharding/raft-shard-typeName-6-member-3/118/118#121746048],None,None) ===
2021-11-29T01:40:08.5007878Z [JVM-4] === [Leader] broadcast TryCreateEntity(NormalizedShardId(118),NormalizedEntityId(replication-7)) ===
2021-11-29T01:40:08.5010007Z [JVM-4] === [Leader] created an entity (NormalizedEntityId(replication-7)) ===
2021-11-29T01:40:08.5012667Z [JVM-4] === [Leader] broadcast TryCreateEntity(NormalizedShardId(118),NormalizedEntityId(replication-7)) ===
2021-11-29T01:40:08.5014685Z [JVM-4] === [Leader] broadcast TryCreateEntity(NormalizedShardId(118),NormalizedEntityId(replication-7)) ===
2021-11-29T01:40:08.5016988Z [JVM-4] === [Leader] broadcast TryCreateEntity(NormalizedShardId(118),NormalizedEntityId(replication-7)) ===
2021-11-29T01:40:08.5020041Z [JVM-4] === akka://ReplicationRegionSpec/system/sharding/raft-shard-typeName-6-member-3/118/118/replication-7 started ===
2021-11-29T01:40:08.5170432Z [JVM-3] === [Follower] created an entity (NormalizedEntityId(replication-7)) ===
2021-11-29T01:40:08.5172747Z [JVM-3] === akka://ReplicationRegionSpec/system/sharding/raft-shard-typeName-6-member-2/118/118/replication-7 started ===
2021-11-29T01:40:08.5269739Z [JVM-4] === [Leader] persisting time: 18 ms ===
2021-11-29T01:40:08.5271286Z [JVM-4] === [Leader] Heartbeat after 100 ms ===
2021-11-29T01:40:08.5273739Z [JVM-4] === [Leader] publish AppendEntries(NormalizedShardId(118),Term(1),member-3,0,Term(0),List(LogEntry(1, EntityEvent(None,NoOp), Term(1)), LogEntry(2, EntityEvent(Some(NormalizedEntityId(replication-7)),received), Term(1))),1) to member-1 ===
2021-11-29T01:40:08.5277363Z [JVM-4] === [Leader] publish AppendEntries(NormalizedShardId(118),Term(1),member-3,1,Term(1),List(LogEntry(2, EntityEvent(Some(NormalizedEntityId(replication-7)),received), Term(1))),1) to member-2 ===
2021-11-29T01:40:08.5350282Z [JVM-3] === [Follower] append AppendEntries(NormalizedShardId(118),Term(1),member-3,1,Term(1),Vector(LogEntry(2, EntityEvent(Some(NormalizedEntityId(replication-7)),received), Term(1))),1) ===
2021-11-29T01:40:08.5415919Z [JVM-3] === [Follower] persisting time: 11 ms ===
2021-11-29T01:40:08.5417542Z [JVM-3] === Transition: Follower -> Follower ===
2021-11-29T01:40:08.5420013Z [JVM-3] === [Follower] election-timeout after 1047 ms ===
2021-11-29T01:40:08.5476597Z [JVM-4] === [Leader] committed LogEntry(2, EntityEvent(Some(NormalizedEntityId(replication-7)),received), Term(1)) and will notify it to ClientContext(Actor[akka://ReplicationRegionSpec/system/sharding/raft-shard-typeName-6-member-3/118/118/replication-7#-1166201394],Some(EntityInstanceId(4)),Some(Actor[akka://ReplicationRegionSpec/system/testActor-2#-562953879])) ===
2021-11-29T01:40:08.5480275Z [JVM-4] updateState
2021-11-29T01:40:08.5551610Z [JVM-4] === [Leader] persisting time: 6 ms ===
2021-11-29T01:40:08.5557949Z [JVM-4] === [Leader] Heartbeat after 100 ms ===
2021-11-29T01:40:08.5569675Z [JVM-4] === [Leader] publish AppendEntries(NormalizedShardId(118),Term(1),member-3,0,Term(0),List(LogEntry(1, EntityEvent(None,NoOp), Term(1)), LogEntry(2, EntityEvent(Some(NormalizedEntityId(replication-7)),received), Term(1)), LogEntry(3, EntityEvent(Some(NormalizedEntityId(replication-7)),received), Term(1))),2) to member-1 ===
2021-11-29T01:40:08.5572835Z [JVM-4] === [Leader] publish AppendEntries(NormalizedShardId(118),Term(1),member-3,2,Term(1),List(LogEntry(3, EntityEvent(Some(NormalizedEntityId(replication-7)),received), Term(1))),2) to member-2 ===
2021-11-29T01:40:08.5583768Z [JVM-3] === [Follower] append AppendEntries(NormalizedShardId(118),Term(1),member-3,2,Term(1),Vector(LogEntry(3, EntityEvent(Some(NormalizedEntityId(replication-7)),received), Term(1))),2) ===
2021-11-29T01:40:08.5632158Z [JVM-3] === [Follower] persisting time: 4 ms ===
2021-11-29T01:40:08.5643330Z [JVM-3] === [Follower] applying received to ReplicationActor ===
2021-11-29T01:40:08.5644707Z [JVM-3] === Transition: Follower -> Follower ===
2021-11-29T01:40:08.5646529Z [JVM-3] === [Follower] election-timeout after 1119 ms ===
2021-11-29T01:40:08.5647793Z [JVM-3] updateState
2021-11-29T01:40:08.5658162Z [JVM-4] === [Leader] committed LogEntry(3, EntityEvent(Some(NormalizedEntityId(replication-7)),received), Term(1)) and will notify it to ClientContext(Actor[akka://ReplicationRegionSpec/system/sharding/raft-shard-typeName-6-member-3/118/118/replication-7#-1166201394],Some(EntityInstanceId(4)),Some(Actor[akka://ReplicationRegionSpec/system/testActor-2#-562953879])) ===
2021-11-29T01:40:08.5661126Z [JVM-4] updateState
2021-11-29T01:40:08.5719364Z [JVM-4] === [Leader] persisting time: 4 ms ===
2021-11-29T01:40:08.5720831Z [JVM-4] === [Leader] Heartbeat after 100 ms ===
2021-11-29T01:40:08.5724012Z [JVM-4] === [Leader] publish AppendEntries(NormalizedShardId(118),Term(1),member-3,0,Term(0),List(LogEntry(1, EntityEvent(None,NoOp), Term(1)), LogEntry(2, EntityEvent(Some(NormalizedEntityId(replication-7)),received), Term(1)), LogEntry(3, EntityEvent(Some(NormalizedEntityId(replication-7)),received), Term(1)), LogEntry(4, EntityEvent(Some(NormalizedEntityId(replication-7)),received), Term(1))),3) to member-1 ===
2021-11-29T01:40:08.5728722Z [JVM-4] === [Leader] publish AppendEntries(NormalizedShardId(118),Term(1),member-3,3,Term(1),List(LogEntry(4, EntityEvent(Some(NormalizedEntityId(replication-7)),received), Term(1))),3) to member-2 ===
2021-11-29T01:40:08.5742753Z [JVM-3] === [Follower] append AppendEntries(NormalizedShardId(118),Term(1),member-3,3,Term(1),Vector(LogEntry(4, EntityEvent(Some(NormalizedEntityId(replication-7)),received), Term(1))),3) ===
2021-11-29T01:40:08.5796262Z [JVM-3] === [Follower] persisting time: 4 ms ===
2021-11-29T01:40:08.5797722Z [JVM-3] === [Follower] applying received to ReplicationActor ===
2021-11-29T01:40:08.5799194Z [JVM-3] === Transition: Follower -> Follower ===
2021-11-29T01:40:08.5800381Z [JVM-3] === [Follower] election-timeout after 1069 ms ===
2021-11-29T01:40:08.5802030Z [JVM-3] updateState
2021-11-29T01:40:08.5814218Z [JVM-4] === [Leader] committed LogEntry(4, EntityEvent(Some(NormalizedEntityId(replication-7)),received), Term(1)) and will notify it to ClientContext(Actor[akka://ReplicationRegionSpec/system/sharding/raft-shard-typeName-6-member-3/118/118/replication-7#-1166201394],Some(EntityInstanceId(4)),Some(Actor[akka://ReplicationRegionSpec/system/testActor-2#-562953879])) ===
2021-11-29T01:40:08.5816804Z [JVM-4] updateState
2021-11-29T01:40:08.5817754Z [JVM-4] receive: GetStatusWithEnsuringConsistency: 3
2021-11-29T01:40:08.5868483Z [JVM-4] === [Leader] persisting time: 4 ms ===
2021-11-29T01:40:08.5869399Z [JVM-4] === [Leader] Heartbeat after 100 ms ===
2021-11-29T01:40:08.5872854Z [JVM-4] === [Leader] publish AppendEntries(NormalizedShardId(118),Term(1),member-3,0,Term(0),List(LogEntry(1, EntityEvent(None,NoOp), Term(1)), LogEntry(2, EntityEvent(Some(NormalizedEntityId(replication-7)),received), Term(1)), LogEntry(3, EntityEvent(Some(NormalizedEntityId(replication-7)),received), Term(1)), LogEntry(4, EntityEvent(Some(NormalizedEntityId(replication-7)),received), Term(1)), LogEntry(5, EntityEvent(Some(NormalizedEntityId(replication-7)),NoOp), Term(1))),4) to member-1 ===
2021-11-29T01:40:08.5884437Z [JVM-4] === [Leader] publish AppendEntries(NormalizedShardId(118),Term(1),member-3,4,Term(1),List(LogEntry(5, EntityEvent(Some(NormalizedEntityId(replication-7)),NoOp), Term(1))),4) to member-2 ===
2021-11-29T01:40:08.5887307Z [JVM-3] === [Follower] append AppendEntries(NormalizedShardId(118),Term(1),member-3,4,Term(1),Vector(LogEntry(5, EntityEvent(Some(NormalizedEntityId(replication-7)),NoOp), Term(1))),4) ===
2021-11-29T01:40:08.5945533Z [JVM-3] === [Follower] persisting time: 4 ms ===
2021-11-29T01:40:08.5946553Z [JVM-3] === [Follower] applying received to ReplicationActor ===
2021-11-29T01:40:08.5947529Z [JVM-3] === Transition: Follower -> Follower ===
2021-11-29T01:40:08.5949958Z [JVM-3] === [Follower] election-timeout after 1072 ms ===
2021-11-29T01:40:08.5950938Z [JVM-3] updateState
2021-11-29T01:40:08.6016936Z [JVM-4] === [Leader] committed LogEntry(5, EntityEvent(Some(NormalizedEntityId(replication-7)),NoOp), Term(1)) and will notify it to ClientContext(Actor[akka://ReplicationRegionSpec/system/sharding/raft-shard-typeName-6-member-3/118/118/replication-7#-1166201394],Some(EntityInstanceId(4)),Some(Actor[akka://ReplicationRegionSpec/system/testActor-2#-562953879])) ===
2021-11-29T01:40:08.6097062Z [JVM-4] [Leader] compaction started (logEntryIndex: 5, number of entities: 1)
2021-11-29T01:40:08.6167771Z [JVM-3] [Follower] compaction started (logEntryIndex: 4, number of entities: 1)
2021-11-29T01:40:08.6236212Z [JVM-4] === [Leader] persisting time: 8 ms ===
2021-11-29T01:40:08.6237435Z [JVM-4] [Leader] compaction completed (term: Term(1), logEntryIndex: 5)
2021-11-29T01:40:08.6352304Z [JVM-3] === [Follower] persisting time: 13 ms ===
2021-11-29T01:40:08.6353486Z [JVM-3] [Follower] compaction completed (term: Term(1), logEntryIndex: 4)
2021-11-29T01:40:08.6999164Z [JVM-4] === [Leader] Heartbeat after 100 ms ===
2021-11-29T01:40:08.7000884Z [JVM-4] === [Leader] publish AppendEntries(NormalizedShardId(118),Term(1),member-3,0,Term(0),List(LogEntry(5, EntityEvent(Some(NormalizedEntityId(replication-7)),NoOp), Term(1))),5) to member-1 ===
2021-11-29T01:40:08.7002654Z [JVM-4] === [Leader] publish AppendEntries(NormalizedShardId(118),Term(1),member-3,5,Term(1),List(),5) to member-2 ===
2021-11-29T01:40:08.7037435Z [JVM-3] === [Follower] append AppendEntries(NormalizedShardId(118),Term(1),member-3,5,Term(1),Vector(),5) ===
2021-11-29T01:40:08.7046710Z [JVM-3] === Transition: Follower -> Follower ===
2021-11-29T01:40:08.7047957Z [JVM-3] === [Follower] election-timeout after 944 ms ===
2021-11-29T01:40:08.8206990Z [JVM-4] === [Leader] Heartbeat after 100 ms ===
2021-11-29T01:40:08.8209555Z [JVM-4] === [Leader] publish AppendEntries(NormalizedShardId(118),Term(1),member-3,0,Term(0),List(LogEntry(5, EntityEvent(Some(NormalizedEntityId(replication-7)),NoOp), Term(1))),5) to member-1 ===
2021-11-29T01:40:08.8211191Z [JVM-4] === [Leader] publish AppendEntries(NormalizedShardId(118),Term(1),member-3,5,Term(1),List(),5) to member-2 ===
2021-11-29T01:40:08.8226811Z [JVM-3] === [Follower] append AppendEntries(NormalizedShardId(118),Term(1),member-3,5,Term(1),Vector(),5) ===
2021-11-29T01:40:08.8239893Z [JVM-3] === Transition: Follower -> Follower ===
2021-11-29T01:40:08.8241240Z [JVM-3] === [Follower] election-timeout after 1317 ms ===
2021-11-29T01:40:08.9386334Z [JVM-4] === [Leader] Heartbeat after 100 ms ===
2021-11-29T01:40:08.9391154Z [JVM-4] === [Leader] publish AppendEntries(NormalizedShardId(118),Term(1),member-3,0,Term(0),List(LogEntry(5, EntityEvent(Some(NormalizedEntityId(replication-7)),NoOp), Term(1))),5) to member-1 ===
2021-11-29T01:40:08.9393248Z [JVM-4] === [Leader] publish AppendEntries(NormalizedShardId(118),Term(1),member-3,5,Term(1),List(),5) to member-2 ===
2021-11-29T01:40:08.9418004Z [JVM-3] === [Follower] append AppendEntries(NormalizedShardId(118),Term(1),member-3,5,Term(1),Vector(),5) ===
2021-11-29T01:40:08.9419669Z [JVM-3] === Transition: Follower -> Follower ===
2021-11-29T01:40:08.9420657Z [JVM-3] === [Follower] election-timeout after 1370 ms ===
2021-11-29T01:40:09.0595264Z [JVM-4] === [Leader] Heartbeat after 100 ms ===
2021-11-29T01:40:09.0600777Z [JVM-4] === [Leader] publish AppendEntries(NormalizedShardId(118),Term(1),member-3,0,Term(0),List(LogEntry(5, EntityEvent(Some(NormalizedEntityId(replication-7)),NoOp), Term(1))),5) to member-1 ===
2021-11-29T01:40:09.0604698Z [JVM-4] === [Leader] publish AppendEntries(NormalizedShardId(118),Term(1),member-3,5,Term(1),List(),5) to member-2 ===
2021-11-29T01:40:09.0618736Z [JVM-3] === [Follower] append AppendEntries(NormalizedShardId(118),Term(1),member-3,5,Term(1),Vector(),5) ===
2021-11-29T01:40:09.0627021Z [JVM-3] === Transition: Follower -> Follower ===
2021-11-29T01:40:09.0628435Z [JVM-3] === [Follower] election-timeout after 1002 ms ===
2021-11-29T01:40:09.1792066Z [JVM-4] === [Leader] Heartbeat after 100 ms ===
2021-11-29T01:40:09.1795222Z [JVM-4] === [Leader] publish AppendEntries(NormalizedShardId(118),Term(1),member-3,0,Term(0),List(LogEntry(5, EntityEvent(Some(NormalizedEntityId(replication-7)),NoOp), Term(1))),5) to member-1 ===
2021-11-29T01:40:09.1797560Z [JVM-4] === [Leader] publish AppendEntries(NormalizedShardId(118),Term(1),member-3,5,Term(1),List(),5) to member-2 ===
2021-11-29T01:40:09.1828432Z [JVM-3] === [Follower] append AppendEntries(NormalizedShardId(118),Term(1),member-3,5,Term(1),Vector(),5) ===
2021-11-29T01:40:09.1830055Z [JVM-3] === Transition: Follower -> Follower ===
2021-11-29T01:40:09.1831340Z [JVM-3] === [Follower] election-timeout after 1386 ms ===
2021-11-29T01:40:09.2998241Z [JVM-4] === [Leader] Heartbeat after 100 ms ===
2021-11-29T01:40:09.3027415Z [JVM-4] === [Leader] publish AppendEntries(NormalizedShardId(118),Term(1),member-3,0,Term(0),List(LogEntry(5, EntityEvent(Some(NormalizedEntityId(replication-7)),NoOp), Term(1))),5) to member-1 ===
2021-11-29T01:40:09.3029718Z [JVM-4] === [Leader] publish AppendEntries(NormalizedShardId(118),Term(1),member-3,5,Term(1),List(),5) to member-2 ===
2021-11-29T01:40:09.3031537Z [JVM-3] === [Follower] append AppendEntries(NormalizedShardId(118),Term(1),member-3,5,Term(1),Vector(),5) ===
2021-11-29T01:40:09.3033435Z [JVM-3] === Transition: Follower -> Follower ===
2021-11-29T01:40:09.3034572Z [JVM-3] === [Follower] election-timeout after 896 ms ===
2021-11-29T01:40:09.3611293Z [JVM-2] === Transition: Recovering -> Follower ===
2021-11-29T01:40:09.3612802Z [JVM-2] === [Recovering] election-timeout after 1089 ms ===
2021-11-29T01:40:09.3615673Z [JVM-2] === [Follower] accept RequestVote(NormalizedShardId(118),Term(1),member-3,0,Term(0)) ===
2021-11-29T01:40:09.3701289Z [JVM-2] === [Follower] persisting time: 9 ms ===
2021-11-29T01:40:09.3702623Z [JVM-2] === [Follower] election-timeout after 769 ms ===
2021-11-29T01:40:09.3704457Z [JVM-2] === [Follower] append AppendEntries(NormalizedShardId(118),Term(1),member-3,0,Term(0),Vector(LogEntry(1, EntityEvent(None,NoOp), Term(1))),0) ===
2021-11-29T01:40:09.3815983Z [JVM-2] === [Follower] persisting time: 10 ms ===
2021-11-29T01:40:09.3820109Z [JVM-2] === Transition: Follower -> Follower ===
2021-11-29T01:40:09.3821664Z [JVM-2] === [Follower] election-timeout after 765 ms ===
2021-11-29T01:40:09.3823112Z [JVM-2] === [Follower] created an entity (NormalizedEntityId(replication-7)) ===
2021-11-29T01:40:09.3827060Z [JVM-2] === [Follower] append AppendEntries(NormalizedShardId(118),Term(1),member-3,0,Term(0),Vector(LogEntry(1, EntityEvent(None,NoOp), Term(1)), LogEntry(2, EntityEvent(Some(NormalizedEntityId(replication-7)),received), Term(1))),1) ===
2021-11-29T01:40:09.3830936Z [JVM-2] === akka://ReplicationRegionSpec/system/sharding/raft-shard-typeName-6-member-1/118/118/replication-7 started ===
2021-11-29T01:40:09.3888895Z [JVM-2] === [Follower] persisting time: 6 ms ===
2021-11-29T01:40:09.3890569Z [JVM-2] === Transition: Follower -> Follower ===
2021-11-29T01:40:09.3891877Z [JVM-2] === [Follower] election-timeout after 1287 ms ===
2021-11-29T01:40:09.3895106Z [JVM-2] === [Follower] append AppendEntries(NormalizedShardId(118),Term(1),member-3,0,Term(0),Vector(LogEntry(1, EntityEvent(None,NoOp), Term(1)), LogEntry(2, EntityEvent(Some(NormalizedEntityId(replication-7)),received), Term(1)), LogEntry(3, EntityEvent(Some(NormalizedEntityId(replication-7)),received), Term(1))),2) ===
2021-11-29T01:40:09.3967778Z [JVM-2] === [Follower] persisting time: 5 ms ===
2021-11-29T01:40:09.3970336Z [JVM-2] === [Follower] applying received to ReplicationActor ===
2021-11-29T01:40:09.3972891Z [JVM-2] === Transition: Follower -> Follower ===
2021-11-29T01:40:09.3975181Z [JVM-2] === [Follower] election-timeout after 769 ms ===
2021-11-29T01:40:09.3978218Z [JVM-2] === [Follower] append AppendEntries(NormalizedShardId(118),Term(1),member-3,0,Term(0),Vector(LogEntry(1, EntityEvent(None,NoOp), Term(1)), LogEntry(2, EntityEvent(Some(NormalizedEntityId(replication-7)),received), Term(1)), LogEntry(3, EntityEvent(Some(NormalizedEntityId(replication-7)),received), Term(1)), LogEntry(4, EntityEvent(Some(NormalizedEntityId(replication-7)),received), Term(1))),3) ===
2021-11-29T01:40:09.4123649Z [JVM-2] === [Follower] persisting time: 15 ms ===
2021-11-29T01:40:09.4125446Z [JVM-2] === [Follower] applying received to ReplicationActor ===
2021-11-29T01:40:09.4131575Z [JVM-2] === Transition: Follower -> Follower ===
2021-11-29T01:40:09.4134414Z [JVM-2] === [Follower] election-timeout after 857 ms ===
2021-11-29T01:40:09.4140257Z [JVM-2] === [Follower] append AppendEntries(NormalizedShardId(118),Term(1),member-3,0,Term(0),Vector(LogEntry(1, EntityEvent(None,NoOp), Term(1)), LogEntry(2, EntityEvent(Some(NormalizedEntityId(replication-7)),received), Term(1)), LogEntry(3, EntityEvent(Some(NormalizedEntityId(replication-7)),received), Term(1)), LogEntry(4, EntityEvent(Some(NormalizedEntityId(replication-7)),received), Term(1)), LogEntry(5, EntityEvent(Some(NormalizedEntityId(replication-7)),NoOp), Term(1))),4) ===
2021-11-29T01:40:09.4191180Z [JVM-4] === [Leader] Heartbeat after 100 ms ===
2021-11-29T01:40:09.4193789Z [JVM-4] === [Leader] publish AppendEntries(NormalizedShardId(118),Term(1),member-3,4,Term(1),List(LogEntry(5, EntityEvent(Some(NormalizedEntityId(replication-7)),NoOp), Term(1))),5) to member-1 ===
2021-11-29T01:40:09.4196093Z [JVM-4] === [Leader] publish AppendEntries(NormalizedShardId(118),Term(1),member-3,5,Term(1),List(),5) to member-2 ===
2021-11-29T01:40:09.4206443Z [JVM-2] === [Follower] persisting time: 7 ms ===
2021-11-29T01:40:09.4228542Z [JVM-3] === [Follower] append AppendEntries(NormalizedShardId(118),Term(1),member-3,5,Term(1),Vector(),5) ===
2021-11-29T01:40:09.4230243Z [JVM-3] === Transition: Follower -> Follower ===
2021-11-29T01:40:09.4231521Z [JVM-3] === [Follower] election-timeout after 1073 ms ===
2021-11-29T01:40:09.4289174Z [JVM-2] === [Follower] applying received to ReplicationActor ===
2021-11-29T01:40:09.4299810Z [JVM-2] === Transition: Follower -> Follower ===
2021-11-29T01:40:09.4300772Z [JVM-2] === [Follower] election-timeout after 843 ms ===
2021-11-29T01:40:09.4303220Z [JVM-2] === [Follower] append AppendEntries(NormalizedShardId(118),Term(1),member-3,0,Term(0),Vector(LogEntry(5, EntityEvent(Some(NormalizedEntityId(replication-7)),NoOp), Term(1))),5) ===
2021-11-29T01:40:09.4435699Z [JVM-2] === [Follower] persisting time: 16 ms ===
The following `AppendEntries` should have `prevLogIndex: 4` and `prevLogTerm: 1` because `entries` contains a `LogEntry` with `logEntryIndex: 5`; however, it has `prevLogIndex: 0` and `prevLogTerm: 0`.
2021-11-29T01:40:09.4672596Z [JVM-2] === [Follower] append AppendEntries(NormalizedShardId(118),Term(1),member-3,0,Term(0),Vector(LogEntry(5, EntityEvent(Some(NormalizedEntityId(replication-7)),NoOp), Term(1))),5) ===
```scala
final case class AppendEntries(
    shardId: NormalizedShardId,
    term: Term,
    leader: MemberIndex,
    prevLogIndex: LogEntryIndex,
    prevLogTerm: Term,
    entries: Seq[LogEntry],
    leaderCommit: LogEntryIndex,
) extends RaftRequest
    with ClusterReplicationSerializable
```
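This kind of malformed message could be caught early with an invariant check (a hypothetical sketch, not validation the library actually performs): whenever `entries` is non-empty, its first entry must immediately follow `prevLogIndex`.

```scala
// Hypothetical invariant check for AppendEntries: a non-empty entries list
// must start at prevLogIndex + 1. The faulty message in the log above
// (prevLogIndex: 0 carrying an entry with index 5) violates this.
final case class LogEntry(index: Long)

def appendEntriesConsistent(prevLogIndex: Long, entries: Seq[LogEntry]): Boolean =
  entries.headOption.forall(_.index == prevLogIndex + 1)
```

Asserting this on the sender side would turn the silent log corruption into an immediate, diagnosable failure.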
If the follower accepts the command, the `ReplicatedLog` is broken by merging the log entries the command carries.
```scala
def merge(thatEntries: Seq[LogEntry], prevLogIndex: LogEntryIndex): ReplicatedLog = {
  val newEntries = this.entries.takeWhile(_.index <= prevLogIndex) ++ thatEntries
  copy(newEntries)
}
```
(akka-entity-replication/ReplicatedLog.scala at v2.0.0)
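To see concretely how this breaks the log, here is a minimal model of `merge` (hypothetical, simplified types) applied to the faulty message: with `prevLogIndex = 0`, `takeWhile` keeps nothing, so all previously appended entries are dropped.

```scala
// Simplified model of ReplicatedLog.merge from v2.0.0: keep entries up to
// prevLogIndex, then append the incoming ones.
final case class LogEntry(index: Long, term: Long)

def merge(entries: Vector[LogEntry], thatEntries: Seq[LogEntry], prevLogIndex: Long): Vector[LogEntry] =
  entries.takeWhile(_.index <= prevLogIndex) ++ thatEntries

// The follower had entries 1..5; merging the faulty AppendEntries
// (prevLogIndex = 0, entries = [index 5]) discards entries 1..4.
val before = (1L to 5L).map(i => LogEntry(i, 1L)).toVector
val after  = merge(before, Seq(LogEntry(5L, 1L)), prevLogIndex = 0L)
```

The resulting log contains only the entry at index 5, even though indices 1..4 were already committed on the follower.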
The leader had completed compaction right before sending the `AppendEntries`, so it no longer had enough log entries to send to the follower.
2021-11-29T01:40:08.6237435Z [JVM-4] [Leader] compaction completed (term: Term(1), logEntryIndex: 5)
In this situation, the leader should send the `InstallSnapshot` command to the follower. `InstallSnapshot` is sent when `prevLogTerm` is resolved as `None`.
```scala
val nextIndex    = currentData.nextIndexFor(memberIndex)
val prevLogIndex = nextIndex.prev()
val prevLogTerm  = currentData.replicatedLog.termAt(prevLogIndex)
```
(akka-entity-replication/Leader.scala at v2.0.0)
Probably the `prevLogIndex` for the follower was `LogEntryIndex.initial()`. I guess that `termAt`, which is used to find `prevLogTerm`, should return `None` when `LogEntryIndex.initial()` is passed.
```scala
def termAt(logEntryIndex: LogEntryIndex): Option[Term] =
  logEntryIndex match {
    case initialLogIndex if initialLogIndex == LogEntryIndex.initial() => Option(Term.initial())
    case `ancestorLastIndex`                                           => Option(ancestorLastTerm)
    case logEntryIndex                                                 => get(logEntryIndex).map(_.term)
  }
```
(akka-entity-replication/ReplicatedLog.scala at v2.0.0)
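The guessed fix can be sketched with a simplified, hypothetical model (plain `Long` indices and terms instead of the library's types): once compaction has discarded the leading entries (`ancestorLastIndex > 0`), `termAt(initial)` resolves to `None`, which makes the leader fall back to `InstallSnapshot` instead of sending a malformed `AppendEntries`.

```scala
// Hypothetical, simplified model of the proposed termAt behavior.
final case class ReplicatedLogModel(
    entries: Vector[(Long, Long)], // (index, term)
    ancestorLastIndex: Long,       // last index covered by the compacted snapshot
    ancestorLastTerm: Long,
) {
  // The initial index (0) only has a known term while nothing has been
  // compacted; after compaction it must resolve to None so the leader
  // chooses InstallSnapshot.
  def termAt(logEntryIndex: Long): Option[Long] =
    logEntryIndex match {
      case 0L if ancestorLastIndex == 0L => Some(0L)
      case 0L                            => None
      case i if i == ancestorLastIndex   => Some(ancestorLastTerm)
      case i                             => entries.find(_._1 == i).map(_._2)
    }
}
```

In the scenario from the log (leader compacted up to index 5, new follower with `nextIndex = 1`, so `prevLogIndex = 0`), this model returns `None` and the leader would send `InstallSnapshot` instead.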
Suppose: an entity belonging to a leader has several `ProcessCommand` messages in its mailbox (which might happen under heavy load).

1. Entity X (in the `Ready` state) executes its command handler for the existing `ProcessCommand` in its mailbox.
2. Entity X sends a `Replicate` message to RaftActor A and then waits for a replication result (`ReplicationSucceeded`, `ReplicationFailed`, or `Replica`) in the `WaitForReplication` state.
3. RaftActor A ignores the `Replicate` message from Entity X because RaftActor A is a follower.
4. RaftActor A sends `Replica` to Entity X eventually.
5. Entity X (in the `WaitForReplication` state) receives `Replica`, becomes the `Ready` state, and then un-stashes all stashed messages.
6. If Entity X has stashed another `ProcessCommand` before, it repeats the above behavior.

To avoid this, a RaftActor (Follower and Candidate, not Leader) should reply to an entity with a `ReplicationFailed` message instead when it receives a `Replicate` message.
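A minimal sketch of that behavior (hypothetical, simplified types — in the real code this would live in the Follower and Candidate receive behaviors): only a leader accepts `Replicate`; every other state answers `ReplicationFailed` immediately so the entity leaves `WaitForReplication` instead of waiting for an eventual `Replica`.

```scala
// Hypothetical, simplified model: a non-leader RaftActor rejects Replicate
// right away instead of silently dropping it.
sealed trait RaftState
case object Leader    extends RaftState
case object Candidate extends RaftState
case object Follower  extends RaftState

sealed trait ReplicateReply
case object Accepted          extends ReplicateReply
case object ReplicationFailed extends ReplicateReply

def onReplicate(state: RaftState): ReplicateReply =
  state match {
    case Leader => Accepted
    case _      => ReplicationFailed
  }
```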
Simple retries may induce failures
```scala
case RaftProtocol.RecoveryTimeout =>
  context.log.info(
    "Entity (name: {}) recovering timed out. It will be retried later.",
    setup.entityContext.entityId,
  )
  // TODO: Enable backoff to prevent cascade failures
  throw RaftProtocol.EntityRecoveryTimeoutException(context.self.path)
```
```scala
override def stateReceive(receive: Receive, message: Any): Unit =
  message match {
    case RecoveryTimeout =>
      // to restart
      // TODO: Use BackoffSupervisor to avoid cascading failures
      log.info("Entity (name: {}) recovering timed out. It will be retried later.", self.path.name)
      throw EntityRecoveryTimeoutException(self.path)
    // ...
  }
```
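Both TODOs point at backoff supervision. As a rough illustration (a hypothetical helper, not the library's API), a capped exponential backoff spaces restarts out so recovery retries cannot hammer the persistence plugin:

```scala
// Hypothetical helper: capped exponential backoff in milliseconds.
// Restart 0 waits minMillis, each further restart doubles the delay,
// bounded by maxMillis. A BackoffSupervisor-style policy would also add
// a random factor to avoid synchronized retries.
def backoffMillis(restartCount: Int, minMillis: Long, maxMillis: Long): Long =
  math.min(maxMillis, minMillis << math.min(restartCount, 30))
```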
Once events or snapshots have been deleted, a rollback to a timestamp that requires such deleted events or snapshots is impossible. The rollback tool can't detect such deletions yet. If such a timestamp is specified, the rollback tool will delete all events and snapshots of the target Raft shard (or persistent actors of the target Raft shard will be left in an inconsistent state).
The rollback tool can return a failure from rollback preparation (`RaftShardRollback.prepareRollback`) to address this issue. The tool first finds a sequence number for the rollback timestamp, and then it has a chance to verify that the sequence number is valid for a rollback. In this verification, the tool should determine whether a persistent actor can roll back to that sequence number.
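Such a verification might look like the following sketch (the names `JournalState` and `verifyRollbackTo` are hypothetical, not the tool's API): preparation should fail when events required to rebuild state at the target sequence number were already deleted.

```scala
// deletedToSequenceNr: highest sequence number removed by event deletion;
// highestSequenceNr: last sequence number written for the persistent actor.
final case class JournalState(deletedToSequenceNr: Long, highestSequenceNr: Long)

def verifyRollbackTo(toSequenceNr: Long, journal: JournalState): Either[String, Long] =
  if (toSequenceNr > journal.highestSequenceNr)
    Left(s"sequence number $toSequenceNr has not been written yet")
  else if (toSequenceNr <= journal.deletedToSequenceNr)
    // A complete check would also consider snapshots covering the deleted prefix.
    Left(s"events up to ${journal.deletedToSequenceNr} were deleted; cannot roll back to $toSequenceNr")
  else
    Right(toSequenceNr)

object VerifyDemo extends App {
  val journal = JournalState(deletedToSequenceNr = 100L, highestSequenceNr = 500L)
  assert(verifyRollbackTo(300L, journal) == Right(300L))
  assert(verifyRollbackTo(50L, journal).isLeft)  // needs deleted events
  assert(verifyRollbackTo(600L, journal).isLeft) // beyond the journal
  println("ok")
}
```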
- `RaftShardRollback.prepareRollback`: https://github.com/lerna-stack/akka-entity-replication/blob/v2.2.0/rollback-tool-cassandra/src/main/scala/lerna/akka/entityreplication/rollback/RaftShardRollback.scala#L29-L50
- `RaftShardRollback.prepareRaftActorsRollback`: https://github.com/lerna-stack/akka-entity-replication/blob/v2.2.0/rollback-tool-cassandra/src/main/scala/lerna/akka/entityreplication/rollback/RaftShardRollback.scala#L52-L75
- `RaftShardRollback.prepareSnapshotStoresRollback`: https://github.com/lerna-stack/akka-entity-replication/blob/v2.2.0/rollback-tool-cassandra/src/main/scala/lerna/akka/entityreplication/rollback/RaftShardRollback.scala#L77-L113
- `RaftShardRollback.prepareSnapshotSyncManagersRollback`: https://github.com/lerna-stack/akka-entity-replication/blob/v2.2.0/rollback-tool-cassandra/src/main/scala/lerna/akka/entityreplication/rollback/RaftShardRollback.scala#L115-L141
- `RaftShardRollback.prepareCommitLogStoreActorRollback`: https://github.com/lerna-stack/akka-entity-replication/blob/v2.2.0/rollback-tool-cassandra/src/main/scala/lerna/akka/entityreplication/rollback/RaftShardRollback.scala#L143-L187

Dead letters about `ReplicationRegion$Passivate` like the below continue:
14:11:07.307 INFO ip-***-2 akka.actor.LocalActorRef system--akka.actor.default-dispatcher-25 Message [lerna.akka.entityreplication.ReplicationRegion$Passivate] to Actor[akka://my-system/system/sharding/raft-shard-***-replica-group-3/38/38#-115359973] was unhandled. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
lerna-sample-account-app v2022.12.0: https://github.com/lerna-stack/lerna-sample-account-app/tree/v2022.12.0
diff --git a/app/application/src/main/scala/myapp/application/account/BankAccountBehavior.scala b/app/application/src/main/scala/myapp/application/account/BankAccountBehavior.scala
index 9282451..529b879 100644
--- a/app/application/src/main/scala/myapp/application/account/BankAccountBehavior.scala
+++ b/app/application/src/main/scala/myapp/application/account/BankAccountBehavior.scala
@@ -251,8 +251,10 @@ object BankAccountBehavior extends AppTypedActorLogging {
case GetBalance(replyTo) =>
Effect.reply(replyTo)(AccountBalance(balance))
case ReceiveTimeout() =>
+ println(s"BankAccountBehavior(${accountNo.value}) is passivating.")
Effect.passivate().thenNoReply()
case Stop() =>
+ println(s"BankAccountBehavior(${accountNo.value}) stopped.")
Effect.stopLocally()
}
@@ -386,7 +388,8 @@ object BankAccountBehavior extends AppTypedActorLogging {
// This is highly recommended to identify the source of log outputs
context.setLoggerName(BankAccountBehavior.getClass)
// ReceiveTimeout will trigger Effect.passivate()
- context.setReceiveTimeout(1.minute, ReceiveTimeout())
+ context.setReceiveTimeout(5.seconds, ReceiveTimeout())
+ println(s"BankAccountBehavior(${entityContext.entityId}) is starting.")
ReplicatedEntityBehavior[Command, DomainEvent, Account](
entityContext,
emptyState = Account(
diff --git a/app/entrypoint/src/main/resources/application.conf b/app/entrypoint/src/main/resources/application.conf
index cad02d5..36e4604 100644
--- a/app/entrypoint/src/main/resources/application.conf
+++ b/app/entrypoint/src/main/resources/application.conf
@@ -29,8 +29,12 @@ myapp {
akka {
actor {
provider = "cluster"
+ debug.unhandled = on
}
+ log-dead-letters = on
+ log-dead-letters-suspend-duration = 30 seconds
+
remote {
artery {
canonical {
diff --git a/app/utility/src/main/resources/logback.xml b/app/utility/src/main/resources/logback.xml
index 510896e..820c8de 100644
--- a/app/utility/src/main/resources/logback.xml
+++ b/app/utility/src/main/resources/logback.xml
@@ -8,7 +8,8 @@
<!-- <logger level="DEBUG" name="lerna.akka.entityreplication" />-->
<logger level="INFO" name="myapp" />
- <logger level="INFO" name="akka" />
+ <logger level="DEBUG" name="akka" />
+ <logger level="INFO" name="akka.cluster" />
<root level="WARN">
<appender-ref ref="STDOUT"/>
diff --git a/scripts/start-app-1.sh b/scripts/start-app-1.sh
index ffdf39d..996c2fc 100644
--- a/scripts/start-app-1.sh
+++ b/scripts/start-app-1.sh
@@ -21,4 +21,7 @@ sbt \
-Dmyapp.readmodel.rdbms.tenants.tenant-b.db.url='jdbc:mysql://127.0.0.2:3306/myapp-tenant-b' \
-Dmyapp.readmodel.rdbms.tenants.tenant-b.db.user='dbuser_b' \
-Dmyapp.readmodel.rdbms.tenants.tenant-b.db.password='dbpass@b' \
+-Dlerna.akka.entityreplication.raft.compaction.log-size-threshold=30 \
+-Dlerna.akka.entityreplication.raft.compaction.preserve-log-size=3 \
+-Dlerna.akka.entityreplication.raft.compaction.log-size-check-interval=10s \
entrypoint/run
diff --git a/scripts/start-app-2.sh b/scripts/start-app-2.sh
index c5f9a72..d895f26 100644
--- a/scripts/start-app-2.sh
+++ b/scripts/start-app-2.sh
@@ -17,4 +17,7 @@ sbt \
-Dmyapp.readmodel.rdbms.tenants.tenant-b.db.url='jdbc:mysql://127.0.0.2:3306/myapp-tenant-b' \
-Dmyapp.readmodel.rdbms.tenants.tenant-b.db.user='dbuser_b' \
-Dmyapp.readmodel.rdbms.tenants.tenant-b.db.password='dbpass@b' \
+-Dlerna.akka.entityreplication.raft.compaction.log-size-threshold=20 \
+-Dlerna.akka.entityreplication.raft.compaction.preserve-log-size=3 \
+-Dlerna.akka.entityreplication.raft.compaction.log-size-check-interval=10s \
entrypoint/run
diff --git a/scripts/start-app-3.sh b/scripts/start-app-3.sh
index bc7262e..6a152af 100644
--- a/scripts/start-app-3.sh
+++ b/scripts/start-app-3.sh
@@ -17,4 +17,7 @@ sbt \
-Dmyapp.readmodel.rdbms.tenants.tenant-b.db.url='jdbc:mysql://127.0.0.2:3306/myapp-tenant-b' \
-Dmyapp.readmodel.rdbms.tenants.tenant-b.db.user='dbuser_b' \
-Dmyapp.readmodel.rdbms.tenants.tenant-b.db.password='dbpass@b' \
+-Dlerna.akka.entityreplication.raft.compaction.log-size-threshold=10 \
+-Dlerna.akka.entityreplication.raft.compaction.preserve-log-size=3 \
+-Dlerna.akka.entityreplication.raft.compaction.log-size-check-interval=10s \
entrypoint/run
The following script (run against lerna-sample-account-app) doesn't stop automatically. After a certain time (around 30s to 60s), stop it manually.
```shell
#!/usr/bin/env bash
set -e
while :
do
  curl \
    --silent \
    --show-error \
    --request 'POST' \
    --header 'X-Tenant-Id: tenant-a' \
    --url "http://127.0.0.1:9001/accounts/$(date '+%s')/deposit?transactionId=$(date '+%s')&amount=100"
  sleep 0.5s
done
```
Some of the nodes (apps) log dead letters like the below, and keep logging them:
BankAccountBehavior(1680662388) is passivating.
BankAccountBehavior(1680662432) is passivating.
2023-04-05 12:08:44.184 INFO akka.actor.LocalActorRef - - - Message [lerna.akka.entityreplication.ReplicationRegion$Passivate] to Actor[akka://MyAppSystem/system/sharding/raft-shard-BankAccount-tenant-a-replica-group-3/74/74#-934970431] was unhandled. [1004] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
2023-04-05 12:08:44.185 INFO akka.actor.LocalActorRef - - - Message [lerna.akka.entityreplication.ReplicationRegion$Passivate] to Actor[akka://MyAppSystem/system/sharding/raft-shard-BankAccount-tenant-a-replica-group-3/74/74#-934970431] was unhandled. [1005] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
An entity on a follower keeps running even after the corresponding entity on the leader was passivated. This could happen, for example, when an entity sends a `ReplicationRegion$Passivate` message such that the sender entity will be passivated eventually, but the message ends up unhandled (dead-lettered), so follower-side entities are never stopped.

CI doesn't detect some test failures.
Reproduction is [DONT MERGE] CI will fail due to `java.lang.ExceptionInInitializerError:` by xirc · Pull Request #148 · lerna-stack/akka-entity-replication.
Using `if: always()` might be better to detect test failures on CI.
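If the workflow uses GitHub Actions, the idea looks like the following fragment (a sketch; the job layout and the `sbt testReport` task name are assumptions): a step guarded by `if: always()` runs even when an earlier step failed, so test failures are surfaced instead of skipped.

```yaml
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Run tests
        run: sbt test
      - name: Publish test report
        if: always() # runs even if "Run tests" failed
        run: sbt testReport # hypothetical task name
```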
The related code and docs are the following: