I am trying to start multiple activity functions in parallel from a durable function and wait for a predetermined time. After that time I don't care about the long running functions, I want to use only the return value of the finished ones. How can I achieve this?
import azure.durable_functions as df
from datetime import timedelta

def orchestrator_function(context: df.DurableOrchestrationContext):
    # set post-processors
    post_deadline = context.current_utc_datetime + timedelta(seconds=10)
    post_timeout_task = context.create_timer(post_deadline)
    post_processors = []
    post_processors.append(context.call_activity("StartStoredProcedure", (1, 'post-processor_1')))
    post_processors.append(context.call_activity("StartStoredProcedure", (30, 'post-processor_2')))
    post_processors.append(post_timeout_task)
    # run the post-processors in parallel and wait for all of them (including the timer)
    postproc_results = yield context.task_all(post_processors)
task_all waits for the long-running functions as well, and task_any returns as soon as the first task finishes. I would like to wait 10 secs, and get the info from the finished functions, and release the others.
Any pattern idea?
Answer:
After that time I don't care about the long running functions, I want to use only the return value of the finished ones. How can I achieve this?
If what you need is to use the return values of the finished functions, I believe the Function Chaining application pattern in Azure Durable Functions is a good option.
In this pattern, the output of one function is passed as the input to the next function.
The Microsoft documentation on Durable Functions application patterns describes this pattern in more detail.
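For reference, here is a minimal sketch of a function-chaining orchestrator in the Durable Functions Python programming model. "F1", "F2" and "F3" are hypothetical activity names. In a real function app the generator would be registered with main = df.Orchestrator.create(chaining_orchestrator), where df is azure.durable_functions. That line is left out so the sketch runs on its own. Each yield suspends the orchestrator until that activity returns, and the returned value becomes the input of the next call.

```python
def chaining_orchestrator(context):
    """Function chaining: each activity's output is the next activity's input.

    "F1", "F2" and "F3" are hypothetical activity names.
    """
    x = yield context.call_activity("F1")     # wait for F1's result
    y = yield context.call_activity("F2", x)  # feed it into F2
    z = yield context.call_activity("F3", y)  # feed F2's result into F3
    return z
```

Note that in this pattern the activities run one after another, not in parallel.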
I would like to wait 10 secs, and get the info from the finished functions, and release the others. Any pattern idea?
Durable Functions provides durable timers for implementing delays and for setting up timeouts on async actions.
In Python, a durable timer is created with context.create_timer(). Do not use time.sleep() in an orchestrator function, because orchestrator code must be deterministic and must not block.
Please check Timers in Azure Durable Functions for more information.
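A minimal sketch of the timeout use case, modeled on the pattern in the Durable Functions timers documentation: the orchestrator races an activity against a durable timer with task_any. "GetQuote" and the 30-second deadline are illustrative assumptions. As with any Python orchestrator, it would be registered with df.Orchestrator.create(timeout_orchestrator) in a real function app.

```python
from datetime import timedelta


def timeout_orchestrator(context):
    """Return the activity result, or None if the durable timer fires first.

    "GetQuote" is a hypothetical activity name; 30 s is an illustrative deadline.
    """
    # Derive the deadline from current_utc_datetime (replay-safe),
    # never from datetime.now() or time.sleep().
    deadline = context.current_utc_datetime + timedelta(seconds=30)
    activity_task = context.call_activity("GetQuote")
    timeout_task = context.create_timer(deadline)

    # task_any resumes the orchestrator when the first of the two tasks finishes
    winner = yield context.task_any([activity_task, timeout_task])
    if winner == activity_task:
        # Cancel the pending timer so the orchestration does not keep waiting on it
        timeout_task.cancel()
        return activity_task.result
    return None  # the deadline passed first
```

The answer below extends this idea by wrapping several activities in task_all.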
Answer:
I'm not very familiar with Python for Durable Functions, but in C# this can be achieved by passing Task.WhenAll to Task.WhenAny together with a durable timer:
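TimeSpan timeout = TimeSpan.FromSeconds(10);
DateTime deadline = context.CurrentUtcDateTime.Add(timeout);
using (var cts = new CancellationTokenSource())
{
    // fan out: start n activities in parallel
    List<Task<ResultType>> tasks = new List<Task<ResultType>>();
    for (int i = 0; i < n; i++)
        tasks.Add(context.CallActivityAsync<ResultType>("SomeFunc", someData));
    Task timeoutTask = context.CreateTimer(deadline, cts.Token);
    // resume when either the timer fires or every activity has finished
    Task winner = await Task.WhenAny(timeoutTask, Task.WhenAll(tasks));
    if (winner != timeoutTask)
        cts.Cancel(); // all activities finished first: cancel the durable timer
    // keep only the results of activities that completed successfully
    List<ResultType> aggResult = new List<ResultType>();
    for (int i = 0; i < n; i++)
        if (tasks[i].Status == TaskStatus.RanToCompletion)
            aggResult.Add(tasks[i].Result);
}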
You might be able to do something similar in Python:
import azure.durable_functions as df
from datetime import timedelta

def orchestrator_function(context: df.DurableOrchestrationContext):
    # set post-processors
    post_deadline = context.current_utc_datetime + timedelta(seconds=10)
    post_timeout_task = context.create_timer(post_deadline)
    post_processors = []
    post_processors.append(context.call_activity("StartStoredProcedure", (1, 'post-processor_1')))
    post_processors.append(context.call_activity("StartStoredProcedure", (30, 'post-processor_2')))
    # run the post-processors in parallel; resume at the deadline or when all of them are done
    finished_task = yield context.task_any([post_timeout_task, context.task_all(post_processors)])
    if finished_task != post_timeout_task:
        post_timeout_task.cancel()  # everything finished first: cancel the durable timer
    # keep the results of the post-processors that finished successfully
    results = [task.result for task in post_processors if task.is_completed and not task.is_faulted]
    return results

main = df.Orchestrator.create(orchestrator_function)
If nesting task_all inside task_any is not allowed in Python, I suppose a solution that is not very elegant but should work is:
import azure.durable_functions as df
from datetime import timedelta

def orchestrator_function(context: df.DurableOrchestrationContext):
    # set post-processors
    post_deadline = context.current_utc_datetime + timedelta(seconds=10)
    post_timeout_task = context.create_timer(post_deadline)
    post_processors = []
    post_processors.append(context.call_activity("StartStoredProcedure", (1, 'post-processor_1')))
    post_processors.append(context.call_activity("StartStoredProcedure", (30, 'post-processor_2')))
    post_processors.append(post_timeout_task)
    results = []
    while len(post_processors) != 1:  # stop when only the timeout task is left
        finished_task = yield context.task_any(post_processors)
        # drop the finished task so task_any does not return it again
        post_processors = [x for x in post_processors if x != finished_task]
        if finished_task == post_timeout_task:
            break  # deadline reached: ignore the activities still running
        results.append(finished_task.result)
    if post_processors[0] == post_timeout_task:
        post_timeout_task.cancel()  # every activity finished before the deadline
    return results

main = df.Orchestrator.create(orchestrator_function)
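The loop above can be factored into a reusable sub-generator, so any orchestrator can collect "whatever finished before the deadline" with a single yield from. This is a sketch under two assumptions: results_before_deadline is a hypothetical helper (not part of the Durable Functions SDK), and the SDK drives orchestrators as plain Python generators, so yield from delegation passes task results through unchanged. In a real function app, register the orchestrator with main = df.Orchestrator.create(orchestrator_function).

```python
from datetime import timedelta


def results_before_deadline(context, tasks, timeout):
    """Sub-generator: collect results of `tasks` that finish before `timeout`.

    Hypothetical helper, not part of the Durable Functions SDK.
    Use from an orchestrator as `results = yield from results_before_deadline(...)`.
    """
    timer = context.create_timer(context.current_utc_datetime + timeout)
    pending = list(tasks)
    results = []
    while pending:
        # Resume on whichever task (an activity or the timer) finishes next
        finished = yield context.task_any(pending + [timer])
        if finished == timer:
            return results  # deadline reached: ignore activities still running
        pending = [t for t in pending if t != finished]
        results.append(finished.result)
    timer.cancel()  # every activity finished before the deadline
    return results


def orchestrator_function(context):
    post_processors = [
        context.call_activity("StartStoredProcedure", (1, 'post-processor_1')),
        context.call_activity("StartStoredProcedure", (30, 'post-processor_2')),
    ]
    results = yield from results_before_deadline(
        context, post_processors, timedelta(seconds=10))
    return results
```

Results are appended in completion order, not in the order the activities were started.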
