Coder Social home page Coder Social logo

Comments (6)

NimaAra avatar NimaAra commented on July 23, 2024 1

Hey Joe,

Thank you for your support of the library! :-)

Easy.MessageHub is an implementation of the Event Aggregator pattern and I have intentionally tried to avoid adding features that would deviate from the pattern. IMO any additional logic that would determine who receives the message would turn this into a Mediator which is not the problem the library is concerned with. In fact, I think the library is feature complete unless of course bugs are found which I will fix.

For your particular use-case, I would suggest housing this logic within your subscriber; Given you know the type of the message e.g. Person, you can combine Easy.MessageHub with System.Threading.Channel whereby any incoming invocation through the hub is funnelled into the Channel<Person> within which you will have a bunch of Func<Person, bool> deciding how you should handle the message.

I highly recommend using this library in conjunction with System.Threading.Channel as it unlocks many potentials which allows you to build higher-level features on top the library.

HTH.

from easy.messagehub.

MithrilMan avatar MithrilMan commented on July 23, 2024

for the record do you have an example of what are suggesting?
You mean to have a "singleton" channel for that message type and then within the channel consumer redirect the event to who has to receive it based on the list of Fun<person, bool> ?

I have an event bus similar to your, where I built a SubscriptionManager class to handle better the registration/unregistration process and in it I've a clause (something that I think could be useful to you too) and I've added a simple method like this to handle the "clause"

/// <summary>
/// Registers a <see cref="IEventBus"/> message <paramref name="handler"/> that will be automatically
/// unregistered once the component gets disposed.
/// </summary>
/// <param name="subscription">The subscription.</param>
protected void RegisterLifeTimeEventHandler<TEventBase>(Func<TEventBase, ValueTask> handler, Func<TEventBase, bool>? clause = null) where TEventBase : EventBase
{
   _eventSubscriptionManager.RegisterSubscriptions(eventBus.Subscribe<TEventBase>(async (message) =>
   {
      // ensure we listen only to events we are interested into
      if (clause != null && !clause(message)) return;

      await handler(message).ConfigureAwait(false);
   }));
}

it's not a perfect solution (performance wise) because this causes to have an handler wrapped within another handler but it does it's job

from easy.messagehub.

MithrilMan avatar MithrilMan commented on July 23, 2024

P.S.
my implementation is opensourced here if you want to adopt the SubscriptionManager in your library
https://github.com/MithrilMan/MithrilShards/tree/master/src/MithrilShards.Core/EventBus

from easy.messagehub.

NimaAra avatar NimaAra commented on July 23, 2024

Here's a quick example which should give you a good perf (so long as your not adding or removing registrations too many times) and more ideas on how to expand:

void Main()
{
    IMessageHub hub = new MessageHub();
    using var fancyHub = new MyFancyHub<MyBaseEvent>(hub);
    
    Action<MyBaseEvent> handlesAllEvents = x => Console.WriteLine("[A] " + x.ToString());
    Action<MyBaseEvent> handlesPositiveEvents = x => Console.WriteLine("[+] " + x.ToString());
    Action<MyOtherEvent> handlesOtherEvents = x => Console.WriteLine("[O] " + x.ToString());
    
    fancyHub.Register(handlesAllEvents);
    fancyHub.Register(handlesPositiveEvents, x => x.Number > 0);
    fancyHub.Register(handlesOtherEvents, x => x.Name == "Foo");
    
    fancyHub.Publish(new MyBaseEvent { Number = 42 });
    fancyHub.Publish(new MyOtherEvent { Number = -42, Name = "Foo" });
    
    Console.ReadLine();
}

public sealed class MyFancyHub<T> : IDisposable
{
    private readonly IMessageHub _hub;
    private readonly Channel<T> _channel;
    private readonly CancellationTokenSource _cts;
    private readonly Guid _token;
    private readonly List<FancyRegistration> _registrations;

    public MyFancyHub(IMessageHub hub)
    {
        _hub = hub;

        UnboundedChannelOptions options = new UnboundedChannelOptions
        {
            AllowSynchronousContinuations = false,
            SingleWriter = false,
            SingleReader = true
        };
        _channel = Channel.CreateUnbounded<T>(options);
        _cts = new CancellationTokenSource();
        _registrations = new List<FancyRegistration>();

        _token = _hub.Subscribe<T>(x => _channel.Writer.TryWrite(x));
        _ = StartReadingFromChannel(_channel.Reader, _cts.Token);
    }

    public void Publish<TPayload>(TPayload payload) where TPayload : T => _hub.Publish(payload);

    public Guid Register<TPayload>(Action<TPayload> handler) where TPayload : T => Register(handler, x => true);

    public Guid Register<TPayload>(Action<TPayload> handler, Predicate<TPayload> predicate) where TPayload : T
    {
        var registration = new FancyRegistration(handler, predicate, typeof(TPayload));

        lock (_registrations)
        {
            _registrations.Add(registration);
        }

        return registration.Token;
    }

    public bool Remove(Guid token)
    {
        lock (_registrations)
        {
            int possibleIdx = _registrations.FindIndex(x => x.Token == token);
            if (possibleIdx < 0)
            {
                return false;
            }
            _registrations.RemoveAt(possibleIdx);
            return true;
        }
    }

    public void Dispose()
    {
        _hub.Unsubscribe(_token);
        _channel.Writer.Complete();
        _cts.Cancel();
    }

    private Task StartReadingFromChannel(ChannelReader<T> reader, CancellationToken cToken) =>
        Task.Factory.StartNew(async () =>
        {
            while (await reader.WaitToReadAsync(cToken).ConfigureAwait(false))
            {
                while (reader.TryRead(out T item))
                {
                    if (cToken.IsCancellationRequested) { break; }

                    lock (_registrations)
                    {
                        foreach (var registration in _registrations)
                        {
                            bool handled = registration.TryHandle(item);
                        }
                    }
                }
            }
        },
        TaskCreationOptions.LongRunning);

    private sealed class FancyRegistration
    {
        private readonly Delegate _handler;
        private readonly Delegate _predicate;
        private readonly Type _payloadType;

        public FancyRegistration(Delegate handler, Delegate predicate, Type payloadType)
        {
            _handler = handler;
            _predicate = predicate;
            _payloadType = payloadType;
            Token = Guid.NewGuid();
        }

        public Guid Token { get; }

        public bool TryHandle<TIn>(TIn input) where TIn : T
        {
            Type inputType = input.GetType();
            bool shouldHandle = inputType.IsAssignableTo(_payloadType);

            if (!shouldHandle) { return false; }

            if (_predicate is Predicate<TIn> pred && pred(input))
            {
                var handler = (Action<TIn>)_handler;
                handler(input);
                return true;
            }

            if ((bool)_predicate.DynamicInvoke(input))
            {
                _handler.DynamicInvoke(input);
                return true;
            }

            return false;
        }
    }
}

public class MyBaseEvent
{
    public int Number { get; set; }

    public override string ToString() => Number.ToString();
}

public sealed class MyOtherEvent : MyBaseEvent
{
    public string Name { get; set; }

    public override string ToString() => $"{Number.ToString()} | {Name.ToString()}";
}

from easy.messagehub.

MithrilMan avatar MithrilMan commented on July 23, 2024

ok I see thanks, as I thought, even if I'd avoid DynamicInvoke that's very slow, you'd want to use a generic predicate only

Another thing to point out is that this way the consumer is on its own thread: events are executed sequentially but it's no more blocking and so you can't use events to drive the workflow if you want, it may or may not be problem depending your needs, it's fine as the user knows the limitations

what I mean is that you can't do like

class MyEvent{
    public bool Cancel
}

var myEvent = new MyEvent();
eventbus.Publish(myEvent);
if(myEvent.Cancel){
   stop();
}

P.S.
I don't encourage anyway to have events that control flows, even if this way you can have your dynamic pipeline that can manage an object and sometime can be useful

from easy.messagehub.

NimaAra avatar NimaAra commented on July 23, 2024

The performance of DynamicInvoke can easily be addressed I leave that as an excercise for the reader! And I agree that event cancellation as you showed is a bad idea ;-)

from easy.messagehub.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.