opencodez

Implement Custom Value Serializer for Kafka – Example With Source Code

In our last article on implementation of Apache Kafka, we have seen the basic Java client to produce and consume messages. To continue our learning lets see how we can send custom objects to Kafka topic. For that, we need to implement Custom Value Serializer for Kafka.

Implement Custom Value Serializer for Kafka:

You can send messages with different data types to Kafka topics. The Kafka deals with messages or records in the form of a byte array. If you have observed, both KafkaProducer and KafkaConsumer need a key and value serializer.  The default configuration for Producer

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Similarlly default for Consumer is

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

So one thing is clear that Kafka depends on Serializer and Deserializer so that Producer and Consumer both know how to communicate and understand the messages.

Now we will see how we can implement our own custom value serializer and deserializer to send and receive custom java objects from Kafka topics.

The easiest way to serialize your custom object is to convert to JSON format. To achieve this we will add the Jackson dependency to our Kafka project.

POM

xdependencyx
	xgroupIdxcom.fasterxml.jackson.corex/groupIdx
	xartifactIdxjackson-databindx/artifactIdx
x/dependencyx

The Domain Objects

We have defined simple domain objects that we are going to send over kafka topic.

Developer.class

public class Developer {

	private Long id;
	private String name;
	private BigDecimal salary;
	private Address address;
        
     //Getters and Setters beyond this point

}

Address.class

public class Address {

	private String state;
	private String country;
	private String zipcode;
   
    //Getters and Setters beyond this
}

Every custom serializer must implement Kafka Serializer interface. So below is ours.

public class DeveloperSerializer implements SerializerxDeveloperx {

	@Override
	public byte[] serialize(String arg0, Developer developer) {
		byte[] serializedBytes = null;
		ObjectMapper objectMapper = new ObjectMapper();
		try {
			serializedBytes = objectMapper.writeValueAsString(developer).getBytes();
		} catch (Exception e) {
			e.printStackTrace();
		}
		return serializedBytes;
	}

	@Override
	public void close() {
		// TODO Auto-generated method stub
	}

	@Override
	public void configure(MapxString, ?x arg0, boolean arg1) {
		// TODO Auto-generated method stub
	}
}

In above we have implemented the serialize method which is converting our custom object to json string and returning bytes to be sent to the topic.

In a similar fashion, all custom deserializers need to implement Kafka Deserializer interface. Our Jackson ObjectMapper provides us convenient method to convert byte[] to our custom object. Below is our deserializer.

public class DeveloperDeserializer implements DeserializerxDeveloperx {

	@Override
	public Developer deserialize(String arg0, byte[] devBytes) {
		ObjectMapper mapper = new ObjectMapper();
		Developer developer = null;
		try {
			developer = mapper.readValue(devBytes, Developer.class);
		} catch (Exception e) {

			e.printStackTrace();
		}
		return developer;
	}

	@Override
	public void close() {
		// TODO Auto-generated method stub

	}

	@Override
	public void configure(MapxString, ?x arg0, boolean arg1) {
		// TODO Auto-generated method stub

	}

}

So far we have defined all the required components for our custom value serializer for Apache Kafka. Lets put it to use. As we are using the same project from our previous article I have added some more command line parameters to consider.

Start the custom consumer

In our consumer code, we have to specify the custom deserializer. That we will do in the property map we set for KafkaConsumer

props.put("value.deserializer", "com.opencodez.serializer.DeveloperDeserializer");

The command to start our custom consumer thread is

kafka-demo-0.0.1-SNAPSHOT.jar --start.as=custom-consumer --topic=test

This will keep our consumer polling to the topic test and as soon as some message is posted, it will be printed in our log file.

Start the Custom Producer

In our producer code, we have to specify the custom serializer. That we will do in the property map we set for KafkaProducer

props.put("value.serializer", "com.opencodez.serializer.DeveloperSerializer");

Command to start producer

kafka-demo-0.0.1-SNAPSHOT.jar --start.as=custom-producer --topic=test --message="Pavan Solapure"

Above command will create a dummy developer object and post it to test topic. 

The Output

In the above log file snippet, you can see that our consumer was able to read the custom object successfully.

You can download the latest code from our git repository

Download from Git

x