Spring Boot Kafka Consumer
If you're brand new to Kafka, be sure to check out this intro to Kafka architecture first.
Spring Boot Kafka Consumer Example
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.2</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>11</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.10.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
The pom.xml defines the dependencies we need for our Maven build. Notice how we include the dependencies needed for Spring Boot along with those needed for running and testing Kafka.
Note that the jackson-databind dependency is included to work around bugs in older versions of the Spring Kafka library. Depending on which version you're using, you may not need it.
DemoApplication.java
package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class DemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}
The DemoApplication.java class runs our Spring Boot application.
Spring Boot configures Kafka for us automatically: because spring-kafka is on the classpath, the auto-configuration enabled by @SpringBootApplication applies the KafkaAutoConfiguration class, which sets up the consumer factory and listener container factory behind the scenes.
Consumer.java
package com.example.demo.messaging;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {
    @KafkaListener(topics = "test")
    public void processMessage(String content) {
        System.out.println("Message received: " + content);
    }
}
The Consumer.java class is what actually listens for messages. Notice how we specify the topics to listen to within the @KafkaListener annotation.
By simply using the annotations provided by the Spring Boot Kafka library, we are able to easily set up a consumer for a given topic.
application.properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
Thanks to auto-configuration, you only need to define a few things in application.properties to configure the consumer. Notice how our app is configured to use Kafka locally.
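Auto-configuration exposes many more consumer settings under the same spring.kafka.consumer.* namespace. As a sketch of a slightly fuller configuration (the extra values here are illustrative, not required by the example above):

```properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
# Start from the beginning of the topic when no committed offset exists
spring.kafka.consumer.auto-offset-reset=earliest
# Explicit deserializers (these are already Spring Boot's defaults for String payloads)
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
```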
Running the example...
This example is configured to consume from a local Kafka topic. You'll need to have Kafka installed and running locally to see messages being consumed from the Spring Boot app.
Starting Kafka from the CLI:
To start Kafka locally, you need to start both the ZooKeeper and Kafka servers locally...
./zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
This will start the ZooKeeper server with the default configurations. Please note the path to your local Kafka install may vary.
./kafka-server-start /usr/local/etc/kafka/server.properties
This will start the Kafka server with the default configurations. Please note the path to your local Kafka install may vary.
Producing messages from the CLI:
To test the application consumer, you'll need to produce some messages for the test topic. To start a Kafka console producer:
./kafka-console-producer --bootstrap-server=localhost:9092 --topic test
This starts a Kafka producer for the test topic. Notice how both the bootstrap-server and topic arguments match what we've defined in our application.properties file.
Once the producer is up and running, you can send a message to the test topic like this:
> Hello world
If everything is working correctly, you should see the message being logged through the Spring application like this:
Message received: Hello world
Spring Boot Kafka Multiple Consumer Example
Spring Kafka allows multiple consumers to read from multiple topics within the same Spring Boot application. Since "multiple consumers" can mean different things, we'll look at a few different examples...
1) Multiple consumers in the same consumer group
@Component
public class Consumer {
    @KafkaListener(topics = "test", concurrency = "2", groupId = "myGroup")
    public void processMessage(String content) {
        System.out.println("Message received: " + content);
    }
}
This creates a SINGLE consumer group with two members. Notice how we specify concurrency = "2" on the @KafkaListener annotation. This will create 2 members within the same consumer group.
Also notice how the annotation references the topic test and groupId myGroup.
If our test topic has 3 partitions, 2 partitions will be assigned to the first member and 1 will be assigned to the second member.
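The split described above comes down to simple integer arithmetic: each member gets the base share, and earlier members absorb the remainder. This is an illustrative sketch only, not the real org.apache.kafka.clients.consumer.RangeAssignor:

```java
public class RangeSplitSketch {
    // For one topic: member i gets `base` partitions, plus one extra
    // if i falls within the remainder (earlier members get more).
    static int partitionsForMember(int partitions, int members, int i) {
        int base = partitions / members;
        int extra = partitions % members;
        return base + (i < extra ? 1 : 0);
    }

    public static void main(String[] args) {
        // 3 partitions over 2 members: member 0 gets 2, member 1 gets 1
        System.out.println(partitionsForMember(3, 2, 0)); // 2
        System.out.println(partitionsForMember(3, 2, 1)); // 1
    }
}
```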
If we produce messages to the test topic, we will see output like this...
Message received: abc
Message received: 123
Message received: 345
Notice how these two members of a SINGLE consumer group work together to consume messages from the test topic.
2) Multiple consumer groups reading from the same topic
@Component
public class Consumer {
    @KafkaListener(topics = "test", groupId = "myGroup")
    public void processMessage(String content) {
        System.out.println("Message received by consumer 1: " + content);
    }

    @KafkaListener(topics = "test", groupId = "anotherGroup")
    public void processMessage2(String content) {
        System.out.println("Message received by consumer 2: " + content);
    }
}
This creates 2 consumers with one member each. Notice how both listener methods read from the same test topic but define a different groupId.
If our test topic has 3 partitions, 3 partitions will be assigned to the only member of myGroup and 3 partitions will be assigned to the only member of anotherGroup.
If we produce messages to the test topic, we will see output like this...
Message received by consumer 1: abc
Message received by consumer 2: abc
Message received by consumer 1: 123
Message received by consumer 2: 123
Notice how the same message is being consumed TWICE. This is because a different groupId means a different consumer group. This is a very important distinction from the first example, where multiple members work under a single consumer group.
3) Multiple consumer groups reading from different topics
@Component
public class Consumer {
    @KafkaListener(topics = "test", groupId = "myGroup")
    public void processMessage(String content) {
        System.out.println("Message received by consumer 1: " + content);
    }

    @KafkaListener(topics = "test2", groupId = "anotherGroup")
    public void processMessage2(String content) {
        System.out.println("Message received by consumer 2: " + content);
    }
}
This creates 2 separate consumer groups with one member each. The first group has a single member reading from the test topic. The second group has a single member reading from the test2 topic.
4) Multiple consumers in the same consumer group reading from different topics
@Component
public class Consumer {
    @KafkaListener(topics = {"test", "test2"}, groupId = "myGroup", concurrency = "2")
    public void processMessage(String content) {
        System.out.println("Message received by consumer 1: " + content);
    }
}
This creates a SINGLE consumer group with 2 members. Each member shares the responsibility of reading from both topics.
If both our test and test2 topics have 3 partitions, each member WILL NOT receive an equal number of partitions by default. One may receive 2 and another may receive 4. This is because the default PartitionAssignor for Kafka is the RangeAssignor, which splits each topic's partitions independently. You can optionally switch to a round-robin approach by specifying the RoundRobinAssignor in your configuration:
spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor
This will result in each member of the consumer group receiving an equal number of partitions...
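The difference between the two strategies can be sketched in a few lines. This is an illustrative model only (the real assignors live in org.apache.kafka.clients.consumer and assign concrete partitions, not just counts): RangeAssignor splits each topic independently, so the remainder piles up on earlier members, while RoundRobinAssignor deals all partitions across members one at a time.

```java
import java.util.Arrays;

public class AssignorSketch {
    // RangeAssignor: each topic is split independently;
    // earlier members absorb that topic's remainder.
    static int[] range(int[] topicPartitions, int members) {
        int[] counts = new int[members];
        for (int p : topicPartitions) {
            int base = p / members, extra = p % members;
            for (int m = 0; m < members; m++) {
                counts[m] += base + (m < extra ? 1 : 0);
            }
        }
        return counts;
    }

    // RoundRobinAssignor: deal every partition across members one at a time.
    static int[] roundRobin(int[] topicPartitions, int members) {
        int[] counts = new int[members];
        int i = 0;
        for (int p : topicPartitions) {
            for (int j = 0; j < p; j++) {
                counts[i++ % members]++;
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] topics = {3, 3}; // "test" and "test2", 3 partitions each
        System.out.println(Arrays.toString(range(topics, 2)));      // uneven: [4, 2]
        System.out.println(Arrays.toString(roundRobin(topics, 2))); // even:   [3, 3]
    }
}
```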
An important note...
While these examples demonstrate how the @KafkaListener annotation can be used with multiple consumer groups, best practice is to separate your consumers into separate classes. In other words, a given consumer class should never have more than one @KafkaListener.
Your thoughts?
This article does a really good job explaining how Spring Boot Kafka consumer implementation works. Some things I would like to add...
While this article shows you what is possible with the @KafkaListener annotation, it's also good to emphasize what is considered best practice when implementing a consumer in Spring Boot.
For example, most @KafkaListener implementations tend to listen to a single topic. This is from my own experience, but it seems to be a by-product of obsessing over "decoupled" / "microservice" based architecture. Nevertheless, applications typically read from a single topic and perform a very specific task with that data.
Not to say this is the best way of doing things, but just know what's out there in production environments.
I think understanding Kafka consumer group IDs is key to understanding how you can use the @KafkaListener annotation.
It's important to remember that @KafkaListener just sets up a consumer group. This group needs to know what topics to listen to and how many members to include. The concurrency argument tells your application how many individual consumer threads will listen to the partitions for the provided topics.
ChatGPT Response:
To create a Spring Boot Kafka consumer, you will need to add the following dependencies to your pom.xml file:
<dependencies>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>
Next, you will need to create a configuration class for the Kafka consumer. Here's an example:
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(1);
        return factory;
    }

    @Bean
    public KafkaConsumer<String, String> kafkaConsumer() {
        return new KafkaConsumer<>(consumerFactory().getConfigurationProperties());
    }
}
In this example, we are using Spring's @EnableKafka annotation to enable the Kafka listener container. We are also injecting two properties using the @Value annotation: kafka.bootstrap-servers and kafka.consumer.group-id.
The consumerFactory() method creates a ConsumerFactory instance, which is used to create a KafkaConsumer instance later on. In this example, we are setting the bootstrap servers, group ID, and key/value deserializers for the ConsumerFactory.
The kafkaListenerContainerFactory() method creates a KafkaListenerContainerFactory instance, which is used to create a Kafka listener container later on. In this example, we are setting the ConsumerFactory instance and concurrency level for the ConcurrentKafkaListenerContainerFactory.
Finally, the kafkaConsumer() method creates a KafkaConsumer instance using the ConsumerFactory instance's configuration properties.
Now that you have created the configuration class, you can create a Kafka listener method in a separate class. Here's an example:
@Component
public class KafkaConsumer {
    @KafkaListener(topics = "${kafka.consumer.topic}")
    public void receive(String message) {
        System.out.println("Received message: " + message);
    }
}
In this example, we are using Spring's @KafkaListener annotation to create a Kafka listener method. The topics property is set to the value of the kafka.consumer.topic property. The receive() method is called whenever a message is received on the specified topic.
That's it! You now have a working Spring Boot Kafka consumer.
@KafkaListener marks a class/method for creation of MessageListenerContainers
The ConcurrentKafkaListenerContainerFactory is used to create such containers so that Spring Boot applications can consume from Kafka in a thread-safe way.
Checking the most recent version of Spring Kafka, I noticed their example implementation for a listener is as follows:
public class Listener {
    @KafkaListener(id = "listen1", topics = "topic1")
    public void listen1(String in) {
        System.out.println(in);
    }
}
So it looks like this remains a relevant example even in 2023!
I like the discussion of threads etc.
Wow this helps.
Can someone show me NodeJs implementation for this?
Amazing breakdown of the implications of using @KafkaListener annotation in Spring Kafka.
I agree that it's best to separate listeners out into individual classes.
While you've clearly demonstrated that it's possible to do this, I think many would experience unintended consequences of messing around with multiple listeners in the same class etc.
amazing documentation thanks for this!
annotations save the day yet again...
The @KafkaListener is an alternative to configuring a MessageListenerContainer
There are two types of MessageListenerContainers: KafkaMessageListenerContainer and ConcurrentMessageListenerContainer.
The only difference between these two containers is that ConcurrentMessageListenerContainer allows you to define concurrency, i.e. the number of underlying listener container threads to use.
It's important to note that, by default, the @KafkaListener acknowledges offsets in batch mode. This matters when you do a lot of processing for each record you consume: if processing a batch takes too long between polls (e.g. 5+ minutes per record), your consumer can be considered dead, trigger a rebalance, and end up reprocessing the same messages!
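If per-record processing is slow, the usual mitigation is to fetch fewer records per poll and/or raise the poll interval. A sketch of how that might look in application.properties (max.poll.interval.ms passes through to the underlying Kafka consumer; the values are illustrative):

```properties
# Fetch fewer records per poll so each poll loop finishes sooner
spring.kafka.consumer.max-poll-records=50
# Allow up to 10 minutes between polls before the consumer is considered dead
spring.kafka.consumer.properties.max.poll.interval.ms=600000
```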