Spring Boot Kafka Consumer

If you're brand new to Kafka, be sure to check out this intro to Kafka architecture first.

Spring Boot Kafka Consumer Example

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.4.2</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.example</groupId>
	<artifactId>demo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>demo</name>
	<description>Demo project for Spring Boot</description>
	<properties>
		<java.version>11</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>
		<dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-databind</artifactId>
			<version>2.10.0</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

The pom.xml defines the dependencies we need for our Maven build. Notice how we include the dependencies needed for Spring Boot along with those needed for running and testing Kafka.

Note that the jackson-databind dependency is included to account for bugs in older versions of the Spring Kafka library. You may not need it, depending on which version of Spring Kafka you're using.

DemoApplication.java

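// Assumed package: it must be a parent of com.example.demo.messaging
// so that component scanning picks up the Consumer class.
package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication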
public class DemoApplication {
	public static void main(String[] args) {
		SpringApplication.run(DemoApplication.class, args);
	}
}

The DemoApplication.java class runs our Spring Boot application.

Spring Boot automatically configures Kafka via @SpringBootApplication. This annotation enables auto-configuration, and because spring-kafka is on the classpath, Spring Boot applies its KafkaAutoConfiguration class for you.

Consumer.java

package com.example.demo.messaging;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class Consumer {
    @KafkaListener(topics = "test")
    public void processMessage(String content){
        System.out.println("Message received: " + content);
    }
}

The Consumer.java class is what actually listens for messages. Notice how we specify the topics to listen to within the @KafkaListener annotation.

By simply using the annotations provided by the Spring Kafka library, we are able to easily set up a consumer for a given topic.

application.properties

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup

Thanks to auto-configuration, you only need to define a few things in application.properties to configure the consumer. Notice how our app is configured to use Kafka locally.

Running the example...

This example is configured to consume from a local Kafka topic. You'll need to have Kafka installed and running locally to see messages being consumed from the Spring Boot app.

Starting Kafka from the CLI:

To run Kafka locally, you need to start both the ZooKeeper server and the Kafka server...

./zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

This will start the ZooKeeper server with the default configurations. Please note the path to your local Kafka install may vary.

./kafka-server-start /usr/local/etc/kafka/server.properties 

This will start the Kafka server with the default configurations. Please note the path to your local Kafka install may vary.

Producing messages from the CLI:

To test the application consumer, you'll need to produce some messages for the test topic. To start a Kafka console producer:

./kafka-console-producer --bootstrap-server=localhost:9092 --topic test

This starts a Kafka producer for the test topic. Notice how the bootstrap-server argument matches what we've defined in our application.properties file, and the topic argument matches the topic named in the @KafkaListener annotation.

Once the producer is up and running, you can send a message to the test topic like this:

> Hello world

If everything is working correctly, you should see the message being logged through the Spring application like this:

Message received: Hello world

Spring Boot Kafka Multiple Consumer Example

Spring Kafka allows multiple consumers to read from multiple topics within the same Spring Boot application. Since "multiple consumers" can mean different things, we'll look at a few different examples...

1) Multiple consumers in the same consumer group

@Component
public class Consumer {
    @KafkaListener(topics = "test", concurrency = "2", groupId = "myGroup")
    public void processMessage(String content) {
        System.out.println("Message received: " + content);
    }
}

This creates a SINGLE consumer group with two members. Notice how we specify concurrency = "2" on the @KafkaListener annotation. This will create 2 members within the same consumer group.

Also notice how the annotation references the topic test and groupId myGroup.

If our test topic has 3 partitions, 2 partitions will be assigned to the first member and 1 will be assigned to the second member.
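
The 2-and-1 split above follows from simple arithmetic. As a rough sketch only (this is plain Java written for illustration, not Kafka's own assignor code, and the class and method names are made up), the partitions of one topic can be divided like this:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: mirrors the arithmetic of a range-style split for ONE topic.
// It does not use Kafka classes and does not talk to a broker.
class RangeAssignmentSketch {

    // Returns, for each member (by index), the partition numbers it would own.
    static List<List<Integer>> assign(int partitions, int members) {
        int base = partitions / members;   // every member gets at least this many
        int extra = partitions % members;  // the first `extra` members get one more
        List<List<Integer>> result = new ArrayList<>();
        int next = 0;
        for (int m = 0; m < members; m++) {
            int count = base + (m < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // 3 partitions, 2 members (concurrency = "2")
        System.out.println(assign(3, 2)); // prints [[0, 1], [2]]
    }
}
```

With 3 partitions and 2 members, the first member ends up with partitions 0 and 1 and the second with partition 2.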

If we produce messages to the test topic, we will see output like this...

Message received: abc
Message received: 123
Message received: 345

Notice how these two members of a SINGLE consumer group work together to consume messages from the test topic.

2) Multiple consumer groups reading from the same topic

@Component
public class Consumer {
    @KafkaListener(topics = "test", groupId = "myGroup")
    public void processMessage(String content) {
        System.out.println("Message received by consumer 1: " + content);
    }
    @KafkaListener(topics = "test", groupId = "anotherGroup")
    public void processMessage2(String content) {
        System.out.println("Message received by consumer 2: " + content);
    }
}

This creates 2 consumer groups with one member each. Notice how both listener methods read from the same test topic but define a different groupId.

If our test topic has 3 partitions, all 3 partitions will be assigned to the only member of myGroup and all 3 partitions will also be assigned to the only member of anotherGroup.

If we produce messages to the test topic, we will see output like this...

Message received by consumer 1: abc
Message received by consumer 2: abc
Message received by consumer 1: 123
Message received by consumer 2: 123

Notice how the same message is being consumed TWICE. This is because a different groupId means a different consumer group, and every consumer group receives its own copy of each message. This is a very important distinction from the first example, where multiple members work together under a single consumer group.
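
One way to picture this is that each consumer group keeps its own read position in the topic. The toy model below is only a sketch under that assumption: it uses an in-memory list instead of Kafka, and the class and method names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model: one in-memory log plus one read position per group id.
// This is NOT the Kafka client API, just an illustration of independent positions.
class ConsumerGroupSketch {

    private final List<String> log = new ArrayList<>();
    private final Map<String, Integer> positions = new HashMap<>();

    void produce(String message) {
        log.add(message);
    }

    // Returns everything the given group has not read yet,
    // then advances the position of that group only.
    List<String> poll(String groupId) {
        int from = positions.getOrDefault(groupId, 0);
        List<String> batch = new ArrayList<>(log.subList(from, log.size()));
        positions.put(groupId, log.size());
        return batch;
    }

    public static void main(String[] args) {
        ConsumerGroupSketch topic = new ConsumerGroupSketch();
        topic.produce("abc");
        topic.produce("123");
        System.out.println("myGroup: " + topic.poll("myGroup"));           // prints myGroup: [abc, 123]
        System.out.println("anotherGroup: " + topic.poll("anotherGroup")); // prints anotherGroup: [abc, 123]
        System.out.println("myGroup again: " + topic.poll("myGroup"));     // prints myGroup again: []
    }
}
```

Both groups see abc and 123, while a second poll by myGroup returns nothing new because only its own position has moved.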

3) Multiple consumer groups reading from different topics

@Component
public class Consumer {
    @KafkaListener(topics = "test", groupId = "myGroup")
    public void processMessage(String content) {
        System.out.println("Message received by consumer 1: " + content);
    }
    @KafkaListener(topics = "test2", groupId = "anotherGroup")
    public void processMessage2(String content) {
        System.out.println("Message received by consumer 2: " + content);
    }
}

This creates 2 separate consumer groups with one member each. The first group has a single member reading from the test topic. The second group has a single member reading from the test2 topic.

4) Multiple consumers in the same consumer group reading from different topics

@Component
public class Consumer {
    @KafkaListener(topics = {"test","test2"}, groupId = "myGroup", concurrency = "2")
    public void processMessage(String content) {
        System.out.println("Message received by consumer 1: " + content);
    }
}

This creates a SINGLE consumer group with 2 members. The members share the responsibility of reading from both topics.

If both our test and test2 topics have 3 partitions, each member WILL NOT receive an equal number of partitions by default. One receives 4 and the other receives 2. This is because the default partition assignor for Kafka is the RangeAssignor, which divides each topic's partitions separately, so the same member picks up the extra partition from both topics. You can optionally change partition assignment to a round robin approach by specifying the RoundRobinAssignor in your configuration:

spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor

This will result in each member of the consumer group receiving an equal number of partitions...
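
To see where the 4/2 and 3/3 splits come from, here is a small plain-Java sketch of the two strategies. It is only an approximation of the ideas behind RangeAssignor and RoundRobinAssignor, not their actual code, and the class and method names are hypothetical. It also assumes every member subscribes to both topics.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative comparison of range-style and round-robin-style assignment.
// Plain Java only; no Kafka classes are involved.
class AssignmentComparisonSketch {

    // Range-style: each topic is split on its own, and the leftover
    // partition(s) of every topic go to the lowest-indexed members.
    static List<List<String>> range(List<String> topics, int partitionsPerTopic, int members) {
        List<List<String>> result = emptyLists(members);
        for (String topic : topics) {
            int base = partitionsPerTopic / members;
            int extra = partitionsPerTopic % members;
            int next = 0;
            for (int m = 0; m < members; m++) {
                int count = base + (m < extra ? 1 : 0);
                for (int i = 0; i < count; i++) {
                    result.get(m).add(topic + "-" + next++);
                }
            }
        }
        return result;
    }

    // Round-robin-style: all topic-partitions form one list that is
    // dealt out to the members in turn.
    static List<List<String>> roundRobin(List<String> topics, int partitionsPerTopic, int members) {
        List<List<String>> result = emptyLists(members);
        int slot = 0;
        for (String topic : topics) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                result.get(slot++ % members).add(topic + "-" + p);
            }
        }
        return result;
    }

    private static List<List<String>> emptyLists(int n) {
        List<List<String>> lists = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            lists.add(new ArrayList<>());
        }
        return lists;
    }

    public static void main(String[] args) {
        List<String> topics = List.of("test", "test2");
        // prints [[test-0, test-1, test2-0, test2-1], [test-2, test2-2]]
        System.out.println(range(topics, 3, 2));
        // prints [[test-0, test-2, test2-1], [test-1, test2-0, test2-2]]
        System.out.println(roundRobin(topics, 3, 2));
    }
}
```

The range-style split hands the first member 4 partitions and the second 2, while the round-robin-style split gives each member 3.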

An important note...

While these examples demonstrate how the @KafkaListener annotation can be used with multiple consumer groups, best practice is to separate your consumers into separate classes. In other words, a given consumer class should never have more than one @KafkaListener.

Your thoughts?

|

The @KafkaListener is an alternative to configuring a MessageListenerContainer

There are two types of MessageListenerContainers...

  1. KafkaMessageListenerContainer receives all messages from all topics on a single thread
  2. ConcurrentMessageListenerContainer delegates to one or more KafkaMessageListenerContainer instances to allow multithreaded consumption

The only difference between these two containers is that ConcurrentMessageListenerContainer allows you to define concurrency or number of threads / listener containers to use.

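It's important to note that, by default, the @KafkaListener container commits offsets in batch mode, that is, after all the records returned by a poll have been processed. This is important to consider when you do a lot of processing for each record you consume. If working through the records from one poll takes longer than max.poll.interval.ms (5 minutes by default), the consumer is considered failed, its partitions are reassigned, and the uncommitted messages are processed again!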

|

This article does a really good job explaining how Spring Boot Kafka consumer implementation works. Some things I would like to add...

While this article shows you what is possible with the @KafkaListener annotation, it's also good to emphasize what is considered best practice when implementing a consumer in Spring Boot.

For example, most @KafkaListener implementations tend to listen to a single topic. This is from my own experience but seems to be a byproduct of obsessing over "decoupled" / "microservice" based architecture. Nevertheless, applications typically read from a single topic and perform a very specific task with that data.

Not to say this is the best way of doing things, but just know what's out there in production environments.

|

I think understanding Kafka consumer group IDs is key to understanding how you can use the @KafkaListener annotation.

It's important to remember that @KafkaListener just sets up a consumer group. This group needs to know what topics to listen to and how many members to include. The concurrency argument tells your application how many individual consumer threads will listen to the partitions for the provided topics.

|

ChatGPT Response:

To create a Spring Boot Kafka consumer, you will need to add the following dependencies to your pom.xml file:

<dependencies>
	<dependency>
		<groupId>org.springframework.kafka</groupId>
		<artifactId>spring-kafka</artifactId>
	</dependency>
</dependencies>

Next, you will need to create a configuration class for the Kafka consumer. Here's an example:

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(1);
        return factory;
    }

    @Bean
    public KafkaConsumer<String, String> kafkaConsumer() {
        return new KafkaConsumer<>(consumerFactory().getConfigurationProperties());
    }
}

In this example, we are using Spring's @EnableKafka annotation to enable the Kafka listener container. We are also injecting two properties using the @Value annotation: kafka.bootstrap-servers and kafka.consumer.group-id.

The consumerFactory() method creates a ConsumerFactory instance, which is used to create a KafkaConsumer instance later on. In this example, we are setting the bootstrap servers, group ID, and key/value deserializers for the ConsumerFactory.

The kafkaListenerContainerFactory() method creates a KafkaListenerContainerFactory instance, which is used to create a Kafka listener container later on. In this example, we are setting the ConsumerFactory instance and concurrency level for the ConcurrentKafkaListenerContainerFactory.

Finally, the kafkaConsumer() method creates a KafkaConsumer instance using the ConsumerFactory instance's configuration properties.

Now that you have created the configuration class, you can create a Kafka listener method in a separate class. Here's an example:

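// Renamed from KafkaConsumer so that this class's default bean name does not
// clash with the kafkaConsumer bean defined in the configuration class.
@Component
public class KafkaMessageReceiver {

    @KafkaListener(topics = "${kafka.consumer.topic}")
    public void receive(String message) {
        System.out.println("Received message: " + message);
    }
}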

In this example, we are using Spring's @KafkaListener annotation to create a Kafka listener method. The topics attribute uses a property placeholder, so it resolves to the value of the kafka.consumer.topic property. That property is not injected by the configuration class, so it needs to be defined alongside kafka.bootstrap-servers and kafka.consumer.group-id. The receive() method is called whenever a message is received on the specified topic.

That's it! You now have a working Spring Boot Kafka consumer.

|

@KafkaListener marks a class/method for creation of MessageListenerContainers.

The ConcurrentKafkaListenerContainerFactory is used to create such containers so that Spring Boot applications can consume from Kafka in a thread-safe way.

|

Checking the most recent version of Spring Kafka, I noticed their example implementation for a listener is as follows:

public class Listener {
    @KafkaListener(id = "listen1", topics = "topic1")
    public void listen1(String in) {
        System.out.println(in);
    }
}

So it looks like this remains a relevant example even in 2023!

I like the discussion of threads etc.

|

Wow this helps.

|

Can someone show me a Node.js implementation for this?

|

Amazing breakdown of the implications of using @KafkaListener annotation in Spring Kafka.

I agree that it's best to separate listeners out into individual classes.

While you've clearly demonstrated that it's possible to do this, I think many would experience unintended consequences of messing around with multiple listeners in the same class etc.

|

amazing documentation thanks for this!

|

annotations save the day yet again...