The following code generates a Flow with a range [1,10]. Then it invokes a suspend function that starts an async coroutine per number and triplicates its value.
@OptIn(FlowPreview::class)
@Test
fun someTest() {
    val t1 = System.currentTimeMillis()
    runTest {
        val range = (1..10).asFlow()
        val concat = range.flatMapMerge { triplicate(it) }.toList(mutableListOf())
        println(concat)
    }
    val t2 = System.currentTimeMillis()
    println("Elapsed time ${t2 - t1}")
}

private suspend fun triplicate(i: Int): Flow<Int> {
    return coroutineScope {
        val x = async {
            val t1 = System.currentTimeMillis()
            println("pre sleep ${Thread.currentThread().name}")
            delay(10000L)
            val t2 = System.currentTimeMillis()
            println("post sleep ${Thread.currentThread().name} ${t2 - t1}ms")
            listOf(i, i, i).asFlow()
        }
        x.await()
    }
}
1) delay() is not working:
The execution shows this log:
pre sleep Test worker @coroutine#3
post sleep Test worker @coroutine#3 4ms
pre sleep Test worker @coroutine#5
post sleep Test worker @coroutine#5 1ms
pre sleep Test worker @coroutine#7
post sleep Test worker @coroutine#7 1ms
pre sleep Test worker @coroutine#9
post sleep Test worker @coroutine#9 1ms
pre sleep Test worker @coroutine#11
post sleep Test worker @coroutine#11 0ms
pre sleep Test worker @coroutine#13
post sleep Test worker @coroutine#13 1ms
pre sleep Test worker @coroutine#15
post sleep Test worker @coroutine#15 1ms
pre sleep Test worker @coroutine#17
post sleep Test worker @coroutine#17 1ms
pre sleep Test worker @coroutine#19
post sleep Test worker @coroutine#19 1ms
pre sleep Test worker @coroutine#21
post sleep Test worker @coroutine#21 1ms
[1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5, 5, 6, 6, 6, 7, 7, 7, 8, 8, 8, 9, 9, 9, 10, 10, 10]
Elapsed time 179
Every coroutine sleeps for just a few ms instead of the expected 10000 ms (delay(10000L)). Why?
2) Replacing delay() with Thread.sleep() makes each coroutine really wait, but I can't get the async coroutines to execute concurrently. It takes 100 seconds (10 elements * 10 seconds sleeping) to finish instead of only 10 seconds (10 elements sleeping 10 seconds in parallel). Why?
pre sleep Test worker @coroutine#3
post sleep Test worker @coroutine#3 10005ms
pre sleep Test worker @coroutine#5
post sleep Test worker @coroutine#5 10004ms
pre sleep Test worker @coroutine#7
post sleep Test worker @coroutine#7 10003ms
pre sleep Test worker @coroutine#9
post sleep Test worker @coroutine#9 10005ms
pre sleep Test worker @coroutine#11
post sleep Test worker @coroutine#11 10003ms
pre sleep Test worker @coroutine#13
post sleep Test worker @coroutine#13 10002ms
pre sleep Test worker @coroutine#15
post sleep Test worker @coroutine#15 10002ms
pre sleep Test worker @coroutine#17
post sleep Test worker @coroutine#17 10004ms
pre sleep Test worker @coroutine#19
post sleep Test worker @coroutine#19 10002ms
pre sleep Test worker @coroutine#21
post sleep Test worker @coroutine#21 10003ms
[1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5, 5, 6, 6, 6, 7, 7, 7, 8, 8, 8, 9, 9, 9, 10, 10, 10]
Elapsed time 100179
UPDATE:
@OptIn(FlowPreview::class)
@Test
fun someTest() {
    runBlocking {
        val t1 = System.currentTimeMillis()
        val range = (1..10).asFlow()
        val concat = range.flatMapMerge { triplicate(it) }.toList(mutableListOf())
        println(concat)
        val t2 = System.currentTimeMillis()
        println("Elapsed time ${t2 - t1}")
        println(t2 - t1)
    }
}

private suspend fun triplicate(i: Int): Flow<Int> {
    return coroutineScope {
        val x = async {
            val t1 = System.currentTimeMillis()
            println("pre sleep ${Thread.currentThread().name}")
            delay(10000L)
            val t2 = System.currentTimeMillis()
            println("post sleep ${Thread.currentThread().name} ${t2 - t1}ms")
            listOf(i, i, i).asFlow()
        }
        x.await()
    }
}
I've changed runTest to runBlocking, and now delay() really waits for the full 10 seconds. However, I can't manage to parallelize the triplicate() calls. What's wrong?
pre sleep Test worker @coroutine#2
post sleep Test worker @coroutine#2 10005ms
pre sleep Test worker @coroutine#3
post sleep Test worker @coroutine#3 10005ms
pre sleep Test worker @coroutine#4
post sleep Test worker @coroutine#4 10005ms
pre sleep Test worker @coroutine#5
post sleep Test worker @coroutine#5 10004ms
pre sleep Test worker @coroutine#6
post sleep Test worker @coroutine#6 10006ms
pre sleep Test worker @coroutine#7
post sleep Test worker @coroutine#7 10002ms
pre sleep Test worker @coroutine#8
post sleep Test worker @coroutine#8 10003ms
pre sleep Test worker @coroutine#9
post sleep Test worker @coroutine#9 10005ms
pre sleep Test worker @coroutine#10
post sleep Test worker @coroutine#10 10004ms
pre sleep Test worker @coroutine#11
post sleep Test worker @coroutine#11 10003ms
[1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5, 5, 6, 6, 6, 7, 7, 7, 8, 8, 8, 9, 9, 9, 10, 10, 10]
Elapsed time 10084
100084
Answer:
Because this is exactly what runTest() does: it creates a special coroutine environment in which the passage of time is simulated. It preserves the ordering of operations, but delays are skipped, so everything happens immediately. We don't want unit tests to run for seconds or even hours just waiting.
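The difference between simulated and real time can be observed directly. The following is a minimal sketch, assuming kotlinx-coroutines-test is on the classpath; the helper name measureDelayUnderRunTest is made up for illustration and is not part of the question's code. It compares the virtual clock of the test scheduler with the wall clock around a 10-second delay().

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.test.currentTime
import kotlinx.coroutines.test.runTest

// Hypothetical helper: returns (virtual ms, wall-clock ms) spent on a 10 s delay under runTest.
@OptIn(ExperimentalCoroutinesApi::class)
fun measureDelayUnderRunTest(): Pair<Long, Long> {
    var virtualMs = 0L
    val wallStart = System.currentTimeMillis()
    runTest {
        delay(10_000L)           // skipped: only the virtual clock advances
        virtualMs = currentTime  // virtual time of the test scheduler, in ms
    }
    val wallMs = System.currentTimeMillis() - wallStart
    return virtualMs to wallMs
}

fun main() {
    val (virtualMs, wallMs) = measureDelayUnderRunTest()
    println("virtual: ${virtualMs}ms, wall clock: ${wallMs}ms")
}
```

The virtual clock should report the full 10000 ms while the wall clock shows only a small fraction of that, which matches the few-millisecond "post sleep" values in the question's first log.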
If you prefer to wait as normal, use runBlocking() instead of runTest().

There are three reasons for not running concurrently.

1) Both runTest() and runBlocking() use a single thread by default. If you need more threads, provide a coroutine dispatcher, for example: runBlocking(Dispatchers.Default) { ... }. (runTest() expects a TestDispatcher, so the example uses runBlocking().)

2) You should almost never block the thread when running inside coroutines. It makes coroutines unresponsive, as you can see in your example. Even if you use Dispatchers.Default, then assuming you have 4 CPU cores, it will still take 30s to finish, because you can only execute 4 sleeps at a time.

3) The triplicate() function is really messed up. By using async() and then await() immediately, you still execute the code pretty much synchronously. triplicate() returns after waiting for 10s, and because flatMapMerge() "calls transform sequentially", you really execute one triplicate() at a time.

I guess what you intended (?) is to return a flow immediately and emit items after 10s. Then flatMapMerge() can acquire multiple such flows and collect them concurrently:
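import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

// Not a suspend function: it returns a cold flow immediately.
// The delay only runs once the flow is collected.
private fun triplicate(i: Int): Flow<Int> = flow {
    val t1 = System.currentTimeMillis()
    println("pre sleep ${Thread.currentThread().name}")
    delay(10000L)
    val t2 = System.currentTimeMillis()
    println("post sleep ${Thread.currentThread().name} ${t2 - t1}ms")
    emit(i)
    emit(i)
    emit(i)
}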
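
To check that the flows are really collected concurrently, the flow-based version can be timed under runBlocking(). This is only a sketch under a few assumptions: the names triplicateAfter and runMergeDemo and the delayMs parameter are made up for illustration (the delay is shortened so the demo finishes quickly), and it relies on flatMapMerge() keeping its default concurrency limit of 16, which is enough for 10 inner flows.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Cold flow: returned immediately; the delay runs only when the flow is collected.
// delayMs is a hypothetical parameter standing in for the 10000L of the original.
fun triplicateAfter(i: Int, delayMs: Long): Flow<Int> = flow {
    delay(delayMs)
    repeat(3) { emit(i) }
}

// Merges 10 such flows and returns (sorted results, elapsed wall-clock ms).
// Results are sorted because this sketch makes no claim about merge order.
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun runMergeDemo(delayMs: Long): Pair<List<Int>, Long> {
    var result = emptyList<Int>()
    val elapsed = measureTimeMillis {
        runBlocking {
            result = (1..10).asFlow()
                .flatMapMerge { triplicateAfter(it, delayMs) }
                .toList()
        }
    }
    return result.sorted() to elapsed
}

fun main() {
    val (values, elapsed) = runMergeDemo(delayMs = 200L)
    println(values)
    println("Elapsed time $elapsed")
}
```

If the inner flows ran one at a time, the elapsed time would be about 10 * delayMs. With concurrent collection it should stay close to a single delayMs, even though runBlocking() uses only one thread, because delay() suspends instead of blocking.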
