apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.

Home Page: https://hudi.apache.org/

License: Apache License 2.0

Shell 0.73% Java 78.03% Scala 19.92% Dockerfile 0.14% Python 0.15% Mustache 0.01% ANTLR 1.00% Thrift 0.01%
hudi apachehudi datalake bigdata apachespark incremental-processing stream-processing data-integration apacheflink

hudi's Issues

Rethink & Refactor HoodieTable abstraction

HoodieStorage is probably a better name. As detailed in the intro, we will have two storage types:

  • copy_on_write

Supports only the read-optimized (RO) table

  • merge_on_read

Supports both RO & real-time tables

Incremental pull should set the _hoodie_commit_time automatically from configs in HoodieInputFormat

HoodieInputFormat should pick up the start_ts from the config and merge the _hoodie_commit_time predicate with the predicates already set. This keeps the processing SQL exactly the same for backfill and incremental processing.

SET hoodie.<table_name>.consume.mode=LATEST
Select * from table1; // this would run against all the data in table1
SET hoodie.<table_name>.consume.mode=INCREMENTAL
SET hoodie.<table_name>.start.timestamp=20170101132319
Select * from table1; // this would run against only data inserted after 2017/01/01 13:23:19

Today we have to do

SET hoodie.<table_name>.consume.mode=INCREMENTAL
Select * from table1 where _hoodie_commit_ts > '20170101132319'; 
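A minimal sketch of the predicate merging described above, assuming a hypothetical helper inside HoodieInputFormat; the class and method names here are illustrative, not Hudi's actual API:

```java
// Illustrative sketch only: buildIncrementalFilter is a hypothetical
// helper, not part of the real HoodieInputFormat API.
class IncrementalFilterSketch {

    static final String COMMIT_TIME_FIELD = "_hoodie_commit_time";

    // Merge the start_ts read from hoodie.<table_name>.start.timestamp
    // with whatever predicate the query already pushed down.
    static String buildIncrementalFilter(String existingPredicate, String startTs) {
        String tsPredicate = COMMIT_TIME_FIELD + " > '" + startTs + "'";
        if (existingPredicate == null || existingPredicate.isEmpty()) {
            return tsPredicate;
        }
        return "(" + existingPredicate + ") AND " + tsPredicate;
    }
}
```

With something like this in place, the user-facing SQL stays identical in both modes; only the session configs change.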

Implement Savepoints

Hoodie tables should support snapshotting for backup and recovery

  • Provide a way to symlink the data files of a commit so the Hoodie cleaner does not delete them
  • Provide API to roll back to a specific snapshotted commit
  • Think about how it affects incremental pull
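A toy sketch of the first two bullets, under the assumption that the cleaner simply consults the set of savepointed commits before deleting; all names here are hypothetical:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: savepointed commits are pinned so the cleaner
// skips them; rollback is allowed only to a savepointed commit.
class SavepointSketch {

    private final Set<String> savepointedCommits = new HashSet<>();

    // Mark a commit's data files as pinned (e.g. via symlinks).
    void savepoint(String commitTime) {
        savepointedCommits.add(commitTime);
    }

    // The cleaner calls this before deleting a commit's files.
    boolean canClean(String commitTime) {
        return !savepointedCommits.contains(commitTime);
    }

    // Rolling back to a commit is only safe if it was savepointed.
    boolean canRollbackTo(String commitTime) {
        return savepointedCommits.contains(commitTime);
    }
}
```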

Commit Storage changes for merge-on-read

  • Commits are going to be more frequent, so they should be appended via the recoverable Avro file reader/writer
  • The compaction commit should be a special commit, with file details containing details about the compaction
  • HoodieTableMetadata should support an API that includes compaction commits

Cleanup WriteStatus

WriteStatus and HoodieCommitMetadata need a cleanup.

  • Do we need all the records to be returned in the status?
  • Make it immutable and use builder to build the object
  • HoodieCommit / HoodieCommitMetadata - it is confusing which holds what
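A sketch of the immutable-plus-builder shape proposed in the second bullet; the fields shown are a small illustrative subset, not the full WriteStatus surface:

```java
// Sketch only: an immutable write status assembled via a builder.
class ImmutableWriteStatus {

    private final String fileId;
    private final String partitionPath;
    private final long errorCount;

    private ImmutableWriteStatus(Builder b) {
        this.fileId = b.fileId;
        this.partitionPath = b.partitionPath;
        this.errorCount = b.errorCount;
    }

    String getFileId() { return fileId; }
    String getPartitionPath() { return partitionPath; }
    long getErrorCount() { return errorCount; }

    static class Builder {
        private String fileId;
        private String partitionPath;
        private long errorCount;

        Builder fileId(String v) { this.fileId = v; return this; }
        Builder partitionPath(String v) { this.partitionPath = v; return this; }
        Builder errorCount(long v) { this.errorCount = v; return this; }

        ImmutableWriteStatus build() { return new ImmutableWriteStatus(this); }
    }
}
```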

Hoodie Upsert fails with InvalidParquetMetadata because of a partially written parquet file

I believe we have figured out the root cause.

Step 1:
The executor starts writing 11337c2a-9acd-4c07-aa0e-de5e78e9f951_393_20170104223220.parquet, but fails (reason unknown)

17/01/04 22:51:39 INFO executor.Executor: Running task 393.0 in stage 34.0 (TID 23337)
17/01/04 22:51:39 INFO io.HoodieUpdateHandle: Merging new data into oldPath /app/hoodie/schemaless/trifle-client_bills-tcb005/2016/11/25/11337c2a-9acd-4c07-aa0e-de5e78e9f951_1232_20170104210747.parquet, as newPath /app/hoodie/schemaless/trifle-client_bills-tcb005/2016/11/25/11337c2a-9acd-4c07-aa0e-de5e78e9f951_393_20170104223220.parquet
17/01/04 22:52:16 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
17/01/04 22:52:16 INFO storage.DiskBlockManager: Shutdown hook called

Step 2:
Driver realizes this

1253563 [sparkDriver-akka.actor.default-dispatcher-37] ERROR org.apache.spark.scheduler.cluster.YarnClusterScheduler - Lost executor 466 on hadoopworker674-sjc1.prod.uber.internal: remote Rpc client disassociated
1253564 [sparkDriver-akka.actor.default-dispatcher-37] WARN org.apache.spark.scheduler.TaskSetManager - Lost task 393.0 in stage 34.0 (TID 23337, hadoopworker674-sjc1.prod.uber.internal): ExecutorLostFailure (executor 466 lost)

and schedules a retry of the task

1253582 [sparkDriver-akka.actor.default-dispatcher-37] INFO  org.apache.spark.scheduler.TaskSetManager  - Starting task 393.1 in stage 34.0 (TID 23543, hadoopworker523-sjc1.prod.uber.internal, PROCESS_LOCAL, 1901 bytes)

Step 3:
The retry fails, possibly because the datanode is down or was restarted

17/01/04 22:56:48 ERROR io.HoodieUpdateHandle: Error in update task at commit 20170104223220
org.apache.spark.shuffle.FetchFailedException: Failed to connect to hadoopworker674-sjc1.prod.uber.internal/10.11.37.11:7337
...
	at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
	at com.uber.hoodie.io.HoodieUpdateHandle.init(HoodieUpdateHandle.java:57)
	at com.uber.hoodie.io.HoodieUpdateHandle.<init>(HoodieUpdateHandle.java:48)
	at com.uber.hoodie.table.HoodieCopyOnWriteTable.handleUpdate(HoodieCopyOnWriteTable.java:375)
	at com.uber.hoodie.table.HoodieCopyOnWriteTable.handleUpsertPartition(HoodieCopyOnWriteTable.java:425)
	at com.uber.hoodie.HoodieWriteClient$5.call(HoodieWriteClient.java:181)
	at com.uber.hoodie.HoodieWriteClient$5.call(HoodieWriteClient.java:177)

Because of this error, HoodieUpdateHandle is not initialized properly
https://code.uberinternal.com/diffusion/DAHOOD/browse/master/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieUpdateHandle.java;1006e4b0b115ddfe246fe3f43024a9213a003637$93

This error is effectively swallowed, and the task succeeds

17/01/04 22:56:48 ERROR table.HoodieCopyOnWriteTable: Error in finding the old file path at commit 20170104223220
17/01/04 22:56:48 INFO table.HoodieCopyOnWriteTable: Upsert Handle has partition path as null null, WriteStatus {fileId=null, globalError='org.apache.spark.shuffle.FetchFailedException: Failed to connect to hadoopworker674-sjc1.prod.uber.internal/10.11.37.11:7337', hasErrors='false', errorCount='0', errorPct='NaN'}
17/01/04 22:56:48 INFO storage.MemoryStore: ensureFreeSpace(3142) called with curMem=2185001, maxMem=1111369973
17/01/04 22:56:48 INFO storage.MemoryStore: Block rdd_53_393 stored as bytes in memory (estimated size 3.1 KB, free 1057.8 MB)
17/01/04 22:56:48 INFO executor.Executor: Finished task 393.1 in stage 34.0 (TID 23543). 1807 bytes result sent to driver

Step 4:
When the partition path is null on the write stat, it is omitted from the commit json
https://code.uberinternal.com/diffusion/DAHOOD/browse/master/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java;1006e4b0b115ddfe246fe3f43024a9213a003637$55

Hence we do not see that fileID in the commit metadata. HoodieUpdateHandle also does not clean up the partially written file from Step 1, and subsequent bloom filter index checks fail because of this file.

Actual problem:
The task succeeds even when HoodieUpdateHandle fails in init(); if init() fails, it should surface an UpsertException instead.
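A minimal sketch of that fix, with hypothetical names: failures during init() are rethrown so Spark fails and retries the task instead of marking it successful:

```java
// Sketch only: UpsertExceptionSketch stands in for the proposed
// UpsertException; the real handle would wrap its file-opening logic.
class FailFastHandleSketch {

    static class UpsertExceptionSketch extends RuntimeException {
        UpsertExceptionSketch(String msg, Throwable cause) { super(msg, cause); }
    }

    // Wrap initialization; any failure is surfaced instead of swallowed.
    static void init(Runnable openOldFile) {
        try {
            openOldFile.run();
        } catch (RuntimeException e) {
            throw new UpsertExceptionSketch("Failed to initialize update handle", e);
        }
    }
}
```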

Introduce neat abstractions for HoodieFile

Some of this discussion happened already in #64

At a high level, we want to decouple the hardcoded assumptions about Parquet, sequence files, etc. and introduce a HoodieFile interface

There could then be subclasses such as
HoodieDataFile (for Parquet/ORC storing columnar data)
HoodieLogFile (for sequence/avro files storing commits/delta records)

and so forth
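One possible shape for this hierarchy, sketched with hypothetical names and a deliberately tiny interface:

```java
// Sketch only: the real interface would also cover paths, sizes, etc.
interface HoodieFileSketch {
    String getFileId();
    String getCommitTime();
}

// Columnar base file (Parquet/ORC).
class HoodieDataFileSketch implements HoodieFileSketch {
    private final String fileId;
    private final String commitTime;
    HoodieDataFileSketch(String fileId, String commitTime) {
        this.fileId = fileId;
        this.commitTime = commitTime;
    }
    public String getFileId() { return fileId; }
    public String getCommitTime() { return commitTime; }
}

// Log file holding commits/delta records (sequence/Avro).
class HoodieLogFileSketch implements HoodieFileSketch {
    private final String fileId;
    private final String commitTime;
    HoodieLogFileSketch(String fileId, String commitTime) {
        this.fileId = fileId;
        this.commitTime = commitTime;
    }
    public String getFileId() { return fileId; }
    public String getCommitTime() { return commitTime; }
}
```

Callers (compaction, cleaning, input formats) would then program against the interface rather than concrete Parquet/sequence-file types.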

Test issue

Testing a new client for task tracking

Hoodie-hadoop-mr jar is no longer a fat jar bundling common classes

Getting the following when I try to follow the README for testing:

Caused by: java.lang.ClassNotFoundException: com.uber.hoodie.exception.InvalidDatasetException
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 27 more

FYI @prazanna

Add first class support for Rollups in Hoodie

As we have discussed a few times, it would be nice to have the rollup-merge functionality in Hoodie, i.e. Hoodie should be able to apply the following functions to individual columns in the 2 or more rows being merged:

Max
Min
Avg
Sum
Var
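A sketch of what per-column rollup merging could look like (Var is omitted for brevity; it would follow the same pattern). Nothing here is existing Hoodie API:

```java
import java.util.List;

// Sketch only: reduce the values of one column across the rows being
// merged, using the aggregate configured for that column.
class RollupSketch {

    enum Agg { MAX, MIN, SUM, AVG }

    static double merge(Agg agg, List<Double> columnValues) {
        switch (agg) {
            case MAX: return columnValues.stream().mapToDouble(Double::doubleValue).max().orElse(Double.NaN);
            case MIN: return columnValues.stream().mapToDouble(Double::doubleValue).min().orElse(Double.NaN);
            case SUM: return columnValues.stream().mapToDouble(Double::doubleValue).sum();
            case AVG: return columnValues.stream().mapToDouble(Double::doubleValue).average().orElse(Double.NaN);
            default:  throw new IllegalArgumentException("Unsupported aggregate: " + agg);
        }
    }
}
```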

Implement a cached version of Bloom Index

Technically, two things can be cached:

Caching the mapping of uuid <=> fileId (higher cost & longer recovery time & high performance)
Caching the mapping of fileId <=> bloom Filters (lower cost & low recovery time & better perf than HoodieBloomIndex)

What we need is something similar to Spark Streaming's built-in mapWithState HashMap.
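A minimal sketch of the second option (fileId => bloom filter), with a generic stand-in for the filter type; a production version would need eviction and recovery, which this toy cache ignores:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Sketch only: caches fileId -> bloom filter so index lookups avoid
// re-reading parquet footers on every batch.
class BloomCacheSketch<F> {

    private final Map<String, F> cache = new HashMap<>();
    private final Function<String, F> loader; // e.g. reads the filter from the file footer
    private int misses = 0;

    BloomCacheSketch(Function<String, F> loader) {
        this.loader = loader;
    }

    F get(String fileId) {
        return cache.computeIfAbsent(fileId, id -> {
            misses++;
            return loader.apply(id);
        });
    }

    int getMisses() { return misses; }
}
```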

Implement incremental pull with merge-on-read

Now that we have the log format v2 implemented and stable, good time to tap into it to implement incremental pull on MOR datasets.

Algorithm :

The simplified problem is: given a begin_instant_time and an end_instant_time (part of the active timeline), pull the data that was written between them.

// instants from earliest to latest
instants_to_pull =  commitsTimeline.findInstantsInRange(begin, end)

fileIDToSliceMap = {}
for (instant : instants_to_pull) {
    for (fileId : instant.affectedFileIds()) {
        // remember the file slice that holds this instant's writes
        fileIDToSliceMap.put(fileId, instant.fileSliceFor(fileId))
    }
}

// build a compacted scan for the set of fileIds, filtering on instant time.
// Optimization: if the instant time is only in the log, seek ahead efficiently.

Considerations :

  • How is the _commit_time metadata preserved as data moves from log to parquet during compaction? #376
  • Should we support two modes - a de-duped log mode & a raw mode?
    A: For COW, incremental pull never returns duplicates. Probably good to keep the same semantics.
  • Need to implement this seamlessly via two entry points: the Hive table & natively in the data source
    A: It probably makes sense to focus on an InputFormat-based impl & wrap that using HiveContext or similar inside the datasource. Need to ensure minimally disruptive configs are used, e.g. register using a HoodieSerDe rather than the Parquet SerDe?

Hoodie dataset not queryable because of invalid parquet files. All invalid parquet files are 4 bytes in length.

Root cause:

  • The 20170110210127 commit succeeded with all files having valid content, then was archived and cleaned.
  • Komondor gets the RDD[WriteStatus] and calls count() on this RDD to update the number of bad records etc.
  • Hoodie persists the RDD to avoid recomputation.
  • Because of DataNode restarts, some of the persisted data is not available, and Spark re-executes the upsert DAG for the missing partitions.
  • This kicks off the DAG again with the same commit time; this DAG tried 3 times to re-compute and failed (again, most likely because of datanodes restarting).
  • We delete the data file path if it already exists, to account for partial failures in the update task, so a bunch of data files are deleted and recreated.
  • The files that were open when the task failed were all 4-byte files (just the parquet magic header).

Resolution:

  • We should not auto-commit by default. Commit should be called after all the processing, publishing the data files atomically.
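A toy illustration of that resolution, with made-up names: files written under a commit stay invisible until a single explicit commit() publishes them, so a re-executed DAG cannot clobber already-published data:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: two-phase publish. Writes accumulate as inflight and
// become visible only when commit() runs, after all RDD processing.
class ExplicitCommitSketch {

    private final List<String> inflightFiles = new ArrayList<>();
    private final List<String> committedFiles = new ArrayList<>();

    void write(String file) {
        inflightFiles.add(file);
    }

    // Called exactly once, after counts/validation on the WriteStatus RDD.
    void commit() {
        committedFiles.addAll(inflightFiles);
        inflightFiles.clear();
    }

    boolean isVisible(String file) {
        return committedFiles.contains(file);
    }
}
```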

HoodieClient.commit() fails to collect HoodieWriteStat

java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
	at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
	at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
	at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
	at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
	at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
	at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:512)
	at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:429)
	at org.apache.spark.storage.BlockManager.get(BlockManager.scala:618)
	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:88)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

in

org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:47)
com.uber.hoodie.HoodieWriteClient.commit(HoodieWriteClient.java:313)
com.uber.komondor.hoodie.HoodieSchemalessBackfillJob$.writeRDDtoFile(HoodieSchemalessBackfillJob.scala:102)
com.uber.komondor.hoodie.HoodieSchemalessBackfillJob$.writeRDDtoFile(HoodieSchemalessBackfillJob.scala:57)
com.uber.komondor.hoodie.HoodieSchemalessBackfillJob$.backfill(HoodieSchemalessBackfillJob.scala:133)
com.uber.komondor.hoodie.HoodieSchemalessBackfillJob$.main(HoodieSchemalessBackfillJob.scala:241)
com.uber.komondor.hoodie.HoodieSchemalessBackfillJob.main(HoodieSchemalessBackfillJob.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:525)

Generic way to link partition path to dataset base path

https://github.com/uber/hoodie/blob/master/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/HoodieInputFormat.java#L247

We need metadata stored at the HDFS partitions to link back to the base path that holds the hoodie metadata.

We cannot use the table location, since it would not be applicable across all query engines, and there is no enforcement of what value the table location should be as long as the partition paths point to the right places.
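One possible scheme, sketched with assumed conventions: store a small metadata file in every partition recording how many levels up the base path sits, then resolve the base path by walking up. The depth-based resolution below is an assumption for illustration, not the current implementation:

```java
// Sketch only: given a partition path and the partition depth read from
// a per-partition metadata file, walk up to the dataset base path.
class PartitionMetadataSketch {

    static String resolveBasePath(String partitionPath, int partitionDepth) {
        String path = partitionPath;
        for (int i = 0; i < partitionDepth; i++) {
            int idx = path.lastIndexOf('/');
            path = path.substring(0, idx);
        }
        return path;
    }
}
```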

Handle task failures with merge-on-read

Failure and retry - HDFS throws AlreadyBeingCreatedException on fs.append when the lease has not expired or close was not called. Roll over the log if this happens?

Need to think about this.
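A sketch of the roll-over idea, with IllegalStateException standing in for HDFS's AlreadyBeingCreatedException: if appending to the current log version fails on a held lease, write to the next version instead:

```java
import java.util.function.Consumer;

// Sketch only: roll the log over to a new version when append fails
// because the previous writer's lease has not expired.
class LogRolloverSketch {

    static int appendWithRollover(int currentVersion, Consumer<Integer> append) {
        try {
            append.accept(currentVersion);
            return currentVersion;
        } catch (IllegalStateException leaseStillHeld) {
            int nextVersion = currentVersion + 1;
            append.accept(nextVersion); // new file, no lease conflict
            return nextVersion;
        }
    }
}
```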
