How to set a timer for multiple parallel-running Azure Durable Functions in Python?

Time:01-31

I'm trying to start multiple activity functions in parallel from a durable orchestrator function and wait for a predetermined time. After that time I don't care about the long-running functions; I only want to use the return values of the finished ones. How can I achieve this?

import azure.durable_functions as df
from datetime import timedelta

def orchestrator_function(context: df.DurableOrchestrationContext):
    # set post-processors
    post_deadline = context.current_utc_datetime + timedelta(seconds=10)
    post_timeout_task = context.create_timer(post_deadline)
    
    post_processors = []
    post_processors.append(context.call_activity("StartStoredProcedure", (1, 'post-processor_1')))
    post_processors.append(context.call_activity("StartStoredProcedure", (30, 'post-processor_2')))
    post_processors.append(post_timeout_task)
    
    # run the post-processors and the timer in parallel; task_all waits for ALL of them
    postproc_results = yield context.task_all(post_processors)

task_all waits for the long-running functions, and task_any stops at the first one that finishes. I would like to wait 10 seconds, get the results of the finished functions, and release the others. Any pattern ideas?

Answer:

After that time I don't care about the long-running functions; I only want to use the return values of the finished ones. How can I achieve this?

If what you need is to use the return values of the finished functions, I believe the Function Chaining application pattern in Azure Durable Functions is a good option.

In this pattern, the output of one function is passed as the input to the next function.
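A minimal sketch of chaining, assuming the generator-based orchestrator style of the Python Durable Functions SDK, where each yield waits for an activity and receives its result. ToyContext, run_toy and the activities F1 to F3 are hypothetical stand-ins so the data flow can be run without Azure; in a real function app, context would be the DurableOrchestrationContext and the orchestrator would be registered with df.Orchestrator.create. Note that chaining runs the activities one after another.

```python
# ToyContext and run_toy are hypothetical stand-ins, NOT part of azure-functions-durable.

class ToyContext:
    """Records which activity the orchestrator wants to call next."""

    def call_activity(self, name, input_=None):
        # The real SDK returns a task object; a (name, input) pair is enough here.
        return (name, input_)


def chaining_orchestrator(context):
    # Each activity's output becomes the next activity's input.
    x = yield context.call_activity("F1", None)
    y = yield context.call_activity("F2", x)
    z = yield context.call_activity("F3", y)
    return z


def run_toy(orchestrator, activities):
    """Drive the generator, running each requested activity synchronously."""
    gen = orchestrator(ToyContext())
    try:
        name, arg = next(gen)
        while True:
            # Send the activity's result back in; get the next request out.
            name, arg = gen.send(activities[name](arg))
    except StopIteration as stop:
        return stop.value


# Hypothetical activity implementations, only to show the data flow.
activities = {
    "F1": lambda _: 1,
    "F2": lambda x: x + 1,
    "F3": lambda x: x * 10,
}
chained = run_toy(chaining_orchestrator, activities)
print(chained)  # prints 20
```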

See the Microsoft documentation on this pattern for more information.

I would like to wait 10 seconds, get the results of the finished functions, and release the others. Any pattern ideas?

There is a concept called durable timers, used in Durable Functions to implement delays or to set up timeouts on async actions. In Python, a durable timer is created with context.create_timer() and waited on with yield; time.sleep() should not be used in orchestrator code.

Please check Timers in Azure Durable Functions for more information.
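For comparison only: outside of Durable Functions, the behavior asked for in the question (wait a fixed time, keep the results that are ready, abandon the rest) is what asyncio.wait with a timeout provides in ordinary Python. This cannot be used inside an orchestrator, whose code must stay deterministic and wait only on tasks created through the context, but it makes the target semantics concrete. post_processor and results_within are hypothetical names, and the durations are scaled down from the question's 1 s / 30 s / 10 s.

```python
import asyncio


async def post_processor(seconds, name):
    # Hypothetical long-running job: sleeps instead of calling a stored procedure.
    await asyncio.sleep(seconds)
    return name


async def results_within(timeout, coros):
    """Run coroutines concurrently; keep only those finished within `timeout` seconds."""
    tasks = [asyncio.create_task(c) for c in coros]
    done, pending = await asyncio.wait(tasks, timeout=timeout)
    for task in pending:
        task.cancel()  # abandon the stragglers
    # Keep submission order and skip tasks that raised an exception.
    return [t.result() for t in tasks if t in done and t.exception() is None]


finished = asyncio.run(results_within(0.5, [
    post_processor(0.05, "post-processor_1"),
    post_processor(3.0, "post-processor_2"),
]))
print(finished)  # ['post-processor_1']
```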

Answer:

I'm not very familiar with Durable Functions in Python, but in C# this can be achieved by passing a Task.WhenAll to Task.WhenAny:

    TimeSpan timeout = TimeSpan.FromSeconds(10);
    DateTime deadline = context.CurrentUtcDateTime.Add(timeout);

    using (var cts = new CancellationTokenSource())
    {
        // Start all activities in parallel (fan-out)
        var tasks = new List<Task<ResultType>>();
        for (int i = 0; i < n; i++)
            tasks.Add(context.CallActivityAsync<ResultType>("SomeFunc", someData));
        Task timeoutTask = context.CreateTimer(deadline, cts.Token);

        // Resume when either every activity is done or the deadline passes
        Task winner = await Task.WhenAny(timeoutTask, Task.WhenAll(tasks));
        if (winner != timeoutTask)
            cts.Cancel(); // cancel the timer so the orchestration can complete

        // Keep only the results of activities that finished successfully
        var aggResult = new List<ResultType>();
        for (int i = 0; i < n; i++)
            if (tasks[i].Status == TaskStatus.RanToCompletion)
                aggResult.Add(tasks[i].Result);
    }

You might be able to do something similar in Python:

import azure.durable_functions as df
from datetime import timedelta

def orchestrator_function(context: df.DurableOrchestrationContext):
    # set post-processors
    post_deadline = context.current_utc_datetime + timedelta(seconds=10)
    post_timeout_task = context.create_timer(post_deadline)
    
    post_processors = []
    post_processors.append(context.call_activity("StartStoredProcedure", (1, 'post-processor_1')))
    post_processors.append(context.call_activity("StartStoredProcedure", (30, 'post-processor_2')))
    
    # resume when either all post-processors finish or the timer fires
    finished_task = yield context.task_any([post_timeout_task, context.task_all(post_processors)])
    if finished_task != post_timeout_task:
        post_timeout_task.cancel()
    # keep the results of the post-processors that finished (result is None otherwise)
    results = [task.result for task in post_processors if task.result is not None]
    return results

main = df.Orchestrator.create(orchestrator_function)

If this is not allowed in Python, a less elegant solution that should still work is:

import azure.durable_functions as df
from datetime import timedelta

def orchestrator_function(context: df.DurableOrchestrationContext):
    # set post-processors
    post_deadline = context.current_utc_datetime + timedelta(seconds=10)
    post_timeout_task = context.create_timer(post_deadline)
    
    post_processors = []
    post_processors.append(context.call_activity("StartStoredProcedure", (1, 'post-processor_1')))
    post_processors.append(context.call_activity("StartStoredProcedure", (30, 'post-processor_2')))
    post_processors.append(post_timeout_task)
    results = []
    while len(post_processors) != 1:  # stop when only the timeout task is left
        finished_task = yield context.task_any(post_processors)
        post_processors = [x for x in post_processors if x != finished_task]
        if finished_task == post_timeout_task:
            break  # deadline reached; ignore the activities still running
        results.append(finished_task.result)
    if post_processors[0] == post_timeout_task:
        post_timeout_task.cancel()  # every activity finished before the deadline
    return results

main = df.Orchestrator.create(orchestrator_function)
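The loop approach can also be packaged as a reusable helper generator. This is a sketch under two assumptions: the SDK drives the orchestrator with ordinary generator send(), so yield from can delegate to the helper, and task_any may be called repeatedly on the same tasks (the loop approach relies on that too). collect_until, FakeTask, FakeAny, FakeContext, run_fake and all_fast are hypothetical; the fake context only simulates completion order with a virtual clock, treating the first element of each activity input as its run time in seconds, and is not the azure-functions-durable API.

```python
from datetime import datetime, timedelta, timezone


def collect_until(context, tasks, deadline):
    """Helper generator: return the results of `tasks` that finish before `deadline`."""
    timer = context.create_timer(deadline)
    remaining = list(tasks)
    results = []
    while remaining:
        winner = yield context.task_any(remaining + [timer])
        if winner == timer:
            break  # deadline reached; ignore activities that are still running
        remaining.remove(winner)
        results.append(winner.result)
    else:
        timer.cancel()  # all tasks finished early, so drop the pending timer
    return results


def orchestrator_function(context):
    deadline = context.current_utc_datetime + timedelta(seconds=10)
    post_processors = [
        context.call_activity("StartStoredProcedure", (1, "post-processor_1")),
        context.call_activity("StartStoredProcedure", (30, "post-processor_2")),
    ]
    return (yield from collect_until(context, post_processors, deadline))


# --- Hypothetical test double below; NOT the azure-functions-durable API. ---
class FakeTask:
    def __init__(self, finish_at, value=None):
        self.finish_at = finish_at  # simulated completion time
        self.value = value          # what the task will return
        self.result = None
        self.cancelled = False

    def cancel(self):
        self.cancelled = True


class FakeAny:
    def __init__(self, tasks):
        self.tasks = tasks


class FakeContext:
    def __init__(self):
        self.current_utc_datetime = datetime(2024, 1, 1, tzinfo=timezone.utc)
        self.timers = []

    def call_activity(self, name, input_=None):
        # Assumption: input_ is (run time in seconds, label), as in the question.
        seconds, label = input_
        return FakeTask(self.current_utc_datetime + timedelta(seconds=seconds), label)

    def create_timer(self, fire_at):
        timer = FakeTask(fire_at)
        self.timers.append(timer)
        return timer

    def task_any(self, tasks):
        return FakeAny(tasks)


def run_fake(orchestrator, context):
    """Resolve each task_any with whichever child finishes first on the virtual clock."""
    gen = orchestrator(context)
    try:
        waiting = next(gen)
        while True:
            winner = min(waiting.tasks, key=lambda t: t.finish_at)
            winner.result = winner.value
            waiting = gen.send(winner)
    except StopIteration as stop:
        return stop.value


def all_fast(context):
    deadline = context.current_utc_datetime + timedelta(seconds=10)
    jobs = [context.call_activity("StartStoredProcedure", (s, f"job_{s}")) for s in (2, 3)]
    return (yield from collect_until(context, jobs, deadline))


slow_ctx, fast_ctx = FakeContext(), FakeContext()
slow_results = run_fake(orchestrator_function, slow_ctx)
fast_results = run_fake(all_fast, fast_ctx)
print(slow_results)  # ['post-processor_1']
print(fast_results)  # ['job_2', 'job_3']
print(slow_ctx.timers[0].cancelled, fast_ctx.timers[0].cancelled)  # False True
```

In a real function app, the orchestrator would still be registered with df.Orchestrator.create(orchestrator_function), and the fake classes would be dropped.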