Dynamically processing a concurrent collection in parallel by group but serially within each group

Time:02-08

I've come across a problem that I can readily define, but for the life of me I can't work out the best solution from the MSDN documentation. It's been a while since I had to think about parallel processing outside of UI responsiveness.

Say I have a concurrent collection of tasks that need to be processed. For example, it might be loading data to various consumers by type (Consumer1, Consumer2, Consumer3 ... Consumer[N]). The underlying task of sending the data is the same for each one, but each consumer can only accept one source at a time.

Basically, I want to process as much in parallel as possible with the caveat that I can only send 1 task to each consumer at a time. So if a current Job for a Consumer is already in progress then I should move to the next item in the collection and leave it for when the Job in progress for that consumer has completed. The Concurrent collection could also be added to at any time externally and if we had new types we'd need additional threads.

I guess what my question boils down to is how do I customize the "Take" from the collection so that I only grab the next Task with a property that designates it has a Consumer that doesn't already have a Job in progress.

Any ideas on what I'm missing here or if I'm even on the right path?

Example: we have a mediator queue with tasks associated with banking transactions.

So we might add the following to our mediator queue (let's say SendSummaryData and SendTransactionData use the same interface contract to send data):

  1. SendTransactionData -> Bank1
  2. SendTransactionData -> Bank2
  3. SendSummaryData -> Arbiter
  4. SendTransactionData -> Bank1
  5. SendTransactionData -> Bank3
  6. SendTransactionData -> Bank1
  7. SendTransactionData -> Bank2

Tasks 1, 2, 3 and 5 can be processed in parallel. However, because of the consumers' own systems, each consumer can only accept one input at a time, so task 4 must wait for task 1 to complete, and task 6 must wait for task 4. Similarly, task 7 must wait for task 2.

Before any of the initial processes have completed someone may add another grouping.

  8. SendSummaryData -> Arbiter
  9. SendTransactionData -> Bank1
  10. SendTransactionData -> Bank4

10 can be picked up immediately if a thread is available, but 8 and 9 must be queued behind their other related tasks.

Obviously there would be better ways to design a system to accomplish this but these are essentially the specs I'm looking to satisfy.

CodePudding user response:

If your problem is designating whether or not a task has already been picked up by another consumer, you need to perform this check on task retrieval.

That is, you probably have some code somewhere that looks something like:

Task next = queue.GetNextTask();

You'll need to update your queue to know 2 things:

  1. what tasks are currently in progress, and,
  2. whether or not there are any not-in-progress tasks.

If the queue has (or can have) access to the pool of threads/processes, you can inspect the pool for #1. This is the best option, if available. #2 is more complicated than with a typical FIFO queue: starting at the head, inspect each element, return it if it's available, or move on to the next element if not.

If the queue cannot see the pool, you'll need to decorate your Task objects with a flag indicating whether that task is in progress. Flags like this (semaphores, really) are difficult and irritating to manage, but are sometimes the only option. You have to make sure you're locking appropriately and that anyone who attempts to update the flag respects the lock. You also have to deal with casualty scenarios, such as a thread or process dying while it is processing a task.

Either way, you can then update your code to:

if (queue.HasAvailableTask()) {
  Task next = queue.GetNextAvailableTask();
  // Process task.
} else {
  // No task to process, this thread should sleep or die.
}
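
A minimal sketch of a queue along these lines, assuming each task carries a consumer key. The names WorkItem, ExclusiveQueue, TryTakeNextAvailable and Complete are hypothetical and not part of any library. It folds the "has available" check and the "get next available" call into one locked operation, so another thread cannot grab the same item between the two steps:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical work item: Consumer is the key that must never run concurrently.
public sealed class WorkItem
{
    public WorkItem(int id, string consumer) { Id = id; Consumer = consumer; }
    public int Id { get; }
    public string Consumer { get; }
}

// Sketch of a queue that hands out the first pending item whose consumer is idle.
public sealed class ExclusiveQueue
{
    private readonly object _gate = new object();
    private readonly LinkedList<WorkItem> _pending = new LinkedList<WorkItem>();
    private readonly HashSet<string> _busy = new HashSet<string>();

    public void Add(WorkItem item)
    {
        lock (_gate) _pending.AddLast(item);
    }

    // Scans from the head and returns the first item whose consumer is idle,
    // marking that consumer as busy inside the same critical section.
    public bool TryTakeNextAvailable(out WorkItem item)
    {
        lock (_gate)
        {
            for (var node = _pending.First; node != null; node = node.Next)
            {
                // HashSet.Add returns false when the consumer is already busy.
                if (_busy.Add(node.Value.Consumer))
                {
                    item = node.Value;
                    _pending.Remove(node);
                    return true;
                }
            }
        }
        item = null;
        return false;
    }

    // Call when processing ends, even on failure (for example from a finally block),
    // otherwise the consumer stays marked as busy forever.
    public void Complete(WorkItem item)
    {
        lock (_gate) _busy.Remove(item.Consumer);
    }
}
```

A worker would call TryTakeNextAvailable, process the item inside try/finally, and call Complete in the finally block. Items for the same consumer keep their relative order because the scan always starts at the head.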

Similarly, if one task is dependent upon the completion of another, the queue's inspector (like HasAvailableTask()) must be able to determine somehow if a task may be processed immediately, or must wait until the completion of another task. If it must wait, skip it and move on to check the next task in the queue.

Edit: checking for other tasks of the same type

Essentially, you have to encapsulate some code (in the right place) by which the task runner can decide if the queue has any items in it that are available to run right now.

One good way to do this (there are others) is to decorate the process/thread pool with some inspectors. That is, look among the running tasks to see if there are any (or how many, if necessary) that match your exclusion criteria. What those criteria are doesn't matter much for this description; they're just your business logic. Add as many or as few as you need.

The queue, similarly, should have an inspector to expose if there are any (or how many, if necessary) waiting tasks.

When you check to see if there is a task to run, match up all the criteria. If there are any tasks remaining in the queue that haven't been excluded, there is work to do and another task can be scheduled.

If your criteria are complicated, you may consider returning, from the pool and from the queue separately, some sort of lightweight task descriptor that contains nothing but a key (to find the task later) and enough information to make all the decisions you need to. Consider:

public sealed class TaskDescriptor {
  public string Id; // Change the data type to match your task ID
  public string Type; // What type of task this is
  public string Source; // Either "runner" or "queue"
}

Fill a list from the runner and the queue, then filter. For example, you can look for Source = "runner" where Type matches a particular type, to determine whether another task of that type is currently running.
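
The filtering step could look roughly like this with LINQ. This is only a sketch: the TaskFilter class and its FirstRunnable method are hypothetical names, and it assumes the combined list keeps the queued descriptors in queue order:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public sealed class TaskDescriptor
{
    public string Id;     // Key used to find the real task later
    public string Type;   // What type of task this is
    public string Source; // Either "runner" or "queue"
}

public static class TaskFilter
{
    // Returns the first queued descriptor whose Type is not currently running,
    // or null when every queued task is blocked by a running task of its type.
    public static TaskDescriptor FirstRunnable(IEnumerable<TaskDescriptor> descriptors)
    {
        var all = descriptors.ToList();

        // Types that the runner is processing right now.
        var runningTypes = new HashSet<string>(
            all.Where(d => d.Source == "runner").Select(d => d.Type));

        return all.FirstOrDefault(
            d => d.Source == "queue" && !runningTypes.Contains(d.Type));
    }
}
```

The snapshot is only valid while nothing else changes the runner or the queue, so building the list and acting on the result would need to happen under the same lock.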

CodePudding user response:

Here is an approach that is not based on the TPL Dataflow library. This is a variant of the Parallel.ForEachAsync method, that prevents concurrent operations for elements with the same key. The key of each element is obtained via a keySelector function. The maximum concurrency is configurable (it must be set to a reasonable positive number). There is also cancellation support via the optional cancellationToken parameter. The source of this method is an IAsyncEnumerable<T> sequence:

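using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

/// <summary>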
/// Executes a for-each operation on an async-enumerable sequence in which
/// iterations may run concurrently, enforcing a non-concurrent execution policy
/// for elements having the same key.
/// </summary>
public static async Task ForEachExclusivePerKeyAsync<TSource, TKey>(
    this IAsyncEnumerable<TSource> source,
    Func<TSource, Task> action,
    int maximumConcurrency,
    Func<TSource, TKey> keySelector,
    IEqualityComparer<TKey> keyComparer = default,
    CancellationToken cancellationToken = default)
{
    // Arguments validation omitted
    var semaphore = new SemaphoreSlim(maximumConcurrency, maximumConcurrency);
    var errors = new ConcurrentQueue<Exception>();
    var perKey = new Dictionary<TKey, Queue<TSource>>(keyComparer);
    try
    {
        // Continue on captured context
        await foreach (var item in source.WithCancellation(cancellationToken))
        {
            if (!errors.IsEmpty) break;
            var key = keySelector(item);
            lock (perKey)
            {
                if (!perKey.TryGetValue(key, out var queue))
                {
                    // There is no other task in-flight with the same key.
                    // Insert a null value as an indicator of activity.
                    perKey.Add(key, null);
                }
                else
                {
                    // A task with the same key is currently in-flight.
                    // Enqueue this item and continue with the next item.
                    if (queue == null) perKey[key] = queue = new Queue<TSource>();
                    queue.Enqueue(item); continue;
                }
            }
            await semaphore.WaitAsync(cancellationToken);
            if (!errors.IsEmpty) { semaphore.Release(); break; }
            FireAndAwaitTask(item, key);

            async void FireAndAwaitTask(TSource item, TKey key)
            {
                // Fires the task for this item, and for all other items with the
                // same key that might be queued while this task is in-flight.
                try
                {
                    while (true)
                    {
                        await action(item);
                        if (!errors.IsEmpty) break;
                        lock (perKey)
                        {
                            var queue = perKey[key];
                            if (queue == null || queue.Count == 0)
                            {
                                perKey.Remove(key); break;
                            }
                            item = queue.Dequeue();
                        }
                        cancellationToken.ThrowIfCancellationRequested();
                    }
                }
                catch (Exception ex) { errors.Enqueue(ex); }
                finally { semaphore.Release(); }
            }
        }
    }
    catch (Exception ex) { errors.Enqueue(ex); }

    // Wait for all pending operations to complete.
    for (int i = 0; i < maximumConcurrency; i++)
        await semaphore.WaitAsync().ConfigureAwait(false);

    if (!errors.IsEmpty)
    {
        if (cancellationToken.IsCancellationRequested
            && errors.All(ex => ex is OperationCanceledException)
            && errors.TryPeek(out var firstError))
            throw firstError;
        throw new AggregateException(errors);
    }
}

Contrary to how Parallel.ForEachAsync works, the asynchronous action is invoked sequentially on the context of the caller. If this is not desirable, the caller can wrap the action in a Task.Run, so that the action is invoked in parallel on the ThreadPool.
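
One way to do that wrapping is with a small adapter. The ActionWrappers class and its OnThreadPool method below are hypothetical helpers written for illustration, not part of the method above or of .NET:

```csharp
using System;
using System.Threading.Tasks;

public static class ActionWrappers
{
    // Returns a delegate that starts the given action on the ThreadPool
    // instead of running its synchronous part on the caller's context.
    // Task.Run(Func<Task>) unwraps the inner task, so the returned Task
    // completes only when the action itself has completed.
    public static Func<T, Task> OnThreadPool<T>(Func<T, Task> action)
    {
        return item => Task.Run(() => action(item));
    }
}
```

Under these assumptions, the action argument of ForEachExclusivePerKeyAsync would become ActionWrappers.OnThreadPool<Transaction>(ProcessTransactionAsync), with the other arguments unchanged.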

Usage example. A Channel<T> is used as the source/controller of the IAsyncEnumerable<T> sequence:

var channel = Channel.CreateUnbounded<Transaction>();
//...
await ForEachExclusivePerKeyAsync(channel.Reader.ReadAllAsync(), async x =>
{
    await ProcessTransactionAsync(x);
}, maximumConcurrency: 20, keySelector: x => x.Bank);
//...
channel.Writer.TryWrite(new Transaction() { Bank = "Bank1" });
channel.Writer.TryWrite(new Transaction() { Bank = "Bank2" });