Can you nest parallel operations in R?
Win-Vector Blog 2016-09-03
Parallel programming is a technique to decrease how long a task takes by performing more parts of it at the same time (using additional resources). When we teach parallel programming in R we start with the basic use of parallel
(please see here for example). This is, in our opinion, a necessary step before getting into clever notation and wrapping such as doParallel and foreach. Only then do the students have a sufficiently explicit interface to frame important questions about the semantics of parallel computing. Beginners really need a solid mental model of what services are really being provided by their tools and to test edge cases early.
One question that comes up over and over again is “can you nest parLapply
?”
The answer is “no.” This is in fact an advanced topic, but it is one of the things that pops up when you start worrying about parallel programming. Please read on for what that is the right answer and how to work around that (simulate a “yes”).
I don’t think the above question is usually given sufficient consideration (nesting parallel operations can in fact make a lot of sense). You can’t directly nest parLapply
, but that is a different issue than can one invent a work-around. For example: a “yes” answer (really meaning there are work-arounds) can be found here. Again this is a different question than “is there a way to nest foreach loops” (which is possible through the nesting operator %.%
which presumably handles working around nesting issues in parLapply
).
Let’s set up a concrete example, so we can ask and answer a precise question. Suppose we have a list of jobs (coming from an external source) that we will simulate with the code fragment below.
jobs <- list(1:3,5:10,100:200)
Notice the jobs have wildly diverging sizes, this is an important consideration.
Suppose the task we want to perform is some the square roots of the entries. The standard (non-parallel) calculation would look like the following.
worker1 <- function(x) {
sum(sqrt(x))
}
lapply(jobs,worker1)
## [[1]]
## [1] 4.146264
##
## [[2]]
## [1] 16.32201
##
## [[3]]
## [1] 1231.021
For didactic purposes please pretend that the sum
function is very expensive and the sqrt
function is somewhat expensive.
If it was obvious we always had a great number of small sub-lists we would want to use parallelization to make sure we are performing many sum
s at the same time. We would then parallelize over the first level as below.
clus <- parallel::makeCluster(4)
parallel::parLapplyLB(clus,jobs,worker1)
## [[1]]
## [1] 4.146264
##
## [[2]]
## [1] 16.32201
##
## [[3]]
## [1] 1231.021
Notice that parallel::parLapplyLB
uses almost the same calling convention as lapply
and returns the exact same answer.
If it was obvious we had a single large sub-list we would want to make sure we were always parallelizing the sqrt
operations so we would prefer to parallelize as follows:
mkWorker2 <- function(clus) {
force(clus)
function(x) {
xs <- parallel::parLapplyLB(clus,x,sqrt)
sum(as.numeric(xs))
}
}
worker2 <- mkWorker2(clus)
lapply(jobs,worker2)
## [[1]]
## [1] 4.146264
##
## [[2]]
## [1] 16.32201
##
## [[3]]
## [1] 1231.021
(For the details of building functions and passing values to remote workers please see here.)
If we were not sure if in the future what structure we would encounter we would prefer to schedule all operations for possible parallel execution. This would minimize the number of idle resources and minimize the time to finish the jobs. Ideally that would look like the following (a nested use of parallel):
parallel::parLapplyLB(clus,jobs,worker2)
## Error in checkForRemoteErrors(val): 3 nodes produced errors; first error: invalid connection
Notice the above fails with an error. Wishing for flexible code is what beginners intuitively mean when they as if you can nest parallel calls. They may not be able to explain it, but they are worried they don’t have a good characterization of the work they are trying to parallelize over. They are not asking if things get magically faster by “parallelizing parallel.”
It isn’t too hard to find out what the nature of the error is: the communication connection socket file descriptors (con
) are passed as integers to each machine, but they are not valid descriptors where they arrive (they are just integers). We can see this by looking at the structure of the cluster:
str(clus)
## List of 4
## $ :List of 3
## ..$ con :Classes 'sockconn', 'connection' atomic [1:1] 5
## .. .. ..- attr(*, "conn_id")=<externalptr>
## ..$ host: chr "localhost"
## ..$ rank: int 1
## ..- attr(*, "class")= chr "SOCKnode"
## $ :List of 3
## ..$ con :Classes 'sockconn', 'connection' atomic [1:1] 6
## .. .. ..- attr(*, "conn_id")=<externalptr>
## ..$ host: chr "localhost"
## ..$ rank: int 2
## ..- attr(*, "class")= chr "SOCKnode"
## $ :List of 3
## ..$ con :Classes 'sockconn', 'connection' atomic [1:1] 7
## .. .. ..- attr(*, "conn_id")=<externalptr>
## ..$ host: chr "localhost"
## ..$ rank: int 3
## ..- attr(*, "class")= chr "SOCKnode"
## $ :List of 3
## ..$ con :Classes 'sockconn', 'connection' atomic [1:1] 8
## .. .. ..- attr(*, "conn_id")=<externalptr>
## ..$ host: chr "localhost"
## ..$ rank: int 4
## ..- attr(*, "class")= chr "SOCKnode"
## - attr(*, "class")= chr [1:2] "SOCKcluster" "cluster"
mkWorker3 <- function(clus) {
force(clus)
function(x) {
as.character(clus)
}
}
worker3 <- mkWorker3(clus)
parallel::parLapplyLB(clus,jobs,worker3)
## [[1]]
## [1] "list(con = 5, host = \"localhost\", rank = 1)"
## [2] "list(con = 6, host = \"localhost\", rank = 2)"
## [3] "list(con = 7, host = \"localhost\", rank = 3)"
## [4] "list(con = 8, host = \"localhost\", rank = 4)"
##
## [[2]]
## [1] "list(con = 5, host = \"localhost\", rank = 1)"
## [2] "list(con = 6, host = \"localhost\", rank = 2)"
## [3] "list(con = 7, host = \"localhost\", rank = 3)"
## [4] "list(con = 8, host = \"localhost\", rank = 4)"
##
## [[3]]
## [1] "list(con = 5, host = \"localhost\", rank = 1)"
## [2] "list(con = 6, host = \"localhost\", rank = 2)"
## [3] "list(con = 7, host = \"localhost\", rank = 3)"
## [4] "list(con = 8, host = \"localhost\", rank = 4)"
What we are getting wrong is: we can’t share control of the cluster to each worker just by passing the cluster object around. This would require some central registry and call-back scheme (which is one of the things packages like foreach
and doParallel
accomplish when they “register a parallel back-end to use”). Base parallel
depends more on explicit reference to the cluster data structure, so it isn’t “idiomatic parLapply” to assume we can find “the parallel cluster” (there could in fact be more than one at the same time).
So what is the work around?
One work around is to move to sophisticated wrappers (like doParallel
or even future
, also see here).
These fixes roughly split the calculation into two phases one dedicated to the sqrt
step and the second dedicated to the sum
step (remember we are pretending both of these operations are expensive). We can directly demonstrate such a reorganization as follows.
library('magrittr')
mkWorker4a <- function(clus) {
force(clus)
function(x) {
as.numeric(parallel::parLapplyLB(clus,x,sqrt))
}
}
worker4a <- mkWorker4a(clus)
worker4b <- function(x) {
sum(x)
}
jobs %>%
lapply(X=.,FUN=worker4a) %>%
parallel::parLapplyLB(cl=clus,X=.,fun=worker4b)
## [[1]]
## [1] 4.146264
##
## [[2]]
## [1] 16.32201
##
## [[3]]
## [1] 1231.021
The above depends on not too many of the sub-lists being short (and hiding opportunities for parallelism).
Another fix is to (at the cost of time effort and space) to re-organize the calculation into two sequenced phases, each of which is parallel- but not nested. It is a bit involved, but we show how to do that below (using R
’s Reduce
and split
functions to reorganize the data, though one could also use so-called “tidyverse” methods). This fix is more general, but introduces reorganization overhead.
# Preparation 1: collect all items into one flat list
sqrtjobs <- as.list(Reduce(c,jobs))
# Phase 1: sqrt every item in parallel
sqrts <- parallel::parLapplyLB(clus,sqrtjobs,sqrt)
# Preparation 2: re-assemble new job list that needs only sums
lengths <- vapply(jobs,length,numeric(1))
pattern <- lapply(seq_len(length(lengths)),
function(i) {rep(i,lengths[[i]])})
pattern <- Reduce(c,pattern)
sumjobs <- split(sqrts,Reduce(c,pattern))
sumjobs <- lapply(sumjobs,as.numeric)
names(sumjobs) <- names(jobs)
# Phase 2: sum all items in parallel
parallel::parLapplyLB(clus,sumjobs,sum)
## [[1]]
## [1] 4.146264
##
## [[2]]
## [1] 16.32201
##
## [[3]]
## [1] 1231.021
In conclusion: you can’t directly nest parLapply
, but you can usefully sequence through it.
parallel::stopCluster(clus)