sroze / messenger-enqueue-transport Goto Github PK
View Code? Open in Web Editor NEWUses Enqueue with Symfony's Messenger component.
License: MIT License
Uses Enqueue with Symfony's Messenger component.
License: MIT License
Hi! I use RabbitMQ like amqp transport, my configuration:
enqueue:
transport:
default: "amqp://guest:guest@rabbitmq:5672/%2f"
framework:
messenger:
transports:
amqp: enqueue://default
I don't understand, how I can send different messages to different RabbitMQ queues through single bus? Could I use TransportConfiguration
or it is only for topics? It is not clear in the documentation.
Thanks!
@makasim can you create the enqueue/messenger-adapter
package on Packagist? And give the according permissions to whoever needs them 😉
When adding transport-specific metadata through a TransportConfiguration it's not possible to use another transport type in the same bus, as it would have a different Message without the required setter.
I guess it would be ok to skip silently when the setter does not exist
Packagist is still on 0.2.2 as latest stable version.
We should serialize/deserialize the message envelope items via the headers (Symfony's default message serializer does this already). All we should have to do is to set the headers properly from the encoded message :)
Hi,
For several days i've been trying to figure how to configure enqueue to work with the an AMQP Topic exchange.
According to the given documentation, here is the messenger.yml config file:
framework:
messenger:
transports:
amqp_book_search: enqueue://default
?queue[name]=foo
&topic[name]=bar
routing:
# Route your messages to the transports
'App\Message\SearchForBookInformationMessage': baz
Quick look inside the controller file :
public function search($isbn) {
// Wrap the query inside a Message
$message = new SearchForBookInformationMessage(
array(
'isbn' => $isbn
));
$query = new Envelope($message);
$query->with(new TransportConfiguration(
[
'topic' => 'bar',
'metadata' => [
'routingKey' => 'foo.#'
]
]
));
$this->messageBus->dispatch($message);
// ....
}
But when I try to do so, I get the following error :
In QueueInteropTransportFactory.php line 92:
[RuntimeException]
Can't find Enqueue's transport named "default ": Service "enqueue.transport.default .context" is not found.
What am I doing wrong here ?
Does anyone have a working use case for this feature ?
Cheers,
Pxl.
Initially thought it's an issue with Messenger itself.
I'm using enqueue with Kafka and messenger adapter to send messages to external services. Here's my messenger.yaml:
framework:
messenger:
transports:
# Uncomment the following line to enable a transport named "amqp"
amqp: '%env(MESSENGER_TRANSPORT_DSN)%'
view_events: '%env(MESSENGER_TRANSPORT_DSN)%?topic[name]=view_events&queue[name]=view_events'
routing:
# Route your messages to the transports
'App\Message\DisplayNotification':
senders: [ view_events ]
send_and_handle: true
'*':
senders: [ amqp ]
send_and_handle: true
Notice: send_and_handle
are only for debugging atm.
Related packages (and Symfony 4.2):
enqueue/amqp-tools 0.9.4 Message Queue Amqp Tools
enqueue/dsn 0.9.2 Parse DSN
enqueue/enqueue 0.9.6 Message Queue Library
enqueue/enqueue-bundle 0.9.3 Message Queue Bundle
enqueue/messenger-adapter 0.2.0 Enqueue adapter for Symfony Messenger component
enqueue/null 0.9.2 Enqueue Null transport
enqueue/rdkafka 0.9.2 Message Queue Kafka Transport
And enqueue.yaml:
enqueue:
default:
transport:
dsn: "kafka:"
global:
metadata.broker.list: '%env(KAFKA_SERVERS)%'
topic:
auto.offset.reset: beginning
commit_async: true
client: ~
I'm emitting messages using:
public function addEvent(Request $request, string $hash, $selectedEntry): void
{
$event = DisplayNotification::createFromRequest($request, $hash, $selectedEntry);
$this->bus->dispatch($event);
}
This causes messages to be received twice. Once for amqp and once for view_events transports. When looking for the reason why this is happening I added send_and_handle
settings and realized:
SendMessageMiddleware
contains RdKafkaMessage
instead of expected DisplayNotification
.Looking through SenderInterface
I noticed that it's implementation QueueInteropTransport
expects to receive and return an Envelope
. However, envelope returned is a new instance of Envelope wrapping previous Envelope as message. I believe it should be the previous instance instead?
send
method exerpt below:
public function send(Envelope $message): Envelope
{
...
$encodedMessage = $this->serializer->encode($message);
$originalMessage = $message;
$message = $context->createMessage(
...
return $message; // Should be $originalMessage?
When $originalMessage
is returned, Messenger component behaves as expected: send message only once and handlers receive expected objects.
I'll create pull request in a moment, but do you see any issues with this change? Will it break anything in enqueue itself?
Hello guys !
Since the Symfony 4.2 as entered RC phase and the current stable version of this package is not compatible with it, do you plan to release a new version before the official release of Symfon 4.2 ?
Thanks
PS: and if you need some hands to help here, I'll be glad to give some time 😄.
Hi,
When i use enqueue sqs the --time-limit doesn't work but it work when i use amqp.
php bin/console messenger:consume-messages feed --time-limit=10
Here my configuration
"enqueue/messenger-adapter": "^0.1.2",
"enqueue/sqs": "^0.8.41",
framework:
messenger:
default_bus: app.messenger.command_bus
transports:
feed: "%env(ENQUEUE_DSN)%?topic[name]=feed&queue[name]=feed"
routing:
App\Message\FeedMessage: 'feed'
buses:
app.messenger.command_bus:
middleware:
- messenger.middleware.validation
- App\Middleware\RetryQueueMiddleware
app.messenger.retry_bus: ~
enqueue:
client: ~
transport:
default: 'sqs'
sqs:
key: "%env(AWS_KEY)%"
secret: "%env(AWS_SECRET)%"
region: "%env(AWS_REGION)%"
###> enqueue/sqs ###
ENQUEUE_DSN=enqueue://default
###< enqueue/sqs ###
Thanks for your help.
I do not see a way to have persistent messages with RabbitMQ. This requires instantiating the following AMQPMessage:
new \PhpAmqpLib\Message\AMQPMessage($body, ['delivery_mode' => 2]);
I cannot see a way to make that happen with existing code.
If I were to implement this, it seems like adding another option to QueueInteropTransport would be the way to do this. Usually, if you are going to persist messages you are going to persist all the messages in a queue and not just selective ones.
Took a while to figure out what was going on as we're trying to use Symfony Messenger with SQS.
SQS doesn't really have the need for topics as we're putting things into a specific queue and reading them back out of it.
We had setup our config like so:
enqueue:
transport:
default: 'sqs'
sqs:
key: "%env(SQS_KEY)%"
secret: "%env(SQS_SECRET)%"
token: null
region: 'ap-southeast-1'
version: '2012-11-05'
framework:
messenger:
transports:
sqs: "enqueue://default?queue[name]=email_queue"
routing:
'App\Component\Mail\Message': sqs
But when trying to send to the queue we were getting messages from SQS that the queue didn't exist.
Some debugging later we found the queue was being set to "messages". Looking into the code we found this https://github.com/php-enqueue/messenger-adapter/blob/master/QueueInteropTransport.php#L109
In the SQS library both createTopic
and createQueue
do the same thing, create an SqsDestination with the given topic/queue name as the queue to use.
In order to make our code work we had to update our config to this:
sqs: "enqueue://default?topic[name]=email_queue&queue[name]=email_queue"
Both are needed as while the send
method uses the topic, the receive
method uses the queue name instead.
This kind of setup makes sense from a RabbitMQ perspective, but seems a bit odd for something like SQS.
I was wondering if it would make more sense that if the topic name is not set then it uses the queue name instead (currently they both default to messages
)? If not, could it be documented that this might be required as we couldn't find any reference to needing to do this.
Hi,
when sending messages to FIFO queues parameter MessageGroupId is required.
How to configure transport for .fifo queue?
enqueue.yaml
enqueue:
default:
transport: "%env(ENQUEUE_DSN)%"
client: ~
messenger.yaml
framework:
messenger:
transports:
sqs: enqueue://default?&topic[name]=test.fifo&queue[name]=test.fifo&receiveTimeout=3
routing:
'App\Message\Message': sqs
Tnx.
Hi, I tried to integrate messenger-adapter to work with my symfony/messenger component. I set up everything according to documentation. I have rabbitmq running on DSN : amqp://rabbit:rabbit@devel_rabbit:5672/%2f so I defined it as ENQUEUE_DSN
in .env
file.
The problem is, whenever I run bin/console messenger:consume-messages amqp
following exception appears :
[LogicException]
The scheme "amqp" is not supported. Supported "null"
Do you know what could be the cause ?
Thanks
My message/command CreateStore
is properly send to Cloud Pub Sub like this :
use App\Message\Command\Store\CreateStore;
use Enqueue\MessengerAdapter\EnvelopeItem\TransportConfiguration;
$command = new CreateStore();
$this->commandBus->dispatch((new Envelope($command))->with(new TransportConfiguration(
['topic' => 'enqueue.commands']
)));
All message/command are in my Cloud Pub Sub Queue, I can see them (gcloud pubsub subscriptions pull enqueue.commands
) and acknowledge them manual via the gcloud Command-Line Tool.
Now I trying to consume my message by running bin/console messenger:consume-messages enqueue
. The consumer run but nothing happening.
What I'm missing to consume my message?
Here are my config files :
service.yaml
framework:
messenger:
transports:
default: 'amqp://guest:guest@localhost:5672/%2f/messages'
enqueue: 'enqueue://gps'
default_bus: messenger.bus.commands
buses:
messenger.bus.commands: ~
messenger.bus.events: ~
routing:
# Route your messages to the transports
'App\Message\Command\Store\CreateStore': enqueue
enqueue.yaml
enqueue:
transport:
default: 'gps'
gps:
projectId: '%env(GOOGLE_PROJECT_ID)%'
keyFilePath: '%env(GOOGLE_APPLICATION_CREDENTIALS)%'
Hi,
The deletion of Amazon SQS messages is not working anymore with the new QueueInteropTransport
inherited from Symfony 4.3.
Interop\Queue\Message\SqsMessage
need to have the attribute receiptHandle
defined in order to be deleted.
This worked before 4.3 because the same object was received and sent by the transport but now, as the messages are reconstructed with QueueInteropTransport::encodeMessage
they loose this attribute.
Do you have any idea how this could be fixed ?
Thanks.
i saw that deliveryDelay
can be set at the DSN https://github.com/php-enqueue/messenger-adapter#configure-the-queues-and-exchanges.
is theres a way to set the deliveryDelay
options on the fly when i send a message because itu deliveryDelay
value will be different between messages?
in documentation is stated that to consume you need to run:
bin/console messenger:consume-messages amqp
my question is how to run this in PROD environment because in doc there is not info how to create queues in rabbitMQ
running in prod like:
messenger:consume-messages amqp --env=prod
results in error: NOT_FOUND - no queue 'messages' in vhost '/'
in dev environment, above command runs as expected
in dev env they are created because of debug flag:
if ($this->debug) {
$this->contextManager->ensureExists($destination);
}
in prod, presumably they should be crated manually
but command: enqueue:setup-broker
does not create queues
I found also queue is created here:
public function recoverException(\Exception $exception, array $destination): bool
{
if ($exception instanceof \AMQPQueueException) {
if (404 === $exception->getCode()) {
return $this->ensureExists($destination);
}
}
return false;
}
on exception but, my exception here is different from one that you are catching mine is:
\PhpAmqpLib\Exception\AMQPProtocolChannelException
from PhpAmqpLib, one of supported enqueue transports:
https://github.com/php-enqueue/enqueue-dev/tree/master/docs/transport
to sum it up either currently there is bug that messenger adapter is hardcoded to use only one of supported transports
or there is no info id doc how to create queues for prod environemnt
please, how to use the MessageBusProcessor
with symfony/messenger 4.3?
Hi,
I tryed to use your adapter with redis transporter.
I'm using symfony 4.1 with messenger, i followed steps, when at step 5 i got :
In TransportFactory.php line 37:
No transport supports the given DSN "enqueue://default".
I think messenger component isn't connected to enqueue, i checked and enqueue bundle is loaded bundles.php
It's maybe an messenger change before the release this week ?
I publish a Kafka message like this one:
$this->bus->dispatch(new \Enqueue\RdKafka\RdKafkaMessage('foo', ['topic' => 'test']));
I expect this to publish my message foo
in topic test
.
In fact, in send
method of Enqueue\MessengerAdapter\QueueInteropTransport
my message (wrapped in an Symfony\Component\Messenger\Envelope
) is encoded by an instance of Symfony\Component\Messenger\Transport\Serialization\Serializer
which gives:
array(2) {
["body"]=>
string(72) ""{\"body\":\"plop\",\"properties\":{\"topic\":\"test\"},\"headers\":[]}""
["headers"]=>
array(1) {
["type"]=>
string(30) "Enqueue\RdKafka\RdKafkaMessage"
}
}
After creation of a new \Enqueue\RdKafka\RdKafkaMessage
(in the same function), I finally have the following sent message:
object(Enqueue\RdKafka\RdKafkaMessage)#402 (7) {
["body":"Enqueue\RdKafka\RdKafkaMessage":private]=>
string(72) ""{\"body\":\"plop\",\"properties\":{\"topic\":\"test\"},\"headers\":[]}""
["properties":"Enqueue\RdKafka\RdKafkaMessage":private]=>
array(0) {
}
["headers":"Enqueue\RdKafka\RdKafkaMessage":private]=>
array(1) {
["type"]=>
string(30) "Enqueue\RdKafka\RdKafkaMessage"
}
["redelivered":"Enqueue\RdKafka\RdKafkaMessage":private]=>
bool(false)
["partition":"Enqueue\RdKafka\RdKafkaMessage":private]=>
NULL
["key":"Enqueue\RdKafka\RdKafkaMessage":private]=>
NULL
["kafkaMessage":"Enqueue\RdKafka\RdKafkaMessage":private]=>
NULL
}
My original message has been serialized in the body of the sent message and the target topic is the default one (i.e. messages
) instead of test
.
I see that the destination topic comes from $this->options
but I do not know how to set that options.
It is difficult for me to know the best way (implement a new encoder, define some options, upgrade code, ...) to achieve what I want to do. That's why I'm asking you for help.
Hi there.
I followed this code example.
https://stefanoalletti.wordpress.com/2018/11/05/from-rabbitmq-to-phpenqueue-via-symfony-messenger/
When I try to run "php bin/console messenger:consume mailer" command I got following error.
In CheckExceptionOnInvalidReferenceBehaviorPass.php line 86:
The service "enqueue.messenger_transport.factory" has a dependency on a non-existent service "enqueue.locator".
I don't know why enqueue locator is missing. Can anyone help me with this issue?
Thanks.
Hi everyone,
Small question about the adapter, is there any way to use Redis in a similar way as the amqp
transport?
I need to do something quite similar to this: https://medium.com/@dimitri.gritsajuk/symfony-messenger-is-here-but-your-project-is-on-symfony-3-4-no-problem-part-2-204fb18520b6 and I don't find any RedisTransportFactory
(I've found a RedisConnectionFactory
but Symfony wait for a TransportFactory instance).
Thanks again for the support.
I followed this documentation because i wanted to test my producer on an API-platform route.
In my case, i want to run my tests in a Gitlab CI environement, which mean there is no real need in testing the connexion to my broker (RabbitMQ here). Therefore, i've followed this configuration :
# app/config/config_test.yml
enqueue:
default:
transport: 'null:'
client: ~
I get an error because the enveloppe is not a supported class of the NullTransport. I need the enveloppe to setup a routing key for rabbitmq.
Uncaught PHP Exception Enqueue\MessengerAdapter\Exception\MissingMessageMetadataSetterException: "Missing "setRoutingKey" setter for "routingKey" metadata key in "Enqueue\Null\NullMessage" class" at /srv/api/vendor/enqueue/messenger-adapter/QueueInteropTransport.php line 220
In doc there is info that you could set some properites of queue:
https://github.com/php-enqueue/messenger-adapter#configure-the-queues-and-exchanges
but when you look at code where queue is created
/**
* {@inheritdoc}
*/
public function ensureExists(array $destination): bool
{
if (!$this->psrContext instanceof AmqpContext) {
return false;
}
$topic = $this->psrContext->createTopic($destination['topic']);
$topic->setType(AmqpTopic::TYPE_FANOUT);
$topic->addFlag(AmqpTopic::FLAG_DURABLE);
$this->psrContext->declareTopic($topic);
$queue = $this->psrContext->createQueue($destination['queue']);
$queue->addFlag(AmqpQueue::FLAG_DURABLE);
$this->psrContext->declareQueue($queue);
$this->psrContext->bind(new AmqpBind($queue, $topic));
return true;
}
you can see that not all attributes of queue are set
in particular I have problem with setting priority
in above code $destination
has only values:
Array
(
[topic] => command_bus
[queue] => command_bus
)
no arguments regarding priority are passed,
so when you set in symfony arguments:
command_bus: enqueue://default?queue[name]=command_bus&topic[name]=command_bus&priority=2
&priority=2
would be ignored, and queue will not have priorities
Hello !
I've just discovered the new symfony/messenger
ecosystem which seems pretty exciting. I've an issue (more a question than a problem) about the QueueInteropTransport
implementation.
I'm trying to log message decoding error during the development phase but I can't find a clean way to do this. It seems related to the try {} catch {}
used inside the QueueInteropTransport
object.
try {
$handler($this->decoder->decode(array(
'body' => $message->getBody(),
'headers' => $message->getHeaders(),
'properties' => $message->getProperties(),
)));
$consumer->acknowledge($message);
} catch (RejectMessageException $e) {
$consumer->reject($message);
} catch (RequeueMessageException $e) {
$consumer->reject($message, true);
} catch (\Throwable $e) {
$consumer->reject($message);
}
Is there a way to log exception messages somewhere using configuration or by extending / decorating an object ? It seems that Exception
messages are never sent outside of that bloc of code. I've tried most of the solution that I know today.
Actually, I'm using Symfony 4.1-BETA3 with the messenger-adapter, and the enqueue/redis connector.
It would be great to be able to use the main functionnalities of this adapter in a non Symfony framework project, without requiring tons of useless Symfony bundles dependencies.
Hi, i tried to use messenger-adapter and found few bugs:
QueueInteropTransport::send
dont implement TransportInterface
:Compile Error: Declaration of Enqueue\\MessengerAdapter\\QueueInteropTransport::send($message): void must be compatible with Symfony\\Component\\Messenger\\Transport\\SenderInterface::send(Symfony\\Component\\Messenger\\Envelope $envelope)
deliveryDelay
is true, must be call producer::setDelayStrategy
methodI'm using Symfony's Messenger component, going through this interop layer and finally going through Enqueue.
It seems like envelopes are serialized as-is, without inspecting their stamps.. The DelayStamp
is not taken into account at all when creating the interop message.
Now, it may probably be easy to loop through the original envelope and try to call $interopMessage->setDelay()
when a DelayStamp
is encountered. However it doesn't appear like this message delay thing is part of the interop Message interface.
So there's a need to either extend the interface or to do some method_exists()
hacks on it.
Not supporting delay stamps is a complete dealbreaker for using this interop layer. If messages cannot be retried in a sensible manner (that is, with a delay), there's usually no point in retrying (10 retries within the same second usually leads to 10 failures).
Expected result: deliveryDelay
from TransportConfiguration
is used over deliveryDelay
specified in transport's options.
Actual result:
Enqueue\MessengerAdapter\Exception\MissingMessageMetadataSetterException: Missing "setDeliveryDelay" setter for "deliveryDelay" metadata key in "Double\Enqueue\MessengerAdapter\Tests\Fixtures\DecoratedPsrMessage\P1"
Sorry for not providing a PR, this test can be pasted directly to QueueInteropTransportTest
public function testDeliveryDetail()
{
$topicName = 'topic';
$queueName = 'queue';
$message = new \stdClass();
$message->foo = 'bar';
$envelope = (new Envelope($message))->with(new TransportConfiguration(array(
'metadata' => array('routingKey' => 'foo.bar', 'deliveryDelay' => 1000),
)));
$psrMessageProphecy = $this->prophesize(DecoratedPsrMessage::class);
$psrMessageProphecy->setRoutingKey('foo.bar')->shouldBeCalled();
$psrMessage = $psrMessageProphecy->reveal();
$topicProphecy = $this->prophesize(Topic::class);
$topic = $topicProphecy->reveal();
$producerProphecy = $this->prophesize(Producer::class);
$producerProphecy->send($topic, $psrMessage)->shouldBeCalled();
$contextProphecy = $this->prophesize(Context::class);
$contextProphecy->createTopic($topicName)->shouldBeCalled()->willReturn($topic);
$contextProphecy->createProducer()->shouldBeCalled()->willReturn($producerProphecy->reveal());
$contextProphecy->createMessage('foo', array(), array())->shouldBeCalled()->willReturn($psrMessage);
$contextManagerProphecy = $this->prophesize(ContextManager::class);
$contextManagerProphecy->context()->shouldBeCalled()->willReturn($contextProphecy->reveal());
$contextManagerProphecy->ensureExists(array(
'topic' => $topicName,
'topicOptions' => array('name' => $topicName, 'foo' => 'bar'),
'queue' => $queueName,
'queueOptions' => array('name' => $queueName, 'bar' => 'foo'),
))->shouldBeCalled();
$encoderProphecy = $this->prophesize(SerializerInterface::class);
$encoderProphecy->encode($envelope)->shouldBeCalled()->willReturn(array('body' => 'foo'));
$transport = $this->getTransport(
$encoderProphecy->reveal(),
$contextManagerProphecy->reveal(),
array(
'topic' => array('name' => $topicName, 'foo' => 'bar'),
'queue' => array('name' => $queueName, 'bar' => 'foo'),
),
true
);
$transport->send($envelope);
}
I see that the commit fbcaa00 aims to add support for Messenger 4.3, but there's no tag for it yet.
Are there any missing tasks for complete 4.3 support?
if we do a $envelope->all(TransportConfiguration::class) and iterate over the results we can gather all of those stamps.
This would make it possible to incrementally add configuration options using multiple Middlewares, instead of only having one.
i have this weird error with symfony/messenger v4.3.1
and sroze/messenger-enqueue-transport v0.3
. it was working with symfony/messenger v4.2
and sroze/messenger-enqueue-transport v0.2
. looks like the problem raises in acking a message.
21:42:57 INFO [messenger] Received message "App\Messenger\Message\BidMessage" ["message" => App\Messenger\Message\BidMessage^ { …},"class" => "App\Messenger\Message\BidMessage"]
21:42:57 INFO [messenger] Sending message "App\Messenger\Message\Notification\ParticipantNotificationMessage" with "Enqueue\MessengerAdapter\QueueInteropTransport" ["message" => App\Messenger\Message\Notification\ParticipantNotificationMessage^ { …},"class" => "App\Messenger\Message\Notification\ParticipantNotificationMessage","sender" => "Enqueue\MessengerAdapter\QueueInteropTransport"]
21:42:57 INFO [messenger] Message "App\Messenger\Message\BidMessage" handled by "App\Messenger\Handler\BidHandler::__invoke" ["message" => App\Messenger\Message\BidMessage^{ …},"class" => "App\Messenger\Message\BidMessage","handler" => "App\Messenger\Handler\BidHandler::__invoke"]
21:42:57 INFO [messenger] App\Messenger\Message\BidMessage was handled successfully (acknowledging to transport). ["message" => App\Messenger\Message\BidMessage^ { …},"class" => "App\Messenger\Message\BidMessage"]
In ClientMethods.php line 998:
[Bunny\Exception\ClientException (406)]
PRECONDITION_FAILED - unknown delivery tag 0
Exception trace:
() at /code/vendor/bunny/bunny/src/Bunny/ClientMethods.php:998
Bunny\AbstractClient->awaitExchangeDeclareOk() at /code/vendor/bunny/bunny/src/Bunny/ClientMethods.php:958
Bunny\AbstractClient->exchangeDeclare() at /code/vendor/bunny/bunny/src/Bunny/ChannelMethods.php:47
Bunny\Channel->exchangeDeclare() at /code/vendor/enqueue/amqp-bunny/AmqpContext.php:161
Enqueue\AmqpBunny\AmqpContext->declareTopic() at /code/vendor/sroze/messenger-enqueue-transport/AmqpContextManager.php:64
Enqueue\MessengerAdapter\AmqpContextManager->ensureExists() at /code/vendor/sroze/messenger-enqueue-transport/QueueInteropTransport.php:65
Enqueue\MessengerAdapter\QueueInteropTransport->get() at /code/vendor/symfony/messenger/Worker.php:90
Symfony\Component\Messenger\Worker->run() at /code/vendor/symfony/messenger/Worker/StopWhenRestartSignalIsReceived.php:54
Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived->run() at /code/vendor/symfony/messenger/Command/ConsumeMessagesCommand.php:228
Symfony\Component\Messenger\Command\ConsumeMessagesCommand->execute() at /code/vendor/symfony/console/Command/Command.php:255
Symfony\Component\Console\Command\Command->run() at /code/vendor/symfony/console/Application.php:939
Symfony\Component\Console\Application->doRunCommand() at /code/vendor/symfony/framework-bundle/Console/Application.php:87
Symfony\Bundle\FrameworkBundle\Console\Application->doRunCommand() at /code/vendor/symfony/console/Application.php:273
Symfony\Component\Console\Application->doRun() at /code/vendor/symfony/framework-bundle/Console/Application.php:73
Symfony\Bundle\FrameworkBundle\Console\Application->doRun() at /code/vendor/symfony/console/Application.php:149
Symfony\Component\Console\Application->run() at /code/bin/console:38
composer info:
enqueue/amqp-bunny 0.9.11 Message Queue Amqp Transport
enqueue/amqp-tools 0.9.8 Message Queue Amqp Tools
enqueue/dsn 0.9.2 Parse DSN
enqueue/enqueue 0.9.11 Message Queue Library
enqueue/enqueue-bundle 0.9.10 Message Queue Bundle
enqueue/null 0.9.2 Enqueue Null transport
sroze/messenger-enqueue-transport 0.3.0 Enqueue adapter for Symfony Messenger component
symfony/framework-bundle v4.3.1 Symfony FrameworkBundle
symfony/messenger v4.3.1 Symfony Messenger Component
kunicmarko/jms-messenger-adapter dev-master 3aa771b Use JMS Serializer with Symfony Messenger.
a symfony/messenger bug, deleted
I have multiple senders configured and everything was working fine until I updated to the latest messenger version.
After some debugging, I realized that the message is being enveloped more than once here.
I also noticed that the $message
variable gets redefined here - it was originally an Envelope
instance containing my original message under the message
parameter and it becomes an instance of \Interop\Amqp\Impl\AmqpMessage
, which gets enveloped and returned in the end of this method.
It seems that the issue also started after this change on the MiddlewareInterface, as the $envelope
variable now gets redefined after each iteration.
AMQP allows different modes of routing.
At this time, it is only possible to have a durable queue with a fanout exchange.
If you currently only install this adapter, but do not install the serializer, you'll get this error at container build time:
The service "enqueue.messenger_transport.factory" has a dependency on a non-existent service
"messenger.transport.serializer
Unless you're doing something really custom, the issue will always be the same: you haven't installed the serializer. I think we could add a compiler pass that checks for the messenger.transport.serializer
service and throws an exception if it's missing (this is similar to what FrameworkExtension does):
The Messenger serializer transport is missing. Try enabling it or running "composer require symfony/serializer-pack".'
Added extension RepeatMessage (new EnvelopeItem RepeatMessage) PR: #15
It would be interesting if we were able to add arbitrary metadata to a transport's messages through the configuration.
Something like:
transports:
my_transport: 'enqueue://sqs?topic[name]=topic&metadata[messageGroupId]=my-group-id'
then that metadata could be merged with the one that comes from TransportConfiguration stamps when sending the message
This way it would be easier to make it work with more transports and different configurations, like with SQS FIFO queues (#40), for instance.
Im running my command
bin/console messenger:consume amqp
But Im getting this error
In TransportFactory.php line 42:
No transport supports the given Messenger DSN "enqueue://default".
Everything worked perfect in Symfony 4.2
# messenger.yaml
framework:
messenger:
transports:
amqp: enqueue://default
routing:
'*': amqp
# enqueue.yaml (with my database connection)
enqueue:
default:
transport: '%env(resolve:DATABASE_URL)%'
client: ~
"enqueue/dbal": "^0.9.8",
"enqueue/enqueue-bundle": "^0.9.8",
"sroze/messenger-enqueue-transport": "^0.3.0",
"symfony/messenger": "4.3.*",
AmqpContextManager::recoverException
checks against php-amqp's \AMQPQueueException
which makes it not compatible with php-amqplib. It'd be great to either have a generic AmqpContextManager
that could deal with both libraries at a time or at least have a sane way to provide our own implementation of ContextManager
as right now it requires overriding entire Enqueue\MessengerAdapter\QueueInteropTransportFactory
.
The reason is:
Enqueue team does not support the adapter project so I'd love to make it clear for everyone that this projected has nothing to do with this org.
On exceptions the QueueInteropTransport should not requeue the message instead of rejected it?
If this is the correct behaviour, how can I make the message at least not be rejected, to let te visibility timeout of AWS SQS set the message visible again?
Just to let you know that the default receive timeout of 30 seconds introduced in version 0.2.0:
Is incompatible with SQS (max 20 seconds):
https://github.com/php-enqueue/sqs/blob/b203de327b4f5045717b9420de071a19c0f78afc/SqsConsumer.php#L88
Not really a big deal but wasn't sure if 30 seconds was picked for a specific or arbitrary reason.
We're fixing our setup by setting the receive timeout, which is probably the best solution anyway, but thought you might consider updating the 30 seconds to 20 just to avoid confusion.
Hi @sroze ,
Do you think it would be possible to have a version with latest commits?
The original Kafka Message gets lost.
Here it's still present: \Enqueue\RdKafka\RdKafkaConsumer::doReceive
It gets lost when returning the date here: \Enqueue\MessengerAdapter\QueueInteropTransport::get
When it comes time for ACKing the message, we run into LogicException:
The message could not be acknowledged because it does not have kafka message set.
here \Enqueue\RdKafka\RdKafkaConsumer::acknowledge
We might need an InteropMessageStamp
, which adds the \Interop\Queue\Message
object, for later use.
Maybe even better to use \Symfony\Component\Messenger\Stamp\TransportMessageIdStamp
and set the offset as the id.
Is there any specific reason why there isn't support for enqueue/enqueue-bundle[0.9.x-dev]
?
Since this Symfony's FrameworkBundle PR was merged (4.3.3):
symfony/framework-bundle@5205108
The internal option resolver of https://github.com/sroze/messenger-enqueue-transport/blob/master/QueueInteropTransport.php makes the constructor fail because the key transport_name
(introduced in commit linked before) is not handled by the option resolver.
Error details:
In OptionsResolver.php line 796:
[Symfony\Component\OptionsResolver\Exception\UndefinedOptionsException]
The option "transport_name" does not exist. Defined options are: "delayStrategy", "deliveryDelay", "priority", "queue", "receiveTimeout", "timeToLive", "topic".
Exception trace:
() at /var/www/html/vendor/symfony/options-resolver/OptionsResolver.php:796
Symfony\Component\OptionsResolver\OptionsResolver->resolve() at /var/www/html/vendor/sroze/messenger-enqueue-transport/QueueInteropTransport.php:60
Enqueue\MessengerAdapter\QueueInteropTransport->__construct() at /var/www/html/vendor/sroze/messenger-enqueue-transport/QueueInteropTransportFactory.php:60
Enqueue\MessengerAdapter\QueueInteropTransportFactory->createTransport() at /var/www/html/vendor/symfony/messenger/Transport/TransportFactory.php:40
Symfony\Component\Messenger\Transport\TransportFactory->createTransport() at /var/www/html/var/cache/dev/ContainerCW1VQ2l/getMessenger_Transport_PublicationsService.php:23
require() at /var/www/html/var/cache/dev/ContainerCW1VQ2l/srcApp_KernelDevDebugContainer.php:441
ContainerCW1VQ2l\srcApp_KernelDevDebugContainer->load() at /var/www/html/vendor/symfony/dependency-injection/Container.php:433
Symfony\Component\DependencyInjection\Container->getService() at /var/www/html/vendor/symfony/dependency-injection/Argument/ServiceLocator.php:40
Symfony\Component\DependencyInjection\Argument\ServiceLocator->get() at /var/www/html/vendor/symfony/messenger/Command/ConsumeMessagesCommand.php:184
Symfony\Component\Messenger\Command\ConsumeMessagesCommand->execute() at /var/www/html/vendor/symfony/console/Command/Command.php:255
Symfony\Component\Console\Command\Command->run() at /var/www/html/vendor/symfony/console/Application.php:939
Symfony\Component\Console\Application->doRunCommand() at /var/www/html/vendor/symfony/framework-bundle/Console/Application.php:87
Symfony\Bundle\FrameworkBundle\Console\Application->doRunCommand() at /var/www/html/vendor/symfony/console/Application.php:273
Symfony\Component\Console\Application->doRun() at /var/www/html/vendor/symfony/framework-bundle/Console/Application.php:73
Symfony\Bundle\FrameworkBundle\Console\Application->doRun() at /var/www/html/vendor/symfony/console/Application.php:149
Symfony\Component\Console\Application->run() at /var/www/html/bin/console:42
messenger:consume [-l|--limit LIMIT] [-m|--memory-limit MEMORY-LIMIT] [-t|--time-limit TIME-LIMIT] [--sleep SLEEP] [-b|--bus BUS] [-h|--help] [-q|--quiet] [-v|vv|vvv|--verbose] [-V|--version] [--ansi] [--no-ansi] [-n|--no-interaction] [-e|--env ENV] [--no-debug] [--] <command> [<receivers>...]
What is the proper approach ?
unset($options['transport_name']);
like done here: symfony/messenger@326990fconfigureOptions()
method to add support of the transport_name
key (null || string) ?According to your preference, I can PR if needed.
Best regards.
When starting a consumer with a time limit, it will only be checked after messages have been received, so the consumer will not stop after the timeout when there are no messages.
I've started the worker like this:
$ bin/console messenger:consume-messages test --time-limit=1
Then there will be an infinite loop in https://github.com/php-enqueue/messenger-adapter/blob/master/QueueInteropTransport.php#L72:
while (!$this->shouldStop) {
try {
if (null === ($message = $consumer->receive($this->options['receiveTimeout'] ?? 0))) {
continue;
}
// ...
}
When taking a look at Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver::receive
, triggering the handler with null
should fix this, like:
while (!$this->shouldStop) {
try {
if (null === ($message = $consumer->receive($this->options['receiveTimeout'] ?? 0))) {
$handler(null);
continue;
}
// ...
}
Reference: https://github.com/symfony/messenger/blob/v4.1.0/Transport/AmqpExt/AmqpReceiver.php#L43
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.