Coder Social home page Coder Social logo

apache / pekko-projection Goto Github PK

View Code? Open in Web Editor NEW
15.0 13.0 7.0 3.73 MB

Apache Pekko Projections is intended for building systems with the CQRS pattern, and facilitate in event-based service-to-service communication.

Home Page: https://pekko.apache.org/

License: Apache License 2.0

Java 27.76% Scala 72.23% Shell 0.01%
pekko cqrs envelope projection pekko-projection

pekko-projection's Issues

KafkaSourceProviderImplSpec: The KafkaSourceProviderImpl must successfully verify offsets from assigned partitions

This test appears to be flaky:

--> [org.apache.pekko.projection.kafka.internal.KafkaSourceProviderImplSpec: The KafkaSourceProviderImpl must successfully verify offsets from assigned partitions] Start of log messages of test that [Failed(java.lang.AssertionError: timeout (3 seconds) while expecting 5 messages (got 4))]
[2024-02-07 19:15:36,982] [INFO] [org.apache.pekko.event.slf4j.Slf4jLogger] [] [KafkaSourceProviderImplSpec-pekko.actor.default-dispatcher-5] - Slf4jLogger started MDC: {}
[2024-02-07 19:15:37,249] [DEBUG] [KafkaSourceProviderImplSpec] [] [pool-6-thread-24] - Starting ActorTestKit MDC: {}
[2024-02-07 19:15:37,415] [DEBUG] [KafkaSourceProviderImplSpec] [] [pool-6-thread-24] - Starting ActorTestKit MDC: {}
[2024-02-07 19:15:37,606] [INFO] [org.apache.pekko.actor.testkit.typed.scaladsl.LogCapturing] [] [pool-6-thread-24-ScalaTest-running-KafkaSourceProviderImplSpec] - Logging started for test [org.apache.pekko.projection.kafka.internal.KafkaSourceProviderImplSpec: The KafkaSourceProviderImpl must successfully verify offsets from assigned partitions] MDC: {}
[2024-02-07 19:15:38,102] [WARN] [org.apache.pekko.projection.testkit.internal.TestInternalProjectionState] [] [KafkaSourceProviderImplSpec-pekko.actor.default-dispatcher-5] - Source provider instructed projection to skip offset [MergeableOffset(Map(topic-1 -> 5))] with reason: The offset contains Kafka topic partitions that were revoked or lost in a previous rebalance MDC: {pekkoUid=57598404356998700, sourceThread=KafkaSourceProviderImplSpec-pekko.actor.default-dispatcher-3, sourceActorSystem=KafkaSourceProviderImplSpec, pekkoAddress=pekko://KafkaSourceProviderImplSpec, pekkoSource=TestInternalProjectionState(pekko://KafkaSourceProviderImplSpec), pekkoTimestamp=19:15:38.101UTC}
[2024-02-07 19:15:38,104] [WARN] [org.apache.pekko.projection.testkit.internal.TestInternalProjectionState] [] [KafkaSourceProviderImplSpec-pekko.actor.default-dispatcher-5] - Source provider instructed projection to skip offset [MergeableOffset(Map(topic-1 -> 6))] with reason: The offset contains Kafka topic partitions that were revoked or lost in a previous rebalance MDC: {pekkoUid=57598404356998700, sourceThread=KafkaSourceProviderImplSpec-pekko.actor.default-dispatcher-3, sourceActorSystem=KafkaSourceProviderImplSpec, pekkoAddress=pekko://KafkaSourceProviderImplSpec, pekkoSource=TestInternalProjectionState(pekko://KafkaSourceProviderImplSpec), pekkoTimestamp=19:15:38.103UTC}
[2024-02-07 19:15:38,127] [WARN] [org.apache.pekko.projection.testkit.internal.TestInternalProjectionState] [] [KafkaSourceProviderImplSpec-pekko.actor.default-dispatcher-5] - Source provider instructed projection to skip offset [MergeableOffset(Map(topic-1 -> 7))] with reason: The offset contains Kafka topic partitions that were revoked or lost in a previous rebalance MDC: {pekkoUid=57598404356998700, sourceThread=KafkaSourceProviderImplSpec-pekko.actor.default-dispatcher-3, sourceActorSystem=KafkaSourceProviderImplSpec, pekkoAddress=pekko://KafkaSourceProviderImplSpec, pekkoSource=TestInternalProjectionState(pekko://KafkaSourceProviderImplSpec), pekkoTimestamp=19:15:38.127UTC}
[2024-02-07 19:15:38,128] [WARN] [org.apache.pekko.projection.testkit.internal.TestInternalProjectionState] [] [KafkaSourceProviderImplSpec-pekko.actor.default-dispatcher-5] - Source provider instructed projection to skip offset [MergeableOffset(Map(topic-1 -> 8))] with reason: The offset contains Kafka topic partitions that were revoked or lost in a previous rebalance MDC: {pekkoUid=57598404356998700, sourceThread=KafkaSourceProviderImplSpec-pekko.actor.default-dispatcher-3, sourceActorSystem=KafkaSourceProviderImplSpec, pekkoAddress=pekko://KafkaSourceProviderImplSpec, pekkoSource=TestInternalProjectionState(pekko://KafkaSourceProviderImplSpec), pekkoTimestamp=19:15:38.128UTC}
[2024-02-07 19:15:38,129] [WARN] [org.apache.pekko.projection.testkit.internal.TestInternalProjectionState] [] [KafkaSourceProviderImplSpec-pekko.actor.default-dispatcher-5] - Source provider instructed projection to skip offset [MergeableOffset(Map(topic-1 -> 9))] with reason: The offset contains Kafka topic partitions that were revoked or lost in a previous rebalance MDC: {pekkoUid=57598404356998700, sourceThread=KafkaSourceProviderImplSpec-pekko.actor.default-dispatcher-3, sourceActorSystem=KafkaSourceProviderImplSpec, pekkoAddress=pekko://KafkaSourceProviderImplSpec, pekkoSource=TestInternalProjectionState(pekko://KafkaSourceProviderImplSpec), pekkoTimestamp=19:15:38.129UTC}
[2024-02-07 19:15:38,132] [WARN] [org.apache.pekko.projection.testkit.internal.TestInternalProjectionState] [] [KafkaSourceProviderImplSpec-pekko.actor.default-dispatcher-5] - Source provider instructed projection to skip offset [MergeableOffset(Map(topic-1 -> 10))] with reason: The offset contains Kafka topic partitions that were revoked or lost in a previous rebalance MDC: {pekkoUid=57598404356998700, sourceThread=KafkaSourceProviderImplSpec-pekko.actor.default-dispatcher-3, sourceActorSystem=KafkaSourceProviderImplSpec, pekkoAddress=pekko://KafkaSourceProviderImplSpec, pekkoSource=TestInternalProjectionState(pekko://KafkaSourceProviderImplSpec), pekkoTimestamp=19:15:38.132UTC}
[2024-02-07 19:15:41,131] [INFO] [org.apache.pekko.actor.testkit.typed.scaladsl.LogCapturing] [] [pool-6-thread-24-ScalaTest-running-KafkaSourceProviderImplSpec] - Logging finished for test [org.apache.pekko.projection.kafka.internal.KafkaSourceProviderImplSpec: The KafkaSourceProviderImpl must successfully verify offsets from assigned partitions] that [Failed(java.lang.AssertionError: timeout (3 seconds) while expecting 5 messages (got 4))] MDC: {}
<-- [org.apache.pekko.projection.kafka.internal.KafkaSourceProviderImplSpec: The KafkaSourceProviderImpl must successfully verify offsets from assigned partitions] End of log messages of test that [Failed(java.lang.AssertionError: timeout (3 seconds) while expecting 5 messages (got 4))]
[info] - must successfully verify offsets from assigned partitions *** FAILED *** (3 seconds, 564 milliseconds)
[info]   java.lang.AssertionError: timeout (3 seconds) while expecting 5 messages (got 4)
[info]   at org.apache.pekko.actor.testkit.typed.internal.TestProbeImpl.assertFail(TestProbeImpl.scala:410)
[info]   at org.apache.pekko.actor.testkit.typed.internal.TestProbeImpl.$anonfun$receiveMessages_internal$1(TestProbeImpl.scala:273)
[info]   at org.scalatest.wordspec.AnyWordSpecLike.run(AnyWordSpecLike.scala:1353)
[info]   at org.scalatest.wordspec.AnyWordSpecLike.run$(AnyWordSpecLike.scala:1351)
[info]   at org.apache.pekko.projection.kafka.internal.KafkaSourceProviderImplSpec.run(KafkaSourceProviderImplSpec.scala:56)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info]   at sbt.TestRunner.runTest$1(TestFramework.scala:153)
[info]   at sbt.TestRunner.run(TestFramework.scala:168)
[info]   at sbt.TestFramework$$anon$3$$anonfun$$lessinit$greater$1.$anonfun$apply$1(TestFramework.scala:336)
[info]   at sbt.TestFramework$.sbt$TestFramework$$withContextLoader(TestFramework.scala:296)
[info]   at sbt.TestFramework$$anon$3$$anonfun$$lessinit$greater$1.apply(TestFramework.scala:336)
[info]   at sbt.TestFramework$$anon$3$$anonfun$$lessinit$greater$1.apply(TestFramework.scala:336)
[info]   at sbt.TestFunction.apply(TestFramework.scala:348)
[info]   at sbt.Tests$.$anonfun$toTask$1(Tests.scala:436)
[info]   at sbt.std.Transform$$anon$3.$anonfun$apply$2(Transform.scala:47)
[info]   at sbt.std.Transform$$anon$4.work(Transform.scala:69)
[info]   at sbt.Execute.$anonfun$submit$2(Execute.scala:283)
[info]   at sbt.internal.util.ErrorHandling$.wideConvert(ErrorHandling.scala:24)
[info]   at sbt.Execute.work(Execute.scala:292)
[info]   at sbt.Execute.$anonfun$submit$1(Execute.scala:283)
[info]   at sbt.ConcurrentRestrictions$$anon$4.$anonfun$submitValid$1(ConcurrentRestrictions.scala:265)
[info]   at sbt.CompletionService$$anon$2.call(CompletionService.scala:65)
[info]   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info]   at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
[info]   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[info]   at java.base/java.lang.Thread.run(Thread.java:829)
[info] Run completed in 5 seconds, 708 milliseconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 0, failed 1, canceled 0, ignored 0, pending 0
[info] *** 1 TEST FAILED ***
[error] Failed tests:
[error] 	org.apache.pekko.projection.kafka.internal.KafkaSourceProviderImplSpec

flaky KafkaSourceProviderImplSpec

I notice that the Spec is always failed. Submit a issue to record it.

https://productionresultssa13.blob.core.windows.net/actions-results/3504d61a-8568-45ea-9f2a-b7900e6f5c2b/workflow-job-run-35d3cacb-642d-555e-dd82-30681df97a83/logs/job/job-logs.txt?rsct=text%2Fplain&se=2024-04-07T02%3A40%3A18Z&sig=FLGsW4DeijcSGAnQ3gW2d6%2Bbkw5%2F%2BS6%2Bd9on7K3z%2BnA%3D&sp=r&spr=https&sr=b&st=2024-04-07T02%3A30%3A13Z&sv=2021-12-02

[2024-04-07 00:23:19,483] [INFO] [org.apache.pekko.actor.testkit.typed.scaladsl.LogCapturing] [] [pool-6-thread-51-ScalaTest-running-KafkaSourceProviderImplSpec] - Logging finished for test [org.apache.pekko.projection.kafka.internal.KafkaSourceProviderImplSpec: The KafkaSourceProviderImpl must successfully verify offsets from assigned partitions] that [Failed(java.lang.AssertionError: timeout (3 seconds) while expecting 5 messages (got 4))] MDC: {}
<-- [org.apache.pekko.projection.kafka.internal.KafkaSourceProviderImplSpec: The KafkaSourceProviderImpl must successfully verify offsets from assigned partitions] End of log messages of test that [Failed(java.lang.AssertionError: timeout (3 seconds) while expecting 5 messages (got 4))]
[info] - must successfully verify offsets from assigned partitions *** FAILED *** (3 seconds, 696 milliseconds)
[info]   java.lang.AssertionError: timeout (3 seconds) while expecting 5 messages (got 4)
[info]   at org.apache.pekko.actor.testkit.typed.internal.TestProbeImpl.assertFail(TestProbeImpl.scala:410)
[info]   at org.apache.pekko.actor.testkit.typed.internal.TestProbeImpl.$anonfun$receiveMessages_internal$1(TestProbeImpl.scala:273)
[info]   at org.apache.pekko.actor.testkit.typed.internal.TestProbeImpl.$anonfun$receiveMessages_internal$1$adapted(TestProbeImpl.scala:268)
[info]   at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
[info]   at scala.collection.immutable.Range.foreach(Range.scala:158)
[info]   at scala.collection.TraversableLike.map(TraversableLike.scala:286)
[info]   at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
[info]   at scala.collection.AbstractTraversable.map(Traversable.scala:108)
[info]   at org.apache.pekko.actor.testkit.typed.internal.TestProbeImpl.receiveMessages_internal(TestProbeImpl.scala:268)
[info]   at org.apache.pekko.actor.testkit.typed.internal.TestProbeImpl.receiveMessages(TestProbeImpl.scala:255)
[info]   at org.apache.pekko.projection.kafka.internal.KafkaSourceProviderImplSpec.$anonfun$new$7(KafkaSourceProviderImplSpec.scala:123)
[info]   at org.apache.pekko.projection.kafka.internal.KafkaSourceProviderImplSpec.$anonfun$new$7$adapted(KafkaSourceProviderImplSpec.scala:98)
[info]   at org.apache.pekko.projection.testkit.scaladsl.ProjectionTestKit.runWithTestSink(ProjectionTestKit.scala:136)
[info]   at org.apache.pekko.projection.kafka.internal.KafkaSourceProviderImplSpec.$anonfun$new$2(KafkaSourceProviderImplSpec.scala:98)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.wordspec.AnyWordSpecLike$$anon$3.apply(AnyWordSpecLike.scala:1239)
[info]   at org.apache.pekko.actor.testkit.typed.scaladsl.LogCapturing.withFixture(LogCapturing.scala:79)
[info]   at org.apache.pekko.actor.testkit.typed.scaladsl.LogCapturing.withFixture$(LogCapturing.scala:77)
[info]   at org.apache.pekko.projection.kafka.internal.KafkaSourceProviderImplSpec.withFixture(KafkaSourceProviderImplSpec.scala:56)
[info]   at org.scalatest.wordspec.AnyWordSpecLike.invokeWithFixture$1(AnyWordSpecLike.scala:1237)
[info]   at org.scalatest.wordspec.AnyWordSpecLike.$anonfun$runTest$1(AnyWordSpecLike.scala:1249)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.wordspec.AnyWordSpecLike.runTest(AnyWordSpecLike.scala:1249)
[info]   at org.scalatest.wordspec.AnyWordSpecLike.runTest$(AnyWordSpecLike.scala:1231)
[info]   at org.apache.pekko.projection.kafka.internal.KafkaSourceProviderImplSpec.runTest(KafkaSourceProviderImplSpec.scala:56)
[info]   at org.scalatest.wordspec.AnyWordSpecLike.$anonfun$runTests$1(AnyWordSpecLike.scala:1308)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info]   at scala.collection.immutable.List.foreach(List.scala:431)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:390)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:427)
[info]   at scala.collection.immutable.List.foreach(List.scala:431)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info]   at org.scalatest.wordspec.AnyWordSpecLike.runTests(AnyWordSpecLike.scala:1308)
[info]   at org.scalatest.wordspec.AnyWordSpecLike.runTests$(AnyWordSpecLike.scala:1307)
[info]   at org.apache.pekko.projection.kafka.internal.KafkaSourceProviderImplSpec.runTests(KafkaSourceProviderImplSpec.scala:56)
[info]   at org.scalatest.Suite.run(Suite.scala:1114)
[info]   at org.scalatest.Suite.run$(Suite.scala:1096)
[info]   at org.apache.pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit.org$scalatest$BeforeAndAfterAll$$super$run(ScalaTestWithActorTestKit.scala:41)
[info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info]   at org.apache.pekko.projection.kafka.internal.KafkaSourceProviderImplSpec.org$scalatest$wordspec$AnyWordSpecLike$$super$run(KafkaSourceProviderImplSpec.scala:56)
[info]   at org.scalatest.wordspec.AnyWordSpecLike.$anonfun$run$1(AnyWordSpecLike.scala:1353)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info]   at org.scalatest.wordspec.AnyWordSpecLike.run(AnyWordSpecLike.scala:1353)
[info]   at org.scalatest.wordspec.AnyWordSpecLike.run$(AnyWordSpecLike.scala:1351)
[info]   at org.apache.pekko.projection.kafka.internal.KafkaSourceProviderImplSpec.run(KafkaSourceProviderImplSpec.scala:56)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info]   at sbt.TestRunner.runTest$1(TestFramework.scala:153)
[info]   at sbt.TestRunner.run(TestFramework.scala:168)
[info]   at sbt.TestFramework$$anon$3$$anonfun$$lessinit$greater$1.$anonfun$apply$1(TestFramework.scala:336)
[info]   at sbt.TestFramework$.sbt$TestFramework$$withContextLoader(TestFramework.scala:296)
[info]   at sbt.TestFramework$$anon$3$$anonfun$$lessinit$greater$1.apply(TestFramework.scala:336)
[info] done compiling
[info]   at sbt.TestFramework$$anon$3$$anonfun$$lessinit$greater$1.apply(TestFramework.scala:336)
[info]   at sbt.TestFunction.apply(TestFramework.scala:348)
[info]   at sbt.Tests$.$anonfun$toTask$1(Tests.scala:436)
[info]   at sbt.std.Transform$$anon$3.$anonfun$apply$2(Transform.scala:47)
[info]   at sbt.std.Transform$$anon$4.work(Transform.scala:69)
[info]   at sbt.Execute.$anonfun$submit$2(Execute.scala:283)
[info]   at sbt.internal.util.ErrorHandling$.wideConvert(ErrorHandling.scala:24)
[info]   at sbt.Execute.work(Execute.scala:292)
[info]   at sbt.Execute.$anonfun$submit$1(Execute.scala:283)
[info]   at sbt.ConcurrentRestrictions$$anon$4.$anonfun$submitValid$1(ConcurrentRestrictions.scala:265)
[info]   at sbt.CompletionService$$anon$2.call(CompletionService.scala:65)
[info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info]   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:[511](https://github.com/apache/pekko-projection/actions/runs/8584773850/job/23525420052?pr=134#step:6:512))
[info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info]   at java.lang.Thread.run(Thread.java:750)

pekko-projection-slick scala 3 support?

pekko-projection-slick depends on pekko-persistence-jdbc and Slick

To pick up the Scala 3 support in pekko-persistence-jdbc, we would need pekko-persistence-jdbc 1.1.x (when it is released).

I think our general preference would be not to have to change to using pekko-persistence-jdbc 1.1.x but in this case, we might want to make an exception.

  • In theory, we could only depend on pekko-persistence-jdbc 1.1.x for pekko-projection-slick_3 (i.e the Scala 3 jar and not the Scala 2.x jars)
  • we could just not bother supporting Scala 3 for pekko-projection-slick because this module is deprecated - see https://pekko.apache.org/docs/pekko-projection/current/slick.html - we have pekko-projection-jdbc and this supports Scala 3
  • we could just upgrade to pekko-persistence-jdbc 1.1.x as an exception

Move away from using sbt integration config

See https://github.com/sbt/sbt/releases/tag/v1.9.0-RC3, specifically

Deprecation of IntegrationTest configuration
sbt 1.9.0 deprecates IntegrationTest configuration. (RFC-3 proposes to deprecate general use of configuration axis beyond Compile and Test, and this is the first installment of the change.)

The recommended migration path is to create a subproject named "integration", or "foo-integration" etc.

lazy val integration = (project in file("integration"))
  .dependsOn(core) // your current subproject
  .settings(
    publish / skip := true,
    // test dependencies
    libraryDependencies += something % Test,
  )

ProjectionBehaviorSpec: must work with ProjectionManagement extension

This test appears to be flaky:

<-- [org.apache.pekko.projection.ProjectionBehaviorSpec: A ProjectionBehavior must work with ProjectionManagement extension] End of log messages of test that [Failed(java.lang.AssertionError: Received unexpected message StopObserved)]
[info] ProjectionBehaviorSpec:
[info] A ProjectionBehavior
[info] - must start immediately on demand (734 milliseconds)
[info] - must stop after receiving stop message (103 milliseconds)
[info] - must stop after stopping actor without stop message (28 milliseconds)
[info] - must also stop when stopping underlying stream results in failure (7 milliseconds)
[info] - must provide access to current offset (38 milliseconds)
[info] - must support update of current offset (59 milliseconds)
[info] - must handle offset operations sequentially (309 milliseconds)
[info] - must support pause/resume (11 milliseconds)
[info] - must handle pause/resume operations sequentially (9 milliseconds)
[info] - must work with ProjectionManagement extension *** FAILED *** (1 second, 456 milliseconds)
[info]   java.lang.AssertionError: Received unexpected message StopObserved
[info]   at org.apache.pekko.actor.testkit.typed.internal.TestProbeImpl.assertFail(TestProbeImpl.scala:410)
[info]   at org.apache.pekko.actor.testkit.typed.internal.TestProbeImpl.expectNoMessage_internal(TestProbeImpl.scala:224)
[info]   at org.apache.pekko.actor.testkit.typed.internal.TestProbeImpl.expectNoMessage(TestProbeImpl.scala:218)
[info]   at org.apache.pekko.projection.ProjectionBehaviorSpec.$anonfun$new$15(ProjectionBehaviorSpec.scala:502)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.wordspec.AnyWordSpecLike$$anon$3.apply(AnyWordSpecLike.scala:1239)
[info]   at org.apache.pekko.actor.testkit.typed.scaladsl.LogCapturing.withFixture(LogCapturing.scala:79)
[info]   at org.apache.pekko.actor.testkit.typed.scaladsl.LogCapturing.withFixture$(LogCapturing.scala:77)
[info]   at org.apache.pekko.projection.ProjectionBehaviorSpec.withFixture(ProjectionBehaviorSpec.scala:214)
[info]   at org.scalatest.wordspec.AnyWordSpecLike.invokeWithFixture$1(AnyWordSpecLike.scala:1237)
[info]   at org.scalatest.wordspec.AnyWordSpecLike.$anonfun$runTest$1(AnyWordSpecLike.scala:1249)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.wordspec.AnyWordSpecLike.runTest(AnyWordSpecLike.scala:1249)
[info]   at org.scalatest.wordspec.AnyWordSpecLike.runTest$(AnyWordSpecLike.scala:1231)
[info]   at org.apache.pekko.projection.ProjectionBehaviorSpec.runTest(ProjectionBehaviorSpec.scala:214)
[info]   at org.scalatest.wordspec.AnyWordSpecLike.$anonfun$runTests$1(AnyWordSpecLike.scala:1308)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info]   at scala.collection.immutable.List.foreach(List.scala:431)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:390)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:427)
[info]   at scala.collection.immutable.List.foreach(List.scala:431)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info]   at org.scalatest.wordspec.AnyWordSpecLike.runTests(AnyWordSpecLike.scala:1308)
[info]   at org.scalatest.wordspec.AnyWordSpecLike.runTests$(AnyWordSpecLike.scala:1307)
[info]   at org.apache.pekko.projection.ProjectionBehaviorSpec.runTests(ProjectionBehaviorSpec.scala:214)
[info]   at org.scalatest.Suite.run(Suite.scala:1114)
[info]   at org.scalatest.Suite.run$(Suite.scala:1096)
[info]   at org.apache.pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit.org$scalatest$BeforeAndAfterAll$$super$run(ScalaTestWithActorTestKit.scala:41)
[info]   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info]   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info]   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info]   at org.apache.pekko.projection.ProjectionBehaviorSpec.org$scalatest$wordspec$AnyWordSpecLike$$super$run(ProjectionBehaviorSpec.scala:214)
[info]   at org.scalatest.wordspec.AnyWordSpecLike.$anonfun$run$1(AnyWordSpecLike.scala:1353)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info]   at org.scalatest.wordspec.AnyWordSpecLike.run(AnyWordSpecLike.scala:1353)
[info]   at org.scalatest.wordspec.AnyWordSpecLike.run$(AnyWordSpecLike.scala:1351)
[info]   at org.apache.pekko.projection.ProjectionBehaviorSpec.run(ProjectionBehaviorSpec.scala:214)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info]   at sbt.TestRunner.runTest$1(TestFramework.scala:153)
[info]   at sbt.TestRunner.run(TestFramework.scala:168)
[info]   at sbt.TestFramework$$anon$3$$anonfun$$lessinit$greater$1.$anonfun$apply$1(TestFramework.scala:336)
[info]   at sbt.TestFramework$.sbt$TestFramework$$withContextLoader(TestFramework.scala:296)
[info]   at sbt.TestFramework$$anon$3$$anonfun$$lessinit$greater$1.apply(TestFramework.scala:336)
[info]   at sbt.TestFramework$$anon$3$$anonfun$$lessinit$greater$1.apply(TestFramework.scala:336)
[info]   at sbt.TestFunction.apply(TestFramework.scala:348)
[info]   at sbt.Tests$.$anonfun$toTask$1(Tests.scala:436)
[info]   at sbt.std.Transform$$anon$3.$anonfun$apply$2(Transform.scala:47)
[info]   at sbt.std.Transform$$anon$4.work(Transform.scala:69)
[info]   at sbt.Execute.$anonfun$submit$2(Execute.scala:283)
[info]   at sbt.internal.util.ErrorHandling$.wideConvert(ErrorHandling.scala:24)
[info]   at sbt.Execute.work(Execute.scala:292)
[info]   at sbt.Execute.$anonfun$submit$1(Execute.scala:283)
[info]   at sbt.ConcurrentRestrictions$$anon$4.$anonfun$submitValid$1(ConcurrentRestrictions.scala:[265](https://github.com/apache/incubator-pekko-projection/actions/runs/7831923269/job/21369593583?pr=105#step:6:266))
[info]   at sbt.CompletionService$$anon$2.call(CompletionService.scala:65)
[info]   at java.util.concurrent.FutureTask.run(FutureTask.java:[266](https://github.com/apache/incubator-pekko-projection/actions/runs/7831923269/job/21369593583?pr=105#step:6:267))
[info]   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[info] - failureInsideProjectionPropagatesToTestkit
[info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[info]   at java.lang.Thread.run(Thread.java:750)

Managing Slick DB resources

Discussed in #168

Originally posted by pjfanning June 13, 2024
When you call databaseConfig.db, you are expected to close that db instance.

pekko-projection-slick has never bothered to do this.

It appears that pekko-projection-slick users create their own databaseConfig and it is left to them (at the moment) to know about and implement the databaseConfig.db.close() call. Have a look at SlickProjectionSpec to see how this test spec creates a databaseConfig and then in afterAll it calls databaseConfig.db.close()

If we were to take responsibility for closing the databaseConfig.db instances, there are some problems:

  • there is no lifecycle that we can rely on, we are probably left with adding a shutdown hook or something like that
  • There are a lot of functions in SlickProjection that call createOffsetStore and this createOffsetStore calls databaseConfig.db - potentially create a new db instance that needs managing. If you call databaseConfig.db more than once on the same databaseConfig instance, you should get the same db instance (they are lazily created and reused).

This is my opinion - I think we should not change anything and just document that users need to look after their own databaseConfig lifecycle.

JdbcProjectionTest: atLeastOnceShouldRestartFromPreviousOffset flaky

This test appears to be flaky:

--> [org.apache.pekko.projection.jdbc.JdbcProjectionTest: atLeastOnceShouldRestartFromPreviousOffset] Start of log messages of test that failed with expected:<abc|def|[ghi|]> but was:<abc|def|[]>
[2024-02-07 19:15:35,185] [DEBUG] [JdbcProjectionTest] [] [pool-6-thread-5] - Starting ActorTestKit MDC: {}
[2024-02-07 19:15:35,185] [INFO] [org.apache.pekko.actor.testkit.typed.javadsl.LogCapturing] [] [pool-6-thread-5] - Logging started for test [org.apache.pekko.projection.jdbc.JdbcProjectionTest: atLeastOnceShouldRestartFromPreviousOffset] MDC: {}
<-- [org.apache.pekko.projection.jdbc.JdbcProjectionTest: atLeastOnceShouldRestartFromPreviousOffset] End of log messages of test that failed with expected:<abc|def|[ghi|]> but was:<abc|def|[]>
[info] - atLeastOnceShouldRestartFromPreviousOffset *** FAILED ***
[info]   org.junit.ComparisonFailure: expected:<abc|def|[ghi|]> but was:<abc|def|[]>
[info]   at org.junit.Assert.assertEquals(Assert.java:117)
[info]   at org.junit.Assert.assertEquals(Assert.java:146)
[info]   at org.apache.pekko.projection.jdbc.JdbcProjectionTest.lambda$atLeastOnceShouldRestartFromPreviousOffset$1cd53367$1(JdbcProjectionTest.java:351)
[info]   at org.apache.pekko.projection.testkit.javadsl.ProjectionTestKit.$anonfun$runWithTestSink$1(ProjectionTestKit.scala:108)
[info]   at org.apache.pekko.projection.testkit.javadsl.ProjectionTestKit.$anonfun$runWithTestSink$1$adapted(ProjectionTestKit.scala:108)
[info]   at org.apache.pekko.projection.testkit.scaladsl.ProjectionTestKit.runWithTestSink(ProjectionTestKit.scala:136)
[info]   at org.apache.pekko.projection.testkit.javadsl.ProjectionTestKit.runWithTestSink(ProjectionTestKit.scala:108)
[info]   at org.apache.pekko.projection.jdbc.JdbcProjectionTest.atLeastOnceShouldRestartFromPreviousOffset(JdbcProjectionTest.java:340)
[info]   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[info]   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[info]   at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[info]   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
[info]   at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
[info]   at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
[info]   at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
[info]   at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
[info]   at org.apache.pekko.actor.testkit.typed.javadsl.LogCapturing$$anon$1.evaluate(LogCapturing.scala:58)
[info]   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
[info]   at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
[info]   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
[info]   at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
[info]   at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
[info]   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
[info]   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
[info]   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
[info]   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
[info]   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
[info]   at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
[info]   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
[info]   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
[info]   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
[info]   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
[info]   at org.junit.runners.Suite.runChild(Suite.java:128)
[info]   at org.junit.runners.Suite.runChild(Suite.java:27)
[info]   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
[info]   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
[info]   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
[info]   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
[info]   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
[info]   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
[info]   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
[info]   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
[info]   at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
[info]   at org.junit.runner.JUnitCore.run(JUnitCore.java:105)
[info]   at org.junit.runner.JUnitCore.run(JUnitCore.java:94)
[info]   at org.scalatestplus.junit.JUnitSuiteLike.run(JUnitSuiteLike.scala:241)
[info]   at org.scalatestplus.junit.JUnitSuiteLike.run$(JUnitSuiteLike.scala:229)
[info]   at org.scalatestplus.junit.JUnitSuite.run(JUnitSuite.scala:71)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info]   at sbt.TestRunner.runTest$1(TestFramework.scala:153)
[info]   at sbt.TestRunner.run(TestFramework.scala:168)
[info]   at sbt.TestFramework$$anon$3$$anonfun$$lessinit$greater$1.$anonfun$apply$1(TestFramework.scala:336)
[info]   at sbt.TestFramework$.sbt$TestFramework$$withContextLoader(TestFramework.scala:296)
[info]   at sbt.TestFramework$$anon$3$$anonfun$$lessinit$greater$1.apply(TestFramework.scala:336)
[info]   at sbt.TestFramework$$anon$3$$anonfun$$lessinit$greater$1.apply(TestFramework.scala:336)
[info]   at sbt.TestFunction.apply(TestFramework.scala:348)
[info]   at sbt.Tests$.$anonfun$toTask$1(Tests.scala:436)
[info]   at sbt.std.Transform$$anon$3.$anonfun$apply$2(Transform.scala:47)
[info]   at sbt.std.Transform$$anon$4.work(Transform.scala:69)
[info]   at sbt.Execute.$anonfun$submit$2(Execute.scala:283)
[info]   at sbt.internal.util.ErrorHandling$.wideConvert(ErrorHandling.scala:24)
[info]   at sbt.Execute.work(Execute.scala:292)
[info]   at sbt.Execute.$anonfun$submit$1(Execute.scala:283)
[info]   at sbt.ConcurrentRestrictions$$anon$4.$anonfun$submitValid$1(ConcurrentRestrictions.scala:265)
[info]   at sbt.CompletionService$$anon$2.call(CompletionService.scala:65)
[info]   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info]   at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
[info]   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[info]   at java.base/java.lang.Thread.run(Thread.java:829)

Add github tag protection rules after release candidate

iirc This is one of the pekko modules that actually needs a git tag to function (hence why the v0.0.0 tag already exists). Due to this we cannot add git tag protection rules now because we would be unable to delete the v0.0.0 tag).

Instead after the first release candidate is made and a git tag for that release candidate is pushed, we can then delete the v0.0.0 git tag and then add git tag protection rules.

mssql ci error because of license

 Cause: java.lang.IllegalStateException: The image mcr.microsoft.com/mssql/server:2019-CU8-ubuntu-16.04 requires you to accept a license agreement. Please place a file at the root of the classpath named container-license-acceptance.txt, e.g. at src/test/resources/container-license-acceptance.txt. This file should contain the line:
[info]   mcr.microsoft.com/mssql/server:2019-CU8-ubuntu-16.04
[info]   at org.testcontainers.utility.LicenseAcceptance.assertLicenseAccepted(LicenseAcceptance.java:30)
[info]   at org.testcontainers.containers.MSSQLServerContainer.configure(MSSQLServerContainer.java:71)
[info]   at org.testcontainers.containers.GenericContainer.doStart(GenericContainer.java:316)
[info]   at org.testcontainers.containers.GenericContainer.start(GenericContainer.java:311)
[info]   at org.apache.pekko.projection.slick.SlickContainerOffsetStoreSpec$ContainerJdbcSpecConfig.initContainer(SlickContainerOffsetStoreSpec.scala:57)
[info]   at org.apache.pekko.projection.slick.SlickContainerOffsetStoreSpec$MSSQLServerSpecConfig.<init>(SlickContainerOffsetStoreSpec.scala:111)
[info]   at org.apache.pekko.projection.slick.MSSQLServerSlickOffsetStoreSpec.<init>(SlickContainerOffsetStoreSpec.scala:147)
[info]   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
[info]   at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
[info]   at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)

SlickProjectionSpec: A Slick grouped projection must restart from previous offset - handler throwing an exception, save after 2

https://github.com/apache/pekko-projection/actions/runs/10040359450/job/27746257858

Notably:

org.h2.jdbc.JdbcSQLIntegrityConstraintViolationException: NULL not allowed for column "CONCATENATED"; SQL statement:
merge into "TEST_MODEL" ("ID","CONCATENATED")  values (?,?) [23502-224]
        at org.h2.message.DbException.getJdbcSQLException(DbException.java:520)

and

[info]   check: last seen offset is 2L The Option on which value was invoked was not defined. (SlickProjectionSpec.scala:953)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.OptionValues$OptionValuable.value(OptionValues.scala:114)
[info]   at org.apache.pekko.projection.slick.SlickProjectionSpec.f$proxy17$1$$anonfun$4(SlickProjectionSpec.scala:953)
[info]   at org.scalatest.Assertions.withClue(Assertions.scala:1208)
[info]   at org.scalatest.Assertions.withClue$(Assertions.scala:421)
[info]   at org.apache.pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit.withClue(ScalaTestWithActorTestKit.scala:41)
[info]   at org.apache.pekko.projection.slick.SlickProjectionSpec.f$proxy17$1(SlickProjectionSpec.scala:955)
[info]   at org.apache.pekko.projection.slick.SlickProjectionSpec.$init$$$anonfun$3$$anonfun$5(SlickProjectionSpec.scala:923)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.