Comments (12)
That function is listed in the "legacy consumer" part of librdkafka's code. Do you know if it works with the high level consumer in librdkafka?
Otherwise, you can always use Consumer::assign
passing the offsets for every partition.
from cppkafka.
Sorry, I'm not sure if that works in the high level consumer.
I did try using Consumer::assign
but it fails.
Here's what I did:
auto tp = TopicPartition("test", 0);
tp.set_offset(TopicPartition::OFFSET_END);
cout << tp.get_partition() << tp.get_offset() << endl; // this gives me 0 and -1
consumer.assign({tp});
auto tpl = consumer.get_assignment();
auto offset = consumer.get_offsets(tpl[0]);
count << get<0>(offset) << std<1>(offset) << endl; // this gives me -1001 and -1001
Questions:
- I need to get the latest position of a message in a given topic. How do I do this?
- I need to seek to a given position in a topic. How do I do this?
from cppkafka.
Can't you just subscribe to the topic and use auto.offset.reset
to latest
? Using assign with a specific offset should work, there's nothing that cppkafka
is doing besides forwarding that call to librdkafka
.
Also, what do you mean "it fails"?
from cppkafka.
I've used this on some application I wrote and it works as expected. Did you manage to make it work? Just assign and you'll start getting messages for that offset.
from cppkafka.
Closing as this should work as expected. If you couldn't make it work, please re-open.
from cppkafka.
Same issue.
cppkafka::TopicPartitionList assignments;
offset = TopicPartition::OFFSET_END;
assignments.push_back({topicName, partition, offset});
consumer->assign(assignments);
cout << consumer->get_offsets(consumer->c->get_assignment()[0]);
always gives an INVALID_OFFSET. And consumer doesn't consume anything.
from cppkafka.
If this were to be an issue, it would be on rdkafka's side. cppkafka is just a very thin layer on top of it.
Have you checked the documentation for that call in rdkafka's side? Quoting it:
If there is no cached offset (either low or high, or both) then RD_KAFKA_OFFSET_INVALID will be returned for the respective offset.
It is possible you're querying the offset too quickly and it's still unknown to the library.
And consumer doesn't consume anything.
Are you producing messages on that topic/partition? Your consumer will be pointing to the end of the log so you need to produce something after you subscribe for the consumer to receive something.
from cppkafka.
Of course I'm producing on this topic/partition. And bundled kafka console-consumer can read messages. My consumer can't
from cppkafka.
I've just tried this locally using rdkafka v0.11.4 and it works as expected. I do get INVALID_OFFSET after querying right after calling assign
but then I get messages and if I query for the offsets again after I get a message, the query does return the (high) offset correctly.
Are you using the latest version of rdkafka? Again, cppkafka is a super thin layer on top of rdkafka so odds are if there's actually an issue, it's on rdkafka's side and you should create a ticket on their repo.
from cppkafka.
Okay, thank you very much! I'll continue digging things
from cppkafka.
As far as I could understand, there is a recommendation to use function assign
instead of seek
.
But assign
is quite heavy because are new|delete in its realization.
Is it possible to implement seek
function?
from cppkafka.
Do you know if it works with the high level consumer in librdkafka?
from cppkafka.
Related Issues (20)
- Cannot pull using consumer HOT 1
- poll_batch does not trigger commit when auto.commit is true ? HOT 1
- How to remove topic using the API
- Consumer misbehavior with newer librdkafka versions HOT 1
- Can't build tests on Ubuntu 22.04 (Jammy)
- produer success but kafka shell consumer all "null" HOT 1
- How to use BufferedProducer to send binary data?
- Dll import function not allowed
- create BufferedProducer<std::string> producer globally
- Is there any way to build it without "boost"? HOT 1
- Please help me, I have tried a variety of methods to compile, all failed, including using the method provided by chatGPT 4.0. also failed. HOT 1
- How to set ttl?
- How to get number of messages in the topic?
- add_library(Rdkafka::rdkafka) issue
- CPPKafka crashes when trying to create a configuration in Windows
- What happens if Kafka was offline ? HOT 1
- Automate Compilation of "examples" in Build Process
- warning: object backing the pointer will be destroyed at the end of the full-expression
- cppkafka::produce not flushing
- clang complains about parameter used in std::atomic_flag constructor
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from cppkafka.