
Comments (18)

Manicben commented on August 21, 2024

We would love to have this available in the producer. We're moving from Sarama to confluent-kafka-go, but we still need to support Sarama's default partitioner, which uses the 32-bit FNV-1a hashing algorithm (hash/fnv in Go).

For now we will follow @edenhill's advice (i.e. get the topic metadata, run a custom Go partitioner before Produce() and set Message.TopicPartition.Partition), but it would still be nice to have custom partitioners supported in some form in the Producer API (either Go or C). That said, I may just try my hand at adding the FNV-1a algorithm to librdkafka...
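A minimal Go sketch of the "run your own partitioner before Produce()" step for Sarama compatibility, assuming Sarama's default hash partitioner behaves as we recall: 32-bit FNV-1a over the key bytes, the sum cast to int32, taken modulo the partition count, and negated if negative. The helper name `saramaHashPartition` is hypothetical. Nil keys are left to the caller (Sarama spreads them randomly, so `kafka.PartitionAny` is the closest equivalent). Check the result against the Sarama version your existing producers use before relying on it.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// saramaHashPartition is a hypothetical helper approximating Sarama's default
// hash partitioner for a non-nil key: FNV-1a (32-bit) over the key bytes,
// converted to int32, modulo numPartitions, with a negative result flipped.
func saramaHashPartition(key []byte, numPartitions int32) int32 {
	h := fnv.New32a()
	h.Write(key) // hash.Hash.Write never returns an error

	// Go's % truncates toward zero, so p is in (-numPartitions, numPartitions).
	p := int32(h.Sum32()) % numPartitions
	if p < 0 {
		p = -p
	}
	return p
}

func main() {
	for _, k := range []string{"user-1", "user-2", "user-1"} {
		fmt.Printf("key=%s partition=%d\n", k, saramaHashPartition([]byte(k), 6))
	}
}
```

The resulting value would be placed in the message's `TopicPartition.Partition` field instead of `kafka.PartitionAny`.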

from confluent-kafka-go.

edenhill commented on August 21, 2024

The problem with implementing partitioner_cb support in high-level language bindings is that the partitioner callback may be called from an internal librdkafka thread, which isn't trivial to handle in cgo, CPython, etc.
This should be fixed in librdkafka rather than in the bindings, but that isn't trivial either, which is why this functionality is currently missing from our bindings.

But here's a dumb idea:
What if you, as a Go app developer, implemented the partitioner in C (cgo) and the Go client provided an API to set the C partitioner_cb?
You wouldn't be allowed to call any Go methods from this callback, but since partitioners are pretty minimal by design, this might be okay.

It would look something like this:

```go
/*
#include <librdkafka/rdkafka.h>

static int32_t my_partitioner (rd_kafka_topic_t *rkt, ..) {
    ..some custom hasher goes here..
    return hash % partition_cnt;
}
*/
import "C"

...

conf := ConfigMap{..., "default.topic.config": &ConfigMap{"partitioner_cb": C.my_partitioner}}
p, err := NewProducer(conf)
...
```

I know it is ugly, but would it be a reasonable workaround for you until proper Go partitioners are supported?

edenhill commented on August 21, 2024

@tchap the default partitioner is Consistent-Random, which maps the same key to the same partition, so you should be fine.
https://github.com/edenhill/librdkafka/blob/master/src/rdkafka.h#L1606
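To show why consistent_random keeps a given key on one partition, here is a Go sketch of the semantics librdkafka documents for it: a CRC32 hash of the key modulo the partition count, with empty and NULL keys randomly partitioned. It assumes the IEEE CRC-32 from Go's `hash/crc32`, which has not been verified to match librdkafka's internal CRC bit-for-bit, and the function name is hypothetical.

```go
package main

import (
	"fmt"
	"hash/crc32"
	"math/rand"
)

// consistentRandomPartition is a hypothetical sketch of consistent_random:
// non-empty keys map deterministically via CRC32 (assumed IEEE polynomial),
// while empty or nil keys are spread randomly across partitions.
func consistentRandomPartition(key []byte, partitionCnt int32) int32 {
	if len(key) == 0 {
		return rand.Int31n(partitionCnt)
	}
	return int32(crc32.ChecksumIEEE(key) % uint32(partitionCnt))
}

func main() {
	for _, k := range []string{"order-42", "order-43", "order-42", ""} {
		fmt.Printf("key=%q partition=%d\n", k, consistentRandomPartition([]byte(k), 5))
	}
}
```

Note that the key-to-partition mapping only stays stable while the topic's partition count does not change.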

edenhill commented on August 21, 2024

Out of curiosity, can you share your custom partitioner?
It might be generic enough to be integrated.

0x1997 commented on August 21, 2024

@edenhill It's application-specific. Basically custom_hash(static_cast<State*>(msg_opaque)) % partition_cnt in C++.

edenhill commented on August 21, 2024

Okay, what you'll need to do in the meantime is get the partition count for your topic(s) (with GetMetadata()), run your partitioner, and set Message.TopicPartition.Partition accordingly before calling Produce().
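This workaround can be sketched as a small Go helper that caches each topic's partition count and applies a custom hash. To keep the sketch self-contained, the metadata lookup is a hypothetical function parameter; with confluent-kafka-go it would typically wrap `Producer.GetMetadata(&topic, false, timeoutMs)` and return `len(md.Topics[topic].Partitions)`. All type and function names below are illustrative, and FNV-1a stands in for whatever hash your application needs.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// partitionCountFunc is a hypothetical stand-in for a metadata lookup
// (e.g. a wrapper around Producer.GetMetadata).
type partitionCountFunc func(topic string) (int32, error)

// keyPartitioner caches partition counts per topic and applies a custom
// hash, so the caller can set the partition explicitly before Produce().
type keyPartitioner struct {
	mu     sync.Mutex
	counts map[string]int32
	lookup partitionCountFunc
	hash   func(key []byte, n int32) int32
}

func newKeyPartitioner(lookup partitionCountFunc, hash func([]byte, int32) int32) *keyPartitioner {
	return &keyPartitioner{counts: make(map[string]int32), lookup: lookup, hash: hash}
}

// Partition returns the partition for key, fetching the count on first use.
func (kp *keyPartitioner) Partition(topic string, key []byte) (int32, error) {
	kp.mu.Lock()
	n, ok := kp.counts[topic]
	kp.mu.Unlock()
	if !ok {
		var err error
		if n, err = kp.lookup(topic); err != nil {
			return 0, err
		}
		if n <= 0 {
			return 0, fmt.Errorf("topic %q has no partitions", topic)
		}
		kp.mu.Lock()
		kp.counts[topic] = n
		kp.mu.Unlock()
	}
	return kp.hash(key, n), nil
}

// fnvPartition is an example hash: FNV-1a of the key modulo n.
func fnvPartition(key []byte, n int32) int32 {
	h := fnv.New32a()
	h.Write(key)
	return int32(h.Sum32() % uint32(n))
}

func main() {
	// Hypothetical lookup standing in for a GetMetadata call.
	lookup := func(topic string) (int32, error) { return 6, nil }
	kp := newKeyPartitioner(lookup, fnvPartition)
	for _, key := range []string{"alice", "bob", "alice"} {
		p, err := kp.Partition("orders", []byte(key))
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("key=%s partition=%d\n", key, p)
	}
}
```

Caching the count means partitions added later are not picked up until the cache is refreshed, so a real implementation would need some refresh policy.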

tchap commented on August 21, 2024

I wouldn't even need an entirely custom partitioner; I just need the same key to always go to the same partition. Perhaps something like that could be baked in, for example by adding another constant besides PartitionAny?

edenhill commented on August 21, 2024

The next version of librdkafka (the underlying Kafka client) will expose the builtin partitioners as a configuration property, allowing you to switch to an alternative builtin partitioner, such as the Java-compatible murmur2_random partitioner.

Search for 'partitioner' here: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

Custom partitioners are not yet exposed in the Go client though.
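With librdkafka's `partitioner` property set to `murmur2_random`, no Go code is needed. If you instead compute partitions in Go before Produce() and want them to line up with Java producers, the following is a sketch of murmur2 as we understand the Java client's implementation: seed 0x9747b28c, the result masked with 0x7fffffff, then taken modulo the partition count. It has not been checked against real Java output, the function names are hypothetical, and nil keys (which murmur2_random spreads randomly) are left to the caller.

```go
package main

import "fmt"

// murmur2 sketches 32-bit MurmurHash2 as the Java Kafka client applies it.
// uint32 arithmetic reproduces Java's wrapping int math, and >> on uint32 is
// a logical shift like Java's >>>.
func murmur2(data []byte) uint32 {
	const (
		seed uint32 = 0x9747b28c
		m    uint32 = 0x5bd1e995
		r           = 24
	)
	length := len(data)
	h := seed ^ uint32(length)

	// Mix the input four bytes at a time (little-endian).
	for i := 0; i+4 <= length; i += 4 {
		k := uint32(data[i]) | uint32(data[i+1])<<8 | uint32(data[i+2])<<16 | uint32(data[i+3])<<24
		k *= m
		k ^= k >> r
		k *= m
		h *= m
		h ^= k
	}

	// Mix the remaining 1-3 bytes.
	tail := length &^ 3
	switch length % 4 {
	case 3:
		h ^= uint32(data[tail+2]) << 16
		fallthrough
	case 2:
		h ^= uint32(data[tail+1]) << 8
		fallthrough
	case 1:
		h ^= uint32(data[tail])
		h *= m
	}

	// Final avalanche.
	h ^= h >> 13
	h *= m
	h ^= h >> 15
	return h
}

// murmur2Partition masks the hash to a non-negative value, then takes it
// modulo the partition count.
func murmur2Partition(key []byte, numPartitions int32) int32 {
	return int32(murmur2(key)&0x7fffffff) % numPartitions
}

func main() {
	for _, k := range []string{"alice", "bob", "alice"} {
		fmt.Printf("key=%s partition=%d\n", k, murmur2Partition([]byte(k), 12))
	}
}
```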

billygout commented on August 21, 2024

@edenhill

> Custom partitioners are not yet exposed in the Go client though.

I have a use case that really needs this, and I've been trying every kind of workaround, but none of them seem easy.
First, I tried the workaround mentioned in #16 (comment), but I got this error:

Failed to create producer: Invalid value type unsafe.Pointer for key partitioner_cb (expected string,bool,int,ConfigMap)

Then I cast the pointer to a uintptr and then to an int, which produced a runtime error saying:

Failed to create producer: Property "partitioner_cb" must be set through dedicated ..set..() function

At this point I stopped trying. It looks like it wants me to call rd_kafka_topic_conf_set_partitioner_cb() through some more cgo magic, but I gave up because I didn't know what to pass as its first parameter, rd_kafka_topic_conf_t *topic_conf: I don't know how to get hold of the rd_kafka_topic_conf_t from within my Go program.

Do you have any tips for completing this workaround?
Thanks!

edenhill commented on August 21, 2024

Because configuration is passed from Go to C in a generic way, it is a bit tricky to add a special case for set_partitioner_cb(). For the sake of a proof of concept, I suggest inserting a call to ..set_partitioner_cb() with a hardcoded C callback function here, right before rd_kafka_conf_set_default_topic_conf():
https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/config.go#L149

Do note that this callback may be called from internal librdkafka threads, and it is not clear to me how to safely trigger a Go call from such a thread.

billygout commented on August 21, 2024

Thank you, @edenhill! What I actually need is not a generic custom partitioner. Rather, I'm looking for a way to hash based on something other than the Kafka key, since I'm using the key for other purposes. It looks like hashing based on msg_opaque would work for me, given the partitioner callback's parameters:

```c
const rd_kafka_topic_t *rkt,
const void *keydata,
size_t keylen,
int32_t partition_cnt,
void *rkt_opaque,
void *msg_opaque
```

as long as I can set msg_opaque at the Go level, and it looks like I can (https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/message.go#L79).

Would you accept a PR adding a new builtin partitioner to librdkafka, like this?
consistent_opaque - CRC32 hash of msg_opaque (Empty and NULL msg_opaque are mapped to single partition)
I'm not sure, though, whether the CRC32 should be applied to the msg_opaque pointer address or to the data behind the pointer. Since there is no corresponding length argument, msg_opaque would have to point to a C string so that CRC32 knows how far to read...

edenhill commented on August 21, 2024

I strongly advise you to stick to the existing semantics of keys in Kafka; they are used for partitioning and compaction.
If you need additional data with your message, you can either create a richer message payload (using, for example, Avro and Schema Registry) or use message headers to "tag along" arbitrary data to your liking.

We will not accept a PR that partitions on anything other than the key, sorry.

Also, the msg_opaque is used by the Go client internally to map C messages back to Go messages.

billygout commented on August 21, 2024

> Also, the msg_opaque is used by the Go client internally to map C messages back to Go messages.

Ah, that would certainly kill the idea.

billygout commented on August 21, 2024

> I strongly advise you to stick to the existing semantics of keys in Kafka, they are used for partitioning and compaction.

That ship sailed a while ago at my company. When we upgrade to Kafka 0.11 in the future, we'll probably put this metadata in the 0.11+ headers. Additionally, we have no use for compaction and have it turned off for our use case.

billygout commented on August 21, 2024

> or use message headers to "tag along" arbitrary data to your liking.

That's a possibility too, and I will revisit it. It was my first option, but it ran into social problems :)

edenhill commented on August 21, 2024

Yeah, the simplest alternative is to add it as a builtin partitioner to librdkafka.
Looks like the FNV-1a code is very simple, so it should be fairly straightforward.

Find all occurrences of murmur2_random in the librdkafka/src code and add fnv1a_random counterparts.
https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_msg.c#L870

maeglindeveloper commented on August 21, 2024

Agree with @Manicben :) It would be great to have this included in the producer, rather than having to run a custom Go partitioner before Produce().
Is there a chance we'll see this feature in the future?
Btw @edenhill, thanks for all the good work 👍

caojun97 commented on August 21, 2024

@edenhill Hello. I want messages with key="a" to go to partition 0 and messages with key="b" to go to partition 1. The other messages ...
After debugging, it still doesn't work. Here is my partitioner_cb code:
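```c
#include <string.h>
#include <librdkafka/rdkafka.h>

/* Assumes this callback is registered with
 * rd_kafka_topic_conf_set_partitioner_cb() on the topic conf used to
 * create rkt (registration not shown in the original comment). */
static int32_t partitioner_cb(const rd_kafka_topic_t *rkt, const void *keydata,
                              size_t keylen, int32_t partition_cnt,
                              void *rkt_opaque, void *msg_opaque)
{
    /* this is a simple example */
    const char *key = (const char *)keydata; /* strncmp needs a char pointer */
    int32_t partition = 0;

    if (keylen == 0) /* size_t is unsigned, so "<= 0" only ever matched 0 */
        return partition;
    if (strncmp(key, "a", 1) == 0)
        partition = 0;
    else if (strncmp(key, "b", 1) == 0)
        partition = 1;
    else if (strncmp(key, "c", 1) == 0)
        partition = 2;
    else if (strncmp(key, "d", 1) == 0)
        partition = 3;

    /* The returned partition must lie in [0, partition_cnt). */
    if (partition >= partition_cnt)
        partition = 0;

    return partition;
}

/* ...inside the producing function: */
int err = rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
                           (void *)buf, len, (void *)key, strlen(key), NULL);
```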
Could you help me? Thanks.
