Coder Social home page Coder Social logo

Comments (5)

ask avatar ask commented on May 23, 2024

There's no way to do that so message handlers must return within the heartbeat interval

from py-amqp.

w3iBStime avatar w3iBStime commented on May 23, 2024

I have it working under Python 3 using asyncio (though it should be translatable to Py2/3 using regular threads). The snippet is below.

Essentially, there are three 'co-routines' that run asynchronously in a loop:

  1. A loop to check the socket for new frames and build them up as they arrive into messages. Essentially, it calls connection.read_timeout() repeatedly. As fully-formed messages arrive, they're enqueued in local memory under connection.channels[channel_id_from_message].method_queue .
  2. A loop to drain_events from connection.channels[channel_id_from_message].method_queue (right now, acting from the outside, it needs to check that at least one method_queue is populated otherwise it'll read the socket and conflict with the previous loop though this could be done a little more elegantly from inside).
  3. A loop to check and send heart beats

The main problem I have with it so far is that I need to use dedicated connection instances for receiving and sending (maybe not a bad idea anyway and again maybe not necessary with some adjustments to the package internals rather than from the outside). Otherwise, it seems to generally work.

    def maybe_drain_with_timeout(self, conn, timeout):
        ret = None
        for channel_id, channel in conn.connection.channels.items():
            if len(channel.method_queue) > 0:
                ret = conn.drain_events(timeout=timeout)
        return ret

    @asyncio.coroutine
    def drain_events(self, conn, timeout, ignore_timeouts):
        while conn.connected and not self._amqp_closing:
            # Yoink: https://github.com/celery/kombu/blob/master/kombu/common.py#L146-L185
            try:
                yield from self.loop.run_in_executor(None,\
                    self.maybe_drain_with_timeout, conn, timeout)
            except socket.timeout:
                if timeout and not ignore_timeouts: # pragma: no cover
                    raise
            except socket.error: # pragma: no cover
                pass

    def readWithTimeout(self, conn, timeout):
        return conn.connection.read_timeout(timeout)

    # Continue to read frames from the socket while processing dense messages so that we receive heartbeats from the peer. Requeue everything (including queue system messages) on the local method queue so it can get picked up in drain_events.
    @asyncio.coroutine
    def read_frames(self, conn, timeout, ignore_timeouts):
        while conn.connected and not self._amqp_closing:
            try:
                channel, method_sig, args, content = yield from\
                    self.loop.run_in_executor(None,\
                    self.readWithTimeout, conn, timeout)

                conn.connection.channels[channel].method_queue.append(\
                    (method_sig, args, content),)
            except socket.timeout:
                if timeout and not ignore_timeouts:
                    raise
            except socket.error:
                self.err_out("socket error")
                pass

    # Keep the connection alive while processing dense messages
    @asyncio.coroutine
    def heartbeat_check(self, conn):
        while conn.connected and not self._amqp_closing:
            yield from self.loop.run_in_executor(None, conn.heartbeat_check)
            # self.out("Ran heartbeat check")
            yield from asyncio.sleep(self._heartbeat_interval)

    def run(self):
        try:
            self.out("Consuming at {0}, on {1}".format(self._amqp_url,
                self.AMQP_QUEUE))
            self.loop = asyncio.get_event_loop()
            with Connection(self._amqp_url,\
                    heartbeat=self._heartbeat_interval) as conn:
                self.on_before_consume()
                conn.ensure_connection()
                self._amqp_connection = conn
                channel = conn.channel()
                channel.basic_qos(0,1,False)
                queue = AmqpQueue(name=self.AMQP_QUEUE, channel=channel)
                queue.declare()
                with channel.Consumer(queue, callbacks=[self.on_amqp_message])\
                        as consumer:
                    consumer.qos(0, prefetch_count=1)
                    self.loop.set_default_executor(ThreadPoolExecutor(4))
                    asyncio.async(self.read_frames(conn, 30, True))
                    asyncio.async(self.drain_events(conn, 30, True))
                    asyncio.async(self.heartbeat_check(conn))
                    self.loop.run_forever()
        except KeyboardInterrupt:
            self._amqp_closing = True
            loop.stop()
            self.out("Shutting down")

from py-amqp.

ask avatar ask commented on May 23, 2024

Right, using Async I/O would be the solution, we cannot use threads for this.

I'm already working on async support so this is forthcoming, including eventual support for tulip/asyncio

from py-amqp.

ask avatar ask commented on May 23, 2024

Btw, it does not really help for heartbeat frame processing/sending since the 'message processing' callbacks are still the problem. I.e. if you have a callback anywhere that blocks without yielding control back to the event loop. This is gradually happening in the celery worker, the only actions blocking at this point (above the amqp client layer) is message acknowledgments and sending result messages when a task process crashes (WorkerLostError, or task is revoked/terminated). I'm hoping to solve these for celery 3.2/3.3 and the improvements will be in kombu+pyamqp.

from py-amqp.

ask avatar ask commented on May 23, 2024

My work is in the celery/py-amqp@callbacks branch, it's not finished and is cheating in a number of places but it's one hell of a medal-deserving refactoring job ;)

from py-amqp.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.