Home > Software design >  why mutableSharedFlow subscriptionCount changes when use ShareIn
why mutableSharedFlow subscriptionCount changes when use ShareIn

Time:02-03

why it changes whether use ShareIn func or not

val sharedf = MutableSharedFlow<Int>()

fun Flow<Int>.print() : Flow<Int> {
    return map {
          println("in print : $it")
          it
       }
    }

fun Flow<String>.printS() : Flow<Int> {
    return map {
        println("in print : $it")
        it.toInt()
    }
}

fun Flow<Int>.toNext() : Flow<Int> {
    return merge(
        filterIsInstance<Int>().print(),
        filterIsInstance<String>().printS(),
    )
}

when I use main func like this

fun main() = runBlocking<Unit> {
    launch {

        sharedf.onSubscription{
            println("subsCount : ${sharedf.subscriptionCount.value}")
        }
        .shareIn(GlobalScope, SharingStarted.Lazily)
        .toNext()
        .collect()

    }

    launch {
        for (i in 0..3){
            delay(500)
            sharedf.emit(i)
        }
    }
}

subsCount : 1

but if I remove ShareIn

subsCount : 2

why I have to Use ShareIn even sharedf is already MutableSharedFlow

CodePudding user response:

Looking at toNext, you can see that it builds 2 flows from the receiver (this) flow, by using twice filterIsInstance, and merging those 2 flows. Therefore, when you collect someFlow.toNext(), it will collect twice the original source flow someFlow.

This is why, if you don't put shareIn in your main method, you see 2 subscribers on sharedf. The toNext().collect() subscribes twice to the shared flow due to the implementation of toNext() as we've just seen.

Now, if you add shareIn() in the middle, you create another, independent shared flow. Let's call it shared2, and extract it to a variable to make it clearer:

val shared2 = sharedf.onSubscription { ... }.shareIn(GlobalScope, SharingStarted.Lazily)
shared2.toNext().collect()

In a way, that new shared flow shared2 "protects" the source flow sharedf from collectors. When many collectors collect shared2, shared2 only collects once the source flow sharedf.

So, even if toNext() collects shared2 twice, shared2 only collects sharedf once, which is why you only see 1 subscription on sharedf in your log.

why I have to Use ShareIn even sharedf is already MutableSharedFlow

What makes you think you have to do that? Creating a shared flow via shareIn out of a flow that's already a shared flow is indeed unexpected.


Side note: you can use onEach instead of map in your print function

CodePudding user response:

The way shareIn works in very simple terms is that it launches a coroutine that runs indefinitely and collects the source flow to re-emit those values. So unlike the map and merge operators that create cold Flows that only subscribe to the source when they're collected, shareIn creates a Flow that immediately starts collecting from the source Flow, so it is immediately counted as a subscriber of the source flow.

The downstream toNext() wraps the second SharedFlow, not the first. The first one doesn't know about downstream subscribers. It is only emitting to the single SharedFlow from the shareIn call, so it has only one subscriber.

When you remove shareIn, then your top level SharedFlow is emitting to both of the subscribers that are created in your toNext()'s merge() call, so it sees two subscribers. Note that it will not see any subscribers until collect is called, since those are cold wrappers.

  •  Tags:  
  • Related