@KafkaListener | Spring Boot Example

KafkaConsumer.java
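import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {
    // Invoked once for each record received from "myTopic".
    @KafkaListener(id = "foo", topics = "myTopic")
    public void listen(String data) {
        System.out.println(data);
    }
}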

@KafkaListener allows a method to consume messages from Kafka topic(s).

@KafkaListener designates a method as the listener for a KafkaMessageListenerContainer.

A KafkaMessageListenerContainer is what Spring Kafka uses under the hood to connect to Kafka and poll records.

Remember that the @Component annotation tells Spring Boot to register our KafkaConsumer class as a managed Spring Bean.

KafkaConsumerConfig.java
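import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class KafkaConsumerConfig {

    // Supplies the settings used to create each Kafka consumer.
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        // bootstrap.servers takes host:port pairs, not URLs with a scheme.
        props.put(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9092");
        props.put(
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    // Builds the listener containers that back @KafkaListener methods.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}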

This class configures the Kafka consumer.

The @Configuration annotation marks the class as a source of bean definitions, which are declared with the @Bean annotation.

The object returned by each method annotated with @Bean is registered in the Spring container and used at runtime.

ConsumerFactory is a Spring Kafka interface for creating the Kafka consumers that the listener containers use.

The ConcurrentMessageListenerContainer delegates to one or more KafkaMessageListenerContainer instances.

Remember that a KafkaMessageListenerContainer invokes a MessageListener for the records it polls.

Remember that Spring Kafka creates this MessageListener from the method annotated with @KafkaListener.

KafkaDemoApplication.java
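import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaDemoApplication.class, args);
    }
}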

@SpringBootApplication enables component scanning and auto-configuration, so the component and configuration classes defined above are picked up and registered.

Running this class starts the Spring Boot application, which in turn starts the Kafka listener.

build.gradle
dependencies {
   implementation 'org.springframework.boot:spring-boot-starter'
   implementation 'org.springframework.kafka:spring-kafka'
}

These are the two dependencies required to run the application.

How does @KafkaListener work?

The @KafkaListener is an annotation that marks a method or class as the target of incoming messages.

By using @KafkaListener, you abstract away the need to configure the underlying KafkaMessageListenerContainer.

The KafkaListenerEndpointRegistry

The Spring Kafka library uses a BeanPostProcessor (KafkaListenerAnnotationBeanPostProcessor) to register the target methods as KafkaListenerEndpoint instances in a KafkaListenerEndpointRegistry.

The KafkaListenerEndpointRegistry class creates the underlying MessageListenerContainer instances.

While the KafkaListenerEndpointRegistry is registered in the Spring container, its listener containers are not. Instead, these containers are registered with the KafkaListenerEndpointRegistry itself.

The KafkaListenerEndpointRegistry manages the lifecycle of the containers.
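
Because the containers are not Spring beans, one way to reach them is through the registry. The following is a minimal sketch, assuming the listener with id "foo" from the example above. The class name ListenerLifecycle and its method names are hypothetical and only serve as illustration.

```java
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

// Hypothetical helper class, not part of Spring Kafka.
@Component
public class ListenerLifecycle {

    private final KafkaListenerEndpointRegistry registry;

    // The registry itself is a Spring bean, so it can be injected.
    public ListenerLifecycle(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    // Stops the container that backs @KafkaListener(id = "foo").
    public void stopFoo() {
        MessageListenerContainer container = registry.getListenerContainer("foo");
        if (container != null) { // null when no listener has this id
            container.stop();
        }
    }

    // Starts the same container again if it is not already running.
    public void startFoo() {
        MessageListenerContainer container = registry.getListenerContainer("foo");
        if (container != null && !container.isRunning()) {
            container.start();
        }
    }
}
```

Looking the container up by id is the reason it is worth giving every @KafkaListener an explicit id.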

What are the @KafkaListener Annotation Properties?

  • id
  • containerFactory
  • topics
  • topicPattern
  • topicPartitions
  • containerGroup
  • errorHandler
  • groupId
  • idIsGroup
  • clientIdPrefix
  • beanRef
  • concurrency
  • autoStartup
  • properties
  • splitIterables
  • contentTypeConverter
  • batch

You can set these properties directly on the annotation, like this:

@Component
public class Listener {
    @KafkaListener(
            id = "foo",
            topics = "myTopic",
            properties = {"bootstrap.servers=localhost:9092"}
    )
    public void listen(String data) {
        System.out.println(data);
    }
}
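
Several of the other properties can be combined in the same way. This is a sketch only: the class name TunedListener and the values "bar", "myGroup" and "myClient" are made up for illustration. Note that concurrency and autoStartup are String attributes on the annotation.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Hypothetical listener showing a few more annotation properties.
@Component
public class TunedListener {

    @KafkaListener(
            id = "bar",                  // container id, usable for registry lookups
            groupId = "myGroup",         // overrides group.id for this listener only
            topics = "myTopic",
            concurrency = "3",           // overrides the container factory's concurrency
            autoStartup = "false",       // container is created but not started
            clientIdPrefix = "myClient"  // prefix for the consumers' client.id
    )
    public void listen(String data) {
        System.out.println(data);
    }
}
```

With autoStartup set to "false", the container has to be started explicitly, for example through the KafkaListenerEndpointRegistry.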

What is DefaultKafkaConsumerFactory?

DefaultKafkaConsumerFactory is Spring Kafka's default ConsumerFactory implementation. It creates consumer instances from the configuration map it is given.

The DefaultKafkaConsumerFactory class implements the ConsumerFactory interface.

Remember that the @KafkaListener annotation ultimately generates listener containers. These listener containers ultimately create Kafka consumer instances.

These consumers need to be configured with bootstrap servers, deserializers, a consumer group, and so on. The DefaultKafkaConsumerFactory supplies these settings when Spring Kafka creates the consumers. The topics to subscribe to come from the @KafkaListener annotation itself.
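
To make the settings concrete, here is a small sketch of the kind of configuration map a consumer factory carries. It uses the standard Kafka property names as plain strings so that it runs without Kafka on the classpath. The class name ConsumerSettings and the group id "myGroup" are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that assembles consumer settings.
class ConsumerSettings {

    // Builds the properties a ConsumerFactory would hand to each new consumer.
    static Map<String, Object> build(String bootstrapServers, String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers); // host:port, no scheme
        props.put("group.id", groupId);
        // Kafka accepts deserializers as class names as well as Class objects.
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build("localhost:9092", "myGroup"));
    }
}
```

In a Spring Kafka application, a map like this would be passed to new DefaultKafkaConsumerFactory<>(props), as in KafkaConsumerConfig.java above.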

Still confused? Check out What is DefaultKafkaConsumerFactory?

What is KafkaListenerContainerFactory?

KafkaListenerContainerFactory is an interface.

The ConcurrentKafkaListenerContainerFactory class is an implementation of this interface. This is the default container factory used with Spring Kafka.

The ConcurrentKafkaListenerContainerFactory class creates ConcurrentMessageListenerContainer instances, each of which creates a number of KafkaMessageListenerContainers (based on its concurrency property).

These KafkaMessageListenerContainer instances are the ones that invoke the methods annotated with @KafkaListener.
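
On concurrency: the factory exposes a setConcurrency method, and each child KafkaMessageListenerContainer runs its own consumer on its own thread. The plain-Java sketch below is a back-of-the-envelope model, not Spring Kafka code. It assumes a single topic, group-managed partition assignment, and that this listener is the only member of its consumer group. The class and method names are hypothetical.

```java
// Hypothetical model of how concurrency relates to partition count.
class ConcurrencySketch {

    // Consumers that can actually receive partitions: a partition is owned by
    // at most one consumer in a group, so extra consumers get nothing.
    static int activeConsumers(int concurrency, int partitions) {
        if (concurrency < 1 || partitions < 0) {
            throw new IllegalArgumentException(
                    "concurrency must be >= 1 and partitions >= 0");
        }
        return Math.min(concurrency, partitions);
    }

    // Consumers (and their threads) that are started but stay idle.
    static int idleConsumers(int concurrency, int partitions) {
        return concurrency - activeConsumers(concurrency, partitions);
    }

    public static void main(String[] args) {
        System.out.println(activeConsumers(3, 6)); // prints 3
        System.out.println(idleConsumers(5, 3));   // prints 2
    }
}
```

Under these assumptions, setting concurrency higher than the number of partitions only adds idle threads, so the partition count is a natural upper bound.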

Still confused? Check out What is KafkaListenerContainerFactory?

Your thoughts?

Brenda Becker |

It's rather powerful what the @KafkaListener annotation does for you when you think about it. Sometimes I wonder if underneath the hood too much is going on with the configuration nuances that is Spring Kafka.

To make sure I understand at high level a @KafkaListener basically creates these listener containers via Spring Boot config?

Also concurrency is something I struggle with. So each ConcurrentMessageContainer starts a bunch of KafkaMessageListenerContainers based on the concurrency passed in. So it will start a separate consumer on separate threads then.

Amanda Waters |

thanks this really helped!

SherryAtkins |

it's rather confusing to understand how @KafkaListener works under the hood. I think this did a pretty good job of explaining it..especially for beginners. Not easy stuff to understand.

Robert Ramirez |

great read, thx!

AMZ EMPLOYEE |

amazing read. very helpful on @KafkaListener. we sure do take these annotations for granted! :)