Home > Mobile >  Why doesn't this code execute concurrently and delay() doesn't suspend the coroutine as ex
Why doesn't this code execute concurrently and delay() doesn't suspend the coroutine as ex

Time:01-25

The following code generates a Flow with a range [1,10]. Then it invokes a suspend function that starts an async coroutine per number and triplicates its value.

    @OptIn(FlowPreview::class)
    @Test
    fun someTest() {
        val t1 = System.currentTimeMillis()
        runTest {
            val range = (1..10).asFlow()
            val concat = range.flatMapMerge { triplicate(it) }.toList(mutableListOf())
            println(concat)
        }
        val t2 = System.currentTimeMillis()
        println("Elapsed time ${t2 -t1}")
    }

    private suspend fun triplicate(i: Int): Flow<Int> {
        return coroutineScope {
            val x = async {
                val t1 = System.currentTimeMillis()
                println("pre sleep ${Thread.currentThread().name}")
                delay(10000L)
                val t2 = System.currentTimeMillis()
                println("post sleep ${Thread.currentThread().name} ${t2 - t1}ms")
                listOf(i, i, i).asFlow()
            }
            x.await()
        }
    }

1) delay() is not working:

The execution shows this log:

pre sleep Test worker @coroutine#3
post sleep Test worker @coroutine#3 4ms
pre sleep Test worker @coroutine#5
post sleep Test worker @coroutine#5 1ms
pre sleep Test worker @coroutine#7
post sleep Test worker @coroutine#7 1ms
pre sleep Test worker @coroutine#9
post sleep Test worker @coroutine#9 1ms
pre sleep Test worker @coroutine#11
post sleep Test worker @coroutine#11 0ms
pre sleep Test worker @coroutine#13
post sleep Test worker @coroutine#13 1ms
pre sleep Test worker @coroutine#15
post sleep Test worker @coroutine#15 1ms
pre sleep Test worker @coroutine#17
post sleep Test worker @coroutine#17 1ms
pre sleep Test worker @coroutine#19
post sleep Test worker @coroutine#19 1ms
pre sleep Test worker @coroutine#21
post sleep Test worker @coroutine#21 1ms
[1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5, 5, 6, 6, 6, 7, 7, 7, 8, 8, 8, 9, 9, 9, 10, 10, 10]
Elapsed time 179

Every coroutine sleeps for just a few ms instead of the 10000ms as expected (delay(10000L)) . Why?

  1. Changing delay() for Thread.sleep I can interrup the coroutine but there's no way to make the async coroutines execute concurrently. It takes 100 seconds (10 elements * 10 seconds sleeping) to finish instead of only 10 seconds (10 elements in parallel sleeping 10 seconds). Why?
pre sleep Test worker @coroutine#3
post sleep Test worker @coroutine#3 10005ms
pre sleep Test worker @coroutine#5
post sleep Test worker @coroutine#5 10004ms
pre sleep Test worker @coroutine#7
post sleep Test worker @coroutine#7 10003ms
pre sleep Test worker @coroutine#9
post sleep Test worker @coroutine#9 10005ms
pre sleep Test worker @coroutine#11
post sleep Test worker @coroutine#11 10003ms
pre sleep Test worker @coroutine#13
post sleep Test worker @coroutine#13 10002ms
pre sleep Test worker @coroutine#15
post sleep Test worker @coroutine#15 10002ms
pre sleep Test worker @coroutine#17
post sleep Test worker @coroutine#17 10004ms
pre sleep Test worker @coroutine#19
post sleep Test worker @coroutine#19 10002ms
pre sleep Test worker @coroutine#21
post sleep Test worker @coroutine#21 10003ms
[1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5, 5, 6, 6, 6, 7, 7, 7, 8, 8, 8, 9, 9, 9, 10, 10, 10]
Elapsed time 100179

UPDATE:

    @OptIn(FlowPreview::class)
    @Test
    fun someTest() {
        runBlocking {
            val t1 = System.currentTimeMillis()
            val range = (1..10).asFlow()
            val concat = range.flatMapMerge{ triplicate(it) }.toList(mutableListOf())
            println(concat)
            val t2 = System.currentTimeMillis()
            println("Elapsed time ${t2 - t1}")
            println(t2 - t1)
        }
    }

    private suspend fun triplicate(i: Int): Flow<Int> {
        return coroutineScope {
            val x = async {
                val t1 = System.currentTimeMillis()
                println("pre sleep ${Thread.currentThread().name}")
                delay(10000L)
                val t2 = System.currentTimeMillis()
                println("post sleep ${Thread.currentThread().name} ${t2 - t1}ms")
                listOf(i, i, i).asFlow()
            }
            x.await()
        }
    }

I've changed runTest to runBlocking, and now delay makes the thread sleep. However I can't manage to paralelize the triplicate calls. What's wrong?

pre sleep Test worker @coroutine#2
post sleep Test worker @coroutine#2 10005ms
pre sleep Test worker @coroutine#3
post sleep Test worker @coroutine#3 10005ms
pre sleep Test worker @coroutine#4
post sleep Test worker @coroutine#4 10005ms
pre sleep Test worker @coroutine#5
post sleep Test worker @coroutine#5 10004ms
pre sleep Test worker @coroutine#6
post sleep Test worker @coroutine#6 10006ms
pre sleep Test worker @coroutine#7
post sleep Test worker @coroutine#7 10002ms
pre sleep Test worker @coroutine#8
post sleep Test worker @coroutine#8 10003ms
pre sleep Test worker @coroutine#9
post sleep Test worker @coroutine#9 10005ms
pre sleep Test worker @coroutine#10
post sleep Test worker @coroutine#10 10004ms
pre sleep Test worker @coroutine#11
post sleep Test worker @coroutine#11 10003ms
[1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5, 5, 6, 6, 6, 7, 7, 7, 8, 8, 8, 9, 9, 9, 10, 10, 10]
Elapsed time 10084
100084

CodePudding user response:

  1. Because this is exactly what runTest() does: it creates a very special coroutine environment where the time flow is simulated. It keeps proper operation ordering, but everything happens immediately. We don't want unit test to run for seconds or even hours just waiting.

    If you prefer to wait as normal, use runBlocking() instead of runTest().

  2. There are three reasons for not running concurrently.

    • runTest() by default uses a single thread. If you need to use more threads then provide a coroutine dispatcher, for example: runTest(Dispatchers.Default) { ... }.

    • You should almost never block the thread when running inside coroutines. It makes coroutines unresponsive, as you can see in your example. Even if you use Dispatchers.Default then assuming you have 4 CPU cores, it will still take 30s to finish, because you can only execute 4 sleeps at a time.

    • triplicate() function is really messed up. By using async() and then await() immediately you still execute the code pretty much synchronously. triplicate() returns after waiting for 10s and because flatMapMerge() "calls transform sequentially", you really execute one triplicate() at a time.

      I guess what you intended (?) is to return a flow immediately and emit items after 10s. Then flatMapMerge() can acquire multiple such flows and collect them concurrently:

private fun triplicate(i: Int): Flow<Int> = flow {
    val t1 = System.currentTimeMillis()
    println("pre sleep ${Thread.currentThread().name}")
    delay(10000L)
    val t2 = System.currentTimeMillis()
    println("post sleep ${Thread.currentThread().name} ${t2 - t1}ms")
    emit(i)
    emit(i)
    emit(i)
}
  •  Tags:  
  • Related