Dalam panduan MicroPython ini, kita akan menunjukkan cara mengonfigurasi sebuah ESP32 agar dapat menerima dan menampilkan data dari beberapa board ESP32 menggunakan protokol komunikasi ESP-NOW dalam konfigurasi many-to-one.
Kita akan membangun sebuah proyek contoh di mana board pengirim mengirimkan data sensor dalam format JSON, dan satu board penerima mengumpulkan data tersebut serta menampilkannya pada OLED display. Tutorial ini menggunakan dua board pengirim sebagai contoh, namun konfigurasi dapat dengan mudah diperluas untuk menampung lebih banyak board.
Memperkenalkan ESP-NOW
ESP-NOW adalah protokol komunikasi nirkabel yang dikembangkan oleh Espressif, memungkinkan beberapa board ESP32 atau ESP8266 untuk bertukar data dalam jumlah kecil tanpa memerlukan koneksi Wi-Fi atau Bluetooth penuh. Meskipun Wi-Fi controller harus diaktifkan, ESP-NOW tidak membutuhkan koneksi Wi-Fi sepenuhnya, sehingga sangat cocok untuk aplikasi low-power dan low-latency seperti jaringan sensor, remote control, atau pertukaran data antar board.
ESP-NOW menggunakan model komunikasi tanpa koneksi, artinya perangkat dapat mengirim dan menerima data tanpa harus terhubung ke router atau membuat access point (berbeda dengan komunikasi HTTP antar board). ESP-NOW mendukung dua jenis pengiriman data:
- Unicast: mengirim data ke perangkat tertentu menggunakan alamat MAC-nya.
- Broadcast: mengirim data ke semua perangkat di sekitar menggunakan alamat MAC broadcast.
Baru mengenal ESP-NOW? Simak panduan dasar kami: MicroPython: ESP-NOW dengan ESP32 (Getting Started).
Gambaran Proyek
Tutorial ini menunjukkan cara mengonfigurasi sebuah board ESP32 agar dapat menerima data dari beberapa board ESP32 lain melalui protokol komunikasi ESP-NOW dalam konfigurasi many-to-one, seperti terlihat pada gambar berikut.
- Satu board ESP32 berperan sebagai penerima.
- Beberapa board ESP32 berperan sebagai pengirim. Pada contoh ini, kita menggunakan dua board pengirim, namun Anda dapat menambahkan lebih banyak board sesuai kebutuhan.
- Board ESP32 penerima akan menerima pesan dari semua pengirim dan mengenali board mana yang mengirim pesan tersebut.
- Sebagai contoh, kita akan bertukar data sensor BME280 antar board. Proyek ini dapat dimodifikasi untuk menggunakan sensor lain atau menukar jenis data lain sesuai kebutuhan.
Komponen yang Dibutuhkan:
- 3x (atau lebih) board ESP32
- 2x sensor BME280
- 1x OLED Display SSD1306
- Breadboard
- Kabel jumper
ESP32: Mendapatkan Alamat MAC Board
Untuk berkomunikasi menggunakan ESP-NOW, Anda perlu mengetahui alamat MAC masing-masing board. Untuk mendapatkan alamat MAC board, salin kode berikut ke Thonny IDE dan jalankan pada board Anda.
# Rui Santos & Sara Santos - Random Nerd Tutorials
# Complete project details at https://RandomNerdTutorials.com/micropython-esp-now-esp32/
import network
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
# Get MAC address (returns bytes)
mac = wlan.config('mac')
# Convert to human-readable format
mac_address = ':'.join('%02x' % b for b in mac)
print("MAC Address:", mac_address)
Setelah menjalankan kode, alamat MAC board akan ditampilkan di jendela shell.
Dapatkan alamat MAC untuk semua board Anda.
Sebagai contoh, saya mendapatkan:
- Board pengirim 1: 24:0A:C4:31:40:50
- Board pengirim 2: 30:AE:A4:F6:7D:4C
- Board penerima: 30:AE:A4:07:0D:64
Menyiapkan Board Pengirim
Dalam tutorial ini, kita akan mengirim data ke satu board ESP32 penerima menggunakan ESP-NOW dari dua board pengirim yang berbeda. Proyek ini dapat dimodifikasi untuk menambahkan lebih banyak board pengirim.
Setiap board akan diberi ID unik untuk identifikasi:
- ID = 1 untuk board1
- ID = 2 untuk board2
Menghubungkan Sensor BME280
Setiap board pengirim akan mengirim data lingkungan dari sensor BME280. Hubungkan sensor BME280 ke masing-masing board menggunakan pin I2C default ESP32.
Pelajari lebih lanjut tentang I2C pada ESP32 di: ESP32 I2C Communication: Set Pins, Multiple Bus Interfaces and Peripherals (Arduino IDE).
Jika Anda menggunakan model board yang berbeda, pin I2C default mungkin berbeda.
Mengimpor Library BME280
Library untuk mengakses sensor BME280 tidak termasuk dalam library standar MicroPython, sehingga Anda perlu mengunggah file library ini ke setiap board pengirim.
Module MicroPython BME280.py
1. Salin kode berikut ke file baru di Thonny IDE.
from machine import I2C
import time
# BME280 default address.
BME280_I2CADDR = 0x76
# Operating Modes
BME280_OSAMPLE_1 = 1
BME280_OSAMPLE_2 = 2
BME280_OSAMPLE_4 = 3
BME280_OSAMPLE_8 = 4
BME280_OSAMPLE_16 = 5
# BME280 Registers
BME280_REGISTER_DIG_T1 = 0x88 # Trimming parameter registers
BME280_REGISTER_DIG_T2 = 0x8A
BME280_REGISTER_DIG_T3 = 0x8C
BME280_REGISTER_DIG_P1 = 0x8E
BME280_REGISTER_DIG_P2 = 0x90
BME280_REGISTER_DIG_P3 = 0x92
BME280_REGISTER_DIG_P4 = 0x94
BME280_REGISTER_DIG_P5 = 0x96
BME280_REGISTER_DIG_P6 = 0x98
BME280_REGISTER_DIG_P7 = 0x9A
BME280_REGISTER_DIG_P8 = 0x9C
BME280_REGISTER_DIG_P9 = 0x9E
BME280_REGISTER_DIG_H1 = 0xA1
BME280_REGISTER_DIG_H2 = 0xE1
BME280_REGISTER_DIG_H3 = 0xE3
BME280_REGISTER_DIG_H4 = 0xE4
BME280_REGISTER_DIG_H5 = 0xE5
BME280_REGISTER_DIG_H6 = 0xE6
BME280_REGISTER_DIG_H7 = 0xE7
BME280_REGISTER_CHIPID = 0xD0
BME280_REGISTER_VERSION = 0xD1
BME280_REGISTER_SOFTRESET = 0xE0
BME280_REGISTER_CONTROL_HUM = 0xF2
BME280_REGISTER_CONTROL = 0xF4
BME280_REGISTER_CONFIG = 0xF5
BME280_REGISTER_PRESSURE_DATA = 0xF7
BME280_REGISTER_TEMP_DATA = 0xFA
BME280_REGISTER_HUMIDITY_DATA = 0xFD
class Device:
"""Class for communicating with an I2C device.
Allows reading and writing 8-bit, 16-bit, and byte array values to
registers on the device."""
def __init__(self, address, i2c):
"""Create an instance of the I2C device at the specified address using
the specified I2C interface object."""
self._address = address
self._i2c = i2c
def writeRaw8(self, value):
"""Write an 8-bit value on the bus (without register)."""
value = value & 0xFF
self._i2c.writeto(self._address, value)
def write8(self, register, value):
"""Write an 8-bit value to the specified register."""
b=bytearray(1)
b[0]=value & 0xFF
self._i2c.writeto_mem(self._address, register, b)
def write16(self, register, value):
"""Write a 16-bit value to the specified register."""
value = value & 0xFFFF
b=bytearray(2)
b[0]= value & 0xFF
b[1]= (value>>8) & 0xFF
self.i2c.writeto_mem(self._address, register, value)
def readRaw8(self):
"""Read an 8-bit value on the bus (without register)."""
return int.from_bytes(self._i2c.readfrom(self._address, 1),'little') & 0xFF
def readU8(self, register):
"""Read an unsigned byte from the specified register."""
return int.from_bytes(
self._i2c.readfrom_mem(self._address, register, 1),'little') & 0xFF
def readS8(self, register):
"""Read a signed byte from the specified register."""
result = self.readU8(register)
if result > 127:
result -= 256
return result
def readU16(self, register, little_endian=True):
"""Read an unsigned 16-bit value from the specified register, with the
specified endianness (default little endian, or least significant byte
first)."""
result = int.from_bytes(
self._i2c.readfrom_mem(self._address, register, 2),'little') & 0xFFFF
if not little_endian:
result = ((result << 8) & 0xFF00) + (result >> 8)
return result
def readS16(self, register, little_endian=True):
"""Read a signed 16-bit value from the specified register, with the
specified endianness (default little endian, or least significant byte
first)."""
result = self.readU16(register, little_endian)
if result > 32767:
result -= 65536
return result
def readU16LE(self, register):
"""Read an unsigned 16-bit value from the specified register, in little
endian byte order."""
return self.readU16(register, little_endian=True)
def readU16BE(self, register):
"""Read an unsigned 16-bit value from the specified register, in big
endian byte order."""
return self.readU16(register, little_endian=False)
def readS16LE(self, register):
"""Read a signed 16-bit value from the specified register, in little
endian byte order."""
return self.readS16(register, little_endian=True)
def readS16BE(self, register):
"""Read a signed 16-bit value from the specified register, in big
endian byte order."""
return self.readS16(register, little_endian=False)
class BME280:
def __init__(self, mode=BME280_OSAMPLE_1, address=BME280_I2CADDR, i2c=None,
**kwargs):
# Check that mode is valid.
if mode not in [BME280_OSAMPLE_1, BME280_OSAMPLE_2, BME280_OSAMPLE_4,
BME280_OSAMPLE_8, BME280_OSAMPLE_16]:
raise ValueError(
'Unexpected mode value {0}. Set mode to one of '
'BME280_ULTRALOWPOWER, BME280_STANDARD, BME280_HIGHRES, or '
'BME280_ULTRAHIGHRES'.format(mode))
self._mode = mode
# Create I2C device.
if i2c is None:
raise ValueError('An I2C object is required.')
self._device = Device(address, i2c)
# Load calibration values.
self._load_calibration()
self._device.write8(BME280_REGISTER_CONTROL, 0x3F)
self.t_fine = 0
def _load_calibration(self):
self.dig_T1 = self._device.readU16LE(BME280_REGISTER_DIG_T1)
self.dig_T2 = self._device.readS16LE(BME280_REGISTER_DIG_T2)
self.dig_T3 = self._device.readS16LE(BME280_REGISTER_DIG_T3)
self.dig_P1 = self._device.readU16LE(BME280_REGISTER_DIG_P1)
self.dig_P2 = self._device.readS16LE(BME280_REGISTER_DIG_P2)
self.dig_P3 = self._device.readS16LE(BME280_REGISTER_DIG_P3)
self.dig_P4 = self._device.readS16LE(BME280_REGISTER_DIG_P4)
self.dig_P5 = self._device.readS16LE(BME280_REGISTER_DIG_P5)
self.dig_P6 = self._device.readS16LE(BME280_REGISTER_DIG_P6)
self.dig_P7 = self._device.readS16LE(BME280_REGISTER_DIG_P7)
self.dig_P8 = self._device.readS16LE(BME280_REGISTER_DIG_P8)
self.dig_P9 = self._device.readS16LE(BME280_REGISTER_DIG_P9)
self.dig_H1 = self._device.readU8(BME280_REGISTER_DIG_H1)
self.dig_H2 = self._device.readS16LE(BME280_REGISTER_DIG_H2)
self.dig_H3 = self._device.readU8(BME280_REGISTER_DIG_H3)
self.dig_H6 = self._device.readS8(BME280_REGISTER_DIG_H7)
h4 = self._device.readS8(BME280_REGISTER_DIG_H4)
h4 = (h4 << 24) >> 20
self.dig_H4 = h4 | (self._device.readU8(BME280_REGISTER_DIG_H5) & 0x0F)
h5 = self._device.readS8(BME280_REGISTER_DIG_H6)
h5 = (h5 << 24) >> 20
self.dig_H5 = h5 | (
self._device.readU8(BME280_REGISTER_DIG_H5) >> 4 & 0x0F)
def read_raw_temp(self):
"""Reads the raw (uncompensated) temperature from the sensor."""
meas = self._mode
self._device.write8(BME280_REGISTER_CONTROL_HUM, meas)
meas = self._mode << 5 | self._mode << 2 | 1
self._device.write8(BME280_REGISTER_CONTROL, meas)
sleep_time = 1250 + 2300 * (1 << self._mode)
sleep_time = sleep_time + 2300 * (1 << self._mode) + 575
sleep_time = sleep_time + 2300 * (1 << self._mode) + 575
time.sleep_us(sleep_time) # Wait the required time
msb = self._device.readU8(BME280_REGISTER_TEMP_DATA)
lsb = self._device.readU8(BME280_REGISTER_TEMP_DATA + 1)
xlsb = self._device.readU8(BME280_REGISTER_TEMP_DATA + 2)
raw = ((msb << 16) | (lsb << 8) | xlsb) >> 4
return raw
def read_raw_pressure(self):
"""Reads the raw (uncompensated) pressure level from the sensor."""
"""Assumes that the temperature has already been read """
"""i.e. that enough delay has been provided"""
msb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA)
lsb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA + 1)
xlsb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA + 2)
raw = ((msb << 16) | (lsb << 8) | xlsb) >> 4
return raw
def read_raw_humidity(self):
"""Assumes that the temperature has already been read """
"""i.e. that enough delay has been provided"""
msb = self._device.readU8(BME280_REGISTER_HUMIDITY_DATA)
lsb = self._device.readU8(BME280_REGISTER_HUMIDITY_DATA + 1)
raw = (msb << 8) | lsb
return raw
def read_temperature(self):
"""Get the compensated temperature in 0.01 of a degree celsius."""
adc = self.read_raw_temp()
var1 = ((adc >> 3) - (self.dig_T1 << 1)) * (self.dig_T2 >> 11)
var2 = ((
(((adc >> 4) - self.dig_T1) * ((adc >> 4) - self.dig_T1)) >> 12) *
self.dig_T3) >> 14
self.t_fine = var1 + var2
return (self.t_fine * 5 + 128) >> 8
def read_pressure(self):
"""Gets the compensated pressure in Pascals."""
adc = self.read_raw_pressure()
var1 = self.t_fine - 128000
var2 = var1 * var1 * self.dig_P6
var2 = var2 + ((var1 * self.dig_P5) << 17)
var2 = var2 + (self.dig_P4 << 35)
var1 = (((var1 * var1 * self.dig_P3) >> 8) +
((var1 * self.dig_P2) >> 12))
var1 = (((1 << 47) + var1) * self.dig_P1) >> 33
if var1 == 0:
return 0
p = 1048576 - adc
p = (((p << 31) - var2) * 3125) // var1
var1 = (self.dig_P9 * (p >> 13) * (p >> 13)) >> 25
var2 = (self.dig_P8 * p) >> 19
return ((p + var1 + var2) >> 8) + (self.dig_P7 << 4)
def read_humidity(self):
adc = self.read_raw_humidity()
# print 'Raw humidity = {0:d}'.format (adc)
h = self.t_fine - 76800
h = (((((adc << 14) - (self.dig_H4 << 20) - (self.dig_H5 * h)) +
16384) >> 15) * (((((((h * self.dig_H6) >> 10) * (((h *
self.dig_H3) >> 11) + 32768)) >> 10) + 2097152) *
self.dig_H2 + 8192) >> 14))
h = h - (((((h >> 15) * (h >> 15)) >> 7) * self.dig_H1) >> 4)
h = 0 if h < 0 else h
h = 419430400 if h > 419430400 else h
return h >> 12
@property
def temperature(self):
"Return the temperature in degrees."
t = self.read_temperature()
ti = t // 100
td = t - ti * 100
return "{}.{:02d}C".format(ti, td)
@property
def pressure(self):
"Return the temperature in hPa."
p = self.read_pressure() // 256
pi = p // 100
pd = p - pi * 100
return "{}.{:02d}hPa".format(pi, pd)
@property
def humidity(self):
"Return the humidity in percent."
h = self.read_humidity()
hi = h // 1024
hd = h * 100 // 1024 - hi * 100
return "{}.{:02d}%".format(hi, hd)
2. Buka File > Save as…
3. Pilih MicroPython device sebagai lokasi penyimpanan
4. Beri nama file BME280.py, lalu klik OK
Selesai! Library BME280 sudah berhasil diunggah ke board Anda.
Ulangi proses ini untuk semua board pengirim.
ESP32 ESP-NOW Pengirim – Script MicroPython
Kode berikut akan membaca data dari sensor BME280 dan mengirimkannya melalui ESP-NOW dalam format JSON ke board ESP32 penerima. Pastikan untuk mengganti alamat MAC board penerima dan mengubah ID board untuk setiap board pengirim.
Dalam kode ini, kita menggunakan modul aioespnow, yang memungkinkan penggunaan ESP-NOW secara asinkron.
# Rui Santos & Sara Santos - Random Nerd Tutorials
# Complete project details at https://RandomNerdTutorials.com/micropython-esp-now-esp32-many-to-one/
import network
import aioespnow
import asyncio
import time
import ujson
from machine import Pin, I2C
import BME280
# Board ID
BOARD_ID = 1
# Receiver's MAC address
peer_mac = b'\xff\xff\xff\xff\xff\xff'
# Interval for sending data (in seconds)
send_interval = 10
# Initialize I2C and BME280
try:
i2c = I2C(0, scl=Pin(22), sda=Pin(21)) # Adjust pins as needed
bme = BME280.BME280(i2c=i2c, address=0x76)
except OSError as err:
print("Failed to initialize BME280:", err)
raise
# Initialize Wi-Fi in station mode
sta = network.WLAN(network.STA_IF)
try:
sta.active(True)
sta.config(channel=1) # Set channel explicitly
sta.disconnect()
except OSError as err:
print("Failed to initialize Wi-Fi:", err)
raise
# Initialize AIOESPNow
e = aioespnow.AIOESPNow()
try:
e.active(True)
except OSError as err:
print("Failed to initialize AIOESPNow:", err)
raise
# Add peer
try:
e.add_peer(peer_mac)
except OSError as err:
print("Failed to add peer:", err)
raise
# Counter for reading ID
reading_id = 0
def read_temperature():
try:
return float(bme.temperature[:-1]) # Remove 'C' from string
except Exception as err:
print("Error reading temperature:", err)
return 0.0
def read_humidity():
try:
return float(bme.humidity[:-1]) # Remove '%' from string
except Exception as err:
print("Error reading humidity:", err)
return 0.0
def prepare_sensor_data():
global reading_id
data = {
'id': BOARD_ID,
'temp': read_temperature(),
'hum': read_humidity(),
'readingId': reading_id
}
reading_id += 1
# Serialize to JSON and encode to bytes
return ujson.dumps(data).encode('utf-8')
async def send_messages(e, peer):
while True:
try:
# Prepare and serialize sensor data
message = prepare_sensor_data()
# Send JSON bytes
if await e.asend(peer, message, sync=True):
print(f"Sent data: {message.decode('utf-8')}")
else:
print("Failed to send data")
except OSError as err:
print("Send error:", err)
await asyncio.sleep(5)
await asyncio.sleep(send_interval) # Wait before next send
async def main(e, peer):
try:
await send_messages(e, peer)
except Exception as err:
print(f"Error in main: {err}")
await asyncio.sleep(5)
raise
# Run the async program
try:
asyncio.run(main(e, peer_mac))
except KeyboardInterrupt:
print("Stopping sender...")
e.active(False)
sta.active(False)
Bagaimana Cara Kerja Kode?
Mengimpor Library
Mulailah dengan mengimpor library yang dibutuhkan untuk menggunakan ESP-NOW, mengakses sensor BME280, dan mengelola variabel JSON.
import network
import aioespnow
import asyncio
import time
import ujson
from machine import Pin, I2C
import BME280
Menetapkan ID Board
Berikan ID unik pada setiap board agar board penerima dapat dengan mudah mengidentifikasinya. Pada contoh ini, kita hanya memberi nomor pada setiap board, namun Anda juga bisa menggunakan metode lain, misalnya memberi nama, atau langsung menggunakan alamat MAC sebagai identifikasi (dalam kasus ini, parameter ID tidak diperlukan).
# Board ID
BOARD_ID = 2
Alamat MAC Board Penerima
Masukkan alamat MAC board penerima. Sebagai contoh, alamat MAC board penerima saya adalah 30:AE:A4:07:0D:64. Oleh karena itu, alamat ini harus ditambahkan ke kode dalam format bytes seperti berikut:
# Receiver's MAC address
peer_mac = b'\x30\xae\xa4\x07\x0d\x64'
Interval Pengiriman
Data baru akan dikirim melalui ESP-NOW setiap 10 detik. Anda dapat menyesuaikan periode ini pada variabel send_interval.
# Interval for sending data (in seconds)
send_interval = 10
Inisialisasi Sensor BME280
Mulai komunikasi I2C pada GPIO 22 (SCL) dan 21 (SDA). Sesuaikan jika menggunakan pin yang berbeda. Selanjutnya, lakukan inisialisasi sensor BME280.
# Initialize I2C and BME280
try:
i2c = I2C(0, scl=Pin(22), sda=Pin(21)) # Adjust pins as needed
bme = BME280.BME280(i2c=i2c, address=0x76)
except OSError as err:
print("Failed to initialize BME280:", err)
raise
Inisialisasi Antarmuka Wi-Fi dan ESP-NOW
Untuk menggunakan ESP-NOW, kita perlu mengaktifkan antarmuka Wi-Fi. Pada kode berikut, kita menginisialisasi Wi-Fi dan protokol komunikasi ESP-NOW menggunakan modul aioespnow.
# Initialize Wi-Fi in station mode
sta = network.WLAN(network.STA_IF)
try:
sta.active(True)
sta.config(channel=1) # Set channel explicitly
sta.disconnect()
except OSError as err:
print("Failed to initialize Wi-Fi:", err)
raise
# Initialize AIOESPNow
e = aioespnow.AIOESPNow()
try:
e.active(True)
except OSError as err:
print("Failed to initialize AIOESPNow:", err)
raise
Menambahkan Peer
Pada baris berikutnya, kita menambahkan board penerima sebagai peer. Fungsi e.add_peer(peer_mac) diperlukan untuk memastikan komunikasi unicast yang andal menggunakan aioespnow, dengan mendaftarkan alamat MAC penerima agar pengirim dapat mengirim pesan ke board tersebut secara spesifik.
# Add peer
try:
e.add_peer(peer_mac)
except OSError as err:
print("Failed to add peer:", err)
raise
Membaca Data Sensor BME280
Fungsi berikut mengembalikan nilai suhu dan kelembaban dari sensor BME280.
def read_temperature():
try:
return float(bme.temperature[:-1]) # Remove 'C' from string
except Exception as err:
print("Error reading temperature:", err)
return 0.0
def read_humidity():
try:
return float(bme.humidity[:-1]) # Remove '%' from string
except Exception as err:
print("Error reading humidity:", err)
return 0.0
Fungsi dari library BME280 mengembalikan hasil beserta satuan (unit), sehingga kita perlu menghapus karakter satuan pada akhir pembacaan.
Menambahkan Data ke Variabel JSON
Kita membuat fungsi prepare_sensor_data() yang mengembalikan variabel JSON berisi data yang akan dikirim. Pada contoh ini, data yang dikirim meliputi: ID board, suhu, kelembaban, dan ID pembacaan (sebuah angka untuk melacak jumlah pembacaan sejak program dijalankan).
def prepare_sensor_data():
global reading_id
data = {
'id': BOARD_ID,
'temp': read_temperature(),
'hum': read_humidity(),
'readingId': reading_id
}
reading_id += 1
# Serialize to JSON and encode to bytes
return ujson.dumps(data).encode('utf-8')
Mengirim Pesan melalui ESP-NOW
Selanjutnya, kita membuat fungsi asinkron untuk mengirim pesan ESP-NOW secara asinkron.
async def send_messages(e, peer):
while True:
try:
# Prepare and serialize sensor data
message = prepare_sensor_data()
# Send JSON bytes
if await e.asend(peer, message, sync=True):
print(f"Sent data: {message.decode('utf-8')}")
else:
print("Failed to send data")
except OSError as err:
print("Send error:", err)
await asyncio.sleep(5)
await asyncio.sleep(send_interval) # Wait before next send
Menjalankan Tugas Asinkron
Fungsi berikut menjalankan tugas send_messages untuk mengirim data melalui ESP-NOW secara asinkron.
async def main(e, peer):
try:
await send_messages(e, peer)
except Exception as err:
print(f"Error in main: {err}")
await asyncio.sleep(5)
raise
Terakhir, jalankan program secara asinkron.
# Run the async program
try:
asyncio.run(main(e, peer_mac))
except KeyboardInterrupt:
print("Stopping sender...")
e.active(False)
sta.active(False)
Menjalankan dan Mengunggah Kode ke Board
Setelah menyalin kode ke Thonny IDE dan melakukan penyesuaian yang diperlukan, uji kode dengan menjalankannya di Thonny IDE.
Setelah koneksi dengan board terhubung, klik tombol run berwarna hijau untuk menjalankan program.
Hasil yang muncul di MicroPython shell seharusnya mirip dengan gambar berikut.
Saat ini, pengiriman data akan gagal karena board penerima belum disiapkan.
Mengunggah Kode ke Board
Catatan penting: menjalankan file di Thonny IDE saja tidak menyimpan kode secara permanen ke filesystem board. Artinya, jika Anda mencabut board dari komputer dan memberi daya, program tidak akan berjalan karena belum ada file Python yang tersimpan di board. Fungsi Run di Thonny berguna untuk uji coba kode, tetapi untuk menjalankannya secara permanen, Anda harus membuat dan menyimpan file ke filesystem board.
Agar kode dapat dijalankan tanpa tersambung ke komputer, unggah kode ke filesystem board dengan nama main.py:
File > Save as… > MicroPython Device
Beri nama file tersebut `main.py` dan simpan di board.
Sekarang, jika Anda merestart board, program akan mulai berjalan secara otomatis. Setelah itu, Anda mungkin tidak bisa mengakses MicroPython Shell.
Menyiapkan Board Penerima
Board ESP32 penerima akan menerima data dari beberapa board pengirim dan menampilkannya pada OLED display. Pada contoh ini, kita menerima data dari dua board pengirim.
Menghubungkan OLED Display
Hubungkan OLED display ke board ESP32 menggunakan pin I2C default, yaitu GPIO 22 (SCL) dan GPIO 21 (SDA). Sesuaikan jika menggunakan model ESP32 dengan pinout berbeda.
Mengimpor Library SSD1306
Library untuk mengakses OLED display (SSD1306) tidak termasuk dalam library standar MicroPython, sehingga Anda perlu mengunggah file library ini ke board penerima.
Module MicroPython SSD1306.py
1. Salin kode berikut ke file baru di Thonny IDE.
# MicroPython SSD1306 OLED driver, I2C and SPI interfaces created by Adafruit
import time
import framebuf
# register definitions
SET_CONTRAST = const(0x81)
SET_ENTIRE_ON = const(0xa4)
SET_NORM_INV = const(0xa6)
SET_DISP = const(0xae)
SET_MEM_ADDR = const(0x20)
SET_COL_ADDR = const(0x21)
SET_PAGE_ADDR = const(0x22)
SET_DISP_START_LINE = const(0x40)
SET_SEG_REMAP = const(0xa0)
SET_MUX_RATIO = const(0xa8)
SET_COM_OUT_DIR = const(0xc0)
SET_DISP_OFFSET = const(0xd3)
SET_COM_PIN_CFG = const(0xda)
SET_DISP_CLK_DIV = const(0xd5)
SET_PRECHARGE = const(0xd9)
SET_VCOM_DESEL = const(0xdb)
SET_CHARGE_PUMP = const(0x8d)
class SSD1306:
def __init__(self, width, height, external_vcc):
self.width = width
self.height = height
self.external_vcc = external_vcc
self.pages = self.height // 8
# Note the subclass must initialize self.framebuf to a framebuffer.
# This is necessary because the underlying data buffer is different
# between I2C and SPI implementations (I2C needs an extra byte).
self.poweron()
self.init_display()
def init_display(self):
for cmd in (
SET_DISP | 0x00, # off
# address setting
SET_MEM_ADDR, 0x00, # horizontal
# resolution and layout
SET_DISP_START_LINE | 0x00,
SET_SEG_REMAP | 0x01, # column addr 127 mapped to SEG0
SET_MUX_RATIO, self.height - 1,
SET_COM_OUT_DIR | 0x08, # scan from COM[N] to COM0
SET_DISP_OFFSET, 0x00,
SET_COM_PIN_CFG, 0x02 if self.height == 32 else 0x12,
# timing and driving scheme
SET_DISP_CLK_DIV, 0x80,
SET_PRECHARGE, 0x22 if self.external_vcc else 0xf1,
SET_VCOM_DESEL, 0x30, # 0.83*Vcc
# display
SET_CONTRAST, 0xff, # maximum
SET_ENTIRE_ON, # output follows RAM contents
SET_NORM_INV, # not inverted
# charge pump
SET_CHARGE_PUMP, 0x10 if self.external_vcc else 0x14,
SET_DISP | 0x01): # on
self.write_cmd(cmd)
self.fill(0)
self.show()
def poweroff(self):
self.write_cmd(SET_DISP | 0x00)
def contrast(self, contrast):
self.write_cmd(SET_CONTRAST)
self.write_cmd(contrast)
def invert(self, invert):
self.write_cmd(SET_NORM_INV | (invert & 1))
def show(self):
x0 = 0
x1 = self.width - 1
if self.width == 64:
# displays with width of 64 pixels are shifted by 32
x0 += 32
x1 += 32
self.write_cmd(SET_COL_ADDR)
self.write_cmd(x0)
self.write_cmd(x1)
self.write_cmd(SET_PAGE_ADDR)
self.write_cmd(0)
self.write_cmd(self.pages - 1)
self.write_framebuf()
def fill(self, col):
self.framebuf.fill(col)
def pixel(self, x, y, col):
self.framebuf.pixel(x, y, col)
def scroll(self, dx, dy):
self.framebuf.scroll(dx, dy)
def text(self, string, x, y, col=1):
self.framebuf.text(string, x, y, col)
class SSD1306_I2C(SSD1306):
def __init__(self, width, height, i2c, addr=0x3c, external_vcc=False):
self.i2c = i2c
self.addr = addr
self.temp = bytearray(2)
# Add an extra byte to the data buffer to hold an I2C data/command byte
# to use hardware-compatible I2C transactions. A memoryview of the
# buffer is used to mask this byte from the framebuffer operations
# (without a major memory hit as memoryview doesn't copy to a separate
# buffer).
self.buffer = bytearray(((height // 8) * width) + 1)
self.buffer[0] = 0x40 # Set first byte of data buffer to Co=0, D/C=1
self.framebuf = framebuf.FrameBuffer1(memoryview(self.buffer)[1:], width, height)
super().__init__(width, height, external_vcc)
def write_cmd(self, cmd):
self.temp[0] = 0x80 # Co=1, D/C#=0
self.temp[1] = cmd
self.i2c.writeto(self.addr, self.temp)
def write_framebuf(self):
# Blast out the frame buffer using a single I2C transaction to support
# hardware I2C interfaces.
self.i2c.writeto(self.addr, self.buffer)
def poweron(self):
pass
class SSD1306_SPI(SSD1306):
def __init__(self, width, height, spi, dc, res, cs, external_vcc=False):
self.rate = 10 * 1024 * 1024
dc.init(dc.OUT, value=0)
res.init(res.OUT, value=0)
cs.init(cs.OUT, value=1)
self.spi = spi
self.dc = dc
self.res = res
self.cs = cs
self.buffer = bytearray((height // 8) * width)
self.framebuf = framebuf.FrameBuffer1(self.buffer, width, height)
super().__init__(width, height, external_vcc)
def write_cmd(self, cmd):
self.spi.init(baudrate=self.rate, polarity=0, phase=0)
self.cs.high()
self.dc.low()
self.cs.low()
self.spi.write(bytearray([cmd]))
self.cs.high()
def write_framebuf(self):
self.spi.init(baudrate=self.rate, polarity=0, phase=0)
self.cs.high()
self.dc.high()
self.cs.low()
self.spi.write(self.buffer)
self.cs.high()
def poweron(self):
self.res.high()
time.sleep_ms(1)
self.res.low()
time.sleep_ms(10)
self.res.high()
2. Buka File > Save as… dan pilih MicroPython device sebagai lokasi penyimpanan.
3. Beri nama file ssd1306.py dan klik OK untuk menyimpan file ke filesystem ESP32.
Selesai! Library SSD1306 sudah berhasil diunggah ke board, dan Anda dapat menggunakan fungsionalitasnya dalam kode dengan mengimpor library ini.
ESP-NOW Board Penerima – Script MicroPython
Kode berikut akan mengonfigurasi board ESP32 Anda sebagai penerima ESP-NOW untuk menerima data dari dua board pengirim. Setelah data diterima, nilai akan ditampilkan pada OLED display.
Jangan lupa untuk menambahkan alamat MAC board pengirim pada kode sesuai konfigurasi Anda.
# Rui Santos & Sara Santos - Random Nerd Tutorials
# Complete project details at https://RandomNerdTutorials.com/micropython-esp-now-esp32-many-to-one/
import network
import aioespnow
import asyncio
import ujson
from machine import Pin, I2C
import ssd1306
# Initialize I2C for OLED
try:
i2c = I2C(0, scl=Pin(22), sda=Pin(21)) # Adjust pins as needed
display = ssd1306.SSD1306_I2C(128, 64, i2c, addr=0x3C)
print("SSD1306 initialized")
except Exception as err:
print("Failed to initialize SSD1306:", err)
raise
# Initialize Wi-Fi in station mode
sta = network.WLAN(network.STA_IF)
try:
sta.active(True)
sta.config(channel=1) # Set channel explicitly
sta.disconnect()
except OSError as err:
print("Failed to initialize Wi-Fi:", err)
raise
# Initialize AIOESPNow
e = aioespnow.AIOESPNow()
try:
e.active(True)
except OSError as err:
print("Failed to initialize AIOESPNow:", err)
raise
# Sender's MAC addresses (replace with actual sender MACs)
sender_mac_1 = b'\x24\x0a\xc4\x31\x40\x50' # First sender's MAC (Board ID=1)
sender_mac_2 = b'\x30\xae\xa4\xf6\x7d\x4c' # Second sender's MAC (Board ID=2)
# Add peers
try:
e.add_peer(sender_mac_1)
except OSError as err:
print(f"Failed to add peer {sender_mac_1.hex()}:", err)
raise
try:
e.add_peer(sender_mac_2)
except OSError as err:
print(f"Failed to add peer {sender_mac_2.hex()}:", err)
raise
# Dictionary to store latest readings for each board
board_readings = {
1: {'temp': 0.0, 'hum': 0.0, 'readingId': 0},
2: {'temp': 0.0, 'hum': 0.0, 'readingId': 0}
}
# Update OLED display with temperature and humidity for both boards on separate lines.
def update_display():
try:
display.fill(0)
# Board 1 data
display.text("Board 1:", 0, 0)
display.text(f"Temp: {board_readings[1]['temp']:.1f} C", 0, 10)
display.text(f"Hum: {board_readings[1]['hum']:.1f} %", 0, 20)
# Board 2 data
display.text("Board 2:", 0, 32)
display.text(f"Temp: {board_readings[2]['temp']:.1f} C", 0, 42)
display.text(f"Hum: {board_readings[2]['hum']:.1f} %", 0, 52)
display.show()
print("Display updated")
except Exception as err:
print("Error updating display:", err)
# Async function to receive and process messages.
async def receive_messages(e):
print("Listening for ESP-NOW messages...")
while True:
try:
async for mac, msg in e:
try:
# Decode and parse JSON message
json_str = msg.decode('utf-8')
data = ujson.loads(json_str)
# Extract parameters
board_id = data['id']
temperature = data['temp']
humidity = data['hum']
reading_id = data['readingId']
# Store in board_readings dictionary
if board_id in (1, 2):
board_readings[board_id] = {
'temp': temperature,
'hum': humidity,
'readingId': reading_id
}
# Update OLED display
update_display()
# Display on MicroPython terminal
print(f"\nReceived from {mac.hex()}:")
print(f" Board ID: {board_id}")
print(f" Temperature: {temperature} C")
print(f" Humidity: {humidity} %")
print(f" Reading ID: {reading_id}")
except (ValueError, KeyError) as err:
print(f"Error parsing JSON: {err}")
except OSError as err:
print("Receive error:", err)
await asyncio.sleep(5)
async def main(e):
await receive_messages(e)
# Run the async program
try:
asyncio.run(main(e))
except KeyboardInterrupt:
print("Stopping receiver...")
e.active(False)
sta.active(False)
Bagaimana Cara Kerja Kode?
Mengimpor Library
Mulailah dengan mengimpor library yang dibutuhkan. Pastikan file ssd1306.py telah diunggah sebelumnya agar board dapat berinteraksi dengan OLED display.
import network
import aioespnow
import asyncio
import ujson
from machine import Pin, I2C
import ssd1306
Inisialisasi OLED Display
Lakukan inisialisasi komunikasi I2C dan OLED display menggunakan baris kode berikut.
# Initialize I2C for OLED
try:
i2c = I2C(0, scl=Pin(22), sda=Pin(21)) # Adjust pins as needed
display = ssd1306.SSD1306_I2C(128, 64, i2c, addr=0x3C)
print("SSD1306 initialized")
except Exception as err:
print("Failed to initialize SSD1306:", err)
raise
Inisialisasi Antarmuka Wi-Fi dan ESP-NOW
Lakukan inisialisasi Wi-Fi dan protokol ESP-NOW.
# Initialize Wi-Fi in station mode
sta = network.WLAN(network.STA_IF)
try:
sta.active(True)
sta.config(channel=1) # Set channel explicitly
sta.disconnect()
except OSError as err:
print("Failed to initialize Wi-Fi:", err)
raise
# Initialize AIOESPNow
e = aioespnow.AIOESPNow()
try:
e.active(True)
except OSError as err:
print("Failed to initialize AIOESPNow:", err)
raise
Menambahkan Board Pengirim sebagai Peer
Tambahkan board pengirim sebagai peer. Meskipun tidak wajib menambahkan board pengirim di sisi penerima, langkah ini menjamin komunikasi yang lebih andal.
Masukkan alamat MAC board pengirim pada variabel berikut (pastikan alamat MAC dalam format bytes).
# Sender's MAC addresses (replace with actual sender MACs)
sender_mac_1 = b'\x24\x0a\xc4\x31\x40\x50' # First sender's MAC (Board ID=1)
sender_mac_2 = b'\x30\xae\xa4\xf6\x7d\x4c' # Second sender's MAC (Board ID=2)
Selanjutnya, daftarkan board tersebut sebagai peer ESP-NOW:
# Add peers
try:
e.add_peer(sender_mac_1)
except OSError as err:
print(f"Failed to add peer {sender_mac_1.hex()}:", err)
raise
try:
e.add_peer(sender_mac_2)
except OSError as err:
print(f"Failed to add peer {sender_mac_2.hex()}:", err)
raise
Dictionary untuk Menyimpan Data Pembacaan
Buat sebuah dictionary untuk menyimpan data sensor terbaru dari setiap board. Menyimpan data dalam dictionary memungkinkan semua informasi diterima tersimpan dalam satu variabel.
board_readings = {
1: {'temp': 0.0, 'hum': 0.0, 'readingId': 0},
2: {'temp': 0.0, 'hum': 0.0, 'readingId': 0}
}
Memperbarui Tampilan
Fungsi update_display() mengambil data yang diterima dari setiap board, yang sudah tersimpan dalam variabel board_readings, dan menampilkannya pada OLED screen.
def update_display():
try:
display.fill(0)
# Board 1 data
display.text("Board 1:", 0, 0)
display.text(f"Temp: {board_readings[1]['temp']:.1f} C", 0, 10)
display.text(f"Hum: {board_readings[1]['hum']:.1f} %", 0, 20)
# Board 2 data
display.text("Board 2:", 0, 32)
display.text(f"Temp: {board_readings[2]['temp']:.1f} C", 0, 42)
display.text(f"Hum: {board_readings[2]['hum']:.1f} %", 0, 52)
display.show()
print("Display updated")
except Exception as err:
print("Error updating display:", err)
Menerima Pesan ESP-NOW
Kita membuat fungsi asinkron untuk menerima dan memproses pesan yang datang melalui ESP-NOW.
async def receive_messages(e):
print("Listening for ESP-NOW messages...")
while True:
try:
async for mac, msg in e:
Data diterima dalam format JSON. Pertama, kita decode dan parse pesan JSON tersebut.
try:
# Decode and parse JSON message
json_str = msg.decode('utf-8')
data = ujson.loads(json_str)
Kita ekstrak setiap parameter ke dalam variabel terpisah.
# Extract parameters into individual variables
board_id = data['id']
temperature = data['temp']
humidity = data['hum']
reading_id = data['readingId']
Terakhir, kita menyimpan data ke posisi yang sesuai dalam dictionary board_readings berdasarkan ID board.
# Store in board_readings dictionary
if board_id in (1, 2):
board_readings[board_id] = {
'temp': temperature,
'hum': humidity,
'readingId': reading_id
}
Setelah itu, variabel board_readings telah diperbarui, sehingga kita dapat memanggil fungsi update_display() untuk menampilkan data terbaru di layar.
update_display()
Terakhir, kita menampilkan data yang diterima di MicroPython shell.
# Display on serial monitor
print(f"\nReceived from {mac.hex()}:")
print(f" Board ID: {board_id}")
print(f" Temperature: {temperature} C")
print(f" Humidity: {humidity} %")
print(f" Reading ID: {reading_id}")
Menjalankan Tugas Asinkron
Fungsi berikut menjalankan tugas receive_messages untuk menerima data melalui ESP-NOW secara asinkron.
async def main(e):
await receive_messages(e)
Terakhir, jalankan program secara asinkron.
# Run the async program
try:
asyncio.run(main(e, peer_mac))
except KeyboardInterrupt:
print("Stopping sender...")
e.active(False)
sta.active(False)
Demonstrasi
Unggah dan jalankan kode sebelumnya pada board penerima. Jika tersambung ke Thonny IDE Terminal, data yang diterima akan ditampilkan di sana.
Secara bersamaan, pembacaan dari setiap board pengirim juga akan ditampilkan pada OLED screen.
Siap Untuk Membuat Proyek Impianmu Menjadi Kenyataan?
Klik di sini untuk chat langsung via WhatsApp dan dapatkan dukungan langsung dari tim ahli kami!












0 on: "Implementasi ESP-NOW Many-to-One di ESP32 dengan MicroPython "