Coder Social home page Coder Social logo

emr-dynamodb-connector's People

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

emr-dynamodb-connector's Issues

DynamoDB Auto Scaling

Now that DynamoDB supports Auto Scaling is there any chance this can be updated to periodically go back and check the read/write capacity units and utilize any additional capacity added?

Exporting DynamoDB table with compression?

I'd like to be able to specify a compression format when exporting a DynamoDB table using the export script. I tried forcing a codec using:
hadoop jar emr-ddb.jar org.apache.hadoop.dynamodb.tools.DynamoDbExport -D mapreduce.output.fileoutputformat.compress=true -D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec s3://this-is-my/bucket/ table_name

But the output was still uncompressed. As a workaround, I can export the table to HDFS and then use S3DistCp to transfer the data to S3 compressed, but ideally I'd like to directly write the compressed data to S3. Is this possible?

List coming as NULL even when used with DynamoDBListStorageHandler

As per issue 74 the issue of having NULL list data when mapped to an array is fixed, but it is still coming in as NULL

Steps Followed

dynamoDB table name testReco1
Items :
2 items ( attributes id ( primary) and test)
(a) id :abcd, test :cncn
(b) id:cncn , test: [ { "S" : "h" }, { "S" : "e" } ]
As can be seen second item has test as as List

Copied 2 of the jars to s3

copied emr-dynamodb-hive-4.7.0-SNAPSHOT-jar-with-dependencies.jar to s3:///emr-dynamodb-hive-4.7.0-SNAPSHOT-jar-with-dependencies.jar
copied emr-dynamodb-hadoop-4.7.0-SNAPSHOT.jar to s3:///emr-dynamodb-hadoop-4.7.0-SNAPSHOT.jar

Steps in Hive

*hive> add jar s3:///emr-dynamodb-hive-4.7.0-SNAPSHOT-jar-with-dependencies.jar;
Added [/mnt/tmp/b8892051-3dea-4fe9-88e0-536ddd2742af_resources/emr-dynamodb-hive-4.7.0-SNAPSHOT-jar-with-dependencies.jar] to class path
Added resources: [s3:///emr-dynamodb-hive-4.7.0-SNAPSHOT-jar-with-dependencies.jar]
hive> add jar s3:///emr-dynamodb-hadoop-4.7.0-SNAPSHOT.jar
> ;
Added [/mnt/tmp/b8892051-3dea-4fe9-88e0-536ddd2742af_resources/emr-dynamodb-hadoop-4.7.0-SNAPSHOT.jar] to class path
Added resources: [s3:///emr-dynamodb-hadoop-4.7.0-SNAPSHOT.jar]
hive> drop table test;
OK
Time taken: 0.57 seconds
hive>
> CREATE EXTERNAL TABLE test (id string, test array) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBListStorageHandler'
> TBLPROPERTIES ("dynamodb.table.name" = "testReco1",
> "dynamodb.column.mapping" = "id:id,test:test","dynamodb.region"="us-east-1");
WARNING: Configured write throughput of the dynamodb table testReco1 is less than the cluster map capacity. ClusterMapCapacity: 90 WriteThroughput: 5
WARNING: Writes to this table might result in a write outage on the table.
OK
Time taken: 1.19 seconds
hive> select * from test;
OK
abcd NULL
cncn NULL
Time taken: 1.546 seconds, Fetched: 2 row(s)
**-- Now create with test as just string **
hive> drop table test;
OK
Time taken: 0.044 seconds

hive> CREATE EXTERNAL TABLE test (id string, test string) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBListStorageHandler'
> TBLPROPERTIES ("dynamodb.table.name" = "testReco1",
> "dynamodb.column.mapping" = "id:id,test:test","dynamodb.region"="us-east-1");
OK
Time taken: 0.118 seconds
hive> select * from test;
OK
abcd hh
cncn NULL
Time taken: 0.184 seconds, Fetched: 2 row(s)

DynamoDBClient MAX values.

Hi all,
I've read from the latest DynamoDB API references that the BatchWriteItem operation will be rejected if one of the following conditions is violated:

  • One or more tables specified in the BatchWriteItem request does not exist.
  • Primary key attributes specified on an item in the request do not match those in the corresponding table's primary key schema.
  • You try to perform multiple operations on the same item in the same BatchWriteItem request. For example, you cannot put and delete the same item in the same BatchWriteItem request.
  • There are more than 25 requests in the batch.
  • Any individual item in a batch exceeds 400 KB.
  • The total request size exceeds 16 MB.

Focusing on the following thresholds:

  • Any individual item in a batch exceeds 400 KB.
  • The total request size exceeds 16 MB.

And giving an eye to org.apache.hadoop.dynamodb.DynamoDBClient class I can't figure out about which is the aim of the following (and in case is the individual item max size why is 512 instead of 400):

private static final int MAX_ALLOWABLE_BYTE_SIZE = 512 * 1024;

Also in putBatch method (lines 182-188) the following check (third clause) on the write requests for a given table

BatchWriteItemResult result = null; if (writeBatchMap.containsKey(tableName)) { if (writeBatchMap.get(tableName).size() >= batchLimit || (writeBatchMapSizeBytes + itemSizeBytes) > MAX_ALLOWABLE_BYTE_SIZE) { result = writeBatch(reporter, itemSizeBytes); } }

uses the MAX_ALLOWABLE_BYTE_SIZE. Is that correct or should it be a different threshold (i.e. 16MB)?

If there's any documentation I can read where all details are defined and can be shared, it would be very appreciated.

Thanks for the support,
S.

Runtime error NoSuchMethodError: org.apache.http.params.HttpConnectionParams

Probably missing some dependency in the pom, or I am missing something in my class path during execution.
I see the above error (in the subject), when I execute the dynamodbimport class.
I tried passing httpcomponents-core(deprecated per doc) in the classpath, but that did not help.
I am going to debug this further, but if there is something obvious that I am missing, would like to be pointed out.

Large items in table break serialization

If I run the below:

pyspark --jars /usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar

conf = {
    "dynamodb.servicename": "dynamodb",
    "dynamodb.input.tableName": "table_with_big_items_in_it",
    "dynamodb.endpoint": "https://dynamodb.ap-northeast-1.amazonaws.com",
    "dynamodb.regionid": "ap-northeast-1",    "mapred.output.format.class": "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat",
    "mapred.input.format.class": "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat"
}

rdd = sc.hadoopRDD(
    inputFormatClass='org.apache.hadoop.dynamodb.read.DynamoDBInputFormat',
    keyClass='org.apache.hadoop.io.Text',
    valueClass='org.apache.hadoop.dynamodb.DynamoDBItemWritable',
    conf=conf
)
rdd.take(1)

I get the following error:

8/03/15 09:04:36 WARN TaskSetManager: Lost task 0.0 in stage 11.0 (TID 26, ip-172-31-141-34.ap-northeast-1.compute.internal, executor 9): java.lang.RuntimeException: Error writing/reading clone buffer
	at org.apache.hadoop.io.WritableUtils.clone(WritableUtils.java:221)
	at org.apache.spark.api.python.WritableToJavaConverter.org$apache$spark$api$python$WritableToJavaConverter$$convertWritable(PythonHadoopUtil.scala:93)
	at org.apache.spark.api.python.WritableToJavaConverter.convert(PythonHadoopUtil.scala:101)
	at org.apache.spark.api.python.PythonHadoopUtil$$anonfun$convertRDD$1.apply(PythonHadoopUtil.scala:181)
	at org.apache.spark.api.python.PythonHadoopUtil$$anonfun$convertRDD$1.apply(PythonHadoopUtil.scala:181)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:121)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:112)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:518)
	at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:333)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1948)
	at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
Caused by: java.io.UTFDataFormatException: encoded string too long: 84878 bytes
	at java.io.DataOutputStream.writeUTF(DataOutputStream.java:364)
	at java.io.DataOutputStream.writeUTF(DataOutputStream.java:323)
	at org.apache.hadoop.dynamodb.DynamoDBItemWritable.write(DynamoDBItemWritable.java:53)
	at org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:98)
	at org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:82)
	at org.apache.hadoop.util.ReflectionUtils.copy(ReflectionUtils.java:298)
	at org.apache.hadoop.io.WritableUtils.clone(WritableUtils.java:218)
	... 14 more

As far as I can tell the problem is that the method used here is limited to strings of 65535 bytes or less. DynamoDB items can be 400KB though - a fact that we take advantage of to store documents that can be a few hundred kilobytes in size.

Interestingly, if I run the following using the Scala shell:
spark-shell --jars /usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar

import org.apache.hadoop.io.Text;
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat
import org.apache.hadoop.mapred.JobConf

var jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set("dynamodb.servicename", "dynamodb")
jobConf.set("dynamodb.input.tableName", "table_with_big_items_in_it")
jobConf.set("dynamodb.endpoint", "dynamodb.ap-northeast-1.amazonaws.com")
jobConf.set("dynamodb.regionid", "ap-northeast-1")
jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")


var elements = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])

elements.values.take(1)

It seems to work fine. Looking at the first stack trace, it does seem like a proximate cause of the error is code in org.apache.spark.api.python. Why the issue doesn't come up in Scala I don't know. Either way, I probably shouldn't introduce Scala into our stack and so if anyone has an idea about getting the DynamoDB connector to work for large items in Python I'm all ears.

New version Hadoop API

It is should have a version based on New Hadoop API (MRv2) since that custom may develop own MR
job using DynamoDB(Input/Output)Format.

Support for custom dynamo db regions

Currently emr-dynamodb-connector does not support setting a different region for dynamo DB tables. I have a scenario where tables are located in one region and because that region does not support EMR clusters, EMR is located in another regions.

It would really help if I can define dynamo DB region as custom argument for dynamodb export script and is there any specific reason why this is not implemented that I am missing?

Release new version with aws-java-sdk 1.11.*

It looks like you have versions higher than 4.2.0 that have the updated aws-java-sdk version. When is the next release planned? I'd rather use an updated release than revert other code that uses the newer SDK.

Could not find or load main class org.apache.hadoop.dynamodb.tools.DynamoDBExport

When I use export usage:

java -cp target/emr-dynamodb-tools-4.2.0-SNAPSHOT.jar org.apache.hadoop.dynamodb.tools.DynamoDBExport /where/output/should/go my-dynamo-table-name

there is an error:

Error: Could not find or load main class org.apache.hadoop.dynamodb.tools.DynamoDBExport

I can find this class in target/classes/org/apache/hadoop/dynamodb/tools/DynamoDBExport.class.
Can someone tell me how to fix this error? Thanks.

next release date ?

We are looking to use the on demand dynamo changes that have been recently merged.
Can I know the next release date since we would like to avoid a forked version if the release is sooner.

dynamodb list showing as null and error when using DynamoDBListStorageHandler

While trying to create and external table using "org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler" received nulls for list , therefore created an external table using "org.apache.hadoop.hive.dynamodb.DynamoDBListStorageHandler", but the DDL fails with error FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. org/apache/hadoop/dynamodb/type/DynamoDBListType
Steps followed
-- add jar file
add jar s3://<ourbucker/folder>/emr-dynamodb-hive-4.7.0-SNAPSHOT-jar-with-dependencies.jar
-- create external table
CREATE EXTERNAL TABLE test (id string, series array,lastModified string) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBListStorageHandler'
TBLPROPERTIES ("dynamodb.table.name" = "test",
"dynamodb.column.mapping" = "id:id,series:seriesNames,lastModified:lastModified","dynamodb.region"="us-east-1")
-- response
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. org/apache/hadoop/dynamodb/type/DynamoDBListType

what is wrong here?

"incorrect" exception is thrown when bad mapping is used

For example, if in the following code you will remove cpc from the CREATE EXTERNAL TABLE mapping, the following exception will br thrown

CREATE EXTERNAL TABLE ddb_cross_keywords_volumes
( keyword_key string,
  cpc bigint,
  volumes string)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES(
 "dynamodb.table.name" = "cross_keywords_volumes_${hiveconf:partition_date}",
 "dynamodb.column.mapping"="keyword_key:keyword_key,volumes:volumes,cpc:cpc"
);

INSERT OVERWRITE TABLE ddb_cross_keywords_volumes
SELECT    concat(v.keyword, '_', CAST(v.country AS VARCHAR(5))) AS keyword_key,
          MAX(c.cpc) AS cpc,
          to_json(str_to_map(concat_ws(',', collect_set(v.month ||':'|| v.volume)), ',', ':')) AS volumes
FROM      sw_published_adwords.keywords_volumes__${hiveconf:pdl_branch} AS v
LEFT JOIN sw_published_adwords.keywords_cpcs__${hiveconf:pdl_branch} AS c
    ON  c.keyword = v.keyword
    AND c.country = v.country
    AND c.`date` >= '${hiveconf:from_date}'
    AND c.`date` <= '${hiveconf:to_date}'
WHERE     v.`date` >= '${hiveconf:from_date}'
AND       v.`date` <= '${hiveconf:to_date}'
GROUP BY  v.keyword, v.country;

TaskAttempt 2 failed, info=[Error: Error while running task ( failure ) : attempt_1530441053089_0001_2_03_000072_2:java.lang.RuntimeException: java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row (tag=0) {"key":{"_col0":"!4rc4n01d! 4: kohbeep edition astats","_col1":999},"value":{"_col0":null,"_col1":["2018-04:-1","2017-08:-1","2017-07:-1","2018-03:-1","2018-02:-1","2017-12:-1","2018-01:-1","2017-11:-1","2017-10:-1","2017-05:-1","2017-06:-1","2017-09:-1"]}}
at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:211)
at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:168)
at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:370)
at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:73)
at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:61)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:61)
at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:37)
at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row (tag=0) {"key":{"_col0":"!4rc4n01d! 4: kohbeep edition astats","_col1":999},"value":{"_col0":null,"_col1":["2018-04:-1","2017-08:-1","2017-07:-1","2018-03:-1","2018-02:-1","2017-12:-1","2018-01:-1","2017-11:-1","2017-10:-1","2017-05:-1","2017-06:-1","2017-09:-1"]}}
at org.apache.hadoop.hive.ql.exec.tez.ReduceRecordSource.pushRecord(ReduceRecordSource.java:297)
at org.apache.hadoop.hive.ql.exec.tez.ReduceRecordProcessor.run(ReduceRecordProcessor.java:317)
at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:185)
... 14 more
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row (tag=0) {"key":{"_col0":"!4rc4n01d! 4: kohbeep edition astats","_col1":999},"value":{"_col0":null,"_col1":["2018-04:-1","2017-08:-1","2017-07:-1","2018-03:-1","2018-02:-1","2017-12:-1","2018-01:-1","2017-11:-1","2017-10:-1","2017-05:-1","2017-06:-1","2017-09:-1"]}}
at org.apache.hadoop.hive.ql.exec.tez.ReduceRecordSource$GroupIterator.next(ReduceRecordSource.java:365)
at org.apache.hadoop.hive.ql.exec.tez.ReduceRecordSource.pushRecord(ReduceRecordSource.java:287)
... 16 more
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.NullPointerException
at org.apache.hadoop.hive.ql.exec.GroupByOperator.process(GroupByOperator.java:792)
at org.apache.hadoop.hive.ql.exec.tez.ReduceRecordSource$GroupIterator.next(ReduceRecordSource.java:356)
... 17 more
Caused by: java.lang.NullPointerException
at org.apache.hadoop.dynamodb.DynamoDBUtil.getItemSizeBytes(DynamoDBUtil.java:148)
at org.apache.hadoop.dynamodb.DynamoDBClient.putBatch(DynamoDBClient.java:186)
at org.apache.hadoop.dynamodb.write.AbstractDynamoDBRecordWriter.write(AbstractDynamoDBRecordWriter.java:108)
at org.apache.hadoop.hive.dynamodb.write.HiveDynamoDBRecordWriter.write(HiveDynamoDBRecordWriter.java:42)
at org.apache.hadoop.hive.ql.exec.FileSinkOperator.process(FileSinkOperator.java:762)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:897)
at org.apache.hadoop.hive.ql.exec.SelectOperator.process(SelectOperator.java:95)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:897)
at org.apache.hadoop.hive.ql.exec.GroupByOperator.forward(GroupByOperator.java:1047)
at org.apache.hadoop.hive.ql.exec.GroupByOperator.processAggr(GroupByOperator.java:847)
at org.apache.hadoop.hive.ql.exec.GroupByOperator.processKey(GroupByOperator.java:721)
at org.apache.hadoop.hive.ql.exec.GroupByOperator.process(GroupByOperator.java:787)
... 18 more
]

A. I would appreciate if the exception will be thrown immediately and not after an hour or more of expensive calculations.
B. The exception should be more readable, not NullPointerException from org.apache.hadoop.dynamodb.DynamoDBUtil.getItemSizeBytes.

Thanks,
Doron Grinzaig

Null Pointer exception at at org.apache.hadoop.dynamodb.preader.ScanRecordReadRequest.fetchPage

I get a Java Null pointer exception when I try to query a local dynamodb table with the following configuration:

    var jobConf = new JobConf(sparkContext.hadoopConfiguration);
    jobConf.set("dynamodb.servicename", "dynamodb");
    jobConf.set("dynamodb.endpoint", "http://localhost:8000");
    jobConf.set("dynamodb.regionid", "us-east-1");
    
    jobConf.set("dynamodb.throughput.read", "1");
    jobConf.set("dynamodb.throughput.read.percent", "1");
   
    jobConf.set("dynamodb.input.tableName, "test");
    jobConf.set("dynamodb.awsAccessKeyId", "key");
    jobConf.set("dynamodb.awsSecretAccessKey", "secret");

    jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat");
    jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat");

    var res = sparkContext.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable]);

    res.collect().foreach(s => println(s._1));

I get an error when it collects and executes the job. This is the full log:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.io.IOException: java.lang.NullPointerException
	at org.apache.hadoop.dynamodb.preader.PageResultMultiplexer.next(PageResultMultiplexer.java:97)
	at org.apache.hadoop.dynamodb.read.AbstractDynamoDBRecordReader.next(AbstractDynamoDBRecordReader.java:108)
	at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:255)
	at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:209)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
	at scala.collection.Iterator$class.foreach(Iterator.scala:742)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:308)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:300)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:287)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:893)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:893)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:85)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
	at org.apache.hadoop.dynamodb.preader.ScanRecordReadRequest.fetchPage(ScanRecordReadRequest.java:47)
	at org.apache.hadoop.dynamodb.preader.AbstractRecordReadRequest.readNextPage(AbstractRecordReadRequest.java:68)
	at org.apache.hadoop.dynamodb.preader.AbstractRecordReadRequest.read(AbstractRecordReadRequest.java:43)
	at org.apache.hadoop.dynamodb.preader.ReadWorker.runInternal(ReadWorker.java:84)
	at org.apache.hadoop.dynamodb.preader.ReadWorker.run(ReadWorker.java:46)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:893)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:892)
	at com.trust.CorrelationWithEmr.getDataExample(CorrelationWithEmr.scala:99)
	at com.trust.CorrelationWithEmr.run(CorrelationWithEmr.scala:52)
	at com.trust.App$.main(App.scala:31)
	at com.trust.App.main(App.scala)

Currently on EMR-dynamodb-connector 4.5, installed by maven into my project. The logs show that it can describe the tables, so it's not an access issue, and the error shows up with a locally installed dynamodb instance.

Cannot load dynamo table in Pyspark

I am getting this error after passing the conf to pyspark, my code errors out before I even get to the count.

Code

pyspark --jars /usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar
conf = {
"dynamodb.servicename": "dynamodb",
"dynamodb.input.tableName": "test-table",
"dynamodb.output.tableName": "test-table",
"dynamodb.endpoint": "https://dynamodb.us-west-2.amazonaws.com",
"dynamodb.regionid": "us-west-2",
"mapred.output.format.class": "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat",
"mapred.input.format.class": "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat"
}

dynamoRDD = sc.hadoopRDD(
inputFormatClass='org.apache.hadoop.dynamodb.read.DynamoDBInputFormat',
keyClass='org.apache.hadoop.io.Text',
valueClass='org.apache.hadoop.dynamodb.DynamoDBItemWritable',
conf=conf
)

Error

Traceback (most recent call last):
  File "<stdin>", line 5, in <module>
  File "/usr/lib/spark/python/pyspark/context.py", line 759, in hadoopRDD
    jconf, batchSize)
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.hadoopRDD.
: java.lang.NoClassDefFoundError: com/sun/jersey/api/client/config/ClientConfig
	at org.apache.hadoop.yarn.client.api.TimelineClient.createTimelineClient(TimelineClient.java:55)
	at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.createTimelineClient(YarnClientImpl.java:181)
	at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceInit(YarnClientImpl.java:168)
	at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
	at org.apache.hadoop.mapred.ResourceMgrDelegate.serviceInit(ResourceMgrDelegate.java:102)
	at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
	at org.apache.hadoop.mapred.ResourceMgrDelegate.<init>(ResourceMgrDelegate.java:96)
	at org.apache.hadoop.mapred.YARNRunner.<init>(YARNRunner.java:112)
	at org.apache.hadoop.mapred.YarnClientProtocolProvider.create(YarnClientProtocolProvider.java:34)
	at org.apache.hadoop.mapreduce.Cluster.initialize(Cluster.java:95)
	at org.apache.hadoop.mapreduce.Cluster.<init>(Cluster.java:82)
	at org.apache.hadoop.mapreduce.Cluster.<init>(Cluster.java:75)
	at org.apache.hadoop.mapred.JobClient.init(JobClient.java:475)
	at org.apache.hadoop.mapred.JobClient.<init>(JobClient.java:454)
	at org.apache.hadoop.dynamodb.read.AbstractDynamoDBInputFormat.getSplits(AbstractDynamoDBInputFormat.java:46)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:194)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
	at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1333)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
	at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:203)
	at org.apache.spark.api.python.PythonRDD$.hadoopRDD(PythonRDD.scala:656)
	at org.apache.spark.api.python.PythonRDD.hadoopRDD(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.sun.jersey.api.client.config.ClientConfig
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 44 more


Support for Scan FIlters for spark dynamodb connector

I have a use-case where I am trying to update all records on my dynamodb table with a new attribute. My dynamodb table has billions of records and I would like to split updating of the records by filtering on dates. My question was is there any support to selectively load data only for specific filter criteria into spark instead of always reading the entire table and then filtering. Reading the whole table would consume lot of memory. Passing filter criteria on the jobconfiguration seems to be the way to go, but I have not found any documentation that says so.

Fix slow tests

3 different unit tests take > 10 seconds, and contribute to > 50% of required build time. Refactoring these tests to make them more of unit tests with proper mocking as opposed to functional tests would greatly increase build time and lower developer burden.

3 test classes that take > 10 seconds:

Running org.apache.hadoop.dynamodb.DynamoDBFibonacciRetryerTest
Tests run: 7, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 40.57 sec
...
Running org.apache.hadoop.dynamodb.preader.RateControllerTest
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 11.112 sec
Running org.apache.hadoop.dynamodb.preader.ReadManagerTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 15.202 sec

DynamoDb Spark example - emr-ddb-hadoop.jar does not exist

Description

With the latest version of Amazon EMR (5.4) the jar file (/usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar) referenced in the Spark section of README file does not exist.

It is already mentioned in the following Big data blog post

You have to manually build and copy the jar file to the master node, which is OK, but the documentation should be up to date.

pyspark examples ?

Can we get some examples added to read and write from dynamodb using pyspark ?

Here is what I have tried so far on a standalone spark cluster ( not EMR )

conf = {
    "dynamodb.servicename": "dynamodb",
    "dynamodb.input.tableName": "test",
    "dynamodb.output.tableName": "test",
    "dynamodb.endpoint": "http://localhost:4569",
    "dynamodb.regionid": "us-east-1",
    "mapred.output.format.class": "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat",
    "mapred.input.format.class": "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat"
}

sc = spark.sparkContext

sc.hadoopRDD(
	inputFormatClass='org.apache.hadoop.dynamodb.read.DynamoDBInputFormat',
	keyClass='org.apache.hadoop.io.Text',
	valueClass='org.apache.hadoop.dynamodb.DynamoDBItemWritable',
	conf=conf
)

2019-02-14 14:33:50 WARN  ClusterTopologyNodeCapacityProvider:51 - Exception when trying to determine instance types
java.nio.file.NoSuchFileException: /mnt/var/lib/info/job-flow.json
	at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
	at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
	at java.nio.file.Files.newByteChannel(Files.java:361)
	at java.nio.file.Files.newByteChannel(Files.java:407)
	at java.nio.file.Files.readAllBytes(Files.java:3152)
	at org.apache.hadoop.dynamodb.util.ClusterTopologyNodeCapacityProvider.readJobFlowJsonString(ClusterTopologyNodeCapacityProvider.java:103)
	at org.apache.hadoop.dynamodb.util.ClusterTopologyNodeCapacityProvider.getCoreNodeMemoryMB(ClusterTopologyNodeCapacityProvider.java:42)


I tried with v4.6.0 of the connector library.

dynamodb.throughput.write.percent not being respected

Hello I am using the connector package with the below configuration but the write capacity and percent is not being respected for some reason. https://imgur.com/a/Ht2yw (attachments aren't working for some reason on github today). For 5 minutes right when the job starts the consumed capacity is always 2x the write capacity, then it levels out to the provisioned write capacity.

I am running this job in us-west-2 but writing to us-east-1

Random specs:
Release label:emr-5.9.0
Hadoop distribution:Amazon 2.7.3
Applications:Spark 2.2.0, Hive 2.3.0, Hue 4.0.1, Zeppelin 0.7.2, Ganglia 3.7.2

      private JobConf setDynamoWriteJobConf(JavaSparkContext sc) {
          JobConf jobConf = new JobConf(sc.hadoopConfiguration());
  
          jobConf.set("dynamodb.servicename", "dynamodb");
          jobConf.set("dynamodb.output.tableName", "test");
          jobConf.set("dynamodb.endpoint", "dynamodb.us-east-1.amazonaws.com");
          jobConf.set("dynamodb.regionid", "us-east-1");
          jobConf.set("dynamodb.throughput.write", "500");
          jobConf.set("dynamodb.throughput.write.percent", ".5");
          jobConf.set("dynamodb.version", "2012-08-10");
  
          jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat");
          jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat");
  
          return jobConf;
      }

write command:

          JavaPairRDD<Text, DynamoDBItemWritable> writeToDynamo = dataset
                  .javaRDD()
                  .map(createDdbWritable())
                  .mapToPair(item -> new Tuple2<>(new Text(""), item));
  
          writeToDynamo.saveAsHadoopDataset(setDynamoWriteJobConf(sc));

Any explanation on why this could be occurring?

Cannot save RDD with pyspark

I'm not able to save the RDD to DynamoDB using Pyspark.

EMR 5.10

pyspark --jars /usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar

For me, I'm able to read the Dynamodb data using the below code:

conf = {
"dynamodb.servicename": "dynamodb",
"dynamodb.input.tableName": "Test",
"dynamodb.output.tableName": "Test2",
"dynamodb.endpoint": "https://dynamodb.us-east-1.amazonaws.com",
"dynamodb.regionid": "us-east-1",
"mapred.output.format.class": "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat",
"mapred.input.format.class": "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat"
}

dynamoRDD = sc.hadoopRDD(
inputFormatClass='org.apache.hadoop.dynamodb.read.DynamoDBInputFormat',
keyClass='org.apache.hadoop.io.Text',
valueClass='org.apache.hadoop.dynamodb.DynamoDBItemWritable',
conf=conf
)
//Now the count or first method works well on the above RDD- dynamoRDD. So it is able to connect and read.
dynamoRDD.count()

//But if I'm trying to save the RDD it is failing:

dynamoRDD.saveAsHadoopDataset(conf)

Error:

18/01/15 13:11:25 ERROR TaskSetManager: Task 0 in stage 2.0 failed 4 times; aborting job
Traceback (most recent call last):
File "", line 1, in
File "/usr/lib/spark/python/pyspark/rdd.py", line 1437, in saveAsHadoopDataset
keyConverter, valueConverter, False)
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in call
File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsHadoopDataset.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 5, ip-172-31-2-14.ec2.internal, executor 1): java.lang.ClassCastException: org.apache.hadoop.io.MapWritable cannot be cast to org.apache.hadoop.dynamodb.DynamoDBItemWritable
at org.apache.hadoop.dynamodb.write.DefaultDynamoDBRecordWriter.convertValueToDynamoDBItem(DefaultDynamoDBRecordWriter.java:23)
at org.apache.hadoop.dynamodb.write.AbstractDynamoDBRecordWriter.write(AbstractDynamoDBRecordWriter.java:107)
at org.apache.spark.internal.io.SparkHadoopWriter.write(SparkHadoopWriter.scala:94)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply$mcV$sp(PairRDDFunctions.scala:1139)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1137)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1137)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1375)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1145)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1125)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1690)

===============================

//Error is self-explanatory.. I'm basically trying to read from one table and copy to another table as defined in the conf dictionary. I have even tried with separate conf (like inputConf, outputConf). But still getting the same error.

Whereas exactly same code is working fine with Scala.

**** So it seems to be an issue with the emr connector with Pyspark. Could you please look into the matter.

Why the input format type is changing to MapWritable while loading the data, whereas it is staying same as ynamoDBItemWritable in case of scala (spark-shell) ??

What happens in case of an Exception

Hi team, i am using Spark on EMR to write an RDD to dynamodb using the following syntax:
myRDD.saveAsHadoopDataset(jobConf)
I am curious to know if this writes the data in batches or not. Also what happens in case of an exception(ProvisionedThroughput Exception especially), does it retry? If yes does it just retry the failed components or the whole batch?

Thanks

NullPointerException at AbstractDynamoDBRecordWriter (115)

As far as I know, this only applies to the downloadable, local version of DynamoDB, which apparently doesn't return "consumed capacity" even if it's requested. Looping over it without null checking causes a NullPointerException.

Here's the line:

for (ConsumedCapacity consumedCapacity : result.getConsumedCapacity()) {

Problem about credential of DynamoDB for cross region operation.

Hi guys, I am facing an issue about the credential for cross region operation.

I want to move data from S3 (us-west-2) to DynamoDB (cn-north-1). It works if I set the region in TBLPROPERTIES and AWS credential in /etc/hadoop/conf/core-site.xml as follow:

CREATE EXTERNAL TABLE dynamodb_recommendations (col1 string, col2 string, col3 bigint, col4 double)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES ("dynamodb.region"="cn-north-1","dynamodb.table.name" = "video_recommendations_20170927_1","dynamodb.throughput.write.percent" = "1.0","dynamodb.column.mapping" = "col1:registration_token,col2:sk_rating,col3:video_id,col4:rating");
<property>
    <name>dynamodb.awsAccessKeyId</name>
    <value>xxxx</value>
</property>
<property>
    <name>dynamodb.awsSecretAccessKey</name>
    <value>xxxx</value>
</property>

However, when I set the credential in TBLPROPERTIES (instead of core-site.xml), I get an error of "FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. java.lang.RuntimeException: Could not lookup table dynamodb_recommendations in DynamoDB". So I think the connector doesn't read credential from TBLPROPERTIES.

My hive script is:

CREATE EXTERNAL TABLE dynamodb_recommendations (col1 string, col2 string, col3 bigint, col4 double)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES ("dynamodb.region"="cn-north-1","dynamodb.awsAccessKeyId"="xxxx","dynamodb.awsSecretAccessKey"="xxxx","dynamodb.table.name" = "video_recommendations_20170927_1","dynamodb.throughput.write.percent" = "1.0","dynamodb.column.mapping" = "col1:registration_token,col2:sk_rating,col3:video_id,col4:rating");

Is there a way that I can set credential in Hive script?

'List' type converted to NULL

Hi,

I have a table that contains items with a List property. Something that would export in this format :

{"id":{"s":"1234"}","values":{"l":[{"s":"one"},{"s":"two"}]}}

I'm trying to load the List as an array using hive. Whatever I try, "values" property always get converted to NULL in the result.

CREATE EXTERNAL TABLE items (id string, values array<string>)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES ("dynamodb.table.name" = "myTable",
"dynamodb.column.mapping" = "id:id,values:values");

Is it something that is just not supported by the dynamodb-connector or am I doing something wrong?

Regards,

Does this support server-side filtering on dynamoDB?

I currently have something that looks like this in python:

conf = { "dynamodb.servicename": "dynamodb", 
         "dynamodb.input.tableName": "my-table", 
         "dynamodb.endpoint": "https://dynamodb.us-west-2.amazonaws.com", 
         "dynamodb.regionid": "us-west-2", 
         "mapred.output.format.class": "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat", 
         "mapred.input.format.class": "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat",
         "dynamodb.throughput.read.percent": "0.95"} 
         
products = sc.hadoopRDD( 
    inputFormatClass='org.apache.hadoop.dynamodb.read.DynamoDBInputFormat', 
    keyClass='org.apache.hadoop.io.Text', 
    valueClass='org.apache.hadoop.dynamodb.DynamoDBItemWritable', 
    conf=conf ) 
    
products.take(1)
    
products.count() 

Is there a way to give it a list of primary keys so that ddb only returns that smaller dataset to be loaded as a df/rdd?

Connector doesn't use DDB primary key, always performs full table scan

I've tried using this connector to write a Hive query against a DynamoDB table that's defined with a partition key and a sort key. Even when my Hive query includes conditions on both the partition key and the sort key, the connector performs a full table scan to DynamoDB, rather than a query based on its primary key! My table has billions of items so this makes using the connector infeasible.

This posting sure seems to indicate that the connector used to be smart enough to use DDB indexes -- could this have gotten broken somewhere along the line?
http://hipsterdevblog.com/blog/2015/05/30/analysing-dynamodb-index-usage-in-hive-queries/

Related posts on AWS Forums:
https://forums.aws.amazon.com/thread.jspa?threadID=260026
https://forums.aws.amazon.com/thread.jspa?messageID=795525

Doesn't support map data type

Any timeline for support of map data types? This was asked previously but closed as it had queried support of boolean as well, which is now supported. However, the question remains for map support.

how to create emr-ddb.jar ??

seems the packaging type mentioned in parent pom.xml is "pom"
how do we create the jar file which will contains the modules ex. emr-ddb.jar which will contain the hadoop, hive and tools modules

Spark filter pushdown support

Hello,

I have been looking for a way to use the predicate filter pushdown for Spark to load data "filtered" by two timestamps. This will avoid to load the entire table into Spark. A typical use case of this feature is a batch analytics workload where data is processed from the day before in order to get aggregates.

I have been searching and I have been to look the method createDynamoDBSplit in the DynamoDBSplitGenerator class where the predicate pushdown is passed to each of the inputsplits. In this case the method just pass null to the filter.

One option would be to use the JobConf to store the predicate and build the filter in the generateSplits method of DynamoDBSplitGenerator. However I dot know if that would be best approach.

Any suggestions?

Thanks!

java.nio.file.NoSuchFileException: /mnt/var/lib/info/job-flow.json exception from emr-dynamodb-hadoop

Dear team,

Thanks for making such awesome library. I was able to setup to connect my spark process at local. While running unit testing again local dynamodb, I am always getting following exception from library. It didn't interrupt my test nor my code though. I am wondering what caused this exception. Do I have to add additional config file ?

[Stage 11:=====> (9 + 4) / 100]2017-10-18 23:27:19,791 WARN : org.apache.hadoop.dynamodb.util.ClusterTopologyNodeCapacityProvider - Exception when trying to determine instance types
java.nio.file.NoSuchFileException: /mnt/var/lib/info/job-flow.json
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
at java.nio.file.Files.newByteChannel(Files.java:361)
at java.nio.file.Files.newByteChannel(Files.java:407)
at java.nio.file.Files.readAllBytes(Files.java:3152)
at org.apache.hadoop.dynamodb.util.ClusterTopologyNodeCapacityProvider.readJobFlowJsonString(ClusterTopologyNodeCapacityProvider.java:105)
at org.apache.hadoop.dynamodb.util.ClusterTopologyNodeCapacityProvider.getCoreNodeMemoryMB(ClusterTopologyNodeCapacityProvider.java:44)
at org.apache.hadoop.dynamodb.util.TaskCalculator.getMaxMapTasks(TaskCalculator.java:54)
at org.apache.hadoop.dynamodb.DynamoDBUtil.calcMaxMapTasks(DynamoDBUtil.java:257)
at org.apache.hadoop.dynamodb.write.WriteIopsCalculator.calculateMaxMapTasks(WriteIopsCalculator.java:79)
at org.apache.hadoop.dynamodb.write.WriteIopsCalculator.(WriteIopsCalculator.java:64)
at org.apache.hadoop.dynamodb.write.AbstractDynamoDBRecordWriter.(AbstractDynamoDBRecordWriter.java:81)
at org.apache.hadoop.dynamodb.write.DefaultDynamoDBRecordWriter.(DefaultDynamoDBRecordWriter.java:27)
at org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat.getRecordWriter(DynamoDBOutputFormat.java:30)
at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1206)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1197)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Support for on-demand capacity

I've been trying to export DynamoDB from HiveSQL.

Using a simple Hive Script select * from tableName I encounter a read capacity error. The capacity calculation of the connector is made by using a percentage.

This script works great when the DynamoDB table is set on managed capacity mode. But when switched to on-demand error I encounter the Capacity error.

Could you enable a configuration setting that allows to use a table con on-demand capacity and that uses the maximum throughput the EMR Cluster will use?

Thanks you

DynamoDBStorageHandler should allow empty attributes in destination dynamoDB table.

Problem:
When there are empty columns in hive table, it will lead to exception when loading data to dynamoDB: "One or more parameter values were invalid: An AttributeValue may not contain an empty string".
I think DynamoDBStorageHandler should allow empty columns in hive table. I mean some non-key attributes can be ignored in dynamoDB table.

Suggest:
Ignore the attribute if dynamoDBAttributeValue is empty.

String dynamoDBAttributeValue = mapValueObjectInspector.getPrimitiveJavaObject(entry

Rate limiter returns batchsize of 1 after a segment completes

When scanning a table with the Spark connector, the rate limiter breaks after it finishes its first segment and only returns RequestLimits with a batch size of 1 even though there are plenty of tokens left in the rate limiter. I believe this happens because the estimated item size goes to infinity when it receives an empty batch (which happens when the segment completes). This means that when it tries to estimate the number of items to fetch that always come out as 0 (which is rounded up to 1).

The logs:
report: permitted=25.0, consumed=128.5, items=3326, avg from= 0.0 to 94.94936861094409
report: permitted=25.0, consumed=0.5, items=0, avg from= 94.94936861094409 to Infinity
report: permitted=25.0, consumed=0.5, items=1, avg from= Infinity to Infinity

Cannot get work with pyspark

Currently I have no luck how to get things work. My following code fails:

pyspark --jars /usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar
conf = {
    "dynamodb.servicename": "dynamodb",
    "dynamodb.input.tableName": "users",
    "dynamodb.endpoint": "https://dynamodb.eu-central-1.amazonaws.com",
    "dynamodb.regionid": "eu-central-1",
    "mapred.output.format.class": "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat",
    "mapred.input.format.class": "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat"
}
sc.hadoopRDD(
    inputFormatClass='org.apache.hadoop.dynamodb.read.DynamoDBInputFormat',
    keyClass='org.apache.hadoop.io.Text',
    valueClass='org.apache.hadoop.dynamodb.DynamoDBItemWritable',
    conf=conf
)

Shortly, I get following error:

Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.dynamodb.DynamoDBItemWritable

Full traceback you may see here.

I try do this in EMR, not locally.

Backup Verification Question

What's the recommended method for ensuring that DynamoDB backups to S3 produced by the Import/Export Tool are complete and accurate?

Should the EMR logs be sufficient to know with 100% metaphysical certitude that the DynamoDB backups are complete and accurate? Or is independent data verification necessary? If so, what level of verification/validation? Row level? Field level?

Structure streaming

Please if possible add support for structure streaming.
So that can be used with spark session and not only spark context.

Thank you

UPDATE/MERGE not supported?

When I try running a MERGE on external DynamoDB tables from within EMR, I get:

FAILED: SemanticException [Error 10294]: Attempt to do update or delete using transaction manager that does not support these operations.

This would be really neat to have working!

Support GSI as query condition in Hive

When we perform a query in Hive, like
Hive > SELECT * FROM ddb_table WHERE secondary_index="something";
It performs a scan() operation on the whole table, which lower the speed and make extra cost. My expected behavior is to perform query() operation on the table for GSI.

In getQueryFilter(), the only thing to determine whether it should be a query solely depend on the result from client.describeTable(conf.get(DynamoDBConstants.TABLE_NAME)).getKeySchema(). The getKeySchema() call only return the primary key and sort key, without the knowledge of GSI.

I think we take advantage of the GSI information, before decide whether a scan or query should be applied.

One blocker I can think about is the attribute projection. If it is an "all projection", there should be no problem.

Any suggestion?

Add List support

Requesting DynamoDB List support to be converted to a hive array (ARRAY<data_type>).

Shutting down record reader, no segments remaining when used with pyspark dataframe

Hi,

I am using emr-dynamodb-connector in my pyspark application and running into some weird behavior (happens consistently). Here are my steps,

rdd = session.sparkContext.hadoopRDD(inputFormatClass='org.apache.hadoop.dynamodb.read.DynamoDBInputFormat',
	     keyClass='org.apache.hadoop.io.Text',
	     valueClass='org.apache.hadoop.dynamodb.DynamoDBItemWritable',
	     conf=conf)

//conf is standard with dynamodb related conf
//when I do below, I get exception
df= rdd.map(lambda x: Row(**x)).toDF()

Exception:

2019-05-02 00:25:29 INFO  AbstractReadManager:173 - Shutting down record reader, no segments remaining.
2019-05-02 00:25:29 ERROR Executor:91 - Exception in task 0.0 in stage 2.0 (TID 2)
java.net.SocketException: Connection reset
        at java.net.SocketInputStream.read(SocketInputStream.java:210)
        at java.net.SocketInputStream.read(SocketInputStream.java:141)
        at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
        at java.io.DataInputStream.readInt(DataInputStream.java:387)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:460)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:453)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:284)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
        at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:152)
        at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:152)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Reason why I think it might be an issue with connector and not pyspark:

It is happening right after AbstractReadManager:173 - Shutting down record reader, no segments remaining.
and this is happening consistently

I have separately tested for pyspark where list of map is converted to DF and I don't see any issue there

Problem with connection to DynamoDB

I have a problem while trying to use DynamoDBStorageHandler to collect partial data from DynamoDB.

I am using the newest EMR version with hive, EMR roles have full access to DynamoDb in IAM.
EMR is in the same region as DynamoDb table. NetworkACL allows all the necessary traffic. Security groups are set to public. I also pass dynamodb.endpoint, dynamodb.region, fs.s3a.access.key, fs.s3a.secret.key, dynamodb.regionid to hive-site.xml and core-site.xml.

I checked documentation many times, tried numerous configurations and despite all of my effort I still have the same result every time:

FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. java.lang.RuntimeException: Could not lookup table test_input in DynamoDB.

But this table exists. When I run aws(cli) configure on EMR master and pass credentials I can list it.
Also when I run Wireshark and use Hive with DynamoDBStorageHandler there is no connection to any DynamoDB endpoints.

Do you have any idea why it is not working? Some bug? Lack of documentaiton?
Bellow is the code that I'm running:

CREATE EXTERNAL TABLE raw_data (id bigint)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES (
"dynamodb.table.name" = "test_input",
"dynamodb.region"="eu-central-1",
"dynamodb.column.mapping" = "id:id"
);

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.