@KafkaListener | Spring Boot Example

@KafkaListener | Interview Questions

@KafkaListener | How does Kafka consumer work?

@KafkaListener | KafkaMessageListenerContainer vs ConcurrentMessageListenerContainer?

KafkaConsumer.java
@Component
public class KafkaConsumer {
    @KafkaListener(id = "foo", topics = "myTopic")
    public void listen(String data) {
        System.out.println(data);
    }
}

@KafkaListener allows a method to consume messages from Kafka topic(s).

@KafkaListener designates a method as a listener in a KafkaMessageListenerContainer.

A KafkaMessageListenerContainer is how Spring Boot connects and polls records from Kafka under the hood.

Remember that the @Component annotation tells Spring Boot to register our KafkaConsumer class as a managed Spring Bean.

KafkaConsumerConfig.java
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "http://localhost:9092");
        props.put(
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

This class configures the Kafka consumer.

The @Configuration annotation tells Spring Boot to generate bean definitions via @Bean annotation.

Any class annotated with @Bean will be registered by the Spring container and used at runtime.

The ConsumerFactory class is a Spring Kafka class for configuring the underlying ConcurrentMessageListenerContainer.

The ConcurrentMessageListenerContainer is a collection of KafkaMessageListenerContainers.

Remember that KafkaMessageListenerContainer implements the MessageListener.

Remember that a MessageListener is created by @KafkaListener annotation.

KafkaDemoApplication.java
@SpringBootApplication
public class KafkaDemoApplication {
   public static void main(String[] args) {
      SpringApplication.run(KafkaDemoApplication.class, args);
   }
}

@SpringBootApplication performs component scans and registers all of the configuration beans defined above.

This is how you run the Spring Boot application.

This is how you start the Kafka listener.

build.gradle
dependencies {
   implementation 'org.springframework.boot:spring-boot-starter'
   implementation 'org.springframework.kafka:spring-kafka'
}

These are the two dependencies required to run the application.

How does @KafkaListener work?

The @KafkaListener is an annotation that marks a method or class as the target of incoming messages.

By using @KafkaListener, you abstract away the need to configure the underlying KafkaMessageListenerContainer.

The KafkaListenerEndpointRegistry

The Spring Kafka library uses a BeanPostProcessor to register the target methods as KafkaEndpoints in a KafkaListenerEndpointRegistry.

The KafkaListenerEndpointRegistry class creates the underlying MessageListenerContainer instances.

While the KafkaListenerEndpointRegistry is registered in the Spring container, its listener containers are not. Instead, these listeners are registered with the KafkaListenerEndpointRegistry itself.

The KafkaListenerEndpointRegistry manages the lifecycle of the containers.

What are the @KafkaListener Annotation Properties?

  • id
  • containerFactory
  • topics
  • topicPattern
  • topicPartitions
  • containerGroup
  • errorHandler
  • groupId
  • idIsGroup
  • clientIdPrefix
  • beanRef
  • concurrency
  • autoStartup
  • properties
  • splitIterables
  • contentTypeConverter
  • batch

You can set these properties like this...

@Component
public class Listener {
    @KafkaListener(
            id = "foo", 
            topics = "myTopic",
            properties = {"bootstrap.server=localhost:9092"}
    )
    public void listen(String data) {
        System.out.println(data);
    }
}

What is DefaultKafkaConsumerFactory?

The DefaultKafkaConsumerFactory provides the default configuration for creating consumer instances.

The DefaultKafkaConsumerFactory class implements the ConsumerFactory interface.

Remember that the @KafkaListener annotation ultimately generates listener containers. These listener containers ultimately create Kafka consumer instances.

These consumers need to be configured with appropriate topics, consumer groups, bootstrap servers, etc. The DefaultKafkaConsumerFactory provides such details when creating these consumers via Spring Boot.

Still confused? Check out What is DefaultKafkaConsumerFactory?

What is KafkaListenerContainerFactory?

KafkaListenerContainerFactory is an interface.

The ConcurrentKafkaListenerContainerFactory class is an implementation of this interface. This is the default container factory used with Spring Kafka.

The ConcurrentKafkaListenerContainerFactory class creates the ConcurrentMessageListenerContainer(s) which creates a number of KafkaMessageListenerContainers (based on it's concurrency property).

These KafkaMessageListenerContainer instances are the same instances generated by @KafkaListener.

Still confused? Check out What is KafkaListenerContainerFactory?

Your thoughts?

|

Great read on @KafkaListener annotation. It's a very easy method to use but also very misunderstood in terms of how it works with the Spring Context under the hood. This article does a great job of exploring the interfaces and classes that make it so easy to consume messages from Kafka using @KafkaListenerAnnotation...maybe even better than the official docs.

A couple of things to add to this...

RecordFilterStrategy can be used with the @KafkaListenerAnnotation to filter messages in one batch call.

Starting in version 2.8+, you can specify or override the RecordFilterStrategy via the filter attribute.

There are 2 implementations of the MessageListenerContainer interface. While the KafkaMessageListenerContainer receives all of the message for the specified topics on a single thread, the ConcurrentMessageListenerContainer uses a collection of KafkaMessageListenerContainer to achieve multi threaded consumption.

HOWEVER if you have two separate applications that are both using a single threaded KafkaMessageListenerContainer with the same group id then you can achieve multi threaded consumption in this way as well....

|

@KafkaListener makes life easy for Java developers trying to read messages from Kafka but there are an important few caveats to consider.

By default, Spring Kafka does not automatically commit offsets for you. The default AckMode is BATCH. This means that your @KafkaListener method won't commit offsets until every single record in the batch has been processed. So when the underlying Kafka consumer runs the poll() method, it's going to bring back (default 500) messages. Depending on the type of listener you implement (MessageListener vs BatchMessageListener), you will receive either individual consumer records or a List of all the records in the batch.

This is important to understand these default configurations because it can lead to unexpected results. There is a limit on how much time Kafka waits between poll() calls. If the processing for each message leads to the total execution time exceeding this interval then you run the risk of reprocessing the same messages over and over again.

You can either change config properties to read fewer messages, change time interval to wait longer to solve this issue.

|

It's rather powerful what the @KafkaListener annotation does for you when you think about it. Sometimes I wonder if underneath the hood too much is going on with the configuration nuances that is Spring Kafka.

To make sure I understand at high level a @KafkaListener basically creates these listener containers via Spring Boot config?

Also concurrency is something I struggle with. So each ConcurrentMessageContainer starts a bunch of KafkaMessageListenerContainers based on the concurrency passed in. So it will start a separate consumer on separate threads then.

|

@KafkaListener immediately turns any class or method into a Kafka consumer. It's important to remember that BY DEFAULT, the @KafkaConsumer class utilizes the ConcurrentMessageListenerContainer aka supports multiple threads (or consumers) reading from the same group.

This is why group id is also important because it defines how the listener containers are organized. Specifying group id allows you to have multiple @KafkaListener implementations in the same class as per this example...

|

thanks this really helped!

|

it's rather confusing to understand how @KafkaListener works under the hood. I think this did a pretty good job of explaining it..especially for beginners. Not easy stuff to understand.

|

great read, thx!

|

amazing read. very helpful on @KafkaListener. we sure do take these annotations for granted! :)

|

In @KafkaListener we can listen to the messages which satisfy some rules ? Such as date to be 01.02.2020 or so ?

|

great read thx! :)