How to merge multiple asynchronous sequences without left-side bias?

Time:01-15

I have a few IAsyncEnumerable<string> sequences that I would like to merge into a single IAsyncEnumerable<string>, which should contain all the elements from all those sequences. So I used the Merge operator from the System.Interactive.Async package. The problem is that this operator does not treat all sequences as equal. It prefers emitting elements from the sequences on the left side of the argument list, and neglects the sequences on the right side of the argument list. Here is a minimal example that reproduces this undesirable behavior:

var sequence_A = Enumerable.Range(1, 5).Select(i => $"A{i}").ToAsyncEnumerable();
var sequence_B = Enumerable.Range(1, 5).Select(i => $"B{i}").ToAsyncEnumerable();
var sequence_C = Enumerable.Range(1, 5).Select(i => $"C{i}").ToAsyncEnumerable();
var merged = AsyncEnumerableEx.Merge(sequence_A, sequence_B, sequence_C);
await foreach (var item in merged) Console.WriteLine(item);

This code snippet also depends on the System.Linq.Async package (for ToAsyncEnumerable). sequence_A emits 5 elements prefixed with "A", sequence_B emits 5 elements prefixed with "B", and sequence_C emits 5 elements prefixed with "C".

Output (undesirable):

A1
A2
A3
A4
A5
B1
B2
B3
B4
B5
C1
C2
C3
C4
C5


The desirable output should look like this:

A1
B1
C1
A2
B2
C2
A3
B3
C3
A4
B4
C4
A5
B5
C5

When all sequences have their next element available, the merged sequence should pull one element from each sequence in turn, instead of repeatedly pulling elements from the left-most sequence.

How can I ensure that my sequences are merged with fairness? I am looking for a combination of operators from the official packages that has the desirable behavior, or for a custom Merge operator that does what I want.


Update: Here is a more realistic demo that includes latency both in the producer sequences and in the consuming enumeration loop. It simulates a situation where consuming the values produced by the left-most sequence takes longer than producing them.

var sequence_A = Produce("A", 200, 1, 2, 3, 4, 5);
var sequence_B = Produce("B", 150, 1, 2, 3, 4, 5);
var sequence_C = Produce("C", 100, 1, 2, 3, 4, 5);
var merged = AsyncEnumerableEx.Merge(sequence_A, sequence_B, sequence_C);
await foreach (var item in merged)
{
    Console.WriteLine(item);
    await Task.Delay(item.StartsWith("A") ? 300 : 50); // Latency
}

async IAsyncEnumerable<string> Produce(string prefix, int delay, params int[] values)
{
    foreach (var value in values)
    {
        var delayTask = Task.Delay(delay);
        yield return $"{prefix}{value}";
        await delayTask; // Latency
    }
}

The result is an undesirable bias toward the values produced by sequence_A:

A1
A2
A3
A4
A5
B1
B2
C1
B3
C2
B4
C3
C4
B5
C5


Answer:

The example is a bit contrived, because all results are available immediately. If even a small delay is added, the results are interleaved:

var sequence_A = AsyncEnumerable.Range(1, 5)
    .SelectAwait(async i => { await Task.Delay(i); return $"A{i}"; });
var sequence_B = AsyncEnumerable.Range(1, 5)
    .SelectAwait(async i => { await Task.Delay(i); return $"B{i}"; });
var sequence_C = AsyncEnumerable.Range(1, 5)
    .SelectAwait(async i => { await Task.Delay(i); return $"C{i}"; });
var sequence_D = AsyncEnumerable.Range(1, 5)
    .SelectAwait(async i => { await Task.Delay(i); return $"D{i}"; });

// Merge all four delayed sequences and print the items as they arrive.
var merged = AsyncEnumerableEx.Merge(sequence_A, sequence_B, sequence_C, sequence_D);
await foreach (var item in merged) Console.WriteLine(item);

This produces different, interleaved results on each run, for example:

B1
A1
C1
D1
D2
A2
B2
C2
D3
A3
B3
C3
C4
A4
B4
D4
D5
A5
B5
C5

The comments in the source code of the Merge operator explain that it was reimplemented to be cheaper and fairer:

//
// This new implementation of Merge differs from the original one in a few ways:
//
// - It's cheaper because:
//   - no conversion from ValueTask<bool> to Task<bool> takes place using AsTask,
//   - we don't instantiate Task.WhenAny tasks for each iteration.
// - It's fairer because:
//   - the MoveNextAsync tasks are awaited concurrently, but completions are queued,
//     instead of awaiting a new WhenAny task where "left" sources have preferential
//     treatment over "right" sources.
//
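The queued-completion strategy described in those comments can be sketched as a custom operator. This is a minimal sketch, not the library's actual code: the class FairMergeSketch and its Merge method are hypothetical names, and it assumes .NET Core 3.0 or later (for async streams and System.Threading.Channels). Each source has at most one MoveNextAsync in flight; when one completes, the source's index is appended to a FIFO channel, and the consumer serves sources in the order their completions were queued. After an element is taken from a source, that source's next MoveNextAsync is queued behind the completions already waiting, so no source can monopolize the output.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class FairMergeSketch
{
    // Hypothetical operator, not part of System.Interactive.Async.
    // Yields items in the order in which the sources' MoveNextAsync calls complete.
    public static async IAsyncEnumerable<T> Merge<T>(
        IAsyncEnumerable<T>[] sources,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        // FIFO queue of completion notifications: (source index, MoveNextAsync result, error).
        var completions = Channel.CreateUnbounded<(int Index, bool HasNext, Exception Error)>();
        var enumerators = new IAsyncEnumerator<T>[sources.Length];
        var pending = new Task[sources.Length];
        int active = 0; // sources that have not reported completion yet

        // Advances source i and enqueues the outcome. Never throws:
        // errors are forwarded to the consumer through the channel.
        async Task AdvanceAsync(int i)
        {
            try
            {
                bool moved = await enumerators[i].MoveNextAsync();
                completions.Writer.TryWrite((i, moved, null));
            }
            catch (Exception ex)
            {
                completions.Writer.TryWrite((i, false, ex));
            }
        }

        try
        {
            for (int i = 0; i < sources.Length; i++)
            {
                enumerators[i] = sources[i].GetAsyncEnumerator(cts.Token);
                active++;
                pending[i] = AdvanceAsync(i);
            }
            while (active > 0)
            {
                var (index, hasNext, error) = await completions.Reader.ReadAsync(cts.Token);
                if (error != null) ExceptionDispatchInfo.Capture(error).Throw();
                if (!hasNext) { active--; continue; }
                T item = enumerators[index].Current;
                // Request this source's next element before yielding; its
                // completion goes to the back of the queue.
                pending[index] = AdvanceAsync(index);
                yield return item;
            }
        }
        finally
        {
            cts.Cancel(); // ask sources that are still running to stop
            foreach (var task in pending)
                if (task != null) await task; // never faults, see AdvanceAsync
            foreach (var enumerator in enumerators)
                if (enumerator != null) await enumerator.DisposeAsync();
        }
    }
}
```

For sources whose MoveNextAsync completes synchronously, the completions are queued in argument order, so a call like `FairMergeSketch.Merge(new[] { sequence_A, sequence_B, sequence_C })` should then yield one element from each source in turn. With real latency the order depends on timing. One design trade-off: disposal waits for each source's outstanding MoveNextAsync, so a source that ignores cancellation can delay the end of the enumeration.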

Answer:

EDIT: This isn't quite what the OP wanted, because the OP wants whichever result is available first to be returned. I'll leave this here because it's a good demonstration of this algorithm.

Here is a full implementation of the async Interleave or Merge algorithm, more commonly known in SQL terms as a Merge-Concatenation.

The algorithm is as follows:

  • The function accepts a params array of sources.

  • Bail out early if no source enumerables are provided.

  • Create a list to hold the enumerators.

  • Get each enumerator and store it in the list.

  • In a loop, take each enumerator in turn and call MoveNextAsync on it.

  • If it returns true, yield the value and increment the loop counter. If the counter goes past the last enumerator, wrap it around to the beginning.

  • If it returns false, dispose the enumerator and remove it from the list. Do not increment the counter, since the next enumerator has moved into the current slot; if the removed enumerator was the last one in the list, wrap the counter around to the beginning.

  • Continue looping until there are no more enumerators.

  • A finally block disposes any remaining enumerators.

  • There is also an overload that accepts a cancellation token.

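      using System.Collections.Generic;
      using System.Runtime.CompilerServices;
      using System.Threading;

      // Overload without a cancellation token.
      public static IAsyncEnumerable<T> Interleave<T>(params IAsyncEnumerable<T>[] sources) =>
          Interleave(default, sources);

      public static async IAsyncEnumerable<T> Interleave<T>([EnumeratorCancellation] CancellationToken token, IAsyncEnumerable<T>[] sources)
      {
          if (sources.Length == 0)
              yield break;
          var enumerators = new List<IAsyncEnumerator<T>>(sources.Length);
          try
          {
              for (var i = 0; i < sources.Length; i++)
                  enumerators.Add(sources[i].GetAsyncEnumerator(token));

              var j = 0; // index of the enumerator whose turn it is
              do
              {
                  if (await enumerators[j].MoveNextAsync())
                  {
                      yield return enumerators[j].Current;
                      j++;
                      if (j >= enumerators.Count)
                          j = 0; // wrap around to the first enumerator
                  }
                  else
                  {
                      // This source is exhausted: dispose it and drop it from the rotation.
                      try
                      {
                          await enumerators[j].DisposeAsync();
                      }
                      catch
                      { // errors during disposal are ignored
                      }
                      enumerators.RemoveAt(j);
                      // j now points at the next enumerator, unless the removed one
                      // was the last in the list; in that case wrap around.
                      if (j >= enumerators.Count)
                          j = 0;
                  }
              } while (enumerators.Count > 0);
          }
          finally
          {
              // Dispose whatever is left (the consumer stopped early or an exception occurred).
              for (var i = 0; i < enumerators.Count; i++)
              {
                  try
                  {
                      await enumerators[i].DisposeAsync();
                  }
                  catch
                  { // errors during disposal are ignored
                  }
              }
          }
      }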
    


This can obviously be significantly simplified if you only have a fixed number of source enumerables.
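For exactly two sources, the round-robin rule reduces to alternating between two enumerators. The following is a minimal sketch under that assumption; InterleaveTwo is a hypothetical name. Unlike the general version above, it keeps an exhausted enumerator alive until the end and disposes both enumerators via await using.

```csharp
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;

public static class InterleaveTwo
{
    // Hypothetical helper: alternates between two sources until both are exhausted.
    public static async IAsyncEnumerable<T> Interleave<T>(
        IAsyncEnumerable<T> first,
        IAsyncEnumerable<T> second,
        [EnumeratorCancellation] CancellationToken token = default)
    {
        // Both enumerators are disposed when the iteration ends or is abandoned.
        await using var a = first.GetAsyncEnumerator(token);
        await using var b = second.GetAsyncEnumerator(token);
        bool aHasMore = true, bHasMore = true;
        while (aHasMore || bHasMore)
        {
            // Take one element from the first source, if it is not exhausted yet.
            if (aHasMore)
            {
                aHasMore = await a.MoveNextAsync();
                if (aHasMore) yield return a.Current;
            }
            // Then one element from the second source.
            if (bHasMore)
            {
                bHasMore = await b.MoveNextAsync();
                if (bHasMore) yield return b.Current;
            }
        }
    }
}
```

The flags ensure that MoveNextAsync is never called again on an enumerator that has already returned false.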
