Easy way to move Iceberg Data using Iceberg REST Catalog

Kidong Lee
Published in ITNEXT
7 min read · Apr 1, 2023


Photo by Aaron Burden on Unsplash

Until now, Hive Metastore has been the standard data catalog in data lakehouses. As open table formats like Iceberg become popular, we need to consider alternatives to Hive Metastore for the data catalog. In particular, if you save all your data as Iceberg tables, you do not need Hive Metastore at all. Iceberg supports several other catalogs, such as the JDBC and REST catalogs.

Because the Iceberg REST catalog is a simple and flexible way to manage catalog metadata, this article covers data movement using the Iceberg REST catalog.

All the code mentioned here is in a git repo that originated from https://github.com/tabular-io/iceberg-rest-image, which I modified to add some functionality.

Data Movement using Iceberg REST Catalog

Let’s say there are two data lakehouses. All data is saved as Iceberg tables in S3 or S3-compatible object storage. One is based on Trino, and the other is based on Spark.

  • On the Spark-based data lake, Spark ETL jobs run with the Iceberg REST catalog as the data catalog.
  • On the Trino-based data lakehouse, Trino queries, both ETL and interactive, run with Hive Metastore as the data catalog.

I want to move data from the Spark-based data lake to the Trino-based one, or vice versa. The simplest way is:

  • Add the Iceberg REST catalog to Trino in the Trino-based data lakehouse.

With Trino queries such as CTAS (CREATE TABLE AS SELECT), you can then move data from the Spark-based data lake to the Trino-based one, and vice versa.

In the Spark-based data lake, an Iceberg REST catalog server is running. Hive Metastore could run there instead, but it is not easy to add authentication to Hive Metastore so that it can be exposed publicly in a secure way. The Iceberg REST catalog server, by contrast, can run with OAuth2 authentication for public access.

Install Iceberg REST Catalog Server

Install MySQL Server

The Iceberg REST catalog server needs a backend JDBC catalog, for which MySQL needs to be installed.

There is an easy way to install MySQL on Kubernetes, for instance. First, install the Helm operator.

helm repo add dataroaster-helm-operator https://cloudcheflabs.github.io/helm-operator-helm-repo/
helm repo update

helm upgrade \
helm-operator \
--install \
--create-namespace \
--namespace helm-operator \
--version v1.1.1 \
dataroaster-helm-operator/dataroasterhelmoperator;

And then install mysql.

cat <<EOF > rest-catalog-mysql.yaml
apiVersion: "helm-operator.cloudchef-labs.com/v1beta1"
kind: HelmChart
metadata:
  name: rest-catalog-mysql
  namespace: helm-operator
spec:
  repo: https://cloudcheflabs.github.io/mysql-helm-repo/
  chartName: dataroaster-mysql
  name: mysql
  version: v1.0.1
  namespace: rest-catalog
  values: |
    storage:
      storageClass: <storageClass>
      size: 2Gi
EOF

kubectl apply -f rest-catalog-mysql.yaml;

Change <storageClass> to suit your environment.

Port-forward the MySQL service so that it can be accessed from localhost.

kubectl port-forward svc/mysql-service 3306 -n rest-catalog;

Run Iceberg REST Catalog Server with Maven

With Maven, you can run the Iceberg REST catalog server on your local machine to test it.

cd rest-catalog;

export REST_CATALOG_ACCESS_TOKEN=<token>;
export CATALOG_WAREHOUSE=s3a://<your-bucket>/<warehouse-path>;
export S3_ACCESS_KEY=<s3-access-key>;
export S3_SECRET_KEY=<s3-secret-key>;
export S3_ENDPOINT=https://<s3-endpoint>;
export S3_REGION=<s3-region>;
export JDBC_URL=jdbc:mysql://localhost:3306/rest_catalog?useSSL=false\&createDatabaseIfNotExist=true;
export JDBC_USER=root;
export JDBC_PASSWORD=mysqlpass123;

mvn -e spring-boot:run \
-Dspring.profiles.active=dev;

  • <token> is the token that Spark and Trino will use to access the Iceberg REST catalog.
  • <your-bucket> and <warehouse-path> are the S3 bucket and the path of the catalog warehouse location.
  • <s3-access-key> is the S3 access key.
  • <s3-secret-key> is the S3 secret key.
  • <s3-endpoint> is the S3 endpoint.
  • <s3-region> is the S3 region. If your bucket is not in the default region, set the proper region.
  • All the environment variables with the JDBC prefix configure the connection to MySQL, which is the backend database for the JDBC catalog behind the Iceberg REST catalog.

After you run the script, the Iceberg REST catalog server listens on port 8181.
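One quick way to confirm that the server is up is to call the configuration endpoint defined by the Iceberg REST catalog specification, GET /v1/config. The following is only a minimal sketch using the JDK (Java 11+) HTTP client. The class name RestCatalogPing and the default URL http://localhost:8181 are assumptions for illustration, and the sketch assumes the server accepts the token in an Authorization: Bearer header.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper, not part of the repo described in this article.
class RestCatalogPing {

    // Builds a GET request for <baseUrl>/v1/config and passes the
    // access token as a Bearer token (assumed auth scheme).
    static HttpRequest configRequest(String baseUrl, String token) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/v1/config"))
                .header("Authorization", "Bearer " + token)
                .GET()
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Base URL defaults to the local server started with Maven.
        String baseUrl = args.length > 0 ? args[0] : "http://localhost:8181";
        // Reuses the environment variable exported before starting the server.
        String token = System.getenv("REST_CATALOG_ACCESS_TOKEN");

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(configRequest(baseUrl, token), HttpResponse.BodyHandlers.ofString());

        // Print the HTTP status code and the raw response body.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

If the server is reachable and the token is accepted, the status code should indicate success; an authentication failure or a connection error points to a wrong token or URL.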

Run Spark Job

The first step in running a Spark job with the REST catalog is to define the REST catalog in the Spark configuration.

// iceberg rest catalog.
sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions");
sparkConf.set("spark.sql.catalog.rest", "org.apache.iceberg.spark.SparkCatalog");
sparkConf.set("spark.sql.catalog.rest.catalog-impl", "org.apache.iceberg.rest.RESTCatalog");
sparkConf.set("spark.sql.catalog.rest.io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
sparkConf.set("spark.sql.catalog.rest.uri", restUrl);
sparkConf.set("spark.sql.catalog.rest.warehouse", warehouse);
sparkConf.set("spark.sql.catalog.rest.token", token);
sparkConf.set("spark.sql.catalog.rest.s3.endpoint", s3Endpoint);
sparkConf.set("spark.sql.catalog.rest.s3.path-style-access", "true");
sparkConf.set("spark.sql.defaultCatalog", "rest");
  • token is the token that was set when the Iceberg REST catalog server was started above.
  • s3.endpoint needs to be set when you use S3-compatible object storage such as MinIO or Oracle Object Storage.
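All catalog-specific keys above share the prefix spark.sql.catalog.<catalog-name>, so the same settings can be generated for any catalog name. The following is a minimal sketch of that pattern using only the JDK; the class RestCatalogSettings, the method forCatalog, and every value passed in main are hypothetical placeholders, not part of the original code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that mirrors the sparkConf.set(...) calls above.
class RestCatalogSettings {

    // Returns the Spark properties that register an Iceberg REST catalog
    // under the given catalog name and make it the default catalog.
    static Map<String, String> forCatalog(String name, String restUrl, String warehouse,
                                          String token, String s3Endpoint) {
        String prefix = "spark.sql.catalog." + name;
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("spark.sql.extensions",
                "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions");
        conf.put(prefix, "org.apache.iceberg.spark.SparkCatalog");
        conf.put(prefix + ".catalog-impl", "org.apache.iceberg.rest.RESTCatalog");
        conf.put(prefix + ".io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
        conf.put(prefix + ".uri", restUrl);
        conf.put(prefix + ".warehouse", warehouse);
        conf.put(prefix + ".token", token);
        conf.put(prefix + ".s3.endpoint", s3Endpoint);
        conf.put(prefix + ".s3.path-style-access", "true");
        conf.put("spark.sql.defaultCatalog", name);
        return conf;
    }

    public static void main(String[] args) {
        // All argument values below are placeholders for illustration only.
        forCatalog("rest", "http://localhost:8181", "s3a://my-bucket/warehouse",
                "my-token", "https://s3.example.com")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Each entry of the returned map could be applied with sparkConf.set(key, value). The catalog name chosen here is also the first part of the table identifiers used in Spark SQL.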

Because the warehouse is located in an S3 bucket, we need to set the AWS system properties and add the S3 credentials to the Spark Hadoop configuration.

// set aws system properties.
System.setProperty("aws.region", (s3Region != null) ? s3Region : "us-east-1");
System.setProperty("aws.accessKeyId", s3AccessKey);
System.setProperty("aws.secretAccessKey", s3SecretKey);

Configuration hadoopConfiguration = spark.sparkContext().hadoopConfiguration();
hadoopConfiguration.set("fs.s3a.endpoint", s3Endpoint);
if (s3Region != null) {
    hadoopConfiguration.set("fs.s3a.endpoint.region", s3Region);
}
hadoopConfiguration.set("fs.s3a.access.key", s3AccessKey);
hadoopConfiguration.set("fs.s3a.secret.key", s3SecretKey);
hadoopConfiguration.set("fs.s3a.path.style.access", "true");
hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");

Because the catalog named rest is defined in the Spark configuration above, you can use that catalog name in Spark SQL queries.

// get table schema created.
StructType schema = spark.table("rest.iceberg_db.test_iceberg").schema();

// write to iceberg table.
Dataset<Row> newDf = spark.createDataFrame(df.javaRDD(), schema);
newDf.writeTo("rest.iceberg_db.test_iceberg").append();

// show data in table.
spark.table("rest.iceberg_db.test_iceberg").show(30);

OK, let’s run the example Spark job locally to save data as an Iceberg table in the warehouse using the Iceberg REST catalog.

cd spark;

mvn -e -Dtest=RunSparkWithIcebergRestCatalog \
-Ds3AccessKey=<s3-access-key> \
-Ds3SecretKey=<s3-secret-key> \
-Ds3Endpoint=https://<s3-endpoint> \
-Ds3Region=<s3-region> \
-DrestUrl=http://<rest-catalog-server-ip>:8181 \
-Dwarehouse=s3a://<your-bucket>/<warehouse-path> \
-Dtoken=<token> \
test;

  • <rest-catalog-server-ip> is the IP address of the Iceberg REST catalog server.

Note that <token> is used to access the Iceberg REST catalog server.

When this Spark job runs, it creates the iceberg_db schema and the test_iceberg table if they do not exist, and saves the data to the Iceberg table test_iceberg in the REST catalog warehouse specified above.

The output of the job looks like this.

2023-04-01 17:52:08,448 INFO [task-result-getter-2] org.apache.spark.internal.Logging: Finished task 0.0 in stage 6.0 (TID 206) in 569 ms on 192.168.0.20 (executor driver) (1/1)
2023-04-01 17:52:08,448 INFO [task-result-getter-2] org.apache.spark.internal.Logging: Removed TaskSet 6.0, whose tasks have all completed, from pool
2023-04-01 17:52:08,448 INFO [dag-scheduler-event-loop] org.apache.spark.internal.Logging: ResultStage 6 (show at RunSparkWithIcebergRestCatalog.java:84) finished in 0.572 s
2023-04-01 17:52:08,449 INFO [dag-scheduler-event-loop] org.apache.spark.internal.Logging: Job 5 is finished. Cancelling potential speculative or zombie tasks for this job
2023-04-01 17:52:08,449 INFO [dag-scheduler-event-loop] org.apache.spark.internal.Logging: Killing all running tasks in stage 6: Stage finished
2023-04-01 17:52:08,449 INFO [main] org.apache.spark.internal.Logging: Job 5 finished: show at RunSparkWithIcebergRestCatalog.java:84, took 0.573713 s
+--------------------+------------+-----+--------+
|      baseproperties|      itemid|price|quantity|
+--------------------+------------+-----+--------+
|{cart-event, 1527...|any-item-id0| 1000|       2|
|{cart-event, 1527...|any-item-id0| 1000|       2|
|{cart-event, 1527...|any-item-id0| 1000|       2|
|{cart-event, 1527...|any-item-id0| 1000|       2|
|{cart-event, 1527...|any-item-id0| 1000|       2|
|{cart-event, 1527...|any-item-id0| 1000|       2|
|{cart-event, 1527...|any-item-id0| 1000|       2|
|{cart-event, 1527...|any-item-id0| 1000|       2|
|{cart-event, 1527...|any-item-id0| 1000|       2|
|{cart-event, 1527...|any-item-id0| 1000|       2|
|{cart-event, 1527...|any-item-id0| 1000|       2|
|{cart-event, 1527...|any-item-id0| 1000|       2|
|{cart-event, 1527...|any-item-id0| 1000|       2|
|{cart-event, 1527...|any-item-id0| 1000|       2|
|{cart-event, 1527...|any-item-id0| 1000|       2|
|{cart-event, 1527...|any-item-id0| 1000|       2|
|{cart-event, 1527...|any-item-id0| 1000|       2|
|{cart-event, 1527...|any-item-id0| 1000|       2|
|{cart-event, 1527...|any-item-id0| 1000|       2|
|{cart-event, 1527...|any-item-id0| 1000|       2|
|{cart-event, 1527...|any-item-id0| 1000|       2|
|{cart-event, 1527...|any-item-id0| 1000|       2|
|{cart-event, 1527...|any-item-id0| 1000|       2|
|{cart-event, 1527...|any-item-id0| 1000|       2|
+--------------------+------------+-----+--------+

[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 9.242 s - in com.cloudcheflabs.iceberg.catalog.rest.RunSparkWithIcebergRestCatalog
[INFO]
[INFO] Results:
[INFO]
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 10.796 s
[INFO] Finished at: 2023-04-01T17:52:08+09:00
[INFO] ------------------------------------------------------------------------

Add Iceberg REST Catalog to Trino

Add an Iceberg REST catalog to Trino by creating a catalog file named iceberg_rest.properties like this.

connector.name=iceberg
iceberg.catalog.type=rest
iceberg.rest-catalog.uri=http://<rest-catalog-server-ip>:8181
iceberg.rest-catalog.security=OAUTH2
iceberg.rest-catalog.oauth2.token=<token>
iceberg.rest-catalog.warehouse=s3://<your-bucket>/<warehouse-path>
hive.s3.aws-access-key=<s3-access-key>
hive.s3.aws-secret-key=<s3-secret-key>
hive.s3.endpoint=https://<s3-endpoint>
hive.s3.region=<s3-region>
hive.s3.path-style-access=true
hive.s3.ssl.enabled=true

Run the following Trino CTAS query to move data from the Spark-based data lake to the Trino-based one.

CREATE TABLE IF NOT EXISTS iceberg.iceberg_db.imported_from_rest
AS
SELECT
*
FROM iceberg_rest.iceberg_db.test_iceberg

This means that all the Iceberg data in the table iceberg_db.test_iceberg, created earlier by the Spark job in the Spark-based data lake, is copied into the table iceberg_db.imported_from_rest in the Trino-based data lakehouse. You can run a similar Trino query to move Iceberg data from the Trino-based data lakehouse to the Spark-based one.
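To make the two directions concrete, here is a minimal sketch that builds the CTAS statement for either direction by swapping the source and target table names. The class CtasBuilder and the table names some_trino_table and imported_from_trino are hypothetical; only the first pair of names comes from the query above.

```java
// Hypothetical helper that only builds the SQL text; it does not run the query.
class CtasBuilder {

    // Builds a Trino CTAS statement that copies every row of `source` into `target`.
    // Both arguments are fully qualified names: catalog.schema.table.
    static String ctas(String source, String target) {
        return "CREATE TABLE IF NOT EXISTS " + target + "\n"
                + "AS\n"
                + "SELECT *\n"
                + "FROM " + source;
    }

    public static void main(String[] args) {
        // Spark-based data lake (REST catalog) to Trino-based lakehouse, as in the query above.
        System.out.println(ctas("iceberg_rest.iceberg_db.test_iceberg",
                "iceberg.iceberg_db.imported_from_rest"));
        System.out.println();

        // Reverse direction; both table names here are hypothetical.
        System.out.println(ctas("iceberg.iceberg_db.some_trino_table",
                "iceberg_rest.iceberg_db.imported_from_trino"));
    }
}
```

In the reverse direction the target table lives in the iceberg_rest catalog, so Trino writes the new table through the Iceberg REST catalog server into the Spark-side warehouse.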

Install Iceberg REST Catalog Server with Helm

To install the Iceberg REST catalog server on Kubernetes, use the following Helm chart.

helm repo add iceberg-rest-catalog https://cloudcheflabs.github.io/iceberg-rest-catalog-helm-repo/
helm repo update

helm upgrade \
rest-catalog \
--install \
--create-namespace \
--namespace rest-catalog \
--version v1.1.0 \
--set jettyIngress.enabled=true \
--set jettyIngress.hostName=<ingress-hostname> \
--set restCatalog.token=<token> \
--set restCatalog.warehouse=s3a://<your-bucket>/<warehouse-path> \
--set restCatalog.s3.accessKey=<s3-access-key> \
--set restCatalog.s3.secretKey=<s3-secret-key> \
--set restCatalog.s3.endpoint=https://<s3-endpoint> \
--set restCatalog.s3.region=<s3-region> \
--set restCatalog.jdbc.url=jdbc:mysql://mysql-service.rest-catalog.svc:3306/rest_catalog?useSSL=false\&createDatabaseIfNotExist=true \
--set restCatalog.jdbc.user=root \
--set restCatalog.jdbc.password=mysqlpass123 \
iceberg-rest-catalog/iceberg-rest-catalog;

  • <ingress-hostname> is the ingress hostname, for instance your-rest-catalog.yourdomain.com. It must be added to your DNS server beforehand as a DNS entry that points to the IP address of the ingress controller, such as NGINX.

Now the Iceberg REST catalog server can be accessed publicly at https://your-rest-catalog.yourdomain.com.

Data Movement in Chango using Iceberg REST Catalog

Chango is a SQL data lakehouse cloud service based on Trino as the query engine and Iceberg as the storage table format, together with other components for data ingestion and streaming. In the setup described above, the Trino-based data lakehouse can be replaced with Chango. That is, by adding the Iceberg REST catalog to Chango, you can easily move Iceberg data between Chango and other data lakes, such as the Spark-based one, using the Iceberg REST catalog server.

That’s it.
