Java – Using Hadoop to connect to ElasticSearch from Spark does not work

I’m having trouble connecting to an ElasticSearch node running locally from my Java code, which runs as a job submitted to Spark (also running locally). Without Spark, the connection works without problems. Python jobs submitted to Spark also work fine.

I know that for Java I need to connect via port 9300 instead of 9200 (the HTTP port). Still, I always get the same exception, whether I am reading or writing:

16/08/04 16:51:55 ERROR NetworkClient: Node [The server localhost failed to respond with a valid HTTP response] failed (localhost:9300); no other nodes left - aborting...
Exception in thread "main" org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[localhost:9300]] 
    at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:102)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:282)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:266)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:270)
    at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:108)
    at org.elasticsearch.hadoop.rest.RestClient.discoverNodes(RestClient.java:90)
    at org.elasticsearch.hadoop.rest.InitializationUtils.discoverNodesIfNeeded(InitializationUtils.java:61)
    at org.elasticsearch.hadoop.mr.EsInputFormat.getSplits(EsInputFormat.java:434)
    at org.elasticsearch.hadoop.mr.EsInputFormat.getSplits(EsInputFormat.java:415)
    at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:120)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1307)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1302)
    at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1342)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.first(RDD.scala:1341)
    at org.apache.spark.api.java.JavaPairRDD.first(JavaPairRDD.scala:211)
    at com.dd.mediaforce.spark.most_popular.ExecutorMostPopular.main(ExecutorMostPopular.java:564)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

We are also running Spark and ElasticSearch on a cluster of several nodes. The Python code runs fine there, but trying the Java code against that ES setup did not solve the problem either.

The code I’m using to connect from Java:

    SparkConf _sparkConf = new SparkConf()
            .setMaster("local[*]")
            .setAppName("Test");
    JavaSparkContext jsc = new JavaSparkContext(_sparkConf);
    Configuration conf = new Configuration();
    conf.set("cluster.name", "our_clustername");
    conf.set("es.nodes", "localhost");
    conf.setInt("es.port", 9300);
    conf.set("es.resource", index_and_type);
    JavaPairRDD readRdd = jsc.newAPIHadoopRDD(conf, org.elasticsearch.hadoop.mr.EsInputFormat.class, org.apache.hadoop.io.NullWritable.class, org.elasticsearch.hadoop.mr.LinkedMapWritable.class);
    System.out.println(readRdd.first());
    jsc.stop();

As mentioned earlier, the following Java code using TransportClient (no Spark) connects to ES without issues; both writes and reads work fine:

    Client client = TransportClient.builder().settings(settings).build().addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300));

    ImmutableOpenMap<String, IndexMetaData> indices = client.admin().cluster().prepareState().get().getState().getMetaData().getIndices();
    for (ObjectCursor<IndexMetaData> value : indices.values()) {
        log.info("Index: " + value.index + " : " + value.toString());
    }

    GetResponse response = client.prepareGet("index_name", "type_name", "1").get();
    log.info(response.getIndex() + " : " + response.getId() + " : " + response.isExists());

    String field_id = "6";
    IndexRequest indexRequest = new IndexRequest("index_name", "type", "2")
        .source(jsonBuilder()
                .startObject()
                .prettyPrint()
                .field("field_id", field_id)
                .field("another_field", "value")
                .field("integer_field", 100)
                .endObject());

    UpdateRequest updateRequest = new UpdateRequest("index_name", "type_name", article_id)
        .doc(jsonBuilder()
                .startObject()
                .prettyPrint()
                .field("field_id", field_id)
                .field("another_field", "value")
                .field("integer_field", 100)
                .endObject())
        .upsert(indexRequest);

    UpdateResponse responseUpdate = client.update(updateRequest).get();
    log.info(responseUpdate.getIndex() + " : " + responseUpdate.getGetResult() + " : " + responseUpdate.getType());
    client.close();

Any suggestions are welcome, as I have been stuck on this for days without making any progress. I have of course googled the problem and searched StackOverflow, but so far I haven’t found an answer.

For completeness, here is some of the Python code that reads from and writes to ES through Spark without problems:

    conf = SparkConf()
    conf = conf.setAppName('Test')
    sc = SparkContext(conf=conf)

    # Omitting some of the code in creating some_rdd on Spark:

    index_and_type = index_name + '/type_name'
    groovy_script = "if (ctx._source.%s) { ctx._source.%s+=value } else { ctx._source.%s=value }" % (field, field, field)

    es_db_connection_dictionary = {
        "es.nodes": db_hosts,
        "es.port": db_port,
        "es.resource": index_and_type,
        "es.write.operation": "upsert",
        "es.mapping.id": "field_id",
        "es.update.script": groovy_script,
        "es.update.script.params": "value:%s" % integer_field,
        "es.http.timeout": "10s"
    }

    es_input = views_tuple_rdd.map(lambda item: (item[0],
            {
                'field_id': item[0],
                "integer_field": item[1],
                "another_field": client_name,
            }))

    es_input.saveAsNewAPIHadoopFile(
            path='-',
            outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
            keyClass="org.apache.hadoop.io.NullWritable",
            valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
            conf=es_db_connection_dictionary)

Solution

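Typically, if you’re using the Elasticsearch-Spark connector (elasticsearch-hadoop), you should not use port 9300; use the HTTP port instead, which defaults to 9200. The connector talks to Elasticsearch over REST, so it behaves differently from the regular Elasticsearch Java API (such as TransportClient), which uses the transport port 9300. This matches the error in the question: the node on port 9300 "failed to respond with a valid HTTP response".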
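As a rough illustration of the point above, the sketch below builds the connector settings with the HTTP port instead of the transport port. The class name `EsConnectorSettings` and the helper `restSettings` are made up for this example, and `index_name/type_name` is a placeholder; only the `es.nodes`, `es.port` and `es.resource` keys come from the question.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class EsConnectorSettings {

    /** HTTP/REST port of Elasticsearch (assumed to be left at its default). */
    public static final int HTTP_PORT = 9200;

    /** Builds the connector settings for the given nodes and "index/type" resource. */
    public static Map<String, String> restSettings(String nodes, String resource) {
        Map<String, String> settings = new LinkedHashMap<>();
        settings.put("es.nodes", nodes);
        // The connector speaks HTTP, so this must not be the transport port 9300.
        settings.put("es.port", Integer.toString(HTTP_PORT));
        settings.put("es.resource", resource);
        return settings;
    }

    public static void main(String[] args) {
        Map<String, String> settings = restSettings("localhost", "index_name/type_name");
        // In the Spark job these would be copied into the Hadoop Configuration,
        // for example with: settings.forEach(conf::set);
        settings.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

In the Spark job from the question, this would amount to changing `conf.setInt("es.port", 9300)` to `conf.setInt("es.port", 9200)`, assuming Elasticsearch listens for HTTP on its default port.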

It also seems that you are using a version of the connector that is not compatible with your version of Elasticsearch. This is a common mistake, because both are mostly at some 2.x release, yet their version numbers do not line up.

I believe this will no longer be an issue with Elasticsearch 5.x, since Elastic has aligned the version numbers of all its other products with it.
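One way to find out which Elasticsearch version the connector has to match is to ask the node itself: a GET on the root path of the HTTP port returns a small JSON document that includes `version.number`. The following is a minimal sketch using only the JDK; the class and method names are hypothetical, the host and port are assumed to be `localhost:9200`, and the regex is a shortcut rather than a real JSON parser.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.util.Optional;
import java.util.Scanner;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class EsVersionCheck {

    // Naive match for the "number" field inside the "version" object.
    private static final Pattern VERSION_NUMBER =
            Pattern.compile("\"number\"\\s*:\\s*\"([^\"]+)\"");

    /** Extracts version.number from the JSON returned by the root endpoint. */
    public static Optional<String> extractVersion(String rootJson) {
        Matcher matcher = VERSION_NUMBER.matcher(rootJson);
        return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
    }

    /** Fetches the body of GET http://host:port/ as a string. */
    public static String fetchRoot(String host, int port) throws IOException {
        HttpURLConnection connection = (HttpURLConnection)
                URI.create("http://" + host + ":" + port + "/").toURL().openConnection();
        connection.setConnectTimeout(5000);
        connection.setReadTimeout(5000);
        try (InputStream in = connection.getInputStream();
             Scanner scanner = new Scanner(in, "UTF-8").useDelimiter("\\A")) {
            // "\\A" makes the scanner return the whole stream as one token.
            return scanner.hasNext() ? scanner.next() : "";
        } finally {
            connection.disconnect();
        }
    }

    public static void main(String[] args) {
        try {
            String body = fetchRoot("localhost", 9200);
            System.out.println("Elasticsearch version: "
                    + extractVersion(body).orElse("unknown"));
        } catch (IOException e) {
            System.out.println("Could not reach Elasticsearch over HTTP: " + e.getMessage());
        }
    }
}
```

The reported version can then be compared against the compatibility notes of the elasticsearch-hadoop release that is on the job's classpath.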
