durabletask's Introduction

Durable Task Framework

The Durable Task Framework (DTFx) is a library that allows users to write long running persistent workflows (referred to as orchestrations) in C# using simple async/await coding constructs. It is used heavily within various teams at Microsoft to reliably orchestrate long running provisioning, monitoring, and management operations. The orchestrations scale out linearly by simply adding more worker machines. This framework is also used to power the serverless Durable Functions extension of Azure Functions.

By open sourcing this project we hope to give the community a very cost-effective alternative to heavy duty workflow systems. We also hope to build an ecosystem of providers and activities around this simple yet incredibly powerful framework.

This project has adopted the Microsoft Open Source Code of Conduct.

For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.

Supported persistence stores

Starting in v2.x, the Durable Task Framework supports an extensible set of backend persistence stores. Each store can be enabled using a different NuGet package. The latest versions of all packages are signed and available for download at nuget.org.

| Package | Latest Version | Details | Development Status |
|---|---|---|---|
| DurableTask.ServiceBus | NuGet | Orchestration message and runtime state is stored in Service Bus queues while tracking state is stored in Azure Storage. The strength of this provider is its maturity and transactional consistency. However, it is no longer in active development at Microsoft. | Production ready but not actively maintained |
| DurableTask.AzureStorage | NuGet | All orchestration state is stored in Azure Storage queues, tables, and blobs. The strength of this provider is the minimal service dependencies, high efficiency, and rich feature-set. This is the only backend available for Durable Functions. | Production ready and actively maintained |
| DurableTask.AzureServiceFabric | NuGet | All orchestration state is stored in Azure Service Fabric Reliable Collections. This is an ideal choice if you are hosting your application in Azure Service Fabric and don't want to take on external dependencies for storing state. | Production ready and actively maintained |
| DurableTask.Netherite | NuGet | An ultra-high performance backend developed by Microsoft Research where state is stored in Azure Event Hubs and Azure Page Blobs using FASTER database technology from Microsoft Research. 👉 GitHub Repo | Production ready and actively maintained |
| DurableTask.SqlServer | NuGet | All orchestration state is stored in a Microsoft SQL Server or Azure SQL database with indexed tables and stored procedures for direct interaction. 👉 GitHub Repo | Production ready and actively maintained |
| DurableTask.Emulator | NuGet | This is an in-memory store intended for testing purposes only. It is not designed or recommended for any production workloads. | Not actively maintained |

The core programming model for the Durable Task Framework is contained in the DurableTask.Core package, which is also under active development.

Learning more

There are several places where you can learn more about this framework. Note that some are external and not owned by Microsoft:

Development Notes

To run unit tests, you must specify your Service Bus connection string for the tests to use. You can do this via the ServiceBusConnectionString app.config value in the test project, or by defining a DurableTaskTestServiceBusConnectionString environment variable. The benefit of the environment variable is that no temporary source changes are required.

Unit tests also require Azure Storage Emulator, so make sure it's installed and running.

Note: While it is possible to run the tests against a real Azure Storage account, this is not recommended because many tests will fail with a 409 Conflict error. The tests delete and quickly recreate the same storage tables, and Azure Storage does not handle this pattern well. If you really want to change the Azure Storage connection string, you can do so via the StorageConnectionString app.config value in the test project, or by defining a DurableTaskTestStorageConnectionString environment variable.

durabletask's People

Contributors

adarsh1, affandar, alextolp, amdeel, bachuv, cgillum, chenglongliu, connormcmahon, davidmrdavid, elokaac, gfrancomsft, gled4er, glennamanns, jviau, kashimiz, neetart, nytian, pradeepkadubandi, ramjotsingh, saguiitay, samarabbas, scooletz, sebastianburckhardt, shankarsama, shwetabhartimsft, simonporter, tkamesh, tsuyoshiushio, wsugarman, zheg

durabletask's Issues

Orchestration execution goes into infinite loop when decisions.Count == 0

(on vnext branch)

For whatever reason, in TaskOrchestrationDispatcher::OnProcessWorkItemAsync, if the runtime state indicates that it's still processing, but executes no events (aka, decisions.Count == 0), the orchestration never completes (successfully or not). Any new events that come in for that orchestration instance will get lost and never executed as isCompleted is never set to true.

Summary of what may have caused this:

  • We have a long-running orchestration that does ContinueAsNew after a single event is processed
  • An event was being processed by the orchestration, and a new event may have come in just as the orchestration was executing ContinueAsNew command
  • Logging indicates that we did receive the ContinueAsNew with the new state being passed, however the orchestration execution did not complete and clear the runtime state.

To prevent this issue going forward, a completed state needs to be determined when decisions.Count == 0. Whether that state is terminal is up to the team implementing the fix.

Doesn't work with Service Bus Premium Tier

We recently migrated to Service Bus Premium on Azure and for some reason Durable Task doesn't work with it. It works fine if I switch it back to basic subscription though.
I've checked out the source code and debugged into the framework itself.
Looks like we're getting TimeoutException while fetching work item. Exception details:

System.TimeoutException occurred
  HResult=0x80131505
  Message=A timeout has occurred during the operation. 3ce0641d-9948-41a5-9d33-462840602d83_G2
  Source=Microsoft.ServiceBus
  StackTrace:
   at Microsoft.ServiceBus.Messaging.Sbmp.SbmpQueueClient.OnEndAcceptMessageSession(IAsyncResult result)
   at Microsoft.ServiceBus.Messaging.QueueClient.RetryAcceptMessageSessionAsyncResult.<>c.<GetAsyncSteps>b__12_1(RetryAcceptMessageSessionAsyncResult thisPtr, IAsyncResult r)
   at Microsoft.ServiceBus.Messaging.IteratorAsyncResult`1.StepCallback(IAsyncResult result)
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at Microsoft.ServiceBus.Common.AsyncResult.End[TAsyncResult](IAsyncResult result)
   at Microsoft.ServiceBus.Messaging.QueueClient.RetryAcceptMessageSessionAsyncResult.End(IAsyncResult r)
   at Microsoft.ServiceBus.Messaging.QueueClient.EndAcceptMessageSession(IAsyncResult result)
   at System.Threading.Tasks.TaskFactory`1.FromAsyncCoreLogic(IAsyncResult iar, Func`2 endFunction, Action`1 endAction, Task`1 promise, Boolean requiresSynchronization)
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult()
   at DurableTask.TrackingDispatcher.<OnFetchWorkItem>d__15.MoveNext() in C:\src\durabletask\Framework\TrackingDispatcher.cs:line 79

Inner Exception 1:
FaultException`1: A timeout has occurred during the operation. 3ce0641d-9948-41a5-9d33-462840602d83_G2

Also we're getting two other types of exceptions:

System.TimeoutException occurred
  HResult=0x80131505
  Message=The request operation did not complete within the allotted timeout of 00:01:09.9994990. The time allotted to this operation may have been a portion of a longer timeout. For more information on exception types and proper exception handling, please refer to http://go.microsoft.com/fwlink/?LinkId=761101 TrackingId:c8bf9166-a324-455e-9b71-b9bfbeaa3d14, Timestamp:3/22/2017 1:16:30 PM
  Source=mscorlib
  StackTrace:
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at Microsoft.ServiceBus.Common.AsyncResult.End[TAsyncResult](IAsyncResult result)
   at Microsoft.ServiceBus.Messaging.Sbmp.DuplexRequestBindingElement.DuplexRequestSessionChannel.EndRequest(IAsyncResult result)
   at Microsoft.ServiceBus.Messaging.Channels.ReconnectBindingElement.ReconnectChannelFactory`1.RequestSessionChannel.RequestAsyncResult.<>c.<GetAsyncSteps>b__9_3(RequestAsyncResult thisPtr, IAsyncResult r)
   at Microsoft.ServiceBus.Messaging.IteratorAsyncResult`1.StepCallback(IAsyncResult result)
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at Microsoft.ServiceBus.Common.AsyncResult.End[TAsyncResult](IAsyncResult result)
   at Microsoft.ServiceBus.Messaging.Channels.ReconnectBindingElement.ReconnectChannelFactory`1.RequestSessionChannel.EndRequest(IAsyncResult result)
   at Microsoft.ServiceBus.Messaging.Sbmp.RedirectBindingElement.RedirectContainerChannelFactory`1.RedirectContainerSessionChannel.RequestAsyncResult.<>c__DisplayClass8_1.<GetAsyncSteps>b__4(RequestAsyncResult thisPtr, IAsyncResult r)
   at Microsoft.ServiceBus.Messaging.IteratorAsyncResult`1.StepCallback(IAsyncResult result)
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at Microsoft.ServiceBus.Common.AsyncResult.End[TAsyncResult](IAsyncResult result)
   at Microsoft.ServiceBus.Messaging.Sbmp.RedirectBindingElement.RedirectContainerChannelFactory`1.RedirectContainerSessionChannel.EndRequest(IAsyncResult result)
   at Microsoft.ServiceBus.Messaging.Channels.ReconnectBindingElement.ReconnectChannelFactory`1.RequestSessionChannel.RequestAsyncResult.<>c.<GetAsyncSteps>b__9_3(RequestAsyncResult thisPtr, IAsyncResult r)
   at Microsoft.ServiceBus.Messaging.IteratorAsyncResult`1.StepCallback(IAsyncResult result)
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at Microsoft.ServiceBus.Common.AsyncResult.End[TAsyncResult](IAsyncResult result)
   at Microsoft.ServiceBus.Messaging.Channels.ReconnectBindingElement.ReconnectChannelFactory`1.RequestSessionChannel.EndRequest(IAsyncResult result)
   at Microsoft.ServiceBus.Messaging.Sbmp.SbmpTransactionalAsyncResult`1.<>c.<GetAsyncSteps>b__18_3(TIteratorAsyncResult thisPtr, IAsyncResult a)
   at Microsoft.ServiceBus.Messaging.IteratorAsyncResult`1.StepCallback(IAsyncResult result)
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at Microsoft.ServiceBus.Common.AsyncResult.End[TAsyncResult](IAsyncResult result)
   at Microsoft.ServiceBus.Messaging.Sbmp.SbmpMessageSender.EndSendCommand(IAsyncResult result)
   at Microsoft.ServiceBus.Messaging.Sbmp.SbmpMessageSender.OnEndSend(IAsyncResult result)
   at Microsoft.ServiceBus.Messaging.IteratorAsyncResult`1.StepCallback(IAsyncResult result)
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at Microsoft.ServiceBus.Common.AsyncResult.End[TAsyncResult](IAsyncResult result)
   at Microsoft.ServiceBus.Messaging.MessageSender.OnSend(TrackingContext trackingContext, IEnumerable`1 messages, TimeSpan timeout)
   at Microsoft.ServiceBus.Messaging.MessageSender.Send(TrackingContext trackingContext, IEnumerable`1 messages, TimeSpan timeout)
   at DurableTask.TaskOrchestrationDispatcher.<OnProcessWorkItem>b__31_4(MessageContainer m) in C:\src\durabletask\Framework\TaskOrchestrationDispatcher.cs:line 428

and

Microsoft.ServiceBus.Messaging.SessionLockLostException occurred
  HResult=0x80131500
  Message=Channel:uuid:f0f18c6e-bee1-43bf-878f-349069c0a19d;id=6;Link:pe_bee0af0ebdf6486592fdaf2ac53d7e75_87912 TrackingId:6ad89372-54b7-4d8e-b939-ddf405683531_G3_B8, SystemTracker:net.tcp://10.0.0.36:20008/220/Runtime, Timestamp:3/22/2017 1:16:46 PM
  Source=Microsoft.ServiceBus
  StackTrace:
   at Microsoft.ServiceBus.Common.ExceptionExtensions.ThrowException(Exception exception)
   at Microsoft.ServiceBus.Common.AsyncResult.End[TAsyncResult](IAsyncResult result)
   at Microsoft.ServiceBus.Messaging.MessageReceiver.EndAbandon(IAsyncResult result)
   at System.Threading.Tasks.TaskFactory`1.FromAsyncCoreLogic(IAsyncResult iar, Func`2 endFunction, Action`1 endAction, Task`1 promise, Boolean requiresSynchronization)
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
   at DurableTask.TaskOrchestrationDispatcher.<AbortWorkItem>d__44.MoveNext() in C:\src\durabletask\Framework\TaskOrchestrationDispatcher.cs:line 863

Inner Exception 1:
FaultException`1: Channel:uuid:f0f18c6e-bee1-43bf-878f-349069c0a19d;id=6;Link:pe_bee0af0ebdf6486592fdaf2ac53d7e75_87912 TrackingId:6ad89372-54b7-4d8e-b939-ddf405683531_G3_B8, SystemTracker:net.tcp://10.0.0.36:20008/220/Runtime, Timestamp:3/22/2017 1:16:46 PM

Can I provide you with some other details which may help to resolve the problem?

UnobservedTaskException when using task.whenall(durabletask1, durabletask2)

The following usage (and other similar usages):

await Task.WhenAll(durabletask1, durabletask2);

..can result in an UnobservedTaskException when durabletask1 has completed with an exception but durabletask2 has not completed.

The reason is that, because durabletask2 has not completed, the Exception property on durabletask1 is never read. The DTFx TaskOrchestrationDispatcher drops all outstanding tasks without disposing them, so when durabletask1 is finalized, an UnobservedTaskException is raised.

The fix is to observe the Exception property of all tasks before they go out of scope.
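
A minimal sketch of that idea, using only the .NET base class library rather than DTFx internals. The `TaskObservation` class and the `ObserveAll` method are hypothetical names introduced for illustration. The sketch relies on the fact that reading `Task.Exception` on a faulted task marks its exception as observed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper, not part of DTFx.
public static class TaskObservation
{
    // Reads the Exception property of every faulted task so that its
    // exception counts as observed. Returns the number of faulted tasks seen.
    public static int ObserveAll(IEnumerable<Task> tasks)
    {
        int faulted = 0;
        foreach (Task task in tasks)
        {
            if (task.IsFaulted)
            {
                // Reading the property is what marks the exception as observed.
                _ = task.Exception;
                faulted++;
            }
        }
        return faulted;
    }
}
```

A dispatcher could call something like `TaskObservation.ObserveAll(outstandingTasks)` just before releasing its references. Tasks that are still pending at that point are left untouched.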

Parallel initialization of two instances may fail.

The remote server returned an error: (409) Conflict. SubCode=40901. Another conflicting operation is in progress

The issue happens when two or more instances initialize simultaneously. Each runs CreateOrUpdateQueue, and one of them then gets a 409.

"Duplicate start event.." gets logged erroneously every time ContinueAsNew() is called

For ContinueAsNew() we emit an ExecutionStartedEvent but also create the new state (with this event already populated in the execution history).

When this event is received, the TaskOrchestrationDispatcher looks at the current state and finds the ExecutionStartedEvent already there. It then finds the actual message, which also contains an ExecutionStartedEvent, and ignores the message after tracing the duplicate start event.

To fix this, we can add a switch on the state itself to indicate whether it was a genuine dupe or a dupe caused by the way we do ContinueAsNew. Alternatively, we can fix ContinueAsNew to not create the state, but that would be a much trickier change at this point.

TaskHubClient.CreateOrchestrationInstanceAsync() called within an orchestration seems to hang

Hi,

So, I'll explain my scenario. I have an orchestration (let's call it ValidationOrchestration) that does some validations based on an event received on the service bus (my application sends a request event on the service bus for another application and expects a response at some point in time). If the validations pass, I create another type of orchestration (let's call it ProcessingOrchestration) using TaskHubClient.CreateOrchestrationInstanceAsync(). I am doing this because I want to separate the validation code from the processing code. I thought it would work, but it seems that the ValidationOrchestration instance is waiting for the ProcessingOrchestration it started to complete. I would expect this behaviour with CreateSubOrchestrationInstance. I want the ValidationOrchestration to start the ProcessingOrchestration and then complete. Did I do it right?

Thank you

Add support in RetryOptions to Abort Retries

The RetryOptions class should have a delegate property that lets us abort any remaining retries based on the exception received.

The exception could be non-retryable, in which case we would not want to schedule the suborchestration again.

Scenario:

  1. Start Orchestration A, which starts SubOrchestration B.
  2. SubOrchestration B fails with a non-retryable exception, but there are retries remaining.
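
The requested behaviour can be sketched independently of DTFx. The `RetryWithAbort` class, its `ExecuteAsync` method, and the `shouldRetry` delegate below are hypothetical names and are not the `RetryOptions` API; the sketch only shows how a caller-supplied predicate could stop the retry loop early. It omits any delay or backoff between attempts.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical retry helper, not the DTFx RetryOptions class.
public static class RetryWithAbort
{
    // Runs the operation up to maxAttempts times. If shouldRetry returns
    // false for an exception, the exception propagates immediately even
    // though attempts remain.
    public static async Task<T> ExecuteAsync<T>(
        Func<Task<T>> operation,
        int maxAttempts,
        Func<Exception, bool> shouldRetry)
    {
        if (maxAttempts < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(maxAttempts));
        }

        for (int attempt = 1; ; attempt++)
        {
            try
            {
                return await operation();
            }
            catch (Exception ex) when (attempt < maxAttempts && shouldRetry(ex))
            {
                // Retryable failure with attempts remaining: try again.
            }
        }
    }
}
```

In the scenario above, SubOrchestration B would be the `operation`, and the predicate would return false for the non-retryable exception type so that Orchestration A sees the failure after the first attempt.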

NullReferenceException in WorkItemDispatcher

I'm seeing the following exception stack being logged in vnext:

Exception: System.NullReferenceException : Object reference not set to an instance of an object.
at DurableTask.WorkItemDispatcher`1.d__37.MoveNext()

Looking at the tracing around it, it seems to be following this trace pattern:

  1. TaskActivityDispatcher-997e1811751b4af7afa9705f6fa20c5b-0: Exception while processing workItem fc68b8e9ad0844e18f59bc71c817c2b9
    Exception: Microsoft.ServiceBus.Messaging.MessagingException : The service was unable to process the request
  2. Backing off after exception by at least 10 until 5 successful operations
  3. Abandoning message fc68b8e9ad0844e18f59bc71c817c2b9

I believe the issue lies within the ServiceBusOrchestrationService.cs -> AbandonTaskActivityWorkItemAsync method.

return message?.AbandonAsync();

If the message is null, it'd return null as a Task, which cannot be awaited on.
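
A minimal sketch of the null-safe pattern, using a stand-in type instead of the real Service Bus message class. `FakeMessage` and `NullSafeAbandon` are hypothetical names; the only point is that `?.` on a null receiver yields a null `Task`, so a fallback such as `Task.CompletedTask` is needed before the result can be awaited.

```csharp
using System.Threading.Tasks;

// Hypothetical stand-in for a Service Bus message.
public sealed class FakeMessage
{
    public bool Abandoned { get; private set; }

    public Task AbandonAsync()
    {
        Abandoned = true;
        return Task.CompletedTask;
    }
}

public static class NullSafeAbandon
{
    // Returns an already-completed task when there is no message,
    // instead of returning null to a caller that will await it.
    public static Task AbandonAsync(FakeMessage message)
    {
        return message?.AbandonAsync() ?? Task.CompletedTask;
    }
}
```

With this shape, `await NullSafeAbandon.AbandonAsync(null)` completes immediately rather than throwing a NullReferenceException.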

Serialization of object containing Task<object> hangs task scheduling

(On the vnext branch)

When we use the following code, and forget to "await" on the "ExecuteIt" method in AsyncErrorTask, scheduling the completed task hangs in serialization of the event data in JsonDataConverter, line 69.

public class EventData
{
    public object Result { get; set; }
    public string Exception { get; set; }
}

public class AsyncErrorTask : AsyncTaskActivity<String, object>
{
    protected async override Task<object> ExecuteAsync(TaskContext context, string input)
    {
        await Task.Delay(10);

        // FAILING: 
        return ExecuteIt();

        // SUCCEEDING: 
        // return await ExecuteIt();
    }

    private Task<object> ExecuteIt()
    {
        return Task.FromResult<object>("Done");
    }
}

public class AsyncWriteSuccessTask : AsyncTaskActivity<EventData, object>
{
    protected override Task<object> ExecuteAsync(TaskContext context, EventData input)
    {
        Console.WriteLine($"[TASK]\r\n{input.Result}\r\n");
        return Task.FromResult<object>(string.Empty);
    }
}

public class AsyncWriteErrorTask : AsyncTaskActivity<EventData, object>
{
    protected override Task<object> ExecuteAsync(TaskContext context, EventData input)
    {
        Console.WriteLine($"[TASK]\r\n{input.Exception}\r\n");
        return Task.FromResult<object>(string.Empty);
    }
}

public class AsyncErrorOrch : TaskOrchestration<string, object>
{
    public override async Task<string> RunTask(OrchestrationContext context, object input)
    {
        var data = new EventData();

        try
        {
            data.Result = await context.ScheduleTask<object>(typeof(AsyncErrorTask), string.Empty);
            await context.ScheduleTask<object>(typeof (AsyncWriteSuccessTask), data);
            return null;
        }
        catch (Exception ex)
        {
            data.Exception = ex.ToString();
            await context.ScheduleTask<object>(typeof (AsyncWriteErrorTask), data);
            return null;
        }
    }
}
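
The difference between the failing and succeeding variants can be reproduced without DTFx. In the sketch below, `MissingAwaitDemo` is a hypothetical class. Because the method's result type is `object`, returning a `Task<object>` without `await` compiles and makes the task itself the result value. That this task object is what the data converter is later asked to serialize is an assumption based on the report above.

```csharp
using System.Threading.Tasks;

// Hypothetical demo class; it mirrors the shape of AsyncErrorTask only.
public static class MissingAwaitDemo
{
    private static Task<object> ExecuteIt()
    {
        return Task.FromResult<object>("Done");
    }

    // Without await: the returned object is the Task<object> itself.
    public static async Task<object> WithoutAwait()
    {
        await Task.Yield();
        return ExecuteIt();
    }

    // With await: the returned object is the string "Done".
    public static async Task<object> WithAwait()
    {
        await Task.Yield();
        return await ExecuteIt();
    }
}
```

Declaring the result type more narrowly (for example `Task<string>`) would turn the missing `await` into a compile-time error.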

Unit tests fail

Trying to run unit tests and getting a lot of errors:

image

Most of them are 409 Conflict errors from Azure storage. Tried with several different storage accounts, same results.

Looks like in unit tests we don't wait for table deletion operation to be finished:

image

Tracking message for duplicate execution started message

We fixed the suborchestration reporting of duplicate execution started events so that it reports failure to the parent orchestration, but we need to do the same for top-level orchestrations.
Currently we silently eat duplicate execution started events. Instead, we also need to put a tracking message for such skipped execution IDs so they can later be queried through the instance store.
The current handling of duplicate orchestrations also needs to be updated to create tracking messages for this failure, along with notifying the parent orchestration of execution started failures.

The limit on a durable task activity message return is 192k

I am getting a list of more than ~200 items in the response from my microservice, and my activity doesn't return the data to the orchestration.

If the list is smaller, the activity is processed successfully.

Any help on this is appreciated.

Check for null or empty message list from session.ReceiveBatchAsync() before calling session.CompleteBatchAsync()

            …
        newMessages = await Utils.ExecuteWithRetries(() => session.ReceiveBatchAsync(PrefetchCount),
            session.SessionId, "Receive Tracking Message Batch",
            MaxRetriesServiceBus,
            IntervalBetweenRetriesSecs);

            ….
            …

        IEnumerable<Guid> lockTokens = newMessages.Select(m => m.LockToken);
        Utils.SyncExecuteWithRetries<object>(() =>
        {
            session.CompleteBatch(lockTokens);
            return null;
        },

The complete batch throws an error if lockTokens is an empty list.
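
One possible guard, sketched without the Service Bus SDK. `BatchGuard` and `CompleteIfAny` are hypothetical names, and the `completeBatch` delegate stands in for a call such as `session.CompleteBatch(lockTokens)`. The sketch skips the completion call when the received list is null or empty.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, not part of DTFx or the Service Bus SDK.
public static class BatchGuard
{
    // Projects messages to lock tokens and invokes completeBatch only when
    // at least one token exists. Returns true if completeBatch was called.
    public static bool CompleteIfAny<T>(
        IEnumerable<T> messages,
        Func<T, Guid> lockTokenSelector,
        Action<IEnumerable<Guid>> completeBatch)
    {
        if (messages == null)
        {
            return false;
        }

        List<Guid> lockTokens = messages.Select(lockTokenSelector).ToList();
        if (lockTokens.Count == 0)
        {
            return false;
        }

        completeBatch(lockTokens);
        return true;
    }
}
```

Materializing the tokens with `ToList()` also avoids enumerating the message sequence a second time inside the completion call.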

Unforced TaskHubWorker.StopAsync() is extremely slow (not working correctly)

The implementation of TaskHubWorker.StopAsync() does not appear to be behaving correctly, resulting in extremely slow shutdown sequences.

Here is a minimal repro app:

public static void Run()
{
    const string HubName = "DurableTaskTestHub";
    string servicebusConnectionString = ConfigurationManager.ConnectionStrings["DurableFunctions.ServiceBus"].ConnectionString;
    string storageConnectionString = ConfigurationManager.ConnectionStrings["DurableFunctions.Storage"].ConnectionString;

    var serviceBusOrchestrationService = new ServiceBusOrchestrationService(
        servicebusConnectionString,
        HubName,
        null,
        null,
        new ServiceBusOrchestrationServiceSettings());
    serviceBusOrchestrationService.CreateIfNotExistsAsync().Await();

    Console.WriteLine($"Starting the host.");

    var worker = new TaskHubWorker(serviceBusOrchestrationService);
    worker.AddTaskOrchestrations(typeof(SimpleOrchestration));
    worker.StartAsync().Await();

    Console.WriteLine("Stopping host");

    Stopwatch sw = Stopwatch.StartNew();
    worker.StopAsync().Await();
    Console.WriteLine($"Stopped after {sw}.");

    Console.WriteLine("Press [ENTER] to exit.");
    Console.ReadLine();
}

It seems like a bug in the shutdown logic in WorkItemDispatcher.StopAsync(forced: false). What I observed while debugging is that activeFetchers is always set to 1 even though the system is idle, so we loop and sleep until the retries are exhausted. The same problem exists when trying to shut down the activity dispatcher.
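
The loop-and-sleep pattern described above can be sketched as follows. This is not the actual WorkItemDispatcher code; `GracefulStop`, `WaitForIdleAsync`, and their parameters are hypothetical. The sketch shows why a counter that never drops to zero makes an unforced stop take the full retry budget.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical illustration of a bounded drain loop.
public static class GracefulStop
{
    // Polls the active fetcher count until it reaches zero or the retry
    // budget is spent. Returns true if the dispatcher became idle.
    public static async Task<bool> WaitForIdleAsync(
        Func<int> getActiveFetchers,
        int maxRetries,
        TimeSpan delay)
    {
        for (int retry = 0; retry < maxRetries; retry++)
        {
            if (getActiveFetchers() == 0)
            {
                return true;
            }

            await Task.Delay(delay);
        }

        return getActiveFetchers() == 0;
    }
}
```

If the count is stuck at 1, the total wait is roughly `maxRetries * delay`, which would match a slow but eventually finishing shutdown.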

If I specify TaskHubWorker.StopAsync(true), then the shutdown proceeds quickly.

TaskHubWorker.StopAsync() fails with MessagingException

The TaskHubWorker.StopAsync() code path always fails with a MessagingException when using the Service Bus provider.

Here is my minimal repro code:

public static void Run()
{
    const string HubName = "DurableTaskTestHub";
    string servicebusConnectionString = ConfigurationManager.ConnectionStrings["DurableFunctions.ServiceBus"].ConnectionString;
    string storageConnectionString = ConfigurationManager.ConnectionStrings["DurableFunctions.Storage"].ConnectionString;

    var serviceBusOrchestrationService = new ServiceBusOrchestrationService(
        servicebusConnectionString,
        HubName,
        null,
        null,
        new ServiceBusOrchestrationServiceSettings());
    serviceBusOrchestrationService.CreateIfNotExistsAsync().Await();

    Console.WriteLine($"Starting the host.");

    var worker = new TaskHubWorker(serviceBusOrchestrationService);
    worker.AddTaskOrchestrations(typeof(SimpleOrchestration));
    worker.StartAsync().Await();

    Console.WriteLine("Stopping host");

    Stopwatch sw = Stopwatch.StartNew();
    worker.StopAsync(true).Await();
    Console.WriteLine($"Stopped after {sw}.");

    Console.WriteLine("Press [ENTER] to exit.");
    Console.ReadLine();
}

Here is the call stack:

Microsoft.ServiceBus.Messaging.MessagingException occurred
  HResult=0x80131500
  Message=50002: Provider Internal Error, Resource:sb://cgillum-sbns.servicebus.windows.net/durabletasktesthub/orchestrator. TrackingId:d89f9ec2-0c45-40e5-8127-26db4b917583_G9, SystemTracker:cgillum-sbns.servicebus.windows.net:DurableTaskTestHub/orchestrator, Timestamp:7/25/2017 6:17:14 PM
  Source=Microsoft.ServiceBus
  StackTrace:
   at Microsoft.ServiceBus.Messaging.Sbmp.SbmpMessageSender.OnEndClose(IAsyncResult result)
   at Microsoft.ServiceBus.Messaging.ClientEntity.EndClose(IAsyncResult result)
   at System.Threading.Tasks.TaskFactory`1.FromAsyncCoreLogic(IAsyncResult iar, Func`2 endFunction, Action`1 endAction, Task`1 promise, Boolean requiresSynchronization)
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
   at DurableTask.ServiceBus.ServiceBusOrchestrationService.<StopAsync>d__29.MoveNext() in C:\Code\GitHub\durabletask\src\DurableTask.ServiceBus\ServiceBusOrchestrationService.cs:line 189
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
   at DurableTask.Core.TaskHubWorker.<StopAsync>d__19.MoveNext() in C:\Code\GitHub\durabletask\src\DurableTask.Core\TaskHubWorker.cs:line 160
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
   at Testing2.ExtensionMethods.Await(Task task) in C:\Users\cgillum\Documents\Visual Studio 2015\Projects\Testing2\ExtensionMethods.cs:line 13
   at Testing2.Categories.DurableTasks.Run() in C:\Users\cgillum\Documents\Visual Studio 2015\Projects\Testing2\Categories\DurableTasks.cs:line 48
   at Testing2.Program.Main(String[] args) in C:\Users\cgillum\Documents\Visual Studio 2015\Projects\Testing2\Program.cs:line 41

Inner Exception 1:
FaultException: 50002: Provider Internal Error, Resource:sb://cgillum-sbns.servicebus.windows.net/durabletasktesthub/orchestrator. TrackingId:d89f9ec2-0c45-40e5-8127-26db4b917583_G9, SystemTracker:cgillum-sbns.servicebus.windows.net:DurableTaskTestHub/orchestrator, Timestamp:7/25/2017 6:17:14 PM

I'm able to run orchestrations successfully on this Service Bus namespace. Only the shutdown sequence seems to be problematic. Note that it does not matter whether I use StopAsync(true) or StopAsync(false).

Missing 'await' in Feature Durable Timers sample

In the wiki page for "Feature Durable Timers", the GetQuoteOrchestration example is missing an 'await':

Task winner = Task.WhenAny(timer, getQuote);
should be
Task winner = await Task.WhenAny(timer, getQuote);
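
A small sketch of why the `await` matters, using plain tasks instead of durable timers. `WhenAnyDemo` is a hypothetical class. `Task.WhenAny` returns a `Task<Task>`, so without `await` the variable holds the wrapper task and never compares equal to `timer` or `getQuote`.

```csharp
using System.Threading.Tasks;

// Hypothetical demo class using plain tasks in place of durable timers.
public static class WhenAnyDemo
{
    // Correct form: awaiting unwraps the Task<Task> to the task that finished first.
    public static async Task<bool> TimerWonAsync(Task timer, Task getQuote)
    {
        Task winner = await Task.WhenAny(timer, getQuote);
        return ReferenceEquals(winner, timer);
    }

    // Form without await: this compiles because Task<Task> is a Task,
    // but winner is the wrapper rather than either input task.
    public static bool WinnerIsWrapper(Task timer, Task getQuote)
    {
        Task winner = Task.WhenAny(timer, getQuote);
        return winner is Task<Task>
            && !ReferenceEquals(winner, timer)
            && !ReferenceEquals(winner, getQuote);
    }
}
```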

Query methods on instance store should be defined by interface IOrchestrationServiceInstanceStore

Right now it is not possible to use query methods (like QueryOrchestrationStatesAsync) on the instance store service.

Many of our solutions need to list the states of running instances. Because TaskHubClient and TaskHubWorker do not provide such methods, and the instance store service is not accessible from inside those classes, we cannot query for instances.

This method is implemented in AzureTableInstanceStore, but it cannot be accessed from projects that reference the ServiceBus assembly directly.

Thanks

Unit Tests return 409 conflict intermittently

After a successful build, I tried to run all unit tests (non-parallel). After 21 successful tests, the majority of tests fail on TestInitialize() with a 409 conflict or TestCleanup() with a null reference error.

A failed test can be made to complete successfully if run in Debug mode stepping through code (allowing for a second or so pause between steps) but Run mode may or may not complete with a 409 conflict on either TestInitialize() or TestCleanup().

Is anyone else experiencing this behavior? I am using storage in Azure (i.e., not using local storage).

Durable Tasks without Azure?

Can this library be used without subscription-based Azure dependencies? If not, what would have to be done to make it work in a local data center?

Read orchestration from JSON

Hi,

Is it possible to read the orchestration from an XML or JSON file? Since .NET Core does not have WF support, we need an orchestrator for microservices development, like Netflix Conductor. If you are not planning to support this in the library, is there an open source .NET Core orchestrator that can do this for me with Azure dependency? I would need to run it in Pivotal Cloud Foundry.

Unable to find project information for '...\DurableTask.Test.Orchestrations.csproj'

After a new clone, the solution would not build with the following errors:

| Severity | Code | Description | Project | File | Line | Suppression State |
|---|---|---|---|---|---|---|
| Error | NU1105 | Unable to find project information for 'J:\temp\durabletask\test\DurableTask.Test.Orchestrations\DurableTask.Test.Orchestrations.csproj'. The project file may be invalid or missing targets required for restore. | DurableTask.Stress.Tests | J:\temp\durabletask\test\DurableTask.Stress.Tests\DurableTask.Stress.Tests.csproj | 1 | |
| Error | NU1105 | Unable to find project information for 'J:\temp\durabletask\test\DurableTask.Test.Orchestrations\DurableTask.Test.Orchestrations.csproj'. The project file may be invalid or missing targets required for restore. | DurableTask.Core.Tests | J:\temp\durabletask\test\DurableTask.Core.Tests\DurableTask.Core.Tests.csproj | 1 | |
| Error | NU1105 | Unable to find project information for 'J:\temp\durabletask\test\DurableTask.Test.Orchestrations\DurableTask.Test.Orchestrations.csproj'. The project file may be invalid or missing targets required for restore. | DurableTask.ServiceBus.Tests | J:\temp\durabletask\test\DurableTask.ServiceBus.Tests\DurableTask.ServiceBus.Tests.csproj | 1 | |
| Error | NU1105 | Unable to find project information for 'J:\temp\durabletask\test\DurableTask.Test.Orchestrations\DurableTask.Test.Orchestrations.csproj'. The project file may be invalid or missing targets required for restore. | DurableTask.Emulator.Tests | J:\temp\durabletask\test\DurableTask.Emulator.Tests\DurableTask.Emulator.Tests.csproj | 1 | |

Reference this stream: NuGet/Home#5350

I found that the case in the solution file for the DurableTask.Test.Orchestrations.csproj had an uppercase Test in the path causing the issue.

Pull request created #132

Write error if dupe task scheduled results are received

This shouldn't be required if transactions and q2q transfer work properly, but recently we have seen a couple of instances where the responses have been duped. Need to add tracing if this arises:

    public void HandleTaskCompletedEvent(TaskCompletedEvent completedEvent)
    {
        int taskId = completedEvent.TaskScheduledId;
        if (openTasks.ContainsKey(taskId))
        {
            OpenTaskInfo info = openTasks[taskId];
            info.Result.SetResult(completedEvent.Result);

            openTasks.Remove(taskId);
        }

        // TODO: write a trace event in an else branch here, and do the same
        // in other similar places in TaskOrchestrationContext.
    }
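The else branch requested above could look roughly like the following. This is a minimal, self-contained sketch and not the framework's code: `TaskCompletedEvent` is reduced to the two fields used here, and `DuplicateAwareContext`, `Schedule`, and `Warnings` are hypothetical names, with `Warnings` standing in for a real trace sink.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for the framework's event type; only the fields used here.
public sealed class TaskCompletedEvent
{
    public TaskCompletedEvent(int taskScheduledId, string result)
    {
        TaskScheduledId = taskScheduledId;
        Result = result;
    }

    public int TaskScheduledId { get; }
    public string Result { get; }
}

// Hypothetical, simplified context that tracks open tasks by id.
public sealed class DuplicateAwareContext
{
    private readonly Dictionary<int, TaskCompletionSource<string>> openTasks =
        new Dictionary<int, TaskCompletionSource<string>>();

    // Stand-in for a real trace sink.
    public List<string> Warnings { get; } = new List<string>();

    // Registers an open task and returns the task that completes with its result.
    public Task<string> Schedule(int taskId)
    {
        var source = new TaskCompletionSource<string>();
        openTasks.Add(taskId, source);
        return source.Task;
    }

    public void HandleTaskCompletedEvent(TaskCompletedEvent completedEvent)
    {
        int taskId = completedEvent.TaskScheduledId;
        if (openTasks.TryGetValue(taskId, out TaskCompletionSource<string> info))
        {
            info.SetResult(completedEvent.Result);
            openTasks.Remove(taskId);
        }
        else
        {
            // No open task with this id: the response is a duplicate or unknown, so record it.
            Warnings.Add($"Duplicate or unexpected TaskCompletedEvent for task id {taskId}");
        }
    }
}
```

Delivering the same completion twice leaves the first result intact and records a single warning for the second delivery.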

Correct use of external APIs - wrapping to get async behaviour

From reading the docs I've got the impression that individual activities are recoverable and this is achieved through async/await. So if an activity makes a call to some external API whose behaviour is unpredictable (it may fail or work, it may produce a different return value), then the activity is non-deterministic.

e.g. var contents = File.ReadAllText("todo.txt") - the filename parameter is constant, but the file's contents are not, and they may control what happens next.

In cases where no async flavour of the API is available, is the correct approach just to wrap the call to the external API in a Task.Run and then await it?

var contents = await Task.Run(() => File.ReadAllText("todo.txt"));

So that on recovery when that line is replayed, it will return the contents of todo as they were originally? And given that this means serialization, the return value of the awaited task must be (JSON) serializable. Is this correct?

Improve tracing

Include more error codes for storage issues, Service Bus issues, etc., so that we can generate different event types and filter TaskHub events.

TaskHubClient and TaskHubWorker need public access to instance of InstanceStore service

In many solutions we need access to the instance store (IOrchestrationServiceInstanceStore) from within TaskHubClient and TaskHubWorker.

For example:
taskHubClient.InstanceStore.GetJumpStartEntitiesAsync();
taskHubWorker.InstanceStore.GetJumpStartEntitiesAsync();

Alternatively, you could provide the same methods on the hub APIs:

taskHubClient.GetJumpStartEntitiesAsync();
taskHubWorker.GetJumpStartEntitiesAsync();

Thanks

Needed error message from storage when running a sample

I ran a sample (Greetings) twice in a row. On the second run, the storage table is deleted and then re-created, but the creation throws an error because deletion can take up to 40 seconds to complete. However, I didn't get the error message on the command line, just a 409 conflict.

ExecutionId vs InstanceId: what is the difference?

Hi,

In my implementation using DurableTasks, I am sometimes waiting on an external service to do something and I am sending the InstanceId to that service so when it is done it can tell me which orchestration instance to "wake up". I just wanted to know the difference between the InstanceId and ExecutionId to make sure I understood orchestrations correctly.

Thank you

Log exceptions in tracking dispatcher

TrackingDispatcher:

           try
           {
               // TODO : send batch to instance store, it can write it as individual if it chooses
               foreach (OrchestrationStateInstanceEntity stateEntity in stateEntities)
               {
                   await InstanceStore.WriteEntitesAsync(new List { stateEntity });
               }
           }
           catch (Exception e) when (!Utils.IsFatal(e))
           {
               TraceEntities(TraceEventType.Critical, "Failed to write state entity", stateEntities, GetNormalizedStateEvent);
               throw;
           }

Tracing the exception itself would have been very useful here.
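One way to include the exception in the trace is sketched below. It is a self-contained illustration and not the dispatcher's code: `TrackingWriteSketch`, `WriteAllAsync`, and the `trace` callback are hypothetical, and the local `IsFatal` is only a stand-in for the framework's `Utils.IsFatal`, whose real check may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class TrackingWriteSketch
{
    // Stand-in for Utils.IsFatal; assumed here to cover only these two exception types.
    private static bool IsFatal(Exception e) =>
        e is OutOfMemoryException || e is StackOverflowException;

    // Writes entities one at a time; on a non-fatal failure, traces the exception and rethrows.
    public static async Task WriteAllAsync<T>(
        IEnumerable<T> entities,
        Func<T, Task> writeAsync,
        Action<string> trace)
    {
        try
        {
            foreach (T entity in entities)
            {
                await writeAsync(entity);
            }
        }
        catch (Exception e) when (!IsFatal(e))
        {
            // Include the exception itself (type, message, stack trace) in the trace line.
            trace($"Failed to write state entity: {e}");
            throw;
        }
    }
}
```

The `throw;` keeps the original behavior of propagating the failure; the only change is that the trace line now carries the exception details.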

Multiple ExecutionCompletedEvent corruption error

Users can get the "Multiple ExecutionCompletedEvent found, potential corruption in state storage" error if the TaskOrchestrationDispatcher receives more than one termination message in a single iteration.

We should ignore termination messages if the orchestration was already terminated by a previous message.
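The proposed behavior can be illustrated with a small guard. This is a hypothetical sketch, not the dispatcher's implementation: `TerminationGuardSketch` and `HandleTerminate` are invented names, and the completion events are modeled as plain strings.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of an orchestration's runtime state for one dispatcher iteration.
public sealed class TerminationGuardSketch
{
    private readonly List<string> completedEvents = new List<string>();

    // Completion events recorded so far; more than one would indicate corruption.
    public IReadOnlyList<string> CompletedEvents => completedEvents;

    public bool IsTerminated => completedEvents.Count > 0;

    // Returns true if the termination was applied, false if it was ignored.
    public bool HandleTerminate(string reason)
    {
        if (IsTerminated)
        {
            // Already terminated by an earlier message: ignore this one.
            return false;
        }

        completedEvents.Add(reason);
        return true;
    }
}
```

With this guard, a second termination message in the same batch is dropped instead of producing a second completion event.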

Critical Exception due to incorrectly formatted String

TaskHub throws a critical exception on an incorrectly formatted string:

    Failed to log actual trace because one or more trace listeners threw an exception: System.FormatException: Input string was not in a correct format.
       at System.Text.StringBuilder.AppendFormat(IFormatProvider provider, String format, Object[] args)
       at System.String.Format(IFormatProvider provider, String format, Object[] args)
       at DurableTask.Tracing.TraceHelper.<>c__DisplayClass16_1.<TraceExceptionCore>b__0() in C:\workshop\durabletask2\Framework\Tracing\TraceHelper.cs:line 157
       at DurableTask.Tracing.TraceHelper.ExceptionHandlingWrapper(Action innerFunc) in C:\workshop\durabletask2\Framework\Tracing\TraceHelper.cs:line 200

Refactoring for integration with IoC frameworks like AutoFac, Unity etc

The idea is to be able to register an autofac container which can be used to resolve task orchestrations and activity instance objects.

In terms of the user model I was thinking that it should be similar to how Autofac integrates with Web API e.g:
http://blogs.msdn.com/b/roncain/archive/2012/07/16/dependency-injection-with-asp-net-web-api-and-autofac.aspx

Basically, DTFx exposes something like IDependencyResolver, and then a separate package implements it for Autofac, Unity, etc.
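As a rough sketch of that shape, assuming nothing about the framework's real API: the `IDependencyResolver` interface below is hypothetical and modeled on the Web API pattern mentioned above, and `DelegateDependencyResolver`, `IGreeter`, `ConsoleGreeter`, and `GreetingActivity` are invented for illustration. A container-specific package would supply its own implementation of the interface.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical abstraction the framework could expose.
public interface IDependencyResolver
{
    // Returns an instance of the requested type, or null if it is not registered.
    object GetService(Type serviceType);
}

// Minimal resolver backed by factory delegates; a real package would wrap a container.
public sealed class DelegateDependencyResolver : IDependencyResolver
{
    private readonly Dictionary<Type, Func<object>> factories =
        new Dictionary<Type, Func<object>>();

    public void Register<T>(Func<T> factory) where T : class
    {
        factories[typeof(T)] = () => factory();
    }

    public object GetService(Type serviceType)
    {
        return factories.TryGetValue(serviceType, out Func<object> factory)
            ? factory()
            : null;
    }
}

// Hypothetical dependency and activity used to show constructor injection.
public interface IGreeter
{
    string Greet(string name);
}

public sealed class ConsoleGreeter : IGreeter
{
    public string Greet(string name) => "Hello, " + name;
}

public sealed class GreetingActivity
{
    private readonly IGreeter greeter;

    public GreetingActivity(IGreeter greeter)
    {
        this.greeter = greeter;
    }

    public string Execute(string input) => greeter.Greet(input);
}
```

A worker could then ask the resolver for `typeof(GreetingActivity)` each time it needs an activity instance, instead of requiring a parameterless constructor.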
