Example Code for MicroPython-MQTT Data Reporting

Last revision 2026/01/22

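This example demonstrates how the ESP32-P4 connects to Wi-Fi and periodically publishes sensor data to an MQTT broker. (Before use, edit the settings in the "User Configurable Section" near the top of the script: Wi-Fi credentials, ADC pin, broker address, port, and login.)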

Hardware Preparation

Software Preparation

Sample Code

#!/usr/bin/env python3
# main.py
import json
import network
import time
import random
from machine import ADC, Pin
from umqtt.simple import MQTTClient

# ========== User Configurable Section ==========
# Wi-Fi credentials: network name and password.
WIFI_SSID = "xxx"
WIFI_PASS = "xxx"

# GPIO used as the ADC input for the temperature sensor (ADC0 on the ESP32-P4).
ADC_PIN = 16

# MQTT broker address and port. The standard MQTT ports are 1883
# (unencrypted) and 8883 (TLS); change the value below to match your broker.
MQTT_SERVER = "192.168.31.160"
MQTT_PORT = 32768

# Client ID with a random 8-bit suffix (0-255) so that several boards are
# less likely to register under the same ID on one broker.
MQTT_CLIENT_ID = f"micropython-client-{random.getrandbits(8)}"

# Broker login; only relevant when the broker has authentication enabled.
MQTT_USER = "xxx"
MQTT_PASS = "xxx"

# Topic the readings are published to, and the pause between two reports
# in seconds.
PUB_TOPIC = "esp/adc/temp"
PUB_INTERVAL = 2
# ===============================================

def wifi_connect(ssid, pwd):
    """Connect the ESP32-P4 to a WiFi network.

    Activates the station interface and, if it is not already connected,
    starts a connection attempt and polls once per second for up to
    20 seconds.

    Args:
        ssid: Name of the WiFi network to join.
        pwd: Password for that network.

    Raises:
        RuntimeError: If the interface is still not connected after the
            20-second wait.
    """
    sta_interface = network.WLAN(network.STA_IF)  # Station (client) mode
    sta_interface.active(True)                    # Enable WiFi
    if not sta_interface.isconnected():
        print("Connecting to Wi-Fi...")
        sta_interface.connect(ssid, pwd)          # Attempt WiFi connection
        # Wait up to 20 seconds for connection
        for _ in range(20):
            if sta_interface.isconnected():
                break
            time.sleep(1)
    # Re-check after the wait: the loop also ends when the timeout expires,
    # and reporting success in that case would hide the real failure until
    # the first network call.
    if not sta_interface.isconnected():
        raise RuntimeError("Wi-Fi connection timed out")
    # Print connection status and network info (IP, subnet, gateway, DNS)
    print("Wi-Fi connected:", sta_interface.ifconfig())

# --------------- Temperature Conversion ---------------
def read_temperature():
    """
    Sample the sensor on ADC_PIN once and return the temperature in °C.

    The conversion is linear and uses fixed constants:
    - the raw reading is scaled as a 12-bit value (0–4095) against 3.3 V;
    - the sensor is assumed to output 10 mV/°C with 1.4 V at 0 °C
      (adjust the 1.4 V offset to match your sensor's datasheet).

    Returns:
        The temperature rounded to one decimal place.
    """
    sensor_pin = Pin(ADC_PIN)
    sensor = ADC(sensor_pin)
    # 11 dB attenuation: chosen so inputs up to the 3.3 V range can be read.
    sensor.atten(ADC.ATTN_11DB)

    counts = sensor.read()            # raw ADC counts
    volts = counts / 4095 * 3.3       # counts -> volts (3.3 V full scale)
    celsius = (volts - 1.4) * 100     # 10 mV per °C above the 1.4 V offset
    return round(celsius, 1)

# --------------- MQTT Connection ---------------
def mqtt_connect():
    """Create an MQTT client from the module-level settings and connect it.

    Returns:
        The connected MQTTClient instance.
    """
    # Collect the connection settings first so they can be read in one place.
    settings = {
        "client_id": MQTT_CLIENT_ID,
        "server": MQTT_SERVER,
        "port": MQTT_PORT,
        "user": MQTT_USER,
        "password": MQTT_PASS,
    }
    broker = MQTTClient(**settings)
    broker.connect()  # Open the session with the broker
    print(f"[MQTT] Connected to broker: {MQTT_SERVER}")
    return broker

def mqtt_publish(client, data: dict):
    """Send *data* as a JSON message on PUB_TOPIC and log what was sent.

    Args:
        client: Connected MQTT client used to publish.
        data: Dictionary to serialize with json.dumps.
    """
    topic = PUB_TOPIC
    payload = json.dumps(data)  # dict -> JSON text
    client.publish(topic, payload)
    print(f"[MQTT] Sent -> Topic: {PUB_TOPIC}, Payload: {payload}")

# --------------- Main Function ---------------
def main():
    """Connect to WiFi and the MQTT broker, then report temperature forever.

    Every PUB_INTERVAL seconds a reading is taken and published as a JSON
    object such as {"temperature": 25.3}. The loop only ends when an
    exception (including KeyboardInterrupt) propagates out of it; the MQTT
    session is then closed before the exception continues upward.
    """
    wifi_connect(WIFI_SSID, WIFI_PASS)  # Connect to WiFi first
    mqtt_client = mqtt_connect()        # Connect to MQTT broker
    try:
        # Continuously read temperature and publish data
        while True:
            current_temp = read_temperature()
            # Publish temperature as a JSON object (e.g., {"temperature": 25.3})
            mqtt_publish(mqtt_client, {"temperature": current_temp})
            time.sleep(PUB_INTERVAL)  # Wait for the next reporting cycle
    finally:
        # Release the broker connection on any exit path so the socket is
        # not left open after Ctrl-C or a publish/sensor error.
        try:
            mqtt_client.disconnect()
        except OSError:
            # Deliberate best effort: the link may already be broken, and
            # the original exception is the one worth reporting.
            pass
# Entry point: start the reporting loop only when this file is run as the
# main script, not when it is imported as a module.
if __name__ == "__main__":
    main()

Was this article helpful?

TOP