nats-io / nats.net.v1
The official C# Client for NATS
License: Apache License 2.0
Look into enhancing performance of the Connection.Request API.
Based on #73.
This was an issue in the Go client and needs to be investigated in the C# client.
See nats-io/nats.go#118
Daniel Wertheim has implemented very nice Rx support in his .NET client: https://github.com/danielwertheim/mynatsclient
I really like that approach as it is fast, elegant and modern. On the other hand, I prefer to use officially supported libraries, as they tend to be more stable and better supported over time.
Could you please consider including Rx support in the core of the official client library?
The Publish method returns void. How long will it block?
Does it have its own local buffer? What happens when the buffer is full? How do I configure the buffer?
It appears to throw an exception if the connection is closed. But what happens while we're reconnecting? Do we go into a closed state and then into a reconnecting state? Is "closed" a manual close only, or does the connection also enter that state after a connection fault?
I'm doing some testing on failure scenarios, and am testing what happens if the NATS server is stopped while my process is running.
I'm finding the timeout periods seem to be getting ignored, assuming I've configured it correctly.
Here is my repro code:
using NATS.Client;
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace TestNats
{
class Program
{
static void Main(string[] args)
{
Task.Run(() => Run());
Console.ReadLine();
}
static async Task Run()
{
Console.WriteLine("Hello World!");
var cf = new ConnectionFactory();
var opts = ConnectionFactory.GetDefaultOptions();
opts.Timeout = 1000;
opts.AllowReconnect = false;
var conn = cf.CreateConnection(opts);
conn.SubscribeAsync("test", (p, a) => conn.Publish(a.Message.Reply, new byte[0]));
await conn.RequestAsync("test", new byte[0], 1000);
Console.WriteLine("Now stop nats server, you have 5 seconds");
Thread.Sleep(5000);
var s = new Stopwatch();
s.Start();
try
{
await conn.RequestAsync("test", new byte[0], 1000);
s.Stop();
Console.WriteLine(s.ElapsedMilliseconds / 1000 + "s");
}
catch (Exception ex)
{
s.Stop();
Console.WriteLine(s.ElapsedMilliseconds / 1000 + "s");
Console.WriteLine(ex.Message);
}
}
}
}
Output:
Hello World!
Now stop nats server, you have 5 seconds
60s
Timeout occurred.
From official documentation:
NATS supports two flavors of request reply messaging: point-to-point or one-to-many. Point-to-point involves the fastest or first to respond. In a one-to-many exchange, you set a limit on the number of responses the requestor may receive.
Your implementation does not contain a method for the one-to-many scenario.
Implementation example:
public IEnumerable<Msg> RequestMany(string subject, byte[] data, int timeout)
{
    if (timeout <= 0)
    {
        throw new ArgumentException("Timeout must be greater than 0.", "timeout");
    }
    // Subscribe to a fresh inbox before publishing so no reply is missed.
    string inbox = InboxNameGen.NewInbox();
    SyncSubscription s = subscribeSync(inbox, null);
    publish(subject, inbox, data);
    Flush();
    var set = new List<Msg>();
    Msg m = null;
    do
    {
        try
        {
            // Each reply gets the full timeout; the loop ends on the first timeout.
            m = s.NextMessage(timeout);
            set.Add(m);
        }
        catch (Exception)
        {
            break;
        }
    } while (m != null);
    s.unsubscribe(false);
    return set;
}
Hi,
I ran a test and the result is weird. Please take a look.
I'm using:
NATS: nats-server version 0.9.4
OS: Windows 10
CPU: Intel Xeon E3-1240
RAM: 8GB
Client: https://github.com/nats-io/csharp-nats
Here is my code:
class Program
{
static void Main(string[] args)
{
var topics = 300;
//init connection and subscribers
var connFac = new ConnectionFactory();
var subConn = connFac.CreateConnection();
var pubConn = connFac.CreateConnection();
var received = 0;
var subscriptions = new List<IAsyncSubscription>();
for (var topicId = 0; topicId < topics; topicId++)
{
var subscription = subConn.SubscribeAsync(topicId.ToString(),
(sender, x) => { Interlocked.Increment(ref received); });
subscriptions.Add(subscription);
}
//1st run: 300 topics, 1 messages per topic
var messagesPerTopic = 1;
var noOfMessages = topics * messagesPerTopic;
Console.WriteLine("Test with {0:n0} topics, {1:n0} messages per topic", topics, messagesPerTopic);
var sw = Stopwatch.StartNew();
for (var topicId = 0; topicId < topics; topicId++)
{
for (var i = 0; i < messagesPerTopic; i++)
{
pubConn.Publish(topicId.ToString(), Encoding.UTF8.GetBytes("hello"));
}
}
while (received < noOfMessages && sw.Elapsed.TotalMinutes < 5)
{
Thread.Sleep(1);
}
sw.Stop();
Console.WriteLine("{0:n0} messages received in {1:n0}ms", received, sw.ElapsedMilliseconds);
Console.WriteLine("{0:n0} msg/s", received * 1000 / sw.ElapsedMilliseconds);
//2nd run: 300 topics, 10 messages per topic
received = 0;
messagesPerTopic = 10;
noOfMessages = topics * messagesPerTopic;
Console.WriteLine("Test with {0:n0} topics, {1:n0} messages per topic", topics, messagesPerTopic);
sw = Stopwatch.StartNew();
for (var topicId = 0; topicId < topics; topicId++)
{
for (var i = 0; i < messagesPerTopic; i++)
{
pubConn.Publish(topicId.ToString(), Encoding.UTF8.GetBytes("hello"));
}
}
while (received < noOfMessages && sw.Elapsed.TotalMinutes < 5)
{
Thread.Sleep(1);
}
sw.Stop();
Console.WriteLine("{0:n0} messages received in {1:n0}ms", received, sw.ElapsedMilliseconds);
Console.WriteLine("{0:n0} msg/s", received * 1000 / sw.ElapsedMilliseconds);
foreach (var s in subscriptions)
{
s.Dispose();
}
pubConn.Dispose();
subConn.Dispose();
}
}
The parser in the connection, Connection.ps, may not be needed and can likely be removed.
See: nats-io/nats.go#209
Support the Paket dependency manager.
I have been tracking some issues in this section of the code for the past 24 hours. We have around 1,000 clients connected to NATS, and this issue pops up on one of them about every 3-4 hours.
The symptom is that the client receives a slow consumer exception. This initially caused some confusion, because the only code in the subscriber callback is a function that adds the message to a Queue for processing by a different thread.
I have downloaded the source and added some extra debug output, and have found 2 situations where the check was triggered incorrectly.
Below is my test code:
// Check for a Slow Consumer
if (pMsgs > pMsgsLimit || pBytes > pBytesLimit)
{
    Console.WriteLine($"pMsgs: {pMsgs} pMsgsLimit: {pMsgsLimit}");
    Console.WriteLine($"pBytes: {pBytes} pBytesLimit: {pBytesLimit}");
    Console.ReadKey();
    // slow consumer
    handleSlowConsumer(msg);
    return false;
}
On one occasion this block was entered with the following values logged:
pMsgs:3
pMsgsLimit:65536
pBytes:4294967170
pBytesLimit:67108864
It is clear why the block was entered, but the value is an error in itself: there is no way that it received 3 messages totalling roughly 4.3 GB. To verify this I looked at the memory the process was using, and it was only 55 MB, so I need to understand where this crazy pBytes value has come from.
On another occasion I got the following logged:
pMsgs:2
pMsgsLimit:65536
pBytes:-126
pBytesLimit:67108864
So the first issue is that I can't see how it made it past the if statement with those figures. Secondly, the pBytes figure is negative. I have seen a few instances where the check is triggered while pBytes is negative.
Lastly, my understanding is that the client is supposed to disconnect and reconnect if this happens. I have checked the code and can see no logic to do this, and judging by the results in my app, it is not happening.
Right now my only guess is that this is a threading issue, but I am pretty much at a loss here.
Edit:
Just had another one with following logged:
pMsgs:4294967295
pMsgsLimit:65536
pBytes:0
pBytesLimit:67108864
Again, the stats are just crazy: there is no way that it got that many messages, and even if it did, it would have stopped at 65537, not that far out...
Any ideas?
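One observation on the logged numbers, offered as arithmetic rather than a diagnosis: 4294967170 is exactly 2^32 - 126 and 4294967295 is exactly 2^32 - 1. All three dumps are therefore consistent with a counter that went slightly negative (-126, -126 and -1) and was, in two of the cases, read or printed as an unsigned 32-bit value. The sketch below only demonstrates that reinterpretation; the class and method names are made up for illustration and are not taken from the client.

```csharp
using System;

static class CounterWrapDemo
{
    // Reinterpret a signed 32-bit counter as unsigned, without overflow checking.
    public static uint AsUnsigned(int counter) => unchecked((uint)counter);

    static void Main()
    {
        Console.WriteLine(AsUnsigned(-126)); // 4294967170, the logged pBytes
        Console.WriteLine(AsUnsigned(-1));   // 4294967295, the logged pMsgs

        // Compared as unsigned, the "negative" counter exceeds the byte limit.
        Console.WriteLine(AsUnsigned(-126) > 67108864u); // True
    }
}
```

If that reading is right, the open question is why the pending counters would be decremented more often than they are incremented, which would fit the threading suspicion above.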
Hi,
I'm using server 0.9.2 in a Docker container on a different machine (Ubuntu 14.04) from the client (Windows 10). Subscribe and Publish work as designed. However, c.Request(subject, payload, 1000); times out. I modified the sample to add a timeout because the original c.Request(subject, payload); just hung. Url = "nats://i-am-hiding-server-ip-address:4222":
NATS.Client.NATSTimeoutException: Timeout occurred.
at NATS.Client.Channel`1.get(Int32 timeout) in F:\csnats-master\csnats-master\NATS\Channel.cs:line 70
at NATS.Client.SyncSubscription.NextMessage(Int32 timeout) in F:\csnats-master\csnats-master\NATS\SyncSub.cs:line 61
at NATS.Client.Connection.request(String subject, Byte[] data, Int32 timeout) in F:\csnats-master\csnats-master\NATS\Conn.cs:line 1901
at NATS.Client.Connection.Request(String subject, Byte[] data, Int32 timeout) in F:\csnats-master\csnats-master\NATS\Conn.cs:line 1917
at NATSExamples.Requestor.Run(String[] args) in F:\csnats-master\csnats-master\examples\Requestor\Requestor.cs:line 40
Any ideas?
Warmest regards,
Garrard.
Hi, I ran the Requestor and Replier examples.
I found that an exception is thrown when the request returns.
In Conn.cs:
internal virtual Msg request(string subject, byte[] data, int timeout)
{
Msg m = null;
string inbox = NewInbox();
SyncSubscription s = subscribeSync(inbox, null);
s.AutoUnsubscribe(1);
publish(subject, inbox, data);
m = s.NextMessage(timeout);
try
{
// the auto unsubscribe should handle this.
s.Unsubscribe();
}
catch (Exception) { /* NOOP */ }
return m;
}
when s.NextMessage(timeout);
if (msg != null)
{
long d = tallyDeliveredMessage(msg);
if (d == max)
{
// Remove subscription if we have reached max.
localConn.removeSub(this);
}
if (localMax > 0 && d > localMax)
{
throw new NATSMaxMessagesException();
}
}
localConn.removeSub(this);
so
internal virtual void removeSub(Subscription s)
{
Subscription o;
subs.TryRemove(s.sid, out o);
if (s.mch != null)
{
s.mch.close();
s.mch = null;
}
s.conn = null;
s.closed = true;
}
s.conn is set to null here, but control then returns to Conn.cs and executes
// the auto unsubscribe should handle this.
s.Unsubscribe();
public virtual void Unsubscribe()
{
Connection c;
lock (mu)
{
c = this.conn;
}
if (c == null)
throw new NATSBadSubscriptionException();
c.unsubscribe(this, 0);
}
and the exception is thrown here:
if (c == null)
throw new NATSBadSubscriptionException();
I have two problems
2. Throwing NATSBadSubscriptionException here has a significant impact on performance. Is this a bug? Or can I remove the exception, for example with if (c == null) return; ?
Or am I doing something wrong?
Thanks all
.NET Core moves fast. Update dependencies for the client, tests, and samples to reference the latest stable libraries/frameworks.
Update: Best to wait and do this with the .NET Core 2.0 release, barring any security issues or major bugs.
Add functionality to mirror the go-nats feature, https://github.com/nats-io/go-nats/pull/295.
I would like to be able to pass in the byte count that goes with my byte[] passed to the Publish method. In other words, I want to set the "num" value in Connection.publish with a parameter so that I don't have to make an additional array copy of my data after serialization.
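To make the request concrete, here is a minimal sketch of why a count parameter saves a copy. PublishSlice is a hypothetical stand-in, not an existing NATS.Client method; it only reports how many bytes it would send. The serializer is assumed to write into a MemoryStream, whose internal buffer is usually larger than the data written.

```csharp
using System;
using System.IO;
using System.Text;

static class PublishCountSketch
{
    // Hypothetical overload taking offset and count; NOT part of NATS.Client.
    public static int PublishSlice(string subject, byte[] buffer, int offset, int count)
    {
        if (offset < 0 || count < 0 || offset + count > buffer.Length)
            throw new ArgumentOutOfRangeException(nameof(count));
        return count; // a real client would write buffer[offset .. offset + count)
    }

    static void Main()
    {
        using (var ms = new MemoryStream())
        {
            byte[] payload = Encoding.UTF8.GetBytes("hello");
            ms.Write(payload, 0, payload.Length);

            // GetBuffer exposes the internal array without copying;
            // ToArray would allocate a trimmed copy instead.
            byte[] raw = ms.GetBuffer();
            int sent = PublishSlice("foo", raw, 0, (int)ms.Length);
            Console.WriteLine(sent); // 5
        }
    }
}
```

With only a Publish(subject, byte[]) signature, the caller has to call ToArray (or otherwise trim the buffer) on every message, which is the extra copy described above.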
Hi All,
This is a very simple program, but the subscriber callback is very slow: it alternates between suspending and running, as you can see on the subscriber console.
I tested on Windows 8 64-bit with gnatsd 0.9.4 and csnats 0.6.0.
I've tested the Go client, and it seems to perform better than the C# client.
With only 1-2 subscribed subjects it's OK, but with more than 20 or 30 subscriptions the performance is terrible.
Can anyone tell me what's wrong, or how I can fix it?
I don't think the subject count should have a significant impact on performance, and I only use 30-50, which should be easy for NATS.
It's not about data size: the behavior is the same with 10 bytes and with 500 bytes.
Looking at Task Manager, CPU usage is very low, yet it's very slow :(
Publish
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NATS.Client;
namespace Publish
{
class Program
{
static void Main(string[] args)
{
Options opts = ConnectionFactory.GetDefaultOptions();
opts.Url = "nats://127.0.0.1:4222";
string data = "01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789";
using (IConnection c = new ConnectionFactory().CreateConnection(opts))
{
for(int i=0;i<100000; ++i)
{
c.Publish("ABCDEFGHIJK_" + (i % 200).ToString(), Encoding.Default.GetBytes(data + i.ToString()));
}
c.Flush();
}
Console.WriteLine("Finish");
Console.Read();
}
}
}
Subscriber
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NATS.Client;
namespace Subscribe
{
class Program
{
static void Main(string[] args)
{
Options opts = ConnectionFactory.GetDefaultOptions();
opts.Url = "nats://127.0.0.1:4222";
int count = 0;
object obj = new object();
IConnection c = new ConnectionFactory().CreateConnection(opts);
EventHandler<MsgHandlerEventArgs> msgHandler = (sender, args2) =>
{
lock(obj)
{
count++;
Console.WriteLine(count);
if (count == 100000)
{
Console.WriteLine("Finish");
}
}
};
for (int i = 0; i < 30; ++i)
{
string subject = "ABCDEFGHIJK_" + (i).ToString();
c.SubscribeAsync(subject, msgHandler);
}
c.Flush();
Console.Read();
}
}
}
The unit tests seem to randomly fail in CI with connection related issues.
In order to support large numbers (thousands) of asynchronous subscribers in an application, using one task per subscriber to handle message events requires more resources than are typically available. An option should be provided that will setup a NATS connection to process all subscriber message events in a single task or thread, providing a choice of scalability over performance.
This will address #113.
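As a rough illustration of the trade-off, the sketch below funnels callbacks from many subscribers through one queue drained by a single long-running task. SharedDispatcher and DispatchDemo are hypothetical names, and this is only one way such an option could be shaped, not the client's design.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical shared dispatcher: all subscriptions enqueue here, one task delivers.
sealed class SharedDispatcher : IDisposable
{
    private readonly BlockingCollection<Action> queue = new BlockingCollection<Action>();
    private readonly Task worker;

    public SharedDispatcher()
    {
        worker = Task.Factory.StartNew(() =>
        {
            foreach (Action deliver in queue.GetConsumingEnumerable())
                deliver();
        }, TaskCreationOptions.LongRunning);
    }

    // Called once per incoming message; the handler runs on the worker task.
    public void Enqueue(Action deliver) => queue.Add(deliver);

    public void Dispose()
    {
        queue.CompleteAdding(); // let the worker drain the queue and exit
        worker.Wait();
        queue.Dispose();
    }
}

static class DispatchDemo
{
    public static int Run(int subscribers, int messagesEach)
    {
        int delivered = 0;
        using (var dispatcher = new SharedDispatcher())
        {
            for (int s = 0; s < subscribers; s++)
                for (int m = 0; m < messagesEach; m++)
                    dispatcher.Enqueue(() => delivered++); // single consumer, so no lock
        } // Dispose waits until every queued callback has run
        return delivered;
    }

    static void Main() => Console.WriteLine(Run(1000, 3)); // 3000
}
```

The cost is that one slow handler delays every other subscriber, which is the scalability-over-performance choice the issue describes.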
Have you considered adding a pre-built package to nuget?
I noticed in a loop that opens a new connection, then rapidly closes it (say there was a very small amount of data to transmit intermittently, but a persisted connection was not desired), that resources are not released as expected, and if the loop runs for long enough, an OutOfMemoryException is thrown. I found that a brief pause before closing the connection is sufficient to avoid the problem.
I have tested this in .NET 4.5, and .NET 4.6.1, using the NATS.Client 0.5.0 NuGet package, and in Debug and Release configuration.
To reproduce:
Step 1:
Create a .NET 4.6.1 C# Console application in VS (I used VS2015 Community Edition).
Step 2:
Install the NATS.Client package from NuGet. I ran install-package NATS.Client from the Package Manager Console.
Step 3:
Put the following code (C# 6.0) in the Program.cs file:
#define MEMORYLEAKSAREAWESOME
using static System.Console;
using static System.Threading.Thread;
using static System.Diagnostics.Stopwatch;
using static System.Diagnostics.Process;
using static System.Globalization.CultureInfo;
using NATS.Client;
namespace NatsLeakExample
{
class Program
{
static void Main(string[] args)
{
var proc = GetCurrentProcess();
var sw = StartNew();
bool continueTest = true;
ConnectionFactory cf = new ConnectionFactory();
while (continueTest)
{
using (IConnection conn = cf.CreateConnection())
{
#if !MEMORYLEAKSAREAWESOME
Sleep(100); // brief pause in the connection lifetime dodges the problem
#endif
}
#if MEMORYLEAKSAREAWESOME
Sleep(100); // this will not affect the connection, and a memory leak will result
#endif
if (sw.ElapsedMilliseconds > 300000)
{
continueTest = false;
}
}
sw.Stop();
long privateBytesUsed = proc.PrivateMemorySize64; // I chose PrivateMemory as it reflected the memory usage graph in VS debugging
string formattedBytesUsed = privateBytesUsed.ToString("#,##0,,M", InvariantCulture);
#if MEMORYLEAKSAREAWESOME
string testStatus = "set";
#else
string testStatus = "not set";
#endif
WriteLine($"Memory used after 5 minutes when {testStatus} to enable the leak : {formattedBytesUsed}");
}
}
}
When MEMORYLEAKSAREAWESOME is defined, the memory leak will occur. When it is not defined, the memory leak will not occur.
Step 4:
Run the program.
Release output when MEMORYLEAKSAREAWESOME is defined:
Memory used after 5 minutes when set to enable the leak : 347M
Press any key to continue . . .
Release output when MEMORYLEAKSAREAWESOME is not defined:
Memory used after 5 minutes when not set to enable the leak : 16M
Press any key to continue . . .
This is just what I've noted, and created a proof of concept for. It may also be related to Issue #91.
I have this code for the connection callbacks:
Options opts = ConnectionFactory.GetDefaultOptions();
opts.ClosedEventHandler += ( s, e ) => {
_logger.Error( "Connection to {0} was closed.", e.Conn.ConnectedUrl );
};
opts.DisconnectedEventHandler += ( s, e ) => {
_logger.Error( "Disconnected from {0}.", e.Conn.ConnectedUrl );
};
It seems that the e.Conn.ConnectedUrl is null on both events, which makes it hard to know from which server one lost the connection.
Also, I miss a "ConnectedEventHandler" so that I can know when the first connection actually succeeded and to which server (ReconnectedEventHandler doesn't fire on initial connections).
Parallel the go-nats client fix for this found here.
When trying to pass credentials via URL, e.g. nats://user:pwd@ubuntu01:4223, you don't seem to handle passwords that contain characters such as "@".
I tried substituting %40, but that did not work either.
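For reference, this is roughly what percent-encoded credentials look like when built and decoded with System.Uri. It is a sketch of what a client could do, using only framework calls; the helper names are hypothetical, and the report above suggests the client version in question did not decode the user info this way.

```csharp
using System;

static class UserInfoSketch
{
    // Percent-encode the credentials so '@' and ':' inside them
    // cannot be mistaken for URL delimiters.
    public static string BuildUrl(string user, string password, string host, int port) =>
        "nats://" + Uri.EscapeDataString(user) + ":" + Uri.EscapeDataString(password) +
        "@" + host + ":" + port;

    // Split the user info on the first ':' and decode each part.
    public static string[] ParseCredentials(string url)
    {
        string userInfo = new Uri(url).UserInfo;
        int sep = userInfo.IndexOf(':');
        string user = sep < 0 ? userInfo : userInfo.Substring(0, sep);
        string password = sep < 0 ? "" : userInfo.Substring(sep + 1);
        return new[] { Uri.UnescapeDataString(user), Uri.UnescapeDataString(password) };
    }

    static void Main()
    {
        string url = BuildUrl("user", "p@ss", "ubuntu01", 4223);
        Console.WriteLine(url); // nats://user:p%40ss@ubuntu01:4223

        string[] creds = ParseCredentials(url);
        Console.WriteLine(creds[0] + " / " + creds[1]); // user / p@ss
    }
}
```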
Adding the Nuget package for NATS.Client on Linux results in getting a NATS.Client.DLL file. This file cannot be found when attempting to load assemblies unless it is renamed NATS.Client.dll.
Are there any plans to support async / await within an EventHandler? Is this what #59 is referring to?
eg:
IAsyncSubscription s = c.SubscribeAsync("foo");
s.MessageHandler += async (sender, args) =>
{
await SomeAsyncMethod(args.Message);
};
s.Start();
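One thing worth keeping in mind with the snippet above: an async lambda attached to an EventHandler compiles to an async void method, so whoever raises the event gets control back at the first incomplete await. The sketch below shows that with a plain EventHandler<string> and Task.Delay standing in for SomeAsyncMethod; it does not use the NATS client at all.

```csharp
using System;
using System.Threading.Tasks;

static class AsyncHandlerSketch
{
    // Returns true when invoking the handler returned before
    // the handler's awaited work had finished.
    public static bool RaiseReturnsEarly()
    {
        var finished = new TaskCompletionSource<bool>();
        EventHandler<string> handler = async (sender, msg) =>
        {
            await Task.Delay(100);     // stands in for SomeAsyncMethod
            finished.SetResult(true);
        };

        handler(null, "payload");      // what an event source does per message
        bool early = !finished.Task.IsCompleted;
        finished.Task.Wait();          // only so the demo exits cleanly
        return early;
    }

    static void Main() => Console.WriteLine(RaiseReturnsEarly()); // True
}
```

In practice this means the next message can be delivered while the previous handler is still awaiting, and exceptions thrown after the await are not seen by the caller.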
Previously, .NET 4.0 code compatibility was maintained. As .NET 4.0 is no longer supported, move to use features found in .NET 4.5.
It would be nice to have continuous integration for the .NET client. Currently, Travis CI supports mono and a few test frameworks as beta. The NATS C# test suite uses the Visual Studio Test Suite to avoid external dependencies.
Investigate:
Hi Colin,
I'm doing some fairly basic tests at the moment to explore throughput. I've a system that I want to Request about 20 sources and aggregate the results.
If I just use publish (using the same approach as below), then I can comfortably publish 10,000 messages in <300ms on my PC. So I put together the following snippet with Request, and I was expecting something similar, but am seeing NATSTimeoutException instead.
Msg msg = null;
ConnectionFactory cf = new ConnectionFactory();
var sw = new Stopwatch();
sw.Start();
using (var c = cf.CreateConnection())
{
var tasks = Enumerable.Range(0, 10000).Select(x =>
Task.Factory.StartNew(() =>
{
msg = c.Request("AppointmentSearch", new AppointmentSearch().ToMsg(), 10 * 1000);
})
);
Task.WaitAll(tasks.ToArray(), TimeSpan.FromSeconds(10));
}
return sw.ElapsedMilliseconds;
Looking at gnatsd -V I can see a throughput of about 1 message per second. This doesn't seem right to me. Is there anything I can look at?
Some user references remain in NATS objects after they are disposed, namely event handlers. Look at all NATS objects for these occurrences, and release any references to user objects during Dispose.
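A minimal sketch of the idea, using a hypothetical HandlerOwner class rather than any actual client type: clearing the event's backing delegate in Dispose drops the object's references to user code.

```csharp
using System;

// Hypothetical subscription-like object; not the client's actual class.
sealed class HandlerOwner : IDisposable
{
    public event EventHandler<EventArgs> MessageHandler;

    public int HandlerCount =>
        MessageHandler == null ? 0 : MessageHandler.GetInvocationList().Length;

    public void Dispose()
    {
        // Dropping the delegate releases the user's handler (and whatever it
        // captures), so those objects can be garbage collected.
        MessageHandler = null;
    }
}

static class DisposeDemo
{
    public static int[] Run()
    {
        var owner = new HandlerOwner();
        owner.MessageHandler += (s, e) => Console.WriteLine("message");
        int before = owner.HandlerCount;
        owner.Dispose();
        return new[] { before, owner.HandlerCount };
    }

    static void Main()
    {
        int[] counts = Run();
        Console.WriteLine(counts[0] + " -> " + counts[1]); // 1 -> 0
    }
}
```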
I'm doing some advanced unit tests.
I have a 3-node cluster
I create 2 connections
I start sending messages every 50 ms
I stop node 2 (the one that is used by the AsyncSub)
The AsyncSubscription loses all the messages that were sent before it reconnects to another node.
Is this the expected behavior?
Here is a unit test that can be put in UnitTestReconnect.cs:
[Fact]
public void TestIntensivePublishShouldNotFail()
{
List<byte[]> msgs = new List<byte[]>();
List<bool> sentMsgs = new List<bool>();
List<int> receivedMsgs = new List<int>();
ManualResetEvent recvReconnected = new ManualResetEvent(false);
// one connection for receiving
Options recvOptions = ConnectionFactory.GetDefaultOptions();
recvOptions.Servers = new [] { "nats://localhost:1222", "nats://localhost:1223", "nats://localhost:1224" };
recvOptions.NoRandomize = true;
recvOptions.AsyncErrorEventHandler += (sender, args) => { Debug.WriteLine("Receive Error " + args.Error + " Dropped=" + args.Subscription.Dropped); };
recvOptions.ClosedEventHandler += (sender, args) => { Debug.WriteLine("Receive Closed "); };
recvOptions.DisconnectedEventHandler += (sender, args) => { Debug.WriteLine("Receive Disconnected "); };
recvOptions.ReconnectedEventHandler += (sender, args) => { recvReconnected.Set();Debug.WriteLine("Receive Reconnected "); };
ManualResetEvent sendReconnected = new ManualResetEvent(false);
// one connection for sending (better for sending large amounts of data)
Options sendOptions = ConnectionFactory.GetDefaultOptions();
sendOptions.Servers = new [] { "nats://localhost:1223", "nats://localhost:1222", "nats://localhost:1224" };
sendOptions.NoRandomize = true;
sendOptions.AsyncErrorEventHandler += (sender, args) => { Debug.WriteLine("Send Error " + args.Error); };
sendOptions.ClosedEventHandler += (sender, args) => { Debug.WriteLine("Send Closed "); };
sendOptions.DisconnectedEventHandler += (sender, args) => { Debug.WriteLine("Send Disconnected "); };
sendOptions.ReconnectedEventHandler += (sender, args) => { sendReconnected.Set();Debug.WriteLine("Send Reconnected "); };
// create server
NATSServer ns = utils.CreateServerWithArgs("-p 1222 -cluster nats://localhost:1422 -routes nats://localhost:1423,nats://localhost:1424 -D");
NATSServer ns2 = utils.CreateServerWithArgs("-p 1223 -cluster nats://localhost:1423 -routes nats://localhost:1422,nats://localhost:1424 -D");
NATSServer ns3 = utils.CreateServerWithArgs("-p 1224 -cluster nats://localhost:1424 -routes nats://localhost:1422,nats://localhost:1423 -D");
Thread.Sleep(1000);
using (IConnection sendConn = new ConnectionFactory().CreateConnection(sendOptions))
using (IConnection rcvConn = new ConnectionFactory().CreateConnection(recvOptions))
{
int receivedMessages = 0;
int sentMessages = 0;
int sentFailure = 0;
IAsyncSubscription s = rcvConn.SubscribeAsync("foo");
s.MessageHandler += (sender, args) =>
{
Interlocked.Increment(ref receivedMessages);
ThreadPool.QueueUserWorkItem(o =>
{
var str = Encoding.UTF8.GetString(args.Message.Data);
int j = Int32.Parse(str.Split(' ')[0]);
lock(receivedMsgs)
receivedMsgs[j]++;
});
};
s.Start();
rcvConn.Flush();
sendConn.Flush();
bool serverRestarted = false;
ManualResetEvent sendFinished = new ManualResetEvent(false);
ThreadPool.QueueUserWorkItem(o =>
{
while (!serverRestarted)
{
try
{
Thread.Sleep(50);
byte[] msg = Encoding.UTF8.GetBytes(String.Format("{0} additionnalPayload", sentMessages));
msgs.Add(msg);
sentMsgs.Add(true);
lock(receivedMsgs)
receivedMsgs.Add(0);
sendConn.Publish("foo", msg);
sentMessages++;
}
catch (NATSConnectionClosedException)
{
// we are dead, we will never reconnect. Shall we retry ?
//sendConn.Dispose();
//sendConn = new ConnectionFactory().CreateConnection(sendOptions);
throw;
}
catch (Exception)
{
sentFailure++;
}
}
// after server is restarted, send more messages to check reconnect is ok
for (int i = 0; i < 20; i++)
{
try
{
Thread.Sleep(50);
byte[] msg = Encoding.UTF8.GetBytes(String.Format("{0} additionnalPayload", sentMessages));
msgs.Add(msg);
sentMsgs.Add(true);
lock (receivedMsgs)
receivedMsgs.Add(0);
sendConn.Publish("foo", msg);
sentMessages++;
}
catch (NATSConnectionClosedException)
{
// we are dead, we will never reconnect. Shall we retry ?
//sendConn.Dispose();
//sendConn = new ConnectionFactory().CreateConnection(sendOptions);
throw;
}
catch (Exception)
{
sentFailure++;
}
}
sendFinished.Set();
});
Thread.Sleep(500);
// kill the server !
int indexBeforeRestart = sentMessages;
ns.Shutdown();
// Wait for receiver to reconnect...
//Assert.Equal(true, sendReconnected.WaitOne(10000));
Assert.Equal(true, recvReconnected.WaitOne(10000));
int indexAfterRestart = sentMessages;
// sender will be in the second part
serverRestarted = true;
// wait for sending finished
Assert.True(sendFinished.WaitOne(Int32.MaxValue));
rcvConn.Flush();
sendConn.Flush();
// maybe give time to receive messages
//Thread.Sleep(1000);
// now check failures
Assert.Equal(0, sentFailure);
Assert.Equal(0, s.QueuedMessageCount);
Assert.Equal(0, s.PendingMessages);
Assert.Equal(0, s.Dropped);
Assert.Equal(s.Delivered, receivedMessages);
Assert.Equal(sendConn.Stats.OutMsgs, sentMessages); // are we counting correctly?
Assert.Equal(rcvConn.Stats.InMsgs, receivedMessages); // are we counting correctly?
Assert.Equal(sentMessages, receivedMessages);
Assert.Equal(rcvConn.Stats.Reconnects, 1);
//Assert.Equal(sendConn.Stats.Reconnects, 1);
}
ns.Shutdown();
ns2.Shutdown();
ns3.Shutdown();
}
Hi, it seems like the subscriber example will only receive messages while the example publisher is publishing (both sessions going). Is this the case? How do I subscribe and receive messages after they have already been published?
Now that .NET 4.5 is supported, do this by adding async/await support for IConnection.Request.
A great suggestion from @dotnetchris:
"It would be really easy to provide a series of nuget packages: NATS.Client.JIL, NATS.Client.Bond, NATS.Client.Protobuf that each depend on NATS.Client and the respective serializer."
Referred in #78
User contributions here are welcome!
Hi,
What strategy do you recommend I implement if I only want to publish messages when there is at least one subscriber?
Warmest regards,
Garrard.
Exactly which code do you run to get a throughput of several million messages per second, and what hardware do you use? Is it a VM on *nix? I recently played with pure .NET TCP sockets and could not get such numbers even on the localhost loopback. I am looking through the code to understand whether it is buffering or something else that yields such results. Also, as far as I understand, you do not use IOCP or SocketAsyncEventArgs?
Our company now plans to use the NATS server with 'bidirectional authentication'. I followed the example code:
Options opts = ConnectionFactory.GetDefaultOptions();
opts.Secure = true;
// .NET requires the private key and cert in the
// same file. 'client.pfx' is generated from:
//
// openssl pkcs12 -export -out client.pfx
// -inkey client-key.pem -in client-cert.pem
X509Certificate2 cert = new X509Certificate2("client.pfx", "password");
opts.AddCertificate(cert);
IConnection c = new ConnectionFactory().CreateConnection(opts);
But it does not work. I implemented the same thing in Java following the Java TLS example, and that works fine.
So could you please let me know, or give me some tips:
Looking forward to your response.
Thank you!
Windy
An asynchronous subscriber needs to handle the return message from the server. The test erroneously assumes the flush will return before the published message arrives back to the subscriber.
Replace the homegrown JSON parsing code with [DataContract]/[DataMember] attributes and the JSON serializer found in System.Runtime.Serialization.
Depends on #103
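A minimal sketch of the attribute-based approach, assuming a small illustrative subset of a server INFO payload. The ServerInfoSketch type and the choice of fields are assumptions made for this example, not the client's actual model.

```csharp
using System;
using System.IO;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Json;
using System.Text;

// Illustrative type only; the field selection is an assumption.
[DataContract]
public sealed class ServerInfoSketch
{
    [DataMember(Name = "server_id")]
    public string ServerId { get; set; }

    [DataMember(Name = "max_payload")]
    public long MaxPayload { get; set; }
}

public static class JsonSketch
{
    public static ServerInfoSketch Parse(string json)
    {
        var serializer = new DataContractJsonSerializer(typeof(ServerInfoSketch));
        using (var stream = new MemoryStream(Encoding.UTF8.GetBytes(json)))
            return (ServerInfoSketch)serializer.ReadObject(stream);
    }

    static void Main()
    {
        ServerInfoSketch info = Parse("{\"server_id\":\"abc\",\"max_payload\":1048576}");
        Console.WriteLine(info.ServerId + " " + info.MaxPayload); // abc 1048576
    }
}
```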
Add C# support for NATS streaming
Users may want to sign the NATS client assembly themselves so signing should be an optional step.
NOTE: It should be done automatically when generating the official NUGET package.
Add functionality to mirror the go-nats feature, https://github.com/nats-io/go-nats/pull/282.
The documentation for the Go client suggests that the pending message and byte limits treat negative values as "no limits", however the C# client does not contain this logic.
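The missing logic could look something like the sketch below, where a negative limit disables the corresponding check. The names echo the pMsgs/pBytes fields quoted in an earlier issue, but the method itself is hypothetical, and treating zero as an ordinary limit is an assumption of this example.

```csharp
using System;

static class PendingLimitSketch
{
    // A negative limit means "no limit" for that metric.
    public static bool IsSlowConsumer(long pMsgs, long pBytes, long pMsgsLimit, long pBytesLimit)
    {
        bool overMsgs = pMsgsLimit >= 0 && pMsgs > pMsgsLimit;
        bool overBytes = pBytesLimit >= 0 && pBytes > pBytesLimit;
        return overMsgs || overBytes;
    }

    static void Main()
    {
        Console.WriteLine(IsSlowConsumer(70000, 0, 65536, 67108864)); // True
        Console.WriteLine(IsSlowConsumer(70000, 0, -1, 67108864));    // False
        Console.WriteLine(IsSlowConsumer(3, 100, -1, -1));            // False
    }
}
```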
I wanted to evaluate NATS for a .NET Core project. Unfortunately, the NuGet is only for net45. Is it possible to have the next version also include a build for .NET Core?
I'm doing some advanced unit tests.
I have a 3-node cluster
I create 2 connections
1 for sending messages on node 1 on subject foo
1 for receiving messages on node 2 on subject foo
I start sending messages every 20 ms
I stop node 1 (the one that is connected to the send thread)
Publishing messages fails with 2 different types of issues:
Is this the expected behavior?
Consider building and packaging .NET 4.0 NATS Client assemblies. Care was taken to ensure code compatibility with .NET 4.0, so this would be project/build related changes only.
We're happy to support our user base any way we can and, as a note, do encourage those using older versions of the .NET Framework to review Microsoft's .NET support lifecycle: https://support.microsoft.com/en-us/lifecycle#gp/Framework_FAQ
Pre-Release NuGet Packages should be provided for users to take advantage of new features without needing to compile themselves, and is very convenient for .NET core users. This will be a build of new features on the master branch that pass all tests.