Merge multiple IAsyncEnumerable streams

Time:01-11

With the release of MediatR 10, there's now a paradigm that allows developers to create streams powered by IAsyncEnumerable. I'm using it to create several file system watchers that monitor multiple folders, with two different approaches: polling and FileSystemWatcher. As part of my pipeline, all of the folder monitors are aggregated into a single IEnumerable<IAsyncEnumerable<FileRecord>>. Each type of watcher has an internal loop that runs until cancellation is requested via a CancellationToken.

Here's the polling watcher:

public class PolledFileStreamHandler : 
    IStreamRequestHandler<PolledFileStream, FileRecord>
{
    private readonly ISeenFileStore _seenFileStore;
    private readonly IPublisher _publisher;
    private readonly ILogger<PolledFileStreamHandler> _logger;

    public PolledFileStreamHandler(
        ISeenFileStore seenFileStore, 
        IPublisher publisher, 
        ILogger<PolledFileStreamHandler> logger)
    {
        _seenFileStore = seenFileStore;
        _publisher = publisher;
        _logger = logger;
    }

    public async IAsyncEnumerable<FileRecord> Handle(
        PolledFileStream request, 
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        var queue = new ConcurrentQueue<FileRecord>();
        while (!cancellationToken.IsCancellationRequested)
        {
            var files = Directory.EnumerateFiles(request.Folder)
                .Where(f => !_seenFileStore.Contains(f));

            await Parallel.ForEachAsync(files, CancellationToken.None, async (f,t) =>
            {
                var info = new FileRecord(f);
                
                _seenFileStore.Add(f);
                await _publisher.Publish(new FileSeenNotification { FileInfo = info }, t);
                queue.Enqueue(info);
            });
            
            // TODO: Try mixing the above parallel task with the serving task... Might be chaos...

            while (!queue.IsEmpty)
            {
                if (queue.TryDequeue(out var result))
                    yield return result;
            }

            _logger.LogInformation("PolledFileStreamHandler watching {Directory} at: {Time}", request.Folder, DateTimeOffset.Now);
            
            await Task.Delay(request.Interval, cancellationToken)
                .ContinueWith(_ => {}, CancellationToken.None);
        }
    }
}

And here's the FileSystemWatcher version:

public class FileSystemStreamHandler : 
    IStreamRequestHandler<FileSystemStream, FileRecord>
{
    private readonly ISeenFileStore _seenFileStore;
    private readonly ILogger<FileSystemStreamHandler> _logger;
    private readonly IPublisher _publisher;
    private readonly ConcurrentQueue<FileRecord> _queue;

    private Action<object, FileSystemEventArgs>? _tearDown;

    public FileSystemStreamHandler(
        ISeenFileStore seenFileStore, 
        ILogger<FileSystemStreamHandler> logger, 
        IPublisher publisher)
    {
        _seenFileStore = seenFileStore;
        _logger = logger;
        _publisher = publisher;
        _queue = new ConcurrentQueue<FileRecord>();
    }

    public async IAsyncEnumerable<FileRecord> Handle(
        FileSystemStream request, 
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        var watcher = SetupWatcher(request.Folder, cancellationToken);
        
        while (!cancellationToken.IsCancellationRequested)
        {
            if (_queue.TryDequeue(out var record))
                yield return record;

            await Task.Delay(100, cancellationToken)
                .ContinueWith(_ => {}, CancellationToken.None);
        }
        
        TearDownWatcher(watcher);
    }
    
    private FileSystemWatcher SetupWatcher(string folder, CancellationToken cancellation)
    {
        var watcher = new FileSystemWatcher(folder);
        watcher.NotifyFilter = NotifyFilters.Attributes
                               | NotifyFilters.CreationTime
                               | NotifyFilters.DirectoryName
                               | NotifyFilters.FileName
                               | NotifyFilters.LastAccess
                               | NotifyFilters.LastWrite
                               | NotifyFilters.Security
                               | NotifyFilters.Size;
        watcher.EnableRaisingEvents = true;
        _tearDown = (_, args) => OnWatcherOnChanged(args, cancellation);
        watcher.Created += _tearDown.Invoke;

        return watcher;
    }
    
    private async void OnWatcherOnChanged(FileSystemEventArgs args, CancellationToken cancellationToken)
    {
        var path = args.FullPath;

        if (_seenFileStore.Contains(path)) return;
            
        _seenFileStore.Add(path);

        try
        {
            if ((File.GetAttributes(path) & FileAttributes.Directory) != 0) return;
        }
        catch (FileNotFoundException)
        {
            _logger.LogWarning("File {File} was not found. During a routine check. Will not be broadcast", path);
            return;
        }
            
        var record = new FileRecord(path);
        _queue.Enqueue(record);
        await _publisher.Publish(new FileSeenNotification { FileInfo = record }, cancellationToken);
    }

    private void TearDownWatcher(FileSystemWatcher watcher)
    {
        if (_tearDown != null)
            watcher.Created -= _tearDown.Invoke;
    }
}

Finally, here's the class that ties everything together and attempts to monitor the streams (in the StartAsync method). You'll notice a Merge operator from System.Interactive.Async; it does not currently behave as I expect.

public class StreamedFolderWatcher : IDisposable
{
    private readonly ConcurrentBag<Func<IAsyncEnumerable<FileRecord>>> _streams;
    private CancellationTokenSource? _cancellationTokenSource;
    private readonly IMediator _mediator;
    private readonly ILogger<StreamedFolderWatcher> _logger;

    public StreamedFolderWatcher(
        IMediator mediator,
        IEnumerable<IFileStream> fileStreams, 
        ILogger<StreamedFolderWatcher> logger)
    {
        _mediator = mediator;
        _logger = logger;
        _streams = new ConcurrentBag<Func<IAsyncEnumerable<FileRecord>>>();
        _cancellationTokenSource = new CancellationTokenSource();

        fileStreams.ToList()
            .ForEach(f => AddStream(f, _cancellationTokenSource.Token));
    }

    private void AddStream<T>(
        T request, 
        CancellationToken cancellationToken) 
        where T : IStreamRequest<FileRecord>
    {
        _streams.Add(() => _mediator.CreateStream(request, cancellationToken));
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        _cancellationTokenSource = CancellationTokenSource
            .CreateLinkedTokenSource(cancellationToken);

        var streams = _streams.Select(s => s()).ToList();
        while (!cancellationToken.IsCancellationRequested)
        {
            await foreach (var file in streams.Merge().WithCancellation(cancellationToken))
            {
                _logger.LogInformation("Incoming file {File}", file);
            }
            
            await Task.Delay(1000, cancellationToken)
                .ContinueWith(_ => {}, CancellationToken.None);
        }
    }

    public Task StopAsync()
    {
        _cancellationTokenSource?.Cancel();

        return Task.CompletedTask;
    }

    public void Dispose()
    {
        _cancellationTokenSource?.Dispose();
        GC.SuppressFinalize(this);
    }
}

My expectation for the Merge behavior is that if I have 3 IAsyncEnumerables, each item should be emitted as soon as it's yielded. Instead, unless I place yield break somewhere within the loops, the first IStreamRequestHandler fetched will simply execute ad infinitum until the cancellation token forces a stop.

How can I merge multiple input IAsyncEnumerables into a single long-lived output stream that emits each time any of the inputs yields a result?

Minimum Reproducible Sample

static async IAsyncEnumerable<(Guid Id, int Value)> CreateSequence(
    [EnumeratorCancellation] CancellationToken cancellationToken)
{
    var random = new Random();
    var id = Guid.NewGuid();
    while (!cancellationToken.IsCancellationRequested)
    {
        await Task.Delay(TimeSpan.FromMilliseconds(random.Next(100, 1000)));
        yield return (id, random.Next(0, 10));
    }
}

var token = new CancellationTokenSource();
var sequences = Enumerable.Range(0, 10)
    .Select(_ => CreateSequence(token.Token));
var merged = sequences.Merge();

await foreach (var (id, value) in merged)
{
    Console.WriteLine($"[{DateTime.Now.ToShortTimeString()}] Value {value} Emitted from {id}");
}

Answer:

It seems that the Rx team slipped up with the Merge operator and ended up with overloads that behave differently. This overload supports concurrency:

public static IAsyncEnumerable<TSource> Merge<TSource>(
    params IAsyncEnumerable<TSource>[] sources);

This overload does not support concurrency:

public static IAsyncEnumerable<TSource> Merge<TSource>(
    this IEnumerable<IAsyncEnumerable<TSource>> sources);

From the comments inside the source code:

// REVIEW:
// This implementation does not exploit concurrency. We should not introduce such
// behavior in order to avoid breaking changes, but we could introduce a parallel
// ConcurrentMerge implementation. It is unfortunate though that the Merge
// overload accepting an array has always been concurrent, so we can't change that
// either (in order to have consistency where Merge is non-concurrent, and
// ConcurrentMerge is).

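So what you have to do is convert your enumerable with .ToArray() before calling Merge(), so that the concurrent array overload is the one that gets used. Note that, as quoted above, the array overload is not declared with the this modifier, so extension-method syntax on the array may still bind to the non-concurrent IEnumerable overload. If that happens, calling it statically, e.g. AsyncEnumerableEx.Merge(sequences.ToArray()), should select the array overload.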
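To make the difference concrete, here's a minimal sketch that doesn't use System.Interactive.Async at all. `Source` and `SequentialMerge` are hypothetical helpers I made up for illustration; `SequentialMerge` is only my assumption of what a non-concurrent flattening looks like based on the REVIEW comment, not the library's actual code. It shows why a never-ending first source would starve all the others:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical source: yields `count` values tagged with `name`,
// pausing `delayMs` before each one.
static async IAsyncEnumerable<string> Source(string name, int count, int delayMs)
{
    for (var i = 0; i < count; i++)
    {
        await Task.Delay(delayMs);
        yield return $"{name}{i}";
    }
}

// Hypothetical non-concurrent flattening: the next source is not even
// started until the previous one has completed.
static async IAsyncEnumerable<T> SequentialMerge<T>(
    IEnumerable<IAsyncEnumerable<T>> sources)
{
    foreach (var source in sources)
        await foreach (var item in source)
            yield return item;
}

var observed = new List<string>();

// "B" is much faster than "A", yet it has to wait for "A" to finish.
await foreach (var value in SequentialMerge(new[] { Source("A", 3, 50), Source("B", 3, 1) }))
    observed.Add(value);

Console.WriteLine(string.Join(",", observed)); // A0,A1,A2,B0,B1,B2
```

If "A" looped until cancellation, as the watchers in the question do, "B" would never be enumerated, which matches the behavior described in the question.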

Answer:

I managed to come up with a working, but likely inefficient and potentially buggy, solution. By running each IAsyncEnumerable in its own background task, I'm able to push every item into a thread-safe queue, from which the items are served as they become available.

public static async IAsyncEnumerable<TSource> MergeAsyncEnumerable<TSource>(
    this IEnumerable<IAsyncEnumerable<TSource>> sources,
    TimeSpan? debounceTime = default,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var queue = new ConcurrentQueue<TSource>();
    var tasks = SetupCollections(sources, queue, cancellationToken);
    
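    // Create the aggregate task once instead of on every loop iteration.
    var allCompleted = Task.WhenAll(tasks);

    while (!allCompleted.IsCompleted)
    {
        while (queue.TryDequeue(out var record))
            yield return record;
            
        // Small debounce to prevent an infinite loop from just spinning. 
        await WaitIfDebounce(debounceTime, cancellationToken);
    }

    // Drain anything enqueued between the last pass and completion,
    // otherwise the final items could be silently dropped.
    while (queue.TryDequeue(out var remaining))
        yield return remaining;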
}

private static Task WaitIfDebounce(
    TimeSpan? debounceTime,
    CancellationToken cancellationToken)
{
    return debounceTime.HasValue
        ? Task.Delay(debounceTime.Value, cancellationToken)
            .ContinueWith(_ => { }, CancellationToken.None)
        : Task.CompletedTask;
}

private static IList<Task> SetupCollections<TSource>(
    IEnumerable<IAsyncEnumerable<TSource>> sources,
    ConcurrentQueue<TSource> queue,
    CancellationToken cancellationToken)
{
    return sources
        .Select(s => Task.Run(async () =>
        {
            await foreach (var file in s.WithCancellation(cancellationToken)) 
                queue.Enqueue(file);
        }, cancellationToken))
        .ToList();
}
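As a possible alternative to polling a ConcurrentQueue with a debounce, here's a sketch built on System.Threading.Channels, where the consumer awaits new items instead of spinning. This is a different technique from the code above, not a drop-in from any library: `MergeViaChannel` and `Source` are hypothetical names, and I'm assuming an unbounded channel is acceptable (no back-pressure on fast producers).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical merge helper: one pump task per source writes into a shared
// channel, and the consumer awaits the channel instead of polling a queue.
static async IAsyncEnumerable<T> MergeViaChannel<T>(
    IEnumerable<IAsyncEnumerable<T>> sources,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var channel = Channel.CreateUnbounded<T>();
    using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
    var token = cts.Token;

    var pumps = sources
        .Select(source => Task.Run(async () =>
        {
            await foreach (var item in source.WithCancellation(token))
                await channel.Writer.WriteAsync(item, token);
        }, token))
        .ToList();

    // Close the channel once every pump has finished, forwarding any failure.
    _ = Task.WhenAll(pumps).ContinueWith(
        t => { channel.Writer.TryComplete(t.Exception); },
        TaskScheduler.Default);

    try
    {
        await foreach (var merged in channel.Reader.ReadAllAsync(token))
            yield return merged;
    }
    finally
    {
        // Stop the pumps if the consumer abandons the stream early.
        cts.Cancel();
    }
}

// Hypothetical finite source used only to exercise the helper.
static async IAsyncEnumerable<string> Source(string name, int count, int delayMs)
{
    for (var i = 0; i < count; i++)
    {
        await Task.Delay(delayMs);
        yield return $"{name}{i}";
    }
}

var results = new List<string>();
await foreach (var value in MergeViaChannel(new[] { Source("slow", 2, 200), Source("fast", 3, 10) }))
    results.Add(value);

Console.WriteLine(string.Join(",", results));
```

Items from different sources interleave in arrival order, while the order within each source is preserved. Unlike the queue version, cancellation in this sketch may surface to the consumer as an OperationCanceledException rather than ending the stream quietly, so wrap the await foreach accordingly if that matters.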