Comments (8)
If someone is looking for the solution:
const T = require("transducers-js");
const Kefir = require("kefir");
const groupBy = (keyF, limitF) => src => {
const streams = {};
return src.transduce(T.comp(
T.filter((x) => !streams[keyF(x)]),
T.map(function(firstValue) {
const key = keyF(firstValue);
const similarValues = src.changes().filter(x => keyF(x) === key );
const data = Kefir.later(0, firstValue).concat(similarValues);
const limited = limitF(data, firstValue).withHandler((emitter, event) => {
if (event.type === 'end') {
delete streams[key];
emitter.end();
} else {
emitter.emit(event.value);
}
});
streams[key] = limited;
return limited;
})
))};
from kefir.
The use of Kefir.later
means that the value is being emitted after the test ends. Either use Kefir.constant
, which will emit the value immediately, or use toEmitInTime
and tick
to advance the clock & emit that value.
from kefir.
Could you explain groupBy
's behavior?
from kefir.
groupBy
groups the emitted events in a observable by a key function and returns a stream of streams in which each stream only contains items which have the same key function result.
for reference:
https://baconjs.github.io/api3/classes/observable.html#groupby
https://rxjs.dev/api/operators/groupBy
from kefir.
Interesting. I don't know as we need this for Kefir core (@kefirjs/core happy to disagree if you think this is useful), but one nice thing we added recently is the thru
method, which would allow you to write a groupBy
function that implements this behavior and use it like a method:
import groupBy from 'kefir-groupBy' // would be a package you create
import Kefir from 'kefir'
var events = [
{ id: 1, type: 'add', val: 3 },
{ id: 2, type: 'add', val: -1 },
{ id: 1, type: 'add', val: 2 },
{ id: 2, type: 'cancel' },
{ id: 3, type: 'add', val: 2 },
{ id: 3, type: 'cancel' },
{ id: 1, type: 'add', val: 1 },
{ id: 1, type: 'add', val: 2 },
{ id: 1, type: 'cancel' }
]
function keyF(event) {
return event.id
}
function limitF(groupedStream) {
const cancel = groupedStream.filter(x => x.type === 'cancel').take(1)
var adds = groupedStream.filter(x => x.type === 'add')
return adds.takeUntil(cancel).map(e => e.val)
}
Kefir.sequentially(2, events)
.thru(groupBy(keyF, limitF)) // use it like this
.flatMap(groupedStream => groupedStream.fold(0, (acc, x) => acc + x))
.onValue(sum => {
console.log(sum)
// returns [-1, 2, 8] in an order
})
from kefir.
Great, I'll try it with the thru
method.
Thanks ! :)
from kefir.
The only thing I would test there is making sure errors just flow through as expected, but otherwise, looks good!
from kefir.
I am coming back here to ask for your help. The implementation seems to work very much fine a a normal environment but does not work when it's used within jest-kefir
and I have no idea why.
Here is my test usage in a normal environment where everything works fine:
const data = {id: 1, data: 42};
const data2 = {id: 2, data: 43};
const a = stream();
const b = a
.thru(groupBy(a => a.id))
.flatMap(groupedStream => groupedStream.flatMapLatest(x => Kefir.later(0, x)))
.log();
send(a, [value(data), value(data2)]);
Output:
[stream.transduce.flatMap] <value> { id: 1, data: 42 }
[stream.transduce.flatMap] <value> { id: 2, data: 43 }
This is the test which fails because there is no output at all:
const data = {id: 1, data: 42};
const data2 = {id: 2, data: 43};
const a = stream();
const b = a
.thru(groupBy(a => a.id))
.flatMap(groupedStream => groupedStream.flatMapLatest(x => Kefir.later(0, x)));
expect(b).toEmit([value(42), value(43)], () => {
send(a, [value(data), value(data2)]);
});
Maybe you have a hint for, what could be the problem.
from kefir.
Related Issues (20)
- KefirJS and WebSockets HOT 4
- Only `sampledBy` is past tense
- Current state policies HOT 2
- Idea: Usable as AsycInterator
- Proposal to add `fromProperty` static function HOT 15
- Static Land interop `Observable` definition is wrong HOT 5
- Shouldn't a property only emit on "new values"? Proposing `onChange()` HOT 1
- Missing type for `unplug`
- Maintain TypeScript types? HOT 4
- Stop bundling `symbol-observable`
- TypeError: this._emitValue is not a function HOT 1
- Make subscription available inside callback HOT 8
- A big "Thank You" to Kefir developers HOT 1
- How to import Kefir within Rollup library HOT 3
- flatMapConcat should work when a spawned observable ends synchronously on activation
- Add documentation to "Kefir.pool" as "Rx.Subject equivalent" HOT 2
- Synchronous emit in flush method of debounce produces incorrect result HOT 8
- Is it possible to not activate a staled observable when reactivating flatMapLatest? HOT 8
- Russian docs translate
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from kefir.