Kidong Lee
Jun 10, 2019

This post assumes that Avro messages sent to a Kafka topic will be saved to HDFS as Parquet files via the Kafka Connect HDFS connector.

If the messages on your topic are not in Avro format, you don’t need the Avro schema registry.

But if you send Avro messages to your topic, you need an Avro schema registry in which you register the Avro schema for your topic. I have already described how to do that in my blog.

The Kafka Connect cluster should be installed in distributed mode on multiple nodes, for instance on your 3 nodes.

I recently used Confluent 5.0.1, whose configuration differs slightly from (but is almost the same as) that of the Confluent 3.x used in my blog.

Let’s look at the connect worker configuration:

bootstrap.servers=your-broker1:9092,your-broker2:9092,your-broker3:9092
# unique group id.
group.id=your-connect-cluster
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=true
value.converter.schema.registry.url=http://your-broker1:8081
# unique offset topic.
offset.storage.topic=your-connect-offsets
offset.storage.replication.factor=2
offset.storage.partitions=25
# unique config topic.
config.storage.topic=your-connect-configs
config.storage.replication.factor=2
# unique status topic.
status.storage.topic=your-connect-status
status.storage.replication.factor=2
status.storage.partitions=5
offset.flush.interval.ms=10000

Now save these properties, for instance to <confluent-home>/connect-conf/worker.properties, on each Kafka node, and then start a Kafka Connect worker on each node like this:

# start connect in distributed mode.
bin/connect-distributed connect-conf/worker.properties;
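
To confirm that every worker came up, you can query the REST interface of each node. The sketch below only builds the URLs; it assumes the workers run on the three broker hosts from the configuration above and listen on Kafka Connect's default REST port 8083, which you may have changed.

```shell
# Assumption: workers run on the broker hosts and use the default REST port 8083.
CONNECT_NODES="your-broker1 your-broker2 your-broker3"
CONNECT_PORT=8083

# Print the REST root URL of every worker, one per line.
worker_urls() {
  for node in $CONNECT_NODES; do
    echo "http://${node}:${CONNECT_PORT}/"
  done
}

# Usage (requires running workers); each worker answers with its version info:
#   for url in $(worker_urls); do curl -s "$url"; echo; done
```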

Once you have a Kafka Connect cluster like the one above, you need the HDFS connector, which saves the topic messages to HDFS.

For Confluent 5.0.1, use the confluent-hub client to install the HDFS connector on each node like this:

cd <confluent-home>;
## create confluent hub directory.
mkdir -p confluent-hub;
cd confluent-hub;

## download confluent hub client.
wget http://client.hub.confluent.io/confluent-hub-client-latest.tar.gz;
tar zxvf *.tar.gz;
rm -rf *.gz;

## create hdfs plugin directory.
mkdir -p <confluent-home>/plugins/hdfs

## install hdfs connector.
bin/confluent-hub install confluentinc/kafka-connect-hdfs:5.0.1 --component-dir <confluent-home>/plugins/hdfs/ --worker-configs <confluent-home>/connect-conf/worker.properties;

After installing the HDFS connector plugin on each node, restart the Kafka Connect cluster.

The HDFS connector should now be loaded into the Kafka Connect cluster.
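
One way to check that the plugin really was picked up is to ask a worker for its plugin list and look for the HDFS sink class. This is a minimal sketch, assuming the worker's REST interface is reachable on the default port 8083; `has_hdfs_plugin` is just a helper name chosen for illustration.

```shell
# Assumption: the worker's REST interface listens on the default port 8083.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Succeeds if the plugin list read from stdin contains the HDFS sink class.
has_hdfs_plugin() {
  grep -q 'io.confluent.connect.hdfs.HdfsSinkConnector'
}

# Usage (requires a running worker):
#   curl -s "${CONNECT_URL}/connector-plugins" | has_hdfs_plugin && echo "hdfs connector loaded"
```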

Configure the HDFS sink connector like this:

cd <confluent-home>;
mkdir -p connect-conf;

## some hdfs sink.
cat <<EOF > connect-conf/hdfs-some.json
{
  "name":"hdfs-some",
  "config":{
    "connector.class":"io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max":"3",
    "topics":"<your-topics>",
    "hadoop.conf.dir":"/etc/hadoop/conf",
    "hadoop.home":"/usr/hdp/current/hadoop-client",
    "hdfs.url":"hdfs://<hadoop-name-service>",
    "topics.dir":"/topics",
    "logs.dir":"/logs",
    "flush.size":"18000",
    "rotate.interval.ms":"60000",
    "format.class":"io.confluent.connect.hdfs.parquet.ParquetFormat",
    "partitioner.class":"io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner",
    "partition.duration.ms":"1800000",
    "path.format":"'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/",
    "locale":"kor",
    "timezone":"UTC"
  }
}
EOF
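
To get a feel for the directory layout that "topics.dir" and "path.format" produce, the sketch below formats an arbitrary example timestamp the same way. It assumes GNU date (for the `-d @<epoch>` form), and the topic name `your-topic` is only a placeholder.

```shell
# Assumption: GNU date; the timestamp and topic name are arbitrary examples.
EPOCH_SECONDS=1560142800   # 2019-06-10 05:00:00 UTC
TOPICS_DIR="/topics"       # "topics.dir" from the connector configuration
TOPIC="your-topic"         # hypothetical topic name

# Mirror 'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH with timezone UTC.
partition_path=$(date -u -d "@${EPOCH_SECONDS}" +"year=%Y/month=%m/day=%d/hour=%H")
echo "${TOPICS_DIR}/${TOPIC}/${partition_path}/"
# prints /topics/your-topic/year=2019/month=06/day=10/hour=05/
```

Since "partition.duration.ms" is 30 minutes while the path only goes down to the hour, both half-hour windows of an hour end up in the same directory.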

Save this configuration on one of your nodes, then run the following to load the HDFS connector tasks into the Kafka Connect cluster.

bin/confluent load hdfs-some -d connect-conf/hdfs-some.json;
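
If you prefer not to depend on the confluent CLI, the same JSON file can be posted to the Kafka Connect REST API directly. This is a sketch rather than what I ran for this post; it assumes a worker reachable on the default REST port 8083, and the function names are my own.

```shell
# Assumption: the worker's REST interface listens on the default port 8083.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Register a connector by POSTing its JSON definition ({"name":..., "config":{...}}).
register_connector() {
  curl -s -X POST -H "Content-Type: application/json" \
    --data @"$1" "${CONNECT_URL}/connectors"
}

# Show the state of a connector and its tasks.
connector_status() {
  curl -s "${CONNECT_URL}/connectors/$1/status"
}

# Usage (requires a running worker):
#   register_connector connect-conf/hdfs-some.json
#   connector_status hdfs-some
```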

Once Kafka Connect and the HDFS connector are running successfully, many Parquet part files will be created on HDFS, for instance one per task every minute.

If 3 HDFS sink connector tasks are running, then 3 x 60 = 180 part files are created per hour, which can put overhead on the NameNode.
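
The estimate can be reproduced with a few lines of shell arithmetic. It follows the simplification used here of one file per task per rotation interval; the real count also depends on how many topic partitions and time partitions each task writes to.

```shell
TASKS=3                    # "tasks.max" from the connector configuration
ROTATE_INTERVAL_MS=60000   # "rotate.interval.ms" from the connector configuration

# Simplifying assumption: each task closes one file per rotation interval.
files_per_task_per_hour=$(( 3600000 / ROTATE_INTERVAL_MS ))
files_per_hour=$(( TASKS * files_per_task_per_hour ))

echo "part files per hour: ${files_per_hour}"
echo "part files per day:  $(( files_per_hour * 24 ))"
```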

You should consolidate these small part files, for instance into 3 part files per hour, with a Spark batch job or an HDFS small-file compaction tool.

I have no experience integrating Kafka Connect with Hive directly; instead, I created a Hive external table on the topic's HDFS path above, with partitions matching “path.format”.
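
Such a table definition might look roughly like the sketch below, which writes the DDL to a file. The table name `some_events` and its columns are hypothetical and must be replaced with the fields of your own Avro schema; `<your-topic>` stays a placeholder for the real topic name.

```shell
mkdir -p connect-conf

# Quoted 'EOF' keeps the backticks literal. Table and column names are hypothetical.
cat <<'EOF' > connect-conf/some-table.hql
CREATE EXTERNAL TABLE IF NOT EXISTS some_events (
  `id` STRING,
  `payload` STRING
)
PARTITIONED BY (`year` STRING, `month` STRING, `day` STRING, `hour` STRING)
STORED AS PARQUET
LOCATION '/topics/<your-topic>';

-- Register the year=/month=/day=/hour= directories that already exist.
MSCK REPAIR TABLE some_events;
EOF

# Usage (requires a Hive client), after editing the placeholders:
#   hive -f connect-conf/some-table.hql
```

New hourly directories are not visible to Hive until their partitions are added, so the repair statement (or an explicit ALTER TABLE ... ADD PARTITION) has to be run again as data arrives.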

- Kidong.
