# mirai: async, parallel, and distributed computing in R
Help users write correct R code for async, parallel, and distributed computing using mirai. Use when users need to: run R code asynchronously or in parallel, write mirai code with correct dependency passing, set up local or remote parallel workers, convert code from future or parallel, use parallel map operations, integrate async tasks with Shiny or promises, or configure cluster/HPC computing.
npx skill4agent add posit-dev/skills mirai

## Passing dependencies: `.args` vs `...`

Pass named objects via `.args` (made available in the evaluation environment):

my_data <- data.frame(x = 1:10)
my_func <- function(df) sum(df$x)
m <- mirai(my_func(my_data), .args = list(my_func = my_func, my_data = my_data))

Pass the entire calling scope via `.args = environment()`:

process <- function(x, y) {
  mirai(x + y, .args = environment())
}

Pass objects via `...` (assigned to the daemon's global environment):

m <- mirai(run(data), run = my_run_func, data = my_data)

Or pass the whole environment unnamed via `...`:

df_matrix <- function(x, y) {
  mirai(as.matrix(rbind(x, y)), environment())
}
}

| Scenario | Use |
|---|---|
| Data and simple functions | `.args = list(...)` |
| Helper functions called by other functions that need lexical scoping | `.args = list(...)` |
| Passing the entire local scope to the local eval env | `.args = environment()` |
| Passing the entire local scope to the global env | unnamed `environment()` via `...` |
| Large persistent objects shared across tasks | `everywhere()` |
# WRONG: my_data and my_func are not available on the daemon
m <- mirai(my_func(my_data))
# CORRECT: Pass via .args
m <- mirai(my_func(my_data), .args = list(my_func = my_func, my_data = my_data))
# CORRECT: Or pass via ...
m <- mirai(my_func(my_data), my_func = my_func, my_data = my_data)

# WRONG: dplyr is not loaded on the daemon
m <- mirai(filter(df, x > 5), .args = list(df = my_df))
# CORRECT: Use namespace-qualified calls
m <- mirai(dplyr::filter(df, x > 5), .args = list(df = my_df))
# CORRECT: Or load the package inside the expression
m <- mirai({
library(dplyr)
filter(df, x > 5)
}, .args = list(df = my_df))
# CORRECT: Or pre-load on all daemons with everywhere()
everywhere(library(dplyr))
m <- mirai(filter(df, x > 5), .args = list(df = my_df))m$datam[]unresolved(m)# WRONG: m$data may still be an unresolved value
m <- mirai(slow_computation())
result <- m$data # may return an 'unresolved' logical value
# CORRECT: Use [] to wait for the result
m <- mirai(slow_computation())
result <- m[] # blocks until resolved, returns the value directly
# CORRECT: Or use call_mirai() then access $data
call_mirai(m)
result <- m$data
# CORRECT: Non-blocking check
if (!unresolved(m)) result <- m$data

# WRONG: .args names don't match what the expression uses
m <- mirai(process(input), .args = list(fn = process, data = input))
# CORRECT: Names in .args must match names used in the expression
m <- mirai(process(input), .args = list(process = process, input = input))

The same rules apply to `mirai_map()`:

# WRONG: dplyr not available on daemons
results <- mirai_map(data_list, function(x) filter(x, val > 0))[]
# CORRECT: Namespace-qualify, or use everywhere() first
results <- mirai_map(data_list, function(x) dplyr::filter(x, val > 0))[]

## Daemons

`mirai()` tasks are executed on daemons, started with `daemons()`:

# Start 4 local daemon processes (with dispatcher, the default)
daemons(4)
# Direct connection (no dispatcher) — lower overhead, round-robin scheduling
daemons(4, dispatcher = FALSE)
# Check daemon status
info()
# Daemons persist until explicitly reset
daemons(0)

Scoped daemons with `with(daemons(...), {...})`:

with(daemons(4), {
m <- mirai(expensive_task())
m[]
})

Compute profiles with `local_daemons()` and `with_daemons()`:

daemons(4, .compute = "workers")
# Switch active profile for the duration of the calling function
my_func <- function() {
local_daemons("workers")
mirai(task())[] # uses "workers" profile
}
# Switch active profile for a block
with_daemons("workers", {
m <- mirai(task())
m[]
})

Multiple profiles can coexist:

daemons(4, .compute = "cpu")
daemons(2, .compute = "gpu")
m1 <- mirai(cpu_work(), .compute = "cpu")
m2 <- mirai(gpu_work(), .compute = "gpu")

## Parallel map: mirai_map()

The first argument `.x` supplies the elements to map over:

daemons(4)
# Basic map — collect with []
results <- mirai_map(1:10, function(x) x^2)[]
# With constant arguments via .args
results <- mirai_map(
1:10,
function(x, power) x^power,
.args = list(power = 3)
)[]
# With helper functions via ... (assigned to daemon global env)
results <- mirai_map(
data_list,
function(x) transform(x, helper),
helper = my_helper_func
)[]
# Flatten results to a vector
results <- mirai_map(1:10, sqrt)[.flat]
# Progress bar (requires cli package)
results <- mirai_map(1:100, slow_task)[.progress]
# Early stopping on error
results <- mirai_map(1:100, risky_task)[.stop]
# Combine options
results <- mirai_map(1:100, task)[.stop, .progress]

# Each row becomes arguments to the function
params <- data.frame(mean = 1:5, sd = c(0.1, 0.5, 1, 2, 5))
results <- mirai_map(params, function(mean, sd) rnorm(100, mean, sd))[]

## Daemon setup with everywhere()

daemons(4)
# Load packages on all daemons
everywhere(library(DBI))
# Set up persistent connections
everywhere(con <<- dbConnect(RSQLite::SQLite(), db_path), db_path = tempfile())
# Export objects to daemon global environment via ...
# The empty {} expression is intentional — the point is to export objects via ...
everywhere({}, api_key = my_key, config = my_config)

## Error handling

m <- mirai(stop("something went wrong"))
m[]
is_mirai_error(m$data) # TRUE for execution errors
is_mirai_interrupt(m$data) # TRUE for cancelled tasks
is_error_value(m$data) # TRUE for any error/interrupt/timeout
m$data$message # Error message
m$data$stack.trace # Full stack trace
m$data$condition.class # Original error classes
# Timeouts (requires dispatcher)
m <- mirai(Sys.sleep(60), .timeout = 5000) # 5-second timeout
# Cancellation (requires dispatcher)
m <- mirai(long_running_task())
stop_mirai(m)

## Shiny integration

library(shiny)
library(bslib)
library(mirai)
daemons(4)
onStop(function() daemons(0))
ui <- page_fluid(
input_task_button("run", "Run Analysis"),
plotOutput("result")
)
server <- function(input, output, session) {
task <- ExtendedTask$new(
function(n) mirai(rnorm(n), .args = list(n = n))
) |> bind_task_button("run")
observeEvent(input$run, task$invoke(input$n))
output$result <- renderPlot(hist(task$result()))
}

## Promises integration

library(promises)
mirai({Sys.sleep(1); "done"}) %...>% cat()

## Remote daemons over SSH

daemons(
url = host_url(tls = TRUE),
remote = ssh_config(c("ssh://user@node1", "ssh://user@node2"))
)

With SSH tunnelling (no ports need opening on the remote hosts):

daemons(
n = 4,
url = local_url(tcp = TRUE),
remote = ssh_config("ssh://user@node1", tunnel = TRUE)
)

## HPC clusters (e.g. Slurm)

daemons(
n = 1,
url = host_url(),
remote = cluster_config(
command = "sbatch",
options = "#SBATCH --job-name=mirai\n#SBATCH --mem=8G\n#SBATCH --array=1-50",
rscript = file.path(R.home("bin"), "Rscript")
)
)

daemons(n = 2, url = host_url(), remote = http_config())

## Converting from future

| future | mirai |
|---|---|
| Auto-detects globals | Must pass all dependencies explicitly |
| `future(expr)` | `mirai(expr, ...)` |
| `value(f)` | `m[]` |
| `resolved(f)` | `!unresolved(m)` |
| `plan(multisession, workers = 4)` | `daemons(4)` |
| `plan(sequential)` | `daemons(0)` |
| `future.apply::future_lapply()` | `mirai_map()` |
| `furrr::future_map()` | `mirai_map()` |

Unlike future, globals are never auto-detected — pass them explicitly via `.args` or `...`.

## Converting from parallel

| parallel | mirai |
|---|---|
| `makeCluster(n)` | `daemons(n)` |
| Pass via `clusterExport()` | Pass via `.args` or `...` |
| `parLapply(cl, x, f)` | `mirai_map(x, f)[]` |
| `parSapply(cl, x, f)` | `mirai_map(x, f)[.flat]` |
| `clusterEvalQ(cl, expr)` | `everywhere(expr)` |
| `clusterSetRNGStream()` | `daemons(n, seed = ...)` |
| `stopCluster(cl)` | `daemons(0)` |

mirai can also serve as a cluster backend via `make_cluster()`:

cl <- mirai::make_cluster(4)
# Use with all parallel::par* functions as normal
parallel::parLapply(cl, 1:100, my_func)
mirai::stop_cluster(cl)
# R >= 4.5: native integration
cl <- parallel::makeCluster(4, type = "MIRAI")

## Random number generation

# Default: L'Ecuyer-CMRG stream per daemon (statistically safe, non-reproducible)
daemons(4)
# Reproducible: L'Ecuyer-CMRG stream per mirai call
# Results are the same regardless of daemon count or scheduling
daemons(4, seed = 42)

## Debugging

# Synchronous mode — runs in the host process, supports browser()
daemons(sync = TRUE)
m <- mirai({
browser()
result <- tricky_function(x)
result
}, .args = list(tricky_function = tricky_function, x = my_x))
daemons(0)
# Capture daemon stdout/stderr
daemons(4, output = TRUE)

## Nested parallelism

Inside a `mirai_map` task, start inner daemons with `local_url()` and
`launch_local()` rather than `daemons(n)`:

mirai_map(1:10, function(x) {
daemons(url = local_url())
launch_local(2)
result <- mirai_map(1:5, function(y, x) x * y, .args = list(x = x))[]
daemons(0)
result
})[]