ClusterMQ Technical Documentation

Worker API

Base API and schedulers

The main worker functions are wrapped in an R6 class named QSys. This provides a standardized API for the lower-level messages that are sent via rzmq.

The base class itself is subclassed by scheduler classes that add the functions required for submitting and cleaning up jobs:

+ QSys
  |- Multicore
  |- LSF
  + SGE
    |- PBS
    |- Torque
  |- etc.

A pool of workers can be created using the workers() function, which instantiates an object of the corresponding QSys-derived scheduler class. See ?workers for details.

Worker startup

For workers that are started via a scheduler, we do not know in advance which machine they will run on. This is why every worker is started with the TCP/IP address of the master socket that will distribute the work.

This is achieved by a call to R that is common to all schedulers, passing the master's address to the worker entry point.
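The exact template line differs between schedulers and versions, but a sketch of the common call looks like this (the {{ master }} placeholder is filled in by the template engine when the job is submitted):

```sh
# Template fragment (sketch): start a worker that connects back to the
# master's TCP/IP address; '{{ master }}' is substituted at submission
# time, e.g. with "tcp://<master-host>:<port>"
R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
```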

On the master's side, we wait until a worker connects before sending it any data.

Common data and exports

Workers will start up without any knowledge of what they should process or how. In order to transfer the initial data to a worker, we first create and serialize a list object with the fields listed for the DO_SETUP message below (fun, const, export, rettype, common_seed, and token).
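As a sketch, such a list might be built and serialized like this (the function and values are placeholders; the field names follow the DO_SETUP message described below):

```r
# Sketch: the common data a master serializes for its workers
common_data <- list(
  id = "DO_SETUP",
  fun = function(x) x * 2,   # function each worker call will evaluate
  const = list(),            # constant arguments shared by all calls
  export = list(),           # objects to export to the worker environment
  rettype = "list",          # return type of the results
  common_seed = 123L,        # base seed for reproducible RNG
  token = "tk-1"             # identifies this set of common data
)
msg <- serialize(common_data, NULL)  # raw vector to send over the socket
```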

Workers that connect to the master will send a list with a token field. This can be used to check whether the worker has already received the common data it is supposed to work on.
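A sketch of that check, with hypothetical message values:

```r
# Sketch: master-side token check when a worker connects. A fresh
# worker reports no token; one that was set up earlier reports the
# token it received with its common data.
common_token <- "tk-1"
worker_msg <- list(id = "WORKER_UP", token = NA)   # fresh worker
needs_common_data <- !identical(worker_msg$token, common_token)
```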

Iterated data

If the worker has already received the common data, we can send it a chunk of iterated arguments to work on. These are passed as a list of iterables, e.g. a data.frame with a column for each iterated argument.

It also needs to have a column named ` id ` (note the surrounding spaces), which is used to identify each call.
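For illustration, a chunk for a hypothetical function f(x, n) could look like this (the values are made up):

```r
# Sketch: a chunk of iterated arguments, one row per call; the
# " id " column (note the surrounding spaces) identifies each call
chunk <- data.frame(
  ` id ` = 1:3,
  x = c(5, 7, 9),
  n = c(2L, 2L, 3L),
  check.names = FALSE   # keep the " id " name with its spaces
)
```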

When the worker has finished processing, it sends a message with the field result: a list containing the named results (result), as well as the warnings and errors of individual calls.
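A sketch of such a reply, assuming two calls succeeded (call ids and values are made up):

```r
# Sketch: the reply for a finished chunk; names of the inner `result`
# are the call ids, warnings/errors hold per-call messages (empty here)
reply <- list(
  id = "WORKER_READY",
  result = list(
    result = list(`1` = 20, `2` = 24),
    warnings = list(),
    errors = list()
  )
)
```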

Custom calls

Apart from sending common and iterated data that the worker processes in chunks, it is also possible to send arbitrary calls for it to evaluate. These need the fields expr (the expression to be evaluated) and env (a list environment to evaluate it in).
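A sketch of such a call and how a worker would evaluate it (the expression and values are placeholders):

```r
# Sketch: a DO_CALL message with an expression and the list
# environment to evaluate it in
call_msg <- list(
  id = "DO_CALL",
  expr = quote(x + y),
  env = list(x = 1, y = 2)
)
# worker side: evaluate the expression in the supplied environment
value <- eval(call_msg$expr, envir = list2env(call_msg$env))
```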

Main event loop

Putting the above together in an event loop, we get what is essentially implemented in master().
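The structure of such a loop can be sketched in plain R, using an in-memory queue in place of the rzmq socket; send() and receive() here are hypothetical stand-ins, not the package's API:

```r
# Sketch: master-style event loop over scripted worker messages
inbox <- list(list(id = "WORKER_UP"),
              list(id = "WORKER_READY"))   # scripted worker messages
outbox <- list()
receive <- function() { msg <- inbox[[1]]; inbox <<- inbox[-1]; msg }
send <- function(msg) outbox[[length(outbox) + 1]] <<- msg

chunks <- list(data.frame(x = 1:2), data.frame(x = 3:4))

while (length(inbox) > 0) {
  msg <- receive()
  if (msg$id == "WORKER_UP") {             # new worker: send common data
    send(list(id = "DO_SETUP", fun = identity, token = "tk-1"))
  } else if (msg$id == "WORKER_READY") {   # ready: send work, or stop
    if (length(chunks) > 0) {
      send(list(id = "DO_CHUNK", chunk = chunks[[1]]))
      chunks <- chunks[-1]
    } else {
      send(list(id = "WORKER_STOP"))
    }
  }
}
```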

A loop of a similar structure can be used to extend clustermq. As an example, this was done by the drake package using common data and custom calls only (no iterated chunks).

ZeroMQ message specification

Communication between the master (main event loop) and the workers (QSys base class) is organised in messages. These are chunks of serialized data with an id field, plus whatever other fields that message type requires.

Messages per type

Below, the message id is listed with the additional fields per message.


Worker

This workflow is handled by the worker() event loop of clustermq (not to be confused with the workers() control function). It is the function called in every job or thread to interact with master(). The event loop is internal, i.e. it is neither modifiable nor exported.

WORKER_UP
  • Message ID indicating the worker is accepting data
  • Field has to be worker_id to master or empty to ssh_proxy
  • Answer is serialized common data (fun, const, and seed) or a redirect (with the URL where the worker can get the data)

WORKER_READY
  • Message ID indicating the worker is accepting chunks
  • It may contain the field result with a finished chunk
  • If processing failed, result is an object of type error
  • If successful, result is a list with the following vectors:
    • result is a named rettype with results
    • warnings is a list with warning messages of individual calls
    • errors is a list with error messages of individual calls

WORKER_DONE
  • Message ID indicating the worker is shutting down
  • The worker will send this in response to WORKER_STOP
  • Fields are time (from Sys.time()), mem (maximum memory used), and calls (number of processed calls)

WORKER_ERROR
  • Some error occurred in the processing flow (not in the function calls themselves)
  • The field msg describes the error
  • The master will shut down after receiving this signal


Master

This workflow is handled by the master() function of clustermq. If you are using Q() or Q_rows(), this is handled under the hood. Workers created outside of these functions can be reused within Q()/Q_rows() without the caller needing to know any of the internal message structure.

The documentation below is to show it is possible to implement a custom control flow, e.g. if you want to evaluate arbitrary expressions on workers instead of defining one function to call and different arguments.

DO_SETUP
  • Message contains the common data, such as the function to call and its arguments
  • Required fields are: fun, const, export, rettype, common_seed, token
  • The worker will respond with WORKER_READY

DO_CHUNK
  • Chunk of iterated arguments for the worker
  • Field has to be chunk, a data.frame where each row is a call and columns are arguments
  • Row names of chunk are used as call IDs
DO_CALL (new in 0.8.5)
  • Evaluate a specific expression on the worker
  • Needs fields expr (the expression to be evaluated) and env (list environment to evaluate it in)
WORKER_WAIT
  • Instruct the worker to wait wait seconds
  • The worker will respond with WORKER_READY

WORKER_STOP
  • Instruct the worker to exit its main event loop
  • This message has no fields
  • The worker will disconnect and reset its socket state

Control flow stages

The convention in the listings below is that messages sent by the worker are at the outer level, with the master's responses indented one level deeper.

Batch processing, no proxy

This is the default use case for Q and Q_rows. It sets the common data once (DO_SETUP: the function to call, constant arguments, exported data, and the random seed) and then provides chunks of arguments (DO_CHUNK) as data.frames for batch processing.

  • WORKER_UP
    • DO_SETUP
  • WORKER_READY [repeat]
    • DO_CHUNK [repeat]

These stages map directly onto the DO_SETUP and DO_CHUNK messages described above.
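As an illustration, the worker side of these two stages can be sketched with scripted messages in place of the socket; reply() is a hypothetical stand-in for the socket send, and the function and chunk values are made up:

```r
# Sketch: worker side of the batch-processing stages over scripted
# master messages
incoming <- list(
  list(id = "DO_SETUP", fun = function(x) x * 2, token = "tk-1"),
  list(id = "DO_CHUNK",
       chunk = data.frame(x = c(3, 4), row.names = c("1", "2"))),
  list(id = "WORKER_STOP")
)
replies <- list()
reply <- function(msg) replies[[length(replies) + 1]] <<- msg

fun <- NULL
calls_done <- 0
for (msg in incoming) {
  if (msg$id == "DO_SETUP") {               # store the common data
    fun <- msg$fun
    reply(list(id = "WORKER_READY", token = msg$token))
  } else if (msg$id == "DO_CHUNK") {        # one call per row
    res <- lapply(msg$chunk$x, fun)
    names(res) <- rownames(msg$chunk)       # row names are the call ids
    calls_done <- calls_done + length(res)
    reply(list(id = "WORKER_READY",
               result = list(result = res,
                             warnings = list(), errors = list())))
  } else if (msg$id == "WORKER_STOP") {     # exit the event loop
    reply(list(id = "WORKER_DONE", time = Sys.time(), calls = calls_done))
    break
  }
}
```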

Evaluating custom expressions

This can be mixed with batch processing, as long as DO_SETUP is called before DO_CHUNK (otherwise it will cause WORKER_ERROR on token mismatch).

  • WORKER_UP
    • DO_SETUP or DO_CALL (e.g. to export commonly used data)
  • WORKER_READY [repeat]
    • DO_CALL [repeat]

This stage can likewise be implemented on top of the messages described above.
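A sketch of the custom-call stage, again with scripted messages in place of the socket (the expressions and values are placeholders):

```r
# Sketch: worker evaluating DO_CALL messages; each reply carries the
# value of the evaluated expression
incoming <- list(
  list(id = "DO_CALL", expr = quote(x + y), env = list(x = 1, y = 2)),
  list(id = "DO_CALL", expr = quote(paste0(a, "!")), env = list(a = "done"))
)
replies <- lapply(incoming, function(msg) {
  list(id = "WORKER_READY",
       result = eval(msg$expr, envir = list2env(msg$env)))
})
```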