photon-pump's People

Contributors

bghitulescu, bobthemighty, bram2000, brandon2255p, epoplavskis, hjwp, jlee1-made, lurst, palankai, pyup-bot, reecehub, shaunickmistry, vtermanis

photon-pump's Issues

connect helper function does not allow definition of a retry policy

When a Client is created using the connect helper function, the retry policy created for the client's Connector is a default policy with retries_per_node set to 0. This means that, by default, a connection will attempt to reconnect to a single endpoint forever. If an eventstore node in a cloud-like setup vanishes (e.g. it has been terminated, either on purpose or by the cloud provider), the application will never recover its connection. I would suggest one of two solutions:

  1. The default is set to something that will allow it to try other nodes in the cluster
    https://github.com/madedotcom/photon-pump/blob/master/photonpump/connection.py#L78
class Connector:
    def __init__(
        self,
        discovery,
        dispatcher,
        retry_policy=None,
        ctrl_queue=None,
        connect_timeout=5,
        name=None,
        loop=None,
    ):
        self.name = name
...
        self.retry_policy = retry_policy or DiscoveryRetryPolicy(retries_per_node=0)
  2. We allow the retry policy to be passed in to connect
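
A minimal sketch of the second option, assuming nothing about photon-pump beyond the Connector snippet quoted above. RetryPolicy, the trimmed-down Connector and this connect signature are hypothetical stand-ins, not the library's actual code; the host and port defaults are illustrative only.

```python
class RetryPolicy:
    """Hypothetical stand-in for DiscoveryRetryPolicy."""

    def __init__(self, retries_per_node=0):
        self.retries_per_node = retries_per_node


class Connector:
    """Hypothetical, trimmed-down Connector that only stores the policy."""

    def __init__(self, retry_policy=None):
        # Same fallback as in the snippet quoted in the issue.
        if retry_policy is None:
            retry_policy = RetryPolicy(retries_per_node=0)
        self.retry_policy = retry_policy


def connect(host="localhost", port=1113, retry_policy=None):
    """Hypothetical helper that forwards retry_policy instead of hiding it."""
    return Connector(retry_policy=retry_policy)


# Callers that care about failover can now opt in explicitly.
client = connect(retry_policy=RetryPolicy(retries_per_node=3))
```

Existing callers keep the current behaviour, because omitting the argument still falls back to the default policy.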

Remove the Reply logic from Dispatcher

The dispatcher is doing way too much work.

The model where a Conversation returns a Reply is nice for testability but the current design sucks.

We should consider either:

  • A polymorphic Reply type that encapsulates the logic that's in handle_reply OR
  • Moving the reply logic into the Conversation.

Either of these will require us to rewrite all the conversation tests.

At the end, the Dispatcher should be much simpler.
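
One way the first option might look, as a rough sketch only. The class names Complete and SendMessage, the apply method, and the results/outbox arguments are all hypothetical and are not taken from the photon-pump codebase; the point is only that each Reply subtype carries its own behaviour, so handle_reply shrinks to a single call.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


class Reply:
    """Hypothetical base type: each reply knows how to apply itself."""

    def apply(self, results: Dict[str, Any], outbox: List[Any]) -> None:
        raise NotImplementedError


@dataclass
class Complete(Reply):
    conversation_id: str
    result: Any

    def apply(self, results, outbox):
        # Record the final result for the finished conversation.
        results[self.conversation_id] = self.result


@dataclass
class SendMessage(Reply):
    message: Any

    def apply(self, results, outbox):
        # Queue a follow-up message for the writer.
        outbox.append(self.message)


def handle_reply(reply: Reply, results: Dict[str, Any], outbox: List[Any]) -> None:
    # The dispatcher no longer branches on the kind of reply.
    reply.apply(results, outbox)
```

Conversation tests could then assert on the returned Reply value, while the behaviour of each Reply subtype is tested separately.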

Broken with latest `protobuf`?

Looks like the package is broken with newer protobuf packages? I get this error message after a dependency update in a project using this module:

  ...
  File "/baas/app/eventstore/eventstore_db.py", line 4, in <module>
    import photonpump
  File "/usr/local/lib/python3.8/site-packages/photonpump/__init__.py", line 3, in <module>
    from .connection import *  # noqa
  File "/usr/local/lib/python3.8/site-packages/photonpump/connection.py", line 12, in <module>
    from . import conversations as convo
  File "/usr/local/lib/python3.8/site-packages/photonpump/conversations.py", line 16, in <module>
    from photonpump import messages
  File "/usr/local/lib/python3.8/site-packages/photonpump/messages.py", line 10, in <module>
    from . import messages_pb2
  File "/usr/local/lib/python3.8/site-packages/photonpump/messages_pb2.py", line 32, in <module>
    _descriptor.EnumValueDescriptor(
  File "/usr/local/lib/python3.8/site-packages/google/protobuf/descriptor.py", line 755, in __new__
    _message.Message._CheckCalledFromGeneratedFile()
TypeError: Descriptors cannot not be created directly.
If this call came from a _pb2.py file, your generated code is out of date and must be regenerated with protoc >= 3.19.0.
If you cannot immediately regenerate your protos, some other possible workarounds are:
 1. Downgrade the protobuf package to 3.20.x or lower.
 2. Set PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python (but this will use pure-Python parsing and will be much slower).

More information: https://developers.google.com/protocol-buffers/docs/news/2022-05-06#python-updates

Reading "$all" to read all events does not work.

I am trying to read all the events because we store every aggregate as a stream. Therefore we do not have the name of every stream.

The EventStore HTTP API works by reading the stream $all. How do I do this with photon-pump? I see there is a TcpCommand ReadAllEventsBackward = 0xB8.

Fix setup

Make sure the package installs its own dependencies and itself.

10:47 $ python setup.py develop
Traceback (most recent call last):
  File "setup.py", line 8, in <module>
    import photonpump
  File "/Users/csaba/work/made.com/photon-pump/photonpump/__init__.py", line 6, in <module>
    from .messages import *
  File "/Users/csaba/work/made.com/photon-pump/photonpump/messages.py", line 9, in <module>
    from . import messages_pb2
  File "/Users/csaba/work/made.com/photon-pump/photonpump/messages_pb2.py", line 6, in <module>
    from google.protobuf.internal import enum_type_wrapper
ModuleNotFoundError: No module named 'google'
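
The traceback shows setup.py importing photonpump, which in turn imports google.protobuf before it has been installed. Assuming the import exists only to read metadata such as a version string (an assumption, since setup.py itself is not shown here), one common way around this is to parse the value out of the source text instead of importing the package. The read_version helper and the __version__ attribute name below are hypothetical.

```python
import re


def read_version(source: str) -> str:
    """Extract __version__ from module source without importing the module."""
    match = re.search(
        r"""^__version__\s*=\s*['"]([^'"]+)['"]""", source, re.MULTILINE
    )
    if match is None:
        raise RuntimeError("no __version__ assignment found")
    return match.group(1)


# Example with an in-memory module body; setup.py would read the file instead.
example_source = 'from .messages import *\n__version__ = "0.1.0"\n'
version = read_version(example_source)
```

The missing dependency itself would still need to be declared, for example by listing protobuf in install_requires.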

Photonpump should send heartbeat requests to the server

If we close the connection from underneath photonpump, eg. by terminating a VPN tunnel, then the client never notices any difference.

We should periodically (suggest every 60 secs) send a heartbeat request to the server. If we fail some consecutive number of requests, we should issue a reconnection request to the connection handler.
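
A minimal sketch of such a loop, assuming the client can supply two coroutine functions: send_heartbeat, which completes when the server responds, and request_reconnect, which asks the connection handler to reconnect. Both names, and the timeout and max_failures parameters, are hypothetical.

```python
import asyncio


async def heartbeat_loop(
    send_heartbeat,
    request_reconnect,
    interval=60.0,
    timeout=5.0,
    max_failures=3,
):
    """Send a heartbeat every `interval` seconds; reconnect after repeated failures."""
    failures = 0
    while True:
        try:
            await asyncio.wait_for(send_heartbeat(), timeout)
            failures = 0  # any success resets the consecutive-failure count
        except (asyncio.TimeoutError, ConnectionError):
            failures += 1
            if failures >= max_failures:
                await request_reconnect()
                failures = 0
        await asyncio.sleep(interval)
```

The loop runs until its task is cancelled, so it would need to be started alongside the connection and cancelled on shutdown.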

When reconnecting from master to slave, subscriptions fail in the background

If we have a running subscription and we restart the master node then we might connect to a slave. When that happens, we fail to reconnect subscriptions with a NotHandled error. This error occurs in the background and doesn't bubble to client code.

Instead, the NotHandled error should cause us to reconnect, maybe by returning a new ReplyAction.Reconnect to the EventstoreProtocol.

Tidy up test helpers

There are test helper methods scattered around the codebase. It's time to pull them together into a cohesive module and consider creating test data builders for each response type.

Kill all conversations if the dispatcher loop fails

Sometimes bad things happen and the dispatcher loop explodes. When this happens we stop sending and receiving data, but we don't bubble the exception up to the open conversations.

We should call error on every open conversation so that the exception bubbles up to the caller and they can reconnect or exit as appropriate.
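
A rough sketch of the idea, modelling each open conversation as an asyncio Future keyed by id. That representation, the run_dispatcher name and the step callable are assumptions made for illustration; the real dispatcher would call error on its own conversation objects.

```python
import asyncio


async def run_dispatcher(step, conversations):
    """Run `step` forever; if it fails, fail every open conversation too.

    `step` is a coroutine function performing one loop iteration and
    `conversations` maps conversation ids to pending Futures.
    """
    try:
        while True:
            await step()
    except asyncio.CancelledError:
        raise  # normal shutdown, not a failure
    except Exception as exc:
        for future in conversations.values():
            if not future.done():
                future.set_exception(exc)
        conversations.clear()
        raise
```

Anything awaiting one of those futures then sees the original exception and can reconnect or exit.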

Stop a volatile subscription when reaching the end of the stream

Hi,

I am trying to reconstruct the state of an aggregate by creating a volatile subscription to the stream. All is fine except that it keeps waiting forever for new events, whereas I expected it to finish after receiving all of the existing ones.

Am I using the wrong API call?

Improve overall test coverage

There's a bunch of missing tests for the connection module still, and the situation is worse for discovery.

Let's try to get the coverage > 95% on both of those modules.

Timeout and Retry for Conversations

Currently if we don't receive a response to an OutboundMessage, the future for the Conversation will block forever. Instead we would like to have timeouts on our Conversations.

Implement timeout for conversations. If we reach the timeout without receiving a response we should retry the message or return a TimeoutError to the client.

In particular, when we send a ConnectPersistentSubscription message, if we don't receive a response, we should retry. This is necessary for properly handling #32
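
A minimal sketch of timeout-plus-retry using asyncio.wait_for. Here send stands for a hypothetical coroutine function that sends the message and completes when the response arrives; the parameter names and defaults are illustrative, not photon-pump's.

```python
import asyncio


async def send_with_retry(send, timeout=5.0, retries=3):
    """Await `send()` with a timeout, retrying up to `retries` extra times.

    Raises asyncio.TimeoutError if every attempt times out.
    """
    for attempt in range(retries + 1):
        try:
            return await asyncio.wait_for(send(), timeout)
        except asyncio.TimeoutError:
            if attempt == retries:
                raise  # out of attempts: surface the timeout to the caller
```

Whether a given message is safe to resend depends on the conversation, so the retry count would probably need to be configurable per conversation type.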

Python 3.8 support

Here's the first bug I ran into:

    import photonpump
usr/local/lib/python3.8/site-packages/photonpump/__init__.py:3: in <module>
    from .connection import *  # noqa
usr/local/lib/python3.8/site-packages/photonpump/connection.py:10: in <module>
    from . import conversations as convo
usr/local/lib/python3.8/site-packages/photonpump/conversations.py:6: in <module>
    from asyncio.base_futures import InvalidStateError
E   ImportError: cannot import name 'InvalidStateError' from 'asyncio.base_futures' (/usr/local/lib/python3.8/asyncio/base_futures.py)
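
The failing import reaches into the private module asyncio.base_futures. A likely fix, sketched here rather than taken from the actual patch, is to import the exception from the public asyncio namespace, which also works on Python 3.8:

```python
import asyncio
from asyncio import InvalidStateError  # public name, not asyncio.base_futures


async def resolve_twice():
    future = asyncio.get_running_loop().create_future()
    future.set_result(1)
    try:
        future.set_result(2)  # a second result is not allowed
    except InvalidStateError:
        return "already resolved"
    return "no error"


outcome = asyncio.run(resolve_twice())
```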

Add strict type checking to build for messages.py

We want to have strict type checks. We'll do this module by module. The simplest module to start with is messages.py since it has no onward dependencies.

Set up mypy or pyre to do type checking and fail the build if there are missing or inconsistent type signatures anywhere in the module.

Clean shutdown

Currently when we shut down the client, there's a bunch of things that aren't smoothly cancelled/closed.

Running the tests with PYTHONASYNCIODEBUG=1 dumps a frightening number of stacktraces. We would like to cleanly shut down.

Unable to use volatile subscriptions

When trying to use a volatile subscription, I get an error:

In [6]: async with connect() as c:
   ...:     async for event in c.subscribe_to("mystream"):
   ...:         print(event.json())
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
cell_name in async-def-wrapper()

TypeError: 'async for' requires an object with __aiter__ method, got coroutine

I'm able to use .iter("mystream") without a problem. Am I doing something wrong?
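
The error message says that subscribe_to returned a coroutine, which suggests it needs to be awaited before iterating. The sketch below reproduces that shape with hypothetical stand-ins (FakeClient and FakeSubscription); that the awaited result exposes an events iterator is an assumption here, not confirmed photon-pump behaviour.

```python
import asyncio


class FakeSubscription:
    """Hypothetical subscription exposing an async iterator of events."""

    def __init__(self, events):
        self.events = self._iterate(events)

    @staticmethod
    async def _iterate(events):
        for event in events:
            yield event


class FakeClient:
    async def subscribe_to(self, stream):
        # A coroutine method: calling it without `await` yields a coroutine.
        return FakeSubscription([f"{stream}-0", f"{stream}-1"])


async def main():
    client = FakeClient()
    # `async for e in client.subscribe_to("mystream")` would raise the
    # TypeError from the report, because a coroutine has no __aiter__.
    subscription = await client.subscribe_to("mystream")
    return [event async for event in subscription.events]


received = asyncio.run(main())
```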

Fix tests

Make the tests pass

11:35 $ py.test
platform darwin -- Python 3.6.1, pytest-3.0.7, py-1.4.33, pluggy-0.4.0
rootdir: /Users/csaba/work/made.com/photon-pump, inifile:
collected 14 items

test/connection_test.py EEE
test/read_test.py EEEEEE
test/streaming_iterator_test.py EE
test/write_test.py EEE


E       fixture 'event_loop' not found
>       available fixtures: cache, capfd, capsys, doctest_namespace, monkeypatch, pytestconfig, record_xml_property, recwarn, tmpdir, tmpdir_factory
>       use 'pytest --fixtures [testpath]' for help on them.

Connection lost on persistent subscription (HEARTBEAT timeout)

Hello, I'm using a persistent subscription to receive events from Event Store, and each event then goes through heavy processing that takes some time. When the processing takes too long (a few seconds), ES cuts the connection and considers that the events weren't handled. Here is a short version of the code:

    async for event in subscription.events:
        await subscription.ack(event)
        time.sleep(21)

The result is the following error (it doesn't crash, but it restarts from the previous event or list of events):
image

here are the logs of Event Store:
image

I think that the generator waiting for events is also what services the connection, and because the processing takes a long time to return to it, I receive a heartbeat timeout.

I tried to ping the connection from a background thread, but I didn't succeed (and I don't think that's the right way to do it).

Is there anything I can do to avoid these timeouts?
Thanks
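
A note on the snippet above: time.sleep blocks the whole asyncio event loop, so nothing else on that loop (including any heartbeat handling) can run until it returns. Assuming that is what triggers the timeout here, one option is to push the blocking work onto a worker thread with run_in_executor. The heavy_processing and handle names are hypothetical, and the 0.2 second sleep stands in for the long job.

```python
import asyncio
import time


def heavy_processing(event):
    """Blocking work; stands in for the long-running job."""
    time.sleep(0.2)
    return event


async def handle(event):
    loop = asyncio.get_running_loop()
    # Runs in the default thread pool, so the event loop stays free
    # to service other tasks while the work is in progress.
    return await loop.run_in_executor(None, heavy_processing, event)
```

Inside the subscription loop this would be used as `await handle(event)` in place of the blocking call.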

Review Logs

We need to make sure that the logs are sensible at info level, and that we're not completely overwhelming at DEBUG level. When there's a problem, it's incredibly useful to have the binary dumps, but not all the time. Maybe we need a special verbose flag? I introduced a custom loglevel TRACE for recording message traces but I'm not sure that's appropriate.
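
For reference, a custom level below DEBUG can be registered with the standard logging module as sketched here. The numeric value 5, the trace helper and the logger name are assumptions for illustration, not necessarily what photon-pump does.

```python
import logging

TRACE = 5  # assumed value; anything below logging.DEBUG (10) works
logging.addLevelName(TRACE, "TRACE")


def trace(logger, message, *args):
    """Log at TRACE level, skipping the call when TRACE is disabled."""
    if logger.isEnabledFor(TRACE):
        logger.log(TRACE, message, *args)


log = logging.getLogger("example.trace_demo")
log.setLevel(logging.DEBUG)
trace(log, "raw bytes: %r", b"\x00\x01")  # suppressed at DEBUG
log.setLevel(TRACE)
trace(log, "raw bytes: %r", b"\x00\x01")  # passed on to handlers at TRACE
```

With this layout, binary dumps stay out of DEBUG output unless TRACE is enabled explicitly.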

[feature] Afford write/read and read-only modes

Example use case: a data analyst who wants to do some local analysis of real data. It would be nice to be able to instantiate a client/connection/conversation in read-only mode for safe exploration of prod data locally.

Add a "name" to Clients, Connectors, Dispatchers etc.

Occasionally we need to run multiple connections in a single service. The connection_number introduced in 0.5 is useful in diagnostics, but doesn't disambiguate multiple connectors.

Add an optional "name" to connections, and use it in the logger name for Client, Connector, Dispatcher, Reader, Writer, Subscription etc.

I should be able to run two distinct clients in my application, and understand which of them wrote which log lines.

Add subscribe_to for all events

Now that we are able to read all events (#78), we also need to be able to subscribe to all events in order to receive new ones.

@bobthemighty Any insight you have on this would be appreciated.
I will be tackling this soon because we will be needing this feature soon.

explicitly handle expected version failures (and tidy up WriteEvents.reply())

eventstore supports setting expected_version on an event, to allow for optimistic concurrency

https://eventstore.com/docs/http-api/creating-writing-a-stream/index.html?tabs=tabid-1%2Ctabid-3%2Ctabid-5%2Ctabid-7%2Ctabid-17%2Ctabid-11%2Ctabid-13%2Ctabid-15#expected-version-header

we support setting that flag, but we don't currently (i think) handle the errors that would come back.

implementation would be in the async def reply() of class WriteEvents. currently the only special case is for AccessDenied. we should add a check for whatever the expected-version-error is.

at that point the reply() method will be quite long. consider switching to the same pattern we use for Read conversations, which is delegating to a wrapper called eg WriteEventCompleted which defines a .dispatch() method. See for example ReadEvent.reply() and class ReadEventCompleted

TLS

Unless I am much mistaken, our connections to the server are currently unencrypted by default, and there is no option to use TLS.

EventStoreDB's hosted offering will be TLS-only

MADE.com might also want to switch to using TLS, for security n that.

Implementation-wise:

Recreate MessageReader, MessageWriter, EventstoreProtocol per-transport

Currently, the client has a single instance of MessageReader and MessageWriter for its entire lifetime. The Connector has an instance of transport which it receives from Asyncio's connection_made callback.

I suspect that the heartbeat issue on Cancellation Service is happening under the following circumstances:

  • Photonpump doesn't respond to a server heartbeat in a timely manner because [CLOUD].
  • We receive EOF on the transport
  • We schedule cancellation of our reader, writer, dispatcher loops.
  • We respond to the connection closed by opening a new connection
  • We start sending and receiving data with our new connection
  • Asyncio cancels the reader, writer, dispatcher loops as scheduled earlier
  • Photonpump stops processing data because its loops have stopped
  • Go to step 1.

To fix this, we want to split the Protocol implementation out of Connector. Connector should call create_connection, passing a ProtocolFactory that builds a new protocol per transport.

When we receive a new transport, the Connector should create a new MessageReader and MessageWriter with the StreamReader/StreamWriter pair.

When we terminate a connection, we can safely cancel the reader/writer/protocol since there's no shared state.

Because we always restart conversations on reconnection, it is safe to throw away the state in MessageReader/MessageWriter.

The Protocol type can be constructed with a unique id, plus the information about the target_host. This ought to mean that all mutable state can move away from the Connector. To effect that, we will need to separate the Discovery and Connect commands.
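
A rough sketch of the per-transport idea using asyncio's create_connection, which calls the given factory once for every new connection. PerTransportProtocol, make_protocol_factory and open_transport are hypothetical names, and the bytearray buffer merely stands in for the MessageReader/MessageWriter state.

```python
import asyncio
import itertools


class PerTransportProtocol(asyncio.Protocol):
    """Hypothetical protocol that owns all state for one transport."""

    def __init__(self, conn_id, target_host):
        self.conn_id = conn_id
        self.target_host = target_host
        self.transport = None
        self.buffer = bytearray()  # stand-in for reader/writer state

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        self.buffer.extend(data)

    def connection_lost(self, exc):
        # Nothing is shared, so this instance can simply be discarded.
        self.transport = None


def make_protocol_factory(host, port):
    ids = itertools.count(1)

    def factory():
        # A fresh protocol, with a unique id, for every new transport.
        return PerTransportProtocol(next(ids), (host, port))

    return factory


async def open_transport(host, port):
    loop = asyncio.get_running_loop()
    factory = make_protocol_factory(host, port)
    return await loop.create_connection(factory, host, port)
```

Because each reconnect gets a new protocol instance, cancelling work tied to the old one cannot affect the new connection.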

Rewrite the connection module

Now that Conversations are all neatly tested, the worst bit of the code is the connection module.

We need to decide how best to break up the required functionality into coroutines, and get each of them under test.
