Coder Social home page Coder Social logo

recipes's Introduction

Commanded

Use Commanded to build your own Elixir applications following the CQRS/ES pattern.

Provides support for:

  • Command registration and dispatch.
  • Hosting and delegation to aggregates.
  • Event handling.
  • Long running process managers.

Commanded provides a solid technical foundation for you to build on. It allows you to focus on modelling your domain, the most important part of your app, creating a better application at a faster pace.

You can use Commanded with one of the following event stores for persistence:

Please refer to the CHANGELOG for features, bug fixes, and any upgrade advice included for each release.

Requires Erlang/OTP v21.0 and Elixir v1.11 or later.


Sponsors

Alembic


MIT License

Build Status Join the chat at https://gitter.im/commanded/Lobby


This README and the following guides follow the master branch which may not be the currently published version.

Read the documentation for the latest published version of Commanded on Hex.

Overview


Used in production?

Yes, see the companies using Commanded.

Example application

Conduit is an open source, example Phoenix 1.3 web application implementing the CQRS/ES pattern in Elixir. It was built to demonstrate the implementation of Commanded in an Elixir application for the Building Conduit book.

Learn Commanded in 20 minutes

Watch Bernardo Amorim introduce CQRS and event sourcing at Code Beam SF 2018. Including a tutorial on how to implement an Elixir application using these concepts with Commanded.

Contributing

Pull requests to contribute new or improved features, and extend documentation are most welcome.

Please follow the existing coding conventions, or refer to the Elixir style guide.

You should include unit tests to cover any changes. Run mix test to execute the test suite.

Contributors

Commanded exists thanks to the following people who have contributed.

Need help?

Please open an issue if you encounter a problem, or need assistance. You can also seek help in the #commanded channel in the official Elixir Slack.

recipes's People

Contributors

slashdotdash avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar

Forkers

capitalist42

recipes's Issues

[Recipe] Event enrichment

How to enrich notification events to event-carried state transfer events.

Event enrichment allows "thin" domain events, but have handlers receive enriched "fat" events.

[Recipe] Transient subscription

How to create a transient subscription to a stream, or all streams, and wait until an expected event is received.

Use Commanded.EventStore.subscribe/2 as shown in the following example.

alias Commanded.EventStore.RecordedEvent

defp receive_events(selector) do
  receive do
    {:events, events} ->
      case reduce(events, selector) do
        :cont -> receive_events(selector)
        {:halt, result} -> result
      end
  end
end

defp reduce([], _selector), do: :cont

defp reduce([%RecordedEvent{data: data} | events], selector) do
  case selector.(data) do
    :cont -> reduce(events, selector)
    {:halt, result} -> {:halt, result}
  end
end

Usage

:ok = Commanded.EventStore.subscribe(MyApp.App, stream_uuid)

receive_events(fn 
  %AnEvent{} = event -> {:halt, event}
  _event -> :cont
end)

Use :all as the stream identity to subscribe to all events.

[Recipe] Command validation

How to validate dispatched commands.

Use a data validation library such as Vex to validate the command (mandatory fields are present, correct data type, etc.).

[Recipe] Process manager testing

How to test a process manager produces the correct commands and state?

Use the process manager's handle/2 and apply/2 functions directly by reducing a list of one or more domain events. The resultant list of commands and process manager state can then be asserted on.

events = [%Event1{}, %Event2{}]
initial_state = %MyProcessManager{}

{commands, state} = 
  Enum.reduce(events, {[], initial_state}, fn event, {commands, state} ->
    new_commands = MyProcessManager.handle(state, event)
    new_state = MyProcessManager.apply(state, event)

    {commands ++ new_commands, new_state}
  end)

assert match?([%Command1{}, %Command2{}], commands)
assert match?(%MyProcessManager{key: value}, state)

This type of test can be run using async: true and will be very quick to execute as it does no IO.

[Recipe] Aggregate testing

How to write a unit test for an aggregate.

Use Commanded.AggregateCase ExUnit case template to simplify aggregate unit testing.

[Recipe] External events

How to capture external events, such as those recorded by an IoT device.

Events can be directly appended to an event stream, using Commanded.EventStore, without needing to be produced by an aggregate from a command. An event handler can be used to react to these external events. It may dispatch a command as a reaction to an event.

  1. Define an integration event struct which can be used for any external event without needing to define a custom struct for each type.

    defmodule MyApp.Integration.Event do
      defstruct [:source, :type, :data]
    end

    The type of the external event will be indicated by the type field. This allow loose coupling which is preferred when dealing with events from a third party, external system.

  2. Append events to a stream (e.g. devices/device-1234) using Commanded.EventStore.append_to_stream/4 and an integration Commanded Application instance.

    source = "devices/device-1234"
    
    events = [
      %Commanded.EventStore.EventData{
        event_type: "Elixir.MyApp.Integration.Event",
        data: %MyApp.Integration.Event{
          source: source,
          type: "ThirdPartyService.SomeExternalEvent"
          data: %{
            "key1" => "value1",
            "key2" => "value2"
          }      
        },
        metadata: %{}
      }
    ]
    
    :ok = Commanded.EventStore.append_to_stream(MyApp.Integration.App, source, :any_version, events)

    As these are external events we are storing in raw format we can use :any_version to skip concurrency control checking.

[Recipe] Event handler testing

How to test event handler side effects.

defmodule CustomerPromotionHandler do
  use Commanded.Event.Handler,
    application: ExampleApp,
    name: __MODULE__

  def handle(%CustomerPromoted{} = event, _metadata) do
    %CustomerPromoted{customer_id: customer_id} = event

    Notification.send_promotion(customer_id)
  end
end

Following the unit testing matrix outlined by Sandi Metz in her talk The Magic Tricks of Testing), to test side affects in an event handler you could use a mock to verify the expected external command is called. You don’t test the actual side effects when unit testing the handler, but you do verify they are called.

unit-testing-matrix

You can either call the event handler’s handle/2 function directly or cause the event to be produced (via command dispatch or appending the event to a stream) and afterwards verify the side-affect was called.

defmodule CustomerPromotionTest do
  use ExUnit.Case

  import Mox

  setup :verify_on_exit!
  
  test "`send_promotion/1` is called by promotion handler" do
    reply_to = self()
  
    NotificationMock.expect(:send_promotion, customer_id ->
      send(reply_to, {:send_promotion, customer_id})
      :ok 
    end)

    # Start handler process and dispatch command to produce trigger event
    start_supervised!(CustomerPromotionHandler)    
    :ok = MyApp.dispatch(command)

    assert_receive {:send_promotion, ^expected_customer_id}    

    # Or call handler directly with an event (doesn’t require the handler process to be started)
    :ok = CustomerPromotionHandler.handle(event, metadata)
  end
end

With Mox you must define a behaviour which can then be mocked or stubbed at runtime.

defmodule NotificationBehaviour do
  @callback send_promotion(non_neg_integer()) :: :ok | {:error, any()}
end

It's often useful to define a facade module for the behaviour which can substitute the mock at runtime or compile time. In this example the implementation module (real or mock) is determined at compile time from application config:

defmodule Notification do
  @behaviour NotificationBehaviour
  @implementation Application.get_env(:myapp, __MODULE__, Notification.DefaultImpl)

  defdelegate send_promotion(customer_id), to: @implementation
end

defmodule Notification.DefaultImpl do
  @behaviour NotificationBehaviour

  def send_promotion(customer_id) do
    # Actual implementation goes here ...
  end
end

The mock must be defined in the test support:

# test/support/mocks.ex
Mox.defmock(NotificationMock, for: NotificationBehaviour)

Then configured for the test environment:

# config/test.exs
config :my_app, Notification, NotificationMock

With the above setup the Notification.DefaultImpl module containing the actual implementation will be used in non-test environments and the mock will be used during tests. This allows the side effect call (send_promotion/1) to be tested, without actually causing the side effect. It also allows you to easily test failures by returning a different response from the mock call (e.g. {:error, failed_to_send}).

To test the event handler and its side effects you could use an integration style test and would replace the side effect module mock with the actual implementation. You could use Mox’s stub_with(NotificationMock, Notification.DefaultImpl) which forwards all calls to the mock to the actual implementation. This ensures the contract is valid between the caller and the target module.

[Recipe] GDPR compliance

How to comply with GDPR regulations which require personally identifiable information (PII) to be removed upon request. This is problematic with immutable, persistent event streams.

There are three possible solutions:

"Crypto-shredding"

PII data is stored encrypted in events and decrypted on read. The encryption key can later be "forgotten" to prevent decryption. Not being able to read the PII data is equivalent to having deleted the data.

External PII data store

Use a mutable data store, such as a SQL database, for all PII data and only store a reference to the external data store within your events. PII data must be read from the external store when events are read. PII data can be deleted from the external store. Attempting to read deleted PII data will return an appropriate error to indicate it has been removed.

Mutable events / streams

Allow events or streams containing PII to be updated or deleted.

Note: EventStore has support for hard deleting streams.

[Recipe] Link events from one stream to another

How to link an existing event from one stream to another.

Use an event handler to link events of a given type into another stream.

The source event_id is accessible via metadata:

def handle(%ProductSold{} = event, metadata) do
  %ProductSold{user_id: user_id} = event

  event_id = Map.get(metadata, :event_id)

  MyApp.EventStore.link_to_stream(user_id, :any_version, [event_id])
end

[Recipe] Running on a cluster

How to deploy and run Commanded apps on a cluster of nodes.

Multi-node deployment may be using distributed Erlang (nodes connected together) or not (nodes not directly connected together). Commanded supports both approaches.

[Recipe] Background jobs

How to enqueue background jobs triggered by an event.

Use an external background job library which supports Ecto Repo transactions, such as Rihanna or Oban. The Ecto.Multi struct provided by Commanded Ecto projections can be used to enqueue the job idempotently.

[Recipe] Event type name provider

How do I use a different event type name provider than the default which uses the Elixir module name?

For example, use account_closed instead of Elixir.MyApp.Account.Events.AccountClosed

[Recipe] Inter-service messaging

Use a separate event store for inter-service messaging.

Allows a context/service to keep its internal domain events private, but can publish enriched events for public consumption by other services.

Related to #1 (Event enrichment).

[Recipe] Long-running streams

How to deal with streams which have thousands of events and/or are long running.

Use the approach taken by accountants where at the end of each financial year the current ledger is closed and the closing balances are taken as the starting balance into the next period. This can be used to limit the duration or length of event streams by closing the old stream and migrating to a new stream with any required state carried over as an initial event in the new stream.

[Recipe] Command enrichment

How to enrich commands before or during dispatch with external data. This data may be looked up from a read model, another service, or an external third party API.

A router middleware defining an enrichment protocol is one possible solution. Here's an example gist with middleware for command enrichment.

Note processing by an aggregate instance during comand execution blocks it from handling any other commands. Whereas router middleware is executed by the dispatching process, before the aggregate instance is called (and is non-blocking to other processes).

[Recipe] Testing read model projectors

How can I test a read model projector so that tests wait until events have been projected?

Using Elixir telemetry and the after_update/3 callback function

Commanded Ecto projections provides an after_update/3 callback function. This gets called after each event is projected. You can use the funtion to publish a notification whenever an event is projected, including the database changes.

Here’s an example using the Elixir/Erlang telemetry library.

In the projector:

def after_update(event, metadata, changes) do
  :telemetry.execute(
    [:projector],
    %{system_time: System.system_time()},
    %{event: event, metadata: metadata, changes: changes, projector: __MODULE__}
  )
end

In a test:

setup do
  reply_to = self()
  :ok =
    :telemetry.attach(
      "test-handler",
      [:projector],
      fn event, measurements, metadata, reply_to ->
        send(reply_to, {:telemetry, event, measurements, metadata})
      end,
      self()
    )
end

Usage in test to wait:

assert_receive {:telemetry, [:projector], _measurements, %{event: event, metadata: metadata, changes: changes}}

This approach will ensure the test blocks until the projector has done its work and then you can run either use the changes from the Ecto.Multi or run a query to verify the database changes. You can also use pattern matching on the event from the telemetry metadata to wait for a particular event type.

The performance impact at runtime is negligible if there are no handlers attached to a telemetry event.

[Recipe] Command handling queue

How to use a queue to handle incoming commands to limit how many commands are concurrently processed or to scale load amongst a cluster of nodes.

Record incoming commands in an event store stream and use one or more handlers to process the requests.

[Recipe] Prevent duplicate commands

How to prevent duplicate commands from being executed by an aggregate.

Use a router middleware to record the identity of each successfully dispatched command and prevent duplicates. Record the causation_id for each event applied to an aggregate and reject commands that have already be seen (caused an event to be recorded).

[Recipe] Event deserialization and decoding

How to deserialize events to their Elixir types.

Commanded uses JSON as the event data and metadata serialization format by default. This means that type information is lost when serializing Elixir structs to JSON and requires one to implement the Commanded.Serialization.JsonDecoder to decode strings to their actual Elixir type, such as DateTime.

One possible option for automatic deserialization would be to provide a typespec for the domain event structs and use the defined types to deserialize events. The Spec library provides structure decoding from a typespec and could be used via a custom event store serializer.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.