This error sounds familiar. I'll make a pre-release NuGet package with either more debugging info or a patch.
from kafka4net.
Managed to reproduce it.
Steps to reproduce:
- Prerequisites: 3 Kafka brokers [1,2,3] and one topic with one partition; the partition leader is 2
- Stop broker 2 (the leader will now be 1 or 3)
- Create the producer (seed brokers = [1,2,3])
- The producer receives metadata for only 2 brokers: 1 and 3
- At this point, writes succeed
- Start broker 2 and wait for (or force) a preferred replica election
- ERROR kafka4net.Internal.PartitionRecoveryMonitor received MetadataResponse for broker that is not yet in our list (2)!
Found the code that is responsible for this. Will try to fix it.
Great info, Viktor!
"force preferred replica election": I think this is the problem; I don't have a single unit test covering forced re-election. I'll add it to the issues.
I should be able to work on it tomorrow.
It doesn't matter whether it was a forced election or an ordinary one.
The root cause is that when the Producer is created, only 2 of the 3 brokers are available.
So when topic metadata is retrieved, MergeTopicMeta is called, but this code
newBrokers.Where(b => b.NodeId != -99).ForEach(b => _newBrokerSubject.OnNext(b));
adds only brokers 1 and 3 (broker 2 is not available at that moment).
When broker 2 becomes available and is re-elected as leader for that partition (again, whether forced or not), new metadata is received, and RecoveryLoop then fails here:
BrokerMeta newBroker;
_brokers.TryGetValue(brokerGrp.Key, out newBroker);
if (newBroker == null)
{
    _log.Error("received MetadataResponse for broker that is not yet in our list ({0})!", brokerGrp.Key);
    return;
}
because there is no broker with id=2 in the _brokers dictionary, and it will never be added, because _newMetadataEvent.OnNext(filteredResponse); is never called.
We should probably announce the new broker somehow in this case, but I'm still not sure where in the code is the best place to do it.
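One possible place is the recovery side itself. Kafka metadata responses carry the broker list alongside the partition leaders, so a monitor that sees an unknown leader could register and announce that broker from the same response instead of returning. The following is a minimal sketch under that assumption. BrokerInfo, PartitionInfo, MetadataSnapshot, RecoverySketch, HandleMetadata and the NewBroker event are hypothetical stand-ins, not kafka4net's actual types or API.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, simplified stand-ins; NOT kafka4net's real types.
public sealed class BrokerInfo
{
    public int NodeId { get; set; }
    public string Host { get; set; } = "";
    public int Port { get; set; }
}

public sealed class PartitionInfo
{
    public string Topic { get; set; } = "";
    public int Id { get; set; }
    public int Leader { get; set; }
}

// Models a metadata response: broker list plus partition leaders.
public sealed class MetadataSnapshot
{
    public List<BrokerInfo> Brokers { get; } = new List<BrokerInfo>();
    public List<PartitionInfo> Partitions { get; } = new List<PartitionInfo>();
}

public sealed class RecoverySketch
{
    private readonly Dictionary<int, BrokerInfo> _brokers = new Dictionary<int, BrokerInfo>();

    // Hypothetical hook playing the role of "announce the new broker".
    public event Action<BrokerInfo> NewBroker;

    public RecoverySketch(IEnumerable<BrokerInfo> knownBrokers)
    {
        foreach (var b in knownBrokers)
            _brokers[b.NodeId] = b;
    }

    public bool Knows(int nodeId) => _brokers.ContainsKey(nodeId);

    // Groups partitions by leader. An unknown leader is looked up in the same
    // response, registered and announced, instead of aborting the recovery step.
    public Dictionary<int, List<PartitionInfo>> HandleMetadata(MetadataSnapshot response)
    {
        var healed = new Dictionary<int, List<PartitionInfo>>();
        foreach (var group in response.Partitions.GroupBy(p => p.Leader))
        {
            if (!_brokers.ContainsKey(group.Key))
            {
                var broker = response.Brokers.FirstOrDefault(b => b.NodeId == group.Key);
                if (broker == null)
                    continue; // leader not described in this response; skip for now
                _brokers[broker.NodeId] = broker;
                NewBroker?.Invoke(broker);
            }
            healed[group.Key] = group.ToList();
        }
        return healed;
    }
}
```

Starting with brokers 1 and 3 known, a response whose broker list includes broker 2 and whose partition 1 is led by 2 results in broker 2 being registered, announced once, and its partition treated as healed. Doing the announcement in MergeTopicMeta instead (where the NodeId != -99 filter lives) is the other option; this sketch only illustrates the recovery-side one.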
@vikmv I can reproduce the bug in a test (see 5ca1636), and I can confirm that your patch does fix the issue. I just want to meditate on it a little more.
@vikmv what's your replication factor? I assume it is 2?
And which version of the Kafka server are you testing?
The patch was meant to fix #16 (which happens because LINQ's Except method operates on sets rather than sequences, so the result of Except() is also de-duplicated, as if Distinct() had been applied), but it does not fix this one.
Sorry for not mentioning it earlier: the replication factor is 3 (though it can most likely be reproduced with 2 as well), and the Kafka version is 0.8.2.1.
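The Except behaviour behind #16 is easy to see with plain integers. A minimal sketch: Enumerable.Except yields only distinct elements, so duplicates in the first sequence collapse, while a Where filter against a HashSet keeps them. ExceptDemo and its method names are hypothetical, and this is not necessarily how the patch for #16 is written.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class ExceptDemo
{
    // Set difference: Except also de-duplicates, as if Distinct() were applied.
    public static List<int> SetDifference(IEnumerable<int> first, IEnumerable<int> second) =>
        first.Except(second).ToList();

    // Sequence-preserving alternative: drops every element found in `second`
    // but keeps the duplicates and order of `first`.
    public static List<int> FilterOut(IEnumerable<int> first, IEnumerable<int> second)
    {
        var exclude = new HashSet<int>(second);
        return first.Where(x => !exclude.Contains(x)).ToList();
    }
}
```

For example, with first = { 1, 1, 2, 3 } and second = { 3 }, SetDifference returns { 1, 2 } while FilterOut returns { 1, 1, 2 }.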
Here is a dirty repro of this issue (very dirty: the test doesn't even fail, but you can find the failure in the log):
[Test]
public async Task Issue14()
{
    var topic = "topic13." + _rnd.Next();
    VagrantBrokerUtil.CreateTopic(topic, 3, 3); // so that broker2 is the leader of at least one partition
    VagrantBrokerUtil.DescribeTopic(topic);
    VagrantBrokerUtil.StopBroker("broker2");
    Thread.Sleep(1000);
    VagrantBrokerUtil.DescribeTopic(topic);

    var cluster = new Cluster(_seedAddresses);
    var producer = new Producer(cluster, new ProducerConfiguration(topic, batchFlushSize: 1));
    producer.OnSuccess += m => _log.Info(string.Format("{0} messages sent", m.Length));
    await producer.ConnectAsync();
    foreach (var meta in await cluster.GetOrFetchMetaForTopicAsync(topic))
        _log.Info(meta);
    await Observable.Interval(TimeSpan.FromSeconds(1))
        .Take(10)
        .Do(_ => producer.Send(new Message()))
        .ToTask();

    VagrantBrokerUtil.StartBroker("broker2");
    Thread.Sleep(1000);
    VagrantBrokerUtil.RebalanceLeadership();
    VagrantBrokerUtil.DescribeTopic(topic);
    foreach (var meta in await cluster.GetOrFetchMetaForTopicAsync(topic))
        _log.Info(meta);
    await Observable.Interval(TimeSpan.FromSeconds(1))
        .Take(10)
        .Do(_ => producer.Send(new Message()))
        .ToTask();

    // Enable debug logging here and you'll see something like this:
    //17:34:35.492 Debug [kafka-scheduler 1:16] kafka4net.Internal.PartitionRecoveryMonitor #1 Partition topic13.444965174-1 is in error state NotLeaderForPartition. Adding to failed list.
    //17:34:35.509 Info[kafka - scheduler 1:16] kafka4net.Producer Detected change in topic / partition 'topic13.444965174' / 1 / NotLeaderForPartition IsOnline True->False
    //17:34:35.707 Debug[kafka-scheduler 1:16] kafka4net.Protocols.Protocol Sending MetadataRequest to 192.168.56.30:9092
    //17:34:35.732 Debug[kafka-scheduler 1:16] kafka4net.Internal.PartitionRecoveryMonitor Healed partitions found by broker 192.168.56.30:9092 Id:3 (will check broker availability):
    // Leader: 2
    // Topic: topic13.444965174 [1]
    //17:34:35.748 Error[kafka-scheduler 1:16] kafka4net.Internal.PartitionRecoveryMonitor received MetadataResponse for broker that is not yet in our list!
    //17:34:35.758 Debug[kafka-scheduler 1:16] kafka4net.Protocols.Protocol Sending MetadataRequest to 192.168.56.10:9092
    //17:34:35.758 Debug[kafka-scheduler 1:16] kafka4net.Internal.PartitionRecoveryMonitor Healed partitions found by broker 192.168.56.10:9092 Id:1 (will check broker availability):
    // Leader: 2
    // Topic: topic13.444965174 [1]
    await cluster.CloseAsync(TimeSpan.FromSeconds(3));
}
@vikmv what's VagrantBrokerUtil.DescribeTopic(topic)? Is it kafka-topics.sh --describe --topic?
Yes, it's --describe --topic. It's there just to confirm that on the first call broker 2 isn't the leader for any partition, and that by the second call it has become the leader for one partition.
The issue is that PartitionRecoveryMonitor._brokers knows only about brokers 1 and 3 (because broker 2 was unavailable during startup). When it fetches new metadata (after broker 2 has become leader for a partition) and sees a broker with id=2, it doesn't recognize that broker and fails with "received MetadataResponse for broker that is not yet in our list".
Yeah, I spoke too early when I claimed I had reproduced the bug. But this time I've got it. I had to fix yet another bug along the way, though.
The reason I did not add newly discovered brokers during partition recovery is a logical one. At the time, I was under the impression that when connecting, the cluster would report the topic topology in general and not just its current state. Perhaps I got that impression because it takes up to 30 seconds for a cluster member to notice a fallen peer, and if I don't wait long enough in unit tests, I get a list of all brokers, including those that are down.
Now that I know that assumption is wrong, the fix should be simple. I expect to fix it early next week.
Related Issues
- How to start consuming with a specific offset.
- Producer throws exception when sending a lot of messages
- Memory leak
- Closing and opening connection right after, throws exception
- Producer starts writing to the same single partition (when no Message.Key is specified)
- Producer stops working after manual partition reassignment
- Memory leak in producer
- ObjectDisposedException after closing producer
- Sample consumer code for multi-topic subscription
- How should I handle the situation when Kafka starts re-balancing data on brokers.
- Implement protocol v1 and v2 (aka kafka-0.9 and 0.10)
- Better backward compatibility when enhancing configuration classes
- Upgrade to System.Reactive 3
- Consumer flow control is broken
- Consumer subscription to messages lost
- Idle Connections getting closed are logged as Error
- Cannot configure Tcp Keepalive
- Use RecyclableMemoryStream instead of MemoryStream
- Huge memory and threads consumption
- Upgrade to System.Reactive 4