Skip to content

MQTT Request (mqtt-request)

Synchronous MQTT v5 request/response in a single node. On every input message it generates a unique response topic and 16-byte correlation data, opens a one-shot subscription, publishes the request with v5 Response Topic and Correlation Data properties, then either emits the matching response on the output or fails on timeout. Multiple inflight requests run in parallel.

Inputs Outputs
1 1

When to use this node vs. mqtt-in + mqtt-out

For the synchronous "ask a service a question, get an answer" pattern, the subscribe + publish + correlate + cleanup dance is repetitive and easy to get wrong. mqtt-request owns it once.

If you need to fire-and-forget, or to consume an unsolicited stream of messages, use mqtt-in and mqtt-out directly.

MQTT v5 only. The pattern relies on the v5 Response Topic and Correlation Data properties. With a v3.1.1 broker the node logs a warning at deploy and falls back to topic-uniqueness correlation only — no out-of-band envelope is added, so the responder must know the response topic by some other convention.

Configuration

Field Default Description
broker ID of the referenced mqtt-broker config node (use v5).
topic Request topic. Empty → fallback to msg.topic.
qos 0 QoS for both the request publish and the response subscription. msg.qos overrides.
retain false Retain flag on the request publish (rare to retain a request — usually wrong).
responseTopicPrefix loopze/response Prefix for the random response topic. Full topic: <prefix>/<random-uuid>.
timeout 30 (seconds) Wait deadline. 0 disables the timeout (request stays inflight until response or stop).
timeoutMode error error — emit a catchable error on timeout. passthrough — emit msg.timedOut=true.
responseFormat string string / json / buffer — same options as mqtt-in.
defaultUserProperties v5: default user properties; merged with msg.userProperties.
defaultContentType v5: e.g. application/json.
defaultMessageExpiry 0 v5: seconds (0 = unset).
defaultPayloadFormat 0 v5: 0 bytes / 1 UTF-8 text.

responseTopic and correlationData are always generated by the node. msg.responseTopic and msg.correlationData on the input are deliberately ignored — the request/response correlation is owned by this node.

Incoming message

{
  "payload": "<request body — encoded the same way as mqtt-out>",
  "topic": "<optional override of the configured topic>",
  "qos":   "<optional override of the configured QoS>",
  "userProperties": {  },
  "contentType":  "application/json",
  "messageExpiry": 30,
  "payloadFormat": 1
}

Outgoing message (on response)

{
  "payload": "<response — decoded per responseFormat>",
  "topic":  "loopze/response/<random-uuid>",
  "requestTopic": "rpc/devices/42/getState",
  "qos": 1,
  "retain": false,
  "correlationData": "<bytes the responder echoed back>",
  "userProperties": {  },
  "contentType":  "application/json",
  "messageExpiry": 30,
  "payloadFormat": 1
}

The original input message is forwarded with the response merged in — custom fields the user set on the way in are preserved.

Outgoing message (on timeout)

In timeoutMode = passthrough:

{
  "payload": "<original input payload — preserved>",
  "topic":   "<original input topic>",
  "timedOut": true
}

In timeoutMode = error (default), no message is emitted on the regular output. The node raises the catchable error mqtt-request: timeout — attach a Catch node downstream to react.

How a round-trip looks on the wire

  ┌──────────────┐                              ┌─────────────────┐
  │ mqtt-request │                              │ Responder side  │
  │              │                              │ (mqtt-in →      │
  │              │                              │  mqtt-out       │
  │              │                              │  target=        │
  │              │                              │  responseTopic) │
  └──────────────┘                              └─────────────────┘

  1. msg in
  2. responseTopic = loopze/response/<uuid>
     correlationData = 16 random bytes
  3. SUBSCRIBE responseTopic ─────────►  (broker)
     ◄──────────── SUBACK
  4. PUBLISH target_topic
       Properties.ResponseTopic   = responseTopic
       Properties.CorrelationData = correlationData
                                       ──────────►  mqtt-in receives
                                                    msg.responseTopic
                                                    msg.correlationData

                                                    [user flow]

                                                    mqtt-out target=
                                                      responseTopic
                                                    PUBLISH
                                                      Properties.
                                                        CorrelationData
                                                        forwarded
                                       ◄──────────
  5. publish on responseTopic with matching correlationData →
     - cancel timeout
     - UNSUBSCRIBE responseTopic
     - emit response on output 0

Status

  • Green idle when no request is inflight.
  • Green n inflight while requests are outstanding.
  • Yellow / red — inherited from the broker connection. On broker disconnect every inflight context is failed according to timeoutMode.

Pairing with a remote responder

The responder side is a regular mqtt-in → flow → mqtt-out target=responseTopic chain. The function node in the middle only needs to set msg.payload; responseTopic and correlationData are already on the inbound message and get forwarded automatically by mqtt-out in response mode.

[mqtt-in rpc/devices/+/getState] → [Function: read state] → [mqtt-out target=responseTopic qos=1]

Why noLocal=false on the response subscription. The internal subscription that mqtt-request opens for the response uses NoLocal=false on purpose. When request and responder share the same LOOPZE instance, they share the broker connection (= same Client ID), and per MQTT v5 §3.8.3.1 a NoLocal=true subscription would be filtered out by the broker. Echo-loops are not a concern because the response topic is a random UUID per request.

Examples

RPC over MQTT.

[Inject 30s {payload: "ping"}] → [mqtt-request topic=rpc/heartbeat timeout=5] → [Debug]

Catch timeouts and retry.

                     ┌─→ on response  → [Debug]
[Inject] → [mqtt-request error mode] →┤
                                       └─→ [Catch mqtt-request] → [Function: backoff + retry]

Branch on success vs. timeout in passthrough mode.

[Inject] → [mqtt-request passthrough] → [Switch msg.timedOut === true]
                                            ├─ true  → [Debug "timed out"]
                                            └─ false → [Debug "got response"]

See also