Example Code for MicroPython-MQTT Data Reporting
Last revision 2026/01/22
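This example demonstrates how the ESP32-P4 reads a temperature value from an ADC pin and publishes it to an MQTT broker over Wi-Fi. Before use, edit the values in the "User Configurable Section" of the script (around lines 11–20): the Wi-Fi credentials, the ADC pin, and the MQTT broker settings.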
Hardware Preparation
Software Preparation
Sample Code
#!/usr/bin/env python3
# main.py
import json
import network
import time
import random
from machine import ADC, Pin
from umqtt.simple import MQTTClient
# ========== User Configurable Section ==========
# Edit these values to match your network, wiring, and broker before running.
WIFI_SSID = "xxx" # Your WiFi network name
WIFI_PASS = "xxx" # Your WiFi password
ADC_PIN = 16 # ADC0 pin of the ESP32-P4 (for temperature sensor input)
MQTT_SERVER = "192.168.31.160" # MQTT broker IP address
MQTT_PORT = 32768 # MQTT broker port used in this example (standard ports: 1883 unencrypted, 8883 TLS)
# Generate an MQTT client ID with a random 8-bit suffix (0-255) to reduce the
# chance of two devices using the same ID. With only 256 possible values,
# collisions are still possible when many devices share one broker.
MQTT_CLIENT_ID = f"micropython-client-{random.getrandbits(8)}"
MQTT_USER = "xxx" # MQTT broker username (if authentication is enabled)
MQTT_PASS = "xxx" # MQTT broker password (if authentication is enabled)
PUB_TOPIC = "esp/adc/temp" # MQTT topic to publish data to
PUB_INTERVAL = 2 # Data reporting interval (in seconds), passed to time.sleep()
# ===============================================
def wifi_connect(ssid, pwd):
    """Connect the ESP32-P4 to a WiFi network in station (client) mode.

    If the interface is not already connected, a connection attempt is
    started and polled once per second for up to 20 seconds.

    Args:
        ssid: Name of the WiFi network to join.
        pwd: Password for that network.

    Raises:
        OSError: If the interface is still not connected after the
            20-second wait. Previously the function reported success
            unconditionally, so the failure only surfaced later when the
            MQTT connection was attempted.
    """
    sta_interface = network.WLAN(network.STA_IF)  # Station (client) mode
    sta_interface.active(True)  # Enable the WiFi interface
    if not sta_interface.isconnected():
        print("Connecting to Wi-Fi...")
        sta_interface.connect(ssid, pwd)  # Starts the attempt; does not block
        # Wait up to 20 seconds for the connection to come up.
        for _ in range(20):
            if sta_interface.isconnected():
                break
            time.sleep(1)
        # Re-check after the loop: the link may have come up during the
        # final sleep, and a timeout must not be reported as success.
        if not sta_interface.isconnected():
            raise OSError("Wi-Fi connection timed out")
    # Print connection status and network info (IP, subnet, gateway, DNS).
    print("Wi-Fi connected:", sta_interface.ifconfig())
# --------------- Temperature Conversion ---------------
def read_temperature():
    """Sample the ADC on ``ADC_PIN`` and convert the reading to a temperature.

    Conversion steps:
    - The raw reading is scaled to volts assuming 0-4095 maps to 0-3.3 V.
    - 1.4 V is subtracted as the sensor offset, and the result is multiplied
      by 100, i.e. a slope of 10 mV per degree.

    NOTE(review): the 1.4 V offset is sensor-specific and does not match a
    TMP36 (about 0.5 V at 0 °C) - confirm against the sensor actually used.

    Returns:
        The computed temperature, rounded to one decimal place.
    """
    sensor = ADC(Pin(ADC_PIN))  # The ADC object is created on every call
    sensor.atten(ADC.ATTN_11DB)  # 11 dB attenuation for the widest input range
    counts = sensor.read()  # Raw conversion result
    volts = counts / 4095 * 3.3  # Scale raw counts to volts (3.3 V full scale)
    degrees = (volts - 1.4) * 100  # Remove the offset, then apply 10 mV per degree
    return round(degrees, 1)
# --------------- MQTT Connection ---------------
def mqtt_connect():
    """Create an MQTT client from the module configuration and connect it.

    The client ID, broker address, port, username, and password are taken
    from the ``MQTT_*`` constants defined in the configuration section.

    Returns:
        The connected ``MQTTClient`` instance.
    """
    # Gather the connection settings in one place, then hand them to the
    # constructor as keyword arguments.
    broker_settings = {
        "client_id": MQTT_CLIENT_ID,
        "server": MQTT_SERVER,
        "port": MQTT_PORT,
        "user": MQTT_USER,
        "password": MQTT_PASS,
    }
    mqtt = MQTTClient(**broker_settings)
    mqtt.connect()  # Open the session with the broker
    print(f"[MQTT] Connected to broker: {MQTT_SERVER}")
    return mqtt
def mqtt_publish(client, data: dict):
    """Serialize ``data`` to JSON and publish it on ``PUB_TOPIC``.

    Args:
        client: MQTT client object; only its ``publish(topic, message)``
            method is used here.
        data: Dictionary to send; it is encoded with ``json.dumps``.
    """
    payload = json.dumps(data)  # JSON text is what goes on the wire
    send = client.publish
    send(PUB_TOPIC, payload)
    # Echo what was sent so the reporting can be followed on the console.
    print(f"[MQTT] Sent -> Topic: {PUB_TOPIC}, Payload: {payload}")
# --------------- Main Function ---------------
def main():
    """Connect to WiFi and the MQTT broker, then publish temperature forever.

    Each cycle reads the temperature, publishes it as a JSON object
    (e.g. ``{"temperature": 25.3}``), and sleeps for ``PUB_INTERVAL`` seconds.

    The loop only ends through an exception (for example Ctrl-C or a network
    error while publishing). In that case the MQTT client is disconnected
    before the exception propagates, so the broker connection is not left
    open.
    """
    wifi_connect(WIFI_SSID, WIFI_PASS)  # Connect to WiFi first
    mqtt_client = mqtt_connect()  # Then connect to the MQTT broker
    try:
        # Continuously read temperature and publish data.
        while True:
            current_temp = read_temperature()
            mqtt_publish(mqtt_client, {"temperature": current_temp})
            time.sleep(PUB_INTERVAL)  # Wait for the next reporting cycle
    finally:
        # Release the broker connection on the way out. If the socket is
        # already broken, disconnect() can itself fail; ignore that so the
        # original exception is the one the caller sees.
        try:
            mqtt_client.disconnect()
        except OSError:
            pass
# Run main() only when this file is executed as a script (for example as
# main.py on the board), not when it is imported as a module.
if __name__ == "__main__":
    main()
Was this article helpful?
