Comments (15)
Thanks for the feedback and some background, very interesting.
Generally speaking, package users should implement their own concurrency and channel behaviors.
Is this a general Go community standpoint? That packages should not provide their own channels?
As stated in the README, we weren't sure whether people were more interested in the channel or the function APIs, and we haven't gotten much wiser on that subject since, which is why both are still in there.
One of the main drawbacks, performance-wise, of Go channels is the lack of batch support: it is well known that high-performance queues should be implemented in a batching fashion, that is: the consumer locks the queue, moves the entire queue's contents to a local queue, and then unlocks it. This cuts down on the number of times the queue is accessed by both producer and consumer threads and thus minimizes lock contention, improves cache coherency, allows smarter handling of buffers, etc. (This is how librdkafka's internal producer queue is implemented.)
So yes, I now understand your concern and I wholly agree that we should provide a batch consume call.
from confluent-kafka-go.
Having something like this would be incredibly useful for doing bulk ElasticSearch index requests
Why was this closed in favour of #128? Aren't they unrelated? That issue is about running the library on Windows, but this one is about batch consumers.
Any news on this? I would also be very interested. At the moment I'm mostly sticking to sarama, because the performance gains of librdkafka aren't that big really, and the pain of a C library then easily outweighs the pure Go implementation. Batching inbound and a less channel-centric API could change quite a bit here. Channels are unfortunately not exactly the fastest beast on earth: while they're easy to handle, I've consistently been able to push (sometimes dramatically) more performance out of mutexed implementations. We can't forget that channels use a mutex too; that's not free. Might seem like an optimization, but performance is the main reason for adding a C library to a Go project. Otherwise you'd not go through the pain of it :)
I didn't understand either why this was closed in favour of #128.
We need to consume batches from Poll() as well; this would be so great. Currently we have to implement our own workaround.
I don't see a problem adding a PollBatch() (though batch is such a confusing word in this context), but it seems like you need to wrap the Consumer anyway (to filter certain events), in which case you should be able to implement the batching behaviour with a timed channel consumer loop, right?
What I'm trying to get at is a way to use librdkafka through Go without adding the overhead of the Poll method, by directly using the underlying channel method. For some reason I thought there was a librdkafka function which would return an array of pending messages; if not, just wrapping the underlying _rk_queue_poll function seems like it would be sufficient. I just don't want to have to use the channel consumer, whether through a synchronous wrapper or not.
@edenhill I think the idea would be to add a C method to consumer.go that does something like:

```c
/* pseudocode: block for the first message, then drain without blocking */
msg = rd_kafka_consumer_poll(rk, timeout_ms);
while (msg != NULL) {
    append(msgs, msg);
    msg = rd_kafka_consumer_poll(rk, 0);  /* 0 timeout: return immediately */
}
return msgs;
```
i.e. try to get multiple messages via fewer cgo calls as long as they are available. I realize we use different APIs in the Go implementation, but I think the real question here is whether it will be possible to use a single cgo call to return a set of messages rather than just one at a time.
Yes, I understand, but I'm asking why this can't just be done with the existing channel interface?
Pseudo-code:

```go
func readBatch(c *kafka.Consumer, wantedCnt int, timeout time.Duration) []*kafka.Message {
	msgs := make([]*kafka.Message, 0, wantedCnt)
	deadline := time.After(timeout) // one deadline for the whole batch
outer:
	for len(msgs) < wantedCnt {
		select {
		case ev := <-c.Events():
			if msg, ok := ev.(*kafka.Message); ok {
				msgs = append(msgs, msg)
			}
		case <-deadline:
			break outer
		}
	}
	return msgs
}
```
Ah, my presumption was that the cgo <-> c cost was too expensive. I haven't done anything to verify, just read previous threads where it seemed the 1 message per call issue became a performance issue.
Possibly yes, @binary132 - can you try the channel-based batch approach and see if it is performant enough for your needs?
If not, we'll look into a more C-oriented alternative.
The question isn't a team requirement, just a suggestion / RFC. I don't think it's a premature optimization since this package (and librdkafka) are marketed as the highest-performing Kafka client options, and exporting an idiomatic API is crucial to a community-leading project like this. Generally speaking, package users should implement their own concurrency and channel behaviors. However, Sarama has already established precedent for a channel-based exported API. I find that decision disagreeable too, but it is what it is.
It may be that incurring a mutex lock and unlock on every received message, in addition to the CGo calls, is inconsequential compared to I/O performance limitations for the vast majority of users.
I would be happy to help contribute this feature if you think it would be valuable.
See https://www.datadoghq.com/blog/go-performance-tales/ for some of the reasoning around channels (again, this is an optimization perspective that is irrelevant to most users at the application level, but may be important at the client package / IO level.)
@edenhill I wouldn't say there's a hard-line community standpoint WRT channels and concurrency in packages. I suppose it depends. In the end, a package shouldn't (and can't) make assumptions as to how it'll be used: for all you know, the runtime might be constricted to a single thread (runtime.GOMAXPROCS(1)). A package can't do anything about that, but the user of said package could choose the optimal solution given the runtime restrictions.
Packages, I think, should offer convenience, abstract some nitty-gritty stuff, and ideally offer quick and easy solutions that cover the more common/generic use-cases. For specific use-cases, it falls to the user to wield the package in a way that makes the most sense. A low-level API tends to provide the most flexibility (although it's riskier to use, but that again is the user's responsibility).
TL;DR:
- Is it a general community standpoint: AFAIK there is no general standpoint, but it makes sense.
- The current (higher-level) solutions the package offers are great and should remain an integral part of the package. They allow users, particularly at this early stage, to hack out quick prototypes or PoCs.
- A low-level API would definitely add value to this package. Although it means exposing more of the underlying librdkafka library, it also means users can implement the best solution for their specific problems.
- Performance: as is the case with librdkafka, Kafka in general, or indeed anything in life, performance depends on what you make of it and what you want/need to do. Offering the channel-based API as an acceptable default that will suit most users' needs (especially when prototyping or hacking out a quick PoC) is a major selling point at this stage. A low-level API is harder to use, as is everything else if you're gunning for performance.
@edenhill: "batching" Go channels can be done by sending slices, but then you're passing by reference and state can get hairy, plus you're potentially causing a lot of heap thrashing. On that note, slice allocation is always to the heap, a consumer returning a slice will constantly be causing allocations, so it could be worth offering something a bit akin to io.ReaderFrom
, where the user can manage allocation and pass the slice to be read into to your consumer function, even maybe implementing something like io.WriterTo
on your consumer type! (Write directly to socket from Kafka without copy?) But I'm getting a bit ahead of myself here. π
Getting into total performance analysis isn't the point of this RFC...
As to the community standard, I would argue that the Go standard library sets the highest standards for package design, code smell, etc., and hardly anything in the stdlib exports channel-based interfaces, except a few synchronization types such as time.Timer. But in this case I agree with @EVODelavega, especially since Sarama already offers a channel consumer for Kafka. However, I would try to implement it as a helper function (not a method) for consuming from a synchronous interface defined by this package (kafka.Consumer?). But that's also tangential to the RFC.
@edenhill glad we're in agreement! Let me know if there's anything I can do. I've actually been meaning to get more familiar with C idioms...
Closed in favour of #128