Coder Social home page Coder Social logo

pgiri / asyncoro Goto Github PK

View Code? Open in Web Editor NEW
51.0 6.0 13.0 2 MB

Python framework for asynchronous, concurrent, distributed, network programming with coroutines

License: MIT License

Python 92.13% CSS 0.86% JavaScript 0.40% HTML 6.61%
concurrent-programming asynchronous-programming distributed-computing cloud-computing

asyncoro's Introduction

asyncoro's People

Contributors

pgiri avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar

asyncoro's Issues

multiple threading

Hi pgiri,

I like this project, and its documentations. I actually learn a lot from it.

My background is data scientist. As a newbie for asynchronous programming, my goals might be rather ambitious. I want to write a scheduler to control several machine in a small cluster, assigning processing tasks to them.

I know there is Hadoop, spark, and many map-reduce framework alike. But I still want to give a try by myself. because

  1. our image processing jobs are computing intensive, I want to be flexible with computing, be it GPU computing, or many core computing or anything. I would like the scheduler can assign proper computing resources to each job, and being able to tune for performances.

  2. I want to be flexible with data storage, our data is managed with other software, I want to be compatible, without importing them info HDFS or any specific system constrained by the framework. I would write a simple data module to extract them.

All being said, I feel asyncoro and dispy are conveniet tools for my purpose. And here is my question, which among a lot others, pops in my head on top. The Asyncoro scheduler seems to be single threaded. How about the coroutines? Are they wrapped into a single python thread or they are truly multiple threaded on hardware level? If I managed to build such a system, are they going to perform well in real work?

Thanks for you patient,
Chuan

finish() method on a RCI does not work

server.py:

def faraway_proc(coro=None):
    logger.info("faraway started")
    yield coro.locate("farfaraway_proc", timeout=10)
    logger.info("faraway ended")

faraway = asyncoro.RCI(faraway_proc)
faraway.register()
[...]

client.py:

def client(coro=None):
    yield scheduler.peer(asyncoro.Location("127.0.0.2", 10001), udp_port=51350)
    rci1 = yield asyncoro.RCI.locate('faraway_proc')
    print "client rci1 start"
    rcoro = yield rci1()
    print "client rci1 running"
    value = yield rcoro.finish()
    print "client rci1 ended"
    print value

scheduler = asyncoro.AsynCoro(node="127.0.0.2", name="client1", tcp_port=2002)            
asyncoro.Coro(client)

With the above code, the following exception ocurrs:

2016-04-01 12:26:17,155 - asyncoro - network server "client1" @ 127.0.0.2:2002, udp_port=51350                                                                                      
[email protected]:10001                                                            
client rci1 start                                                                         
client rci1 running                                                                       
2016-04-01 12:26:17,169 - asyncoro - uncaught exception in client/[email protected]:2002:                                                                                 
Traceback (most recent call last):                                                        
  File "/home/oscar/dev/projects/client/src/lib/python2.7/site-packages/asyncoro/__init__.py", line 3327, in _schedule                                                              
    retval = coro._generator.throw(*exc)                                                  
  File "client.py", line 43, in client                                                    
    value = yield rcoro.finish()                                                              
  File "/home/oscar/dev/projects/client/src/lib/python2.7/site-packages/asyncoro/__init__.py", line 3329, in _schedule                                                              
    retval = coro._generator.send(coro._value)                                            
  File "/home/oscar/dev/projects/client/src/lib/python2.7/site-packages/asyncoro/__init__.py", line 2412, in finish                                                                 
    if self._complete is None:                                                            
AttributeError: _complete

faraway_proc works as expected:

2016-04-01 12:11:58,718 - asyncoro - network server "test1" @ 127.0.0.2:10001, udp_port=51350
INFO:asyncoro:network server "test1" @ 127.0.0.2:10001, udp_port=51350
INFO:server:Initialization completed
INFO:server:faraway started
INFO:server:faraway ended

Am I doing anything wrong?

Thanks!

Random exception using disasyncoro since version 4.1

Hi,

I'm randomly seeing this exception when two process comunicate each other through two definned channels. This only happens under version 4.1:

2016-07-05 09:55:45 asyncoro - uncaught exception in !req_proc/[email protected]:10002:
Traceback (most recent call last):
  File "/home/oscar/dev/PythonEnvs/producction/lib/python2.7/site-packages/asyncoro/__init__.py", line 3649, in _schedule
    retval = coro._generator.send(coro._value)
 File "/home/oscar/dev/PythonEnvs/producction/lib/python2.7/site-packages/asyncoro/disasyncoro.py", line 183, in req_proc
    req = self.reqs.popleft()
IndexError: pop from an empty deque

Thanks!

Invalid coroutine to resume with coro.sleep() call

I have a sleep within a generator function and I always get following error after expired:

invalid coroutine xxxxxx to resume

the xxxxxx should be the coroutine id.

The generator function is as following:
def generate_job(job, next_channel, coro=None): log.debug(job) coro.set_daemon() if utils.get_from_dict(job, ['schedule', 'repeated']): next_channel.send(job) while True: yield coro.sleep(utils.get_from_dict(job, ['schedule', 'repeated', 'interval'])) next_channel.send(job)

Windows/IOCP crashes

I have some cryptic issues with IOCP backend on windows. Got this behavior using python 2.7.13 and Windows 7/x86 and Windows 10/x86_64.
Story goes like this. I wrote some kind of proxy server using asyncoro and rpyc. Once one side closes connection, remote backend also forces closing. After some time (very small) application crashes. Stack trace may be different, but looks like it:

python27!_PyObject_GC_Malloc(unsigned int64 basicsize = 0x1f)+0x120 [c:\build27\cpython\modules\gcmodule.c @ 1512]
python27!PyMethod_New(struct _object * func = 0x00000000`046719e8, struct _object * self = 0x00000000`00000000, struct _object * klass = 0x00000000`040eb1e0)+0xe1 [c:\build27\cpython\objects\classobject.c @ 2263]
python27!_PyObject_GenericGetAttrWithDict(struct _object * obj = 0x00000000`048feda0, struct _object * name = 0x00000000`048feda0, struct _object * dict = 0x00000000`040ee0ca)+0x1aa [c:\build27\cpython\objects\object.c @ 1440]
python27!PyEval_EvalFrameEx(struct _frame * f = 0x00000000`03f4a848, int throwflag = 0n1)+0x3124 [c:\build27\cpython\python\ceval.c @ 2544]
python27!PyEval_EvalCodeEx(struct PyCodeObject * co = 0x00000000`040e9bb0, struct _object * globals = 0x00000000`00000002, struct _object * locals = 0x00000000`00000002, struct _object ** args = 0x00000000`040b27a8, int argcount = 0n2, struct _object ** kws = 0x00000000`040d0f10, int kwcount = 0n2, struct _object ** defs = 0x00000000`00000000, int defcount = 0n0, struct _object * closure = 0x00000000`00000000)+0x911 [c:\build27\cpython\python\ceval.c @ 3584]
python27!function_call(struct _object * func = 0x00000000`04671908, struct _object * arg = 0x00000000`0429d308, struct _object * kw = 0x00000000`0427ad08)+0x178 [c:\build27\cpython\objects\funcobject.c @ 528]
python27!PyObject_Call(struct _object * func = 0x00000000`048feda0, struct _object * arg = 0x00000000`026147b8, struct _object * kw = 0x00000000`02191048)+0x83 [c:\build27\cpython\objects\abstract.c @ 2548]
python27!instancemethod_call(struct _object * func = 0x00000000`00000000, struct _object * arg = 0x00000000`00000020, struct _object * kw = 0x00000000`00000002)+0x219 [c:\build27\cpython\objects\classobject.c @ 2603]
python27!PyObject_Call(struct _object * func = 0x00000000`048feda0, struct _object * arg = 0x00000000`0466ebe0, struct _object * kw = 0x00000000`0427ad08)+0x83 [c:\build27\cpython\objects\abstract.c @ 2548]
python27!slot_tp_init(struct _object * self = 0x00000000`048feda0, struct _object * args = 0x00000000`0466ebe0, struct _object * kwds = 0x00000000`0427ad08)+0xd0 [c:\build27\cpython\objects\typeobject.c @ 5807]
python27!type_call(struct _typeobject * type = 0x00000000`0427ad08, struct _object * args = 0x00000000`0427ad08, struct _object * kwds = 0x00000000`00000000)+0xd6 [c:\build27\cpython\objects\typeobject.c @ 765]
python27!PyObject_Call(struct _object * func = 0x00000000`03f1bdc8, struct _object * arg = 0x00000000`0427ad08, struct _object * kw = 0x00000000`00000000)+0x83 [c:\build27\cpython\objects\abstract.c @ 2548]
python27!ext_do_call(struct _object * func = 0x00000000`03f1bdc8, struct _object *** pp_stack = 0x00000000`00000002, int flags = 0n141, int na = 0n1, int nk = 0n0)+0x2eb [c:\build27\cpython\python\ceval.c @ 4666]
python27!PyEval_EvalFrameEx(struct _frame * f = 0x00000000`040dbd30, int throwflag = 0n1)+0x3af3 [c:\build27\cpython\python\ceval.c @ 3031]
python27!PyEval_EvalCodeEx(struct PyCodeObject * co = 0x00000000`040e9f30, struct _object * globals = 0x00000000`00000001, struct _object * locals = 0x00000000`00000001, struct _object ** args = 0x00000000`00000000, int argcount = 0n1, struct _object ** kws = 0x00000000`0429d4e0, int kwcount = 0n1, struct _object ** defs = 0x00000000`00000000, int defcount = 0n0, struct _object * closure = 0x00000000`00000000)+0x911 [c:\build27\cpython\python\ceval.c @ 3584]
python27!function_call(struct _object * func = 0x00000000`04671ac8, struct _object * arg = 0x00000000`048fe240, struct _object * kw = 0x00000000`04904d08)+0x178 [c:\build27\cpython\objects\funcobject.c @ 528]
python27!PyObject_Call(struct _object * func = 0x00000000`048fe470, struct _object * arg = 0x00000000`67557038, struct _object * kw = 0x00000000`040e9e30)+0x83 [c:\build27\cpython\objects\abstract.c @ 2548]
python27!instancemethod_call(struct _object * func = 0x00000000`04904d08, struct _object * arg = 0x00000000`67540fe6, struct _object * kw = 0x00000000`04677ed8)+0x219 [c:\build27\cpython\objects\classobject.c @ 2603]
python27!PyObject_Call(struct _object * func = 0x00000000`04671ac8, struct _object * arg = 0x00000000`02191048, struct _object * kw = 0x00000000`04904d08)+0x83 [c:\build27\cpython\objects\abstract.c @ 2548]

It always crashes on malloc on creating some object. The path depends on actual python code. I believe it happens because at some step IOCP code write garbage to python object memory pool. The top caller always _schedule() function.

Usually not long before the crash I get exception like this:

(10053, 'WSASend', 'An established connection was aborted by the software in your host machine.')

So I have feeling that there maybe an issue with IOCP request cancellation.

I can easily reproduce this behavior with my program, so I can share It and minidumps with full memory dump. But because it quite big maybe you can provide some assumptions what to check before that.

Running simple async process in the background

Great work here! I'm trying to use this on a raspberry pi, which collects bunch of points and writes to a influx database.. Since the writing is in bulk, because of network IO, it stops the program when doing the 'post request' to the database, :( hence I was trying to use asyncoro library..As I understand the some_long_func() is suppose to return a generator. I'm confused with this part, since I ONLY want to make the processing work in the background while not stopping the main event loop.. What should I return in that case? How can I do this?

def some_long_func(coro=None):
    ""this function does a long POST request using some custom library""
    #yield coro.sleep(0.1)
    TimeSeriesHelper(send_json_to_server) # class for doing the work of POST
    print 'final done'

def main_func():
    asyncoro.Coro(some_long_func)
    for i in range(20):
        print i
        time.sleep(0.5)
main_func()

AttributeError: '_AsyncNotifier' object has no attribute '_poller_name' on windows with pywin32

When I tried to run dispynode.py from dispy pacakge on win8 x86 with python 2.7.9 and pywin32 installed, I got this:

Traceback (most recent call last):
File ".\dispynode.py", line 1691, in
_dispy_node = DispyNode(**dispy_config)
File ".\dispynode.py", line 247, in init
self.asyncoro = AsynCoro()
File "D:\Python27\lib\site-packages\asyncoro__init
.py", line 78, in call
cls.instance = super(MetaSingleton, cls).__call(args, *kwargs)
File "D:\Python27\lib\site-packages\asyncoro__init
.py", line 3109, in init
version, self._notifier._poller_name)
AttributeError: '_AsyncNotifier' object has no attribute '_poller_name'

version = "3.6.15"
Remove lines 3108 and 3109, works fine.

AttributeError: 'NoneType' object has no attribute '_proceed_'

I am using dispy which uses asyncoro (4.5.3) as a dependency.

I noticed that my nodes sometimes crash because of the following error:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "C:\Anaconda\lib\threading.py", line 801, in __bootstrap_inner
    self.run()
  File "C:\Anaconda\lib\threading.py", line 754, in run
    self.__target(*self.__args, **self.__kwargs)
  File "C:\Anaconda\lib\site-packages\gulbis\ext\asyncoro\__init__.py", line 3631, in _schedule
    self._notifier.poll(0)
  File "C:\Anaconda\lib\site-packages\gulbis\ext\asyncoro\__init__.py", line 1204, in poll
    overlap.object(err, n)
  File "C:\Anaconda\lib\site-packages\gulbis\ext\asyncoro\__init__.py", line 1430, in _recvall
    self._read_coro._proceed_('')
AttributeError: 'NoneType' object has no attribute '_proceed_'

Specify interface broadcast address while broadcasting message to discover peers

When starting disasyncoro.AsynCoro with param discover_peers=True, a broadcast message will be sent using name <broadcast> to discover peers in local network.
According to http://stackoverflow.com/questions/683624/udp-broadcast-on-all-interfaces, if there are multi ethernet interfaces, we don't know which interface will be used. That cause the discovering failed.

  if discover_peers:
        ping_sock = AsyncSocket(socket.socket(socket.AF_INET, socket.SOCK_DGRAM))
        ping_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        ping_sock.settimeout(2)
        ping_msg = {'location': self._location, 'signature': self._signature,
                    'name': self._name, 'version': __version__}
        ping_msg = 'ping:' + serialize(ping_msg)
        try:
            #yield ping_sock.sendto(ping_msg, ('<broadcast>', self._udp_sock.getsockname()[1]))
            # change to something like this. 
            yield ping_sock.sendto(ping_msg, ('192.168.1.255' , self._udp_sock.getsockname()[1]))
        except:
            pass
        ping_sock.close()

I'd like to help with that, but I have no elegant solution currently.

Current master is broken

In [1]: import asyncoro

In [2]: def test(coro=None):
   ...:     yield "Hello"
   ...:

In [3]: asyncoro.Coro(test)
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-3-e30813260462> in <module>()
----> 1 asyncoro.Coro(test)

c:\python27\lib\site-packages\asyncoro\__init__.pyc in __init__(self, *args, **kwargs)
   2429         self._hot_swappable = False
   2430         if not Coro._asyncoro:
-> 2431             Coro._asyncoro = self._scheduler = AsynCoro.instance()
   2432         self._scheduler = self.__class__._asyncoro
   2433         self._location = self._scheduler._location

c:\python27\lib\site-packages\asyncoro\__init__.pyc in instance(cls, *args, **kwargs)
   3400         """
   3401         if not cls._instance:
-> 3402             cls._instance = cls(*args, **kwargs)
   3403         return cls._instance
   3404

c:\python27\lib\site-packages\asyncoro\__init__.pyc in __call__(cls, *args, **kwargs)
     90     def __call__(cls, *args, **kwargs):
     91         if not cls._instance:
---> 92             cls._instance = super(Singleton, cls).__call__(*args, **kwargs)
     93         return cls._instance
     94

c:\python27\lib\site-packages\asyncoro\__init__.pyc in __init__(self)
   3366             AsynCoro._instance = self
   3367             Coro._asyncoro = Channel._asyncoro = self
-> 3368         self._notifier = _AsyncNotifier()
   3369         self._location = None
   3370         self._name = ''

c:\python27\lib\site-packages\asyncoro\__init__.pyc in __init__(self)
   1138                 self.cmd_rsock = AsyncSocket(self.cmd_rsock)
   1139                 self.cmd_rsock_buf = win32file.AllocateReadBuffer(128)
-> 1140                 self.cmd_rsock._read_overlap.object = self.cmd_rsock_recv
   1141                 err, n = win32file.WSARecv(self.cmd_rsock._fileno, self.cmd_rsock_buf,
   1142                                            self.cmd_rsock._read_overlap, 0)

AttributeError: 'NoneType' object has no attribute 'object'

PyPI Installation

Needs:

  • setup.py (most of the info is already in the asyncoro.py files)
    • classifiers
    • author
    • email
    • version
    • license
    • project URL
  • MANIFEST.in
  • a means to determine from setup.py which asyncoro to install (2 or 3)

I'll be glad to take this on if interest in this issue is demonstrated.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.