Core Concepts
nanonext provides bindings to NNG (Nanomsg Next Gen), a high-performance messaging library for building distributed systems.
This is a cheatsheet. Refer to the other vignettes for detailed introductions:
- Messaging and Async I/O - cross-language exchange, async operations, synchronisation
- Scalability Protocols - req/rep, pub/sub, surveyor/respondent
- Configuration and Security - TLS, options, serialization, statistics
- Web Toolkit - HTTP client/server, WebSocket, streaming
Key Takeaways
- Sockets connect via URLs using scalability protocols (req/rep, pub/sub, etc.)
- Transports: inproc:// (in-process), ipc:// (inter-process), tcp://, ws://, wss://, tls+tcp://
- Async I/O: send_aio() / recv_aio() return immediately; access results via $data or $result
- Modes: "serial" (R objects), "raw" (bytes), "double", "integer", "character", etc.
- Condition variables: cv() for zero-latency event synchronisation
1. Sockets and Connections
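A minimal sketch of opening and connecting sockets, assuming the `pair` protocol and an arbitrary `inproc://` URL chosen for illustration. It shows the two usual routes: passing `listen`/`dial` to `socket()` directly, or creating the socket first and calling `dial()` afterwards.

```r
library(nanonext)

# Create a socket and start listening in one step
# (the URL is an illustrative placeholder)
s1 <- socket("pair", listen = "inproc://cheatsheet-demo")

# Alternatively, create the socket first and dial separately
s2 <- socket("pair")
dial(s2, url = "inproc://cheatsheet-demo")

# Exchange one message to confirm the connection;
# block = 1000 waits up to 1000 ms rather than failing immediately
send(s1, "hello", block = 1000)
msg <- recv(s2, block = 1000)

close(s1)
close(s2)
```

The `block` timeout is a defensive choice here, since dialing happens asynchronously by default.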
2. Send and Receive
Receive Modes
| Mode | Description |
|---|---|
| "serial" / 1 | R serialization (default) |
| "character" / 2 | Coerce to character |
| "complex" / 3 | Coerce to complex |
| "double" / 4 | Coerce to double |
| "integer" / 5 | Coerce to integer |
| "logical" / 6 | Coerce to logical |
| "numeric" / 7 | Coerce to numeric |
| "raw" / 8 | Raw bytes |
| "string" / 9 | Fast option for length-1 character |
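The modes above can be sketched as follows, assuming two `pair` sockets on an illustrative `inproc://` URL. The sender picks between "serial" and "raw"; the receiver picks how the incoming bytes are interpreted, by name or by number.

```r
library(nanonext)

s1 <- socket("pair", listen = "inproc://modes-demo")
s2 <- socket("pair", dial = "inproc://modes-demo")

# Default "serial" mode round-trips arbitrary R objects
send(s1, list(a = 1, b = "x"), block = 1000)
obj <- recv(s2, block = 1000)

# mode = "raw" sends only the bytes of an atomic vector;
# the receiver decides how to interpret them
send(s1, c(1.5, 2.5, 3.5), mode = "raw", block = 1000)
dbl <- recv(s2, mode = "double", block = 1000)

# Receive modes can also be given by number: 2 is "character"
send(s1, "hi", mode = "raw", block = 1000)
txt <- recv(s2, mode = 2L, block = 1000)

close(s1)
close(s2)
```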
3. Async I/O
Basic Async
# Async send - returns immediately
res <- send_aio(s, data)
res$result # 0 = success, error code otherwise
# Async receive - returns immediately
msg <- recv_aio(s)
msg$data # Value when resolved, 'unresolved' NA otherwise
# Check if resolved
unresolved(msg) # TRUE while pending
# Wait for resolution
call_aio(msg) # Blocks, returns Aio object
collect_aio(msg) # Blocks, returns value directly
msg[] # Blocks (user-interruptible), returns value
Non-blocking Patterns
# Poll while doing other work
while (unresolved(msg)) {
  # do other tasks
}
result <- msg$data
# Multiple async operations
msg1 <- recv_aio(s1)
msg2 <- recv_aio(s2)
# Both run concurrently
4. Condition Variables
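A minimal sketch of the basic pattern, assuming two `pair` sockets on an illustrative `inproc://` URL: a condition variable is passed to `recv_aio()` and is signalled when the receive completes, so the waiting side does not need to poll.

```r
library(nanonext)

s1 <- socket("pair", listen = "inproc://cv-demo")
s2 <- socket("pair", dial = "inproc://cv-demo")

cv <- cv()

# The condition variable is signalled when this receive completes
msg <- recv_aio(s2, cv = cv, timeout = 2000)

send(s1, "wake up", block = 1000)

# until() waits at most the given number of milliseconds:
# TRUE if signalled, FALSE if the wait timed out
signalled <- until(cv, 1000)
value <- msg$data

close(s1)
close(s2)
```

`wait(cv)` behaves the same way but without a time limit; `until()` is used here so the sketch cannot hang.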
Pipe Notifications
# Signal on connection/disconnection
pipe_notify(socket, cv = cv, add = TRUE, remove = TRUE)
# Distinguish message vs disconnect with flag
pipe_notify(socket, cv = cv, remove = TRUE, flag = TRUE)
r <- recv_aio(socket, cv = cv)
wait(cv) || stop("disconnected") # FALSE = pipe event
6. Pub/Sub
pub <- socket("pub", listen = "inproc://pubsub")
sub <- socket("sub", dial = "inproc://pubsub")
# Subscribe to topic (prefix matching)
subscribe(sub, topic = "news")
subscribe(sub, topic = NULL) # All topics
# Unsubscribe
unsubscribe(sub, topic = "news")
# Publish (topic is message prefix)
send(pub, c("news", "headline"), mode = "raw")
# Receive (includes topic)
recv(sub, mode = "character")
close(pub)
close(sub)
7. Surveyor/Respondent
sur <- socket("surveyor", listen = "inproc://survey")
res1 <- socket("respondent", dial = "inproc://survey")
res2 <- socket("respondent", dial = "inproc://survey")
# Set survey timeout (ms)
survey_time(sur, 500)
# Broadcast survey
send(sur, "ping")
# Collect responses (async)
aio1 <- recv_aio(sur)
aio2 <- recv_aio(sur)
# Respondents reply
recv(res1)
send(res1, "pong1")
# Late/missing responses time out (errorValue 5)
msleep(500)
aio2$data # errorValue if no response
close(sur)
close(res1)
close(res2)
8. TLS Secure Connections
Self-signed Certificates
# Generate certificate (cn must match URL host exactly)
cert <- write_cert(cn = "127.0.0.1")
# Create TLS configs
server_tls <- tls_config(server = cert$server)
client_tls <- tls_config(client = cert$client)
# Use with tls+tcp:// or wss://
s1 <- socket(listen = "tls+tcp://127.0.0.1:5555", tls = server_tls)
s2 <- socket(dial = "tls+tcp://127.0.0.1:5555", tls = client_tls)
CA Certificates
# Client with CA cert file
client_tls <- tls_config(client = "/path/to/ca-cert.pem")
# Server with cert + key
server_tls <- tls_config(server = c("/path/to/cert.pem", "/path/to/key.pem"))
9. Options and Statistics
Common Options
| Option | Description |
|---|---|
| "recv-size-max" | Max message size (0 = unlimited) |
| "send-timeout" | Send timeout (ms) |
| "recv-timeout" | Receive timeout (ms) |
| "reconnect-time-min" | Min reconnect interval (ms) |
| "reconnect-time-max" | Max reconnect interval (ms) |
| "req:resend-time" | Request retry interval |
| "sub:prefnew" | Prefer newer messages |
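A minimal sketch of reading and setting options and querying a statistic, assuming a `req` socket on an illustrative `inproc://` URL. Options are set with the replacement form of `opt()`; `stat()` reads a named counter such as "pipes".

```r
library(nanonext)

s <- socket("req", listen = "inproc://opt-demo")

# Set an option, then read it back
opt(s, "recv-timeout") <- 2000L
timeout <- opt(s, "recv-timeout")

# Protocol-specific options carry a protocol prefix
opt(s, "req:resend-time") <- 5000L

# Statistics are read-only; "pipes" is the number of open connections
n_pipes <- stat(s, "pipes")

close(s)
```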
Custom Serialization
# Register custom serializer for a class
serial <- serial_config(
  "class_name",
  function(x) serialize(x, NULL), # serialize
  unserialize # unserialize
)
opt(socket, "serial") <- serial
10. Contexts
Contexts enable concurrent operations on a single socket (for req/rep, surveyor/respondent).
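A minimal sketch of a req/rep exchange over contexts, assuming an illustrative `inproc://` URL and hypothetical variable names. Each context keeps its own request/reply state, so several could be opened on the same socket and used concurrently.

```r
library(nanonext)

client <- socket("req", listen = "inproc://ctx-demo")
server <- socket("rep", dial = "inproc://ctx-demo")

# Each context tracks its own request/reply state on the shared socket
ctx_req <- context(client)
ctx_rep <- context(server)

# Queue an async receive on the rep side, then issue an async request
incoming <- recv_aio(ctx_rep, timeout = 2000)
reply_aio <- request(ctx_req, data = 21, timeout = 2000)

# Wait for the request to arrive, then answer on the same context
x <- collect_aio(incoming)
send(ctx_rep, x * 2)

answer <- collect_aio(reply_aio)

close(ctx_req)
close(ctx_rep)
close(client)
close(server)
```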
11. Cross-language Exchange
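A minimal sketch of byte-level exchange, assuming the peer in another language reads and writes plain native-endian doubles. Here a second R socket stands in for that peer, decoding and encoding with `readBin()`/`writeBin()`; the URL and variable names are illustrative.

```r
library(nanonext)

s1 <- socket("pair", listen = "inproc://xlang-demo")
s2 <- socket("pair", dial = "inproc://xlang-demo") # stands in for a non-R peer

# Raw mode puts only the vector's bytes on the wire,
# without an R serialization header
send(s1, c(1.1, 2.2, 3.3), mode = "raw", block = 1000)

# The peer sees 3 x 8 bytes and decodes them as doubles
bytes <- recv(s2, mode = "raw", block = 1000)
decoded <- readBin(bytes, what = "double", n = 3L)

# The peer replies with raw bytes; the R side reads them back as doubles
send(s2, writeBin(decoded * 2, raw()), mode = "raw", block = 1000)
result <- recv(s1, mode = "double", block = 1000)

close(s1)
close(s2)
```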
12. Error Handling
# Errors return as 'errorValue' class
result <- recv(s, block = FALSE)
# Check for errors
is_error_value(result)
# Error codes
# 5 = Timed out
# 6 = Connection refused
# 8 = Try again (non-blocking, no message)
# Get error message
nng_error(5) # "Timed out"
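A minimal sketch of the check-then-handle pattern, assuming a `pair` socket with no peer on an illustrative `inproc://` URL so that the receive is guaranteed to time out.

```r
library(nanonext)

s <- socket("pair", listen = "inproc://err-demo")

# Nothing is ever sent, so this receive gives up after 100 ms
res <- recv(s, block = 100)

if (is_error_value(res)) {
  # An errorValue is an integer code; look up its description
  code <- as.integer(res)
  msg <- nng_error(code)
}

close(s)
```

Testing with `is_error_value()` before using the result avoids mistaking an error code for a received integer.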