Coder Social home page Coder Social logo

sammym1982 / orleans.streams.kafka Goto Github PK

View Code? Open in Web Editor NEW

This project forked from jonathansant/orleans.streams.kafka

0.0 0.0 0.0 619 KB

An implementation of a PersistentStreamProvider for Microsoft Orleans and Kafka using the Confluent API

License: GNU General Public License v3.0

C# 100.00%

orleans.streams.kafka's Introduction

Orleans.Stream.Kafka

Kafka persistent stream provider for Microsoft Orleans that uses the Confluent SDK.

NuGet version

Dependencies

Orleans.Streams.Kafka has the following dependencies:

  • Confluent.Kafka: 1.1.0
  • Orleans.Streams.Utils: NuGet version

Installation

To start working with the Orleans.Streams.Kafka make sure you do the following steps:

  1. Install Kafka on a machine (or cluster) which you have access to use the Confluent Platform.
  2. Create Topics in Kafka with as many partitions as needed for each topic.
  3. Install the Orleans.Streams.Kafka nuget from the nuget repository.
  4. Add to the Silo configuration the new stream provider with the necessary parameters and the optional ones (if you wish). you can see what is configurable in KafkaStreamProvider under Configurable Values.

Example KafkaStreamProvider configuration:

public class SiloBuilderConfigurator : ISiloBuilderConfigurator
{
	public void Configure(ISiloHostBuilder hostBuilder)
		=> hostBuilder
				.AddMemoryGrainStorage("PubSubStore")
				.AddKafka("KafkaStreamProvider")
				.WithOptions(options =>
				{
					options.BrokerList = new [] {"localhost:8080"};
					options.ConsumerGroupId = "E2EGroup";
					options.ConsumeMode = ConsumeMode.StreamEnd;

					options
						.AddTopic(Consts.StreamNamespace)
						.AddTopic(Consts.StreamNamespace2, new TopicCreationConfig { AutoCreate = true, Partitions = 2, ReplicationFactor = 1 })
						.AddExternalTopic<TestModel>(Consts.StreamNamespaceExternal)
						;
				})
				.AddJson()
				.AddLoggingTracker()
        .Build()
        ;
}

public class ClientBuilderConfigurator : IClientBuilderConfigurator
{
	public virtual void Configure(IConfiguration configuration, IClientBuilder clientBuilder)
		=> clientBuilder
					.AddKafka("KafkaStreamProvider")
				  .WithOptions(options =>
				  {
					  options.BrokerList =  new [] {"localhost:8080"};
					  options.ConsumerGroupId = "E2EGroup";

					  options
						 .AddTopic(Consts.StreamNamespace)
						 .AddTopic(Consts.StreamNamespace2, new TopicCreationConfig { AutoCreate = true, Partitions = 2, ReplicationFactor = 1 })
						 .AddExternalTopic<TestModel>(Consts.StreamNamespaceExternal)
						  ;
				  })
				  .AddJson()
          .Build()
          ;
}

Usage

Producing:

var testGrain = clusterClient.GetGrain<ITestGrain>(grainId);

var result = await testGrain.GetThePhrase();

Console.BackgroundColor = ConsoleColor.DarkMagenta;
Console.WriteLine(result);

var streamProvider = clusterClient.GetStreamProvider("KafkaProvider");
var stream = streamProvider.GetStream<TestModel>("streamId", "topic1");
await stream.OnNextAsync(new TestModel
{
	Greeting = "hello world"
});

Consuming:

var kafkaProvider = GetStreamProvider("KafkaStreamProvider");
var testStream = kafkaProvider.GetStream<TestModel>("streamId", "topic1");

// To resume stream in case of stream deactivation
var subscriptionHandles = await testStream.GetAllSubscriptionHandles();

if (subscriptionHandles.Count > 0)
{
	foreach (var subscriptionHandle in subscriptionHandles)
	{
		await subscriptionHandle.ResumeAsync(OnNextTestMessage);
	}
}

await testStream.SubscribeAsync(OnNextTestMessage);

Note: The Stream Namespace identifies the Kafka topic.

Configurable Values

These are the configurable values that the Orleans.Streams.Kafka:

  • Topics: The topics that will be used where messages will be Produced/Consumed.
  • BrokerList: List of Kafka brokers to connect to.
  • ConsumerGroupId: The ConsumerGroupId used by the Kafka Consumer. Default value is orleans-kafka
  • PollTimeout: Determines the duration that the Kafka consumer blocks for to wait for messages. Default value is 100ms
  • PollBufferTimeout: Determines the duration the KafkaAdapterReceiver will continue to poll for messages (for the same batch) Default value is 500ms
  • AdminRequestTimeout: Timeout for admin requests. Default value is 5s
  • ConsumeMode: Determines the offset to start consuming from. Default value is ConsumeMode.LastCommittedMessage
  • ProducerTimeout: Timeout for produce requests. Default value is 5s

orleans.streams.kafka's People

Contributors

jonathansant avatar pfrendo avatar stephenlautier avatar

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.