Question: Working on Kafka protocol, what're good ways to work with a protocol with almost 400 payload types? (bedrockframework, closed, 17 comments)

davidfowl commented on May 25, 2024

Question: Working on Kafka protocol, what're good ways to work with a protocol with almost 400 payload types?

Comments (17)

AndyPook commented on May 25, 2024

Hey, thanks for checking in...

Both Tim and I are no longer working for clients that use Kafka, so a lot of the motivation has gone.
It's a shame, as I was learning things, but other pressures have pushed this way down on "the list".

Speaking only for my own work, https://github.com/AndyPook/A6k.Kafka made some decent progress. It can talk to a broker, process responses, etc. I was working on the group management part; all of which was really "just code", nothing to do with Bedrock. Trying to understand the Kafka message flows was a bit of a challenge :) I was in the process of testing the consumer group bits, which would have meant it was close to having a "real" consumer that would have played nicely with the Confluent.Kafka clients. I mean, you could have had a6k and Confluent clients in the same consumer group. Quite exciting!

From a Bedrock point of view, I think the approach was pretty reasonable. The IValueTaskSource stuff is interesting, though it needs pushing through to other parts. Otherwise, once you have an appropriate message loop working and you can move up an abstraction layer or two, it's really just "normal" coding. It's amazing to be able to write that sentence. Being able to write a handful of classes to deal with raw sockets is a great sign that the underlying framework is in good shape.

The bits that were frustrating were down to the Kafka protocol. It requires a lot of blocks to be written out so you can scan back and gzip them or generate a CRC, then write that via Bedrock. It makes the code look quite messy. There isn't a "nice" pattern (at least there wasn't at that point).
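The shape of it is roughly: build the inner block into a scratch buffer, then go back and emit the size/CRC in front of it. A rough sketch (names are mine, not the actual a6k or Bedrock code; it assumes System.IO.Hashing for the CRC, and exactly which bytes the size/CRC cover depends on the message format version):

    using System;
    using System.Buffers;
    using System.Buffers.Binary;
    using System.IO.Hashing; // NuGet: System.IO.Hashing

    public static class PrefixedBlocks
    {
        // Build the inner block in a scratch buffer first, then emit size + CRC + body.
        public static void WriteSizeAndCrcPrefixed(IBufferWriter<byte> output, Action<IBufferWriter<byte>> writeBody)
        {
            var scratch = new ArrayBufferWriter<byte>();
            writeBody(scratch);
            ReadOnlySpan<byte> body = scratch.WrittenSpan;

            Span<byte> prefix = output.GetSpan(8);
            BinaryPrimitives.WriteInt32BigEndian(prefix, body.Length + 4); // size field covers CRC + body here
            BinaryPrimitives.WriteUInt32BigEndian(prefix.Slice(4), Crc32.HashToUInt32(body));
            output.Advance(8);
            output.Write(body);
        }
    }

The annoying part is the copy; avoiding it means either two passes over the data or pre-computing sizes by hand, which is exactly the mess I mean.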

Feel free to take whatever you want from a6k. Feel free to ask questions. Maybe I'll refind some motivation :)

One other thing that did come out of this small experience was an even greater respect for anyone that runs an OSS project.
To anyone that reads this... please be nice to the maintainers!!

AndyPook commented on May 25, 2024

I managed to get Fetch to work!!!
The doc lies!! (at least there's plenty of room for interpretation).

RecordSet is tricky. It contains arrays, but some of them have no count prefix; you just keep reading until you've consumed the specified number of bytes.
Some things are varints, some are signed (zigzag) varints.
This was made extra hard by Wireshark also not understanding the Fetch response; it just says "Malformed Packet" 😬
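For the curious, the two varint flavours look roughly like this (a sketch, not the exact a6k code; the signed one is zigzag-encoded so small negative numbers stay short):

    using System.Buffers;

    public static class VarintReader
    {
        // Unsigned varint: 7 bits per byte, lowest group first, high bit set means "more bytes follow".
        public static bool TryReadVarint(ref SequenceReader<byte> reader, out int value)
        {
            value = 0;
            int shift = 0;
            while (shift <= 28 && reader.TryRead(out byte b))
            {
                value |= (b & 0x7F) << shift;
                if ((b & 0x80) == 0)
                    return true;
                shift += 7;
            }
            return false; // ran out of data (or more than 5 bytes)
        }

        // Signed varint: same bytes on the wire, then zigzag-decode.
        public static bool TryReadSignedVarint(ref SequenceReader<byte> reader, out int value)
        {
            if (!TryReadVarint(ref reader, out int raw))
            {
                value = 0;
                return false;
            }
            value = (int)((uint)raw >> 1) ^ -(raw & 1);
            return true;
        }
    }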

Plenty more to do here: compression; CRC checking (this one uses CRC32, not the CRC32C used elsewhere).
This only works with a single broker, against a topic with a single partition (or where you can guarantee all the lead replicas are on that broker).

Next step is using cluster metadata (=> multiple connections) and using JoinGroup properly, which leads to the same kind of pre-fetching, internal-queue malarkey that librdkafka has.

btw: "toppar" (seen all over librdkafka) means topic-partition tuple thing. Used as a key to queues and some internal housekeeping.

Anyway... getting Fetch to work feels like success 😀

davidfowl commented on May 25, 2024

Wonderful! I'm looking forward to that contribution!

As a followup, would Bedrock.Framework.Experimental be a good place to try novel approaches to common issues in this space?

Yes, make a kafka folder and let's start iterating on it. It doesn't need to be fast from the get-go. We'll learn more as we write more parsers. Then you can write those whitepapers yourself 😉

jkotalik commented on May 25, 2024

@ChadJessup I'd be interested in giving technical guidance as well. I encourage you to do the same!

jzabroski commented on May 25, 2024

How does PluralSight do it in Scala? They open-sourced Hydra, which is built on top of Kafka. I can't really read Scala code, but that might be a place to look for ideas.

ChadJessup commented on May 25, 2024

How does PluralSight do it in Scala? They open-sourced Hydra, which is built on top of Kafka. I can't really read Scala code, but that might be a place to look for ideas.

I'll look into that project. I've been referring to librdkafka when docs and Wireshark fail me. But raw C code has some benefits on this front that I don't believe can be leveraged in C#.

AndyPook commented on May 25, 2024

Hi, just started having a go at this.
Got some basic req/res pipelining. A handful of message types. Can produce a message. Yay!

There are only 40-ish requests; it's the versioning that is a pain. It appears that librdkafka only really deals with certain API versions, so I've just been tying into those. The versions are generally additive, though, so I was thinking some if(v > X) would be sufficient to start. Then the readers can be static.
The biggest pita is all the prefixed blocks. I've been using a MemoryPool.Shared-based IBufferWriter to buffer the block before writing the length/CRC prefix and copying the buffer. It'd be nice if there were a built-in pattern for prefixes.
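For the small cases the prefix is only a couple of bytes. A sketch of the kind of extension I mean, for the classic NULLABLE_STRING (int16 big-endian length, -1 for null; the newer compact/tagged-field variants would need their own overloads):

    using System;
    using System.Buffers;
    using System.Buffers.Binary;
    using System.Text;

    public static class KafkaWriterExtensions
    {
        // NULLABLE_STRING: int16 big-endian length prefix (-1 for null), then UTF-8 bytes.
        public static void WriteNullableString(this IBufferWriter<byte> output, string value)
        {
            if (value is null)
            {
                Span<byte> nullPrefix = output.GetSpan(2);
                BinaryPrimitives.WriteInt16BigEndian(nullPrefix, -1);
                output.Advance(2);
                return;
            }

            int byteCount = Encoding.UTF8.GetByteCount(value);
            Span<byte> span = output.GetSpan(2 + byteCount);
            BinaryPrimitives.WriteInt16BigEndian(span, (short)byteCount);
            Encoding.UTF8.GetBytes(value.AsSpan(), span.Slice(2));
            output.Advance(2 + byteCount);
        }
    }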

The other "problem" is that the Kafka protocol doc is spread around various places and the protocol bnf is somewhat consistent (wirehark to the rescue!)

A long way to go to get consumer group orchestration and message consuming to work.
Then higher level client facades.
Then....
...

I'll stick what I have on GitHub soon.

AndyPook commented on May 25, 2024

https://github.com/AndyPook/A6k.Kafka

Just a start.
Only works with single-broker instances.
No record batching yet; it would need per-topic/partition queues.
Works OK with single req/res. Needs some "manager/orchestrator" types for the JoinGroup/SyncGroup chat.

TestConsole is a dumb demo console app for "testing" against a kafka instance (I use single-node\docker-compose.yml to run the test instance).

ChadJessup commented on May 25, 2024

Nice! You and I have gone down some very similar paths (I'll be putting up a preliminary PR here soon) - even down to our docker-compose setup. :)

As for the prefixes: that was, and always has been, one of the biggest pains I've dealt with when parsing protocols like this. The first bytes usually being a size is great when reading, but when writing I've always had a difficult time efficiently calculating the size and then writing the payload without double-scanning, or without resorting to risky code like:

        private const int constantPayloadsize =
            sizeof(short) // acks
            + sizeof(int) // timeout
            + sizeof(int) // topics array count
            + sizeof(short) // topic array name length
            + sizeof(int) // single partition index
            + sizeof(long) // offset
            + sizeof(int) // message set size
            + sizeof(int) // message size
            + sizeof(int) // key length size
            + sizeof(int) // value length size
            ;

        public override int GetPayloadSize()
        {
            return constantPayloadsize
                + this.Topics.Sum(t => t.TopicName.Length)
                + (this.keyLength ?? 0)
                + (this.valueLength ?? 0);
        }

To try to deal with this pain, I just spent a day or two prototyping a generic PayloadWriter class that lets me deal with this somewhat common scenario:

        public void WritePayload(ref PayloadWriterContext context)
        {
            var pw = context.CreatePayloadWriter()
                .StartCalculatingSize("message")
                    .StartCrc32Calculation()
                        .Write(this.Magic)
                        .Write(this.Attributes)
                        .Write(this.Key, this.KeyLength)
                        .Write(this.Value, this.ValueLength)
                    .EndCrc32Calculation()
                .EndSizeCalculation("message");
        }

It's letting me clean up a significant amount of code/extension methods, it gives me the 'shape' of the payload better, chaining these PayloadWriters allows reusable payload chunks, and it should make versioning easier to deal with. Custom protocol needs are handled with extension methods.

An entire schema would look like this when combined (this is actually spread across multiple methods, but doesn't need to be):

Produce Request Version 0:

var pw = new PayloadWriter(isBigEndian: true)
    .StartCalculatingSize("totalSize")
      .Write(message.ApiKey)
      .Write(message.ApiVersion)
      .Write(correlationId)
      .WriteNullableString(ref clientId) // extension method for unique protocol
      .Write(message.Acks)
      .Write(message.Timeout)
      .WriteArray(message.Topics, this.WriteTopic) // this.WriteTopic is actually another method, but it writes the unique protocol array delimiter
        .WriteString(topic.TopicName)
        .WriteArray(topic.Partitions, this.WritePartition)
          .Write(partition.Index)
          .StartCalculatingSize("messageSetSize")
            .Write(offset)
            .StartCalculatingSize("message")
              .StartCrc32Calculation() // extension method
                  .Write(message.Magic)
                  .Write(message.Attributes)
                  .Write(message.Key, message.KeyLength)
                  .Write(message.Value, message.ValueLength)
              .EndCrc32Calculation()
            .EndSizeCalculation("message")
          .EndSizeCalculation("messageSetSize")
    .EndSizeCalculation("totalSize");

pw.TryWritePayload(out ReadOnlySequence<byte> payload);

@AndyPook - Thoughts? Would something like this have helped with your efforts?

AndyPook commented on May 25, 2024

ha! fluent all the things!

Yeah, I probably would have used that. Still, it looks like you'd need to take care with the Start/End pairs, and Format Document would probably mess with the indentation?
And you'd probably need more options for all the int/long/varint/varintlong/signed-varint shenanigans, though that's just more extensions (like WriteNullableString), right?

I've ended up with a bit of a pattern that "works" (kinda). I've gone down the extension-method-all-the-things route. I was hoping that once the API read/write stuff was "done", we could just bury it under the covers. As long as it's "readable/debuggable enough", right? It's just a pita to begin with.
I do like the fluenty approach though :)

Do you think there's something to be done on the read side too?
All the if(!TryRead) return false noise is a bit tiresome (though I understand the need).
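Something that latches the first failure might help, so the call site reads straight through and checks once at the end. Roughly what I'm imagining (not a real Bedrock API, just a sketch):

    using System;
    using System.Buffers;

    // Wraps SequenceReader and remembers the first failed read, so callers can
    // read a run of fields and check Failed once instead of after every TryRead.
    public ref struct KafkaFieldReader
    {
        private SequenceReader<byte> _reader;
        public bool Failed { get; private set; }

        public KafkaFieldReader(ReadOnlySequence<byte> input)
        {
            _reader = new SequenceReader<byte>(input);
            Failed = false;
        }

        public SequencePosition Position => _reader.Position;

        public short ReadInt16()
        {
            if (!Failed && _reader.TryReadBigEndian(out short value))
                return value;
            Failed = true;
            return 0;
        }

        public int ReadInt32()
        {
            if (!Failed && _reader.TryReadBigEndian(out int value))
                return value;
            Failed = true;
            return 0;
        }
    }

    // Usage:
    //   var r = new KafkaFieldReader(buffer);
    //   short apiKey = r.ReadInt16();
    //   int correlationId = r.ReadInt32();
    //   if (r.Failed) return false; // need more data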

AndyPook commented on May 25, 2024

Added a super simple (bordering on willfully dumb) Consumer.
It discovers which broker the topic is hosted on and makes Fetch requests to that/those brokers.
It has ZERO understanding of consumer groups.

The DI makes new'ing up a Consumer a little awkward compared to Confluent. Suggestions welcome.

AndyPook commented on May 25, 2024

To speak to the actual question asked (what to do about ~400 payloads): still a good question...

It seems that a lot of the versions for a lot of requests are actually identical in format. In those cases the version really just tells the broker which response version to reply with. The response versions mostly just add extra fields, so some if(version > x) conditional decoding ought to work.

However, how do you get that version into the IMessageReader? There's no "context" concept, so we'd need to new up a new reader for each message?
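My current thinking: bake the negotiated version into the reader itself and new one up per in-flight request, keyed off the correlation id. Something like this sketch (assuming the IMessageReader<T> shape from Bedrock.Framework.Protocols; the response type and the version cut-offs here are illustrative, not the real a6k code):

    using System;
    using System.Buffers;
    using Bedrock.Framework.Protocols;

    public record FetchResponse(int CorrelationId, int ThrottleTimeMs);

    public class FetchResponseReader : IMessageReader<FetchResponse>
    {
        private readonly short _apiVersion;

        // One reader per request, constructed with the version we sent.
        public FetchResponseReader(short apiVersion) => _apiVersion = apiVersion;

        public bool TryParseMessage(in ReadOnlySequence<byte> input, ref SequencePosition consumed,
            ref SequencePosition examined, out FetchResponse message)
        {
            message = null;
            var reader = new SequenceReader<byte>(input);

            if (!reader.TryReadBigEndian(out int correlationId))
                return false;

            int throttleTimeMs = 0;
            if (_apiVersion >= 1 && !reader.TryReadBigEndian(out throttleTimeMs))
                return false; // field only present from later versions

            // ... topics/partitions/record sets, each guarded by its own version check ...

            message = new FetchResponse(correlationId, throttleTimeMs);
            consumed = reader.Position;
            examined = consumed;
            return true;
        }
    }

The connection code would then track correlationId -> (api key, version) for the requests in flight and pick the matching reader when each response header comes back.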

Also, it seems that librdkafka will use the version just before TAGGED_FIELDS are introduced.

NOTE: I don't think https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#supported-protocol-versions is completely up-to-date. eg I see it sending v11 FETCH requests when that page says it only support v4.

AndyPook commented on May 25, 2024

I think what I have deals with the wire protocol in an "ok" fashion.

The problem now is dealing with the higher-level coordination (e.g. Metadata and Consumer Groups), which isn't really a Bedrock thing. Kafka is "interesting" in that the clients must be pretty smart and do quite a lot of coordination among themselves while only having req/resp with the broker. The brokers do not take any responsibility; it is even up to one of the clients in the group to assign partitions to the rest of the members. I guess this does mean that clients can do "interesting" things without being constrained by broker opinions, but it does make writing good clients fairly tricky.

One thing I have found difficult is finding good, definitive docs for various things. It requires reading a lot of cwiki proposals where it's unclear whether they are current, plus tracking all the KIPs.
I've been trying to follow some of the other clients (librdkafka, scala, a few in golang). They all have slightly different interpretations.

But then, if it was easy, everyone would be doing it 😄

ChadJessup commented on May 25, 2024

Just an FYI, I'll be a bit slow to respond here due to vacation.
I also noticed the Kafka docs are hard to follow, and since the various clients only have partial implementations, it's doubly difficult to track down the right direction.

The Java client is pure Java, whereas the rest seem to be using librdkafka, which isn't as full featured as the Java client. :(

One thing I'm not sure on is whether Bedrock should contain an entire Kafka client, or just enough of the primitives and helper code to benefit other protocols. Perhaps @davidfowl has more thoughts on that aspect?

AndyPook commented on May 25, 2024

There are some decent-looking pure Go libraries out there. I've been looking at https://github.com/Shopify/sarama (because I think they actually use it), as much to learn a bit about golang as anything else.
And https://github.com/confluentinc/kafka has some useful bits under the "clients" folder. But they're missing await, so it gets a bit noisy with all the "listeners" and callbacks.

My assumption was that Bedrock was a bit of a playground for ideas that may (or may not) get pulled into System.* (or Microsoft.*) packages, with Experimental being some "demos" to see if the patterns work out.
That's sort of the reason I put my effort in a separate repo that just takes a package ref.

(maybe/probably making huge/inappropriate assumptions here)

ChadJessup commented on May 25, 2024

@AndyPook - I can't add you directly to the PR for some reason. I'd love to get your feedback on #68.

henningms commented on May 25, 2024

Has there been any more progress on the Kafka initiative here? 😁 Very interesting!
