Home > Blockchain >  Kotlin Coroutine/Flow Timeout without cancelling the running coroutine?
Kotlin Coroutine/Flow Timeout without cancelling the running coroutine?

Time:01-13

I am trying to create a Flow that emits a value after a timeout, without cancelling the underlying coroutine. The idea is that the network call has X time to complete and emit a value and after that timeout has been reached, emit some initial value without cancelling the underlying work (eventually emitting the value from the network call, assuming it succeeds).

Something like this seems like it might work, but it would cancel the underlying coroutine when the timeout is reached. It also doesn't handle emitting some default value on timeout.

val someFlow = MutableStateFlow("someInitialValue")

val deferred = async {
    val networkCallValue = someNetworkCall()
    someFlow.emit(networkCallValue)
}

withTimeout(SOME_NUMBER_MILLIS) {
    deferred.await()
}

I'd like to be able to emit the value returned by the network call at any point, and if the timeout is reached just emit some default value. How would I accomplish this with Flow/Coroutines?

CodePudding user response:

One way to do this is with a simple select clause:

import kotlinx.coroutines.selects.*

val someFlow = MutableStateFlow("someInitialValue")

val deferred = async {
    someFlow.value = someNetworkCall()
}

// await the first of the 2 things, without cancelling anything
select<Unit> {
    deferred.onAwait {}
    onTimeout(SOME_NUMBER_MILLIS) {
        someFlow.value = someDefaultValue
    }
}

You would have to watch out for race conditions though, if this runs on a multi-threaded dispatcher. If the async finished just after the timeout, there is a chance the default value overwrites the network response.

One way to prevent that, if you know the network can't return the same value as the initial value (and if no other coroutine is changing the state) is with the atomic update method:

val deferred = async {
    val networkCallValue = someNetworkCall()
    someFlow.update { networkCallValue }
}

// await the first of the 2 things, without cancelling anything
val initialValue = someFlow.value
select<Unit> {
    deferred.onAwait {}
    onTimeout(300) {
        someFlow.update { current ->
            if (current == initialValue) {
                "someDefaultValue"
            } else {
                current // don't overwrite the network result
            }
        }
    }
}

If you can't rely on comparisons of the state, you can protect access to the flow with a Mutex and a boolean:

val someFlow = MutableStateFlow("someInitialValue")
val mutex = Mutex()
var networkCallDone = false

val deferred = async {
    val networkCallValue = someNetworkCall()
    mutex.withLock {
        someFlow.value = networkCallValue
        networkCallDone = true
    }
}

// await the first of the 2 things, without cancelling anything
select<Unit> {
    deferred.onAwait {}
    onTimeout(300) {
        mutex.withLock {
            if (!networkCallDone) {
                someFlow.value = "someDefaultValue"
            }
        }
    }
}

CodePudding user response:

You can launch two coroutines simultaneously and cancel the Job of the first one, which responsible for emitting default value, in the second one:

val someFlow = MutableStateFlow("someInitialValue")

val firstJob = launch {
    delay(SOME_NUMBER_MILLIS)
    ensureActive() // Ensures that current Job is active.
    someFlow.update {"DefaultValue"}
}
launch {
    val networkCallValue = someNetworkCall()
    firstJob.cancel()
    someFlow.update { networkCallValue }
}

CodePudding user response:

Probably the easiest way to solve the race condition is to use select() as in @Joffrey's answer. select() guarantees to execute only a single branch.

However, I believe mutating a shared flow concurrently complicates the situation and introduces another race condition that we need to solve. Instead, we can do it really very easily:

flow {
    val network = async { someNetworkCall() }
    select {
        network.onAwait{ emit(it) }
        onTimeout(1000) {
            emit("initial")
            emit(network.await())
        }
    }
}

There are no race conditions to handle. We have just two simple execution branches, depending on what happened first.

If we need a StateFlow then we can use stateIn() to convert a regular flow. Or we can use a MutableStateFlow as in the question, but mutate it only inside select(), similarly to above:

select {
    network.onAwait{ someFlow.value = it }
    onTimeout(1000) {
        someFlow.value = "initial"
        someFlow.value = network.await()
    }
}

CodePudding user response:

You can send the network request and start the timeout delay simultaneously. When the network call succeeds, update the StateFlow with the response. And, when the timeout finishes and we haven't received the response, update the StateFlow with the default value.

val someFlow = MutableStateFlow(initialValue)

suspend fun getData() {
    launch {
        someFlow.value = someNetworkCall()
    }
    delay(TIMEOUT_MILLIS)
    if(someFlow.value == initialValue)
        someFlow.value = defaultValue
}

If the response of the network call can be same as the initialValue, you can create a new Boolean to check the completion of network request. Another option can be to store a reference of the Job returned by launch and check if job.isActive after the timeout.

Edit: In case you want to cancel delay when the network request completes, you can do something like:

val someFlow = MutableStateFlow(initialValue)

suspend fun getData() {
    val job = launch {    
        delay(TIMEOUT_MILLIS)
        someFlow.value = defaultValue
    } 
    someFlow.value = someNetworkCall()
    job.cancel()
}

And to solve the possible concurrency issue, you can use MutableStateFlow.update for atomic updates.

  •  Tags:  
  • Related