I'm using spring-kafka 2.8.0 and I'm trying to implement non-blocking retries for batch kafka consumer. Here are my config and consumer:
@Configuration
public class KafkaConfig {
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, GenericRecord>>
batchListenerFactory(ConsumerFactory<Object, Object> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setBatchListener(true);
return factory;
}
}
@Component
public class MyConsumer {
@KafkaListener(
topics = "my-topic",
containerFactory = "batchListenerFactory"
)
@RetryableTopic(
backoff = @Backoff(delay = 1000, multiplier = 2.0),
attempts = "4",
topicSuffixingStrategy = SUFFIX_WITH_INDEX_VALUE,
autoCreateTopics = "false"
)
public void consume(List<ConsumerRecord<String, GenericRecord>> messages) {
// do some stuff
}
}
But on sturtup I'm getting the following exception:
java.lang.IllegalArgumentException: The provided class BatchMessagingMessageListenerAdapter is not assignable from AcknowledgingConsumerAwareMessageListener
My questions are:
Is there any way to combine batch consumer with
@RetryableTopic?Is there any another way to implement non-blocking retries for batch consumer? Is it possible to use
RetryTemplatefor this purpose?
CodePudding user response:
@RetryableTopic is not supported with batch listeners.
The RecoveringBatchErrorHandler (DefaultErrorHandler for 2.8 and later) supports sending a failed record within a batch to a dead letter topic, with the help of the listener throwing a BatchListenerFailedException indicating which record failed.
You would then have to implement your own listener on that topic.
