Comments (12)

miguelgrinberg commented on May 22, 2024

Include the full stack trace of the error please, and make sure you have monkey patched the standard library.

from python-socketio.

NotSureAbout commented on May 22, 2024
Traceback (most recent call last):
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/eventlet/wsgi.py", line 454, in handle_one_response
    result = self.application(self.environ, start_response)
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/engineio/middleware.py", line 47, in __call__
    return self.engineio_app.handle_request(environ, start_response)
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/socketio/server.py", line 303, in handle_request
    return self.eio.handle_request(environ, start_response)
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/engineio/server.py", line 241, in handle_request
    socket.handle_post_request(environ)
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/engineio/socket.py", line 92, in handle_post_request
    self.receive(pkt)
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/engineio/socket.py", line 45, in receive
    self.server._trigger_event('message', self.sid, pkt.data)
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/engineio/server.py", line 327, in _trigger_event
    return self.handlers[event](*args)
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/socketio/server.py", line 420, in _handle_eio_message
    self._handle_connect(sid, pkt.namespace)
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/socketio/server.py", line 334, in _handle_connect
    self.environ[sid]) is False:
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/socketio/server.py", line 392, in _trigger_event
    return self.handlers[namespace][event](*args)
  File "labcoreapp.py", line 68, in test_connect
    namespace='/test')
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/socketio/server.py", line 181, in emit
    self.manager.emit(event, data, namespace, room, skip_sid, callback)
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/socketio/pubsub_manager.py", line 62, in emit
    'skip_sid': skip_sid, 'callback': callback})
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/socketio/kombu_manager.py", line 53, in _publish
    with self.kombu.SimpleQueue(self.queue) as queue:
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/kombu/connection.py", line 677, in SimpleQueue
    exchange_opts, **kwargs)
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/kombu/simple.py", line 125, in __init__
    compression=compression)
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/kombu/messaging.py", line 85, in __init__
    self.revive(self._channel)
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/kombu/messaging.py", line 222, in revive
    self.declare()
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/kombu/messaging.py", line 105, in declare
    self.exchange.declare()
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/kombu/entity.py", line 174, in declare
    nowait=nowait, passive=passive,
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/amqp/channel.py", line 622, in exchange_declare
    (40, 11),  # Channel.exchange_declare_ok
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/amqp/abstract_channel.py", line 67, in wait
    self.channel_id, allowed_methods, timeout)
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/amqp/connection.py", line 241, in _wait_method
    channel, method_sig, args, content = read_timeout(timeout)
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/amqp/connection.py", line 330, in read_timeout
    return self.method_reader.read_method()
  File "/home/oliver/.virtualenvs/labcore3-api/local/lib/python2.7/site-packages/amqp/method_framing.py", line 189, in read_method
    raise m
RuntimeError: Second simultaneous read on fileno 4 detected.  Unless you really know what you're doing, make sure that only one greenthread can read any particular socket.  Consider using a pools.Pool. If you do know what you're doing and want to disable this error, call eventlet.debug.hub_prevent_multiple_readers(False) - MY THREAD=<built-in method switch of greenlet.greenlet object at 0x7febb1ec8370>; THAT THREAD=FdListener('read', 4, <built-in method switch of greenlet.greenlet object at 0x7febb1ec8050>, <built-in method throw of greenlet.greenlet object at 0x7febb1ec8050>)

I did the monkey_patch dance.

import socketio
from flask import Flask, render_template
import eventlet
eventlet.monkey_patch()

miguelgrinberg commented on May 22, 2024

Do you know how many active clients you had when this happened?

NotSureAbout commented on May 22, 2024

I opened 1 browser window.

The problem might come from the changes to _listen. If I make the following changes:

    def _listen(self):
        with kombu.Connection('amqp://guest:guest@localhost:5672//') as conn:
            with conn.SimpleQueue(self.queue) as queue:
                while True:
                    message = queue.get(block=True)
                    message.ack()
                    yield message.payload

The problem goes away.

miguelgrinberg commented on May 22, 2024

Okay, so your solution is basically to read from and write to the queue using different connections. I can't find any references in the documentation regarding the thread safety of a kombu connection, and none of the examples seem to have the producer and the consumer working concurrently in the same process.

Thanks for your analysis, I'll probably go about this with your solution.

miguelgrinberg commented on May 22, 2024

@NotSureAbout Unfortunately your fix does not work for me. The error goes away, but messages that the server sends do not make it to the client because the _listen function never wakes up to deliver them.

I'll continue investigating. I haven't really found any similar working examples; pretty much all the Kombu examples I have found have the producer and the consumer in different processes, so I'm not sure how well tested the multithreaded approach I'm using is.

miguelgrinberg commented on May 22, 2024

@NotSureAbout Can I ask you to retest with the current master branch to confirm the problem is addressed for you?

NotSureAbout commented on May 22, 2024

Works for me.

phillip-martin commented on May 22, 2024

I am using the latest release of the great flask-socketio library, along with eventlet. I came across an issue in my code that I can't quite figure out, but it seems that it may be related to this issue. I have the following code:

eventlet.monkey_patch()
app = Flask("test_app")
socketio = SocketIO(app, message_queue='amqp://')

@socketio.on('content update', namespace='/test')
def content_update(message):
    pass

In a separate process (a celery task; celery is run with eventlet enabled) I call:

socketio.emit('content update', {'data': 'foo'}, namespace='/test')

This task gets called quite frequently, so it is not surprising that multiple threads end up calling the above line simultaneously. When that happens I end up with the following stack trace:

Traceback (most recent call last):
  File "proj/venv/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File "proj/venv/lib/python2.7/site-packages/celery/app/trace.py", line 438, in __protected_call__
    return self.run(*args, **kwargs)
  File "proj/tasks.py", line 109, in submit_new_content
    socketio.emit('content update', {'data': 'foo'}, namespace='/test')
  File "proj/venv/lib/python2.7/site-packages/flask_socketio/__init__.py", line 285, in emit
    callback=kwargs.get('callback'))
  File "proj/venv/lib/python2.7/site-packages/socketio/server.py", line 181, in emit
    self.manager.emit(event, data, namespace, room, skip_sid, callback)
  File "proj/venv/lib/python2.7/site-packages/socketio/pubsub_manager.py", line 62, in emit
    'skip_sid': skip_sid, 'callback': callback})
  File "proj/venv/lib/python2.7/site-packages/socketio/kombu_manager.py", line 57, in _publish
    with self.writer_conn.SimpleQueue(self.writer_queue) as queue:
  File "proj/venv/lib/python2.7/site-packages/kombu/connection.py", line 678, in SimpleQueue
    exchange_opts, **kwargs)
  File "proj/venv/lib/python2.7/site-packages/kombu/simple.py", line 126, in __init__
    consumer = messaging.Consumer(channel, queue)
  File "proj/venv/lib/python2.7/site-packages/kombu/messaging.py", line 364, in __init__
    self.revive(self.channel)
  File "proj/venv/lib/python2.7/site-packages/kombu/messaging.py", line 376, in revive
    self.declare()
  File "proj/venv/lib/python2.7/site-packages/kombu/messaging.py", line 386, in declare
    queue.declare()
  File "proj/venv/lib/python2.7/site-packages/kombu/entity.py", line 521, in declare
    self.exchange.declare(nowait)
  File "proj/venv/lib/python2.7/site-packages/kombu/entity.py", line 174, in declare
    nowait=nowait, passive=passive,
  File "proj/venv/lib/python2.7/site-packages/amqp/channel.py", line 622, in exchange_declare
    (40, 11),  # Channel.exchange_declare_ok
  File "proj/venv/lib/python2.7/site-packages/amqp/abstract_channel.py", line 67, in wait
    self.channel_id, allowed_methods, timeout)
  File "proj/venv/lib/python2.7/site-packages/amqp/connection.py", line 241, in _wait_method
    channel, method_sig, args, content = read_timeout(timeout)
  File "proj/venv/lib/python2.7/site-packages/amqp/connection.py", line 330, in read_timeout
    return self.method_reader.read_method()
  File "proj/venv/lib/python2.7/site-packages/amqp/method_framing.py", line 189, in read_method
    raise m
RuntimeError: Second simultaneous read on fileno 18 detected.  Unless you really know what you're doing, make sure that only one greenthread can read any particular socket.  Consider using a pools.Pool. If you do know what you're doing and want to disable this error, call eventlet.debug.hub_prevent_multiple_readers(False) - MY THREAD=<built-in method switch of greenlet.greenlet object at 0x10998d0f0>; THAT THREAD=FdListener('read', 18, <built-in method switch of greenlet.greenlet object at 0x10998d190>, <built-in method throw of greenlet.greenlet object at 0x10998d190>)

This error message seems fairly similar to the one listed by @NotSureAbout. I am very interested to know if you have any insight as to what I might be doing wrong. Thanks in advance.

phillip-martin commented on May 22, 2024

As an update, I have tried two things.

  1. I protected the socketio.emit method with an eventlet Semaphore. This made the error go away, but made the behavior sequential.
  2. I made the following changes in kombu_manager.py, emulating how the kombu connection is made in the _listen method. This worked completely as desired.
    def _publish(self, data):
        writer_conn = kombu.Connection(self.url)
        writer_queue = self._queue(writer_conn)
        with writer_conn.SimpleQueue(writer_queue) as queue:
            queue.put(pickle.dumps(data))
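
A minimal sketch of the two approaches above, using only the standard library. It is not kombu or python-socketio code: `FakeConnection`, `emit_serialized`, `emit_with_fresh_connection` and `run_concurrently` are hypothetical names, and it uses OS threads with `threading.Lock` where the setup described here would use green threads and an eventlet Semaphore. It only illustrates the trade-off: the lock keeps one caller at a time on the shared connection, while a fresh connection per call shares nothing.

```python
import threading
import time


class FakeConnection:
    """Hypothetical stand-in for a broker connection; records concurrent use."""

    def __init__(self):
        self._guard = threading.Lock()  # protects the counters below only
        self.active = 0
        self.max_active = 0
        self.sent = []

    def publish(self, payload):
        with self._guard:
            self.active += 1
            self.max_active = max(self.max_active, self.active)
        time.sleep(0.01)  # simulate waiting for the broker's reply
        with self._guard:
            self.sent.append(payload)
            self.active -= 1


shared_conn = FakeConnection()
emit_lock = threading.Lock()


def emit_serialized(payload):
    # Approach 1: one shared connection, one caller at a time.
    with emit_lock:
        shared_conn.publish(payload)


def emit_with_fresh_connection(payload):
    # Approach 2: a new connection per call, nothing shared between callers.
    conn = FakeConnection()
    conn.publish(payload)
    return conn


def run_concurrently(fn, count):
    # Call fn(0), fn(1), ... from separate threads and wait for all of them.
    threads = [threading.Thread(target=fn, args=(i,)) for i in range(count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
```

Calling `run_concurrently(emit_serialized, 5)` never lets more than one caller into `shared_conn.publish` at once, which matches the sequential behavior noted for the first approach.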

miguelgrinberg commented on May 22, 2024

@pmarti28 the error is the same, but your problem is different, I think.

In your case, there actually is a legit simultaneous access of a handle by two or more threads, because you are emitting with the same socketio object from multiple green threads, and this object keeps in its state the file handle on which the writes are made.

Your solution removes the stored file handle, and creates a new one each time a write is made. Clearly this addresses the problem, but I'm concerned about the performance cost of constantly having to open new handles.

As a short-term solution I think your change is fine, but I'm not sure how I feel about penalizing the majority of users who do not have a setup like yours.

An alternative solution that I have used is to not run Celery under eventlet. In my opinion there is no reason to switch Celery to eventlet; in general, Celery works better with a pool of process workers of a fixed size. Running Celery without eventlet will address this problem, because each worker process will have a separate instance of socketio. If you try this, in addition to disabling eventlet support in the Celery config, make sure you create the socketio instance with async_mode='threading', which effectively disables eventlet in the socketio subsystem.

As a long-term solution for this problem, I think something that works similarly to context variables in Flask would make sense. That would allow a different file handle to be kept for each thread, all done transparently for you.
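
A rough sketch of the per-thread handle idea, assuming the standard library's `threading.local` as the storage. This is not how python-socketio implements anything; `open_connection` and `get_connection` are hypothetical names, and the "connection" is a plain placeholder object. Whether this maps cleanly onto green threads depends on how the standard library has been monkey patched, which is not verified here.

```python
import threading

# Each thread sees its own independent attributes on this object.
_local = threading.local()


def open_connection():
    """Hypothetical factory; a real one would open a broker connection."""
    return object()


def get_connection():
    """Return the calling thread's connection, creating it on first use."""
    conn = getattr(_local, "conn", None)
    if conn is None:
        conn = open_connection()
        _local.conn = conn
    return conn
```

Repeated calls to `get_connection()` from one thread reuse the same handle, so the cost of opening it is paid once per thread rather than once per write, while two threads never share a handle.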

phillip-martin commented on May 22, 2024

@miguelgrinberg thanks for the thorough explanation. I will look into switching Celery away from eventlet.

As for the long-term solution, I will come back here if I think of anything that might be useful.
