Hidden Partitioned Iceberg Table in Chango
There are a few ways to create hidden partitioned Iceberg tables in Chango. You can create them with Trino, but Trino does not yet support all of Iceberg's features. Chango provides Chango Spark SQL Runner and Chango Spark Thrift Server to execute Spark SQL, including Iceberg-specific queries. Because they expose the full feature set of Iceberg, they are well suited to Iceberg-specific queries such as DDL and Iceberg maintenance queries.
This article shows how to create a hidden partitioned Iceberg table using Chango Spark Thrift Server.
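For example, Iceberg maintenance procedures can be run as plain Spark SQL. A minimal sketch, assuming the iceberg catalog and the iceberg_db.hidden_partitioning table created later in this article (the timestamp is illustrative):
-- compact small data files.
CALL iceberg.system.rewrite_data_files(table => 'iceberg_db.hidden_partitioning');
-- expire old snapshots.
CALL iceberg.system.expire_snapshots(table => 'iceberg_db.hidden_partitioning', older_than => TIMESTAMP '2024-05-01 00:00:00.000');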
Connect Chango Spark Thrift Server using Superset
After installing Chango Spark Thrift Server in Chango, you can connect to it with Superset. See Connect Chango Spark Thrift Server using Superset.
Hidden Partitioned Iceberg Table
You can create a hidden partitioned Iceberg table like this.
-- hidden partitioned table.
CREATE TABLE IF NOT EXISTS iceberg.iceberg_db.hidden_partitioning (
    id LONG,
    log STRING,
    ts TIMESTAMP_LTZ
)
USING iceberg
;

-- add hidden partitions.
ALTER TABLE iceberg.iceberg_db.hidden_partitioning ADD PARTITION FIELD day(ts);
The column ts must be a timestamp type. After creating the Iceberg table, you add partition fields using one of the Iceberg partition transforms year(), month(), day(), or hour().
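Partition fields can also be dropped or replaced later without rewriting the table. A minimal sketch, assuming the same table; which transform to use depends on your query patterns, and dropping or replacing day(ts) here is purely illustrative:
-- partition by hour instead of day.
ALTER TABLE iceberg.iceberg_db.hidden_partitioning ADD PARTITION FIELD hour(ts);
-- drop an existing partition field.
ALTER TABLE iceberg.iceberg_db.hidden_partitioning DROP PARTITION FIELD day(ts);
-- replace a partition field in one statement.
ALTER TABLE iceberg.iceberg_db.hidden_partitioning REPLACE PARTITION FIELD day(ts) WITH month(ts);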
Create Hidden Partitioned Iceberg Table using Superset
Run the Iceberg DDL queries above in Superset. Then you can get the table description with the following query.
-- describe table.
describe iceberg.iceberg_db.hidden_partitioning;
The partition fields are shown in the query output.
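The DESCRIBE output should include a partitioning section roughly like this (abridged and illustrative; the exact column types and formatting depend on the Spark version):
id              bigint
log             string
ts              timestamp
# Partitioning
Part 0          days(ts)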
Insert Data to Hidden Partitioned Iceberg Table
Let's insert the JSON data below into the hidden partitioned Iceberg table with Spark. The data is saved locally as data/hidden-partition.json, which the Spark job below reads.
{"id": 1717045290030, "log": "any log message 1", "ts": "2024-05-30T14:01:30.030+09:00"}
{"id": 1717045290031, "log": "any log message 2", "ts": "2024-05-30T14:01:31.030+09:00"}
{"id": 1717045290032, "log": "any log message 3", "ts": "2024-05-30T14:01:32.030+09:00"}
{"id": 1717045290033, "log": "any log message 4", "ts": "2024-05-30T14:01:33.030+09:00"}
{"id": 1717045290034, "log": "any log message 5", "ts": "2024-05-30T14:01:34.030+09:00"}
{"id": 1717045290035, "log": "any log message 6", "ts": "2024-05-30T14:01:35.030+09:00"}
{"id": 1717045290036, "log": "any log message 7", "ts": "2024-05-30T14:01:36.030+09:00"}
{"id": 1717045290037, "log": "any log message 8", "ts": "2024-05-30T14:01:37.030+09:00"}
{"id": 1717045290038, "log": "any log message 9", "ts": "2024-05-30T14:01:38.030+09:00"}
{"id": 1717045290039, "log": "any log message 10", "ts": "2024-05-30T14:01:39.030+09:00"}
You can get a Spark session that connects to Chango REST Catalog and registers the iceberg catalog like this.
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

public static SparkSession createSparkSessionForIcebergRESTCatalog(
        String s3AccessKey,
        String s3SecretKey,
        String s3Endpoint,
        String s3Region,
        String bucket,
        String restUrl,
        String restWarehouse,
        String restToken,
        boolean isLocal
) {
    SparkConf sparkConf = new SparkConf().setAppName("Run Spark using Iceberg REST Catalog");
    if (isLocal) {
        sparkConf.setMaster("local[2]");
    }

    // set aws system properties.
    System.setProperty("aws.region", (s3Region != null) ? s3Region : "us-east-1");
    System.setProperty("aws.accessKeyId", s3AccessKey);
    System.setProperty("aws.secretAccessKey", s3SecretKey);

    // iceberg rest catalog.
    sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions");
    sparkConf.set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog");
    sparkConf.set("spark.sql.catalog.iceberg.catalog-impl", "org.apache.iceberg.rest.RESTCatalog");
    sparkConf.set("spark.sql.catalog.iceberg.io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
    sparkConf.set("spark.sql.catalog.iceberg.uri", restUrl);
    sparkConf.set("spark.sql.catalog.iceberg.warehouse", restWarehouse);
    sparkConf.set("spark.sql.catalog.iceberg.token", restToken);
    sparkConf.set("spark.sql.catalog.iceberg.s3.endpoint", s3Endpoint);
    sparkConf.set("spark.sql.catalog.iceberg.s3.path-style-access", "true");
    sparkConf.set("spark.sql.defaultCatalog", "iceberg");

    SparkSession spark = SparkSession
            .builder()
            .config(sparkConf)
            .getOrCreate();

    // s3a settings for the bucket backing the warehouse.
    Configuration hadoopConfiguration = spark.sparkContext().hadoopConfiguration();
    hadoopConfiguration.set("fs.s3a.bucket." + bucket + ".endpoint", s3Endpoint);
    if (s3Region != null) {
        hadoopConfiguration.set("fs.s3a.bucket." + bucket + ".endpoint.region", s3Region);
    }
    hadoopConfiguration.set("fs.s3a.bucket." + bucket + ".access.key", s3AccessKey);
    hadoopConfiguration.set("fs.s3a.bucket." + bucket + ".secret.key", s3SecretKey);
    hadoopConfiguration.set("fs.s3a.path.style.access", "true");
    hadoopConfiguration.set("fs.s3a.change.detection.mode", "warn");
    hadoopConfiguration.set("fs.s3a.change.detection.version.required", "false");
    hadoopConfiguration.set("fs.s3a.multiobjectdelete.enable", "true");
    hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
    hadoopConfiguration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");

    return spark;
}
Run the following Spark job to insert the local JSON data into the hidden partitioned Iceberg table in Chango.
// chango s3 properties.
String s3AccessKey = System.getProperty("s3AccessKey");
String s3SecretKey = System.getProperty("s3SecretKey");
String s3Endpoint = System.getProperty("s3Endpoint");
String s3Region = System.getProperty("s3Region");
String s3Bucket = System.getProperty("s3Bucket");

// iceberg rest catalog.
String restUrl = System.getProperty("restUrl");
String restWarehouse = System.getProperty("restWarehouse");
String restToken = System.getProperty("restToken");

String icebergSchema = System.getProperty("schema");
String icebergTable = System.getProperty("table");
boolean isLocal = true;

// spark session for chango iceberg rest catalog.
SparkSession sparkForIceberg = ImportExportHelper.createSparkSessionForIcebergRESTCatalog(
        s3AccessKey,
        s3SecretKey,
        s3Endpoint,
        s3Region,
        s3Bucket,
        restUrl,
        restWarehouse,
        restToken,
        isLocal
);

// make json list.
String json = StringUtils.fileToString("data/hidden-partition.json", true);
String[] lines = json.split("\\r?\\n");
List<String> jsonList = new ArrayList<>(Arrays.asList(lines));

String tableName = "iceberg" + "." + icebergSchema + "." + icebergTable;

// make dataframe from json list with the table schema.
JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sparkForIceberg.sparkContext());
StructType schema = sparkForIceberg.table(tableName).schema();
JavaRDD<String> javaRDD = jsc.parallelize(jsonList);
Dataset<String> jsonDs = sparkForIceberg.createDataset(javaRDD.rdd(), Encoders.STRING());
Dataset<Row> df = sparkForIceberg.read().json(jsonDs);

// cast each column to the data type defined in the table schema.
for (StructField structField : schema.fields()) {
    String field = structField.name();
    DataType dataType = structField.dataType();
    df = df.withColumn(field, df.col(field).cast(dataType));
}
df.printSchema();

// append rows to the iceberg table.
Dataset<Row> newDf = sparkForIceberg.createDataFrame(df.javaRDD(), schema);
newDf.writeTo(tableName).append();
All of the source code can be found here: https://github.com/cloudcheflabs/spark-iceberg-example/blob/master/src/test/java/co/cloudcheflabs/example/spark/component/HiddenPartitionTestRunner.java
Explore the Iceberg table in Superset with the following query.
-- select.
select * from iceberg.iceberg_db.hidden_partitioning;
You can see the paths of the created data files in the hidden partitioned Iceberg table with the following query.
-- table files.
select * from iceberg.iceberg_db.hidden_partitioning.files;
Take a look at the value of the file_path column.
s3a://<chango-bucket>/warehouse-rest/iceberg_db/hidden_partitioning/data/ts_year=2024/ts_month=2024-05/ts_day=2024-05-30/00000-4-1019cfd4-6c3b-418f-88b5-e40b5c314557-00001.parquet
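You can also inspect the partitions themselves through Iceberg's partitions metadata table. A minimal sketch, assuming the same table:
-- table partitions.
select * from iceberg.iceberg_db.hidden_partitioning.partitions;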
The data files have been created automatically under the hidden partitioned paths of the Iceberg table.
Even though you have not defined partition columns explicitly in the table, Iceberg-aware engines such as Spark (including Chango Spark Thrift Server) and Trino will scan only the data files in the matching partition paths when a query that filters on ts, such as select * from iceberg.iceberg_db.hidden_partitioning where ts ..., is executed.
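For example, a query like the following should prune the scan to a single day partition (the timestamp range is an illustrative assumption matching the sample data above):
-- only the 2024-05-30 partition is scanned.
select * from iceberg.iceberg_db.hidden_partitioning
where ts >= TIMESTAMP '2024-05-30 00:00:00' and ts < TIMESTAMP '2024-05-31 00:00:00';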
That’s all.
Visit Chango.