HowTo: High-Performance Event Collector with Disruptor and Vert.x

Kidong Lee
May 21, 2018

A high-performance event collector is a fundamental component of big data and IoT data collection.

But it is not easy for an event collector to reach 100,000 TPS for user behavior events of about 1 KB, or 300,000 TPS for IoT events of about 60 bytes.

Here, I will show how to implement such a high-performance event collector using Disruptor and Vert.x.

  • Disruptor: a high-performance inter-thread messaging queue.
  • Vert.x: a high-performance transport handler for HTTP, TCP, etc. (the Vert.x Java transport is based on Netty).

The complete code shown here can be found in this repo: https://github.com/mykidong/high-performant-event-collector

Overview of Event Collector

Let’s look at the high-performance event collector in a picture.

  • In RequestHandler, Vert.x handles HTTP or TCP requests to get the event message body and sends the event messages to the ValidationDisruptor queue (a minimal Vert.x server sketch follows this list).
  • In the ValidationDisruptor queue, the event messages are filtered by a validation check. All valid event messages are forwarded to the PutDisruptor queue.
  • In the PutDisruptor queue, the valid event messages are forked to Logger and ProduceToKafka.
  • In Logger, the valid event messages are logged to local files.
  • In ProduceToKafka, the valid event messages are produced to Kafka.
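
As a rough illustration of the entry point, here is a minimal sketch of a Vert.x HTTP server that reads the request body and hands it to the collector pipeline. This is not the repo’s actual verticle; the class name, port, and handleEventBody helper are illustrative assumptions.

import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;

public class CollectorServerSketch {

    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();

        vertx.createHttpServer()
                .requestHandler(request -> request.bodyHandler((Buffer body) -> {
                    // hand the raw body to the collector pipeline; publishing to
                    // the ValidationDisruptor happens in RequestHandler (shown below).
                    handleEventBody(body);
                    request.response().setStatusCode(200).end();
                }))
                .listen(8080);
    }

    // hypothetical placeholder for the RequestHandler logic described in this article.
    private static void handleEventBody(Buffer body) {
        String json = new String(body.getBytes());
        System.out.println("received event: " + json);
    }
}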

RequestHandler in Event Collector

In RequestHandler, let’s create a validation disruptor instance:

EventHandler<EventLog> validationHandler = ...
Disruptor<EventLog> validationDisruptor = DisruptorCreator.singleton(DisruptorCreator.DISRUPTOR_NAME_VALIDATION, EventLog.FACTORY, 1024, validationHandler);

With DisruptorCreator and validationHandler, a disruptor instance for validation is created. validationHandler will handle the events coming from this validationDisruptor queue.

DisruptorCreator class looks like this:

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;

/**
 * Created by mykidong on 2016-09-01.
 */
public class DisruptorCreator {

    private static Logger log = LoggerFactory.getLogger(DisruptorCreator.class);

    public static final String DISRUPTOR_NAME_VALIDATION = "Validation";
    public static final String DISRUPTOR_NAME_PUT = "Put";

    private static ConcurrentMap<String, Disruptor> disruptorMap;
    private static final Object lock = new Object();

    public static <T> Disruptor singleton(String disruptorName, EventFactory<T> factory, int bufferSize, EventHandler<T>... handlers) {
        if (disruptorMap == null) {
            synchronized (lock) {
                if (disruptorMap == null) {
                    disruptorMap = new ConcurrentHashMap<>();
                    Disruptor disruptor = newInstance(disruptorName, factory, bufferSize, handlers);
                    disruptorMap.put(disruptorName, disruptor);
                }
            }
        }
        else {
            synchronized (lock) {
                if (!disruptorMap.containsKey(disruptorName)) {
                    Disruptor disruptor = newInstance(disruptorName, factory, bufferSize, handlers);
                    disruptorMap.put(disruptorName, disruptor);
                }
            }
        }

        return disruptorMap.get(disruptorName);
    }

    public static <T> Disruptor newInstance(String disruptorName, EventFactory<T> factory, int bufferSize, EventHandler<T>... handlers) {
        Disruptor disruptor = new Disruptor(factory,
                bufferSize,
                Executors.newCachedThreadPool(),
                ProducerType.SINGLE, // single producer
                new BlockingWaitStrategy());

        disruptor.handleEventsWith(handlers);
        disruptor.start();

        return disruptor;
    }
}
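
Note that the Executor-based Disruptor constructor used above is deprecated in newer Disruptor releases (3.3.x and later). If you are on such a version, a minimal sketch of the equivalent ThreadFactory-based construction looks like this:

import com.lmax.disruptor.util.DaemonThreadFactory;

Disruptor<EventLog> disruptor = new Disruptor<>(EventLog.FACTORY,
        1024,
        DaemonThreadFactory.INSTANCE,   // replaces Executors.newCachedThreadPool()
        ProducerType.SINGLE,
        new BlockingWaitStrategy());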

After getting the event body from HTTP or TCP in RequestHandler, the messages from the event body, with additional metadata, can be sent to the ValidationDisruptor like this:

// init. disruptor event log translator.
BaseTranslator.EventLogTranslator eventLogValidationTranslator = new BaseTranslator.EventLogTranslator();

// handle http or tcp request.
Buffer body = ...

// get the body from the request.
String json = new String(body.getBytes());

// parse json and retrieve version and event type.
String version = ...
String eventType = ...

// set the values to the event translator.
eventLogValidationTranslator.setVersion(version);
eventLogValidationTranslator.setEventType(eventType);
eventLogValidationTranslator.setValue(json);

// publish the event to the validation disruptor.
validationDisruptor.publishEvent(eventLogValidationTranslator);
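
The parsing of version and eventType is elided above. A minimal sketch of that step, assuming Jackson and a payload that carries version and eventType fields at the top level (the field names are illustrative assumptions), might look like this:

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

ObjectMapper mapper = new ObjectMapper();

// parse the json body into a map and read the assumed "version" / "eventType" fields.
Map<String, Object> map = mapper.readValue(json, new TypeReference<Map<String, Object>>() {});
String version = (String) map.get("version");
String eventType = (String) map.get("eventType");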

The EventLogTranslator, BaseEvent and EventLog classes look like this:

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventTranslator;

public class BaseTranslator {

    public static class EventLogTranslator extends EventLog implements EventTranslator<EventLog> {

        @Override
        public void translateTo(EventLog eventLog, long sequence) {
            eventLog.setVersion(this.getVersion());
            eventLog.setEventType(this.getEventType());
            eventLog.setValue(this.getValue());
        }
    }
}

public class BaseEvent {

    private String version;

    private String eventType;

    /**
     * event log json string.
     */
    private String value;

    public String getVersion() {
        return version;
    }

    public void setVersion(String version) {
        this.version = version;
    }

    public String getEventType() {
        return eventType;
    }

    public void setEventType(String eventType) {
        this.eventType = eventType;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }
}

public class EventLog extends BaseEvent {

    public final static EventFactory<EventLog> FACTORY = EventLog::new;
}

ValidationHandler in Event Collector

In ValidationHandler, the published events from RequestHandler will be consumed:

this.putDisruptor = DisruptorCreator.singleton(DisruptorCreator.DISRUPTOR_NAME_PUT, EventLog.FACTORY, 1024, produceToKafkaHandler, loggerEventHandler);

this.putEventTranslator = new BaseTranslator.EventLogTranslator();

...

@Override
public void onEvent(final EventLog eventLog, final long sequence, final boolean endOfBatch) throws Exception {
    String version = eventLog.getVersion();
    String json = eventLog.getValue();

    // validation check.
    if (eventLogValidationEnabled) {
        try {
            Map<String, Object> map = JsonUtils.toMap(mapper, json);

            // version 1.0.0 validation.
            if (version.equals("1.0.0")) {
                if (!eventValidator.isValid(map)) {
                    return;
                }
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            return;
        }
    }

    this.putEventTranslator.setVersion(version);
    this.putEventTranslator.setEventType(eventLog.getEventType());
    this.putEventTranslator.setValue(json);

    this.putDisruptor.publishEvent(this.putEventTranslator);
}

The putDisruptor instance is created with two event handlers, produceToKafkaHandler and loggerEventHandler. All valid messages will be forwarded to both handlers.

In the onEvent() method, the validation check is done; invalid messages are not published to the next PutDisruptor queue.
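
The validator itself is not shown here. As a rough sketch of what such a check might look like (the actual validator in the repo may differ; the required field names are illustrative assumptions):

import java.util.Map;

public class EventValidator {

    // returns true if the parsed event map carries the fields this collector requires.
    public boolean isValid(Map<String, Object> map) {
        return map != null
                && map.containsKey("eventType")
                && map.containsKey("timestamp");
    }
}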

ProduceToKafka in Event Collector

In ProduceToKafka, all the valid messages will be produced to Kafka:

@Override
public void onEvent(EventLog eventLog, long sequence, boolean endOfBatch) throws Exception {
    String topic = eventLog.getEventType();
    String json = eventLog.getValue();

    // send event log to kafka.
    if (this.sendMessageEnabled) {
        producer.send(new ProducerRecord<Integer, String>(topic, json));
    }
}
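
The producer used above is created elsewhere in the collector. A minimal sketch of how such a KafkaProducer might be configured (the broker address and settings are illustrative assumptions):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
props.put("acks", "1");
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<Integer, String> producer = new KafkaProducer<>(props);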

Logger in Event Collector

In Logger, the valid messages will be logged to local log files:

private static Logger eventLogger = LoggerFactory.getLogger("event-logger");

...

@Override
public void onEvent(EventLog eventLog, long sequence, boolean endOfBatch) throws Exception {

    String eventType = eventLog.getEventType();
    String json = eventLog.getValue();

    if (this.eventLoggingEnabled) {
        eventLogger.info(json);
    }
}

eventLogger will log the valid JSON. This eventLogger is defined in the log4j.xml configuration:

<appender name="eventLoggerDailyRollingFileAppender" class="org.apache.log4j.DailyRollingFileAppender">
    <param name="File" value="/any-temp/collection-logger.log" />
    <param name="DatePattern" value="'.'yyyy-MM-dd" />
    <param name="Encoding" value="UTF-8" />
    <param name="MaxBackupIndex" value="3"/>
    <layout class="org.apache.log4j.PatternLayout">
        <param name="ConversionPattern" value="%m%n" />
    </layout>
</appender>

<logger name="event-logger" additivity="false">
    <level value="info" />
    <appender-ref ref="eventLoggerDailyRollingFileAppender" />
</logger>

Conclusion

With Disruptor as the inter-thread messaging queue and Vert.x as the transport layer, you can build an event collector with impressive performance.
