
diskframe / disk.frame


Fast Disk-Based Parallelized Data Manipulation Framework for Larger-than-RAM Data

Home Page: https://diskframe.com

License: Other

R 99.13% C++ 0.83% Batchfile 0.04%
data-science manipulation-data large-dataset r medium-data data

disk.frame's Introduction

disk.frame

NOTICE

{disk.frame} has been soft-deprecated in favor of {arrow}. With the {arrow} 6.0.0 release, {arrow} is now capable of doing larger-than-RAM data analysis quite well (see the release note). Hence, there is no strong reason to prefer {disk.frame} unless you have very specific feature needs.

For the above reason, I’ve decided to soft-deprecate {disk.frame} which means I will no longer actively develop new features for it but it will remain on CRAN in maintenance mode.

To help with the transition, I've created a function, disk.frame::disk.frame_to_parquet(df, outdir), which converts existing {disk.frame}s to the parquet format so you can use {arrow} with them.

I am working on a reincarnation of {disk.frame} in Julia, so {disk.frame} will live on!

Thank you for supporting {disk.frame}. I've learnt a lot along the way, but the time has come to move on!

Introduction

How do I manipulate tabular data that doesn’t fit into Random Access Memory (RAM)?

Use {disk.frame}!

In a nutshell, {disk.frame} makes use of two simple ideas:

  1. split up a larger-than-RAM dataset into chunks and store each chunk in a separate file inside a folder and
  2. provide a convenient API to manipulate these chunks

{disk.frame} performs a similar role to distributed systems such as Apache Spark, Python's Dask, and Julia's JuliaDB.jl for medium data, i.e. datasets that are too large for RAM but not quite large enough to qualify as big data.

Installation

You can install the released version of {disk.frame} from CRAN with:

install.packages("disk.frame")

And the development version from GitHub with:

# install.packages("devtools")
devtools::install_github("DiskFrame/disk.frame")

On some platforms, such as SageMaker, you may need to explicitly specify a repository, like this:

install.packages("disk.frame", repos = "https://cran.rstudio.com")

Vignettes and articles

Please see these vignettes and articles about {disk.frame}

Common questions

a) What is {disk.frame} and why create it?

{disk.frame} is an R package that provides a framework for manipulating larger-than-RAM structured tabular data on disk efficiently. The reason one would want to manipulate data on disk is that it allows arbitrarily large datasets to be processed by R. In other words, we go from “R can only deal with data that fits in RAM” to “R can deal with any data that fits on disk”. See the next section.

b) How is it different to data.frame and data.table?

A data.frame in R is an in-memory data structure, which means that R must load the data in its entirety into RAM. A corollary of this is that only data that can fit into RAM can be processed using data.frames. This places significant restrictions on what R can process with minimal hassle.

In contrast, {disk.frame} provides a framework to store and manipulate data on the hard drive. It does this by loading only a small part of the data, called a chunk, into RAM, processing the chunk, writing out the results, and repeating with the next chunk. This chunking strategy is widely applied in other packages to enable processing large amounts of data in R; see, for example, chunked, arkdb, and iotools.
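The load, process, write, repeat loop can be sketched in base R. This is not {disk.frame}'s implementation. The sketch reads a CSV through a connection a fixed number of lines at a time, so at most `chunk_size` rows are in memory, and keeps only a small running result. The demo file, its columns, and `chunk_size` are made up for illustration.

```r
# Write a small demo CSV (stands in for a file too large for RAM)
path <- tempfile(fileext = ".csv")
write.csv(data.frame(id = 1:10000, distance = rep(c(100, 250, 400), length.out = 10000)),
          path, row.names = FALSE)

chunk_size <- 2500                       # rows held in memory at any one time
con <- file(path, open = "r")
header <- readLines(con, n = 1)          # reuse the header line for every chunk
total_distance <- 0
n_rows <- 0
repeat {
  lines <- readLines(con, n = chunk_size)
  if (length(lines) == 0) break          # end of file reached
  chunk <- read.csv(text = c(header, lines))
  # process the chunk, keep only the small result, then move on
  total_distance <- total_distance + sum(chunk$distance)
  n_rows <- n_rows + nrow(chunk)
}
close(con)
cat("rows:", n_rows, "total distance:", total_distance, "\n")
```

The full file is never held in memory at once. Only the running totals survive from one chunk to the next.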

Furthermore, there is a row limit of about 2^31 (roughly 2.1 billion rows) for data.frames in R, so an alternative approach is needed to apply R to these large datasets. The chunking mechanism in {disk.frame} provides such an avenue, enabling data manipulation beyond this row limit.

c) How is {disk.frame} different to previous “big” data solutions for R?

R has many packages that can deal with larger-than-RAM datasets, including ff and bigmemory. However, ff and bigmemory restrict the user to primitive data types such as double, which means they do not support character (string) and factor types. In contrast, {disk.frame} makes use of data.table::data.table and data.frame directly, so all data types are supported. Also, {disk.frame} strives to provide an API that is as similar to data.frame's as possible, and it supports many dplyr verbs for manipulating disk.frames.

Additionally, {disk.frame} supports parallel data operations using infrastructures provided by the excellent future package to take advantage of multi-core CPUs. Further, {disk.frame} uses state-of-the-art data storage techniques such as fast data compression, and random access to rows and columns provided by the fst package to provide superior data manipulation speeds.

d) How does {disk.frame} work?

{disk.frame} works by breaking large datasets into smaller individual chunks and storing the chunks in fst files inside a folder. Each chunk is a fst file containing a data.frame/data.table. One can reconstruct the original large dataset by loading all the chunks into RAM and row-binding them into one large data.frame. Of course, in practice this isn't always possible, which is why we store them as smaller individual chunks.

{disk.frame} makes it easy to manipulate the underlying chunks by implementing dplyr functions/verbs and other convenient functions (e.g. cmap(a.disk.frame, fn, lazy = F), which applies the function fn to each chunk of a.disk.frame in parallel), so that a {disk.frame} can be manipulated in a similar fashion to an in-memory data.frame.
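Here is a minimal base-R sketch of the chunk-folder idea behind `cmap`. It splits a data.frame into chunk files inside a folder, applies a function to each chunk, and row-binds the results. Several things are assumptions and differ from the real package:

- `write_chunks` and `map_chunks` are hypothetical helpers.
- RDS files stand in for the fst files {disk.frame} uses.
- `parallel::mclapply` stands in for the {future} backend. It forks, so it falls back to one core on Windows.

```r
library(parallel)

# Hypothetical helper: split a data.frame into `nchunks` files 1.rds, 2.rds, ...
write_chunks <- function(df, outdir, nchunks) {
  dir.create(outdir, showWarnings = FALSE)
  ids <- cut(seq_len(nrow(df)), nchunks, labels = FALSE)
  parts <- split(df, ids)
  for (i in seq_along(parts)) saveRDS(parts[[i]], file.path(outdir, paste0(i, ".rds")))
  invisible(outdir)
}

# Hypothetical helper: load each chunk, apply fn, return the list of per-chunk results
map_chunks <- function(outdir, fn) {
  files <- list.files(outdir, pattern = "\\.rds$", full.names = TRUE)
  files <- files[order(as.integer(sub("\\.rds$", "", basename(files))))]  # chunk order
  cores <- if (.Platform$OS.type == "windows") 1L else 2L
  mclapply(files, function(f) fn(readRDS(f)), mc.cores = cores)
}

folder <- file.path(tempdir(), "mtcars_chunks")
write_chunks(mtcars, folder, nchunks = 4)
res <- map_chunks(folder, function(chunk) chunk[chunk$cyl == 6, c("mpg", "cyl")])
filtered <- do.call(rbind, res)   # row-bind the per-chunk results
print(filtered)
```

Because the filter only needs one chunk at a time, peak memory is bounded by the chunk size rather than the dataset size.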

e) How is {disk.frame} different from Spark, Dask, and JuliaDB.jl?

Spark is primarily a distributed system that also works on a single machine. Dask is a Python package that is most similar to {disk.frame}, and JuliaDB.jl is a Julia package. All three can distribute work over a cluster of computers. However, {disk.frame} currently cannot distribute data processes over many computers, and is, therefore, single machine focused.

In R, one can access Spark via sparklyr, but that requires a Spark cluster to be set up. On the other hand, {disk.frame} requires zero setup apart from running install.packages("disk.frame") or devtools::install_github("xiaodaigh/disk.frame").

Finally, Spark can only apply functions that are implemented for Spark, whereas {disk.frame} can use any function in R including user-defined functions.

Example usage

Set-up {disk.frame}

{disk.frame} works best if it can process multiple data chunks in parallel. The best way to set up {disk.frame} so that each CPU core runs a background worker is:

library(disk.frame)
setup_disk.frame()

# this allows large datasets to be transferred between sessions
options(future.globals.maxSize = Inf)

The setup_disk.frame() sets up background workers equal to the number of CPU cores; please note that, by default, hyper-threaded cores are counted as one not two.

Alternatively, one may specify the number of workers using setup_disk.frame(workers = n).

Quick-start

suppressPackageStartupMessages(library(disk.frame))
library(nycflights13)

# this will setup disk.frame's parallel backend with number of workers equal to the number of CPU cores (hyper-threaded cores are counted as one not two)
setup_disk.frame()
#> The number of workers available for disk.frame is 6
# this allows large datasets to be transferred between sessions
options(future.globals.maxSize = Inf)

# convert the flights data.frame to a disk.frame
# optionally, you may specify an outdir; otherwise, the disk.frame is stored in a temporary folder
flights.df <- as.disk.frame(nycflights13::flights)

Example: dplyr verbs

{disk.frame} aims to support as many dplyr verbs as possible. For example:

flights.df %>% 
  filter(year == 2013) %>% 
  mutate(origin_dest = paste0(origin, dest)) %>% 
  head(2)
#>    year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time arr_delay
#> 1: 2013     1   1      517            515         2      830            819        11
#> 2: 2013     1   1      533            529         4      850            830        20
#>    carrier flight tailnum origin dest air_time distance hour minute           time_hour
#> 1:      UA   1545  N14228    EWR  IAH      227     1400    5     15 2013-01-01 05:00:00
#> 2:      UA   1714  N24211    LGA  IAH      227     1416    5     29 2013-01-01 05:00:00
#>    origin_dest
#> 1:      EWRIAH
#> 2:      LGAIAH

Group-by

Starting from {disk.frame} v0.3.0, there is group_by support for a limited set of functions. For example:

result_from_disk.frame = iris %>% 
  as.disk.frame %>% 
  group_by(Species) %>% 
  summarize(
    mean(Petal.Length), 
    sumx = sum(Petal.Length/Sepal.Width), 
    sd(Sepal.Width/ Petal.Length), 
    var(Sepal.Width/ Sepal.Width), 
    l = length(Sepal.Width/ Sepal.Width + 2),
    max(Sepal.Width), 
    min(Sepal.Width), 
    median(Sepal.Width)
    ) %>% 
  collect

The results should be exactly the same as if applying the same group-by operations on a data.frame. If not, please report a bug.

List of supported group-by functions

If a function you like is missing, please make a feature request here. A current limitation is that functions which depend on the ordering of a column (such as median and quantile) can only be estimated.

Function     Exact/Estimate   Notes
min          Exact
max          Exact
mean         Exact
sum          Exact
length       Exact
n            Exact
n_distinct   Exact
sd           Exact
var          Exact            var(x) only; cor, cov support planned
any          Exact
all          Exact
median       Estimate
quantile     Estimate         One quantile only
IQR          Estimate
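Two toy chunks show why some functions are exact and others estimated. Per-chunk counts, sums, and sums of squares combine into the exact mean and variance. The median of all rows, however, cannot be recovered from per-chunk medians alone. The median-of-medians below is just one simple estimator chosen for illustration. It is not necessarily the method {disk.frame} uses.

```r
chunks <- list(c(1, 2, 3), c(10, 20, 30, 40, 50))
x <- unlist(chunks)   # the full column, kept here only for comparison

# Stage 1: small per-chunk summaries
stats <- t(sapply(chunks, function(ch) c(n = length(ch), s = sum(ch), ss = sum(ch^2))))

# Stage 2: combine the summaries; exact for mean and variance
n  <- sum(stats[, "n"])
s  <- sum(stats[, "s"])
ss <- sum(stats[, "ss"])
exact_mean <- s / n
exact_var  <- (ss - s^2 / n) / (n - 1)

# The median does not decompose: the median of chunk medians is only an estimate
est_median <- median(sapply(chunks, median))

cat("mean:", exact_mean, "vs", mean(x), "\n")       # mean: 19.5 vs 19.5
cat("var:", exact_var, "vs", var(x), "\n")
cat("median:", est_median, "vs", median(x), "\n")   # median: 16 vs 15
```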

Example: data.table syntax

library(data.table)

suppressWarnings(
  grp_by_stage1 <- 
    flights.df[
      keep = c("month", "distance"), # this analysis only requires "month" and "distance", so only load those
      month <= 6, 
      .(sum_dist = sum(distance)), 
      .(qtr = ifelse(month <= 3, "Q1", "Q2"))
      ]
)
#> data.table syntax for disk.frame may be moved to a separate package in the future

grp_by_stage1
#>    qtr sum_dist
#> 1:  Q1 27188805
#> 2:  Q1   953578
#> 3:  Q1 53201567
#> 4:  Q2  3383527
#> 5:  Q2 58476357
#> 6:  Q2 27397926

The result grp_by_stage1 is a data.table, so we can finish off the two-stage aggregation using data.table syntax:

grp_by_stage2 = grp_by_stage1[,.(sum_dist = sum(sum_dist)), qtr]

grp_by_stage2
#>    qtr sum_dist
#> 1:  Q1 81343950
#> 2:  Q2 89257810
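The same two-stage pattern can be written in base R on made-up data split into three chunks. Stage 1 filters and aggregates within each chunk. That is presumably why `grp_by_stage1` above holds several `Q1` and `Q2` rows: one per chunk and quarter. Stage 2 then aggregates the small stage-1 table. The final answer matches aggregating the full data at once because a sum of partial sums equals the total sum. This works for sums and counts but not for order-based statistics such as the median.

```r
set.seed(42)
df <- data.frame(month = sample(1:12, 300, replace = TRUE),
                 distance = sample(100:2000, 300, replace = TRUE))
chunks <- split(df, rep(1:3, each = 100))   # three 100-row chunks

# Stage 1: filter and aggregate inside each chunk
stage1 <- do.call(rbind, lapply(chunks, function(ch) {
  ch <- ch[ch$month <= 6, ]
  ch$qtr <- ifelse(ch$month <= 3, "Q1", "Q2")
  aggregate(distance ~ qtr, data = ch, FUN = sum)
}))

# Stage 2: aggregate the partial sums
stage2 <- aggregate(distance ~ qtr, data = stage1, FUN = sum)
print(stage2)
```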

Basic info

To find out where the disk.frame is stored on disk:

# where is the disk.frame stored
attr(flights.df, "path")
#> [1] "C:\\Users\\RTX2080\\AppData\\Local\\Temp\\RtmpeygI4C\\file4e9c4ab6775c.df"

A number of data.frame functions are implemented for disk.frame:

# get first few rows
head(flights.df, 1)
#>    year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time arr_delay
#> 1: 2013     1   1      517            515         2      830            819        11
#>    carrier flight tailnum origin dest air_time distance hour minute           time_hour
#> 1:      UA   1545  N14228    EWR  IAH      227     1400    5     15 2013-01-01 05:00:00
# get last few rows
tail(flights.df, 1)
#>    year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time arr_delay
#> 1: 2013     9  30       NA            840        NA       NA           1020        NA
#>    carrier flight tailnum origin dest air_time distance hour minute           time_hour
#> 1:      MQ   3531  N839MQ    LGA  RDU       NA      431    8     40 2013-09-30 08:00:00
# number of rows
nrow(flights.df)
#> [1] 336776
# number of columns
ncol(flights.df)
#> [1] 19

Contributors

This project exists thanks to all the people who contribute.

Current Priorities

The work priorities at this stage are:

  1. Bugs
  2. Urgent feature implementations that can improve an awful user-experience
  3. More vignettes covering every aspect of disk.frame
  4. Comprehensive Tests
  5. Comprehensive Documentation
  6. More features

Blogs and other resources

  • 25 days of disk.frame (English, ZJ, 2019-12-01): 25 tweets about {disk.frame}
  • https://www.researchgate.net/post/What-is-the-Maximum-size-of-data-that-is-supported-by-R-datamining (English, Knut Jägersberg, 2019-11-11): a great answer on using disk.frame
  • {disk.frame} is epic (English, Bruno Rodrigues, 2019-09-03): about loading a 30GB file into {disk.frame}
  • My top 10 R packages for data analytics (English, Jacky Poon, 2019-09-03): {disk.frame} was number 3
  • useR! 2019 presentation video (English, Dai ZJ, 2019-08-03)
  • useR! 2019 presentation slides (English, Dai ZJ, 2019-08-03)
  • Split-apply-combine for Maximum Likelihood Estimation of a linear model (English, Bruno Rodrigues, 2019-10-06): {disk.frame} used in helping to create a maximum likelihood estimation program for linear models
  • Emma goes to useR! 2019 (English, Emma Vestesson, 2019-07-16): the first mention of {disk.frame} in a blog post
  • 深入对比数据科学工具箱:Python3 和 R 之争(2020版), "An in-depth comparison of data science toolboxes: Python 3 vs. R (2020 edition)" (Chinese, Harry Zhu, 2020-02-16): mentions disk.frame

Interested in learning {disk.frame} in a structured course?

Please register your interest at:

https://leanpub.com/c/taminglarger-than-ramwithdiskframe

Open Collective

If you like {disk.frame} and want to speed up its development, or perhaps you have a feature request, please consider sponsoring {disk.frame} on Open Collective.

Backers

Thank you to all our backers!

Sponsor and back {disk.frame}

Support {disk.frame} development by becoming a sponsor. Your logo will show up here with a link to your website.

Contact me for consulting

Do you need help with machine learning and data science in R, Python, or Julia? I am available for Machine Learning/Data Science/R/Python/Julia consulting! Email me

Non-financial ways to contribute

Do you wish to give back to the open-source community in non-financial ways? Here are some ways you can contribute:

  • Write a blog post about your {disk.frame} usage or experience. I would love to learn more about how {disk.frame} has helped you
  • Tweet or post on social media (e.g. LinkedIn) about {disk.frame} to help promote it
  • Bring attention to typos and grammatical errors by correcting them and making a PR, or simply by raising an issue here
  • Star the {disk.frame} GitHub repo
  • Star any repo that {disk.frame} depends on, e.g. {fst} and {future}

Related Repos

  • https://github.com/DiskFrame/disk.frame-fannie-mae-example
  • https://github.com/DiskFrame/disk.frame-vs
  • https://github.com/DiskFrame/disk.frame.ml

disk.frame's People

Contributors

andthewings, ben-schwen, marcusklik, monkeywithacupcake, privefl, rekyt, romainfrancois, tmstauss, xiaodaigh, zjpi

disk.frame's Issues

lazy rechunk

When rechunking, we can do it lazily. Say we go from 15 chunks to 5: then chunks 1, 2, and 3 can be treated as the first new chunk, requiring no disk I/O step.
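The idea can be sketched as a pure bookkeeping step. When reducing the number of chunks, each new chunk is just a list of contiguous old chunks. No data is read or rewritten until a new chunk is actually used, at which point its old chunks are row-bound on demand. Both `lazy_rechunk_plan` and `read_new_chunk` are hypothetical helpers. The sketch assumes the old chunks are numbered 1 to n, and uses a fake reader in place of real chunk files.

```r
# Hypothetical helper: map old chunk ids onto `new_n` contiguous groups (no disk I/O)
lazy_rechunk_plan <- function(old_n, new_n) {
  stopifnot(new_n >= 1, new_n <= old_n)
  split(seq_len(old_n), cut(seq_len(old_n), new_n, labels = FALSE))
}

# Hypothetical helper: materialise new chunk i only when it is needed
read_new_chunk <- function(plan, i, read_old) {
  do.call(rbind, lapply(plan[[i]], read_old))
}

plan <- lazy_rechunk_plan(15, 5)
str(plan)

# Fake reader standing in for loading an old chunk file
fake_read <- function(id) data.frame(old_chunk = id)
first_new <- read_new_chunk(plan, 1, fake_read)   # built from old chunks 1, 2 and 3
```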

Fuzzy Join at the chunk level

Great package, still learning how to work with it.

It would be great to allow for non-equi joins at the chunk level. Currently my code looks something like this for a data.frame:

library(disk.frame)
library(data.table)   # needed for data.table()
library(lubridate)
library(dplyr)
library(tidyverse)

prices <- data.table(strike = seq(0, 1000, by = 1))
dates <- data.table(seq(ymd('2017-01-01'), ymd('2017-09-01'), by = '1 day'))
ticker <- data.frame(ticker = letters[1:26])

t = ticker %>%
  crossing(dates) %>%
  group_by(ticker) %>%
  mutate(mid = 50 + cumsum(rnorm(n = n(), mean = 0.05)))   # n() = 244 days per ticker

RICs = t %>%
  group_by(ticker) %>%
  mutate(midd = mid * 0.98, midu = mid * 1.02) %>%
  fuzzyjoin::fuzzy_left_join(prices, by = c("midd" = "strike", "midu" = "strike"), match_fun = list(`<=`, `>=`))

In the above example, both t and prices are data frames, and joining them generates a larger-than-RAM data.frame. If I could do the join chunk by chunk, it would be much faster, as it is currently single-threaded and CPU-bound.

Ideally, t would be a disk.frame, with each chunk fed in separately.

Is there currently a better way of doing this?
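A chunk-wise version of this band join can be sketched in base R. Each chunk of `t` is joined independently against the sorted strikes, using `findInterval()` to locate the range `midd <= strike <= midu`. The chunk results are then row-bound. Several things are assumptions:

- `band_join_chunk` is a hypothetical helper, not part of {disk.frame} or {fuzzyjoin}.
- The chunks are simulated with `split()`. With {disk.frame}, a per-chunk function like this could presumably be passed to `cmap()`.
- `sequence(from =)` needs R >= 4.0.

```r
# Hypothetical helper: left band join of one chunk against a strike vector,
# matching every strike with midd <= strike <= midu
band_join_chunk <- function(chunk, strikes) {
  strikes <- sort(strikes)
  first <- findInterval(chunk$midd, strikes, left.open = TRUE) + 1L  # first strike >= midd
  last  <- findInterval(chunk$midu, strikes)                          # last strike <= midu
  n_match <- pmax(last - first + 1L, 0L)
  reps <- pmax(n_match, 1L)                   # unmatched rows are kept once (left join)
  out <- chunk[rep(seq_len(nrow(chunk)), reps), , drop = FALSE]
  out$strike <- NA_real_
  out$strike[rep(n_match > 0, reps)] <- strikes[sequence(n_match, from = first)]
  rownames(out) <- NULL
  out
}

prices <- data.frame(strike = seq(0, 1000, by = 1))
set.seed(1)
tt <- data.frame(ticker = rep(letters[1:3], each = 4),
                 mid = 50 + cumsum(rnorm(12, mean = 0.05)))
tt$midd <- tt$mid * 0.98
tt$midu <- tt$mid * 1.02

# Simulated chunks, each joined independently
chunks <- split(tt, rep(1:3, length.out = nrow(tt)))
joined <- do.call(rbind, lapply(chunks, band_join_chunk, strikes = prices$strike))
rownames(joined) <- NULL
head(joined)
```

Because each chunk only needs the small `prices` table, the chunks could be processed in parallel, and the full join never has to sit in memory at once.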

discrepancy in sharding

This case is related to #77, where reading multiple csv files was discussed.

When I use shardby, I get discrepant counts. Below is a summary of my findings (row counts per carrier):

UniqueCarrier   disk.frame (sharded)   disk.frame (not sharded)   actual
AA              5263987                6655735                    6655735
AS              795393                 994764                     994764
CO              3187682                4017862                    4017862
DL              6247363                7884309                    7884309
EA              701362                 919785                     919785
HP              1434364                1809250                    1809250
ML (1)          56231                  70622                      70622
NW              3502394                4410734                    4410734
PA (1)          240546                 316167                     316167
PI              672802                 873957                     873957
PS              70563                  83617                      83617
TW              1918283                2421955                    2421955
UA              4731504                5938506                    5938506
US              5752829                7295919                    7295919
WN              3428952                4231882                    4231882

More detailed analysis for sharded and not-sharded approaches are available.
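One way to narrow down such a discrepancy is a sanity check that sharding only moves rows around. Every key should land in exactly one shard, and the per-key counts should be unchanged. The sketch below uses made-up data and a hypothetical `shard_by_key` helper. That helper assigns shards with `match()`, not with whatever hash {disk.frame} uses internally.

```r
# Hypothetical helper: all rows sharing a key go to the same shard
# (the key's position among sorted unique keys stands in for a real hash)
shard_by_key <- function(df, key, nchunks) {
  ids <- match(df[[key]], sort(unique(df[[key]])))
  split(df, (ids - 1L) %% nchunks + 1L)
}

set.seed(7)
flights <- data.frame(UniqueCarrier = sample(c("AA", "DL", "UA", "WN"), 1000, replace = TRUE),
                      ArrDelay = round(rnorm(1000, 5, 20)),
                      stringsAsFactors = FALSE)
shards <- shard_by_key(flights, "UniqueCarrier", nchunks = 3)

# Check 1: per-key counts are unchanged by sharding
sharded_counts <- table(unlist(lapply(shards, `[[`, "UniqueCarrier")))
actual_counts  <- table(flights$UniqueCarrier)
print(cbind(sharded = sharded_counts[names(actual_counts)], actual = actual_counts))

# Check 2: no key is spread over more than one shard
keys_per_shard <- unlist(lapply(shards, function(s) unique(s$UniqueCarrier)))
```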

reading multiple csv files

First of all, kudos for this package; I hope it becomes as good as Dask one day.

I was wondering if it's possible to read multiple large csv files in parallel with disk.frame? This is the initial demonstration of Dask in many talks/presentations. However, in many sparklyr or similar demonstrations, the flights data is copied to a database or backend, and reading multiple csv files in parallel is ignored.

As far as I can tell, sparklyr's read csv function allows wildcards so that many csv files can be imported at once (e.g. "200*.csv"). I checked the disk.frame vignettes and couldn't find a hint about this. What is the easiest way to process multiple large csv files?

I can think of a workaround in which each file is processed individually to generate fst files, and then (maybe) those files can be merged and resharded. If I run the following code, there will be fst files in separate folders. Is it possible to merge those fst files into a single folder later (or during import)?

flights6.df <- csv_to_disk.frame("2006.csv", outdir="tmp2006.df",overwrite=T)
flights7.df <- csv_to_disk.frame("2007.csv", outdir="tmp2007.df",overwrite=T)
flights8.df <- csv_to_disk.frame("2008.csv", outdir="tmp2008.df",overwrite=T)
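A base-R sketch of the workaround asked about here converts each CSV file into its own chunk, written straight into a single shared output folder, so nothing needs merging afterwards. The files are processed in parallel with `parallel::mclapply`, which forks, so the sketch uses one core on Windows. The tiny demo CSVs stand in for the real yearly files, and RDS chunk files stand in for fst chunks. This is not {disk.frame}'s own import path.

```r
library(parallel)

# Three small demo CSVs standing in for 2006.csv, 2007.csv, 2008.csv
src <- file.path(tempdir(), "csv_demo")
dir.create(src, showWarnings = FALSE)
for (yr in 2006:2008) {
  write.csv(data.frame(Year = yr, UniqueCarrier = c("AA", "DL", "UA"), ArrDelay = c(5, -2, 12)),
            file.path(src, paste0(yr, ".csv")), row.names = FALSE)
}

outdir <- file.path(tempdir(), "flights_all")
dir.create(outdir, showWarnings = FALSE)
csv_files <- list.files(src, pattern = "\\.csv$", full.names = TRUE)

# Read every CSV in parallel; file i becomes chunk i of the shared folder
cores <- if (.Platform$OS.type == "windows") 1L else min(2L, length(csv_files))
n_read <- mclapply(seq_along(csv_files), function(i) {
  chunk <- read.csv(csv_files[i])
  saveRDS(chunk, file.path(outdir, paste0(i, ".rds")))
  nrow(chunk)
}, mc.cores = cores)

chunk_files <- list.files(outdir, pattern = "\\.rds$", full.names = TRUE)
all_rows <- do.call(rbind, lapply(chunk_files, readRDS))
```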

can't find object when comparing in data.frame or data.table.

Hi, XiaoDai,
Me again. I just found a critical issue: filtering data stored in disk.frame (DF) format by comparing a column with a variable doesn't work.

#--------- Disk.Frame way, doesn't work.
DT <- as.disk.frame(
  data.table(A = letters[1:24], B = 1:48),
  outdir = "TEST",
  overwrite = TRUE
)

X <- "x"
DT[A == X]
# Error in eval(stub[[3L]], x, enclos) : object 'X' not found

DT[A %in% X] # still can't find object

#---- Ordinary way, it works.

DX <- data.table(A = letters[1:24], B = 1:48)
DX[A == X]
#    A  B
# 1: x 24
# 2: x 48

Please advise if I have to do something differently when matching or searching.
Thanks
Eric
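The error message hints at an evaluation-environment problem, and a plain base-R sketch reproduces it. A condition like `A == X` is captured unevaluated and later evaluated against a chunk. Free variables such as `X` are then looked up in whatever environment is supplied as `enclos`. If that environment is not the caller's (for example, a background worker's), `X` is not found. That this is the cause inside {disk.frame} is an assumption, not something the thread confirms. `filter_chunk` is a hypothetical helper.

```r
# Hypothetical helper: filter one chunk by an unevaluated condition;
# `enclos` decides where free variables such as X are looked up
filter_chunk <- function(chunk, cond, enclos) {
  keep <- eval(cond, chunk, enclos)
  chunk[keep, , drop = FALSE]
}

chunk <- data.frame(A = rep(letters[1:24], 2), B = 1:48)
X <- "x"
cond <- quote(A == X)

# A simulated worker environment that sees base R but not the caller's variables
worker_env <- new.env(parent = baseenv())
err <- tryCatch(filter_chunk(chunk, cond, worker_env),
                error = function(e) conditionMessage(e))
print(err)   # the same "object 'X' not found" error

# Evaluating with the environment where the condition was written finds X
ok <- filter_chunk(chunk, cond, environment())
print(ok)
```

A library can avoid this by capturing the caller's environment together with the expression, or by shipping the needed variables to the workers.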

Easy way to select options (via Shiny?)

There are many hidden options in disk.frame. Provide an easy way for the user to select these options via a Shiny interface, e.g. the number of workers and whether operations are lazy by default.

the interchange between disk.frame and data.frame

Dear Xiaodai,
Thanks for the great work of releasing R from the constraint of RAM. As a heavy user of data.table, I am wondering how I can seamlessly work with data.table (DT) and disk.frame (DF) together.
I tried a few times and found that after converting a dataset into DF format, the common DT syntax still works, but when I assign the result to a new variable, the new variable automatically becomes a DT.

library(disk.frame)
library(data.table)

DF <- as.disk.frame(
  data.table(X = rep(letters[1:5], 2), Y = 1:10),
  outdir = "temp",
  overwrite = TRUE
)

class(DF)   #[1] "disk.frame"        "disk.frame.folder"
class(DF[,.N, by = .(X)]) #[1] "data.table" "data.frame"

It seems that disk.frame serves as a storage keeper: it saves a lot of space by keeping everything in a warehouse and taking things out for others to process.
Am I right?
Thanks for the great work again,
Eric

Restartable operations

Some operations take a long time, and if one fails midway through, it has to be restarted from the beginning. To solve this, we can make a detailed plan of what to do and execute it step by step, so that if it fails we can restart from any point. Perhaps the drake package can help with this.
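A minimal checkpointing sketch of this idea writes each chunk's output to its own file. A rerun skips chunks whose output already exists, so a failure part-way through only costs the unfinished chunks. Outputs are first written under a temporary name and then renamed, so a crash never leaves a half-written file with the final name. `process_chunks` is a hypothetical helper. It uses plain RDS files rather than {drake}, and it is not part of {disk.frame}.

```r
# Hypothetical helper: apply fn to each input chunk, skipping chunks already done
process_chunks <- function(inputs, outdir, fn) {
  dir.create(outdir, showWarnings = FALSE)
  for (i in seq_along(inputs)) {
    out_file <- file.path(outdir, paste0(i, ".rds"))
    if (file.exists(out_file)) next          # finished in an earlier run
    result <- fn(inputs[[i]])
    tmp <- paste0(out_file, ".tmp")
    saveRDS(result, tmp)
    file.rename(tmp, out_file)               # only complete outputs get the final name
  }
  invisible(list.files(outdir, pattern = "\\.rds$"))
}

inputs <- split(1:100, rep(1:4, each = 25))
outdir <- file.path(tempdir(), "restartable_demo")
unlink(outdir, recursive = TRUE)

# First run crashes on chunk 3 (the chunk starting at 51)
fail_on_3 <- function(x) { if (min(x) == 51) stop("simulated crash"); sum(x) }
try(process_chunks(inputs, outdir, fail_on_3), silent = TRUE)
done_first <- list.files(outdir, pattern = "\\.rds$")

# Rerun: only chunks 3 and 4 are computed
calls <- 0
counting_sum <- function(x) { calls <<- calls + 1; sum(x) }
process_chunks(inputs, outdir, counting_sum)
```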

Overwrite = T can cause some incorrect behaviour

When overwrite = T and the folder already contains some chunks, the overwrite may replace too few files and leave stale chunks behind. Therefore, it's better to move all the existing chunks to a separate folder before writing to the folder.
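The fix described above can be sketched in three steps:

1. Move any existing chunk files into a backup folder.
2. Write the new chunks into the now-empty folder.
3. Delete the backup only after the writes succeed.

Stale chunks can then never be mixed in with new ones. `overwrite_chunks` is a hypothetical helper using RDS files, not {disk.frame}'s implementation.

```r
# Hypothetical helper: replace every chunk in outdir without leaving stale files behind
overwrite_chunks <- function(chunks, outdir) {
  dir.create(outdir, showWarnings = FALSE)
  old <- list.files(outdir, full.names = TRUE)
  backup <- tempfile("old_chunks_")
  if (length(old) > 0) {
    dir.create(backup)
    file.rename(old, file.path(backup, basename(old)))   # outdir is now empty
  }
  for (i in seq_along(chunks)) saveRDS(chunks[[i]], file.path(outdir, paste0(i, ".rds")))
  unlink(backup, recursive = TRUE)                      # drop old chunks after success
  invisible(outdir)
}

outdir <- file.path(tempdir(), "overwrite_demo")
overwrite_chunks(split(1:50, rep(1:5, each = 10)), outdir)   # 5 chunks
overwrite_chunks(split(1:20, rep(1:2, each = 10)), outdir)   # overwrite with 2 chunks
list.files(outdir)   # "1.rds" "2.rds"
```

Without the backup step, chunks 3 to 5 from the first write would survive the second write.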

enhance `rbindlist.disk.frame` algorithm for faster merging many fst folders

Sorry for submitting many issues; I already mentioned this case in #77, but I really like the package and wanted to share my wishes.

rbindlist.disk.frame is quite handy for merging data, but it copies the data. For rbindlist.disk.frame, I wanted to propose the following. Let's say we have two folders, data1 and data2, with the following contents:

├── data1
│   ├── 11.fst
│   ├── 15.fst
│   └── 9.fst
└── data2
    ├── 11.fst
    ├── 13.fst
    ├── 16.fst
    └── 7.fst

In that case, row-binding the data could consist of the following steps:

  • create new folder merged
  • move non-clashing fst files to merged: data1/9.fst, data1/15.fst, data2/7.fst, data2/13.fst, data2/16.fst
  • for clashing data, move data1/11.fst to merged then row_bind data2/11.fst to it
  • update the .metadata folder contents for merged folder

My knowledge of fst is limited, so maybe this is not possible, but rbindlist.disk.frame takes a long time as the data gets bigger.
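The proposed merge can be sketched with plain file operations. Chunk files whose id appears in only one folder are moved as-is. Only clashing ids are read and row-bound. `merge_chunk_folders` is hypothetical and uses RDS instead of fst. It also skips the `.metadata` update step, because that folder's format is not described here.

```r
# Hypothetical helper: merge chunk folders by chunk id, reading only clashing chunks
merge_chunk_folders <- function(dirs, outdir) {
  dir.create(outdir, showWarnings = FALSE)
  files <- unlist(lapply(dirs, list.files, pattern = "\\.rds$", full.names = TRUE))
  by_id <- split(files, basename(files))            # e.g. "11.rds" -> files from both folders
  for (id in names(by_id)) {
    src <- by_id[[id]]
    dest <- file.path(outdir, id)
    if (length(src) == 1) {
      file.rename(src, dest)                         # no clash: move, data untouched
    } else {
      saveRDS(do.call(rbind, lapply(src, readRDS)), dest)   # clash: row-bind
      file.remove(src)
    }
  }
  invisible(outdir)
}

# Demo layout matching the example above
root <- file.path(tempdir(), "merge_demo")
unlink(root, recursive = TRUE)
for (d in c("data1", "data2")) dir.create(file.path(root, d), recursive = TRUE)
for (id in c(11, 15, 9))
  saveRDS(data.frame(src = "data1", id = id), file.path(root, "data1", paste0(id, ".rds")))
for (id in c(11, 13, 16, 7))
  saveRDS(data.frame(src = "data2", id = id), file.path(root, "data2", paste0(id, ".rds")))

merge_chunk_folders(file.path(root, c("data1", "data2")), file.path(root, "merged"))
sort(list.files(file.path(root, "merged")))
readRDS(file.path(root, "merged", "11.rds"))   # two rows, one from each folder
```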

implement generalised form of `[.disk.frame`

Currently [.disk.frame can only accept i, j, and by, so not all features of data.table are supported. This should be generalised so that [.disk.frame supports all the features of data.table's [.

memory limit for processing of fst files

While processing 12GB of data as mentioned in #77, I'm barely getting by in terms of RAM usage. I have 16GB of RAM; I close most programs (including RStudio) and use the R console for group_by operations.

So I was wondering: what is the limit for processing files with 16GB of RAM? Could I process a 100GB file, for example, with disk.frame on my computer?

I was expecting that if I split the files into many more chunks, RAM would not be a problem; it would just take longer to process so many files.

If I split files into even smaller chunks, can I process even larger files?

(again, sorry for so many issues today; this is the last one for today)

large graphs in tidy format - similar to "GraphFrames"

It would be great if disk.frame were able to support analysis of large networks via tibble forms of graphs (aka tidygraph). Since Spark started supporting GraphFrames, maybe disk.frame can step into that domain as well.

tidygraph allows tibble access to nodes and edges separately, but sharding a large network into chunks might be quite challenging.

Fixed width files

This is an enhancement request, but I can't see how to designate it as such.

disk.frame looks to be wonderfully valuable. Many thanks in advance.

It would be helpful if the csv reading capability could be extended to fixed-width files, as these files (often logs, etc.) are typically massive.

The readr::read_fwf() is a nice implementation of fwf input, and might be a model for work on something comparable for this package.

Many thanks
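Until such a reader exists, fixed-width files can be read in chunks with base R. Each batch of lines is sliced by column positions with `substring()`, and each parsed chunk is written out before the next batch is read. The column layout, chunk size, and RDS output files are assumptions for illustration, and `parse_fwf_lines` is a hypothetical helper.

```r
# Demo fixed-width file: id (cols 1-4), code (cols 5-7), value (cols 8-13)
path <- tempfile(fileext = ".txt")
writeLines(sprintf("%4d%3s%6.1f", 1:10, rep(c("AAA", "BBB"), 5), (1:10) * 1.5), path)

starts <- c(id = 1, code = 5, value = 8)
ends   <- c(id = 4, code = 7, value = 13)

# Hypothetical helper: slice raw lines into typed columns
parse_fwf_lines <- function(lines) {
  cols <- lapply(seq_along(starts), function(k) trimws(substring(lines, starts[k], ends[k])))
  names(cols) <- names(starts)
  data.frame(id = as.integer(cols$id), code = cols$code, value = as.numeric(cols$value))
}

outdir <- file.path(tempdir(), "fwf_chunks")
dir.create(outdir, showWarnings = FALSE)
con <- file(path, open = "r")
i <- 0
repeat {
  lines <- readLines(con, n = 4)        # at most 4 lines in memory per chunk
  if (length(lines) == 0) break
  i <- i + 1
  saveRDS(parse_fwf_lines(lines), file.path(outdir, paste0(i, ".rds")))
}
close(con)
```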

Sharding Example

I'd like to understand how to create a disk.frame using shards, but couldn't find an example.

Talked about here

My use case is a dataset covering many years (already a single large fst file!). I would like to shard by year, so that each chunk just contains data from a single year.
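Sharding by year, so that each chunk holds exactly one year, amounts to splitting on the year column and writing one chunk per group. Here is a base-R sketch with made-up data. Chunks are RDS files named after the year; that naming is an assumption, not {disk.frame}'s on-disk layout.

```r
set.seed(3)
big <- data.frame(year = sample(2015:2019, 1000, replace = TRUE), value = rnorm(1000))

outdir <- file.path(tempdir(), "by_year")
dir.create(outdir, showWarnings = FALSE)
parts <- split(big, big$year)                      # one data.frame per year
for (yr in names(parts)) saveRDS(parts[[yr]], file.path(outdir, paste0(yr, ".rds")))

# A query about a single year now reads a single chunk
y2017 <- readRDS(file.path(outdir, "2017.rds"))
```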
