tj / axon
message-oriented socket library for node.js heavily inspired by zeromq
License: MIT License
Just a quick note that there are duplicate sections for req/rep with conflicting info:
https://github.com/visionmedia/axon#req--rep
https://github.com/visionmedia/axon#req--rep-1
The first one has req.bind and req.send in a couple of places, which should be sock.bind and sock.send. It also says that the send callback must be provided, but in the last one there is sock.send('ping') without a callback.
Lots of stuff to cover.
Please see the following example https://gist.github.com/3976658
Whenever a push socket has no clients, it will buffer messages until a client connects. However, when I open two push sockets, the first socket places its buffered messages into the second socket's buffer.
PORT1 will not see any buffered messages; PORT2 will see buffered messages from both ports.
Hi, the way req-rep is implemented is quite different from zmq, so I was wondering what type of socket/pattern would best suit the following scenario:
a user hits app.get(/), then the request is passed down the pipeline (a sequence of push-pull) until it reaches the sink, which in turn replies to the request made by app.get(/) (but only after receiving the last message from the pipeline).
I tried different patterns for connecting the starting route and the sink but they all have some drawback.
Using zmq I used to do it like this, but as you know the node.js binding has all sorts of unpredictable issues:
pull.on('message',function(data){
reply.send(data)
})
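One way to make this scenario work without req/rep at the edge (a sketch with hypothetical names, not an axon API) is to tag each job with an id when the HTTP handler pushes it into the pipeline, and let the sink's pull handler look up the pending response by id:

```javascript
// Correlate HTTP requests with pipeline results by tagging each message
// with an id, so the sink's reply can find the original response object.
// `dispatch` and `complete` are hypothetical helper names.
var pending = new Map();
var nextId = 0;

// Called from the HTTP handler, e.g. app.get('/', ...): register the
// response object and push the job into the pipeline.
function dispatch(res, push, payload) {
  var id = nextId++;
  pending.set(id, res);
  push.send({ id: id, payload: payload });
  return id;
}

// Called when the sink's pull socket receives the final message.
function complete(msg) {
  var res = pending.get(msg.id);
  if (!res) return false;        // request already finished or unknown id
  pending.delete(msg.id);
  res.end(JSON.stringify(msg.payload));
  return true;
}
```

The same registry works regardless of which socket pattern carries the messages, since only the id travels through the pipeline.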
damn!
much lower:
min: 372 ops/s
mean: 7886 ops/s
median: 4777 ops/s
total: 22317 ops in 4.152s
through: 7.701171875 mb/s
hahaha, I just screwed up the ordering and was creating socket files named ./192.168.0.198
haha, the port / host order feels unnatural, though it's what node does too I guess. I'd like to also have sock.connect('192.168.0.198:3000') to complement sock.connect(3000, '192.168.0.198')
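A helper along these lines could support the string form while keeping the existing signature (a sketch; parseAddr is a hypothetical name, not part of axon):

```javascript
// Split 'host:port' into the pieces the existing (port, host) signature
// expects. Using lastIndexOf keeps the host intact even if it contains
// colons of its own.
function parseAddr(addr) {
  var i = addr.lastIndexOf(':');
  if (i === -1) throw new Error('expected "host:port"');
  return { host: addr.slice(0, i), port: Number(addr.slice(i + 1)) };
}
```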
note to self
The test seems a bit weird to me.
sub.subscribe(/^user:*/); // equivalent to /^user.*$/
sub.subscribe(/^page:view/); // equivalent to /^page:view.*$/
Therefore I think it doesn't demonstrate the best practices that some users might look to the tests for.
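If the tests used plain string patterns instead, a conversion along these lines (a sketch, not axon's actual implementation) would make the intent obvious: '*' matches any run, and the whole pattern is anchored:

```javascript
// Convert a string pattern like 'user:*' to an anchored RegExp.
// Regex metacharacters are escaped first so only '*' is special.
function toRegExp(pattern) {
  var escaped = pattern.replace(/[.+?^${}()|[\]\\]/g, '\\$&');
  return new RegExp('^' + escaped.replace(/\*/g, '(.+)') + '$');
}
```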
and emit an event when messages are dropped so you can implement your own queuing etc..
Events like "connect" or "disconnect" come with a _peername object that looks like this: { address: '127.0.0.1', family: 'IPv4', port: 52824 }. This is very convenient for identifying the specific origin of the event.
Is it possible to add the same object to all messages?
So this is boggling my mind... if you run test.socket.reconnection.js a few times, a passing test looks like this (I added some extra output):
writing 9
received 9
ss:queue close +11ms
ss:sock close +11ms
pull closed
ss:sock connect 127.0.0.1:3000 +0ms
ss:queue disconnect undefined +0ms
writing 10
ss:queue connect 127.0.0.1 +1ms
ss:queue flush +0ms
ss:sock connect +1ms
ss:queue flush +0ms
received 10
writing 11
received 11
So the disconnect event from Queue was emitted just fine and "10" got buffered. Now I also get this sometimes, causing the test to fail:
writing 9
received 9
ss:queue close +9ms
ss:sock close +9ms
pull closed
ss:sock connect 127.0.0.1:3000 +1ms
writing 10
ss:queue disconnect undefined +1ms
ss:queue connect 127.0.0.1 +0ms
ss:queue flush +0ms
ss:sock connect +1ms
ss:queue flush +1ms
writing 11
received 11
As you can see, "10" snuck in prior to disconnect being emitted from Queue, but after the socket seemingly closed, causing "10" to never be received by the Pull socket.
The crazy thing is, I tried beating the timer with some nextTick hacks and even tried removing the socket check within Push#sendToPeers in hopes of an EPIPE or something; no luck though.
some bug:
axon:sock remove socket 0 +0ms
axon:sock remove socket -1 +0ms
If you .close() a socket while it has buffered messages, then .connect(), you'll lose those messages.
on my mbp at least; I'll try it on the air later and see if maybe there's just something funky going on with this machine, and profile later:
min: 853 ops/s
mean: 15963 ops/s
median: 26103 ops/s
total: 59238 ops in 9.663s
through: 15.5888671875 mb/s
Inheriting doesn't make sense for subscriptions. Back when we were going the router/dealer route, propagation would have been a bit more annoying, but now that we would just relay with pub/sub all the way through, this should be trivial.
Might be able to use this for some stuff that I'm sort of re-implementing, since it's not currently exposed.
throughput & ops / s for various use-cases, many small messages, few large msgs, many large msgs, several per-tick, few per-tick, batching variations etc.
Need examples for these; probably add a section about the benefits of multiple backends with multiple connect()s over something like cluster. (I get questions about axon+cluster.)
In the file examples/workers/consumer.js there is a comment // ^ dont do this. Please enhance the example to show what we should be doing instead; the examples should be there to help us learn how to do things properly.
ubuntu 11.04, node 0.8.12, axon 0.4.2
wicked@wicked-desktop:~/s2s/node_modules/axon/benchmark$ node pub.js
pub bound
sending 1000 per tick
sending 1023 byte messages
events.js:68
throw arguments[1]; // Unhandled 'error' event
^
Error: This socket is closed.
at Socket._write (net.js:518:19)
at Socket.write (net.js:510:15)
at PubSocket.flushBatch (/home/wicked/Alawar/s2s/node_modules/axon/lib/sockets/pub.js:57:10)
at PubSocket.send (/home/wicked/Alawar/s2s/node_modules/axon/lib/sockets/pub.js:75:54)
at more (/home/wicked/Alawar/s2s/node_modules/axon/benchmark/pub.js:21:42)
at process.startup.processNextTick.process._tickCallback (node.js:244:9)
now that we're eating our own dog food with the meta flag
with zmq it was easy to just do sock.send([envelope, id, 0, buf]); but we need the NUL at the moment
do we even need delimiters? I'm not sure of zmq's internals on this, but I'd think we should be able to arbitrarily shift / unshift to maintain the envelope "stack"
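The shift/unshift idea can be sketched with plain arrays standing in for frames; each hop prepends its identity on the way in and peels it off on the way out, so the envelope behaves like a stack (a sketch, not axon's code):

```javascript
// Each routing hop unshifts its identity onto the frame list; replies
// shift identities back off in reverse order, so no delimiter frame is
// strictly required if the body boundary is tracked another way.
function pushHop(frames, id) {
  frames.unshift(id);      // prepend this hop's identity
  return frames;
}

function popHop(frames) {
  return frames.shift();   // peel the outermost identity back off
}
```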
I can do the same in a more elegant manner, with no ifs in the on('message') callback.
var axon = require('..')
  , should = require('should')
  , pub = axon.socket('pub')
  , sub = axon.socket('sub')
  , EventEmitter2 = require('eventemitter2').EventEmitter2;

var msgs = [];
var expected = [
  'user:login',
  'tobi',
  'user:login',
  'loki',
  'user:logout',
  'jane',
  'page:view',
  '/home'
];

pub.bind(3013);

var ee = new EventEmitter2({ wildcard: true, delimiter: ':' });

ee.on('user:*', function(name) {
  msgs.push(this.event, name);
});

ee.on('page:view', function(name) {
  msgs.push(this.event, name);
  msgs.map(String).should.eql(expected);
  pub.close();
  sub.close();
});

sub.on('message', function(type, name) {
  ee.emit(String(type), String(name));
});

sub.connect(3013, function() {
  pub.send('user:login', 'tobi');
  pub.send('user:login', 'loki');
  pub.send('user:logout', 'jane');
  pub.send('unrelated', 'message');
  pub.send('other', 'message');
  pub.send('page:view', '/home');
});
Probably it's even better to use eventemitter2 internally in axon and allow users to subscribe to events like this: subscribe('user:*', function() {});
to something else since they're not directly related to the codecs
Can't remember if we have this supported; probably not tested.
I hacked around with request/reply leveraging the multipart stuff and no callbacks. The thing to think about is how we want the reply socket to behave. I think in the ZMQ world the reply socket will block until it has sent back the reply to the peer. So that might not make a whole bunch of sense in the node world? maybe? So I think our version of reply would be a lot more like a dealer.
Anyhow, in my little example it uses a multipart message; message 1 being the "identity" of the request socket and message 2 plus being the body. The identity defaults to '\u0000', for the sole purpose of letting the reply socket know whether it needs to generate an identity for it.
var req = ss.socket('req')
, rep = ss.socket('rep');
req.identity('tobi');
rep.bind(3000);
req.connect(3000);
req.on('message', function(pong){
pong.toString().should.equal('pong');
rep.close();
req.close();
});
rep.on('message', function(id, ping){
ping.toString().should.equal('ping');
id.toString().should.equal('tobi');
rep.send(id, 'pong');
});
req.send('ping');
since we don't lock-step, maybe:
var sock = axon.socket('req');
sock.send('something', function(res){
})
ubuntu 11.04, node 0.8.12, axon 0.4.2
wicked@wicked-desktop:~/s2s/node_modules/axon/benchmark$ node pub.js
...
axon:pub flushBatch to 1 sockets, 1 writable +0ms
axon:pub flushBatch to 1 sockets, 0 writable +1ms
...
axon:pub flushBatch to 1 sockets, 0 writable +0ms
axon:sock disconnect 127.0.0.1:50770 +633ms
axon:sock start total sockets: 1 +0ms
axon:sock remove socket 0 +0ms
axon:sock end total sockets: 0 +0ms
axon:sock on('error') socket= +3ms { _handle: null,
_pendingWriteReqs: 1481,
_flags: 0,
_connectQueueSize: 0,
destroyed: true,
errorEmitted: true,
bytesRead: 0,
_bytesDispatched: 72095400,
allowHalfOpen: false,
writable: false,
readable: false,
server:
{ _events: { connection: [Function], listening: [Function] },
_connections: 0,
connections: [Getter/Setter],
allowHalfOpen: false,
_handle:
{ writeQueueSize: 0,
onconnection: [Function: onconnection],
owner: [Circular] },
_connectionKey: '4:0.0.0.0:3003' },
_peername: { address: '127.0.0.1', family: 'IPv4', port: 50770 },
_events:
{ data: [Function],
error: [Function],
timeout: [Function],
close: [Function] },
_connecting: false,
_connectQueue: null,
_idleNext: null,
_idlePrev: null,
_idleTimeout: -1 }
axon:sock error This socket is closed. +1ms
axon:sock start total sockets: 0 +0ms
axon:sock removal of unknown socket +0ms
events.js:68
throw arguments[1]; // Unhandled 'error' event
^
Error: This socket is closed.
at Socket._write (net.js:518:19)
at Socket.write (net.js:510:15)
at PubSocket.flushBatch (/home/wicked/Alawar/s2s/node_modules/axon/lib/sockets/pub.js:60:14)
at PubSocket.send (/home/wicked/Alawar/s2s/node_modules/axon/lib/sockets/pub.js:80:54)
at more (/home/wicked/Alawar/s2s/node_modules/axon/benchmark/pub.js:21:42)
at process.startup.processNextTick.process._tickCallback (node.js:244:9)
I propose a similar fix to pub.js, as done here:
websockets/ws#74
PS: also note the double socket removal: once on the 'close' event, then once on 'error'.
Not super important, but might be helpful for new users. For example, accidentally sending objects without format('json').
assert.js:102
throw new assert.AssertionError({
^
AssertionError: missing value
at writeUInt32 (buffer.js:917:12)
at Buffer.writeUInt32BE (buffer.js:950:3)
at Message.pack (/Users/gjohnson/Projects/axon/lib/message.js:86:7)
at Message.write (/Users/gjohnson/Projects/axon/lib/message.js:40:18)
at RepSocket.Socket.pack (/Users/gjohnson/Projects/axon/lib/sockets/sock.js:121:11)
at reply (/Users/gjohnson/Projects/axon/lib/sockets/rep.js:53:23)
at RepSocket.<anonymous> (/Users/gjohnson/Projects/axon/examples/reqrep/rep.js:9:3)
at RepSocket.EventEmitter.emit (events.js:91:17)
at Parser.RepSocket.onmessage (/Users/gjohnson/Projects/axon/lib/sockets/rep.js:47:15)
at Parser.frameBody (/Users/gjohnson/Projects/axon/lib/parser.js:106:10)
I think since configurable attaches to Socket.prototype, and every socket's proto is Socket.prototype, we have sockets playing games on each other. :-)
A good example is in test.router.js: they both end up with the same identities. lol
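The prototype-sharing problem is easy to reproduce in isolation (a sketch, not axon's code): any mixin that writes settings onto the shared prototype makes every instance see the last write, and the fix is to give each instance its own settings object:

```javascript
// A minimal configurable-style mixin with the bug baked in.
var proto = {
  set: function (name, val) { this.settings[name] = val; },
  get: function (name) { return this.settings[name]; }
};
proto.settings = {};                  // shared across ALL instances!

var a = Object.create(proto);
var b = Object.create(proto);
a.set('identity', 'a');
b.set('identity', 'b');               // clobbers a's identity too

// Fix: give each instance its own settings at construction time.
function makeSocket() {
  var sock = Object.create(proto);
  sock.settings = {};                 // own property shadows the prototype
  return sock;
}
```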
once it's working well, or remove it altogether since it's not likely to have more than one msg per tick, but I'll have to see how things work out
currently you cannot .bind() the subscriber, needed for a relay
with new master
If there's no response and a pipe breaks etc., that callback will be in limbo. In many cases this may not matter at all; we could just time out and error with err.timeout, and the lib can ignore it as necessary, but it would be nice if we had optional support for re-transmission.
For now we only really need them to be process-specific, so a PID is fine. This is just so that a reply to a dead req socket does not invoke a new callback, since the ids get reset to 0.
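A callback registry with an explicit expiry step would cover both concerns, the err.timeout behaviour and ignoring replies to dead callbacks (a sketch with hypothetical names, not an axon API; a real version would drive expire() from a timer):

```javascript
// Track outstanding request callbacks by id; fail them with err.timeout
// when no reply arrives, and silently drop late replies.
function CallbackRegistry() {
  this.ids = 0;
  this.callbacks = {};
}

CallbackRegistry.prototype.add = function (fn) {
  var id = this.ids++;
  this.callbacks[id] = fn;
  return id;
};

CallbackRegistry.prototype.resolve = function (id, msg) {
  var fn = this.callbacks[id];
  if (!fn) return false;           // late reply from a dead peer: ignored
  delete this.callbacks[id];
  fn(null, msg);
  return true;
};

CallbackRegistry.prototype.expire = function (id) {
  var fn = this.callbacks[id];
  if (!fn) return false;
  delete this.callbacks[id];
  var err = new Error('request timed out');
  err.timeout = true;              // lets callers distinguish timeouts
  fn(err);
  return true;
};
```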
could be nice if EventEmitter2 syntax was supported on events
sock.on('*', function(data){
console.log('yay wildcard', data);
});
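A minimal wildcard dispatcher shows what those semantics could look like (a sketch, not axon's or EventEmitter2's implementation): exact listeners fire on their own event, '*' listeners fire on everything:

```javascript
// Tiny emitter where '*' listeners receive every event's arguments.
function WildcardEmitter() {
  this.listeners = {};
}

WildcardEmitter.prototype.on = function (event, fn) {
  (this.listeners[event] = this.listeners[event] || []).push(fn);
};

WildcardEmitter.prototype.emit = function (event) {
  var args = Array.prototype.slice.call(arguments, 1);
  // exact-match listeners first, then the wildcard listeners
  var fns = (this.listeners[event] || []).concat(this.listeners['*'] || []);
  fns.forEach(function (fn) { fn.apply(null, args); });
  return fns.length;               // how many listeners fired
};
```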
some ideas for fun or that may actually be useful (in other repos): toString()ed js functions
I don't absolutely need this yet, but I have an app running now that does some local communication, so it would be nice to have at some point...
Perhaps:
.bind('unix:///tmp/foo');
.connect('unix:///tmp/foo');
when all have closed; currently we handle some of this as if there was only one connect(), same with the "connect" event etc. We can have, say, "socket close" or similar to use internally (and for logging) in order to re-connect.
things like .emit('error ignored', err) etc.; still useful for hooking into your own logging system
I have been thinking about REQ/REP a little bit which really relies heavily on multipart messages. So I thought I would throw out what I have been thinking to get some feedback on it before I start hacking away. (cc: #3).
octet: 0 1 2 3 4 <length>
+-------+------+------+------+-------+------------
| final | meta | <length> | data ...
+-------+------+------+------+-------+------------
The REQ socket send()s data to a connected endpoint; it should never bind(). The last argument to .send() is a flag as to whether more frames will be written; it defaults to false (we are just being explicit here for example's sake). Once message is emitted, the socket will be closed.
var req = ss.socket('req');
req.format('json');
req.connect(3000);
req.on('message', function(msg) {
console.log(msg);
});
req.send({ says: 'ping' }, false);
This would result in the following packets.
Null frame:
+-----+-----+-----+------+-----+-----
| 0x1 | 0x0 | 0x0 | 0x0 | 0x1 | '\0'
+-----+-----+-----+------+-----+-----
Body frame:
+-----+-----+-----+------+-----+---------------------
| 0x0 | 0x1 | 0x0 | 0x0 | 0xf | '{"says":"ping"}'
+-----+-----+-----+------+-----+---------------------
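Packing a frame with the header drawn above could look like this (a sketch of the layout as drawn, with a 1-octet final flag, a 1-octet meta flag, and a 3-octet big-endian length; not the shipped codec):

```javascript
// Pack one frame: [final][meta][len len len][data...]
function packFrame(final, meta, data) {
  var buf = Buffer.alloc(5 + data.length);
  buf[0] = final ? 1 : 0;               // more frames coming?
  buf[1] = meta ? 1 : 0;                // meta frame flag
  buf.writeUIntBE(data.length, 2, 3);   // 3-byte big-endian length
  data.copy(buf, 5);                    // payload follows the header
  return buf;
}
```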
The REP socket receives the message from send() along with a reply() function as the second argument. This can be invoked to build up an envelope to go back to the REQ socket. The last argument to reply() is a flag as to whether more frames will be written; it defaults to false.
var rep = ss.socket('rep');
rep.format('json');
rep.bind(3000);
rep.on('message', function(msg, reply) {
reply({ says: 'pong' }, false);
});
Just like ZMQ, once the REP socket receives a "final" message, it rips through all frames in the envelope up to and including the null frame. It then emits the message body.
Upon reply(), it will prepend the earlier ripped-off frames and send the message back.
Similarly to the REP parsing process, REQ will rip through all frames in the response envelope up to and including its original null frame, then emit the message body.
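The rip-through step itself is simple to sketch (plain strings standing in for frames; not axon's code): collect everything up to and including the null delimiter as the envelope, and return the rest as the body:

```javascript
// Split a frame list at the null delimiter: identities + '\u0000' form
// the envelope, the remaining frames are the message body.
function ripEnvelope(frames) {
  var i = frames.indexOf('\u0000');
  if (i === -1) return { envelope: [], body: frames.slice() };
  return {
    envelope: frames.slice(0, i + 1),   // identities + the null frame
    body: frames.slice(i + 1)           // message body frames
  };
}
```

On reply, the saved envelope is simply concatenated back in front of the response frames.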
With multipart we can do elegant multi-hop routing from peer to peer just like ZMQ, with address frames in the envelopes. Something like REQ <-> ROUTER <-> DEALER <-> REP:
+-----+-----+-----+------+-----+-----
| 0x1 | 0x0 | 0x0 | 0x0 | 0x1 | '\0'
+-----+-----+-----+------+-----+-----
+-----+-----+-----+------+-----+---------------------
| 0x0 | 0x1 | 0x0 | 0x0 | 0xf | '{"says":"ping"}'
+-----+-----+-----+------+-----+---------------------
+-----+-----+-----+------+-----+-----
| 0x1 | 0x0 | 0x0 | 0x0 | 0x6 | 'req_id'
+-----+-----+-----+------+-----+-----
+-----+-----+-----+------+-----+-----
| 0x1 | 0x0 | 0x0 | 0x0 | 0x1 | '\0'
+-----+-----+-----+------+-----+-----
+-----+-----+-----+------+-----+---------------------
| 0x0 | 0x1 | 0x0 | 0x0 | 0xf | '{"says":"ping"}'
+-----+-----+-----+------+-----+---------------------
+-----+-----+-----+------+-----+-----
| 0x1 | 0x0 | 0x0 | 0x0 | 0x9 | 'router_id'
+-----+-----+-----+------+-----+-----
+-----+-----+-----+------+-----+-----
| 0x1 | 0x0 | 0x0 | 0x0 | 0x6 | 'req_id'
+-----+-----+-----+------+-----+-----
+-----+-----+-----+------+-----+-----
| 0x1 | 0x0 | 0x0 | 0x0 | 0x1 | '\0'
+-----+-----+-----+------+-----+-----
+-----+-----+-----+------+-----+---------------------
| 0x0 | 0x1 | 0x0 | 0x0 | 0xf | '{"says":"ping"}'
+-----+-----+-----+------+-----+---------------------
leveraging the new stuff. Regular non-multipart:
sock.send('hey')
multipart:
sock.send('hey', 'there', 'tobi')
sock.send(['hey', 'there', 'tobi'])
sub.on('message', function(a, b, c){ ... })
I haven't checked, but we'll have to make sure the new multipart stuff supports "nesting" for envelopes.
I want to use axon with secure-peer for super easy security and thus need access to the raw stream. Any plans on doing this?
These are internal sockets that you shouldn't really be interacting with; maybe it's best if we just self.emit('error', err) these. The one I ran across today was our RepSocket.prototype.onmessage: a reply() to a dead connection will error, but node makes it worse because it will both invoke the .write() callback and emit "error" on the socket, so we would have to do:
sock.write(self.pack(args), function(err){
  if (err) {
    // swallow the duplicate "error" node emits on the net socket
    sock.once('error', function(){});
    // surface the failure once, on the axon socket itself
    self.emit('error', err);
  }
});
For me REP is acting as the bind() side in this case, but from what I can see we're not handling those socket errors and removing them.