nielsbasjes / splittablegzip

Splittable Gzip codec for Hadoop

License: Apache License 2.0

hadoop codec gzip splittable pig gzip-codec gzipped-files mapreduce-java spark

splittablegzip's Introduction

Making gzip splittable for Hadoop

If this project has business value for you then don't hesitate to support me with a small donation.

In many Hadoop production environments you get gzipped files as the raw input. Usually these are Apache HTTPD logfiles. When putting these gzipped files into Hadoop you are stuck with exactly 1 map task per input file. In many scenarios this is fine.

However, when doing a lot of work in this very first map task it may very well be advantageous to divide the work over multiple tasks, even if there is a penalty for this scaling out.

This addon for Hadoop makes this possible.

Benchmark

I ran benchmark jobs to see how this solution scales and performs. The software and the results can be found in the Benchmark folder.

In general we can say that you win as long as there is unused capacity in your cluster.

[Graph of the benchmark results]

Requirements

First of all, this only works with Hadoop 1.1.0 and up because it depends on the presence of the SplittableCompressionCodec interface.

I tested it with Hortonworks 2.1.2 and Cloudera CDH 4.5.0.

Downloads

Sources

Currently the sources can only be downloaded via GitHub.

https://github.com/nielsbasjes/splittablegzip

Binary

For normal projects you can simply download the prebuilt version from Maven Central. So when using Maven you can add this to your project:

<dependency>
  <groupId>nl.basjes.hadoop</groupId>
  <artifactId>splittablegzip</artifactId>
  <version>1.3</version>
</dependency>

Building

On CDH the build fails if the native gzip libraries cannot be loaded for running the unit tests. To fix this you need to install the native package and set the environment so they can be found for your platform, i.e. do something like this before starting the build or loading your IDE:

export LD_LIBRARY_PATH=/usr/lib/hadoop-0.20/lib/native/Linux-amd64-64

Installation

  1. Place the jar file in the classpath of your Hadoop installation.

  2. Enable this codec and make sure the regular GzipCodec is NOT used. This can be done by changing the io.compression.codecs property to something like this:

    org.apache.hadoop.io.compress.DefaultCodec, nl.basjes.hadoop.io.compress.SplittableGzipCodec, org.apache.hadoop.io.compress.BZip2Codec

  3. Set the split size to something that works in your situation. This can be done by setting the appropriate values for mapreduce.input.fileinputformat.split.minsize and/or mapreduce.input.fileinputformat.split.maxsize, as shown in the sketch below.
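
As an illustration, here is a minimal sketch of a MapReduce driver that applies the settings from steps 2 and 3 programmatically. The class name and the chosen split size of 256 MiB are just example assumptions; the property names are the ones listed above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplittableGzipJobExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Register the splittable codec instead of the regular GzipCodec (step 2).
        conf.set("io.compression.codecs",
            "org.apache.hadoop.io.compress.DefaultCodec,"
          + "nl.basjes.hadoop.io.compress.SplittableGzipCodec,"
          + "org.apache.hadoop.io.compress.BZip2Codec");

        // Set the split size (step 3); here 256 MiB per split as an example value.
        conf.setLong("mapreduce.input.fileinputformat.split.minsize", 256L * 1024 * 1024);
        conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 256L * 1024 * 1024);

        Job job = Job.getInstance(conf, "Read splittable gzip");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // ... set mapper, reducer and output as usual ...
    }
}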

Usage examples

Choosing the configuration settings

How it works

For each "split" the gzipped input file is read from the beginning of the file till the point where the split starts, thus reading, decompressing and discarding (wasting!) everything that is before the start of the split.

FACT: Files compressed with the Gzip codec are NOT SPLITTABLE. Never have been, never will be.

This codec offers a trade-off between "spent resources" and "scalability" when reading gzipped input files, by simply always starting at the beginning of the file.

So in general this "splittable" Gzip codec will WASTE CPU time, FileSystem IO (HDFS) and probably other system resources (Network) too, in order to reduce the "wall clock" time in some real-life situations.

When is this useful?

Assume you have a heavy map phase for which the input is a 1GiB Apache httpd logfile. Now assume this map takes 60 minutes of CPU time to run. Then this task will take 60 minutes to run because all of that CPU time must be spent on a single CPU core ... Gzip is not splittable!

This codec will waste CPU power by always starting from the start of the gzipped file and discard all the decompressed data until the start of the split has been reached.

Decompressing a 1GiB Gzip file usually takes only a few (2-4) minutes.

So if a "60 minutes" input file is split into 4 equal parts then:

  1. the 1st map task will
    • process the 1st split (15 minutes)
  2. the 2nd map task will
    • discard the 1st split ( 1 minute ).
    • process the 2nd split (15 minutes).
  3. the 3rd map task will
    • discard the 1st split ( 1 minute ).
    • discard the 2nd split ( 1 minute ).
    • process the 3rd split (15 minutes).
  4. the 4th task will
    • discard the 1st split ( 1 minute ).
    • discard the 2nd split ( 1 minute ).
    • discard the 3rd split ( 1 minute ).
    • process the 4th split (15 minutes).

Because all tasks run in parallel the running time in this example would be 18 minutes (i.e. the worst split time) instead of the normal 60 minutes. We have wasted about 6 minutes of CPU time and completed the job in about 30% of the original wall clock time.
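
As a rough rule of thumb (assuming n equal splits, a per-split discard/decompression time d and a per-split processing time p): the wall clock time is about (n − 1)·d + p (the time of the last task) and the total wasted CPU time is about d·n·(n − 1)/2. With n = 4, d = 1 minute and p = 15 minutes this matches the 18 minutes and 6 wasted minutes in the example above.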

Tuning for optimal performance and scalability.

The overall advice is to EXPERIMENT with the settings and do benchmarks.

Remember that:

  • Being able to split the input has a positive effect on scalability IFF there is room to scale out to.
  • This codec is only useful if there are fewer gzipped input files than available map task slots (i.e. some slots are idle during the input/map phase).
  • There is no way of limiting the IO impact. Note that in the above example the 4th task will read and decompress the ENTIRE input file.
  • Splitting increases the load on (all kinds of) system resources: CPU and HDFS/Network. The additional load on the system resources has a negative effect on the scalability. Splitting a file into 1000 splits will really hammer the datanodes storing the first block of the file 1000 times.
  • More splits also affect the number of reduce tasks that follow.
  • If you create more splits than you have map task slots you will certainly have a suboptimal setting and you should increase the split size to reduce the number of splits.

A possible optimum:

  • Upload the input files into HDFS with a blocksize that is equal to (or a few bytes bigger than) the file size.

    hadoop fs -Ddfs.block.size=1234567890 -put access.log.gz /logs

    This has the effect that all nodes that have "a piece of the file" always have "the entire file". This ensures that no network IO is needed for a single node to read the file IFF it has it locally available.

  • The replication of the HDFS determines on how many nodes the input file is present. So to avoid needless network traffic the number of splits must be limited to AT MOST the replication factor of the underlying HDFS (a small calculation sketch is shown after this list).

  • Try to make sure that all splits of an input file are roughly the same size. Don't be surprised if the optimal setting for the split size turns out to be 500MiB or even 1GiB.
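
To make the last two bullets concrete, here is a minimal sketch of that calculation. The class name and the numbers are made-up example values; the resulting value would be used for the split size properties mentioned under Installation.

public class SplitSizeExample {
    public static void main(String[] args) {
        long compressedFileSize = 1_234_567_890L; // example: size of access.log.gz in bytes
        int replicationFactor = 3;                // example: HDFS replication factor of the file

        // Ceiling division: with this split size there will be at most
        // 'replicationFactor' splits, all roughly the same size.
        long splitSize = (compressedFileSize + replicationFactor - 1) / replicationFactor;

        System.out.println("Use " + splitSize
            + " for mapreduce.input.fileinputformat.split.maxsize");
    }
}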

Alternative approaches

Always remember that there are alternative approaches:

  • Decompress the original gzipped file, split it into pieces and recompress the pieces before offering them to Hadoop. For example: http://stackoverflow.com/questions/3960651
  • Decompress the original gzipped file and compress using a different splittable codec. For example by using bzip2 or not compressing at all.

Implementation notes

There were two major hurdles that needed to be solved to make this work:

  • The reported position depends on the read blocksize.

    If you read information in "records" the getBytesRead() will return a value that jumps incrementally. Only after a new disk block has been read will getBytesRead return a new value. "Read" means: read from disk and loaded into the decompressor, but does NOT yet mean that the uncompressed information was read.

    The solution employed is that when we get close to the end of the split we switch to a crawling mode. This simply means that the disk reads are reduced to 1 byte, making the position reporting also 1 byte accurate. This was implemented in the ThrottleableDecompressorStream.

  • The input is compressed.

    If you read 1 byte (uncompressed) you do not always get an increase in the reported getBytesRead(). This happens because the value reported by getBytesRead is all about the file size on disk (= compressed), and compressed files have fewer bytes than the uncompressed data. This makes it impossible to make two splits meet accurately. The solution is based around the concept that we try to report the position as accurately as possible, but when we get really close to the end we stop reporting the truth and start lying about the position. The lie we use to cross the split boundary is that 1 uncompressed byte read is reported as a 1 compressed byte increase in position. This was implemented using a simple state machine with 3 different states for what position is reported through getPos(); the state is essentially selected on the distance to the end (a simplified sketch of this state machine is shown after this list). These states are:

    • REPORT

      Normally read the bytes and report the actual disk position in the getPos().

    • HOLD

      When very close to the end we no longer change the reported file position for a while.

    • SLOPE

      When we are at the end: start reporting a 1 byte increase in getPos() for every uncompressed byte that was read from the stream.

    The overall effect is that the position reporting near the end of the split no longer represents the actual position, and this makes the position usable for reliably splitting the input stream. The actual point where the file is split is shifted a bit towards the back of the file (we're talking bytes, not even KiB), where this shift depends on the compression level of the data in the stream. If we start too early the split may happen a byte too early, and in the end the last split may lose the last record(s). That is why we hold for a while and only start the slope at the moment we are certain we are beyond the indicated "end". To ensure the split starts at exactly the same spot as the previous split would end: we find the start of a split by running over the "part that must be discarded" as if it were a split.
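
The following is a simplified, illustrative sketch of that state machine. It is NOT the actual SplittableGzipCodec or ThrottleableDecompressorStream code; all names and thresholds are made up for illustration.

/**
 * Illustrative sketch of the position-reporting state machine described above.
 */
class PositionReporter {
    enum State { REPORT, HOLD, SLOPE }

    private final long splitEnd;      // compressed offset where this split ends
    private final long holdDistance;  // how close to the end we stop reporting the truth

    private long compressedPos = 0;   // real position in the compressed stream
    private long reportedPos   = 0;   // position handed out through getPos()
    private State state = State.REPORT;

    PositionReporter(long splitEnd, long holdDistance) {
        this.splitEnd = splitEnd;
        this.holdDistance = holdDistance;
    }

    /** Call this for every uncompressed byte read from the stream. */
    void onUncompressedByteRead(long newCompressedPos) {
        compressedPos = newCompressedPos;
        switch (state) {
            case REPORT:
                // Normal case: report the actual position in the compressed file.
                reportedPos = compressedPos;
                if (splitEnd - compressedPos <= holdDistance) {
                    state = State.HOLD;
                }
                break;
            case HOLD:
                // Very close to the end: freeze the reported position for a while.
                if (compressedPos >= splitEnd) {
                    state = State.SLOPE;
                }
                break;
            case SLOPE:
                // Beyond the end: pretend every uncompressed byte read advances the
                // position by one compressed byte, so the split boundary is crossed
                // in a deterministic way.
                reportedPos++;
                break;
        }
    }

    long getPos() {
        return reportedPos;
    }
}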

History

Originally this feature was submitted to be part of the core of Hadoop.

See: Gzip splittable (HADOOP-7076).

Created by

This idea was conceived and implemented by Niels Basjes.

splittablegzip's People

Contributors

dmoore247, nielsbasjes, renovate-bot, renovate[bot]


splittablegzip's Issues

Thank you - SplittableGzip works out of the box with Apache Spark!

I came across this project via a comment on a Spark Jira ticket where I was thinking about a way to split gzip files that is similar to what this project does. I was delighted to learn that someone had already thought through and implemented such a solution, and from the looks of it done a much better job at it than I could have.

So I just wanted to report here for the record, since gzipped files are a common stumbling block for Apache Spark users, that your solution works with Apache Spark without modification.

Here is an example, which I've tested against Apache Spark 2.4.4 using the Python DataFrame API:

# splittable-gzip.py
from pyspark.sql import SparkSession


if __name__ == '__main__':
    spark = (
        SparkSession.builder
        # If you want to change the split size, you need to use this config
        # instead of mapreduce.input.fileinputformat.split.maxsize.
        # I don't think Spark DataFrames offer an equivalent setting for
        # mapreduce.input.fileinputformat.split.minsize.
        .config('spark.sql.files.maxPartitionBytes', 1000 * (1024 ** 2))
        .getOrCreate()
    )

    print(
        spark.read
        # You can also specify this option against the SparkSession.
        .option('io.compression.codecs', 'nl.basjes.hadoop.io.compress.SplittableGzipCodec')
        .csv(...)
        .count()
    )

Run this script as follows:

spark-submit --packages "nl.basjes.hadoop:splittablegzip:1.2" splittable-gzip.py

Here's what I see in the Spark UI when I run this script against a 20 GB gzip file on my laptop:

[Screenshot of the Spark UI task list]

You can see in the task list the behavior described in the README, with each task reading from the start of the file to its target split.

And here is the Executor UI, which shows every available core running concurrently against this single file:

[Screenshot of the Spark Executor UI]

I will experiment some more with this project -- and perhaps ask some questions on here, if you don't mind -- and then promote it on Stack Overflow and in the Boston area.

Thank you for open sourcing this work!

Action Required: Fix Renovate Configuration

There is an error with this repository's Renovate configuration that needs to be fixed. As a precaution, Renovate will stop PRs until it is resolved.

Location: renovate.json
Error type: The renovate configuration file contains some invalid settings
Message: Invalid regExp for packageRules[0].allowedVersions: '!/^(?i).*[-_\.](Alpha|Beta|RC|M|EA|Snap|snapshot|jboss|atlassian)[-_\.]?[0-9]?.*$/'

Handle very small files

If you have a file that is smaller than 4KB the codec will fail.
This is undesired behaviour.

Dependency Dashboard

This issue lists Renovate updates and detected dependencies. Read the Dependency Dashboard docs to learn more.

This repository currently has no open or pending branches.

Detected dependencies

github-actions
.github/workflows/build.yml
  • actions/checkout v4
  • actions/cache v4
  • actions/setup-java v4
  • codecov/codecov-action v4.3.1
maven
Benchmark/javamr/pom.xml
  • junit:junit 4.13.2
  • org.apache.hadoop:hadoop-client 3.4.0
Benchmark/pig/pom.xml
hadoop-codec/pom.xml
  • junit:junit 4.13.2
  • org.slf4j:slf4j-api 2.0.13
  • org.apache.maven.plugins:maven-gpg-plugin 3.2.4
  • org.apache.maven.plugins:maven-source-plugin 3.3.1
  • org.apache.maven.plugins:maven-deploy-plugin 3.1.2
  • org.apache.maven.plugins:maven-javadoc-plugin 3.6.3
  • org.apache.maven.plugins:maven-pmd-plugin 3.22.0
  • org.codehaus.mojo:findbugs-maven-plugin 3.0.5
pom.xml
  • org.apache.hadoop:hadoop-client 3.4.0


Failing on Spark 3.1.2

Hi,
I get an exception when trying to start a session with spark.jars.packages='nl.basjes.hadoop:splittablegzip:1.3':
java.lang.ClassNotFoundException: java.lang.NoClassDefFoundError: Could not initialize class org.slf4j.LoggerFactory when creating Hive client using classpath

Anyone with experience in running on Spark 3+?

Fails on spark with "The provided InputSplit bytes which is too small"

Hi! Thank you for this great library. We use it to process our large gzipped input files, but we ran into a problem.

java.lang.IllegalArgumentException: The provided InputSplit (786432000;786439029] is 7029 bytes which is too small. (Minimum is 65536)

In our company we use HDP 2.6 with Spark 2.3.
I tried to find the minimum split parameter for Spark, but spark.hadoop.mapreduce.input.fileinputformat.split.minsize doesn't work. Only the spark.sql.files.maxPartitionBytes setting really works.
Could you please give me some advice on what I can do? Or maybe it's possible to fix the problem in the library?

Guide/readme/example for using with AWS Glue ETL job

I wonder if you could make suggestions on how to use this in an AWS glue job. My method does not involve using spark-submit but rather creating job definitions and run-job using boto3 tools.

When I try to use this in my script, I get:
pyspark.sql.utils.IllegalArgumentException: Compression codec nl.basjes.hadoop.io.compress.SplittableGzipCodec not found.

I have tried passing --conf nl.basjes.hadoop.io.compress.SplittableGzipCodec, -packages nl.basjes.hadoop.io.compress.SplittableGzipCodec and other methods as args to the job, to no avail. I think I must need to put a copy of the codec on S3 and point to it with extra-files or another arg?
