HowTo: Pool Kryo Instances to Serialize Java Objects in a Multithreaded Environment

Kryo is a Java object serialization framework. Because it is faster and more efficient than Java’s native serialization, it is widely used, for instance in Spark and Flink, which process data in a distributed manner. There are many serialization frameworks to choose from, but if you only need to serialize objects within your Java applications, Kryo is a good choice.
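For comparison, here is roughly what a round trip through Java’s native serialization looks like, using only the JDK. `NativeSerialization` is a hypothetical helper name for this sketch. Note that native serialization only accepts classes implementing `java.io.Serializable`, a requirement Kryo does not impose.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Round trip through Java's built-in serialization (java.io), for comparison with Kryo.
final class NativeSerialization {
    private NativeSerialization() {}

    // Only objects implementing java.io.Serializable can be written this way.
    static byte[] serialize(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } // closing the stream flushes everything into 'bytes'
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ArrayList<String> list = new ArrayList<>(List.of("any-value1", "any-value2"));
        byte[] bytes = serialize(list);
        Object copy = deserialize(bytes);
        System.out.println(copy.equals(list)); // true: an equal but separate ArrayList
    }
}
```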

When using Redis or Memcached as a cache, I have used Kryo to serialize Java objects and stored the resulting byte arrays in Redis or Memcached.

In a multithreaded, concurrent environment, Kryo instances should be pooled and reused. Let’s see how to do so.

Kryo Pooling

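A Kryo instance is not thread-safe, so threads must not use the same instance at the same time. On the other hand, creating a new Kryo instance for every serialization in every thread is expensive. Pooling solves both problems: each thread borrows an instance, uses it exclusively, and returns it to the pool for reuse.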
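To make the mechanism concrete, here is a minimal JDK-only pool: idle instances wait in a thread-safe queue, and `borrow()` creates a new instance only when the queue is empty. It is an illustration, not Kryo’s `KryoPool` implementation. `SimplePool` is a hypothetical name, and `StringBuilder` stands in for `Kryo` so the sketch runs without the Kryo dependency.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Minimal unbounded pool for objects that are costly to create and not thread-safe.
// A borrowed instance belongs to one thread until that thread releases it.
final class SimplePool<T> {
    private final Queue<T> idle = new ConcurrentLinkedQueue<>(); // thread-safe queue
    private final Supplier<T> factory;
    private final AtomicInteger created = new AtomicInteger();

    SimplePool(Supplier<T> factory) {
        this.factory = factory;
    }

    // Take an idle instance if one exists; otherwise create a new one.
    T borrow() {
        T instance = idle.poll();
        if (instance == null) {
            instance = factory.get();
            created.incrementAndGet();
        }
        return instance;
    }

    // Give the instance back so any thread can reuse it.
    void release(T instance) {
        idle.offer(instance);
    }

    int createdCount() {
        return created.get();
    }

    public static void main(String[] args) {
        // StringBuilder stands in for Kryo: reusable, but unsafe to share between threads.
        SimplePool<StringBuilder> pool = new SimplePool<>(StringBuilder::new);
        StringBuilder a = pool.borrow();
        pool.release(a);
        StringBuilder b = pool.borrow();
        System.out.println(a == b);              // true: the idle instance was reused
        System.out.println(pool.createdCount()); // 1
    }
}
```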

I will show you how to pool Kryo instances and how to serialize and deserialize Java objects with the Kryo pool.

Let’s define an interface:

public interface KryoContext {

    byte[] serialze(Object obj);

    byte[] serialze(Object obj, int bufferSize);

    Object deserialze(Class clazz, byte[] serialized);
}

This interface serializes a Java object to a byte array and deserializes a byte array back to a Java object.

Let’s add the following default implementation of KryoContext:

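import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.pool.KryoFactory;
import com.esotericsoftware.kryo.pool.KryoPool;

// Uses the KryoPool API of Kryo 3.x/4.x; Kryo 5 replaced it with com.esotericsoftware.kryo.util.Pool.
public class DefaultKryoContext implements KryoContext {

    // Initial output buffer size (100 KB).
    private static final int DEFAULT_BUFFER = 1024 * 100;

    private final KryoPool pool;

    public static KryoContext newKryoContextFactory(KryoClassRegistrator registrator)
    {
        return new DefaultKryoContext(registrator);
    }

    private DefaultKryoContext(KryoClassRegistrator registrator)
    {
        KryoFactory factory = new KryoFactoryImpl(registrator);

        // Soft references let the GC reclaim idle pooled Kryo instances under memory pressure.
        pool = new KryoPool.Builder(factory).softReferences().build();
    }

    // Creates a Kryo instance and registers the application's classes on it.
    private static class KryoFactoryImpl implements KryoFactory
    {
        private final KryoClassRegistrator registrator;

        public KryoFactoryImpl(KryoClassRegistrator registrator)
        {
            this.registrator = registrator;
        }

        @Override
        public Kryo create() {
            Kryo kryo = new Kryo();
            registrator.register(kryo);

            return kryo;
        }
    }

    @Override
    public byte[] serialze(Object obj)
    {
        return serialze(obj, DEFAULT_BUFFER);
    }

    @Override
    public byte[] serialze(Object obj, int bufferSize)
    {
        // bufferSize is the initial capacity; maxBufferSize -1 lets the buffer grow,
        // so objects larger than bufferSize are written completely.
        Output output = new Output(bufferSize, -1);

        Kryo kryo = pool.borrow();
        try
        {
            kryo.writeObject(output, obj);
            return output.toBytes();
        }
        finally
        {
            // Return the instance to the pool even if serialization throws.
            pool.release(kryo);
        }
    }

    @Override
    public Object deserialze(Class clazz, byte[] serialized)
    {
        Kryo kryo = pool.borrow();
        try (Input input = new Input(serialized))
        {
            return kryo.readObject(input, clazz);
        }
        finally
        {
            pool.release(kryo);
        }
    }

}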

With DefaultKryoContext, Kryo instances are pooled, and Java objects can be serialized and deserialized using this Kryo pool.
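The constructor builds the pool with `softReferences()`. In Kryo’s `KryoPool.Builder`, that option keeps idle pooled instances behind soft references, so the garbage collector may reclaim them when memory runs low. A JDK-only sketch of the idea follows; `SoftPool` is a hypothetical class, not Kryo’s implementation, and `StringBuilder` stands in for `Kryo`.

```java
import java.lang.ref.SoftReference;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;

// Pool whose idle instances are held only through soft references, so the GC
// may reclaim them under memory pressure. A borrowed instance is strongly
// referenced by its borrower and cannot be reclaimed while in use.
final class SoftPool<T> {
    private final Queue<SoftReference<T>> idle = new ConcurrentLinkedQueue<>();
    private final Supplier<T> factory;

    SoftPool(Supplier<T> factory) {
        this.factory = factory;
    }

    T borrow() {
        SoftReference<T> ref;
        // Skip references the garbage collector has already cleared.
        while ((ref = idle.poll()) != null) {
            T instance = ref.get();
            if (instance != null) {
                return instance;
            }
        }
        return factory.get(); // pool empty, or every idle instance was reclaimed
    }

    void release(T instance) {
        idle.offer(new SoftReference<>(instance));
    }

    public static void main(String[] args) {
        SoftPool<StringBuilder> pool = new SoftPool<>(StringBuilder::new);
        StringBuilder sb = pool.borrow();
        pool.release(sb);
        // Usually the same instance, unless the GC cleared it in the meantime.
        System.out.println(pool.borrow() == sb);
    }
}
```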

As the serialze method shows, a Kryo instance is borrowed from the pool to serialize a Java object and released back to the pool once serialization finishes.
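Every borrow must be matched by a release. If the release is not in a `finally` block, an exception thrown during serialization leaves the instance checked out, and the pool simply creates new instances in its place. A common way to make this impossible to forget is a helper that borrows, runs a callback, and always releases; Kryo 3.x/4.x’s `KryoPool` offers a `run(KryoCallback)` method in this spirit. Below is a JDK-only sketch of the pattern, where `CallbackPool` is hypothetical and `StringBuilder` stands in for `Kryo`.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;
import java.util.function.Supplier;

// Pool with a run() helper that pairs every borrow with a release.
final class CallbackPool<T> {
    private final Queue<T> idle = new ConcurrentLinkedQueue<>();
    private final Supplier<T> factory;

    CallbackPool(Supplier<T> factory) {
        this.factory = factory;
    }

    T borrow() {
        T instance = idle.poll();
        return instance != null ? instance : factory.get();
    }

    void release(T instance) {
        idle.offer(instance);
    }

    // Borrow an instance, apply the callback, and release the instance in a
    // finally block, so it returns to the pool even when the callback throws.
    <R> R run(Function<T, R> callback) {
        T instance = borrow();
        try {
            return callback.apply(instance);
        } finally {
            release(instance);
        }
    }

    int idleCount() {
        return idle.size();
    }

    public static void main(String[] args) {
        CallbackPool<StringBuilder> pool = new CallbackPool<>(StringBuilder::new);
        try {
            pool.run(sb -> {
                throw new IllegalStateException("serialization failed");
            });
        } catch (IllegalStateException e) {
            System.out.println("caught: " + e.getMessage()); // caught: serialization failed
        }
        System.out.println(pool.idleCount()); // 1: the instance went back to the pool
    }
}
```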

Before using this Kryo pool to serialize Java objects, let’s define a registrator interface for the Java classes that should be registered with Kryo:

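import com.esotericsoftware.kryo.Kryo;

// Registers the classes that a newly created Kryo instance should know about.
public interface KryoClassRegistrator {

    void register(Kryo kryo);

}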
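Registration matters beyond performance. When `register(Class)` is called without an explicit ID, Kryo assigns the next available integer ID, and that ID, not the class name, is what ends up in the bytes. Every process that reads the bytes, for example another application instance sharing the same Redis cache, therefore needs to register the same classes in the same order, or pass fixed IDs with `register(Class, int)`. The toy model below shows why order matters; `ToyClassRegistry` is hypothetical and ignores the IDs Kryo reserves for built-in types.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

// Toy model of class registration: each newly registered class receives the
// next integer ID, and a serializer would write that ID instead of the class name.
final class ToyClassRegistry {
    private final Map<Class<?>, Integer> ids = new HashMap<>();
    private int nextId = 0;

    void register(Class<?> type) {
        if (!ids.containsKey(type)) {
            ids.put(type, nextId++); // re-registering keeps the original ID
        }
    }

    int idOf(Class<?> type) {
        Integer id = ids.get(type);
        if (id == null) {
            throw new IllegalArgumentException("Class is not registered: " + type.getName());
        }
        return id;
    }

    public static void main(String[] args) {
        ToyClassRegistry writer = new ToyClassRegistry();
        writer.register(ArrayList.class);
        writer.register(HashMap.class);

        ToyClassRegistry reader = new ToyClassRegistry();
        reader.register(HashMap.class); // same classes, different order
        reader.register(ArrayList.class);

        System.out.println(writer.idOf(ArrayList.class)); // 0
        System.out.println(reader.idOf(ArrayList.class)); // 1: the reader would map ID 0 to HashMap
    }
}
```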

Now you can create a KryoContext instance for serialization and deserialization like this:

KryoContext kryoContext = DefaultKryoContext.newKryoContextFactory(kryo -> {
    kryo.register(ArrayList.class);
    kryo.register(HashMap.class);
});

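Once the Java classes to be serialized in this context are registered with Kryo, the Kryo pool in KryoContext is ready to use. Classes that are not registered, such as the User class below, can still be serialized as long as Kryo does not require registration (the default before Kryo 5), but Kryo then writes their fully qualified class names, rather than compact integer IDs, wherever it needs to record an object’s class.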

Serialize Java Objects

Let’s look at some examples of serializing Java objects with the kryoContext instance created above.

Serialize a List of Maps:

List<Map<String, Object>> list = new ArrayList<>();
for(int k = 0; k < 3; k++)
{
    Map<String, Object> map = new HashMap<>();
    map.put("any-prop1", "any-value1-" + k);
    map.put("any-prop2", "any-value2-" + k);
    map.put("any-prop3", "any-value3-" + k);

    list.add(map);
}
// serialize list.
byte[] listBytes = kryoContext.serialze(list);

Serialize a List of Users:

private static class User {
    private String name;

    private int age;

    private String address;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }
}

// ...

List<User> userList = new ArrayList<>();
for(int k = 0; k < 3; k++)
{
    User user = new User();
    user.setName("any-name" + k);
    user.setAge(50 + k);
    user.setAddress("any-address..." + k);

    userList.add(user);
}

// serialize user list.
byte[] userListBytes = kryoContext.serialze(userList);

Serialize a Map:

Map<String, Object> map = new HashMap<>();
map.put("any-prop1", "any-value1");
map.put("any-prop2", "any-value2");
map.put("any-prop3", "any-value3");

// serialize map.
byte[] mapBytes = kryoContext.serialze(map);

Serialize a User object:

User user = new User();
user.setName("any-name");
user.setAge(50);
user.setAddress("any-address...");

// serialize user object.
byte[] userBytes = kryoContext.serialze(user);

The default buffer size is 100 KB; a different size can be passed when serializing:

int moreBuffer = 200 * 1024;
byte[] largerBufferListBytes = kryoContext.serialze(list, moreBuffer);
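Whether the buffer size is a starting size or a hard limit depends on how the Kryo `Output` is created: `new Output(bufferSize)` cannot grow past `bufferSize` and throws a `KryoException` on overflow, while `new Output(bufferSize, -1)` starts at `bufferSize` and grows without limit. The JDK-only toy below models those two behaviors; `GrowableBuffer` is hypothetical and is not Kryo’s `Output` class.

```java
import java.util.Arrays;

// Toy model of an output buffer: it starts at initialSize bytes, grows by doubling
// (or straight to the required size, if larger), and never exceeds maxSize
// (-1 means no limit).
final class GrowableBuffer {
    private byte[] buffer;
    private int position;
    private final int maxSize;

    GrowableBuffer(int initialSize, int maxSize) {
        this.buffer = new byte[initialSize];
        this.maxSize = maxSize;
    }

    void write(byte[] bytes) {
        ensureCapacity(position + bytes.length);
        System.arraycopy(bytes, 0, buffer, position, bytes.length);
        position += bytes.length;
    }

    private void ensureCapacity(int required) {
        if (required <= buffer.length) {
            return;
        }
        if (maxSize != -1 && required > maxSize) {
            throw new IllegalStateException("Buffer overflow. Max: " + maxSize + ", required: " + required);
        }
        int newSize = Math.max(buffer.length * 2, required);
        if (maxSize != -1) {
            newSize = Math.min(newSize, maxSize);
        }
        buffer = Arrays.copyOf(buffer, newSize);
    }

    // Copy of only the bytes written so far.
    byte[] toBytes() {
        return Arrays.copyOf(buffer, position);
    }

    public static void main(String[] args) {
        GrowableBuffer growable = new GrowableBuffer(4, -1); // like new Output(4, -1)
        growable.write(new byte[10]);
        System.out.println(growable.toBytes().length); // 10

        GrowableBuffer fixed = new GrowableBuffer(4, 4); // like new Output(4)
        try {
            fixed.write(new byte[10]);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```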

To deserialize the byte arrays serialized above:

// deserialize list.
List<Map<String, Object>> retList =
(List<Map<String, Object>>)kryoContext.deserialze(ArrayList.class, listBytes);

// deserialize user list.
List<User> retUserList = (List<User>)kryoContext.deserialze(ArrayList.class, userListBytes);

// deserialize map.
Map<String, Object> retMap =
(Map<String, Object>)kryoContext.deserialze(HashMap.class, mapBytes);

// deserialize user object.
User retUser = (User)kryoContext.deserialze(User.class, userBytes);
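The casts above are unchecked. A small typed wrapper can move the check into one place: `Class.cast` verifies the runtime type and throws `ClassCastException` on a mismatch. `TypedDeserializer` is a hypothetical helper; with the KryoContext above it could be built as `new TypedDeserializer(kryoContext::deserialze)` and used as `User retUser = typed.deserialize(User.class, userBytes);`. Generic element types such as the `User` in `List<User>` are erased at runtime, so for collections only the outer class (for example `ArrayList`) is checked.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.BiFunction;

// Wraps an untyped deserializer (for example kryoContext::deserialze) so callers
// get a typed result. Class.cast checks the runtime type and throws
// ClassCastException on a mismatch, replacing scattered unchecked casts.
final class TypedDeserializer {
    private final BiFunction<Class<?>, byte[], Object> raw;

    TypedDeserializer(BiFunction<Class<?>, byte[], Object> raw) {
        this.raw = raw;
    }

    <T> T deserialize(Class<T> type, byte[] bytes) {
        return type.cast(raw.apply(type, bytes));
    }

    public static void main(String[] args) {
        // Stub standing in for kryoContext::deserialze: decodes UTF-8 text.
        TypedDeserializer typed =
                new TypedDeserializer((type, bytes) -> new String(bytes, StandardCharsets.UTF_8));

        String text = typed.deserialize(String.class, "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(text); // hello
    }
}
```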

Using the Kryo Pool in a Multithreaded Environment

Here is an example of using the Kryo pool concurrently. First, let’s create an executor that runs the same task on many threads:

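import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

private void runExecutor(Runnable r) {
    ExecutorService executor = Executors.newFixedThreadPool(20);

    // Submit the task 40 times so that up to 20 threads use the Kryo pool at once.
    for (int i = 0; i < 40; i++) {
        executor.execute(r);
    }

    executor.shutdown();

    try {
        // Block until all tasks finish (arbitrary 10-minute timeout) instead of busy-waiting.
        if (!executor.awaitTermination(10, TimeUnit.MINUTES)) {
            System.out.println("timed out waiting for threads...");
            return;
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
    }
    System.out.println("all threads finished...");
}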

A Runnable implementation that uses the Kryo pool to serialize and deserialize Java objects looks like this:

import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

private static class KryoWorkerThread implements Runnable {
    private static final int MAX = 1000;

    // Jackson's ObjectMapper is thread-safe once configured, so the threads can share it.
    private final ObjectMapper mapper = new ObjectMapper();

    private final KryoContext kryoContext;

    public KryoWorkerThread(KryoContext kryoContext) {
        this.kryoContext = kryoContext;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void run() {

        for (int i = 0; i < MAX; i++) {
            // ================ serialization ===================
            List<Map<String, Object>> list = new ArrayList<>();
            for (int k = 0; k < 3; k++) {
                Map<String, Object> map = new HashMap<>();
                map.put("any-prop1", "any-value1-" + k);
                map.put("any-prop2", "any-value2-" + k);
                map.put("any-prop3", "any-value3-" + k);

                list.add(map);
            }

            // serialize list.
            byte[] listBytes = kryoContext.serialze(list);

            List<User> userList = new ArrayList<>();
            for (int k = 0; k < 3; k++) {
                User user = new User();
                user.setName("any-name" + k);
                user.setAge(50 + k);
                user.setAddress("any-address..." + k);

                userList.add(user);
            }

            // serialize user list.
            byte[] userListBytes = kryoContext.serialze(userList);

            Map<String, Object> map = new HashMap<>();
            map.put("any-prop1", "any-value1");
            map.put("any-prop2", "any-value2");
            map.put("any-prop3", "any-value3");

            // serialize map.
            byte[] mapBytes = kryoContext.serialze(map);

            User user = new User();
            user.setName("any-name");
            user.setAge(50);
            user.setAddress("any-address...");

            // serialize user object.
            byte[] userBytes = kryoContext.serialze(user);

            // ================ de-serialization =================
            // deserialize list.
            List<Map<String, Object>> retList = (List<Map<String, Object>>) kryoContext.deserialze(ArrayList.class, listBytes);

            // deserialize user list.
            List<User> retUserList = (List<User>) kryoContext.deserialze(ArrayList.class, userListBytes);

            // deserialize map.
            Map<String, Object> retMap = (Map<String, Object>) kryoContext.deserialze(HashMap.class, mapBytes);

            // deserialize user object.
            User retUser = (User) kryoContext.deserialze(User.class, userBytes);

            try {
                System.out.println("retList: [" + mapper.writeValueAsString(retList) + "]");
                System.out.println("retUserList: [" + mapper.writeValueAsString(retUserList) + "]");
                System.out.println("retMap: [" + mapper.writeValueAsString(retMap) + "]");
                System.out.println("retUser: [" + mapper.writeValueAsString(retUser) + "]");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

Run the executor to check that the Kryo pool works correctly in a multithreaded environment:

// run multiple threads.
runExecutor(new KryoWorkerThread(kryoContext));
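Printing the round-tripped objects shows that the code runs without errors, but it does not prove that no instance was ever used by two threads at once. A stricter check, sketched below with only the JDK, makes the pooled object itself detect overlapping use. `GuardedResource` and `PoolStressCheck` are hypothetical names, and the pooled object is a stand-in rather than a real `Kryo` instance; the same idea could wrap a real pool’s borrow and release calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in for a non-thread-safe object such as Kryo: it counts a violation
// whenever a thread starts using it while another thread still holds it.
final class GuardedResource {
    private final AtomicBoolean inUse = new AtomicBoolean();

    void use(AtomicInteger violations) {
        if (!inUse.compareAndSet(false, true)) {
            violations.incrementAndGet();
            return;
        }
        Thread.yield(); // widen the window in which an overlap could be observed
        inUse.set(false);
    }
}

final class PoolStressCheck {
    // Runs 'tasks' jobs on 'threads' threads; each job borrows and releases a
    // pooled resource 'iterations' times. Returns the number of violations seen.
    static int run(int threads, int tasks, int iterations) throws Exception {
        Queue<GuardedResource> idle = new ConcurrentLinkedQueue<>();
        AtomicInteger violations = new AtomicInteger();
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<Void>> jobs = new ArrayList<>();
            for (int t = 0; t < tasks; t++) {
                jobs.add(() -> {
                    for (int i = 0; i < iterations; i++) {
                        GuardedResource resource = idle.poll(); // borrow
                        if (resource == null) {
                            resource = new GuardedResource();
                        }
                        try {
                            resource.use(violations);
                        } finally {
                            idle.offer(resource); // release
                        }
                    }
                    return null;
                });
            }
            for (Future<Void> result : executor.invokeAll(jobs)) {
                result.get(); // rethrows any exception a job threw
            }
        } finally {
            executor.shutdown();
        }
        return violations.get();
    }

    public static void main(String[] args) throws Exception {
        // Same shape as the setup above: 20 threads, 40 tasks.
        System.out.println("violations: " + run(20, 40, 1000)); // violations: 0
    }
}
```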

Conclusion

If you need a serialization framework only within your Java applications, Kryo is a good alternative to Java’s native serialization. Whenever Kryo is used concurrently, pool its instances, because a single Kryo instance must not be shared between threads.


Founder of Cloud Chef Labs Inc.(http://www.cloudchef-labs.com) | Creator of DataRoaster(https://bit.ly/3BM0ccA)