Coder Social home page Coder Social logo

Comments (6)

dhalfageme avatar dhalfageme commented on August 14, 2024

I'm using the latest version of the kafka4net NuGet package (2.0.1), which seems to have been realeased on March of 2017 (The version 2.0.0 is the version that fixed this bug on May of 2016). However I'm still experiencing this issue, so I would like to manage it on my own code if possible.

I'm not sure If I understand completely this issue. The solution to fix it would be to include a critical section / lock on producer.Disconnect() and on OnSuccess and OnPermError callback methods ?
Thank you

from kafka4net.

vchekan avatar vchekan commented on August 14, 2024

@dhalfageme could you give more details, stack trace if you can?

from kafka4net.

dhalfageme avatar dhalfageme commented on August 14, 2024

Hi @vchekan, I'm doing the following steps:

  1. Connecting a producer and wait for the task to complete
  2. Send some messages in a loop and disconnecting the producer.
  3. Disconnect the producer and wait for the task to complete

I'm doing these tasks several times (every time I have enough messages to get a batch) in a streaming fashion.

I defined OnPermError and OnSuccess callbacks. (Initially I tried to resend the messages if the code reached the OnPermError callback, but even if I removed that code it still failed).

After some loops, the following exception is reaised, which seems to be the same exception of this issue:

Unhandled exception: System.ObjectDisposedException: Cannot access a disposed object.
   en System.Reactive.Concurrency.EventLoopScheduler.Schedule[TState](TState state, TimeSpan dueTime, Func`3 action)
   en kafka4net.Utils.WatchdogScheduler.Schedule[TState](TState state, TimeSpan dueTime, Func`3 action)
   en System.Reactive.Concurrency.LocalScheduler.Schedule[TState](TState state, Func`3 action)
   en System.Reactive.Concurrency.Scheduler.Schedule(IScheduler scheduler, Action action)
   en kafka4net.Utils.RxSyncContextFromScheduler.Post(SendOrPostCallback d, Object state)
   en System.Threading.Tasks.SynchronizationContextAwaitTaskContinuation.PostAction(Object state)
   en System.Threading.Tasks.AwaitTaskContinuation.RunCallback(ContextCallback callback, Object state, Task& currentTask)
--- End of stack trace from previous location where exception was thrown ---
   en System.Threading.Tasks.AwaitTaskContinuation.<>c.<ThrowAsyncIfNecessary>b__18_0(Object s)
   en System.Threading.QueueUserWorkItemCallback.WaitCallback_Context(Object state)
   en System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
   en System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
   en System.Threading.QueueUserWorkItemCallback.System.Threading.IThreadPoolWorkItem.ExecuteWorkItem()
   en System.Threading.ThreadPoolWorkQueue.Dispatch()
   en System.Threading._ThreadPoolWaitCallback.PerformWaitCallback()

The exception is thrown after closed the producer, but the code stills continue executing (looking at the log messages), and then the application breaks down, so it seems some kind of asynchronous fail, ¿maybe in one of the callbacks?

I wrote below my code (the irrelevant parts have been removed):

            try
            {
                producer = new KafkaProducer("myTopic");

                producer.Connect()

                foreach (Message message in messageBatch)
                {
                    mLog.DebugFormat("Processing file {0}", message.OutputPath);
                    producer.SendToKafka(
                        System.Environment.TickCount.ToString(),
                        SliverImageSerializer.ConvertToJSON(message),
                        message.OutputPath);
                }
            }
            catch (Exception e)
            {
                mLog.ErrorFormat("An error occurred while exporting messages: {0}", e.Message);
                mLog.DebugFormat("Stacktrace: {0}", e.StackTrace);
            }
            finally
            {
                if (producer != null)
                    producer.Disconnect();
            }


    public class KafkaProducer
    {
        static long messagesSent = 0;

        public KafkaProducer(string topic)
        {
            mProducer = new Producer(KAFKA_BROKERS, GetProducerConfiguration(topic));

            mProducer.OnPermError += (exception, messages) =>
            {
                {
                    mLog.Error($"Failed to write {messages.Length} messages because of {exception.Message}");
                }  
            };

            mProducer.OnSuccess += messages =>
            {
                {
                    messagesSent += messages.Length;
                    mLog.Info($"Sent {messages.Length} messages. Total sent {messagesSent}");
                }
            };
        }

        public bool Connect()
        {
            try
            {
                Task task = mProducer.ConnectAsync();
                task.Wait();
                return true;
            }
            catch (Exception e)
            {
                mLog.ErrorFormat("Error connection kafka producer: {0}. Disconnecting...", e.Message);
                Disconnect();
                return false;
            }
        }

        public void Disconnect()
        {
            try
            {
                Task task =  mProducer.CloseAsync(TimeSpan.FromMinutes(10));
                task.Wait();
            }
            catch (Exception e)
            {
                mLog.ErrorFormat("Error disconnecting kafka producer: {0}", e.Message);
            }
        }

        public void SendToKafka(string messageKey, string message, string file)
        {
            try
            {
                ProduceMessage(DateTime.Now.ToString(), message);
            }
            catch (Exception e)
            {
                mLog.ErrorFormat("Couldn't send Kafka message with key {0} for file {1}. Error message {2}",
                    messageKey, file, e.Message);
                mLog.DebugFormat("Stacktrace: {0}", e.StackTrace);
            }
        }

        void ProduceMessage(string key, string message)
        {
            mProducer.Send(new Message{
                Value = Encoding.UTF8.GetBytes(message),
                Key = Encoding.UTF8.GetBytes(key)
            });
        }

        Producer mProducer;

        static ILog mLog = log4net.LogManager.GetLogger("KafkaProdcer");
        const string KAFKA_BROKERS = "10.250.17.242:9092,10.250.17.241:9092";
    }

Thank you in advance

from kafka4net.

dhalfageme avatar dhalfageme commented on August 14, 2024

Hi again,

I checked that the error happens even If y don't set the OnSucess and OnPermError callbacks

from kafka4net.

vchekan avatar vchekan commented on August 14, 2024

Hmm, too complex stack trace to figure it out without debugging. The only thing I can spot is that you use Task.Wait(), which is blocking call. Just for sake of experiment, can ou turn your code into async?

from kafka4net.

dhalfageme avatar dhalfageme commented on August 14, 2024

It happens again! (This time running the disconnect asynchronously, but again using the callback methods)

from kafka4net.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.