ntent / kafka4net
C# client for Kafka
License: Apache License 2.0
We have encountered a strange Kafka cluster issue.
One of the Kafka brokers was alive, but it was not sending any metadata and was not accepting new messages.
The Kafka issue was fixed by restarting this single broker.
However, during that time some kafka4net producers consumed all of the available memory.
Here is what we have managed to find:
Logs (severity=info, no debug logs available):
06:53:05,677 [kafka-scheduler 3] ERROR kafka4net.Producer Send Buffer Full for partition kafka4net.Internal.PartitionStateChangeEvent.
06:53:05,896 [kafka-scheduler 3] INFO kafka4net.Protocols.ResponseCorrelation CorrelationLoop IO exception. <broker_ip>:9092 conn object hash: 60777834
06:53:05,899 [kafka-scheduler 3] WARN kafka4net.Internal.PartitionRecoveryMonitor Metadata points to broker but it is not accessible. Error: Unable to read data from the transport connection: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond.
...(the same WARN repeats indefinitely at a rate of up to ~200 warnings/sec)
At this point RAM usage increases drastically.
A memory dump showed a large number of LIB <<mscorlib!Tasks.TaskCompletionSource<kafka4net.Protocols.Responses.MetadataResponse>>> instances.
There was also one exception with a huge stack trace:
Exception: System.Net.Sockets.SocketException
Message: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond
Stack trace:
System.Net.Sockets.NetworkStream.EndRead(System.IAsyncResult)
System.Threading.Tasks.TaskFactory`1+FromAsyncTrimPromise`1[[System.Int32, mscorlib],[System.__Canon, mscorlib]].Complete(System.__Canon, System.Func`3<System.__Canon,System.IAsyncResult,Int32>, System.IAsyncResult, Boolean)
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(System.Threading.Tasks.Task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(System.Threading.Tasks.Task)
kafka4net.Protocols.ResponseCorrelation+<ReadBuffer>d__0.MoveNext()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(System.Threading.Tasks.Task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(System.Threading.Tasks.Task)
System.Runtime.CompilerServices.TaskAwaiter`1[[System.Int32, mscorlib]].GetResult()
kafka4net.Protocols.ResponseCorrelation+<CorrelateResponseLoop>d__e.MoveNext()
[--start of repeating block--]
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(System.Threading.Tasks.Task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(System.Threading.Tasks.Task)
System.Runtime.CompilerServices.TaskAwaiter`1[[System.__Canon, mscorlib]].GetResult()
kafka4net.Protocols.ResponseCorrelation+<SendAndCorrelateAsync>d__19`1[[System.__Canon, mscorlib]].MoveNext()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(System.Threading.Tasks.Task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(System.Threading.Tasks.Task)
System.Runtime.CompilerServices.TaskAwaiter`1[[System.__Canon, mscorlib]].GetResult()
kafka4net.Protocols.Protocol+<MetadataRequest>d__d.MoveNext()
System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(System.Threading.Tasks.Task)
System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(System.Threading.Tasks.Task)
System.Runtime.CompilerServices.TaskAwaiter`1[[System.__Canon, mscorlib]].GetResult()
kafka4net.Internal.PartitionRecoveryMonitor+<>c__DisplayClass41+<<RecoveryLoop>b__24>d__57.MoveNext()
[--end of repeating block--]
This "repeating block" then repeats many times.
It seems there is recursion in the handling of this exception.
Unfortunately, I was unable to reproduce the issue.
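The ~200 WARN lines per second and the pile of TaskCompletionSource<MetadataResponse> instances are consistent with a recovery loop that issues a new metadata request on every failure without waiting. Here is a minimal sketch of a retry loop that keeps at most one request in flight and backs off exponentially between attempts. RecoveryBackoff and the fetchMetadataAsync delegate are hypothetical stand-ins, not kafka4net's API.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class RecoveryBackoff
{
    // Retries a (hypothetical) metadata fetch until it succeeds or the token is cancelled.
    // Only one attempt is in flight at a time, and the delay doubles after each failure
    // (capped at maxDelay), so an unreachable broker cannot cause hundreds of requests per second.
    public static async Task<T> RetryWithBackoffAsync<T>(
        Func<CancellationToken, Task<T>> fetchMetadataAsync,
        TimeSpan initialDelay,
        TimeSpan maxDelay,
        CancellationToken token,
        Action<Exception, TimeSpan> onFailure = null)
    {
        var delay = initialDelay;
        while (true)
        {
            token.ThrowIfCancellationRequested();
            try
            {
                // Await the single outstanding request before deciding what to do next.
                return await fetchMetadataAsync(token).ConfigureAwait(false);
            }
            catch (Exception e) when (!(e is OperationCanceledException))
            {
                onFailure?.Invoke(e, delay); // e.g. log once per attempt
                await Task.Delay(delay, token).ConfigureAwait(false);
                delay = TimeSpan.FromTicks(Math.Min(delay.Ticks * 2, maxDelay.Ticks));
            }
        }
    }
}
```

Because the loop is iterative and awaits each attempt before starting the next, failed attempts should not chain continuations onto one another the way the repeating block in the stack trace suggests.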
This was only seen after we upgraded our brokers to 0.10.1.1. However, it seems this problem should have been present in 0.8.2 as well. The default broker setting for connections.max.idle.ms is 600,000 (10 minutes). Since upgrading to 0.10.1.1, we get idle connection timeouts after 10 minutes, and the ResponseCorrelation class throws these as System.Exception instead of System.IO.IOException. As a result, the catch in CorrelateResponseLoop logs the situation as Error instead of Info. The connection is retried and the system recovers gracefully, but Error is too high a log level for this case.
Handling in ReadBuffer:
if (read == 0)
{
    _log.Info("Server closed connection");
    _etw.CorrelationServerClosedConnection();
    throw new Exception("Server closed connection");
}
And the catches in CorrelateResponseLoop:
catch (SocketException e)
{
    // shorter version of socket exception, without stack trace dump
    _log.Info("CorrelationLoop socket exception. {0}. {1}", e.Message, _id);
    throw;
}
catch (ObjectDisposedException)
{
    _log.Debug("CorrelationLoop socket exception. Object disposed. {0}", _id);
    throw;
}
catch (IOException)
{
    _log.Info("CorrelationLoop IO exception. {0}", _id);
    throw;
}
catch (Exception e)
{
    _log.Error(e, "CorrelateResponseLoop error. {0}", _id);
    throw;
}
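One possible fix, assuming the only goal is the log level, is to throw an IOException (or a subclass) when the read returns 0 bytes. The existing catch (IOException) branch then logs it at Info. ServerClosedConnectionException and FrameReader are hypothetical names. ReadAndClassifyAsync only mimics the catch order above to show which branch is taken.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

// Hypothetical exception type: deriving from IOException means a
// "catch (IOException)" branch handles it, so it is logged at Info level.
public class ServerClosedConnectionException : IOException
{
    public ServerClosedConnectionException() : base("Server closed connection") { }
}

public static class FrameReader
{
    // Reads exactly buffer.Length bytes, in the spirit of ReadBuffer.
    public static async Task ReadBufferAsync(Stream stream, byte[] buffer)
    {
        int offset = 0;
        while (offset < buffer.Length)
        {
            int read = await stream.ReadAsync(buffer, offset, buffer.Length - offset).ConfigureAwait(false);
            if (read == 0)
                // End of stream: the broker closed the socket (e.g. connections.max.idle.ms expired).
                throw new ServerClosedConnectionException();
            offset += read;
        }
    }

    // Mirrors the IOException/Exception catch order and returns the level that would be logged.
    public static async Task<string> ReadAndClassifyAsync(Stream stream, byte[] buffer)
    {
        try
        {
            await ReadBufferAsync(stream, buffer).ConfigureAwait(false);
            return "None";
        }
        catch (IOException) { return "Info"; }
        catch (Exception) { return "Error"; }
    }
}
```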
Create NuGet package
It looks like there is a memory leak.
After sending a big chunk of data, kafka4net still holds a reference to some of that data, which prevents the memory from being collected.
This is quite easy to reproduce.
You can also run a memory profiler to get a better idea.
Hi. I'm trying to determine how the Producer will behave when it cannot connect to any broker in the Kafka cluster during startup. Based on my testing, no exceptions are thrown when the brokers are unavailable. Am I doing anything wrong?
This is how I tested:
Some context on why I need to do this:
My application can be disconnected from the Kafka cluster at any time. Hence, to prevent any data loss, the application caches the data locally and sends it to the cluster upon reconnection.
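Assume ConnectAsync keeps retrying silently while no broker is reachable. That is inferred from the behavior described, not verified here. In that case, one way to surface the failure is to race the call against a timeout. TaskTimeouts.WithTimeout is a hypothetical helper, not part of kafka4net:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TaskTimeouts
{
    // Awaits the task, but throws TimeoutException if it has not finished within
    // the given time. The original task is NOT cancelled; it keeps running.
    public static async Task WithTimeout(Task task, TimeSpan timeout)
    {
        using (var cts = new CancellationTokenSource())
        {
            var delay = Task.Delay(timeout, cts.Token);
            var winner = await Task.WhenAny(task, delay).ConfigureAwait(false);
            if (winner != task)
                throw new TimeoutException($"Operation did not complete within {timeout}.");
            cts.Cancel();                     // the task finished first: stop the timer
            await task.ConfigureAwait(false); // rethrow the task's own exception, if any
        }
    }
}
```

Usage might look like await TaskTimeouts.WithTimeout(producer.ConnectAsync(), TimeSpan.FromSeconds(30)). On TimeoutException, the application can keep writing to its local cache and try again later.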
Publish symbols.
Package with .NET Framework 4.6
Suppose my code starts pushing 1 million records to Kafka as a job, and Kafka starts rebalancing data across its brokers (3 of them) before that job ends. The job then stops pushing the rest of the records for some reason.
Question: when Kafka rebalances the data, how should my code handle the situation? Or should it be transparent, so that my code doesn't need to do anything?
Measure GC, memory footprint, CPU, and throughput, and evaluate whether they are sensible.
The low-hanging fruit is to reuse one TCP read buffer instead of allocating a new one for each read.
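Here is a minimal sketch of the "reuse one buffer" idea: a per-connection reader that keeps a single byte[] and grows it only when a response is larger than anything seen so far. ReusableFrameReader is hypothetical. It assumes responses are framed with a 4-byte big-endian size prefix, as in the Kafka wire protocol. The buffer is shared, so the caller must be done with a returned segment before the next read, and one reader must not be used from two threads at once. An async variant would look the same with ReadAsync.

```csharp
using System;
using System.IO;

public sealed class ReusableFrameReader
{
    private byte[] _buffer = new byte[4096];
    private readonly byte[] _sizeBuffer = new byte[4];

    public int Capacity => _buffer.Length;

    // Reads one size-prefixed frame. The returned segment points into the shared
    // buffer, so it is only valid until the next call to ReadFrame.
    public ArraySegment<byte> ReadFrame(Stream stream)
    {
        ReadExactly(stream, _sizeBuffer, 4);
        int size = (_sizeBuffer[0] << 24) | (_sizeBuffer[1] << 16) | (_sizeBuffer[2] << 8) | _sizeBuffer[3];
        if (size < 0) throw new InvalidDataException("Negative frame size");
        if (size > _buffer.Length)
            _buffer = new byte[Math.Max(size, _buffer.Length * 2)]; // grow only when needed
        ReadExactly(stream, _buffer, size);
        return new ArraySegment<byte>(_buffer, 0, size);
    }

    private static void ReadExactly(Stream stream, byte[] buffer, int count)
    {
        int offset = 0;
        while (offset < count)
        {
            int read = stream.Read(buffer, offset, count - offset);
            if (read == 0) throw new EndOfStreamException();
            offset += read;
        }
    }
}
```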
I noticed that the Kafka producer stops working after running "kafka-reassign-partitions".
Here is an excerpt from the debug log:
Sending ProduceRequest to Connection: <broker1>, Request: Broker: <broker1> Id:1 Acks: 1 Timeout: 1000 Topics: [Topic: <topicname> Parts: [Part: 1 Messages: 1]] |
Got ProduceResponse: Topics: [<topicname> [Part: 1, Err: NoError Offset: 16122593985]] |
After running the reassignment, the partition leader changed, but the producer did not refresh its metadata and kept trying to send messages to the wrong broker:
Sending ProduceRequest to Connection: <broker1>, Request: Broker: <broker1> Id:1 Acks: 1 Timeout: 1000 Topics: [Topic: <topicname> Parts: [Part: 1 Messages: 1]] |
Got ProduceResponse: Topics: [<topicname> [Part: 1, Err: UnknownTopicOrPartition Offset: -1]] |
...(and then the same message repeats)...
I'll try to reproduce it with unit test and think about fixing it.
It seems the fix would be to change the IsPermanentFailure method so that it treats "UnknownTopicOrPartition" as a recoverable error, but I'm not 100% sure yet.
What were the reasons for treating the UnknownTopicOrPartition error as permanent (while "NotLeaderForPartition" is not)?
I believe UnknownTopicOrPartition and NotLeaderForPartition should be assigned to the same category,
because after partition reassignment the following situations can occur:
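Here is a sketch of the proposed classification. It assumes UnknownTopicOrPartition moves into the same "refresh metadata and retry" bucket as NotLeaderForPartition. The enum lists a few Kafka protocol error codes with their wire values. KafkaError and ErrorPolicy are hypothetical and are not kafka4net's actual ErrorCode type or IsPermanentFailure implementation.

```csharp
using System;

// A subset of Kafka protocol error codes (numeric values as defined by the protocol).
public enum KafkaError : short
{
    NoError = 0,
    OffsetOutOfRange = 1,
    CorruptMessage = 2,
    UnknownTopicOrPartition = 3,
    LeaderNotAvailable = 5,
    NotLeaderForPartition = 6,
    RequestTimedOut = 7,
    MessageSizeTooLarge = 10,
}

public static class ErrorPolicy
{
    // Errors that are expected to clear up after a metadata refresh,
    // e.g. while a partition is being moved to another broker.
    public static bool IsRecoverable(KafkaError error)
    {
        switch (error)
        {
            case KafkaError.UnknownTopicOrPartition: // proposed change: recoverable
            case KafkaError.LeaderNotAvailable:
            case KafkaError.NotLeaderForPartition:
            case KafkaError.RequestTimedOut:
                return true;
            default:
                return false;
        }
    }

    public static bool IsPermanentFailure(KafkaError error) =>
        error != KafkaError.NoError && !IsRecoverable(error);
}
```

One design consequence is that a topic that really does not exist would now be retried forever. Pairing this change with a retry limit or time bound (or the failure-loop detection discussed further down) seems necessary.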
It looks like it is quite straightforward to consume messages from the "start" or from the "end", like this:
var startPosition = new StartPositionTopicStart();
var consumer = new Consumer(new ConsumerConfiguration(_seedAddresses, topic, startPosition));
var consumerSubscription = consumer.OnMessageArrived
    .Subscribe(msg =>
    {
        Console.WriteLine("Received : {0}", Encoding.UTF8.GetString(msg.Value));
    });
consumer.IsConnected.Wait();
Could you please share code that shows how to consume from a specific offset?
Here is a sample scenario: run a process that starts consuming new messages and stores the offset before it is recycled. Then run the process again and start consuming from the stored offset.
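The scenario needs somewhere to keep offsets between runs. Here is a minimal sketch of a file-based store, with one "partition offset" pair per line. FileOffsetStore is hypothetical. How the loaded offsets are passed to the kafka4net consumer is not shown, because that depends on the start-position classes the library provides. One detail matters: if you store the offset of the last processed message, resume from offset + 1. Otherwise that message is delivered twice.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;

public sealed class FileOffsetStore
{
    private readonly string _path;
    public FileOffsetStore(string path) { _path = path; }

    // Saves partition -> last processed offset. Writes a temp file first, so a crash
    // mid-write does not leave a truncated offsets file behind.
    public void Save(IReadOnlyDictionary<int, long> lastProcessed)
    {
        var tmp = _path + ".tmp";
        File.WriteAllLines(tmp, lastProcessed.OrderBy(kv => kv.Key).Select(kv =>
            kv.Key.ToString(CultureInfo.InvariantCulture) + " " + kv.Value.ToString(CultureInfo.InvariantCulture)));
        if (File.Exists(_path)) File.Delete(_path);
        File.Move(tmp, _path);
    }

    // Returns partition -> offset to resume from (last processed + 1); empty if nothing is stored.
    public Dictionary<int, long> LoadResumeOffsets()
    {
        var result = new Dictionary<int, long>();
        if (!File.Exists(_path)) return result;
        foreach (var line in File.ReadAllLines(_path))
        {
            var parts = line.Split(' ');
            if (parts.Length != 2) continue; // skip malformed lines
            result[int.Parse(parts[0], CultureInfo.InvariantCulture)] =
                long.Parse(parts[1], CultureInfo.InvariantCulture) + 1;
        }
        return result;
    }
}
```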
MemoryStream allocates new memory each time. For large messages (85,000 bytes or more), the buffer ends up on the Large Object Heap (LOH). Example code:
https://github.com/ntent-ad/kafka4net/blob/a2c96aa987ddf3aa73446b02bfc0c1635a1a09b2/src/Protocols/Serializer.cs#L152
The proposed solution is Microsoft.IO.RecyclableMemoryStream:
https://github.com/Microsoft/Microsoft.IO.RecyclableMemoryStream
WDYT?
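A lighter alternative works if each serializer instance is used by one thread or connection at a time: keep a single MemoryStream and rewind it between messages. SetLength(0) resets the length but keeps the underlying buffer. Once the stream has grown to fit the largest message, later messages reuse that buffer instead of allocating a new one (and a new LOH object) each time. ReusableSerializer is hypothetical. The tradeoff is that the buffer stays at its peak size, which RecyclableMemoryStream's pooling handles more carefully.

```csharp
using System;
using System.IO;

public sealed class ReusableSerializer
{
    private readonly MemoryStream _stream = new MemoryStream();

    public int Capacity => _stream.Capacity;

    // Writes the payload with a 4-byte big-endian length prefix and passes the bytes
    // to a callback. The segment is only valid during the callback. Not thread-safe.
    public void Write(byte[] payload, Action<ArraySegment<byte>> consume)
    {
        _stream.SetLength(0); // resets length and position; capacity (the buffer) is kept
        int n = payload.Length;
        _stream.WriteByte((byte)(n >> 24));
        _stream.WriteByte((byte)(n >> 16));
        _stream.WriteByte((byte)(n >> 8));
        _stream.WriteByte((byte)n);
        _stream.Write(payload, 0, payload.Length);
        consume(new ArraySegment<byte>(_stream.GetBuffer(), 0, (int)_stream.Length));
    }
}
```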
We have a service application that logs HTTP requests/responses and some other data to Kafka through the serilog-sinks-kafka NuGet package. Under the hood, this package uses kafka4net. We also have other in-process integrations with the serilog/nlog logging frameworks that send messages to different targets/sinks.
After some time, we noticed that the application started consuming too much memory and too many threads. Turning off logging to Kafka in the config file normalizes the situation, and the performance counters drop back to their usual low level.
This picture shows memory/CPU usage before turning on Kafka logging (A) and after turning it off (B):
ETW tracing shows that many threads wait on ManualResetEventSlim.Wait() in kafka4net.Producer.SendLoop:
"GCFrame",
"HelperMethodFrame_1OBJ",
"System.Threading.Monitor.Wait(System.Object, Int32, Boolean)",
"System.Threading.Monitor.Wait(System.Object, Int32)",
"System.Threading.ManualResetEventSlim.Wait(Int32, System.Threading.CancellationToken)",
"kafka4net.Producer.b__38_3()",
"System.Threading.Tasks.Task.InnerInvoke()",
"System.Threading.Tasks.Task.Execute()",
"System.Threading.Tasks.Task.ExecutionContextCallback(System.Object)",
"System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)",
"System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)",
"System.Threading.Tasks.Task.ExecuteWithThreadLocal(System.Threading.Tasks.Task ByRef)",
"System.Threading.Tasks.Task.ExecuteEntry(Boolean)",
"System.Threading.Tasks.Task.System.Threading.IThreadPoolWorkItem.ExecuteWorkItem()",
"System.Threading.ThreadPoolWorkQueue.Dispatch()",
"System.Threading._ThreadPoolWaitCallback.PerformWaitCallback()",
"DebuggerU2MCatchHandlerFrame"
In total, we see 173 threads in the running application, 63 of which are waiting on ManualResetEventSlim in SendLoop.
Further investigation led me to this line of source code:
https://github.com/ntent-ad/kafka4net/blob/master/src/Producer.cs#L366
The comment above this line suggests that something important was left unfinished.
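The trace shows SendLoop tasks blocking thread-pool threads inside ManualResetEventSlim.Wait. It is not clear what the code at that line waits for. A general alternative is to wait asynchronously, so an idle send loop holds no thread at all. SendSignal is hypothetical; it uses SemaphoreSlim.WaitAsync as a "messages available" signal.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public sealed class SendSignal
{
    // Starts at 0: WaitForWorkAsync completes only after Notify has been called.
    private readonly SemaphoreSlim _signal = new SemaphoreSlim(0, int.MaxValue);

    // Called by producers after enqueueing messages.
    public void Notify() => _signal.Release();

    // Called by the send loop. Waits without blocking a thread; returns false
    // if the timeout expires with no notification.
    public Task<bool> WaitForWorkAsync(TimeSpan timeout, CancellationToken token) =>
        _signal.WaitAsync(timeout, token);
}
```

Each Notify wakes the loop once. If the loop drains the whole queue on every wake-up, extra notifications only cause harmless empty passes.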
If out-of-range offsets are provided, the driver falls into an infinite failure-recovery loop. In this particular case, I observed an offset of "-1".
Several problems need to be addressed here:
Hi,
I've been debugging a problem for the last week where my consuming application would crash with an out-of-memory exception. What I observed:
I think I figured out the root cause of the problem. In the Consumer class, there is a BehaviorSubject
readonly BehaviorSubject<int> _flowControlInput = new BehaviorSubject<int>(1);
which drives the flow control. I think the problem is that this subject is written to simultaneously from multiple threads. First, from the thread that increments the counter for new messages:
onMessage = onMessage.Do(msg =>
{
    var count = Interlocked.Increment(ref _outstandingMessageProcessingCount);
    _flowControlInput.OnNext(count);
});
Second, from the thread that acknowledges the messages (called from my application):
public void Ack(int messageCount = 1)
{
    if (!Configuration.UseFlowControl)
        throw new InvalidOperationException("UseFlowControl is OFF, Ack is not allowed");
    var count = Interlocked.Add(ref _outstandingMessageProcessingCount, -messageCount);
    _flowControlInput.OnNext(count);
}
I'm not an expert on threading and concurrency with Reactive Extensions, but calling OnNext simultaneously from multiple threads is a bad idea, as discussed here, for example: http://stackoverflow.com/questions/14396449/why-are-subjects-not-recommended-in-net-reactive-extensions
I could fix this fairly quickly by putting a lock statement around both calls to OnNext, but I'm not sure that is the best possible solution. If anyone's interested, I can put together a pull request for this.
Or am I doing something wrong in my application? Am I expected to call Ack on the original thread? Would that solve it?
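Here is a sketch of the lock-based fix described above, with one refinement: update the counter and call OnNext inside the same lock. Using Interlocked with a separate lock around OnNext serializes the calls, but values can still be published out of order. For example, the increment computes 5 and the Ack computes 4, but 5 is published last, which leaves a stale count as the subject's latest value. FlowControlCounter is hypothetical, and the publish delegate stands in for _flowControlInput.OnNext. One caveat: subscribers now run while the lock is held, so a subscriber that blocks waiting on another thread that calls Ack would deadlock.

```csharp
using System;

public sealed class FlowControlCounter
{
    private readonly object _gate = new object();
    private readonly Action<int> _publish; // stand-in for _flowControlInput.OnNext
    private int _outstanding;

    public FlowControlCounter(Action<int> publish) { _publish = publish; }

    public int Outstanding { get { lock (_gate) return _outstanding; } }

    // Called for every message handed to the application.
    public void OnMessage()
    {
        lock (_gate)
        {
            _outstanding++;
            _publish(_outstanding); // published in the same order as the updates
        }
    }

    // Called by the application after it has processed messages.
    public void Ack(int messageCount = 1)
    {
        lock (_gate)
        {
            _outstanding -= messageCount;
            _publish(_outstanding);
        }
    }
}
```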
Create documentation to promote adoption.
Have quickstart examples on the front page.
More detailed documentation in the wiki.
Document use cases (reading from the beginning, custom offsets, synchronization, etc.).
Document the internal architecture.
Here is the code and the stack trace:
await producer.ConnectAsync();
await producer.CloseAsync(TimeSpan.FromSeconds(10));
await producer.ConnectAsync();
at System.Reactive.Concurrency.EventLoopScheduler.Schedule[TState](TState state, TimeSpan dueTime, Func`3 action)
at kafka4net.Utils.WatchdogScheduler.Schedule[TState](TState state, TimeSpan dueTime, Func`3 action) in c:\Users\am\Documents\Visual Studio 2013\Projects\kafka4net\src\Utils\WatchdogScheduler.cs:line 48
at System.Reactive.Concurrency.LocalScheduler.Schedule[TState](TState state, Func`3 action)
at System.Reactive.Concurrency.Scheduler.Schedule(IScheduler scheduler, Action action)
at kafka4net.Utils.AskEx.Ask[T](IScheduler scheduler, Func`1 action) in c:\Users\am\Documents\Visual Studio 2013\Projects\kafka4net\src\Utils\AskEx.cs:line 12
at kafka4net.Producer.<ConnectAsync>d__29.MoveNext() in c:\Users\am\Documents\Visual Studio 2013\Projects\kafka4net\src\Producer.cs:line 89
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
at Kafka4nConsumer.Program.<KafkaTest>d__1.MoveNext() in c:\Users\am\Documents\Visual Studio 2013\Projects\KafkaConsumer\Kafka4nConsumer\Program.cs:line 140
The API for specifying the fetch position is not intuitive. Make a default consumer API so that you don't need to know about positions until you really need them. Default to fetching from the end of the queue.
Perhaps rethink whether the API can be made more intuitive.
Currently, error processing focuses on Kafka broker recovery. Connecting to brokers and fetching offsets are not reliable, and failures are not handled properly, which leads to occasional random behavior.
While working on #14, I noticed one more issue.
Steps to reproduce:
new Producer("kafka1,kafka2,kafka3", configuration)
My findings on why this is happening:
//Cluster.ConnectAsync method:
..
initBrokers = ...; // initialized with 3 brokers, and their id=-99
..
MergeTopicMeta(initMeta); // method is called, and there are still 3 brokers in there
..
//inside MergeTopicMeta method:
var newBrokers = topicMeta.Brokers.Except(_metadata.Brokers, BrokerMeta.NodeIdComparer).ToArray();
//newBrokers is left with only 1 record (instead of 3), because all the brokers have the same Id=-99
..
_metadata.Brokers = _metadata.Brokers.Concat(newBrokers).ToArray();
As a result, only one broker is added to _metadata.Brokers, and the producer is unable to fetch topic metadata from it.
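One possible fix assumes -99 is only ever used as the placeholder id for seed brokers. Treat two placeholder entries as the same broker only if host and port match, and compare real brokers by NodeId. Broker and BrokerIdentityComparer are hypothetical stand-ins for BrokerMeta and BrokerMeta.NodeIdComparer. A placeholder never equals a broker with a real id, which keeps GetHashCode consistent with Equals. Removing the seed entries once real metadata arrives is not shown.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public sealed class Broker
{
    public const int UnknownNodeId = -99; // placeholder id used for seed brokers
    public int NodeId { get; set; }
    public string Host { get; set; }
    public int Port { get; set; }
}

public sealed class BrokerIdentityComparer : IEqualityComparer<Broker>
{
    public static readonly BrokerIdentityComparer Instance = new BrokerIdentityComparer();

    public bool Equals(Broker x, Broker y)
    {
        if (ReferenceEquals(x, y)) return true;
        if (x == null || y == null) return false;
        bool xSeed = x.NodeId == Broker.UnknownNodeId;
        bool ySeed = y.NodeId == Broker.UnknownNodeId;
        if (xSeed != ySeed) return false; // a seed never equals a broker with a real id
        if (xSeed)                        // both seeds: compare by address
            return string.Equals(x.Host, y.Host, StringComparison.OrdinalIgnoreCase) && x.Port == y.Port;
        return x.NodeId == y.NodeId;      // both known: compare by node id
    }

    public int GetHashCode(Broker b)
    {
        if (b == null) return 0;
        if (b.NodeId == Broker.UnknownNodeId)
            return StringComparer.OrdinalIgnoreCase.GetHashCode(b.Host ?? string.Empty) * 31 + b.Port;
        return b.NodeId.GetHashCode();
    }
}
```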
The message format changes seem to be minor, but the Vagrant testing suite will require more work. Perhaps it makes sense to switch to Docker containers for better flexibility and lower VM resource consumption.
Some research is required on the compatibility strategy: are clients backward/forward compatible with the server? Would it be possible/desirable to do automatic version detection?
I am trying to stress test the producer, and it seems to time out every time.
Here is the code:
var producer = new Producer(seed2Addresses,
    new ProducerConfiguration(topic));
producer.ConnectAsync().Wait();
for (int i = 0; i < 200000; ++i)
{
    var msg = new Message()
    {
        Key = BitConverter.GetBytes(1),
        Value = Encoding.UTF8.GetBytes(String.Format("Test{0}", i))
    };
    producer.Send(msg);
}
producer.CloseAsync(TimeSpan.FromSeconds(120)).Wait();
While running, this code throws 2 exceptions:
Timed out while waiting for Send buffers to drain. Canceling
and
Timed out even after cancelling send loop! This shouldn't happen, there will likely be message loss
I suspect it is because of CloseAsync, but I'm not sure. Is this how the producer is supposed to be used in kafka4net?
When seed connections are taken over by plain ones, an exception is thrown. It does no harm, but it is confusing.
Think about a more logical algorithm, perhaps one that reuses the connection. A previous attempt to reuse connections failed because old callbacks were called.
I've enhanced the ProducerConfiguration class with a new parameter, CompressionType. This keeps it source-compatible with the previous version but not binary-compatible: a "Method not found" exception (MissingMethodException) can be thrown if not all dependencies in a project that uses kafka4net are updated.
Rework the config classes and move optional parameters into properties. This will preserve both source and binary backward compatibility.
Do this in the next major release, because this is yet another API change.
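Here is a sketch of that direction: only required values go into the constructor, and optional settings become properties with defaults. Adding a constructor parameter, even an optional one, changes the constructor's signature, because default values are compiled into the caller. That is what causes the MissingMethodException. Adding a property does not change any signature. ProducerSettings, Compression, and the default values are hypothetical and illustrative only.

```csharp
using System;

public enum Compression { None, Gzip, Snappy }

public sealed class ProducerSettings
{
    // Required value: the only constructor argument, so this signature never has to change.
    public ProducerSettings(string topic)
    {
        if (string.IsNullOrEmpty(topic)) throw new ArgumentException("Topic is required", nameof(topic));
        Topic = topic;
    }

    public string Topic { get; }

    // Optional settings with (arbitrary) defaults. New ones can be added later as
    // properties without breaking callers compiled against an older version.
    public Compression CompressionType { get; set; } = Compression.None;
    public TimeSpan RequestTimeout { get; set; } = TimeSpan.FromSeconds(1);
    public int BatchSize { get; set; } = 1000;
}
```

Usage stays readable with an object initializer: new ProducerSettings("logs") { CompressionType = Compression.Gzip }.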
Hi,
since version 3, the NuGet package name for Reactive Extensions is System.Reactive instead of Rx-Main.
Could you please upgrade to this version?
Hi there, I'm trying to find sample code for a consumer implementation that subscribes to multiple topics and automatically commits offsets (either to Kafka or to another store).
Also, does this work with the latest version of Kafka?
If the driver enters a failure-recovery loop, this should be detected and promoted to an error.
A flexible policy should be implemented with a meaningful default.
Expose failures and recoveries as an IObservable and subscribe a policy that raises an error when the failure-recovery pattern happens more than 1000 times within 5 minutes and no message has been received (no progress).
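Here is a sketch of the proposed default policy, using the numbers above. It is a sliding window over failure timestamps that is cleared whenever progress is reported. Timestamps are passed in explicitly so the policy is easy to test. Feeding it from an IObservable of failures, as proposed, would mean subscribing and calling these methods. FailureLoopDetector is hypothetical and not thread-safe, so calls must be serialized, for example on the driver's scheduler.

```csharp
using System;
using System.Collections.Generic;

public sealed class FailureLoopDetector
{
    private readonly int _maxFailures;
    private readonly TimeSpan _window;
    private readonly Queue<DateTime> _failures = new Queue<DateTime>();

    public FailureLoopDetector(int maxFailures, TimeSpan window)
    {
        _maxFailures = maxFailures;
        _window = window;
    }

    // Progress (a message received) clears the failure history.
    public void OnProgress() => _failures.Clear();

    // Records a failure at 'now'. Returns true when more than maxFailures failures
    // fall within the window and no progress has been reported since.
    public bool OnFailure(DateTime now)
    {
        _failures.Enqueue(now);
        while (_failures.Count > 0 && now - _failures.Peek() > _window)
            _failures.Dequeue(); // drop failures older than the window
        return _failures.Count > _maxFailures;
    }
}
```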
Upgrade to NUnit version 3 and use its features, such as constructor/dispose, variable injectors, and callback-based exception testers.
Implement something like akka.testkit's Within() to test time constraints more elegantly.
Implement compression
It seems I could find an offset commit implementation. Is there any idea when it will be released?
This may or may not be a bug. Please delete it if this is not the right place. I need some information from the experts.
I have a process that subscribes to messages in a Kafka queue. This process runs all the time and keeps subscribing to messages (it never terminates).
Here is my issue: whenever there is an exception in my additional processing steps, I don't receive any more messages. All exceptions are handled, so I expect the subscription not to halt.
consumer.OnMessageArrived.Subscribe(msg => {
    // Perform your own deserialization here
    var text = Encoding.UTF8.GetString(msg.Value);
    try {
        // My additional processing
    }
    catch (Exception ex) {
    }
});
I need suggestions on how to handle consumer connection loss or consumer subscription loss. My assumption is that heartbeats are already handled in the client, so connection loss is probably due to a network failure or a machine going down.
There is a bug in FletcherHashedMessagePartitioner.
Because Random is not thread-safe, a Producer used in a multithreaded environment will eventually start writing to a single partition (only for messages with no Key specified).
Here is a small snippet that shows Random being used in a multithreaded environment:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

class Message { public int Partition; }

class Program
{
    static void Main()
    {
        var random = new Random();
        // Here is how to fix it:
        // var randomLocal = new ThreadLocal<Random>(() => new Random());
        var data = new ConcurrentDictionary<int, int>();
        var subject = new Subject<Message>();
        subject
            // another way to fix it: add .Synchronize() here
            .Do(msg => msg.Partition = random.Next(10))
            .Subscribe(x => { data.AddOrUpdate(x.Partition, i => 1, (i, val) => val + 1); });
        var threads = new List<Thread>();
        for (int t = 0; t < 10; t++)
        {
            var thread = new Thread(() =>
            {
                for (int i = 0; i < 100000; i++)
                    subject.OnNext(new Message());
            });
            thread.Start();
            threads.Add(thread);
        }
        foreach (var thread in threads)
            thread.Join();
        foreach (var d in data)
            Console.WriteLine("Value: {0}, Count: {1}", d.Key, d.Value);
        // results with a non-thread-local Random are something like this:
        // Value: 0, Count: 933076
        // Value: 1, Count: 7443
        // Value: 2, Count: 7345
        // Value: 3, Count: 7363
        // Value: 4, Count: 7600
        // Value: 5, Count: 7454
        // Value: 6, Count: 7658
        // Value: 7, Count: 7302
        // Value: 8, Count: 7360
        // Value: 9, Count: 7399
    }
}
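Here is a sketch of the ThreadLocal fix applied to partition selection. Each thread gets its own Random, seeded from a shared, locked Random. On .NET Framework, several `new Random()` instances created at nearly the same moment can end up with the same time-based seed, so explicit seeding avoids threads producing identical sequences. RandomPartitionPicker is hypothetical. On .NET 6 and later, the thread-safe Random.Shared is a simpler option.

```csharp
using System;
using System.Threading;

public static class RandomPartitionPicker
{
    // Shared seed source; only used under a lock, when a thread first needs its own Random.
    private static readonly Random SeedSource = new Random();

    // One Random per thread, each with a different seed. No instance is ever
    // shared between threads, because System.Random is not thread-safe.
    private static readonly ThreadLocal<Random> Local = new ThreadLocal<Random>(() =>
    {
        int seed;
        lock (SeedSource) seed = SeedSource.Next();
        return new Random(seed);
    });

    // Picks a partition for a message that has no key.
    public static int Pick(int partitionCount) => Local.Value.Next(partitionCount);
}
```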
Under certain conditions, an exception can be thrown after the Producer is closed:
2016-05-24 10:11:31.8848 Error [:13] RecoveryTest Unhandled exception System.ObjectDisposedException: Cannot access a disposed object.
at System.Reactive.Concurrency.EventLoopScheduler.Schedule[TState](TState state, TimeSpan dueTime, Func`3 action)
at kafka4net.Utils.WatchdogScheduler.Schedule[TState](TState state, TimeSpan dueTime, Func`3 action) in C:\projects\kafka4net\src\Utils\WatchdogScheduler.cs:line 63
at System.Reactive.Concurrency.LocalScheduler.Schedule[TState](TState state, Func`3 action)
at System.Reactive.Concurrency.Scheduler.Schedule(IScheduler scheduler, Action action)
at kafka4net.Utils.RxSyncContextFromScheduler.Post(SendOrPostCallback d, Object state) in C:\projects\kafka4net\src\Utils\RxSyncContextFromScheduler.cs:line 20
at System.Threading.Tasks.SynchronizationContextAwaitTaskContinuation.PostAction(Object state)
at System.Threading.Tasks.AwaitTaskContinuation.RunCallback(ContextCallback callback, Object state, Task& currentTask)
--- End of stack trace from previous location where exception was thrown ---
at System.Threading.Tasks.AwaitTaskContinuation.<>c.<ThrowAsyncIfNecessary>b__18_0(Object s)
at System.Threading.QueueUserWorkItemCallback.WaitCallback_Context(Object state)
at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
at System.Threading.QueueUserWorkItemCallback.System.Threading.IThreadPoolWorkItem.ExecuteWorkItem()
at System.Threading.ThreadPoolWorkQueue.Dispatch()
at System.Threading._ThreadPoolWaitCallback.PerformWaitCallback()
The reason is quite tricky. The Producer informs the caller about the success or failure of individual messages via the OnSuccess and OnPermError callbacks. Those callbacks are executed in the context of the driver's internal thread loop. As long as the subscriber to those events does not take too much time, everything is fine. But if one creates an observable from those events and does "await successMessagesObservable", things break down. The observable executes on the driver's internal thread, and when it completes, it calls the observer's OnCompleted. If the user code has no synchronization context, the TaskScheduler decides to execute the continuation on the current thread, which exposes the Kafka driver's internal thread to the user's code. From then on, all await commands are executed in the Kafka driver's synchronization context. But the driver is already closed and its EventLoopScheduler is disposed, so any "await" continuation causes an ObjectDisposedException.
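One general .NET technique addresses this class of problem, though it is not confirmed as the fix kafka4net adopted. Complete user-facing tasks through a TaskCompletionSource created with TaskCreationOptions.RunContinuationsAsynchronously (available since .NET Framework 4.6). Continuations are then queued to the thread pool instead of running inline on the thread that calls SetResult, so user code does not end up on the driver's thread. In the sketch, a dedicated Thread stands in for the driver's EventLoopScheduler thread. ContinuationDemo is hypothetical.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ContinuationDemo
{
    // Completes a TaskCompletionSource from a dedicated "driver" thread and reports
    // whether the awaiting code resumed on that same thread.
    public static async Task<bool> ContinuesOnDriverThreadAsync(TaskCreationOptions options)
    {
        var tcs = new TaskCompletionSource<bool>(options);
        int driverThreadId = 0;
        var driver = new Thread(() =>
        {
            driverThreadId = Thread.CurrentThread.ManagedThreadId;
            Thread.Sleep(50); // give the caller time to reach its await
            tcs.SetResult(true);
        });
        driver.Start();
        await tcs.Task;
        bool sameThread = Thread.CurrentThread.ManagedThreadId == driverThreadId;
        driver.Join();
        return sameThread;
    }
}
```

With TaskCreationOptions.None, the same code will typically report true, because when no synchronization context is involved the await continuation runs inline on the completing thread. With RunContinuationsAsynchronously, it reports false.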
The Producer sometimes stops working after a single Kafka broker is restarted.
It doesn't always happen (I have not found a reliable way to reproduce it yet), but here are some observations.
WARN kafka4net.Producer Produce Request Failed for topic <topic_name> partition 14 with error NotLeaderForPartition
INFO kafka4net.Cluster Cluster saw new partition state: <topic_name>-14-NotLeaderForPartition
INFO kafka4net.Producer Detected change in topic/partition '<topic_name>'/14/NotLeaderForPartition IsOnline True->False
ERROR kafka4net.Internal.PartitionRecoveryMonitor received MetadataResponse for broker that is not yet in our list!
<the same error repeats multiple times>
and then
ERROR kafka4net.Producer Send Buffer Full for partition kafka4net.Internal.PartitionStateChangeEvent.
The last one is expected, because buffer autogrowth is disabled.
I will try to reproduce this behaviour and will add my findings here.
#14 highlighted that we have no test coverage for scenarios in which the preferred replica changes.