dart-lang / stream_transform

Dart utility methods to create StreamTransformer instances to manipulate Streams

Home Page: https://pub.dev/packages/stream_transform

License: BSD 3-Clause "New" or "Revised" License

Dart 99.87% HTML 0.13%

stream_transform's Introduction

Extension methods on Stream adding common transform operators.

Operators

asyncMapBuffer, asyncMapSample, concurrentAsyncMap

Alternatives to asyncMap. asyncMapBuffer prevents the callback from overlapping execution and collects events while it is executing. asyncMapSample prevents overlapping execution and discards events while it is executing. concurrentAsyncMap allows overlap and removes ordering guarantees for higher throughput.

Like asyncMap but events are buffered in a List until previous events have been processed rather than being called for each element individually.
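
The difference between the first two alternatives can be sketched as follows. This is a minimal illustration, assuming the extension methods `asyncMapBuffer` and `asyncMapSample` exported by `package:stream_transform/stream_transform.dart`; `describe`, `processInBatches`, and `processLatestOnly` are hypothetical names introduced only for this example.

```dart
import 'dart:async';

import 'package:stream_transform/stream_transform.dart';

/// Hypothetical slow operation standing in for real work such as a request.
Future<String> describe(Object input) async {
  await Future<void>.delayed(const Duration(milliseconds: 20));
  return 'processed $input';
}

/// Events that arrive while [describe] is running are collected and handed
/// to the next call together as a List.
Stream<String> processInBatches(Stream<int> source) =>
    source.asyncMapBuffer<String>(describe);

/// Events that arrive while [describe] is running are not all processed;
/// the callback never overlaps with itself.
Stream<String> processLatestOnly(Stream<int> source) =>
    source.asyncMapSample<String>(describe);

Future<void> main() async {
  print(await processInBatches(Stream.fromIterable([1, 2, 3, 4])).toList());
  print(await processLatestOnly(Stream.fromIterable([1, 2, 3, 4])).toList());
}
```

Both variants call the callback at most once at a time; they differ only in what happens to the events that arrive in the meantime.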

asyncWhere

Like where but allows an asynchronous predicate.

audit

Waits for a period of time after receiving a value and then only emits the most recent value.

buffer

Collects values from a source stream until a trigger stream fires and the collected values are emitted.

combineLatest, combineLatestAll

Combine the most recent event from multiple streams through a callback or into a list.

debounce, debounceBuffer

Prevents a source stream from emitting too frequently by dropping or collecting values that occur within a given duration.

followedBy

Appends the values of a stream after another stream finishes.

merge, mergeAll, concurrentAsyncExpand

Interleaves events from multiple streams into a single stream.

scan

Scan is like fold, but instead of producing a single value it yields each intermediate accumulation.
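
As a minimal sketch, assuming the extension-method form `scan(initialValue, combine)` from `package:stream_transform/stream_transform.dart`, a running sum might look like this (`runningSum` is a hypothetical helper name):

```dart
import 'package:stream_transform/stream_transform.dart';

/// Emits the running total after every element, rather than only the final
/// total that `fold` would produce.
Stream<int> runningSum(Stream<int> source) =>
    source.scan<int>(0, (sum, value) => sum + value);

Future<void> main() async {
  final totals = await runningSum(Stream.fromIterable([1, 2, 3])).toList();
  // One intermediate sum per input event; the seed value 0 is not emitted.
  print(totals); // [1, 3, 6]
}
```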

startWith, startWithMany, startWithStream

Prepend a value, an iterable, or a stream to the beginning of another stream.

switchMap, switchLatest

Flatten a Stream of Streams into a Stream which forwards values from the most recent Stream

takeUntil

Let values through until a Future fires.

tap

Taps into a single-subscriber stream to react to values as they pass, without being a real subscriber.

throttle

Blocks events for a duration after an event is successfully emitted.

whereType

Like Iterable.whereType for a stream.

Comparison to Rx Operators

The semantics and naming in this package have some overlap, and some conflict, with the ReactiveX suite of libraries. Some of the conflict is intentional - Dart Stream predates Observable and coherence with the Dart ecosystem semantics and naming is a strictly higher priority than consistency with ReactiveX.

Rx Operator Category | variation                                           | stream_transform
sample               | sample/throttleLast(Duration)                       | sample(Stream.periodic(Duration), longPoll: false)
                     | throttleFirst(Duration)                             | throttle
                     | sample(Observable)                                  | sample(trigger, longPoll: false)
debounce             | debounce/throttleWithTimeout(Duration)              | debounce
                     | debounce(Observable)                                | No equivalent
buffer               | buffer(boundary), bufferWithTime, bufferWithCount   | No equivalent
                     | buffer(boundaryClosingSelector)                     | buffer(trigger, longPoll: false)
RxJs extensions      | audit(callback)                                     | No equivalent
                     | auditTime(Duration)                                 | audit
                     | exhaustMap                                          | No equivalent
                     | throttleTime(trailing: true)                        | throttle(trailing: true)
                     | throttleTime(leading: false, trailing: true)        | No equivalent
No equivalent?       |                                                     | asyncMapBuffer
                     |                                                     | asyncMapSample
                     |                                                     | buffer
                     |                                                     | sample
                     |                                                     | debounceBuffer
                     |                                                     | debounce(leading: true, trailing: false)
                     |                                                     | debounce(leading: true, trailing: true)

Getting a StreamTransformer instance

It may be useful to pass an instance of StreamTransformer so that it can be used with stream.transform calls rather than reference the specific operator in place. Any operator on Stream that returns a Stream can be modeled as a StreamTransformer using the fromBind constructor.

final debounce = StreamTransformer.fromBind(
    (s) => s.debounce(const Duration(milliseconds: 100)));
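
A short usage sketch of the snippet above, assuming the `debounce` extension from this package; `cleanQueries` and `debounceQueries` are hypothetical names. It shows why a transformer instance can be handy: the helper accepts any `StreamTransformer`, so the caller decides which operator is applied.

```dart
import 'dart:async';

import 'package:stream_transform/stream_transform.dart';

/// A transformer value wrapping the debounce operator.
final debounceQueries = StreamTransformer<String, String>.fromBind(
    (s) => s.debounce(const Duration(milliseconds: 100)));

/// Hypothetical helper that trims each query and then applies whichever
/// rate-limiting transformer the caller passes in.
Stream<String> cleanQueries(
        Stream<String> raw, StreamTransformer<String, String> limiter) =>
    raw.map((query) => query.trim()).transform(limiter);

Future<void> main() async {
  final input = StreamController<String>();
  final printed = cleanQueries(input.stream, debounceQueries).forEach(print);
  input
    ..add(' d')
    ..add(' da')
    ..add(' dart ');
  await input.close();
  await printed;
}
```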

stream_transform's Issues

[FR] exhaustMap

Hello folks.

Seeing that this package could potentially be merged into package:async, got me thinking about what features it lacks for my personal usage to completely replace the RxDart package.

The only one is basically the exhaustMap transformer. From the most used higher-order transformers triad asyncExpand, switchMap and exhaustMap, the last one is lacking, so it would be great to have it natively.

Thanks!

dart2 error with controller stream

https://travis-ci.org/dart-lang/stream_transform/jobs/395141419

00:13 +23 -2: test/switch_test.dart: Outer type: [single subscription], Inner type: [single subscription] forwards errors from outer Stream [E]                                                        
  type '_ControllerStream<dynamic>' is not a subtype of type 'Stream<Stream<dynamic>>' of 'outer'
  package:stream_transform/src/switch.dart  _SwitchTransformer.bind
  dart:async                                Stream.transform
  test/switch_test.dart 46:16               main.<fn>.<fn>
  ===== asynchronous gap ===========================
  dart:async                                _AsyncAwaitCompleter.completeError
  test/switch_test.dart                     main.<fn>.<fn>

Add startWith - prepend values from an Iterable

I think it would be useful to have a startWith<T>(Iterable<T> values). The use case I've hit is where I'm using scan and I want the initial value output as well. I'd rather not add a boolean argument to scan so I think the more general approach would look like values.transform(scan(initial, combine)).transform(startWith([initial]));.

The alternative today seems to be much clunkier: new Stream.fromIterable([initial]).transform(concat(values.transform(scan(initial, combine))))

Approximately matches the same name in Reactive, except those implementations mostly use varargs which aren't an option in Dart.

rxDart has startWith(value) and startWithMany(values).

@matanlurey @jonahwilliams - any objections?

Provide an API to combine a stream list of any size

If one wants to combine a list of streams that may be also empty or only contains one stream, the solution would be to do this over and over:

final streams = <Stream>[...];
if (streams.isEmpty) {
  return Stream.value(<T>[]);
}
if (streams.length == 1) {
  return streams[0].map<List<T>>((event) => <T>[event]);
}
return streams[0].combineLatestAll(streams.sublist(1));

This is not very clean to write over and over and seems to me like a pretty common use-case especially if you deal with server data collections.

I would propose to add a function like this to stream transform natively:

Stream<List<T>> combineStreams<T>(List<Stream<T>> streams) {
  if (streams.isEmpty) {
    return Stream.value(<T>[]);
  }
  if (streams.length == 1) {
    return streams[0].map<List<T>>((event) => <T>[event]);
  }
  return streams[0].combineLatestAll(streams.sublist(1));
}

Add asyncScan

Hi,

I am wondering whether it would make sense to add an asyncScan method to this library.

The idea would be to build a kind of reducer that provides the next value asynchronously.

My use case would be the data source of an infinitely scrolling list, where the UI passes a watermark and the stream would fetch the next items off the network and return a new List<ViewModel>.

Would this be a useful addition, or is there another construct that would be better suited to this case?

Thanks!

Add leading and trailing parameters to debounce and throttle

It is sometimes helpful to specify whether a debounced stream should invoke on the leading or trailing edge of the timeout.

For example, right now we're working on a feature where the user can flip through weekly charts. We want to debounce API calls because the user can flip very quickly, and we don't need to make API calls for charts that the user has effectively skipped. It would be nice to be able to specify that we want to invoke on the leading edge to support the common use case where the user flips back only one week, and expects that chart to load instantly.
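
The README's comparison table lists `debounce(leading: true, trailing: true)`, so, assuming the installed version of the package has those named parameters, the chart use case might be sketched like this. `chartsToLoad` and the 300 ms duration are hypothetical choices for illustration.

```dart
import 'dart:async';

import 'package:stream_transform/stream_transform.dart';

/// Takes a stream of week offsets produced as the user flips through charts
/// and returns the offsets that should actually trigger an API call.
Stream<int> chartsToLoad(Stream<int> weekFlips) => weekFlips.debounce(
      const Duration(milliseconds: 300),
      leading: true, // load the first flip right away
      trailing: true, // and also the week the user finally settles on
    );

Future<void> main() async {
  // Three rapid flips: the first should load immediately, and the skipped
  // middle one should not trigger a call.
  await chartsToLoad(Stream.fromIterable([-1, -2, -3])).forEach(print);
}
```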

For reference, JavaScript's lodash has implemented these parameters:

Implement as part of core Stream api?

Heya @natebosch!

Just checked up on the progress on this package, very cool :) I also re-read the package vision doc from a while back, and it got me thinking: Is there any way we could move these from a third-party package into the core Stream Api?

I started working on RxDart with Frank b/c I felt the core Stream API was simply missing a lot of features I needed (concatenating, zipping, flatMapping / Latest, really core features IMO to developing UIs with Streams and probably other things). It seems I'm not the only one, as in addition to this package, there are several implementations of these same StreamTransformers in various packages:

  • RxDart
  • Frappe
  • stream_ext
  • The async package even has things like StreamZip

Given that the community keeps reinventing the wheel here, demonstrating a clear need for these transformers, and since this package resides under the dart-lang org, I was wondering if it was possible to simply move many of these into the core Stream API, or if we could advocate together for such a change to the core API?

I think this would reduce the duplicated effort across so many packages and give Dart a better core async lib. Any thoughts?

use pkg:pedantic lints

Missing:

"avoid_relative_lib_imports",
"avoid_types_as_parameter_names",
"no_duplicate_case_values",
"prefer_contains",
"prefer_equal_for_default_values",
"recursive_getters",
"use_rethrow_when_possible"

Broadcast stream to emit most recent event sent on listening

It's more a feature request than an issue.
I'm facing this issue: https://stackoverflow.com/questions/68590198/dart-get-the-last-or-the-first-value-of-a-stream

There is a suggestion to solve it (see below), and I would love to see it inside the stream_transform library. Does it make sense?

// Copyright 2021 Google LLC.
// SPDX-License-Identifier: Apache-2.0
import "dart:async";

/// Listens to [source] to returned stream.
///
/// Each listener on the returned stream receives the most recent
/// event sent on [source] followed by all further events of [source]
/// until they stop listening.
/// If there has been no events on [source] yet, only the further events
/// are forwarded.
Stream<T> mostRecentStream<T>(Stream<T> source) {
  var isDone = false;
  var hasEvent = false;
  T? mostRecentEvent;
  List<MultiStreamController>? pendingListeners;
  var listeners = <MultiStreamController>[];

  void forEachListener(void Function(MultiStreamController) action) {
    var active = 0;
    var originalLength = listeners.length;
    for (var i = 0; i < listeners.length; i++) {
      var controller = listeners[i];
      if (controller.hasListener) {
        listeners[active++] = controller;
        if (i < originalLength) action(controller);
      }   
    }
    listeners.length = active;
  }

  source.listen((event) {
    mostRecentEvent = event;
    hasEvent = true;
    forEachListener((controller) {
      controller.addSync(event);
    });
  }, onError: (e, s) {
    forEachListener((controller) { 
      controller.addErrorSync(e, s);
    });
  }, onDone: () {
    isDone = true;
    for (var controller in listeners) {
      controller.close();
    }
    listeners.clear();
  });

  return Stream<T>.multi((controller) {
    if (hasEvent) controller.add(mostRecentEvent as T);
    if (isDone) {
      controller.close();
    } else {
      listeners.add(controller);
    }  
  });
}

Transformers swallow new subscriptions for broadcast streams

Consider a stream that does something when being listened to:

import 'dart:async';
import 'package:stream_transform/stream_transform.dart';

class DoOnSubscribeStream<T> extends Stream<T> {
  final Stream<T> inner;
  final void Function() onSubscribe;

  DoOnSubscribeStream(this.inner, this.onSubscribe);

  @override
  bool get isBroadcast => inner.isBroadcast;

  @override
  StreamSubscription<T> listen(void Function(T event)? onData,
      {Function? onError, void Function()? onDone, bool? cancelOnError}) {
    onSubscribe();
    return inner.listen(onData,
        onError: onError, onDone: onDone, cancelOnError: cancelOnError);
  }
}

Now, let's say that stream is used as a broadcast stream that prints something for each new subscription. When using transformers in this package, new subscriptions downstream do not trigger the callback:

void main() {
  final controller = StreamController<void>.broadcast(
      onListen: () => print('controller.onListen'));
  final stream = DoOnSubscribeStream(controller.stream, () {
    print('new subscription');
  });

  var switched = stream.switchMap((event) => Stream.value(event));
  switched.listen(null); // prints!
  switched.listen(null); // does not print!
}

The full example prints:

  • new subscription
  • controller.onListen

I would expect it to print:

  • new subscription
  • controller.onListen
  • new subscription

I know that the reason for this behavior is that the switchMap transformer uses a broadcast stream controller for the inner stream, and that only calls onListen once. However, I don't agree with this behavior because it assumes that each subscription to a broadcast stream emits the same events to be correct. I couldn't find any official source backing that assumption though. Another example for this is the Stream.multi constructor in the SDK. Is this the expected behavior?

void main() {
  var counter = 0;
  final stream = Stream<int>.multi((controller) {
    controller.add(++counter);
    controller.close();
  }, isBroadcast: true);

  var switched = stream.switchMap((event) => Stream.value(event));
  switched.listen(print); // prints 1
  switched.listen(print); // also prints 1 :(
}

See also: The discussion at ReactiveX/rxdart#587

Add a table (or similar) with a mapping between rx operators and this package

For folks who are familiar with Rx, but not Dart streams we should document some differences and similarities.

  • For operators which are similar, but have semantic differences we should call those out.
  • For operators which are similar but have different names, we should provide a table mapping between them.

Add a trailing option to throttle

I discussed the need for a rate limiting method with @natebosch that has the following properties:

  • Emit the first event at the beginning of the period
  • Emit the last event at the end of the period

Given a period of 500ms (one - represents 100ms):

Source:  A-B----C-D-------E--F
Periods: |----|----|----  |----|----
Result:  A----B----D------E----F

This is similar to both audit and throttle with some key differences:

  • Unlike audit, this doesn't delay the initial event (or the initial event after a period has elapsed since the last event)

    Source:  A-B----C-D-------E--F
    Periods: ----|  ----|     ----|
    Result:  ----B------D---------F
    
  • Unlike throttle, this won't "miss" the last event in a period

    Source:  A-B----C-D-------E--F
    Periods: |----  |----     |----
    Result:  A------C---------E
    

Nate pointed out that this behavior could be implemented today using asyncMapSample:

Stream<T> firstAndLastInDuration<T>(Stream<T> stream, Duration duration) {
  var controller = StreamController<T>(sync: true);
  stream.asyncMapSample((event) async {
    controller.add(event);
    await Future.delayed(duration);
  }).drain();
  return controller.stream;
}

Alternatively, the same behavior could be provided by adding a trailing argument to throttle which would make it also emit the last event in the period (and reset the timer). This would give it the exact behavior described above:

Source:  A-B----C-D-------E--F
Periods: |----|----|----  |----|----
Result:  A----B----D------E----F

Angular tests time out when using transformers from this package

Context

The toh-6 example AngularDart app has angular_test-based tests that run fine under Dart 1.23 and 1.24-dev.

Migration from stream_transformers

We recently switched out use of the stream_transformers package and used this one instead (because this package is strong mode compliant) -- cf. https://github.com/dart-lang/site-webdev/pull/719. Switching to this package was relatively straightforward:

stream_transformers transformer | this package
new Debounce(...)               | debounce(...)
new FlatMapLatest(...)          | switchMap(...)

Hero search

One of the app features is a hero search, which lists matching hero names as the user types.

The hero search feature is explained in the AngularDart tutorial part 6, section on Streams.

Hero search tests

Some tests check whether the search box's heroes-found list is appropriately updated as the search text is progressively typed; e.g. this test:

test('list heroes matching ${searchText}g', () async {
    await _typeSearchTextAndRefreshPO('g');
    expect(await po.heroNames, ['Magneta', 'Magma']);
  });

...

Future _typeSearchTextAndRefreshPO(String searchText) async {
  await po.search.type(searchText);
  Stream<List> herosStream;
  await fixture.update((c) => herosStream = c.heroes);
  // Wait for the data to be fetched from db. ...
  await herosStream.first; // <***********************************************
  po = await fixture.resolvePageObject(HeroSearchPO);
}

As shown above in the _typeSearchTextAndRefreshPO() method, to test this feature, w/o having to wait for a fixed amount of time, the angular_test waits on the hero search result stream at this line -- marked with <**** above.

While this test works fine when using the transformers from stream_transformers, it fails (due to a timeout waiting on the stream) when using the transformers from this package.

cc @kwalrath @kevmoo

Cf. https://github.com/dart-lang/site-webdev/pull/719#discussion_r121031304

Flaky tests on Travis

The debounce and audit tests are flaky on Travis because we rely on a 10ms delay firing after a 5ms delay.

It's possible that #24 would make this more reliable. If not we could also consider bumping the tests to 15ms wait.

Filing for tracking but won't spend time on this until after we try sync controllers.

Events dropped when using startWith operator

Description

When using the startWith operator on a broadcast source stream, the events emitted by the source stream are ignored/dropped until Future.value(initialValue) completes.

Reproduction

E.g. the following code

final modelChange = StreamController<int>.broadcast();
modelChange.stream.startWith(1).listen(print);

// the following event is not printed, i.e. dropped!
modelChange.add(2);

// schedule the following code in a separate microtask
await null;

modelChange.add(3);

prints the sequence 1, 3, where the 2 is missing.

The reason for this is that at the time of modelChange.add(2), there is a separate microtask queued to complete Future.value(initial), while the modelChange.stream is not subscribed to yet.
The await null schedules the modelChange.add(3) in a separate microtask, thus the Future.value(initial) completes and we subscribe to the modelChange.stream, thus correctly capturing the modelChange.add(3) event.

This is very counterintuitive for users of startWith, since one would expect that anything that happens after the listen invocation is captured.

Suggested change

The mental model should be: startWith results in a stream with all of the events of the source stream after or at the moment of subscribing plus the initial value passed to startWith

The startWith operator could therefore be implemented like this:

extension Concatenate<T> on Stream<T> {
  Stream<T> startWithImproved(T initial) => Stream.value(initial).merge(this);
}

In this case,

final modelChange = StreamController<int>.broadcast();
modelChange.stream.startWithImproved(1).listen(print);

modelChange.add(2);

await null;

modelChange.add(3);

correctly prints 1,2,3;

Note: This, however, relies on the exact behavior of merge() to give priority to the Stream.value() stream.

Important: As described in #152 (comment), this implementation proposal does not behave correctly for synchronous broadcast streams, so an alternative implementation as suggested in #151 (comment) should be considered and developed.

scan and switchMap don't correctly report stream type

new StreamTransformer(onListen) constructor doesn't create streams which know their type, so isBroadcast always reports false. This pattern is used in scan and switchMap so these produce Streams which don't behave correctly.

Make transformToHandlers public

Hello folks,

Currently transformFromHandlers isn't exported in lib/stream_transform.dart. Is it possible to expose it?

It's better than the standard fromHandlers transformer in the sense that it allows relying on a common state for broadcast streams since callbacks are called once per event only.

stream_transform with flutter

The current Dart SDK version is 2.6.0-dev.0.0.flutter-1103600280.

Because conn_demo depends on stream_transform >=0.0.20 which requires SDK version >=2.6.0 <3.0.0, version solving failed.
pub get failed (1)

Add prepend - the reverse of concat

In some places it might be useful to reverse the order of concatenation to reduce nesting.

values
    .transform(step1)
    .transform(step2)
    .transform(step3)
    .transform(prepend(startingValues));

vs

startingValues.transform(concat(values
    .transform(step1)
    .transform(step2)
    .transform(step3)));

The particular place I'm hitting this would be better solved with #19 so I'm less sure of the value here. It's also a little confusing to have prepend and startWith mean totally different things.

Recommendations on "closable" transformed streams?

This is not a bug. Rather, I am seeking recommendations from the experts :-)

Normally we listen to a broadcast Stream with a StreamSubscription. When we are done processing events from that stream, we clean up by closing the StreamSubscription.

There is no easy way to clean up a transformed stream. For example:

final outputStream = stream1.combineLatest(stream2, combinerFn);

According to the API, outputStream won't be closed until both stream1 and stream2 are closed. In reality, however, we may want to close outputStream and clean up the internal StreamSubscriptions, so combinerFn won't fire any more. But we do not want to close stream1 or stream2 - they may be owned by third-party libraries, or they are still serving other parts of the app.

We have internal customers who have a large number of stream transformations and are concerned about the wasted memory and CPU cycles.

Before rolling our own combineLatest solution that supports proper cleanups, I would like to understand if this is the right way to go. Or is it something that can be resolved within package:stream_transform?

Thanks!

Implement a `combineLatest` transform

I'd expect usage to look like this:

firstStream.transform(combineLatest(secondStream, (a, b) => a + b));

I'd expect us not to pass null to the combine callback in case one of the streams hasn't emitted yet. To get that behavior you could use a startWith to force an "immediate" start for one or both streams.
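
With the extension-method API that the README describes, the seeding idea could be sketched as below. This assumes `combineLatest(other, combine)` and `startWith(value)` as exported by the current package; `sumWithSeed` is a hypothetical helper.

```dart
import 'dart:async';

import 'package:stream_transform/stream_transform.dart';

/// Adds the latest values of [a] and [b]. Seeding [b] with 0 lets the result
/// emit as soon as [a] produces a value, so the callback never needs a null.
Stream<int> sumWithSeed(Stream<int> a, Stream<int> b) =>
    a.combineLatest(b.startWith(0), (int x, int y) => x + y);

Future<void> main() async {
  final first = StreamController<int>();
  final second = StreamController<int>();
  sumWithSeed(first.stream, second.stream).listen(print);

  first.add(1); // combined with the seed from `second`
  await Future<void>.delayed(Duration.zero);
  second.add(10); // combined with the latest value from `first`
  await Future<void>.delayed(Duration.zero);
  await Future.wait([first.close(), second.close()]);
}
```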

Are you interested in collaboration?

Hey hey :) The RxDart project actually has a layer of about 30 StreamTransformers. It would be quite simple to massage our transformers to fit into this library's style. Our transformers are pretty well tested by this point through a few rounds of feedback, and we could easily migrate the tests as well. Also, each transformer has decent DartDocs included.

Anyhow, if you're up for a bit of collaboration, let us know!

Update, totally failed to paste a link: https://github.com/ReactiveX/rxdart/tree/master/lib/src/transformers

asyncMapSample is not dropping all events while busy

It seems like asyncMapSample is not dropping all events while convert is executing. Instead, what seems to happen is that the next event is queued up, but all other events are dropped.

Am I missing something?

The docs say:
"asyncMapSample prevents overlapping execution and discards events while it is executing. "

import 'dart:async';

import 'package:flutter_test/flutter_test.dart';
import 'package:stream_transform/stream_transform.dart';

main() {
  test('discards event while processing [convert]', () async {
    StreamController<Future Function()> controller = StreamController();

    controller.stream.asyncMapSample((asyncFunction) async {
      await asyncFunction();
    }).listen((event) {});

    final increment = expectAsync0(
      () async {
        await Future.delayed(const Duration(milliseconds: 5));
      },
      count: 1,
    );

    // Spam the controller
    for (int i = 0; i < 50; i++) {
      /// My expectation was that this would only be called once
      /// since after adding the first event the "convert" function should be busy
      controller.add(increment);
    }

    await Future.delayed(const Duration(milliseconds: 20));

    await controller.close();
  });
}


Output from running the test:


00:02 +0 -1: discards event while processing [convert] [E]
  Callback called more times than expected (1).
  package:test_api                                          _ExpectedFunction.max0
  test\async_map_test.dart 11:26                            main.<fn>.<fn>
  package:stream_transform/src/async_map.dart 124:18        AsyncMap._asyncMapThen.<fn>
  package:stream_transform/src/from_handlers.dart 29:50     TransformByHandlers.transformByHandlers.<fn>.<fn>
  dart:async                                                _StreamController.add
  package:stream_transform/src/aggregate_sample.dart 57:18  AggregateSample.aggregateSample.emit
  package:stream_transform/src/aggregate_sample.dart 86:9   AggregateSample.aggregateSample.onTrigger
  ===== asynchronous gap ===========================
  dart:async                                                Future.catchError
  package:stream_transform/src/async_map.dart 124:41        AsyncMap._asyncMapThen.<fn>
  package:stream_transform/src/from_handlers.dart 29:50     TransformByHandlers.transformByHandlers.<fn>.<fn>
  dart:async                                                _StreamController.add
  package:stream_transform/src/aggregate_sample.dart 57:18  AggregateSample.aggregateSample.emit
  package:stream_transform/src/aggregate_sample.dart 86:9   AggregateSample.aggregateSample.onTrigger
  ===== asynchronous gap ===========================
  dart:async                                                _StreamImpl.listen
  package:stream_transform/src/from_handlers.dart 29:22     TransformByHandlers.transformByHandlers.<fn>
  dart:async                                                _StreamImpl.listen
  test\async_map_test.dart 12:8                             main.<fn>

asyncMapSample crashes on null values

I tried using asyncMapSample on a Stream<void> (to which I'm passing null since the data doesn't matter) and found that it crashes immediately due to the following null assertion:

controller.add(currentResults!);

One can work around this issue for a Stream<void> by passing a non-null object since the value doesn't matter, but I suspect this prevents this extension method from being used on a stream of nullable values (e.g. Stream<String?>).

Enforcing a minimum wait time between emissions without delaying the first

Given a stream with events at t = 0, 1, 2, 5, 6, is there a way to transform this into a stream where each event adds a 2 second hold until the next event is emitted?

I tried .asyncMapBuffer((events) => Future.delayed(const Duration(seconds: 2), () => events.last)), but that adds an unwanted 2 second delay to the very first event, too.
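
One possible approach, sketched here with `dart:async` only and not taken from this package, is an `async*` function that remembers when it last emitted. `spacedBy` is a hypothetical name, and the sketch relies on `await for` pausing the source subscription while it waits, so that later events are held rather than dropped.

```dart
import 'dart:async';

/// Forwards the first event immediately, then holds each later event until
/// at least [gap] has passed since the previous emission.
Stream<T> spacedBy<T>(Stream<T> source, Duration gap) async* {
  final sinceLast = Stopwatch();
  await for (final event in source) {
    // Only wait if something has already been emitted.
    while (sinceLast.isRunning && sinceLast.elapsed < gap) {
      await Future<void>.delayed(gap - sinceLast.elapsed);
    }
    yield event;
    sinceLast
      ..reset()
      ..start();
  }
}

Future<void> main() async {
  final clock = Stopwatch()..start();
  await for (final event in spacedBy(
      Stream.fromIterable([1, 2, 3]), const Duration(milliseconds: 200))) {
    print('${clock.elapsedMilliseconds} ms: $event');
  }
}
```

Unlike the asyncMapBuffer attempt, nothing is merged or discarded: every event is eventually emitted, only later than it arrived.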

Add concurrentAsyncExpand?

We have Stream.asyncMap, concurrentAsyncMap, Stream.asyncExpand... but no concurrentAsyncExpand.

StreamTransformer<S, T> concurrentAsyncExpand<S, T>(Stream<T> convert(S event))

Example:

Iterable<Future<int>> futures = [3, 2, 1].map((i) => Future.delayed(Duration(seconds: i), () => i));
Stream.fromIterable(futures)
  .transform(concurrentAsyncExpand((future) async* {
    yield 'awaiting future';
    var result = await future;
    yield 'future completed: $result';
  }))
  .forEach(print);

Output:

awaiting future
awaiting future
awaiting future
future completed: 1
future completed: 2
future completed: 3

`scan` like transform with different output

Hi,

this might be more of a question / clarification instead of a feature request.

I had the following task to solve, and only came up with a very cumbersome solution using multiple (data leaking) transforms, and either I have overlooked something, or there is really a place for a new transform.

The basic premise is this: I have an end-less scroll view that emits the last viewed item's index. And I have an API where I can load pages of 10 items each.

My approach is then, whenever the viewItemIndex Sink is pushed to, to calculate the next page after that item, and then retrieve all pages between the last loaded page and the desired one.

The last step (using max()-like logic and expanding the single input to a stream of pages to load) is what makes my current implementation a little cumbersome (I use scan to save the previously loaded index, and then that state gets passed to an asyncExpand that emits loads from previous to current page index).

I think a better API for this case (and all cases where one wants a little bit of encapsulated, private state in order to produce the next result) would be something I dub scanExpand here (just using expand here for the biggest version, people could of course also only return 1 item).

    _lastViewedProductIndex.stream
        .map(pageFromIndex)
        .transform(scan(0 /* highest loaded page */,
            (int highestLoadedPage, int nextDesiredPage) sync* /* returns Stream<int> */ {
                for (var i = highestLoadedPage + 1; i <= nextDesiredPage; i++) {
                    yield i;
                }

                return nextDesiredPage; // Not sure how best to type this in Dart; what we need would maybe be a Result object with `nextState` and `stream` properties
          }
        ).asyncMap(loadPage);

I hope I made my use-case clear. Do you see the value in this, or would something similar already be possible with existing constructs?

Thanks for any input you have :)!

Implement asyncSwitchMap

It would be handy to have something like asyncSwitchMap in this package.

Use case
Creation of the child stream requires asynchronous code execution.

Example:
Loading a user's items from the database. The UI must react to changes in both the user info stream and the item info stream.

/// Provider interface for database
abstract class Provider<T> {
  Future<T> provide();
}
/// Abstract database class
abstract class Database {
  Stream<UserItem> findByUserId(String userId);
}
/// Item repository
class ItemRepository {
  ...
  Stream<UserItem> findByUserId(Stream<User> userStream) {
    return userStream
      .switchMap((User user) {
        /// Database needs to be accessed with the `await` keyword. Currently it is not possible to achieve this syntax with switchMap()
        Database db = await _databaseProvider.provide();
        return db.findByUserId(user.id);
      });
  }
}

If this implementation is possible, it would be really nice to have it in order to make code cleaner.
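
Until something like that exists, a possible workaround is to combine `Stream.asyncMap` from the SDK with the `switchLatest` operator listed in the README. The sketch below is an assumption-laden illustration, not the package's recommended pattern: `User`, `UserItem`, `Database`, `InMemoryDatabase`, and `itemsFor` are hypothetical stand-ins for the types in the issue.

```dart
import 'dart:async';

import 'package:stream_transform/stream_transform.dart';

// Hypothetical stand-ins for the types used in the issue.
class User {
  final String id;
  User(this.id);
}

class UserItem {
  final String name;
  UserItem(this.name);
}

abstract class Database {
  Stream<UserItem> findByUserId(String userId);
}

/// Hypothetical in-memory database, only so the sketch can run.
class InMemoryDatabase implements Database {
  @override
  Stream<UserItem> findByUserId(String userId) =>
      Stream.fromIterable([UserItem('$userId-item')]);
}

Stream<UserItem> itemsFor(
        Stream<User> users, Future<Database> Function() provide) =>
    users
        // asyncMap awaits the database before building the inner stream.
        .asyncMap((user) async => (await provide()).findByUserId(user.id))
        // switchLatest forwards only the most recently created inner stream.
        .switchLatest();

Future<void> main() async {
  final items = itemsFor(
      Stream.fromIterable([User('a'), User('b')]),
      () async => InMemoryDatabase());
  await items.forEach((item) => print(item.name));
}
```

One caveat of this design: `asyncMap` handles one event at a time, so a slow `provide()` call delays later users instead of being abandoned when a newer user arrives.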

Consider adding an argument to `buffer` to match ReactiveX behavior

While I was trying to more deeply understand the reactive docs I found an interesting behavior difference between our buffer and reactive buffer(bufferClosingSelector) which I suspect makes them fit fairly different use cases.

In Reactive, if you send a "close" selector when there are no events buffered you get an empty list immediately. In stream_transform, if you send a trigger when there are no events buffered, you get set up to be immediately notified of the next event in a single-item list. This is a bit like a "long polling" behavior for buffering, vs the strict immediate polling from Reactive.

Typically I don't like to add speculative generality, however in this case it might be worth adding the support eagerly since the additional generality may actually make it easier to document and compare to Rx.
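
The README's comparison table refers to `buffer(trigger, longPoll: false)`, so, assuming that named parameter is available in the installed version, the two behaviors described above can be compared with a sketch like this. `triggerBeforeValue` is a hypothetical helper that fires the trigger before any value has been buffered.

```dart
import 'dart:async';

import 'package:stream_transform/stream_transform.dart';

/// Fires the trigger while the buffer is empty, then adds one value, and
/// returns every batch that was emitted.
Future<List<List<int>>> triggerBeforeValue({required bool longPoll}) async {
  final values = StreamController<int>();
  final trigger = StreamController<void>();
  final batches = <List<int>>[];
  final subscription = values.stream
      .buffer(trigger.stream, longPoll: longPoll)
      .listen(batches.add);

  trigger.add(null); // nothing is buffered yet
  await Future<void>.delayed(Duration.zero);
  values.add(1);
  await Future<void>.delayed(Duration.zero);

  await subscription.cancel();
  unawaited(values.close());
  unawaited(trigger.close());
  return batches;
}

Future<void> main() async {
  // Long polling: the early trigger is remembered and the next value is
  // expected to arrive in a single-item list.
  print(await triggerBeforeValue(longPoll: true));
  // Immediate polling: the early trigger is expected to produce an empty
  // list, and the later value stays buffered until another trigger.
  print(await triggerBeforeValue(longPoll: false));
}
```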

Add replayLatest

For non-broadcast streams this would implicitly call .asBroadcastStream(). For any new listeners on a Stream the most recently emitted event will always be emitted.

This one might not quite be in the same scope as most stuff in this package since it will require a new Stream implementation that lets us have some behavior for every new listener. I think by default with a broadcast StreamController you only get the onListen happening any time we go from zero to 1 listeners, not when we get new listeners. I also don't think it has the right hook to send an event to just one listener.

Since that's also a significant deviation from normal Stream behavior we should be careful here and avoid it if it would be too confusing.
