celery / py-amqp


amqplib fork

License: Other

Makefile 0.96% Python 96.77% Shell 0.27% Dockerfile 1.39% Cython 0.61%
python amqp-client amqp amqp-connection amqp-messages python3 python-library rabbitmq

py-amqp's Issues

SyntaxError for python 3.2.2

[root@testzlin amqp-1.0.9]# /opt/python3/bin/python3 setup.py install
...
Finished processing dependencies for amqp==1.0.9

[root@testzlin amqp-1.0.9]# /opt/python3/bin/python3
Python 3.2.2 (default, Jan 25 2012, 13:16:54)
[GCC 3.4.6 20060404 (Red Hat 3.4.6-3)] on linux2
Type "help", "copyright", "credits" or "license" for more information.

>>> import amqp
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "amqp/__init__.py", line 32, in <module>
    from .basic_message import Message
  File "amqp/basic_message.py", line 19, in <module>
    from .serialization import GenericContent
  File "amqp/serialization.py", line 261
    if n < 0 or n >= 4294967296L:
                               ^
SyntaxError: invalid syntax
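
The `L` suffix on integer literals is Python 2 syntax, and Python 3 rejects it at compile time, which is why the import fails before any code runs. A minimal sketch of a range check that parses on both versions might look like the following; the name `check_unsigned_long` is hypothetical and is not taken from amqp/serialization.py.

```python
def check_unsigned_long(n):
    """Validate that n fits an unsigned 32-bit field (hypothetical helper)."""
    # 2 ** 32 == 4294967296; there is no 'L' suffix, so this line
    # parses on both Python 2 and Python 3.
    if n < 0 or n >= 2 ** 32:
        raise ValueError('value out of range 0..2**32-1')
    return n
```

Python 2 promotes large integers to `long` automatically, so dropping the suffix should not change behaviour there.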

Can't specify default ssl arguments when creating a Connection

>>> c = amqp.connection.Connection( ssl={} )
>>> c.connect()
>>> c.transport
<amqp.transport.TCPTransport object at 0x7f8f4a7ad650>

Passing an empty dictionary causes this line to pick a TCPTransport for you: https://github.com/celery/py-amqp/blob/master/amqp/transport.py#L395

>>> import amqp
>>> c = amqp.connection.Connection( ssl=True )
>>> c.connect()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "amqp/connection.py", line 272, in connect
    self.transport.connect()
  File "amqp/transport.py", line 102, in connect
    self.socket_settings, self.read_timeout, self.write_timeout,
  File "amqp/transport.py", line 176, in _init_socket
    self._setup_transport()
  File "amqp/transport.py", line 286, in _setup_transport
    self.sock = self._wrap_socket(self.sock, **self.sslopts or {})
AttributeError: 'SSLTransport' object has no attribute 'sslopts'

Passing True causes this exception to be raised since this line never executes: https://github.com/celery/py-amqp/blob/master/amqp/transport.py#L279

It seems like the user is forced to specify some arguments to ssl, leaving no way to specify that we want all defaults.
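
Both failures are consistent with the `ssl` argument being tested for truthiness in one place and treated as a dict in another: `{}` is falsy, and `True` carries no options. One possible way to normalize the argument up front is sketched below. `normalize_ssl_option` is a hypothetical helper for illustration, not py-amqp's code.

```python
def normalize_ssl_option(ssl):
    """Return (use_ssl, sslopts) for a user-supplied ``ssl`` argument.

    Hypothetical helper, not part of py-amqp.
    """
    if ssl is None or ssl is False:
        return False, {}
    if ssl is True:
        # SSL requested with all defaults.
        return True, {}
    if isinstance(ssl, dict):
        # An empty dict is still an explicit request for SSL,
        # so do not rely on its truthiness.
        return True, dict(ssl)
    raise TypeError('ssl must be a bool, a dict, or None')
```

With this, `ssl=True` and `ssl={}` would both select the SSL transport with an empty options dict.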

No timeout set on socket!

I'm trying to diagnose some really annoying issues with a connection getting wedged and stopping responding, and from what I can tell, the internal AMQP ssl socket is put into blocking mode with no timeout, and left that way.

I think this leads to a situation where, if the remote end goes away during a blocking read, my code blocks forever and can never recover.

Looking in transport.py, the socket is initialized with the desired timeout for the connect() call, but connect() is immediately followed by a call to _init_socket(), which sets the socket back to blocking mode, and that is never changed afterwards.

There is a later call to self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDTIMEO, write_timeout), and a corresponding one for SO_RCVTIMEO, but unless this causes the Python socket class to get out of sync with the actual socket configuration, the socket has no timeout set (calling self.connection.transport.sock.gettimeout() on an open connection returns None, and the source behind that call is hidden in a C file somewhere).


Edit:

Ok, this might be what's happening: it looks like gettimeout() just returns the contents of a struct member from the Python socket class instance. It is worth noting that unless that timeout value is set, some of the internal behaviour of the socket class will change (search for sock_timeout in that file), so setting the timeout to a non-None value may be required to get all the expected socket behaviour.


Anyways, I'm specifying a value for SO_SNDTIMEO and SO_RCVTIMEO and still somehow getting wedged, so I'm a little desperate at this point.

In any event, is there any reason that this even needs to be done? It seems something like


            self.sock.settimeout(
                max(read_timeout, write_timeout) if read_timeout and write_timeout else None
                )  # set socket back to blocking mode

would preserve the timeout overall if the end-user sets them both, while leaving it the way it is if the user doesn't specify the timeouts fully.
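
To illustrate the difference a Python-level timeout makes, here is a small sketch using only the standard library. `socket.socketpair()` stands in for the client/broker connection, and `effective_timeout` and `recv_or_none` are hypothetical helpers; the first simply mirrors the expression proposed above.

```python
import socket


def effective_timeout(read_timeout, write_timeout):
    """Mirror the expression proposed above (hypothetical helper)."""
    if read_timeout and write_timeout:
        return max(read_timeout, write_timeout)
    return None  # fall back to blocking mode


def recv_or_none(sock, nbytes=1):
    """Return received bytes, or None if the socket timeout expires."""
    try:
        return sock.recv(nbytes)
    except socket.timeout:
        return None


# A connected pair stands in for client and broker; the peer never writes.
client, silent_peer = socket.socketpair()
client.settimeout(effective_timeout(0.05, 0.1))
timeout_in_effect = client.gettimeout()  # the Python-level timeout
result = recv_or_none(client)            # returns instead of hanging
client.close()
silent_peer.close()
```

With `settimeout(None)` the same `recv()` call would block for as long as the peer stays silent.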

generator already executing

My celery jobs started throwing this error after upgrading from amqp 1.4.9 to 2.0.4

[2016-08-10 14:57:54,695: ERROR/MainProcess] Unrecoverable error: ValueError('generator already executing',)
Traceback (most recent call last):
  File "/home/accuranker/.virtualenvs/grank/local/lib/python2.7/site-packages/celery/worker/__init__.py", line 206, in start
    self.blueprint.start(self)
  File "/home/accuranker/.virtualenvs/grank/local/lib/python2.7/site-packages/celery/bootsteps.py", line 123, in start
    step.start(parent)
  File "/home/accuranker/.virtualenvs/grank/local/lib/python2.7/site-packages/celery/bootsteps.py", line 374, in start
    return self.obj.start()
  File "/home/accuranker/.virtualenvs/grank/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 279, in start
    blueprint.start(self)
  File "/home/accuranker/.virtualenvs/grank/local/lib/python2.7/site-packages/celery/bootsteps.py", line 123, in start
    step.start(parent)
  File "/home/accuranker/.virtualenvs/grank/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 838, in start
    c.loop(*c.loop_args())
  File "/home/accuranker/.virtualenvs/grank/local/lib/python2.7/site-packages/celery/worker/loops.py", line 101, in synloop
    qos.update()
  File "/home/accuranker/.virtualenvs/grank/local/lib/python2.7/site-packages/kombu/common.py", line 400, in update
    return self.set(self.value)
  File "/home/accuranker/.virtualenvs/grank/local/lib/python2.7/site-packages/kombu/common.py", line 393, in set
    self.callback(prefetch_count=new_value)
  File "/home/accuranker/.virtualenvs/grank/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 615, in set_prefetch_count
    apply_global=qos_global,
  File "/home/accuranker/.virtualenvs/grank/local/lib/python2.7/site-packages/kombu/messaging.py", line 533, in qos
    apply_global)
  File "/home/accuranker/.virtualenvs/grank/local/lib/python2.7/site-packages/amqp/channel.py", line 1851, in basic_qos
    wait=spec.Basic.QosOk,
  File "/home/accuranker/.virtualenvs/grank/local/lib/python2.7/site-packages/amqp/abstract_channel.py", line 62, in send_method
    conn.frame_writer.send((1, self.channel_id, sig, args, content))
ValueError: generator already executing
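
For context, Python raises this ValueError whenever send() is called on a generator that is currently running, for example when a callback re-enters it or when a second thread uses it at the same time. The sketch below reproduces the re-entrant case with a toy frame writer. All names are hypothetical, and it does not establish which of the two cases applies to the traceback above.

```python
def make_frame_writer(on_frame):
    """Create a primed generator that passes each sent frame to on_frame."""
    def writer():
        while True:
            frame = yield
            on_frame(frame)
    gen = writer()
    next(gen)  # advance to the first yield so send() can be used
    return gen


def reentrant_send_error():
    """Send a frame whose handler sends again on the same generator."""
    holder = {}

    def handler(frame):
        # Re-enter the generator while it is still running.
        holder['gen'].send('nested')

    holder['gen'] = make_frame_writer(handler)
    try:
        holder['gen'].send('outer')
    except ValueError as exc:
        return str(exc)
    return None
```

Calling `reentrant_send_error()` returns the same message as in the traceback. Serializing access to the writer (a lock, or a queue drained by a single owner) is one common way to avoid it.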

read/write timeouts?

(not sure if this belongs to kombu or here)

I'm trying to understand how kombu/amqplib is supposed to behave when the TCP connection goes away silently and why there is no mechanism to define read/write timeouts (similar to the ones in libmemcached, for example http://docs.libmemcached.org/memcached_behavior.html#MEMCACHED_BEHAVIOR_SND_TIMEOUT).

To simulate this, I run celery with RabbitMQ, start a celery worker and a task submitter, and then SIGSTOP the RabbitMQ server. The celery consumer never notices that anything is wrong. The task submitter keeps submitting tasks until the TCP buffer is full (which takes quite a while) and then fails with an error, and all the published tasks are lost.

How can I detect this earlier?
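
A stopped broker process is hard to notice at the TCP level, since the kernel most likely keeps the connection open on its behalf. An application-level liveness check, which is the idea behind AMQP heartbeats, can notice sooner. The sketch below shows that idea only; `HeartbeatMonitor` is hypothetical, it is not py-amqp's implementation, and the threshold of two missed intervals is an assumption.

```python
import time


class HeartbeatMonitor:
    """Track inbound traffic and flag a peer that has gone silent.

    Hypothetical sketch; not py-amqp's implementation.
    """

    def __init__(self, interval, max_missed=2, clock=time.monotonic):
        self.interval = interval      # seconds between expected heartbeats
        self.max_missed = max_missed  # assumed tolerance before giving up
        self.clock = clock
        self.last_seen = clock()

    def on_frame_received(self):
        # Any inbound frame, heartbeat or not, proves the peer is alive.
        self.last_seen = self.clock()

    def peer_is_dead(self):
        silence = self.clock() - self.last_seen
        return silence > self.interval * self.max_missed
```

A caller would invoke `on_frame_received()` from its read path and poll `peer_is_dead()` from a timer, closing and re-establishing the connection when it returns True.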

AMQP heartbeats fail

With heartbeats enabled, the application crashes after a short time with the stack trace below. I am not yet sure whether the issue is in py-amqp, kombu or celery, but I am recording it here in the meantime.

 Traceback (most recent call last):
   File "/home/.../py-amqp/amqp/connection.py", line 561, in send_heartbeat
     self._frame_writer.send((8, 0, None, None, None))
 StopIteration

 During handling of the above exception, another exception occurred:

 Traceback (most recent call last):
   File "/home/.../celery/celery/worker/consumer.py", line 278, in start
     blueprint.start(self)
   File "/home/.../celery/celery/bootsteps.py", line 123, in start
     step.start(parent)
   File "/home/.../celery/celery/worker/consumer.py", line 851, in start
     c.loop(*c.loop_args())
   File "/home/.../celery/celery/worker/loops.py", line 74, in asynloop
     next(loop)
   File "/home/.../kombu/kombu/async/hub.py", line 278, in create_loop
     poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
   File "/home/.../kombu/kombu/async/hub.py", line 134, in fire_timers
     entry()
   File "/home/.../kombu/kombu/async/timer.py", line 63, in __call__
     return self.fun(*self.args, **self.kwargs)
   File "/home/.../kombu/kombu/async/timer.py", line 131, in _reschedules
     return fun(*args, **kwargs)
   File "/home/.../kombu/kombu/connection.py", line 264, in heartbeat_check
     return self.transport.heartbeat_check(self.connection, rate=rate)
   File "/home/.../kombu/kombu/transport/pyamqp.py", line 137, in heartbeat_check
     return connection.heartbeat_tick(rate=rate)
   File "/home/.../py-amqp/amqp/connection.py", line 586, in heartbeat_tick
     self.send_heartbeat()
   File "/home/.../py-amqp/amqp/connection.py", line 563, in send_heartbeat
     raise RecoverableConnectionError('connection already closed')
 amqp.exceptions.RecoverableConnectionError: connection already closed

RabbitMQ logs:

=WARNING REPORT==== 21-Dec-2014::14:01:51 ===
Non-AMQP exit reason '{exit,
{amqp_error,frame_error,
"cannot decode <<0,0,0,0,0,0>>",
'connection.close'},
'connection.close',
[{rabbit_misc,protocol_error,1,[]},
{rabbit_reader,handle_method0,3,[]},
{rabbit_reader,handle_input,3,[]},
{rabbit_reader,recvloop,4,[]},
{rabbit_reader,run,1,[]},
{rabbit_reader,start_connection,5,[]},
{proc_lib,init_p_do_apply,3,
[{file,"proc_lib.erl"},{line,237}]}]}'

The issue only occurs when heartbeats are enabled.

make a reference to channel part of the message returned to the call back

In amqplib, channel was an attribute of message. The way py-amqp does it, you need to curry your callback function with partial to get the same effect. No matter what, you need to be able to acknowledge the message, or not, in the callback. Requiring functools.partial seems like extra work for the user.

It definitely caught me off guard when I moved from py-amqplib to py-amqp. Your demo code for receiving messages is a poster child for this.
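
For readers who have not seen the workaround, this is roughly what binding the channel with functools.partial looks like. `FakeChannel` and `FakeMessage` are hypothetical stand-ins so the sketch runs without a broker, and the `delivery_tag` attribute is an assumption about how the tag is exposed.

```python
from functools import partial


class FakeChannel:
    """Stand-in for a channel; records acknowledged delivery tags."""

    def __init__(self):
        self.acked = []

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)


class FakeMessage:
    """Stand-in for a delivered message (attribute name is an assumption)."""

    def __init__(self, body, delivery_tag):
        self.body = body
        self.delivery_tag = delivery_tag


def on_message(channel, message):
    # The channel arrives as a bound first argument, not via the message.
    channel.basic_ack(message.delivery_tag)


channel = FakeChannel()
callback = partial(on_message, channel)  # what would be passed as the consumer callback
callback(FakeMessage('hello', 1))        # simulate one delivery
```

The request in this issue amounts to making the `partial` step unnecessary by exposing the channel on the message itself.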

Exception while handling frame args in PY3

When running code that uses py-amqp/master on Python >= 3, a TypeError exception is raised from method_framing.py while handling frame args during connection teardown. The issue is systematic. For example, when running the kombu hello publisher example using Python 3.4 and the latest RabbitMQ, I end up with this:

Sent: helloword, sent at 2016-05-23 16:20:11.172310
Traceback (most recent call last):
  File "kombutest.py", line 9, in <module>
    simple_queue.close()
  File "/home/fficarelli/python-3.4-virtualenv-01/lib/python3.4/site-packages/kombu/connection.py", line 739, in __exit__
    self.release()
  File "/home/fficarelli/python-3.4-virtualenv-01/lib/python3.4/site-packages/kombu/connection.py", line 352, in release
    self._close()
  File "/home/fficarelli/python-3.4-virtualenv-01/lib/python3.4/site-packages/kombu/connection.py", line 318, in _close
    self._do_close_self()
  File "/home/fficarelli/python-3.4-virtualenv-01/lib/python3.4/site-packages/kombu/connection.py", line 311, in _do_close_self
    self.transport.close_connection(self._connection)
  File "/home/fficarelli/python-3.4-virtualenv-01/lib/python3.4/site-packages/kombu/transport/pyamqp.py", line 134, in close_connection
    connection.close()
  File "/home/fficarelli/python-3.4-virtualenv-01/lib/python3.4/site-packages/amqp/connection.py", line 463, in close
    wait=spec.Connection.CloseOk,
  File "/home/fficarelli/python-3.4-virtualenv-01/lib/python3.4/site-packages/amqp/abstract_channel.py", line 71, in send_method
    return self.wait(wait, returns_tuple=returns_tuple)
  File "/home/fficarelli/python-3.4-virtualenv-01/lib/python3.4/site-packages/amqp/abstract_channel.py", line 91, in wait
    self.connection.drain_events(timeout=timeout)
  File "/home/fficarelli/python-3.4-virtualenv-01/lib/python3.4/site-packages/amqp/connection.py", line 388, in drain_events
    return self.blocking_read(timeout)
  File "/home/fficarelli/python-3.4-virtualenv-01/lib/python3.4/site-packages/amqp/connection.py", line 393, in blocking_read
    return self.on_inbound_frame(frame)
  File "/home/fficarelli/python-3.4-virtualenv-01/lib/python3.4/site-packages/amqp/method_framing.py", line 63, in on_frame
    callback(channel, method_sig, buf, None)
  File "/home/fficarelli/python-3.4-virtualenv-01/lib/python3.4/site-packages/amqp/connection.py", line 397, in on_inbound_method
    method_sig, payload, content,
  File "/home/fficarelli/python-3.4-virtualenv-01/lib/python3.4/site-packages/amqp/abstract_channel.py", line 140, in dispatch_method
    listener(*args)
  File "/home/fficarelli/python-3.4-virtualenv-01/lib/python3.4/site-packages/amqp/connection.py", line 520, in _on_close
    self._x_close_ok()
  File "/home/fficarelli/python-3.4-virtualenv-01/lib/python3.4/site-packages/amqp/connection.py", line 537, in _x_close_ok
    self.send_method(spec.Connection.CloseOk, callback=self._on_close_ok)
  File "/home/fficarelli/python-3.4-virtualenv-01/lib/python3.4/site-packages/amqp/abstract_channel.py", line 62, in send_method
    conn._frame_writer.send((1, self.channel_id, sig, args, content))
  File "/home/fficarelli/python-3.4-virtualenv-01/lib/python3.4/site-packages/amqp/method_framing.py", line 142, in frame_writer
    if type_ == 1 else b'')
TypeError: sequence item 1: expected a bytes-like object, str found

Given the kind of exception raised and the fact that everything seems to work fine under PY2, the issue is probably related to the fact that in PY3 strings are no longer bytes.
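
The same kind of TypeError can be reproduced in isolation by joining a text fragment into a bytes sequence. The sketch below shows that, plus one possible way to guard against it. Both helper names are hypothetical, and the UTF-8 default is an assumption.

```python
def naive_join_error(parts):
    """Join fragments as-is and return the TypeError message, if any."""
    try:
        b''.join(parts)
    except TypeError as exc:
        return str(exc)
    return None


def join_frame_parts(parts, encoding='utf-8'):
    """Join frame fragments, encoding any text fragments to bytes first."""
    return b''.join(
        p.encode(encoding) if isinstance(p, str) else p
        for p in parts
    )
```

On Python 3, `naive_join_error([b'\x01', 'args'])` reports a str item at index 1, while `join_frame_parts` accepts the same input.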

Reverse topic exchange support

Hello, I would like to use a RabbitMQ plugin for reverse topic exchanges (x-rtopic) in order to define a set of machines I can send a message to. That way a '.' or 'a.#' pattern could be defined in the message routing_key, while each node has its own routing key.

I've tested the plugin with kombu and pika using simple examples found on the net, and both worked fine. But when I try to send a Celery task it does not work: the Consumer connects fine, but when the Producer loads my task, it automatically creates a (direct) exchange, ignoring the configuration I set in the config file. Even setting CELERY_DEFAULT_EXCHANGE_TYPE gets ignored. Anyway, I then forced celery to create an exchange of type x-rtopic, and then I get the error below:

ConnectionError: 503: (COMMAND_INVALID - unknown exchange type 'x-rtopic', (40, 10), None)

Something in the Channel part seems to not allow this.

Is there an easy (or not so easy) way to get this supported?

Thanks

Andre

No PGP signature on 1.4.6 PyPi upload

Nitpicking, I know, but could you add a PGP signature for the source-tarball on PyPi for the 1.4.6 upload? The build-process for the Debian package relies on it being there.

please tag v2.0.4

I just spent hours pulling my teeth on CentOS 7.2 before figuring out that I need 9167f65.

Could you tag a new release to make it easier for me to package a new rpm?

Thanks

heartbeat timeouts since recent changes

Since the recent (ca. Jan 10 or so) heartbeat changes, the following occurs often (maybe 75% of the time) when attempting to publish using Kombu and py-amqp:

Traceback (most recent call last):
  File "/home/...", line 430, in ...
    publish('...', {'ack': ..., 'result': result}, routing_key=reply_to)
  File "/home/...", line 109, in publish
    maybe_declare(exchange, producer.channel)
  File "/home/.../kombu/kombu/common.py", line 91, in maybe_declare
    return _maybe_declare(entity)
  File "/home/.../kombu/kombu/common.py", line 106, in _maybe_declare
    entity.declare()
  File "/home/.../kombu/kombu/entity.py", line 166, in declare
    nowait=nowait, passive=passive,
  File "/home/.../py-amqp/amqp/channel.py", line 613, in exchange_declare
    self._send_method((40, 10), args)
  File "/home/.../py-amqp/amqp/abstract_channel.py", line 56, in _send_method
    self.channel_id, method_sig, args, content,
  File "/home/.../py-amqp/amqp/method_framing.py", line 221, in write_method
    write_frame(1, channel, payload)
  File "/home/.../py-amqp/amqp/transport.py", line 163, in write_frame
    frame_type, channel, size, payload, 0xce,
ConnectionResetError: [Errno 104] Connection reset by peer

RabbitMQ logs this:

=ERROR REPORT==== 21-Jan-2014::07:30:53 ===
closing AMQP connection <0.29065.28> ([2401:F000:10:0:xxx]:39688 -> [2401:F000:10:0:xxx]:5672):
{heartbeat_timeout,running}

This publish is occurring from within a celery task body, if that makes any difference.

AMQP 0-10 support

Currently py-amqp works only with AMQP 0-9 brokers such as RabbitMQ. Another popular broker, Qpid, implements AMQP 0-10 and just silently disconnects py-amqp due to the protocol version mismatch.

Versioning issues

Hi there,

the current release on PyPI is 1.0.13, whereas amqp/__init__.py has VERSION = (1, 2, 0), which ends up in the tarball (via setup.py). This is mildly confusing and a bit of a mess. Could you please clarify which is supposed to be the real version number?

To give some background, openSUSE has shipped python-amqp-1.2.0 for quite a while now, since that was the most current version at the time. We'd either have to downgrade now (which is really inconvenient for a lot of reasons) or convince you to bump the version number to 1.2.1.

[py-amqp 2.0] Race condition on drain_events & channel pending list

I'm trying to implement channel multiplexing over a connection, with one listening thread that just calls drain_events in a while loop. Other threads create their own channels and use them.

But some channel actions in those threads (e.g. opening a channel or waiting for a publish confirmation) get stuck in the channel.wait() method, because drain_events() steals the response from the broker and does not dispatch the frame to the waiting promise.

As I understand it, the problem is a race condition between adding the new pending promise in channel.wait() and the drain_events call in the listening thread. Consider the following case:

(1) Thread A: runs drain_events() in a while loop
(2) Thread B: send(Channel.Open)
(3) Thread A: receives OpenOk and tries to dispatch the frame, but at this point the channel's pending list is empty
(4) Thread B: appends the new promise to the pending list and enters an infinite loop:

while not p.ready:
      self.connection.drain_events(timeout=timeout)

So I think the problem can be solved if the promise is added to the list before the call to the _frame_writer.send() method.
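
The ordering problem can be shown without threads by modelling the worst case, where the reply is dispatched before send() returns. The toy `Channel` below is hypothetical and only mimics the pending-list logic described in this issue; it is not py-amqp's class.

```python
class Channel:
    """Toy channel: a reply only reaches a waiter that is already pending."""

    def __init__(self):
        self.pending = []    # waiters registered for a reply
        self.delivered = []  # replies that reached a waiter

    def dispatch(self, reply):
        # Called by the listening thread when a frame arrives.
        if self.pending:
            self.pending.pop(0)
            self.delivered.append(reply)
        # Otherwise nobody is waiting yet, so the reply is dropped.

    def send(self, method):
        # Worst case for the race: the reply is dispatched
        # before send() returns to the caller.
        self.dispatch(method + 'Ok')

    def call_register_after_send(self, method):
        self.send(method)
        self.pending.append(method)  # too late, the reply is gone

    def call_register_before_send(self, method):
        self.pending.append(method)  # the waiter is in place first
        self.send(method)
```

With `call_register_after_send('Open')` the waiter stays pending forever, while `call_register_before_send('Open')` receives `'OpenOk'`.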

MemoryError establishing a connection to Azure Service Bus

Not sure if amqp and Azure Service Bus should cooperate, but I receive a MemoryError (originating from transport._AbstractTransport.read_frame). The payload size unpacked from the frame header seems incorrect.

I attempt to establish the connection like so:

connection = amqp.connection.Connection(
    host='MYSERVICENAMESPACE.servicebus.windows.net',
    userid='owner',
    password='MYPASSWORD')
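
One possible explanation, which this issue does not confirm, is a protocol mismatch: Azure Service Bus speaks AMQP 1.0, and a peer that rejects the client's protocol version may answer with its own 8-byte protocol header instead of a 0-9-1 frame. The sketch below shows what an AMQP 0-9-1 frame header parser (type: 1 byte, channel: 2 bytes, payload size: 4 bytes) would make of such a reply. The reply bytes are an assumption, and `parse_frame_header` is a hypothetical helper.

```python
import struct


def parse_frame_header(header):
    """Unpack a 7-byte AMQP 0-9-1 frame header: type, channel, payload size."""
    return struct.unpack('>BHI', header)


# Assumption: the peer answers with a protocol header such as
# b'AMQP\x00\x01\x00\x00' (AMQP 1.0) rather than a 0-9-1 frame.
reply = b'AMQP\x00\x01\x00\x00'
frame_type, channel, size = parse_frame_header(reply[:7])
# 'P\x00\x01\x00' read as a big-endian integer gives a payload
# size of more than 1 GiB.
```

Trying to read a payload of that size would plausibly end in a MemoryError, which matches the symptom described.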

Bad support for timeouts

The documentation says:

Differences from amqplib

  • Support for timeouts

Look at the code in the file "transport.py":

class _AbstractTransport(object):

    def __init__(self, host, connect_timeout):
        ...
            try:
                self.sock = socket.socket(af, socktype, proto)
                self.sock.settimeout(connect_timeout)
                self.sock.connect(sa)
            except socket.error, msg:
        ...
        self.sock.settimeout(None)  # <====  WTF?

I use kombu with a RabbitMQ cluster behind an AWS Elastic Load Balancer. Sometimes my code freezes in the method _TCPTransport.write(...) for about 900 seconds (the ELB is probably to blame for this) and then it throws a socket.timeout exception.
Could you explain why the timeout is disabled for the socket?

PS: sorry for my English

implementing async rpc client

Hey,

I am trying to change from using pika to py-amqp on my async rpc client for apache.

The problem I am running into is that wait() is blocking, which means that I cannot run it on a separate thread with a lock.

Receiving RPC requests using its own thread.

while self.channel.callbacks:
    with self.internal_lock:
        self.channel.wait() # waits here endlessly

Sending RPC requests.

with self.internal_lock:
    self.channel.basic_publish(exchange=self.exchange,
                                           routing_key=routing_key,
                                           msg=message)

Since wait() keeps the lock until it receives a response, it keeps locking, so that I can never send any requests.

I was wondering if there is a function within amqp that allows me to check for responses without blocking the application, such as pika's process_data_events.
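
One pattern that may help is to poll with a short timeout and release the lock between polls, so the publishing thread gets a turn. The sketch below assumes the connection exposes drain_events(timeout=...) and raises socket.timeout when nothing arrives in time; that behaviour should be verified against the installed version. `FakeConnection` is a hypothetical stand-in so the example runs without a broker.

```python
import socket
import threading


class FakeConnection:
    """Stand-in for a broker connection that never has data ready."""

    def drain_events(self, timeout=None):
        if timeout is None:
            raise RuntimeError('would block forever in this sketch')
        raise socket.timeout('timed out')


def poll_once(connection, lock, timeout=0.01):
    """Wait briefly for events; return False if nothing arrived in time."""
    with lock:
        try:
            connection.drain_events(timeout=timeout)
        except socket.timeout:
            return False
    return True


lock = threading.Lock()
got_event = poll_once(FakeConnection(), lock)
# The lock is free again here, so a publishing thread could take it now.
publisher_can_proceed = lock.acquire(False)
if publisher_can_proceed:
    lock.release()
```

The receiving thread would call `poll_once` in its loop instead of holding the lock across an unbounded wait().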

Too many heartbeats missed

We are getting errors of 'Error in timer: ConnectionForced(None, 'Too many heartbeats missed', None, '')' every now and then while using the pyamqp transport. This has happened on both py-amqp 0.9.3 and 1.2.0.

I don't think there's anything wrong on the underlying rabbitmq servers, and I also don't think there's a network problem going on (no other services have any problems talking to them).

We don't get a ton of them (I would say maybe 1 or 2 per day), but it's worrisome nonetheless. Do you have any first steps on how to get to the bottom of this? Our BROKER_HEARTBEAT is set to 10, the default, should we just up that time?

Thanks,
Andy

Docs version 'latest' missing changelog entries for 2.x

Visiting https://amqp.readthedocs.io/ redirects to the latest version tag of the docs.

However these don't mention v2.x on the changelog, instead only <= 1.4.9:
https://amqp.readthedocs.io/en/latest/changelog.html
(I've even tried forcing a RTD rebuild using https://readthedocs.org/projects/amqp/builds/)

Strangely the stable version does have those changelog entries:
https://amqp.readthedocs.io/en/stable/changelog.html
(Though as #94 notes, the banner there about it being for a development version is incorrect)

It seems as though latest and master on readthedocs could be combined, and the default version tag be made to be stable?

Also the readme mentions both 0.9.1 and 2.0.0rc2, rather than 2.0.1:
https://github.com/celery/py-amqp/blame/b12272c54a88ad428a7eaf837280f136b7252e08/README.rst#L2
https://github.com/celery/py-amqp/blame/b12272c54a88ad428a7eaf837280f136b7252e08/README.rst#L7

Many thanks :-)

Cannot close connection

In [1]: import amqp

In [2]: c = amqp.connection.Connection()

In [3]: c.connect()
FRAME OFFSET + SIZE: 468
OFFSET: 0 SIZE: 469 DATA: 469
FRAME OFFSET + SIZE: 19
OFFSET: 469 SIZE: 20 DATA: 20
FRAME OFFSET + SIZE: 12
OFFSET: 489 SIZE: 13 DATA: 13

In [4]: c.channel()
FRAME OFFSET + SIZE: 15
OFFSET: 502 SIZE: 16 DATA: 16
Out[4]: <amqp.channel.Channel at 0x2e62b50>

In [5]: c.close()
FRAME OFFSET + SIZE: 32
OFFSET: 518 SIZE: 33 DATA: 33
---------------------------------------------------------------------------
InternalError                             Traceback (most recent call last)
<ipython-input-5-b082af813204> in <module>()
----> 1 c.close()

/home/dmitry/projects/py-amqp/amqp/connection.pyc in close(self, reply_code, reply_text, method_sig, argsig)
    547             spec.Connection.Close, argsig,
    548             (reply_code, reply_text, method_sig[0], method_sig[1]),
--> 549             wait=spec.Connection.CloseOk,
    550         )
    551 

/home/dmitry/projects/py-amqp/amqp/abstract_channel.pyc in send_method(self, sig, format, args, content, wait, on_sent, callback)
     62         # TODO temp: callback should be after write_method ... ;)
     63         if wait:
---> 64             return self.wait(wait, callback)
     65         return on_sent
     66 

/home/dmitry/projects/py-amqp/amqp/abstract_channel.pyc in wait(self, method, callback, returns_tuple, filter)
     79         try:
     80             while not p.ready:
---> 81                 self.connection.drain_events()
     82 
     83             if p.value:

/home/dmitry/projects/py-amqp/amqp/connection.pyc in drain_events(self, timeout)
    363 
    364     def drain_events(self, timeout=None):
--> 365         return self.blocking_read(timeout)
    366 
    367     def on_readable(self, _unavail=_UNAVAIL):

/home/dmitry/projects/py-amqp/amqp/connection.pyc in blocking_read(self, timeout)
    446             self._read_frame(None)
    447             while self._inbound:
--> 448                 self.on_inbound_frame(self._inbound.popleft())
    449             self._call_pending_blocking_callbacks()
    450             return

/home/dmitry/projects/py-amqp/amqp/method_framing.pyc in frame_handler(connection, callback, unpack_from, content_methods)
     72                 expected_types[channel] = 2
     73             else:
---> 74                 callback(channel, method_sig, buf, None)
     75 
     76         elif frame_type == 2:

/home/dmitry/projects/py-amqp/amqp/connection.pyc in on_inbound_method(self, channel_id, method_sig, payload, content)
    477     def on_inbound_method(self, channel_id, method_sig, payload, content):
    478         return self.channels[channel_id].dispatch_method(
--> 479             method_sig, payload, content,
    480         )
    481 

/home/dmitry/projects/py-amqp/amqp/abstract_channel.pyc in dispatch_method(self, method_sig, payload, content)
    131         for listener in listeners:
    132             # print('CALLING LISTENER: %r' % (listener, ))
--> 133             listener(*args)
    134 
    135     #: Placeholder, the concrete implementations will have to

/home/dmitry/projects/py-amqp/amqp/connection.pyc in _on_close(self, reply_code, reply_text, class_id, method_id)
    606         self._x_close_ok()
    607         raise error_for_code(reply_code, reply_text,
--> 608                              (class_id, method_id), ConnectionError)
    609 
    610     def _on_blocked(self):

InternalError: (0, 0): (541) INTERNAL_ERROR

Meanwhile, in the RabbitMQ logs:

=INFO REPORT==== 5-Jul-2014::21:36:34 ===
accepting AMQP connection <0.494.0> (127.0.0.1:33018 -> 127.0.0.1:5672)

=ERROR REPORT==== 5-Jul-2014::21:36:58 ===
AMQP connection <0.494.0> (running), channel 0 - error:
{exit,{amqp_error,frame_error,"cannot decode <<0,0,0,0,0,0>>",
                  'connection.close'},
      'connection.close',
      [{rabbit_misc,protocol_error,1,[]},
       {rabbit_reader,handle_method0,3,[]},
       {rabbit_reader,handle_input,3,[]},
       {rabbit_reader,recvloop,4,[]},
       {rabbit_reader,run,1,[]},
       {rabbit_reader,start_connection,5,[]},
       {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,239}]}]}

=WARNING REPORT==== 5-Jul-2014::21:36:58 ===
Non-AMQP exit reason '{exit,
                          {amqp_error,frame_error,
                              "cannot decode <<0,0,0,0,0,0>>",
                              'connection.close'},
                          'connection.close',
                          [{rabbit_misc,protocol_error,1,[]},
                           {rabbit_reader,handle_method0,3,[]},
                           {rabbit_reader,handle_input,3,[]},
                           {rabbit_reader,recvloop,4,[]},
                           {rabbit_reader,run,1,[]},
                           {rabbit_reader,start_connection,5,[]},
                           {proc_lib,init_p_do_apply,3,
                               [{file,"proc_lib.erl"},{line,239}]}]}'

=INFO REPORT==== 5-Jul-2014::21:36:58 ===
closing AMQP connection <0.494.0> (127.0.0.1:33018 -> 127.0.0.1:5672)

Confirm delivery returns none?

Hey,

I tried out the library and I can't get basic_publish_confirm to return anything but None.

I looked at the source code and it at least indicates that it should be returning something.

For example basic_publish_confirm has ret = self._basic_publish(*args, **kwargs), but _basic_publish does not actually return anything.

openstack oslo.messaging load failed due to amqp can't find module 'vine' in Python3.4

Traceback (most recent call last):
  File "/home/jenkins/workspace/gate-smaug-python34/smaug/tests/base.py", line 92, in setUp
    self.messaging_conf = messaging_conffixture.ConfFixture(CONF)
  File "/home/jenkins/workspace/gate-smaug-python34/.tox/py34/lib/python3.4/site-packages/oslo_messaging/conffixture.py", line 50, in __init__
    'oslo_messaging_rabbit')
  File "/home/jenkins/workspace/gate-smaug-python34/.tox/py34/lib/python3.4/site-packages/oslo_messaging/conffixture.py", line 25, in _import_opts
    __import__(module)
  File "/home/jenkins/workspace/gate-smaug-python34/.tox/py34/lib/python3.4/site-packages/oslo_messaging/_drivers/impl_rabbit.py", line 30, in <module>
    import kombu.connection
  File "/home/jenkins/workspace/gate-smaug-python34/.tox/py34/lib/python3.4/site-packages/kombu/connection.py", line 19, in <module>
    from kombu import exceptions
  File "/home/jenkins/workspace/gate-smaug-python34/.tox/py34/lib/python3.4/site-packages/kombu/exceptions.py", line 12, in <module>
    from amqp import ChannelError, ConnectionError, ResourceError
  File "/home/jenkins/workspace/gate-smaug-python34/.tox/py34/lib/python3.4/site-packages/amqp/__init__.py", line 39, in <module>
    from .basic_message import Message # noqa
  File "/home/jenkins/workspace/gate-smaug-python34/.tox/py34/lib/python3.4/site-packages/amqp/basic_message.py", line 29, in <module>
    from .serialization import GenericContent
  File "/home/jenkins/workspace/gate-smaug-python34/.tox/py34/lib/python3.4/site-packages/amqp/serialization.py", line 34, in <module>
    from .exceptions import FrameSyntaxError
  File "/home/jenkins/workspace/gate-smaug-python34/.tox/py34/lib/python3.4/site-packages/amqp/exceptions.py", line 21, in <module>
    from .five import python_2_unicode_compatible
  File "/home/jenkins/workspace/gate-smaug-python34/.tox/py34/lib/python3.4/site-packages/amqp/five.py", line 14, in <module>
    import vine.five
ImportError: No module named 'vine'

Error when using 'BROKER_HEARTBEAT'

I've been waiting for heartbeat support to make it into celery/kombu/amqplib, so I decided to take it for a spin with the newest amqp.

When I don't specify 'BROKER_HEARTBEAT', everything looks good, and celery is able to connect and process messages just fine.

When I specify any value for 'BROKER_HEARTBEAT', I get this error:

2012-08-23 00:11:23,436 cronus.int.sfo01.mml WARNING stdout: celery@cronus has started.
2012-08-23 00:11:23,440 cronus.int.sfo01.mml DEBUG amqp: Start from server, version: 0.9, properties: {u'information': u'Licensed under the MPL.  See http://www.rabbitmq.com/', u'product': u'RabbitMQ', u'copyright': u'Copyright (C) 2007-2011 VMware, Inc.', u'capabilities': {u'exchange_exchange_bindings': True, u'consumer_cancel_notify': True, u'publisher_confirms': True, u'basic.nack': True}, u'platform': u'Erlang/OTP', u'version': u'2.7.1'}, mechanisms: [u'PLAIN', u'AMQPLAIN'], locales: [u'en_US']
2012-08-23 00:11:23,441 cronus.int.sfo01.mml ERROR celery.worker: Unrecoverable error: ValueError('Octet out of range 0..65535',)
Traceback (most recent call last):
  File "/home/andrew/.env/tapp-deps/local/lib/python2.7/site-packages/celery/worker/__init__.py", line 353, in start
    component.start()
  File "/home/andrew/.env/tapp-deps/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 368, in start
    self.reset_connection()
  File "/home/andrew/.env/tapp-deps/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 703, in reset_connection
    self.connection = self._open_connection()
  File "/home/andrew/.env/tapp-deps/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 766, in _open_connection
    callback=self.maybe_shutdown)
  File "/home/andrew/.env/tapp-deps/local/lib/python2.7/site-packages/kombu/connection.py", line 266, in ensure_connection
    interval_start, interval_step, interval_max, callback)
  File "/home/andrew/.env/tapp-deps/local/lib/python2.7/site-packages/kombu/utils/__init__.py", line 159, in retry_over_time
    return fun(*args, **kwargs)
  File "/home/andrew/.env/tapp-deps/local/lib/python2.7/site-packages/kombu/connection.py", line 156, in connect
    return self.connection
  File "/home/andrew/.env/tapp-deps/local/lib/python2.7/site-packages/kombu/connection.py", line 610, in connection
    self._connection = self._establish_connection()
  File "/home/andrew/.env/tapp-deps/local/lib/python2.7/site-packages/kombu/connection.py", line 569, in _establish_connection
    conn = self.transport.establish_connection()
  File "/home/andrew/.env/tapp-deps/local/lib/python2.7/site-packages/kombu/transport/pyamqp.py", line 102, in establish_connection
    heartbeat=conninfo.heartbeat)
  File "/home/andrew/.env/tapp-deps/local/lib/python2.7/site-packages/amqp/connection.py", line 157, in __init__
    (10, 30),  # tune
  File "/home/andrew/.env/tapp-deps/local/lib/python2.7/site-packages/amqp/abstract_channel.py", line 70, in wait
    return self.dispatch_method(method_sig, args, content)
  File "/home/andrew/.env/tapp-deps/local/lib/python2.7/site-packages/amqp/abstract_channel.py", line 87, in dispatch_method
    return amqp_method(self, args)
  File "/home/andrew/.env/tapp-deps/local/lib/python2.7/site-packages/amqp/connection.py", line 808, in _tune
    self._x_tune_ok(self.channel_max, self.frame_max, self.heartbeat)
  File "/home/andrew/.env/tapp-deps/local/lib/python2.7/site-packages/amqp/connection.py", line 890, in _x_tune_ok
    args.write_short(heartbeat or 0)
  File "/home/andrew/.env/tapp-deps/local/lib/python2.7/site-packages/amqp/serialization.py", line 231, in write_short
    raise ValueError('Octet out of range 0..65535')
ValueError: Octet out of range 0..65535

Any idea?
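For context, the heartbeat field of the AMQP tune-ok method is an unsigned 16-bit integer, which is what the 0..65535 range in the message refers to. Below is a minimal sketch of an equivalent check. `pack_short` is a hypothetical helper written for illustration, not py-amqp's actual `write_short`. Running the configured value through something like it may help confirm what value and type `BROKER_HEARTBEAT` has by the time it reaches the serializer.

```python
import struct


def pack_short(n):
    """Pack n as a big-endian unsigned 16-bit integer (hypothetical helper)."""
    # Reject non-integers explicitly so a string or float from a config
    # file is reported as a type problem rather than a range problem.
    if not isinstance(n, int) or isinstance(n, bool):
        raise TypeError('expected an int, got %r' % (n,))
    if n < 0 or n > 65535:
        raise ValueError('Octet out of range 0..65535')
    return struct.pack('>H', n)
```

If the value turns out to be a string or a float, converting it to an `int` in the configuration would be the first thing to try.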

Error packing illegal values into struct

NOTE: I am using development versions of celery, kombu, py-amqp

I'm following the steps in the documentation to set up SSL for my celery worker (I already have a working SSL connection to the RabbitMQ broker, set up by following these steps: http://www.rabbitmq.com/troubleshooting-ssl.html#logs). I have a very minimal configuration, and I am running into this error when I try to fire up my worker:

struct.error: required argument is not an integer at
https://github.com/celery/py-amqp/blob/master/amqp/transport.py#L122

Using pdb, I found that the error comes from the interval variable:

> /usr/local/poapi_worker/venv/src/py-amqp/amqp/transport.py(122)__init__()
-> struct.pack('ll', interval, 0),
(Pdb) print(interval)
{'keyfile': '/etc/ssl/key.pem', 'certfile': '/etc/ssl/cert.pem', 'ca_certs': '/etc/ssl/cacert.pem', 'cert_reqs': 2}
(Pdb) print(write_timeout)
None
(Pdb) print(read_timeout)
{'keyfile': '/etc/ssl/key.pem', 'certfile': '/etc/ssl/cert.pem', 'ca_certs': '/etc/ssl/cacert.pem', 'cert_reqs': 2}

It looks like the entire BROKER_USE_SSL dictionary from celeryconfig.py is being assigned to read_timeout. I'm not sure how or why that is happening; the documentation is pretty clear about how that dictionary should look: http://docs.celeryproject.org/en/latest/configuration.html#broker-use-ssl. Is this a known error in the latest py-amqp?
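The failure can be reproduced without Celery or a broker. The sketch below packs a timeout with the same format string as the failing line and shows that a dictionary in that position raises `struct.error`. `pack_timeout` and the sample `ssl_options` are illustrative names, not py-amqp code. This is consistent with the SSL options ending up in the timeout argument, though it does not show where that mix-up happens.

```python
import struct

# Shortened stand-in for the BROKER_USE_SSL dictionary from the report.
ssl_options = {'keyfile': '/etc/ssl/key.pem', 'cert_reqs': 2}


def pack_timeout(interval):
    """Pack a timeout as two native longs, like struct.pack('ll', interval, 0)."""
    return struct.pack('ll', interval, 0)


# An integer number of seconds packs fine.
packed = pack_timeout(5)

# A dict in the same position is rejected by struct.
try:
    pack_timeout(ssl_options)
    dict_rejected = False
except struct.error:
    dict_rejected = True
```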

py-amqp fails to load message from deadletter queue

Fails with

[2013-02-11 10:10:57,014: ERROR/MainProcess] Unrecoverable error: ValueError('Unknown table item type: 65',)

According to http://www.rabbitmq.com/amqp-0-9-1-errata.html

65 => 'A' is array

Here is the message returned by curl

$ curl -v -XPOST -H "Content-type: application/json" -d '{"count":1,"requeue":true,"encoding":"auto"}' 'http://guest:guest@localhost:15672/api/queues/vhost/deadletterqueue/get'
[{"payload_bytes":165,"redelivered":true,"exchange":"deadletter","routing_key":"celery","message_count":0,"properties":{"delivery_mode":1,"headers":{"x-death":[{"reason":"rejected","queue":"celery","time":1360606069,"exchange":"","routing-keys":["celery"]}]}},"payload":"{\"id\": \"4cc7438e-afd4-4f8f-a2f3-f46567e7ca77\",\r\n \"task\": \"celery.task.PingTask\",\r\n \"args\": [],\r\n \"kwargs\": {},\r\n \"retries\": 0,\r\n \"eta\": \"2009-11-17T12:30:56.527191\"}","payload_encoding":"string"}]
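According to the errata linked above, a field array (`A`) is a 4-byte length in bytes followed by a sequence of field values, each introduced by its own type octet. The sketch below decodes only the long-string (`S`), signed 32-bit integer (`I`) and array (`A`) types. `read_item` here is a standalone illustration and not py-amqp's reader.

```python
import struct


def read_item(buf, offset=0):
    """Decode one field value at offset; return (value, new_offset)."""
    ftype = buf[offset:offset + 1]
    offset += 1
    if ftype == b'S':  # long string: 4-byte length, then that many bytes
        (slen,) = struct.unpack_from('>I', buf, offset)
        offset += 4
        return buf[offset:offset + slen].decode('utf-8'), offset + slen
    if ftype == b'I':  # signed 32-bit integer
        (val,) = struct.unpack_from('>i', buf, offset)
        return val, offset + 4
    if ftype == b'A':  # array: 4-byte byte-length, then field values
        (alen,) = struct.unpack_from('>I', buf, offset)
        offset += 4
        end = offset + alen
        items = []
        while offset < end:
            item, offset = read_item(buf, offset)  # recurse per element
            items.append(item)
        return items, offset
    raise ValueError('Unknown table item type: %r' % ftype)
```

An `x-death` header like the one in the message above would additionally need the table type (`F`) for each array element, which is omitted here for brevity.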

Block on Publisher Confirms

It'd be nice to be able to put the channel in a mode where it will block (with a timeout) until a message is confirmed. TX makes the broker die in a fire at even modest message rates, but publisher confirms seems to work fine in benchmarks.

Unknown AMQP method

While fixing the breakage from upgrading to the latest version of celery, I encountered this error. Since method_sig is a tuple, on line 84 of abstract_channel.py,

raise AMQPError('Unknown AMQP method %r' % method_sig)

will attempt to treat method_sig as the arguments for the string formatting operator, which leads to this exception:

TypeError: not all arguments converted during string formatting

The easy fix for me was to change line 84 to:

raise AMQPError('Unknown AMQP method %r' % str(method_sig))
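The underlying behaviour is plain Python: when the right-hand side of `%` is a tuple, its elements are used as the individual format arguments. An alternative to `str()` is to wrap the value in a one-element tuple, which keeps the `%r` output free of the extra quotes that `str()` introduces. A minimal sketch, with `format_unknown_method` as a hypothetical helper name:

```python
def format_unknown_method(method_sig):
    """Format the message with method_sig as a single %r argument."""
    # The one-element tuple stops % from unpacking method_sig itself.
    return 'Unknown AMQP method %r' % (method_sig,)


# The original expression fails because the 2-tuple supplies two arguments
# for a single %r placeholder.
try:
    'Unknown AMQP method %r' % (40, 11)
    original_fails = False
except TypeError:
    original_fails = True
```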

ImportError: cannot import name spec

Hi, we have got this error

ImportError: cannot import name spec

we use librabbitmq==1.6.1, kombu==3.0.33

Traceback here:

File "/srv/ves/arch.note/.venv/lib/python2.7/site-packages/kombu/__init__.py", line 67, in __getattr__
module = __import__(object_origins[name], None, None, [name])
File "/srv/ves/arch.note/.venv/lib/python2.7/site-packages/kombu/connection.py", line 19, in <module>
from kombu import exceptions
File "/srv/ves/arch.note/.venv/lib/python2.7/site-packages/kombu/exceptions.py", line 12, in <module>
from amqp import ChannelError, ConnectionError, ResourceError
File "/srv/ves/arch.note/.venv/lib/python2.7/site-packages/amqp/__init__.py", line 39, in <module>
from .basic_message import Message # noqa
File "/srv/ves/arch.note/.venv/lib/python2.7/site-packages/amqp/basic_message.py", line 19, in <module>
from . import spec
ImportError: cannot import name spec

Is 2.0 the stable version?

The docs say "This document is for py-amqp's development version, which can be significantly different from previous releases. Get the stable docs here: 1.4."

Can this notice now be removed?

If a malformed message gets into Rabbit, Celery worker fails to start

If a publisher manages to publish a corrupted, non-UTF8 decodable message into Rabbit, the Celery worker will fail to start. I propose that it should instead leave the message unacknowledged, log the error, and continue working on other messages.

Here's an example stack trace when this happens:

[2015-10-17 23:34:57,419: ERROR/MainProcess] Unrecoverable error: UnicodeDecodeError('utf8', 'qr=-1&qf=[date:between:2015-09-01,2015-09-
Traceback (most recent call last):
  File "venv/local/lib/python2.7/site-packages/celery/worker/__init__.py", line 206, in start
    self.blueprint.start(self)
  File "venv/local/lib/python2.7/site-packages/celery/bootsteps.py", line 123, in start
    step.start(parent)
  File "venv/local/lib/python2.7/site-packages/celery/bootsteps.py", line 374, in start
    return self.obj.start()
  File "venv/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 278, in start
    blueprint.start(self)
  File "venv/local/lib/python2.7/site-packages/celery/bootsteps.py", line 123, in start
    step.start(parent)
  File "venv/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 821, in start
    c.loop(*c.loop_args())
  File "venv/local/lib/python2.7/site-packages/celery/worker/loops.py", line 70, in asynloop
    next(loop)
  File "venv/local/lib/python2.7/site-packages/kombu/async/hub.py", line 272, in create_loop
    item()
  File "venv/local/lib/python2.7/site-packages/amqp/utils.py", line 42, in __call__
    self.set_error_state(exc)
  File "venv/local/lib/python2.7/site-packages/amqp/utils.py", line 39, in __call__
    **dict(self.kwargs, **kwargs) if self.kwargs else kwargs
  File "venv/local/lib/python2.7/site-packages/kombu/transport/base.py", line 144, in _read
    drain_events(timeout=0)
  File "venv/local/lib/python2.7/site-packages/amqp/connection.py", line 302, in drain_events
    chanmap, None, timeout=timeout,
  File "venv/local/lib/python2.7/site-packages/amqp/connection.py", line 365, in _wait_multiple
    channel, method_sig, args, content = read_timeout(timeout)
  File "venv/local/lib/python2.7/site-packages/amqp/connection.py", line 336, in read_timeout
    return self.method_reader.read_method()
  File "venv/local/lib/python2.7/site-packages/amqp/method_framing.py", line 186, in read_method
    self._next_method()
  File "venv/local/lib/python2.7/site-packages/amqp/method_framing.py", line 126, in _next_method
    self._process_content_header(channel, payload)
  File "venv/local/lib/python2.7/site-packages/amqp/method_framing.py", line 154, in _process_content_header
    partial.add_header(payload)
  File "venv/local/lib/python2.7/site-packages/amqp/method_framing.py", line 54, in add_header
    self.msg._load_properties(payload[12:])
  File "venv/local/lib/python2.7/site-packages/amqp/serialization.py", line 476, in _load_properties
    d[key] = getattr(r, 'read_' + proptype)()
  File "venv/local/lib/python2.7/site-packages/amqp/serialization.py", line 141, in read_table
    val = table_data.read_item()
  File "venv/local/lib/python2.7/site-packages/amqp/serialization.py", line 191, in read_item
    val = self.read_table()  # recurse
  File "venv/local/lib/python2.7/site-packages/amqp/serialization.py", line 141, in read_table
    val = table_data.read_item()
  File "venv/local/lib/python2.7/site-packages/amqp/serialization.py", line 150, in read_item
    val = self.read_longstr()
  File "venv/local/lib/python2.7/site-packages/amqp/serialization.py", line 131, in read_longstr
    return self.input.read(slen).decode('utf-8')
  File "venv/lib/python2.7/encodings/utf_8.py", line 16, in decode
    return codecs.utf_8_decode(input, errors, True)
UnicodeDecodeError: 'utf8' codec can't decode byte 0x96 in position 80: invalid start byte

Versions:

Celery: 3.1.16
Kombu: 3.0.23

Using pyamqp://
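To illustrate the proposed behaviour in isolation, here is a sketch of a lenient decode that falls back to replacement characters instead of raising. `decode_lenient` is a hypothetical helper. py-amqp's `read_longstr` does not do this in the traceback above, and a real fix would also need to decide how to log and acknowledge the message.

```python
def decode_lenient(data):
    """Decode bytes as UTF-8, substituting U+FFFD for undecodable bytes."""
    try:
        return data.decode('utf-8')
    except UnicodeDecodeError:
        # 'replace' keeps the worker alive; the caller can log the event.
        return data.decode('utf-8', 'replace')
```

Byte 0x96, as in the report, is not a valid UTF-8 start byte, so a strict decode raises while the lenient one returns a string containing a replacement character.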

SSL issue with Celery 3.1.6

Hello !

It seems that between rc4 and 3.1.6, something changed in py-amqp or kombu that prevents SSL connections to an AMQP server from working.

Here is a stack-trace :

[2013-12-16 11:53:26,308: WARNING/MainProcess] celery@XXX ready.
[2013-12-16 11:53:30,311: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
  File "/home/XX/.virtualenvs/XX/lib/python3.3/site-packages/celery/worker/consumer.py", line 270, in start
    blueprint.start(self)
  File "/home/XX/.virtualenvs/XX/lib/python3.3/site-packages/celery/bootsteps.py", line 123, in start
    step.start(parent)
  File "/home/XX/.virtualenvs/XX/lib/python3.3/site-packages/celery/worker/consumer.py", line 790, in start
    c.loop(*c.loop_args())
  File "/home/XX/.virtualenvs/XX/lib/python3.3/site-packages/celery/worker/loops.py", line 72, in asynloop
    next(loop)
  File "/home/XX/.virtualenvs/XX/lib/python3.3/site-packages/kombu/async/hub.py", line 333, in create_loop
    cb(*cbargs)
  File "/home/XX/.virtualenvs/XX/lib/python3.3/site-packages/kombu/transport/base.py", line 156, in on_readable
    reader(loop)
  File "/home/XX/.virtualenvs/XX/lib/python3.3/site-packages/kombu/transport/base.py", line 139, in _read
    raise ConnectionError('Socket was disconnected')
amqp.exceptions.ConnectionError: Socket was disconnected
[2013-12-16 11:53:30,333: ERROR/MainProcess] Unrecoverable error: ValueError('I/O operation on closed epoll fd',)
Traceback (most recent call last):
  File "/home/XX/.virtualenvs/XX/lib/python3.3/site-packages/celery/worker/__init__.py", line 212, in start
    self.blueprint.start(self)
  File "/home/XX/.virtualenvs/XX/lib/python3.3/site-packages/celery/bootsteps.py", line 123, in start
    step.start(parent)
  File "/home/XX/.virtualenvs/XX/lib/python3.3/site-packages/celery/bootsteps.py", line 373, in start
    return self.obj.start()
  File "/home/XX/.virtualenvs/XX/lib/python3.3/site-packages/celery/worker/consumer.py", line 270, in start
    blueprint.start(self)
  File "/home/XX/.virtualenvs/XX/lib/python3.3/site-packages/celery/bootsteps.py", line 123, in start
    step.start(parent)
  File "/home/XX/.virtualenvs/XX/lib/python3.3/site-packages/celery/worker/consumer.py", line 467, in start
    c.connection = c.connect()
  File "/home/XX/.virtualenvs/XX/lib/python3.3/site-packages/celery/worker/consumer.py", line 369, in connect
    conn.transport.register_with_event_loop(conn.connection, self.hub)
  File "/home/XX/.virtualenvs/XX/lib/python3.3/site-packages/kombu/transport/pyamqp.py", line 124, in register_with_event_loop
    loop.add_reader(connection.sock, self.on_readable, connection, loop)
  File "/home/XX/.virtualenvs/XX/lib/python3.3/site-packages/kombu/async/hub.py", line 214, in add_reader
    return self.add(fds, callback, READ | ERR, args)
  File "/home/XX/.virtualenvs/XX/lib/python3.3/site-packages/kombu/async/hub.py", line 165, in add
    self.poller.register(fd, flags)
  File "/home/XX/.virtualenvs/XX/lib/python3.3/site-packages/kombu/utils/eventio.py", line 78, in register
    self._epoll.register(fd, events)
ValueError: I/O operation on closed epoll fd

I don't have time to look into it right now, but I'll do some digging during the week!

Incorrect method signature for Channel.Close

The AMQP spec specifies short, shortstr, short, short as the Channel.Close method signature, so the below patch should be applied.

This error may have been masked previously because shortstr was allowed to contain any binary data. I’m working on cleaning up the bytes/utf-8 behaviour and caught this as part of that.

diff --git a/amqp/channel.py b/amqp/channel.py
index 7d45fe4..dd7ff14 100644
--- a/amqp/channel.py
+++ b/amqp/channel.py
@@ -61,7 +61,7 @@ class Channel(AbstractChannel):

     """
     _METHODS = set([
-        spec.method(spec.Channel.Close, 'ssBB'),
+        spec.method(spec.Channel.Close, 'BsBB'),
         spec.method(spec.Channel.CloseOk),
         spec.method(spec.Channel.Flow, 'b'),
         spec.method(spec.Channel.FlowOk, 'b'),

After applying this patch, another error surfaced: _on_close in amqp/channel.py calls self._do_revive() which fails because ValueError: generator already executing (in this case, the channel was closed while we were waiting for the response to an exchange declare)
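To make the signature concrete, the sketch below encodes the four Channel.Close arguments by hand with `struct`, assuming (as the patch implies) that `B` stands for a 16-bit short and `s` for a shortstr with a one-byte length prefix. `pack_channel_close` is a hypothetical helper for illustration and not part of py-amqp.

```python
import struct


def pack_channel_close(reply_code, reply_text, class_id, method_id):
    """Encode short, shortstr, short, short in network byte order."""
    text = reply_text.encode('utf-8')
    if len(text) > 255:
        raise ValueError('shortstr is limited to 255 bytes')
    # >H: reply code, B + Ns: length-prefixed text, HH: class and method ids
    return struct.pack('>HB%dsHH' % len(text),
                       reply_code, len(text), text, class_id, method_id)
```

With the old `ssBB` signature the first field would be read as a length-prefixed string rather than a 16-bit reply code, so every following field would be misaligned.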

AttributeError in __del__

I am seeing AttributeError exceptions being raised in my logs and I believe the culprit is amqp/transport._AbstractTransport.__del__.

As stated in this SO post (http://stackoverflow.com/questions/18058730/python-attributeerror-on-del), the __del__ function may have been called after modules have started to be unloaded and set to None. In the __del__ function for _AbstractTransport, you have to check the socket module before grabbing a property from it. Below is a proposed fix to the __del__ method:

    def __del__(self):
        try:
            self.close()
        except Exception as e:
            # During interpreter shutdown the socket module may already be None.
            if socket and isinstance(e, socket.error):
                pass
            else:
                raise
        finally:
            self.sock = None

Similarly, you need to check socket in the close method. Here is a proposed fix:

    def close(self):
        if socket and self.sock is not None:
            self._shutdown_transport()
            # Call shutdown first to make sure that pending messages
            # reach the AMQP broker if the program exits after
            # calling this method.
            self.sock.shutdown(socket.SHUT_RDWR)
            self.sock.close()
            self.sock = None
        self.connected = False

Unable to use "remote control" after broker loss if pyamqp:// is used

I'm using celery.control for remote control of other workers.

I'm using
celery 3.0.23
kombu 2.5.14
amqp 1.0.13
[librabbitmq 1.0.1]

celery.control.ping()
[2013-09-16 15:40:32,984: WARNING/MainProcess] [{u'localhost': u'pong'}]

Now, when I shutdown RabbitMQ, remote control obviously doesn't work anymore:

celery.control.ping()
Traceback (most recent call last):
File "", line 1, in
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/celery/app/control.py", line 145, in ping
timeout=timeout, **kwargs)
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/celery/app/control.py", line 269, in broadcast
limit, callback, channel=channel,
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/kombu/pidbox.py", line 268, in _broadcast
timeout=timeout)
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/kombu/pidbox.py", line 237, in _publish
maybe_declare(self.reply_queue(channel))
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/kombu/common.py", line 90, in maybe_declare
return _maybe_declare(entity)
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/kombu/common.py", line 99, in _maybe_declare
entity.declare()
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/kombu/entity.py", line 492, in declare
self.exchange.declare(nowait)
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/kombu/entity.py", line 162, in declare
nowait=nowait, passive=passive,
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/amqp/channel.py", line 603, in exchange_declare
(40, 11), # Channel.exchange_declare_ok
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/amqp/abstract_channel.py", line 69, in wait
self.channel_id, allowed_methods)
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/amqp/connection.py", line 204, in _wait_method
self.method_reader.read_method()
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/amqp/method_framing.py", line 192, in read_method
raise m
IOError: Socket closed

Unfortunately, when I restart the RMQ broker now, the celery remote instance doesn't recover anymore:

celery.control.ping()
Traceback (most recent call last):
File "", line 1, in
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/celery/app/control.py", line 145, in ping
timeout=timeout, **kwargs)
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/celery/app/control.py", line 269, in broadcast
limit, callback, channel=channel,
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/kombu/pidbox.py", line 268, in _broadcast
timeout=timeout)
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/kombu/pidbox.py", line 237, in _publish
maybe_declare(self.reply_queue(channel))
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/kombu/common.py", line 90, in maybe_declare
return _maybe_declare(entity)
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/kombu/common.py", line 99, in _maybe_declare
entity.declare()
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/kombu/entity.py", line 492, in declare
self.exchange.declare(nowait)
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/kombu/entity.py", line 162, in declare
nowait=nowait, passive=passive,
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/amqp/channel.py", line 595, in exchange_declare
self._send_method((40, 10), args)
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/amqp/abstract_channel.py", line 58, in _send_method
self.channel_id, method_sig, args, content,
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/amqp/method_framing.py", line 224, in write_method
write_frame(1, channel, payload)
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/amqp/transport.py", line 160, in write_frame
pack('>BHI%dsB' % size, frame_type, channel, size, payload, 0xce),
File "/usr/local/lib/python2.7/socket.py", line 224, in meth
return getattr(self._sock,name)(*args)
socket.error: [Errno 32] Broken pipe

This does NOT happen when I use an installed librabbitmq (with an amqp:// broker URL) instead of the pure-Python AMQP transport:

celery.control.ping()

Traceback (most recent call last):
File "", line 1, in
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/celery/app/control.py", line 145, in ping
timeout=timeout, **kwargs)
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/celery/app/control.py", line 269, in broadcast
limit, callback, channel=channel,
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/kombu/pidbox.py", line 268, in _broadcast
timeout=timeout)
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/kombu/pidbox.py", line 237, in _publish
maybe_declare(self.reply_queue(channel))
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/kombu/common.py", line 90, in maybe_declare
return _maybe_declare(entity)
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/kombu/common.py", line 99, in _maybe_declare
entity.declare()
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/kombu/entity.py", line 492, in declare
self.exchange.declare(nowait)
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/kombu/entity.py", line 162, in declare
nowait=nowait, passive=passive,
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/librabbitmq/__init__.py", line 119, in exchange_declare
exchange, type, passive, durable, auto_delete, arguments or {})
_librabbitmq.ConnectionError: exchange.declare: connection closed unexpectedly

After RMQ restart:

celery.control.ping()
[2013-09-16 16:17:22,875: WARNING/MainProcess] [{u'localhost': u'pong'}]

Getting OSError: Socket closed against new RabbitMQ server 3.6.2 (on 2.0.3)

About 60 seconds after starting a channel.basic_consume followed by a channel.wait(), the connection closes with a Socket Closed error, as here:

Traceback (most recent call last):
File "./rmqtools.py", line 437, in run_self_test_loop
qh.channel[handle].wait(None)
File "/usr/local/lib/python3.4/site-packages/amqp/abstract_channel.py", line 91, in wait
self.connection.drain_events(timeout=timeout)
File "/usr/local/lib/python3.4/site-packages/amqp/connection.py", line 436, in drain_events
return self.blocking_read(timeout)
File "/usr/local/lib/python3.4/site-packages/amqp/connection.py", line 440, in blocking_read
frame = self.transport.read_frame()
File "/usr/local/lib/python3.4/site-packages/amqp/transport.py", line 221, in read_frame
frame_header = read(7, True)
File "/usr/local/lib/python3.4/site-packages/amqp/transport.py", line 369, in _read
raise IOError('Socket closed')
OSError: Socket closed

On the server, the message logged is:
=INFO REPORT==== 12-Jul-2016::19:09:02 ===
accepting AMQP connection <0.9396.19> (172....1:19638 -> 172....5:5672)

=WARNING REPORT==== 12-Jul-2016::19:10:01 ===
closing AMQP connection <0.9396.19> (172...1:19638 -> 172....5:5672):
client unexpectedly closed TCP connection

Here's something that may be relevant: https://www.rabbitmq.com/heartbeats.html. However, the connection shows a negotiated heartbeat (Connection.heartbeat) of 0, and, when changing it to, say, 5 during connection creation, the same above error happens after 3x the chosen heartbeat (e.g. 15 sec.).

Relatedly, the channel.wait() method is no longer documented in the web docs. Is there a different method that should be called after registering a callback with basic_consume?

A colleague reports this on the 1.4.6 client too.

Thanks!

UnexpectedFrame exceptions when reading from queue

We are seeing a bunch of these exceptions popping up when reading from a kombu SimpleQueue, under Python 3.4, with amqp 1.4.6 and kombu 3.0.26:

Traceback (most recent call last):
  [trimmed]
  File "/opt/virtualenvs/allmylespy/lib/python3.4/site-packages/allmylespy_common/rpc.py", line 156, in _get_reply
    reply = self._our_queue.get(block=True, timeout=1)
  File "/opt/virtualenvs/allmylespy/lib/python3.4/site-packages/kombu/simple.py", line 53, in get
    timeout=timeout and remaining)
  File "/opt/virtualenvs/allmylespy/lib/python3.4/site-packages/kombu/connection.py", line 275, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
  File "/opt/virtualenvs/allmylespy/lib/python3.4/site-packages/kombu/transport/pyamqp.py", line 95, in drain_events
    return connection.drain_events(**kwargs)
  File "/opt/virtualenvs/allmylespy/lib/python3.4/site-packages/amqp/connection.py", line 302, in drain_events
    chanmap, None, timeout=timeout,
  File "/opt/virtualenvs/allmylespy/lib/python3.4/site-packages/amqp/connection.py", line 365, in _wait_multiple
    channel, method_sig, args, content = read_timeout(timeout)
  File "/opt/virtualenvs/allmylespy/lib/python3.4/site-packages/amqp/connection.py", line 336, in read_timeout
    return self.method_reader.read_method()
  File "/opt/virtualenvs/allmylespy/lib/python3.4/site-packages/amqp/method_framing.py", line 189, in read_method
    raise m
  File "/opt/virtualenvs/allmylespy/lib/python3.4/site-packages/amqp/method_framing.py", line 107, in _next_method
    frame_type, channel, payload = read_frame()
  File "/opt/virtualenvs/allmylespy/lib/python3.4/site-packages/amqp/transport.py", line 175, in read_frame
    'Received 0x{0:02x} while expecting 0xce'.format(ch))
amqp.exceptions.UnexpectedFrame: Received 0x00 while expecting 0xce

I'd say between 0.01% and 0.5% of messages are affected. The error message varies; a few seconds earlier we had Received 0x5e instead. I see that 1.4.6 doesn't list Python 3.4 in its classifiers. Is it possibly incompatible? Are there any alternatives for Python 3?

setup.py PyPI licensing metadata and LICENSE file are not aligned

Hi folks-
I'm tasked with going through the licensing on dependencies on a project, and I've noticed that the licensing trove classifiers indicate both BSD and LGPL, but the LICENSE file only contains LGPL. Can this be clarified (either by removing the line from the setup.py or modifying/extending the LICENSE file)?

Cannot write frames or deserialize on Py3.4

On Py3.4, it is necessary to apply the following patch in order to write frames or deserialize data:

diff --git a/amqp/method_framing.py b/amqp/method_framing.py
index b9ffe91..64e9cd4 100644
--- a/amqp/method_framing.py
+++ b/amqp/method_framing.py
@@ -116,7 +116,7 @@ def frame_writer(connection, transport,

         if no_pybuf or bigbody:
             # ## SLOW: string copy and write for every frame
-            frame = (''.join([pack('>HH', *method_sig), args])
+            frame = (b''.join([pack('>HH', *method_sig), args])
                      if type_ == 1 else '')  # encode method frame
             framelen = len(frame)
             write(pack('>BHI%dsB' % framelen,
diff --git a/amqp/serialization.py b/amqp/serialization.py
index d74515f..81abf3d 100644
--- a/amqp/serialization.py
+++ b/amqp/serialization.py
@@ -52,7 +52,7 @@ ILLEGAL_TABLE_TYPE = """\


 def _read_item(buf, offset=0, unpack_from=unpack_from):
-    ftype = buf[offset]
+    ftype = chr(buf[offset])
     offset += 1

     # 'S': long string

In method_framing.py, ''.join() cannot be applied to a list of bytes objects, and in serialization.py, buf[offset] is an int on Python 3 but is compared to str values a few lines below.

Note that this patch will probably break Py2, so it should not be applied verbatim; it is only meant to demonstrate the issue.
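Both problems can be shown outside py-amqp. The sketch below joins packed bytes with a bytes separator and reads the type tag with a one-byte slice, which yields a bytes object on both Python 2 and 3. The slice is an assumption offered as a version-neutral alternative to the `chr()` call in the patch, and `read_type` is a hypothetical helper name.

```python
from struct import pack

method_sig = (10, 31)
args = b'\x00\x00'

# A bytes separator is required on Python 3; ''.join() raises TypeError there.
frame = b''.join([pack('>HH', *method_sig), args])


def read_type(buf, offset=0):
    """Return the one-byte type tag as bytes on both Python 2 and 3."""
    # Indexing (buf[offset]) gives an int on Python 3, slicing does not.
    return buf[offset:offset + 1]
```

Comparisons further down would then be written against bytes literals such as `b'S'` rather than `'S'`.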
