shopify / sarama
Sarama is a Go library for Apache Kafka.
License: MIT License
When sending messages to a topic that doesn't exist I get a panic with the error message invalid argument to Intn. Is it possible to make the error message more specific?
I've been cobbling together my first client using Sarama over the last few weeks and I'm struggling with how to implement my producer.
Could I get some clarification on:
Both SendMessage and QueueMessage return error, but we're instructed to consume Errors when using QueueMessage, so what error will QueueMessage return? My inclination is to use QueueMessage for its ability to batch messages into message sets for the sake of throughput, but I'm having trouble understanding how to detect errors and how to recover when they arrive off of the Errors channel.
The stream of data arriving at my producer is not easy to apply back pressure against. Every so often, with QueueMessage, the Errors channel reports a few thousand messages dropped. I suspect that I can alleviate this by monitoring the brokers more closely and possibly tuning them. I also expect that, with very minimal buffering in memory, my producer could handle the broker hiccup and resend lost messages if it could detect which messages failed to send. Is there any sane way to do this while still using QueueMessage?
My client config:
to.clientCfg = &sarama.ClientConfig{
	WaitForElection: 1 * time.Second,
	MetadataRetries: 10,
}
Producer config:
to.producerCfg = &sarama.ProducerConfig{
	RequiredAcks: sarama.WaitForLocal,
	Timeout:      2000,
	Partitioner:  NewPartitioner(),
	//Compression: to.cfg.compressCodec,
	MaxBufferedBytes: 1024,
	MaxBufferTime:    1,
}
I'm using the kafka shell tools to perform a graceful leader shutdown:
kafka-run-class.sh kafka.admin.ShutdownBroker --zookeeper svac17:2181/prodz/kafka-session --broker 224 --num.retries 3 --retry.interval.ms 60
and my producer never returns from SendMessage.
Is this expected synchronous behavior, or should it return an error?
Can anyone give me a complete example using the QueueMessage function?
In my tests its write speed is very slow.
Thank you very much!
Are there plans for sarama to directly support automated consumer registration and rebalancing, as described at http://kafka.apache.org/documentation.html#distributionimpl?
Use case: assuming I have N topics with M partitions, I would like to run a consumer group and distribute these across O consumers as evenly as possible (with each message being consumed only once).
The process steps are nicely documented, but the approach is purely based on ZK communication. Should this be implemented directly into sarama, or would it be better to create a wrapper library?
I volunteer to help with the development, but I would like to know if there is any existing work or plans I can (should) rely on.
This is sort of a weird one. I've written a small load testing client which uses multiple goroutines pushing messages through multiple producers to my kafka cluster. When I've got a small number of topics on the cluster, I can push thousands of msgs/sec into a topic. However, when I've got many topics (~800), the message rate drops to ~30/s, even though I'm still only pushing to 4 topics for the test.
Stranger still, this only happens when I'm using sarama.WaitForAll. If I use sarama.WaitForLocal, I get decent rates (~800/sec). As well, we drafted a similar client using the python client library, and it doesn't exhibit this issue with the same settings.
Is there something in the sarama WaitForAll code that polls all the topics even if it's only publishing to a few of them? I'm at a loss to explain this behaviour otherwise.
We've found WaitForAll is required for data safety, so I'd definitely like to get to the bottom of this.
The Apache Kafka community is trying to consolidate Apache Kafka client discussion mailings, questions, issues, etc. into a single list: https://groups.google.com/forum/#!forum/kafka-clients. It would be great if you would join in too and drive discussions for Kafka clients there.
It's a bit annoying to have to materialize a message body as a []byte in memory when I have a perfectly good io.Reader that could be used to stream it straight from the origin to Kafka. It's particularly bad when the message is large.
I've been using your framework to build a 0.8 Producer, and it works well -- thanks for open-sourcing it. Perhaps this has been asked before, but is there any reason that a SendMessageBatch call doesn't exist on the Producer? Go should make it easy to deal with the fact that the call would block, and it would allow me to use higher-throughput batching.
Is there any reason why it would be difficult to code? Perhaps I could knock it out.
New clients are unable to connect to a Kafka cluster when there are partitions without leaders (e.g. dead brokers). Existing clients work OK; they just shrink the set of available partitions.
Please allow connecting to, and consuming/producing messages against, a Kafka cluster with unavailable partitions.
Hi all:
I sometimes see this error: "kafka: Dropped 5739 messages".
This is my producer config:
func defaultProducerConfig() *sarama.ProducerConfig {
	config := sarama.NewProducerConfig()
	config.Compression = 2
	config.MaxBufferTime = 1000000 * time.Millisecond
	config.MaxBufferedBytes = 1024 * 10
	return config
}
I'm using the producer.QueueMessage function. I don't know why this happens. Can anyone help me and give me some advice? Thanks a lot.
The Consumer should make use of https://cwiki.apache.org/confluence/display/KAFKA/Offset+Management
The golang protocol backend already supports those request/response types, but the current Kafka 0.8 beta 1 seems to choke on them so I didn't add them to the API.
I suspect this will consist of doing a Fetch on construction of a Consumer, then adding a Commit(offset) api call that the consumer user can call as appropriate. The python bindings have an autocommit option, but I think that's overcomplicated for our needs, at least to start.
Gzip support was trivial because it's built into the Go standard libraries. For snappy we'll have to add a dependency on some 3rd-party library.
Probably: https://code.google.com/p/snappy-go/
And for reference, because dependency management is bizarre in Go if you're used to a Gemfile: http://www.stovepipestudios.com/blog/2013/02/go-dependency-management.html
Hi
I get panic: runtime error: send on closed channel when I kill a broker in a Kafka cluster.
It looks like backPressureThreshold is exceeded and you want to flush messages, but something bad is happening.
I also wonder why I get kafka: Dropped 13803 messages ... What is the reason? I know that one of the brokers is dead, but what about the new leaders for the partitions?
I use an asynchronous producer. My configuration:
producerConfig := sarama.NewProducerConfig()
producerConfig.RequiredAcks = sarama.NoResponse
producerConfig.MaxBufferedBytes = 1000000
producerConfig.BackPressureThresholdBytes = 10000000
producerConfig.MaxBufferTime = 30 * time.Second
producerConfig.Compression = sarama.CompressionSnappy
Logs:
[Sarama] 2014/09/23 18:19:24 Initializing new client
[Sarama] 2014/09/23 18:19:24 Fetching metadata from broker localhost:9092
[Sarama] 2014/09/23 18:19:24 Connected to broker localhost:9092
[Sarama] 2014/09/23 18:19:24 Registered new broker #5 at localhost:9096
[Sarama] 2014/09/23 18:19:24 Registered new broker #1 at localhost:9092
[Sarama] 2014/09/23 18:19:24 Registered new broker #2 at localhost:9093
[Sarama] 2014/09/23 18:19:24 Registered new broker #3 at localhost:9094
[Sarama] 2014/09/23 18:19:24 Registered new broker #4 at localhost:9095
[Sarama] 2014/09/23 18:19:24 Successfully initialized new client
[Sarama] 2014/09/23 18:19:24 Connected to broker localhost:9096
[Sarama] 2014/09/23 18:19:24 Connected to broker localhost:9095
[Sarama] 2014/09/23 18:19:24 Connected to broker localhost:9094
[Sarama] 2014/09/23 18:19:24 Connected to broker localhost:9093
[Sarama] 2014/09/23 18:19:24 Connected to broker localhost:9092
[Sarama] 2014/09/23 18:21:56 Disconnecting Broker 1
[Sarama] 2014/09/23 18:21:56 Fetching metadata from broker localhost:9093
[Sarama] 2014/09/23 18:21:56 Closed connection to broker localhost:9092
[Sarama] 2014/09/23 18:21:56 Registered new broker #1 at localhost:9092
[Sarama] 2014/09/23 18:21:56 Failed to connect to broker localhost:9092
[Sarama] 2014/09/23 18:21:56 dial tcp 127.0.0.1:9092: connection refused
[Sarama] 2014/09/23 18:21:56 Disconnecting Broker 1
[Sarama] 2014/09/23 18:21:56 kafka: Dropped 13803 messages: dial tcp 127.0.0.1:9092: connection refused
[Sarama] 2014/09/23 18:21:56 Failed to close connection to broker localhost:9092.
[Sarama] 2014/09/23 18:21:56 kafka: broker: not connected
[Sarama] 2014/09/23 18:21:56 Fetching metadata from broker localhost:9096
[Sarama] 2014/09/23 18:21:56 Disconnecting Broker 1
[Sarama] 2014/09/23 18:21:56 kafka: Dropped 4406 messages: dial tcp 127.0.0.1:9092: connection refused
[Sarama] 2014/09/23 18:21:56 Disconnecting Broker 1
[Sarama] 2014/09/23 18:21:56 Failed to close connection to broker localhost:9092.
[Sarama] 2014/09/23 18:21:56 kafka: broker: not connected
[Sarama] 2014/09/23 18:21:56 Failed to close connection to broker localhost:9092.
[Sarama] 2014/09/23 18:21:56 kafka: broker: not connected
[Sarama] 2014/09/23 18:21:56 Registered new broker #1 at localhost:9092
[Sarama] 2014/09/23 18:21:56 Failed to connect to broker localhost:9092
[Sarama] 2014/09/23 18:21:56 dial tcp 127.0.0.1:9092: connection refused
[Sarama] 2014/09/23 18:21:56 Disconnecting Broker 1
[Sarama] 2014/09/23 18:21:56 Failed to close connection to broker localhost:9092.
[Sarama] 2014/09/23 18:21:56 kafka: broker: not connected
[Sarama] 2014/09/23 18:21:58 Disconnecting Broker 1
[Sarama] 2014/09/23 18:21:58 Failed to close connection to broker localhost:9092.
[Sarama] 2014/09/23 18:21:58 kafka: broker: not connected
panic: runtime error: send on closed channel
Panic:
goroutine 35 [running]:
runtime.panic(0x57c3c0, 0x690fbe)
/usr/lib/go/src/pkg/runtime/panic.c:279 +0xf5
github.com/Shopify/sarama.(*brokerProducer).flushIfOverCapacity(0xc20804e680, 0x989680000f4240)
	/home/noxis/go/src/github.com/Shopify/sarama/producer.go:290 +0xa0
github.com/Shopify/sarama.(*brokerProducer).addMessage(0xc20804e680, 0xc214a13e00, 0x989680000f4240)
	/home/noxis/go/src/github.com/Shopify/sarama/producer.go:280 +0x2ae
github.com/Shopify/sarama.(*Producer).addMessage(0xc208044120, 0xc214a13e00, 0x0, 0x0)
	/home/noxis/go/src/github.com/Shopify/sarama/producer.go:190 +0xad
github.com/Shopify/sarama.(*produceMessage).enqueue(0xc214a13e00, 0xc208044120, 0x0, 0x0)
	/home/noxis/go/src/github.com/Shopify/sarama/produce_message.go:22 +0x61
github.com/Shopify/sarama.(*Producer).genericSendMessage(0xc208044120, 0x5aa3d0, 0x9, 0x7f1e203739d0, 0xc208bd4b60, 0x7f1e203739d0, 0xc208bd4b80, 0x7f1e201cdf00, 0x0, 0x0)
	/home/noxis/go/src/github.com/Shopify/sarama/producer.go:182 +0x257
github.com/Shopify/sarama.(*Producer).QueueMessage(0xc208044120, 0x5aa3d0, 0x9, 0x7f1e203739d0, 0xc208bd4b60, 0x7f1e203739d0, 0xc208bd4b80, 0x0, 0x0)
	/home/noxis/go/src/github.com/Shopify/sarama/producer.go:136 +0x8b
main.worker(0xc208082000, 0xc208040020, 0xc208044120)
/home/noxis/IdeaProjects/kafka-log-producer/src/kafka-log-producer.go:112 +0x263
created by main.main
/home/noxis/IdeaProjects/kafka-log-producer/src/kafka-log-producer.go:43 +0x120
Currently any lib importing Shopify/sarama will get the "testing" package pulled in. According to the nice folks at #go-nuts, the "testing" package should only be imported from *_test.go files.
start_time := int32(time.Now().Unix())
for _, j_result := range data {
	//err := producer.SendMessage("weblog", nil, sarama.StringEncoder(j_result))
	err := producer.QueueMessage("weblog", nil, sarama.StringEncoder(j_result))
	if err != nil {
		fmt.Println(err)
		select {
		case <-producer.Errors():
		default:
			//case <-time.After(1 * time.Millisecond):
		}
	}
}
fmt.Println("closing socket connections...")
producer.Close()
client.Close()
end_time := int32(time.Now().Unix())
fmt.Println("upload file ok", filename, "total time:", end_time-start_time, "total lines:", len(data))
I found producer.Close() is very slow. How can I solve this problem?
When using the GetOffset function I get:
offset, err := clientKafka.GetOffset(cfgConfig.Client.Topic, 0, 1000)
Where:
offset = -1
err = Error: kafka: Response did not contain all the expected topic/partition blocks.
This has come up in the back of my mind a few times but I've always held off because I'm not sure what sane default behaviours are in all cases. Right now you specify a single host:port to construct a client and it pulls metadata on the rest of the cluster from that. It makes sense to be able to specify a list of brokers, so that if part of the cluster is offline the client can still connect.
I see a few issues which seem somewhat related but none of the commits nor suggestions seems to have any effect.
This issue is fully reproducible.
Here are my connections in CLOSE_WAIT to the failed node
$ netstat -a | grep 10.10.2.46
tcp 1 0 ip-10-10-1-202:57278 10.10.2.46:9092 CLOSE_WAIT
tcp 1 0 ip-10-10-1-202:57279 10.10.2.46:9092 CLOSE_WAIT
Any idea on what I can possibly try here?
This is what shows in the sarama log
[Sarama] 2014/08/21 22:43:17 Initializing new client
[Sarama] 2014/08/21 22:43:17 Fetching metadata from broker 10.10.1.196:9092
[Sarama] 2014/08/21 22:43:17 Connected to broker 10.10.1.196:9092
[Sarama] 2014/08/21 22:43:17 Registered new broker #1010226 at kafka-i-6621614c-integ.us-east-1c.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Registered new broker #10101195 at kafka-i-e6780fb4-integ.us-east-1a.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Registered new broker #1010246 at kafka-i-1b5f9a36-integ.us-east-1c.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Registered new broker #10101196 at kafka-i-24671076-integ.us-east-1a.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Registered new broker #1010196 at kafka-i-b77a0de5-integ.us-east-1a.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Registered new broker #1010250 at kafka-i-d01a5afa-integ.us-east-1c.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Successfully initialized new client
[Sarama] 2014/08/21 22:43:17 Initializing new client
[Sarama] 2014/08/21 22:43:17 Fetching metadata from broker 10.10.1.196:9092
[Sarama] 2014/08/21 22:43:17 Connected to broker 10.10.1.196:9092
[Sarama] 2014/08/21 22:43:17 Connected to broker kafka-i-b77a0de5-integ.us-east-1a.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Connected to broker kafka-i-e6780fb4-integ.us-east-1a.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Registered new broker #1010226 at kafka-i-6621614c-integ.us-east-1c.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Connected to broker kafka-i-24671076-integ.us-east-1a.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Registered new broker #10101195 at kafka-i-e6780fb4-integ.us-east-1a.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Registered new broker #1010246 at kafka-i-1b5f9a36-integ.us-east-1c.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Registered new broker #10101196 at kafka-i-24671076-integ.us-east-1a.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Registered new broker #1010196 at kafka-i-b77a0de5-integ.us-east-1a.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Registered new broker #1010250 at kafka-i-d01a5afa-integ.us-east-1c.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Successfully initialized new client
[Sarama] 2014/08/21 22:43:17 Connected to broker kafka-i-6621614c-integ.us-east-1c.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Connected to broker kafka-i-d01a5afa-integ.us-east-1c.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Connected to broker kafka-i-1b5f9a36-integ.us-east-1c.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Connected to broker kafka-i-1b5f9a36-integ.us-east-1c.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Connected to broker kafka-i-b77a0de5-integ.us-east-1a.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Connected to broker kafka-i-24671076-integ.us-east-1a.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Connected to broker kafka-i-6621614c-integ.us-east-1c.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Connected to broker kafka-i-e6780fb4-integ.us-east-1a.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Connected to broker kafka-i-d01a5afa-integ.us-east-1c.ec2.integ:9092
--- THIS IS WHERE I KILL -9 ---
[Sarama] 2014/08/21 22:43:35 kafka: Dropped 33 messages: kafka server: Request exceeded the user-specified time limit in the request.
[Sarama] 2014/08/21 22:43:35 kafka: Dropped 33 messages: kafka server: Request exceeded the user-specified time limit in the request.
[Sarama] 2014/08/21 22:43:35 kafka: Dropped 33 messages: kafka server: Request exceeded the user-specified time limit in the request.
[Sarama] 2014/08/21 22:43:35 kafka: Dropped 29 messages: kafka server: Request exceeded the user-specified time limit in the request.
[Sarama] 2014/08/21 22:43:35 kafka: Dropped 29 messages: kafka server: Request exceeded the user-specified time limit in the request.
[Sarama] 2014/08/21 22:43:35 kafka: Dropped 29 messages: kafka server: Request exceeded the user-specified time limit in the request.
[Sarama] 2014/08/21 22:43:35 kafka: Dropped 30 messages: kafka server: Request exceeded the user-specified time limit in the request.
[Sarama] 2014/08/21 22:43:35 kafka: Dropped 31 messages: kafka server: Request exceeded the user-specified time limit in the request.
[Sarama] 2014/08/21 22:43:35 kafka: Dropped 29 messages: kafka server: Request exceeded the user-specified time limit in the request.
[Sarama] 2014/08/21 22:43:37 kafka: Dropped 112 messages: kafka server: Request exceeded the user-specified time limit in the request.
[Sarama] 2014/08/21 22:45:17 Fetching metadata from broker kafka-i-24671076-integ.us-east-1a.ec2.integ:9092
[Sarama] 2014/08/21 22:47:17 Fetching metadata from broker kafka-i-d01a5afa-integ.us-east-1c.ec2.integ:9092
[Sarama] 2014/08/21 22:49:17 Fetching metadata from broker kafka-i-6621614c-integ.us-east-1c.ec2.integ:9092
[Sarama] 2014/08/21 22:51:17 Fetching metadata from broker kafka-i-d01a5afa-integ.us-east-1c.ec2.integ:9092
I am using the default ClientConfig and BrokerConfig (via New*). Here is a dump of my sarama goroutines after failure:
14 @ 0x424659 0x4246db 0x42df08 0x42e38e 0x57929d 0x58d1c6 0x58d0a7 0x4248f0
# 0x42df08 chanrecv+0x1d8 /usr/local/go/src/pkg/runtime/chan.goc:298
# 0x42e38e runtime.chanrecv2+0x3e /usr/local/go/src/pkg/runtime/chan.goc:358
# 0x57929d github.com/Shopify/sarama.(*Broker).responseReceiver+0xdd /home/sean/Development/go/src/github.com/Shopify/sarama/broker.go:361
# 0x58d1c6 github.com/Shopify/sarama.*Broker.(github.com/Shopify/sarama.responseReceiver)·fm+0x26 /home/sean/Development/go/src/github.com/Shopify/sarama/broker.go:114
# 0x58d0a7 github.com/Shopify/sarama.withRecover+0x37 /home/sean/Development/go/src/github.com/Shopify/sarama/utils.go:27
2 @ 0x424659 0x4246db 0x42da2b 0x42e2fc 0x58df83 0x588cad 0x5888ca 0x58da28 0x4248f0
# 0x42da2b chansend+0x16b /usr/local/go/src/pkg/runtime/chan.goc:168
# 0x42e2fc runtime.chansend1+0x4c /usr/local/go/src/pkg/runtime/chan.goc:347
# 0x58df83 github.com/Shopify/sarama.func·009+0x3a3 /home/sean/Development/go/src/github.com/Shopify/sarama/producer.go:351
# 0x588cad github.com/Shopify/sarama.(*brokerProducer).flushRequest+0x37d /home/sean/Development/go/src/github.com/Shopify/sarama/producer.go:413
# 0x5888ca github.com/Shopify/sarama.(*brokerProducer).flush+0x41a /home/sean/Development/go/src/github.com/Shopify/sarama/producer.go:352
# 0x58da28 github.com/Shopify/sarama.func·008+0xc8 /home/sean/Development/go/src/github.com/Shopify/sarama/producer.go:235
1 @ 0x424659 0x4246db 0x42da2b 0x42e2fc 0x588390 0x58815e 0x587a5d 0x5855c1 0x587987 0x58765b 0x40bcc9 0x4248f0
# 0x42da2b chansend+0x16b /usr/local/go/src/pkg/runtime/chan.goc:168
# 0x42e2fc runtime.chansend1+0x4c /usr/local/go/src/pkg/runtime/chan.goc:347
# 0x588390 github.com/Shopify/sarama.(*brokerProducer).flushIfOverCapacity+0xa0 /home/sean/Development/go/src/github.com/Shopify/sarama/producer.go:289
# 0x58815e github.com/Shopify/sarama.(*brokerProducer).addMessage+0x2ae /home/sean/Development/go/src/github.com/Shopify/sarama/producer.go:279
# 0x587a5d github.com/Shopify/sarama.(*Producer).addMessage+0xad /home/sean/Development/go/src/github.com/Shopify/sarama/producer.go:189
# 0x5855c1 github.com/Shopify/sarama.(*produceMessage).enqueue+0x61 /home/sean/Development/go/src/github.com/Shopify/sarama/produce_message.go:22
# 0x587987 github.com/Shopify/sarama.(*Producer).genericSendMessage+0x257 /home/sean/Development/go/src/github.com/Shopify/sarama/producer.go:181
# 0x58765b github.com/Shopify/sarama.(*Producer).QueueMessage+0x8b /home/sean/Development/go/src/github.com/Shopify/sarama/producer.go:135
# 0x40bcc9 main.(*KafkaStorage).MessageProxy+0xe9 /home/sean/Development/go/src/*/kafkastore.go:199
overlimit := 0
prb.reverseEach(func(msg *produceMessage) {
	if err := msg.reenqueue(p); err != nil {
		overlimit++
	}
})
if overlimit > 0 {
	errorCb(DroppedMessagesError{overlimit, nil})
}
return true
This code will block forever, because the 2nd reenqueue call will return nil and errorCb will not be called. It also seems the overall retry logic is overcomplicated and won't work the way it is supposed to.
file: produce_message.go

func (msg *produceMessage) enqueue(p *Producer) error {
	......
	errs := make(chan error, 1)
	bp.flushRequest(p, prb, func(err error) {
		errs <- err
	})
	//:3 blocks here
	return <-errs
}
file: producer.go

func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, errorCb func(error)) (shutdownRequired bool) {
	......
	switch err {
	case nil:
		break
	case EncodingError:
		......
	default:
		overlimit := 0
		prb.reverseEach(func(msg *produceMessage) {
			//:1 if msg.reenqueue(p) returns nil, overlimit stays 0
			if err := msg.reenqueue(p); err != nil {
				overlimit++
			}
		})
		//:2 if overlimit == 0, errorCb is never called
		if overlimit > 0 {
			errorCb(DroppedMessagesError{overlimit, nil})
		}
		return true
	}
	......
}

func (msg *produceMessage) reenqueue(p *Producer) error {
	if !msg.retried {
		msg.retried = true
		return msg.enqueue(p)
	}
	return nil
}
Spinning this off from #7 to track separately.
When the Client receives metadata from a broker it runs that metadata through Client.update(), which tries to connect to any new brokers listed in the metadata. If any of those connections fail, it bails immediately, which isn't the right choice. It also potentially leaks broker connections.
When I write with a producer using Compression set to CompressionGZIP, I can't read the data back from Kafka with a consumer. What's wrong?
My program uses QueueMessage to send data from logs. When the send process is restarted, it must resend the data from the last line recorded before the process went down, so there may be a lot of lines to send in a moment. Sometimes, the producer process logs errors like this:
[ERROR][utils/kafkaUtils] kafkaUtils.go:239: In producer Error() some error comes. kafka: Dropped 278 messages
[ERROR][utils/kafkaUtils] kafkaUtils.go:239: In producer Error() some error comes. kafka: Dropped 264 messages
and can't retry or fix the problem.
Sometimes no ERROR occurs, but the producer program uses a lot of memory: it uses 30MB at the beginning, but after some days, with the same data and the same producer, one out of five programs uses 20-50GB of memory.
I need your help, please.
Thanks.
Hello,
It would be great if you could create a mailing list for sarama. We could use the mailing list for knowledge sharing and general questions for which github issue is not suitable.
Thanks!
The ProducerConfig created by NewProducerConfig() does not pass its Validate() method, since MaxBufferSize and MaxBufferTime are zero (which they should be, according to the documentation of Producer). Are these values invalid when < 0?
@wvanbergen @snormore @mkobetic (and anyone else who cares)
The current consumer works just fine, but is limited to fetching just a single partition of a single topic at once. You can run multiple consumers, but this has several issues (for example, when a partition has no messages the broker waits the full MaxWaitTime before responding, which introduces severe latency when multiplexing busy and non-busy topics). Using separate clients works, but generates a lot of extra connections and metadata traffic.

The ideal solution is to have a consumer capable of fetching from multiple topic/partitions at once. This has been at the back of my mind for a while now, so I already have a design ready; if the design makes sense to y'all, then somebody just needs to write it :)
While the user currently specifies a single topic/partition to the constructor, they should now be able to specify a set of topic/partitions. Since some of the configuration that is currently a singleton actually needs to be per-partition (OffsetMethod and OffsetValue at least), this would probably be of type map[string]map[int32]*ConsumerPartitionConfig. I considered permitting the dynamic adding/removing of partitions to the set, but I don't see a strong use case and it complicated a bunch of things.
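The per-partition configuration described above could be built roughly like this; ConsumerPartitionConfig and its fields are hypothetical illustrations of the shape, not an existing sarama type:

```go
package main

import "fmt"

// ConsumerPartitionConfig is a hypothetical per-partition config holding
// the settings that can no longer be singletons (OffsetMethod/OffsetValue).
type ConsumerPartitionConfig struct {
	OffsetMethod string
	OffsetValue  int64
}

func main() {
	// topic -> partition -> per-partition settings
	subs := map[string]map[int32]*ConsumerPartitionConfig{
		"clicks": {
			0: {OffsetMethod: "oldest"},
			1: {OffsetMethod: "manual", OffsetValue: 12345},
		},
		"logs": {
			0: {OffsetMethod: "newest"},
		},
	}
	fmt.Println("topics:", len(subs), "clicks partitions:", len(subs["clicks"]))
}
```

A nested map keeps lookup by topic/partition cheap when the controller dispatches each partition to its broker goroutine.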
Events are returned to the user exactly the same way they are now, over a single channel. I considered a separate channel per topic/partition but it complicated the base case, and the events already contain information on topic/partition so it's not hard for the user to dispatch appropriately if they really want to.
The constructor starts up one "controller" goroutine, which starts up and manages one goroutine per broker-that-has-a-partition-we-care-about and is responsible for (initially) dispatching each topic/partition to the appropriate broker's goroutine. The broker goroutine looks a lot like the current fetchMessages method, with a few tweaks.
I expect the success case to be fairly straightforward - as always, the complexity will come when reasoning through the failure cases and ensuring that topic/partitions are redispatched correctly, messages are not accidentally skipped in that case, etc. etc.
When the consumer is closed, it signals the controller which cleans up its children before exiting.
Thoughts?
Logged this:
WARNING: DATA RACE
Write by goroutine 41:
github.com/Shopify/sarama.(*brokerProducer).flush()
$GOPATH/src/github.com/Shopify/sarama/producer.go:326 +0x45c
github.com/Shopify/sarama.(*brokerProducer).flushIfAnyMessages()
$GOPATH/src/github.com/Shopify/sarama/producer.go:304 +0xc8
github.com/Shopify/sarama.func·008()
$GOPATH/src/github.com/Shopify/sarama/producer.go:247 +0x3e9
Previous read by goroutine 42:
github.com/Shopify/sarama.(*brokerProducer).flushIfOverCapacity()
$GOPATH/src/github.com/Shopify/sarama/producer.go:289 +0x3a
github.com/Shopify/sarama.(*brokerProducer).addMessage()
$GOPATH/src/github.com/Shopify/sarama/producer.go:285 +0x3ea
github.com/Shopify/sarama.(*Producer).addMessage()
$GOPATH/src/github.com/Shopify/sarama/producer.go:194 +0xd5
github.com/Shopify/sarama.(*produceMessage).enqueue()
$GOPATH/src/github.com/Shopify/sarama/produce_message.go:19 +0x6f
github.com/Shopify/sarama.(*Producer).genericSendMessage()
$GOPATH/src/github.com/Shopify/sarama/producer.go:186 +0x2d4
github.com/Shopify/sarama.(*Producer).QueueMessage()
$GOPATH/src/github.com/Shopify/sarama/producer.go:140 +0x80
Some config options (similar to the Java/Scala producer) for collecting and batching messages rather than sending each produced message immediately. This will be more efficient, and will make compression much more efficient as well.
@fw42, just thought of this: set the handler, do something that spawns a goroutine, then set the handler to nil. A panic would cause a second panic when trying to call a nil function pointer.
Always deferring the function and then doing the nil check inside the deferred function seems like the fix? There's still a race there, but I dunno how to fix that, and it's really unlikely.
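The fix described above might look like this sketch (withRecover and PanicHandler are modeled on sarama's names, but this is an illustrative reimplementation, not the library code):

```go
package main

import "fmt"

// PanicHandler may be set or cleared at any time by the user.
var PanicHandler func(interface{})

// withRecover always defers the recovery wrapper; the nil check happens
// inside the deferred function, at panic time, not at spawn time. If the
// handler was cleared to nil in the meantime we re-panic instead of
// calling a nil function pointer (which would cause the second panic).
func withRecover(fn func()) {
	defer func() {
		if err := recover(); err != nil {
			handler := PanicHandler // read once; still racy, but never a nil call
			if handler != nil {
				handler(err)
			} else {
				panic(err)
			}
		}
	}()
	fn()
}

func main() {
	PanicHandler = func(v interface{}) { fmt.Println("recovered:", v) }
	withRecover(func() { panic("boom") })
}
```

Reading the handler into a local variable narrows the race to a single load; closing it entirely would need a mutex or atomic around every handler access.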
It might be nice if this function could be public:
func (c *Consumer) getOffset(where OffsetTime, retry bool) (int64, error)
I want to get the offset from the broker when I restart my consumer; can this function do that?
Hit this somehow when testing. Somehow a topic ended up with zero partitions in the client's metadata table, which resulted in a zero getting passed to rand.Intn, which panicked.
Not sure if this was something returned from the broker or a bug on our end, or where we ought to catch this. I haven't really been able to reproduce it.
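One place to catch it is right before the rand.Intn call; this sketch (choosePartition and ErrNoPartitions are hypothetical names) returns a descriptive error instead of letting Intn panic on an empty partition list:

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
)

var ErrNoPartitions = errors.New("kafka: topic has no partitions (topic may not exist)")

// choosePartition guards the rand.Intn call: rand.Intn(0) panics with
// "invalid argument to Intn", which is the opaque failure reported above.
func choosePartition(partitions []int32) (int32, error) {
	if len(partitions) == 0 {
		return -1, ErrNoPartitions
	}
	return partitions[rand.Intn(len(partitions))], nil
}

func main() {
	if _, err := choosePartition(nil); err != nil {
		fmt.Println(err) // a specific error instead of a panic
	}
	p, _ := choosePartition([]int32{0, 1, 2})
	fmt.Println("picked partition", p)
}
```

The error message then tells the caller what actually went wrong (a likely nonexistent topic) rather than an internal detail of the partitioner.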
The doc says this is ok.
https://github.com/Shopify/sarama/blob/master/producer.go#L9-L15
The code, which errors
https://github.com/Shopify/sarama/blob/master/producer.go#L88-L94
I have written a bare-bones piece of Go code to write and read events to the Kafka "event" topic for a single-broker Kafka on my local machine. The crux of the producer is this little bit, where I write out the string messages A0 through A9:
for i := 0; i < 10; i++ {
	producer.SendMessage(topicName, nil, sarama.StringEncoder(fmt.Sprintf("A%d", i)))
}
I'm getting some very peculiar behavior here. Here is the scenario:
In kafka dir wipe everything clean and start zookeeper
$ rm -fr /tmp/kafka-logs/ /tmp/zookeeper/
$ bin/zookeeper-server-start.sh config/zookeeper.properties
#no errors
In different terminal in kafka dir start kafka
$ bin/kafka-server-start.sh config/server.properties
#no errors
In different terminal in kafka dir start console consumer. The output that follows is from running the script above. No apparent problem observed from the output although it's pretty slow.
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic event --from-beginning
A0
A1
A2
A3
A4
A5
A6
A7
A8
A9
In a different terminal I actually run the script. Note that several of the messages are dropped. However, since they made it through the console-consumer above, it's not the fault of my producer; it must be something to do with the consumer.
$ go run sarama_example.go
creating client
creating consumer
creating producer
A0
A2
A3
A4
A6
A9
Finally, I stop and run the console consumer again and all the messages are out of order! So this problem could even be a Kafka thing (version = kafka_2.9.2-0.8.1.1). If I run the console consumer again, the order stays the same as this.
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic event --from-beginning
A1
A5
A7
A8
A0
A2
A3
A4
A6
A9
I've replicated this behavior on my Macintosh as well as Ubuntu and CentOS VMs. Hopefully you'll look at my script and see some glaring configuration problem. In any case I would greatly appreciate any advice you might have.
I have 5 producer servers, and every server has 40 clients.
All the clients' IDs are set to the same default ID. The servers currently run normally. Can I use the same client ID for all of them, or is this wrong usage?
Thanks very much.
Following up to #7, #9, #10, #13 our connection management still has problems.
First some things to know about Kafka that complicate the issue:
Now some of the issues with our current implementation:
Thoughts on potential future design:
- The leader function shouldn't guarantee that the broker it returns is connected, only connecting.
- The any function should guarantee that the broker it returns is actually connected. Walking through bad brokers to find a good one could be slow, but connecting to all the brokers just to pick one and drop the others is gross. We could try to connect to one, and if it hasn't succeeded in 5 seconds, start another, etc. That should cover the common case, since if it hasn't responded after 5 seconds it probably won't, and if it does then we'll drop it right away, oh well. More importantly, we won't spawn hundreds of connections if the first broker does respond normally.

Open Questions:
- When a broker is returned by any, when should it be disconnected? The extraBroker field in the client currently is a hack to work around this, and it still leaves one connection floating around for no reason. Presumably disconnectBroker could be used and made smarter? However, if a metadata-supplied broker is used, calling disconnectBroker will pull it out from underneath other code that could be using it as a leader. Do we need reference-counting?
- If two simultaneous leader calls would return the same broker and both call Open, the second one will block when we don't want it to. Do we need a second lock in the brokers? That gets really messy...

This function should return the last offset for the given topic/partition, but always returns the error:
kafka server:
The requested offset is outside the range of offsets maintained by the server for the given topic/partition.
#123
I wrote a benchmark program to measure the performance of the library. I noticed that it drops a significant number of messages when either gzip or snappy is used with WaitForLocal.
I get around 59.5 MBps without compression
I see the following error message in case of gzip/snappy:
[Sarama] 2014/03/11 18:39:06 kafka: Dropped 35497 messages
[Sarama] 2014/03/11 18:39:06 kafka: Dropped 35497 messages
[Sarama] 2014/03/11 18:39:06 kafka: Dropped 35497 messages
[Sarama] 2014/03/11 18:39:06 kafka: Dropped 35497 messages
I am sending 10 million messages, with gzip/snappy, a 65k buffer, and a 1 second buffer timeout. Eventually the benchmarking program also gets killed.
Pending issue #2, there is no way for the consumer to manually specify which offset to start at - it's always 0. There should be a way to tell the consumer to start at a specific offset.
Hi,
I'm new to Kafka so I might be missing something, but here's what I want to do and think I cannot achieve with the Consumer type.
I'd like to be able to fetch 100 messages, process them, and only then notify Zookeeper that it can move my consumer group offset. This would allow my process to fail in the middle of a batch processing safely: so that the next time I start it, I can start processing the same batch again.
Is this possible with the current state of the library?
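The batch-then-commit pattern described above can be sketched independently of the library. Everything below is a stub: the `message` type and the commit bookkeeping stand in for whatever fetch/commit mechanism you end up using (for example, writing consumer-group offsets to Zookeeper yourself). The point is only that the offset advances after the whole batch succeeds, so a crash mid-batch replays the batch:

```go
package main

import "fmt"

// message pairs a payload with the offset it was read from.
type message struct {
	offset  int64
	payload string
}

// processBatch processes up to batchSize messages and only then returns the
// next offset to commit. If processing failed midway, the caller would keep
// the previously committed offset so the whole batch is re-read on restart.
func processBatch(msgs []message, committed int64, batchSize int) int64 {
	next := committed
	for i, m := range msgs {
		if i >= batchSize {
			break
		}
		// ... real per-message processing would happen here ...
		next = m.offset + 1
	}
	return next
}

func main() {
	msgs := []message{{0, "a"}, {1, "b"}, {2, "c"}}
	fmt.Println(processBatch(msgs, 0, 100)) // prints 3: commit only after the batch
}
```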
When I try to create a client with an initial list of brokers and none of those brokers is available, I get a panic (nil pointer dereference).
Thank you for the good work on this project.
When using Sarama, I've run into a problem: how can I catch the connection error so that I can reconnect? Any ideas? Thank you :)
It might be nice if the producer could return the underlying correlation ID when you call `QueueMessage()`. Then the producer could have `Acks() <-chan int32` and `Nacks() <-chan int32` channels which you could receive on to know whether a particular message was successfully delivered. Not positive this makes sense from an implementation perspective, but the lack of it made me use `SendMessage()` rather than `QueueMessage()`.
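The ack/nack idea can be sketched without touching sarama's internals. Below, `fakeSend` stands in for the real wire send, and the `producer` type is a toy; the only point is the proposed channel shape, where each queued message gets a correlation ID and the outcome is reported on one of two channels:

```go
package main

import "fmt"

// producer is a toy stand-in: Queue assigns a correlation ID and reports
// delivery on Acks or Nacks, so callers can track individual messages.
type producer struct {
	nextID int32
	Acks   chan int32
	Nacks  chan int32
}

func newProducer() *producer {
	return &producer{Acks: make(chan int32, 16), Nacks: make(chan int32, 16)}
}

// Queue "sends" msg asynchronously and returns its correlation ID at once.
func (p *producer) Queue(msg string) int32 {
	id := p.nextID
	p.nextID++
	go func() {
		if fakeSend(msg) {
			p.Acks <- id
		} else {
			p.Nacks <- id
		}
	}()
	return id
}

// fakeSend pretends the broker rejects empty messages.
func fakeSend(msg string) bool { return msg != "" }

func main() {
	p := newProducer()
	ok := p.Queue("hello")
	bad := p.Queue("")
	fmt.Println("acked:", <-p.Acks == ok)   // acked: true
	fmt.Println("nacked:", <-p.Nacks == bad) // nacked: true
}
```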
Hi,
I am using an asynchronous producer to send compressed messages to a Kafka cluster, but it looks like those messages exceed the configured size limit. I get this error on the Kafka broker:
kafka.common.MessageSizeTooLargeException: Message size is 3213102 bytes which exceeds the maximum configured message size of 1000012.
This is my producer config:
producerConfig := sarama.NewProducerConfig()
producerConfig.RequiredAcks = sarama.NoResponse
producerConfig.MaxBufferedBytes = 1000000
producerConfig.MaxBufferTime = 30 * time.Second
producerConfig.Compression = sarama.CompressionSnappy
Everything works great with compression disabled.
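One likely explanation: with compression enabled, the producer compresses the whole buffered message set into a single wrapper message, and the broker checks that wrapper against `message.max.bytes` (1,000,012 by default, which matches the limit in your error). So a buffer near 1,000,000 bytes can produce a single compressed message over the limit once the envelope is included, and Snappy can even expand incompressible payloads. A config sketch under that assumption, leaving headroom below the broker limit (the value 500000 is illustrative, not a recommendation):

```go
// Keep the buffered set well below the broker's message.max.bytes
// (default 1000012), leaving headroom for the message envelope and for
// incompressible payloads that Snappy may slightly expand.
producerConfig := sarama.NewProducerConfig()
producerConfig.RequiredAcks = sarama.NoResponse
producerConfig.MaxBufferedBytes = 500000 // headroom vs. the ~1 MB broker limit
producerConfig.MaxBufferTime = 30 * time.Second
producerConfig.Compression = sarama.CompressionSnappy
```

Alternatively, raising `message.max.bytes` (and `replica.fetch.max.bytes`) on the brokers sidesteps the limit, at the cost of larger requests.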
Is 0.8.2 support planned sometime soon? From what I hear, all the code is in trunk already and it's in the final testing stage. If it's planned, I think it would make sense to start developing against Kafka trunk and at least get the conversation about the new consumer/producer design started.
Since #65 was closed, I was hoping the reconnect logic would work, but it doesn't appear to.
Using commit c761d93 I am attempting to constantly sync-send to a single Kafka broker.
I kill and restart the broker, and Sarama does not reconnect. The fix for #65 solved the deadlock issue I saw, but now I get an infinite stream of
kafka: Dropped 1 messages: write tcp 127.0.0.1:9092: broken pipe
in my log.
Is there any special configuration necessary to get sarama to reconnect? Here is the code that creates the client and producer
log.Println("go=kafka at=new-kafka-deliver")
clientConfig := &sarama.ClientConfig{
MetadataRetries: 10,
WaitForElection: 10 * time.Second,
}
producerConfig := &sarama.ProducerConfig{
Partitioner: sarama.NewRandomPartitioner(),
RequiredAcks: sarama.WaitForLocal,
Timeout: 5 * time.Second,
Compression: sarama.CompressionNone,
MaxBufferedBytes: uint32(1000),
MaxBufferTime: 1 * time.Second,
}
client, err := sarama.NewClient(clientId, brokerList, clientConfig)
if err != nil {
return nil, err
}
log.Println("go=kafka at=created-client")
producer, err := sarama.NewProducer(client, producerConfig)
if err != nil {
return nil, err
}
log.Println("go=kafka at=created-producer")
func defaultProducerConfig() *sarama.ProducerConfig {
config := sarama.NewProducerConfig()
config.Compression = 0
config.MaxBufferTime = 5000
config.MaxBufferedBytes = 1024 *10
return config
}
I want to use the QueueMessage API, but I don't know whether it is the best approach. Can anyone suggest a better solution? Each record is 500 bytes.
Thanks a lot.
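As a sanity check on the numbers in that config: with `MaxBufferedBytes = 1024 * 10` and 500-byte records, each flushed batch holds at most about 20 records (the real count will be slightly lower once per-message framing overhead is included). A trivial arithmetic sketch:

```go
package main

import "fmt"

// recordsPerBatch estimates how many fixed-size records fit in a buffer.
// Per-message framing (offset, size, CRC, key bytes) is ignored here, so
// the real number per flushed batch will be slightly lower.
func recordsPerBatch(bufferBytes, recordBytes int) int {
	return bufferBytes / recordBytes
}

func main() {
	fmt.Println(recordsPerBatch(10*1024, 500)) // prints 20
}
```

If 20-message batches are too small for your throughput target, raising the buffer size (or lowering `MaxBufferTime`) is the usual lever.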
I am doing
go get github.com/Shopify/sarama
These are the errors that I get-
# github.com/Shopify/sarama
./broker.go:112: method b.responseReceiver is not an expression, must be called
./consumer.go:123: method c.fetchMessages is not an expression, must be called
./producer.go:236: timer.Reset undefined (type *time.Timer has no field or method Reset)
Is there any resolution for this ?
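A likely cause: all three compile errors point at features added in Go 1.1 (method values, which explains the "method ... is not an expression" messages, and `time.Timer.Reset`), so building with a Go 1.0.x toolchain would fail in exactly this way. A quick check, sketched in shell; the `version_ge` helper is just an illustration:

```shell
# These compile errors are characteristic of building with Go 1.0.x:
# method values and time.Timer.Reset were both introduced in Go 1.1.
if command -v go >/dev/null 2>&1; then
  go version
fi

# version_ge A B succeeds when version A >= version B (sort -V orders versions)
version_ge() {
  [ "$(printf '%s\n%s\n' "$2" "$1" | sort -V | head -n 1)" = "$2" ]
}

if version_ge "$(go version 2>/dev/null | awk '{print $3}')" go1.1; then
  echo "toolchain is new enough; try: go get github.com/Shopify/sarama"
else
  echo "upgrade to Go 1.1 or later, then re-run go get"
fi
```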
The `Client` object has basically no public methods right now; it is just a place to dump methods (mostly broker and metadata tracking) that are shared by the Consumer and the Producer. Its primary functions should have specific semantics defined and then be published so other people can make use of them.
Probably depends on getting #15 right first...
I'm thinking (as a rough first draft):
- `Leader(topic, partition)` returns the broker object leading the topic/partition
- `Topics()` returns the list of topics for the cluster
- `Partitions(topic)` returns the list of partitions for the topic
- `RefreshMetadata(topics[])`
Open Questions:
- `disconnectBroker` feels hacky (this really depends on #15)
- `RefreshTopic(topic)` or `RefreshAllTopics()` helper functions, or something like that?
- `any()`? It's only useful for metadata, so this probably depends on whether we expose all metadata or whether a user might want to get it themselves.