Speed up your R Work
Win-Vector Blog 2018-08-02
In this note we will show how to speed up work in R
by partitioning data and process-level parallelization. We will show the technique with three different R
packages: rqdatatable
, data.table
, and dplyr
. The methods shown will also work with base-R
and other packages.
For each of the above packages we speed up work by using wrapr::execute_parallel
which in turn uses wrapr::partition_tables
to partition un-related data.frame
rows and then distributes them to different processors to be executed. rqdatatable::ex_data_table_parallel
conveniently bundles all of these steps together when working with rquery
The partitioning is specified by the user preparing a grouping column that tells the system which sets of rows must be kept together in a correct calculation. We are going to try to demonstrate everything with simple code examples, and minimal discussion.
Keep in mind: unless the pipeline steps have non-trivial cost, the overhead of partitioning and distributing the work may overwhelm any parallel speedup. Also data.table
itself already seems to exploit some thread-level parallelism (notice user time is greater than elapsed time). That being said, in this note we will demonstrate a synthetic example where computation is expensive due to a blow-up in an intermediate join step.
Our example
First we set up our execution environment and example (some details: OSX 10.13.4 on a 2.8 GHz Intel Core i5 Mac Mini (Late 2015 model) with 8GB RAM and hybrid disk drive).
## Loading required package: rquery
## Warning: package 'dplyr' was built under R version 3.5.1
## [1] "Sun Jul 8 09:05:25 2018"
## [1] "R version 3.5.0 (2018-04-23)"
## [1] 4
## [1] '3.5.0'
## [1] '0.1.2'
## [1] '0.5.1'
## [1] '0.7.6'
ncore <- parallel::detectCores()print(ncore)
## [1] 4
cl <- parallel::makeCluster(ncore)print(cl)
## socket cluster with 4 nodes on host 'localhost'
set.seed(2362)mk_example <- function(nkey, nrep, ngroup = 20) { keys <- paste0("key_", seq_len(nkey)) key_group <- sample(as.character(seq_len(ngroup)), length(keys), replace = TRUE) names(key_group) <- keys key_table <- data.frame( key = rep(keys, nrep), stringsAsFactors = FALSE) key_table$data <- runif(nrow(key_table)) instance_table <- data.frame( key = rep(keys, nrep), stringsAsFactors = FALSE) instance_table$id <- seq_len(nrow(instance_table)) instance_table$info <- runif(nrow(instance_table)) # groups should be no finer than keys key_table$key_group <- key_group[key_table$key] instance_table$key_group <- key_group[instance_table$key] list(key_table = key_table, instance_table = instance_table)}dlist <- mk_example(10, 10)data <- dlist$instance_tableannotation <- dlist$key_table
rquery / rqdatatable
and rqdatatable
can implement a non-trivial calculation as follows.
# possible data lookup: find rows that# have lookup data <= infooptree <- local_td(data) %.>% natural_join(., local_td(annotation), jointype = "INNER", by = "key") %.>% select_rows_nse(., data <= info) %.>% pick_top_k(., k = 1, partitionby = "id", orderby = "data", reverse = "data", keep_order_column = FALSE) %.>% orderby(., "id")cat(format(optree))
## table('data'; ## key,## id,## info,## key_group) %.>%## natural_join(.,## table('annotation'; ## key,## data,## key_group),## j= INNER, by= key) %.>%## select_rows(.,## data <= info) %.>%## extend(.,## row_number := row_number(),## p= id,## o= "data" DESC) %.>%## select_rows(.,## row_number <= 1) %.>%## drop_columns(.,## row_number) %.>%## orderby(., id)
res1 <- ex_data_table(optree)head(res1)
## data id info key key_group## 1: 0.9152014 1 0.9860654 key_1 20## 2: 0.5599810 2 0.5857570 key_2 8## 3: 0.3011882 3 0.3334490 key_3 10## 4: 0.3650987 4 0.3960980 key_4 5## 5: 0.1469254 5 0.1753649 key_5 14## 6: 0.2567631 6 0.3510280 key_6 7
## [1] 94
And we can execute the operations in parallel.
parallel::clusterEvalQ(cl, library("rqdatatable"))
## [[1]]## [1] "rqdatatable" "rquery" "stats" "graphics" "grDevices" ## [6] "utils" "datasets" "methods" "base" ## ## [[2]]## [1] "rqdatatable" "rquery" "stats" "graphics" "grDevices" ## [6] "utils" "datasets" "methods" "base" ## ## [[3]]## [1] "rqdatatable" "rquery" "stats" "graphics" "grDevices" ## [6] "utils" "datasets" "methods" "base" ## ## [[4]]## [1] "rqdatatable" "rquery" "stats" "graphics" "grDevices" ## [6] "utils" "datasets" "methods" "base"
res2 <- ex_data_table_parallel(optree, "key_group", cl)head(res2)
## data id info key key_group## 1: 0.9152014 1 0.9860654 key_1 20## 2: 0.5599810 2 0.5857570 key_2 8## 3: 0.3011882 3 0.3334490 key_3 10## 4: 0.3650987 4 0.3960980 key_4 5## 5: 0.1469254 5 0.1753649 key_5 14## 6: 0.2567631 6 0.3510280 key_6 7
## [1] 94
can implement the same function.
## ## Attaching package: 'data.table'## The following objects are masked from 'package:dplyr':## ## between, first, last
## [1] '1.11.4'
data_table_f <- function(data, annotation) { data <- data.table::as.data.table(data) annotation <- data.table::as.data.table(annotation) joined <- merge(data, annotation, by = "key", all=FALSE, allow.cartesian=TRUE) joined <- joined[joined$data <= joined$info, ] data.table::setorderv(joined, cols = "data") joined <- joined[, .SD[.N], id] data.table::setorderv(joined, cols = "id")}resdt <- data_table_f(data, annotation)head(resdt)
## id key info key_group.x data key_group.y## 1: 1 key_1 0.9860654 20 0.9152014 20## 2: 2 key_2 0.5857570 8 0.5599810 8## 3: 3 key_3 0.3334490 10 0.3011882 10## 4: 4 key_4 0.3960980 5 0.3650987 5## 5: 5 key_5 0.1753649 14 0.1469254 14## 6: 6 key_6 0.3510280 7 0.2567631 7
## [1] 94
We can also run data.table
in parallel using wrapr::execute_parallel
parallel::clusterEvalQ(cl, library("data.table"))
## [[1]]## [1] "data.table" "rqdatatable" "rquery" "stats" "graphics" ## [6] "grDevices" "utils" "datasets" "methods" "base" ## ## [[2]]## [1] "data.table" "rqdatatable" "rquery" "stats" "graphics" ## [6] "grDevices" "utils" "datasets" "methods" "base" ## ## [[3]]## [1] "data.table" "rqdatatable" "rquery" "stats" "graphics" ## [6] "grDevices" "utils" "datasets" "methods" "base" ## ## [[4]]## [1] "data.table" "rqdatatable" "rquery" "stats" "graphics" ## [6] "grDevices" "utils" "datasets" "methods" "base"
parallel::clusterExport(cl, "data_table_f")dt_f <- function(tables_list) { data <- tables_list$data annotation <- tables_list$annotation data_table_f(data, annotation)}data_table_parallel_f <- function(data, annotation) { respdt <- wrapr::execute_parallel( tables = list(data = data, annotation = annotation), f = dt_f, partition_column = "key_group", cl = cl) %.>% data.table::rbindlist(.) data.table::setorderv(respdt, cols = "id") respdt}respdt <- data_table_parallel_f(data, annotation)head(respdt)
## id key info key_group.x data key_group.y## 1: 1 key_1 0.9860654 20 0.9152014 20## 2: 2 key_2 0.5857570 8 0.5599810 8## 3: 3 key_3 0.3334490 10 0.3011882 10## 4: 4 key_4 0.3960980 5 0.3650987 5## 5: 5 key_5 0.1753649 14 0.1469254 14## 6: 6 key_6 0.3510280 7 0.2567631 7
## [1] 94
can also implement the example.
dplyr_pipeline <- function(data, annotation) { res <- data %>% inner_join(annotation, by = "key") %>% filter(data <= info) %>% group_by(id) %>% arrange(-data) %>% mutate(rownum = row_number()) %>% ungroup() %>% filter(rownum == 1) %>% arrange(id) res}resd <- dplyr_pipeline(data, annotation)head(resd)
## # A tibble: 6 x 7## key id info key_group.x data key_group.y rownum## <chr> <int> <dbl> <chr> <dbl> <chr> <int>## 1 key_1 1 0.986 20 0.915 20 1## 2 key_2 2 0.586 8 0.560 8 1## 3 key_3 3 0.333 10 0.301 10 1## 4 key_4 4 0.396 5 0.365 5 1## 5 key_5 5 0.175 14 0.147 14 1## 6 key_6 6 0.351 7 0.257 7 1
## [1] 94
And we can use wrapr::execute_parallel
to parallelize the dplyr
parallel::clusterEvalQ(cl, library("dplyr"))
## [[1]]## [1] "dplyr" "data.table" "rqdatatable" "rquery" "stats" ## [6] "graphics" "grDevices" "utils" "datasets" "methods" ## [11] "base" ## ## [[2]]## [1] "dplyr" "data.table" "rqdatatable" "rquery" "stats" ## [6] "graphics" "grDevices" "utils" "datasets" "methods" ## [11] "base" ## ## [[3]]## [1] "dplyr" "data.table" "rqdatatable" "rquery" "stats" ## [6] "graphics" "grDevices" "utils" "datasets" "methods" ## [11] "base" ## ## [[4]]## [1] "dplyr" "data.table" "rqdatatable" "rquery" "stats" ## [6] "graphics" "grDevices" "utils" "datasets" "methods" ## [11] "base"
parallel::clusterExport(cl, "dplyr_pipeline")dplyr_f <- function(tables_list) { data <- tables_list$data annotation <- tables_list$annotation dplyr_pipeline(data, annotation)}dplyr_parallel_f <- function(data, annotation) { respdt <- wrapr::execute_parallel( tables = list(data = data, annotation = annotation), f = dplyr_f, partition_column = "key_group", cl = cl) %>% dplyr::bind_rows() %>% arrange(id)}respdplyr <- dplyr_parallel_f(data, annotation)head(respdplyr)
## # A tibble: 6 x 7## key id info key_group.x data key_group.y rownum## <chr> <int> <dbl> <chr> <dbl> <chr> <int>## 1 key_1 1 0.986 20 0.915 20 1## 2 key_2 2 0.586 8 0.560 8 1## 3 key_3 3 0.333 10 0.301 10 1## 4 key_4 4 0.396 5 0.365 5 1## 5 key_5 5 0.175 14 0.147 14 1## 6 key_6 6 0.351 7 0.257 7 1
## [1] 94
We can benchmark the various realizations.
dlist <- mk_example(300, 300)data <- dlist$instance_tableannotation <- dlist$key_tabletimings <- microbenchmark( data_table_parallel = nrow(data_table_parallel_f(data, annotation)), data_table = nrow(data_table_f(data, annotation)), rqdatatable_parallel = nrow(ex_data_table_parallel(optree, "key_group", cl)), rqdatatable = nrow(ex_data_table(optree)), dplyr_parallel = nrow(dplyr_parallel_f(data, annotation)), dplyr = nrow(dplyr_pipeline(data, annotation)), times = 10L)saveRDS(timings, "Parallel_rqdatatable_timings.RDS")
## Unit: seconds## expr min lq mean median uq## data_table_parallel 5.274560 5.457105 5.609827 5.546554 5.686829## data_table 9.401677 9.496280 9.701807 9.541218 9.748159## rqdatatable_parallel 7.165216 7.497561 7.587663 7.563883 7.761987## rqdatatable 12.490469 12.700474 13.320480 12.898154 14.229233## dplyr_parallel 6.492262 6.572062 6.784865 6.787277 6.875076## dplyr 20.056555 20.450064 20.647073 20.564529 20.800350## max neval## 6.265888 10## 10.419316 10## 7.949404 10## 14.282269 10## 7.328223 10## 21.332103 10
# autoplot(timings)timings <- as.data.frame(timings)timings$seconds <- timings$time/1e+9ScatterBoxPlotH(timings, xvar = "seconds", yvar = "expr", title="task duration by method")
The benchmark timings show parallelized data.table
is the fastest, followed by parallelized dplyr
, and parallelized rqdatatable
. In the non-paraellized case data.table
is the fastest, followed by rqdatatable
, and then dplyr
A reason dplyr
sees greater speedup relative to its own non-parallel implementation (yet does not beat data.table
) is that data.table
starts already multi-threaded, so data.table
is exploiting some parallelism even before we added the process level parallelism (and hence sees less of a speed up, though it is fastest).
pipelines exhibit superior performance on big data systems (Spark, PostgreSQL, Amazon Redshift, and hopefully soon Google bigquery), and rqdatatable
supplies a very good in-memory implementation of the rquery
system based on data.table
. rquery
also speeds up solution development by supplying higher order operators and early debugging features.
In this note we have demonstrated simple procedures to reliably parallelize any of rqdatatable
, data.table
, or dplyr
Note: we did not include alternatives such as multidplyr
or dtplyr
in the timings, as they did not appear to work on this example.
Note: Matt Summersgill pointed out for this sort of problem data.table
is far more effective than I have originally shown. rqdatatable
can not take advantage of this technique, because I have not yet truly implemented theta-joins in the rqdatatable
dialect of rquery
(it is on my TODO-list).
A re-rendering of this article can be found here, source code here, and raw timings here.