Comments (5)
There's no way to do that so message handlers must return within the heartbeat interval
from py-amqp.
I have it working under Python 3 using asyncio (though it should be translatable to Py2/3 using regular threads). The snippet is below.
Essentially, there are three 'co-routines' that run asynchronously in a loop:
- A loop to check the socket for new frames and build them up as they arrive into messages. Essentially, it calls connection.read_timeout() repeatedly. As fully-formed messages arrive, they're enqueued in local memory under connection.channels[channel_id_from_message].method_queue .
- A loop to drain_events from connection.channels[channel_id_from_message].method_queue (right now, acting from the outside, it needs to check that at least one method_queue is populated otherwise it'll read the socket and conflict with the previous loop though this could be done a little more elegantly from inside).
- A loop to check and send heart beats
The main problem I have with it so far is that I need to use dedicated connection instances for receiving and sending (maybe not a bad idea anyway and again maybe not necessary with some adjustments to the package internals rather than from the outside). Otherwise, it seems to generally work.
def maybe_drain_with_timeout(self, conn, timeout):
ret = None
for channel_id, channel in conn.connection.channels.items():
if len(channel.method_queue) > 0:
ret = conn.drain_events(timeout=timeout)
return ret
@asyncio.coroutine
def drain_events(self, conn, timeout, ignore_timeouts):
while conn.connected and not self._amqp_closing:
# Yoink: https://github.com/celery/kombu/blob/master/kombu/common.py#L146-L185
try:
yield from self.loop.run_in_executor(None,\
self.maybe_drain_with_timeout, conn, timeout)
except socket.timeout:
if timeout and not ignore_timeouts: # pragma: no cover
raise
except socket.error: # pragma: no cover
pass
def readWithTimeout(self, conn, timeout):
return conn.connection.read_timeout(timeout)
# Continue to read frames from the socket while processing dense messages so that we receive heartbeats from the peer. Requeue everything (including queue system messages) on the local method queue so it can get picked up in drain_events.
@asyncio.coroutine
def read_frames(self, conn, timeout, ignore_timeouts):
while conn.connected and not self._amqp_closing:
try:
channel, method_sig, args, content = yield from\
self.loop.run_in_executor(None,\
self.readWithTimeout, conn, timeout)
conn.connection.channels[channel].method_queue.append(\
(method_sig, args, content),)
except socket.timeout:
if timeout and not ignore_timeouts:
raise
except socket.error:
self.err_out("socket error")
pass
# Keep the connection alive while processing dense messages
@asyncio.coroutine
def heartbeat_check(self, conn):
while conn.connected and not self._amqp_closing:
yield from self.loop.run_in_executor(None, conn.heartbeat_check)
# self.out("Ran heartbeat check")
yield from asyncio.sleep(self._heartbeat_interval)
def run(self):
try:
self.out("Consuming at {0}, on {1}".format(self._amqp_url,
self.AMQP_QUEUE))
self.loop = asyncio.get_event_loop()
with Connection(self._amqp_url,\
heartbeat=self._heartbeat_interval) as conn:
self.on_before_consume()
conn.ensure_connection()
self._amqp_connection = conn
channel = conn.channel()
channel.basic_qos(0,1,False)
queue = AmqpQueue(name=self.AMQP_QUEUE, channel=channel)
queue.declare()
with channel.Consumer(queue, callbacks=[self.on_amqp_message])\
as consumer:
consumer.qos(0, prefetch_count=1)
self.loop.set_default_executor(ThreadPoolExecutor(4))
asyncio.async(self.read_frames(conn, 30, True))
asyncio.async(self.drain_events(conn, 30, True))
asyncio.async(self.heartbeat_check(conn))
self.loop.run_forever()
except KeyboardInterrupt:
self._amqp_closing = True
loop.stop()
self.out("Shutting down")
from py-amqp.
Right, using Async I/O would be the solution, we cannot use threads for this.
I'm already working on async support so this is forthcoming, including eventual support for tulip/asyncio
from py-amqp.
Btw, it does not really help for heartbeat frame processing/sending since the 'message processing' callbacks are still the problem. I.e. if you have a callback anywhere that blocks without yielding control back to the event loop. This is gradually happening in the celery worker, the only actions blocking at this point (above the amqp client layer) is message acknowledgments and sending result messages when a task process crashes (WorkerLostError, or task is revoked/terminated). I'm hoping to solve these for celery 3.2/3.3 and the improvements will be in kombu+pyamqp.
from py-amqp.
My work is in the celery/py-amqp@callbacks branch, it's not finished and is cheating in a number of places but it's one hell of a medal-deserving refactoring job ;)
from py-amqp.
Related Issues (20)
- TCP_USER_TIMEOUT doesn't exist on solaris 11.4 HOT 2
- [BUG] Cannot set cert_reqs=ssl.CERT_NONE due to order of context modification in _wrap_socker_sni HOT 1
- Potentially broken _AbstractTransport.__repr__ HOT 4
- 5.0.6: sphinx warnings HOT 6
- 5.0.6: test suite is using `case` which uses `nose` (which is outdated) HOT 2
- Initial Update
- 5.0.7 SSL connection problem HOT 4
- Celery broken after release 5.0.8 HOT 6
- 5.1.1: pytest warnings HOT 2
- 'Server unexpectedly closed connection' HOT 6
- Waht is stable version? HOT 1
- amqp.exceptions.NotFound: Queue.declare: (404) NOT_FOUND
- pyamqp won't send heartbeat?
- 5.1.1: documentation build fails with sphinx 6.1.3 HOT 3
- Maximum recursion depth exceeded
- How does pyamqp/kombu make multiple threads share the same amqp connection? HOT 5
- bug: version 5.2.0 breaks celery with sqs msg processing HOT 3
- Client which use amqp lib always think that connection is blocked.
- Basic.publish: (406) PRECONDITION_FAILED - message size larger than configured max HOT 1
- create Virtual Hosts
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from py-amqp.