Part
4
  |  
Connected Systems
  |  
Chapter
16

Sensor Networks and MQTT

HTTP is the wrong protocol for IoT and most engineers don't realize it until their polling loop melts a Pi Zero.
Reading Time
12
mins
BACK TO RASPBERRY PI MASTERCLASS

The instinct is understandable. You know HTTP. You know REST. You've built APIs with Flask, consumed them with requests, and debugged them with curl. So when you need a temperature sensor on one Pi to report readings to another Pi running a dashboard, the first architecture that comes to mind is an HTTP endpoint. The sensor Pi hits /api/temperature on the dashboard Pi every five seconds with a POST request. The dashboard Pi responds with 200 OK. Simple. Familiar. Wrong.

HTTP asks "give me data now." MQTT says "tell me when something changes."

HTTP asks "give me data now." MQTT says "tell me when something changes." That distinction sounds academic until you have twelve sensors reporting every three seconds. With HTTP, the dashboard Pi opens a new TCP connection for every reading, negotiates headers, waits for a response, and tears the connection down — seventy-two times per minute, per sensor. With twelve sensors, that's 864 HTTP request-response cycles per minute. Each one carries 400–800 bytes of HTTP overhead to deliver a 20-byte temperature reading. The dashboard Pi spends more time parsing headers than processing data.

MQTT was designed for exactly this problem. It's a publish/subscribe protocol built for constrained devices on unreliable networks. A sensor publishes a reading to a topic. Any number of subscribers receive it. The connection stays open. The overhead per message is 2–4 bytes. The broker handles routing, buffering, and delivery guarantees. The sensor doesn't know or care who's listening. The dashboard doesn't know or care how many sensors exist.

MQTT Fundamentals: Broker, Publisher, Subscriber

MQTT has three actors. The broker is the central server that receives all messages and routes them to subscribers. The publisher is any device that sends data — your sensor Pi. The subscriber is any device that wants to receive data — your dashboard Pi, your logging server, your alerting system. Publishers and subscribers never talk to each other directly. They talk to the broker.

Framework · The MQTT Contract · MC

Publish/subscribe is the right abstraction for physical things. HTTP is request/response — the client asks, the server answers. MQTT is event-driven — the publisher announces, and anyone who cares receives. For sensors that report every few seconds, the difference between polling and event-driven is the difference between a system that scales and one that collapses under its own weight.

The broker is the only component that needs to be reachable by every device on the network. Publishers connect to the broker, send messages to named topics, and don't wait for a response from subscribers. Subscribers connect to the broker, declare which topics they care about, and receive messages as they arrive. The broker matches publishers to subscribers by topic name.

This decoupling is the architectural win. When you add a thirteenth sensor, you don't change a single line of code on the dashboard. The new sensor publishes to the same topic hierarchy, and every subscriber receives the new data automatically. When you add a second dashboard — maybe a mobile app, or an alerting service — you don't change anything on the sensors. The new subscriber registers its interest with the broker and starts receiving messages immediately.

Key takeaway

MQTT decouples producers from consumers through a broker. Adding a sensor requires zero changes to the dashboard. Adding a dashboard requires zero changes to the sensors. That's the architectural property you're buying.

Installing Mosquitto: Your Pi as an MQTT Broker

Mosquitto is the standard open-source MQTT broker. It runs comfortably on a Pi Zero and handles thousands of messages per second on a Pi 4. One of your Pis becomes the broker; every other device publishes or subscribes to it.

# Install Mosquitto broker and command-line clients
sudo apt update
sudo apt install -y mosquitto mosquitto-clients

# Enable the broker to start on boot
sudo systemctl enable mosquitto

# Verify it's running
sudo systemctl status mosquitto

By default, Mosquitto listens on port 1883 and accepts anonymous connections from localhost only. For a local network deployment, you need to allow remote connections:

# /etc/mosquitto/conf.d/local.conf
listener 1883 0.0.0.0
allow_anonymous true
# Restart Mosquitto to apply the config
sudo systemctl restart mosquitto
Anonymous access is for prototyping only

allow_anonymous true means any device on your network can publish any message to any topic. For a home lab or development environment, this is fine. For anything exposed beyond your local network, configure username/password authentication or TLS client certificates. I'll show authentication later in this chapter.

Test the broker with the command-line tools before writing any Python:

# Terminal 1: Subscribe to a topic
mosquitto_sub -h localhost -t "test/greeting"

# Terminal 2: Publish a message
mosquitto_pub -h localhost -t "test/greeting" -m "Hello from the broker"

The subscriber receives the message instantly. No polling. No HTTP overhead. The connection is persistent — the subscriber stays connected and receives every subsequent message published to that topic.

The paho-mqtt Python Library: Publishing Sensor Data

The paho-mqtt library is the standard Python MQTT client. It handles connection management, reconnection on failure, message queuing, and all three QoS levels. Install it in a virtual environment on every Pi that needs to publish or subscribe:

python3 -m venv ~/mqtt-env
source ~/mqtt-env/bin/activate
pip install paho-mqtt

Here is a complete publisher that reads a DHT22 temperature and humidity sensor and publishes readings every 10 seconds. If you don't have a DHT22, the code includes a simulated fallback:

#!/usr/bin/env python3
"""sensor_publisher.py — publishes temperature and humidity to MQTT."""

import json
import time
import random
import paho.mqtt.client as mqtt

# ── Configuration ───────────────────────────────────────────────────
BROKER_HOST = "pi-broker.local"   # hostname of your MQTT broker Pi
BROKER_PORT = 1883
DEVICE_ID = "living-room-01"
PUBLISH_INTERVAL = 10             # seconds between readings

# ── Try to import real sensor library; fall back to simulation ──────
try:
    import adafruit_dht
    import board
    sensor = adafruit_dht.DHT22(board.D4)
    USE_REAL_SENSOR = True
except ImportError:
    USE_REAL_SENSOR = False
    print("DHT library not found — using simulated sensor data")


def read_sensor():
    """Return a dict with temperature_c, temperature_f, and humidity."""
    if USE_REAL_SENSOR:
        temperature_c = sensor.temperature
        humidity = sensor.humidity
    else:
        # Simulated readings with realistic drift
        temperature_c = 22.0 + random.uniform(-2.0, 2.0)
        humidity = 45.0 + random.uniform(-5.0, 5.0)

    return {
        "device_id": DEVICE_ID,
        "temperature_c": round(temperature_c, 1),
        "temperature_f": round(temperature_c * 9.0 / 5.0 + 32.0, 1),
        "humidity": round(humidity, 1),
        "timestamp": time.time(),
    }


def on_connect(client, userdata, flags, reason_code, properties):
    """Called when the client connects to the broker."""
    if reason_code == 0:
        print(f"Connected to MQTT broker at {BROKER_HOST}")
    else:
        print(f"Connection failed: {reason_code}")


def main():
    client = mqtt.Client(
        callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
        client_id=f"sensor-{DEVICE_ID}",
    )
    client.on_connect = on_connect

    # Last Will and Testament — broker publishes this if we disconnect
    client.will_set(
        topic=f"home/{DEVICE_ID}/status",
        payload="offline",
        qos=1,
        retain=True,
    )

    client.connect(BROKER_HOST, BROKER_PORT, keepalive=60)
    client.loop_start()

    # Announce that we're online
    client.publish(f"home/{DEVICE_ID}/status", "online", qos=1, retain=True)

    try:
        while True:
            reading = read_sensor()
            payload = json.dumps(reading)

            # Publish to a structured topic
            client.publish(
                topic=f"home/{DEVICE_ID}/sensors",
                payload=payload,
                qos=1,
            )
            print(f"Published: {payload}")
            time.sleep(PUBLISH_INTERVAL)

    except KeyboardInterrupt:
        print("Shutting down...")
        client.publish(f"home/{DEVICE_ID}/status", "offline", qos=1, retain=True)
        client.loop_stop()
        client.disconnect()


if __name__ == "__main__":
    main()

Notice three design decisions in this publisher. First, it uses JSON payloads — structured, parseable, and self-describing. Every message includes a device_id and timestamp so the subscriber never has to guess where a reading came from or when it was taken. Second, it sets a Last Will and Testament message. Third, it publishes to a hierarchical topic. All three of these matter at scale.

Topic Hierarchy: The Namespace That Scales

MQTT topics are slash-separated strings, like filesystem paths. The topic hierarchy you choose on day one determines whether your network scales cleanly or becomes an unmaintainable mess.

# Bad: flat topics with no structure
temperature
humidity
living-room-temp
kitchen-temp

# Good: hierarchical topics with consistent structure
home/living-room-01/sensors      # all sensor data from this device
home/living-room-01/status       # online/offline status
home/kitchen-01/sensors
home/kitchen-01/status
home/+/sensors                   # wildcard: all sensor data from any device
home/#                           # wildcard: everything under home/

The + wildcard matches exactly one level. home/+/sensors matches home/living-room-01/sensors and home/kitchen-01/sensors but not home/living-room-01/extra/sensors. The # wildcard matches everything below a level. home/# matches every topic that starts with home/.

Design your topic hierarchy like you design a database schema — get it right on day one, because migrating later means updating every publisher and subscriber simultaneously.

I use a three-level convention: {location}/{device-id}/{data-type}. Location is the physical area (home, warehouse, office-3rd-floor). Device ID is a unique identifier per physical board. Data type is what's being reported (sensors, status, commands). This convention means a dashboard that monitors an entire building subscribes to warehouse/+/sensors and instantly receives data from every device in the building, regardless of how many there are or when they were added.

Subscribing and Reacting: The Dashboard Side

Here is a complete subscriber that receives sensor data, logs it, and triggers an alert when temperature exceeds a threshold:

#!/usr/bin/env python3
"""sensor_subscriber.py — receives MQTT sensor data and reacts."""

import json
import paho.mqtt.client as mqtt

BROKER_HOST = "pi-broker.local"
BROKER_PORT = 1883
TEMP_ALERT_THRESHOLD = 30.0  # Celsius


def on_connect(client, userdata, flags, reason_code, properties):
    if reason_code == 0:
        print("Connected to broker — subscribing to topics")
        # Subscribe to all sensor data and all status messages
        client.subscribe("home/+/sensors", qos=1)
        client.subscribe("home/+/status", qos=1)
    else:
        print(f"Connection failed: {reason_code}")


def on_message(client, userdata, msg):
    topic = msg.topic
    payload = msg.payload.decode("utf-8")

    # Handle status messages
    if topic.endswith("/status"):
        device = topic.split("/")[1]
        print(f"[STATUS] {device} is now {payload}")
        return

    # Handle sensor readings
    try:
        data = json.loads(payload)
    except json.JSONDecodeError:
        print(f"[ERROR] Invalid JSON on {topic}: {payload}")
        return

    device = data.get("device_id", "unknown")
    temp_c = data.get("temperature_c", 0)
    humidity = data.get("humidity", 0)

    print(f"[SENSOR] {device}: {temp_c}°C, {humidity}% humidity")

    # Alert on high temperature
    if temp_c > TEMP_ALERT_THRESHOLD:
        print(f"[ALERT] {device} temperature {temp_c}°C exceeds "
              f"threshold {TEMP_ALERT_THRESHOLD}°C!")
        # In production: send a webhook, email, or push notification here


def main():
    client = mqtt.Client(
        callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
        client_id="dashboard-main",
    )
    client.on_connect = on_connect
    client.on_message = on_message

    client.connect(BROKER_HOST, BROKER_PORT, keepalive=60)

    # Blocking loop — runs forever, dispatching callbacks
    print(f"Listening for sensor data from {BROKER_HOST}...")
    client.loop_forever()


if __name__ == "__main__":
    main()

The subscriber does its work in callbacks. on_connect fires when the connection to the broker is established (or re-established after a dropout) — this is where you subscribe to topics. on_message fires for every incoming message on any subscribed topic. The loop_forever() call blocks and handles network I/O, reconnection, and callback dispatching.

Subscribe in on_connect, not in main()

Always place your client.subscribe() calls inside the on_connect callback, not after client.connect(). If the network drops and the client reconnects, on_connect fires again and re-subscribes automatically. If you subscribe only in main(), a reconnection leaves you connected but receiving nothing.

Last Will, Retained Messages, and QoS

Three MQTT features separate a toy implementation from a production one.

Quality of Service (QoS) controls delivery guarantees. QoS 0 is fire-and-forget — the broker delivers the message at most once, with no acknowledgment. QoS 1 guarantees at-least-once delivery — the broker stores the message and retries until the subscriber acknowledges. QoS 2 guarantees exactly-once delivery — a four-step handshake ensures no duplicates. I use QoS 1 for almost everything. QoS 0 is fine for high-frequency telemetry where a missed reading doesn't matter. QoS 2 is rarely worth the overhead outside of financial or medical systems.

Retained messages solve the cold-start problem. When a subscriber connects for the first time, it knows nothing — no sensor readings, no device statuses. With retained messages, the broker stores the last message published to each topic with the retain=True flag. When a new subscriber connects, the broker immediately sends the retained message. The subscriber gets the current state without waiting for the next publish cycle.

Last Will and Testament (LWT) solves the disconnect-detection problem. When a client connects, it registers a "will" message with the broker — a message the broker publishes on the client's behalf if the client disconnects unexpectedly (network failure, power loss, crash). In the publisher code above, the will message sets the device status to "offline." The subscriber sees the status change and knows the device is down — without polling, without health checks, without timeouts.

# The publisher set this on connection:
client.will_set(
    topic="home/living-room-01/status",
    payload="offline",
    qos=1,
    retain=True,   # retained so new subscribers see the offline state
)

# And publishes "online" on successful connection:
client.publish("home/living-room-01/status", "online", qos=1, retain=True)
Key takeaway

QoS 1 for sensor data (at-least-once), retained messages for current state (so new subscribers don't start blind), and LWT for disconnect detection (so you know when a device dies). These three features together give you a monitoring system that requires zero active polling.

Bridging Multiple Pis Through a Central Broker

The architecture for a multi-Pi sensor network is simpler than most engineers expect. One Pi runs the Mosquitto broker. Every other Pi runs a publisher, a subscriber, or both. The broker is the star of a star topology — every device connects to it, and the broker handles all message routing.

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│ Sensor Pi 1 │────▶│             │     │ Dashboard   │
│ (publisher) │     │  Broker Pi  │────▶│ Pi          │
└─────────────┘     │ (Mosquitto) │     │ (subscriber)│
                    │             │     └─────────────┘
┌─────────────┐     │             │     ┌─────────────┐
│ Sensor Pi 2 │────▶│             │────▶│ Alert       │
│ (publisher) │     │             │     │ Service     │
└─────────────┘     └─────────────┘     │ (subscriber)│
                                        └─────────────┘

For a small deployment (under 50 devices), a single Pi 4 as the broker handles everything. Mosquitto on a Pi 4 can sustain over 10,000 messages per second — far more than a typical sensor network generates. The bottleneck is never the broker. It's the network, the SD card I/O on the sensor Pis, or the subscriber's processing speed.

For production, add authentication to the broker:

# Create a password file
sudo mosquitto_passwd -c /etc/mosquitto/passwd sensor-user
# (enter password when prompted)

# Add more users
sudo mosquitto_passwd /etc/mosquitto/passwd dashboard-user
# /etc/mosquitto/conf.d/auth.conf
listener 1883 0.0.0.0
allow_anonymous false
password_file /etc/mosquitto/passwd
sudo systemctl restart mosquitto

Then update your Python clients to include credentials:

client.username_pw_set("sensor-user", "your-secure-password")
client.connect(BROKER_HOST, BROKER_PORT, keepalive=60)

The broker is the star of the topology. Every device connects to it. Adding a sensor or a dashboard is a one-device change — the rest of the network doesn't know or care.

What to Do Monday Morning

Install Mosquitto on one Pi

sudo apt install -y mosquitto mosquitto-clients. Enable it with sudo systemctl enable mosquitto. Test with mosquitto_sub and mosquitto_pub in two terminal windows. Confirm messages arrive instantly.

Run the publisher on a second Pi (or the same Pi)

Copy sensor_publisher.py to a Pi, install paho-mqtt in a virtualenv, and run it. Watch messages appear in a mosquitto_sub terminal on the broker Pi. If you don't have a DHT22 sensor, the simulated mode works identically.

Run the subscriber and verify the full loop

Copy sensor_subscriber.py to your dashboard Pi. Run it. Confirm you see sensor readings and status messages. Kill the publisher process and verify the LWT "offline" message arrives on the subscriber.

Design your topic hierarchy on paper

Before adding more devices, sketch your topic hierarchy: {location}/{device-id}/{data-type}. Write it down. Every future device follows this convention. Changing it later means updating every publisher and subscriber.

Add authentication to the broker

Create a password file with mosquitto_passwd, update the Mosquitto config to allow_anonymous false, and update your Python clients with username_pw_set(). Restart Mosquitto and verify that unauthenticated connections are rejected.