Comments (6)
Hey Joe,
Thank you for your support of the library! :-)
Easy.MessageHub is an implementation of the Event Aggregator pattern, and I have intentionally avoided adding features that would deviate from that pattern. IMO any additional logic that determines who receives a message would turn this into a Mediator, which is not the problem the library is concerned with. In fact, I think the library is feature-complete; if bugs are found, I will of course fix them.
For your particular use case, I would suggest housing this logic within your subscriber. Given that you know the type of the message, e.g. Person, you can combine Easy.MessageHub with System.Threading.Channels, whereby any incoming invocation through the hub is funnelled into a Channel<Person>, within which you will have a bunch of Func<Person, bool> predicates deciding how you should handle the message.
I highly recommend using this library in conjunction with System.Threading.Channels, as it unlocks a lot of potential and allows you to build higher-level features on top of the library.
HTH.
from easy.messagehub.
For the record, do you have an example of what you are suggesting?
You mean having a "singleton" channel for that message type and then, within the channel consumer, redirecting the event to whoever has to receive it based on the list of Func<Person, bool>?
I have an event bus similar to yours, where I built a SubscriptionManager class to better handle the registration/unregistration process. It supports a clause (something that I think could be useful to you too), and I've added a simple method like this to handle the "clause":
/// <summary>
/// Registers a <see cref="IEventBus"/> message <paramref name="handler"/> that will be automatically
/// unregistered once the component gets disposed.
/// </summary>
/// <param name="handler">The handler to invoke for each accepted message.</param>
/// <param name="clause">Optional filter; when it returns false the message is skipped.</param>
protected void RegisterLifeTimeEventHandler<TEventBase>(Func<TEventBase, ValueTask> handler, Func<TEventBase, bool>? clause = null) where TEventBase : EventBase
{
    _eventSubscriptionManager.RegisterSubscriptions(eventBus.Subscribe<TEventBase>(async (message) =>
    {
        // ensure we listen only to events we are interested in
        if (clause != null && !clause(message)) return;
        await handler(message).ConfigureAwait(false);
    }));
}
It's not a perfect solution (performance-wise) because it wraps one handler inside another, but it does its job.
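One way to reduce the cost of that wrapping is to build the wrapper only when a clause is actually supplied. The following is a minimal sketch, not the MithrilShards implementation: `HandlerFilter` and `Apply` are hypothetical names, and it assumes the bus's `Subscribe` accepts a `Func<TEvent, ValueTask>`.

```csharp
#nullable enable
using System;
using System.Threading.Tasks;

// Hypothetical helper (not part of any library mentioned in this thread).
public static class HandlerFilter
{
    // Returns the delegate to subscribe with. Without a clause the original
    // handler is returned untouched, so no extra wrapper is allocated.
    public static Func<TEvent, ValueTask> Apply<TEvent>(
        Func<TEvent, ValueTask> handler, Func<TEvent, bool>? clause = null)
    {
        if (handler is null) throw new ArgumentNullException(nameof(handler));
        if (clause is null) return handler;

        // Non-async wrapper: forwards the handler's ValueTask directly, or
        // returns an already-completed one when the clause rejects the message.
        return message => clause(message) ? handler(message) : default(ValueTask);
    }
}
```

Under that assumption, the body of the method above could become `eventBus.Subscribe<TEventBase>(HandlerFilter.Apply(handler, clause))`. Filtered subscriptions still pay for one extra delegate call, but they avoid the async state machine.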
from easy.messagehub.
P.S.
my implementation is opensourced here if you want to adopt the SubscriptionManager in your library
https://github.com/MithrilMan/MithrilShards/tree/master/src/MithrilShards.Core/EventBus
from easy.messagehub.
Here's a quick example which should give you good perf (so long as you're not adding or removing registrations too often) and more ideas on how to expand:
void Main()
{
    IMessageHub hub = new MessageHub();
    using var fancyHub = new MyFancyHub<MyBaseEvent>(hub);

    Action<MyBaseEvent> handlesAllEvents = x => Console.WriteLine("[A] " + x.ToString());
    Action<MyBaseEvent> handlesPositiveEvents = x => Console.WriteLine("[+] " + x.ToString());
    Action<MyOtherEvent> handlesOtherEvents = x => Console.WriteLine("[O] " + x.ToString());

    fancyHub.Register(handlesAllEvents);
    fancyHub.Register(handlesPositiveEvents, x => x.Number > 0);
    fancyHub.Register(handlesOtherEvents, x => x.Name == "Foo");

    fancyHub.Publish(new MyBaseEvent { Number = 42 });
    fancyHub.Publish(new MyOtherEvent { Number = -42, Name = "Foo" });

    Console.ReadLine();
}
public sealed class MyFancyHub<T> : IDisposable
{
    private readonly IMessageHub _hub;
    private readonly Channel<T> _channel;
    private readonly CancellationTokenSource _cts;
    private readonly Guid _token;
    private readonly List<FancyRegistration> _registrations;

    public MyFancyHub(IMessageHub hub)
    {
        _hub = hub;
        UnboundedChannelOptions options = new UnboundedChannelOptions
        {
            AllowSynchronousContinuations = false,
            SingleWriter = false,
            SingleReader = true
        };
        _channel = Channel.CreateUnbounded<T>(options);
        _cts = new CancellationTokenSource();
        _registrations = new List<FancyRegistration>();
        _token = _hub.Subscribe<T>(x => _channel.Writer.TryWrite(x));
        _ = StartReadingFromChannel(_channel.Reader, _cts.Token);
    }

    public void Publish<TPayload>(TPayload payload) where TPayload : T => _hub.Publish(payload);

    public Guid Register<TPayload>(Action<TPayload> handler) where TPayload : T => Register(handler, x => true);

    public Guid Register<TPayload>(Action<TPayload> handler, Predicate<TPayload> predicate) where TPayload : T
    {
        var registration = new FancyRegistration(handler, predicate, typeof(TPayload));
        lock (_registrations)
        {
            _registrations.Add(registration);
        }
        return registration.Token;
    }

    public bool Remove(Guid token)
    {
        lock (_registrations)
        {
            int possibleIdx = _registrations.FindIndex(x => x.Token == token);
            if (possibleIdx < 0)
            {
                return false;
            }
            _registrations.RemoveAt(possibleIdx);
            return true;
        }
    }

    public void Dispose()
    {
        _hub.Unsubscribe(_token);
        _channel.Writer.Complete();
        _cts.Cancel();
    }

    // Single consumer loop: drains the channel and offers every item to each registration in turn.
    private Task StartReadingFromChannel(ChannelReader<T> reader, CancellationToken cToken) =>
        Task.Factory.StartNew(async () =>
        {
            while (await reader.WaitToReadAsync(cToken).ConfigureAwait(false))
            {
                while (reader.TryRead(out T item))
                {
                    if (cToken.IsCancellationRequested) { break; }
                    // Handlers run while the lock is held, so they must not add or remove registrations.
                    lock (_registrations)
                    {
                        foreach (var registration in _registrations)
                        {
                            registration.TryHandle(item);
                        }
                    }
                }
            }
        },
        TaskCreationOptions.LongRunning);

    private sealed class FancyRegistration
    {
        private readonly Delegate _handler;
        private readonly Delegate _predicate;
        private readonly Type _payloadType;

        public FancyRegistration(Delegate handler, Delegate predicate, Type payloadType)
        {
            _handler = handler;
            _predicate = predicate;
            _payloadType = payloadType;
            Token = Guid.NewGuid();
        }

        public Guid Token { get; }

        public bool TryHandle<TIn>(TIn input) where TIn : T
        {
            Type inputType = input.GetType();
            bool shouldHandle = inputType.IsAssignableTo(_payloadType);
            if (!shouldHandle) { return false; }

            // Fast path: both delegates are compatible with TIn, so invoke them directly.
            // The predicate is evaluated once; a false result does not fall through to DynamicInvoke.
            if (_predicate is Predicate<TIn> pred && _handler is Action<TIn> handler)
            {
                if (!pred(input)) { return false; }
                handler(input);
                return true;
            }

            // Slow path: the registration targets a derived payload type, so invoke late-bound.
            if ((bool)_predicate.DynamicInvoke(input))
            {
                _handler.DynamicInvoke(input);
                return true;
            }
            return false;
        }
    }
}

public class MyBaseEvent
{
    public int Number { get; set; }
    public override string ToString() => Number.ToString();
}

public sealed class MyOtherEvent : MyBaseEvent
{
    public string Name { get; set; }
    // Interpolation tolerates a null Name, unlike calling Name.ToString().
    public override string ToString() => $"{Number} | {Name}";
}
from easy.messagehub.
OK, I see, thanks. It's as I thought, although I'd avoid DynamicInvoke since it's very slow; you'd want to use a generic predicate only.
Another thing to point out is that this way the consumer runs on its own thread: events are still handled sequentially, but publishing no longer blocks, so you can't use events to drive the workflow. That may or may not be a problem depending on your needs; it's fine as long as the user knows the limitation.
What I mean is that you can't do something like this:
class MyEvent
{
    public bool Cancel { get; set; }
}

var myEvent = new MyEvent();
eventbus.Publish(myEvent);
// With a channel-based consumer the handlers may not have run yet at this point.
if (myEvent.Cancel)
{
    stop();
}
P.S.
I don't encourage having events that control flow anyway, even though it gives you a dynamic pipeline for processing an object, which can sometimes be useful.
from easy.messagehub.
The performance of DynamicInvoke can easily be addressed; I leave that as an exercise for the reader! And I agree that event cancellation as you showed is a bad idea ;-)
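For anyone attempting that exercise, one possible approach (a sketch only, not something taken from the library) is to capture the typed handler and predicate in a closure when the registration is created, so dispatch needs just a type test. `TypedRegistration<T>` and `Create` are hypothetical names standing in for FancyRegistration:

```csharp
using System;

// Hypothetical stand-in for FancyRegistration that never calls DynamicInvoke.
public sealed class TypedRegistration<T>
{
    private readonly Func<T, bool> _tryHandle;

    private TypedRegistration(Func<T, bool> tryHandle)
    {
        _tryHandle = tryHandle;
        Token = Guid.NewGuid();
    }

    public Guid Token { get; }

    // TPayload is known here, so the closure can call both delegates directly.
    public static TypedRegistration<T> Create<TPayload>(
        Action<TPayload> handler, Predicate<TPayload> predicate)
        where TPayload : T
    {
        return new TypedRegistration<T>(item =>
        {
            // The type pattern matches TPayload itself and any type derived from it.
            if (item is TPayload payload && predicate(payload))
            {
                handler(payload);
                return true;
            }
            return false;
        });
    }

    // Returns true when the item matched the payload type and the predicate.
    public bool TryHandle(T item) => _tryHandle(item);
}
```

In the example above, `_registrations` would then be a `List<TypedRegistration<T>>` and `Register` would call `TypedRegistration<T>.Create(handler, predicate)`. The cost per message is one delegate call plus a type check.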
from easy.messagehub.