HowTo: Kafka Connect

Kidong Lee
5 min readMay 13, 2018
Photo by Ian Schneider on Unsplash

Recently, I have used Kafka Connect for a project for the first time. In this article I want to show you how I have made use of Kafka Connect.

Install Kafka Connect Cluster

I have used Confluent Platform 3.3.1 to install Kafka Cluster and Kafka Connect Cluster.

Assumed that Kafka Cluster and Kafka Schema Registry are installed, let’s install Kafka Connect Cluster.

We are going to install Kafka Connect Cluster on multiple worker nodes. Distributed Connect Worker configuration worker.properties looks this:

bootstrap.servers=broker1:9092,broker2:9092# unique group id.
group.id=my-connect-cluster
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=true
value.converter.schema.registry.url=http://broker1:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# unique offset topic.
offset.storage.topic=my-connect-offsets
offset.storage.replication.factor=2
offset.storage.partitions=25
# unique config topic.
config.storage.topic=my-connect-configs
config.storage.replication.factor=2
# unique status topic.
status.storage.topic=my-connect-status
status.storage.replication.factor=2
status.storage.partitions=5
offset.flush.interval.ms=10000
rest.port=8083
# consumer config.
consumer.auto.offset.reset=latest

This worker.properties should be saved in connect conf directory, for instance $KAFKA_HOME/connect-conf.

cd $KAFKA_HOME;
mkdir -p connect-conf;
# add worker.properties to connect-conf directory.

Before starting connect worker, kafka topics for offset, config and status must be created:

cd $KAFKA_HOME;# config.storage.topic=my-connect-configs
bin/kafka-topics --create --zookeeper zk1:2181,zk2:2181,zk3:2181 --topic my-connect-configs --replication-factor 2 --partitions 1 --config cleanup.policy=compact
# offset.storage.topic=my-connect-offsets
bin/kafka-topics --create --zookeeper zk1:2181,zk2:2181,zk3:2181 --topic my-connect-offsets --replication-factor 2 --partitions 25 --config cleanup.policy=compact
# status.storage.topic=my-connect-status
bin/kafka-topics --create --zookeeper zk1:2181,zk2:2181,zk3:2181 --topic my-connect-status --replication-factor 2 --partitions 5 --config cleanup.policy=compact

Run kafka connect worker on individual worker node:

cd $KAFKA_HOME;# start connect in distributed mode.
bin/connect-distributed connect-conf/worker.properties;

Now, Kafka Connect Cluster is installed and is runninng in distributed mode.

You can also see the installation of Kafka and Kafka Connect with Ansible: https://medium.com/@mykidong/howto-organize-ansible-playbook-to-install-uninstall-start-and-stop-kafka-and-kafka-connect-e7250c5def9d

Create and register Avro Schema to Schema Registry

Kafka Connect installed above handles Avro Messages from the topics. Before running HDFS Connector Sink and Elasticsearch Connector Sink, avro schemas for the topics must be registered onto Schema Registry.

First, create avro schema file for the topics, for instance, page-view-event.avsc looks like this:

{
"type":"record",
"name":"PageViewEvent",
"namespace":"com.mykidong.domain.event.schema",
"fields":[
{
"name":"baseProperties",
"type":[
"null",
{
"type":"record",
"name":"BaseProperties",
"namespace":"com.mykidong.domain.event.schema",
"fields":[
{
"name":"pcid",
"type":[
"null",
"string"
]
},
{
"name":"sessionId",
"type":[
"null",
"string"
]
},
{
"name":"referer",
"type":[
"null",
"string"
]
},
{
"name":"browser",
"type":[
"null",
"string"
]
},
{
"name":"eventType",
"type":[
"null",
"string"
]
},
{
"name":"version",
"type":[
"null",
"string"
]
},
{
"name":"timestamp",
"type":[
"null",
"long"
]
}
]
}
]
},
{
"name":"itemId",
"type":[
"null",
"string"
]
},
{
"name":"itemTitle",
"type":[
"null",
"string"
]
},
{
"name":"scrollRange",
"type":[
"null",
"int"
]
},
{
"name":"stayTerm",
"type":[
"null",
"long"
]
},
{
"name":"scrollUpDownCount",
"type":[
"null",
"int"
]
}
]
}

To register avro schemas, use the following Java codes instead of REST API.

Confluent Maven Dependencies should be added:

<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-streams-avro-serde</artifactId>
<version>${confluent.version}</version>
</dependency>

Confluent Maven Repo also added:

<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>

Let’s see the codes to register avro schema to Schema Registry.

// schema registry url.
String url = "http://broker1:8081";
// associated topic name.
String topic = "page-view-event";
// avro schema avsc file path.
String schemaPath = "/avro-schemas/page-view-event.avsc";
// subject convention is "<topic-name>-value"
String subject = topic + "-value";
// avsc json string.
String schema = null;

FileInputStream inputStream = new FileInputStream(schemaPath);
try {
schema = IOUtils.toString(inputStream);
} finally {
inputStream.close();
}

Schema avroSchema = new Schema.Parser().parse(schema);

CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(url, 20);

client.register(subject, avroSchema);

Now, avro schema for the topic page-view-event has been registered onto Schema Registry.

You can also register more avro schemas for the topics, for instance, cart-event and order-event.

HDFS Connector Sink

The next HDFS Connector Sink will save the avro messages onto HDFS as parquet.

Let’s create the HDFS Connector Sink Configuration:

cd $KAFKA_HOME;
mkdir -p connect-conf;
# event hdfs sink.
cat <<EOF > connect-conf/hdfs-parquet-sink.json
{
"name":"hdfs-parquet-sink",
"config":{
"connector.class":"io.confluent.connect.hdfs.HdfsSinkConnector",
"tasks.max":"3",
"topics":"page-view-event,cart-event,order-event",
"hadoop.conf.dir":"/etc/hadoop/conf",
"hadoop.home":"/usr/hdp/current/hadoop-client",
"hdfs.url":"hdfs://hdp-cluster",
"topics.dir":"/topics",
"logs.dir":"/logs",
"flush.size":"100",
"rotate.interval.ms":"60000", "format.class":"io.confluent.connect.hdfs.parquet.ParquetFormat",
"partitioner.class":"io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner",
"partition.duration.ms":"1800000",
"path.format":"'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/",
"locale":"kor",
"timezone":"Asia/Seoul"
}
}
EOF

The messages from the topics, page-view-event, cart-event, and order-event whose avro schemas have already been registered before will be read and saved to hdfs as parquet files.

Run HDFS Connector Sink.

cd $KAFKA_HOME;# load sink connector.
bin/confluent load hdfs-parquet-sink -d connect-conf/hdfs-parquet-sink.json;

Note, after running HDFS Connector tasks, there are a lot of part files created on HDFS. In our case, almost every minute a part file will be created. You should write a batch job, for instance, spark job to consolidate many part files to just 2 or 3 part files. Because the part files are being created at the current time, this batch Job should be run hourly to consolidate the part files created NOT during the last one hour from the current time, BUT during the one hour from one hour before from the current time.

Elasticsearch Connector Sink

ES Connector configuration looks similar to HDFS Connector configuration.

Create Elasticsearch Connector Configuation:

cd $KAFKA_HOME;
mkdir -p connect-conf;
# page view event.
cat <<EOF > connect-conf/es-sink-page-view-event.json
{
"name": "es-sink-page-view-event",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "2",
"topics": "page-view-event",
"type.name": "page-view-event",
"key.ignore": "true",
"connection.url": "http://es1:9200,http://es2:9200"
}
}
EOF

Before running this ES Connector, you should create Elasticsearch Index Template for the index page-view-event.

In this case, Elasticsearch index type name must be page-view-event.

Elasticsearch page-view-event template could be created like this:

curl -XPUT -H "Content-Type: application/json" 'es1:9200/_template/page-view-event-template' -d '
{
"template":"page-view-event*",
"order":0,
"settings":{
"number_of_shards":5,
"number_of_replicas":2,
"index.refresh_interval":"5s"
},
"mappings":{
"page-view-event":{
"properties":{
...
}
}
},
"aliases":{
"page-view-event-alias":{
}
}
}
';

Because ES Connector Sink cannot handle multiple topics like HDFS Connector Sink, you should create individual ES connector sink configurations for another topics.

Run the Elasticsearch Connector Sink:

cd $KAFKA_HOME;
mkdir -p connect-conf;
# load elasticsearch sink connector.
bin/confluent load es-sink-page-view-event -d connect-conf/es-sink-page-view-event.json;

Conclusion

To process messages from kafka topics, Kafka Streams has been used. With Kafka Streams, I can process the messages from topics and send the processed avro messages to the topics. With the help of Kafka Connect, avro messages from the topics will be saved to HDFS and Elasticsearch. No Java codes for hdfs and elasticsearch sinks are necessary any more!

I have thrown away Camus which I had used for ETL job from Kafka to HDFS, instead, Kafka HDFS Connector Sink does this job with more capabilities.

--

--