
Comments (4)

mhowlett commented on August 21, 2024

from confluent-kafka-go.

jiminoc commented on August 21, 2024

Thanks Matt, let me paint the scenario we're trying to solve and see if you have any suggestions on how we might go about it.

Our goal is to ensure all messages are fully processed before committing our offset forward. At high volume you don't want to commit on every message, so we'd like to set a ticker so that, say, every 2 seconds we commit our highest processed offset. A potential problem with that approach is that there's no guarantee you've processed all the messages before that offset unless you're tracking from your starting offset, which it doesn't seem we'll be able to get.

Let's say a new consumer starts up, configured for latest offsets, and the latest offset for that topic is 100. It starts getting messages for topic/partition Topic1Partition0 with offsets 101, 102, and so on up to 110. We successfully process message 110 when the ticker goes off. We know 110 is our high offset, but we haven't finished 101-109, so we can't move our pointer ahead to 110 yet and we skip the commit. The ticker goes off again 2 seconds later; by then we've done 101-110, so we can commit offset 110. Now, if we stop the consumer and restart it, the AssignedPartitions call doesn't tell us that we're starting at 110.

A workaround off the top of my head: since partitions are ordered, we can use the first offset we receive on startup for each topic/partition as our starting value, so we know when we have a contiguous range of offsets that is safe to commit.
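That workaround can be sketched in plain Go. This is a minimal illustration, not the confluent-kafka-go API: the `offsetTracker` type and its methods are hypothetical names, and a real consumer would keep one tracker per topic/partition and feed `markProcessed` from its message handlers.

```go
package main

import "fmt"

// offsetTracker records processed offsets for one partition, seeded with the
// first offset received on startup, and reports the highest offset up to
// which processing is contiguous.
type offsetTracker struct {
	firstSeen int64          // first offset received for this partition
	next      int64          // lowest offset not yet known to be processed
	pending   map[int64]bool // offsets processed out of order, ahead of next
}

func newOffsetTracker(firstOffset int64) *offsetTracker {
	return &offsetTracker{firstSeen: firstOffset, next: firstOffset, pending: map[int64]bool{}}
}

func (t *offsetTracker) markProcessed(offset int64) {
	t.pending[offset] = true
	// Advance while the processed range stays contiguous.
	for t.pending[t.next] {
		delete(t.pending, t.next)
		t.next++
	}
}

// safeToCommit reports the highest contiguously processed offset, and false
// if nothing is committable yet. (A real commit call conventionally stores
// offset+1, the next offset to consume.)
func (t *offsetTracker) safeToCommit() (int64, bool) {
	if t.next == t.firstSeen {
		return 0, false
	}
	return t.next - 1, true
}

func main() {
	t := newOffsetTracker(101)
	t.markProcessed(110) // finished out of order: nothing committable yet
	if _, ok := t.safeToCommit(); !ok {
		fmt.Println("tick 1: skip commit")
	}
	for o := int64(101); o <= 109; o++ {
		t.markProcessed(o)
	}
	if off, ok := t.safeToCommit(); ok {
		fmt.Println("tick 2: commit", off) // prints "tick 2: commit 110"
	}
}
```

The `main` function replays the scenario above: 110 finishes first, the first tick skips the commit, and once 101-109 are done the second tick commits 110.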

Unless you know of another approach that might work better? thanks!


mhowlett commented on August 21, 2024

librdkafka has a method rd_kafka_committed (and a very similar one, rd_kafka_position) which will tell you the committed offsets for a list of topic/partitions, but I can't see that exposed in the go client.

What you suggest sounds correct - the first message received for each topic/partition will give you the start offset - I think you should be able to do what you want with this pretty easily. @edenhill may have some more input when his timezone allows ...


ewencp commented on August 21, 2024

@jiminoc Are you doing async processing of messages? That's the only case you would need to do the type of tracking you're talking about. Many (most?) Kafka applications are written to process messages one at a time, serially. Having to interact with a slow, external system is usually where this assumption of serial processing can break down.

If you are processing asynchronously, you'll need to track when it is safe to commit offsets, but usually you can do that just by tracking the smallest unacked message. You don't usually need to know the starting offset beforehand; you just poll() for the first message, and its offset is the first offset you need to worry about.
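A minimal Go sketch of the smallest-unacked approach (the `unackedSet` type and method names are illustrative, not part of any Kafka client API): track delivered-but-unacked offsets in a set, and treat the offset just below the smallest unacked one as the safe commit point.

```go
package main

import "fmt"

// unackedSet tracks offsets that have been delivered but not yet acked.
type unackedSet struct {
	pending map[int64]struct{} // delivered, processing not finished
	highest int64              // highest offset delivered so far
	started bool               // whether any message has been delivered
}

func newUnackedSet() *unackedSet {
	return &unackedSet{pending: map[int64]struct{}{}}
}

func (u *unackedSet) deliver(offset int64) {
	u.pending[offset] = struct{}{}
	if !u.started || offset > u.highest {
		u.highest = offset
	}
	u.started = true
}

func (u *unackedSet) ack(offset int64) { delete(u.pending, offset) }

// safeOffset returns the highest offset below every unacked message. The
// linear scan keeps the sketch simple; a min-heap over pending offsets
// would answer this more efficiently at high volume.
func (u *unackedSet) safeOffset() (int64, bool) {
	if !u.started {
		return 0, false
	}
	if len(u.pending) == 0 {
		return u.highest, true
	}
	lowest := int64(-1)
	for off := range u.pending {
		if lowest == -1 || off < lowest {
			lowest = off
		}
	}
	return lowest - 1, true
}

func main() {
	u := newUnackedSet()
	u.deliver(101)
	u.deliver(102)
	u.deliver(103)
	u.ack(102) // 102 and 103 finish before 101
	u.ack(103)
	off, _ := u.safeOffset()
	fmt.Println("safe:", off) // prints "safe: 100" (101 still in flight)
	u.ack(101)
	off, _ = u.safeOffset()
	fmt.Println("safe:", off) // prints "safe: 103"
}
```

The caller only commits when `safeOffset` has advanced past the last committed offset, so the first poll()'d message naturally seeds the range, as described above.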

There's also a slightly different way to handle commits. Rather than committing safe offsets exactly every 2 seconds, you can instead track unacked messages, and every 2 seconds mark what is still outstanding, wait for it to flush, then commit the offset you were at when you initiated the process. This makes the accounting a little bit simpler (you just need a map to remove acked messages from, waiting until the map is empty to commit). However, to allow processing to continue while waiting for the outstanding data to flush (i.e. so you don't stall), you do need two lists of outstanding messages: one for ongoing processing and one to track the messages that were outstanding when the last commit process was initiated. This is how Kafka Connect manages commits for source connectors. It avoids having to maintain a data structure that can efficiently tell you the minimum safe offset to commit.
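The two-list scheme can be sketched like this (illustrative types only, and locking is omitted; a real consumer would guard these maps with a sync.Mutex since acks arrive from worker goroutines):

```go
package main

import "fmt"

// commitBarrier implements the two-list commit scheme: when a tick fires,
// the current outstanding set is frozen and drained while new messages
// accumulate in a fresh map, so processing never stalls.
type commitBarrier struct {
	outstanding map[int64]struct{} // in flight since the current flush began
	flushing    map[int64]struct{} // frozen snapshot being drained
	commitAt    int64              // offset captured when the flush started
	flushActive bool
}

func newCommitBarrier() *commitBarrier {
	return &commitBarrier{
		outstanding: map[int64]struct{}{},
		flushing:    map[int64]struct{}{},
	}
}

func (b *commitBarrier) dispatch(offset int64) { b.outstanding[offset] = struct{}{} }

// ack removes a finished message from whichever list currently holds it.
func (b *commitBarrier) ack(offset int64) {
	if _, ok := b.flushing[offset]; ok {
		delete(b.flushing, offset)
		return
	}
	delete(b.outstanding, offset)
}

// beginFlush freezes the outstanding set and records the position reached
// when the tick fired; it refuses to start if a flush is still draining.
func (b *commitBarrier) beginFlush(offsetNow int64) bool {
	if b.flushActive {
		return false
	}
	b.flushing, b.outstanding = b.outstanding, map[int64]struct{}{}
	b.commitAt = offsetNow
	b.flushActive = true
	return true
}

// tryCommit returns the offset to commit once the frozen set has drained.
func (b *commitBarrier) tryCommit() (int64, bool) {
	if !b.flushActive || len(b.flushing) > 0 {
		return 0, false
	}
	b.flushActive = false
	return b.commitAt, true
}

func main() {
	b := newCommitBarrier()
	b.dispatch(101)
	b.dispatch(102)
	b.dispatch(103)
	b.ack(101)
	b.beginFlush(103) // tick: freeze {102, 103}, commit at 103 once drained
	b.dispatch(104)   // processing continues into the fresh map
	b.ack(102)
	b.ack(103)
	if off, ok := b.tryCommit(); ok {
		fmt.Println("commit", off) // prints "commit 103"
	}
}
```

Note the accounting never asks "what is the minimum safe offset?": it only waits for the frozen map to empty, which is the simplification the comment above attributes to this scheme.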

