celery / py-amqp Goto Github PK
View Code? Open in Web Editor NEWamqplib fork
License: Other
amqplib fork
License: Other
[root@testzlin amqp-1.0.9]# /opt/python3/bin/python3 setup.py install
...
Finished processing dependencies for amqp==1.0.9
[root@testzlin amqp-1.0.9]# /opt/python3/bin/python3
Python 3.2.2 (default, Jan 25 2012, 13:16:54)
[GCC 3.4.6 20060404 (Red Hat 3.4.6-3)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
import amqp
Traceback (most recent call last):
File "", line 1, in
File "amqp/init.py", line 32, in
from .basic_message import Message
File "amqp/basic_message.py", line 19, in
from .serialization import GenericContent
File "amqp/serialization.py", line 261
if n < 0 or n >= 4294967296L:
^
SyntaxError: invalid syntax
>>> c = amqp.connection.Connection( ssl={} )
>>> c.connect()
>>> c.transport
<amqp.transport.TCPTransport object at 0x7f8f4a7ad650>
Passing an empty dictionary causes this line to pick a TCPTransport for you: https://github.com/celery/py-amqp/blob/master/amqp/transport.py#L395
>>> import amqp
>>> c = amqp.connection.Connection( ssl=True )
>>> c.connect()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "amqp/connection.py", line 272, in connect
self.transport.connect()
File "amqp/transport.py", line 102, in connect
self.socket_settings, self.read_timeout, self.write_timeout,
File "amqp/transport.py", line 176, in _init_socket
self._setup_transport()
File "amqp/transport.py", line 286, in _setup_transport
self.sock = self._wrap_socket(self.sock, **self.sslopts or {})
AttributeError: 'SSLTransport' object has no attribute 'sslopts'
Passing True
causes this exception to be raised since this line never executes: https://github.com/celery/py-amqp/blob/master/amqp/transport.py#L279
It seems like the user is forced to specify some arguments to ssl, leaving no way to specify that we want all defaults.
I'm trying to diagnose some really annoying issues with a connection getting wedged and stopping responding, and from what I can tell, the internal AMQP ssl socket is put into blocking mode with no timeout, and left that way.
This is (I think) leading to a context where if the remote end goes away while in a blocking read (I think), my code blocks forever and can never recover.
Looking in transport.py
, the passed socket is intialized with the desired timeout in the connect call, but the connect()
call is immediately followed by a call to _init_socket()
which then sets the socket back in blocking mode, which is then never changed.
There is the later call to self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDTIMEO, write_timeout)
, and SO_SNDTIMEO
, but unless this is causing the python socket management class to get out of sync with the actual socket configuration, it doesn't have a set timeout (calling self.connection.transport.sock.gettimeout()
on an open connection returns None
, and the source behind that call is hidden in a C file somewhere).
Ok, this might be what's happening, it looks like gettimeout()
just returns the contents of a struct member from the python socket class instance. It is worth noting that unless that timeout value is set, some of the internal behaviour of the socket class will change (search for sock_timeout
in that file), so setting the timeout to non-none may be required to get all the expected socket behaviour.
Anyways, I'm specifying a value for SO_SNDTIMEO
and SO_RCVTIMEO
and still somehow getting wedged, so I'm a little desperate at this point.
In any event, is there any reason that this even needs to be done? It seems something like
self.sock.settimeout(
max(read_timeout, write_timeout) if read_timeout and write_timeout else None
) # set socket back to blocking mode
would preserve the timeout overall if the end-user sets them both, while leaving it the way it is if the user doesn't specify the timeouts fully.
My celery jobs started throwing this error after upgrading from amqp 1.4.9 to 2.0.4
[2016-08-10 14:57:54,695: ERROR/MainProcess] Unrecoverable error: ValueError('generator already executing',)
Traceback (most recent call last):
File "/home/accuranker/.virtualenvs/grank/local/lib/python2.7/site-packages/celery/worker/__init__.py", line 206, in start
self.blueprint.start(self)
File "/home/accuranker/.virtualenvs/grank/local/lib/python2.7/site-packages/celery/bootsteps.py", line 123, in start
step.start(parent)
File "/home/accuranker/.virtualenvs/grank/local/lib/python2.7/site-packages/celery/bootsteps.py", line 374, in start
return self.obj.start()
File "/home/accuranker/.virtualenvs/grank/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 279, in start
blueprint.start(self)
File "/home/accuranker/.virtualenvs/grank/local/lib/python2.7/site-packages/celery/bootsteps.py", line 123, in start
step.start(parent)
File "/home/accuranker/.virtualenvs/grank/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 838, in start
c.loop(*c.loop_args())
File "/home/accuranker/.virtualenvs/grank/local/lib/python2.7/site-packages/celery/worker/loops.py", line 101, in synloop
qos.update()
File "/home/accuranker/.virtualenvs/grank/local/lib/python2.7/site-packages/kombu/common.py", line 400, in update
return self.set(self.value)
File "/home/accuranker/.virtualenvs/grank/local/lib/python2.7/site-packages/kombu/common.py", line 393, in set
self.callback(prefetch_count=new_value)
File "/home/accuranker/.virtualenvs/grank/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 615, in set_prefetch_count
apply_global=qos_global,
File "/home/accuranker/.virtualenvs/grank/local/lib/python2.7/site-packages/kombu/messaging.py", line 533, in qos
apply_global)
File "/home/accuranker/.virtualenvs/grank/local/lib/python2.7/site-packages/amqp/channel.py", line 1851, in basic_qos
wait=spec.Basic.QosOk,
File "/home/accuranker/.virtualenvs/grank/local/lib/python2.7/site-packages/amqp/abstract_channel.py", line 62, in send_method
conn.frame_writer.send((1, self.channel_id, sig, args, content))
ValueError: generator already executing
(not sure if this belongs to kombu or here)
I'm trying to understand how kombu/amqplib is supposed to behave when the TCP connection goes away silently and why there is no mechanism to define read/write timeouts (similar to the ones in libmemcached, for example http://docs.libmemcached.org/memcached_behavior.html#MEMCACHED_BEHAVIOR_SND_TIMEOUT).
To simulate this I'm running celery with RabbitMQ, start a celery worker and a task submitter and then SIGSTOP the RabbitMQ server. The celery consumer never notices anything is wrong, the task submitter submits tasks until the TCP buffer is full (which takes quite a while) and then fails with an error, all the published tasks are lost.
How can I detect this earlier?
Currently, we have version 1.2.0 installed, but on pypi.python.org it says the latest version is 1.0.8. Should we go to 1.0.8?
With heartbeats enabled, the application crashes after a short time with the below stacktrace. I am not sure yet if this issue is with py-amqp, kombu or celery but am recording it here in the meantime.
Traceback (most recent call last):
File "/home/.../py-amqp/amqp/connection.py", line 561, in send_heartbeat
self._frame_writer.send((8, 0, None, None, None))
StopIteration
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/.../celery/celery/worker/consumer.py", line 278, in start
blueprint.start(self)
File "/home/.../celery/celery/bootsteps.py", line 123, in start
step.start(parent)
File "/home/.../celery/celery/worker/consumer.py", line 851, in start
c.loop(*c.loop_args())
File "/home/.../celery/celery/worker/loops.py", line 74, in asynloop
next(loop)
File "/home/.../kombu/kombu/async/hub.py", line 278, in create_loop
poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
File "/home/.../kombu/kombu/async/hub.py", line 134, in fire_timers
entry()
File "/home/.../kombu/kombu/async/timer.py", line 63, in __call__
return self.fun(*self.args, **self.kwargs)
File "/home/.../kombu/kombu/async/timer.py", line 131, in _reschedules
return fun(*args, **kwargs)
File "/home/.../kombu/kombu/connection.py", line 264, in heartbeat_check
return self.transport.heartbeat_check(self.connection, rate=rate)
File "/home/.../kombu/kombu/transport/pyamqp.py", line 137, in heartbeat_check
return connection.heartbeat_tick(rate=rate)
File "/home/.../py-amqp/amqp/connection.py", line 586, in heartbeat_tick
self.send_heartbeat()
File "/home/.../py-amqp/amqp/connection.py", line 563, in send_heartbeat
raise RecoverableConnectionError('connection already closed')
amqp.exceptions.RecoverableConnectionError: connection already closed
RabbitMQ logs:
=WARNING REPORT==== 21-Dec-2014::14:01:51 ===
Non-AMQP exit reason '{exit,
{amqp_error,frame_error,
"cannot decode <<0,0,0,0,0,0>>",
'connection.close'},
'connection.close',
[{rabbit_misc,protocol_error,1,[]},
{rabbit_reader,handle_method0,3,[]},
{rabbit_reader,handle_input,3,[]},
{rabbit_reader,recvloop,4,[]},
{rabbit_reader,run,1,[]},
{rabbit_reader,start_connection,5,[]},
{proc_lib,init_p_do_apply,3,
[{file,"proc_lib.erl"},{line,237}]}]}'
The issue only occurs when heartbeats are enabled.
in amqplib, channel was an attribute of message. The way py-amqp does it, you need to curry your callback function with partial to get the same effect. No matter what, you need to be able to acknowledge or not, the message in the call back. Requiring functools.parital seems like extra work for the user.
It definitely caught me off guard when I moved from py-amqplib to py-amqp. In your demo code for receiving messages, is a poster child.
When running a code using py-amqp/master on Python >= 3, a TypeError
exception is raised from method_framing.py
while trying to handle frame args during the connection teardown. Issue is systematic. For example, when running the kombu hello publisher example using Python 3.4 and the latest RabbitMQ, I end up like this:
Sent: helloword, sent at 2016-05-23 16:20:11.172310
Traceback (most recent call last):
File "kombutest.py", line 9, in <module>
simple_queue.close()
File "/home/fficarelli/python-3.4-virtualenv-01/lib/python3.4/site-packages/kombu/connection.py", line 739, in __exit__
self.release()
File "/home/fficarelli/python-3.4-virtualenv-01/lib/python3.4/site-packages/kombu/connection.py", line 352, in release
self._close()
File "/home/fficarelli/python-3.4-virtualenv-01/lib/python3.4/site-packages/kombu/connection.py", line 318, in _close
self._do_close_self()
File "/home/fficarelli/python-3.4-virtualenv-01/lib/python3.4/site-packages/kombu/connection.py", line 311, in _do_close_self
self.transport.close_connection(self._connection)
File "/home/fficarelli/python-3.4-virtualenv-01/lib/python3.4/site-packages/kombu/transport/pyamqp.py", line 134, in close_connection
connection.close()
File "/home/fficarelli/python-3.4-virtualenv-01/lib/python3.4/site-packages/amqp/connection.py", line 463, in close
wait=spec.Connection.CloseOk,
File "/home/fficarelli/python-3.4-virtualenv-01/lib/python3.4/site-packages/amqp/abstract_channel.py", line 71, in send_method
return self.wait(wait, returns_tuple=returns_tuple)
File "/home/fficarelli/python-3.4-virtualenv-01/lib/python3.4/site-packages/amqp/abstract_channel.py", line 91, in wait
self.connection.drain_events(timeout=timeout)
File "/home/fficarelli/python-3.4-virtualenv-01/lib/python3.4/site-packages/amqp/connection.py", line 388, in drain_events
return self.blocking_read(timeout)
File "/home/fficarelli/python-3.4-virtualenv-01/lib/python3.4/site-packages/amqp/connection.py", line 393, in blocking_read
return self.on_inbound_frame(frame)
File "/home/fficarelli/python-3.4-virtualenv-01/lib/python3.4/site-packages/amqp/method_framing.py", line 63, in on_frame
callback(channel, method_sig, buf, None)
File "/home/fficarelli/python-3.4-virtualenv-01/lib/python3.4/site-packages/amqp/connection.py", line 397, in on_inbound_method
method_sig, payload, content,
File "/home/fficarelli/python-3.4-virtualenv-01/lib/python3.4/site-packages/amqp/abstract_channel.py", line 140, in dispatch_method
listener(*args)
File "/home/fficarelli/python-3.4-virtualenv-01/lib/python3.4/site-packages/amqp/connection.py", line 520, in _on_close
self._x_close_ok()
File "/home/fficarelli/python-3.4-virtualenv-01/lib/python3.4/site-packages/amqp/connection.py", line 537, in _x_close_ok
self.send_method(spec.Connection.CloseOk, callback=self._on_close_ok)
File "/home/fficarelli/python-3.4-virtualenv-01/lib/python3.4/site-packages/amqp/abstract_channel.py", line 62, in send_method
conn._frame_writer.send((1, self.channel_id, sig, args, content))
File "/home/fficarelli/python-3.4-virtualenv-01/lib/python3.4/site-packages/amqp/method_framing.py", line 142, in frame_writer
if type_ == 1 else b'')
TypeError: sequence item 1: expected a bytes-like object, str found
Due to the kind of exception raised and the fact that under PY2 everything seems to work fine, the issue is probably related to the fact that in PY3 strings are bytes no more.
Hello, I would like to use a rabbitmq plugin for reverse topic exchange (x-rtopic) in order to define a set of machines I could run send a message to. So could be defined on the message routing_key a '.' or 'a.#' pattern and each node had it's own routing key.
I've tested the plugin with kombu and pika using simple exemples found in the net. And both worked fine. But when I'try to send a Celery task it does not work, the Consumer connects fine, but when the Producer loads my task, create automaticaly a (direct) Exchage ignoring the configuration I've setted in the config file. Even setting the CELERY_DEFAULT_EXCHANGE_TYPE gets ignored. Anyway, then I forced celery to set create a exchange (x-rtopic) type and than I get this error below:
ConnectionError: 503: (COMMAND_INVALID - unknown exchange type 'x-rtopic', (40, 10), None)
Something in the Channel part seems to now allow this.
There is a easy (or not) way to achieve this support ?
Thanks
Andre
Nitpicking, I know, but could you add a PGP signature for the source-tarball on PyPi for the 1.4.6 upload? The build-process for the Debian package relies on it being there.
I just spent hours pulling my teeth on CentOS 7.2 before figuring out that I need 9167f65.
Could you tag a new release to make it easier for me to package a new rpm.
Thanks
Since the recent (ca. Jan 10 or so) heartbeat changes, the following occurs often (maybe 75% of the time) when attempting to publish using Kombu and py-amqp:
Traceback (most recent call last):
File "/home/...", line 430, in ...
publish('...', {'ack': ..., 'result': result}, routing_key=reply_to)
File "/home/...", line 109, in publish
maybe_declare(exchange, producer.channel)
File "/home/.../kombu/kombu/common.py", line 91, in maybe_declare
return _maybe_declare(entity)
File "/home/.../kombu/kombu/common.py", line 106, in _maybe_declare
entity.declare()
File "/home/.../kombu/kombu/entity.py", line 166, in declare
nowait=nowait, passive=passive,
File "/home/.../py-amqp/amqp/channel.py", line 613, in exchange_declare
self._send_method((40, 10), args)
File "/home/.../py-amqp/amqp/abstract_channel.py", line 56, in _send_method
self.channel_id, method_sig, args, content,
File "/home/.../py-amqp/amqp/method_framing.py", line 221, in write_method
write_frame(1, channel, payload)
File "/home/.../py-amqp/amqp/transport.py", line 163, in write_frame
frame_type, channel, size, payload, 0xce,
ConnectionResetError: [Errno 104] Connection reset by peer
RabbitMQ logs this:
=ERROR REPORT==== 21-Jan-2014::07:30:53 ===
closing AMQP connection <0.29065.28> ([2401:F000:10:0:xxx]:39688 -> [2401:F000:10:0:xxx]:5672):
{heartbeat_timeout,running}
This publish is occurring from within a celery task body, if that makes any difference.
Probably due to the fact Bitdeli doesn't exist anymore or it's just offline at the moment
Currently py-amqp works only with AMQP 0-9 brokers such as RabbitMQ. Another popular broker Qpid implements AMQP 0-10 and just silently disconnects py-amqp due protocol version mismatch.
Hi there,
the current release on PyPI is 1.0.13 whereas ampg/init.py has VERSION = (1, 2, 0) which ends up on the tarball (via setup.py). This is mildly confusing and a bit of a mess. Could you please clarify which is supposed to be the real version number?
To give some background, openSUSE ships python-amqp-1.2.0 since quite a while now since that has been the most current version in the past. We'd either have to downgrade now (which is really inconvenient for a lot of reasons) or convince you to bump the version number to 1.2.1.
While long-running message handlers (amqp_method) run, inbound frames need to continue to be processed in the background so that heartbeat frames (frame_type == 8) can continue to increment the bytes_recd attribute of the MethodReader and update the condition checks in connection.heartbeat_tick.
http://stackoverflow.com/questions/14817181/django-celery-connectionerror-too-many-heartbeats-missed
I'm trying to implement a channel multiplexing over connection with one listening thread which just call the drain_events
in while loop. Other threads creates it's own channels and use them.
But some channel actions in the threads (e.g channel opening or publishing confirmation) are stucks on the channel.wait()
method, because drain_events()
steals a response from the broker and does not dispatch the frame to waiting promise.
As I understand the problem is the race condition between adding the new pending task in the channel.wait()
method and calls drain_events
in the listening thread. Take a look on the next case:
(1) Thread A: run drain_events() in while loop
(2) Thread B: send(Channel.Open)
(3) Thread A: receive OpenOk and trying to dispatch the frame. But in this time penging list of the channel is empty
(4) Thread B: append the new promise to pending list and goto infinity loop:
while not p.ready:
self.connection.drain_events(timeout=timeout)
So, I think the problem can be solved if a promise will be added in the list before calling of _frame_writer.send()
method.
Not sure if amqp and Azure Service Bus should cooperate, but I receive a MemoryError (originating from transport._AbstractTransport.read_frame). The payload size unpacked from he frame header seems incorrect.
I attempt to establish the connection like so:
connection = amqp.connection.Connection(
host='MYSERVICENAMESPACE.servicebus.windows.net',
userid='owner',
password='MYPASSWORD')
The documentation says:
Differences from amqplib
- Support for timeouts
Look at the code in the file "transport.py":
class _AbstractTransport(object):
def __init__(self, host, connect_timeout):
...
try:
self.sock = socket.socket(af, socktype, proto)
self.sock.settimeout(connect_timeout)
self.sock.connect(sa)
except socket.error, msg:
...
self.sock.settimeout(None) # <==== WTF?
I use kombu + RabbitMQ cluster behind AWS Elastic Load Balancer. Sometimes my code freezes in the method _TCPTransport.write(...) somewhere about 900 seconds (probably in this to blame the ELB) and then it throw exception socket.timeout.
Explain to me why you disable the timeout for the socket?
PS: sorry for my English
Hey,
I am trying to change from using pika to py-amqp on my async rpc client for apache
The problem I am running into is that wait() is blocking. Which means that I cannot run it on a separate thread with a lock.
Receiving RPC requests using it's own thread.
while self.channel.callbacks:
with self.internal_lock:
self.channel.wait() # waits here endlessly
Sending RPC requests.
with self.internal_lock:
self.channel.basic_publish(exchange=self.exchange,
routing_key=routing_key,
msg=message)
Since wait() keeps the lock until it receives a response, it keeps locking, so that I can never send any requests.
I was wondering if there is a function within amqp that allows me to check for responses, without blocking the application, such as pikas process_data_events.
We are getting errors of 'Error in timer: ConnectionForced(None, 'Too many heartbeats missed', None, '')' every now and then while using the pyamqp transport. This has happened on both py-amqp 0.9.3 and 1.2.0.
I don't think there's anything wrong on the underlying rabbitmq servers, and I also don't think there's a network problem going on (no other services have any problems talking to them).
We don't get a ton of them (I would say maybe 1 or 2 per day), but it's worrisome nonetheless. Do you have any first steps on how to get to the bottom of this? Our BROKER_HEARTBEAT is set to 10, the default, should we just up that time?
Thanks,
Andy
Visiting https://amqp.readthedocs.io/ redirects to the latest
version tag of the docs.
However these don't mention v2.x on the changelog, instead only <= 1.4.9:
https://amqp.readthedocs.io/en/latest/changelog.html
(I've even tried forcing a RTD rebuild using https://readthedocs.org/projects/amqp/builds/)
Strangely the stable
version does have those changelog entries:
https://amqp.readthedocs.io/en/stable/changelog.html
(Though as #94 notes, the banner there about it being for a development version is incorrect)
It seems as though latest
and master
on readthedocs could be combined, and the default version tag be made to be stable
?
Also the readme mentions both 0.9.1 and 2.0.0rc2, rather than 2.0.1:
https://github.com/celery/py-amqp/blame/b12272c54a88ad428a7eaf837280f136b7252e08/README.rst#L2
https://github.com/celery/py-amqp/blame/b12272c54a88ad428a7eaf837280f136b7252e08/README.rst#L7
Many thanks :-)
In [1]: import amqp
In [2]: c = amqp.connection.Connection()
In [3]: c.connect()
FRAME OFFSET + SIZE: 468
OFFSET: 0 SIZE: 469 DATA: 469
FRAME OFFSET + SIZE: 19
OFFSET: 469 SIZE: 20 DATA: 20
FRAME OFFSET + SIZE: 12
OFFSET: 489 SIZE: 13 DATA: 13
In [4]: c.channel()
FRAME OFFSET + SIZE: 15
OFFSET: 502 SIZE: 16 DATA: 16
Out[4]: <amqp.channel.Channel at 0x2e62b50>
In [5]: c.close()
FRAME OFFSET + SIZE: 32
OFFSET: 518 SIZE: 33 DATA: 33
---------------------------------------------------------------------------
InternalError Traceback (most recent call last)
<ipython-input-5-b082af813204> in <module>()
----> 1 c.close()
/home/dmitry/projects/py-amqp/amqp/connection.pyc in close(self, reply_code, reply_text, method_sig, argsig)
547 spec.Connection.Close, argsig,
548 (reply_code, reply_text, method_sig[0], method_sig[1]),
--> 549 wait=spec.Connection.CloseOk,
550 )
551
/home/dmitry/projects/py-amqp/amqp/abstract_channel.pyc in send_method(self, sig, format, args, content, wait, on_sent, callback)
62 # TODO temp: callback should be after write_method ... ;)
63 if wait:
---> 64 return self.wait(wait, callback)
65 return on_sent
66
/home/dmitry/projects/py-amqp/amqp/abstract_channel.pyc in wait(self, method, callback, returns_tuple, filter)
79 try:
80 while not p.ready:
---> 81 self.connection.drain_events()
82
83 if p.value:
/home/dmitry/projects/py-amqp/amqp/connection.pyc in drain_events(self, timeout)
363
364 def drain_events(self, timeout=None):
--> 365 return self.blocking_read(timeout)
366
367 def on_readable(self, _unavail=_UNAVAIL):
/home/dmitry/projects/py-amqp/amqp/connection.pyc in blocking_read(self, timeout)
446 self._read_frame(None)
447 while self._inbound:
--> 448 self.on_inbound_frame(self._inbound.popleft())
449 self._call_pending_blocking_callbacks()
450 return
/home/dmitry/projects/py-amqp/amqp/method_framing.pyc in frame_handler(connection, callback, unpack_from, content_methods)
72 expected_types[channel] = 2
73 else:
---> 74 callback(channel, method_sig, buf, None)
75
76 elif frame_type == 2:
/home/dmitry/projects/py-amqp/amqp/connection.pyc in on_inbound_method(self, channel_id, method_sig, payload, content)
477 def on_inbound_method(self, channel_id, method_sig, payload, content):
478 return self.channels[channel_id].dispatch_method(
--> 479 method_sig, payload, content,
480 )
481
/home/dmitry/projects/py-amqp/amqp/abstract_channel.pyc in dispatch_method(self, method_sig, payload, content)
131 for listener in listeners:
132 # print('CALLING LISTENER: %r' % (listener, ))
--> 133 listener(*args)
134
135 #: Placeholder, the concrete implementations will have to
/home/dmitry/projects/py-amqp/amqp/connection.pyc in _on_close(self, reply_code, reply_text, class_id, method_id)
606 self._x_close_ok()
607 raise error_for_code(reply_code, reply_text,
--> 608 (class_id, method_id), ConnectionError)
609
610 def _on_blocked(self):
InternalError: (0, 0): (541) INTERNAL_ERROR
Therefore, in logs:
=INFO REPORT==== 5-Jul-2014::21:36:34 ===
accepting AMQP connection <0.494.0> (127.0.0.1:33018 -> 127.0.0.1:5672)
=ERROR REPORT==== 5-Jul-2014::21:36:58 ===
AMQP connection <0.494.0> (running), channel 0 - error:
{exit,{amqp_error,frame_error,"cannot decode <<0,0,0,0,0,0>>",
'connection.close'},
'connection.close',
[{rabbit_misc,protocol_error,1,[]},
{rabbit_reader,handle_method0,3,[]},
{rabbit_reader,handle_input,3,[]},
{rabbit_reader,recvloop,4,[]},
{rabbit_reader,run,1,[]},
{rabbit_reader,start_connection,5,[]},
{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,239}]}]}
=WARNING REPORT==== 5-Jul-2014::21:36:58 ===
Non-AMQP exit reason '{exit,
{amqp_error,frame_error,
"cannot decode <<0,0,0,0,0,0>>",
'connection.close'},
'connection.close',
[{rabbit_misc,protocol_error,1,[]},
{rabbit_reader,handle_method0,3,[]},
{rabbit_reader,handle_input,3,[]},
{rabbit_reader,recvloop,4,[]},
{rabbit_reader,run,1,[]},
{rabbit_reader,start_connection,5,[]},
{proc_lib,init_p_do_apply,3,
[{file,"proc_lib.erl"},{line,239}]}]}'
=INFO REPORT==== 5-Jul-2014::21:36:58 ===
closing AMQP connection <0.494.0> (127.0.0.1:33018 -> 127.0.0.1:5672)
Hey,
I tried out the library and I can't get basic_publish_confirm
to return anything but None.
I looked at the source code and it at least indicates that it should be returning something.
For example basic_publish_confirm
has ret = self._basic_publish(*args, **kwargs)
, but _basic_publish
does not actually return anything.
Traceback (most recent call last):
File "/home/jenkins/workspace/gate-smaug-python34/smaug/tests/base.py", line 92, in setUp
self.messaging_conf = messaging_conffixture.ConfFixture(CONF)
File "/home/jenkins/workspace/gate-smaug-python34/.tox/py34/lib/python3.4/site-packages/oslo_messaging/conffixture.py", line 50, in init
'oslo_messaging_rabbit')
File "/home/jenkins/workspace/gate-smaug-python34/.tox/py34/lib/python3.4/site-packages/oslo_messaging/conffixture.py", line 25, in _import_opts
import(module)
File "/home/jenkins/workspace/gate-smaug-python34/.tox/py34/lib/python3.4/site-packages/oslo_messaging/_drivers/impl_rabbit.py", line 30, in
import kombu.connection
File "/home/jenkins/workspace/gate-smaug-python34/.tox/py34/lib/python3.4/site-packages/kombu/connection.py", line 19, in
from kombu import exceptions
File "/home/jenkins/workspace/gate-smaug-python34/.tox/py34/lib/python3.4/site-packages/kombu/exceptions.py", line 12, in
from amqp import ChannelError, ConnectionError, ResourceError
File "/home/jenkins/workspace/gate-smaug-python34/.tox/py34/lib/python3.4/site-packages/amqp/init.py", line 39, in
from .basic_message import Message # noqa
File "/home/jenkins/workspace/gate-smaug-python34/.tox/py34/lib/python3.4/site-packages/amqp/basic_message.py", line 29, in
from .serialization import GenericContent
File "/home/jenkins/workspace/gate-smaug-python34/.tox/py34/lib/python3.4/site-packages/amqp/serialization.py", line 34, in
from .exceptions import FrameSyntaxError
File "/home/jenkins/workspace/gate-smaug-python34/.tox/py34/lib/python3.4/site-packages/amqp/exceptions.py", line 21, in
from .five import python_2_unicode_compatible
File "/home/jenkins/workspace/gate-smaug-python34/.tox/py34/lib/python3.4/site-packages/amqp/five.py", line 14, in
import vine.five
ImportError: No module named 'vine'
I've been waiting for heartbeat support to make it into celery/kombu/amqplib, so I decided to take it for a spin with the newest amqp.
When I don't specify 'BROKER_HEARTBEAT', everything looks good, and celery is able to connect and process messages just fine.
When I specify any value for 'BROKER_HEARTBEAT', I get this error:
2012-08-23 00:11:23,436 cronus.int.sfo01.mml WARNING stdout: celery@cronus has started.
2012-08-23 00:11:23,440 cronus.int.sfo01.mml DEBUG amqp: Start from server, version: 0.9, properties: {u'information': u'Licensed under the MPL. See http://www.rabbitmq.com/', u'product': u'RabbitMQ', u'copyright': u'Copyright (C) 2007-2011 VMware, Inc.', u'capabilities': {u'exchange_exchange_bindings': True, u'consumer_cancel_notify': True, u'publisher_confirms': True, u'basic.nack': True}, u'platform': u'Erlang/OTP', u'version': u'2.7.1'}, mechanisms: [u'PLAIN', u'AMQPLAIN'], locales: [u'en_US']
2012-08-23 00:11:23,441 cronus.int.sfo01.mml ERROR celery.worker: Unrecoverable error: ValueError('Octet out of range 0..65535',)
Traceback (most recent call last):
File "/home/andrew/.env/tapp-deps/local/lib/python2.7/site-packages/celery/worker/__init__.py", line 353, in start
component.start()
File "/home/andrew/.env/tapp-deps/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 368, in start
self.reset_connection()
File "/home/andrew/.env/tapp-deps/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 703, in reset_connection
self.connection = self._open_connection()
File "/home/andrew/.env/tapp-deps/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 766, in _open_connection
callback=self.maybe_shutdown)
File "/home/andrew/.env/tapp-deps/local/lib/python2.7/site-packages/kombu/connection.py", line 266, in ensure_connection
interval_start, interval_step, interval_max, callback)
File "/home/andrew/.env/tapp-deps/local/lib/python2.7/site-packages/kombu/utils/__init__.py", line 159, in retry_over_time
return fun(*args, **kwargs)
File "/home/andrew/.env/tapp-deps/local/lib/python2.7/site-packages/kombu/connection.py", line 156, in connect
return self.connection
File "/home/andrew/.env/tapp-deps/local/lib/python2.7/site-packages/kombu/connection.py", line 610, in connection
self._connection = self._establish_connection()
File "/home/andrew/.env/tapp-deps/local/lib/python2.7/site-packages/kombu/connection.py", line 569, in _establish_connection
conn = self.transport.establish_connection()
File "/home/andrew/.env/tapp-deps/local/lib/python2.7/site-packages/kombu/transport/pyamqp.py", line 102, in establish_connection
heartbeat=conninfo.heartbeat)
File "/home/andrew/.env/tapp-deps/local/lib/python2.7/site-packages/amqp/connection.py", line 157, in __init__
(10, 30), # tune
File "/home/andrew/.env/tapp-deps/local/lib/python2.7/site-packages/amqp/abstract_channel.py", line 70, in wait
return self.dispatch_method(method_sig, args, content)
File "/home/andrew/.env/tapp-deps/local/lib/python2.7/site-packages/amqp/abstract_channel.py", line 87, in dispatch_method
return amqp_method(self, args)
File "/home/andrew/.env/tapp-deps/local/lib/python2.7/site-packages/amqp/connection.py", line 808, in _tune
self._x_tune_ok(self.channel_max, self.frame_max, self.heartbeat)
File "/home/andrew/.env/tapp-deps/local/lib/python2.7/site-packages/amqp/connection.py", line 890, in _x_tune_ok
args.write_short(heartbeat or 0)
File "/home/andrew/.env/tapp-deps/local/lib/python2.7/site-packages/amqp/serialization.py", line 231, in write_short
raise ValueError('Octet out of range 0..65535')
ValueError: Octet out of range 0..65535
Any idea?
NOTE: I am using development versions of celery, kombu, py-amqp
I'm following the steps on the documentation to setup SSL for my celery worker (I have a working connection with RabbitMQ broker following these steps using SSL connection: http://www.rabbitmq.com/troubleshooting-ssl.html#logs). I have very minimal configuration, and I am running into this error when I try to fire up my worker:
struct.error: required argument is not an integer
at
https://github.com/celery/py-amqp/blob/master/amqp/transport.py#L122
Using pdb
, I found error is coming from the interval
variable:
> /usr/local/poapi_worker/venv/src/py-amqp/amqp/transport.py(122)__init__()
-> struct.pack('ll', interval, 0),
(Pdb) print(interval)
{'keyfile': '/etc/ssl/key.pem', 'certfile': '/etc/ssl/cert.pem', 'ca_certs': '/etc/ssl/cacert.pem', 'cert_reqs': 2}
(Pdb) print(write_timeout)
None
(Pdb) print(read_timeout)
{'keyfile': '/etc/ssl/key.pem', 'certfile': '/etc/ssl/cert.pem', 'ca_certs': '/etc/ssl/cacert.pem', 'cert_reqs': 2}
It looks like the entire BROKER_USE_SSL
from celeryconfig.py
is being assigned to read_timeout
. I'm not sure how/why that is happening, the documentation is pretty clear in how that dictionary should look: http://docs.celeryproject.org/en/latest/configuration.html#broker-use-ssl. Is this a known error in latest py-amqp
?
Fails with
[2013-02-11 10:10:57,014: ERROR/MainProcess] Unrecoverable error: ValueError('Unknown table item type: 65',)
According to http://www.rabbitmq.com/amqp-0-9-1-errata.html
65 => 'A' is array
Here is the message returned by curl
$ curl -v -XPOST -H "Content-type: application/json" -d '{"count":1,"requeue":true,"encoding":"auto"}' 'http://guest:guest@localhost:15672/api/queues/vhost/deadletterqueue/get'
[{"payload_bytes":165,"redelivered":true,"exchange":"deadletter","routing_key":"celery","message_count":0,"properties":{"delivery_mode":1,"headers":{"x-death":[{"reason":"rejected","queue":"celery","time":1360606069,"exchange":"","routing-keys":["celery"]}]}},"payload":"{\"id\": \"4cc7438e-afd4-4f8f-a2f3-f46567e7ca77\",\r\n \"task\": \"celery.task.PingTask\",\r\n \"args\": [],\r\n \"kwargs\": {},\r\n \"retries\": 0,\r\n \"eta\": \"2009-11-17T12:30:56.527191\"}","payload_encoding":"string"}]
It'd be nice to be able to put the channel in a mode where it will block (with a timeout) until a message is confirmed. TX makes the broker die in a fire at even modest message rates, but publisher confirms seems to work fine in benchmarks.
While I'was looking at Changelog I've found a typo on the release-date of version 1.1.0, shows :release-date: 2012-11-08 10:36 P.M UTC
but I guess that should be in year 2013 (after 1.0.13
release).
https://github.com/celery/py-amqp/blob/master/Changelog#L235
I'm running ubuntu linux 13.04.
In a little test app in python3 I have an error with such traceback:
https://gist.github.com/mkutsevol/5555372
While fixing the breakage from upgrading to the latest version of celery, I encountered this error. Since method_sig is a tuple, on line 84 of abstract_channel.py,
raise AMQPError('Unknown AMQP method %r' % method_sig)
will attempt to treat method_sig as the arguments for the string formatting operator, which leads to this exception:
TypeError: not all arguments converted during string formatting
The easy fix for me was to change line 84 to:
raise AMQPError('Unknown AMQP method %r' % str(method_sig))
I think _basic_ack_recv wants to use the callbacks in self.events and not the non-existant self.handlers attribute. This pseudo code makes things go hmmm when you send a message through it..
channel.events['basic_ack'].add(lambda *foo: None)
channel.confirm_select()
channel.drain_events()
Hi, we have got this error
ImportError: cannot import name spec
we use librabbitmq==1.6.1
, kombu==3.0.33
Traceback here:
``
File "/srv/ves/arch.note/.venv/lib/python2.7/site-packages/kombu/init.py", line 67, in getattr
module = import(object_origins[name], None, None, [name])
File "/srv/ves/arch.note/.venv/lib/python2.7/site-packages/kombu/connection.py", line 19, in
from kombu import exceptions
File "/srv/ves/arch.note/.venv/lib/python2.7/site-packages/kombu/exceptions.py", line 12, in
from amqp import ChannelError, ConnectionError, ResourceError
File "/srv/ves/arch.note/.venv/lib/python2.7/site-packages/amqp/init.py", line 39, in
from .basic_message import Message # noqa
File "/srv/ves/arch.note/.venv/lib/python2.7/site-packages/amqp/basic_message.py", line 19, in
from . import spec
ImportError: cannot import name spec
``
The docs say "This document is for py-amqp's development version, which can be significantly different from previous releases. Get the stable docs here: 1.4."
This can now be removed?
If a publisher manages to publish a corrupted, non-UTF8 decodable message into Rabbit, the Celery worker will fail to start. I propose that it should instead leave the message unacknowledged, log the error, and continue working on other messages.
Here's an example stack trace when this happens:
[2015-10-17 23:34:57,419: ERROR/MainProcess] Unrecoverable error: UnicodeDecodeError('utf8', 'qr=-1&qf=[date:between:2015-09-01,2015-09-Traceback (most recent call last):
File "venv/local/lib/python2.7/site-packages/celery/worker/__init__.py", line 206, in start
self.blueprint.start(self)
File "venv/local/lib/python2.7/site-packages/celery/bootsteps.py", line 123, in start
step.start(parent)
File "venv/local/lib/python2.7/site-packages/celery/bootsteps.py", line 374, in start
return self.obj.start()
File "venv/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 278, in start
blueprint.start(self)
File "venv/local/lib/python2.7/site-packages/celery/bootsteps.py", line 123, in start
step.start(parent)
File "venv/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 821, in start
c.loop(*c.loop_args())
File "venv/local/lib/python2.7/site-packages/celery/worker/loops.py", line 70, in asynloop
next(loop)
File "venv/local/lib/python2.7/site-packages/kombu/async/hub.py", line 272, in create_loop
item()
File "venv/local/lib/python2.7/site-packages/amqp/utils.py", line 42, in __call__
self.set_error_state(exc)
File "venv/local/lib/python2.7/site-packages/amqp/utils.py", line 39, in __call__
**dict(self.kwargs, **kwargs) if self.kwargs else kwargs
File "venv/local/lib/python2.7/site-packages/kombu/transport/base.py", line 144, in _read
drain_events(timeout=0)
File "venv/local/lib/python2.7/site-packages/amqp/connection.py", line 302, in drain_events
chanmap, None, timeout=timeout,
File "venv/local/lib/python2.7/site-packages/amqp/connection.py", line 365, in _wait_multiple
channel, method_sig, args, content = read_timeout(timeout)
File "venv/local/lib/python2.7/site-packages/amqp/connection.py", line 336, in read_timeout
return self.method_reader.read_method()
File "venv/local/lib/python2.7/site-packages/amqp/method_framing.py", line 186, in read_method
self._next_method()
File "venv/local/lib/python2.7/site-packages/amqp/method_framing.py", line 126, in _next_method
self._process_content_header(channel, payload)
File "venv/local/lib/python2.7/site-packages/amqp/method_framing.py", line 154, in _process_content_header
partial.add_header(payload)
File "venv/local/lib/python2.7/site-packages/amqp/method_framing.py", line 54, in add_header
self.msg._load_properties(payload[12:])
File "venv/local/lib/python2.7/site-packages/amqp/serialization.py", line 476, in _load_properties
d[key] = getattr(r, 'read_' + proptype)()
File "venv/local/lib/python2.7/site-packages/amqp/serialization.py", line 141, in read_table
val = table_data.read_item()
File "venv/local/lib/python2.7/site-packages/amqp/serialization.py", line 191, in read_item
val = self.read_table() # recurse
File "venv/local/lib/python2.7/site-packages/amqp/serialization.py", line 141, in read_table
val = table_data.read_item()
File "venv/local/lib/python2.7/site-packages/amqp/serialization.py", line 150, in read_item
val = self.read_longstr()
File "venv/local/lib/python2.7/site-packages/amqp/serialization.py", line 131, in read_longstr
return self.input.read(slen).decode('utf-8')
File "venv/lib/python2.7/encodings/utf_8.py", line 16, in decode
return codecs.utf_8_decode(input, errors, True)
UnicodeDecodeError: 'utf8' codec can't decode byte 0x96 in position 80: invalid start byte
Versions:
Celery: 3.1.16
Kombu: 3.0.23
Using pyamqp://
Hello !
It seems that between rc4 and 3.16, something changed in py-amqp or kombu that prevents SSL connection to an AMQP server to work.
Here is a stack-trace :
[2013-12-16 11:53:26,308: WARNING/MainProcess] celery@XXX ready. [2013-12-16 11:53:30,311: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection... Traceback (most recent call last): File "/home/XX/.virtualenvs/XX/lib/python3.3/site-packages/celery/worker/consumer.py", line 270, in start blueprint.start(self) File "/home/XX/.virtualenvs/XX/lib/python3.3/site-packages/celery/bootsteps.py", line 123, in start step.start(parent) File "/home/XX/.virtualenvs/XX/lib/python3.3/site-packages/celery/worker/consumer.py", line 790, in start c.loop(*c.loop_args()) File "/home/XX/.virtualenvs/XX/lib/python3.3/site-packages/celery/worker/loops.py", line 72, in asynloop next(loop) File "/home/XX/.virtualenvs/XX/lib/python3.3/site-packages/kombu/async/hub.py", line 333, in create_loop cb(*cbargs) File "/home/XX/.virtualenvs/XX/lib/python3.3/site-packages/kombu/transport/base.py", line 156, in on_readable reader(loop) File "/home/XX/.virtualenvs/XX/lib/python3.3/site-packages/kombu/transport/base.py", line 139, in _read raise ConnectionError('Socket was disconnected') amqp.exceptions.ConnectionError: Socket was disconnected [2013-12-16 11:53:30,333: ERROR/MainProcess] Unrecoverable error: ValueError('I/O operation on closed epoll fd',) Traceback (most recent call last): File "/home/XX/.virtualenvs/XX/lib/python3.3/site-packages/celery/worker/__init__.py", line 212, in start self.blueprint.start(self) File "/home/XX/.virtualenvs/XX/lib/python3.3/site-packages/celery/bootsteps.py", line 123, in start step.start(parent) File "/home/XX/.virtualenvs/XX/lib/python3.3/site-packages/celery/bootsteps.py", line 373, in start return self.obj.start() File "/home/XX/.virtualenvs/XX/lib/python3.3/site-packages/celery/worker/consumer.py", line 270, in start blueprint.start(self) File "/home/XX/.virtualenvs/XX/lib/python3.3/site-packages/celery/bootsteps.py", line 123, in start step.start(parent) File "/home/XX/.virtualenvs/XX/lib/python3.3/site-packages/celery/worker/consumer.py", line 467, in start c.connection = c.connect() File "/home/XX/.virtualenvs/XX/lib/python3.3/site-packages/celery/worker/consumer.py", line 369, in connect conn.transport.register_with_event_loop(conn.connection, self.hub) File "/home/XX/.virtualenvs/XX/lib/python3.3/site-packages/kombu/transport/pyamqp.py", line 124, in register_with_event_loop loop.add_reader(connection.sock, self.on_readable, connection, loop) File "/home/XX/.virtualenvs/XX/lib/python3.3/site-packages/kombu/async/hub.py", line 214, in add_reader return self.add(fds, callback, READ | ERR, args) File "/home/XX/.virtualenvs/XX/lib/python3.3/site-packages/kombu/async/hub.py", line 165, in add self.poller.register(fd, flags) File "/home/XX/.virtualenvs/XX/lib/python3.3/site-packages/kombu/utils/eventio.py", line 78, in register self._epoll.register(fd, events) ValueError: I/O operation on closed epoll fd
i don't have time to look into it right now but I'll do some digging in the week!
The AMQP spec specifies short, shortstr, short, short
as the Channel.Close
method signature, so the below patch should be applied.
This error may have been masked previously because shortstr
was allowed to contain any binary data. I’m working on cleaning up the bytes/utf-8 behaviour and caught this as part of that.
diff --git a/amqp/channel.py b/amqp/channel.py
index 7d45fe4..dd7ff14 100644
--- a/amqp/channel.py
+++ b/amqp/channel.py
@@ -61,7 +61,7 @@ class Channel(AbstractChannel):
"""
_METHODS = set([
- spec.method(spec.Channel.Close, 'ssBB'),
+ spec.method(spec.Channel.Close, 'BsBB'),
spec.method(spec.Channel.CloseOk),
spec.method(spec.Channel.Flow, 'b'),
spec.method(spec.Channel.FlowOk, 'b'),
After applying this patch, another error surfaced: _on_close
in amqp/channel.py
calls self._do_revive()
which fails because ValueError: generator already executing
(in this case, the channel was closed while we were waiting for the response to an exchange declare)
I am seeing AttributeError
exceptions being raised in my logs and I believe the culprit is amqp/transport._AbstractTransport.__del__
.
As stated in this SO post (http://stackoverflow.com/questions/18058730/python-attributeerror-on-del), the __del__
function may have been called after modules have started to be unloaded and set to None
. In the __del__
function for _AbstractTransport
, you have to check the socket
module before grabbing a property from it. Below is a proposed fix to the __del__
method:
def __del__(self):
try:
self.close()
except e:
if socket and isinstance(e, socket.error):
pass
else:
raise
finally:
self.sock = None
Similarly, you need to check socket
in the close method. Here is a proposed fix:
def close(self):
if socket and self.sock is not None:
self._shutdown_transport()
# Call shutdown first to make sure that pending messages
# reach the AMQP broker if the program exits after
# calling this method.
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
self.sock = None
self.connected = False
"amqp/method_framing.py", line 161, in frame_writer
3, channel, framelen, body, 0xce)
struct.error: argument for 's' must be a bytes object
I'm using ``celery.control´´ for remote control of other workers.
I'm using
celery 3.0.23
kombu 2.5.14
amqp 1.0.13
[librabbitmq 1.0.1]
celery.control.ping()
[2013-09-16 15:40:32,984: WARNING/MainProcess] [{u'localhost': u'pong'}]
Now, when I shutdown RabbitMQ, remote control obviously doesn't work anymore:
celery.control.ping()
Traceback (most recent call last):
File "", line 1, in
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/celery/app/control.py", line 145, in ping
timeout=timeout, **kwargs)
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/celery/app/control.py", line 269, in broadcast
limit, callback, channel=channel,
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/kombu/pidbox.py", line 268, in _broadcast
timeout=timeout)
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/kombu/pidbox.py", line 237, in _publish
maybe_declare(self.reply_queue(channel))
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/kombu/common.py", line 90, in maybe_declare
return _maybe_declare(entity)
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/kombu/common.py", line 99, in _maybe_declare
entity.declare()
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/kombu/entity.py", line 492, in declare
self.exchange.declare(nowait)
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/kombu/entity.py", line 162, in declare
nowait=nowait, passive=passive,
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/amqp/channel.py", line 603, in exchange_declare
(40, 11), # Channel.exchange_declare_ok
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/amqp/abstract_channel.py", line 69, in wait
self.channel_id, allowed_methods)
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/amqp/connection.py", line 204, in _wait_method
self.method_reader.read_method()
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/amqp/method_framing.py", line 192, in read_method
raise m
IOError: Socket closed
Unfortunately, when I restart the RMQ broker now, the celery remote instance doesn't recover anymore:
celery.control.ping()
Traceback (most recent call last):
File "", line 1, in
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/celery/app/control.py", line 145, in ping
timeout=timeout, *_kwargs)
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/celery/app/control.py", line 269, in broadcast
limit, callback, channel=channel,
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/kombu/pidbox.py", line 268, in _broadcast
timeout=timeout)
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/kombu/pidbox.py", line 237, in _publish
maybe_declare(self.reply_queue(channel))
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/kombu/common.py", line 90, in maybe_declare
return _maybe_declare(entity)
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/kombu/common.py", line 99, in _maybe_declare
entity.declare()
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/kombu/entity.py", line 492, in declare
self.exchange.declare(nowait)
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/kombu/entity.py", line 162, in declare
nowait=nowait, passive=passive,
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/amqp/channel.py", line 595, in exchange_declare
self._send_method((40, 10), args)
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/amqp/abstract_channel.py", line 58, in _send_method
self.channel_id, method_sig, args, content,
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/amqp/method_framing.py", line 224, in write_method
write_frame(1, channel, payload)
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/amqp/transport.py", line 160, in write_frame
pack('>BHI%dsB' % size, frame_type, channel, size, payload, 0xce),
File "/usr/local/lib/python2.7/socket.py", line 224, in meth
return getattr(self._sock,name)(_args)
socket.error: [Errno 32] Broken pipe
This does NOT happen if I don't use the pure-python AMQP transport but with an installed librabbitmq (and amqp:// broker URL setup):
celery.control.ping()
Traceback (most recent call last):
File "", line 1, in
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/celery/app/control.py", line 145, in ping
timeout=timeout, **kwargs)
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/celery/app/control.py", line 269, in broadcast
limit, callback, channel=channel,
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/kombu/pidbox.py", line 268, in _broadcast
timeout=timeout)
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/kombu/pidbox.py", line 237, in _publish
maybe_declare(self.reply_queue(channel))
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/kombu/common.py", line 90, in maybe_declare
return _maybe_declare(entity)
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/kombu/common.py", line 99, in _maybe_declare
entity.declare()
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/kombu/entity.py", line 492, in declare
self.exchange.declare(nowait)
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/kombu/entity.py", line 162, in declare
nowait=nowait, passive=passive,
File "/home/ag/.virtualenvs/celery/lib/python2.7/site-packages/librabbitmq/init.py", line 119, in exchange_declare
exchange, type, passive, durable, auto_delete, arguments or {})
_librabbitmq.ConnectionError: exchange.declare: connection closed unexpectedly
After RMQ restart:
celery.control.ping()
[2013-09-16 16:17:22,875: WARNING/MainProcess] [{u'localhost': u'pong'}]
About 60 seconds after starting a channel.basic_consume followed by a channel.wait(), the connection closes with a Socket Closed error, as here:
Traceback (most recent call last):
File "./rmqtools.py", line 437, in run_self_test_loop
qh.channel[handle].wait(None)
File "/usr/local/lib/python3.4/site-packages/amqp/abstract_channel.py", line 91, in wait
self.connection.drain_events(timeout=timeout)
File "/usr/local/lib/python3.4/site-packages/amqp/connection.py", line 436, in drain_events
return self.blocking_read(timeout)
File "/usr/local/lib/python3.4/site-packages/amqp/connection.py", line 440, in blocking_read
frame = self.transport.read_frame()
File "/usr/local/lib/python3.4/site-packages/amqp/transport.py", line 221, in read_frame
frame_header = read(7, True)
File "/usr/local/lib/python3.4/site-packages/amqp/transport.py", line 369, in _read
raise IOError('Socket closed')
OSError: Socket closed
On the server, the message logged is:
=INFO REPORT==== 12-Jul-2016::19:09:02 ===
accepting AMQP connection <0.9396.19> (172....1:19638 -> 172....5:5672)
=WARNING REPORT==== 12-Jul-2016::19:10:01 ===
closing AMQP connection <0.9396.19> (172...1:19638 -> 172....5:5672):
client unexpectedly closed TCP connection
Here's something that may be relevant: https://www.rabbitmq.com/heartbeats.html. However, the connection shows a negotiated heartbeat (Connection.heartbeat) of 0, and, when changing it to, say, 5 during connection creation, the same above error happens after 3x the chosen heartbeat (e.g. 15 sec.).
Relatedly, the channel.wait() method is no longer documented in the web docs. Is there a different method that should be called after registering a callback with basic_callback?
A colleague reports this on the 1.4.6 client too.
Thanks!
We are seeing a bunch of these exceptions popping up when reading from a kombu SimpleQueue
, under Python 3.4, with amqp 1.4.6 and kombu 3.0.26:
Traceback (most recent call last):
[trimmed]
File "/opt/virtualenvs/allmylespy/lib/python3.4/site-packages/allmylespy_common/rpc.py", line 156, in _get_reply
reply = self._our_queue.get(block=True, timeout=1)
File "/opt/virtualenvs/allmylespy/lib/python3.4/site-packages/kombu/simple.py", line 53, in get
timeout=timeout and remaining)
File "/opt/virtualenvs/allmylespy/lib/python3.4/site-packages/kombu/connection.py", line 275, in drain_events
return self.transport.drain_events(self.connection, **kwargs)
File "/opt/virtualenvs/allmylespy/lib/python3.4/site-packages/kombu/transport/pyamqp.py", line 95, in drain_events
return connection.drain_events(**kwargs)
File "/opt/virtualenvs/allmylespy/lib/python3.4/site-packages/amqp/connection.py", line 302, in drain_events
chanmap, None, timeout=timeout,
File "/opt/virtualenvs/allmylespy/lib/python3.4/site-packages/amqp/connection.py", line 365, in _wait_multiple
channel, method_sig, args, content = read_timeout(timeout)
File "/opt/virtualenvs/allmylespy/lib/python3.4/site-packages/amqp/connection.py", line 336, in read_timeout
return self.method_reader.read_method()
File "/opt/virtualenvs/allmylespy/lib/python3.4/site-packages/amqp/method_framing.py", line 189, in read_method
raise m
File "/opt/virtualenvs/allmylespy/lib/python3.4/site-packages/amqp/method_framing.py", line 107, in _next_method
frame_type, channel, payload = read_frame()
File "/opt/virtualenvs/allmylespy/lib/python3.4/site-packages/amqp/transport.py", line 175, in read_frame
'Received 0x{0:02x} while expecting 0xce'.format(ch))
amqp.exceptions.UnexpectedFrame: Received 0x00 while expecting 0xce
I'd say between 0.01% and 0.5% of messages are affected. The error message varies, a few seconds earlier we had Received 0x5e
instead. I see that 1.4.6 doesn't list Python 3.4 in its classifiers. Is it possibly incompatible? Are there any alternatives for Python 3?
It would be nice to keep PyPI releases and git tags in sync :)
Hi folks-
I'm tasked with going through the licensing on dependencies on a project, and I've noticed that the licensing trove classifiers indicate both BSD and LGPL, but the LICENSE file only contains LGPL. Can this be clarified (either by removing the line from the setup.py or modifying/extending the LICENSE file)?
On Py3.4, it is necessary to apply the following patch in order to write frames or deserialize data:
diff --git a/amqp/method_framing.py b/amqp/method_framing.py
index b9ffe91..64e9cd4 100644
--- a/amqp/method_framing.py
+++ b/amqp/method_framing.py
@@ -116,7 +116,7 @@ def frame_writer(connection, transport,
if no_pybuf or bigbody:
# ## SLOW: string copy and write for every frame
- frame = (''.join([pack('>HH', *method_sig), args])
+ frame = (b''.join([pack('>HH', *method_sig), args])
if type_ == 1 else '') # encode method frame
framelen = len(frame)
write(pack('>BHI%dsB' % framelen,
diff --git a/amqp/serialization.py b/amqp/serialization.py
index d74515f..81abf3d 100644
--- a/amqp/serialization.py
+++ b/amqp/serialization.py
@@ -52,7 +52,7 @@ ILLEGAL_TABLE_TYPE = """\
def _read_item(buf, offset=0, unpack_from=unpack_from):
- ftype = buf[offset]
+ ftype = chr(buf[offset])
offset += 1
# 'S': long string
In method_framing.py, ''.join()
cannot be applied on a list of bytes, and in serialization.py, buf[offset]
is an int
but is compared to str
a few lines below.
Note this patch will probably break Py2 so should not be applied verbatim, it's just to demonstrate the issue.
Line 377 in a8f334a
The # ## FIXME timezone ?
says it all.
Currently datetimes are assumed to be in system local time as that is how mktime
behaves.
This is rather confusing behaviour when you're passing around UTC times and find that all your times are offset.
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.