Kotlin: Is there a tool that allows me to control parallelism when executing suspend functions?

Time:02-08

I'm trying to execute a certain suspend function multiple times, in such a way that no more than N invocations are running at the same time.

For those acquainted with Akka and Scala streaming libraries, it would be something like mapAsync.

I wrote my own implementation using one input channel (as in Kotlin channels) and N output channels, but it seems cumbersome and not very efficient.

The code I'm currently using is somewhat like this:

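val inChannel = Channel<T>()
// One output channel per worker: exactly n of them (0..n would create n + 1)
val outChannels = (0 until n).map {
    Channel<T>()
}
launch {
    var i = 0
    for (t in inChannel) {
        // Round-robin dispatch; send suspends until worker i is ready,
        // whereas the deprecated offer would drop t if the worker were busy
        outChannels[i].send(t)
        i = (i + 1) % n
    }
}
outChannels.forEach { outChannel ->
    launch {
        for (t in outChannel) {
            fn(t)
        }
    }
}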

Of course it has error management and everything, but still...

CodePudding user response:

You can use the limitedParallelism function on a CoroutineDispatcher (experimental as of kotlinx.coroutines v1.6.0) and launch your coroutines on the dispatcher it returns. The function returns a view over the original dispatcher that restricts parallelism to the limit you provide. You can use it like this:

val limit = 2 // Or some other number

val dispatcher = Dispatchers.Default
val limitedDispatcher = dispatcher.limitedParallelism(limit)

for (n in 0..100) {
    scope.launch(limitedDispatcher) {
        executeTask(n)
    }
}
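
To see the limit in action, here is a minimal sketch that counts how many tasks run at the same moment. The helper name peakParallelism and the Thread.sleep stand-in for real work are assumptions made for illustration, not part of the answer above. Blocking work is used on purpose, since the dispatcher limits parallel execution rather than the number of suspended coroutines.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: runs `tasks` blocking jobs on a limited view of
// Dispatchers.Default and returns the highest number observed running at once.
@OptIn(ExperimentalCoroutinesApi::class)
fun peakParallelism(limit: Int, tasks: Int): Int = runBlocking {
    val limited = Dispatchers.Default.limitedParallelism(limit)
    val running = AtomicInteger(0)
    val peak = AtomicInteger(0)
    // coroutineScope waits for every launched child before returning
    coroutineScope {
        repeat(tasks) {
            launch(limited) {
                val now = running.incrementAndGet()
                peak.updateAndGet { maxOf(it, now) }
                Thread.sleep(20) // blocking stand-in for real work
                running.decrementAndGet()
            }
        }
    }
    peak.get()
}

fun main() {
    println(peakParallelism(limit = 2, tasks = 10))
}
```

With limit = 2, the printed peak should never exceed 2, however many tasks are launched.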

CodePudding user response:

Your question, as asked, calls for @marstran's answer. If what you want is that no more than N coroutines are being actively executed at any given time (in parallel), then limitedParallelism is the way to go:

val maxThreads: Int = TODO("some max number of threads")
val limitedDispatcher = Dispatchers.Default.limitedParallelism(maxThreads)

elements.forEach { elt ->
    scope.launch(limitedDispatcher) {
        doSomething(elt)
    }
}

Now, if what you want is to limit concurrency as well, so that at most N coroutines are in flight at any time (potentially interleaving), regardless of threads, you could use a Semaphore instead:

val maxConcurrency: Int = TODO("some max number of concurrent coroutines")
val semaphore = Semaphore(maxConcurrency)

elements.forEach { elt ->
    scope.async {
        semaphore.withPermit {
            doSomething(elt)
        }
    }
}
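
Since the question mentions mapAsync, the semaphore approach could be wrapped in a small helper along these lines. This is only a sketch: the extension name mapAsync and its parameters are hypothetical and not part of kotlinx.coroutines, and it assumes you want the results back in input order.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// Hypothetical helper, loosely modelled on Akka Streams' mapAsync:
// applies `transform` to every element with at most `concurrency`
// invocations in flight, and returns the results in input order.
suspend fun <T, R> Iterable<T>.mapAsync(
    concurrency: Int,
    transform: suspend (T) -> R
): List<R> = coroutineScope {
    val semaphore = Semaphore(concurrency)
    this@mapAsync.map { element ->
        async {
            // Each coroutine waits here until one of the permits is free
            semaphore.withPermit { transform(element) }
        }
    }.awaitAll() // awaitAll keeps the order of the deferred list
}

fun main() = runBlocking {
    val squares = (1..10).mapAsync(concurrency = 3) { n ->
        delay(50) // suspending stand-in for real work
        n * n
    }
    println(squares) // [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
}
```

All coroutines are created up front and most of them simply suspend on the semaphore, which is cheap; only `concurrency` of them are inside `transform` at any time.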

You can also combine both approaches.
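
A combined version might look like the sketch below, where the dispatcher caps the number of threads doing work and the semaphore caps the number of coroutines in flight. The function name processBounded and its parameters are hypothetical, chosen only for this illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit

// Hypothetical helper: at most `maxThreads` coroutines execute in parallel,
// and at most `maxConcurrency` are inside `work` (running or suspended).
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun <T, R> processBounded(
    elements: List<T>,
    maxThreads: Int,
    maxConcurrency: Int,
    work: suspend (T) -> R
): List<R> = coroutineScope {
    val limitedDispatcher = Dispatchers.Default.limitedParallelism(maxThreads)
    val semaphore = Semaphore(maxConcurrency)
    elements.map { elt ->
        async(limitedDispatcher) {
            semaphore.withPermit { work(elt) }
        }
    }.awaitAll()
}

fun main() = runBlocking {
    val lengths = processBounded(
        elements = listOf("a", "bb", "ccc"),
        maxThreads = 2,
        maxConcurrency = 8
    ) { s ->
        delay(10) // suspending stand-in for real work
        s.length
    }
    println(lengths) // [1, 2, 3]
}
```

Setting maxConcurrency higher than maxThreads makes sense when the work spends most of its time suspended, for example waiting on I/O.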
