doParallel - Parallel processing in R
This tutorial demonstrates syntax for the use of the doParallel library in R. It is not intended to be complete nor comprehensive, but as an addition to the documentation and cheatsheets that are available elsewhere.
Parallel processing is commonly used when repetitive tasks take a lot of time if performed sequentially, i.e. one ofter the other. We make use of the available cores of the computer to run the tasks in parallel, which should speed up a task more or less by the number of cores. However, R has to create fresh environments for every core, which takes some computing time and additional memory. The extra time used when creating the multiple fresh environments is called overhead. Depending on the task, the overhead may be larger than the speed gained by parallel processing, so use parallel processing with care.
This script shows the basics of parallel computing in R using the doParallel package. It also introduces a series of improvements and hints that will often be very helpful when using parallel computing in R. Note however, that there are other packages, as “future”, available, too.
We will cover the following sub-topics:
Change the type of the return variable
Load packages in the foreach loop
Receive console output from workers
Catch errors to avoid crashes
Optimize memory usage
Return results directly from the foreach loop
Environments
If you have questions, suggestions, spot errors, or want to contribute, get in touch with us through planthub@idiv.de.
Author: David Schellenberger Costa
Requirements¶
To run the script, the following is needed:
some R libraries that may need to be installed
Code¶
General structure and requisites¶
The doParallel package builds on the parallel package for parallel computing, and it is sufficient to load doParallel, as it will load its dependencies. We also load data.table and abind here, as they complement the doParallel package.
# load libraries
library(doParallel) # parallel computing
library(data.table) # handle large datasets
library(abind) # combine arrays
# clear workspace
rm(list = ls())
Lade nötiges Paket: foreach
Lade nötiges Paket: iterators
Lade nötiges Paket: parallel
Warning message:
"Paket 'abind' wurde unter R Version 4.3.3 erstellt"
Change the working directory to where you want. There are parts of this tutorial that will require saving and reading objects.
# set working directory
setwd(paste0(.brd, "snippets"))
The basic structure of a parallel computation is a foreach loop, which is used to iterate over a vector of values. However, we need to load the doParallel package and call the registerDoParallel function to register the parallel backend first, meaning that we tell R we want to run the script on multiple cores.
To know how many cores we have, we can use the detectCores function from the parallel package.
If we want to use the computer for other tasks, it is wise to keep one or two cores free. Often, the cores that run individual instances of the script are called the “workers”.
(clustNumAll <- detectCores() - 1) # maximum number of cores minus 1
Now, we can create a series of R processes running in parallel with the makeCluster function. Then, we register those preocesses to work with a foreach loop using registerDoParallel().
cl <- makeCluster(clustNumAll)
registerDoParallel(cl)
The following component is the core of the parallel computing script: the foreach loop. The foreach loop is used to iterate over a vector of values and execute the code in the %dopar% block on each iteration. As an example, we just run a very simple script that returns the number of the current iteration. The data is written into the res variable and combined in a list, so that the result of each iteration is one list element.
res <- foreach(i = seq_len(1000)) %dopar% {
i
}
# stop the cluster
stopCluster(cl)
# look at the result
length(res)
res[1:3]
That should have worked. This is the basic structure of a parallel computation in R. Several modifications can be done, as we will learn below.
1) Change the type of the return variable¶
The default return type of foreach is a list. This means that the results of each iteration are stored in a list element. The .combine parameter gives great flexibility to change the type of return variable.
cl <- makeCluster(clustNumAll)
registerDoParallel(cl)
res <- foreach(i = seq_len(1000), .combine = c) %dopar% {
i
}
stopCluster(cl)
length(res)
res[1:3]
Using “c” as .combine parameter will combine the results into a vector.
cl <- makeCluster(clustNumAll)
registerDoParallel(cl)
res <- foreach(i = seq_len(1000), .combine = list) %dopar% {
i
}
stopCluster(cl)
length(res)
# res[1:3] # remove the comment to get a super-large output
Using “list” as .combine parameter will combine the results into a list, but in a weird way: It will always create a new branch at the list base, resulting in a super-branched list. Usually, when wanting a list output, we will not define the .combine parameter at all.
Often, parallel processing is used to process data from some sort of table. The results are often stored in a matrix, data frame, or array. In this case, the .combine parameter can be set to “rbind”.
testlist <- data.table(ID = seq_len(1000), a = sample(seq_len(1e6), 1000), b = sample(seq_len(1e6), 1000), c = NA_real_)
cl <- makeCluster(clustNumAll)
registerDoParallel(cl)
res <- foreach(i = seq_len(1000), .combine = rbind) %dopar% {
testlist[i, ]$c <- testlist[i, ]$a + testlist[i, ]$b
testlist[i, ]
}
stopCluster(cl)
length(res)
res[1:3]
More complex return functions can be defined as well. Here, we want to bind three-dimensional arrays together, the result being a four-dimensional array. The first dimension is the iteration number, the following dimensions are the array contents per iteration.
# define function that stacks results together on the fourth dimension
# counting backwards. As the individual results have only three dimensions,
# a new dimension is created on which they are stacked. For some reason,
# using "along = 0" does not give the same expected result.
custbind <- function(...) abind(..., rev.along = 4) # rev.along = 4: reverse along the 4th (new) dimension
# array(1, dim = c(2, 2, 2))
cl <- makeCluster(clustNumAll)
registerDoParallel(cl)
res <- foreach(i = seq_len(10), .combine = custbind) %dopar% {
array(i, dim = c(2, 2, 2))
}
stopCluster(cl)
dim(res)
res[1, , , ]
More complicated combine functions are needed, if several results should be returned per iteration, and be combined in separate sub-lists. The following example shows how to calculate two different measures, and return them stacked together in two sub-lists.
# define function to take arguments, and stack first and second sub-arguments
# together, but in separate lists
custbind <- function(x, ...) {
lapply(
seq_along(x),
function(i) c(x[[i]], lapply(list(...), function(y) y[[i]]))
)
}
cl <- makeCluster(clustNumAll)
registerDoParallel(cl)
i <- 1
res <- foreach(i = seq_len(10), .combine = custbind) %dopar% {
list(i, 10 * i)
}
stopCluster(cl)
length(res)
res[[1]]
res[[2]]
In the next example, we want to return more complex individual structures. These are three-dimensional arrays that should be stacked through creating a new last dimension. Note that we use the additional parameter “simplify” here in sapply.
custbind <- function(x, ...) {
lapply(
seq_along(x),
function(i) abind(x[[i]], sapply(list(...), function(y) y[[i]], simplify = "array"))
)
}
# list(array(1, dim = c(3, 3, 3)), array(10 * 1, dim = c(2, 3, 3)))
cl <- makeCluster(clustNumAll)
registerDoParallel(cl)
res <- foreach(i = seq_len(10), .combine = custbind) %dopar% {
list(array(i, dim = c(3, 3, 3)), array(10 * i, dim = c(2, 3, 3)))
}
stopCluster(cl)
str(res)
res[[1]][, , , 2]
res[[2]][, , , 1]
List of 2
$ : int [1:3, 1:3, 1:3, 1:10] 1 1 1 1 1 1 1 1 1 1 ...
..- attr(*, "dimnames")=List of 4
.. ..$ : NULL
.. ..$ : NULL
.. ..$ : NULL
.. ..$ : NULL
$ : num [1:2, 1:3, 1:3, 1:10] 10 10 10 10 10 10 10 10 10 10 ...
..- attr(*, "dimnames")=List of 4
.. ..$ : NULL
.. ..$ : NULL
.. ..$ : NULL
.. ..$ : NULL
The following example shows how to return a list of two data.table objects. The function submitted to .combine is very different to the one used in the example before, and it is sometimes trial and error to create the correct function for one’s purpose. Often, structuring your returns in a simple way is the best choice.
custbind <- function(x, y) {
# x and y are results from different iterations
list(
rbindlist(list(x[[1]], y[[1]])),
rbindlist(list(x[[2]], y[[2]]))
)
}
cl <- makeCluster(clustNumAll)
registerDoParallel(cl)
res <- foreach(
i = seq_len(10),
.combine = custbind,
# note that we need the data.table package inside the loop here
# more on this below
.packages = c("data.table")
) %dopar% {
nrows <- sample(seq_len(10) - 1, 2)
res <- list(
data.table(a = rep(i, nrows[1]), b = sample(seq_len(100), nrows[1])),
data.table(a = rep(10 * i, nrows[2]), b = sample(seq_len(100), nrows[2]))
)
print(res)
}
stopCluster(cl)
length(res)
res[[1]][1:3]
res[[2]][1:3]
2) Load packages in the foreach loop¶
In the last example of the previous section, we already used the .packages parameter to let the worker know we need the data.table package within the foreach loop. While workers have access to all variables in the global environment, i.e. outside the foreach loop, they do not have access to the loaded packages, so we need to specify them in the foreach loop.
In the following example, we want the workers to work with spatial objects. To this end, hey need the sf package. We can load the sf package into the global environment, but we will see, this is not going to help.
library(sf)
# create reference point
referencePoint <- data.frame(x = 0, y = 0)
referencePoint <- st_as_sf(referencePoint, coords = c("x", "y"), crs = "4326")
# create other points for which the distance from the reference point should be calculated
otherPoints <- data.frame(
x = sample(-180:180, 1000, replace = TRUE),
y = sample(-90:90, 1000, replace = TRUE)
)
otherPoints <- st_as_sf(otherPoints, coords = c("x", "y"), crs = "4326")
cl <- makeCluster(clustNumAll)
registerDoParallel(cl)
res <- foreach(i = seq_len(nrow(otherPoints))) %dopar% {
dist <- st_distance(referencePoint, otherPoints[i, ])
}
stopCluster(cl)
Warning message:
"Paket 'sf' wurde unter R Version 4.3.3 erstellt"
Linking to GEOS 3.11.2, GDAL 3.8.2, PROJ 9.3.1; sf_use_s2() is TRUE
Error in {: task 1 failed - "konnte Funktion "st_distance" nicht finden"
Traceback:
1. e$fun(obj, substitute(ex), parent.frame(), e$data)
2. stop(simpleError(msg, call = expr))As expected, we get the error message that the function st_distance could not be found. So even as the workers can access variables in the global environment, they cannot access the functions from loaded packages. Instead, we need to load the packages explicitly for each worker.
cl <- makeCluster(clustNumAll)
registerDoParallel(cl)
res <- foreach(i = seq_len(nrow(otherPoints)), .packages = "sf") %dopar% {
dist <- st_distance(referencePoint, otherPoints[i, ])
}
stopCluster(cl)
length(res)
res[1]
Using the .packages parameter and specifying the needed package, we got it working.
Parallel computing is often done on clusters, and sometimes, we can access the standard R library for all users, but not all packages might be installed there. In this case, we can load an additional user library (our own library), where we have additional packages installed. This works for loading packages into the global environment, but might fail if workers only get the path to the standard R library, but not to our user library submitted. In this case, the following has to be done:
# DO NOT RUN - THIS IS JUST AN ABSTRACT EXAMPLE
# define path to user library
userLib <- "R_libs/R_4.2"
# create vector of standard R library and userLib
allLibs <- c(.libPaths(), userLib)
# update .libPaths() for the global environment (will only work if given paths exists!)
.libPaths(allLibs)
# update .libPaths() for the workers (will only work if given paths exists!)
cl <- makeCluster(clustNumAll)
clusterExport(cl, "allLibs")
clusterEvalQ(cl, .libPaths(allLibs))
What we do is creating a variable with all library paths, moving this variable into the local environments of each worker, and then evaluating the .libPaths() function with the variable as its argument within each worker, effectively updating the library paths. Now, we can register the parallel backend and load the packages within foreach.
3) Receive console output from workers¶
Usually, any output from the workers is not transferred to the console of the global environment and therefore lost. However, we can define an “outfile” when creating the cluster, and all outputs will be written there. If we set outfile to “”, output is written into the console. Otherwise, a file is created and data is written there. Compare the following examples:
# no outfile - no information from the workers
cl <- makeCluster(clustNumAll)
registerDoParallel(cl)
res <- foreach(i = seq_len(1000)) %dopar% {
if (i %% 100 == 0) print(i)
}
stopCluster(cl)
# add the parameter outfile = ""
cl <- makeCluster(clustNumAll, outfile = "")
registerDoParallel(cl)
res <- foreach(i = seq_len(1000)) %dopar% {
if (i %% 100 == 0) print(i)
}
stopCluster(cl)
# add the parameter outfile = "outfile.txt"
cl <- makeCluster(clustNumAll, outfile = "outfile.txt")
registerDoParallel(cl)
res <- foreach(i = seq_len(1000)) %dopar% {
if (i %% 100 == 0) print(i)
}
stopCluster(cl)
# read outfile from disc (saved within the current directory)
readLines("outfile.txt")
file.remove("outfile.txt") # remove the created file
Warning message in file.remove("outfile.txt"):
"kann Datei 'outfile.txt' nicht löschen. Grund 'Permission denied'"
4) Catch errors to avoid crashes¶
Very often, it happens that individual workers crash. This means that nothing will be returned, turning all previous calculations useless. This can be very frustrating and time-consuming. Therefore, it is wise to catch unforeseen errors within the foreach loop.
rm(list = c(intersect(c("x", "res"), ls())))
cl <- makeCluster(clustNumAll)
registerDoParallel(cl)
res <- foreach(i = seq_len(1000), .combine = c) %dopar% {
if (i == 100) a <- x else a <- i # crashes, because x is undefined
a
}
stopCluster(cl)
str(res) # cannot be found because not created
Error in {: task 100 failed - "Objekt 'x' nicht gefunden"
Traceback:
1. e$fun(obj, substitute(ex), parent.frame(), e$data)
2. stop(simpleError(msg, call = expr))We use the tryCatch function to avoid these problems. We write the whole code within the foreach loop into parentheses, and give an error argument, which, in the simplest case, just defines the return value in case of errors within the loop. Note, however, that the object return by tryCatch should have the same structure as the one returned by an iteration if everything works. This means it might be a row of a data.frame, or an individual list element.
cl <- makeCluster(clustNumAll)
registerDoParallel(cl)
res <- foreach(i = seq_len(1000), .combine = c) %dopar% {
tryCatch(
{
if (i == 100) a <- x else a <- i # crashes, because x is undefined
a
},
error = function(e) NA
) # return NA on error
}
stopCluster(cl)
res[95:105] # we got NA returned for i = 100
In the following example, we return rows of a data.table.
cl <- makeCluster(clustNumAll)
registerDoParallel(cl)
res <- foreach(i = seq_len(1000), .combine = rbind, .packages = "data.table") %dopar% {
tryCatch(
{
if (i == 100) a <- x else a <- data.table(a = i, b = 2 * i) # crashes, because x is undefined
a
},
error = function(e) data.table(a = NA_real_, b = NA_real_)
) # return NA on error
}
stopCluster(cl)
res[95:105] # we got NA returned for i = 100
5) Optimize memory usage¶
We may run into trouble when doing parallel computing due to memory limitations. Mostly, this will be expressed by “oom-errors” (out-of-memory-errors). This is a big problem, because we often cannot increase the amount of memory available (hint for SLURM-users: #SBATCH --mem-per-cpu=...). In this case, it might help to review the script and optimize the memeory usage per worker. Each worker will load all objects it needs into memory, but if we know a worker only works on parts of a huge data.table object, we might create a smaller subset it accesses. Or, if we iterate over the rows of a huge data.table, we might directly loop over individual rows instead of row indices in the foreach loop.
# create large data.table
# clear workspace (for standardizing memory use)
rm(list = setdiff(ls(), "clustNumAll"))
sum(gc()[, 2]) # get used memory, about 150 MB
# create large data.table (takes a bit, you may reduce nrows for it to work faster)
nrows <- 1e7
test <- data.table(index = seq_len(nrows))
for (i in seq_along(c(letters, toupper(letters)))) {
test[, new := sample(seq_len(1e6), nrows, replace = TRUE)]
setnames(test, "new", c(letters, toupper(letters))[i])
}
sum(gc()[, 2]) # get used memory, about 2 GB
cl <- makeCluster(clustNumAll)
registerDoParallel(cl)
# create multiple copies of the data.table by iterating over indices
res1 <- foreach(i = seq_len(100), .combine = rbind, .packages = "data.table") %dopar% {
test[i, a := sum(gc()[, 2])]
test[i]
}
# iterate over data.table rows to avoid reading in the whole data.table
res2 <- foreach(
testRow = iter(test[seq_len(100)], by = "row"),
.combine = rbind, .packages = "data.table"
) %dopar% {
testRow[, a := sum(gc()[, 2])]
testRow
}
stopCluster(cl)
all(res1[, -"a"] == res2[, -"a"])
table(res1$a) # about 2 GB
table(res2$a) # about 25 MB
2045
100
23.8
100 As you will have noticed, the first loop takes much longer to execute, and the used memory is about 2 GB for each worker, meaning that the whole system needs 2 * clustNumAll GB. In the second example, we get exactly the same output, but each worker only consumes around 25 MB, meaning we need about 1.25% of the memory we needed in the run before.
6) Return results directly from the foreach loop¶
In some cases, it might be desirable to return results from a worker directly, for example when web-scraping a particular website or when you are interested in seeing progress in real-time. (Note there is a progress bar function, but we won’t use it). The way to do this is straightforward.
# create a temporary directory
if (!"temp" %in% list.files()) dir.create("temp")
cl <- makeCluster(clustNumAll)
registerDoParallel(cl)
res <- foreach(i = seq_len(1000), .combine = c) %dopar% {
save(i, file = paste0("temp/", i, ".RData")) # save result of one iteration
}
stopCluster(cl)
list.files("temp")[1:20]
# remove temporary directory and all files within
unlink("temp", recursive = TRUE)
7) Environments¶
Workers can access only the environment they are called in, and sometimes that may cause them not to see some functions. In that case, we may load functions from one environment into the other.
# define a function in the global environment
fun1 <- function(x) {
x <- x + 1
x
}
# define a function that runs some task in parallel
fun2 <- function(y) {
cl <- makeCluster(clustNumAll)
registerDoParallel(cl)
fun1 <- fun1 # define fun1 in the environment of fun2
res <- foreach(i = seq_len(y), .combine = c) %dopar% {
fun1(i)
}
stopCluster(cl)
res
}
# test the functions
fun1(10)
fun2(10)
Comment or remove the line “fun1 <- fun1”, which defines fun1 in the fun2 environment. You will see that running fun2 produces an error then, because the workers can’t see fun1 in their calling environment, which is fun2.