Comments (18)
We would love to have this available in the producer. We're moving from Sarama to confluent-kafka-go, but we still need to support Sarama's default partitioner, which uses the 32-bit FNV-1a hashing algorithm (part of hash/fnv in Go).
For now we will follow @edenhill's advice (i.e. get topic metadata, run a custom Go partitioner prior to Produce, and set Message.Partition), but it would still be nice to have custom partitioners supported in some form in the Producer API (either Go or C). Although, I may just try my hand at adding the FNV-1a algo to librdkafka...
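For reference, Sarama's default hash partitioner can be reproduced with Go's standard library alone. The sketch below assumes Sarama's documented behavior (32-bit FNV-1a of the key, interpreted as a signed int32, taken modulo the partition count, negated if negative) and should be verified against the Sarama version in use:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// fnv1aPartition reproduces the behavior of Sarama's default hash
// partitioner (a sketch; verify against your Sarama version):
// 32-bit FNV-1a over the key, interpreted as a signed int32,
// taken modulo the partition count, and made non-negative.
func fnv1aPartition(key []byte, numPartitions int32) int32 {
	h := fnv.New32a()
	h.Write(key)
	p := int32(h.Sum32()) % numPartitions
	if p < 0 {
		p = -p
	}
	return p
}

func main() {
	// The empty key hashes to the FNV-1a offset basis (2166136261),
	// which is -2128831035 as int32, so the partition is 5 here.
	fmt.Println(fnv1aPartition([]byte{}, 10)) // → 5
	fmt.Println(fnv1aPartition([]byte("my-key"), 10))
}
```

This is the piece you would run before Produce when emulating Sarama's placement from confluent-kafka-go.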
from confluent-kafka-go.
The problem with implementing partitioner_cb support in high-level language bindings is that the partitioner callback may be called from an internal librdkafka thread, and this isn't trivial to handle in cgo, CPython, et al.
This should be fixed in librdkafka rather than in the bindings, but that isn't trivial either, which is why this functionality is currently missing from our bindings.
But here's a dumb idea: what if you, as a Go app developer, implemented the partitioner in C (via cgo) and the Go client provided an API to set the C partitioner_cb? You wouldn't be allowed to call any Go functions from this callback, but since partitioners are pretty minimal by design this might be okay.
It would look something like this:
/*
#include <librdkafka/rdkafka.h>

static int32_t my_partitioner (const rd_kafka_topic_t *rkt,
                               const void *keydata, size_t keylen,
                               int32_t partition_cnt,
                               void *rkt_opaque, void *msg_opaque) {
        int32_t hash;
        // ..some custom hasher goes here..
        return hash % partition_cnt;
}
*/
import "C"
...
conf := ConfigMap{..., "default.topic.config": ConfigMap{"partitioner_cb": C.my_partitioner}}
p, err := NewProducer(&conf)
...
I know it is ugly, but would it be a reasonable workaround for you until proper Go partitioners are supported?
@tchap the default partitioner is Consistent-Random, which maps the same key to the same partition, so you should be fine.
https://github.com/edenhill/librdkafka/blob/master/src/rdkafka.h#L1606
Out of curiosity, can you share your custom partitioner?
It might be generic enough to be integrated.
@edenhill It's application specific. Basically custom_hash(static_cast<State*>(msg_opaque)) % partition_cnt in C++.
Okay, what you'll need to do in the meantime is get the partition count (with GetMetadata()) for your topic(s) and then run your partitioner prior to calling Produce() and setting Message.Partition accordingly.
I wouldn't even need an entirely custom partitioner; I would just need the same key to always go to the same partition. Perhaps it would be possible to bake in something like that, for example by adding another constant besides PartitionAny...?
The next version of librdkafka (the underlying Kafka client) will expose the builtin partitioners as configuration properties, allowing you to change to an alternative builtin partitioner, such as the Java compatible murmur2_random partitioner.
Search for 'partitioner' here: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
Custom partitioners are not yet exposed in the Go client though.
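Once a librdkafka build with that feature is available, switching the builtin partitioner from Go should reduce to a configuration entry, roughly like the fragment below (the `partitioner` property name and `murmur2_random` value are per librdkafka's CONFIGURATION.md; this sketch is not verified against a broker):

```go
// Config fragment (sketch): select the Java-compatible builtin partitioner.
p, err := kafka.NewProducer(&kafka.ConfigMap{
	"bootstrap.servers": "localhost:9092",
	"partitioner":       "murmur2_random",
})
```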
Custom partitioners are not yet exposed in the Go client though.
I have a use case which really needs this, and I've been trying to find any kind of workaround, but it doesn't seem to be easy.
First, I tried the workaround mentioned here #16 (comment) , but I got the error:
Failed to create producer: Invalid value type unsafe.Pointer for key partitioner_cb (expected string,bool,int,ConfigMap)
Then I cast the pointer to a uintptr, and then cast again to int, and ran into a runtime error saying:
Failed to create producer: Property "partitioner_cb" must be set through dedicated ..set..() function
At this point I stopped trying, but it looks like it wants me to use rd_kafka_topic_conf_set_partitioner_cb() through some more cgo magic. I gave up because I didn't know what to pass as the first parameter to that function, which is rd_kafka_topic_conf_t *topic_conf, and I don't know how to get the rd_kafka_topic_conf_t in the context of my Go program.
Do you have any tips for completing this workaround?
Thanks!
Due to the generic way configuration is passed from Go to C, it is a bit tricky to add a special case for set_partitioner_cb(), so for the sake of proof-of-concepting I suggest you insert a call to ..set_partitioner_cb() with a hardcoded C-function callback here, right before rd_kafka_conf_set_default_topic_conf():
https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/config.go#L149
Do note that this callback may be called from internal librdkafka threads and it is not clear to me how to safely trigger a Go call from such a thread.
Thank you, @edenhill! What I actually need is not a generic custom partitioner. Rather, I'm looking for a way to hash based on something other than the Kafka key, since I'm using the key for other purposes. It looks like hashing based on the msg_opaque would work for me, given the partitioner callback's arguments:
const rd_kafka_topic_t *rkt,
const void *keydata,
size_t keylen,
int32_t partition_cnt,
void *rkt_opaque,
void *msg_opaque
as long as I can set the msg_opaque at the Go level, and it looks like I can (https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/message.go#L79).
Would you accept a PR to add a new builtin partitioner to librdkafka, like this:
consistent_opaque - CRC32 hash of msg_opaque (empty and NULL msg_opaque are mapped to a single partition)
? Although I'm not sure whether the CRC32 should be applied to the pointer address msg_opaque or to the data behind the pointer; and since there is no corresponding length argument, the msg_opaque would have to point to a C string so the CRC32 knows how far to read...
I strongly advise you to stick to the existing semantics of keys in Kafka, they are used for partitioning and compaction.
If you need additional data with your message, you can either create a richer message payload (using for example Avro and Schema Registry) or use message headers to "tag along" arbitrary data to your liking.
We will not accept a PR that does partitioning on something other than the key, sorry.
Also, the msg_opaque is used by the Go client internally to map C messages back to Go messages.
Also, the msg_opaque is used by the Go client internally to map C messages back to Go messages.
Ah, that would certainly kill the idea.
I strongly advise you to stick to the existing semantics of keys in Kafka, they are used for partitioning and compaction.
This ship has sailed for a while at my company. In the future when we upgrade to kafka 0.11, we'll probably stick this metadata in the 0.11+ Headers. Additionally, we have no use for compaction and have it turned off for our use case.
or use message headers to "tag along" arbitrary data to your liking.
This is a possibility too; I will revisit it. It was my first option, but I ran into social problems :)
Yeah, the simplest alternative is to add it as a builtin partitioner to librdkafka.
Looks like the fnv1a code is very simple, so it should be fairly straightforward: find all occurrences of murmur2_random in the librdkafka/src code and add fnv1a_random counterparts.
https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_msg.c#L870
Agree with @Manicben :) it would be great to have that included in the producer, rather than having to run a custom Go partitioner prior to Produce.
Is there a chance to see this feature in the future?
Btw @edenhill thanks for all the good work 👍
@edenhill hello. I want messages with key "a" to go to partition 0, messages with key "b" to partition 1, and so on. After debugging, it doesn't work. My partitioner_cb code:
`static int32_t partitioner_cb(const rd_kafka_topic_t *rkt, const void *keydata,
                              size_t keylen, int32_t partition_cnt,
                              void *rkt_opaque, void *msg_opaque)
{
        /* this is a simple example */
        int32_t partition = 0;
        if (keylen <= 0)
                return partition;
        if (strncmp(keydata, "a", 1) == 0)
                partition = 0;
        else if (strncmp(keydata, "b", 1) == 0)
                partition = 1;
        else if (strncmp(keydata, "c", 1) == 0)
                partition = 2;
        else if (strncmp(keydata, "d", 1) == 0)
                partition = 3;
        return partition;
}

int err = rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
                           (void *)buf, len, (void *)key, strlen(key), NULL);`
could you help me? thanks