1. Request Reply Protocol

nanonext implements remote procedure calls (RPC) using NNG’s req/rep protocol for distributed computing. This allows computationally expensive calculations or I/O-bound operations to run in separate server processes.

[S] Server process: reply() waits for a message, applies a function, and sends back the result. Here the server is started in a background ‘mirai’ process.

m <- mirai::mirai({
  library(nanonext)
  rep <- socket("rep", listen = "tcp://127.0.0.1:6556")
  reply(context(rep), execute = rnorm, send_mode = "raw")
  Sys.sleep(2) # linger period to flush system socket send
})

[C] Client process: request() performs async send/receive, returning immediately with a recvAio object.

req <- socket("req", dial = "tcp://127.0.0.1:6556")
aio <- request(context(req), data = 1e8, recv_mode = "double")

The client can now run additional code while the server processes the request.

# do more...

When the result is needed, pass the recvAio to call_aio(), which waits for the request to complete, then retrieve the value at $data.

call_aio(aio)$data |> str()
#>  num [1:100000000] -0.63 0.883 1.134 -0.474 -0.237 ...

Because call_aio() blocks until the result arrives, an alternative is to query aio$data directly, which returns an ‘unresolved’ value (logical NA) while the request is still in progress.
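A minimal sketch of this non-blocking pattern follows. It assumes both sockets live in one R session over a hypothetical inproc address, purely to keep the example self-contained (the walkthrough above uses separate processes over TCP). It polls with unresolved() and msleep() rather than blocking in call_aio().

```r
library(nanonext)

# Hypothetical in-process address, used only so the sketch is self-contained
rep <- socket("rep", listen = "inproc://nano-poll-sketch")
req <- socket("req", dial = "inproc://nano-poll-sketch")

# Async request: returns immediately with a recvAio
aio <- request(context(req), data = c(1, 2, 3), recv_mode = "double")
pending <- unresolved(aio)  # no reply has been served yet

# Serve one request in the same session (normally a separate process)
reply(context(rep), execute = sum, send_mode = "raw", timeout = 1000)

# Poll instead of blocking: check up to 100 times, 10 ms apart
for (i in seq_len(100)) {
  if (!unresolved(aio)) break
  msleep(10)
}
result <- aio$data

close(req)
close(rep)
```

Bounding the loop is a design choice for the sketch: it avoids waiting forever if the server never replies.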

For server-side operations (e.g., writing to disk), calling or querying the value confirms completion and provides the function’s return value (typically NULL or an exit code).
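As an illustration of a server-side operation, the sketch below has the server write the received data frame to a file. The inproc address, the temporary file path and the single-session setup are assumptions made only so the example is self-contained.

```r
library(nanonext)

# Hypothetical in-process address and output location
rep <- socket("rep", listen = "inproc://nano-write-sketch")
req <- socket("req", dial = "inproc://nano-write-sketch")
path <- tempfile(fileext = ".csv")

# Send a data frame; the default serial mode transmits the R object as-is
aio <- request(context(req), data = head(mtcars))

# Server side: write the received object to disk (write.csv() returns NULL)
reply(context(rep), execute = function(x) write.csv(x, file = path), timeout = 1000)

# Blocks until the reply arrives, confirming that the write has finished
result <- call_aio(aio)$data
written <- file.exists(path)

close(req)
close(rep)
```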

The mirai package (https://mirai.r-lib.org/) uses nanonext as the back-end to provide asynchronous execution of arbitrary R code using the RPC model.
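A brief sketch of what this looks like from the mirai side, assuming the mirai package is installed. The expression and the argument names x and y are chosen for illustration only.

```r
library(mirai)

# Evaluate an expression asynchronously in a background process;
# x and y are passed to that process as named arguments
m <- mirai({ x + y }, x = 1, y = 2)

# Wait for completion, then read the result at $data
result <- call_mirai(m)$data
```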

2. Publisher Subscriber Protocol

nanonext implements NNG’s pub/sub protocol. Subscribers can subscribe to one or more topics broadcast by a publisher.

pub <- socket("pub", listen = "inproc://nanobroadcast")
sub <- socket("sub", dial = "inproc://nanobroadcast")

sub |> subscribe(topic = "examples")

pub |> send(c("examples", "this is an example"), mode = "raw")
#> [1] 0
sub |> recv(mode = "character")
#> [1] "examples"           "this is an example"

pub |> send("examples at the start of a single text message", mode = "raw")
#> [1] 0
sub |> recv(mode = "character")
#> [1] "examples at the start of a single text message"

pub |> send(c("other", "this other topic will not be received"), mode = "raw")
#> [1] 0
sub |> recv(mode = "character")
#> 'errorValue' int 8 | Try again

# specify NULL to subscribe to ALL topics
sub |> subscribe(topic = NULL)
pub |> send(c("newTopic", "this is a new topic"), mode = "raw")
#> [1] 0
sub |> recv(mode = "character")
#> [1] "newTopic"            "this is a new topic"

sub |> unsubscribe(topic = NULL)
pub |> send(c("newTopic", "this topic will now not be received"), mode = "raw")
#> [1] 0
sub |> recv(mode = "character")
#> 'errorValue' int 8 | Try again

# however the topics explicitly subscribed to are still received
pub |> send("examples will still be received", mode = "raw")
#> [1] 0
sub |> recv(mode = "character")
#> [1] "examples will still be received"

The subscribed topic can be of any atomic type (not just character), allowing integer, double, logical, complex and raw vectors to be sent and received.

sub |> subscribe(topic = 1)
pub |> send(c(1, 10, 10, 20), mode = "raw")
#> [1] 0
sub |> recv(mode = "double")
#> [1]  1 10 10 20
pub |> send(c(2, 10, 10, 20), mode = "raw")
#> [1] 0
sub |> recv(mode = "double")
#> 'errorValue' int 8 | Try again

close(pub)
close(sub)

3. Surveyor Respondent Protocol

This protocol is useful for service discovery and similar applications. A surveyor broadcasts a survey to all respondents, who may reply within a timeout period. Late responses are discarded.

sur <- socket("surveyor", listen = "inproc://nanoservice")
res1 <- socket("respondent", dial = "inproc://nanoservice")
res2 <- socket("respondent", dial = "inproc://nanoservice")

# sur sets a survey timeout, applying to this and subsequent surveys
sur |> survey_time(value = 500)

# sur sends a message and then requests 2 async receives
sur |> send("service check")
#> [1] 0
aio1 <- sur |> recv_aio()
aio2 <- sur |> recv_aio()

# res1 receives the message and replies using an aio send function
res1 |> recv()
#> [1] "service check"
res1 |> send_aio("res1")

# res2 receives the message but fails to reply
res2 |> recv()
#> [1] "service check"

# checking the aio - only the first will have resolved
aio1$data
#> [1] "res1"
aio2$data
#> 'unresolved' logi NA

# after the survey expires, the second resolves into a timeout error
msleep(500)
aio2$data
#> 'errorValue' int 5 | Timed out

close(sur)
close(res1)
close(res2)

msleep() is an uninterruptible sleep function (using NNG) that takes a time in milliseconds.

In the example above, aio2 resolves to a timeout error (integer 5 classed as ‘errorValue’). All error codes are classed as ‘errorValue’ for easy distinction from integer message values.
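A short sketch of checking for an ‘errorValue’ in code, using is_error_value() and nng_error() from nanonext. The pair socket, the inproc address and the 100 ms timeout are assumptions chosen so that the receive fails without needing a peer.

```r
library(nanonext)

# A socket with no peer: the receive below cannot succeed
s <- socket("pair", listen = "inproc://nano-error-sketch")

# Wait at most 100 ms for a message, then give up
res <- recv(s, block = 100)

# Distinguish an error code from a genuine integer message
failed <- is_error_value(res)
if (failed) {
  # nng_error() translates the integer code into its description
  description <- nng_error(res)
}

close(s)
```

Testing with is_error_value() before using a result avoids mistaking an error code for received integer data.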