nfibrokerage / spear

A sharp EventStoreDB v20+ client backed by Mint :yum:

Home Page: https://hex.pm/packages/spear

License: Apache License 2.0

Elixir 100.00%
grpc mint eventstoredb elixir event-sourcing


Spear


A sharp EventStoreDB 20+ client backed by mint 😋

FAQ

What's EventStoreDB?

EventStoreDB is a database designed for Event Sourcing. Instead of tables with rows and columns, EventStoreDB stores information in immutable events which are appended to streams.

Why the name "spear"?

  1. best gum flavor
  2. obligatory programmer reference to ancient greek, roman, or egyptian history
  3. sounds cool 😎

Backed by... Mint?

elixir-mint/mint is a functional HTTP client which supports HTTP/2.

gRPC is a pretty thin protocol built on top of HTTP/2. Practically speaking, gRPC just adds some well-known headers and a message format that allows messages not to be aligned with HTTP/2 DATA frames. It's relatively trivial to implement gRPC with a nice HTTP/2 library like mint 🙂.
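To make that framing concrete, here is a purely illustrative sketch (GrpcFraming is a hypothetical module, not part of Spear's API): each gRPC message is prefixed with a 1-byte compressed flag and a 4-byte big-endian length, which is exactly what lets messages share or span DATA frames.

```elixir
defmodule GrpcFraming do
  # Frame an (uncompressed) encoded protobuf message for the wire:
  # 1 byte compressed flag (0) + 4-byte big-endian length + payload.
  def frame(message) when is_binary(message) do
    <<0::8, byte_size(message)::unsigned-big-integer-size(32), message::binary>>
  end

  # Pull one message back off the wire, returning it and any remaining
  # bytes — the remainder may belong to the next message, regardless of
  # how the bytes were split across DATA frames.
  def unframe(<<0::8, len::unsigned-big-integer-size(32),
                message::binary-size(len), rest::binary>>) do
    {message, rest}
  end
end
```

For example, `GrpcFraming.unframe(GrpcFraming.frame("hi") <> "tail")` yields `{"hi", "tail"}`.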

Why not elixir-grpc/grpc?

That project looks good, but it depends on :gun, which doesn't play nice with other dependencies [1]. It also provides a server and client implementation in one library. This library only needs a client.

Does TLS work?

Yep! As of v0.1.3, custom and public CAs may be used for encrypted connections.

Does this work with EventStore <20?

Sadly no. This library only provides a gRPC client which showed up in EventStoreDB 20+. If you're looking for a similarly fashioned TCP client, NFIBrokerage uses exponentially/extreme extensively in production (specifically the v1.0.0 branch). Spear and Extreme have compatible dependencies and similar styles of making connections.

How many dependencies are we talking here?

Spear's reliance on Mint and :gpb gives it a somewhat small dependency tree:

$ mix deps.tree --only prod
spear
├── connection ~> 1.0 (Hex package)
├── event_store_db_gpb_protobufs ~> 2.0 (Hex package)
│   └── gpb ~> 4.0 (Hex package)
├── gpb ~> 4.0 (Hex package)
├── jason >= 0.0.0 (Hex package)
└── mint ~> 1.0 (Hex package)

(And jason is optional!)

How close is this to being able to be used?

We @NFIBrokerage already use Spear for some production connections to Event Store Cloud. See the roadmap in #7 for the plans to reach the v1.0.0 release.

Installation

Add :spear to your mix dependencies in mix.exs

def deps do
  [
    {:spear, "~> 1.0"},
    # If you want to encode events as JSON, :jason is a great library for
    # encoding and decoding and works out-of-the-box with spear.
    # Any JSON (de)serializer should work though, so you don't *need* to add
    # :jason to your dependencies.
    {:jason, "~> 1.0"},
    # If you're connecting to an EventStoreDB with a TLS certificate signed
    # by a public Certificate Authority (CA), include :castore
    {:castore, ">= 0.0.0"}
  ]
end

Usage

Making a connection...

Familiar with Ecto.Repo? Ecto lets you declare a database connection as a module

# note this is for illustration purposes and NOT directly related to Spear
# lib/my_app/repo.ex
defmodule MyApp.Repo do
  use Ecto.Repo,
    otp_app: :my_app,
    adapter: Ecto.Adapters.Postgres
end

and then configure it with application-config (config/*.exs)

# note this is for illustration purposes and NOT directly related to Spear
# config/config.exs
config :my_app, MyApp.Repo,
  url: "ecto://postgres:postgres@localhost/my_database"

Spear lets you do the same with a connection to the EventStoreDB:

# lib/my_app/event_store_db_client.ex
defmodule MyApp.EventStoreDbClient do
  use Spear.Client,
    otp_app: :my_app
end

and configure it,

# config/config.exs
config :my_app, MyApp.EventStoreDbClient,
  connection_string: "esdb://localhost:2113"

add it to your application's supervision tree in lib/my_app/application.ex

# lib/my_app/application.ex
defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      MyApp.EventStoreDbClient
    ]
    
    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end
Or connecting in IEx...

A Spear.Connection is just a regular ole' GenServer with a default of pulling configuration from application-config. You can start a Spear.Connection like any other process, even in IEx! Plus you can provide the configuration straight to the Spear.Connection.start_link/1 function.

Let's use the new Mix.install/1 function from Elixir 1.12 to try out Spear. Say that you have an EventStoreDB instance running locally with the --insecure option.

iex> Mix.install([:spear, :jason])
# a bunch of installation text here
:ok
iex> {:ok, conn} = Spear.Connection.start_link(connection_string: "esdb://localhost:2113")
{:ok, #PID<0.1518.0>}

And we're up and running reading and writing events!

Reading and writing streams...

Now that we have a connection process (we'll call it conn), let's read and write some events!

iex> event = Spear.Event.new("IExAndSpear", %{"hello" => "world"})
%Spear.Event{
  body: %{"hello" => "world"},
  id: "9e3a8bcf-0c22-4a38-85c6-2054a0342ec8",
  metadata: %{content_type: "application/json", custom_metadata: ""},
  type: "IExAndSpear"
}
iex> [event] |> Spear.append(conn, "MySpearDemo")
:ok
iex> Spear.stream!(conn, "MySpearDemo")
#Stream<[
  enum: #Function<62.80860365/2 in Stream.unfold/2>,
  funs: [#Function<48.80860365/1 in Stream.map/2>]
]>
iex> Spear.stream!(conn, "MySpearDemo") |> Enum.to_list()
[
  %Spear.Event{
    body: %{"hello" => "world"},
    id: "9e3a8bcf-0c22-4a38-85c6-2054a0342ec8",
    metadata: %{
      commit_position: 18446744073709551615,
      content_type: "application/json",
      created: ~U[2021-04-12 20:05:17.757215Z],
      custom_metadata: "",
      prepare_position: 18446744073709551615,
      stream_name: "MySpearDemo",
      stream_revision: 0
    },
    type: "IExAndSpear"
  }
]

Spear uses Elixir Streams to provide a flexible and efficient interface for EventStoreDB streams.

iex> Stream.repeatedly(fn -> Spear.Event.new("TinyEvent", %{}) end)
#Function<51.80860365/2 in Stream.repeatedly/1>
iex> Stream.repeatedly(fn -> Spear.Event.new("TinyEvent", %{}) end) |> Stream.take(10_000) |> Spear.append(conn, "LongStream")
:ok
iex> Spear.stream!(conn, "LongStream")
#Stream<[
  enum: #Function<62.80860365/2 in Stream.unfold/2>,
  funs: [#Function<48.80860365/1 in Stream.map/2>]
]>
iex> Spear.stream!(conn, "LongStream") |> Enum.count
10000

And that's the basics! Check out the Spear documentation on hex. Interested in writing efficient event-processing pipelines and topologies with EventStoreDB via GenStage and Broadway producers? Check out Volley.

Footnotes

  1. https://github.com/NFIBrokerage/spear/issues/66 ↩

spear's People

Contributors

antondyad, bminevsb, byu, dvic, fabriziosestito, hkrutzer, kianmeng, kristofka, svrdlans, the-mikedavis, yordis


spear's Issues

projections API

connects #7

the projections API is a bit cryptic for me

I've been able to hobble my way through the rest of the protos pretty much just on sheer experimentation and checking against the other client libraries. The projections API is a bit out there, and I'm not really sure what the difference is between a continuous, a transient, and a one-time projection (just to begin with). The Event Store team is supposedly in the middle of a major documentation refactor, so I'm going to put this one on hold and implement the more familiar persistent subscriptions API first.

implement backpressure for stream requests

Currently, Spear.append/4 with an infinite stream fails (expected), but not for the expected reason (a GenServer timeout).

instead it gives

Stream.iterate(0, &(&1 + 1))
|> Stream.map(fn n -> Spear.Event.new("incremented", n) end)
|> Spear.append(conn, "InfiniteCounter", expect: :empty)
# =>
{:error,
 %Mint.HTTPError{
   module: Mint.HTTP2,
   reason: {:exceeds_window_size, :connection, 26}
 }}

Currently the setup for all out-bound requests is like so:

defp request_and_stream_body(conn, request) do
  with {:ok, conn, request_ref} <-
         Mint.HTTP.request(conn, @post, request.path, request.headers, :stream),
       {:ok, conn} <- stream_body(conn, request_ref, request.messages),
       {:ok, conn} <- Mint.HTTP.stream_request_body(conn, request_ref, :eof) do
    {:ok, conn, request_ref}
  else
    {:error, conn, reason} -> {:error, conn, reason}
  end
end

and stream_body/3 is implemented like so:

defp stream_body(conn, request_ref, messages) do
  Enum.reduce_while(messages, {:ok, conn}, fn message, {:ok, conn} ->
    {message, _byte_size} = Request.to_wire_data(message)

    stream_result =
      Mint.HTTP.stream_request_body(
        conn,
        request_ref,
        message
      )

    case stream_result do
      {:ok, conn} -> {:cont, {:ok, conn}}
      error -> {:halt, error}
    end
  end)
end

As it turns out, this is actually very similar to the streaming implementation in finch! (PR that added streaming support: https://github.com/keathley/finch/pull/107/files#diff-48431cc1d91063480b5006d7585c96ea39433e319aca2b5e3a6c597fdbd7e10fR153-R158)

If we add some IO.inspect/2s of the window sizes in that Enum.reduce_while/3

conn |> Mint.HTTP2.get_window_size(:connection) |> IO.inspect(label: "connection window")
conn |> Mint.HTTP2.get_window_size({:request, request_ref}) |> IO.inspect(label: "request window")

we can see the window size for the connection and request gradually decreasing down to (in this case) 26, which is not enough to send the next message.

We can cheaply detect the window sizes as we reduce, but it's not immediately clear how to halt the stream temporarily while we Mint.HTTP.stream/2 and await a WINDOW_UPDATE frame (once we realize that our window is not large enough).
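The window accounting itself is cheap to model with pure data. As a sketch (WindowSketch is a hypothetical module, not proposed API): given the current send window, split a batch of message sizes into what can be streamed now and what must wait for a WINDOW_UPDATE.

```elixir
defmodule WindowSketch do
  # Returns {sendable, deferred, remaining_window}: messages are taken in
  # order until one no longer fits in the window; it and everything after
  # it must wait for the server to grow the window with a WINDOW_UPDATE.
  def split_by_window(message_sizes, window) do
    {send, wait, win} =
      Enum.reduce(message_sizes, {[], [], window}, fn size, {send, wait, win} ->
        if wait == [] and size <= win do
          {[size | send], wait, win - size}
        else
          {send, [size | wait], win}
        end
      end)

    {Enum.reverse(send), Enum.reverse(wait), win}
  end
end
```

With the window at 26 as in the error above, `WindowSketch.split_by_window([10, 10, 20], 26)` returns `{[10, 10], [20], 6}`: two messages fit, the third must wait. The hard part, as noted, is suspending the Enum.reduce_while/3 to Mint.HTTP.stream/2 until the WINDOW_UPDATE arrives.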

`Spear.append` with `raw?: true` does not do what docs say?

Context

The docs for Spear.append state the following

:raw? - (default: false) a boolean which controls whether the return
signature should be a simple :ok | {:error, any()} or
{:ok, AppendResp.t()} | {:error, any()}. This can be used to extract
metadata and information from the append response which is not available
through the simplified return API, such as the stream's revision number
after writing the events.

Issue

With that option set, it still returns :ok

iex(1)> Spear.append([Spear.Event.new("mytest", %{})], conn, "mytest3030300303349494", raw?: true)
:ok
iex(2)> Spear.append([Spear.Event.new("mytest", %{})], conn, "mytest333349494")
:ok

Thus, the feature cannot be used to determine the stream position from the append response, etc.

Tracking it down in the code:

@doc since: "0.1.0"

The stream append implementation has this case expression:

    case request(
           conn,
           Streams,
           :Append,
           messages,
           Keyword.take(opts, [:credentials, :timeout])
         ) do
      {:ok, Streams.append_resp(result: {:success, _})} ->
        :ok

      {:ok, Streams.append_resp(result: {:wrong_expected_version, expectation_violation})} ->
        {:error, Spear.Writing.map_expectation_violation(expectation_violation)}

      error ->
        error
    end
  end

That always masks the response by only returning :ok.
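The masking (and the shape a fix might take) can be shown standalone — RawOptionSketch and wrap_response are hypothetical names for illustration, not Spear's internals:

```elixir
defmodule RawOptionSketch do
  # What the docs promise: return the full response tuple when raw?: true,
  # and the simplified :ok otherwise. The current success branch in Spear
  # effectively ignores opts and always takes the :ok path.
  def wrap_response({:ok, resp}, opts) do
    if Keyword.get(opts, :raw?, false) do
      {:ok, resp}
    else
      :ok
    end
  end

  # Errors pass through unchanged either way.
  def wrap_response(error, _opts), do: error
end
```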

prefer gpb to elixir-protobuf/protobuf

According to hex.pm, the status of protobuf projects is roughly

package                    recent downloads   notes
tomas-abrahamsson/gpb      436_657            primarily targets Erlang fwict
elixir-protobuf/protobuf   223_097            name collision with below
bitwalker/exprotobuf       58_526             name collision with above
ahamez/protox              4_627              requires protoc installed on the compilation machine

Currently spear uses elixir-protobuf/protobuf, but this causes incompatibility with any project that uses bitwalker's implementation, including exponentially/extreme. For use in a service/application, I wouldn't mind just choosing either but since this is a library we need to be as dependency-permissive as possible.

So because of the module conflicts, I consider both elixir-protobuf/protobuf and bitwalker/exprotobuf non-starters.

Similarly I'm not comfortable adding installation instructions for protoc to this library (required for compiling protox definitions). Acceptable in a service/application but this library should bite the bullet of (com/trans)piling .proto files into an elixir/erlang format.

Ideally I'd like to write two separate libraries: a parser/transpiler for proto into elixir/erlang terms and then a protobuf engine which performs the encoding/decoding. That way libraries can use the transpiler to generate some files and then only export a dependency on the engine.

However, I only have so much time and energy, and I realize that with my strong opinions about what is right and wrong library behavior, I am poking the library version of this XKCD bear.

:gpb seems to be the only good option currently. It isn't completely ideal (e.g. it fails to install with Mix.install/1 🤦; I have a PR open in gpb to fix this), but I might need to just pick my poison and move on.

the :from option is actually inclusive

Testing out with Volley, I ran into the fact that the event specified in :from is returned by Spear.read_stream/3.

IIRC the documentation for :from says that it is exclusive, which is not the case.
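A pure-data illustration of the discrepancy (no EventStoreDB needed): for a stream with revisions 0 through 4, reading with from: 2.

```elixir
revisions = [0, 1, 2, 3, 4]

# Inclusive — what Spear.read_stream/3 actually does: revision 2 is returned.
inclusive = Enum.drop_while(revisions, &(&1 < 2))
# => [2, 3, 4]

# Exclusive — what the documentation implies: reading starts after revision 2.
exclusive = Enum.drop_while(revisions, &(&1 <= 2))
# => [3, 4]
```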

Performance issues/expectations

Hi!

We're using Spear in production and are noticing that performance might be less than ideal. We're not sure where exactly the problem resides - Spear, EventStoreDB or the network.

Our flow is as follows:

  1. load events from a stream (using SpearClient.stream!)
  2. build the current state from the events
  3. append new events to the same stream (using SpearClient.append)

Our SpearClient module is literally as simple as this:

defmodule SpearClient do
  use Spear.Client, otp_app: :our_app
end

All this consistently takes approx. 100–120 ms in production, and almost as much locally, which seems rather slow.

  • We have our app deployed on fly.io and the EventStoreDB instance on Event Store Cloud (deployed on Google Cloud in the same geographic region).
  • Locally EventStoreDB runs via docker compose (in OrbStack).

When isolated to just the Spear calls (no logic of our own), we see the following on localhost:

  1. SpearClient.stream!("stream_with_300_events") |> Enum.into([]) - between 40 and 60 ms
  2. SpearClient.stream!("stream_with_20_events") |> Enum.into([]) - between 20 and 40 ms
  3. SpearClient.append(new_events, "stream_with_300_events", expect: last_known_revision) - around 10 ms
  4. SpearClient.append(new_events, "stream_with_20_events", expect: last_known_revision) - <5 ms

In production, append typically takes a bit less than stream! + Enum.into([]), but still hovers around 50 ms in the majority of cases.
As a side note, relatively simple queries to PostgreSQL (also deployed on Google Cloud) only take around 3-5 ms on average.

Would it be possible to confirm that these timings are higher than expected? Anything else we could debug or try to speed it up?

Thanks in advance for any advice!

tests against projected streams

connects #32

the testing suite currently does not test any projected streams (just :all which isn't even a real stream afaik)

Testing projected streams can be tough because they need to be enabled in the EventStoreDB the tests are running against. This is not always a given, and the projections API is not yet implemented (#22). Ideally we would refactor the development workflow for spear to use docker (compose?) and some image for EventStoreDB where we can enable TLS and projections and have the certificates all tucked away. Might be worth checking out what the official clients use in their CI.

implement keepalive

Keepalive in gRPC is based on the HTTP/2 PING mechanism. (Is my capslock broken?)

The EventStoreDB docs have this to say about keepalive: https://developers.eventstore.com/server/v20.10/docs/networking/http.html#keepalive-pings. So it looks like it's configurable on the server-side.

Do we need to worry about it on the client side too? I've also seen from the gRPC clients section that it looks configurable on the client side for most of the officially supported clients.

I can see from my 🦈 with a subscription that the pings do occur and Mint handles them without any explicit direction from the Spear.Connection implementation.

iex> import Spear.Filter
iex> Spear.subscribe(conn, self(), :all, filter: ~f(cred_test)sp)
{:ok, #Reference<0.1465466609.3549954049.59167>}
iex> flush
# a bunch of messages
iex> # wait 20s

gives this pcap:

[screenshot: shark-ping-keepalive]

So it's being handled correctly. I guess the idea with clients also implementing keepalive is that you can have a fickle client that's overly afraid of a stream going stale set a very nervous keepalive separate from what the server sets. (And ofc it can be rough to restart an EventStoreDB with a bunch of subscriptions active.) And a client is subject to not realizing that the server has disappeared unless it also sends keepalive pings (or at least denotes when the server sends its pings).

next EventStoreDB release will introduce a breaking behavior change for reads

This commit will introduce some new behavior: reading a stream (Spear.stream!/3 or Spear.subscribe/4) will emit
a new ReadResp content-type when finished reading the chunk (in a normal read) and when at the end of the stream

{"event_store.client.streams.ReadResp": {:stream_position,
   {:"event_store.client.streams.ReadResp.StreamPosition", 5, 5}}}

While the change to the protobuf definition is not itself a breaking change because of the way protobuf encoding works, clients using that new EventStoreDB version will be unable to use any existing release of spear as that message will fail to decode with a FunctionClauseError.

NFIBrokerage/event_store_db_gpb_protobufs will need an update as usual and the next release of spear will need to require the new version.

The following will need to be refactored:

  • Spear.Event.destructure_read_response/2
  • some functions in Spear.Reading to accept these stream position messages as input for building a ReadReq
  • Spear.Reading.Stream to filter stream position ReadResps from chunks
  • Spear.stream!/3 and Spear.subscribe/4 should add options to ignore these messages (default to true so spear doesn't need a major version bump)

As well as the creation of at least one new data structure for these stream position packets.

Appending expecting revision 0 does not return an error

I am trying to integrate spear with commanded by strangling the Extreme library from the extreme_adapter.

Appending works, but I realized a test was not passing because expect: 0 was being ignored.
The test tries to append 3 events and then asserts that appending with an expected revision of 0 returns an error.
The other assertions, with revisions >= 1, hold true.

Here is the test for reference:

      test "should fail to append to a stream because of wrong expected version", %{
        event_store: event_store,
        event_store_meta: event_store_meta
      } do
        assert :ok == event_store.append_to_stream(event_store_meta, "stream", 0, build_events(3))

        assert {:error, :wrong_expected_version} ==
                 event_store.append_to_stream(event_store_meta, "stream", 0, build_events(1))

        assert {:error, :wrong_expected_version} ==
                 event_store.append_to_stream(event_store_meta, "stream", 1, build_events(1))

        assert {:error, :wrong_expected_version} ==
                 event_store.append_to_stream(event_store_meta, "stream", 2, build_events(1))

        assert :ok == event_store.append_to_stream(event_store_meta, "stream", 3, build_events(1))
      end

where:

  defp expected_version(:any_version), do: :any
  defp expected_version(:no_stream), do: :empty
  defp expected_version(:stream_exists), do: :exists
  defp expected_version(0), do: :empty

This line seems to be the culprit:

defp map_expectation(revision) when is_integer(revision) and revision >= 1,

Removing the guard seems to do the trick.
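To make the failure mode concrete, here is a standalone sketch (GuardSketch is a hypothetical reconstruction for illustration, not Spear's actual code; the return shapes are assumed):

```elixir
defmodule GuardSketch do
  # With the original `revision >= 1` guard, revision 0 never matched this
  # clause and the expectation was silently dropped; relaxing the guard to
  # `>= 0` treats 0 as a real revision like any other.
  def map_expectation(revision) when is_integer(revision) and revision >= 0,
    do: {:revision, revision}

  def map_expectation(:empty), do: :no_stream
  def map_expectation(:any), do: :any
end
```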

Is there any reason why the revision needs to be >= 1 while mapping the expectations?

Thanks in advance!

Reading :all stream backwards doesn't return events

Hey,

In the first place - thank for the fantastic library. It's a joy to use it.

Today I faced an issue when tried to read $all stream backwards:

iex(35)> SpearClient.stream!(:all, direction: :backwards) |> Enum.to_list() |> length()
0
iex(36)> SpearClient.stream!(:all, direction: :forwards) |> Enum.to_list() |> length()
57

Could you please suggest what's wrong?

Best,
Yevhenii

Persistent subscriptions and clustered eventstore.

Hello!

I am trying to use spear create/connect to persistent subscriptions in a clustered eventstoredb setup.

If the Spear.Connection is connected to a non-leader node an error like this is returned, which contains the info needed to re-connect to the leader node:

%Spear.Connection.Response{
  data: "",
  headers: [
    {"content-type", "application/grpc"},
    {"date", "Thu, 02 Mar 2023 08:08:04 GMT"},
    {"server", "Kestrel"},
    {"content-length", "0"},
    {"exception", "not-leader"},
    {"leader-endpoint-host", "127.0.0.1"},
    {"leader-endpoint-port", "2111"},
    {"grpc-status", "5"},
    {"grpc-message", "Leader info available"}
  ],
  status: 200,
  type: {:event_store_db_gpb_protobufs_persistent,
   :"event_store.client.persistent_subscriptions.CreateResp"}
}

The call to Spear.create_persistent_subscription returns an error like this, which was very confusing to me at first:

%Spear.Grpc.Response{
  data: "",
  message: "Leader info available",
  status: :not_found,
  status_code: 5
}

I was able to reproduce the issue by switching the docker setup in the spear repo to use a clustered EventStoreDB, and with some additional logging I get this:

โฏ mix test test/spear_test.exs:310
Compiling 2 files (.ex)
Excluding tags: [:test]
Including tags: [line: "310"]

%Spear.Connection.Response{
  data: <<0, 0, 0, 0, 18, 10, 16, 8, 6, 26, 12, 8, 237, 169, 129, 128, 1, 16,
    237, 169, 129, 128, 1>>,
  headers: [
    {"content-type", "application/grpc"},
    {"date", "Thu, 02 Mar 2023 08:13:01 GMT"},
    {"server", "Kestrel"},
    {"grpc-status", "0"}
  ],
  status: 200,
  type: {:event_store_db_gpb_protobufs_streams,
   :"event_store.client.streams.AppendResp"}
}
%Spear.Connection.Response{
  data: "",
  headers: [
    {"content-type", "application/grpc"},
    {"date", "Thu, 02 Mar 2023 08:13:01 GMT"},
    {"server", "Kestrel"},
    {"content-length", "0"},
    {"exception", "not-leader"},
    {"leader-endpoint-host", "127.0.0.1"},
    {"leader-endpoint-port", "2111"},
    {"grpc-status", "5"},
    {"grpc-message", "Leader info available"}
  ],
  status: 200,
  type: {:event_store_db_gpb_protobufs_persistent,
   :"event_store.client.persistent_subscriptions.CreateResp"}
}
%Spear.Grpc.Response{
  data: "",
  message: "Leader info available",
  status: :not_found,
  status_code: 5
}


  1) test given a stream contains events info about a psub to :all can be fetched (SpearTest)
     test/spear_test.exs:310
     Assertion with == failed
     code:  assert Spear.create_persistent_subscription(c.conn, :all, group, settings) == :ok
     left:  {:error, %Spear.Grpc.Response{data: "", message: "Leader info available", status: :not_found, status_code: 5}}
     right: :ok
     stacktrace:
       test/spear_test.exs:314: (test)



Finished in 1.3 seconds (1.3s async, 0.00s sync)
64 tests, 1 failure, 63 excluded

Randomized with seed 439068

edit:

I accidentally submitted this issue before I was done writing it.

Is there a recommended way to handle this? Look up the leader before creating the subscription? Or is this a bug?

handle broken subscriptions

Shouldn't be too hard to reproduce in dev: open up a subscription in one pane and restart the EventStoreDB in another. Could send a simple atom like :eos (although this is kinda co-opting the gRPC spec) or some empty struct like %Spear.SubscriptionTerminated{}.

Cannot parse :caught_up Response (EventStore 23.10)

Hi, we've just tried updating from EventStore 21.10 to 23.10 and we are getting the following exception at startup. It might be possible that this issue belongs to https://github.com/fabriziosestito/commanded-spear-adapter, but from the stacktrace it seems to belong here.

[error] GenServer Dia.App.Spear.Commanded.EventStore.Adapters.Spear.EventPublisher terminating
** (FunctionClauseError) no function clause matching in Spear.Event.from_read_response/2
    (spear 1.3.2) lib/spear/event.ex:355: Spear.Event.from_read_response({:"event_store.client.streams.ReadResp", {:caught_up, {:"event_store.client.streams.ReadResp.CaughtUp"}}}, [json_decoder: #Function<1.23440359/2 in Commanded.EventStore.Adapters.Spear.EventPublisher.handle_info/2>])
    (commanded_spear_adapter 0.2.1) lib/commanded/event_store/adapters/spear/event_publisher.ex:54: Commanded.EventStore.Adapters.Spear.EventPublisher.handle_info/2
    (stdlib 5.1.1) gen_server.erl:1077: :gen_server.try_handle_info/3
    (stdlib 5.1.1) gen_server.erl:1165: :gen_server.handle_msg/6
    (stdlib 5.1.1) proc_lib.erl:241: :proc_lib.init_p_do_apply/3

Potential race condition when acking an event on a Persistent Subscription

Hi,
Since acknowledging a message is done asynchronously, it is possible that a subscriber would ack an event and then immediately quit. In this case, Mint.HTTP2.cancel_request/2 would be invoked, potentially preventing the ack from being sent to the DB.
Would it be possible to add synchronous acking/nacking? I would gladly work on a pull request if you are interested; however, since I'm not that familiar with Mint, nor with the records that should be returned, a few pointers would be appreciated.
Thank you for the very well documented code.

Streaming events returns an error when from is greater than the last event number

I found this problem while experimenting with Spear.

Let's say we have 30 events in a stream with UUID stream_id (numbered 0, ..., 29).
This returns an error instead of []:

MySpearClient.stream!(stream_id, from: 30)

After some debugging, I found that this is caused by Spear.Reading.Stream.wrap_buffer_in_decode_stream/3 not handling this case.
The case statement receives this:

{:"event_store.client.streams.ReadResp", {:last_stream_position, 9}}

which produces a FunctionClauseError once it propagates down to unfold_continuous.

To handle this I think it should do something like:

    case unfold_chunk(buffer) do
      {Streams.read_resp(content: {:stream_not_found, Streams.read_resp_stream_not_found()}),
               _rest} ->
         []

-      {message, _rest} ->
+      {event() = message, _rest} ->
         Stream.unfold(
           %__MODULE__{state | buffer: buffer, from: message},
           unfold_fn
         )

-      nil ->
+      _ ->
         []
     end
   end

Acking a by-category stream doesn't seem to work

Hey again. I will try to contribute by finding the error myself, but I can post it here in the meantime.

It seems that acking on a persistent subscription on a by-category stream is not working.
:ok = Spear.ack(JaaApiLive.EventStoreDbClient, sub, msg)
evaluates fine, but the event is sent over and over.

My guess is that it is somehow related to the event being a link as it is a projection stream.

getPersistentSubscriptionInfo/1?

Hi, some of the other clients implement a getPersistentSubscriptionInfo function; granted, it's easy enough to implement with Spear.request/5, e.g.:

iex(1)>  {:ok, conn} = Spear.Connection.start_link(connection_string: "esdb://localhost:2113")
{:ok, #PID<0.234.0>}
iex(2)> require Spear.Records.Persistent
Spear.Records.Persistent
iex(3)> require Spear.Records.Shared
Spear.Records.Shared
iex(4)> stream_id = Spear.Records.Shared.stream_identifier(stream_name: "$ce-commandedtestdadc98d0_31d8_4518_b4a2_d4a148595ec8")
{:"event_store.client.StreamIdentifier",
 "$ce-commandedtestdadc98d0_31d8_4518_b4a2_d4a148595ec8"}
iex(5)> stream_options = Spear.Records.Persistent.get_info_req_options(stream_option:  {:stream_identifier, stream_id}, group_name: "subscriber")
{:"event_store.client.persistent_subscriptions.GetInfoReq.Options",
 {:stream_identifier,
  {:"event_store.client.StreamIdentifier",
   "$ce-commandedtestdadc98d0_31d8_4518_b4a2_d4a148595ec8"}}, "subscriber"}
iex(6)> get_info_message= Spear.Records.Persistent.get_info_req(options: stream_options)
{:"event_store.client.persistent_subscriptions.GetInfoReq",
 {:"event_store.client.persistent_subscriptions.GetInfoReq.Options",
  {:stream_identifier,
   {:"event_store.client.StreamIdentifier",
    "$ce-commandedtestdadc98d0_31d8_4518_b4a2_d4a148595ec8"}}, "subscriber"}}
iex(7)> {:ok, resp} = Spear.request(conn, Spear.Records.Persistent,:GetInfo, [get_info_message])
{:ok,
 {:"event_store.client.persistent_subscriptions.GetInfoResp",
  {:"event_store.client.persistent_subscriptions.SubscriptionInfo",
   "$ce-commandedtestdadc98d0_31d8_4518_b4a2_d4a148595ec8", "subscriber",
   "Live", [], 0, 2, 0, "", "2", true, "0", 10000, false, 10, 100, 300, 100,
   3000, 1, 100, 2, 3, 1, 0, 0, "RoundRobin", 1, 0}}}
iex(8)> {_, infos} = resp
{:"event_store.client.persistent_subscriptions.GetInfoResp",
 {:"event_store.client.persistent_subscriptions.SubscriptionInfo",
  "$ce-commandedtestdadc98d0_31d8_4518_b4a2_d4a148595ec8", "subscriber", "Live",
  [], 0, 2, 0, "", "2", true, "0", 10000, false, 10, 100, 300, 100, 3000, 1,
  100, 2, 3, 1, 0, 0, "RoundRobin", 1, 0}}
iex(9)>  Spear.Records.Persistent.subscription_info(infos) |> Enum.into(%{})
%{
  average_per_second: 0,
  buffer_size: 300,
  check_point_after_milliseconds: 3000,
  connections: [],
  count_since_last_measurement: 0,
  event_source: "$ce-commandedtestdadc98d0_31d8_4518_b4a2_d4a148595ec8",
  extra_statistics: false,
  group_name: "subscriber",
  last_checkpointed_event_position: "",
  last_known_event_position: "2",
  live_buffer_count: 3,
  live_buffer_size: 100,
  max_check_point_count: 100,
  max_retry_count: 10,
  max_subscriber_count: 1,
  message_timeout_milliseconds: 10000,
  min_check_point_count: 1,
  named_consumer_strategy: "RoundRobin",
  outstanding_messages_count: 0,
  parked_message_count: 0,
  read_batch_size: 100,
  read_buffer_count: 2,
  resolve_link_tos: true,
  retry_buffer_count: 1,
  start_from: "0",
  status: "Live",
  total_in_flight_messages: 0,
  total_items: 2
}

If you think such a function could live under the Spear#utility set of functions, I'd gladly attempt a pull request.

There would be two issues to keep in mind :

  • a struct Spear.PersistentSubscription.Info would probably be needed;
  • from a cursory look at other clients, it seems the feature may not be present on all servers, so an extra check would be needed.

If you think you'd rather have client applications handle that through Spear.request/5, feel free to close the issue.

Thanks.

catch subscription drops due to stream deletions

Just saw this while testing out persistent subscriptions: it looks like the server terminates the gRPC request if you delete the stream while subscribed.

should test it out and come up with a solution for psubs & regular subs

roadmap to v1.0.0

Here's the general plan:

  • v0.1.0: Streams API (streams.proto)
  • v0.1.1: Resolve #4
  • v0.1.2: Resolve #6
  • v0.1.3: TLS functionality & documentation
  • v0.2.0: keep-alive functionality and configuration validation
  • v0.3.0: Users API (users.proto)
  • v0.4.0: Operations API (operations.proto)
  • v0.5.0: Gossip API (gossip.proto)
  • v0.6.0: Persistent Subscriptions API (persistent.proto)
  • v0.x.0: Projections API (projections.proto) (see #22)
  • Broadway integration for persistent subscriptions API
  • v1.0.0: tested and tried out in prod for a good while

persistent subscription ack failure when writing many events on same connection

We've encountered an issue where persistent subscriptions would experience slowdowns because acknowledgments do not seem to be processed (messages get retried). We've been able to reproduce it in [1]. The fix for us has been to use separate connections for writing and for connecting to the persistent subscription. Not sure if this is expected behaviour (or an appropriate fix). Is this a known issue?

[1] script demonstrating the issue: https://gist.github.com/dvic/a2177efd0d95fc6ff755b2650928cbca (setting separate_clients = true fixes the issue)

persistent subscriptions to :all

EventStore/EventStore#2869, which adds server-side support for persistent subscriptions to the magic $all stream, was recently merged.

The syntax in Spear should be roughly the same as for Spear.subscribe/4: pass the :all atom instead of a stream name to use the $all stream:

iex> Spear.connect_to_persistent_subscription(conn, self(), :all, "PSubIEx")

We'll have to wait until a stable version of EventStoreDB that supports this feature is released, though.

subscription backpressure (?)

Currently Spear.subscribe/4 emits info messages to the subscriber process with Kernel.send/2 as fast as the messages are received. I don't have any test cases for it but a simple test in IEx shows messages piling up in the subscriber mailbox until they're flushed (i.e. with flush/0 from the IEx helpers) or otherwise handled. There is currently no backpressure a subscriber could exert to ensure that their mailbox doesn't get flooded.

A consumer could work around this by reading up until they are at the end of the stream and then subscribing, and an implementation involving a GenStage which uses Spear.read_stream/3 or Spear.stream!/3 along with t:Enumerable.continuation/0s to catch up based on consumer demand and then subscribes with Spear.subscribe/4 could handle this backpressure pretty well. (See the GenStage.Streamer implementation for inspiration.) This implementation would suffer from bursty appends to the EventStoreDB stream, though, and could end up with a large mailbox regardless with sustained large writes to the stream. I'm sure there's some clever work-around to that problem if one can cheaply determine the process mailbox size. (I'm not too familiar with how to interact with the process mailbox.)
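As for cheaply determining the process mailbox size mentioned above: Process.info/2 with the :message_queue_len item returns it directly. A self-contained sketch (the threshold and module name here are made up for illustration):

```elixir
defmodule MailboxCheck do
  # Made-up threshold: above this, stop requesting/forwarding events.
  @max_queue_len 1_000

  # Returns true when the given process's mailbox is below the threshold.
  def ready_for_more?(pid \\ self()) do
    {:message_queue_len, len} = Process.info(pid, :message_queue_len)
    len < @max_queue_len
  end
end

# Pile a few messages into our own mailbox and inspect the queue length.
Enum.each(1..5, fn n -> send(self(), {:event, n}) end)
{:message_queue_len, len} = Process.info(self(), :message_queue_len)
IO.inspect(len)
```

An intermediary subscription process could consult this before forwarding or acknowledging, though as the paragraph above notes, that only moves the full mailbox around rather than exerting true backpressure on the server.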

I'm a bit concerned that trying to solve this in Spear is incorrect: doing any sort of GenServer.call/3 from the Spear.Connection would inherently block the connection. If the Spear.Connection process were to GenServer.cast/2 or Kernel.send/2 the message to some intermediary subscription GenServer which would then perform the GenServer.call/3, that intermediary GenServer would have the full mailbox instead of the subscription process. Maybe that's acceptable or some sort of improvement, but if a Spear user wants to implement subscriptions that way, the current emit-style of Spear.subscribe/4 supports building that yourself.

emit subscription events with their subscription reference?

currently subscriptions (regular & persistent) emit events to subscribers in the shape of

Spear.Event.t() | Spear.Filter.Checkpoint.t() | {:eos, :closed | :dropped}

If a process wants to receive multiple subscriptions, though, it's not clear which subscription each event or close reason came from, so the subscriber process doesn't know which subscription to re-subscribe to in cases of failure/shutdown.

So what I propose is that instead we emit messages to subscribers in the shape of

{subscription(), Spear.Event.t() | Spear.Filter.Checkpoint.t()} | {:eos, subscription(), :closed | :dropped}
  when subscription: reference()

Where the subscription reference is returned by both Spear.subscribe/4 and Spear.connect_to_persistent_subscription/5.

This would allow a single subscriber process to receive events from multiple subscriptions and know which events came from which subscription.
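A subscriber handling the proposed tagged shape might look like the sketch below. This simulates the proposal with plain references and send/2; the message shapes follow the proposal above, not Spear's current behavior:

```elixir
defmodule TaggedSubscriber do
  # Receives one message in the proposed tagged shape and returns which
  # subscription it belonged to.
  def await(timeout \\ 100) do
    receive do
      {subscription, event} when is_reference(subscription) ->
        {:event, subscription, event}

      {:eos, subscription, reason} when is_reference(subscription) ->
        {:eos, subscription, reason}
    after
      timeout -> :timeout
    end
  end
end

# Two simulated subscription references, standing in for the return values
# of Spear.subscribe/4 / Spear.connect_to_persistent_subscription/5.
sub_a = make_ref()
sub_b = make_ref()

send(self(), {sub_a, %{type: "example"}})
send(self(), {:eos, sub_b, :dropped})

{:event, ^sub_a, _event} = TaggedSubscriber.await()
{:eos, ^sub_b, :dropped} = TaggedSubscriber.await()
IO.puts("both subscriptions disambiguated")
```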


why is this necessary at all?

If we want to use Spear through GenStage and specifically Broadway, we have to deal with there only being one producer allowed. We can subscribe to multiple persistent subscriptions if we know which one we're targeting. This is less helpful for regular ole' linear subscriptions, but with GenStage, it becomes the handler (consumer) process' responsibility to store stream position since the producer has only asynchronous communication with the consumer. The handler needs to store the stream position for the correct producer, and so it needs to be able to disambiguate which subscription an event came from. It also needs to know which subscription is terminating with the {:eos, subscription, reason} signature.

what say you, @NFIBrokerage/baemax?

implement the Streams API BatchAppend rpc

The BatchAppend rpc is a new call in the Streams API which is intended to optimize append throughput.

from #45:

here's the diff of the streams.proto definitions which adds in the new batch-append rpc

diff --git a/src/Protos/Grpc/streams.proto b/src/Protos/Grpc/streams.proto
index d5fc1e50b..dd71e65f5 100644
--- a/src/Protos/Grpc/streams.proto
+++ b/src/Protos/Grpc/streams.proto
@@ -3,12 +3,16 @@ package event_store.client.streams;
 option java_package = "com.eventstore.dbclient.proto.streams";
 
 import "shared.proto";
+import "status.proto";
+import "google/protobuf/empty.proto";
+import "google/protobuf/timestamp.proto";
 
 service Streams {
 	rpc Read (ReadReq) returns (stream ReadResp);
 	rpc Append (stream AppendReq) returns (AppendResp);
 	rpc Delete (DeleteReq) returns (DeleteResp);
 	rpc Tombstone (TombstoneReq) returns (TombstoneResp);
+	rpc BatchAppend (stream BatchAppendReq) returns (stream BatchAppendResp);
 }
 
 message ReadReq {
@@ -157,48 +161,101 @@ message AppendResp {
+message BatchAppendReq {
+	event_store.client.UUID correlation_id = 1;
+	Options options = 2;
+	repeated ProposedMessage proposed_messages = 3;
+	bool is_final = 4;
+
+	message Options {
+		event_store.client.StreamIdentifier stream_identifier = 1;
+		oneof expected_stream_position {
+			uint64 stream_position = 2;
+			google.protobuf.Empty no_stream = 3;
+			google.protobuf.Empty any = 4;
+			google.protobuf.Empty stream_exists = 5;
+		}
+		google.protobuf.Timestamp deadline = 6;
+	}
+
+	message ProposedMessage {
+		event_store.client.UUID id = 1;
+		map<string, string> metadata = 2;
+		bytes custom_metadata = 3;
+		bytes data = 4;
+	}
+}
+
+message BatchAppendResp {
+	event_store.client.UUID correlation_id = 1;
+	oneof result {
+		google.rpc.Status error = 2;
+		Success success = 3;
+	}
+
+	event_store.client.StreamIdentifier stream_identifier = 4;
+
+	oneof expected_stream_position {
+		uint64 stream_position = 5;
+		google.protobuf.Empty no_stream = 6;
+		google.protobuf.Empty any = 7;
+		google.protobuf.Empty stream_exists = 8;
+	}
+
+	message Success {
+		oneof current_revision_option {
+			uint64 current_revision = 1;
+			google.protobuf.Empty no_stream = 2;
+		}
+		oneof position_option {
+			event_store.client.AllStreamPosition position = 3;
+			google.protobuf.Empty no_position = 4;
+		}
+	}
+}
+

(note that changes to event_store.client.{shared. => }* have been omitted)

Glancing at the new proto it looks like you stream individual BatchAppendReq messages where each BatchAppendReq adds a chunk of new messages. I would imagine that this rpc was created for the migrator efforts (the tool that migrates data from one eventstoredb to another) where the total append bytes would typically exceed the maximum. It also looks like there are some cool new controls in there for setting a deadline timestamp which could prove useful.
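To get a feel for the request shape, here's a hedged sketch of splitting a large append into chunks with only the final chunk flagged. Plain maps stand in for the BatchAppendReq records; the field names follow the proto above, but nothing here calls Spear or a real server:

```elixir
defmodule BatchChunks do
  # Splits proposed messages into BatchAppendReq-shaped maps, marking only
  # the last chunk with is_final: true (mirroring the proto's is_final field).
  def build(correlation_id, proposed_messages, chunk_size) do
    chunks = Enum.chunk_every(proposed_messages, chunk_size)
    last = length(chunks) - 1

    chunks
    |> Enum.with_index()
    |> Enum.map(fn {chunk, index} ->
      %{
        correlation_id: correlation_id,
        proposed_messages: chunk,
        is_final: index == last
      }
    end)
  end
end

reqs = BatchChunks.build("corr-1", Enum.to_list(1..10), 4)
IO.inspect(Enum.map(reqs, & &1.is_final))
```

An actual implementation would presumably stream these as individual request messages on the same rpc, keeping each chunk's byte size under the server's maximum append size.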

read-only mode

it shouldn't be immensely difficult to implement a read-only option for connections that prevents the Spear.Connection from emitting any write-related requests

I don't think the protos have any information about which RPCs are write operations but it should be somewhat easy to curate a list or mapping

@write_apis %{
  Spear.Records.Streams => [:Append, :Delete, :Tombstone],
  ..
}

And check against this when a request is received, giving {:error, :read_only} when a write API is given in read-only mode.
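A minimal sketch of that check, assuming the mapping idea above (the module layout and rpc names are assumptions based on the proto, not Spear's actual internals):

```elixir
defmodule ReadOnlyCheck do
  # Hypothetical mapping of record modules to their write rpcs.
  # Spear.Records.Streams is used only as an atom here; the module need not
  # be loaded for this sketch to run.
  @write_apis %{
    Spear.Records.Streams => [:Append, :Delete, :Tombstone]
  }

  # Rejects write rpcs when the connection was opened read-only.
  def check(api, rpc, read_only?) do
    if read_only? and rpc in Map.get(@write_apis, api, []) do
      {:error, :read_only}
    else
      :ok
    end
  end
end

IO.inspect(ReadOnlyCheck.check(Spear.Records.Streams, :Append, true))
IO.inspect(ReadOnlyCheck.check(Spear.Records.Streams, :Read, true))
```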

Identical revision/commit for multiple events

This might be something I don't understand about EventStore and not a Spear problem at all.
However, I'm trying to set the :from parameter on a subscription to a $by-category stream.

In other libraries, it's common to make a position from the commit_position and prepare_position values. In Spear this is wrapped up as providing the last event you want to start the catch-up subscription from (I really like that feature!)

Unfortunately, I seem to get all events regardless of the :from event.
Further, by inspection I see that the commit_position and prepare_position are equal for many of my events.
Here are three events in different streams that all have the same value for commit and prepare. Is this intentional on EventStore's part? Is there any way to specify the :from parameter on a $by-category stream?

%Spear.Event{
  body: %{"hadet" => "e983a588-a6da-11eb-9840-f02f742e9919"},
  id: "852b72a9-05a8-4315-93b7-7fd73be6e316",
  metadata: %{
    commit_position: 18446744073709551615,
    content_type: "application/json",
    created: ~U[2021-04-26 22:01:11.574827Z],
    custom_metadata: "",
    prepare_position: 18446744073709551615,
    stream_name: "jaa_api_live-stream5",
    stream_revision: 0,
    subscription: #Reference<0.3430913167.73400327.38263>
  },
  type: "second"
}
%Spear.Event{
  body: %{"hadet" => "ea609394-a6da-11eb-83d4-f02f742e9919"},
  id: "ddd15ce0-213f-43c4-83cc-ab8063978e9c",
  metadata: %{
    commit_position: 18446744073709551615,
    content_type: "application/json",
    created: ~U[2021-04-26 22:01:13.022904Z],
    custom_metadata: "",
    prepare_position: 18446744073709551615,
    stream_name: "jaa_api_live-stream6",
    stream_revision: 0,
    subscription: #Reference<0.3430913167.73400327.38263>
  },
  type: "second"
}
%Spear.Event{
  body: %{"hadet" => "eb86aa10-a6da-11eb-a356-f02f742e9919"},
  id: "0fa0c07f-33d5-4ab6-b5a4-b6a535a8f0c5",
  metadata: %{
    commit_position: 18446744073709551615,
    content_type: "application/json",
    created: ~U[2021-04-26 22:01:14.949941Z],
    custom_metadata: "",
    prepare_position: 18446744073709551615,
    stream_name: "jaa_api_live-stream8",
    stream_revision: 0,
    subscription: #Reference<0.3430913167.73400327.38263>
  },
  type: "second"
}
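For what it's worth, the repeated position value is exactly the maximum unsigned 64-bit integer, which suggests it is a sentinel for "no position" on these link-resolved events rather than a real commit/prepare position (that interpretation is an assumption, but the arithmetic is easy to check):

```elixir
import Bitwise

# 18446744073709551615 is 2^64 - 1, i.e. the maximum value of a uint64 -
# a common sentinel for "no position" in protobuf APIs.
max_uint64 = (1 <<< 64) - 1
IO.inspect(max_uint64 == 18_446_744_073_709_551_615)
```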

`Spear.subscribe/4` can return an `{:ok, _}` tuple even when subscription failed

This issue seems to make the API less predictable, unless there are reasons unknown to me for it working like this.

Imagine we create a subscription like this:

{:ok, subscription} = Spear.subscribe(conn, self(), :all)

This code could succeed even when there are problems with the connection, e.g. esdb://wrong:password@host:2113/, because Spear.subscribe/4 returns an {:ok, _} tuple even in some failure cases:

Success (as documented) - subscription created:

{:ok, #Reference<0.508686625.3000238081.184532>}

Access denied (no events will be received by self()):

{:ok,
 %Spear.Connection.Response{
   status: 200,
   type: {:event_store_db_gpb_protobufs_streams,
    :"event_store.client.streams.ReadResp"},
   headers: [
     {"date", "Wed, 09 Nov 2022 13:48:31 GMT"},
     {"content-type", "application/grpc"},
     {"server", "Kestrel"},
     {"content-length", "0"},
     {"exception", "access-denied"},
     {"grpc-status", "7"},
     {"grpc-message", "Access Denied"}
   ],
   data: ""
 }}

401 (no events will be received by self()):

{:ok,
 %Spear.Connection.Response{
   status: 401,
   type: {:event_store_db_gpb_protobufs_streams,
    :"event_store.client.streams.ReadResp"},
   headers: [
     {"date", "Wed, 09 Nov 2022 13:48:03 GMT"},
     {"server", "Kestrel"},
     {"www-authenticate", "X-Basic realm=\"ESDB\""},
     {"content-length", "0"}
   ],
   data: ""
 }}
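Until/unless this changes upstream, a caller can distinguish the shapes, since a real subscription comes back as a reference. A hedged workaround sketch (a plain map stands in for the Spear.Connection.Response structs shown above, so this runs standalone):

```elixir
defmodule SubscribeResult do
  # Treats {:ok, ref} as success; any other {:ok, _} payload (such as the
  # Spear.Connection.Response structs shown above) is coerced to an error.
  def normalize({:ok, ref}) when is_reference(ref), do: {:ok, ref}
  def normalize({:ok, other}), do: {:error, other}
  def normalize({:error, _} = error), do: error
end

IO.inspect(SubscribeResult.normalize({:ok, make_ref()}) |> elem(0))
IO.inspect(SubscribeResult.normalize({:ok, %{status: 401}}))
```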

How to Pass Event Metadata

Reading: https://developers.eventstore.com/server/v21.10/streams.html#event-metadata

How do I pass $correlationId or $causationId to the events?

I am not sure if I'm supposed to pass it as part of custom_metadata or if I'm supposed to update the metadata key on the Spear.Event struct, based on the following code snippet:

spear/lib/spear/event.ex

Lines 162 to 165 in 40d5f2f

metadata: %{
  content_type: Keyword.get(opts, :content_type, "application/json"),
  custom_metadata: Keyword.get(opts, :custom_metadata, <<>>)
}

I appreciate any help you can provide.
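One possible approach, offered as an assumption rather than confirmed Spear guidance: since EventStoreDB reads $correlationId/$causationId from the event's raw metadata, they could be JSON-encoded into the binary passed as the :custom_metadata option. The helper below is hypothetical and only builds the binary; the Spear.Event.new call is shown in a comment:

```elixir
defmodule EventMetadata do
  # Hypothetical helper: encode $correlationId / $causationId into the raw
  # metadata JSON binary. A real app would likely use a JSON library instead
  # of string interpolation.
  def correlation_json(correlation_id, causation_id) do
    ~s({"$correlationId":"#{correlation_id}","$causationId":"#{causation_id}"})
  end
end

json = EventMetadata.correlation_json("corr-123", "cause-456")
IO.puts(json)

# Untested assumption - one would then pass it along as:
#   Spear.Event.new("my-event", %{}, custom_metadata: json)
```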

[Question] About Gun

That project looks good but it depends on :gun which doesn't play nice with other dependencies.

Hey peeps, I am curious to learn from your experience about that statement. Do you mind expanding on what the issue with :gun was?

Thank you in advance.
