reactivex / rxpy

ReactiveX for Python

Home Page: https://rxpy.rtfd.io

License: MIT License

Python 83.38% Jupyter Notebook 16.55% JavaScript 0.07%
python reactive reactivex reactive-extensions rxpy

rxpy's Introduction

The ReactiveX for Python (RxPY)

A library for composing asynchronous and event-based programs using observable collections and query operator functions in Python

ReactiveX for Python v4

For v3.X please go to the v3 branch.

ReactiveX for Python v4.x runs on Python 3.7 or above. To install:

pip3 install reactivex

About ReactiveX

ReactiveX for Python (RxPY) is a library for composing asynchronous and event-based programs using observable sequences and pipable query operators in Python. Using Rx, developers represent asynchronous data streams with Observables, query asynchronous data streams using operators, and parameterize concurrency in data/event streams using Schedulers.

import reactivex as rx
from reactivex import operators as ops

source = rx.of("Alpha", "Beta", "Gamma", "Delta", "Epsilon")

composed = source.pipe(
    ops.map(lambda s: len(s)),
    ops.filter(lambda i: i >= 5)
)
composed.subscribe(lambda value: print("Received {0}".format(value)))

Learning ReactiveX

Read the documentation to learn the principles of ReactiveX and get the complete reference of the available operators.

If you need to migrate code from RxPY v1.x or v3.x, read the migration section.

There is also a list of third party documentation available here.

Community

Join the conversation on GitHub Discussions if you have any questions or suggestions.

Differences from .NET and RxJS

ReactiveX for Python is a fairly complete implementation of Rx with more than 120 operators, and over 1300 passing unit-tests. RxPY is mostly a direct port of RxJS, but also borrows a bit from Rx.NET and RxJava in terms of threading and blocking operators.

ReactiveX for Python follows PEP 8, so all function and method names are snake_cased, i.e. lowercase with words separated by underscores as necessary to improve readability.

Thus .NET code such as:

needs to be written with an _ in Python:

group = source.pipe(ops.group_by(lambda i: i % 3))
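
As a rough illustration of the naming rule, the sketch below mechanically converts PascalCase operator names to snake_case. The helper to_snake_case is hypothetical and not part of RxPY, and the mapping is only a naming heuristic: it does not guarantee that an operator with the converted name exists.

```python
import re


def to_snake_case(name: str) -> str:
    """Insert an underscore before every interior capital letter, then lowercase."""
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()


# Hypothetical sample names, chosen only to show the convention.
for dotnet_name in ("GroupBy", "CombineLatest", "OnErrorResumeNext"):
    print(dotnet_name, "->", to_snake_case(dotnet_name))
```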

With ReactiveX for Python you should use named keyword arguments instead of positional arguments when an operator has multiple optional arguments. RxPY will not try to detect which arguments you are giving to the operator (or not).

Development

This project is managed using Poetry. Code is formatted using Black and isort, and statically type checked using pyright and mypy.

If you want to take advantage of the default VSCode integration, then first configure Poetry to make its virtual environment in the repository:

poetry config virtualenvs.in-project true

After cloning the repository, activate the tooling:

poetry install
poetry run pre-commit install

Run unit tests:

poetry run pytest

Run code checks (manually):

poetry run pre-commit run --all-files

rxpy's People

Contributors

38elements, axgkl, azureblade3808, bosonogi, bradleynull, christiansandberg, dbrattli, diorcety, ebrattli, erikkemperman, fqxp, frederikaalund, hangtwenty, hoc081098, jcafhe, jdreaver, kstreee, mainro, matiboy, mattpodwysocki, maxjohansen, mikeschneeberger, mvschaik, neilvyas, pillmuncher, rgbkrk, thomasnield, timothy-shields, tobiahlissens, tsteelematc

rxpy's Issues

Understanding Observable.defer

Hi, me again, translating from Rx scala to python...

In my scala code I have:

 private def process[A](msg: SQSMessage, action: Action[A]): Observable[Result[A]] =
    Observable.defer(Observable(_ => Result(action(msg.getBody), msg)))

 process(msg, action).onErrorResumeNext {
          ex =>
            logger.error("Error processing message", ex)
            sendMessage(queueName)(msg.getBody)
        }

Trying to do the same with python does work:

def __process(msg, action):
        return Observable.defer(lambda: Observable(lambda s: s.on_next(Result(action(msg.get_body()), msg))))

send = lambda message: self.send_message(queue_name, message.get_body())
__process(message, action).on_error_resume_next(send)

The problem arises when action throws an exception: in the Scala version onErrorResumeNext is called and sendMessage returns an Observable. In the Python version, though, this is what happens (I'm using Python 2.7.5):

 File "/Users/caente/nlp/lib/python2.7/site-packages/rx/autodetachobserver.py", line 17, in next
    self.observer.on_next(value)
  File "/Users/caente/nlp/lib/python2.7/site-packages/rx/abstractobserver.py", line 24, in on_next
    self.next(value)
  File "/Users/caente/nlp/lib/python2.7/site-packages/rx/linq/observable/merge.py", line 172, in on_next
    on_complete)
  File "/Users/caente/nlp/lib/python2.7/site-packages/rx/observable.py", line 47, in subscribe
    return self._subscribe(observer)
  File "/Users/caente/nlp/lib/python2.7/site-packages/rx/anonymousobservable.py", line 43, in _subscribe
    set_disposable()
  File "/Users/caente/nlp/lib/python2.7/site-packages/rx/anonymousobservable.py", line 35, in set_disposable
    if not auto_detach_observer.fail(ex):
  File "/Users/caente/nlp/lib/python2.7/site-packages/rx/abstractobserver.py", line 52, in fail
    self.error(exn)
  File "/Users/caente/nlp/lib/python2.7/site-packages/rx/autodetachobserver.py", line 24, in error
    self.observer.on_error(exn)
  File "/Users/caente/nlp/lib/python2.7/site-packages/rx/abstractobserver.py", line 34, in on_error
    self.error(error)
  File "/Users/caente/nlp/lib/python2.7/site-packages/rx/abstractobserver.py", line 34, in on_error
    self.error(error)
  File "/Users/caente/nlp/lib/python2.7/site-packages/rx/autodetachobserver.py", line 24, in error
    self.observer.on_error(exn)
  File "/Users/caente/nlp/lib/python2.7/site-packages/rx/abstractobserver.py", line 34, in on_error
    self.error(error)
  File "/Users/caente/nlp/lib/python2.7/site-packages/rx/internal/basic.py", line 26, in default_error
    raise err
TypeError: unbound method from_future() must be called with Observable instance as first argument (got function instance instead)

error when calling on_completed on sampled observable

When creating a stream using the "sample" method with an observable as sampler, I get the following error when on_completed is called: "TypeError: sample_subscribe() takes exactly 1 argument (0 given)".

Here is sample code to reproduce it:

import rx

def show(x):
    print x

obs = rx.subjects.Subject()
sampler = rx.subjects.Subject()

obs.sample(sampler=sampler).subscribe(show)

obs.on_next(1)
sampler.on_next(0)

sampler.on_completed()

And the output:

1
Traceback (most recent call last):
  File "sample_test.py", line 14, in <module>
    sampler.on_completed()
  File "/usr/local/lib/python2.7/dist-packages/rx/subjects/subject.py", line 57, in on_completed
    observer.on_completed()
  File "/usr/local/lib/python2.7/dist-packages/rx/abstractobserver.py", line 35, in on_completed
    self._completed()
TypeError: sample_subscribe() takes exactly 1 argument (0 given)

Thanks for your help

Observable.interval raises "AttributeError: 'int' object has no attribute 'dispose'" with the AsyncIOScheduler

The following program illustrates the problem:

import asyncio
import rx
import time

scheduler = rx.concurrency.AsyncIOScheduler()
sub0 = rx.Observable.interval(1000, scheduler=scheduler) \
    .subscribe(print, print, print)

try:
    loop = asyncio.get_event_loop()
    loop.run_forever()
except:
    pass

sub0.dispose()

I would expect the above to exit normally. However, I get the following error message instead:

Traceback (most recent call last):
  File "./rx_test.py", line 17, in <module>
    sub0.dispose()
  File "/opt/sd/lib/python3.5/site-packages/rx/autodetachobserver.py", line 40, in dispose
    self.m.dispose()
  File "/opt/sd/lib/python3.5/site-packages/rx/disposables/booleandisposable.py", line 50, in dispose
    old.dispose()
  File "/opt/sd/lib/python3.5/site-packages/rx/disposables/compositedisposable.py", line 63, in dispose
    disposable.dispose()
  File "/opt/sd/lib/python3.5/site-packages/rx/disposables/booleandisposable.py", line 50, in dispose
    old.dispose()
AttributeError: 'int' object has no attribute 'dispose'

Python 3.5.1 and RxPY master branch (where I have applied the fix for issue #90).

Observable.repeat() causes maximum recursion depth exceeded

Rx version: 1.2.4
on both python2 and python3.

I'm new to Rx and may be doing something wrong, but using repeat with a value > 60 causes RuntimeError: maximum recursion depth exceeded while calling a Python object.

Code example:
Observable.just(0).repeat().subscribe(lambda _: None)
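
One plausible explanation, offered as an assumption rather than a diagnosis of the RxPY source: if each repetition re-subscribes synchronously from inside the previous completion callback, every repeat adds stack frames until Python's recursion limit is hit. The plain-Python sketch below (hypothetical function names, no Rx involved) contrasts that nested style with a loop that keeps the stack flat.

```python
def repeat_nested(emit, count):
    """Each repetition starts from inside the previous one, so frames pile up."""
    emit(0)
    if count > 1:
        repeat_nested(emit, count - 1)


def repeat_flat(emit, count):
    """A loop re-runs the source without growing the call stack."""
    for _ in range(count):
        emit(0)


received = []
repeat_flat(received.append, 100_000)

try:
    repeat_nested(lambda _: None, 100_000)
    nested_failed = False
except RecursionError:
    # The nested version exhausts the interpreter's recursion limit.
    nested_failed = True

print(len(received), nested_failed)
```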

Can't use partial function as a function in flat_map

I created a StackOverflow post describing my problem.

The issue is that I have a partial function (created via functools.partial) that I want to pass to flat_map. However, doing so results in the following:

Traceback (most recent call last):
  File "retry/example.py", line 46, in <module>
    response_stream = message_stream.flat_map(functools.partial(message_handler, context=context))
  File "/home/justin/virtualenv/retry/local/lib/python2.7/site-packages/rx/linq/observable/selectmany.py", line 67, in select_many
    selector = adapt_call(selector)
  File "/home/justin/virtualenv/retry/local/lib/python2.7/site-packages/rx/internal/utils.py", line 37, in adapt_call_1
    argnames, varargs, kwargs = getargspec(func)[:3]
  File "/usr/lib/python2.7/inspect.py", line 816, in getargspec
    raise TypeError('{!r} is not a Python function'.format(func))
TypeError: <method-wrapper '__call__' of functools.partial object at 0x2ce6cb0> is not a Python function

Here is some sample code which reproduces the problem:

from __future__ import absolute_import
from rx import Observable, Observer
from pykafka import KafkaClient
from pykafka.common import OffsetType
import logging
import requests
import functools


logger = logging.basicConfig()


def puts(thing):
    print thing


def message_stream(consumer):
    def thing(observer):
        for message in consumer:
            observer.on_next(message)

    return Observable.create(thing)


def message_handler(message, context=None):
    def req():
        return requests.get('http://httpbin.org/get')

    return Observable.start(req)


def handle_response(message, response, context=None):
    consumer = context['consumer']
    producer = context['producer']
    t = 'even' if message % 2 == 0 else 'odd'
    return str(message) + ': ' + str(response) + ' - ' + t + ' | ' + str(consumer) + ' | ' + producer


consumer = ['pretend', 'these', 'are', 'kafka', 'messages']
producer = 'some producer'
context = {
    'consumer': consumer,
    'producer': producer
}
message_stream = message_stream(consumer)
response_stream = message_stream.flat_map(functools.partial(message_handler, context=context))
message_response_stream = message_stream.zip(response_stream, functools.partial(handle_response, context=context))
message_stream.subscribe(puts)
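
The traceback shows inspect.getargspec rejecting the functools.partial object. Two hedged workarounds, sketched with the standard library only: wrap the partial in a plain function so that argument inspection sees an ordinary callable, or inspect it with inspect.signature, which understands partial objects on Python 3. The message_handler below is a simplified stand-in for the one in the issue, and wrapped_handler is a hypothetical name.

```python
import functools
import inspect


def message_handler(message, context=None):
    # Simplified stand-in: returns its inputs instead of an Observable.
    return message, context


context = {"producer": "some producer"}
handler = functools.partial(message_handler, context=context)


# Workaround 1: a plain wrapper is an ordinary Python function.
def wrapped_handler(message):
    return handler(message)


# Workaround 2: inspect.signature resolves the partial's remaining parameters.
signature = inspect.signature(handler)
required = [
    name
    for name, param in signature.parameters.items()
    if param.default is inspect.Parameter.empty
]

print(required)
print(inspect.isfunction(handler), inspect.isfunction(wrapped_handler))
```

Whether a given RxPY release accepts the partial directly depends on how that release inspects its arguments, so the wrapper is the more conservative choice.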

Observable.from_array ?

Hello, I had to revisit a project that uses RxPY. It was on version 0.10, so I upgraded to version 1.0.0.

Now it seems like Observable.from_array no longer exists, is there any option to create an observable from an array?

ReplaySubject() doesn't seem to replay

I mean, I'm probably just not getting it. Looking at RxJava documentation and test_replaysubject.py I'm trying to make a network of Observables that runs in a predictable, deterministic way. (This might not be what RxPY is for? but anyway). I've found one approach that works better, using publish() and connect(). But it seems like ReplaySubject is advertised to accomplish this. If it worked, this code would print the sequence twice. It only prints it once.

from rx.subjects import (Subject, ReplaySubject)
from rx import Observable, Observer

# An observer who takes (int, nbits) and emits ASCII '0' and '1' LSB first

def to_bitstring(t):
  val, nbits = t
  return Observable.from_iterable([str(1 & (val >> i)) for i in range(nbits)])

# An observer who takes things and prints them

def myprint(*args):
  print(*args)
  return None
printer = Observer(myprint)

tstdata = [(0xFF, 8), (0xa0, 8)]
expander = Observable.\
             from_iterable(tstdata).map(to_bitstring).concat_all()
replay = ReplaySubject()
expander.subscribe(replay)
replay.subscribe(printer)
replay.subscribe(printer)
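
To make the expectation concrete, here is a toy stand-in for the behaviour the issue is looking for: every value is buffered and replayed to each new subscriber. ToyReplaySubject is hypothetical and is not the RxPY ReplaySubject; it only models "late subscribers see the whole history".

```python
class ToyReplaySubject:
    """Hypothetical stand-in: buffers every value and replays it on subscribe."""

    def __init__(self):
        self._buffer = []
        self._observers = []

    def on_next(self, value):
        self._buffer.append(value)
        for observer in list(self._observers):
            observer(value)

    def subscribe(self, observer):
        # Replay the history first, then register for future values.
        for value in self._buffer:
            observer(value)
        self._observers.append(observer)


replay = ToyReplaySubject()
for bit in "11111111":
    replay.on_next(bit)

first, second = [], []
replay.subscribe(first.append)
replay.subscribe(second.append)
print("".join(first), "".join(second))
```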

GEventScheduler exits early?

I'm probably just doing something wrong. I've been trying to play with GEvent support, but I am having a hard time getting it to work. My first attempt tries to just use the GEventScheduler to print a series of words:

import rx
from rx.concurrency.mainloopscheduler import GEventScheduler
from rx.concurrency import CurrentThreadScheduler
import gevent

words = ["carriage", "boots", "intriguing", "pepper", "vitamins"]

def output(result):
    print(result)

scheduler = GEventScheduler()
#scheduler = CurrentThreadScheduler()
rx.Observable.from_iterable(words, scheduler=scheduler).subscribe(output)

gevent.wait()

The above code only outputs the first word ("carriage"). Any idea why this terminates early?

PyPI package not updated

Version 1.2.2 has been out for almost 2 months, but PyPI still only has version 1.2.1. Could someone with the power to do so update the PyPI version of Rx? @ardoramor made note of this in #50 but I decided to make a separate issue in case his message was buried. Thanks in advance!

Controlled Observables Bug(s)

Hey,

Thank you so much for maintaining this module. It's super useful.

  1. Are we missing a .controlled() extension method to turn an Observable into a ControlledObservable? RxJS appears to have one.
  2. I think there might be a bug in the constructor of WindowedObservable: self.subscription is referenced before it is set, which throws an exception.
    https://github.com/ReactiveX/RxPY/blob/master/rx/backpressure/windowedobservable.py#L56
import rx

source = rx.backpressure.controlledobservable.ControlledObservable(
    rx.Observable.from_iterable([1,2,3,4,5]),
    enable_queue = False,
).windowed(3)
  3. I'm trying to get any working example for ControlledObservable to work. Even this basic chain throws an exception. Is it a bug or am I doing something wrong?
import rx
import logging
import sys

logger = logging.getLogger(__name__)
logging.basicConfig(stream=sys.stderr,)

source = rx.backpressure.controlledobservable.ControlledObservable(
    rx.Observable.from_iterable([1,2,3,4,5]),
    enable_queue = False,
)

for number in source.to_blocking():
    print number

Which outputs the following stack trace...

Traceback (most recent call last):
  File "sample.py", line 37, in <module>
    for number in source.to_blocking():
  File "/Library/Python/2.7/site-packages/rx/linq/observable/blocking/toiterable.py", line 63, in __iter__
    return self.to_iterable()
  File "/Library/Python/2.7/site-packages/rx/linq/observable/blocking/toiterable.py", line 29, in to_iterable
    self.observable.materialize().subscribe(on_next)
  File "/Library/Python/2.7/site-packages/rx/observable.py", line 51, in subscribe
    return self._subscribe(observer)
  File "/Library/Python/2.7/site-packages/rx/anonymousobservable.py", line 50, in _subscribe
    current_thread_scheduler.schedule(set_disposable)
  File "/Library/Python/2.7/site-packages/rx/concurrency/currentthreadscheduler.py", line 54, in schedule
    return self.schedule_relative(timedelta(0), action, state)
  File "/Library/Python/2.7/site-packages/rx/concurrency/currentthreadscheduler.py", line 68, in schedule_relative
    self.queue.run()
  File "/Library/Python/2.7/site-packages/rx/concurrency/currentthreadscheduler.py", line 37, in run
    item.invoke()
  File "/Library/Python/2.7/site-packages/rx/concurrency/scheduleditem.py", line 18, in invoke
    self.disposable.disposable = self.invoke_core()
  File "/Library/Python/2.7/site-packages/rx/concurrency/scheduleditem.py", line 33, in invoke_core
    return self.action(self.scheduler, self.state)
  File "/Library/Python/2.7/site-packages/rx/concurrency/scheduler.py", line 128, in scheduled_action
    return self.invoke_rec_immediate(scheduler, pair)
  File "/Library/Python/2.7/site-packages/rx/concurrency/scheduler.py", line 83, in invoke_rec_immediate
    action(inner_action, state)
  File "/Library/Python/2.7/site-packages/rx/linq/observable/fromiterable.py", line 39, in action
    observer.on_next(item)
  File "/Library/Python/2.7/site-packages/rx/autodetachobserver.py", line 16, in _next
    self.observer.on_next(value)
  File "/Library/Python/2.7/site-packages/rx/autodetachobserver.py", line 16, in _next
    self.observer.on_next(value)
TypeError: 'ControlledSubject' object is not callable

Using resources with Observables

How do you use resources with Observables?

For example: given an Observable sequence of file paths, construct a sequence of the contents of the files.

ProcessPoolScheduler

Hello,

I am currently using the Reactive Programming paradigm as an alternative to the Actor model. I was expecting that it would be easy to spread processing across multiple processes simply by using the correct scheduler. After looking, I can't find any scheduler that seems to use multiple processes. I was thinking about building a ProcessPoolScheduler, but I was wondering what problems you see with implementing something like that.

I understand that the Reactive contract is such that streams are processed in order, but I know many applications don't require that assumption.

Also, if something like the ProcessPoolScheduler is not possible, how might one go about building a parallel processing system in Python, where you have the GIL to contend with?

Thanks,
Jeffrey

correct installing?

I installed RxPY via pip and got the following messages. What do they mean? Is the installation correct?

Installing collected packages: rx
Running setup.py install for rx
  File "/home/karl/.virtualenvs/banner/lib/python2.7/site-packages/rx/linq/observable_time.py", line 43
    class ObservableTime(Observable, metaclass=ObservableMeta):
                                              ^
SyntaxError: invalid syntax

  File "/home/karl/.virtualenvs/banner/lib/python2.7/site-packages/rx/linq/observable_concurrency.py", line 6
    class ObservableConcurrency(Observable, metaclass=ObservableMeta):
                                                     ^
SyntaxError: invalid syntax

  File "/home/karl/.virtualenvs/banner/lib/python2.7/site-packages/rx/linq/observable_leave.py", line 6
    class ObservableLeave(Observable, metaclass=ObservableMeta):
                                               ^
SyntaxError: invalid syntax

  File "/home/karl/.virtualenvs/banner/lib/python2.7/site-packages/rx/linq/observable_multiple.py", line 10
    class ObservableMultiple(Observable, metaclass=ObservableMeta):
                                                  ^
SyntaxError: invalid syntax

  File "/home/karl/.virtualenvs/banner/lib/python2.7/site-packages/rx/linq/observable_creation.py", line 7
    class ObservableCreation(Observable, metaclass=ObservableMeta):
                                                  ^
SyntaxError: invalid syntax

  File "/home/karl/.virtualenvs/banner/lib/python2.7/site-packages/rx/linq/standardsequenceoperators.py", line 25
    class ObservableLinq(Observable, metaclass=ObservableMeta):
                                              ^
SyntaxError: invalid syntax

  File "/home/karl/.virtualenvs/banner/lib/python2.7/site-packages/rx/linq/observable_single.py", line 40
    nonlocal is_disposed
                       ^
SyntaxError: invalid syntax

  File "/home/karl/.virtualenvs/banner/lib/python2.7/site-packages/rx/concurrency/scheduler.py", line 34
    nonlocal is_done
                   ^
SyntaxError: invalid syntax

  File "/home/karl/.virtualenvs/banner/lib/python2.7/site-packages/rx/testing/testscheduler.py", line 98
    nonlocal source
                  ^
SyntaxError: invalid syntax

Successfully installed rx
Cleaning up...

Bug in combine_latest

It seems that combine_latest is buggy. I believe the result_selector = args.pop() line needs to be moved before the preceding if statement:

if args and isinstance(args[0], list):
    args = args[0]
else:
    args = list(args)

result_selector = args.pop()
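
A small sketch of why the order matters, assuming the snippet above shows the current order (normalize first, then pop). Both helper functions are hypothetical and use strings in place of observables. When the sources are passed as a single list, popping after the normalization removes the last source instead of the selector.

```python
def split_args_pop_last(*args):
    """Order shown above: normalize the arguments, then pop the selector."""
    if args and isinstance(args[0], list):
        args = args[0]
    else:
        args = list(args)
    result_selector = args.pop()
    return args, result_selector


def split_args_pop_first(*args):
    """Proposed order: pop the selector, then normalize what is left."""
    args = list(args)
    result_selector = args.pop()
    if args and isinstance(args[0], list):
        args = args[0]
    return args, result_selector


def selector(a, b):
    return (a, b)


# List form: the selector is lost when popping last.
wrong_sources, wrong_selector = split_args_pop_last(["o1", "o2"], selector)
right_sources, right_selector = split_args_pop_first(["o1", "o2"], selector)
print(wrong_sources, wrong_selector)
print(right_sources, right_selector is selector)
```

With plain positional sources (no list), both orders give the same result, so the difference only shows up for the list form.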

Concurrency in RxPY

Hi,

Not sure where else to ask this, but I don't see how the library is able to support actual concurrent operation. The recursive scheduler in particular seems to actually serialize computation for the stream. From scheduledobserver.py:

        try:
            work()
        except Exception:
            with self.lock:
                parent.queue = []
                parent.has_faulted = True
            raise

        recurse()

i.e. work() is called followed by recurse() which serializes processing of the stream (and does preserve order). My understanding is that one should be able to get concurrency via different scheduler choices (potentially giving up preserving order). Am I missing something?

on_error_resume_next should take a function

Hey man, finally I have an actual issue :-)

One of my problems was that on_error_resume_next takes an Observable (or more) as a parameter, instead of a function that takes an Exception and returns an Observable.
In Java/Scala I can do something like this:

Observable.defer(Observable[Result[A]](_.onNext(Result(action(msg.getBody), msg)))).onErrorResumeNext {
      ex =>
        logger.error(s"Error processing ${msg.getBody}", ex)
        Observable.empty
    }

In Python, I can only do something like this:

 def __process(msg, action):
        def result(m):
            return lambda: Observable(lambda s: s.on_next(Result(action(m.get_body()), m)))

        return Observable.defer(result(msg)).on_error_resume_next(Observable.empty())

It may seem silly, but that logger actually sends the exception to Loggly, and that is information we really need, so knowing why an exception occurred is a big deal even when everything should keep working as if nothing happened. In other words, the subscribers of that Observable should never receive the error, but the error has to be logged.

So, what's the rationale for dropping the exception? Is there a chance to get it back?

Info about schedulers?

I'm reading the official RxPy documentation here:
http://rxpy.codeplex.com/documentation
At some point, there is a mention of the "Using Schedulers" topic:
This will be covered in more details in the Using Schedulers topic.

What type of schedulers are supported in RxPy? I am not interested in using the .NET platform, and am wondering if there is (or there will be) support for more Pythonic libraries like gevent or the Tornado ioloop.

Thanks

Difficulty creating a Subject

I feel like this should work

def myprint(*args):
  print(*args)
  return None

drive = Observable.from_list(list(range(33))).publish()
reg = Subject().buffer_with_count(8).do_action(myprint)
drive.subscribe(reg)
drive.connect()

Or maybe there's another way. I'd really like to be able to create Subjects the same way I create Observables using Linq, then hook them up in a network with some master Observable to drive it. But I can't seem to manage to create even one this way.

(py3)michaels-air:try mfox$ python tryrx.py
Traceback (most recent call last):
  File "/Users/mfox/anaconda/envs/py3/lib/python3.4/site-packages/Rx-0.10.3-py3.4.egg/rx/autodetachobserver.py", line 17, in next
    self.observer.on_next(value)
  File "/Users/mfox/anaconda/envs/py3/lib/python3.4/site-packages/Rx-0.10.3-py3.4.egg/rx/subjects/subject.py", line 94, in on_next
    observer.on_next(value)
  File "/Users/mfox/anaconda/envs/py3/lib/python3.4/site-packages/Rx-0.10.3-py3.4.egg/rx/abstractobserver.py", line 24, in on_next
    self.next(value)
TypeError: 'AnonymousObservable' object is not callable
(py3)michaels-air:try mfox$

Wrong Scheduler?

I created an observable sequence using interval, subscribed to it, and expected to see an endless parade of timestamps slowly making their way down the screen. Instead I received an attribute error indicating that the TimeoutScheduler has no attribute 'schedule_periodic'. VirtualTimeScheduler appears to be the only scheduler with the method 'schedule_periodic'. Is the wrong Scheduler being generated? Please see the output below...

obs = rx.Observable.interval(1000).subscribe(lambda x: print(x))
Traceback (most recent call last):
  File "", line 1, in
  File "c:\Python34\lib\site-packages\rx\observable.py", line 50, in subscribe
    return self._subscribe(observer)
  File "c:\Python34\lib\site-packages\rx\anonymousobservable.py", line 32, in _subscribe
    if not auto_detach_observer.fail(ex):
  File "c:\Python34\lib\site-packages\rx\abstractobserver.py", line 52, in fail
    self.error(exn)
  File "c:\Python34\lib\site-packages\rx\autodetachobserver.py", line 24, in error
    self.observer.on_error(exn)
  File "c:\Python34\lib\site-packages\rx\abstractobserver.py", line 34, in on_error
    self.error(error)
  File "c:\Python34\lib\site-packages\rx\internal\basic.py", line 24, in default_error
    raise err
  File "c:\Python34\lib\site-packages\rx\anonymousobservable.py", line 30, in _subscribe
    auto_detach_observer.disposable = subscribe(auto_detach_observer)
  File "c:\Python34\lib\site-packages\rx\linq\observable_time.py", line 136, in subscribe
    return scheduler.schedule_periodic(period, action, state=0)
AttributeError: 'TimeoutScheduler' object has no attribute 'schedule_periodic'

Why does AutoDetachObserver ignore my on_error handler?

Minimal working examples: see SO post

The issue documented in the post above seems to be due to the following in rx.autodetachobserver

def _next(self, value):
    try:
        self.observer.on_next(value)
    except Exception:
        self.dispose()
        raise

MWE1 cause: when an Exception is caught, it is simply raised and self.observer.on_error() does not get called.

MWE2 cause: The call to self.dispose() causes self.is_stopped to be set to True so when self.on_error() is called, the condition if not self.is_stopped is False and so prevents anything from happening.

So should it not just be:

def _next(self, value):
    self.observer.on_next(value)

because you don't need to catch errors in _next. The caller already catches exceptions; I'm thinking about use cases like:

def subscribe(observer):  # observer is AutoDetachObserver
    try:
        observer.on_next(3)
    except Exception as e:
        observer.on_error(e)
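
The two behaviours can be compared with a toy model. ToyAutoDetachObserver below is a hypothetical stand-in, not the class from rx.autodetachobserver; the dispose_on_failure flag switches between the dispose-and-reraise style quoted above and the proposed pass-through.

```python
class ToyAutoDetachObserver:
    """Hypothetical stand-in for the observer discussed in this issue."""

    def __init__(self, on_next, on_error, dispose_on_failure):
        self._on_next = on_next
        self._on_error = on_error
        self._dispose_on_failure = dispose_on_failure
        self.is_stopped = False

    def dispose(self):
        self.is_stopped = True

    def on_next(self, value):
        if self.is_stopped:
            return
        if not self._dispose_on_failure:
            self._on_next(value)  # proposed: let the caller handle errors
            return
        try:
            self._on_next(value)
        except Exception:
            self.dispose()  # marks the observer stopped before re-raising
            raise

    def on_error(self, error):
        if not self.is_stopped:
            self.is_stopped = True
            self._on_error(error)


def subscribe(observer):
    try:
        observer.on_next(3)
    except Exception as e:
        observer.on_error(e)


def failing_handler(value):
    raise ValueError("boom")


def run(dispose_on_failure):
    seen = []
    subscribe(ToyAutoDetachObserver(failing_handler, seen.append, dispose_on_failure))
    return seen


errors_with_dispose = run(True)
errors_pass_through = run(False)
print(len(errors_with_dispose), len(errors_pass_through))
```

In the toy model the error handler is reached only in the pass-through variant, which matches the MWE2 explanation: once dispose() has set is_stopped, the later on_error call is ignored.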

Using CombineLatest with Subjects

Is combine_latest supposed to work with subjects? Whenever I try to subscribe to the combined stream, the interpreter freezes because rx requests a lock.

s1 = rx.subjects.BehaviorSubject(1)
s2 = rx.subjects.BehaviorSubject(2)
s = rx.Observable.combine_latest(s1, s2, lambda a, b: (a, b))
s.subscribe(print)  # Freezes here

The output I expect is simply (1, 2). I have tried using plain Subject as well, but it freezes in the same spot. Here is the stack trace after I press Ctrl-C to interrupt:

^CTraceback (most recent call last):
  File "combine.py", line 29, in <module>
    main()
  File "combine.py", line 14, in main
    s.subscribe(print)
  File "/usr/lib/python3.4/site-packages/rx/observable.py", line 51, in subscribe
    return self._subscribe(observer)
  File "/usr/lib/python3.4/site-packages/rx/anonymousobservable.py", line 50, in _subscribe
    current_thread_scheduler.schedule(set_disposable)
  File "/usr/lib/python3.4/site-packages/rx/concurrency/currentthreadscheduler.py", line 54, in schedule
    return self.schedule_relative(timedelta(0), action, state)
  File "/usr/lib/python3.4/site-packages/rx/concurrency/currentthreadscheduler.py", line 68, in schedule_relative
    self.queue.run()
  File "/usr/lib/python3.4/site-packages/rx/concurrency/currentthreadscheduler.py", line 37, in run
    item.invoke()
  File "/usr/lib/python3.4/site-packages/rx/concurrency/scheduleditem.py", line 18, in invoke
    self.disposable.disposable = self.invoke_core()
  File "/usr/lib/python3.4/site-packages/rx/concurrency/scheduleditem.py", line 33, in invoke_core
    return self.action(self.scheduler, self.state)
  File "/usr/lib/python3.4/site-packages/rx/anonymousobservable.py", line 34, in set_disposable
    auto_detach_observer.disposable = fix_subscriber(subscribe(auto_detach_observer))
  File "/usr/lib/python3.4/site-packages/rx/linq/observable/combinelatest.py", line 101, in subscribe
    func(idx)
  File "/usr/lib/python3.4/site-packages/rx/linq/observable/combinelatest.py", line 98, in func
    subscriptions[i].disposable = args[i].subscribe(on_next, observer.on_error, on_completed)
  File "/usr/lib/python3.4/site-packages/rx/observable.py", line 51, in subscribe
    return self._subscribe(observer)
  File "/usr/lib/python3.4/site-packages/rx/subjects/behaviorsubject.py", line 47, in __subscribe
    observer.on_next(self.value)
  File "/usr/lib/python3.4/site-packages/rx/linq/observable/combinelatest.py", line 90, in on_next
    with parent.lock:
KeyboardInterrupt

The following program based on observables works:

o1 = rx.Observable.interval(2000)
o2 = rx.Observable.interval(3000)
s = rx.Observable.combine_latest(o1, o2, lambda a, b: (a, b))
s.subscribe(print)

I figured that the subject version should be able to work similarly, but I could be wrong as I have only a little experience with Rx.

Thanks in advance for any help, and sorry if I am simply using this wrong!
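
The trace ends at "with parent.lock:" inside an on_next that runs synchronously during subscribe. One possible reading, stated as an assumption because the trace does not show where the lock is first taken: the lock is already held by the same thread and is not re-entrant. The standard-library sketch below shows how threading.Lock and threading.RLock differ in that situation; can_reacquire is a hypothetical helper.

```python
import threading


def can_reacquire(lock) -> bool:
    """Try to take `lock` a second time from the thread that already holds it."""
    with lock:
        # Non-blocking, so the sketch reports the outcome instead of freezing.
        acquired = lock.acquire(blocking=False)
        if acquired:
            lock.release()
        return acquired


plain_result = can_reacquire(threading.Lock())
reentrant_result = can_reacquire(threading.RLock())
print(plain_result, reentrant_result)  # prints: False True
```

A blocking acquire on the plain Lock would wait forever in the same position, which is consistent with the freeze described above.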

autocomplete example throws an exception

Starting the tornado autocomplete example on python 2.7.5 using RxPY from the master branch at commit 82212d0 (latest, cloned on 2014-09-22):

$ python autocomplete.py
Starting server at port: 8080
WebSocket opened
WARNING:tornado.access:404 GET /favicon.ico (127.0.0.1) 0.30ms
Traceback (most recent call last):
  File "/home/anglerud/tmp/RxPY/rx/autodetachobserver.py", line 17, in next
    self.observer.on_next(value)
  File "/home/anglerud/tmp/RxPY/rx/abstractobserver.py", line 24, in on_next
    self.next(value)
  File "/home/anglerud/tmp/RxPY/rx/linq/observable/switchlatest.py", line 37, in on_next
    inner_source = Observable.from_future(inner_source)
TypeError: unbound method from_future() must be called with Observable instance as first argument (got Future instance instead)

use relative imports for all imports

Currently absolute imports are used. This prevents the rxpy package from being easily included in other packages when a package manager/installer isn't available (for example, a Python-extensible app that imposes non-standard requirements). With relative imports, it would be easy to drop the rxpy package into a larger package and import it from there.

IIRC, relative imports are available since py2.5 (?) so this shouldn't affect compat?

observe_on does not allow parallel processing

The following code tries to use .observe_on(Scheduler.timeout) to run tasks in parallel:

import time
import threading
import rx
from rx.concurrency.scheduler import Scheduler

tm1=time.time()
lock=threading.Lock()

def log(s):
  tm2=time.time()
  with lock:
    print '%ds: %s on %s' % (round(tm2-tm1), s, threading.currentThread().name)

def work(x):
  log('processing '+str(x))
  time.sleep(1)

def finish(x):
  log('finished '+str(x))

log('started')

rx.Observable.range(1, 3) \
  .observe_on(Scheduler.timeout) \
  .do_action(work) \
  .subscribe(finish)

log('finished ALL')
time.sleep(5) # wait to complete

Expected output:

0s: started on MainThread
0s: finished ALL on MainThread
0s: processing 1 on Thread-1
0s: processing 2 on Thread-2
0s: processing 3 on Thread-3
1s: finished 1 on Thread-1
1s: finished 2 on Thread-2
1s: finished 3 on Thread-3

Actual output:

0s: started on MainThread
0s: processing 1 on Thread-1
0s: finished ALL on MainThread
1s: finished 1 on Thread-1
1s: processing 2 on Thread-2
2s: finished 2 on Thread-2
2s: processing 3 on Thread-3
3s: finished 3 on Thread-3

The problem seems to be in the design of the ScheduledObserver class. The run method does not schedule the next queued item until the current item has been processed.
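The difference between the expected and actual output can be sketched without RxPY. The helpers below (`run_serial`, `run_parallel`) are hypothetical names for illustration, not RxPY APIs. They contrast draining a queue one item at a time with handing each item to its own pool thread. Note that Rx observers generally expect notifications one at a time, so serial delivery inside a single subscription may be intentional.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def run_serial(items, work):
    """One worker drains the queue: item n+1 starts only after item n returns."""
    return [work(item) for item in items]


def run_parallel(items, work, max_workers=3):
    """Each item is handed to a pool thread, so the work can overlap."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(work, items))


# A barrier releases only once three callers wait at the same time, which
# makes the overlap observable without relying on wall-clock timing.
barrier = threading.Barrier(3)


def work(x):
    barrier.wait(timeout=5)  # would time out if the items ran one by one
    return x * 10


results = run_parallel([1, 2, 3], work)
```

With `run_serial`, the same `work` function would time out at the barrier, because only one call is ever in flight.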

Observable.interval does not work with the AsyncIO Scheduler for Float Periods

The following program illustrates the problem:

import rx
import asyncio

# Bug is with the AsyncIO scheduler
scheduler = rx.concurrency.AsyncIOScheduler()
# Works with the default scheduler (timeout)
#scheduler = None

subscription = rx.Observable.interval(1000.0, scheduler=scheduler) \
        .subscribe(print, print, print)

try:
        loop = asyncio.get_event_loop()
        loop.run_forever()
except:
        pass

subscription.dispose()

No output is generated with the AsyncIO scheduler. Replace 1000.0 with 1000 and it works. Both cases work with the default scheduler (timeout).

Very subtle bug. I couldn't figure out why no events were firing until I isolated my code to the above.

It would be helpful for pip users if you could publish a wheel directly

method:

python setup.py sdist bdist_wheel  --universal
twine upload dist/*

reason:

  • if you build a system with pip install -r requirements.txt, any tar.gz package in the requirements file is first converted to a wheel. If that conversion needs another package, you have to create a requirements0.txt file and install it first, which adds complexity to a problem that wheels would otherwise solve,
  • once a majority of packages on PyPI are wheels, the packaging team may be able to raise the bar to the next level ('whoops', 'wings', ...)

Keyword arg Scheduler not used in start_with

In rx/linq/observable/startswith.py, the scheduler is fetched from the keyword args. However, it's then immediately overridden by the following if-else. I'm not sure if allowing a scheduler in the keywords is desired, but either way, I think that the current implementation is mistaken.

I don't actually use the scheduler arg (I was looking at start_with for something else), so this isn't high priority for me. I just wanted to bring it up.

Are streams created from iterables supposed to be exhaustible/mutable?

I'm really puzzled by something in RxPY. It seems to be different from RxJS in a crucial way.

Please take a look at my contrived RxJS example on JSFiddle, then look at the equivalent RxPY code below.

In the examples, I create a single stream, then create two new streams by transforming it. I subscribe two observers to the streams. In RxJS, I get the result I expect. In RxPY, I get a confusing result.

import rx

def on_next_1(s):
    print "subscriber 1: ", s

def on_next_2(s):
    print "subscriber 2: ", s

ns = rx.Observable.from_(['a', 'b', 'c'])

xs = ns.map(lambda s: s)
ys = ns.map(lambda s: s.upper())

xs.subscribe(on_next_1)
xs.merge(ys).subscribe(on_next_2)

This RxPY code produces this output:

subscriber 1:  a
subscriber 1:  b
subscriber 1:  c

This is the output I would expect (just like the RxJS output):

subscriber 1: a
subscriber 1: b
subscriber 1: c
subscriber 2: a
subscriber 2: A
subscriber 2: b
subscriber 2: B
subscriber 2: c
subscriber 2: C

In the RxPY code, it seems the first observer works, but the second observer gets nothing. It's like the ns is a mutable object that is changing in place. But shouldn't each call to Observable(...).map return a new Observable? Is it really the desired/designed behavior, for Observable.from_(<a list>) to fire just once and be exhausted, even when you're deriving more than one new Observables from it?

I cannot tell if this is a bug, or if I'm missing a fundamental point! It seems like the current behavior violates expectations about the Observer pattern. Isn't the contract, that each subscriber gets each message? What am I missing?

Why does TwistedScheduler raise AlreadyCalled error?

Minimal working example: see SO post

The issue documented in the post above seems to be due to the following in rx.concurrency.mainloopscheduler.TwistedScheduler:

def dispose():
    handle[0].cancel()

I believe the problem is that handle[0] (a DelayedCall) might already have been called before it is cancelled. So should we wrap the above in a try block?

def dispose():
    try:
        handle[0].cancel()
    except AlreadyCalled:
        pass

Any plan of adding PyQt mainloop support?

Hi, I notice there is a tkinter mainloop scheduler. Since PyQt is also very popular and extremely powerful, is there any plan to add a Qt mainloop scheduler?

I'm new to Rx, but I'll try to write one first.

Possible bug when using pausable on BehaviorSubject

I am using BehaviorSubjects all over my current GUI application, because they are an awesome way to store values and get updates. I have some situations where I use .pausable() when combining multiple streams of values. I do this because I update many of the streams at once, and I don't want to compute results in derived streams until all of the parent streams are updated. That is, I set the pausable to False, make updates to multiple streams, and then set the pausable to True.

I am encountering what I think is an error when I pause and immediately unpause without changing the value. Here is a sample script:

import rx


def normal_subject():
    pauser = rx.subjects.BehaviorSubject(True)
    subject = rx.subjects.Subject()
    subject.pausable(pauser).subscribe(print)
    subject.on_next("print value")

    print("Subject pause/unpause")
    pauser.on_next(False)
    pauser.on_next(True)


def behavior_subject():
    pauser = rx.subjects.BehaviorSubject(True)
    subject = rx.subjects.BehaviorSubject("print value")
    subject.pausable(pauser).subscribe(print)

    print("BehaviorSubject pause/unpause")
    pauser.on_next(False)
    pauser.on_next(True)


if __name__ == "__main__":
    print("Subject:")
    normal_subject()
    print("\nBehaviorSubject:")
    behavior_subject()

Here is the output:

$ python rx_pause.py 
Subject:
print value
Subject pause/unpause

BehaviorSubject:
print value
BehaviorSubject pause/unpause
print value

In the normal subject situation, the value is only printed once, right when on_next is called. When we pause and then unpause, the value is not printed again. In the BehaviorSubject version, the value is printed twice, the second time being when the pausable is paused then unpaused. It is my understanding that both situations should produce the same output, at least based off of the RxMarbles website.

My hunch is pausable is implemented using some sort of subscribe/unsubscribe mechanism. When used with BehaviorSubjects, the re-subscription causes the latest value to be emitted over again, even though no values were emitted while paused. I would think that pausable should be implemented by ignoring on_next, so values are simply ignored while paused, and we don't resubscribe when we unpause our stream.
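The "ignore on_next while paused" idea can be sketched in plain Python. `PausableGate` is a hypothetical class written for this illustration. It is not RxPY's pausable implementation, and it assumes that values arriving while paused should simply be dropped.

```python
class PausableGate:
    """Forwards values to `on_next` only while the gate is open."""

    def __init__(self, on_next, open_initially=True):
        self._on_next = on_next
        self._open = open_initially

    def set_open(self, is_open):
        # Toggling the gate never re-subscribes, so nothing is replayed.
        self._open = bool(is_open)

    def on_next(self, value):
        if self._open:
            self._on_next(value)


seen = []
gate = PausableGate(seen.append)
gate.on_next("print value")
gate.set_open(False)                   # pause
gate.on_next("dropped while paused")
gate.set_open(True)                    # unpause: nothing is emitted again
gate.on_next("after resume")
```

Because the gate stays subscribed to its source the whole time, a source that replays its latest value on subscription would not replay it on unpause.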

Again, I'm not quite sure that this is a bug, but it definitely feels unintended. I think that if two streams emit the same values into the same operator, we should get the same result.

Process is Blocked when an Error occurs in an Observable.interval Chain

The following program illustrates the problem:

import rx
import time

sub = rx.Observable.interval(1000) \
    .map(lambda x: 0/(5-x)) \
    .subscribe(print, print, print)

try:
    while True:
        time.sleep(0.1)
except:
    sub.dispose()

The map call will cause a division by 0 exception to occur if x is 5. Since Observable.interval emits 0, 1, 2, 3, ... this will happen after 6 seconds.

If you exit via a keyboard interrupt (Ctrl+C) before the exception occurs, the program exits normally (since dispose is called, cancelling the Observable subscription).

However, if you exit after the exception occurs, the process never ends. Some thread (probably the one controlling Observable.interval) seems to be blocking the process.
Pressing Ctrl+C again causes the following error to show:

Exception ignored in: <module 'threading' from '/opt/sd/lib/python3.5/threading.py'>
Traceback (most recent call last):
  File "/opt/sd/lib/python3.5/threading.py", line 1288, in _shutdown
    t.join()
  File "/opt/sd/lib/python3.5/threading.py", line 1054, in join
    self._wait_for_tstate_lock()
  File "/opt/sd/lib/python3.5/threading.py", line 1070, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt

I would have thought that an error would cancel the controlling thread (of Observable.interval), but this does not seem to be the case. What gives?

I'm using Python 3.5.1 and RxPY 1.2.2.

Duplicate events when creating loops with Subject

I am trying to use Subject to create a loop (connecting the end of a stream to the beginning). I have a real use-case for this, but I wrote a contrived script to isolate my problem. I'm confused, because it seems like there are duplicate events.

(Note, I'm running Python 2.7, and I have tornado installed.)

Here's the code without the loop connected:

from __future__ import print_function
import re
import time

from rx import Observable
from rx.concurrency import IOLoopScheduler
from rx.subjects import Subject
from tornado import ioloop


scheduler = IOLoopScheduler()

seed_list = ['a', 'b', '1', '2', 'c', ]


def check_no_numbers(s):
    """Filters out strings w/ number characters
    """
    if not re.search(r'\d', s):
        print("+", s)
        return True
    else:
        print("x", s)
        return False


def dot(s):
    return s + '.'  #+ str(uuid4())


if __name__ == "__main__":
    xs = Observable.from_(seed_list, scheduler=IOLoopScheduler())

    subject = Subject()

    def push_new(x):
        print('pushing: {!r}'.format(x))
        time.sleep(0.1)
        subject.on_next(x)

    xs.subscribe(on_next=push_new)

    filtered = subject.filter(check_no_numbers).filter(lambda s: len(s) <= 3)
    results = filtered.map(lambda x: "==> {}".format(x))

    news = filtered.map(dot)
    # news.subscribe(on_next=push_new)  # DON'T loop

    def out(result):
        print(result)
    results.subscribe(on_next=out)

    ioloop.IOLoop.current().start()

The output is not surprising to me. Each event goes through the pipeline once, as expected:

pushing: 'a'
+ a
==> a
pushing: 'b'
+ b
==> b
pushing: '1'
x 1
pushing: '2'
x 2
pushing: 'c'
+ c
==> c

Only one line is different in the following code. The code subscribes one observer to the Subject at the beginning of the pipeline, causing the expected looping. But the surprise is that the events seem to be duplicated.

from __future__ import print_function
import re
import time

from rx import Observable
from rx.concurrency import IOLoopScheduler
from rx.subjects import Subject
from tornado import ioloop


scheduler = IOLoopScheduler()

seed_list = ['a', 'b', '1', '2', 'c', ]


def check_no_numbers(s):
    """Filters out strings w/ number characters
    """
    if not re.search(r'\d', s):
        print("+", s)
        return True
    else:
        print("x", s)
        return False


def dot(s):
    return s + '.'  # + str(uuid4())


if __name__ == "__main__":
    xs = Observable.from_(seed_list, scheduler=IOLoopScheduler())

    subject = Subject()

    def push_new(x):
        print('pushing: {!r}'.format(x))
        time.sleep(0.1)
        subject.on_next(x)

    xs.subscribe(on_next=push_new)

    filtered = subject.filter(check_no_numbers).filter(lambda s: len(s) <= 3)
    results = filtered.map(lambda x: "==> {}".format(x))

    news = filtered.map(dot)
    news.subscribe(on_next=push_new)  # loop

    def out(result):
        print(result)
    results.subscribe(on_next=out)

    ioloop.IOLoop.current().start()

This produces the following, surprising output:

pushing: 'a'
+ a
pushing: 'a.'
+ a.
pushing: 'a..'
+ a..
pushing: 'a...'
+ a...
+ a...
+ a..
==> a..
+ a.
==> a.
+ a
==> a
pushing: 'b'
+ b
pushing: 'b.'
+ b.
pushing: 'b..'
+ b..
pushing: 'b...'
+ b...
+ b...
+ b..
==> b..
+ b.
==> b.
+ b
==> b
pushing: '1'
x 1
x 1
pushing: '2'
x 2
x 2
pushing: 'c'
+ c
pushing: 'c.'
+ c.
pushing: 'c..'
+ c..
pushing: 'c...'
+ c...
+ c...
+ c..
==> c..
+ c.
==> c.
+ c
==> c

If you sort these lines, you can see the duplication very clearly (note how you only see pushing: <x> once, but see <x> twice, for each value...)

+ a
+ a
+ a.
+ a.
+ a..
+ a..
+ a...
+ a...
# snipped
pushing: 'a'
pushing: 'a.'
pushing: 'a..'
pushing: 'a...'
# snipped

(For another way to see the behavior, you can uncomment the snippet in the dot() function, that calls uuid4(), to tag the strings.)

Is this a bug?

have some troubles

What does this mean? I get the following error:

File "/home/karl/.virtualenvs/banner/local/lib/python2.7/site-packages/rx/concurrency/scheduler.py", line 34
nonlocal is_done
^
SyntaxError: invalid syntax

Python 2 vs. 3 incompatibility?

Hi,

Thanks for putting this library together! I'm really enjoying learning about reactive extensions.

One issue I've encountered is that some of the examples (for example ironpython) have Python 2.x print statements. For example:

print "x y z"

So they will not run in Python 3.x. However, the notebook tutorial has statements like:

xs = d.subscribe(print)

Which only works in Python 3.x. How would one subscribe the print function in Python 2.x?
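One possible approach, sketched under assumptions: in Python 2, `print` is a statement and cannot be passed as a callback unless the module starts with `from __future__ import print_function` (as another script on this page does). Alternatively, a small named function works on both versions. Here `show` and `for_each` are hypothetical names, and `for_each` only stands in for `d.subscribe(...)`.

```python
import sys


def show(value):
    """Write one value per line; valid on both Python 2 and Python 3."""
    sys.stdout.write("{0}\n".format(value))


def for_each(values, on_next):
    # Hypothetical stand-in for `d.subscribe(on_next)`; not part of RxPY.
    for value in values:
        on_next(value)


for_each([1, 2, 3], show)
```

With a real observable, the same function would be passed as the callback in place of `print`.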

Why does RxPY keep randomly throwing 'NoneType' object has no attribute 'dispose'?

Minimal working example: see SO post

The issue documented in the post above seems to be due to the following in rx.concurrency.currentthreadscheduler

if not self.queue:
    self.queue = Trampoline(self)

This is not thread-safe, yet Scheduler.current_thread = current_thread_scheduler = CurrentThreadScheduler() is shared across threads and is used in an asynchronous manner by LINQ operators. So should we use a lock as below?

from rx import Lock

class CurrentThreadScheduler(Scheduler):

    def __init__(self):
        ...
        self.lock = Lock()

    def schedule_relative(self, duetime, action, state=None):
        ...
        with self.lock:
            has_queue = self.queue
            if not has_queue:
                self.queue = Trampoline(self)

        if has_queue:
            self.queue.enqueue(si)
        else:
            self.queue = Trampoline(self)
            try:
                self.queue.enqueue(si)
                self.queue.run()
            finally:
                self.queue.dispose()
                self.queue = None
        ...

On a further note, the self.queue.dispose() seems also to not be very safe. What if one thread has just called self.queue.enqueue(si) when another is about to call self.queue.dispose()? The queued item will end up being dropped silently.
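An alternative to locking, offered only as a sketch and not taken from RxPY's source, is to keep one queue per thread in `threading.local`. No two threads then share the check-and-create step, and one thread cannot dispose a queue that another thread has just enqueued into. `PerThreadQueue` is a hypothetical name, and a `deque` stands in for the Trampoline.

```python
import threading
from collections import deque


class PerThreadQueue:
    """Hands each thread its own queue, so no two threads race to create one."""

    def __init__(self):
        self._local = threading.local()

    def get(self):
        queue = getattr(self._local, "queue", None)
        if queue is None:
            # Only the calling thread can see this attribute, so the
            # check-then-create step needs no lock.
            queue = deque()
            self._local.queue = queue
        return queue


queues = PerThreadQueue()
seen = {}


def record(name):
    seen[name] = queues.get()


workers = [threading.Thread(target=record, args=(n,)) for n in ("t1", "t2")]
for w in workers:
    w.start()
for w in workers:
    w.join()
```

Each worker ends up with a distinct queue object, while repeated calls on the same thread return the same one.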

(idea) Is it crazy to integrate RxPy and Celery, or RxPy and RabbitMQ?

Just an idea, that I'm wondering if I should explore or nip in the bud. Looking for some Rx wisdom I suppose, since I'm a newb.

I'm looking at these:

I'm also thinking about Celery and how to either integrate it with RxPy, or to imitate a small part of RxPy with Celery's operators. Integration would be nicer. Or if RxPy + ((Twisted or Tornado) + pika + RabbitMQ) could replace the need for Celery for some projects... that would also be interesting.

Is this something worth thinking about and trying to work on? Or am I mixing incompatible patterns? Opinions?

Rx 1.2.3 ImportError: No module named subjects

Using rx, either installed via pip or built from the development/master branch, results in:

 # python
Python 2.7.9 (default, Sep 11 2015, 21:52:46) 
[GCC 4.8.3] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> from rx.subjects import Subject
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "rx.py", line 18, in <module>
    from rx.subjects import Subject
ImportError: No module named subjects

from rx.subjects import Subject works inside the RxPy/ sources but not outside. How to fix this?

Any help is highly appreciated.

TypeError when observable merge

When merging two observables I got this stack trace. It seems a comparison is made between an int and an AnonymousObservable, and a TypeError is raised.

I am using python 3.4.1 and Rx 0.9.3

from rx import Observable

source1 = Observable.range(1, 3)
source2 = Observable.range(1, 3)
x = source1.merge(source2)

x.subscribe(
    lambda x: print("Observer 1: OnNext: ", x),
    lambda ex: print("Observer 1: OnError: ", ex.Message),
    lambda: print("Observer 1: OnCompleted")
)

/Library/Frameworks/Python.framework/Versions/3.4/bin/python3.4-32 /Users/larsholm/PycharmProjects/Py1Proj/py1.py
Traceback (most recent call last):
File "/Users/larsholm/Library/Python/3.4/lib/python/site-packages/rx/autodetachobserver.py", line 17, in next
self.observer.on_next(value)
File "/Users/larsholm/Library/Python/3.4/lib/python/site-packages/rx/abstractobserver.py", line 24, in on_next
self.next(value)
File "/Users/larsholm/Library/Python/3.4/lib/python/site-packages/rx/linq/observable/merge.py", line 58, in on_next
if active_count[0] < max_concurrent_or_other:
TypeError: unorderable types: int() < AnonymousObservable()

Process finished with exit code 0

reactivex.github.io

@dbrattli
I think it would be good to publish the RxPY docstrings and sample code on reactivex.github.io.
I will write sample code for RxPY.
I will then send the docstrings and sample code to reactivex.github.io as a pull request.
Please let me know if there is any problem with this.

Generator object in Observable.from_ throws if Scheduler.new_thread used

Generators cannot be resumed from multiple threads simultaneously. This causes Observable.from_ to throw if Scheduler.new_thread is used.

The solution seems to be to wrap the passed-in generator as a thread-safe iterator that serializes calls to __next__(). However, given that Observable knows which scheduler it's using, perhaps it could be smarter about this case?

ACTUAL

o = Observable.from_(my_generator(), scheduler=Scheduler.new_thread)
o.subscribe(lambda x: print('aaa', x))
o.subscribe(lambda x: print('bbb', x))

This fails due to a race condition.

EXPECTED

o = Observable.from_(my_generator(), scheduler=Scheduler.new_thread)
o.subscribe(lambda x: print('aaa', x))
o.subscribe(lambda x: print('bbb', x))

This should not fail and Observable could wrap my_generator() in a thread-safe iterator.

WORKAROUND

from threading import Lock


class threadsafe_iter(object):
    """Serializes calls to next() on the wrapped iterator."""

    def __init__(self, it):
        self.it = it
        self.lock = Lock()

    def __iter__(self):
        return self

    def __next__(self):
        with self.lock:
            return next(self.it)


o = Observable.from_(threadsafe_iter(my_generator()), scheduler=Scheduler.new_thread)
o.subscribe(lambda x: print('aaa', x))
o.subscribe(lambda x: print('bbb', x))

This works fine.
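The locking idea can be checked without RxPY. The sketch below uses only the standard library. `ThreadSafeIterator`, `squares`, and `consume` are illustrative names, and four plain threads stand in for subscribers on a new-thread scheduler. One caveat it makes visible: the consumers share a single generator, so each item goes to exactly one of them.

```python
import threading


class ThreadSafeIterator:
    """Wraps an iterator so that only one thread advances it at a time."""

    def __init__(self, iterable):
        self._iterator = iter(iterable)
        self._lock = threading.Lock()

    def __iter__(self):
        return self

    def __next__(self):
        with self._lock:
            return next(self._iterator)


def squares(n):
    for i in range(n):
        yield i * i


shared = ThreadSafeIterator(squares(1000))
collected = []
collected_lock = threading.Lock()


def consume():
    # Each value is handed to exactly one consumer thread.
    for value in shared:
        with collected_lock:
            collected.append(value)


threads = [threading.Thread(target=consume) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Every value is collected exactly once across the four threads, with no "generator already executing" error.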

I could provide a complete example if needed.

Possible bug when disconnecting subscriber from iterable

I am trying to use an iterator as an observable, but I want this observable to be as cold as possible, i.e. to generate new values only while there is a subscriber. However, after I dispose the subscriber, the iterator keeps running. The code is as follows:

from time import sleep
from rx import Observable
from rx.concurrency import new_thread_scheduler


def myiterator():
    for i in range(100):
        sleep(1)
        print('Publishing value: {0}'.format(i))
        yield i

source = Observable.from_(myiterator(), scheduler=new_thread_scheduler)
subscription = source.subscribe(
    lambda v: print("Value published: {0}".format(v)),
    lambda e: print("Error! {0}".format(e)),
    lambda: print("Completed!")
)

sleep(3)
print('Before dispose')
subscription.dispose()
print('After dispose')
sleep(3)
print('Finish')

and the output is:

Publishing value: 0
Value published: 0
Publishing value: 1
Value published: 1
Before dispose
After dispose
Publishing value: 2
Publishing value: 3
Publishing value: 4
Finish

I would expect no values to be published after the subscriber is disposed. Am I correct?
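The expected behaviour can be sketched as cooperative cancellation: the loop that pulls from the iterator checks a "disposed" flag before each pull. This is an illustration only, not how RxPY implements `from_`. `pump` is a hypothetical helper, and a `threading.Event` stands in for the subscription's disposed state.

```python
import threading


def pump(iterable, on_next, disposed):
    """Push items to on_next until the iterable ends or `disposed` is set."""
    iterator = iter(iterable)
    while not disposed.is_set():       # checked before pulling each item
        try:
            item = next(iterator)
        except StopIteration:
            return
        if disposed.is_set():          # disposed while the item was produced
            return
        on_next(item)


produced, received = [], []
disposed = threading.Event()


def numbers():
    for i in range(100):
        produced.append(i)
        yield i


def on_next(value):
    received.append(value)
    if value == 1:
        disposed.set()                 # simulates subscription.dispose()


worker = threading.Thread(target=pump, args=(numbers(), on_next, disposed))
worker.start()
worker.join()
```

Once the flag is set, the generator is never advanced again, so nothing is produced after disposal.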

TestScheduler - 'NoneType' object is not callable

In Scala I can do something like this:

scala> val testScheduler = TestScheduler()
scala> testScheduler.createWorker.schedulePeriodically(1 milliseconds, 1 milliseconds)(println(1))
scala> testScheduler.advanceTimeBy(10 milliseconds)
warning: there was one feature warning; re-run with -feature for details
1
1
1
1
1
1
1
1
1
1

I tried to do the same thing in Python, but got the error below. It obviously has something to do with "state", which is None by default. The problem is, I can't see what it is for...

In [48]:  s = TestScheduler()

In [49]: s.schedule_periodic(1, write("1"))
1
Out[49]: <rx.disposables.singleassignmentdisposable.SingleAssignmentDisposable at 0x10e864410>

In [50]: s.advance_by(10)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-50-f16a9b67196f> in <module>()
----> 1 s.advance_by(10)

/Users/caente/nlp/lib/python2.7/site-packages/rx/concurrency/virtualtimescheduler.pyc in advance_by(self, time)
    135         if self.comparer(self.clock, dt) > 0:
    136             raise ArgumentOutOfRangeException()
--> 137         return self.advance_to(dt)
    138 
    139     def sleep(self, time):

/Users/caente/nlp/lib/python2.7/site-packages/rx/concurrency/virtualtimescheduler.pyc in advance_to(self, time)
    117                         self.clock = next.duetime
    118 
--> 119                     next.invoke()
    120                 else:
    121                     self.is_enabled = False

/Users/caente/nlp/lib/python2.7/site-packages/rx/concurrency/scheduleditem.pyc in invoke(self)
     15 
     16     def invoke(self):
---> 17         self.disposable.disposable = self.invoke_core()
     18 
     19     def compare_to(self, other):

/Users/caente/nlp/lib/python2.7/site-packages/rx/concurrency/scheduleditem.pyc in invoke_core(self)
     24 
     25     def invoke_core(self):
---> 26         return self.action(self.scheduler, self.state)
     27 
     28     def __lt__(self, other):

/Users/caente/nlp/lib/python2.7/site-packages/rx/concurrency/virtualtimescheduler.pyc in run(scheduler, state1)
     63             self.queue.remove(si)
     64 
---> 65             return action(scheduler, state1)
     66 
     67         si = ScheduledItem(self, state, run, duetime, self.comparer)

/Users/caente/nlp/lib/python2.7/site-packages/rx/concurrency/scheduler.pyc in action1(s, p)
    162 
    163         def action1(s, p):
--> 164             return self.invoke_rec_date(s, p, 'schedule_relative')
    165 
    166         return self.schedule_relative(duetime, action1, state={ "first": state, "second": action })

/Users/caente/nlp/lib/python2.7/site-packages/rx/concurrency/scheduler.pyc in invoke_rec_date(self, scheduler, pair, method)
    109 
    110             action(state1, action1)
--> 111         recursive_action(state)
    112         return group
    113 

/Users/caente/nlp/lib/python2.7/site-packages/rx/concurrency/scheduler.pyc in recursive_action(state1)
    108                     is_added = True
    109 
--> 110             action(state1, action1)
    111         recursive_action(state)
    112         return group

/Users/caente/nlp/lib/python2.7/site-packages/rx/concurrency/scheduleperiodicrecursive.pyc in tick(self, command, recurse)
     22         recurse(0, self._period)
     23         try:
---> 24             self._state = self._action(self._state)
     25         except Exception as exn:
     26             self._cancel.dispose()

TypeError: 'NoneType' object is not callable
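One possible reading of the traceback, not confirmed in the thread: `write("1")` runs immediately (the `1` printed under `In [49]`) and its return value, `None`, is what gets scheduled. The last frame then calls `self._action(self._state)` with `None` as the action. The sketch below reproduces that pattern in plain Python. `run_ticks` is a hypothetical stand-in for a periodic scheduler, not an RxPY API.

```python
def run_ticks(action, ticks, state=None):
    """Hypothetical periodic scheduler: calls action(state) once per tick."""
    for _ in range(ticks):
        state = action(state)
    return state


log = []


def write(text):
    log.append(text)  # returns None, like a print-style helper


# Passing write("1") would call it once right away and schedule None.
# Wrapping it in a function defers the call to each tick instead.
def tick(state):
    write("1")
    return state      # the returned value becomes the next tick's state


run_ticks(tick, 10)
```

Under this reading, "state" is simply a value threaded from one tick to the next, and the action must be a callable that accepts it.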
