Spring Boot Kafka Consumer JSON Example

If you're new to Kafka, be sure to check out Kafka architecture explained first. And if you're new to Spring Boot Kafka consumers, start with this Spring Boot Kafka consumer example...

Spring Boot Kafka Consumer JSON Example

pom.xml

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-test</artifactId>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka-test</artifactId>
  <scope>test</scope>
</dependency>

The above dependencies include everything you need to run Spring Kafka and test the example.

application.properties

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup

These are the only Kafka configuration properties we define in application.properties. While we've only specified the consumer group and broker list, you can optionally configure more properties here if you so choose...
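
For instance, one common addition is the offset-reset policy, which controls where a brand-new consumer group starts reading:

spring.kafka.consumer.auto-offset-reset=earliest

Keep in mind that this example builds its own ConsumerFactory below, so spring.kafka.consumer.* properties like this one only take effect through Spring Boot's auto-configured factory; in the Java config that follows, the equivalent would be props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest").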

User.java

package model;
public class User {
    private String firstName;
    private String lastName;
    public String getFirstName() {
        return firstName;
    }
    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }
    public String getLastName() {
        return lastName;
    }
    public void setLastName(String lastName) {
        this.lastName = lastName;
    }
    // Override toString() so the listener logs the fields instead of the
    // default Object representation (e.g. "model.User@1f2a3b")
    @Override
    public String toString() {
        return "User{firstName='" + firstName + "', lastName='" + lastName + "'}";
    }
}

This sets up our basic User model. We need it to tell our JsonDeserializer what type to deserialize incoming messages into.
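
Under the hood, the JsonDeserializer hands each record's bytes to Jackson, which binds them to this class through the default constructor and setters (which is why the model needs them). Here's a minimal, standalone sketch of that binding using Jackson's ObjectMapper directly, outside of Kafka; it assumes jackson-databind is on the classpath, which spring-kafka's JSON support already requires:

import com.fasterxml.jackson.databind.ObjectMapper;
import model.User;
public class UserBindingDemo {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // The same JSON-to-POJO binding the JsonDeserializer performs per record
        User user = mapper.readValue("{\"firstName\":\"Jane\",\"lastName\":\"Doe\"}", User.class);
        System.out.println(user); // User{firstName='Jane', lastName='Doe'}
    }
}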

KafkaConfig.java

package com.example.demo.config;
import model.User;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
@Configuration
public class KafkaConfig {
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.bootstrap-servers}")
    private String brokers;
    @Bean
    public ConsumerFactory<String, User> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(User.class));
    }
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, User>>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, User> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

This is our main configuration for the Kafka client. Notice the @EnableKafka annotation, which enables detection of our @KafkaListener methods, alongside the @Configuration class that registers our ConsumerFactory and ConcurrentKafkaListenerContainerFactory beans.

If you followed the previous example, you'll notice it didn't include any of this configuration. We need it here because we have to configure our JsonDeserializer.

By default, Spring Boot's Kafka support assumes the StringDeserializer when consuming messages. Since we're consuming JSON, we must explicitly pass the key and value deserializers when building the factory: new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(User.class)).

You'll also notice the references to User, the model representing the records we want to read from Kafka. We specify the User type both when defining the ConsumerFactory and the ConcurrentMessageListenerContainer.
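
One caveat worth flagging: when the producer is another Spring application that writes type information into message headers, the JsonDeserializer will only instantiate classes from packages it trusts. Here's a sketch of how you might relax that in consumerFactory() above (the "model" package matches our User class; this isn't needed for headerless messages like the ones the console producer sends below):

// Hypothetical variant of the return statement in consumerFactory():
// trust our model package so type headers can be resolved safely
JsonDeserializer<User> valueDeserializer = new JsonDeserializer<>(User.class);
valueDeserializer.addTrustedPackages("model");
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), valueDeserializer);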

Consumer.java

package com.example.demo.messaging;
import model.User;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class Consumer {
    @KafkaListener(topics = "test")
    public void processMessage(User user) {
        System.out.println("Message received by consumer 1: " + user.toString());
    }
}

This sets up a @KafkaListener similar to the previous example. The key difference is that processMessage() now accepts a User instead of a String: the JsonDeserializer we configured hands the listener a fully deserialized object.
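
If you also need record metadata such as the key, partition, or offset, a @KafkaListener method can accept the full ConsumerRecord instead of just the payload. A minimal sketch (RecordConsumer is a hypothetical name; the "test" topic is carried over from above):

package com.example.demo.messaging;
import model.User;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class RecordConsumer {
    // Receiving the whole ConsumerRecord exposes metadata alongside the deserialized User value
    @KafkaListener(topics = "test")
    public void processRecord(ConsumerRecord<String, User> record) {
        System.out.println("Received " + record.value()
                + " from partition " + record.partition()
                + " at offset " + record.offset());
    }
}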

Publishing JSON messages to the topic

kafka-console-producer json from file

If you're running Kafka locally, you can use the kafka-console-producer to read in a JSON file as a message. Create a sample JSON file with User data and run the kafka-console-producer like this:

./kafka-console-producer --bootstrap-server localhost:9092 --topic test < ~/path/to/test/file/testUser.json
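
The file contents just need to match the User model's fields. For example, testUser.json might contain:

{"firstName": "Jane", "lastName": "Doe"}

Note that kafka-console-producer treats each line of input as a separate message, so keep each JSON object on a single line.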

The "Infinite Loop" problem with invalid messages

The JsonDeserializer delegates to Jackson's ObjectMapper class to deserialize incoming JSON messages. If a message can't be deserialized, the application gets thrown into an infinite loop, repeatedly logging something like this:

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition test-0 at offset 87. If needed, please seek past the record to continue consumption.

The issue is that Spring has no way of recovering on its own: the failure happens inside the consumer's poll() call, before the record ever reaches the listener, so the consumer never advances past the bad record and fails on the same offset again and again.

How to fix the infinite loop problem with JSON and Kafka consumers

To fix this "infinite loop" problem, you can make use of the ErrorHandlingDeserializer...

// Requires: import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
@Bean
public ConsumerFactory<String, User> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    // Keys are still plain strings
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // Wrap the value deserializer so a bad record doesn't kill the consumer
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    // Delegate the actual JSON work to the JsonDeserializer
    props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
    props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, User.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

By simply updating our consumerFactory() with these properties, we can catch any deserialization-related errors and keep consuming messages.

Notice how we configure the value deserializer with ErrorHandlingDeserializer.class and then point its delegate at JsonDeserializer via ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS. When deserialization succeeds, the delegate's result flows through as usual; if something goes wrong, the exception is captured and the bad payload isn't sent to the listener.
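
If you'd rather not maintain the Java config at all, here's a sketch of an equivalent property-based setup; note these spring.kafka.consumer.* entries only take effect through Spring Boot's auto-configured ConsumerFactory, i.e. if you drop the custom consumerFactory() bean:

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=model.User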

Your thoughts?