jet / propulsion

.NET event stream projection and scheduling platform with CosmosDB, DynamoDB, EventStoreDB, MemoryStore, message-db, Equinox and Kafka integrations

Home Page: https://github.com/jet/dotnet-templates

License: Apache License 2.0

F# 99.60% Shell 0.07% Dockerfile 0.33%
cosmosdb eventstore changefeed cosmosdb-changefeed cosmosdb-changefeedprocessor projections equinox kafka dynamodb dynamodb-streams

propulsion's Introduction


Propulsion provides a granular suite of .NET NuGet packages for building Reactive event processing pipelines. It caters for:

  • Event Sourcing Reactions: Handling projections and reactions based on event feeds from stores such as EventStoreDB and MessageDB, and the Equinox Stores (DynamoStore, CosmosStore, MemoryStore).
  • Unit and Integration testing support: The AwaitCompletion mechanisms in MemoryStore and FeedSource provide a clean way to structure test suites in a manner that achieves high test coverage without flaky tests or slow tests.
  • Generic Ingestion and Publishing pipelines: The same abstractions can also be used for consuming and/or publishing to any target.
  • Serverless event pipelines: The core components do not assume a long-lived process.
    • The DynamoStore-related components implement support for running an end-to-end event sourced system using only Amazon DynamoDB and Lambda without requiring a long-lived host process.
  • Strong metrics support: Feed Sources and Projectors provide comprehensive logging and metrics. (At present, the primary integration is with Prometheus, but the mechanism is exposed in a pluggable manner).

If you're looking for a good discussion forum on these kinds of topics, look no further than the DDD-CQRS-ES Discord's #equinox channel (invite link).

Core Components

  • Propulsion NuGet Implements core functionality in a channel-independent fashion. Depends on MathNet.Numerics

    1. StreamsSink: High performance pipeline that handles parallelized event processing. Ingestion of events and checkpointing of progress are handled asynchronously; each aspect of the pipeline is decoupled so it can be customized as desired. (A simplified handler sketch follows this component list.)
    2. Streams.Prometheus: Helper that exposes per-scheduler metrics for Prometheus scraping.
    3. ParallelProjector: Scaled down variant of StreamsSink that does not preserve stream level ordering semantics
  • Propulsion.Feed NuGet Provides helpers for checkpointed consumption of a feed of stream-based inputs. Provides for custom bindings (e.g. a third-party Feed API) or various other input configurations (e.g. periodically correlating with inputs from a non-streamed source such as a SQL Database). Provides a generic API for checkpoint storage, with diverse implementations hosted in the sibling packages associated with each concrete store (supported stores include DynamoStore, CosmosStore, SQL Server, Postgres). Depends on Propulsion, an IFeedCheckpointStore implementation (e.g., from Propulsion.CosmosStore|DynamoStore|MessageDb|SqlStreamStore)

    1. FeedSource: Handles continual reading and checkpointing of events from a set of feeds ('tranches') of a 'source' that collectively represent a change data capture source for a given system (roughly analogous to how a CosmosDB Container presents a changefeed). A readTranches function is used to identify the Tranches (sub-feeds) on startup. The Feed Source then operates a logical reader thread per Tranche. Tranches represent content as an incrementally retrievable change feed consisting of batches of FsCodec.ITimelineEvent records. Each batch has an optional associated checkpointing callback that's triggered only when the Sink has handled all events within it.
    2. Monitor.AwaitCompletion: Enables efficient waiting for completion of reaction processing within an integration test.
    3. PeriodicSource: Handles regular crawling of an external datasource (such as a SQL database) where there is no way to save progress and then resume from that saved token (based on either the intrinsic properties of the data, or of the store itself). The source is expected to present its content as an IAsyncEnumerable of FsCodec.StreamName * FsCodec.IEventData * context. Checkpointing occurs only when all events have been deemed handled by the Sink.
    4. JsonSource: Simple source that feeds items from a File containing JSON (such a file can be generated via eqx query -o JSONFILE from cosmos etc)
    5. SinglePassFeedSource: Handles single pass loading of large datasets (such as a SQL database), completing when the full data has been ingested.
    6. Prometheus: Exposes reading statistics to Prometheus (including metrics from DynamoStore.DynamoStoreSource, EventStoreDb.EventStoreSource, MessageDb.MessageDbSource and SqlStreamStore.SqlStreamStoreSource). (NOTE all other statistics relating to processing throughput and latency etc are exposed from the Scheduler component on the Sink side)
  • Propulsion.MemoryStore NuGet. Provides bindings to Equinox.MemoryStore. Depends on Equinox.MemoryStore v 4.0.0, FsCodec.Box, Propulsion

    1. MemoryStoreSource: Presents a Source that adapts an Equinox.MemoryStore to feed into a Propulsion.Sink. Typically used as part of an overall test suite to enable efficient and deterministic testing where reactions are relevant to a given scenario.
    2. Monitor.AwaitCompletion: Enables efficient deterministic waits for Reaction processing within integration or unit tests.
    3. ReaderCheckpoint: ephemeral checkpoint storage for Propulsion.DynamoStore/EventStoreDb/Feed/MessageDb/SqlStreamStore in test contexts.
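
To give a feel for the programming model, the sketch below shows the rough shape of a StreamsSink handler: it receives the accumulated span of events for a single stream and reports how far it got so progress can be checkpointed. The types here are simplified stand-ins for illustration, not the actual Propulsion.Sinks signatures.

    // Simplified stand-ins for illustration; the real event/handler types live in the Propulsion packages
    type Event = { Index: int64; EventType: string; Data: byte[] }

    // Handlers for different streams run concurrently; events within a stream arrive in order
    let handle (streamName: string) (events: Event[]) : Async<int> = async {
        for event in events do
            printfn "%s @%d %s (%d bytes)" streamName event.Index event.EventType event.Data.Length
        // report how many events were handled; checkpointing of progress happens asynchronously
        return events.Length }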

Store-specific Components

  • Propulsion.CosmosStore NuGet Provides bindings to Azure CosmosDB. Depends on Equinox.CosmosStore v 4.0.0

    1. CosmosStoreSource: reading from CosmosDb's ChangeFeed using Microsoft.Azure.Cosmos
    2. CosmosStoreSink: writing to Equinox.CosmosStore v 4.0.0.
    3. CosmosStorePruner: pruning from Equinox.CosmosStore v 4.0.0.
    4. ReaderCheckpoint: checkpoint storage for Propulsion.DynamoStore/EventStoreDb/Feed/MessageDb/SqlStreamStore using Equinox.CosmosStore v 4.0.0.

    (Reading and position metrics are exposed via Propulsion.CosmosStore.Prometheus)

  • Propulsion.DynamoStore NuGet Provides bindings to Equinox.DynamoStore. Depends on Equinox.DynamoStore v 4.0.0

    1. AppendsIndex/AppendsEpoch: Equinox.DynamoStore aggregates that together form the DynamoStore Index
    2. DynamoStoreIndexer: writes to AppendsIndex/AppendsEpoch (used by Propulsion.DynamoStore.Indexer, Propulsion.Tool)
    3. DynamoStoreSource: reads from AppendsIndex/AppendsEpoch (see DynamoStoreIndexer)
    4. ReaderCheckpoint: checkpoint storage for Propulsion.DynamoStore/EventStoreDb/Feed/MessageDb/SqlStreamStore using Equinox.DynamoStore v 4.0.0.
    5. Monitor.AwaitCompletion: See Propulsion.Feed

    (Reading and position metrics are exposed via Propulsion.Feed.Prometheus)

  • Propulsion.DynamoStore.Indexer NuGet AWS Lambda to index appends into an Index Table. Depends on Propulsion.DynamoStore, Amazon.Lambda.Core, Amazon.Lambda.DynamoDBEvents, Amazon.Lambda.Serialization.SystemTextJson

    1. Handler: parses DynamoDB Streams Event Source Mapping input, feeds into Propulsion.DynamoStore.DynamoStoreIndexer
    2. Connector: Store / environment variables wiring to connect DynamoStoreIndexer to the Equinox.DynamoStore Index Event Store
    3. Function: AWS Lambda Function that can be fed via a DynamoDB Streams Event Source Mapping; passes to Handler

    (Diagnostics are exposed via Console to CloudWatch)

  • Propulsion.DynamoStore.Notifier NuGet AWS Lambda to report new events indexed by the Indexer to an SNS Topic, in order to enable triggering AWS Lambdas to service Reactions without requiring a long-lived host application. Depends on Amazon.Lambda.Core, Amazon.Lambda.DynamoDBEvents, Amazon.Lambda.Serialization.SystemTextJson, AWSSDK.SimpleNotificationService

    1. Handler: parses DynamoDB Streams Event Source Mapping input, generates a message per updated Partition in the batch
    2. Function: AWS Lambda Function that can be fed via a DynamoDB Streams Event Source Mapping; passes to Handler

    (Diagnostics are exposed via Console to CloudWatch)

  • Propulsion.DynamoStore.Constructs NuGet AWS Lambda CDK deploy logic. Depends on Amazon.CDK.Lib (and, indirectly, on the binary assets included as content in the Propulsion.DynamoStore.Indexer/Propulsion.DynamoStore.Notifier NuGet packages)

    1. DynamoStoreIndexerLambda: CDK wiring for Propulsion.DynamoStore.Indexer
    2. DynamoStoreNotifierLambda: CDK wiring for Propulsion.DynamoStore.Notifier
    3. DynamoStoreReactorLambda: CDK wiring for a Reactor that's triggered based on messages supplied by Propulsion.DynamoStore.Notifier
  • Propulsion.DynamoStore.Lambda NuGet Helpers for implementing Lambda Reactors. Depends on Amazon.Lambda.SQSEvents, Propulsion.Feed

    1. SqsNotificationBatch.parse: parses a batch of notification events (queued by a Notifier) in an Amazon.Lambda.SQSEvents.SQSEvent
    2. SqsNotificationBatch.batchResponseWithFailuresForPositionsNotReached: Correlates the updated checkpoints with the input SQSEvent, generating an SQSBatchResponse that will requeue any notifications that have not yet been serviced.

    (Used by eqxShipping template)

  • Propulsion.EventStoreDb NuGet. Provides bindings to EventStoreDB, writing via Propulsion.EventStore.EventStoreSink. Depends on Equinox.EventStoreDb v 4.0.0

    1. EventStoreSource: reading from an EventStoreDB >= 20.10 $all stream using the gRPC interface into a Propulsion.Sink.
    2. EventStoreSink: writing to Equinox.EventStoreDb v 4.0.0
    3. Monitor.AwaitCompletion: See Propulsion.Feed

    (Reading and position metrics are exposed via Propulsion.Feed.Prometheus)

  • Propulsion.Kafka NuGet Provides bindings for producing and consuming both streamwise and in parallel. Includes a standard codec for use with streamwise projection and consumption, Propulsion.Kafka.Codec.NewtonsoftJson.RenderedSpan. Depends on FsKafka v 1.7.0-1.9.99

  • Propulsion.MessageDb NuGet. Provides bindings to MessageDb, maintaining checkpoints in a Postgres table. Depends on Propulsion.Feed, Npgsql >= 7.0.7 #181 🙏 @nordfjord

    1. MessageDbSource: reading from one or more MessageDB categories into a Propulsion.Sink
    2. CheckpointStore: checkpoint storage for Propulsion.Feed using Npgsql (can be initialized via propulsion initpg -c connstr -s schema)
  • Propulsion.SqlStreamStore NuGet. Provides bindings to SqlStreamStore, maintaining checkpoints in a SQL Server table. Depends on Propulsion.Feed, SqlStreamStore, Dapper v 2.0, Microsoft.Data.SqlClient v 1.1.3

    1. SqlStreamStoreSource: reading from a SqlStreamStore $all stream into a Propulsion.Sink
    2. ReaderCheckpoint: checkpoint storage for Propulsion.EventStoreDb/Feed/SqlStreamStore using Dapper, Microsoft.Data.SqlClient
    3. Monitor.AwaitCompletion: See Propulsion.Feed

    (Reading and position metrics are exposed via Propulsion.Feed.Prometheus)

The ubiquitous Serilog dependency is solely on the core module, not any sinks.

dotnet tool provisioning / projections test tool

  • Propulsion.Tool Tool NuGet: Tool used to initialize a Change Feed Processor aux container for Propulsion.CosmosStore and demonstrate basic projection, including to Kafka. See quickstart.

    • init: CosmosDB: Initialize an -aux Container for use by the CosmosDb client library ChangeFeedProcessor
    • initpg: MessageDb: Initialize a checkpoints table in a Postgres Database
    • index: DynamoStore: validate and/or reindex DynamoStore Index
    • checkpoint: CosmosStore/DynamoStore/EventStoreDb/Feed/MessageDb/SqlStreamStore: adjust checkpoints in DynamoStore/CosmosStore/SQL Server/Postgres
    • project: CosmosDB/DynamoStore/EventStoreDb/MessageDb: walk change feeds/indexes and/or project to Kafka

Deprecated components

Propulsion supports recent versions of Equinox and other Store Clients within reason - these components are intended for use on a short term basis as a way to manage phased updates from older clients to current ones by adjusting package references while retaining source compatibility to the maximum degree possible.

  • Propulsion.CosmosStore3 NuGet Provides bindings to Azure CosmosDB. Depends on Equinox.CosmosStore v 3.0.7, Microsoft.Azure.Cosmos v 3.27.0

    • Deprecated; only intended for use in migration from Propulsion.Cosmos and/or Equinox.Cosmos
    1. CosmosStoreSource: reading from CosmosDb's ChangeFeed using Microsoft.Azure.Cosmos (relies on explicit checkpointing that entered GA in 3.21.0)
    2. CosmosStoreSink: writing to Equinox.CosmosStore v 3.0.7.
    3. CosmosStorePruner: pruning from Equinox.CosmosStore v 3.0.7.
    4. ReaderCheckpoint: checkpoint storage for Propulsion.EventStoreDb/DynamoStore/Feed/SqlStreamStore using Equinox.CosmosStore v 3.0.7.

    (Reading and position metrics are exposed via Propulsion.CosmosStore.Prometheus)

  • Propulsion.EventStore NuGet. Provides bindings to EventStore, writing via Propulsion.EventStore.EventStoreSink. Depends on Equinox.EventStore v 4.0.0

    • Deprecated as reading (and writing) relies on the legacy EventStoreDB TCP interface
    • Contains ultra-high throughput striped reader implementation
    • Presently used by the proSync template

    (Reading and position metrics are emitted to Console / Serilog; no Prometheus support)

Related repos

  • See the Equinox QuickStart for examples of using this library to project to Kafka from Equinox.CosmosStore, Equinox.DynamoStore and/or Equinox.EventStoreDb.

  • See the dotnet new templates repo for examples using the packages herein:

    • Propulsion-specific templates:

      • proProjector template for CosmosStoreSource+StreamsSink logic consuming from a CosmosDb ChangeFeedProcessor.
      • proProjector template (in --kafka mode) for producer logic using StreamsProducerSink or ParallelProducerSink.
      • proConsumer template for example consumer logic using ParallelConsumer and StreamsConsumer etc.
    • Propulsion+Equinox templates:

      • eqxShipping: Event-sourced example with a Process Manager. Includes a Watchdog component that uses a StreamsSink, with example wiring for CosmosStore, DynamoStore and EventStoreDb
      • proIndexer: single-source StreamsSink-based Reactor. More legible version of the proReactor template; currently only supports Propulsion.CosmosStore, and provides some specific extensions such as updating snapshots.
      • proReactor generic template, supporting multiple sources and multiple processing modes
      • summaryConsumer consumes from the output of a proReactor --kafka, saving them in an Equinox.CosmosStore store
      • trackingConsumer consumes from Kafka, feeding into example Ingester logic in an Equinox.CosmosStore store
      • proSync is a fully fledged store <-> store synchronization tool syncing from a CosmosStoreSource or EventStoreSource to a CosmosStoreSink or EventStoreSink
      • feedConsumer,feedSource: illustrating usage of Propulsion.Feed.FeedSource
      • periodicIngester: illustrating usage of Propulsion.Feed.PeriodicSource
      • proArchiver, proPruner: illustrating usage of hot/cold support and support for secondary fallback in Equinox.CosmosStore
  • See the FsKafka repo for BatchedProducer and BatchedConsumer implementations (together with the KafkaConsumerConfig and KafkaProducerConfig used in the Parallel and Streams wrappers in Propulsion.Kafka)

Overview

The Equinox Perspective

Propulsion and Equinox have a Yin and yang relationship; their use cases naturally interlock and overlap.

See the Equinox Documentation's Overview Diagrams for the perspective from the other side (TL;DR the same topology, with elements that are de-emphasized here central over there, and vice versa)

C4 Context diagram

Equinox focuses on the Consistent Processing element of building an event-sourced decision processing system, offering relevant components that interact with a specific Consistent Event Store. Propulsion elements support the building of complementary facilities as part of an overall Application. Conceptually one can group such processing based on high level roles such as:

  • Ingesters: gather/consume data/events from outside the Bounded Context of the System. This role covers aspects such as feeding reference data into Read Models, ingesting changes into a consistent model via Consistent Processing. These services are not acting in reaction to events emanating from the system's Consistent Event Store, as opposed to...
  • Publishers: react to events as they are fed from the Consistent Event Store by filtering, rendering and emitting to feeds for downstream systems. While these services may in some cases rely on synchronous queries via Consistent Processing, they are not themselves transacting or driving follow-on work; which brings us to...
  • Reactors: drive reactive actions triggered based on upstream feeds, or events observed in the Consistent Event Store. These services handle anything beyond the duties of Ingesters or Publishers, and will often drive follow-on processing via Process Managers and/or transacting via Consistent Processing. In some cases, a reactor app's function may be to progressively compose a notification for a Publisher to eventually publish.

The overall territory is laid out here in this C4 System Context Diagram:

Propulsion c4model.com Context Diagram

See Overview section in DOCUMENTATION.md for further drill down

QuickStart

1. Use propulsion tool to run a CosmosDb ChangeFeedProcessor or DynamoStoreSource projector

dotnet tool uninstall Propulsion.Tool -g
dotnet tool install Propulsion.Tool -g --prerelease

propulsion init -ru 400 cosmos # generates a -aux container for the ChangeFeedProcessor to maintain consumer group progress within
# -V for verbose ChangeFeedProcessor logging
# `-g projector1` represents the consumer group - >=1 are allowed, allowing multiple independent projections to run concurrently
# stats specifies one only wants stats regarding items (other options include `kafka` to project to Kafka)
# cosmos specifies source overrides (using defaults in step 1 in this instance)
propulsion -V sync -g projector1 stats from cosmos

# load events with 2 parallel readers, detailed store logging and a read timeout of 20s
propulsion -VS sync -g projector1 stats from dynamo -rt 20 -d 2

2. Use propulsion tool to Run a CosmosDb ChangeFeedProcessor or DynamoStoreSource projector, emitting to a Kafka topic

$env:PROPULSION_KAFKA_BROKER="instance.kafka.mysite.com:9092" # or use -b

# `-V` for verbose logging
# `-g projector3` represents the consumer group; >=1 are allowed, allowing multiple independent projections to run concurrently
# `-l 5` to report ChangeFeed lags every 5 minutes
# `kafka` specifies one wants to emit to Kafka
# `temp-topic` is the topic to emit to
# `cosmos` specifies source overrides (using defaults in step 1 in this instance)
propulsion -V sync -g projector3 -l 5 kafka temp-topic from cosmos

3. Use propulsion tool to inspect DynamoStore Index

Summarize current state of the index being prepared by Propulsion.DynamoStore.Indexer

propulsion index dynamo -t equinox-test

Example output:

19:15:50 I Current Partitions / Active Epochs [[0, 354], [2, 15], [3, 13], [4, 13], [5, 13], [6, 64], [7, 53], [8, 53], [9, 60]]
19:15:50 I Inspect Index Partitions list events 👉 eqx -C dump '$AppendsIndex-0' dynamo -t equinox-test-index
19:15:50 I Inspect Batches in Epoch 2 of Index Partition 0 👉 eqx -C dump '$AppendsEpoch-0_2' -B dynamo -t equinox-test-index

4. Use propulsion tool to validate DynamoStoreSource Index

Validate Propulsion.DynamoStore.Indexer has not missed any events (normally you guarantee this by having alerting on Lambda failures)

propulsion index -p 0 dynamo -t equinox-test

5. Use propulsion tool to reindex and/or add missing notifications

In addition to being able to validate the index (see preceding step), the tool facilitates ingestion of missing events from a complete DynamoDB JSON Export. Steps are as follows:

  1. Enable Point in Time Restores in DynamoDB

  2. Export data to S3, download and extract JSON from .json.gz files

  3. Run ingestion job

    propulsion index -t 0 $HOME/Downloads/DynamoDbS3Export/*.json dynamo -t equinox-test
    

CONTRIBUTING

See CONTRIBUTING.md

TEMPLATES

The best place to start, sample-wise, is with the Equinox QuickStart, which walks you through sample code, tuned for approachability, from dotnet new templates stored in a dedicated repo.

BUILDING

Please note the QuickStart is probably the best way to gain an overview, and the templates are the best way to see how to consume it; these instructions are intended mainly for people looking to make changes.

NB The Propulsion.Kafka.Integration tests are reliant on a TEST_KAFKA_BROKER environment variable pointing to a Broker that has been configured to auto-create ephemeral Kafka Topics as required by the tests (each test run blindly writes to a guid-named topic and trusts the broker will accept the write without any initialization step)

build, including tests

dotnet build build.proj -v n

FAQ

Why do you employ Kafka as an additional layer, when downstream processes could simply subscribe directly and individually to the relevant CosmosDB change feed(s)? Is it to accommodate other messages besides those emitted from events and snapshot updates? 🙏 @Roland Andrag

Well, Kafka is definitely not a critical component or a panacea.

You're correct that the bulk of things that can be achieved using Kafka can be accomplished via usage of the ChangeFeed. One thing to point out is that, in the context of enterprise systems, a well-maintained Kafka cluster has a lower incremental (or total) cost than it would if you were building a smaller system from scratch.

Some negatives of consuming from the ChangeFeed directly:

  • each CFP reader induces RU consumption (it's a set of continuous queries against each and every physical partition of which the Cosmos Container is composed)
  • you can't apply a server-side filter, so you pay to see the full content of any document that's touched
  • there's an elevated risk of implementation shortcuts that couple the reaction logic to low level specifics of the store or the data structures
  • (as you alluded to), if there's some logic or work involved in the production of events you'd emit to Kafka, each consumer would need to duplicate that

Many of these concerns can be alleviated to varying degrees by splitting the storage up into multiple Containers (potentially using database level RU allocations) such that each consumer is intrinsically interested in a large proportion of the data it observes. Even so, the write amplification effect of having multiple consumers will always be more significant when reading directly than when a single reader emits to Kafka; the design of Kafka is specifically geared to running lots of concurrent readers.

However, splitting event categories into Containers solely to optimize these effects can also make the management of the transactional workload more complex; the ideal for any given Container is thus to balance the concerns of:

  • ensuring that datasets for which you want to ringfence availability / RU allocations don't share containers/databases with workloads that are expected to run hot (i.e. to tolerate potentially significant levels of rate limiting while achieving high aggregate throughput by maximizing the percentage of the allocated capacity that's used over time)
  • avoiding prematurely splitting data prior to it being required by the constraints of CosmosDB (i.e. you want to let splitting primarily be driven by reaching the [10GB] physical partition size limit)
  • not having logical partition hotspots that lead to a small subset of physical partitions having significantly above average RU consumption
  • having relatively consistent document sizes
  • economies of scale - if each container (or database if you provision at that level) needs to be individually managed (with a degree of headroom to ensure availability for load spikes etc), you'll tend to require higher aggregate RU assignment for a given overall workload based on a topology that has more containers

Any tips for testing Propulsion (projection) in an integration/end-to-end fashion? ๐Ÿ™ @James Booth

I know for unit testing, I can just test the obvious parts. Or if end to end testing is even required

Depends what you want to achieve. One important technique for doing end-to-end scenarios, especially where some reaction is supposed to feed back into Equinox, is to use Equinox.MemoryStore as the store, and wire the Propulsion Sink (which will be fed from your real store when deployed in a production scenario) to consume from it using Propulsion.MemoryStore.MemoryStoreProjector.

Other techniques I've seen/heard are:

  • rig things to use ephemeral ESDB or Cosmos databases (the CosmosDB emulator works but has restrictions; perhaps you can use serverless or database level RU allocated DBs in a shared environment) to run your system with an isolated throwaway storage with better performance, stability and/or cost properties for test purposes.
  • Once you have a store, the next question is how to validate your projector (Publisher / Reactor) apps. In some cases, people opt to spin up a large subset of the production system (maybe in docker-compose etc) and then check for externally visible effects in tests.
  • While it's important to do end-to-end tests with as much of the whole system as possible, that does tend to make for a messy test suite that quickly becomes unmaintainable. In general, the solution is to triangulate on subsets of the overall reactions as smaller scenarios that achieve the same goal. See the Shipping.Watchdog.Integration test suite in the equinox-shipping template for an example.

In general I'd be looking to use MemoryStoreProjector as a default technique, as it provides:

  • the best performance by far (all synchronous and in-memory, without any simulators)
  • a deterministic wait mechanism; after arranging a particular system state, you can pause until a reaction has been processed by using the projector's AwaitCompletion facility to efficiently wait for the exact moment at which the event has been handled by the reactor component without padded Sleep sequences (or, worse: retry loops).

To answer more completely, I'd say given a scenario involving Propulsion and Equinox, you'll typically have the following ingredients:

  1. writing to the store - you can either assume that's well-tested infra or take the view that you need to validate that you wired it up properly

  2. serialization/deserialization - you can either have unit tests and/or property tests to validate round-tripping as an orthogonal concern, or you can take the view that it's critical to know it really works with real data

  3. reading from the store's change feed and propagating to the handler - that's harder to configure and has the biggest variability in a test scenario, so either:

    • you want to take it out of the equation
    • OR you want to know it's wired properly
  4. validating that triggered reactions are handled and complete cleanly - yes you can and should unit test that, but maybe you want to know it works end-to-end with a much larger proportion of the overall system in play

  5. does it trigger follow-on work, i.e. a cascade of reactions? You can either do triangulation and say it's proven if you observe the trigger for the next bit, or you may want to prove that end to end

  6. does the entire system as a whole really work - sometimes you want to be able to validate workflows rather than having to pay the complexity tax of going in the front door for every aspect (though you'll typically want to have a meaningful set of smoke tests that validate basic system integrity without requiring manual testing or back-door interfaces)

Any reason you didn't use one of the different subscription models available in ESDB? 🙏 @James Booth

TL;DR Differing goals

While the implementation and patterns in Propulsion happen to overlap to a degree with the use cases of the ESDB's subscription mechanisms, the main reason they are not used directly stems from the needs and constraints that Propulsion was evolved to cover.

One thing that should be clear is that Propulsion is definitely not attempting to be the simplest conceivable projection library with a low concept count that's easy to get started with. If you were looking to build such a library, you'd likely give yourself some important guiding non-goals to enable that. For example, if you had to add 3 concepts to get a 50% improvement in throughput, whether that's worth it depends on the context: if you're aiming for a low concept count, you might be prepared to leave some performance on the table.

For Propulsion, almost literally, job one was to be able to shift 1TB of ordered events in streams to/from ESDB/Cosmos/Kafka in well under 24h - a naive implementation reading and writing in small batches takes more like 24d to do the same thing. A key secondary goal was to be able to keep them in sync continually after that point (it's definitely more than a one time bulk ingestion system).

While Propulsion scales down to running simple subscriptions, it's got quite a few additional concepts compared to using something built literally for that exact job; the general case of arbitrary projections was almost literally an afterthought.

That's not to say that Propulsion's concepts make for a more complex system when all is said and done; there are lots of scenarios where you avoid having to do concurrent/async tricks one might otherwise do more explicitly in a more simplistic subscription system.

When looking at the vast majority of typical projections/reactions/denormalizers one runs in an event-sourced system, it should come as no surprise that EventStoreDB's subscription features offer plenty of ways of achieving those common goals with a good balance of:

  • time to implement
  • ease of operation
  • good enough performance

That's literally the company's goal: enabling rapidly building systems to solve business problems, without overfitting to any specific industry or application's needs.

The potential upsides that Propulsion can offer when used as a projection system can definitely be valuable when actually needed, but on average, they can equally be overkill for a given specific requirement.

With that context set, here are some notable aspects of using Propulsion for Projectors rather than building bespoke wiring on a case by case basis:

  • similar API designs and concepts regardless of whether events arrive via CosmosDB, DynamoDB, EventStoreDB, MessageDB, SqlStreamStore, Kafka (or custom sources via Propulsion.Feed)
  • consistent dashboards across all those sources
  • generally excellent performance for high throughput scenarios (it was built for that)
  • good handling for processing of workloads that don't have uniform (and low) cost per handler invocation, i.e., rate-limited writes of events to Equinox.CosmosStore or Equinox.DynamoStore (compared to e.g. using a store such as Redis)
  • orthogonality to Equinox features while still offering a degree of commonality of concepts and terminology
  • provide a degree of isolation from the low level drivers, e.g.
    • moving from the deprecated Cosmos CFP V2 to any future Azure.Cosmos V4 SDK will be a matter of changing package references and fixing some minimal compilation errors, as opposed to learning a whole new API set
    • moving from EventStore's TCP API / EventStore.ClientAPI to the gRPC based >= v20 clients is a simple package switch
    • migrating a workload from EventStoreDB to or from CosmosDB or DynamoDB can be accomplished more cleanly if you're only changing the wiring of your projector host while making no changes to the handler implementations or the bulk of the reactor application
    • SqlStreamStore and MessageDB also fit into the overall picture; using Propulsion can give a cleaner mix-and-match / onramp to/from ESDB (Note however that migrating SSS <-> ESDB is a relatively trivial operation vs migrating from raw EventStore usage to Equinox.CosmosStore or Equinox.DynamoStore, i.e. "we're using Propulsion to isolate us from deciding between SSS or ESDB" may not be a good enough reason on its own)
  • Specifically when consuming from CosmosDB, being able to do that over a longer wire by feeding to Kafka to limit RU consumption from projections is a relatively minor change.

A Brief History of Propulsion's feature set

The order in which the need for various components arose (as a side effect of building out Equinox; solving specific needs in terms of feeding events into and out of EventStoreDB, CosmosDB and Kafka) was also an influence on the abstractions within and general facilities of Propulsion.

  • Propulsion.Cosmos's Source was the first bit done; it's a light wrapper over the CFP V2 client. Key implications from that are:

    • order of events in a logical partition can and should be maintained
    • global ordering of events across all logical streams is not achievable due to how CosmosDB works (the only ordering guarantees are at logical partition level, the data can physically split at any time as data grows)
  • Propulsion.Kafka's Sink was next; the central goal here is to be able to replicate events being read from CosmosDB onto a Kafka Topic maintaining the ordering guarantees. There are two high level ways of achieving ordering guarantees in Kafka:

    1. only ever have a single event in flight; only when you've got the ack for a write do you send the next one. However, literally doing that compromises throughput massively.
    2. use Kafka's transaction facilities (not implemented in Confluent.Kafka at the time)

    => The approach used is to continuously emit messages concurrently in order to maintain throughput, but guarantee never to emit messages for the same key at the same time (a sketch of this per-key serialization follows this list).

  • Propulsion.Cosmos's Sink was next up. It writes to CosmosDB using Equinox.Cosmos. Key implications:

    • because rate-limiting is at the physical partition level, it's crucial for throughput that you keep other partitions busy while wait/retry loops are triggered by hotspots (and you absolutely don't want to exacerbate this effect by competing with yourself)

    • you want to batch the writing of multiple events/documents to minimize round-trips (write RU costs are effectively O(log N), despite high level guidance characterizing it as O(N))

    • you can only touch one logical partition for any given write (CosmosDB does not expose transactions across logical partitions)

    • when you hit a hotspot and need to retry, ideally you'd pack events queued as the last attempt was being executed into the retry attempt

    • there is no one-size-fits-all batch size (yet) that balances

      1. not overloading the source and
      2. maintaining throughput

      You'll often need a small batch size, which implies larger per-event checkpointing overhead unless you make the checkpointing asynchronous

      => The implementation thus:

      • manages reading asynchronously from the writing in order to maintain throughput (you define a batch size and a number of batches to read ahead)
      • schedules write attempts at stream level (the reader concurrently ingests successor events, making all buffered events available when retrying)
      • writes checkpoints asynchronously as and when all the items involved complete within the (stream-level) processing
  • At the point where Propulsion.EventStore's Source and Sink were being implemented (within weeks of the CosmosStore equivalents; largely overlapping), the implications of the goals (providing good throughput while avoiding new concepts wherever possible) were:

    • The cheapest (lowest impact in terms of triggering scattered reads across disks on an ESDB server, with associated latency implications) and most general API set for reading events is to read the $all stream
    • Maintaining checkpoints in an EventStoreDB that you're also monitoring is prone to feedback effects (so using the Async checkpointing strategy used for .CosmosStore but saving them in an external store such as an Equinox.CosmosStore makes sense)
    • If handlers and/or sinks don't have uniform processing time per message and/or are subject to rate limiting, most of the constraints of the CosmosStoreSink apply too; you don't want to sit around retrying the last request out of a batch of 100 while tens of thousands of provisioned RUs are sitting idle in Cosmos with throughput sitting close to zero
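
A stripped-down illustration of that stream/key-level scheduling idea (a toy sketch, not the actual Propulsion scheduler, which layers batching, retries and checkpointing on top): work for different keys proceeds concurrently, while work for any single key is strictly serialized.

    open System.Collections.Concurrent

    // Toy sketch only: one agent per key serializes that key's work; distinct keys run concurrently
    let agents = ConcurrentDictionary<string, MailboxProcessor<Async<unit>>>()

    let enqueue (key: string) (work: Async<unit>) =
        let agent =
            agents.GetOrAdd(key, fun _ ->
                MailboxProcessor.Start(fun inbox -> async {
                    while true do
                        let! job = inbox.Receive()
                        do! job }))
        agent.Post work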

Conclusion/comparison checklist

The things Propulsion in general accomplishes in the projections space:

  • Uniform dashboards for throughput, successes vs failures, and latency distributions over CosmosDB, DynamoDB, EventStoreDB, MessageDb, SqlStreamStore, Kafka (and custom application-specific Feeds via Propulsion.Feed)
  • Source-neutral metrics to support trustworthy alerting and detailed analysis of busy, failing and stuck projections
  • reading, checkpointing, parsing and running handlers are all independent asynchronous activities
  • enables handlers to handle backlog of accumulated items for a stream as a batch if desired
  • maximize concurrency across streams
  • strong intrinsic support for handling idempotent processing in the face of at least once delivery and/or catchup scenarios
  • CosmosStoreSource provides for automatic load balancing over multiple instances of a Reactor application akin to how Kafka Clients manage that (as a light wrapper over the Cosmos SDK's ChangeFeedProcessor lease management system without any custom semantics beyond the proven scheme the SDK implements)
  • provide good instrumentation as to latency, errors, throughput in a pluggable way akin to how Equinox does stuff (e.g. it has built-in Prometheus support)
  • handlers/reactors/projections can be ported from Propulsion.Cosmos to Propulsion.CosmosStore3 and Propulsion.CosmosStore by swapping driver modules; similar to how Equinox.Cosmos vs Equinox.EventStore provides a common programming model despite the underpinnings being fundamentally quite different in nature
  • good stories for isolating from specific drivers - i.e., there's a Propulsion.CosmosStore (for the V3 SDK) with close-to-identical interfaces (Similarly there's a Propulsion.EventStoreDb using the gRPC-based SDKs, replacing the deprecated Propulsion.EventStore)
  • Kafka reading and writing generally fits within the same patterns - i.e. if you want to push CosmosDb CFP output to Kafka and consume over that as a 'longer wire' without placing extra load on the source if you have 50 consumers, you can stand up a ~250 line dotnet new proProjector app, and tweak the ~30 lines of consumer app wireup to connect to Kafka instead of CosmosDB

Things EventStoreDB's subscriptions can do that are not presently covered in Propulsion:

  • $et-, $ec- streams
  • honoring the full $all order
  • and more; EventStoreDB is a well designed purpose-built solution catering for diverse system sizes and industries

What's the deal with the early history of this repo?

This repo is derived from FsKafka; the history has been edited to focus only on edits to the Propulsion libraries.

Your question here

  • Please feel free to log question-issues; they'll get answered here

FURTHER READING

See DOCUMENTATION.md and Equinox's DOCUMENTATION.md

propulsion's People

Contributors

adamralph, avsaditya19, bartelink, benjstephenson, brihadish, cumpsd, dependabot[bot], deviousasti, dharmaturtle, enricosada, epnickcoleman, erichgoldman, jorgef, nordfjord, purkhusid, scrwtp


propulsion's Issues

Provide for detection of Stuck Projectors in StreamsProjector

At present, if a handler continually fails to make progress on a given stream, the Scheduler will continually retry, resulting in:

  • running 'hot'; there are no backoffs and/or anything else to ameliorate the impact of things failing
  • no direct way to determine that such a state has been entered from a programmatic, alerting or dashboards perspective. At present, a number of secondary effects will hint at the problem:
    • lack of progress if observing the read and/or checkpoint positions on the source, e.g. for the CFP or Feed readers
    • increase in exception outcomes on dashboards
    • reduction in successful outcomes on dashboards

In order to be able to define a clear alertable condition, it is proposed to maintain, on a per-stream basis:

  • number of consecutive failures, timestamp of first failure
  • number of consecutive successes without progress, timestamp of first attempt

While the necessary data may be maintained at the stream level, it's problematic to surface these either as:

  • a log record per invocation - Handlers can receive extremely high traffic and adding this overhead as a fixed cost is not likely to work well
  • metrics tagged/instanced at the stream level - this will lead to excessive cardinality as it's unbounded, while likely making querying more complex (though more metrics give more scope for alerting, it does not ease the task of determining which ones are relevant to someone coming to a set of metrics fresh)

Metrics

  • timeFailing: now - oldest failing since (tags: app,category)
  • countFailing: number of streams whose last outcome was a failure (tags: app,category)
  • timeStalled: now - oldest stalled (tags: app,category)
  • countStalled: number of streams whose last handler invocation did not make progress (and has messages waiting) (tags: app,category)

🤔 - longestRunning: oldest dispatched call in flight that has yet to yield a success/fail
🤔 - countRunning: number of handler invocations in flight

Example Alerts

  • max timeFailing > 5m as a reasonable default for the average projector that is writing to a rate-limited store
  • max timeStalled > 2m for a watchdog that's responsible for cancelling and/or pumping workflows that have not reached a conclusion within 1m

🤔 - max timeRunning > 1h for a workflow engine processing step sanity check

Pseudocode Logic

When a success happens:

  • consecutive failures/failing since is cleared
  • (if progress is made) consecutive stalled is cleared

When a failure happens:

  • consecutive failures/failing since is either initialized to (1,now) or incremented

When a success with lack of progress happens:

  • consecutive stalled is either initialized to (1,now) or incremented

When a dispatch or completion of a call happens:

  • update the longest running task start time metric
  • record the latency in the metrics immediately vs waiting to surface it every n ms?
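
A minimal sketch of that per-stream bookkeeping (illustrative types and names only, not an actual Propulsion implementation):

    open System

    // Per-stream health state as described above
    type StreamHealth =
        { ConsecutiveFailures: int
          FailingSince: DateTimeOffset option
          ConsecutiveStalls: int
          StalledSince: DateTimeOffset option }
        static member Initial =
            { ConsecutiveFailures = 0; FailingSince = None
              ConsecutiveStalls = 0; StalledSince = None }

    // Apply the outcome of a handler invocation to the stream's health record
    let recordOutcome (now: DateTimeOffset) succeeded madeProgress (h: StreamHealth) =
        match succeeded, madeProgress with
        | true, true -> StreamHealth.Initial // success with progress clears both counters
        | true, false -> // success without progress: the stream is stalled
            { h with
                ConsecutiveFailures = 0; FailingSince = None
                ConsecutiveStalls = h.ConsecutiveStalls + 1
                StalledSince = h.StalledSince |> Option.orElse (Some now) }
        | false, _ -> // failure: failing counters advance, stall state is left untouched
            { h with
                ConsecutiveFailures = h.ConsecutiveFailures + 1
                FailingSince = h.FailingSince |> Option.orElse (Some now) }

The timeFailing/timeStalled metrics above would then be derived by folding now - FailingSince / now - StalledSince over the tracked streams, tagged by category.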

🤔 there should probably be a set of callbacks the projector provides that can be used to hook in metrics, but we also want the system to log summaries out of the box

Other ideas/questions

  • is being able to inject backoffs based on these metrics for a given specific stream important?
  • how/would one want to be able to internally exit/restart the projector host app based on the values?
  • is there some more important intermittent failure pattern this will be useless for?
  • I'm excluding higher level lag based metrics e.g. stuff https://github.com/linkedin/Burrow does

tagging @ameier38 @belcher-rok @deviousasti @dunnry @enricosada @ragiano215 @swrhim @wantastic84 who have been party to discussions in this space (and may be able to extend or, hopefully, simplify the requirements above, or link to a better writeup or concept regarding this)

Provide guidance for throttling/batching request processing using AsyncBatchingGate

(Prompted by a discussion in DDD-CQRS-ES; dumping here for now as the perfect is the enemy of the good; hopefully I'll get around to expanding on this as either a FAQ entry or a section in DOCUMENTATION.md in due course)

I have an impl of that sort of thing in
https://github.com/jet/equinox/blob/master/src/Equinox.Core/AsyncBatchingGate.fs (there are tests and I think there is an example in one of the PRs in dotnet-templates)
This can work well to smooth the throughput of a set of requests happening within a given process:

  • when a set of work is in flight, wait for it to complete before dispatching the next one
  • the execution piece then runs the queued work (for instance if you accumulated requests for 20 streams, you might run them with a max DOP of 4 if that's what the DB turns out to work best at) while the others wait
  • there's an optional linger period too, but the concept works independent of that
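
As a stripped-down sketch of the concept (purely illustrative; the real implementation is the Equinox.Core AsyncBatchingGate linked above):

    // Illustrative batching gate: while one batch executes, newly arriving requests
    // accumulate and are then dispatched together as the next batch
    type BatchingGate<'req, 'res>(execute: 'req[] -> Async<'res[]>) =
        let agent : MailboxProcessor<'req * AsyncReplyChannel<'res>> =
            MailboxProcessor.Start(fun inbox -> async {
                while true do
                    // take the first request, then drain whatever else has queued up behind it
                    let! first = inbox.Receive()
                    let mutable pending = [ first ]
                    while inbox.CurrentQueueLength > 0 do
                        let! next = inbox.Receive()
                        pending <- next :: pending
                    let batch = List.rev pending
                    // run the caller-supplied handler over the whole batch, then reply to each waiter
                    let! results = execute (batch |> List.map fst |> List.toArray)
                    batch |> List.iteri (fun i (_, reply) -> reply.Reply(results.[i])) })
        /// Submit one request; the Async completes when the batch containing it has been executed
        member _.Execute(request: 'req) : Async<'res> =
            agent.PostAndAsyncReply(fun reply -> request, reply)

Individual request paths then call Execute, while the execute function supplied at construction transacts the whole accumulated set (optionally with a bounded degree of parallelism and/or a linger period, as described above).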

The key thing is you're not adjusting the semantics of your code or putting lots of complexity into your infra

The first obvious thing is to shard by anything you can logically to reduce concurrency in general (i.e. per tenant, user etc) and avoid conflicts between things transacting against it. However, requests within that scope will still need to do an OCC dance (and/or, if you are running SQL, a locks and/or deadlocks dance).
Using the batching gate, you can feed all those requests into a gate per top level group (tenant etc), i.e. a ConcurrentDictionary of gates
Then within the tenant, you get to

  • constrain how many requests you send down the chute concurrently (esp for rate limited things, but this is good mechanical sympathy no matter what)
  • aggregate incoming work/commands. For instance I had a "register item" command; that becomes a "register items" command, and if the set of incoming requests happens to be >1 then multiple share the same fate and/or retry
    (I know you said in the OP that the work in general is not conflicting in this way, but the throttling benefit is generic, and it may help you look at the problem from a different perspective)
    The outcome is slightly higher individual request latency on average, but the deviation/jitter is much less, especially when it matters most: under load

The important thing is that this is a small safe change that you can easily add logging/metrics around
(and the benefit holds regardless of what optimisations and/or shifts of data access tech you make underneath)

To make it concrete, my scenario was:

  • N x Propulsion StreamsProjectors running with 16 concurrent 'threads' consuming from ESDB $all which periodically meet things that need to be forwarded to the next phase
  • they write to Nx [Equinox.]CosmosDB streams
  • when >1 of the 16 hit the same output stream, x-1 of those need to retry (thankfully that's not another roundtrip to read state, but it is another write attempt)
  • it's Cosmos, so rate limiting can also force backoffs and delays at peak load. While you can ameliorate that by ramping up the RUs, the fundamental problem remains:
  • efficiency drops dramatically under load
  • peak wait times for things that take double digit ms under minimal load go out to double digit seconds

The fix was to:

  • keep everything the same, still with individual requests flowing into the code managing the inclusion in the batch
  • add an AsyncBatchingGate
  • add a 200ms linger (note this directly adds to the minimum processing time, which is obviously something to think about; the big benefit is that it reduces the number of command executions, the resulting write roundtrips, and ultimately the number of sets of events written)
  • make the command handler take a set of items to add instead of just 1

Outcome: still runs happily in 400RU (= $20 per month with 2 regions), with wait times < 1s no matter what

Another benefit (kinda specific to Equinox and how it does caching) is that feeding work through a batching gate means that requests for a given stream within a process are serialized - that means each successive transaction against a given stream is guaranteed to see the closing state within your process.
Further, this means you can load state in AllowStale mode (assume the cache is up to date and don't do an etag-checked read to validate the state first, as that is all-but-guaranteed to be a 304) - normally you would not see as much benefit from turning that on

Turns out, in the end, the representation of the outgoing batches is being changed to do more work up front per request (saving a https://verraes.net/2019/05/patterns-for-decoupling-distsys-summary-event/, not just registering the item in the output batch). The same batching gate over the top remains, more work gets done, and it still works in 400RU.
Meanwhile, a colleague spent N months creating a CosmosDB stored procedure to [not] achieve the same effect in a very similar case :smh:

The other observation I'd make is that in some cases, high write concurrency with no end in sight like this may also be the system trying to tell you to take stock and consider splitting the app (strangler application pattern) with an independent DB
(Absolutely not saying this is the case in this instance, but overthinking low level approaches to fixing high level problems is something I've personally done on occasion 😉)

Some extra C# overloads/defaults

Using the library from a C# project has been pretty straightforward, but there are a couple of Func -> FSharpFunc conversions that add a bit of noise. I think the main one is the handle parameter passed to StreamsProjector.Start

I can see the signature is
FSharpFunc<Tuple<string, StreamSpan<byte[]>>, FSharpAsync<Tuple<SpanResult, Outcome>>> handle

so to get the application code to play nice I have to do something like:

    public Func<Tuple<string, StreamSpan<byte[]>>, FSharpAsync<Tuple<SpanResult, Outcome>>> Handle =>
        args =>
            FSharpAsync.AwaitTask(
                Aggregate.Reactions.Parse(args.Item1, args.Item2)
                    .MapAsync<(AggregateId, Seq<AggregateEvent>), (SpanResult, Outcome)>(
                        async r =>
                        {
                           // here I have to deconstruct the r and args tuples, to avoid a lot of `r.Item1` etc. 
                        })
                    .IfNone((SpanResult.AllProcessed, new Outcome.NotApplicable(args.Item2.events.Length)))
                    .Map(r => r.ToTuple())
            );

I find deconstructing the tuples and the tuple return type a bit cumbersome in C#. I don't mind the FSharpAsync.AwaitTask so much, but if there was an overload that accepts something like
Func<string, StreamSpan<byte[]>, Task<(SpanResult, Outcome)>> handle
I think the code would be a lot clearer? Happy to have a go at a PR (and get some F# under my belt) if you think it would be easy enough for a beginner!

Both StreamsProjector.Start and CosmosStoreSource.Start have some optional parameters at the end of their argument list. I can see that they are annotated with [OptionalArgument] from F#, but my build (using dotnet 6) still complains that the argument list is too short if I omit them. Am I missing some interop setting that would respect the annotation?

Using the equinox libs from C# has been much easier than I thought it would be, likely due to how well thought out and documented everything is, so thanks for everyone's hard work on these excellent tools :)

Support EventStore Projection

For EventStore storage specifically, it looks like this library is designed so that EventStore is only used for storage of events, and subsequently communicating that events occurred is offloaded to Kafka. The EventStore product does support messaging via PersistentSubscription and it would be nice for smaller projects to be able to use just EventStore and not have to spin up a Kafka cluster if that kind of scale is not needed.

Specifically I'd love an API where you can subscribe either to a specific aggregate user-1234 or a projection user-* and have the library handle things like checkpointing (storing the last read offset for a client in EventStore itself) similar to how PersistentSubscriptions work in the EventStore API today.

Add a Propulsion.Tool with `initAux` and/or `project` facilities formerly managed by `eqx` tool

Equinox.Tool presently provides an initAux facility which correctly initializes a ChangeFeedProcessor 'aux' collection. The need for such a tool is not Equinox specific, and should instead be considered a generic need associated with the CFP and/or Propulsion.Cosmos and/or Propulsion.Cosmos2 package in general.

The solution is thus to move Propulsion-related aspects of the tool into a Propulsion.Tool. It may also be useful to reinstate the former eqx project facilities (projecting from Cosmos to either generate stats or feed to kafka) here too (see jet/equinox#138)

  • add a tool
  • add initAux init
  • add project ... stats
  • add project ... kafka
  • move docs from Equinox's README.md and DOCUMENTATION.md
  • target Equinox 2.0.0-rc2

Propulsion.EventStore.Reader vs EventStore's subscriptions

EventStore has subscriptions. My belief is that this is achieved without polling.

Propulsion.EventStore.Reader polls.

Imagine there's a (not necessarily Propulsion) Reactor that's updating a read model. My understanding is that it would be better to use EventStore's native subscriptions instead of Propulsion.EventStore.Reader as it would be faster and use less compute. Are there reasons to choose Propulsion.EventStore.Reader other than "stats"?

Note: A comment on EventStoreReader.exec states "can have parallel invocations". However, EventStore's persistent subscriptions allow for competing consumers, which I think is more or less the same thing.

Change Feed Retry semantics

Hello

Is there any documentation, or can you explain how retries are managed via the Cosmos change feed? I'm just trying to get a better understanding of the infrastructure pieces.

thanks!

V3 checklist

  • Cosmos: continues to target Equinox.Cosmos >= 2.6.0, so can not be used side by side with Propulsion.* >=3.0.0. Probably remove in 3.x
  • CosmosStore: target non-preview CFP logic (I believe that will be in Microsoft.Azure.Cosmos 3.21.0 see PR)
  • rename maxDocuments to maxItems for CosmosStore
  • Add AwaitWithStopOnCancellation
  • Kafka: target FsKafka/CK ~1.7 jet/FsKafka#48
  • Kafka0: remove see jet/FsKafka#48
  • remove net461 multitargeting
  • EventStore: update to target Equinox.* 3.0.0. Probably remove in 3.x
  • SqlStreamStore: update to target Equinox.* 3.0.0 (currently does not bind to Equinox)
  • CosmosStore3: target Equinox.CosmosStore 3.x
  • CosmosStore: target Equinox.CosmosStore >= 4
  • EventStoreDb: target Equinox.EventStoreDb >= 4
  • remove dependency on .Destructure.FSharpTypes by replacing Option with Nullable and list with Array
  • Replace all tuples with ValueTuple and other cleanup (see #169)
  • Propulsion: push StreamSpan and/or an equivalent down into FsCodec (or perhaps use tuples for signatures)
  • Propulsion, CosmosStore, Cosmos: separate out .Prometheus packages
  • Propulsion: Consider removing Parallel stuff and/or shifting it out to a specific package as
    a) it has no metrics
    b) it has low usage
    c) while it's arguably an on-ramp to understanding the streams impl, it also increases the concept count
  • Improve serverless support and resource consumption by:
    • replacing combinations of SemaphoreSlim + ConcurrentQueue + Async Write APIs with usage of System.Threading.Channels
    • provide a Lambda-hosted DynamoStore projection loop mode which (subject to a timeout and linger period) reads from the index, runs the projections (and/or cancels them if the timeout has been reached), checkpoints until one of:
      • timed out
      • index tail reached and linger period expired
    • provide an Azure Functions mode where
  • Kafka: Separate Sink from Consumer/Source in the StreamsProjector in a manner equivalent to how it works for other sources, in order that Kafka does not need to be special-cased wrt SourceArgs in the dotnet-templates proReactor template
  • EventStore: Merge the striped reader support into EventStoreDb and remove

Record and log sleep times

The Streams Scheduler Cpu log line emits a coarse record of all time usage.

It should also record and emit the time the Scheduler spends doing Thread.Sleep.

Probably also applicable to the Parallel Scheduler. Deprecating, see #116

NOTE Sleeps may change a bit when it moves to Channels, see #116

feat(Cosmos): Implement Monitor/RunUntilCaughtUp

In order to be able to wait for reactions to complete
In order to be able to stop runs of tools when the associated checkpoint has caught up
As a dev, I want to be able to do a RunUntilCaughtUp and/or Monitor.AwaitCompletion as I can for the other stores


Cosmos doesn't yet implement a Monitor for various reasons:

  1. there's no obvious way to infer from a given batch of change feed docs whether they represent the tail
  2. the Change Feed Lag estimator runs async to the read process
  3. the Change Feed Lag estimator is reliant on the position update having been persisted
  4. there is no active reader process that we control, i.e. the reader/observer callback is not triggered if there are no documents to process

Ideas

  • we calculate an age from the timestamp; perhaps that can be a useful input?
  • perhaps have a way to trigger a) checkpointing b) estimation immediately after a fresh batch has been declared completed?
  • as we are on V3 and there is a pull model, we could implement a bespoke read process that knows when it's finished
  • inspect and/or re-implement some of the estimator logic to not be reliant on being able to retrieve the checkpointed position from the aux container

One consideration is that there are different SLA expectations depending on the use case, e.g.:

  • if we are waiting for a snapshot updater or projections to complete a) the work can be distributed b) it's long-running, and seconds or milliseconds are less relevant
  • for test scenarios, the latency benefit of being able to avoid a roundtrip to the estimator is far more significant

DynamoStore: Indexer should not index any 'system' streams

The DynamoStore Indexer takes care not to index its own streams, should they be presented in the DDB Streams input (by explicitly excluding those categories from the index by name). This is typically a moot matter, as the convention is to use a separate Table for indexes (for many reasons).

ReaderCheckpoint streams are typically configured to only emit one event per hour (a RollingState update that only touches the unfolds will be excluded anyway, because the number of appended events will be 0, per:

elif appendedLen = 0 then noEvents <- noEvents + 1

), and in general they are kept alongside derived data (which may not have an Indexer running, as opposed to primary data, which likely would).

Regardless, it would make sense for the Indexer by default to exclude any events whose stream name StartsWith '$', rather than the present two hardcoded names, in order to avoid this (and to provide a hook for app-level 'private' tables to be excluded via the same convention); a minimal sketch follows the asides below.

ASIDE 1: It may make sense to also provide a feature where you can (optionally) supply a whitelist of Categories to be Indexed. Going further down that road, one might imagine further providing the ability to separate the Index Tables by category. Any such things need to be weighed carefully though; the more options, the more things to think about and document.

ASIDE 2: If one did provide a feature to exclude some categories from indexing, one would need to manually run an S3 export and then use the propulsion index facility to re-index if one ever needed to walk all the events and/or streams for any reason.
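A minimal sketch of how the default rule (plus the optional category whitelist from ASIDE 1) might be expressed; the function name and shape are illustrative, not the Indexer's actual API:

// Illustrative only, not the Indexer's actual API. The default rule skips '$'-prefixed streams
// (the index's own streams, ReaderCheckpoint streams, and any app-level 'private' categories
// adopting the same convention); an optional whitelist can further restrict what gets indexed.
let shouldIndex (categoryWhitelist: Set<string> option) (streamName: string) =
    not (streamName.StartsWith "$")
    && (match categoryWhitelist with
        | None -> true
        // stream names are assumed to follow the FsCodec "{category}-{id}" convention
        | Some categories -> categories.Contains(streamName.Split('-').[0]))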

EventStoreSource: Fix conflict between read ahead limit and stripes count

The StripedIngester read-ahead control semaphore controls the total number of batches that can be in-flight.

This can conflict with the -g 'gorge' parameter; if you have -g 10, the 9 read-ahead threads can use up the allocation and cause the main reader (which those batches are effectively waiting on) to be denied, causing deadlock.

Solutions might be to make the limit independent of the read-ahead (we still need to be able to define a sensible read-ahead limit when tailing) and/or to separate out a "mainline read-ahead limit". The most practical solution would seem to be to move the maxRead to series level and use it to constrain reading ahead per series rather than globally - this effectively makes the limit per stripe, which is correct: e.g. if we're waiting on the completion of a stripe that's blocking progress, we want to constrain reading of the tail regardless of the throughput being attained.
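The per-series variant might be sketched roughly as follows (illustrative only; not the current Propulsion.EventStore implementation):

open System
open System.Collections.Concurrent
open System.Threading

// Illustrative sketch: one read-ahead gate per series/stripe rather than a single global semaphore,
// so a stripe whose completion is being awaited cannot be starved by sibling stripes exhausting
// a shared allocation
type PerSeriesReadAhead(maxReadAheadPerSeries: int) =
    let gates = ConcurrentDictionary<int, SemaphoreSlim>()
    /// Waits when the given series already has maxReadAheadPerSeries batches in flight;
    /// dispose the returned handle when the batch has been completed/checkpointed
    member _.AcquireSlot(seriesId: int) = async {
        let gate = gates.GetOrAdd(seriesId, fun _ -> new SemaphoreSlim(maxReadAheadPerSeries))
        do! gate.WaitAsync() |> Async.AwaitTask
        return { new IDisposable with member _.Dispose() = gate.Release() |> ignore } }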

Add Streamwise Catchup mode

equinox-sync and Propulsion.EventStore originally had a mode in which, when syncing from EventStore to Cosmos, one could:

  1. start projecting from the tail without first doing a full sync operation
  2. request N stream-readers (defaults to 1)
  3. have each event observed on a stream that's not currently in sync trigger a ReadStreamEventsForwardAsync-based catch-up read and projection, in order to enable the event to be written to the target correctly.

Large amounts of the code are in place for this, with TODO markers etc; the work has been deferred as it's not anticipated that we'll use this mode (reading streamwise is extremely resource-intensive on an EventStore server relative to reading the $all stream, and the feature is only relevant where the wall clock time involved in an initial sync would render it prohibitive to do so).

Document consumption modes

When consuming via Kafka, the high level modes include:

  • Serial Messages: Messages are individual events tagged with a key. No parallelism is or can be employed. Notes: some current usage uses this pattern, but it should be deprecated.
  • Batched: Consume, processing batches for a given partition together; checkpoint when a batch is completed. Implemented in: Jet.ConfluentKafka.FSharp.
  • Stream Event Spans: Messages are spans of events from a specified Stream Name (also the Key), with a specified index. Implemented in: Propulsion.Kafka (StreamedConsumer and the proProjector + proConsumer templates). Notes: allows production in parallel without max.in.flight=1; the consumer deduplicates.
  • Stream Summary Events: Messages contain a rendition of a summary as at a particular version. The version is monotonic; the consumer is free to / should only use the most recent one. Implemented in: TODO build a dotnet-templates example and provide default wiring in StreamedConsumer. Notes: the consumer can deduplicate by taking the highest version (probably tracking last-seen per stream). Ref http://verraes.net/2019/05/patterns-for-decoupling-distsys-summary-event/
  • Ordered Summaries: Messages represent state at a point in time; the topic's ordering within a key defines the 'version'. The consumer needs to group by key, dropping all but the newest. Notes: there's no way to de-duplicate double sends. Stream Summary Events are preferred, but if the producer can guarantee ordering when producing, it's a perfectly reasonable representation.

Surely there's a glossary for this sort of thing somewhere? please comment!

Cosmos: Guidance, examples and/or provide pit of success regarding reading your writes in handlers

A CFP consumer based on dotnet new proReactor currently has the following key ingredients:

  • the Propulsion Cosmos Source uses the V2 CFP lib to read from the ChangeFeed (internally using a V2 DocumentClient). Each set of changes received is a set of documents added/updated since the last poll (based on it holding the continuation token and supplying it per poll)
  • these are parsed by the ingester (which in this case is in CFP parlance a Change Feed Observer) into event-messages bearing an Index and passed to a Scheduler
  • the scheduler passes a contiguous span of events for a given stream to a handler function. Because handlers get re-invoked if there are transient exceptions (and reading is async and independent of the processing), it's frequently the case that a handler will be supplied a sequence of buffered events (even outside of catchup scenarios)
  • the handler then gets to decide what to do, and conveys which events in the span have been processed back to the scheduler. The result it emits drives deduplication (the ingester dropping events that have already been confirmed as processed, so they don't even need to be passed to the scheduler) and/or the marking of progress (checkpointing) on the source, via a SpanResult

Reading your writes

A frequent need/desire, in the context of a handler that processes events by in some form going back to the source (e.g. to read the overall state), is to be able to read your writes.

In a typical subscription scenario using ESDB, some ways to guarantee that a read will include the event that prompted the handler invocation are:

  • subscribe (cross-streams, the $all stream) from either Leader or Follower node
  • do all stream reads from the Leader node so it will always be as new or newer than the observed event

While there are equivalent strong consistency models in CosmosDB that could theoretically be utilized to achieve such a topology, those come with severe penalties in the context of a georedundant system and hence in practice are not used. Equinox in particular is designed to function with only the guarantees afforded by Session Consistency mode. The rest of this discussion will not consider any other models.

More details/background/related issues:

Approaches when using Equinox.Cosmos with Propulsion.Cosmos

Session consistency in Cosmos is based on a session token - the session token can be passed from context to context and is used to wait for writes to percolate so you can read your writes [that you performed elsewhere in your notional session]. In the normal case, the CosmosClient tracks the session token coming back from responses (especially writes, but also reads) and makes sure to take the newest every time, so subsequent requests will observe consistent state.
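As background on the raw mechanism, a minimal sketch of manually flowing a session token between two clients, using the Microsoft.Azure.Cosmos V3 API directly rather than any Propulsion/Equinox wiring (the document type and containers are hypothetical):

open System
open Microsoft.Azure.Cosmos

type TodoDocument = { id: string; title: string }

// Hypothetical illustration of the session-token mechanism itself, not Propulsion/Equinox API:
// capture the token from a write performed via one client, then supply it on a read via another,
// so that Session consistency guarantees the read observes that write
let readYourWrite (writerContainer: Container) (readerContainer: Container) (doc: TodoDocument) = task {
    let! written = writerContainer.UpsertItemAsync(doc, Nullable(PartitionKey doc.id))
    let sessionToken = written.Headers.Session
    let options = ItemRequestOptions(SessionToken = sessionToken)
    let! read = readerContainer.ReadItemAsync<TodoDocument>(doc.id, PartitionKey doc.id, options)
    return read.Resource }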

The issue with an out-of-the-box dotnet new proReactor is that the CosmosClient used to receive the query response from the ChangeFeed is not the same one that's being used in the context of the handler to then re-read the state from the origin logical partition. This is how you end up in a situation of not seeing the write the CFP clearly showed to have occurred.

Using the normal building blocks/idioms that the Cosmos SDK and CosmosDB offers to operate with the Session Consistency mode, the valid solutions are one of:

  1. obtain the session token from the CFP read and propagate it to the handler so it can apply it on its CosmosClient (not sure if CFP libs even expose it; for V3 I'm sure they'll take a PR if needed, after the blocker preventing our moving to V3 is resolved)
  2. share the session via sharing the same DocumentClient for both the read and the CFP read (see the cited Equinox issue and PRs)

Workaround: have the handler logic recognize that the prompting event is not reflected in what it reads, and trigger a retry

One approach that one might take is to say "Look, let's solve this without infrastructural support". The Simplest Thing That Could Possibly Work is to:

  1. re-read the aggregate
  2. if it does not see the anticipated state, throw an exception so that the handler does not mark the prompting event as processed (triggering a retry, which will eventually observe the required synced state)

The problem with such an approach is that you can end up in all sorts of scenarios where you have to do one of:

  1. ensure there is only one base state for a given handler and reject loaded/folded state that's clearly incomplete relative to that
  2. (if you don't have exactly one possible valid trigger state that you look for), risk seeing an event that's part of state X+1 but loading state X and then saying "event X+1 is handled"

Specifically wrt problem 2, the issue is that required reactions can potentially end up being skipped. You might say "OK, well then don't do that", but IMO it's often something that a dev is not going to realise while writing it.

Recommended Solutions

There are 2 clean and correct ways to resolve those forces in my book:

  1. use the Consistency Model: use Session Tokens to provide a read your writes guarantee (or, in this instance, "the state of the stream, guaranteed to include the prompting event that's demonstrably been read")

  2. use Propulsion's SpanResult.OverrideWritePosition: Have the handler grab the Index of the stream as it reads and return SpanResult.OverrideWritePosition (res.Index+1); a minimal sketch follows this list

    In other words, if event 5 triggers an action and the processing sees only up to 4, the handler says "I know the next work needs to start from Index 5", as opposed to "all done" (SpanResult.AllProcessed), which means "5 is done, don't call me until you have event 6".

    This also covers the case where handling event 5 sees [and handles] event 6 as part of the same work, yields OverrideWritePosition 7, which causes event 6 to be dropped either
    a) immediately if the CFP reader has read it while the handler was running (frequently what happens) or
    b) when the batch containing that event arrives from the CFP feed

    In some cases, this technique can provide huge benefits when dealing with catchup scenarios, i.e. the minute event 0 is observed, the handler says: "done, we are at 2000 now", and the next 1999 events get dropped without even invoking the handler.

    I'd venture that this is the most generally applicable technique in Propulsion in general; this strategy is employed by all handlers in https://github.com/jet/dotnet-templates
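A hedged sketch of that second approach (the exact handler and SpanResult signatures vary across Propulsion versions, and readAggregate/applyReaction are hypothetical application functions):

// Illustrative sketch only; readAggregate and applyReaction are hypothetical application functions
let handle (stream: FsCodec.StreamName, _span) = async {
    // Re-read the aggregate from the origin store; res.Index is assumed to be the index of the
    // last event folded into the state that was loaded
    let! res = readAggregate stream
    do! applyReaction stream res
    // Declare the position actually observed, rather than unconditionally returning AllProcessed:
    // if the load lagged the prompting event, the scheduler will re-invoke the handler later;
    // if the load saw further events, the already-buffered ones get dropped without a handler call
    return Propulsion.Streams.SpanResult.OverrideWritePosition(res.Index + 1L), () }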

Isolate/fix hang with 10 concurrent consumers on AzDO

#25 was replaced in #29 with a reduction in consumer instances from 10 to 1, in order to reduce the strong likelihood of hangs.
Figuring out the exact nature of the problem is a high priority. We do have some positives, in that these hangs do not replicate in Rider or as a command-line build on macOS.

feat(Scheduler): Abend in RequireAll mode where starved

In RequireAll mode, the Scheduler will sit patiently waiting forever if all the buffered streams in the batches queue are waiting on missing or disordered events.

When the MaxReadAhead is drained, there are no active handlers, and there is nothing to dispatch, the system should abend rather than just sitting there.

Starting DynamoStoreSource with no events does not read

I created a reactor using DynamoStoreSource but if there are no events then it does not start any readers.

    override _.ExecuteAsync(cancellationToken:CancellationToken) = task {
        Log.Information("[Reactor] starting reactor...")
        let checkpointFrequency = System.TimeSpan.FromMinutes 1.
        let consumerGroupName = "Reactor"
        let cache = Equinox.Cache ("Reactor", sizeMb = 10)
        let stats = Stats(Log.Logger, System.TimeSpan.FromMinutes 1., System.TimeSpan.FromMinutes 5.)
        let sink = StreamsProjector.Start(
            log=Log.Logger,
            maxReadAhead=10,
            maxConcurrentStreams=1,
            handle=handle,
            stats=stats,
            statsInterval=System.TimeSpan.FromMinutes 1.)
        let checkpoints =
            Propulsion.Feed.ReaderCheckpoint.DynamoStore.create
                Log.Logger
                (consumerGroupName, checkpointFrequency)
                (dynamodb.IndexContext, cache)
        let source = DynamoStoreSource(
            log=Log.Logger,
            statsInterval = System.TimeSpan.FromMinutes 1.,
            indexClient = dynamodb.IndexContext.StoreClient,
            batchSizeCutoff = 100,
            tailSleepInterval = System.TimeSpan.FromSeconds 5,
            checkpoints = checkpoints,
            sink = sink,
            loadMode = LoadMode.All).Start()
        let work =
            Async.Parallel
                [ source.AwaitShutdown()
                  sink.AwaitShutdown() ]
            |> Async.Ignore<unit array>
        do! Async.StartAsTask(work, cancellationToken=cancellationToken)
    }

Logs:

[05:13:38 INF] EqxDynamo Tip 404 $AppendsIndex-0 2049.8ms 0.5RU
[05:13:38 INF] Starting 0 tranche readers...

If I create at least one event and then restart then it works.

[05:21:34 INF] EqxDynamo Tip 200 $AppendsIndex-0 v1 1979.1ms 0.5RU 1e 1u 197+79b
[05:21:34 INF] Starting 1 tranche readers...

Add comparison docs for FsKafka vs Propulsion.Kafka

Transcript of a FAQ:

hey, is there any guidance on when it makes sense to use propulsion and when it makes sense to use fskafka?

FsKafka does:

a) basic logging of errors, support for periodically emitting lag stats per the broker
b) handling in Batches (BatchedConsumer.Start) - you provide a body that is not allowed to throw and takes care of any desired parallelism
c) handling in Batches, grouped by Key - BatchedConsumer.StartByKey https://github.com/jet/FsKafka/blob/master/src/FsKafka/ConfluentKafka.fs#L484 - it takes care of max parallelism across the groups, but if you throw, the service stops
d) no throughput stats
e) producers

Propulsion does:

a) handlers like b/c above but with

  • more detailed logging: latencies, successes, fails
  • ability to emit outcomes from handlers and compose summary stats from them cleanly
  • keeps a list of tasks and manages retries
  • makes reading completely async from handling (which is not a big win for Kafka but is for other things)

b) BatchedConsumer: ability to hold state of multiple streams for complex stateful consumers
c) mix and match / swap ES vs CFP vs Kafka consumers with same semantics

Propulsion.Kafka

a) depends on FsKafka
b) adapt Confluent.Kafka/FsKafka to ingest into Propulsion (using CK 1.x)

Reasons not to use FsKafka

Things FsKafka currently has gaps on relative to Propulsion/Propulsion.Kafka aside from scheduling capabilities / retry management:

  • logging stats about batches wrt consuming or producing

Reasons to use Propulsion.Kafka

  • One clear case for using Propulsion over FsKafka is when you are doing lots of parallel work that can fail, be rate limited and/or otherwise necessitate retries while maintaining good throughput - e.g. writing to Cosmos
  • providing a place to hook in whitelisting/blacklisting/poison message management in a generic way

Rules of thumb

  • If it can be done with FsKafka, it's a damn good place to start in general - fewer Things. ... But if you take that too far, you're writing loads of untested snowflake infrastructure code against specific versions of client libraries with no easy way to upgrade things when breaking changes to underlying libraries come (vs Propulsion adding a binding that implements the same abstraction over a changed API contract from something like Confluent.Kafka / EventStore.ClientAPI / Microsoft.Azure.Documents etc)

Feature Idea: Equinox MemoryStore Reader

Given ChangeFeed support in Equinox.MemoryStore, there should be a one-liner way to rig StreamsProjector(s) ingesting from the MemoryStore ChangeFeed that invoke Ingestion/Reaction handlers (also wired to in-memory stores), in order to be able to express workflows that rely on the triggering of reactions via changefeeds without needing concrete Cosmos/EventStore/Kafka deps.

The test impls can be split into two levels:
a1) Set up the SUT in-memory

  • wire services to in-memory stores
  • wire Ingesters/Reactors to the Store

a2) Set up SUT wiring to concrete stores

b) Run the scenario

Then mix and match test scenarios to best achieve meaningful test coverage e.g.:

  • On the desktop and for CI on PRs, run a1+b (if Property tests, 100s or 1000s of iterations is fine)
  • In a suitable staging environment, run a2+b (as the base functionality has already been validated by the above, the MaxTests can be dropped by an order of magnitude in order to conserve run time)

See a diagram that pretends this already exists

cc @fnipo - any aspects I missed?
Semi-related: there should be unit tests that one can use alongside this to do baseline validation that Event Contracts are correct, as part of a similar local/CI test suite (jet/FsCodec#50).

Port to Azure.Cosmos / V3 SDK

See also jet/equinox#144

We'll likely want to fork off a Propulsion.Cosmos2 for backcompat to ease interop in codebases where a V3 dependency won't fly

Provide Kafka batch size limit

At present, the Kafka consumer loops, aggressively consuming input, constrained only by:

  • the max Batch Time (default .5s)
  • the max in-flight bytes (.5GiB)

In order to most accurately reflect progress, batches should additionally be limited to a count, balancing the costs of smaller batches:

  • (minimal) memory and perf overhead of small batches etc
  • overhead of setting the offsets more frequently (minimal, as we only commit them periodically and lazily)

against the costs of not reporting progress to Kafka in a reasonable timeframe:

  • when work takes an especially long period, we need to maximize declarations of progress in order to assure observers we are alive
  • work gets redone if we get unassigned and another consumer takes over our partition
  • the coarse granularity of progress means we'll advance in large chunks, which makes inferences as to throughput or item latency harder to rely on

Clean up Handler position signatures

Prompted by @jgardella's nudge, it's clear that the standard handler signature's response type can be clarified a lot

Sinks (and/or Propulsion.Kafka.*Consumer) should have a common response type (perhaps a DU), potentially with some associated affordances that allow common patterns to be expressed with intention-revealing names instead of cryptic math that's prone to off-by-one errors

Approx shape:

/// Conveys the progress the handler made during the processing of the current span of events for a stream, which will be reflected in the new write position being maintained for the stream
type StreamHandlerResult =
   /// Signifies all events in span have been processed, and write position should move to beyond that of the last event
   | AllProcessed
   /// Indicates only a subset of the presented events have been processed
   | PartiallyProcessed of count : int
   /// Override the write position based on version discovered during processing (e.g., if downstream is ahead of current projection position)
   | OverrideWritePosition of index : int64

This will remove code like this from the consumer logic
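For illustration, a hypothetical handler using the proposed shape might then read as follows (tryProcess is a stand-in for application logic):

// Hypothetical usage of the DU sketched above: the handler conveys its progress with an
// intention-revealing case rather than cryptic write-position arithmetic
let handle (stream: FsCodec.StreamName, span: FsCodec.ITimelineEvent<byte[]>[]) = async {
    let! processedCount = tryProcess stream span
    return
        if processedCount = span.Length then StreamHandlerResult.AllProcessed
        else StreamHandlerResult.PartiallyProcessed processedCount }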

Final naming tweaks re Tranche/Partition (DynamoStore Index Tranches should be Partitions)

I picked Tranche as a relatively obscure term so nobody would bring any preconceptions about what a "partition" might be (or might be limited to being). (Back when it was Cosmos and Kafka only, I originally used the term PartitionId and went to the trouble of doing a rename as the Feed features came into being)

But "what is a tranche" seems to be a perpetual question so I may be swung into doing the legwork to rename it (there are probably hundreds of usages of the term spread around the place).

Given Propulsion is about to turn 3.0 (it's rc.2 now), this is the only time it can ever happen.

Recent discussion that prompted this with @jeffijoe https://discord.com/channels/514783899440775168/1002835415239233548/1069633398131413022
and @oskardudycz #200 (comment)

This glossary entry in #200 would go away: https://github.com/jet/propulsion/blob/doc/DOCUMENTATION.md#tranches

  • Though I would have to have a paragraph and/or lots of callouts saying "when we say partition, don't necessarily assume it's an actual partition - it's any slicing of the input, including sharding the space based on a hash of the key"

The term 'shard' was also suggested.

Kafka support wishlist

jet/equinox#87 provides an initial Kafka Producer/Consumer pair, and jet/dotnet-templates#11 provides a skeleton app using same.

There's infinite scope for improvement from this base. This Issue gathers an unordered wishlist and should not necessarily be taken as a roadmap.

Kafka Consumer:

  • Lag monitoring needs to be integrated
  • integrate FeedValidator usage into consumption side
  • Checkpointing needs to interlink with Kafka offsets storage such that one can take a given consumption offset expressed in terms of a changefeed position + kafka offsets and pass that to a consumer working off a separate projector

Fix StripedIngester transition to tailing

When the equinox-sync tool is run in -g mode, the EventStore source doesn't consistently transition from reading N stripes to having a single thread polling (if the 'wrong' reader instance detects EOF, it can leave some tranches orphaned in the StripedIngester)
