mfontanini / cppkafka

Modern C++ Apache Kafka client library (wrapper for librdkafka)

License: BSD 2-Clause "Simplified" License

CMake 3.85% C++ 94.34% C 1.81%
apache-kafka kafka librdkafka rdkafka

cppkafka's People

Contributors

accelerated, amirshavit, arvidn, azat, bnaecker, brieucnx, demin80, dmpas, evilbeaver, farnazj, filimonov, ilejn, jlcordeiro, kraj, lesnyrumcajs, mfontanini, multiprogramm, neatlife, pavel-pimenov, proller, psigen, sachnk, shashank88, snar, spaceim, ych, zenonparker


cppkafka's Issues

release versioning

Hey --

I'm currently using a clone of master in my project from last week. But I noticed that there were some commits into master recently.

Is it possible to use GitHub's tag / release feature so that we can consistently download a snapshot of a particular version? Right now, I see there is only one release, from a while ago.

Thanks!

Question on rd_kafka_wait_destroyed

I notice that your library never calls the optional but highly recommended rd_kafka_wait_destroyed(). Would it be worth implementing either a static method which users could call directly to tear down the library, or, even better, an RAII object which calls rd_kafka_wait_destroyed() upon scope exit?
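For illustration, a minimal sketch of the RAII variant (the class name and the 5-second timeout are arbitrary assumptions, not part of cppkafka):

#include <librdkafka/rdkafka.h>

// Hypothetical RAII guard: create one instance at the top of main(); when it
// goes out of scope it waits for all librdkafka handles to be destroyed.
class RdKafkaShutdownGuard {
public:
    RdKafkaShutdownGuard() = default;
    RdKafkaShutdownGuard(const RdKafkaShutdownGuard&) = delete;
    RdKafkaShutdownGuard& operator=(const RdKafkaShutdownGuard&) = delete;
    ~RdKafkaShutdownGuard() {
        // Blocks until all handles are destroyed or the timeout (in ms) expires.
        rd_kafka_wait_destroyed(5000);
    }
};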

Windows Compiling

I ran into a lot of problems while compiling, so I want to recommend a way to compile easily on Windows.

  1. Install nuget
  2. Install librdkafka and boost with following links:
    https://www.nuget.org/packages/librdkafka.redist/
    https://www.nuget.org/packages/boost
  3. Compiling
    mkdir build
    cd build
    cmake .. -DBOOST_INCLUDEDIR="<path_to_nuget>\nuget-3.5.0\boost.1.64.0.0\lib\native\include"
    -DRDKAFKA_LIBRARY="<path_to_nuget>\nuget-3.5.0\librdkafka.redist.0.9.5\runtimes\win7-x64\native"
    -DRDKAFKA_INCLUDE_DIR="<path_to_nuget>\nuget-3.5.0\librdkafka.redist.0.9.5\build\native\include"
    cmake --build .

Examples binaries not being generated.

Hi there, I was checking out your wrapper and wanted to test it, but after I build it, the example binaries are not generated due to the EXCLUDE_FROM_ALL option.
Is there a reason for that, or am I running cmake incorrectly?
Thanks for your time!

Failed to find valid rdkafka version when trying to compile cppkafka

Hi all,
I'm trying to build cppkafka on an Ubuntu 16.04 amd64 VM with gcc 5.4.0, cmake 3.10.0 and Boost 1.63.0. I've built librdkafka version v0.11.3.
In a local folder I've created two subfolders, include and lib. The include folder contains rdkafka.h and the lib folder contains librdkafka.so.1.
When I try to build the cppkafka with the following command :
cmake .. -DRDKAFKA_ROOT_DIR=/home/user/local_folder -DRDKAFKA_LIBRARY=/home/user/local_folder/lib -DRDKAFKA_INCLUDE_DIR=/home/user/local_folder/include
I get the following printout:

-- The C compiler identification is GNU 5.4.0
-- The CXX compiler identification is GNU 5.4.0
-- Check for working C compiler: /usr/bin/cc
-- Check for working C compiler: /usr/bin/cc -- works
-- Detecting C compiler ABI info
-- Detecting C compiler ABI info - done
-- Detecting C compile features
-- Detecting C compile features - done
-- Check for working CXX compiler: /usr/bin/c++
-- Check for working CXX compiler: /usr/bin/c++ -- works
-- Detecting CXX compiler ABI info
-- Detecting CXX compiler ABI info - done
-- Detecting CXX compile features
-- Detecting CXX compile features - done
-- Build will generate a shared library. Use CPPKAFKA_BUILD_SHARED=0 to perform a static build
-- Boost version: 1.63.0
-- Found RDKAFKA: /home/user/local_folder/lib  
CMake Error at cmake/FindRdKafka.cmake:39 (message):
  Failed to find valid rdkafka version
Call Stack (most recent call first):
  CMakeLists.txt:40 (find_package)

What am I doing wrong here?

install incomplete

Hey --

The current master (5cad740aea9fe0be5cef01c9959d8cb759c07c91) doesn't seem to install properly. In particular, after doing make install, my application fails to compile due to:

/usr/local/include/cppkafka/consumer.h:41:10: fatal error: detail/callback_invoker.h: No such file or directory

On further inspection, it seems that the detail directory is not part of the CMake installation (include/cppkafka/CMakeLists.txt):

# Install headers including the auto-generated cppkafka.h
file(GLOB INCLUDE_FILES "*.h")
file(GLOB UTILS_INCLUDE_FILES "utils/*.h")
install(
    FILES ${INCLUDE_FILES}
    DESTINATION include/cppkafka
    COMPONENT Headers
)
install(
    FILES ${UTILS_INCLUDE_FILES}
    DESTINATION include/cppkafka/utils/
    COMPONENT Headers
)

This could be intentional: the detail directory may be meant for private headers used only by the implementation. If that's the case, then publicly consumed header files shouldn't include anything from detail. Alternatively, we could simply add the detail directory to the CMake installation recipe, as sketched below.
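If the second option is chosen, a minimal sketch of the extra rule, mirroring the existing ones (assuming the private headers all match detail/*.h):

file(GLOB DETAIL_INCLUDE_FILES "detail/*.h")
install(
    FILES ${DETAIL_INCLUDE_FILES}
    DESTINATION include/cppkafka/detail/
    COMPONENT Headers
)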

Run-time errors while running consumer function

Hey! I am trying to implement producer and consumer functions with cppkafka on Windows. The producer example code works fine, but I am having problems with the consumer code provided in the examples folder.
I set up the ZooKeeper and Kafka servers and ran a producer client in cmd.
Even when I implemented a simple program (in MSVS 2015) with only basic elements such as

Configuration config = {
{ "metadata.broker.list", "127.0.0.1:9092" }
};
Consumer consumer(config);
consumer.subscribe({ "test" });

I get a debug error when I run the executable (i.e., consumer.exe); it says abort() has been called. Any ideas? Could you provide a minimal consumer example like the producer one in the README.md?

Is it possible to create a topic with specific partition number

Hi, I know Kafka creates a topic when messages are produced to it, if the topic does not already exist. In this situation, Kafka creates the topic using default configs such as num.partitions.

I want to create my desired topic, if it does not exist, with my desired number of partitions, without changing the default configurations. Is this possible using cppkafka?

Common header file (suggestion)

It would be nice to have a single header file that includes everything one might need when using cppkafka, instead of including several header files. This common header should also include the util classes. It would be similar to rdkafka.h, for instance. It's always easier to include just one file and not have to worry about other dependencies.

suggestion: cppkafka/cppkafka.h

Thoughts?
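A sketch of what such an umbrella header might contain (the exact header list is an assumption based on the classes mentioned throughout these issues):

// cppkafka/cppkafka.h - hypothetical umbrella header
#ifndef CPPKAFKA_CPPKAFKA_H
#define CPPKAFKA_CPPKAFKA_H

#include <cppkafka/buffer.h>
#include <cppkafka/configuration.h>
#include <cppkafka/consumer.h>
#include <cppkafka/message.h>
#include <cppkafka/message_builder.h>
#include <cppkafka/producer.h>
#include <cppkafka/topic.h>
#include <cppkafka/utils/buffered_producer.h>

#endif // CPPKAFKA_CPPKAFKA_H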

Feature proposal for individual partition round-robin queue polling for consumers

Hi @mfontanini, in light of what I discovered about the behavior of the global queue forwarding (see comments here), there is definitely a need for round-robin polling across partitions instead of the default behavior, which is to consume one batch from partition A, followed by another batch from partition B, etc. This might be fine for some applications, but it isn't for my case, and it's quite detrimental if not handled properly. So the proposal is to support both consume scenarios in your library.

  1. Enabling the feature could be done by:
  • Providing a setter in the consumer class which selects the polling mechanism (the feature could be turned on/off at will). The setter would call rd_kafka_poll_set_consumer(NULL) if it's disabled.
  • Adding a new constructor with an additional parameter which would enable/disable the type of polling once per object. In this case we simply would not call rd_kafka_poll_set_consumer(handle).
  • Adding a default parameter to the existing constructor, which might break ABI (not desired).
  • Adding a cppkafka-specific configuration option for consumers which could be passed as part of the current constructor Consumer(Configuration config). This could be the least invasive.
  2. Adding a Queue wrapper class to manage the queue handle returned by rd_kafka_queue_get_partition. The destructor would call the appropriate destroy function in librdkafka. This could be implemented in a similar way to your Topic::HandlePtr unique ptr with a destructor template argument.

  3. Changing the polling method by:

  • keep existing Consumer::poll() function and call rd_kafka_consumer_poll() if partition polling is not enabled.

    If enabled do:

  • call rd_kafka_poll() to get general events.

  • for each topic partition in the topic partition list (a copy of the list is kept during the call to assign; see point 4), call rd_kafka_consume_queue(). Note that only one partition is consumed on each call to Consumer::poll(), since a round-robin counter is kept internal to the function.

  • return the message if any (or even better, poll until one valid message is returned or all partitions have been polled)

    3.a) Alternatively, keep poll() unchanged and add a poll_partitions() function; however, in this case an error must be thrown if the user enables partition polling and then calls poll().

  4. Upon rebalance assign, keep a local copy of the TopicPartitionsList. Upon revoke, clear this list. Perhaps instead of keeping a copy of the TopicPartitionsList, when assign is called we can at that point create the list of Queue objects belonging to each partition; that would be the simplest way. A sketch of the polling step follows.
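To make point 3 concrete, a rough sketch of the per-partition round-robin consume step using the raw librdkafka calls named above (error handling and the Queue wrapper from point 2 are omitted; the queue list is assumed to be maintained by the rebalance callback as per point 4):

#include <vector>
#include <librdkafka/rdkafka.h>

// Queues obtained via rd_kafka_queue_get_partition() on assignment (point 4).
std::vector<rd_kafka_queue_t*> partition_queues;
size_t next_partition = 0; // round-robin cursor

// One round-robin poll step, as described in point 3.
rd_kafka_message_t* poll_partitions(rd_kafka_t* consumer, int timeout_ms) {
    rd_kafka_poll(consumer, 0); // serve general events (callbacks, etc.)
    for (size_t i = 0; i < partition_queues.size(); ++i) {
        rd_kafka_queue_t* queue = partition_queues[next_partition];
        next_partition = (next_partition + 1) % partition_queues.size();
        if (rd_kafka_message_t* msg = rd_kafka_consume_queue(queue, timeout_ms)) {
            return msg; // caller owns the message
        }
    }
    return nullptr; // all partitions polled, nothing available
}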

include cppkafka

Hi,

I am new to working with these protocols and I would like to use librdkafka through your cppkafka client library. Unfortunately, I am not able to include your library correctly. I followed your instructions: creating a build folder, then running cmake and make on Ubuntu 16.04. The commands run successfully.
In the end, the build/include folder only contains libcppkafka.so and libcppkafka.so.0.1. The rdkafka.h file and the librdkafka library are not found as described in your readme file ("Note that finding librdkafka will succeed iff there's an include and lib directories inside the specified path, including both the rdkafka.h header and the librdkafka library file.").

Now I am wondering what I did wrong. Did I use the wrong directory for the build? Does the cppkafka repository have to be checked out inside the librdkafka repository? Or is anything else wrong/missing?
I am sorry for asking this (probably simple) question, but I hope you can help me anyway.

Thanks for your reply.

QUESTION: Development model

Hi! Thanks for the lib!

One question: if I want to use this lib as a submodule, where is the 'stable' branch or commit?
Right now there is only one tag, v0.1, and its release date is quite old (6.6.2017).
Is that the only stable point in the code, or is the master branch also stable enough?

How to deliver message with different topic in one routine

Hi,

I am writing a piece of code to send messages with different topics to a Kafka cluster.
But the program gets stuck in the producer.poll() call while sending the second topic's message...

The message sending function is as follows:

int kafkaProducer::produceMessage(u8_t* pBuffer, u32_t length, string &topic_name)
{
    MessageBuilder builder(topic_name);
    //builder.topic(topic_name);

    Configuration config = {
        { "metadata.broker.list", kafkaBrokers },
        { "queue.buffering.max.messages", MAX_BUFFERING_MESSAGES}
    };
    config.set_delivery_report_callback(deliveryReportCallback);
    Producer producer(config);

    Buffer buffer(pBuffer, length);
    builder.payload(buffer);
    //TODO: free pBuffer, // maybe in deliveryReportCallback ?

    producer.produce(builder);

    while (producer.get_out_queue_length() > 0) {
        producer.poll();
    }

    return 0;
}

Execution scenario is as follows:

    string topic1 = "topic1", topic2 = "topic2";
    produceMessage(..., topic1); // fine
    produceMessage(..., topic2); // Got stuck in producer.poll()

I tried using BufferedProducer instead, but got the same result...
And it works fine if I send multiple messages to the same topic.

Thanks,
Mike Xu
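For comparison, a minimal sketch that reuses a single Configuration and Producer across topics (broker address and topic names are placeholders, and that the per-call Producer construction is what causes the hang is an assumption on the editor's part):

#include <string>
#include <cppkafka/configuration.h>
#include <cppkafka/message_builder.h>
#include <cppkafka/producer.h>

using namespace cppkafka;

int main() {
    // One Configuration and one Producer, reused for every topic.
    Configuration config = {
        { "metadata.broker.list", "127.0.0.1:9092" }
    };
    Producer producer(config);

    const std::string payload_text = "hello";
    for (const std::string& topic : { std::string("topic1"), std::string("topic2") }) {
        MessageBuilder builder(topic); // one cheap builder per topic
        builder.payload(payload_text);
        producer.produce(builder);
    }
    // Wait for outstanding deliveries before destroying the producer.
    while (producer.get_out_queue_length() > 0) {
        producer.poll();
    }
}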

Question: How to detect (un)successful connection to a Kafka broker?

Hey!

I'm trying to use cppkafka and like it so far!

However, I can't seem to find a way to know what's going on with the connection itself.
I've set up a producer, but it seems to throw asynchronously once it determines that a connection to a broker can't be established. Can I register a callback, or use some other mechanism, to capture and handle such a situation myself instead of it crashing my binary?

Bonus question: is it possible to connect to a broker through Zookeeper's proxy rather than specify explicit broker list myself?
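One mechanism that might help here is the error callback on the Configuration (its signature appears in cppkafka's headers); a sketch, assuming the all-brokers-unreachable condition is what you want to observe:

#include <iostream>
#include <string>
#include <cppkafka/configuration.h>
#include <cppkafka/kafka_handle_base.h>

using namespace cppkafka;

Configuration make_config() {
    Configuration config = {
        { "metadata.broker.list", "127.0.0.1:9092" }
    };
    // Invoked asynchronously when librdkafka reports an error, e.g. when
    // brokers are unreachable; lets the application observe the failure.
    config.set_error_callback([](KafkaHandleBase& handle, int error,
                                 const std::string& reason) {
        std::cerr << "kafka error " << error << ": " << reason << std::endl;
    });
    return config;
}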

How to deal with the issue of local queue full?

Hi,

I'd like to send 1 million messages to the Kafka broker in one loop (the queue.buffering.max.messages parameter is configured to 1 million).

Pseudo code is as follows:

Buffer buffer(buf, length);  // buf has already been allocated.
builder.payload(buffer);
for(long i = 1; i <= 1000000; i++) {
    producer.produce(builder);
}
while (producer.get_out_queue_length() > 0) {
    producer.poll();
}

===

The above code works fine (1 million messages sent successfully) when the buffer length is 740 bytes, but it fails with the exception below (after delivering 370,000+ messages) when the buffer length is increased to 2855 bytes:

terminate called after throwing an instance of 'cppkafka::HandleException'
what(): Local: Queue full
./pump: line 1: 8797 Aborted

Do you have any suggestions on this issue? Do any other parameters need to be configured for the producer?
Or do I need to split the loop into several pieces, e.g. sending 100,000 messages at a time?

Thanks & best regards,
Mike
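One common pattern here (a suggestion, not an official cppkafka recipe) is to treat the queue-full condition as backpressure: catch the error, call poll() so delivery callbacks run and the queue drains, then retry:

#include <librdkafka/rdkafka.h>
#include <cppkafka/exceptions.h>
#include <cppkafka/message_builder.h>
#include <cppkafka/producer.h>

using namespace cppkafka;

// Produce a single message, retrying whenever the local queue is full.
void produce_with_backpressure(Producer& producer, const MessageBuilder& builder) {
    while (true) {
        try {
            producer.produce(builder);
            return;
        }
        catch (const HandleException& ex) {
            if (ex.get_error().get_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
                producer.poll(); // run delivery callbacks so the queue drains
                continue;
            }
            throw; // anything else is a real error
        }
    }
}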

Writing failed message on a file

Hey, I want to write a callback function that writes failed messages to a file. Is this possible using cppkafka, and if so, which method can I use? Is this a standard policy for handling undelivered messages?
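A sketch of how this might look with the delivery report callback (the file name and record format are placeholders; whether this counts as standard policy is a judgment call, and re-enqueueing with a retry limit is another common approach):

#include <fstream>
#include <cppkafka/configuration.h>
#include <cppkafka/message.h>
#include <cppkafka/producer.h>

using namespace cppkafka;

void install_failure_logger(Configuration& config) {
    // Invoked from poll()/flush() once the broker acks (or fails) a message.
    config.set_delivery_report_callback([](Producer& producer, const Message& msg) {
        if (msg.get_error()) { // delivery failed
            std::ofstream out("failed_messages.log", std::ios::app);
            out << msg.get_topic() << '\t'
                << msg.get_error().to_string() << '\t'
                << msg.get_payload() << '\n';
        }
    });
}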

Producer: 1 second delay?

I compiled examples/kafka_producer.cpp. Messages were produced to Kafka 1 second (plus a few milliseconds) after produce(builder) was called.
When produce() is called multiple times, all messages destined for one partition reach the brokers 1 second after the first message in that partition is produced.

To demonstrate the problem, I modified the source code slightly. It now produces 10 messages. Before each produce(builder), a time stamp is printed. After each produce(builder), there is a 50ms usleep().

Source code is uploaded to my repository.

The topic has 2 partitions.

Producer: (50ms intervals between messages)

[2017-07-31 17:22:22.941] 1
[2017-07-31 17:22:22.992] 2
[2017-07-31 17:22:23.042] 3
[2017-07-31 17:22:23.092] 4
[2017-07-31 17:22:23.142] 5
[2017-07-31 17:22:23.192] 6
[2017-07-31 17:22:23.242] 7
[2017-07-31 17:22:23.293] 8
[2017-07-31 17:22:23.343] 9
[2017-07-31 17:22:23.393] 10

Consumer: based on shopify/sarama in Go. When it is used with other producers, the delay is usually no more than a few milliseconds, so the timestamp is almost equal to the time the broker receives the message from the producer.

[2017-07-31 17:22:23.944] Partition: 1, Offset: 183 Key: [1] Payload: [1]
[2017-07-31 17:22:23.944] Partition: 1, Offset: 184 Key: [2] Payload: [2]
[2017-07-31 17:22:23.944] Partition: 1, Offset: 185 Key: [3] Payload: [3]
[2017-07-31 17:22:23.944] Partition: 1, Offset: 186 Key: [8] Payload: [8]
[2017-07-31 17:22:23.944] Partition: 1, Offset: 187 Key: [9] Payload: [9]
[2017-07-31 17:22:23.944] Partition: 1, Offset: 188 Key: [10] Payload: [10]
[2017-07-31 17:22:24.093] Partition: 0, Offset: 117 Key: [4] Payload: [4]
[2017-07-31 17:22:24.093] Partition: 0, Offset: 118 Key: [5] Payload: [5]
[2017-07-31 17:22:24.093] Partition: 0, Offset: 119 Key: [6] Payload: [6]
[2017-07-31 17:22:24.093] Partition: 0, Offset: 120 Key: [7] Payload: [7]

Ubuntu 14.04 LTS x64, Kafka 0.9.0.1, current and only release
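For what it's worth, librdkafka batches messages for up to queue.buffering.max.ms before sending, and in librdkafka of that era the default was 1000 ms (an assumption worth verifying against your version), which would match the observed 1-second delay. A sketch of lowering it:

#include <cppkafka/configuration.h>

using namespace cppkafka;

Configuration config = {
    { "metadata.broker.list", "127.0.0.1:9092" },
    // Send as soon as possible instead of batching for up to 1 second
    // (lower latency at the cost of batching efficiency).
    { "queue.buffering.max.ms", 0 }
};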

Question: Why does the program's performance drop when the Kafka server is not available?

Hello. Please help me understand this situation.

There is a function that receives messages in succession. Inside this function, two threads process each message simultaneously. The function terminates when both threads have finished.

The first thread saves the messages to a file at a fixed interval (say 3 seconds, after which I call flush). After that, a new file is created and everything repeats.

The second thread sends to the Kafka server.

When the server is available, all is well.
But as soon as the server is turned off, Kafka returns an error and the first thread experiences a very long delay.

I thought it was connected to the message buffer, so I played with the settings from https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

Config:
{ "enable.auto.commit", false }, { "queue.buffering.max.ms", 0 }, { "message.copy.max.bytes", 0 }, { "message.send.max.retries", 0 }

TopicConfig:
{ "auto.offset.reset", "smallest" }, { "request.required.acks", 0 }, { "message.timeout.ms", 200 }

But nothing good happened.

Is it possible to change the topic of a MessageBuilder object?

Consider the code below:

MessageBuilder builder(str); // str is a topic name.

Now I want to change the topic for builder:
builder(new_str); // new_str is another topic

But this fails to compile. Is there any way to set another topic for the builder?
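If the builder's topic can't be changed after construction, a safe fallback (a sketch; a topic setter may exist in newer versions, as hinted by the commented-out builder.topic(...) call in another issue above) is simply to construct one builder per topic:

#include <string>
#include <cppkafka/message_builder.h>
#include <cppkafka/producer.h>

using namespace cppkafka;

// MessageBuilder is cheap to construct, so one per topic works fine.
void produce_to(Producer& producer, const std::string& topic,
                const std::string& text) {
    MessageBuilder builder(topic);
    builder.payload(text);
    producer.produce(builder);
}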

problem in generating docs

Hi,

when I try to generate the docs in the build directory of cppkafka, it errors:

make: *** No rule to make target 'docs'. Stop.

I'm on Ubuntu 16.04 and doxygen is installed.

How can I fix that?

How to send struct to consumers?

I have a structure as shown below:

typedef unsigned char byte;

struct Payload {
    unsigned int width;
    unsigned int height;
    vector<byte> data;

    Payload(unsigned int width, unsigned int height, vector<byte> data) :
            width(width),
            height(height),
            data(std::move(data)) {}
};
//producer
vector<byte> data;
data.assign(frame.datastart, frame.dataend);
Payload payload(100, 100, data);
vector<byte> buffer(sizeof(payload));
memcpy(&buffer[0], &payload, sizeof(payload));
builder.payload(Buffer(buffer.data(), buffer.size()));
//consumer
const cppkafka::Buffer &buffer = msg.get_payload();

unsigned char data[buffer.get_size()];
for (int i = 0; i < buffer.get_size(); i++) {
     data[i] = buffer.get_data()[i]; 
}
Payload *payload = reinterpret_cast<Payload *>(data);
cout << payload->data.size() << endl; // 30000
cout << payload->data[0] << endl; // ERROR

And I aim to send it to consumers. First I serialize it to an array of bytes, then create a buffer and send it from the producer.

On the consumer side I deserialize the message and try to read the values from the payload's data field. Unfortunately I get an error, although I can read the width and height fields. Where am I going wrong?
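The likely culprit (an observation, not verified against your full code) is that vector<byte> stores its elements on the heap, so memcpy-ing the struct copies the vector's internal pointers, which are meaningless in the consumer's process. A sketch of explicit serialization that avoids this:

#include <cstddef>
#include <cstring>
#include <vector>

typedef unsigned char byte;

// Serialize width, height and the raw bytes into one contiguous buffer.
std::vector<byte> serialize(unsigned int width, unsigned int height,
                            const std::vector<byte>& data) {
    std::vector<byte> out(2 * sizeof(unsigned int) + data.size());
    std::memcpy(out.data(), &width, sizeof(width));
    std::memcpy(out.data() + sizeof(width), &height, sizeof(height));
    std::memcpy(out.data() + 2 * sizeof(unsigned int), data.data(), data.size());
    return out;
}

// Consumer side: read the fields back out in the same order.
void deserialize(const byte* buf, size_t size,
                 unsigned int& width, unsigned int& height,
                 std::vector<byte>& data) {
    std::memcpy(&width, buf, sizeof(width));
    std::memcpy(&height, buf + sizeof(width), sizeof(height));
    data.assign(buf + 2 * sizeof(unsigned int), buf + size);
}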

Unresolved external when linking into DLL on Windows

Successfully built both rdkafka and cppkafka. Both *.lib files are specified in the linker input.

LNK2019	unresolved external symbol __imp__rd_kafka_conf_new referenced in function 
"public: __thiscall cppkafka::Configuration::Configuration(void)" (??0Configuration@cppkafka@@QAE@XZ) 
d:\project-path\cppkafka.lib(configuration.obj)

and many similar errors

It seems that the linker can't find the librdkafka functions used in cppkafka.
Any ideas why this happens?


relative time as timestamp

The message timestamp is expressed as a relative time duration (std::chrono::milliseconds) here.

That seems like the wrong type; it really should be a time_point, right?

How can I set debug configuration property

I have a cppkafka producer program that I run with the command:
./connector -b kafka1:9092,kafka2:9092,kafka3:9092
Now I want to run it in debug mode with the properties metadata, topic and msg. How can I do that using cppkafka?
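librdkafka exposes this through the debug configuration property, a comma-separated list of debug contexts; a sketch, assuming the broker list stays as in your command line:

#include <cppkafka/configuration.h>

using namespace cppkafka;

Configuration config = {
    { "metadata.broker.list", "kafka1:9092,kafka2:9092,kafka3:9092" },
    // Enable verbose librdkafka logging for the chosen contexts.
    { "debug", "metadata,topic,msg" }
};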

Question about payload alignment and padding.

Hi,
Does cppkafka take care of data alignment and padding when pushing/pulling data for a topic? Suppose I have a struct (with several fields) that I want to inject into a message as the payload; what do I need to do in terms of data alignment and/or padding? Does your lib (or the underlying one) provide tools for that? Thanks.

Set up Travis CI builds

This is to make sure the library isn't broken. This should build the library, tests and examples and run all tests.

auto.offset.reset=latest not working

Trying to consume only the messages produced after the consumer starts. For testing, I added an explicit configuration for auto.offset.reset in examples/kafka_consumer_dispatcher.cpp before creating the consumer:

TopicConfiguration topic_config = {
    {"auto.offset.reset", "latest"}
};
config.set_default_topic_configuration(topic_config);

Start the producer and produce some messages:

./kafka_producer -b localhost -t mss -p 0

Next I start the consumer and receive all the messages (I expected to get none).

./kafka_consumer_dispatcher -b localhost -t mss -g 0

This issue is related to:

  1. #1. BTW solution with assign looks like works fine.
  2. confluentinc/librdkafka#607

Env:

  1. Ubuntu 18.04
  2. kafka_2.12-1.1.0
  3. librdkafka - last master (ID f26b9b2)
  4. cppkafka - last master (ID 429ec92)

Enhancement: internal counter in the message type

@mfontanini It would be nice to add a feature to supply the number of times a backoff committer should retry (not based on time), and the same for the buffered producer. I was thinking of adding a counter inside the message which would increase monotonically; the backoff committer would increment this value each time it fails. This won't work with the producer, however, since the delivery callback creates non-owning temporaries, so the counter would be reset each time. But at least the sync committer would work.
Alternatively the application could keep a map of message handles and counters, but that's a bit overkill. Any suggestions on adding such a feature?

Set up appveyor builds

This is to make sure the library isn't broken. This should build the library, tests and examples and run all tests.

Also package the built library in a zip for easier distribution.

Fix compact topic processor race condition

The compact topic processor test fails intermittently. It should be fixed, as it keeps giving false error alerts when checking in code.

/home/travis/build/mfontanini/cppkafka/tests/compacted_topic_processor_test.cpp:119: FAILED:
  CHECK( set_count == 2 )
with expansion:
  1 == 2

`undefined reference to *` error on building consumer example

Hi,
First I installed librdkafka successfully following the instructions. After that I installed cppkafka following the documentation, plus sudo make install at the end. So now both librdkafka and cppkafka are under /usr/local/lib and /usr/local/include. When trying to build the consumer example code, the IDE (both CLion and Eclipse) resolves the includes and shows hints for the methods, but the build fails with undefined reference errors for methods and classes. I checked /usr/local/include and it only contains the .h files!

Did I do any steps wrong?
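Undefined references at link time (while headers resolve fine) usually mean the libraries aren't being passed to the linker. A sketch of a manual link line, assuming the default install locations above and a single-file program (the file name is a placeholder; IDE projects need the equivalent libraries added to their linker settings):

g++ -std=c++11 consumer_example.cpp -o consumer_example -L/usr/local/lib -lcppkafka -lrdkafka -lpthread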

How to send binary data by cppkafka library?

Hi,

Thank you for your great work on this C++ wrapper for librdkafka.

I can successfully send text messages to the Kafka broker with the examples/kafka_producer.cpp program, but nothing is received on the consumer side when I try to send binary data as follows (code modified from kafka_producer.cpp):

///////////////////////////////////////
    // Create the producer
    Producer producer(config);

#if 1  // Binary payload seems failed.
    cout << "Sending CDR to kafka broker..." << endl;
    unsigned char* buf = NULL;
    int length = 0;

    loadCdrContent(buf, length);
    Buffer buffer(buf, length);
    builder.payload(buffer);

    producer.produce(builder);
    // TODO: free memory
#else // String payload can be sent successfully
    cout << "Producing messages into topic " << topic_name << endl;
    // Now read lines and write them into kafka
    string line;
    while (getline(cin, line)) {
        // Set the payload on this builder
        builder.payload(line);

        // Actually produce the message we've built
        producer.produce(builder);
    }
#endif

///////////////////////////////////////
int loadCdrContent(unsigned char* &buffer, int &length) {
    std::ifstream is ("./CDR.ber", std::ifstream::binary);
    is.seekg (0, is.end);
    length = is.tellg();
    is.seekg (0, is.beg);

    buffer = new unsigned char [length];
    is.read((char *)buffer, length);

    cout << "Buffer length is " << length << endl;

    is.close();

    return 0;
}

Do you have any suggestions on this issue? Is the above code for sending a binary payload correct?
Btw, buffer.get_data() and buffer.get_size() return correct values.

Best regards,
Mike

Enhancement: Configuration class changes

I would like to propose the following refactoring, which I believe will make the code more symmetric from a producer/consumer perspective. However, I would like the author's opinion before submitting the code:

  1. Add Assignment/Revocation/RebalanceError callbacks to the Configuration class.
  2. Deprecate the getter/setter methods for these callbacks in the Consumer class.
  3. For backwards compatibility the Consumer class will still retain its methods (and private callback members) which will now be set at construction time if provided in the Configuration object. (NOTE: The new functionality could be also be enabled at compile time via a define flag - by default this flag will not be present which will essentially default to the current deprecated code and a compile time warning could be printed. This would allow for cleaner code).
  4. Change all the Configuration callback setters to two methods each having the following signature:
Configuration::set_xxx_some_callback(const SomeCallback& cb);
Configuration::set_xxx_some_callback(SomeCallback&& cb);

or with a single method which can be used interchangeably:

template <typename CB = SomeCallback>
Configuration& set_xxx_some_callback(CB&& cb) { some_callback_ = std::forward<CB>(cb); return *this; }

This is more in line with modern move semantics. The current code of pass-by-copy-then-move is the pre c++11 way of simulating move semantics.

The reasoning behind the refactoring is that it's confusing as to why the Consumer supports certain callback methods and the Producer doesn't. Furthermore, the Configuration object already has a producer-specific callback (e.g. delivery report) and a consumer-specific callback (e.g. offset commit), so conceptually there's no reason not to support the other three Consumer callbacks there as well, so that everything is grouped together.
Alternatively, an even cleaner solution (but requiring more changes) would be to have a ConsumerConfiguration and a ProducerConfiguration, both deriving from ConfigurationBase, and pass these to the Consumer/Producer constructors respectively. These specialized configuration objects would ONLY allow setting the options that apply to them.

Terminate called after throwing an instance

hiii,

I am having an error when trying to publish a message to a server using cppkafka (with the simple example code you provided) on Ubuntu. But I don't see this problem when publishing to a topic on my own computer. Do you have any suggestions?

The problem is :
terminate called after throwing an instance of 'cppkafka::HandleException' what(): Local: Timed out

thank you

cppkafka::Consumer Destructor Can Throw

Love the project! I've been using librdkafka for a while now in C/C++ projects requiring native access to Kafka, but it's always felt fairly unwieldy in the typical case.
I wasn't sure if this was an intentional design choice or not, but cppkafka::Consumer::~Consumer calls close (without a try-catch), which calls check_error, which throws upon error conditions. Destructors are implicitly marked noexcept starting in C++11 unless explicitly marked otherwise (derived classes inherit this specification), but regardless, destructors that can throw strike me as a bad idea.
My personal suggestion would be to leave the current close implementation untouched, expose it in the public API (to allow explicit client handling of handle-destruction error conditions if so desired), make the close call in the Consumer destructor conditional, and swallow all exceptions from close in the destructor (I don't know if cppkafka currently has any logging strategy, but this would probably be a place to put a WARN log). Admittedly this isn't as nice from an encapsulation perspective, and will require additional checks throughout the rest of the public interface, but currently a failed call to rd_kafka_consumer_close during destruction will result in a call to std::terminate without the chance for the client to intervene.
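A sketch of what that suggestion could look like inside the library (closed_ is a hypothetical flag set by the proposed public close(); the logging call is a placeholder; close() and check_error are the names used above):

void Consumer::close() { // proposed public API: may throw
    rd_kafka_resp_err_t error = rd_kafka_consumer_close(get_handle());
    check_error(error);
    closed_ = true;
}

Consumer::~Consumer() {
    try {
        if (!closed_) {
            close();
        }
    }
    catch (const std::exception& ex) {
        // WARN-level log placeholder; never let the exception escape,
        // since a throwing destructor would call std::terminate.
    }
}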

How to produce data asynchrony

Hey,

In the producer example, messages are sent to the brokers using producer.produce(builder);.
As you know, Kafka offers three strategies for sending data: fire-and-forget, synchronous, and asynchronous. Using cppkafka, is it possible to produce messages in asynchronous mode?
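For context: produce() itself is asynchronous in librdkafka; it only enqueues the message, and delivery happens in the background, reported through the delivery report callback. A sketch of the three styles using only calls shown elsewhere in these issues:

#include <cppkafka/message_builder.h>
#include <cppkafka/producer.h>

using namespace cppkafka;

void examples(Producer& producer, const MessageBuilder& builder) {
    // Fire and forget: enqueue and never wait.
    producer.produce(builder);

    // Synchronous-style: enqueue, then block until the queue drains.
    producer.produce(builder);
    while (producer.get_out_queue_length() > 0) {
        producer.poll();
    }

    // Asynchronous: enqueue and poll periodically from your event loop;
    // completion is observed via the delivery report callback set on the
    // Configuration (see set_delivery_report_callback above).
    producer.produce(builder);
    producer.poll();
}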

sending char[] data type as a message

Hi,

How can I send messages of type char[]?

Are messages required to be in string format?

I tried creating messages of char[] type, but I got an error:

char buf[2000];

builder.payload(buf);

so it seems I have to convert to a string first:

char buf[2000];

string temp = buf;

builder.payload(temp);
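Messages don't have to be strings: cppkafka's Buffer can wrap raw bytes directly. A minimal sketch (producer and topic setup omitted; Buffer does not own the memory, so the array must stay alive until produce() copies the payload, which is the default payload policy as far as I can tell):

#include <string>
#include <cppkafka/buffer.h>
#include <cppkafka/message_builder.h>
#include <cppkafka/producer.h>

using namespace cppkafka;

void send_raw(Producer& producer, const std::string& topic) {
    char buf[2000] = {};
    // ... fill buf with arbitrary bytes ...

    // Wrap the raw bytes with an explicit length: no string conversion,
    // and embedded '\0' bytes are preserved.
    MessageBuilder builder(topic);
    builder.payload(Buffer(reinterpret_cast<const unsigned char*>(buf), sizeof(buf)));
    producer.produce(builder);
}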

Smart message deleters with embedded commit (proposal)

I think it would be an interesting concept to support smart deleter functions when deleting an rdkafka message handle. Currently the deleters are either null (non-owning) or call rd_kafka_message_destroy. In the case of manual commits, the user has to poll for the message, do something with it, and then call commit(message) if it's valid. Since the Message class wraps a unique_ptr, it would be nice to leverage its deleter to also do a commit or async_commit of the message (think RAII auto-commit in case it goes out of scope). This would eliminate calling commit at every exit point.
For this, the message should also capture the consumer handle at construction. Perhaps a setting in the consumer could indicate whether we want to poll_with_smart_commit or poll_with_smart_commit_async, which would capture the handle and set a different deleter in the message object. The deleter would be something like:

void deleter_with_commit(rd_kafka_message_t* handle) {
    // get_consumer_handle() is a placeholder for however the consumer
    // handle ends up being captured.
    if (handle &&
        handle->err == RD_KAFKA_RESP_ERR_NO_ERROR &&
        !std::uncaught_exception()) {
        rd_kafka_commit_message(get_consumer_handle(), handle, false);
    }
    rd_kafka_message_destroy(handle);
}

alternatively the Message::~Message can be enhanced to call commit before the unique_ptr is destroyed.

sending unsigned char[] to kafka

I want to write unsigned char[] data to Kafka. The original message is a char[], and when I print it, it contains some extra characters. For example:
char buf[2000];
for (int i = 0; i < 2000; i++) { printf(" "); printf("%02x", buf[i]); }
prints:
EF BF BD 01 00 1C 1E 39 5A 18 40 EF BF BD 00 38 51 EF BF BD 00 00 EF BF BD EF BF BD 00 48 00 EF BF BD 00 00 00 64 EF BF BD 1F,
with extra EF BF BD patterns that are not in the original data. To avoid this, I tried:
for (int i = 0; i < 2000; i++) { printf(" "); printf("%02x", (unsigned char)buf[i]); }, and then the message prints in the correct format.

When I attempted to send buf to Kafka, I tried

builder.payload({(unsigned char*)buf, 2000});

But it is written to Kafka in the wrong format, with the extra EF BF BD patterns included in the message!

How can I send data to Kafka so that it is saved in the original message format?

Error building cppkafka on Windows

While compiling with the following command I get the errors below. How should I solve this issue?

cmake from Visual Studio Developer Tools 15 2017.

Commands used:

cmake .. -DBOOST_INCLUDEDIR="E:\kakfkalib\boost.1.65.1.0\lib\native\include" -DRDKAFKA_LIBRARY="E:\kakfkalib\librdkafka.redist.0.11.0\runtimes\win7-x64\native" -DRDKAFKA_INCLUDE_DIR="E:\kakfkalib\librdkafka.redist.0.11.0\build\native\include"
msbuild.exe ALL_BUILD.vcxproj

Errors:

E:\kakfkalib\cppkafka\build\ALL_BUILD.vcxproj" (default target) (1) ->
"E:\kakfkalib\cppkafka\build\src\cppkafka.vcxproj" (default target) (3) ->
(Link target) ->
backoff_committer.obj : error LNK2001: unresolved external symbol __imp__rd_kafka_message_timestamp [E:\kakfkalib\cppkafka\build\src\cppkafka.vcxproj]
configuration.obj : error LNK2001: unresolved external symbol __imp__rd_kafka_message_timestamp [E:\kakfkalib\cppkafka\build\src\cppkafka.vcxproj]
message.obj : error LNK2001: unresolved external symbol __imp__rd_kafka_message_timestamp [E:\kakfkalib\cppkafka\build\src\cppkafka.vcxproj]
consumer.obj : error LNK2001: unresolved external symbol __imp__rd_kafka_message_timestamp [E:\kakfkalib\cppkafka\build\src\cppkafka.vcxproj]
backoff_performer.obj : error LNK2001: unresolved external symbol __imp__rd_kafka_message_timestamp [E:\kakfkalib\cppkafka\build\src\cppkafka.vcxproj]
configuration.obj : error LNK2019: unresolved external symbol __imp__rd_kafka_conf_new referenced in function "public: __thiscall cppkafka::Configuration::Configuration(void)" (??0Configuration@cppkafka@@QAE@XZ) [E:\kakfkalib\cppkafka\build\src\cppkafka.vcxproj]
configuration.obj : error LNK2019: unresolved external symbol __imp__rd_kafka_conf_destroy referenced in function "private: static class cppkafka::ClonablePtr<struct rd_kafka_conf_s,void (__cdecl*)(struct rd_kafka_conf_s*),struct rd_kafka_conf_s* (__cdecl*)(struct rd_kafka_conf_s const*)> __cdecl cppkafka::Configuration::make_handle(struct rd_kafka_conf_s*)" [E:\kakfkalib\cppkafka\build\src\cppkafka.vcxproj]
configuration.obj : error LNK2019: unresolved external symbol __imp__rd_kafka_conf_dup referenced in function "private: static class cppkafka::ClonablePtr<struct rd_kafka_conf_s,void (__cdecl*)(struct rd_kafka_conf_s*),struct rd_kafka_conf_s* (__cdecl*)(struct rd_kafka_conf_s const*)> __cdecl cppkafka::Configuration::make_handle(struct rd_kafka_conf_s*)" [E:\kakfkalib\cppkafka\build\src\cppkafka.vcxproj]
producer.obj : error LNK2001: unresolved external symbol __imp__rd_kafka_conf_dup [E:\kakfkalib\cppkafka\build\src\cppkafka.vcxproj]
consumer.obj : error LNK2001: unresolved external symbol __imp__rd_kafka_conf_dup [E:\kakfkalib\cppkafka\build\src\cppkafka.vcxproj]
configuration.obj : error LNK2019: unresolved external symbol __imp__rd_kafka_conf_set referenced in function "public: class cppkafka::Configuration & __thiscall cppkafka::Configuration::set(class std::basic_string<char,struct std::char_traits<char>,class std::allocator<char> > const &,class std::basic_string<char,struct std::char_traits<char>,class std::allocator<char> > const &)" [E:\kakfkalib\cppkafka\build\src\cppkafka.vcxproj]
configuration.obj : error LNK2019: unresolved external symbol __imp__rd_kafka_conf_set_dr_msg_cb referenced in function "public: class cppkafka::Configuration & __thiscall cppkafka::Configuration::set_delivery_report_callback(class std::function<void __cdecl(class cppkafka::Producer &,class cppkafka::Message const &)>)" [E:\kakfkalib\cppkafka\build\src\cppkafka.vcxproj]
configuration.obj : error LNK2019: unresolved external symbol __imp__rd_kafka_conf_set_offset_commit_cb referenced in function "public: class cppkafka::Configuration & __thiscall cppkafka::Configuration::set_offset_commit_callback(class std::function<void __cdecl(class cppkafka::Consumer &,class cppkafka::Error,class std::vector<class cppkafka::TopicPartition,class std::allocator<class cppkafka::TopicPartition> > const &)>)" [E:\kakfkalib\cppkafka\build\src\cppkafka.vcxproj]
configuration.obj : error LNK2019: unresolved external symbol __imp__rd_kafka_conf_set_error_cb referenced in function "public: class cppkafka::Configuration & __thiscall cppkafka::Configuration::set_error_callback(class std::function<void __cdecl(class cppkafka::KafkaHandleBase &,int,class std::basic_string<char,struct std::char_traits<char>,class std::allocator<char> > const &)>)" [E:\kakfkalib\cppkafka\build\src\cppkafka.vcxproj]
configuration.obj : error LNK2019: unresolved external symbol __imp__rd_kafka_conf_set_throttle_cb referenced in func

Error linking into DLL on Windows

Got linking error on VS 2017.

  • Project is Win32 DLL generated from CMAKE
  • both librdkafka and cppkafka are built with cmake and installed on D:\lib\rdkafka and D:\lib\cppkafka respectively
  • both librdkafka.lib and cppkafka.lib are linked in Win32 DLL in project options. Here's linker additional dependencies:
  D:\lib\boost\lib\libboost_iostreams-vc141-mt-x32-1_67.lib
  D:\lib\boost\lib\libboost_zlib-vc141-mt-x32-1_67.lib
  ws2_32.lib
  D:\lib\librdkafka\lib\rdkafka.lib
  D:\lib\cppkafka\lib\cppkafka.lib
  • The linker says it can't find anything that cppkafka references in librdkafka:

cppkafka.lib(configuration.obj) > unresolved external symbol __imp__rd_kafka_conf_new referenced in function "public: __thiscall cppkafka::Configuration::Configuration(void)" :
cppkafka.lib(configuration.obj) > unresolved external symbol __imp__rd_kafka_conf_destroy referenced in function "private: __thiscall cppkafka::Configuration::Configuration(struct rd_kafka_conf_s *)"

Bold text marks the obj file that references librdkafka.lib. It seems the linker can't find any of librdkafka.lib's symbols. Any ideas why? librdkafka.lib is present in the linker inputs...

What is the proper way to link static cppkafka.lib (along with librdkafka.lib) into another app?
