Run Spark SQL ETL Jobs with Azkaban in Chango

Kidong Lee
Aug 8, 2024

Trino ETL queries can be run through REST simply with Chango Query Exec in Chango, as I mentioned in the previous article. In addition, Spark SQL ETL queries can also be run with Chango Query Exec.

This article will show how to integrate Spark SQL ETL jobs, run through Chango Query Exec, with a workflow engine such as Azkaban.

Create Flow of Chango Query Exec

The following is an example flow file, load-parquet-to-iceberg.yaml, which will be sent to Chango Query Exec to run Spark SQL queries through the Chango Spark Thrift Server.

uri: <sts-uri>
user: <sts-token>
queries:
  - id: query-0
    description: |-
      Create iceberg table if not exists.
    depends: NONE
    query: |-
      CREATE TABLE IF NOT EXISTS iceberg.iceberg_db.test_proc_import_parquet_mergeinto (
        orderkey BIGINT,
        partkey BIGINT,
        suppkey BIGINT,
        linenumber INT,
        quantity DOUBLE,
        extendedprice DOUBLE,
        discount DOUBLE,
        tax DOUBLE,
        returnflag STRING,
        linestatus STRING,
        shipdate DATE,
        commitdate DATE,
        receiptdate DATE,
        shipinstruct STRING,
        shipmode STRING,
        comment STRING)
      USING iceberg
      TBLPROPERTIES (
        'format' = 'iceberg/PARQUET',
        'format-version' = '2',
        'write.format.default' = 'PARQUET',
        'write.metadata.delete-after-commit.enabled' = 'true',
        'write.metadata.previous-versions-max' = '100',
        'write.parquet.compression-codec' = 'zstd')
  - id: query-1
    description: |-
      Load parquet files in external s3 into iceberg table.
    depends: query-0
    query: |-
      -- import parquet file in external s3 with the mode of merge.
      PROC iceberg.system.import (
        source => 's3a://mykidong/temp-external-mergeinto-parquet-path/#{ nowMinusFormatted(0, 0, 1, 0, 0, 0, "YYYY-MM-dd") }',
        s3_access_key => 'any-access-key',
        s3_secret_key => 'any-secret-key',
        s3_endpoint => 'any-endpoint',
        s3_region => 'any-region',
        file_format => 'parquet',
        action => 'MERGE',
        id_columns => 'orderkey,partkey,suppkey',
        target_table => 'iceberg.iceberg_db.test_proc_import_parquet_mergeinto'
      )
  • <sts-uri>: URI of the Chango Spark Thrift Server.
  • <sts-token>: Chango credential used to connect to the Chango Spark Thrift Server.

The table iceberg.iceberg_db.test_proc_import_parquet_mergeinto will be created, if it does not already exist, by query-0:

CREATE TABLE IF NOT EXISTS iceberg.iceberg_db.test_proc_import_parquet_mergeinto (
  orderkey BIGINT,
  partkey BIGINT,
  suppkey BIGINT,
  linenumber INT,
  quantity DOUBLE,
  extendedprice DOUBLE,
  discount DOUBLE,
  tax DOUBLE,
  returnflag STRING,
  linestatus STRING,
  shipdate DATE,
  commitdate DATE,
  receiptdate DATE,
  shipinstruct STRING,
  shipmode STRING,
  comment STRING)
USING iceberg
TBLPROPERTIES (
  'format' = 'iceberg/PARQUET',
  'format-version' = '2',
  'write.format.default' = 'PARQUET',
  'write.metadata.delete-after-commit.enabled' = 'true',
  'write.metadata.previous-versions-max' = '100',
  'write.parquet.compression-codec' = 'zstd')

After that, the Parquet files in the external S3 path will be loaded into the Iceberg table in Chango incrementally, using the Chango SQL procedure in query-1:

PROC iceberg.system.import (
  source => 's3a://mykidong/temp-external-mergeinto-parquet-path/#{ nowMinusFormatted(0, 0, 1, 0, 0, 0, "YYYY-MM-dd") }',
  s3_access_key => 'any-access-key',
  s3_secret_key => 'any-secret-key',
  s3_endpoint => 'any-endpoint',
  s3_region => 'any-region',
  file_format => 'parquet',
  action => 'MERGE',
  id_columns => 'orderkey,partkey,suppkey',
  target_table => 'iceberg.iceberg_db.test_proc_import_parquet_mergeinto'
)

Take a look at the function nowMinusFormatted(0, 0, 1, 0, 0, 0, "YYYY-MM-dd"), which Chango Query Exec interprets as yesterday's date, for example 2024-08-06. If Parquet files are saved under the path s3a://mykidong/temp-external-mergeinto-parquet-path/<yyyy-MM-dd> every day, the import procedure above will load the Parquet files from the matching date path.
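
For reference, a rough shell equivalent of that date arithmetic is sketched below; it assumes GNU date and only illustrates what the resolved path would look like, since the actual substitution happens inside Chango Query Exec.

# a rough equivalent of nowMinusFormatted(0, 0, 1, 0, 0, 0, "YYYY-MM-dd"),
# assuming GNU date; the real substitution is done by Chango Query Exec.
YESTERDAY=$(date -d "1 day ago" +%Y-%m-%d)   # e.g. 2024-08-06
echo "s3a://mykidong/temp-external-mergeinto-parquet-path/$YESTERDAY"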

Create Shell File to Send Flow Queries to Chango Query Exec

You can send flow queries to Chango Query Exec simply using curl. The following is an example shell file, hive-query-exec.sh, that does exactly that.

#!/bin/sh

set -e

export CONFIG_PATH=NA
export FLOW_PATH=NA

# parse command line arguments.
for i in "$@"
do
  case $i in
    --config-path=*)
      CONFIG_PATH="${i#*=}"
      shift
      ;;
    --flow-path=*)
      FLOW_PATH="${i#*=}"
      shift
      ;;
    *)
      # unknown option
      ;;
  esac
done

echo "CONFIG_PATH: $CONFIG_PATH"
echo "FLOW_PATH: $FLOW_PATH"

export ACCESS_TOKEN=NA
export CHANGO_QUERY_EXEC_URL=NA

# read key/value pairs from the properties file and expose them as shell variables.
while IFS='=' read -r key value
do
  key=$(echo "$key" | tr '.' '_')
  eval ${key}=\${value}
done < "$CONFIG_PATH"

ACCESS_TOKEN=${accessToken}
CHANGO_QUERY_EXEC_URL=${changoQueryExecUrl}

#echo "ACCESS_TOKEN: $ACCESS_TOKEN"
echo "CHANGO_QUERY_EXEC_URL: $CHANGO_QUERY_EXEC_URL"

# send the query flow to Chango Query Exec.
http_response=$(curl -sS -o response.txt -w "%{response_code}" -XPOST -H "Authorization: Bearer $ACCESS_TOKEN" \
  "$CHANGO_QUERY_EXEC_URL/v1/hive/exec-query-flow" \
  --data-urlencode "flow=$(cat "$FLOW_PATH")" \
)
if [ "$http_response" != "200" ]; then
  echo "Failed!"
  cat response.txt
  exit 1
else
  echo "Succeeded."
  cat response.txt
  exit 0
fi

In addition to the shell file, create a configuration file called config.properties like this.

changoQueryExecUrl=http://cp-3.chango.private:8009
accessToken=<chango-credential>

Then you can run the query exec runner shell file as follows, for example.

/home/spark/chango-etl-example/runner/hive-query-exec.sh \
  --config-path=/home/spark/chango-etl-example/configure/config.properties \
  --flow-path=/home/spark/chango-etl-example/etl/import/load-parquet-to-iceberg.yaml \
;
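
If you just want to test the endpoint without the runner script, the request it sends boils down to a single curl call like the sketch below; the URL and credential are the placeholder values from config.properties.

# a minimal sketch of the REST call made by hive-query-exec.sh;
# the URL and token are placeholders taken from config.properties.
curl -sS -XPOST \
  -H "Authorization: Bearer <chango-credential>" \
  http://cp-3.chango.private:8009/v1/hive/exec-query-flow \
  --data-urlencode "flow=$(cat /home/spark/chango-etl-example/etl/import/load-parquet-to-iceberg.yaml)"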

Create Azkaban Project

You need to create an Azkaban flow file called load.flow like this.

---
config:
  failure.emails: admin@your-domain.com

nodes:
  - name: start
    type: noop

  - name: "load-parquet-to-iceberg"
    type: command
    config:
      command: |-
        ssh spark@cp-1.chango.private " \
        /home/spark/chango-etl-example/runner/hive-query-exec.sh \
        --config-path=/home/spark/chango-etl-example/configure/config.properties \
        --flow-path=/home/spark/chango-etl-example/etl/import/load-parquet-to-iceberg.yaml \
        ;
        "
    dependsOn:
      - start

  - name: end
    type: noop
    dependsOn:
      - "load-parquet-to-iceberg"

This Azkaban flow file defines a DAG that runs a remote shell file over SSH, which in turn sends the query flow to Chango Query Exec.
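
Because the command node reaches the runner over SSH, the Azkaban executor host must have passwordless, key-based SSH access to the remote node as the spark user; a quick check like the sketch below (assuming the keys are already set up) avoids failures at execution time.

# a quick sanity check from the Azkaban executor host; assumes key-based SSH
# is already configured for the spark user on cp-1.chango.private.
ssh -o BatchMode=yes spark@cp-1.chango.private "echo connectivity-ok"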

Create a metadata file called flow20.project for the Azkaban project.

azkaban-flow-version: 2.0

All the files created so far should be managed in a source control system such as Git, as sketched below.
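
A minimal Git sketch could look like the following; the directory layout mirrors the paths used above and is only an assumption.

# a minimal sketch, assuming the files live under /home/spark/chango-etl-example.
cd /home/spark/chango-etl-example
git init
git add runner/ etl/ load.flow flow20.project   # keep config.properties with the credential out of Git
git commit -m "Add Chango Query Exec flow and Azkaban project"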

# package azkaban project file.
zip load.zip load.flow flow20.project

# move to /tmp.
mv load.zip /tmp

Upload the Azkaban project file on the Flow Query Execution Node, where the Azkaban CLI needs to be installed.

sudo su - azkabancli;
source venv/bin/activate;
azkaban upload \
  -c \
  -p load \
  -u azkaban:<azkaban-password>@http://cp-1.chango.private:28081 \
  /tmp/load.zip \
;
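
Once the project is uploaded, you can trigger the flow from the Azkaban web UI. The same CLI also offers a run command; the sketch below simply reuses the options from the upload example, but the exact option names may differ by CLI version, so treat it as an assumption and check the CLI help.

# a sketch only: trigger the uploaded flow from the CLI; options mirror the
# upload example above and may differ depending on the azkaban CLI version.
azkaban run \
  -p load \
  -u azkaban:<azkaban-password>@http://cp-1.chango.private:28081 \
  load \
;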

After executing the Azkaban flow of the load project, you can see the flow execution in the Azkaban UI.

You can also see the job log.

SQL is essential in modern data lakehouses, even for ETL jobs. Chango provides Chango Query Exec to run such SQL ETL queries simply through REST. As shown here, Spark SQL ETL query jobs and Trino ETL query jobs can be easily integrated with a workflow engine like Azkaban through Chango Query Exec.

Written by Kidong Lee

Founder of Cloud Chef Labs | Chango | Unified Data Lakehouse Platform | Iceberg centric Data Lakehouses https://www.cloudchef-labs.com/
