MQTT Request (mqtt-request)¶
Synchronous MQTT v5 request/response in a single node. On every input message it generates a unique response topic and 16-byte correlation data, opens a one-shot subscription, publishes the request with v5 Response Topic and Correlation Data properties, then either emits the matching response on the output or fails on timeout. Multiple inflight requests run in parallel.
| Inputs | Outputs |
|---|---|
| 1 | 1 |
When to use this node vs. mqtt-in + mqtt-out¶
For the synchronous "ask a service a question, get an answer" pattern, the
subscribe + publish + correlate + cleanup dance is repetitive and easy to get
wrong. mqtt-request owns it once.
If you need to fire-and-forget, or to consume an unsolicited stream of
messages, use mqtt-in and mqtt-out directly.
MQTT v5 only. The pattern relies on the v5 Response Topic and Correlation Data properties. With a v3.1.1 broker the node logs a warning at deploy and falls back to topic-uniqueness correlation only — no out-of-band envelope is added, so the responder must know the response topic by some other convention.
Configuration¶
| Field | Default | Description |
|---|---|---|
broker |
— | ID of the referenced mqtt-broker config node (use v5). |
topic |
— | Request topic. Empty → fallback to msg.topic. |
qos |
0 |
QoS for both the request publish and the response subscription. msg.qos overrides. |
retain |
false |
Retain flag on the request publish (rare to retain a request — usually wrong). |
responseTopicPrefix |
loopze/response |
Prefix for the random response topic. Full topic: <prefix>/<random-uuid>. |
timeout |
30 (seconds) |
Wait deadline. 0 disables the timeout (request stays inflight until response or stop). |
timeoutMode |
error |
error — emit a catchable error on timeout. passthrough — emit msg.timedOut=true. |
responseFormat |
string |
string / json / buffer — same options as mqtt-in. |
defaultUserProperties |
— | v5: default user properties; merged with msg.userProperties. |
defaultContentType |
— | v5: e.g. application/json. |
defaultMessageExpiry |
0 |
v5: seconds (0 = unset). |
defaultPayloadFormat |
0 |
v5: 0 bytes / 1 UTF-8 text. |
responseTopic and correlationData are always generated by the node.
msg.responseTopic and msg.correlationData on the input are deliberately
ignored — the request/response correlation is owned by this node.
Incoming message¶
{
"payload": "<request body — encoded the same way as mqtt-out>",
"topic": "<optional override of the configured topic>",
"qos": "<optional override of the configured QoS>",
"userProperties": { … },
"contentType": "application/json",
"messageExpiry": 30,
"payloadFormat": 1
}
Outgoing message (on response)¶
{
"payload": "<response — decoded per responseFormat>",
"topic": "loopze/response/<random-uuid>",
"requestTopic": "rpc/devices/42/getState",
"qos": 1,
"retain": false,
"correlationData": "<bytes the responder echoed back>",
"userProperties": { … },
"contentType": "application/json",
"messageExpiry": 30,
"payloadFormat": 1
}
The original input message is forwarded with the response merged in — custom fields the user set on the way in are preserved.
Outgoing message (on timeout)¶
In timeoutMode = passthrough:
{
"payload": "<original input payload — preserved>",
"topic": "<original input topic>",
"timedOut": true
}
In timeoutMode = error (default), no message is emitted on the regular
output. The node raises the catchable error mqtt-request: timeout —
attach a Catch node downstream to react.
How a round-trip looks on the wire¶
┌──────────────┐ ┌─────────────────┐
│ mqtt-request │ │ Responder side │
│ │ │ (mqtt-in → │
│ │ │ mqtt-out │
│ │ │ target= │
│ │ │ responseTopic) │
└──────────────┘ └─────────────────┘
1. msg in
2. responseTopic = loopze/response/<uuid>
correlationData = 16 random bytes
3. SUBSCRIBE responseTopic ─────────► (broker)
◄──────────── SUBACK
4. PUBLISH target_topic
Properties.ResponseTopic = responseTopic
Properties.CorrelationData = correlationData
──────────► mqtt-in receives
msg.responseTopic
msg.correlationData
[user flow]
mqtt-out target=
responseTopic
PUBLISH
Properties.
CorrelationData
forwarded
◄──────────
5. publish on responseTopic with matching correlationData →
- cancel timeout
- UNSUBSCRIBE responseTopic
- emit response on output 0
Status¶
- Green
idlewhen no request is inflight. - Green
n inflightwhile requests are outstanding. - Yellow / red — inherited from the broker connection. On broker disconnect
every inflight context is failed according to
timeoutMode.
Pairing with a remote responder¶
The responder side is a regular mqtt-in → flow → mqtt-out target=responseTopic
chain. The function node in the middle only needs to set msg.payload;
responseTopic and correlationData are already on the inbound message and
get forwarded automatically by mqtt-out in response mode.
Why
noLocal=falseon the response subscription. The internal subscription thatmqtt-requestopens for the response usesNoLocal=falseon purpose. When request and responder share the same LOOPZE instance, they share the broker connection (= same Client ID), and per MQTT v5 §3.8.3.1 aNoLocal=truesubscription would be filtered out by the broker. Echo-loops are not a concern because the response topic is a random UUID per request.
Examples¶
RPC over MQTT.
Catch timeouts and retry.
┌─→ on response → [Debug]
[Inject] → [mqtt-request error mode] →┤
└─→ [Catch mqtt-request] → [Function: backoff + retry]
Branch on success vs. timeout in passthrough mode.
[Inject] → [mqtt-request passthrough] → [Switch msg.timedOut === true]
├─ true → [Debug "timed out"]
└─ false → [Debug "got response"]
See also¶
- MQTT Subscribe — for the responder side.
- MQTT Publish — its
target=responseTopicmode is the other half of the round-trip. - TCP Request — the same idea over TCP.
- HTTP Request — the same idea over HTTP.