Comments (10)
I am in the exact same boat, and while I don't have a definitive answer, this is what I do know.
The error returned from QueueMessage will only be non-nil for errors in constructing the message (since it isn't actually sent at that point). SendMessage (being synchronous) will return an error more likely tied to sending / acking. The channel returned from Producer.Errors() is a buffered channel (size 16) that often contains nil errors. I have not determined the source of the nils being written to the channel, but they are definitely there.
From what I can tell, there is currently no means to recover the message (or batch of messages) that generated a non-nil error on the errors channel. I have started playing with possibilities for handling this, though no significant progress has been made. I hope to find a solution, as in my case the messages being persisted to Kafka cannot be lossy.
from sarama.
I spent some time reworking how data arrives at my producer so that I can apply back pressure against it. So now I'm looking at whether to just change over to SendMessage, or to look more closely at how QueueMessage is constructed and possibly reimplement what's going on under the covers to get message sets working.
I might have focused on the wrong source, since the errors I'm seeing are seen by others. It could be that I've misdiagnosed my problem and just need to look into whether using snappy is the source of the data loss.
I spent some time last night on this issue as well. I am in the position where every message is equally important and I can't lose any of them.
What I did (testing for now) is add an additional channel, failed chan []byte, to the Producer; then anywhere a non-nil error is returned I call
func (p *Producer) sendFailed(prb produceRequestBuilder) {
	for _, msg := range prb {
		p.failed <- *msg.value
	}
}
Then I simply have a goroutine that consumes from this channel and replays the messages onto the channel I originally get them from.
@schleppy is this a workaround to allow continued use of QueueMessage?
@davidbirdsong This is a workaround to allow for handling of failed flushes.
Sure, but are you using Producer.SendMessage or Producer.QueueMessage?
QueueMessage
@schleppy that's a good idea, do you want to clean it up (potentially make it configurable?) and submit it in a pull request?
@davidbirdsong are there any other issues you have (besides ones filed separately like #71) or should this be closed?
@eapache thanks, closing.
@eapache Yeah, I will look at getting that in a pull request today or tomorrow. It currently isn't configurable, so that may take some thinking.