Run Spark SQL ETL Jobs with Azkaban in Chango
As I mentioned in the previous article, Trino ETL queries can be run simply through REST with Chango Query Exec in Chango. Spark SQL ETL queries can also be run with Chango Query Exec in the same way. This article will show how to integrate Spark SQL ETL jobs with a workflow engine such as Azkaban through Chango Query Exec.
Create a Flow for Chango Query Exec
The following is an example flow file called load-parquet-to-iceberg.yaml, which will be sent to Chango Query Exec to run Spark SQL queries through Chango Spark Thrift Server.
uri: <sts-uri>
user: <sts-token>
queries:
  - id: query-0
    description: |-
      Create iceberg table if not exists.
    depends: NONE
    query: |-
      CREATE TABLE IF NOT EXISTS iceberg.iceberg_db.test_proc_import_parquet_mergeinto (
        orderkey BIGINT,
        partkey BIGINT,
        suppkey BIGINT,
        linenumber INT,
        quantity DOUBLE,
        extendedprice DOUBLE,
        discount DOUBLE,
        tax DOUBLE,
        returnflag STRING,
        linestatus STRING,
        shipdate DATE,
        commitdate DATE,
        receiptdate DATE,
        shipinstruct STRING,
        shipmode STRING,
        comment STRING)
      USING iceberg
      TBLPROPERTIES (
        'format' = 'iceberg/PARQUET',
        'format-version' = '2',
        'write.format.default' = 'PARQUET',
        'write.metadata.delete-after-commit.enabled' = 'true',
        'write.metadata.previous-versions-max' = '100',
        'write.parquet.compression-codec' = 'zstd')
  - id: query-1
    description: |-
      Load parquet files in external s3 into iceberg table.
    depends: query-0
    query: |-
      -- import parquet file in external s3 with the mode of merge.
      PROC iceberg.system.import (
        source => 's3a://mykidong/temp-external-mergeinto-parquet-path/#{ nowMinusFormatted(0, 0, 1, 0, 0, 0, "YYYY-MM-dd") }',
        s3_access_key => 'any-access-key',
        s3_secret_key => 'any-secret-key',
        s3_endpoint => 'any-endpoint',
        s3_region => 'any-region',
        file_format => 'parquet',
        action => 'MERGE',
        id_columns => 'orderkey,partkey,suppkey',
        target_table => 'iceberg.iceberg_db.test_proc_import_parquet_mergeinto'
      )
<sts-uri>: URI of Chango Spark Thrift Server.
<sts-token>: Chango credential to connect to Chango Spark Thrift Server.
The table iceberg.iceberg_db.test_proc_import_parquet_mergeinto will be created if it does not exist, like this.
CREATE TABLE IF NOT EXISTS iceberg.iceberg_db.test_proc_import_parquet_mergeinto (
  orderkey BIGINT,
  partkey BIGINT,
  suppkey BIGINT,
  linenumber INT,
  quantity DOUBLE,
  extendedprice DOUBLE,
  discount DOUBLE,
  tax DOUBLE,
  returnflag STRING,
  linestatus STRING,
  shipdate DATE,
  commitdate DATE,
  receiptdate DATE,
  shipinstruct STRING,
  shipmode STRING,
  comment STRING)
USING iceberg
TBLPROPERTIES (
  'format' = 'iceberg/PARQUET',
  'format-version' = '2',
  'write.format.default' = 'PARQUET',
  'write.metadata.delete-after-commit.enabled' = 'true',
  'write.metadata.previous-versions-max' = '100',
  'write.parquet.compression-codec' = 'zstd')
After that, parquet files in external s3 will be loaded into the iceberg table in Chango incrementally using Chango SQL Procedure.
PROC iceberg.system.import (
  source => 's3a://mykidong/temp-external-mergeinto-parquet-path/#{ nowMinusFormatted(0, 0, 1, 0, 0, 0, "YYYY-MM-dd") }',
  s3_access_key => 'any-access-key',
  s3_secret_key => 'any-secret-key',
  s3_endpoint => 'any-endpoint',
  s3_region => 'any-region',
  file_format => 'parquet',
  action => 'MERGE',
  id_columns => 'orderkey,partkey,suppkey',
  target_table => 'iceberg.iceberg_db.test_proc_import_parquet_mergeinto'
)
Take a look at the function nowMinusFormatted(0, 0, 1, 0, 0, 0, "YYYY-MM-dd"), which Chango Query Exec interprets as yesterday's date, for example 2024-08-06. If parquet files are saved under the path s3a://mykidong/temp-external-mergeinto-parquet-path/<yyyy-MM-dd> every day, the above import procedure will load parquet files from the appropriate date path.
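As a quick reference, the following sketch (my own illustration, not part of Chango) prints the source path that the placeholder would resolve to, assuming GNU date is available on the host.
#!/bin/sh
# Illustration only: mimic what nowMinusFormatted(0, 0, 1, 0, 0, 0, "YYYY-MM-dd")
# resolves to in Chango Query Exec, i.e. yesterday's date.
YESTERDAY=$(date -d "yesterday" +%Y-%m-%d)   # GNU date; on BSD/macOS: date -v-1d +%Y-%m-%d
echo "s3a://mykidong/temp-external-mergeinto-parquet-path/${YESTERDAY}"
# e.g. s3a://mykidong/temp-external-mergeinto-parquet-path/2024-08-06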
Create Shell File to Send Flow Queries to Chango Query Exec
You can send flow queries to Chango Query Exec simply using curl. The following is an example shell file called hive-query-exec.sh that sends flow queries to Chango Query Exec.
#!/bin/sh

set -e

export CONFIG_PATH=NA
export FLOW_PATH=NA

# parse command line arguments.
for i in "$@"
do
  case $i in
    --config-path=*)
      CONFIG_PATH="${i#*=}"
      shift
      ;;
    --flow-path=*)
      FLOW_PATH="${i#*=}"
      shift
      ;;
    *)
      # unknown option
      ;;
  esac
done

echo "CONFIG_PATH: $CONFIG_PATH"
echo "FLOW_PATH: $FLOW_PATH"

export ACCESS_TOKEN=NA
export CHANGO_QUERY_EXEC_URL=NA

# get properties file: read key=value pairs and expose each key as a shell variable ('.' replaced with '_').
while IFS='=' read -r key value
do
  key=$(echo $key | tr '.' '_')
  eval ${key}=\${value}
done < "$CONFIG_PATH"

ACCESS_TOKEN=${accessToken}
CHANGO_QUERY_EXEC_URL=${changoQueryExecUrl}

#echo "ACCESS_TOKEN: $ACCESS_TOKEN"
echo "CHANGO_QUERY_EXEC_URL: $CHANGO_QUERY_EXEC_URL"

# request: post the query flow to Chango Query Exec and capture the HTTP status code.
http_response=$(curl -sS -o response.txt -w "%{response_code}" -XPOST -H "Authorization: Bearer $ACCESS_TOKEN" \
  $CHANGO_QUERY_EXEC_URL/v1/hive/exec-query-flow \
  --data-urlencode "flow=$(cat $FLOW_PATH)" \
)

if [ "$http_response" != "200" ]; then
  echo "Failed!"
  cat response.txt
  exit 1
else
  echo "Succeeded."
  cat response.txt
  exit 0
fi
In addition to the shell file, create a configuration file called config.properties like this.
changoQueryExecUrl=http://cp-3.chango.private:8009
accessToken=<chango-credential>
Then, you can run the query exec runner shell file as follows, for example.
/home/spark/chango-etl-example/runner/hive-query-exec.sh \
--config-path=/home/spark/chango-etl-example/configure/config.properties \
--flow-path=/home/spark/chango-etl-example/etl/import/load-parquet-to-iceberg.yaml \
;
Create Azkaban Project
You need to create an Azkaban flow file called load.flow like this.
---
config:
  failure.emails: admin@your-domain.com
nodes:
  - name: start
    type: noop
  - name: "load-parquet-to-iceberg"
    type: command
    config:
      command: |-
        ssh spark@cp-1.chango.private " \
        /home/spark/chango-etl-example/runner/hive-query-exec.sh \
        --config-path=/home/spark/chango-etl-example/configure/config.properties \
        --flow-path=/home/spark/chango-etl-example/etl/import/load-parquet-to-iceberg.yaml \
        ;
        "
    dependsOn:
      - start
  - name: end
    type: noop
    dependsOn:
      - "load-parquet-to-iceberg"
This Azkaban flow file defines a DAG that runs the remote shell file, which sends the query flow to Chango Query Exec.
Create a metadata file called flow20.project for the Azkaban project.
azkaban-flow-version: 2.0
All the files created so far should be managed by a source control system like git.
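For example, a minimal sketch; the repository layout below is hypothetical and simply follows the paths used in this article, so adjust it to where you actually keep these files.
# Illustration only: put the ETL flow, runner shell file, and Azkaban project files under git.
cd /home/spark/chango-etl-example
git init
git add etl/import/load-parquet-to-iceberg.yaml \
        runner/hive-query-exec.sh \
        load.flow \
        flow20.project
# Keep credentials such as config.properties out of the repository (e.g. via .gitignore).
git commit -m "Chango Query Exec flow and Azkaban project for loading parquet into iceberg"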
# package azkaban project file.
zip load.zip load.flow flow20.project
# move to /tmp.
mv load.zip /tmp
Upload the Azkaban project file on the Flow Query Execution Node, where Azkaban CLI needs to be installed.
sudo su - azkabancli;
source venv/bin/activate;
azkaban upload \
-c \
-p load \
-u azkaban:<azkaban-password>@http://cp-1.chango.private:28081 \
/tmp/load.zip \
;
After executing the Azkaban flow of this project (load), you can see the flow execution in the Azkaban UI. You can also see the job log there.
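If you prefer to trigger the flow from the command line instead of the Azkaban web UI, something along these lines should work; this is only a sketch and assumes the Azkaban CLI's run command accepts the same project and URL options as the upload command used above.
sudo su - azkabancli;
source venv/bin/activate;
# Illustration only: assumes "azkaban run" accepts the same -p (project) and -u (url)
# options as "azkaban upload"; otherwise trigger the flow from the Azkaban web UI.
azkaban run \
-p load \
-u azkaban:<azkaban-password>@http://cp-1.chango.private:28081 \
load \
;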
SQL is essential in modern data lakehouses, even for ETL jobs. Chango provides Chango Query Exec to run such SQL ETL queries simply through REST. As shown here, Spark SQL ETL query jobs and Trino ETL query jobs are easily integrated with a workflow engine like Azkaban through Chango Query Exec.