Java – How to set up RecordInterceptor to ConcurrentKafkaListenerContainerFactory

I’m using Spring Kafka 2.2.7. I’ve configured @EnableKafka with a kafkaListenerContainerFactory bean and consume messages using @KafkaListener, and everything works as expected.

I want to add a RecordInterceptor to log all consumed messages, but I’m finding it difficult to configure. The documentation indicates that a RecordInterceptor can be set on the listener container, but I’m not sure how to get an instance of the container:

Starting with version 2.2.7, you can add a RecordInterceptor to the listener container; it will be invoked before calling the listener allowing inspection or modification of the record.

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Bytes> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Bytes> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(createConsumerFactory());
        factory.setConcurrency(consumerCount);
        return factory;
    }

I looked at the Spring docs and didn’t find a solution. This seems like a simple thing, but maybe I’m missing something.

Any help would be appreciated.

Thanks in advance.

Solution

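The container factory has had a setRecordInterceptor method since 2.2.7, so you do not need to get hold of the container itself:

    // RecordInterceptor is an interface: pass a lambda or an implementing class.
    factory.setRecordInterceptor(record -> record);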
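For the logging use case in the question, a fuller configuration might look like the sketch below. It assumes the single-argument intercept(ConsumerRecord) signature of the 2.2.x/2.3.x line (later releases changed the interface, so check the Javadoc for your version), an SLF4J logger on the classpath, and a ConsumerFactory<String, Bytes> bean defined elsewhere. The class name KafkaConsumerConfig and the log message are illustrative, not from the original post.

```java
import org.apache.kafka.common.utils.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.RecordInterceptor;

@Configuration
@EnableKafka
public class KafkaConsumerConfig { // hypothetical class name

    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerConfig.class);

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Bytes> kafkaListenerContainerFactory(
            ConsumerFactory<String, Bytes> consumerFactory) { // assumed to be defined as a bean elsewhere
        ConcurrentKafkaListenerContainerFactory<String, Bytes> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // The factory passes the interceptor on to every container it creates.
        factory.setRecordInterceptor(loggingInterceptor());
        return factory;
    }

    private RecordInterceptor<String, Bytes> loggingInterceptor() {
        return record -> {
            log.info("Consumed record: topic={}, partition={}, offset={}, key={}",
                    record.topic(), record.partition(), record.offset(), record.key());
            return record; // pass the record through unchanged; returning null would skip the listener
        };
    }
}
```

Setting the interceptor on the factory means every @KafkaListener that uses this factory gets the same logging, without touching the listener methods.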

One more piece of information from the reference documentation (https://docs.spring.io/spring-kafka/docs/2.3.0.RELEASE/reference/html/#receiving-messages): RecordInterceptor is not available for batch listeners.

Starting with version 2.2.7, you can add a RecordInterceptor to the listener container; it will be invoked before calling the listener allowing inspection or modification of the record. If the interceptor returns null, the listener is not called. The interceptor is not invoked when the listener is a batch listener.
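To make the contract in that paragraph concrete, here is a plain-Java model of it. This is not Spring Kafka's implementation: dispatch, demo, and the class name are hypothetical stand-ins, and java.util.function.UnaryOperator plays the role of the interceptor. It only illustrates that the interceptor runs first, may return a modified record, and suppresses the listener call by returning null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

/** Plain-Java model of the interceptor contract; none of these types are Spring Kafka classes. */
public class InterceptorContractSketch {

    /** Hypothetical stand-in for the container's per-record dispatch step. */
    public static <R> void dispatch(List<R> records, UnaryOperator<R> interceptor, Consumer<R> listener) {
        for (R record : records) {
            R intercepted = interceptor.apply(record); // the interceptor runs before the listener
            if (intercepted != null) {
                listener.accept(intercepted);          // the listener sees the (possibly modified) record
            }
            // a null result means the listener is skipped for this record
        }
    }

    /** Upper-cases every record and drops those starting with "skip". */
    public static List<String> demo(List<String> input) {
        List<String> delivered = new ArrayList<>();
        dispatch(input, r -> r.startsWith("skip") ? null : r.toUpperCase(), delivered::add);
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(demo(List.of("a", "skip-me", "b"))); // prints [A, B]
    }
}
```

In a real application the same rule means a logging interceptor must return the record it received; otherwise the @KafkaListener method never sees it.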
