Comments (13)
Okay, I have more information on this in case it helps someone in the future. This is knowledge gained (and likely bugs discovered) after about 15 man-hours of beating my head against this. The problem seems to be (partly) caused by the Amazon EC2 ELB idle timeout: the load balancer drops any connection (resetting TCP connections included) that goes too long without any traffic. On AWS/EC2 ELBs the value can be set between 1 second and 1 hour. Even that maximum is too low if the code that uses RabbitMQ registers a callback and then wants to wait()/drain_events() in perpetuity (my use case).
From digging around, it seems that either of two things should solve this: (1) TCP keepalives or (2) the RabbitMQ heartbeat feature.
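For anyone wanting to try option (1) on the client side, here is a minimal sketch of turning on TCP keepalives for a plain Python socket. The helper name `enable_tcp_keepalive` and the idle/interval/count values are made up for illustration, and the tuning options are platform-specific, so the sketch skips any the local socket module does not expose. It only shows how the probes are enabled, not whether a given load balancer counts them as traffic.

```python
import socket


def enable_tcp_keepalive(sock, idle=60, interval=10, count=5):
    """Turn on TCP keepalive probes for an already-created socket.

    idle/interval/count are illustrative values, not recommendations.
    """
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    # The tuning knobs below are platform-specific (present on Linux);
    # skip any that this platform's socket module does not expose.
    for name, value in (
        ("TCP_KEEPIDLE", idle),       # seconds of idleness before probing
        ("TCP_KEEPINTVL", interval),  # seconds between probes
        ("TCP_KEEPCNT", count),       # failed probes before giving up
    ):
        opt = getattr(socket, name, None)
        if opt is not None:
            sock.setsockopt(socket.IPPROTO_TCP, opt, value)
    return sock


if __name__ == "__main__":
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    enable_tcp_keepalive(s, idle=30)
    print("keepalive on:", bool(s.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE)))
    s.close()
```

The idle value would need to be shorter than the load balancer's idle timeout for the probes to have any chance of mattering.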
It seems to me there are two bugs conspiring against these solutions. First, unfortunately, even after enabling TCP keepalives on both sides (the client and each instance behind the LB) and observing them in action via netstat and tcpdump, the Amazon ELB always drops the connection after the timeout passes (yielding the OSError Socket Closed on the client as in the original post). I should probably reach out to AWS about this.
Second, although it's annoying, from my understanding, the only way to use the heartbeat feature with the basic_consume-->drain_events[blocking indefinitely] approach is to have a second thread/process that will send the heartbeats (if there's a better approach, please let me know). I tried this, and hit upon what seems like another bug. When using the connection.heartbeat_tick() method call, and printing the connection.last_heartbeat_received and last_heartbeat_sent times, the server only sends one heartbeat and the connection eventually gets closed due to missed heartbeats. Here's an example with heartbeat=5 and where I call the heartbeat_tick method every 2 seconds and print conn.last_heartbeat_{received,sent}:
[2016-07-15T01:58:00.097830] DEBUG: negotiated conn.heartbeat: 5
[2016-07-15T01:58:02.100436] DEBUG: heartbeat_tick 1; last rec:41866; last sent:41866
[2016-07-15T01:58:04.102891] DEBUG: heartbeat_tick 2; last rec:41866; last sent:41866
[2016-07-15T01:58:06.105276] DEBUG: heartbeat_tick 3; last rec:41866; last sent:41866
[2016-07-15T01:58:08.107885] DEBUG: heartbeat_tick 4; last rec:41866; last sent:41872
[2016-07-15T01:58:10.110449] DEBUG: heartbeat_tick 5; last rec:41866; last sent:41874
Process Process-13:
Traceback (most recent call last):
...
File "/usr/local/lib/python3.4/site-packages/amqp/connection.py", line 663, in heartbeat_tick
raise ConnectionForced('Too many heartbeats missed')
amqp.exceptions.ConnectionForced: Too many heartbeats missed
As you can see, the server only sends one heartbeat at time 41866 and never again. I tried several different timings with higher heartbeat values with similar results.
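The timing in that log is consistent with a client-side check of roughly the following shape. This is a simplified model, assuming the client gives up once it has seen no inbound traffic for more than two heartbeat intervals; the function name and signature are hypothetical and are not py-amqp's API.

```python
def heartbeats_missed(last_received, heartbeat, now):
    """Simplified model of a missed-heartbeat check.

    Assumption: the peer is treated as gone once nothing has been
    received for more than two heartbeat intervals. All three values
    are in the same clock units (seconds on a monotonic clock).
    """
    if not heartbeat or not last_received:
        # Heartbeats disabled, or nothing received yet: nothing to check.
        return False
    return last_received + 2 * heartbeat < now


if __name__ == "__main__":
    # Values taken from the log above: last received at 41866, heartbeat=5.
    for now in (41872, 41874, 41877):
        print(now, heartbeats_missed(41866, 5, now))
```

Under that assumption the deadline in the log is 41866 + 2 * 5 = 41876, which falls just after tick 5 (sent at 41874), so the exception showing up right after that tick fits.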
The final thing that worked for me was to use connection.send_heartbeat() instead of heartbeat_tick(). If I call that method (in a subprocess) every couple seconds (with or without heartbeats actually turned on for the connection!), the ticks from the client are enough to keep my connection alive from the amqp heartbeats perspective and also keep the ELB client idle timeout at bay. Thanks for reading this far. :)
from py-amqp.
I've encountered the same problem when the celery (3.1.25) broker connection has heartbeats enabled and the worker is idle (empty queue). RabbitMQ eventually closes the connection because it is idle and no heartbeats are sent.
Is there any plan to fix this in celery/kombu/py-amqp?
from py-amqp.
I'll be looking into this, but it's now better to use connection.drain_events(timeout) instead of channel.wait(). This allows you to consume events from multiple channels at a time, something that was missing from the original amqplib.
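As a rough sketch of that suggestion: drain events in short slices and service heartbeats whenever a slice times out. The function name `consume` and the `should_stop` callable are hypothetical; the only assumptions about the connection object are that it exposes drain_events(timeout=...) raising socket.timeout when nothing arrives, and heartbeat_tick(), as a py-amqp Connection does.

```python
import socket


def consume(connection, should_stop, timeout=1.0):
    """Drain events in short slices so heartbeats can be serviced in between.

    `connection` is assumed to expose drain_events(timeout=...) and
    heartbeat_tick(); `should_stop` is any zero-argument callable that
    returns True when the loop should exit.
    """
    while not should_stop():
        try:
            connection.drain_events(timeout=timeout)
        except socket.timeout:
            # No frame arrived within `timeout` seconds. That is not an
            # error; use the idle moment to send/check heartbeats.
            connection.heartbeat_tick()
```

The slice length should stay well below the negotiated heartbeat interval, otherwise the tick may come too late to keep the connection alive.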
from py-amqp.
Btw, I'm not seeing this here, even with channel.wait(None). It would be very helpful if you could write a short snippet that reproduces the issue! I guess you should try using connection.drain_events first though, as I don't think there is much use in having a separate channel.wait().
from py-amqp.
Thanks. Sure, here's a snippet of how to reproduce the error.
https://gist.github.com/jakeczyz/78ba1d5021b688a544b7cc8a22fefb65
It happens with both method calls. Note that it's not happening with an older RabbitMQ server. One other minor note is that the new server is actually a cluster, but regular gets and publishing (and other cluster behavior) are fine.
Any advice or pointers would be highly appreciated.
from py-amqp.
After more investigation, I believe this problem was due to client TCP timeout in the load balancer used for my RabbitMQ cluster. I'll post more details after I can confirm my fix, but wanted to let you know right away so as not to waste your time.
from py-amqp.
Python: 3.7
OS: Windows 10
Service: RabbitMQ (version 3.7.14)
I'm facing this error:
OSError: Server unexpectedly closed connection
Removing descriptor: 920
The queue is also unavailable after this error message.
Could anyone suggest a fix or workaround to get rid of this error?
from py-amqp.
I had these warnings when using celery 4.3.0 with --pool=eventlet and RabbitMQ 3.6.5.
Adding connection.heartbeat_check() in celery/celery/worker/pidbox.py resolved the issue for me (see Stack Overflow).
Could someone please check whether this is a correct solution to the problem?
2019-08-12 14:22:48,564: INFO worker_dummy@dummy-celery ready.
2019-08-12 14:22:50,077: INFO Events of group {task} enabled by remote.
2019-08-12 14:25:48,572: WARNING Traceback (most recent call last):
2019-08-12 14:25:48,572: WARNING File "/home/celery/venv/lib/python3.6/site-packages/eventlet/hubs/poll.py", line 111, in wait
listener.cb(fileno)
2019-08-12 14:25:48,572: WARNING File "/home/celery/venv/lib/python3.6/site-packages/celery/worker/pidbox.py", line 120, in loop
connection.drain_events(timeout=1.0)
2019-08-12 14:25:48,572: WARNING File "/home/celery/venv/lib/python3.6/site-packages/kombu/connection.py", line 315, in drain_events
return self.transport.drain_events(self.connection, **kwargs)
2019-08-12 14:25:48,573: WARNING File "/home/celery/venv/lib/python3.6/site-packages/kombu/transport/pyamqp.py", line 103, in drain_events
return connection.drain_events(**kwargs)
2019-08-12 14:25:48,573: WARNING File "/home/celery/venv/lib/python3.6/site-packages/amqp/connection.py", line 500, in drain_events
while not self.blocking_read(timeout):
2019-08-12 14:25:48,573: WARNING File "/home/celery/venv/lib/python3.6/site-packages/amqp/connection.py", line 505, in blocking_read
frame = self.transport.read_frame()
2019-08-12 14:25:48,573: WARNING File "/home/celery/venv/lib/python3.6/site-packages/amqp/transport.py", line 252, in read_frame
frame_header = read(7, True)
2019-08-12 14:25:48,574: WARNING File "/home/celery/venv/lib/python3.6/site-packages/amqp/transport.py", line 444, in _read
raise IOError('Server unexpectedly closed connection')
2019-08-12 14:25:48,574: WARNING OSError: Server unexpectedly closed connection
2019-08-12 14:25:48,574: WARNING Removing descriptor: 9
from py-amqp.
Hello,
I don't know if we should continue to write in this issue but our team is almost in the same position as the one described above.
import logging
from socket import timeout
from multiprocessing import Process
from time import sleep

from kombu import Consumer, Queue, Connection


def callback(body, message):
    logging.debug("before")
    sleep(300)
    logging.debug("after")
    message.ack()


if __name__ == "__main__":
    is_running = True
    queue = Queue("my_queue", routing_key="my_routing_key", no_declare=True)
    with Connection("amqps://blabla", heartbeat=10) as conn:
        with conn.channel() as channel:
            consumer = Consumer(channel, queue)
            consumer.register_callback(callback)
            with consumer:
                while is_running:
                    try:
                        conn.drain_events(timeout=1)
                    except timeout:
                        conn.heartbeat_check()
I use this code to reproduce the error in an AWS environment (with an AWS Classic Load Balancer). In this case I'm unable to ack the message in the callback method because the socket connection with RabbitMQ has been dropped by the ELB.
I tried to use another process to manage the heartbeat, like this:
import logging
from amqp.method_framing import frame_handler
from socket import timeout
from multiprocessing import Process
from time import sleep

from kombu import Consumer, Queue, Connection

logging.basicConfig(level=logging.DEBUG)


def callback(body, message):
    logging.debug("before")
    sleep(300)
    logging.debug("after")
    message.ack()


def manage_heartbeat(conn: Connection):
    while True:
        sleep(1)
        conn.heartbeat_check()


if __name__ == "__main__":
    is_running = True
    queue = Queue("my_queue_name", routing_key="my_routing_key", no_declare=True)
    with Connection("blabla", heartbeat=10) as conn:
        try:
            heartbeat_process = Process(target=manage_heartbeat, args=(conn,))
            heartbeat_process.start()
            with conn.channel() as channel:
                consumer = Consumer(channel, queue)
                consumer.register_callback(callback)
                with consumer:
                    while is_running:
                        conn.drain_events()
        finally:
            heartbeat_process.close()
In this way, an exception is raised:
amqp.exceptions.ConnectionForced: Too many heartbeats missed
Is there a way to send heartbeats in a different process with kombu/py-amqp?
from py-amqp.
As celery uses billiard, I think you should not use multiprocessing. Also, what is your full setup, and are you using the newest versions?
from py-amqp.
I'm not using celery. Only Kombu.
Kombu version 5.0.2, amqp 5.0.2
With a solution like this, it works (the AWS ELB does not consider the connection idle), but it would be nice to have this integrated directly in kombu or amqp:
import signal
import logging
import threading
from amqp.method_framing import frame_handler
from socket import timeout
from time import sleep

from kombu import Consumer, Queue, Connection

logging.basicConfig(level=logging.DEBUG)


def callback(body, message):
    logging.debug("before")
    sleep(60)
    logging.debug("after")
    message.ack()


def handle_heartbeat(conn, event):
    while not event.is_set():
        print("ping")
        sleep(1)
        if conn.connection:
            conn.connection.send_heartbeat()


if __name__ == "__main__":
    event = threading.Event()
    queue = Queue("my_queue_name", routing_key="my_routing_key", no_declare=True)
    with Connection("amqps://blabla", heartbeat=10) as conn:
        try:
            t = threading.Thread(target=handle_heartbeat, args=(conn, event))
            t.start()
            with conn.channel() as channel:
                consumer = Consumer(channel, queue)
                consumer.register_callback(callback)
                with consumer:
                    while not event.is_set():
                        try:
                            conn.drain_events(timeout=1)
                        except timeout:
                            conn.heartbeat_check()
        except KeyboardInterrupt:
            event.set()
        finally:
            t.join()
from py-amqp.
Nope, it does not work. I have the same stacktrace as this issue in Celery (celery/celery#3773).
from py-amqp.
recent related discussion celery/celery#6528 (comment)
from py-amqp.