Akka streams RestartSink doesn't seem to be restarting during failures

Time:01-12

I am experimenting with error handling in Akka Streams using restartable sources and sinks.

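import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.RestartSettings
import akka.stream.scaladsl.{RestartSink, RestartSource, Sink, Source}

import scala.concurrent.duration._

object Main extends App {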
  implicit val system: ActorSystem = ActorSystem("akka-streams-system")

  val restartSettings =
    RestartSettings(1.seconds, 10.seconds, 0.2d)

  val restartableSource = RestartSource.onFailuresWithBackoff(restartSettings) {() => {
    Source(0 to 10)
      .map(n =>
        if (n < 5) n.toString
        else throw new RuntimeException("Boom!"))
  }}

  val restartableSink: Sink[String, NotUsed] = RestartSink.withBackoff(restartSettings){
    () => Sink.fold("")((_, newVal) => {
      if(newVal == "3") {
        println(newVal + " Exception")
        throw new RuntimeException("Kabooom!!!") // Triggering a failure, expecting the stream to restart just the sink.
      } else {
        println(newVal + " sink")
      }
      newVal
    })
  }
  restartableSource.runWith(restartableSink)
}

I am breaking the source and the sink separately to test different scenarios. I break the sink first, expecting it to restart and reprocess the newVal == "3" message over and over again. Instead, the error in the sink seems to be thrown away and only the source failure is retried, so the source ends up restarting and reprocessing the events starting from 0.

I am mimicking a scenario where I read from a source (say, a file) and have an HTTP sink that retries failed HTTP requests independently, without restarting the whole stream pipeline.

The output I get with the above-shared code is as follows.

0 sink
1 sink
2 sink
3 Exception
4 sink
[WARN] [01/10/2022 09:13:14.647] [akka-streams-system-akka.actor.default-dispatcher-6] [RestartWithBackoffSource(akka://akka-streams-system)] Restarting stream due to failure [1]: java.lang.RuntimeException: Boom!
java.lang.RuntimeException: Boom!
    at Main$.$anonfun$restartableSource$2(Main.scala:18)
    at Main$.$anonfun$restartableSource$2$adapted(Main.scala:16)
    at akka.stream.impl.fusing.Map$$anon$1.onPush(Ops.scala:52)
    at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:542)
    at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:496)
    at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:390)
    at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:650)
    at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:521)
    at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:625)
    at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:800)
    at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$shortCircuitBatch(ActorGraphInterpreter.scala:787)
    at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:819)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:716)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
    at akka.actor.ActorCell.invoke(ActorCell.scala:548)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1016)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1665)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1598)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)

I would appreciate any help on reasoning about why this is happening and how to restart the sink independently of the source.

Answer:

Your RestartSink is restarting (and is not restarting anything else in the process): if it weren't, you would never have gotten 4 sink as output right after 3 Exception. For some reason the restart is not being logged, but that might be due to stream attributes (there have also been some behavioral changes around logging of stream restarts in recent months, so logging may differ depending on which version you're running).

From the docs for RestartSink:

The restart process is inherently lossy, since there is no coordination between cancelling and the sending of messages. When the wrapped Sink does cancel, this Sink will backpressure, however any elements already sent may have been lost.

This is fundamentally because in the general case stream stages are memoryless. In your Sink.fold example, it will restart with clean state (viz. ""). This does, in my experience, make the RestartSink and RestartFlow somewhat less useful than the RestartSource.

For the use-case you describe, I would tend to use a mapAsync stage with akka.pattern.RetrySupport to send HTTP requests via a Future-based API and retry requests on failures:

val restartingSource: Source[Element, _] = ???

restartingSource.mapAsync(1) { elem =>
  import akka.pattern.RetrySupport._
  // will need an implicit ExecutionContext and an implicit Scheduler (both are probably best obtained from the ActorSystem)

  val sendRequest = () => {
    // Future-based HTTP call
    ???
  }

  retry(
    attempt = sendRequest,
    attempts = Int.MaxValue,
    minBackoff = 1.seconds,
    maxBackoff = 10.seconds,
    randomFactor = 0.2
  )
}.runWith(Sink.ignore)
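
To make the retry-per-element idea concrete, here is a minimal runnable sketch, assuming Akka 2.6.x. The fakeSend function is a hypothetical stand-in for the real Future-based HTTP call: it fails twice for the element "3" and then succeeds. The object name, the short backoff values, and the bounded attempts = 5 are illustrative choices only, not part of the original answer.

```scala
import akka.actor.{ActorSystem, Scheduler}
import akka.pattern.RetrySupport
import akka.stream.scaladsl.{Sink, Source}

import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object RetryPerElementSketch {

  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("retry-sketch")
    // retry needs an ExecutionContext and a Scheduler; both come from the ActorSystem here.
    implicit val ec: ExecutionContext = system.dispatcher
    implicit val scheduler: Scheduler = system.scheduler

    // Hypothetical stand-in for an HTTP call: the first two attempts for "3" fail.
    val attemptsFor3 = new AtomicInteger(0)
    def fakeSend(elem: String): Future[String] =
      if (elem == "3" && attemptsFor3.incrementAndGet() <= 2)
        Future.failed(new RuntimeException(s"transient failure for $elem"))
      else
        Future.successful(s"sent $elem")

    val done: Future[Seq[String]] =
      Source(0 to 5)
        .map(_.toString)
        .mapAsync(1) { elem =>
          // Only the failing element is retried; the stream itself is never restarted.
          RetrySupport.retry(
            attempt = () => fakeSend(elem),
            attempts = 5,
            minBackoff = 10.millis,
            maxBackoff = 100.millis,
            randomFactor = 0.2
          )
        }
        .runWith(Sink.seq)

    try Await.result(done, 10.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(run().mkString(", "))
}
```

Because the retry happens inside mapAsync, the element "3" is held and re-sent until it succeeds, and the elements before and after it are each processed once, in order. If all attempts fail, the Future fails and so does the stream, so a supervision strategy or a recover step would still be needed for permanent failures.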