Comments (25)
@JeffBezanson This seems like a post 0.2 thing. Thoughts?
Let's keep this PR around.
from distributed.jl.
This is definitely not in scope for 0.2
This is not yet a PR. Will convert it into a PR if folks are OK with the general design.
Hey @amitmurthy , I'm thinking of trying to make a PR for heartbeat functionality based on the new message-passing idiom. Would that be stepping on your toes? Do you have any additional thoughts about heartbeat support since this issue was created?
> Would that be stepping on your toes?
Perish the thought. No issues whatsoever.
I would initially focus just on the distributed "progress meter".
Removing failed workers from the cluster currently happens when the remote socket connection is closed. This works pretty well in most cases - the exception being when different transports are used (which is not very widespread).
Restarting workers is not straightforward, since we would have to recreate any worker global state.
Reconnecting in the event of broken connections (due to serialization/deserialization errors) can and should be implemented. This would require:
- Having a one-up message number that is acknowledged by the receiver.
- The sender keeping a reference to all sent messages around until an ack is received. So while messages with errors are notified as such, messages sent after the message-in-error can be retried on a new connection. This is independent of heartbeats though.
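The ack bookkeeping in these two bullets can be sketched roughly as follows (Python for illustration; `PendingLog` and all its method names are hypothetical, not distributed.jl API):

```python
class PendingLog:
    """Sender-side bookkeeping: one-up message numbers plus a retained copy
    of every sent message until the receiver acks it. All names here are
    illustrative, not distributed.jl API."""

    def __init__(self):
        self.next_seq = 1
        self.unacked = {}  # seq number -> serialized message bytes

    def record(self, payload):
        # Assign the next one-up number and keep the payload until acked.
        seq = self.next_seq
        self.next_seq += 1
        self.unacked[seq] = payload
        return seq

    def ack(self, seq):
        # Receiver confirmed everything up to and including `seq`.
        for s in [s for s in self.unacked if s <= seq]:
            del self.unacked[s]

    def to_retry(self):
        # On a fresh connection, resend everything not yet acked, in order.
        return sorted(self.unacked.items())

log = PendingLog()
log.record(b"msg1"); log.record(b"msg2"); log.record(b"msg3")
log.ack(1)  # msg1 confirmed; msg2/msg3 stay queued for retry
```

The point of the log is exactly the retry path: after a message-in-error, everything sent later but not yet acked can be replayed on a fresh connection.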
Note: I am currently traveling and will not have reliable internet over the next few days - my responses may be delayed.
OK, that makes sense.
But wouldn't it be simpler to just not close the connection in the first place in the event of a serialization error? If a worker can't deserialize a message from pid1 (i.e., `deserialize(r_stream)` throws), why not send a message to that effect back to pid1 instead of exiting?
I guess I'm not really understanding why exceptions caused by evaluation of the thunk the worker has been told to evaluate should be treated so differently than exceptions caused by deserializing pid1's message.
(Understand you're traveling and am not expecting an immediate response)
Oh, is it because the stream is then in a corrupted state, so serializing/deserializing messages over it is no longer possible?
In that case, one unifying solution might be to have a second stream to each worker, an out-of-band "status" stream. It could both be used for heartbeats and for indicating that the main message stream is no longer usable because of a serialization error and has to be reset on both sides.
- The out-of-band communication needs to be in a separate thread.
- A `reset` is not simple. Consider this: separate tasks on pid1 write msg1, msg2 and msg3 to pid2 at the same time.
- On the sender side, all 3 messages have been written to the socket and the `write` calls have returned.
- On the receiver side, while deserializing msg1, an exception is raised.
- Since we don't have a message length header field, we cannot discard the bytes of msg1 and read msg2 and msg3.
- The reason we don't have a length header is because of long messages which are written to the socket directly. To otherwise calculate the message size in bytes would require writing to an intermediate buffer.
- We could implement a `serialized_sizeof(object)` which would calculate the serialized size of any object. That would help in prefixing a length-of-message header and can help in recovering from deserialization errors.
- Another option is to have ack messages and keep sent objects around until they are acked.
- Any other schemes?
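For reference, the length-header option (at the cost of the intermediate buffer mentioned above) might look roughly like this sketch; Python's `pickle` stands in for Julia's serializer, and all names are illustrative:

```python
import io
import pickle
import struct

def write_framed(stream, obj):
    # Serializing into an intermediate buffer is exactly the cost noted
    # above, but it lets us prefix the message with its size in bytes.
    payload = pickle.dumps(obj)
    stream.write(struct.pack(">I", len(payload)))  # 4-byte length header
    stream.write(payload)

def read_framed(stream):
    (n,) = struct.unpack(">I", stream.read(4))
    payload = stream.read(n)
    # Even if deserialization of msg1 fails here, the stream is already
    # positioned at msg2, so the bad message's bytes are simply discarded.
    return pickle.loads(payload)

buf = io.BytesIO()
write_framed(buf, {"msg": 1})
write_framed(buf, {"msg": 2})
buf.seek(0)
first, second = read_framed(buf), read_framed(buf)
```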
Ya, I see that out-of-band would have to be on a separate thread. IJulia has example multithreaded heartbeat code (https://github.com/JuliaLang/IJulia.jl/blob/3a1ad9eac8c259c79a6a1c09342eac262b16f34b/src/heartbeat.jl). It might require a new C function somewhere in `src`. And at least in my testing, using a Julia function that `ccall`s `sleep` and then writes to a socket in a loop works fine, although maybe there's something unsafe about that which hasn't shown up yet.
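A rough sketch of that sleep-and-write heartbeat loop on a dedicated thread (Python's `threading` purely for illustration; the interval and the `socketpair` standing in for the out-of-band stream are arbitrary choices):

```python
import socket
import threading
import time

def heartbeat_loop(sock, interval, stop):
    # Sleep, then write a ping, in a loop -- the idea described above.
    while not stop.is_set():
        time.sleep(interval)
        try:
            sock.sendall(b"ping")
        except OSError:
            break  # peer is gone; a real version would mark the worker dead

# socketpair stands in for the out-of-band "status" stream
a, b = socket.socketpair()
stop = threading.Event()
t = threading.Thread(target=heartbeat_loop, args=(a, 0.05, stop), daemon=True)
t.start()

data = b.recv(4)  # the monitoring side sees periodic pings
stop.set()
t.join(timeout=1.0)
```

Because the loop runs on its own thread, pings keep flowing even if the main event loop is blocked, which is what makes this usable as a liveness signal.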
I see the complexity of dealing with a deserialization error. If we're throwing out possible schemes, we can take inspiration from HTTP multipart encoding:
- to put a message on the wire, first write a random boundary token,
- then write the serialized message,
- then write the boundary token again.
The receiver first reads the token, then deserializes the message. If it encounters an error during deserialization, it just reads the bytes from the stream until it finds the boundary token so it can start processing the next message.
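That boundary-token scheme might be sketched like this (Python, with `pickle` standing in for Julia's serializer; the 10-byte token length is an arbitrary choice here, and all function names are illustrative):

```python
import io
import os
import pickle

# A random token; a match inside a real message is vanishingly unlikely.
BOUNDARY = os.urandom(10)

def write_bounded(stream, obj):
    stream.write(BOUNDARY)
    stream.write(pickle.dumps(obj))  # pickle stands in for Julia's serializer
    stream.write(BOUNDARY)

def scan_to_boundary(stream):
    # Discard bytes until the next boundary token (the error-recovery path).
    window = b""
    while True:
        byte = stream.read(1)
        if not byte:
            return False
        window = (window + byte)[-len(BOUNDARY):]
        if window == BOUNDARY:
            return True

def read_bounded(stream):
    # Collect the frame's bytes up to the closing token, then deserialize;
    # a parse failure still leaves the stream positioned at the next frame.
    if not scan_to_boundary(stream):
        return None
    frame = bytearray()
    while True:
        byte = stream.read(1)
        if not byte:
            return None
        frame += byte
        if bytes(frame[-len(BOUNDARY):]) == BOUNDARY:
            return pickle.loads(bytes(frame[:-len(BOUNDARY)]))

buf = io.BytesIO()
buf.write(BOUNDARY + b"not a pickle" + BOUNDARY)  # a frame that won't parse
write_bounded(buf, "good message")
buf.seek(0)
try:
    msg = read_bounded(buf)
except pickle.UnpicklingError:
    msg = read_bounded(buf)  # already past the bad frame; next one parses
```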
If you're interested in a way to frame messages in a stream I've always been a fan of COBS (https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing), though my applications have usually been wire protocols on embedded systems where the messages are pretty short (~100 bytes or less). It's nice because you're guaranteed the frame delimiter won't show up in the data, so if you get into a confused state you know that the next delimiter you see is a new frame.
In the past I've used protocols where the framing byte just gets escaped when it appears in the stream, but that can cause big overheads if the frame byte shows up a lot in the stream.
Receiving COBS can be done on-the-fly without keeping a buffer. Encoding could have a stream interface but would require a buffer of 254 bytes.
I'm slammed for the next couple days, but after that, if you're interested, I could write a COBS stream wrapper type that acts like a stream with an additional `finish` method that would delimit the frames. Sounds like fun.
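For concreteness, the COBS encode/decode described above can be sketched in a few lines (Python for illustration; this follows the standard algorithm, not any distributed.jl code):

```python
def cobs_encode(data: bytes) -> bytes:
    # Replace every zero byte with a code byte giving the distance to the
    # next zero; a run of 254 nonzero bytes gets code 0xff and implies no zero.
    out, block = bytearray(), bytearray()
    for byte in data:
        if byte == 0:
            out += bytes([len(block) + 1]) + block
            block = bytearray()
        else:
            block.append(byte)
            if len(block) == 254:
                out += b"\xff" + block
                block = bytearray()
    out += bytes([len(block) + 1]) + block
    return bytes(out)  # contains no 0x00, so 0x00 can delimit frames

def cobs_decode(data: bytes) -> bytes:
    out, i = bytearray(), 0
    while i < len(data):
        code = data[i]
        out += data[i + 1 : i + code]  # copy `code - 1` literal bytes
        i += code
        if code != 0xff and i < len(data):
            out += b"\x00"             # a short block implies a zero here
    return bytes(out)

frame = cobs_encode(b"\x11\x22\x00\x33")
```

The worst-case overhead is one byte per 254 bytes of payload, and since the encoded output never contains `0x00`, a receiver in a confused state can always resynchronize on the next zero byte.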
That looks cool, but why not just use a random 10 bytes as the frame delimiter? That's never going to match a message by chance.
Proposed implementation here for reference:
https://github.com/JuliaLang/julia/blob/jmm/boundary_message/base/multi.jl#L228
It also doesn't seem like it would work here, because it needs to know the number of frame bytes in the message before encoding the message, but we don't have the length of the message since it's being written directly to the socket.
I'll preface this by saying that I'm not entirely convinced either that this is a better way to go, and it's possible that my ignorance of the larger context is getting in the way. Here are a couple of things that concern me with the random delimiter scheme though:
- though the chance of a collision is almost infinitesimally small, it's nonzero and I imagine lots and lots of these messages getting sent. I'm not sure what the target is here for failure probability, but this is probably just me being pedantic.
- if the process writing the frame gets borked in a way that prevents it from writing the closing delimiter, the stream never recovers. This might not be a failure mode that we need to worry about though.
Re: needing to know the number of frame bytes: that's why you'd need the 254-byte buffer on the writer side. You only need to look ahead for a `0x00` up to 254 bytes, because if it's longer than that you just put `0xff` as the frame byte and send the next 254 bytes as-is.
Anyways, mostly I just wanted to throw it out there as an alternative, not advocate to change course, so if these points aren't convincing I think the random delimiter is also a good way to go.
Reading my comment, "code byte" is probably a better term than "frame byte". The frame delimiter is a literal `0x00`, and the code byte can happen many times within the frame, each time indicating the number of nonzero bytes following.
Oh ya, I see. Do you mind if I implement it before you? Now I'm excited about it.
I wouldn't be able to get to it until the beginning of next week, so go for it!
That's a clever way to do this.
Alright, JuliaLang/julia#13795 can now recover gracefully from deserialize failures by keeping a log of ACKed messages and using frame delimiters to reset the message stream to a working state.
Why do we need both? Won't just using frame delimiters be enough? As for the possibility of an (extremely unlikely) collision, it is to be noted that it exists only in the failure case, i.e., upon a deserialization error.
Chances of errors during serialization have been reduced after JuliaLang/julia@77b2527.
Well, you want a way for the worker to signal to the client that a deserialization error happened, and to associate that signal with a specific message. The worker doesn't know the `response_oid` of the client's Call msg because that was part of the garbled message. So messages need to have a unique ID, and senders need to remember the association between message IDs and messages at least until a successful Ack comes back.
Plus I'm hoping it might lay some groundwork for other applications that take advantage of a message log, such as debugging of the communication between workers and maybe eventually replaying messages to get workers into a known state.
It may be better to split the message into a "header" and "body" and deserialize in two steps. First we deserialize the header (which should never fail - if it does fail, treat it as a fatal error) and then the function/args. Errors while deserializing the function and args are treated the same as errors while executing `f(args)`, i.e., send back an appropriate RemoteException.
Makes sense. If the header is going to be message-agnostic, there'd need to be a bit of refactoring of the message types so that `response_oid` is part of the header instead of the message bodies; it just won't have a meaning for message types that don't currently have a `response_oid`. Otherwise each message type could have its own corresponding header type, but that seems a bit cumbersome.
Yes. The header needs to contain just the `response_oid`, which identifies the RemoteRef that stores the result, and the `notify_oid`, which identifies the ref that the caller task (on the sender side) is waiting on. They may be `nothing` in some cases where a matching response is not expected.
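Put together, the two-step header/body scheme might look roughly like this (Python, with `pickle` standing in for Julia's serializer; the tuple header and all function names are illustrative only, not the distributed.jl wire format):

```python
import io
import pickle

def send_call(stream, response_oid, notify_oid, func, args):
    # Header first: tiny and message-agnostic, so deserializing it should
    # essentially never fail. Either oid may be None when no response is
    # expected.
    pickle.dump((response_oid, notify_oid), stream)
    # Body second: the function/args, which may be arbitrarily complex.
    pickle.dump((func, args), stream)

def handle_message(stream):
    response_oid, notify_oid = pickle.load(stream)  # failure here is fatal
    try:
        func, args = pickle.load(stream)
    except Exception as err:
        # Body was garbled, but the header told us where to report the error,
        # so it can be treated like any exception raised by f(args).
        return ("RemoteException", response_oid, err)
    return ("Result", response_oid, func(*args))

buf = io.BytesIO()
send_call(buf, response_oid=42, notify_oid=None, func=max, args=(3, 7))
buf.seek(0)
kind, oid, value = handle_message(buf)  # ("Result", 42, 7)
```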
Alright, JuliaLang/julia#13795 implements the header mechanism.