sunchao / parquet-rs
Apache Parquet implementation in Rust
License: Apache License 2.0
Similar to PARQUET-684, dictionary encoding can potentially be improved using SIMD instructions such as gather. There is a blog post that describes this idea, and a prototype.
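As a baseline for comparison, dictionary decoding is essentially a gather: out[i] = dict[indices[i]]. Below is a minimal scalar sketch (illustrative only, not the crate's actual decoder); a SIMD version would replace the per-element loop with a gather instruction such as _mm256_i32gather_epi32 on AVX2.

```rust
// Scalar dictionary decode: each output value is a lookup into the
// dictionary by index. This per-element loop is exactly the pattern
// a SIMD gather instruction can vectorize.
fn decode_dictionary(dict: &[i32], indices: &[u32]) -> Vec<i32> {
    indices.iter().map(|&i| dict[i as usize]).collect()
}

fn main() {
    let dict = [10, 20, 30];
    let decoded = decode_dictionary(&dict, &[2, 0, 1, 2]);
    println!("{:?}", decoded); // [30, 10, 20, 30]
}
```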
This is an RFC for write support in this crate.
Prototype PR #127. The API closely resembles the read support. See the design overview and implementation details in the comments below.
Sub-tasks:
This is going to be easy. I have some ugly prototype code working already.
let path = Path::new(&args[1]);
let file = File::open(&path).unwrap();
let parquet_reader = SerializedFileReader::new(file).unwrap();
let row_group_reader = parquet_reader.get_row_group(0).unwrap();
for i in 0..row_group_reader.num_columns() {
    match row_group_reader.get_column_reader(i) {
        Ok(ColumnReader::Int32ColumnReader(ref mut r)) => {
            let batch_size = 1024;
            let sz = mem::size_of::<i32>();
            let p = memory::allocate_aligned((batch_size * sz) as i64).unwrap();
            let ptr_i32 = unsafe { mem::transmute::<*const u8, *mut i32>(p) };
            let mut buf = unsafe { slice::from_raw_parts_mut(ptr_i32, batch_size) };
            // let mut builder: Builder<i32> = Builder::with_capacity(1024);
            // let buffer = builder.finish();
            match r.read_batch(batch_size, None, None, &mut buf) {
                Ok((count, _)) => {
                    let arrow_buffer = Buffer::from_raw_parts(ptr_i32, count as i32);
                    let arrow_array = Array::from(arrow_buffer);
                    match arrow_array.data() {
                        ArrayData::Int32(b) => {
                            println!("len: {}", b.len());
                            println!("data: {:?}", b.iter().collect::<Vec<i32>>());
                        },
                        _ => println!("wrong type")
                    }
                },
                _ => println!("error")
            }
        },
        _ => println!("column type not supported")
    }
}
Now we need to come up with a real design and I probably need to add more helper methods to Arrow to make this easier.
It may be useful to do some profiling on encoding & decoding. We can use the existing benches for this. Some useful scripts:
For CPU:
perf record -g -F 1000 <bench-name> --bench && perf script | stackcollapse-perf | flamegraph > flamegraph.svg
The stackcollapse-perf and flamegraph scripts are from the FlameGraph repository.
For cache performance (from this thread):
valgrind --tool=callgrind --dump-instr=yes --collect-jumps=yes --simulate-cache=yes <bench-name>
I also found that we may need to add the following section to Cargo.toml:
[profile.bench]
debug = true
opt-level = 1
If opt-level is too high, rustc will inline most of the functions, making it hard to see which function is the bottleneck. I'm not sure if there's a better way to do this.
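One alternative that I believe works without lowering the optimization level globally is to mark the functions of interest #[inline(never)], so they keep their own frames in perf/flamegraph output. A minimal sketch with a made-up function, not code from this crate:

```rust
// #[inline(never)] prevents rustc from inlining this function,
// so it shows up as a distinct frame in profiles even at opt-level = 3.
#[inline(never)]
fn decode_batch(data: &[u8]) -> u64 {
    data.iter().map(|&b| b as u64).sum()
}

fn main() {
    println!("{}", decode_batch(&[1, 2, 3])); // 6
}
```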
Currently we do not publish documentation when releasing a new version of the crate. It would be very useful for getting people started if we had it.
Items to cover:
I'm curious if there are plans to support HDFS within this crate?
The Java parquet library allows parquet files to be read locally or from HDFS and in both cases it is possible to push down the projection and only retrieve the columns needed, which can make a huge difference in performance.
Should have enough information for people to get started.
This is the preliminary work before we can implement reading for Parquet files.
We may just follow what Impala & parquet-cpp do (https://github.com/apache/parquet-cpp/blob/master/src/parquet/util/memory.h), but adapt it to Rust's semantics.
At some point we should investigate the possibility of publishing parquet-rs as a crate so that it can be more easily used by other projects. So far I think one main issue is that parquet.rs needs to be generated before compile time and there is no easy way to do this (something similar was done for the Rust protobuf implementation, which we may borrow from). Otherwise, we have to check the generated parquet.rs into the repo, which is not very desirable.
This crate doesn't seem to support writing Parquet files. Is this planned? I'd like to convert CSV files to Parquet.
We have a decoder for this but not an encoder.
I compiled a list of items that could be fixed in the near future, though it is not a complete list:
parquet-mr/parquet-tools/cli.
Questions I have:
The Parquet encoding format specifies that booleans should be encoded as bit-packed, LSB first. In the current implementation we just encode them as plain 1-bit values.
It is a little confusing, though, that the bit-packed encoding is already deprecated and replaced by the RLE/bit-packed hybrid encoding. Nevertheless, parquet-mr follows the specification and still uses the bit-packed encoding, while parquet-cpp uses the same plain encoding as us.
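For reference, packing booleans LSB-first as the spec describes can be sketched like this; this is an illustrative standalone function, not the crate's actual encoder:

```rust
// Pack booleans into bytes, least-significant bit first: value i goes
// into bit (i % 8) of byte (i / 8), matching the Parquet BIT_PACKED
// layout for booleans.
fn pack_bools_lsb(values: &[bool]) -> Vec<u8> {
    let mut out = vec![0u8; (values.len() + 7) / 8];
    for (i, &v) in values.iter().enumerate() {
        if v {
            out[i / 8] |= 1 << (i % 8);
        }
    }
    out
}

fn main() {
    // true, false, true, true -> bits 1101 read MSB-to-LSB = 13
    let packed = pack_bools_lsb(&[true, false, true, true]);
    println!("{:?}", packed); // [13]
}
```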
Remove parquet_thrift/parquet.rs from test coverage.
I encountered this TODO the other day and wondered whether we should fix it, since we have added support for pretty much all the encodings in Parquet.
Line with TODO: https://github.com/sunchao/parquet-rs/blob/master/src/column/reader.rs#L315
I suggest we open a PR to add it. I am not sure how we are going to test it - by adding files with new encodings, or by using the make pages function?
The current parquet.thrift is outdated - we should update it to the latest one (https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift) (and hopefully this won't break anything...).
I was rushing through this a bit, to be honest, so I may have failed to properly read all the instructions, but I copied the steps from the Travis YAML file to clone the thrift repo.
My build fails with:
$ make
make all-recursive
make[1]: Entering directory '/home/andy/git/thrift'
Making all in compiler/cpp
make[2]: Entering directory '/home/andy/git/thrift/compiler/cpp'
Making all in src
make[3]: Entering directory '/home/andy/git/thrift/compiler/cpp/src'
/bin/bash ../../../ylwrap thrift/thrifty.yy y.tab.c thrift/thrifty.cc y.tab.h `echo thrift/thrifty.cc | sed -e s/cc$/hh/ -e s/cpp$/hpp/ -e s/cxx$/hxx/ -e s/c++$/h++/ -e s/c$/h/` y.output thrift/thrifty.output -- yacc -d
yacc: e - line 1 of "/home/andy/git/thrift/compiler/cpp/src/thrift/thrifty.yy", syntax error
%code requires {
^
I'm using Ubuntu 16.04.4 LTS
Right now we deserialize the file metadata from Thrift, but this process has to handle all row groups, which is inefficient when only a subset is selected. It would be better to do something similar to parquet-mr.
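One possible shape, heavily simplified: record where each row group's metadata starts during a cheap initial footer scan, and fully decode a row group only when it is selected. All names and types below are placeholders for illustration, not this crate's API.

```rust
// Hypothetical sketch of lazy row-group metadata. Assumes the byte
// offset of each row-group struct was recorded during an initial scan.
struct LazyFileMetaData {
    footer: Vec<u8>,               // raw Thrift-encoded footer bytes
    row_group_offsets: Vec<usize>, // start offset of each row group's metadata
}

struct RowGroupMetaData {
    num_rows: i64,
}

impl LazyFileMetaData {
    // Decode only the selected row group instead of the whole footer.
    fn row_group(&self, i: usize) -> RowGroupMetaData {
        decode_row_group(&self.footer[self.row_group_offsets[i]..])
    }
}

// Placeholder: a real version would run the Thrift decoder on `bytes`.
fn decode_row_group(bytes: &[u8]) -> RowGroupMetaData {
    RowGroupMetaData { num_rows: bytes.len() as i64 }
}

fn main() {
    let meta = LazyFileMetaData {
        footer: vec![0; 16],
        row_group_offsets: vec![0, 8],
    };
    println!("{}", meta.row_group(1).num_rows);
}
```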
It may be good to add the features we support, as well as future work items, to the README, so people can easily see what is working and what is not.
Currently the SIMD feature requires the x86intrin crate and a compile-time flag RUSTFLAGS="-C target-feature=+sse4.2". Since x86/x86_64 SIMD is already stabilized, we should no longer need this crate dependency and, more importantly, can check for features at run time via the is_x86_feature_detected! macro.
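A minimal sketch of run-time dispatch with the stable std macro (x86/x86_64 only); the function names here are made up for illustration, not this crate's API, and the SSE4.2 branch is a placeholder:

```rust
// Run-time CPU feature detection replaces the compile-time RUSTFLAGS
// approach: one binary works everywhere and picks the fast path when
// the CPU supports it.
fn hash_slice(data: &[u8]) -> u64 {
    if is_x86_feature_detected!("sse4.2") {
        // A real implementation would dispatch to an SSE4.2 (e.g.
        // CRC32-based) kernel here; placeholder for the sketch.
        hash_sse42(data)
    } else {
        hash_scalar(data)
    }
}

fn hash_sse42(data: &[u8]) -> u64 {
    hash_scalar(data) // placeholder, not a real SIMD kernel
}

fn hash_scalar(data: &[u8]) -> u64 {
    data.iter()
        .fold(0u64, |acc, &b| acc.wrapping_mul(31).wrapping_add(b as u64))
}

fn main() {
    println!("{}", hash_slice(b"parquet"));
}
```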
There is a TODO that I think we should complete to have full-featured read support.
https://github.com/sunchao/parquet-rs/blob/master/src/file/metadata.rs#L268
https://github.com/sunchao/parquet-rs/blob/master/src/file/reader.rs#L357
Also take into consideration https://github.com/sunchao/parquet-rs/blob/master/src/file/reader.rs#L332.
Will work on it.
When I look up coverage per file for a commit, coveralls reports the following:
The file "file/metadata.rs" isn't available on github. Either it's been removed, or the repo root directory needs to be updated.
I found that one can set the repository root directory in coveralls: lemurheavy/coveralls-public#886 (comment)
@sunchao Could you have a look and possibly fix it in coveralls? Then we can see which lines are not covered and add more unit tests.
To evaluate performance, we should add benchmarks for encoding & decoding. Perhaps we can borrow some from the C++/Java versions and compare performance between these implementations.
The current read API is low-level: you deserialize column values and repetition/definition levels separately. We should add a higher-level API that maps column batches into rows (structs or vectors).
I am currently working on a Usage/Examples section in the README and wanted to add a couple of code snippets on how to use the library (I hope you do not object to this). While working on it, I found that there is a potential issue when reading columns with PLAIN_DICTIONARY encoding.
I am trying to read the data/alltypes_plain.snappy.parquet file that is in the repository, with the code attached below.
I discovered that this returns empty buffers when I read values. For example, the first column in the Parquet file is "id", which has the values 6 and 7:
column 0:
--------------------------------------------------------------------------------
column type: INT32
column path: "id"
encodings: RLE PLAIN_DICTIONARY PLAIN
file path: N/A
file offset: 55
num of values: 2
total compressed size (in bytes): 51
total uncompressed size (in bytes): 47
data page offset: 27
index page offset: N/A
dictionary page offset: 4
When I run my code it prints the following:
Row group: 0
Read column 0: 0 values (0 levels): [0, 0, 0, 0, 0, 0, 0, 0]
! Skip column 1, unknown type: BOOLEAN
Read column 2: 0 values (0 levels): [0, 0, 0, 0, 0, 0, 0, 0]
Read column 3: 0 values (0 levels): [0, 0, 0, 0, 0, 0, 0, 0]
Read column 4: 0 values (0 levels): [0, 0, 0, 0, 0, 0, 0, 0]
! Skip column 5, unknown type: INT64
! Skip column 6, unknown type: FLOAT
! Skip column 7, unknown type: DOUBLE
! Skip column 8, unknown type: BYTE_ARRAY
! Skip column 9, unknown type: BYTE_ARRAY
! Skip column 10, unknown type: INT96
All returned buffers are empty.
It turns out that there are several issues with reading values: when reading a dictionary page we should not increment the number of seen values, because those values belong to the dictionary (we have not read the actual values yet), and the encoding assignment for the data page is also broken (a comparison instead of an assignment).
The diff below also contains an updated test_file_reader test:
diff --git a/src/column/reader.rs b/src/column/reader.rs
index 9d36918..cea0bc5 100644
--- a/src/column/reader.rs
+++ b/src/column/reader.rs
@@ -213,9 +213,11 @@ impl<'a, T: DataType> ColumnReaderImpl<'a, T> where T: 'static {
self.def_level_decoder = Some(def_decoder);
}
- if encoding == Encoding::PLAIN_DICTIONARY {
- encoding == Encoding::RLE_DICTIONARY;
- }
+ let encoding = if encoding == Encoding::PLAIN_DICTIONARY {
+ Encoding::RLE_DICTIONARY
+ } else {
+ encoding
+ };
let decoder =
if encoding == Encoding::RLE_DICTIONARY {
diff --git a/src/file/reader.rs b/src/file/reader.rs
index ea7871d..5863f57 100644
--- a/src/file/reader.rs
+++ b/src/file/reader.rs
@@ -308,7 +308,6 @@ impl PageReader for SerializedPageReader {
assert!(page_header.dictionary_page_header.is_some());
let dict_header = page_header.dictionary_page_header.as_ref().unwrap();
let is_sorted = dict_header.is_sorted.unwrap_or(false);
- self.seen_num_values += dict_header.num_values as i64;
Page::DictionaryPage {
buf: ByteBufferPtr::new(buffer), num_values: dict_header.num_values as u32,
encoding: Encoding::from(dict_header.encoding), is_sorted: is_sorted
@@ -396,7 +395,7 @@ mod tests {
let mut page_reader_0: Box<PageReader> = page_reader_0_result.unwrap();
let mut page_count = 0;
while let Ok(Some(page)) = page_reader_0.get_next_page() {
- let is_dict_type = match page {
+ let is_expected_page = match page {
Page::DictionaryPage{ buf, num_values, encoding, is_sorted } => {
assert_eq!(buf.len(), 32);
assert_eq!(num_values, 8);
@@ -404,12 +403,22 @@ mod tests {
assert_eq!(is_sorted, false);
true
},
- _ => false
+ Page::DataPage { buf, num_values, encoding, def_level_encoding, rep_level_encoding } => {
+ assert_eq!(buf.len(), 11);
+ assert_eq!(num_values, 8);
+ assert_eq!(encoding, Encoding::PLAIN_DICTIONARY);
+ assert_eq!(def_level_encoding, Encoding::RLE);
+ assert_eq!(rep_level_encoding, Encoding::BIT_PACKED);
+ true
+ },
+ _ => {
+ false
+ }
};
- assert!(is_dict_type);
+ assert!(is_expected_page);
page_count += 1;
}
- assert_eq!(page_count, 1);
+ assert_eq!(page_count, 2);
}
fn get_test_file<'a>(file_name: &str) -> fs::File {
With these changes I get the following output, which looks correct:
Row group: 0
Read column 0: 2 values (0 levels): [6, 7, 0, 0, 0, 0, 0, 0]
! Skip column 1, unknown type: BOOLEAN
Read column 2: 2 values (0 levels): [0, 1, 0, 0, 0, 0, 0, 0]
Read column 3: 2 values (0 levels): [0, 1, 0, 0, 0, 0, 0, 0]
Read column 4: 2 values (0 levels): [0, 1, 0, 0, 0, 0, 0, 0]
! Skip column 5, unknown type: INT64
! Skip column 6, unknown type: FLOAT
! Skip column 7, unknown type: DOUBLE
! Skip column 8, unknown type: BYTE_ARRAY
! Skip column 9, unknown type: BYTE_ARRAY
! Skip column 10, unknown type: INT96
Add CLI tools like parquet-mr/parquet-tools as binaries, so people can use them to inspect files. We have already started doing this for file schema/metadata; we should add something to list the records in a file, e.g. in JSON or a tabular format or similar.
When processing parquet files I would like the ability to get values like this:
let a = row.get_bool().unwrap();
let a = row.get_float().unwrap();
...
Currently I have to write pattern matching each time I want to extract a value.
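A sketch of what such typed accessors might look like, using a hypothetical Field enum; the names here are illustrative, not this crate's actual row API:

```rust
// Hypothetical row value enum with typed accessors that hide the
// pattern matching behind Option-returning methods.
#[derive(Debug, Clone, PartialEq)]
enum Field {
    Bool(bool),
    Int(i32),
    Float(f32),
}

impl Field {
    // Each accessor returns Some only when the variant matches,
    // replacing a hand-written match at every call site.
    fn get_bool(&self) -> Option<bool> {
        if let Field::Bool(v) = *self { Some(v) } else { None }
    }
    fn get_float(&self) -> Option<f32> {
        if let Field::Float(v) = *self { Some(v) } else { None }
    }
}

fn main() {
    let row = vec![Field::Bool(true), Field::Float(1.5)];
    let a = row[0].get_bool().unwrap();
    let b = row[1].get_float().unwrap();
    println!("{} {}", a, b); // true 1.5
}
```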
Right now the test coverage is quite poor. We should solidify on this before working on new features.
Benchmarks show that the RLE/bit-packed encoding spends most of its time in load_deltas_in_mini_block, which in turn calls BitReader::get_value. This is not very efficient, as it gets one value at a time!
PARQUET-671 implements a nice improvement for this. We should try to port it to our Rust implementation.
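The core idea is to unpack many bit-packed values per memory load instead of calling get_value once per value. A heavily simplified, unoptimized sketch of bulk unpacking (illustrative only; a real port would use unrolled kernels per bit width, as PARQUET-671 does):

```rust
// Unpack `n` little-endian, LSB-first `bit_width`-bit values from a
// byte buffer. Each iteration assembles up to a 64-bit window and
// extracts one value by shift-and-mask, rather than reading bit by bit.
fn unpack(buf: &[u8], bit_width: usize, n: usize) -> Vec<u32> {
    let mask = (1u64 << bit_width) - 1;
    let mut out = Vec::with_capacity(n);
    let mut bit_pos = 0usize;
    for _ in 0..n {
        let byte = bit_pos / 8;
        // Assemble up to 8 bytes starting at `byte` into one u64 window.
        let mut word = 0u64;
        for (i, &b) in buf[byte..].iter().take(8).enumerate() {
            word |= (b as u64) << (8 * i);
        }
        out.push(((word >> (bit_pos % 8)) & mask) as u32);
        bit_pos += bit_width;
    }
    out
}

fn main() {
    // 3-bit values 1, 2, 3 packed LSB-first occupy bits 0..9 = [0xD1, 0x00]
    println!("{:?}", unpack(&[0xD1, 0x00], 3, 3)); // [1, 2, 3]
}
```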
For required fields with PLAIN encoding, the current read_batch code in ColumnReaderImpl goes into an infinite loop. When the field is required it has no repetition/definition levels, so values_to_read keeps being set to 0, which makes the buffer slice &[values_read..values_read + values_to_read] empty. Decoding an empty buffer returns 0, so the loop never progresses.
The fix is to set values_to_read explicitly when the field is required (the assertion is optional):
diff --git a/src/column/reader.rs b/src/column/reader.rs
index 9d36918..2a230e8 100644
--- a/src/column/reader.rs
+++ b/src/column/reader.rs
@@ -154,6 +154,8 @@ impl<'a, T: DataType> ColumnReaderImpl<'a, T> where T: 'static {
}
}
}
+ } else {
+ values_to_read = batch_size;
}
if self.descr.max_rep_level() > 0 && rep_levels.is_some() {
diff --git a/src/encodings/decoding.rs b/src/encodings/decoding.rs
index cf98276..a76ace8 100644
--- a/src/encodings/decoding.rs
+++ b/src/encodings/decoding.rs
@@ -127,6 +127,7 @@ default impl<T: DataType> Decoder<T> for PlainDecoder<T> {
#[inline]
fn get(&mut self, buffer: &mut [T::T]) -> Result<usize> {
assert!(self.data.is_some());
+ assert!(buffer.len() != 0);
let data = self.data.as_mut().unwrap();
let num_values = cmp::min(buffer.len(), self.num_values);
Here is the schema information for which it fails.
version: 1
num of rows: 4
created by: parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)
message spark_schema {
REQUIRED INT32 a;
REQUIRED INT64 b;
OPTIONAL BYTE_ARRAY c;
REQUIRED group d {
REQUIRED INT32 a;
REQUIRED INT64 b;
OPTIONAL BYTE_ARRAY c;
}
REQUIRED group e (LIST) {
REPEATED group list {
REQUIRED INT32 element;
}
}
}
We have a decoder for this but not an encoder.
Some of the test setup, such as the one in column/reader.rs, is quite complicated and hard to understand. We should add more high-level comments for future reference.
PARQUET-906 introduced a new logical type representation which we should consider upgrading to. Parquet-MR is also going to work on this. This requires us to upgrade to Parquet format 2.5.0.
We made several important changes on the code base recently:
I think it may be a good time to publish version 0.2.0.
From the benchmarks, Brotli compression performance is pretty bad compared to other codecs. We should investigate and improve it.
Looks like the build fails on the latest nightly version:
nightly-x86_64-unknown-linux-gnu (default)
rustc 1.25.0-nightly (27a046e93 2018-02-18)
I get the following errors:
error[E0599]: no method named `set_data` found for type `encodings::decoding::PlainDecoder<T>` in the current scope
--> src/column/reader.rs:443:18
|
443 | dictionary.set_data(page.buffer().clone(), num_values as usize)?;
| ^^^^^^^^
|
::: src/encodings/decoding.rs:83:1
|
83 | pub struct PlainDecoder<T: DataType> {
| ------------------------------------ method `set_data` not found for this
|
= note: the method `set_data` exists but the following trait bounds were not satisfied:
`encodings::decoding::PlainDecoder<T> : encodings::decoding::Decoder<_>`
= help: items from traits can only be used if the trait is implemented and in scope
= note: the following trait defines an item `set_data`, perhaps you need to implement it:
candidate #1: `encodings::decoding::Decoder`
error[E0275]: overflow evaluating the requirement `encodings::decoding::PlainDecoder<T>: encodings::decoding::Decoder<T>`
--> src/column/reader.rs:446:24
|
446 | decoder.set_dict(Box::new(dictionary))?;
| ^^^^^^^^^^^^^^^^^^^^
|
= note: required because of the requirements on the impl of `encodings::decoding::Decoder<T>` for `encodings::decoding::PlainDecoder<T>`
= note: required for the cast to the object type `encodings::decoding::Decoder<T>`
error: aborting due to 2 previous errors
error: Could not compile `parquet`.
With a new Rust project I usually go look at the examples first. I think it would be helpful to have some simple examples for reading parquet files. Perhaps one column-oriented and one row-oriented?
Currently we have tests in src/column/reader.rs, which are split into 3 main functions: test_plain, test_dict and test_read_batch. We should refactor the common functionality and improve the test structure, so it is easy to add new tests in the future, e.g. for different encodings or types.
Relates to #48.
I cloned the repo, ran cargo clean then cargo build, but then cannot run the parquet-read binary.
andy@freedom:~/git/parquet-rs$ cargo build
Compiling cc v1.0.9
Compiling libc v0.2.40
Compiling cfg-if v0.1.2
Compiling void v1.0.2
Compiling alloc-no-stdlib v1.2.0
Compiling num-traits v0.2.2
Compiling integer-encoding v1.0.5
Compiling lazy_static v1.0.0
Compiling byteorder v1.2.2
Compiling parquet v0.1.0 (file:///home/andy/git/parquet-rs)
Compiling try_from v0.2.2
Compiling byteorder v0.5.3
Compiling quick-error v1.2.1
Compiling x86intrin v0.4.5
Compiling log v0.4.1
Compiling unreachable v0.1.1
Compiling brotli-decompressor v1.2.1
Compiling snap v0.2.4
Compiling num_cpus v1.8.0
Compiling rand v0.4.2
Compiling num-traits v0.1.43
Compiling log v0.3.9
Compiling ordered-float v0.5.0
Compiling threadpool v1.7.1
Compiling miniz-sys v0.1.10
Compiling thrift v0.0.4
Compiling brotli v1.2.0
Compiling flate2 v0.2.20
Finished dev [unoptimized + debuginfo] target(s) in 23.33 secs
andy@freedom:~/git/parquet-rs$ ./target/debug/parquet-read
./target/debug/parquet-read: error while loading shared libraries: libstd-990ef2cc9d91a9e2.so: cannot open shared object file: No such file or directory
I do not have this libstd library. The closest I have are:
$ find /usr -name "libstd*.so"
/usr/lib/x86_64-linux-gnu/coreutils/libstdbuf.so
/usr/lib/gcc/x86_64-linux-gnu/5/libstdc++.so