Asynchronous Evaluator


Queue arbitrary number of R expressions and evaluate on the background without blocking the main process.


Packages parallel and future enables asynchronous evaluation of R expressions in another process. However, parallel blocks the main session when evaluating expressions. future blocks the main session when the number of running futures exceed the maximum number of workers. For example, the following code schedules 2 future sessions. It only allows at most 2 unresolved futures at the same time.

# Future
plan(multisession, workers = 2L)

start = Sys.time()
lapply(1:8, function(ii){
  future({ Sys.sleep(2) })
  print(sprintf('%d - Time ellapsed: %.2f sec.', 
                ii, time_delta(start, Sys.time())))
#> [1] "1 - Time ellapsed: 0.22 sec."
#> [1] "2 - Time ellapsed: 0.43 sec."
#> [1] "3 - Time ellapsed: 2.47 sec."
#> [1] "4 - Time ellapsed: 2.67 sec."
#> [1] "5 - Time ellapsed: 4.72 sec."
#> [1] "6 - Time ellapsed: 4.94 sec."
#> [1] "7 - Time ellapsed: 6.98 sec."
#> [1] "8 - Time ellapsed: 7.19 sec."

The first two futures are scheduled within 0.5 seconds. When the third future comes online, the main session is blocked until the first future get evaluated. Therefore the it takes 2.47 sec for the third future get scheduled. Similarly, when scheduling the 5th, 7th futures, the main process is blocked.

Is there a way to schedule asynchronous evaluations without blocking the main session?

dipsaus task scheduler:

make_async_evaluator uses qs_queue as back-end to share information across sessions.

To initialize evaluator, you need a unique name for the evaluator. n_nodes is the number of sessions to manage the sub-nodes (managers); n_subnodes is the number of sub-sessions for each manager (employee). There are total 8 R sessions created, with 1 node to manage the rest 7. If n_nodes=2 and n_subnodes=3, then total \((1+3)\times 2=8\) sessions are created, with 2 manager nodes and 3 child nodes for each manager nodes.

evaluator <- make_async_evaluator(name = 'test', n_nodes = 1, n_subnodes = 7)
#> ✔ Initializing 2 workers with 1 sub-workers. Total workers will be (0+2) x 1
Queue R expressions:
  1. Queue a normal call

Check progress using evaluator$progress(). The returned values are: total, ’running, 'await, and finished.

  1. Queue with handlers to print results
  1. Queue 20 calls without blocking the main session
  1. Queue evaluations with quasi-quotation (see help("quasiquotation"))
  1. Queue when variable is large, quasi-quotation might fail

Data initialization

A bad example, using quasi-quotation (will raise error)

Instead of quasi-quotation, pass variables as parameters

Retrieve, modify the evaluator at any place

With name, you can retrieve the evaluators at any place

You can also scale up or down the evaluator

Suspend evaluator: pause, you can resume later.

Stop evaluator: pause and clear all await tasks. The running tasks will not stop, but their callback functions are removed.

Terminate: stop, and release all resources.