Coder Social home page Coder Social logo

louthy / echo-process Goto Github PK

View Code? Open in Web Editor NEW
116.0 14.0 18.0 2.45 MB

Actor library for C# with additional modules that support persistence to Redis, as well as JS integration

License: MIT License

C# 96.50% JavaScript 3.47% Batchfile 0.03%

echo-process's Introduction

banner

echo

Actor system that works alongside the functional framework Language-Ext

An issue with working with C# is that no matter how much of Language-Ext functional framework you take on-board, you will always end up bumping into mutable state or side-effecting systems. A way around that is to package up the side-effects into atoms of functional computation that are attached to the mutable state (in whatever form it may take). The Actor model + functional message handling expressions are the perfect programming model for that.

Concurrent programming in C# isn't a huge amount of fun. Yes the TPL gets you lots of stuff for free, but it doesn't magically protect you from race conditions or accessing shared state, and definitely doesn't help with accessing shared external state like a database.

Documentation

Documention Description
Overview A quick guide to the core features of the Process system
tell Send a message to a Process - This should be your prefered mechanism for communicating with processes
ask Request/response for processes - use this sparingly.
Publish / Subscribe Mechanism for a Process to publish messages and state. Other processes can subscribe through their inbox or external systems can subscribe through Reactive streams (Observables).
Message dispatch The power of any actor system, especially when it comes to a changing network topology is in its message routing and dispatching
ProcessId Process address/location mechansim
Routers A router is a Process that manage sets of 'worker' processes by routing the received messages, following pre-defined behaviours, e.g. Round-robin, broadcast, etc.
Dispatchers Similar to routers but without the need for a router process, all routing is done by the sender
Registered processes A sort of DNS for Processes, can also register dispatchers
Roles A special type of dispatcher that's aware of the aliveness of cluster nodes and what their roles are

Getting started

Make sure you have the Echo.Process DLL included in your project. If you're using F# then you will also need to include Echo.Process.FSharp.

In C# you should be using static Echo.Process, if you're not using C# 6, just prefix all functions in the examples below with Process.

If you want to use it with Redis, include Echo.Process.Redis.dll. To connect to Redis use:

    // C#
    RedisCluster.register();
    ProcessConfig.initialise("sys", "web-front-end", "web-front-end-1", "localhost", "0");
  • "sys" is the 'system name', but easier to think of it as the name of the cluster as a whole. That means you can use a different value to point it at another Redis db (for multiple clusters). But for now it's easier to call it sys and leave it.
  • "web-front-end" is the role, you can have multiple nodes in a role and the role based dispatchers allow you to implement high-availability and load balancing strategies.
  • "web-front-end-1" is the name of this node, and should be unique in the cluster
  • "localhost" is the Redis connection (can be comma separated for multiple Redis nodes)
  • "0" is the Redis catalogue to use ("0" - "15")

Note, neither of those lines are needed if you're doing in-app messaging only.

Nuget

Nu-get package Description
LanguageExt.Core All of the core types and functional 'prelude'. This is the core framework that the Echo library is based upon.
Echo.Process 'Erlang like' actor system for in-app messaging and massive concurrency
Echo.Process.Redis Cluster support for the Echo.Process system for cluster aware processes using Redis for queue and state persistence
Echo.Process.Owin WebSockets gateway into the Echo.Process system. Uses Owin to register the WebSockets handler.
Echo.ProcessJS Javascript API to the Echo.Process system. Supports running of Processes in a client browser, with hooks for two-way UI binding

echo-process's People

Contributors

abbasfaisal avatar dependabot[bot] avatar louthy avatar meddbase-steve avatar productiverage avatar stanjav avatar stefanbertels avatar tanxeel90 avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

echo-process's Issues

ProcessConfig.initialise raises TypeLoadException

Hi Paul,

I'm just trying out new echo-process. I try to build a small local actor system.

If I call ProcessConfig.initialise(); in Main (console-app), I get a runtime exception:

System.TypeLoadException: 'Could not load type '___ParserExt' from assembly 'LanguageExt.Parsec, Version=2.0.0.0, Culture=neutral, PublicKeyToken=null'.'

  <package id="Echo.Process" version="2.0.2-alpha" targetFramework="net452" />
  <package id="LanguageExt.Core" version="2.0.26-beta" targetFramework="net452" />
  <package id="LanguageExt.Parsec" version="2.0.26-beta" targetFramework="net452" />

I tried using some older LanguageExt (alpha-15) because that is used in echo-process (source code) but this results in another exception (ArgumentNullException).

error handling question

Hi,

what's the recommendation for error handling?

I have inbox actions that might detect/run into an error situation (e.g. lost connection to some ressource).
What's the best pattern to handle this?

The connection is opened in Setup function so restart would help. Can/should I restart from inside? What happens to the current message? How should I replay this message? Or should I build some separate "watchman" (maybe related to deadletter)?

I probably would use a typical strategy like logging error, perhaps send mail notification, pausing messages (including current) and after waiting some time restart Process (and unpause message queue). "Waiting some time" => increasing interval

Get some notification during inboxFn when actor gets killed (CancellationToken)

This topic was already discussed in last comments in #23. I created this new issue because the solution there does not work.

Question is: how can I get some information during inboxFn and setupFn telling me that the actor is going to be killed?

Reason: If there are long running operations in setupFn (e.g. slow loading of state) or inboxFn (e.g. slow/big downloads) I want to cancel this if the actor get's killed for whatever reason.

The idea of #23 does not work:

var ctx = new CancellationTokenSource();
using (observeUnsafe<Unit>(Self).Subscribe(_ => { }, () => ctx.Cancel())
{
    // use ctx.Token here;
}

It does not work because kill is waiting for the inboxFn to finish (implemented as solution for #42, which is a good thing).

One quick hack would be to modify Actor.cs like

        public Unit Shutdown(bool maintainState)
        {
            publishSubject.OnCompleted(); // moved above lock

            lock(sync)
            { 

but this isn't clean, either, because inboxFn might publish something and this will get lost, then.

I think the best solution would be to add some CancellationToken to ActorContext so one could get it in the same way like one has access to Self or Parent, now.

What do you think? Ready for a pull request?

Would be no breaking change because if you don't use the CancellationToken nothing would change. And of course the inboxFn would decide what to do on cancellation (throw or execute successfully in some way).

Message type safety / union type

Hi Paul,

I thought about your cache example (readme) and I try to get more type safety. I value the proxy solution because it gives type safety and looks quite clean. But it is OO-style and I like the functional style more.

I tried to build some (poor man's) discriminated union. It works but is quite ugly.
The most ugly part is that when calling tell() I have to explicit convert to Union. Is there anything I can do here? At least the target type is fixed if any actor only receives one type of message...
Or is there any chance to have more than one type of message to the same actor?

Any thoughts on this?

 class Program
    {
        static void Main(string[] args)
        {
            ProcessConfig.initialise();

            UnionTest();

            Console.WriteLine("press key to shutdown...");
            Console.ReadKey();

            shutdownAll();
            Console.WriteLine("shutdownAll finished.");

            Console.WriteLine("end, press key.");
            Console.ReadKey();

        }

        // types just for making a difference (more than pure msg handler signature)
        public class Add { }
        public class Remove { }
        public class Show { }

       private static void UnionTest()
        {
            var pid = spawn("cache", () =>
            {
                Console.WriteLine("create cache");
                return HashSet<int>();
            }
            , multiActor<HashSet<int>, Tuple<Add, int>, Tuple<Remove, int>, Show>((state, msg) =>
            {
                Console.WriteLine($"Add {msg.Item2}");
                return state.Add(msg.Item2);
            }
            , (state, msg) =>
            {
                Console.WriteLine($"Remove {msg.Item2}");
                return state.Remove(msg.Item2);
            }, (state, msg) =>
            {
                Console.WriteLine("Content: " + string.Join(",", state));
               return state;
            }));
           

            Console.WriteLine($"process {pid} spawned");

            pid.Tell((Union<Tuple<Add, int>, Tuple<Remove, int>, Show>)new Show());
            pid.Tell((Union<Tuple<Add, int>, Tuple<Remove, int>, Show>)Tuple(new Add(), 5));
            pid.Tell((Union<Tuple<Add, int>, Tuple<Remove, int>, Show>)new Show());
            pid.Tell((Union<Tuple<Add, int>, Tuple<Remove, int>, Show>)Tuple(new Add(), 3));
            pid.Tell((Union<Tuple<Add, int>, Tuple<Remove, int>, Show>)new Show());
            pid.Tell((Union<Tuple<Add, int>, Tuple<Remove, int>, Show>)Tuple(new Remove(), 5));
            pid.Tell((Union<Tuple<Add, int>, Tuple<Remove, int>, Show>)new Show());
        }

        public class Union<A, B>
        {
            private readonly int _type;
            private A _a;
            private B _b;

            public Union(A a)
            {
                _type = 1;
                _a = a;
            }

            public Union(B b)
            {
                _type = 2;
                _b = b;
            }


            public T Match<T>(Func<A, T> fa, Func<B, T> fb)
            {
                switch (_type)
                {
                    case 1:
                        return fa(_a);
                    case 2:
                        return fb(_b);
                    default:
                        return raiseapp<T>("invalid type");
                }
            }

            public static implicit operator Union<A, B>(A a)
            {
                return new Union<A, B>(a);
            }
            public static implicit operator Union<A, B>(B b)
            {
                return new Union<A, B>(b);
            }
        }

        public class Union<A, B, C>
        {
            private readonly int _type;
            private A _a;
            private B _b;
            private C _c;

            public Union(A a)
            {
                _type = 1;
                _a = a;
            }

            public Union(B b)
            {
                _type = 2;
                _b = b;
            }

            public Union(C c)
            {
                _type = 3;
                _c = c;
            }

            public T Match<T>(Func<A, T> fa, Func<B, T> fb, Func<C, T> fc)
            {
                switch (_type)
                {
                    case 1:
                        return fa(_a);
                    case 2:
                        return fb(_b);
                    case 3:
                        return fc(_c);
                    default:
                        return raiseapp<T>("invalid type");
                }
            }

            public static implicit operator Union<A, B, C>(A a)
            {
                return new Union<A, B, C>(a);
            }
            public static implicit operator Union<A, B, C>(B b)
            {
                return new Union<A, B, C>(b);
            }
            public static implicit operator Union<A, B, C>(C c)
            {
                return new Union<A, B, C>(c);
            }
        }

        public static Func<S, Union<A, B>, S> multiActor<S, A, B>(Func<S, A, S> actOnA, Func<S, B, S> actOnB)
        {
            return (state, a_or_b) => a_or_b.Match(a => actOnA(state, a), b => actOnB(state, b));
        }
        public static Func<S, Union<A, B, C>, S> multiActor<S, A, B, C>(Func<S, A, S> actOnA, Func<S, B, S> actOnB, Func<S, C, S> actOnC)
        {
            return (state, a_or_b) => a_or_b.Match(a => actOnA(state, a), b => actOnB(state, b), c => actOnC(state, c));
        }

Message dropped when actor killed in inboxFn

If actor gets killed during inbox, the message gets lost (i.e. the message will not be passed to dead-letter):

spawn("test", () =>
{
    Console.WriteLine("setup playground");
    var pid = Self;
    return Observable.Interval(15 * seconds)
                     .Subscribe(i =>
                     {
                         Console.WriteLine("timer tell...");
                         tell(pid, unit);
                     }, () => Console.WriteLine("timer completed!"));
},
(IDisposable state, Unit _) =>
{
    Console.WriteLine("inbox is running");
    kill();
    return state;
});

I expected the message to arrive at dead-letter so I can at least log the fact that some inboxFn was partly executed.

a) Feature?
b) small bug?
c) don't use kill if you want consistence / clean end of an actor

Consistence for me means: a message is only dropped when it was processed successfully or it is passed elsewhere by a MessageDirective or the system crashes.

Cluster out of synch, reads null metadata when using dispatcher

Hi Paul, I've got an exception call stack here and what I think may be a cause.

The exception is occuring during a 'tell' to a dispatcher, and its ultimately coming from Json deserialiser because it's getting a null value:

at Newtonsoft.Json.JsonConvert.DeserializeObject(String value, Type type, JsonSerializerSettings settings) at 
Newtonsoft.Json.JsonConvert.DeserializeObject[T](String value, JsonSerializerSettings settings) at 
LanguageExt.RedisClusterImpl.<>c__DisplayClass25_0`1.<GetValue>b__0() at 
LanguageExt.RedisClusterImpl.Retry[T](Func`1 f) at 
LanguageExt.ActorDispatchRemote.ValidateMessageType(Object message, ProcessId sender) at 
LanguageExt.ActorDispatchRemote.TellNoIO(Object message, ProcessId sender, String inbox, Type type, TagSpec tag) at 
LanguageExt.ActorDispatchRemote.<>c__DisplayClass13_0.<Tell>b__0() at 
LanguageExt.ProcessOp.IO(Func`1 op) at 
LanguageExt.ActorDispatchRemote.Tell(Object message, ProcessId sender, TagSpec tag) at 
LanguageExt.ActorDispatchGroup.<>c__DisplayClass18_0.<Tell>b__0(IActorDispatch d) at LanguageExt.List.iter[T](IEnumerable`1 list, Action`1 action) at 
LanguageExt.ActorDispatchGroup.Tell(Object message, ProcessId sender, TagSpec tag) at 
LanguageExt.Process.tell[T](ProcessId pid, T message, ProcessId sender) 

In ValidateMessageType, it first checks if a process exists and then calls GetValue. It seems like it must be possible for a process to be in the cluster members list, but its metadata does not exist in redis.

Ideally (I think) if this situation occurs the cluster members list would get 'repaired' by removing the dead member.

Idea: Use Source Generators to rewrite LINQ expressions

This has been going around my head for a while, I'm not sure if it's possible, but want to get it written down just in case.

Problem: LINQ expressions come with lambda allocation and contextual type allocation costs, this means their performance isn't as good as writing imperative code.

Possible solution: Use Source Generators to find LINQ expressions that evaluate to known language-ext monadic types. Replace the LINQ expression with an 'unrolled' version that runs imperatively. It won't remove all lambda usage, but would significantly reduce it.

State disposed during inboxFn when asked

Issue similar to #42 but when actor is asked.

Can be reproduced with:

using System.Reactive.Disposables;

var actor2 = spawn<BooleanDisposable, long>("actor2", () => new BooleanDisposable(), (state, msg) =>
                {
                    using (observeUnsafe<Unit>(Self).Subscribe(_ => { }, () => { Console.WriteLine("subject completed in actor2"); }))
                    {
                        Console.WriteLine($"actor2: Processing msg: {msg}, state.IsDisposed={state.IsDisposed}");
                        Task.Delay(5 * seconds).Wait();
                        reply("Answer from actor2");
                        Console.WriteLine($"actor2: Answered and finished processing msg: {msg}, state.IsDisposed={state.IsDisposed}");
                    }


                    return state;
                });

                //ASK
                Observable.Timer(1 * seconds).Subscribe(_ =>
                {
                    Console.WriteLine($"Telling actor2 msg: {_}");
                    var answer = ask<string>(actor2, _);
                    Console.WriteLine($"Answer from actor2: {answer}");
                });

                Observable.Timer(2 * seconds).Subscribe(_ =>
                {
                    Console.WriteLine("Killing actor2...");
                    kill(actor2);
                    Console.WriteLine("Killed actor2...");
                });

State diposed during inboxFn

(I added this as comment in #39, but I think it wasn't fixed (at least not by my PR) so here as separate issue.)

I think there is a problem with lifetime of state if state object is IDisposable.

If the actor gets stopped, the state will be disposed -- regardless whether inboxFn is still running.
At least this will result in an error when accessing the state after Dispose has been called on it.

To solve this issue I think about two workarounds:

a) lock some internal object in dispose and during processing of inboxFn so shutting down an actor will wait for the inboxFn to finish (force clean shutdown, even if this will block for some time).

b) if inboxFn throws errors check whether state was disposed (track this) and handle situation in a special way (e.g. always ForwardToSelf with ProcessDirective.None). Might catch errors that aren't caused by Disposing.

Both solutions have pro and cons. @louthy: I'm interested in your point of view / how the library should in general handle this. The current implementation just sees an error and will cause normal Strategy handling. This is suboptimal because if an error occurs I might restart Parent actor which will restart siblings of Self (other children of Parent) and they might run into state-disposed-issue which will itself result in error handling / Strategy processing.

I'm not sure but currently I think the library should go way a).

Test code:

 [Collection("Actor")] // https://xunit.github.io/docs/running-tests-in-parallel
    public static class SafeDisposableStateTests
    {
        [Fact]
        static void Test()
        {
            ProcessConfig.initialise();
            var pid = spawn("test", () =>
                {
                    Console.WriteLine($"{DateTime.Now} created state!");
                    return Disposable.Create(() => Console.WriteLine($"{DateTime.Now} diposed state!"));
                },
                (IDisposable _, string msg) =>
                {
                    Console.WriteLine($"{DateTime.Now} {msg} inboxFn 1");
                    Thread.Sleep(2000);
                    Console.WriteLine($"{DateTime.Now} {msg} inboxFn 2");
                    return _;
                });

            tell(pid, "bla", 1 * second);
            restart(pid);

            Thread.Sleep(2000);
            restart(pid);
            Thread.Sleep(2000);
            Process.shutdownAll();
        }
    }

How to use it within an ASP.NET app?

I'm thinking about using echo process as a replacement to Mediatr. This may not even be a good idea. However, I'm not sure how it would work within an action method. I know I could use ask but I'm not sure how to scale it. I couldn't find any samples around this.

Thanks in advance.

Still beta?

The version string for this project says that it is beta.
One issue this causes is with NuGet. To find this project, you have to check the "Include prerelease" box.
Is this project still beta?
If so, what are the outstanding questions?
If not, should I create a PR to update the version string?

How to stop an actor (restartable)

I'm looking for a way to stop an actor in a way I can start it again later using something like startup.

If I call shutdown the actor get's destroyed (e.g. actor subjects) and I would have to spawn a new one.
I can restart an actor but that would not delay.
I can pause an actor (and use restartlater) but that probably will only pause actor's inbox. But I want to kill all children and dispose actor's state.

Is there some other option? Ideally somehow resetting actor to the same state it would be in when freshly spawned with Lazy:true.

System.Exception: 'observe' should not be be used from within a process' message loop.

When creating some actor hierarchy I get this error message:

System.Exception: 'observe' should not be be used from within a process' message loop.

Reason: I call observe when initalising state (SetupFn).

Why is observe a problem there (it's not in inboxFn)?
Can we allow this (change this)?

The reason I want to do this is that I start some actors and wire them together using tell/ask/publish/observe. All these actors are childs of some "container" actor.

I just wanted to make those actors child of the container because I setup an error Strategy that should apply to all children. Spawning (and wiring) the actors during Setup does this. As an extra I can restart the whole subset by just restarting the container actor.

Wiki Error Handling Docs Issues

Code changes have been made that make Error-handling out f date.

Process.observe(Process.DeadLetters).Subscribe(Console.WriteLine);

Should be something like this:

Process.observe<object>(Process.DeadLetters()).Subscribe(Console.WriteLine);

void vs. Unit return type

This is maybe more related to LanguageExt than Echo.Process but I wonder why you have many Actions (return type void) in your echo code. I'm asking because I'm not sure whether I should replace every void by Unit in my code.

I know act/fun will always convert one to each other, so I guess this is more a style question.

So what's the reason for e.g. Process.logWarn returning void and Process.publish returning Unit? Do you have a rule of thumb here in echo-process?

Block instead of error when mailbox full?

Is there a way to block on tell when posting a message to a full mailbox instead of throwing an error?

I want to use an actor for slow background processes. TPL has an ActionBlock where you can set BoundedCapacity to reduce memory usage due to slow processing.

Feature request: Inbox pre/post hook functions

It would be very useful for profiling/diagnostics if I could set a pair of callbacks that are invoked before/after every setup and inbox call. Something like:

Echo.Process.PreInboxHook = (ProcessId pid, Option<object> msg) => Stopwatch.StartNew();

Echo.Process.PostInboxHook = (object state, ProcessId pid, Option<object> msg) => { ((StopWatch)state).Stop(); // now do some logging }

Question: LifeTime / order of spawning

Hi,

is there some documentation / information regarding lifetime / activity of actors?

Can I wire up actors in a way that makes sure I don't lose a message even if the receiving actor is spawned after producing actor?

Example:

  1. spawn some "producing" actor
  2. spawn some "trigger" actor (think of timer / FileSystemwatcher)
  3. spawn some "receiving" actor

After spawning trigger actor this trigger actor might immediately start by telling the producer which itself starts publishing valuable messages. If this happends before 3 I might lose the latter.

Flow: trigger ==(tell)==> producer ==(publish)==> receiver(s)

I don't want to spawn trigger actor after receiving actor (swap 2 and 3) because producer and trigger will be spawned by the same function. Trigger should be just some child / helper actor for producer actor.

I could spawn "receiving" actor first (3 before 1+2). But I could not manage to wire producer and receiver this way. subscribe and observe require a valid ProcessId.

I want to avoid some (quick&dirty) timing solution like starting trigger message with a delay.

Internal Exception Response on ask

Paul,

I really enjoy both Language-Ext and Echo-Process. It's changed the way I code and how I think about solving problems. I do have a possible issue with some exception handling.

When asking an inbox that doesn't exist in the process system, internally the ProcessException is thrown saying as much but that exception is never returned to the ask. The ask only fails after a timeout has occurred.

In the below case RaisesExceptionsOnAskWithInboxException, when receiving an exception from an inbox, the exception is sent back right away and is available via the inner exception on the ProcessException.

In the case DoesntRaiseExceptionOnAskWithNoInbox , the only exception returned is a system timeout exception. I would expect to fail fast in this scenario. As soon as it's know there is no registered inbox of that name, and exception should be returned.

While the end result is the same (being that an exception is raised), we do have to wait that additional time for the exception to bubble up. In our application, we are forced to set the timeout a bit higher due to legacy API calls and their high failure frequency. We would also like to treat a timeout exception differently than an internal exception when tackling our management strategies.

I am using Xunit as my test runner.

    public class AskFixture : IDisposable
    {
        public AskFixture()
        {
            initialiseFileSystem();
            var command = Router.leastBusy
                (
                    Name: "cmd",
                    Count: 5,
                    Inbox: (string msg) =>
                    {
                        throw new Exception("Hello");
                    }

                );
            register(command.Name, command);
            subscribe<Exception>(Errors(), e => raise<Unit>(e));
        }
        public void Dispose() => shutdownAll();
    }


    public class AskError : IClassFixture<AskFixture>, IDisposable
    {
        public readonly ITestOutputHelper Output;
        public readonly IReadOnlyCollection<IDisposable> Trash;

        public AskError()
        {
            Trash = List(
                subscribe<Exception>(Errors(), e => 
                  Output.WriteLine($"{e.GetType().ToString()} {e.Message}"))
            );
        }

        public void Dispose()
        {
            Trash.Iter(t => t.Dispose());
        }

        [Fact]
        public void DoesntRaiseExceptionOnAskWithNoInbox()
        {
            Assert.Throws<Echo.ProcessException>(() => ask<string>("@test", "You there?"));
        }

        [Fact]
        public void RaisesExceptionsOnAskWithInboxException()
        {
            Assert.Throws<Echo.ProcessException>(() => ask<string>("@cmd", "You there?"));
        }
    }

Proxy variant with IDisposable?

Hi Paul,

I'm just checking some options for better type safety (see next issue...) and I'm trying the proxy variant because the non-OO style is not that type-safe for multi-message actors.

The following code works without Dispose/IDisposable, but raises a runtime exception in spawnotherwise:

System.TypeLoadException: 'Method 'Dispose' in type 'ProcessTest.IMyStateProxy' from assembly 'ProcessProxies, Version=0.0.0.0, Culture=neutral, PublicKeyToken=null' does not have an implementation.'

public interface IMyState : IDisposable
    {
        void Add(int num);
        void Remove(int num);
        void Show();

    }
    class Program
    {
        static void Main(string[] args)
        {
            ProcessConfig.initialise();

            ProxyTest();

            Console.ReadKey();

            shutdownAll();

            Console.WriteLine("end, press key.");
            Console.ReadKey();

        }


        public class MyState: IMyState
        {
            HashSet<int> _state = HashSet<int>();

            public MyState()
            {
                Console.WriteLine("Setup called");
            }

            public void Add(int num)
            {
                Console.WriteLine($"add {num}...");
                _state = _state.Add(num);
            }

            public void Dispose()
            {
                Console.WriteLine($"Dispose");
            }

            public void Remove(int num)
            {
                Console.WriteLine($"Remove {num}...");
                _state = _state.Remove(num);
            }

            public void Show()
            {
                Console.WriteLine("content: " + string.Join(",", _state));
            }

         
        }

        private static void ProxyTest()
        {
            IMyState state = spawn<IMyState>("mystate", () => new MyState());

            state.Add(5);
            state.Add(3);
            state.Show();
            state.Remove(5);
            state.Show();
        }

Question: Where is FSharp wrapper?

In the docs there is a reference to an Echo.Process.FSharp but I can't find any other references or packages.

Doing something like this at the moment but not sure what types of trouble I am getting myself into.

let init = Echo.ProcessConfig.initialise()

let inline spawn<'a> (name:string) (f:'a -> unit) (terminated:(ProcessId -> unit) option) = 
    let flags = ProcessFlags.Default
    let strategy = Strategy.OneForOne(Strategy.Always(Directive.Restart))
    let mailboxSize = -1
    let t = Action<ProcessId>(defaultArg terminated (fun _ -> ()))
    let action = Action<'a> f
    Process.spawn<'a>(ProcessName(name), action, flags, strategy, mailboxSize, t)

let publish msg = Process.publish msg |> FSharp.fs

let tell<'a> (pid:ProcessId) (msg:'a) (sender: ProcessId option) =
    let s = defaultArg sender ProcessId.None
    Process.tell<'a>(pid, msg, s)

let tellAt<'a> (pid:ProcessId) (msg:'a) (dt:DateTime) (sender: ProcessId option) =
    let s = defaultArg sender ProcessId.None
    Process.tell<'a>(pid, msg, dt, s)

let tellLater<'a> (pid:ProcessId) (msg:'a) (t:TimeSpan) (sender: ProcessId option) =
    let s = defaultArg sender ProcessId.None
    Process.tell<'a>(pid, msg, t, s)

Zombie state: Race condition on shutdown, actor state will resurrect

There is probably a bug in synchronisation of system messages that dipose state (e.g. kill) and user messages.

Run this code to see the problem:

var test = spawn("test", () =>
    {
            Console.WriteLine("setup playground");
            var pid = Self;
            return Observable.Interval(4 * seconds)
                             .Subscribe(i =>
                             {
                                 Console.WriteLine("timer tell..." + i);
                                 try
                                 {
                                     tell(pid, i);
                                 }
                                 catch
                                 {

                                 }
                             }, () => Console.WriteLine("timer completed!"));
        
    },
    (IDisposable state, long nr) =>
    {
        Console.WriteLine("inbox is running for " + nr);
        Task.Delay(5 * seconds).Wait();
        Console.WriteLine("inbox is ready for " + nr);
        return state;
    });

Task.Delay(15* seconds).Wait();
Console.WriteLine("kill...");
kill(test);
Console.WriteLine("killed");
Task.Delay(60* seconds).Wait();

I think this is what happens:

When kill is called the actor processes some user inbox message m1 and another user inbox message m2 is ready for processing (maybe waiting at lock(sync) in Actor.cs ProcessMessage).

The kill message will lock(sync) and enter the locked section before m2 can. It will shutdown the actor, including diposing state.
After that m2 is still processed and in Actors.cs in function ProcessMessage GetState() gets called that will re-run setupFn because state is None.

  1. SetupFn must not run twice.
  2. If possible m2 should not be processed at all (might better stay in inbox queue).

I think point 2 is hard because ProcessMessage might run already. But maybe this isn't relevant because local inbox gets dropped anyway on kill. But if we have some stop method that diposes state while inbox stays alive this might be bad (compare #44).

Point 1 maybe could be solved by checking whether kill was called (and then somehow cancel ProcessMessage). Could use the same CancellationToken I requested in #47...

Probably buggy strategy ForwardToSelf / Restart

After doing some more tests I think there is something buggy with retry strategy:

 static void Main(string[] args)
        {
            ProcessConfig.initialise();

            spawn<Unit, Unit>("main", Setup: () =>
                {
                    spawn<string>("buggy", Inbox: _ =>
                    {
                        Console.WriteLine(DateTime.UtcNow + ": " + _);
                        throw new Exception(_);
                    });
                    return unit;
                },
                Inbox: (_, __) => _,
                
                Strategy: OneForOne(
                    Retries(Count: 5),
                    Backoff(Min: 2 * seconds, Max: 100 * seconds, Step: 0 * minute),
                    Always(Directive.Restart), // alternative: Resume
                    Redirect(MessageDirective.ForwardToSelf)) // alternative: StayInQueue
            );

            Console.WriteLine("actor runs");
            Task.Delay(2 * seconds).ConfigureAwait(false).GetAwaiter().GetResult();
            tell("/root/user/main/buggy", "hello");

            Console.WriteLine("ready for shutdown.");
            Console.ReadKey();
            Process.shutdownAll();
        }

Current guess: there might be an ordering problem in Actor.cs in RunStrategy at:

decision.Affects.Iter(p => pause(p));
                            safedelay(
                                () => RunProcessDirective(pid, sender, ex, message, decision),
                                decision.Pause
                            );
                            return InboxDirective.Pause | RunMessageDirective(pid, sender, decision, ex, message);

At least this should pass the faulty message to the inbox before restarting the actor (due to Pause > 0) -- which might not be desired: first restart, i.e. repair, actor and after_that re-process msg.

Anyway the code above does not what I expected regardless this order. I got two outputs directly one ofter the other and no more retries laster:

actor runs
ready for shutdown.
13.08.2018 09:38:15: hello
13.08.2018 09:38:15: hello

Issues from recent refactor

The last stable version is 2.1.7-beta - I am working through some bugs brought about by the recent refactor. I'll close this once I have resolved them

setup method called twice

2.0.6-alpha seems to call setup method twice:

        private static void SimpleTest()
        {
            var pid = spawn<Option<DateTime>, string>("watcher", () =>
            {
                Console.WriteLine("Setup process");
                return None;
            }
            , (state, msg) =>
            {
                Console.WriteLine("Received " + msg);
                return DateTime.UtcNow;
            });

            Console.WriteLine($"process {pid} spawned");

            pid.Tell("hello world");
        }

        static void Main(string[] args)
        {
            ProcessConfig.initialise();

            SimpleTest();

            Console.ReadKey();

            shutdownAll();

            Console.WriteLine("end, press key.");
            Console.ReadKey();

        }

Master branch renamed to Main

I have renamed the master branch to main. Whilst there may be some debate about whether master in git has any connotation or relation to master/slave, I'd prefer this project stood on the side of history that doesn't think using terms that are potentially loaded is ok.

If someone has a local clone, you can update your locals like this:

$ git checkout master
$ git branch -m master main
$ git fetch
$ git branch --unset-upstream
$ git branch -u origin/main
$ git symbolic-ref refs/remotes/origin/HEAD refs/remotes/origin/main

For open PRs I will see how easy it in to merge everything when I get to them, but I may just ask you to re-push to main.

Default value on cluster not preserved in copy

When using more than one cluster, it is noticeable that the Default value is not respected from the cluster.conf file.

This looks to be because the copy constructor doesn't copy it.

null reference error when disposing system

There is a bug in shutdown. I could not build a small example reproducing this yet, but I could find the bug:

In https://github.com/louthy/echo-process/blob/master/Echo.Process/ActorSys/ActorSystem.cs#L173 the rootItem is set to null but it is used later via https://github.com/louthy/echo-process/blob/master/Echo.Process/ActorSys/ActorSystem.cs#L196 which calls https://github.com/louthy/echo-process/blob/master/Echo.Process/ActorSys/ActorSystem.cs#L789 and there rootItem will be passed to handle children shutdown.

I had a situation where this resulted somehow in a dead lock (but could not find out why) when I called shutdownAll during actor setup.

Current performance state compared to Akka.NET and Orleans

Hi @louthy,

This project looks really great! Kudos for this great idea for actor model with an FP API! I might be faced with a development story where I could leverage the actor model to see the performance gains in my system. As of now, I see Akka.NET and Microsoft's Orleans are the two main things for concurrency on the MSFT stack but I'd like to know, what's the current state of echo-process?

I've already included LanguageExt.Core in our toolchain and this could be a smooth add-on to the system if the performance does the job from the echo side of things!

Lifetime of Inbox queue

I have a local actor with slow inbox function.
My main / outer method (which spawns the actor) tells it a larger number of message objects.

Currently when shutting down this actor or the whole process system unprocessed messages get lost / do not get processed.

Is there a way to wait for this or all actors to process any waiting messages? I mean I'm in a similar state when calling OnCompleted on some Subject (Rx) and my actor is something like the subscription functions, so there should be some option to shutdown after processing the remaining messages (shutdownAfterInboxEmpty). I guess this might be more tricky to implement for shutdownAll -- for me shutting down some specific actors would work fine.

Missing dependency on System.Reactive.Linq?

Hi, I've created a console project in VS 2017 and added the echo.process 2.0.27-beta nuget package. The code below throws:

System.IO.FileNotFoundException: 'Could not load file or assembly 'System.Reactive.Linq, Version=3.0.1000.0, Culture=neutral, PublicKeyToken=94bc3704cddfc263' 

because it cannot load the assembly System.Reactive.Linq

using Echo;
using System.Data;
using static Echo.Process;

namespace ep_datatable_bug
{
    class Program
    {
        static void Main(string[] args)
        {
            ProcessConfig.initialise();
        }
    }
}

RedisClusterImpl.QueryProcessMetaData duplicate key error

I was seeing the following exception repeatedly when attempting to query process metadata in redis (stack trace at bottom):

An element with the same key already exists in the Map

I can't immediately see how this is possible, but it looks like under some circumstances:

  • QueryKeys can return duplicates

or

  • QueryKeys can return keys that are different, but equivalent in the ProcessId comparison

Additional info:

This started happening after the server was rebooted.
A recycle of the app pool fixed it.

LanguageExt.MapModule.Add[OrdK,K,V](MapItem`2 node, K key, V value) at 
LanguageExt.MapModule.Add[OrdK,K,V](MapItem`2 node, K key, V value) at 
LanguageExt.MapModule.Add[OrdK,K,V](MapItem`2 node, K key, V value) at 
LanguageExt.MapInternal`3.AddRange(IEnumerable`1 range) at 
Echo.RedisClusterImpl.QueryProcessMetaData(String keyQuery) at 
LanguageExt.ClassInstances.FOption`2.<>c__DisplayClass2_0.<Map>b__0(A a) at 
LanguageExt.ClassInstances.MOption`1.Bind[MonadB,MB,B](Option`1 ma, Func`2 f) at 
LanguageExt.ClassInstances.FOption`2.Map(Option`1 ma, Func`2 f) at 
LanguageExt.Option`1.Map[B](Func`2 f) at
Echo.Process.queryProcessMetaData(String keyQuery, SystemName system) 

Interfaces should not implement IDisposable

On this line of code, IActor implements IDisposable. Mark Seemann, an expert in dependency injection and author of Dependency Injection in .NET, says that an interface should not implement IDisposable. Here is the relevant text.

Concrete types can implement IDisposable, but interfaces should not, since IDisposable is an implementation detail. (I do realize that some interfaces defined by the .NET Base Class Library derives from IDisposable, but I consider those Leaky Abstractions. As Nicholas Blumhardt put it: "an interface [...] generally shouldn't be disposable. There's no way for the one defining an interface to foresee all possible implementations of it - you can always come up with a disposable implementation of practically any interface.")

And indeed you have the class NullProcess that implements IActor but doesn't need to do any disposing.

Dead lock on shutdown

I found a dead lock bug but currently have no clear idea how to fix this properly.

Here is a reproducing test:

public void SystemShutdownDuringActorSetup()
            {
                Echo.ProcessConfig.initialise();
                {
                    var actor = spawn("test", () =>
                    {
                        Thread.Sleep(1 * seconds);
                        throw new Exception();
                        return Disposable.Empty;
                    });
                    Thread.Sleep(0.5 * seconds);
                    shutdownAll();
                }
                // will not arrive here
            }

The problem is this line:

https://github.com/louthy/echo-process/blob/master/Echo.Process/ActorSys/Actor.cs#L1016

Parent.Actor.UnlinkChild will call lock(sync) of the parent of the test actor. Test actor itself is already locked. This means: child is locked before parent. Situation is a result of the exception thrown in setup function, which calls RunStrategy which might call shutdown (Directive.Stop).

The parallel shutdownAll will lock actors in this order: First the parent is locked and then the children will be locked (hierarchical shutdown).

Wiki Process-System Docs Issues

There are a couple issues in the docs on this page.

Function Doesn't Exist

In the wiki Process-system page Discoverability section section:

The docs seem to refer to code that doesn't exist anymore:

register(myProcessId, "hello-world");

It looks like there are two overloads as of 2020-12-04 for register:

ProcessId Process.register(ProcessName name, SystemName system = default(SystemName))
ProcessId Process.register(ProcessName name, ProcessId process)

So I think the change would be:

register("hello-world", myProcessId);

Fix Link

In the wiki Process-system page Discoverability section section there's a broken link.

Along with routers, dispatchers and roles the ability to find, route and dispatch to other nodes in the cluster is trivial. For a full discussion on routing, roles and dispatchers see here

The "see here" link is pointing to the language-ext wiki, not the echo-process wiki

Name Change

In the wiki Process-system page Discoverability section section:

Both of these text snippets are referencing 'my-stuff', but that doesn't exist in the context.

Then instead of having root as the top level Process in your hierarchy, you have my-stuff:
Your process now has two addresses, the /my-stuff/user/hello-world address and the /disp/reg/hello-world address that anyone can find by calling find("hello-world"). 

Could spawn return a "type-safe" process item (ProcessId)?

Is there a reason for having ProcessId typeless?

I guess this might have something todo with generic handling of processes but isn't something like this more typesafe:

public static Unit tell<T>(ProcessId<T> pid, T message, ProcessId sender = default(ProcessId))

public static ProcessId<T> spawn<T>(ProcessName Name, Action<T> Inbox, ...)

Together with making ProcessId implement some (typeless) IProcessId or just allow casting ProcessId to ProcessId this might be compatible with the existing typeless ProcessId system.

(Of course ProcessId could be named ProcessIdTypesafe or anything else.)

So we could have typesafe wiring of actors:

var actorHandler = spawn<string>("my actor", Console.WriteLine);
tell(actorHandler , "hello world"); // compiles
tell(actorHandler, 123); // compile time error

ProcessLogItem (initialise/shutdownAll timing?)

I got

ProcessLogItem: 13:43.50.690 Error
You must call one of the  ProcessConfig.initialiseXXX functions

   at Echo.ActorContext.get_DefaultSystem()
   at Echo.ActorContext.System(SystemName system)
   at Echo.ActorContext.System(ProcessId pid)
   at Echo.LocalScheduler.ProcessActions()

with this code:

 static void Main(string[] args)
        {
            ProcessSystemLog.Subscribe(processLogItem => Console.WriteLine($"ProcessLogItem: {processLogItem}"));
            ProcessConfig.initialise();
            // var dummy = spawn<Unit,string>("bla", () => unit, (state, inbox) => state);
            Process.shutdownAll();
            Console.ReadKey();
        }

Probably minor issue. Found this because of a somewhat related issue (#56).

I thought subscribing to ProcessSystemLog is a good idea to catch any hidden bugs (like exceptions in actors' dispose functions). Should ProessSystemLog be a reliable source (i.e. error items are indicators of real errors)?

Latest echo + latest lang-ext: LanguageExt.Parsec.Prim.choice not found

Latest echo processes does not work with latest lang-ext 3.0.19

[MissingMethodException: Method not found: 'LanguageExt.Parsec.Parser`1<!!0> LanguageExt.Parsec.Prim.choice(System.Collections.Generic.IEnumerable`1<LanguageExt.Parsec.Parser`1<!!0>>)'.]
   Echo.Config.ProcessSystemConfigParser..ctor(String nodeName, Types typeDefs, IEnumerable`1 strategyFuncs) +0
   Echo.ProcessConfig.initialise(String configText, Option`1 nodeName, Action setup, IEnumerable`1 strategyFuncs) +332
   Echo.ProcessConfig.initialiseFileSystem(String nodeName, Action setup, IEnumerable`1 strategyFuncs, String appPath) +352
   Echo.ProcessConfig.initialiseWeb(Action setup, IEnumerable`1 strategyFuncs) +269
   MedDBase.ReferralSystemService.<>c.<.cctor>b__15_0() in D:\Git\referral-service\MedDBase.ReferralSystemService\Global.asax.cs:23
   System.Lazy`1.CreateValue() +520
   System.Lazy`1.LazyInitValue() +315
   MedDBase.ReferralSystemService.Global.Application_BeginRequest(Object sender, EventArgs e) in D:\Git\referral-service\MedDBase.ReferralSystemService\Global.asax.cs:57
   System.Web.SyncEventExecutionStep.System.Web.HttpApplication.IExecutionStep.Execute() +223
   System.Web.HttpApplication.ExecuteStepImpl(IExecutionStep step) +213
   System.Web.HttpApplication.ExecuteStep(IExecutionStep step, Boolean& completedSynchronously) +91

Do you know what is going on or do you want me to investigate?

Exceptions propogated by 'ask' include stack trace in message

If an inbox throws an exception then it seems like the ask ing process gets an exception where the original stack trace is appended to the message. In some scenarios it might be 'helpful' (relatively) to surface the message without exposing the stack trace but currently this would be hard.

Could the stack trace perhaps be included as exception data instead?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.