I am trying to get the last element of a Flux (using the last() method), but in some cases the Flux can be empty, and the following error appears:
Flux#last() didn't observe any onNext signal
This is the chain I have:
return apiService.getAll(entry)
    .flatMap(response -> {
        if (response.getId() != null) {
            // do some logic
            return Mono.just("some Mono");
        } else {
            return Mono.empty();
        }
    })
    .last()
    // more flatMap operators
I already tried switchIfEmpty() as well, but it did not fix the problem.
What is the correct way to check whether last() can be called, or otherwise skip it and return an empty result to terminate the chain?
Thanks,
CodePudding user response:
According to the Flux.last() API doc:
emit NoSuchElementException error if the source was empty. For a passive version use takeLast(int)
It means that, for an empty upstream Flux:
last() will emit an error
takeLast(1) will return an empty flux
Now, takeLast(1) returns a Flux, not a Mono, as last() does. Then, you can just chain it with Flux.next(), and it will return the only retained value (if any), or propagate the empty signal.
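As a rough sketch of how this could look in a chain like the one in the question: the class name, the lastOrEmpty helper, and the Flux of string ids are hypothetical stand-ins for apiService.getAll(entry) and the real response type, which are not shown in the question. An empty string plays the role of a response with a null id.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class LastOrEmpty {

    // Hypothetical stand-in for the chain in the question: "ids" replaces
    // apiService.getAll(entry), and an empty string replaces a null id.
    static Mono<String> lastOrEmpty(Flux<String> ids) {
        return ids
            .flatMap(id -> id.isEmpty()
                ? Mono.<String>empty()        // skip this element
                : Mono.just("item-" + id))    // "do some logic"
            .takeLast(1)                      // keeps at most one element, never errors on empty
            .next();                          // Flux -> Mono, empty if nothing was retained
    }

    public static void main(String[] args) {
        // Some elements survive the flatMap: the last one is returned
        System.out.println(lastOrEmpty(Flux.just("a", "", "b")).block());
        // Every element is filtered out: the Mono completes empty, so block() returns null
        System.out.println(lastOrEmpty(Flux.just("", "")).block());
    }
}
```

Any further flatMap operators chained after next() are simply not invoked when the Mono completes empty, so the chain terminates without an error.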
Note: another solution would be to use last().onErrorResume(NoSuchElementException.class, err -> Mono.empty()).
This would catch the error sent by last() internally, and then return an empty mono.
However, if you've got some code other than last() in the upstream chain that can throw a NoSuchElementException, this handler would swallow it too, and you might miss a real problem. For this reason, my personal choice for your case would be to use takeLast(1).next().
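To illustrate that caveat, here is a minimal sketch of the onErrorResume variant. The class name, the lastOrEmpty helper, and the "unrelated lookup failure" message are made up for the example. It shows that an unrelated NoSuchElementException coming from upstream is silently turned into an empty result as well.

```java
import java.util.NoSuchElementException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class LastWithFallback {

    // Hypothetical helper: converts the error emitted by last() into an empty Mono
    static <T> Mono<T> lastOrEmpty(Flux<T> source) {
        return source
            .last()
            .onErrorResume(NoSuchElementException.class, err -> Mono.empty());
    }

    public static void main(String[] args) {
        // Empty source: the error from last() is converted to an empty Mono
        System.out.println(lastOrEmpty(Flux.<Integer>empty()).block());

        // Upstream failure of the same exception type: it is swallowed too,
        // so the caller cannot tell it apart from a genuinely empty source
        Flux<Integer> failing =
            Flux.error(new NoSuchElementException("unrelated lookup failure"));
        System.out.println(lastOrEmpty(failing).block());
    }
}
```

Both calls end up with an empty Mono (block() returns null), which is why this variant can hide a real failure.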
The following code example shows the behavior of last() vs takeLast(1).next():
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FluxLast {

    // Subscribes and reports whether the publisher failed or completed
    static void subscribe(Mono<?> publisher) {
        publisher.subscribe(value -> {},
                            err -> System.out.println("Failed: " + err.getMessage()),
                            () -> System.out.println("Completed empty"));
    }

    public static void main(String[] args) {
        // Empty source: last() signals an error
        subscribe(Flux.empty().last());
        // Empty source: takeLast(1).next() just completes
        subscribe(Flux.empty().takeLast(1).next());
        // When not empty, takeLast(1).next() will return the last value
        Integer v = Flux.just(1, 2, 3)
                        .takeLast(1)
                        .next()
                        .block();
        System.out.println("Last value: " + v);
    }
}
Program output:
Failed: Flux#last() didn't observe any onNext signal from Callable flux
Completed empty
Last value: 3
