Coder Social home page Coder Social logo

nats.net.v1's Issues

Improve performance of the Request API

Look into enhancing performance of the Connection.Request API.

  • Improve latency in the benchmark through usage of a synchronous subscription when replying.
  • Replace calls to the public API with directly writing the protocol commands to the write buffer. This reduces contention in a number of ways, primarily through eliminating unnecessary kicks to the flusher thread.
  • Evaluate a "soft" flush - flushing the write buffer without a ping.

Based on #73.

Rx support

Daniel Wertheim has implemented a very nice Rx support in his .Net client. https://github.com/danielwertheim/mynatsclient
I really like that approach as it is fast, elegant and modern. On the other hand I prefer to use officially supported libraries as they tend over time to be more stable and supported.
Can you please consider to include the Rx support in the core of the official client lib?

Does/can Connection.Publish block?

The Publish method returns void. How long will it block?

It has its own local buffer? What happens when the buffer is full? How do I configure the buffer?

It appears to throw an exception if the connection is closed? But what about if we're reconnecting? Do we go into a closed state and then into a reconnecting state? Is "closed" a manual close only or does it go into that state with a connection fault?

Timeout not working as expected

I'm doing some testing on failure scenarios, and am testing what happens if the NATS server is stopped while my process is running.

I'm finding the timeout periods seem to be getting ignored, assuming I've configured it correctly.

Here is my reproduce code.

using NATS.Client;
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace TestNats
{
    class Program
    {
        static void Main(string[] args)
        {
            Task.Run(() => Run());
            Console.ReadLine();
            
        }

        static async Task Run()
        {
            Console.WriteLine("Hello World!");
            var cf = new ConnectionFactory();
            var opts = ConnectionFactory.GetDefaultOptions();
            opts.Timeout = 1000;
            opts.AllowReconnect = false;
            var conn = cf.CreateConnection(opts);
            conn.SubscribeAsync("test", (p, a) => conn.Publish(a.Message.Reply, new byte[0]));
            await conn.RequestAsync("test", new byte[0], 1000);

            Console.WriteLine("Now stop nats server, you have 5 seconds");
            Thread.Sleep(5000);
            
            var s = new Stopwatch();
            s.Start();
            try
            {
                await conn.RequestAsync("test", new byte[0], 1000);
                s.Stop();
                Console.WriteLine(s.ElapsedMilliseconds / 1000 + "s");
            }
            catch (Exception ex)
            {
                s.Stop();
                Console.WriteLine(s.ElapsedMilliseconds / 1000 + "s");
                Console.WriteLine(ex.Message);
            }
        }
    }
}

Output:

Hello World!
Now stop nats server, you have 5 seconds
60s
Timeout occurred.

Add ManyRequests feature

From official documentation:

NATS supports two flavors of request reply messaging: point-to-point or one-to-many. Point-to-point involves the fastest or first to respond. In a one-to-many exchange, you set a limit on the number of responses the requestor may receive.

Your implementation does not contain a method for one-to-many scenario.

Implementation example

public IEnumerable<Msg> RequestMany(string subject, byte[] data, int timeout)
{
    if (timeout <= 0)
    {
        throw new ArgumentException("Timeout must be greater that 0.", "timeout");
    }
    string inbox = InboxNameGen.NewInbox();
    SyncSubscription s = subscribeSync(inbox, null);
    publish(subject, inbox, data);
    Flush();
    var set = new List<Msg>();
    Msg m = null;
    do
    {
        try
        {
            m = s.NextMessage(timeout);
            set.Add(m);
        }
        catch(Exception ex)
        {
            break;
        }
    } while (m != null);
    s.unsubscribe(false);
    return set;
}

Extreamly slow when send message to multiple Topics (300) for first time

Hi,
I did a test and result is weird. Please take a look

  1. I created 300 subscription that subscribed 300 Topic.
  2. I Publish 1 message to each Topic.
  3. Wait for all subscriptions receive first message.
    (takes over 290,000ms to send 300 messages to 300 Topics (1 message per Topic))
  4. Send 10 messages to each Topic
  5. wait for all subscription receive 10 messages
    (take 7ms) (takes over 7ms to send 3000 messages to 300 Topics (10 messages per Topic))

Im using:
NATS: nats-server version 0.9.4
OS: Window 10
CPU: Intel Xeon E3-1240
RAM: 8GB
Client: https://github.com/nats-io/csharp-nats
There is my code

    class Program
    {
        static void Main(string[] args)
        {
            var topics = 300;
            
            //init connection and subscribers

            var connFac = new ConnectionFactory();
            var subConn = connFac.CreateConnection();
            var pubConn = connFac.CreateConnection();

            var received = 0;
            var subscriptions = new List<IAsyncSubscription>();

            for (var topicId = 0; topicId < topics; topicId++)
            {
                var subscription = subConn.SubscribeAsync(topicId.ToString(),
                    (sender, x) => { Interlocked.Increment(ref received); });

                subscriptions.Add(subscription);
            }

            //1st run: 300 topics, 1 messages per topic

            var messagesPerTopic = 1;
            var noOfMessages = topics * messagesPerTopic;
            Console.WriteLine("Test with {0:n0} topics, {1:n0} messages per topic", topics, messagesPerTopic);

            var sw = Stopwatch.StartNew();
            for (var topicId = 0; topicId < topics; topicId++)
            {
                for (var i = 0; i < messagesPerTopic; i++)
                {
                    pubConn.Publish(topicId.ToString(), Encoding.UTF8.GetBytes("hello"));
                }
            }

            while (received < noOfMessages && sw.Elapsed.TotalMinutes < 5)
            {
                Thread.Sleep(1);
            }
            sw.Stop();

            Console.WriteLine("{0:n0} messages received in {1:n0}ms", received, sw.ElapsedMilliseconds);
            Console.WriteLine("{0:n0} msg/s", received * 1000 / sw.ElapsedMilliseconds);

            //2nd run: 300 topics, 10 messages per topic

            received = 0;
            messagesPerTopic = 10;
            noOfMessages = topics * messagesPerTopic;

            Console.WriteLine("Test with {0:n0} topics, {1:n0} messages per topic", topics, messagesPerTopic);

            sw = Stopwatch.StartNew();
            for (var topicId = 0; topicId < topics; topicId++)
            {
                for (var i = 0; i < messagesPerTopic; i++)
                {
                    pubConn.Publish(topicId.ToString(), Encoding.UTF8.GetBytes("hello"));
                }
            }

            while (received < noOfMessages && sw.Elapsed.TotalMinutes < 5)
            {
                Thread.Sleep(1);
            }
            sw.Stop();

            Console.WriteLine("{0:n0} messages received in {1:n0}ms", received, sw.ElapsedMilliseconds);
            Console.WriteLine("{0:n0} msg/s", received * 1000 / sw.ElapsedMilliseconds);

            foreach (var s in subscriptions)
            {
                s.Dispose();
            }

            pubConn.Dispose();
            subConn.Dispose();

        }
    }

Slow consumer false positive

i have been tracking some issues in this section of the code for past 24 hours. We have around 1,000 clients connected to NATS, and this issue will pop up on 1 about every 3-4 hours.

The symptoms is that the client receives a slow consumer exception. This initially caused some confusion, because the only code on the subscriber callback is a function where the message is added to a Queue for processing by a different thread.

i have downloaded the source, and have been putting some additional debug info in, and have found 2 situations where it has been triggered incorrectly.

Below is my test code:

// Check for a Slow Consumer if (pMsgs > pMsgsLimit || pBytes > pBytesLimit) { Console.WriteLine($"pMsgs: {pMsgs} pMsgsLimit: {pMsgsLimit}"); Console.WriteLine($"pBytes: {pBytes} pBytesLimit: {pBytesLimit}"); Console.ReadKey(); // slow consumer handleSlowConsumer(msg); return false; }

on 1 occasion this block was entered with the following items logged:

pMsgs:3
pMsgsLimit:65536
pBytes:4294967170
pBytesLimit:67108864

now this is clear why its there, but this is an error in its self. there is no way that it received 3 messages totalling 429mb. to verify this i looked at memory the process was using, and it was only using 55mb. so need to understand where this crazy value of pBytes has come from.

on another occasion i got the following logged:

pMsgs:2
pMsgsLimit:65536
pBytes:-126
pBytesLimit:67108864

so the first issue is i can't see how it made it past the if statement with those figures. secondly the pBytes figure is negative. i have seen a few instances where this is triggered when pBytes is negative.

lastly, my understanding is that the client is supposed to disconnect and reconnect if this happens. i have checked the code and can see no logic to do this, and from the results in my app, its not working.

right now my only guess is that this is a threading issue, but am pretty much at a loss here.

Edit:

Just had another one with following logged:

pMsgs:4294967295
pMsgsLimit:65536
pBytes:0
pBytesLimit:67108864

again, stats are just crazy, no way that it got that many messages, and even if it did, it would have stopped at 65537 not that far out...

any ideas?

Requestor sample does not work

Hi,

I'm using server 0.9.2 in Docker container on different machine (Ubuntu 14.04) to client (Windows 10). Subscribe and Publish work as designed. However, c.Request(subject, payload, 1000); times out. I modified the sample to add a timeout as the original c.Request(subject, payload); just hung. Url = "nats://i-am-hiding-server-ip-address:4222":

NATS.Client.NATSTimeoutException: Timeout occurred.
   at NATS.Client.Channel`1.get(Int32 timeout) in F:\csnats-master\csnats-master\NATS\Channel.cs:line 70
   at NATS.Client.SyncSubscription.NextMessage(Int32 timeout) in F:\csnats-master\csnats-master\NATS\SyncSub.cs:line 61
   at NATS.Client.Connection.request(String subject, Byte[] data, Int32 timeout) in F:\csnats-master\csnats-master\NATS\Conn.cs:line 1901
   at NATS.Client.Connection.Request(String subject, Byte[] data, Int32 timeout) in F:\csnats-master\csnats-master\NATS\Conn.cs:line 1917
   at NATSExamples.Requestor.Run(String[] args) in F:\csnats-master\csnats-master\examples\Requestor\Requestor.cs:line 40

Any ideas?

Warmest regards,

Garrard.

Request Problem

Hi, I play the example Requestor and Replier.
I found there's a exception when request return because
In Conn.cs

        internal virtual Msg request(string subject, byte[] data, int timeout)
        {
            Msg    m     = null;
            string inbox = NewInbox();

            SyncSubscription s = subscribeSync(inbox, null);
            s.AutoUnsubscribe(1);

            publish(subject, inbox, data);
            m = s.NextMessage(timeout);
            try
            {
                // the auto unsubscribe should handle this.
                s.Unsubscribe();
            }
            catch (Exception) {  /* NOOP */ }

            return m;
        }

when s.NextMessage(timeout);

            if (msg != null)
            {
                long d = tallyDeliveredMessage(msg);
                if (d == max)
                {
                    // Remove subscription if we have reached max.
                    localConn.removeSub(this);
                }
                if (localMax > 0 && d > localMax)
                {
                    throw new NATSMaxMessagesException();
                }
            }

localConn.removeSub(this);

so

       internal virtual void removeSub(Subscription s)
        {
            Subscription o;

            subs.TryRemove(s.sid, out o);
            if (s.mch != null)
            {
                s.mch.close();
                s.mch = null;
            }

            s.conn = null;
            s.closed = true;
        }

s.conn = null; here, but next return to Conn.cs and execute

              // the auto unsubscribe should handle this.
                s.Unsubscribe();
        public virtual void Unsubscribe()
        {
            Connection c;
            lock (mu)
            {
                c = this.conn;
            }

            if (c == null)
                throw new NATSBadSubscriptionException();

            c.unsubscribe(this, 0);
        }

the exception here

 if (c == null)
    throw new NATSBadSubscriptionException();

I have two problems

  1. when execute request and the max count =1 and unsubscribe in NextMessage() function so c always null here and the exception always throw?

2.if throw new NATSBadSubscriptionException(); it has significant impact on performance, and Is this a bug? or can I delete this exception such as if (c == null) return; ???

or is there something wrong with me?

Thanks All

Update .NET core dependencies

.NET core moves fast. Update dependencies for the client, tests, and samples to reference the latest stable libraries/frameworks.

Update: Best to wait and do this with the .NET core 2.0 release barring any security issues or major bugs.

I need a publish method where I can pass in the byte count

I would like to be able to pass in the byte count that goes with my byte[] passed to the Publish method. In other words, I want to set the "num" value in Connection.publish with a parameter so that I don't have to make an additional array copy of my data after serialization.

very very serious performance problem

Hi All,

This is a very simple program. but you can see that the subscriber callback function is very very slow and it suspend - run - suspend - run, you can see it on subscribe console.

I tested on Windows 8 64bit and gnatsd 0.9.4 and csnats 0.6.0

I've tested golang client, it seems better than cs client.

I've tested if subscribe maybe 1-2 subjects, it's ok, but if subscribe count more than 20 or 30. the performance is very very terrible.

Can anyone tell what's wrong with this? or how can I fix it?

I think subject count should't significant impact on performance and I only use 30-50. I think 30-50 is very very easy to NATS.

It's not about data size because I use just 10 bytes. and it's same as 500 bytes
I look at taskmgr, the cpu usage is very low, but it's very slow :(

Publish

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NATS.Client;
namespace Publish
{
    class Program
    {
        static void Main(string[] args)
        {
            Options opts = ConnectionFactory.GetDefaultOptions();
            opts.Url = "nats://127.0.0.1:4222";
            string data = "01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789";
            using (IConnection c = new ConnectionFactory().CreateConnection(opts))
            {
                for(int i=0;i<100000; ++i)
                {
                    c.Publish("ABCDEFGHIJK_" + (i % 200).ToString(), Encoding.Default.GetBytes(data + i.ToString()));
                }
                c.Flush();
            }
            Console.WriteLine("Finish");
            Console.Read();
        }
    }
}

Subscriber

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NATS.Client;
namespace Subscribe
{
    class Program
    {
        static void Main(string[] args)
        {
            Options opts = ConnectionFactory.GetDefaultOptions();
            opts.Url = "nats://127.0.0.1:4222";

            int count = 0;
            object obj = new object();

            IConnection c = new ConnectionFactory().CreateConnection(opts);
            EventHandler<MsgHandlerEventArgs> msgHandler = (sender, args2) =>
            {
                lock(obj)
                {
                    count++;
                    Console.WriteLine(count);
                    if (count == 100000)
                    {
                        Console.WriteLine("Finish");
                    }
                }
            };


            for (int i = 0; i < 30; ++i)
            {
                string subject = "ABCDEFGHIJK_" + (i).ToString();
                c.SubscribeAsync(subject, msgHandler);
            }
            c.Flush();
            Console.Read();
        }
    }
}

Support large numbers of asynchronous subscribers in a single process

In order to support large numbers (thousands) of asynchronous subscribers in an application, using one task per subscriber to handle message events requires more resources than are typically available. An option should be provided that will setup a NATS connection to process all subscriber message events in a single task or thread, providing a choice of scalability over performance.

This will address #113.

Potential memory leak when closing connection quickly after opening it

I noticed in a loop that opens a new connection, then rapidly closes it (say there was a very small amount of data to transmit intermittently, but a persisted connection was not desired), that resources are not released as expected, and if the loop runs for long enough, an OutOfMemoryException is thrown. I found that a brief pause before closing the connection is sufficient to avoid the problem.

I have tested this in .NET 4.5, and .NET 4.6.1, using the NATS.Client 0.5.0 NuGet package, and in Debug and Release configuration.

To reproduce:
Step 1:
Create a .NET 4.6.1 C# Console application in VS. I used VS2015 Community Edition)

Step 2:
Install the NATS.Client package from NuGet. I ran install-package NATS.Client from the Package Manager Console.

Step 3:
Put the following code (C# 6.0) in the Program.cs file:

#define MEMORYLEAKSAREAWESOME

using static System.Console;
using static System.Threading.Thread;
using static System.Diagnostics.Stopwatch;
using static System.Diagnostics.Process;
using static System.Globalization.CultureInfo;

using NATS.Client;

namespace NatsLeakExample
{
    class Program
    {
        static void Main(string[] args)
        {
            var proc = GetCurrentProcess();
            var sw = StartNew();
            bool continueTest = true;
            ConnectionFactory cf = new ConnectionFactory();
            while (continueTest)
            {
                using (IConnection conn = cf.CreateConnection())
                {
#if !MEMORYLEAKSAREAWESOME
                    Sleep(100); // brief pause in the connection lifetime dodges the problem
#endif
                }
#if MEMORYLEAKSAREAWESOME
                Sleep(100); // this will not affect the connection, and a memory leak will result
#endif
                if (sw.ElapsedMilliseconds > 300000)
                {
                    continueTest = false;
                }
            }
            sw.Stop();
            long privateBytesUsed = proc.PrivateMemorySize64;   // I chose PrivateMemory as it reflected the memory usage graph in VS debugging
            string formattedBytesUsed = privateBytesUsed.ToString("#,##0,,M", InvariantCulture);

#if MEMORYLEAKSAREAWESOME
            string testStatus = "set";
#else
            string testStatus = "not set";
#endif

            WriteLine($"Memory used after 5 minutes when {testStatus} to enable the leak : {formattedBytesUsed}");
        }
    }
}

When MEMORYLEAKSAREAWESOME is defined, the memory leak will occur. When it is not defined, the memory leak will not occur.

Step 4:
Run the program.

Release output when MEMORYLEAKSAREAWESOME is defined:

Memory used after 5 minutes when set to enable the leak : 347M
Press any key to continue . . .

Release output when MEMORYLEAKSAREAWESOME is not defined:

Memory used after 5 minutes when not set to enable the leak : 16M
Press any key to continue . . .

This is just what I've noted, and created a proof of concept for. It may also be related to Issue #91.

Missing url in Disconnect/Closed events

I have this code for the connection callbacks:

Options opts = ConnectionFactory.GetDefaultOptions();
opts.ClosedEventHandler += ( s, e ) => {
    _logger.Error( "Connection to {0} was closed.", e.Conn.ConnectedUrl );
};
opts.DisconnectedEventHandler += ( s, e ) => {
    _logger.Error( "Disconnected from {0}.", e.Conn.ConnectedUrl );
};

It seems that the e.Conn.ConnectedUrl is null on both events, which makes it hard to know from which server one lost the connection.

Also, I miss a "ConnectedEventHandler" so that I can know when the first connection actually succeeded and to which server (ReconnectedEventHandler doesn't fire on inital connections).

Subscription EventHandler Async/Await Support

Are there any plans to support async / await within an EventHandler? Is this what #59 is referring to?

eg:

IAsyncSubscription s = c.SubscribeAsync("foo");
s.MessageHandler += async (sender, args) =>
{
    await SomeAsyncMethod(args.Message);
};
s.Start();

.NET 4.5 Code Compatibility

Previously, .NET 4.0 code compatibility was maintained. As .NET 4.0 is no longer supported, move to use features found in .NET 4.5.

Travis CI for the .NET client

It would be nice to have continuous integration for the .NET client. Currently, Travis CI supports mono and a few test frameworks as beta. The .NATS C# test suite uses the Visual Studio Test Suite to avoid external dependencies.

Investigate:

  • Level of support (now and future) in Travis CI
  • Other user experiences of .NET in Travis CI
  • NUnit vs xunit, including level of effort to convert

Request threading issues?

Hi Colin,
I'm doing some fairly basic tests at the moment to explore throughput. I've a system that I want to Request about 20 sources and aggregate the results.

If I just use publish (using the same approach as below), then I can comfortably publish 10,000 messages in <300ms on my PC. So I put together the following snippet with Request, and I was expecting something similar, but am seeing NATSTimeoutException instead.

                Msg msg = null;
                ConnectionFactory cf = new ConnectionFactory();
                var sw = new Stopwatch();
                sw.Start();
                using (var c = cf.CreateConnection())
                {
                    var tasks = Enumerable.Range(0, 10000).Select(x =>
                         Task.Factory.StartNew(() =>
                         {
                           msg = c.Request("AppointmentSearch", new AppointmentSearch().ToMsg(), 10 * 1000);
                         })
                    );
                    Task.WaitAll(tasks.ToArray(), TimeSpan.FromSeconds(10));
                }
                return sw.ElapsedMilliseconds;

Looking at gnatsd -V I can see a throughput of about 1 message per second. This doesn't seem right to me? Is there anything I can look at?

AsyncSub Messages lost during reconnect

I’m doing some advanced unit tests.
I have a 3 nodes cluster
I create 2 connections

  • 1 for sending messages on node 1 on subject foo
  • 1 for receiving messages on node 2 on subject foo

I start sending messages every 50 ms
I stop node 2 (the one that is used by the AsyncSub)
The AsyncSubscription is loosing all the messages that have been sent before it get reconnected to another node.

Is is the expected behavior ?

Here is a unit test that can be put in UnitTestReconnect.cs

[Fact]
public void TestIntensivePublishShouldNotFail()
{
List<byte[]> msgs = new List<byte[]>();
List sentMsgs = new List();
List receivedMsgs = new List();

        ManualResetEvent recvReconnected = new ManualResetEvent(false);

        // one connection for receiving
        Options recvOptions = ConnectionFactory.GetDefaultOptions();
        recvOptions.Servers = new [] { "nats://localhost:1222", "nats://localhost:1223", "nats://localhost:1224" }; 
        recvOptions.NoRandomize = true;
        recvOptions.AsyncErrorEventHandler += (sender, args) => { Debug.WriteLine("Receive Error " + args.Error + " Dropped=" + args.Subscription.Dropped); };
        recvOptions.ClosedEventHandler += (sender, args) => { Debug.WriteLine("Receive Closed "); };
        recvOptions.DisconnectedEventHandler += (sender, args) => { Debug.WriteLine("Receive Disconnected "); };
        recvOptions.ReconnectedEventHandler += (sender, args) => { recvReconnected.Set();Debug.WriteLine("Receive Reconnected "); };

        ManualResetEvent sendReconnected = new ManualResetEvent(false);

        // one connection for sending (better for sending big amount of data
        Options sendOptions = ConnectionFactory.GetDefaultOptions();
        sendOptions.Servers = new [] { "nats://localhost:1223", "nats://localhost:1222", "nats://localhost:1224" }; 
        sendOptions.NoRandomize = true;
        sendOptions.AsyncErrorEventHandler += (sender, args) => { Debug.WriteLine("Send Error " + args.Error); };
        sendOptions.ClosedEventHandler += (sender, args) => { Debug.WriteLine("Send Closed "); };
        sendOptions.DisconnectedEventHandler += (sender, args) => { Debug.WriteLine("Send Disconnected "); };
        sendOptions.ReconnectedEventHandler += (sender, args) => { sendReconnected.Set();Debug.WriteLine("Send Reconnected "); };


        // create server
        NATSServer ns = utils.CreateServerWithArgs("-p 1222 -cluster nats://localhost:1422 -routes nats://localhost:1423,nats://localhost:1424 -D");
        NATSServer ns2 = utils.CreateServerWithArgs("-p 1223 -cluster nats://localhost:1423 -routes nats://localhost:1422,nats://localhost:1424 -D");
        NATSServer ns3 = utils.CreateServerWithArgs("-p 1224 -cluster nats://localhost:1424 -routes nats://localhost:1422,nats://localhost:1423 -D");
        Thread.Sleep(1000);

        using (IConnection sendConn = new ConnectionFactory().CreateConnection(sendOptions))
        using (IConnection rcvConn = new ConnectionFactory().CreateConnection(recvOptions))
        {
            int receivedMessages = 0;
            int sentMessages = 0;
            int sentFailure = 0;

            IAsyncSubscription s = rcvConn.SubscribeAsync("foo");
            s.MessageHandler += (sender, args) =>
            {
                Interlocked.Increment(ref receivedMessages);
                ThreadPool.QueueUserWorkItem(o =>
                {
                    var str = Encoding.UTF8.GetString(args.Message.Data);
                    int j = Int32.Parse(str.Split(' ')[0]);
                    lock(receivedMsgs)
                        receivedMsgs[j]++;
                });
            };

            s.Start();
            rcvConn.Flush();
            sendConn.Flush();


            bool serverRestarted = false;
            ManualResetEvent sendFinished = new ManualResetEvent(false);
            ThreadPool.QueueUserWorkItem(o =>
            {
                while (!serverRestarted)
                {
                    try
                    {
                        Thread.Sleep(50);
                        byte[] msg = Encoding.UTF8.GetBytes(String.Format("{0} additionnalPayload", sentMessages));
                        msgs.Add(msg);
                        sentMsgs.Add(true);
                        lock(receivedMsgs)
                             receivedMsgs.Add(0);
                        sendConn.Publish("foo", msg);
                        sentMessages++;
                    }
                    catch (NATSConnectionClosedException)
                    {
                        // we are dead, we will never reconnect. Shall we retry ?
                        //sendConn.Dispose();
                        //sendConn = new ConnectionFactory().CreateConnection(sendOptions);
                        throw;

                    }
                    catch (Exception)
                    {
                        sentFailure++;
                    }
                }

                // after server is restarted, send more messages to check reconnect is ok
                for (int i = 0; i < 20; i++)
                {
                    try
                    {
                        Thread.Sleep(50);
                        byte[] msg = Encoding.UTF8.GetBytes(String.Format("{0} additionnalPayload", sentMessages));
                        msgs.Add(msg);
                        sentMsgs.Add(true);
                        lock (receivedMsgs)
                            receivedMsgs.Add(0);
                        sendConn.Publish("foo", msg);
                        sentMessages++;
                    }
                    catch (NATSConnectionClosedException)
                    {
                        // we are dead, we will never reconnect. Shall we retry ?
                        //sendConn.Dispose();
                        //sendConn = new ConnectionFactory().CreateConnection(sendOptions);
                        throw;
                    }
                    catch (Exception)
                    {
                        sentFailure++;
                    }
                }
                sendFinished.Set();
            });

            Thread.Sleep(500);
            // kill the server !
            int indexBeforeRestart = sentMessages;
            ns.Shutdown();

            // Wait for receiver to reconnect...
            //Assert.Equal(true, sendReconnected.WaitOne(10000));
            Assert.Equal(true, recvReconnected.WaitOne(10000));
            int indexAfterRestart = sentMessages;
            // sender will be in the second part
            serverRestarted = true;

            // wait for sending finished
            Assert.True(sendFinished.WaitOne(Int32.MaxValue));

            rcvConn.Flush();
            sendConn.Flush();

            // maybe give time to receive messages
            //Thread.Sleep(1000);

            // now check failures
            Assert.Equal(0, sentFailure);
            Assert.Equal(0, s.QueuedMessageCount);
            Assert.Equal(0, s.PendingMessages);
            Assert.Equal(0, s.Dropped);
            Assert.Equal(s.Delivered, receivedMessages);
            Assert.Equal(sendConn.Stats.OutMsgs, sentMessages); // on compte bien ?
            Assert.Equal(rcvConn.Stats.InMsgs, receivedMessages); // on compte bien ?
            Assert.Equal(sentMessages, receivedMessages);

            Assert.Equal(rcvConn.Stats.Reconnects, 1);
            //Assert.Equal(sendConn.Stats.Reconnects, 1);
        }
        ns.Shutdown();
        ns2.Shutdown();
        ns3.Shutdown();
    }

Benchmarks

Which exactly code you run to get the performance of several millions messages per sec and what hardware do you use? Is it VM on *Nix? I played with pure .NET's TCP sockets recently and cannot get such numbers even on localhost loopback. I am looking through the code to try to understand is it buffering or something else that yields such results. Also, as far as I understand you do not use IOCP or SocketAsyncEventArgs?

How Can I use TLS by ‘bidirectional authentication’

Now our company plan to using nats server by 'bidirectional authentication', I reference the example code:

Options opts = ConnectionFactory.GetDefaultOptions();
opts.Secure = true;

    // .NET requires the private key and cert in the
    //  same file. 'client.pfx' is generated from:
    //
    // openssl pkcs12 -export -out client.pfx
    //    -inkey client-key.pem -in client-cert.pem
    X509Certificate2 cert = new X509Certificate2("client.pfx", "password");

    opts.AddCertificate(cert);

    IConnection c = new ConnectionFactory().CreateConnection(opts);  

But it not works, I try to implement it by Java code according to Java Tls example, it works fine.

So, could you please let me know or get me some tip:

  1. How can I set the client certification file, when I use 'bidirectional authentication'?
  2. Need I write more code to implement it?

Looking forward your response.

Thank you!
Windy

Benchmark latency test needs to be asynchronous

An asynchronous subscriber needs to handle the return message from the server. The test erroneously assumes the flush will return before the published message arrives back to the subscriber.

Optionally sign the assembly

Users may want to sign the NATS client assembly themselves so signing should be an optional step.

NOTE: It should be done automatically when generating the official NUGET package.

.NET Core Client

I wanted to evaluate NATS for a .NET Core project. Unfortunately, the NuGet is only for net45. Is it possible to have the next version also include a build for .NET Core?

Publish Failure during reconnect

I’m doing some advanced unit tests.
I have a 3 nodes cluster
I create 2 connections

1 for sending messages on node 1 on subject foo
1 for receiving messages on node 2 on subject foo
I start sending messages every 20 ms
I stop node 1 (the one that is connected to the send thread)

The publish of messages in failing with 2 different types of issues :

  • Socket or IO or NotSupportedException : the underlying stream is not in a good shape. After a while the connection is connected to another node, and I can retry sending the message
  • NATSConnectionClosedException : this one is dangerous, because the connection never get reconnected again. I have to create a new one.

Is is the expected behavior ?

Add prerelease packages on NuGet

Pre-Release NuGet Packages should be provided for users to take advantage of new features without needing to compile themselves, and is very convenient for .NET core users. This will be a build of new features on the master branch that pass all tests.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.