1. Request Reply Protocol
nanonext implements remote procedure calls (RPC) using
NNG’s req/rep protocol for distributed computing. Use this for
computationally-expensive calculations or I/O-bound operations in
separate server processes.
[S] Server process: reply() waits for a
message, applies a function, and sends back the result. Started in a
background ‘mirai’ process.
m <- mirai::mirai({
library(nanonext)
rep <- socket("rep", listen = "tcp://127.0.0.1:6556")
reply(context(rep), execute = rnorm, send_mode = "raw")
Sys.sleep(2) # linger period to flush system socket send
})[C] Client process: request() performs
async send/receive, returning immediately with a recvAio
object.
req <- socket("req", dial = "tcp://127.0.0.1:6556")
aio <- request(context(req), data = 1e8, recv_mode = "double")The client can now run additional code while the server processes the request.
# do more...When the result is needed, call the recvAio using
call_aio() to retrieve the value at $data.
Since call_aio() blocks, alternatively query
aio$data directly, which returns ‘unresolved’ (logical NA)
if incomplete.
For server-side operations (e.g., writing to disk), calling or querying the value confirms completion and provides the function’s return value (typically NULL or an exit code).
The mirai
package (https://mirai.r-lib.org/) uses nanonext as
the back-end to provide asynchronous execution of arbitrary R code using
the RPC model.
2. Publisher Subscriber Protocol
nanonext implements NNG’s pub/sub protocol. Subscribers
can subscribe to one or multiple topics broadcast by a publisher.
pub <- socket("pub", listen = "inproc://nanobroadcast")
sub <- socket("sub", dial = "inproc://nanobroadcast")
sub |> subscribe(topic = "examples")
pub |> send(c("examples", "this is an example"), mode = "raw")
#> [1] 0
sub |> recv(mode = "character")
#> [1] "examples" "this is an example"
pub |> send("examples at the start of a single text message", mode = "raw")
#> [1] 0
sub |> recv(mode = "character")
#> [1] "examples at the start of a single text message"
pub |> send(c("other", "this other topic will not be received"), mode = "raw")
#> [1] 0
sub |> recv(mode = "character")
#> 'errorValue' int 8 | Try again
# specify NULL to subscribe to ALL topics
sub |> subscribe(topic = NULL)
pub |> send(c("newTopic", "this is a new topic"), mode = "raw")
#> [1] 0
sub |> recv("character")
#> [1] "newTopic" "this is a new topic"
sub |> unsubscribe(topic = NULL)
pub |> send(c("newTopic", "this topic will now not be received"), mode = "raw")
#> [1] 0
sub |> recv("character")
#> 'errorValue' int 8 | Try again
# however the topics explicitly subscribed to are still received
pub |> send(c("examples will still be received"), mode = "raw")
#> [1] 0
sub |> recv(mode = "character")
#> [1] "examples will still be received"The subscribed topic can be of any atomic type (not just character), allowing integer, double, logical, complex and raw vectors to be sent and received.
3. Surveyor Respondent Protocol
Useful for service discovery and similar applications. A surveyor broadcasts a survey to all respondents, who may reply within a timeout period. Late responses are discarded.
sur <- socket("surveyor", listen = "inproc://nanoservice")
res1 <- socket("respondent", dial = "inproc://nanoservice")
res2 <- socket("respondent", dial = "inproc://nanoservice")
# sur sets a survey timeout, applying to this and subsequent surveys
sur |> survey_time(value = 500)
# sur sends a message and then requests 2 async receives
sur |> send("service check")
#> [1] 0
aio1 <- sur |> recv_aio()
aio2 <- sur |> recv_aio()
# res1 receives the message and replies using an aio send function
res1 |> recv()
#> [1] "service check"
res1 |> send_aio("res1")
# res2 receives the message but fails to reply
res2 |> recv()
#> [1] "service check"
# checking the aio - only the first will have resolved
aio1$data
#> [1] "res1"
aio2$data
#> 'unresolved' logi NA
# after the survey expires, the second resolves into a timeout error
msleep(500)
aio2$data
#> 'errorValue' int 5 | Timed out
close(sur)
close(res1)
close(res2)msleep() is an uninterruptible sleep function (using
NNG) that takes a time in milliseconds.
The final value resolves to a timeout error (integer 5 classed as ‘errorValue’). All error codes are classed as ‘errorValue’ for easy distinction from integer message values.
