Implement Custom Value Serializer for Kafka – Example With Source Code

In our last article on implementation of Apache Kafka, we have seen the basic Java client to produce and consume messages. To continue our learning lets see how we can send custom objects to Kafka topic. For that, we need to implement Custom Value Serializer for Kafka.

Implement Custom Value Serializer for Kafka:

You can send messages with different data types to Kafka topics. The Kafka deals with messages or records in the form of a byte array. If you have observed, both KafkaProducer and KafkaConsumer need a key and value serializer.  The default configuration for Producer

Similarlly default for Consumer is

So one thing is clear that Kafka depends on Serializer and Deserializer so that Producer and Consumer both know how to communicate and understand the messages.

Now we will see how we can implement our own custom value serializer and deserializer to send and receive custom java objects from Kafka topics.

The easiest way to serialize your custom object is to convert to JSON format. To achieve this we will add the Jackson dependency to our Kafka project.

POM

The Domain Objects

We have defined simple domain objects that we are going to send over kafka topic.

Developer.class

Address.class

Every custom serializer must implement Kafka Serializer interface. So below is ours.

In above we have implemented the serialize method which is converting our custom object to json string and returning bytes to be sent to the topic.

In a similar fashion, all custom deserializers need to implement Kafka Deserializer interface. Our Jackson ObjectMapper provides us convenient method to convert byte[] to our custom object. Below is our deserializer.

So far we have defined all the required components for our custom value serializer for Apache Kafka. Lets put it to use. As we are using the same project from our previous article I have added some more command line parameters to consider.

Start the custom consumer

In our consumer code, we have to specify the custom deserializer. That we will do in the property map we set for KafkaConsumer

The command to start our custom consumer thread is

This will keep our consumer polling to the topic test and as soon as some message is posted, it will be printed in our log file.

Start the Custom Producer

In our producer code, we have to specify the custom serializer. That we will do in the property map we set for KafkaProducer

Command to start producer

Above command will create a dummy developer object and post it to test topic. 

The Output

In the above log file snippet, you can see that our consumer was able to read the custom object successfully.

You can download the latest code from our git repository

Download from Git

 

Add a Comment

Your email address will not be published. Required fields are marked *