Home > Net >  Cannot pass callback lambdas into .toReactivePublisher().subscribe() in Spring Integration with Proj
Cannot pass callback lambdas into .toReactivePublisher().subscribe() in Spring Integration with Proj

Time:01-25

I'm getting an error of Caused by: java.lang.IllegalStateException: No subscriptions have been created when I use Spring Integration with Project Reactor and I try to figure out how can I subscribe. My original code was:

    @Bean
    public IntegrationFlow writeToKafka() {
        return IntegrationFlows.from(reactiveKafkaConsumerTemplate.receiveAutoAck()
                .map(payload -> {
                    return new GenericMessage<ConsumerRecord<String, String>>(payload);
                }))
            .<ConsumerRecord<String, String>, String>transform(ConsumerRecord::value)
            .channel(c -> c.queue("resultChannel"))
            .get();
    }

After it threw the error I've tried to subscribe but I've couldn't understand what should I pass to the subscribe method, that seems to act differently than the regular reactive .subscribe().

    @Bean
    public void writeToKafka() {
        return IntegrationFlows.from(reactiveKafkaConsumerTemplate.receiveAutoAck()
                .map(payload -> {
                    return new GenericMessage<ConsumerRecord<String, String>>(payload);
                }))
            .<ConsumerRecord<String, String>, String>transform(ConsumerRecord::value)
            .channel(c -> c.queue("resultChannel"))
            .toReactivePublisher().subscribe(value -> {
                log.info("Wrote: "   value);
            });
    }

CodePudding user response:

Do that .toReactivePublisher().subscribe() combination is not correct. The IntegrationFlow must be first exposed and configured as a bean. And only then, after injecting this bean somewhere in your service you can subscribe() to that Publisher bean.

You are missing the fact that inversion of control has to be initialized first in its dependency injection container and only after that we can do some real work (subscribe) with those beans.

EDIT

For example my test-case:

@SpringJUnitConfig
@DirtiesContext
public class ReactiveStreamsTests {

    @Autowired
    @Qualifier("pollableReactiveFlow")
    private Publisher<Message<Integer>> pollablePublisher;

    @Autowired
    private AbstractEndpoint reactiveTransformer;

    @Autowired
    @Qualifier("inputChannel")
    private MessageChannel inputChannel;

    @Test
    void testPollableReactiveFlow() throws Exception {
        assertThat(this.reactiveTransformer).isInstanceOf(ReactiveStreamsConsumer.class);
        this.inputChannel.send(new GenericMessage<>("1,2,3,4,5"));

        CountDownLatch latch = new CountDownLatch(6);

        Flux.from(this.pollablePublisher)
                .take(6)
                .filter(m -> m.getHeaders().containsKey(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
                .doOnNext(p -> latch.countDown())
                .subscribe();

        ExecutorService exec = Executors.newSingleThreadExecutor();
        Future<List<Integer>> future =
                exec.submit(() ->
                        Flux.just("11,12,13")
                                .map(v -> v.split(","))
                                .flatMapIterable(Arrays::asList)
                                .map(Integer::parseInt)
                                .<Message<Integer>>map(GenericMessage::new)
                                .concatWith(this.pollablePublisher)
                                .take(7)
                                .map(Message::getPayload)
                                .collectList()
                                .block(Duration.ofSeconds(10))
                );

        this.inputChannel.send(new GenericMessage<>("6,7,8,9,10"));

        assertThat(latch.await(20, TimeUnit.SECONDS)).isTrue();
        List<Integer> integers = future.get(20, TimeUnit.SECONDS);

        assertThat(integers).isNotNull();
        assertThat(integers.size()).isEqualTo(7);
        exec.shutdownNow();
    }

    @Configuration
    @EnableIntegration
    public static class ContextConfiguration {

        @Bean
        public Publisher<Message<Integer>> pollableReactiveFlow() {
            return IntegrationFlows
                    .from("inputChannel")
                    .split(s -> s.delimiters(","))
                    .<String, Integer>transform(Integer::parseInt,
                            e -> e.reactive(flux -> flux.publishOn(Schedulers.parallel())).id("reactiveTransformer"))
                    .channel(MessageChannels.queue())
                    .log()
                    .toReactivePublisher();
        }

    }

}
  •  Tags:  
  • Related