KafkaConsumerConfig.java
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, pool);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class);
return props;
}
public ConsumerFactory<String, MetadataFileIntegrationDTO> consumerFactoryMetadataFileIntegration() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
new JsonDeserializer<>(MetadataFileIntegrationDTO.class, false));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MetadataFileIntegrationDTO> kafkaListenerContainerFactoryMetadataFileIntegration() {
ConcurrentKafkaListenerContainerFactory<String, MetadataFileIntegrationDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setErrorHandler(new KafkaErrorHandler());
factory.setMessageConverter(new StringJsonMessageConverter());
factory.setConsumerFactory(consumerFactoryMetadataFileIntegration());
return factory;
}
MetadataFileCustom.Java
@KafkaListener(topics = TOPIC,
groupId = GROUP,
containerFactory = "kafkaListenerContainerFactoryMetadataFileIntegration")
public void streamListener(MetadataFileIntegrationDTO metadataFileIntegrationDTO) {
log.info(TOPIC "===> RECEIVED MESSAGE:" metadataFileIntegrationDTO);
metadataFileService.save(metadataFileIntegrationDTO);
}
if I change my consumerFactoryMetadataFileIntegration to
public ConsumerFactory consumerFactoryMetadataFileIntegration() {
return new DefaultKafkaConsumerFactory(consumerConfigs(), new StringDeserializer(),
new StringDeserializer());
}
works, but the sonar complains..

Error: Listener failed; nested exception is java.lang.IllegalStateException: Only String, Bytes, or byte[] supported
CodePudding user response:
The error is saying that since you've used a StringDeserializer class for the value, then your KafkaListener method of that factory needs to be Only String, Bytes, or byte[], and not your DTO object
CodePudding user response:
You can imagine the consumer flow with MessageConverter like:
Native deserializer (
StringDeserializerin your case) deserializesbyte[]messages toStringmessages.Consumer.poll() returns these
Stringmessages.Your MessageConverter (
StringJsonMessageConverter) converts theseStringmessages to your typeMetadataFileIntegrationDTO(determined by params in @KafkaListener)
So when you defined your native deserializer as JsonDeserializer (corresponding to ConsumerFactory<String, MetadataFileIntegrationDTO>), the consumer.poll() returned MetadataFileIntegrationDTO messages, and that wasn't the type the StringJsonMessageConverter can process (you could see Only String, Bytes, or byte[] supported)
And when you changed JsonDeserializer to StringDeserializer, the corresponding ConsumerFactory was ConsumerFactory<String, String>. This means that when you create a new Consumer from this ConsumerFactory, the consumer.poll() returns String.
