Comments (4)
from confluent-kafka-go.
Thanks Matt. Let me paint the scenario we're trying to solve, to see if you have any suggestions on how we might go about it.
Our goal is to ensure all messages are fully processed before committing our offset forward. At high volume you don't want to commit on every message, so we'd like to set a ticker that commits our highest processed offset every, say, 2 seconds. A potential problem with that approach is that there's no guarantee you've processed all the messages ahead of that offset unless you're tracking from your starting offset, which it doesn't seem we'll be able to get.
Let's say a new consumer starts up, it's configured for latest offsets, and the latest offset for that topic is 100. It starts getting messages for topic/partition Topic1Partition0 with offsets 101, 102, and so on, up to 110. We successfully process message 110 when the ticker goes off. We know 110 is our high offset, but we haven't gotten 101-109 yet, so we can't move our pointer ahead to 110, and we skip committing. The ticker goes off again 2 seconds later; by then we know we've done 101-110, so we can commit offset 110. But if we stop the consumer and restart it, the AssignedPartitions call doesn't tell us that we're now starting at 110.
A workaround off the top of my head: since partitions are ordered, we can use the first offset we receive on startup for each topic/partition as our starting value, and from there we know when we have a contiguous range of offsets that is safe to commit.
Unless you know of another approach that might work better? thanks!
librdkafka has a method rd_kafka_committed (and the very similar rd_kafka_position) which will tell you the committed offsets for a list of topic/partitions, but I can't see it exposed in the Go client.
What you suggest sounds correct - the first message received for each topic/partition gives you the start offset - I think you should be able to do what you want with this pretty easily. @edenhill may have some more input when his timezone allows ...
@jiminoc Are you doing async processing of messages? That's the only case where you would need the type of tracking you're talking about. Many (most?) Kafka applications are written to process messages one at a time, serially. Having to interact with a slow external system is usually where this assumption of serial processing breaks down.
If you are processing asynchronously, you'll need to track when it is safe to commit offsets, but usually you can do that just by tracking the smallest unacked message. You don't usually need to know the starting offset beforehand; you just poll() for the first message, and its offset will be the first offset you worry about.
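Tracking the smallest unacked message might look like this minimal sketch (assumed helper names; real code would feed it from the consumer's poll loop):

```go
package main

import "fmt"

// inflight tracks offsets handed to async workers for one partition.
// The safe commit point is the smallest offset still unacked.
// Sketch only; assumes offsets are polled in order.
type inflight struct {
	pending map[int64]bool // offsets handed out but not yet acked
	high    int64          // highest offset polled so far
}

func newInflight() *inflight {
	return &inflight{pending: map[int64]bool{}, high: -1}
}

func (f *inflight) polled(off int64) { f.pending[off] = true; f.high = off }
func (f *inflight) acked(off int64)  { delete(f.pending, off) }

// safeOffset returns the offset to commit: the smallest unacked
// offset, or one past the highest polled offset if nothing is pending.
func (f *inflight) safeOffset() int64 {
	if len(f.pending) == 0 {
		return f.high + 1
	}
	min := f.high + 1
	for off := range f.pending {
		if off < min {
			min = off
		}
	}
	return min
}

func main() {
	f := newInflight()
	for o := int64(101); o <= 105; o++ {
		f.polled(o)
	}
	f.acked(101)
	f.acked(103)
	fmt.Println(f.safeOffset()) // 102: the smallest unacked offset
	f.acked(102)
	f.acked(104)
	f.acked(105)
	fmt.Println(f.safeOffset()) // 106: nothing pending
}
```

The linear scan in safeOffset is fine for small in-flight windows; a min-heap would avoid it at larger scale.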
There's also a slightly different way to handle commits. Rather than committing safe offsets every 2 seconds, you can instead track unacked messages and, every 2 seconds, mark what is still outstanding, wait for it to flush, then commit the offset you were at when you initiated the process. This makes the accounting a little simpler (you just need a map to remove acked messages from, waiting until the map is empty to commit). However, to allow processing to continue while waiting for the outstanding data to flush (i.e. so you don't stall), you need two lists of outstanding messages: one for ongoing processing and one to track outstanding messages since the last commit process was initiated. This is how Kafka Connect manages commits for source connectors. It avoids having to also maintain a data structure that can tell you efficiently what the minimum safe offset to commit is.
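The two-list scheme above might be sketched like this in Go (assumed names, ignoring locking and the actual commit call):

```go
package main

import "fmt"

// epochCommitter sketches the Kafka Connect-style scheme: on each
// tick, the currently outstanding messages become a "flushing" set and
// the current position is noted; new messages accumulate in "ongoing"
// so processing never stalls. Once the flushing set drains, the noted
// offset is safe to commit.
type epochCommitter struct {
	ongoing      map[int64]bool // outstanding since the last tick
	flushing     map[int64]bool // outstanding at the last tick
	commitOffset int64          // position captured at the last tick
	high         int64          // highest offset polled
}

func newEpochCommitter() *epochCommitter {
	return &epochCommitter{
		ongoing:  map[int64]bool{},
		flushing: map[int64]bool{},
		high:     -1,
	}
}

func (c *epochCommitter) polled(off int64) { c.ongoing[off] = true; c.high = off }

// acked removes a processed offset from whichever set holds it.
func (c *epochCommitter) acked(off int64) {
	delete(c.ongoing, off)
	delete(c.flushing, off)
}

// tick starts a commit cycle: snapshot what is outstanding now and
// note the offset to commit once that snapshot drains.
func (c *epochCommitter) tick() {
	for off := range c.ongoing {
		c.flushing[off] = true
	}
	c.ongoing = map[int64]bool{}
	c.commitOffset = c.high + 1
}

// drained reports whether the snapshot has flushed, meaning
// commitOffset may now be committed.
func (c *epochCommitter) drained() bool { return len(c.flushing) == 0 }

func main() {
	c := newEpochCommitter()
	for o := int64(1); o <= 3; o++ {
		c.polled(o)
	}
	c.tick()    // snapshot {1,2,3}; commit 4 once they drain
	c.polled(4) // new work keeps flowing into ongoing
	c.acked(1)
	c.acked(2)
	fmt.Println(c.drained()) // false: 3 is still outstanding
	c.acked(3)
	fmt.Println(c.drained(), c.commitOffset) // true 4
}
```

A real implementation would skip a tick (or wait) if the previous snapshot hasn't drained yet, so two commit cycles never overlap.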