
aioamqp's People

Contributors

andrewbunday, armata, cedricco, dave-shawley, dbaty, dimabxsx, dzen, fbochu, foolswood, fullylegit, hozn, ice2heart, kalaspuff, lburg, lexdene, mardiros, megadiablo, michael-k, mikmatko, mmariani, mwfrojdman, nhumrich, notmeta, os, pwistrand, rbarrois, remicardona, skewty, smurfix, tkukushkin


aioamqp's Issues

Should not have simultaneous publishes on a channel.

Delivery tags are server-generated (and the spec makes no guarantees about them); at present aioamqp assumes they increment by 1 starting from 1, which seems to hold (at least for RabbitMQ).

However, the return method does not carry the delivery tag, so the asynchronous publishes that are currently allowed cannot be implemented reliably on top of it.

Probably, strictly speaking, we shouldn't be guessing the delivery tag anyway.

Operation basic.publish caused a connection exception unexpected_frame

I have a large stream of data, and at first I pushed one data item per publish call.
But this error occurred periodically:

Server closed connection: UNEXPECTED_FRAME - expected content body, got non content body frame instead, code=505, class_id=60, method_id=40

In server logs:

Error on AMQP connection <0.11901.881> (172.30.0.83:56306 -> 172.17.0.26:5672, vhost: 'sync', user: 'producer', state: running), channel 1:
operation basic.publish caused a connection exception unexpected_frame: "expected content body, got non content body frame instead"

This happened at roughly 1,000 - 10,000 items/sec.

Then I started batching items into groups and sending those (100 - 300 publishes/sec), and the error went away.
Is there a rate limit? :)
How can I fix this?
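For what it's worth, this error typically appears when two coroutines publish on the same channel concurrently, interleaving one publish's method/header/body frames with another's. A sketch of one workaround, serializing publishes per channel with an asyncio.Lock (the channel object here is a stand-in, not aioamqp itself):

```python
import asyncio

class SafePublisher:
    """Serializes basic_publish calls on one channel with a lock, so the
    method/header/body frames of two publishes cannot interleave. The
    channel is a stand-in: anything with an awaitable
    basic_publish(payload, exchange_name, routing_key) works."""

    def __init__(self, channel):
        self._channel = channel
        self._lock = asyncio.Lock()

    async def publish(self, payload, exchange_name, routing_key):
        async with self._lock:
            await self._channel.basic_publish(
                payload=payload,
                exchange_name=exchange_name,
                routing_key=routing_key)

class FakeChannel:
    """Detects interleaved publishes, for demonstration purposes."""

    def __init__(self):
        self.in_flight = 0
        self.interleaved = False
        self.sent = []

    async def basic_publish(self, payload, exchange_name, routing_key):
        self.in_flight += 1
        if self.in_flight > 1:
            self.interleaved = True
        await asyncio.sleep(0)           # yield, as real frame writes would
        self.sent.append(payload)
        self.in_flight -= 1

async def main():
    channel = FakeChannel()
    publisher = SafePublisher(channel)
    await asyncio.gather(*(
        publisher.publish(f'item-{i}'.encode(), '', 'q')
        for i in range(100)))
    return channel

channel = asyncio.run(main())
print(len(channel.sent), channel.interleaved)
```

Without the lock, the fake channel observes overlapping publishes; with it, all 100 go out one at a time.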

Your API could be greatly improved

I tried asynqp first, but for some reason it doesn't allow coroutines as message handlers, which is insane (that defeats the purpose of the library, don't you think?).

Your library, on the other hand, is sane enough to allow it, but the API...
I'll give some examples.

channel.exchange_declare(exchange_name='logs', type_name='fanout')

  1. First of all, why is everybody so bound to these clumsy AMQP names where the noun comes before the verb? It's non-Pythonic and, above all, not English. Forget about AMQP; your job is to hide the nasty details from users, not expose them.
  2. Second, don't repeat yourself, and don't make others repeat themselves. You've already said that you're declaring an exchange, so what else could this name possibly refer to? Make it channel.declare_exchange(name='logs', type='fanout'). And if you want to be strict and explicit, make type an Enum.
  3. This one is my favorite right now.
result = yield from channel.queue(queue_name='', exclusive=True)
queue_name = result['queue']
yield from channel.queue_bind(exchange_name='logs', queue_name=queue_name, routing_key='')

result is a Mapping? What? It should at least be a namedtuple. A Mapping is a thing that maps arbitrary keys to arbitrary values; that's not what you're doing here. Your task is more specific than that.
But it gets better. To get the queue name you have to look up the 'queue' key. Why not 'name', at least?

  1. And this brings us to the next point. queue_bind takes an exchange name, not an exchange object. That's also unintuitive and inconvenient.
return (yield from self._write_frame_awaiting_response(
            'exchange_delete', frame, request, no_wait, timeout=timeout))

You know, when you send TCP frames somewhere and then await a response, there's a name for that: a request. return await self.send_request('exchange_delete', frame, ...)

That's just from skimming the sources.

I hope you won't be upset by my critique. I'd like to say it comes from amazement: it's not obvious to me how you could handle all the nitty-gritty low-level details (TCP sockets, sending frames, serialization, etc.) without problems, yet fail to provide an intuitive, easy-to-use, beautiful API.
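For illustration, the renames proposed above could live in a thin adapter without touching aioamqp itself; every name here (NiceChannel, declare_queue, DeclaredQueue) is hypothetical, and the raw channel is mocked:

```python
import asyncio
from collections import namedtuple
from unittest import mock

# Hypothetical result type instead of a raw mapping.
DeclaredQueue = namedtuple(
    'DeclaredQueue', ['name', 'message_count', 'consumer_count'])

class NiceChannel:
    """Thin adapter over an aioamqp-style channel: verb-first method
    names, namedtuple results. All names here are illustrative."""

    def __init__(self, raw_channel):
        self._raw = raw_channel

    async def declare_exchange(self, name, type='direct', **kwargs):
        return await self._raw.exchange_declare(
            exchange_name=name, type_name=type, **kwargs)

    async def declare_queue(self, name='', **kwargs):
        result = await self._raw.queue_declare(queue_name=name, **kwargs)
        return DeclaredQueue(result['queue'],
                             result.get('message_count', 0),
                             result.get('consumer_count', 0))

async def demo():
    raw = mock.Mock()
    # Simulate the dict an aioamqp-style queue_declare returns.
    async def queue_declare(queue_name='', **kwargs):
        return {'queue': 'amq.gen-abc', 'message_count': 0,
                'consumer_count': 0}
    raw.queue_declare = queue_declare
    return await NiceChannel(raw).declare_queue()

queue = asyncio.run(demo())
print(queue.name)
```

The adapter keeps the wire-level names in one place while application code reads `queue.name` instead of `result['queue']`.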

heartbeat timers completely pollute logs with thousands of errors per second

INFO 2016-09-30 16:33:48,614 asyncio poll 2994.933 ms took 2998.075 ms: timeout
WARNING 2016-09-30 16:33:48,615 aioamqp.protocol Connection lost exc=None
INFO 2016-09-30 16:33:48,621 aioamqp.protocol Close connection
WARNING 2016-09-30 16:33:48,630 asyncio socket.send() raised exception.
WARNING 2016-09-30 16:33:48,630 asyncio socket.send() raised exception.
WARNING 2016-09-30 16:33:48,631 asyncio socket.send() raised exception.
[...]

The patch for #111 actually made things much worse. This is thoroughly embarrassing.

Testing aioamqp code

Are there any example projects you could refer to which have tests for code using aioamqp?
Is there any documentation about that?
Is there an easy way to mock RabbitMQ (Exchanges, Queues, ...). e.g. so I'm able to manually put messages into a queue or just trigger the consumer handler?

Do you have other noteworthy comments for testing aioamqp code?
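Lacking an official test harness, one common approach is to keep broker interaction behind small coroutines and unit-test them against hand-rolled stand-ins (or unittest.mock.AsyncMock on Python 3.8+). A minimal sketch with made-up names:

```python
import asyncio

async def notify(channel, user_id):
    """Code under test: publishes a notification via an aioamqp-style
    channel. The exchange name 'notifications' is a made-up example."""
    await channel.basic_publish(
        payload=str(user_id).encode(),
        exchange_name='notifications',
        routing_key='user')

class RecordingChannel:
    """Minimal stand-in for an aioamqp channel used in unit tests."""

    def __init__(self):
        self.published = []

    async def basic_publish(self, **kwargs):
        self.published.append(kwargs)

channel = RecordingChannel()
asyncio.run(notify(channel, 42))
print(channel.published)
```

The test then asserts on `channel.published` without ever talking to RabbitMQ; integration against a real broker stays in a separate, slower suite.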

Exception swallowing in AmqpProtocol.run

aioamqp 0.7.0
RabbitMQ 3.6.1

Got an exception:

Traceback (most recent call last):
File "/usr/local/lib/python3.5/dist-packages/aioamqp/protocol.py", line 262, in run
yield from self.dispatch_frame()
File "/usr/local/lib/python3.5/dist-packages/aioamqp/protocol.py", line 217, in dispatch_frame
yield from channel.dispatch_frame(frame)
File "/usr/local/lib/python3.5/dist-packages/aioamqp/channel.py", line 111, in dispatch_frame
yield from methods(frame.class_id, frame.method_id)
File "/usr/lib/python3.5/asyncio/coroutines.py", line 200, in coro
res = func(_args, *_kw)
File "/usr/local/lib/python3.5/dist-packages/aioamqp/channel.py", line 665, in server_basic_cancel
consumer_tag = frame.arguments['consumer_tag']
KeyError: 'consumer_tag'

And the client stopped consuming messages after that.
I don't see any way to handle such a situation, because the exception is swallowed in AmqpProtocol.run.

Cancel heartbeat on connection close

The time between these two events depends directly on the heartbeat value:

2016-10-06 11:35:55,628 | INFO | 14179 | aioamqp.protocol | 286 | Close connection
2016-10-06 11:36:10,999 | WARNING | 14179 | aioamqp.protocol | 100 | Connection lost exc=None

Review of the "frame" module

https://github.com/Polyconseil/aioamqp/blob/bcz/connect_to_amqp/aioamqp/frame.py#L53
=> Instantiating an AmqpEncoder inside the AmqpEncoder looks suspicious to me. Why not work directly with the current instance? Otherwise, maybe consider a classmethod.

https://github.com/Polyconseil/aioamqp/blob/bcz/connect_to_amqp/aioamqp/frame.py#L78
=> ValueError?

https://github.com/Polyconseil/aioamqp/blob/bcz/connect_to_amqp/aioamqp/frame.py#L84
=> Rather than a cast:

if bit:
    byte_value |= (1 << arg_index)

Also, this writes the bits from right to left (the reverse of the order in which they are given). The number of bits given should probably be checked as well.

https://github.com/Polyconseil/aioamqp/blob/bcz/connect_to_amqp/aioamqp/frame.py#L94
=> Careful, this will be endianness-dependent. To be independent of the underlying architecture, use a "<" in the format string (same for the functions below):

>>> struct.pack("<H", 1)
b'\x01\x00'
>>> struct.pack(">H", 1)
b'\x00\x01'

https://github.com/Polyconseil/aioamqp/blob/bcz/connect_to_amqp/aioamqp/frame.py#L105
=> Shouldn't an encoding be specified for predictable behaviour?

https://github.com/Polyconseil/aioamqp/blob/bcz/connect_to_amqp/aioamqp/frame.py#L119
=> Same remark as for write_table: I don't understand why another encoder is instantiated.

https://github.com/Polyconseil/aioamqp/blob/bcz/connect_to_amqp/aioamqp/frame.py#L195
=> Same remarks about endianness. Behaviour must be guaranteed to be machine-independent.

https://github.com/Polyconseil/aioamqp/blob/bcz/connect_to_amqp/aioamqp/frame.py#L218
=> Doesn't this return anything?

Find a way to unqueue messages asynchronously, without callbacks

Currently, received frames are decoded and forwarded to the correct channel.
In the case of content-body frames, those frames are stacked in an asyncio Queue, but I think this is not very efficient because it wastes memory.

Is there an efficient way to avoid callbacks when consuming messages?
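One pattern that avoids doing work inside callbacks is to make the callback a thin hand-off into a bounded asyncio.Queue and consume imperatively from a worker. A sketch, with the broker side simulated (in real code `on_delivery` would be registered via basic_consume, e.g. with functools.partial to bind the queue):

```python
import asyncio

async def on_delivery(queue, channel, body, envelope, properties):
    # The callback does nothing but hand the message off; the bounded
    # queue applies backpressure instead of growing without limit.
    await queue.put((body, envelope))

async def worker(queue, n):
    # Pull messages imperatively instead of processing in the callback.
    out = []
    for _ in range(n):
        body, envelope = await queue.get()
        out.append(body)
    return out

async def demo():
    queue = asyncio.Queue(maxsize=100)   # bounded, to cap memory use
    for i in range(3):                   # simulate three deliveries
        await on_delivery(queue, None, ('msg-%d' % i).encode(), None, None)
    return await worker(queue, 3)

results = asyncio.run(demo())
print(results)
```

The `maxsize` bound addresses the memory-waste concern: once the queue is full, `put` blocks the delivery path until the worker catches up.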

Ack'ing messages inside the consumer's callback

I was able to get a working consumer going using your examples, but I'm trying to figure out how to acknowledge messages from within a callback passed to basic_consume(). I have something that looks like this:

yield from channel.basic_consume(queue_name, callback=callback)

And then my co-routine looks roughly like this:

@asyncio.coroutine
def callback(body, envelope, properties):
    loop = asyncio.get_event_loop()
    loop.create_task(do_work(body))

It doesn't look like I can acknowledge the message from within the callback. When using other synchronous AMQP clients, such as pika, the channel is passed to the callback, and then I can do something like ch.basic_ack(). Is there another approach? Or some other reason why this isn't viable with an asynchronous model? I'm fairly new to asyncio.

Thanks very much for your work on this project, I've found it to be helpful.
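Recent aioamqp versions pass the channel as the callback's first argument, so the message can be acked with basic_client_ack once the work is done; check the callback signature in your version. A sketch against a stand-in channel:

```python
import asyncio
from types import SimpleNamespace

async def do_work(body):
    await asyncio.sleep(0)               # stand-in for real processing
    return body.upper()

async def callback(channel, body, envelope, properties):
    # With the channel available in the callback, the ack can wait
    # until the work has actually completed (at-least-once delivery).
    result = await do_work(body)
    await channel.basic_client_ack(delivery_tag=envelope.delivery_tag)
    return result

class FakeChannel:
    """Stand-in for an aioamqp channel; records acks."""

    def __init__(self):
        self.acks = []

    async def basic_client_ack(self, delivery_tag, multiple=False):
        self.acks.append(delivery_tag)

async def demo():
    channel = FakeChannel()
    envelope = SimpleNamespace(delivery_tag=7)
    result = await callback(channel, b'hello', envelope, None)
    return result, channel.acks

out, acks = asyncio.run(demo())
print(out, acks)
```

If the work is dispatched to a background task instead (as in the question), the task needs to capture both the channel and the envelope's delivery tag so it can ack on completion.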

AttributeError: 'NoneType' object has no attribute 'readexactly'

ERROR:aioamqp.protocol:error on dispatch
Traceback (most recent call last):
  File "/home/vagrant/venv/lib/python3.5/site-packages/aioamqp-0.8.2-py3.5.egg/aioamqp/protocol.py", line 284, in run
    yield from self.dispatch_frame()
  File "/home/vagrant/venv/lib/python3.5/site-packages/aioamqp-0.8.2-py3.5.egg/aioamqp/protocol.py", line 231, in dispatch_frame
    frame = yield from self.get_frame()
  File "/home/vagrant/venv/lib/python3.5/site-packages/aioamqp-0.8.2-py3.5.egg/aioamqp/protocol.py", line 215, in get_frame
    yield from frame.read_frame()
  File "/home/vagrant/venv/lib/python3.5/site-packages/aioamqp-0.8.2-py3.5.egg/aioamqp/frame.py", line 412, in read_frame
    data = yield from self.reader.readexactly(7)
AttributeError: 'NoneType' object has no attribute 'readexactly'

This error occurs randomly, but it occurs many times. And because it pollutes the logs, I can't locate its cause.

Sorry that I can't provide a reproduction this time; I just wanted to let you know about it.

Close callback or on error handler for protocol

When the server closed the connection unexpectedly, my script (a consumer) kept running and did nothing (run_forever).

All I can do is run some function after basic_consume that periodically checks the connection state. I think that's the wrong approach for async code. :)

Traceback (most recent call last):
  File "/usr/local/lib/python3.5/site-packages/aioamqp/protocol.py", line 255, in run
    yield from self.dispatch_frame()
  File "/usr/local/lib/python3.5/site-packages/aioamqp/protocol.py", line 210, in dispatch_frame
    yield from self.channels[frame.channel].dispatch_frame(frame)
  File "/usr/local/lib/python3.5/site-packages/aioamqp/channel.py", line 110, in dispatch_frame
    yield from methods[(frame.class_id, frame.method_id)](frame)
  File "/usr/local/lib/python3.5/site-packages/aioamqp/channel.py", line 718, in basic_deliver
    content_body_frame = yield from self.protocol.get_frame()
  File "/usr/local/lib/python3.5/site-packages/aioamqp/protocol.py", line 183, in get_frame
    yield from frame.read_frame()
  File "/usr/local/lib/python3.5/site-packages/aioamqp/frame.py", line 434, in read_frame
    payload_data = yield from self.reader.readexactly(self.frame_length)
  File "/usr/local/lib/python3.5/asyncio/streams.py", line 509, in readexactly
    block = yield from self.read(n)
  File "/usr/local/lib/python3.5/asyncio/streams.py", line 482, in read
    yield from self._wait_for_data('read')
  File "/usr/local/lib/python3.5/asyncio/streams.py", line 423, in _wait_for_data
    yield from self._waiter
  File "/usr/local/lib/python3.5/asyncio/futures.py", line 358, in __iter__
    yield self  # This tells Task to wait for completion.
  File "/usr/local/lib/python3.5/asyncio/tasks.py", line 290, in _wakeup
    future.result()
  File "/usr/local/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/usr/local/lib/python3.5/asyncio/selector_events.py", line 702, in write
    n = self._sock.send(data)
ConnectionResetError: [Errno 104] Connection reset by peer
2016-02-12 19:07:02 71897a856041 aioamqp.protocol[1] INFO Close connection

As an option, we could add a callback/handler for the exception case in protocol.py:

    @asyncio.coroutine
    def run(self):
        while not self.stop_now.done():
            try:
                yield from self.dispatch_frame()
            except exceptions.AmqpClosedConnection as exc:
                logger.info("Close connection")
                self.stop_now.set_result(None)

                self._close_channels(exception=exc)
            except Exception:
                logger.exception('error on dispatch')

Refactor TestCase

We should refactor our test cases. A clean test case should be reusable and should leave the broker in a clean state.

Test test_connection_unexistant_vhost fails

I have one failing test (all other tests succeed):

nosetests  --verbosity=2 aioamqp.tests.test_protocol:ProtocolTestCase.test_connection_unexistant_vhost
test_connection_unexistant_vhost (aioamqp.tests.test_protocol.ProtocolTestCase) ... FAIL

======================================================================
FAIL: test_connection_unexistant_vhost (aioamqp.tests.test_protocol.ProtocolTestCase)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/mastak/aioamqp/aioamqp/tests/testing.py", line 41, in wrapper
    self.loop.run_until_complete(asyncio.wait_for(coro(self), timeout=timeout_, loop=self.loop))
  File "/usr/lib/python3.4/asyncio/base_events.py", line 316, in run_until_complete
    return future.result()
  File "/usr/lib/python3.4/asyncio/futures.py", line 275, in result
    raise self._exception
  File "/usr/lib/python3.4/asyncio/tasks.py", line 238, in _step
    result = next(coro)
  File "/usr/lib/python3.4/asyncio/tasks.py", line 377, in wait_for
    return fut.result()
  File "/usr/lib/python3.4/asyncio/futures.py", line 275, in result
    raise self._exception
  File "/usr/lib/python3.4/asyncio/tasks.py", line 238, in _step
    result = next(coro)
  File "/home/mastak/aioamqp/aioamqp/tests/test_protocol.py", line 43, in test_connection_unexistant_vhost
    yield from amqp_connect(virtualhost='/unexistant', loop=self.loop)
nose.proxy.AssertionError: AmqpClosedConnection not raised
-------------------- >> begin captured stdout << ---------------------
http://localhost:15672/api/vhosts/%2Ftest-aioamqp
http://localhost:15672/api/vhosts/%2Ftest-aioamqp
http://localhost:15672/api/permissions/%2Ftest-aioamqp/guest

--------------------- >> end captured stdout << ----------------------
-------------------- >> begin captured logging << --------------------
asyncio: DEBUG: Using selector: EpollSelector
aioamqp.protocol: INFO: Recv open ok
aioamqp.channel: DEBUG: Channel is open
aioamqp.protocol: WARNING: Server closed connection: NOT_ALLOWED - access to vhost '/unexistant' refused for user 'guest', code=530, class_id=10, method_id=40
aioamqp.tests.testcase: DEBUG: Delete channel <aioamqp.tests.testcase.ProxyChannel object at 0x7f7745bc4d30>
aioamqp.tests.testcase: DEBUG: Delete channel <aioamqp.tests.testcase.ProxyChannel object at 0x7f7745bc4d30>
aioamqp.tests.testcase: DEBUG: Delete amqp <aioamqp.tests.testcase.ProxyAmqpProtocol object at 0x7f7745bc4e10>
aioamqp.protocol: ERROR: error on dispatch
Traceback (most recent call last):
  File "/home/mastak/aioamqp/aioamqp/protocol.py", line 255, in run
    yield from self.dispatch_frame()
  File "/home/mastak/aioamqp/aioamqp/protocol.py", line 210, in dispatch_frame
    yield from self.channels[frame.channel].dispatch_frame(frame)
  File "/home/mastak/aioamqp/aioamqp/channel.py", line 111, in dispatch_frame
    yield from methods[(frame.class_id, frame.method_id)](frame)
  File "/usr/lib/python3.4/asyncio/coroutines.py", line 141, in coro
    res = func(*args, **kw)
  File "/home/mastak/aioamqp/aioamqp/channel.py", line 166, in close_ok
    self._get_waiter('close').set_result(True)
  File "/home/mastak/aioamqp/aioamqp/channel.py", line 50, in _get_waiter
    raise exceptions.SynchronizationError("Call %s didn't set a waiter" % rpc_name)
aioamqp.exceptions.SynchronizationError: Call close didn't set a waiter
aioamqp.protocol: WARNING: Server closed connection: CHANNEL_ERROR - expected 'channel.open', code=504, class_id=20, method_id=40
aioamqp.protocol: WARNING: Connection lost exc=None
--------------------- >> end captured logging << ---------------------

----------------------------------------------------------------------
Ran 1 test in 0.151s

I also get many test failures when I run the suite on macOS.
Docker has some limitations there, and I want to fix that.
But I think it would be better if all the tests passed on Linux first; only then will I make changes.

RabbitMQ heartbeat issue ([Errno 104] Connection reset by peer)

I'm using aioamqp 0.7 in a Python 3.5 environment.

About 180 seconds after I called basic_consume

await self.channel.basic_consume(queue_name='worker_heartbeat', callback=self.handle_heartbeat, no_ack=True)

this error appears in my terminal:

[2016-05-07 18:39:39,461] asyncio:ERROR: Task exception was never retrieved
future: <Task finished coro=<disconnected() done, defined at /home/vagrant/Project/foo/bar/core/mq_connection.py:31> exception=AmqpClosedConnection()>
Traceback (most recent call last):
  File "/home/vagrant/venv/foo/lib/python3.5/site-packages/aioamqp/frame.py", line 413, in read_frame
    data = yield from self.reader.readexactly(7)
  File "/usr/lib/python3.5/asyncio/streams.py", line 659, in readexactly
    block = yield from self.read(n)
  File "/usr/lib/python3.5/asyncio/streams.py", line 617, in read
    yield from self._wait_for_data('read')
  File "/usr/lib/python3.5/asyncio/streams.py", line 451, in _wait_for_data
    yield from self._waiter
  File "/usr/lib/python3.5/asyncio/futures.py", line 361, in __iter__
    yield self  # This tells Task to wait for completion.
  File "/usr/lib/python3.5/asyncio/tasks.py", line 297, in _wakeup
    future.result()
  File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/usr/lib/python3.5/asyncio/tasks.py", line 240, in _step
    result = coro.send(None)
  File "/home/vagrant/Project/foo/bar/core/mq_connection.py", line 37, in disconnected
    raise exception
  File "/usr/lib/python3.5/asyncio/selector_events.py", line 663, in _read_ready
    data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/lib/python3.5/asyncio/tasks.py", line 240, in _step
    result = coro.send(None)
  File "/home/vagrant/Project/foo/bar/core/mq_connection.py", line 37, in disconnected
    raise exception
  File "/home/vagrant/venv/foo/lib/python3.5/site-packages/aioamqp/protocol.py", line 262, in run
    yield from self.dispatch_frame()
  File "/home/vagrant/venv/foo/lib/python3.5/site-packages/aioamqp/protocol.py", line 204, in dispatch_frame
    frame = yield from self.get_frame()
  File "/home/vagrant/venv/foo/lib/python3.5/site-packages/aioamqp/protocol.py", line 189, in get_frame
    yield from frame.read_frame()
  File "/home/vagrant/venv/foo/lib/python3.5/site-packages/aioamqp/frame.py", line 415, in read_frame
    raise exceptions.AmqpClosedConnection() from ex
aioamqp.exceptions.AmqpClosedConnection

At the same time I got error message in rabbitMQ log like:

=INFO REPORT==== 7-May-2016::18:36:39 ===
accepting AMQP connection <0.5469.0> (192.168.50.4:53416 -> 192.168.50.4:5672)

=ERROR REPORT==== 7-May-2016::18:39:39 ===
closing AMQP connection <0.5469.0> (192.168.50.4:53416 -> 192.168.50.4:5672):
Missed heartbeats from client, timeout: 60s

It seems that after 3 missed heartbeats, the RabbitMQ server closes the connection.

And I notice that if nothing is published to the queue I consume, the connection is not closed.

In the rest of my project I create channels and consume messages the same way, and nothing seems to go wrong.

My question is:

  • How can I ensure aioamqp to send heartbeat normally?
  • Can you kindly tell me what is the possible mistake I made?

Please forgive my poor English.

Content body in multiple frames (amqp-0.9.1 4.2.6.2)

When the content of a body frame is bigger than the size supported by the server, sending it results in a connection reset by peer and the following error message:
aioamqp.protocol: WARNING: Server closed connection: FRAME_ERROR - type 3, all octets = <<>>: {frame_too_large,1000000,131064}, code=501, class_id=0, method_id=0

Trying to receive such a message will only process the first body frame.
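Per AMQP 0-9-1 section 4.2.6.2, an oversized body must be split into several body frames, each at most the negotiated frame-max minus the 8 bytes of frame overhead (7-byte header plus the frame-end octet) — which is exactly where the 131064 in the broker's error comes from for a 131072-byte frame-max. A sketch of the chunking:

```python
def split_body(payload, frame_max):
    """Split a content body into chunks that each fit one body frame.

    Each frame carries 8 bytes of overhead (7-byte header plus the
    frame-end octet), so the usable payload per frame is frame_max - 8.
    """
    chunk = frame_max - 8
    return [payload[i:i + chunk] for i in range(0, len(payload), chunk)]

# A 1 MB payload against RabbitMQ's default 131072-byte frame-max.
frames = split_body(b'x' * 1000000, 131072)
print(len(frames), len(frames[0]))
```

On the receiving side the mirror-image logic applies: keep accumulating body frames until the byte count announced in the content header has arrived.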

heartbeat timers are still on after connection is closed

WARNING:aioamqp.protocol:Connection lost exc=BrokenPipeError(32, 'Broken pipe')
ERROR:asyncio:Exception in callback AmqpProtocol._heartbeat_timer_recv_timeout()
handle: <TimerHandle when=75311.867231919 AmqpProtocol._heartbeat_timer_recv_timeout()>
Traceback (most recent call last):
File "/usr/lib/python3.5/asyncio/events.py", line 125, in _run
self._callback(*self._args)
File "/usr/lib/python3.5/site-packages/aioamqp/protocol.py", line 318, in _heartbeat_timer_recv_timeout
self._stream_writer.close()
AttributeError: 'NoneType' object has no attribute 'close'

Steps to reproduce:

  • Restart RabbitMQ server.

Support for accessing message headers?

Thanks for writing and open sourcing this library. It's great to be able to work with AMQP and asyncio already.

I was curious whether there are any plans to support access to message headers. I have a messaging pattern that requires the consumer to inspect the message headers before working with the body, and those attributes are unavailable in the current implementation.

Some libraries provide a "message" class that gives access to the content and headers of a message (txamqp is the library I have the most experience with), which might be an option.

Thanks again.
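In current aioamqp versions the `properties` argument handed to the consumer callback carries the message properties, including `headers` (a dict, or None when absent); whether your version exposes this is worth checking. A sketch with a stand-in properties object:

```python
import asyncio
from types import SimpleNamespace

async def on_message(channel, body, envelope, properties):
    # `properties.headers` is assumed to be a dict or None, mirroring
    # how AMQP message properties expose application headers.
    headers = properties.headers or {}
    if headers.get('x-retry-count', 0) > 3:
        return 'dead-letter'        # route away after too many retries
    return 'process'

properties = SimpleNamespace(headers={'x-retry-count': 5})
decision = asyncio.run(on_message(None, b'', None, properties))
print(decision)
```

The `x-retry-count` header and the return values are made-up examples; the point is only that the headers can be inspected before touching the body.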

Publishing to non existing exchange

Hi,
according to the documentation https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.publish, the basic_publish method should result in a not-found error if the exchange doesn't exist, but it returns nothing and doesn't raise any exception:

await channel.basic_publish(
       payload=pickle.dumps(request),
       exchange_name='non_existing_exc',
       routing_key=q_name)

Is this an issue, or is it supposed to work this way? Also, is there any way to check whether an exchange or queue exists?
Thanks
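One way to check existence without publishing is a passive declare: with passive=True the broker verifies that the exchange exists instead of creating it, and answers with a 404 channel error if it does not. A sketch against a stand-in channel (the real exception would be aioamqp.exceptions.ChannelClosed, simulated here):

```python
import asyncio

class ChannelClosed(Exception):
    """Stand-in for aioamqp.exceptions.ChannelClosed."""

async def exchange_exists(channel, name):
    # passive=True asks the broker to verify the exchange exists
    # instead of creating it; a missing exchange triggers a channel
    # error (404 NOT_FOUND), surfaced here as ChannelClosed.
    try:
        await channel.exchange_declare(
            exchange_name=name, type_name='direct', passive=True)
        return True
    except ChannelClosed:
        return False

class FakeChannel:
    """Simulates a broker that knows a fixed set of exchanges."""

    def __init__(self, known):
        self.known = known

    async def exchange_declare(self, exchange_name, type_name,
                               passive=False):
        if passive and exchange_name not in self.known:
            raise ChannelClosed()

async def demo():
    channel = FakeChannel(known={'logs'})
    return (await exchange_exists(channel, 'logs'),
            await exchange_exists(channel, 'non_existing_exc'))

exists, missing = asyncio.run(demo())
print(exists, missing)
```

Note that a 404 closes the real channel, so after a failed check the channel must be reopened before further use.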

Examples crashing with Python 3.4

I was trying aioamqp (master) today under Python 3.4

I noticed some of the examples didn't finish well. Here's the associated stack trace.

python emit_log_topic.py
 [x] Sent 'Hello World!'
Traceback (most recent call last):
  File "emit_log_topic.py", line 38, in <module>
    asyncio.get_event_loop().run_until_complete(exchange_routing())
  File "/opt/local/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/asyncio/base_events.py", line 208, in run_until_complete
    return future.result()
  File "/opt/local/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/asyncio/futures.py", line 243, in result
    raise self._exception
  File "/opt/local/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/asyncio/tasks.py", line 283, in _step
    result = next(coro)
  File "emit_log_topic.py", line 35, in exchange_routing
    yield from asyncio.wait_for(protocol.client_close(), timeout=10)
AttributeError: 'AmqpProtocol' object has no attribute 'client_close'
--- Logging error ---
Traceback (most recent call last):
Exception ignored in: <generator object run at 0x10c621360>
Traceback (most recent call last):
  File "/Users/xordoquy/Documents/Devs/aioamqp/aioamqp/protocol.py", line 191, in run
  File "/opt/local/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/logging/__init__.py", line 1287, in exception
  File "/opt/local/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/logging/__init__.py", line 1280, in error
  File "/opt/local/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/logging/__init__.py", line 1386, in _log
  File "/opt/local/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/logging/__init__.py", line 1396, in handle
  File "/opt/local/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/logging/__init__.py", line 1466, in callHandlers
  File "/opt/local/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/logging/__init__.py", line 837, in handle
  File "/opt/local/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/logging/__init__.py", line 961, in emit
  File "/opt/local/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/logging/__init__.py", line 890, in handleError
  File "/opt/local/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/traceback.py", line 169, in print_exception
  File "/opt/local/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/traceback.py", line 153, in _format_exception_iter
  File "/opt/local/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/traceback.py", line 18, in _format_list_iter
  File "/opt/local/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/traceback.py", line 65, in _extract_tb_or_stack_iter
  File "/Users/xordoquy/.virtualenvs/aioamqp/lib/python3.4/linecache.py", line 15, in getline
  File "/Users/xordoquy/.virtualenvs/aioamqp/lib/python3.4/linecache.py", line 41, in getlines
  File "/Users/xordoquy/.virtualenvs/aioamqp/lib/python3.4/linecache.py", line 126, in updatecache
  File "/Users/xordoquy/.virtualenvs/aioamqp/lib/python3.4/tokenize.py", line 431, in open
AttributeError: 'module' object has no attribute 'open'

RPC: How to nest calls?

Going from the RPC example ... Let's assume I want to call my caller again in the on_request callback and wait for the response. How can I achieve this without a deadlock?

How to consume / handle multiple messages simultaneously?

With a single connection and a single channel, I am unable to handle more than one message at a time.

Am I missing something?

If I create multiple connections, I can consume one message per connection concurrently (so 10 connections give 10 concurrently handled messages). This also confirms that my handler code is not what blocks asyncio.
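A common pattern here is to return from the consumer callback immediately and run the actual work as its own task, so the consumer loop can dispatch the next delivery while earlier ones are still being handled. A sketch with the broker side simulated:

```python
import asyncio

active = {'now': 0, 'max': 0}   # track how many handlers overlap

async def handle(body):
    active['now'] += 1
    active['max'] = max(active['max'], active['now'])
    await asyncio.sleep(0.01)            # simulated work
    active['now'] -= 1

async def callback(channel, body, envelope, properties):
    # Return right away; the work runs as its own task so the consumer
    # loop can dispatch the next delivery. With real aioamqp you would
    # also widen the prefetch window first, e.g.
    # `await channel.basic_qos(prefetch_count=10)` (argument names may
    # vary between versions).
    asyncio.ensure_future(handle(body))

async def demo():
    for _ in range(10):                  # simulate back-to-back deliveries
        await callback(None, b'msg', None, None)
    await asyncio.sleep(0.05)            # let all the tasks finish
    return active

result = asyncio.run(demo())
print(result)
```

Without widening the prefetch window, the broker only hands out one unacked message at a time, which is why a single channel appears serialized.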

Connection and channel names

Is there a way to get the connection and channel names from the driver?

I want to be able to hit the API from rabbitmq and query for just the one connection/channel.

Heartbeat timeout expires directly after connection is opened

Steps to reproduce on aioamqp 0.8.2:

  • Have an AMQP server that sends a connection.tune method frame with parameter heartbeat = 0 (meaning heartbeats are turned off)
  • Do not pass heartbeat > 0 to aioamqp.connect()

Result:

  • AmqpProtocol.start_connection() waits for connection.tune() to arrive on line 171
  • On receiving connection.tune, AmqpProtocol.tune() sets self.server_heartbeat = decoder.read_short()
  • start_connection proceeds to set key heartbeat to the tune_ok dictionary on line 176,
    but because user didn't pass heartbeat != None, it gets set to what the server sent (= 0)
  • Client confirms heartbeat 0 with tune_ok() and self.server_heartbeat gets set to the same value 0 on line 184
  • Line 186 skips calling _heartbeat_timer_recv_reset if self.server_heartbeat == 0, still OK
  • Server replies with an connection.open-ok frame, and data_received calls self._heartbeat_timer_recv_reset() unconditionally on line 108
  • which checks if self.server_heartbeat is None (which is never true, it's 0)
  • and _heartbeat_timer_recv_timeout is set to be called in 0 * 2, i.e. 0 seconds
  • which then silently just closes the writer on line 318
  • which leads to connection_lost being called with no exception
  • which leads to general kaboom.
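The faulty guard described above can be distilled to a few lines (the function names are illustrative, not aioamqp's actual code):

```python
def reset_recv_timer_buggy(server_heartbeat, call_later):
    # Mirrors the faulty guard: only None is treated as "no heartbeat",
    # so heartbeat == 0 schedules a timeout that fires immediately.
    if server_heartbeat is None:
        return
    call_later(server_heartbeat * 2)

def reset_recv_timer_fixed(server_heartbeat, call_later):
    # 0 means "heartbeats disabled" per the AMQP tune negotiation, so
    # it must be skipped as well as None.
    if not server_heartbeat:
        return
    call_later(server_heartbeat * 2)

buggy_delays = []
fixed_delays = []
reset_recv_timer_buggy(0, buggy_delays.append)
reset_recv_timer_fixed(0, fixed_delays.append)
print(buggy_delays, fixed_delays)
```

The buggy variant schedules a 0-second timeout when the server sends heartbeat = 0; the fixed variant schedules nothing.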

Here's a traceback. Because connection.open-ok is the last frame the broker sends on establishing the connection, my code got to calling AmqpProtocol.channel() before getting an exception:

Traceback (most recent call last):
  File "reproduce.py", line 123, in run
    channel = yield from connection.channel()
  File "/usr/lib/python3/dist-packages/aioamqp/protocol.py", line 444, in channel
    yield from channel.open()
  File "/usr/lib/python3/dist-packages/aioamqp/channel.py", line 134, in open
    yield from fut
  File "/usr/lib/python3.4/asyncio/futures.py", line 388, in __iter__
    yield self  # This tells Task to wait for completion.
  File "/usr/lib/python3.4/asyncio/tasks.py", line 286, in _wakeup
    value = future.result()
  File "/usr/lib/python3.4/asyncio/futures.py", line 277, in result
    raise self._exception
aioamqp.exceptions.ChannelClosed: (None, None)

Channel connections should close when server connection goes down

Currently, when the protocol's connection_lost is called by asyncio, no channel listener notices it. If a coroutine is waiting on a channel.consume() it will be stuck there for good.

I am noticing this in our staging environment. We sometimes see:

[WARNING aioamqp.protocol protocol: MainProcess] Connection lost exc=ConnectionResetError(104, 'Connection reset by peer')
[INFO aioamqp.protocol protocol: MainProcess] Close connection

And after this log message, the program keeps waiting forever on a channel.consume() but it does not receive messages anymore.

I guess it could be nice to propagate the exceptions to all open channels, a bit like the server_channel_close in Channel does.

If that's ok for you, I could try and submit a patch.

Connection reset by peer when using a virtualhost

Hi,

when I create a connection with a virtualhost specified, my connection is reset by RabbitMQ.

It happens every time with from_url, and with connect if I specify /myvhost as the vhost. It does not happen with connect if I remove the leading slash.

I don't know what the expected behaviour is: whether the leading slash must be removed by the user (or by from_url) when creating a new connection with connect, or whether it should be stripped automatically in the open method of AmqpProtocol.

rpc performance question

Hi!

I have modified the RPC server and client, and they are very slow.

The client sends requests in an infinite loop:

#!/usr/bin/env python

"""
    RPC client, aioamqp implementation of RPC examples from RabbitMQ tutorial

"""

import asyncio
import uuid

import aioamqp


class FibonacciRpcClient(object):
    def __init__(self):
        self.transport = None
        self.protocol = None
        self.channel = None
        self.callback_queue = None
        self.waiter = asyncio.Event()

    @asyncio.coroutine
    def connect(self):
        """ an `__init__` method can't be a coroutine"""
        self.transport, self.protocol = yield from aioamqp.connect()
        self.channel = yield from self.protocol.channel()

        result = yield from self.channel.queue_declare(queue_name='', exclusive=True)
        self.callback_queue = result['queue']

        yield from self.channel.basic_consume(
            self.on_response,
            no_ack=True,
            queue_name=self.callback_queue,
        )

    @asyncio.coroutine
    def on_response(self, channel, body, envelope, properties):
        if self.corr_id == properties.correlation_id:
            self.response = body

        self.waiter.set()

    @asyncio.coroutine
    def call(self, n):
        if not self.protocol:
            yield from self.connect()
        self.response = None
        self.waiter.clear()
        self.corr_id = str(uuid.uuid4())
        yield from self.channel.basic_publish(
            payload=str(n),
            exchange_name='',
            routing_key='rpc_queue',
            properties={
                'reply_to': self.callback_queue,
                'correlation_id': self.corr_id,
            },
        )
        yield from self.waiter.wait()

        # yield from self.protocol.close()
        return self.response


@asyncio.coroutine
def rpc_client():
    fibonacci_rpc = FibonacciRpcClient()
    print(" [x] Requesting fib(30)")
    while True:
        response = yield from fibonacci_rpc.call(30)
        print(" [.] Got %r" % response)


asyncio.get_event_loop().run_until_complete(rpc_client())

Server - no calculations, just echo:

"""
    RPC server, aioamqp implementation of RPC examples from RabbitMQ tutorial
"""

import asyncio
import aioamqp


@asyncio.coroutine
def on_request(channel, body, envelope, properties):
    n = int(body)

    print(" [.] fib(%s)" % n)
    # response = fib(n)
    response = n

    yield from channel.basic_publish(
        payload=str(response),
        exchange_name='',
        routing_key=properties.reply_to,
        properties={
            'correlation_id': properties.correlation_id,
        },
    )

    yield from channel.basic_client_ack(delivery_tag=envelope.delivery_tag)


@asyncio.coroutine
def rpc_server():

    transport, protocol = yield from aioamqp.connect()

    channel = yield from protocol.channel()

    yield from channel.queue_declare(queue_name='rpc_queue')
    yield from channel.basic_qos(prefetch_count=1, prefetch_size=0, connection_global=False)
    yield from channel.basic_consume(on_request, queue_name='rpc_queue')
    print(" [x] Awaiting RPC requests")


event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(rpc_server())
event_loop.run_forever()

And I get only 25 requests per second.

Why so slow? And why exactly 25?

Thanks

Does aioamqp work?

Hello,

Does the driver work?

I only see this:

(.venv)vg %> python test_receive.py
b'py2.queue'
b'py2.queue'
b'py2.queue'
b'py2.queue'
b'py2.queue'
# rabbitmqctl list_channels
Listing channels ...
<[email protected]> python_test 1   5

I sent 3 messages to:

(.venv)vg %> python send.py 
Hello World!
(.venv)vg %> python send.py
Hello World!
(.venv)vg %> python send.py
Hello World!

But I still see the same output and am still waiting for messages:

(.venv)vg %> python test_receive.py
b'py2.queue'
b'py2.queue'
b'py2.queue'
b'py2.queue'
b'py2.queue'

Okay, time to run a receiver based on pika:

(.venv)vg %> python recive.py 
Received b'Hello backend2'
Received b'Hello backend2'
Received b'Hello backend2'
# rabbitmqctl list_channels
Listing channels ...
<[email protected]> python_test 1   5
<[email protected]> python_test 1   0

test_receive.py:

#!/usr/bin/env python

import asyncio
import aioamqp

@asyncio.coroutine
def callback(body, envelope, properties):
    print(body)

@asyncio.coroutine
def receive():
    try:
        transport, protocol = yield from aioamqp.connect(host='10.10.80.23', port=5672,
            login='python_test', password='python_test', virtualhost='hello')
    except aioamqp.AmqpClosedConnection:
        print("closed connections")
        return

    channel = yield from protocol.channel()
    queue_name = 'py2.queue'

    yield from asyncio.wait_for(channel.queue(queue_name, durable=False, auto_delete=False), timeout=10)

    yield from asyncio.wait_for(channel.basic_consume(queue_name, callback=callback), timeout=10)


asyncio.get_event_loop().run_until_complete(receive())
asyncio.get_event_loop().run_forever()

send.py:

#!/usr/bin/env python
import pika

credentials = pika.PlainCredentials('python_test', 'python_test')

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='10.10.80.25',credentials=credentials, virtual_host='hello'))
channel = connection.channel()

channel.queue_declare(queue='py2.queue')

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello backend2')
print("Hello World!")
connection.close()

recive.py:

#!/usr/bin/env python
import pika

credentials = pika.PlainCredentials('python_test', 'python_test')

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='10.10.80.23',credentials=credentials, virtual_host='hello'))
channel = connection.channel()

channel.queue_declare(queue='py2.queue')

def callback(ch, method, properties, body):
    print("Received {}".format(body))

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

channel.start_consuming()

set_result on canceled future

The asyncio documentation says:

Don’t call set_result() or set_exception() method of Future if the future is cancelled:
it would fail with an exception. For example, write:

if not fut.cancelled():
    fut.set_result('done')

In aioamqp the future state is not checked, and this breaks (I don't know how the future gets cancelled yet):

error on dispatch
Traceback (most recent call last):
  File "/home/azureuser/.local/share/virtualenvs/pfc/lib/python3.4/site-packages/aioamqp/protocol.py", line 196, in run
    yield from self.dispatch_frame()
  File "/home/azureuser/.local/share/virtualenvs/pfc/lib/python3.4/site-packages/aioamqp/protocol.py", line 177, in dispatch_frame
    yield from self.channels[frame.channel].dispatch_frame(frame)
  File "/home/azureuser/.local/share/virtualenvs/pfc/lib/python3.4/site-packages/aioamqp/channel.py", line 91, in dispatch_frame
    yield from methods[(frame.class_id, frame.method_id)](frame)
  File "/usr/lib/python3.4/asyncio/tasks.py", line 84, in coro
    res = func(*args, **kw)
  File "/home/azureuser/.local/share/virtualenvs/pfc/lib/python3.4/site-packages/aioamqp/channel.py", line 120, in open_ok
    fut.set_result(True)
  File "/usr/lib/python3.4/asyncio/futures.py", line 298, in set_result
    raise InvalidStateError('{}: {!r}'.format(self._state, self))
asyncio.futures.InvalidStateError: CANCELLED: Future<CANCELLED>
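The guard quoted from the asyncio docs can be wrapped in a small helper; set_result_safe is a hypothetical name, shown only to illustrate the fix that open_ok (and similar dispatch callbacks) would need:

```python
import asyncio

def set_result_safe(fut, value):
    """Guarded variant of fut.set_result(), as the asyncio docs
    recommend: a no-op when the future was already cancelled."""
    if not fut.cancelled():
        fut.set_result(value)

async def demo():
    loop = asyncio.get_running_loop()
    ok, cancelled = loop.create_future(), loop.create_future()
    cancelled.cancel()
    set_result_safe(ok, True)
    set_result_safe(cancelled, True)  # no InvalidStateError raised
    return ok.result(), cancelled.cancelled()

print(asyncio.run(demo()))  # (True, True)
```

With this guard, a frame arriving for an already-cancelled waiter is silently dropped instead of killing the dispatch loop.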

Channel Connection Pool

As per the AMQP docs, it is suggested to open one connection and any number of channels on it. There is a huge performance improvement with the one-connection, n-channels pattern. It would be worthwhile to maintain a pool of AMQP channels.

I am interested in sending a pull request. Please let me know if any more info is required.
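A rough sketch of the idea. ChannelPool is a hypothetical name, not an existing aioamqp API, and channel_factory stands in for protocol.channel():

```python
import asyncio

class ChannelPool:
    """Sketch of a fixed-size channel pool over a single connection."""
    def __init__(self, channel_factory, size=10):
        self._factory = channel_factory
        self._size = size
        self._queue = asyncio.Queue()
        self._created = 0

    async def acquire(self):
        # Lazily open channels up to the pool size, then reuse.
        if self._created < self._size and self._queue.empty():
            self._created += 1
            return await self._factory()
        return await self._queue.get()

    def release(self, channel):
        self._queue.put_nowait(channel)

async def demo():
    counter = 0
    async def factory():  # stand-in for protocol.channel()
        nonlocal counter
        counter += 1
        return 'channel-%d' % counter

    pool = ChannelPool(factory, size=2)
    a = await pool.acquire()
    b = await pool.acquire()
    pool.release(a)
    c = await pool.acquire()  # reuses `a` instead of opening a third channel
    return a, b, c, counter

print(asyncio.run(demo()))  # ('channel-1', 'channel-2', 'channel-1', 2)
```

A real implementation would also need to discard channels that the server closed and to cap concurrent acquire() calls, but the shape is the same.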

Error while consuming from multiple queues: Waiter already exists

I'm trying to set up my code to be able to consume from multiple queues concurrently, using one channel.

Apart from connecting and creating the channel, this is the relevant part of the code:

tasks = []
for u in users:
    tasks.append(channel.basic_consume(callback, queue_name=u.name))
await asyncio.gather(*tasks)

Unfortunately, when I run it, I receive the following Waiter already exists error:

  File "manage.py", line 10, in <module>
    execute_from_command_line(sys.argv)
  File "/home/app/.local/lib/python3.5/site-packages/django/core/management/__init__.py", line 367, in execute_from_command_line
    utility.execute()
  File "/home/app/.local/lib/python3.5/site-packages/django/core/management/__init__.py", line 359, in execute
    self.fetch_command(subcommand).run_from_argv(self.argv)
  File "/home/app/.local/lib/python3.5/site-packages/django/core/management/base.py", line 305, in run_from_argv
    self.execute(*args, **cmd_options)
  File "/home/app/.local/lib/python3.5/site-packages/django/core/management/base.py", line 356, in execute
    output = self.handle(*args, **options)
  File "/code/callgen/management/commands/filldata.py", line 79, in handle
    loop.run_until_complete(self.connect(loop))
  File "/usr/local/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete
    return future.result()
  File "/usr/local/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/usr/local/lib/python3.5/asyncio/tasks.py", line 241, in _step
    result = coro.throw(exc)
  File "/code/callgen/management/commands/filldata.py", line 75, in connect
    await asyncio.gather(*tasks)
  File "/usr/local/lib/python3.5/asyncio/futures.py", line 361, in __iter__
    yield self  # This tells Task to wait for completion.
  File "/usr/local/lib/python3.5/asyncio/tasks.py", line 296, in _wakeup
    future.result()
  File "/usr/local/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/usr/local/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(None)
  File "/home/app/.local/lib/python3.5/site-packages/aioamqp/channel.py", line 617, in basic_consume
    'basic_consume', frame, request, no_wait, timeout=timeout)
  File "/home/app/.local/lib/python3.5/site-packages/aioamqp/channel.py", line 196, in _write_frame_awaiting_response
    f = self._set_waiter(waiter_id)
  File "/home/app/.local/lib/python3.5/site-packages/aioamqp/channel.py", line 41, in _set_waiter
    raise exceptions.SynchronizationError("Waiter already exists")
aioamqp.exceptions.SynchronizationError: Waiter already exists

Is there a way to consume from multiple queues using one channel?

Thanks!
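The channel serializes its synchronous RPCs through a single waiter, so concurrent basic_consume calls on one channel race for it; awaiting the calls one after another (or opening one channel per concurrent consumer) avoids the collision. A toy reproduction with a stand-in channel — FakeChannel is not aioamqp code, it only models the one-waiter rule:

```python
import asyncio

class FakeChannel:
    """Stand-in: only one synchronous RPC (waiter) may be
    outstanding on a channel at a time, as in aioamqp."""
    def __init__(self):
        self._waiter = None

    async def basic_consume(self, queue_name):
        if self._waiter is not None:
            raise RuntimeError('Waiter already exists')
        self._waiter = object()
        await asyncio.sleep(0)  # simulate the broker round-trip
        self._waiter = None
        return queue_name

async def demo():
    channel = FakeChannel()
    queues = ['q1', 'q2', 'q3']

    # Concurrent calls on one channel collide on the single waiter:
    results = await asyncio.gather(
        *(channel.basic_consume(q) for q in queues),
        return_exceptions=True)
    collided = any(isinstance(r, RuntimeError) for r in results)

    # Sequential awaits on the same channel work fine:
    done = [await channel.basic_consume(q) for q in queues]
    return collided, done

print(asyncio.run(demo()))  # (True, ['q1', 'q2', 'q3'])
```

So yes, one channel can feed many consumers; only the consume *registrations* must not overlap in flight.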

connect() and from_url() factory methods should accept an ssl.SSLContext instance

Currently, the connect() and from_url() methods have one point of customization for SSL connections in the form of a verify_ssl argument. However, this misses the point as:

  1. asyncio's loop.create_connection accepts server_hostname and ssl arguments
  2. The ssl argument can be an ssl.SSLContext instance that can be customized further than just deciding whether or not server-side certificates should be verified.

In order to support such options, and to be future-compatible with any new options introduced to customize ssl.SSLContext instances, I suggest the following:

  1. Drop the verify_ssl argument. It appears to be a short-sighted attempt to deal with the new default SSL policy introduced with newer versions of Python.
  2. Overload the ssl argument to accept ssl.SSLContext instances, and forward such instances to asyncio.create_connection.
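For illustration, a context built with the stdlib ssl module can reproduce both settings the boolean flag expresses, and then go further; the commented cafile and certificate paths are placeholders:

```python
import ssl

# What a boolean verify_ssl flag can express:
verify_ctx = ssl.create_default_context()    # verify_ssl=True
noverify_ctx = ssl.create_default_context()  # verify_ssl=False:
noverify_ctx.check_hostname = False          # (hostname check must be
noverify_ctx.verify_mode = ssl.CERT_NONE     #  dropped before verify_mode)

# What forwarding an ssl.SSLContext to loop.create_connection would
# additionally allow, e.g. a private CA or client certificates
# (paths are placeholders):
# custom = ssl.create_default_context(cafile='my-ca.pem')
# custom.load_cert_chain('client.crt', 'client.key')

print(verify_ctx.verify_mode == ssl.CERT_REQUIRED,
      noverify_ctx.verify_mode == ssl.CERT_NONE)
```

Overloading the ssl argument to accept either a boolean or one of these context objects would keep the simple cases simple while unlocking the rest.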

Two subtly different publish methods

Channel objects have publish() and basic_publish(), which behave subtly differently (right now I am looking at waiting for acks in confirm mode).

They should probably be consolidated.

Please don't check the basic_consume callback for iscoroutinefunction

aioamqp.exceptions.ConfigurationError: basic_consume requires a coroutine callback

This is a spurious error.
One might want to use tools like functools.partial, or a nested function, to create the callback. Example:

yield from my_channel.basic_consume(my_queue, callback=functools.partial(self._on_rpc, my_data))

Checking for None (or maybe callable()) should be sufficient.
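A quick demonstration of why callable() is the better check. The on_rpc signature below is just an illustration; asyncio.iscoroutinefunction may reject the wrapped forms depending on the Python version, even though all of them produce coroutines when called:

```python
import functools

async def on_rpc(extra, channel, body, envelope, properties):
    """Illustrative consumer callback with extra bound data."""

callbacks = [
    on_rpc,                                  # plain coroutine function
    functools.partial(on_rpc, 'my_data'),    # partial of one
    lambda *args: on_rpc('my_data', *args),  # wrapper returning a coroutine
]

# All three are valid consumer callbacks, and callable() accepts
# every one of them.
print([callable(cb) for cb in callbacks])  # [True, True, True]
```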

Error callback is called after channel is "legally" closed

I use the on_error parameter of aioamqp.connect to pass a handler which reestablishes the connection when it is accidentally closed. But it is called even after the channel is "legally" closed, which is definitely not an error case. And its exception argument looks weird: ChannelClosed(ChannelClosed(None, None), 'Channel is closed').

Here is the code to reproduce the issue:

import logging
import asyncio
import aioamqp


logger = logging.getLogger(__name__)


def on_error(exc):
    logger.error('Handle error %r', exc)


async def run(loop):
    transport, protocol = await aioamqp.connect(on_error=on_error, loop=loop)
    channel = await protocol.channel()

    logger.info('Close channel')
    await channel.close()
    logger.info('Close protocol')
    await protocol.close()
    logger.info('Close transport')
    transport.close()


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop=loop))

Here is its output:

DEBUG:asyncio:Using selector: EpollSelector
INFO:aioamqp.protocol:Recv open ok
DEBUG:aioamqp.channel:Channel is open
INFO:__main__:Close channel
INFO:aioamqp.channel:Channel closed
INFO:__main__:Close protocol
INFO:aioamqp.protocol:Recv close ok
INFO:__main__:Close transport
WARNING:aioamqp.protocol:Connection lost exc=None
ERROR:__main__:Handle error ChannelClosed(ChannelClosed(None, None), 'Channel is closed')

Environment:

  • Python 3.5.2
  • aioamqp 0.8.2
  • CentOS Linux release 7.2.1511 (Core)

Reuse channel id

For now, the library does not reuse previously used channel IDs.

Thus, it can't use more than 2^16 channels over the lifetime of a connection.
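A sketch of a recycling allocator; ChannelIdAllocator is a hypothetical name, not aioamqp's implementation. Channel 0 is reserved for the connection itself, so usable IDs run from 1 to 2^16 - 1:

```python
class ChannelIdAllocator:
    """Hand out channel ids 1..max_id, recycling ids of closed
    channels instead of counting upward forever."""
    def __init__(self, max_id=2**16 - 1):
        self._max_id = max_id
        self._next = 1
        self._free = set()

    def acquire(self):
        if self._free:
            return self._free.pop()  # reuse a released id first
        if self._next > self._max_id:
            raise RuntimeError('No free channel id')
        cid = self._next
        self._next += 1
        return cid

    def release(self, cid):
        self._free.add(cid)

alloc = ChannelIdAllocator()
a = alloc.acquire()  # 1
b = alloc.acquire()  # 2
alloc.release(a)
c = alloc.acquire()  # 1 again, instead of 3
print(a, b, c)       # 1 2 1
```

Calling release() from the channel-close path would make long-lived connections immune to the 2^16 ceiling.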

Ack messages with multiple connections

Usecase: my Python application is connecting to multiple RabbitMQ servers. These are non-clustered. Our events are sharded across these servers. We want to consume events from all these servers. These events need to be ack'ed.

Problem:
channel.basic_consume(queue_name, callback=callback)
callback looks like:
def callback(body, envelope, properties):
Thus, inside the callback you have no idea from which server you received the event.
Ticket #39 suggests creating a separate channel to ack. The problem is I don't know which server to create the channel for.

Initially I thought I would just create a lambda for the callback and bind the ack channel to it, but then I found:
ff74ee7
So that's probably not going to work out. I wonder why this change was made anyway.

I'm pretty new to Python, so I'm not sure what I'm supposed to do in this case. Should I wrap these functions in objects, and can the callback point to a member function of the object? The object would then have access to the server/channel, so we'd be able to send an ack.
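Yes — a bound method defined with async def is itself a coroutine function, so it passes a coroutine check while carrying the right channel through self. A sketch with a stand-in channel; FakeChannel and the simplified callback signature are illustrations only (aioamqp callbacks actually receive channel, body, envelope, properties):

```python
import asyncio

class FakeChannel:
    """Stand-in for an aioamqp channel bound to one server."""
    def __init__(self, server):
        self.server = server
        self.acks = []

    async def basic_client_ack(self, delivery_tag):
        self.acks.append(delivery_tag)

class Consumer:
    """One consumer object per connection; the bound method
    on_message always sees its own channel via self."""
    def __init__(self, channel):
        self.channel = channel

    async def on_message(self, body, delivery_tag):
        # self.channel belongs to the server this message came
        # from, so acking on it is safe.
        await self.channel.basic_client_ack(delivery_tag)

async def demo():
    servers = [FakeChannel('rabbit-1'), FakeChannel('rabbit-2')]
    consumers = [Consumer(ch) for ch in servers]
    await consumers[0].on_message(b'event-a', 1)
    await consumers[1].on_message(b'event-b', 7)
    return [ch.acks for ch in servers]

print(asyncio.run(demo()))  # [[1], [7]]
```

You would register consumers[n].on_message as the basic_consume callback on the matching connection's channel.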
