evilkost / brukva Goto Github PK
View Code? Open in Web Editor NEWThis project forked from kmerenkov/brukva
Asynchronous Redis client that works within Tornado IO loop.
License: Other
This project forked from kmerenkov/brukva
Asynchronous Redis client that works within Tornado IO loop.
License: Other
I think it stops working after this change: 6f5514c#L1R321
_sudden_disconnect is raising exception now but on_reconnect is usually called after _sudden_disconnect. Exception pops up and prevents reconnection. This is quite easy to reproduce if redis is executed with 'timeout 1' option.
The traceback (not so useful though):
ERROR:brukva.client:Socket closed on remote end
ERROR:root:Exception in callback <functools.partial object at 0x102966470>
Traceback (most recent call last):
File "/Users/kmike/svn/tornado/tornado/ioloop.py", line 345, in _run_callback
callback()
File "/Users/kmike/svn/tornado/tornado/stack_context.py", line 173, in wrapped
callback(*args, **kwargs)
File "/Users/kmike/svn/brukva/brukva/adisp.py", line 111, in _send_result
self.call(self.g.throw(result))
File "/Users/kmike/dev/planor/realtime/handlers/pubsub.py", line 27, in change_active_sessions_count
session_count = yield self.redis.async.hincrby('active_sessions', sessionid, diff)
ConnectionError: Socket closed on remote end
I'm not sure if this is an issue with brukva, or just something I don't understand about the best way to use it.
I have handler such as the following:
class MessageHandler(RequestHandler):
def __init__(self, *args, **kwargs):
super(MessageHandler, self).__init__(*args, **kwargs)
self.redis = brukva.Client()
self.redis.connect()
self.redis.subscribe('messages')
@tornado.web.asynchronous
@tornado.web.authenticated
def post(self):
self.redis.listen(self.on_new_message)
def on_new_message(self, result):
self.redis.unsubscribe('messages')
self.redis.disconnect()
self.finish(dict(
messages=tornado.escape.json_decode(result.body)
))
This works, but I see "ConnectionError('Tried to read from non-existent connection')" in the console - it seems to be happening after the on_new_message is done. If I take out the disconnect call it fixes it, but then all my connections stay open.
Do you know of any solution to this problem?
Thanks!
I'm trying to make long polling from asynchronous brukva pubsub
but got error this error in implement tornado chat example
class MessageNewHandler(BaseHandler):
@tornado.web.authenticated
def post(self):
message = {
"id": str(uuid.uuid4()),
"from": self.current_user["first_name"],
"body": self.get_argument("body"),
}
message["html"] = self.render_string("message.html", message=message)
if self.get_argument("next", None):
self.redirect(self.get_argument("next"))
else:
self.write(message)
c = brukva.Client()
c.connect()
c.publish('test_channel', message)
class MessageUpdatesHandler(BaseHandler):
@tornado.web.authenticated
@tornado.web.asynchronous
def post(self):
self.client = brukva.Client()
self.client.connect()
self.client.subscribe('test_channel')
self.client.listen(self.on_new_messages)
def on_new_messages(self, messages):
# Closed client connection
if self.request.connection.stream.closed():
return
self.finish(dict(messages=messages))
but i got these error
[E 110607 02:44:39 web:900] Uncaught exception POST /a/message/updates (127.0.0.1)
.
.
.
File "/usr/lib/python2.7/json/encoder.py", line 178, in default
raise TypeError(repr(o) + " is not JSON serializable")
TypeError: <brukva.client.Message object at 0x9fafe0c> is not JSON serializable
When callback throws an exception (like HTTPError
) this callback's called again by brükva, with the exception as argument.
A simple example (with a kludgy workaround) which reproduces the problem:
#!/usr/bin/env python
import tornado.ioloop
import tornado.web
import brukva
rc = brukva.Client()
rc.connect()
class MainHandler(tornado.web.RequestHandler):
@tornado.web.asynchronous
def get(self):
rc.get("no-such-key", self._get_1)
def _get_1(self, data):
# BUG: The following condition feels wrong
if isinstance(data, tornado.web.HTTPError):
print("BUG: We're being called with self-thrown exception")
raise data
if data is None:
raise tornado.web.HTTPError(404)
self.write(data)
self.finish()
application = tornado.web.Application([
(r"/", MainHandler),
])
if __name__ == "__main__":
application.listen(8888)
tornado.ioloop.IOLoop.instance().start()
The problem is, brükva catches exception, thinks it happened in its own internals and sends it to callbacks at forward_error:29
.
Tested with current HEAD (57f8103).
When I close the WebSocket my python program prints out the following error:
ERROR:brukva.client:Connection lost
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/brukva/client.py", line 926, in listen
data = yield async(self.connection.readline)()
GeneratorExit: Connection lost
Exception AttributeError: "'ConnectionError' object has no attribute 'body'" in <generator object listen at 0x90a798c> ignored
Although the connection to redis gets closed I still receive that error.
Here a snippet of the code I use:
import tornado.httpserver
import tornado.web
import tornado.websocket
import tornado.ioloop
import brukva
c = brukva.Client()
c.connect()
class WSHandler(tornado.websocket.WebSocketHandler):
def __init__(self, *args, **kwargs):
super(WSHandler, self).__init__(*args, **kwargs)
self.client = brukva.Client()
self.client.connect()
self.client.subscribe('measurements')
def open(self):
self.client.listen(self.on_message)
print('new connection')
def on_message(self, message):
self.write_message(str(message.body))
print('message received %s' %message.body)
def on_close(self):
self.client.unsubscribe('measurements')
self.client.disconnect()
print('connection closed')
File "my_module", line 4, in
import brukva
File ".../env/lib/python3.6/site-packages/brukva/init.py", line 1, in
from brukva.client import Connection, Client
File ".../env/lib/python3.6/site-packages/brukva/client.py", line 158
except socket.error, e:
^
SyntaxError: invalid syntax
class MessageUpdatesHandler(BaseHandler):
@tornado.web.authenticated
@tornado.web.asynchronous
def post(self):
self.listing_id = self.get_argument("listing_id", None)
self.cursor = self.get_argument("cursor", None)
self.client = brukva.Client()
self.client.connect()
self.client.subscribe(self.listing_id)
self.client.listen(self.on_new_messages)
def on_new_messages(self, messages):
# Closed client connection
if self.request.connection.stream.closed():
return
msg = json.loads(messages.body,'ascii')
self.finish(dict(messages=[msg]))
self.client.unsubscribe(self.listing_id)
def on_connection_close(self):
# unsubscribe user from channel
self.client.unsubscribe(self.listing_id)
self.client.disconnect()
When I long poll using above code and check redis activity seems like when ever client send or posts something it creates new connection, so total connected client increasing on top of each other like 2 client talking and send 10 messages total connected client show 15 or sth like that. Is this bug or should I use different approach while connecting ?
I see that if the IP is not configured properly, brukva client waits in an endless loop.
What do you think about extracting tornado-specific adisp library from brukva and releasing it under e.g. 'adisp-tornado' name?
Please add examples, Its takes few hours to understand that module works:
On Windows:
c = brukva.Client(host = '127.0.0.1')
Example:
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
from tornado.options import define, options
define("port", default=8888, help="run on the given port", type=int)
import logging
import brukva
from brukva import adisp
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger('app')
c = brukva.Client()
c.connect()
class MainHandler(tornado.web.RequestHandler):
@tornado.web.asynchronous
@adisp.process
def get(self):
yield c.async.set('hello', 'Hello, world')
foo = yield c.async.get('hello')
self.write(foo)
self.finish()
def main():
tornado.options.parse_command_line()
application = tornado.web.Application([
(r"/", MainHandler),
])
http_server = tornado.httpserver.HTTPServer(application)
http_server.listen(options.port)
tornado.ioloop.IOLoop.instance().start()
if name == "main":
main()
When user subscribe to channel and do nothing unsubscribe and disconnect this is the exception I got. Its sometimes appear when some messae passed between publisher and subscribers
[E 111217 01:07:03 client:49] None
Traceback (most recent call last):
File "/usr/local/lib/python2.6/dist-packages/brukva-0.0.1-py2.6.egg/brukva/client.py", line 926, in listen
data = yield async(self.connection.readline)()
GeneratorExit
I'm having trouble with an intermittent timeout between redis and brukva. I'm setting a key,value record and assigning a callback which is never called.
TCPDump shows redis receiving the SET message with the appropriate payload but the callback is never executed leaving requests to time out.
Restarting the tornado instances that use brukva does not solve the issue, however restarting both restarting redis-server and tornado in that order solves the problem.
I'm using redis version 2.4.4 on Debian Squeeze. I'm using tornado 2.1.1 and brukva 0.0.1
Any help would be appreciated as this bug is killing my app on a regular basis.
I got this problem again.
Stack trace from ipdb's 'bt' command:
/Users/kmike/svn/tornadio/tornadio/server.py(71)__init__()
67 except Exception, ex:
68 logging.error('Failed to start Flash policy server: %s', ex)
69
70 logging.info('Entering IOLoop...')
---> 71 io_loop.start()
/Users/kmike/svn/tornado/tornado/ioloop.py(270)start()
268 fd, events = self._events.popitem()
269 try:
--> 270 self._handlers[fd](fd, events)
271 except (KeyboardInterrupt, SystemExit):
272 raise
/Users/kmike/svn/tornado/tornado/stack_context.py(173)wrapped()
171 callback(*args, **kwargs)
172 else:
--> 173 callback(*args, **kwargs)
174 if getattr(fn, 'stack_context_wrapped', False):
175 return fn
/Users/kmike/svn/tornado/tornado/iostream.py(199)_handle_events()
197 try:
198 if events & self.io_loop.READ:
--> 199 self._handle_read()
200 if not self.socket:
201 return
/Users/kmike/svn/tornado/tornado/iostream.py(251)_handle_read()
249 # can't see it; the only way to find out if it's there is to
250 # try to read it.
--> 251 result = self._read_to_buffer()
252 except Exception:
253 self.close()
/Users/kmike/svn/tornado/tornado/iostream.py(287)_read_to_buffer()
285 """
286 try:
--> 287 chunk = self._read_from_socket()
288 except socket.error, e:
289 # ssl.SSLError is a subclass of socket.error
/Users/kmike/svn/tornado/tornado/iostream.py(275)_read_from_socket()
273 raise
274 if not chunk:
--> 275 self.close()
276 return None
277 return chunk
/Users/kmike/svn/tornado/tornado/iostream.py(180)close()
178 self.socket = None
179 if self._close_callback:
--> 180 self._run_callback(self._close_callback)
181
182 def reading(self):
/Users/kmike/svn/tornado/tornado/iostream.py(230)_run_callback()
228 # inside our blanket exception handler rather than outside.
229 with stack_context.NullContext():
--> 230 callback(*args, **kwargs)
231 except:
232 logging.error("Uncaught exception, closing connection.",
/Users/kmike/svn/tornado/tornado/stack_context.py(173)wrapped()
171 callback(*args, **kwargs)
172 else:
--> 173 callback(*args, **kwargs)
174 if getattr(fn, 'stack_context_wrapped', False):
175 return fn
/Users/kmike/svn/tornado/tornado/websocket.py(204)on_connection_close()
202 def on_connection_close(self):
203 self.client_terminated = True
--> 204 self.on_close()
205
206 def _not_supported(self, *args, **kwargs):
/Users/kmike/svn/tornadio/tornadio/persistent.py(84)on_close()
82 def on_close(self):
83 try:
---> 84 self.connection.on_close()
85 finally:
86 self.connection.is_closed = True
/Users/kmike/dev/planor/realtime/connection.py(73)on_close()
71 def on_close(self):
72 if self.authenticated:
---> 73 self.logout()
74
75 def logout(self):
/Users/kmike/dev/planor/realtime/connection.py(77)logout()
75 def logout(self):
76 self.authenticated = False
---> 77 self.pubsub_client.disconnect()
78 for handler in self.handlers:
79 handler.on_logout()
/Users/kmike/svn/brukva/brukva/client.py(334)disconnect()
332
333 def disconnect(self):
--> 334 self.connection.disconnect()
335
336 def on_connect(self):
/Users/kmike/svn/brukva/brukva/client.py(141)disconnect()
139 except socket.error, e:
140 pass
--> 141 self._stream = None
142
143 def write(self, data, try_left=None):
/Users/kmike/svn/brukva/brukva/client.py(873)listen()
871 raise response
872 result = self.format_reply(cmd_listen, response)
--> 873 ctx.ret_call(result)
874
875 ### CAS
/Users/kmike/svn/brukva/brukva/client.py(39)__exit__()
37
38 if self.is_active:
---> 39 self._call_callbacks(value)
40 return True
41 else:
/Users/kmike/svn/brukva/brukva/client.py(29)_call_callbacks()
27 cb(value)
28 else:
---> 29 self.callbacks(value)
30
31 def __enter__(self):
/Users/kmike/dev/planor/realtime/connection.py(83)on_pubsub_message()
79 handler.on_logout()
80
81 def on_pubsub_message(self, message):
82 for handler in self.handlers:
---> 83 handler.on_pubsub_message(message)
None
> /Users/kmike/dev/planor/realtime/handlers/debug.py(14)on_pubsub_message()
12 logging.debug(message)
13 if message is None:
---> 14 import ipdb; ipdb.set_trace()
15 #logging.debug('channel: %s, body: %s' % (message.channel, message.body))
16
The fix that works for me is at #1
they are using old error handling method
Pretty simple to reproduce:
$ pip install brukva
Collecting brukva
Could not find a version that satisfies the requirement brukva (from versions: )
No matching distribution found for brukva
Note: This does not reproduce when running
git clone [email protected]:evilkost/brukva.git ./brukva; pip install ./brukva
or when running
git clone [email protected]:evilkost/brukva.git ./brukva; cd brukva; python setup.py install
The example code below should expose the issue. Subscribing twice on the same channel, prevents further new subscriptions on any channel.
Calling listen() only once doesn't seem to help.
import time
import random
from functools import partial
import brukva
import tornado
def start_data_producer():
now = time.time()
ioloop = tornado.ioloop.IOLoop.instance()
c = brukva.Client()
c.connect()
def pub(channel):
data = random.randint(0,10)
print "Publishing %s on %s" % (data, channel)
c.publish(channel, data)
tick = tornado.ioloop.PeriodicCallback(partial(pub, "test"), 1000)
tick.start()
tick2 = tornado.ioloop.PeriodicCallback(partial(pub, "test_2"), 1000)
tick2.start()
def run_test():
now = time.time()
ioloop = tornado.ioloop.IOLoop.instance()
c = brukva.Client()
c.connect()
def callback(result):
print "Received: %s" % result.body
def unsub(channel):
print "=== Unsubscribing: %s ===" % channel
c.unsubscribe(channel)
def sub(channel):
print "=== Suscribing: %s ===" % channel
c.subscribe(channel)
# Should listen() be called for every subscribe() call?
c.listen(callback)
ioloop.add_timeout(now, partial(sub, "test"))
# If we don't perform this second subscribe call then all is good.
ioloop.add_timeout(now + 5, partial(sub, "test"))
ioloop.add_timeout(now + 10, partial(unsub, "test"))
ioloop.add_timeout(now + 15, partial(sub, "test"))
ioloop.add_timeout(now + 20, partial(sub, "test_2"))
if __name__ == '__main__':
start_data_producer()
run_test()
tornado.ioloop.IOLoop.instance().start()
It's trivial yet it doesn't exist..
tornado.gen and adisp.process don't mix well together. Have there been any forks that prefer tornado.gen or is this a future goal for brukva?
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.