Skip to content

1. Cross-language Exchange

nanonext provides a fast, reliable data interface between different programming languages where NNG has an implementation, including C, C++, Java, Python, Go, and Rust.

This messaging interface is lightweight, robust, and has limited points of failure. It enables:

  • Communication between processes in the same or different languages
  • Distributed computing across networks or on the same machine
  • Real-time data pipelines where computation times exceed data frequency
  • Modular software design following Unix philosophy

This example demonstrates numerical data exchange between R and Python (NumPy).

Create socket in Python using the NNG binding ‘pynng’:

import numpy as np
import pynng
socket = pynng.Pair0(listen="ipc:///tmp/nanonext.socket")

Create nano object in R using nanonext, then send a vector of ‘doubles’, specifying mode as ‘raw’:

library(nanonext)
n <- nano("pair", dial = "ipc:///tmp/nanonext.socket")
n$send(c(1.1, 2.2, 3.3, 4.4, 5.5), mode = "raw")
#> [1] 0

Receive in Python as a NumPy array of ‘floats’, and send back to R:

raw = socket.recv()
array = np.frombuffer(raw)
print(array)
#> [1.1 2.2 3.3 4.4 5.5]

msg = array.tobytes()
socket.send(msg)

socket.close()

Receive in R, specifying the receive mode as ‘double’:

n$recv(mode = "double")
#> [1] 1.1 2.2 3.3 4.4 5.5

n$close()

2. Async and Concurrency

nanonext implements true async send and receive, leveraging NNG as a massively-scalable concurrency framework.

s1 <- socket("pair", listen = "inproc://nano")
s2 <- socket("pair", dial = "inproc://nano")

send_aio() and recv_aio() return immediately with an ‘Aio’ object that performs operations asynchronously. Aio objects return an unresolved value while the operation is ongoing, then automatically resolve once complete.

# async receive requested, but no messages waiting yet
msg <- recv_aio(s2)
msg
#> < recvAio | $data >
msg$data
#> 'unresolved' logi NA

For ‘sendAio’ objects, the result is stored at $result. For ‘recvAio’ objects, the message is stored at $data.

res <- send_aio(s1, data.frame(a = 1, b = 2))
res
#> < sendAio | $result >
res$result
#> [1] 0

0 indicates successful send - the message has been accepted by the socket for sending but may still be buffered within the system.

# once a message is sent, the recvAio resolves automatically
msg$data
#>   a b
#> 1 1 2

Use unresolved() in control flow to perform actions before or after Aio resolution without blocking.

msg <- recv_aio(s2)

# unresolved() checks resolution status
while (unresolved(msg)) {
  # perform other tasks while waiting
  send_aio(s1, "resolved")
  cat("unresolved")
}
#> unresolved

# access resolved value
msg$data
#> [1] "resolved"

Explicitly wait for completion with call_aio() (blocking).

# wait for completion and return resolved Aio
call_aio(msg)

# access resolved value (waiting if required):
call_aio(msg)$data
#> [1] "resolved"

# or directly:
collect_aio(msg)
#> [1] "resolved"

# or user-interruptible:
msg[]
#> [1] "resolved"

close(s1)
close(s2)

3. Synchronisation Primitives

nanonext implements cross-platform synchronisation primitives from the NNG library, enabling synchronisation between NNG events and the main R execution thread.

Condition variables can signal events such as asynchronous receive completions and pipe events (connections established or dropped). Each condition variable has a value (counter) and flag (boolean). Signals increment the value; successful wait() or until() calls decrement it. A non-zero value allows waiting threads to continue.

This approach is more efficient than polling - consuming no resources while waiting and synchronising with zero latency.

Example 1: Wait for connection

sock <- socket("pair", listen = "inproc://nanopipe")

cv <- cv()
cv_value(cv)
#> [1] 0

pipe_notify(sock, cv = cv, add = TRUE, remove = TRUE)

# wait(cv) would block until connection established

# for illustration:
sock2 <- socket("pair", dial = "inproc://nanopipe")

cv_value(cv) # incremented when pipe created
#> [1] 1

wait(cv) # does not block as cv value is non-zero

cv_value(cv) # decremented by wait()
#> [1] 0

close(sock2)

cv_value(cv) # incremented when pipe destroyed
#> [1] 1

close(sock)

Example 2: Wait for message or disconnection

sock <- socket("pair", listen = "inproc://nanosignal")
sock2 <- socket("pair", dial = "inproc://nanosignal")

cv <- cv()
cv_value(cv)
#> [1] 0

pipe_notify(sock, cv = cv, add = FALSE, remove = TRUE, flag = TRUE)

send(sock2, "this message will wake waiting thread")
#> [1] 0

r <- recv_aio(sock, cv = cv)

# wakes when async receive completes
wait(cv) || stop("peer disconnected")
#> [1] TRUE

r$data
#> [1] "this message will wake waiting thread"

close(sock)
close(sock2)

When flag = TRUE is set for pipe notifications, wait() returns FALSE for pipe events (rather than TRUE for message events). This distinguishes between disconnections and successful receives, something not possible using call_aio() alone.

This mechanism enables waiting simultaneously on multiple events while distinguishing between them. pipe_notify() can signal up to two condition variables per event for additional flexibility in concurrent applications.