How to set up RecordInterceptor to ConcurrentKafkaListenerContainerFactory
I’m using Spring Kafka 2.2.7, I’ve configured my @EnableKafka
with kafkaListenerContainerFactory
and consume messages using @KafkaListener
, and everything is working as expected.
I want to add a RecordInterceptor
to log all consumed messages, but find it difficult to configure it. documentation indicates that it can be set on the container RecordInterceptor, but I’m not sure how to get an instance of the container.
Starting with version 2.2.7, you can add a RecordInterceptor to the listener container; it will be invoked before calling the listener allowing inspection or modification of the record.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Bytes> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Bytes> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(createConsumerFactory());
factory.setConcurrency(consumerCount);
return factory;
}
I
looked at the Spring docs and didn’t find a solution, which seems like a simple thing, but maybe I’m missing something.
Any help would be appreciated.
Thanks in advance.
Solution
There is a method setRecordInterceptor since 2.2.7
factory.setRecordInterceptor(new RecordInterceptor);
There is another piece of information< a href="https://docs.spring.io/spring-kafka/docs/2.3.0.RELEASE/reference/html/#receiving-messages" rel="noreferrer noopener nofollow">RecordInterceptor Not available for batch listeners
Starting with version 2.2.7, you can add a RecordInterceptor to the listener container; it will be invoked before calling the listener allowing inspection or modification of the record. If the interceptor returns null, the listener is not called. The interceptor is not invoked when the listener is a batch listener.