reactivex / rxjs
A reactive programming library for JavaScript
Home Page: https://rxjs.dev
License: Apache License 2.0
Just as the title says, let's move this back to dispose. This will make for better compatibility with older RxJS, and with the ES7 spec.
Many in the node community have expressed interest in an extremely modular Observable implementation. Creating this issue to gather feedback on the idea of:
… subscribe function)
… require'd
This way, the operators that rely on other operators (e.g. concat being a merge(1)) can require the modules they need, and the package manager can worry about factoring out common modules.
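To make the concat-as-merge(1) point concrete, here is a minimal sketch. Everything in it (the tiny Observable wrapper, fromArray, merge, concat, and the complete handler name) is a hypothetical stand-in for illustration, not this library's API. In a modular layout each function would live in its own module and concat would simply require merge.

```javascript
// Hypothetical minimal Observable: nothing but a subscribe function.
function Observable(subscribe) {
  this.subscribe = subscribe;
}

// Hypothetical creation helper: emits each array item synchronously.
function fromArray(items) {
  return new Observable((observer) => {
    items.forEach((item) => observer.next(item));
    observer.complete();
  });
}

// Hypothetical merge: subscribes to at most `concurrent` sources at a time.
function merge(sources, concurrent) {
  return new Observable((observer) => {
    const pending = sources.slice();
    let active = 0;
    let draining = false;

    function drain() {
      if (draining) return; // a synchronous source completed inside the loop
      draining = true;
      while (active < concurrent && pending.length > 0) {
        active++;
        pending.shift().subscribe({
          next: (value) => observer.next(value),
          complete: () => {
            active--;
            drain();
          },
        });
      }
      draining = false;
      if (active === 0 && pending.length === 0) observer.complete();
    }

    drain();
  });
}

// concat is just merge with a concurrency of 1.
function concat(sources) {
  return merge(sources, 1);
}

const output = [];
concat([fromArray([1, 2]), fromArray([3])]).subscribe({
  next: (value) => output.push(value),
  complete: () => output.push("done"),
});
// output is [1, 2, 3, "done"]
```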
Don't know if this is the right place to ask, please let me know if there's a better one.
Is your ReactiveX Github organization/entity related to Reactive-Extensions Github organization found at https://github.com/Reactive-Extensions ?
Is one more up-to-date than the other?
And in particular, where should I look for the most complete documentation related to Rx.NET?
Yours at https://github.com/ReactiveX/Rx.NET is empty, theirs at https://github.com/Reactive-Extensions/Rx.NET is not empty, but has an empty wiki and certainly lacks in documentation when compared to RxJS, RxJava, ...
Thanks for your time
groupBy is not in here yet. This is a critical add.
I vote yes. See it here: https://github.com/fantasyland/fantasy-land
cc: @puffnfresh
Currently it's unsubscribe, but I feel it should be dispose. @zenparsing and @jhusain, what aligns with the ES7 proposal?
What are other people's feelings on this?
The current setup isn't something we can really support with Travis yet, not that there's enough (any) tests to justify it.
But it's very important going forward that this issue is tackled.
Currently, in master the disposal behavior meets the older ES7 Observable spec proposal, so it triggers return (aka completed) when subscription.dispose() is called. This is so Observable could support a primed Generator as an Observer to subscribe.
This needs to be removed as it's no longer supported by the ES7 Observable spec. Again, this spec is settling down, but more importantly so is this library. The plan is to change the behavior back to the good ol' RxJS 2-esque behavior we know and love, where dispose()-ing a subscription does not trigger the completed handler.
All references to the disposeHandler in Observer will also be removed.
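A rough sketch of the intended behavior, assuming a hypothetical subscribe(producer, observer) helper (not the library's actual API): disposing runs teardown and silences the observer, but never calls its completed handler.

```javascript
// Hypothetical helper for illustration only.
function subscribe(producer, observer) {
  let closed = false;
  let teardown = null;

  function close() {
    if (closed) return;
    closed = true;
    if (teardown) teardown();
  }

  const returned = producer({
    next(value) {
      if (!closed) observer.next(value);
    },
    completed() {
      if (closed) return;
      observer.completed();
      close();
    },
  });

  if (typeof returned === "function") {
    if (closed) returned(); // producer completed synchronously
    else teardown = returned;
  }

  // dispose() only cleans up; it does not signal completion.
  return { dispose: close };
}

const log = [];
let emit;
const subscription = subscribe(
  (innerObserver) => {
    emit = innerObserver;
    return () => log.push("teardown");
  },
  {
    next: (value) => log.push("next:" + value),
    completed: () => log.push("completed"),
  }
);

emit.next(1);
subscription.dispose();
emit.next(2); // ignored after dispose
emit.completed(); // ignored after dispose
// log is ["next:1", "teardown"]
```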
There's some mixed spacing in the files, I see both tabs and spaces. I know it's very early but it would be nice to pick one style and stick with it now, to avoid junk in history later.
Most JS projects follow two spaces as convention so I'd suggest that if you're open to it.
Will RxJS Next get ReplaySubject? Lately I've been using ReplaySubject with a buffer size of 1 instead of BehaviorSubject, and I think it's redundant to have both Behavior and Replay as primitives. BehaviorSubject can be achieved with ReplaySubject.
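For illustration, here is a minimal sketch of that equivalence. The ReplaySubject class and behaviorSubject helper below are simplified, hypothetical stand-ins (no completion or error handling, and no synchronous "current value" accessor), not the real implementations.

```javascript
// Hypothetical, stripped-down ReplaySubject: replays the last `bufferSize` values.
class ReplaySubject {
  constructor(bufferSize) {
    this.bufferSize = bufferSize;
    this.buffer = [];
    this.observers = [];
  }

  next(value) {
    this.buffer.push(value);
    if (this.buffer.length > this.bufferSize) this.buffer.shift();
    this.observers.slice().forEach((observer) => observer.next(value));
  }

  subscribe(observer) {
    // Replay buffered values first, then listen for new ones.
    this.buffer.forEach((value) => observer.next(value));
    this.observers.push(observer);
    return {
      dispose: () => {
        const index = this.observers.indexOf(observer);
        if (index !== -1) this.observers.splice(index, 1);
      },
    };
  }
}

// A "behavior" subject as a replay-1 subject seeded with an initial value.
function behaviorSubject(initialValue) {
  const subject = new ReplaySubject(1);
  subject.next(initialValue);
  return subject;
}

const subject = behaviorSubject(0);
const early = [];
const late = [];
subject.subscribe({ next: (value) => early.push(value) });
subject.next(1);
subject.subscribe({ next: (value) => late.push(value) });
// early is [0, 1]; late is [1]
```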
Toying around with the idea of running RxJS's existing QUnit test suite with node-qunit-phantomjs. Would need to copy over nearly all the src/core/test code, but should be easier than rewriting all the tests from scratch.
According to Jafar, the latest spec is here: https://github.com/zenparsing/es-observable
This has changed a bit since the original spec that this was based off of, but it seems to be stabilizing.
Notably, subscription returns a generator rather than a subscription object.
Might be nice to re-think the API on this one slightly. It seems very uncommon to use the keySelector first argument. Interested in hearing anecdotes to the contrary.
Realistically, we can model all subscription logic with CompositeSubscription. Promoting composite behavior to the Subscription base class will reduce file size and complexity.
We can reduce the number of Subscription allocations by sharing a single Subscription instance between all intermediate Observers per subscribe call. Some operators don't allocate resources that need to be disposed. The ones that do (the flattening, splitting, and coincidence strategies) can add their inner Subscriptions to the Subscription shared between all Observers. This also reduces complexity inside those operators, as they don't have to worry about removing their inner subscriptions from the shared Subscription, because they'll be cleaned up when the shared Subscription is disposed (on throw, return, or dispose).
This is probably only feasible if Subscriptions store their inner subscriptions in a HashSet for O(1) insertion and removal.
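A minimal sketch of the idea, assuming a hypothetical Subscription class with add, remove, and dispose (names chosen for illustration). A JavaScript Set gives the O(1) insertion and removal mentioned above.

```javascript
// Hypothetical composite Subscription: one shared instance per subscribe call.
class Subscription {
  constructor() {
    this.isDisposed = false;
    this.children = new Set(); // O(1) add and delete
  }

  add(child) {
    if (this.isDisposed) {
      child.dispose(); // already torn down: clean up the newcomer immediately
    } else {
      this.children.add(child);
    }
    return child;
  }

  remove(child) {
    this.children.delete(child);
  }

  dispose() {
    if (this.isDisposed) return;
    this.isDisposed = true;
    for (const child of this.children) child.dispose();
    this.children.clear();
  }
}

const disposed = [];
const shared = new Subscription();
const innerA = { dispose: () => disposed.push("a") };
const innerB = { dispose: () => disposed.push("b") };

shared.add(innerA);
shared.add(innerB);
shared.remove(innerA); // optional: operators could skip this and rely on dispose()
shared.dispose();
shared.dispose(); // second call is a no-op
shared.add({ dispose: () => disposed.push("late") });
// disposed is ["b", "late"]
```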
Minor observation/aside: it seems like an optimization is possible here vs mergeAll/flatMap since you should only need to track a single subscription. Probably negligible.
I spent a little time talking with @trxcllnt yesterday about scheduler design. I've also glossed over this briefly with @jhusain and @benjchristensen
I have a REALLY, REALLY primitive impl in the codebase now, but it needs work. In particular it needs a better design.
Right now it's a global scheduler. But you'll notice I added a dispose() method to it. This is because I initially created it as a class, and thought scheduling might be instance based. This would make scheduling more flexible. However, global scheduling is very useful for things like Framework lifecycles (@IgorMinar, @machty).
I'm looking for input as to how to make this flexible but easy to understand for authors.
We need to determine what sort of support we'll offer for ES5/ES3 out of the gate. Will we build a bundle for those targets? Or shall we expect that authors transpile from our modules on their own?
I suspect the former would be the most prudent.
EDIT: closed.
Currently the rx.ts file is manually exporting Observable and Subscription constructors on the window object.
Looks like you just forgot the right name.
Need an implementation of a trampolining scheduler (Rx2's currentThreadScheduler). Don't need an implementation of the recursive scheduler (Rx2's immediateScheduler) if all the Observable creation functions (of, fromArray, returnValue, etc.) fall back to imperative loops in the absence of an external scheduler. This is possible since observers now return iteration results from onNext, onError, and onCompleted.
Example:
// src/observable/FromArrayObservable.js
var Observable = require("rx/Observable");

function FromArrayObservable(array, scheduler) {
  this.array = array;
  this.scheduler = scheduler;
}

FromArrayObservable.prototype = Object.create(Observable.prototype);

FromArrayObservable.prototype.subscribe = function subscribe(subscriber) {
  var array = this.array;
  var scheduler = this.scheduler;
  if (scheduler) {
    // state: [last iteration result, subscriber, array, current index]
    return subscriber.add(scheduler.schedule([{ done: false }, subscriber, array, -1], dispatch));
  } else {
    // No scheduler: fall back to an imperative loop.
    var index = -1;
    var count = array.length;
    while (++index < count) {
      var result = subscriber.next(array[index]);
      if (result.done) {
        return subscriber;
      }
    }
    subscriber.return();
    return subscriber;
  }
};

function dispatch(scheduler, state) {
  var result = state[0];
  var subscriber = state[1];
  if (subscriber.disposed) {
    result.done = true;
    return subscriber;
  }
  var array = state[2];
  var index = state[3];
  if ((state[3] = ++index) < array.length) {
    result = state[0] = subscriber.next(array[index]);
    if (result.done) {
      return subscriber;
    }
    // Reschedule with the same state to emit the next item.
    return subscriber.add(scheduler.schedule(state, dispatch));
  } else if (!result.done) {
    subscriber.return();
  }
  return subscriber;
}

module.exports = FromArrayObservable;
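For reference, a trampolining scheduler can be sketched roughly as below. The TrampolineScheduler name is hypothetical, and the schedule(state, work) and work(scheduler, state) shapes are assumed from the example above. Unlike that example, this sketch returns nothing from schedule, so it would still need to hand back something disposable to be usable with subscriber.add.

```javascript
// Hypothetical trampoline: nested schedule() calls are queued and run by the
// outermost call's loop instead of growing the call stack.
class TrampolineScheduler {
  constructor() {
    this.queue = [];
    this.running = false;
  }

  schedule(state, work) {
    this.queue.push({ state, work });
    if (this.running) return; // nested call: the outer loop will pick it up
    this.running = true;
    try {
      while (this.queue.length > 0) {
        const task = this.queue.shift();
        task.work(this, task.state);
      }
    } finally {
      this.running = false;
    }
  }
}

const order = [];
const scheduler = new TrampolineScheduler();
scheduler.schedule("a", (s, state) => {
  order.push(state + ":start");
  s.schedule("b", (_, inner) => order.push(inner)); // queued, not run inline
  order.push(state + ":end");
});
// order is ["a:start", "a:end", "b"]
```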
In Rx2 both subscription (subscribe) and delivery (onNext) can be made async.
This is not the case in Rx3.
The following will fail with:
expected [ 'next:1', 'received:1', 'next:2', 'received:2' ] to equal [ 'next:1', 'next:2', 'received:1', 'received:2' ].
var loggedValues = [];
var generator;
var observable = new Observable(g => { generator = g; }, new NextFrameScheduler());
observable.observer({
  next: value => loggedValues.push(`received:${value}`),
  return: () => {
    expect(loggedValues).toEqual(['next:1', 'next:2', 'received:1', 'received:2']);
    done();
  }
});
setTimeout(() => {
  loggedValues.push(`next:1`);
  generator.next(1);
  loggedValues.push(`next:2`);
  generator.next(2);
  generator.return();
}, 0);
Are there plans to allow async delivery?
Shouldn't async delivery be the default? After all, it is async when dealing with promises.
var loggedValues = [];
var resolve;
var p = new Promise((r) => {
  resolve = r;
});
p.then(value => {
  loggedValues.push(`received:${value}`);
  expect(loggedValues).toEqual(['before', 'after', 'received:42']);
  done();
});
setTimeout(() => {
  loggedValues.push(`before`);
  resolve(42);
  loggedValues.push(`after`);
}, 0);
(EDIT: @Blesh)
What we're looking for here is a solid set of analogs to real-world scenarios where bottlenecks or memory inflation have been noticed. These will be run through benchpress.js tests as set up in the perf/ folder. Currently flatMap is the only working performance test since migrating to macro perf tests.
In the short term (until first beta release, probably) we'll be testing against RxJS 2 as a control, but inevitably we'll want to start testing against previous published versions of itself.
Resolve issue with ES6 complaints from TS compiler.
As discussed in the ES7 Observable Spec here, the idea looks like this:
interface Observer {
  subscribed(subscription: Subscription): void
  next(value: any): void
  error(err: any): void
  complete(): void
}
Advantages:
… subscribeOn)
… this.sub = subscription in the subscribed handler, giving a reference to subscription in all other handlers without closure.
This would be to support subscription with a generator in an ergonomic way.
Here's the basic API:
var subscriptionGenerator = myObservable.subscribeWithGenerator(function* () {
  try {
    var nextValue = yield;
    doSomethingWithValue(nextValue);
  } catch (err) {
    doSomethingWithAnError(err);
  } finally {
    console.log('completed');
  }
});
subscriptionGenerator.return(); // trigger finally and dispose observable
subscriptionGenerator.throw('bad'); // trigger catch block, then finally block, then dispose observable
subscriptionGenerator.dispose(); // just dispose observable
subscriptionGenerator.next('blah'); // next at yourself, sort of silly, but here for completeness.
cc/ @jhusain
I feel type annotations would be a good aid for writing a codebase at the scale of RxJS. For that purpose, many projects adopt TypeScript or Closure Compiler.
But the current codebase does not use any type annotation system.
Why? And do you have any plans to introduce such a system?
i.e. in flatMap benchmark, chain many observables instead of just emitting many values.
This issue on StackOverflow illustrates a problem with backpressure in current RxJS, I think.
While I think this guy could solve his problem with an Iterator of Promise, I think it would be better if RxJS could support this with fewer object allocations.
cc/ @benjchristensen
Please note there are two different implementations of Observable right now:
These are experimental until we decide on one.
For now they will need some unit tests to make sure they work.
Since the ES7 Observable spec has moved away from the use of Generators, and we have a more solid idea now of the direction the ES7 Observable spec is going, we need to circle back and remove the IteratorResult returns and checks from this library.
Instead we'll be moving back to a more RxJS 2-esque model where those methods return void.
Based on discussions in #69, #70 and (mostly) #71.
Should the interfaces of Observer and Subscription be joined into a single type? Right now, generally, Subscriptions are created 1:1 with (safe) Observers. They tend to have to "know" about each other or at least notify one another to update state such as isUnsubscribed. And there might be some wins to joining them into a single class.
This would also definitely affect the lift method, and ObserverFactories would have to become SubscriptionFactories or the like, I suppose.
Pros? Cons? Good idea? Horrible idea?
According to @mattpodwysocki, he needs to introduce breaking changes to RxJS 2 to fix some bugs (this is my primitive understanding). Breaking changes, according to NPM's semver guidelines, mean a major revision.
Thoughts? Concerns?
@trxcllnt and @benjchristensen - This is a placeholder for the discussion around what this is and how we should implement it. Could one of you type up a description please?
Observable is a functional programming thing. In pure functional programming, there is no "Zalgo". So anyone that is using Observable properly will want to always use [Symbol.observer]() to subscribe to the observable. I think it's safe to say that I know exactly zero heavy users of Observable, at Netflix at least, that want to do this. (I'll go so far as to say that list probably includes @jhusain. haha) Just playing with it hurts and I find myself wanting to override the whole thing like: Observable.prototype.subscribe = Observable.prototype[Symbol.observer] just to enjoy using the type.
I understand that Zalgo is a footgun for people who are coding imperatively, so we need to build libraries around this defensively. I guess I feel hiding it under [Symbol.observer] is just a little too much.
I'd like to propose that [Symbol.observer] change to either:
subscribe - back to the good old days of RxJS, maybe make the "safe" version be named when.
forEach - because the name alone makes it sound like sync behavior, honestly.
observe - because it's less annoying to type over and over than [Symbol.observer].
At some point people are going to have to put on their big-boy pants about Zalgo. If you're programming functionally, it's really all or nothing, or you better know what you're doing. Observables are not promises. They're not a stateful construct for async imperative programming, they're a functional programming paradigm.
I really half-assed the typings in the library so far (currently in the lift branch). It would be good to make a pass through this and perhaps add proper TypeScript generics to Observable. It's just not a priority because it's sugar.
As mentioned in #1, there are two different implementations of map for now; we need to create some performance tests for these implementations to determine which one performs better.
I'd love some help from @IgorMinar or @jeffbcross with this, since they have more expertise than I do in this arena.
Implement Observable operators in terms of lift.
What is lift?
lift is a function that takes a source Observable and an Observer factory function, and returns a new Observable:
// source: Observable, observerFactory: Function
function lift(source, observerFactory) {
  return new Observable((destinationObserver) => {
    return source.subscribe(observerFactory(destinationObserver));
  });
}
Though this is the formal definition, we should make the following changes to our Observable's lift:
… lift on the Observable prototype.
… source argument in favor of using this as the source.
… observerFactory from function to the ObserverFactory interface. This interface requires the implementor to have a create function that accepts an Observer and returns an Observer.
lift has the following advantages:
… lift on their Observable subclass.
For example, here's how map is implemented in terms of lift:
class Observable {
  constructor(subscribe) {
    if (subscribe) {
      this.subscribe = subscribe;
    }
  }
  subscribe(observer) {
    return this.source.subscribe(this.observerFactory.create(observer));
  }
  lift(observerFactory) {
    const o = new Observable();
    o.source = this;
    o.observerFactory = observerFactory;
    return o;
  }
  map(selector) {
    return this.lift(new MapObserverFactory(selector));
  }
}
class MapObserverFactory {
  constructor(selector) {
    this.selector = selector;
  }
  create(destination) {
    return new MapObserver(destination, this.selector);
  }
}
class MapObserver {
  constructor(destination, selector) {
    this.selector = selector;
    this.destination = destination;
  }
  next(x) {
    return this.destination.next(this.selector(x));
  }
  throw(e) {
    return this.destination.throw(e);
  }
  return(e) {
    return this.destination.return(e);
  }
}
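As a further illustration of the same pattern, a filter operator could be written against lift roughly as follows. This is only a sketch: FilterObserverFactory and FilterObserver are hypothetical names, the Observable here is a trimmed copy of the one above so the block runs on its own, and returning { done: false } for skipped values is an assumption about what callers expect from next.

```javascript
// Trimmed copy of the lift-based Observable shown above.
class Observable {
  constructor(subscribe) {
    if (subscribe) this.subscribe = subscribe;
  }
  subscribe(observer) {
    return this.source.subscribe(this.observerFactory.create(observer));
  }
  lift(observerFactory) {
    const o = new Observable();
    o.source = this;
    o.observerFactory = observerFactory;
    return o;
  }
}

// Hypothetical factory: wraps the destination in a FilterObserver.
class FilterObserverFactory {
  constructor(predicate) {
    this.predicate = predicate;
  }
  create(destination) {
    return new FilterObserver(destination, this.predicate);
  }
}

// Hypothetical observer: forwards only values that pass the predicate.
class FilterObserver {
  constructor(destination, predicate) {
    this.destination = destination;
    this.predicate = predicate;
  }
  next(x) {
    return this.predicate(x) ? this.destination.next(x) : { done: false };
  }
  throw(e) {
    return this.destination.throw(e);
  }
  return(e) {
    return this.destination.return(e);
  }
}

const evens = new Observable((observer) => {
  [1, 2, 3, 4].forEach((x) => observer.next(x));
  observer.return();
}).lift(new FilterObserverFactory((x) => x % 2 === 0));

const collected = [];
evens.subscribe({
  next(x) { collected.push(x); return { done: false }; },
  throw(e) { collected.push("error"); },
  return() { collected.push("done"); },
});
// collected is [2, 4, "done"]
```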
Should be consistent with other ReactiveX projects.
Based on discussions on the es-observable spec repo, it's been collectively determined that subscription and observation must be synchronous by default for performance reasons.
One of the lead examples was groupBy, as pointed out by @trxcllnt... if subscription is always async, you'll potentially have to buffer an unbounded number of values that could be synchronously pushed at newly created "Grouped" Observables.
There's also a smaller matter of incurring extra turns for no real gain if this functional programming type is being used purely functionally.
The danger here is to those that are using mutable state and imperative programming. In those situations, there might be some strange behaviors. It is my view that this should simply be taught to people via warnings and best practices.
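The groupBy concern can be simulated without any real scheduler. The sketch below is hypothetical throughout: firstValuesSeenByGroup and its deferSubscribe flag just model "the consumer's subscription to a new group runs now" versus "it runs in a later turn", with a plain array standing in for the task queue.

```javascript
// Hypothetical simulation of a producer that pushes a value into a freshly
// created group right after handing that group to the consumer.
function firstValuesSeenByGroup(deferSubscribe) {
  const received = [];
  const listeners = [];
  const deferred = []; // stand-in for a scheduler's task queue
  const group = {
    subscribe(next) {
      listeners.push(next);
    },
  };

  // Consumer: handed the new group, it subscribes (now or later).
  const subscribeToGroup = () => group.subscribe((value) => received.push(value));
  if (deferSubscribe) deferred.push(subscribeToGroup);
  else subscribeToGroup();

  // Producer: synchronously pushes the value that created the group.
  listeners.forEach((next) => next("first"));

  // Later turn: deferred subscriptions run, then another value arrives.
  deferred.forEach((task) => task());
  listeners.forEach((next) => next("second"));

  return received;
}

const withSyncSubscribe = firstValuesSeenByGroup(false);
const withDeferredSubscribe = firstValuesSeenByGroup(true);
// withSyncSubscribe is ["first", "second"]
// withDeferredSubscribe is ["second"]: "first" is lost unless it is buffered
```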
Per #28, we need to match observable spec defined at http://github.com/zenparsing/es-observable
After discussion with @jhusain, it's been determined that the signature should be "generator in generator out", and that Generator should have an additional dispose method on it.
Here are the basics for purposes of this issue (the @zenparsing repo will be the "official" version of the spec though, it's just not updated as of the time of this issue creation):
// rough interface (could be more "genericky", I suppose)
interface Generator {
  next(value: any): IteratorResult<any>
  return(value: any): IteratorResult<any>
  throw(value: any): IteratorResult<any>
  dispose(): void
}
let subscription: Generator = myObservable.subscribe(observer: Generator);
With this there will be three methods of unsubscribing:
subscription.return(x) - signal success to the consumer, then clean up.
subscription.throw(err) - signal an error to the consumer, then clean up.
subscription.dispose() - just clean up
This enables the support of generator functions as observers:
myObservable.subscribe(prime(function* () {
  try {
    while (true) {
      var value = yield; // nexted value
      doSomething(value);
    }
  } catch (err) {
    someErrorHappened(err);
  } finally {
    // clean up
  }
}));
which would be (roughly) synonymous with:
myObservable.subscribe({
  next(value) { doSomething(value); },
  throw(err) { someErrorHappened(err); },
  dispose() { completedSuccessfully(); }
});
Both will work; the latter is preferred in most cases, but the primed generator pattern can prove useful for things like parsers or other state machines that may build up values to clean up as they process incoming values.
* Note: prime is a function that pumps one "next" value at the generator to get it started, since generators are initialized lazily with the first next call to their returned iterators.
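A minimal sketch of what prime could look like, assuming it takes a generator function and returns the already-started iterator (the exact shape is not specified here, so treat this as one possible reading):

```javascript
// Hypothetical prime helper: create the iterator and advance it to the first
// `yield`, so the next call to next(value) actually delivers `value`.
function prime(generatorFunction) {
  const iterator = generatorFunction();
  iterator.next(); // runs the body up to the first `yield`
  return iterator;
}

const seen = [];
const observer = prime(function* () {
  try {
    while (true) {
      const value = yield; // nexted value
      seen.push(value);
    }
  } finally {
    seen.push("cleanup");
  }
});

observer.next(1);
observer.next(2);
observer.return(); // triggers the finally block
// seen is [1, 2, "cleanup"]
```

Without the priming call, the first observer.next(1) would only start the generator and the value 1 would be dropped.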
/cc @jhusain
ref: