epoplavskis / photon-pump
A TCP eventstore client for modern Python
License: MIT License
When a Client is created using the helper function connect, the retry policy created for the client connector is a default policy with retries_per_node set to 0. This means that, by default, connections will attempt to reconnect to a single endpoint forever. If an eventstore node in a cloud-like setup vanishes (e.g. it is terminated, either deliberately or by the cloud provider), the application will never recover its connection. I would suggest one of two solutions:
class Connector:
def __init__(
self,
discovery,
dispatcher,
retry_policy=None,
ctrl_queue=None,
connect_timeout=5,
name=None,
loop=None,
):
self.name = name
...
self.retry_policy = retry_policy or DiscoveryRetryPolicy(retries_per_node=0)
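The suggested fix can be sketched independently of photonpump: a policy that bounds retries per node and signals when discovery should move on to a different endpoint. This is an illustrative stand-in, not photonpump's DiscoveryRetryPolicy; the names and shape are assumptions.

```python
class BoundedRetryPolicy:
    """Illustrative sketch (not photonpump's DiscoveryRetryPolicy):
    give up on a node after `retries_per_node` consecutive failures,
    so the connector can fall back to discovery and pick a new node."""

    def __init__(self, retries_per_node=3):
        self.retries_per_node = retries_per_node
        self.failures = {}  # node -> consecutive failure count

    def record_failure(self, node):
        self.failures[node] = self.failures.get(node, 0) + 1

    def record_success(self, node):
        self.failures.pop(node, None)

    def should_retry(self, node):
        # Stop retrying this node once the budget is exhausted,
        # forcing the connector back into discovery.
        return self.failures.get(node, 0) < self.retries_per_node


policy = BoundedRetryPolicy(retries_per_node=2)
policy.record_failure("node-a")
policy.record_failure("node-a")
assert not policy.should_retry("node-a")  # discovery should pick a new node
```

With `retries_per_node=0` this policy would never retry at all, which suggests the default in the snippet above may be inverted relative to the behaviour the author describes.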
The dispatcher is doing way too much work.
The model where a Conversation returns a Reply is nice for testability but the current design sucks.
We should consider either:
Either of these will require us to rewrite all the conversation tests.
At the end, the Dispatcher should be much simpler.
Looks like the package is broken with newer protobuf
packages? I get this error message after a dependency update in a project using this module:
...
File "/baas/app/eventstore/eventstore_db.py", line 4, in <module>
import photonpump
File "/usr/local/lib/python3.8/site-packages/photonpump/__init__.py", line 3, in <module>
from .connection import * # noqa
File "/usr/local/lib/python3.8/site-packages/photonpump/connection.py", line 12, in <module>
from . import conversations as convo
File "/usr/local/lib/python3.8/site-packages/photonpump/conversations.py", line 16, in <module>
from photonpump import messages
File "/usr/local/lib/python3.8/site-packages/photonpump/messages.py", line 10, in <module>
from . import messages_pb2
File "/usr/local/lib/python3.8/site-packages/photonpump/messages_pb2.py", line 32, in <module>
_descriptor.EnumValueDescriptor(
File "/usr/local/lib/python3.8/site-packages/google/protobuf/descriptor.py", line 755, in __new__
_message.Message._CheckCalledFromGeneratedFile()
TypeError: Descriptors cannot not be created directly.
If this call came from a _pb2.py file, your generated code is out of date and must be regenerated with protoc >= 3.19.0.
If you cannot immediately regenerate your protos, some other possible workarounds are:
1. Downgrade the protobuf package to 3.20.x or lower.
2. Set PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python (but this will use pure-Python parsing and will be much slower).
More information: https://developers.google.com/protocol-buffers/docs/news/2022-05-06#python-updates
I am trying to read all the events, because we store every aggregate as a stream and therefore do not know the name of every stream.
The EventStore HTTP API handles this by reading the stream $all. How do I do this here? I see there is a TcpCommand ReadAllEventsBackward = 0xB8.
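For context, reading $all over TCP is a paging conversation: each ReadAllEventsBackward request carries a position and a max count, and the response returns a batch plus the next position to read from. A toy sketch of that shape (an integer index stands in for EventStore's commit/prepare position pair; this is not photonpump's API):

```python
def read_all_backward(events, from_position, max_count):
    """Return (batch, next_position), reading backward from from_position.
    `events` is a list ordered by position; the integer index stands in
    for EventStore's (commit, prepare) position pair."""
    start = max(from_position - max_count, 0)
    batch = list(reversed(events[start:from_position]))
    return batch, start


log = ["e%d" % i for i in range(10)]
pos = len(log)
seen = []
while pos > 0:  # keep paging until we reach the start of $all
    batch, pos = read_all_backward(log, pos, max_count=4)
    seen.extend(batch)
assert seen == list(reversed(log))
```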
Make sure the package installs its own dependencies and itself.
10:47 $ python setup.py develop
Traceback (most recent call last):
File "setup.py", line 8, in <module>
import photonpump
File "/Users/csaba/work/made.com/photon-pump/photonpump/__init__.py", line 6, in <module>
from .messages import *
File "/Users/csaba/work/made.com/photon-pump/photonpump/messages.py", line 9, in <module>
from . import messages_pb2
File "/Users/csaba/work/made.com/photon-pump/photonpump/messages_pb2.py", line 6, in <module>
from google.protobuf.internal import enum_type_wrapper
ModuleNotFoundError: No module named 'google'
If we close the connection from underneath photonpump, e.g. by terminating a VPN tunnel, the client never notices any difference.
We should periodically (suggest every 60 secs) send a heartbeat request to the server. If some consecutive number of requests fail, we should issue a reconnection request to the connection handler.
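As a sketch, assuming hypothetical `send_heartbeat` and `request_reconnect` callables supplied by the connection layer, the monitor could look like:

```python
import asyncio


async def heartbeat_monitor(send_heartbeat, request_reconnect,
                            interval=60, max_failures=3):
    """Illustrative sketch: periodically send a heartbeat; after
    `max_failures` consecutive failures, ask the connection handler
    to reconnect. Both callables are hypothetical hooks, not part of
    photonpump's current API."""
    failures = 0
    while True:
        try:
            await asyncio.wait_for(send_heartbeat(), timeout=interval)
            failures = 0  # any success resets the failure count
        except (asyncio.TimeoutError, ConnectionError):
            failures += 1
            if failures >= max_failures:
                await request_reconnect()
                return
        await asyncio.sleep(interval)
```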
Example:
conn.on_event.append(lambda e: print(e))
conn.watch('my-stream')
If we have a running subscription and we restart the master node then we might connect to a slave. When that happens, we fail to reconnect subscriptions with a NotHandled error. This error occurs in the background and doesn't bubble to client code.
Instead, the NotHandled error should cause us to reconnect, maybe by returning a new ReplyAction.Reconnect to the EventstoreProtocol.
There are test helper methods scattered around the codebase; it's time to pull them together into a cohesive module and consider creating test data builders for each response type.
Sometimes bad things happen and the dispatcher loop explodes. When this happens we stop sending and receiving data, but we don't bubble the exception up to the open conversations.
We should call error on every open conversation so that the exception bubbles up to the caller and they can reconnect or exit as appropriate.
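A minimal sketch of the propagation step, assuming open conversations are tracked as a dict of id -> Future (the actual bookkeeping in the dispatcher may differ):

```python
def fail_all_conversations(conversations, exc):
    """Illustrative sketch: when the dispatcher loop dies, surface the
    exception to every open conversation's future so callers stop
    blocking and can reconnect or exit as appropriate.
    `conversations` maps conversation_id -> asyncio.Future."""
    for convo_id, future in conversations.items():
        if not future.done():
            future.set_exception(exc)
    conversations.clear()
```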
In order to finish type checking properly
Hi,
I am trying to reconstruct the state of an aggregate by creating a volatile subscription to the stream. All is fine, except that it waits forever for new events; I was expecting it to finish after receiving all of them.
Am I using the wrong API call?
There's a bunch of missing tests for the connection module still, and the situation is worse for discovery.
Let's try to get the coverage > 95% on both of those modules.
Currently if we don't receive a response to an OutboundMessage, the future for the Conversation will block forever. Instead we would like to have timeouts on our Conversations.
Implement timeout for conversations. If we reach the timeout without receiving a response we should retry the message or return a TimeoutError to the client.
In particular, when we send a ConnectPersistentSubscription message, if we don't receive a response, we should retry. This is necessary for properly handling #32
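One way to sketch this, using asyncio.wait_for around the conversation's reply future, with a hypothetical `retry` callable that re-sends the outbound message and returns a fresh future:

```python
import asyncio


async def with_timeout(conversation_future, timeout, retry, max_retries=1):
    """Illustrative sketch: wait for a conversation's reply; on timeout,
    retry the outbound message up to `max_retries` times, then raise
    TimeoutError to the client. `retry` is a hypothetical callable that
    re-sends the message and returns a fresh reply future."""
    attempts = 0
    future = conversation_future
    while True:
        try:
            return await asyncio.wait_for(future, timeout)
        except asyncio.TimeoutError:
            attempts += 1
            if attempts > max_retries:
                raise TimeoutError("no response to conversation")
            future = retry()  # re-send, e.g. ConnectPersistentSubscription
```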
Here's the first bug I ran into:
import photonpump
usr/local/lib/python3.8/site-packages/photonpump/__init__.py:3: in <module>
from .connection import * # noqa
usr/local/lib/python3.8/site-packages/photonpump/connection.py:10: in <module>
from . import conversations as convo
usr/local/lib/python3.8/site-packages/photonpump/conversations.py:6: in <module>
from asyncio.base_futures import InvalidStateError
E ImportError: cannot import name 'InvalidStateError' from 'asyncio.base_futures' (/usr/local/lib/python3.8/asyncio/base_futures.py)
We want to have strict type checks. We'll do this module by module. The simplest module to start with is messages.py since it has no onward dependencies.
Set up mypy or pyre to do type checking and fail the build if there are missing or inconsistent type signatures anywhere in the module.
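A sketch of the module-by-module opt-in with mypy, using per-module config sections (the Python version and module path are assumptions based on the repo layout):

```ini
[mypy]
python_version = 3.8

# Opt modules in to strictness one at a time; messages.py first,
# since it has no onward dependencies.
[mypy-photonpump.messages]
disallow_untyped_defs = True
disallow_incomplete_defs = True
check_untyped_defs = True
no_implicit_optional = True
warn_return_any = True
```

CI would then run mypy over the package and fail the build on any error, tightening the per-module sections as coverage grows.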
Currently when we shut down the client, there's a bunch of things that aren't smoothly cancelled/closed.
Running the tests with PYTHONASYNCIODEBUG=1 dumps a frightening number of stacktraces. We would like to cleanly shut down.
When trying to use a volatile subscription, I get an error:
In [6]: async with connect() as c:
...: async for event in c.subscribe_to("mystream"):
...: print(event.json())
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
cell_name in async-def-wrapper()
TypeError: 'async for' requires an object with __aiter__ method, got coroutine
I'm able to use .iter("mystream") without a problem. Am I doing something wrong?
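The TypeError suggests subscribe_to returns a coroutine that must be awaited before iterating. A standalone sketch of that pattern, with toy stand-ins for the client API (whether photonpump's subscribe_to resolves to an async-iterable subscription this way is an assumption to check against the docs):

```python
import asyncio


# Toy stand-ins: subscribe_to is a coroutine that resolves to an
# async-iterable subscription, mirroring the error message above.
async def event_stream():
    for i in range(3):
        yield {"event": i}


async def subscribe_to(stream):
    return event_stream()


async def main():
    subscription = await subscribe_to("mystream")  # await the coroutine first...
    events = []
    async for event in subscription:               # ...then iterate the result
        events.append(event)
    return events


assert asyncio.run(main()) == [{"event": 0}, {"event": 1}, {"event": 2}]
```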
Make the tests pass
11:35 $ py.test
platform darwin -- Python 3.6.1, pytest-3.0.7, py-1.4.33, pluggy-0.4.0
rootdir: /Users/csaba/work/made.com/photon-pump, inifile:
collected 14 items
test/connection_test.py EEE
test/read_test.py EEEEEE
test/streaming_iterator_test.py EE
test/write_test.py EEE
E fixture 'event_loop' not found
> available fixtures: cache, capfd, capsys, doctest_namespace, monkeypatch, pytestconfig, record_xml_property, recwarn, tmpdir, tmpdir_factory
> use 'pytest --fixtures [testpath]' for help on them.
Hello, I'm using a persistent subscription to receive events from Event Store, then I do some heavy processing that takes time. When it takes too long (several seconds), ES cuts the connection and considers that the events weren't handled. Here is a short version of the code:
async for event in subscription.events:
await subscription.ack(event)
time.sleep(21)
The result is the following error (it doesn't crash, but it replays the previous event or list of events):
here are the logs of Event Store:
I think the generator that waits for events is also what keeps the connection alive, and because my processing takes a long time to yield back to it, I receive a Heartbeat Timeout.
I tried pinging the connection from a background thread, but I didn't succeed (and I don't think that's the right approach).
Is there anything to do in order to avoid these timeouts?
Thanks
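The likely culprit is time.sleep, which blocks the event loop so photonpump cannot answer EventStore's heartbeats. One standard pattern is to offload blocking work to a thread with run_in_executor (or use asyncio.sleep for plain waits), so the loop stays free. A sketch with stand-in processing:

```python
import asyncio
import time


def heavy_processing(event):
    """Blocking work (stand-in for the real processing)."""
    time.sleep(0.05)
    return ("processed", event)


async def consume(events):
    loop = asyncio.get_running_loop()
    results = []
    for event in events:  # stand-in for `async for event in subscription.events`
        # Run the blocking work in a thread so the event loop stays free
        # to answer EventStore heartbeats while we process.
        result = await loop.run_in_executor(None, heavy_processing, event)
        results.append(result)
        # a real consumer might ack here, after successful processing
    return results


assert asyncio.run(consume(["a", "b"])) == [("processed", "a"), ("processed", "b")]
```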
We need to make sure that the logs are sensible at info level, and that we're not completely overwhelming at DEBUG level. When there's a problem, it's incredibly useful to have the binary dumps, but not all the time. Maybe we need a special verbose flag? I introduced a custom loglevel TRACE for recording message traces but I'm not sure that's appropriate.
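For illustration, a custom TRACE level below DEBUG lets message dumps be opt-in (the level number 5 is an assumption; photonpump's actual TRACE value may differ):

```python
import logging

TRACE = 5  # below DEBUG (10); assumed value for illustration
logging.addLevelName(TRACE, "TRACE")


def trace(logger, msg, *args, **kwargs):
    """Emit a message-trace line only when TRACE is enabled."""
    if logger.isEnabledFor(TRACE):
        logger.log(TRACE, msg, *args, **kwargs)


log = logging.getLogger("photonpump.msgtrace")
log.setLevel(logging.DEBUG)            # at DEBUG, binary dumps are suppressed
trace(log, "0x%s", b"\x01\x02".hex())  # not emitted
log.setLevel(TRACE)                    # opt-in verbose mode
trace(log, "0x%s", b"\x01\x02".hex())  # now emitted
```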
Example use case: a data analyst who wants to do some local analysis of real data. It would be nice to be able to instantiate a client/connection/conversation in read-only mode for safe exploration of prod data locally.
Occasionally we need to run multiple connections in a single service. The connection_number introduced in 0.5 is useful in diagnostics, but doesn't disambiguate multiple connectors.
Add an optional "name" to connections, and use it in the logger name for Client, Connector, Dispatcher, Reader, Writer, Subscription etc.
I should be able to run two distinct clients in my application, and understand which of them wrote which log lines.
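A sketch of the naming scheme, folding an optional connection name into the logger hierarchy (the helper and logger prefix are illustrative, not photonpump's current internals):

```python
import logging


def get_named_logger(component, connection_name=None):
    """Illustrative sketch: include an optional connection name in the
    logger name so log lines from two clients are distinguishable."""
    base = "photonpump.%s" % component
    if connection_name:
        return logging.getLogger("%s.%s" % (base, connection_name))
    return logging.getLogger(base)


orders = get_named_logger("connector", "orders")
billing = get_named_logger("connector", "billing")
assert orders.name == "photonpump.connector.orders"
assert orders is not billing  # distinct loggers, distinct log lines
```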
Following on from being able to read all events in #78, we now need to be able to subscribe to all events in order to retrieve new ones.
@bobthemighty Any insight you have on this would be appreciated.
I will be tackling this soon because we will be needing this feature soon.
eventstore supports setting expected_version on an event, to allow for optimistic concurrency. We support setting that flag, but we don't currently (I think) handle the errors that would come back.
The implementation would be in the async def reply() of class WriteEvents. Currently the only special case is for AccessDenied; we should add a check for whatever the expected-version error is.
At that point the reply() method will be quite long. Consider switching to the same pattern we use for Read conversations, which is delegating to a wrapper called e.g. WriteEventCompleted which defines a .dispatch() method. See for example ReadEvent.reply() and class ReadEventCompleted.
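A sketch of that Completed-wrapper pattern; the class names mirror the conversation classes mentioned above, but the response shape and error mapping are stand-ins, not the real protobuf messages:

```python
import asyncio


class WriteEventCompleted:
    """Wraps a response and maps each result code to an outcome,
    keeping WriteEvents.reply() short. The dict-based response
    shape here is an assumption for illustration."""

    def __init__(self, response):
        self.response = response

    def dispatch(self):
        result = self.response["result"]
        if result == "AccessDenied":
            raise PermissionError("Access denied to stream")
        if result == "WrongExpectedVersion":
            raise ValueError("Wrong expected version")
        return self.response["position"]


class WriteEvents:
    async def reply(self, response):
        # Delegate all result handling to the wrapper, as ReadEvent
        # does with ReadEventCompleted.
        return WriteEventCompleted(response).dispatch()


writer = WriteEvents()
assert asyncio.run(writer.reply({"result": "Success", "position": 42})) == 42
```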
Is it possible to park a message, in order to send a response to ES on the persistent subscription async generator and then acknowledge it later?
I don't see anything that allows me to do that: https://eventstore.org/docs/http-api/swagger/don't%20acknowledge%20a%20single%20message
Unless I am much mistaken, our connections to the server are currently unencrypted by default, and there is no option to use TLS.
EventStoreDB's hosted offering will be TLS-only.
MADE.com might also want to switch to using TLS, for security and so on.
Implementation-wise: in connection.Connector._attempt_connect() we use asyncio.loop.create_connection(), which has an ssl= option, so it's just a matter of threading some SSL config through from the top-level connect() call on down.
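A sketch of the SSL config that could be threaded through; the `verify` flag and helper name are illustrative, and the create_connection wiring shown in the comment is hypothetical rather than the real _attempt_connect signature:

```python
import ssl


def make_ssl_context(verify=True):
    """Build an SSL context to thread through to create_connection.
    verify=False is only for dev clusters with self-signed certs."""
    ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
    if not verify:
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
    return ctx


# Hypothetical wiring inside _attempt_connect (not the real signature):
#   await loop.create_connection(protocol_factory, host, port,
#                                ssl=make_ssl_context())

ctx = make_ssl_context()
assert ctx.verify_mode == ssl.CERT_REQUIRED  # verification on by default
```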
Is the project under active development, and considered mature for production use?
Currently, the client has a single instance of MessageReader and MessageWriter for its entire lifetime. The Connector has an instance of transport
which it receives from Asyncio's connection_made callback.
I suspect that the heartbeat issue on Cancellation Service is happening under the following circumstances:
To fix this, we want to split the Protocol implementation out of Connector. Connector should call create_connection, passing a ProtocolFactory that builds a new protocol per transport.
When we receive a new transport, the Connector should create a new MessageReader and MessageWriter with the StreamReader/StreamWriter pair.
When we terminate a connection, we can safely cancel the reader/writer/protocol since there's no shared state.
Because we always restart conversations on reconnection, it is safe to throw away the state in MessageReader/MessageWriter.
The Protocol type can be constructed with a unique id, plus the information about the target_host. This ought to mean that all mutable state can move away from the Connector. To effect that, we will need to separate the Discovery and Connect commands.
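The protocol-per-transport idea can be sketched with asyncio's own Protocol abstraction; the class name and fields below mirror the description above but are stand-ins, not the real implementation:

```python
import asyncio
import itertools

_ids = itertools.count()


class EventstoreProtocol(asyncio.Protocol):
    """Illustrative sketch: one protocol instance per transport, built
    fresh on every connection, so no mutable state lives in Connector."""

    def __init__(self, target_host):
        self.id = next(_ids)  # unique per connection attempt
        self.target_host = target_host
        self.transport = None

    def connection_made(self, transport):
        # A real implementation would build a new MessageReader and
        # MessageWriter here from fresh stream pairs.
        self.transport = transport


# The connector would pass a zero-argument factory to create_connection:
#   loop.create_connection(lambda: EventstoreProtocol(host), host, port)

a = EventstoreProtocol("node-1")
b = EventstoreProtocol("node-1")
assert a.id != b.id  # no state shared between connections
```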
Now that Conversations are all neatly tested, the worst bit of the code is the connection module.
We need to decide how best to break up the required functionality into coroutines, and get each of them under test.