Streams API

Where Did All the Text Go?

We are in the process of transitioning this specification from a GitHub README into something a bit more palatable. The official-lookin' version is developed in the official-lookin branch's index.html file, which you can see on GitHub, or in its rendered glory at this long URL.

Right now, we've transferred over most of the concepts and text, but none of the algorithms or APIs. We'll be iterating on the APIs a bit more here, in Markdown format, until we feel confident in them. In the meantime, please check out the rendered spec for all of the interesting stage-setting text.

By the way, this transition is being tracked as #62.

Readable Stream APIs

BaseReadableStream

class BaseReadableStream {
    constructor({
        function start = () => {},
        function pull = () => {},
        function cancel = () => {}
    })

    // Reading data from the underlying source
    any read()
    Promise<undefined> wait()
    get ReadableStreamState state

    // Composing with other streams
    WritableStream pipeTo(WritableStream dest, { ToBoolean close = true } = {})
    ReadableStream pipeThrough({ WritableStream input, ReadableStream output }, options)

    // Stop accumulating data
    Promise<undefined> cancel(any reason)

    // Useful helper
    get Promise<undefined> closed

    // Internal properties
    Array [[buffer]] = []
    boolean [[started]] = false
    boolean [[draining]] = false
    boolean [[pulling]] = false
    string [[state]] = "waiting"
    any [[storedError]]
    Promise<undefined> [[waitPromise]]
    Promise<undefined> [[closedPromise]]
    Promise [[startedPromise]]
    function [[onCancel]]
    function [[onPull]]

    // Internal methods for use by the underlying source
    [[push]](any data)
    [[close]]()
    [[error]](any e)

    // Other internal helper methods
    [[callPull]]()
}

enum ReadableStreamState {
    "readable"  // the buffer has something in it; read at will
    "waiting"   // the source is not ready or the buffer is empty; you should call wait
    "closed"  // all data has been read from both the source and the buffer
    "errored"   // the source errored so the stream is now dead
}

Properties of the BaseReadableStream prototype

constructor({ start, pull, cancel })

The constructor is passed several functions, all optional:

  • start(push, close, error) is typically used to adapt a push-based data source, as it is called immediately so it can set up any relevant event listeners, or to acquire access to a pull-based data source.
  • pull(push, close, error) is typically used to adapt a pull-based data source, as it is called in reaction to read calls, or to start the flow of data in push-based data sources. Once it is called, it will not be called again until its passed push function is called.
  • cancel() is called when the readable stream is canceled, and should perform whatever source-specific steps are necessary to clean up and stop reading.

Both start and pull are given the ability to manipulate the stream's internal buffer and state by being passed the this.[[push]], this.[[close]], and this.[[error]] functions.

  1. If IsCallable(start) is false, throw a TypeError exception.
  2. If IsCallable(pull) is false, throw a TypeError exception.
  3. If IsCallable(cancel) is false, throw a TypeError exception.
  4. Set this.[[onCancel]] to cancel.
  5. Set this.[[onPull]] to pull.
  6. Let this.[[waitPromise]] be a newly-created pending promise.
  7. Let this.[[closedPromise]] be a newly-created pending promise.
  8. Let startResult be the result of start(this.[[push]], this.[[close]], this.[[error]]).
  9. ReturnIfAbrupt(startResult).
  10. Let this.[[startedPromise]] be the result of casting startResult to a promise.
  11. Upon fulfillment of this.[[startedPromise]], set this.[[started]] to true.
  12. Upon rejection of this.[[startedPromise]] with reason r, call this.[[error]](r).
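
To make the start option more concrete, here is a minimal sketch of adapting a push-based source. The makeTicker emitter and tickerSourceOptions helper are hypothetical names invented for this example, and start is invoked by hand with stand-in functions because no BaseReadableStream implementation is assumed.

```javascript
// Hypothetical push-based source: calls registered listeners with data.
function makeTicker() {
    const listeners = { data: [], end: [], error: [] };
    return {
        on(type, fn) { listeners[type].push(fn); },
        emit(type, value) { listeners[type].forEach(fn => fn(value)); }
    };
}

// Builds constructor options in the { start, cancel } shape described above.
function tickerSourceOptions(ticker) {
    return {
        start(push, close, error) {
            // Called immediately, so listeners are in place before data flows.
            ticker.on("data", chunk => push(chunk));
            ticker.on("end", () => close());
            ticker.on("error", e => error(e));
        },
        cancel() {
            // A real source would detach listeners or release handles here.
        }
    };
}

// Stand-ins for this.[[push]], this.[[close]], and this.[[error]].
const log = [];
const ticker = makeTicker();
tickerSourceOptions(ticker).start(
    data => log.push(["push", data]),
    () => log.push(["close"]),
    e => log.push(["error", e])
);
ticker.emit("data", "a");
ticker.emit("data", "b");
ticker.emit("end");
```

After the three emit calls, log holds two push entries followed by one close entry, which is the sequence the stream's internal methods would have seen.
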
get state
  1. Return this.[[state]].
read()
  1. If this.[[state]] is "waiting" or "closed", throw a TypeError exception.
  2. If this.[[state]] is "errored", throw this.[[storedError]].
  3. Assert: this.[[state]] is "readable".
  4. Assert: this.[[buffer]] is not empty.
  5. Let data be the result of shifting an element off of the front of this.[[buffer]].
  6. If this.[[buffer]] is now empty,
    1. If this.[[draining]] is true,
      1. Set this.[[state]] to "closed".
      2. Let this.[[waitPromise]] be a newly-created promise rejected with a TypeError exception.
      3. Resolve this.[[closedPromise]] with undefined.
    2. If this.[[draining]] is false,
      1. Set this.[[state]] to "waiting".
      2. Let this.[[waitPromise]] be a newly-created pending promise.
      3. Call this.[[callPull]]().
  7. Return data.
wait()
  1. If this.[[state]] is "waiting",
    1. Call this.[[callPull]]().
  2. Return this.[[waitPromise]].
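
From the consumer's side, state, read(), and wait() are meant to be used together. The following is a sketch of one possible consumption loop, not a prescribed pattern; readAll and the fake stream object are hypothetical and exist only so the snippet runs on its own.

```javascript
// Drains whatever is readable, waits when the stream is "waiting", and
// resolves with every chunk once the stream reports "closed".
function readAll(stream, chunks = []) {
    while (stream.state === "readable") {
        chunks.push(stream.read());
    }
    if (stream.state === "waiting") {
        return stream.wait().then(() => readAll(stream, chunks));
    }
    if (stream.state === "closed") {
        return Promise.resolve(chunks);
    }
    // "errored": wait() returns a promise rejected with the stored error.
    return stream.wait();
}

// Hypothetical stand-in for a stream whose buffer is already filled and draining.
const fake = {
    queue: ["a", "b"],
    get state() { return this.queue.length > 0 ? "readable" : "closed"; },
    read() { return this.queue.shift(); },
    wait() { return Promise.resolve(); }
};

const collected = [];
const allRead = readAll(fake, collected);
```

The loop checks for "closed" before calling wait(), since a stream that closed by draining has a rejected wait promise.
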
cancel()
  1. If this.[[state]] is "closed", return a new promise resolved with undefined.
  2. If this.[[state]] is "errored", return a new promise rejected with this.[[storedError]].
  3. If this.[[state]] is "waiting", resolve this.[[waitPromise]] with undefined.
  4. If this.[[state]] is "readable", let this.[[waitPromise]] be a new promise resolved with undefined.
  5. Clear this.[[buffer]].
  6. Set this.[[state]] to "closed".
  7. Resolve this.[[closedPromise]] with undefined.
  8. Return the result of promise-calling this.[[onCancel]]().
get closed
  1. Return this.[[closedPromise]].
pipeTo(dest, { close })
BaseReadableStream.prototype.pipeTo = function (dest, { close = true } = {}) {
    const source = this;
    close = Boolean(close);

    fillDest();
    return dest;

    function fillDest() {
        if (dest.state === "writable") {
            pumpSource();
        } else if (dest.state === "waiting") {
            dest.wait().then(fillDest, cancelSource);
        } else {
            // Destination has either been closed by someone else, or has errored in the
            // course of someone else writing. Either way, we're not going to be able to
            // do anything else useful.
            cancelSource();
        }
    }

    function pumpSource() {
        if (source.state === "readable") {
            dest.write(source.read()).catch(cancelSource);
            fillDest();
        } else if (source.state === "waiting") {
            source.wait().then(fillDest, abortDest);
        } else if (source.state === "closed") {
            closeDest();
        } else {
            abortDest();
        }
    }

    function cancelSource(reason) {
        source.cancel(reason);
    }

    function closeDest() {
        if (close) {
            dest.close();
        }
    }

    function abortDest(reason) {
        // ISSUE: should this be preventable via an option or via `options.close`?
        dest.abort(reason);
    }
};
pipeThrough({ input, output }, options)
  1. If Type(input) is not Object, then throw a TypeError exception.
  2. If Type(output) is not Object, then throw a TypeError exception.
  3. Let stream be the this value.
  4. Let result be the result of calling Invoke(stream, "pipeTo", (input, options)).
  5. ReturnIfAbrupt(result).
  6. Return output.
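
The steps above can be sketched as a free function. This assumes step 4 is meant to pipe the stream into input; pipeThrough is written here as a standalone helper taking the stream as its first argument, and the source and transform objects are hypothetical stubs.

```javascript
// Pipes `stream` into the writable side of a transform and hands back
// the readable side, so calls can be chained.
function pipeThrough(stream, { input, output }, options) {
    // Object(x) === x holds only for objects and functions.
    if (Object(input) !== input) throw new TypeError("input must be an object");
    if (Object(output) !== output) throw new TypeError("output must be an object");
    stream.pipeTo(input, options);
    return output;
}

// Hypothetical stubs that only record what they were piped to.
const source = {
    pipedTo: null,
    pipeTo(dest) { this.pipedTo = dest; return dest; }
};
const transform = { input: { name: "in" }, output: { name: "out" } };

const result = pipeThrough(source, transform);
```

Returning output rather than input is what makes source.pipeThrough(a).pipeThrough(b) style chains possible.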

Internal Methods of BaseReadableStream

[[push]](data)
  1. If this.[[state]] is "waiting",
    1. Push data onto this.[[buffer]].
    2. Set this.[[pulling]] to false.
    3. Set this.[[state]] to "readable".
    4. Resolve this.[[waitPromise]] with undefined.
  2. Otherwise, if this.[[state]] is "readable",
    1. Push data onto this.[[buffer]].
    2. Set this.[[pulling]] to false.
  3. Return false.
[[close]]()
  1. If this.[[state]] is "waiting",
    1. Resolve this.[[waitPromise]] with undefined.
    2. Resolve this.[[closedPromise]] with undefined.
    3. Set this.[[state]] to "closed".
  2. If this.[[state]] is "readable",
    1. Set this.[[draining]] to true.
[[error]](e)
  1. If this.[[state]] is "waiting",
    1. Set this.[[state]] to "errored".
    2. Set this.[[storedError]] to e.
    3. Reject this.[[waitPromise]] with e.
    4. Reject this.[[closedPromise]] with e.
  2. If this.[[state]] is "readable",
    1. Clear this.[[buffer]].
    2. Set this.[[state]] to "errored".
    3. Set this.[[storedError]] to e.
    4. Let this.[[waitPromise]] be a newly-created promise object rejected with e.
    5. Reject this.[[closedPromise]] with e.
[[callPull]]()
  1. If this.[[pulling]] is true, return.
  2. Set this.[[pulling]] to true.
  3. If this.[[started]] is false,
    1. Upon fulfillment of this.[[startedPromise]],
      1. Let pullResult be the result of this.[[onPull]](this.[[push]], this.[[close]], this.[[error]]).
      2. If pullResult is an abrupt completion, call this.[[error]](pullResult.[[value]]).
  4. If this.[[started]] is true,
    1. Let pullResult be the result of this.[[onPull]](this.[[push]], this.[[close]], this.[[error]]).
    2. If pullResult is an abrupt completion, call this.[[error]](pullResult.[[value]]).
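
To see how [[push]], [[close]], and read() move the stream between states, here is a deliberately simplified model. MiniReadable is a hypothetical class for illustration: it leaves out promises, pulling, and the "errored" state entirely.

```javascript
// Simplified model of the buffer and state transitions only.
class MiniReadable {
    constructor() {
        this.buffer = [];
        this.state = "waiting";
        this.draining = false;
    }

    push(data) {
        if (this.state === "waiting" || this.state === "readable") {
            this.buffer.push(data);
            this.state = "readable";
        }
        return false;
    }

    close() {
        if (this.state === "waiting") {
            this.state = "closed";
        } else if (this.state === "readable") {
            // Data is still buffered, so closing is deferred until it is read.
            this.draining = true;
        }
    }

    read() {
        if (this.state !== "readable") {
            throw new TypeError("nothing to read");
        }
        const data = this.buffer.shift();
        if (this.buffer.length === 0) {
            this.state = this.draining ? "closed" : "waiting";
        }
        return data;
    }
}

const s = new MiniReadable();
s.push("a");
s.push("b");
s.close();                // buffer is non-empty, so the stream only starts draining
const first = s.read();   // state stays "readable"
const second = s.read();  // buffer is empty and draining, so state becomes "closed"
```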

ReadableStream

class ReadableStream extends BaseReadableStream {
    // Adds a backpressure strategy argument.
    constructor({
        function start = () => {},
        function pull = () => {},
        function cancel = () => {},
        strategy: { function count, function needsMoreData }
    })

    // Overridden to do bookkeeping for the backpressure strategy
    any read()

    // Supports multi-pipe by default, overriding base behavior.
    WritableStream pipeTo(WritableStream dest, { ToBoolean close = true } = {})

    // Overridden to take into account the backpressure strategy.
    // You can also think of this as part of the constructor override, i.e. it passes
    //   in a different function to `start` and `pull`.
    [[push]](data)

    // Internal properties
    [[tee]]
    [[strategy]]
    [[bufferSize]] = 0
}

Properties of the ReadableStream Prototype

constructor({ start, pull, cancel, strategy })
  1. Set this.[[strategy]] to strategy.
  2. Call super({ start, pull, cancel }).
read()
  1. Let data be the result of calling BaseReadableStream's version of this.read().
  2. Subtract this.[[strategy]].count(data) from this.[[bufferSize]].
  3. Return data.
pipeTo(dest, { close })
  1. Let alreadyPiping be true.
  2. If this.[[tee]] is undefined, let this.[[tee]] be a new TeeStream and set alreadyPiping to false.
  3. Call this.[[tee]].addOut(dest, { close }).
  4. If alreadyPiping is false, call BaseReadableStream's version of this.pipeTo(this.[[tee]], { close: true }).
  5. Return dest.

Internal Methods of ReadableStream

[[push]](data)
  1. Call BaseReadableStream's version of this.[[push]](data).
  2. If this.[[state]] is now "readable",
    1. Add this.[[strategy]].count(data) to this.[[bufferSize]].
    2. Return this.[[strategy]].needsMoreData(this.[[bufferSize]]).
  3. Return false.
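
The bookkeeping split between read() and [[push]] can be sketched in isolation. The makeAccounting helper below is hypothetical and models only the [[bufferSize]] arithmetic, with an inline strategy object standing in for whatever the constructor was given.

```javascript
// Tracks buffered size the way ReadableStream's read() and [[push]] do.
function makeAccounting(strategy) {
    let bufferSize = 0;
    return {
        // Mirrors [[push]]: count the chunk, then ask the strategy for more or not.
        onPush(data) {
            bufferSize += strategy.count(data);
            return strategy.needsMoreData(bufferSize);
        },
        // Mirrors read(): the chunk no longer occupies the buffer.
        onRead(data) {
            bufferSize -= strategy.count(data);
        },
        get bufferSize() { return bufferSize; }
    };
}

const strategy = {
    count: chunk => chunk.length,
    needsMoreData: size => size < 8
};

const acct = makeAccounting(strategy);
const wantsMoreAfterFirst = acct.onPush("abcd");   // 4 < 8
const wantsMoreAfterSecond = acct.onPush("efgh");  // 8 < 8 is false
acct.onRead("abcd");                                // back down to 4
```

The value returned from onPush corresponds to the return value of push that an underlying source would use as its backpressure signal.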

Writable Stream APIs

BaseWritableStream

class BaseWritableStream {
    constructor({
        function start = () => {},
        function write = () => {},
        function close = () => {},
        function abort = close
    })

    // Writing data to the underlying sink
    Promise<undefined> write(any data)
    Promise<undefined> wait()
    get WritableStreamState state

    // Close off the underlying sink gracefully; we are done.
    Promise<undefined> close()

    // Close off the underlying sink forcefully; everything written so far is suspect.
    Promise<undefined> abort(any reason)

    // Useful helpers
    get Promise<undefined> closed

    // Internal methods
    [[error]](any e)
    [[advanceBuffer]]()
    [[doClose]]()
    [[doNextWrite]]({ type, promise, data })

    // Internal properties
    Array [[buffer]] = []
    string [[state]] = "waiting"
    any [[storedError]]
    Promise<undefined> [[currentWritePromise]]
    Promise<undefined> [[writablePromise]]
    Promise<undefined> [[closedPromise]]
    function [[onWrite]]
    function [[onClose]]
    function [[onAbort]]
}

enum WritableStreamState {
    "writable" // the sink is ready and the buffer is not yet full; write at will
    "waiting"  // the sink is not ready or the buffer is full; you should call wait
    "closing"  // the sink is being closed; no more writing
    "closed"   // the sink has been closed
    "errored"  // the sink errored so the stream is now dead
}

Properties of the BaseWritableStream prototype

constructor({ start, write, close, abort })

The constructor is passed several functions, all optional:

  • start() is called when the writable stream is created, and should open the underlying writable sink. If this process is asynchronous, it can return a promise to signal success or failure.
  • write(data, done, error) should write data to the underlying sink. It can call its done or error parameters, either synchronously or asynchronously, to respectively signal that the underlying resource is ready for more data or that an error occurred writing. The stream implementation guarantees that this function will be called only after previous writes have succeeded (i.e. called their done parameter), and never after close or abort is called.
  • close() should close the underlying sink. If this process is asynchronous, it can return a promise to signal success or failure. The stream implementation guarantees that this function will be called only after all queued-up writes have succeeded.
  • abort() is an abrupt close, signaling that all data written so far is suspect. It should clean up underlying resources, much like close, but perhaps with some custom handling. Unlike close, abort will be called even if writes are queued up, throwing away that data.

In reaction to calls to the stream's .write() method, the write constructor option is given data from the internal buffer, along with the means to signal that the data has been successfully or unsuccessfully written.

  1. If IsCallable(start) is false, throw a TypeError exception.
  2. If IsCallable(write) is false, throw a TypeError exception.
  3. If IsCallable(close) is false, throw a TypeError exception.
  4. If IsCallable(abort) is false, throw a TypeError exception.
  5. Set this.[[onWrite]] to write.
  6. Set this.[[onClose]] to close.
  7. Set this.[[onAbort]] to abort.
  8. Let this.[[writablePromise]] be a newly-created pending promise.
  9. Let this.[[closedPromise]] be a newly-created pending promise.
  10. Call start() and let startedPromise be the result of casting the return value to a promise.
  11. When/if startedPromise is fulfilled, call this.[[advanceBuffer]]().
  12. When/if startedPromise is rejected with reason r, call this.[[error]](r).
get closed
  1. Return this.[[closedPromise]].
get state
  1. Return this.[[state]].
write(data)
  1. If this.[[state]] is "writable",
    1. Set this.[[state]] to "waiting".
    2. Set this.[[writablePromise]] to be a newly-created pending promise.
    3. Let promise be a newly-created pending promise.
    4. Call this.[[doNextWrite]]({ type: "data", promise, data }).
    5. Return promise.
  2. If this.[[state]] is "waiting",
    1. Let promise be a newly-created pending promise.
    2. Push { type: "data", promise, data } onto this.[[buffer]].
    3. Return promise.
  3. If this.[[state]] is "closing" or "closed",
    1. Return a promise rejected with a TypeError exception.
  4. If this.[[state]] is "errored",
    1. Return a promise rejected with this.[[storedError]].
close()
  1. If this.[[state]] is "writable",
    1. Set this.[[state]] to "closing".
    2. Call this.[[doClose]]().
    3. Return this.[[closedPromise]].
  2. If this.[[state]] is "waiting",
    1. Set this.[[state]] to "closing".
    2. Push { type: "close", promise: undefined, data: undefined } onto this.[[buffer]].
    3. Return this.[[closedPromise]].
  3. If this.[[state]] is "closing" or "closed",
    1. Return a promise rejected with a TypeError exception.
  4. If this.[[state]] is "errored",
    1. Return a promise rejected with this.[[storedError]].
abort(reason)
  1. If this.[[state]] is "closed", return a new promise resolved with undefined.
  2. If this.[[state]] is "errored", return a new promise rejected with this.[[storedError]].
  3. Call this.[[error]](reason).
  4. Return the result of promise-calling this.[[onAbort]]().
wait()
  1. Return this.[[writablePromise]].

Internal Methods of BaseWritableStream

[[error]](e)
  1. If this.[[state]] is "closed" or "errored", return.
  2. For each entry { type, promise, data } in this.[[buffer]], reject promise with e.
  3. Clear this.[[buffer]].
  4. Set this.[[state]] to "errored".
  5. Set this.[[storedError]] to e.
  6. Reject this.[[writablePromise]] with e.
  7. Reject this.[[closedPromise]] with e.
[[advanceBuffer]]()
  1. If this.[[buffer]] is not empty,
    1. Shift entry off of this.[[buffer]].
    2. Call this.[[doNextWrite]](entry).
  2. If this.[[buffer]] is empty,
    1. Set this.[[state]] to "writable".
    2. Resolve this.[[writablePromise]] with undefined.
[[doClose]]()
  1. Reject this.[[writablePromise]] with a TypeError exception.
  2. Let closePromise be the result of promise-calling this.[[onClose]]().
  3. Upon fulfillment of closePromise,
    1. Set this.[[state]] to "closed".
    2. Resolve this.[[closedPromise]] with undefined.
  4. Upon rejection of closePromise with reason r,
    1. Call this.[[error]](r).
[[doNextWrite]]({ type, promise, data })
  1. If type is "close",
    1. Assert: this.[[state]] is "closing".
    2. Call this.[[doClose]]().
    3. Return.
  2. Assert: type is "data".
  3. Set this.[[currentWritePromise]] to promise.
  4. Let signalDone be a new function of zero arguments, closing over this and promise, that performs the following steps:
    1. If this.[[currentWritePromise]] is not promise, return.
    2. Set this.[[currentWritePromise]] to undefined.
    3. If this.[[state]] is "waiting",
      1. Resolve promise with undefined.
      2. Call this.[[advanceBuffer]]().
    4. If this.[[state]] is "closing",
      1. Resolve promise with undefined.
      2. If this.[[buffer]] is not empty,
        1. Shift entry off of this.[[buffer]].
        2. Call this.[[doNextWrite]](entry).
  5. Call this.[[onWrite]](data, signalDone, this.[[error]]).
  6. If the call throws an exception e, call this.[[error]](e).

Note: if the constructor's write option calls done more than once, or after calling error, or after the stream has been aborted, then signalDone ends up doing nothing.
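
The guard described in the note can be sketched with a small model. MiniWritable is a hypothetical class that assumes start has already completed. It has no promises, close, abort, or error path, and it keeps the state at "waiting" while any write is in flight, which is a simplification rather than a transcription of [[advanceBuffer]].

```javascript
// Simplified model of write queueing and the signalDone guard.
class MiniWritable {
    constructor(onWrite) {
        this.onWrite = onWrite;
        this.buffer = [];
        this.state = "writable";
        this.current = undefined; // token identifying the write in flight
    }

    write(data) {
        if (this.state === "writable") {
            this.state = "waiting";
            this.doNextWrite(data);
        } else if (this.state === "waiting") {
            this.buffer.push(data);
        }
    }

    doNextWrite(data) {
        const token = {};
        this.current = token;
        const signalDone = () => {
            if (this.current !== token) return; // stale or repeated call: do nothing
            this.current = undefined;
            if (this.buffer.length > 0) {
                this.doNextWrite(this.buffer.shift());
            } else {
                this.state = "writable";
            }
        };
        this.onWrite(data, signalDone);
    }
}

const sunk = [];
const pendingDone = [];
const w = new MiniWritable((data, done) => {
    sunk.push(data);
    pendingDone.push(done);
});

w.write("a");      // goes straight to the sink
w.write("b");      // queued: the sink has not signaled done for "a"
pendingDone[0]();  // "a" finished, so "b" is handed to the sink
pendingDone[0]();  // repeated call is ignored
```

The token plays the role of [[currentWritePromise]]: a done callback only has an effect while the write it belongs to is still the current one.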

WritableStream

class WritableStream extends BaseWritableStream {
    // Adds a backpressure strategy argument.
    constructor({
        function start = () => {},
        function write = () => {},
        function close = () => {},
        function abort = close,
        strategy: { function count, function needsMoreData }
    })

    // Overridden to take into account backpressure strategy
    Promise<undefined> write(data)

    // Overridden to take into account backpressure strategy.
    // You can also think of this as part of the constructor and write override.
    [[doNextWrite]]({ type, promise, data })

    // Internal properties
    [[strategy]]
    [[bufferSize]] = 0
}

Properties of the WritableStream Prototype

constructor({ start, write, close, abort, strategy })
  1. Set this.[[strategy]] to strategy.
  2. Call super({ start, write, close, abort }).
write(data)
  1. If this.[[state]] is "writable" or "waiting",
    1. Add this.[[strategy]].count(data) to this.[[bufferSize]].
  2. If this.[[state]] is "writable",
    1. Let promise be a newly-created pending promise.
    2. If ToBoolean(this.[[strategy]].needsMoreData(this.[[bufferSize]])) is false,
      1. Set this.[[state]] to "waiting".
      2. Set this.[[writablePromise]] to be a newly-created pending promise.
    3. If this.[[buffer]] is empty,
      1. Call this.[[doNextWrite]]({ type: "data", promise, data }).
    4. Otherwise,
      1. Push { type: "data", promise, data } onto this.[[buffer]].
    5. Return promise.
  3. Return the result of calling BaseWritableStream's version of this.write(data).
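
The effect of the strategy on a "writable" stream can be shown with a small pure function. This is a sketch of steps 1 and 2.2 only; afterWrite and the perObject strategy are hypothetical names, and queueing and promises are left out.

```javascript
// Returns the [[bufferSize]] and [[state]] that result from one write()
// call on a stream that is currently "writable".
function afterWrite(strategy, bufferSize, data) {
    const newSize = bufferSize + strategy.count(data);
    // A falsy answer from needsMoreData flips the stream to "waiting".
    const state = strategy.needsMoreData(newSize) ? "writable" : "waiting";
    return { bufferSize: newSize, state };
}

const perObject = { count: () => 1, needsMoreData: size => size < 2 };

const one = afterWrite(perObject, 0, { id: 1 });              // size 1, still writable
const two = afterWrite(perObject, one.bufferSize, { id: 2 }); // size 2, now waiting
```

Unlike BaseWritableStream, which goes to "waiting" on every write, the stream stays "writable" until the strategy says the buffer is full.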

Internal Methods of WritableStream

[[doNextWrite]]({ type, promise, data })
  1. Subtract this.[[strategy]].count(data) from this.[[bufferSize]].
  2. Return the result of calling BaseWritableStream's version of this.[[doNextWrite]]({ type, promise, data }).

Helper APIs

TeeStream

A "tee stream" is a writable stream which, when written to, itself writes to multiple destinations. It aggregates backpressure and abort signals from those destinations, propagating the appropriate aggregate signals backward.

class TeeStream extends BaseWritableStream {
    constructor() {
        this.[[outputs]] = [];

        super({
            write(data) {
                return Promise.all(this.[[outputs]].map(o => o.dest.write(data)));
            },
            close() {
                const outputsToClose = this.[[outputs]].filter(o => o.close);
                return Promise.all(outputsToClose.map(o => o.dest.close()));
            },
            abort(reason) {
                return Promise.all(this.[[outputs]].map(o => o.dest.abort(reason)));
            }
        });
    }

    addOut(dest, { close = true } = {}) {
        this.[[outputs]].push({ dest, close });
    }
}
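
The fan-out at the heart of the tee can be tried out without the stream classes. In this sketch, fanOutWrite, fanOutClose, and makeRecorder are hypothetical helpers; the recorder stands in for a writable destination and simply logs the calls it receives.

```javascript
// Writes one chunk to every output; fulfills once every write has fulfilled.
function fanOutWrite(outputs, data) {
    return Promise.all(outputs.map(o => o.dest.write(data)));
}

// Closes only the outputs that were added with close: true.
function fanOutClose(outputs) {
    return Promise.all(outputs.filter(o => o.close).map(o => o.dest.close()));
}

// Hypothetical destination stub that records the calls it receives.
function makeRecorder() {
    const calls = [];
    return {
        calls,
        write(data) { calls.push(["write", data]); return Promise.resolve(); },
        close() { calls.push(["close"]); return Promise.resolve(); }
    };
}

const a = makeRecorder();
const b = makeRecorder();
const outputs = [{ dest: a, close: true }, { dest: b, close: false }];

fanOutWrite(outputs, "chunk");
fanOutClose(outputs);
```

Because Promise.all rejects as soon as any input promise rejects, a failure in one destination surfaces as a failure of the aggregate write.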

LengthBufferingStrategy

A common buffering strategy when dealing with binary or string data is to wait until the accumulated length of the incoming data reaches a specified highWaterMark. As such, this is provided as a built-in helper along with the stream APIs.

class LengthBufferingStrategy {
    constructor({ highWaterMark }) {
        this.highWaterMark = Number(highWaterMark);

        if (Number.isNaN(this.highWaterMark) || this.highWaterMark < 0) {
            throw new RangeError("highWaterMark must be a nonnegative number.");
        }
    }

    count(chunk) {
        return chunk.length;
    }

    needsMoreData(bufferSize) {
        return bufferSize < this.highWaterMark;
    }
}
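
A short usage sketch follows. The class is repeated so the snippet runs on its own, and the running bufferSize variable stands in for the stream's [[bufferSize]]; the chunk values are arbitrary.

```javascript
class LengthBufferingStrategy {
    constructor({ highWaterMark }) {
        this.highWaterMark = Number(highWaterMark);

        if (Number.isNaN(this.highWaterMark) || this.highWaterMark < 0) {
            throw new RangeError("highWaterMark must be a nonnegative number.");
        }
    }

    count(chunk) {
        return chunk.length;
    }

    needsMoreData(bufferSize) {
        return bufferSize < this.highWaterMark;
    }
}

const strategy = new LengthBufferingStrategy({ highWaterMark: 10 });

let bufferSize = 0;
bufferSize += strategy.count("hello");                   // 5 characters buffered
const afterFive = strategy.needsMoreData(bufferSize);    // below the mark
bufferSize += strategy.count("world!");                  // 11 characters buffered
const afterEleven = strategy.needsMoreData(bufferSize);  // at or past the mark

// Invalid marks are rejected up front.
let rejected = false;
try {
    new LengthBufferingStrategy({ highWaterMark: -1 });
} catch (e) {
    rejected = e instanceof RangeError;
}
```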

CountBufferingStrategy

A common buffering strategy when dealing with object streams is to simply count the number of objects that have been accumulated so far, waiting until this number reaches a specified highWaterMark. As such, this strategy is also provided as a built-in helper.

class CountBufferingStrategy {
    constructor({ highWaterMark }) {
        this.highWaterMark = Number(highWaterMark);

        if (Number.isNaN(this.highWaterMark) || this.highWaterMark < 0) {
            throw new RangeError("highWaterMark must be a nonnegative number.");
        }
    }

    count(chunk) {
        return 1;
    }

    needsMoreData(bufferSize) {
        return bufferSize < this.highWaterMark;
    }
}
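
Since a strategy is just an object with count and needsMoreData, custom strategies follow the same shape. As a sketch, the hypothetical ByteLengthBufferingStrategy below (not part of this proposal) sizes chunks by their byteLength, which would suit ArrayBuffer or typed array chunks.

```javascript
// Hypothetical strategy, shown only to illustrate the interface.
class ByteLengthBufferingStrategy {
    constructor({ highWaterMark }) {
        this.highWaterMark = Number(highWaterMark);

        if (Number.isNaN(this.highWaterMark) || this.highWaterMark < 0) {
            throw new RangeError("highWaterMark must be a nonnegative number.");
        }
    }

    count(chunk) {
        // Both ArrayBuffer and typed arrays expose byteLength.
        return chunk.byteLength;
    }

    needsMoreData(bufferSize) {
        return bufferSize < this.highWaterMark;
    }
}

const byteStrategy = new ByteLengthBufferingStrategy({ highWaterMark: 16 });
const buffered =
    byteStrategy.count(new Uint8Array(8)) + byteStrategy.count(new ArrayBuffer(8));
const wantsMore = byteStrategy.needsMoreData(buffered); // 16 < 16 is false
```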
