Streamiz Kafka .NET is .NET stream processing library for Apache Kafka.
It's allowed to develop .NET applications that transform input Kafka topics into output Kafka topics. It's supported .NET Standard 2.1.
It's a rewriting inspired by Kafka Streams.
Finally it will provide the same functionality as Kafka Streams.
At moment, this project is being written. Thanks for you contribution !
Sample code
static void Main(string[] args)
{
CancellationTokenSource source = new CancellationTokenSource();
var config = new StreamConfig<StringSerDes, StringSerDes>();
config.ApplicationId = "test-app";
config.BootstrapServers = "192.168.56.1:9092";
config.SaslMechanism = SaslMechanism.Plain;
config.SaslUsername = "admin";
config.SaslPassword = "admin";
config.SecurityProtocol = SecurityProtocol.SaslPlaintext;
config.AutoOffsetReset = AutoOffsetReset.Earliest;
config.NumStreamThreads = 2;
StreamBuilder builder = new StreamBuilder();
builder.Stream<string, string>("test")
.FilterNot((k, v) => v.Contains("test"))
.Peek((k,v) => Console.WriteLine($"Key : {k} | Value : {v}"))
.To("test-output");
builder.Table("topic", InMemory<string, string>.As("test-ktable-store"));
Topology t = builder.Build();
KafkaStream stream = new KafkaStream(t, config);
Console.CancelKeyPress += (o, e) => {
source.Cancel();
stream.Close();
};
stream.Start(source.Token);
}
- End May 2020 - Beta 0.0.1 - All stateless processors, Exactly Once Semantic, InMemory store
- End October 2020 - Beta 0.0.2 - All statefull processors, Global Store, RocksDB Store
- End 2020 / Begin 2021 - 1.0.0 RC1 - Processor API, Metrics, Interactive Queries
Operator Name | Method | TODO | IMPLEMENTED | TESTED | DOCUMENTED |
---|---|---|---|---|---|
Branch | KStream -> KStream[] | โ | |||
Filter | KStream -> KStream | โ | |||
Filter | KTable -> KTable | โ | |||
InverseFilter | KStream -> KStream | โ | |||
InverseFilter | KTable -> KTable | โ | |||
FlatMap | KStream โ KStream | โ | |||
FlatMapValues | KStream โ KStream | โ | |||
Foreach | KStream โ void | โ | |||
GroupByKey | KStream โ KGroupedStream | โ | |||
GroupBy | KStream โ KGroupedStream | โ | |||
GroupBy | KTable โ KGroupedTable | โ | |||
Map | KStream โ KStream | โ | |||
MapValues | KStream โ KStream | โ | |||
MapValues | KTable โ KTable | โ | |||
Peek | KStream โ KStream | โ | |||
KStream โ void | โ | ||||
SelectKey | KStream โ KStream | โ | |||
Table to Steam | KTable โ KStream | โ |
Operator Name | Method | TODO | IMPLEMENTED | TESTED | DOCUMENTED |
---|---|---|---|---|---|
Aggregate | KGroupedStream -> KTable | โ | |||
Aggregate | KGroupedTable -> KTable | โ | |||
Aggregate(windowed) | KGroupedStream -> KTable | โ | |||
Count | KGroupedStream -> KTable | โ | |||
Count | KGroupedTable -> KTable | โ | |||
Count(windowed) | KGroupedStream โ KStream | โ | |||
Reduce | KGroupedStream โ KTable | โ | |||
Reduce | KGroupedTable โ KTable | โ | |||
Reduce(windowed) | KGroupedStream โ KTable | โ | |||
InnerJoin(windowed) | (KStream,KStream) โ KStream | โ | |||
LeftJoin(windowed) | (KStream,KStream) โ KStream | โ | |||
OuterJoin(windowed) | (KStream,KStream) โ KStream | โ | |||
InnerJoin(windowed) | (KTable,KTable) โ KTable | โ | |||
LeftJoin(windowed) | (KTable,KTable) โ KTable | โ | |||
OuterJoin(windowed) | (KTable,KTable) โ KTable | โ | |||
InnerJoin(windowed) | (KStream,KTable) โ KStream | โ | |||
LeftJoin(windowed) | (KStream,KTable) โ KStream | โ | |||
InnerJoin(windowed) | (KStream,GlobalKTable) โ KStream | โ | |||
LeftJoin(windowed) | (KStream,GlobalKTable) โ KStream | โ |
Must be used for testing your stream topology. Simulate a kafka cluster in memory. Usage:
static void Main(string[] args)
{
var config = new StreamConfig<StringSerDes, StringSerDes>();
config.ApplicationId = "test-test-driver-app";
StreamBuilder builder = new StreamBuilder();
builder.Stream<string, string>("test")
.Filter((k, v) => v.Contains("test"))
.To("test-output");
Topology t = builder.Build();
using (var driver = new TopologyTestDriver(t, config))
{
var inputTopic = driver.CreateInputTopic<string, string>("test");
var outputTopic = driver.CreateOuputTopic<string, string>("test-output", TimeSpan.FromSeconds(5));
inputTopic.PipeInput("test", "test-1234");
var r = outputTopic.ReadKeyValue();
}
}
- Statefull processors impl [ ]
- Subtopology impl [ ]
- Task restoring [ ]
- Topology description [ ]
- Global state store [ ]
- Processor API [ ] + Refactor topology node processor builder [ ]
- Repartition impl [ ]
- Unit tests (TestTopologyDriver, ...) [ ]
- Rocks DB state implementation [ ]
- Optimizing Kafka Streams Topologies [ ]
- Interactive Queries [ ]
- Metrics [ ]
Some documentations for help during implementation : https://docs.confluent.io/current/streams/index.html