kafka4net's People

Contributors

thunderstumpges, vchekan, vikmv

kafka4net's Issues

Memory leak in producer

We ran into an unusual Kafka cluster issue.
One of the Kafka brokers was alive, but it wasn't sending any metadata and wasn't accepting new messages.
The Kafka issue was fixed by restarting that single broker.

During that time, however, some kafka4net producers consumed all available memory.
Here is what we managed to find:

Logs (severity=info, no debug logs available):

06:53:05,677 [kafka-scheduler 3] ERROR kafka4net.Producer Send Buffer Full for partition kafka4net.Internal.PartitionStateChangeEvent. 
06:53:05,896 [kafka-scheduler 3] INFO kafka4net.Protocols.ResponseCorrelation CorrelationLoop IO exception. <broker_ip>:9092 conn object hash: 60777834 
06:53:05,899 [kafka-scheduler 3] WARN kafka4net.Internal.PartitionRecoveryMonitor Metadata points to broker but it is not accessible. Error: Unable to read data from the transport connection: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond. 
...(the same WARN repeats indefinitely at a rate of up to ~200 warnings/sec)

At this point RAM usage increases drastically.

A memory dump showed a large number of LIB <<mscorlib!Tasks.TaskCompletionSource<kafka4net.Protocols.Responses.MetadataResponse>>> instances.

There was also this one exception with a huge stack trace:
Exception: System.Net.Sockets.SocketException
Message: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond
Stack trace:

System.Net.Sockets.NetworkStream.EndRead(System.IAsyncResult) 
System.Threading.Tasks.TaskFactory`1+FromAsyncTrimPromise`1[[System.Int32, mscorlib],[System.__Canon, mscorlib]].Complete(System.__Canon, System.Func`3<System.__Canon,System.IAsyncResult,Int32>, System.IAsyncResult, Boolean) 
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(System.Threading.Tasks.Task) 
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(System.Threading.Tasks.Task) 
kafka4net.Protocols.ResponseCorrelation+<ReadBuffer>d__0.MoveNext() 
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(System.Threading.Tasks.Task) 
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(System.Threading.Tasks.Task) 
System.Runtime.CompilerServices.TaskAwaiter`1[[System.Int32, mscorlib]].GetResult() 
kafka4net.Protocols.ResponseCorrelation+<CorrelateResponseLoop>d__e.MoveNext() 

[--start of repeating block--]
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(System.Threading.Tasks.Task) 
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(System.Threading.Tasks.Task) 
System.Runtime.CompilerServices.TaskAwaiter`1[[System.__Canon, mscorlib]].GetResult() 
kafka4net.Protocols.ResponseCorrelation+<SendAndCorrelateAsync>d__19`1[[System.__Canon, mscorlib]].MoveNext() 
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(System.Threading.Tasks.Task) 
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(System.Threading.Tasks.Task) 
System.Runtime.CompilerServices.TaskAwaiter`1[[System.__Canon, mscorlib]].GetResult() 
kafka4net.Protocols.Protocol+<MetadataRequest>d__d.MoveNext() 
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(System.Threading.Tasks.Task) 
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(System.Threading.Tasks.Task) 
System.Runtime.CompilerServices.TaskAwaiter`1[[System.__Canon, mscorlib]].GetResult() 
kafka4net.Internal.PartitionRecoveryMonitor+<>c__DisplayClass41+<<RecoveryLoop>b__24>d__57.MoveNext() 
[--end of repeating block--]
This "repeating block" then repeats many times.

It looks like the exception is being handled recursively.

Unfortunately, I was unable to reproduce the issue.

Idle Connections getting closed are logged as Error

We only saw this after upgrading our brokers to 0.10.1.1, although it seems the problem should have been present in 0.8.2 as well. The default broker setting for connections.max.idle.ms is 600,000 (10 minutes). Since the upgrade we get idle connection timeouts at 10 minutes, and the ResponseCorrelation class reports them by throwing System.Exception instead of System.IO.IOException. As a result, the catch in CorrelateResponseLoop logs the situation as Error instead of Info. The connection is retried and the system recovers gracefully, so Error is too high a severity for this case.
Handling in ReadBuffer:

                if (read == 0)
                {
                    _log.Info("Server closed connection");
                    _etw.CorrelationServerClosedConnection();
                    throw new Exception("Server closed connection");
                }

And the catches in CorrelateResponseLoop:

                    catch (SocketException e)
                    {
                        // shorter version of socket exception, without stack trace dump
                        _log.Info("CorrelationLoop socket exception. {0}. {1}", e.Message, _id);
                        throw;
                    }
                    catch (ObjectDisposedException)
                    {
                        _log.Debug("CorrelationLoop socket exception. Object disposed. {0}", _id);
                        throw;
                    }
                    catch (IOException)
                    {
                        _log.Info("CorrelationLoop IO exception. {0}", _id);
                        throw;
                    }
                    catch (Exception e)
                    {
                        _log.Error(e, "CorrelateResponseLoop error. {0}", _id);
                        throw;
                    }
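
One possible direction, sketched under the assumption that it is acceptable to signal a server-side close with an IOException subtype so that the existing catch (IOException) branch logs it at Info. StreamReadHelper and ReadExactAsync are hypothetical names used for illustration; this is not kafka4net's actual ReadBuffer.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

// Hypothetical helper, not part of kafka4net.
public static class StreamReadHelper
{
    // Reads exactly `count` bytes from the stream. If the peer closes the
    // connection first (ReadAsync returns 0), it throws EndOfStreamException,
    // which derives from IOException and is therefore caught by a
    // catch (IOException) handler rather than by catch (Exception).
    public static async Task<byte[]> ReadExactAsync(Stream stream, int count)
    {
        var buffer = new byte[count];
        var offset = 0;
        while (offset < count)
        {
            var read = await stream.ReadAsync(buffer, offset, count - offset);
            if (read == 0)
                throw new EndOfStreamException("Server closed connection");
            offset += read;
        }
        return buffer;
    }
}
```

With this shape, an idle-connection close would fall into the IOException branch of the catch chain shown above, assuming the caller keeps that chain unchanged.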

Memory leak

There appears to be a memory leak.
After sending a big chunk of data, kafka4net still holds references to some of that data, which prevents the memory from being collected.

This is quite easy to reproduce:

  1. Bulk send (probably around 1 million messages)
  2. Wait until everything is sent
  3. Close the connection
  4. Run GC
  5. You will notice that kafka4net still holds references to a large amount of memory.

Running a memory profiler will give more detail.

No exceptions thrown when all brokers are not available during startup of producer

Hi. I'm trying to determine how the Producer behaves when it cannot connect to any broker in the Kafka cluster during startup. In my testing, no exceptions are thrown when the brokers are unavailable. Am I doing anything wrong?

This is how I tested:

  1. Shutdown all Kafka broker processes
  2. Run a simple producer and call await ConnectAsync()
  3. Send a message

Some context on why I need to do this:
My application can be disconnected from the Kafka cluster at any time. To prevent data loss, the application caches the data on the local computer and sends it to the cluster after reconnecting.

How should I handle the situation when Kafka starts re-balancing data on brokers?

If my code starts pushing 1 million records to Kafka as a job, and Kafka starts re-balancing data across its brokers (3 of them) before that job ends, the job stops pushing the remaining records for some reason.

Question: when Kafka re-balances the data, how should my code handle the situation? Or should it be transparent, so that my code doesn't need to do anything?

Measure performance

Measure GC, memory footprint, CPU, and throughput, and evaluate whether they are sensible.
A low-hanging fruit is to reuse a single TCP buffer instead of allocating a new one for each read.

Producer stops working after manual partition reassignment

I noticed that the Kafka producer stops working after running "kafka-reassign-partitions".

Here is an excerpt from the debug log:

Sending ProduceRequest to Connection: <broker1>, Request: Broker: <broker1> Id:1 Acks: 1 Timeout: 1000 Topics: [Topic: <topicname> Parts: [Part: 1 Messages: 1]] | 
Got ProduceResponse: Topics: [<topicname> [Part: 1, Err: NoError Offset: 16122593985]] | 

After running the reassignment, the partition leader changed, but the producer did not refresh its metadata and kept trying to send messages to the wrong broker:

Sending ProduceRequest to Connection: <broker1>, Request: Broker: <broker1> Id:1 Acks: 1 Timeout: 1000 Topics: [Topic: <topicname> Parts: [Part: 1 Messages: 1]] | 
Got ProduceResponse: Topics: [<topicname> [Part: 1, Err: UnknownTopicOrPartition Offset: -1]] |
..(and the same message repeats then)..

I'll try to reproduce it with a unit test and think about fixing it.
The fix seems to be changing the IsPermanentFailure method so that it treats "UnknownTopicOrPartition" as a recoverable error, but I'm not 100% sure yet.
What were the reasons for treating the UnknownTopicOrPartition error as permanent (while "NotLeaderForPartition" is not)?

I believe UnknownTopicOrPartition and NotLeaderForPartition should be assigned to the same category,
because after partition reassignment the following situations are possible:

  • the broker is still the leader for the partition (everything is ok)
  • the broker is no longer the leader for the partition, but it still has this topic (or its replica): the partition will get the "NotLeaderForPartition" error and RecoveryLoop will query new metadata for this topic
  • the broker is not the leader and doesn't have this topic at all: the partition will get the "UnknownTopicOrPartition" error and RecoveryLoop will do nothing about it because the error is "permanent", and that's bad
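
The proposed classification could be sketched roughly as follows. The numeric values follow the Kafka protocol error-code table, but the enum, the ErrorClassifier class, and the choice to treat every other code as permanent are assumptions made for illustration and are not kafka4net's actual IsPermanentFailure.

```csharp
using System;

// Illustrative subset of Kafka protocol error codes (hypothetical enum).
public enum SketchErrorCode : short
{
    NoError = 0,
    OffsetOutOfRange = 1,
    UnknownTopicOrPartition = 3,
    NotLeaderForPartition = 6,
    MessageSizeTooLarge = 10
}

public static class ErrorClassifier
{
    // Returns false for errors that a metadata refresh may resolve.
    // Both "leader moved" variants from the list above land in the same
    // recoverable category; everything else is assumed permanent here.
    public static bool IsPermanentFailure(SketchErrorCode code)
    {
        switch (code)
        {
            case SketchErrorCode.NoError:
            case SketchErrorCode.NotLeaderForPartition:
            case SketchErrorCode.UnknownTopicOrPartition:
                return false;
            default:
                return true;
        }
    }
}
```

One trade-off of this choice: a topic that genuinely does not exist would also be retried, so a real implementation would probably need a retry limit or timeout.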

How to start consuming with a specific offset.

It looks quite straightforward to consume messages from the "start" or from the "end", like this:

            var startPosition = new StartPositionTopicStart();
            var consumer = new Consumer(new ConsumerConfiguration(_seedAddresses, topic, startPosition));

            var consumerSubscription = consumer.OnMessageArrived.
                Subscribe(msg =>
                {
                    Console.WriteLine("Received : {0}", Encoding.UTF8.GetString(msg.Value));
                });

            consumer.IsConnected.Wait();

Could you please share code that shows how to consume from a specific offset?

Here is a sample scenario: run a process that starts consuming new messages and stores the offset before recycling. Then run the process again and start consuming from the stored offset.

Huge memory and threads consumption

We have a service application which logs HTTP requests/responses and some other data into Kafka through the serilog-sinks-kafka NuGet package. Under the hood this package uses kafka4net. We also have other in-process integrations with the Serilog/NLog logging frameworks that send messages to different targets/sinks.

After some time we noticed that the application started to consume too much memory and too many threads. Turning off logging to Kafka in the config file normalizes the situation, and the performance counters drop back to their usual low level.

[Image: memory/CPU level before turning on Kafka logging (A) and after turning it off (B)]

ETW tracing shows that many threads wait on ManualResetEventSlim.Wait() in kafka4net.Producer.SendLoop:

"GCFrame",
"HelperMethodFrame_1OBJ",
"System.Threading.Monitor.Wait(System.Object, Int32, Boolean)",
"System.Threading.Monitor.Wait(System.Object, Int32)",
"System.Threading.ManualResetEventSlim.Wait(Int32, System.Threading.CancellationToken)",
"kafka4net.Producer.b__38_3()",
"System.Threading.Tasks.Task.InnerInvoke()",
"System.Threading.Tasks.Task.Execute()",
"System.Threading.Tasks.Task.ExecutionContextCallback(System.Object)",
"System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)",
"System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)",
"System.Threading.Tasks.Task.ExecuteWithThreadLocal(System.Threading.Tasks.Task ByRef)",
"System.Threading.Tasks.Task.ExecuteEntry(Boolean)",
"System.Threading.Tasks.Task.System.Threading.IThreadPoolWorkItem.ExecuteWorkItem()",
"System.Threading.ThreadPoolWorkQueue.Dispatch()",
"System.Threading._ThreadPoolWaitCallback.PerformWaitCallback()",
"DebuggerU2MCatchHandlerFrame"

In total we see 173 threads in the running application, 63 of which are waiting on ManualResetEventSlim in SendLoop.

Further investigation led me to this line of source code:
https://github.com/ntent-ad/kafka4net/blob/master/src/Producer.cs#L366
The comment above this line suggests that something important was left unfinished.

Infinite failure loop if offset is incorrect

If out-of-range offsets are provided, the driver falls into an infinite failure-recovery loop. In this particular case I observed the offset being "-1".

Several problems to address here:

  • Where does -1 come from?
  • The out-of-range error is classified as a terminal one, so why is it not reported and why is the partition fetcher not terminated?

Consumer flow control is broken

Hi,

I've been debugging a problem for the last week where my consuming application would crash with an out-of-memory exception. What I observed:

  • when running the application on my local computer (fast CPU, but slow network), I can process the messages faster than kafka4net can fetch them from the network. Flow control is not needed, and the application runs fine through my topic which has about 100 GB of data in it.
  • when running the application on a cloud server (in my case AWS m3.large, which has 7.5 GB of RAM and 1 Gbps network speed), the processing is too slow compared to the network speed. Flow control works for a short time, and then fails catastrophically. The consumer then fills its buffers as fast as possible, and after about a minute the process dies with an OutOfMemoryException.

I think I figured out the root cause of the problem. In the Consumer class, there is a BehaviorSubject

readonly BehaviorSubject<int> _flowControlInput = new BehaviorSubject<int>(1);

which is controlling the flow control. I think what goes wrong is that this subject is written to simultaneously from multiple threads. Once from the thread that increments the counter for new messages:

onMessage = onMessage.Do(msg =>
{
    var count = Interlocked.Increment(ref _outstandingMessageProcessingCount);
    _flowControlInput.OnNext(count);
});

And once from the thread that acknowledges the messages (called from my application)

public void Ack(int messageCount = 1)
{
    if(!Configuration.UseFlowControl)
        throw new InvalidOperationException("UseFlowControl is OFF, Ack is not allowed");

    var count = Interlocked.Add(ref _outstandingMessageProcessingCount, - messageCount);
    _flowControlInput.OnNext(count);
}

I'm not an expert on threading and concurrency with reactive extensions, but calling OnNext simultaneously from multiple threads is a bad idea, e.g. discussed here: http://stackoverflow.com/questions/14396449/why-are-subjects-not-recommended-in-net-reactive-extensions

I could fix this pretty quickly by putting a lock statement around both calls to OnNext. But I'm not sure if that is the best possible solution to the problem. If anyone's interested, I can put together a pull request for this.

Or am I doing something wrong in my application? Am I expected to call Ack on the original thread? Would that solve it?
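
The locking idea mentioned above could look roughly like this. It is a minimal sketch that uses only the BCL IObserver<int> interface instead of BehaviorSubject so that it stays self-contained; FlowControlCounter and RecordingObserver are hypothetical names and not kafka4net code. It also moves the counter update inside the lock, which is an assumption of this sketch: that way the emitted values cannot be reordered between the update and the OnNext call.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the counter logic in Consumer.
public sealed class FlowControlCounter
{
    private readonly object _gate = new object();
    private readonly IObserver<int> _flowControlInput;
    private int _outstanding;

    public FlowControlCounter(IObserver<int> flowControlInput)
    {
        _flowControlInput = flowControlInput;
    }

    // Called for every fetched message (network thread).
    public void OnMessageReceived()
    {
        lock (_gate)
        {
            _outstanding++;
            _flowControlInput.OnNext(_outstanding);
        }
    }

    // Called by the application once messages are processed (any thread).
    public void Ack(int messageCount = 1)
    {
        lock (_gate)
        {
            _outstanding -= messageCount;
            _flowControlInput.OnNext(_outstanding);
        }
    }
}

// Test double that records every value it receives, in arrival order.
public sealed class RecordingObserver : IObserver<int>
{
    public readonly List<int> Values = new List<int>();
    public void OnNext(int value) { Values.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

Because every OnNext call happens under the same lock, the observer never sees concurrent calls, and the last value it receives always matches the final counter value.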

Create documentation

Create documentation to promote adoption.
Have quickstart examples on the front page.
More detailed documentation in wiki.
Document use cases (reading from beginning, custom offsets, synchronization, etc)

Document internal architecture.

Closing and opening connection right after, throws exception

Here is the code and the stack trace :

await producer.ConnectAsync();
await producer.CloseAsync(TimeSpan.FromSeconds(10));
await producer.ConnectAsync();

   at System.Reactive.Concurrency.EventLoopScheduler.Schedule[TState](TState state, TimeSpan dueTime, Func`3 action)
   at kafka4net.Utils.WatchdogScheduler.Schedule[TState](TState state, TimeSpan dueTime, Func`3 action) in c:\Users\am\Documents\Visual Studio 2013\Projects\kafka4net\src\Utils\WatchdogScheduler.cs:line 48
   at System.Reactive.Concurrency.LocalScheduler.Schedule[TState](TState state, Func`3 action)
   at System.Reactive.Concurrency.Scheduler.Schedule(IScheduler scheduler, Action action)
   at kafka4net.Utils.AskEx.Ask[T](IScheduler scheduler, Func`1 action) in c:\Users\am\Documents\Visual Studio 2013\Projects\kafka4net\src\Utils\AskEx.cs:line 12
   at kafka4net.Producer.<ConnectAsync>d__29.MoveNext() in c:\Users\am\Documents\Visual Studio 2013\Projects\kafka4net\src\Producer.cs:line 89
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
   at Kafka4nConsumer.Program.<KafkaTest>d__1.MoveNext() in c:\Users\am\Documents\Visual Studio 2013\Projects\KafkaConsumer\Kafka4nConsumer\Program.cs:line 140

Better default for fetching position

The fetching position API is not intuitive. Make a default consumer API such that there is no need to know about it until you really need it. Default to fetching from the end of the queue.
Maybe reconsider whether the API can be made more intuitive.

Redesign error processing

Currently error processing is focused on Kafka broker recovery. Connecting to a broker and fetching offsets are not reliable, and failures are not handled properly, leading to occasional random behavior.

Cannot configure Tcp Keepalive

Related to issue #40. Currently there is no way to configure TCP keepalive. In .NET this is done through Socket.IOControl.

We should make this configurable, but leave it off by default.
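
For reference, a minimal sketch of the Socket.IOControl approach. KeepAliveHelper and its method names are hypothetical. The 12-byte layout (on/off flag, idle time in ms, probe interval in ms) is the Windows tcp_keepalive structure, so this path is assumed to apply to Windows only; other platforms may need a different mechanism.

```csharp
using System;
using System.Net.Sockets;

// Hypothetical helper, not part of kafka4net.
public static class KeepAliveHelper
{
    // Packs three 32-bit values: enabled flag, idle time before the first
    // probe (ms), and interval between probes (ms).
    public static byte[] BuildKeepAliveValues(bool enabled, uint timeMs, uint intervalMs)
    {
        var values = new byte[12];
        BitConverter.GetBytes(enabled ? 1u : 0u).CopyTo(values, 0);
        BitConverter.GetBytes(timeMs).CopyTo(values, 4);
        BitConverter.GetBytes(intervalMs).CopyTo(values, 8);
        return values;
    }

    // Applies the keepalive settings to an existing socket (Windows-specific).
    public static void Apply(Socket socket, uint timeMs, uint intervalMs)
    {
        socket.IOControl(IOControlCode.KeepAliveValues,
                         BuildKeepAliveValues(true, timeMs, intervalMs),
                         null);
    }
}
```

A configuration option could then decide whether Apply is called at all after a connection is opened, which keeps the default off as suggested above.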

Producer is unable to fetch topic metadata when first kafka broker is down

While working on #14, I noticed one more issue.

Steps to reproduce:

  • Prerequisites: 3 Kafka brokers (1, 2, 3)
  • Stop the 1st Kafka broker
  • Create a producer: new Producer("kafka1,kafka2,kafka3", configuration)
  • The producer is unable to fetch metadata and repeats the error: "Error while trying to fetch topic 'topic_name' metadata, will retry."

My findings on why this is happening:

//Cluster.ConnectAsync method:
..
initBrokers = ...; // initialized with 3 brokers, and their id=-99
..
MergeTopicMeta(initMeta);   // method is called, and there are still 3 brokers in there
..

//inside MergeTopicMeta method:

var newBrokers = topicMeta.Brokers.Except(_metadata.Brokers, BrokerMeta.NodeIdComparer).ToArray();
//newBrokers is left with only 1 record (instead of 3), because all the brokers have the same Id=-99
..
_metadata.Brokers = _metadata.Brokers.Concat(newBrokers).ToArray();

So _metadata.Brokers ends up with only one broker added to the list, and the producer is unable to fetch topic metadata from it.
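
The behaviour described here follows from Enumerable.Except being a set operation: it returns distinct elements according to the supplied comparer. A small self-contained illustration, where BrokerInfo and BrokerNodeIdComparer are hypothetical stand-ins for BrokerMeta and BrokerMeta.NodeIdComparer:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for BrokerMeta.
public sealed class BrokerInfo
{
    public int NodeId;
    public string Host;
}

// Compares brokers by node id only, as the NodeIdComparer name suggests.
public sealed class BrokerNodeIdComparer : IEqualityComparer<BrokerInfo>
{
    public bool Equals(BrokerInfo x, BrokerInfo y) { return x.NodeId == y.NodeId; }
    public int GetHashCode(BrokerInfo broker) { return broker.NodeId; }
}

public static class ExceptDemo
{
    // Three seed brokers that all share the placeholder id -99.
    public static BrokerInfo[] SeedBrokers()
    {
        return new[]
        {
            new BrokerInfo { NodeId = -99, Host = "kafka1" },
            new BrokerInfo { NodeId = -99, Host = "kafka2" },
            new BrokerInfo { NodeId = -99, Host = "kafka3" }
        };
    }

    // Mirrors the Except call from MergeTopicMeta.
    public static BrokerInfo[] NewBrokers(BrokerInfo[] incoming, BrokerInfo[] known)
    {
        return incoming.Except(known, new BrokerNodeIdComparer()).ToArray();
    }
}
```

Calling ExceptDemo.NewBrokers(ExceptDemo.SeedBrokers(), new BrokerInfo[0]) returns a single element (kafka1), even though nothing was excluded. Comparing seed brokers by host and port rather than by node id would be one way to keep all three, though whether that fits the rest of the merge logic is untested here.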

Implement protocol v1 and v2 (aka kafka-0.9 and 0.10)

Message format changes seem to be minor, but the Vagrant testing suite will require more work. Perhaps it makes sense to switch to Docker containers for better flexibility and lower VM resource consumption.

Some research is required on the compatibility strategy, i.e. are clients backward/forward compatible with the server? Would it be possible or desirable to do automatic version detection?

Producer throws exception when sending a lot of messages

I am trying to stress test the producer, and it seems to time out all the time.
Here is the code:

            var producer = new Producer(seed2Addresses, 
                new ProducerConfiguration(topic));
            producer.ConnectAsync().Wait();

            for (int i = 0; i < 200000; ++i)
            {
                var msg = new Message()
                {
                    Key = BitConverter.GetBytes(1),
                    Value = Encoding.UTF8.GetBytes(String.Format("Test{0}", i))
                };
                producer.Send(msg);
            }
            producer.CloseAsync(TimeSpan.FromSeconds(120)).Wait();

While running, this code throws 2 exceptions:

Timed out while waiting for Send buffers to drain. Canceling
and
Timed out even after cancelling send loop! This shouldn't happen, there will likely be message loss

I suspect that it is because of CloseAsync, but I'm not sure. Is this how the producer is supposed to be used in kafka4net?

Quiet seed connections

When seed connections are taken over by plain ones, an exception is thrown. It does no harm, but it is confusing.
Think about a more logical algorithm, perhaps one which reuses the connection. A previous attempt at reuse failed because old callbacks were called.

Better backward compatibility when enhancing configuration classes

I've enhanced the ProducerConfiguration class with a new parameter, CompressionType. This keeps it source-compatible with the previous version but makes it binary incompatible. A "method not found" exception can be thrown if not all dependencies in a project using kafka4net are updated.

Rework the config classes and move optional parameters into properties. This will preserve both source and binary backward compatibility.

Do this in the next major release, because it is yet another API change.
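
The reason optional constructor parameters break binary compatibility is that adding one changes the constructor's signature, while already-compiled callers still reference the old signature. A minimal sketch of the property-based alternative, using hypothetical names (SketchProducerConfig, SketchCompression) rather than kafka4net's real classes:

```csharp
using System;

// Hypothetical compression enum for illustration.
public enum SketchCompression { None, Gzip }

public sealed class SketchProducerConfig
{
    // Only the required setting is a constructor parameter, so the
    // constructor signature can stay the same across releases.
    public SketchProducerConfig(string topic)
    {
        if (topic == null) throw new ArgumentNullException(nameof(topic));
        Topic = topic;
    }

    public string Topic { get; }

    // Optional settings are properties with defaults. Adding another
    // property later does not change any existing member signature.
    public SketchCompression Compression { get; set; } = SketchCompression.None;
}
```

Callers would then opt in with an object initializer, for example new SketchProducerConfig("my-topic") { Compression = SketchCompression.Gzip }.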

Upgrade to System.Reactive 3

Hi,
since version 3 the NuGet package for Reactive Extensions is named System.Reactive instead of Rx-Main.
Could you please upgrade to this version?

Sample consumer code for multi-topic subscription

Hi there, I'm trying to find sample code for a consumer implementation that subscribes to multiple topics and automatically commits offsets (either to Kafka or to another store).

Also, does this work with the latest version of Kafka?

Detect recurrent failures

If the driver enters a failure-recovery loop, this should be detected and promoted to an error.
A flexible policy should be implemented with a meaningful default.

Expose failures and recoveries as IObservable and subscribe a policy which raises an error when the failure-recovery pattern happens more than 1000 times within 5 minutes and no message has been received (no progress).
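
The core of such a policy could be a sliding-window counter. The sketch below is a plain class rather than an IObservable subscriber, to stay free of dependencies; RecurrentFailurePolicy and its members are hypothetical names, and the class is not thread-safe as written.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical policy: escalate when more than `maxFailures` failures occur
// within `window` without any message being received in between.
public sealed class RecurrentFailurePolicy
{
    private readonly int _maxFailures;
    private readonly TimeSpan _window;
    private readonly Queue<DateTime> _failures = new Queue<DateTime>();

    public RecurrentFailurePolicy(int maxFailures = 1000, TimeSpan? window = null)
    {
        _maxFailures = maxFailures;
        _window = window ?? TimeSpan.FromMinutes(5);
    }

    // A received message counts as progress and resets the failure history.
    public void OnMessageReceived()
    {
        _failures.Clear();
    }

    // Records a failure at time `now` and returns true when the number of
    // failures inside the window exceeds the threshold.
    public bool OnFailure(DateTime now)
    {
        _failures.Enqueue(now);
        while (_failures.Count > 0 && now - _failures.Peek() > _window)
            _failures.Dequeue();
        return _failures.Count > _maxFailures;
    }
}
```

Passing the timestamp in explicitly keeps the policy easy to test; a subscriber to a failure stream could call OnFailure(DateTime.UtcNow) and raise an error when it returns true.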

Upgrade to nunit-3

Upgrade to NUnit version 3 and use its features, such as constructor/dispose, variable injectors, and callback-based exception testers.

Implement something like Akka.TestKit's Within() to test time constraints more elegantly.

Consumer subscription to messages lost

This may or may not be a bug. Please delete it if this is not the right place. I need some information from the experts.

I have a process which subscribes to messages in a Kafka queue. This process runs all the time and never terminates its subscription.

Here is my issue: whenever there is an exception in my additional processing steps, I don't receive any more messages. All exceptions are handled, so I expect the subscription not to halt.

consumer.OnMessageArrived.Subscribe(msg => {
    // Perform your own deserialization here
    var text = Encoding.UTF8.GetString(msg.Value);
    try {
        // My additional processing
    }
    catch (Exception ex) {
    }
});

I need suggestions on how to handle consumer connection loss or consumer subscription loss. My assumption is that the heartbeat is already handled in the client, so connection loss might be due to a network failure or a machine being down.

Producer starts writing to the same single partition (when no Message.Key is specified)

There is a bug in FletcherHashedMessagePartitioner.

Because Random is not thread-safe, a Producer used in a multithreaded environment will, after some time, start writing to a single partition (only when the message has no Key specified).

Here is a small snippet that shows the use of Random in a multithreaded environment:

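using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

class Message { public int Partition; }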

static void Main()
{
    var random = new Random();

    //Here is how to fix it:
    //var randomLocal = new ThreadLocal<Random>(() => new Random());

    var data = new ConcurrentDictionary<int, int>();

    var subject = new Subject<Message>();

    subject
        //another way of fixing - adding .Synchronize() here
        .Do(msg => msg.Partition = random.Next(10))
        .Subscribe(x => { data.AddOrUpdate(x.Partition, i => 1, (i, val) => val + 1); });

    var threads = new List<Thread>();

    for (int t = 0; t < 10; t++)
    {
        var thread = new Thread(() =>
        {
            for (int i = 0; i < 100000; i++)
                subject.OnNext(new Message());
        });

        thread.Start();
        threads.Add(thread);
    }

    foreach (var thread in threads)
        thread.Join();

    foreach (var d in data)
        Console.WriteLine("Value: {0}, Count: {1}", d.Key, d.Value);

    //results with non thread-local random are something like this:
    //Value: 0, Count: 933076
    //Value: 1, Count: 7443
    //Value: 2, Count: 7345
    //Value: 3, Count: 7363
    //Value: 4, Count: 7600
    //Value: 5, Count: 7454
    //Value: 6, Count: 7658
    //Value: 7, Count: 7302
    //Value: 8, Count: 7360
    //Value: 9, Count: 7399
}

ObjectDisposedException after closing producer

Under certain conditions it is possible to get an exception after closing the Producer:

2016-05-24 10:11:31.8848 Error [:13] RecoveryTest Unhandled exception System.ObjectDisposedException: Cannot access a disposed object.
   at System.Reactive.Concurrency.EventLoopScheduler.Schedule[TState](TState state, TimeSpan dueTime, Func`3 action)
   at kafka4net.Utils.WatchdogScheduler.Schedule[TState](TState state, TimeSpan dueTime, Func`3 action) in C:\projects\kafka4net\src\Utils\WatchdogScheduler.cs:line 63
   at System.Reactive.Concurrency.LocalScheduler.Schedule[TState](TState state, Func`3 action)
   at System.Reactive.Concurrency.Scheduler.Schedule(IScheduler scheduler, Action action)
   at kafka4net.Utils.RxSyncContextFromScheduler.Post(SendOrPostCallback d, Object state) in C:\projects\kafka4net\src\Utils\RxSyncContextFromScheduler.cs:line 20
   at System.Threading.Tasks.SynchronizationContextAwaitTaskContinuation.PostAction(Object state)
   at System.Threading.Tasks.AwaitTaskContinuation.RunCallback(ContextCallback callback, Object state, Task& currentTask)
--- End of stack trace from previous location where exception was thrown ---
   at System.Threading.Tasks.AwaitTaskContinuation.<>c.<ThrowAsyncIfNecessary>b__18_0(Object s)
   at System.Threading.QueueUserWorkItemCallback.WaitCallback_Context(Object state)
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
   at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
   at System.Threading.QueueUserWorkItemCallback.System.Threading.IThreadPoolWorkItem.ExecuteWorkItem()
   at System.Threading.ThreadPoolWorkQueue.Dispatch()
   at System.Threading._ThreadPoolWaitCallback.PerformWaitCallback()

The reason is quite tricky. The Producer informs the caller about the success or failure of individual messages via the OnSuccess and OnPermError callbacks. Those callbacks are executed in the context of the driver's internal thread loop. As long as a subscriber to those events does not consume too much time, everything is fine. But if one creates an observable from those events and does await successMessagesObservable, things break down. The observable is executed on the driver's internal thread, and when the observable completes, it calls the observer's OnCompleted. If the user code has no synchronization context, TaskScheduler decides to execute the continuation on the current thread, which exposes the Kafka driver's internal thread to the user's code. From then on, all await continuations are executed in the Kafka driver's synchronization context. But the driver is already closed and its EventLoopScheduler is disposed, so any "await" continuation causes an ObjectDisposedException.

Producer sometimes stops working after broker restart

The producer sometimes stops working after a restart of a single Kafka broker.
It doesn't always happen (I have not found a stable way to reproduce it yet), but here are some observations.

  • There are messages in the log like this:
WARN kafka4net.Producer Produce Request Failed for topic <topic_name> partition 14 with error NotLeaderForPartition 
INFO kafka4net.Cluster Cluster saw new partition state: <topic_name>-14-NotLeaderForPartition 
INFO kafka4net.Producer Detected change in topic/partition '<topic_name>'/14/NotLeaderForPartition IsOnline True->False 
ERROR kafka4net.Internal.PartitionRecoveryMonitor received MetadataResponse for broker that is not yet in our list! 
<the same error repeats multiple times>

and then
ERROR kafka4net.Producer Send Buffer Full for partition kafka4net.Internal.PartitionStateChangeEvent.
The last one is expected, because buffer auto-growth is disabled.
  • We are running multiple instances of producers (on different hosts) and when the issue occurs, it happens on all the producers which are writing to that topic at the same time.

I will try to reproduce this behaviour and add my findings here.
