tornado-redis's Introduction

Tornado-Redis

THIS PROJECT IS NOT MAINTAINED ANYMORE.

HERE ARE SOME ALTERNATIVES:

Asynchronous Redis client for the Tornado Web Server.

This is a fork of the brükva Redis client, modified to use Tornado's native 'tornado.gen' interface instead of the 'adisp' call dispatcher.

Tornado-Redis is licensed under the Apache Licence, Version 2.0 (http://www.apache.org/licenses/LICENSE-2.0.html).

Tornado-Redis vs Redis-py

I recommend using both the tornado-redis and redis-py clients in your Tornado web application. Use tornado-redis to subscribe to Pub/Sub notifications and for blocking commands such as BLPOP, BRPOP and BRPOPLPUSH. You may safely use redis-py in most other cases.

I suggest NOT using connection pools with the redis-py client. In most cases you may use a single 'global' instance of the Redis object wherever you need it.

Note that Tornado-redis is far less efficient than the redis-py client at handling MGET/MSET requests and working with Pipelines. I suggest using the redis-py client with the hiredis transport to get maximum performance on these commands.

Please check my answer on StackOverflow on querying Redis server from Tornado application for some additional details.

Installation

You may install the tornado-redis client library using pip or easy_install tools:

pip install tornado-redis

or

easy_install tornado-redis

To build and install the tornado-redis client library from source, clone the git://github.com/leporo/tornado-redis.git repository or download the archive from the download page and extract it into a separate directory. Then execute the following commands in the source directory:

python setup.py build
python setup.py install

Usage

import tornadoredis
import tornado.web
import tornado.gen

...

c = tornadoredis.Client()
c.connect()

...

class MainHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    @tornado.gen.engine
    def get(self):
        foo = yield tornado.gen.Task(c.get, 'foo')
        bar = yield tornado.gen.Task(c.get, 'bar')
        zar = yield tornado.gen.Task(c.get, 'zar')
        self.set_header('Content-Type', 'text/html')
        self.render("template.html", title="Simple demo", foo=foo, bar=bar, zar=zar)

Pub/Sub

Tornado-redis comes with helper classes to simplify Pub/Sub implementation. These helper classes, introduced in the tornadoredis.pubsub module, use a single Redis server connection to handle multiple client and channel subscriptions.

Here is a sample SockJSConnection handler code:

# Use the synchronous redis client to publish messages to a channel
redis_client = redis.Redis()
# Create the tornadoredis.Client instance
# and use it for redis channel subscriptions
subscriber = tornadoredis.pubsub.SockJSSubscriber(tornadoredis.Client())

class SubscriptionHandler(sockjs.tornado.SockJSConnection):
    """
    SockJS connection handler.

    Note that there are no "on message" handlers - SockJSSubscriber class
    calls SockJSConnection.broadcast method to transfer messages
    to subscribed clients.
    """
    def __init__(self, *args, **kwargs):
        super(SubscriptionHandler, self).__init__(*args, **kwargs)
        subscriber.subscribe('test_channel', self)

    def on_close(self):
        subscriber.unsubscribe('test_channel', self)

See the sockjs application in the demo folder and tornadoredis.pubsub module for more implementation details.

Using Pipelines

Pipelines correspond to the Redis transaction feature.

Here is a simple example of pipeline feature usage:

client = Client()
# Create a 'Pipeline' to pack a bundle of Redis commands
# and send them to a Redis server in a single request
pipe = client.pipeline()
# Add commands to a bundle
pipe.hset('foo', 'bar', 1)
pipe.expire('foo', 60)
# Send them to the Redis server and retrieve execution results
res_hset, res_expire = yield gen.Task(pipe.execute)

Note that nothing is sent to the Redis server until pipe.execute is called, so there is no need to wrap the pipe.hset and pipe.expire calls in a yield gen.Task(...) statement.
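The buffering behaviour described above can be illustrated with a small, self-contained sketch. This is not tornado-redis code: BufferedPipeline and fake_send are hypothetical names, and the "server" is just a Python function that records what it receives.

```python
class BufferedPipeline:
    """Toy stand-in for a pipeline: queue commands locally, send on execute()."""

    def __init__(self, send):
        # `send` is a hypothetical callable that takes a list of commands
        # and returns one result per command.
        self._send = send
        self.command_stack = []

    def hset(self, key, field, value):
        # Nothing goes over the wire here; the command is only recorded.
        self.command_stack.append(("HSET", key, field, value))

    def expire(self, key, seconds):
        self.command_stack.append(("EXPIRE", key, seconds))

    def execute(self):
        # One round trip for the whole bundle, then reset the buffer.
        commands, self.command_stack = self.command_stack, []
        return self._send(commands)


sent_batches = []


def fake_send(commands):
    # Pretend server: remember the batch and answer 1 for every command.
    sent_batches.append(commands)
    return [1] * len(commands)


pipe = BufferedPipeline(fake_send)
pipe.hset("foo", "bar", 1)
pipe.expire("foo", 60)
batches_before_execute = list(sent_batches)  # still empty at this point
res_hset, res_expire = pipe.execute()
```

Only the execute call reaches fake_send, which is why the individual command calls need no yield in the real client either.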

Using Locks

A Lock is a synchronization mechanism for the enforcing of limits on access to a resource in an environment where there are many threads of execution. In this case, the Lock uses the Redis server as state store, thus it can be used by multiple processes/computers to allow distributed coordination.

Here is a simple example of how to use a lock:

client = Client()

# Create a 'Lock' object, with a maximum lock time of 10 seconds, and a polling interval
# of 100ms
my_lock = client.lock("testLock", lock_ttl=10, polling_interval=0.1)

# Try and acquire the lock, blocking until it is acquired or an error has occurred.
# This does not block the IOLoop in any way.
result = yield gen.Task(my_lock.acquire, blocking=True)

# Create another, identical lock and try and acquire it
his_lock = client.lock("testLock", lock_ttl=10, polling_interval=0.1)
# We will fail, as the lock is already acquired by my_lock. Result will be False
result = yield gen.Task(his_lock.acquire, blocking=False)

# Schedule my_lock to be released in 5 seconds
client._io_loop.add_timeout(client._io_loop.time() + 5, my_lock.release)

# Meanwhile, attempt to get the lock again. This time block until we succeed
# In 5 seconds, my_lock will be released and his_lock will be acquired instead
result = yield gen.Task(his_lock.acquire, blocking=True)

# Release the lock. Even if we don't, it will be timed out in 10 seconds 
# (because of the lock_ttl setting)
yield gen.Task(his_lock.release)

In this case we chose to run both locks in the same process - generally, we prefer to use Redis locks only if we need to synchronize several processes on different machines.
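To make the lock semantics above concrete, here is a minimal sketch that uses an in-memory dictionary in place of the Redis server. FakeStore, set_if_absent and delete_if_owner are hypothetical names chosen for illustration; this is not how tornado-redis implements its Lock, only the general "set if absent with a time-to-live" idea.

```python
import time


class FakeStore:
    """In-memory stand-in for the Redis server (hypothetical, for illustration)."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._data = {}  # lock name -> (owner token, expiry time)

    def set_if_absent(self, name, token, ttl):
        now = self._clock()
        current = self._data.get(name)
        if current is not None and current[1] > now:
            return False  # still held and not yet expired
        self._data[name] = (token, now + ttl)
        return True

    def delete_if_owner(self, name, token):
        current = self._data.get(name)
        if current is not None and current[0] == token:
            del self._data[name]
            return True
        return False


# A controllable clock lets us "wait" 11 seconds without sleeping.
now = [0.0]
store = FakeStore(clock=lambda: now[0])

mine = store.set_if_absent("testLock", "my_lock", ttl=10)
his = store.set_if_absent("testLock", "his_lock", ttl=10)
now[0] = 11.0  # the 10 second TTL has elapsed
his_after_ttl = store.set_if_absent("testLock", "his_lock", ttl=10)
stale_release = store.delete_if_owner("testLock", "my_lock")
```

The second acquire fails while the first holder's TTL is running and succeeds once it has expired. The owner check on release keeps a late release from the first holder from removing a lock it no longer owns.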

Connection Pool Support

To limit the number of Redis server connections opened by an application and to reuse them, the tornado-redis library provides connection pooling support. To activate it, create a ConnectionPool instance and pass it as the connection_pool argument to the Client object:

CONNECTION_POOL = tornadoredis.ConnectionPool(max_connections=500,
                                              wait_for_available=True)
# ...
class MainHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    @tornado.gen.engine
    def get(self):
        c = tornadoredis.Client(connection_pool=CONNECTION_POOL)
        info = yield tornado.gen.Task(c.info)
        ....
        # Release the connection to be reused by connection pool.
        yield tornado.gen.Task(c.disconnect)
        self.render(....)

Note that you have to add a disconnect method call at the end of the code block using the Client instance to release the pooled connection (this is to be fixed in future library releases).

See the connection pool demo for an example of the 'connection pool' feature usage.

Note that the connection pooling feature has multiple drawbacks and may affect your application's performance. See the "Tornado-Redis vs Redis-py" note for more details.

Please consider using the Pub/Sub helper classes implemented in the tornadoredis.pubsub module, or a similar approach, instead of connection pooling for Pub/Sub operations.

Demos

Check the Demos folder for tornado-redis usage examples.

Here is the list of demo applications available from this repository:

simple - a very basic example of tornado-redis client usage

connection_pool - a 'connection pool' feature demo

websockets - a demo web chat application using WebSockets and Redis' PubSub feature.

sockjs - the same-looking demo web chat application, but using the SockJS transport and the SockJSSubscriber helper class to share a single redis server connection between subscribed clients.

Running Tests

The redis server must be started on the default (:6379) port.
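Before running the suite it can help to confirm that something is listening on that port. The following is a small standard-library sketch; redis_port_open is a hypothetical helper name, and it only checks that a TCP connection can be opened, not that the listener is actually Redis.

```python
import socket


def redis_port_open(host="127.0.0.1", port=6379, timeout=0.5):
    """Return True if a TCP connection to host:port can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unreachable hosts.
        return False


print("Redis port reachable:", redis_port_open())
```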

Use the following command to run the test suite:

python -m tornado.testing tornadoredis.tests

Or use tox to test how it works in different environments:

tox

or

tox -e pypy

Credits and Contributors

The brükva project was started by Konstantin Merenkov but seems to be no longer maintained.

tornado-redis was inspired by and based on the work of Andy McCurdy and redis-py contributors.

evilkost

Matt Dawson

maeldur

Olivier Yiptong

Juarez Bochi

Jakub Roztocil

nizox

Lessandro Mariano

Kwok-kuen Cheung

Ofir Herzas

Alon Diamant

Jonas Hagstedt

The Tornado-Redis project's source code and 'tornado-redis' PyPI package are maintained by Vlad Glushchuk.

Tornado is an open source version of the scalable, non-blocking web server and tools that power FriendFeed. Documentation and downloads are available at http://www.tornadoweb.org/

tornado-redis's People

Contributors

advance512, ashwinrajeev, bufferx, cheungpat, clement, evilkost, jbochi, jettify, jkbrzt, kmerenkov, kmike, leporo, lessandro, lispython, maeldur, mattd, murph, njo, oyiptong, polyn0m, sharik, sinamt, taylorbarstow, tnm, whardier, yukirin, zguangyu


tornado-redis's Issues

PyPy: "ReferenceError: weakly-referenced object no longer exists"

We use the following sample code and JMeter with 100 concurrent connections (non-stop)

import tornado.ioloop
import tornado.web
import tornadoredis
import tornado.gen

pool = tornadoredis.ConnectionPool(host='192.168.68.234', port=6379, max_connections=10, wait_for_available=True)

class MainHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    @tornado.gen.engine
    def get(self):
        c = tornadoredis.Client(connection_pool=pool)
        foo = yield tornado.gen.Task(c.get, 'myKey')
        c.disconnect()
        self.finish(str(foo))

application = tornado.web.Application([
    (r"/", MainHandler),
])

if __name__ == "__main__":
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

But we got errors:

ERROR:tornado.application:Uncaught exception GET / (192.168.68.22)
HTTPRequest(protocol='http', host='192.168.68.235:8888', method='GET', uri='/', version='HTTP/1.1', remote_ip='192.168.68.22', headers={'Connection': 'close', 'Host': '192.168.68.235:8888', 'Authorization': 'Bearer andre', 'User-Agent': 'Apache-HttpClient/4.2.1 (java 1.5)'})
Traceback (most recent call last):
  File "/opt/andre/tor-redis-test/lib/python2.7/site-packages/tornado/web.py", line 1115, in _stack_context_handle_exception
    raise_exc_info((type, value, traceback))
  File "/opt/andre/tor-redis-test/lib/python2.7/site-packages/tornado/stack_context.py", line 302, in wrapped
    ret = fn(*args, **kwargs)
  File "/opt/andre/tor-redis-test/lib/python2.7/site-packages/tornado/gen.py", line 550, in inner
    self.set_result(key, result)
  File "/opt/andre/tor-redis-test/lib/python2.7/site-packages/tornado/gen.py", line 476, in set_result
    self.run()
  File "/opt/andre/tor-redis-test/lib/python2.7/site-packages/tornado/gen.py", line 507, in run
    yielded = self.gen.send(next)
  File "/opt/andre/tor-redis-test/lib/python2.7/site-packages/tornadoredis/client.py", line 323, in disconnect
    proxy = pool.make_proxy(client_proxy=self._weak,
ReferenceError: weakly-referenced object no longer exists

messages are not sent if page is reloaded.

Running demos/sockjs/app.py works fine when the chat window is first loaded, but if the page is reloaded, the chat page fails to send messages.

The logs state that the class SockJSSubscriber(BaseSubscriber) didn't get an unsubscribe message for the broadcast_channel, only for the private channel.

logs:
Class SockJSSubscriber(BaseSubscriber)
on_message:

First time when started:

Message(kind=u'subscribe', channel=u'broadcast_channel', body=1, pattern=u'broadcast_channel')
Message(kind=u'subscribe', channel=u'private.10ba6', body=2, pattern=u'private.10ba6')

After refresh:

Message(kind=u'unsubscribe', channel=u'private.10ba6', body=1, pattern=u'private.10ba6')
Message(kind=u'unsubscribe', channel=u'broadcast_channel', body=0, pattern=u'broadcast_channel')

and when one more tab is opened:
Message(kind=u'subscribe', channel=u'broadcast_channel', body=1, pattern=u'broadcast_channel')

Message(kind=u'subscribe', channel=u'private.301ee', body=2, pattern=u'private.301ee')

Message(kind=u'subscribe', channel=u'private.4c759', body=3, pattern=u'private.4c759')


So after opening the second tab, chat works fine, but it should also work in a single tab.

'Subscribe' method never calls a callback if client uses a connection pool

This application never prints 'subscribed'.

Here is the main application code:

import json
import os.path

import tornado.httpserver
import tornado.gen
import tornado.ioloop
import sockjs.tornado
import tornado.options

import tornadoredis

c = tornadoredis.Client()
c.connect()
pool = tornadoredis.ConnectionPool(max_connections=1, wait_for_available=True)


class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.render("template.html", title="PubSub + WebSocket Demo")


class NewMessageHandler(tornado.web.RequestHandler):
    def post(self):
        message = self.get_argument('message')
        print('Publishing a message "%s"' % message)
        c.publish('test_channel', message)
        self.set_header('Content-Type', 'text/plain')


class RealtimeHandler(sockjs.tornado.SockJSConnection):
    def __init__(self, session):
        super(RealtimeHandler, self).__init__(session)
        self.listen()

    @tornado.gen.engine
    def listen(self):
        self.client = tornadoredis.Client(connection_pool=pool)
        self.client.connect()
        print('subscribing')
        yield tornado.gen.Task(self.client.subscribe, 'invalidation')
        print('subscribed')

    def on_open(self, request):
        print("User connected to realtime socket")
        self.client.listen(self.on_message)

    def on_event(self, name, *args, **kwargs):
        print('on_event', name)
        if name == 'invalidation':
            self.client.publish('invalidation', json.dumps(kwargs['args']))

    def on_message(self, msg):
        print('on_message', msg)
        if msg.kind == 'message':
            message = [json.loads(msg.body)]
            self.send(message)
        elif msg.kind == 'disconnect':
            self.close()
        elif msg.kind == 'subscribe':
            pass

    def on_close(self):
        print("User disconnected from realtime socket")
        self.client.unsubscribe('invalidation')
        self.client.disconnect()


def rel(path):
    return os.path.join(os.path.dirname(os.path.abspath(__file__)), path)


application = tornado.web.Application(
    [(r'/', MainHandler),
     (r'/msg', NewMessageHandler),
     (r"/static/(.*)", tornado.web.StaticFileHandler,
      {"path": rel("static")})] +
     sockjs.tornado.SockJSRouter(RealtimeHandler, '/sock').urls)

if __name__ == '__main__':
    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(8888)
    print('Demo is running at 0.0.0.0:8888\n'
          'Quit the demo with CONTROL-C')
    tornado.ioloop.IOLoop.instance().start()

template.html:

<!doctype html>
<html>
    <head>
        <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/> 
        <title>{{ title }}</title>
        <script src="http://cdn.sockjs.org/sockjs-0.3.min.js"></script>
        <script>
            function open_websocket(){

                function show_message(message){
                    var el = document.createElement('div');
                    el.innerHTML = message;
                    document.body.appendChild(el);
                }

                var sock = new SockJS('http://127.0.0.1:8888/sock');
                sock.onopen = function() {
                    show_message('Connected.');
                };
                sock.onmessage = function(e) {
                    show_message(e.data);
                };
                sock.onclose = function() {
                    show_message("Closed.");
                };
            }
        </script>
    </head>
    <body onload="open_websocket()">
        <h1>{{ title }}</h1>
        <p>Enter your message and press the 'Send' button. You may open another browser window and send messages from here.</p>
        <form method="POST" action="/msg" target="_hidden">
            <input name="message" style="width: 500px;border: 1px solid #CCC;padding: 3px;" value="" placeholder="Enter your message here and press the 'Send' button."/>
            <input type="submit" value="Post"/>
        </form>
        <iframe id="_hidden" style="display:none"></iframe>
        <h2>Messages</h2>
        <hr/>
    </body>
</html>

Error when subscribing to multiple channels

The following error is raised on the second subscription to multiple channels (works first time on a connection, fails on the second subscription on the same connection):
File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/web.py", line 1115, in _stack_context_handle_exception
raise_exc_info((type, value, traceback))
File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/stack_context.py", line 343, in _handle_exception
if tail.exit(*exc):
File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/stack_context.py", line 186, in exit
return self.exception_handler(type, value, traceback)
File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/gen.py", line 136, in handle_exception
return runner.handle_exception(typ, value, tb)
File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/gen.py", line 556, in handle_exception
self.run()
File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/gen.py", line 505, in run
yielded = self.gen.throw(*exc_info)
File "/Users/adam/BUD/budlist/app/backend/handlers/realtime_updates/realtime_updates.py", line 23, in get
yield gen.Task(self.controller.subscribe_and_listen, app, user)
File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/stack_context.py", line 343, in _handle_exception
if tail.exit(*exc):
File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/stack_context.py", line 186, in exit
return self.exception_handler(type, value, traceback)
File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/gen.py", line 136, in handle_exception
return runner.handle_exception(typ, value, tb)
File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/gen.py", line 556, in handle_exception
self.run()
File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/gen.py", line 505, in run
yielded = self.gen.throw(*exc_info)
File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornadoredis/client.py", line 1147, in listen
response = yield gen.Task(response)
File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/stack_context.py", line 302, in wrapped
ret = fn(*args, **kwargs)
File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/gen.py", line 550, in inner
self.set_result(key, result)
File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/gen.py", line 476, in set_result
self.run()
File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornado/gen.py", line 507, in run
yielded = self.gen.send(next)
File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornadoredis/client.py", line 504, in consume_multibulk
token = self.process_data(data, cmd_line)
File "/Users/adam/.virtualenvs/BUD/lib/python2.7/site-packages/tornadoredis/client.py", line 491, in process_data
cmd_line)
ResponseError: ResponseError (on LISTEN [(), {}]): Unknown response type s

Here is some additional info (logging of "data" in process_data):
[E 140815 15:50:48 realtime_updates:36] got a message: Message(kind=u'subscribe', channel=u'b', body=2, pattern=u'b')
[E 140815 15:51:08 client:468] *3
[E 140815 15:51:08 client:468] $9
[E 140815 15:51:08 client:468] $1
[E 140815 15:51:08 client:468] :2
[E 140815 15:51:08 realtime_updates:36] got a message: Message(kind=u'subscribe', channel=u'a', body=2, pattern=u'a')
[E 140815 15:51:08 client:468] *3
[E 140815 15:51:08 client:468] $9
[E 140815 15:51:08 client:468] subscribe
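For readers unfamiliar with the tokens in this log: in the Redis wire protocol a line starting with * announces an array, $ a bulk string with its length, and : an integer. The sketch below is a minimal standalone parser for those types (parse_resp is a hypothetical name, not part of tornado-redis). It also shows one plausible way an "Unknown response type s" error could arise: a reader that expects a type marker but is handed the payload line "subscribe" sees 's' as the marker.

```python
def parse_resp(data, pos=0):
    """Parse one Redis protocol value from `data` (bytes) starting at `pos`.

    Returns (value, next_pos). Handles arrays (*), bulk strings ($),
    integers (:) and simple strings (+).
    """
    end = data.index(b"\r\n", pos)
    kind, payload = data[pos:pos + 1], data[pos + 1:end]
    pos = end + 2
    if kind == b"*":
        items = []
        for _ in range(int(payload)):
            item, pos = parse_resp(data, pos)
            items.append(item)
        return items, pos
    if kind == b"$":
        length = int(payload)
        if length == -1:
            return None, pos  # null bulk string
        return data[pos:pos + length], pos + length + 2
    if kind == b":":
        return int(payload), pos
    if kind == b"+":
        return payload, pos
    raise ValueError("Unknown response type %r" % kind)


# A well-formed reply to SUBSCRIBE on channel 'b' with 2 active subscriptions.
reply = b"*3\r\n$9\r\nsubscribe\r\n$1\r\nb\r\n:2\r\n"
value, consumed = parse_resp(reply)

# A reader that has lost its place starts on a payload line instead of a marker.
try:
    parse_resp(b"subscribe\r\n")
except ValueError as exc:
    error_text = str(exc)
```

Whether the client actually lost its place in the stream this way is an assumption; the log only shows that the token after $9 was handed to the type dispatcher.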

"SELECT" command in pipeline

I have a Tornado app with a Redis connection:

class Application(tornado.web.Application):
    def __init__(self):
        self.client = tornadoredis.Client(selected_db=self.config.get('redis', 'db'),
                                          password=self.config.get('redis', 'password'),)
        self.client.connect()

And I have a request handler that uses a pipeline:

pipe = self.application.client.pipeline()

pipe.smembers('%s_unmoderated' % self.current_user)
pipe.smembers('%s_moderated' % self.current_user)
pipe.smembers('%s_templates' % self.current_user)

print pipe.command_stack
res = [a for a in (yield tornado.gen.Task(pipe.execute))]
print pipe.command_stack

First print: [SMEMBERS(('test_unmoderated',),{'callback': None}), SMEMBERS(('test_moderated',),{'callback': None}), SMEMBERS(('test_templates',),{'callback': None})]

Second print: [SELECT(('4',),{'callback': <tornado.stack_context._StackContextWrapper object at 0x1eafaf8>})]

Thus, on the second function call my res contains 4 elements instead of 3.

Connection needs to be disconnected when remote redis exit unexpectedly

When the remote Redis server goes down, the client receives an on_disconnect event and throws a
ConnectionError("Socket closed on remote end")

but when redis goes up, and we try to redo our previous actions, we will be blocked at
client.py 1290: yield gen.Task(self.connection.wait_until_ready)
the callback will never be called, hence we can't automatically recover from failure cases.

I fixed this issue by adding two lines (321 and 322):
318     def on_disconnect(self):
319         if self.subscribed:
320             self.subscribed = set()
321         if self.connection:
322             self.connection.disconnect()
323         raise ConnectionError("Socket closed on remote end")

When we are disconnected from remote server, we need to disconnect locally, so we won't be blocked for the previous failed unfinished request.

Add a support for new simplified generator-based syntax

Return Future instances instead of calling callbacks directly.

Add the support for simplified syntax:

@gen.coroutine
def some_method(self):
    ...
    res = yield self.redis_client.get('mykey')
    ...

instead of

@gen.engine
def some_method(self):
    ...
    res = yield gen.Task(self.redis_client.get, 'mykey')
    ...

Benchmark the performance change before merging changes to master.

Optimize the multi-bulk reply handling

SyncMset is much quicker than AsyncMset. Is that right?

I ran demos/benchmark/app.py and found that SyncMset is much quicker than AsyncMset. Is it working correctly?
My logging:
INFO:tornado.access:200 GET /redis-py/mset (192.168.1.100) 83.39ms
WARNING:tornado.access:404 GET /favicon.ico (192.168.1.100) 0.41ms
INFO:tornado.access:200 GET /mset (192.168.1.100) 1736.85ms
WARNING:tornado.access:404 GET /favicon.ico (192.168.1.100) 0.40ms

I installed tornado, redis, redis-py, tornado-redis today.
tornado version is 3.0.1.
redis-py version is 2.7.5.
tornado-redis version 2.4.2.
Redis server v=2.6.13 sha=00000000:0 malloc=jemalloc-3.2.0 bits=32

By the way, my linux version is "centOS 6.2(Final)" and "kernel linux 2.6.32-220.e16.i686".

Pipeline.execute_command() appends SELECT command to command_stack

It should return after execution of SELECT command:

class Pipeline(Client):

    def __init__(self, transactional, *args, **kwargs):
        super(Pipeline, self).__init__(*args, **kwargs)
        self.transactional = transactional
        self.command_stack = [] 

    def execute_command(self, cmd, *args, **kwargs):
        if cmd in ('AUTH', 'SELECT'):
            # NOTE: do not append AUTH & SELECT commands to self.command_stack
            return super(Pipeline, self).execute_command(cmd, *args, **kwargs)
        elif cmd in PUB_SUB_COMMANDS:
            raise RequestError(
                'Client is not supposed to issue command %s in pipeline' % cmd) 
        self.command_stack.append(CmdLine(cmd, *args, **kwargs))
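The effect of that early return can be checked with a stripped-down sketch. StubClient and SketchPipeline are hypothetical stand-ins that only record commands; they are not the library classes, and the Pub/Sub check is left out for brevity.

```python
class StubClient:
    """Hypothetical stand-in for Client: records commands sent immediately."""

    def __init__(self):
        self.sent = []

    def execute_command(self, cmd, *args, **kwargs):
        self.sent.append((cmd,) + args)


class SketchPipeline(StubClient):
    def __init__(self):
        super().__init__()
        self.command_stack = []

    def execute_command(self, cmd, *args, **kwargs):
        if cmd in ("AUTH", "SELECT"):
            # Connection-setup commands bypass the buffer and go out at once.
            return super().execute_command(cmd, *args, **kwargs)
        self.command_stack.append((cmd,) + args)


pipe = SketchPipeline()
pipe.execute_command("SMEMBERS", "test_unmoderated")
pipe.execute_command("SELECT", "4")
pipe.execute_command("SMEMBERS", "test_moderated")
```

With the early return, SELECT never lands in command_stack, so the number of results from execute matches the number of commands the caller queued.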

wrap redis commands with @return_future decorator

In tornado.concurrent there is a nice decorator, return_future, which returns a Future if the callback argument is None. With this decorator it is possible to use the modern API, yield f(a, b), while keeping yield gen.Task(f, a, b) for backward compatibility.

I am not sure about pipelining, but for simple commands it works perfectly fine:

@return_future
def delete(self, *keys, **kwargs):
    self.execute_command('DEL', *keys, callback=kwargs.get('callback'))

@return_future
def set(self, key, value, expire=None, pexpire=None,
        only_if_not_exists=False, only_if_exists=False, callback=None):
    args = []
    if expire is not None:
        args.extend(("EX", expire))
    if pexpire is not None:
        args.extend(("PX", pexpire))
    if only_if_not_exists and only_if_exists:
        raise ValueError("only_if_not_exists and only_if_exists "
                         "cannot be true simultaneously")
    if only_if_not_exists:
        args.append("NX")
    if only_if_exists:
        args.append("XX")

@return_future
def get(self, key, callback=None):
    self.execute_command('GET', key, callback=callback)

and here simple test:

@async_test
@gen.engine
def test_get_set_coroutine(self):
    res = yield self.client.set('foo', 'bar')
    self.assertEqual(res, True)
    res = yield self.client.get('foo')
    self.assertEqual(res, 'bar')
    res = yield self.client.delete('foo')
    self.assertTrue(res)
    self.stop()

What do you think about decorating all commands with return_future ?
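For anyone weighing this proposal, the general idea of such a decorator can be sketched without Tornado. The version below is a simplified illustration, not Tornado's implementation: return_future_sketch and FakeClient are hypothetical names, it uses concurrent.futures.Future from the standard library, and it ignores error handling.

```python
import functools
from concurrent.futures import Future


def return_future_sketch(func):
    """Wrap a callback-style function so that it also returns a Future."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        future = Future()
        user_callback = kwargs.pop("callback", None)

        def resolve(result=None):
            # Complete the Future, then honour an explicitly passed callback.
            future.set_result(result)
            if user_callback is not None:
                user_callback(result)

        func(*args, callback=resolve, **kwargs)
        return future

    return wrapper


class FakeClient:
    """In-memory stand-in for a Redis client (hypothetical)."""

    def __init__(self):
        self._data = {}

    @return_future_sketch
    def set(self, key, value, callback=None):
        self._data[key] = value
        callback(True)

    @return_future_sketch
    def get(self, key, callback=None):
        callback(self._data.get(key))


client = FakeClient()
set_result = client.set("foo", "bar").result()  # Future style
get_result = client.get("foo").result()
seen = []
client.get("foo", callback=seen.append)         # callback style still works
```

Both calling styles go through the same method body, which is the backward-compatibility property the proposal relies on.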

SockJSSubscriber raise 'UnicodeEncodeError' Exception

Hi,
I am writing a realtime web application with SockJSSubscriber.
When I send a unicode string that exceeds the ASCII range, such as Chinese characters, a UnicodeEncodeError exception is raised at 'tornadoredis/pubsub.py", line 146, in on_message'.

[Solutions]
I subclassed SockJSSubscriber and overrode on_message to encode msg.body to str, and it works well.

Leaking file descriptors

Hi,
I've been using tornado-redis with tornado-2.3 and have noticed that when a Client object with an open connection leaves scope and is reclaimed by the garbage collector, the file descriptor for the socket object is not closed.

I think this is caused by a reference to a connection object that is saved in a tornado ExceptionStackContext that should have been removed. The reference in the ExceptionStackContext stops the garbage collector from reclaiming the IOStream connected to redis and closing the socket.

I've attached a link to code for the test server I use to recreate the bug. I've also attached a link to a graph showing the references to the IOStream object.
To run the server you will need the objgraph module: http://mg.pov.lt/objgraph/
Server code: http://dl.dropbox.com/u/107369108/test_server.py

Reference Graph: http://dl.dropbox.com/u/107369108/tmpEQOP5_.png

Thanks!!!

ascii encode

Hello,

I've updated to the new version and now getting errors on:

File "/Library/Python/2.7/site-packages/tornadoredis/client.py", line 351, in format_command
e_t = self.encode(t)
File "/Library/Python/2.7/site-packages/tornadoredis/client.py", line 340, in encode
value = str(value)
UnicodeEncodeError: 'ascii' codec can't encode characters in position 0-12: ordinal not in range(128)

There is a bug in the code:
Instead of this code:
if not isinstance(value, str):
    value = str(value)

this one is needed:
if isinstance(value, str):
    value = str(value)

Please, fix this

Thank you
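As a point of comparison, here is a sketch of an encoder that never goes through the implicit ASCII codec. It is written for Python 3, where str is text and bytes is binary, whereas the traceback above comes from Python 2.7. The function is an illustration and not the library's code; UTF-8 as the default encoding is an assumption.

```python
def encode(value, encoding="utf-8"):
    """Turn a command argument into bytes without relying on the ASCII codec."""
    if isinstance(value, bytes):
        return value             # already encoded, pass through untouched
    if not isinstance(value, str):
        value = str(value)       # ints, floats and other objects
    return value.encode(encoding)


encoded_text = encode("\u4e2d\u6587")  # two Chinese characters
encoded_int = encode(42)
encoded_raw = encode(b"raw")
```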

info() fails with message 'need more than 1 value to unpack' when redis server has attached slaves.

The issue seems to be the way it parses output from info command like this:

slave0:10.176.0.235,51657,online

I think it is trying to split k,v on = in the value, but they don't exist in this type of output for slaveXXX, and thus it crashes with an error like 'need more than 1 value to unpack'. Possibly wrapping the split in a try/except and just using the raw value if it cannot unpack would be a solution? (around line 112 in client.py)
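The fallback suggested here can be sketched as a standalone parser. parse_info and parse_info_value are hypothetical names, not the functions in client.py, and the db0 and role lines in the sample are assumed typical INFO output added for contrast; only the slave0 line comes from this report.

```python
def parse_info_value(value):
    """Parse one INFO value; keep the raw string if it is not k=v pairs."""
    if "," not in value:
        return value
    parsed = {}
    for item in value.split(","):
        key, sep, val = item.partition("=")
        if not sep:
            return value  # e.g. "10.176.0.235,51657,online"
        parsed[key] = val
    return parsed


def parse_info(text):
    info = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and section headers
        key, _, value = line.partition(":")
        info[key] = parse_info_value(value)
    return info


sample = "db0:keys=2,expires=0\r\nslave0:10.176.0.235,51657,online\r\nrole:master"
info = parse_info(sample)
```

Using str.partition instead of tuple unpacking on split means a missing = is detected explicitly rather than surfacing as an unpacking error.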

support for Python 3

Is there a plan to support Python 3 ?

Python 3.3.0 (default, Oct  9 2012, 16:21:00) 
[GCC 4.2.1 Compatible Apple Clang 4.1 ((tags/Apple/clang-421.11.65))] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import tornadoredis
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/akh/dev/pyvenv3/lib/python3.3/site-packages/tornadoredis/__init__.py", line 1, in <module>
    from .client import Connection, Client
  File "/Users/akh/dev/pyvenv3/lib/python3.3/site-packages/tornadoredis/client.py", line 345
    except Exception, e:
                    ^
SyntaxError: invalid syntax

listen() keeps communicating to redis after connection closed

(Not sure if this is a bug or I am just using it wrong; my apologies in advance.)
I've tried to use pub/sub for wrapping long polling, and my handler looks pretty much the same as the WebSocket example you have. But right after the first message is handled (after unsubscribe and disconnect are called), it still tries to communicate with redis and fails since the connection was closed. BTW, the WebSocket example is not working either.

https://gist.github.com/3720800

Pub/Sub duplicate messages

Hi,

I'm writing a small application which allows users to publish/subscribe to multiple channels at a time, and I chose Tornado-Redis to implement this feature. The scenario is that one message may be sent to multiple channels, but a user who is subscribed to all of these channels should get the message only once. Right now, users keep receiving duplicate messages from the Pub/Sub handler.

[Solutions]:
My idea is to override the SockJSSubscriber class and use a Bloom filter to avoid sending duplicate messages through this object. If you have a better idea, just let me know. I would highly appreciate that.

Thanks,
-Canh
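One simpler alternative to a Bloom filter is a bounded "recently seen" set per connected user. The sketch below is a standalone illustration under that assumption; RecentMessageFilter is a hypothetical name and nothing here is part of tornado-redis. Unlike a Bloom filter it has no false positives, at the cost of storing the message bodies themselves.

```python
from collections import OrderedDict


class RecentMessageFilter:
    """Remember the last `capacity` message bodies and reject repeats."""

    def __init__(self, capacity=1000):
        self.capacity = capacity
        self._seen = OrderedDict()

    def is_new(self, body):
        if body in self._seen:
            self._seen.move_to_end(body)    # refresh its position
            return False
        self._seen[body] = True
        if len(self._seen) > self.capacity:
            self._seen.popitem(last=False)  # drop the oldest entry
        return True


# Capacity 2 so the eviction is visible in a short run.
message_filter = RecentMessageFilter(capacity=2)
results = [message_filter.is_new(m) for m in ["hello", "hello", "a", "b", "hello"]]
```

Deduplicating on the body alone would also drop two distinct messages with identical text, so in practice the publisher would probably attach a unique message ID and the filter would key on that instead.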

Do not subscribe all time

Hi,
I am building a new chat-like application with Tornado WebSockets and tornado-redis. Both are great to work with. Thanks for that.

When a user connects to the WebSocket, he subscribes to a channel named after his user ID, so every message sent to him can be received. A user may connect to and disconnect from the WebSocket frequently.

The problem is that when a user establishes a successful socket connection, the subscription to the respective channel does not always succeed. Most of the time it works, but sometimes it fails.

If anyone has any idea, please help me solve this issue. I also wonder whether anyone else is experiencing the same problem.

Thanks in advance!

Note about 'client.listen'

When I use 'subscribe' and 'psubscribe' with Python 2.7, something goes wrong.
My code looks like:

def handle_message(msg):
    print 'hello world'
yield gen.Task(client.subscribe, 'channel_one')
yield gen.Task(client.psubscribe, 'channel_two'+'/*')
client.listen(handle_message)

But this code does not trigger the callback function when a message is received.
Then I deleted the line

callback = None

in 'client._subscribe'. Then it works well.

Weird error when integrating with SockJS

Hi,
I'm in the early stages of using tornado/redis/sockjs, so maybe the error is mine or reported in the wrong place, sorry.

I implemented the example at https://github.com/leporo/tornado-redis/blob/master/demos/sockjs/app.py

I always get the following traceback when I disconnect. I added logging of the method calls from the MessageHandler and SockJSSubscriber classes.

MessageHandler:__init__()
SockJSSubscriber:on_message()
MessageHandler:on_close
SockJSSubscriber:unsubscribe()

ERROR:tornado.application:Uncaught exception POST /subscribe/782/5es5waak/xhr_streaming (127.0.0.1)
HTTPRequest(protocol='http', host='127.0.0.1:8889', method='POST', uri='/subscribe/782/5es5waak/xhr_streaming', version='HTTP/1.0', remote_ip='127.0.0.1', headers={'Origin': 'http://testtornado.oonair.net', 'Content-Length': '0', 'Accept-Language': 'en-US,en;q=0.8,es;q=0.6,ca;q=0.4', 'Accept-Encoding': 'gzip,deflate,sdch', 'X-Forwarded-Host': 'testtornado.oonair.net', 'Host': '127.0.0.1:8889', 'Accept': '*/*', 'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1678.0 Safari/537.36', 'Connection': 'close', 'Referer': 'http://testtornado.oonair.net/index.html', 'X-Real-Ip': '192.168.0.135', 'Cookie': '__utma=65992624.1844703960.1382547828.1382547828.1382547828.1; __utmc=65992624; __utmz=65992624.1382547828.1.1.utmcsr=(direct)|utmccn=(direct)|utmcmd=(none); JSESSIONID=dummy'})
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/tornado/web.py", line 1115, in _stack_context_handle_exception
    raise_exc_info((type, value, traceback))
  File "/usr/local/lib/python2.7/dist-packages/tornado/stack_context.py", line 302, in wrapped
    ret = fn(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/tornado/gen.py", line 550, in inner
    self.set_result(key, result)
  File "/usr/local/lib/python2.7/dist-packages/tornado/gen.py", line 476, in set_result
    self.run()
  File "/usr/local/lib/python2.7/dist-packages/tornado/gen.py", line 507, in run
    yielded = self.gen.send(next)
  File "/usr/local/lib/python2.7/dist-packages/tornadoredis/client.py", line 1106, in listen
    self.on_unsubscribed([result.channel])
  File "/usr/local/lib/python2.7/dist-packages/tornadoredis/client.py", line 1013, in on_unsubscribed
    self.subscribed.remove(channel)
KeyError: u'test_channel'
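
The last frame shows set.remove raising because the channel is no longer in self.subscribed. In plain Python, set.remove raises KeyError for a missing element while set.discard does not. The sketch below only illustrates that difference; it is not the library's actual fix, and the variable names are illustrative.

```python
subscribed = {u'test_channel'}

# First unsubscribe notification: the channel is present, so remove() works.
subscribed.remove(u'test_channel')

# A second notification for the same channel makes remove() fail...
try:
    subscribed.remove(u'test_channel')
    raised = False
except KeyError:
    raised = True

# ...while discard() is a no-op when the element is already gone.
subscribed.discard(u'test_channel')
```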

Handling disconnections during blocking commands (e.g. BLPOP)

When the client is waiting on a blocking command, if a disconnection occurs the caller is never notified.

Sample code:

import tornado, tornadoredis

@tornado.gen.engine
def run():
    r = tornadoredis.Client()
    r.connect()
    while True:
        result = yield tornado.gen.Task(r.blpop, 'key')
        print result

if __name__ == '__main__':
    run()
    tornado.ioloop.IOLoop.instance().start()

Run the code above, then stop redis-server. This exception will be thrown by the library.

Ideally the client should be notified about the exception so that it can handle it (reconnecting, for instance), but currently it just fails silently.

I pushed a solution to my personal tornado-redis fork that simply calls the client callback passing the exception to it: https://github.com/lessandro/tornado-redis/commit/1795b8f8064fdf4c6098ee0ba8b88fb7471a8a50

This is how I handle disconnections: https://github.com/lessandro/ircd/blob/09df12a6bce4f60ee57b90a3f8d31f52e25ee7ee/servers/redisutil.py#L48
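
Once the error reaches the caller, handling it can be as simple as a retry loop. The following is a minimal sketch of that idea in Python 3 with asyncio rather than tornado.gen; FlakyQueue is a hypothetical stand-in for a Redis client and does not reflect the tornado-redis API.

```python
import asyncio


class FlakyQueue:
    """Hypothetical stand-in for a Redis client; fails once, then works."""

    def __init__(self, items):
        self.items = list(items)
        self.connected = False
        self.failures_left = 1

    async def connect(self):
        self.connected = True

    async def blpop(self):
        if self.failures_left:
            self.failures_left -= 1
            self.connected = False
            raise ConnectionError("connection lost during BLPOP")
        return self.items.pop(0)


async def consume(client, count, retry_delay=0.01):
    """Pop `count` items, reconnecting whenever the blocking call fails."""
    results = []
    await client.connect()
    while len(results) < count:
        try:
            results.append(await client.blpop())
        except ConnectionError:
            # The error reached the caller, so it can decide what to do:
            # here, wait briefly and reconnect before retrying.
            await asyncio.sleep(retry_delay)
            await client.connect()
    return results
```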

Sequential subscribe problem

Hello!

Consider this test case:

class SequentialPubSubTestCase(RedisTestCase):

    def setUp(self):
        super(SequentialPubSubTestCase, self).setUp()
        self.subscriber = self._new_client()

    def tearDown(self):
        try:
            self.subscriber.connection.disconnect()
            del self.subscriber
        except AttributeError:
            pass
        super(SequentialPubSubTestCase, self).tearDown()

    @async_test
    @gen.engine
    def test_sequential_subscribe(self):
        yield gen.Task(self.subscriber.subscribe, 'test.channel')
        yield gen.Task(self.subscriber.subscribe, 'test.channel2')
        self.assertTrue(bool(self.subscriber.subscribed))

When I run it, the second subscription does not yield back to the caller and the test case hangs until the timeout:

(env)MacAir:tornado-redis fz$ python -m tornado.testing tornadoredis.tests.test_pubsub.SequentialPubSubTestCase
F
======================================================================
FAIL: test_sequential_subscribe (tornadoredis.tests.test_pubsub.SequentialPubSubTestCase)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/private/var/www/different/python/centrifuge/env/lib/python2.7/site-packages/tornado/testing.py", line 118, in __call__
    result = self.orig_method(*args, **kwargs)
  File "tornadoredis/tests/redistest.py", line 21, in _runner
    return self.wait(timeout=timeout)
  File "/private/var/www/different/python/centrifuge/env/lib/python2.7/site-packages/tornado/testing.py", line 312, in wait
    self.__rethrow()
  File "/private/var/www/different/python/centrifuge/env/lib/python2.7/site-packages/tornado/testing.py", line 248, in __rethrow
    raise_exc_info(failure)
  File "/private/var/www/different/python/centrifuge/env/lib/python2.7/site-packages/tornado/testing.py", line 296, in timeout_func
    timeout)
AssertionError: Async operation timed out after 5 seconds

----------------------------------------------------------------------
Ran 1 test in 5.011s

FAILED (failures=1)
[E 141025 13:00:12 testing:687] FAIL

Reconnect on disconnect (or drop all subscriptions)

If the redis server goes down and comes back up, the redis subscription is gone, yet the SockJSSubscriber still retains its list of subscribers.
These subscribers will never receive any messages because the client is no longer subscribed to the redis channel.
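
One way to recover, sketched here with hypothetical names (ChannelRegistry, subscribe_fn, on_reconnect) that are not part of tornado-redis, is to keep the local subscriber list and replay SUBSCRIBE for every channel that still has listeners once the connection is re-established:

```python
from collections import defaultdict


class ChannelRegistry:
    """Tracks which local subscribers want which channel (illustrative)."""

    def __init__(self, subscribe_fn):
        self._subscribe_fn = subscribe_fn      # sends SUBSCRIBE to the server
        self._subscribers = defaultdict(set)   # channel -> local subscribers

    def subscribe(self, channel, subscriber):
        first = not self._subscribers[channel]
        self._subscribers[channel].add(subscriber)
        if first:
            # Only the first local subscriber triggers a server-side SUBSCRIBE.
            self._subscribe_fn(channel)

    def on_reconnect(self):
        """Replay SUBSCRIBE for every channel that still has subscribers."""
        for channel, subs in self._subscribers.items():
            if subs:
                self._subscribe_fn(channel)
```

The alternative named in the title, dropping all subscriptions on disconnect, would instead clear the registry and notify each subscriber.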

Unsubscribe from pub/sub returns before the Client finishes unsubscribing

Issue:
The code is similar to the WebSocket pub/sub demo. The problem occurs when the websocket is disconnected and close() is called:

@tornado.gen.engine
def cleanup(self):

    print "-- Connection %s to client %s closing" % (self.identifier, self.client_identifier)

    if self.client_pubsub.subscribed:
        yield tornado.gen.Task(self.client_pubsub.unsubscribe, ["ngs:fc:chat:global", self.disconnect_channel])

    self.client_pubsub.disconnect()

The yield on unsubscribe returns when the channel is supposed to be unsubscribed, and disconnect is then called. The problem occurs when the pub/sub client receives a published message at the exact moment unsubscribe is called. The yield returns, disconnect is called, and after that the server throws an exception:

ERROR:tornado.application:Exception in callback <functools.partial object at 0x1694e10>
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/tornado/ioloop.py", line 458, in _run_callback
    callback()
  File "/usr/local/lib/python2.7/dist-packages/tornado/stack_context.py", line 331, in wrapped
    raise_exc_info(exc)
  File "/usr/local/lib/python2.7/dist-packages/tornado/stack_context.py", line 302, in wrapped
    ret = fn(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/tornado/iostream.py", line 341, in wrapper
    callback(*args)
  File "/usr/local/lib/python2.7/dist-packages/tornado/stack_context.py", line 331, in wrapped
    raise_exc_info(exc)
  File "/usr/local/lib/python2.7/dist-packages/tornado/stack_context.py", line 302, in wrapped
    ret = fn(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/tornadoredis/connection.py", line 148, in read_callback
    callback(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/tornado/stack_context.py", line 331, in wrapped
    raise_exc_info(exc)
  File "/usr/local/lib/python2.7/dist-packages/tornado/stack_context.py", line 302, in wrapped
    ret = fn(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/tornado/gen.py", line 550, in inner
    self.set_result(key, result)
  File "/usr/local/lib/python2.7/dist-packages/tornado/gen.py", line 476, in set_result
    self.run()
  File "/usr/local/lib/python2.7/dist-packages/tornado/gen.py", line 505, in run
    yielded = self.gen.throw(*exc_info)
  File "/usr/local/lib/python2.7/dist-packages/tornadoredis/client.py", line 1087, in listen
    response = yield gen.Task(response)
  File "/usr/local/lib/python2.7/dist-packages/tornado/gen.py", line 533, in run
    self.yield_point.start(self)
  File "/usr/local/lib/python2.7/dist-packages/tornado/gen.py", line 371, in start
    self.func(*self.args, **self.kwargs)
  File "/usr/local/lib/python2.7/dist-packages/tornado/gen.py", line 153, in wrapper
    runner.run()
  File "/usr/local/lib/python2.7/dist-packages/tornado/gen.py", line 505, in run
    yielded = self.gen.throw(*exc_info)
  File "/usr/local/lib/python2.7/dist-packages/tornadoredis/client.py", line 479, in consume_multibulk
    data = yield gen.Task(self.connection.readline)
  File "/usr/local/lib/python2.7/dist-packages/tornado/gen.py", line 533, in run
    self.yield_point.start(self)
  File "/usr/local/lib/python2.7/dist-packages/tornado/gen.py", line 371, in start
    self.func(*self.args, **self.kwargs)
  File "/usr/local/lib/python2.7/dist-packages/tornadoredis/connection.py", line 154, in readline
    raise ConnectionError('Tried to read from '
ConnectionError: Tried to read from non-existent connection

When checking the "subscribed" attribute of the Client object, its value is still a non-empty set for a millisecond or so after the unsubscribe has finished. The workaround is the following code:

@tornado.gen.engine
def cleanup(self):

    print "-- Connection %s to client %s closing" % (self.identifier, self.client_identifier)

    if self.client_pubsub.subscribed:
        yield tornado.gen.Task(self.client_pubsub.unsubscribe, "ngs:fc:chat:global")
        yield tornado.gen.Task(self.client_pubsub.unsubscribe, self.disconnect_channel)

    def check():
        if self.client_pubsub.subscribed:
            tornado.ioloop.IOLoop.instance().add_timeout(datetime.timedelta(milliseconds=1), check)
        else:
            self.client_pubsub.disconnect()

    tornado.ioloop.IOLoop.instance().add_timeout(datetime.timedelta(milliseconds=1), check)

The unsubscribe task should yield back when the Client really finishes unsubscribing.
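
The requested behaviour can be sketched without polling: track the channels whose unsubscribe confirmation is still outstanding and run the callback only after the last one arrives. UnsubscribeWaiter and on_unsubscribed are hypothetical names for this illustration, not the library's implementation.

```python
class UnsubscribeWaiter:
    """Runs callback once every listed channel has been confirmed."""

    def __init__(self, channels, callback):
        self._pending = set(channels)
        self._callback = callback

    def on_unsubscribed(self, channel):
        """Call this for each unsubscribe confirmation from the server."""
        self._pending.discard(channel)
        if not self._pending and self._callback is not None:
            # Clear the callback first so it can never fire twice.
            callback, self._callback = self._callback, None
            callback()
```

With this shape, disconnect() would be passed as the callback and would run only after both channels are confirmed.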

Pipeline destroys a connection in the pool

Let's start with an example:

from tornado import gen
import tornadoredis

POOL = tornadoredis.ConnectionPool(...)

@gen.coroutine
def func():
    client = tornadoredis.Client(connection_pool=POOL, selected_db=0)
    with client.pipeline() as pipe:
        pipe.get('xxx')
        # ....
        yield gen.Task(pipe.execute)
    yield gen.Task(client.disconnect)

Generally it works, but the connection is destroyed unnecessarily. The next operation takes that connection from the pool and fails with the exception "Tried to read from non-existent connection".

It happens because Client.pipeline() shares the connection with a new Pipeline object, which is also a Client. Therefore two clients use the same Connection object. At the end of the scope, the original Client releases the connection to the pool (via .__del__()). The Pipeline object also runs Client.__del__() but has no information about the pool, so it calls disconnect().

To solve this, you can simply add an empty implementation of .__del__() to the Pipeline class and expect the creator of the Pipeline to take care of the connection.

Regards,
Tom
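
The ownership rule proposed above can be modelled in a few lines. This is a simplified sketch with stand-in classes, not the tornado-redis code, and it uses an explicit close() method instead of __del__ so that the behaviour is deterministic.

```python
class Connection:
    def __init__(self):
        self.open = True

    def disconnect(self):
        self.open = False


class Pool:
    def __init__(self):
        self.idle = []

    def release(self, conn):
        self.idle.append(conn)


class Client:
    """Owns its connection: returns it to the pool when closed."""

    def __init__(self, conn, pool=None):
        self.conn, self.pool = conn, pool

    def close(self):
        if self.pool is not None:
            self.pool.release(self.conn)
        else:
            self.conn.disconnect()

    def pipeline(self):
        return Pipeline(self.conn)


class Pipeline(Client):
    """Borrows the creator's connection, so closing it must not touch it."""

    def close(self):
        pass  # the creating Client is responsible for the connection
```

Without the overridden close(), the Pipeline would fall through to disconnect() because it has no pool, which is the failure described in this issue.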

Executing non pubsub command after unsubscribe

I'm trying to implement a simple long-polling chat and have run into a strange situation. When a new message is added, its id is pushed to a channel. In the updates handler I subscribe to that channel. But I can't execute any commands, because I get the exception

RequestError (on ZRANGEBYSCORE [('messages', '22', '22'), {}]): Calling not pub/sub command during subscribed state

I'm doing it like this:

    @tornado.web.asynchronous
    @tornado.gen.engine
    def on_pubsub_message(self, message):
        if message.kind == 'message':
            yield tornado.gen.Task(self.client.unsubscribe, 'chat_messages')
            messages = yield tornado.gen.Task(self.client.zrangebyscore, 'messages', message.body, message.body)
            result = []
            for m in messages:
                result.append(json.loads(m[0]))

            response = {
                'm': result[::-1]
            }
            self.set_header('Content-Type', 'application/json')
            self.finish(json.dumps(response))

As you can see, I unsubscribe and still get the error. What am I missing?
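
A common way around this restriction is to keep one connection for pub/sub and a second one for regular commands. The sketch below models the rule with a hypothetical FakeConnection class; it is not tornado-redis code, and the allowed-command list reflects the classic Redis protocol, where a subscribed connection accepts only the subscribe/unsubscribe family plus PING and QUIT.

```python
PUBSUB_COMMANDS = {"SUBSCRIBE", "PSUBSCRIBE", "UNSUBSCRIBE",
                   "PUNSUBSCRIBE", "PING", "QUIT"}


class FakeConnection:
    """Models the rule that a subscribed connection rejects other commands."""

    def __init__(self):
        self.channels = set()

    def execute(self, command, *args):
        if command == "SUBSCRIBE":
            self.channels.update(args)
        elif command == "UNSUBSCRIBE":
            self.channels.difference_update(args)
        elif self.channels and command not in PUBSUB_COMMANDS:
            raise RuntimeError("non pub/sub command in subscribed state")
        return (command,) + args


# One connection listens, the other runs ordinary commands.
pubsub, commands = FakeConnection(), FakeConnection()
pubsub.execute("SUBSCRIBE", "chat_messages")
reply = commands.execute("ZRANGEBYSCORE", "messages", "22", "22")
```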

unsubscribe callback not called when listen loop not running

We fell into a trap when using unsubscribe: the callback does not fire when the subscription listen loop has exited or is not running yet.

So if an app does:

@gen.coroutine
def cleanup(self):
    yield gen.Task(self.client.unsubscribe, channels=self.subscriptions)
    yield gen.Task(self.client.disconnect)

There is a good chance that in the real world disconnect is not called.

I suggest you make unsubscribe an idempotent operation and always fire the callback, even if there is no subscription or no subscription loop running.

Test cases below:

    @tornado.testing.gen_test(timeout=60)
    def test_rainy_day(self):
        subscriptions = ["one", "two"]
        self.client = tornadoredis.Client(host=options.redis_host)
        self.client.connect()
        result = yield gen.Task(self.client.execute_command, 'CLIENT', 'SETNAME', 'foo')
        log.debug(result)
        yield gen.Task(self.client.subscribe, subscriptions)
        self.client.listen(self.on_redis_msg, exit_callback=self.on_redis_pub_sub_exit)
        log.debug("subscribed to: %s", subscriptions)
        yield gen.Task(self.client.unsubscribe, channels=subscriptions)
        log.debug("unsub 1")
        yield gen.Task(self.client.unsubscribe, channels=subscriptions)
        log.debug("unsub 2")
        result = yield gen.Task(self.client.disconnect)

    @tornado.testing.gen_test(timeout=60)
    def test_rainy_day_two(self):
        subscriptions = ["one", "two"]
        self.client = tornadoredis.Client(host=options.redis_host)
        self.client.connect()
        result = yield gen.Task(self.client.execute_command, 'CLIENT', 'SETNAME', 'foo')
        log.debug(result)
        yield gen.Task(self.client.unsubscribe, channels=subscriptions)
        log.debug("unsub 2")
        result = yield gen.Task(self.client.disconnect)
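
The suggested behaviour, reduced to its bookkeeping, might look like the sketch below. SubscriptionState is a hypothetical model rather than the tornadoredis Client, and it ignores the network round trip entirely; it only shows the callback firing on every call.

```python
class SubscriptionState:
    """Minimal model of a client's subscription bookkeeping (illustrative)."""

    def __init__(self):
        self.subscribed = set()

    def subscribe(self, channel):
        self.subscribed.add(channel)

    def unsubscribe(self, channels, callback=None):
        """Drop the given channels and always invoke callback exactly once."""
        removed = [c for c in channels if c in self.subscribed]
        self.subscribed.difference_update(removed)
        if callback is not None:
            callback(removed)  # fires even when nothing was subscribed
```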

Blocking on connect to redis blocks the whole process

If I make a query to redis that needs to open a connection, and that connection attempt blocks (e.g. slow network, network partition), then the whole tornado process blocks. No requests can be handled, even requests that do not depend on redis.

Example: here is a simple tornado HTTP service that handles two URLs, /r1 and /r2. /r1 does not depend on redis at all: it immediately returns a hardcoded string. /r2 fetches a value from redis and returns it.

import tornadoredis
import tornado.httpserver
import tornado.web
import tornado.ioloop
import tornado.gen


class R1Handler(tornado.web.RequestHandler):
    '''handle a request which does not depend on redis'''
    @tornado.web.asynchronous
    @tornado.gen.engine
    def get(self):
        self.write('all is well\n')
        self.finish()


class R2Handler(tornado.web.RequestHandler):
    '''handle a request which does depend on redis'''
    @tornado.web.asynchronous
    @tornado.gen.engine
    def get(self):
        c = tornadoredis.Client()
        foo = yield tornado.gen.Task(c.get, 'foo')
        self.set_header('Content-Type', 'text/plain')
        self.write('foo = %s\n' % (foo,))
        self.finish()


application = tornado.web.Application([
    (r'/r1', R1Handler),
    (r'/r2', R2Handler),
])


if __name__ == '__main__':
    # Start the data initialization routine
    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(8888)
    print 'Demo is runing at 0.0.0.0:8888\nQuit the demo with CONTROL-C'
    tornado.ioloop.IOLoop.instance().start()

(I put that script in demos/mixed/app.py in the tornado-redis source tree.)

Expected outcome: if my tornado process is unable to connect to redis, then I expect queries to /r2 to fail or block, but queries to /r1 should continue to work fine.

Actual outcome: as soon as a request for /r2 blocks, requests for /r1 also block. It appears that the entire process is blocked connecting to redis.

Here's how I reproduced it. First, in one terminal run the tornado server:

$ PYTHONPATH=. python demos/mixed/app.py
Demo is runing at 0.0.0.0:8888
Quit the demo with CONTROL-C

In another window, start polling /r1:

$ while true ; do echo -n `date`": "  ; curl http://localhost:8888/r1 ; sleep 1 ; done
Mon Jan 12 13:25:01 EST 2015: all is well
Mon Jan 12 13:25:02 EST 2015: all is well
Mon Jan 12 13:25:03 EST 2015: all is well
[...]

In a third window, ensure that /r2 works:

$ redis-cli 
127.0.0.1:6379> set foo "hello world"
OK
$ curl http://localhost:8888/r2
foo = hello world

Now simulate a network partition: make all packets to redis disappear (this assumes Linux):

$ sudo iptables -F              # wipe all existing firewall rules
$ sudo iptables -A INPUT --proto tcp --dport 6379 -j DROP    # drop packets to redis

Hop over to the window that is polling /r1; it should still be working fine:

[...]
Mon Jan 12 13:27:50 EST 2015: all is well
Mon Jan 12 13:27:51 EST 2015: all is well
Mon Jan 12 13:27:52 EST 2015: all is well
[...]

Now request /r2, which blocks because redis is on the other side of a network partition:

$ curl http://localhost:8888/r2

This request blocks, which is entirely OK. (I'd like it to fail with a timeout eventually, but whatever. Not important.)

However, the /r1 poll is now blocked:

[...]
Mon Jan 12 13:28:28 EST 2015: all is well
Mon Jan 12 13:28:29 EST 2015: all is well
Mon Jan 12 13:28:30 EST 2015: [blocked here]

We can see where it's blocked by hitting Ctrl-C on the tornado process:

^CTraceback (most recent call last):
  File "demos/mixed/app.py", line 40, in <module>
    tornado.ioloop.IOLoop.instance().start()
  File "/usr/lib/python2.7/dist-packages/tornado/ioloop.py", line 607, in start
    self._run_callback(callback)
  File "/usr/lib/python2.7/dist-packages/tornado/ioloop.py", line 458, in _run_callback
    callback()
  File "/usr/lib/python2.7/dist-packages/tornado/stack_context.py", line 331, in wrapped
    raise_exc_info(exc)
  File "/usr/lib/python2.7/dist-packages/tornado/stack_context.py", line 302, in wrapped
    ret = fn(*args, **kwargs)
  File "/usr/lib/python2.7/dist-packages/tornado/iostream.py", line 341, in wrapper
    callback(*args)
  File "/usr/lib/python2.7/dist-packages/tornado/stack_context.py", line 331, in wrapped
    raise_exc_info(exc)
  File "/usr/lib/python2.7/dist-packages/tornado/stack_context.py", line 302, in wrapped
    ret = fn(*args, **kwargs)
  File "/usr/lib/python2.7/dist-packages/tornado/httpserver.py", line 327, in _on_headers
    self.request_callback(self._request)
  File "/usr/lib/python2.7/dist-packages/tornado/web.py", line 1600, in __call__
    handler._execute(transforms, *args, **kwargs)
  File "/usr/lib/python2.7/dist-packages/tornado/web.py", line 1134, in _execute
    self._when_complete(self.prepare(), self._execute_method)
  File "/usr/lib/python2.7/dist-packages/tornado/web.py", line 1141, in _when_complete
    callback()
  File "/usr/lib/python2.7/dist-packages/tornado/web.py", line 1162, in _execute_method
    self._when_complete(method(*self.path_args, **self.path_kwargs),
  File "/usr/lib/python2.7/dist-packages/tornado/web.py", line 1311, in wrapper
    return result
  File "/usr/lib/python2.7/dist-packages/tornado/stack_context.py", line 198, in __exit__
    return self.exception_handler(type, value, traceback)
  File "/usr/lib/python2.7/dist-packages/tornado/web.py", line 1115, in _stack_context_handle_exception
    raise_exc_info((type, value, traceback))
  File "/usr/lib/python2.7/dist-packages/tornado/web.py", line 1298, in wrapper
    result = method(self, *args, **kwargs)
  File "/usr/lib/python2.7/dist-packages/tornado/gen.py", line 159, in wrapper
    deactivate()
  File "/usr/lib/python2.7/dist-packages/tornado/stack_context.py", line 198, in __exit__
    return self.exception_handler(type, value, traceback)
  File "/usr/lib/python2.7/dist-packages/tornado/gen.py", line 136, in handle_exception
    return runner.handle_exception(typ, value, tb)
  File "/usr/lib/python2.7/dist-packages/tornado/gen.py", line 556, in handle_exception
    self.run()
  File "/usr/lib/python2.7/dist-packages/tornado/gen.py", line 505, in run
    yielded = self.gen.throw(*exc_info)
  File "demos/mixed/app.py", line 23, in get
    foo = yield tornado.gen.Task(c.get, 'foo')
  File "/usr/lib/python2.7/dist-packages/tornado/gen.py", line 153, in wrapper
    runner.run()
  File "/usr/lib/python2.7/dist-packages/tornado/gen.py", line 533, in run
    self.yield_point.start(self)
  File "/usr/lib/python2.7/dist-packages/tornado/gen.py", line 371, in start
    self.func(*self.args, **self.kwargs)
  File "/data/src/tornado-redis/tornadoredis/client.py", line 690, in get
    self.execute_command('GET', key, callback=callback)
  File "/usr/lib/python2.7/dist-packages/tornado/gen.py", line 159, in wrapper
    deactivate()
  File "/usr/lib/python2.7/dist-packages/tornado/stack_context.py", line 198, in __exit__
    return self.exception_handler(type, value, traceback)
  File "/usr/lib/python2.7/dist-packages/tornado/gen.py", line 136, in handle_exception
    return runner.handle_exception(typ, value, tb)
  File "/usr/lib/python2.7/dist-packages/tornado/gen.py", line 556, in handle_exception
    self.run()
  File "/usr/lib/python2.7/dist-packages/tornado/gen.py", line 505, in run
    yielded = self.gen.throw(*exc_info)
  File "/usr/lib/python2.7/dist-packages/tornado/gen.py", line 153, in wrapper
    runner.run()
  File "/usr/lib/python2.7/dist-packages/tornado/gen.py", line 507, in run
    yielded = self.gen.send(next)
  File "/data/src/tornado-redis/tornadoredis/client.py", line 407, in execute_command
    self.connection.connect()
  File "/data/src/tornado-redis/tornadoredis/connection.py", line 72, in connect
    timeout=self.timeout
  File "/usr/lib/python2.7/socket.py", line 562, in create_connection
    sock.connect(sa)
  File "/usr/lib/python2.7/socket.py", line 224, in meth
    return getattr(self._sock,name)(*args)
KeyboardInterrupt

The problem appears to be that tornado-redis is using the blocking socket calls to connect to redis. I suspect it should probably use tornado.tcpclient instead. I am not a tornado expert though! I just spotted that module in the docs.
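
For comparison, here is what a non-blocking connect with a timeout looks like in Python 3's standard asyncio module. It is a sketch of the general technique, not a patch for tornado-redis; connect_with_timeout and its opener parameter are hypothetical names introduced for the illustration.

```python
import asyncio


async def connect_with_timeout(host, port, timeout=2.0,
                               opener=asyncio.open_connection):
    """Open a TCP connection without blocking the event loop.

    Returns whatever `opener` returns (a (reader, writer) pair by default),
    or None if the connection fails or is not established within
    `timeout` seconds.
    """
    try:
        # While this await is pending, the loop keeps serving other tasks.
        return await asyncio.wait_for(opener(host, port), timeout)
    except (asyncio.TimeoutError, OSError):
        return None
```

Because the connect is awaited rather than performed with a blocking socket call, a handler like /r1 would keep responding while /r2 waits for redis.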

Bug in ConnectionProxy

I've found a bug in ConnectionProxy that occurs when created_connection >= max_connections and the wait_for_available option is True:

File "env/lib/python2.7/site-packages/tornado/gen.py", line 367, in run 
  self.yield_point.start(self)
File "env/lib/python2.7/site-packages/tornado/gen.py", line 241, in start
  self.func(*self.args, **self.kwargs)
File "env/lib/python2.7/site-packages/tornado/gen.py", line 120, in wrapper
  runner.run()
File "env/lib/python2.7/site-packages/tornado/gen.py", line 345, in run
  yielded = self.gen.send(next)
File "env/lib/python2.7/site-packages/tornadoredis/client.py", line 1146, in execute
  self.connection.info.get('db', None) != self.selected_db):
AttributeError: 'ConnectionProxy' object has no attribute 'info'

I can add a "self.info = {'db': 0}" attribute to ConnectionProxy, but I don't fully understand how connections are released or how the wait_until_ready function works.

Self is Nonetype, attribute error raised

I am attempting to implement a simple chat client with connection pooling. Working from the demo, I have created the following ChatHandler class.

https://gist.github.com/gregory80/300e377b91ceac2a3e04

While attempting to use this class in a basic call, an attribute error is raised (see the stack trace below). It's not clear why "self" is NoneType here, or why that would raise an attribute error for write_message.

This exception manifests itself when running tornado under gunicorn, and only after at least one connection has already been opened.

    22:38:51 web.1  |   File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/gunicorn/app/wsgiapp.py", line 34, in run
    22:38:51 web.1  |     WSGIApplication("%(prog)s [OPTIONS] APP_MODULE").run()
    22:38:51 web.1  |   File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/gunicorn/app/base.py", line 131, in run
    22:38:51 web.1  |     Arbiter(self).run()
    22:38:51 web.1  |   File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/gunicorn/arbiter.py", line 173, in run
    22:38:51 web.1  |     self.manage_workers()
    22:38:51 web.1  |   File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/gunicorn/arbiter.py", line 460, in manage_workers
    22:38:51 web.1  |     self.spawn_workers()
    22:38:51 web.1  |   File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/gunicorn/arbiter.py", line 512, in spawn_workers
    22:38:51 web.1  |     self.spawn_worker()
    22:38:51 web.1  |   File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/gunicorn/arbiter.py", line 485, in spawn_worker
    22:38:51 web.1  |     worker.init_process()
    22:38:51 web.1  |   File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/gunicorn/workers/base.py", line 104, in init_process
    22:38:51 web.1  |     self.run()
    22:38:51 web.1  |   File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/gunicorn/workers/gtornado.py", line 96, in run
    22:38:51 web.1  |     self.ioloop.start()
    22:38:51 web.1  |   File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornado/ioloop.py", line 271, in start
    22:38:51 web.1  |     self._run_callback(callback)
    22:38:51 web.1  |   File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornado/ioloop.py", line 421, in _run_callback
    22:38:51 web.1  |     callback()
    22:38:51 web.1  |   File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornado/iostream.py", line 311, in wrapper
    22:38:51 web.1  |     callback(*args)
    22:38:51 web.1  |   File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornado/stack_context.py", line 229, in wrapped
    22:38:51 web.1  |     callback(*args, **kwargs)
    22:38:51 web.1  |   File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornadoredis/connection.py", line 125, in read_callback
    22:38:51 web.1  |     callback(*args, **kwargs)
    22:38:51 web.1  |   File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornado/gen.py", line 383, in inner
    22:38:51 web.1  |     self.set_result(key, result)
    22:38:51 web.1  |   File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornado/gen.py", line 315, in set_result
    22:38:51 web.1  |     self.run()
    22:38:51 web.1  |   File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornado/gen.py", line 345, in run
    22:38:51 web.1  |     yielded = self.gen.send(next)
    22:38:51 web.1  |   File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornadoredis/client.py", line 447, in process_data
    22:38:51 web.1  |     callback(response)
    22:38:51 web.1  |   File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornado/gen.py", line 383, in inner
    22:38:51 web.1  |     self.set_result(key, result)
    22:38:51 web.1  |   File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornado/gen.py", line 315, in set_result
    22:38:51 web.1  |     self.run()
    22:38:51 web.1  |   File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornado/gen.py", line 345, in run
    22:38:51 web.1  |     yielded = self.gen.send(next)
    22:38:51 web.1  |   File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornadoredis/client.py", line 462, in consume_multibulk
    22:38:51 web.1  |     callback(tokens)
    22:38:51 web.1  |   File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornado/gen.py", line 383, in inner
    22:38:51 web.1  |     self.set_result(key, result)
    22:38:51 web.1  |   File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornado/gen.py", line 315, in set_result
    22:38:51 web.1  |     self.run()
    22:38:51 web.1  |   File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornado/gen.py", line 345, in run
    22:38:51 web.1  |     yielded = self.gen.send(next)
    22:38:51 web.1  |   File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornadoredis/client.py", line 447, in process_data
    22:38:51 web.1  |     callback(response)
    22:38:51 web.1  |   File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornado/gen.py", line 383, in inner
    22:38:51 web.1  |     self.set_result(key, result)
    22:38:51 web.1  |   File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornado/gen.py", line 315, in set_result
    22:38:51 web.1  |     self.run()
    22:38:51 web.1  |   File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornado/gen.py", line 345, in run
    22:38:51 web.1  |     yielded = self.gen.send(next)
    22:38:51 web.1  |   File "/Volumes/projects/javascript/gif_chat/venv/lib/python2.7/site-packages/tornadoredis/client.py", line 1014, in listen
    22:38:51 web.1  |     callback(result)
    22:38:51 web.1  |   File "PATH/app/webapp.py", line 86, in pubsub_message
    22:38:51 web.1  |     traceback.print_stack()

Subscribe to more than one channel

I used the following code:

import tornadoredis
from tornadoredis.pubsub import SockJSSubscriber

subscriber = SockJSSubscriber(tornadoredis.Client())


def on_open(self, info):
        subscriber.subscribe('users-status', self)
        subscriber.subscribe('scores', self)

def on_close(self):
        subscriber.unsubscribe('users-status', self)
        subscriber.unsubscribe('scores', self)

and only one channel worked for the SockJSSubscriber subscriber.

I found that the check "if not self.redis.subscribed:" allows listening to redis Pub/Sub for only one channel:

https://github.com/leporo/tornado-redis/blob/master/tornadoredis/pubsub.py:25-52

def subscribe(self, channel_name, subscriber, callback=None):
        """
        Subscribes a given subscriber object to a redis channel.
        Does nothing if subscribe calls are nested for the
        same subscriber object.

        The broadcast method of the subscriber object will be called
        for each message received from specified channel.
        Override the on_message method to change this behaviour.
        """
        self.subscribers[channel_name][subscriber] += 1
        self.subscriber_count[channel_name] += 1
        if self.subscriber_count[channel_name] == 1:
            if not self.redis.subscribed:
                if callback:
                    callback = stack_context.wrap(callback)

                def _cb(*args, **kwargs):
                    self.redis.listen(self.on_message)
                    if callback:
                        callback(*args, **kwargs)

                cb = _cb
            else:
                cb = callback
            self.redis.subscribe(channel_name, callback=cb)
        elif callback:
            callback(True)

How can I handle two or more channels at the same time?

The client does not handle disconnects from a Redis server properly when subscribed to a Pub/Sub channel

(by Nicolas de Bari Embriz Garcia Rojas)
I noticed that if I restart the Redis server, the program that was subscribed to Redis throws an exception saying that there is no callback on "on_stream_close". The script then hangs and doesn't exit, leaving my app idle until I manually restart it.

Altering on_stream_close on line 78 here: https://github.com/leporo/tornado-redis/blob/master/tornadoredis/connection.py#L78 helped me avoid the exception, or at least exit the program in case of an error (I just replaced line 79 with an exit(0)), but since I am not an expert in Python (still learning) I prefer to ask.

This is how I am running or using tornadoredis


if __name__ == '__main__':
    # Logging
    logging.getLogger().setLevel(logging.DEBUG)

    # Initialize tornado-redis and subscribe to key callback (pubsub)
    rclient = tornadoredis.Client(redisHost,redisPort)
    rclient.connect()
    rclient.subscribe(redisChannel, lambda s: rclient.listen(BrokerConnection.pubsub))

    # Initialize sockjs-tornado and start IOLoop
    BrokerRouter = sockjs.tornado.SockJSRouter(BrokerConnection, '/wow')

    app = tornado.web.Application(BrokerRouter.urls)
    app.listen(port)

    logging.info('Listening on port %d for redis key %s', port, redisChannel)

    tornado.ioloop.IOLoop.instance().start()

Client can't be shared on concurrent conditions

When tornado-redis is used as a shared client, it sometimes crashes in process_data.

Test code

from tornado.ioloop import IOLoop
from tornado.web import RequestHandler, Application, url, asynchronous
from tornado.gen import engine, Task
from tornadoredis import Client

rds = Client()

class RedisTestHandler(RequestHandler):

    @asynchronous
    @engine
    def get(self):
        yield Task(rds.get, 'foo')
        self.finish()


if __name__ == '__main__':
    from redis import Redis
    Redis().set('foo', 'bar')

    app = Application([
        url(r'/', RedisTestHandler),
    ], debug=True)
    app.listen(9000)
    IOLoop.instance().start()

Test it with apache-bench

ab -c2 -n2 http://localhost:9000/

the crash log:

[I 2015-01-09 14:14:34,317 web:1811] 200 GET / (127.0.0.1) 6.19ms
[I 2015-01-09 14:14:34,319 web:1811] 200 GET / (127.0.0.1) 4.72ms
[I 2015-01-09 14:14:34,823 web:1811] 200 GET / (127.0.0.1) 3.03ms
[I 2015-01-09 14:14:34,825 web:1811] 200 GET / (127.0.0.1) 4.30ms
[I 2015-01-09 14:14:35,775 web:1811] 200 GET / (127.0.0.1) 2.74ms
[I 2015-01-09 14:14:35,777 web:1811] 200 GET / (127.0.0.1) 3.84ms
[I 2015-01-09 14:14:36,365 web:1811] 200 GET / (127.0.0.1) 2.38ms
[I 2015-01-09 14:14:36,367 web:1811] 200 GET / (127.0.0.1) 3.17ms
[I 2015-01-09 14:14:36,924 web:1811] 200 GET / (127.0.0.1) 3.16ms
[I 2015-01-09 14:14:36,926 web:1811] 200 GET / (127.0.0.1) 4.37ms
[I 2015-01-09 14:14:37,409 web:1811] 200 GET / (127.0.0.1) 2.62ms
[I 2015-01-09 14:14:37,412 web:1811] 200 GET / (127.0.0.1) 4.56ms
[I 2015-01-09 14:14:37,891 web:1811] 200 GET / (127.0.0.1) 2.56ms
[I 2015-01-09 14:14:37,893 web:1811] 200 GET / (127.0.0.1) 3.69ms
[E 2015-01-09 14:14:38,368 web:1407] Uncaught exception GET / (127.0.0.1)
HTTPServerRequest(protocol='http', host='localhost:9000', method='GET', uri='/', version='HTTP/1.0', remote_ip='127.0.0.1', headers={'Host': 'localhost:9000', 'Accept': '*/*', 'User-Agent': 'ApacheBench/2.3'})
Traceback (most recent call last):
  File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/web.py", line 1288, in _stack_context_handle_exception
    raise_exc_info((type, value, traceback))
  File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/stack_context.py", line 314, in wrapped
    ret = fn(*args, **kwargs)
  File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/gen.py", line 708, in <lambda>
    self.future, lambda f: self.run())
  File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/gen.py", line 657, in run
    self.result_future.set_exc_info(sys.exc_info())
  File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/concurrent.py", line 167, in set_exc_info
    self.set_exception(exc_info[1])
  File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/concurrent.py", line 150, in set_exception
    self._set_done()
  File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/concurrent.py", line 177, in _set_done
    cb(self)
  File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/gen.py", line 108, in final_callback
    if future.result() is not None:
  File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/concurrent.py", line 109, in result
    raise_exc_info(self._exc_info)
  File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/gen.py", line 631, in run
    yielded = self.gen.throw(*sys.exc_info())
  File "redis_test.py", line 22, in get
    yield Task(rds.get, 'foo')
  File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/gen.py", line 628, in run
    value = future.result()
  File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/concurrent.py", line 109, in result
    raise_exc_info(self._exc_info)
  File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/stack_context.py", line 314, in wrapped
    ret = fn(*args, **kwargs)
  File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/gen.py", line 708, in <lambda>
    self.future, lambda f: self.run())
  File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/gen.py", line 657, in run
    self.result_future.set_exc_info(sys.exc_info())
  File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/concurrent.py", line 167, in set_exc_info
    self.set_exception(exc_info[1])
  File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/concurrent.py", line 150, in set_exception
    self._set_done()
  File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/concurrent.py", line 177, in _set_done
    cb(self)
  File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/gen.py", line 108, in final_callback
    if future.result() is not None:
  File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/concurrent.py", line 109, in result
    raise_exc_info(self._exc_info)
  File "/srv/venv/dev/local/lib/python2.7/site-packages/tornado/gen.py", line 633, in run
    yielded = self.gen.send(value)
  File "/srv/venv/dev/local/lib/python2.7/site-packages/tornadoredis/client.py", line 441, in execute_command
    resp = self.process_data(data, cmd_line)
  File "/srv/venv/dev/local/lib/python2.7/site-packages/tornadoredis/client.py", line 490, in process_data
    cmd_line)
ResponseError: ResponseError (on GET [('foo',), {}]): Unknown response type b
[E 2015-01-09 14:14:38,377 web:1811] 500 GET / (127.0.0.1) 10.02ms
[I 2015-01-09 14:14:38,378 web:1811] 200 GET / (127.0.0.1) 11.67ms

Environment: tornado==4.0.2 tornado-redis==2.4.18

I think the problem is that when parsing a Redis response, especially a Bulk String, execute_command splits the read into two separate tasks: one for the response type and length (e.g. "$5\r\n") and a second one for the rest of the data. Tornado's IOLoop may switch tasks between these two reads. On a shared connection, two requests may then both read a response-type line back to back, and the second one consumes data that is not a response type.

Generally, a GET command executes like this:

write command (W) -> get response type and length (RC) -> get rest of response data (RD)

Under concurrency, with 2 concurrent requests for example, a connection may be accessed in this order:

W1 -> W2 -> RC1 -> RC2 (crash, bad response type) ...
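
The ordering above can be reproduced without Tornado or Redis. The following is only a sketch: SharedStream, read_header and read_body are hypothetical stand-ins for the two-step read described in the report, not tornado-redis code. It shows that when both header reads (RC1, RC2) run before either body read, the second header read lands on the payload "bar" and fails with the same "Unknown response type b" message as in the crash log.

```python
class SharedStream(object):
    """Hypothetical stand-in for one socket shared by several requests."""

    def __init__(self, data):
        self.buffer = data

    def read_line(self):
        # Consume and return everything up to the next CRLF.
        line, _, self.buffer = self.buffer.partition(b"\r\n")
        return line


def read_header(stream):
    """RC step: read the type/length line and validate the type byte."""
    line = stream.read_line()
    kind = line[:1]
    if kind != b"$":
        raise ValueError("Unknown response type %s" % kind.decode())
    return int(line[1:])


def read_body(stream, length):
    """RD step: read the payload line."""
    return stream.read_line()[:length]


# Two GET replies queued on the same connection after W1 -> W2.
replies = b"$3\r\nbar\r\n$3\r\nbar\r\n"

# Serialized order RC1 -> RD1 -> RC2 -> RD2 parses both replies.
stream = SharedStream(replies)
ok = [read_body(stream, read_header(stream)),
      read_body(stream, read_header(stream))]

# Interleaved order RC1 -> RC2: the second header read sees "bar".
stream = SharedStream(replies)
read_header(stream)
try:
    read_header(stream)
    error = None
except ValueError as e:
    error = str(e)
```

Under these assumptions, a shared connection would need each request's RC and RD steps to run as one uninterrupted unit, or each request would need its own connection.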

Can't subscribe to another channel after entering a listen loop

I started to experiment with tornado-redis and stumbled on some strange behaviour.
Look at this code:

from __future__ import print_function
import tornado.ioloop
import tornado.gen
import tornadoredis
import time

subscriber = tornadoredis.Client()
subscriber.connect()

publisher = tornadoredis.Client()
publisher.connect()

@tornado.gen.engine
def on_message(message):
    print(message)

    if message.body == 'subscribe extra channels':
        try:
            yield tornado.gen.Task(subscriber.subscribe, 'second')
        except Exception as e:
            print(e)

        try:
            print('yielding subscribing on third')
            yield tornado.gen.Task(subscriber.subscribe, 'third')
        except Exception as e:
            print(e)

@tornado.gen.engine
def subscribe():
    yield tornado.gen.Task(subscriber.subscribe, 'first')
    subscriber.listen(on_message)
    tornado.ioloop.IOLoop.instance().add_timeout(time.time() + 2, publish)

@tornado.gen.engine
def publish():
    print('publishing message...')
    try:
        yield tornado.gen.Task(publisher.publish, 'first', 'subscribe extra channels')
    except Exception as e:
        print(e)

if __name__ == '__main__':
    tornado.ioloop.IOLoop.instance().add_callback(subscribe)
    tornado.ioloop.IOLoop.instance().start()

In this example I subscribe to channel 'first', then after 2 seconds publish a message to that channel. In the message handler, when that message is received, I try to subscribe to 2 new channels. Subscribing to channel 'second' seems to succeed, but subscribing to channel 'third' never happens.

I investigated a little. It seems that the subscription to channel 'third' stops on this line of the client's execute_command method:

yield gen.Task(self.connection.wait_until_ready)

The connection is not ready, read_callbacks holds one wrapped function, and the yield never seems to return.

connection.wait_until_ready makes the callback lose its StackContext, causing the "Cannot send error response after headers written" error raised in web.py

To reproduce the issue, the conditions are:

  1. Sharing one connection in an asynchronous request handler
  2. Raise an error after calling a tornado-redis method
  3. Send requests concurrently

In this case, the second and following requests enter the connection.wait_until_ready block and wait for the connection to become ready. The request handler's callback is invoked only once the connection is ready. However, wait_until_ready makes the callback lose its StackContext, so the callback is actually invoked in the StackContext associated with the previous connection.read() operation. When an error is then raised, the exception handling in web.py uses that previous StackContext to process the exception, which causes the "Cannot send error response after headers written" error.

To avoid this issue, the callback should be wrapped with stack_context.wrap before it is appended to ready_callbacks. As stack_context.py puts it: "Returns a callable object that will restore the current StackContext when executed. Use this whenever saving a callback to be executed later in a different execution context (either in a different thread or asynchronously in the same thread)". For example:

def wait_until_ready(self, callback=None):
    if callback:
        if not self.ready():
            callback = stack_context.wrap(callback)  # wrap so the StackContext is restored later
            self.ready_callbacks.append(callback)
        else:
            callback()
    return self

The same applies to ConnectionPool.

Sample code to reproduce the issue is below:

[Server side]

#!/usr/bin/env python

import logging
import json

import tornadoredis
import tornado.httpserver
import tornado.ioloop
import tornado.web
import tornado.gen as gen
from tornado.options import define, options

rds = tornadoredis.Client(host="127.0.0.1",
                          port=6379,
                          selected_db=0)

@gen.engine
def get_data(key, callback=None):
    try:
        result = yield tornado.gen.Task(rds.get, key)
    except Exception as e:
        callback(None)
        return
    callback(result)

class TestHandler(tornado.web.RequestHandler):

    def initialize(self, **kwargs):
        super(TestHandler, self).initialize(**kwargs)
        self.set_header("Content-Type", "text/json")
        self.set_header("Content-Type", "application/json")

    @tornado.web.asynchronous
    @gen.engine
    def get(self):
        # make sure there is a key 0 in redis
        value = yield gen.Task(get_data, 0)
        raise tornado.web.HTTPError(400, "always return this error")
        # never reached: the error above is raised on every request
        self.finish(json.dumps({"key": value}))

handlers = [
    tornado.web.URLSpec(r"/test", TestHandler, name="Test"),
]

settings = {
    "debug": True,
}

define("port", default=8888, help="run on the given port", type=int)

if __name__ == "__main__":
    application = tornado.web.Application(handlers, **settings)
    http_server = tornado.httpserver.HTTPServer(application, xheaders=True)
    http_server.listen(options.port)
    logging.info('Listening on port %s' % options.port)
    tornado.ioloop.IOLoop.instance().start()

[Client side]

#!/usr/bin/env python

from __future__ import print_function

from tornado.httpclient import AsyncHTTPClient
from tornado.ioloop import IOLoop

def debug_case():
    http_client = AsyncHTTPClient()

    def handle_request(response):
        # mock up concurrent requests
        http_client.fetch("http://localhost:8888/test",
                          handle_request,
                          method="GET")
        if response.error:
            print("Error:", response.error)
        else:
            print(response.body)
        IOLoop.instance().stop()

    http_client.fetch("http://localhost:8888/test",
                      handle_request,
                      method="GET")
    IOLoop.instance().start()

if __name__ == "__main__":
    for i in range(1, 100):
        debug_case()

weakly-referenced object no longer exists appears often on mget

I consistently get this error on multi-get (MGET) when using tornado-redis with gen.Task to retrieve keys (more than 100 keys).

The code that produces this looks like this:

class FooHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    @tornado.gen.engine  # needed for the yield below; the traceback shows the gen Runner in use
    def get(self, *args, **kwargs):
        data = yield tornado.gen.Task(Foo.multi_get, keys)
        self.finish()

System details:
Ubuntu 11.04
Tornado 2.4 (also happens on Tornado 3)

Traceback

  File "/opt/neon/pyenv/local/lib/python2.7/site-packages/tornado/web.py", line 1115, in _stack_context_handle_exception
    raise_exc_info((type, value, traceback))
  File "/opt/neon/pyenv/local/lib/python2.7/site-packages/tornado/stack_context.py", line 343, in _handle_exception
    if tail.exit(*exc):
  File "/opt/neon/pyenv/local/lib/python2.7/site-packages/tornado/stack_context.py", line 186, in exit
    return self.exception_handler(type, value, traceback)
  File "/opt/neon/pyenv/local/lib/python2.7/site-packages/tornado/gen.py", line 136, in handle_exception
    return runner.handle_exception(typ, value, tb)
  File "/opt/neon/pyenv/local/lib/python2.7/site-packages/tornado/gen.py", line 556, in handle_exception
    self.run()
  File "/opt/neon/pyenv/local/lib/python2.7/site-packages/tornado/gen.py", line 505, in run
    yielded = self.gen.throw(*exc_info)
  File "supportServices/services.py", line 434, in get_video_status_brightcove
    data = yield tornado.gen.Task(Foo.multi_get,keys)
  File "/opt/neon/pyenv/local/lib/python2.7/site-packages/tornado/stack_context.py", line 343, in _handle_exception
    if tail.exit(*exc):
  File "/opt/neon/pyenv/local/lib/python2.7/site-packages/tornado/stack_context.py", line 186, in exit
    return self.exception_handler(type, value, traceback)
  File "/opt/neon/pyenv/local/lib/python2.7/site-packages/tornado/gen.py", line 136, in handle_exception
    return runner.handle_exception(typ, value, tb)
  File "/opt/neon/pyenv/local/lib/python2.7/site-packages/tornado/gen.py", line 556, in handle_exception
    self.run()
  File "/opt/neon/pyenv/local/lib/python2.7/site-packages/tornado/gen.py", line 505, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/neon/pyenv/local/lib/python2.7/site-packages/tornadoredis/client.py", line 425, in execute_command
    resp = yield gen.Task(resp)
  File "/opt/neon/pyenv/local/lib/python2.7/site-packages/tornado/stack_context.py", line 302, in wrapped
    ret = fn(*args, **kwargs)
  File "/opt/neon/pyenv/local/lib/python2.7/site-packages/tornado/gen.py", line 550, in inner
    self.set_result(key, result)
  File "/opt/neon/pyenv/local/lib/python2.7/site-packages/tornado/gen.py", line 476, in set_result
    self.run()
  File "/opt/neon/pyenv/local/lib/python2.7/site-packages/tornado/gen.py", line 507, in run
    yielded = self.gen.send(next)
  File "/opt/neon/pyenv/local/lib/python2.7/site-packages/tornadoredis/client.py", line 485, in consume_multibulk
    token = self.process_data(data, cmd_line)
  File "/opt/neon/pyenv/local/lib/python2.7/site-packages/tornadoredis/client.py", line 461, in process_data
    return partial(self._consume_bulk, tail)
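
The message in the issue title, "weakly-referenced object no longer exists", is the text of the ReferenceError that Python raises when a weakref.proxy is used after its referent has been garbage collected. Whether that is the exact cause in this report is an assumption. The sketch below only illustrates the mechanism, using a hypothetical Client class that is not the tornadoredis one.

```python
import gc
import weakref


class Client(object):
    """Hypothetical stand-in; not the tornadoredis Client."""

    def process_data(self, data):
        return data


client = Client()
proxy = weakref.proxy(client)

# Works while a strong reference to the client still exists.
before = proxy.process_data("ok")

# Drop the last strong reference, as happens when a handler-local
# client goes out of scope before a late reply is processed.
del client
gc.collect()

try:
    proxy.process_data("late reply")
    message = None
except ReferenceError as e:
    message = str(e)
```

If this mechanism applies, keeping a strong reference to the client for as long as replies are pending would be one way to avoid the error.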

Client abstracting connection away from connection

I'm currently trying to implement connection retrying for the client. I'm doing this by subclassing the Client class and adding a wrapper around Client.connect() that retries a number of times.

It works well for me because I always connect manually before I attempt any commands, but there are many places where the Client checks the connection and, instead of using Client.connect(), calls Client.connection.connect() directly.

https://github.com/leporo/tornado-redis/blob/master/tornadoredis/client.py#L1305-1306

if not self.connection.connected():
    self.connection.connect()

The same goes for many disconnects.

https://github.com/leporo/tornado-redis/blob/master/tornadoredis/client.py#L1317-L1320

except Exception as e:
    self.command_stack = []
    self.connection.disconnect()
    raise e

I see no reason for this, and changing all of these to use the methods on Client instead of Client.connection would make adding in reconnects and retries much easier. I will happily submit a pull request, but I just wanted to make sure that there was no reason for the current way before I did.
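
To make the concern concrete, here is a minimal sketch with hypothetical stand-in classes (FlakyConnection, BaseClient and RetryingClient are illustrative names, not tornado-redis code). It assumes a connection that fails a fixed number of times before succeeding. A retry wrapper added by overriding connect() takes effect only on code paths that go through the client's own connect() method.

```python
class FlakyConnection(object):
    """Hypothetical connection that fails `failures` times, then succeeds."""

    def __init__(self, failures):
        self.failures = failures
        self.is_connected = False

    def connected(self):
        return self.is_connected

    def connect(self):
        if self.failures > 0:
            self.failures -= 1
            raise IOError("connection refused")
        self.is_connected = True


class BaseClient(object):
    """Hypothetical client mirroring the two call patterns discussed above."""

    def __init__(self, connection):
        self.connection = connection

    def connect(self):
        if not self.connection.connected():
            self.connection.connect()

    def execute_direct(self):
        # Pattern quoted in the issue: bypasses the overridable connect().
        if not self.connection.connected():
            self.connection.connect()
        return "OK"

    def execute_via_client(self):
        # Proposed pattern: route through the client's own connect().
        self.connect()
        return "OK"


class RetryingClient(BaseClient):
    """Subclass that retries connect() up to `retries` times (retries >= 1)."""

    def __init__(self, connection, retries=3):
        super(RetryingClient, self).__init__(connection)
        self.retries = retries

    def connect(self):
        last_error = None
        for _ in range(self.retries):
            try:
                return super(RetryingClient, self).connect()
            except IOError as e:
                last_error = e
        raise last_error


# The retry wrapper is honoured when the client method is used...
via_client = RetryingClient(FlakyConnection(failures=2)).execute_via_client()

# ...and silently skipped when the connection is called directly.
try:
    RetryingClient(FlakyConnection(failures=2)).execute_direct()
    direct_error = None
except IOError as e:
    direct_error = str(e)
```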

Thanks

A client does not select the specified database

Hi.

I have a Tornado application that uses tornado-redis. I ran into a problem where reads and writes go to the wrong database. Does anyone have an idea why this happens?

Disconnecting from Redis while receiving data can lead to deadlocks

Issue:
Tornado server no longer responds to HTTP requests.

Backtracking:
I debugged the Tornado process with gdb and traced the hang to a mutex deadlock in IOLoop's add_callback. It happens when a Redis command (in my case a BLPOP) receives its response and adds the callback to the IOLoop. While add_callback is wrapping the callback in a partial, the Tornado-Redis client is deleted (__del__ is called), which disconnects the Connection object. The disconnected Connection object then tries to add another callback to the IOLoop. Since the first add_callback call is still in progress, the mutex is already locked, and the second call to add_callback blocks indefinitely on it.
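
The mechanism described here, a second acquisition of a non-reentrant lock from the thread that already holds it, can be illustrated with the standard library alone. This is a sketch of the general behaviour, not Tornado code and not a proposed patch. try_nested_acquire is a hypothetical helper, and the non-blocking acquire is used only so that the demo returns instead of hanging.

```python
import threading


def try_nested_acquire(lock):
    """Hold `lock`, then try to take it again from the same thread."""
    with lock:
        # Non-blocking attempt; a blocking acquire on a plain Lock
        # would wait forever here, which is the reported deadlock.
        got_it = lock.acquire(False)
        if got_it:
            lock.release()
        return got_it


# A plain Lock cannot be re-acquired by the thread that holds it.
plain = try_nested_acquire(threading.Lock())

# An RLock allows the owning thread to re-enter.
reentrant = try_nested_acquire(threading.RLock())
```

Here plain is False and reentrant is True. In the report, the nested acquisition is triggered indirectly, by a __del__ that runs while the lock is held.
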
I have provided the following gdb backtrace:

#5 Frame 0x17d4e50, for file /usr/local/lib/python2.7/dist-packages/tornado/ioloop.py, line 714, in add_callback (self=<ZMQIOLoop(_impl=<ZMQPoller(_poller=<Poller(sockets=[(5, 5), (<Socket at remote 0x15f1598>, 4), (3, 5), (4, 5), (15, 5), (16, 5), (13, 5), (26, 5), (27, 5)], _map={3: 2, 4: 3, 5: 0, 13: 6, 15: 4, 16: 5, <...>: 1, 26: 7, 27: 8}) at remote 0x15eaa10>) at remote 0x15ea310>, _handlers={3: <function at remote 0x169cc80>, 4: <function at remote 0x169cd70>, 5: <function at remote 0x15ebc80>, 13: <function at remote 0x19ad7d0>, 15: <function at remote 0x16a2938>, 16: <function at remote 0x16a21b8>, <...>: <function at remote 0x15ebd70>, 26: <function at remote 0x19b20c8>, 27: <function at remote 0x19adc80>}, _running=True, _callbacks=[], _thread_ident=140448435238656, _callback_lock=<thread.lock at remote 0x13ad890>, _cancellations=3, _events={}, time_func=<built-in function time>, _waker=<Waker(writer=<file at remote 0x15d2420>, reader=<file at remote 0x15d2390>) at remote 0x15ead90>, _stopped=False, _...(truncated)
    with self._callback_lock:
#9 Frame 0x17f73f0, for file /usr/local/lib/python2.7/dist-packages/tornado/iostream.py, line 369, in _run_callback (self=<IOStream(_write_buffer=<collections.deque at remote 0x19bee50>, _close_callback=None, _pending_callbacks=1, _read_bytes=None, _closed=True, _write_callback=None, _state=None, max_buffer_size=104857600, io_loop=<ZMQIOLoop(_impl=<ZMQPoller(_poller=<Poller(sockets=[(5, 5), (<Socket at remote 0x15f1598>, 4), (3, 5), (4, 5), (15, 5), (16, 5), (13, 5), (26, 5), (27, 5)], _map={3: 2, 4: 3, 5: 0, 13: 6, 15: 4, 16: 5, <...>: 1, 26: 7, 27: 8}) at remote 0x15eaa10>) at remote 0x15ea310>, _handlers={3: <function at remote 0x169cc80>, 4: <function at remote 0x169cd70>, 5: <function at remote 0x15ebc80>, 13: <function at remote 0x19ad7d0>, 15: <function at remote 0x16a2938>, 16: <function at remote 0x16a21b8>, <...>: <function at remote 0x15ebd70>, 26: <function at remote 0x19b20c8>, 27: <function at remote 0x19adc80>}, _running=True, _callbacks=[], _thread_ident=140448435238656, _callback_lock=<thread.lo...(truncated)
    self.io_loop.add_callback(wrapper)
#13 Frame 0x16e1360, for file /usr/local/lib/python2.7/dist-packages/tornado/iostream.py, line 266, in _maybe_run_close_callback (self=<IOStream(_write_buffer=<collections.deque at remote 0x19bee50>, _close_callback=None, _pending_callbacks=1, _read_bytes=None, _closed=True, _write_callback=None, _state=None, max_buffer_size=104857600, io_loop=<ZMQIOLoop(_impl=<ZMQPoller(_poller=<Poller(sockets=[(5, 5), (<Socket at remote 0x15f1598>, 4), (3, 5), (4, 5), (15, 5), (16, 5), (13, 5), (26, 5), (27, 5)], _map={3: 2, 4: 3, 5: 0, 13: 6, 15: 4, 16: 5, <...>: 1, 26: 7, 27: 8}) at remote 0x15eaa10>) at remote 0x15ea310>, _handlers={3: <function at remote 0x169cc80>, 4: <function at remote 0x169cd70>, 5: <function at remote 0x15ebc80>, 13: <function at remote 0x19ad7d0>, 15: <function at remote 0x16a2938>, 16: <function at remote 0x16a21b8>, <...>: <function at remote 0x15ebd70>, 26: <function at remote 0x19b20c8>, 27: <function at remote 0x19adc80>}, _running=True, _callbacks=[], _thread_ident=140448435238656, _callback_loc...(truncated)
    self._run_callback(cb)
#17 Frame 0x16e0810, for file /usr/local/lib/python2.7/dist-packages/tornado/iostream.py, line 257, in close (self=<IOStream(_write_buffer=<collections.deque at remote 0x19bee50>, _close_callback=None, _pending_callbacks=1, _read_bytes=None, _closed=True, _write_callback=None, _state=None, max_buffer_size=104857600, io_loop=<ZMQIOLoop(_impl=<ZMQPoller(_poller=<Poller(sockets=[(5, 5), (<Socket at remote 0x15f1598>, 4), (3, 5), (4, 5), (15, 5), (16, 5), (13, 5), (26, 5), (27, 5)], _map={3: 2, 4: 3, 5: 0, 13: 6, 15: 4, 16: 5, <...>: 1, 26: 7, 27: 8}) at remote 0x15eaa10>) at remote 0x15ea310>, _handlers={3: <function at remote 0x169cc80>, 4: <function at remote 0x169cd70>, 5: <function at remote 0x15ebc80>, 13: <function at remote 0x19ad7d0>, 15: <function at remote 0x16a2938>, 16: <function at remote 0x16a21b8>, <...>: <function at remote 0x15ebd70>, 26: <function at remote 0x19b20c8>, 27: <function at remote 0x19adc80>}, _running=True, _callbacks=[], _thread_ident=140448435238656, _callback_lock=<thread.lock at re...(truncated)
    self._maybe_run_close_callback()
#21 (frame information optimized out)
#24 Frame 0x181ef80, for file /usr/local/lib/python2.7/dist-packages/tornadoredis/client.py, line 243, in __del__ (self=<Client(_pipeline=None, selected_db=0, subscribed=set([]), _io_loop=<ZMQIOLoop(_impl=<ZMQPoller(_poller=<Poller(sockets=[(5, 5), (<Socket at remote 0x15f1598>, 4), (3, 5), (4, 5), (15, 5), (16, 5), (13, 5), (26, 5), (27, 5)], _map={3: 2, 4: 3, 5: 0, 13: 6, 15: 4, 16: 5, <...>: 1, 26: 7, 27: 8}) at remote 0x15eaa10>) at remote 0x15ea310>, _handlers={3: <function at remote 0x169cc80>, 4: <function at remote 0x169cd70>, 5: <function at remote 0x15ebc80>, 13: <function at remote 0x19ad7d0>, 15: <function at remote 0x16a2938>, 16: <function at remote 0x16a21b8>, <...>: <function at remote 0x15ebd70>, 26: <function at remote 0x19b20c8>, 27: <function at remote 0x19adc80>}, _running=True, _callbacks=[], _thread_ident=140448435238656, _callback_lock=<thread.lock at remote 0x13ad890>, _cancellations=3, _events={}, time_func=<built-in function time>, _waker=<Waker(writer=<file at remote 0x15d2420>, reader...(truncated)
    connection.disconnect()
#44 Frame 0x16473f0, for file /usr/local/lib/python2.7/dist-packages/tornado/ioloop.py, line 719, in add_callback (self=<ZMQIOLoop(_impl=<ZMQPoller(_poller=<Poller(sockets=[(5, 5), (<Socket at remote 0x15f1598>, 4), (3, 5), (4, 5), (15, 5), (16, 5), (13, 5), (26, 5), (27, 5)], _map={3: 2, 4: 3, 5: 0, 13: 6, 15: 4, 16: 5, <...>: 1, 26: 7, 27: 8}) at remote 0x15eaa10>) at remote 0x15ea310>, _handlers={3: <function at remote 0x169cc80>, 4: <function at remote 0x169cd70>, 5: <function at remote 0x15ebc80>, 13: <function at remote 0x19ad7d0>, 15: <function at remote 0x16a2938>, 16: <function at remote 0x16a21b8>, <...>: <function at remote 0x15ebd70>, 26: <function at remote 0x19b20c8>, 27: <function at remote 0x19adc80>}, _running=True, _callbacks=[], _thread_ident=140448435238656, _callback_lock=<thread.lock at remote 0x13ad890>, _cancellations=3, _events={}, time_func=<built-in function time>, _waker=<Waker(writer=<file at remote 0x15d2420>, reader=<file at remote 0x15d2390>) at remote 0x15ead90>, _stopped=False, _...(truncated)
    stack_context.wrap(callback), *args, **kwargs))
#48 Frame 0x16470b0, for file /usr/local/lib/python2.7/dist-packages/tornado/iostream.py, line 369, in _run_callback (self=<IOStream(_write_buffer=<collections.deque at remote 0x19b8280>, _close_callback=<function at remote 0x19adb18>, _pending_callbacks=1, _read_bytes=None, _closed=False, _write_callback=None, _state=25, max_buffer_size=104857600, io_loop=<ZMQIOLoop(_impl=<ZMQPoller(_poller=<Poller(sockets=[(5, 5), (<Socket at remote 0x15f1598>, 4), (3, 5), (4, 5), (15, 5), (16, 5), (13, 5), (26, 5), (27, 5)], _map={3: 2, 4: 3, 5: 0, 13: 6, 15: 4, 16: 5, <...>: 1, 26: 7, 27: 8}) at remote 0x15eaa10>) at remote 0x15ea310>, _handlers={3: <function at remote 0x169cc80>, 4: <function at remote 0x169cd70>, 5: <function at remote 0x15ebc80>, 13: <function at remote 0x19ad7d0>, 15: <function at remote 0x16a2938>, 16: <function at remote 0x16a21b8>, <...>: <function at remote 0x15ebd70>, 26: <function at remote 0x19b20c8>, 27: <function at remote 0x19adc80>}, _running=True, _callbacks=[], _thread_ident=140448435238656, ...(truncated)
    self.io_loop.add_callback(wrapper)
#52 Frame 0x16d60a0, for file /usr/local/lib/python2.7/dist-packages/tornado/iostream.py, line 569, in _handle_write (self=<IOStream(_write_buffer=<collections.deque at remote 0x19b8280>, _close_callback=<function at remote 0x19adb18>, _pending_callbacks=1, _read_bytes=None, _closed=False, _write_callback=None, _state=25, max_buffer_size=104857600, io_loop=<ZMQIOLoop(_impl=<ZMQPoller(_poller=<Poller(sockets=[(5, 5), (<Socket at remote 0x15f1598>, 4), (3, 5), (4, 5), (15, 5), (16, 5), (13, 5), (26, 5), (27, 5)], _map={3: 2, 4: 3, 5: 0, 13: 6, 15: 4, 16: 5, <...>: 1, 26: 7, 27: 8}) at remote 0x15eaa10>) at remote 0x15ea310>, _handlers={3: <function at remote 0x169cc80>, 4: <function at remote 0x169cd70>, 5: <function at remote 0x15ebc80>, 13: <function at remote 0x19ad7d0>, 15: <function at remote 0x16a2938>, 16: <function at remote 0x16a21b8>, <...>: <function at remote 0x15ebd70>, 26: <function at remote 0x19b20c8>, 27: <function at remote 0x19adc80>}, _running=True, _callbacks=[], _thread_ident=140448435238656, ...(truncated)
    self._run_callback(callback)
#56 Frame 0x16d5b70, for file /usr/local/lib/python2.7/dist-packages/tornado/iostream.py, line 220, in write (self=<IOStream(_write_buffer=<collections.deque at remote 0x19b8280>, _close_callback=<function at remote 0x19adb18>, _pending_callbacks=1, _read_bytes=None, _closed=False, _write_callback=None, _state=25, max_buffer_size=104857600, io_loop=<ZMQIOLoop(_impl=<ZMQPoller(_poller=<Poller(sockets=[(5, 5), (<Socket at remote 0x15f1598>, 4), (3, 5), (4, 5), (15, 5), (16, 5), (13, 5), (26, 5), (27, 5)], _map={3: 2, 4: 3, 5: 0, 13: 6, 15: 4, 16: 5, <...>: 1, 26: 7, 27: 8}) at remote 0x15eaa10>) at remote 0x15ea310>, _handlers={3: <function at remote 0x169cc80>, 4: <function at remote 0x169cd70>, 5: <function at remote 0x15ebc80>, 13: <function at remote 0x19ad7d0>, 15: <function at remote 0x16a2938>, 16: <function at remote 0x16a21b8>, <...>: <function at remote 0x15ebd70>, 26: <function at remote 0x19b20c8>, 27: <function at remote 0x19adc80>}, _running=True, _callbacks=[], _thread_ident=140448435238656, _callbac...(truncated)
    self._handle_write()
#60 Frame 0x16d5e90, for file /usr/local/lib/python2.7/dist-packages/tornadoredis/connection.py, line 124, in write (self=<Connection(info={'db': 0, 'pass': None}, _event_handler=<weakproxy at remote 0x1a546d8>, _stream=<IOStream(_write_buffer=<collections.deque at remote 0x19b8280>, _close_callback=<function at remote 0x19adb18>, _pending_callbacks=1, _read_bytes=None, _closed=False, _write_callback=None, _state=25, max_buffer_size=104857600, io_loop=<ZMQIOLoop(_impl=<ZMQPoller(_poller=<Poller(sockets=[(5, 5), (<Socket at remote 0x15f1598>, 4), (3, 5), (4, 5), (15, 5), (16, 5), (13, 5), (26, 5), (27, 5)], _map={3: 2, 4: 3, 5: 0, 13: 6, 15: 4, 16: 5, <...>: 1, 26: 7, 27: 8}) at remote 0x15eaa10>) at remote 0x15ea310>, _handlers={3: <function at remote 0x169cc80>, 4: <function at remote 0x169cd70>, 5: <function at remote 0x15ebc80>, 13: <function at remote 0x19ad7d0>, 15: <function at remote 0x16a2938>, 16: <function at remote 0x16a21b8>, <...>: <function at remote 0x15ebd70>, 26: <function at remote 0x19b20c8>, 2...(truncated)
    self._stream.write(data, callback=cb)
#65 Frame 0x1a0aa70, for file /usr/local/lib/python2.7/dist-packages/tornado/gen.py, line 371, in start (self=<Task(runner=<Runner(pending_callbacks=set([<object at remote 0x7fbcb309f9c0>]), had_exception=False, finished=False, results={}, running=True, exc_info=None, final_callback=<function at remote 0x19a1cf8>, gen=<generator at remote 0x1a55140>, yield_point=<...>) at remote 0x19b6bd0>, args=('*3\r\n$5\r\nBLPOP\r\n$24\r\na_redis_key\r\n$1\r\n0\r\n',), key=<object at remote 0x7fbcb309f9c0>, func=<instancemethod at remote 0x19abf50>, kwargs={'callback': <function at remote 0x1a561b8>}) at remote 0x1a50190>, runner=<...>)
    self.func(*self.args, **self.kwargs)
#69 Frame 0x19e4d20, for file /usr/local/lib/python2.7/dist-packages/tornado/gen.py, line 533, in run (self=<Runner(pending_callbacks=set([<object at remote 0x7fbcb309f9c0>]), had_exception=False, finished=False, results={}, running=True, exc_info=None, final_callback=<function at remote 0x19a1cf8>, gen=<generator at remote 0x1a55140>, yield_point=<Task(runner=<...>, args=('*3\r\n$5\r\nBLPOP\r\n$24\r\na_redis_key\r\n$1\r\n0\r\n',), key=<object at remote 0x7fbcb309f9c0>, func=<instancemethod at remote 0x19abf50>, kwargs={'callback': <function at remote 0x1a561b8>}) at remote 0x1a50190>) at remote 0x19b6bd0>, next=None, yielded=<...>)
    self.yield_point.start(self)
#73 Frame 0x1907e50, for file /usr/local/lib/python2.7/dist-packages/tornado/gen.py, line 153, in wrapper (args=(<weakproxy at remote 0x1a546d8>, 'BLPOP', 'a_redis_key', 0), kwargs={'callback': <function at remote 0x19a1f50>}, handle_exception=<function at remote 0x19a1488>, result=<generator at remote 0x1a55140>, final_callback=<function at remote 0x19a1cf8>)
    runner.run()
#80 Frame 0x16d2400, for file /usr/local/lib/python2.7/dist-packages/tornadoredis/client.py, line 734, in blpop (self=<weakproxy at remote 0x1a546d8>, keys='a_redis_key', timeout=0, callback=<function at remote 0x19a1f50>, tokens=['a_redis_key', 0])
    self.execute_command('BLPOP', *tokens, callback=callback)
#87 Frame 0x17aae50, for file /usr/local/lib/python2.7/dist-packages/tornado/gen.py, line 371, in start (self=<Task(runner=<Runner(pending_callbacks=set([<object at remote 0x7fbcb309f970>]), had_exception=False, finished=False, results={}, running=True, exc_info=None, final_callback=<function at remote 0x19add70>, gen=<generator at remote 0x19abd70>, yield_point=<...>) at remote 0x19b6190>, args=('a_redis_key',), key=<object at remote 0x7fbcb309f970>, func=<functools.partial at remote 0x1a54628>, kwargs={'callback': <function at remote 0x19a1f50>}) at remote 0x19b6510>, runner=<...>)
    self.func(*self.args, **self.kwargs)
#91 Frame 0x195cb80, for file /usr/local/lib/python2.7/dist-packages/tornado/gen.py, line 533, in run (self=<Runner(pending_callbacks=set([<object at remote 0x7fbcb309f970>]), had_exception=False, finished=False, results={}, running=True, exc_info=None, final_callback=<function at remote 0x19add70>, gen=<generator at remote 0x19abd70>, yield_point=<Task(runner=<...>, args=('a_redis_key',), key=<object at remote 0x7fbcb309f970>, func=<functools.partial at remote 0x1a54628>, kwargs={'callback': <function at remote 0x19a1f50>}) at remote 0x19b6510>) at remote 0x19b6190>, next={u'a_redis_key': u'redis_response_data'}, yielded=<...>)
    self.yield_point.start(self)
#95 Frame 0x196ef50, for file /usr/local/lib/python2.7/dist-packages/tornado/gen.py, line 476, in set_result (self=<Runner(pending_callbacks=set([<object at remote 0x7fbcb309f970>]), had_exception=False, finished=False, results={}, running=True, exc_info=None, final_callback=<function at remote 0x19add70>, gen=<generator at remote 0x19abd70>, yield_point=<Task(runner=<...>, args=('a_redis_key',), key=<object at remote 0x7fbcb309f970>, func=<functools.partial at remote 0x1a54628>, kwargs={'callback': <function at remote 0x19a1f50>}) at remote 0x19b6510>) at remote 0x19b6190>, key=<object at remote 0x7fbcb309fa40>, result={u'a_redis_key': u'redis_response_data'})
    self.run()
#99 Frame 0x19ddb30, for file /usr/local/lib/python2.7/dist-packages/tornado/gen.py, line 550, in inner (args=({u'a_redis_key': u'redis_response_data'},), kwargs={}, result={...})
    self.set_result(key, result)
#104 Frame 0x192bfb0, for file /usr/local/lib/python2.7/dist-packages/tornado/stack_context.py, line 302, in wrapped (args=({u'a_redis_key': u'redis_response_data'},), kwargs={}, ret=None, current_state=((), <ExceptionStackContext(active=True, new_contexts=None, old_contexts=((...), <ExceptionStackContext(active=True, new_contexts=None, old_contexts=((...), None), exception_handler=<function at remote 0x19ad398>) at remote 0x19b6610>), exception_handler=<function at remote 0x19a10c8>) at remote 0x19b6a10>), contexts=((...), <...>), exc=(None, None, None), top=None, last_ctx=0, stack=(...))
    ret = fn(*args, **kwargs)
#108 Frame 0x1a18370, for file /usr/local/lib/python2.7/dist-packages/tornadoredis/client.py, line 433, in execute_command (self=<weakproxy at remote 0x1a546d8>, cmd='BLPOP', args=('a_redis_key', 0), kwargs={}, result={u'a_redis_key': u'redis_response_data'}, execute_pending=True, callback=<function at remote 0x19a1500>, cmd_line=<CmdLine(cmd='BLPOP', args=('a_redis_key', 0), kwargs={}) at remote 0x19b6e50>, n_tries=1, command='*3\r\n$5\r\nBLPOP\r\n$24\r\na_redis_key\r\n$1\r\n0\r\n', listening=set([]), data='*2\r\n', resp=[u'a_redis_key', u'redis_response_data'])
    callback(result)
#111 Frame 0x1a216f0, for file /usr/local/lib/python2.7/dist-packages/tornado/gen.py, line 507, in run (self=<Runner(pending_callbacks=set([]), had_exception=False, finished=False, results={}, running=True, exc_info=None, final_callback=<function at remote 0x19a1758>, gen=<generator at remote 0x19abe60>, yield_point=None) at remote 0x19b6990>, next=[u'a_redis_key', u'redis_response_data'])
    yielded = self.gen.send(next)
#115 Frame 0x18ab510, for file /usr/local/lib/python2.7/dist-packages/tornado/gen.py, line 476, in set_result (self=<Runner(pending_callbacks=set([]), had_exception=False, finished=False, results={}, running=True, exc_info=None, final_callback=<function at remote 0x19a1758>, gen=<generator at remote 0x19abe60>, yield_point=None) at remote 0x19b6990>, key=<object at remote 0x7fbcb309fa30>, result=[u'a_redis_key', u'redis_response_data'])
    self.run()
#119 Frame 0x1a9e970, for file /usr/local/lib/python2.7/dist-packages/tornado/gen.py, line 550, in inner (args=([u'a_redis_key', u'redis_response_data'],), kwargs={}, result=[...])
    self.set_result(key, result)
#124 Frame 0x196f1c0, for file /usr/local/lib/python2.7/dist-packages/tornado/stack_context.py, line 302, in wrapped (args=([u'a_redis_key', u'redis_response_data'],), kwargs={}, ret=None, current_state=((), <ExceptionStackContext(active=True, new_contexts=None, old_contexts=((...), <ExceptionStackContext(active=True, new_contexts=None, old_contexts=((...), <ExceptionStackContext(active=True, new_contexts=None, old_contexts=((...), None), exception_handler=<function at remote 0x19ad398>) at remote 0x19b6610>), exception_handler=<function at remote 0x19a10c8>) at remote 0x19b6a10>), exception_handler=<function at remote 0x19ad1b8>) at remote 0x19b6c10>), contexts=((...), <...>), exc=(None, None, None), top=None, last_ctx=0, stack=(...))
    ret = fn(*args, **kwargs)
#128 Frame 0x18fc450, for file /usr/local/lib/python2.7/dist-packages/tornadoredis/client.py, line 490, in consume_multibulk (self=<weakproxy at remote 0x1a546d8>, length=2, cmd_line=<CmdLine(cmd='BLPOP', args=('a_redis_key', 0), kwargs={}) at remote 0x19b6e50>, callback=<function at remote 0x19ad578>, tokens=[u'a_redis_key', u'redis_response_data'], data='$62\r\n', token=u'redis_response_data')
    callback(tokens)
#131 Frame 0x1914da0, for file /usr/local/lib/python2.7/dist-packages/tornado/gen.py, line 507, in run (self=<Runner(pending_callbacks=set([]), had_exception=False, finished=False, results={}, running=True, exc_info=None, final_callback=<function at remote 0x19a1ed8>, gen=<generator at remote 0x1a550a0>, yield_point=None) at remote 0x19b65d0>, next=u'redis_response_data')
    yielded = self.gen.send(next)
#135 Frame 0x1915010, for file /usr/local/lib/python2.7/dist-packages/tornado/gen.py, line 476, in set_result (self=<Runner(pending_callbacks=set([]), had_exception=False, finished=False, results={}, running=True, exc_info=None, final_callback=<function at remote 0x19a1ed8>, gen=<generator at remote 0x1a550a0>, yield_point=None) at remote 0x19b65d0>, key=<object at remote 0x7fbcb309f9f0>, result=u'redis_response_data')
    self.run()
#139 Frame 0x178f8e0, for file /usr/local/lib/python2.7/dist-packages/tornado/gen.py, line 550, in inner (args=(u'redis_response_data',), kwargs={}, result=u'redis_response_data')
    self.set_result(key, result)
#144 Frame 0x19468b0, for file /usr/local/lib/python2.7/dist-packages/tornado/stack_context.py, line 302, in wrapped (args=(u'redis_response_data',), kwargs={}, ret=None, current_state=((), <ExceptionStackContext(active=True, new_contexts=None, old_contexts=((...), <ExceptionStackContext(active=True, new_contexts=None, old_contexts=((...), <ExceptionStackContext(active=True, new_contexts=None, old_contexts=((...), <ExceptionStackContext(active=True, new_contexts=None, old_contexts=((...), None), exception_handler=<function at remote 0x19ad398>) at remote 0x19b6610>), exception_handler=<function at remote 0x19a10c8>) at remote 0x19b6a10>), exception_handler=<function at remote 0x19ad1b8>) at remote 0x19b6c10>), exception_handler=<function at remote 0x19a1d70>) at remote 0x19b6c90>), contexts=((...), <...>), exc=(None, None, None), top=None, last_ctx=0, stack=(...))
    ret = fn(*args, **kwargs)
#148 Frame 0x16f5950, for file /usr/local/lib/python2.7/dist-packages/tornadoredis/client.py, line 445, in _consume_bulk (self=<weakproxy at remote 0x1a546d8>, tail='62', callback=<function at remote 0x19a1e60>, response=u'redis_response_data')
    callback(response)
#151 Frame 0x195de10, for file /usr/local/lib/python2.7/dist-packages/tornado/gen.py, line 507, in run (self=<Runner(pending_callbacks=set([]), had_exception=False, finished=False, results={}, running=True, exc_info=None, final_callback=<function at remote 0x19a1410>, gen=<generator at remote 0x1a55190>, yield_point=None) at remote 0x19b6dd0>, next='redis_response_data\r\n')
    yielded = self.gen.send(next)
#155 Frame 0x1a06c60, for file /usr/local/lib/python2.7/dist-packages/tornado/gen.py, line 476, in set_result (self=<Runner(pending_callbacks=set([]), had_exception=False, finished=False, results={}, running=True, exc_info=None, final_callback=<function at remote 0x19a1410>, gen=<generator at remote 0x1a55190>, yield_point=None) at remote 0x19b6dd0>, key=<object at remote 0x7fbcb309f9e0>, result='redis_response_data\r\n')
    self.run()
#159 Frame 0x17dc140, for file /usr/local/lib/python2.7/dist-packages/tornado/gen.py, line 550, in inner (args=('redis_response_data\r\n',), kwargs={}, result='redis_response_data\r\n')
    self.set_result(key, result)
#164 Frame 0x19cf540, for file /usr/local/lib/python2.7/dist-packages/tornado/stack_context.py, line 302, in wrapped (args=('redis_response_data\r\n',), kwargs={}, ret=None, current_state=((), <ExceptionStackContext(active=True, new_contexts=None, old_contexts=((...), <ExceptionStackContext(active=True, new_contexts=None, old_contexts=((...), <ExceptionStackContext(active=True, new_contexts=None, old_contexts=((...), <ExceptionStackContext(active=True, new_contexts=None, old_contexts=((...), None), exception_handler=<function at remote 0x19ad398>) at remote 0x19b6610>), exception_handler=<function at remote 0x19a10c8>) at remote 0x19b6a10>), exception_handler=<function at remote 0x19ad1b8>) at remote 0x19b6c10>), exception_handler=<function at remote 0x19a1d70>) at remote 0x19b6c90>), contexts=((...), <...>), exc=(None, None, None), top=None, last_ctx=0, stack=(...))
    ret = fn(*args, **kwargs)
#169 Frame 0x195e080, for file /usr/local/lib/python2.7/dist-packages/tornadoredis/connection.py, line 148, in read_callback (self=<Connection(info={'db': 0, 'pass': None}, _event_handler=<weakproxy at remote 0x1a546d8>, _stream=<IOStream(_write_buffer=<collections.deque at remote 0x19b8280>, _close_callback=<function at remote 0x19adb18>, _pending_callbacks=1, _read_bytes=None, _closed=False, _write_callback=None, _state=25, max_buffer_size=104857600, io_loop=<ZMQIOLoop(_impl=<ZMQPoller(_poller=<Poller(sockets=[(5, 5), (<Socket at remote 0x15f1598>, 4), (3, 5), (4, 5), (15, 5), (16, 5), (13, 5), (26, 5), (27, 5)], _map={3: 2, 4: 3, 5: 0, 13: 6, 15: 4, 16: 5, <...>: 1, 26: 7, 27: 8}) at remote 0x15eaa10>) at remote 0x15ea310>, _handlers={3: <function at remote 0x169cc80>, 4: <function at remote 0x169cd70>, 5: <function at remote 0x15ebc80>, 13: <function at remote 0x19ad7d0>, 15: <function at remote 0x16a2938>, 16: <function at remote 0x16a21b8>, <...>: <function at remote 0x15ebd70>, 26: <function at remote 0x19b...(truncated)
    callback(*args, **kwargs)
#178 Frame 0x1a10280, for file /usr/local/lib/python2.7/dist-packages/tornado/stack_context.py, line 302, in wrapped (args=('some_response_data',), kwargs={}, ret=None, current_state=((), None), contexts=((...), <ExceptionStackContext(active=True, new_contexts=None, old_contexts=((...), <ExceptionStackContext(active=True, new_contexts=None, old_contexts=((...), <ExceptionStackContext(active=True, new_contexts=None, old_contexts=((...), <ExceptionStackContext(active=True, new_contexts=None, old_contexts=((...), None), exception_handler=<function at remote 0x19ad398>) at remote 0x19b6610>), exception_handler=<function at remote 0x19a10c8>) at remote 0x19b6a10>), exception_handler=<function at remote 0x19ad1b8>) at remote 0x19b6c10>), exception_handler=<function at remote 0x19a1d70>) at remote 0x19b6c90>), exc=(None, None, None), top=None, last_ctx=0, stack=(...))
    ret = fn(*args, **kwargs)
#183 Frame 0x1a0fc30, for file /usr/local/lib/python2.7/dist-packages/tornado/iostream.py, line 341, in wrapper ()
    callback(*args)
#188 Frame 0x1a108c0, for file /usr/local/lib/python2.7/dist-packages/tornado/stack_context.py, line 302, in wrapped (args=(), kwargs={}, ret=None, current_state=((...), None), contexts=((...), None), exc=(None, None, None), top=None, last_ctx=0, stack=(...))
    ret = fn(*args, **kwargs)
#196 Frame 0x17c5ae0, for file /usr/local/lib/python2.7/dist-packages/tornado/ioloop.py, line 458, in _run_callback (self=<ZMQIOLoop(_impl=<ZMQPoller(_poller=<Poller(sockets=[(5, 5), (<Socket at remote 0x15f1598>, 4), (3, 5), (4, 5), (15, 5), (16, 5), (13, 5), (26, 5), (27, 5)], _map={3: 2, 4: 3, 5: 0, 13: 6, 15: 4, 16: 5, <...>: 1, 26: 7, 27: 8}) at remote 0x15eaa10>) at remote 0x15ea310>, _handlers={3: <function at remote 0x169cc80>, 4: <function at remote 0x169cd70>, 5: <function at remote 0x15ebc80>, 13: <function at remote 0x19ad7d0>, 15: <function at remote 0x16a2938>, 16: <function at remote 0x16a21b8>, <...>: <function at remote 0x15ebd70>, 26: <function at remote 0x19b20c8>, 27: <function at remote 0x19adc80>}, _running=True, _callbacks=[], _thread_ident=140448435238656, _callback_lock=<thread.lock at remote 0x13ad890>, _cancellations=3, _events={}, time_func=<built-in function time>, _waker=<Waker(writer=<file at remote 0x15d2420>, reader=<file at remote 0x15d2390>) at remote 0x15ead90>, _stopped=False, ...(truncated)
    callback()
#200 Frame 0x1648130, for file /usr/local/lib/python2.7/dist-packages/tornado/ioloop.py, line 607, in start (self=<ZMQIOLoop(_impl=<ZMQPoller(_poller=<Poller(sockets=[(5, 5), (<Socket at remote 0x15f1598>, 4), (3, 5), (4, 5), (15, 5), (16, 5), (13, 5), (26, 5), (27, 5)], _map={3: 2, 4: 3, 5: 0, 13: 6, 15: 4, 16: 5, <...>: 1, 26: 7, 27: 8}) at remote 0x15eaa10>) at remote 0x15ea310>, _handlers={3: <function at remote 0x169cc80>, 4: <function at remote 0x169cd70>, 5: <function at remote 0x15ebc80>, 13: <function at remote 0x19ad7d0>, 15: <function at remote 0x16a2938>, 16: <function at remote 0x16a21b8>, <...>: <function at remote 0x15ebd70>, 26: <function at remote 0x19b20c8>, 27: <function at remote 0x19adc80>}, _running=True, _callbacks=[], _thread_ident=140448435238656, _callback_lock=<thread.lock at remote 0x13ad890>, _cancellations=3, _events={}, time_func=<built-in function time>, _waker=<Waker(writer=<file at remote 0x15d2420>, reader=<file at remote 0x15d2390>) at remote 0x15ead90>, _stopped=False, _closing...(truncated)
    self._run_callback(callback)
#204 Frame 0x1647f50, for file /usr/local/lib/python2.7/dist-packages/zmq/eventloop/ioloop.py, line 160, in start (self=<ZMQIOLoop(_impl=<ZMQPoller(_poller=<Poller(sockets=[(5, 5), (<Socket at remote 0x15f1598>, 4), (3, 5), (4, 5), (15, 5), (16, 5), (13, 5), (26, 5), (27, 5)], _map={3: 2, 4: 3, 5: 0, 13: 6, 15: 4, 16: 5, <...>: 1, 26: 7, 27: 8}) at remote 0x15eaa10>) at remote 0x15ea310>, _handlers={3: <function at remote 0x169cc80>, 4: <function at remote 0x169cd70>, 5: <function at remote 0x15ebc80>, 13: <function at remote 0x19ad7d0>, 15: <function at remote 0x16a2938>, 16: <function at remote 0x16a21b8>, <...>: <function at remote 0x15ebd70>, 26: <function at remote 0x19b20c8>, 27: <function at remote 0x19adc80>}, _running=True, _callbacks=[], _thread_ident=140448435238656, _callback_lock=<thread.lock at remote 0x13ad890>, _cancellations=3, _events={}, time_func=<built-in function time>, _waker=<Waker(writer=<file at remote 0x15d2420>, reader=<file at remote 0x15d2390>) at remote 0x15ead90>, _stopped=False, _c...(truncated)
    super(ZMQIOLoop, self).start()
#208 Frame 0x10f4270, for file /root/server.py, line 81, in <module> ()
    tornado.ioloop.IOLoop.instance().start()
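
The `command` and `args` values in the frames above (`'*3\r\n$5\r\nBLPOP\r\n...'`) are the Redis multi-bulk wire encoding of the `BLPOP` call. As a minimal sketch of how such a request string is built, assuming the standard Redis request format (an element count, then a length-prefixed chunk per token): the helper name `encode_command` is hypothetical and is not taken from tornado-redis. The trace shows `$24` for the key, presumably because the real key name was replaced before the trace was posted; the sketch computes the length of the literal placeholder instead.

```python
def encode_command(*tokens):
    """Encode a command and its arguments as a Redis multi-bulk request.

    Hypothetical helper for illustration only; not the tornado-redis API.
    """
    # Header: '*' followed by the number of tokens in the request.
    chunks = [("*%d\r\n" % len(tokens)).encode("ascii")]
    for token in tokens:
        # Every token is sent as a bulk string, so numbers are stringified.
        data = token if isinstance(token, bytes) else str(token).encode("utf-8")
        # Each bulk string is '$<byte length>', CRLF, the bytes, CRLF.
        chunks.append(("$%d\r\n" % len(data)).encode("ascii"))
        chunks.append(data + b"\r\n")
    return b"".join(chunks)


request = encode_command("BLPOP", "a_redis_key", 0)
print(request)
```

With a timeout of `0`, `BLPOP` blocks until an element is available, which matches the `timeout=0` argument visible in the `blpop` frame.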
