Comments (12)
Include the full stack trace of the error please, and make sure you have monkey patched the standard library.
from python-socketio.
Traceback (most recent call last):
File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/eventlet/wsgi.py", line 454, in handle_one_response
result = self.application(self.environ, start_response)
File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/engineio/middleware.py", line 47, in __call__
return self.engineio_app.handle_request(environ, start_response)
File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/socketio/server.py", line 303, in handle_request
return self.eio.handle_request(environ, start_response)
File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/engineio/server.py", line 241, in handle_request
socket.handle_post_request(environ)
File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/engineio/socket.py", line 92, in handle_post_request
self.receive(pkt)
File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/engineio/socket.py", line 45, in receive
self.server._trigger_event('message', self.sid, pkt.data)
File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/engineio/server.py", line 327, in _trigger_event
return self.handlers[event](*args)
File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/socketio/server.py", line 420, in _handle_eio_message
self._handle_connect(sid, pkt.namespace)
File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/socketio/server.py", line 334, in _handle_connect
self.environ[sid]) is False:
File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/socketio/server.py", line 392, in _trigger_event
return self.handlers[namespace][event](*args)
File "labcoreapp.py", line 68, in test_connect
namespace='/test')
File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/socketio/server.py", line 181, in emit
self.manager.emit(event, data, namespace, room, skip_sid, callback)
File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/socketio/pubsub_manager.py", line 62, in emit
'skip_sid': skip_sid, 'callback': callback})
File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/socketio/kombu_manager.py", line 53, in _publish
with self.kombu.SimpleQueue(self.queue) as queue:
File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/kombu/connection.py", line 677, in SimpleQueue
exchange_opts, **kwargs)
File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/kombu/simple.py", line 125, in __init__
compression=compression)
File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/kombu/messaging.py", line 85, in __init__
self.revive(self._channel)
File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/kombu/messaging.py", line 222, in revive
self.declare()
File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/kombu/messaging.py", line 105, in declare
self.exchange.declare()
File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/kombu/entity.py", line 174, in declare
nowait=nowait, passive=passive,
File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/amqp/channel.py", line 622, in exchange_declare
(40, 11), # Channel.exchange_declare_ok
File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/amqp/abstract_channel.py", line 67, in wait
self.channel_id, allowed_methods, timeout)
File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/amqp/connection.py", line 241, in _wait_method
channel, method_sig, args, content = read_timeout(timeout)
File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/amqp/connection.py", line 330, in read_timeout
return self.method_reader.read_method()
File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/amqp/method_framing.py", line 189, in read_method
raise m
RuntimeError: Second simultaneous read on fileno 4 detected. Unless you really know what you're doing, make sure that only one greenthread can read any particular socket. Consider using a pools.Pool. If you do know what you're doing and want to disable this error, call eventlet.debug.hub_prevent_multiple_readers(False) - MY THREAD=<built-in method switch of greenlet.greenlet object at 0x7febb1ec8370>; THAT THREAD=FdListener('read', 4, <built-in method switch of greenlet.greenlet object at 0x7febb1ec8050>, <built-in method throw of greenlet.greenlet object at 0x7febb1ec8050>)
I did the monkey_patch dance.
import socketio
from flask import Flask, render_template
import eventlet
eventlet.monkey_patch()
``
from python-socketio.
Do you know how many active clients did you have when this happened?
from python-socketio.
I opened 1 browser window.
The problem might come from the changes to _listen. If I make the following changes:
def _listen(self):
with kombu.Connection('amqp://guest:guest@localhost:5672//') as conn:
with conn.SimpleQueue(self.queue) as queue:
while True:
message = queue.get(block=True)
message.ack()
yield message.payload
The problem goes aways.
from python-socketio.
Okay, so your solution is basically to read and write to the queue using different connections. Can't find any references in the documentation regarding the thread safety of a kombu connection, and none of the examples seem to have the producer and the consumer working concurrently on the same process.
Thanks for your analysis, I'll probably go about this with your solution.
from python-socketio.
@NotSureAbout Unfortunately your fix does not work for me. The error goes away, but messages that the server sends do not make it to the client because the _listen
function never wakes up to deliver them.
I'll continue investigating. I haven't really found any similar working examples, pretty much all Kombu examples I have found have the producer and the consumer in different processes, not sure how well tested is the multithreaded approach that I'm using.
from python-socketio.
@NotSureAbout Can I ask you to retest with the current master branch to confirm the problem is addressed for you?
from python-socketio.
Works for me.
from python-socketio.
I am using the latest release of the great flask-socketio library, along with eventlet. I came across an issue in my code that I can't quite figure out but it seems that it may be related to this issue. I have the following code
eventlet.monkey_patch()
app = Flask("test_app")
socketio = SocketIO(app, message_queue='amqp://')
@socketio.on('content update', namespace='/test')
def content_update(message):
pass
In a separate process (a celery task; celery is run with eventlet enabled) I call:
socketio.emit('content update', {'data': 'foo'}, namespace='/test')
This task gets called quite frequently and so it is not surprising that multiple threads ended up calling the above line simultaneously. When that happens I end up with the following stack trace:
Traceback (most recent call last):
File "proj/venv/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
R = retval = fun(*args, **kwargs)
File "proj/venv/lib/python2.7/site-packages/celery/app/trace.py", line 438, in __protected_call__
return self.run(*args, **kwargs)
File "proj/tasks.py", line 109, in submit_new_content
socketio.emit('content update', {'data': 'foo'}, namespace='/test')
File "proj/venv/lib/python2.7/site-packages/flask_socketio/__init__.py", line 285, in emit
callback=kwargs.get('callback'))
File "proj/venv/lib/python2.7/site-packages/socketio/server.py", line 181, in emit
self.manager.emit(event, data, namespace, room, skip_sid, callback)
File "proj/venv/lib/python2.7/site-packages/socketio/pubsub_manager.py", line 62, in emit
'skip_sid': skip_sid, 'callback': callback})
File "proj/venv/lib/python2.7/site-packages/socketio/kombu_manager.py", line 57, in _publish
with self.writer_conn.SimpleQueue(self.writer_queue) as queue:
File "proj/venv/lib/python2.7/site-packages/kombu/connection.py", line 678, in SimpleQueue
exchange_opts, **kwargs)
File "proj/venv/lib/python2.7/site-packages/kombu/simple.py", line 126, in __init__
consumer = messaging.Consumer(channel, queue)
File "proj/venv/lib/python2.7/site-packages/kombu/messaging.py", line 364, in __init__
self.revive(self.channel)
File "proj/venv/lib/python2.7/site-packages/kombu/messaging.py", line 376, in revive
self.declare()
File "proj/venv/lib/python2.7/site-packages/kombu/messaging.py", line 386, in declare
queue.declare()
File "proj/venv/lib/python2.7/site-packages/kombu/entity.py", line 521, in declare
self.exchange.declare(nowait)
File "proj/venv/lib/python2.7/site-packages/kombu/entity.py", line 174, in declare
nowait=nowait, passive=passive,
File "proj/venv/lib/python2.7/site-packages/amqp/channel.py", line 622, in exchange_declare
(40, 11), # Channel.exchange_declare_ok
File "proj/venv/lib/python2.7/site-packages/amqp/abstract_channel.py", line 67, in wait
self.channel_id, allowed_methods, timeout)
File "proj/venv/lib/python2.7/site-packages/amqp/connection.py", line 241, in _wait_method
channel, method_sig, args, content = read_timeout(timeout)
File "proj/venv/lib/python2.7/site-packages/amqp/connection.py", line 330, in read_timeout
return self.method_reader.read_method()
File "proj/venv/lib/python2.7/site-packages/amqp/method_framing.py", line 189, in read_method
raise m
RuntimeError: Second simultaneous read on fileno 18 detected. Unless you really know what you're doing, make sure that only one greenthread can read any particular socket. Consider using a pools.Pool. If you do know what you're doing and want to disable this error, call eventlet.debug.hub_prevent_multiple_readers(False) - MY THREAD=<built-in method switch of greenlet.greenlet object at 0x10998d0f0>; THAT THREAD=FdListener('read', 18, <built-in method switch of greenlet.greenlet object at 0x10998d190>, <built-in method throw of greenlet.greenlet object at 0x10998d190>)
This error message seems fairly similar to the one listed by @NotSureAbout. I am very interested to know if you have any insight as to what I might be doing wrong. Thanks in advance.
from python-socketio.
As an update, I have tried two things.
- I protected the
socketio.emit
method with an eventlet Semaphore. This made the error go away, but made the behavior sequential. - I made the following changes in kombu_manager.py, emulating how the kombu connection is made in the
_listen
method. This worked completely as desired.
def _publish(self, data):
writer_conn = kombu.Connection(self.url)
writer_queue = self._queue(writer_conn)
with writer_conn.SimpleQueue(writer_queue) as queue:
queue.put(pickle.dumps(data))
from python-socketio.
@pmarti28 the error is the same, but your problem is different, I think.
In your case, there actually is a legit simultaneous access of a handle by two or more threads, because you are emitting with the same socketio
object from multiple green threads, and this object keeps in its state the file handle on which the writes are made.
Your solution removes the stored file handle, and creates a new one each time a write is made. Clearly this addresses the problem, but I'm concerned about the performance cost of constantly having to open new handles.
As a short term solution I think your change is fine, but I'm not sure how I feel about penalizing the majority of users who do not have a set up like yours.
An alternative solution that I have used, is to not run celery under eventlet. In my opinion there is no reason to switch Celery to eventlet, in general Celery works better with a pool of process workers of a fixed size. Running Celery w/o eventlet will address this problem, because each worker process will have a separate instance of socketio
. If you try this, in addition to disabling eventlet support in the Celery config, make sure you create the socketio
instance with async_mode='threading'
, which effectively disables eventlet in the socketio subsystem.
A long term solution for this problem is, I think, something that works similarly to context variables in Flask would make sense. That would allow a different file handle to be kept for each thread, all transparently done for you.
from python-socketio.
@miguelgrinberg thanks for the thorough explanation. I will look into switching Celery away from eventlet.
For the long term solution, I will come back here if I think of anything that might be useful.
from python-socketio.
Related Issues (20)
- Clients never receive room emit - Gunicorn+Uvicorn/FastAPI/Redis with preload_app=True, workers>1 HOT 4
- In Python 3.12, AttributeError: 'called_once_with' is not a valid assertion HOT 3
- May need to remove "pip install aioredis" from the documentation
- 5.10.0: pytest is failing HOT 4
- `max_http_buffer_size` value seem ignored by the server HOT 14
- Implementing `__enter__`/`__exit__` on `Client` like `SimpleClient` HOT 2
- Request: type hints (e.g. for mypy) HOT 3
- sio.disconnect() will connect after a connection drop HOT 21
- Update documentation of usage with Gunicorn + gevent/eventlet HOT 5
- Wildcard / catch-all namespace for event handlers HOT 1
- Heroku Problems with Gevent (Request Timeout + Polling Hanging)
- Concurrent broadcasting HOT 2
- All connections stop processing events at the same time
- Session not found when using sio.transport(sid) due to wrong session id HOT 1
- _ping background task not cancelled by async_server.shutdown() HOT 1
- Option to reconnect on initial connection failure
- Value error : Invalid aysnc_mode specified HOT 2
- Python socket ids for client and server dont match. HOT 6
- AsyncServer disconnect event not sent with `aiohttp>=3.9.0` HOT 2
- Test failure with .helpers import AsyncMock HOT 1
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from python-socketio.