I have a filter that removes invalid items from a stream. In some edge cases it removes every item, and the stream then fails when reduce runs, with this error:
java.util.NoSuchElementException: reduce over empty stream
How can I handle this case and return a meaningful response?
I have tried a supervision strategy like this:
val decider: Supervision.Decider = {
  // the specific case must come first, otherwise it is unreachable
  case e: NoSuchElementException => Supervision.Stop
  case _ => Supervision.Stop
}
RunnableGraph.
toMat(Sink.reduce[Int](_ + _)
.withAttributes(ActorAttributes.supervisionStrategy(decider)))(Keep.both)
.run()
I also tried recover, but neither approach worked.
Any help will be appreciated.
CodePudding user response:
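Since you're using Sink.reduce[Int], you could merge in a Source that is guaranteed to emit a single element 0, so Sink.reduce[Int] always receives at least one element and produces 0 for an otherwise empty stream. This relies on 0 being the identity of the reducing function, as it is for addition.
Here's an example: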
val zero = Source.single(0)
val possiblyEmpty = Source(List[Int](1, 3, 5)).filter(_ % 2 == 0)
val eventualInt = zero.merge(possiblyEmpty).toMat(Sink.reduce[Int](_ + _))(Keep.right).run()
CodePudding user response:
It may be worth considering using Sink.fold instead of Sink.reduce:
val possiblyEmpty = Source(Seq(1, 3, 5)).filter(_ % 2 == 0)
val eventualInt = possiblyEmpty.toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.right).run()
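To see the Sink.fold approach end to end, here is a minimal runnable sketch. It assumes Akka 2.6 or later (where an implicit ActorSystem supplies the materializer); the object name FoldDemo, the helper sumOfEvens, and the system name are made up for illustration, and blocking with Await is only for the demo.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object FoldDemo {
  // Hypothetical helper: sums the even items, yielding the zero element (0)
  // when the filter removes everything.
  def sumOfEvens(items: List[Int])(implicit system: ActorSystem): Int = {
    val eventualInt = Source(items)
      .filter(_ % 2 == 0)
      .toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.right)
      .run()
    // Blocking here only to keep the demo short
    Await.result(eventualInt, 3.seconds)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("fold-demo")
    try {
      println(sumOfEvens(List(1, 3, 5))) // everything filtered out: prints 0
      println(sumOfEvens(List(1, 2, 4))) // 2 + 4: prints 6
    } finally {
      system.terminate()
    }
  }
}
```

Note that with fold an empty stream and a stream whose elements sum to 0 are indistinguishable, which is fine only when 0 is an acceptable answer for "no items".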
If there's no reasonable zero/identity element, you can have something a little more generalized along these lines:
def reducePossiblyEmpty[T](source: Source[T, Any])(f: (T, T) => T): RunnableGraph[Future[Option[T]]] = {
  // Combine two optional values with f; the result is None if either side is None
  val lifted = { (x: Option[T], y: Option[T]) =>
    x.flatMap(a => y.map(b => f(a, b)))
  }
  source.map(x => Some(x): Option[T])
    .concat(Source.single[Option[T]](None))
    .statefulMapConcat[Option[T]] { () =>
      var emptyStream = true
      (elem: Option[T]) => elem match {
        case some @ Some(_) =>
          // element from the given stream
          emptyStream = false
          List(some)
        case None =>
          // given stream completed
          if (emptyStream) List(None) // emit the marker only if nothing else was seen
          else Nil // don't emit anything
      }
    }
    .toMat(Sink.reduce[Option[T]](lifted))(Keep.right)
}
The Future materialized by the returned graph will complete with None if there were no elements, or with a Some of the result of the reduction.
EDIT: you can also just use orElse on the Source/Flow, in lieu of .concat.statefulMapConcat in the above:
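def reducePossiblyEmpty[T](source: Source[T, Any])(f: (T, T) => T): RunnableGraph[Future[Option[T]]] = {
  // Combine two optional values with f; the result is None if either side is None
  val lifted = { (x: Option[T], y: Option[T]) =>
    x.flatMap(a => y.map(b => f(a, b)))
  }
  source.map(x => Some(x): Option[T])
    .orElse(Source.single[Option[T]](None)) // used only if the source completes without elements
    .toMat(Sink.reduce[Option[T]](lifted))(Keep.right)
}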
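As a usage sketch, the Option in the materialized Future can be mapped to whatever response is meaningful for the caller. This assumes Akka 2.6 or later (an implicit ActorSystem provides the materializer); the object name, the describe helper, the runAndDescribe helper, and the message strings are hypothetical, and the orElse-based function is repeated so the snippet compiles on its own.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object ReducePossiblyEmptyDemo {
  def reducePossiblyEmpty[T](source: Source[T, Any])(f: (T, T) => T): RunnableGraph[Future[Option[T]]] = {
    val lifted = (x: Option[T], y: Option[T]) => x.flatMap(a => y.map(b => f(a, b)))
    source.map(x => Some(x): Option[T])
      .orElse(Source.single[Option[T]](None))
      .toMat(Sink.reduce[Option[T]](lifted))(Keep.right)
  }

  // Hypothetical mapping from the reduction result to a response message
  def describe(result: Option[Int]): String = result match {
    case Some(sum) => s"sum = $sum"
    case None      => "no valid items"
  }

  // Runs the filtered reduction and blocks for the result (demo only)
  def runAndDescribe(items: List[Int])(implicit system: ActorSystem): String = {
    val evens = Source(items).filter(_ % 2 == 0)
    describe(Await.result(reducePossiblyEmpty(evens)(_ + _).run(), 3.seconds))
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("reduce-demo")
    try {
      println(runAndDescribe(List(1, 3, 5))) // all items filtered out
      println(runAndDescribe(List(1, 2, 4))) // two items survive the filter
    } finally {
      system.terminate()
    }
  }
}
```

In non-demo code you would typically map over the Future instead of blocking with Await.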
