Given the following test program:
package main
import (
"fmt"
"log"
"os"
"time"
"github.com/wvanbergen/kafka/consumergroup"
"gopkg.in/Shopify/sarama.v1"
)
// main is a small reproduction program. It joins the consumer group
// FOO_TEST_CAN_GO for the topic testproducercango, handles incoming messages
// one at a time (a 2 second sleep stands in for real work) and stops once no
// message has arrived for 10 seconds. Before returning it logs how many events
// were handled and how often each message value was seen, which makes
// duplicate deliveries visible.
func main() {
	// Send sarama's own log lines to stdout with a recognizable prefix.
	sarama.Logger = log.New(os.Stdout, "[Sarama]", log.LstdFlags)

	cfg := consumergroup.NewConfig()
	cfg.Offsets.Initial = sarama.OffsetOldest
	cfg.Offsets.ProcessingTimeout = 10 * time.Second

	group, joinErr := consumergroup.JoinConsumerGroup("FOO_TEST_CAN_GO", []string{"testproducercango"}, []string{"127.0.0.1"}, cfg)
	if joinErr != nil {
		log.Fatalln(joinErr)
	}
	defer group.Close()

	// Log everything the consumer group reports on its error channel.
	go func() {
		for groupErr := range group.Errors() {
			log.Println(groupErr)
		}
	}()

	handled := 0
	seen := make(map[string]int) // message value -> number of times it was handled

	// time.After is evaluated anew on every pass, so the loop ends only after
	// 10 seconds in which no message was received.
	for idle := false; !idle; {
		select {
		case <-time.After(10 * time.Second):
			idle = true
		case msg := <-group.Messages():
			fmt.Printf("Got event from stream. Topic: %v, Partition: %v, Offset: %v, Mess: %v \n", msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
			handled++
			// Simulate processing time
			time.Sleep(2 * time.Second)
			group.CommitUpto(msg)
			seen[string(msg.Value)]++
		}
	}

	log.Printf("Processed %d events.", handled)
	log.Printf("%+v", seen)
}
The topic testproducercango has 2 partitions and contains 20 messages named bladiebla0 ... bladiebla19 (the numbering starts at 0, as the output below shows).
I get the following output when I start this program 4 times in parallel:
Pid 0:
[Sarama]2015/04/07 17:37:39 Initializing new client
[Sarama]2015/04/07 17:37:39 Fetching metadata for all topics from broker localhost:9092
[Sarama]2015/04/07 17:37:39 Connected to broker localhost:9092
[Sarama]2015/04/07 17:37:39 Registered new broker #0 at localhost:9092
[Sarama]2015/04/07 17:37:39 Successfully initialized new client
[Sarama]2015/04/07 17:37:39 [FOO_TEST_CAN_GO/52c1ac213840] Consumer instance registered (me-user:f3080828-f8b8-4e21-a3c4-52c1ac213840).
[Sarama]2015/04/07 17:37:39 [FOO_TEST_CAN_GO/52c1ac213840] Currently registered consumers: 1
[Sarama]2015/04/07 17:37:39 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:37:39 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango :: Claiming 2 of 2 partitions
[Sarama]2015/04/07 17:37:39 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango/0 :: Partition consumer starting at the oldest available offset.
[Sarama]2015/04/07 17:37:39 Connected to broker localhost:9092
[Sarama]2015/04/07 17:37:39 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango/1 :: Partition consumer starting at the oldest available offset.
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 0, Mess: bladiebla2
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 1, Mess: bladiebla3
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 2, Mess: bladiebla4
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/52c1ac213840] Triggering rebalance due to consumer list change
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango/0 :: Stopping partition consumer at offset 6
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango/0 :: Last processed offset: 1. Waiting up to 10s for another 5 messages to process...
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango/1 :: Stopping partition consumer at offset 12
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango/1 :: Last processed offset: -1. Waiting up to 10s for another 13 messages to process...
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango/1 :: TIMEOUT waiting for offset 12. Last committed offset: -1
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/52c1ac213840] FAILED closing the offset manager: Not all offsets were committed before shutdown was completed!
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/52c1ac213840] Deregistered consumer instance me-user:f3080828-f8b8-4e21-a3c4-52c1ac213840.
[Sarama]2015/04/07 17:37:55 Closing Client
[Sarama]2015/04/07 17:37:55 Closed connection to broker localhost:9092
[Sarama]2015/04/07 17:37:55 Closed connection to broker localhost:9092
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x0 pc=0x47a613]
goroutine 1 [running]:
github.com/wvanbergen/kafka/consumergroup.(*partitionOffsetTracker).markAsProcessed(0x0, 0x2, 0x0)
/home/me/Development/Dev/go/src/github.com/wvanbergen/kafka/consumergroup/offset_manager.go:223 +0x123
github.com/wvanbergen/kafka/consumergroup.(*zookeeperOffsetManager).MarkAsProcessed(0xc20802c040, 0x6582b0, 0x11, 0x0, 0x2, 0xc208381e00)
/home/me/Development/Dev/go/src/github.com/wvanbergen/kafka/consumergroup/offset_manager.go:152 +0x12b
github.com/wvanbergen/kafka/consumergroup.(*ConsumerGroup).CommitUpto(0xc208056210, 0xc208041590, 0x0, 0x0)
/home/me/Development/Dev/go/src/github.com/wvanbergen/kafka/consumergroup/consumer_group.go:232 +0xb1
main.main()
/home/me/Development/Dev/go/src/test/foo/foo.go:44 +0xc6f
goroutine 5 [semacquire]:
sync.(*WaitGroup).Wait(0xc20801e140)
/usr/local/go/src/sync/waitgroup.go:132 +0x169
github.com/samuel/go-zookeeper/zk.(*Conn).loop(0xc2080320d0)
/home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:227 +0x76d
github.com/samuel/go-zookeeper/zk.func·001()
/home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:145 +0x2c
created by github.com/samuel/go-zookeeper/zk.ConnectWithDialer
/home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:149 +0x44f
goroutine 7 [runnable]:
github.com/samuel/go-zookeeper/zk.(*Conn).sendLoop(0xc2080320d0, 0x7f9731560bb8, 0xc208038038, 0xc2080302a0, 0x0, 0x0)
/home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:412 +0xce9
github.com/samuel/go-zookeeper/zk.func·002()
/home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:212 +0x5a
created by github.com/samuel/go-zookeeper/zk.(*Conn).loop
/home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:215 +0x680
goroutine 17 [syscall, locked to thread]:
runtime.goexit()
/usr/local/go/src/runtime/asm_amd64.s:2232 +0x1
Pid 1:
[Sarama]2015/04/07 17:37:45 Initializing new client
[Sarama]2015/04/07 17:37:45 Fetching metadata for all topics from broker localhost:9092
[Sarama]2015/04/07 17:37:45 Connected to broker localhost:9092
[Sarama]2015/04/07 17:37:45 Registered new broker #0 at localhost:9092
[Sarama]2015/04/07 17:37:45 Successfully initialized new client
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/509180e0075d] Consumer instance registered (me-user:7c3f576d-063f-4399-b346-509180e0075d).
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/509180e0075d] Currently registered consumers: 2
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Claiming 1 of 2 partitions
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/0 :: Partition consumer starting at offset 2.
[Sarama]2015/04/07 17:37:45 Connected to broker localhost:9092
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 2, Mess: bladiebla4
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 3, Mess: bladiebla8
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 4, Mess: bladiebla12
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 5, Mess: bladiebla14
[Sarama]2015/04/07 17:37:52 [FOO_TEST_CAN_GO/509180e0075d] Triggering rebalance due to consumer list change
[Sarama]2015/04/07 17:37:52 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/0 :: Stopping partition consumer at offset 6
[Sarama]2015/04/07 17:37:52 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/0 :: Last processed offset: 4. Waiting up to 10s for another 2 messages to process...
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 6, Mess: bladiebla19
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/509180e0075d] Currently registered consumers: 3
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Claiming 0 of 2 partitions
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/509180e0075d] Triggering rebalance due to consumer list change
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/509180e0075d] Currently registered consumers: 2
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Claiming 1 of 2 partitions
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/1 :: Partition consumer starting at offset 3.
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 3, Mess: bladiebla6
[Sarama]2015/04/07 17:38:03 [FOO_TEST_CAN_GO/509180e0075d] Triggering rebalance due to consumer list change
[Sarama]2015/04/07 17:38:03 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/1 :: Stopping partition consumer at offset 12
[Sarama]2015/04/07 17:38:03 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/1 :: Last processed offset: 2. Waiting up to 10s for another 10 messages to process...
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 4, Mess: bladiebla7
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 5, Mess: bladiebla9
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 6, Mess: bladiebla10
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 7, Mess: bladiebla11
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 8, Mess: bladiebla13
[Sarama]2015/04/07 17:38:13 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/1 :: TIMEOUT waiting for offset 12. Last committed offset: 3
[Sarama]2015/04/07 17:38:14 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:38:14 [FOO_TEST_CAN_GO/509180e0075d] Currently registered consumers: 1
[Sarama]2015/04/07 17:38:14 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:38:14 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Claiming 2 of 2 partitions
[Sarama]2015/04/07 17:38:14 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/0 :: Partition consumer starting at offset 7.
[Sarama]2015/04/07 17:38:14 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/1 :: Partition consumer starting at offset 4.
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 9, Mess: bladiebla15
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 10, Mess: bladiebla16
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 11, Mess: bladiebla17
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 12, Mess: bladiebla18
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 4, Mess: bladiebla7
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 5, Mess: bladiebla9
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 6, Mess: bladiebla10
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 7, Mess: bladiebla11
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 8, Mess: bladiebla13
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 9, Mess: bladiebla15
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 10, Mess: bladiebla16
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 11, Mess: bladiebla17
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 12, Mess: bladiebla18
[Sarama]2015/04/07 17:38:50 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/0 :: Stopping partition consumer at offset -1
[Sarama]2015/04/07 17:38:50 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/1 :: Stopping partition consumer at offset 12
2015/04/07 17:38:50 Processed 24 events.
2015/04/07 17:38:50 map[bladiebla13:2 bladiebla17:2 bladiebla6:1 bladiebla7:2 bladiebla16:2 bladiebla12:1 bladiebla8:1 bladiebla19:1 bladiebla11:2 bladiebla15:2 bladiebla4:1 bladiebla9:2 bladiebla10:2 bladiebla18:2 bladiebla14:1]
[Sarama]2015/04/07 17:38:51 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:38:51 [FOO_TEST_CAN_GO/509180e0075d] Deregistered consumer instance me-user:7c3f576d-063f-4399-b346-509180e0075d.
[Sarama]2015/04/07 17:38:51 Closing Client
[Sarama]2015/04/07 17:38:51 Closed connection to broker localhost:9092
[Sarama]2015/04/07 17:38:51 Closed connection to broker localhost:9092
Pid 2:
[Sarama]2015/04/07 17:37:52 Initializing new client
[Sarama]2015/04/07 17:37:52 Fetching metadata for all topics from broker localhost:9092
[Sarama]2015/04/07 17:37:52 Connected to broker localhost:9092
[Sarama]2015/04/07 17:37:52 Registered new broker #0 at localhost:9092
[Sarama]2015/04/07 17:37:52 Successfully initialized new client
[Sarama]2015/04/07 17:37:52 [FOO_TEST_CAN_GO/14644cac47d7] Consumer instance registered (me-user:1f1f7e20-dbff-4c01-89d6-14644cac47d7).
[Sarama]2015/04/07 17:37:52 [FOO_TEST_CAN_GO/14644cac47d7] Currently registered consumers: 3
[Sarama]2015/04/07 17:37:52 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:37:52 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango :: Claiming 1 of 2 partitions
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/14644cac47d7] Triggering rebalance due to consumer list change
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango/0 :: Partition consumer starting at offset 7.
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango/0 :: Stopping partition consumer at offset -1
[Sarama]2015/04/07 17:37:56 Connected to broker localhost:9092
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/14644cac47d7] Currently registered consumers: 3
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango :: Claiming 1 of 2 partitions
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango/0 :: Partition consumer starting at offset 7.
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango/0 :: Stopping partition consumer at offset -1
2015/04/07 17:38:02 Processed 0 events.
2015/04/07 17:38:02 map[]
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/14644cac47d7] Deregistered consumer instance me-user:1f1f7e20-dbff-4c01-89d6-14644cac47d7.
[Sarama]2015/04/07 17:38:02 Closing Client
[Sarama]2015/04/07 17:38:02 Closed connection to broker localhost:9092
[Sarama]2015/04/07 17:38:02 Closed connection to broker localhost:9092
Pid 3:
[Sarama]2015/04/07 17:37:55 Initializing new client
[Sarama]2015/04/07 17:37:55 Fetching metadata for all topics from broker localhost:9092
[Sarama]2015/04/07 17:37:55 Connected to broker localhost:9092
[Sarama]2015/04/07 17:37:55 Registered new broker #0 at localhost:9092
[Sarama]2015/04/07 17:37:55 Successfully initialized new client
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/d91a72837cd0] Consumer instance registered (me-user:6ebed8bf-285e-4f9b-9007-d91a72837cd0).
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/d91a72837cd0] Currently registered consumers: 3
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango :: Claiming 1 of 2 partitions
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango/1 :: Partition consumer starting at the oldest available offset.
[Sarama]2015/04/07 17:37:55 Connected to broker localhost:9092
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 0, Mess: bladiebla0
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 1, Mess: bladiebla1
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 2, Mess: bladiebla5
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 3, Mess: bladiebla6
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] Triggering rebalance due to consumer list change
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango/1 :: Stopping partition consumer at offset 12
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango/1 :: Last processed offset: 2. Waiting up to 10s for another 10 messages to process...
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] Currently registered consumers: 2
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango :: Claiming 1 of 2 partitions
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango/0 :: Partition consumer starting at offset 7.
[Sarama]2015/04/07 17:38:03 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango/0 :: Stopping partition consumer at offset -1
[Sarama]2015/04/07 17:38:03 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:38:03 [FOO_TEST_CAN_GO/d91a72837cd0] Deregistered consumer instance me-user:6ebed8bf-285e-4f9b-9007-d91a72837cd0.
[Sarama]2015/04/07 17:38:03 Closing Client
[Sarama]2015/04/07 17:38:03 Closed connection to broker localhost:9092
[Sarama]2015/04/07 17:38:03 Closed connection to broker localhost:9092
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x0 pc=0x47a613]
goroutine 1 [running]:
github.com/wvanbergen/kafka/consumergroup.(*partitionOffsetTracker).markAsProcessed(0x0, 0x3, 0x0)
/home/me/Development/Dev/go/src/github.com/wvanbergen/kafka/consumergroup/offset_manager.go:223 +0x123
github.com/wvanbergen/kafka/consumergroup.(*zookeeperOffsetManager).MarkAsProcessed(0xc20802c040, 0x6582b0, 0x11, 0x1, 0x3, 0xc208381700)
/home/me/Development/Dev/go/src/github.com/wvanbergen/kafka/consumergroup/offset_manager.go:152 +0x12b
github.com/wvanbergen/kafka/consumergroup.(*ConsumerGroup).CommitUpto(0xc208056210, 0xc208040fa0, 0x0, 0x0)
/home/me/Development/Dev/go/src/github.com/wvanbergen/kafka/consumergroup/consumer_group.go:232 +0xb1
main.main()
/home/me/Development/Dev/go/src/test/foo/foo.go:44 +0xc6f
goroutine 5 [semacquire]:
sync.(*WaitGroup).Wait(0xc20801e140)
/usr/local/go/src/sync/waitgroup.go:132 +0x169
github.com/samuel/go-zookeeper/zk.(*Conn).loop(0xc2080320d0)
/home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:227 +0x76d
github.com/samuel/go-zookeeper/zk.func·001()
/home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:145 +0x2c
created by github.com/samuel/go-zookeeper/zk.ConnectWithDialer
/home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:149 +0x44f
goroutine 7 [runnable]:
github.com/samuel/go-zookeeper/zk.(*Conn).sendLoop(0xc2080320d0, 0x7f7bc8842bb8, 0xc208038038, 0xc2080302a0, 0x0, 0x0)
/home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:412 +0xce9
github.com/samuel/go-zookeeper/zk.func·002()
/home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:212 +0x5a
created by github.com/samuel/go-zookeeper/zk.(*Conn).loop
/home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:215 +0x680
goroutine 17 [syscall, locked to thread]:
runtime.goexit()
/usr/local/go/src/runtime/asm_amd64.s:2232 +0x1
Issues in bold. For now I see two:
- markAsProcessed acts on an already closed partition (this is the nil pointer dereference panic shown for Pid 0 and Pid 3).
- Messages are processed multiple times!