Coder Social home page Coder Social logo

apache / pulsar-dotpulsar Goto Github PK

View Code? Open in Web Editor NEW
234.0 42.0 60.0 16.19 MB

The official .NET client library for Apache Pulsar

Home Page: https://pulsar.apache.org/

License: Apache License 2.0

C# 100.00%
pulsar pubsub messaging streaming queuing event-streaming dotnet

pulsar-dotpulsar's Introduction

DotPulsar

CI - Unit

The official .NET client library for Apache Pulsar.

DotPulsar is written entirely in C# and implements Apache Pulsar's binary protocol.

What's new?

Have a look at the changelog.

Getting Started

Let's take a look at a "Hello world" example, where we first produce a message and then consume it. Note that the topic and subscription will be created if they don't exist.

First, we need a Pulsar setup. See Pulsar docs for how to set up a local standalone Pulsar instance.

Install the NuGet package DotPulsar and run the follow code example:

using DotPulsar;
using DotPulsar.Extensions;

const string myTopic = "persistent://public/default/mytopic";

// connecting to pulsar://localhost:6650
await using var client = PulsarClient.Builder().Build();

// produce a message
await using var producer = client.NewProducer(Schema.String).Topic(myTopic).Create();
await producer.Send("Hello World");

// consume messages
await using var consumer = client.NewConsumer(Schema.String)
    .SubscriptionName("MySubscription")
    .Topic(myTopic)
    .InitialPosition(SubscriptionInitialPosition.Earliest)
    .Create();

await foreach (var message in consumer.Messages())
{
    Console.WriteLine($"Received: {message.Value()}");
    await consumer.Acknowledge(message);
}

For a more in-depth tour of the API, please visit the Wiki.

Supported features

  • Service discovery
  • Automatic reconnect
  • TLS connections
  • Pulsar Proxy
  • Producer - send with custom metadata
  • Producer - send with event time, sequence id, and delayed message delivery
  • Producer - send with key and ordering key
  • Producer - partitioned topics
  • Consumer - subscription with initial position and priority level
  • Consumer - subscription types exclusive, shared, failover, and key shared
  • Consumer - receive and single + cumulative acknowledge
  • Consumer/Reader - seek on message-id and publish time
  • Consumer - unsubscribe
  • Consumer - compacted topics
  • Consumer - partitioned topics
  • Reader API
  • Read/Consume/Acknowledge batched messages
  • Telemetry
  • Authentication
    • TLS Authentication
    • JSON Web Token Authentication
    • Custom Authentication
  • Message compression
    • LZ4
    • ZLIB
    • ZSTD
    • SNAPPY
  • Schemas
    • Boolean
    • Bytes (using byte[] and ReadOnlySequence<byte>)
    • String (UTF-8, UTF-16, and US-ASCII)
    • INT8, INT16, INT32, and INT64
    • Float and Double
    • Time (using TimeSpan)
    • Timestamp and Date (using DateTime)

For a horizontal comparison with more language-specific clients, see Client Feature Matrix.

Roadmap

Help prioritizing the roadmap is most welcome, so please reach out and tell us what you want and need.

Join Our Community

Apache Pulsar has a Slack instance, and there you'll find us in the #dev-dotpulsar channel.

Versioning

We use SemVer for versioning. For the versions available, see the tags on this repository.

Authors

Contributions are welcomed and greatly appreciated. See also the list of contributors who participated in this project. Read the CONTRIBUTING guide for how to participate.

If your contribution adds Pulsar features for C# clients, you need to update both the Pulsar docs and the Client Feature Matrix. See Contribution Guide for more details.

License

This project is licensed under Apache License, Version 2.0.

pulsar-dotpulsar's People

Contributors

aintjoshinya avatar blankensteiner avatar chickenzilla avatar dionjansen avatar entvex avatar gfoidl avatar goldenccargill avatar hsa-dc avatar janpieterz avatar jvmdc avatar kandersen82 avatar lee-chrisbell avatar lhotari avatar mhelleborg avatar michaeljmarshall avatar momo-jun avatar mortenkj avatar ragingkore avatar robertindie avatar sijie avatar tisonkun avatar toneill818 avatar usaguerrilla avatar vp89 avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

pulsar-dotpulsar's Issues

Breaking: Consumer always faults randomly on high throughput with ReadOnlySequence<byte> exceptions

Let me start by saying that IMHO the project looks great, uses most of the best and latest features of .NET Core, and code looks nice. So congrats @blankensteiner and all other contributors, on a most excellent start!

Under high throughput the Consumer always faults randomly (sometimes sooner sometimes later) but always 100% of the times and while trying to read a ReadOnlySequence<T> (MacOs + SDK 3.1.101)

System.ArgumentOutOfRangeException: Specified argument was out of the range of valid values. (Parameter 'start')
   at DotPulsar.Internal.Extensions.ReadOnlySequenceExtensions.StartsWith[T](ReadOnlySequence`1 sequence, ReadOnlyMemory`1 target)

While stress testing I have seen it happen in these areas of the code but only when consuming:

It is reproducible by simply using the Samples solution, running the Producer to produce a large number of messages - say 50K, and starting the Consumer.

I tried to fix everything by attempting several solutions without success:

  • Using SequenceReader<byte>
  • Using sequence.TryGet(ref position, out var memory) in a while loop

Researching .Net issues I did find a lot of potentially related issues, some already fixed, but not available until the next releases of the framework:

Potential solutions:

  • Retrofit these fixes until it works and release a new version.
  • Don't use Pipes and go old school.
  • Use Pipes but without exposing the ReadOnlySequence but instead streams, spans, memory and arrays, whatever is necessary.

I would happily contribute with a fix if I can indeed fix it, because this is a show stopper for me, my team and our new solution design.

Currently I'm going deeper and up the call stack (PulsarStream, Connector,ConsumeChannel, etc...) trying to figure out if are corrupting the memory or have a memory leak.

Doesn't work

Hi there, I cloned the repo, built locally (using VS 2019) and try to run the tests. Unit tests but the integration tests just ran and ran until I stopped them. I then try to run the sample projects (producer, consumer and reader as a multiple projects on startup) and nothing happens. I debugged the code and when the code in the Producing.Program class ran this code:
_ = await producer.Send(data, cancellationToken);
Nothing happened. It never returned and nothing was received.
I hope it's just a case of me needing to set something else up, but I cant see from the documentation what that might be

After Service Url console application Terminated

Hi there,

Thanku for latest pulsar client library,i have tried to test above nuget ,imported in visual studio as console application ,but when i am sending message Programme auto-terminated

may i know what i am missing ,i have apache pulsar version 2.4

please find attached screen-shots as code refrence

PulsarProducerError

Message between earliest and latest

Hi @blankensteiner thanks for your effort on this!
Please I will need to be able to achieve this: To create a reader that will read from some message between earliest and latest with the library! I see this is possible on the java client but not fully implemented on dotPulsar

MessageId id = MessageId.fromByteArray(msgIdBytes);
Reader<byte[]> reader = pulsarClient.newReader()
    .topic(topic)
    .startMessageId(id)
    .create();

Would love to see this feature implemented

Thanks a lot!!

Null Consumer

Hi @blankensteiner
While creating Reader, Pulsar server throws Cannot Create consumer: null error
I have not been successful with Reader thus far!

IProducer does not contain definition for NewMessage()

I build a small NET Core app to test Pulsar. I am trying to repeat steps described here https://pulsar.apache.org/docs/en/client-libraries-dotnet/
I have added the NuGet DotPulsar.

And I have errors during the compilation.
For example,
var data = Encoding.UTF8.GetBytes("Hello World");
var messageId = await pulsarProducer.NewMessage()
.Property("SomeKey", "SomeValue")
.Send(data);

IProducer does not contains definition for NewMessage() etc. How to fix it?

Support for Seek in IReader interface

Both IConsumer and IReader should expose a Seek method to reset the cursor of an existing reader channel (see java docs).

I believe this is a matter of:

  1. Adding a Task<CommandSuccess> Send(CommandSeek command, CancellationToken cancellationToken) to IReaderChannel (I believe the ReaderChannelFactory actually returns a ConsumerChannel instance on create so this should be implemented already)
  2. Adding and implementing the ValueTask Seek(MessageId messageId, CancellationToken cancellationToken) for IReader

@blankensteiner I can submit a PR for this if you agree with the suggested implementation.

Usage of System.Diagnostics.DiagnosticsSource (5.0.1) break Azure Insights

After implementing the package in our project the Azure Insights stopped working. After some investigations there seems to be known known conflict when using System.Diagnostics.DiagnosticsSource.
I tested with a poc app and when bringing in just that dll (5.0.1) the insights directly stopped working in Azure.

See: https://docs.microsoft.com/en-us/azure/azure-monitor/app/status-monitor-v2-troubleshoot

Would it be possible to remove that dependancy?

Discuss - Stress Test fail on ci

When I created a pull request that merges to master on my own repository(RobertIndie#1), it failed to test on "DotPulsar.StressTests.ConsumerTests.Messages_GivenTopicWithMessages_ShouldConsumeAll".
Check
Is this problem caused by the performance limitations of Github Action?
Is it a good way not to run stress tests on CI?
@blankensteiner

Memory leak in DotPulsar.Internal.AsyncQueue

public void Enqueue(T item)
{
    lock (_lock)
    {
        ThrowIfDisposed();

        if (_pendingDequeues.Count > 0)
        {
            var tcs = _pendingDequeues.First;
            _pendingDequeues.RemoveFirst();
            tcs.Value.SetResult(item);
        }
        else
            _queue.Enqueue(item);
    }
}

missing tcs.Value.Dispose();

Should be:

public void Enqueue(T item)
{
    lock (_lock)
    {
        ThrowIfDisposed();

        if (_pendingDequeues.Count > 0)
        {
            var tcs = _pendingDequeues.First;
            _pendingDequeues.RemoveFirst();
            tcs.Value.SetResult(item);
            tcs.Value.Dispose();
        }
        else
            _queue.Enqueue(item);
    }
}

Support - Proxy

Add Pulsar Proxy support.

This is required to use SAAS offerings, which do not expose the brokers directly to the internet.

Single batched message metadata is not read correctly

Initial issue: apache/pulsar-client-node#129

It seems that sending a single message from either the Python or Node Pulsar clients with batching enabled and with message properties cases the dotpulsar consumer to read the the properties as part of the message data.

Repro

  1. Update the Consuming example to print the properties:

    var props = string.Join(", ", message.Properties.Select(p => $"{p.Key}={p.Value}"));
    Console.WriteLine($"Received: '{data}', props: {props}");
  2. Producer a message using either the Python or node pulsar clients with batching enabled. In all tests performed the message body is test and the message properties are {"client": "python"} or {"client": "node"} depending on the producing client e.g. for node:

    const producer = await client.createProducer({
      topic: 'persistent://public/default/test',
      batchingEnabled: true
    });
    
    await producer.send({
      data: Buffer.from("test"),
      properties: {
        client: "node",
      },
    });

Expected

The Consuming example should print:

Press a key to exit
The consumer for topic 'persistent://test/topic' is active
Received: 'test', props: client=python
Received: 'test', props: client=node

Actual

The Consuming example prints:

Press a key to exit
The consumer for topic 'persistent://test/topic' is active
Received: '

clientpythontest', props: 
Received: '

clientnodetest', props: 

Extra info

  1. Misreading the message body/ properties only occurs when batching is enabled on the producer
  2. The issue does not occur when batching is enabled on the producer but the batch contains more than 1 message
  3. I have not tested other clients
  4. I have tested consuming using the Java client and this works as expected
  5. The Node and Python clients are both based on the cpp-client
  6. Below are some more details presented for both the working as the not working cases for both Node and Python:

In the results below I recorded the raw frames retrieved by the dotpulsar client:

  • Batching: disabled
    • Message: "test"
    • Properties: {"client": "python"}
      • Frame: System.Buffers.ReadOnlySequence<Byte>[84] 0,0,0,24,8,9,74,20,8,0,18,16,8,184,84,16,2,24,255,255,255,255,255,255,255,255,255,1,14,1,22,119,98,99,0,0,0,42,10,13,112,117,108,115,97,114,45,54,50,45,49,54,48,16,0,24,222,185,230,190,209,46,34,16,10,6,99,108,105,101,110,116,18,6,112,121,116,104,111,110,116,101,115,116
      • Metadata: MetadataSize 42 NumMessagesInBatch 1 metadata.Properties.Count: 1
    • Properties: {"client": "node"}
      • Frame System.Buffers.ReadOnlySequence<Byte>[82] 0,0,0,24,8,9,74,20,8,0,18,16,8,184,84,16,3,24,255,255,255,255,255,255,255,255,255,1,14,1,156,178,222,175,0,0,0,40,10,13,112,117,108,115,97,114,45,54,50,45,49,54,49,16,0,24,132,214,231,190,209,46,34,14,10,6,99,108,105,101,110,116,18,4,110,111,100,101,116,101,115,116
      • MetadataSize 40 NumMessagesInBatch 1 metadata.Properties.Count: 1
  • Batching: enabled:
    • Message: "test"
    • Properties: {"client": "python"}
      • Frame: System.Buffers.ReadOnlySequence<Byte>[92] 0,0,0,24,8,9,74,20,8,0,18,16,8,184,84,16,4,24,255,255,255,255,255,255,255,255,255,1,14,1,99,1,207,89,0,0,0,26,10,13,112,117,108,115,97,114,45,54,50,45,49,54,50,16,0,24,251,144,235,190,209,46,88,1,0,0,0,20,10,16,10,6,99,108,105,101,110,116,18,6,112,121,116,104,111,110,24,4,116,101,115,116
      • MetdaData: MetadataSize 26 NumMessagesInBatch 1 metadata.Properties.Count: 0
    • Properties: {"client": "node"}
      • Frame: System.Buffers.ReadOnlySequence<Byte>[90] 0,0,0,24,8,9,74,20,8,0,18,16,8,184,84,16,5,24,255,255,255,255,255,255,255,255,255,1,14,1,59,201,107,43,0,0,0,26,10,13,112,117,108,115,97,114,45,54,50,45,49,54,51,16,0,24,148,247,235,190,209,46,88,1,0,0,0,18,10,14,10,6,99,108,105,101,110,116,18,4,110,111,100,101,24,4,116,101,115,116
      • MetadataSize 26 NumMessagesInBatch 1 metadata.Properties.Count: 0

"If requested by the community" Request

Hey thanks for this library.
Do you have to wait till the community makes a request to implement any feature?
If you have free time now, while wait? I think the best thing to do is implement them because when the requests start coming you may no more have that free time!
I have interest:

  1. Schema
  2. Partitioned topics
  3. Multi-topic subscriptions
  4. TLS Authentication
    Thanks

Docker Connection Issues

I have a simple dotnet 3.1 console app that has the DotPulsar example code in it. I am using docker-compose to startup the application and a standalone Pulsar instance. I am using the name of the docker service in the address (pulsar://pulsar:6650) to connect to. DotPular cannot seem to connect. It gets the following error:

System.Net.Internals.SocketExceptionFactory+ExtendedSocketException (111): Connection refused [::ffff:172.20.0.2]:6650
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw(Exception source)
at System.Net.Sockets.Socket.EndConnect(IAsyncResult asyncResult)
at System.Net.Sockets.Socket.DoMultipleAddressConnectCallback(Object result, MultipleAddressConnectAsyncResult context)

That said I tried the Pulsar.Client package and it connects and can send/receive without issue so it appears to be either an issue with DotPulsar or my configuration of it. Here is a repo that reproduces the issue: https://github.com/JarrodJ83/DotPulsarTest

Can anyone assist?

Wondering about proto

As I learn more about the project, I was wondering how DotPulsar.Internal.PulsarApi is generated? To be honest I was expecting to see a github workflow (or some automation) that creates the .cs but I don't (apologies if I missed it).

Producer Executor object gets killed because exception is thrown which isn't handled by Handle method.

Producer Executor object gets killed because exception is thrown which isn't handled by Handle method. This is happening when server gets disconnected. After it happens producer won't reconnect after service become available again.

Exception:

{Name = "PersistenceException" FullName = "DotPulsar.Exceptions.PersistenceException"}

Stack trace:

 	DotPulsar.dll!DotPulsar.Internal.Executor.Execute<DotPulsar.Internal.PulsarApi.CommandSendReceipt>(System.Func<System.Threading.Tasks.Task<DotPulsar.Internal.PulsarApi.CommandSendReceipt>> func, System.Threading.CancellationToken cancellationToken) Line 123	C#
 	[Resuming Async Method]	
 	System.Private.CoreLib.dll!System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state)	Unknown
 	System.Private.CoreLib.dll!System.Runtime.CompilerServices.AsyncTaskMethodBuilder<DotPulsar.Internal.PulsarApi.CommandSendReceipt>.AsyncStateMachineBox<DotPulsar.Internal.Executor.<Execute>d__8<DotPulsar.Internal.PulsarApi.CommandSendReceipt>>.MoveNext(System.Threading.Thread threadPoolThread)	Unknown
 	System.Private.CoreLib.dll!System.Runtime.CompilerServices.TaskAwaiter.OutputWaitEtwEvents.AnonymousMethod__12_0(System.Action innerContinuation, System.Threading.Tasks.Task innerTask)	Unknown
 	System.Private.CoreLib.dll!System.Threading.Tasks.AwaitTaskContinuation.RunOrScheduleAction(System.Action action, bool allowInlining)	Unknown
 	System.Private.CoreLib.dll!System.Threading.Tasks.Task.RunContinuations(object continuationObject)	Unknown
 	System.Private.CoreLib.dll!System.Threading.Tasks.Task.FinishSlow(bool userDelegateExecute)	Unknown
 	System.Private.CoreLib.dll!System.Threading.Tasks.Task.TrySetException(object exceptionObject)	Unknown
 	System.Private.CoreLib.dll!System.Runtime.CompilerServices.AsyncTaskMethodBuilder<DotPulsar.Internal.PulsarApi.CommandSendReceipt>.SetException(System.Exception exception)	Unknown
 	[Completed] DotPulsar.dll!DotPulsar.Internal.ProducerChannel.SendPackage(DotPulsar.Internal.PulsarApi.MessageMetadata metadata, System.Buffers.ReadOnlySequence<byte> payload, System.Threading.CancellationToken cancellationToken) Line 109	C#
 	System.Private.CoreLib.dll!System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state)	Unknown
 	System.Private.CoreLib.dll!System.Runtime.CompilerServices.AsyncTaskMethodBuilder<DotPulsar.Internal.PulsarApi.CommandSendReceipt>.AsyncStateMachineBox<DotPulsar.Internal.ProducerChannel.<SendPackage>d__9>.MoveNext(System.Threading.Thread threadPoolThread)	Unknown
 	System.Private.CoreLib.dll!System.Runtime.CompilerServices.TaskAwaiter.OutputWaitEtwEvents.AnonymousMethod__12_0(System.Action innerContinuation, System.Threading.Tasks.Task innerTask)	Unknown
 	System.Private.CoreLib.dll!System.Threading.Tasks.AwaitTaskContinuation.RunOrScheduleAction(System.Action action, bool allowInlining)	Unknown
 	System.Private.CoreLib.dll!System.Threading.Tasks.Task.RunContinuations(object continuationObject)	Unknown
 	System.Private.CoreLib.dll!System.Threading.Tasks.Task<System.__Canon>.TrySetResult(System.__Canon result)	Unknown
 	System.Private.CoreLib.dll!System.Runtime.CompilerServices.AsyncTaskMethodBuilder<DotPulsar.Internal.PulsarApi.BaseCommand>.SetResult(DotPulsar.Internal.PulsarApi.BaseCommand result)	Unknown
 	[Completed] DotPulsar.dll!DotPulsar.Internal.Connection.Send(DotPulsar.Internal.SendPackage command, System.Threading.CancellationToken cancellationToken) Line 194	C#
 	System.Private.CoreLib.dll!System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state)	Unknown
 	System.Private.CoreLib.dll!System.Runtime.CompilerServices.AsyncTaskMethodBuilder<DotPulsar.Internal.PulsarApi.BaseCommand>.AsyncStateMachineBox<DotPulsar.Internal.Connection.<Send>d__23>.MoveNext(System.Threading.Thread threadPoolThread)	Unknown
 	System.Private.CoreLib.dll!System.Runtime.CompilerServices.TaskAwaiter.OutputWaitEtwEvents.AnonymousMethod__12_0(System.Action innerContinuation, System.Threading.Tasks.Task innerTask)	Unknown
 	System.Private.CoreLib.dll!System.Threading.Tasks.AwaitTaskContinuation.System.Threading.IThreadPoolWorkItem.Execute()	Unknown
 	System.Private.CoreLib.dll!System.Threading.ThreadPoolWorkQueue.Dispatch()	Unknown
 	[Async Call Stack]	
>	[Async] DotPulsar.dll!DotPulsar.Internal.Producer.Send(DotPulsar.MessageMetadata metadata, System.Buffers.ReadOnlySequence<byte> data, System.Threading.CancellationToken cancellationToken) Line 117	C#
 	[Async] DotPulsar.dll!DotPulsar.Internal.MessageBuilder.Send(System.ReadOnlyMemory<byte> data, System.Threading.CancellationToken cancellationToken) Line 89	C#
 	[Async] PulsarTest.dll!PulsarTest.DotPulsarTest.Send(DotPulsar.Abstractions.IProducer publisher, int messageSize, long index) Line 199	C#

Reading Incoming Message Properties

I need a way to read all of the properties of the incoming message. Is this possible in the current implementation?

From what I can tell DotPulsar.MessageMetadata currently does not provide a way to iterate over the Internal.PulsarApi.MessageMetadata.Properties of the received message. The only way to access these properties is through the indexer on DotPulsar.MessageMetadata which would require the consumer to know all of the property key names.

If it is welcomed I could implement this functionality and submit a PR.

Consuming a Message generated by a Python Pulsar Function puts Consumer in Faulted state.

I've been generating messages in a Python Pulsar Function and spitting them out to a receiving topic.

When I consume from that topic using the client app bundled with Pulsar, everything works okay and I can pull down and see my resulting messages.

If the DotPulsar consumer tries to pick up any of those messages created from the function, it immediately enters a faulted state.

The exception message is "{"Specified argument was out of the range of valid values. (Parameter 'length')"}"

I've been able to pull the following stack trace from the library:

System.ThrowHelper.ThrowStartOrEndArgumentValidationException(Int64 start)
   at System.Buffers.ReadOnlySequence`1.Slice(Int64 start, Int64 length)
   at DotPulsar.Internal.BatchHandler.Add(MessageIdData messageId, UInt32 redeliveryCount, MessageMetadata metadata, ReadOnlySequence`1 data) in C:\Users\oven-baked\source\repos\pulsar-dotpulsar\src\DotPulsar\Internal\BatchHandler.cs:line 51
   at DotPulsar.Internal.ConsumerChannel.<Receive>d__9.MoveNext() in C:\Users\oven-baked\source\repos\pulsar-dotpulsar\src\DotPulsar\Internal\ConsumerChannel.cs:line 89
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Threading.Tasks.ValueTask`1.get_Result()
   at System.Runtime.CompilerServices.ConfiguredValueTaskAwaitable`1.ConfiguredValueTaskAwaiter.GetResult()
   at DotPulsar.Internal.Consumer.<Receive>d__17.MoveNext() in C:\Users\oven-baked\source\repos\pulsar-dotpulsar\src\DotPulsar\Internal\Consumer.cs:line 99
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Threading.Tasks.ValueTask`1.get_Result()
   at System.Runtime.CompilerServices.ConfiguredValueTaskAwaitable`1.ConfiguredValueTaskAwaiter.GetResult()
   at DotPulsar.Internal.Executor.<Execute>d__9`1.MoveNext() in C:\Users\oven-baked\source\repos\pulsar-dotpulsar\src\DotPulsar\Internal\Executor.cs:line 136

Feature request - Support connectivity retries

I think most users will have to figure out something regarding connection management.

Currently (I think) the idea is to re-create the consumers and producers from the client (correct me if I'm wrong).

It would be interesting to provide some sort of built-in reconnect, where a consumer or producer isn't related to lifecycle but just to a topic/subscription and anywhere in the application you can (named) inject an IConsumer or an IProducer.

Not sure about the best approach/abstraction.

Remove concept of "NotReadyChannel" / Initialize channels eagerly

When working with remote clusters where connection/channel initialization may take some time, the concept of a NotReadyChannel seems like it could result in some data loss. As messages produced in that small window when the channel is a NotReadyChannel would not go anywhere. This also manifests if there are any errors when initializing the underlying channel(s), the producer keeps receiving messages which go nowhere and throw swallowed exceptions which seems like undesirable behavior.

I propose to initialize channels eagerly when creating the producer / consumer / reader, such that the returned IProducer / IConsumer / IReader is usable.

This would probably require some changes to the interface (ie. CreateProducer etc ... should be async) so I wanted to see what people thought first before coding up this kind of change.

Invalid scheme 'http'

Hi,i want to use 'http://xxx.xxx.xxx.xxx' as serviceUrl,but it failed. Target pulsar version is 2.7.1.
Can i connect to such a serviceUrl that start with 'http'? Because provider just provide that.

Recover after network disconnect

Hi Everyone!

We have just started using DotPuslar and mostly it works great!

We are having an issue where we think that our company network goes offline for a while, and when coming back up we think that the client waiting for a message might get stuck and doesn't notice that the tcp connection is lost.

Do you think this is possible? If yes, is there any way for us to detect this or work around it?

Edit: We are currently using version 1.0.2

Difference from pulsar-client-dotnet

Assuming a pulsar-client-dotnet library exists, which library should I prefer when starting up a new project based on Apache Pulsar? pulsar-client-dotnet seems to provide full-blown Pulsar support, and this seems to be true for pulsar-dotpulsar also. Probably the main difference is, that the pulsar-dotpulsar repository receives official support from Apache (I mean commercial maintenance, not community maintenance). Thanks for the help in choosing.

Prepare a new release for dotpulsar

Since dotpulsar is now part of the ASF, it would be great to start cutting the first release after it enters the ASF.

Cutting the first release can help figure a few things out:

  • Find any problems related to license and fix them.
  • Figure the release flow for dotpulsar

IOException on cancellation

We are seeing something that is happening on https://github.com/danske-commodities/dotpulsar/blob/0f722c19cd4f999a143836d24737413534bb2618/src/DotPulsar/Internal/PulsarStream.cs#L92.

It's the ReadAsync that throws an IOException 995: The I/O operation has been aborted because of either a thread exit or an application request. It's handled by our UnobservedTaskException handler, nothing urgent but we could look to handle it.

It only happens so far when we've already cancelled the consumer and are quitting the application. I followed it up and it seems that from https://github.com/danske-commodities/dotpulsar/blob/0f722c19cd4f999a143836d24737413534bb2618/src/DotPulsar/Internal/Connection.cs#L175 there's no cancellation token passed down that (could) cancel this reliably.

The content suggests it'd always be an intentional IOException for dotpulsar, unless a thread exit can happen that's not intentional?

Suggestion would be to add a cancellation token in Connection.cs for incoming frame processing and pass that down and cancel it on the DisposeAsync.

Support for RedeliveryCount in Message API

It would be great if the DotPulsar lib exposed RedeliveryCount in the Message API. This allows clients to build mechanisms like exponential backoff enforcing a max amount of processing a message on a consumer.

Related to PIP 22.

If server dies Consumer / Producer Channel sometimes goes into faulted state and never tries to reconnect

(I am not a big fan of exceptions or basing retry, etc. logic on them)

If I add the following exceptions cases into default handler then consumer and producer can reconnect. IMHO handler is a dangerous things as if implementation or underlying .net changes and new exceptions are thrown we might end up with server applications which hang from time to time for no apparent reason.

PersistenceException _ => FaultAction.Retry,
IOException _ => FaultAction.Retry,

These exception are throw if channel was sending data to the server at the time server went down.

Full code (in DotPulsar.Internal.DefaultExceptionHandler):

private FaultAction DetermineFaultAction(Exception exception, CancellationToken cancellationToken)
    => exception switch
    {
        PersistenceException _ => FaultAction.Retry,
        IOException _ => FaultAction.Retry,
        TooManyRequestsException _ => FaultAction.Retry,
        ChannelNotReadyException _ => FaultAction.Retry,
        ServiceNotReadyException _ => FaultAction.Retry,
        ConnectionDisposedException _ => FaultAction.Retry,
        AsyncLockDisposedException _ => FaultAction.Retry,
        PulsarStreamDisposedException _ => FaultAction.Retry,
        AsyncQueueDisposedException _ => FaultAction.Retry,
        OperationCanceledException _ => cancellationToken.IsCancellationRequested ? FaultAction.Rethrow : FaultAction.Retry,
        DotPulsarException _ => FaultAction.Rethrow,
        SocketException socketException => socketException.SocketErrorCode switch
        {
            SocketError.HostNotFound => FaultAction.Rethrow,
            SocketError.HostUnreachable => FaultAction.Rethrow,
            SocketError.NetworkUnreachable => FaultAction.Rethrow,
            _ => FaultAction.Retry
        },
        _ => FaultAction.Rethrow
    };

Support - Consumer.NegativeAcknowledge

NegativeAcknowledge provides a nice way to shape error handling when messages cannot be processed and you want to do so at a later time with a given timeout:

Prefer negative acknowledgements over acknowledgement timeout. Negative acknowledgement controls the re-delivery of individual messages with more precision, and avoids invalid redeliveries when the message processing time exceeds the acknowledgement timeout.

Update the files include ALv2 license header

Motivation

As part of the IP Clearance process for transferring dotpulsar to the ASF, the ASF policy requires the source code files are updated to include the ALv2 license header.

SetupChannel exceptions

When SetupChannel throws an exception, the application crashes because the method is async void.

I propose catching exceptions in this method and setting the ConsumerState or ProducerState to faulted.

private async void SetupChannel()
{
    try
    {
        var channel = await _factory.Create(CancellationTokenSource.Token).ConfigureAwait(false);
        await _consumer.SetChannel(channel).ConfigureAwait(false);
    }
    catch(Exception ex)
    {
        Console.WriteLine($"Consumer state is now faulted \n {ex.ToString()}");
        _stateManager.SetState(ConsumerState.Faulted);
    }
}

Client usage

Hi guys, im wondering what the expected usage of this library is;

  • How should I consider my client? Singleton/scoped/transient?
  • How should I consider the consumer/producer instances?
  • The client, doesn't support any delegate for rolling a new jwt, which indicates to me, that the client is to be considered either scoped or transient - What are the characteristics of port usage/connection pooling used in the client?
    • Should I wrap client creation in a factory with expiring cache based on the token or do you plan on natively supporting rolling a new token through a delegate (or similar)?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.