
Comments (11)

vchekan commented on August 14, 2024

This error sounds familiar. I'll make a pre-release NuGet package that either adds more debugging info or contains a patch.

from kafka4net.

vikmv commented on August 14, 2024

Managed to reproduce it.

Steps to reproduce:

  • Prerequisites: 3 Kafka brokers [1,2,3] and one topic with one partition. Partition leader = 2
  • Stop broker 2 (the leader is now 1 or 3)
  • Create the producer (seed brokers = [1,2,3])
  • It receives metadata about only 2 brokers: 1 and 3
  • At this point, writes succeed
  • Start broker 2 and wait for (or force) a preferred replica election
  • ERROR kafka4net.Internal.PartitionRecoveryMonitor received MetadataResponse for broker that is not yet in our list (2)!

Found the code that is responsible for this. Will try to fix it.

vchekan commented on August 14, 2024

Great info, Viktor!
"force preferred replica election": I think this is the problem; I don't have a single unit test that covers forced re-election. I'll add it to the issues.
I should be able to work on it tomorrow.

vikmv commented on August 14, 2024

It doesn't matter whether it was a forced election or an ordinary one.
The root cause is that when the Producer is created, only 2 of the 3 brokers are available.

So when topic metadata is retrieved, MergeTopicMeta is called, but this code

newBrokers.Where(b => b.NodeId != -99).ForEach(b => _newBrokerSubject.OnNext(b));

adds only brokers 1 and 3 (broker 2 is not available at that moment).

When broker 2 becomes available and is re-elected as leader for that partition (again, it doesn't matter whether the election was forced or not),
new metadata is received, and RecoveryLoop fails here:

BrokerMeta newBroker;
_brokers.TryGetValue(brokerGrp.Key, out newBroker);
if (newBroker == null)
{
    _log.Error("received MetadataResponse for broker that is not yet in our list ({0})!", brokerGrp.Key);
    return;
}

because there is no broker with id=2 in the _brokers dictionary,
and it will never be added, because _newMetadataEvent.OnNext(filteredResponse); is never called.

We should probably announce the new broker in this case, but I'm still not sure where in the code the best place to do it is.
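A minimal sketch of the "announce the new broker" idea, using a hypothetical, simplified model rather than kafka4net's real PartitionRecoveryMonitor or BrokerMeta types: known brokers are a plain Dictionary<int, string>, and a hypothetical MergeBrokers helper registers any broker listed in a metadata response that is not yet known, instead of the lookup failing for it. The `announced` list only stands in for something like _newBrokerSubject.OnNext. It assumes C# 9+ (top-level statements) and .NET Core 2.0+ (Dictionary.TryAdd).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model: node id -> "host:port". Not kafka4net's actual types.
var knownBrokers = new Dictionary<int, string>
{
    [1] = "broker1:9092",
    [3] = "broker3:9092", // broker 2 was down when the producer connected
};
var announced = new List<int>();

// Registers every broker from a metadata response that is not known yet.
// Adding to `announced` stands in for announcing it (e.g. _newBrokerSubject.OnNext).
void MergeBrokers(IEnumerable<(int NodeId, string Address)> brokers)
{
    foreach (var (nodeId, address) in brokers)
    {
        if (knownBrokers.TryAdd(nodeId, address))
            announced.Add(nodeId);
    }
}

// Broker list from a later metadata response, after broker 2 is back
// and has become leader for the partition.
var responseBrokers = new List<(int NodeId, string Address)>
{
    (1, "broker1:9092"), (2, "broker2:9092"), (3, "broker3:9092"),
};
const int leaderId = 2;

// Without merging, a lookup for the new leader fails (the error path above).
Console.WriteLine($"before merge, leader {leaderId} known: {knownBrokers.ContainsKey(leaderId)}");
// prints "before merge, leader 2 known: False"

// Merging first means the leader lookup succeeds.
MergeBrokers(responseBrokers);
Console.WriteLine($"after merge, leader {leaderId} known: {knownBrokers.ContainsKey(leaderId)}; newly announced: {string.Join(",", announced)}");
// prints "after merge, leader 2 known: True; newly announced: 2"
```

The design choice here is to treat the broker list in each metadata response as a source of new brokers, not only the response received at connect time; where exactly that belongs in kafka4net is the open question in this thread.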

vchekan commented on August 14, 2024

@vikmv I can reproduce the bug in a test (see 5ca1636), and I can confirm that your patch fixes the issue. I just want to think it over a bit more.

vchekan commented on August 14, 2024

@vikmv what's your replication factor? I assume it is 2?
And which Kafka server version are you testing against?

vikmv commented on August 14, 2024

The patch was meant to fix #16 (which happens because LINQ's Except method operates on sets, not sequences, so the result of Except() is also implicitly Distinct()),
but it does not fix this one.
Sorry for not mentioning it earlier: the replication factor is 3 (though it can most likely be reproduced with 2 too), and the Kafka version is 0.8.2.1.
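To illustrate the set semantics of Except() mentioned above, here is a small sketch with made-up integer values (not kafka4net types). It assumes C# 9+ for top-level statements. Enumerable.Except returns the set difference, so repeated elements in the first sequence collapse to one; a Where filter against a HashSet keeps duplicates and their order.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Made-up values standing in for items that may legitimately repeat.
int[] first = { 1, 1, 2, 3 };
int[] second = { 3 };

// Except() is a set operation: besides removing `second`, it drops duplicates.
int[] viaExcept = first.Except(second).ToArray();

// Filtering with Where + HashSet keeps duplicates and the original order.
var exclude = new HashSet<int>(second);
int[] viaWhere = first.Where(x => !exclude.Contains(x)).ToArray();

Console.WriteLine(string.Join(",", viaExcept)); // prints 1,2
Console.WriteLine(string.Join(",", viaWhere));  // prints 1,1,2
```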

vikmv commented on August 14, 2024

Here is a rough repro of this issue (very rough: the test doesn't even fail, but you can find the failure in the log):

        [Test]
        public async Task Issue14()
        {
            var topic = "topic13." + _rnd.Next();

            VagrantBrokerUtil.CreateTopic(topic, 3, 3);   // so that broker2 is the leader for at least one partition

            VagrantBrokerUtil.DescribeTopic(topic);

            VagrantBrokerUtil.StopBroker("broker2");

            await Task.Delay(1000);

            VagrantBrokerUtil.DescribeTopic(topic);

            var cluster = new Cluster(_seedAddresses);

            var producer = new Producer(cluster, new ProducerConfiguration(topic, batchFlushSize: 1));
            producer.OnSuccess += m => _log.Info(string.Format("{0} messages sent", m.Length));

            await producer.ConnectAsync();

            foreach (var meta in await cluster.GetOrFetchMetaForTopicAsync(topic))
                _log.Info(meta);

            await Observable.Interval(TimeSpan.FromSeconds(1)).
                Take(10).
                Do(_ => producer.Send(new Message())).
                ToTask();


            VagrantBrokerUtil.StartBroker("broker2");
            await Task.Delay(1000);
            VagrantBrokerUtil.RebalanceLeadership();


            VagrantBrokerUtil.DescribeTopic(topic);


            foreach (var meta in await cluster.GetOrFetchMetaForTopicAsync(topic))
                _log.Info(meta);


            await Observable.Interval(TimeSpan.FromSeconds(1)).
                Take(10).
                Do(_ => producer.Send(new Message())).
                ToTask();

            // With debug logging enabled, you'll see something like this:
            //17:34:35.492 Debug [kafka-scheduler 1:16] kafka4net.Internal.PartitionRecoveryMonitor #1 Partition topic13.444965174-1 is in error state NotLeaderForPartition. Adding to failed list.
            //17:34:35.509 Info [kafka-scheduler 1:16] kafka4net.Producer Detected change in topic / partition 'topic13.444965174' / 1 / NotLeaderForPartition IsOnline True->False
            //17:34:35.707 Debug [kafka-scheduler 1:16] kafka4net.Protocols.Protocol Sending MetadataRequest to 192.168.56.30:9092
            //17:34:35.732 Debug [kafka-scheduler 1:16] kafka4net.Internal.PartitionRecoveryMonitor Healed partitions found by broker 192.168.56.30:9092 Id:3 (will check broker availability):
            // Leader: 2
            //  Topic: topic13.444965174 [1]

            //17:34:35.748 Error [kafka-scheduler 1:16] kafka4net.Internal.PartitionRecoveryMonitor received MetadataResponse for broker that is not yet in our list!
            //17:34:35.758 Debug [kafka-scheduler 1:16] kafka4net.Protocols.Protocol Sending MetadataRequest to 192.168.56.10:9092
            //17:34:35.758 Debug [kafka-scheduler 1:16] kafka4net.Internal.PartitionRecoveryMonitor Healed partitions found by broker 192.168.56.10:9092 Id:1 (will check broker availability):
            // Leader: 2
            //  Topic: topic13.444965174 [1]


            await cluster.CloseAsync(TimeSpan.FromSeconds(3));
        }

vchekan commented on August 14, 2024

@vikmv what does VagrantBrokerUtil.DescribeTopic(topic); do? Is it kafka-topics.sh --describe --topic?

vikmv commented on August 14, 2024

Yes, it's --describe --topic. It's there just to confirm that on the first call broker 2 isn't the leader for any partition, and that by the second call it has become the leader for one partition.
The issue is that PartitionRecoveryMonitor._brokers knows only about brokers 1 and 3 (because broker 2 was unavailable during startup). When it fetches new metadata (after broker 2 has become the leader for a partition) and sees broker id=2 there, it doesn't recognize that broker and fails with "received MetadataResponse for broker that is not yet in our list".

vchekan commented on August 14, 2024

Yeah, I spoke too early when I claimed I had reproduced the bug. But this time I've got it. I had to fix yet another bug along the way, though.

The reason I did not add newly discovered brokers during partition recovery is a logical one. At the time I was under the impression that, when connecting, the cluster would tell me about the topic topology in general and not just its current state. Perhaps I formed that impression because it takes up to 30 seconds for a cluster member to notice that a peer has gone down, so if I don't wait long enough in unit tests, I get a list of all brokers, including those that are down.

Now that I know this is wrong, the fix should be simple. I expect to have it fixed early next week.
