
nats.net.v1's Introduction

📣 ⚠️ This repository was renamed from nats.net on Wednesday, July 17th, 2024

Please be prepared to update your references to the new name. You can also read the announcement for more information.


NATS

NATS - .NET V1 C# Client

A C# .NET client for the NATS messaging system, multi-targeting .NET 4.6+ and .NET Standard 1.6.

Current Release: 1.1.6   Current Head: 1.1.7

License Apache 2.0 API Documentation Build Status NuGet

.NET V2 C# Client

Are you greenfielding a new project with the latest C# or just wanting the latest .NET support? NATS.Net V2 is now GA.

NATS.Net V2

  • Only supports Async I/O (async/await)
  • Target current .NET LTS releases (currently .NET 6.0 & .NET 8.0)

Check out https://github.com/nats-io/nats.net

Getting started

The easiest and recommended way to start using NATS in your .NET projects is to use the NuGet package. For examples of how to use the client, see below or look in any of the included sample projects.

Check out NATS by example - An evolving collection of runnable, cross-client reference examples for NATS.

Get up and running with the source code

First, download the source code:

git clone git@github.com:nats-io/nats.net.v1.git

Project files

The repository contains several projects, all located under src\

  • NATS - The NATS.Client assembly

  • Tests

    • IntegrationTests - XUnit tests, verifying the client integration with nats-server.exe (ensure you have nats-server.exe in your path to run these).
    • IntegrationTestsInternal - XUnit tests, verifying internal only functionality
    • IntegrationTestsUsingNitoAsyncEx - XUnit tests, verifying functionality that depends on NitoAsyncEx
    • UnitTests - XUnit tests that require no dependencies
  • Samples

    The full set of samples is documented in the Samples Readme. Some examples provide statistics for benchmarking.

.NET Core SDK

.NET Core SDK style projects are used, so ensure your environment (command line, VSCode, Visual Studio, etc.) supports the targeted .NET Core SDK in src\global.json as well as .NET Framework 4.6 or greater.

Visual Studio

The recommendation is to load src\NATS.sln into Visual Studio 2019 (Visual Studio 2017 works as well). .NET Core SDK style projects are used to multi-target different frameworks, so when working with the source code (debugging, running tests, etc.) you might need to mind the "context" of the current framework.

XML documentation is generated (in Release), so code completion, context help, etc, will be available in the editor.

Command line

Since .NET Core SDK style projects are used, you can use the .NET SDK to build, run tests, pack etc.

E.g. to build:

dotnet build src\NATS.sln -c Release

This will build the respective NATS.Client.dll, examples, etc. in Release mode, requiring only the .NET Core SDK and the .NET Platform.

Building the API Documentation

Doxygen is used for building the API documentation. To build the API documentation, change directories to documentation and run the following command:

documentation\build_doc.bat

Doxygen will build the NATS .NET Client API documentation, placing it in the documentation\NATS.Client\html directory. Doxygen is required to be installed and in the PATH. Version 1.8 is known to work.

Current API Documentation

Version Notes

Version 1.1.0 Support for Server 2.10

The 1.1.0 release has support for Server 2.10 features and client validation improvements including:

  • Stream and Consumer info timestamps
  • Stream Configuration
    • Compression Option
    • Subject Transform
    • Consumer Limits
    • First Sequence
  • Multiple Filter Subjects
  • Subject Validation

Multiple Filter Subjects

A new feature is the ability to have multiple filter subjects for any single JetStream consumer.

ConsumerConfiguration cc = ConsumerConfiguration.Builder()
    ...
    .WithFilterSubjects("subject1", "subject2")
    .Build();

Subject and Queue Name Validation

Until now, the client has been very strict when validating subject names for consumer subject filters and subscriptions. It only allowed printable ASCII characters except for *, >, ., \\ and /. This restriction has been changed to the following:

  • cannot contain spaces \r \n \t
  • cannot start or end with subject token delimiter .
  • cannot have empty segments

This means that UTF characters are now allowed in subjects in this client.
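The three rules above can be sketched as a small standalone check. This is only an illustration and not the client's own validation code: the SubjectRules class and IsValidSubject method are hypothetical names, and treating an empty subject as invalid is an assumption.

```csharp
using System;

// Hypothetical helper, not part of NATS.Client. It only mirrors the
// three rules listed above so they can be checked in isolation.
public static class SubjectRules
{
    public static bool IsValidSubject(string subject)
    {
        // Assumption: a null or empty subject is treated as invalid.
        if (string.IsNullOrEmpty(subject))
            return false;

        // Rule 1: no space, carriage return, line feed, or tab.
        if (subject.IndexOfAny(new[] { ' ', '\r', '\n', '\t' }) >= 0)
            return false;

        // Rules 2 and 3: a leading '.', a trailing '.', or two adjacent
        // '.' characters all appear as an empty segment after splitting.
        foreach (string segment in subject.Split('.'))
        {
            if (segment.Length == 0)
                return false;
        }

        return true;
    }
}
```

Under these rules a subject such as "café.orders" passes, while ".foo", "foo.", "foo..bar" and "foo bar" are rejected.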

For queue names, there has been inconsistent validation, if any. Queue names now require the same validation as subjects. Important: We realize this may affect existing applications, but we need to require consistency across clients.

Subscribe Subject Validation

Additionally, for subjects used in the subscribe API, applications may start throwing an exception:

90011 Subject does not match consumer configuration filter

Let's say you have a stream with subject foo.> and you are subscribing to foo.a. When you don't supply a filter subject on a consumer, it becomes >, which means all subjects.

This is a problem because you think you are subscribing to foo.a but, without this check, you would receive messages for all foo.> subjects, not just foo.a.

Validating the subscribe subject against the filter subject is needed to prevent this. Unfortunately, this makes existing code throw the 90011 exception.

Version 1.0.8 Simplification and Service Framework

Simplification

There is a new simplified API that makes working with streams and consumers, well, simpler! Simplification is released as of 1.0.8.

Check out the examples:

Service Framework

The service API allows you to easily build NATS services. The Service Framework is released as of 1.0.8

The Services Framework introduces a higher-level API for implementing services with NATS. NATS has always been a strong technology on which to build services, as they are easy to write, are location and DNS independent and can be scaled up or down by simply adding or removing instances of the service.

The Services Framework further streamlines their development by providing observability and standardization. The Service Framework allows your services to be discovered, queried for status and schema information without additional work.

Check out the ServiceExample

Version 1.0.1 Consumer Create

This release by default will use a new JetStream consumer create API when interacting with nats-server version 2.9.0 or higher. This changes the subjects used by the client to create consumers, which might in some cases require changes in access and import/export configuration. The developer can opt out of using this feature by using a custom JetStreamOptions and using it when creating JetStream, Key Value and Object Store regular and management contexts.

JetStreamOptions jso = JetStreamOptions.Builder().WithOptOut290ConsumerCreate(true).Build();

IJetStream js = connection.CreateJetStreamContext(jso);
IJetStreamManagement jsm = connection.CreateJetStreamManagementContext(jso);
IKeyValue kv = connection.CreateKeyValueContext("bucket", KeyValueOptions.Builder(jso).Build());
IKeyValueManagement kvm = connection.CreateKeyValueManagementContext(KeyValueOptions.Builder(jso).Build());
IObjectStore os = connection.CreateObjectStoreContext("bucket", ObjectStoreOptions.Builder(jso).Build());
IObjectStoreManagement osm = connection.CreateObjectStoreManagementContext(ObjectStoreOptions.Builder(jso).Build());

Basic Usage

NATS .NET C# Client uses interfaces to reference most NATS client objects, and delegates for all types of events.

Creating a NATS .NET Application

First, reference the NATS.Client assembly so you can use it in your code. Be sure to add a reference in your project or, if compiling via the command line, compile with the /r:NATS.Client.DLL parameter. While the NATS client is written in C#, any .NET language can use it.

Below is some code demonstrating basic API usage. Note that this is example code, not functional as a whole (e.g. requests will fail without a subscriber to reply).

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

// Reference the NATS client.
using NATS.Client;

Here are example snippets of using the API to create a connection, subscribe, publish, and request data.

// Create a new connection factory to create
// a connection.
ConnectionFactory cf = new ConnectionFactory();

// Creates a live connection to the default
// NATS Server running locally
IConnection c = cf.CreateConnection();

// Setup an event handler to process incoming messages.
// An anonymous delegate function is used for brevity.
EventHandler<MsgHandlerEventArgs> h = (sender, args) =>
{
    // print the message
    Console.WriteLine(args.Message);

    // Here are some of the accessible properties from
    // the message:
    // args.Message.Data;
    // args.Message.Reply;
    // args.Message.Subject;
    // args.Message.ArrivalSubcription.Subject;
    // args.Message.ArrivalSubcription.QueuedMessageCount;
    // args.Message.ArrivalSubcription.Queue;

    // Unsubscribing from within the delegate function is supported.
    args.Message.ArrivalSubcription.Unsubscribe();
};

// The simple way to create an asynchronous subscriber
// is to simply pass the event in.  Messages will start
// arriving immediately.
IAsyncSubscription s = c.SubscribeAsync("foo", h);

// Alternatively, create an asynchronous subscriber on subject foo,
// assign a message handler, then start the subscriber.   When
// multicasting delegates, this allows all message handlers
// to be setup before messages start arriving.
IAsyncSubscription sAsync = c.SubscribeAsync("foo");
sAsync.MessageHandler += h;
sAsync.Start();

// Simple synchronous subscriber
ISyncSubscription sSync = c.SubscribeSync("foo");

// Using a synchronous subscriber, gets the first message available,
// waiting up to 1000 milliseconds (1 second)
Msg m = sSync.NextMessage(1000);

c.Publish("foo", Encoding.UTF8.GetBytes("hello world"));

// Unsubscribing
sAsync.Unsubscribe();

// Publish requests to the given reply subject:
c.Publish("foo", "bar", Encoding.UTF8.GetBytes("help!"));

// Sends a request (internally creates an inbox) and Auto-Unsubscribe the
// internal subscriber, which means that the subscriber is unsubscribed
// when receiving the first response from potentially many repliers.
// This call will wait for the reply for up to 1000 milliseconds (1 second).
m = c.Request("foo", Encoding.UTF8.GetBytes("help"), 1000);

// Draining and closing a connection
c.Drain();

// Closing a connection
c.Close();

RX Usage

Importing the namespace NATS.Client.Rx you will be able to use an extension method connection.Observe(subject) to turn the connection to an observable. If you have a more advanced IAsyncSubscription, you can use asyncSubscription.ToObservable().

You can now import the namespace NATS.Client.Rx.Ops. After this you get builtin support for:

  • Subscribe
  • SubscribeSafe (will not fail an observer if it misbehaves)
  • Where
  • Select

If you want, you could take an external dependency on System.Reactive and use that instead of NATS.Client.Rx.Ops.

See the full example here: RxSample

Wildcard Subscriptions

The * wildcard matches any token, at any level of the subject:

IAsyncSubscription s = c.SubscribeAsync("foo.*.baz");

This subscriber would receive messages sent to:

  • foo.bar.baz
  • foo.a.baz
  • etc...

It would not, however, receive messages on:

  • foo.baz
  • foo.baz.bar
  • etc...

The > wildcard matches any length of the tail of a subject, and can only be the last token.

IAsyncSubscription s = c.SubscribeAsync("foo.>");

This subscriber would receive any message sent to:

  • foo.bar
  • foo.bar.baz
  • foo.foo.bar.bax.22
  • etc...

However, it would not receive messages sent on:

  • foo
  • bar.foo.baz
  • etc...

Publishing on this subject would cause both of the subscribers above to receive the message:

c.Publish("foo.bar.baz", null);
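The wildcard rules in this section can be sketched as a small token matcher. The server performs the real matching; the SubjectMatcher class below is a hypothetical helper that only illustrates the * and > semantics described above and does not validate the pattern itself.

```csharp
using System;

// Hypothetical helper, not part of NATS.Client. It illustrates how
// '*' and '>' match subject tokens, as described above.
public static class SubjectMatcher
{
    public static bool Matches(string pattern, string subject)
    {
        string[] p = pattern.Split('.');
        string[] s = subject.Split('.');

        for (int i = 0; i < p.Length; i++)
        {
            if (p[i] == ">")
            {
                // '>' must be the last pattern token and needs at least
                // one remaining subject token to match.
                return i == p.Length - 1 && s.Length > i;
            }

            if (i >= s.Length)
                return false; // the subject ran out of tokens

            if (p[i] != "*" && p[i] != s[i])
                return false; // literal token mismatch
        }

        // Without '>', both sides must have the same number of tokens.
        return p.Length == s.Length;
    }
}
```

For example, Matches("foo.*.baz", "foo.bar.baz") and Matches("foo.>", "foo.bar.baz") both return true, while Matches("foo.*.baz", "foo.baz") and Matches("foo.>", "foo") return false.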

Queue Groups

All subscriptions with the same queue name will form a queue group. Each message will be delivered to only one subscriber per queue group, using queue semantics. You can have as many queue groups as you wish. Normal subscribers will continue to work as expected.

ISyncSubscription s1 = c.SubscribeSync("foo", "job_workers");

or

IAsyncSubscription s = c.SubscribeAsync("foo", "job_workers", myHandler);

To unsubscribe, call the ISubscription Unsubscribe method:

s.Unsubscribe();

When finished with NATS, close the connection.

c.Close();

Advanced Usage

Connection and Subscriber objects implement IDisposable and can be created in a using statement. Here is all the code required to connect to a default server, receive ten messages, and clean up, unsubscribing and closing the connection when finished.

using (IConnection c = new ConnectionFactory().CreateConnection())
{
    using (ISyncSubscription s = c.SubscribeSync("foo"))
    {
        for (int i = 0; i < 10; i++)
        {
            Msg m = s.NextMessage();
            System.Console.WriteLine("Received: " + m);
        }
    }  
}

Or to publish ten messages:

using (IConnection c = new ConnectionFactory().CreateConnection())
{
    for (int i = 0; i < 10; i++)
    {
        c.Publish("foo", Encoding.UTF8.GetBytes("hello"));
    }
}

Flush a connection to the server - this call returns when all messages have been processed. Optionally, a timeout in milliseconds can be passed.

c.Flush();

c.Flush(1000);

Set up a subscriber to auto-unsubscribe after ten messages.

IAsyncSubscription s = c.SubscribeAsync("foo");
s.MessageHandler += (sender, args) =>
{
   Console.WriteLine("Received: " + args.Message);
};

s.Start();
s.AutoUnsubscribe(10);

Note that an anonymous function was used. This is for brevity here - in practice, delegate functions can be used as well.

Other events can be assigned delegate methods through the options object.

Options opts = ConnectionFactory.GetDefaultOptions();

opts.AsyncErrorEventHandler += (sender, args) =>
{
    Console.WriteLine("Error: ");
    Console.WriteLine("   Server: " + args.Conn.ConnectedUrl);
    Console.WriteLine("   Message: " + args.Error);
    Console.WriteLine("   Subject: " + args.Subscription.Subject);
};

opts.ServerDiscoveredEventHandler += (sender, args) =>
{
    Console.WriteLine("A new server has joined the cluster:");
    Console.WriteLine("    " + string.Join(", ", args.Conn.DiscoveredServers));
};

opts.ClosedEventHandler += (sender, args) =>
{
    Console.WriteLine("Connection Closed: ");
    Console.WriteLine("   Server: " + args.Conn.ConnectedUrl);
};

opts.DisconnectedEventHandler += (sender, args) =>
{
    Console.WriteLine("Connection Disconnected: ");
    Console.WriteLine("   Server: " + args.Conn.ConnectedUrl);
};

IConnection c = new ConnectionFactory().CreateConnection(opts);

After version 0.5.0, the C# .NET client supports async Requests.

public async Task MyRequestDataMethod(IConnection c)
{
    Msg m = await c.RequestAsync("foo", null);

    // ...

    // Start a request, do other work, then await the reply.
    Task<Msg> pending = c.RequestAsync("foo", null);
    // do some work
    m = await pending;

    // Cancellation tokens are supported.
    var cts = new CancellationTokenSource();

    Task<Msg> msg = c.RequestAsync("foo", null, cts.Token);
    // do stuff
    if (requestIsNowIrrelevant())
        cts.Cancel();

    // Be sure to handle OperationCanceledException.
    await msg;
}
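Handling the cancellation might look like the sketch below. To keep it runnable without a server, it uses a hypothetical stand-in (FakeRequestAsync, built on Task.Delay) in place of a real request; only the try/catch shape is the point, and all names here are assumptions.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationSketch
{
    // Hypothetical stand-in for a pending request: completes after
    // delayMs unless the token is cancelled first.
    public static async Task<string> FakeRequestAsync(int delayMs, CancellationToken token)
    {
        await Task.Delay(delayMs, token);
        return "reply";
    }

    // Returns the reply, or null when the request was cancelled.
    public static async Task<string> RequestOrNullAsync(int delayMs, CancellationTokenSource cts)
    {
        try
        {
            return await FakeRequestAsync(delayMs, cts.Token);
        }
        catch (OperationCanceledException)
        {
            // TaskCanceledException derives from OperationCanceledException,
            // so this catch covers both.
            return null;
        }
    }
}
```

Catching the base OperationCanceledException keeps the handler independent of which cancellation exception type the awaited task surfaces.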

The NATS .NET client supports the cluster discovery protocol. The list of servers known to a connection is automatically updated when a connection is established, and afterward in real time as cluster changes occur. A current list of known servers in a cluster can be obtained using the IConnection.Servers property; this list will be used if the client needs to reconnect to the cluster.

Clustered Usage

string[] servers = new string[] {
    "nats://localhost:1222",
    "nats://localhost:1224"
};

Options opts = ConnectionFactory.GetDefaultOptions();
opts.MaxReconnect = 2;
opts.ReconnectWait = 1000;
opts.Servers = servers;

IConnection c = new ConnectionFactory().CreateConnection(opts);

TLS

The NATS .NET client supports TLS 1.2. Set the secure option, add the certificate, and connect. Note that .NET requires both the private key and certificate to be present in the same certificate file.

In addition to the code found here, please refer to the sample code at TlsVariationsExample

Options opts = ConnectionFactory.GetDefaultOptions();
opts.Secure = true;

// .NET requires the private key and cert in the
//  same file. 'client.pfx' is generated from:
//
// openssl pkcs12 -export -out client.pfx
//    -inkey client-key.pem -in client-cert.pem
X509Certificate2 cert = new X509Certificate2("client.pfx", "password");

opts.AddCertificate(cert);

// Some connections like those with OCSP 
// require CheckCertificateRevocation
opts.CheckCertificateRevocation = true;

IConnection c = new ConnectionFactory().CreateConnection(opts);

When developing an application, it is often useful (and with self-signed certificates, necessary) to override server certificate validation. This is achieved by overriding the remote certificate validation callback through the NATS client options.

private bool verifyServerCert(object sender,
        X509Certificate certificate, X509Chain chain,
        SslPolicyErrors sslPolicyErrors)
{
    if (sslPolicyErrors == SslPolicyErrors.None)
        return true;

    // Do what is necessary to achieve the level of
    // security you need given a policy error.
    // Rejecting by default keeps this example fail-safe.
    return false;
}

<...>

Options opts = ConnectionFactory.GetDefaultOptions();
opts.Secure = true;
opts.TLSRemoteCertificationValidationCallback = verifyServerCert;
opts.AddCertificate("client.pfx");

IConnection c = new ConnectionFactory().CreateConnection(opts);

The NATS server default cipher suites may not be supported by the Microsoft .NET framework. Please refer to nats-server --help_tls usage and configure the NATS server to include the most secure cipher suites supported by the .NET framework.

Custom Dialer/Custom TCP connection.

The NATS .NET client supports passing in a custom implementation of the ITCPConnection interface.

public class CustomTCPConnection : ITCPConnection
{
    // <Custom implementation of ITCPConnection>
}

<...>

Options opts = ConnectionFactory.GetDefaultOptions();
opts.TCPConnection = new CustomTCPConnection();

IConnection c = new ConnectionFactory().CreateConnection(opts);

This is useful for testing, or implementing a TCPConnection that supports TLS termination.

See TLSReverseProxyExample for an implementation.

NATS 2.0 Authentication (Nkeys and User Credentials)

This requires server with version >= 2.0.0

NATS servers have a new security and authentication mechanism to authenticate with user credentials and Nkeys. The simplest form is to use the helper method UserCredentials(credsFilepath).

IConnection c = new ConnectionFactory().CreateConnection("nats://127.0.0.1", "user.creds");

The helper method creates two callback handlers to present the user JWT and sign the nonce challenge from the server. The core client library never has direct access to your private key and simply performs the callback for signing the server challenge. The helper loads the credentials, then wipes and erases the memory it uses, on each connect or reconnect.

The helper also can take two entries, one for the JWT and one for the NKey seed file.

IConnection c = new ConnectionFactory().CreateConnection("nats://127.0.0.1", "user.jwt", "user.nk");

You can also set the event handlers directly and manage challenge signing directly.

EventHandler<UserJWTEventArgs> jwtEh = (sender, args) =>
{
    // Obtain a user JWT...
    string jwt = getMyUserJWT();

    // You must set the JWT in the args to hand off
    // to the client library.
    args.JWT = jwt;
};

EventHandler<UserSignatureEventArgs> sigEh = (sender, args) =>
{
    // get a private key seed from your environment.
    string seed = getMyUserSeed();

    // Generate a NkeyPair
    NkeyPair kp = Nkeys.FromSeed(seed);

    // Sign the nonce and return the result in the SignedNonce
    // args property.  This must be set.
    args.SignedNonce = kp.Sign(args.ServerNonce);
};
Options opts = ConnectionFactory.GetDefaultOptions();
opts.SetUserCredentialHandlers(jwtEh, sigEh);

IConnection c = new ConnectionFactory().CreateConnection(opts);

Bare Nkeys are also supported. The nkey seed should be in a read only file, e.g. seed.txt

> cat seed.txt
SUAGMJH5XLGZKQQWAWKRZJIGMOU4HPFUYLXJMXOO5NLFEO2OOQJ5LPRDPM

This is a helper function which will load and decode and do the proper signing for the server nonce. It will clear memory in between invocations. You can choose to use the low level option and provide the public key and a signature callback on your own.

Options opts = ConnectionFactory.GetDefaultOptions();
opts.SetNkey("UCKKTOZV72L3NITTGNOCRDZUI5H632XCT4ZWPJBC2X3VEY72KJUWEZ2Z",
"./config/certs/user.nk");

// Direct
IConnection c = new ConnectionFactory().CreateConnection(opts);

Message Headers

The NATS.Client version 0.11.0 and NATS server version 2.2 support message headers. Message headers are represented as a string name value pair just as HTTP headers are.

Setting Message Headers

IConnection c = new ConnectionFactory().CreateConnection();

Msg m = new Msg();
m.Header["Content-Type"] = "json";
m.Subject = "foo";
c.Publish(m);

Getting Message Headers

IConnection c = new ConnectionFactory().CreateConnection();
var s = c.SubscribeSync("foo");

Msg m = s.NextMessage();
string contentType = m.Header["Content-Type"];

Exceptions

The NATS .NET client can throw the following exceptions:

  • NATSException - The generic NATS exception, and base class for all other NATS exceptions.
  • NATSConnectionException - The exception that is thrown when there is a connection error.
  • NATSProtocolException - The exception that is thrown when there is an internal error with the NATS protocol.
  • NATSNoServersException - The exception that is thrown when a connection cannot be made to any server.
  • NATSSecureConnRequiredException - The exception that is thrown when a secure connection is required.
  • NATSConnectionClosedException - The exception that is thrown when an operation is performed on a connection that is closed.
  • NATSSlowConsumerException - The exception that is thrown when a consumer (subscription) is slow.
  • NATSStaleConnectionException - The exception that is thrown when an operation occurs on a connection that has been determined to be stale.
  • NATSMaxPayloadException - The exception that is thrown when a message payload exceeds the configured maximum.
  • NATSBadSubscriptionException - The exception that is thrown when a subscriber operation is performed on an invalid subscriber.
  • NATSTimeoutException - The exception that is thrown when a NATS operation times out.
  • NATSJetStreamStatusException - The exception that is thrown when a JetStream subscription detects an exceptional or unknown status.

JetStream

Publishing and subscribing to JetStream enabled servers is straightforward. A JetStream enabled application will connect to a server, establish a JetStream context, and then publish or subscribe. This can be mixed and matched with standard NATS subjects, and JetStream subscribers, depending on configuration, receive messages from both streams and directly from other NATS producers.

The JetStream Context

After establishing a connection as described above, create a JetStream Context.

IJetStream js = c.CreateJetStreamContext();

You can pass options to configure the JetStream client, although the defaults should suffice for most users. See the JetStreamOptions class.

There is no limit to the number of contexts used, although normally one would only require a single context. Contexts may be prefixed to be used in conjunction with NATS authorization.

Publishing

To publish messages, use the IJetStream.Publish(...) API. A stream must be established before publishing. You can publish in either a synchronous or asynchronous manner.

Synchronous:

// create a typical NATS message
Msg msg = new Msg("foo", Encoding.UTF8.GetBytes("hello"));
PublishAck pa = js.Publish(msg);

See JetStreamPublish in the JetStream samples for a detailed and runnable sample.

If there is a problem, an exception will be thrown, and the message may not have been persisted. Otherwise, the stream name and sequence number are returned in the publish acknowledgement.

There are a variety of publish options that can be set when publishing. When duplicate checking has been enabled on the stream, a message ID should be set. One set of options are expectations. You can set a publish expectation such as a particular stream name, previous message ID, or previous sequence number. These are hints to the server that it should reject messages where these are not met, primarily for enforcing your ordering or ensuring messages are not stored on the wrong stream.

The PublishOptions are immutable, but the builder can be re-used for expectations by clearing the expected.

For example:

PublishOptions.PublishOptionsBuilder pubOptsBuilder = PublishOptions.Builder()
  .WithExpectedStream(stream)
  .WithMessageId("mid1");

PublishAck pa = js.Publish("foo", null, pubOptsBuilder.Build());

pubOptsBuilder.ClearExpected()
  .WithExpectedLastMsgId("mid1")
  .WithMessageId("mid2");
pa = js.Publish("foo", null, pubOptsBuilder.Build());

See JetStreamPublishWithOptionsUseCases in the JetStream samples for a detailed and runnable sample.

Asynchronous:

IList<Task<PublishAck>> tasks = new List<Task<PublishAck>>();
for (int x = 1; x < roundCount; x++) {
  // create a typical NATS message
  Msg msg = new Msg("foo", Encoding.UTF8.GetBytes("hello"));

  // Publish a message
  tasks.Add(js.PublishAsync(msg));
}

foreach (Task<PublishAck> task in tasks) {
  // ... process the task
}

See the JetStreamPublishAsync in the JetStream samples for a detailed and runnable sample.

ReplyTo When Publishing

The Message object allows you to set a replyTo, but in publish requests, the replyTo is reserved for internal use as the address for the server to respond to the client with the PublishAck.

Subscribing

There are three methods of subscribing: Push Async, Push Sync and Pull. Each has its own set of options and abilities.

Push subscriptions can be synchronous or asynchronous. The server pushes messages to the client.

Push Async Subscribing

void MyHandler(object sender, MsgHandlerEventArgs args)
{
    // Process the message.
    // Ack the message depending on the ack model
}

PushSubscribeOptions pso = PushSubscribeOptions.Builder()
    .WithDurable("optional-durable-name")
    .Build();

// Whether the client acks messages automatically.
bool autoAck = false;

// Subscribe using the handler
IJetStreamPushAsyncSubscription sub = 
    js.PushSubscribeAsync("subject", MyHandler, autoAck, pso);

See the JetStreamPushSubscribeBasicAsync in the JetStream samples for a detailed and runnable sample.

Push Sync Subscribing

PushSubscribeOptions pso = PushSubscribeOptions.Builder()
    .WithDurable("optional-durable-name")
    .Build();

// Subscribe 
IJetStreamPushSyncSubscription sub = 
        js.PushSubscribeSync("subject", pso);

See JetStreamPushSubscribeSync in the JetStream samples for a detailed and runnable sample.

Pull Subscribing

Pull subscriptions are always synchronous. The server organizes messages into a batch which it sends when requested.

    PullSubscribeOptions options = PullSubscribeOptions.Builder()
        .WithDurable("durable-name-is-required")
        .Build();

    IJetStreamPullSubscription sub = js.PullSubscribe("subject", options);

Fetch:

IList<Msg> messages = sub.Fetch(100, 1000); // 100 messages, 1000 millis timeout
foreach (Msg m in messages) {
    // process message
    m.Ack();
}

The fetch pull is a macro pull that uses advanced pulls under the covers to return a list of messages. The list may be empty or contain at most the batch size. All status messages are handled for you. The client can provide a timeout to wait for the first message in a batch. The fetch call returns when the batch is ready. The timeout may be exceeded if the server sent messages very near the end of the timeout period.

See JetStreamPullSubFetch and JetStreamPullSubFetchUseCases in the JetStream samples for a detailed and runnable sample.

Batch Size:

sub.Pull(100); // 100 messages
...
Msg m = sub.NextMessage(1000);

An advanced version of pull specifies a batch size. When asked, the server will send whatever messages it has up to the batch size. If it has no messages, it will wait until it has some to send. The client may time out before that time. If fewer messages than the batch size are available, you can ask for more later. Once the entire batch size has been filled, you must make another pull request.

See JetStreamPullSubBatchSize and JetStreamPullSubBatchSizeUseCases in the JetStream samples for detailed and runnable samples.

No Wait and Batch Size:

sub.PullNoWait(100); // 100 messages
...
Msg m = sub.NextMessage(1000);

An advanced version of pull also specifies a batch size. When asked, the server will send whatever messages it has up to the batch size, but will never wait for the batch to fill, and the client will return immediately. If fewer messages than the batch size are available, you will get what is available and a 404 status message indicating the server did not have enough messages. You must make a pull request every time. This is an advanced API.

See the JetStreamPullSubNoWaitUseCases in the JetStream samples for a detailed and runnable sample.

Expires In and Batch Size:

sub.PullExpiresIn(100, 3000); // 100 messages, expires in 3000 millis
...
Msg m = sub.NextMessage(4000);

Another advanced version of pull specifies a maximum time to wait for the batch to fill. The server returns messages until either the batch is filled or the time expires. It's important to set your client's timeout to be longer than the time you've asked the server to expire in. You must make a pull request every time. In subsequent pulls, you will receive multiple 408 status messages, one for each message the previous batch was short. You can just ignore these. This is an advanced API.

See JetStreamPullSubExpiresIn and JetStreamPullSubExpiresInUseCases in the JetStream samples for detailed and runnable samples.

Client Error Messages

In addition to some generic validation messages for values in builders, there are also additional grouped and numbered client error messages:

  • Subscription building and creation
  • Consumer creation
  • Object Store operations

Name | Group | Code | Description
--- | --- | --- | ---
JsSoDurableMismatch | SO | 90101 | Builder durable must match the consumer configuration durable if both are provided.
JsSoDeliverGroupMismatch | SO | 90102 | Builder deliver group must match the consumer configuration deliver group if both are provided.
JsSoDeliverSubjectMismatch | SO | 90103 | Builder deliver subject must match the consumer configuration deliver subject if both are provided.
JsSoOrderedNotAllowedWithBind | SO | 90104 | Bind is not allowed with an ordered consumer.
JsSoOrderedNotAllowedWithDeliverGroup | SO | 90105 | Deliver group is not allowed with an ordered consumer.
JsSoOrderedNotAllowedWithDurable | SO | 90106 | Durable is not allowed with an ordered consumer.
JsSoOrderedNotAllowedWithDeliverSubject | SO | 90107 | Deliver subject is not allowed with an ordered consumer.
JsSoOrderedRequiresAckPolicyNone | SO | 90108 | Ordered consumer requires Ack Policy None.
JsSoOrderedRequiresMaxDeliverOfOne | SO | 90109 | Max deliver is limited to 1 with an ordered consumer.
JsSoNameMismatch | SO | 90110 | Builder name must match the consumer configuration name if both are provided.
JsSoOrderedMemStorageNotSuppliedOrTrue | SO | 90111 | Mem Storage must be true if supplied.
JsSoOrderedReplicasNotSuppliedOrOne | SO | 90112 | Replicas must be 1 if supplied.
JsSoNameOrDurableRequiredForBind | SO | 90113 | Name or Durable required for Bind.
JsSubPullCantHaveDeliverGroup | SUB | 90001 | Pull subscriptions can't have a deliver group.
JsSubPullCantHaveDeliverSubject | SUB | 90002 | Pull subscriptions can't have a deliver subject.
JsSubPushCantHaveMaxPullWaiting | SUB | 90003 | Push subscriptions cannot supply max pull waiting.
JsSubQueueDeliverGroupMismatch | SUB | 90004 | Queue / deliver group mismatch.
JsSubFcHbNotValidPull | SUB | 90005 | Flow Control and/or heartbeat is not valid with a pull subscription.
JsSubFcHbNotValidQueue | SUB | 90006 | Flow Control and/or heartbeat is not valid in queue mode.
JsSubNoMatchingStreamForSubject | SUB | 90007 | No matching streams for subject.
JsSubConsumerAlreadyConfiguredAsPush | SUB | 90008 | Consumer is already configured as a push consumer.
JsSubConsumerAlreadyConfiguredAsPull | SUB | 90009 | Consumer is already configured as a pull consumer.
removed | SUB | 90010 |
JsSubSubjectDoesNotMatchFilter | SUB | 90011 | Subject does not match consumer configuration filter.
JsSubConsumerAlreadyBound | SUB | 90012 | Consumer is already bound to a subscription.
JsSubExistingConsumerNotQueue | SUB | 90013 | Existing consumer is not configured as a queue / deliver group.
JsSubExistingConsumerIsQueue | SUB | 90014 | Existing consumer is configured as a queue / deliver group.
JsSubExistingQueueDoesNotMatchRequestedQueue | SUB | 90015 | Existing consumer deliver group does not match requested queue / deliver group.
JsSubExistingConsumerCannotBeModified | SUB | 90016 | Existing consumer cannot be modified.
JsSubConsumerNotFoundRequiredInBind | SUB | 90017 | Consumer not found, required in bind mode.
JsSubOrderedNotAllowOnQueues | SUB | 90018 | Ordered consumer not allowed on queues.
JsSubPushCantHaveMaxBatch | SUB | 90019 | Push subscriptions cannot supply max batch.
JsSubPushCantHaveMaxBytes | SUB | 90020 | Push subscriptions cannot supply max bytes.
JsSubSubjectNeededToLookupStream | SUB | 90022 | Subject needed to lookup stream. Provide either a subscribe subject or a ConsumerConfiguration filter subject.
JsConsumerCreate290NotAvailable | CON | 90301 | Name field not valid when v2.9.0 consumer create api is not available.
JsConsumerNameDurableMismatch | CON | 90302 | Name must match durable if both are supplied.
JsMultipleFilterSubjects210NotAvailable | CON | 90303 | Multiple filter subjects not available until server version 2.10.0.
OsObjectNotFound | OS | 90201 | The object was not found.
OsObjectIsDeleted | OS | 90202 | The object is deleted.
OsObjectAlreadyExists | OS | 90203 | An object with that name already exists.
OsCantLinkToLink | OS | 90204 | A link cannot link to another link.
OsGetDigestMismatch | OS | 90205 | Digest does not match meta data.
OsGetChunksMismatch | OS | 90206 | Number of chunks does not match meta data.
OsGetSizeMismatch | OS | 90207 | Total size does not match meta data.
OsGetLinkToBucket | OS | 90208 | Cannot get object, it is a link to a bucket.
OsLinkNotAllowOnPut | OS | 90209 | Link not allowed in metadata when putting an object.

Message Acknowledgements

There are multiple types of acknowledgements in JetStream:

  • Msg.Ack(): Acknowledges a message.
  • Msg.AckSync(timeout): Acknowledges a message and waits for a confirmation. When used with deduplication, this creates an exactly-once delivery guarantee (within the deduplication window). This may significantly impact the performance of the system.
  • Msg.Nak(): A negative acknowledgment indicating processing failed and the message should be resent later.
  • Msg.Term(): Never send this message again, regardless of configuration.
  • Msg.InProgress(): Indicates that the message is still being processed and resets the redelivery timer on the server. The message must still be acknowledged later, when processing is complete.

Note that an exactly-once delivery guarantee can be achieved by using a consumer with explicit ack mode attached to a stream set up with a deduplication window, and using AckSync to acknowledge messages. The guarantee is only valid for the duration of the deduplication window.
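One way to map the acknowledgement types above onto application logic is sketched below. The Outcome enum and the process callback are hypothetical names introduced for this example and are not part of NATS.Client; only the Msg methods listed above are used. The snippet is a helper to call from a JetStream message handler, so it has no entry point of its own.

```csharp
using System;
using NATS.Client;

// Hypothetical result of application-level processing; not part of NATS.Client.
enum Outcome { Done, RetryLater, Poison }

static class AckSketch
{
    // Processes one JetStream message and picks one of the acknowledgements
    // listed above. 'process' is a hypothetical application callback.
    public static void Handle(Msg msg, Func<byte[], Outcome> process)
    {
        // Tell the server that work has started, resetting its redelivery timer.
        msg.InProgress();

        switch (process(msg.Data))
        {
            case Outcome.Done:
                msg.Ack();  // msg.AckSync(timeoutMillis) would wait for confirmation instead
                break;
            case Outcome.RetryLater:
                msg.Nak();  // processing failed, ask for redelivery
                break;
            case Outcome.Poison:
                msg.Term(); // never deliver this message again
                break;
        }
    }
}
```

Calling InProgress() first is only worthwhile when processing may outlast the consumer's ack wait; for short handlers it can be dropped.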

Benchmarking the NATS .NET Client

Benchmarking the NATS .NET Client is simple: run the benchmark application with a default NATS server running. Tests can be customized; run benchmark -h for more details. To get the best out of your test, raise the priority of the benchmark application and the NATS server:

start /B /REALTIME nats-server.exe

And the benchmark:

start /B /REALTIME benchmark.exe

The benchmarks include:

  • PubOnly - publish only
  • PubSub - publish and subscribe
  • ReqReply - request/reply
  • Lat - latency.

Note that kb/s counts payload only, excluding the NATS message protocol. Latency is measured in microseconds.

Here is some sample output from a VM running on a MacBook Pro, simulating a cloud environment. Running the benchmarks in your own environment is highly recommended; the numbers below should reflect the low end of performance.

PubOnlyNo	  10000000	   4715384 msgs/s	       0 kb/s
PubOnly8b	  10000000	   4058182 msgs/s	   31704 kb/s
PubOnly32b	  10000000	   3044199 msgs/s	   95131 kb/s
PubOnly256b	  10000000	    408034 msgs/s	  102008 kb/s
PubOnly512b	  10000000	    203681 msgs/s	  101840 kb/s
PubOnly1k	   1000000	     94106 msgs/s	   94106 kb/s
PubOnly4k	    500000	     52653 msgs/s	  210612 kb/s
PubOnly8k	    100000	      8552 msgs/s	   68416 kb/s
PubSubNo	  10000000	   1101135 msgs/s	       0 kb/s
PubSub8b	  10000000	   1075814 msgs/s	    8404 kb/s
PubSub32b	  10000000	    990223 msgs/s	   30944 kb/s
PubSub256b	  10000000	    391208 msgs/s	   97802 kb/s
PubSub512b	    500000	    190811 msgs/s	   95405 kb/s
PubSub1k	    500000	     97392 msgs/s	   97392 kb/s
PubSub4k	    500000	     23714 msgs/s	   94856 kb/s
PubSub8k	    100000	     11870 msgs/s	   94960 kb/s
ReqReplNo	     20000	      3245 msgs/s	       0 kb/s
ReqRepl8b	     10000	      3237 msgs/s	      25 kb/s
ReqRepl32b	     10000	      3076 msgs/s	      96 kb/s
ReqRepl256b	      5000	      2446 msgs/s	     611 kb/s
ReqRepl512b	      5000	      2530 msgs/s	    1265 kb/s
ReqRepl1k	      5000	      2973 msgs/s	    2973 kb/s
ReqRepl4k	      5000	      1944 msgs/s	    7776 kb/s
ReqRepl8k	      5000	      1394 msgs/s	   11152 kb/s
LatNo (us)	500 msgs, 141.74 avg, 86.00 min, 600.40 max, 23.28 stddev
Lat8b (us)	500 msgs, 141.52 avg, 70.30 min, 307.00 max, 26.77 stddev
Lat32b (us)	500 msgs, 139.64 avg, 93.70 min, 304.20 max, 15.88 stddev
Lat256b (us)	500 msgs, 175.55 avg, 101.80 min, 323.90 max, 14.93 stddev
Lat512b (us)	500 msgs, 182.56 avg, 103.00 min, 1982.00 max, 81.50 stddev
Lat1k (us)	500 msgs, 193.15 avg, 86.20 min, 28705.20 max, 1277.13 stddev
Lat4k (us)	500 msgs, 291.09 avg, 99.70 min, 43679.10 max, 2047.67 stddev
Lat8k (us)	500 msgs, 363.56 avg, 131.50 min, 39428.50 max, 1990.81 stddev
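
The kb/s column in the sample output is consistent with msgs/s multiplied by the payload size in bytes, divided by 1024 and truncated to an integer. This relationship is inferred from the rows above rather than taken from the benchmark source, so treat the helper below (a name made up for this example) as a sanity check only.

```csharp
using System;

static class ThroughputCheck
{
    // Payload throughput implied by a message rate and a payload size.
    // Integer division truncates, which matches the sample rows.
    static long PayloadKbPerSec(long msgsPerSec, int payloadBytes)
    {
        return msgsPerSec * payloadBytes / 1024;
    }

    static void Main()
    {
        Console.WriteLine(PayloadKbPerSec(4058182, 8));    // PubOnly8b row:  31704
        Console.WriteLine(PayloadKbPerSec(3044199, 32));   // PubOnly32b row: 95131
        Console.WriteLine(PayloadKbPerSec(94106, 1024));   // PubOnly1k row:  94106
    }
}
```

This also explains why the "No" (empty payload) rows report 0 kb/s regardless of message rate.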

About the code and contributing

A note: The NATS C# .NET client was originally developed to support the .NET 4.0 code base for increased adoption, and to closely parallel the Go client (internally) for maintenance purposes. As a result, some of the nicer .NET APIs and features were intentionally left out. While this has certainly paid off, now that the NATS C# library has matured, the code will move toward a more idiomatic .NET coding style where it makes sense.

To that end, feel free to write contributions in a more idiomatic .NET style than what you see. PRs are always welcome!

TODO

  • Key Value
  • Ordered Consumer
  • Object Store
  • JetStream
  • Another performance pass - look at stream directly over socket, contention, fastpath optimizations, rw locks.
  • Rx API (unified over NATS Streaming?)
  • Allow configuration for performance tuning (buffer sizes), defaults based on platform.
  • Azure Service Bus Connector
  • Visual Studio Starter Kit
  • Expand Unit Tests to test internals (namely Parsing)
  • Travis CI (Used AppVeyor instead)
  • .NET Core compatibility, TLS required.
  • Convert unit tests to xunit
  • Comprehensive benchmarking
  • TLS
  • Encoding (Serialization/Deserialization)
  • Update delegates from traditional model to custom
  • NuGet package
  • Strong name the assembly

Any suggestions and/or contributions are welcome!

nats.net.v1's People

Contributors

adowork, alexanderinochkin, aqlasolutions, bruth, buybackoff, colinsullivan1, cstlaurent, danielwertheim, devklauss, gcolliso, harrisa1, jarema, jasper-d, jlumsden-mts, joibel, lanfeust69, laurensvergote, maksimenko-stanislav, matthiashanel, mtmk, scottf, sixlettervariables, skinnysackboy, sspates, stephenjannin, tbeets, tkeller-moxe, tobzip, watfordgnf, wennemyr

nats.net.v1's Issues

I need a publish method where I can pass in the byte count

I would like to be able to pass in the byte count that goes with my byte[] passed to the Publish method. In other words, I want to set the "num" value in Connection.publish with a parameter so that I don't have to make an additional array copy of my data after serialization.

Extremely slow when sending messages to multiple topics (300) for the first time

Hi,
I ran a test and the result is strange. Please take a look.

  1. I created 300 subscriptions, each subscribed to its own topic (300 topics).
  2. I published 1 message to each topic.
  3. I waited for all subscriptions to receive their first message.
    (This takes over 290,000 ms to deliver 300 messages to 300 topics, 1 message per topic.)
  4. I sent 10 messages to each topic.
  5. I waited for all subscriptions to receive their 10 messages.
    (This takes only about 7 ms to deliver 3,000 messages to 300 topics, 10 messages per topic.)

I'm using:
NATS: nats-server version 0.9.4
OS: Windows 10
CPU: Intel Xeon E3-1240
RAM: 8GB
Client: https://github.com/nats-io/csharp-nats
Here is my code:

    class Program
    {
        static void Main(string[] args)
        {
            var topics = 300;
            
            //init connection and subscribers

            var connFac = new ConnectionFactory();
            var subConn = connFac.CreateConnection();
            var pubConn = connFac.CreateConnection();

            var received = 0;
            var subscriptions = new List<IAsyncSubscription>();

            for (var topicId = 0; topicId < topics; topicId++)
            {
                var subscription = subConn.SubscribeAsync(topicId.ToString(),
                    (sender, x) => { Interlocked.Increment(ref received); });

                subscriptions.Add(subscription);
            }

            //1st run: 300 topics, 1 messages per topic

            var messagesPerTopic = 1;
            var noOfMessages = topics * messagesPerTopic;
            Console.WriteLine("Test with {0:n0} topics, {1:n0} messages per topic", topics, messagesPerTopic);

            var sw = Stopwatch.StartNew();
            for (var topicId = 0; topicId < topics; topicId++)
            {
                for (var i = 0; i < messagesPerTopic; i++)
                {
                    pubConn.Publish(topicId.ToString(), Encoding.UTF8.GetBytes("hello"));
                }
            }

            while (received < noOfMessages && sw.Elapsed.TotalMinutes < 5)
            {
                Thread.Sleep(1);
            }
            sw.Stop();

            Console.WriteLine("{0:n0} messages received in {1:n0}ms", received, sw.ElapsedMilliseconds);
            Console.WriteLine("{0:n0} msg/s", received * 1000 / sw.ElapsedMilliseconds);

            //2nd run: 300 topics, 10 messages per topic

            received = 0;
            messagesPerTopic = 10;
            noOfMessages = topics * messagesPerTopic;

            Console.WriteLine("Test with {0:n0} topics, {1:n0} messages per topic", topics, messagesPerTopic);

            sw = Stopwatch.StartNew();
            for (var topicId = 0; topicId < topics; topicId++)
            {
                for (var i = 0; i < messagesPerTopic; i++)
                {
                    pubConn.Publish(topicId.ToString(), Encoding.UTF8.GetBytes("hello"));
                }
            }

            while (received < noOfMessages && sw.Elapsed.TotalMinutes < 5)
            {
                Thread.Sleep(1);
            }
            sw.Stop();

            Console.WriteLine("{0:n0} messages received in {1:n0}ms", received, sw.ElapsedMilliseconds);
            Console.WriteLine("{0:n0} msg/s", received * 1000 / sw.ElapsedMilliseconds);

            foreach (var s in subscriptions)
            {
                s.Dispose();
            }

            pubConn.Dispose();
            subConn.Dispose();

        }
    }

Does/can Connection.Publish block?

The Publish method returns void. How long will it block?

Does it have its own local buffer? What happens when the buffer is full? How do I configure the buffer?

It appears to throw an exception if the connection is closed. But what happens while we're reconnecting? Do we go into a closed state and then into a reconnecting state? Is "closed" reached only by a manual close, or also after a connection fault?

Benchmarks

Exactly which code did you run to get several million messages per second, and what hardware did you use? Was it a VM on *nix? I recently experimented with plain .NET TCP sockets and could not reach such numbers even over the localhost loopback. I am reading through the code to understand whether buffering or something else yields these results. Also, as far as I understand, you do not use IOCP or SocketAsyncEventArgs?

Missing url in Disconnect/Closed events

I have this code for the connection callbacks:

Options opts = ConnectionFactory.GetDefaultOptions();
opts.ClosedEventHandler += ( s, e ) => {
    _logger.Error( "Connection to {0} was closed.", e.Conn.ConnectedUrl );
};
opts.DisconnectedEventHandler += ( s, e ) => {
    _logger.Error( "Disconnected from {0}.", e.Conn.ConnectedUrl );
};

It seems that the e.Conn.ConnectedUrl is null on both events, which makes it hard to know from which server one lost the connection.

Also, I miss a "ConnectedEventHandler" so that I can know when the first connection actually succeeded and to which server (ReconnectedEventHandler doesn't fire on initial connections).

Rx support

Daniel Wertheim has implemented very nice Rx support in his .NET client: https://github.com/danielwertheim/mynatsclient
I really like that approach, as it is fast, elegant and modern. On the other hand, I prefer to use officially supported libraries, as they tend to be more stable and better supported over time.
Could you please consider including Rx support in the core of the official client library?

How can I use TLS with 'bidirectional authentication'?

Our company now plans to use nats-server with 'bidirectional authentication' (mutual TLS). I referred to the example code:

Options opts = ConnectionFactory.GetDefaultOptions();
opts.Secure = true;

// .NET requires the private key and cert in the
//  same file. 'client.pfx' is generated from:
//
// openssl pkcs12 -export -out client.pfx
//    -inkey client-key.pem -in client-cert.pem
X509Certificate2 cert = new X509Certificate2("client.pfx", "password");

opts.AddCertificate(cert);

IConnection c = new ConnectionFactory().CreateConnection(opts);

But it does not work. I implemented the same thing in Java following the Java TLS example, and it works fine.

So, could you please give me some tips:

  1. How can I set the client certificate file when I use 'bidirectional authentication'?
  2. Do I need to write more code to implement it?

Looking forward to your response.

Thank you!
Windy

Add prerelease packages on NuGet

Pre-Release NuGet Packages should be provided for users to take advantage of new features without needing to compile themselves, and is very convenient for .NET core users. This will be a build of new features on the master branch that pass all tests.

Support large numbers of asynchronous subscribers in a single process

Supporting large numbers (thousands) of asynchronous subscribers in an application with one task per subscriber to handle message events requires more resources than are typically available. An option should be provided that sets up a NATS connection to process all subscriber message events in a single task or thread, offering a choice of scalability over performance.

This will address #113.
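
The trade-off described above can be illustrated without the client at all. The following is a conceptual sketch only, not the library's implementation: it uses plain .NET types, and every name in it is made up for this example. A thousand handlers share one queue and one dispatch task instead of each owning a task.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

class SharedDispatcherSketch
{
    static void Main()
    {
        // One queue and one task serve every subscriber.
        var queue = new BlockingCollection<KeyValuePair<int, string>>();
        var handlers = new Dictionary<int, Action<string>>();
        var counts = new int[1000];

        // Register 1000 "subscribers"; each handler just counts its messages.
        for (int id = 0; id < counts.Length; id++)
        {
            int sid = id; // per-iteration copy for the closure
            handlers[sid] = payload => counts[sid]++;
        }

        // The single dispatch task invokes handlers one at a time, in arrival order.
        Task dispatcher = Task.Run(() =>
        {
            foreach (var item in queue.GetConsumingEnumerable())
            {
                handlers[item.Key](item.Value);
            }
        });

        // Simulate 5000 incoming messages spread across the subscribers.
        for (int i = 0; i < 5000; i++)
        {
            queue.Add(new KeyValuePair<int, string>(i % counts.Length, "hello"));
        }

        queue.CompleteAdding();
        dispatcher.Wait();

        int total = 0;
        foreach (int n in counts) total += n;
        Console.WriteLine(total); // prints 5000
    }
}
```

Because a single task runs every handler, one slow handler delays all the others; that is the performance cost accepted in exchange for using far fewer tasks.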

very very serious performance problem

Hi All,

This is a very simple program, but the subscriber callback is very slow: it alternates between running and suspending, as you can see on the subscriber console.

I tested on Windows 8 64-bit with gnatsd 0.9.4 and csnats 0.6.0.

I also tested the Go client, and it seems to do better than the C# client.

With only 1 or 2 subscribed subjects it is fine, but with more than 20 or 30 subscriptions the performance is terrible.

Can anyone tell me what is wrong, or how I can fix it?

I don't think the subject count should have a significant impact on performance, and I only use 30 to 50, which should be easy for NATS.

It is not about data size either: I see the same behavior with 10 bytes and with 500 bytes.
In Task Manager the CPU usage is very low, but it is still very slow :(

Publish

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NATS.Client;
namespace Publish
{
    class Program
    {
        static void Main(string[] args)
        {
            Options opts = ConnectionFactory.GetDefaultOptions();
            opts.Url = "nats://127.0.0.1:4222";
            string data = "01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789";
            using (IConnection c = new ConnectionFactory().CreateConnection(opts))
            {
                for(int i=0;i<100000; ++i)
                {
                    c.Publish("ABCDEFGHIJK_" + (i % 200).ToString(), Encoding.Default.GetBytes(data + i.ToString()));
                }
                c.Flush();
            }
            Console.WriteLine("Finish");
            Console.Read();
        }
    }
}

Subscriber

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NATS.Client;
namespace Subscribe
{
    class Program
    {
        static void Main(string[] args)
        {
            Options opts = ConnectionFactory.GetDefaultOptions();
            opts.Url = "nats://127.0.0.1:4222";

            int count = 0;
            object obj = new object();

            IConnection c = new ConnectionFactory().CreateConnection(opts);
            EventHandler<MsgHandlerEventArgs> msgHandler = (sender, args2) =>
            {
                lock(obj)
                {
                    count++;
                    Console.WriteLine(count);
                    if (count == 100000)
                    {
                        Console.WriteLine("Finish");
                    }
                }
            };


            for (int i = 0; i < 30; ++i)
            {
                string subject = "ABCDEFGHIJK_" + (i).ToString();
                c.SubscribeAsync(subject, msgHandler);
            }
            c.Flush();
            Console.Read();
        }
    }
}

Requestor sample does not work

Hi,

I'm using server 0.9.2 in Docker container on different machine (Ubuntu 14.04) to client (Windows 10). Subscribe and Publish work as designed. However, c.Request(subject, payload, 1000); times out. I modified the sample to add a timeout as the original c.Request(subject, payload); just hung. Url = "nats://i-am-hiding-server-ip-address:4222":

NATS.Client.NATSTimeoutException: Timeout occurred.
   at NATS.Client.Channel`1.get(Int32 timeout) in F:\csnats-master\csnats-master\NATS\Channel.cs:line 70
   at NATS.Client.SyncSubscription.NextMessage(Int32 timeout) in F:\csnats-master\csnats-master\NATS\SyncSub.cs:line 61
   at NATS.Client.Connection.request(String subject, Byte[] data, Int32 timeout) in F:\csnats-master\csnats-master\NATS\Conn.cs:line 1901
   at NATS.Client.Connection.Request(String subject, Byte[] data, Int32 timeout) in F:\csnats-master\csnats-master\NATS\Conn.cs:line 1917
   at NATSExamples.Requestor.Run(String[] args) in F:\csnats-master\csnats-master\examples\Requestor\Requestor.cs:line 40

Any ideas?

Warmest regards,

Garrard.

.NET 4.5 Code Compatibility

Previously, .NET 4.0 code compatibility was maintained. As .NET 4.0 is no longer supported, move to use features found in .NET 4.5.

Improve performance of the Request API

Look into enhancing performance of the Connection.Request API.

  • Improve latency in the benchmark through usage of a synchronous subscription when replying.
  • Replace calls to the public API with directly writing the protocol commands to the write buffer. This reduces contention in a number of ways, primarily through eliminating unnecessary kicks to the flusher thread.
  • Evaluate a "soft" flush - flushing the write buffer without a ping.

Based on #73.

Request Problem

Hi, I ran the Requestor and Replier examples.
I found that an exception is thrown when the request returns.
In Conn.cs:

        internal virtual Msg request(string subject, byte[] data, int timeout)
        {
            Msg    m     = null;
            string inbox = NewInbox();

            SyncSubscription s = subscribeSync(inbox, null);
            s.AutoUnsubscribe(1);

            publish(subject, inbox, data);
            m = s.NextMessage(timeout);
            try
            {
                // the auto unsubscribe should handle this.
                s.Unsubscribe();
            }
            catch (Exception) {  /* NOOP */ }

            return m;
        }

Then, inside s.NextMessage(timeout):

            if (msg != null)
            {
                long d = tallyDeliveredMessage(msg);
                if (d == max)
                {
                    // Remove subscription if we have reached max.
                    localConn.removeSub(this);
                }
                if (localMax > 0 && d > localMax)
                {
                    throw new NATSMaxMessagesException();
                }
            }

localConn.removeSub(this); is called, which does the following:

       internal virtual void removeSub(Subscription s)
        {
            Subscription o;

            subs.TryRemove(s.sid, out o);
            if (s.mch != null)
            {
                s.mch.close();
                s.mch = null;
            }

            s.conn = null;
            s.closed = true;
        }

s.conn = null; is set here, but execution then returns to Conn.cs and runs

                // the auto unsubscribe should handle this.
                s.Unsubscribe();

which calls

        public virtual void Unsubscribe()
        {
            Connection c;
            lock (mu)
            {
                c = this.conn;
            }

            if (c == null)
                throw new NATSBadSubscriptionException();

            c.unsubscribe(this, 0);
        }

The exception is thrown here:

 if (c == null)
    throw new NATSBadSubscriptionException();

I have two questions:

  1. When a request is executed with a max count of 1, the subscription is removed inside NextMessage(), so is c always null here and the exception always thrown?
  2. Throwing NATSBadSubscriptionException has a significant impact on performance. Is this a bug? Could the exception be replaced with an early return, such as if (c == null) return;?

Or am I doing something wrong?

Thanks All
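
The sequence described in this report can be reproduced with a small stand-in model. FakeSubscription below is a hypothetical class written for this sketch; it is not the NATS.Client type and only mirrors the state changes quoted above (removal on reaching max, then a null check in Unsubscribe). TryUnsubscribe shows the early-return alternative the reporter asks about.

```csharp
using System;

// Hypothetical stand-in for the subscription state; not the NATS.Client type.
class FakeSubscription
{
    private object conn = new object();
    private readonly int max;
    private int delivered;

    public FakeSubscription(int max) { this.max = max; }

    // Mirrors NextMessage(): reaching max removes the subscription,
    // which clears the connection reference (as removeSub() does).
    public void NextMessage()
    {
        delivered++;
        if (delivered == max) conn = null;
    }

    // Mirrors Unsubscribe(): throws when the connection reference is gone.
    public void Unsubscribe()
    {
        if (conn == null) throw new InvalidOperationException("bad subscription");
        conn = null;
    }

    // Early-return alternative: report that nothing was left to do.
    public bool TryUnsubscribe()
    {
        if (conn == null) return false;
        conn = null;
        return true;
    }
}

class Program
{
    static void Main()
    {
        var s = new FakeSubscription(1); // same as AutoUnsubscribe(1)
        s.NextMessage();                 // reaches max, subscription is removed

        try
        {
            s.Unsubscribe();
            Console.WriteLine("unsubscribed");
        }
        catch (InvalidOperationException)
        {
            Console.WriteLine("threw"); // this branch runs on every request
        }

        Console.WriteLine(s.TryUnsubscribe()); // prints False, no exception raised
    }
}
```

In the quoted request() code the exception is caught and ignored, so the behavior is correct; the model only illustrates that an exception is raised and swallowed on every successful request.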

Update .NET core dependencies

.NET Core moves fast. Update dependencies for the client, tests, and samples to reference the latest stable libraries/frameworks.

Update: Best to wait and do this with the .NET Core 2.0 release, barring any security issues or major bugs.

Benchmark latency test needs to be asynchronous

An asynchronous subscriber needs to handle the return message from the server. The test erroneously assumes the flush will return before the published message arrives back to the subscriber.

Request threading issues?

Hi Colin,
I'm doing some fairly basic tests at the moment to explore throughput. I've a system that I want to Request about 20 sources and aggregate the results.

If I just use publish (using the same approach as below), then I can comfortably publish 10,000 messages in <300ms on my PC. So I put together the following snippet with Request, and I was expecting something similar, but am seeing NATSTimeoutException instead.

                Msg msg = null;
                ConnectionFactory cf = new ConnectionFactory();
                var sw = new Stopwatch();
                sw.Start();
                using (var c = cf.CreateConnection())
                {
                    var tasks = Enumerable.Range(0, 10000).Select(x =>
                         Task.Factory.StartNew(() =>
                         {
                           msg = c.Request("AppointmentSearch", new AppointmentSearch().ToMsg(), 10 * 1000);
                         })
                    );
                    Task.WaitAll(tasks.ToArray(), TimeSpan.FromSeconds(10));
                }
                return sw.ElapsedMilliseconds;

Looking at gnatsd -V I can see a throughput of about 1 message per second. This doesn't seem right to me? Is there anything I can look at?

AsyncSub Messages lost during reconnect

I'm doing some advanced unit tests.
I have a 3-node cluster.
I create 2 connections:

  • 1 for sending messages on node 1 on subject foo
  • 1 for receiving messages on node 2 on subject foo

I start sending messages every 50 ms.
I stop node 2 (the one that is used by the AsyncSub).
The AsyncSubscription loses all the messages that were sent before it reconnects to another node.

Is this the expected behavior?

Here is a unit test that can be put in UnitTestReconnect.cs:

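[Fact]
public void TestIntensivePublishShouldNotFail()
{
        List<byte[]> msgs = new List<byte[]>();
        List<bool> sentMsgs = new List<bool>();   // element type restored from usage: sentMsgs.Add(true)
        List<int> receivedMsgs = new List<int>(); // element type restored from usage: receivedMsgs.Add(0)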

        ManualResetEvent recvReconnected = new ManualResetEvent(false);

        // one connection for receiving
        Options recvOptions = ConnectionFactory.GetDefaultOptions();
        recvOptions.Servers = new [] { "nats://localhost:1222", "nats://localhost:1223", "nats://localhost:1224" }; 
        recvOptions.NoRandomize = true;
        recvOptions.AsyncErrorEventHandler += (sender, args) => { Debug.WriteLine("Receive Error " + args.Error + " Dropped=" + args.Subscription.Dropped); };
        recvOptions.ClosedEventHandler += (sender, args) => { Debug.WriteLine("Receive Closed "); };
        recvOptions.DisconnectedEventHandler += (sender, args) => { Debug.WriteLine("Receive Disconnected "); };
        recvOptions.ReconnectedEventHandler += (sender, args) => { recvReconnected.Set();Debug.WriteLine("Receive Reconnected "); };

        ManualResetEvent sendReconnected = new ManualResetEvent(false);

        // one connection for sending (better for sending large amounts of data)
        Options sendOptions = ConnectionFactory.GetDefaultOptions();
        sendOptions.Servers = new [] { "nats://localhost:1223", "nats://localhost:1222", "nats://localhost:1224" }; 
        sendOptions.NoRandomize = true;
        sendOptions.AsyncErrorEventHandler += (sender, args) => { Debug.WriteLine("Send Error " + args.Error); };
        sendOptions.ClosedEventHandler += (sender, args) => { Debug.WriteLine("Send Closed "); };
        sendOptions.DisconnectedEventHandler += (sender, args) => { Debug.WriteLine("Send Disconnected "); };
        sendOptions.ReconnectedEventHandler += (sender, args) => { sendReconnected.Set();Debug.WriteLine("Send Reconnected "); };


        // create server
        NATSServer ns = utils.CreateServerWithArgs("-p 1222 -cluster nats://localhost:1422 -routes nats://localhost:1423,nats://localhost:1424 -D");
        NATSServer ns2 = utils.CreateServerWithArgs("-p 1223 -cluster nats://localhost:1423 -routes nats://localhost:1422,nats://localhost:1424 -D");
        NATSServer ns3 = utils.CreateServerWithArgs("-p 1224 -cluster nats://localhost:1424 -routes nats://localhost:1422,nats://localhost:1423 -D");
        Thread.Sleep(1000);

        using (IConnection sendConn = new ConnectionFactory().CreateConnection(sendOptions))
        using (IConnection rcvConn = new ConnectionFactory().CreateConnection(recvOptions))
        {
            int receivedMessages = 0;
            int sentMessages = 0;
            int sentFailure = 0;

            IAsyncSubscription s = rcvConn.SubscribeAsync("foo");
            s.MessageHandler += (sender, args) =>
            {
                Interlocked.Increment(ref receivedMessages);
                ThreadPool.QueueUserWorkItem(o =>
                {
                    var str = Encoding.UTF8.GetString(args.Message.Data);
                    int j = Int32.Parse(str.Split(' ')[0]);
                    lock(receivedMsgs)
                        receivedMsgs[j]++;
                });
            };

            s.Start();
            rcvConn.Flush();
            sendConn.Flush();


            bool serverRestarted = false;
            ManualResetEvent sendFinished = new ManualResetEvent(false);
            ThreadPool.QueueUserWorkItem(o =>
            {
                while (!serverRestarted)
                {
                    try
                    {
                        Thread.Sleep(50);
                        byte[] msg = Encoding.UTF8.GetBytes(String.Format("{0} additionnalPayload", sentMessages));
                        msgs.Add(msg);
                        sentMsgs.Add(true);
                        lock(receivedMsgs)
                             receivedMsgs.Add(0);
                        sendConn.Publish("foo", msg);
                        sentMessages++;
                    }
                    catch (NATSConnectionClosedException)
                    {
                        // we are dead, we will never reconnect. Shall we retry ?
                        //sendConn.Dispose();
                        //sendConn = new ConnectionFactory().CreateConnection(sendOptions);
                        throw;

                    }
                    catch (Exception)
                    {
                        sentFailure++;
                    }
                }

                // after server is restarted, send more messages to check reconnect is ok
                for (int i = 0; i < 20; i++)
                {
                    try
                    {
                        Thread.Sleep(50);
                        byte[] msg = Encoding.UTF8.GetBytes(String.Format("{0} additionnalPayload", sentMessages));
                        msgs.Add(msg);
                        sentMsgs.Add(true);
                        lock (receivedMsgs)
                            receivedMsgs.Add(0);
                        sendConn.Publish("foo", msg);
                        sentMessages++;
                    }
                    catch (NATSConnectionClosedException)
                    {
                        // we are dead, we will never reconnect. Shall we retry ?
                        //sendConn.Dispose();
                        //sendConn = new ConnectionFactory().CreateConnection(sendOptions);
                        throw;
                    }
                    catch (Exception)
                    {
                        sentFailure++;
                    }
                }
                sendFinished.Set();
            });

            Thread.Sleep(500);
            // kill the server !
            int indexBeforeRestart = sentMessages;
            ns.Shutdown();

            // Wait for receiver to reconnect...
            //Assert.Equal(true, sendReconnected.WaitOne(10000));
            Assert.Equal(true, recvReconnected.WaitOne(10000));
            int indexAfterRestart = sentMessages;
            // sender will be in the second part
            serverRestarted = true;

            // wait for sending finished
            Assert.True(sendFinished.WaitOne(Int32.MaxValue));

            rcvConn.Flush();
            sendConn.Flush();

            // maybe give time to receive messages
            //Thread.Sleep(1000);

            // now check failures
            Assert.Equal(0, sentFailure);
            Assert.Equal(0, s.QueuedMessageCount);
            Assert.Equal(0, s.PendingMessages);
            Assert.Equal(0, s.Dropped);
            Assert.Equal(s.Delivered, receivedMessages);
            Assert.Equal(sendConn.Stats.OutMsgs, sentMessages); // are we counting correctly?
            Assert.Equal(rcvConn.Stats.InMsgs, receivedMessages); // are we counting correctly?
            Assert.Equal(sentMessages, receivedMessages);

            Assert.Equal(rcvConn.Stats.Reconnects, 1);
            //Assert.Equal(sendConn.Stats.Reconnects, 1);
        }
        ns.Shutdown();
        ns2.Shutdown();
        ns3.Shutdown();
    }

Travis CI for the .NET client

It would be nice to have continuous integration for the .NET client. Currently, Travis CI supports Mono and a few test frameworks in beta. The NATS C# test suite uses the Visual Studio Test Suite to avoid external dependencies.

Investigate:

  • Level of support (now and future) in Travis CI
  • Other user experiences of .NET in Travis CI
  • NUnit vs xunit, including level of effort to convert

Potential memory leak when closing connection quickly after opening it

I noticed that a loop which opens a new connection and then rapidly closes it (say, to transmit a very small amount of data intermittently when a persistent connection is not desired) does not release resources as expected. If the loop runs for long enough, an OutOfMemoryException is thrown. I found that a brief pause before closing the connection is sufficient to avoid the problem.

I have tested this on .NET 4.5 and .NET 4.6.1, using the NATS.Client 0.5.0 NuGet package, in both Debug and Release configurations.

To reproduce:
Step 1:
Create a .NET 4.6.1 C# Console application in VS. (I used VS2015 Community Edition.)

Step 2:
Install the NATS.Client package from NuGet. I ran install-package NATS.Client from the Package Manager Console.

Step 3:
Put the following code (C# 6.0) in the Program.cs file:

#define MEMORYLEAKSAREAWESOME

using static System.Console;
using static System.Threading.Thread;
using static System.Diagnostics.Stopwatch;
using static System.Diagnostics.Process;
using static System.Globalization.CultureInfo;

using NATS.Client;

namespace NatsLeakExample
{
    class Program
    {
        static void Main(string[] args)
        {
            var proc = GetCurrentProcess();
            var sw = StartNew();
            bool continueTest = true;
            ConnectionFactory cf = new ConnectionFactory();
            while (continueTest)
            {
                using (IConnection conn = cf.CreateConnection())
                {
#if !MEMORYLEAKSAREAWESOME
                    Sleep(100); // brief pause in the connection lifetime dodges the problem
#endif
                }
#if MEMORYLEAKSAREAWESOME
                Sleep(100); // this will not affect the connection, and a memory leak will result
#endif
                if (sw.ElapsedMilliseconds > 300000)
                {
                    continueTest = false;
                }
            }
            sw.Stop();
            long privateBytesUsed = proc.PrivateMemorySize64;   // I chose PrivateMemory as it reflected the memory usage graph in VS debugging
            string formattedBytesUsed = privateBytesUsed.ToString("#,##0,,M", InvariantCulture);

#if MEMORYLEAKSAREAWESOME
            string testStatus = "set";
#else
            string testStatus = "not set";
#endif

            WriteLine($"Memory used after 5 minutes when {testStatus} to enable the leak : {formattedBytesUsed}");
        }
    }
}

When MEMORYLEAKSAREAWESOME is defined, the memory leak will occur. When it is not defined, the memory leak will not occur.

Step 4:
Run the program.

Release output when MEMORYLEAKSAREAWESOME is defined:

Memory used after 5 minutes when set to enable the leak : 347M
Press any key to continue . . .

Release output when MEMORYLEAKSAREAWESOME is not defined:

Memory used after 5 minutes when not set to enable the leak : 16M
Press any key to continue . . .

This is just what I've observed and built a proof of concept for. It may also be related to Issue #91.

Slow consumer false positive

I have been tracking some issues in this section of the code for the past 24 hours. We have around 1,000 clients connected to NATS, and this issue pops up on one of them about every 3-4 hours.

The symptom is that the client receives a slow consumer exception. This initially caused some confusion, because the only code in the subscriber callback is a function that adds the message to a Queue for processing by a different thread.

I have downloaded the source and added some extra debug output, and have found 2 situations where the check was triggered incorrectly.

Below is my test code:

// Check for a Slow Consumer
if (pMsgs > pMsgsLimit || pBytes > pBytesLimit)
{
    Console.WriteLine($"pMsgs: {pMsgs} pMsgsLimit: {pMsgsLimit}");
    Console.WriteLine($"pBytes: {pBytes} pBytesLimit: {pBytesLimit}");
    Console.ReadKey();
    // slow consumer
    handleSlowConsumer(msg);
    return false;
}

On one occasion this block was entered with the following values logged:

pMsgs:3
pMsgsLimit:65536
pBytes:4294967170
pBytesLimit:67108864

It is clear why the check fired with these values, but the values are an error in themselves. There is no way the client received 3 messages totalling about 4.29 GB. To verify this I looked at the memory the process was using, and it was only 55 MB. So I need to understand where this crazy pBytes value came from.

On another occasion I got the following logged:

pMsgs:2
pMsgsLimit:65536
pBytes:-126
pBytesLimit:67108864

The first issue is that I can't see how it made it past the if statement with those figures. Secondly, the pBytes figure is negative. I have seen a few instances where this is triggered when pBytes is negative.

Lastly, my understanding is that the client is supposed to disconnect and reconnect if this happens. I have checked the code and can see no logic to do this, and from the results in my app, it is not working.

Right now my only guess is that this is a threading issue, but I am pretty much at a loss here.

Edit:

Just had another one with following logged:

pMsgs:4294967295
pMsgsLimit:65536
pBytes:0
pBytesLimit:67108864

Again, the stats are just crazy. There is no way it received that many messages, and even if it did, the check would have fired at 65537, not that far out.

Any ideas?
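
One observation on the logged numbers, offered as a possible reading rather than a confirmed diagnosis: 4294967170 is exactly 2^32 - 126, and 4294967295 is exactly 2^32 - 1. That is what -126 and -1 look like when a 32-bit counter that has dropped below zero is viewed as unsigned. The minimal sketch below only demonstrates that arithmetic. The class and method names are hypothetical and it does not use the client's actual fields or types.

```csharp
using System;

static class CounterWrapSketch
{
    // Reinterprets a signed 32-bit value as unsigned, which is how a counter
    // that has dipped below zero appears when stored or printed as a uint.
    public static uint AsUnsigned(int value) => unchecked((uint)value);

    // Hypothetical stand-in for the limit comparison quoted in the report.
    public static bool ExceedsLimit(long pending, long limit) => pending > limit;

    static void Main()
    {
        const long bytesLimit = 67108864;

        Console.WriteLine(AsUnsigned(-126)); // prints 4294967170
        Console.WriteLine(AsUnsigned(-1));   // prints 4294967295

        // A negative signed value does not trip the check...
        Console.WriteLine(ExceedsLimit(-126, bytesLimit));             // prints False
        // ...but the same bits viewed as unsigned do.
        Console.WriteLine(ExceedsLimit(AsUnsigned(-126), bytesLimit)); // prints True
    }
}
```

If this reading is right, the pending counters are sometimes decremented more than they were incremented, which would fit the threading guess above. The -126 sample would then be the same kind of underflow seen through a signed type.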

Add ManyRequests feature

From official documentation:

NATS supports two flavors of request reply messaging: point-to-point or one-to-many. Point-to-point involves the fastest or first to respond. In a one-to-many exchange, you set a limit on the number of responses the requestor may receive.

Your implementation does not contain a method for the one-to-many scenario.

Implementation example

public IEnumerable<Msg> RequestMany(string subject, byte[] data, int timeout)
{
    if (timeout <= 0)
    {
        throw new ArgumentException("Timeout must be greater than 0.", "timeout");
    }
    string inbox = InboxNameGen.NewInbox();
    SyncSubscription s = subscribeSync(inbox, null);
    publish(subject, inbox, data);
    Flush();
    var set = new List<Msg>();
    Msg m = null;
    do
    {
        try
        {
            m = s.NextMessage(timeout);
            set.Add(m);
        }
        catch (Exception)
        {
            // NextMessage throws when the timeout elapses (or on any other error); stop collecting replies.
            break;
        }
    } while (m != null);
    s.unsubscribe(false);
    return set;
}

.NET Core Client

I wanted to evaluate NATS for a .NET Core project. Unfortunately, the NuGet package only targets net45. Would it be possible for the next version to also include a build for .NET Core?

Optionally sign the assembly

Users may want to sign the NATS client assembly themselves, so signing should be an optional step.

NOTE: It should be done automatically when generating the official NuGet package.

Subscription EventHandler Async/Await Support

Are there any plans to support async / await within an EventHandler? Is this what #59 is referring to?

For example:

IAsyncSubscription s = c.SubscribeAsync("foo");
s.MessageHandler += async (sender, args) =>
{
    await SomeAsyncMethod(args.Message);
};
s.Start();
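
As general C# background for this question, an `async` lambda attached to an `EventHandler<T>` compiles to an `async void` method, so whatever raises the event gets control back at the handler's first `await` and does not wait for the rest. The sketch below shows this with stand-in types. `FakeSubscription` and `FakeMsgEventArgs` are hypothetical and are not NATS.Client types, and whether the client handles this case any differently is not confirmed here.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-ins; these are NOT NATS.Client types.
class FakeMsgEventArgs : EventArgs
{
    public string Message;
}

class FakeSubscription
{
    public event EventHandler<FakeMsgEventArgs> MessageHandler;

    // Raises the event synchronously, as a typical event source would.
    public void Deliver(string message)
    {
        MessageHandler?.Invoke(this, new FakeMsgEventArgs { Message = message });
    }
}

static class AsyncHandlerSketch
{
    // Returns whether the async handler had completed by the time Deliver returned.
    public static bool HandlerFinishedWhenDeliverReturned()
    {
        var sub = new FakeSubscription();
        var gate = new TaskCompletionSource<bool>();
        var done = new ManualResetEventSlim(false);
        bool finished = false;

        sub.MessageHandler += async (sender, args) =>
        {
            await gate.Task;      // stands in for SomeAsyncMethod(args.Message)
            finished = true;
            done.Set();
        };

        sub.Deliver("hello");               // returns at the handler's first await
        bool finishedAtReturn = finished;   // still false: the gate is not open yet

        gate.SetResult(true);               // let the handler continue
        done.Wait();                        // wait for the handler to complete
        return finishedAtReturn;
    }

    static void Main()
    {
        Console.WriteLine(HandlerFinishedWhenDeliverReturned()); // prints False
    }
}
```

One consequence is that exceptions thrown after the first `await` in such a handler are not seen by a try/catch around the code that raised the event, so an async handler should catch its own exceptions.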

Timeout not working as expected

I'm doing some testing on failure scenarios, and am testing what happens if the NATS server is stopped while my process is running.

I'm finding that the timeout periods seem to be ignored, assuming I've configured them correctly.

Here is my repro code.

using NATS.Client;
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace TestNats
{
    class Program
    {
        static void Main(string[] args)
        {
            Task.Run(() => Run());
            Console.ReadLine();
            
        }

        static async Task Run()
        {
            Console.WriteLine("Hello World!");
            var cf = new ConnectionFactory();
            var opts = ConnectionFactory.GetDefaultOptions();
            opts.Timeout = 1000;
            opts.AllowReconnect = false;
            var conn = cf.CreateConnection(opts);
            conn.SubscribeAsync("test", (p, a) => conn.Publish(a.Message.Reply, new byte[0]));
            await conn.RequestAsync("test", new byte[0], 1000);

            Console.WriteLine("Now stop nats server, you have 5 seconds");
            Thread.Sleep(5000);
            
            var s = new Stopwatch();
            s.Start();
            try
            {
                await conn.RequestAsync("test", new byte[0], 1000);
                s.Stop();
                Console.WriteLine(s.ElapsedMilliseconds / 1000 + "s");
            }
            catch (Exception ex)
            {
                s.Stop();
                Console.WriteLine(s.ElapsedMilliseconds / 1000 + "s");
                Console.WriteLine(ex.Message);
            }
        }
    }
}

Output:

Hello World!
Now stop nats server, you have 5 seconds
60s
Timeout occurred.

Publish Failure during reconnect

I'm doing some advanced unit tests. The setup is:

  • I have a 3-node cluster.
  • I create 2 connections: one for sending messages to node 1 on subject foo, and one for receiving messages from node 2 on subject foo.
  • I start sending messages every 20 ms.
  • I stop node 1 (the one the sending connection is connected to).

Publishing then fails with 2 different types of errors:

  • Socket, IO, or NotSupportedException: the underlying stream is not in a usable state. After a while the connection reconnects to another node, and I can retry sending the message.
  • NATSConnectionClosedException: this one is dangerous, because the connection never reconnects. I have to create a new one.

Is this the expected behavior?
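
For the first failure type, the retry described above can be sketched as a small helper. This is a minimal sketch under assumptions, not the client's behavior. `PublishWithRetry`, its parameters, and the flaky publish delegate are all hypothetical, and the caller decides which exceptions count as transient. It uses no NATS.Client calls, so it runs without a server.

```csharp
using System;
using System.IO;
using System.Threading;

static class PublishRetrySketch
{
    // Runs 'publish' until it succeeds or 'maxAttempts' is reached.
    // Only exceptions accepted by 'isTransient' are retried; anything else,
    // and the failure on the last attempt, propagates to the caller.
    // Returns the number of attempts that were needed.
    public static int PublishWithRetry(Action publish, Func<Exception, bool> isTransient,
                                       int maxAttempts, int delayMs)
    {
        for (int attempt = 1; ; attempt++)
        {
            try
            {
                publish();
                return attempt;
            }
            catch (Exception ex) when (attempt < maxAttempts && isTransient(ex))
            {
                Thread.Sleep(delayMs); // give the connection time to reconnect
            }
        }
    }

    static void Main()
    {
        int calls = 0;
        // Hypothetical publish that fails twice with an IOException, then succeeds.
        Action flakyPublish = () =>
        {
            if (++calls <= 2) throw new IOException("stream not usable");
        };

        int attempts = PublishWithRetry(flakyPublish, ex => ex is IOException, 5, 10);
        Console.WriteLine(attempts); // prints 3
    }
}
```

With the real client, the `isTransient` predicate would presumably return false for NATSConnectionClosedException, since the report says that connection never recovers and a new one has to be created.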
