Comments (12)
Hi, and thank you for your contribution. I am moving and having no proper internet connection. I'll be back on you in a couple of days.
from stompest.
Hi, I am back. Do you have any code snippets you wish to share and discuss?
from stompest.
Sure. I hacked our code to make async.Stomp.onFrame
and async.Stomp.onMessage
issue synchronous notifications. This was sufficient for our needs, but I'm not sure if it's sufficient as a general solution. If you could advise me of a desirable approach to take to solve this issue, I'd be happy to submit a pull request.
For reference, here is the monkey patch we're currently using:
class Stomp(async.Stomp):
"""A class that overrides stompest's asynchronous dispatches when receiving
frames and messages. The stock implementation wraps notifications in tasks
and has the event loop handle the dispatch. However, when messages arrive
very fast, eventually we end up receiving messages faster than dispatching
the notifications for them, creating a backlog inside the Python process.
This causes memory consumption in the Python process to balloon.
Instead, we dispatch the notifications synchronously for frames and messages,
making sure we don't create StompFrames faster than we can process them.
"""
def _synchronous_notify(self, notify):
"""A synchronous version of the _notify method in async.Stompest.
"""
for listener in self._listeners:
notify(listener)
@defer.inlineCallbacks
def _onFrame(self, frame):
# Only implementation difference from superclass is the use of
# self._synchronous_notify rather than self._notify.
yield self._synchronous_notify(lambda l: l.onFrame(self, frame))
if not frame:
return
try:
handler = self._handlers[frame.command]
except KeyError:
raise StompFrameError('Unknown STOMP command: %s' % repr(frame))
yield handler(frame)
@defer.inlineCallbacks
def _onMessage(self, frame):
# Only implementation difference from superclass is the use of
# self._synchronous_notify rather than self._notify.
headers = frame.headers
messageId = headers[StompSpec.MESSAGE_ID_HEADER]
try:
token = self.session.message(frame)
except:
self.log.error('Ignoring message (no handler found): %s [%s]' % (messageId, frame.info()))
defer.returnValue(None)
context = self.session.subscription(token)
try:
yield self._synchronous_notify(lambda l: l.onMessage(self, frame, context))
except Exception as e:
self.log.error('Disconnecting (error in message handler): %s [%s]' % (messageId, frame.info()))
self.disconnect(failure=e)
from stompest.
Sorry for letting you down, but I haven't got much time at the moment to look into this.
The way to proceed is to let the test suite run and see whether your monkey patch passes it. I installed the latest ActiveMQ and it didn't pass. (There are two test cases which don't work with stompest as-is implementation, but those are related to heartbeating and therefore strongly dependent on the server side, but your patch causes a lot more tests to fail.)
So the next step would be to check whether the broken test cases are caused by incorrect test cases or incorrect implementation.
from stompest.
Yes, after running the test suite I saw the problem with what I was doing. I've solved this differently now, by avoiding the use of task.cooperate
in async.Stomp._notify
. Full details are are in the pull request.
Some tests were still failing for me, but they also fail without my changes. I was testing with ActiveMQ 5.10 and Twisted 14:
======================================================================
ERROR: test_connection_timeout_after_failover (stompest.async.tests.async_client_test.AsyncClientConnectTimeoutTestCase)
test_connection_timeout_after_failover
----------------------------------------------------------------------
DirtyReactorAggregateError: Reactor was unclean.
DelayedCalls: (set twisted.internet.base.DelayedCall.debug = True to debug)
<DelayedCall 0x2913368 [59.9730100632s] called=0 cancelled=0 ThreadedResolver._cleanup('nosuchhost', <Deferred at 0x29136c8>)>
<DelayedCall 0x29799e0 [59.9976089001s] called=0 cancelled=0 ThreadedResolver._cleanup('nosuchhost', <Deferred at 0x2979710>)>
======================================================================
ERROR: test_not_connected (stompest.async.tests.async_client_test.AsyncClientConnectTimeoutTestCase)
test_not_connected
----------------------------------------------------------------------
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py", line 577, in _runCallbacks
current.result = callback(current.result, *args, **kw)
File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 248, in _checkTimeout
cancelCall.cancel()
File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 86, in cancel
raise error.AlreadyCancelled
AlreadyCancelled: Tried to cancel an already-cancelled event.
======================================================================
ERROR: test_not_connected (stompest.async.tests.async_client_test.AsyncClientConnectTimeoutTestCase)
test_not_connected
----------------------------------------------------------------------
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py", line 577, in _runCallbacks
current.result = callback(current.result, *args, **kw)
File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 248, in _checkTimeout
cancelCall.cancel()
File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 86, in cancel
raise error.AlreadyCancelled
AlreadyCancelled: Tried to cancel an already-cancelled event.
======================================================================
ERROR: test_disconnect_timeout (stompest.async.tests.async_client_test.AsyncClientDisconnectTimeoutTestCase)
test_disconnect_timeout
----------------------------------------------------------------------
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py", line 1099, in _inlineCallbacks
result = g.send(result)
File "/home/vagrant/stompest/src/async/stompest/async/client.py", line 197, in disconnect
protocol = self._protocol
File "/home/vagrant/stompest/src/async/stompest/async/client.py", line 110, in _protocol
raise StompConnectionError('Not connected')
StompConnectionError: Not connected
----------------------------------------------------------------------
Ran 29 tests in 11.328s
from stompest.
I just built a new release 2.1.5 and pushed it to PyPI s.t. you can revert back to the standard package.
from stompest.
Thanks a lot!
from stompest.
Sorry, just noticed that I made a typo in my commit message headline which has now made it into the changelog. "Avoid unintended inflation of message handling time" reads better. (But no need to cut a separate release or anything.)
from stompest.
Ray, I understand that you and Peter McAlpine are using stompest at your employer. So does my employer, but I took over other responsibilities and no longer use stompest in production myself. My former team does, but its members are not exactly Twisted specialists. My new position still allows me to maintain stompest (I do not necessarily have to) but I no longer have strong incentives to implement new features, so it could surely profit from active contributions and some hardening of the test suite. Are you or Peter interested?
from stompest.
Jan, thanks for your offer. I will discuss it with my team and get back to you. To be completely honest with you, we also don't have a lot of spare cycles at the moment, and it's unlikely we'd do a better or faster job than you adding new features or extending the test suite.
We have noticed an issue with the slowness of the parser and have mitigated that by using pypy rather than CPython. This may not be sufficient for us in the medium term, and at that point we may look at contributing optimizations in that area. However, aside from that, we are fairly happy with stompest
as it stands.
from stompest.
Jan, I just wanted to circle back on this. I have talked this over with my team, and we don't feel that we would do a better or a more active job of maintaining stompest than you're currently doing. We appreciate all your great work, and we will continue to contribute to stompest as the need arises.
from stompest.
Ok, thanks anyway.
from stompest.
Related Issues (20)
- ipv6 support HOT 1
- stompest consumer is slow for activemq HOT 4
- How to config exclusive=true ? HOT 4
- Messages not recieving after queue is idle for some time HOT 1
- failover option 'startupMaxReconnectAttempts' default value should be -1
- Support for asyncio HOT 2
- Asynch examples on ActiveMQ - Artemis HOT 1
- support SSL client auth in stompest.async HOT 5
- Exception <class 'select.error'>: (4, 'Interrupted system call') after upgrading stompest HOT 4
- stompest.async will be unusable on python 3.7+ HOT 10
- receiveFrame does not accept a timeout parameter HOT 3
- Thread-unsafe issue if we set ack=True in SubscriptionListener HOT 3
- How to dynamically increase the number of consumers HOT 1
- Invalid SUBSCRIBE format HOT 2
- StompFrame and StompHeartBeat __str__ method returns bytes in Python 3
- failover doesn't seem to be working HOT 1
- how to get JMSXDeliveryCount in the stomp frame HOT 1
- When use version=1.2, the consumer stop to connect on topic
- collections.MutableMapping is not available in Python 3.10 HOT 7
- STOMP over websocket
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from stompest.