source1 emits A, B, C, D, etc. and never completes.
source2 emits 1, 2 and completes.
I want to merge them into A1, B2, C1, D2, etc.
Update:
My initial attempt was to use Zip and Repeat, as Theodor suggested; however, this creates a lock because generating source2 is expensive.
The last comment from Enigmativity addresses that problem:
source1.Zip(source2.ToEnumerable().ToArray().Repeat())
CodePudding user response:
Since you want to repeat source2 indefinitely, and you say that it is cold (it produces the same set of values each time, generally with the same cadence) and expensive, turn the IObservable<T> into a T[] so that it is computed once and only once.
var array = source2.ToEnumerable().ToArray();
var output = source1.Zip(array.Repeat(), (x, y) => (x, y));
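To make the snippet above easier to try out, here is a minimal, self-contained sketch. The two sources are stand-ins invented for illustration (an Interval-based source1 and a Defer-based source2), and the Program class and Demo method are hypothetical names. It assumes the System.Reactive package for Zip and ToEnumerable, and the System.Interactive package for the parameterless Repeat(). Note that ToEnumerable().ToArray() blocks the calling thread until source2 completes.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;            // ToArray; Repeat() comes from the System.Interactive package
using System.Reactive.Linq;   // Observable, Zip, ToEnumerable (System.Reactive package)

public static class Program
{
    // Hypothetical helper: returns the first four merged values.
    public static IList<string> Demo()
    {
        // Stand-in for the never-ending source1: A, B, C, ... every 50 ms.
        IObservable<char> source1 = Observable
            .Interval(TimeSpan.FromMilliseconds(50))
            .Select(i => (char)('A' + i % 26));

        // Stand-in for the expensive, cold source2: emits 1, 2 and completes.
        IObservable<int> source2 = Observable.Defer(() =>
        {
            Console.WriteLine("source2 subscribed"); // should print only once
            return new[] { 1, 2 }.ToObservable();
        });

        // Blocks until source2 completes, then caches its values in an array.
        int[] array = source2.ToEnumerable().ToArray();

        // Pair each source1 value with the next value of the endlessly repeated array.
        return source1
            .Zip(array.Repeat(), (x, y) => $"{x}{y}")
            .Take(4)    // source1 never completes, so stop after four pairs
            .ToList()
            .Wait();    // blocks until the four pairs have arrived
    }

    public static void Main()
    {
        foreach (var item in Demo())
            Console.WriteLine(item);
    }
}
```

Because the array is materialized before the Zip, source2 is subscribed to only once, no matter how many times the cached values are repeated.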
CodePudding user response:
Assuming that the desired marble diagram is like this:
Source1: --------A-------B-------C--------D-------|
Source2: ----1--------------2--------|
Merged:  --------A1---------B2-------C1---D2------|
Here is a ZipWithRepeated operator having this behavior:
static IObservable<(TFirst First, TSecond Second)> ZipWithRepeated<TFirst, TSecond>(
    this IObservable<TFirst> first, IObservable<TSecond> second)
{
    return second.Replay(replayed => first.ToAsyncEnumerable()
        .Zip(replayed.ToAsyncEnumerable().Repeat())
        .ToObservable());
}
Usage example:
var merged = source1.ZipWithRepeated(source2);
This solution requires a dependency on the System.Linq.Async and System.Interactive.Async packages, because both sequences are converted to IAsyncEnumerable<T> sequences before the zipping.
