Message Fabric

MQTT on Pi4
Health Telemetry to PostgreSQL

This guide adds a serious MQTT stack to your server. You will run Mosquitto on 10.10.10.250, publish hardware and system health from Pi4, consume telemetry into PostgreSQL, and operate the pipeline with security and resilience controls.

BrokerMosquitto
PublisherPi4 health agent
ConsumerPython SQL ingestor
StoragePostgreSQL
Network10.10.10.0/24
Engineering Goal

MQTT is not just pub/sub syntax. Treat it as a reliable edge event bus with contracts, security boundaries, and operational checks.

Lesson 0

Create Folders, Files, and First MQTT Run

Objective: start from an empty host and build a working MQTT workspace with reproducible run and test commands.

Learning Focus: This lesson removes setup assumptions so every later MQTT lesson starts from a known-good baseline with clear folder layout, executable files, and validation checks.

Before broker hardening or telemetry ingestion, you need a stable developer loop: create project directories, create code files, run them, and verify expected behavior. That loop is what prevents confusion later when failures appear, because you know whether the problem is setup, security policy, or application logic.

Create workspace directories

Keep code, logs, and service artifacts in explicit locations so operations and troubleshooting stay predictable.

bash
directory bootstrap
sudo mkdir -p /opt/mqtt/{app,bin,logs}
    sudo chown -R $USER:$USER /opt/mqtt
    cd /opt/mqtt/app
    pwd

Create a Python virtual environment and install packages

This keeps MQTT dependencies isolated from system Python packages.

bash
python setup
python3 -m venv /opt/mqtt/.venv
    source /opt/mqtt/.venv/bin/activate
    pip install --upgrade pip
    pip install paho-mqtt psutil psycopg[binary]
    pip freeze > /opt/mqtt/app/requirements.txt

Create your first publisher file

This is a minimal file-creation example that proves your tooling path before the larger health publisher in later lessons.

python
/opt/mqtt/app/hello_pub.py
import paho.mqtt.publish as publish

    publish.single(
        topic="test/hello",
        payload="hello_from_lesson0",
        hostname="127.0.0.1",
        port=1883,
    )

Run and test

Use one terminal as subscriber observer and another as publisher executor. This confirms file, runtime, and broker path in one loop.

bash
execution checks
# terminal A
    mosquitto_sub -h 127.0.0.1 -t test/hello -v

    # terminal B
    source /opt/mqtt/.venv/bin/activate
    python /opt/mqtt/app/hello_pub.py
    python -m py_compile /opt/mqtt/app/hello_pub.py

Checkpoint: subscriber receives hello_from_lesson0 and py_compile returns no syntax errors.

Lesson 1

Define MQTT System Architecture

Objective: define a contract-first telemetry flow before installing anything.

Learning Focus: This lesson builds practical engineering judgment, not just task completion. As you run each step, connect the action to runtime behavior, failure signals, and design trade-offs so you can adapt the pattern in real systems.

Teaching Lens

This lesson teaches separation of concerns so publishers, broker, and consumers can evolve independently.

This lesson also focuses on operational reasoning: what healthy behavior looks like, what failure signals look like, and how this step protects the reliability of the lessons that come next.

The critical engineering idea is that MQTT design starts with contracts, not scripts. When topic names and payload fields are defined first, your publisher can be rewritten in Rust later, your ingestor can move to another host, and your analytics code still works because the message contract remains stable.

In this lesson, you are learning to think in failure boundaries. A broker outage, a crashed publisher, and a slow SQL consumer are three different failures that should be visible and handled differently. That mindset is what turns MQTT from a demo transport into an operational telemetry bus.

Reference data flow

Use this blueprint to keep your implementation deterministic and testable.

Read this flow as a systems boundary map, not just a diagram. The publisher owns measurement timing and payload creation, the broker owns fan-out and delivery semantics, and the SQL ingestor owns persistence policy. When each boundary is explicit, debugging is faster because you can isolate faults by stage instead of treating MQTT as one opaque black box.

architecture
Pi4 Health Agent -> mqtt/pi4/health/json -> Mosquitto Broker
Pi4 Health Agent -> mqtt/pi4/health/lwt  -> Mosquitto Broker
SQL Ingestor    <- mqtt/pi4/health/#     <- Mosquitto Broker
SQL Ingestor    -> embedded_events table -> PostgreSQL

Checkpoint: all topic names and payload fields are agreed before coding.

Lesson 2

Install and Baseline Mosquitto Broker

Objective: deploy broker runtime with startup persistence and basic local test capability.

Learning Focus: This lesson builds practical engineering judgment, not just task completion. As you run each step, connect the action to runtime behavior, failure signals, and design trade-offs so you can adapt the pattern in real systems.

Run

This sequence installs Mosquitto, enables service startup, and verifies listener readiness on port 1883.

The purpose of this first run is to establish a known-good control plane before adding credentials, TLS, and custom ACLs. If you cannot prove baseline broker health now, later security configuration errors will become ambiguous and cost you time during troubleshooting.

bash
sudo apt-get update
sudo apt-get install -y mosquitto mosquitto-clients
sudo systemctl enable --now mosquitto
sudo systemctl status mosquitto --no-pager
ss -tulpn | grep 1883

Sanity publish and subscribe

Run subscriber first, then publish once from a second shell. This proves end-to-end broker function before security layering.

Starting the subscriber first teaches an important MQTT debugging habit: always observe the bus before injecting traffic. This lets you verify topic routing and payload visibility directly, so later failures are clearly tied to auth, ACL, or transport changes rather than core broker behavior.

bash
two terminals
mosquitto_sub -h 127.0.0.1 -t test/hello -v
mosquitto_pub -h 127.0.0.1 -t test/hello -m "broker_alive"

Checkpoint: subscriber receives the test payload with topic and value.

Lesson 3

Add User Authentication and Topic ACL

Objective: enforce identity and per-topic rights so random clients cannot publish to critical channels.

Learning Focus: This lesson builds practical engineering judgment, not just task completion. As you run each step, connect the action to runtime behavior, failure signals, and design trade-offs so you can adapt the pattern in real systems.

Create users and ACL policy

This setup creates separate identities for publisher and SQL consumer and limits each to minimal privileges.

This block teaches principle-of-least-privilege applied to messaging systems. A publisher identity should never consume unrelated topics, and a consumer identity should not publish into control topics. These constraints reduce accidental misuse and make malicious behavior easier to detect in logs.

bash + conf
/etc/mosquitto/conf.d/security.conf
sudo mosquitto_passwd -c /etc/mosquitto/passwd pi4_publisher
sudo mosquitto_passwd /etc/mosquitto/passwd sql_ingestor

sudo tee /etc/mosquitto/acl >/dev/null <<'EOF'
user pi4_publisher
topic write mqtt/pi4/health/#

user sql_ingestor
topic read mqtt/pi4/health/#
EOF

sudo tee /etc/mosquitto/conf.d/security.conf >/dev/null <<'EOF'
allow_anonymous false
password_file /etc/mosquitto/passwd
acl_file /etc/mosquitto/acl
listener 1883 0.0.0.0
EOF

sudo systemctl restart mosquitto
sudo systemctl status mosquitto --no-pager
Boundary Rule

Do not reuse one account for all components. Split credentials by function to reduce blast radius.

Checkpoint: unauthenticated publish fails; authenticated publish succeeds.

Lesson 4

Design Topic and Payload Contracts

Objective: standardize message semantics so consumers can parse without fragile assumptions.

Learning Focus: This lesson builds practical engineering judgment, not just task completion. As you run each step, connect the action to runtime behavior, failure signals, and design trade-offs so you can adapt the pattern in real systems.

Contract example

Use stable fields and explicit units. Keep values machine-oriented, not prose strings.

Think of this payload as an API contract, not a casual message. Field names, units, and cardinality decisions here determine whether downstream SQL, dashboards, and alert rules remain stable as your implementation evolves from Python to Rust and from single-node to multi-node publishing.

json contract
Topic: mqtt/pi4/health/json
Payload:
{
  "host": "pi-dns-core",
  "ip": "10.10.10.250",
  "cpu_pct": 17.2,
  "mem_pct": 43.8,
  "temp_c": 52.4,
  "disk_root_pct": 61.1,
  "services": {
    "bind9": "active",
    "postgresql": "active",
    "nginx": "active",
    "freeradius": "active"
  },
  "ts": "2026-05-12T12:34:56Z"
}

This lesson teaches contract stability over ad hoc payload changes.

You verify every publisher message includes required fields and timestamp.

Checkpoint: payload schema is fixed and documented for all producers and consumers.

Lesson 5

Build Pi4 Health Publisher

Objective: publish real host health at intervals and emit Last Will status on disconnect.

Learning Focus: This lesson builds practical engineering judgment, not just task completion. As you run each step, connect the action to runtime behavior, failure signals, and design trade-offs so you can adapt the pattern in real systems.

Create publisher agent

This script reads system metrics, service states, and pushes JSON to MQTT every 10 seconds.

Read this agent as a periodic sampling pipeline. It collects host state, normalizes values into a single JSON contract, and emits to one telemetry topic at fixed cadence. The stable cadence matters because SQL queries and alert windows depend on predictable sample intervals.

python
/opt/mqtt/pi4_health_pub.py
import json
import socket
import time
import subprocess
from datetime import datetime, timezone

import psutil
import paho.mqtt.client as mqtt

BROKER = "127.0.0.1"
PORT = 1883
USER = "pi4_publisher"
PASS = "ChangePublisherPassword!"
TOPIC = "mqtt/pi4/health/json"
LWT_TOPIC = "mqtt/pi4/health/lwt"
SERVICES = ["bind9", "postgresql", "nginx", "freeradius"]


def svc_state(name: str) -> str:
    rc = subprocess.run(["systemctl", "is-active", "--quiet", name]).returncode
    return "active" if rc == 0 else "inactive"


client = mqtt.Client(client_id="pi4-health-publisher")
client.username_pw_set(USER, PASS)
client.will_set(LWT_TOPIC, payload="offline", qos=1, retain=True)
client.connect(BROKER, PORT, keepalive=30)
client.loop_start()
client.publish(LWT_TOPIC, payload="online", qos=1, retain=True)

while True:
    payload = {
        "host": socket.gethostname(),
        "ip": "10.10.10.250",
        "cpu_pct": psutil.cpu_percent(interval=0.5),
        "mem_pct": psutil.virtual_memory().percent,
        "temp_c": psutil.sensors_temperatures().get("cpu_thermal", [{}])[0].get("current", 0.0),
        "disk_root_pct": psutil.disk_usage("/").percent,
        "services": {s: svc_state(s) for s in SERVICES},
        "ts": datetime.now(timezone.utc).isoformat()
    }
    client.publish(TOPIC, payload=json.dumps(payload), qos=1, retain=False)
    time.sleep(10)

Run as service

Service wrapper ensures restart behavior and predictable startup order.

Systemd conversion is a teaching step, not just packaging. It moves the publisher from an interactive script into an operable service with restart policy, dependency ordering, and journal visibility. That transition is what makes telemetry trustworthy after reboot and unattended failures.

bash + systemd
sudo apt-get install -y python3-psutil python3-pip
python3 -m pip install --break-system-packages paho-mqtt
sudo mkdir -p /opt/mqtt
sudo tee /etc/systemd/system/pi4-health-pub.service >/dev/null <<'EOF'
[Unit]
Description=Pi4 MQTT health publisher
After=network-online.target mosquitto.service

[Service]
ExecStart=/usr/bin/python3 /opt/mqtt/pi4_health_pub.py
Restart=always
RestartSec=2
User=pi
Group=pi

[Install]
WantedBy=multi-user.target
EOF

sudo systemctl daemon-reload
sudo systemctl enable --now pi4-health-pub
sudo systemctl status pi4-health-pub --no-pager

Checkpoint: live health JSON messages appear on the broker topic every 10 seconds.

Lesson 6

Consume MQTT into PostgreSQL

Objective: persist MQTT health messages in SQL for analytics and alerting.

Learning Focus: This lesson builds practical engineering judgment, not just task completion. As you run each step, connect the action to runtime behavior, failure signals, and design trade-offs so you can adapt the pattern in real systems.

Create SQL table and ingestor

This table stores broker metadata, topic, and payload for replay and debugging. Ingestor subscribes to wildcard health topics and inserts rows.

The key design decision here is raw payload preservation. By storing original JSON in JSONB, you keep forward compatibility when publishers add new keys. This avoids brittle migrations every time telemetry evolves and lets you reprocess historical messages with improved analytics logic later.

sql + python
/opt/mqtt/mqtt_sql_ingestor.py
psql -U dns_user -d dns_analytics -h localhost <<'EOF'
CREATE TABLE IF NOT EXISTS mqtt_messages (
  id BIGSERIAL PRIMARY KEY,
  ts TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  topic TEXT NOT NULL,
  qos SMALLINT NOT NULL,
  payload JSONB NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_mqtt_messages_ts ON mqtt_messages (ts DESC);
CREATE INDEX IF NOT EXISTS idx_mqtt_messages_topic ON mqtt_messages (topic);
EOF

cat <<'EOF' > /opt/mqtt/mqtt_sql_ingestor.py
import json
import psycopg
import paho.mqtt.client as mqtt

MQTT_USER = "sql_ingestor"
MQTT_PASS = "ChangeIngestorPassword!"

conn = psycopg.connect("host=localhost dbname=dns_analytics user=dns_user password=ChangeThisPassword!")


def on_connect(client, userdata, flags, rc):
    client.subscribe("mqtt/pi4/health/#", qos=1)


def on_message(client, userdata, msg):
    body = msg.payload.decode("utf-8")
    try:
        payload = json.loads(body)
    except json.JSONDecodeError:
        payload = {"raw": body}

    with conn, conn.cursor() as cur:
        cur.execute(
            "INSERT INTO mqtt_messages(topic, qos, payload) VALUES (%s, %s, %s::jsonb)",
            (msg.topic, msg.qos, json.dumps(payload)),
        )


client = mqtt.Client(client_id="sql-mqtt-ingestor")
client.username_pw_set(MQTT_USER, MQTT_PASS)
client.on_connect = on_connect
client.on_message = on_message
client.connect("127.0.0.1", 1883, 30)
client.loop_forever()
EOF

python3 /opt/mqtt/mqtt_sql_ingestor.py

Checkpoint: new rows appear in mqtt_messages while publisher is running.

Lesson 7

Tune Retain, QoS, and Last Will Behavior

Objective: choose delivery semantics deliberately instead of defaulting everything.

Learning Focus: This lesson builds practical engineering judgment, not just task completion. As you run each step, connect the action to runtime behavior, failure signals, and design trade-offs so you can adapt the pattern in real systems.

Teaching Lens

This lesson teaches where message durability helps and where it creates noise.

This lesson also focuses on operational reasoning: what healthy behavior looks like, what failure signals look like, and how this step protects the reliability of the lessons that come next.

Durability settings are architecture decisions, not defaults to copy. QoS 1 is appropriate for health state transitions because duplicates are acceptable but loss is not. For fast telemetry streams, retain can backfire by replaying stale data to new subscribers and creating false operational conclusions.

The deeper lesson is semantic clarity. A retained Last Will topic communicates current node presence, while non-retained periodic metrics communicate live time-series behavior. Keeping those roles separate prevents dashboards and automation from mixing historical snapshots with real-time state.

Recommended profile

Apply this profile to avoid stale telemetry floods and still keep reliable state transitions.

The configuration below encodes intent. Telemetry streams are transient observations and should not be retained, while liveness state must be immediately visible to late subscribers. Keeping those semantics separate prevents false positives in monitoring and prevents stale state from contaminating real-time decision logic.

policy
mqtt/pi4/health/json  -> qos=1, retain=false
mqtt/pi4/health/lwt   -> qos=1, retain=true
mqtt/pi4/alerts/#     -> qos=1, retain=true for latest alert state
mqtt/pi4/debug/#      -> qos=0, retain=false

Checkpoint: subscriber restart receives current LWT state immediately and only fresh telemetry onward.

Lesson 8

Enable TLS for Broker Clients

Objective: protect credentials and payload integrity when broker traffic leaves localhost paths.

Learning Focus: This lesson builds practical engineering judgment, not just task completion. As you run each step, connect the action to runtime behavior, failure signals, and design trade-offs so you can adapt the pattern in real systems.

Create local CA and broker certificate

This creates private PKI material for your home network and configures a TLS listener on 8883.

Do this step methodically because TLS failures often look like generic connection failures at the client side. By generating a local CA and broker certificate explicitly, you gain a trust chain you control and can rotate, rather than relying on insecure plaintext credentials for non-local traffic.

bash + conf
/etc/mosquitto/conf.d/tls.conf
sudo mkdir -p /etc/mosquitto/certs
cd /etc/mosquitto/certs
sudo openssl genrsa -out ca.key 4096
sudo openssl req -x509 -new -nodes -key ca.key -sha256 -days 3650 -out ca.crt -subj "/CN=codeandcore-mqtt-ca"
sudo openssl genrsa -out broker.key 2048
sudo openssl req -new -key broker.key -out broker.csr -subj "/CN=10.10.10.250"
sudo openssl x509 -req -in broker.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out broker.crt -days 825 -sha256

sudo tee /etc/mosquitto/conf.d/tls.conf >/dev/null <<'EOF'
listener 8883 0.0.0.0
cafile /etc/mosquitto/certs/ca.crt
certfile /etc/mosquitto/certs/broker.crt
keyfile /etc/mosquitto/certs/broker.key
require_certificate false
allow_anonymous false
EOF

sudo chown mosquitto:mosquitto /etc/mosquitto/certs/broker.key
sudo chmod 640 /etc/mosquitto/certs/broker.key
sudo systemctl restart mosquitto
Migration Plan

Keep 1883 only for localhost clients during migration. Move remote clients to 8883 then close 1883 on LAN.

Checkpoint: authenticated TLS clients connect successfully on port 8883.

Lesson 9

Operate MQTT with Real Observability

Objective: create an operator runbook for outages, stalled consumers, and schema drift.

Learning Focus: This lesson builds practical engineering judgment, not just task completion. As you run each step, connect the action to runtime behavior, failure signals, and design trade-offs so you can adapt the pattern in real systems.

Runbook commands

These checks identify whether failures come from publisher, broker, consumer, or database layers.

Use this block as layered diagnosis, not random command sampling. Read from service health to bus visibility to database persistence in that order. This sequence narrows fault scope quickly and keeps incident response deterministic when telemetry appears to stall.

bash + sql
sudo systemctl status mosquitto --no-pager
sudo journalctl -u mosquitto -n 80 --no-pager
sudo systemctl status pi4-health-pub --no-pager
sudo systemctl status mqtt-sql-ingestor --no-pager
mosquitto_sub -h 127.0.0.1 -u sql_ingestor -P 'ChangeIngestorPassword!' -t mqtt/pi4/health/# -C 3 -v
psql -U dns_user -d dns_analytics -h localhost -c "SELECT topic, ts FROM mqtt_messages ORDER BY id DESC LIMIT 10;"

Create ingestor service

Run your SQL consumer as a systemd service so it auto-recovers from network or database interruptions.

Service-managing the ingestor closes the reliability loop. Without this step, broker uptime can be perfect while data silently stops landing in SQL after a transient exception. A managed unit with restart policy ensures ingestion continuity and makes failures visible through standard host observability tools.

systemd
/etc/systemd/system/mqtt-sql-ingestor.service
sudo tee /etc/systemd/system/mqtt-sql-ingestor.service >/dev/null <<'EOF'
[Unit]
Description=MQTT to PostgreSQL ingestor
After=network-online.target mosquitto.service postgresql.service

[Service]
ExecStart=/usr/bin/python3 /opt/mqtt/mqtt_sql_ingestor.py
Restart=always
RestartSec=2
User=pi
Group=pi

[Install]
WantedBy=multi-user.target
EOF

sudo systemctl daemon-reload
sudo systemctl enable --now mqtt-sql-ingestor
sudo systemctl status mqtt-sql-ingestor --no-pager

Checkpoint: ingestor survives restart and rows keep landing during publisher runtime.

Lesson 10

Final Validation: End-to-End MQTT Health Pipeline

Objective: prove that publish, broker routing, SQL ingestion, and outage signaling all work together.

Learning Focus: This lesson builds practical engineering judgment, not just task completion. As you run each step, connect the action to runtime behavior, failure signals, and design trade-offs so you can adapt the pattern in real systems.

Execute final test sequence

This final lesson is intentionally operational. You are verifying both the data plane and the failure plane. The data plane proves that health messages move from publisher to broker to SQL. The failure plane proves that when the publisher dies, the system emits an offline signal instead of silently failing.

Run the commands as a single narrative sequence. Watch live topic output first, then validate SQL persistence, then simulate failure and recovery. This sequence teaches you how to diagnose real outages with evidence instead of assumptions.

validation
sudo systemctl status mosquitto --no-pager
    sudo systemctl status pi4-health-pub --no-pager
    sudo systemctl status mqtt-sql-ingestor --no-pager

    mosquitto_sub -h 127.0.0.1 -u sql_ingestor -P 'ChangeIngestorPassword!' -t mqtt/pi4/health/# -C 5 -v
    psql -U dns_user -d dns_analytics -h localhost -c "SELECT topic, ts, payload->>'host' AS host FROM mqtt_messages ORDER BY id DESC LIMIT 10;"

    sudo systemctl stop pi4-health-pub
    mosquitto_sub -h 127.0.0.1 -u sql_ingestor -P 'ChangeIngestorPassword!' -t mqtt/pi4/health/lwt -C 1 -v

    sudo systemctl start pi4-health-pub
    mosquitto_sub -h 127.0.0.1 -u sql_ingestor -P 'ChangeIngestorPassword!' -t mqtt/pi4/health/lwt -C 1 -v
    psql -U dns_user -d dns_analytics -h localhost -c "SELECT topic, ts FROM mqtt_messages ORDER BY id DESC LIMIT 10;"
Completion Standard

You are complete when Pi4 health JSON is continuously published to MQTT, consumed into PostgreSQL, and Last Will state transitions are visible during failures.

Lesson 11

Build a Rust MQTT Health Publisher

Objective: replace script-level publishing with a compiled Rust publisher that emits structured health telemetry.

Learning Focus: This lesson builds practical engineering judgment, not just task completion. As you run each step, connect the action to runtime behavior, failure signals, and design trade-offs so you can adapt the pattern in real systems.

Teaching Lens

This lesson teaches Rust crate composition for MQTT, host metrics, and JSON payload contracts.

This lesson also focuses on operational reasoning: what healthy behavior looks like, what failure signals look like, and how this step protects the reliability of the lessons that come next.

The teaching focus here is architecture continuity: language can change while interfaces remain stable. You are moving from a Python publisher to a Rust publisher without changing topics or payload keys, which is exactly how production migrations should work.

You are also learning reliability-by-design in Rust. The publisher loop, Last Will semantics, and serialization path are explicit in code, so you can reason about behavior under load and failure instead of relying on implicit framework behavior.

Create Rust project and dependencies

Use this once to scaffold a dedicated publisher binary under your edge workspace.

This stage is a controlled migration pattern. You create a separate Rust binary with explicit dependencies so you can test parity with the existing publisher before cutover. Building in parallel preserves operational stability while you gain compiled performance and stronger type guarantees.

bash
source $HOME/.cargo/env
cargo new --bin /opt/mqtt/rust-health-publisher
cd /opt/mqtt/rust-health-publisher
cargo add rumqttc serde serde_json anyhow chrono sysinfo

Implement publisher

This implementation publishes health JSON to mqtt/pi4/health/json and uses LWT on mqtt/pi4/health/lwt. Keep the topic contract unchanged so your current consumer keeps working.

Read the Rust implementation as an explicit reliability model. Connection handling, keepalive, Last Will, serialization, and publish cadence are all directly visible in code. That transparency is one reason Rust works well in edge systems where hidden runtime behavior can be operationally expensive.

rust
/opt/mqtt/rust-health-publisher/src/main.rs
use anyhow::Result;
use chrono::Utc;
use rumqttc::{Client, LastWill, MqttOptions, QoS};
use serde::Serialize;
use std::thread;
use std::time::Duration;
use sysinfo::{Disks, System};

#[derive(Serialize)]
struct HealthPayload {
    host: String,
    ip: String,
    cpu_pct: f32,
    mem_pct: f32,
    temp_c: f32,
    disk_root_pct: f32,
    ts: String,
}

fn main() -> Result<()> {
    let mut mqttoptions = MqttOptions::new("pi4-rust-health-publisher", "127.0.0.1", 1883);
    mqttoptions.set_keep_alive(Duration::from_secs(30));
    mqttoptions.set_credentials("pi4_publisher", "ChangePublisherPassword!");
    mqttoptions.set_last_will(LastWill::new(
        "mqtt/pi4/health/lwt",
        "offline",
        QoS::AtLeastOnce,
        true,
    ));

    let (mut client, mut connection) = Client::new(mqttoptions, 10);
    thread::spawn(move || for _ in connection.iter() {});

    client.publish("mqtt/pi4/health/lwt", QoS::AtLeastOnce, true, "online")?;

    loop {
        let mut sys = System::new_all();
        sys.refresh_all();
        let disks = Disks::new_with_refreshed_list();

        let cpu_pct = sys.global_cpu_usage();
        let mem_pct = if sys.total_memory() == 0 {
            0.0
        } else {
            (sys.used_memory() as f32 / sys.total_memory() as f32) * 100.0
        };

        let mut disk_root_pct = 0.0;
        for d in disks.list() {
            if d.mount_point().to_string_lossy() == "/" {
                let total = d.total_space() as f32;
                let avail = d.available_space() as f32;
                if total > 0.0 {
                    disk_root_pct = ((total - avail) / total) * 100.0;
                }
            }
        }

        let payload = HealthPayload {
            host: "pi-dns-core".to_string(),
            ip: "10.10.10.250".to_string(),
            cpu_pct,
            mem_pct,
            temp_c: 0.0,
            disk_root_pct,
            ts: Utc::now().to_rfc3339(),
        };

        let body = serde_json::to_string(&payload)?;
        client.publish("mqtt/pi4/health/json", QoS::AtLeastOnce, false, body)?;
        thread::sleep(Duration::from_secs(10));
    }
}

Compile and smoke test

Build in release mode and watch the broker topic directly to confirm payloads and cadence.

This smoke test validates behavior rather than just compilation. You are confirming that runtime publish cadence, authentication, and payload shape are all correct under the real broker path. If this test passes, migration risk is significantly reduced before service cutover.

bash
cd /opt/mqtt/rust-health-publisher
cargo build --release
./target/release/rust-health-publisher
mosquitto_sub -h 127.0.0.1 -u sql_ingestor -P 'ChangeIngestorPassword!' -t mqtt/pi4/health/json -C 2 -v

Checkpoint: Rust publisher sends valid JSON messages at expected intervals and existing ingestor can parse them.

Lesson 12

Run Rust Publisher as a Managed Service

Objective: operate the Rust publisher as a production daemon with restart behavior and log visibility.

Learning Focus: This lesson builds practical engineering judgment, not just task completion. As you run each step, connect the action to runtime behavior, failure signals, and design trade-offs so you can adapt the pattern in real systems.

Install and service-wrap

This wraps your compiled binary in systemd and lets you phase out the Python publisher when ready.

Cutover should be deliberate: install binary, register service, disable old publisher, then verify continuity at both topic and SQL layers. Treat this as an operational migration with rollback awareness, not a simple replacement command sequence.

bash + systemd
/etc/systemd/system/pi4-rust-health-pub.service
cd /opt/mqtt/rust-health-publisher
cargo build --release
sudo install -m 0755 target/release/rust-health-publisher /usr/local/bin/pi4-rust-health-publisher

sudo tee /etc/systemd/system/pi4-rust-health-pub.service >/dev/null <<'EOF'
[Unit]
Description=Pi4 Rust MQTT health publisher
After=network-online.target mosquitto.service

[Service]
ExecStart=/usr/local/bin/pi4-rust-health-publisher
Restart=always
RestartSec=2
User=pi
Group=pi
NoNewPrivileges=true

[Install]
WantedBy=multi-user.target
EOF

sudo systemctl daemon-reload
sudo systemctl disable --now pi4-health-pub
sudo systemctl enable --now pi4-rust-health-pub
sudo systemctl status pi4-rust-health-pub --no-pager
sudo journalctl -u pi4-rust-health-pub -n 60 --no-pager

You verify service restarts automatically, publishes after reboot, and keeps SQL ingestion flowing.

Checkpoint: Rust daemon is the active publisher in production and Python publisher is retired cleanly.