How to handle java.util.NoSuchElementException: reduce over empty stream in Akka Streams?

Time:02-01

I use a filter to drop incorrect items from a stream. In some edge cases this results in every item being filtered out. When that happens, the stream fails with java.util.NoSuchElementException: reduce over empty stream as soon as reduce runs.

How to handle this case to return a meaningful response?

I have tried supervision like -

val decider: Supervision.Decider = {
    case e: NoSuchElementException => Supervision.Stop
    case _                         => Supervision.Stop
  }

RunnableGraph.
toMat(Sink.reduce[Int](_ + _)
.withAttributes(ActorAttributes.supervisionStrategy(decider)))(Keep.both)
.run()

I also tried recover, but nothing seems to work.

I need to handle this case to return a meaningful response.

Any help will be appreciated.

CodePudding user response:

Since you're using Sink.reduce[Int], you could merge in a Source that is guaranteed to emit a single element 0. Sink.reduce[Int] then always receives at least one element and produces 0 when everything else was filtered out.

Here's an example

val zero          = Source.single(0)
val possiblyEmpty = Source(List[Int](1, 3, 5)).filter(_ % 2 == 0)
val eventualInt   = zero.merge(possiblyEmpty).toMat(Sink.reduce[Int](_ + _))(Keep.right).run()

CodePudding user response:

It may be worth considering using Sink.fold instead of Sink.reduce:

val possiblyEmpty = Source(Seq(1, 3, 5)).filter(_ % 2 == 0)
val eventualInt = possiblyEmpty.toMat(Sink.fold[Int](0)(_ + _))(Keep.right).run()
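
The difference between the two operators is the same one you get with plain Scala collections, so here is a small sketch without any Akka dependency. The object name EmptyReduceDemo is made up for illustration; the point is only that reduce has no starting value to fall back on, while fold does.

```scala
import scala.util.Try

object EmptyReduceDemo {
  // Same filter as in the answer above: no even numbers, so the list is empty
  val evens: List[Int] = List(1, 3, 5).filter(_ % 2 == 0)

  // reduce has no initial value, so it throws on an empty collection
  val reduced: Try[Int] = Try(evens.reduce(_ + _))

  // fold starts from the supplied zero and returns it when there is nothing to combine
  val folded: Int = evens.fold(0)(_ + _)

  def main(args: Array[String]): Unit = {
    println(reduced.isFailure) // true
    println(folded)            // 0
  }
}
```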

If there's no reasonable zero/identity element, you can have something a little more generalized along these lines:

def reducePossiblyEmpty[T](source: Source[T, _])(f: (T, T) => T): RunnableGraph[Future[Option[T]]] = {
  // Lift f to Options: the result is defined only when both operands are
  val lifted = { (x: Option[T], y: Option[T]) =>
    x.flatMap(a => y.map(b => f(a, b)))
  }
  source.map(x => Option(x))
    .concat(Source.single(None))
    .statefulMapConcat[Option[T]] { () =>
      var emptyStream = true

      (elem: Option[T]) => elem match {
        case Some(_) =>
          // element from the given stream
          emptyStream = false
          List(elem)

        case None =>
          // given stream completed
          if (emptyStream) {
            List(None)
          } else {
            Nil  // don't emit anything
          }
      }
    }
    .toMat(Sink.reduce[Option[T]](lifted))(Keep.right)
}

When run, the returned graph's Future completes with None if the stream had no elements, or with a Some containing the result of the reduction.

EDIT: you can also just use orElse on the Source/Flow, in lieu of .concat.statefulMapConcat in the above:

def reducePossiblyEmpty[T](source: Source[T, _])(f: (T, T) => T): RunnableGraph[Future[Option[T]]] = {
  // Lift f to Options: the result is defined only when both operands are
  val lifted = { (x: Option[T], y: Option[T]) =>
    x.flatMap(a => y.map(b => f(a, b)))
  }

  source.map(x => Option(x))
    .orElse(Source.single(None))
    .toMat(Sink.reduce[Option[T]](lifted))(Keep.right)
}
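
To tie this back to the question's "meaningful response", here is a rough sketch of how the Option result might be consumed. It assumes Akka 2.6 or later (where an implicit ActorSystem is enough to run a graph) and repeats the orElse variant so the snippet stands alone. The names ReducePossiblyEmptyDemo and describe, and the message strings, are invented for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object ReducePossiblyEmptyDemo {
  // orElse variant from above: emits a single None only if the source is empty
  def reducePossiblyEmpty[T](source: Source[T, _])(f: (T, T) => T): RunnableGraph[Future[Option[T]]] =
    source
      .map(x => Option(x))
      .orElse(Source.single(None))
      .toMat(Sink.reduce[Option[T]]((x, y) => x.flatMap(a => y.map(b => f(a, b)))))(Keep.right)

  // Hypothetical helper: turn the Option into whatever response the caller needs
  def describe(result: Option[Int]): String = result match {
    case Some(sum) => s"Sum of valid items: $sum"
    case None      => "No valid items to sum"
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("reduce-demo")
    try {
      val allFilteredOut = Source(List(1, 3, 5)).filter(_ % 2 == 0)
      val someRemain     = Source(List(1, 2, 4)).filter(_ % 2 == 0)

      // Blocking with Await is only for the demo; real code would map over the Future
      println(describe(Await.result(reducePossiblyEmpty(allFilteredOut)(_ + _).run(), 3.seconds)))
      println(describe(Await.result(reducePossiblyEmpty(someRemain)(_ + _).run(), 3.seconds)))
    } finally {
      system.terminate()
    }
  }
}
```

The empty case is now an ordinary value rather than a failed Future, so no supervision strategy or recover stage is involved.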