A gentle introduction to parallel computing in R
Win-Vector Blog 2016-01-25
Let’s talk about the use and benefits of parallel computation in R.
IBM’s Blue Gene/P massively parallel supercomputer (Wikipedia).
“Parallel computing is a type of computation in which many calculations are carried out simultaneously.”
Wikipedia quoting: Gottlieb, Allan; Almasi, George S. (1989). Highly parallel computing
The reason we care is: by making the computer work harder (perform many calculations simultaneously) we wait less time for our experiments and can run more experiments. This is especially important when doing data science (as we often do using the R analysis platform) as we often need to repeat variations of large analyses to learn things, infer parameters, and estimate model stability.
Typically, to get the computer to work harder the analyst, programmer, or library designer must themselves work a bit harder to arrange calculations in a parallel-friendly manner. In the best circumstances somebody has already done this for you:
- Good parallel libraries, such as the multi-threaded BLAS/LAPACK libraries included in Revolution R Open (RRO, now Microsoft R Open) (see here).
- Specialized parallel extensions that supply their own high performance implementations of important procedures such as rx methods from RevoScaleR or h2o methods from h2o.ai.
- Parallelization abstraction frameworks such as Thrust/Rth (see here).
- Using R application libraries that deal with parallelism on their own (examples include gbm, boot and our own vtreat). (Some of these libraries do not attempt parallel operation until you specify a parallel execution environment.)
In addition to having a task ready to “parallelize” you need a facility willing to work on it in a parallel manner. Examples include:
- Your own machine. Even a laptop computer usually now has four or more cores. Potentially running four times faster, or equivalently waiting only one fourth the time, is big. (A quick way to check how many cores you have is sketched just after this list.)
- Graphics processing units (GPUs). Many machines have one or more powerful graphics cards already installed. For some numerical tasks these cards are 10 to 100 times faster than the basic Central Processing Unit (CPU) you normally use for computation (see here).
- Clusters of computers (such as Amazon ec2, Hadoop backends and more).
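As a quick check of the first option, the parallel package (which ships with base R) can report how many cores your own machine has to offer; a minimal sketch (counts can vary by operating system):

```r
# Minimal sketch: how many workers could we run on this machine?
# (Logical and physical core counts can differ, and reporting varies by OS.)
parallel::detectCores()                  # logical cores
parallel::detectCores(logical = FALSE)   # physical cores, where the OS reports them
```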
Obviously parallel computation with R is a vast and specialized topic. It can seem impossible to quickly learn how to use all this magic to run your own calculation more quickly.
In this tutorial we will demonstrate how to speed up a calculation of your own choosing using basic R.
First you must have a problem that is amenable to parallelism. The most obvious such problems involve repetition (what the cognoscenti call “embarrassingly parallel”):
- Tuning model parameters by repeated re-fitting of models (such as is done by the caret package).
- Applying a transform to many different variables (such as is done by the vtreat package).
- Estimating model quality through cross-validation, bootstrap or other repeated sampling techniques.
So we are going to assume we have a task in mind that has already been cut up into a number of simple repetitions. Note: this conceptual cutting up is not necessarily always easy, but it is the step needed to start the process.
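As a purely illustrative sketch of what such a cutting up can look like (the data set and fold scheme here are stand-ins, not part of the task we tackle below), cross-validation splits naturally into a list of independent jobs:

```r
# Illustrative only: cut a cross-validation task into independent repetitions.
# Each element of "tasks" is a self-contained piece of work that could run in parallel.
dat <- iris
k <- 5
foldIds <- sample(rep(seq_len(k), length.out = nrow(dat)))
tasks <- lapply(seq_len(k), function(fold) {
  list(train = dat[foldIds != fold, ],
       test  = dat[foldIds == fold, ])
})
length(tasks)  # k independent pieces of work
```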
Here is our example task: fitting a predictive model onto a small dataset. We load the data set and some definitions into our workspace:
```r
d <- iris  # let "d" refer to one of R's built in data sets
vars <- c('Sepal.Length','Sepal.Width','Petal.Length')
yName <- 'Species'
yLevels <- sort(unique(as.character(d[[yName]])))
print(yLevels)
## [1] "setosa"     "versicolor" "virginica"
```
(We are using the convention that any line starting with “##” is a printed result from the R command above itself.)
We encounter a small modeling issue: the variable we are trying to predict takes on three levels. The modeling technique we were going to use, glm(family='binomial'), is not specialized to predict “multinomial outcomes” (though other libraries are). So we decide to solve this using a “one versus rest” strategy and build a series of classifiers: each separating one target from the rest of the outcomes. This is where we see a task that is an obvious candidate for parallelization. Let’s wrap building a single target model into a function for neatness:
```r
fitOneTargetModel <- function(yName,yLevel,vars,data) {
  formula <- paste('(',yName,'=="',yLevel,'") ~ ',
                   paste(vars,collapse=' + '),sep='')
  glm(as.formula(formula),family=binomial,data=data)
}
```
Then the usual “serial” way to build all the models would look like the following:
```r
for(yLevel in yLevels) {
  print("*****")
  print(yLevel)
  print(fitOneTargetModel(yName,yLevel,vars,d))
}
```
Or we could wrap our procedure into a new single-argument function (this pattern is called Currying) and then use R’s elegant lapply() notation:
```r
worker <- function(yLevel) {
  fitOneTargetModel(yName,yLevel,vars,d)
}
models <- lapply(yLevels,worker)
names(models) <- yLevels
print(models)
```
The advantage of the lapply() notation is that it emphasizes the independent nature of each calculation, exactly the sort of isolation we need to parallelize our calculations. Think of the for-loop as accidentally over-specifying the calculation by introducing a needless order or sequence of operations.
Re-organizing our calculation functionally has gotten us ready to use a parallel library and perform the calculation in parallel. First we set up a parallel cluster to do the work:
```r
# Start up a parallel cluster
parallelCluster <- parallel::makeCluster(parallel::detectCores())
print(parallelCluster)
## socket cluster with 4 nodes on host 'localhost'
```
Notice we created a “socket cluster.” The socket cluster is a crude-seeming “coarse grained parallel” cluster that is extremely flexible. A socket cluster is crude in that it is fairly slow to send jobs to it (so you want to send work in big “coarse” chunks), but amazingly flexible in that it can be implemented as any of: multiple cores on a single machine, multiple cores on multiple machines on a shared network, or on top of other systems such as an MPI cluster.
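For example (a sketch only; the repeated 'localhost' entries are stand-ins for real host names, each of which would need to be reachable over ssh and have R installed), the same parallel::makeCluster() call accepts a vector of host names instead of a core count:

```r
# Sketch: a socket cluster described by host names instead of a core count.
# Replace the 'localhost' entries with real machine names to span a network.
machineList <- c('localhost', 'localhost')
remoteCluster <- parallel::makeCluster(machineList)
print(remoteCluster)
parallel::stopCluster(remoteCluster)
```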
At this point we would expect code like the below to work (see here for details on tryCatch).
```r
tryCatch(
  models <- parallel::parLapply(parallelCluster,
                                yLevels,worker),
  error = function(e) print(e)
)
## <simpleError in checkForRemoteErrors(val):
##   3 nodes produced errors; first error:
##   could not find function "fitOneTargetModel">
```
Instead of results, we got the error “could not find function "fitOneTargetModel".”
The issue is: on a socket cluster, arguments to parallel::parLapply are copied to each processing node over a communications socket. However, the entirety of the current execution environment (in our case the so-called “global environment”) is not copied over (or copied back; only values are returned). So our function worker(), when transported to the parallel nodes, must have an altered lexical closure (as it can not point back to our execution environment), and it appears this new lexical closure no longer contains references to the needed values yName, vars, d, and fitOneTargetModel. This is unfortunate, but it makes sense. R uses entire execution environments to implement the concept of lexical closures, and R has no way of knowing which values in a given environment are actually needed by a given function.
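(As an aside, and not the approach we take below: the parallel package also supplies parallel::clusterExport(), which copies named values from the master into each worker's global environment. A minimal sketch, assuming the cluster, worker(), and the values defined earlier:)

```r
# Alternative workaround (not used below): explicitly copy the needed values
# and functions to every worker node, then retry the parallel call.
parallel::clusterExport(parallelCluster,
                        c('yName','vars','d','fitOneTargetModel'))
models <- parallel::parLapply(parallelCluster, yLevels, worker)
```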
So we know what is wrong; how do we fix it? We fix it by using an environment other than the global environment to transport the values we need. The easiest way to do this is to use our own lexical closure. To do this we wrap the entire process inside a function (so we are executing in a controlled environment). The code that works is as follows:
```r
# build the single argument function we are going to pass to parallel
mkWorker <- function(yName,vars,d) {
  # make sure each of the three values we need passed
  # are available in this environment
  force(yName)
  force(vars)
  force(d)
  # define any and every function our worker function
  # needs in this environment
  fitOneTargetModel <- function(yName,yLevel,vars,data) {
    formula <- paste('(',yName,'=="',yLevel,'") ~ ',
                     paste(vars,collapse=' + '),sep='')
    glm(as.formula(formula),family=binomial,data=data)
  }
  # Finally: define and return our worker function.
  # The function worker's "lexical closure"
  # (where it looks for unbound variables)
  # is mkWorker's activation/execution environment
  # and not the usual Global environment.
  # The parallel library is willing to transport
  # this environment (which it does not
  # do for the Global environment).
  worker <- function(yLevel) {
    fitOneTargetModel(yName,yLevel,vars,d)
  }
  return(worker)
}

models <- parallel::parLapply(parallelCluster,yLevels,
                              mkWorker(yName,vars,d))
names(models) <- yLevels
print(models)
```
The above works because we forced the values we needed into the new execution environment and defined the function we were going to use directly in that environment. Obviously it is incredibly tedious and wasteful to have to re-define every function we need every time we need it (though we could also have passed in the wrapper as we did with the other values). A more versatile pattern is to use a helper function we supply, called “bindToEnv”, to do some of the work. With bindToEnv the code looks like the following.
```r
source('bindToEnv.R') # Download from: http://winvector.github.io/Parallel/bindToEnv.R

# build the single argument function we are going to pass to parallel
mkWorker <- function() {
  bindToEnv(objNames=c('yName','vars','d','fitOneTargetModel'))
  function(yLevel) {
    fitOneTargetModel(yName,yLevel,vars,d)
  }
}

models <- parallel::parLapply(parallelCluster,yLevels,
                              mkWorker())
names(models) <- yLevels
print(models)
```
The above pattern is concise and works very well. A few caveats to remember are:
- Remember each parallel worker is a remote environment. Make sure any libraries you need are installed on each remote machine.
- Non-core libraries loaded in the home environment are not necessarily loaded in the remote environment. Prefer package-style notation such as stats::glm() when calling library functions (as calling library(...) on each remote node would be wasteful); a sketch of attaching a package on the workers, for the cases where you truly need to, follows this list.
- Our bindToEnv function is itself directly altering the environments of passed functions (so they can refer to the values we are transporting). This can cause its own problems by damaging previously curried environments. Some notes on working around this can be found here.
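For the rare case where a package really must be attached on every worker (for example, so its methods get registered), a minimal sketch using parallel::clusterEvalQ() follows; stats stands in here for whatever package you actually need:

```r
# Sketch: attach a package on every worker node.
# "stats" is only a stand-in for a real dependency you need attached remotely.
parallel::clusterEvalQ(parallelCluster, library(stats))
```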
This is a bit to think about. However, I think you will find adding about eight lines of wrapper/boilerplate code is amply paid back by the four or more times speedup you will routinely see in your work.
Also: when you are finished, you should remember to shutdown your cluster reference:
```r
# Shutdown cluster neatly
if(!is.null(parallelCluster)) {
  parallel::stopCluster(parallelCluster)
  parallelCluster <- c()
}
```
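One way to make sure that shutdown happens even when the parallel step errors out (a sketch, using a hypothetical helper named withParallelCluster, not part of the parallel package) is to own the cluster inside a function and register the cleanup with on.exit():

```r
# Sketch: guarantee cluster shutdown with on.exit(), even if the work errors out.
# withParallelCluster is a hypothetical helper, not part of the parallel package.
withParallelCluster <- function(f) {
  cluster <- parallel::makeCluster(parallel::detectCores())
  on.exit(parallel::stopCluster(cluster))
  f(cluster)
}

result <- withParallelCluster(function(cluster) {
  parallel::parLapply(cluster, yLevels, mkWorker())
})
```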
And that concludes our initial tutorial. We will follow up, in a later article, with how to quickly build socket clusters using many machines and Amazon ec2.
The bindToEnv function itself is fairly simple:
```r
#' Copy arguments into env and re-bind any function's lexical scope to bindTargetEnv.
#'
#' See http://winvector.github.io/Parallel/PExample.html for example use.
#'
#' Used to send data along with a function in situations such as parallel execution
#' (when the global environment would not be available).  Typically called within
#' a function that constructs the worker function to pass to the parallel processes
#' (so we have a nice lexical closure to work with).
#'
#' @param bindTargetEnv environment to bind to
#' @param objNames additional names to lookup in parent environment and bind
#' @param doNotRebind names of functions to NOT rebind the lexical environments of
bindToEnv <- function(bindTargetEnv=parent.frame(),objNames,doNotRebind=c()) {
  # Bind the values into environment
  # and switch any functions to this environment!
  for(var in objNames) {
    val <- get(var,envir=parent.frame())
    if(is.function(val) && (!(var %in% doNotRebind))) {
      # replace function's lexical environment with our target (DANGEROUS)
      environment(val) <- bindTargetEnv
    }
    # assign object to target environment, only after any possible alteration
    assign(var,val,envir=bindTargetEnv)
  }
}
```
It can also be downloaded from here.
One of the pains of using parallelism in this way is that you always seem to need one more function or data item. One way around this is to use R’s ls() command to build the list of names you want to send. Saving the results of an ls() right after sourcing the files that define your functions and important global values is especially effective. Without some such strategy you find yourself adding items to lists in a painful piecemeal fashion, as illustrated in the following video:
“The Jerk” remembering what values he needs to pack for parallel computation.
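A sketch of that ls() strategy (the sourced file name is a placeholder for your own definitions file):

```r
# Sketch: snapshot the names defined by our setup code, then ship them all at once.
# 'modelingFunctions.R' is a placeholder for a file defining your functions and data.
source('modelingFunctions.R')
varsToShip <- ls()   # names of everything defined so far

mkWorker <- function() {
  bindToEnv(objNames=varsToShip)
  function(yLevel) {
    fitOneTargetModel(yName,yLevel,vars,d)
  }
}
```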
For working at larger scale: rough instructions for spinning up multiple machines as R servers on ec2 can be found here. I have also found remembering the ssh credentials of local machines lets you perform informal network cluster calculations quite handily.
Some more references:
- CRAN Task View
- Fast parallel computing with Intel Phi coprocessors
- Professor Norman Matloff on parallel computing
- Creating progress bars with foreach parallel processing
- Introduction to parallel computing in R
- Older, but still good, State of the Art in Parallel Computing with R