Event-driven promises
mirai
supplies its own as.promise()
method,
allowing it to be used as a promise from the promises
package.
These are next-generation, event-driven promises, developed in collaboration with Joe Cheng (creator of Shiny).
- Does not require each promise to be polled for completion in a background loop like other promises.
- Instead, promise actions are automatically triggered as soon as each ‘mirai’ resolves (asynchronously).
- Allows for much higher responsiveness (zero latency) and massive scalability (thousands or even millions of promises).
A ‘mirai’ may be piped directly using the promise pipe
%...>%
, which implicitly calls as.promise()
on the ‘mirai’. Similarly all promise-aware functions such as
promises::then()
or shiny::ExtendedTask$new()
which take a promise can also take a ‘mirai’ (using
promises
>= 1.3.0).
Alternatively, a ‘mirai’ may be explicitly converted into a promise
by as.promise()
, which then allows using the methods
$then()
, $finally()
etc. directly.
The following example outputs “hello” to the console after one second when the ‘mirai’ resolves.
library(mirai)
library(promises)
p <- mirai({Sys.sleep(1); "hello"}) %...>% cat()
p
#> <Promise [pending]>
It is possible to both access a ‘mirai’ value at $data
and to use a promise for enacting a side effect (assigning the value to
an environment in the example below).
env <- new.env()
m <- mirai({
Sys.sleep(1)
"hello"
})
promises::then(m, function(x) env$res <- x)
m[]
#> [1] "hello"
After returning to the top level prompt:
env$res
#> [1] "hello"
A mirai_map
also has an as.promise()
method. It will resolve when the entire map operation completes or at
least one mirai in the map is rejected.
The One Million Promises Challenge
The code below is taken from the challenge to launch and collect one million promises. For illustration, the example is scaled down to ten thousand.
library(mirai)
daemons(8, dispatcher = FALSE)
#> [1] 8
r <- 0
start <- Sys.time()
m <- mirai_map(1:10000, \(x) x, .promise = \(x) r <<- r + x)
Sys.time() - start
#> Time difference of 1.897708 secs
later::run_now()
r
#> [1] 50005000
daemons(0)
#> [1] 0
The one million promises challenge took 6 mins 25 secs to complete using an Intel i7 11th gen mobile processor with 16GB RAM.
Shiny ExtendedTask: Introduction
mirai is an asynchronous backend to scale Shiny applications. Depending on the
options supplied to daemons()
, mirai tasks may be
distributed across local background processes or networked servers in an
efficient and performant manner.
Shiny ExtendedTask allows the creation of scalable Shiny apps, which remain responsive intra-session for each user, as well as inter-session for multiple concurrent users.
In the example below, the app remains responsive, with the clock continuing to tick whilst the simulated expensive computation is running asynchronously in a parallel process. Also the button is disabled and the plot greyed out until the computation is complete.
The call to
daemons()
is made at the top level, andonStop()
may be used to automatically shut them down when the app exits.
library(shiny)
library(bslib)
library(mirai)
ui <- page_fluid(
p("The time is ", textOutput("current_time", inline = TRUE)),
hr(),
numericInput("n", "Sample size (n)", 100),
numericInput("delay", "Seconds to take for plot", 5),
input_task_button("btn", "Plot uniform distribution"),
plotOutput("plot")
)
server <- function(input, output, session) {
output$current_time <- renderText({
invalidateLater(1000)
format(Sys.time(), "%H:%M:%S %p")
})
task <- ExtendedTask$new(
function(...) mirai({Sys.sleep(y); runif(x)}, ...)
) |> bind_task_button("btn")
observeEvent(input$btn, task$invoke(x = input$n, y = input$delay))
output$plot <- renderPlot(hist(task$result()))
}
# run app using 1 local daemon
daemons(1)
# automatically shutdown daemons when app exits
onStop(function() daemons(0))
shinyApp(ui = ui, server = server)
Thanks to Joe Cheng for providing examples on which the above is based.
The key components to using ExtendedTask are:
- In the UI, use
bslib::input_task_button()
. This is a button which is disabled during computation to prevent additional clicks.
input_task_button("btn", "Plot uniform distribution")
- In the server, create an ExtendedTask object by calling
ExtendedTask$new()
on an anonymous function passing...
arguments tomirai()
, and bind it to the button created in (1).
task <- ExtendedTask$new(
function(...) mirai({Sys.sleep(y); runif(x)}, ...)
) |> bind_task_button("btn")
- In the server, create an observer on the input button, which invokes the ExtendedTask, passing in named arguments to the anonymous function (and hence the mirai) above.
observeEvent(input$btn, task$invoke(x = input$n, y = input$delay))
- In the server, create a render function for the output, which consumes the result of the ExtendedTask.
output$plot <- renderPlot(hist(task$result()))
Shiny ExtendedTask: Cancellation
The app below is a demonstration of the cancellation capability added in mirai v2.
It builds on the introductory app by adding a button that sends an infinite sleep extendedTask. This will block execution as we are using a single daemon - any new extendedTasks will be queued behind this never-ending task. There is also a button to cancel that blocking task and allow any queued plots to continue processing.
It works by assigning a reference to the mirai created in the
extendedTask$new()
method, which can then be passed to
stop_mirai()
.
library(shiny)
library(bslib)
library(mirai)
ui <- page_fluid(
p("The time is ", textOutput("current_time", inline = TRUE)),
hr(),
numericInput("n", "Sample size (n)", 100),
numericInput("delay", "Seconds to take for plot", 5),
input_task_button("btn", "Plot uniform distribution"),
hr(),
p("Click 'block' to suspend execution, and 'cancel' to resume"),
input_task_button("block", "Block"),
actionButton("cancel", "Cancel block"),
hr(),
plotOutput("plot")
)
server <- function(input, output, session) {
output$current_time <- renderText({
invalidateLater(1000)
format(Sys.time(), "%H:%M:%S %p")
})
task <- ExtendedTask$new(
function(...) mirai({Sys.sleep(y); runif(x)}, ...)
) |> bind_task_button("btn")
m <- NULL
block <- ExtendedTask$new(
function() m <<- mirai(Sys.sleep(Inf))
) |> bind_task_button("block")
observeEvent(input$btn, task$invoke(x = input$n, y = input$delay))
observeEvent(input$block, block$invoke())
observeEvent(input$cancel, stop_mirai(m))
observe({
updateActionButton(session, "cancel", disabled = block$status() != "running")
})
output$plot <- renderPlot(hist(task$result()))
}
# run app using 1 local daemon
daemons(1)
# automatically shutdown daemons when app exits
onStop(function() daemons(0))
shinyApp(ui = ui, server = server)
Thanks to Joe Cheng for providing examples on which the above is based.
Shiny ExtendedTask: Generative Art
The following app produces pretty spiral patterns.
The user can add multiple plots, making use of Shiny modules, each having a different calculation time.
The plots are generated asynchronously, and it is easy to see the practical limitations of the number of daemons set. For example, if updating 4 plots, and there are only 3 daemons, the 4th plot will not start to be generated until one of the other plots has finished.
By wrapping the runApp()
call in
with(daemons(...), ...)
the daemons are set up for the
duration of the app, exiting automatically when the app is stopped.
library(shiny)
library(mirai)
library(bslib)
library(ggplot2)
library(aRtsy)
# function definitions
run_task <- function(calc_time) {
Sys.sleep(calc_time)
list(
colors = aRtsy::colorPalette(name = "random", n = 3),
angle = runif(n = 1, min = - 2 * pi, max = 2 * pi),
size = 1,
p = 1
)
}
plot_result <- function(result) {
do.call(what = canvas_phyllotaxis, args = result)
}
# modules for individual plots
plotUI <- function(id, calc_time) {
ns <- NS(id)
card(
strong(paste0("Plot (calc time = ", calc_time, " secs)")),
input_task_button(ns("resample"), "Resample"),
plotOutput(ns("plot"), height="400px", width="400px")
)
}
plotServer <- function(id, calc_time) {
force(id)
force(calc_time)
moduleServer(
id,
function(input, output, session) {
task <- ExtendedTask$new(
function(time, run) mirai(run(time), environment())
) |> bind_task_button("resample")
observeEvent(input$resample, task$invoke(calc_time, run_task))
output$plot <- renderPlot(plot_result(task$result()))
}
)
}
# ui and server
ui <- page_sidebar(fillable = FALSE,
sidebar = sidebar(
numericInput("calc_time", "Calculation time (secs)", 5),
actionButton("add", "Add", class="btn-primary"),
),
layout_column_wrap(id = "results", width = "400px", fillable = FALSE)
)
server <- function(input, output, session) {
observeEvent(input$add, {
id <- nanonext::random(4)
insertUI("#results", where = "beforeEnd", ui = plotUI(id, input$calc_time))
plotServer(id, input$calc_time)
})
}
app <- shinyApp(ui, server)
# run app using 3 local daemons
with(daemons(3), runApp(app))
The above example builds on original code by Joe Cheng, Daniel Woodie and William Landau.
The above uses environment()
instead of ...
as an alternative and equivalent way of passing variables present in the
calling environment to the mirai.
The key components to using this ExtendedTask example are:
- In the UI, use
bslib::input_task_button()
. This is a button which is disabled during computation to prevent additional clicks.
input_task_button(ns("resample"), "Resample")
- In the server, create an ExtendedTask object by calling
ExtendedTask$new()
on an anonymous function passing named arguments tomirai()
, and bind it to the button created in (1). These are passed through to the mirai by the use ofenvironment()
.
task <- ExtendedTask$new(
function(time, run) mirai(run(time), environment())
) |> bind_task_button("resample")
- In the server, create an observer on the input button, which invokes the ExtendedTask, supplying the arguments to the anonymous function above.
observeEvent(input$resample, task$invoke(calc_time, run_task))
- In the server, create a render function for the output, which consumes the result of the ExtendedTask.
output$plot <- renderPlot(plot_result(task$result()))
Shiny ExtendedTask: mirai map
A mirai_map
also has an as.promise()
method, which allows it to be used directly in a Shiny ExtendedTask. It
will resolve when the entire map operation completes or at least one
mirai in the map is rejected.
This example, uses mirai_map()
to perform multiple
calculations simultaneously in multiple daemons, returning the results
asynchronously.
library(shiny)
library(bslib)
library(mirai)
ui <- page_fluid(
titlePanel("ExtendedTask Map Demo"),
hr(),
p("The time is ", textOutput("current_time", inline = TRUE)),
p("Perform 4 calculations that each take between 1 and 4 secs to complete:"),
input_task_button("calculate", "Calculate"),
p(textOutput("result")),
tags$style(type="text/css", "#result {white-space: pre-wrap;}")
)
server <- function(input, output) {
task <- ExtendedTask$new(function() {
mirai_map(1:4, function(i) {
# simulated long calculation
Sys.sleep(i)
sprintf(
"Calc %d | PID %d | Finished at %s.", i, Sys.getpid(), format(Sys.time())
)
})
}) |> bind_task_button("calculate")
observeEvent(input$calculate, {
task$invoke()
})
output$result <- renderText({
# result of mirai_map() is a list
as.character(task$result())
}, sep = "\n")
output$current_time <- renderText({
invalidateLater(1000)
format(Sys.time(), "%H:%M:%S %p")
})
}
app <- shinyApp(ui, server)
with(daemons(4), runApp(app))
Shiny Async: Coin Flips
The below example demonstrates how to integrate a
mirai_map()
operation into a Shiny app in an observer,
without using ExtendedTask.
By specifying the ‘.promise’ argument, this registers a promise action against each mapped operation. These can then be used to update reactive values or otherwise interact with the Shiny app.
library(shiny)
library(mirai)
flip_coin <- function(...) {
Sys.sleep(0.1)
rbinom(n = 1, size = 1, prob = 0.501)
}
ui <- fluidPage(
div("Is the coin fair?"),
actionButton("task", "Flip 1000 coins"),
textOutput("status"),
textOutput("outcomes")
)
server <- function(input, output, session) {
# Keep running totals of heads, tails, and task errors
flips <- reactiveValues(heads = 0, tails = 0, flips = 0)
# Button to submit a batch of coin flips
observeEvent(input$task, {
mirai_map(
1:1000,
flip_coin,
.promise = \(x) {
if (x) flips$heads <- flips$heads + 1 else flips$tails <- flips$tails + 1
}
)
# Ensure there is something after mirai_map() in the observer, as it is
# convertible to a promise, and will otherwise be waited for before returning
flips$flips <- flips$flips + 1000
})
# Print time and task status
output$status <- renderText({
invalidateLater(millis = 1000)
time <- format(Sys.time(), "%H:%M:%S")
sprintf("%s | %s flips submitted", time, flips$flips)
})
# Print number of heads and tails
output$outcomes <- renderText(
sprintf("%s heads %s tails", flips$heads, flips$tails)
)
}
app <- shinyApp(ui = ui, server = server)
# run app using 8 local non-dispatcher daemons (tasks are the same length)
with(daemons(8, dispatcher = FALSE), {
# pre-load flip_coin function on all daemons for efficiency
everywhere({}, flip_coin = flip_coin)
runApp(app)
})
This is an adaptation of an original example provided by Will
Landau for use of crew
with Shiny. Please see https://wlandau.github.io/crew/articles/shiny.html.
Shiny Async: Progress Bar
The below example uses a mirai_map()
operation in an
observer to update a Shiny progress bar with custom messages, and also
to update a reactive value once the entire map operation has completed
(asynchronously).
library(shiny)
library(mirai)
library(promises)
slow_squared <- function(x) {
Sys.sleep(runif(1))
x^2
}
ui <- fluidPage(
titlePanel("Asynchronous Squares Calculator"),
p("The time is ", textOutput("current_time", inline = TRUE)),
hr(),
actionButton("start", "Start Calculation"),
br(), br(),
uiOutput("progress_ui"),
verbatimTextOutput("result")
)
server <- function(input, output, session) {
x <- 1:100
y <- reactiveVal()
observeEvent(input$start, {
progress <- Progress$new(session, min = 0, max = length(x))
progress$set(message = "Parallel calculation in progress", detail = "Starting...")
completed <- reactiveVal(0)
mirai_map(
x,
slow_squared,
slow_squared = slow_squared,
.promise = function(result) {
new_val <- completed() + 1
completed(new_val) # Increment completed counter
progress$inc(1, detail = paste("Completed", new_val)) # Update progress
}
) %...>% {
y(unlist(.))
progress$close()
}
# Ensure there is something after mirai_map() in the observer, as otherwise
# the created promise will be waited for before returning
y(0)
})
output$current_time <- renderText({
invalidateLater(1000)
format(Sys.time(), "%H:%M:%S %p")
})
output$result <- renderPrint({
cat("Sum of squares calculated: ", sum(y()), "\n")
})
}
app <- shinyApp(ui, server)
with(daemons(8), runApp(app))
This example adapts a contribution from Davide Magno.
Plumber GET Endpoint
mirai may be used as an asynchronous backend for plumber
pipelines.
In this example, the plumber router code is run in a daemon process
itself so that it does not block the current process - this is useful in
interactive sessions, but otherwise just taking the code within the
outer mirai()
call will suffice.
The /echo endpoint takes a GET request, sleeps for 1 second (simulating an expensive computation) and simply returns the ‘msg’ request header together with a timestamp and the process ID of the process it is run on.
library(mirai)
# supply SIGINT so the plumber server is interrupted and exits cleanly when finished
daemons(1L, dispatcher = FALSE, autoexit = tools::SIGINT)
#> [1] 1
m <- mirai({
library(plumber)
library(promises) # to provide the promise pipe
library(mirai)
# more efficient not to use dispatcher if all requests are similar length
daemons(4L, dispatcher = FALSE) # handles 4 requests simultaneously
pr() |>
pr_get(
"/echo",
function(req, res) {
mirai(
{
Sys.sleep(1L)
list(
status = 200L,
body = list(
time = format(Sys.time()), msg = msg, pid = Sys.getpid()
)
)
},
msg = req[["HEADERS"]][["msg"]]
) %...>% (function(x) {
res$status <- x$status
res$body <- x$body
})
}
) |>
pr_run(host = "127.0.0.1", port = 8985)
})
The API can be queried using an async HTTP client such as
nanonext::ncurl_aio()
.
Here, all 8 requests are submitted at once, but we note that that responses have differing timestamps as only 4 can be processed at any one time (limited by the number of daemons set).
library(nanonext)
res <- lapply(
1:8,
function(i) ncurl_aio(
"http://127.0.0.1:8985/echo",
headers = c(msg = as.character(i))
)
)
collect_aio(res)
#> [[1]]
#> [1] "{\"time\":[\"2025-05-23 08:44:46\"],\"msg\":[\"1\"],\"pid\":[2395]}"
#>
#> [[2]]
#> [1] "{\"time\":[\"2025-05-23 08:44:46\"],\"msg\":[\"2\"],\"pid\":[2397]}"
#>
#> [[3]]
#> [1] "{\"time\":[\"2025-05-23 08:44:46\"],\"msg\":[\"3\"],\"pid\":[2407]}"
#>
#> [[4]]
#> [1] "{\"time\":[\"2025-05-23 08:44:46\"],\"msg\":[\"4\"],\"pid\":[2413]}"
#>
#> [[5]]
#> [1] "{\"time\":[\"2025-05-23 08:44:47\"],\"msg\":[\"5\"],\"pid\":[2397]}"
#>
#> [[6]]
#> [1] "{\"time\":[\"2025-05-23 08:44:47\"],\"msg\":[\"6\"],\"pid\":[2395]}"
#>
#> [[7]]
#> [1] "{\"time\":[\"2025-05-23 08:44:47\"],\"msg\":[\"7\"],\"pid\":[2407]}"
#>
#> [[8]]
#> [1] "{\"time\":[\"2025-05-23 08:44:47\"],\"msg\":[\"8\"],\"pid\":[2413]}"
daemons(0)
#> [1] 0
Plumber POST Endpoint
This is the equivalent using a POST endpoint, accepting a JSON instruction sent as request data.
Note that req$postBody
should always be accessed in the
router process and passed in as an argument to the ‘mirai’, as this is
retrieved using a connection that is not serializable.
library(mirai)
# supply SIGINT so the plumber server is interrupted and exits cleanly when finished
daemons(1L, dispatcher = FALSE, autoexit = tools::SIGINT)
#> [1] 1
m <- mirai({
library(plumber)
library(promises) # to provide the promise pipe
library(mirai)
# uses dispatcher - suitable when requests take differing times to complete
daemons(4L) # handles 4 requests simultaneously
pr() |>
pr_post(
"/echo",
function(req, res) {
mirai(
{
Sys.sleep(1L) # simulate expensive computation
list(
status = 200L,
body = list(
time = format(Sys.time()),
msg = jsonlite::fromJSON(data)[["msg"]],
pid = Sys.getpid()
)
)
},
data = req$postBody
) %...>% (function(x) {
res$status <- x$status
res$body <- x$body
})
}
) |>
pr_run(host = "127.0.0.1", port = 8986)
})
Querying the endpoint produces the same set of outputs as the previous example.
library(nanonext)
res <- lapply(
1:8,
function(i) ncurl_aio(
"http://127.0.0.1:8986/echo",
method = "POST",
data = sprintf('{"msg":"%d"}', i)
)
)
collect_aio(res)
#> [[1]]
#> [1] "{\"time\":[\"2025-05-23 08:44:50\"],\"msg\":[\"1\"],\"pid\":[2470]}"
#>
#> [[2]]
#> [1] "{\"time\":[\"2025-05-23 08:44:50\"],\"msg\":[\"2\"],\"pid\":[2472]}"
#>
#> [[3]]
#> [1] "{\"time\":[\"2025-05-23 08:44:50\"],\"msg\":[\"3\"],\"pid\":[2482]}"
#>
#> [[4]]
#> [1] "{\"time\":[\"2025-05-23 08:44:51\"],\"msg\":[\"4\"],\"pid\":[2482]}"
#>
#> [[5]]
#> [1] "{\"time\":[\"2025-05-23 08:44:50\"],\"msg\":[\"5\"],\"pid\":[2488]}"
#>
#> [[6]]
#> [1] "{\"time\":[\"2025-05-23 08:44:51\"],\"msg\":[\"6\"],\"pid\":[2470]}"
#>
#> [[7]]
#> [1] "{\"time\":[\"2025-05-23 08:44:51\"],\"msg\":[\"7\"],\"pid\":[2488]}"
#>
#> [[8]]
#> [1] "{\"time\":[\"2025-05-23 08:44:51\"],\"msg\":[\"8\"],\"pid\":[2472]}"
daemons(0)
#> [1] 0