Firstly, I have to say that this is my first issue on GitHub that describes an error. Please don't be offended :). I wanted to use this library in my sample project, but after digging into the code I decided against it. I really like its API, so I can wait until this issue is closed.
This library only wraps the persisting of messages in a transaction. That is fine if you don't need immediately consistent read models, but it is important for understanding the real issue.
// from DoctrineMessageRepository.php
$this->connection->beginTransaction();
$this->connection->prepare($sql)->execute($params);
$this->connection->commit();
The consistency issues start in the next block of code.
// from ConstructingAggregateRootRepository.php
$this->repository->persist(...$messages);
$this->dispatcher->dispatch(...$messages);
After the messages have been persisted successfully, any of the following errors may occur. I take RabbitMQ as an example because you list it in your documentation, but it could be replaced with any message broker, even an SQL-based implementation.
- A general purpose network fault occurs.
- RabbitMQ has shut down because it is being updated.
- RabbitMQ can't handle new messages.
- The connection to RabbitMQ times out.
- ...
If one of the errors above occurs, the application is left in an inconsistent state because the handlers are never informed that something happened. This isn't even eventual consistency, because from the handlers' point of view the event never happened.
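To make the failure concrete, here is a minimal sketch with in-memory stand-ins. The classes `InMemoryMessageRepository` and `FailingDispatcher` are hypothetical and not part of this library; they only mimic the persist-then-dispatch order shown above.

```php
<?php
// Hypothetical stand-ins, NOT the library's classes.
final class InMemoryMessageRepository
{
    /** @var string[] */
    public array $persisted = [];

    public function persist(string ...$messages): void
    {
        // Pretend this is the committed transaction.
        array_push($this->persisted, ...$messages);
    }
}

final class FailingDispatcher
{
    /** @var string[] */
    public array $dispatched = [];

    public function dispatch(string ...$messages): void
    {
        // Simulates a broker outage, e.g. a connection timeout.
        throw new RuntimeException('Broker unavailable');
    }
}

$repository = new InMemoryMessageRepository();
$dispatcher = new FailingDispatcher();

try {
    $repository->persist('UserRegistered');
    $dispatcher->dispatch('UserRegistered');
} catch (RuntimeException $e) {
    // The write is already committed; nothing will retry the dispatch.
}

// One message is stored, none reached a handler.
printf("persisted=%d dispatched=%d\n", count($repository->persisted), count($dispatcher->dispatched));
```

Catching the exception does not help here: the message stays in the store, but no handler will ever see it unless something re-reads the store later.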
There are several solutions to this problem.
Firstly, if the library wants to provide immediately consistent read models, as stated in the documentation, it should call the handlers within the same transaction in which the messages are persisted. Of course, this only works if the handlers use the same database and the same connection.
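A rough sketch of this first option, assuming the read model lives in the same database as the events. It uses plain PDO with an in-memory SQLite database purely for illustration (this needs the `pdo_sqlite` extension). The table names, `persistAndProject()` and `$countingProjector` are made up for this example and are not part of the library.

```php
<?php
$pdo = new PDO('sqlite::memory:');
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$pdo->exec('CREATE TABLE events (id INTEGER PRIMARY KEY AUTOINCREMENT, payload TEXT NOT NULL)');
$pdo->exec('CREATE TABLE read_model (name TEXT PRIMARY KEY, event_count INTEGER NOT NULL)');
$pdo->exec("INSERT INTO read_model (name, event_count) VALUES ('total', 0)");

/**
 * Hypothetical helper: stores the messages and runs the projector
 * on the SAME connection inside ONE transaction.
 */
function persistAndProject(PDO $pdo, array $payloads, callable $projector): void
{
    $pdo->beginTransaction();
    try {
        $insert = $pdo->prepare('INSERT INTO events (payload) VALUES (:payload)');
        foreach ($payloads as $payload) {
            $insert->execute(['payload' => $payload]);
            $projector($pdo, $payload);
        }
        $pdo->commit();
    } catch (Throwable $e) {
        // Either everything is stored and projected, or nothing is.
        $pdo->rollBack();
        throw $e;
    }
}

// Hypothetical projector: counts events in a read model table.
$countingProjector = function (PDO $pdo, string $payload): void {
    $pdo->exec("UPDATE read_model SET event_count = event_count + 1 WHERE name = 'total'");
};

persistAndProject($pdo, ['UserRegistered', 'UserRenamed'], $countingProjector);
```

If the projector throws, the inserted events are rolled back as well, so the event table and the read model cannot drift apart. The price is that a broken handler now blocks writes.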
Secondly, all successfully persisted messages could be published to the interested handlers by a separate process. It is important that this process only publishes messages that have actually been committed. This pattern is also called "store and forward". I've seen recommendations to read uncommitted rows so that events are not read out of order, but that is also faulty: the final SQL statement, "COMMIT;", may never reach the server, and by then the handlers have already processed the uncommitted event. The best approach I've seen is the following one, which is implemented in my sample project's event store. There, the logic is spread over several classes; for the sake of simplicity, I've copied it here in procedural form. The code relies on auto-increment event ids. You can of course also give each event a global id, such as a UUIDv4, but the auto-increment id is what the following code needs.
// This script was written down quickly. It's not tested.
// The client should specify the batch size.
$batchSize = 1000;
// The client should specify the throttle time.
$throttleTimeInMicroseconds = 100000;
$lastStoredEventId = 0; // Retrieve the pointer somewhere.
// Build the handlers somewhere.
// This can be read model builders,
// or something that forwards the events to another bounded context / application.
$storedEventSubscribers = [];
while (true) {
    $events = $eventStore->storedEventsSince($lastStoredEventId, $batchSize);
    if (count($events) !== 0) {
        $lastProcessedEventId = $lastStoredEventId;
        foreach ($events as $event) {
            // The following condition is true when two transactions are open
            // and the second one commits before the first.
            if ($event->id() !== $lastProcessedEventId + 1) {
                // Throw an exception to the client of this library.
                // The client knows best how to handle it.
                // Maybe it's not relevant in the client's particular situation.
                // Maybe the client needs to wait a short amount of
                // time until the previous message arrives.
                // Maybe it never arrives because the first transaction failed.
                // Either way, the client knows best how to handle it.
                throw new OutOfOrderException();
            }
            $storedEventPublisher->publish($event);
            $lastProcessedEventId = $event->id();
        }
        $lastStoredEventId = $lastProcessedEventId;
        // The $lastStoredEventId should be persisted somewhere (not only in memory).
        // If the process crashes, it can pick up where it left off.
    } else {
        // Don't stress the SQL server if no messages are available.
        usleep($throttleTimeInMicroseconds);
    }
}
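One way a client might react to the `OutOfOrderException` is to wait for the missing id for a bounded time and then give up. The following is only a sketch of that idea: `waitForMissingEvent()` and the `$eventExists` callback are hypothetical names, and the retry budget is an arbitrary assumption.

```php
<?php
/**
 * Hypothetical helper: decide what to do when event $expectedId is missing.
 * Returns true if the event showed up within the retry budget, false if the
 * caller should treat the gap as permanent (e.g. a rolled-back transaction).
 */
function waitForMissingEvent(
    callable $eventExists,
    int $expectedId,
    int $maxAttempts,
    int $sleepMicroseconds
): bool {
    for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
        if ($eventExists($expectedId)) {
            return true;
        }
        // Give the slower transaction some time to commit.
        usleep($sleepMicroseconds);
    }

    return false;
}

// In-memory stand-in for the event table: id 3 is missing.
$storedIds = [1, 2, 4];
$exists = function (int $id) use ($storedIds): bool {
    return in_array($id, $storedIds, true);
};

$arrived = waitForMissingEvent($exists, 3, 3, 1000);
echo $arrived ? "event 3 arrived\n" : "gap at event 3 treated as permanent\n";
```

Treating a gap as permanent is a judgment call: an id consumed by a rolled-back transaction typically never appears, but a very slow transaction could still commit after the retry budget is spent.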
There are some caveats with this approach. It gives you at-least-once delivery: every event is definitely published, but possibly more than once. All handlers should therefore be idempotent.
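Because an event may be published more than once, a handler can protect itself by remembering the last event id it applied. A minimal sketch, assuming sequential integer event ids as in the loop above; `IdempotentCounterProjection` is a hypothetical read model builder, not something the library provides.

```php
<?php
// Hypothetical read model builder that ignores events it has already applied.
final class IdempotentCounterProjection
{
    private int $lastAppliedEventId = 0;
    private int $count = 0;

    public function handle(int $eventId): void
    {
        // A redelivered (or older) event is skipped, so replaying is harmless.
        if ($eventId <= $this->lastAppliedEventId) {
            return;
        }

        $this->count++;
        $this->lastAppliedEventId = $eventId;
    }

    public function count(): int
    {
        return $this->count;
    }
}

$projection = new IdempotentCounterProjection();

// Events 2 and 3 are delivered twice, e.g. after a crash and restart.
foreach ([1, 2, 2, 3, 3] as $eventId) {
    $projection->handle($eventId);
}

echo $projection->count(), "\n"; // prints 3
```

In a real handler the last applied id would have to be stored together with the read model, ideally in the same transaction, otherwise a crash between the two writes brings the duplicate problem back.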
Messaging is hard. You could provide a link to a good book so that users don't blindly assume it works just because it works on the happy path. Maybe I've overlooked the part of your documentation where you discuss the immediate and eventual consistency approaches. If it isn't there, I think you should mention them and provide links for further reading.
I can imagine that the library is already used in production. If you agree that this is an issue, please let your users know about it.
Some book recommendations