Skip to content

1. Serialization: Arrow, polars and beyond

Native R serialization transfers data between host and daemons. Objects accessed via external pointers cannot be serialized and normally error in mirai operations.

Using arrow as an example:

# Failure mode: Arrow objects are backed by external pointers, which R's
# native serialization cannot transfer to another process.
library(mirai)
library(arrow, warn.conflicts = FALSE)
daemons(1)                  # launch one local daemon
everywhere(library(arrow))  # load arrow on the daemon too

x <- as_arrow_table(iris)   # ArrowTabular object backed by an external pointer

# Without a serialization configuration, the pointer is null on arrival in
# the daemon and the operation errors.
m <- mirai(list(a = head(x), b = "some text"), x = x)
m[]
#> 'miraiError' chr Error: Invalid <Table>, external pointer to null

daemons(0)

serial_config() creates custom serialization configurations with functions that hook into R’s native serialization mechanism for reference objects (‘refhooks’).

Pass this configuration to the ‘serial’ argument of daemons():

# Custom serialization configuration: class to hook, serialization function
# (object -> raw), and unserialization function (raw -> object).
cfg <- serial_config(
  "ArrowTabular",
  arrow::write_to_raw,
  function(x) arrow::read_ipc_stream(x, as_data_frame = FALSE)
)

# Supplying 'serial' registers the refhooks on host and daemons alike.
daemons(1, serial = cfg)

everywhere(library(arrow))

# Same call as before — now the Arrow table round-trips transparently.
m <- mirai(list(a = head(x), b = "some text"), x = x)
m[]
#> $a
#> Table
#> 6 rows x 5 columns
#> $Sepal.Length <double>
#> $Sepal.Width <double>
#> $Petal.Length <double>
#> $Petal.Width <double>
#> $Species <dictionary<values=string, indices=int8>>
#> 
#> See $metadata for additional Schema metadata
#> 
#> $b
#> [1] "some text"

daemons(0)

The Arrow table is now handled seamlessly, even when deeply nested in lists or other structures.

Register multiple serialization functions for different object classes. This example combines Arrow with polars, a Rust dataframe library (requires polars >= 1.0.0):

# Multiple classes at once: element i of each list of functions pairs with
# class i of the character vector.
daemons(
  n = 1,
  serial = serial_config(
    c("ArrowTabular", "polars_data_frame"),
    list(arrow::write_to_raw, \(x) x$serialize()),
    list(function(x) arrow::read_ipc_stream(x, as_data_frame = FALSE), polars::pl$deserialize_df)
  )
)

x <- polars::as_polars_df(iris)  # polars DataFrame (also an external-pointer object)

m <- mirai(list(a = head(x), b = "some text"), x = x)
m[]
#> $a
#> shape: (6, 5)
#> ┌──────────────┬─────────────┬──────────────┬─────────────┬─────────┐
#> │ Sepal.Length ┆ Sepal.Width ┆ Petal.Length ┆ Petal.Width ┆ Species │
#> │ ---          ┆ ---         ┆ ---          ┆ ---         ┆ ---     │
#> │ f64          ┆ f64         ┆ f64          ┆ f64         ┆ cat     │
#> ╞══════════════╪═════════════╪══════════════╪═════════════╪═════════╡
#> │ 5.1          ┆ 3.5         ┆ 1.4          ┆ 0.2         ┆ setosa  │
#> │ 4.9          ┆ 3.0         ┆ 1.4          ┆ 0.2         ┆ setosa  │
#> │ 4.7          ┆ 3.2         ┆ 1.3          ┆ 0.2         ┆ setosa  │
#> │ 4.6          ┆ 3.1         ┆ 1.5          ┆ 0.2         ┆ setosa  │
#> │ 5.0          ┆ 3.6         ┆ 1.4          ┆ 0.2         ┆ setosa  │
#> │ 5.4          ┆ 3.9         ┆ 1.7          ┆ 0.4         ┆ setosa  │
#> └──────────────┴─────────────┴──────────────┴─────────────┴─────────┘
#> 
#> $b
#> [1] "some text"

daemons(0)

2. Serialization: Torch

With a custom serialization configuration, torch tensors transfer seamlessly in mirai computations.

Setup:

  1. Create serialization configuration with ‘class’ as ‘torch_tensor’
  2. Set up daemons, supplying configuration to ‘serial’
  3. (Optional) Use everywhere() to load torch on all daemons
library(mirai)
library(torch)

# Hook torch's own (de)serialization routines into mirai for any object of
# class 'torch_tensor'.
cfg <- serial_config(
  class = "torch_tensor",
  sfunc = torch::torch_serialize,  # tensor -> raw
  ufunc = torch::torch_load        # raw -> tensor
)

daemons(1, serial = cfg)

everywhere(library(torch))  # optional: pre-load torch on all daemons

Example Usage:

This creates a convolutional neural network with torch::nn_module(), specifies parameters, then initializes them in a parallel process:

# Define a small CNN module generator.
# NOTE(review): conv2 is constructed with 'in_size' input channels even
# though conv1 outputs 'out_size' channels, so forward() would fail when
# in_size != out_size. Only initialization is exercised below (the shown
# 1,040-parameter count matches two identical conv layers) — confirm
# whether this is intentional for the example.
model <- nn_module(
  initialize = function(in_size, out_size) {
    self$conv1 <- nn_conv2d(in_size, out_size, 5)
    self$conv2 <- nn_conv2d(in_size, out_size, 5)
  },
  forward = function(x) {
    x <- self$conv1(x)
    x <- nnf_relu(x)
    x <- self$conv2(x)
    x <- nnf_relu(x)
    x
  }
)

params <- list(in_size = 1, out_size = 20)

# Initialize the model (and hence its tensors) in the daemon process; the
# serialization config lets the result travel back to the host.
m <- mirai(do.call(model, params), model = model, params = params)

m[]
#> An `nn_module` containing 1,040 parameters.
#> 
#> ── Modules ─────────────────────────────────────────────────────────────────────────────────
#> • conv1: <nn_conv2d> #520 parameters
#> • conv2: <nn_conv2d> #520 parameters
The returned model contains many tensor elements:

# Inspect one tensor from the returned model: a fully materialized
# torch_tensor on the host, reconstructed via the 'torch_tensor' refhook.
m$data$parameters$conv1.weight
#> torch_tensor
#> (1,1,.,.) = 
#>  -0.1218  0.1835 -0.1114 -0.1365 -0.1824
#>   0.1107 -0.0498 -0.1219 -0.0938 -0.1570
#>  -0.1944  0.0355  0.1750 -0.1612 -0.1590
#>  -0.0806 -0.1906 -0.0272 -0.1732 -0.0491
#>  -0.0079 -0.0874 -0.1256  0.1276  0.0664
#> 
#> (2,1,.,.) = 
#>  -0.1450 -0.0371  0.0601 -0.1578  0.0918
#>   0.1118 -0.0800  0.0359  0.0452  0.1182
#>   0.0516  0.0109  0.0186  0.1399 -0.1431
#>   0.1720 -0.0919  0.0616  0.0937  0.1511
#>  -0.0270  0.0936  0.1510  0.1995  0.1934
#> 
#> (3,1,.,.) = 
#>   0.1055  0.0056  0.0491 -0.0096  0.0655
#>   0.1950  0.0676  0.0254  0.0834 -0.0401
#>   0.1658  0.1767 -0.0338  0.1644  0.1806
#>  -0.0346 -0.1521  0.0490  0.1153  0.0755
#>  -0.0832  0.0074 -0.0607  0.1704  0.1454
#> 
#> (4,1,.,.) = 
#>   0.1091  0.1982  0.1185  0.1655  0.1716
#>   0.1987 -0.0517 -0.0115 -0.0641  0.0294
#>   0.0078 -0.0942 -0.1629 -0.1114 -0.0833
#>   0.0395 -0.0101  0.0837  0.1523  0.0673
#>  -0.0984  0.0988 -0.1154  0.0453 -0.1577
#> 
#> (5,1,.,.) = 
#>   0.0630 -0.0820 -0.1399  0.0528 -0.0896
#> ... [the output was truncated (use n=-1 to disable)]
#> [ CPUFloatType{20,1,5,5} ][ requires_grad = TRUE ]

Pass model parameters to an optimizer, also initialized in a parallel process:

# The host-side parameters (tensors) are shipped to the daemon, where the
# optimizer is constructed; the optimizer object travels back the same way.
optim <- mirai(optim_rmsprop(params = params), params = m$data$parameters)

optim[]
#> <optim_rmsprop>
#>   Inherits from: <torch_optimizer>
#>   Public:
#>     add_param_group: function (param_group) 
#>     clone: function (deep = FALSE) 
#>     defaults: list
#>     initialize: function (params, lr = 0.01, alpha = 0.99, eps = 1e-08, weight_decay = 0, 
#>     load_state_dict: function (state_dict, ..., .refer_to_state_dict = FALSE) 
#>     param_groups: list
#>     state: State, R6
#>     state_dict: function () 
#>     step: function (closure = NULL) 
#>     zero_grad: function (set_to_none = FALSE) 
#>   Private:
#>     deep_clone: function (name, value) 
#>     step_helper: function (closure, loop_fun)

daemons(0)

Tensors and complex objects containing tensors pass seamlessly between host and daemons like any R object.

Custom serialization leverages R’s native ‘refhook’ mechanism for transparent usage. Fast and efficient, it minimizes data copies and uses official torch serialization methods directly.

3. Database Hosting using Arrow Database Connectivity

Use DBI to access and manipulate Apache Arrow data efficiently through ADBC (Arrow Database Connectivity).

This creates an in-memory SQLite connection using the adbcsqlite backend.

Serialization uses arrow functions in the daemons() call. The class is ‘nanoarrow_array_stream’ since nanoarrow backs all DBI db*Arrow() queries:

library(mirai)

# DBI's db*Arrow() functions return 'nanoarrow_array_stream' objects, so
# that is the class to hook for serialization.
cfg <- serial_config(
  class = "nanoarrow_array_stream",
  sfunc = arrow::write_to_raw,
  ufunc = function(x) arrow::read_ipc_stream(x, as_data_frame = FALSE)
)

daemons(1, serial = cfg)

# '<<-' deliberately assigns 'con' into the daemon's global environment so
# that later mirai() calls evaluated there can find the connection.
everywhere(
  {
    library(DBI) # `adbi` and `adbcsqlite` packages must also be installed
    con <<- dbConnect(adbi::adbi("adbcsqlite"), uri = ":memory:")
  }
)

Use mirai() to write or query the database in Arrow format:

# Write the iris table into the daemon-hosted in-memory database.
m <- mirai(dbWriteTableArrow(con, "iris", iris))
m[]
#> [1] TRUE
# Read it back in Arrow format; the stream is rehydrated on the host.
m <- mirai(dbReadTableArrow(con, "iris"))
m[]
#> Table
#> 150 rows x 5 columns
#> $Sepal.Length <double>
#> $Sepal.Width <double>
#> $Petal.Length <double>
#> $Petal.Width <double>
#> $Species <string>
# Arbitrary SQL queries also return Arrow streams.
m <- mirai(dbGetQueryArrow(con, 'SELECT * FROM iris WHERE "Sepal.Length" < 4.6'))
m[]
#> Table
#> 5 rows x 5 columns
#> $Sepal.Length <double>
#> $Sepal.Width <double>
#> $Petal.Length <double>
#> $Petal.Width <double>
#> $Species <string>

Tight integration with R’s ‘refhook’ system allows returning complex nested objects with multiple Arrow queries:

# Several Arrow query results nested inside an ordinary R list: each stream
# is serialized individually by the refhook, so arbitrary nesting works.
m <- mirai({
  a <- dbGetQueryArrow(con, 'SELECT * FROM iris WHERE "Sepal.Length" < 4.6')
  b <- dbGetQueryArrow(con, 'SELECT * FROM iris WHERE "Sepal.Width" < 2.6')
  x <- dbGetQueryArrow(con, 'SELECT * FROM iris WHERE "Petal.Length" < 1.5')
  y <- dbGetQueryArrow(con, 'SELECT * FROM iris WHERE "Petal.Width" < 0.2')
  list(sepal = list(length = a, width = b), petal = list(length = x, width = y))
})
m[]
#> $sepal
#> $sepal$length
#> Table
#> 5 rows x 5 columns
#> $Sepal.Length <double>
#> $Sepal.Width <double>
#> $Petal.Length <double>
#> $Petal.Width <double>
#> $Species <string>
#> 
#> $sepal$width
#> Table
#> 19 rows x 5 columns
#> $Sepal.Length <double>
#> $Sepal.Width <double>
#> $Petal.Length <double>
#> $Petal.Width <double>
#> $Species <string>
#> 
#> 
#> $petal
#> $petal$length
#> Table
#> 24 rows x 5 columns
#> $Sepal.Length <double>
#> $Sepal.Width <double>
#> $Petal.Length <double>
#> $Petal.Width <double>
#> $Species <string>
#> 
#> $petal$width
#> Table
#> 5 rows x 5 columns
#> $Sepal.Length <double>
#> $Sepal.Width <double>
#> $Petal.Length <double>
#> $Petal.Width <double>
#> $Species <string>

Use everywhere() to cleanly tear down databases before resetting daemons:

# Close the connection inside the daemon before shutting the daemon down.
everywhere(dbDisconnect(con))
daemons(0)

4. Shiny / mirai / DBI / ADBC Integrated Example

This demonstrates database connections hosted in mirai daemons powering a Shiny app.

One-time serial_config() setup ensures seamless Arrow data transport in the global environment outside server().

Each Shiny session creates a new database connection in a new daemon process, freeing resources when the session ends. This logic lives in server(). A unique ID identifies each session and specifies the daemons ‘compute profile’.

Non-dispatcher daemons work since scheduling isn’t needed (all queries take a similar time, each session uses one daemon).

Shiny ExtendedTask performs queries via mirai() using the session-specific compute profile:

library(mirai)
library(secretbase)
library(shiny)
library(bslib)

# create an Arrow serialization configuration
# (this example uses nanoarrow's reader for the unserialization side)
cfg <- serial_config(
  class = "nanoarrow_array_stream",
  sfunc = arrow::write_to_raw,
  ufunc = nanoarrow::read_nanoarrow
)

# write 'iris' dataset to temp database file (for this demonstration)
file <- tempfile()
con <- DBI::dbConnect(adbi::adbi("adbcsqlite"), uri = file)
DBI::dbWriteTableArrow(con, "iris", iris)
DBI::dbDisconnect(con)

# common input parameters (slider bounds derived from the data)
slmin <- min(iris$Sepal.Length)
slmax <- max(iris$Sepal.Length)

ui <- page_fluid(
  p("The time is ", textOutput("current_time", inline = TRUE)),
  hr(),
  h3("Shiny / mirai / DBI / ADBC demonstration"),
  p("New daemon-hosted database connection is created for every Shiny session"),
  sliderInput(
    "sl", "Query iris dataset based on Sepal Length", min = slmin, max = slmax,
    value = c(slmin, slmax), width = "75%"
  ),
  input_task_button("btn", "Return query"),
  tableOutput("table")
)

# uses Shiny ExtendedTask with mirai
server <- function(input, output, session) {

  # create unique session id by hashing current time with a random key
  id <- secretbase::siphash13(Sys.time(), key = nanonext::random(4L))

  # create new daemon for each session
  daemons(1L, serial = cfg, .compute = id)

  # tear down daemon when session ends
  session$onEnded(function() daemons(0L, .compute = id))

  # everywhere() loads DBI and creates ADBC connection in each daemon
  # and sets up serialization
  everywhere(
    {
      library(DBI) # `adbi` and `adbcsqlite` packages must also be installed
      con <<- dbConnect(adbi::adbi("adbcsqlite"), uri = file)
    },
    file = file,
    .compute = id
  )

  output$current_time <- renderText({
    invalidateLater(1000)
    format(Sys.time(), "%H:%M:%S %p")
  })

  task <- ExtendedTask$new(
    function(...) mirai(
      dbGetQueryArrow(
        con,
        sprintf(
          "SELECT * FROM iris WHERE \"Sepal.Length\" BETWEEN %.2f AND %.2f",
          sl[1L],
          sl[2L]
        )
      ),
      ...,
      .compute = id
    )
  ) |> bind_task_button("btn")

  observeEvent(input$btn, task$invoke(sl = input$sl))

  output$table <- renderTable(task$result())

}

# run Shiny app
shinyApp(ui = ui, server = server)

# deletes temp database file (for this demonstration)
unlink(file)