How To: Pool Kryo Instances to Serialize Java Objects in a Multithreaded Environment

Kryo Pooling

/**
 * Facade for turning objects into byte arrays and back.
 *
 * <p>The method names keep their historical spelling ({@code serialze} /
 * {@code deserialze}) because implementers and callers depend on it.
 */
public interface KryoContext {

    /**
     * Serializes {@code obj} using the implementation's default buffer size.
     *
     * @param obj the object to serialize
     * @return the serialized form of {@code obj}
     */
    byte[] serialze(Object obj);

    /**
     * Serializes {@code obj} using a caller-supplied buffer size.
     *
     * @param obj        the object to serialize
     * @param bufferSize size, in bytes, of the buffer the implementation writes into
     * @return the serialized form of {@code obj}
     */
    byte[] serialze(Object obj, int bufferSize);

    /**
     * Rebuilds an object from bytes previously produced by one of the
     * {@code serialze} methods.
     *
     * @param clazz      the concrete class to instantiate
     * @param serialized the serialized bytes
     * @return the deserialized object; callers cast it to the expected type
     */
    Object deserialze(Class clazz, byte[] serialized);
}
/**
 * {@link KryoContext} backed by a {@link KryoPool}, so that many threads can
 * serialize and deserialize concurrently without sharing a single
 * {@link Kryo} instance.
 *
 * <p>Every operation borrows a {@link Kryo} from the pool and always returns
 * it, even when Kryo throws.
 */
public class DefaultKryoContext implements KryoContext {

    /** Initial output buffer size, in bytes (100 KiB), used by {@link #serialze(Object)}. */
    private static final int DEFAULT_BUFFER = 1024 * 100;

    private final KryoPool pool;

    /**
     * Creates a context whose pooled {@link Kryo} instances are configured by
     * the given registrator.
     *
     * @param registrator callback invoked once for every {@link Kryo} the pool creates
     * @return a new pooled context
     */
    public static KryoContext newKryoContextFactory(KryoClassRegistrator registrator) {
        return new DefaultKryoContext(registrator);
    }

    private DefaultKryoContext(KryoClassRegistrator registrator) {
        KryoFactory factory = new KryoFactoryImpl(registrator);

        // The pool is built with the softReferences() option of KryoPool.Builder.
        pool = new KryoPool.Builder(factory).softReferences().build();
    }

    /** Creates {@link Kryo} instances and lets the registrator configure each one. */
    private static class KryoFactoryImpl implements KryoFactory {
        private final KryoClassRegistrator registrator;

        public KryoFactoryImpl(KryoClassRegistrator registrator) {
            this.registrator = registrator;
        }

        @Override
        public Kryo create() {
            Kryo kryo = new Kryo();
            registrator.register(kryo);

            return kryo;
        }
    }

    /**
     * Serializes {@code obj} starting from a {@value #DEFAULT_BUFFER}-byte buffer.
     *
     * @param obj the object to serialize
     * @return the serialized bytes
     */
    @Override
    public byte[] serialze(Object obj) {
        return serialze(obj, DEFAULT_BUFFER);
    }

    /**
     * Serializes {@code obj} into an in-memory buffer.
     *
     * @param obj        the object to serialize
     * @param bufferSize initial buffer size in bytes; the buffer grows on demand,
     *                   so this is a sizing hint rather than a hard limit
     * @return the complete serialized bytes
     */
    @Override
    public byte[] serialze(Object obj, int bufferSize) {
        Kryo kryo = pool.borrow();
        try {
            // Use a growable in-memory buffer (max size -1 = unbounded). Wrapping an
            // OutputStream and then calling toBytes() would return only the bytes still
            // sitting in the buffer after a flush, truncating payloads > bufferSize.
            Output output = new Output(bufferSize, -1);
            kryo.writeObject(output, obj);
            return output.toBytes();
        } finally {
            // Always hand the instance back, otherwise a failed write leaks it.
            pool.release(kryo);
        }
    }

    /**
     * Deserializes an object of type {@code clazz} from {@code serialized}.
     *
     * @param clazz      the concrete class that was serialized
     * @param serialized bytes produced by one of the {@code serialze} methods
     * @return the reconstructed object
     */
    @Override
    public Object deserialze(Class clazz, byte[] serialized) {
        Kryo kryo = pool.borrow();
        try {
            Input input = new Input(serialized);
            return kryo.readObject(input, clazz);
        } finally {
            // Always hand the instance back, otherwise a failed read leaks it.
            pool.release(kryo);
        }
    }

}
/**
 * Callback that configures a freshly created {@link Kryo} instance,
 * typically by registering the classes that will be serialized.
 */
public interface KryoClassRegistrator {

    /**
     * Applies registrations to the given instance.
     *
     * @param kryo the instance to configure
     */
    void register(Kryo kryo);

}
// Build the shared context; the lambda runs for every Kryo instance the pool creates.
KryoContext kryoContext = DefaultKryoContext.newKryoContextFactory(instance -> {
    instance.register(ArrayList.class);
    instance.register(HashMap.class);
});

Serialize Java Objects

// Build a list of three small property maps.
List<Map<String, Object>> list = new ArrayList<>();
for (int idx = 0; idx < 3; idx++) {
    Map<String, Object> entry = new HashMap<>();
    entry.put("any-prop1", "any-value1-" + idx);
    entry.put("any-prop2", "any-value2-" + idx);
    entry.put("any-prop3", "any-value3-" + idx);

    list.add(entry);
}

// Serialize the whole list in one call.
byte[] listBytes = kryoContext.serialze(list);
/** Mutable bean with a name, an age and an address, used as a sample payload. */
private static class User {

    private String name;

    private int age;

    private String address;

    /** @return the user's name */
    public String getName() {
        return this.name;
    }

    /** @param name the user's name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the user's age */
    public int getAge() {
        return this.age;
    }

    /** @param age the user's age */
    public void setAge(int age) {
        this.age = age;
    }

    /** @return the user's address */
    public String getAddress() {
        return this.address;
    }

    /** @param address the user's address */
    public void setAddress(String address) {
        this.address = address;
    }
}
...List<User> userList = new ArrayList<>();
for(int k = 0; k < 3; k++)
{
User user = new User();
user.setName("any-name" + k);
user.setAge(50 + k);
user.setAddress("any-address..." + k);

userList.add(user);
}

// serialize user list.
byte[] userListBytes = kryoContext.serialze(userList);
// A single property map used as the payload.
Map<String, Object> map = new HashMap<>();
map.put("any-prop1", "any-value1");
map.put("any-prop2", "any-value2");
map.put("any-prop3", "any-value3");

// Serialize the map.
byte[] mapBytes = kryoContext.serialze(map);
// A single populated bean used as the payload.
User user = new User();
user.setName("any-name");
user.setAge(50);
user.setAddress("any-address...");

// Serialize the user object.
byte[] userBytes = kryoContext.serialze(user);
// Pass an explicit buffer size (200 KiB) instead of relying on the default.
final int moreBuffer = 200 * 1024;
byte[] listBytes = kryoContext.serialze(list, moreBuffer);
// Each payload is read back with the concrete class it was written as,
// then cast to the type the caller expects.

// Deserialize the list of maps.
Object rawList = kryoContext.deserialze(ArrayList.class, listBytes);
List<Map<String, Object>> retList = (List<Map<String, Object>>) rawList;

// Deserialize the user list.
Object rawUserList = kryoContext.deserialze(ArrayList.class, userListBytes);
List<User> retUserList = (List<User>) rawUserList;

// Deserialize the map.
Object rawMap = kryoContext.deserialze(HashMap.class, mapBytes);
Map<String, Object> retMap = (Map<String, Object>) rawMap;

// Deserialize the user object.
Object rawUser = kryoContext.deserialze(User.class, userBytes);
User retUser = (User) rawUser;

Using the Kryo Pool in a Multithreaded Environment

/**
 * Submits {@code r} 40 times to a fixed pool of 20 threads and blocks until
 * every submission has finished.
 *
 * <p>If the calling thread is interrupted while waiting, the remaining work
 * is cancelled, the interrupt flag is restored, and the method returns
 * without printing the completion message.
 *
 * @param r the task to run; the same instance is executed by all threads
 */
private void runExecutor(Runnable r) {
    ExecutorService executor = Executors.newFixedThreadPool(20);

    for (int i = 0; i < 40; i++) {
        executor.execute(r);
    }

    executor.shutdown();

    try {
        // Block instead of spinning on isTerminated(), which would burn a CPU core.
        while (!executor.awaitTermination(1, java.util.concurrent.TimeUnit.SECONDS)) {
            // Not finished yet; keep waiting.
        }
    } catch (InterruptedException e) {
        executor.shutdownNow();
        Thread.currentThread().interrupt();
        return;
    }
    System.out.println("all threads finished...");
}
/**
 * Task that repeatedly serializes four sample payloads through a
 * {@link KryoContext}, reads them back, and prints the results as JSON.
 */
private static class KryoWorkerThread implements Runnable {

    /** Number of serialize/deserialize round trips performed by one {@link #run()}. */
    private static final int MAX_ITERATIONS = 1000;

    private final ObjectMapper mapper = new ObjectMapper();

    private final KryoContext kryoContext;

    public KryoWorkerThread(KryoContext kryoContext) {
        this.kryoContext = kryoContext;
    }

    @Override
    public void run() {
        for (int round = 0; round < MAX_ITERATIONS; round++) {
            roundTrip();
        }
    }

    /** Performs one full serialize, deserialize and print cycle. */
    private void roundTrip() {
        // ================ serialization ===================
        byte[] listBytes = kryoContext.serialze(sampleMapList());
        byte[] userListBytes = kryoContext.serialze(sampleUserList());
        byte[] mapBytes =
                kryoContext.serialze(sampleMap("any-value1", "any-value2", "any-value3"));
        byte[] userBytes = kryoContext.serialze(sampleUser("any-name", 50, "any-address..."));

        // ================ de-serialization =================
        List<Map<String, Object>> retList =
                (List<Map<String, Object>>) kryoContext.deserialze(ArrayList.class, listBytes);
        List<User> retUserList =
                (List<User>) kryoContext.deserialze(ArrayList.class, userListBytes);
        Map<String, Object> retMap =
                (Map<String, Object>) kryoContext.deserialze(HashMap.class, mapBytes);
        User retUser = (User) kryoContext.deserialze(User.class, userBytes);

        // A failure on any line skips the remaining prints for this round.
        try {
            System.out.println("retList: [" + mapper.writeValueAsString(retList) + "]");

            System.out.println("retUserList: [" + mapper.writeValueAsString(retUserList) + "]");

            System.out.println("retMap: [" + mapper.writeValueAsString(retMap) + "]");

            System.out.println("retUser: [" + mapper.writeValueAsString(retUser) + "]");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Builds a three-entry property map with the given values. */
    private static Map<String, Object> sampleMap(String first, String second, String third) {
        Map<String, Object> props = new HashMap<>();
        props.put("any-prop1", first);
        props.put("any-prop2", second);
        props.put("any-prop3", third);
        return props;
    }

    /** Builds a list of three property maps whose values carry the index as a suffix. */
    private static List<Map<String, Object>> sampleMapList() {
        List<Map<String, Object>> maps = new ArrayList<>();
        for (int k = 0; k < 3; k++) {
            maps.add(sampleMap("any-value1-" + k, "any-value2-" + k, "any-value3-" + k));
        }
        return maps;
    }

    /** Builds a populated user bean. */
    private static User sampleUser(String name, int age, String address) {
        User created = new User();
        created.setName(name);
        created.setAge(age);
        created.setAddress(address);
        return created;
    }

    /** Builds a list of three users whose fields carry the index. */
    private static List<User> sampleUserList() {
        List<User> users = new ArrayList<>();
        for (int k = 0; k < 3; k++) {
            users.add(sampleUser("any-name" + k, 50 + k, "any-address..." + k));
        }
        return users;
    }
}
// run multiple threads.
runExecutor(new KryoWorkerThread(kryoContext));

Conclusion

References

--

--

--

Founder of Cloud Chef Labs Inc.(http://www.cloudchef-labs.com) | Creator of DataRoaster(https://bit.ly/3BM0ccA)

Love podcasts or audiobooks? Learn on the go with our new app.

Recommended from Medium

✨Wondering HOW to collect your airdrop rewards?

Integration lvm with Hadoop , So that we can get dynamic storage in datanode.

5 Tips to Fix your Software Development Company Cashflow

October Sneak Peek: Automation Architecture

October Sneak Peek: Automation

MongoDB Aggregation Framework and Map-Reduce

What is a Microservice, Really

Can a Scrum team consist of only one person?

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Kidong Lee

Kidong Lee

Founder of Cloud Chef Labs Inc.(http://www.cloudchef-labs.com) | Creator of DataRoaster(https://bit.ly/3BM0ccA)

More from Medium

Authentication Failure Error in wlsdm.log File

An implementation of TLS Handshake Part 1: Overview

How to Pass Data Among Steps in a Spring Batch Application

Garlicpool