dask / fastparquet

This project forked from jcrobak/parquet-python

738.0 21.0 173.0 30.01 MB

python implementation of the parquet columnar file format.

License: Apache License 2.0

Python 84.90% Thrift 6.97% Makefile 0.02% Cython 8.11%
hacktoberfest

fastparquet's Introduction

fastparquet

fastparquet is a Python implementation of the parquet format, aiming to integrate into Python-based big data workflows. It is used implicitly by the projects Dask, Pandas and intake-parquet.

We offer a high degree of support for the features of the parquet format, and very competitive performance, in a small install size and codebase.

Details of this project, how to use it and comparisons to other work can be found in the documentation.

Requirements

(all development is against recent versions in the default anaconda channels and/or conda-forge)

Required:

  • numpy
  • pandas
  • cython >= 0.29.23 (if building from pyx files)
  • cramjam
  • fsspec

Supported compression algorithms:

  • Available by default:
    • gzip
    • snappy
    • brotli
    • lz4
    • zstandard
  • Optionally supported

Installation

Install using conda, to get the latest compiled version:

conda install -c conda-forge fastparquet

or install from PyPI:

pip install fastparquet

You may wish to install numpy first, to help pip's resolver. This may install an appropriate wheel, or compile from source. For the latter, you will need a suitable C compiler toolchain on your system.

You can also install the latest version from GitHub:

pip install git+https://github.com/dask/fastparquet

in which case you should also have cython to be able to rebuild the C files.

Usage

Please refer to the documentation.

Reading

from fastparquet import ParquetFile
pf = ParquetFile('myfile.parq')
df = pf.to_pandas()
df2 = pf.to_pandas(['col1', 'col2'], categories=['col1'])

You may specify which columns to load, and which of those to keep as categoricals (if the data uses dictionary encoding). The file path can be a single file, a metadata file pointing to other data files, or a directory (tree) containing data files. The last of these is what hive/spark typically produce.

Writing

from fastparquet import write
write('outfile.parq', df)
write('outfile2.parq', df, row_group_offsets=[0, 10000, 20000],
      compression='GZIP', file_scheme='hive')

The default is to produce a single output file with a single row-group (i.e., logical segment) and no compression. At the moment, only simple data-types and plain encoding are supported, so expect performance to be similar to numpy.savez.

History

This project forked in October 2016 from parquet-python, which was not designed for vectorised loading of big data or parallel access.

fastparquet's People

Contributors

adamhooper, alonmkovrr, ap--, cmenguy, dargueta, dscottcs, eriknw, esc, generalpiston, gianlucaficarelli, indera, jaguarx, jbrockmendel, jcrist, jcrobak, jonathanunderwood, jrbourbeau, jsignell, kylebarron, lithomas1, mariusvniekerk, martindurant, moriyoshi, mrocklin, phofl, pitrou, sdementen, tomaugspurger, viveshok, yohplala

fastparquet's Issues

support for zero-length dataframes

If nrows of the input dataframe is zero, you get an error.
We should investigate whether a zero-length parquet table is valid (i.e., can apache-drill or other load it), and if so, allow creation of such files. If not, the exception raised should be more informative.

Snappy doesn't release the GIL

I believe that python-snappy does not release the GIL. This can be a significant performance blocker for multi-threaded workers. The direct fix to this would be to improve the upstream library, but there might be another workaround. For example, blosc might have direct access.

@esc I know this isn't your focus these days, but can we get GIL-free access to raw compression libraries through Blosc? Other people without blosc are (de)compressing the data, so we can't use blosc headers and such.

Can't read the parquet directory without _metadata file

When I try to open a directory of parquet files, it crashes if no _metadata file is found:

(lambda2) [ec2-user@ ~]$ cat start.py
#!/usr/bin/python
from fastparquet import ParquetFile
fn = "/home/ec2-user/parquet-gzip/"
#fn = "/home/ec2-user/parquet-gzip/part-00000-test-gz.parquet"
pf = ParquetFile(fn)
df = pf.to_pandas()
print (df)
(lambda2) [ec2-user@ ~]$ ls -lah parquet-gzip/
total 21M
drwxr-xr-x  2 ec2-user ec2-user 4.0K Feb 22 13:31 .
drwx------ 14 ec2-user ec2-user 4.0K Feb 22 13:34 ..
-rw-r--r--  1 ec2-user ec2-user  11M Feb 22 12:09 part-00000-test.gz.parquet
-rw-r--r--  1 ec2-user ec2-user  11M Feb 22 12:09 part-00001-test.gz.parquet
(lambda2) [ec2-user@ ~]$ python start.py
Traceback (most recent call last):
  File "start.py", line 5, in <module>
    pf = ParquetFile(fn)
  File "/home/ec2-user/lambda2/local/lib/python2.7/dist-packages/fastparquet/api.py", line 52, in __init__
    with open_with(fn, 'rb') as f:
  File "/home/ec2-user/lambda2/local/lib/python2.7/dist-packages/fastparquet/util.py", line 39, in default_open
    return open(f, mode)
IOError: [Errno 21] Is a directory: '/home/ec2-user/parquet-gzip/'
(lambda2) [ec2-user@ ~]$

Output from strace:

3741  open("/home/ec2-user/parquet-gzip//_metadata", O_RDONLY) = -1 ENOENT (No such file or directory)

Would it be difficult to implement support for directories without this file? After some research I suspect they are not obligatory:

https://forums.databricks.com/questions/6494/do-you-need-a-metadata-file-in-a-parquet-folder-to.html
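One possible workaround, sketched here with only the standard library, is to discover the data files yourself and open them one by one. The helper name `list_parquet_parts` and the suffix list are assumptions for illustration, not part of fastparquet; files starting with "_" or "." (such as _metadata or _SUCCESS markers) are skipped.

```python
import os
import tempfile


def list_parquet_parts(directory, suffixes=(".parquet", ".parq")):
    """Return sorted paths of data files directly inside `directory`.

    Hypothetical helper: names starting with "_" or "." are treated as
    metadata or marker files and ignored.
    """
    parts = []
    for name in sorted(os.listdir(directory)):
        if name.startswith(("_", ".")):
            continue
        if name.endswith(suffixes):
            parts.append(os.path.join(directory, name))
    return parts


# Small demonstration with empty placeholder files.
with tempfile.TemporaryDirectory() as tmp:
    for name in ("part-00001-test.gz.parquet",
                 "part-00000-test.gz.parquet",
                 "_SUCCESS"):
        open(os.path.join(tmp, name), "wb").close()
    found = [os.path.basename(p) for p in list_parquet_parts(tmp)]
```

Each returned path could then be passed to ParquetFile individually and the resulting dataframes concatenated.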

decimal type conversion

Hi,

the decimal to float type conversion fails for decimals with high precision. Here's the schema for the failing column:

converted_type: 5
field_id: None
name: foobar
num_children: None
precision: 38
repetition_type: 1
scale: 1
type: 7
type_length: 16

Calling pf.to_pandas() raises:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
 in ()
----> 1 ba_cast = t.foobar.cast(np.float32)

/home/jwersdoerfer/miniconda3/envs/bidding/lib/python3.5/site-packages/ibis/expr/api.py in cast(arg, target_type)
    384 def cast(arg, target_type):
    385     # validate
--> 386     op = _ops.Cast(arg, target_type)
    387 
    388     if op.args[1] == arg.type():

/home/jwersdoerfer/miniconda3/envs/bidding/lib/python3.5/site-packages/ibis/expr/types.py in __init__(self, *args)
    302 
    303     def __init__(self, *args):
--> 304         args = self._validate_args(args)
    305         Node.__init__(self, args)
    306 

/home/jwersdoerfer/miniconda3/envs/bidding/lib/python3.5/site-packages/ibis/expr/types.py in _validate_args(self, args)
    309             return args
    310 
--> 311         return self.input_type.validate(args)
    312 
    313     def root_tables(self):

/home/jwersdoerfer/miniconda3/envs/bidding/lib/python3.5/site-packages/ibis/expr/rules.py in validate(self, args)
    345             args = list(args) + [t.default for t in self.types[n:]]
    346 
--> 347         return self._validate(args, self.types)
    348 
    349     def _validate(self, args, types):

/home/jwersdoerfer/miniconda3/envs/bidding/lib/python3.5/site-packages/ibis/expr/rules.py in _validate(self, args, types)
    351         for i, validator in enumerate(types):
    352             try:
--> 353                 clean_args[i] = validator.validate(clean_args, i)
    354             except IbisTypeError as e:
    355                 exc = e.args[0]

/home/jwersdoerfer/miniconda3/envs/bidding/lib/python3.5/site-packages/ibis/expr/rules.py in validate(self, args, i)
    302                 return arg
    303 
--> 304         return self._validate(args, i)
    305 
    306     def _validate(self, args, i):

/home/jwersdoerfer/miniconda3/envs/bidding/lib/python3.5/site-packages/ibis/expr/rules.py in _validate(self, args, i)
    712             arg = arg.lower()
    713 
--> 714         arg = args[i] = dt.validate_type(arg)
    715         return arg
    716 

/home/jwersdoerfer/miniconda3/envs/bidding/lib/python3.5/site-packages/ibis/expr/datatypes.py in validate_type(t)
    449         return t
    450 
--> 451     parsed_type = _parse_type(t)
    452     if parsed_type is not None:
    453         return parsed_type

/home/jwersdoerfer/miniconda3/envs/bidding/lib/python3.5/site-packages/ibis/expr/datatypes.py in _parse_type(t)
    480 def _parse_type(t):
    481     for parse_fn in _type_parsers:
--> 482         parsed = parse_fn(t)
    483         if parsed is not None:
    484             return parsed

/home/jwersdoerfer/miniconda3/envs/bidding/lib/python3.5/site-packages/ibis/expr/datatypes.py in _parse_decimal(t)
    463 
    464 def _parse_decimal(t):
--> 465     m = _DECIMAL_RE.match(t)
    466     if m:
    467         precision, scale = m.groups()

TypeError: expected string or bytes-like object

It's possible to fix this with this change to converted_types.py:

    if ctype == parquet_thrift.ConvertedType.DECIMAL:
        scale_factor = 10**-se.scale
        if se.precision > 18:
            data = np.array(
                [int.from_bytes(d, byteorder='big', signed=False)
                 for d in data])

But I don't know if this is the right way to do it. I've played around with np.fromstring, but couldn't get it to work.
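For reference, here is a minimal pure-Python sketch of decoding one fixed-length DECIMAL value. The Parquet format stores the unscaled value as a big-endian two's-complement integer, so this sketch uses signed=True; with signed=False, negative values would be misread. The function name `decode_decimal` is hypothetical, and returning `decimal.Decimal` rather than float is a choice made here to avoid losing digits at precision 38.

```python
from decimal import Decimal


def decode_decimal(raw, scale):
    """Decode one big-endian two's-complement DECIMAL value exactly."""
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    sign = 1 if unscaled < 0 else 0
    digits = tuple(int(d) for d in str(abs(unscaled)))
    # The tuple constructor is exact; it is not rounded to the context precision.
    return Decimal((sign, digits, -scale))


# 16-byte values, matching type_length: 16 in the schema above.
raw_pos = (12345).to_bytes(16, byteorder="big", signed=True)
raw_neg = (-12345).to_bytes(16, byteorder="big", signed=True)
raw_big = (10**37 + 1).to_bytes(16, byteorder="big", signed=True)  # 38 digits

pos = decode_decimal(raw_pos, 1)
neg = decode_decimal(raw_neg, 1)
big = decode_decimal(raw_big, 1)
```

A vectorised version would be needed for real column data; this only illustrates the byte layout.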

regards,
jochen

SNAPPY Compression does not work on Windows

I am using fastparquet 0.0.5 installed today from conda-forge with Python 3.6 from the Anaconda distribution.

When I attempt to use SNAPPY compression on a Windows machine using:

fastparquet.write('c:/temp/test.pqt', df_in, file_scheme='simple', compression='SNAPPY')

I get the following:

RuntimeError: Compression 'SNAPPY' not available. Options: ['GZIP', 'UNCOMPRESSED']

Encoding text fails with object_encoding='bytes'

In [1]: import pandas as pd

In [2]: import fastparquet

In [3]: df = pd.DataFrame({'x': ['a', 'ab']})

In [4]: fastparquet.write('foo.parq', df, object_encoding='bytes')
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-4-628d67a81840> in <module>()
----> 1 fastparquet.write('foo.parq', df, object_encoding='bytes')

/home/mrocklin/workspace/fastparquet/fastparquet/writer.py in write(filename, data, row_group_offsets, compression, file_scheme, open_with, mkdirs, has_nulls, write_index, partition_on, fixed_text, append, object_encoding)
    710     if file_scheme == 'simple':
    711         write_simple(filename, data, fmd, row_group_offsets,
--> 712                      compression, open_with, has_nulls, append)
    713     elif file_scheme == 'hive':
    714         if append:

/home/mrocklin/workspace/fastparquet/fastparquet/writer.py in write_simple(fn, data, fmd, row_group_offsets, compression, open_with, has_nulls, append)
    622                    else None)
    623             rg = make_row_group(f, data[start:end], fmd.schema,
--> 624                                 compression=compression)
    625             if rg is not None:
    626                 fmd.row_groups.append(rg)

/home/mrocklin/workspace/fastparquet/fastparquet/writer.py in make_row_group(f, data, schema, compression)
    536                 comp = compression
    537             chunk = write_column(f, data[column.name], column,
--> 538                                  compression=comp)
    539             rg.columns.append(chunk)
    540     rg.total_byte_size = sum([c.meta_data.total_uncompressed_size for c in

/home/mrocklin/workspace/fastparquet/fastparquet/writer.py in write_column(f, data, selement, compression, object_encoding)
    452     start = f.tell()
    453     bdata = definition_data + repetition_data + encode[encoding](
--> 454             data, selement)
    455     try:
    456         if encoding != 'PLAIN_DICTIONARY' and num_nulls == 0:

/home/mrocklin/workspace/fastparquet/fastparquet/writer.py in encode_plain(data, se)
    216     out = convert(data, se)
    217     if se.type == parquet_thrift.Type.BYTE_ARRAY:
--> 218         return b''.join([struct.pack('<l', len(x)) + x for x in out])
    219     else:
    220         return out.tobytes()

/home/mrocklin/workspace/fastparquet/fastparquet/writer.py in <listcomp>(.0)
    216     out = convert(data, se)
    217     if se.type == parquet_thrift.Type.BYTE_ARRAY:
--> 218         return b''.join([struct.pack('<l', len(x)) + x for x in out])
    219     else:
    220         return out.tobytes()

TypeError: can't concat bytes to str
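The failure comes from prefixing a 4-byte length to a str rather than to bytes. A minimal sketch of the plain BYTE_ARRAY layout that tolerates text input is shown below; the function name `encode_plain_byte_array` and the choice to encode str values as UTF-8 are assumptions for illustration, not the fix that was adopted in the library.

```python
import struct


def encode_plain_byte_array(values, encoding="utf8"):
    """Length-prefix each value (little-endian int32) and concatenate."""
    out = []
    for x in values:
        if isinstance(x, str):
            # Text must become bytes before it can be joined to the prefix.
            x = x.encode(encoding)
        out.append(struct.pack("<l", len(x)) + x)
    return b"".join(out)


encoded = encode_plain_byte_array(["a", "ab"])
```

An alternative design would be to reject str input outright with a clear error when object_encoding='bytes' is requested.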

Update dependencies

  • add cython to required packages
  • put minimum versions on numba (0.29?) and pandas (0.18?)
  • add py36 to build; currently requires thriftpy from conda-forge (and python-snappy also, if doing all tests)
  • add py27 to build; requires #66 to be merged and perhaps more testing

np.datetime64 vs pd.Timestamp

It appears that we convert to datetime64s. Is this intentional? Do we roundtrip cleanly to pandas timestamps with timezones?

Rename partitions= to divisions=

I'm not sure if this term is Parquet-specific or taken from Dask. If the latter then divisions= is probably better as it refers to the values at which to split the dataframe. Partition is somewhat synonymous with row-group.

fixed-length bytes type

numpy dtypes like S3, |S7, |U10 are only sporadically supported in pandas, and generally get coerced to object type. Writing fixed-length will generally be much faster than variable, so we need to provide an option for what to do with string-like columns:

  • write as variable-length
  • convert to categorical (user could have done that)
  • convert to fixed-length (user could have done this with difficulty), possibly with pd.lib.max_len_string_array to find required length.
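The third option can be sketched in plain Python as follows. The helper `to_fixed_width` is hypothetical; it finds the longest encoded value (or checks a user-supplied width) and right-pads with null bytes, which is also how numpy's S dtypes pad shorter values.

```python
def to_fixed_width(values, width=None, encoding="utf8"):
    """Encode values and pad each to a common byte width.

    Returns (width, list_of_bytes). If `width` is given, no scan for the
    maximum is needed beyond validating that every value fits.
    """
    encoded = [v.encode(encoding) if isinstance(v, str) else bytes(v)
               for v in values]
    longest = max((len(b) for b in encoded), default=0)
    if width is None:
        width = longest
    elif longest > width:
        raise ValueError("value of %d bytes exceeds width %d" % (longest, width))
    return width, [b.ljust(width, b"\x00") for b in encoded]


width, padded = to_fixed_width(["a", "abc"])
```

Letting the user pass the width avoids the scan over the data, which matters when the column is large or distributed.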

Fastparquet write :AttributeError: 'Series' object has no attribute 'valid'

Hi,
I am using a Dask data-frame as follows:
fastparquet.write(parquet_dir, X_df, file_scheme='hive',compression="SNAPPY")

Stack trace:
AttributeError Traceback (most recent call last)
in ()
2 parquet_dir=createNewDir()
3 print parquet_dir
----> 4 fastparquet.write(parquet_dir, X_df, file_scheme='hive',compression="SNAPPY")
5
6 # X_df.to_parquet(parquet_dir, compression='SNAPPY')

/usr/local/lib/python2.7/dist-packages/fastparquet/writer.pyc in write(filename, data, row_group_offsets, compression, file_scheme, open_with, mkdirs, has_nulls, write_index, partition_on, fixed_text, append, object_encoding, times)
747 fmd = make_metadata(data, has_nulls=has_nulls, ignore_columns=ignore,
748 fixed_text=fixed_text, object_encoding=object_encoding,
--> 749 times=times)
750
751 if file_scheme == 'simple':

/usr/local/lib/python2.7/dist-packages/fastparquet/writer.pyc in make_metadata(data, has_nulls, ignore_columns, fixed_text, object_encoding, times)
614 else:
615 se, type = find_type(data[column], fixed_text=fixed,
--> 616 object_encoding=oencoding, times=times)
617 col_has_nulls = has_nulls
618 if has_nulls is None:

/usr/local/lib/python2.7/dist-packages/fastparquet/writer.pyc in find_type(data, fixed_text, object_encoding, times)
90 elif dtype == "O":
91 if object_encoding == 'infer':
---> 92 object_encoding = infer_object_encoding(data)
93
94 if object_encoding == 'utf8':

/usr/local/lib/python2.7/dist-packages/fastparquet/writer.pyc in infer_object_encoding(data)
181
182 def infer_object_encoding(data):
--> 183 head = data[:10] if isinstance(data, pd.Index) else data.valid()[:10]
184 if all(isinstance(i, STR_TYPE) for i in head) and not PY2:
185 return "utf8"

AttributeError: 'Series' object has no attribute 'valid'

Is it not possible to write a dask data-frame directly from fastparquet?

Doc for Spark compatibility

I'm trying to make a dataset with fastparquet that can be easily read by both dask.dataframe and spark. I'm running into some issues because fastparquet appears to support parts of the Parquet spec that Spark does not. (fixed-len-bytes, run length encoding?) I anticipate getting questions about this set of landmines in the future. It would be useful to have a document describing the general situation around the uneven coverage of Parquet readers and a list of features to avoid if cross-system compatibility is desired.

Timestamp metadata and Spark

OK, so I create a pandas dataframe that has a timestamp column. I save this to parquet using fastparquet and then read the data with Spark. I find that my spark dataframe identifies my timestamp column as an integer column. Is there perhaps some special metadata that Spark is looking out for?

Example

In [1]: import pandas as pd

In [2]: import pyspark

In [3]: import fastparquet

In [4]: df = pd.DataFrame({'x': [1, 2, 3]})

In [5]: df['x'] = pd.to_datetime(df.x)

In [6]: df
Out[6]: 
                              x
0 1970-01-01 00:00:00.000000001
1 1970-01-01 00:00:00.000000002
2 1970-01-01 00:00:00.000000003

In [7]: sc = pyspark.SparkContext('local[4]')
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
17/02/27 17:13:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/02/27 17:13:11 WARN Utils: Your hostname, carbon resolves to a loopback address: 127.0.1.1; using 192.168.1.115 instead (on interface wlp4s0)
17/02/27 17:13:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
17/02/27 17:13:11 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.

In [8]: sql = pyspark.SQLContext(sc)

In [9]: fastparquet.write('foo.parquet', df)

In [10]: sdf = sql.read.parquet('foo.parquet')
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

In [11]: sdf
Out[11]: DataFrame[x: bigint]

handle indexes

A pandas dataframe has an index, which we simply ignore for now. Suggest a "with index" option on write, which puts the index into a column. Also should check is_monotonic_[in/de]creasing on each column, including the index, and note this sorting in the metadata.

On load enable choice of index: a specific column, a numeric index, or any sorted index, if present. Whether a column is sorted should show up in the ParquetFile instance, to enable the user's choice.
For loading with dask, if the index is a sorted column or numeric index, can fill in the partition divisions. (also, we don't set the numeric index offsets in each partition, they all start at 0, which is not an important problem, but still inconsistent)

Non-string column names

We currently assume that column names are strings when writing, because within parquet they must be. If columns are integers or something else, this causes an exception.

Thanks, @teh for pointing this out
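One way to handle this, sketched below under the assumption that converting names with str() is acceptable, is to stringify the column labels before writing and to fail early if two labels collapse to the same name. `stringify_column_names` is a hypothetical helper, not an existing fastparquet function.

```python
def stringify_column_names(columns):
    """Convert column labels to str, refusing ambiguous results."""
    names = [str(c) for c in columns]
    if len(set(names)) != len(names):
        raise ValueError(
            "column names are not unique after conversion to str: %r" % names)
    return names


names = stringify_column_names([0, 1, "x"])
```

With pandas, the result could be assigned back with `df.columns = names` on a copy of the dataframe before calling write.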

Regression: pre-allocation fails when int column contains NULLs

Previously, a column was converted to float when we encountered any NULLs. Now we pre-allocate, so this is not possible, and we either get the value -0 (current master) or "can't set integer to NaN" exception (branch low_level_pre_allocate).

Solution: if an int column is OPTIONAL, and we don't know for certain that it has zero NULLs (from column statistics), we must allocate a float column up front. Note that this will never happen with data we write: pandas integer columns never contain NULLs.
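The rule above can be written as a small decision function. This is only a sketch of the logic: the name `choose_read_dtype` and the `null_count` argument (taken from column statistics, or None when statistics are absent) are assumptions for illustration.

```python
def choose_read_dtype(is_optional, null_count=None):
    """Pick the dtype to pre-allocate for an integer parquet column."""
    if not is_optional:
        return "int64"      # REQUIRED columns cannot contain NULLs
    if null_count == 0:
        return "int64"      # statistics guarantee there are no NULLs
    return "float64"        # NULLs possible: need room for NaN


required = choose_read_dtype(is_optional=False)
unknown = choose_read_dtype(is_optional=True)
clean = choose_read_dtype(is_optional=True, null_count=0)
```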

backend filesystem specifics

On the local filesystem, the path separator may be '/' or '\' (os.sep), but on other filesystems (s3fs, hdfs3, adlfs...) it is generally '/'. Also, some filesystems require making directories before writing to them (local, hdfs), but others do not (s3fs, adlfs); if writing in directory trees split by partitioning, we cannot expect the user to premake the directories.
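A minimal sketch of the two concerns, assuming '/' is used internally everywhere: `join_path` normalises separators, and a per-backend `mkdirs` callable is either a real directory creation or a no-op. Both helper names are hypothetical. Note that blindly replacing backslashes would be wrong on POSIX filesystems where a backslash is a legal filename character.

```python
import os


def join_path(*parts):
    """Join path components using '/' regardless of platform."""
    parts = [p.replace("\\", "/") for p in parts if p]
    if not parts:
        return ""
    head = parts[0].rstrip("/")
    tail = [p.strip("/") for p in parts[1:]]
    return "/".join([head] + [t for t in tail if t])


def local_mkdirs(path):
    """Local disk (and similar backends) need the directory to exist."""
    os.makedirs(path, exist_ok=True)


def noop_mkdirs(path):
    """Object stores typically have no real directories to create."""


hive_path = join_path("bucket/data", "year=2017", "part.0.parquet")
windows_path = join_path("C:\\temp\\", "x")
```

The writer could then call whichever `mkdirs` the caller supplies before writing each partition directory.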

Code review

fastparquet was forked from parquet-python, but underwent a large amount of redevelopment to make use of pandas and numba for performance. Along the way, a lot of functionality has been added, mostly listed in #1. As functionality was added, some parts of the code have become unwieldy and make adding further functionality less than straightforward. This issue is to allow for comments on how better to structure the code and what kind of refactoring might be useful. I will list a few items of concern that I have at the moment.

  • There should be testing for a much wider range of cases, e.g., the recent discovery that we could not round-trip categorical columns that have only one category. There is a lack of test data to run against, so we have to become good at generating test data which catches all corner cases.
  • some large functions have exploded, with too many if/elif chain statements and nesting. Of particular concern:
    • core.read_col
    • writer.write_column
  • There is a certain amount of duplication in dask.dataframe.io.parquet, where similar but distinct things happen compared to read/write here. Some concerns in dask, such as the importance of the index, may be concerns for pure fastparquet users.
  • The top-level functions have many optional keyword parameters, which are then being passed down, the list having expanded as features become available. This is ugly and error-prone, and there are some cases (e.g., creating a schema element versus writing a column chunk) where keywords have to reach different parts of the code, but do something very similar
  • some functions are dual-purpose, depending on a keyword flag. Of particular concern:
    • core.read_col (grab_dict)
    • writer.find_type (convert)
  • there seem to be more numpy/pandas<->parquet type maps around than necessary
  • use of the numba jitclass as a way to have a file-like experience (i.e., keeping track of the current position) with reading and writing binary data

If you are interested I have some changes I made in a fork off of your repo a while back

This is in https://github.com/bdrosen96/parquet-python . I'm not sure if any of these changes might be useful, but if so feel free to take any of them. Some of the changes include:

Python 3 support

Switch from thrift to thriftpy

Boolean array

Filesystem class

Reader class with support for multiple files.

Proper support for dictionary

Optimization using cython for some operations for significant performance gain. (order of magnitude)

Writing massive files

I'm using fastparquet and it's great so far. My only problem is that I need to write massive files to disk. Because of how the fastparquet.write method works, I need to prepare the whole dataframe in memory, and it doesn't fit even in the really expensive Azure VM. Writing to separate files is not an option for me (customer requirements). Is there a way to write in chunks, e.g., one Series at a time? Much appreciated.
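One approach that may help is to write the first chunk normally and add later chunks with the `append` keyword, which appears in the write signature shown in the tracebacks elsewhere on this page. The sketch below assumes each chunk is a pandas dataframe with the same columns and dtypes; the helper names `iter_chunk_bounds` and `write_in_chunks` are hypothetical, and fastparquet is imported lazily inside the function.

```python
def iter_chunk_bounds(n_rows, chunk_size):
    """Yield (start, end) row ranges covering n_rows in order."""
    for start in range(0, n_rows, chunk_size):
        yield start, min(start + chunk_size, n_rows)


def write_in_chunks(path, frames):
    """Write an iterable of dataframes into a single parquet file.

    Only one chunk needs to be in memory at a time. Assumes every
    chunk shares the same schema.
    """
    from fastparquet import write  # imported here so the sketch loads without it

    first = True
    for df in frames:
        # The first call creates the file; later calls add row groups to it.
        write(path, df, append=not first)
        first = False


bounds = list(iter_chunk_bounds(25, 10))
```

`frames` can be a generator that loads or computes each range from `iter_chunk_bounds` on demand, so the full dataset never has to be materialised.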

Optimizations

Some ideas to think about which should make things work faster. Some of these are readily implementable, others more tricky

  • storing strings as fixed-length; see #30. In the first instance, we should give this option to the user, where they know beforehand what string length they would like (or they are prepared to find out), so that we don't have to calculate it, which could be very costly in the dask case. (implemented)
  • we don't have a fast way to encode or decode variable-length strings, hence the previous item; if we did, some issues may go away. The trouble is, there is no way to access the underlying strings pointed to by a numpy object array from within a numba-jitted function
  • datetime columns can be constructed using the pd.DatetimeIndex directly rather than to_datetime, but care will need to be taken for what happens with NaT encoded in the integers and NULL values (special case in read_col, not only checking for nan) (should be easy) PR #33
  • currently we read everything into numpy arrays and convert immediately into pandas Series, so that we can do things like conversions using map. In this, pandas creates a default index, which is compared when putting the series together into the final dataframe. The cost of this has not been measured, but given that dask wants knowledge of the index anyway, and we have some code to deal with that, we could create the index up front and apply it to series as they are made, or keep things as arrays further on (in fact, arrays may be simpler and faster in some cases). PR #33

Confusing error on utf-8 encoding

People sometimes use utf-8 rather than utf8. It might be worth thinking of other ways in which we can defend against bad inputs with informative errors:

In [1]: import fastparquet

In [2]: import pandas as pd

In [3]: df = pd.DataFrame({'x': ['a', 'ab']})

In [4]: fastparquet.write('foo.parq', df, object_encoding='utf-8')
---------------------------------------------------------------------------
UnboundLocalError                         Traceback (most recent call last)
<ipython-input-4-f4f0e309cee5> in <module>()
----> 1 fastparquet.write('foo.parq', df, object_encoding='utf-8')

/home/mrocklin/workspace/fastparquet/fastparquet/writer.py in write(filename, data, row_group_offsets, compression, file_scheme, open_with, mkdirs, has_nulls, write_index, partition_on, fixed_text, append, object_encoding)
    706     ignore = partition_on if file_scheme != 'simple' else []
    707     fmd = make_metadata(data, has_nulls=has_nulls, ignore_columns=ignore,
--> 708                         fixed_text=fixed_text, object_encoding=object_encoding)
    709 
    710     if file_scheme == 'simple':

/home/mrocklin/workspace/fastparquet/fastparquet/writer.py in make_metadata(data, has_nulls, ignore_columns, fixed_text, object_encoding)
    584         else:
    585             se, type = find_type(data[column], fixed_text=fixed,
--> 586                                  object_encoding=oencoding)
    587         if has_nulls is None:
    588             se.repetition_type = type == parquet_thrift.Type.BYTE_ARRAY

/home/mrocklin/workspace/fastparquet/fastparquet/writer.py in find_type(data, fixed_text, object_encoding)
    110         raise ValueError("Don't know how to convert data type: %s" % dtype)
    111     se = parquet_thrift.SchemaElement(
--> 112             name=data.name, type_length=width,
    113             converted_type=converted_type, type=type,
    114             repetition_type=parquet_thrift.FieldRepetitionType.REQUIRED)

UnboundLocalError: local variable 'width' referenced before assignment
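A defensive check along these lines could run before any schema is built. This is a sketch only: the set of accepted names below is an assumption for illustration and may not match the library's actual list, and `normalize_object_encoding` is a hypothetical helper. It uses codecs.lookup so that spellings such as 'utf-8' or 'UTF8' map to 'utf8'.

```python
import codecs

# Assumed set of accepted values, for illustration only.
KNOWN_ENCODINGS = {"infer", "bytes", "utf8", "json", "bson",
                   "bool", "int", "float", "decimal"}


def normalize_object_encoding(value):
    """Return a canonical object_encoding or raise an informative error."""
    if value in KNOWN_ENCODINGS:
        return value
    if isinstance(value, str):
        try:
            if codecs.lookup(value).name == "utf-8":
                return "utf8"
        except LookupError:
            pass  # not a codec name at all; fall through to the error
    raise ValueError(
        "object_encoding=%r not understood; expected one of %s"
        % (value, sorted(KNOWN_ENCODINGS)))


fixed = normalize_object_encoding("utf-8")
```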

Read directly from row group

OK, so as we have it now any system that has to serialize a dask graph with ParquetFile objects can get very expensive. Each task has megabytes of metadata in it.

In [69]: from fastparquet import ParquetFile

In [70]: %time pf = ParquetFile('/home/mrocklin/data/nyc/indexed-full/')
CPU times: user 1.11 s, sys: 8 ms, total: 1.12 s
Wall time: 1.11 s

In [71]: %time len(cloudpickle.dumps(pf))
CPU times: user 1.56 s, sys: 20 ms, total: 1.58 s
Wall time: 1.58 s
Out[71]: 2082175

Currently we have the parquet file in each task, so everyone gets this cost, which is unfortunate when reads are in the 10ms range. My guess is that this cost increases the more RowGroups we have, and presumably these can become quite large.

So we can't have these in each task of the graph. There are a few options:

  1. We can put it in the graph once, and depend on the system to move it around between workers
  2. We can add opening the parquet file to each reading task. This adds 1s of overhead per task
  3. We can make a new function that skips making the ParquetFile, and instead knows to go to a particular row group on disk. This would be ideal if we can make it fast.

Any thoughts on how to do option 3 @martindurant ?
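To illustrate why option 3 is attractive, the sketch below compares the pickled size of a task that carries the whole metadata with one that carries only a path and a row-group index. The metadata here is an invented stand-in built from plain dicts and lists, not a real ParquetFile, and the numbers are only indicative.

```python
import pickle

# Invented stand-in for parsed dataset metadata (builtin containers only).
heavy_metadata = {
    "path": "/data/indexed-full/",
    "row_groups": [
        {"index": i,
         "num_rows": 100_000,
         "columns": ["c%d" % j for j in range(20)]}
        for i in range(500)
    ],
}


def make_tasks(metadata):
    """Build one small (path, row_group_index) descriptor per row group."""
    return [(metadata["path"], rg["index"]) for rg in metadata["row_groups"]]


tasks = make_tasks(heavy_metadata)

# Bytes shipped per task under each design.
per_task_heavy = len(pickle.dumps((heavy_metadata, 0)))
per_task_light = len(pickle.dumps(tasks[0]))
```

A real implementation would also need enough information in the descriptor (for example byte offsets and the schema) for the reader to decode the row group without re-parsing the full footer.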

'categories' information empty in ParquetFile repr

If I write a df with a categorical column

> d = DataFrame({'A': [1, 2, 3], 'B': ['a', 'b', 'c']}).assign(B=lambda x: x.B.astype('category'))
> d.dtypes
A       int64
B    category
dtype: object
> fn = '/tmp/test.parq'
> fastparquet.write(fn, d)

then create a ParquetFile, the 'categories' entry in the repr shows nothing. Does categories here refer to something else?

> pf = fastparquet.ParquetFile(fn)
> pf
<Parquet File: {'categories': [], 'rows': 3, 'columns': ['A', 'B'], 'name': '/tmp/test.parq'}>

(I noticed one needs to manually designate categorical columns when converting to df, so I'd like to find a way to store this as metadata and convert to categorical automatically without too much manual bookkeeping)

First impression feedback

Just took this for a spin, here is some feedback. Some of this is administrative and probably obvious to you. I'm listing it here just for completeness, not to prioritize:

Administrative

  • Needs nicer __repr__ implementation
  • Should move code out of __init__.py
  • Should maybe move tests within parquet/tests ?
  • Should probably keep test data outside of the package (I think that this is already done, just wanted to check)
  • Need to remove references to personal namespace (mdurant shows up in the tests)
  • How are we going to package this long-term? Do we need a different name than the existing parquet project?

Performance

  • The from_delayed call needs to be passed metadata to avoid triggering a local computation
  • The from_delayed call would like to be passed divisions= information if we can get it from the parquet file. We might want to sniff statistics within the parquet file for sorted columns and, if we find exactly one, make it the index automatically?

Python 2 compatibility?

I haven't audited the codebase thoroughly, but I was wondering if this project aims to support Python 2.7?

I know that this module relies on gzip.compress, which is not present in 2.7. If this is the only obstacle, would it be too burdensome to allow the user to forgo gzip compression in order to run on 2.7? I gather gzip compression for parquet files is a fairly rare choice anyway, with performance stats showing that gzipped files can be slower to work with.

The primary motivation for the request is some issues with using pyodbc on Python 3 (encoding, surprise), which would be used as the primary mechanism for reading the data to be written to parquet.

Can't open a single file from parquet directory

I opened issue #89, which was closed with a suggestion to read the files individually. I have tried to apply that solution, but it fails as shown below.

In [11]: f = '/Users/rafal/Dev/parquet-gzip/part-00000-25585821-d1e0-45e9-b18d-82ed994ea89d.gz.parquet'
In [12]: pf = fastparquet.ParquetFile(f)
---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
<ipython-input-12-883c968c7a26> in <module>()
----> 1 pf = fastparquet.ParquetFile(f)

/Users/rafal/anaconda2/lib/python2.7/site-packages/fastparquet/api.pyc in __init__(self, fn, verify, open_with, sep)
     51             self.fn = fn
     52             with open_with(fn, 'rb') as f:
---> 53                 self._parse_header(f, verify)
     54         if all(rg.columns[0].file_path is None for rg in self.row_groups):
     55             self.file_scheme = 'simple'

/Users/rafal/anaconda2/lib/python2.7/site-packages/fastparquet/api.pyc in _parse_header(self, f, verify)
     90             for chunk in rg.columns:
     91                 self.group_files.setdefault(i, set()).add(chunk.file_path)
---> 92         self.helper = schema.SchemaHelper(self.schema)
     93         self.selfmade = self.created_by == "fastparquet-python"
     94         self._read_partitions()

/Users/rafal/anaconda2/lib/python2.7/site-packages/fastparquet/schema.pyc in __init__(self, schema_elements)
     22         self.schema_elements_by_name = dict(
     23             [(se.name, se) for se in schema_elements])
---> 24         assert len(self.schema_elements) == len(self.schema_elements_by_name)
     25
     26     def schema_element(self, name):

AssertionError:

In [13]:

Performance question on ParquetFile.dtypes

This is the result of calling dd.read_parquet(...)

         5617237 function calls (5394398 primitive calls) in 7.391 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
   534360    4.412    0.000    4.412    0.000 api.py:331(<listcomp>)
      366    0.711    0.002    5.134    0.014 api.py:321(dtypes)
   442323    0.304    0.000    0.535    0.000 __init__.py:10(readall)
 181873/5    0.288    0.000    2.037    0.407 compact.py:274(read_val)
  28126/1    0.227    0.000    2.037    2.037 compact.py:247(read_struct)
   168004    0.173    0.000    0.592    0.000 compact.py:163(read_field_begin)
   126008    0.159    0.000    0.496    0.000 compact.py:69(read_varint)
   442323    0.133    0.000    0.668    0.000 __init__.py:31(read)

Most of the time is spent in def dtypes(self)

    @property
    def dtypes(self):
        """ Implied types of the columns in the schema """
        dtype = {f.name: converted_types.typemap(f)
                 for f in self.schema if f.num_children is None}
        for col, dt in dtype.copy().items():
            if dt.kind == 'i':
                # int columns that may have nulls become float columns
                num_nulls = 0
                for rg in self.row_groups:
                    chunks = [c for c in rg.columns  # <<--- This is the main culprit
                              if c.meta_data.path_in_schema[-1] == col]
                    for chunk in chunks:
                        if chunk.meta_data.statistics is None:
                            num_nulls = True
                            break
                        if chunk.meta_data.statistics.null_count is None:
                            num_nulls = True
                            break
                        num_nulls += chunk.meta_data.statistics.null_count
                if num_nulls:
                    dtype[col] = np.dtype('f%i' % max(dt.itemsize, 2))
        for cat in self.cats:
            dtype[cat] = "category"
        return dtype

Can we reduce this cost somehow? Is this value immutable? Can we cache it?
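
One possible direction, sketched on simplified stand-in data rather than the real thrift objects: walk the row groups once and accumulate null counts per column, instead of re-filtering rg.columns for every (column, row group) pair. Here each row group is a list of (column_name, null_count) tuples, with None meaning "no statistics"; the function names are hypothetical. The result could be cached on the ParquetFile if the metadata is indeed immutable, which is the open question above.

```python
def null_counts_by_column(row_groups):
    """Total null count per column in a single pass; None means unknown."""
    totals = {}
    for row_group in row_groups:
        for name, null_count in row_group:
            if null_count is None:
                # missing statistics: the total can no longer be known
                totals[name] = None
            elif totals.get(name, 0) is not None:
                totals[name] = totals.get(name, 0) + null_count
    return totals


def may_have_nulls(row_groups):
    """True for columns with unknown or non-zero null counts."""
    return {name: total is None or total > 0
            for name, total in null_counts_by_column(row_groups).items()}
```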

Unable to read file produced by Impala

Here's a link to the file: https://www.dropbox.com/s/fazbrkegbzzb7az/airlines2.parq?dl=0, let me know after you've downloaded it so I can delete from my Dropbox folder

In [6]: fp.ParquetFile('airlines2.parq').to_pandas()
---------------------------------------------------------------------------
IndexError                                Traceback (most recent call last)
<ipython-input-6-61849c4e0274> in <module>()
----> 1 fp.ParquetFile('airlines2.parq').to_pandas()

/home/wesm/anaconda3/envs/parquet-test/lib/python3.5/site-packages/fastparquet/api.py in to_pandas(self, columns, categories, filters, index)
    293                              for (name, v) in views.items()}
    294                     self.read_row_group(rg, columns, categories, infile=f,
--> 295                                         index=index, assign=parts)
    296                     start += rg.num_rows
    297         else:

/home/wesm/anaconda3/envs/parquet-test/lib/python3.5/site-packages/fastparquet/api.py in read_row_group(self, rg, columns, categories, infile, index, assign)
    151         core.read_row_group(
    152                 infile, rg, columns, categories, self.helper, self.cats,
--> 153                 self.selfmade, index=index, assign=assign)
    154         if ret:
    155             return df

/home/wesm/anaconda3/envs/parquet-test/lib/python3.5/site-packages/fastparquet/core.py in read_row_group(file, rg, columns, categories, schema_helper, cats, selfmade, index, assign)
    300         raise RuntimeError('Going with pre-allocation!')
    301     read_row_group_arrays(file, rg, columns, categories, schema_helper,
--> 302                           cats, selfmade, assign=assign)
    303 
    304     for cat in cats:

/home/wesm/anaconda3/envs/parquet-test/lib/python3.5/site-packages/fastparquet/core.py in read_row_group_arrays(file, rg, columns, categories, schema_helper, cats, selfmade, assign)
    289         read_col(column, schema_helper, file, use_cat=use,
    290                  selfmade=selfmade, assign=out[name],
--> 291                  catdef=out[name+'-catdef'] if use else None)
    292 
    293 

/home/wesm/anaconda3/envs/parquet-test/lib/python3.5/site-packages/fastparquet/core.py in read_col(column, schema_helper, infile, use_cat, grab_dict, selfmade, assign, catdef)
    248     for defi, rep, val, d in out:
    249         if d and not use_cat:
--> 250             cval = dic[val]
    251         elif do_convert:
    252             cval = convert(val, se)

IndexError: index 240 is out of bounds for axis 0 with size 155

parallel write possibility

Since fastparquet provides file partitioning, it should in theory be possible to write multiple pandas dataframes (with a common schema) in parallel. Is it? And if so, are there any plans for this?

Questions: How to read parq file into dask data frame?

Hello, I have a question about usage. I want to read a parq file into a dask dataframe. For now it seems I can only use df = pf.to_pandas() and then df = dd.from_pandas(df). Is it possible to read a parq file directly into a dask dataframe? Also, is to_pandas() the only way to use the data? In that case the data are read into memory, right? Or am I doing it wrong, and there are ways to use the data directly from disk? Thank you very much!

Fastparquet can not read parquet folder generated by Apache Drill

Hi,
I created a folder with parquet files using Apache drill.
However, if the folder is given as a parameter to Fastparquet, an error is thrown that it can not read a folder.
The parquet folder is as follows:
image

And the stack trace:
image

Is it not possible to read ALL the files inside a folder? Am I supposed to read file by file ... ?

Thanks,

partition count

The current default is to put all data into a single row_group, with one data-page per column. A data-page can hold at most 2**31-1 (about 2 billion) rows, because the count is a signed int32, and a large dataset could exceed that. Other knock-on metadata values, such as the size in bytes of a chunk, may also be affected.

Options: cope with the violation (or raise an informative exception), make the default "auto" partitioning guaranteed to be small enough, or introduce a new "rows_per_row_group" parameter with a reasonable default value.
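
A rough sketch of the third option, assuming a plain row count is available up front. The function name and the default of 50 million rows are placeholders, not a decision:

```python
MAX_ROWS_PER_PAGE = 2**31 - 1  # largest value a signed int32 can hold


def row_group_offsets(n_rows, rows_per_row_group=50 * 10**6):
    """Return the starting row of each row group.

    Raises ValueError if the requested group size could overflow the
    int32 row count of a data-page.
    """
    if not 1 <= rows_per_row_group <= MAX_ROWS_PER_PAGE:
        raise ValueError(
            "rows_per_row_group must be between 1 and %d" % MAX_ROWS_PER_PAGE)
    # an empty input yields no row groups at all
    return list(range(0, n_rows, rows_per_row_group))
```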

Taking dependency on FastParquet in production

Hello:

It is awesome that you folks have added native read/write between Pandas dataframes and Parquet. As I read it, this is still in alpha and interfaces are expected to change.

What is the plan for package version upgrades if such breaking changes happen? I am hoping that they will not affect existing ones.

Thanks,
MC

unable to merge parquet files located in the working directory

fastparquet fails with the following error:
File "...\fastparquet\writer.py", line 854, in write_common_metadata
with open_with(fn, 'wb') as f:
File "...\fastparquet\util.py", line 39, in default_open
return open(f, mode)
PermissionError: [Errno 13] Permission denied: '\\_metadata'

The paths for '_metadata' and '_common_metadata' are generated in writer.py on lines 920 and 924 by joining an empty basepath with the listed file names.

The issue goes away if the join is done only for a non-empty basepath.

Thanks for fixing.
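
A minimal sketch of the suggested fix, written as a standalone helper rather than a patch against writer.py. `join_path` is a hypothetical name and the explicit `sep` argument is an assumption:

```python
def join_path(basepath, name, sep="/"):
    """Join basepath and name, leaving name untouched if basepath is empty."""
    if not basepath:
        # joining '' with '_metadata' would produce sep + '_metadata',
        # i.e. a path at the filesystem root
        return name
    return basepath.rstrip(sep) + sep + name
```

With an empty basepath this returns '_metadata' (relative to the working directory) instead of '\_metadata'.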

schema element name problem on nested elements

Trying to parse a parquet file, and it falls over on a nested element.

  File ".../fastparquet/schema.py", line 28, in schema_element
    return self.schema_elements_by_name[name]
KeyError: 'ids.eid'

When I look at the schema, the element in question believes its name to be 'eid', not 'ids.eid'.

(Pdb) p self.schema
...
<class 'parquet_thrift.SchemaElement'>
converted_type: None
field_id: None
name: ids
num_children: 4
precision: None
repetition_type: 1
scale: None
type: None
type_length: None
, <class 'parquet_thrift.SchemaElement'>
converted_type: 0
field_id: None
name: eid
num_children: None
precision: None
repetition_type: 1
scale: None
type: 6
type_length: None
, 
...

I shall try to create a test example which fails, and then provide a fix.

Question: which is the correct behaviour?

  1. element name should be 'eid' in the schema and the code that is looking for 'ids.eid' is wrong; or
  2. element name should be 'ids.eid' in the schema and the code that is looking for 'ids.eid' is right; or
  3. something else
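
For what it's worth, in the parquet thrift definition a SchemaElement carries only its local name, while column chunks refer to columns through path_in_schema, a list of names. So a dotted lookup has to be built by walking the flattened schema. A minimal sketch, assuming the list is depth-first with the root element first; the namedtuple is a stand-in for the thrift class and `dotted_names` is a hypothetical helper:

```python
from collections import namedtuple

# Stand-in for parquet_thrift.SchemaElement with only the fields used here.
SchemaElement = namedtuple("SchemaElement", ["name", "num_children"])


def dotted_names(schema_elements):
    """Map dotted paths such as 'ids.eid' to their schema elements."""
    result = {}
    elements = iter(schema_elements)
    root = next(elements)

    def walk(prefix, n_children):
        for _ in range(n_children):
            element = next(elements)
            path = prefix + [element.name]
            result[".".join(path)] = element
            if element.num_children:
                walk(path, element.num_children)

    walk([], root.num_children or 0)
    return result
```

Keying by full path also keeps two leaves with the same local name (for example a top-level 'eid' next to 'ids.eid') from overwriting each other.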

ValueError: Don't know how to convert data type: uint64

fastparquet refuses to write a parquet file if the data frame has columns with dtype uint64.

I am getting the following error:

File "..\fastparquet\writer.py", line 114, in find_type
raise ValueError("Don't know how to convert data type: %s" % dtype)
ValueError: Don't know how to convert data type: uint64

With file_scheme='hive', appends slow down notably after around 200 times

Is there any way to append without updating the metadata? I am using the append mode with hive file scheme, but after 1-200 appends the performance is degrading, and I am fairly sure it is to do with read/manipulate/writing of the metadata. Is there any scope for an "append and finalise later" kind of mode?

Eliminate copies

To investigate: unnecessary copies are made in the following cases:

  • assign[:] = data.astype(...) conversion for things like uint8; assign already has correct dtype, astype causes copy
  • object decoding (not fixed-string) can be done in place; less of an effect, since the references in the object array are probably small compared to the object data itself. Would need an output-array parameter added to the cython functions
  • time-shift functions could write directly into output.

For all of these, note that if there are nulls, the output is actually assign[bool_indices], which is a copy unless it is used only as an assignment target (i.e., setitem).
Stats max/min also calls convert, but in that case there is no preassigned output array.

Add ability to output Hive/Impala compatible timestamps

Hey,

I am doing some work with Amazon's Athena (Presto under the hood) and using fastparquet to convert JSON files to parquet format. However, when I output datetime64[ns] fields and read them in Athena, I get incorrect results:

Actual Dates:

"2016-08-08 23:08:22"
"2016-08-08 23:08:22"
"2016-08-08 23:08:22"
"2016-08-08 23:08:22"
"2016-08-08 23:08:22"
"2016-08-08 23:08:22"
"2016-08-08 23:08:23"
"2016-08-08 23:08:22"
"2016-08-08 23:08:22"
"2016-08-08 23:08:22"

Returned Dates:

"+48575-01-04 19:26:40.000"
"+48575-01-04 19:26:40.000"
"+48575-01-04 19:26:40.000"
"+48575-01-04 19:26:40.000"
"+48575-01-04 19:26:40.000"
"+48575-01-04 19:26:40.000"
"+48575-01-04 19:43:20.000"
"+48575-01-04 19:26:40.000"
"+48575-01-04 19:26:40.000"
"+48575-01-04 19:26:40.000"

This is because Hive (and Impala) use a different timestamp format (int96) than the Parquet default. Check out these posts for more details:

  1. http://stackoverflow.com/questions/28292876/hives-timestamp-is-same-as-parquets-timestamp
  2. https://community.mapr.com/thread/18883-getting-weird-output-for-date-timestamp-data-type-columns-while-selecting-data-from-parquet-file-in-drill
  3. https://github.com/Parquet/parquet-mr/issues/218

It would be helpful if it used the compatible TIMESTAMP format when writing with file_scheme='hive'.
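
For reference, a minimal sketch of the int96 layout as I understand it: 12 bytes, little-endian, with 8 bytes of nanoseconds within the day followed by 4 bytes of Julian day number. The helper names are hypothetical, the datetimes are naive (no timezone handling), and precision is limited to microseconds because datetime cannot hold more.

```python
import datetime
import struct

JULIAN_DAY_OF_EPOCH = 2440588  # Julian day number of 1970-01-01
EPOCH = datetime.datetime(1970, 1, 1)


def datetime_to_int96(dt):
    """Pack a naive datetime into 12 bytes: nanoseconds of day, Julian day."""
    delta = dt - EPOCH
    nanos_of_day = (delta.seconds * 10**6 + delta.microseconds) * 1000
    return struct.pack("<qi", nanos_of_day, delta.days + JULIAN_DAY_OF_EPOCH)


def int96_to_datetime(raw):
    """Unpack 12 bytes written by datetime_to_int96."""
    nanos_of_day, julian_day = struct.unpack("<qi", raw)
    return EPOCH + datetime.timedelta(
        days=julian_day - JULIAN_DAY_OF_EPOCH,
        microseconds=nanos_of_day // 1000)
```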

Status and roadmap

Features to be implemented.
An asterisk shows the next item(s) on the list.
A question mark shows something that might (almost) work, but isn't tested.

  • python2 compatibility

Reading

  • Types of encoding ( https://github.com/Parquet/parquet-format/blob/master/Encodings.md )
    • plain
    • bitpacked/RLE hybrid
    • dictionary
      • decode to values
      • make into categoricals
    • delta (needs test data)
    • delta-length byte-array (needs test data)
  • compression algorithms (gzip, lzo, snappy, brotli)
  • nulls
  • repeated/list values (*)
  • map, key-value types
  • multi-file (hive-like)
    • understand partition tree structure
      • filtering by partitions
    • parallelized for dask
  • filtering by statistics
  • converted/logical types
  • alternative file-systems
  • index handling

Writing

  • primitive types
  • converted/logical types
  • encodings (selected by user)
    • plain (default)
    • dictionary encoding (default for categoricals)
    • delta-length byte array (should be much faster for variable strings)
      • delta encoding (depends on reading delta encoding)
  • nulls encoding (for dtypes that don't accept NaN)
  • choice of compression
    • per column
  • multi-file
    • partitions on categoricals
    • parallelize for dask
      • partitions and division for dask
  • append
    • single-file
    • multi-file
    • consolidate files into logical data-set
  • alternative file-systems

Admin

  • packaging
    • pypi, conda
  • README
  • documentation
    • RTD
    • API documentation and doc-strings
    • Developer documentation (everything you need to run tests)
    • List of parquet features not yet supported to establish expectations
  • Announcement blogpost with example

Features not to be attempted

  • nested schemas (maybe can find a way to flatten or encode as dicts)
  • choice of encoding on write? (keep it simple)
  • schema evolution

Reading parquet file containing categories or fixed-length bytes fails

While trying to use the dask interface for fastparquet to write some census data to file, I think I ran into an issue reading certain dtypes. The data consists of two float64 columns and a column of length-one strings. Writing the data to a file raises no error, but loading fails if the string column is encoded as either S1 or category:

import dask.dataframe as dd
from dask.dataframe.io.parquet import read_parquet, to_parquet
df = dd.read_hdf('./census_sample.h5', key='census')
to_parquet('./census.parq/', df)
df = read_parquet('./census.parq/')  # category dtype fails here
df.compute()  # S1 dtype fails here

error: unpack requires a bytes object of length 4

Using object as the dtype works fine but is ~6x slower. I've uploaded the data sample here.

ValueError when reading parquet files generated by another pipeline

I'm trying to use fastparquet with pandas to analyze data generated by another data pipeline.
But when I tried to load the parquet files with fastparquet, I encountered the error below.

parquet_df.append(s3util.extract_to_pandas(path='/data/s3fs/warehouse/ott_user_info/year=%s/month=%s/day=%s' % (year,month,day)).drop_duplicates())
File "/usr/local/wechat_profit_analyze/wechat_where.py", line 22, in extract_to_pandas
dfarr.append(pf.to_pandas())
File "/root/anaconda3/lib/python3.5/site-packages/fastparquet/api.py", line 201, in to_pandas
for rg in rgs]
File "/root/anaconda3/lib/python3.5/site-packages/fastparquet/api.py", line 201, in <listcomp>
for rg in rgs]
File "/root/anaconda3/lib/python3.5/site-packages/fastparquet/api.py", line 132, in read_row_group
self.selfmade, index=index)
File "/root/anaconda3/lib/python3.5/site-packages/fastparquet/core.py", line 302, in read_row_group
cats, selfmade)
File "/root/anaconda3/lib/python3.5/site-packages/fastparquet/core.py", line 291, in read_row_group_arrays
selfmade=selfmade)
File "/root/anaconda3/lib/python3.5/site-packages/fastparquet/core.py", line 208, in read_col
skip_nulls)
File "/root/anaconda3/lib/python3.5/site-packages/fastparquet/core.py", line 106, in read_data_page
definition_levels, num_nulls = read_def(io_obj, daph, helper, metadata)
File "/root/anaconda3/lib/python3.5/site-packages/fastparquet/core.py", line 67, in read_def
daph.num_values, bit_width)[:daph.num_values]
File "/root/anaconda3/lib/python3.5/site-packages/fastparquet/core.py", line 48, in read_data
encoding.read_rle_bit_packed_hybrid(fobj, bit_width, o=o)
ValueError: cannot assign slice from input of different size
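
The traceback ends in read_rle_bit_packed_hybrid. For reference, here is a minimal pure-Python sketch of the RLE/bit-packed hybrid encoding as described in the parquet-format Encodings document. It is not fastparquet's implementation, and `read_varint` and `decode_hybrid` are names made up here. One detail that may be relevant: a bit-packed run always holds a multiple of 8 values, so the last run can carry padding beyond the expected count and the decoder has to truncate. Whether that is the cause of the error above is only a guess.

```python
def read_varint(buf, pos):
    """Read an unsigned LEB128 varint; return (value, new position)."""
    result = 0
    shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def decode_hybrid(buf, bit_width, count):
    """Decode up to `count` values from RLE/bit-packed hybrid bytes."""
    out = []
    pos = 0
    value_bytes = (bit_width + 7) // 8
    mask = (1 << bit_width) - 1
    while len(out) < count and pos < len(buf):
        header, pos = read_varint(buf, pos)
        if header & 1:
            # bit-packed run: (header >> 1) groups of 8 values, LSB first
            groups = header >> 1
            nbytes = groups * bit_width
            packed = int.from_bytes(buf[pos:pos + nbytes], "little")
            pos += nbytes
            for i in range(groups * 8):
                out.append((packed >> (i * bit_width)) & mask)
        else:
            # RLE run: one value repeated (header >> 1) times
            run = header >> 1
            value = int.from_bytes(buf[pos:pos + value_bytes], "little")
            pos += value_bytes
            out.extend([value] * run)
    # drop any padding from a final bit-packed run
    return out[:count]
```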

Error if a partition contains no rows

Combining row-wise and group partitioning can lead to make_row_group being called with a dataframe of zero rows. In that case it should return None, no file should be created, and the corresponding row_group should not be included in the master metadata.

Python2 support?

I didn't see any python2 package on conda, so I installed with pip, but that doesn't seem to work (with errors suggesting 2/3 issues; see below.) Should python2 work?

$ pip install fastparquet
Collecting fastparquet
  Downloading fastparquet-0.0.1.post2-py2.py3-none-any.whl
Requirement already satisfied: numba in .../anaconda/lib/python2.7/site-packages (from fastparquet)
Requirement already satisfied: numpy in .../anaconda/lib/python2.7/site-packages (from fastparquet)
Requirement already satisfied: pandas in .../anaconda/lib/python2.7/site-packages (from fastparquet)
Collecting thriftpy>=0.3.6 (from fastparquet)
  Downloading thriftpy-0.3.9.tar.gz (208kB)
Requirement already satisfied: llvmlite in .../anaconda/lib/python2.7/site-packages (from numba->fastparquet)
Requirement already satisfied: enum34 in .../anaconda/lib/python2.7/site-packages (from numba->fastparquet)
Requirement already satisfied: singledispatch in .../anaconda/lib/python2.7/site-packages (from numba->fastparquet)
Requirement already satisfied: funcsigs in .../anaconda/lib/python2.7/site-packages (from numba->fastparquet)
Requirement already satisfied: python-dateutil in .../anaconda/lib/python2.7/site-packages (from pandas->fastparquet)
Requirement already satisfied: pytz>=2011k in .../anaconda/lib/python2.7/site-packages (from pandas->fastparquet)
Requirement already satisfied: ply<4.0,>=3.4 in .../anaconda/lib/python2.7/site-packages (from thriftpy>=0.3.6->fastparquet)
Requirement already satisfied: six in .../anaconda/lib/python2.7/site-packages (from singledispatch->numba->fastparquet)
Building wheels for collected packages: thriftpy
  Running setup.py bdist_wheel for thriftpy ... done
Stored in directory: .../Library/Caches/pip/wheels/ae/3b/73/082d28b917d2886b1c0d3e0dc6f3ca3919b7e1df519580873b
Successfully built thriftpy
Installing collected packages: thriftpy, fastparquet
Successfully installed fastparquet-0.0.1.post2 thriftpy-0.3.9
$ python
Python 2.7.11 |Anaconda 2.5.0 (x86_64)| (default, Dec  6 2015, 18:57:58) 
[GCC 4.2.1 (Apple Inc. build 5577)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Anaconda is brought to you by Continuum Analytics.
Please check out: http://continuum.io/thanks and https://anaconda.org
>>> import fastparquet as fp
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../anaconda/lib/python2.7/site-packages/fastparquet/__init__.py", line 15, in <module>
    from .core import read_thrift
  File ".../anaconda/lib/python2.7/site-packages/fastparquet/core.py", line 8, in <module>
    from . import encoding
  File ".../anaconda/lib/python2.7/site-packages/fastparquet/encoding.py", line 205, in <module>
    Numpy8 = numba.jitclass(spec8)(NumpyIO)
  File ".../anaconda/lib/python2.7/site-packages/numba/jitclass/decorators.py", line 27, in wrap
    return register_class_type(cls, spec, types.ClassType, ClassBuilder)
  File ".../anaconda/lib/python2.7/site-packages/numba/jitclass/base.py", line 140, in register_class_type
    _validate_spec(spec)
  File ".../anaconda/lib/python2.7/site-packages/numba/jitclass/base.py", line 108, in _validate_spec
    raise TypeError("spec keys should be strings, got %r" % (k,))
TypeError: spec keys should be strings, got u'data'
>>> 

ValueError when converting to pandas

When I'm trying to convert data to Pandas DF with pf.to_pandas() I'm getting a ValueError:

...
/home/artem/.pyenv/versions/anaconda3-4.1.1/lib/python3.5/site-packages/fastparquet/core.py in read_data(fobj, coding, count, bit_width)
     46     if coding == parquet_thrift.Encoding.RLE:
     47         while o.loc < count:
---> 48             encoding.read_rle_bit_packed_hybrid(fobj, bit_width, o=o)
     49     else:
     50         raise NotImplementedError('Encoding %s' % coding)

ValueError: cannot assign slice from input of different size

ParquetFile:

<Parquet File: {'rows': 147638, 'columns': ['id', 'avg_price', 'max_price', 'min_price', 'array'], 'name': '/data/hotel_price.parquet/_metadata', 'categories': []}>

read partitioned parquet directories

Hi, can I read a partitioned parquet dataset (a tree of directories) WITHOUT a metadata file? I get the parquet collection from Spark.
For example:
test.parq
├─date=20150105
├─date=20150106
├─date=20150107
which contains 3 partitions.
Thanks.
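
To illustrate what "understanding" such a tree involves, here is a small sketch that recovers the partition values from key=value directory names. It is not fastparquet code; `parse_partitions` is a hypothetical helper and the data file name in the usage note is made up.

```python
def parse_partitions(path, sep="/"):
    """Extract key=value partition fields from the directories in a path."""
    fields = {}
    # the last component is the data file itself, so skip it
    for piece in path.split(sep)[:-1]:
        if "=" in piece:
            key, value = piece.split("=", 1)
            fields[key] = value
    return fields
```

For example, parse_partitions('test.parq/date=20150105/part-0.parquet') gives {'date': '20150105'}.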

cython speedups failing

While testing the 0.5 release with a script I had previously used without issue, I came across the following error:

  File "/home/yalwan/.virtualenvs/fastpq/lib/python3.5/site-packages/fastparquet/writer.py", line 781, in write
    compression=compression)
  File "/home/yalwan/.virtualenvs/fastpq/lib/python3.5/site-packages/fastparquet/writer.py", line 580, in make_part_file
    rg = make_row_group(f, data, schema, compression=compression)
  File "/home/yalwan/.virtualenvs/fastpq/lib/python3.5/site-packages/fastparquet/writer.py", line 568, in make_row_group
    compression=comp)
  File "/home/yalwan/.virtualenvs/fastpq/lib/python3.5/site-packages/fastparquet/writer.py", line 484, in write_column
    data, selement)
  File "/home/yalwan/.virtualenvs/fastpq/lib/python3.5/site-packages/fastparquet/writer.py", line 246, in encode_plain
    out = convert(data, se)
  File "/home/yalwan/.virtualenvs/fastpq/lib/python3.5/site-packages/fastparquet/writer.py", line 154, in convert
    out = array_encode_utf8(data)
  File "fastparquet/speedups.pyx", line 58, in fastparquet.speedups.array_encode_utf8 (fastparquet/speedups.c:2296)
TypeError: bad argument type for built-in operation

Unable to read dictionary-encoded `FLOAT64` column

Attaching two variants of the same dataset, one generated by Impala and the other generated by parquet-cpp

parquet_files.zip

Reading the parquet-cpp file yields an empty DataFrame. The Impala file raises an exception:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-4-d23670f504e1> in <module>()
----> 1 fp.ParquetFile('dict_encoded_impala.parq').to_pandas()

/home/wesm/anaconda3/lib/python3.5/site-packages/fastparquet/api.py in to_pandas(self, columns, categories, filters, index)
    293                              for (name, v) in views.items()}
    294                     self.read_row_group(rg, columns, categories, infile=f,
--> 295                                         index=index, assign=parts)
    296                     start += rg.num_rows
    297         else:

/home/wesm/anaconda3/lib/python3.5/site-packages/fastparquet/api.py in read_row_group(self, rg, columns, categories, infile, index, assign)
    151         core.read_row_group(
    152                 infile, rg, columns, categories, self.helper, self.cats,
--> 153                 self.selfmade, index=index, assign=assign)
    154         if ret:
    155             return df

/home/wesm/anaconda3/lib/python3.5/site-packages/fastparquet/core.py in read_row_group(file, rg, columns, categories, schema_helper, cats, selfmade, index, assign)
    300         raise RuntimeError('Going with pre-allocation!')
    301     read_row_group_arrays(file, rg, columns, categories, schema_helper,
--> 302                           cats, selfmade, assign=assign)
    303 
    304     for cat in cats:

/home/wesm/anaconda3/lib/python3.5/site-packages/fastparquet/core.py in read_row_group_arrays(file, rg, columns, categories, schema_helper, cats, selfmade, assign)
    289         read_col(column, schema_helper, file, use_cat=use,
    290                  selfmade=selfmade, assign=out[name],
--> 291                  catdef=out[name+'-catdef'] if use else None)
    292 
    293 

/home/wesm/anaconda3/lib/python3.5/site-packages/fastparquet/core.py in read_col(column, schema_helper, infile, use_cat, grab_dict, selfmade, assign, catdef)
    215             skip_nulls = False
    216         defi, rep, val = read_data_page(infile, schema_helper, ph, cmd,
--> 217                                         skip_nulls, selfmade=selfmade)
    218         d = ph.data_page_header.encoding == parquet_thrift.Encoding.PLAIN_DICTIONARY
    219         if use_cat and not d:

/home/wesm/anaconda3/lib/python3.5/site-packages/fastparquet/core.py in read_data_page(f, helper, header, metadata, skip_nulls, selfmade)
    105         skip_definition_bytes(io_obj, daph.num_values)
    106     else:
--> 107         definition_levels, num_nulls = read_def(io_obj, daph, helper, metadata)
    108 
    109     repetition_levels = read_rep(io_obj, daph, helper, metadata)

/home/wesm/anaconda3/lib/python3.5/site-packages/fastparquet/core.py in read_def(io_obj, daph, helper, metadata)
     65             definition_levels = read_data(
     66                     io_obj, daph.definition_level_encoding,
---> 67                     daph.num_values, bit_width)[:daph.num_values]
     68         num_nulls = daph.num_values - (definition_levels ==
     69                                        max_definition_level).sum()

/home/wesm/anaconda3/lib/python3.5/site-packages/fastparquet/core.py in read_data(fobj, coding, count, bit_width)
     46     if coding == parquet_thrift.Encoding.RLE:
     47         while o.loc < count:
---> 48             encoding.read_rle_bit_packed_hybrid(fobj, bit_width, o=o)
     49     else:
     50         raise NotImplementedError('Encoding %s' % coding)

ValueError: cannot assign slice from input of different size

In Python I have:

import pandas.util.testing as tm
import pyarrow.parquet as pq

df1 = pq.read_table('dict_encoded_impala.parq').to_pandas()
df2 = pq.read_table('dict_encoded.parquet').to_pandas()

tm.assert_frame_equal(df1, df2)

and

In [10]: df1.c0.sum()
Out[10]: -5173.725333513341

In Impala I have:

import ibis

hdfs = ibis.hdfs_connect('localhost', port=5070)
con = ibis.impala.connect('localhost', port=21050, hdfs_client=hdfs)

hdfs.mkdir('/tmp/parquet-test-3')
hdfs.put('/tmp/parquet-test-3/data.parquet', 'dict_encoded.parquet')

hdfs.mkdir('/tmp/parquet-test-4')
hdfs.put('/tmp/parquet-test-4/data.parquet', 'dict_encoded_impala.parq')
In [12]: con.parquet_file('/tmp/parquet-test-3').c0.sum().execute()
Out[12]: -5173.725333513341

In [13]: con.parquet_file('/tmp/parquet-test-4').c0.sum().execute()
Out[13]: -5173.725333513341
