Comments (2)
Something like this perhaps?
final class MergeDelayErrorOperator implements OperatorInterface
{
private ObservableInterface $subsequentObservable;
/**
* MergeDelayErrorOperator constructor.
*
* @param ObservableInterface $subsequentObservable
*/
public function __construct(ObservableInterface $subsequentObservable)
{
$this->subsequentObservable = $subsequentObservable;
}
public function __invoke(ObservableInterface $observable, ObserverInterface $observer): DisposableInterface
{
$errors = [];
return $observable
->materialize()
->merge($this->subsequentObservable->materialize())
->filter(static function ($event) use (&$errors) {
$class = get_class($event);
if ($class === OnErrorNotification::class) {
$errors[] = $event;
return false;
} elseif ($class === OnCompletedNotification::class) {
return false;
}
return true;
})
->dematerialize()
->subscribe(
[$observer, 'onNext'],
[$observer, 'onError'],
static function () use ($observer, &$errors) {
array_map(fn(Notification $n) => $n->accept($observer), $errors);
$observer->onCompleted();
}
);
}
}
from rxphp.
@bartvanhoutte I have tested a couple of ideas. Here is my preferred solution:
Observable::fromArray([$observable1, $observable2])
->reduce(function ($a, Observable $o) {
$s = new Subject();
return [
$a[0]->merge($o->catch(function (\Throwable $e) use ($s) {
$s->onError($e);
return Observable::empty();
})),
$a[1]->merge($s)
];
}, [Observable::empty(), Observable::empty()])
->flatMap(function ($a) {
return $a[0]->concat($a[1]);
});
Here it is implemented as a function in the Observable
class:
public function mergeDelayError(Observable $o) : Observable {
return Observable::fromArray([$this, $o])
->reduce(function ($a, Observable $o) {
$s = new Subject();
return [
$a[0]->merge($o->catch(function (\Throwable $e) use ($s) {
$s->onError($e);
return Observable::empty();
})),
$a[1]->merge($s)
];
}, [Observable::empty(), Observable::empty()])
->flatMap(function ($a) {
return $a[0]->concat($a[1]);
});
}
Below is a test of the solution:
<?php
namespace Rx\Functional\Operator;
use Rx\Functional\FunctionalTestCase;
use Rx\Observable;
use Rx\Subject\Subject;
class MergeDelayErrorTest extends FunctionalTestCase
{
/**
* @test
*/
public function it_waits_for_complete_before_emitting_error()
{
$xs = $this->createColdObservable(array(
onNext(100, 4),
onNext(200, 2),
onNext(300, 3),
onNext(400, 1),
onCompleted(500)
));
$ys = $this->createColdObservable(array(
onNext(50, 'foo'),
onNext(100, 'bar'),
onNext(150, 'baz'),
onError(160, new \Exception()),
onNext(200, 'qux'),
onCompleted(250)
));
$results = $this->scheduler->startWithCreate(function() use ($xs, $ys) {
$xs->mergeDelayError($ys);
});
$this->assertMessages(array(
onNext(250, 'foo'),
onNext(300, 4),
onNext(300, 'bar'),
onNext(350, 'baz'),
onNext(400, 2),
onNext(500, 3),
onNext(600, 1),
onError(700, new \Exception())
), $results->getMessages());
$this->assertSubscriptions(array(subscribe(200, 700)), $xs->getSubscriptions());
$this->assertSubscriptions(array(subscribe(200, 360)), $ys->getSubscriptions());
}
}
from rxphp.
Related Issues (20)
- Promise::fromObservable should be cancelable
- Update PHPUnit to 5.7
- Disposed of scheduled item does not cancel timer in some instances
- EventLoopScheduler does not cancel timer in certain circumstances
- Not getting any output in the console HOT 6
- Weird result for Iterators and Generators and multiple listeners/subscribers HOT 2
- Change behavior when exception occurs in `onNext` HOT 1
- multi-threads support HOT 5
- Observable from function HOT 3
- How to resume on next after catch? HOT 3
- Unexpected Script Execution within PHP 7.3 HOT 8
- Test not working HOT 2
- Is this still maintained? HOT 1
- When trying to write a test I get 'Error: Call to undefined function Tests\onNext()' HOT 1
- PHP 8 Compatibility HOT 4
- Handling streams HOT 1
- Pluck will error on null value
- [ General question ] functional API, RxJS-like HOT 2
- changing behavior of interval function
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from rxphp.