Spring Boot Kafka Consumer


If you're brand new to Kafka, be sure to check out this intro to Kafka architecture first.

Spring Boot Kafka Consumer Example

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.4.2</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.example</groupId>
	<artifactId>demo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>demo</name>
	<description>Demo project for Spring Boot</description>
	<properties>
		<java.version>11</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>
		<dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-databind</artifactId>
			<version>2.10.0</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

The pom.xml defines the dependencies we need for our Maven build. Notice how we include the dependencies needed for Spring Boot along with those needed for running and testing Kafka.

Note that the jackson-databind dependency is included to work around serialization issues in older versions of the Spring Kafka library. You may not need to include it depending on which version of spring-kafka you're using.

DemoApplication.java

package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class DemoApplication {
	public static void main(String[] args) {
		SpringApplication.run(DemoApplication.class, args);
	}
}

The DemoApplication.java class runs our Spring Boot application.

Spring Boot automatically configures Kafka for us. Because @SpringBootApplication enables auto-configuration and spring-kafka is on the classpath, Spring Boot applies its KafkaAutoConfiguration class, which wires up the consumer infrastructure for you.

Consumer.java

package com.example.demo.messaging;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {
    @KafkaListener(topics = "test")
    public void processMessage(String content){
        System.out.println("Message received: " + content);
    }
}

The Consumer.java class is what actually listens for messages. Notice how we specify the topics to listen to within the @KafkaListener annotation.

By simply using the annotations provided by the Spring Boot Kafka library, we are able to easily set up a consumer for a given topic.

application.properties

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup

Thanks to auto-configuration, you only need to define a few things in application.properties to configure the consumer. Notice how our app is configured to use Kafka locally.
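If you need more control, auto-configuration exposes many more consumer settings under the same spring.kafka.consumer.* prefix. For example (the property names come from Spring Boot's common application properties; the values shown are illustrative):

```properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
# Read from the beginning of the topic when the group has no committed offset
spring.kafka.consumer.auto-offset-reset=earliest
# Explicit deserializers (StringDeserializer is already the default)
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
```

The auto-offset-reset setting is worth knowing about: without it, a brand new consumer group starts at the end of the topic and won't see messages produced before it first connected.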

Running the example...

This example is configured to consume from a local Kafka topic. You'll need to have Kafka installed and running locally to see messages being consumed from the Spring Boot app.

Starting Kafka from the CLI:

To start Kafka locally, you need to start both the Zookeeper and Kafka servers locally...

./zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties

This will start the zookeeper server with the default configurations. Please note the path to your local Kafka install may vary.

./kafka-server-start /usr/local/etc/kafka/server.properties 

This will start the Kafka server with the default configurations. Please note the path to your local Kafka install may vary.

Producing messages from the CLI:

To test the application consumer, you'll need to produce some messages for the test topic. To start a Kafka console producer:

./kafka-console-producer --bootstrap-server=localhost:9092 --topic test

This starts a Kafka producer for the test topic. Notice how both the bootstrap-server and topic arguments match what we've defined in our application.properties file.

Once the producer is up and running, you can send a message to the test topic like this:

> Hello world

If everything is working correctly, you should see the message being logged through the Spring application like this:

Message received: Hello world

Spring Boot Kafka Multiple Consumer Example

Spring Kafka allows multiple consumers to read from multiple topics within the same Spring Boot application. Since "multiple consumers" can mean different things, we'll look at a few different examples...

1) Multiple consumers in the same consumer group

@Component
public class Consumer {
    @KafkaListener(topics = "test", concurrency = "2", groupId = "myGroup")
    public void processMessage(String content) {
        System.out.println("Message received: " + content);
    }
}

This creates a SINGLE consumer group with two members. Notice how we specify concurrency = "2" on the @KafkaListener annotation. This will create 2 members within the same consumer group.

Also notice how the annotation references the topic test and groupId myGroup.

If our test topic has 3 partitions, 2 partitions will be assigned to the first member and 1 will be assigned to the second member.
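The 2/1 split can be sketched in plain Java. This is a simplified model of the range-assignment arithmetic, not the actual Kafka assignor: partitions are divided evenly, and the first (partitions % members) members each get one extra.

```java
import java.util.ArrayList;
import java.util.List;

public class RangeSplit {
    // Simplified model of range assignment for a single topic: each member
    // gets a contiguous block of partitions, and the first
    // (numPartitions % numMembers) members get one extra partition.
    static List<List<Integer>> assign(int numPartitions, int numMembers) {
        List<List<Integer>> result = new ArrayList<>();
        int perMember = numPartitions / numMembers;
        int extra = numPartitions % numMembers;
        int next = 0;
        for (int m = 0; m < numMembers; m++) {
            int count = perMember + (m < extra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int i = 0; i < count; i++) partitions.add(next++);
            result.add(partitions);
        }
        return result;
    }

    public static void main(String[] args) {
        // 3 partitions, 2 members: member 0 gets [0, 1], member 1 gets [2]
        System.out.println(assign(3, 2));
    }
}
```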

If we produce messages to the test topic, we will see output like this...

Message received: abc
Message received: 123
Message received: 345

Notice how these two members of a SINGLE consumer group work together to consume messages from the test topic.

2) Multiple consumer groups reading from the same topic

@Component
public class Consumer {
    @KafkaListener(topics = "test", groupId = "myGroup")
    public void processMessage(String content) {
        System.out.println("Message received by consumer 1: " + content);
    }
    @KafkaListener(topics = "test", groupId = "anotherGroup")
    public void processMessage2(String content) {
        System.out.println("Message received by consumer 2: " + content);
    }
}

This creates 2 consumer groups with one member each. Notice how both listener methods read from the same test topic but define a different groupId.

If our test topic has 3 partitions, 3 partitions will be assigned to the only member of myGroup and 3 partitions will be assigned to the only member of anotherGroup.

If we produce messages to the test topic, we will see output like this...

Message received by consumer 1: abc
Message received by consumer 2: abc
Message received by consumer 1: 123
Message received by consumer 2: 123

Notice how the same message is being consumed TWICE. This is because different groupId means different consumer group. This is a very important distinction from the first example where multiple members are working under a single consumer group.
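This fan-out behavior can be sketched with a toy model in plain Java (not the real broker logic): because each group tracks its own position in the topic, every message is delivered once per group.

```java
import java.util.ArrayList;
import java.util.List;

public class GroupFanOut {
    // Toy model of consumer-group fan-out: each group reads the topic
    // independently, so every message is delivered once to each group.
    static List<String> deliver(List<String> messages, List<String> groups) {
        List<String> log = new ArrayList<>();
        for (String message : messages) {
            for (String group : groups) {
                log.add(group + " received: " + message);
            }
        }
        return log;
    }

    public static void main(String[] args) {
        List<String> out = deliver(List.of("abc", "123"),
                                   List.of("myGroup", "anotherGroup"));
        out.forEach(System.out::println);
    }
}
```

Two messages and two groups produce four deliveries, which matches the doubled output above.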

3) Multiple consumer groups reading from different topics

@Component
public class Consumer {
    @KafkaListener(topics = "test", groupId = "myGroup")
    public void processMessage(String content) {
        System.out.println("Message received by consumer 1: " + content);
    }
    @KafkaListener(topics = "test2", groupId = "anotherGroup")
    public void processMessage2(String content) {
        System.out.println("Message received by consumer 2: " + content);
    }
}

This creates 2 separate consumer groups with one member each. The first group has a single member reading from the test topic. The second group has a single member reading from the test2 topic.

4) Multiple consumers in the same consumer group reading from different topics

@Component
public class Consumer {
    @KafkaListener(topics = {"test", "test2"}, groupId = "myGroup", concurrency = "2")
    public void processMessage(String content) {
        System.out.println("Message received by consumer 1: " + content);
    }
}

This creates a SINGLE consumer group with 2 members. Each member shares the responsibility of reading from both topics.

If both our test and test2 topics have 3 partitions, each member WILL NOT receive an equal number of partitions by default. One may receive 2 and another may receive 4. This is because the Kafka consumer's default partition assignment strategy is the RangeAssignor, which assigns partitions topic by topic rather than across all subscribed topics at once. You can optionally switch to a round-robin approach by specifying the RoundRobinAssignor in your configuration:

spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor

This will result in each member of the consumer group receiving an equal number of partitions...
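The difference between the two strategies can be sketched in plain Java. This is a simplified model of the assignment arithmetic (the real assignors also sort member ids, handle uneven subscriptions, etc.), but it shows why range assignment gives one member 4 partitions and the other 2, while round robin gives 3 each.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class AssignorSketch {
    // Range assignment works topic by topic: each topic's partitions are
    // split into contiguous blocks, and earlier members get the extras.
    static Map<String, List<String>> range(List<String> topics, int partitions, int members) {
        Map<String, List<String>> assignment = new TreeMap<>();
        for (int m = 0; m < members; m++) assignment.put("member" + m, new ArrayList<>());
        for (String topic : topics) {
            int per = partitions / members, extra = partitions % members, next = 0;
            for (int m = 0; m < members; m++) {
                int count = per + (m < extra ? 1 : 0);
                for (int i = 0; i < count; i++)
                    assignment.get("member" + m).add(topic + "-" + next++);
            }
        }
        return assignment;
    }

    // Round-robin assignment deals out all partitions across all topics
    // one at a time, so the totals stay balanced.
    static Map<String, List<String>> roundRobin(List<String> topics, int partitions, int members) {
        Map<String, List<String>> assignment = new TreeMap<>();
        for (int m = 0; m < members; m++) assignment.put("member" + m, new ArrayList<>());
        int i = 0;
        for (String topic : topics)
            for (int p = 0; p < partitions; p++)
                assignment.get("member" + (i++ % members)).add(topic + "-" + p);
        return assignment;
    }

    public static void main(String[] args) {
        List<String> topics = List.of("test", "test2");
        System.out.println(range(topics, 3, 2));      // member0 gets 4, member1 gets 2
        System.out.println(roundRobin(topics, 3, 2)); // 3 partitions each
    }
}
```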

An important note...

While these examples demonstrate how the @KafkaListener annotation can be used with multiple consumer groups, best practice is to separate your consumers into separate classes. In other words, a given consumer class should never have more than one @KafkaListener.

Your thoughts?