Scaling Up/Out Embarrassingly Parallel Problems in R

  • By: Naeem Khoshnevis

  • Last Update: June 9, 2022

Problem statement

Estimating PI has always been an interesting problem, and many algorithms have been proposed. One of the methods is the Monte Carlo approach, which is a perfect fit for testing embarrassingly parallel problems. According to the following figure and equations, the estimation of PI can be achieved by drawing random samples.


There are two approaches to improve the results:

  • Increasing the number of random samples, and

  • Repeating the test.

We are using both approaches.

Solutions based on computational resources

Most of the statistical analyses can be considered embarrassingly parallelizable problems (Monte Carlo estimations, Bootstrap analyses, …). We present four different solutions based on available computational resources. This clarifies the steps on how to tailor the code based on these resources.


Estimating PI

Different approaches can be used to solve the problem. However, we use lapply because it makes the comparison easier. Here is the function to compute pi.

mc_pi <- function(sample_size){
  x <- runif(sample_size)
  y <- runif(sample_size)
  z <- sqrt(x^2+y^2)
  pi <- (length(which(z<=1))*4)/length(z)


To facilitate understanding the accuracy of the results, we count the number of matched digits between actual and estimated PI.

# PI :
PI <- 3.14159265358979323846

match_chars <- function(number_1, number_2){
  sn1 <- strsplit(sprintf("%.54f", number_1),"")[[1]]
  sn2 <- strsplit(sprintf("%.54f", number_2),"")[[1]]
  i <- 1
  l <- min(length(sn1), length(sn2))
  for (i in seq(1,l)){
    if (sn1[i] != sn2[i]){
    } else {
      i <- i + 1


The sequential approach uses one core. The lapply version of the code is according to the following:

n <- 1000  # number of samples in each trial
m <- 1000000 # number of trials.

trial_vec <- (numeric(m)+1)*n

t1 <- proc.time()
pi_list_tmp <- lapply(trial_vec, mc_pi)
t2 <- proc.time()

print(paste("Processing time: ",t2[[3]] - t1[[3]], " s."))

pi_list <- c(, pi_list_tmp))
pi <- mean(pi_list)

print(paste("PI value: ", PI))
print(paste("Est.  pi: ", pi))
print(paste("Number of matched chars: ", match_chars(pi, PI)))

Parallel on a laptop computer (Shared Memory)

The parallel version of lapply comes in different formats (mclapply vs parLapply). The main difference is the method of spawning new processes. We recommend using parLapply. Although it has slightly more overheads, it works on all systems, including macOS, Windows, and Linux flavors.


n <- 1000  # number of samples in each trial
m <- 100000 # number of trials.

trial_vec <- (numeric(m)+1)*n

# create cluster of workers
nthread <- 12
cl <- parallel::makeCluster(nthread, type="PSOCK")

t1 <- proc.time()
pi_list_tmp <- parallel::parLapply(cl, trial_vec, mc_pi)
t2 <- proc.time()


print(paste("Processing time: ",t2[[3]] - t1[[3]], " s."))

pi_list <- c(, pi_list_tmp))
pi_mean <- mean(pi_list)

print(paste("PI value: ", PI))
print(paste("Est.  pi: ", pi_mean))
print(paste("Number of matched chars: ", match_chars(pi_mean, PI)))

Parallel on a Server Node (Shared Memory)

The parallel version of the solution on a server node is the same as on a laptop. The server node is more or less similar to a laptop or a desktop computer; however, it has a higher number of cores and bigger memories (including a bigger cache). You can see the steps to test the code on FASRC Virtual Desktop Infrastructure in the following link.

Parallel on Multiple Nodes (Distributed Memory)

Going beyond one node requires some infrastructure. In other words, the nodes should be able to communicate with each other. The most commonly used paradigm is using Message Passing Interface (MPI). For R processes in this example, we use Rmpi, which is a wrapper for OpenMPI library. For setting up the environment, please visit the following link:

Rmpi supports parLapply through the parallel package. The advantage of using mpi is going beyond one server node. Here is the modified version of the code (parLapply_mpi.R).

# Load the R MPI package if it is not already loaded.
if (!is.loaded("mpi_initialize")) {

# In case R exits unexpectedly, have it automatically clean up
# resources taken up by Rmpi (slaves, memory, etc...)
.Last <- function(){
       if (is.loaded("mpi_initialize")){
           if (mpi.comm.size(1) > 0){
               print("Please use mpi.close.Rslaves() to close slaves.")
           print("Please use mpi.quit() to quit R")

n <- 1000000  # number of samples in each trial
m <- 100000000 # number of trials.
val <- (numeric(m)+1)*n

time_a <- proc.time()
pi_list_tmp <- mpi.parLapply(val, mc_pi)
time_b <- proc.time()

print(paste("Processing time: ",time_b[[3]] - time_a[[3]], " s."))

pi_list <- c(, pi_list_tmp))
pi_hat <- (mean(pi_list))

print(paste("PI value: ", PI))
print(paste("Est.  pi: ", pi_hat))
print(paste("Number of matched chars: ", match_digit(pi_hat, PI)))

# Tell all slaves to close down, and exit the program
mpi.close.Rslaves(dellog = FALSE)

You can submit a job using the following submission file (

#SBATCH -J mpi
#SBATCH -o %j_job.out
#SBATCH -e %j_job.err
#SBATCH -p shared
#SBATCH -n 100
#SBATCH -t 50
#SBATCH --mem-per-cpu=4000

# Load required software modules 
module load R/3.5.1-fasrc01
module load gcc/10.2.0-fasrc01 openmpi/4.1.1-fasrc01

# Set up Rmpi package
export R_LIBS_USER=$HOME/apps/R/3.5.1:$R_LIBS_USER
export R_PROFILE=$HOME/apps/R/3.5.1/Rmpi/Rprofile

# Run program
export OMPI_MCA_mpi_warn_on_fork=0
srun -n 100 --mpi=pmix R CMD BATCH --no-save --no-restore parLapply_mpi.R

In your login node, you can submit the job.


The following figure shows scaling the job among different number of cores and corresponding wall clock time. The results show a perfect linear scaling.

#cores (ncore)

Wall Clock Time in s (wc)