eventual-inc / daft

Distributed DataFrame for Python designed for the cloud, powered by Rust

Home Page: https://getdaft.io

License: Apache License 2.0

Python 34.68% Makefile 0.03% Jupyter Notebook 19.36% Shell 0.17% Rust 45.70% Dockerfile 0.06%
machine-learning python data-engineering data-science dataframe distributed-computing rust big-data

daft's Introduction

Daft dataframes can load any data, such as PDF documents, images, protobufs, CSV, Parquet, and audio files, into a tabular dataframe structure for easy querying


Website | Docs | Installation | 10-minute tour of Daft | Community and Support

Daft: Distributed dataframes for multimodal data

Daft is a distributed query engine for large-scale data processing in Python and is implemented in Rust.

  • Familiar interactive API: Lazy Python Dataframe for rapid and interactive iteration
  • Focus on the what: Powerful Query Optimizer that rewrites queries to be as efficient as possible
  • Data Catalog integrations: Full integration with data catalogs such as Apache Iceberg
  • Rich multimodal type-system: Supports multimodal types such as Images, URLs, Tensors and more
  • Seamless Interchange: Built on the Apache Arrow In-Memory Format
  • Built for the cloud: Record-setting I/O performance for integrations with S3 cloud storage

Table of Contents

About Daft

Daft was designed with the following principles in mind:

  1. Any Data: Beyond the usual strings/numbers/dates, Daft columns can also hold complex or nested multimodal data such as Images, Embeddings and Python objects efficiently with its Arrow-based memory representation. Ingestion and basic transformations of multimodal data are extremely easy and performant in Daft.
  2. Interactive Computing: Daft is built for the interactive developer experience through notebooks or REPLs - intelligent caching and query optimizations accelerate your experimentation and data exploration.
  3. Distributed Computing: Some workloads can quickly outgrow your local laptop's computational resources - Daft integrates natively with Ray for running dataframes on large clusters of machines with thousands of CPUs/GPUs.

Getting Started

Installation

Install Daft with pip install getdaft.

For more advanced installations (e.g. installing from source or with extra dependencies such as Ray and AWS utilities), please see our Installation Guide.

Quickstart

Check out our 10-minute quickstart!

In this example, we load images from an AWS S3 bucket's URLs and resize each image in the dataframe:

import daft

# Load a dataframe from filepaths in an S3 bucket
df = daft.from_glob_path("s3://daft-public-data/laion-sample-images/*")

# 1. Download column of image URLs as a column of bytes
# 2. Decode the column of bytes into a column of images
df = df.with_column("image", df["path"].url.download().image.decode())

# Resize each image into 32x32
df = df.with_column("resized", df["image"].image.resize(32, 32))

df.show(3)

Dataframe code to load a folder of images from AWS S3 and create thumbnails

Benchmarks

Benchmarks for SF100 TPCH

To see the full benchmarks, detailed setup, and logs, check out our benchmarking page.

More Resources

  • 10-minute tour of Daft - learn more about Daft's full range of capabilities including dataloading from URLs, joins, user-defined functions (UDF), groupby, aggregations and more.
  • User Guide - take a deep-dive into each topic within Daft
  • API Reference - API reference for public classes/functions of Daft

Contributing

To start contributing to Daft, please read CONTRIBUTING.md

Here's a list of good first issues to get yourself warmed up with Daft. Comment in the issue to pick it up, and feel free to ask any questions!

Telemetry

To help improve Daft, we collect non-identifiable data.

To disable this behavior, set the following environment variable: DAFT_ANALYTICS_ENABLED=0

The data that we collect is:

  1. Non-identifiable: events are keyed by a session ID which is generated on import of Daft
  2. Metadata-only: we do not collect any of our users’ proprietary code or data
  3. For development only: we do not buy or sell any user data

Please see our documentation for more details.

Related Projects

| Dataframe | Query Optimizer | Multimodal    | Distributed | Arrow Backed    | Vectorized Execution Engine | Out-of-core |
|-----------|-----------------|---------------|-------------|-----------------|-----------------------------|-------------|
| Daft      | Yes             | Yes           | Yes         | Yes             | Yes                         | Yes         |
| Pandas    | No              | Python object | No          | optional >= 2.0 | Some (Numpy)                | No          |
| Polars    | Yes             | Python object | No          | Yes             | Yes                         | Yes         |
| Modin     | Eager           | Python object | Yes         | No              | Some (Pandas)               | Yes         |
| Pyspark   | Yes             | No            | Yes         | Pandas UDF/IO   | Pandas UDF                  | Yes         |
| Dask DF   | No              | Python object | Yes         | No              | Some (Pandas)               | Yes         |

Check out our dataframe comparison page for more details!

License

Daft has an Apache 2.0 license - please see the LICENSE file.

daft's People

Contributors

amir-f, asrst, avriiil, chandbud5, clarkzinzow, colin-ho, dependabot[bot], dioptre, felixkleineboesing, gmweaver, jaychia, jeevb, kaytsui, kevinzwang, meepowin, murex971, normallygaussian, nsalerni, pang-wu, ravern, rcurtin, reswqa, samster25, sherlockbeard, suriya-ganesh, xcharleslin, xushiyan


daft's Issues

Support column selection with Distinct and allow running on PyListBlocks

Currently DataFrame.distinct() runs on all columns and takes no arguments. It should instead accept an optional list of columns to run distinct on, as well as a switch to choose the first or last row (see the sketch below).
We should also refactor blocks.py to have a distinct operator that allows PyList blocks on non-distinct columns.

For example if you have a schema that looks like
ID | name | Image, you should be able to run distinct over ID | name.
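
A minimal sketch of what the proposed API could look like (the column arguments and the keep switch are hypothetical, not the current signature):

from daft import DataFrame

df = DataFrame.from_pydict({
    "ID": [1, 1, 2],
    "name": ["a", "a", "b"],
    "Image": [b"img-1", b"img-2", b"img-3"],
})

# Hypothetical: deduplicate on a subset of columns, keeping the first matching row
deduped_df = df.distinct("ID", "name", keep="first")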

API Documentation

Is your feature request related to a problem? Please describe.
Daft currently does not have comprehensive public API documentation for Dataframes and Expressions

Describe the solution you'd like
Documentation should be autogenerated from code docs and published to https://docs.getdaft.io/daft/api

Fix slowdown in notebook display of Images

Describe the bug
Currently we display images by converting the entire image into a base64-encoded PNG bytes array, and only performing resizing using the HTML <img> tag. This leads to slowdown in the display of large images.

To Reproduce
Run the text_to_image_generation tutorial notebook (https://colab.research.google.com/github/Eventual-Inc/Daft/blob/main/notebooks/tutorials/text_to_image/text_to_image_generation.ipynb) and on images_df.show(5) users will notice a significant slowdown in the display of the cell output.

Multi-Column Sorts

Is your feature request related to a problem? Please describe.
Sorts currently only allow for sorting on a single column. This makes it impossible to run some workloads that require multi-column hierarchical sorts.

Describe the solution you'd like
Sorts should allow for multiple columns, e.g. .sort(col("foo"), col("bar")).

Additional context
This currently blocks some TPC-H benchmarking tests that require multi-column sorts.
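
For example (hypothetical until multi-column sorts land):

from daft import DataFrame, col

df = DataFrame.from_pydict({"foo": [1, 1, 2], "bar": [3.0, 1.0, 2.0]})

# Hypothetical: sort by "foo" first, then break ties within each group by "bar"
df = df.sort(col("foo"), col("bar"))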

Improve Pretty Print of DataFrame for Notebooks

Is your feature request related to a problem? Please describe.
Currently when we print a dataframe in a notebook, there are no cell boundaries or styling.

Describe the solution you'd like
Adding some CSS in _repr_html_, similar to Pandas or Polars, would greatly improve readability.


Requirements

Hi,
Can you please tell me what the requirements are when installing Daft? I'm using Python 3.9.
The error I am getting while installing is:

"ERROR: Could not find a version that satisfies the requirement ray==1.13.0 (from getdaft) (from versions: none)
ERROR: No matching distribution found for ray==1.13.0".

Broken links in readme

The [10-minute tour of Daft](https://getdaft.io/learn/10-min.html) link is broken, and so is the quickstart link.

Explode column pruning optimizer bug

Describe the bug

  • When .explode() is followed by a .select which selects a subset of columns, the column pruning optimizer goes into an infinite loop

To Reproduce

#257

Fix type inference for DataFrame.from_pydict and DataFrame.from_pylist

Describe the bug
Type inference breaks for dictionaries and lists such as:

df = DataFrame.from_pydict({"foo": [None, 1, 1]})

In this case, the type is inferred as null because the from_pydict code naively samples only the first element when performing type inference.

We should instead take the union of all types in the column and remove the null type.
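
A rough illustration of the union-based inference (the helper below is hypothetical and maps values to plain Python types; it is not Daft's actual code):

# Hypothetical sketch: infer a column's type from all non-null elements
def infer_column_type(values):
    types = {type(v) for v in values if v is not None}
    if not types:
        return type(None)   # an all-null column stays null-typed
    if len(types) == 1:
        return types.pop()  # e.g. [None, 1, 1] -> int
    raise TypeError(f"Mixed types in column: {types}")

assert infer_column_type([None, 1, 1]) is int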

Cast operator for Expressions

Is your feature request related to a problem? Please describe.
It is currently inconvenient to cast expressions, and users have to rely on UDFs to do so.

Describe the solution you'd like
Expressions should support a .cast method which will correctly perform casting between types. Some examples of cases that need to be handled and well documented:

  • FLOAT -> INTEGER should truncate the floats, and users should have the ability to specify the behavior when encountering NaN (throw an error, return None, default to a given value, etc.)
  • STRING -> INTEGER should coerce each string into an integer, or throw an error at runtime if it fails to do so due to non-numeric characters or integer overflows
  • PY[object] -> INTEGER should check that the DataBlock that backs the column is an ArrowBlock of integer type, or throw an error at runtime otherwise

Additional context

For example, in our Quickstart notebook we cast a LOGICAL column to INTEGER using a UDF in order to perform a summation over the column: https://docs.getdaft.io/daft/quickstart#analytics

@udf(return_type=int)
def bool_to_int(c):
    return c.astype(int)

analysis_df = classified_images_df \
    .with_column("correct", bool_to_int(col("model_classification") == col("label"))) \
    .with_column("wrong", bool_to_int(col("model_classification") != col("label"))) \
    .groupby(col("label")) \
    .agg([
        (col("label").alias("num_rows"), "count"),
        (col("correct"), "sum"),
        (col("wrong"), "sum"),
    ]) \
    .sort(col("label"))

analysis_df.show()
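
If Expressions gained such a .cast method, the bool_to_int UDF above could be replaced with something like the following (hypothetical syntax; this is the API the issue proposes, not an existing one):

# Hypothetical: cast the LOGICAL comparison results directly, no UDF needed
analysis_df = classified_images_df \
    .with_column("correct", (col("model_classification") == col("label")).cast(int)) \
    .with_column("wrong", (col("model_classification") != col("label")).cast(int)) \
    .groupby(col("label")) \
    .agg([
        (col("label").alias("num_rows"), "count"),
        (col("correct"), "sum"),
        (col("wrong"), "sum"),
    ]) \
    .sort(col("label"))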

Fix list literal expressions

Describe the bug
List literal expressions (such as lit([1, 2, 3])) currently do not work, as they get interpreted as a PyListDataBlock with that data instead of a "scalar".

To Reproduce
Steps to reproduce the behavior:

df = DataFrame.from_pydict({
    "foo": [[1], [2], [3]]
})
df.with_column("bar", col("foo") + lit([0, 0, 0]))

Expected behavior
The expected behavior is for each list to be extended with [0, 0, 0], but instead we get an error.

Run S3 testing in unit tests with minio

Is your feature request related to a problem? Please describe.
Certain issues may come up specifically when testing dataframe I/O against S3-compatible datastores.
We should use MinIO to do this testing locally and ensure that our code works with S3.
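
A minimal sketch of what a MinIO-backed test fixture might look like (the endpoint, credentials, and bucket name are placeholders, and a MinIO server is assumed to already be running locally):

import boto3
import pytest

MINIO_ENDPOINT = "http://127.0.0.1:9000"  # placeholder: local MinIO server

@pytest.fixture
def s3_test_bucket():
    # Point the S3 client at the local MinIO endpoint instead of AWS
    client = boto3.client(
        "s3",
        endpoint_url=MINIO_ENDPOINT,
        aws_access_key_id="minioadmin",      # MinIO's default credentials
        aws_secret_access_key="minioadmin",
    )
    client.create_bucket(Bucket="daft-test-bucket")
    client.put_object(Bucket="daft-test-bucket", Key="data.csv", Body=b"a,b\n1,2\n")
    yield "s3://daft-test-bucket"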

Better kernels for mod on arrow numeric types

Is your feature request related to a problem? Please describe.

Our current implementation of modulus on Arrow blocks uses NumPy and relies on a lot of casting back and forth between Arrow and NumPy.

We should clean this up with our own kernels that maintain the same type semantics as the Polars kernels:

mod(int, int) -> int
mod(int, float) -> float
mod(float, int) -> float
mod(float, float) -> float

Code: https://github.com/Eventual-Inc/Daft/blob/d33c85d/daft/runners/blocks.py#L734-L737
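
A rough check of the desired type semantics, expressed with NumPy result dtypes (illustrative only, not the proposed kernel implementation):

import numpy as np

# mod(int, int) -> int; any float operand promotes the result to float
assert (np.array([5, 7]) % np.array([2, 3])).dtype.kind == "i"
assert (np.array([5, 7]) % np.array([2.0, 3.0])).dtype.kind == "f"
assert (np.array([5.0, 7.0]) % np.array([2, 3])).dtype.kind == "f"
assert (np.array([5.0, 7.0]) % np.array([2.0, 3.0])).dtype.kind == "f"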

Allow for `.agg` aggregations over a non-grouped DataFrame

Is your feature request related to a problem? Please describe.
When working with a non-grouped DataFrame, we only have access to .sum() and .mean(). However, oftentimes to understand our data we might want aggregate statistics over the entire dataset, without groups.

Describe the solution you'd like
df.agg(...) should work similarly to how it works for grouped dataframes, and when run over a non-grouped dataframe it should return just one row.
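
A sketch of the proposed usage (hypothetical until .agg works on non-grouped DataFrames):

from daft import DataFrame, col

df = DataFrame.from_pydict({"foo": [1, 2, 3], "bar": [1.5, 2.5, 3.5]})

# Hypothetical: aggregate over the entire DataFrame, returning a single row
summary_df = df.agg([
    (col("foo"), "sum"),
    (col("bar"), "mean"),
])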

Default column name behavior when there is no `.alias()` call

Describe the bug
Not sure if it's by design, but I want to check the default column name behavior for column expressions (when there is no .alias() call).

To Reproduce
For trivial expressions like col("A"), col("B"), it will use the original column name if no .alias() is specified. This is expected (and kind of consistent with SQL, as in SELECT a, b):

>>> from daft import DataFrame, col
>>> df = DataFrame.from_pydict({
...     "A": [1, 2, 3, 4],
...     "B": [1.5, 2.5, 3.5, 4.5]
...  })
>>> df.select(col("A"), col("B")).show(2)
   A    B
0  1  1.5
1  2  2.5

For an expression like col("B") * 2, it will use "B" as the output column name if no .alias() is specified:

>>> df.select(col("A"), col("B") * 2).show(2)
   A    B
0  1  3.0
1  2  5.0

Not sure if this is by design (and whether this behavior will be kept in the future).

In particular, the output column name for col("A") + col("B") will be "A":

>>> df.select(col("B") * 2, col("A") + col("B")).show(2)
     B    A
0  3.0  2.5
1  5.0  4.5

Expected behavior
Not sure what the best strategy is -- should we explicitly require an .alias() call for column expressions like col("A") + col("B")? Some SQL engines will assign column names like _col0, _col1 for things like SELECT b * 2, a + b.

Operations such as .is_null() do not return well-typed results on PY columns

Summary

df = DataFrame.from_pydict({"foo": [MyObj(), MyObj(), None, None]})
df = df.where(~(col("foo").is_null()))

Expected Behavior: The resulting dataframe should contain 2 rows
Observed Behavior: .where fails to run as .is_null() returns a new PY[object] column instead of a Logical column.

Notes

This is happening because our type matrices only work on primitive types at the moment, and any operation on Python type columns defaults to returning a PY[object] type. Instead, we need a way of adding PY types to the type matrix for certain well-typed operations such as .is_null().

Use indexing syntax for getting/setting columns in a Dataframe (e.g. df["foo"])

Is your feature request related to a problem? Please describe.
Syntactic sugar for getting columns:

df.join(df2, left_on=df["foo"], right_on=df2["bar"])

Syntactic sugar for setting columns:

# Equivalent to df = df.with_column("foo", col("bar") + 1)
df["foo"] = df["bar"] + 1

Syntactic sugar for selecting columns:

df = df["foo", col("bar"), df["baz"] + 1]

Describe the solution you'd like
The solution will make it easier to read/write code as users won't have to know .with_column, .select and col(...) to use Daft. This feels a lot more intuitive!

Fix DataFrame.show() display of null integers

Describe the bug
Null integers appear as nan in DataFrame.show() because Daft internally resorts to casting to Pandas before displaying, and since Pandas' default integer dtype does not support nulls, it casts to floats.

To Reproduce

df = DataFrame.from_pydict({
    "A": [1, 2, 3, 4, None],
})
df.show()

Expected behavior
We should show a None instead.
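
One possible direction (a sketch only, not necessarily the fix that will land) is to convert to Pandas' nullable integer extension dtype before rendering:

import pandas as pd

# Pandas' default int64 cannot hold nulls, so [1, 2, 3, 4, None] becomes floats with NaN;
# the nullable Int64 extension dtype keeps the integers and renders missing values as <NA>.
s = pd.Series([1, 2, 3, 4, None], dtype="Int64")
print(s)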


Improve error messages for operator expression type resolution

Is your feature request related to a problem? Please describe.
When certain operators are invalid for the input expression types, the error message is not informative enough.

Example error message:

ExpressionTypeError: Unable to resolve type for operation: len(col(E#31: BYTES))

Describe the solution you'd like
The error message should more clearly describe the operator that caused the error and the input types that were invalid.

True Divide returning integer instead of float

Summary

import numpy as np
from daft import DataFrame, col

df = DataFrame.from_pydict({"foo": [1, 2, 3], "bar": [1, 2, 3]})
pd_df = df.with_column("divided", col("foo") / col("bar")).to_pandas()
assert pd_df["divided"].dtype == np.float_

Expected Behavior: True divide between two integer columns returns a float column
Observed Behavior: True divide between two integer columns returns an integer column, rounded down

Notes

This is happening because the PyArrow compute function we are using exhibits this behavior. We need to make some modifications to our ArrowEvaluator.TRUEDIV operator.

Selection and configuration of backend (PyRunner vs RayRunner)

Is your feature request related to a problem? Please describe.

  1. Users are unable to select the backend that Daft runs on except through an environment variable DAFT_RUNNER=ray|py
  2. Users are unable to configure the backend that Daft runs on (e.g. ray_address="ray://...")

Describe the solution you'd like

  • A Daft context call, daft.context.set_runner_*(...), that can be called only once per process and changes the default Daft global context (see the sketch after this list).
  • Daft should have sensible defaults to run locally on the user's current machine
  • Daft should allow for reading the configurations from environment variables as well
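
A sketch of what the proposed usage might look like (the set_runner_ray / set_runner_py names follow the proposal above; the Ray address is a placeholder):

import daft

# Hypothetical usage of the proposed context API: pick the Ray backend explicitly
daft.context.set_runner_ray(address="ray://my-cluster:10001")

# ...or fall back to the default local runner:
# daft.context.set_runner_py()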

Doc versioning on getdaft.io

Currently we only show the latest docs on getdaft.io. We want to be able to navigate between the docs for different releases.

DataFrame.explode for splatting sequences of data into rows

Is your feature request related to a problem? Please describe.
Many complex data use-cases involve a "flatmap" or "explode" kind of operation. For instance, sampling images from a video, slicing audio clips into interesting segments, cropping boxes in an image etc.

For these use-cases, an "explode" operation is usually used to splat a sequence of items into rows in a DataFrame.

Describe the solution you'd like

+-------------+--------------+
|           a |            b |
+-------------+--------------+
|           4 |    [1, 2, 3] |
+-------------+--------------+

The above dataframe, when exploded on col(b), will look like this:

+-------------+--------------+
|           a |            b |
+-------------+--------------+
|           4 |            1 |
+-------------+--------------+
|           4 |            2 |
+-------------+--------------+
|           4 |            3 |
+-------------+--------------+

This operation should work on columns of a nested type, or PY columns that are iterable.

Additional context
Note that nested types have not yet been implemented, and will not be in scope here.
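
A sketch of the proposed call (hypothetical signature at the time of this issue):

from daft import DataFrame, col

df = DataFrame.from_pydict({"a": [4], "b": [[1, 2, 3]]})

# Hypothetical: splat each element of the list in "b" into its own row,
# repeating the corresponding value of "a" -> rows (4, 1), (4, 2), (4, 3)
exploded_df = df.explode(col("b"))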

Fix loading Parquet file from https URLs

Describe the bug

Currently Parquet files cannot be loaded from an HTTPS URL. Our tutorials, which use the LAION dataset on Hugging Face, resort to first downloading the data to disk.

Allow `.as_py(MyClass)` to run getattr operations

Is your feature request related to a problem? Please describe.
Currently .as_py(MyClass) only allows for either running methods on each object, or indexing each object.

Describe the solution you'd like
.as_py(MyClass).my_property should be valid syntax for accessing the property my_property of every object in the column.

Additional context
This could be especially useful for Protobufs (see: #209)
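
A sketch of the proposed syntax (MyClass and my_property are placeholder names):

from dataclasses import dataclass
from daft import DataFrame, col

@dataclass
class MyClass:
    my_property: int  # placeholder attribute on the Python objects in the column

df = DataFrame.from_pydict({"foo": [MyClass(1), MyClass(2)]})

# Hypothetical: attribute access fanned out over every object in the column
df = df.with_column("prop", col("foo").as_py(MyClass).my_property)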

Build Distinct Logical Plan Operator

  • The PruneColumn pass is trying to optimize out the current way we are doing Distinct, which is via the agg operator and a min aggregation.
  • We should refactor this to be a separate logical plan operator.

Grouped Aggregation for min/max on String is returning NULL

Describe the bug

When we run a grouped aggregation in Polars on strings for min/max, we get nulls where we should get the actual min/max. This is causing our results for the operation to be wrong.
To Reproduce
Steps to reproduce the behavior:
With Polars, run a grouped min/max aggregation over a string column.

Document behavior of operations when run on Null

Is your feature request related to a problem? Please describe.
We need to document the behavior of dataframe operations when run on Null:

  1. Expressions (such as add, subtract, .str.contains(), .url.download() etc)
  2. Joins
  3. Sorts
  4. Groupby
  5. Aggregations
  6. Distinct

Sphinx Documentation on GitHub Pages

Is your feature request related to a problem? Please describe.
Our documentation is split across a separate private repository (api-docs.getdaft.io), GitBook (docs.getdaft.io) and Webflow (www.getdaft.io)

Describe the solution you'd like
Let's centralize our documentation and have everything on Sphinx in just www.getdaft.io, hosted on GitHub Pages from this repository so that our documentation can be updated in lockstep with releases made from Eventual-Inc/Daft.

Perform Stateful UDF initialization once-per-worker instead of once-per-partition

Is your feature request related to a problem? Please describe.
Currently Stateful UDFs are initialized once per execution of a UDF, instead of once per worker initialization. This means that we are unable to amortize the cost of expensive initializations over the multiple partitions that a single worker is processing.

Describe the solution you'd like
Workers should be able to identify stateful UDFs in a given window of execution and run their initializers only once, reusing them across multiple windows.

Additional context
See code in @udf which hardcodes the initializations of stateful UDFs on a per-UDF call basis:

Daft/daft/udf.py

Lines 73 to 79 in 2496baa

# TODO: The initialization of stateful UDFs is currently done on the execution on every partition here,
# but should instead be done on a higher level so that state initialization cost can be amortized across partitions.
try:
    initialized_func = func() if isinstance(func, type) else func
except:
    logger.error(f"Encountered error when initializing user-defined function {func.__name__}")
    raise
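
One possible shape for the fix, as a sketch only: cache initialized stateful UDFs in a per-process dictionary so construction happens once per worker rather than once per partition (the helper below is hypothetical, not Daft's implementation):

# Hypothetical per-worker cache of initialized stateful UDFs
_initialized_udfs: dict = {}

def get_initialized_func(func):
    if not isinstance(func, type):
        return func                       # plain function: nothing to initialize
    if func not in _initialized_udfs:     # first partition on this worker: construct once
        _initialized_udfs[func] = func()
    return _initialized_udfs[func]        # later partitions reuse the same instance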

Parallel CSV and Parquet scans

Is your feature request related to a problem? Please describe.
We should support the parallel scanning of a large CSV/Parquet file into multiple partitions.

Describe the solution you'd like
When a dataframe is read from CSV or Parquet files, it should compute upfront the byte offsets required for parallel reads in each file, and distribute the reads to various workers to read in parallel.

Additional context
This feature is blocking apples-to-apples benchmarking of the TPC-H tests as we currently have to split the dataset into multiple files for partitioned reads.
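
A rough sketch of the offset-splitting idea for a single file (purely illustrative; a real CSV reader must also snap each range to the next newline and handle quoting):

import os

# Illustrative: split one file into byte ranges that workers could read in parallel
def plan_parallel_scan(path: str, num_partitions: int):
    size = os.path.getsize(path)
    step = max(1, size // num_partitions)
    ranges = []
    for i in range(num_partitions):
        start = i * step
        end = size if i == num_partitions - 1 else (i + 1) * step
        ranges.append((start, end))
    return ranges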

Read files from storage with DataFrame.from_files

Is your feature request related to a problem? Please describe.
When users have binary files in storage (not in a tabular/collection format such as CSV/Parquet/JSON), we often want to just load each file as a single row in a DataFrame.

Describe the solution you'd like
df = DataFrame.from_files("s3://path/*.jpeg") should load all JPEG files in the supplied location. It loads all the filepaths as a string column, and optionally additional metadata such as file size, creation time etc.

This is easily followed up with a .url.download() call to download bytes for each file.

Additional context
See: #209 for more context on a real life use-case for this
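
Putting the proposal together (hypothetical API and column name at the time of this issue):

from daft import DataFrame, col

# Hypothetical: one row per matched file, with the filepath as a string column
df = DataFrame.from_files("s3://path/*.jpeg")

# Follow up with a download of each file's bytes (the "filepath" column name is assumed)
df = df.with_column("bytes", col("filepath").url.download())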

Fix hashing of floats

Describe the bug
Currently hashing of floats is implemented by casting the floats to strings and hashing those strings. This is problematic, and we should instead quantize the floats before hashing:

In blocks.py:

data_to_hash = data_to_hash.cast(pa.string())
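
A rough illustration of quantizing before hashing (the precision and hashing scheme here are placeholders, not the actual fix):

import numpy as np

# Placeholder: round floats to a fixed precision before hashing so that tiny
# representational differences do not produce different hashes.
def hash_float_block(values: np.ndarray, decimals: int = 9) -> np.ndarray:
    quantized = np.round(values.astype(np.float64), decimals=decimals)
    return np.array([hash(v) for v in quantized.tolist()], dtype=np.int64)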

Use Polars as the user-interface for UDFs

Is your feature request related to a problem? Please describe.
Currently UDFs use Numpy arrays as the user interface (i.e. columns of data are passed into UDFs as np.ndarray). The main problem with doing this, or with using Pandas as the interface, is that null handling in these two libraries is tricky.

  1. Numpy has no inherent support for null handling.
  2. Pandas has limited support for null handling, and casts nulls to NaN for integer columns.

np.array([1, 1, 1])      # ok, gives an array of int64
np.array([1, None, 1])   # casts to an object-typed array
pd.Series([1, None, 1])  # casts to float64, and None becomes NaN

Using either of these libraries would make it difficult for us to effectively convey None-ness (data is missing) vs NaN-ness (data is invalid) in our user-facing API, which is really important for a Dataframe library.

Describe the solution you'd like
We should use Polars (https://pola-rs.github.io/) as the data representation for incoming data in the UDF.

@polars_udf(return_type=int)
def f(x: polars.Series):
    ...

Return types in UDFs can remain compatible with Numpy, Pandas and Arrow, but should additionally support Polars as well.

Describe alternatives you've considered
We considered writing our own wrapper on top of our underlying datastructures, but Polars is a great choice here for a few reasons:

  1. Arrow-native: our underlying data is already in Arrow, so layering Polars on top is zero-copy and cheap
  2. Fast, very fast: Polars is very fast. Users gain access to all of the functionality implemented by Polars already.
  3. It handles NaN vs Null correctly: https://pola-rs.github.io/polars-book/user-guide/howcani/missing_data.html
  4. In terms of usability, users have the option of converting the Polars Series into Pandas/Numpy if that's an API they are more familiar with, and if they are willing to take the performance hit of doing so.
