Coder Social home page Coder Social logo

rxphp's Introduction

RxPHP

Reactive extensions for PHP. The reactive extensions for PHP are a set of libraries to compose asynchronous and event-based programs using observable streams.

CI status Coverage Status

Example

$source = \Rx\Observable::fromArray([1, 2, 3, 4]);

$source->subscribe(
    function ($x) {
        echo 'Next: ', $x, PHP_EOL;
    },
    function (Exception $ex) {
        echo 'Error: ', $ex->getMessage(), PHP_EOL;
    },
    function () {
        echo 'Completed', PHP_EOL;
    }
);

//Next: 1
//Next: 2
//Next: 3
//Next: 4
//Completed

Try out the demos

$ git clone https://github.com/ReactiveX/RxPHP.git
$ cd RxPHP
$ composer install
$ php demo/interval/interval.php

Have fun running the demos in /demo.

note: When running the demos, the scheduler is automatically bootstrapped. When using RxPHP within your own project, you'll need to set the default scheduler.

Installation

  1. Install an event loop. Any event loop should work, but the ReactPHP event loop is recommended.
$ composer require react/event-loop
  1. Install RxPHP using composer.
$ composer require reactivex/rxphp
  1. Write some code.
<?php

require_once __DIR__ . '/vendor/autoload.php';

use Rx\Observable;
use React\EventLoop\Factory;
use Rx\Scheduler;

$loop = Factory::create();

//You only need to set the default scheduler once
Scheduler::setDefaultFactory(function() use($loop){
    return new Scheduler\EventLoopScheduler($loop);
});

Observable::interval(1000)
    ->take(5)
    ->flatMap(function ($i) {
        return Observable::of($i + 1);
    })
    ->subscribe(function ($e) {
        echo $e, PHP_EOL;
    });

$loop->run();

Working with Promises

Some async PHP frameworks have yet to fully embrace the awesome power of observables. To help ease the transition, RxPHP has built in support for ReactPHP promises.

Mixing a promise into an observable stream:

Observable::interval(1000)
    ->flatMap(function ($i) {
        return Observable::fromPromise(\React\Promise\resolve(42 + $i));
    })
    ->subscribe(function ($v) {
        echo $v . PHP_EOL;
    });

Converting an Observable into a promise. (This is useful for libraries that use generators and coroutines):

$observable = Observable::interval(1000)
    ->take(10)
    ->toArray()
    ->map('json_encode');

$promise = $observable->toPromise();

Additional Information

License

RxPHP is licensed under the MIT License - see the LICENSE file for details

rxphp's People

Contributors

aaronbonneau avatar asm89 avatar bartvanhoutte avatar cboden avatar davidwdan avatar e1379 avatar etienneroudeix avatar ihor avatar jmb3 avatar martinsik avatar mbonneau avatar robations avatar wyrihaximus avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

rxphp's Issues

Non-strict array_search causes issues in CompositeDisposable (and likely other places)

There is non-strict array searching in side the CompositeDisposable that cause unexpected/incorrect results in some situations:

Reproducing code:

<?php

use React\EventLoop\Factory;
use Rx\Observable;
use Rx\Observer\CallbackObserver;
use Rx\Testing\TestScheduler;

require_once __DIR__ . '/vendor/autoload.php';

$loop = Factory::create();

$scheduler = new TestScheduler();

$o = Observable::interval(500)
    ->merge(Observable::range(1,3))
    ->take(5)
    ->subscribe(new CallbackObserver(function ($x) {
        echo "Next: $x\n";
    }, function (Exception $exception) {
        echo "Err\n";
    }, function () {
        echo "Complete\n";
    }), $scheduler);

$scheduler->start();

Expected output:

Next: 1
Next: 2
Next: 3
Next: 0
Next: 1
Complete

Actual output:

Next: 1
Next: 2
Next: 3

This is caused by CompositeDisposable finding the incorrect Disposable here:

$key = array_search($disposable, $this->disposables);

This is can be corrected by enforcing strict comparison (passing true as the third argument to array_search)

PR on the way.

Publish testing helpers

The library ships with some testing helpers. E.g. the test scheduler, but also a base test case and some helpers that are useful. We could move those to the lib/ (or src/ for 2.x) directory and make them useable by downstream users without having to rely on autoloader hacks.

More concretely:

Implement observeOn operator

I have a stream of Runnable

interface Runnable
{
    public function run();
}

and I wanna schedule each of this one on my custom ThreadPoolScheduler.

getStreamOfRunnable()
    ->observeOn(new ThreadPoolScheduler(new FixedThreadPool(4)))
    ->doOnNext(function (Runnable $r) {
        $r->run();
    })
    ->observeOn(new ImmediateScheduler())
    ->subscribe($observer)

As I know the subscribeOn operator is for the different things(or mby i still didnt get it).

P.S. Sorry for my bad eng.

Memory leak on scheduler

Hi, thanks a lot for this initiative.

$loop = Factory::create();
$scheduler = new EventLoopScheduler($loop);

$mem = function() {
    echo memory_get_usage(true)."\n";
};

$scheduler->schedulePeriodic($mem, 0, 100);

\Rx\Observable::interval(100, $scheduler)
    ->subscribeCallback($mem);

$loop->run();

Whatever method I take I get a bump of 3Mb of memory every 5s.
With $loop->addPeriodicTimer everything is stable.

EventLoopScheduler does not cancel timer in certain circumstances

EventLoopScheduler does not cancel timer in certain circumstances. This may cause the eventloop to hang until that event was supposed to have been invoked (the scheduled action will not actually be invoked).

This occurs when:

  • An action is scheduled past another scheduled action
  • That action, at some time in the future, becomes the current scheduled action
  • That action's disposable is disposed of prior to invocation

Can't install

I get the following:

php -v
PHP 5.6.10 (cli) (built: Jul  6 2015 14:28:54)
Copyright (c) 1997-2015 The PHP Group
Zend Engine v2.6.0, Copyright (c) 1998-2015 Zend Technologies
noel-mbp16:rxphp noeldacosta$ composer require reactivex/rxphp
Using version ^1.5 for reactivex/rxphp
./composer.json has been updated
Loading composer repositories with package information
Updating dependencies (including require-dev)
Your requirements could not be resolved to an installable set of packages.

  Problem 1
    - Can only install one of: reactivex/rxphp[1.5.0, 1.4.x-dev].
    - Can only install one of: reactivex/rxphp[1.5.1, 1.4.x-dev].
    - Can only install one of: reactivex/rxphp[1.5.2, 1.4.x-dev].
    - Installation request for reactivex/rxphp 1.4.x-dev -> satisfiable by reactivex/rxphp[1.4.x-dev].
    - Installation request for reactivex/rxphp ^1.5 -> satisfiable by reactivex/rxphp[1.5.0, 1.5.1, 1.5.2].


Installation failed, reverting ./composer.json to its original content.

Non deterministic test causes intermittent test failures in 2.x branch

1) Rx\Scheduler\EventLoopSchedulerTest::testSchedulerWorkedWithScheduledEventOutsideItself
Failed asserting that 0.17082595825195312 matches expected 0.2.
/home/travis/build/ReactiveX/RxPHP/test/Rx/Scheduler/EventLoopSchedulerTest.php:134

This test is fixable by rerunning the job on travis until it passes but should be fixed so that it doesn't fail.

Drop ObservableInterface

I have a tendency to assume that when I have an ObservableInteface it is actually an Observable with all the operators.

I suggest that the ObservableInterface be dropped and a Subscribable interface be created that only has ->subscribe on it. This will clarify that the type does not have any operators on it. The responsibility of making sure that objects are actually Observable will be much clearer. (Example: the OperatorInterface can require Observable so the user knows they don't need to feed the interface through ->asObservable inside the implementation.)

Returning a custom Observable from a custom operator

The Observable::__call method internally uses the lift() method which means all custom operators always return an instance of AnonymousObservable.

I'm thinking how could I implement a custom operator returning a different Observable. For example I want to implement publishWhatever() which should return an instance of ConnectableObservable.

Maybe operators that don't want to use lift() could implement some interface and the __call method will behave accordingly?

Testing?

Not an issue, more a question but could you point me in the direction of how to test something like the following:

$observable
->flatmap(function($value){
return a zip of a couple of api requests if exists
})
->map(function($value){
return a cleaned value
})
->flatmap(function($value){
return related api requests if exists
})
->filter(function($value){
return false if related api requests don't pass
})
->filter(function($value){
return false if related api requests don't pass
})
->filter(function($value){
return false if related api requests don't pass
})
->toArray();

Thanks!

P.S thanks for an awesome library :)

Update PHPUnit to 5.7

Update PHPUnit to 5.7 and extend \PHPUnit\Framework\TestCase in \Rx\TestCase, so the test suite can be used with PHPUnit 6.0

Add a forEach method to Observable

Add a forEach method to Observable, for use with synchronous code, which will behave similarly to subscribe, except that it'll block (await) until complete.

Errors in max.php.expect

max.php

max([1, 3, 5, 7, 9, 2, 4, 6, 8]) == 9 !?

<?php
require_once __DIR__ . "/../bootstrap.php";
/* Without comparer */
$source = \Rx\Observable::fromArray([1, 3, 5, 7, 9, 2, 4, 6, 8])
    ->max();
$subscription = $source->subscribe($createStdoutObserver());

max.php.expect

Next value: 9
Complete!

Rename `_subscribe`

_subscribe, the abstract method on Observable should be renamed possibly to subscribeCore.

Deprecate ImmediateScheduler?

It seems that the ImmediateScheduler is causing us a bunch of problems. It doesn't actually schedule anything, but just calls the scheduled action directly. This occasionally blows out the stack and some operators won't work at all.

The ImmediateScheduler mostly seems to exist because it was easy to run small demos with it when the project was still young. It's unusable for anything serious. Should we:

  • deprecate the scheduler (we can one day remove it in a 2.x)
  • optionally add a simple trampolining scheduler and make that the default (without any fancy I/O stuff, if you want that then use the reactphp one)

Thoughts @cboden @davidwdan @mbonneau @Ocramius?

RxPHP v2.x

I've started work on RxPHP v2 with the goal of bringing it's behavior closer to RxJS 4/5. You can see the progress here

Here's a list of the changes I've made so far:

  • PHP 7 is a minimum requirement
  • It now uses the interop event-loop ( defaults to ReactPHP, but it'll work with any library that support the interop loop, including Icicle, ReactPHP, Amphp and KoolKode)
  • It uses a static scheduler (you no longer pass the scheduler through subscribe). For the most part, the user will not have to worry about the scheduler.
  • The event loop auto starts with register_shutdown_function. The less the user has to worry about the underlying event loop and scheduler, the better.
  • We had some non-standard names for operators, because they were reserved words in PHP, those have all been renamed (of, empty, do, catch, switch). Aliases of the old names were left for backwards compatibility.
  • Lots of cleanup
  • Included Allow custom operators

v1 example

With v1, the user has to know when to create a new scheduler and has to be familiar with event loops, which can be confusing and easy to mess up.

$loop      = \React\EventLoop\Factory::create();
$scheduler = new \Rx\Scheduler\EventLoopScheduler($loop);

\Rx\Observable::interval(1000)
    ->take(5)
    ->flatMap(function ($i) {
        return \Rx\Observable::just($i + 1);
    })
    ->subscribe(new \Rx\Observer\CallbackObserver(function ($e) {
        echo $e, PHP_EOL;
    }), $scheduler);

$loop->run();

v2 example

With v2, we pick the correct default scheduler for each operator that requires one, which you can override as needed (setting a scheduler should be the exception, not the rule). In the example below, interval and of already have the correct default scheduler, so the user doesn't need to define one.

\Rx\Observable::interval(1000)
    ->take(5)
    ->flatMap(function ($i) {
        return \Rx\Observable::of($i + 1);
    })
    ->subscribe(function ($e) {
        echo $e, PHP_EOL;
    });

v2 with Loop::execute example

You can also use it within Loop::execute, without making any changes to your Rx code

\Interop\Async\Loop::execute(function () {
    \Rx\Observable::interval(1000)
        ->take(5)
        ->flatMap(function ($i) {
            return \Rx\Observable::of($i + 1);
        })
        ->subscribe(function ($e) {
            echo $e, PHP_EOL;
        });
});

Remaining Items and Ideas

  • If possible, combine subscribe and subscribeCallback
  • Add support for async-interop/promise
  • Move react promise support to stand alone project or remove
  • Look into simplifying AnonymousObservable by moving auto dispose code to Observable
  • Figure out how to get it to work with HHVM
  • Add a forEach method to Observable that'll behave like subscribe except it'll block until complete (for use with synchronous code) similar to RxJava.

Before I move this to the ReactiveX/RxPHP repo, I'd like to get some feedback on the changes above as well as other changes or pain points that we should address with v2.

Demos and tests sometimes run with inconsistent results on travis

As part of the tests on travis, RxPHP runs most of the demos to make sure they are still outputting the same expected results.

This is causing some flakiness in the tests as the demos use a real time clock to run and, given the asynchronous nature of Rx, sometimes the test environments get different results. (These are often cleared up by just rerunning the tests).

I opened this issue as a place to discuss possibilities for resolving this issue.

Example: #98

doOnEach, doOnNext, doOnError, doOnCompleted do not work as expected

Currently, the "do" operators do not work as expected after receiving an error or completed.

Reproducing example:

Observable::just(0)
    ->doOnNext(function ($x) {
        echo "next: " . $x . "\n";
    })
    ->doOnNext(function () {
        echo "completed\n";
    })
    ->repeat(2)
    ->subscribeCallback();
// Output:
// next: 0
// completed
// -------------
// Expected Output:
// next: 0
// completed
// next: 0
// completed

This behavior is due to the AbstractObserver class stopping all events once there is a complete or an error.

Autostart the scheduler

This issue is to discuss using register_shutdown_function to automatically start the loop. This would be done bootstrap.php and set within require in composer.json, like this:

register_shutdown_function(function () {
    Loop::execute(function () {}, Loop::get());
});

Pros:

  1. Users will not need to wrap their code in Loop::execute.
  2. Currently, if someone tries using operators and forgets to wrap them, the program will exit and nothing will happen. By using register shutdown, everything that has been sent to the loop before the program exists, will run.
  3. It hides more of the internal plumbing, that shouldn't matter to most users

Cons:

???

EventLoopScheduler may execute ScheduledItems prior to their scheduled time

The EventLoopScheduler may execute ScheduledItems prior to their scheduled time if a previous ScheduledItem has been disposed of prior to invocation.

This is because the EventLoopScheduler uses peek() on the queue to check on the next ScheduledItem's due time without regard to whether the item has been canceled (or disposed).

`forkJoin` does not work as I expected

Using RxPHP version 2.0.2 with PHP 7.1.7 (on Windows 10, on OSX El Capitan and on Ubuntu 17.04). The behavior is the same on every platform, with an immediate scheduler or with the React event loop.

When I try to run the forkJoin demo :

$ php forkJoin.php
Next value: 47c
Complete!

But when looking at the code, the next value should be: 45c (e.g. the 7 value is not the last one produced by $obs2... in fact 7 is not produced at all by any observable).

$obs1 = Observable::range(1, 4);
$obs2 = Observable::range(3, 5);
$obs3 = Observable::fromArray(['a', 'b', 'c']);

When using forkJoin in my own code :

$obs1 = <an observable that produces a single `Job` instance>;
$obs2 = <an observable that produces a single `Configuration` instance>;
$obs3 = <an observable that produces a single `GitData` instance>;

Observable::forkjoin([$obs1, $obs2, $obs3], function($v1, $v2, $v3) {
  echo '$v1 = ', get_class($v1), PHP_EOL;
  echo '$v2 = ', get_class($v2), PHP_EOL;
  echo '$v3 = ', get_class($v3), PHP_EOL;
})->subscribe(...);

The output I get is not in the expected order (and it is the same even if I change the scheduler):

$v1 = Configuration
$v2 = Job
$v3 = GitData

I'm starting with the Observables: have I misinterpreted what forkJoin should produce, have I missed something? Thanks in advance.

Disposed of scheduled item does not cancel timer in some instances

If the next scheduled item is disposed, the timer is not canceled and will cause the scheduler to hang until that next time.

$loop = Factory::create();

Scheduler::setDefaultFactory(function () use ($loop) {
    return new Scheduler\EventLoopScheduler($loop);
});

$d = Scheduler::getDefault()->schedule(function () {
    echo "Scheduled thing...\n";
}, 5000);

$loop->addTimer(1, function () use ($d) {
    $d->dispose();
});

$loop->run();

The code above will wait 5 seconds and then exit even though the scheduled item is disposed of at 1 second.

How to use the demos?

This may be a stupid question – but what exactly am I supposed to get from the demos?
Each demo contains two files... loading each in the browser gives the same static result.
I'm left just feeling bemused. There's little documentation and the demo's shed no light for me at all on what this all is nor how to make use of it.

Enable strict typing

For great safety, strict typing should be enabled.

Things like range('1', '2') should fail.

Discrepancy between PHP and hhvm with generators

See this code sample:

https://gist.github.com/anonymous/2611cbe9564fd820b9b2

To summarise the output, PHP generates expected output returning numbers 0–12 for both the subscribe and foreach.

hhvm outputs 1–12 for the subscribe and 0–12 for the foreach.

$ php -v
PHP 5.6.17 (cli) (built: Jan  8 2016 10:27:48) 
Copyright (c) 1997-2015 The PHP Group
Zend Engine v2.6.0, Copyright (c) 1998-2015 Zend Technologies
    with Xdebug v2.3.3, Copyright (c) 2002-2015, by Derick Rethans

$ hhvm --version
HipHop VM 3.11.0 (rel)
Compiler: 1454683423_N
Repo schema: abdeb80638110f1c5b97727498d253ab662076cf

(Tested against rxphp 1.1.0)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.