HowTo: Migrate data from an Elasticsearch Cluster to another Elasticsearch Cluster with Spark

Kidong Lee
2 min read · May 6, 2019


Sometimes you have to migrate data from your current Elasticsearch cluster to a new one, for instance because the original cluster was designed badly before going to production, with too many shards and so on.

Here, I am going to show you how to migrate data from one Elasticsearch cluster to another with Spark.

First, read the data as an RDD from the source Elasticsearch cluster with Spark, and then write that RDD to the target Elasticsearch cluster after changing the Elasticsearch configuration in the Spark context.

It is simple, and it works much like loading data from HDFS/S3 to S3/HDFS with Spark, which I described in my last blog: https://medium.com/@mykidong/howto-load-s3-files-to-hdfs-using-dynamic-hadoop-configuration-in-the-same-spark-context-dece48a31a2e

Let’s look at some code. The examples below use the elasticsearch-hadoop (elasticsearch-spark) connector, which provides the JavaEsSpark API.

Create a SparkSession with the source Elasticsearch configuration like this (the imports shown here cover all of the snippets in this post):

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.storage.StorageLevel;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

// spark conf.
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("MyAPP");
...
// elasticsearch specific configuration for the source cluster.
sparkConf.set("es.nodes", "es-source-host")
         .set("es.port", "9200")
         .set("es.index.read.missing.as.empty", "true")
         .set("es.nodes.wan.only", "true");
// spark session.
SparkSession spark = SparkSession
        .builder()
        .config(sparkConf)
        .getOrCreate();

Then read an RDD from the source Elasticsearch with this Spark session:

// source index.
String fromIndex = "source-index";
// source index type.
String type = "source-index-type";
// rdd read from the source elasticsearch cluster, persisted to disk
// so the source only has to be read once.
JavaPairRDD<String, Map<String, Object>> rdd =
        JavaEsSpark.esRDD(new JavaSparkContext(spark.sparkContext()), fromIndex + "/" + type)
                .persist(StorageLevel.DISK_ONLY());
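
Optionally, before writing anything, you can sanity-check what was read, for instance by counting the documents (this triggers the read and materializes the disk-persisted RDD):

// optional sanity check: triggers the read and counts the source documents.
long docCount = rdd.count();
System.out.println("documents read from " + fromIndex + ": " + docCount);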

Now it is time to write the loaded RDD to the other cluster.

Let’s point the Elasticsearch configuration at the target cluster like this:

// set the target elasticsearch configurations.
Map<String, String> esSparkConf = new HashMap<>();
esSparkConf.put("es.nodes", "target-elasticsearch-host");
esSparkConf.put("es.port", "9200");

Before you write the RDD to the target Elasticsearch cluster, you should create an Elasticsearch index template to manage indices, aliases, etc. in the target cluster. If you are not familiar with Elasticsearch templates, see this link: https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-templates.html
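
If you prefer to create the template from code rather than with curl, here is a minimal sketch using the Elasticsearch low-level REST client; the template name, index pattern, and settings are placeholders, so adjust them to your target indices:

import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RestClient;

// minimal sketch: put an index template on the target cluster before writing.
// the template name, index pattern and settings below are placeholders.
try (RestClient client = RestClient.builder(
        new HttpHost("target-elasticsearch-host", 9200, "http")).build()) {
    Request putTemplate = new Request("PUT", "/_template/target-index-template");
    putTemplate.setJsonEntity("{"
            + "\"index_patterns\": [\"target-index-*\"],"
            + "\"settings\": {\"number_of_shards\": 3, \"number_of_replicas\": 1}"
            + "}");
    client.performRequest(putTemplate); // throws IOException on failure
}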

Now, write the RDD loaded from the source Elasticsearch to the target cluster:

// target index.
String toIndex = "target-index-name";
// write rdd loaded from the source elasticsearch to the target elasticsearch with the changed elasticsearch configuration.
JavaEsSpark.saveToEsWithMeta(rdd, toIndex + "/" + type, esSparkConf);
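
If you want to verify the migration, you can read a document count back from the target cluster with the same connector. A minimal sketch, reusing esSparkConf (note that the count may lag briefly until the target index refreshes):

// optional verification: count the documents in the target index.
long migrated = JavaEsSpark.esRDD(
        new JavaSparkContext(spark.sparkContext()), toIndex + "/" + type, esSparkConf)
        .count();
System.out.println("documents in target index: " + migrated);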

Take a look at esSparkConf, the last parameter above. With this configuration you override the Elasticsearch settings in the Spark context, so the RDD loaded from the source cluster is written to the target cluster instead. Note also that saveToEsWithMeta takes the key of each pair (the document id returned by esRDD) as document metadata, so the documents keep their original ids in the target index.

Before writing to the target Elasticsearch, you can also manipulate the loaded RDD so that only specific portions of the data are written.
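
For example, a minimal sketch that filters on a hypothetical "status" field before saving:

// hypothetical example: migrate only documents whose "status" field is "active".
JavaPairRDD<String, Map<String, Object>> filtered =
        rdd.filter(doc -> "active".equals(doc._2().get("status")));
// then write the filtered rdd instead of the full one.
JavaEsSpark.saveToEsWithMeta(filtered, toIndex + "/" + type, esSparkConf);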

There are other ways to migrate Elasticsearch data, but with Spark, as shown above, you have more options: you can transform the RDD loaded from the source cluster and write the result to another Elasticsearch cluster in a distributed and efficient way.
