Comments (21)
Range v2.0.0 has been re-written in a similar fashion as fromIter to address this issue
from callbag.
Hi! Yes this is an issue that needs to be carefully built and preferably in the source, not in the sink. See the implementation of callbag-from-iter
from callbag.
The whole got1
and inLoop
riff seems a bit over-complex.
The following approach seems more concise, and is how I've implemented my pullables:
let pulls = 0;
sink(0, t => {
if ((t === 1) && (pulls++ === 1)) {
do {
// push down t=1|2 here
// set pulls = NaN after pushing 2
} while (--pulls > 0);
} else if (t === 2) {
pulls = NaN;
}
})
The code says: "On a pull, start pushing if applicable, and while there are additional pulls, keep pushing, but once we have pushed for every applicable pull, become idle again."
Conveniently, both the initial entry to the loop and loop test take the correct action when pulls
becomes NaN
(i.e., skips the loop, or exits the loop).
Note that pulls
should be set to NaN
whenever the last item is pushed (for finite sources) or the source receives a 2 (neither case shown here).
from callbag.
That makes sense, nice solution! π
I think this should be pointed out somewhere officially, both the problem and the solution, so people know how to build and consume pullable sources.
from callbag.
How about an operator that takes a pullable source and wraps it to apply this solution?
It could also be used as an utility for implementing pullable sources in a simple way and wrapping them internally before returning.
Now I donβt know what such an operator should be called.
Maybe something abstract like checkPullable
or guardStack
?
Or rather something more concrete like loopingPullable
or finishPushBeforePull
?
from callbag.
@mperktold I would recommend simple synchronous pullables to just be implemented as generator functions (Iterables) so then you pass them to fromIter
which applies the magic. In other words, fromIter is already that solution you suggested.
from callbag.
Good point, since generator functions are quite easy to write.
Also, this way no missbehaving sources are involved, only a generator function and a proper pullable source.
Still, I think that pullable sources and their properties should be defined more formally in a central place.
Many callbag operators claim to work for listenable sources, pullable sources, or both, so a common definition of these terms would ensure interoperability.
To keep the core specification minimal, I created a repository with a separate spec for pullable sources.
It is only a draft, so if you want to modify something, just go for it. π
What do you think about having such a specification in general? Should the wiki refer to it?
If this is too much off topic, we can also close this issue and open a new one to discuss about the spec proposal.
from callbag.
Is it not possible to implement all sinks, sources, and operators to flexibly work in both pull or push scenarios? Is there some reason not to do this... performance perhaps?
As far as I can see, all sinks could issue pulls "just in case" they are connected to an upstream pullable, and all push-only sources could be expected to simply ignore pulls. Am I wrong?
If there is a reason to create implementations that cannot always work together (and a performance-based reason is possibly plausible), then it seems a shame, as it leaves us with a mishmash of streaming components that may or may not work with each other.
Instead, one must carefully read the documentation for each or, failing that, read and understand the implementation, and/or just figure things out by trail and error. This is pretty much completely unlike other streaming libraries, where all sinks, sources, and operators can simply be assumed to work.
The fact that there is not even a common naming convention (pushonly-
, pullable-
prefixes, or something) for callbag implementations to help one quickly understand their expected role(s) in a chain, leaves a bit of bad taste in my mouth... for what is otherwise an incredibly elegant interoperability solution.
I would also comment that all the sample implementations I have seen use a coding style that is quite obscure (almost appearing like minified code). I'm not sure the point of this, other than to make the code appear "tight". But, single-character variable names, collapsed control flow structures, nested anonymous functions, and indentation that does not always match the logical scoping of the internal execution contexts, does not aid in understanding.
Granted, this may all be viewed as a matter of style, but I think much of the provided callbag implementations do a disservice to the overall concept as, IMO, the code is nearly unreadable, except in the simplest of cases (with much respect to @staltz otherwise). I hope most new callbag authors do not copy this style.
from callbag.
As far as I can see, all sinks could issue pulls "just in case" they are connected to an upstream pullable, and all push-only sources could be expected to simply ignore pulls. Am I wrong?
Yes, sinks can just pull the source when they want to get the next item regardless of whether the source is pullable or listenable.
However, the other way around is much harder: You cannot tell a listenable source when it should emit items and when it shouldn't.
In some cases though you really need a pullable source. For example, take to-iterable
, which converts a source callbag to an Iterable
. Since Iterable
is a pull-based API, the input callbag must be pullable as well.
Therefore, I would say that the best way to have all callbags work in both pull or push scenarios would be to always implement them as pullable, and keep pulling until completion for achieving push behavior.
Btw, this is actually the idea behind backpressure in RxJava.
from callbag.
Thanks @mperktold, but I think you slightly missed my point (or perhaps I am missing yours).
There seem to be some callbag implementations that simply don't work together, but really only for lack of implementation, not for lack of possibility, was my point.
For example, I could create a forEach
sink that never issues pulls, and thus will only work with sources that actively push data. But, there would seem to be no compelling reason to do this -- my forEach
sink could also issue pulls "just in case", and thus handle pulled or pushed data emissions all the same. In fact, I believe the "standard" forEach
implementation does just that (i.e., is a "hybrid" implementation that works in either scenario).
So, my question is, why aren't all sinks, sources, and operators implemented using a "hybrid" style? Is it because some "should not" be implemented this way, some "cannot", or some simply "are not"? Based on the implementations I've seen so far, the answer seems to be that they "are not" (i.e., there is no good reason, merely a lack of a "more complete" implementation).
In any case, users of these "non-hybrid" (i.e., pull-only or push-only) implementations are at a bit of a loss to figure out what is going to work and not work, and that seems concerning, and feels like an unnecessary friction in the eco-system. And, although @staltz has done a good job of enumerating the types of components we can expect to find (e.g., listenables, pullables, etc.), there hasn't emerged a conventional way of clearly denoting such behavior.
from callbag.
Hi! There are a few different topics to address here. One is about the unpredictability of callbags, and the other is about the support from many callbag libraries.
Let me remind an important point from this article on Egghead:
Generality: Promise < Observable < Callbag < Function
Predictability: Function < Callbag < Observable < Promise
Fortunately, callbags are more powerful than observables. Unfortunately, they are less predictable than observables. Same for observables versus promises. For instance a promise only resolves a value once, but an observable can emit multiple values. So when you subscribe to an observable, you are not sure how many values you're going to get. Implementations have to take this into account: receiving one value doesn't mean that it's the final and correct value, the subscriber may need to wait an unknown amount of time before receiving the next value. Sometimes, for predictability, it is desirable to know and be able to predict the number of emissions of an observable, which has led to the creation of Single in RxJava, and specialization of some operators to preserve predictability before and after operator application. For instance, if you map
a Single, you are certain to expect a Single
as output.
Something similar happens with callbags. When you "subscribe"/consume one, you are not sure if it requires pulling or not. In fact the sink may pull but the source may be slow to deliver any value, which may look like the source is not pullable, but you cannot know if it's just slow or if it doesn't support pulling. This is not "a shame", this is what allows for the power of callbags. Unpredictability equals generality. Inflexibility equals predictability. For instance, functions are the most unpredictable abstraction in JavaScript, but are extremely powerful and general.
Now let's talk about the support that multiple libraries provide, or don't provide. First let's note that callbag is not a stream library, it's much better to compare it to functions than it is to compare with RxJS. There are thousands of libraries on npm which provide functions as API, and although they belong to the same abstraction category, there are lots of ways you can consume a function. For instance, if I give you the function f
and don't tell you anything else about it, you don't know if the signature is f(x): y
or f(x, cb): void
or f(cb): void
or f(): void
or f(): x
. So libraries document this in a readme, or sometimes (even better) with TypeScript definitions. TypeScript also doesn't guarantee anything, sometimes there's a mismatch between the JS implementation and the TS typings, but it does communicate something about the predictability (i.e. about the inflexibility!) of the function.
In that light, callbags are literally just special types of functions. A function f
is a callbag if its arguments are f(type: 0|1|2, payload: any): void
and if it follows the spec described in this repo. I have been wanting to write a helper library that does automatic checking of the behavior of a callbag, using (potentially) property based testing, but I couldn't figure it out. So far, like most JS libraries out there, their behavior must be specified in the documentation. The good news is that even if operator X doesn't work with source Y (and assuming that both correctly follow the callbag spec), then there's still a way of including operator Z and W and others, to glue together X with Y in the correct way. For instance, say you try to use callbag-observe on a pullable source. It won't work, but you can use callbag-sample as a glue in between.
Finally, for use cases that need higher predictability, it would make sense to make a preset of operators and sources that are well tested to work with each other, and then possible to publish separately. This is similar to React component libraries that contain many components that are visually consistent with each other. You could have assembled these components separately from assorted third party libraries on NPM, but having a bundle of them well tested for consistency can help.
from callbag.
@franciscotln looks like your got1
is global in callbag-range. I don't think that's intentional.
from callbag.
I've proposed this PR to @staltz to simplify the code of callbag-from-iter
:
PR#5
BTW @Andarist suggested implementing callbag-range using callbag-from-iter as dependency, which made sense to me and it's done this way in range v.3.0.0
from callbag.
now I think I've got it simpler and working in all cases @Andarist :
const fromIter = iter => (start, sink) => {
if (start !== 0) return;
const iterator = typeof Symbol !== 'undefined' && iter[Symbol.iterator]
? iter[Symbol.iterator]()
: iter;
let value, done, disposed, draining;
sink(0, t => {
if (disposed || done) return;
if (t === 1) {
while (draining = !draining) {
({ done, value } = iterator.next());
done ? sink(2) : sink(1, value);
}
}
if (t === 2) disposed = true;
});
};
from callbag.
No, your while loop is still pushing everything from the iterable whether the sink has pulled it or not.
A pullable source should push only with a 1:1 ratio to the pulls it receives. Your code pushes everything after the first pull request (1), unless the sink somehow manages to get in an end request (2).
Hint: You should use a counter to track the pulls you received and push no more than than many times.
from callbag.
Based on what you are saying that it's wrong if it works exactly as the current fromIter? :-)
This is a 1:1 ratio otherwise it wouldn't support lazy pulls, and it does.
test('it supports lazy pulls', t => {
t.plan(5);
const source = fromIter([10, 20, 30]);
const actual = [];
const downwardsExpectedTypes = [
[0, 'function'],
[1, 'number'],
];
let talkback;
source(0, (type, data) => {
const et = downwardsExpectedTypes.shift();
t.equals(type, et[0], 'downwards type is expected: ' + et[0]);
t.equals(typeof data, et[1], 'downwards data type is expected: ' + et[1]);
if (type === 0) {
talkback = data;
setTimeout(() => {
talkback(1);
}, 100);
setTimeout(() => {
talkback(2);
}, 150);
}
if (type === 1) {
actual.push(data);
}
});
setTimeout(() => {
t.deepEquals(actual, [10]);
t.end();
}, 180);
});
Just wrote these, and all the current written tests also pass
from callbag.
I think I may have misinterpreted your while loop. I see what it does now (it does not send everything at once), but I think the looping
variable feels like it is named somewhat backwards.
But, more importantly, you still have the issue that multiple pull requests can arrive during a single loop iteration (aka, push) that this code won't properly take into account (since they will all be "merged" into a single state-change of the looping
variable, which only leads to a single push).
Maintaining a counter (of the number of pending pulls) would be better (and also leads to a much simpler implementation). See my example posted previously in this thread.
from callbag.
How can you tell the number of pending pull of a generator function?
function* fibonacci() {
let fn1 = 0;
let fn2 = 1;
let stop = false;
while (!stop) {
let current = fn1;
fn1 = fn2;
fn2 = current + fn1;
stop = yield current;
}
}
const sequence = fibonacci();
pipe(
fromIter(sequence),
subscribe({
next: v => {
console.log(v);
if (v > 30) sequence.next(true);
},
complete: () => console.log('done'),
})
);
/* logs:
0
1
1
2
3
5
8
13
21
34
done
*/
from callbag.
The pulls come from the sink, not the generator. You just need to track the pulls, and "generate" the matching number of pushes. When they match, you stop pushing wait for more pulls. That way, you only push what is pulled (1:1).
I have already shown the code to do this.
from callbag.
The code snippet that you edited above doesn't emit any value.
If you could share a https://codesandbox.io would be nice to see and test it :-)
from callbag.
Closing this the discussion seems to be finished long ago.
I still feel the need to document this in some official place, but thatβs maybe another issue.
from callbag.
Related Issues (20)
- Use named constants and non-zero power of two values for type parameter HOT 1
- Versioning HOT 1
- I'm missing something HOT 12
- Calling a sourceTalkback with 1 and data HOT 1
- How to handle additional websocket connection state HOT 1
- More readable implementation of callback based duplex stream HOT 13
- Errors and Termination HOT 11
- Ability to discover what IDs a callbag supports HOT 8
- A more concise way HOT 6
- Are asynchronous handshakes permitted? HOT 1
- What if an exception is thrown during callbag execution? HOT 19
- sink termination propagation
- Confusion about spec HOT 7
- Control flow assumptions HOT 16
- Express variance in types HOT 12
- Ability to detect callbags
- Stricter types? HOT 1
- Missing index.js referenced from "main" in package.json breaks Vite HOT 1
- Why do we say sink is a callbag?
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
π Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. πππ
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google β€οΈ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from callbag.