elastic / elasticsearch-hadoop

:elephant: Elasticsearch real-time search and analytics natively integrated with Hadoop

Home Page: https://www.elastic.co/products/hadoop

License: Apache License 2.0

Java 72.30% Scala 20.81% Groovy 6.56% Shell 0.25% ASL 0.01% Python 0.07%

elasticsearch-hadoop's Introduction

Elasticsearch Hadoop

Elasticsearch real-time search and analytics natively integrated with Hadoop. Supports Map/Reduce, Apache Hive, and Apache Spark.

See project page and documentation for detailed information.

Requirements

An Elasticsearch cluster (1.x or higher; 2.x highly recommended) accessible through REST. That's it! Significant effort has been invested in creating a small, dependency-free, self-contained jar: simply make it available to your job classpath and you're set. For library-specific requirements, see the dedicated chapter in the documentation.

ES-Hadoop 6.x and higher are compatible with Elasticsearch 1.X, 2.X, 5.X, and 6.X

ES-Hadoop 5.x and higher are compatible with Elasticsearch 1.X, 2.X and 5.X

ES-Hadoop 2.2.x and higher are compatible with Elasticsearch 1.X and 2.X

ES-Hadoop 2.0.x and 2.1.x are compatible with Elasticsearch 1.X only

Installation

Stable Release (currently 8.4.0)

Available through any Maven-compatible tool:

<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch-hadoop</artifactId>
  <version>8.4.0</version>
</dependency>

or as a stand-alone ZIP.

Development Snapshot

Grab the latest nightly build from the snapshot repository, again through Maven:

<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch-hadoop</artifactId>
  <version>8.15.0-SNAPSHOT</version>
</dependency>
<repositories>
  <repository>
    <id>sonatype-oss</id>
    <url>http://oss.sonatype.org/content/repositories/snapshots</url>
    <snapshots><enabled>true</enabled></snapshots>
  </repository>
</repositories>

or build the project yourself.

We do build and test the code on each commit.

Supported Hadoop Versions

Running against Hadoop 1.x is deprecated in 5.5 and will no longer be tested against in 6.0. ES-Hadoop is developed for and tested against Hadoop 2.x and YARN. More information in this section.

Feedback / Q&A

We're interested in your feedback! You can find us on the User mailing list - please append [Hadoop] to the post subject so it can be easily filtered. For more details, see the community page.

Online Documentation

The latest reference documentation is available online on the project home page. The sections below provide basic usage instructions at a glance.

Usage

Configuration Properties

All configuration properties start with the es prefix. Note that the es.internal namespace is reserved for the library's internal use and should not be set by the user at any point. The properties are read mainly from the Hadoop configuration, but the user can specify (some of) them directly, depending on the library used.

Required

es.resource=<ES resource location, relative to the host/port specified above>

Essential

es.query=<uri or query dsl query>              # defaults to {"query":{"match_all":{}}}
es.nodes=<ES host address>                     # defaults to localhost
es.port=<ES REST port>                         # defaults to 9200
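
For instance, with the Map/Reduce integration these properties can be set directly on the Hadoop Configuration. A minimal sketch (the resource and query values are illustrative):

import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
conf.set("es.nodes", "localhost");        // ES node(s) to connect to
conf.set("es.port", "9200");              // REST port
conf.set("es.resource", "radio/artists"); // target index/type
conf.set("es.query", "?q=me*");           // optional URI or Query DSL query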

The full list is available here

For basic, low-level or performance-sensitive environments, ES-Hadoop provides dedicated InputFormat and OutputFormat implementations that read and write data to Elasticsearch. To use them, add the es-hadoop jar to your job classpath (either by bundling the library along - it's ~300kB and has no dependencies - by using the DistributedCache, or by provisioning the cluster manually). See the documentation for more information.
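
As an illustration of the DistributedCache route with the new (mapreduce) API, a jar that already sits on HDFS can be shipped to the task nodes at job setup time; the HDFS path below is a hypothetical location:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;

Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "es-hadoop-example");
// distributes the jar to the tasks and adds it to their classpath
job.addFileToClassPath(new Path("/libs/elasticsearch-hadoop-8.4.0.jar"));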

Note that es-hadoop supports both the so-called 'old' and the 'new' API through its EsInputFormat and EsOutputFormat classes.

'Old' (org.apache.hadoop.mapred) API

Reading

To read data from ES, configure the EsInputFormat on your job configuration along with the relevant properties:

JobConf conf = new JobConf();
conf.setInputFormat(EsInputFormat.class);
conf.set("es.resource", "radio/artists");
conf.set("es.query", "?q=me*");             // replace this with the relevant query
...
JobClient.runJob(conf);
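
A sketch of a matching mapper under the default Writable-based serialization, where the key carries the document id and the value arrives as a map-like Writable of field names to values; the class name and the 'name' field are illustrative assumptions:

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class ArtistNameMapper extends MapReduceBase implements Mapper<Text, Writable, Text, Text> {
    @Override
    @SuppressWarnings("unchecked")
    public void map(Text docId, Writable doc, OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        // the document is a map-like Writable of (field name, field value) pairs
        Map<Writable, Writable> fields = (Map<Writable, Writable>) doc;
        Writable name = fields.get(new Text("name"));
        output.collect(docId, new Text(String.valueOf(name)));
    }
}

Register it on the JobConf above with conf.setMapperClass(ArtistNameMapper.class).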

Writing

The same configuration template can be used for writing, but using EsOutputFormat:

JobConf conf = new JobConf();
conf.setOutputFormat(EsOutputFormat.class);
conf.set("es.resource", "radio/artists"); // index or indices used for storing data
...
JobClient.runJob(conf);
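
For writing, the value handed to EsOutputFormat is the document itself; below is a minimal map-only sketch that builds one MapWritable per input line. The input types, field name and key choice are assumptions for illustration (the key is not indexed):

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class ArtistIndexerMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, NullWritable, MapWritable> {
    @Override
    public void map(LongWritable offset, Text line, OutputCollector<NullWritable, MapWritable> output,
            Reporter reporter) throws IOException {
        // one document per input line, with a single 'name' field
        MapWritable doc = new MapWritable();
        doc.put(new Text("name"), new Text(line.toString().trim()));
        output.collect(NullWritable.get(), doc);
    }
}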

'New' (org.apache.hadoop.mapreduce) API

Reading

Configuration conf = new Configuration();
conf.set("es.resource", "radio/artists");
conf.set("es.query", "?q=me*");             // replace this with the relevant query
Job job = new Job(conf);
job.setInputFormatClass(EsInputFormat.class);
...
job.waitForCompletion(true);

Writing

Configuration conf = new Configuration();
conf.set("es.resource", "radio/artists"); // index or indices used for storing data
Job job = new Job(conf);
job.setOutputFormatClass(EsOutputFormat.class);
...
job.waitForCompletion(true);

Apache Hive

ES-Hadoop provides a Hive storage handler for Elasticsearch, meaning one can define an external table on top of ES.

Add es-hadoop-<version>.jar to hive.aux.jars.path or register it manually in your Hive script (recommended):

ADD JAR /path_to_jar/es-hadoop-<version>.jar;

Reading

To read data from ES, define a table backed by the desired index:

CREATE EXTERNAL TABLE artists (
    id      BIGINT,
    name    STRING,
    links   STRUCT<url:STRING, picture:STRING>)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.resource' = 'radio/artists', 'es.query' = '?q=me*');

The fields defined in the table are mapped to the JSON document when communicating with Elasticsearch. Notice the use of TBLPROPERTIES to define the location (es.resource) and the query used for reading from this table.

Once defined, the table can be used just like any other:

SELECT * FROM artists;

Writing

To write data, a similar definition is used but with a different es.resource:

CREATE EXTERNAL TABLE artists (
    id      BIGINT,
    name    STRING,
    links   STRUCT<url:STRING, picture:STRING>)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.resource' = 'radio/artists');

Any data passed to the table is then passed down to Elasticsearch; for example, considering a table source (aliased below as s) mapped to a TSV/CSV file, one can index it into Elasticsearch like this:

INSERT OVERWRITE TABLE artists
    SELECT NULL, s.name, named_struct('url', s.url, 'picture', s.picture) FROM source s;

As one can note, reading and writing are currently treated separately, but we're working on unifying the two and automatically translating HiveQL to Elasticsearch queries.

Apache Spark

ES-Hadoop provides native (Java and Scala) integration with Spark: a dedicated RDD for reading and, for writing, methods that work on any RDD. Spark SQL is also supported.

Scala

Reading

To read data from ES, create a dedicated RDD and specify the query as an argument:

import org.elasticsearch.spark._

..
val conf = ...
val sc = new SparkContext(conf)
sc.esRDD("radio/artists", "?q=me*")

Spark SQL

import org.elasticsearch.spark.sql._

// DataFrame schema automatically inferred
val df = sqlContext.read.format("es").load("buckethead/albums")

// operations get pushed down and translated at runtime to Elasticsearch QueryDSL
val playlist = df.filter(df("category").equalTo("pikes").and(df("year").geq(2016)))

Writing

Import the org.elasticsearch.spark._ package to gain saveToEs methods on your RDDs:

import org.elasticsearch.spark._

val conf = ...
val sc = new SparkContext(conf)

val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("OTP" -> "Otopeni", "SFO" -> "San Fran")

sc.makeRDD(Seq(numbers, airports)).saveToEs("spark/docs")

Spark SQL

import org.elasticsearch.spark.sql._

val df = sqlContext.read.json("examples/people.json")
df.saveToEs("spark/people")

Java

In a Java environment, use the org.elasticsearch.spark.rdd.api.java package, in particular the JavaEsSpark class.

Reading

To read data from ES, create a dedicated RDD and specify the query as an argument.

import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

SparkConf conf = ...
JavaSparkContext jsc = new JavaSparkContext(conf);

JavaPairRDD<String, Map<String, Object>> esRDD = JavaEsSpark.esRDD(jsc, "radio/artists");
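
As in the Scala API, a query can also be passed along with the resource; a minimal variant of the call above (the query string is illustrative):

JavaPairRDD<String, Map<String, Object>> filtered =
    JavaEsSpark.esRDD(jsc, "radio/artists", "?q=me*");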

Spark SQL

SQLContext sql = new SQLContext(sc);
DataFrame df = sql.read().format("es").load("buckethead/albums");
DataFrame playlist = df.filter(df.col("category").equalTo("pikes").and(df.col("year").geq(2016)));

Writing

Use JavaEsSpark to index any RDD to Elasticsearch:

import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

SparkConf conf = ...
JavaSparkContext jsc = new JavaSparkContext(conf);

Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2);
Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran");

JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));
JavaEsSpark.saveToEs(javaRDD, "spark/docs");

Spark SQL

import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;

DataFrame df = sqlContext.read().json("examples/people.json");
JavaEsSparkSQL.saveToEs(df, "spark/docs");

Building the source

Elasticsearch Hadoop uses Gradle for its build system and, thanks to the bundled wrapper (gradlew), you do not need to have Gradle installed on your machine. By default, gradlew builds the package and runs the unit tests. For integration testing, use the integrationTests task. See gradlew tasks for more information.

To create a distributable zip, run gradlew distZip from the command line; once completed you will find the jar in build/libs.

To build the project, JDK 8 (the Oracle JDK is recommended) or higher is required.

License

This project is released under version 2.0 of the Apache License.

Licensed to Elasticsearch under one or more contributor
license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright
ownership. Elasticsearch licenses this file to you under
the Apache License, Version 2.0 (the "License"); you may
not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.

elasticsearch-hadoop's People

Contributors

acchen97, andreidan, breskeby, brianseeders, charsyam, clintongormley, conky5, costin, debadair, ebuildy, elasticsearchmachine, fs111, jakelandis, jasontedor, jbaiera, jimczi, jrodewig, jtibshirani, lcawl, lucabelluccini, lucebert, mark-vieira, martijnvg, masseyke, nerdynick, nik9000, pfcoperez, pugnascotia, rjernst, takezoe


elasticsearch-hadoop's Issues

Update HTTPClient to HTTPComponents

The 3.1 interface has long since been abandoned in favor of the new HttpComponents interface. The Gradle script already downloads the appropriate jar files; it's simply a matter of using the newer API.

add jackson serialization for writable objects

To speed things up, a Jackson serializer/deserializer could be added to avoid the intermediary step of creating generic objects that are then transformed into Writable.
It remains to be seen, though, whether there is enough information to do this translation directly in the serializer.
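
A minimal sketch of the direction this could take, assuming Jackson 1.x (org.codehaus.jackson, as used elsewhere in the project at the time) and a MapWritable document; the class, module name and naive value conversion are illustrative only:

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Writable;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.Version;
import org.codehaus.jackson.map.JsonSerializer;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializerProvider;
import org.codehaus.jackson.map.module.SimpleModule;

public class MapWritableSerializer extends JsonSerializer<MapWritable> {
    @Override
    public void serialize(MapWritable value, JsonGenerator gen, SerializerProvider provider) throws IOException {
        gen.writeStartObject();
        for (Map.Entry<Writable, Writable> entry : value.entrySet()) {
            // naive conversion - real code would dispatch on the concrete Writable type
            gen.writeStringField(entry.getKey().toString(), String.valueOf(entry.getValue()));
        }
        gen.writeEndObject();
    }

    public static ObjectMapper mapper() {
        SimpleModule module = new SimpleModule("writables", Version.unknownVersion());
        module.addSerializer(MapWritable.class, new MapWritableSerializer());
        ObjectMapper mapper = new ObjectMapper();
        mapper.registerModule(module);
        return mapper;
    }
}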

Broken tests

To reproduce:
clone the es-hadoop/master repository and run:

curl -XDELETE 'http://localhost:9200/radio'
./gradlew -x clean test build

Both the CascadingHadoopTest and the CascadingLocalTest fail.

Am I doing something wrong or missing some configuration?

Mountable Index support in ES

Let's say we have a billion-line log file on Hadoop which we want to index. We could eliminate the need to make lots of requests to a centralized ES server and greatly improve overall indexing performance if we created the indexes locally in Map/Reduce jobs and then mounted them onto a running ES cluster after the map reduce job. Maybe the Elasticsearch-Hadoop project could provide an ES-compatible index/shard creation library whose output would be ready to mount into ES. What do you think?

Duplicate+ Results

The data appears to be returned duplicated when an index has a replication factor of 1. I'm assuming the search request is being sent to both the primary shard and the replica shard and the results are combined.

For example, if I run the following command from the console:

curl localhost:9200/twitter/profile/_search?q=screen_name:twitter

I get one result returned

However if I start hive with the elasticsearch-hadoop jar added and create a table like the following

create external table profile ( id string, screen_name string ) stored by 'org.elasticsearch.hadoop.hive.ESStorageHandler' TBLPROPERTIES('es.resource'='twitter/profile/_search?q=screen_name:twitter');

and then do a

select * from profile;

two identical rows are returned.

783214 twitter
783214 twitter

Hive writing Error

Hi everyone

Thanks for a great piece of software.
I am running Cloudera CDH 4.3 with Hive 0.10 on a three-node cluster.

I have created Hive "reading" and Hive "writing" tables pointing to my Elasticsearch server.
I can read fine, however when I try to push data as per your example:
INSERT OVERWRITE TABLE es_tasks_write select "false","titi","tata" from sample_table limit 10;

I get an error message:

2013-07-12 16:31:33,709 FATAL ExecReducer: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row (tag=0) {"key":{},"value":{"_col0":"false","_col1":"titi","_col2":"tata"},"alias":0}
at org.apache.hadoop.hive.ql.exec.ExecReducer.reduce(ExecReducer.java:258)
at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:506)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:447)
at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
at org.apache.hadoop.mapred.Child.main(Child.java:262)
Caused by: java.lang.ClassCastException: org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryStruct cannot be cast to [Ljava.lang.Object;
at org.elasticsearch.hadoop.hive.HiveValueWriter.write(HiveValueWriter.java:155)
at org.elasticsearch.hadoop.hive.HiveValueWriter.write(HiveValueWriter.java:56)
at org.elasticsearch.hadoop.hive.HiveValueWriter.write(HiveValueWriter.java:40)
at org.elasticsearch.hadoop.serialization.ContentBuilder.value(ContentBuilder.java:242)

Elasticsearch Cluster

The properties available in the configuration file refer to a single Elasticsearch server. Why set a specific server (host + port) instead of the cluster name? I mean, isn't it better to use the cluster infrastructure to prevent failures and overhead?

Cascading example has an exception at the end, and some incompatibility with Cascalog

Thanks for this repo. I've tried the Cascading code and it seems to work. The input example gives an error at the end though, see https://github.com/jeroenvandijk/elasticsearch-hadoop-trial/blob/master/src/trial/input.clj

I also tried to get the ES Cascading code to work with Cascalog. The input code worked perfectly; the output code failed at the Cascading level though, see https://github.com/jeroenvandijk/elasticsearch-hadoop-trial/blob/master/src/trial/cascalog/output.clj. I don't know why this happens. Cascalog does depend on an older version of Cascading (2.0.8), but I'm not sure if that's the issue.

The complete trial repo can be found here: https://github.com/jeroenvandijk/elasticsearch-hadoop-trial

I hope you don't mind the Clojure code. I hope it translates easily to the original Java code.

Map input is double the Elasticsearch result count

When Hadoop executes a job using the plugin with a query like the following:
samples/test/_search?q=*

The Map input counter shows 136188 records to map. But executing the following count query in Elasticsearch:
samples/test/_count?q=*

The result is:
{"count": 68094, "_shards":{"total":5, "successful":5, "failed":0}}

My environment is:
Linux, Hadoop 1.2.0 (3 nodes), ES 0.90.1 (3 nodes).

allow raw json to be used without parsing

There are cases where the raw json might be used as input or sent as output (without transforming it into an object).
This should be supported either by trying to detect it automatically or by looking at a flag.

Case sensitive mapping Hive

I am using Hive with ES, and when a field is named with case-sensitive (mixed-case) characters, these fields don't work in Hive.

ES:
{"user": {"name": "Joseph", "countryId":"US"}}

Hive:
select * from user;
Returns:
Joseph NULL

Elasticsearch+Hadoop read and write

Hi, I want to use Elasticsearch + Hadoop MapReduce, and I want to read from Elasticsearch and write to Elasticsearch in the same MapReduce task. In the sample only one direction is shown (either read or write, because there is only one es.resource). What I want is something like the following:

Read from:
/radio/artists/_search?q=me*

Write to :
/radio/statistics

Best regards.

support for push down predicates

Ideally, where possible, we should try to convert the hive operations to ES queries - to minimize IO but also the work done on the Hive side.

CascadingHadoopTest fails with JsonMappingException: No serializer found for class org.apache.hadoop.io.NullWritable

Current ObjectMapper configuration doesn't serialise null fields. The test data (artists.dat) contains empty fields that cause CascadingHadoopTest.testWriteToES() to fail with:

org.codehaus.jackson.map.JsonMappingException: No serializer found for class org.apache.hadoop.io.NullWritable

To make it work with the existing OM configuration the test needs to filter out all null-field entries before writing the index: fa62b72

pipe = new Each(pipe, 
   new Fields("id", "name", "url", "picture"), new FilterNull());

Related issues: #27, #30

Serialization bug in ESSerDe.hiveToWritable

The issue is explained here:

Caused by: java.lang.ArrayStoreException
at java.lang.System.arraycopy(Native Method)
at java.util.ArrayList.toArray(ArrayList.java:306)
at org.elasticsearch.hadoop.hive.ESSerDe.hiveToWritable(ESSerDe.java:136)
at org.elasticsearch.hadoop.hive.ESSerDe.hiveToWritable(ESSerDe.java:197)
at org.elasticsearch.hadoop.hive.ESSerDe.serialize(ESSerDe.java:109)
at org.apache.hadoop.hive.ql.exec.FileSinkOperator.processOp(FileSinkOperator.java:586)

https://groups.google.com/forum/?fromgroups=#!topic/elasticsearch/BAaoqF6SkiY

Can not create a Path from an empty string (Hive MapRed job with ES)

When I try to run a mapred job by Hive.
Environment Mac OS or Linux, hive 0.11, hadoop 1.2 or 0.23, ES 0.90.2.

My "test" entity hava just three fields (userid, name and type), and have just one record on ES.

hive> select count(userid) from test;
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapred.reduce.tasks=<number>
Starting Job = job_201307081651_0001, Tracking URL = http://localhost:50030/jobdetails.jsp?jobid=job_201307081651_0001
Kill Command = /Users/developergvt/servers/hadoop-1.2.0/libexec/../bin/hadoop job -kill job_201307081651_0001
Hadoop job information for Stage-1: number of mappers: 5; number of reducers: 1
2013-07-08 16:57:39,056 Stage-1 map = 0%, reduce = 0%
2013-07-08 16:58:01,154 Stage-1 map = 100%, reduce = 100%
Ended Job = job_201307081651_0001 with errors
Error during job, obtaining debugging information...
Job Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201307081651_0001
Examining task ID: task_201307081651_0001_m_000006 (and more) from job job_201307081651_0001

Task with the most failures(4):

Task ID:
task_201307081651_0001_m_000000

URL:

http://localhost:50030/taskdetails.jsp?jobid=job_201307081651_0001&tipid=task_201307081651_0001_m_000000

Diagnostic Messages for this Task:
java.lang.IllegalArgumentException: Can not create a Path from an empty string
at org.apache.hadoop.fs.Path.checkPathArg(Path.java:82)
at org.apache.hadoop.fs.Path.<init>(Path.java:90)
at org.apache.hadoop.hive.ql.io.HiveInputFormat$HiveInputSplit.getPath(HiveInputFormat.java:106)
at org.apache.hadoop.mapred.MapTask.updateJobWithSplit(MapTask.java:451)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:409)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:366)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190)
at org.apache.hadoop.mapred.Child.main(Child.java:249)

FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapRedTask
MapReduce Jobs Launched:
Job 0: Map: 5 Reduce: 1 HDFS Read: 0 HDFS Write: 0 FAIL
Total MapReduce CPU Time Spent: 0 msec

Allow external user mapping to/from Pig

Some field names are not supported by Pig and cannot be translated to ES (e.g. '@timestamp'). Additionally, some complex types (bags and tuples) cannot be serialized safely as they both map to a JSON list.
By allowing a user mapping, such details can be tweaked to give the user better control over the data (in/out) mapping.

eliminate object saving when performing bulk/buffer updates

Currently, to perform efficient writes, the data is saved (buffered) before being passed to ES. As Hadoop (and various libraries) perform object pooling, each entry needs to be copied, otherwise the data is lost.
This causes significant memory overhead, which can be alleviated by serializing early (#3).

remove jersey dependency

Currently the REST interaction uses the Jersey client as a temporary solution. This should be replaced with commons-httpclient 3.0.x, which is already found in Hadoop, Hive and Pig.

Support for List of Maps (Nested objects)

For example :
If I have a map type (rdata) column in Hive and I try to push the data to Elasticsearch, I only see the last mapid/value pair in Elasticsearch.

Log for the writable

INFO org.elasticsearch.hadoop.rest.BufferedRestClient: Writable{rid=1, rdata={value=5, mapid=4}, rdate=1234, mapids=[7, 8, 9]}

Any workaround for this ? Is this a bug ?

Thanks

improve search/address handling

The current parsing of es.resource needs to be improved, as extra characters (like ?) can easily break it. Additionally, support for JSON documents, for complicated queries, needs to be added.

Invalid test method execution order

CascadingLocalTest and CascadingHadoopTest require a particular execution order for the tests to work: they run testWriteToES() first to create the index and then run testReadFromES() to read that index.

JUnit assumes that all tests can be performed in an arbitrary order and often runs testReadFromES() before testWriteToES().

Note: To reproduce the issue when an external ES is used make sure to delete the old indexes:

curl -XDELETE 'http://localhost:9200/billboard'
curl -XDELETE 'http://localhost:9200/top'
./gradlew -x clean test build

Solutions:

  • Call testWriteToES() and testReadFromES() in a single test method to enforce the execution order. I've tried this in the upstream master branch: bd389e3
  • Upgrade to JUnit 4.11 and use the @FixMethodOrder(MethodSorters.NAME_ASCENDING) class annotation (see the sketch after this list). Rename the methods so that the read test comes second. Have not tested this option.
  • Build a custom JUnit ordered class runner. Haven't tested this option.
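
A minimal sketch of the second option, assuming JUnit 4.11; the class and method names are illustrative and chosen so that alphabetical order matches the required write-then-read order:

import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runners.MethodSorters;

@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class OrderedCascadingTest {

    @Test
    public void test1WriteToES() throws Exception {
        // creates the index (runs first alphabetically)
    }

    @Test
    public void test2ReadFromES() throws Exception {
        // reads the index created by the write test
    }
}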

Related issues: #27 and #30

some exceptions easy to fix

Hello
I'm writing an ES plugin for cascading.lingual, and had to hack in some places:

  1. RestClient.refresh --> StringIndexOutOfBound if no '/', e.g. "artists"
    Needs something like:
    int p = index.indexOf("/");
    String indx = (p < 0) ? index : index.substring(0, p);
  2. ESTap.toString --> NPE if not initialized
    Needs something like:
    return (actualTap == null) ? super.toString() : actualTap.toString();
  3. BufferedRestClient --> no way to set _index, _type for bulk load
    Had to set it to
    "{"index":{"_index":""+resource+"","_type":""+resourceType+""" + /*","_id":""+id+""" + */ "}}\n"
    via reflection between flowConnector.connect and flow.complete()

Could you please fix in SNAPSHOT.
Regards,
Oleg Sinitsin
[email protected]

Bug in WritableUtils.fromWritable() - incorrect ArrayWritable to List handling

The ArrayWritable to List transformation does not return the result list but falls back to the byte array:

       else if (writable instanceof ArrayWritable) {
            Writable[] writables = ((ArrayWritable) writable).get();
            List<Object> list = new ArrayList<Object>(writables.length);
            for (Writable wrt : writables) {
                list.add(fromWritable(wrt));
            }
+            return list;
        }
....
        // fall-back to bytearray
        return org.apache.hadoop.io.WritableUtils.toByteArray(writable);
}

Pig storage constructors only take strings

ESStorage has a constructor ESStorage(String, int) that is intended to set the host and port for the storage. Pig requires that the constructor only take strings. I changed my local code as follows:

--- a/src/main/java/org/elasticsearch/hadoop/pig/ESStorage.java
+++ b/src/main/java/org/elasticsearch/hadoop/pig/ESStorage.java
@@ -83,12 +83,12 @@ public class ESStorage extends LoadFunc implements StoreFuncInterface, StoreMeta
     private RecordWriter<Object, Object> writer;

     public ESStorage() {
-        this(null, 0);
+        this(null, "0");
     }

-    public ESStorage(String host, int port) {
+    public ESStorage(String host, String port) {
         this.host = host;
-        this.port = port;
+        this.port = Integer.parseInt(port);
     }

     @Override

Import to pig fails when JSON contains an array

When the JSON in the result set contains an array, Pig 0.10 fails during internal serialization with an exception trace similar to the following. It appears that the array is being serialized as a byte[] at some level, and Pig cannot handle that.

java.lang.RuntimeException: Unexpected data type [B found in stream. Note only standard Pig type is supported when you output from UDF/LoadFunc
    at org.apache.pig.data.BinInterSedes.writeDatum(BinInterSedes.java:559)
    at org.apache.pig.data.BinInterSedes.writeDatum(BinInterSedes.java:435)
    at org.apache.pig.data.BinInterSedes.writeMap(BinInterSedes.java:581)
    at org.apache.pig.data.BinInterSedes.writeDatum(BinInterSedes.java:451)
    at org.apache.pig.data.BinInterSedes.writeDatum(BinInterSedes.java:435)
    at org.apache.pig.data.BinInterSedes.writeMap(BinInterSedes.java:581)
    at org.apache.pig.data.BinInterSedes.writeDatum(BinInterSedes.java:451)
    at org.apache.pig.data.BinInterSedes.writeDatum(BinInterSedes.java:435)
    at org.apache.pig.data.utils.SedesHelper.writeGenericTuple(SedesHelper.java:135)
    at org.apache.pig.data.BinInterSedes.writeTuple(BinInterSedes.java:613)
    at org.apache.pig.data.BinInterSedes.writeDatum(BinInterSedes.java:443)
    at org.apache.pig.data.BinInterSedes.writeDatum(BinInterSedes.java:435)
    at org.apache.pig.impl.io.InterRecordWriter.write(InterRecordWriter.java:73)
    at org.apache.pig.impl.io.InterStorage.putNext(InterStorage.java:87)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat$PigRecordWriter.write(PigOutputFormat.java:139)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat$PigRecordWriter.write(PigOutputFormat.java:98)
    at org.apache.hadoop.mapred.MapTask$NewDirectOutputCollector.write(MapTask.java:559)
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:85)
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:106)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapOnly$Map.collect(PigMapOnly.java:48)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:264)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase.map(PigGenericMapBase.java:64)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:673)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:331)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
    at org.apache.hadoop.mapred.Child.main(Child.java:262)

ClassCastException VIntWritable and java.lang.Integer

I have a type in ES, and in Hive I added an EXTERNAL TABLE pointing to ES. Something like this:
CREATE EXTERNAL TABLE user (userId INT, userRoles ARRAY<STRUCT<roleId:INT, name:STRING>>) STORED BY 'org.elasticsearch.hadoop.hive.ESStorageHandler' TBLPROPERTIES('es.resource'='samples/user/_search?q=*');

My mapping in ES looks like this:
{
  "user" : {
    "properties" : {
      "userId" : { "type" : "int" },
      "userRoles" : {
        "properties" : {
          "roleId" : { "type" : "int" },
          "name" : { "type" : "string" }
        }
      }
    }
  }
}

So when I execute a SELECT * FROM user I get:
2013-07-08 09:09:17,648 ERROR CliDriver (SessionState.java:printError(386)) - Failed with exception java.io.IOException:java.lang.ClassCastException: org.apache.hadoop.io.VIntWritable cannot be cast to java.lang.Integer
java.io.IOException: java.lang.ClassCastException: org.apache.hadoop.io.VIntWritable cannot be cast to java.lang.Integer
at org.apache.hadoop.hive.ql.exec.FetchTask.fetch(FetchTask.java:150)
at org.apache.hadoop.hive.ql.Driver.getResults(Driver.java:1412)
at org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:271)
at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:216)
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:413)
at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:756)
at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:614)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:160)
Caused by: java.lang.ClassCastException: org.apache.hadoop.io.VIntWritable cannot be cast to java.lang.Integer
at org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaIntObjectInspector.get(JavaIntObjectInspector.java:39)
at org.apache.hadoop.hive.serde2.lazy.LazyUtils.writePrimitiveUTF8(LazyUtils.java:201)
at org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.serialize(LazySimpleSerDe.java:428)
at org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.serializeField(LazySimpleSerDe.java:382)
at org.apache.hadoop.hive.serde2.DelimitedJSONSerDe.serializeField(DelimitedJSONSerDe.java:71)
at org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.serialize(LazySimpleSerDe.java:366)
at org.apache.hadoop.hive.ql.exec.ListSinkOperator.processOp(ListSinkOperator.java:91)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:502)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:832)
at org.apache.hadoop.hive.ql.exec.SelectOperator.processOp(SelectOperator.java:84)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:502)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:832)
at org.apache.hadoop.hive.ql.exec.TableScanOperator.processOp(TableScanOperator.java:90)
at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:502)
at org.apache.hadoop.hive.ql.exec.FetchOperator.pushRow(FetchOperator.java:490)
at org.apache.hadoop.hive.ql.exec.FetchTask.fetch(FetchTask.java:136)
... 11 more

When I only use strings, it works fine. I tried the master branch and the releases available in the Maven repositories, but none works.

incompatible with CDH4 YARN/Hadoop 2

Will you be releasing a version of elasticsearch-hadoop that compiles against Hadoop 2.x, or could someone explain how to make it work with that version?

Thanks,
Venkat

Pig Stack Trace

ERROR 2998: Unhandled internal error. Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected

java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected
at org.elasticsearch.hadoop.mr.ESOutputFormat.checkOutputSpecs(ESOutputFormat.java:104)
at org.apache.pig.newplan.logical.rules.InputOutputFileValidator$InputOutputFileVisitor.visit(InputOutputFileValidator.java:80)
at org.apache.pig.newplan.logical.relational.LOStore.accept(LOStore.java:77)
at org.apache.pig.newplan.DepthFirstWalker.depthFirst(DepthFirstWalker.java:64)

Cascading tap needs to be able to write to multiple indexes?

I'm trying to export some of my data to ES for Kibana. This now requires indexes per day. However, AFAIK this is currently not possible in an easy way with the ES Cascading Tap, since you have to choose an index upfront.

I think the ES tap needs a way to define a pattern for the index so it is possible to write to an index dynamically. I guess I need something similar to the GlobHfs Tap.

What do you think?

Hive Elasticsearch insert a date or timestamp

Hello everyone

I am currently playing around with this great piece of software.
I have looked at the code but have not found a clear answer.
Say I want to push a date field from Hive into an Elasticsearch index - how would I go about writing it?

CREATE EXTERNAL TABLE es_wrtie (
clientId String,
mydate timestamp||string)
STORED BY 'org.elasticsearch.hadoop.hive.ESStorageHandler'
TBLPROPERTIES('es.resource' = 'test/test/')

Since Hive only knows timestamps, would I use those, or put in a string and let Elasticsearch do the trick if I format it correctly?

Thanks for your help and software.

support for push down predicates

Ideally, where possible, we should try to convert Pig predicates to ES queries - to minimize IO but also the work done on the Pig side.

Current status of this project?

Hello,

I am not sure if this is the right place to ask this question, but I couldn't find a mailing list or contact info.

I have been playing with this library and it looks really good - it just works. I am planning to use Elasticsearch to index about 45M documents, so I will use this with a Pig script to push data from Cassandra to Elasticsearch.

So, when can we expect a stable release, and what is the current status of this project for production use?

Support Hive query and push down predicates

Currently the Hive reading support requires the ES query to be specified as a table property. It would be better (and consistent with writing) to specify just the index and then create the query through hive directly.

Expose the ObjectMapper to allow custom JSON serializer/deserializers

ES-Hadoop uses Jackson's ObjectMapper for converting to/from JSON. By default the ObjectMapper registers a DEFAULT and an ANNOTATION-based introspector.

To facilitate the integration of various Hadoop libraries, one will need greater control over the JSON binding functionality. For example, to use an Avro schema for JSON binding (like jackson-dataformat-avro) one would have to provide a custom ObjectMapper configuration.

Therefore the ObjectMapper has to be exposed at the ESInputFormat and ESOutputFormat level.

flush index after bulk insert

After writing data to ES, the data needs to be flushed; otherwise subsequent reads will not see it.
In some cases this might not be needed (hence the need to offer a configuration option), but in most cases it's probably what the user expects.
