Comments (13)

jakeczyz commented on June 4, 2024

Okay, I have more information on this in case it helps someone in the future. This is knowledge gained (and likely bugs discovered) after about 15 man-hours of beating my head against this. The problem seems to be caused, at least in part, by the Amazon EC2 ELB idle timeout: the load balancer drops any connection (resetting TCP connections included) that goes too long without traffic. On AWS/EC2 ELBs the value can be set between 1 second and 1 hour. That is a low limit if the code that uses RabbitMQ registers a callback and then wants to wait()/drain_events() in perpetuity (my use case).

From what I found digging around, at least two things should each solve this: (1) TCP keepalives or (2) the RabbitMQ heartbeat feature.

It seems to me there are two bugs conspiring against these solutions. First, unfortunately, even after enabling TCP keepalives on both sides (the client and each instance behind the LB) and observing them in action via netstat and tcpdump, the Amazon ELB always drops the connection after the timeout passes (yielding the OSError Socket Closed on the client, as in the original post). I should probably reach out to AWS about this.
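
For readers who want to try the keepalive route, a minimal sketch of turning on TCP keepalives for a client socket follows. It uses only the standard `socket` module; the helper name `enable_keepalive` and the timing values are illustrative assumptions, and the `TCP_KEEP*` tuning options are platform-specific, so each is set only if the platform exposes it. As reported above, keepalive probes alone did not stop the ELB from dropping the connection in this case.

```python
import socket


def enable_keepalive(sock, idle=30, interval=10, count=3):
    """Turn on TCP keepalives for sock (hypothetical helper, illustrative values)."""
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    # Tuning knobs are not available on every platform; set only those that exist.
    for name, value in (
        ("TCP_KEEPIDLE", idle),       # seconds of idleness before the first probe
        ("TCP_KEEPINTVL", interval),  # seconds between probes
        ("TCP_KEEPCNT", count),       # failed probes before the connection is dropped
    ):
        option = getattr(socket, name, None)
        if option is not None:
            sock.setsockopt(socket.IPPROTO_TCP, option, value)
    return sock


sock = enable_keepalive(socket.socket(socket.AF_INET, socket.SOCK_STREAM))
keepalive_on = sock.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE) != 0
sock.close()
```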

Second, although it's annoying, from my understanding, the only way to use the heartbeat feature with the basic_consume-->drain_events[blocking indefinitely] approach is to have a second thread/process that will send the heartbeats (if there's a better approach, please let me know). I tried this, and hit upon what seems like another bug. When using the connection.heartbeat_tick() method call, and printing the connection.last_heartbeat_received and last_heartbeat_sent times, the server only sends one heartbeat and the connection eventually gets closed due to missed heartbeats. Here's an example with heartbeat=5 and where I call the heartbeat_tick method every 2 seconds and print conn.last_heartbeat_{received,sent}:

[2016-07-15T01:58:00.097830] DEBUG: negotiated conn.heartbeat: 5
[2016-07-15T01:58:02.100436] DEBUG: heartbeat_tick 1; last rec:41866; last sent:41866
[2016-07-15T01:58:04.102891] DEBUG: heartbeat_tick 2; last rec:41866; last sent:41866
[2016-07-15T01:58:06.105276] DEBUG: heartbeat_tick 3; last rec:41866; last sent:41866
[2016-07-15T01:58:08.107885] DEBUG: heartbeat_tick 4; last rec:41866; last sent:41872
[2016-07-15T01:58:10.110449] DEBUG: heartbeat_tick 5; last rec:41866; last sent:41874
Process Process-13:
Traceback (most recent call last):
  ...
  File "/usr/local/lib/python3.4/site-packages/amqp/connection.py", line 663, in heartbeat_tick
    raise ConnectionForced('Too many heartbeats missed')
amqp.exceptions.ConnectionForced: Too many heartbeats missed

As you can see, the server only sends one heartbeat at time 41866 and never again. I tried several different timings with higher heartbeat values with similar results.
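
The timing in this log is consistent with a liveness rule that gives up once two heartbeat intervals pass with nothing received: `last rec` is set at the first tick and never advances, and with heartbeat=5 the exception arrives about 10 seconds later. The sketch below is a simplified model of such a rule, not py-amqp's actual code; the function name, the `missed_allowed` factor of two, and the sixth tick time (that call raised before it could be logged) are assumptions for illustration.

```python
def heartbeat_expired(last_received, now, heartbeat, missed_allowed=2):
    """Return True once `missed_allowed` heartbeat intervals pass with nothing received."""
    return now > last_received + missed_allowed * heartbeat


# Seconds elapsed since the first tick, rounded from the log timestamps above.
# The sixth value is assumed: it continues the roughly 2-second spacing.
tick_offsets = [0.0, 2.002, 4.005, 6.007, 8.010, 10.012]
expired = [heartbeat_expired(0.0, t, heartbeat=5) for t in tick_offsets]
```

Under this model only the sixth tick trips the check, which would match an exception being raised right after the fifth logged tick.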

The final thing that worked for me was to use connection.send_heartbeat() instead of heartbeat_tick(). If I call that method (in a subprocess) every couple seconds (with or without heartbeats actually turned on for the connection!), the ticks from the client are enough to keep my connection alive from the amqp heartbeats perspective and also keep the ELB client idle timeout at bay. Thanks for reading this far. :)

from py-amqp.

odedfos commented on June 4, 2024

I've encountered the same problem with celery (3.1.25) when the broker has heartbeats enabled and the worker is idle (empty queue). RabbitMQ eventually closes the connection because the connection is idle and heartbeats aren't sent.
Is there any plan to fix this in celery/kombu/py-amqp?

ask commented on June 4, 2024

I'll be looking into this, but it's now better to use connection.drain_events(timeout) instead of channel wait. This allows you to consume events from multiple channels at a time, something that was missing from the original amqplib.
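
A minimal sketch of that pattern: drain events in short slices and tick the heartbeat between slices, all in one thread. The helper name `consume_forever` and its `should_stop` parameter are hypothetical; `connection` is assumed to expose `drain_events(timeout=...)` and `heartbeat_tick()`, with `drain_events` raising `socket.timeout` when nothing arrives in time.

```python
import socket


def consume_forever(connection, poll_interval=1.0, should_stop=lambda: False):
    """Drain events in short slices, ticking heartbeats between slices.

    `connection` is assumed to provide drain_events(timeout=...) and
    heartbeat_tick(); `should_stop` is a hypothetical shutdown hook.
    """
    while not should_stop():
        try:
            connection.drain_events(timeout=poll_interval)
        except socket.timeout:
            pass  # nothing arrived during this slice; not an error
        # Give the connection a chance to send a heartbeat and check for missed ones.
        connection.heartbeat_tick()
```

Note that a callback which blocks for longer than the heartbeat interval still stalls this loop, so no heartbeats go out while it runs.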

ask commented on June 4, 2024

Btw, I'm not seeing this here, even with channel.wait(None). It would be very helpful if you could write a short snippet that reproduces the issue! I guess you should try using connection.drain_events first though, as I don't think there is much use in having a separate channel.wait().

jakeczyz commented on June 4, 2024

Thanks. Sure, here's a snippet of how to reproduce the error.

https://gist.github.com/jakeczyz/78ba1d5021b688a544b7cc8a22fefb65

It happens with both method calls. Note that it doesn't happen with an older RabbitMQ server. One other minor note: the new server is actually a cluster, but regular gets and publishing (and other cluster behavior) are fine.

Any advice or pointers would be highly appreciated.

jakeczyz commented on June 4, 2024

After more investigation, I believe this problem was due to client TCP timeout in the load balancer used for my RabbitMQ cluster. I'll post more details after I can confirm my fix, but wanted to let you know right away so as not to waste your time.

KiranKVasu commented on June 4, 2024

python : 3.7
OS: windows10
service: rabbitmq (version 3.7.14)

I'm facing this error:

OSError: Server unexpectedly closed connection
Removing descriptor: 920

Also, this queue is no longer available after the error message:

[email protected] 0

Could anyone suggest a fix or workaround to get rid of this error?

Greenev commented on June 4, 2024

I had these warnings when using celery 4.3.0 with --pool=eventlet and RabbitMQ 3.6.5.
Adding connection.heartbeat_check() in celery/celery/worker/pidbox.py resolved the issue for me (see at stackoverflow).
Could someone check whether this is a correct solution to the problem?

2019-08-12 14:22:48,564: INFO worker_dummy@dummy-celery ready.
2019-08-12 14:22:50,077: INFO Events of group {task} enabled by remote.
2019-08-12 14:25:48,572: WARNING Traceback (most recent call last):
2019-08-12 14:25:48,572: WARNING File "/home/celery/venv/lib/python3.6/site-packages/eventlet/hubs/poll.py", line 111, in wait
    listener.cb(fileno)
2019-08-12 14:25:48,572: WARNING File "/home/celery/venv/lib/python3.6/site-packages/celery/worker/pidbox.py", line 120, in loop
    connection.drain_events(timeout=1.0)
2019-08-12 14:25:48,572: WARNING File "/home/celery/venv/lib/python3.6/site-packages/kombu/connection.py", line 315, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
2019-08-12 14:25:48,573: WARNING File "/home/celery/venv/lib/python3.6/site-packages/kombu/transport/pyamqp.py", line 103, in drain_events
    return connection.drain_events(**kwargs)
2019-08-12 14:25:48,573: WARNING File "/home/celery/venv/lib/python3.6/site-packages/amqp/connection.py", line 500, in drain_events
    while not self.blocking_read(timeout):
2019-08-12 14:25:48,573: WARNING File "/home/celery/venv/lib/python3.6/site-packages/amqp/connection.py", line 505, in blocking_read
    frame = self.transport.read_frame()
2019-08-12 14:25:48,573: WARNING File "/home/celery/venv/lib/python3.6/site-packages/amqp/transport.py", line 252, in read_frame
    frame_header = read(7, True)
2019-08-12 14:25:48,574: WARNING File "/home/celery/venv/lib/python3.6/site-packages/amqp/transport.py", line 444, in _read
    raise IOError('Server unexpectedly closed connection')
2019-08-12 14:25:48,574: WARNING OSError: Server unexpectedly closed connection
2019-08-12 14:25:48,574: WARNING Removing descriptor: 9

tvallois commented on June 4, 2024

Hello,

I don't know if we should keep writing in this issue, but our team is in almost the same position as the one described above.

import logging
from socket import timeout
from time import sleep
from kombu import Consumer, Queue, Connection


def callback(body, message):
    logging.debug("before")
    # Simulates long-running work; no heartbeats are sent while this blocks.
    sleep(300)
    logging.debug("after")
    message.ack()

if __name__ == "__main__":
    is_running = True
    queue = Queue("my_queue", routing_key="my_routing_key", no_declare=True)
    with Connection("amqps://blabla", heartbeat=10) as conn:
        with conn.channel() as channel:
            consumer = Consumer(channel, queue)
            consumer.register_callback(callback)
            with consumer:
                while is_running:
                    try:
                        conn.drain_events(timeout=1)
                    except timeout:
                        # Nothing arrived within 1 second; service heartbeats.
                        conn.heartbeat_check()

I use this code to reproduce the error in an AWS environment (with an AWS Classic Load Balancer). In this case I'm unable to ack the message in the callback method because the socket connection with RabbitMQ has been dropped by the ELB.

I tried to use another process to manage heartbeat like this:

import logging
from multiprocessing import Process
from time import sleep
from kombu import Consumer, Queue, Connection 

logging.basicConfig(level=logging.DEBUG)

def callback(body, message):
    logging.debug("before")
    sleep(300)
    logging.debug("after")
    message.ack()

def manage_heartbeat(conn: Connection):
    # Runs in a child process, which operates on its own copy of conn.
    while True:
        sleep(1)
        conn.heartbeat_check()

if __name__ == "__main__":
    is_running = True
    queue = Queue("my_queue_name", routing_key="my_routing_key", no_declare=True)
    with Connection("blabla", heartbeat=10) as conn:
        heartbeat_process = Process(target=manage_heartbeat, args=(conn,))
        heartbeat_process.start()
        try:
            with conn.channel() as channel:
                consumer = Consumer(channel, queue)
                consumer.register_callback(callback)
                with consumer:
                    while is_running:
                        conn.drain_events()
        finally:
            # close() raises ValueError while the child is still running,
            # so stop it first.
            heartbeat_process.terminate()
            heartbeat_process.join()

In this way, an exception is raised:

amqp.exceptions.ConnectionForced: Too many heartbeats missed

Is there a way to send heartbeats in a different process with kombu/py-amqp?

auvipy commented on June 4, 2024

Since celery uses billiard, I think you should not use multiprocessing. Also, what is your full setup, and are you using the newest versions?

tvallois commented on June 4, 2024

I'm not using celery. Only Kombu.
Kombu version 5.0.2, amqp 5.0.2
With a solution like this, it works (the AWS ELB does not consider the connection idle), but it would be nice to have this integrated directly in kombu or amqp:

import logging
import threading
from socket import timeout
from time import sleep
from kombu import Consumer, Queue, Connection 

logging.basicConfig(level=logging.DEBUG)

def callback(body, message):
    logging.debug("before")
    sleep(60)
    logging.debug("after")
    message.ack()

def handle_heartbeat(conn, event):
    # Send a heartbeat frame every second until asked to stop.
    while not event.is_set():
        print("ping")
        sleep(1)
        if conn.connection:
            conn.connection.send_heartbeat()

if __name__ == "__main__":
    event = threading.Event()
    queue = Queue("my_queue_name", routing_key="my_routing_key", no_declare=True)
    with Connection("amqps://blabla", heartbeat=10) as conn:
        t = threading.Thread(target=handle_heartbeat, args=(conn, event))
        t.start()
        try:
            with conn.channel() as channel:
                consumer = Consumer(channel, queue)
                consumer.register_callback(callback)
                with consumer:
                    while not event.is_set():
                        try:
                            conn.drain_events(timeout=1)
                        except timeout:
                            conn.heartbeat_check()
        except KeyboardInterrupt:
            pass
        finally:
            # Always stop the heartbeat thread; otherwise join() blocks forever.
            event.set()
            t.join()

tvallois commented on June 4, 2024

Nope, it does not work. I have the same stacktrace as this issue in Celery (celery/celery#3773).

auvipy commented on June 4, 2024

Recent related discussion: celery/celery#6528 (comment)
