How to load S3 files to HDFS using dynamic Hadoop configuration in the same Spark Context?

// s3 fs configuration.
Properties hadoopProps = PropertiesLoaderUtils.loadProperties(new ClassPathResource("s3-fs-conf.properties"));
// spark session.
SparkSession spark = SparkSessionLoader.getSession(S3toHDFS.class.getName(), SparkSessionLoader.getDefaultConf(S3toHDFS.class.getName()), hadoopProps);
// S3 input path.
String input = ...;
// read s3 files and load them as RDD.
JavaRDD<Tuple2<String, PortableDataStream>> rdd = spark.sparkContext().binaryFiles(input, 2).toJavaRDD();

// convert PortableDataStream to a user event RDD and persist it, so it is not recomputed later.
JavaRDD<UserEvents> userEventsRdd = rdd.mapPartitions(...).persist(StorageLevel.DISK_ONLY());
// HERE IS THE KEY: change the defaultFS value from s3 to hdfs.
// hadoop configuration with the value of hdfs for defaultFS.
Resource resource = new ClassPathResource("hadoop-conf.properties");
hadoopProps = PropertiesLoaderUtils.loadProperties(resource);

Configuration hadoopConfiguration = spark.sparkContext().hadoopConfiguration();

for (String key : hadoopProps.stringPropertyNames()) {
    String value = hadoopProps.getProperty(key);
    hadoopConfiguration.set(key, value);
}

// HDFS output path.
String finalOutput = ...;

// first, delete the output path in hdfs.
FileSystem fs = FileSystem.get(hadoopConfiguration);
fs.delete(new Path(finalOutput), true);

// convert user events to row.
JavaRDD<Row> row = userEventsRdd.mapPartitions(...);

// save as parquet on hdfs (an overwrite-based alternative is sketched after this block).
spark.createDataFrame(row, ...).write().parquet(finalOutput);

spark.sparkContext().stop();
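A side note on the final write: instead of deleting the output path by hand, Spark can overwrite it as part of the write itself via SaveMode.Overwrite. This is only an alternative sketch, not what the job above does; userEventsDf is a placeholder name for the DataFrame built from the row RDD.

// alternative to the manual fs.delete(): let Spark overwrite the output path.
// requires: import org.apache.spark.sql.SaveMode;
// userEventsDf stands for the DataFrame created from the row RDD above.
userEventsDf.write().mode(SaveMode.Overwrite).parquet(finalOutput);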
Let's look at the pieces one by one. The s3-fs-conf.properties file on the classpath points the default filesystem at the S3 bucket and supplies the S3A credentials:
fs.defaultFS=s3a://mybucket
fs.s3a.access.key=any-access-key
fs.s3a.secret.key=any-secret-key
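PropertiesLoaderUtils and ClassPathResource come from Spring. If Spring is not on the classpath, the same file can be read with plain java.util.Properties; a minimal sketch, assuming the file is available as a classpath resource under the same name:

// plain-Java alternative to Spring's PropertiesLoaderUtils
// (requires java.io.InputStream and java.util.Properties; assumes the resource exists).
Properties hadoopProps = new Properties();
try (InputStream in = S3toHDFS.class.getClassLoader().getResourceAsStream("s3-fs-conf.properties")) {
    hadoopProps.load(in);
}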
The SparkSessionLoader helper used above builds the SparkSession and copies the Hadoop properties into its context:

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

import java.util.Properties;

public class SparkSessionLoader {

    /**
     * get spark session.
     *
     * @param appName app name.
     * @return spark session.
     */
    public static SparkSession getSession(String appName)
    {
        SparkConf sparkConf = getDefaultConf(appName);

        SparkSession sparkSession = SparkSession
                .builder()
                .config(sparkConf)
                .getOrCreate();

        return sparkSession;
    }

    /**
     * get spark session.
     *
     * @param appName app name.
     * @param s3AccessKey s3 access key.
     * @param s3SecretKey s3 secret key.
     * @return spark session.
     */
    public static SparkSession getSession(String appName, String s3AccessKey, String s3SecretKey)
    {
        SparkConf sparkConf = getDefaultConf(appName);

        SparkSession sparkSession = SparkSession
                .builder()
                .config(sparkConf)
                .getOrCreate();

        Configuration hadoopConfiguration = sparkSession.sparkContext().hadoopConfiguration();

        // set s3 configuration.
        setS3Configuration(hadoopConfiguration, s3AccessKey, s3SecretKey);

        return sparkSession;
    }

    /**
     * get spark session.
     *
     * @param appName app name.
     * @param sparkConf spark configuration.
     * @return spark session.
     */
    public static SparkSession getSession(String appName, SparkConf sparkConf)
    {
        sparkConf.setAppName(appName);

        SparkSession sparkSession = SparkSession
                .builder()
                .config(sparkConf)
                .getOrCreate();

        return sparkSession;
    }

    /**
     * get spark session.
     *
     * @param appName app name.
     * @param sparkConf spark configuration.
     * @param hadoopProps hadoop configuration properties.
     * @return spark session.
     */
    public static SparkSession getSession(String appName, SparkConf sparkConf, Properties hadoopProps)
    {
        sparkConf.setAppName(appName);

        SparkSession sparkSession = SparkSession
                .builder()
                .config(sparkConf)
                .getOrCreate();

        Configuration hadoopConfiguration = sparkSession.sparkContext().hadoopConfiguration();

        for (String key : hadoopProps.stringPropertyNames()) {
            String value = hadoopProps.getProperty(key);
            hadoopConfiguration.set(key, value);
        }

        return sparkSession;
    }

    /**
     * get spark session.
     *
     * @param appName app name.
     * @param sparkConf spark configuration.
     * @param hadoopProps hadoop configuration properties.
     * @param s3AccessKey s3 access key.
     * @param s3SecretKey s3 secret key.
     * @return spark session.
     */
    public static SparkSession getSession(String appName, SparkConf sparkConf, Properties hadoopProps, String s3AccessKey, String s3SecretKey)
    {
        sparkConf.setAppName(appName);

        SparkSession sparkSession = SparkSession
                .builder()
                .config(sparkConf)
                .getOrCreate();

        Configuration hadoopConfiguration = sparkSession.sparkContext().hadoopConfiguration();

        for (String key : hadoopProps.stringPropertyNames()) {
            String value = hadoopProps.getProperty(key);
            hadoopConfiguration.set(key, value);
        }

        // set s3 configuration.
        setS3Configuration(hadoopConfiguration, s3AccessKey, s3SecretKey);

        return sparkSession;
    }

    /**
     * set s3 configuration to hadoop configuration.
     *
     * @param hadoopConfiguration hadoop configuration.
     * @param s3AccessKey s3 access key.
     * @param s3SecretKey s3 secret key.
     */
    public static void setS3Configuration(Configuration hadoopConfiguration, String s3AccessKey, String s3SecretKey)
    {
        hadoopConfiguration.set("fs.s3a.access.key", s3AccessKey);
        hadoopConfiguration.set("fs.s3a.secret.key", s3SecretKey);
    }

    /**
     * get default spark configuration.
     *
     * @param appName app name.
     * @return spark configuration.
     */
    public static SparkConf getDefaultConf(String appName)
    {
        SparkConf sparkConf = new SparkConf();
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        sparkConf.set("spark.sql.parquet.compression.codec", "snappy");
        sparkConf.setAppName(appName);

        return sparkConf;
    }

    /**
     * get default spark configuration for local mode.
     *
     * @param appName app name.
     * @param threadCount number of local threads.
     * @return spark configuration.
     */
    public static SparkConf getDefaultLocalConf(String appName, int threadCount)
    {
        // spark configuration for local mode.
        SparkConf sparkConf = new SparkConf().setAppName(appName);
        sparkConf.setMaster("local[" + threadCount + "]");
        sparkConf.set("spark.sql.parquet.compression.codec", "snappy");

        return sparkConf;
    }
}
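For a quick local test the same helper composes nicely with the local-mode configuration. A minimal sketch; "S3toHDFS-local" is just an app name, and "my-access-key" / "my-secret-key" are placeholder credentials:

// build a local[4] SparkConf, load the s3 filesystem properties and create the session.
Properties s3Props = PropertiesLoaderUtils.loadProperties(new ClassPathResource("s3-fs-conf.properties"));
SparkConf localConf = SparkSessionLoader.getDefaultLocalConf("S3toHDFS-local", 4);
SparkSession spark = SparkSessionLoader.getSession("S3toHDFS-local", localConf, s3Props, "my-access-key", "my-secret-key");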
hadoop-conf.properties is what actually flips fs.defaultFS from S3 to HDFS. For an HA HDFS cluster named mycluster it looks like this:
fs.defaultFS=hdfs://mycluster
dfs.nameservices=mycluster
dfs.ha.namenodes.mycluster=nn1,nn2
dfs.namenode.rpc-address.mycluster.nn1=hadoop-name1:8020
dfs.namenode.rpc-address.mycluster.nn2=hadoop-name2:8020
dfs.client.failover.proxy.provider.mycluster=org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
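With these properties copied into the live Hadoop configuration, the default filesystem should resolve to the logical HA URI. A quick sanity check (not part of the original job):

// after the hadoop-conf.properties values have been applied to hadoopConfiguration.
FileSystem defaultFs = FileSystem.get(hadoopConfiguration);
System.out.println(hadoopConfiguration.get("fs.defaultFS"));   // hdfs://mycluster
System.out.println(defaultFs.getUri());                        // hdfs://mycluster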
And that is the whole trick: by swapping fs.defaultFS on the shared Hadoop configuration, a single Spark context can read files from S3 and write them to HDFS in one job.
