There are multiple ways to achieve parallel computing in R such as package parallel
, snow
or foreach
. Each package has their merits and drawbacks in implementation.
Here, I recommended to use function mclapply()
from package parallel
, which is essentially a paralleled version of lapply()
.
A minimum working example is posted below:
# load parallel package
library(parallel)
# set M as the repeat times
M = 24
# choose a reasonable number of threads based on the configuration of your machine
cl_nodes = 12
#use mclapply for parallel computing
time_start <- Sys.time()
para_result <- mclapply(1:M, function(m) {
#### put everything you want to parallel compute here:
#generate a random matrix
rd_mat <- matrix(runif(100),10,10)
#obtain the largest eigen values
largest_eigen = eigen(rd_mat)$values[1]
#return as a vector
return(largest_eigen)
}, mc.cores=cl_nodes) ## Split this job across cores, # of cores is specified in this argument
time_end <- Sys.time()
#report runing time
print(time_end-time_start)
## Time difference of 0.30935 secs
#obtain the standard error of the largest eigen values
sd(unlist(para_result))
## [1] 0.3009694
Normally, users would specify cl_nodes
as the number of physical cores of the machine to use all the physical cores. If you are sure that no other one is using it, you can specify cl_nodes as the number of logical cores to enable it run at full power. In most of the case, using more threads than the number of logical cores does NOT improve performance.
The above example use a self-defined anonymous function as the tool function to specify what you want to do in each parallel thread. It can be defined earlier and used in an explicit way as below:
# load parallel package
library(parallel)
# set M as the repeat times
M = 24
# choose a reasonable number of threads based on the configuration of your machine
cl_nodes = 12
#tool function to be run in each parallel thread
para_function = function(m) {
#generate a random matrix
rd_mat <- matrix(runif(100),10,10)
#obtain the largest eigen values
largest_eigen = eigen(rd_mat)$values[1]
#return as a vector
return(largest_eigen)
}
#use mclapply for parallel computing
time_start <- Sys.time()
para_result <- mclapply(1:M, para_function, mc.cores=cl_nodes) #don't forget this mc.cores argument
time_end <- Sys.time()
#report runing time
print(time_end-time_start)
## Time difference of 0.09781194 secs
#obtain the standard error of the largest eigen values
sd(unlist(para_result))
## [1] 0.313547
When you need to have a reproducible results with mclapply()
, set the random set before the function does not work as expected:
# load parallel package
library(parallel)
# set M as the repeat times
M = 24
# choose a reasonable number of threads based on the configuration of your machine
cl_nodes = 12
#tool function to be run in each parallel thread
para_function = function(m) {
#generate a random matrix
rd_mat <- matrix(runif(100),10,10)
#obtain the largest eigen values
largest_eigen = eigen(rd_mat)$values[1]
#return as a vector
return(largest_eigen)
}
#use mclapply for parallel computing
set.seed(1)
para_result1 <- mclapply(1:M, para_function, mc.cores=cl_nodes)
para_result2 <- mclapply(1:M, para_function, mc.cores=cl_nodes)
#obtain the standard error of the largest eigen values
sd(unlist(para_result1))
## [1] 0.3423168
sd(unlist(para_result2))
## [1] 0.3509881
Although the random seed has been fixed to 1, the parallel results are still not the same. Moving the set.seed(1)
into the para_function
does not help either (Please give it a try if don’t know why they don’t work). An important argument of mclapply()
is mc.set.seed
, which is a logical variable that specify whether you want mclapply()
starts with its own random seed. In case you need reproducible results, you need to set it as FALSE
as below:
#use mclapply for parallel computing
set.seed(1)
para_result1 <- mclapply(1:M, para_function, mc.cores=cl_nodes,mc.set.seed = FALSE)
para_result2 <- mclapply(1:M, para_function, mc.cores=cl_nodes,mc.set.seed = FALSE)
#obtain the standard error of the largest eigen values
sd(unlist(para_result1))
## [1] 0.03912339
sd(unlist(para_result2))
## [1] 0.03912339
A more complicated situation is that you may want part of your simulation reproducible but keep other part random. For example, you want the generated random matrix always larger than reproducible some random number for each run, but random matrix are generated randomly. You may do something like this:
#tool function to be run in each parallel thread
para_function = function(m) {
set.seed(1)
#generate random number that are reproducible
unif_min = runif(M)[m]
#generate a random matrix
rd_mat <- matrix(runif(100,unif_min,1),10,10)
#obtain the largest eigen values
largest_eigen = eigen(rd_mat)$values[1]
#return as a vector
return(largest_eigen)
}
#use mclapply for parallel computing
para_result1 <- mclapply(1:M, para_function, mc.cores=cl_nodes)
para_result2 <- mclapply(1:M, para_function, mc.cores=cl_nodes)
#obtain the standard error of the largest eigen values
sd(unlist(para_result1))
## [1] 1.453918
sd(unlist(para_result2))
## [1] 1.453918
The resulting standard error is fixed while it should be varying as part of it should be random. This is because when you have set.seed(1)
in the code, not only unif_min = runif(M)[m]
is affected, matrix(runif(100,unif_min,1),10,10)
is also affected. To keep unif_min
fixed but matrix(runif(100,unif_min,1),10,10)
random, you would use set.seed(NULL)
to offset the random seed. See example below:
#tool function to be run in each parallel thread
para_function = function(m) {
set.seed(1)
#generate random number that are reproducible
unif_min = runif(M)[m]
set.seed(NULL) #remove pre-set random seed
#generate a random matrix
rd_mat <- matrix(runif(100,unif_min,1),10,10)
#obtain the largest eigen values
largest_eigen = eigen(rd_mat)$values[1]
#return as a vector
return(largest_eigen)
}
#use mclapply for parallel computing
para_result1 <- mclapply(1:M, para_function, mc.cores=cl_nodes)
para_result2 <- mclapply(1:M, para_function, mc.cores=cl_nodes)
#obtain the standard error of the largest eigen values
sd(unlist(para_result1))
## [1] 1.549888
sd(unlist(para_result2))
## [1] 1.488085
It works!
If you don’t intend to use set.seed()
at all, make sure you have checked EVERY function you used in the parallel function.
In the above example, you may already noticed that M
is actually not explicitly defined in the parallel threads, but we can still access mclapply()
allows parallel threads access to the global variables. But it would be very different if you are using foreach()
. Let see am example here:
# global variables
# a very large matrix, about 0.8GB
large_mat = matrix(runif(10000*10000),10000,10000)
#tool function to be run in each parallel thread
para_function = function(m) {
#get a sub_matrix of the large matrix
random_index = sample(10,1:length(M))
sub_mat = large_mat[random_index,random_index]
#obtain the largest eigen values of sub_mat
largest_eigen = eigen(sub_mat)$values[1]
#return as a vector
return(largest_eigen)
}
#use mclapply for parallel computing
time_start <- Sys.time()
para_result1 <- mclapply(1:M, para_function, mc.cores=cl_nodes)
#report memory usage
memuse::Sys.meminfo()
## Totalram: 32.000 GiB
## Freeram: 19.942 GiB
time_end <- Sys.time()
#report runing time
print(time_end-time_start)
## Time difference of 0.0485549 secs
sd(unlist(para_result1))
## [1] 0.2827814
It seems really fast and memory efficient. Let’s compare with foreach()
#use foreach
library(foreach)
time_start <- Sys.time()
cl = makeCluster(cl_nodes,type="FORK")
doParallel::registerDoParallel(cl)
eigenValues = foreach(m = 1:M) %dopar%{para_function(m)}
memuse::Sys.meminfo()
## Totalram: 32.000 GiB
## Freeram: 9.403 GiB
time_end <- Sys.time()
#report runing time
print(time_end-time_start)
## Time difference of 7.790788 secs
sd(unlist(para_result1))
## [1] 0.2827814
Apperantly, foreach()
copies all related variables into each parallel thread, which make the running time much longer and the system memory was eaten up by 10GB.
You will have to use the follow function to stop the cluster and free those memories.
stopCluster(cl)
Sys.sleep(10)
memuse::Sys.meminfo()
## Totalram: 32.000 GiB
## Freeram: 18.663 GiB
For this reason, mclapply()
is recommended for most of the scenarios as it handles system memories more efficiently. But please note that foreach()
has its own niche scenarios.
Although mclapply()
handles system memories in a more efficient way, users are still highly recommended to think about memory control proactively. It usually get involved with three steps:
Check input variables, that are those used in the self-defined function para_function
. Always try to avoid large input variables and never alter input variables.
Make sure (large) temporary variables created during parallel computing be recycled/deleted when it will not be used anymore. Try to estimate the memory need for each thread and see if the total system memory size is enough for your choice of number of threads. If you find it hard to estimate, you can use top
in terminal to monitor memory usage while running your codes.
Only return useful information in each thread. Try to avoid writing to hard disk within parallel threads, as error occurs when multiple threads write to the same file.
It is a good habit to start with toy example with a small number of threads, before you actually run with full power.
Like a variable, explicitly defined functions can be accessed from the threads created by mclapply()
. An example:
#tool function to be run in each parallel thread
para_function = function(m) {
mat = generate_mat(10)
#obtain the largest eigen values of sub_mat
largest_eigen = eigen(mat)$values[1]
#return as a vector
return(largest_eigen)
}
#other supporting function
generate_mat = function(size){
#get random matrix of certain size
mat = matrix(runif(size^2),size,size)
mat
}
#use mclapply for parallel computing
para_result1 <- mclapply(1:M, para_function, mc.cores=cl_nodes)
sd(unlist(para_result1))
## [1] 0.2954936
Supporting function generate_mat()
is accessible by mclapply()
. Similarly, sourced home-made functions can also be accessed within sub threads. Functions from installed package can be referened as thier full function name as package_name::function_name
, such as mvtnorm::rmvnorm
. An example is showed as follow:
#tool function to be run in each parallel thread
para_function = function(m) {
mat = generate_mat1(10)
mat = mat + mvtnorm::rmvnorm(10,rep(1,10),diag(10))
#obtain the largest eigen values of sub_mat
largest_eigen = eigen(mat)$values[1]
#return as a vector
return(largest_eigen)
}
#other supporting function
source("util.R")
#use mclapply for parallel computing
para_result1 <- mclapply(1:M, para_function, mc.cores=cl_nodes)
sd(unlist(para_result1))
## [1] 0.9367494
I recommend to return the results in an array
format. array
is a multi-dimensional matrix (tensor) which can hold two or more than two-dimensional data. Here is an example for simulating multiple sized Erdos-Renyi graphs with different connecting probabilities:
library(parallel)
library(igraph)
##
## Attaching package: 'igraph'
## The following objects are masked from 'package:stats':
##
## decompose, spectrum
## The following object is masked from 'package:base':
##
## union
size_list = c(10,20,30,40)
p_list = c(0.1,0.5,0.9)
M_true = 50
cl_nodes = 12
#use mclapply for parallel computing
Res_Array <- mclapply(1:M_true, function(m) {
result_array = array(NA,dim=c(length(size_list),length(p_list),2))
for(size in size_list){
for(p in p_list){
#generate graph with size and p
W <- igraph::erdos.renyi.game(size,p,loops = TRUE)
#obtain graph statistics
result_array[which(size==size_list),which(p==p_list),1] <- mean(igraph::degree(W))
result_array[which(size==size_list),which(p==p_list),2] <- sum(igraph::degree(W))
}
}
#return as a multi-dim array
return(result_array)
}, mc.cores=cl_nodes)
#We use abind package to combine multiple arrays
library(abind)
all_res_array = do.call("abind",c(Res_Array,along=0)) #along=0 means create an new index to combine arrays
dim(all_res_array)
## [1] 50 4 3 2
The resulted array has 4 dimensions. The first index indicates # of simulations, we can extract the result from first simulation as:
all_res_array[1,,,]
## , , 1
##
## [,1] [,2] [,3]
## [1,] 2.000000 7.00 9.20000
## [2,] 1.800000 9.90 18.80000
## [3,] 3.333333 14.20 27.46667
## [4,] 4.900000 20.95 36.85000
##
## , , 2
##
## [,1] [,2] [,3]
## [1,] 20 70 92
## [2,] 36 198 376
## [3,] 100 426 824
## [4,] 196 838 1474
The other three dimensions are graph size, p and type of graph statistics. We can calculate the averaged graph staistics for each size and p over simulations:
apply(all_res_array,c(2,3,4),mean)
## , , 1
##
## [,1] [,2] [,3]
## [1,] 1.164000 5.80000 9.89600
## [2,] 2.122000 10.46400 18.97800
## [3,] 3.126667 15.37067 27.93867
## [4,] 4.032000 20.56700 36.82000
##
## , , 2
##
## [,1] [,2] [,3]
## [1,] 11.64 58.00 98.96
## [2,] 42.44 209.28 379.56
## [3,] 93.80 461.12 838.16
## [4,] 161.28 822.68 1472.80
It is almost always a bad idea to nest an mclapply()
in another mclapply()
. And it is true for all other parallel computing packages such as foreach()
. If you really need to have multiple loops nested, it is recommended to only make the inner loop parallel. Please see the pseudo code below:
# Write something like this:
for (i in ...) {
for (j in ...) {
for(h in ...){
mclapply(1:M, ...)
}
}
}
# Never do this:
for (i in ...) {
for (j in ...) {
mclapply(1:K, function(){
mclapply(1:M, function(){
...
})
}
)
}
}