Hidden Partitioned Iceberg Table in Chango

Kidong Lee
4 min read · May 31, 2024


There are a few ways to create hidden partitioned Iceberg tables in Chango. You can create them with Trino, but Trino does not support all Iceberg features at the moment. Chango also provides Chango Spark SQL Runner and Chango Spark Thrift Server to execute Spark SQL, including Iceberg-specific queries. Because they expose the full Iceberg feature set, they are well suited to Iceberg-specific queries such as DDL and Iceberg maintenance queries.

This article shows how to create a hidden partitioned Iceberg table using Chango Spark Thrift Server.

Connect Chango Spark Thrift Server using Superset

After installing Chango Spark Thrift Server in Chango, you can connect to it from Superset. See Connect Chango Spark Thrift Server using Superset.
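
In Superset this is typically done with the Hive SQLAlchemy driver. A minimal sketch of the connection URI, assuming the default Thrift port 10000; the host below is a placeholder for your own Chango Spark Thrift Server endpoint, and your setup may need additional connection parameters such as authentication:

hive://<chango-spark-thrift-server-host>:10000/default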

Hidden Partitioned Iceberg Table

You can create a hidden partitioned Iceberg table like this:

-- hidden partitioned table.
CREATE TABLE IF NOT EXISTS iceberg.iceberg_db.hidden_partitioning (
    id LONG,
    log STRING,
    ts TIMESTAMP_LTZ
)
USING iceberg
;
-- add hidden partitions.
ALTER TABLE iceberg.iceberg_db.hidden_partitioning ADD PARTITION FIELD day(ts);

The column ts must be a timestamp type. After creating the Iceberg table, you add hidden partition fields using one of the Iceberg transform functions: year(), month(), day(), or hour().
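
Note that Spark SQL with Iceberg also lets you declare hidden partitioning directly at table creation time instead of adding it afterwards. A minimal equivalent sketch using the days() transform in PARTITIONED BY:

-- hidden partitioned table declared at creation time.
CREATE TABLE IF NOT EXISTS iceberg.iceberg_db.hidden_partitioning (
    id LONG,
    log STRING,
    ts TIMESTAMP_LTZ
)
USING iceberg
PARTITIONED BY (days(ts));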

Create Hidden Partitioned Iceberg Table using Superset

Run the Iceberg DDL queries above using Superset.

Then you can check the table description with the following query.

-- describe table.
describe iceberg.iceberg_db.hidden_partitioning;

The partitioning field day(ts) is shown in the table description.
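
The exact output depends on the Spark and Iceberg versions, but the table description should end with a partitioning section roughly like this:

# Partitioning
Part 0    day(ts)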

Insert Data to Hidden Partitioned Iceberg Table

Let’s insert the JSON data below into the hidden partitioned Iceberg table with Spark.

{"id": 1717045290030, "log":  "any log message 1", "ts":  "2024-05-30T14:01:30.030+09:00"}
{"id": 1717045290031, "log": "any log message 2", "ts": "2024-05-30T14:01:31.030+09:00"}
{"id": 1717045290032, "log": "any log message 3", "ts": "2024-05-30T14:01:32.030+09:00"}
{"id": 1717045290033, "log": "any log message 4", "ts": "2024-05-30T14:01:33.030+09:00"}
{"id": 1717045290034, "log": "any log message 5", "ts": "2024-05-30T14:01:34.030+09:00"}
{"id": 1717045290035, "log": "any log message 6", "ts": "2024-05-30T14:01:35.030+09:00"}
{"id": 1717045290036, "log": "any log message 7", "ts": "2024-05-30T14:01:36.030+09:00"}
{"id": 1717045290037, "log": "any log message 8", "ts": "2024-05-30T14:01:37.030+09:00"}
{"id": 1717045290038, "log": "any log message 9", "ts": "2024-05-30T14:01:38.030+09:00"}
{"id": 1717045290039, "log": "any log message 10", "ts": "2024-05-30T14:01:39.030+09:00"}

First, get a Spark session that connects to the Chango REST Catalog and registers it as the iceberg catalog, like this:

    public static SparkSession createSparkSessionForIcebergRESTCatalog(
            String s3AccessKey,
            String s3SecretKey,
            String s3Endpoint,
            String s3Region,
            String bucket,
            String restUrl,
            String restWarehouse,
            String restToken,
            boolean isLocal
    ) {
        SparkConf sparkConf = new SparkConf().setAppName("Run Spark using Iceberg REST Catalog");
        if (isLocal) {
            sparkConf.setMaster("local[2]");
        }

        // set aws system properties.
        System.setProperty("aws.region", (s3Region != null) ? s3Region : "us-east-1");
        System.setProperty("aws.accessKeyId", s3AccessKey);
        System.setProperty("aws.secretAccessKey", s3SecretKey);

        // iceberg rest catalog.
        sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions");
        sparkConf.set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog");
        sparkConf.set("spark.sql.catalog.iceberg.catalog-impl", "org.apache.iceberg.rest.RESTCatalog");
        sparkConf.set("spark.sql.catalog.iceberg.io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
        sparkConf.set("spark.sql.catalog.iceberg.uri", restUrl);
        sparkConf.set("spark.sql.catalog.iceberg.warehouse", restWarehouse);
        sparkConf.set("spark.sql.catalog.iceberg.token", restToken);
        sparkConf.set("spark.sql.catalog.iceberg.s3.endpoint", s3Endpoint);
        sparkConf.set("spark.sql.catalog.iceberg.s3.path-style-access", "true");
        sparkConf.set("spark.sql.defaultCatalog", "iceberg");

        SparkSession spark = SparkSession
                .builder()
                .config(sparkConf)
                .getOrCreate();

        Configuration hadoopConfiguration = spark.sparkContext().hadoopConfiguration();
        hadoopConfiguration.set("fs.s3a.bucket." + bucket + ".endpoint", s3Endpoint);
        if (s3Region != null) {
            hadoopConfiguration.set("fs.s3a.bucket." + bucket + ".endpoint.region", s3Region);
        }
        hadoopConfiguration.set("fs.s3a.bucket." + bucket + ".access.key", s3AccessKey);
        hadoopConfiguration.set("fs.s3a.bucket." + bucket + ".secret.key", s3SecretKey);
        hadoopConfiguration.set("fs.s3a.path.style.access", "true");
        hadoopConfiguration.set("fs.s3a.change.detection.mode", "warn");
        hadoopConfiguration.set("fs.s3a.change.detection.version.required", "false");
        hadoopConfiguration.set("fs.s3a.multiobjectdelete.enable", "true");
        hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
        hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");

        return spark;
    }

Then run the following Spark code to insert the local JSON data into the hidden partitioned Iceberg table in Chango.

        // chango s3 properties.
        String s3AccessKey = System.getProperty("s3AccessKey");
        String s3SecretKey = System.getProperty("s3SecretKey");
        String s3Endpoint = System.getProperty("s3Endpoint");
        String s3Region = System.getProperty("s3Region");
        String s3Bucket = System.getProperty("s3Bucket");

        // iceberg rest catalog.
        String restUrl = System.getProperty("restUrl");
        String restWarehouse = System.getProperty("restWarehouse");
        String restToken = System.getProperty("restToken");

        String icebergSchema = System.getProperty("schema");
        String icebergTable = System.getProperty("table");

        boolean isLocal = true;

        // spark session for chango iceberg rest catalog.
        SparkSession sparkForIceberg = ImportExportHelper.createSparkSessionForIcebergRESTCatalog(
                s3AccessKey,
                s3SecretKey,
                s3Endpoint,
                s3Region,
                s3Bucket,
                restUrl,
                restWarehouse,
                restToken,
                isLocal
        );

        // make json list.
        String json = StringUtils.fileToString("data/hidden-partition.json", true);
        String[] lines = json.split("\\r?\\n");
        List<String> jsonList = new ArrayList<>();
        for (String line : lines) {
            jsonList.add(line);
        }

        String tableName = "iceberg" + "." + icebergSchema + "." + icebergTable;

        // make dataframe from json list with schema.
        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sparkForIceberg.sparkContext());
        StructType schema = sparkForIceberg.table(tableName).schema();
        JavaRDD<String> javaRDD = jsc.parallelize(jsonList);
        Dataset<String> jsonDs = sparkForIceberg.createDataset(javaRDD.rdd(), Encoders.STRING());
        Dataset<Row> df = sparkForIceberg.read().json(jsonDs);

        // convert data type according to schema.
        for (StructField structField : schema.fields()) {
            String field = structField.name();
            DataType dataType = structField.dataType();
            df = df.withColumn(field, df.col(field).cast(dataType));
        }

        df.printSchema();

        Dataset<Row> newDf = sparkForIceberg.createDataFrame(df.javaRDD(), schema);
        newDf.writeTo(tableName).append();

All the source code can be found here: https://github.com/cloudcheflabs/spark-iceberg-example/blob/master/src/test/java/co/cloudcheflabs/example/spark/component/HiddenPartitionTestRunner.java
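
If you only need a few test rows, you could also insert data directly with SQL through Chango Spark Thrift Server instead of running a Spark job. A minimal sketch using two of the sample records above; the timestamp literals here omit the offset and are interpreted in the Spark session time zone:

-- insert sample rows with sql.
INSERT INTO iceberg.iceberg_db.hidden_partitioning VALUES
(1717045290030, 'any log message 1', TIMESTAMP '2024-05-30 14:01:30.030'),
(1717045290031, 'any log message 2', TIMESTAMP '2024-05-30 14:01:31.030');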

Explore the Iceberg table in Superset with the following query.

-- select.
select * from iceberg.iceberg_db.hidden_partitioning;

You can see the paths of the created data files in the hidden partitioned Iceberg table with the following query.

-- table files.
select * from iceberg.iceberg_db.hidden_partitioning.files;

Take a look at the value of column file_path.

s3a://<chango-bucket>/warehouse-rest/iceberg_db/hidden_partitioning/data/ts_year=2024/ts_month=2024-05/ts_day=2024-05-30/00000-4-1019cfd4-6c3b-418f-88b5-e40b5c314557-00001.parquet

The data files have been created automatically under the hidden partition paths of the Iceberg table.
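
You can also check the partitions directly through the Iceberg partitions metadata table; each row shows a partition value together with its record and file counts.

-- table partitions.
select * from iceberg.iceberg_db.hidden_partitioning.partitions;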

Even though you do not reference any partition columns explicitly in your queries, Iceberg-aware engines such as Spark (including Chango Spark Thrift Server) and Trino will scan only the data files under the matching hidden partition paths when a query filters on ts, as shown below.
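
For example, a time range filter on ts like the following (the dates are just sample values matching the data above) lets the engine prune down to the matching day partitions:

-- query pruned by the hidden day(ts) partition.
select * from iceberg.iceberg_db.hidden_partitioning
where ts >= timestamp '2024-05-30 00:00:00' and ts < timestamp '2024-05-31 00:00:00';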

That’s all.

Visit Chango.

Written by Kidong Lee

Founder of Cloud Chef Labs | Chango | Unified Data Lakehouse Platform | Iceberg centric Data Lakehouses https://www.cloudchef-labs.com/
