
Comments (12)

mfontanini commented on June 15, 2024

That function is listed in the "legacy consumer" part of librdkafka's code. Do you know if it works with the high level consumer in librdkafka?

Otherwise, you can always use Consumer::assign passing the offsets for every partition.

from cppkafka.

varunpatro commented on June 15, 2024

Sorry, I'm not sure if that works in the high level consumer.

I did try using Consumer::assign but it fails.

Here's what I did:

auto tp = TopicPartition("test", 0);
tp.set_offset(TopicPartition::OFFSET_END);

cout << tp.get_partition() << tp.get_offset() << endl; // this gives me 0 and -1

consumer.assign({tp});

auto tpl = consumer.get_assignment();
auto offset = consumer.get_offsets(tpl[0]);
cout << get<0>(offset) << get<1>(offset) << endl; // this gives me -1001 and -1001

Questions:

  1. I need to get the latest position of a message in a given topic. How do I do this?
  2. I need to seek to a given position in a topic. How do I do this?


mfontanini commented on June 15, 2024

Can't you just subscribe to the topic and set auto.offset.reset to latest? Using assign with a specific offset should work; cppkafka does nothing besides forwarding that call to librdkafka.

Also, what do you mean "it fails"?


mfontanini commented on June 15, 2024

I've used this in an application I wrote and it works as expected. Did you manage to make it work? Just assign and you'll start getting messages from that offset.


mfontanini commented on June 15, 2024

Closing as this should work as expected. If you couldn't make it work, please re-open.


EvilBeaver commented on June 15, 2024

Same issue.

cppkafka::TopicPartitionList assignments;
offset = TopicPartition::OFFSET_END;
assignments.push_back({topicName, partition, offset});
consumer->assign(assignments);

cout << consumer->get_offsets(consumer->c->get_assignment()[0]);

always gives an INVALID_OFFSET. And consumer doesn't consume anything.


mfontanini commented on June 15, 2024

If this were to be an issue, it would be on rdkafka's side. cppkafka is just a very thin layer on top of it.

Have you checked the documentation for that call on rdkafka's side? Quoting it:

> If there is no cached offset (either low or high, or both) then RD_KAFKA_OFFSET_INVALID will be returned for the respective offset.

It is possible you're querying the offset too quickly and it's still unknown to the library.
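For anyone decoding the numbers printed earlier in the thread: librdkafka reserves a few negative values as logical offsets, and -1001 is RD_KAFKA_OFFSET_INVALID. Below is a minimal sketch of a decoder. The name describe_offset is hypothetical, and the constants are copied by value from librdkafka's RD_KAFKA_OFFSET_* macros instead of being included, so the snippet compiles on its own.

```cpp
#include <cstdint>
#include <string>

// Values mirror librdkafka's RD_KAFKA_OFFSET_* macros. They are copied by
// value here so this sketch does not need rdkafka.h.
constexpr std::int64_t kOffsetBeginning = -2;    // RD_KAFKA_OFFSET_BEGINNING
constexpr std::int64_t kOffsetEnd       = -1;    // RD_KAFKA_OFFSET_END
constexpr std::int64_t kOffsetStored    = -1000; // RD_KAFKA_OFFSET_STORED
constexpr std::int64_t kOffsetInvalid   = -1001; // RD_KAFKA_OFFSET_INVALID

// Hypothetical helper (not part of cppkafka or librdkafka): turns a raw
// offset into a readable label for logging. Any other value is returned
// as its decimal representation.
std::string describe_offset(std::int64_t offset) {
    switch (offset) {
        case kOffsetBeginning: return "BEGINNING";
        case kOffsetEnd:       return "END";
        case kOffsetStored:    return "STORED";
        case kOffsetInvalid:   return "INVALID";
        default:               return std::to_string(offset);
    }
}
```

With this, the "-1001 and -1001" output above reads as "no cached low or high offset yet", not as an error code from assign.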

> And consumer doesn't consume anything.

Are you producing messages on that topic/partition? Your consumer will be pointing to the end of the log so you need to produce something after you subscribe for the consumer to receive something.


EvilBeaver commented on June 15, 2024

Of course I'm producing on this topic/partition. The bundled Kafka console consumer can read the messages, but my consumer can't.


mfontanini commented on June 15, 2024

I've just tried this locally using rdkafka v0.11.4 and it works as expected. I do get INVALID_OFFSET when querying right after calling assign, but then I get messages. If I query for the offsets again after receiving a message, the query returns the (high) offset correctly.
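The behaviour described here suggests retrying the query instead of trusting the first answer. The following is a rough sketch of that idea, not cppkafka API: wait_for_high_offset and OffsetTuple are hypothetical names, and the query callback is assumed to wrap something like consumer.get_offsets(partition), ideally with a consumer.poll() in between so a fetch has a chance to complete. It only waits for the high offset, since that is the one reported above as becoming available.

```cpp
#include <chrono>
#include <cstdint>
#include <functional>
#include <thread>
#include <tuple>

// (low, high) watermark pair, in the same order the thread's code reads it.
using OffsetTuple = std::tuple<std::int64_t, std::int64_t>;

// Same value as librdkafka's RD_KAFKA_OFFSET_INVALID, copied by value.
constexpr std::int64_t kInvalidOffset = -1001;

// Hypothetical helper: calls `query` up to `max_attempts` times, sleeping
// `delay` between attempts, until the high offset is no longer INVALID.
// Returns true and stores the result in `out` on success, false otherwise.
bool wait_for_high_offset(const std::function<OffsetTuple()>& query,
                          int max_attempts,
                          std::chrono::milliseconds delay,
                          OffsetTuple& out) {
    for (int attempt = 0; attempt < max_attempts; ++attempt) {
        const OffsetTuple offsets = query();
        if (std::get<1>(offsets) != kInvalidOffset) {
            out = offsets;
            return true;
        }
        // Do not sleep after the final failed attempt.
        if (attempt + 1 < max_attempts) {
            std::this_thread::sleep_for(delay);
        }
    }
    return false;
}
```

A false return after a reasonable number of attempts is a hint that nothing has been fetched for that partition at all, which is a different problem from querying too early.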

Are you using the latest version of rdkafka? Again, cppkafka is a super thin layer on top of rdkafka so odds are if there's actually an issue, it's on rdkafka's side and you should create a ticket on their repo.


EvilBeaver commented on June 15, 2024

Okay, thank you very much! I'll keep digging.


shurazs commented on June 15, 2024

As far as I understand, the recommendation is to use the assign function instead of seek.
But assign is quite heavy because its implementation performs new/delete allocations.
Would it be possible to implement a seek function?


filimonov commented on June 15, 2024

> Do you know if it works with the high level consumer in librdkafka?

confluentinc/librdkafka#1925
