HowTo: Load S3 files to HDFS using dynamic hadoop configuration in the same Spark Context

Kidong Lee
4 min read · Mar 23, 2019


Loading S3 files into HDFS with Spark is a little bit tricky. One common approach is to read the files from S3 with the S3 API, parallelize them into an RDD, and then save that RDD as Parquet files on HDFS. But that is not an efficient way to load a lot of large S3 files.

I wanted to load S3 files into HDFS in the same Spark context, without using the S3 API at all.

Here, I will show you how to do that.

First, you can read files from S3 by setting the Hadoop configuration property fs.defaultFS to s3a://mybucket.

After loading the S3 files into an RDD, you can change fs.defaultFS to hdfs://mycluster from the same Spark context. Let’s call this dynamic Hadoop configuration in the same Spark context.

Finally, you can save the RDD, for instance as Parquet files, on HDFS with the same Spark context.

Let’s look at the Spark code:

// s3 fs configuration.
Properties hadoopProps = PropertiesLoaderUtils.loadProperties(new ClassPathResource("s3-fs-conf.properties"));
// spark session.
SparkSession spark = SparkSessionLoader.getSession(S3toHDFS.class.getName(), SparkSessionLoader.getDefaultConf(S3toHDFS.class.getName()), hadoopProps);
// S3 input path.
String input = ...;
// read s3 files and load them as RDD.
JavaRDD<Tuple2<String, PortableDataStream>> rdd = spark.sparkContext().binaryFiles(input, 2).toJavaRDD();

// convert PortableDataStream to user event rdd.
JavaRDD<UserEvents> userEventsRdd = rdd.mapPartitions(...).persist(StorageLevel.DISK_ONLY());

// HERE IS THE KEY: change the defaultFS value from s3a to hdfs.
// hadoop configuration with the value of hdfs for defaultFS.
Resource resource = new ClassPathResource("hadoop-conf.properties");
Properties hdfsProps = PropertiesLoaderUtils.loadProperties(resource);

Configuration hadoopConfiguration = spark.sparkContext().hadoopConfiguration();

for (String key : hdfsProps.stringPropertyNames()) {
    String value = hdfsProps.getProperty(key);
    hadoopConfiguration.set(key, value);
}

// HDFS output path.
String finalOutput = ...;
// first, delete the output path in hdfs.
FileSystem fs = FileSystem.get(hadoopConfiguration);
fs.delete(new Path(finalOutput), true);

// convert user events to row.
JavaRDD<Row> row = userEventsRdd.mapPartitions(...);

// save as parquet on hdfs.
spark.createDataFrame(row, ...).write().parquet(finalOutput);

spark.sparkContext().stop();

Let’s look at the part of the code above that creates the Spark session to access S3:

// s3 fs configuration.
Properties hadoopProps = PropertiesLoaderUtils.loadProperties(new ClassPathResource("s3-fs-conf.properties"));
// spark session.
SparkSession spark = SparkSessionLoader.getSession(S3toHDFS.class.getName(), SparkSessionLoader.getDefaultConf(S3toHDFS.class.getName()), hadoopProps);

The Hadoop configuration is set on the Spark session from s3-fs-conf.properties, which looks like this:

fs.defaultFS=s3a://mybucket
fs.s3a.access.key=any-access-key
fs.s3a.secret.key=any-secret-key
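
As a side note, the same properties can also be passed through SparkConf using the spark.hadoop. prefix, which Spark copies into the Hadoop configuration when the session is created. Here is a small sketch of that alternative; the bucket name and keys are placeholders, just like in the properties file above:

// alternative: pass the hadoop properties on SparkConf with the "spark.hadoop." prefix.
SparkConf sparkConf = new SparkConf()
        .setAppName("S3toHDFS")
        .set("spark.hadoop.fs.defaultFS", "s3a://mybucket")
        .set("spark.hadoop.fs.s3a.access.key", "any-access-key")
        .set("spark.hadoop.fs.s3a.secret.key", "any-secret-key");

SparkSession spark = SparkSession.builder().config(sparkConf).getOrCreate();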

Now the SparkSession instance is created using the SparkSessionLoader class, which looks like this:


import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

import java.util.Properties;

public class SparkSessionLoader {

    /**
     * get spark session.
     *
     * @param appName app name.
     * @return
     */
    public static SparkSession getSession(String appName)
    {
        SparkConf sparkConf = getDefaultConf(appName);

        SparkSession sparkSession = SparkSession
                .builder()
                .config(sparkConf)
                .getOrCreate();

        return sparkSession;
    }

    /**
     * get spark session.
     *
     * @param appName app name.
     * @param s3AccessKey s3 access key.
     * @param s3SecretKey s3 secret key.
     * @return
     */
    public static SparkSession getSession(String appName, String s3AccessKey, String s3SecretKey)
    {
        SparkConf sparkConf = getDefaultConf(appName);

        SparkSession sparkSession = SparkSession
                .builder()
                .config(sparkConf)
                .getOrCreate();

        Configuration hadoopConfiguration = sparkSession.sparkContext().hadoopConfiguration();

        // set s3 configuration.
        setS3Configuration(hadoopConfiguration, s3AccessKey, s3SecretKey);

        return sparkSession;
    }

    /**
     * get spark session.
     *
     * @param appName app name.
     * @param sparkConf spark configuration.
     * @return
     */
    public static SparkSession getSession(String appName, SparkConf sparkConf)
    {
        sparkConf.setAppName(appName);

        SparkSession sparkSession = SparkSession
                .builder()
                .config(sparkConf)
                .getOrCreate();

        return sparkSession;
    }

    /**
     * get spark session.
     *
     * @param appName app name.
     * @param sparkConf spark configuration.
     * @param hadoopProps hadoop configuration properties.
     * @return
     */
    public static SparkSession getSession(String appName, SparkConf sparkConf, Properties hadoopProps)
    {
        sparkConf.setAppName(appName);

        SparkSession sparkSession = SparkSession
                .builder()
                .config(sparkConf)
                .getOrCreate();

        Configuration hadoopConfiguration = sparkSession.sparkContext().hadoopConfiguration();

        for (String key : hadoopProps.stringPropertyNames()) {
            String value = hadoopProps.getProperty(key);
            hadoopConfiguration.set(key, value);
        }

        return sparkSession;
    }

    /**
     * get spark session.
     *
     * @param appName app name.
     * @param sparkConf spark conf.
     * @param hadoopProps hadoop configuration.
     * @param s3AccessKey s3 access key.
     * @param s3SecretKey s3 secret key.
     * @return
     */
    public static SparkSession getSession(String appName, SparkConf sparkConf, Properties hadoopProps, String s3AccessKey, String s3SecretKey)
    {
        sparkConf.setAppName(appName);

        SparkSession sparkSession = SparkSession
                .builder()
                .config(sparkConf)
                .getOrCreate();

        Configuration hadoopConfiguration = sparkSession.sparkContext().hadoopConfiguration();

        for (String key : hadoopProps.stringPropertyNames()) {
            String value = hadoopProps.getProperty(key);
            hadoopConfiguration.set(key, value);
        }

        // set s3 configuration.
        setS3Configuration(hadoopConfiguration, s3AccessKey, s3SecretKey);

        return sparkSession;
    }

    /**
     * set s3 configuration to hadoop configuration.
     *
     * @param hadoopConfiguration hadoop configuration.
     * @param s3AccessKey s3 access key.
     * @param s3SecretKey s3 secret key.
     */
    public static void setS3Configuration(Configuration hadoopConfiguration, String s3AccessKey, String s3SecretKey)
    {
        hadoopConfiguration.set("fs.s3a.access.key", s3AccessKey);
        hadoopConfiguration.set("fs.s3a.secret.key", s3SecretKey);
    }

    /**
     * get default spark configuration.
     *
     * @param appName app name.
     * @return
     */
    public static SparkConf getDefaultConf(String appName)
    {
        SparkConf sparkConf = new SparkConf();
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        sparkConf.set("spark.sql.parquet.compression.codec", "snappy");
        sparkConf.setAppName(appName);

        return sparkConf;
    }

    public static SparkConf getDefaultLocalConf(String appName, int threadCount)
    {
        // spark configuration for local mode.
        SparkConf sparkConf = new SparkConf().setAppName(appName);
        sparkConf.setMaster("local[" + threadCount + "]");
        sparkConf.set("spark.sql.parquet.compression.codec", "snappy");

        return sparkConf;
    }
}
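
For local testing, the getDefaultLocalConf helper can be combined with the same property files. A hypothetical example (the app name and thread count are arbitrary):

// run spark locally with 4 threads, reusing the hadoop properties loaded from s3-fs-conf.properties.
SparkConf localConf = SparkSessionLoader.getDefaultLocalConf("S3toHDFS-local", 4);
SparkSession spark = SparkSessionLoader.getSession("S3toHDFS-local", localConf, hadoopProps);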

After reading the S3 files and loading them as an RDD, the defaultFS in the Hadoop configuration of the same Spark context has to be changed like this:

// hadoop configuration with the value of hdfs for defaultFS.
Resource resource = new ClassPathResource("hadoop-conf.properties");
Properties hdfsProps = PropertiesLoaderUtils.loadProperties(resource);

Configuration hadoopConfiguration = spark.sparkContext().hadoopConfiguration();

for (String key : hdfsProps.stringPropertyNames()) {
    String value = hdfsProps.getProperty(key);
    hadoopConfiguration.set(key, value);
}
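
One quick way to check that the switch has taken effect (a check I am adding here, not part of the original job) is to ask Hadoop for the default FileSystem and print its URI:

// org.apache.hadoop.fs.FileSystem resolves the default filesystem from the current configuration.
FileSystem fs = FileSystem.get(hadoopConfiguration);
// prints hdfs://mycluster after the switch, instead of s3a://mybucket.
System.out.println("default filesystem: " + fs.getUri());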

The Hadoop configuration with hdfs as the defaultFS, hadoop-conf.properties, looks like this:

fs.defaultFS=hdfs://mycluster
dfs.nameservices=mycluster
dfs.ha.namenodes.mycluster=nn1,nn2
dfs.namenode.rpc-address.mycluster.nn1=hadoop-name1:8020
dfs.namenode.rpc-address.mycluster.nn2=hadoop-name2:8020
dfs.client.failover.proxy.provider.mycluster=org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider

This is the Hadoop NameNode HA configuration needed to access HDFS.
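
If the cluster’s client configuration files are already available on the machines running the job, those HA settings could instead be pulled in directly; a sketch, assuming the common /etc/hadoop/conf location (adjust the path to your installation):

// load the cluster's own core-site.xml and hdfs-site.xml into the hadoop configuration.
hadoopConfiguration.addResource(new Path("/etc/hadoop/conf/core-site.xml"));
hadoopConfiguration.addResource(new Path("/etc/hadoop/conf/hdfs-site.xml"));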

Finally, save the RDD loaded from S3 as Parquet files on HDFS:

// HDFS output path.
String finalOutput = ...;
// first, delete the output path in hdfs.
FileSystem fs = FileSystem.get(hadoopConfiguration);
fs.delete(new Path(finalOutput), true);

// convert user events to row.
JavaRDD<Row> row = userEventsRdd.mapPartitions(...);

// save as parquet on hdfs.
spark.createDataFrame(row, ...).write().parquet(finalOutput);

Take a look at the RDD userEventsRdd built from the S3 data. After the defaultFS Hadoop configuration is changed to hdfs, this userEventsRdd is saved as Parquet files on HDFS.

With this simple dynamic change of the defaultFS Hadoop configuration, you can load S3 data and save it to HDFS in the same Spark context.

If you have files on HDFS, for instance Parquet files, and want to back them up to S3, you can apply the same dynamic Hadoop configuration in the opposite direction, as sketched below.
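
Here is a minimal sketch of that reverse direction, reusing SparkSessionLoader, the two property files, and the imports from the walkthrough above; the HDFS and S3 paths are placeholders:

// start from the hdfs configuration and read the parquet files to back up.
Properties hdfsProps = PropertiesLoaderUtils.loadProperties(new ClassPathResource("hadoop-conf.properties"));
SparkSession spark = SparkSessionLoader.getSession("HDFStoS3", SparkSessionLoader.getDefaultConf("HDFStoS3"), hdfsProps);

Dataset<Row> df = spark.read().parquet("/data/user-events").persist(StorageLevel.DISK_ONLY());
df.count(); // materialize the cached copy before switching filesystems.

// switch defaultFS and the s3a credentials in the same spark context.
Properties s3Props = PropertiesLoaderUtils.loadProperties(new ClassPathResource("s3-fs-conf.properties"));
Configuration hadoopConfiguration = spark.sparkContext().hadoopConfiguration();

for (String key : s3Props.stringPropertyNames()) {
    hadoopConfiguration.set(key, s3Props.getProperty(key));
}

// the output path now resolves against s3a://mybucket.
df.write().parquet("/backup/user-events");

spark.sparkContext().stop();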

Written by Kidong Lee

Founder of Cloud Chef Labs | Chango | Unified Data Lakehouse Platform | Iceberg centric Data Lakehouses https://www.cloudchef-labs.com/
