Given the following (simplified) F# asynchronous code, which partitions the items coming from an AsyncSeq into three groups by their index:
open FSharp.Control

let s : AsyncSeq<'t> = ...

let a =
    s
    |> AsyncSeq.mapi (fun i a -> (i, a))              // pair each item with its int64 index
    |> AsyncSeq.groupBy (fun (i, _) -> int(i) % 3)    // group key = index modulo 3
    |> AsyncSeq.map (fun (index, tWithIndex) -> (index + 1, tWithIndex |> AsyncSeq.map (fun (_, t) -> t)))
    |> AsyncSeq.toArraySynchronously                   // evaluates the outer sequence of groups
    |> Map.ofArray
    |> Map.map (fun i a -> a |> AsyncSeq.toArraySynchronously)  // then reads each group
let b =
    s
    |> AsyncSeq.mapi (fun i a -> (i, a))
    |> AsyncSeq.groupBy (fun (i, _) -> int(i) % 3)
    // reads each group inside the map, while the outer sequence is still being evaluated
    |> AsyncSeq.map (fun (index, tWithIndex) -> index + 1, AsyncSeq.toArraySynchronously tWithIndex |> Array.map (fun (_, t) -> t))
    |> AsyncSeq.toArraySynchronously
    |> Map.ofArray
I expected a and b to contain the same Map<int, 't[]> value, but the computation of b never completes; it just hangs. What's wrong with b's expression?
Answer:
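This somewhat unexpected behaviour comes from how AsyncSeq.groupBy works. It creates an asynchronous sequence of groups, each of which contains an evaluated key and another asynchronous sequence of the values in that group:

    AsyncSeq<'TKey * AsyncSeq<'TValue>>

The issue is that the nested AsyncSeq<'TValue> sequences only receive all of their values once the outer asynchronous sequence of groups has been evaluated to the end. If you demand the values of a nested async sequence, the call blocks until the outer one has been evaluated. In b, AsyncSeq.toArraySynchronously is called on each group inside AsyncSeq.map, that is, while the outer sequence is still being iterated, so the outer sequence can never finish and the computation hangs. In a, the first AsyncSeq.toArraySynchronously evaluates the outer sequence to the end before Map.map reads any group, so every group is already complete.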
I can imagine an implementation of groupBy where asking for a value in a nested async sequence would resume the evaluation of the outer one, but that is clearly not how it is currently implemented.
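If you do want to read each group inside the pipeline, one option is to drain all the groups concurrently instead of one at a time, so the outer sequence can keep advancing. The script below is a minimal sketch of that idea, assuming the FSharp.Control.AsyncSeq NuGet package and its AsyncSeq.mapAsyncParallel and AsyncSeq.toArrayAsync functions. partitionByIndex and drainGroup are hypothetical names, and the small in-memory input stands in for the question's s.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

// Hypothetical helper: drain one group into an array and shift its key
// to 1-based, as the question's code does with `index + 1`.
let drainGroup (key: int, group: AsyncSeq<int64 * 't>) : Async<int * 't[]> =
    async {
        let! items = group |> AsyncSeq.map snd |> AsyncSeq.toArrayAsync
        return key + 1, items
    }

// Hypothetical helper: split `s` into `n` groups by item position.
// mapAsyncParallel starts draining each group as soon as it appears,
// so the outer sequence of groups keeps advancing while groups fill up.
let partitionByIndex (n: int) (s: AsyncSeq<'t>) : Map<int, 't[]> =
    s
    |> AsyncSeq.mapi (fun i x -> (i, x))            // i is an int64 index
    |> AsyncSeq.groupBy (fun (i, _) -> int i % n)
    |> AsyncSeq.mapAsyncParallel drainGroup
    |> AsyncSeq.toArraySynchronously
    |> Map.ofArray

// Assumed sample input standing in for the question's `s`.
let parts = partitionByIndex 3 (AsyncSeq.ofSeq [ "a"; "b"; "c"; "d"; "e" ])
printfn "%A" parts
```

Because every group is being read while the outer sequence runs, the outer sequence can reach its end and complete the groups, which is exactly what b was waiting for.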
Besides the approach in your a version, you can also use the lazy keyword to defer the evaluation inside AsyncSeq.map:
let b =
    s
    |> AsyncSeq.mapi (fun i a -> (i, a))
    |> AsyncSeq.groupBy (fun (i, _) -> int(i) % 3)
    |> AsyncSeq.map (fun (index, tWithIndex) ->
        index + 1,
        // lazy defers reading the group until the value is forced
        lazy (AsyncSeq.toArraySynchronously tWithIndex
              |> Array.map (fun (_, t) -> t)))
    |> AsyncSeq.toArraySynchronously
    |> Map.ofArray
// b : Map<int, Lazy<'t[]>>; use .Value on a group to get its 't[]
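To see the lazy version end to end, here is a minimal script that runs it on a small in-memory sequence (an assumption standing in for the question's s) and then forces every group. Forcing is safe at that point because the outer sequence has already been evaluated to the end. It assumes the FSharp.Control.AsyncSeq NuGet package; the name forced is illustrative only.

```fsharp
#r "nuget: FSharp.Control.AsyncSeq"
open FSharp.Control

// Assumed sample input standing in for the question's `s`.
let s = AsyncSeq.ofSeq [ "a"; "b"; "c"; "d"; "e" ]

let b =
    s
    |> AsyncSeq.mapi (fun i a -> (i, a))
    |> AsyncSeq.groupBy (fun (i, _) -> int i % 3)
    |> AsyncSeq.map (fun (index, tWithIndex) ->
        index + 1,
        // Nothing is read from the group here; lazy only records how to read it.
        lazy (AsyncSeq.toArraySynchronously tWithIndex |> Array.map snd))
    |> AsyncSeq.toArraySynchronously   // the outer sequence is now fully evaluated
    |> Map.ofArray

// Every group already has all of its values, so forcing does not block.
let forced : Map<int, string[]> = b |> Map.map (fun _ group -> group.Value)
printfn "%A" forced
```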
