The rzmq package (which this package depends on) needs the system library ZeroMQ.
# You can skip this step on Windows and macOS, as the rzmq binary already includes it
brew install zeromq # Linuxbrew, Homebrew on macOS
conda install zeromq # Conda
sudo apt-get install libzmq3-dev # Ubuntu
sudo yum install zeromq3-devel # Fedora
pacman -S zeromq # Arch Linux
More details can be found at the rzmq project README.
The latest stable version is available on CRAN. Alternatively, it is also available on the master branch of the repository.
# from CRAN
install.packages('clustermq')
# from Github
# install.packages('devtools')
devtools::install_github('mschubert/clustermq')
In the develop branch, we will introduce code changes and new features. These may contain bugs, poor documentation, or other inconveniences. This branch may not install at times. However, feedback is very welcome.
# install.packages('devtools')
devtools::install_github('mschubert/clustermq', ref="develop")
An HPC cluster’s scheduler ensures that computing jobs are distributed to available worker nodes. Hence, this is what clustermq interfaces with in order to do computations.
By default, we will take whichever scheduler we find and fall back on local processing. This will work in most, but not all, cases. To set up a scheduler explicitly, see the scheduler sections below (LSF, SGE, SLURM, PBS, Torque).
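The scheduler can also be selected manually in your ~/.Rprofile. A minimal sketch (the scheduler names match the sections below; the template option is only needed if you use a custom template file):
options(
    clustermq.scheduler = "slurm", # or "lsf", "sge", "multicore", "ssh"
    clustermq.template = "/path/to/template" # optional custom template
)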
There are reasons why you might prefer not to work on the computing cluster directly but rather on your local machine instead. RStudio is an excellent local IDE; it is more responsive and feature-rich than browser-based solutions (RStudio Server, Project Jupyter), and it avoids X forwarding issues when you want to look at the plots you just made.
Using this setup, however, you lose access to the computing cluster. Instead, you would have to copy your data there, submit individual scripts as jobs, and aggregate the data again in the end. clustermq solves this by providing a transparent SSH interface.
In order to use clustermq from your local machine, the package needs to be installed both there and on the computing cluster. On the computing cluster, set up your scheduler and make sure clustermq runs there without problems. On your local machine, add the following options to your ~/.Rprofile:
options(
clustermq.scheduler = "ssh",
clustermq.ssh.host = "user@host", # use your user and host, obviously
clustermq.ssh.log = "~/cmq_ssh.log" # log for easier debugging
)
We recommend that you set up SSH keys for password-less login.
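If you have not set up a key pair yet, a typical sequence looks like this (a sketch; the key type and host are placeholders for your own values):
ssh-keygen -t ed25519 # create a local key pair (skip if you already have one)
ssh-copy-id user@host # copy the public key to the cluster for password-less login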
Q function
The following arguments are supported by Q:
fun - The function to call. This needs to be self-sufficient (because it will not have access to the master environment)
... - All iterated arguments passed to the function. If there is more than one, all of them need to be named
const - A named list of non-iterated arguments passed to fun
export - A named list of objects to export to the worker environment
Behavior can further be fine-tuned using the options below:
fail_on_error - Whether to stop if one of the calls returns an error
seed - A common seed that is combined with the job number for reproducible results
memory - Amount of memory to request for the job (bsub -M)
n_jobs - Number of jobs to submit for all the function calls
job_size - Number of function calls per job. If used in combination with n_jobs, the latter will be the overall limit
chunk_size - How many calls a worker should process before reporting back to the master. Default: every worker will report back 100 times in total
The full documentation is available by typing ?Q.
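For illustration, a sketch of a call that combines several of these arguments (the values here are arbitrary):
fx = function(x, y) x * 2 + y
Q(fx, x=1:100, const=list(y=10), seed=123, memory=1024, n_jobs=5, fail_on_error=FALSE)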
The package is designed to distribute arbitrary function calls on HPC worker nodes. There are, however, a couple of caveats to observe as the R session running on a worker does not share your local memory.
The simplest example is a function call that is completely self-sufficient, with one argument (x) that we iterate through:
fx = function(x) x * 2
Q(fx, x=1:3, n_jobs=1)
#> Submitting 1 worker jobs (ID: 6855) ...
#> [[1]]
#> [1] 2
#>
#> [[2]]
#> [1] 4
#>
#> [[3]]
#> [1] 6
Non-iterated arguments are supported by the const argument:
fx = function(x, y) x * 2 + y
Q(fx, x=1:3, const=list(y=10), n_jobs=1)
#> Submitting 1 worker jobs (ID: 6855) ...
#> [[1]]
#> [1] 12
#>
#> [[2]]
#> [1] 14
#>
#> [[3]]
#> [1] 16
If a function relies on objects in its environment that are not passed as arguments, they can be exported using the export argument:
fx = function(x) x * 2 + y
Q(fx, x=1:3, export=list(y=10), n_jobs=1)
#> Submitting 1 worker jobs (ID: 6855) ...
#> [[1]]
#> [1] 12
#>
#> [[2]]
#> [1] 14
#>
#> [[3]]
#> [1] 16
If we want to use a package function, we need to load the package on the worker using a library() call or reference the function with its package_name:: prefix:
fx = function(x) {
library(dplyr)
x %>%
mutate(area = Sepal.Length * Sepal.Width) %>%
head()
}
Q(fx, x=list(iris), n_jobs=1)
#> Submitting 1 worker jobs (ID: 6855) ...
#>
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#>
#> filter, lag
#> The following objects are masked from 'package:base':
#>
#> intersect, setdiff, setequal, union
#> [[1]]
#> Sepal.Length Sepal.Width Petal.Length Petal.Width Species area
#> 1 5.1 3.5 1.4 0.2 setosa 17.85
#> 2 4.9 3.0 1.4 0.2 setosa 14.70
#> 3 4.7 3.2 1.3 0.2 setosa 15.04
#> 4 4.6 3.1 1.5 0.2 setosa 14.26
#> 5 5.0 3.6 1.4 0.2 setosa 18.00
#> 6 5.4 3.9 1.7 0.4 setosa 21.06
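Alternatively, the function can reference the package explicitly instead of attaching it; a sketch equivalent to the library() version above:
fx = function(x) {
    head(dplyr::mutate(x, area = Sepal.Length * Sepal.Width))
}
Q(fx, x=list(iris), n_jobs=1)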
foreach backend
The foreach package provides an interface to perform repeated tasks on different backends. While it can perform the function of simple loops using %do%:
library(foreach)
x = foreach(i=1:3) %do% sqrt(i)
it can also perform these operations in parallel using %dopar%:
x = foreach(i=1:3) %dopar% sqrt(i)
#> Submitting 2 worker jobs (ID: 6665) ...
The latter allows registering different handlers for parallel execution, where we can use clustermq:
clustermq::register_dopar_cmq(n_jobs=2, memory=1024) # this accepts same arguments as `Q`
x = foreach(i=1:3) %dopar% sqrt(i) # this will be executed as jobs
#> Submitting 2 worker jobs (ID: 6855) ...
As BiocParallel supports foreach too, this means we can also run all packages that use BiocParallel on the cluster via DoparParam.
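A sketch of how this fits together, assuming the BiocParallel package is installed:
library(BiocParallel)
clustermq::register_dopar_cmq(n_jobs=2, memory=1024) # same arguments as `Q`
register(DoparParam()) # BiocParallel now dispatches via the registered dopar backend
bplapply(1:3, sqrt) # these calls are sent to cluster jobs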
drake
The drake package enables users to define a dependency structure of different function calls, and only evaluate them if the underlying data changed.
drake — or, Data Frames in R for Make — is a general-purpose workflow manager for data-driven tasks. It rebuilds intermediate data objects when their dependencies change, and it skips work when the results are already up to date. Not every runthrough starts from scratch, and completed workflows have tangible evidence of reproducibility. drake also supports scalability, parallel computing, and a smooth user experience when it comes to setting up, deploying, and maintaining data science projects.
It can use clustermq to perform calculations as jobs:
library(drake)
load_mtcars_example()
# clean(destroy = TRUE)
# options(clustermq.scheduler = "multicore")
make(my_plan, parallelism = "clustermq", jobs = 2, verbose = 4)
Function calls evaluated by workers are wrapped in event handlers, which means that even if a call evaluation throws an error, this should be reported back to the main R session.
However, there are reasons why workers might crash, in which case they cannot report back. These include, for example:
- a segfault in a low-level process
- the scheduler killing a job, e.g. for exceeding its memory or walltime limits
In this case, it is useful to have the worker(s) create a log file that will also include events that are not reported back. It can be requested using:
Q(..., log_file="/path/to.file")
Note that log_file is a template field of your scheduler script, and hence needs to be present there in order for this to work. The default templates all have this field included.
In order to log each worker separately, some schedulers support wildcards in their log file names. For instance:
log_file="/path/to.file.%I"
log_file="/path/to.file.%a"
Your scheduler documentation will have more details about the available options.
When reporting a bug that includes worker crashes, please always include a log file.
Before trying remote schedulers via SSH, make sure that the scheduler works when you first connect to the cluster and run a job from there.
If the terminal is stuck at
Connecting <user@host> via SSH ...
make sure that each step of your SSH connection works by typing the following commands in your local terminal, and check that each step completes without errors or warnings:
# test your ssh login that you set up in ~/.ssh/config
# if this fails you have not set up SSH correctly
ssh <user@host>
# test port forwarding from 54709 remote to 6687 local (ports are random)
# if this fails you will not be able to use clustermq via SSH
ssh -R 54709:localhost:6687 <user@host> R --vanilla
If you get a Command not found: R error, make sure your $PATH is set up correctly in your ~/.bash_profile and/or your ~/.bashrc (depending on your cluster configuration you might need either).
If you get an SSH warning or error, try again with ssh -v to enable verbose output.
If the forward itself works, set the following option in your ~/.Rprofile:
options(clustermq.ssh.log = "~/ssh_proxy.log")
This will create a log file on the remote server that will contain any errors that might have occurred during ssh_proxy startup.
In some cases, it may be necessary to activate a specific computing environment on the scheduler jobs prior to starting up the worker. This can be, for instance, because R was only installed in a specific environment or container.
Examples of such environments or containers are:
- environment modules (activated via module load)
- conda environments (activated via source activate)
- containers, such as Docker or Singularity
It should be possible to activate them in the job submission script (i.e., the template file). This is widely untested, but would look like the following for the LSF scheduler (and analogous for others):
#BSUB-J {{ job_name }}[1-{{ n_jobs }}] # name of the job / array jobs
#BSUB-o {{ log_file | /dev/null }} # stdout + stderr
#BSUB-M {{ memory | 4096 }} # Memory requirements in Mbytes
#BSUB-R rusage[mem={{ memory | 4096 }}] # Memory requirements in Mbytes
##BSUB-q default # name of the queue (uncomment)
##BSUB-W {{ walltime | 6:00 }} # walltime (uncomment)
module load {{ bashenv | default_bash_env }}
# or: source activate {{ conda | default_conda_env_name }}
# or: your environment activation command
ulimit -v $(( 1024 * {{ memory | 4096 }} ))
R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
This template still needs to be filled, so in the above example you need to pass either
Q(..., template=list(bashenv="my environment name"))
or set it via an .Rprofile option:
options(
clustermq.defaults = list(bashenv="my default env")
)
In your ~/.Rprofile on your computing cluster, set the following options:
options(
clustermq.scheduler = "lsf",
clustermq.template = "/path/to/file/below"
)
The option clustermq.template should point to an LSF template file like the one below.
#BSUB-J {{ job_name }}[1-{{ n_jobs }}] # name of the job / array jobs
#BSUB-o {{ log_file | /dev/null }} # stdout + stderr; %I for array index
#BSUB-M {{ memory | 4096 }} # Memory requirements in Mbytes
#BSUB-R rusage[mem={{ memory | 4096 }}] # Memory requirements in Mbytes
##BSUB-q default # name of the queue (uncomment)
##BSUB-W {{ walltime | 6:00 }} # walltime (uncomment)
ulimit -v $(( 1024 * {{ memory | 4096 }} ))
R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
In this file, #BSUB-* defines command-line arguments to the bsub program.
BSUB-M and BSUB-R - Check with your local setup whether the memory values supplied are MiB or KiB; the default is 4096 if no memory is requested when calling Q()
BSUB-q default - Use the queue with name default. This will most likely not exist on your system, so choose the right name (or comment out this line with an additional #)
BSUB-W {{ walltime }} - Set the maximum time a job is allowed to run before being killed. The default here is to disable this line. If you enable it, enter a fixed value or pass the walltime argument to each function call. The way it is written, it will use 6 hours if no argument is given.
For other options, consult your scheduler documentation and add them via #BSUB-* (where * represents the argument)
Do not change the identifiers in curly braces ({{ ... }}), as they are used to fill in the right variables
Once this is done, the package will use your settings and no longer warn you of the missing options.
In your ~/.Rprofile on your computing cluster, set the following options:
options(
clustermq.scheduler = "sge",
clustermq.template = "/path/to/file/below"
)
The option clustermq.template should point to an SGE template file like the one below.
#$ -N {{ job_name }} # job name
#$ -q default # submit to queue named "default"
#$ -j y # combine stdout/error in one file
#$ -o {{ log_file | /dev/null }} # output file
#$ -cwd # use pwd as work dir
#$ -V # use environment variable
#$ -t 1-{{ n_jobs }} # submit jobs as array
ulimit -v $(( 1024 * {{ memory | 4096 }} ))
R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
In this file, #$-* defines command-line arguments to the qsub program.
#$ -q default - Use the queue with name default. This will most likely not exist on your system, so choose the right name (or comment out this line with an additional #)
Do not change the identifiers in curly braces ({{ ... }}), as they are used to fill in the right variables.
Once this is done, the package will use your settings and no longer warn you of the missing options.
In your ~/.Rprofile on your computing cluster, set the following options:
options(
clustermq.scheduler = "slurm",
clustermq.template = "/path/to/file/below"
)
The option clustermq.template should point to a SLURM template file like the one below.
#!/bin/sh
#SBATCH --job-name={{ job_name }}
#SBATCH --partition=default
#SBATCH --output={{ log_file | /dev/null }} # you can add .%a for array index
#SBATCH --error={{ log_file | /dev/null }}
#SBATCH --mem-per-cpu={{ memory | 4096 }}
#SBATCH --array=1-{{ n_jobs }}
ulimit -v $(( 1024 * {{ memory | 4096 }} ))
R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
In this file, #SBATCH defines command-line arguments to the sbatch program.
SBATCH --partition=default - Use the queue (partition) with name default. This will most likely not exist on your system, so choose the right name (or comment out this line with an additional #)
Do not change the identifiers in curly braces ({{ ... }}), as they are used to fill in the right variables.
Once this is done, the package will use your settings and no longer warn you of the missing options.
In your ~/.Rprofile on your computing cluster, use the SGE scheduler with a PBS template:
options(
clustermq.scheduler = "sge",
clustermq.template = "/path/to/file/below"
)
The option clustermq.template should point to a PBS template file like the one below.
#PBS -N {{ job_name }}
#PBS -l select=1:ncpus={{ cores | 1 }}
#PBS -l walltime={{ walltime | 1:00:00 }}
#PBS -q default
#PBS -o {{ log_file | /dev/null }}
#PBS -j oe
ulimit -v $(( 1024 * {{ memory | 4096 }} ))
R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
In this file, #PBS-* defines command-line arguments to the qsub program.
#PBS -q default - Use the queue with name default. This will most likely not exist on your system, so choose the right name (or comment out this line with an additional #)
Do not change the identifiers in curly braces ({{ ... }}), as they are used to fill in the right variables.
Once this is done, the package will use your settings and no longer warn you of the missing options.
In your ~/.Rprofile on your computing cluster, use the SGE scheduler with a Torque template:
options(
clustermq.scheduler = "sge",
clustermq.template = "/path/to/file/below"
)
The option clustermq.template should point to a Torque template file like the one below.
#PBS -N {{ job_name }}
#PBS -l nodes={{ n_jobs }}:ppn=1,walltime={{ walltime | 30:00 }}
#PBS -o {{ log_file | /dev/null }}
#PBS -q default
#PBS -j oe
ulimit -v $(( 1024 * {{ memory | 4096 }} ))
R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
In this file, #PBS-* defines command-line arguments to the qsub program.
#PBS -q default - Use the queue with name default. This will most likely not exist on your system, so choose the right name (or comment out this line with an additional #)
Do not change the identifiers in curly braces ({{ ... }}), as they are used to fill in the right variables.
Once this is done, the package will use your settings and no longer warn you of the missing options.