rabbitmqbundle's Introduction

php-amqplib

This library is a pure PHP implementation of the AMQP 0-9-1 protocol. It's been tested against RabbitMQ.

The library was used for the PHP examples of RabbitMQ in Action and the official RabbitMQ tutorials.

Please note that this project is released with a Contributor Code of Conduct. By participating in this project you agree to abide by its terms.

Project Maintainers

Thanks to videlalvaro and postalservice14 for creating php-amqplib.

The package is now maintained by Ramūnas Dronga, Luke Bakken and several VMware engineers working on RabbitMQ.

Supported RabbitMQ Versions

Starting with version 2.0, this library uses AMQP 0.9.1 by default and thus requires RabbitMQ 2.0 or later. Server upgrades usually do not require any application code changes, since the protocol changes very infrequently, but please conduct your own testing before upgrading.

Supported RabbitMQ Extensions

Since the library uses AMQP 0.9.1 we added support for the following RabbitMQ extensions:

  • Exchange to Exchange Bindings
  • Basic Nack
  • Publisher Confirms
  • Consumer Cancel Notify

Extensions that modify existing methods like alternate exchanges are also supported.

Related libraries

  • enqueue/amqp-lib is an amqp interop compatible wrapper.

  • AMQProxy is a proxy library with connection and channel pooling/reusing. This allows for lower connection and channel churn when using php-amqplib, leading to less CPU usage of RabbitMQ.

Setup

Ensure you have composer installed, then run the following command:

$ composer require php-amqplib/php-amqplib

That will fetch the library and its dependencies into your vendor folder. Then you can add the following to your .php files in order to use the library:

require_once __DIR__.'/vendor/autoload.php';

Then you need to use the relevant classes, for example:

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

Usage

With RabbitMQ running, open two terminals and, in the first one, execute the following commands to start the consumer:

$ cd php-amqplib/demo
$ php amqp_consumer.php

Then, in the other terminal, run:

$ cd php-amqplib/demo
$ php amqp_publisher.php some text to publish

You should see the message arrive at the process in the other terminal.

Then, to stop the consumer, send it the quit message:

$ php amqp_publisher.php quit

If you need to listen to the sockets used to connect to RabbitMQ, see the non-blocking consumer example:

$ php amqp_consumer_non_blocking.php

Change log

Please see CHANGELOG for more information on what has changed recently.

API Documentation

http://php-amqplib.github.io/php-amqplib/

Tutorials

To not repeat ourselves, if you want to learn more about this library, please refer to the official RabbitMQ tutorials.

More Examples

  • amqp_ha_consumer.php: demos the use of mirrored queues.
  • amqp_consumer_exclusive.php and amqp_publisher_exclusive.php: demos fanout exchanges using exclusive queues.
  • amqp_consumer_fanout_{1,2}.php and amqp_publisher_fanout.php: demos fanout exchanges with named queues.
  • amqp_consumer_pcntl_heartbeat.php: demos signal-based heartbeat sender usage.
  • basic_get.php: demos obtaining messages from the queues by using the basic get AMQP call.

Multiple hosts connections

If you have a cluster of multiple nodes to which your application can connect, you can start a connection with an array of hosts. To do that you should use the create_connection static method.

For example:

$connection = AMQPStreamConnection::create_connection([
    ['host' => HOST1, 'port' => PORT, 'user' => USER, 'password' => PASS, 'vhost' => VHOST],
    ['host' => HOST2, 'port' => PORT, 'user' => USER, 'password' => PASS, 'vhost' => VHOST]
],
$options);

This code will try to connect to HOST1 first, and connect to HOST2 if the first connection fails. The method returns a connection object for the first successful connection. Should all connections fail it will throw the exception from the last connection attempt.

See demo/amqp_connect_multiple_hosts.php for more examples.
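
The failover behaviour described above can be sketched in plain PHP. This is only an illustration of the documented semantics (try hosts in order, return the first success, rethrow the last failure), not the library's actual implementation. The function name connect_first_available and the $connect factory are hypothetical.

```php
<?php

/**
 * Try each host in order and return the first successful connection.
 * If every attempt fails, rethrow the exception from the last attempt.
 *
 * $connect is a hypothetical factory: it receives one host entry and
 * either returns a connection or throws.
 */
function connect_first_available(array $hosts, callable $connect)
{
    if ($hosts === []) {
        throw new InvalidArgumentException('At least one host is required.');
    }

    $lastError = null;
    foreach ($hosts as $host) {
        try {
            return $connect($host);
        } catch (Exception $e) {
            // Remember the failure and move on to the next host.
            $lastError = $e;
        }
    }

    // Every host failed: surface the error from the final attempt.
    throw $lastError;
}
```

In an application you would normally just call create_connection as shown above; the sketch is only meant to make the ordering and error semantics explicit.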

Batch Publishing

Let's say you have a process that generates a bunch of messages that are going to be published to the same exchange using the same routing_key and options like mandatory. Then you could make use of the batch_basic_publish library feature. You can batch messages like this:

$msg = new AMQPMessage($msg_body);
$ch->batch_basic_publish($msg, $exchange);

$msg2 = new AMQPMessage($msg_body);
$ch->batch_basic_publish($msg2, $exchange);

and then send the batch like this:

$ch->publish_batch();

When do we publish the message batch?

Let's say our program needs to read from a file and then publish one message per line. Depending on the message size, you will have to decide when it's better to send the batch. You could send it every 50 messages, or every hundred. That's up to you.
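
One way to structure the "flush every N messages" decision is sketched below. The helper publish_in_batches and its two callables are hypothetical: in a real program $enqueue would wrap $ch->batch_basic_publish(...) and $flush would wrap $ch->publish_batch(). The batch size is something you would tune yourself.

```php
<?php

/**
 * Queue one message per line and flush after every $batchSize messages.
 *
 * $enqueue and $flush are hypothetical callables standing in for
 * batch_basic_publish() and publish_batch() on a channel.
 * Returns the number of flushes performed.
 */
function publish_in_batches(iterable $lines, int $batchSize, callable $enqueue, callable $flush): int
{
    if ($batchSize < 1) {
        throw new InvalidArgumentException('Batch size must be at least 1.');
    }

    $pending = 0;
    $flushes = 0;

    foreach ($lines as $line) {
        $enqueue($line);
        $pending++;

        if ($pending === $batchSize) {
            $flush();
            $flushes++;
            $pending = 0;
        }
    }

    // Send whatever is left over after the last full batch.
    if ($pending > 0) {
        $flush();
        $flushes++;
    }

    return $flushes;
}
```

Keeping the final flush outside the loop matters: without it, the trailing messages of a file whose line count is not a multiple of the batch size would never be sent.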

Optimized Message Publishing

Another way to speed up your message publishing is by reusing the AMQPMessage message instances. You can create your new message like this:

$properties = array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT);
$msg = new AMQPMessage($body, $properties);
$ch->basic_publish($msg, $exchange);

Now let's say that while you want to change the message body for future messages, you will keep the same properties, that is, your messages will still be text/plain and the delivery_mode will still be AMQPMessage::DELIVERY_MODE_PERSISTENT. If you create a new AMQPMessage instance for every published message, then those properties would have to be re-encoded in the AMQP binary format. You could avoid all that by just reusing the AMQPMessage and then resetting the message body like this:

$msg->setBody($body2);
$ch->basic_publish($msg, $exchange);

Truncating Large Messages

AMQP imposes no limit on the size of messages; if a very large message is received by a consumer, PHP's memory limit may be reached within the library before the callback passed to basic_consume is called.

To avoid this, you can call the method AMQPChannel::setBodySizeLimit(int $bytes) on your Channel instance. Bodies exceeding this limit will be truncated and delivered to your callback with the AMQPMessage::$is_truncated flag set to true. The property AMQPMessage::$body_size will reflect the true body size of a received message, which will be higher than strlen(AMQPMessage::getBody()) if the message has been truncated.

Note that all data above the limit is read from the AMQP Channel and immediately discarded, so there is no way to retrieve it within your callback. If you have another consumer which can handle messages with larger payloads, you can use basic_reject or basic_nack to tell the server (which still has a complete copy) to forward it to a Dead Letter Exchange.

By default, no truncation will occur. To disable truncation on a Channel that has had it enabled, pass 0 (or null) to AMQPChannel::setBodySizeLimit().
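
A consumer callback might branch on the truncation flag roughly as follows. This is a sketch under assumptions: handle_delivery, $process and $reject are hypothetical names, and the message is only assumed to expose the is_truncated property described above (check how your installed version of the library exposes it). In real code $reject would call basic_reject or basic_nack on the channel.

```php
<?php

/**
 * Decide what to do with a delivery based on the truncation flag.
 *
 * $msg is expected to expose the is_truncated property described
 * above; $process and $reject are hypothetical callables supplied
 * by the application.
 */
function handle_delivery(object $msg, callable $process, callable $reject): string
{
    if ($msg->is_truncated) {
        // The rest of the body was discarded on read, so hand the
        // message back to the server instead of processing a fragment.
        $reject($msg);
        return 'rejected';
    }

    $process($msg);
    return 'processed';
}
```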

Connection recovery

Some RabbitMQ clients use automated connection recovery mechanisms to reconnect and recover channels and consumers in case of network errors.

Since this client is single-threaded, you can set up connection recovery using exception handling.

Exceptions which might be thrown in case of connection errors:

PhpAmqpLib\Exception\AMQPConnectionClosedException
PhpAmqpLib\Exception\AMQPIOException
\RuntimeException
\ErrorException

Other exceptions might be thrown while the connection is still alive. It's always a good idea to clean up an old connection when handling an exception before reconnecting.

For example, if you want to set up a recovering connection:

$connection = null;
$channel = null;
while(true){
    try {
        $connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
        // Your application code goes here.
        do_something_with_connection($connection);
    } catch(AMQPRuntimeException $e) {
        echo $e->getMessage();
        cleanup_connection($connection);
        usleep(WAIT_BEFORE_RECONNECT_uS);
    } catch(\RuntimeException $e) {
        cleanup_connection($connection);
        usleep(WAIT_BEFORE_RECONNECT_uS);
    } catch(\ErrorException $e) {
        cleanup_connection($connection);
        usleep(WAIT_BEFORE_RECONNECT_uS);
    }
}

A full example is in demo/connection_recovery_consume.php.

This code will reconnect and retry the application code every time the exception occurs. Some exceptions can still be thrown and should not be handled as a part of reconnection process, because they might be application errors.

This approach makes sense mostly for consumer applications. Producers will require some additional application code to avoid publishing the same message multiple times.

This was the simplest possible example. In a real-life application you might want to limit the retry count and gradually increase the wait time between reconnection attempts.

You can find a more extensive example in #444
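
A retry limit with a growing wait time could look like the sketch below. All names here (reconnect_delay_us, run_with_reconnect, the default delays) are hypothetical and the values are placeholders. The loop catches only the \RuntimeException and \ErrorException types listed earlier, and you would add the library's own exception classes as needed.

```php
<?php

/**
 * Delay in microseconds before reconnect attempt number $attempt
 * (1-based). The delay doubles each time, capped at $maxDelayUs.
 */
function reconnect_delay_us(int $attempt, int $baseDelayUs = 500000, int $maxDelayUs = 30000000): int
{
    $attempt = max(1, $attempt);
    // Cap the exponent so the multiplication stays in integer range.
    $exponent = min($attempt - 1, 20);

    return min($maxDelayUs, $baseDelayUs * (2 ** $exponent));
}

/**
 * Run $work, retrying on connection-style errors up to $maxRetries times.
 * $sleep receives the delay in microseconds; pass 'usleep' in real code.
 */
function run_with_reconnect(callable $work, int $maxRetries, callable $sleep)
{
    $attempt = 0;

    while (true) {
        try {
            return $work();
        } catch (RuntimeException | ErrorException $e) {
            $attempt++;
            if ($attempt > $maxRetries) {
                // Give up and let the caller see the last error.
                throw $e;
            }
            $sleep(reconnect_delay_us($attempt));
        }
    }
}
```

Here $work would create the connection, run the application code and clean up on failure, much like the body of the loop shown earlier. Injecting $sleep keeps the helper easy to test without real waiting.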

UNIX Signals

If you have the PCNTL extension installed, signals will be dispatched while the consumer is not processing a message.

$pcntlHandler = function ($signal) {
    switch ($signal) {
        case \SIGTERM:
        case \SIGUSR1:
        case \SIGINT:
            // some stuff before stop consumer e.g. delete lock etc
            pcntl_signal($signal, SIG_DFL); // restore handler
            posix_kill(posix_getpid(), $signal); // kill self with signal, see https://www.cons.org/cracauer/sigint.html
            break; // do not fall through to the SIGHUP branch
        case \SIGHUP:
            // some stuff to restart consumer
            break;
        default:
            // do nothing
    }
};

pcntl_signal(\SIGTERM, $pcntlHandler);
pcntl_signal(\SIGINT,  $pcntlHandler);
pcntl_signal(\SIGUSR1, $pcntlHandler);
pcntl_signal(\SIGHUP,  $pcntlHandler);

To disable this feature, define the constant AMQP_WITHOUT_SIGNALS as true:

<?php
define('AMQP_WITHOUT_SIGNALS', true);

... more code

Signal-based Heartbeat

If you have the PCNTL extension installed and are using PHP 7.1 or greater, you can register a signal-based heartbeat sender.

<?php

$sender = new PCNTLHeartbeatSender($connection);
$sender->register();
... code
$sender->unregister();

Debugging

If you want to know what's going on at a protocol level then add the following constant to your code:

<?php
define('AMQP_DEBUG', true);

... more code

?>

Benchmarks

To run the publishing/consume benchmark type:

$ make benchmark

Tests and Contributing

Please see CONTRIBUTING for details.

Using AMQP 0.8

If you still want to use the old version of the protocol then you can do it by setting the following constant in your configuration code:

define('AMQP_PROTOCOL', '0.8');

The default value is '0.9.1'.

Providing your own autoloader

If for some reason you don't want to use composer, then you need to have an autoloader in place for the library classes. People have reported using this autoloader with success.

Original README:

Below is the original README file content. Credit goes to the original authors.

PHP library implementing Advanced Message Queuing Protocol (AMQP).

The library is a port of the Python code of py-amqplib http://barryp.org/software/py-amqplib/

It has been tested with RabbitMQ server.

Project home page: http://code.google.com/p/php-amqplib/

For discussion, please join the group:

http://groups.google.com/group/php-amqplib-devel

For bug reports, please use bug tracking system at the project page.

Patches are very welcome!

Author: Vadim Zaliva [email protected]

rabbitmqbundle's People

Contributors

alexbumbacea, andreea-anamaria, bburnichon, caciobanu, come, doppynl, eloar, fatmuemoo, goetas, haswalt, icolomina, igaponov, igrizzli, ikwattro, joelwurtz, mihaileu, nathanjrobertson, passkey1510, ramunasd, ruudk, sixdayz, skafandri, steveyeah, stloyd, stof, torondor27, trompette, twistedlogic, vicb, videlalvaro

rabbitmqbundle's Issues

Consumer performance decreasing with increasing run time

I've run a simple consumer that does nothing but acknowledge the messages. When started it consumes 30-40 messages per second, but then performance decreases constantly, dropping to 2-3 messages per second after 2 minutes. The process uses about 90MB of memory and it doesn't increase much during those 2 minutes. Any ideas what might be the reason? How can this solution be used for consumers that are supposed to run constantly, if performance drops that fast?

Another question: when started, the consumer fetches all messages from the queue (11k messages, all move from Ready to Unacked in the RabbitMQ panel). When I run another consumer it has nothing to process, which makes it impossible to parallelize processing. Is there a way not to fetch all messages from the queue? I tried running it with the "-m" parameter to limit the number of messages to process, but it doesn't help.

Make RabbitMQ unavailability catchable

Currently if the server is unavailable then even on pages that do not use rabbitmq, our app dies with a fatal error.

I want to implement a fallback in this case but seem unable to prevent the fatal errors.

As long as the rabbitmq bundle is configured in config.yml, then it seems there is no way to operate the app in the case of the rabbit server being unavailable, so this is a problem.

[RFC] Added Stub producer

Hello.

IMHO, it could be useful to add a stub producer to this bundle (or to php-amqplib) that mocks a producer. It would be useful in functional tests to mock the real RabbitMQ.
Thanks to the data collector, we could ask the mock whether a new message has been pushed to RabbitMQ, etc.

What do you think ?

Reconnect if connection is closed

I have a consumer that once started performs some time-consuming operations and then tries to write to Rabbit. If the connection was closed by RabbitMQ server, then the application crashes with:

Notice: fwrite(): send of 31 bytes failed with errno=104 Connection reset by peer in vendor/videlalvaro/php-amqplib/PhpAmqpLib/Connection/AMQPConnection.php line 157

What can be done about it? Doesn't the bundle have the ability to reconnect to the RabbitMQ server?

Bundle INACTIVE/DEAD?

Hi @videlalvaro ,

Is this bundle/project considered inactive or dead? Questions and issues are not answered, and the code has not been maintained within the last 6 months.

Enrico

Connect to RabbitMQ server even without using it

Hi,
I find that RabbitMqBundle will try to connect to the server if we declare a producer service.
When we run functional tests, we see a lot of connections on the RabbitMQ server even when these tests are not about RabbitMqBundle.

We are updating RabbitMqBundle to the latest master and noticed this new behavior.
Could we avoid this? I think it is not necessary.

Using routing_keys parameter

Hello there,
If I try to add the routing_keys parameter to config.yml like this:

old_sound_rabbit_mq:
    connections:
        default:
            host:      'localhost'
            port:      5672
            user:      'guest'
            password:  'guest'
            vhost:     '/'
    producers:
        twitter_metrics:
            connection: default
            exchange_options: {name: 'social-networks', type: topic}
    consumers:
        twitter_metrics:
            connection: default
            exchange_options:
                name: 'social-networks'
                type: topic
            queue_options:
                name: 'twitter-metrics-queue'
                routing_keys:
                  - 'metrics.twitter'
            callback: twitter_metrics_service

I also tried adding the routing_keys parameter to producers, as well as to consumers only and to producers only. But when I try to use the Symfony2 console, I get this:

[Symfony\Component\Config\Definition\Exception\InvalidConfigurationException]
Unrecognized options "routing_keys" under "old_sound_rabbit_mq.consumers.twitter_metrics.queue_options"

What am I doing wrong?

If this issue isn't suitable here, please delete it and answer on Stack Overflow.

get channel

Hello, is there any easy way to get the channel from inside a consumer interface?

example, you have this consumer:

<?php

//src/Sensio/HelloBundle/Consumer/UploadPictureConsumer.php

namespace Sensio\HelloBundle\Consumer;

use Symfony\Component\DependencyInjection\ContainerAware;
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;

class UploadPictureConsumer extends ContainerAware implements ConsumerInterface
{
    public function execute($msg)
    {
        //Process picture upload. 
        //$msg will be what was published from the Controller.
    }
}

what I want to do is something like this:

<?php

//src/Sensio/HelloBundle/Consumer/UploadPictureConsumer.php

namespace Sensio\HelloBundle\Consumer;

use Symfony\Component\DependencyInjection\ContainerAware;
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;

class UploadPictureConsumer extends ContainerAware implements ConsumerInterface
{
    public function execute($msg)
    {
        // Get the channel
        $channel = $this->getChannel();
    }
}

Should I code it myself, or is there already a way to do it? From what I see in the interface, there isn't one.

Create rpc server

Hi,
I want to write an RPC server, and I found that the documentation only mentions the RPC client.
I read the source code and guessed that the way to write an RPC server is to write a callback "function"
which is defined in service.yml, but this way I would get an "object", not a "function".

Thanks in advance.

There is no way to setup queues\bindings

Currently you have to declare queues, and the bindings between queues and exchanges, manually. It would be cool to have a CLI command that sets up the necessary relations for the application based on some configuration. Should it be implemented in this bundle, and how would you like to see it? What's your opinion on that feature?

hardcoded values in exchange_declare call

Hi,

RabbitMq/Producer.php, line 12:
$this->ch->exchange_declare($this->exchangeOptions['name'], $this->exchangeOptions['type'], false, true, false);

Why are you using hardcoded values here? I have identical declarations for the producer and the consumer (with auto_delete set to true and durable set to false), and the code above tries to redeclare the queue with other options (with no success, of course) just because it ignores my settings.

Is it a bug or am I missing something?

Thanks

[Feature Request]

It looks like this requirement may overlap a couple of existing requests.

We need the ability to dynamically create and destroy queues - while running as a consumer - using the same connection.

I will continue to review the code - maybe this can be done already - I was just concerned reading "lazy" threads.

Not register as a service ??

Hi,

I recently created a consumer and it works fully even though it's not declared as a Symfony service.
From what I understand of your code, that sounds logical, but according to the manual it should not work, since:
"Keep in mind that your callbacks need to be registered as normal Symfony2 services."

My question is: which one is wrong? If neither, what did I miss?

Regards,

PxlCtzn.

requeue loop

Hi,

I'm using RabbitMQ to verify a status change in a third-party service, reading messages from a queue and sending them to a web service that returns whether the status has changed or not. If there is no status change, I have to requeue the message to check it again later (I'm doing this by returning false in the consumer's execute method), but the message is delivered to the consumer again instantaneously, causing a loop until the message is dropped. Is there any way to avoid this behavior? Some way to postpone the delivery of the message to the consumer, or to postpone the requeuing of the message?

Message rates stop at 25 msg/s

I have a simple command

config.yml:

producers:
    my-producer:
        connection: default
        exchange_options: {name: 'my-producer', type: direct}

SimpleCommand.php

    ...
    $producer = $this->getContainer()
            ->get('old_sound_rabbit_mq.my-producer');

    for($i = 0; $i < 10000; $i++) {
        $producer->publish('for_test');
    }
    ...

It runs correctly, but the message rate displayed in the RabbitMQ management UI is always 25 msg/s.
Running 2 processes at the same time gets 50 msg/s.

On the same machine, I can get about 2000 msg/s using node-amqp.
Am I missing something?

debian 6
symfony 2.0.3
rabbitmq-server version 2.6.1 / R14A

Container is not being passed into consumers

The documentation indicates that by declaring your consumer to extend ContainerAware, the consumer should have access to the DIC. In practice, however, this does not appear to work.

$this->container is always null, and when I define the setContainer function like this:

public function setContainer(ContainerInterface $container = null) {
  die('Consumer container set');
  $this->container = $container;
}

the setContainer function is never called.

Is this an implementation problem with RabbitMQBundle or a symfony core issue?

[Feature request] Support for configuring with an amqp uri

It would be great if the config could accept an amqp uri and convert it internally to extract each setting. This would make it easier to use the bundle with hosted rabbitmq providing an amqp uri with the settings (for instance, CloudAMQP sets the uri directly as an environment variable on heroku)

[RPC Server] Error in Tutorial example: Undefined index: routing_keys

Hi Alvaro,
when I configure RPC or Reply/Response as defined in the Usage section and try to start the random_int server via the command line, I get the following error:

[ErrorException]
Notice: Undefined index: routing_keys in /home/matteo/git_prj/sf21/vendor/oldsound/rabbitmq-bundle/OldSound/RabbitMqBundle/RabbitMq/BaseAmqp.php line 135

If I add the routing_keys parameter in the queueOptions array in the abstract class BaseAmqp:

protected $queueOptions = array(
    'name' => '',
    'passive' => false,
    'durable' => true,
    'exclusive' => false,
    'auto_delete' => false,
    'nowait' => false,
    'arguments' => null,
    'ticket' => null,
    'routing_keys'=> null
);

then it runs successfully.

Am I missing some configuration, or is this a bug?

Thanks in advance
Matteo

pcntl signals processed only after message is handled

Hey @videlalvaro,

Great bundle. We use it in many of our projects.

Recently, we've had some issues with one of our consumers that doesn't receive messages very often: about 1 per 30 minutes.

The issue we have is with handling the pcntl signals. Presently, handling signals happens after the next message is processed. When your script is processing messages at a rate of 1 per second, sending a SIGINT will only have you waiting 1 second before it's handled properly.

Now, in the case of 1 per 30 minutes, this is obviously a problem. We're specifically having the issue during deploys. We want to restart all consumers each deploy. Waiting 30 minutes really isn't an option.

Is there a way to have the following happen?

  • If a signal arrives while the consumer is waiting for a message, it handles it immediately and closes down.
  • If a signal arrives during the processing of one of these messages, it obviously should finish handling the message first, then handle the signal and close down.

I tried playing around with the bundle and the underlying lib, but the architecture seems to make this hard to accomplish.

Any ideas? I'm happy to write the code if there's a good solution without a ton of refactoring.

Queues/Messages not created automatically ?

Hi,

I have the following configuration :

old_sound_rabbit_mq:
    connections:
        default:
            host:      %rabbitmq.host%
            port:      %rabbitmq.port%
            user:      %rabbitmq.user%
            password:  %rabbitmq.password%
            vhost:     '/'
    producers:
        main:
            connection: default
            exchange_options: {name: 'main_direct', type: direct}
    consumers:
        post_processing:
            connection: default
            exchange_options: {name: 'main_direct', type: direct}
            queue_options:    {name: 'post_processing', durable: true}
            callback:          consumer.post_processing
        permission:
            connection: default
            exchange_options: {name: 'main_direct', type: direct}
            queue_options:    {name: 'permission', durable: true}
            callback:         consumer.permission
        index:
            connection: default
            exchange_options: {name: 'main_direct', type: direct}
            queue_options:    {name: 'index', durable: true}
            callback:         consumer.index_document
        following_encode:
            connection: default
            exchange_options: {name: 'main_direct', type: direct}
            queue_options:    {name: 'following_encode', durable: true}
            callback:         consumer.encode_following_list
        user_activity:
            connection: default
            exchange_options: {name: 'main_direct', type: direct}
            queue_options:    {name: 'user_activity', durable: true}
            callback:         consumer.user_activity
        build_feed:
            connection: default
            exchange_options: {name: 'main_direct', type: direct}
            queue_options:    {name: 'build_feed', durable: true}
            callback:         consumer.build_feed

And I add jobs through a manager which basically does:

    public function addJob($queue, $data, $priority = 0)
    {
        $message = serialize($data);
        $this->producer->publish($message, $queue);
    }

$queue being the queue name defined in the configuration.

The thing is that with this configuration neither queues nor messages are being added.
sudo rabbitmqctl list_queues returns nothing.

What am I doing wrong?

Thanks !

multiple routing key

hi,

I would like to know if it is possible to have several routing keys in the configuration.
My configuration is:

# define producers
rabbitmq_producers:
    upload_picture:
        connection:         default
        exchange_options:   {name: 'upload-picture', type: topic, durable: true }
    content_manager:
        connection:         default
        exchange_options:   {name: 'Service', type: topic, durable: true }

rabbitmq_consumers:
    upload_picture:
        connection: default
        exchange_options: {name: 'upload-picture', type: topic}
        queue_options:    {name: 'upload-picture'}
        callback:         upload_picture_service

rabbitmq_urn:
    upload_picture:

rabbitmq_routing_key:  upload_picture.state

I would like to have something like :

rabbitmq_producers:
    upload_picture:
        connection:         default
        exchange_options:   {name: 'upload-picture', type: topic, durable: true }
        routing_key: upload_picture.state
    content_manager:
        connection:         default
        exchange_options:   {name: 'Service', type: topic, durable: true }
        routing_key: content.state

Is it possible for consumers too?
Thank you for your help =)

Exchange names cannot contain the "period" character

Messages sent to an exchange whose name contains a period (.) never arrive at the RabbitMQ broker. Here's the config for such a producer:

producers:
    accounts_update:
        connection: default
        exchange_options: { name: 'accounts.update', type: direct }

According to RabbitMQ's reference for AMQP 0-9-1 periods are allowed in the exchange name:

The exchange name consists of a non-empty sequence of these characters: 
letters, digits, hyphen, underscore, period, or colon.

timeout for RPC client + custom (de)serialization

This bundle is great and we are very happy with it.

Only thing we are missing at the moment are these 2 features:

Timeout for RPC client:
Our RPC client works fine when the RPC server is listening, but when the RPC server is offline, the client blocks the whole Apache process, which is very unfortunate. I tried to build a timeout mechanism around the getReplies() function as mentioned here: https://gist.github.com/3085581 but it is very unstable. It works the first few times (throws a timeout exception as we want), but then the same old issue occurs where the whole Apache thread is blocked and no other web requests can be made.

Custom (de)serialization:
Now serialization and deserialization are both done using php native serialize command. We wish to have a custom setting for adding our own serialization (for use with JMSSerializerBundle for example https://github.com/schmittjoh/JMSSerializerBundle/blob/master/Resources/doc/index.rst)

Receive fwrite error exception

Hi,
I wrote some producer/consumer commands in sf2, and when I executed a certain producer command,
I continuously received the following error exception:

[ErrorException]
  Notice: fwrite(): send of 10 bytes failed with errno=11 Resource temporarily unavailable in /root/convert_src/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Connection/AMQPConnection.php line 141

PHP Fatal error:  Uncaught exception 'Exception' with message 'Error reading data. Recevived 0 instead of expected 1 bytes' in /root/convert_src/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Wire/AMQPReader.php:64
Stack trace:
#0 /root/convert_src/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Wire/AMQPReader.php(95): PhpAmqpLib\Wire\AMQPReader->rawread(1)
#1 /root/convert_src/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Connection/AMQPConnection.php(260): PhpAmqpLib\Wire\AMQPReader->read_octet()
#2 /root/convert_src/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Connection/AMQPConnection.php(280): PhpAmqpLib\Connection\AMQPConnection->wait_frame()
#3 /root/convert_src/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php(119): PhpAmqpLib\Connection\AMQPConnection->wait_channel(0)
#4 /root/convert_src/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php(225): PhpAmqpLib\Channel\AbstractChannel->next_frame()
#5 /root/convert_src/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Connection/AMQPConnection.php(321): PhpAmq in /root/convert_src/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Wire/AMQPReader.php on line 64

Fatal error: Uncaught exception 'Exception' with message 'Error reading data. Recevived 0 instead of expected 1 bytes' in /root/convert_src/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Wire/AMQPReader.php:64
Stack trace:
#0 /root/convert_src/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Wire/AMQPReader.php(95): PhpAmqpLib\Wire\AMQPReader->rawread(1)
#1 /root/convert_src/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Connection/AMQPConnection.php(260): PhpAmqpLib\Wire\AMQPReader->read_octet()
#2 /root/convert_src/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Connection/AMQPConnection.php(280): PhpAmqpLib\Connection\AMQPConnection->wait_frame()
#3 /root/convert_src/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php(119): PhpAmqpLib\Connection\AMQPConnection->wait_channel(0)
#4 /root/convert_src/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php(225): PhpAmqpLib\Channel\AbstractChannel->next_frame()
#5 /root/convert_src/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Connection/AMQPConnection.php(321): PhpAmq in /root/convert_src/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Wire/AMQPReader.php on line 64

The exception only appears once about 88000 or more messages have been published.

Thanks in advance.

Consumer architecture

Hello @videlalvaro

I reviewed your bundle a bit and I have a couple of questions.

  1. Should a producer know anything about queues? As far as I understand, a producer just publishes messages to an exchange. The only thing a producer should declare is an exchange.
  2. Should a consumer know anything about exchanges and routing keys? As far as I understand, a consumer just consumes messages from a queue and doesn't care where they come from. The only thing a consumer should declare is the queue it consumes.
  3. If the answer to both questions above is "no", then we need a way to set up exchanges, queues and the bindings between them, and this is where my issue #27 comes in. I can implement these changes, but I would like your opinion first; maybe I have missed something important.
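
The separate setup step from point 3 could be sketched roughly as follows. The function name and the shape of the $topology array are hypothetical. The $channel argument is assumed to behave like php-amqplib's AMQPChannel (exchange_declare, queue_declare, queue_bind) and is left untyped so the sketch stays self-contained.

```php
<?php
/**
 * Declares exchanges, queues and bindings in one place, so that producers
 * and consumers do not each have to know the whole topology.
 */
function declareTopology($channel, array $topology): void
{
    foreach ($topology['exchanges'] as $name => $type) {
        // Arguments: name, type, passive=false, durable=true, auto_delete=false
        $channel->exchange_declare($name, $type, false, true, false);
    }
    foreach ($topology['queues'] as $name) {
        // Arguments: name, passive=false, durable=true, exclusive=false, auto_delete=false
        $channel->queue_declare($name, false, true, false, false);
    }
    foreach ($topology['bindings'] as [$queue, $exchange, $routingKey]) {
        $channel->queue_bind($queue, $exchange, $routingKey);
    }
}

// Hypothetical topology; all names are placeholders.
$topology = [
    'exchanges' => ['uploads' => 'direct'],
    'queues'    => ['upload-picture'],
    'bindings'  => [['upload-picture', 'uploads', 'picture']],
];
```

Calling declareTopology($channel, $topology) once at deploy time (or from a console command) would leave producers with only an exchange name and consumers with only a queue name.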

New Symfony preview release compatibility

The Symfony DependencyInjection component no longer has an Extension class; it is now an interface, so the current statement
use Symfony\Component\DependencyInjection\Extension\Extension
in
src/OldSound/RabbitMqBundle/DependencyInjection/OldSoundRabbitmqExtension.php
is broken. We tried to use ExtensionInterface instead, but then ran into a missing "Debug" method when launching a consumer.

SF2 app with RabbitMqBundle failed when rabbitmq is stopped

Hi @videlalvaro,

There is an issue with the RabbitMqBundle and php-amqplib. Here are the steps to reproduce the bug:

  1. Bootstrap a Symfony2 app with "oldsound/rabbitmq-bundle": "dev-master" in composer.json.
  2. Don't install the RabbitMQ server, or stop it.
  3. Try to display any page of the bootstrapped SF2 app with rabbitmq-bundle enabled.

=> you will have the following exception:

ErrorException: Warning: stream_socket_client() [function.stream-socket-client]: unable to connect to tcp://vserver-jrouff.dev.sensio.net:5672 (Connection refused) in /Users/joseph.rouff/projects/insight/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Connection/AMQPConnection.php line 80

It would be nice if RabbitMqBundle failed silently without any exception (or with just a warning on the data collector icon).

Thanks for your answer :).
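
One possible direction, sketched here under assumptions, is to defer opening the connection until something actually publishes or consumes, so that pages which never touch RabbitMQ are unaffected by a stopped broker. The LazyConnection class below is a generic lazy-initialisation wrapper, not part of the bundle or of php-amqplib, and it does not by itself silence the error once a connection is really needed.

```php
<?php
// Generic lazy-initialisation wrapper (hypothetical helper).
final class LazyConnection
{
    private $factory;
    private $connection = null;

    // $factory is any callable that opens and returns the real connection.
    public function __construct(callable $factory)
    {
        $this->factory = $factory;
    }

    // Opens the connection on first use and reuses it afterwards.
    public function get()
    {
        if ($this->connection === null) {
            $this->connection = ($this->factory)();
        }

        return $this->connection;
    }

    // True once the factory has been invoked.
    public function isInitialized(): bool
    {
        return $this->connection !== null;
    }
}
```

The factory would wrap whatever currently creates the connection during service construction; a request that never calls get() never attempts the TCP connect.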

Consumer and Producer config

The default parameters for the consumer and the producer aren't the same. So when you don't explicitly specify all the parameters, you get this kind of exception:

PRECONDITION_FAILED - cannot redeclare exchange 'exchange-name' in vhost '/' with different type, durable, internal or autodelete value

with this config.yml:

old_sound_rabbit_mq:
    connections:
        default:
            host:      'localhost'
            port:      5672
            user:      'guest'
            password:  'guest'
            vhost:     '/'
    producers:
        sample_task:
            connection: default
            exchange_options: {name: 'exchange-name', type: direct}
    consumers:
        sample_task:
            connection: default
            exchange_options: {name: 'exchange-name', type: direct}
            queue_options:    {name: 'queue-name'}
            callback:         sample_task_service
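
A possible workaround, until the defaults agree, is to spell out the same exchange flags on both sides so that neither declaration depends on a default. The fragment below is a sketch: it assumes durable, auto_delete and internal are accepted keys under exchange_options, and the values shown are illustrative rather than the bundle's documented defaults.

```yaml
old_sound_rabbit_mq:
    producers:
        sample_task:
            connection: default
            # Same flags as the consumer below, so the exchange is never redeclared differently.
            exchange_options: {name: 'exchange-name', type: direct, durable: true, auto_delete: false, internal: false}
    consumers:
        sample_task:
            connection: default
            exchange_options: {name: 'exchange-name', type: direct, durable: true, auto_delete: false, internal: false}
            queue_options:    {name: 'queue-name'}
            callback:         sample_task_service
```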

Does the Consumer have a timeout mechanism?

Hi, I'm using this bundle to handle data transfer, and it really helps me a lot.
I have several consumers, and each uses bulk inserts to get better performance.
But I found that every consumer holds on to some data because the amount hasn't reached the batch size.
Will you add a timeout mechanism to the Consumer?
Or is there any way to let the Consumer know the state of the queue?
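
Independently of the bundle, the batching side of this can be sketched as a buffer that flushes either when it is full or when its oldest item has waited too long. BatchBuffer is a hypothetical helper; how the consumer loop gets a periodic wake-up to call flushIfDue() (for example a wait call with a timeout) is assumed and not shown.

```php
<?php
// Hypothetical helper: flushes when the batch is full OR the oldest buffered
// item has waited at least $maxAgeSeconds.
final class BatchBuffer
{
    private $items = [];
    private $firstItemAt = null;
    private $batchSize;
    private $maxAgeSeconds;
    private $flush;
    private $clock;

    public function __construct(int $batchSize, float $maxAgeSeconds, callable $flush, ?callable $clock = null)
    {
        $this->batchSize = $batchSize;
        $this->maxAgeSeconds = $maxAgeSeconds;
        $this->flush = $flush;
        // The clock is injectable so the timeout logic can be exercised without sleeping.
        $this->clock = $clock ?? function () { return microtime(true); };
    }

    // Buffers one item and flushes immediately if the batch size is reached.
    public function add($item): void
    {
        if ($this->items === []) {
            $this->firstItemAt = ($this->clock)();
        }
        $this->items[] = $item;
        if (count($this->items) >= $this->batchSize) {
            $this->flushNow();
        }
    }

    // Call periodically; flushes a partial batch once it is old enough.
    public function flushIfDue(): void
    {
        if ($this->items !== [] && ($this->clock)() - $this->firstItemAt >= $this->maxAgeSeconds) {
            $this->flushNow();
        }
    }

    private function flushNow(): void
    {
        $batch = $this->items;
        $this->items = [];
        $this->firstItemAt = null;
        ($this->flush)($batch);
    }
}
```

The consumer callback would call add() for each message, and the flush callable would perform the bulk insert.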

Handle messages that can't be delivered

In my experience, when a consumer currently fails to consume a message (the callback ends with a fatal error, an exception, etc.), the message is still considered delivered and is not put back on the queue.

Is there any workaround for this?
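
One workaround at the php-amqplib level is to acknowledge only after the callback has succeeded and to reject with requeue when it throws. The sketch below assumes the consumer was started with acknowledgements enabled (no_ack set to false) and that $channel behaves like php-amqplib's AMQPChannel (basic_ack, basic_reject). The function name consumeOnce is hypothetical.

```php
<?php
/**
 * Runs $callback for one delivery and acknowledges only on success.
 * $channel is left untyped to keep the sketch self-contained.
 */
function consumeOnce($channel, $deliveryTag, $body, callable $callback): bool
{
    try {
        $callback($body);
    } catch (\Throwable $e) {
        // requeue=true asks the broker to put the message back on the queue.
        $channel->basic_reject($deliveryTag, true);

        return false;
    }

    $channel->basic_ack($deliveryTag);

    return true;
}
```

A fatal error that kills the process is not caught here, but a message that was never acknowledged is redelivered by the broker once the channel closes. Note that requeueing a message that always fails will make it loop, so some retry limit is usually needed.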

RpcClient infinite loop if the RpcServer dies

The current implementation makes RpcClient wait in an infinite loop if the processing on the server side dies silently (a segfault, or an unhandled PHP error such as an out-of-memory error).
This is caused by the server, which acknowledges the message before calling the user function.
If the two lines are swapped (and the ack is also placed in the catch block), it will be more robust and more tolerant of errors.
The drawback is that in this case the call may be processed twice or more, depending on where the exception happens in the RPC code.
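
The proposed reordering can be sketched as follows, using a finally block so the ack is sent whether the user function returns or throws. The function name handleRpcRequest is hypothetical, and $channel is assumed to behave like php-amqplib's AMQPChannel (basic_ack).

```php
<?php
/**
 * Calls the user function first and acknowledges afterwards.
 * If the process dies inside $callback, no ack is sent, so the broker
 * can redeliver the request to another server.
 */
function handleRpcRequest($channel, $deliveryTag, $body, callable $callback)
{
    try {
        // Run the user function first ...
        return $callback($body);
    } finally {
        // ... and acknowledge afterwards, on both the success and the exception path.
        $channel->basic_ack($deliveryTag);
    }
}
```

Publishing the reply to the client is left out; it would happen with the returned value after this function completes.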

Can't install with composer, wrong deps

There seems to be a problem with the name of the branches, I think they have changed. When I try to install with composer, I get this:

Problem 1
    - Installation request for oldsound/rabbitmq-bundle dev-master -> satisfiable by oldsound/rabbitmq-bundle dev-master.
    - oldsound/rabbitmq-bundle dev-master requires videlalvaro/php-amqplib dev-master -> no matching package found.

Can not set 'x-message-ttl' in arguments in queue_options

Following the example here:

queue_options: {name: 'upload-picture', arguments: {'x-ha-policy': ['S', 'all']}}

I'm trying to do:

queue_options: {name: 'upload-picture', arguments: {'x-message-ttl': 3600000}}

because I want to set the message TTL to one hour.
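
A variant that may be worth trying, assuming arguments must be given as [type, value] pairs like the x-ha-policy example above, and assuming 'I' is the type tag for a signed 32-bit integer in php-amqplib's table encoding:

```yaml
# 3600000 ms = 60 * 60 * 1000 = one hour
queue_options: {name: 'upload-picture', arguments: {'x-message-ttl': ['I', 3600000]}}
```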

Queue configuration doesn't handle argument passing correctly

When trying to pass an argument array into the queue declaration the Configuration.php doesn't accept it because it actually isn't a scalar value.

queue_options:    {name: 'search_indexer', arguments: { x-ha-policy: ['S', 'all']} }

Results in:

  [Symfony\Component\Config\Definition\Exception\InvalidTypeException]                                                           
  Invalid type for path "old_sound_rabbit_mq.consumers.search_indexer.queue_options.arguments". Expected scalar, but got array.  

I tried to modify the TreeBuilder configuration a bit, but without success. The variant below drops the 'x-ha-policy' key.

    public function addQueueConfiguration(NodeBuilder $nb)
    {
        return $nb
            ->arrayNode('queue_options')
                ->children()
                    ->scalarNode('name')->end()
                    ->booleanNode('passive')->defaultFalse()->end()
                    ->booleanNode('durable')->defaultTrue()->end()
                    ->booleanNode('exclusive')->defaultFalse()->end()
                    ->booleanNode('auto_delete')->defaultFalse()->end()
                    ->booleanNode('nowait')->defaultFalse()->end()
                    ->arrayNode('arguments')->defaultNull()
                        ->prototype('array')
                            ->prototype('scalar')->end()
                        ->end()
                    ->end()
                    ->scalarNode('ticket')->defaultNull()->end()
                ->end()
            ->end();
    }

Results in

array(8) {
  ["name"]=>
  string(14) "search_indexer"
  ["arguments"]=>
  array(1) {
    [0]=>
    array(2) {
      [0]=>
      string(1) "S"
      [1]=>
      string(3) "all"
    }
  }
  ["passive"]=>
  bool(false)
  ["durable"]=>
  bool(true)
  ["exclusive"]=>
  bool(false)
  ["auto_delete"]=>
  bool(false)
  ["nowait"]=>
  bool(false)
  ["ticket"]=>
  NULL
}

Queue on producer

Hi there,
I have been working with AMQP for a few days.
I have built a Java application which uses AMQP and did some small tests with php-amqplib out of the box.

Right now, I'm trying to integrate the AMQP producer into our Symfony2 application.
I'm looking at your RabbitMqBundle.

My first question is about queues.
When working with AMQP, the producer and the consumer have to work with a specific channel. This channel can be configured to use a specific queue_name, can't it?

The problem is that I have not found any way to specify a queue_name for the producer channel.

For the consumer, the configuration has a dedicated section for queue options, such as queue_name.

Thanks for your help.

Symfony 2.2 Compatibility

Trying to upgrade an app to 2.2, but this bundle is still requiring 2.1. Are there any known issues with 2.2?

Producers Not Declaring Queue

Great work on this, it has saved us tons of time.

One thing however... In my somewhat brief exposure to the AMQP world, I find it is usually recommended to create your exchange and queues on the sending as well as the receiving side. This prevents messages from never reaching the exchange/queue due to it being deleted etc...

We are pushing into a queue that feeds a log server. The server will create the queue, but if that server were to go down and the queue somehow got deleted, it would be good to have assurance that the producer would recreate it and continue sending.

We are working on a pull request for this but wanted to find out if there was a rationale behind the current behavior.

InvalidDefinitionException

After upgrading Symfony to 2.1, we got this error:

  [Symfony\Component\Config\Definition\Exception\InvalidDefinitionException]                              
  ->addDefaultsIfNotSet() is not applicable to prototype nodes at path "old_sound_rabbit_mq.connections"  
