Coder Social home page Coder Social logo

multidplyr's Introduction

multidplyr

Lifecycle: experimental R-CMD-check Codecov test coverage CRAN status

Overview

multidplyr is a backend for dplyr that partitions a data frame across multiple cores. You tell multidplyr how to split the data up with partition() and then the data stays on each node until you explicitly retrieve it with collect(). This minimises the amount of time spent moving data around, and maximises parallel performance. This idea is inspired by partools by Norm Matloff and distributedR by the Vertica Analytics team.

Due to the overhead associated with communicating between the nodes, you won’t see much performance improvement with simple operations on less than ~10 million observations, and you may want to instead try dtplyr, which uses data.table. multidplyr’s strength is found parallelising calls to slower and more complex functions.

(Note that unlike other packages in the tidyverse, multidplyr requires R 3.5 or greater. We hope to relax this requirement in the future.)

Installation

You can install the released version of multidplyr from CRAN with:

install.packages("multidplyr")

And the development version from GitHub with:

# install.packages("pak")
pak::pak("tidyverse/multidplyr")

Usage

To use multidplyr, you first create a cluster of the desired number of workers. Each one of these workers is a separate R process, and the operating system will spread their execution across multiple cores:

library(multidplyr)

cluster <- new_cluster(4)
cluster_library(cluster, "dplyr")
#> 
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#> 
#>     filter, lag
#> The following objects are masked from 'package:base':
#> 
#>     intersect, setdiff, setequal, union

There are two primary ways to use multidplyr. The first, and most efficient, way is to read different files on each worker:

# Create a filename vector containing different values on each worker
cluster_assign_each(cluster, filename = c("a.csv", "b.csv", "c.csv", "d.csv"))

# Use vroom to quickly load the csvs
cluster_send(cluster, my_data <- vroom::vroom(filename))

# Create a party_df using the my_data variable on each worker
my_data <- party_df(cluster, "my_data")

Alternatively, if you already have the data loaded in the main session, you can use partition() to automatically spread it across the workers. Before calling partition(), it’s a good idea to call group_by() to ensure that all of the observations belonging to a group end up on the same worker.

library(nycflights13)

flight_dest <- flights %>% group_by(dest) %>% partition(cluster)
flight_dest
#> Source: party_df [336,776 x 19]
#> Groups: dest
#> Shards: 4 [81,594--86,548 rows]
#> 
#> # A data frame: 336,776 × 19
#>    year month   day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#>   <int> <int> <int>    <int>          <int>     <dbl>    <int>          <int>
#> 1  2013     1     1      544            545        -1     1004           1022
#> 2  2013     1     1      558            600        -2      923            937
#> 3  2013     1     1      559            600        -1      854            902
#> 4  2013     1     1      602            610        -8      812            820
#> 5  2013     1     1      602            605        -3      821            805
#> 6  2013     1     1      611            600        11      945            931
#> # ℹ 336,770 more rows
#> # ℹ 11 more variables: arr_delay <dbl>, carrier <chr>, flight <int>,
#> #   tailnum <chr>, origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>,
#> #   hour <dbl>, minute <dbl>, time_hour <dttm>

Now you can work with it like a regular data frame, but the computations will be spread across multiple cores. Once you’ve finished computation, use collect() to bring the data back to the host session:

flight_dest %>% 
  summarise(delay = mean(dep_delay, na.rm = TRUE), n = n()) %>% 
  collect()
#> # A tibble: 105 × 3
#>    dest  delay     n
#>    <chr> <dbl> <int>
#>  1 ABQ    13.7   254
#>  2 AUS    13.0  2439
#>  3 BQN    12.4   896
#>  4 BTV    13.6  2589
#>  5 BUF    13.4  4681
#>  6 CLE    13.4  4573
#>  7 CMH    12.2  3524
#>  8 DEN    15.2  7266
#>  9 DSM    26.2   569
#> 10 DTW    11.8  9384
#> # ℹ 95 more rows

Note that there is some overhead associated with copying data from the worker nodes back to the host node (and vice versa), so you’re best off using multidplyr with more complex operations. See vignette("multidplyr") for more details.

multidplyr's People

Contributors

anobel avatar batpigandme avatar bbrewington avatar cscheid avatar eipi10 avatar fugufisch avatar fvd avatar hadley avatar jennybc avatar maschette avatar michaelgrund avatar paulponcet avatar romainfrancois avatar smsaladi avatar wibeasley avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

multidplyr's Issues

multidplyr throws warning when used with dplyr 0.7.0

Using multidplyr with dplyr 0.7.0 generates the following warning:
group_indices_.grouped_df ignores extra arguments

Minimal example:
mtcars %>% partition(cyl) %>% collect()

It seems to have no impact on the calculated results, but I am not sure if that's always the case...

List columns get copied when partitioning

The example:

tmp <- data_frame(i = rep(1:4, 100000)) %>% 
  group_by(i) %>% 
  nest() %>% 
  mutate(iris = rep(list(iris), 4)) %>% 
  unnest(data, .drop = FALSE) 
pryr::mem_used() # 42.6 MB
# This doesn't use much memory on the main process as the repeated lists are just pointers

cl <- create_cluster(4)
tmp2 <- tmp  %>% 
  # This uses heaps of memory as iris is copied 100000 times
  partition(cluster = cl)

cluster_eval(cl, {
  pryr::mem_used() # 671 MB
})

A work around, of course, is to partition on the nested df, but a potential improvement to the package is to prevent the identical elements of the list column from getting copied. I am (stuck) on Windows.

Support join functions

Support for join functions when the partition columns are part of the by clause in the join for the two dataframes would be very useful, or self joins.

At the moment, an error is generated.

I will provide an example if there is interest in supporting the functionality.

Issue attaching multiple libraries to each node

The documentation infers you can attach multiple packages to each node with a single call to cluster_library(). However, this doesn't work, at least in my case:

get_default_cluster() %>%
  cluster_library(c("dplyr", "geosphere"))

Initialising 7 core cluster.
Error in library(packages, character.only = TRUE) : 
  'package' must be of length 1

Partition() doesn't play nicely with data.table

Fails with incorrect error message when data.table is used as input to partition.

my data.table is too large to replicate here, but perhaps it is enough to try:

cl <- create_cluster(3)
set_default_cluster(cl)

iris %>% partition(Species) #works

library(data.table)
data.table(iris) %>% partition(Species) #fails "Error: length(values) == length(cluster) is not TRUE"

Error in checkForRemoteErrors(lapply(cl, recvResult)) ... could not find function

I have a column in a tbl_df that is a character vector and I am trying to convert it from a string like, "20151001", to a ymd(tzone="UTC"). It's as if I need the lubridate library ?loaded on each node of the cluster?, which doesn't make much sense to me if that's the case.

Here is the error I get:

data.frame(date = rep('20151001', 10000)) %>%
tbl_df %>%
partition(cluster=create_cluster(2)) %>%
mutate(date = ymd(date, tzone = 'UTC'))

Initialising 2 core cluster.
Error in checkForRemoteErrors(lapply(cl, recvResult)) :
2 nodes produced errors; first error: could not find function "ymd"

summarize_each doesn't seem to work

Hi,

I get an error message when using summarize_each with multidplyr:

library(dplyr)
library(multidplyr)
library(nycflights13)
flights %>% 
  partition(cluster = create_cluster(2)) %>%
    summarise_each(funs(mean(., na.rm = TRUE)), matches("dep_delay")) %>% 
  collect()
Error in UseMethod("tbl_vars") : 
  no applicable method for 'tbl_vars' applied to an object of class "party_df"

Can't immediately see what's going wrong. Hope this helps!

output to separate files on each cluster

Not an issue but I'd like to output data in party_df objects to separate files on each cluster. With data.frames, I can do this:
library(readr)
data %>% write_tsv( ., file )
But write_tsv is not compatible with party_df.
Anyone get this to work?

group_by groups within partitions only

Let's assume that we've got 7 cores as worknodes.

data %>% partition() %>% group_by(transactionID) %>% collect()

The thing is that after collect() here we've got still even 7 rows with the same transactionID representing the same one group (one for each core).
Thus there is a need to call the same group_by one more time after collect() to get results we wanted intuitively (just to merge results from different cores).

Instead of that, multidplyr could overwrite dplyr's group_by in such simple way:

multidplyr:group_by(...):
if (currently_working_in_partitions) {
dplyr:group_by(...) %>% collect() %>% dplyr:group_by(...) %>% partitions()
} else {
dplyr:group_by(...)
}
}

^ I'm not 100% sure if that would give always correct results for all agregate functions
Ofc it would be even better, if such operation is written by design in multicore way (e.g. some map-reduce paradigm on cores).

or safer as workaround (but not using power of multi-cores):

multidplyr:group_by(...):
if (currently_working_in_partitions) {
collect() %>% dplyr:group_by(...) %>% partitions()
} else {
dplyr:group_by(...) %>%
}
}

Feature request: Option to partition based on object.size()

I have an application splitting a nested tbl using spatial data with highly unbalanced rows. It would be good to be able to have multidplyr approximately split based on the memory size of each row. i.e. Huge Russia would end up having it's own core, while 20 countries in Europe would be put together on another.

Related to #34.

`partition()` seams don't manage unbalanced (number) of group/core

# partitioning 9 df-rows grouped in 7 groups on 7 core
# windows server 2012 R2 (64 bit, R3.2.2, RStudio 1.0.136) 
# `partition()` distribute data only in 6 core with more
# than a group in some core and left the last one empty.

# 4 cpu / 8 core / winserver 2012 R2
#
# note: i was not able to reproduce a similar issue on my
# 2 cpu / 4 core macbook-pro

library(tidyverse)
#> Loading tidyverse: ggplot2
#> Loading tidyverse: tibble
#> Loading tidyverse: tidyr
#> Loading tidyverse: readr
#> Loading tidyverse: purrr
#> Loading tidyverse: dplyr
#> Conflicts with tidy packages ----------------------------------------------
#> filter(): dplyr, stats
#> lag():    dplyr, stats
library(magrittr)
#> 
#> Attaching package: 'magrittr'
#> The following object is masked from 'package:purrr':
#> 
#>     set_names
#> The following object is masked from 'package:tidyr':
#> 
#>     extract
library(multidplyr)
df <- data_frame(
    df_to_be_modelled = map(seq_len(9),
                            ~ mtcars[seq_len(.), ] 
    )
)

# suppose data are very unbalanced and that the time
# to model a couple of the first is quite the same spent
# to model one of the lasts: you like to group in a way 
# each core works quite the same amount of time
# (and use all "max - 1" core).

cluster <- create_cluster() # n - 1 =  7 by default
#> Initialising 7 core cluster.
set_default_cluster(cluster)

df %<>% mutate(group = c(1L, 2L, 2L, 1L, 3L, 4L, 5L, 6L, 7L))

df_cl <- df %>% partition(group)
df_cl
#> Source: party_df [9 x 2]
#> Groups: group
#> Shards: 6 [1--2 rows]
#> 
#> # S3: party_df
#>       df_to_be_modelled group
#>                  <list> <int>
#> 1 <data.frame [8 × 11]>     6
#> 2 <data.frame [2 × 11]>     2
#> 3 <data.frame [3 × 11]>     2
#> 4 <data.frame [5 × 11]>     3
#> 5 <data.frame [6 × 11]>     4
#> 6 <data.frame [7 × 11]>     5
#> 7 <data.frame [9 × 11]>     7
#> 8 <data.frame [1 × 11]>     1
#> 9 <data.frame [4 × 11]>     1

cluster_ls(cluster)
#> [[1]]
#> [1] "ukwoanoyti"
#> 
#> [[2]]
#> [1] "ukwoanoyti"
#> 
#> [[3]]
#> [1] "ukwoanoyti"
#> 
#> [[4]]
#> [1] "ukwoanoyti"
#> 
#> [[5]]
#> [1] "ukwoanoyti"
#> 
#> [[6]]
#> [1] "ukwoanoyti"
#> 
#> [[7]]
#> character(0)

# the first cluster have two different groups
# the last one have no groups, i.e. have no data
# note: the two observation of group 1 are both in the same
# node (i.e. cluster 4), as well as the two of group 2 (i.e. cluster 6).
# cluster 1 is the only one with two different groups.

actual_name <- cluster_ls(cluster)[[1]]
# cluster_eval(cluster, purrr::safely(print)(<name into `actual_name`>))
# sorry, I don't know how to do it in a simple automatic way
Session info
devtools::session_info()
#> Session info --------------------------------------------------------------
#>  setting  value                       
#>  version  R version 3.3.2 (2016-10-31)
#>  system   x86_64, mingw32             
#>  ui       RTerm                       
#>  language (EN)                        
#>  collate  Italian_Italy.1252          
#>  tz       Europe/Berlin               
#>  date     2017-01-31
#> Packages ------------------------------------------------------------------
#>  package    * version    date       source                            
#>  assertthat   0.1        2013-12-06 CRAN (R 3.3.0)                    
#>  backports    1.0.5      2017-01-18 CRAN (R 3.3.2)                    
#>  broom        0.4.1      2016-06-24 CRAN (R 3.3.1)                    
#>  colorspace   1.3-2      2016-12-14 CRAN (R 3.3.2)                    
#>  DBI          0.5-1      2016-09-10 CRAN (R 3.3.2)                    
#>  devtools     1.12.0     2016-06-24 CRAN (R 3.3.2)                    
#>  digest       0.6.11     2017-01-03 CRAN (R 3.3.2)                    
#>  dplyr      * 0.5.0      2016-06-24 CRAN (R 3.3.2)                    
#>  evaluate     0.10       2016-10-11 CRAN (R 3.3.2)                    
#>  foreign      0.8-67     2016-09-13 CRAN (R 3.3.2)                    
#>  ggplot2    * 2.2.1      2016-12-30 CRAN (R 3.3.2)                    
#>  gtable       0.2.0      2016-02-26 CRAN (R 3.3.0)                    
#>  haven        1.0.0      2016-09-23 CRAN (R 3.3.2)                    
#>  hms          0.3        2016-11-22 CRAN (R 3.3.2)                    
#>  htmltools    0.3.5      2016-03-21 CRAN (R 3.3.2)                    
#>  httr         1.2.1      2016-07-03 CRAN (R 3.3.2)                    
#>  jsonlite     1.2        2016-12-31 CRAN (R 3.3.2)                    
#>  knitr        1.15.1     2016-11-22 CRAN (R 3.3.2)                    
#>  lattice      0.20-34    2016-09-06 CRAN (R 3.3.2)                    
#>  lazyeval     0.2.0      2016-06-12 CRAN (R 3.3.2)                    
#>  lubridate    1.6.0      2016-09-13 CRAN (R 3.3.2)                    
#>  magrittr   * 1.5        2014-11-22 CRAN (R 3.3.0)                    
#>  memoise      1.0.0      2016-01-29 CRAN (R 3.3.0)                    
#>  mnormt       1.5-5      2016-10-15 CRAN (R 3.3.2)                    
#>  modelr       0.1.0      2016-08-31 CRAN (R 3.3.2)                    
#>  multidplyr * 0.0.0.9000 2017-01-27 Github (hadley/multidplyr@0085ded)
#>  munsell      0.4.3      2016-02-13 CRAN (R 3.3.0)                    
#>  nlme         3.1-130    2017-01-24 CRAN (R 3.3.2)                    
#>  plyr         1.8.4      2016-06-08 CRAN (R 3.3.2)                    
#>  psych        1.6.12     2017-01-08 CRAN (R 3.3.2)                    
#>  purrr      * 0.2.2      2016-06-18 CRAN (R 3.3.2)                    
#>  R6           2.2.0      2016-10-05 CRAN (R 3.3.2)                    
#>  Rcpp         0.12.9     2017-01-14 CRAN (R 3.3.2)                    
#>  readr      * 1.0.0      2016-08-03 CRAN (R 3.3.2)                    
#>  readxl       0.1.1      2016-03-28 CRAN (R 3.3.2)                    
#>  reshape2     1.4.2      2016-10-22 CRAN (R 3.3.2)                    
#>  rmarkdown    1.3        2016-12-21 CRAN (R 3.3.2)                    
#>  rprojroot    1.2        2017-01-16 CRAN (R 3.3.2)                    
#>  rvest        0.3.2      2016-06-17 CRAN (R 3.3.2)                    
#>  scales       0.4.1      2016-11-09 CRAN (R 3.3.2)                    
#>  stringi      1.1.2      2016-10-01 CRAN (R 3.3.2)                    
#>  stringr      1.1.0      2016-08-19 CRAN (R 3.3.2)                    
#>  tibble     * 1.2        2016-08-26 CRAN (R 3.3.2)                    
#>  tidyr      * 0.6.1      2017-01-10 CRAN (R 3.3.2)                    
#>  tidyverse  * 1.1.0      2017-01-20 CRAN (R 3.3.2)                    
#>  withr        1.0.2      2016-06-20 CRAN (R 3.3.2)                    
#>  xml2         1.1.1      2017-01-24 CRAN (R 3.3.2)                    
#>  yaml         2.1.14     2016-11-12 CRAN (R 3.3.2)

Request: Partition by group_by

I often can group my data into fewer groups than the number of cores on my node. I would like to partition my data into separate cores based on these groups, operate on them, and then collect the results. Is there a way to specify partition(cluster=cluster) by groups (from group_by() )?

"multiplyr" package do not handle integer64 columns correctly.

Through multidplyr processing (partition() -> collect()),
integer64 columns are converted to "numeric" automatically, however interger columns are not.

Example:

dt <- data.table(x=1:5, y=6:10)
dt$x <- as.integer64(dt$x)

class(dt$x)
# [1] "integer64"
class(dt$y)
# [1] "integer"

dt %>% partition %>% collect -> dt2

class(dt2$x)
# [1] "numeric" -> !!!
class(dt2$y)
# [1] "integer"

My environment is:

R version 3.2.2 (2015-08-14)
Platform: x86_64-w64-mingw32/x64 (64-bit)
Running under: Windows 7 x64 (build 7601) Service Pack 1

multidplyr package version is:

‘0.0.0.9000’

progress bar

Hi:
I am thinking of using this package for some intensive Bayesian simulations, since it appears I can "loop" through row of condition very nicely. However, I would like to have a progress bar. Is this possible ? Here is a basic t-test sim that gives an idea of what I am thinking of using this package for (but for far more complex models).

I am also wondering how random number streams would be handled here, and how to set a cluster rng stream.

library(multidplyr)

conditions to simulate

n1 <- c(10, 20, 30)
dat <- data.frame(n1, n2 = rev(n1), sd = c(20, 10, 5))
d <- expand.grid(dat)

define function to be applied to each row nsims times (could be anything: mse, etc)

func <- function(nsims, x1, x2, sd){
replicate(nsims, t.test(rnorm(x1, 0, sd),
rnorm(x2, 0, 1), var.equal = TRUE)$p.value)
}

create cluster

cluster <- create_cluster(16)

register function to cluster

cluster_assign_value(cluster, 'func', func)

results <- d %>%
partition(n1, n2, sd,cluster = cluster) %>%
do(t1 = mean(func(5000, x1 = .$n1, x2 = .$n2, sd = .$sd) < 0.05)) %>%
collect()

unlist into data frame

results$t1 <- unlist(results$t1)

filter() not observing cluster_assign_value() objects

As I understand how to send objects/values to each cluster for filtering - for example - is to use the cluster_assign_value() function for each value needed.

Maybe I'm not doing this correctly, but I would expect the below code to work.

> cluster2 <- create_cluster(2)
> mtcars_cl <- partition(mtcars, cluster = cluster2)
> cyl8 <- 8
> cluster_assign_vlaue(cluster = cluster2, name = "cyl8", value = a)
> cluster_get(cluster = cluster2, "cyl8")
[[1]]
[1] 8

[[2]]
[1] 8
> mtcars_cl %>% filter(cyl == cly8)
Error in checkForRemoteErrors(lapply(cl, recvResult)) : 
  2 nodes produced errors; first error: object 'cly8' not found

Repartition

Need some way to repartition data across clusters in case it becomes highly imbalanced (or you want to group differently)

Make partition work with invoke_rows

It seems invoke_rows doesn't accept a party_df object. That would be useful...

cluster <- c(detectCores(), length(unique(mtcars$carb))/2) %>% min %>% create_cluster()
mtcars %>% partition(carb, cluster=cluster) %>% invoke_rows(.f = sum)

-->
Error: .d must be a data frame

problems with mulitidplyr and broom

I can't reproduce the behaviour with a smaller dataset so I attach my original data. Sorry for that.

first the original code, that worked fine:
myDF <- read.table("myDF.txt", sep = "\t")

myDF <- group_by(myDF, Sampling, BOT, Wells, rep) %>%
 summarize(maxOD = max(OD700)) %>%
 filter(maxOD >= 0.2) %>%
 inner_join(myDF, .)

fitDF <- myDF %>%
 group_by(Sampling, BOT, Wells, rep) %>%
 do(gompertz_fit = try(nls( OD700 ~ K * exp( -exp((( r * exp( 1)) / K) * (l - dhour) + 1)),
                    data = .,
                    start = list(K = 2, l = 30, r = 0.1)),
                    silent = T))

 filter(fitDF, class(gompertz_fit) == "nls") %>% tidy(., gompertz_fit)

no the same code but with multidplyr

myDF_par <- partition(myDF, Wells)

fitDF_t <- myDF_par %>%
  group_by(Sampling, BOT, Wells, rep) %>%
  do(gompertz_fit = try(nls( OD700 ~ K * exp( -exp((( r * exp( 1)) / K) * (l - dhour) + 1)),
                    data = .,
                    start = list(K = 2, l = 30, r = 0.1)),
                    silent = T))

fitDF_par <- collect(fitDF_t)

first thing that changed is that the following doesn't work any longer:

filter(fitDF_par, class(gompertz_fit) == "nls") 

although this is working

filter(fitDF_t, class(gompertz_fit) == "nls") 

for the former I found a workaround that does work

filter(fitDF_par, length( unlist (gompertz_fit)) > 1)

but tidy doesn't work with it

filter(fitDF_par, length( unlist (gompertz_fit)) > 1) %>% tidy(., gompertz_fit)

and this doesn't work either but probably for different reasons

filter(fitDF_t, length( unlist (gompertz_fit)) > 1) %>% tidy(., gompertz_fit)

I think the last one has to with the fact that the data are still distributed on the cores and may not be a bug (?). The other two might be?

group_by keeps old groups.

When defining new groups, in 'regular' dplyr the grouping is overwritten. In multiplyr, more and more groups are stacked on.

library(nycflights13)
flights1 <- partition(flights, flight)
flights1 <- group_by(flights1, month)
flights1 <- group_by(flights1, day)
flights1 <- group_by(flights1, month)
Source: party_df [336,776 x 16]
Groups: flight, month, day, month
Shards: 7 [47,967--48,341 rows]

    year month   day dep_time dep_delay arr_time arr_delay carrier tailnum flight
   (int) (int) (int)    (int)     (dbl)    (int)     (dbl)   (chr)   (chr)  (int)
1   2013     1     1      557        -3      709       -14      EV  N829AS   5708
2   2013     1     1      558        -2      849        -2      B6  N793JB     49
3   2013     1     1      558        -2      924         7      UA  N29129    194
4   2013     1     1      558        -2      923       -14      UA  N53441   1124
5   2013     1     1      559         0      702        -4      B6  N708JB   1806
6   2013     1     1      559        -1      854        -8      UA  N76515   1187
7   2013     1     1      623        -4      933         1      UA  N459UA    496
8   2013     1     1      629        -1      824        -9      US  N426US   1019
9   2013     1     1      643        -3      922       -18      UA  N497UA    556
10  2013     1     1      653        -7      936       -33      DL  N327NW   1383
..   ...   ...   ...      ...       ...      ...       ...     ...     ...    ...
Variables not shown: origin (chr), dest (chr), air_time (dbl), distance (dbl), hour
  (dbl), minute (dbl)

can not load multiple packages through `cluster_library`

According to ?cluster_library (regarding its argument):

packages A character vector of package names

But if I try to load multilpe packages, such as:
cluster_library(c("dplyr", "icd"))

I get the following error:

Error in library(packages, character.only = TRUE) : 
  'package' must be of length 1

And if I indeed take a look at the source code of cluster_library it seems like it is trynig to pass the packages (plural) from cluster_library to package (singular) in library

Progress bars

I know this is difficult, but it would be fantastic if you could pull this off.

Unable to load packages into nodes

Hello,

I was trying to load a "dplyr" and "lubridate" into my nodes but I encounter the following error:

cl <- create_cluster(cores=15)
Initialising 15 core cluster.
set_default_cluster(cl)
cluster_library(cl, "dplyr")
Error in checkForRemoteErrors(lapply(cl, recvResult)) :
15 nodes produced errors; first error: there is no package called 'dplyr'
cluster_library(cl, "lubridate")
Error in checkForRemoteErrors(lapply(cl, recvResult)) :
15 nodes produced errors; first error: package or namespace load failed for 'lubridate'

I have dplyr and lubridate loaded in my current R session so the package is there. Is it problem with any of my packages?

Thanks in advance.

Partition not working

I am having an issue with partioning a dataframe, which I don't understand. The grouped df looks as follows:

p2012 %>% group_by(year, month)
Source: local data frame [6,171,602 x 17]
Groups: year, month [6]

      ID  year   month            datetime        x       y sensor.depth dist_from_shore datapoint.number metadata.date       PIT length.total
   (int) (dbl)  (fctr)              (time)    (dbl)   (dbl)        (dbl)           (int)            (int)         (int)    (fctr)        (int)
1  41500  2011 October 2011-10-28 16:09:35 404703.4 5872714        3.519             132                9             1 116606026          504
2  41500  2011 October 2011-10-18 14:50:09 404722.3 5872716           NA             115                1             1 116606026          504
3  41500  2011 October 2011-10-27 12:11:07 404700.2 5872720        3.312             131                2             1 116606026          504
4  41500  2011 October 2011-10-28 08:19:37 404704.2 5872712           NA             131                3             1 116606026          504
5  41500  2011 October 2011-10-28 11:58:46 404703.8 5872712        3.312             132                4             1 116606026          504
6  41500  2011 October 2011-10-31 00:48:36 404712.1 5872728           NA             116               31             1 116606026          504
7  41500  2011 October 2011-10-31 00:51:08 404712.1 5872728        3.726             116               32             1 116606026          504
8  41500  2011 October 2011-10-31 00:51:33 404712.1 5872728        3.726             116               33             1 116606026          504
9  41500  2011 October 2011-10-31 00:56:08 404712.1 5872728        3.726             116               34             1 116606026          504
10 41500  2011 October 2011-10-31 01:04:26 404712.0 5872728           NA             116               35             1 116606026          504
..   ...   ...     ...                 ...      ...     ...          ...             ...              ...           ...       ...          ...

Grouping works fine, but partitioning throws the following error:

> p2012 %>% 
+   partition(year, month)

Error: length(values) == length(cluster) is not TRUE

I have a 32 core machine and multidplyr tries to allocate 31 cores. Creating a different cluster with fewer nodes does also produce the same error most of the time. The strange part is that it sometimes seems to work in a random fashion.

'filename' is assigned to each cluster, but... Error in inherits(x, "connection") : object 'filename' not found

Followed the final example from this page:
https://github.com/hadley/multidplyr/blob/master/vignettes/multidplyr.md

... in order to load multiple files to individual partitions.

> files <- dir(pattern = 'xyzzy')[1:2] %>% as.list
> print(files)
[[1]]
[1] "xyzzy.aa"

[[2]]
[1] "xyzzy.ab"

> for(f in files) print(file.info(f)$size)
[1] 211256293
[1] 211263186
> cluster <- create_cluster(2)
Initialising 2 core cluster.
> cluster_assign_each(
+     cluster = cluster,
+     name = "filename",
+     values = files
+ )
> cluster_assign_expr(
+     cluster = cluster,
+     name = "my_data",
+     expr = readr::read_tsv(filename)
+ )
Error in inherits(x, "connection") : object 'filename' not found

arrange function does not work in multidplyr

R CODE:

bel.rx.clm <- rbind(bel.rx.clms.2015d,bel.rx.clms.2016d) %>%
select(ENROLID, SVCDATE) %>%
arrange(SVCDATE)

cluster <- create_cluster(16)

bel.cohort.idx <- bel.rx.clm %>%
partition(ENROLID,cluster=cluster) %>%
arrange(SVCDATE) %>% first() %>% collect()

ERROR

Error in UseMethod("arrange_") :
no applicable method for 'arrange_' applied to an object of class "party_df"

Clusters created does not automatically clean up

Here is my code:

cl <- create_cluster(8)
set_default_cluster(cl)
data_par <- data %>% partition(IID, cluster=cl)
cluster_assign_value(cluster=cl, "myfun", myfun)
out <- data_par %>% do(res=myfun(.)) %>% collect()

After that, sometimes the clusters created does not automatically clean up
Is there any function can close those clusters?

Standard evaluation version for partition

I'd like to use partition for programming like I use group_by_ in dplyr. I tried to export the existing partition_ function in multidplyr, but that didn't do the trick.

Object management examples bug

Assuming the user's machine has more than one core, cl should be assigned the value create_cluster(2) so that:

cl %>%
  cluster_assign_each("z", list(3, 4)) %>%
  cluster_get("z")

doesn't throw an error on machines with e.g. 8 cores.

Resolving Dependencies on Nodes

Hello,

More of a general question than an issue - my apologies if this is the wrong forum for such an inquiry. I'd be grateful if you could point me to the correct forum if this is the case.

Perhaps I'm missing something obvious, but I'm not sure how best to leverage partition() in combination with do() with custom functions with dependencies. As a simple example, see below ...

library(dplyr)
library(multidplyr)
library(nycflights13)

cluster <- create_cluster(8)
set_default_cluster(cluster)

myfun <- function(x) { mean(x, na.rm=TRUE) }

system.time({
flights %>%
partition() %>%
summarise(myfun(dep_delay)) %>%
collect()
})

This fails unless I manually assign the function value on each node using the following preamble:

cluster %>% cluster_assign_value('myfun', myfun)

This is easy enough to do with a single function, but I am trying to apply functions with sometimes rather involved dependencies on other custom functions living in my environment. I also rely on ~20 - 30 libraries for much of my work and pushing each of these out to each node in the cluster is costly.

I can certainly write helper functions to push libraries and relevant local functions, but wanted to know if there's a more straightforward (or automatic) way of doing this? Or if I'm missing a key concept entirely?

-Chris

partition fails to complete

I'm following the multidplyr example given in the introduction, and R got stuck on the first multidplyr command (partition); the command doesn't complete (I've let it run 15 minutes, which seems like more than enough time).

#> flights1 <- partition(flights, flight)
Initializing 3 core cluster.

Looking at the system time from the example, it should have completed quickly:

#>    user  system elapsed 
#>   0.434   0.057   0.967

for me, the command has been running 15 minutes now.

What should I try to fix this problem?

This is with R 3.4.3 on an iMac running High Sierra (10.13.3) with 32Gb memory.

UPDATE:
To reproduce the error on a different system, I ran the same commands on my Macbook Pro (also OSX 10.13.3, R 3.4.3).

#> system.time(flights1 <- partition(flights, flight))
Initialising 7 core cluster:

#>    user  system elapsed 
#>   0.796   0.091   1.188

So this appears to be a machine or system configuration specific issue.

Still, I would appreciate any advice on things to try to get multidplyr working on my iMac.

'Could not find function "gam"' in `group_by()` version of `models` in vignette

I followed the vignette exactly (i.e., copy-and-paste) and ran into an issue towards the end:

Works

system.time({
    models <- by_dest %>% 
        do(mod = gam(dep_delay ~ s(yday) + s(dep_time), data = .))
})

Doesn't Work

system.time({
    models <- common_dest %>% 
        group_by(dest) %>% 
        do(mod = gam(dep_delay ~ s(yday) + s(dep_time), data = .))
})

Traceback:
Error in eval(expr, envir, enclos) : could not find function "gam"
14 eval(expr, envir, enclos)
13 eval(args[[j]]$expr, envir = env)
12 do_.grouped_df(.data, .dots = lazyeval::lazy_dots(...))
11 do_(.data, .dots = lazyeval::lazy_dots(...))
10 do(., mod = gam(dep_delay ~ s(yday) + s(dep_time), data = .))
9 function_list[k]
8 withVisible(function_list[k])
7 freduce(value, _function_list)
6 _fseq(_lhs)
5 eval(expr, envir, enclos)
4 eval(quote(_fseq(_lhs)), env, env)
3 withVisible(eval(quote(_fseq(_lhs)), env, env))
2 common_dest %>% group_by(dest) %>% do(mod = gam(dep_delay ~ s(yday) +
s(dep_time), data = .))
1 system.time({
models <- common_dest %>% group_by(dest) %>% do(mod = gam(dep_delay ~
s(yday) + s(dep_time), data = .))
})

pmap_dfr - Error: Element 5 is not a vector (environment)

I'm trying to use multidplyr_0.0.0.9000 with dplyr_0.7.4.9000 and pmap_dfr from purrr_0.2.4.9000. The following code (without using multidplyr) works fine:

grid1 = as_tibble(expand.grid(m1 = c(1:10), m2 = c(20:30)))
retstuff = function(m1, m2) { return(tribble(~m3, ~m4, m1+1, m2+2)) }
pmap_dfr(grid1, retstuff)

When I try to partition the grid with multidplyr:

grid2 = partition(grid1, m1)
pmap_dfr(grid2, retstuff)

I get the error Error: Element 5 is not a vector (environment) from pmap_dfr()

I also get the #57 warning from partition(): group_indices_.grouped_df ignores extra arguments. Not sure if that's related or not.

`partition_()` fails before creating a party df

The standard evaluation version of partition() throws an error and doesn't create a party df.

library(dplyr)
#> 
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#> 
#>     filter, lag
#> The following objects are masked from 'package:base':
#> 
#>     intersect, setdiff, setequal, union
library(multidplyr)
data(iris)

partition_(iris, "Species") 
#> Initialising 3 core cluster.
#> Error in x$expr: $ operator is invalid for atomic vectors

# but partition() works just fine:
partition(iris, Species)
#> Source: party_df [150 x 5]
#> Groups: Species
#> Shards: 3 [50--50 rows]
#> 
#> # S3: party_df
#>    Sepal.Length Sepal.Width Petal.Length Petal.Width Species
#>           <dbl>       <dbl>        <dbl>       <dbl>  <fctr>
#> 1           5.1         3.5          1.4         0.2  setosa
#> 2           4.9         3.0          1.4         0.2  setosa
#> 3           4.7         3.2          1.3         0.2  setosa
#> 4           4.6         3.1          1.5         0.2  setosa
#> 5           5.0         3.6          1.4         0.2  setosa
#> 6           5.4         3.9          1.7         0.4  setosa
#> 7           4.6         3.4          1.4         0.3  setosa
#> 8           5.0         3.4          1.5         0.2  setosa
#> 9           4.4         2.9          1.4         0.2  setosa
#> 10          4.9         3.1          1.5         0.1  setosa
#> # ... with 140 more rows

I'm running updated versions of R, dplyr, and multidplyr:

R version 3.3.2 (2016-10-31)
Platform: x86_64-apple-darwin13.4.0 (64-bit)
Running under: OS X El Capitan 10.11.6

locale:
[1] en_US.UTF-8/en_US.UTF-8/en_US.UTF-8/C/en_US.UTF-8/en_US.UTF-8

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
[1] multidplyr_0.0.0.9000 dplyr_0.5.0          

loaded via a namespace (and not attached):
[1] lazyeval_0.2.0 magrittr_1.5   R6_2.2.0       assertthat_0.1 parallel_3.3.2 DBI_0.5-1      tools_3.3.2   
[8] tibble_1.2     Rcpp_0.12.9   

devtools::install_github rersults in partial install

Install via devtools::install_github seems to only partially complete:

> devtools::install_github("hadley/multidplyr")
Downloading GitHub repo hadley/multidplyr@master
from URL https://api.github.com/repos/hadley/multidplyr/zipball/master
Installation failed: cannot open file 'C:/Users/<redacted>/AppData/Local/Temp/Rtmp8yh9oB/devtools5cc0298015e9/hadley-multidplyr-0085ded/R/cluster-assign.R': No such file or directory
>

When I look in that folder:

{ hadley-multidplyr-0085ded }  » ls
DESCRIPTION  NAMESPACE

make nest and partition/party_df play nice

Ref: https://twitter.com/hadleywickham/status/694687708799578112

Nest fails when encountering a party_df:

library(gapminder)
library(dplyr)
library(multidplyr)
library(tidyr)

cluster <- create_cluster(4) %>% cluster_library("purrr")

by_country <- gapminder %>% 
              partition(cluster = cluster, continent, country) %>%
              nest() %>% 
              mutate( model = map(data, ~ lm(lifeExp ~ year, data = . ))  ) %>%
              collect()

Current workaround:

library(gapminder)
library(dplyr)
library(multidplyr)
library(tidyr)

cluster <- create_cluster(4) %>% cluster_library("purrr")

by_country <- gapminder %>% 
              group_by(continent, country) %>% 
              nest() %>% 
              partition(cluster = cluster, continent, country) %>% 
              mutate( model = map(data, ~ lm(lifeExp ~ year, data = . ))  ) %>%
              collect()

Utility function for attaching functions?

(probably) dubious implementation, but something like:

cluster_assign_func <- function(cl, f) {
  fname <- as.character(substitute(f))
  stopifnot(is.function(f))
  cluster_assign_value(cl, fname, f)
}

Overhead is excrutiating

I have an application where the input data.frame size is around 60GB and the moving-memory-around stage is taking almost as long as the computation I want to run - possibly will be longer with the collect stage.

I know this isn't a particularly helpful issue, but it would extremely helpful if someone could work out a way to not have to do the moving-memory-around stage, perhaps with something inspired by Rcppparallel, which I can easily use for computations which are ungrouped.

multidplyr seems like it can only be helpful in situations where the computation stage is intensive, which, for most big data applications, isn't the case.

Error in if (left == 0) break : argument is of length zero

Running a number of models from qgam package (https://github.com/mfasiolo/qgam) and run into this error. Can't provide reproducible example but here is example code. The error seems to be quite random.

Thought you'd like to know, and would be good to know if it's to do with my setting up of the clusters.

#  Detect clusters
parallel::detectCores() # 4 cores

# Create groups
group <- rep(1:4, 
             length.out = nrow(TestData %>% group_by(Group1, Group2) %>% nest()))
TestData2 <- bind_cols(tibble(group), TestData %>% group_by(TaxonomicName, Method) %>% nest())
TestData2 <- TestData2 %>% unnest(data)

# Create clusters
cluster <- create_cluster(cores = 4)

# Create partition dataframe
TestDataParty <- TestData2 %>% partition(group, cluster = cluster)

# Load library for within each cluster
cluster_library(TestDataParty, "qgam")

# Run models in parallel 
TestDataPartyOutput <- TestDataParty %>% group_by(Group1, Group2) %>% 
  do(Model1 = tryCatch(qgam(y ~ s(x, k = 3), qu = 0.95, data = .), error = function(e) NA),
     Model2 = tryCatch(qgam(y ~ s(x, k = 4), qu = 0.95, data = .), error = function(e) NA),
     Model3 = tryCatch(qgam(y ~ s(x, k = 5), qu = 0.95, data = .), error = function(e) NA),
     Model4 = tryCatch(qgam(y ~ s(x, k = 7), qu = 0.95, data = .), error = function(e) NA))

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.