why it changes whether use ShareIn func or not
val sharedf = MutableSharedFlow<Int>()
fun Flow<Int>.print() : Flow<Int> {
return map {
println("in print : $it")
it
}
}
fun Flow<String>.printS() : Flow<Int> {
return map {
println("in print : $it")
it.toInt()
}
}
fun Flow<Int>.toNext() : Flow<Int> {
return merge(
filterIsInstance<Int>().print(),
filterIsInstance<String>().printS(),
)
}
when I use main func like this
fun main() = runBlocking<Unit> {
launch {
sharedf.onSubscription{
println("subsCount : ${sharedf.subscriptionCount.value}")
}
.shareIn(GlobalScope, SharingStarted.Lazily)
.toNext()
.collect()
}
launch {
for (i in 0..3){
delay(500)
sharedf.emit(i)
}
}
}
subsCount : 1
but if I remove ShareIn
subsCount : 2
why I have to Use ShareIn even sharedf is already MutableSharedFlow
CodePudding user response:
Looking at toNext, you can see that it builds 2 flows from the receiver (this) flow, by using twice filterIsInstance, and merging those 2 flows. Therefore, when you collect someFlow.toNext(), it will collect twice the original source flow someFlow.
This is why, if you don't put shareIn in your main method, you see 2 subscribers on sharedf. The toNext().collect() subscribes twice to the shared flow due to the implementation of toNext() as we've just seen.
Now, if you add shareIn() in the middle, you create another, independent shared flow. Let's call it shared2, and extract it to a variable to make it clearer:
val shared2 = sharedf.onSubscription { ... }.shareIn(GlobalScope, SharingStarted.Lazily)
shared2.toNext().collect()
In a way, that new shared flow shared2 "protects" the source flow sharedf from collectors. When many collectors collect shared2, shared2 only collects once the source flow sharedf.
So, even if toNext() collects shared2 twice, shared2 only collects sharedf once, which is why you only see 1 subscription on sharedf in your log.
why I have to Use ShareIn even sharedf is already MutableSharedFlow
What makes you think you have to do that? Creating a shared flow via shareIn out of a flow that's already a shared flow is indeed unexpected.
Side note: you can use onEach instead of map in your print function
CodePudding user response:
The way shareIn works in very simple terms is that it launches a coroutine that runs indefinitely and collects the source flow to re-emit those values. So unlike the map and merge operators that create cold Flows that only subscribe to the source when they're collected, shareIn creates a Flow that immediately starts collecting from the source Flow, so it is immediately counted as a subscriber of the source flow.
The downstream toNext() wraps the second SharedFlow, not the first. The first one doesn't know about downstream subscribers. It is only emitting to the single SharedFlow from the shareIn call, so it has only one subscriber.
When you remove shareIn, then your top level SharedFlow is emitting to both of the subscribers that are created in your toNext()'s merge() call, so it sees two subscribers. Note that it will not see any subscribers until collect is called, since those are cold wrappers.
