Coder Social home page Coder Social logo

js-csp's Introduction

Build Status codecov Dependency Status devDependency Status

js-csp

Communicating sequential processes for Javascript (like Clojurescript core.async, or Go).

Maintainer wanted

This project is at maintenance mode at the moment, and actively looking for new maintainers. Please send us an issue via github if you are interested.

Current maintainer: hung-phan, gt3

Examples

const csp = require('js-csp');

Pingpong (ported from Go).

function* player(name, table) {
  while (true) {
    let ball = yield csp.take(table);

    if (ball === csp.CLOSED) {
      console.log(name + ": table's gone");
      return;
    }

    ball.hits += 1;
    console.log(`${name} ${ball.hits}`);

    yield csp.timeout(100);
    yield csp.put(table, ball);
  }
}

csp.go(function* () {
  const table = csp.chan();

  csp.go(player, ["ping", table]);
  csp.go(player, ["pong", table]);

  yield csp.put(table, {hits: 0});
  yield csp.timeout(1000);

  table.close();
});

There are more under examples directory.

Documentation

This is a very close port of Clojurescript's core.async. The most significant difference is that the IOC logic is encapsulated using generators (yield) instead of macros. Therefore resources on core.async or Go channels are also helpful.

Other

Or, if you use Python's Twisted: https://github.com/ubolonton/twisted-csp

Or, if you use Clojure: https://github.com/clojure/core.async

Install

npm install js-csp
bower install js-csp

Contribution

Feel free to open issues for questions/discussions, or create pull requests for improvement.

Some areas that need attention:

  • More documentation, examples, and maybe some visualization. Porting RxJS/Bacon examples may help.
  • Multiplexing, mixing, publishing/subscribing. These need to be tested more. The API could also be improved.
  • Deadlock detection.

Development

These commands are supposed to run separately

$ npm run test:watch
$ npm run lint # for code quality checking
$ npm run flow:watch # to stop server after you are done run npm run flow:stop

Production

$ npm run build

It will transpile all the codes in src to lib, or even better if you use webpack 2 to consume the lib via "module": "./src/csp.js".

Inspiration

License

Distributed under MIT License.

js-csp's People

Contributors

amilajack avatar fearphage avatar grossbart avatar hung-phan avatar jasonkuhrt avatar jimt avatar jlongster avatar jmarca avatar lm1 avatar loadedsith avatar lukasmlady avatar nmn avatar takashi avatar tiye avatar ubolonton avatar yrns avatar zeroware avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

js-csp's Issues

Synchronous handling of data when possible?

It looks like any data passed over a channel is always asynchronous, even when a receiver is already waiting. Is there a possibility that when a take is already waiting for a value, the put should fire synchronously?

This was a major hurdle in my flux implementation called Flexy. Using it I have some logic that fires an action to fetch data if such data doesn't exist in the store. However, if there are many such components, all of them fire the action before a single action is actually handled by the dispatcher. This is because, the action doesn't fire till next tick of the event loop.

I understand that always being async can help with the Zalgo issue, but synchronous dispatch when possible is a useful feature to have as well. Any thoughts?

multiple versions (instances?) of js-csp breaks implicit take

in a project with different modules requiring js-csp, implicit take breaks.

var c = someotherChan;  // assume this came from a library or something 
go(function*(){
  yield put(c, 123);
})
go(function*{
  console.log(yield c);
})

// expected - 123
// actual - the channel c itself 

Can js-csp be used in combination with other generator libraries?

This is really just a question (it might or might not really be an "issue"):

I was writing some code to walk a directory tree and upload the files to amazon S3.

And I fairly quickly got some code working that I liked to walk the directory tree, using generators and TJ Hallowaychuk's "co" library.

But to limit the number of files that could be simultaneously loaded to S3 at any one time (so as not to swamp the network), I thought js-csp might help me, where I was thinking my directory tree walking code would "put" to a channel containing the path of each file to be uploaded, and a fixed number of go routines could read from that channel and do the uploading. i.e. The number of go routines would fix the number of simultaneous uploads (in the pattern I believe is called a "fan out" - one guy pushing several guys taking).

Anyway my real question was that when I reached the point of wanting to connect the directory tree walking code on the one side with the uploading "generator" code I realized - "whoops this co library requires that I "yield" within a generator function wrapped in "co", and your js-csp library requires that I yield within a generator function wrapped in "go", and I'm not sure how to make them play well together?

Is it possible to use js-csp with other such generator libraries?

Are there any examples?

And to clarify: I tried to get around the above using putAsync from the directory tree walking side, but I came to realize those puts were never blocking. But I'm uploading 1.4TB(!) worth of files to S3 so I really want that to be the simply unbuffered blocking put.

i.e. I wanted the rate at which the uploading go routines do "take" to act like back-pressure and limit the rate at which the tree walking code progressed through the files. But it wasn't it was doing the puts without blocking and even dropping file names - unless I put an unacceptably huge buffer size on that channel.

(maybe the related question I didn't think to ask is: could I have used js-csp for everything i.e. also for the directory tree walking code so I'm doing the "puts" from a go routine? But where that go routine doing the tree walking can be using "yield" style not callbacks or promises? i.e. This code is in fact a rewrite of some older code written in callback style, but I'd like to use the generator "blocking" style everywhere. Not just with the js-csp channels but also with my file I/O too).

Thanks,

Darren

Scoped cancellable goroutines, try-catch-finally proposal

I'd like to propose some related features/improvements to js-csp that touches on pre-existing discussions on promise interactions (#13) and error handling (#14). These features are:

  1. Consider scoping the lifetime of goroutines to parent goroutines when yield go() is used
  2. Expose a .kill() or .terminate() fn on the return value of go() to make goroutines externally cancellable
  3. yielding a promise blocks on that promise, and leverages try-catch-finally semantics

Scoping goroutine lifetimes to parents

In node-csp (a different CSP implementation than js-csp), if a goroutine internally does a yield go(fn *), this creates a subprocess that terminates when the parent process terminates. This seems like a convenient feature, with afaik the only alternative in js-csp (and the pattern established in Go lang) to pass in a channel to the inner goroutine that signals when to terminate, which must then presumably be used in a prioritized select, etc etc. And in node-csp if you don't want to scope the lifetime of inner goroutines to a parent goroutine, you simply omit the yield. Would this possibly be a welcome improvement to js-csp, and if not, what are the conceptual arguments against it?

Externally kill-able routines

It would be nice to have a direct way of externally terminating a goroutine. Right now in js-csp, go() returns a channel that emits the value returned by the goroutine, which you can close, but it won't actually stop the goroutine. If we had this, we'd have a pretty nice pattern for cancellable tasks that don't have to constantly check at every step of the way (via selects() or checking a flag or whatever the approach) whether they should keep going. For example:

// pretend `go` returns a Process handle instead of a channel
let proc = go(function * () {
  setState({ word: "Connecting in a moment..." });

  yield csp.timeout(500);  

  let ws;
  try {
    ws = new WebSocket("http://wat.com");
    setState({ word: "Connecting now..." });

    yield makeOnConnectChannel(ws);

    setState({ word: "Connected!" });

    let chan = makeMessageChannel(ws);
    while (true) {
      let value = yield chan;
      setState({ word: `Got message: ${value}` });
    }
  } finally {
    ws.close();
    setState({ word: "Disconnected!" });
    // ...and any other cleanup
    // this block would be invoked upon termination
    // (a lesser known fact about generators is that calling .throw/.return
    // on them will still run finally blocks and even let you yield additional values before closing)
  }
});

// later, after user presses a button, or leaves a route
proc.kill();

While I understand the value of keeping the primitives/rules simple for managing processes (i.e. use channels for everything), the nice thing about the above is:

  1. It has all the niceties of the proposed async/await syntax, but with the ability to cancel (which promises won't be able to do for quite a long time)
  2. You avoid having to litter your generator function with all sorts of "should i keep going?" checks
  3. try/catch/finally can idiomatically be used for cleanup

Block on yielded promises

This pattern also touches upon some of the discussion in #13; here's an example that uses try-catch-finally under the assumption that yielding promises, blocks on them:

go(function * () {
  let ajaxRequest = ajax("/wat.json");
  try {
    let value = yield ajaxRequest.promise("/wat.json");
    setState({ message: `got value: ${value}` });
  } catch(e) { 
    setState({ message: "an error occurred" });
  } finally {
    ajaxRequest.close();
  }
});

If the ajax promise rejects, then it'll run the catch and finally handlers for error handling and cleanup. If the process is externally cancelled before the ajax returns, then the finally block closes the ajax request (again, something you can't do with promises).

Conclusion

I've implemented the above semantics in my app (with my a own custom and much less feature completion impl of CSP) and it's greatly cleaned up the logic in a complex asynchronous loop, all whilst avoiding the grunt work / boilerplate of checking cancellation channels whilst leveraging try-catch-finally for error handling and cleanup. So far, it seems like a very clean and powerful pattern to me, so I'd love to find out if there are any compelling reasons not to adopt something like this in js-csp.

Easing promise interactions

I don't feel ready to implement anything yet, but I wanted to file an issue to talk about it. Promises are certainly everywhere in JS, and I think we need to do something to make it easy for a new user to interact with them. When I imagine trying to sell this project to my team, it certainly makes me aware that it would be hard without some support for promises.

The simplest solution I can think of is a new operation takePromise (or called something else) that accepts a promise and blocks until it's resolved. I'm hesitant to make it so that you can just yield a promise and have it do the same thing, but we could do that. That would certainly be clean. However, it might be confusing because you no longer know when you are working with a channel or a promise.

We will need to figure out our error handling story with this too. In fact we should probably figure that out first. I'll file a new bug.

callback interop

Thank you for writing this. This finally looks like a proper channel implementation. I've seen a few others but they go off on some weird tangents, and I've been looking for just a simple port of Clojure's core.async.

The one thing I don't see anywhere is interop with callbacks/promises. Do you have any code for that? It would be good to be able to invoke a method that takes a callback and automatically have it transfer the result and/or error over a channel. Same thing with promises.

putAsync and "unclosed" channel

I'm not sure what the behavior of this program should be :

     function findAll() {
        var res = csp.chan();

        setTimeout(function () {
            csp.putAsync(res, 1);
            setTimeout(function () {
                csp.putAsync(res, 2);
                // Note that we *don't* call res.close(); here
            }, 500);
        }, 500);

        return res;
    }

    csp.go(function * () {

        var what = true;
        var findAllChan = findAll();
        while (what) {
            what = yield csp.take(findAllChan);
            console.log("Found", what);
        }
        console.log("Nothing else to find");
    });

In practice, I get :

Found 1
Found 2

If I close the channel inside of findAll, I get, as expected :

Found 1
Found 2
Found null
Nothing else to find

Why does the first version stop ? Is there an implicit "timeout" on taking from a channel ?

Is CSP the right or a possible solution?

I was reading the wonderful "Taming the Asynchronous Beast with CSP Channels in JavaScript" article again and got thinking that there might be more use for CSP for me besides how it can be used for "Taming User Interfaces".

After the re-read I started thinking about using CSP for aligning data to a grid or producing the points of the grid. For an example lets consider a music score and the alignment of the scores data to represent the notes vertically aligned. Could CSP be used to rhythmically space, align, condense the note durations with in the score's data? Below is what that looks like, where the top line is a condensation of all the rhythm points on the tracks in the score's data:

condensed score

Could CSP step through a JSON object that is constantly changing/being edited and keep a x-axis array of column points up to date? I see a possible function like a structured iterator that communicates as it iterates over the concurrent tracks, which contain sequences of spans.The essential iterable JSON data is the score file, with the following basic structure:

{
track : {
span : {
note : {
duration : value
}
...
}
...
}
...
}

The score contains any number of tracks and they contain a sequence of different types of spans, where one type has notes. A note has an attribute that specifies its duration, which can be any of the values like whole, half, quarter, 8th, 16th, 32nd, etc. The CSP function would then iterate over all tracks in a score concurrently and create a column entry at each point where one or more notes across tracks align. This would then result in a x-axis spacing table of column entries, which align notes across the whole score.

So far I'm thinking that each track would have its own CSP process and a channel would be used to tell the track processes to end one alignment calculation and start on the next one? The apps edit tools add/remove, parts and spans containing notes dynamically, so in this sense the score is never complete while it is being edited. This is why I was thinking that CSP processes would also need to be added/removed in the function so that it matches the score's changing track structure.

Also, if new notes are created on a given track at a location that is a ways into the score (from the origin) the function should only re-render alignment from that point forward. It is the functions's task to constantly and dynamically maintain an alignment grid that reflects the entities in the score.

What do you think? Is CSP useful or is there no advantage for this scenario?

Expose the Channel class

When I use channels in my code i'd like to be able to add JSDoc when a function return or expect a Channel.

Exposing the channel class can solve this. Or maybe there's another way ?

why is csp.CLOSED === null?

Hello!

I was wondering: why is csp.CLOSED currently equal to null? Wouldn't it make more sense to assign it an arbitrary object for reference equality (or possibly a Symbol in compatible environments) so "null" can be a usable value to send through a channel? I imagine this was your intention from the beginning (otherwise there'd be little point in giving CLOSED its own property assignment in the first place), just wondering if there's a reason it isn't already in place. I believe highland uses an empty object for the same purpose (_.nil) to allow streaming of null values.

Channel stops accepting puts

Hi,

I'm running into an obscure issue that I can't figure out. So I set up a inbound channel that takes HTTP requests coming from a client. Then I take those requests off the channel, process them and put them back onto an outbound channel. The server then takes the modified requests off the outbound channel and replies back to the client. When I send 2 or more concurrent requests to the server, the outbound channel becomes corrupt after 2 puts.

This is what the outbound channel looks like BEFORE any requests are sent:
utbound
{ buf:
{ buf: { length: 0, array: [Object], head: 0, tail: 0 },
n: 100 },
xform:
{ '@@transducer/step': [Function],
'@@transducer/result': [Function] },
takes:
{ length: 1,
array: [ [Object], , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , ],
head: 1,
tail: 0 },
puts:
{ length: 0,
array: [ , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , ],
head: 0,
tail: 0 },
dirty_takes: 1,
dirty_puts: 0,
closed: false }

And this is what it looks like AFTER 2 concurrent requests:
outbound
{ buf:
{ buf: { length: 0, array: [Object], head: 2, tail: 2 },
n: 100 },
xform:
{ '@@transducer/step': [Function],
'@@transducer/result': [Function] },
takes:
{ length: 0,
array: [ null, null, , , , , , , , , , , , , , , , , , , , , , , , , , , , , , ],
head: 2,
tail: 2 },
puts:
{ length: 0,
array: [ , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , , ],
head: 0,
tail: 0 },
dirty_takes: 2,
dirty_puts: 0,
closed: false }

The only difference I can tell is that when this issue happens the "takes" array is missing the object at the end of it. If I issue requests manually the array grows as I expect and that object remains at the end of the array.

My code is in this gist: https://gist.github.com/rhasson/0460a2b261cf5422ec8f

The way to reproduce this problem is to run Apache Bench with concurrency of 2 or more, for example:

ab -n 4 -c 2 -p incomingcall.json -T 'application/json' -m POST http://localhost:9000/actions

Thanks,
Roy

transducers.drop(n) not working as expected

import csp, {go, chan} from 'js-csp';
let {fromColl, into, pipeline} = csp.operations;
import xd from 'transducers.js';

// the regular transducer works fine
console.log(xd.seq([1, 2, 3, 4, 5], xd.drop(2)))
// outputs [3, 4, 5]

// but not when used with a channel
go(function*(){
  var c = chan();
  pipeline(c, xd.drop(2), fromColl([1, 2, 3, 4, 5]));
  var res = yield into([], c);  
  console.log(res);
  // expected - [3, 4, 5]
  // actual - []
})

Is it possible to "untake" a channel?

Say I did something like this:

let value = yield take( ch );

But later I decided to cancel the take and let other process to handle it, is there any way to do that? Thanks.

Redis, RabbitMQ, Kafka adapters

Channels are an amazingly versatile transport mechanism to pass values between go blocks. In clojure, and go, they are also used to pass values between threads.

Since javascript has no threads, channels can and should be used to pass values between node processes. Since processes don't have any shared memory, this needs to be done using something like Redis to manage the common state.

Building a simple API to build various adapters for channels would make it an invaluable tool for multi-process management in Node.

Additionally, the same API should work for communication between master process and child process. And between the main process and web workers on the client-side.

Please let me know, how I can help with this.

putAsync usage

Hi

I'm trying to use putAsync to wrap an asynchronous ("callback based") function, and return a channel to give the result.

Following what I read here, http://jlongster.com/Taming-the-Asynchronous-Beast-with-CSP-in-JavaScript, I tried using putAsync this way :

module.exports = function (csp) {

    function findFoo() {
        var fooChan = csp.chan();
        setTimeout(function () {
            csp.putAsync(fooChan, "FOO");
        }, 200);
        return fooChan;
    }

    csp.go(function* () {
        var foo = yield csp.take(findFoo());
        console.log("Got", foo);
    });

};

This fails, with an error like this :

/home/phtrivier/prj/platform/git/tmp/es6-test/build/es5/csp.bundled.node.js:1925
    callback(result.value);
    ^
TypeError: undefined is not a function
    at Object.put_then_callback [as putAsync] (/home/phtrivier/prj/platform/git/tmp/es6-test/build/es5/csp.bundled.node.js:1925:5)
    at null._onTimeout (/home/phtrivier/prj/platform/git/tmp/es6-test/build/es5/req.js:17:17)
    at Timer.listOnTimeout [as ontimeout] (timers.js:112:15)

Note that I am in a "strange" setup ; I'm using an version of the nodejs bundle that I transpiled to ES5 using "regenerator".

Is my code supposed to work at all ? Or am I missing something ?

Thanks

onto(ch, iterable, keepOpen?)

It would be beneficial to rely on an interface instead of specific implementations

var source
var sink = chan()

// Immtable.js
source = Immutable.Range(1, Infinity)
onto(sink, source, true)

// ES6 Set / Map
source = new Set([1, 2, 3]) // or map.keys()
onto(sink, source, true)

// ES6 generator
function * id () {
  var i = 0
  while (true) {
    yield i++
  }
}
onto(sink, id(), true)

Angular JS grid is not reloading after i change the grid options in my script

Hi
I used the following script to hide particular column based on the drop down selection.
var pos = $scope.gridOptions.columnDefs.map(function (e) { return e.field; }).indexOf('Coordinator');
if ($scope.ddl == "All") {
$scope.gridOptions.columnDefs[pos].visible = true;
}
else {
$scope.gridOptions.columnDefs[pos].visible = false;
}
But the visible property changing to true but the column is not visible in the grid. How to achieve this please suggest me in this case.
Thanks in Advance.

Instance checking?

csp.chan() is not an instance of csp.chan; I don't see any functions such as csp.isChan either. How should I check whether a supplied value is a CSP channel? Same goes for other entities.

should we allow yielding a channel and `take` happen by default?

This is something I've thought of a few times. We could pretty easily make it so that you can yield a channel and a take happens by default:

var ch = chan();
go(function*() {
  console.log(yield ch);
});

The downside is that it's not quite as readable. But as long as we never support anything like yielding a promise I think it's ok. I hate the idea of not knowing what's being yielded (is it a channel? or a promise? or a stream?) so we would still make the user do something like yield takePromise(x) to interface with promises. Since we only work with channels though, this might work.

The benefit is that now you don't need sleep AND timeout, you just need timeout, and you don't have to think about the difference. And in general it removes a lot of take calls.

(This would only be necessary when not using sweet.js macros. I have a macro that adds very nice syntax for this, which I will show soon :) )

js-CSP compared with streams and events

@jlongster wrote a great article introducing js-CSP, and does a short comparison with promises. I think a more fair comparison would be to streams such as RxJS and Bacon.js.

This, issue will also play into Multiplexing, mixing, publishing/subscribing .

The major difference I see between channels and streams other than generator functions and API is:

One-To-Many Streams

Streams act like events and broadcast their values. As a result you can attach many listeners to the same stream of data. Channels, act as value bridges. One value put on one end, is only received by one receiver.

// stream is a stream of data.
var s1 = stream.map(fn1);
var s2 = stream.map(fn2);

// both s1, and s2, will be mapped from all the values fed into stream.
// The values are copied for every method called on stream.

However, the one-to-many data stream can be simulated with a helper function, and Dropping channels.

function toMany(ch){
  var channels = [];
  go(function*(){
    while(true){
      var val = yield take(ch);
      for(var i = 0; i < channels.length; i++){
        yield put(channels[i], val);
      }
    }
  });

  return function(){
    var ch = chan(buffers.dropping(0));
    channels.push(ch);
    return ch;
  }
}

Now, this can be used in exactly the same way that streams from RxJS and Bacon can be used.
(it would be better to use a set to hold channels rather than an array in an ES6 world)

// ch is a channel where values are being put
var subscribe = toMany(ch);
var ch1 = subscribe();
var ch2 = subscribe();
// ch1 and ch2 now both have all the values being put onto ch.
// using methods from transducers, and toMany downstream again, you can simulate all the behaviour of streams.

This, I think shows the power and flexibility of channels. RxJS and BaconJS are amazing for doing async programming in a pre-generator world. But Channels are a lower-level construct that can be used to be used in the same ways, but also in other ways.

With a few helper methods, channels can be used to achieve the same semantics of promises or of streams, or a one receiver semantic that is unique.

Note: this function like other functions, still needs work to account for closed channels.

One downside of the one-to-many semantic of streams is that it is hard to send a message upstream telling the origin to stop emitting data.

What does "alts" stand for?

What does the name of the alts function stand for? Where might the original definition be (in whatever language it happens to be in)?

Use js-csp with babel and browserify

Hi!

I don't know if this is an appropriate location for ask this kind of questions. Let me know if it is not.

I found that js-csp publish in npm directly the es6 source code (without the traspilation process), that is ok. But when I use it with browserify+babelify, the source code is just incliuded in my bundle and not preprocessed. This means that the result bundle I can use only with modern browsers...

I have tried https://github.com/babel/babelify#why-arent-files-in-node_modules-being-transformed but it does not works for me! I'm missing something? Thank you!

Transducer integration

The channel from @jlongster's blog implementation seems broken (in firefox and regenerator):

Full test:

var csp = require('js-csp');

function compose(f,g) {
    return function(x) {
        return f(g(x));
    }
}

function map(op) {
    return function(reduce) {
        return function(a,x) {
            return reduce(a, op(x));
        }
    }
}

function filter(pre) {
    return function(reduce) {
        return function(a,x) {
            return pre(x)? reduce(a,x): a;
        }
    }
}

var xform = compose(
    map(function(x) {
        return x * 2
    }),
    filter(function(x) {
        return x > 5
    })
);

var ch = csp.chan(1, xform);

csp.go(function*() {
    yield csp.put(ch, 1);
    yield csp.put(ch, 2);
    yield csp.put(ch, 3);
    yield csp.put(ch, 4);
});

csp.go(function*() {
    while(!ch.closed) {
        console.log(yield csp.take(ch));
    }
});

It outputs

2 6 8

It seems the "2" is not even processed by the xform.
Any idea what's going on?

Unable to require js-csp using babel

I'm on the latest version of babel (5.6.14) and js-csp (0.4.1). An error is thrown on the * generator symbol from line 68 in 'js-csp/src/csp.operations.js'. Any ideas why?

$ babel-node --version
5.6.14

$ babel-node

> var csp = require('js-csp');
/learn/flux-chat-example-rxjs/flux-chat/node_modules/js-csp/src/csp.operations.js:68
  go(function*() {
             ^
SyntaxError: Unexpected token *
    at exports.runInThisContext (vm.js:73:16)
    at Module._compile (module.js:443:25)
    at Module._extensions..js (module.js:478:10)
    at Object.require.extensions.(anonymous function) [as .js] (/usr/local/lib/node_modules/babel/node_modules/babel-core/lib/babel/api/register/node.js:177:7)
    at Module.load (module.js:355:32)
    at Function.Module._load (module.js:310:12)
    at Module.require (module.js:365:17)
    at require (module.js:384:17)
    at Object.<anonymous> (/Users/ddinh/workspace/learn/flux-chat-example-rxjs/flux-chat/node_modules/js-csp/src/csp.js:4:18)
    at Module._compile (module.js:460:26)
    at Module._extensions..js (module.js:478:10)
    at Object.require.extensions.(anonymous function) [as .js] (/usr/local/lib/node_modules/babel/node_modules/babel-core/lib/babel/api/register/node.js:177:7)
    at Module.load (module.js:355:32)
    at Function.Module._load (module.js:310:12)
    at Module.require (module.js:365:17)
    at require (module.js:384:17)
    at repl:3:11
    at Object.exports.runInThisContext (vm.js:74:17)
    at _eval (/usr/local/lib/node_modules/babel/bin/_babel-node:54:13)
    at REPLServer.replEval (/usr/local/lib/node_modules/babel/bin/_babel-node:137:14)
    at bound (domain.js:254:14)
    at REPLServer.runBound [as eval] (domain.js:267:12)
    at REPLServer.<anonymous> (repl.js:279:12)
    at REPLServer.emit (events.js:107:17)
    at REPLServer.Interface._onLine (readline.js:214:10)
    at REPLServer.Interface._line (readline.js:553:8)
    at REPLServer.Interface._ttyWrite (readline.js:830:14)
    at ReadStream.onkeypress (readline.js:109:10)
    at ReadStream.emit (events.js:110:17)
    at readline.js:1175:14
    at Array.forEach (native)
    at emitKeys (readline.js:993:10)
    at ReadStream.onData (readline.js:910:14)
    at ReadStream.emit (events.js:107:17)
    at readableAddChunk (_stream_readable.js:163:16)
    at ReadStream.Readable.push (_stream_readable.js:126:10)
    at TTY.onread (net.js:538:20)

consider name change?

I don't mind the current name, but it's very, very confusing in the JS world because everyone thinks of Content-Security Policy when they see CSP. This has come up countless times when talking to people. Although it doesn't take much to explain, I think it might help adoption if we change the name.

I don't know how tied you are to the current name, I know it's a pain to change it... but I think it's worth considering. Also in general there are a lot of half-baked channel/CSP libs on npm right now. I don't know if it's worth choosing something unique to stand out.

Not sure of any suggestions... I'll think about it.

node callback and event emitter utilities

While using the lib I found that at some point people will have to use node callback and event emitter based lib with channels.

Here is two utilities functions I ended up with :

/**
 * Create a channel from an async node function
 * Passed function must accept a node callback as last args
 *
 * @param {function} f
 * @return {Channel}
 */
function fromNodeCallback(f) {

    var args = 2 <= arguments.length ? Array.prototype.slice.call(arguments, 1) : [];
    var chan = csp.chan();

    args.push(function(err, value) {
        csp.putAsync(chan, err ? err : value, function() {
            chan.close();
        });
    });

    f.apply(this, args);

    return chan;
}


/**
 * Create a channel from an event emitter object
 *
 * @param {function} eventEmitter
 * @param {string} eventName
 * @param {Channel} [chan]
 *
 * @return {Channel}
 */
function fromEvent(eventEmitter, eventName, chan) {

    chan = chan || csp.chan();

    eventEmitter.on(eventName, function(data) {
        csp.putAsync(chan, data);
    });

    return chan;
}

both take and put will block until both sides are there to actually pass the value? There is a exception.

var csp = require('js-csp')

var ch = csp.chan()

csp.go(function*() {
    while (yield csp.put(ch, 1)) {
        console.log('process 1 start')
        yield csp.take(csp.timeout(250))
        console.log('process 1 end')
    }
})


csp.go(function*() {
    while (yield csp.put(ch, 2)) {
        console.log('process 2 start')
        yield csp.take(csp.timeout(300))
        console.log('process 2 end')
    }
})

csp.go(function*() {
    while (yield csp.put(ch, 3)) {
        console.log('process 3 start')
        yield csp.take(csp.timeout(1000))
        console.log('process 3 end')
    }
})

csp.go(function*() {
    for (var i = 0; i < 10; i++) {
        console.log('start')
        console.log(yield csp.take(ch))
        console.log('end')
    }
    ch.close()
})
start
1
end
start
process 1 start
2
end
start
process 2 start
3
end
start
process 3 start
process 1 end
process 1 start
1
end
start
process 2 end
process 2 start
2
end
start
process 1 end
process 1 start
1
end
start
process 2 end
process 2 start
2
end
start
process 1 end
process 1 start
1
end
start
process 2 end
process 2 start
2
end
start
process 3 end
process 3 start
3
end
process 1 end
process 2 end
process 3 end

you see process 3 start; process 1 end; process 1 start; 1, i think it should be process 3 start; process 1 end; 1; process 1 start;

Non-deterministic results in an attempt at implementing a competing consumer example.

#!/usr/bin/env node --harmony

var csp = require('../src/csp');

var workSize = 20;
var workers = 10;


function* worker(name, workChan, resultsChan) {
  var count = 0;
  while(true) {
    var element = yield csp.take(workChan);
    if (element === csp.CLOSED) {
      resultsChan.close();
      break
    }

    yield csp.put(resultsChan, {name: name, number: count++});
  }
}

var resultsChan = csp.chan();

var workChan = csp.operations.fromColl(new Array(workSize));

csp.go(function*() {
  var workerResultsChans = [];
  for (var i = 0; i < workers; i++) {
    var workerResultsChan = csp.chan();
    csp.go(worker, ["worker: " + i, workChan, workerResultsChan]);
    workerResultsChans.push(workerResultsChan);
  }

  var mergedWorkerResultsChan = csp.operations.merge(workerResultsChans);

  csp.operations.pipe(mergedWorkerResultsChan, resultsChan);
});

csp.go(function*() {
  var result = yield csp.operations.into([], resultsChan);
  console.log(result);
  console.log(result.length);
});

Example Output 1:

[ { name: 'worker: 0', number: 0 },
  { name: 'worker: 1', number: 0 },
  { name: 'worker: 2', number: 0 },
  { name: 'worker: 5', number: 0 },
  { name: 'worker: 1', number: 1 },
  undefined,
  { name: 'worker: 0', number: 1 },
  { name: 'worker: 9', number: 0 },
  { name: 'worker: 7', number: 0 },
  { name: 'worker: 6', number: 1 },
  { name: 'worker: 9', number: 1 },
  { name: 'worker: 7', number: 1 },
  undefined,
  { name: 'worker: 1', number: 2 },
  { name: 'worker: 3', number: 0 },
  undefined,
  undefined,
  undefined,
  undefined,
  { name: 'worker: 8', number: 0 } ]
20

Example Output 2:

[ { name: 'worker: 0', number: 0 },
  { name: 'worker: 2', number: 0 },
  { name: 'worker: 4', number: 0 },
  { name: 'worker: 7', number: 0 },
  { name: 'worker: 1', number: 0 },
  undefined,
  { name: 'worker: 2', number: 1 },
  undefined,
  { name: 'worker: 4', number: 1 },
  undefined,
  { name: 'worker: 1', number: 1 },
  { name: 'worker: 0', number: 2 } ]
12

The above was an attempt at a competing consumers implementation. I end up getting inconsistent number of results and the results include a non deterministic number of 'undefined' in the merged results channel.

I'm going to continue looking into this issue, but wanted to raise it to see if I was doing something stupid.

Error handling

In the past we've talked about several ways to handle errors:

  • Add a new instruction that throws when it receives an Error object (I called it takem). I wanted a short instruction because you will probably be using it a lot. This allows the user fine-grained control over error handling.
  • Add some kind of "errored" state to the channel, and when you try to take from it, it automatically throws (the exception thrown could have the channel that errored attached, so in your catch handler you could inspect it and get the error off, etc). This lets the library enforce error semantics: either it passes an error along the channel or it puts the channel into an errored state. Any piping would propagate the error state. I'm warming up to this idea.

Those are actually the only two I can think of right now, I thought there were more. It's also on my list to look at RxJS and study how they work. I know errors are propagated automatically.

The reason this is a bigger deal for us than in core.async is because there's a ton more backend async work in JavaScript. Clojure typically uses the threaded model. You rarely care about errors on the frontend (a "click" event doesn't have an error). But on the backend they matter, so I don't think a really simplistic solution will suffice.

The main thing I care about is that errors outside of channels (like a typo in process code) should simply throw like normal. Errors shouldn't automatically be gobbled up.

"Latest" pattern

Let's say I have a source channel which contains the result of an expensive operation. I have a reader that reads from this source channel every 1 second. I want to read from the source, but only read the latest (or cached) value.

I think there a couple ways to do this:

With a custom buffer

var LatestBuffer = function(buf) {
  this.buf = buf;
};

LatestBuffer.prototype.is_full = function() {
  return false;
};

LatestBuffer.prototype.remove = function() {
  return this.buf;
};

LatestBuffer.prototype.add = function(item) {
  this.buf = item;
};

LatestBuffer.prototype.count = function() {
  return 1;
};

var channel = csp.chan(new LatestBuffer());
csp.operations.pipe(sourceCh, channel);

With a couple goroutines

var latest;

csp.go(function*() {
   while (true) {
    latest = yield sourceCh;
  }
});

var channel = csp.go(function*() {
  while (true) {
    yield latest;
  }
});

Is one better than the other? I think I like LatestBuffer more. Is there a better way? Would this be nice to have in js-csp?

Thanks!

New transducer protocol

There's some discussion ongoing about an official spec for transducers here. Will js-csp support this, once it's stabilized?

React.js Integration

The ability for JS-CSP to radically change the way events are handled in user interfaces looks worthy of further investigation. The "Taming User Interfaces" section of the following article introduced this way of using JS-CSP: http://jlongster.com/Taming-the-Asynchronous-Beast-with-CSP-in-JavaScript

Meanwhile React.js only uses the standard event system. I for one would very much like to see more on how JS-CSP can be used to handle events in React components. Are there there any practical examples of this or a React Mixin that can be used for practical application of JS-CSP?

Multiple processes taking a value?

I'm new to the concept of CSP, and my knowledge is limited to what's introduced in the following article: http://jlongster.com/Taming-the-Asynchronous-Beast-with-CSP-in-JavaScript

If I have multiple processes taking values from a channel, why is it that only one process takes a value when I put a single value into the channel?

Example:

var csp = require('js-csp');

var channel = csp.chan();

csp.go(function*() {

    var val = yield csp.take(channel);

    console.log('process 1', val);

});

csp.go(function*() {

    var val = yield csp.take(channel);

    console.log('process 2', val);

});

csp.putAsync(channel, 42);

// output: process 1 42

Consider using a GPL-compatible license

Hi,

in the current environment where Wordpress is a thing and so many people earn their livelihoods with it, a JS-library published under a GPL-incompatible licence is unusable to these people, which I find very unfortunate.

License changes are much easier when a project is young and has a small number of contributors, so I thought it would be a good idea to bring this up now.

regards

Why cancel all pending puts when closing channel?

Coming from core.async, I was surprised that I cannot put some (pending) values onto a channel, close the channel and then take those values. For example:

go(function* () {
  var c = chan();
  putAsync(c, "value");
  c.close();
  yield c; //=> null
});

The same thing in Clojure(Script):

(go
  (let [c (chan)]
    (put! c "value")
    (close! c)
    (<! c))) ;;=> "value"

It seems like this behavior was added in 9c8a601.

I find it useful to not have to wait for a channel to be exhausted before closing it.

Thanks!

AJAX Examples

I was wondering if you could show an example of how to chain multiple AJAX calls as an example?
I'm really interested in using this but I am trying to relate to the problem I'm currently trying to solve.

An example of a promise model would be

ajaxService
.getCustomer()
.getCustomerPermissions()
.then(customer => router.navigate('/customerdetails', customer);

js-csp ruins webpack UglifyJsPlugin

This sounds crazy but it is true.

In my project, I install js-csp by npm and the version is 0.5.0. The webpack config use UglifyJsPlugin to minify bundle js.

    new webpack.optimize.UglifyJsPlugin({
      output: { comments: false },
      compress: {warnings: false}
    })

When the code doesn't include js-csp, it is OK. After include js-csp, the UglifyJsPlugin just doesn't work. The generated bunde.js is not minified at all.

import csp from 'js-csp';
console.log(csp);

Is there any code snippet that is silver bullet to kill UglifyJS?

Task doesn't wait until another task process action

Let's say I have some global state in my app and I implemented in a form of csp process one middleware which logging state and another middleware which handle action through the channel and mutate state.


var csp = require("js-csp");
var ch = csp.chan();

var globalState = {foo: 'bar'};


csp.go(function* loggingMiddleware() {
  while(true){
    console.log('loggingMiddleware subscribe');
    var action = yield csp.take(ch);
    console.log('state before', JSON.stringify(globalState));
    yield csp.put(ch, action);
    console.log('state after', JSON.stringify(globalState));
  }
});

csp.go(function* actionHandler() {
  console.log('actionHandler subscribe');
  var action = yield csp.take(ch);
  console.log('actionHandler receive action');
  if(action.type == 'set_state'){
    delete action.type;
    Object.assign(globalState, action);
  }
});

setTimeout(()=>{
  csp.putAsync(ch, {type: 'set_state', foo: 'bar2'});
}, 500);

But after I putting some data to channel loggingMiddleware doesn't wait until another task process this data and I am getting this log:

loggingMiddleware subscribe
actionHandler subscribe
state before {"foo":"bar"}
state after {"foo":"bar"}
loggingMiddleware subscribe
actionHandler receive action

alts results indicating is it a put or take

On alts, docs says:

"Returns" an object with 2 properties: The channel of the succeeding operation, and the value returned by the corresponding put/take operation.

However, when do something like:

let result = yield alts(ch, [ch, true]);

There's no meaningful way to check if the carried out operation is a take or a put, as both will have result.value === true.

zero-length buffers?

Is it conceptually wrong that a buffers.sliding(0) or buffers.dropping(0) buffer to have the following (identical) behavior:

  • if a taker is present, a put will deliver the value directly to the taker
  • if no takers are present, any puts will be dropped

This is not the behavior of js-csp today: buffers.dropping(0) seems to drop all values even when there is a taker, and buffers.sliding(0) almost has the effect above but some reason broadcasts a bunch of backpressured nulls once a looping taker "connects". In either case, it seems like unhandled/undefined logic, but it seems possibly useful to support zero-length buffers with the above behavior. Is there something conceptually wrong with the above idea?

Include bundled files in npm package

It would be great if you could include bundled files (build directory, to be clear) in the uploaded npm package. It would let us use it from npmcdn!

Thanks!

putAsync on unbuffered channel in Promises

I'm wrapping code that returns value asynchronously with Promises.
In a scenario where several promises have to putAsync in the same unbuffered channel, I get an Exception.

var assert = require("assert");
var P = require("bluebird");

module.exports = function (csp) {

    // Emulate a library function that returns a Promise
    // resolved wth a value.
    function getAsPromise(value) {
        return new P(function (resolve, reject) {
            console.log("Promise resolved with", value);
            resolve(value);
        });
    }

    function getAsChan() {

        var res = csp.chan();

        var putValue = function (value) {
            console.log("Putting value", value, "in channel");
            csp.putAsync(res, value);
        };

        var get42 = function () {
            return getAsPromise(42);
        };

        var get43 = function () {
            return getAsPromise(43);
        };

        var close = function () {
            res.close();
        };

        get42()
            .then(putValue)
            .then(get43)
            .then(putValue)
            .then(close);

        return res;

    }

    csp.go(function *() {
        var chan = getAsChan();
        console.log("Taking first value");
        var value = yield csp.take(chan);
        assert.ok(value === 42 || value === 43);
        console.log("Taking second value");
        value = yield csp.take(chan);
        assert.ok(value === 42 || value === 43);
        console.log("Done");
    });

};

Here is the result :

Promise resolved with 42
Taking first value
Putting value 42 in channel
Promise resolved with 43
Putting value 43 in channel

/home/phtrivier/prj/platform/git/tmp/es6-test/build/es5/csp.bundled.node.js:1738
        put_callback(false);
        ^
TypeError: undefined is not a function
    at /home/phtrivier/prj/platform/git/tmp/es6-test/build/es5/csp.bundled.node.js:1738:9
    at Object.process_messages [as _onImmediate] (/home/phtrivier/prj/platform/git/tmp/es6-test/build/es5/csp.bundled.node.js:1854:5)
    at processImmediate [as _immediateCallback] (timers.js:345:15)

This is fixed by changing the buffer to be buffered :

var chan = csp.chan(2);

Is this the right fix ? In this situation, what should the buffer size be ? Exactly the potential number of "concurrent" value ?
Does a "growing" buffer makes sense if I'm not absolutely sure about the possible number of items ?
Is it possible to grow a buffer at run time ?

Project active?

I was wondering if the project is still active as I haven't seen any commits from August last year?

Thanks,

Darren

add pipeline-async functions

I'm trying to apply an async function to every value of one channel.
In FRP this operation is typically called flatMap.
The function takes a function which return a chan.

Here is a draft of what it may look like :

var csp = require('js-csp');

/**
 *
 * @param {function} f
 * @param {Channel} input
 * @param {Channel} [output]
 * @return {Channel}
 */
function flatMap(f, input, output) {

    if (!output) {
        output = csp.chan();
    }

    csp.go(function* () {
        while(true) {
            var value = yield csp.take(input);

            if (value === csp.CLOSED) {
                output.close();
                break;
            }

            var mapResult = f(value);

            while (true) {
                var outValue = yield csp.take(mapResult);
                if (outValue === csp.CLOSED) {
                    break;
                } else {
                    yield csp.put(output, outValue);
                }
            }

        }
    });

    return output;
}

Is there any other way to compose existing function to produce this kind of result ?

EDIT : I realized that this implementation is closer to flatMapConcat than flatMap.

attach `put` and `take` methods to the channel instances themselves, instead of on `csp`

This is a nitpick of mine, but I think attaching the put and take methods to the root csp object doesn't read as well. It's kind of annoying to have to pass in the channel as the first argument to every call. I'd like to interact with the API this way:

var channel = csp.chan();

var doThing = function*(){
  var value = yield channel.take();
  doThingWithValue(value);
};

var putThing = function*(){
  var value = yield getValueSomehow();
  yield channel.put(value);
};

Unless there's a reason why they need to be on the csp object that I'm missing?

Trouble debugging performance due to message_channel

I am trying out js-csp for an intensive video processing project - specifically parsing an MPEG-TS stream into h264 video.

When debugging, with this library I lose out on the ability to really dig in, because the chrome tools cannot break into the message_channel.port1.onmessage function.

Do you know any tips for this? Might there be an alternative to using message_channel?

Image of Dev Tools

Problems using 'csp.bundled.js' with facebook regenerator

Hi

I'm trying to use js-csp on an older version of node-js. I tried using regenerator, to some success, but I would like some help.

I've created this repo with my tests so far : https://github.com/phtrivier/es5-js-csp-test

This strategy works :

  • copy-pasting all the source code from js-csp
  • transforming it with regenerator (using a grunt task)
  • requireing the transformed version of 'csp.js'
phtrivier@murphy ~/prj/platform/git/tmp/es6-test $ grunt && node build/es5/index-ok.js
Running "regenerator:sources" (regenerator) task

Running "regenerator:index" (regenerator) task

Done, without errors.
ping 1
pong 2
ping 3
pong 4
ping 5
pong 6
ping 7
pong 8
ping 9
pong 10
ping: table's gone
pong: table's gone

However, I tried to instead transform the 'csp.bundled.js' file, and require this one ; unfortunately, the result from require("build/es5/csp.bundle) is an empty object, and I get this error :

phtrivier@murphy ~/prj/platform/git/tmp/es6-test $ grunt && node build/es5/index-ko.js
Running "regenerator:sources" (regenerator) task

Running "regenerator:index" (regenerator) task

Done, without errors.

/home/phtrivier/prj/platform/git/tmp/es6-test/build/es5/ping-pong.js:43
    csp.go(regeneratorRuntime.mark(function callee$1$0() {
        ^
TypeError: Object #<Object> has no method 'go'
    at module.exports (/home/phtrivier/prj/platform/git/tmp/es6-test/build/es5/ping-pong.js:43:9)
    at Object.<anonymous> (/home/phtrivier/prj/platform/git/tmp/es6-test/build/es5/index-ko.js:477:1)
    at Module._compile (module.js:456:26)
    at Object.Module._extensions..js (module.js:474:10)
    at Module.load (module.js:356:32)
    at Function.Module._load (module.js:312:12)
    at Function.Module.runMain (module.js:497:10)
    at startup (node.js:119:16)
    at node.js:906:3

Is there another "all-in-one" file that I could require ? If I were to try and add a build step to js-csp to generate a single, ES5-compatible file, what would it be ?

Thanks

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.