akkadotnet / akka.persistence.postgresql

Akka.Persistence.PostgreSql provider

License: Apache License 2.0


akka.persistence.postgresql's Introduction

Akka.Persistence.PostgreSql

Akka Persistence journal and snapshot store backed by a PostgreSql database.

Configuration

Both the journal and the snapshot store share the same configuration keys; however, they reside in separate scopes, so they are defined separately for the journal and the snapshot store:

Remember that the connection string must be provided separately to the journal and the snapshot store.

akka.persistence{
	journal {
		plugin = "akka.persistence.journal.postgresql"
		postgresql {
			# qualified type name of the PostgreSql persistence journal actor
			class = "Akka.Persistence.PostgreSql.Journal.PostgreSqlJournal, Akka.Persistence.PostgreSql"

			# dispatcher used to drive journal actor
			plugin-dispatcher = "akka.actor.default-dispatcher"

			# connection string used for database access
			connection-string = ""

			# default SQL commands timeout
			connection-timeout = 30s

			# PostgreSql schema name of the table corresponding to the persistent journal
			schema-name = public

			# PostgreSql table corresponding with persistent journal
			table-name = event_journal

			# should corresponding journal table be initialized automatically
			auto-initialize = off
			
			# timestamp provider used for generation of journal entries timestamps
			timestamp-provider = "Akka.Persistence.Sql.Common.Journal.DefaultTimestampProvider, Akka.Persistence.Sql.Common"
		
			# metadata table
			metadata-table-name = metadata

			# defines the column db type used to store the payload. Available options: BYTEA (default), JSON, JSONB
			stored-as = BYTEA

			# Setting used to toggle sequential read access when loading large objects
			# from journals and snapshot stores.
			sequential-access = off

			# When turned on, persistence will use `BIGINT` and `GENERATED ALWAYS AS IDENTITY`
			# for journal table schema creation.
			# NOTE: This only affects newly created tables, as such, it should not affect any
			#       existing database.
			#
			# !!!!! WARNING !!!!!
			# To use this feature, you have to have PostgreSql version 10 or above
			use-bigint-identity-for-ordering-column = off

			# Setting used to change size of the tags column in persistent journal table
			tags-column-size = 2000
		}
	}

	snapshot-store {
		plugin = "akka.persistence.snapshot-store.postgresql"
		postgresql {
			# qualified type name of the PostgreSql persistence snapshot store actor
			class = "Akka.Persistence.PostgreSql.Snapshot.PostgreSqlSnapshotStore, Akka.Persistence.PostgreSql"

			# dispatcher used to drive snapshot store actor
			plugin-dispatcher = "akka.actor.default-dispatcher"

			# connection string used for database access
			connection-string = ""

			# default SQL commands timeout
			connection-timeout = 30s

			# PostgreSql schema name of the table corresponding to the snapshot store
			schema-name = public

			# PostgreSql table corresponding to the snapshot store
			table-name = snapshot_store

			# should the corresponding snapshot store table be initialized automatically
			auto-initialize = off
			
			# defines the column db type used to store the payload. Available options: BYTEA (default), JSON, JSONB
			stored-as = BYTEA

			# Setting used to toggle sequential read access when loading large objects
			# from journals and snapshot stores.
			sequential-access = off
		}
	}
}
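For illustration, a minimal C# bootstrap is sketched below, assuming the Akka and Akka.Persistence.PostgreSql NuGet packages are referenced; the system name and connection string are placeholders. Only the plugin activation and connection-string keys are shown - everything else falls back to the defaults above:

```csharp
using Akka.Actor;
using Akka.Configuration;

// Hypothetical bootstrap: activate the PostgreSql journal and snapshot
// store and provide a connection string for each plugin (required).
var hocon = ConfigurationFactory.ParseString(@"
    akka.persistence.journal.plugin = ""akka.persistence.journal.postgresql""
    akka.persistence.journal.postgresql.connection-string = ""<your-connection-string>""
    akka.persistence.snapshot-store.plugin = ""akka.persistence.snapshot-store.postgresql""
    akka.persistence.snapshot-store.postgresql.connection-string = ""<your-connection-string>""
");

using var system = ActorSystem.Create("my-system", hocon);
```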

Table Schema

The PostgreSql persistence plugin defines default table schemas for the journal, snapshot store and metadata tables.

CREATE TABLE {your_journal_table_name} (
    ordering BIGSERIAL NOT NULL PRIMARY KEY,
    persistence_id VARCHAR(255) NOT NULL,
    sequence_nr BIGINT NOT NULL,
    is_deleted BOOLEAN NOT NULL,
    created_at BIGINT NOT NULL,
    manifest VARCHAR(500) NOT NULL,
    payload BYTEA NOT NULL,
    tags VARCHAR(100) NULL,
    serializer_id INTEGER NULL,
    CONSTRAINT {your_journal_table_name}_uq UNIQUE (persistence_id, sequence_nr)
);

CREATE TABLE {your_snapshot_table_name} (
    persistence_id VARCHAR(255) NOT NULL,
    sequence_nr BIGINT NOT NULL,
    created_at BIGINT NOT NULL,
    manifest VARCHAR(500) NOT NULL,
    payload BYTEA NOT NULL,
    serializer_id INTEGER NULL,
    CONSTRAINT {your_snapshot_table_name}_pk PRIMARY KEY (persistence_id, sequence_nr)
);

CREATE TABLE {your_metadata_table_name} (
    persistence_id VARCHAR(255) NOT NULL,
    sequence_nr BIGINT NOT NULL,
    CONSTRAINT {your_metadata_table_name}_pk PRIMARY KEY (persistence_id, sequence_nr)
);

Note that if you turn on the akka.persistence.journal.postgresql.use-bigint-identity-for-ordering-column flag, the journal table schema will be altered to the latest recommended primary key setting.

CREATE TABLE {your_journal_table_name} (
    ordering BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    persistence_id VARCHAR(255) NOT NULL,
    sequence_nr BIGINT NOT NULL,
    is_deleted BOOLEAN NOT NULL,
    created_at BIGINT NOT NULL,
    manifest VARCHAR(500) NOT NULL,
    payload BYTEA NOT NULL,
    tags VARCHAR(100) NULL,
    serializer_id INTEGER NULL,
    CONSTRAINT {your_journal_table_name}_uq UNIQUE (persistence_id, sequence_nr)
);

Since this script is only run once, during table generation, no migration path is provided for this change; any migration is left as an exercise for the user.
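As a hedged sketch of opting in (assuming auto-initialize is used to create the table and the server is PostgreSql 10 or newer; the system name and connection string are placeholders):

```csharp
using Akka.Actor;
using Akka.Configuration;

// The flag only affects tables created by auto-initialize = on;
// existing tables are left untouched.
var hocon = ConfigurationFactory.ParseString(@"
    akka.persistence.journal.plugin = ""akka.persistence.journal.postgresql""
    akka.persistence.journal.postgresql {
        connection-string = ""<your-connection-string>""
        auto-initialize = on
        use-bigint-identity-for-ordering-column = on
    }
");

using var system = ActorSystem.Create("my-system", hocon);
```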

Migration

From 1.1.0 to 1.3.1

ALTER TABLE {your_journal_table_name} ADD COLUMN serializer_id INTEGER NULL;
ALTER TABLE {your_snapshot_table_name} ADD COLUMN serializer_id INTEGER NULL;

From 1.0.6 to 1.1.0

CREATE TABLE {your_metadata_table_name} (
    persistence_id VARCHAR(255) NOT NULL,
    sequence_nr BIGINT NOT NULL,
    CONSTRAINT {your_metadata_table_name}_pk PRIMARY KEY (persistence_id, sequence_nr)
);

ALTER TABLE {your_journal_table_name} DROP CONSTRAINT {your_journal_table_name}_pk;
ALTER TABLE {your_journal_table_name} ADD COLUMN ordering BIGSERIAL NOT NULL PRIMARY KEY;
ALTER TABLE {your_journal_table_name} ADD COLUMN tags VARCHAR(100) NULL;
ALTER TABLE {your_journal_table_name} ADD CONSTRAINT {your_journal_table_name}_uq UNIQUE (persistence_id, sequence_nr);
ALTER TABLE {your_journal_table_name} ADD COLUMN created_at_temp BIGINT NOT NULL;

UPDATE {your_journal_table_name} SET created_at_temp=extract(epoch from created_at);

ALTER TABLE {your_journal_table_name} DROP COLUMN created_at;
ALTER TABLE {your_journal_table_name} DROP COLUMN created_at_ticks;
ALTER TABLE {your_journal_table_name} RENAME COLUMN created_at_temp TO created_at;

Tests

The PostgreSql tests are packaged and run as part of the default "All" build task.

In order to run the tests, you must do the following things:

  1. Download and install PostgreSql from: http://www.postgresql.org/download/
  2. Install PostgreSql with the default settings. The default connection string uses the following credentials:
     • Username: postgres
     • Password: postgres
  3. A custom app.config file can be used and needs to be placed in the same folder as the dll

Or run PostgreSql in Docker:

docker run -d --rm --name=akka-postgres-db -p 5432:5432 -l deployer=akkadotnet -e POSTGRES_USER=postgres -e POSTGRES_PASSWORD=postgres postgres:9.6
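Assuming the container above, a connection string matching those credentials might look like the following (the Database value is an assumption - the postgres image creates a database named after POSTGRES_USER by default):

```csharp
// Matches the user, password and port published by the docker command above.
const string connectionString =
    "Server=localhost;Port=5432;Database=postgres;User Id=postgres;Password=postgres";
```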


akka.persistence.postgresql's Issues

FAKE bug - access denied during routine NuGet packaging and cleanup

[05:21:02][Step 1/1] [WARNING][6/26/2015 5:21:01 AM][Thread 0004][akka://PostgreSqlSnapshotStoreSpec/user] DeadLetter from [akka://PostgreSqlSnapshotStoreSpec/user] to [akka://PostgreSqlSnapshotStoreSpec/user]: <Received dead system message: <DeathWatchNotification>: [akka://PostgreSqlSnapshotStoreSpec/user/testActor76], ExistenceConfirmed=True, AddressTerminated=False>
[05:21:02][Step 1/1] Finished Target: RunTests
[05:21:02][Step 1/1] Starting Target: Nuget (==> BuildRelease)
[05:21:02][Step 1/1] Target: Nuget
[05:21:02][Step 1/1] Creating nuget packages for D:\BuildAgent\work\29e799f160213a61\src\Akka.Persistence.PostgreSql\Akka.Persistence.PostgreSql.nuspec
[05:21:02][Step 1/1] Creating D:\BuildAgent\work\29e799f160213a61\bin\build
[05:21:02][Step 1/1] Creating D:\BuildAgent\work\29e799f160213a61\bin\build\src\
[05:21:02][Step 1/1] D:\BuildAgent\work\29e799f160213a61\bin\build\src\obj does not exist.
[05:21:02][Step 1/1] D:\BuildAgent\work\29e799f160213a61\bin\build\src\bin does not exist.
[05:21:02][Step 1/1] Task: NuGet D:\BuildAgent\work\29e799f160213a61\src\Akka.Persistence.PostgreSql\Akka.Persistence.PostgreSql.nuspec
[05:21:02][Step 1/1] Creating .nuspec file at D:\BuildAgent\work\29e799f160213a61\bin\build\Akka.Persistence.PostgreSql.1.0.4.1-beta.nuspec
[05:21:02][Step 1/1] Created nuspec file D:\BuildAgent\work\29e799f160213a61\bin\build\Akka.Persistence.PostgreSql.1.0.4.1-beta.nuspec
[05:21:02][Step 1/1] D:\BuildAgent\work\29e799f160213a61\src\.nuget\NuGet.exe pack -Symbols -Version 1.0.4.1-beta -OutputDirectory "D:\BuildAgent\work\29e799f160213a61\bin\nuget" "Akka.Persistence.PostgreSql.1.0.4.1-beta.nuspec"-Properties Configuration="Release"
[05:21:03][Step 1/1] Deleting D:\BuildAgent\work\29e799f160213a61\bin\build\Akka.Persistence.PostgreSql.1.0.4.1-beta.nuspec
[05:21:03][Step 1/1] Deleting D:\BuildAgent\work\29e799f160213a61\bin\build
[05:21:03][Step 1/1] Task failed with The directory is not empty.
[05:21:03][Step 1/1] 
[05:21:03][Step 1/1] Retry.
[05:21:03][Step 1/1] Task failed with Access to the path 'D:\BuildAgent\work\29e799f160213a61\bin\build\lib\net45' is denied.
[05:21:03][Step 1/1] Retry.
[05:21:03][Step 1/1] Task failed with Access to the path 'D:\BuildAgent\work\29e799f160213a61\bin\build\lib\net45' is denied.
[05:21:03][Step 1/1] Retry.
[05:21:04][Step 1/1] Running build failed.
[05:21:04][Step 1/1] ##teamcity[buildStatus status='FAILURE' text='System.UnauthorizedAccessException: Access to the path |'D:\BuildAgent\work\29e799f160213a61\bin\build\lib\net45|' is denied.   at System.IO.__Error.WinIOError(Int32 errorCode, String maybeFullPath)   at System.IO.FileSystemEnumerableIterator`1.AddSearchableDirsToStack(SearchData localSearchData)   at System.IO.FileSystemEnumerableIterator`1.MoveNext()   at Microsoft.FSharp.Core.CompilerServices.RuntimeHelpers.takeInner@667|[T,TResult|](ConcatEnumerator`2 x, Unit unitVar0)   at Microsoft.FSharp.Collections.SeqModule.ToList|[T|](IEnumerable`1 source)   at Fake.Globbing.buildPaths(FSharpList`1 acc, FSharpList`1 input) in C:\code\fake\src\app\FakeLib\Globbing\Globbing.fs:line 47   at Fake.Globbing.search(String baseDir, String input) in C:\code\fake\src\app\FakeLib\Globbing\Globbing.fs:line 81   at Fake.FileSystem.System-Collections-Generic-IEnumerable-1-GetEnumerator@39-2.GenerateNext(IEnumerable`1& next) in C:\code\fake\src\app\FakeLib\Globbing\FileSystem.fs:line 40   at Microsoft.FSharp.Core.CompilerServices.GeneratedSequenceBase`1.MoveNextImpl()   at Microsoft.FSharp.Core.CompilerServices.GeneratedSequenceBase`1.System-Collections-IEnumerator-MoveNext()   at Microsoft.FSharp.Collections.IEnumerator.next@185|[T|](FSharpFunc`2 f, IEnumerator`1 e, FSharpRef`1 started, Unit unitVar0)   at Microsoft.FSharp.Collections.IEnumerator.filter@180.System-Collections-IEnumerator-MoveNext()   at Microsoft.FSharp.Collections.IEnumerator.next@185|[T|](FSharpFunc`2 f, IEnumerator`1 e, FSharpRef`1 started, Unit unitVar0)   at Microsoft.FSharp.Collections.IEnumerator.filter@180.System-Collections-IEnumerator-MoveNext()   at Microsoft.FSharp.Collections.SeqModule.Iterate|[T|](FSharpFunc`2 action, IEnumerable`1 source)   at Fake.FileHelper.DeleteDir(String path) in C:\code\fake\src\app\FakeLib\FileHelper.fs:line 46   at [email protected](b _arg2) in D:\BuildAgent\work\29e799f160213a61\build.fsx:line 170   at 
Fake.TaskRunnerHelper.runWithRetries|[a|](FSharpFunc`2 f, Int32 retries) in C:\code\fake\src\app\FakeLib\TaskRunnerHelper.fs:line 32   at Fake.TaskRunnerHelper.runWithRetries|[a|](FSharpFunc`2 f, Int32 retries) in C:\code\fake\src\app\FakeLib\TaskRunnerHelper.fs:line 32   at Fake.TaskRunnerHelper.runWithRetries|[a|](FSharpFunc`2 f, Int32 retries) in C:\code\fake\src\app\FakeLib\TaskRunnerHelper.fs:line 32   at FSI_0001.Build.removeDir@168(String dir) in D:\BuildAgent\work\29e799f160213a61\build.fsx:line 172   at FSI_0001.Build.createNugetPackages|[a|](a _arg1) in D:\BuildAgent\work\29e799f160213a61\build.fsx:line 227   at [email protected](Unit _arg1) in D:\BuildAgent\work\29e799f160213a61\build.fsx:line 274   at Fake.TargetHelper.runSingleTarget(TargetTemplate`1 target) in C:\code\fake\src\app\FakeLib\TargetHelper.fs:line 411']
[05:21:04][Step 1/1] Error:
[05:21:04][Step 1/1] System.UnauthorizedAccessException: Access to the path 'D:\BuildAgent\work\29e799f160213a61\bin\build\lib\net45' is denied.
[05:21:04][Step 1/1]at System.IO.__Error.WinIOError(Int32 errorCode, String maybeFullPath)
[05:21:04][Step 1/1]at System.IO.FileSystemEnumerableIterator`1.AddSearchableDirsToStack(SearchData localSearchData)
[05:21:04][Step 1/1]at System.IO.FileSystemEnumerableIterator`1.MoveNext()
[05:21:04][Step 1/1]at Microsoft.FSharp.Core.CompilerServices.RuntimeHelpers.takeInner@667[T,TResult](ConcatEnumerator`2 x, Unit unitVar0)
[05:21:04][Step 1/1]at Microsoft.FSharp.Collections.SeqModule.ToList[T](IEnumerable`1 source)
[05:21:04][Step 1/1]at Fake.Globbing.buildPaths(FSharpList`1 acc, FSharpList`1 input) in C:\code\fake\src\app\FakeLib\Globbing\Globbing.fs:line 47
[05:21:04][Step 1/1]at Fake.Globbing.search(String baseDir, String input) in C:\code\fake\src\app\FakeLib\Globbing\Globbing.fs:line 81
[05:21:04][Step 1/1]at Fake.FileSystem.System-Collections-Generic-IEnumerable-1-GetEnumerator@39-2.GenerateNext(IEnumerable`1& next) in C:\code\fake\src\app\FakeLib\Globbing\FileSystem.fs:line 40
[05:21:04][Step 1/1]at Microsoft.FSharp.Core.CompilerServices.GeneratedSequenceBase`1.MoveNextImpl()
[05:21:04][Step 1/1]at Microsoft.FSharp.Core.CompilerServices.GeneratedSequenceBase`1.System-Collections-IEnumerator-MoveNext()
[05:21:04][Step 1/1]at Microsoft.FSharp.Collections.IEnumerator.next@185[T](FSharpFunc`2 f, IEnumerator`1 e, FSharpRef`1 started, Unit unitVar0)
[05:21:04][Step 1/1]at Microsoft.FSharp.Collections.IEnumerator.filter@180.System-Collections-IEnumerator-MoveNext()
[05:21:04][Step 1/1]at Microsoft.FSharp.Collections.IEnumerator.next@185[T](FSharpFunc`2 f, IEnumerator`1 e, FSharpRef`1 started, Unit unitVar0)
[05:21:04][Step 1/1]at Microsoft.FSharp.Collections.IEnumerator.filter@180.System-Collections-IEnumerator-MoveNext()
[05:21:04][Step 1/1]at Microsoft.FSharp.Collections.SeqModule.Iterate[T](FSharpFunc`2 action, IEnumerable`1 source)
[05:21:04][Step 1/1]at Fake.FileHelper.DeleteDir(String path) in C:\code\fake\src\app\FakeLib\FileHelper.fs:line 46
[05:21:04][Step 1/1]at [email protected](b _arg2) in D:\BuildAgent\work\29e799f160213a61\build.fsx:line 170
[05:21:04][Step 1/1]at Fake.TaskRunnerHelper.runWithRetries[a](FSharpFunc`2 f, Int32 retries) in C:\code\fake\src\app\FakeLib\TaskRunnerHelper.fs:line 32
[05:21:04][Step 1/1]at Fake.TaskRunnerHelper.runWithRetries[a](FSharpFunc`2 f, Int32 retries) in C:\code\fake\src\app\FakeLib\TaskRunnerHelper.fs:line 32
[05:21:04][Step 1/1]at Fake.TaskRunnerHelper.runWithRetries[a](FSharpFunc`2 f, Int32 retries) in C:\code\fake\src\app\FakeLib\TaskRunnerHelper.fs:line 32
[05:21:04][Step 1/1]at FSI_0001.Build.removeDir@168(String dir) in D:\BuildAgent\work\29e799f160213a61\build.fsx:line 172
[05:21:04][Step 1/1]at FSI_0001.Build.createNugetPackages[a](a _arg1) in D:\BuildAgent\work\29e799f160213a61\build.fsx:line 227
[05:21:04][Step 1/1]at [email protected](Unit _arg1) in D:\BuildAgent\work\29e799f160213a61\build.fsx:line 274
[05:21:04][Step 1/1]at Fake.TargetHelper.runSingleTarget(TargetTemplate`1 target) in C:\code\fake\src\app\FakeLib\TargetHelper.fs:line 411
[05:21:04][Step 1/1] 
[05:21:04][Step 1/1] ---------------------------------------------------------------------
[05:21:04][Step 1/1] Build Time Report
[05:21:04][Step 1/1] ---------------------------------------------------------------------
[05:21:04][Step 1/1] Target           Duration
[05:21:04][Step 1/1] --------------
[05:21:04][Step 1/1] Clean            00:00:00.0014046
[05:21:04][Step 1/1] AssemblyInfo     00:00:00.0130088
[05:21:04][Step 1/1] RestorePackages  00:00:03.9659438
[05:21:04][Step 1/1] Build            00:00:01.6350134
[05:21:04][Step 1/1] CopyOutput       00:00:00.0212241
[05:21:04][Step 1/1] CleanNuget       00:00:00.0010638
[05:21:04][Step 1/1] BuildRelease     00:00:00.0000556
[05:21:04][Step 1/1] CleanTests       00:00:00.0001280
[05:21:04][Step 1/1] RunTests         00:00:26.9601007
[05:21:04][Step 1/1] Total:           00:00:34.0761731
[05:21:04][Step 1/1] ---------------------------------------------------------------------
[05:21:04][Step 1/1] Status:   Failure
[05:21:04][Step 1/1]   1) System.UnauthorizedAccessException: Access to the path 'D:\BuildAgent\work\29e799f160213a61\bin\build\lib\net45' is denied.
[05:21:04][Step 1/1] ---------------------------------------------------------------------
[05:21:04][Step 1/1]at System.IO.__Error.WinIOError(Int32 errorCode, String maybeFullPath)
[05:21:04][Step 1/1]at System.IO.FileSystemEnumerableIterator`1.AddSearchableDirsToStack(SearchData localSearchData)
[05:21:04][Step 1/1]at System.IO.FileSystemEnumerableIterator`1.MoveNext()
[05:21:04][Step 1/1]at Microsoft.FSharp.Core.CompilerServices.RuntimeHelpers.takeInner@667[T,TResult](ConcatEnumerator`2 x, Unit unitVar0)
[05:21:04][Step 1/1]at Microsoft.FSharp.Collections.SeqModule.ToList[T](IEnumerable`1 source)
[05:21:04][Step 1/1]at Fake.Globbing.buildPaths(FSharpList`1 acc, FSharpList`1 input) in C:\code\fake\src\app\FakeLib\Globbing\Globbing.fs:line 47
[05:21:04][Step 1/1]at Fake.Globbing.search(String baseDir, String input) in C:\code\fake\src\app\FakeLib\Globbing\Globbing.fs:line 81
[05:21:04][Step 1/1]at Fake.FileSystem.System-Collections-Generic-IEnumerable-1-GetEnumerator@39-2.GenerateNext(IEnumerable`1& next) in C:\code\fake\src\app\FakeLib\Globbing\FileSystem.fs:line 40
[05:21:04][Step 1/1]at Microsoft.FSharp.Core.CompilerServices.GeneratedSequenceBase`1.MoveNextImpl()
[05:21:04][Step 1/1]at Microsoft.FSharp.Core.CompilerServices.GeneratedSequenceBase`1.System-Collections-IEnumerator-MoveNext()
[05:21:04][Step 1/1]at Microsoft.FSharp.Collections.IEnumerator.next@185[T](FSharpFunc`2 f, IEnumerator`1 e, FSharpRef`1 started, Unit unitVar0)
[05:21:04][Step 1/1]at Microsoft.FSharp.Collections.IEnumerator.filter@180.System-Collections-IEnumerator-MoveNext()
[05:21:04][Step 1/1]at Microsoft.FSharp.Collections.IEnumerator.next@185[T](FSharpFunc`2 f, IEnumerator`1 e, FSharpRef`1 started, Unit unitVar0)
[05:21:04][Step 1/1]at Microsoft.FSharp.Collections.IEnumerator.filter@180.System-Collections-IEnumerator-MoveNext()
[05:21:04][Step 1/1]at Microsoft.FSharp.Collections.SeqModule.Iterate[T](FSharpFunc`2 action, IEnumerable`1 source)
[05:21:04][Step 1/1]at Fake.FileHelper.DeleteDir(String path) in C:\code\fake\src\app\FakeLib\FileHelper.fs:line 46
[05:21:04][Step 1/1]at [email protected](b _arg2) in D:\BuildAgent\work\29e799f160213a61\build.fsx:line 170
[05:21:04][Step 1/1]at Fake.TaskRunnerHelper.runWithRetries[a](FSharpFunc`2 f, Int32 retries) in C:\code\fake\src\app\FakeLib\TaskRunnerHelper.fs:line 32
[05:21:04][Step 1/1]at Fake.TaskRunnerHelper.runWithRetries[a](FSharpFunc`2 f, Int32 retries) in C:\code\fake\src\app\FakeLib\TaskRunnerHelper.fs:line 32
[05:21:04][Step 1/1]at Fake.TaskRunnerHelper.runWithRetries[a](FSharpFunc`2 f, Int32 retries) in C:\code\fake\src\app\FakeLib\TaskRunnerHelper.fs:line 32
[05:21:04][Step 1/1]at FSI_0001.Build.removeDir@168(String dir) in D:\BuildAgent\work\29e799f160213a61\build.fsx:line 172
[05:21:04][Step 1/1]at FSI_0001.Build.createNugetPackages[a](a _arg1) in D:\BuildAgent\work\29e799f160213a61\build.fsx:line 227
[05:21:04][Step 1/1]at [email protected](Unit _arg1) in D:\BuildAgent\work\29e799f160213a61\build.fsx:line 274
[05:21:04][Step 1/1]at Fake.TargetHelper.runSingleTarget(TargetTemplate`1 target) in C:\code\fake\src\app\FakeLib\TargetHelper.fs:line 411
[05:21:04][Step 1/1] Process exited with code 42
[05:21:04][Step 1/1] Step FAKE Build (Command Line) failed
[05:21:04]Publishing internal artifacts

Ran into this issue with Helios too. Appears to be a regression in a new version of FAKE.

Conversion to .net core

Hi, I have converted this package to .NET Core (.NET Standard 1.6) locally. I know you have planned this conversion, but maybe I can contribute. Do you want my help on this?

Snapshot deletion: null value in column "is_deleted" of relation "snapshot_store" violates not-null constraint

Version Information
Version of Akka.NET? v1.4.45
Which Akka.NET Modules? Akka.Persistence.PostgreSql, Akka.Persistence.PostgreSql.Hosting

Describe the bug

When attempting to delete a snapshot, the following error is thrown by the PostgreSql driver:

warn: Akka.Actor.ActorSystem[0]
      [22/11/09-12:14:05.3196][akka.tcp://xxx@localhost:58378/system/sharding/s1/32][akka://xxx/system/sharding/s1/32][0013]: PersistentShard snapshot failure: [23502: null value in column "is_deleted" of relation "snapshot_store" violates not-null constraint]

To Reproduce

Not sure - reported by user

Expected behavior

Snapshot should be deleted.

Actual behavior

SQL error.


Additional context

This issue might be related to the user's use of IWithTimers - that came up in the thread on Discord.

Other context provided by the user:

.WithPostgreSqlPersistence(
    connectionString,
    mode: Akka.Persistence.Hosting.PersistenceMode.Both,
    autoInitialize: true,
    configurator: (b) =>
    {
        //my taggers
    })

Using two different journals: one for sharding, one regular:

    public static AkkaConfigurationBuilder WithPostgreSqlPersistenceSharding(
        this AkkaConfigurationBuilder builder,
        string connectionString)
    {
        Config config = @$"
            akka.persistence {{
                journal {{
                    sharding {{
                        class = ""Akka.Persistence.PostgreSql.Journal.PostgreSqlJournal, Akka.Persistence.PostgreSql""
                        plugin-dispatcher : akka.actor.default-dispatcher
                        connection-string = ""{connectionString}""
                        connection-string-name :
                        connection-timeout : 30s
                        schema-name : public
                        table-name : event_journal
                        auto-initialize : on
                        stored-as : bytea
                        sequential-access : off
                    }}
                }}
                snapshot-store {{
                    sharding {{
                        class = ""Akka.Persistence.PostgreSql.Snapshot.PostgreSqlSnapshotStore, Akka.Persistence.PostgreSql""
                        plugin-dispatcher : akka.actor.default-dispatcher
                        connection-string = ""{connectionString}""
                        connection-string-name :
                        connection-timeout : 30s
                        schema-name : public
                        table-name : snapshot_store
                        auto-initialize : on
                        stored-as : bytea
                        sequential-access : off
                    }}
                }}
            }}";

        return builder.AddHocon(config);
    }

Loading snapshot error

After running my application for a while I reach the 1000 journal limit and a coordinator snapshot is taken.

Every subsequent time I load the application from this point I receive the error below.

I'm wondering if this is a hocon configuration issue?

Many thanks

Setup:

  • Akka 1.3.8
  • Akka.Persistence.PostgreSql 1.3.8
[12:02:19 ERR] Persistence failure when replaying events for persistenceId [/system/sharding/collectCoordinator/singleton/coordinator]. Last known sequence number [0]
System.TypeLoadException: Could not load type 'AA' from assembly 'Akka.Persistence.PostgreSql, Version=1.3.8.0, Culture=neutral, PublicKeyToken=null'.
   at System.RuntimeTypeHandle.GetTypeByName(String name, Boolean throwOnError, Boolean ignoreCase, Boolean reflectionOnly, StackCrawlMarkHandle stackMark, IntPtr pPrivHostBinder, Boolean loadTypeFromPartialName, ObjectHandleOnStack type, ObjectHandleOnStack keepalive)
   at System.RuntimeTypeHandle.GetTypeByName(String name, Boolean throwOnError, Boolean ignoreCase, Boolean reflectionOnly, StackCrawlMark& stackMark, IntPtr pPrivHostBinder, Boolean loadTypeFromPartialName)
   at System.RuntimeType.GetType(String typeName, Boolean throwOnError, Boolean ignoreCase, Boolean reflectionOnly, StackCrawlMark& stackMark)
   at System.Type.GetType(String typeName, Boolean throwOnError)
   at Akka.Persistence.PostgreSql.Snapshot.PostgreSqlQueryExecutor.ReadSnapshot(DbDataReader reader)
   at Akka.Persistence.Sql.Common.Snapshot.AbstractQueryExecutor.SelectSnapshotAsync(DbConnection connection, CancellationToken cancellationToken, String persistenceId, Int64 maxSequenceNr, DateTime maxTimestamp)
   at Akka.Persistence.Sql.Common.Snapshot.SqlSnapshotStore.LoadAsync(String persistenceId, SnapshotSelectionCriteria criteria)
   at Akka.Util.Internal.AtomicState.CallThrough[T](Func`1 task)
   at Akka.Util.Internal.AtomicState.CallThrough[T](Func`1 task)

Hocon:

akka {

    loglevel = INFO,
	
    loggers = ["Akka.Logger.Serilog.SerilogLogger, Akka.Logger.Serilog"],

    actor {
        provider = cluster
    }
    
    remote {
        dot-netty.tcp {
            hostname = "127.0.0.1"
            port = 0
        }
    }

    cluster {

        ...

        auto-down-unreachable-after = 5s
		
        run-coordinated-shutdown-when-down = on

        sharding {
            remember-entities = on
            journal-plugin-id = "akka.persistence.journal.sharding"
            snapshot-plugin-id = "akka.persistence.snapshot-store.sharding"
        }
    }

    persistence {

        journal {
            plugin = "akka.persistence.journal.postgresql"
            postgresql {
                class = "Akka.Persistence.PostgreSql.Journal.PostgreSqlJournal, Akka.Persistence.PostgreSql"
                plugin-dispatcher = "akka.actor.default-dispatcher"
                connection-string = "User ID=example;Password=example;Host=localhost;Port=5432;Database=example;"
                connection-timeout = 30s
                schema-name = public
                table-name = event_journal
                auto-initialize = off
                timestamp-provider = "Akka.Persistence.Sql.Common.Journal.DefaultTimestampProvider, Akka.Persistence.Sql.Common"
                metadata-table-name = metadata
                stored-as = BYTEA
            }
            sharding {
                class = "Akka.Persistence.PostgreSql.Journal.PostgreSqlJournal, Akka.Persistence.PostgreSql"
                plugin-dispatcher = "akka.actor.default-dispatcher"
                connection-string = "User ID=example;Password=example;Host=localhost;Port=5432;Database=example;"
                connection-timeout = 30s
                schema-name = public
                table-name = sharding_event_journal
                auto-initialize = off
                timestamp-provider = "Akka.Persistence.Sql.Common.Journal.DefaultTimestampProvider, Akka.Persistence.Sql.Common"
                metadata-table-name = sharding_metadata
                stored-as = BYTEA
            }
        }

        snapshot-store {
            plugin = "akka.persistence.snapshot-store.postgresql"
            postgresql {
                class = "Akka.Persistence.PostgreSql.Snapshot.PostgreSqlSnapshotStore, Akka.Persistence.PostgreSql"
                plugin-dispatcher = "akka.actor.default-dispatcher"
                connection-string = "User ID=example;Password=example;Host=localhost;Port=5432;Database=example;"
                connection-timeout = 30s
                schema-name = public
                table-name = snapshot_store
                auto-initialize = off
                stored-as = BYTEA
            }
            sharding {
                class = "Akka.Persistence.PostgreSql.Snapshot.PostgreSqlSnapshotStore, Akka.Persistence.PostgreSql"
                plugin-dispatcher = "akka.actor.default-dispatcher"
                connection-string = "User ID=example;Password=example;Host=localhost;Port=5432;Database=example;"
                connection-timeout = 30s
                schema-name = public
                table-name = sharding_snapshot_store
                auto-initialize = off
                stored-as = BYTEA
            }
        }
    }
}

Move to IDENTITY column for ordering?

[BUG] Akka.Persistence.PostgreSql does not support `writerUUID` field

Version Information
Version of Akka.NET? 1.4.35
Which Akka.NET Modules? Akka.Persistence.PostgreSql

Describe the bug

Related: akkadotnet/akka.net#3811

Since Akka.Persistence.PostgreSql does its own thing with serialization, the writerUUID column was never added to it. Thus we don't have an easy way of debugging scenarios where multiple nodes are all attempting to write to the same entity. I have this exact problem with a user right now where it appears as though multiple separate users are competing for the same journal entries for the same entity.

To fix this, we're going to need to either:

  1. Update the DDL and table schema - adding a new field.
  2. Change the serialization system for Akka.Persistence.PostgreSql to include this data and avoid a schema migration.

cc @to11mtm should we just ditch this and go with Linq2Db? I'm starting to get that impression.

Use JSON data types for payload column

The idea is to use Postgres-specific data types, either JSON or (preferred?) JSONB. The advantages are quite obvious: with this change, the database will be aware of the content of events, making it possible to query by their fields.

However, there are a few challenges here:

  1. What to do with existing blob data? - since the schema will change eventually in the near future, changes will occur one way or another. But changing a column's data type is a more serious action. We could potentially introduce some switch mechanism or try to use event adapters for that.
  2. How to combine this approach with surrogates. Personally I don't have experience with serializing/deserializing JSON data types between PostgreSQL and .NET, but one of the problems we may need to resolve is the conversion of things such as IActorRef so they remain functional after being read from the database. This is basically an anti-pattern, but it should work nonetheless.
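To illustrate the payoff, a hypothetical ad-hoc query with Npgsql against a JSONB payload column (assuming stored-as = JSONB, the default event_journal table, and an event type that happens to have a top-level Currency field - all of these are illustrative assumptions):

```csharp
using System;
using Npgsql;

// Hypothetical query: find journal rows whose JSONB payload has a
// top-level "Currency" field equal to "EUR".
await using var conn = new NpgsqlConnection("<your-connection-string>");
await conn.OpenAsync();

await using var cmd = new NpgsqlCommand(
    "SELECT persistence_id, sequence_nr FROM event_journal " +
    "WHERE payload ->> 'Currency' = @currency", conn);
cmd.Parameters.AddWithValue("currency", "EUR");

await using var reader = await cmd.ExecuteReaderAsync();
while (await reader.ReadAsync())
    Console.WriteLine($"{reader.GetString(0)} #{reader.GetInt64(1)}");
```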

Support the driver returning JObject instead of string for Jsonb

Version Information
Version of Akka.NET? 1.15.20
Which Akka.NET Modules? Akka.Persistence.PostgreSql

Describe the bug

If the extension method UseJsonNet from Npgsql.Json.NET 8.0.3 is called, the driver returns jsonb values as JObject instead of string. This then fails during deserialization inside Akka.

Snapshots - Deserialize Serialization.WithTransport missing?

https://github.com/akkadotnet/Akka.Persistence.PostgreSql/blob/dev/src/Akka.Persistence.PostgreSql/Snapshot/PostgreSqlQueryExecutor.cs#L86

Should this also read:

 // TODO: hack. Replace when https://github.com/akkadotnet/akka.net/issues/3811
if (serializerId.HasValue)
    return Serialization.Deserialize((byte[])payload, serializerId.Value, manifest);

// Support old writes that did not set the serializer id
var deserializer = Serialization.FindSerializerForType(type, Configuration.DefaultSerializer);
return Serialization.WithTransport(Serialization.System, () => deserializer.FromBinary((byte[])payload, type));

Identical to Akka.Persistence.PostgreSql\Journal\PostgreSqlQueryExecutor.cs

Setting SerializerId twice

It looks like this made it into master, harmless but unneeded:

if (serializer != null)
{
    AddParameter(command, "@SerializerId", DbType.Int32, serializer.Identifier);
}
else
{
    AddParameter(command, "@SerializerId", DbType.Int32, DBNull.Value);
}

Was already being set 2 lines higher:

if (hasSerializer)
{
    AddParameter(command, "@SerializerId", DbType.Int32, serializer.Identifier);
}
else
{
    AddParameter(command, "@SerializerId", DbType.Int32, DBNull.Value);
}

Update to persistent queries PR

Once akkadotnet/akka.net#1306 gets merged, its change set, which includes breaking changes, should be reflected in the PostgreSQL plugin. List of things to update:

  • The database initializer for the journal should introduce a timestamp field of a type corresponding to .NET DateTime. In addition, the payload type columns should be renamed to manifest for both the journal and snapshot store databases.
  • The Postgres journal should inherit from SqlJournal. The snapshot store should inherit from SqlSnapshotStore.
  • The Postgres version of IQueryBuilder should be able to compose queries based on the new querying mechanism.
  • The IQueryMapper implementation should rename the PayloadType parameter to Manifest and add a Timestamp parameter.
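For an existing database, the first bullet might look roughly like this (the `payload_type` column name and exact types are assumptions; the canonical DDL lives in the initializer):

```sql
-- Sketch of the schema change described above; names/types are assumptions.
ALTER TABLE public.event_journal RENAME COLUMN payload_type TO manifest;
ALTER TABLE public.event_journal ADD COLUMN timestamp TIMESTAMP NOT NULL DEFAULT now();
ALTER TABLE public.snapshot_store RENAME COLUMN payload_type TO manifest;
```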

jsonb serialization: snapshot manifest uses the SerializerWithStringManifest manifest instead of the qualified name

Version Information
Version of Akka.NET?
1.4.46

Which Akka.NET Modules?
Akka.Persistence.PostgreSql 1.4.46

Describe the bug
With jsonb serialization, the snapshot manifest uses the manifest from SerializerWithStringManifest instead of the type's qualified name.

Journal serialization works correctly, but snapshot serialization does not.

Important to note: internally, between cluster services, protobuf serialization is used; only the DB uses JSONB.

Snapshot:
"market_394870" 1 638060916971832421 "mstate" "{""Data"": {""Name"": ...

Journal:
"market_394870" 1 false 638060916970892832 "Markets.Events.MarketCreated, Markets" "{""Market"": {""Name"": ...

Fix failing unit tests.

Currently the following tests are failing:

  • PostgreSqlJournalQuerySpec.Journal_queried_on_Manifest_returns_events_with_particular_manifest
  • PostgreSqlJournalQuerySpec.Journal_queried_on_PersistenceIdRange_returns_events_for_particular_persistent_ids
  • PostgreSqlJournalQuerySpec.Journal_queried_on_Timestamp_returns_events_occurred_after_or_equal_From_value
  • PostgreSqlJournalQuerySpec.Journal_queried_on_Timestamp_returns_events_occurred_before_To_value
  • PostgreSqlJournalQuerySpec.Journal_queried_on_Timestamp_returns_events_occurred_between_both_range_values
  • PostgreSqlJournalQuerySpec.Journal_queried_using_multiple_hints_should_apply_all_of_them

These need to be fixed.

Snapshots Deserialize to JObject and then ignored

Related to: #63 and #49

I am currently using Akka.Persistence.PostgreSql version 1.4.17 and have seen issues recovering from snapshots using the JSON format. I am using PersistentFSM<TState, TData, TEvent>, which is offered a snapshot that is subsequently ignored because it has not been deserialized to the correct type.

The actor handles this by reading all events from the journal instead, so no errors are thrown; indeed it's only by chance that I spotted this behavior.

protected override bool ReceiveRecover(object message)
{
    switch (message)
    {
        case TEvent domainEvent:
            this.StartWith(this.StateName, this.ApplyEvent(domainEvent, this.StateData));
            return true;
        case PersistentFSM.StateChangeEvent stateChangeEvent:
            this.StartWith(this.StatesMap[stateChangeEvent.StateIdentifier], this.StateData, stateChangeEvent.Timeout);
            return true;
        case SnapshotOffer snapshotOffer:
            if (!(snapshotOffer.Snapshot is PersistentFSM.PersistentFSMSnapshot<TData> snapshot))
                return false; // <~~ snapshotOffer.Snapshot is Newtonsoft.Json.Linq.JObject but should be PersistentFSM.PersistentFSMSnapshot<TData>
            this.StartWith(this.StatesMap[snapshot.StateIdentifier], snapshot.Data, snapshot.Timeout);
            return true;
        case RecoveryCompleted _:
            this.Initialize();
            this.OnRecoveryCompleted();
            return true;
        default:
            return false;
    }
}

Reading the snapshot store from the DB I can see that the manifest field is "" as was mentioned to be the issue in #63.

Following through the code I can see the same method mentioned is at fault for not adding the manifest correctly.

protected override void SetManifestParameters(object snapshot, DbCommand command)
{
    Type type = snapshot.GetType();
    Serializer serializerForType = this.Serialization.FindSerializerForType(type, this.Configuration.DefaultSerializer);
    string str = "";
    if (serializerForType is SerializerWithStringManifest withStringManifest)
        str = withStringManifest.Manifest(snapshot);
    else if (!serializerForType.IncludeManifest)
        str = type.TypeQualifiedName();
    this.AddParameter(command, "@Manifest", DbType.String, (object) str); // <~~ str = ""
    this.AddParameter(command, "@SerializerId", DbType.Int32, (object) serializerForType.Identifier);
}

I have also tried switching to BYTEA with a mind to use my own serialization, but the lack of a manifest again prevents me from doing so. Journal entries look to have the manifest set correctly; this only seems to affect snapshots.

My akka.conf for context:

akka {
    loglevel=DEBUG
    loggers=["Akka.Logger.Serilog.SerilogLogger, Akka.Logger.Serilog"]
    persistence {
        fsm {
            snapshot-after = 3 # for testing purposes, normally 100
        }
        journal {
            plugin = "akka.persistence.journal.postgresql"
            postgresql {
                class = "Akka.Persistence.PostgreSql.Journal.PostgreSqlJournal, Akka.Persistence.PostgreSql"
                connection-string = "<dbConnectionString>"
                schema-name = public
                table-name = event_journal
                auto-initialize = on
                stored-as = BYTEA
            }
        }    
        snapshot-store {
            plugin = "akka.persistence.snapshot-store.postgresql"
            postgresql {
                class = "Akka.Persistence.PostgreSql.Snapshot.PostgreSqlSnapshotStore, Akka.Persistence.PostgreSql"
                connection-string = "<dbConnectionString>"
                schema-name = public
                table-name = snapshot_store
                auto-initialize = on
                stored-as = JSON
            }
        }
    }
}

PR49 breaks BYTEA snapshots

I'm using my own custom protobuf serializer and have run into an issue where manifest information is not stored correctly in the snapshot store.

I've isolated the problem to:

#49

which overrides the behaviour in AbstractQueryExecutor from

        if (serializer is SerializerWithStringManifest)
        {
            manifest = ((SerializerWithStringManifest)serializer).Manifest(snapshot);
        }
        else
        {
            if (serializer.IncludeManifest)
            {
                manifest = snapshotType.TypeQualifiedName();
            }
        }

to the following in PostgreSqlQueryExecutor:

        if (serializer is SerializerWithStringManifest)
        {
            manifest = ((SerializerWithStringManifest)serializer).Manifest(snapshot);
        }
        else if (!serializer.IncludeManifest) // <-- this test has been flipped
        {
            manifest = snapshotType.TypeQualifiedName();
        }

This is plainly wrong. While it may fix issues specific to the JSON implementation of Akka.Persistence.PostgreSql, when using one's own serialization scheme it ultimately causes snapshots to be stored without manifest information, which (in my case) should be the string Akka.Persistence.Fsm.PersistentFSM+PersistentFSMSnapshot`1[[SomeCustomStateType]], Akka.Persistence.

This causes divergent behaviour from other snapshot/journal plugins, such as sqlite, which, by relying on the correct base class implementation, successfully persist and restore these snapshots.

Recovery can return items out of order

Version Information
Version of Akka.NET? v1.4.32
Which Akka.NET Modules?

Describe the bug
We have seen incidents inside Phobos traces indicating that the journal delivers its RecoverySuccess message prior to returning the final ReplayedMessage from inside the journal, resulting in an invalid recovery.

To Reproduce

  1. Create a ReceivePersistentActor that recovers an event type that is not handled as a Command<T>
  2. Persist 100 of this event type
  3. Test to see if an Unhandled version of this event type appears, sent immediately after RecoverySuccess

Expected behavior
RecoverySuccess is the final message sent to the persistent actor after recovering from PostgreSql.

Actual behavior
RecoverySuccess can be sent to the persistent actor prior to one of the events it is recovering.

Readme incorrectly lists need for "snapshot" column

This is a continuation of #45 / #46.

While attempting to get snapshots up and running, I saw the following error:

Failed to SaveSnapshot given metadata [SnapshotMetadata<pid: [redacted], seqNr: 1300, timestamp: 0001/01/01>] due to: [Npgsql.PostgresException (0x80004005): 23502: null value in column "snapshot" violates not-null constraint at Npgsql.NpgsqlConnector.

Seeing this, I realized that the issue in the README I discovered in #45 wasn't that it was missing the payload column in the snapshot table, it's that the payload column was meant to replace the snapshot column. At least, I think that's what it is.

When I removed the snapshot column from the schema, the snapshots began saving correctly.
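For reference, a snapshot table shaped the way the plugin apparently expects, with payload in place of the old snapshot column. Column names and types here are my reconstruction from the error, not the canonical DDL:

```sql
-- Sketch: snapshot table without the obsolete "snapshot" column.
-- Column names/types reconstructed from the error, not canonical DDL.
CREATE TABLE IF NOT EXISTS public.snapshot_store (
    persistence_id VARCHAR(255) NOT NULL,
    sequence_nr    BIGINT       NOT NULL,
    created_at     BIGINT       NOT NULL,
    manifest       VARCHAR(500) NOT NULL,
    payload        BYTEA        NOT NULL,
    serializer_id  INTEGER      NULL,
    PRIMARY KEY (persistence_id, sequence_nr)
);
```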

PR incoming in a second to correct this; feel free to accept if you agree with my understanding of the issue.

[PERF] Use DbBatch to increase PersistAll performance

Version Information
Version of Akka.NET: 1.4.35
Which Akka.NET Modules? PostgreSQL

Describe the performance issue
Performance of PersistAll is way too slow; using the DbBatch API we can increase PersistAll performance by 2000%.

Using Postgres LISTEN/NOTIFY for persistence queries

Currently, by default, all SQL journal implementations allow live queries on the database. However, since SQL databases don't support push notifications, we do that by periodically polling the latest changeset from the database.

However, Postgres implements a feature called LISTEN/NOTIFY, which could allow us to signal when there is new data waiting to be read by live queries. Maybe we could use that.
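A minimal sketch of what the database side could look like, assuming the default `event_journal` table; the channel name and notification payload format are arbitrary choices:

```sql
-- Sketch: fire a notification on every journal insert.
-- Channel name and payload format are arbitrary choices.
CREATE OR REPLACE FUNCTION notify_journal_insert() RETURNS trigger AS $$
BEGIN
    PERFORM pg_notify('akka_journal', NEW.persistence_id || ':' || NEW.sequence_nr::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER journal_insert_notify
    AFTER INSERT ON public.event_journal
    FOR EACH ROW EXECUTE PROCEDURE notify_journal_insert();
```

The plugin would then issue LISTEN akka_journal on a dedicated connection and wake the live queries on notification instead of polling on a timer.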

Wrong PayloadIndex in DefaultJournalQueryMapper

I've fixed minor problems (r2-r@e42d8e9) in the postgresql plugin and it is almost working, but there is one more thing broken: https://github.com/akkadotnet/akka.net/blob/642a65ffa56857c5e47f2d4cb6f47230f30ece12/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/QueryMapper.cs#L36 - PayloadIndex is hardcoded to 4, but there is a timestamp at that index, at least in the case of the postgresql journal.

I'm not sure if this is a generic issue or specific to postgresql, so I'm reporting it here.

Broken license link in NuGet Package

Version Information
Akka.Persistence.PostgreSql 1.4.25

Describe the bug
The "License Info" link on https://www.nuget.org/packages/Akka.Persistence.PostgreSql/1.4.25 points to https://github.com/AkkaNetContrib/Akka.Persistence.PostgreSql/blob/master/LICENSE, which leads to a 404 error page for me.

To Reproduce
Steps to reproduce the behavior:

  1. Go to https://www.nuget.org/packages/Akka.Persistence.PostgreSql/1.4.25
  2. Click on 'License Info' in the right navigation bar

Expected behavior
A correct license link (maybe https://github.com/akkadotnet/Akka.Persistence.PostgreSql/blob/dev/LICENSE.md, which seems to work), or, even better, include the license within the NuGet package itself (see https://github.com/NuGet/Samples/blob/main/PackageLicenseFileExample/PackageLicenseFileExample.csproj for an example).

Actual behavior
I got a 404 error page.

Environment
Reproduced using firefox and chrome on Windows 10.

JSON serialization needs adjusting

I've been working on a sharded cluster app that uses postgresql for its persistence store. I tried to use jsonb (or json) as my stored-as setting. When I do so, I get an error about it not being able to store the objects used by the sharding engine, as they contain self-references in their structure.

It is possible to tell the Newtonsoft JSON serializer to deal with references and structural loops like that with some settings, but so far I haven't found a way to override the JSON serializer's settings.

Would it be possible to expose such a setting, or, perhaps easier, to default the serializer to handling self-references correctly?
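For reference, the knobs in question are standard Newtonsoft.Json settings; the open question is how to feed them into the plugin's JSON serializer. The snippet below is a sketch of the settings only, not a working integration:

```csharp
using Newtonsoft.Json;

// Sketch: standard Json.NET settings that tolerate self-referencing object
// graphs. How to inject these into the plugin's serializer is exactly what
// this issue is asking for.
var settings = new JsonSerializerSettings
{
    ReferenceLoopHandling = ReferenceLoopHandling.Ignore,
    PreserveReferencesHandling = PreserveReferencesHandling.Objects
};
```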

Please indicate that this project is scheduled for deprecation in Akka.Net 1.16

Please describe what you are trying to understand
Please indicate that this project is scheduled for deprecation in Akka.Net 1.16.

Which pages have you looked at?
README.md

What did these pages not make clear?
That Akka.Persistence.Sql would become the successor

Intermittently, only latest journal entry is sent to actor as SnapshotOffer

An application has a single Typed Persistent Actor.
This actor is created (and therefore, recovered) right at the start of the application.
The actor does not (yet) implement snapshotting, at all.

In (pre)production (never in the dev environment), recovery goes completely wrong in about 50% of startups.
Instead of receiving the thousands of journal entries, the actor receives ONLY the latest journal entry, and rather than receiving it as a replayed event, it gets it presented as the Snapshot of a SnapshotOffer.

Most relevant piece of logging:

2019-08-13 12:27:54,032Z DEBUG [5  ] PostgreSqlSnapshotStore                  Started (Akka.Persistence.PostgreSql.Snapshot.PostgreSqlSnapshotStore)
2019-08-13 12:27:54,047Z DEBUG [8  ] PostgreSqlJournal                        Started (Akka.Persistence.PostgreSql.Journal.PostgreSqlJournal)
2019-08-13 12:27:54,625Z DEBUG [8  ] RuntimeActor                             Recovering an Snapshot 
snapshotOffer.ToString: SnapshotOffer<meta: SnapshotMetadata<pid: RuntimeActor, seqNr: 66298, timestamp: 2019/08/09>, snapshot: de7a22d1-bd93-4fef-81c3-00878a9f3517 en-queued at 2019-08-09T15:55:33.9508626+02:00>
snapshotOffer.SnapShot.GetType: Divv.IxIx.TitleCatalogizer.Actors.Runtime.RuntimeActor+EnqueuedWork
snapshotOffer.SnapShot.ToString de7a22d1-bd93-4fef-81c3-00878a9f3517 en-queued at 2019-08-09T15:55:33.9508626+02:00
2019-08-13 12:27:54,750Z DEBUG [5  ] ReplayFilter                             Started (Akka.Persistence.Journal.ReplayFilter)
2019-08-13 12:27:54,766Z DEBUG [5  ] PostgreSqlJournal

Using a client to inspect the 3 tables in postgres, only event_journal is populated, metadata and snapshot_store are empty, 0 rows.

Config:

akka : {
  log-config-on-start : on
  log-dead-letters : on
  stdout-loglevel : DEBUG
  loglevel : DEBUG
  loggers : ["Akka.Logger.log4net.Log4NetLogger, Akka.Logger.log4net","Akka.Event.StandardOutLogger, Akka"]
  suppress-json-serializer-warning : on
  actor : {
    debug : {
      receive : on
      autoreceive : on
      lifecycle : on
      event-stream : on
      unhandled : on
    }
  }
  persistence : {
    journal : {
      plugin : akka.persistence.journal.postgresql
      postgresql : {
        class : "Akka.Persistence.PostgreSql.Journal.PostgreSqlJournal, Akka.Persistence.PostgreSql"
        plugin-dispatcher : akka.actor.default-dispatcher
        connection-string : "Server=divv-1-pgsql-l.xixi.com;Port=5432;User Id=RtiTitleCatalogizer;Password=****;Database=RtiTitleCatalogizer;SslMode=Disable;"
        schema-name : public
        connection-timeout : 30s
        auto-initialize : on
        timestamp-provider : "Akka.Persistence.Sql.Common.Journal.DefaultTimestampProvider, Akka.Persistence.Sql.Common"
        metadata-table-name : metadata
        stored-as : BYTEA
      }
    }
    snapshot-store : {
      plugin : akka.persistence.snapshot-store.postgresql
      postgresql : {
        class : "Akka.Persistence.PostgreSql.Snapshot.PostgreSqlSnapshotStore, Akka.Persistence.PostgreSql"
        plugin-dispatcher : akka.actor.default-dispatcher
        connection-string : "Server=divv-1-pgsql-l.xixi.com;Port=5432;User Id=RtiTitleCatalogizer;Password=****;Database=RtiTitleCatalogizer;SslMode=Disable;"
        schema-name : public
        connection-timeout : 30s
        timestamp-provider : "Akka.Persistence.Sql.Common.Journal.DefaultTimestampProvider, Akka.Persistence.Sql.Common"
        auto-initialize : on
        stored-as : BYTEA
      }
    }
  }
}

When switching to Sqlite, the actor always correctly recovers.

[Regression] Updating Npgsql to 6.0.1 broke netcoreapp3.1 support for Entity Framework

Our latest Npgsql version bump broke netcoreapp3.1 support. Npgsql 6.0.1 does support netstandard2.0, but Npgsql.EntityFrameworkCore.PostgreSql does not; it only supports net6.0.
In essence, bumping Npgsql to 6.0.1 broke netcoreapp3.1 support for all users who use Entity Framework in their project.

Will try and see if we can fix this using version range instead of explicit version number.
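The version-range idea would look something like this in the csproj; the exact bounds are an assumption, the point is to accept any compatible Npgsql rather than pinning to 6.0.1:

```xml
<!-- Sketch: floating range instead of an explicit pin; bounds are assumptions. -->
<PackageReference Include="Npgsql" Version="[5.0.0,7.0.0)" />
```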

sequential-access = on causes TCK failures

Version: 1.3.8

Looks like turning on sequential-access causes TCK failures.

Failed: Expected a message of type Akka.Persistence.LoadSnapshotResult, but received {LoadSnapshotFailed<Cause: System.InvalidOperationException: Invalid attempt to read from column ordinal '4'. With CommandBehavior.SequentialAccess, you may only read from column ordinal '5' or greater.
  at Npgsql.BackendMessages.DataRowSequentialMessage+<SeekToColumn>d__2.MoveNext () [0x0005e] in <2ff537c166614a2caa75e138c5a11ce5>:0 
--- End of stack trace from previous location where exception was thrown ---
  at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw () [0x0000c] in <e29f59307e5e4e33ab73bb6530764ec9>:0 
  at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess (System.Threading.Tasks.Task task) [0x0003e] in <e29f59307e5e4e33ab73bb6530764ec9>:0 
  at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification (System.Threading.Tasks.Task task) [0x00028] in <e29f59307e5e4e33ab73bb6530764ec9>:0 
  at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd (System.Threading.Tasks.Task task) [0x00008] in <e29f59307e5e4e33ab73bb6530764ec9>:0 
  at System.Runtime.CompilerServices.TaskAwaiter.GetResult () [0x00000] in <e29f59307e5e4e33ab73bb6530764ec9>:0 
  at Npgsql.BackendMessages.DataRowMessage+<SeekToColumnStart>d__17.MoveNext () [0x00078] in <2ff537c166614a2caa75e138c5a11ce5>:0 
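Until this is fixed, the workaround is presumably to leave the toggle at its default. Sketched in HOCON (the exact key name should be checked against the plugin's reference config):

```hocon
# work-around: keep sequential read access disabled (the default);
# exact key name per the plugin's reference configuration
akka.persistence.journal.postgresql.use-sequential-access = off
akka.persistence.snapshot-store.postgresql.use-sequential-access = off
```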

Akka 1.4 support

I'm trying to use this library with Akka 1.4.14 and .NET 5.0, but receive this exception when replaying events:

System.MissingMethodException: Method not found: 'Void Akka.Persistence.Persistent..ctor(System.Object, Int64, System.String, System.String, Boolean, Akka.Actor.IActorRef, System.String)'.

I pulled the package in locally, upgraded it to netstandard2.0, updated its dependencies, and my testing has been successful so far. Will you accept a PR with these updates?

What does the connection string look like?

I want to use this library. What does a valid connection string look like in the HOCON (App.config)?

I tried

connection-string = "Host=localhost;Port=32771;Database=event_db;Username=postgres"

But it doesn't work! I get a restore timeout.

Do I only need to create a database, and this library creates the rest by itself?

rubiktubik
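For reference, a Npgsql-style connection string with an explicit password, plus auto-initialize so the plugin creates its own tables. The values below are placeholders:

```hocon
akka.persistence.journal.postgresql {
  # placeholder credentials; any valid Npgsql connection string works here
  connection-string = "Server=localhost;Port=5432;Database=event_db;User Id=postgres;Password=postgres"
  # with auto-initialize = on the plugin creates its tables on startup
  auto-initialize = on
}
```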

Racy Akka.Persistence.Query specs

PostgreSqlEventsByPersistenceIdSpec.Sql_query_EventsByPersistenceId_should_return_empty_stream_for_cleaned_journal_from_0_to_MaxLong

Failed: Expected a message of type Akka.Streams.TestKit.TestSubscriber+OnComplete, but received {TestSubscriber.OnNext(g1-1)} (type Akka.Streams.TestKit.TestSubscriber+OnNext`1[System.Object]) instead  from [akka://test/user/StreamSupervisor-5/Flow-0-0-select#1854348898]
Expected: True
Actual:   False
  at Akka.TestKit.Xunit2.XunitAssertions.Fail (System.String format, System.Object[] args) <0x4159bb60 + 0x0002f> in <filename unknown>:0 
  at Akka.TestKit.TestKitBase.InternalExpectMsgEnvelope[T] (Nullable`1 timeout, System.Action`2 assert, System.String hint, Boolean shouldLog) <0x4146b370 + 0x00580> in <filename unknown>:0 
  at Akka.TestKit.TestKitBase.InternalExpectMsgEnvelope[T] (Nullable`1 timeout, System.Action`1 msgAssert, System.Action`1 senderAssert, System.String hint) <0x4146a390 + 0x004a3> in <filename unknown>:0 
  at Akka.TestKit.TestKitBase.InternalExpectMsg[T] (Nullable`1 timeout, System.Action`1 msgAssert, System.String hint) <0x41469dd0 + 0x0004f> in <filename unknown>:0 
  at Akka.TestKit.TestKitBase.ExpectMsg[T] (Nullable`1 duration, System.String hint) <0x41469920 + 0x00093> in <filename unknown>:0 
  at Akka.Streams.TestKit.TestSubscriber+ManualProbe`1[T].ExpectComplete () <0x41577660 + 0x00037> in <filename unknown>:0 
  at Akka.Persistence.Sql.TestKit.EventsByPersistenceIdSpec.Sql_query_EventsByPersistenceId_should_return_empty_stream_for_cleaned_journal_from_0_to_MaxLong () <0x4159b5d0 + 0x001df> in <filename unknown>:0 
  at (wrapper managed-to-native) System.Reflection.MonoMethod:InternalInvoke (System.Reflection.MonoMethod,object,object[],System.Exception&)
  at System.Reflection.MonoMethod.Invoke (System.Object obj, BindingFlags invokeAttr, System.Reflection.Binder binder, System.Object[] parameters, System.Globalization.CultureInfo culture) <0x412ffd20 + 0x000b7> in <filename unknown>:0 

Second one:

PostgreSqlEventsByPersistenceIdSpec.Sql_query_EventsByPersistenceId_should_return_remaining_values_after_partial_journal_cleanup

Got a message of the expected type <Akka.Streams.TestKit.TestSubscriber+OnNext`1[[System.Object, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]]>. Also expected the predicate to return true but the message {TestSubscriber.OnNext(h-1)} of type <Akka.Streams.TestKit.TestSubscriber+OnNext`1[[System.Object, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]]> did not match
Expected: True
Actual:   False
  at Akka.TestKit.Xunit2.XunitAssertions.AssertTrue (Boolean condition, System.String format, System.Object[] args) <0x414c4c30 + 0x00037> in <filename unknown>:0 
  at Akka.TestKit.TestKitBase.AssertPredicateIsTrueForMessage[T] (System.Predicate`1 isMessage, Akka.TestKit.T m, System.String hint) <0x4159b330 + 0x0022c> in <filename unknown>:0 
  at Akka.TestKit.TestKitBase+<>c__DisplayClass91_0`1[T].<ExpectMsg>b__0 (T m, IActorRef sender) <0x4159b2e0 + 0x0002f> in <filename unknown>:0 
  at Akka.TestKit.TestKitBase.InternalExpectMsgEnvelope[T] (Nullable`1 timeout, System.Action`2 assert, System.String hint, Boolean shouldLog) <0x41599460 + 0x0073d> in <filename unknown>:0 
  at Akka.TestKit.TestKitBase.InternalExpectMsg[T] (Nullable`1 timeout, System.Action`2 assert, System.String hint) <0x4159b250 + 0x0002f> in <filename unknown>:0 
  at Akka.TestKit.TestKitBase.ExpectMsg[T] (System.Predicate`1 isMessage, Nullable`1 timeout, System.String hint) <0x4159af90 + 0x0025b> in <filename unknown>:0 
  at Akka.Streams.TestKit.TestSubscriber+ManualProbe`1[T].ExpectNext (T element) <0x4157e110 + 0x00148> in <filename unknown>:0 
  at Akka.Persistence.Sql.TestKit.EventsByPersistenceIdSpec.Sql_query_EventsByPersistenceId_should_return_remaining_values_after_partial_journal_cleanup () <0x415a2260 + 0x001e3> in <filename unknown>:0 
  at (wrapper managed-to-native) System.Reflection.MonoMethod:InternalInvoke (System.Reflection.MonoMethod,object,object[],System.Exception&)
  at System.Reflection.MonoMethod.Invoke (System.Object obj, BindingFlags invokeAttr, System.Reflection.Binder binder, System.Object[] parameters, System.Globalization.CultureInfo culture) <0x412ffd20 + 0x000b7> in <filename unknown>:0 

Following README leads to "column 'payload' doesn't exist"

Hi all,

Which tables does the payload column need to be in?

Goal

I've got a postgresql db in a docker container, and am trying to use Akka.Persistence.PostgreSql to enable a simple persistence scenario (the demo code in the akka.net persistence docs).

Steps

  • I grab the HOCON config from the repo docs
  • I note the journal, snapshot store, and metadata table names
  • I replace the tokens in the suggested SQL scripts and run them

This creates a table, event_journal, with what appears to be the correct columns:

(screenshot of the created event_journal table omitted)

  • I use a basic console app that creates the demo persistent actor.

Expected behavior: Working demo

Actual Behavior: Error message:

ERR Persistence failure when replaying events for persistenceId ["HardCoded"]. Last known sequence number [0]
Npgsql.PostgresException (0x80004005): 42703: column "payload" does not exist

I know I'm likely missing something and will keep digging into it, but wanted to drop this here in case anyone had a quick fix.

If it's the docs, I'm happy to figure it out and then send a PR to update them.
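If the README script really is missing the column, the immediate fix on a still-empty journal table would be along these lines (BYTEA per the default stored-as mode; this is an assumption, the real fix is correcting the README DDL):

```sql
-- Sketch: add the missing payload column to a still-empty journal table.
ALTER TABLE public.event_journal ADD COLUMN payload BYTEA NOT NULL;
```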

Intermittent Postgres unit test failures

http://petabridge-ci.cloudapp.net/viewLog.html?buildId=1815&tab=buildResultsDiv&buildTypeId=AkkaNet_AkkaPersistenceImplementations_AkkaPersistencePostgreSql_PrBuilds&guest=1

Three intermittent spec failures:

  • PostgreSqlJournalSpec.Journal_should_not_replay_messages_if_lower_sequence_number_bound_is_greater_than_upper_sequence_number_bound
  • PostgreSqlJournalSpec.Journal_should_replay_messages_using_a_count_limit
  • PostgreSqlSnapshotStoreSpec.SnapshotStore_should_load_a_most_recent_snapshot_matching_an_upper_sequence_number_bound

Used table format leaks to SnapshotOffer

When trying to cast the type from a snapshot payload, the content depends upon the configured column type (JSON or BYTEA).
The offer payload should already be deserialized; instead, you currently have to take care of it inside the PersistentActor's Recover method.

Nothing is persisted

Hi, I'm an Akka.NET noob trying to persist something. I have a console app using the configuration below. I'm not getting any errors, but no tables are created in my database and nothing is persisted. Is there something else required to make this work? Here is the persist call:

public class UserActor : ReceivePersistentActor
{
    public UserActor()
    {
        Command<UserCreateCommand>(message =>
        {
            Id = message.UserId;

            var evt = new UserCreated
            {
                Id = Id,
                Username = message.Username
            };

            Persist(evt, Create);
            Context.System.EventStream.Publish(evt);
            Sender.Tell(Id, Self);
        });
    }
}

My actor system starts up like this:

Config config = ConfigurationFactory.FromResource<Program>("AkkaTest.Console.Akka.conf");
ActorSystem actorSystem = ActorSystem.Create("ControlPanel", config);
akka.persistence{
    journal {
        postgresql {
            # qualified type name of the PostgreSql persistence journal actor
            class = "Akka.Persistence.PostgreSql.Journal.PostgreSqlJournal, Akka.Persistence.PostgreSql"

            # dispatcher used to drive journal actor
            plugin-dispatcher = "akka.actor.default-dispatcher"

            # connection string used for database access
            connection-string = "Server=127.0.0.1;Port=5432;Database=akkatest;User Id=postgres;Password=****;"

            # default SQL commands timeout
            connection-timeout = 30s

            # PostgreSql schema name to table corresponding with persistent journal
            schema-name = public

            # PostgreSql table corresponding with persistent journal
            table-name = event_journal

            # should corresponding journal table be initialized automatically
            auto-initialize = on

            # timestamp provider used for generation of journal entries timestamps
            timestamp-provider = "Akka.Persistence.Sql.Common.Journal.DefaultTimestampProvider, Akka.Persistence.Sql.Common"

            # metadata table
            metadata-table-name = metadata
        }
    }

    snapshot-store {
        postgresql {
            # qualified type name of the PostgreSql persistence journal actor
            class = "Akka.Persistence.PostgreSql.Snapshot.PostgreSqlSnapshotStore, Akka.Persistence.PostgreSql"

            # dispatcher used to drive journal actor
            plugin-dispatcher = "akka.actor.default-dispatcher"

            # connection string used for database access
            connection-string = "Server=127.0.0.1;Port=5432;Database=akkatest;User Id=postgres;Password=****;"

            # default SQL commands timeout
            connection-timeout = 30s

            # PostgreSql schema name to table corresponding with persistent journal
            schema-name = public

            # PostgreSql table corresponding with persistent journal
            table-name = snapshot_store

            # should corresponding journal table be initialized automatically
            auto-initialize = on
        }
    }
}
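
One likely culprit in the config above: neither the journal nor the snapshot-store section sets the `plugin` key, so Akka.Persistence keeps using its default plugin. Per the reference configuration at the top of this README, it should be set explicitly:

```hocon
akka.persistence {
  journal.plugin = "akka.persistence.journal.postgresql"
  snapshot-store.plugin = "akka.persistence.snapshot-store.postgresql"
}
```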
