netmq / samples
Samples of NetMQ
Although the sample clearly implements a topic-based pub/sub system, the topic (in this case a zip code) is not sent using the pattern shown in the documentation:
pub.SendMoreFrame("zipcode").SendFrame("some stuff");
where "zipcode" is the topic. In other words, SendMoreFrame is not used for the topic.
The publish call includes the zip code in the message body rather than as a separate topic frame. I guess it works because the subscriber is written to match.
Regards
Rob
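For comparison, here is a minimal sketch of the documented two-frame form, where the topic travels in its own frame. The class name, the zip code "10001", and the payload text are made up for illustration, and the 200 ms sleep is only a crude way to let the subscription reach the publisher. As far as I understand ZeroMQ's subscription filter, it is a prefix match on the first frame, so a single-frame message that starts with the zip code also passes the filter, which would explain why the sample works as written.

```csharp
using System;
using System.Threading;
using NetMQ;
using NetMQ.Sockets;

public static class TopicFrameDemo
{
    // Publishes one two-frame message and returns the frames the subscriber saw
    // (an empty array if nothing arrived within one second).
    public static string[] RoundTrip()
    {
        using (var pub = new PublisherSocket())
        using (var sub = new SubscriberSocket())
        {
            int port = pub.BindRandomPort("tcp://127.0.0.1");
            sub.Connect($"tcp://127.0.0.1:{port}");
            sub.Subscribe("10001"); // hypothetical zip code used as the topic

            Thread.Sleep(200); // crude wait so the subscription reaches the publisher

            // Frame 1 carries the topic, frame 2 carries the payload.
            pub.SendMoreFrame("10001").SendFrame("temperature=21 humidity=40");

            string topic;
            if (!sub.TryReceiveFrameString(TimeSpan.FromSeconds(1), out topic))
                return new string[0];

            string body = sub.ReceiveFrameString();
            return new[] { topic, body };
        }
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(" | ", RoundTrip()));
    }
}
```

With the topic in its own frame the subscriber can read it with one receive call and the payload with the next, instead of splitting a combined string.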
I am working on .NET 4.7 with the latest NetMQ version.
I get an error at the line
var peerIdx = rnd.Next(peers.Count - 2) + 2;
I think it should be
var peerIdx = rnd.Next(0, peers.Count);
Has anybody run the sample and looked into this?
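A small sketch of why the original line can fail, and one guarded alternative. It assumes the "+ 2" is meant to skip the first two entries of the list, which the report does not confirm. PeerPicker and PickIndex are hypothetical names. With fewer than two entries, Random.Next receives a negative maximum and throws. With exactly two entries it returns 0, so the computed index 2 is past the end of the list.

```csharp
using System;

public static class PeerPicker
{
    // Picks a random index in [firstPeer, count), or -1 when that range is empty.
    // firstPeer = 2 mirrors the "+ 2" in the sample (assumed intent: skip two entries).
    public static int PickIndex(Random rnd, int count, int firstPeer = 2)
    {
        if (count <= firstPeer)
            return -1; // nothing to pick; the unguarded expression would throw or overrun

        return rnd.Next(firstPeer, count); // upper bound is exclusive
    }

    public static void Main()
    {
        var rnd = new Random();
        Console.WriteLine(PickIndex(rnd, 5)); // some index from 2 to 4
        Console.WriteLine(PickIndex(rnd, 2)); // no eligible peer
    }
}
```

Note that rnd.Next(0, peers.Count), as proposed above, also avoids the exception for a non-empty list, but it changes which entries can be selected.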
In MDPBroker.cs of the MajordomoPattern implementation, I feel there is a delay between a message arriving at the broker and the broker responding to it.
So I am creating an async version of one method. Please comment if anything in it is wrong with respect to the pattern implementation:
private void ProcessReceivedMessageAsync(object sender, NetMQSocketEventArgs e)
{
    try
    {
        var msg = e.Socket.ReceiveMultipartMessage();
        Debug.WriteLine($"Received: {msg}");
        var senderFrame = msg.Pop(); // [e][protocol header][service or command][data]
        var empty = msg.Pop(); // [protocol header][service or command][data]
        var headerFrame = msg.Pop(); // [service or command][data]
        var header = headerFrame.ConvertToString();
        switch (header)
        {
            case MDPConstants.MDP_CLIENT_HEADER:
                {
                    Task.Factory.StartNew(() => ProcessClientMessage(senderFrame, msg), TaskCreationOptions.LongRunning);
                }
                break;
            case MDPConstants.MDP_WORKER_HEADER:
                ProcessWorkerMessage(senderFrame, msg);
                break;
            default:
                Debug.WriteLine("ERROR - message with invalid protocol header!");
                break;
        }
    }
    catch (Exception ex)
    {
        Logger.Fatal(ex.Message, ex);
    }
}
Environment
NetMQ Version: 4.0.0.207
Operating System: Windows 10
.NET Version: .net core 2.1.6
Expected behaviour
broker/worker keep working for hours/days
Actual behavior
Got following error in logs
System.ArgumentOutOfRangeException: Index was out of range. Must be non-negative and less than the size of the collection.
Parameter name: index
at MajordomoProtocol.MDPBroker.ProcessReceivedMessage(Object sender, NetMQSocketEventArgs e) in MDPBroker.cs:line 317
at NetMQ.NetMQSocket.InvokeEvents(Object sender, PollEvents events)
at NetMQ.NetMQPoller.RunPoller()
at NetMQ.NetMQPoller.Run(SynchronizationContext syncContext)
at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
--- End of stack trace from previous location where exception was thrown ---
at System.Threading.Tasks.Task.ExecuteWithThreadLocal(Task& currentTaskSlot)
Steps to reproduce the behavior
Set up a worker and broker in the environment described above.
The offending code is:
//publisher2.Publish("This is a message 2", TimeSpan.FromSeconds(2000));
which is the second publish. If I uncomment this line, I get the following error:
"An unhandled exception of type 'System.NullReferenceException' occurred in NetMQ.dll
Additional information: Object reference not set to an instance of an object."
It occurs when I step over the line:
string received = subscriber.ReceiveString(out peerName);
Any thoughts?
In the MajordomoProtocol project, file MDPClientAsync, the overloaded constructor
public MDPClientAsync([NotNull] string brokerAddress, string identity)
{
    if (string.IsNullOrWhiteSpace(brokerAddress))
        throw new ArgumentNullException(nameof(brokerAddress), "The broker address must not be null, empty or whitespace!");
    if (!string.IsNullOrWhiteSpace(identity))
        m_identity = Encoding.UTF8.GetBytes(identity);
    m_brokerAddress = brokerAddress;
}
is not calling this().
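To illustrate what the missing chaining changes, here is a minimal sketch with a hypothetical ChainedClient class. It is not the actual MDPClientAsync code, and the IsInitialized flag merely stands in for whatever the parameterless constructor sets up. Adding ": this()" makes the shared setup run before the body of the overload.

```csharp
using System;
using System.Text;

public class ChainedClient
{
    private readonly byte[] m_identity;
    private readonly string m_brokerAddress;

    // Stand-in for the shared setup done by the parameterless constructor.
    public bool IsInitialized { get; }

    public string BrokerAddress { get { return m_brokerAddress; } }
    public byte[] Identity { get { return m_identity; } }

    private ChainedClient()
    {
        IsInitialized = true;
    }

    // ": this()" runs the parameterless constructor first, so the shared setup is not skipped.
    public ChainedClient(string brokerAddress, string identity = null) : this()
    {
        if (string.IsNullOrWhiteSpace(brokerAddress))
            throw new ArgumentNullException(nameof(brokerAddress), "The broker address must not be null, empty or whitespace!");

        if (!string.IsNullOrWhiteSpace(identity))
            m_identity = Encoding.UTF8.GetBytes(identity);

        m_brokerAddress = brokerAddress;
    }

    public static void Main()
    {
        var client = new ChainedClient("tcp://localhost:5555", "client-1");
        Console.WriteLine(client.IsInitialized);
    }
}
```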
Can someone give me a hint on how to set the broker and worker timeouts to get a stable broker-worker connection with the Majordomo implementation?
I use 10-second heartbeats for both broker and worker, both running on localhost, but every now and then I get a disconnect.
The first bug I found is in the broker constructor: if I pass a timeout, m_heartbeatExpiry is not adjusted accordingly. I fixed this but still get these disconnects.
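One way to keep the expiry in step with the interval is to derive it every time the interval changes, as in the sketch below. HeartbeatSettings is a hypothetical helper, not part of the sample. The liveness factor of 3 (expiry = interval × liveness) is an assumption borrowed from the usual heartbeating convention. The idea is that the peer may miss a couple of heartbeats before it is declared dead.

```csharp
using System;

public class HeartbeatSettings
{
    private TimeSpan m_interval;

    public HeartbeatSettings(TimeSpan interval, int liveness = 3)
    {
        if (liveness < 1)
            throw new ArgumentOutOfRangeException(nameof(liveness));

        Liveness = liveness;
        Interval = interval; // the setter also computes Expiry
    }

    // Number of heartbeat intervals a peer may stay silent (assumed default: 3).
    public int Liveness { get; }

    // Always derived from Interval, so the two cannot drift apart.
    public TimeSpan Expiry { get; private set; }

    public TimeSpan Interval
    {
        get { return m_interval; }
        set
        {
            if (value <= TimeSpan.Zero)
                throw new ArgumentOutOfRangeException(nameof(value));

            m_interval = value;
            Expiry = TimeSpan.FromTicks(value.Ticks * Liveness);
        }
    }

    public bool IsExpired(DateTime lastSeenUtc, DateTime nowUtc)
    {
        return nowUtc - lastSeenUtc > Expiry;
    }

    public static void Main()
    {
        var settings = new HeartbeatSettings(TimeSpan.FromSeconds(10));
        Console.WriteLine(settings.Expiry);
    }
}
```

If the expiry on one side is close to the heartbeat interval of the other side, ordinary scheduling jitter may be enough to cause occasional disconnects, so comparing those two values could be a useful first check.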
I think it is a bug to add a worker to the list of available workers on every kind of network activity the queue/broker sees from it. I'd say this is wrong by design.
Imagine a worker which is currently idle and sending heartbeat messages periodically: the implementation adds this worker to the list of available workers over and over again. The list would grow endlessly if there were no expiration management continuously removing older duplicates from the list.
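A minimal sketch of the alternative described above: key the idle workers by identity so that a heartbeat refreshes the existing entry instead of appending a duplicate. IdleWorkerRegistry and its members are hypothetical names, not taken from the sample, and the identity is simplified to a string.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class IdleWorkerRegistry
{
    private readonly Dictionary<string, DateTime> m_expiryByWorker = new Dictionary<string, DateTime>();
    private readonly TimeSpan m_ttl;

    public IdleWorkerRegistry(TimeSpan ttl)
    {
        m_ttl = ttl;
    }

    public int Count => m_expiryByWorker.Count;

    // Called for any sign of life from an idle worker: refreshes the expiry
    // of the existing entry, or creates the entry if the worker is new.
    public void Touch(string workerId, DateTime nowUtc)
    {
        m_expiryByWorker[workerId] = nowUtc + m_ttl;
    }

    // Removes workers whose expiry has passed and returns how many were removed.
    public int Purge(DateTime nowUtc)
    {
        var expired = m_expiryByWorker
            .Where(entry => entry.Value <= nowUtc)
            .Select(entry => entry.Key)
            .ToList();

        foreach (var workerId in expired)
            m_expiryByWorker.Remove(workerId);

        return expired.Count;
    }

    public static void Main()
    {
        var registry = new IdleWorkerRegistry(TimeSpan.FromSeconds(30));
        var now = DateTime.UtcNow;
        registry.Touch("worker-A", now);
        registry.Touch("worker-A", now.AddSeconds(10)); // heartbeat, no duplicate
        Console.WriteLine(registry.Count);
    }
}
```

A dictionary does not preserve the order in which workers became available, so a real queue that dispatches to the least recently used worker would need an ordered structure in addition to this.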
Are services only ever added? When are they removed?
Hi all,
I would like to ask for help.
Sometimes I receive a message which is incomplete, with zeroes at the end of the buffer.
Example:
System.FormatException: Deserialization of :[eyJDb3JyZWxhdGlvbklkIjoiMzM1MWNjNTktMzBmNi00NTk1LWFjYmQtMGRmYTZlMjIyYTRlIiwiRXhwaXJhdGlvblRpbWUiOm51bGwsIlNvdXJjZUFkZHJlc3MiOm51bGwsIlNlbnRUaW1lIjoiMjAxOS0xMS0wN1QxNTowOTowNC43NjQ4MzkzKzAxOjAwIiwiUGF5bG9hZFR5cGUiOiJLaXN0bGVyLkNkdC5Db250cmFjdHMuTG9nZ2VyLk1vZGVscy5JTG9nTWVzc2FnZSIsIlBheWxvYWQiOiJleUpNWlhabGJDSTZNaXdpVTI5MWNtTmxJam9pUzJsemRHeGxjaTVEWkhRdVdHMXNVbkJqVFdWemMyRm5aVUZrWVhCMFpYSk5hV1JrYkdWM1lYSmxMazFwWkdSc1pYZGhjbVV1VW1WeGRXVnpkRkpsYzNCdmJuTmxURzluWjJsdVowMXBaR1JzWlhkaGNtVWlMQ0pOWlhOellXZGxJam9pU0hSMGNDQlNaWE53YjI1elpTQkpibVp2Y20xaGRHbHZianBjWEhKY1hHNVRZMmhsYldFNmFIUjBjQ0JJYjNOME9pQXhNamN1TUM0d0xqRTZPREkzTlNCUVlYUm9PaUJjWEM5U1VFTXlJRkYxWlhKNVUzUnlhVzVuT2lBZ1VtVnpjRzl1YzJVZ1FtOWtlVG9nUEQ5NGJXd2dkbVZ5YzJsdmJqMWNYRndpTVM0d1hGeGNJaUJsYm1OdlpHbHVaejFjWEZ3aWRYUm1MVGhjWEZ3aVB6NWNYSEpjWEc0OGJXVjBhRzlrVW1WemNHOXVjMlUrWEZ4eVhGeHVJQ0E4Y0dGeVlXMXpQbHhjY2x4Y2JpQWdJQ0E4Y0dGeVlXMCtYRnh5WEZ4dUlDQWdJQ0FnUEhaaGJIVmxQbHhjY2x4Y2JpQWdJQ0FnSUNBZ1BHRnljbUY1UGx4Y2NseGNiaUFnSUNBZ0lDQWdJQ0E4WkdGMFlUNWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQThkbUZzZFdVK1hGeHlYRnh1SUNBZ0lDQWdJQ0FnSUNBZ0lDQThjM1J5YVc1blBuTjVjM1JsYlM1c2FYTjBUV1YwYUc5a2N6eGNYQzl6ZEhKcGJtYytYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdQRnhjTDNaaGJIVmxQbHhjY2x4Y2JpQWdJQ0FnSUNBZ0lDQWdJRHgyWVd4MVpUNWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQWdJRHh6ZEhKcGJtYytjM2x6ZEdWdExtMWxkR2h2WkZOcFoyNWhkSFZ5WlR4Y1hDOXpkSEpwYm1jK1hGeHlYRnh1SUNBZ0lDQWdJQ0FnSUNBZ1BGeGNMM1poYkhWbFBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lEeDJZV3gxWlQ1Y1hISmNYRzRnSUNBZ0lDQWdJQ0FnSUNBZ0lEeHpkSEpwYm1jK2MzbHpkR1Z0TG0xbGRHaHZaRWhsYkhBOFhGd3ZjM1J5YVc1blBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lEeGNYQzkyWVd4MVpUNWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQThkbUZzZFdVK1hGeHlYRnh1SUNBZ0lDQWdJQ0FnSUNBZ0lDQThjM1J5YVc1blBtZGxkRjlyWlhsM2IzSmtYMjVoYldWelBGeGNMM04wY21sdVp6NWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQThYRnd2ZG1Gc2RXVStYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdQSFpoYkhWbFBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lDQWdQSE4wY21sdVp6NXlkVzVmYTJWNWQyOXlaRHhjWEM5emRISnBibWMrWEZ4eVhGeHVJQ0FnSUNBZ0lDQWdJQ0FnUEZ4Y0w
zWmhiSFZsUGx4Y2NseGNiaUFnSUNBZ0lDQWdJQ0FnSUR4MllXeDFaVDVjWEhKY1hHNGdJQ0FnSUNBZ0lDQWdJQ0FnSUR4emRISnBibWMrWjJWMFgydGxlWGR2Y21SZllYSm5kVzFsYm5SelBGeGNMM04wY21sdVp6NWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQThYRnd2ZG1Gc2RXVStYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdQSFpoYkhWbFBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lDQWdQSE4wY21sdVp6NW5aWFJmYTJWNWQyOXlaRjlrYjJOMWJXVnVkR0YwYVc5dVBGeGNMM04wY21sdVp6NWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQThYRnd2ZG1Gc2RXVStYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdQSFpoYkhWbFBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lDQWdQSE4wY21sdVp6NXpaWEoyYVdObExrZGxkRXhwYzNSUFpreHZZV1JsWkVWeGRXbHdiV1Z1ZER4Y1hDOXpkSEpwYm1jK1hGeHlYRnh1SUNBZ0lDQWdJQ0FnSUNBZ1BGeGNMM1poYkhWbFBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lEeDJZV3gxWlQ1Y1hISmNYRzRnSUNBZ0lDQWdJQ0FnSUNBZ0lEeHpkSEpwYm1jK1kyRnNMbE5sZEUxdmJtbDBiM0pOYjJSbFBGeGNMM04wY21sdVp6NWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQThYRnd2ZG1Gc2RXVStYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdQSFpoYkhWbFBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lDQWdQSE4wY21sdVp6NWpZV3d1UjJWMFRXOXVhWFJ2Y2tacFpXeGtjenhjWEM5emRISnBibWMrWEZ4eVhGeHVJQ0FnSUNBZ0lDQWdJQ0FnUEZ4Y0wzWmhiSFZsUGx4Y2NseGNiaUFnSUNBZ0lDQWdJQ0FnSUR4MllXeDFaVDVjWEhKY1hHNGdJQ0FnSUNBZ0lDQWdJQ0FnSUR4emRISnBibWMrWTJGc0xsTmxkRU52Ym5ScGJuVnZkWE5QZFhSd2RYUThYRnd2YzNSeWFXNW5QbHhjY2x4Y2JpQWdJQ0FnSUNBZ0lDQWdJRHhjWEM5MllXeDFaVDVjWEhKY1hHNGdJQ0FnSUNBZ0lDQWdJQ0E4ZG1Gc2RXVStYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdJQ0E4YzNSeWFXNW5QbU5oYkM1VFpYUlFkV3h6WldSUGRYUndkWFE4WEZ3dmMzUnlhVzVuUGx4Y2NseGNiaUFnSUNBZ0lDQWdJQ0FnSUR4Y1hDOTJZV3gxWlQ1Y1hISmNYRzRnSUNBZ0lDQWdJQ0FnSUNBOGRtRnNkV1UrWEZ4eVhGeHVJQ0FnSUNBZ0lDQWdJQ0FnSUNBOGMzUnlhVzVuUG1OaGJDNVRaWFJUYVc1MWMyOXBaR0ZzVDNWMGNIVjBQRnhjTDNOMGNtbHVaejVjWEhKY1hHNGdJQ0FnSUNBZ0lDQWdJQ0E4WEZ3dmRtRnNkV1UrWEZ4eVhGeHVJQ0FnSUNBZ0lDQWdJQ0FnUEhaaGJIVmxQbHhjY2x4Y2JpQWdJQ0FnSUNBZ0lDQWdJQ0FnUEhOMGNtbHVaejVqWVd3dVUzZHBkR05vVDNWMGNIVjBRMmhoYm01bGJFOXVQRnhjTDNOMGNtbHVaejVjWEhKY1hHNGdJQ0FnSUNBZ0lDQWdJQ0E4WEZ3dmRtRnNkV1UrWEZ4eVhGeHVJQ0FnSUNBZ0lDQWdJQ0FnUEhaaGJIVmxQbHhjY2x4Y2JpQWdJQ0FnSUNBZ0lDQWdJQ0FnUEhOMGNtbHVaejVqWVd3dVUzZHBkR05vVDNWMGNIVjBRMmhoYm01bGJFOW1aanhjWEM5emR
ISnBibWMrWEZ4eVhGeHVJQ0FnSUNBZ0lDQWdJQ0FnUEZ4Y0wzWmhiSFZsUGx4Y2NseGNiaUFnSUNBZ0lDQWdJQ0FnSUR4MllXeDFaVDVjWEhKY1hHNGdJQ0FnSUNBZ0lDQWdJQ0FnSUR4emRISnBibWMrWkdGeExrZGxibVZ5WVhSbFJHTldiMngwWVdkbFBGeGNMM04wY21sdVp6NWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQThYRnd2ZG1Gc2RXVStYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdQSFpoYkhWbFBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lDQWdQSE4wY21sdVp6NWtZWEV1VFdWaGMzVnlaVlp2YkhSaFoyVkVZenhjWEM5emRISnBibWMrWEZ4eVhGeHVJQ0FnSUNBZ0lDQWdJQ0FnUEZ4Y0wzWmhiSFZsUGx4Y2NseGNiaUFnSUNBZ0lDQWdJQ0FnSUR4MllXeDFaVDVjWEhKY1hHNGdJQ0FnSUNBZ0lDQWdJQ0FnSUR4emRISnBibWMrWkdGeExrMWxZWE4xY21WTmFXNU5ZWGc4WEZ3dmMzUnlhVzVuUGx4Y2NseGNiaUFnSUNBZ0lDQWdJQ0FnSUR4Y1hDOTJZV3gxWlQ1Y1hISmNYRzRnSUNBZ0lDQWdJQ0FnSUNBOGRtRnNkV1UrWEZ4eVhGeHVJQ0FnSUNBZ0lDQWdJQ0FnSUNBOGMzUnlhVzVuUG1SaGNTNU5aV0Z6ZFhKbFZtOXNkR0ZuWlZCbFlXdFFaV0ZyUEZ4Y0wzTjBjbWx1Wno1Y1hISmNYRzRnSUNBZ0lDQWdJQ0FnSUNBOFhGd3ZkbUZzZFdVK1hGeHlYRnh1SUNBZ0lDQWdJQ0FnSUNBZ1BIWmhiSFZsUGx4Y2NseGNiaUFnSUNBZ0lDQWdJQ0FnSUNBZ1BITjBjbWx1Wno1a1lYRXVVbVZ6WlhSRVpYWnBZMlU4WEZ3dmMzUnlhVzVuUGx4Y2NseGNiaUFnSUNBZ0lDQWdJQ0FnSUR4Y1hDOTJZV3gxWlQ1Y1hISmNYRzRnSUNBZ0lDQWdJQ0FnSUNBOGRtRnNkV1UrWEZ4eVhGeHVJQ0FnSUNBZ0lDQWdJQ0FnSUNBOGMzUnlhVzVuUG1SaGNTNVNaV0ZrUkdsbmFYUmhiRWx1Y0hWMFVHOXlkRHhjWEM5emRISnBibWMrWEZ4eVhGeHVJQ0FnSUNBZ0lDQWdJQ0FnUEZ4Y0wzWmhiSFZsUGx4Y2NseGNiaUFnSUNBZ0lDQWdJQ0FnSUR4MllXeDFaVDVjWEhKY1hHNGdJQ0FnSUNBZ0lDQWdJQ0FnSUR4emRISnBibWMrWkdGeExsTmxkRVJwWjJsMFlXeFBkWFJ3ZFhSTWFXNWxVM1JoZEdVOFhGd3ZjM1J5YVc1blBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lEeGNYQzkyWVd4MVpUNWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQThkbUZzZFdVK1hGeHlYRnh1SUNBZ0lDQWdJQ0FnSUNBZ0lDQThjM1J5YVc1blBtUmhjUzVIWlhSVGFXZHVZV3hUWVcxd2JHVnpQRnhjTDNOMGNtbHVaejVjWEhKY1hHNGdJQ0FnSUNBZ0lDQWdJQ0E4WEZ3dmRtRnNkV1UrWEZ4eVhGeHVJQ0FnSUNBZ0lDQWdJQ0FnUEhaaGJIVmxQbHhjY2x4Y2JpQWdJQ0FnSUNBZ0lDQWdJQ0FnUEhOMGNtbHVaejVrYlcwdVVtVmhaRlpoYkhWbFBGeGNMM04wY21sdVp6NWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQThYRnd2ZG1Gc2RXVStYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdQSFpoYkhWbFBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lDQWdQSE4wY21sdVp6NWtiVzB1VW1WelpYUlViMFJsWm1GMWJIUlRaWFIwYVc1bmN
6eGNYQzl6ZEhKcGJtYytYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdQRnhjTDNaaGJIVmxQbHhjY2x4Y2JpQWdJQ0FnSUNBZ0lDQWdJRHgyWVd4MVpUNWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQWdJRHh6ZEhKcGJtYytaRzF0TGxObGRFbHVjSFYwUEZ4Y0wzTjBjbWx1Wno1Y1hISmNYRzRnSUNBZ0lDQWdJQ0FnSUNBOFhGd3ZkbUZzZFdVK1hGeHlYRnh1SUNBZ0lDQWdJQ0FnSUNBZ1BIWmhiSFZsUGx4Y2NseGNiaUFnSUNBZ0lDQWdJQ0FnSUNBZ1BITjBjbWx1Wno1bVp5NVRkMmwwWTJoRGFHRnVibVZzVDI0OFhGd3ZjM1J5YVc1blBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lEeGNYQzkyWVd4MVpUNWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQThkbUZzZFdVK1hGeHlYRnh1SUNBZ0lDQWdJQ0FnSUNBZ0lDQThjM1J5YVc1blBtWm5MbE4zYVhSamFFTm9ZVzV1Wld4UFptWThYRnd2YzNSeWFXNW5QbHhjY2x4Y2JpQWdJQ0FnSUNBZ0lDQWdJRHhjWEM5MllXeDFaVDVjWEhKY1hHNGdJQ0FnSUNBZ0lDQWdJQ0E4ZG1Gc2RXVStYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdJQ0E4YzNSeWFXNW5QbVpuTGxObGRFOTFkSEIxZER4Y1hDOXpkSEpwYm1jK1hGeHlYRnh1SUNBZ0lDQWdJQ0FnSUNBZ1BGeGNMM1poYkhWbFBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lEeDJZV3gxWlQ1Y1hISmNYRzRnSUNBZ0lDQWdJQ0FnSUNBZ0lEeHpkSEpwYm1jK1ptY3VVMlYwUVhKaWFYUnlZWEo1VTJsbmJtRnNQRnhjTDNOMGNtbHVaejVjWEhKY1hHNGdJQ0FnSUNBZ0lDQWdJQ0E4WEZ3dmRtRnNkV1UrWEZ4eVhGeHVJQ0FnSUNBZ0lDQWdJQ0FnUEhaaGJIVmxQbHhjY2x4Y2JpQWdJQ0FnSUNBZ0lDQWdJQ0FnUEhOMGNtbHVaejVtWnk1TWIyRmtRWEppYVhSeVlYSjVVMmxuYm1Gc1BGeGNMM04wY21sdVp6NWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQThYRnd2ZG1Gc2RXVStYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdQSFpoYkhWbFBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lDQWdQSE4wY21sdVp6NXZjMk11VW1WelpYUlViMFJsWm1GMWJIUlRaWFIwYVc1bmN6eGNYQzl6ZEhKcGJtYytYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdQRnhjTDNaaGJIVmxQbHhjY2x4Y2JpQWdJQ0FnSUNBZ0lDQWdJRHgyWVd4MVpUNWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQWdJRHh6ZEhKcGJtYytiM05qTGxOMGIzQThYRnd2YzNSeWFXNW5QbHhjY2x4Y2JpQWdJQ0FnSUNBZ0lDQWdJRHhjWEM5MllXeDFaVDVjWEhKY1hHNGdJQ0FnSUNBZ0lDQWdJQ0E4ZG1Gc2RXVStYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdJQ0E4YzNSeWFXNW5QbTl6WXk1VFpYUlNkVzVOYjJSbFBGeGNMM04wY21sdVp6NWNYSEpjWEc0Z0lDQWdJQ0FnSUNBZ0lDQThYRnd2ZG1Gc2RXVStYRnh5WEZ4dUlDQWdJQ0FnSUNBZ0lDQWdQSFpoYkhWbFBseGNjbHhjYmlBZ0lDQWdJQ0FnSUNBZ0lDQWdQSE4wY21sdVp6NXZjMk11VjJGcGRGUnBiR3hCWTNGMWFYTnBkR2x2YmtOdmJYQnNaWFJsUEZ4Y0wzTjBjbWx1Wno1Y1hISmNYRzRnSUNBZ0l
DQWdJQ0FnSUNBOFhGd3ZkbUZzZFdVK1hGeHlYRnh1SUNBZ0lDQWdJQ0FnSUNBZ1BIWmhiSFZsUGx4Y2NseGNiaUFnSUNBZ0lDQWdJQ0FnSUNBZ1BITjBjbWx1Wno1dmMyTXVWMkZwZEZWdWRHbHNWSEpwWjJkbGNrRnliV1ZrUEZ4Y0wzTjBjbWx1Wn ] failed ---> System.FormatException: The input is not a valid Base-64 string as it contains a non-base 64 character, more than two padding characters, or an illegal character among the padding characters.
The apparent whitespace at the end of the message is null characters.
I create subscribers and publishers only when I need them; they are wrapped in using blocks.
I am sending a JSON message which is encoded as a Base64 string.
My serializer methods:
public string SerializeToString(object _object)
{
    return Base64Encode(JsonConvert.SerializeObject(_object));
}

public object Deserialize(byte[] data, Type targetType)
{
    try
    {
        var json = Base64Decode(_encoding.GetString(data));
        return JsonConvert.DeserializeObject(json, targetType);
    }
    catch (Exception e)
    {
        throw new FormatException($"Deserialization of :[{_encoding.GetString(data)}] failed", e);
    }
}
I am using the XPUB/XSUB pattern. I have a router with logging:
public class PubSubMessageRouter : IPubSubMessageRouter
{
private Proxy _proxy;
private NetMQPoller _poller;
private XSubscriberSocket _xSub;
private XPublisherSocket _xPub;
private PushSocket _controlInPush = new PushSocket();
private PushSocket _controlOutPush = new PushSocket();
private PullSocket _controlInPull = new PullSocket();
private PullSocket _controlOutPull = new PullSocket();
private Task _workerReceived;
private Task _workerSent;
private CancellationTokenSource _cancellationTokenSource;
private CancellationToken _cancellationToken;
private WeakAction<MessageBase> _subscriberReceived;
private WeakAction<MessageBase> _subscriberSent;
private WeakAction<Exception> _onExceptionHandler;
private ISerializer _serializer;
private bool _disposed;
public string FrontendConnectionString { get; }
public string BackendConnectionString { get; }
public PubSubMessageRouter(
string backendConnectionString,
string frontendConnectionString,
string controlInConnectionName = "controlIn",
string controlOutConnectionName = "controlOut")
{
BackendConnectionString = backendConnectionString;
FrontendConnectionString = frontendConnectionString;
_serializer = new Serialize.Json.JsonObjectSerializer();
_xSub = new XSubscriberSocket($"@{FrontendConnectionString}");
_xPub = new XPublisherSocket($"@{BackendConnectionString}");
_xSub.Options.ReceiveHighWatermark = 10000;
_xSub.Options.ReceiveBuffer = 44000;
_xSub.Options.SendBuffer = 44000;
_xSub.Options.Linger = TimeSpan.FromSeconds(5);
_xPub.Options.ReceiveHighWatermark = 10000;
_xPub.Options.ReceiveBuffer = 44000;
_xPub.Options.SendBuffer = 44000;
_xPub.Options.Linger = TimeSpan.FromSeconds(5);
_controlInPush.Bind($"inproc://{controlInConnectionName}");
_controlInPull.Connect($"inproc://{controlInConnectionName}");
_controlOutPush.Bind($"inproc://{controlOutConnectionName}");
_controlOutPull.Connect($"inproc://{controlOutConnectionName}");
_poller = new NetMQPoller
{
_xSub,
_xPub
};
// proxy messages between frontend / backend
_proxy = new Proxy(
_xSub,
_xPub,
poller: _poller,
controlIn: _controlInPush,
controlOut: _controlOutPush
);
}
public PubSubMessageRouter(
string backendConnectionString,
string frontendConnectionString,
Action<MessageBase> handlerReceived,
Action<MessageBase> handlerSent,
Action<Exception> onException=null) : this(backendConnectionString, frontendConnectionString)
{
_subscriberReceived = new WeakAction<MessageBase>(handlerReceived);
_subscriberSent = new WeakAction<MessageBase>(handlerSent);
if (onException != null)
{
_onExceptionHandler = new WeakAction<Exception>(onException);
}
}
private async void MessagePumpReceived(CancellationToken ct)
{
while (true)
{
try
{
if (_cancellationToken.IsCancellationRequested)
break;
var mqMessage = new NetMQMessage();
if (_controlInPull.TryReceiveMultipartMessage(ref mqMessage))
{
if (mqMessage.FrameCount == 1) //subscribe notification
{
continue;
}
var message = (MessageBase) _serializer.Deserialize(mqMessage.Last.Buffer, typeof(MessageBase));
if (_subscriberReceived?.Target != null)
{
_subscriberReceived.Execute(message);
}
}
else
{
await Task.Delay(TimeSpan.FromMilliseconds(100), ct);
}
}
catch (Exception ex)
{
OnExceptionHandler(ex);
}
}
}
private async void MessagePumpSent(CancellationToken ct)
{
while (true)
{
try
{
if (_cancellationToken.IsCancellationRequested)
break;
var mqMessage = new NetMQMessage();
if (_controlOutPull.TryReceiveMultipartMessage(ref mqMessage))
{
if (mqMessage.FrameCount == 1) //subscribe notification
{
continue;
}
var message = (MessageBase) _serializer.Deserialize(mqMessage.Last.Buffer, typeof(MessageBase));
if (_subscriberSent?.Target != null)
{
_subscriberSent.Execute(message);
}
}
else
{
await Task.Delay(TimeSpan.FromMilliseconds(100), ct);
}
}
catch (Exception ex)
{
OnExceptionHandler(ex);
}
}
}
private void OnExceptionHandler(Exception obj)
{
if(_onExceptionHandler != null && _onExceptionHandler.IsAlive)
{
_onExceptionHandler.Execute(obj);
}
}
public void StartRouting()
{
_cancellationTokenSource = new CancellationTokenSource();
_cancellationToken = _cancellationTokenSource.Token;
_proxy.Start();
_poller.RunAsync();
_workerReceived = Task.Run(
() => MessagePumpReceived(_cancellationToken),
_cancellationToken);
_workerSent = Task.Run(
() => MessagePumpSent(_cancellationToken),
_cancellationToken);
}
public async Task StopRouting()
{
_proxy.Stop();
_poller.Stop();
_cancellationTokenSource?.Cancel();
await Task.WhenAny(Task.WhenAll(_workerSent, _workerReceived),
Task.Delay(5000)); //wait when all thread finished, but max 5 seconds
}
public async void Dispose(bool disposing)
{
if (_disposed)
return;
if (disposing)
{
await StopRouting();
_poller?.Dispose();
_poller = null;
_proxy = null;
_xSub?.Dispose();
_xSub = null;
_xPub?.Dispose();
_xPub = null;
_controlInPush?.Dispose();
_controlInPush = null;
_controlInPull?.Dispose();
_controlInPull = null;
_controlOutPush?.Dispose();
_controlOutPush = null;
_controlOutPull?.Dispose();
_controlOutPull = null;
_workerSent = null;
_workerReceived = null;
_cancellationTokenSource?.Dispose();
_cancellationTokenSource = null;
_subscriberReceived.MarkForDeletion();
_subscriberReceived = null;
_subscriberSent.MarkForDeletion();
_subscriberSent = null;
_onExceptionHandler.MarkForDeletion();
_onExceptionHandler = null;
_serializer = null;
}
_disposed = true;
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
GC.Collect();
}
}
I have async message handlers which are fired by the service API. The problem usually happens when I send two long messages one after the other:
public async Task Handle(MessageBase message, IApiContext context)
{
    using (var logger = new MessageLogger(typeof(RequestSuiteMetaDataHandler).FullName, context.ApplicationSettings.FrontendConnection, context.LogTopic))
    using (var adapterManager = new AdaptersManager.AdaptersManager(new WindowsRegistrySettingsProvider()))
    {
        var response = new RequestSuiteMetaDataResponse();
        try
        {
            var request = new JsonObjectSerializer().Deserialize<RequestSuiteMetaData>(message.Payload);
            //some code skipped
            response.Succeed = true;
            response.SuiteName = queryResponse.SuiteName;
            response.TestPhases = queryResponse.TestPhases; // long content around 8 kB
        }
        catch (Exception exp)
        {
            await logger.LogMessage(LogLevel.Error, exp.GetAllMessages());
            response.Error = exp.GetAllMessages();
            response.Succeed = false;
        }
        var payloadString = new JsonObjectSerializer().SerializeToString(response);
        var messageToSend = new MessageBase
        {
            CorrelationId = message.CorrelationId,
            PayloadType = typeof(IRequestSuiteMetaDataResponse).FullName,
            SentTime = DateTime.Now,
            Payload = payloadString
        };
        // problem source:
        await logger.LogMessage(LogLevel.Debug,
            $"Sending message: {Base64Decode(messageToSend.Payload)} of type: {messageToSend.PayloadType}");
        Thread.Sleep(50);
        await context.ResponsePublisher.SendMessage(context.ResponseTopic, messageToSend);
    }
}
I have a central async logger that sends log messages over ZeroMQ.
public class MessageLogger: ILogger
{
const char TestBackSlash = '\\';
const char TestSlash = '/';
const char TestDblQuote = '"';
private readonly string _source;
private readonly string _logTopic;
private ISerializer _serializer;
private IPublisher _publisher;
private bool _disposed;
public MessageLogger(string source,
string frontendConnectionString,
string logTopic = TopicsList.AllLogs,
ISerializer serializer = null,
SingleThreadTaskScheduler scheduler = null)
{
_source = source;
_logTopic = logTopic;
_serializer = serializer ?? new Serialize.Json.JsonObjectSerializer();
_publisher = new Publisher($">{frontendConnectionString}", scheduler);
}
public Task LogMessage(LogLevel level, string message)
{
var messageToSend = new MessageBase
{
CorrelationId = Guid.NewGuid(),
PayloadType = typeof(ILogMessage).FullName,
SentTime = DateTime.Now,
Payload = _serializer.SerializeToString(new LogMessage
{
Level = level,
Message = RemoveSpecialChars(message),
Source = _source
})
};
return _publisher.SendMessage(_logTopic, messageToSend);
}
private string RemoveSpecialChars(string input)
{
var output = new StringBuilder(input.Length);
foreach (var c in input)
{
switch (c)
{
case TestSlash:
output.AppendFormat("{0}{1}", TestBackSlash, TestSlash);
break;
case TestBackSlash:
output.AppendFormat("{0}{0}", TestBackSlash);
break;
case TestDblQuote:
output.AppendFormat("{0}{1}", TestBackSlash, TestDblQuote);
break;
case '\b':
output.Append("\\b");
break;
case '\f':
output.Append("\\f");
break;
case '\n':
output.Append("\\n");
break;
case '\r':
output.Append("\\r");
break;
case '\t':
output.Append("\\t");
break;
default:
output.Append(c);
break;
}
}
return output.ToString();
}
public void Dispose(bool disposing)
{
if (_disposed)
return;
if (disposing)
{
_publisher?.Dispose();
_publisher = null;
_serializer = null;
}
_disposed = true;
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
GC.Collect();
}
}
public class Publisher: IPublisher
{
private PublisherSocket _pub;
private ISerializer _serializer;
private SingleThreadTaskScheduler _taskScheduler;
private bool _disposed;
public Publisher(string connectionString , SingleThreadTaskScheduler scheduler=null)
{
_pub = new PublisherSocket(connectionString);
_pub.Options.SendHighWatermark = 1000;
_pub.Options.ReceiveBuffer = 44000;
_pub.Options.SendBuffer = 44000;
_pub.Options.ReconnectInterval = TimeSpan.FromMilliseconds(50);
_pub.Options.Linger = TimeSpan.FromSeconds(5);
_serializer = new Serialize.Json.JsonObjectSerializer();
_taskScheduler = scheduler ?? new SingleThreadTaskScheduler();
}
public void Dispose(bool disposing)
{
if (_disposed)
return;
if (disposing)
{
_pub?.Close();
_pub?.Dispose();
_pub = null;
_serializer = null;
_taskScheduler.StopSchedulerThread();
_taskScheduler.Dispose();
_taskScheduler = null;
}
_disposed = true;
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
GC.Collect();
}
public Task SendMessage(string topicName, MessageBase message)
{
return Task.Factory.StartNew(() =>
{
var data = _serializer.Serialize(message);
var messToSend = new NetMQMessage();
messToSend.Append(topicName);
messToSend.Append(data);
_pub.SendMultipartMessage(messToSend);
}, CancellationToken.None, TaskCreationOptions.None, _taskScheduler);
}
}
The message rate is not high (fewer than 20 messages/s): usually many short messages and occasionally a long one. It looks as if the message buffer is allocated with the correct size but not all characters are received. Why?
Any ideas?
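One possible explanation, which the report does not confirm, is that the byte array passed to Deserialize is longer than the actual payload. The unused tail then decodes as null characters, and the Base64 decoder rejects them. The sketch below reproduces that effect with plain .NET types only. PayloadDecoding and DecodeBase64Payload are hypothetical names, and UTF-8 is assumed as the encoding. If I remember the NetMQ API correctly, NetMQFrame exposes a MessageSize next to Buffer, so comparing the two on the receiving side may be worth a try. Treat that as an assumption to verify against the NetMQ version in use.

```csharp
using System;
using System.Text;

public static class PayloadDecoding
{
    // Decodes only the first `length` bytes of `buffer` as UTF-8 Base64 text.
    public static string DecodeBase64Payload(byte[] buffer, int length)
    {
        var base64 = Encoding.UTF8.GetString(buffer, 0, length);
        return Encoding.UTF8.GetString(Convert.FromBase64String(base64));
    }

    public static void Main()
    {
        var json = "{\"Level\":2}";
        var payload = Encoding.UTF8.GetBytes(Convert.ToBase64String(Encoding.UTF8.GetBytes(json)));

        // Simulate a buffer that is larger than the payload; the extra bytes stay 0.
        var oversized = new byte[payload.Length + 8];
        Array.Copy(payload, oversized, payload.Length);

        // Decoding exactly the payload length succeeds.
        Console.WriteLine(DecodeBase64Payload(oversized, payload.Length));

        try
        {
            // Decoding the whole buffer drags the trailing null characters along.
            DecodeBase64Payload(oversized, oversized.Length);
        }
        catch (FormatException)
        {
            Console.WriteLine("FormatException when the trailing zero bytes are decoded too");
        }
    }
}
```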
This question relates to the Paranoid Pirate Pattern sample project:
Is it intentional that the Paranoid Pirate Pattern does not recover from a restart of the Queue component, or is this an error in the sample implementation (or even the NetMQ core) that needs to be fixed?
Steps to reproduce:
After these steps, messages from clients are no longer processed even though all components (queue + worker + client) are up and running again after step 6.
When I use the ZeroMQ library or the ZMQ library (from NuGet), I can communicate perfectly well between C# and Python (pyzmq).
However, when I use NetMQ, I cannot get any interoperability between them.
I've tried PUB/SUB and REQ/REP. As mentioned, they work well between the other two C# libraries and Python (with either side as initiator or receiver), but Python sees nothing at all when NetMQ is used. From the print statements in my code, it looks like NetMQ claims to have sent messages, but Python receives nothing, and the same happens in the other direction.
The code is the same for all three libraries (NetMQ as well as ZeroMQ/ZMQ). I really don't know what is going on. I am using Python 3.8.7 and the GitHub examples for all three C# libraries.
Could you kindly investigate this? With thanks!
Running StartInterBrokerRouter.bat produces an error.