Comments (6)
I'm using the latest version of the kafka4net NuGet package (2.0.1), which seems to have been released in March 2017 (version 2.0.0, released in May 2016, is the one that fixed this bug). However, I'm still experiencing this issue, so I would like to handle it in my own code if possible.
I'm not sure I fully understand this issue. Would the fix be to add a critical section / lock around producer.Disconnect() and the OnSuccess and OnPermError callback methods?
Thank you
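For illustration, here is a minimal sketch of what such a critical section might look like. `GuardedCallbacks`, `HandleSuccess` and `Close` are hypothetical names, not part of kafka4net, and nothing in this thread establishes that a guard like this prevents the exception. The close action runs outside the lock so that a slow close can never block a callback that is waiting for the same lock.

```csharp
using System;

// Hypothetical helper, not part of kafka4net: it serializes the callback
// bodies on one private lock and ignores callbacks that arrive after close.
public sealed class GuardedCallbacks
{
    private readonly object _gate = new object();
    private bool _closed;
    private long _sent;

    public long Sent
    {
        get { lock (_gate) { return _sent; } }
    }

    // Would be called from the producer's success callback.
    public void HandleSuccess(int messageCount)
    {
        lock (_gate)
        {
            if (_closed) return; // callback arrived after close: ignore it
            _sent += messageCount;
        }
    }

    // Runs the supplied close action, then marks the guard as closed.
    public void Close(Action closeAction)
    {
        closeAction(); // deliberately outside the lock, see the note above
        lock (_gate)
        {
            _closed = true;
        }
    }
}
```

In the wrapper class this would be wired up as `mProducer.OnSuccess += messages => guard.HandleSuccess(messages.Length);`, with `guard.Close(Disconnect)` replacing the direct call to `Disconnect()`.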
from kafka4net.
@dhalfageme could you give more details, stack trace if you can?
Hi @vchekan, I'm doing the following steps:
- Connect a producer and wait for the task to complete
- Send some messages in a loop
- Disconnect the producer and wait for the task to complete
I repeat these steps several times (every time I have enough messages to form a batch) in a streaming fashion.
I defined OnPermError and OnSuccess callbacks. (Initially I tried to resend the messages when the code reached the OnPermError callback, but it still failed even after I removed that code.)
After some loops, the following exception is raised, which seems to be the same exception as in this issue:
Unhandled exception: System.ObjectDisposedException: Cannot access a disposed object.
   at System.Reactive.Concurrency.EventLoopScheduler.Schedule[TState](TState state, TimeSpan dueTime, Func`3 action)
   at kafka4net.Utils.WatchdogScheduler.Schedule[TState](TState state, TimeSpan dueTime, Func`3 action)
   at System.Reactive.Concurrency.LocalScheduler.Schedule[TState](TState state, Func`3 action)
   at System.Reactive.Concurrency.Scheduler.Schedule(IScheduler scheduler, Action action)
   at kafka4net.Utils.RxSyncContextFromScheduler.Post(SendOrPostCallback d, Object state)
   at System.Threading.Tasks.SynchronizationContextAwaitTaskContinuation.PostAction(Object state)
   at System.Threading.Tasks.AwaitTaskContinuation.RunCallback(ContextCallback callback, Object state, Task& currentTask)
--- End of stack trace from previous location where exception was thrown ---
   at System.Threading.Tasks.AwaitTaskContinuation.<>c.<ThrowAsyncIfNecessary>b__18_0(Object s)
   at System.Threading.QueueUserWorkItemCallback.WaitCallback_Context(Object state)
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
   at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
   at System.Threading.QueueUserWorkItemCallback.System.Threading.IThreadPoolWorkItem.ExecuteWorkItem()
   at System.Threading.ThreadPoolWorkQueue.Dispatch()
   at System.Threading._ThreadPoolWaitCallback.PerformWaitCallback()
The exception is thrown after the producer is closed, but the code still continues executing (judging by the log messages), and then the application crashes, so it seems to be some kind of asynchronous failure, maybe in one of the callbacks?
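One possible reading of the trace, not confirmed anywhere in this thread: the frames suggest that an await continuation was posted to a synchronization context (RxSyncContextFromScheduler) whose underlying scheduler had already been disposed, and that the resulting exception was rethrown on a thread-pool thread (the ThrowAsyncIfNecessary frame). That would explain why a try/catch around Disconnect() never sees it. The sketch below reproduces only the first half of that mechanism with a hypothetical `DisposableSyncContext`; it is a stand-in written for this example, not kafka4net's real class.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for a scheduler-backed synchronization context.
// It only models one behaviour: posting work after disposal throws.
public sealed class DisposableSyncContext : SynchronizationContext, IDisposable
{
    private volatile bool _disposed;

    public override void Post(SendOrPostCallback d, object state)
    {
        // Mirrors the failing frame in the trace: scheduling on a disposed object.
        if (_disposed) throw new ObjectDisposedException(nameof(DisposableSyncContext));
        d(state); // run inline to keep the sketch small
    }

    public void Dispose()
    {
        _disposed = true;
    }
}

public static class PostAfterDisposeDemo
{
    // Returns true when posting to the disposed context throws.
    public static bool PostFailsAfterDispose()
    {
        var context = new DisposableSyncContext();
        context.Post(_ => { }, null); // fine while the context is alive
        context.Dispose();
        try
        {
            context.Post(_ => { }, null); // same call after disposal
            return false;
        }
        catch (ObjectDisposedException)
        {
            return true;
        }
    }
}
```

Here the caller can catch the exception because it calls Post directly. When the await machinery makes that call instead, the exception surfaces on a thread-pool thread, outside any try/catch in user code.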
My code is below (the irrelevant parts have been removed):
try
{
    producer = new KafkaProducer("myTopic");
    producer.Connect();
    foreach (Message message in messageBatch)
    {
        mLog.DebugFormat("Processing file {0}", message.OutputPath);
        producer.SendToKafka(
            System.Environment.TickCount.ToString(),
            SliverImageSerializer.ConvertToJSON(message),
            message.OutputPath);
    }
}
catch (Exception e)
{
    mLog.ErrorFormat("An error occurred while exporting messages: {0}", e.Message);
    mLog.DebugFormat("Stacktrace: {0}", e.StackTrace);
}
finally
{
    if (producer != null)
        producer.Disconnect();
}
public class KafkaProducer
{
    static long messagesSent = 0;
    public KafkaProducer(string topic)
    {
        mProducer = new Producer(KAFKA_BROKERS, GetProducerConfiguration(topic));
        mProducer.OnPermError += (exception, messages) =>
        {
            mLog.Error($"Failed to write {messages.Length} messages because of {exception.Message}");
        };
        mProducer.OnSuccess += messages =>
        {
            messagesSent += messages.Length;
            mLog.Info($"Sent {messages.Length} messages. Total sent {messagesSent}");
        };
    }
    public bool Connect()
    {
        try
        {
            Task task = mProducer.ConnectAsync();
            task.Wait();
            return true;
        }
        catch (Exception e)
        {
            mLog.ErrorFormat("Error connecting kafka producer: {0}. Disconnecting...", e.Message);
            Disconnect();
            return false;
        }
    }
    public void Disconnect()
    {
        try
        {
            Task task = mProducer.CloseAsync(TimeSpan.FromMinutes(10));
            task.Wait();
        }
        catch (Exception e)
        {
            mLog.ErrorFormat("Error disconnecting kafka producer: {0}", e.Message);
        }
    }
    public void SendToKafka(string messageKey, string message, string file)
    {
        try
        {
            ProduceMessage(DateTime.Now.ToString(), message);
        }
        catch (Exception e)
        {
            mLog.ErrorFormat("Couldn't send Kafka message with key {0} for file {1}. Error message {2}",
                messageKey, file, e.Message);
            mLog.DebugFormat("Stacktrace: {0}", e.StackTrace);
        }
    }
    void ProduceMessage(string key, string message)
    {
        mProducer.Send(new Message
        {
            Value = Encoding.UTF8.GetBytes(message),
            Key = Encoding.UTF8.GetBytes(key)
        });
    }
    Producer mProducer;
    static ILog mLog = log4net.LogManager.GetLogger("KafkaProdcer");
    const string KAFKA_BROKERS = "10.250.17.242:9092,10.250.17.241:9092";
}
Thank you in advance
Hi again,
I checked, and the error happens even if I don't set the OnSuccess and OnPermError callbacks.
Hmm, the stack trace is too complex to figure out without debugging. The only thing I can spot is that you use Task.Wait(), which is a blocking call. Just for the sake of experiment, can you turn your code into async?
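A rough sketch of what the async variant of Connect() and Disconnect() could look like. `IAsyncProducer`, `AsyncKafkaProducer` and `NoopProducer` are hypothetical names introduced for this example; the interface mirrors only the two calls that appear in the code above (ConnectAsync and CloseAsync) so that the sketch compiles without the kafka4net package. Logging goes to the console instead of log4net for the same reason. Using `ConfigureAwait(false)` is a choice made here, not something suggested in the thread.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical interface mirroring the two producer calls used above.
public interface IAsyncProducer
{
    Task ConnectAsync();
    Task CloseAsync(TimeSpan timeout);
}

// Fake producer that completes immediately, only for trying the sketch out.
public sealed class NoopProducer : IAsyncProducer
{
    public Task ConnectAsync() { return Task.CompletedTask; }
    public Task CloseAsync(TimeSpan timeout) { return Task.CompletedTask; }
}

public sealed class AsyncKafkaProducer
{
    private readonly IAsyncProducer _producer;

    public AsyncKafkaProducer(IAsyncProducer producer)
    {
        _producer = producer;
    }

    // Awaits the connection instead of blocking on Task.Wait().
    public async Task<bool> ConnectAsync()
    {
        try
        {
            await _producer.ConnectAsync().ConfigureAwait(false);
            return true;
        }
        catch (Exception e)
        {
            Console.Error.WriteLine("Error connecting kafka producer: {0}. Disconnecting...", e.Message);
            await DisconnectAsync().ConfigureAwait(false);
            return false;
        }
    }

    // Awaits the close with the same 10-minute timeout as the original code.
    public async Task DisconnectAsync()
    {
        try
        {
            await _producer.CloseAsync(TimeSpan.FromMinutes(10)).ConfigureAwait(false);
        }
        catch (Exception e)
        {
            Console.Error.WriteLine("Error disconnecting kafka producer: {0}", e.Message);
        }
    }
}
```

The calling code would then become `await producer.ConnectAsync();` before the send loop and `await producer.DisconnectAsync();` after it, with the enclosing method declared `async Task`.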
It happened again! (This time running the disconnect asynchronously, but again using the callback methods.)