brukva's People

Contributors

clement, evilkost, kmerenkov, kmike, tnm

brukva's Issues

Reconnection stops working

I think reconnection stopped working after this change: 6f5514c#L1R321

_sudden_disconnect now raises an exception, but on_reconnect is usually called after _sudden_disconnect, so the exception propagates and prevents reconnection. This is easy to reproduce by running redis with the 'timeout 1' option.

The traceback (not very useful, though):

ERROR:brukva.client:Socket closed on remote end
ERROR:root:Exception in callback <functools.partial object at 0x102966470>
Traceback (most recent call last):
  File "/Users/kmike/svn/tornado/tornado/ioloop.py", line 345, in _run_callback
    callback()
  File "/Users/kmike/svn/tornado/tornado/stack_context.py", line 173, in wrapped
    callback(*args, **kwargs)
  File "/Users/kmike/svn/brukva/brukva/adisp.py", line 111, in _send_result
    self.call(self.g.throw(result))
  File "/Users/kmike/dev/planor/realtime/handlers/pubsub.py", line 27, in change_active_sessions_count
    session_count = yield self.redis.async.hincrby('active_sessions', sessionid, diff)
ConnectionError: Socket closed on remote end
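
The ordering problem described above can be illustrated with a toy model. This is only a sketch under assumptions, not brukva's actual code: `ToyConnection`, `handle_close_buggy`, `handle_close_safe` and `run` are hypothetical names, and only `_sudden_disconnect` and `on_reconnect` are taken from the report. It shows one possible way to make sure the reconnect step still runs when the disconnect step raises.

```python
class ToyConnection(object):
    """Hypothetical stand-in; not brukva's Connection class."""

    def __init__(self):
        self.reconnected = False

    def _sudden_disconnect(self):
        # Mirrors the reported behaviour: the disconnect step raises.
        raise IOError("Socket closed on remote end")

    def on_reconnect(self):
        self.reconnected = True

    def handle_close_buggy(self):
        # The exception leaves this method before on_reconnect() runs.
        self._sudden_disconnect()
        self.on_reconnect()

    def handle_close_safe(self):
        # try/finally still lets the error propagate to the caller,
        # but only after the reconnect step has been attempted.
        try:
            self._sudden_disconnect()
        finally:
            self.on_reconnect()


def run(method):
    """Call method and report whether it raised IOError."""
    try:
        method()
    except IOError:
        return True
    return False


buggy = ToyConnection()
buggy_raised = run(buggy.handle_close_buggy)

safe = ToyConnection()
safe_raised = run(safe.handle_close_safe)
```

In both variants the caller still sees the error; the difference is only whether the reconnect step has had a chance to run first.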

Trouble disconnecting clients due to ConnectionError

I'm not sure if this is an issue with brukva, or just something I don't understand about the best way to use it.

I have a handler such as the following:

class MessageHandler(RequestHandler):
    def __init__(self, *args, **kwargs):
        super(MessageHandler, self).__init__(*args, **kwargs)
        self.redis = brukva.Client()
        self.redis.connect()
        self.redis.subscribe('messages')

    @tornado.web.asynchronous
    @tornado.web.authenticated
    def post(self):
        self.redis.listen(self.on_new_message)

    def on_new_message(self, result):
        self.redis.unsubscribe('messages')
        self.redis.disconnect()

        self.finish(dict(
            messages=tornado.escape.json_decode(result.body)
        ))

This works, but I see "ConnectionError('Tried to read from non-existent connection')" in the console; it seems to happen after on_new_message has finished. If I take out the disconnect call the error goes away, but then all my connections stay open.

Do you know of any solution to this problem?

Thanks!

brukva in long polling

I'm trying to implement long polling on top of brukva's asynchronous pubsub,
but I get an error when adapting the Tornado chat example:

    class MessageNewHandler(BaseHandler):
        @tornado.web.authenticated
        def post(self):
            message = {
                "id": str(uuid.uuid4()),
                "from": self.current_user["first_name"],
                "body": self.get_argument("body"),
            }
            message["html"] = self.render_string("message.html", message=message)
            if self.get_argument("next", None):
                self.redirect(self.get_argument("next"))
            else:
                self.write(message)
                c = brukva.Client()
                c.connect()
                c.publish('test_channel', message)


    class MessageUpdatesHandler(BaseHandler):
        @tornado.web.authenticated
        @tornado.web.asynchronous
        def post(self):
            self.client = brukva.Client()
            self.client.connect()
            self.client.subscribe('test_channel')
            self.client.listen(self.on_new_messages)


        def on_new_messages(self, messages):
            # Closed client connection
            if self.request.connection.stream.closed():
                return
            self.finish(dict(messages=messages))

but I get this error:

            [E 110607 02:44:39 web:900] Uncaught exception POST /a/message/updates (127.0.0.1)
            .
            .
            .
            File "/usr/lib/python2.7/json/encoder.py", line 178, in default
            raise TypeError(repr(o) + " is not JSON serializable")
            TypeError: <brukva.client.Message object at 0x9fafe0c> is not JSON serializable
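
The error says the callback received a message object rather than plain data, and the JSON encoder cannot serialize it. A minimal sketch of one way around this is to copy the fields you need into a dict before writing the response. The `Message` class below is a hypothetical stand-in so the example runs without brukva; the `channel` and `body` attribute names are taken from other reports on this page, and `message_to_dict` is an illustrative helper, not part of the library.

```python
import json


class Message(object):
    """Stand-in for brukva.client.Message (assumed attributes only)."""

    def __init__(self, channel, body):
        self.channel = channel
        self.body = body


def message_to_dict(message):
    """Copy the fields of interest into a JSON-serializable dict."""
    return {"channel": message.channel, "body": message.body}


msg = Message("test_channel", "hello")
payload = json.dumps({"messages": [message_to_dict(msg)]})
```

In the handler above, the equivalent change would be to pass `message_to_dict(messages)` (or just `messages.body`) to `self.finish` instead of the message object itself.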

When callback throws an exception, callback's (wrongly) called again

When a callback throws an exception (such as HTTPError), brükva calls the same callback again, with the exception as its argument.

A simple example (with a kludgy workaround) which reproduces the problem:

#!/usr/bin/env python

import tornado.ioloop
import tornado.web
import brukva

rc = brukva.Client()
rc.connect()

class MainHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self):
        rc.get("no-such-key", self._get_1)

    def _get_1(self, data):
        # BUG: The following condition feels wrong
        if isinstance(data, tornado.web.HTTPError):
            print("BUG: We're being called with self-thrown exception")
            raise data
        if data is None:
            raise tornado.web.HTTPError(404)
        self.write(data)
        self.finish()

application = tornado.web.Application([
    (r"/", MainHandler),
])

if __name__ == "__main__":
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

The problem is that brükva catches the exception, assumes it was raised in its own internals, and sends it to the callbacks at forward_error:29.

Tested with current HEAD (57f8103).
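
The mechanism can be reproduced without brukva or Tornado. The sketch below is a toy model, not the library's code: `ForwardError` and `make_callback` are hypothetical names that mimic a context manager which forwards any exception raised inside its block to the callback. It also shows one possible alternative shape, where only the library's own work is guarded and the user callback is invoked outside the guarded block.

```python
class ForwardError(object):
    """Toy context manager: forwards any exception to the callback."""

    def __init__(self, callback):
        self.callback = callback

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        if exc_value is not None:
            self.callback(exc_value)
            return True  # swallow the exception
        return False


def make_callback(log):
    """Build a callback that records its argument and raises on None."""
    def callback(data):
        log.append(data)
        if data is None:
            raise LookupError("no such key")
    return callback


# Problematic shape: the user callback runs inside the guarded block,
# so its own exception is fed straight back into it.
buggy_log = []
cb = make_callback(buggy_log)
with ForwardError(cb):
    cb(None)

# Alternative shape: guard only the library work, then call the
# callback outside the block so its exceptions reach the caller.
fixed_log = []
cb = make_callback(fixed_log)
propagated = None
with ForwardError(cb):
    result = None  # stands in for reading a reply from the server
try:
    cb(result)
except LookupError as exc:
    propagated = exc
```

After this runs, `buggy_log` holds two entries (the original `None` and the callback's own exception), while `fixed_log` holds one and the exception propagates normally.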

Disconnecting a client from Redis leads to a ConnectionError

When I close the WebSocket my python program prints out the following error:

ERROR:brukva.client:Connection lost
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/brukva/client.py", line 926, in listen
    data = yield async(self.connection.readline)()
GeneratorExit: Connection lost
Exception AttributeError: "'ConnectionError' object has no attribute 'body'" in <generator object listen at 0x90a798c> ignored

Although the connection to redis gets closed I still receive that error.

Here is a snippet of the code I use:

import tornado.httpserver
import tornado.web
import tornado.websocket
import tornado.ioloop
import brukva

c = brukva.Client()
c.connect()

class WSHandler(tornado.websocket.WebSocketHandler):
    def __init__(self, *args, **kwargs):
        super(WSHandler, self).__init__(*args, **kwargs)
        self.client = brukva.Client()
        self.client.connect()
        self.client.subscribe('measurements')

    def open(self):
        self.client.listen(self.on_message)
        print('new connection')

    def on_message(self, message):
        self.write_message(str(message.body))
        print('message received %s' %message.body)

    def on_close(self):
        self.client.unsubscribe('measurements')
        self.client.disconnect()
        print('connection closed')
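
The AttributeError in the log suggests that the listen callback was handed a ConnectionError instead of a message when the client disconnected. As a defensive workaround, the callback can check what it received before touching `.body`. This is a sketch under that assumption: `FakeMessage`, `FakeConnectionError` and `make_on_message` are hypothetical stand-ins so the example runs without brukva or Tornado.

```python
class FakeMessage(object):
    """Hypothetical stand-in for a pubsub message; only .body is used."""

    def __init__(self, body):
        self.body = body


class FakeConnectionError(Exception):
    """Hypothetical stand-in for the library's ConnectionError."""


def make_on_message(send):
    """Build a callback that forwards message bodies through send()."""
    def on_message(message):
        # Errors may arrive in place of a message; skip them quietly.
        if isinstance(message, Exception):
            return False
        send(str(message.body))
        return True
    return on_message


sent = []
on_message = make_on_message(sent.append)
handled_ok = on_message(FakeMessage("42"))
handled_err = on_message(FakeConnectionError("Connection lost"))
```

In the WebSocket handler above, `send` would be `self.write_message`. This only hides the symptom in the callback; it does not change what the library does on disconnect.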

Python 3 compatibility

  File "my_module", line 4, in <module>
    import brukva
  File ".../env/lib/python3.6/site-packages/brukva/__init__.py", line 1, in <module>
    from brukva.client import Connection, Client
  File ".../env/lib/python3.6/site-packages/brukva/client.py", line 158
    except socket.error, e:
                       ^
SyntaxError: invalid syntax
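
The `except socket.error, e:` form is Python 2-only syntax; Python 3 requires the `as` keyword, which Python 2.6 and later also accept. A minimal sketch of the portable spelling follows. `call_with_socket_guard` and `failing` are hypothetical helpers for illustration, not part of brukva, and fixing this one line does not by itself mean the rest of the library runs on Python 3.

```python
import socket


def call_with_socket_guard(fn):
    """Run fn(); return (result, None), or (None, error) on socket errors."""
    try:
        return fn(), None
    except socket.error as e:  # 'as' form: valid on Python 2.6+ and 3
        return None, e


def failing():
    """Simulate a failed socket operation."""
    raise socket.error("connection refused")


result, error = call_with_socket_guard(failing)
```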

Client brukva connection

class MessageUpdatesHandler(BaseHandler):
    @tornado.web.authenticated
    @tornado.web.asynchronous
    def post(self):
        self.listing_id = self.get_argument("listing_id", None)
        self.cursor = self.get_argument("cursor", None)
        self.client = brukva.Client()
        self.client.connect()
        self.client.subscribe(self.listing_id)
        self.client.listen(self.on_new_messages)

    def on_new_messages(self, messages):
        # Closed client connection
        if self.request.connection.stream.closed():
            return
        msg = json.loads(messages.body,'ascii')
        self.finish(dict(messages=[msg]))
        self.client.unsubscribe(self.listing_id)

    def on_connection_close(self):
        # unsubscribe user from channel
        self.client.unsubscribe(self.listing_id)
        self.client.disconnect()

When I long poll using the code above and check Redis activity, it looks like a new connection is created every time a client sends or posts something, so the total number of connected clients keeps growing. For example, with 2 clients talking and 10 messages sent in total, Redis shows 15 or so connected clients. Is this a bug, or should I use a different approach when connecting?
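
One approach sometimes used for long polling is to keep a single subscriber connection per process and fan incoming messages out to the waiting requests, rather than opening a Redis connection for every request. The sketch below shows only the fan-out bookkeeping and is not wired to brukva: `ChannelHub` and its methods are hypothetical names, and it assumes a process-wide client whose listen callback would call `dispatch` with the message's channel and body.

```python
from collections import defaultdict


class ChannelHub(object):
    """Hypothetical fan-out helper: one subscriber, many waiters."""

    def __init__(self):
        self._waiters = defaultdict(list)

    def wait_for(self, channel, callback):
        """Register a one-shot callback; return True if the channel is new."""
        is_new = channel not in self._waiters
        self._waiters[channel].append(callback)
        return is_new

    def cancel(self, channel, callback):
        """Drop a waiter, e.g. when its HTTP connection closes."""
        waiters = self._waiters.get(channel, [])
        if callback in waiters:
            waiters.remove(callback)
        if not waiters:
            self._waiters.pop(channel, None)

    def dispatch(self, channel, body):
        """Deliver one message to every waiter and clear them."""
        waiters = self._waiters.pop(channel, [])
        for callback in waiters:
            callback(body)
        return len(waiters)


hub = ChannelHub()
received = []
first = hub.wait_for("listing-1", received.append)
second = hub.wait_for("listing-1", received.append)
delivered = hub.dispatch("listing-1", "hello")
```

The return value of `wait_for` tells the caller when a channel is seen for the first time, which is the point at which the shared client would need to subscribe to it.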

Add example

Please add examples. It took me a few hours to understand how the module works:

On Windows:
c = brukva.Client(host = '127.0.0.1')

Example:
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web

from tornado.options import define, options

define("port", default=8888, help="run on the given port", type=int)

import logging
import brukva
from brukva import adisp

logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger('app')

c = brukva.Client()
c.connect()

class MainHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    @adisp.process
    def get(self):
        yield c.async.set('hello', 'Hello, world')
        foo = yield c.async.get('hello')
        self.write(foo)
        self.finish()

def main():
    tornado.options.parse_command_line()
    application = tornado.web.Application([
        (r"/", MainHandler),
    ])
    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(options.port)
    tornado.ioloop.IOLoop.instance().start()

if __name__ == "__main__":
    main()

self.connection.readline | GeneratorExit

When a user subscribes to a channel, does nothing, and then unsubscribes and disconnects, I get the exception below. It also sometimes appears when messages are passed between the publisher and subscribers.

[E 111217 01:07:03 client:49] None
Traceback (most recent call last):
  File "/usr/local/lib/python2.6/dist-packages/brukva-0.0.1-py2.6.egg/brukva/client.py", line 926, in listen
    data = yield async(self.connection.readline)()
GeneratorExit

Redis set callback not being called.

I'm having trouble with an intermittent timeout between redis and brukva. I'm setting a key/value record and assigning a callback, which is never called.

tcpdump shows redis receiving the SET message with the appropriate payload, but the callback is never executed, leaving requests to time out.

Restarting the tornado instances that use brukva does not solve the issue; however, restarting both redis-server and tornado, in that order, solves the problem.

I'm using redis version 2.4.4 on Debian Squeeze, with tornado 2.1.1 and brukva 0.0.1.

Any help would be appreciated as this bug is killing my app on a regular basis.

None is passed to Client.listen callback instead of brukva.client.Message instance

I got this problem again.

Stack trace from ipdb's 'bt' command:

/Users/kmike/svn/tornadio/tornadio/server.py(71)__init__()
     67             except Exception, ex:
     68                 logging.error('Failed to start Flash policy server: %s', ex)
     69 
     70         logging.info('Entering IOLoop...')
---> 71         io_loop.start()

  /Users/kmike/svn/tornado/tornado/ioloop.py(270)start()
    268                 fd, events = self._events.popitem()
    269                 try:
--> 270                     self._handlers[fd](fd, events)
    271                 except (KeyboardInterrupt, SystemExit):
    272                     raise

  /Users/kmike/svn/tornado/tornado/stack_context.py(173)wrapped()
    171                 callback(*args, **kwargs)
    172         else:
--> 173             callback(*args, **kwargs)
    174     if getattr(fn, 'stack_context_wrapped', False):
    175         return fn

  /Users/kmike/svn/tornado/tornado/iostream.py(199)_handle_events()
    197         try:
    198             if events & self.io_loop.READ:
--> 199                 self._handle_read()
    200             if not self.socket:
    201                 return

  /Users/kmike/svn/tornado/tornado/iostream.py(251)_handle_read()
    249                 # can't see it; the only way to find out if it's there is to
    250                 # try to read it.
--> 251                 result = self._read_to_buffer()
    252             except Exception:
    253                 self.close()

  /Users/kmike/svn/tornado/tornado/iostream.py(287)_read_to_buffer()
    285         """
    286         try:
--> 287             chunk = self._read_from_socket()
    288         except socket.error, e:
    289             # ssl.SSLError is a subclass of socket.error


  /Users/kmike/svn/tornado/tornado/iostream.py(275)_read_from_socket()
    273                 raise
    274         if not chunk:
--> 275             self.close()
    276             return None
    277         return chunk

  /Users/kmike/svn/tornado/tornado/iostream.py(180)close()
    178             self.socket = None
    179             if self._close_callback:
--> 180                 self._run_callback(self._close_callback)
    181 
    182     def reading(self):

  /Users/kmike/svn/tornado/tornado/iostream.py(230)_run_callback()
    228             # inside our blanket exception handler rather than outside.
    229             with stack_context.NullContext():
--> 230                 callback(*args, **kwargs)
    231         except:
    232             logging.error("Uncaught exception, closing connection.",

  /Users/kmike/svn/tornado/tornado/stack_context.py(173)wrapped()
    171                 callback(*args, **kwargs)
    172         else:
--> 173             callback(*args, **kwargs)
    174     if getattr(fn, 'stack_context_wrapped', False):
    175         return fn

  /Users/kmike/svn/tornado/tornado/websocket.py(204)on_connection_close()
    202     def on_connection_close(self):
    203         self.client_terminated = True
--> 204         self.on_close()
    205 
    206     def _not_supported(self, *args, **kwargs):

  /Users/kmike/svn/tornadio/tornadio/persistent.py(84)on_close()
     82     def on_close(self):
     83         try:
---> 84             self.connection.on_close()
     85         finally:
     86             self.connection.is_closed = True

  /Users/kmike/dev/planor/realtime/connection.py(73)on_close()
     71     def on_close(self):
     72         if self.authenticated:
---> 73             self.logout()
     74 
     75     def logout(self):

  /Users/kmike/dev/planor/realtime/connection.py(77)logout()
     75     def logout(self):
     76         self.authenticated = False
---> 77         self.pubsub_client.disconnect()
     78         for handler in self.handlers:
     79             handler.on_logout()

  /Users/kmike/svn/brukva/brukva/client.py(334)disconnect()
    332 
    333     def disconnect(self):
--> 334         self.connection.disconnect()
    335 
    336     def on_connect(self):

  /Users/kmike/svn/brukva/brukva/client.py(141)disconnect()
    139             except socket.error, e:
    140                 pass
--> 141             self._stream = None
    142 
    143     def write(self, data, try_left=None):

  /Users/kmike/svn/brukva/brukva/client.py(873)listen()
    871                     raise response
    872                 result = self.format_reply(cmd_listen, response)
--> 873                 ctx.ret_call(result)
    874 
    875     ### CAS


  /Users/kmike/svn/brukva/brukva/client.py(39)__exit__()
     37 
     38         if self.is_active:
---> 39             self._call_callbacks(value)
     40             return True
     41         else:

  /Users/kmike/svn/brukva/brukva/client.py(29)_call_callbacks()
     27                     cb(value)
     28             else:
---> 29                 self.callbacks(value)
     30 
     31     def __enter__(self):

  /Users/kmike/dev/planor/realtime/connection.py(83)on_pubsub_message()
     79             handler.on_logout()
     80 
     81     def on_pubsub_message(self, message):
     82         for handler in self.handlers:
---> 83             handler.on_pubsub_message(message)

None
> /Users/kmike/dev/planor/realtime/handlers/debug.py(14)on_pubsub_message()
     12         logging.debug(message)
     13         if message is None:
---> 14             import ipdb; ipdb.set_trace()
     15         #logging.debug('channel: %s, body: %s' % (message.channel, message.body))
     16 

Subscribing twice on the same channel prevents further new subscriptions

The example code below should expose the issue. Subscribing twice to the same channel prevents any further new subscriptions, on any channel.

Calling listen() only once doesn't seem to help.

import time
import random
from functools import partial

import brukva
import tornado.ioloop


def start_data_producer():

    now = time.time()
    ioloop = tornado.ioloop.IOLoop.instance()

    c = brukva.Client()
    c.connect()

    def pub(channel):
        data = random.randint(0,10)
        print "Publishing %s on %s" % (data, channel)
        c.publish(channel, data)

    tick = tornado.ioloop.PeriodicCallback(partial(pub, "test"), 1000)
    tick.start()

    tick2 = tornado.ioloop.PeriodicCallback(partial(pub, "test_2"), 1000)
    tick2.start()

def run_test():

    now = time.time()
    ioloop = tornado.ioloop.IOLoop.instance()

    c = brukva.Client()
    c.connect()

    def callback(result):
        print "Received: %s" % result.body 

    def unsub(channel):
        print "=== Unsubscribing: %s ===" % channel
        c.unsubscribe(channel)

    def sub(channel):
        print "=== Subscribing: %s ===" % channel
        c.subscribe(channel)
        # Should listen() be called for every subscribe() call?
        c.listen(callback)    

    ioloop.add_timeout(now, partial(sub, "test")) 

    # If we don't perform this second subscribe call then all is good. 
    ioloop.add_timeout(now + 5, partial(sub, "test")) 

    ioloop.add_timeout(now + 10, partial(unsub, "test"))
    ioloop.add_timeout(now + 15, partial(sub, "test"))
    ioloop.add_timeout(now + 20, partial(sub, "test_2"))


if __name__ == '__main__':
    start_data_producer()    
    run_test()
    tornado.ioloop.IOLoop.instance().start()
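
If the duplicate subscribe call is indeed the trigger, as the report suggests, one possible workaround is to track subscribed channels on the application side and skip repeated calls. This is a minimal sketch under that assumption: `SubscriptionTracker` is a hypothetical wrapper, and the two lambdas stand in for the client's `subscribe` and `unsubscribe` methods so the example runs without brukva.

```python
class SubscriptionTracker(object):
    """Hypothetical wrapper that skips duplicate subscribe calls."""

    def __init__(self, subscribe, unsubscribe):
        self._subscribe = subscribe
        self._unsubscribe = unsubscribe
        self._channels = set()

    def subscribe(self, channel):
        """Subscribe once per channel; return False for duplicates."""
        if channel in self._channels:
            return False
        self._channels.add(channel)
        self._subscribe(channel)
        return True

    def unsubscribe(self, channel):
        """Unsubscribe only from channels that are currently tracked."""
        if channel not in self._channels:
            return False
        self._channels.discard(channel)
        self._unsubscribe(channel)
        return True


calls = []
tracker = SubscriptionTracker(
    subscribe=lambda ch: calls.append(("sub", ch)),
    unsubscribe=lambda ch: calls.append(("unsub", ch)),
)
results = [
    tracker.subscribe("test"),
    tracker.subscribe("test"),    # duplicate, skipped
    tracker.unsubscribe("test"),
    tracker.subscribe("test"),
    tracker.subscribe("test_2"),
]
```

The sequence mirrors the timeouts in the test script above: the second subscribe to "test" never reaches the client, so the later subscriptions are issued against a clean state.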
