Parallel R

Many common tasks that we want to perform in R are embarrassingly parallel - that is, easily chopped up into smaller subsets that are independent of each other. Some examples include:

  • Permutation-test-based methods of inference
  • Parameter sweeps
  • Data simulation

The general approach to parallelizing these jobs is fairly straightforward, and goes something like this:

  • Split the job up into smaller segments.
  • Apply the function of interest.
  • Combine the results.

This mirrors the split-apply-combine logic behind R's built-in apply family of functions.
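
Before bringing in any packages, here is a minimal serial sketch of those three steps in base R (serial, so there is no actual speedup; it just makes the pattern concrete):

# Split the job into chunks of indices
chunks = split(1:100, rep(1:4, each=25))
# Apply the function of interest to each chunk
results = lapply(chunks, function(idx) sapply(idx, function(x) rnorm(10)))
# Combine the per-chunk results into one 10 x 100 matrix
rep.mat = do.call(cbind, results)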

CRAN Packages

Here are some CRAN packages that I have found useful for parallel computing.

  • multicore: For splitting up tasks on multiple cores of your local machine.
  • Rsge: For splitting up tasks on a cluster with the SGE scheduler.

Example

We want to generate 100 samples from the normal distribution, each of length 10. Here are some different ways that we could accomplish this.

apply

Using R's built-in apply, we can do:

apply(array(1:100), 1, function(x) rnorm(10))

For repeated evaluation of the same expression like this, R provides a convenience function called replicate.

replicate(100, rnorm(10))
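
Either way, the result comes back as a 10 x 100 matrix, one column per replication, which is worth confirming since the shape matters when we combine results later:

dim(replicate(100, rnorm(10)))
# [1]  10 100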

multicore

If we want to take advantage of multiple cores, we can do:

library(multicore)
mclapply(1:100, function(x) rnorm(10), mc.cores=7)

which gives you a list. If you want it as a matrix, wrap it like so:

do.call(cbind, mclapply(1:100, function(x) rnorm(10), mc.cores=7))
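
For tasks as small as rnorm(10), the overhead of forking worker processes can swamp any gains. As a rough illustration (timings will vary by machine; the heavier per-task work here is just to make the parallel benefit visible), you can compare the serial and parallel versions with system.time:

system.time(lapply(1:100, function(x) mean(rnorm(1e6))))
system.time(mclapply(1:100, function(x) mean(rnorm(1e6)), mc.cores=7))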

Rscript + SGE

On some clusters, leaving an interactive R session open while you wait to collect your sub-jobs is discouraged. When this is the case, I often implement the generic split-apply-combine strategy myself with Rscript. This is the same general approach to parallelization that randomise_parallel takes in FSL.

We start off with a submission script called split.R, which takes arguments for the total number of replications we want (nRep), the desired length of each replication (rep.length), and the number of threads on the cluster we want to use (threads). It splits nRep into smaller chunks, one per thread, and submits these as individual apply.R sub-jobs in a job array to the SGE scheduler. A final combine.R job is submitted on hold, so that it waits until all of the apply.R sub-jobs finish and then collects the results.

Make sure to replace /usr/bin/Rscript in the files below if your own setup is different.

split.R
#! /usr/bin/Rscript
 
# Input arguments:
# nRep
# rep.length
# threads
 
# ./split.R nRep=100 rep.length=10 threads=60
 
# Parse command line arguments
args = commandArgs(TRUE)
for(i in 1:length(args)){
	eval(parse(text=args[i]))
}
 
# Split up the tasks: each thread gets nRep %/% threads reps, and the
# last thread also picks up the remainder (nRep %% threads)
reps.per.thread = nRep %/% threads
 
# Create SGE job array file
fid = file('jobarray.txt', 'w')
for(ithread in 1:threads){
	if(ithread==threads){
		reps.per.thread = reps.per.thread + nRep %% threads
	}
	text = sprintf("apply.R ithread=%i nRep=%i rep.length=%i", ithread, reps.per.thread, rep.length)
	write(text, fid)  # successive writes to an open connection append
}
close(fid)
 
# Submit the job array to the SGE queue
jobID = system('fsl_sub -q short.q -t jobarray.txt', intern=TRUE)
system(sprintf('echo %s', jobID))

# Submit a held job (-j) that combines the sub-job results once they all finish
system(sprintf('fsl_sub -q short.q -j %s combine.R threads=%i', jobID, threads))

# Print job info to stdout
system(sprintf('echo Number of replications: %i', nRep))
system(sprintf('echo Number of threads: %i', threads))

If we look at apply.R, we can see that all it does is generate its own subset of replications and save the result to a temporary data file. Note that nRep inside apply.R is the per-thread count handed out in jobarray.txt, not the overall total.

apply.R
#! /usr/bin/Rscript
 
# Input arguments:
# ithread
# nRep
# rep.length
 
# Parse command line arguments
args = commandArgs(TRUE)
for(i in 1:length(args)){
	eval(parse(text=args[i]))
}
 
# Enter the code you want to parallelize...
rep.mat.tmp = replicate(nRep, rnorm(rep.length))
 
# Save out the results from this sub-job
save(rep.mat.tmp, file=sprintf('thread%i.Rdata', ithread))
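
One caveat: each sub-job is a fresh R process, so nothing coordinates the random number streams across threads, and the run is not reproducible. A quick and dirty fix (my addition, not part of the original script) is to seed each sub-job with its thread index; for rigorously independent parallel streams you would want something like the rlecuyer package instead.

# Hypothetical addition to apply.R: seed on the thread index so the
# whole run is reproducible (crude; see rlecuyer for proper streams)
set.seed(ithread)
rep.mat.tmp = replicate(nRep, rnorm(rep.length))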

The combine script is similarly simple, as it just loads up all of the temporary files and stacks them together.

combine.R
#! /usr/bin/Rscript
 
# Input arguments:
# threads
 
# Parse command line arguments
args = commandArgs(TRUE)
for(i in 1:length(args)){
	eval(parse(text=args[i]))
}
 
# Load/combine results from sub-jobs: each load() restores that thread's
# rep.mat.tmp, which we stack column-wise onto rep.mat
rep.mat = NULL
for(i in 1:threads){
	load(sprintf('thread%i.Rdata', i))
	rep.mat = cbind(rep.mat, rep.mat.tmp)
}
 
# Save out the combined results
save(rep.mat, file='rep.mat.Rdata')
 
# Clean up temporary data files, the job array file, and SGE log files
system('rm thread*.Rdata jobarray* combine.R.*')
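
Once combine.R has finished, the stacked results can be pulled back into any R session:

load('rep.mat.Rdata')
dim(rep.mat)
# [1]  10 100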

Since these files will be executed just like shell scripts, we must first make them executable, e.g. chmod +x split.R apply.R combine.R. We can then run the code like this:

$ ./split.R nRep=100 rep.length=10 threads=60
9173538
9173539
Number of replications: 100
Number of threads: 60

If we look at the status of our jobs with qstat, we can see that our job array of length threads=60 is waiting in the queue (state qw), and that the combine job is on hold (hqw) until the sub-jobs finish. Plus, split.R drops us straight back to the command line, so the admins will be happy that we aren't tying up an interactive session.

$ qstat -u jc
job-ID  prior   name       user         state submit/start at     queue                          slots ja-task-ID 
-----------------------------------------------------------------------------------------------------------------
9173538 0.00000 jobarray.t jc           qw    12/16/2010 17:29:06                                    1 1-60:1
9173539 0.00000 combine.R  jc           hqw   12/16/2010 17:29:07                                    1        

In the jobarray.txt file we can see how our total number of replications has been split up. There is no load balancing in this setup: integer division gives each of the first 59 tasks a single replication, and the last task absorbs the remaining 41. That is sufficient to get the idea across, and a more balanced split is sketched after the listing.

$ tail jobarray.txt
apply.R ithread=51 nRep=1 rep.length=10
apply.R ithread=52 nRep=1 rep.length=10
apply.R ithread=53 nRep=1 rep.length=10
apply.R ithread=54 nRep=1 rep.length=10
apply.R ithread=55 nRep=1 rep.length=10
apply.R ithread=56 nRep=1 rep.length=10
apply.R ithread=57 nRep=1 rep.length=10
apply.R ithread=58 nRep=1 rep.length=10
apply.R ithread=59 nRep=1 rep.length=10
apply.R ithread=60 nRep=41 rep.length=10
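
If the imbalance matters for your workload, a simple alternative (a sketch, not part of the scripts above) is to spread the remainder over the first few tasks instead of piling it all onto the last one:

# Give every task nRep %/% threads reps, then hand one extra rep
# to each of the first nRep %% threads tasks
extra = nRep %% threads
reps = rep(nRep %/% threads, threads) + rep(c(1, 0), c(extra, threads - extra))
sum(reps) == nRep  # TRUE; here: 40 tasks with 2 reps, 20 tasks with 1

Writing reps[ithread] into jobarray.txt in place of reps.per.thread would then keep every task within one replication of the same load.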