The
snow
(Simple Network of Workstations) package implements a simple mechanism
for using a collection of workstations or a Beowulf cluster for
``embarrassingly parallel'' computations in R. The interface, which
is based in part on the Python CoW
(Cluster of Workstations) package, is intended to be quite simple,
and is designed so that it can be implemented on top of several
different lower level communication mechanisms. Three low level
interfaces have been implemented, one using PVM via the
rpvm
package by Li and Rossini, one using MPI via the
Rmpi
package
by Hao Yu, and one using raw sockets that may be useful if PVM and MPI
are not available. This note describes how to start up a cluster, the
basic functions for computing with a cluster, and an example of using
the cluster for parallel bootstrapping.
Two notes of caution: interrupting a computation with ^C is likely to cause problems, so avoid this when running snow. When you are finished, remember to call stopCluster for your cluster; this helps in cleanly shutting down your framework.
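As a quick roadmap, a complete snow session has the following shape. This is a minimal sketch using a socket cluster of two workers on the local machine; the host names and the toy function are purely illustrative:

library(snow)

## start two R worker processes on the local machine
cl <- makeCluster(c("localhost", "localhost"), type = "SOCK")

## run a function on every worker; the results come back as a list
clusterCall(cl, function() Sys.info()[["nodename"]])

## shut the cluster down cleanly when finished
stopCluster(cl)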
Starting a workstation cluster is the only step in using a cluster
that depends explicitly on the underlying communication mechanism. A
cluster is started by calling the makeCluster
function, but the
details of the call currently vary slightly depending on the type of
cluster. PVM and MPI clusters may also need some preliminary
preparations to start the PVM or MPI systems.
To use a PVM cluster you first need to start PVM, for example with the command line console pvm:
<starting PVM using the pvm console>=
luke@itasca cluster% pvm
pvm> add itasca
add itasca
0 successful
                    HOST     DTID
                  itasca  Duplicate host
pvm> add owasso
add owasso
1 successful
                    HOST     DTID
                  owasso    80000
pvm> conf
conf
2 hosts, 2 data formats
                    HOST     DTID     ARCH   SPEED       DSIG
                  itasca    40000    LINUX    1000 0x00408841
                  owasso    80000   DARWIN    1000 0x0658eb59
pvm>
The parallel virtual machine configuration shown by the conf
command consists of two hosts, itasca
running i386 Linux and
owasso
running Mac OS X.
As an alternative to the command line console pvm
you can also use
the graphical console xpvm
, if it is installed, or you can start
PVM from R using functions provided by the rpvm
package.
Once PVM is running, to use a cluster of two worker nodes you would
start R on the master, load the snow package, and call makeCluster
with arguments 2, the number of worker nodes, and type = "PVM"
:
<starting a PVM cluster>=
cl <- makeCluster(2, type = "PVM")
The returned value is a list of references to these two processes.
Currently the default cluster type is "PVM"
if the rpvm
package is available and Rmpi
is not already loaded, so the
type = "PVM"
argument can be omitted.
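In that case a call as simple as the following (a sketch assuming rpvm is installed and PVM is already running) creates the same cluster:

cl <- makeCluster(2)   # defaults to type = "PVM" when rpvm is available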
Once you are done with PVM
remember to shut it down, for example
with the halt
command in the pvm
console.
Before creating an MPI cluster you may need to start your MPI system, for example with lamboot for LAM-MPI; the details depend on your MPI system.
Once you are done with MPI remember to shut it down if your MPI
requires this; for LAM-MPI, for example, use lamhalt
.
To start an MPI cluster with two workers via process spawning, load snow and call makeCluster with type = "MPI":
<starting an MPI cluster with process spawning>=
cl <- makeCluster(2, type = "MPI")
Currently if rpvm
is not available but Rmpi
is available then
the default cluster type is "MPI"
and the type = "MPI"
argument can be omitted.
To start the worker processes with mpirun, you will need to install Rmpi accordingly, and you need to install the RMPISNOW shell script from the installed snow package or the inst directory of the package sources in an appropriate location, preferably on your path.
If RMPISNOW
is in your path you can then start R with
<starting R with mpirun>=
mpirun -np 3 RMPISNOW
This starts a master and two worker processes. A cluster is created in the master process. You can obtain a reference to the cluster with
<obtaining a cluster started with mpirun>= [D->]
cl <- getMPIcluster()
With recent versions of snow you can also obtain a reference to the running cluster with
<obtaining a cluster started with mpirun>+= [<-D->]
cl <- makeCluster()
or, for a cluster with two workers, with
<obtaining a cluster started with mpirun>+= [<-D]
cl <- makeCluster(2)
To start a socket cluster, call makeCluster and specify type = "SOCK" and a list of machines to use. For example,
<starting a socket cluster>=
cl <- makeCluster(c("itasca", "owasso"), type = "SOCK")
creates a cluster consisting of two R processes, one running on a
machine named itasca
and the other on owasso
. The returned
value is a list of references to these two processes.
makeCluster
accepts a number of
additional named arguments for specifying options for the node
creation processes. The most useful arguments are type
for
specifying the cluster type, homogeneous
for specifying whether
the startup process should assume that worker hosts have the same
architecture and file system layout as the master (see Section
[->] for more on heterogeneous
clusters), and outfile
for specifying where output from worker
processes is to be sent. By default outfile
is /dev/null
.
For debugging purposes, especially during initial configuration, it
can be useful to redirect output to a proper file.
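For example, here is a sketch of starting a socket cluster whose worker output is collected in a log file; the log file path is illustrative:

## capture worker output for debugging instead of discarding it
cl <- makeCluster(c("itasca", "owasso"), type = "SOCK",
                  outfile = "/tmp/snow-workers.log")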
An alternative to specifying additional cluster options with each
cluster creation call is to use setDefaultClusterOptions
to change
the defaults. For example, to specify that the default should be an
inhomogeneous MPI cluster use
setDefaultClusterOptions(type="MPI", homogeneous = FALSE)
When you are done with a cluster, shut it down with stopCluster:
<stopping a cluster>=
stopCluster(cl)
Socket clusters should stop automatically when the process that
created them terminates; however, it is still a good idea to call
stopCluster
. If you do not call stopCluster
to terminate a
PVM cluster before the creating process exits you can use the halt
command in the PVM console. Your MPI system may provide a similar
facility; for LAM you can use lamhalt
or lamwipe
.
By default snow assumes a homogeneous cluster: the R shell script and the snow library live at the same absolute paths on all nodes. snow need not be installed in the system library, but, for the defaults to work, it has to be in one of the libraries specified in the R_LIBS environment variable for the master.
For inhomogeneous systems, such as systems with differing file system layouts or systems where different architectures share a common file system, some additional configuration is needed:
Copy the script RunSnowNode from the installed snow package to a location on the search path for that machine and make sure it is executable.
On each machine, set the environment variable R_SNOW_LIB to the name of the directory that contains the snow package and supporting packages such as rpvm, Rmpi, or rlecuyer for the appropriate architecture.
Make sure a suitable R shell script is available on the search path on each machine.
Setting the R_SNOW_LIB variable to a nonempty value on the master causes the default setting of the homogeneous cluster option to be FALSE; otherwise, it is TRUE.
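As an illustration, with R_SNOW_LIB set on the master the following call (host names as before) starts an inhomogeneous socket cluster; the homogeneous option could also be given explicitly, as sketched here:

cl <- makeCluster(c("itasca", "owasso"), type = "SOCK",
                  homogeneous = FALSE)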
The two basic functions for computing with a cluster are clusterCall and clusterApply. clusterCall
calls a specified function with
identical arguments on each member of the cluster and returns a list
of the results. The calls are executed in parallel. For example,
we can ask the nodes for their names and machine types:
<R session>= [D->]
> clusterCall(cl, function() Sys.info()[c("nodename","machine")])
[[1]]
               nodename                 machine 
"owasso.stat.uiowa.edu"       "Power Macintosh" 

[[2]]
nodename  machine 
"itasca"   "i686" 
A useful variation of clusterCall
is clusterEvalQ
, defined as
<definition of clusterEvalQ>=
clusterEvalQ<-function(cl, expr)
clusterCall(cl, eval, substitute(expr), env=.GlobalEnv)
This can be used, for example, to load a package on each cluster node with
<loading a library on all nodes>=
clusterEvalQ(cl, library(boot))
clusterApply
is a version of lapply
that evaluates each call
on a different member of the cluster. It requires that the number of
nodes in the cluster be at least as large as the number of elements in
the list argument. A simple example:
<R session>+= [<-D->]
> clusterApply(cl, 1:2, get("+"), 3)
[[1]]
[1] 4

[[2]]
[1] 5
If no further steps are taken, the worker processes are likely to produce identical random number streams:
<R session>+= [<-D->]
> clusterCall(cl, runif, 3)
[[1]]
[1] 0.08496597 0.35232298 0.60300751

[[2]]
[1] 0.08496597 0.35232298 0.60300751
A quick and (very) dirty way of addressing this is random seeding, using something like
<random seeding of cluster generators>=
clusterApply(cl, runif(length(cl), max = 10000000), set.seed)
A better approach is to use a parallel random number generator package. Several are available. By default snow uses the rlecuyer
package, which provides an interface to a parallel
random number generator package of
L'Ecuyer, Simard, Chen, and Kelton. The function
clusterSetupRNG
handles the initialization. The default call with
no additional arguments uses a random seed; named arguments described
in the help page can be used for more control:
<R session>+= [<-D->]
> clusterSetupRNG(cl)
> clusterCall(cl, runif, 3)
[[1]]
[1] 0.749391854 0.007316102 0.152742874

[[2]]
[1] 0.8424790 0.8896625 0.2256776
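For reproducible results you can supply a seed through the named arguments; a sketch, assuming the seed argument for the default L'Ecuyer generator as described in the clusterSetupRNG help page:

## the same six-integer seed should give the same worker streams on each run
clusterSetupRNG(cl, seed = c(1, 22, 333, 444, 55, 6))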
The boot package includes an example using the nuclear data set.
The setup code, given in the boot
help page, is
<bootstrap setup>=
library(boot)
# In this example we show the use of boot in a prediction from
# regression based on the nuclear data.  This example is taken
# from Example 6.8 of Davison and Hinkley (1997).  Notice also
# that two extra arguments to statistic are passed through boot.
data(nuclear)
nuke <- nuclear[, c(1, 2, 5, 7, 8, 10, 11)]
nuke.lm <- glm(log(cost) ~ date + log(cap) + ne +
               ct + log(cum.n) + pt, data = nuke)
nuke.diag <- glm.diag(nuke.lm)
nuke.res <- nuke.diag$res * nuke.diag$sd
nuke.res <- nuke.res - mean(nuke.res)
# We set up a new dataframe with the data, the standardized
# residuals and the fitted values for use in the bootstrap.
nuke.data <- data.frame(nuke, resid = nuke.res, fit = fitted(nuke.lm))
# Now we want a prediction of plant number 32 but at date 73.00
new.data <- data.frame(cost = 1, date = 73.00, cap = 886, ne = 0,
                       ct = 0, cum.n = 11, pt = 1)
new.fit <- predict(nuke.lm, new.data)

nuke.fun <- function(dat, inds, i.pred, fit.pred, x.pred) {
    assign(".inds", inds, envir = .GlobalEnv)
    lm.b <- glm(fit + resid[.inds] ~ date + log(cap) + ne + ct +
                log(cum.n) + pt, data = dat)
    pred.b <- predict(lm.b, x.pred)
    remove(".inds", envir = .GlobalEnv)
    c(coef(lm.b), pred.b - (fit.pred + dat$resid[i.pred]))
}
On a single workstation the actual bootstrap example takes approximately half a minute:
<R session>+= [<-D->]
> system.time(nuke.boot <-
+     boot(nuke.data, nuke.fun, R=999, m=1,
+          fit.pred=new.fit, x.pred=new.data))
[1] 26.32  0.71 27.02  0.00  0.00
Using a two-machine cluster (identical i686 nodes this time) is approximately twice as fast:
<R session>+= [<-D->]
> clusterEvalQ(cl, library(boot))
> system.time(cl.nuke.boot <-
+     clusterCall(cl, boot, nuke.data, nuke.fun, R=500, m=1,
+                 fit.pred=new.fit, x.pred=new.data))
[1]  0.01  0.00 14.27  0.00  0.00
Using a five node cluster takes about 6 seconds:
<R session>+= [<-D]
> cl <- makeCluster(5)
> clusterEvalQ(cl, library(boot))
> system.time(cl.nuke.boot <-
+     clusterCall(cl, boot, nuke.data, nuke.fun, R=200, m=1,
+                 fit.pred=new.fit, x.pred=new.data))
[1] 0.03 0.00 5.58 0.00 0.00
The value to use when comparing speeds is the third one, the elapsed time.
Higher level functions built on top of clusterApply include
parLapply
, parSapply
, and parApply
. These are parallel
versions of lapply
, sapply
, and apply
. Some examples:
<higher level examples>= [D->]
sum(parApply(cl, matrix(1:100, 10), 1, sum))
sum(parApply(cl, matrix(1:10000, 100), 1, sum))
<higher level examples>+= [<-D->]
x <- 1:100/101
system.time(qtukey(x, 2, df = 2))
system.time(unlist(parLapply(cl, x, qtukey, 2, df = 2)))
A very simple version of a parallel matrix multiply is also provided as an illustration.
<higher level examples>+= [<-D]
A <- matrix(rnorm(10000), 100)
system.time(A %*% A)
system.time(parMM(cl, A, A))
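A parSapply example in the same spirit; the summary function is just illustrative:

## like sapply: simplifies the list of results to a vector
parSapply(cl, 1:10, function(x) x^2)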
If worker R processes are left running, then for PVM with the halt command, or for LAM-MPI with lamhalt and lamwipe, you can kill all R processes by halting the PVM or LAM-MPI universe.
Error handling is currently very primitive. All evaluations on worker
nodes are carried out using try
. Results returned can therefore
either be legitimate results or try
errors. In principle user
code should check for this.
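A sketch of such a check, assuming results come back as objects of class "try-error" as described above (more recent versions of snow may instead re-signal the error on the master):

res <- clusterApply(cl, 1:2, function(i) if (i > 1) stop("oops") else sqrt(i))
ok <- !sapply(res, inherits, "try-error")
res[ok]   # keep only the successful results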
The right way to handle situations where some nodes signal errors and
some do not is not obvious and can depend on the application. Once R
obtains a more sophisticated error handling mechanism it will be
possible to build on that mechanism to provide flexible user control
over the handling of errors in snow
.
Ideally, an interrupt on the master should be propagated to the worker
nodes and interrupt any calculations currently in progress. This does
not happen. In fact, an interrupt on the master during a snow
computation will most likely leave the communication infrastructure in
a confused state that makes further use of the cluster within the
current R session impossible. The temporary solution: don't interrupt
a snow
computation. This is clearly not a satisfactory solution.
Once R obtains a mechanism for controlling the handling of interrupts
it will be possible to do somewhat better. Whether it will be
possible to actually propagate interrupts to worker nodes is not
completely clear and may depend on operating system considerations.