Recently, I was presented with a large embarrassingly parallel scientific calculation in R. The calculation was too large for a single computer, taking far too much time and requiring too much memory. I needed a more scalable solution, one that would run across a fleet of computers.

In particular, I needed a solution that:

  • dynamically resizes the cluster according to my workload, so I don’t have to pay for unused compute time.
  • automatically installs dependent packages.
  • is written entirely in R, without external job-control shell scripts.

AWS CfnCluster provides a solution to the dynamic cluster size problem. When the SGE job queue size grows, new compute nodes are added. When a compute node is idle near the hour-boundary, it is terminated, saving me precious dollars (recall that AWS charges by the hour or part thereof).
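On the CfnCluster side, this behaviour is driven by the queue-size settings in the cluster configuration. A minimal sketch, with illustrative sizes (the parameter names follow the CfnCluster config format):

```ini
# ~/.cfncluster/config (sizes are illustrative)
[cluster default]
scheduler             = sge
initial_queue_size    = 0      # start with no compute nodes
max_queue_size        = 10     # scale out to at most ten nodes
maintain_initial_size = false  # allow idle nodes to be terminated
```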

pacman automatically installs any requisite packages, so I don’t need to install a curated list of R packages myself. Further, by installing pacman into the base image, I can use the same cluster to run different projects without planning ahead.

Finally, the BatchJobs package wraps around SGE, allowing me to invoke the power of the cluster entirely from within R. When invoked, BatchJobs generates small independent R scripts and associated input RData files, and shares these with the entire cluster via NFS. When the jobs are submitted, compute nodes use the shared R scripts and data, and produce output data, again shared via NFS. BatchJobs polls for completion, and combines the individual calculation results back into the main R process.

The map and reduce steps are automatic. I can access nearly all the CPU and memory resources of the entire cluster from within a single R process. Here’s how you can do it too…

Step by step

A number of CfnCluster base AMIs are available as starting points. I’ve selected the Amazon Linux one, but you may choose your favourite distribution, and adjust accordingly. You will need NFSv4, so pick one with this pre-installed.

R is already installed in the base image, so we’ll need to install pacman and BatchJobs. This only takes a few minutes, so a custom Bootstrap action is all that’s needed. If you plan on making further customisations, building a custom AMI may be a better option.

sudo yum -y install libcurl-devel
sudo R --vanilla -e 'install.packages("pacman", repos="https://cran.rstudio.com/")'
sudo R --vanilla -e 'source("http://bioconductor.org/biocLite.R"); biocLite(c("BatchJobs", "BiocParallel"))'
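To have CfnCluster run this bootstrap script on every node, point the cluster’s post_install setting at it; the S3 path below is a placeholder:

```ini
# In the [cluster ...] section of the CfnCluster config
post_install = s3://my-bucket/bootstrap.sh  # placeholder bucket and key
```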

BatchJobs needs to know about SGE, so provide R with some additional environment variables describing how CfnCluster configured SGE:

# Tell R (thus BatchJobs) about SGE
sudo tee -a /usr/lib64/R/etc/Renviron <<EOF
PATH=/opt/sge/bin:/opt/sge/bin/lx-amd64:/usr/bin:/bin
SGE_CELL=default
SGE_ARCH=lx-amd64
SGE_EXECD_PORT=6445
SGE_QMASTER_PORT=6444
SGE_ROOT=/opt/sge
SGE_CLUSTER_NAME=p6444
EOF

Then set up some user-level configuration:

# Configuration options
LOGIN_USER=ec2-user
CRAN_REPO=https://cran.rstudio.com/

# Add R initialisation files
sudo -u $LOGIN_USER tee /home/$LOGIN_USER/.BatchJobs.R <<EOF
cluster.functions = makeClusterFunctionsSGE('/home/$LOGIN_USER/simple.tmpl')
mail.start = "none"
mail.done = "none"
mail.error = "none"
db.driver = "SQLite"
db.options = list()
debug = FALSE
EOF

sudo -u $LOGIN_USER tee /home/$LOGIN_USER/simple.tmpl <<EOF
#!/bin/bash

# The name of the job, can be anything, simply used when displaying the list of running jobs
#$ -N <%= job.name %>
# Combining output/error messages into one file
#$ -j y
# Giving the name of the output log file
#$ -o <%= log.file %>
# One needs to tell the queue system to use the current directory as the working directory
# Or else the script may fail as it will execute in your top level home directory /home/username
#$ -cwd
# use environment variables
#$ -V
# use correct queue
#$ -q <%= resources\$queue %>
# use job arrays
#$ -t 1-<%= arrayjobs %>

# we merge R output with stdout from SGE, which gets then logged via -o option
R CMD BATCH --no-save --no-restore "<%= rscript %>" /dev/null
exit 0
EOF

# Configure user repositories
sudo -u $LOGIN_USER tee /home/$LOGIN_USER/.Rprofile <<EOF
options(download.file.method = "wget")
local({
  r <- getOption("repos")
  r["CRAN"] <- "$CRAN_REPO"
  options(repos=r)
})
EOF

# Create R user library (shared via NFS)
sudo -u $LOGIN_USER mkdir -p /home/$LOGIN_USER/R/x86_64-redhat-linux-gnu-library/3.2

In this example, I haven’t configured a mailer for job-completion notifications; if you want them, look into a Mail Transfer Agent such as exim or msmtp.

Manual job submission

Let’s digress and see how we might manually submit R jobs to SGE; indeed, this is how many blog posts suggest parallelising R computations on a cluster. Consider the following R script:

#!/usr/bin/Rscript

#$ -S /usr/bin/Rscript
#$ -j yes
#$ -cwd
#$ -l h_vmem=1.5G
#$ -l h_cpu=1:0:0
#$ -pe smp 2

# Insert normal R code here

The first line is a shebang, so we can invoke the script directly with ./script.R arg1 arg2 (assuming we’ve already chmod +x the file). The following #$ comments are SGE switches: when we submit the job with qsub script.R arg1 arg2, SGE knows to request the appropriate resources and to call the /usr/bin/Rscript interpreter. In this example, I’ve requested 1.5 GB of memory, 1 hour of CPU time, and an SMP environment with 2 CPUs. See the SGE manual for other constraints you can use.

This is actually not a bad setup. We can verify script correctness locally on a personal computer without SGE with a small dataset, then submit the same file to SGE with a larger dataset. However, we still need to manually manage the mapping of parameters and datasets, and manually combine the results upon completion.
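To make that bookkeeping concrete, here is a sketch of the manual split/submit/combine cycle that BatchJobs automates; the toy data frame, file names, and ten-way split are all my own invention:

```r
# Split the input into one .rds file per SGE array task
dataset <- data.frame(x = 1:100)
chunks  <- split(dataset, rep(1:10, each = 10))
for (i in seq_along(chunks))
  saveRDS(chunks[[i]], sprintf("in_%02d.rds", i))

# qsub -t 1-10 script.R   # each task reads in_<SGE_TASK_ID>.rds
#                         # and writes out_<SGE_TASK_ID>.rds

# After the jobs finish, gather and combine the pieces by hand:
# results  <- lapply(sprintf("out_%02d.rds", 1:10), readRDS)
# combined <- do.call(rbind, results)
```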

Automatic job submission using BatchJobs

With BatchJobs, we call parallel functions from the BatchJobs and BiocParallel packages to distribute the jobs. Consider the following R script (stolen from BioConductor’s example):

#!/usr/bin/Rscript

library(BatchJobs)
library(BiocParallel)

param <- BatchJobsParam(2, resources=list(ncpus=1))
register(param)

FUN <- function(i) system("hostname", intern=TRUE)
xx <- bplapply(1:100, FUN)
table(unlist(xx))

This is regular R code. The BatchJobsParam() invocation requests 2 workers with 1 CPU each, and the bplapply() invocation spreads 100 jobs across the requested resources. We can also request other limits, such as the amount of RAM, multicore environments, and so on. Behind the scenes, the parallel jobs are distributed throughout the cluster using SGE.
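Other limits travel through the resources list. The names must match what your SGE template consumes; my simple.tmpl above only reads resources$queue, so the memory and ncpus entries below are hypothetical until you add matching #$ lines to the template:

```r
library(BiocParallel)

# Hypothetical richer request: 4 workers submitted to the all.q queue.
# memory and ncpus are only honoured if the template references them.
param <- BatchJobsParam(4, resources = list(queue  = "all.q",
                                            memory = "4G",
                                            ncpus  = 2))
register(param)
```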

In this example, no additional packages were used, but if your function needs one, the following line installs it automatically:

pacman::p_load("mylibrary") # instead of library("mylibrary")

Using this technique, I can proceed to perform large scientific calculations across a fleet of computers without leaving R.

Caveats

Performance

N-way parallelisation does not magically make calculations run N times faster. There are costs associated with mapping the large job onto N smaller jobs and reducing the results. Ideally, these costs should be much smaller than the job itself, so parallelise only if each parallel computation takes a significant amount of time. Benchmark against your own workload, but typically you’ll only see a benefit with jobs that take more than 30 seconds or so.
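A back-of-the-envelope way to see this: if each job carries roughly s seconds of scheduling and NFS overhead around t seconds of useful work, its efficiency is about t / (t + s). Assuming 5 seconds of overhead:

```r
overhead   <- 5                       # assumed per-job overhead, in seconds
efficiency <- function(t) t / (t + overhead)
round(efficiency(c(1, 30, 300)), 2)   # 0.17 0.86 0.98: short jobs waste most of their time
```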

Spot pricing

AWS spot pricing is cheap, usually in the order of 10–20% of the on-demand cost. The Spot Bid Advisor gives you a good indication of the expected savings. However, with spot pricing, nodes can be terminated even whilst they are busy doing work. This requires modifying SGE to re-run terminated jobs, and possibly enabling checkpointing capabilities.
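If you go down that road, the SGE knobs involved would be along these lines (an untested sketch; all.q is CfnCluster’s default queue name):

```shell
# Mark jobs in the queue as re-runnable after a node disappears
qconf -mattr queue rerun TRUE all.q

# ...or request it per job at submission time
qsub -r y script.R
```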

I haven’t tried these. They may be worth considering if you’re running up a large AWS bill.

See also

BioConductor offers StarCluster AMIs pre-configured with BatchJobs. They also provide the source code used to create the AMI, but it appears outdated and overly complicated.

Have fun coding!