p-brito / kafka-dotnet

kafka-dotnet

The purpose of this repository is to explain step by step how to build producer and consumer applications that interact using Confluent Kafka.

Create a cluster

Getting started

  • If you don't have Docker installed, install it first.

  • To run the Confluent Platform locally, download the Confluent Platform all-in-one Docker Compose file.

  • Then run the command docker-compose up -d from the folder that contains the file. This starts the Confluent Platform, which may take a few minutes.

  • Go to http://localhost:9021 and create a topic.

  • You are ready! If not, click here for more details.

Confluent Kafka

Producing a message using Confluent Kafka.

  • To instantiate a producer you need a ProducerConfig. In this example, we will only define the BootstrapServers but you can define the BatchSize, CompressionType, and many others.
ProducerConfig config = new()
{
    BootstrapServers = this.Options.Server
};
  • Building the IProducer.
using IProducer<string, TMessage> producer = new ProducerBuilder<string, TMessage>(config).Build();
  • Producing a message.
string eventId = Guid.NewGuid().ToString();

Message<string, TMessage> @event = new()
{
    Key = eventId,
    Value = msg,
};

await producer.ProduceAsync(topic, @event, cancellationToken).ConfigureAwait(false);
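
As a rough sketch of how the steps above fit together, the example below sets a few of the optional ProducerConfig properties mentioned earlier and inspects the delivery result. The broker address localhost:9092, the topic name myTopic, the string payload, and the values chosen for BatchSize, LingerMs, and CompressionType are assumptions made for illustration, not settings taken from this repository.

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

// Assumed values: adjust the broker address and topic to your own setup.
const string bootstrapServers = "localhost:9092";
const string topic = "myTopic";

ProducerConfig config = new()
{
    BootstrapServers = bootstrapServers,
    // Optional tuning, illustrative values only.
    BatchSize = 32 * 1024,                  // maximum size of a message batch, in bytes
    LingerMs = 5,                           // wait up to 5 ms to fill a batch before sending
    CompressionType = CompressionType.Gzip  // compress batches before sending
};

using IProducer<string, string> producer = new ProducerBuilder<string, string>(config).Build();

Message<string, string> message = new()
{
    Key = Guid.NewGuid().ToString(),
    Value = "Hello, Kafka!"
};

try
{
    // The task completes once the broker has acknowledged the message.
    DeliveryResult<string, string> result = await producer.ProduceAsync(topic, message);
    Console.WriteLine($"Delivered to {result.TopicPartitionOffset}");
}
catch (ProduceException<string, string> ex)
{
    // Thrown when the message could not be delivered.
    Console.WriteLine($"Delivery failed: {ex.Error.Reason}");
}
```

Awaiting every ProduceAsync call keeps the example easy to follow, but it waits for one acknowledgement per message, so batching settings have little effect in a loop written this way.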

Consuming a message using Confluent Kafka.

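  • To instantiate a consumer you need a ConsumerConfig. In this example, we define the BootstrapServers, the GroupId, and the AutoOffsetReset, but you can define many others. AutoOffsetReset is set to Earliest, which means that when the consumer group has no committed offset, consumption starts from the earliest available offset. Because the GroupId below contains a new Guid, every run creates a fresh group and therefore reads the topic from the beginning.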
ConsumerConfig config = new()
{
    BootstrapServers = this.Options.Server,
    GroupId = $"{topic}-{Guid.NewGuid()}",
    AutoOffsetReset = AutoOffsetReset.Earliest
};
  • Building the IConsumer.
using IConsumer<string, TMessage> consumer = new ConsumerBuilder<string, TMessage>(config).Build();
  • Subscribing to a topic.
consumer.Subscribe(topic);
  • Consuming a message.
var message = consumer.Consume(cancellationToken);
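
Consume returns a single result per call, so in practice it is usually called in a loop. The sketch below is one possible way to do that, assuming a broker at localhost:9092, a topic named myTopic with string values, and a fixed group id; these names are placeholders, and the Ctrl+C handling is only there to make the example stoppable.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

// Assumed values: adjust the broker address, group id and topic to your own setup.
ConsumerConfig config = new()
{
    BootstrapServers = "localhost:9092",
    GroupId = "myTopic-demo-group",
    AutoOffsetReset = AutoOffsetReset.Earliest
};

// Cancel the loop on Ctrl+C.
using CancellationTokenSource cts = new();
Console.CancelKeyPress += (_, e) =>
{
    e.Cancel = true; // keep the process alive so the consumer can close cleanly
    cts.Cancel();
};

using IConsumer<string, string> consumer = new ConsumerBuilder<string, string>(config).Build();
consumer.Subscribe("myTopic");

try
{
    while (true)
    {
        try
        {
            // Blocks until a message arrives or the token is cancelled.
            ConsumeResult<string, string> result = consumer.Consume(cts.Token);
            Console.WriteLine($"{result.TopicPartitionOffset}: {result.Message.Value}");
        }
        catch (ConsumeException ex)
        {
            // Log the error and keep polling.
            Console.WriteLine($"Consume error: {ex.Error.Reason}");
        }
    }
}
catch (OperationCanceledException)
{
    // Expected when the token is cancelled.
}
finally
{
    // Leave the consumer group cleanly before the consumer is disposed.
    consumer.Close();
}
```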

Kafka service

This is a wrapper around the Confluent Kafka client; the goal is to simplify its use.

The IKafkaService has two methods:

  • ProduceAsync wraps all the logic above for producing messages to a topic.
  • ConsumeAsync wraps all the logic above for consuming messages from a topic.

KafkaServiceOptions gathers all the configuration needed by the producer and the consumer.
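
The exact signatures are not listed in this README. Judging only from how the methods are called in the demo section, the interface might look roughly like the sketch below. Treat it as a hypothetical reconstruction: the parameter names, the default token values, and the generic constraints (or lack of them) are assumptions.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical shape of the handler delegate, inferred from the HandleAsync method in the demo.
public delegate Task MessageHandlerDelegate<TMessage>(TMessage @event);

// Hypothetical shape of the service interface, inferred from the demo calls.
public interface IKafkaService
{
    // Produces a single message to the given topic.
    Task ProduceAsync<TMessage>(TMessage msg, string topic, CancellationToken cancellationToken = default);

    // Consumes messages from the given topic and passes each one to the handler.
    Task ConsumeAsync<TMessage>(string topic, MessageHandlerDelegate<TMessage> handler, CancellationToken cancellationToken = default);
}
```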

In your application add the service as follows:

services.Configure<KafkaServiceOptions>(this.Configuration.GetSection(nameof(KafkaServiceOptions)));

services.AddKafkaService();

Set the KafkaServiceOptions in your appsettings.json file as follows:

{
  "KafkaServiceOptions": {
    "Server": "localhost:9092",
    "Topics": [ "myTopic" ]
  }
}
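
For the section above to bind, the options class needs properties whose names match the JSON keys. The sketch below shows a plausible shape for KafkaServiceOptions; the property types and defaults are assumptions. It reads the JSON with System.Text.Json only to keep the example self-contained (this requires .NET 6 or later); the application itself binds the section through services.Configure as shown earlier.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// The same JSON as above, inlined so the sketch is self-contained.
const string json = @"{ ""KafkaServiceOptions"": { ""Server"": ""localhost:9092"", ""Topics"": [ ""myTopic"" ] } }";

using JsonDocument document = JsonDocument.Parse(json);

// Pick the section by name and map it onto the options class.
var options = document.RootElement
    .GetProperty(nameof(KafkaServiceOptions))
    .Deserialize<KafkaServiceOptions>();

Console.WriteLine($"{options?.Server} | {string.Join(", ", options?.Topics ?? new List<string>())}");

// Hypothetical shape of the options class: one property per JSON key.
public class KafkaServiceOptions
{
    // Address of the Kafka broker, for example "localhost:9092".
    public string Server { get; set; } = string.Empty;

    // Topics the application works with.
    public List<string> Topics { get; set; } = new();
}
```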

DEMO

Producing messages using the Kafka service wrapper.

try
{
    int count = 1;

    while (!stoppingToken.IsCancellationRequested)
    {
        await this.kafkaService.ProduceAsync($"Hi! I'm the event number {count}", "myTopic", stoppingToken).ConfigureAwait(false);

        count++;
    }
}
catch (Exception ex)
{
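    // Pass the exception and use a message template instead of string interpolation.
    this.Logger.LogError(ex, "Exception: {ExceptionType} | Message: {ExceptionMessage}", ex.GetType().FullName, ex.Message);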
}

Consuming messages using the Kafka service wrapper.

try
{
    MessageHandlerDelegate<string> handler = this.HandleAsync;

    await this.kafkaService.ConsumeAsync<string>("myTopic", handler, stoppingToken).ConfigureAwait(false);
}
catch (Exception ex)
{
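    // Pass the exception and use a message template instead of string interpolation.
    this.Logger.LogError(ex, "Exception: {ExceptionType} | Message: {ExceptionMessage}", ex.GetType().FullName, ex.Message);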
}

Notice that a handler is passed; every consumed message is delivered to this method.

private Task HandleAsync(string @event)
{
    this.Logger.LogInformation($"Event received: {@event}");

    return Task.CompletedTask;
}

Note that this is a demo; many aspects are not covered here.
