Does anyone have issues with JSON Serializer in Kafka?

I am trying to use the JSON deserializer for Kafka (with Spring) and keep having issues. Specifically it seems to throw the service into an infinite loop resulting in out of memory error...

Any help greatly appreciated.

Your thoughts?

theCodingSavage |

The problem is this:

The JsonDeserializer class is based on the Jackson ObjectMapper. When Kafka tries to deserialize an incoming message to a topic, if there is an ObjectMapper related error then it will be thrown before the poll() method has a chance to return...

The solution is rather simple. You just configure your Spring Kafka to use the ErrorHandlingDeserializer. Here is an example of working configuration...

public ConsumerFactory<String, User> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
    props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, User.class);
    return new DefaultKafkaConsumerFactory<>(props);

You have to define a class for the ErrorHandlingDeserializer to delegate to. If something goes wrong, an exception will be handled (avoiding the infinite loop) and the message won't be sent to the listener.

EatFreshRupesh | 0

Yes blazing..this is a common problem with using the JsonDeserializer class provided by Kafka.

EatFreshRupesh | 0

Yes super annoying. I have to continuously manually reset offset to latest to get it to skip past the bad record and keep consuming. Not sustainable for sure. Any help is greatly appreciated on this.