sarama's Issues

a little help?

I've been cobbling together my first client using Sarama over the last few weeks and I'm struggling with how to implement my producer.

Could I get some clarification on:

Both SendMessage and QueueMessage return an error, but we're instructed to consume Errors when using QueueMessage, so what error will QueueMessage itself return? My inclination is to use QueueMessage for its ability to batch messages into message sets for the sake of throughput, but I'm having trouble understanding how to detect errors and how to recover when they arrive on the Errors channel.

The stream of data arriving at my producer is not easy to apply back pressure against. Every so often, with QueueMessage, the Errors channel reports a few thousand messages dropped. I suspect that I can alleviate this by monitoring the brokers more closely and possibly tuning them. I also expect that, with very minimal in-memory buffering, my producer could handle the broker hiccup and resend lost messages if it could detect which messages failed to send. Is there any sane way to do this while still using QueueMessage?

Producer.SendMessage hangs on leader election 0.8.1

My client config

    to.clientCfg = &sarama.ClientConfig{WaitForElection: 1 * time.Second,
        MetadataRetries: 10,
    }

producer config:

    to.producerCfg = &sarama.ProducerConfig{
        RequiredAcks: sarama.WaitForLocal,
        Timeout:      2000,
        Partitioner:  NewPartitioner(),
        //Compression:      to.cfg.compressCodec,
        MaxBufferedBytes: 1024,
        MaxBufferTime:    1,
    }

I'm using the kafka shell tools to perform a graceful leader shutdown:

    kafka-run-class.sh kafka.admin.ShutdownBroker --zookeeper svac17:2181/prodz/kafka-session --broker 224 --num.retries 3 --retry.interval.ms 60

and my producer never returns from SendMessage.

Is this expected behavior for the synchronous call, or should it return an error?

Consumer registration and rebalancing

Are there plans for sarama to directly support automated consumer registration and rebalancing, as described at http://kafka.apache.org/documentation.html#distributionimpl?

Use case: assuming I have N topics with M partitions, I would like to run a consumer group and distribute these across O consumers as evenly as possible (with each message being consumed only once).

The process steps are nicely documented, but they are purely based on ZK communication. Should this be implemented directly in sarama, or would it be better to create a wrapper library?

I volunteer to help with the development, but I would like to know whether there is any existing work or plans I can (or should) rely on.

producers become very slow when cluster has large number of topics

This is sort of a weird one. I've written a small load testing client which uses multiple goroutines pushing messages through multiple producers to my kafka cluster. When I've got a small number of topics on the cluster, I can push thousands of msgs/sec into a topic. However, when I've got many topics (~800), the message rate drops to ~30/s, even though I'm still only pushing to 4 topics for the test.

Stranger still, this only happens when I'm using sarama.WaitForAll. If I use sarama.WaitForLocal, I get decent rates (~800/sec). As well, we drafted a similar client using the python client library, and it doesn't exhibit this issue with the same settings.

Is there something in the sarama WaitForAll code that polls all the topics even if it's only publishing to a few of them? I'm at a loss to explain this behaviour otherwise.

We've found WaitForAll is required for data safety, so I'd definitely like to get to the bottom of this.

Why not SendMessageBatch?

I've been using your framework to build a 0.8 Producer, and it works well -- thanks for open-sourcing it. Perhaps this has been asked before, but is there any reason that a SendMessageBatch call doesn't exist on the Producer? Go should make it easy to deal with the fact that the call would block, and it would allow me to use higher-throughput batching.

Is there any reason why it would be difficult to code? If not, I might knock it out myself.

Connection failure to Kafka cluster when some partitions have no leader

New clients are unable to connect to a Kafka cluster when there are partitions without leaders (e.g. because of dead brokers). Existing clients work OK; they just shrink the set of available partitions.

Please allow connecting to a Kafka cluster with unavailable partitions, and consuming and producing messages on the partitions that remain available.

kafka: Dropped 5739 messages

Hi all,

I sometimes see this error: "kafka: Dropped 5739 messages". This is my producer config:

    func defaultProducerConfig() *sarama.ProducerConfig {
        config := sarama.NewProducerConfig()
        config.Compression = 2
        config.MaxBufferTime = 1000000 * time.Millisecond
        config.MaxBufferedBytes = 1024 * 10
        return config
    }

I'm using the producer.QueueMessage function, and I don't know why this happens. Can anyone help me and give me some advice? Thanks a lot.

Offset Fetch/Commit Support

The Consumer should make use of https://cwiki.apache.org/confluence/display/KAFKA/Offset+Management

The golang protocol backend already supports those request/response types, but the current Kafka 0.8 beta 1 seems to choke on them so I didn't add them to the API.

I suspect this will consist of doing a Fetch on construction of a Consumer, then adding a Commit(offset) api call that the consumer user can call as appropriate. The python bindings have an autocommit option, but I think that's overcomplicated for our needs, at least to start.

panic: runtime error: send on closed channel

Hi

I get panic: runtime error: send on closed channel when I kill a broker in a Kafka cluster.

It looks like backPressureThreshold is exceeded and you want to flush messages but something bad is happening.

I also wonder why I get kafka: Dropped 13803 messages... What is the reason? I know that one of the brokers is dead, but what about new Leaders for partitions?

I use an asynchronous producer. My configuration:

producerConfig := sarama.NewProducerConfig()
producerConfig.RequiredAcks = sarama.NoResponse
producerConfig.MaxBufferedBytes = 1000000
producerConfig.BackPressureThresholdBytes = 10000000
producerConfig.MaxBufferTime = 30 * time.Second
producerConfig.Compression = sarama.CompressionSnappy

Logs:

[Sarama] 2014/09/23 18:19:24 Initializing new client
[Sarama] 2014/09/23 18:19:24 Fetching metadata from broker localhost:9092
[Sarama] 2014/09/23 18:19:24 Connected to broker localhost:9092
[Sarama] 2014/09/23 18:19:24 Registered new broker #5 at localhost:9096
[Sarama] 2014/09/23 18:19:24 Registered new broker #1 at localhost:9092
[Sarama] 2014/09/23 18:19:24 Registered new broker #2 at localhost:9093
[Sarama] 2014/09/23 18:19:24 Registered new broker #3 at localhost:9094
[Sarama] 2014/09/23 18:19:24 Registered new broker #4 at localhost:9095
[Sarama] 2014/09/23 18:19:24 Successfully initialized new client
[Sarama] 2014/09/23 18:19:24 Connected to broker localhost:9096
[Sarama] 2014/09/23 18:19:24 Connected to broker localhost:9095
[Sarama] 2014/09/23 18:19:24 Connected to broker localhost:9094
[Sarama] 2014/09/23 18:19:24 Connected to broker localhost:9093
[Sarama] 2014/09/23 18:19:24 Connected to broker localhost:9092
[Sarama] 2014/09/23 18:21:56 Disconnecting Broker 1
[Sarama] 2014/09/23 18:21:56 Fetching metadata from broker localhost:9093
[Sarama] 2014/09/23 18:21:56 Closed connection to broker localhost:9092
[Sarama] 2014/09/23 18:21:56 Registered new broker #1 at localhost:9092
[Sarama] 2014/09/23 18:21:56 Failed to connect to broker localhost:9092
[Sarama] 2014/09/23 18:21:56 dial tcp 127.0.0.1:9092: connection refused
[Sarama] 2014/09/23 18:21:56 Disconnecting Broker 1
[Sarama] 2014/09/23 18:21:56 kafka: Dropped 13803 messages: dial tcp 127.0.0.1:9092: connection refused
[Sarama] 2014/09/23 18:21:56 Failed to close connection to broker localhost:9092.
[Sarama] 2014/09/23 18:21:56 kafka: broker: not connected
[Sarama] 2014/09/23 18:21:56 Fetching metadata from broker localhost:9096
[Sarama] 2014/09/23 18:21:56 Disconnecting Broker 1
[Sarama] 2014/09/23 18:21:56 kafka: Dropped 4406 messages: dial tcp 127.0.0.1:9092: connection refused
[Sarama] 2014/09/23 18:21:56 Disconnecting Broker 1
[Sarama] 2014/09/23 18:21:56 Failed to close connection to broker localhost:9092.
[Sarama] 2014/09/23 18:21:56 kafka: broker: not connected
[Sarama] 2014/09/23 18:21:56 Failed to close connection to broker localhost:9092.
[Sarama] 2014/09/23 18:21:56 kafka: broker: not connected
[Sarama] 2014/09/23 18:21:56 Registered new broker #1 at localhost:9092
[Sarama] 2014/09/23 18:21:56 Failed to connect to broker localhost:9092
[Sarama] 2014/09/23 18:21:56 dial tcp 127.0.0.1:9092: connection refused
[Sarama] 2014/09/23 18:21:56 Disconnecting Broker 1
[Sarama] 2014/09/23 18:21:56 Failed to close connection to broker localhost:9092.
[Sarama] 2014/09/23 18:21:56 kafka: broker: not connected
[Sarama] 2014/09/23 18:21:58 Disconnecting Broker 1
[Sarama] 2014/09/23 18:21:58 Failed to close connection to broker localhost:9092.
[Sarama] 2014/09/23 18:21:58 kafka: broker: not connected
panic: runtime error: send on closed channel

Panic:

goroutine 35 [running]:
runtime.panic(0x57c3c0, 0x690fbe)
/usr/lib/go/src/pkg/runtime/panic.c:279 +0xf5
github.com/Shopify/sarama.(*brokerProducer).flushIfOverCapacity(0xc20804e680, 0x989680000f4240)
/home/noxis/go/src/github.com/Shopify/sarama/producer.go:290 +0xa0
github.com/Shopify/sarama.(*brokerProducer).addMessage(0xc20804e680, 0xc214a13e00, 0x989680000f4240)
/home/noxis/go/src/github.com/Shopify/sarama/producer.go:280 +0x2ae
github.com/Shopify/sarama.(*Producer).addMessage(0xc208044120, 0xc214a13e00, 0x0, 0x0)
/home/noxis/go/src/github.com/Shopify/sarama/producer.go:190 +0xad
github.com/Shopify/sarama.(*produceMessage).enqueue(0xc214a13e00, 0xc208044120, 0x0, 0x0)
/home/noxis/go/src/github.com/Shopify/sarama/produce_message.go:22 +0x61
github.com/Shopify/sarama.(*Producer).genericSendMessage(0xc208044120, 0x5aa3d0, 0x9, 0x7f1e203739d0, 0xc208bd4b60, 0x7f1e203739d0, 0xc208bd4b80, 0x7f1e201cdf00, 0x0, 0x0)
/home/noxis/go/src/github.com/Shopify/sarama/producer.go:182 +0x257
github.com/Shopify/sarama.(*Producer).QueueMessage(0xc208044120, 0x5aa3d0, 0x9, 0x7f1e203739d0, 0xc208bd4b60, 0x7f1e203739d0, 0xc208bd4b80, 0x0, 0x0)
/home/noxis/go/src/github.com/Shopify/sarama/producer.go:136 +0x8b
main.worker(0xc208082000, 0xc208040020, 0xc208044120)
/home/noxis/IdeaProjects/kafka-log-producer/src/kafka-log-producer.go:112 +0x263
created by main.main
/home/noxis/IdeaProjects/kafka-log-producer/src/kafka-log-producer.go:43 +0x120

mockbroker.go should not import testing

Currently any library importing Shopify/sarama gets the "testing" package pulled in. According to the nice folks at #go-nuts, the "testing" package should be imported only from *_test.go files.

How to fix a very slow producer Close

    startTime := int32(time.Now().Unix())
    for _, line := range data {
        // err := producer.SendMessage("weblog", nil, sarama.StringEncoder(line))
        err := producer.QueueMessage("weblog", nil, sarama.StringEncoder(line))
        if err != nil {
            fmt.Println(err)
            // Non-blocking drain of one pending error, if there is one.
            select {
            case <-producer.Errors():
            default:
                // case <-time.After(1 * time.Millisecond):
            }
        }
    }
    fmt.Println("closing socket connections...")
    producer.Close()
    client.Close()
    endTime := int32(time.Now().Unix())
    // len(data) is the number of lines actually processed; cap(data) may be larger.
    fmt.Println("upload file ok", filename, "total time:", endTime-startTime, "total lines:", len(data))

I found that producer.Close() is very slow. How can I solve this problem?

Initializing Client with Multiple Brokers

This has come up in the back of my mind a few times but I've always held off because I'm not sure what sane default behaviours are in all cases. Right now you specify a single host:port to construct a client and it pulls metadata on the rest of the cluster from that. It makes sense to be able to specify a list of brokers, so that if part of the cluster is offline the client can still connect.

Client reconnect on `kill -9 {kafkaProc}`

I see a few issues which seem somewhat related, but none of the commits or suggestions seem to have any effect.

This issue is fully reproducible.

  • start client / producers
  • call QueueMessage for each message
  • kill -9 a broker

Here are my connections in CLOSE_WAIT to the failed node

$ netstat -a | grep 10.10.2.46
tcp        1      0 ip-10-10-1-202:57278    10.10.2.46:9092         CLOSE_WAIT
tcp        1      0 ip-10-10-1-202:57279    10.10.2.46:9092         CLOSE_WAIT

Any idea on what I can possibly try here?

This is what shows in the sarama log

[Sarama] 2014/08/21 22:43:17 Initializing new client
[Sarama] 2014/08/21 22:43:17 Fetching metadata from broker 10.10.1.196:9092
[Sarama] 2014/08/21 22:43:17 Connected to broker 10.10.1.196:9092
[Sarama] 2014/08/21 22:43:17 Registered new broker #1010226 at kafka-i-6621614c-integ.us-east-1c.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Registered new broker #10101195 at kafka-i-e6780fb4-integ.us-east-1a.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Registered new broker #1010246 at kafka-i-1b5f9a36-integ.us-east-1c.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Registered new broker #10101196 at kafka-i-24671076-integ.us-east-1a.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Registered new broker #1010196 at kafka-i-b77a0de5-integ.us-east-1a.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Registered new broker #1010250 at kafka-i-d01a5afa-integ.us-east-1c.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Successfully initialized new client
[Sarama] 2014/08/21 22:43:17 Initializing new client
[Sarama] 2014/08/21 22:43:17 Fetching metadata from broker 10.10.1.196:9092
[Sarama] 2014/08/21 22:43:17 Connected to broker 10.10.1.196:9092
[Sarama] 2014/08/21 22:43:17 Connected to broker kafka-i-b77a0de5-integ.us-east-1a.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Connected to broker kafka-i-e6780fb4-integ.us-east-1a.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Registered new broker #1010226 at kafka-i-6621614c-integ.us-east-1c.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Connected to broker kafka-i-24671076-integ.us-east-1a.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Registered new broker #10101195 at kafka-i-e6780fb4-integ.us-east-1a.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Registered new broker #1010246 at kafka-i-1b5f9a36-integ.us-east-1c.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Registered new broker #10101196 at kafka-i-24671076-integ.us-east-1a.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Registered new broker #1010196 at kafka-i-b77a0de5-integ.us-east-1a.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Registered new broker #1010250 at kafka-i-d01a5afa-integ.us-east-1c.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Successfully initialized new client
[Sarama] 2014/08/21 22:43:17 Connected to broker kafka-i-6621614c-integ.us-east-1c.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Connected to broker kafka-i-d01a5afa-integ.us-east-1c.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Connected to broker kafka-i-1b5f9a36-integ.us-east-1c.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Connected to broker kafka-i-1b5f9a36-integ.us-east-1c.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Connected to broker kafka-i-b77a0de5-integ.us-east-1a.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Connected to broker kafka-i-24671076-integ.us-east-1a.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Connected to broker kafka-i-6621614c-integ.us-east-1c.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Connected to broker kafka-i-e6780fb4-integ.us-east-1a.ec2.integ:9092
[Sarama] 2014/08/21 22:43:17 Connected to broker kafka-i-d01a5afa-integ.us-east-1c.ec2.integ:9092
--- THIS IS WHERE I KILL -9 ---
[Sarama] 2014/08/21 22:43:35 kafka: Dropped 33 messages: kafka server: Request exceeded the user-specified time limit in the request.
[Sarama] 2014/08/21 22:43:35 kafka: Dropped 33 messages: kafka server: Request exceeded the user-specified time limit in the request.
[Sarama] 2014/08/21 22:43:35 kafka: Dropped 33 messages: kafka server: Request exceeded the user-specified time limit in the request.
[Sarama] 2014/08/21 22:43:35 kafka: Dropped 29 messages: kafka server: Request exceeded the user-specified time limit in the request.
[Sarama] 2014/08/21 22:43:35 kafka: Dropped 29 messages: kafka server: Request exceeded the user-specified time limit in the request.
[Sarama] 2014/08/21 22:43:35 kafka: Dropped 29 messages: kafka server: Request exceeded the user-specified time limit in the request.
[Sarama] 2014/08/21 22:43:35 kafka: Dropped 30 messages: kafka server: Request exceeded the user-specified time limit in the request.
[Sarama] 2014/08/21 22:43:35 kafka: Dropped 31 messages: kafka server: Request exceeded the user-specified time limit in the request.
[Sarama] 2014/08/21 22:43:35 kafka: Dropped 29 messages: kafka server: Request exceeded the user-specified time limit in the request.
[Sarama] 2014/08/21 22:43:37 kafka: Dropped 112 messages: kafka server: Request exceeded the user-specified time limit in the request.
[Sarama] 2014/08/21 22:45:17 Fetching metadata from broker kafka-i-24671076-integ.us-east-1a.ec2.integ:9092
[Sarama] 2014/08/21 22:47:17 Fetching metadata from broker kafka-i-d01a5afa-integ.us-east-1c.ec2.integ:9092
[Sarama] 2014/08/21 22:49:17 Fetching metadata from broker kafka-i-6621614c-integ.us-east-1c.ec2.integ:9092
[Sarama] 2014/08/21 22:51:17 Fetching metadata from broker kafka-i-d01a5afa-integ.us-east-1c.ec2.integ:9092

I am using the default ClientConfig and BrokerConfig (via New*). Here is a dump of my sarama goroutines after failure:

14 @ 0x424659 0x4246db 0x42df08 0x42e38e 0x57929d 0x58d1c6 0x58d0a7 0x4248f0
#   0x42df08    chanrecv+0x1d8                                      /usr/local/go/src/pkg/runtime/chan.goc:298
#   0x42e38e    runtime.chanrecv2+0x3e                                  /usr/local/go/src/pkg/runtime/chan.goc:358
#   0x57929d    github.com/Shopify/sarama.(*Broker).responseReceiver+0xdd               /home/sean/Development/go/src/github.com/Shopify/sarama/broker.go:361
#   0x58d1c6    github.com/Shopify/sarama.*Broker.(github.com/Shopify/sarama.responseReceiver)·fm+0x26 /home/sean/Development/go/src/github.com/Shopify/sarama/broker.go:114
#   0x58d0a7    github.com/Shopify/sarama.withRecover+0x37                      /home/sean/Development/go/src/github.com/Shopify/sarama/utils.go:27

2 @ 0x424659 0x4246db 0x42da2b 0x42e2fc 0x58df83 0x588cad 0x5888ca 0x58da28 0x4248f0
#   0x42da2b    chansend+0x16b                          /usr/local/go/src/pkg/runtime/chan.goc:168
#   0x42e2fc    runtime.chansend1+0x4c                      /usr/local/go/src/pkg/runtime/chan.goc:347
#   0x58df83    github.com/Shopify/sarama.func·009+0x3a3           /home/sean/Development/go/src/github.com/Shopify/sarama/producer.go:351
#   0x588cad    github.com/Shopify/sarama.(*brokerProducer).flushRequest+0x37d  /home/sean/Development/go/src/github.com/Shopify/sarama/producer.go:413
#   0x5888ca    github.com/Shopify/sarama.(*brokerProducer).flush+0x41a     /home/sean/Development/go/src/github.com/Shopify/sarama/producer.go:352
#   0x58da28    github.com/Shopify/sarama.func·008+0xc8                /home/sean/Development/go/src/github.com/Shopify/sarama/producer.go:235

1 @ 0x424659 0x4246db 0x42da2b 0x42e2fc 0x588390 0x58815e 0x587a5d 0x5855c1 0x587987 0x58765b 0x40bcc9 0x4248f0
#   0x42da2b    chansend+0x16b                              /usr/local/go/src/pkg/runtime/chan.goc:168
#   0x42e2fc    runtime.chansend1+0x4c                          /usr/local/go/src/pkg/runtime/chan.goc:347
#   0x588390    github.com/Shopify/sarama.(*brokerProducer).flushIfOverCapacity+0xa0    /home/sean/Development/go/src/github.com/Shopify/sarama/producer.go:289
#   0x58815e    github.com/Shopify/sarama.(*brokerProducer).addMessage+0x2ae        /home/sean/Development/go/src/github.com/Shopify/sarama/producer.go:279
#   0x587a5d    github.com/Shopify/sarama.(*Producer).addMessage+0xad           /home/sean/Development/go/src/github.com/Shopify/sarama/producer.go:189
#   0x5855c1    github.com/Shopify/sarama.(*produceMessage).enqueue+0x61        /home/sean/Development/go/src/github.com/Shopify/sarama/produce_message.go:22
#   0x587987    github.com/Shopify/sarama.(*Producer).genericSendMessage+0x257      /home/sean/Development/go/src/github.com/Shopify/sarama/producer.go:181
#   0x58765b    github.com/Shopify/sarama.(*Producer).QueueMessage+0x8b         /home/sean/Development/go/src/github.com/Shopify/sarama/producer.go:135
#   0x40bcc9    main.(*KafkaStorage).MessageProxy+0xe9                  /home/sean/Development/go/src/*/kafkastore.go:199

Retry logic is broken in the producer

    overlimit := 0
    prb.reverseEach(func(msg *produceMessage) {
        if err := msg.reenqueue(p); err != nil {
            overlimit++
        }
    })
    if overlimit > 0 {
        errorCb(DroppedMessagesError{overlimit, nil})
    }
    return true

This code will block forever: the second reenqueue call returns nil, so overlimit stays at 0 and errorCb is never called. The overall retry logic also seems overcomplicated and won't work the way it is supposed to.
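
The hang comes from a caller waiting on a callback that one code path never invokes. A minimal, library-independent sketch of one way to rule that out is to guarantee the callback fires exactly once on every path. The functions `flush` and `enqueue` below are simplified stand-ins, not sarama's real implementation, and reporting nil when nothing was dropped is an assumption about the desired semantics.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// flush stands in for a flush that failed and re-enqueued its messages.
// dropped is how many messages could not be retried. The callback is invoked
// exactly once on every path, including dropped == 0.
func flush(dropped int, errorCb func(error)) {
	if dropped > 0 {
		errorCb(fmt.Errorf("dropped %d messages", dropped))
		return
	}
	errorCb(nil) // the path that the quoted code never reports on
}

// enqueue mirrors the blocking caller: it waits for flush to report back.
// The timeout is only a guard so that a missing callback shows up as an error
// instead of a goroutine that blocks forever.
func enqueue(dropped int) error {
	errs := make(chan error, 1)
	flush(dropped, func(err error) { errs <- err })
	select {
	case err := <-errs:
		return err
	case <-time.After(time.Second):
		return errors.New("flush never reported a result")
	}
}

func main() {
	fmt.Println(enqueue(0))
	fmt.Println(enqueue(3))
}
```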

Producer deadlock: overlimit=0

file:produce_message.go

func (msg *produceMessage) enqueue(p *Producer) error {
    ......
    errs := make(chan error, 1)
    bp.flushRequest(p, prb, func(err error) {
        errs <- err
    })
    //:3 block
    return <-errs
}

file:producer.go

func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, errorCb func(error)) (shutdownRequired bool) {
    ......
    switch err {
    case nil:
        break
    case EncodingError:
        ......
    default:
        overlimit := 0
        prb.reverseEach(func(msg *produceMessage) {
            //:1 if msg.reenqueue(p) return a nil, then overlimit=0
            if err := msg.reenqueue(p); err != nil {
                overlimit++
            }
        })
        //:2 if overlimit=0, not call errorCb
        if overlimit > 0 {
            errorCb(DroppedMessagesError{overlimit, nil})
        }
        return true
    }
    ......
}

func (msg *produceMessage) reenqueue(p *Producer) error {
    if !msg.retried {
        msg.retried = true
        return msg.enqueue(p)
    }
    return nil
}

Handling Unreachable Brokers

Spinning this off from #7 to track separately.

When the Client receives metadata from a broker it runs that metadata through Client.update(), which tries to connect to any new brokers listed in the metadata. If any of those connections fail, it bails immediately which isn't the right choice. It also potentially leaks broker connections.

Is it a bug with Compression?

When I write with a producer whose Compression is set to CompressionGZIP, I can't read the data back from Kafka with a consumer. What's wrong?

Producer drops messages and sometimes uses lots of memory

My program uses QueueMessage to send data from logs. When the sending process is restarted, it must resume from the last line sent before it went down, so it may have lots of lines to send in a moment. Sometimes the producer process logs errors like this:
[ERROR][utils/kafkaUtils] kafkaUtils.go:239: In producer Error() some error comes. kafka: Dropped 278 messages
[ERROR][utils/kafkaUtils] kafkaUtils.go:239: In producer Error() some error comes. kafka: Dropped 264 messages
and it can't retry or fix the problem.

Sometimes no ERROR occurs, but the producer program uses lots of memory. The program uses 30MB at the beginning; after some days, with the same data and the same producer, one of the five programs uses 20-50GB of memory.

I need your help, please.

Thanks.

Create a mailing list for sarama

Hello,

It would be great if you could create a mailing list for sarama. We could use the mailing list for knowledge sharing and for general questions for which GitHub issues are not suitable.

Thanks!

Default ProducerConfig is invalid

The ProducerConfig created by NewProducerConfig() does not pass its Validate() method since the MaxBufferSize and MaxBufferTime are zero (which they should be according to the documentation of Producer). Are these values invalid when < 0 ?

Compression codec flag set incorrectly

I will have to write a test to verify this, but I think #8 introduced a bug. The contents of the message are correctly compressed, but the flag indicating the compression codec used is no longer set properly in the outgoing message.

(CC @burke @fw42 )

MultiConsumer

@wvanbergen @snormore @mkobetic (and anyone else who cares)

The current consumer works just fine, but is limited to fetching just a single partition of a single topic at once. You can run multiple consumers, but this has several issues:

  • They can't use the same client, or else the broker connection has contention problems (brokers handle requests on a single connection serially, so if it has to wait for MaxWaitTime before responding, this introduces severe latency when multiplexing busy and non-busy topics). Using separate clients works, but generates a lot of extra connections and metadata traffic.
  • You don't get messages from multiple partitions batched into a single response, when that makes sense for efficiency.

The ideal solution is to have a consumer capable of fetching from multiple topic/partitions at once. This has been at the back of my mind for a while now, so I already have a design ready; if the design makes sense to y'all, then somebody just needs to write it :)

While the user currently specifies a single topic/partition to the constructor, they should now be able to specify a set of topic/partitions. Since some of the configuration that is currently a singleton actually needs to be per-partition (OffsetMethod and OffsetValue at least), this would probably be of type map[string]map[int32]*ConsumerPartitionConfig. I considered permitting the dynamic adding/removing of partitions to the set, but I don't see a strong use case and it complicated a bunch of things.
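
To make the proposed shape concrete, here is a small sketch of such a nested map. Only the type name ConsumerPartitionConfig and the field names OffsetMethod and OffsetValue come from the text above; the constant names, the `PartitionSet` alias, and its helper methods are assumptions added for illustration.

```go
package main

import "fmt"

// OffsetMethod selects where a partition starts consuming.
// The constant names below are hypothetical.
type OffsetMethod int

const (
	OffsetMethodNewest OffsetMethod = iota
	OffsetMethodOldest
	OffsetMethodManual
)

// ConsumerPartitionConfig holds the settings that need to be per-partition.
type ConsumerPartitionConfig struct {
	OffsetMethod OffsetMethod
	OffsetValue  int64
}

// PartitionSet maps topic -> partition ID -> per-partition settings.
type PartitionSet map[string]map[int32]*ConsumerPartitionConfig

// Add registers one topic/partition, creating the inner map on first use.
func (s PartitionSet) Add(topic string, partition int32, cfg *ConsumerPartitionConfig) {
	if s[topic] == nil {
		s[topic] = make(map[int32]*ConsumerPartitionConfig)
	}
	s[topic][partition] = cfg
}

// Len counts topic/partition pairs across all topics.
func (s PartitionSet) Len() int {
	n := 0
	for _, partitions := range s {
		n += len(partitions)
	}
	return n
}

func main() {
	set := PartitionSet{}
	set.Add("events", 0, &ConsumerPartitionConfig{OffsetMethod: OffsetMethodOldest})
	set.Add("events", 1, &ConsumerPartitionConfig{OffsetMethod: OffsetMethodManual, OffsetValue: 42})
	set.Add("audit", 0, &ConsumerPartitionConfig{OffsetMethod: OffsetMethodNewest})
	fmt.Println(set.Len(), set["events"][1].OffsetValue)
}
```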

Events are returned to the user exactly the same way they are now, over a single channel. I considered a separate channel per topic/partition but it complicated the base case, and the events already contain information on topic/partition so it's not hard for the user to dispatch appropriately if they really want to.

The constructor starts up one "controller" goroutine, which starts up and manages one goroutine per broker-that-has-a-partition-we-care-about and is responsible for (initially) dispatching each topic/partition to the appropriate broker's goroutine. The broker goroutine looks a lot like the current fetchMessages method with a few tweaks:

  • Some minor work needed to handle multiple blocks in the requests and responses.
  • When a topic/partition is reassigned to a new broker, that topic/partition gets returned to the controller via a channel; the goroutine tracks how many it is "responsible" for and exits if that reaches 0.
  • Similarly when a broker goes down hard, all topic/partitions are returned to the controller for re-dispatching and the goroutine exits.
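
The controller's initial dispatch step amounts to grouping topic/partitions by the broker that currently leads them. A minimal sketch of that grouping follows; `topicPartition` and `groupByLeader` are hypothetical names, and leader lookup is represented by a plain map rather than real cluster metadata.

```go
package main

import (
	"fmt"
	"sort"
)

// topicPartition identifies one partition of one topic.
type topicPartition struct {
	Topic     string
	Partition int32
}

// groupByLeader buckets topic/partitions by leader broker ID, so that each
// bucket can be handed to that broker's goroutine. Buckets are sorted by
// topic and then partition to keep the result deterministic.
func groupByLeader(leaders map[topicPartition]int32) map[int32][]topicPartition {
	byBroker := make(map[int32][]topicPartition)
	for tp, brokerID := range leaders {
		byBroker[brokerID] = append(byBroker[brokerID], tp)
	}
	for _, tps := range byBroker {
		sort.Slice(tps, func(i, j int) bool {
			if tps[i].Topic != tps[j].Topic {
				return tps[i].Topic < tps[j].Topic
			}
			return tps[i].Partition < tps[j].Partition
		})
	}
	return byBroker
}

func main() {
	leaders := map[topicPartition]int32{
		{Topic: "events", Partition: 0}: 1,
		{Topic: "events", Partition: 1}: 2,
		{Topic: "audit", Partition: 0}:  1,
	}
	for brokerID, tps := range groupByLeader(leaders) {
		fmt.Println(brokerID, tps)
	}
}
```

Re-dispatch after a leader change or a broker failure would run the same grouping over just the returned topic/partitions.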

I expect the success case to be fairly straightforward - as always, the complexity will come when reasoning through the failure cases and ensuring that topic/partitions are redispatched correctly, messages are not accidentally skipped in that case, etc. etc.

When the consumer is closed, it signals the controller which cleans up its children before exiting.

Thoughts?

Race condition within producer

Logged this:

WARNING: DATA RACE
Write by goroutine 41:
  github.com/Shopify/sarama.(*brokerProducer).flush()
      $GOPATH/src/github.com/Shopify/sarama/producer.go:326 +0x45c
  github.com/Shopify/sarama.(*brokerProducer).flushIfAnyMessages()
      $GOPATH/src/github.com/Shopify/sarama/producer.go:304 +0xc8
  github.com/Shopify/sarama.func·008()
      $GOPATH/src/github.com/Shopify/sarama/producer.go:247 +0x3e9

Previous read by goroutine 42:
  github.com/Shopify/sarama.(*brokerProducer).flushIfOverCapacity()
      $GOPATH/src/github.com/Shopify/sarama/producer.go:289 +0x3a
  github.com/Shopify/sarama.(*brokerProducer).addMessage()
      $GOPATH/src/github.com/Shopify/sarama/producer.go:285 +0x3ea
  github.com/Shopify/sarama.(*Producer).addMessage()
      $GOPATH/src/github.com/Shopify/sarama/producer.go:194 +0xd5
  github.com/Shopify/sarama.(*produceMessage).enqueue()
      $GOPATH/src/github.com/Shopify/sarama/produce_message.go:19 +0x6f
  github.com/Shopify/sarama.(*Producer).genericSendMessage()
      $GOPATH/src/github.com/Shopify/sarama/producer.go:186 +0x2d4
  github.com/Shopify/sarama.(*Producer).QueueMessage()
      $GOPATH/src/github.com/Shopify/sarama/producer.go:140 +0x80

Async/Batched Producer

Some config options (similar to the Java/Scala producer) for collecting and batching messages rather than sending each produced message immediately. This will be more efficient, and will make compression much more efficient as well.

Changing recovery function can cause panic

@fw42 just thought of this. Set the handler, do something that spawns a goroutine, then set the handler to nil. A panic would then cause a second panic when trying to call a nil function pointer.

Always deferring the function and doing the nil check inside the deferred function seems like the fix? There's still a race there, but I don't know how to fix that, and it's really unlikely.
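
A sketch of that suggestion, under the assumption that the handler is a package-level function variable: the deferred function always runs, and the nil check happens inside it at panic time. The names `PanicHandler` and `withRecover` are used here for illustration, and the body is an assumption, not the library's actual code.

```go
package main

import "fmt"

// PanicHandler is an optional hook that receives the value of a recovered panic.
var PanicHandler func(interface{})

// withRecover runs fn. The deferred function is installed unconditionally and
// checks the handler when it runs, so clearing the handler after the goroutine
// has started can no longer lead to a call through a nil function value.
func withRecover(fn func()) {
	defer func() {
		handler := PanicHandler // read once so the check and the call agree
		if handler != nil {
			if err := recover(); err != nil {
				handler(err)
			}
		}
	}()
	fn()
}

func main() {
	PanicHandler = func(v interface{}) { fmt.Println("recovered:", v) }
	withRecover(func() { panic("boom") })
}
```

With no handler set, the panic is not recovered and propagates as usual. The unsynchronized read of the variable is the remaining race mentioned above.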

Consumer: make the getOffset function public?

It might be nice if this function could be public:

    func (c *Consumer) getOffset(where OffsetTime, retry bool) (int64, error)

I want to get the offset from the broker when I restart my consumer. Can this function do that?

Possible panic in random partitioner

Hit this while testing. Somehow a topic ended up with zero partitions in the client's metadata table, which resulted in a zero being passed to rand.Intn, which panicked.

Not sure if this was something returned from the broker or a bug on our end, or where we ought to catch this. I haven't really been able to reproduce.
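
Wherever the zero originates, rand.Intn panics for any argument that is not positive, so a guard at the call site avoids the crash. The following is a minimal sketch; `randomPartition` and `errNoPartitions` are hypothetical names rather than existing sarama identifiers.

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
)

// errNoPartitions is returned instead of letting rand.Intn panic.
var errNoPartitions = errors.New("topic has no partitions")

// randomPartition picks a partition in [0, numPartitions), or returns an
// error when the metadata reports no partitions for the topic.
func randomPartition(numPartitions int) (int, error) {
	if numPartitions <= 0 {
		return -1, errNoPartitions
	}
	return rand.Intn(numPartitions), nil
}

func main() {
	_, err := randomPartition(0)
	fmt.Println(err)

	p, err := randomPartition(4)
	fmt.Println(p >= 0 && p < 4, err)
}
```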

Super-simple sarama example dropping and mis-ordering messages.

I have written a bare bones piece of go code to write and read events to the kafka "event" topic for a single-broker kafka on my local machine. The crux of the producer is this little bit where I write out the string messages A0 through A9:

    for i := 0; i < 10; i++ {
        producer.SendMessage(topicName, nil, sarama.StringEncoder(fmt.Sprintf("A%d", i)))
    }

I'm getting some very peculiar behavior here. Here is the scenario:

In kafka dir wipe everything clean and start zookeeper

$ rm -fr /tmp/kafka-logs/ /tmp/zookeeper/ 
$ bin/zookeeper-server-start.sh config/zookeeper.properties
#no errors

In different terminal in kafka dir start kafka

$ bin/kafka-server-start.sh config/server.properties
#no errors

In different terminal in kafka dir start console consumer. The output that follows is from running the script above. No apparent problem observed from the output although it's pretty slow.

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic event --from-beginning
A0
A1
A2
A3
A4
A5
A6
A7
A8
A9

In a different terminal I actually run the script. Note that I'm dropping several of the messages. However since they made it through the console-consumer above, it's not the fault of my producer; it must be something to do with the consumer.

$ go run sarama_example.go
creating client
creating consumer
creating producer
A0
A2
A3
A4
A6
A9

Finally, I stop and run the console consumer again and all the messages are out of order! So this problem could even be a kafka thing. (version = kafka_2.9.2-0.8.1.1) If I run the console consumer again, then the order stays the same as this.

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic event --from-beginning
A1
A5
A7
A8
A0
A2
A3
A4
A6
A9

I've replicated this behavior on my Macintosh as well as Ubuntu and CentOS VMs. Hopefully you'll look at my script and see some glaring configuration problem. In any case I would greatly appreciate any advice you might have.

What happens if all clients use the same client ID?

I have 5 producer servers, and every server has 40 clients.
All the clients' IDs are set to a default ID. The servers currently run normally. Can I use the same client ID, or is this incorrect usage?

Thanks very much.

Better Broker Connection Management

Following up to #7, #9, #10, #13 our connection management still has problems.

First some things to know about Kafka that complicate the issue:

  • Broker metadata requests do not seem required to return all the brokers they know about (it is not documented which ones they do return, but I suspect it is only the subset of brokers that are leading a partition whose data is also in the response).
  • The user only specifies (host, port) pairs while Kafka metadata returns (host, port, brokerId) triples, with no guarantee that the host matches the user-specified hostname, and with no apparent way to query a broker's own brokerId.

Now some of the issues with our current implementation:

  • If the user provides three addresses, only the last of which is valid, we wait for the first two connections to completely fail when fetching metadata before we try the third. Since the TCP connection timeout can be very long (many minutes) this can take a long time. Trying them all in parallel and using the first valid one would be better.
  • Once we've had an address fail, we discard it forever. Over the lifetime of a long-running cluster, all of the nodes may be down at one point or another, meaning that eventually we may 'run out' of nodes and abort.
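
One alternative to discarding a failed address forever is to remember when it failed and offer it again after a cooldown. A minimal sketch with hypothetical names (`seedList`, `markFailed`, `candidates`) that are not part of sarama:

```go
package main

import (
	"fmt"
	"time"
)

// seedList is a hypothetical tracker for user-supplied broker addresses.
// Instead of dropping an address after a failure, it records when the
// failure happened and offers the address again once a cooldown has passed.
type seedList struct {
	addrs    []string
	cooldown time.Duration
	failedAt map[string]time.Time
}

func newSeedList(addrs []string, cooldown time.Duration) *seedList {
	return &seedList{addrs: addrs, cooldown: cooldown, failedAt: make(map[string]time.Time)}
}

// markFailed records that addr has just failed.
func (s *seedList) markFailed(addr string) {
	s.failedAt[addr] = time.Now()
}

// candidates returns, in the original order, every address that has never
// failed or whose last failure is at least cooldown old.
func (s *seedList) candidates() []string {
	var out []string
	for _, addr := range s.addrs {
		t, failed := s.failedAt[addr]
		if !failed || time.Since(t) >= s.cooldown {
			out = append(out, addr)
		}
	}
	return out
}

func main() {
	seeds := newSeedList([]string{"kafka1:9092", "kafka2:9092"}, 30*time.Second)
	seeds.markFailed("kafka1:9092")
	// kafka1 is skipped until its cooldown expires.
	fmt.Println(seeds.candidates())
}
```

With this shape the client only has to give up when `candidates` comes back empty and no cluster metadata is available.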

Thoughts on potential future design:

  • We should connect to brokers lazily, i.e. only when we actually want to talk to them, not just when we know they exist.
  • Since there is only ever one leader for a partition, if the connection fails it doesn't make sense to just try another broker. Therefore the caller should decide to proceed, which means that the client's leader function shouldn't guarantee that the broker it returns is connected, only connecting.
  • However, when fetching metadata, any broker will do, so the any function should guarantee that the broker it returns is actually connected. Walking through bad brokers to find a good one could be slow, but connecting to all the brokers just to pick one and drop the others is gross. We could try to connect to one, and if it hasn't succeeded in 5 seconds, start another, etc. That should cover the common case, since if it hasn't responded after 5 seconds it probably won't, and if it does then we'll drop it right away, oh well. More importantly, we won't spawn hundreds of connections if the first broker does respond normally.
  • User-provided addresses should be managed separately (which they already are) since they don't have brokerIds. They're only useful for metadata requests, so they should be disconnected entirely 99% of the time. If one of them fails, we should probably just mark it as recently-failed, and retry it after some time has passed. Only if all user-supplied nodes have recently failed (and we have no metadata from the cluster itself) should we abort somehow.
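
The staggered approach for any ("try one, and if it hasn't succeeded in 5 seconds, start another") can be sketched as follows. `firstConnected` and its `dial` callback are hypothetical; a real version would return the connection itself and close the attempts that lose.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errAllFailed = errors.New("no address could be reached")

// firstConnected is a hypothetical helper. It starts dialing the first
// address and starts the next one whenever the current attempt fails or
// stagger elapses without a result. The first success wins.
func firstConnected(addrs []string, stagger time.Duration, dial func(addr string) error) (string, error) {
	type result struct {
		addr string
		err  error
	}
	// Buffered so that attempts finishing after we return never block.
	results := make(chan result, len(addrs))
	next, pending := 0, 0
	startNext := func() {
		addr := addrs[next]
		next++
		pending++
		go func() { results <- result{addr, dial(addr)} }()
	}
	if len(addrs) > 0 {
		startNext()
	}
	for pending > 0 {
		var tick <-chan time.Time // stays nil once every address has been started
		if next < len(addrs) {
			tick = time.After(stagger)
		}
		select {
		case r := <-results:
			pending--
			if r.err == nil {
				return r.addr, nil
			}
			if next < len(addrs) {
				startNext() // fast failure: move on without waiting
			}
		case <-tick:
			startNext() // slow attempt: start another alongside it
		}
	}
	return "", errAllFailed
}

func main() {
	// Fake dialer for illustration: only the last address "works".
	dial := func(addr string) error {
		if addr == "kafka3:9092" {
			return nil
		}
		return errors.New("connection refused")
	}
	addr, err := firstConnected([]string{"kafka1:9092", "kafka2:9092", "kafka3:9092"}, 5*time.Second, dial)
	fmt.Println(addr, err)
}
```

If the first broker answers promptly, no second connection is ever started, which matches the goal of not spawning many connections in the common case.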

Open Questions:

  • If a user-supplied broker is used to satisfy a call to any, when should it be disconnected? The extraBroker field in client currently is a hack to work around this, and it still leaves one connection floating around for no reason. Presumably disconnectBroker could be used and made smarter? However, if a metadata-supplied broker is used, calling disconnectBroker will pull it out from underneath other code that could be using it as a leader. Do we need reference-counting?
  • There is currently no way to tell that a Broker is 'Connecting'; you can only tell that the lock is currently held. This means that if two concurrent leader calls would return the same broker and both call Open, the second one will block when we don't want it to. Do we need a second lock in the brokers? That gets really messy...

GetOffset function not working properly

This function should return the correct last offset for the topic/partition, but it always returns the error:
kafka server:
The requested offset is outside the range of offsets maintained by the server for the given topic/partition.
#123

sarama drops message when either gzip/snappy is used

I wrote a benchmark program to measure the performance of the library. I noticed that it drops a significant number of messages when either gzip or snappy is used with waitForLocal.
I get around 59.5 MBps without compression.

I see the following error message in case of gzip/snappy:

[Sarama] 2014/03/11 18:39:06 kafka: Dropped 35497 messages
[Sarama] 2014/03/11 18:39:06 kafka: Dropped 35497 messages
[Sarama] 2014/03/11 18:39:06 kafka: Dropped 35497 messages
[Sarama] 2014/03/11 18:39:06 kafka: Dropped 35497 messages

I am sending 10 million messages, with gzip/snappy, 65k buffer and 1 second buffer timeout. Also eventually the benchmarking program gets killed.

Manual Consumer Offset Management

Pending issue #2, there is no way for the consumer to manually specify which offset to start at - it's always 0. There should be a way to tell the consumer to start at a specific offset.

Safely consuming batch of messages

Hi,

I'm new to Kafka so I might be missing something, but here's what I want and think I cannot achieve with the Consumer type.
I'd like to be able to fetch 100 messages, process them, and only then notify ZooKeeper that it can move my consumer group offset. This would allow my process to fail safely in the middle of processing a batch, so that the next time I start it, I can start processing the same batch again.
Is this possible with the current state of the library?
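
The pattern being asked for (advance the offset only after the whole batch succeeded) can be sketched independently of the library. Everything here is hypothetical: `offsetStore` stands in for wherever the group offset is kept, and the message log is a plain slice.

```go
package main

import (
	"errors"
	"fmt"
)

var errProcess = errors.New("processing failed")

// offsetStore is a hypothetical stand-in for wherever the consumer group
// offset lives (ZooKeeper in the question above).
type offsetStore struct{ committed int64 }

// processBatch handles up to batchSize messages starting at the committed
// offset and advances the offset only after every message was handled.
func processBatch(log []string, store *offsetStore, batchSize int, handle func(string) error) error {
	start := store.committed
	end := start + int64(batchSize)
	if end > int64(len(log)) {
		end = int64(len(log))
	}
	for _, msg := range log[start:end] {
		if err := handle(msg); err != nil {
			return err // offset untouched: the same batch is retried next time
		}
	}
	store.committed = end
	return nil
}

func main() {
	log := []string{"m0", "m1", "m2", "m3", "m4"}
	store := &offsetStore{}
	failOnce := true
	handle := func(msg string) error {
		if msg == "m2" && failOnce {
			failOnce = false
			return errProcess
		}
		return nil
	}
	fmt.Println(processBatch(log, store, 3, handle), store.committed)
	fmt.Println(processBatch(log, store, 3, handle), store.committed)
}
```

This gives at-least-once processing: messages handled before a failure are seen again on the retry, so the handler needs to tolerate repeats.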

Any ideas for producer/consumer reconnect?

Thank you for the good work on this project.
When using sarama, I ran into a problem: how can I catch a connection error so that I can reconnect? Any ideas? Thank you :)

Producer: expose correlationId, send it in Nacks() / Acks() ?

It might be nice if the producer could return the underlying correlation id when you QueueMessage()

Then the producer could have Acks() <-chan int32 and Nacks() <-chan int32, which you could receive on to know whether a particular message was successfully delivered or not.

Not positive this makes sense from an implementation perspective, but the lack of this forced me to use SendMessage() rather than QueueMessage().
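
A rough sketch of the proposed shape, to make the request concrete. `ackProducer` is a hypothetical type, not sarama's Producer, and the `send` callback stands in for the actual network write.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var errSend = errors.New("send failed")

// ackProducer is a hypothetical type illustrating the proposal.
type ackProducer struct {
	lastID int32
	acks   chan int32
	nacks  chan int32
	send   func(msg string) error // stand-in for the network write
}

func newAckProducer(buffer int, send func(msg string) error) *ackProducer {
	return &ackProducer{
		acks:  make(chan int32, buffer),
		nacks: make(chan int32, buffer),
		send:  send,
	}
}

// QueueMessage assigns an ID, delivers the message in the background, and
// returns the ID so the caller can match it against Acks() and Nacks().
func (p *ackProducer) QueueMessage(msg string) int32 {
	id := atomic.AddInt32(&p.lastID, 1)
	go func() {
		if err := p.send(msg); err != nil {
			p.nacks <- id
			return
		}
		p.acks <- id
	}()
	return id
}

// Acks and Nacks are receive-only from the caller's side.
func (p *ackProducer) Acks() <-chan int32  { return p.acks }
func (p *ackProducer) Nacks() <-chan int32 { return p.nacks }

func main() {
	p := newAckProducer(2, func(msg string) error {
		if msg == "" {
			return errSend
		}
		return nil
	})
	p.QueueMessage("hello")
	p.QueueMessage("")
	for i := 0; i < 2; i++ {
		select {
		case id := <-p.Acks():
			fmt.Println("delivered:", id)
		case id := <-p.Nacks():
			fmt.Println("failed:", id)
		}
	}
}
```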

Message exceeds the maximum size when using Compression

Hi,

I am using an asynchronous producer to send compressed messages to a Kafka cluster, but it looks like those messages exceed the configured maximum size. I get this error on the Kafka broker:

kafka.common.MessageSizeTooLargeException: Message size is 3213102 bytes which exceeds the maximum configured message size of 1000012.

This is my producer config:

producerConfig := sarama.NewProducerConfig()
producerConfig.RequiredAcks = sarama.NoResponse
producerConfig.MaxBufferedBytes = 1000000
producerConfig.MaxBufferTime = 30 * time.Second
producerConfig.Compression = sarama.CompressionSnappy

Everything works great with compression disabled.

kafka 0.8.2 support

Is 0.8.2 support planned some time soon? From what I hear all code is in the trunk already and it's in final testing stage. If it's planned I think it would make sense to start developing against kafka trunk and at least get conversation about new consumer/producer design started.

sarama does not seem to re-connect appropriately

Since #65 was closed, I was hoping the re-connect stuff would work, but it doesn't appear to.

Using commit c761d93 I am attempting to constantly sync send to a single kafka broker.

I kill and restart the broker and sarama does not re-connect. The fix for #65 solved the deadlock issue I saw but now I get an infinite stream of

kafka: Dropped 1 messages: write tcp 127.0.0.1:9092: broken pipe

in my log.

Is there any special configuration necessary to get sarama to reconnect? Here is the code that creates the client and producer

    log.Println("go=kafka at=new-kafka-deliver")
    clientConfig := &sarama.ClientConfig{
        MetadataRetries: 10,
        WaitForElection: 10 * time.Second,
    }
    producerConfig := &sarama.ProducerConfig{
        Partitioner:      sarama.NewRandomPartitioner(),
        RequiredAcks:     sarama.WaitForLocal,
        Timeout:          5 * time.Second,
        Compression:      sarama.CompressionNone,
        MaxBufferedBytes: uint32(1000),
        MaxBufferTime:    1 * time.Second,
    }

    client, err := sarama.NewClient(clientId, brokerList, clientConfig)
    if err != nil {
        return nil, err
    }
    log.Println("go=kafka at=created-client")

    producer, err := sarama.NewProducer(client, producerConfig)
    if err != nil {
        return nil, err
    }
    log.Println("go=kafka at=created-producer")

How to set the Producer config for the best performance?

func defaultProducerConfig() *sarama.ProducerConfig {
    config := sarama.NewProducerConfig()
    config.Compression = sarama.CompressionNone
    config.MaxBufferTime = 5000
    config.MaxBufferedBytes = 1024 * 10
    return config
}

I want to use the QueueMessage API, and I don't know whether this is the best way. Can anyone suggest a better solution?
Each record is 500 bytes.
Thanks a lot.

Installation problem

I am doing

go get github.com/Shopify/sarama

These are the errors that I get:

# github.com/Shopify/sarama
./broker.go:112: method b.responseReceiver is not an expression, must be called
./consumer.go:123: method c.fetchMessages is not an expression, must be called
./producer.go:236: timer.Reset undefined (type *time.Timer has no field or method Reset)

Is there any resolution for this ?
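
These errors are consistent with a Go toolchain older than 1.1: method values (using `b.responseReceiver` without calling it) and `(*time.Timer).Reset` were both added in Go 1.1. As a quick check, the following small program uses both features, so it only builds with Go 1.1 or newer. The `worker` type is made up for the example.

```go
package main

import (
	"fmt"
	"time"
)

type worker struct{ runs int }

func (w *worker) run() { w.runs++ }

// usesGo11Features exercises a method value and (*time.Timer).Reset,
// both of which need Go 1.1 or newer to compile.
func usesGo11Features() int {
	w := &worker{}
	f := w.run // method value: a bound method used without calling it

	timer := time.NewTimer(time.Hour)
	timer.Reset(time.Millisecond) // Timer.Reset
	timer.Stop()

	f()
	return w.runs
}

func main() {
	fmt.Println("method value called", usesGo11Features(), "time(s)")
}
```

If this fails with the same messages as above, upgrading the Go installation is the likely fix; `go version` shows which release is in use.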

Expose consistent client method set

The Client object has basically no public methods right now; it is just a place to dump methods (mostly broker and metadata tracking) that are shared by the Consumer and the Producer. Its primary functions should have specific semantics defined and then be published so other people can make use of them.

Probably depends on getting #15 right first...

I'm thinking (as a rough first draft):

  • Leader(topic, partition) returns the broker object leading the topic/partition
  • Topics() returns the list of topics for the cluster
  • Partitions(topic) returns the list of partitions for the topic
  • RefreshMetadata(topics[])
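
Written as a Go interface, the draft above might look like the sketch below. The method names come from the list; the parameter and return types, the `Broker` placeholder, and the toy `staticClient` are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// Broker is a placeholder for the library's broker object.
type Broker struct{ Addr string }

// Client sketches the proposed method set. The signatures are guesses.
type Client interface {
	Leader(topic string, partition int32) (*Broker, error)
	Topics() ([]string, error)
	Partitions(topic string) ([]int32, error)
	RefreshMetadata(topics ...string) error
}

var errUnknown = errors.New("unknown topic or partition")

// staticClient is a toy implementation backed by a fixed map, only meant
// to show how callers might use the interface.
type staticClient struct {
	leaders map[string]map[int32]*Broker
}

func (c *staticClient) Leader(topic string, partition int32) (*Broker, error) {
	b, ok := c.leaders[topic][partition]
	if !ok {
		return nil, errUnknown
	}
	return b, nil
}

func (c *staticClient) Topics() ([]string, error) {
	topics := make([]string, 0, len(c.leaders))
	for t := range c.leaders {
		topics = append(topics, t)
	}
	sort.Strings(topics)
	return topics, nil
}

func (c *staticClient) Partitions(topic string) ([]int32, error) {
	parts, ok := c.leaders[topic]
	if !ok {
		return nil, errUnknown
	}
	ids := make([]int32, 0, len(parts))
	for id := range parts {
		ids = append(ids, id)
	}
	sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
	return ids, nil
}

// RefreshMetadata is a no-op here because the data never changes.
func (c *staticClient) RefreshMetadata(topics ...string) error { return nil }

func main() {
	var c Client = &staticClient{leaders: map[string]map[int32]*Broker{
		"event": {0: &Broker{Addr: "localhost:9092"}},
	}}
	parts, _ := c.Partitions("event")
	leader, _ := c.Leader("event", parts[0])
	fmt.Println(parts, leader.Addr)
}
```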

Open Questions:

  • Does it make sense to store/return other metadata (replicas or ISRs, for example)? Does any of this provide benefit beyond just sending a MetadataRequest using the low-level Broker object?
  • How are brokers returned to the client when no longer needed? (the current disconnectBroker feels hacky, this really depends on #15)
  • Do we want to provide RefreshTopic(topic) or RefreshAllTopics() helper functions or something like that?
  • Does it make sense to expose any()? It's only useful for metadata, so this probably depends if we expose all metadata or if a user might want to get it themselves.
