Spring Kafka KafkaListener picks up message when there's a breakpoint, doesn't without one

Time:01-27

I have a simple integration test that spins up embedded Kafka, sends a message to the topic, assigns the payload to a variable, and compares the value.

Listener:

@KafkaListener(topics = "topic.name", groupId = "group-id")
public void receive(@Payload ConsumerRecord<?, ?> consumerRecord) {
    setPayload(consumerRecord.toString());
}

Test (using Awaitility for the delay):

@Test
public void canReceiveAMessage() {
    producer.send(topic, TEST_MESSAGE);
    await().pollDelay(10, TimeUnit.SECONDS).atMost(30, TimeUnit.SECONDS).untilAsserted(()
        -> assertThat(consumer.getPayload()).contains(TEST_MESSAGE));
}

I assume the breakpoint resolves some kind of race condition. However, with the breakpoint the test passes in well under 30 seconds, so I'm unsure what the breakpoint triggers that makes it work. I can verify that the rest of the setup is fine, given that it works with the breakpoint.

Edit: It reliably passes if I set the consumer config AUTO_OFFSET_RESET_CONFIG to "earliest". I don't want this long term, though; it needs to be "latest". Is there a reliable solution for this?

CodePudding user response:

The breakpoint is causing the producer thread to flush its batch.

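Call producer.send(topic, TEST_MESSAGE).get(100, TimeUnit.MILLISECONDS); to wait for the send to complete before asserting. Note that Future.get takes a timeout and a TimeUnit rather than a Duration, and that this only works if producer.send returns the Future from the underlying KafkaTemplate.send call.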
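
As a rough illustration of that blocking wait, here is a minimal sketch using only java.util.concurrent. It assumes the send call hands back a Future, as KafkaTemplate.send does. The names SendAwait and awaitSend are hypothetical helpers for this example, not part of Spring Kafka.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not a Spring Kafka class.
class SendAwait {

    /**
     * Blocks until the send future completes or the timeout elapses.
     * Returns true if the send completed in time, false on timeout.
     */
    static boolean awaitSend(Future<?> sendResult, long timeoutMillis) {
        try {
            sendResult.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            // The send was not acknowledged within the timeout.
            return false;
        } catch (ExecutionException e) {
            // The send itself failed; surface the underlying cause.
            throw new IllegalStateException("send failed", e.getCause());
        } catch (InterruptedException e) {
            // Restore the interrupt flag before giving up.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for send", e);
        }
    }
}
```

In the test this would wrap the future returned by the send, for example awaitSend(future, 100). Awaiting the future only confirms that the send completed; whether the listener then sees the record still depends on where the consumer starts reading.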

CodePudding user response:

You can add a ConsumerRebalanceListener to the container configuration and wait for the partition(s) to be assigned before sending the test message.
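
The ordering issue behind this suggestion can be modelled without a broker. The sketch below is a toy single-partition stand-in. ToyTopic, ToyConsumer and LatestOffsetRaceDemo are hypothetical names, not Kafka or Spring Kafka classes. It assumes a consumer with no committed offset, so "latest" starts it at the end of the log at the moment of assignment. In a real test the latch would be counted down from ConsumerRebalanceListener.onPartitionsAssigned, with the listener registered on the container's ContainerProperties.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Toy in-memory stand-in for a single-partition topic; not the Kafka API. */
class ToyTopic {
    private final List<String> log = new ArrayList<>();

    synchronized void send(String message) { log.add(message); }

    synchronized int endOffset() { return log.size(); }

    synchronized List<String> readFrom(int offset) {
        return new ArrayList<>(log.subList(offset, log.size()));
    }
}

/** Toy consumer with no committed offset; the reset policy picks its start position. */
class ToyConsumer {
    final CountDownLatch assigned = new CountDownLatch(1);
    private final ToyTopic topic;
    private final boolean earliest;
    private int position = -1;

    ToyConsumer(ToyTopic topic, boolean earliest) {
        this.topic = topic;
        this.earliest = earliest;
    }

    /** Models the rebalance callback that fires when the partition is assigned. */
    void onPartitionAssigned() {
        position = earliest ? 0 : topic.endOffset();
        assigned.countDown(); // signal the test that sending is now safe
    }

    List<String> poll() {
        if (position < 0) {
            throw new IllegalStateException("partition not assigned yet");
        }
        List<String> records = topic.readFrom(position);
        position += records.size();
        return records;
    }
}

class LatestOffsetRaceDemo {

    /** The failing ordering: the message is sent before the partition is assigned. */
    static List<String> sendThenAssign(boolean earliest) {
        ToyTopic topic = new ToyTopic();
        ToyConsumer consumer = new ToyConsumer(topic, earliest);
        topic.send("test-message");
        consumer.onPartitionAssigned();
        return consumer.poll();
    }

    /** The suggested ordering: wait on the latch for assignment, then send. */
    static List<String> awaitAssignmentThenSend() {
        ToyTopic topic = new ToyTopic();
        ToyConsumer consumer = new ToyConsumer(topic, false); // "latest"
        new Thread(consumer::onPartitionAssigned).start();    // assignment happens asynchronously
        try {
            if (!consumer.assigned.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("partition was not assigned in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for assignment", e);
        }
        topic.send("test-message");
        return consumer.poll();
    }
}
```

In this model, sendThenAssign(false) returns an empty list because the "latest" consumer starts after the record that was already sent, while sendThenAssign(true) and awaitAssignmentThenSend() both return the message. That matches the behaviour in the question, where "earliest" makes the test pass. For Spring tests, spring-kafka-test also provides ContainerTestUtils.waitForAssignment, which may be a simpler way to get the same guarantee.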
