I have a spring boot application where we are using spring Kafka in consumer section i have made enable.auto.commit to false and set my listener ack-mode to manual_immediate
I have concurrent consumers so after consuming the record I call acknowledgment.acknowledge() but here i still face the duplicate issue problem whenever rebalance happens other consumer start consuming the same message which is already consumed by one consumer. Any idea what magic is happening behind the scene.
Anyone know when using manual_immediate does it commit message by commitSync or commitAsync ? is there way we can change the behaviour to avoid duplicates record message reading. Is there a way we can use hybrid model in Spring Kafka
In spring boot Kafka is there a way we can see whenever a rebalance happen i can log it.
How to create rebalance if we want to do it for some testing purpose?
CodePudding user response:
As long as you call acknowledge on the listener thread, it will use commitSync() by default; use the syncCommits container property to use async commits.
If you call it on a different thread, the commit is queued to be processed by the consumer thread as soon as possible.
Duplicates cannot be avoided if a forced rebalance takes place because your listener took too long to process the records received by the poll().
You can increase max.poll.interval.ms and/or reduce max.poll.records to ensure you can process the records in time.
You can add a ConsumerRebalanceListener to the container properties to log rebalances.
Reduce max.poll.interval.ms to a small value to reproduce in a test.
CodePudding user response:
Firstly, irrespective of ack-mode it is never guaranteed that a message is consumed just once. For instance, A rebalance can happen between the time a message is consumed and the time offset is committed, resulting in Kafka delivering the message again to the newly assigned consumer. It is the applications responsibility to be idempotent to duplicated messages.
In order to listen for rebalance events an implementation of ConsumerRebalanceListener is needed. You can plug this implementation to Spring's auto configured ConcurrentKafkaListenerContainerFactory instance. A more detailed description of how this can be done has already been answered here.
If you wish to create a forced rebalance for testing you can do so by killing one of hopefully more than 1 existing consumers. If using spring-kafka you can do this by using an @Autowired instance of KafkaListenerEndpointRegistry and kill/rest/(re)start any consumer. Something on this should do:
@Autowired
KafkaListenerEndpointRegistry registry;
public void myTest() {
Collection<MessageListenerContainer> containers = registry.getAllListenerContainers()
containers.get(0).stop()
}
