I have an array of tasks that run identical jobs with different parameters on different servers. One of the servers may be unresponsive or slow, which leaves me in a situation where all tasks but one have completed. At the moment I await them with Task.WhenAll(), so I have no option but to wait until my timeout expires.
In the ideal case all tasks complete within the timeout and I can gather all the results. Otherwise, I want to wait:
- until n Tasks have completed
- for another x minutes if n tasks have completed
If n tasks have completed and I have waited another x minutes, but not all tasks have completed, I want to retrieve the results of the finished tasks.
Is there any way I can achieve the above?
CodePudding user response:
I'm sure this will get jumped on as inefficient like the other solution (if not more so, as I'm storing things in lists), but it's a bit more encapsulated.
public class TaskMonitor<T>
{
    private readonly int taskCountThreshold; // n
    private readonly TimeSpan waitTime;      // x
    private List<T> completed;
    private List<Task<T>> inProgress;

    public TaskMonitor(int taskCountThreshold, TimeSpan waitTime)
    {
        this.taskCountThreshold = taskCountThreshold;
        this.waitTime = waitTime;
    }

    public async Task<T[]> Start(params Task<T>[] tasks)
    {
        this.completed = new List<T>();
        this.inProgress = new List<Task<T>>(tasks);

        // Phase 1: wait until n tasks have completed (or none are left).
        while (this.inProgress.Count > 0 && this.completed.Count < this.taskCountThreshold)
        {
            await Collect(await Task.WhenAny(this.inProgress));
        }

        // Phase 2: give the remaining tasks up to x more time.
        var deadline = Task.Delay(this.waitTime);
        while (this.inProgress.Count > 0)
        {
            var remaining = Task.WhenAny(this.inProgress);
            if (await Task.WhenAny(deadline, remaining) == deadline)
            {
                break; // the extra wait time has expired
            }
            await Collect(await remaining);
        }
        return this.completed.ToArray();
    }

    // Moves a finished task from inProgress to completed.
    // Note: awaiting a faulted or cancelled task rethrows its exception.
    private async Task Collect(Task<T> task)
    {
        this.inProgress.Remove(task);
        this.completed.Add(await task);
    }
}
Usage would be along the lines of:
var tm = new TaskMonitor<string>(3, TimeSpan.FromMinutes(5));
var results = await tm.Start(task1, task2, task3, task4);
A live example is here: https://dotnetfiddle.net/5LxUfK. As it's currently set (at a 500ms threshold), task 4 will not finish, so the results of 1, 2 and 3 are shown. If you raise that threshold above 1000ms, task 4 will complete as expected.
CodePudding user response:
Use Task.WhenAny to find out when any task completes, then remove that completed task from your list.
stopWatch.Start();
while (arrayOfTasks.Any())
{
    Task<string> finishedTask = await Task.WhenAny(arrayOfTasks);
    arrayOfTasks.Remove(finishedTask);
    await finishedTask;
    finishedCount++;
    if (finishedCount == 4) // check your stopwatch's elapsed time here
    {
        Console.WriteLine("4 tasks have finished");
    }
}
Working example:
using System.Diagnostics;
using System.Security.Cryptography;

await Test.Go();
Console.ReadLine();

public static class Test
{
    public static async Task Go()
    {
        List<Task<string>> arrayOfTasks = GetArrayOfTasks();
        int finishedCount = 0;
        Stopwatch stopWatch = new Stopwatch();
        stopWatch.Start();
        while (arrayOfTasks.Any())
        {
            // Wait for whichever task finishes next and take it off the list.
            Task<string> finishedTask = await Task.WhenAny(arrayOfTasks);
            arrayOfTasks.Remove(finishedTask);
            Console.WriteLine(await finishedTask);
            finishedCount++;
            if (finishedCount == 4) // check your stopwatch's elapsed time here too
            {
                Console.WriteLine($":::{finishedCount} tasks have finished, {arrayOfTasks.Count} to go");
            }
        }
    }

    // Starts 10 tasks, each of which finishes after a random delay.
    private static List<Task<string>> GetArrayOfTasks()
    {
        List<Task<string>> taskList = new();
        for (int i = 0; i < 10; i++)
        {
            var t = GetString(i);
            taskList.Add(t);
        }
        return taskList;
    }

    private static async Task<string> GetString(int i)
    {
        await Task.Delay(RandomNumberGenerator.GetInt32(1, 5000));
        return i.ToString();
    }
}
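The example above leaves the actual time check as a comment. One way to fill it in, as a rough sketch only: start a Task.Delay once n tasks have finished and race it against the remaining tasks. The names GraceWait, WhenEnoughAsync, n and grace are made up for illustration and are not part of any library. The sketch also assumes that a faulted or cancelled task counts as "finished" for the threshold but contributes no result.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class GraceWait
{
    // Hypothetical helper: waits until n tasks have finished, then up to
    // `grace` longer, and returns the results of the tasks that succeeded.
    public static async Task<List<T>> WhenEnoughAsync<T>(
        IEnumerable<Task<T>> tasks, int n, TimeSpan grace)
    {
        var all = tasks.ToList();
        var pending = new List<Task<T>>(all);
        Task deadline = null; // created once n tasks have finished

        while (pending.Count > 0)
        {
            Task<Task<T>> next = Task.WhenAny(pending);
            if (deadline != null && await Task.WhenAny(next, deadline) == deadline)
            {
                break; // grace period expired; stop waiting
            }
            pending.Remove(await next);
            if (deadline == null && all.Count - pending.Count >= n)
            {
                deadline = Task.Delay(grace); // start the grace period
            }
        }

        // Keep only tasks that ran to completion; faulted, cancelled and
        // still-running tasks are skipped instead of throwing.
        return all.Where(t => t.Status == TaskStatus.RanToCompletion)
                  .Select(t => t.Result)
                  .ToList();
    }
}
```

With the list from the example, this could be called as `var results = await GraceWait.WhenEnoughAsync(arrayOfTasks, 4, TimeSpan.FromMinutes(1));`. Tasks that are still running when the grace period ends are simply abandoned, not cancelled; stopping the underlying work would need a CancellationToken passed into the tasks themselves.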
CodePudding user response:
Rx.NET is the most elegant way to achieve this.
public IAsyncEnumerable<TResult> DoStuff<TResult>(IEnumerable<Func<CancellationToken, Task<TResult>>> tasks)
{
    var inputs = tasks
        // convert this into IObservable<TResult>
        // this type, like IAsyncEnumerable, contains
        // async logic, and cancellation...
        .ToObservable()
        .Select(task => Observable.FromAsync(task))
        .Merge()
        // publish/refcount is needed to ensure
        // we only run the tasks once, and share
        // the "result/event".
        .Publish()
        .RefCount();

    // On the 100th item (n = 100)
    var timeoutSignal = inputs.Skip(100 - 1)
        .Take(1)
        // Generate a signal 10 minutes (x) after the 100th
        // item arrives
        .Delay(TimeSpan.FromMinutes(10));

    return inputs
        // Take items until the timeout signal
        .TakeUntil(timeoutSignal)
        .ToAsyncEnumerable();
}

var items = await DoStuff(tasks).ToListAsync();
