apache / hudi
Upserts, Deletes And Incremental Processing on Big Data.
Home Page: https://hudi.apache.org/
License: Apache License 2.0
Probably HoodieStorage is a better name. As detailed in the intro, we will have two storage types:
Only supports RO tables
Supports both RO & Realtime tables
This is tricky, as log files will have multiple commits; it requires rewriting the avro log file.
HoodieInputFormat should pick up the start_ts from the config and merge the _hoodie_commit_time predicate with the predicates already set. This keeps the processing SQL exactly the same for backfill and incremental processing.
SET hoodie.<table_name>.consume.mode=LATEST
Select * from table1; // this would run against all the data in table1
SET hoodie.<table_name>.consume.mode=INCREMENTAL
SET hoodie.<table_name>.start.timestamp=20170101132319
Select * from table1; // this would run against only data inserted after 2017/01/01 13:23:19
Today we have to do
SET hoodie.<table_name>.consume.mode=INCREMENTAL
Select * from table1 where _hoodie_commit_ts > '20170101132319';
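A minimal sketch of how HoodieInputFormat could honor the start timestamp from the config (the property keys mirror the SET commands above; the tableName variable and the FSUtils.getCommitTime helper are assumptions, relying on the fact that Hoodie data file names already end with the commit time, e.g. ..._20170104223220.parquet):

// Hypothetical filtering inside HoodieInputFormat.listStatus(JobConf job)
FileStatus[] allFiles = super.listStatus(job);
String mode = job.get("hoodie." + tableName + ".consume.mode", "LATEST");
if (!"INCREMENTAL".equals(mode)) {
  return allFiles;
}
String startTs = job.get("hoodie." + tableName + ".start.timestamp");
List<FileStatus> filtered = new ArrayList<>();
for (FileStatus file : allFiles) {
  // commit time is the last token of the data file name
  String commitTime = FSUtils.getCommitTime(file.getPath().getName()); // assumed helper
  if (commitTime.compareTo(startTs) > 0) {
    filtered.add(file);
  }
}
return filtered.toArray(new FileStatus[0]);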
Hoodie tables should support snapshotting for backup and recovery
Right now, we don't get benefits like small-insert handling because of this.
Commits are going to be more frequent - these should be appended using the recoverable avro file reader/writer.
A compaction commit should be a special commit whose file contains details about the compaction
HoodieTableMetadata to support an API which includes compaction commits
WriteStatus and HoodieCommitMetadata need a cleanup.
With the implementation of merge-on-read underway,
this will be a good time, as a prerequisite, to overhaul hoodie-commons and work on the right abstractions.
Also introduce Java 8 features into the code to make it more succinct.
Approach used in prestodb/presto#7002
Actual changes will be in Apache Spark; this is for tracking.
Specifically, the following tests need to be executed across Hive/Presto/Spark 2.x
cc @prazanna, will tackle the tests here as a separate PR.
This would be super useful in troubleshooting data issues
This approach is cleaner to integrate with Presto, and we can follow a similar approach for Spark as well.
Commit needs to be done in HoodieClient and in HoodieCompacter. This needs to be refactored into a common place.
Following tools to be moved over
Add hoodie-hive module which can register hoodie datasets to hive metastore
I believe we have figured out the root cause
Step 1:
Executor starts writing 11337c2a-9acd-4c07-aa0e-de5e78e9f951_393_20170104223220.parquet, but fails (reason unknown)
17/01/04 22:51:39 INFO executor.Executor: Running task 393.0 in stage 34.0 (TID 23337)
17/01/04 22:51:39 INFO io.HoodieUpdateHandle: Merging new data into oldPath /app/hoodie/schemaless/trifle-client_bills-tcb005/2016/11/25/11337c2a-9acd-4c07-aa0e-de5e78e9f951_1232_20170104210747.parquet, as newPath /app/hoodie/schemaless/trifle-client_bills-tcb005/2016/11/25/11337c2a-9acd-4c07-aa0e-de5e78e9f951_393_20170104223220.parquet
17/01/04 22:52:16 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
17/01/04 22:52:16 INFO storage.DiskBlockManager: Shutdown hook called
Step 2:
Driver realizes this
1253563 [sparkDriver-akka.actor.default-dispatcher-37] ERROR org.apache.spark.scheduler.cluster.YarnClusterScheduler - Lost executor 466 on hadoopworker674-sjc1.prod.uber.internal: remote Rpc client disassociated
1253564 [sparkDriver-akka.actor.default-dispatcher-37] WARN org.apache.spark.scheduler.TaskSetManager - Lost task 393.0 in stage 34.0 (TID 23337, hadoopworker674-sjc1.prod.uber.internal): ExecutorLostFailure (executor 466 lost)
and schedules a retry of the task
1253582 [sparkDriver-akka.actor.default-dispatcher-37] INFO org.apache.spark.scheduler.TaskSetManager - Starting task 393.1 in stage 34.0 (TID 23543, hadoopworker523-sjc1.prod.uber.internal, PROCESS_LOCAL, 1901 bytes)
Step 3:
The retry failed, possibly because the datanode was down or restarted
17/01/04 22:56:48 ERROR io.HoodieUpdateHandle: Error in update task at commit 20170104223220
org.apache.spark.shuffle.FetchFailedException: Failed to connect to hadoopworker674-sjc1.prod.uber.internal/10.11.37.11:7337
...
at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
at com.uber.hoodie.io.HoodieUpdateHandle.init(HoodieUpdateHandle.java:57)
at com.uber.hoodie.io.HoodieUpdateHandle.<init>(HoodieUpdateHandle.java:48)
at com.uber.hoodie.table.HoodieCopyOnWriteTable.handleUpdate(HoodieCopyOnWriteTable.java:375)
at com.uber.hoodie.table.HoodieCopyOnWriteTable.handleUpsertPartition(HoodieCopyOnWriteTable.java:425)
at com.uber.hoodie.HoodieWriteClient$5.call(HoodieWriteClient.java:181)
at com.uber.hoodie.HoodieWriteClient$5.call(HoodieWriteClient.java:177)
Because of this error, HoodieUpdateHandle is not initialized properly
https://code.uberinternal.com/diffusion/DAHOOD/browse/master/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieUpdateHandle.java;1006e4b0b115ddfe246fe3f43024a9213a003637$93
This error is effectively swallowed and the task succeeds
17/01/04 22:56:48 ERROR table.HoodieCopyOnWriteTable: Error in finding the old file path at commit 20170104223220
17/01/04 22:56:48 INFO table.HoodieCopyOnWriteTable: Upsert Handle has partition path as null null, WriteStatus {fileId=null, globalError='org.apache.spark.shuffle.FetchFailedException: Failed to connect to hadoopworker674-sjc1.prod.uber.internal/10.11.37.11:7337', hasErrors='false', errorCount='0', errorPct='NaN'}
17/01/04 22:56:48 INFO storage.MemoryStore: ensureFreeSpace(3142) called with curMem=2185001, maxMem=1111369973
17/01/04 22:56:48 INFO storage.MemoryStore: Block rdd_53_393 stored as bytes in memory (estimated size 3.1 KB, free 1057.8 MB)
17/01/04 22:56:48 INFO executor.Executor: Finished task 393.1 in stage 34.0 (TID 23543). 1807 bytes result sent to driver
Step 4:
When the partition path is null on the write stat, it is omitted from the commit json
https://code.uberinternal.com/diffusion/DAHOOD/browse/master/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java;1006e4b0b115ddfe246fe3f43024a9213a003637$55
Hence we do not see that fileId in the commit metadata; HoodieUpdateHandle also does not clean up the partially written file from Step 1, and subsequent bloom filter index checks fail because of this file.
Actual problem:
The task succeeds even when HoodieUpdateHandle fails in init(); if init() fails, it should surface an UpsertException.
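A minimal sketch of the fix (the exception type name and the lookup helper below are assumptions; the surrounding method shape is only approximated):

// Inside HoodieUpdateHandle.init() - sketch, not the actual code
try {
  // locate the old file path and open it for merging
  oldFilePath = lookupOldFilePath(fileId, commitTime); // assumed lookup
} catch (Exception e) {
  writeStatus.setGlobalError(e);
  // Rethrow instead of only recording the error, so the Spark task fails and gets retried,
  // rather than finishing "successfully" with a WriteStatus that has a null fileId/partition path.
  throw new HoodieUpsertException("Failed to initialize HoodieUpdateHandle for commit " + commitTime, e);
}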
Some of this discussion happened already in #64
At a high level, we want to decouple the hardcoded assumptions about Parquet, sequence files, etc. and introduce a HoodieFile interface
then there could be subclasses for
HoodieDataFile (for Parquet/ORC storing columnar data)
HoodieLogFile (for sequence/avro files storing commits/delta records)
and so forth
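A rough shape of that abstraction (the method names beyond the class names mentioned above are placeholders, just a sketch):

public interface HoodieFile {
  Path getPath();
  String getFileId();
  String getCommitTime();
  long getFileSize();
}

// Columnar base file (Parquet/ORC) holding the compacted view of records
public class HoodieDataFile implements HoodieFile { /* ... */ }

// Row-oriented log file (sequence/avro) holding commits & delta records
public class HoodieLogFile implements HoodieFile { /* ... */ }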
This is to ensure that the driver does not do anything that breaks the idempotency of the upsert or insert DAG. Re-computation can always happen if the persisted RDD[WriteStatus] is lost (or some partitions of the RDD are lost).
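For illustration, a sketch of keeping the write-status RDD persisted so driver-side actions stay idempotent (the write-client calls are shown only schematically):

JavaRDD<WriteStatus> writeStatuses = client.upsert(records, commitTime);
// Persist before any action: counting errors or building commit metadata then reuses the
// cached results, and any lost partitions are simply recomputed from the same DAG.
writeStatuses.persist(StorageLevel.MEMORY_AND_DISK_SER());
long errorCount = writeStatuses.filter(WriteStatus::hasErrors).count();
if (errorCount == 0) {
  client.commit(commitTime, writeStatuses);
}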
This is useful in repairs of a hoodie dataset, where the row_key and partition_field mapping has changed because of an error.
FYI - @vinothchandar
Writers to the log can fail in the middle. RollingLogWriter should be able to:
detect this
roll over to a new log so that new records are not appended to corrupted log files
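One possible shape (purely a sketch; the corruption check and file-naming helpers are assumptions):

// Hypothetical append path inside RollingLogWriter
if (!isTailReadable(fs, currentLogPath)) {               // assumed check, e.g. scan the last avro block/sync marker
  // Never append to a corrupted log; start a new log version and leave the old file for repair tooling
  currentLogPath = nextLogVersionPath(currentLogPath);    // assumed naming helper
  output = fs.create(currentLogPath, false);
} else {
  output = fs.append(currentLogPath);
}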
Would be nice to have something native that understands queries on Hoodie tables
Testing a new client for task tracking
This will help with triaging and identifying issues with a hoodie dataset. This becomes particularly useful with merge-on-read implementations.
This would be helpful to deal with issues where the query fails with a FileNotFoundException.
This should be, and is, a rarity, but it's good to have this tool in the bag.
Getting the following when I try to follow the README for testing:
Caused by: java.lang.ClassNotFoundException: com.uber.hoodie.exception.InvalidDatasetException
  at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  ... 27 more
FYI @prazanna
This houses the merge-on-read record reader
We have some naive, first-take code sitting around from the POC on Merge on Read. Port the code over to the model refactor branch, merge it all into master, and continue refining on master.
As we have discussed a few times, it would be nice to have rollup-merge functionality in Hoodie, i.e. Hoodie should be able to apply the following functions to individual columns of the two or more rows being merged (see the sketch after this list):
Max
Min
Avg
Sum
Var
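A sketch of what applying one of these during a merge could look like (the enum and method below are placeholders, not existing Hoodie classes; AVG and VAR would additionally need auxiliary columns carrying running counts/moments):

enum RollupFn { MAX, MIN, AVG, SUM, VAR }

// Combine one numeric column of the incoming row into the stored row
static double rollup(RollupFn fn, double stored, double incoming) {
  switch (fn) {
    case MAX: return Math.max(stored, incoming);
    case MIN: return Math.min(stored, incoming);
    case SUM: return stored + incoming;
    default:  throw new UnsupportedOperationException(fn + " needs running count/moment columns");
  }
}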
When joins need not be spilled to disk, we can incrementally pull, transform, and upsert in one DAG.
This utility will showcase that.
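Roughly, the single-DAG flow could look like this (a sketch; the incremental reader and transform are stand-ins, and the write-client calls are shown only schematically):

JavaRDD<HoodieRecord> changed = pullIncremental(jsc, upstreamTablePath, lastCommitTime); // assumed reader
JavaRDD<HoodieRecord> transformed = changed.map(rec -> transform(rec));                  // user-defined transform, no shuffle spill
String commitTime = client.startCommit();
JavaRDD<WriteStatus> statuses = client.upsert(transformed, commitTime);
client.commit(commitTime, statuses);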
Technically, two things can be cached:
Caching the mapping of uuid <=> fileId (higher cost & longer recovery time & high performance)
Caching the mapping of fileId <=> bloom Filters (lower cost & low recovery time & better perf than HoodieBloomIndex)
What we need is something similar to Spark Streaming's built-in mapWithState HashMap.
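At its simplest, option 2 could be an executor-local cache keyed by fileId, rebuilt lazily when an entry is missing (illustrative only; the bloom-filter loader is an assumption):

// Executor-local cache of fileId -> bloom filter; a lost entry is just re-read from the data file footer
static final ConcurrentHashMap<String, BloomFilter> BLOOM_CACHE = new ConcurrentHashMap<>();

static BloomFilter bloomFor(String fileId, Path latestDataFile) {
  return BLOOM_CACHE.computeIfAbsent(fileId, id -> readBloomFilterFromFooter(latestDataFile)); // assumed loader
}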
Now that we have the log format v2 implemented and stable, good time to tap into it to implement incremental pull on MOR datasets.
Algorithm:
The simplified problem is: given a begin_instant_time and end_instant_time (part of the active timeline), pull data that was written between them.
// instants from earliest to latest
instants_to_pull = commitsTimeline.findInstantsInRange(begin, end)
fileIDToSliceMap = {}
for (instant : instants_to_pull) {
  for (fileId : instant.affectedFileIds()) {
    // fileSlice = latest file slice for this fileId as of `instant`
    fileIDToSliceMap.put(fileId, fileSlice)
  }
}
// build compacted scan for set of fileIDs, filtering for instant time.
// Optimization : if instant time is only in the log, then seek ahead efficiently.
Considerations:
This involves
Use averageBytesPerRecord() logic to configure the number of entries in BloomFilter.
If the number of entries in bloom filters / number of row keys is too small, then bloom filter becomes inefficient with a lot of false positives.
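For example, the entry count can be derived from the target data file size and the observed record size (a sketch; the variable names and constructor shown are placeholders):

// Size the bloom filter for roughly the number of records a full data file can hold
long avgBytesPerRecord = Math.max(1, averageBytesPerRecord());   // e.g. from the previous commit's stats
long expectedEntries = maxParquetFileSizeBytes / avgBytesPerRecord;
BloomFilter filter = new BloomFilter((int) expectedEntries, targetFalsePositiveRate); // illustrative constructor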
The basic idea here is to use the commit metadata to be self-contained about managing the storage.
https://travis-ci.org/uber/hoodie/builds/187340755
It's pulling in 0.2.8-SNAPSHOT since we have 0.2.7 in Maven, I suppose. Need to debug more.
Add a module with the necessary InputFormat to be able to read datasets ingested using the Hoodie library
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
  at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
  at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
  at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
  at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
  at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
  at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:512)
  at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:429)
  at org.apache.spark.storage.BlockManager.get(BlockManager.scala:618)
  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
  at org.apache.spark.scheduler.Task.run(Task.scala:88)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
in
  org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:47)
  com.uber.hoodie.HoodieWriteClient.commit(HoodieWriteClient.java:313)
  com.uber.komondor.hoodie.HoodieSchemalessBackfillJob$.writeRDDtoFile(HoodieSchemalessBackfillJob.scala:102)
  com.uber.komondor.hoodie.HoodieSchemalessBackfillJob$.writeRDDtoFile(HoodieSchemalessBackfillJob.scala:57)
  com.uber.komondor.hoodie.HoodieSchemalessBackfillJob$.backfill(HoodieSchemalessBackfillJob.scala:133)
  com.uber.komondor.hoodie.HoodieSchemalessBackfillJob$.main(HoodieSchemalessBackfillJob.scala:241)
  com.uber.komondor.hoodie.HoodieSchemalessBackfillJob.main(HoodieSchemalessBackfillJob.scala)
  sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  java.lang.reflect.Method.invoke(Method.java:498)
  org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:525)
We need metadata stored at the HDFS partitions to link back to the base path that has the hoodie metadata.
We cannot use the table location, as it would not be applicable to all query engines, and there is no enforcement of what the table location should be as long as the partition paths point to the right places.
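One lightweight option: drop a tiny marker file into each partition when it is first written, recording how to get back to the base path (the file name and keys below are assumptions, not an existing format):

// Write per-partition metadata linking the partition back to the hoodie base path
Properties props = new Properties();
props.setProperty("commitTime", commitTime);
props.setProperty("partitionDepth", String.valueOf(partitionDepth)); // number of levels up to the base path
Path metaPath = new Path(partitionPath, ".hoodie_partition_metadata");
try (FSDataOutputStream os = fs.create(metaPath, true)) {
  props.store(os, "partition metadata");
}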
Failure and retry - HDFS throws AlreadyBeingCreatedException on fs.append because the lease has not expired or close was not called. Roll over the log if this happens?
Need to think about this.
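A possible handling sketch (the rollover helper is an assumption):

FSDataOutputStream out;
try {
  out = fs.append(logPath);
} catch (RemoteException re) {
  if (AlreadyBeingCreatedException.class.getName().equals(re.getClassName())) {
    // The old writer died without close(), so the lease is still held; instead of waiting
    // for lease recovery, roll over and keep writing to a fresh log file.
    logPath = rollOverToNewLogFile(logPath); // assumed helper
    out = fs.create(logPath, false);
  } else {
    throw re;
  }
}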