Comments (25)

ViralBShah avatar ViralBShah commented on May 19, 2024

@JeffBezanson This seems like a post-0.2 thing. Thoughts?

Let's keep this PR around.

from distributed.jl.

Keno avatar Keno commented on May 19, 2024

This is definitely not in scope for 0.2

from distributed.jl.

amitmurthy avatar amitmurthy commented on May 19, 2024

This is not yet a PR. Will convert it into a PR if folks are OK with the general design.

from distributed.jl.

malmaud avatar malmaud commented on May 19, 2024

Hey @amitmurthy , I'm thinking of trying to make a PR for heartbeat functionality based on the new message-passing idiom. Would that be stepping on your toes? Do you have any additional thoughts about heartbeat support since this issue was created?

from distributed.jl.

amitmurthy avatar amitmurthy commented on May 19, 2024

Would that be stepping on your toes?

Perish the thought. No issues whatsoever.

I would initially focus just on the distributed "progress meter".

Removing failed workers from the cluster currently happens when the remote socket connection is closed. This works pretty well in most cases - the exception being when different transports are used (which is not very widespread).

Restarting workers is not straightforward since we have to recreate worker global state if any.

Reconnecting in the event of broken connections (due to serialization/deserialization errors) can and should be implemented. This would require:

  • Having a one-up message number that is acknowledged by the receiver.
  • The sender keeping a reference to all sent messages around till an ack is received. So while messages with errors are notified as such, messages sent after the message-in-error can be retried on a new connection (a sketch follows this list). This is independent of heartbeats though.
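
A minimal sketch of that ack scheme, using hypothetical names (PendingMessages, send_tracked, handle_ack) rather than anything in multi.jl:

```julia
# Sketch only: a one-up message id plus a table of unacknowledged messages,
# so anything sent after a failed message can be resent on a new connection.
using Serialization

mutable struct PendingMessages
    next_id::UInt64
    unacked::Dict{UInt64,Any}   # id => message, kept until an ack arrives
end
PendingMessages() = PendingMessages(1, Dict{UInt64,Any}())

function send_tracked(io::IO, pending::PendingMessages, msg)
    id = pending.next_id
    pending.next_id += 1
    pending.unacked[id] = msg
    serialize(io, (id, msg))        # receiver echoes the id back as an ack
    return id
end

# Called when an ack for `id` comes back from the receiver.
handle_ack(pending::PendingMessages, id) = delete!(pending.unacked, id)

# On a broken connection, everything still unacked can be retried over a
# freshly established stream, in the original send order.
resend_all(io::IO, pending::PendingMessages) =
    foreach(id -> serialize(io, (id, pending.unacked[id])),
            sort!(collect(keys(pending.unacked))))
```

The receiver would echo each id back only after successfully deserializing and processing the message; until then the sender keeps its copy.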

Note : I am currently traveling and will not have reliable internet over the next few days - my responses may be delayed.

from distributed.jl.

malmaud avatar malmaud commented on May 19, 2024

OK, that makes sense.

But wouldn't it be simpler to just not close the connection in the first place in the event of a serialization error? If a worker can't deserialize a message from pid1 (i.e., deserialize(r_stream) throws), why not send a message to that effect back to pid1 instead of exiting?

I guess I'm not really understanding why exceptions caused by evaluation of the thunk the worker has been told to evaluate should be treated so differently than exceptions caused by deserializing pid1's message.

(Understand you're traveling and am not expecting an immediate response)

from distributed.jl.

malmaud avatar malmaud commented on May 19, 2024

Oh, is it because the stream is then left in a corrupted state, so serializing/deserializing messages over it is no longer possible?

In that case, one unifying solution might be to have a second stream to each worker, an out-of-band "status" stream. It could both be used for heartbeats and for indicating that the main message stream is no longer usable because of a serialization error and has to be reset on both sides.

from distributed.jl.

amitmurthy avatar amitmurthy commented on May 19, 2024
  • The out-of-band communication needs to be in a separate thread
  • reset is not simple. Consider this
    • Separate tasks on pid1 write msg1, msg2 and msg3 to pid2 at the same time.
    • On the sender side, all 3 messages have been written to the socket and the write calls have returned
    • On the receiver side, while deserializing msg1, an exception is raised.
    • Since we don't have a message length header field, we cannot discard the bytes of msg1 and read msg2 and msg3.
    • The reason we don't have a length header is that long messages are written to the socket directly; calculating the message size in bytes up front would otherwise require writing to an intermediate buffer.
    • We could implement a serialized_sizeof(object) that calculates the serialized size of any object. That would let us prefix each message with its length, which in turn helps in recovering from deserialization errors (a sketch follows this list).
    • Another option is to have ack messages and keep sent objects around till they are acked.
    • Any other schemes?
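
For the serialized_sizeof option, a rough sketch assuming a hypothetical CountingIO sink (note it still walks the object twice, once to count and once to send):

```julia
# Sketch of serialized_sizeof: serialize into a sink that only counts bytes,
# so a length header can be written without buffering the whole message.
using Serialization

mutable struct CountingIO <: IO
    n::Int
end
CountingIO() = CountingIO(0)

# Count bytes instead of storing them.
Base.write(io::CountingIO, b::UInt8) = (io.n += 1; 1)
Base.unsafe_write(io::CountingIO, p::Ptr{UInt8}, nb::UInt) = (io.n += nb; nb)

function serialized_sizeof(obj)
    cio = CountingIO()
    serialize(cio, obj)
    return cio.n
end

# With a length prefix, a receiver that hits a deserialization error can skip
# exactly `len` bytes and stay in sync with the start of the next message.
function send_with_length(io::IO, msg)
    write(io, UInt64(serialized_sizeof(msg)))
    serialize(io, msg)
end
```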

from distributed.jl.

malmaud avatar malmaud commented on May 19, 2024

Ya, I see that out-of-band would have to be on a separate thread. IJulia has example multithreaded heartbeat code (https://github.com/JuliaLang/IJulia.jl/blob/3a1ad9eac8c259c79a6a1c09342eac262b16f34b/src/heartbeat.jl). It might require a new C function somewhere in src. And at least in my testing, using a Julia function that ccalls sleep and then writes to a socket in a loop works fine, although maybe there's something unsafe about that which hasn't shown up yet.
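
For illustration, a minimal task-based heartbeat loop (hypothetical code; it uses Julia's cooperative sleep rather than the blocking ccall or the separate thread the IJulia code uses):

```julia
# Sketch only: periodically write a single byte to a status socket so the
# peer can detect a dead worker when the beats stop arriving.
function heartbeat_loop(sock::IO; interval=1.0)
    while isopen(sock)
        write(sock, 0x01)   # any fixed "I'm alive" byte
        flush(sock)
        sleep(interval)     # yields to other tasks, unlike ccall-ing sleep(3)
    end
end

# Typically started alongside the main message loop, e.g.:
# @async heartbeat_loop(status_sock)
```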

I see the complexity of dealing with a deserialization error. If we're throwing out possible schemes, we can take inspiration from HTTP multipart encoding:

  1. to put a message on the wire, first write a random boundary token,
  2. then write the serialized message,
  3. then write the boundary token again.

The receiver first reads the token, then deserializes the message. If it encounters an error during deserialization, it just reads the bytes from the stream until it finds the boundary token so it can start processing the next message.
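
A minimal sketch of that boundary-token framing, with hypothetical send_framed/recv_framed helpers (not the actual multi.jl implementation):

```julia
# Sketch only: frame each serialized message between two copies of a random
# boundary token so the receiver can resynchronize after a deserialize error.
using Serialization, Random

const BOUNDARY_LEN = 10

function send_framed(io::IO, msg)
    boundary = rand(UInt8, BOUNDARY_LEN)      # fresh random token per message
    write(io, boundary)
    serialize(io, msg)
    write(io, boundary)
    flush(io)
end

function recv_framed(io::IO)
    boundary = read(io, BOUNDARY_LEN)
    try
        msg = deserialize(io)
        read(io, BOUNDARY_LEN)                 # consume the closing token
        return msg
    catch err
        # Skip bytes until the closing token is seen again, so the stream is
        # positioned at the start of the next frame before rethrowing.
        window = read(io, BOUNDARY_LEN)
        while window != boundary
            popfirst!(window)
            push!(window, read(io, UInt8))
        end
        rethrow(err)
    end
end
```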

from distributed.jl.

ssfrr avatar ssfrr commented on May 19, 2024

If you're interested in a way to frame messages in a stream I've always been a fan of COBS (https://en.wikipedia.org/wiki/Consistent_Overhead_Byte_Stuffing), though my applications have usually been wire protocols on embedded systems where the messages are pretty short (~100 bytes or less). It's nice because you're guaranteed the frame delimiter won't show up in the data, so if you get into a confused state you know that the next delimiter you see is a new frame.

In the past I've used protocols where the framing byte just gets escaped when it appears in the stream, but that can cause big overheads if the frame byte shows up a lot in the stream.

Receiving COBS can be done on-the-fly without keeping a buffer. Encoding could have a stream interface but would require a buffer of 254 bytes.

I'm slammed for the next couple days but after that if you're interested I could write a COBS stream wrapper type that acts like a stream with an additional finish method that would delimit the frames. Sounds like fun.

from distributed.jl.

malmaud avatar malmaud commented on May 19, 2024

That looks cool, but why not just use a random 10 bytes as the frame delimiter? That's never going to match a message by chance.


from distributed.jl.

malmaud avatar malmaud commented on May 19, 2024

Proposed implementation here for reference: https://github.com/JuliaLang/julia/blob/jmm/boundary_message/base/multi.jl#L228


from distributed.jl.

malmaud avatar malmaud commented on May 19, 2024

It also doesn't seem like it would work here because it needs to know the number of frame bytes in the message before encoding the message, but we don't have the length of the message since it's being written directly to the socket.


from distributed.jl.

ssfrr avatar ssfrr commented on May 19, 2024

I'll preface this by saying that I'm not entirely convinced either that this is a better way to go, and it's possible that my ignorance of the larger context is getting in the way. Here are a couple of things that concern me with the random delimiter scheme though:

  1. though the chance of a collision is almost infinitesimally small, it's nonzero and I imagine lots and lots of these messages getting sent. I'm not sure what the target is here for failure probability, but this is probably just me being pedantic.
  2. if the process writing the frame gets borked in a way that prevents it from writing the closing delimiter, the stream never recovers. This might not be a failure mode that we need to worry about though.

Re: needing to know the number of frame bytes: that's why you'd need the 254-byte buffer on the writer side. You only need to look ahead for a 0x00 up to 254 bytes, because if it's longer than that you just put 0xff as the frame byte and send the next 254 bytes as-is.

Anyways, mostly I just wanted to throw it out there as an alternative, not advocate to change course, so if these points aren't convincing I think the random delimiter is also a good way to go.

from distributed.jl.

ssfrr avatar ssfrr commented on May 19, 2024

reading my comment, "code byte" is probably a better term than "frame byte". The frame delimiter is a literal 0x00, and the code byte can happen many times within the frame, each time indicating the number of nonzero bytes following.
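
To make the code-byte vs. frame-delimiter distinction concrete, here is an unoptimized COBS encode/decode sketch (assumed code, not taken from any existing package):

```julia
# Sketch only: COBS encoding. Every 0x00 in the payload is removed, so a
# literal 0x00 on the wire can only ever be a frame delimiter.
function cobs_encode(data::Vector{UInt8})
    out = UInt8[0x00]          # placeholder for the first code byte
    code_idx = 1
    code = 0x01
    for b in data
        if b != 0x00
            push!(out, b)
            code += 0x01
        end
        if b == 0x00 || code == 0xff
            out[code_idx] = code   # code byte = 1 + number of data bytes that follow it
            push!(out, 0x00)       # placeholder for the next code byte
            code_idx = length(out)
            code = 0x01
        end
    end
    out[code_idx] = code
    return out
end

function cobs_decode(enc::Vector{UInt8})
    out = UInt8[]
    i = 1
    while i <= length(enc)
        code = enc[i]
        append!(out, @view enc[i+1:i+code-1])
        i += code
        # A code byte of 0xff means "254 data bytes, no implied zero".
        (code != 0xff && i <= length(enc)) && push!(out, 0x00)
    end
    return out
end
```

A quick round-trip check is cobs_decode(cobs_encode(msg)) == msg for any msg::Vector{UInt8}; on the wire, each encoded frame would then be followed by a literal 0x00 delimiter.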

from distributed.jl.

malmaud avatar malmaud commented on May 19, 2024

Oh ya, I see. Do you mind if I implement it before you? Now I'm excited about it.

from distributed.jl.

ssfrr avatar ssfrr commented on May 19, 2024

I wouldn't be able to get to it until the beginning of next week, so go for it!

from distributed.jl.

StefanKarpinski avatar StefanKarpinski commented on May 19, 2024

That's a clever way to do this.

from distributed.jl.

malmaud avatar malmaud commented on May 19, 2024

Alright, JuliaLang/julia#13795 can now recover gracefully from deserialize failures by keeping a log of ACKed messages and using frame delimiters to reset the message stream to a working state.

from distributed.jl.

amitmurthy avatar amitmurthy commented on May 19, 2024

Why do we need both? Won't just using frame delimiters be enough? As for the possibility of an (extremely unlikely) collision, it is to be noted that it exists only in the failure case, i.e., upon a deserialization error.

Chances of errors during serialization have reduced after JuliaLang/julia@77b2527

from distributed.jl.

malmaud avatar malmaud commented on May 19, 2024

Well, you want a way for the worker to signal to the client that a deserialization error happened, and to associate that signal with a specific message. The worker doesn't know the response_oid of the client's Call msg because that was part of the garbled message. So messages need to have a unique ID and senders need to remember the association between message IDs and messages at least until a successful Ack comes back.

Plus I'm hoping it might lay some groundwork for other applications that take advantage of a message log, such as debugging of the communication between workers and maybe eventually replaying messages to get workers into a known state.


from distributed.jl.

amitmurthy avatar amitmurthy commented on May 19, 2024

It may be better to split the message into a "header" and "body" and deserialize in two steps. First we deserialize the header (which should never fail - if it does fail, treat it as a fatal error) and then the function/args. Errors while deserializing function and args are treated the same as errors while executing f(args), i.e., send back an appropriate RemoteException.

from distributed.jl.

malmaud avatar malmaud commented on May 19, 2024

Makes sense. If the header is going to be message-agnostic, there'd need to be a bit of refactoring of the message types so that response_oid is part of the header instead of the message bodies, and just won't have a meaning for message types that don't currently have a response_oid.

Otherwise each message type could have its own corresponding header type, but that seems a bit cumbersome.


from distributed.jl.

amitmurthy avatar amitmurthy commented on May 19, 2024

Yes. The header needs to contain just the response_oid which identifies the RemoteRef that stores the result and notify_oid that identifies the ref that the caller task (on the sender side) is waiting on. They may be nothing in some cases where a matching response is not expected.
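
A minimal sketch of that header/body split, with a hypothetical MsgHeader type and helper names (not the actual multi.jl code):

```julia
# Sketch only: serialize a small, always-deserializable header first, then the
# message body. A body deserialization error can then be reported back against
# the oids from the header, just like an error from running f(args).
using Serialization

struct MsgHeader
    response_oid   # identifies the RemoteRef that stores the result (or nothing)
    notify_oid     # identifies the ref the caller task is waiting on (or nothing)
end

function send_msg(io::IO, hdr::MsgHeader, body)
    serialize(io, hdr)     # header: tiny, plain data, should never fail to deserialize
    serialize(io, body)    # body: function + args, may fail on the remote side
end

function handle_msg(io::IO)
    hdr = deserialize(io)  # a failure here is treated as fatal
    try
        return hdr, deserialize(io)
    catch err
        # Report the error for hdr.response_oid, as a RemoteException would be.
        return hdr, err
    end
end
```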

from distributed.jl.

malmaud avatar malmaud commented on May 19, 2024

Alright, JuliaLang/julia#13795 implements the header mechanism.

from distributed.jl.
