Pelajari cara membangun komunikasi dua arah antara dua papan ESP32 dengan memanfaatkan protokol ESP-NOW. Pada tahap awal, kita akan menguji contoh sederhana untuk menunjukkan cara mengimplementasikan komunikasi dua arah. Selanjutnya, kita akan membuat proyek yang lebih kompleks, di mana kedua papan saling bertukar data sensor dan menampilkan hasilnya pada layar OLED.
Pengenalan ESP-NOW
ESP-NOW adalah protokol komunikasi nirkabel yang dikembangkan oleh Espressif, memungkinkan beberapa papan ESP32 atau ESP8266 saling bertukar data berukuran kecil tanpa perlu koneksi Wi-Fi atau Bluetooth. Meskipun tidak memerlukan koneksi Wi-Fi penuh, modul Wi-Fi tetap harus aktif sehingga protokol ini sangat cocok untuk aplikasi berdaya rendah dan berlatensi rendah seperti jaringan sensor, perangkat kontrol jarak jauh, atau pertukaran data antar-papan.
ESP-NOW menggunakan model komunikasi tanpa koneksi (connectionless), sehingga perangkat dapat mengirim dan menerima data tanpa terhubung ke router maupun membuat access point, berbeda dengan komunikasi HTTP antar perangkat. Protokol ini mendukung mode unicast (mengirim data ke perangkat tertentu menggunakan alamat MAC) dan broadcast (mengirim data ke seluruh perangkat di sekitar menggunakan alamat MAC broadcast).
Mendapatkan Alamat MAC pada ESP32
Untuk menggunakan ESP-NOW, setiap papan ESP32 harus saling mengetahui alamat MAC-nya. Untuk membaca alamat MAC perangkat, jalankan kode berikut pada Thonny IDE menggunakan papan ESP32 Anda.
# Rui Santos & Sara Santos - Random Nerd Tutorials
# Complete project details at https://RandomNerdTutorials.com/micropython-esp-now-esp32/
import network
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
# Get MAC address (returns bytes)
mac = wlan.config('mac')
# Convert to human-readable format
mac_address = ':'.join('%02x' % b for b in mac)
print("MAC Address:", mac_address)
Setelah kode dijalankan, alamat MAC papan akan ditampilkan pada jendela Shell.
Setelah program dieksekusi, alamat MAC dari papan ESP32 akan muncul pada output di jendela Shell.
Kita akan menggunakan papan ESP32 DOIT V1 dan ESP32 S3 DevKitC, namun contoh ini kompatibel dengan hampir semua varian ESP32.
Komunikasi Dua Arah ESP-NOW Antar Dua Papan ESP32 (MicroPython)
Pada bagian ini, kita akan mendemonstrasikan contoh dasar pertukaran pesan sederhana antar papan ESP32 menggunakan protokol ESP-NOW. Setiap papan berperan sebagai penerima sekaligus pengirim, sehingga dapat dikategorikan sebagai transceiver.
Firmware MicroPython terbaru sudah menyertakan dua pustaka ESP-NOW bawaan, yaitu `espnow` dan `aioespnow`. Pada contoh ini, kita akan menggunakan `aioespnow`, yaitu versi asinkron dari pustaka `espnow`.
Salin kode berikut ke Thonny IDE untuk dijalankan pada papan ESP32.
import time
# Initialize Wi-Fi in station mode
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.config(channel=1) # Set channel explicitly if packets are not received
sta.disconnect()
# Initialize AIOESPNow
e = aioespnow.AIOESPNow()
try:
e.active(True)
except OSError as err:
print("Failed to initialize AIOESPNow:", err)
raise
# Peer MAC address (replace with the actual MAC of the other board)
peer_mac = b'\xff\xff\xff\xff\xff\xff' # Example peer MAC for unicast
# Add peer for unicast reliability
try:
e.add_peer(peer_mac)
except OSError as err:
print("Failed to add peer:", err)
raise
# Stats tracking
last_stats_time = time.time()
stats_interval = 10 # Print stats every 10 seconds
# Async function to send messages
async def send_messages(e, peer):
message_count = 0
while True:
try:
message = f"Hello from ESP32 #{message_count}"
if await e.asend(peer, message, sync=True):
print(f"Sent message: {message}")
else:
print("Failed to send message")
message_count += 1
await asyncio.sleep(1) # Send every 1 second
except OSError as err:
print("Send error:", err)
await asyncio.sleep(5)
# Async function to receive messages
async def receive_messages(e):
while True:
try:
async for mac, msg in e:
print(f"Received from {mac.hex()}: {msg.decode()}")
except OSError as err:
print("Receive error:", err)
await asyncio.sleep(5)
# Async function to print stats periodically
async def print_stats(e):
global last_stats_time
while True:
if time.time() - last_stats_time >= stats_interval:
stats = e.stats()
print("\nESP-NOW Statistics:")
print(f" Packets Sent: {stats[0]}")
print(f" Packets Delivered: {stats[1]}")
print(f" Packets Dropped (TX): {stats[2]}")
print(f" Packets Received: {stats[3]}")
print(f" Packets Dropped (RX): {stats[4]}")
last_stats_time = time.time()
await asyncio.sleep(1) # Check every second
# Main async function
async def main(e, peer):
# Run send, receive, and stats tasks concurrently
await asyncio.gather(send_messages(e, peer), receive_messages(e), print_stats(e))
# Run the async program
try:
asyncio.run(main(e, peer_mac))
except KeyboardInterrupt:
print("Stopping transceiver...")
e.active(False)
sta.active(False)
Kode berikut mengonfigurasi papan ESP32 sebagai penerima sekaligus pengirim (transceiver) menggunakan ESP-NOW. Pada bagian kode, Anda perlu memasukkan alamat MAC dari papan tujuan yang akan menerima data.
Sebagai contoh, jika alamat MAC papan penerima adalah `30:AE:A4:F6:7D:4C`, maka alamat tersebut harus dikonversi ke format bytes sebagai berikut:
- `30:AE:A4:F6:7D:4C` → `b'\x30\xae\xa4\xf6\x7d\x4c'`
# Peer MAC address
receiver_mac = b'\x30\xae\xa4\xf6\x7d\x4c'
Unggah kode ini ke kedua papan, namun pastikan Anda menyesuaikan alamat MAC sesuai perangkat tujuan.
Penjelasan Cara Kerja Kode
Untuk memahami alur program secara lebih mendalam, disarankan Anda terlebih dahulu memahami konsep pemrograman asinkron di MicroPython.
Mengimpor Modul
Langkah pertama adalah mengimpor modul-modul yang diperlukan.
import network
import aioespnow
import asyncio
import time
Inisialisasi Antarmuka Wi-Fi
Selanjutnya, kita perlu menginisialisasi modul Wi-Fi (meskipun tidak digunakan untuk koneksi internet) karena ESP-NOW memerlukan Wi-Fi untuk aktif. Antarmuka dapat dijalankan dalam mode station (STA_IF) maupun access point (AP_IF).
# Initialize Wi-Fi in station mode
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.config(channel=1) # Set channel
sta.disconnect()
Inisialisasi ESP-NOW
Berikutnya, lakukan inisialisasi ESP-NOW. Buat instance `aioespnow` dengan nama `e`, kemudian aktifkan dengan memanggil metode `active()` dan memberikan argumen `True`.
# Initialize AIOESPNow
e = aioespnow.AIOESPNow()
try:
e.active(True)
print("AIOESPNow initialized")
except OSError as err:
print("Failed to initialize AIOESPNow:", err)
raise
Aktivasi ESP-NOW ditempatkan di dalam blok `try` dan `except` untuk menangani kemungkinan terjadinya error apabila proses inisialisasi gagal.
Menambahkan Peer ESP-NOW
Masukkan alamat MAC dari perangkat tujuan (papan ESP32 yang akan menerima data) sebagai peer ESP-NOW.
# Receiver MAC address (the board you want to send data to)
peer_mac = b'\x68\xb6\xb3\x22\x9e\x60'
Selanjutnya, tambahkan alamat MAC perangkat penerima sebagai peer dengan menggunakan metode `add_peer()`.
try:
e.add_peer(peer_mac)
except OSError as err:
print("Failed to add peer:", err)
raise
Fungsi untuk Mengirim Pesan ESP-NOW
Fungsi berikut digunakan untuk mengirim pesan melalui ESP-NOW ke perangkat peer. Fungsi ini menerima instance ESP-NOW `e` dan objek peer sebagai argumen. Pada contoh ini, pesan yang dikirim berupa teks “Hello from ESP32” yang diikuti dengan nilai penghitung (counter). Pengiriman dilakukan secara berkala setiap satu detik.
# Async function to send messages
async def send_messages(e, peer):
message_count = 0
while True:
try:
message = f"Hello from ESP32 #{message_count}"
if await e.asend(peer, message, sync=True):
print(f"Sent message: {message}")
else:
print("Failed to send message")
message_count += 1
await asyncio.sleep(1) # Send every 1 second
except OSError as err:
print("Send error:", err)
await asyncio.sleep(5)
Fungsi untuk Menerima Pesan ESP-NOW
Kita juga perlu membuat fungsi untuk menerima pesan yang dikirim melalui ESP-NOW. Saat pesan baru diterima, data tersebut akan ditampilkan pada jendela Shell MicroPython.
# Async function to receive messages
async def receive_messages(e):
while True:
try:
async for mac, msg in e:
print(f"Received from {mac.hex()}: {msg.decode()}")
except OSError as err:
print("Receive error:", err)
await asyncio.sleep(5)
Mencetak Statistik ESP-NOW
Selanjutnya, kita membuat fungsi `print_stats()` yang akan dipanggil di bagian program berikutnya untuk menampilkan statistik paket ESP-NOW. Untuk memperoleh jumlah paket yang terkirim, diterima, maupun hilang, kita dapat memanggil metode `stats()` pada objek ESP-NOW `e`.
Metode tersebut akan mengembalikan nilai berupa 5-tuple yang berisi informasi jumlah paket yang dikirim, diterima, dan hilang.
(tx_pkts, tx_responses, tx_failures, rx_packets, rx_dropped_packets)
Berikut merupakan fungsi asinkron lengkap yang digunakan dalam proses ini.
async def print_stats(e):
global last_stats_time
while True:
if time.time() - last_stats_time >= stats_interval:
stats = e.stats()
print("\nESP-NOW Statistics:")
print(f" Packets Sent: {stats[0]}")
print(f" Packets Delivered: {stats[1]}")
print(f" Packets Dropped (TX): {stats[2]}")
print(f" Packets Received: {stats[3]}")
print(f" Packets Dropped (RX): {stats[4]}")
last_stats_time = time.time()
await asyncio.sleep(1) # Check every second
Fungsi Asinkron Utama
Baris berikut mendefinisikan fungsi asinkron `main()` yang menerima objek ESP-NOW `e` serta alamat MAC peer sebagai argumen.
async def main(e, peer):
Selanjutnya, kita menggunakan `asyncio.gather` untuk menjalankan tiga tugas asinkron secara bersamaan, yaitu:
- `send_messages(e, peer)`: mengirim data sensor BME280 ke perangkat peer;
- `receive_messages(e)`: memantau dan menerima data yang dikirim dari peer;
- `print_stats(e)`: menampilkan statistik ESP-NOW secara berkala.
await asyncio.gather(send_messages(e, peer), receive_messages(e), print_stats(e))
Menjalankan Program Asinkron
Terakhir, baris berikut digunakan untuk mengeksekusi program secara asinkron.
# Run the async program
try:
asyncio.run(main(e, peer_mac))
except KeyboardInterrupt:
print("Stopping transceiver...")
e.active(False)
sta.active(False)
Perintah `asyncio.run(main(e, peer_mac))` akan memulai event loop asyncio pada MicroPython dan mengeksekusi fungsi `main()`.
Pemanggilan ini ditempatkan di dalam blok `try–except` untuk menangani interupsi manual (KeyboardInterrupt). Saat interupsi terjadi, program akan menonaktifkan ESP-NOW serta mematikan modul Wi-Fi.
Menjalankan Kode
Setelah papan ESP32 terhubung ke komputer dan komunikasi dengan Thonny IDE berhasil, Anda dapat mengunggah kode sebagai main.py ke papan atau menjalankannya langsung menggunakan tombol Run berwarna hijau. Pastikan Anda telah memasukkan alamat MAC perangkat penerima pada bagian kode yang sesuai.
Program akan mulai menampilkan pesan pada jendela Shell. Pengiriman pesan akan gagal selama papan yang satu belum siap menerima komunikasi.
Buka instance Thonny IDE lainnya dan lakukan koneksi ke papan ESP32 yang satunya.
Mengaktifkan Dua Instance Thonny IDE
Masuk ke menu Tools > Options, kemudian nonaktifkan opsi Allow only single Thonny instance.
Salin kode yang sama ke papan kedua, namun pastikan Anda memasukkan alamat MAC dari papan pasangannya. Setelah beberapa detik, kedua perangkat akan mulai saling mengirim dan menerima pesan melalui ESP-NOW.
Cuplikan berikut menampilkan output Shell MicroPython dari masing-masing papan.
Setelah kedua papan dinyalakan, keduanya akan mulai saling bertukar pesan secara otomatis.
ESP32 ESP-NOW Komunikasi Dua Arah – Pertukaran Data Sensor dan Tampilan pada OLED
Pada bagian ini, kita akan membangun contoh aplikasi nyata di mana dua papan ESP32 saling bertukar data sensor dan menampilkan data yang diterima pada layar OLED.
Gambaran Proyek
Diagram berikut menunjukkan arsitektur tingkat tinggi dari proyek yang akan kita bangun.
Dalam proyek ini, kita menggunakan dua papan ESP32. Setiap papan terhubung dengan sebuah layar OLED dan sensor BME280.
- Masing-masing papan membaca nilai suhu, kelembapan, dan tekanan dari sensornya.
- Setiap papan mengirimkan data pembacaan tersebut ke papan lain melalui ESP-NOW.
- Ketika sebuah papan menerima data dari pasangannya, data tersebut akan ditampilkan pada layar OLED.
- Setelah mengirim data, papan akan menampilkan status pengiriman pada OLED, apakah pesan berhasil dikirim atau tidak.
- Masing-masing papan harus mengetahui alamat MAC pasangannya untuk dapat mengirimkan data.
Pada contoh ini, komunikasi dua arah dilakukan antara dua papan, namun konfigurasi dapat diperluas dengan menambahkan lebih banyak papan agar seluruh perangkat dapat saling berkomunikasi.
Komponen yang Diperlukan
Untuk mengikuti tutorial ini, Anda memerlukan:
- 2x papan ESP32
- 2x sensor BME280
- 2x layar OLED 0.96 inci
- Breadboard
- Kabel jumper
Diagram Skematik
Hubungkan masing-masing papan ESP32 dengan satu layar OLED dan satu sensor BME280. Ikuti diagram skematik berikut, dan sesuaikan jika Anda menggunakan varian ESP32 dengan konfigurasi pin yang berbeda.
Mengimpor Pustaka
Pustaka untuk mengendalikan layar OLED dan membaca data dari sensor BME280 tidak termasuk dalam paket standar MicroPython. Oleh karena itu, kita perlu menambahkan modul-modul tersebut ke papan ESP32.
Modul MicroPython `ssd1306.py`
1. Salin kode berikut ke file baru pada Thonny IDE.
# MicroPython SSD1306 OLED driver, I2C and SPI interfaces created by Adafruit
import time
import framebuf
# register definitions
SET_CONTRAST = const(0x81)
SET_ENTIRE_ON = const(0xa4)
SET_NORM_INV = const(0xa6)
SET_DISP = const(0xae)
SET_MEM_ADDR = const(0x20)
SET_COL_ADDR = const(0x21)
SET_PAGE_ADDR = const(0x22)
SET_DISP_START_LINE = const(0x40)
SET_SEG_REMAP = const(0xa0)
SET_MUX_RATIO = const(0xa8)
SET_COM_OUT_DIR = const(0xc0)
SET_DISP_OFFSET = const(0xd3)
SET_COM_PIN_CFG = const(0xda)
SET_DISP_CLK_DIV = const(0xd5)
SET_PRECHARGE = const(0xd9)
SET_VCOM_DESEL = const(0xdb)
SET_CHARGE_PUMP = const(0x8d)
class SSD1306:
def __init__(self, width, height, external_vcc):
self.width = width
self.height = height
self.external_vcc = external_vcc
self.pages = self.height // 8
# Note the subclass must initialize self.framebuf to a framebuffer.
# This is necessary because the underlying data buffer is different
# between I2C and SPI implementations (I2C needs an extra byte).
self.poweron()
self.init_display()
def init_display(self):
for cmd in (
SET_DISP | 0x00, # off
# address setting
SET_MEM_ADDR, 0x00, # horizontal
# resolution and layout
SET_DISP_START_LINE | 0x00,
SET_SEG_REMAP | 0x01, # column addr 127 mapped to SEG0
SET_MUX_RATIO, self.height - 1,
SET_COM_OUT_DIR | 0x08, # scan from COM[N] to COM0
SET_DISP_OFFSET, 0x00,
SET_COM_PIN_CFG, 0x02 if self.height == 32 else 0x12,
# timing and driving scheme
SET_DISP_CLK_DIV, 0x80,
SET_PRECHARGE, 0x22 if self.external_vcc else 0xf1,
SET_VCOM_DESEL, 0x30, # 0.83*Vcc
# display
SET_CONTRAST, 0xff, # maximum
SET_ENTIRE_ON, # output follows RAM contents
SET_NORM_INV, # not inverted
# charge pump
SET_CHARGE_PUMP, 0x10 if self.external_vcc else 0x14,
SET_DISP | 0x01): # on
self.write_cmd(cmd)
self.fill(0)
self.show()
def poweroff(self):
self.write_cmd(SET_DISP | 0x00)
def contrast(self, contrast):
self.write_cmd(SET_CONTRAST)
self.write_cmd(contrast)
def invert(self, invert):
self.write_cmd(SET_NORM_INV | (invert & 1))
def show(self):
x0 = 0
x1 = self.width - 1
if self.width == 64:
# displays with width of 64 pixels are shifted by 32
x0 += 32
x1 += 32
self.write_cmd(SET_COL_ADDR)
self.write_cmd(x0)
self.write_cmd(x1)
self.write_cmd(SET_PAGE_ADDR)
self.write_cmd(0)
self.write_cmd(self.pages - 1)
self.write_framebuf()
def fill(self, col):
self.framebuf.fill(col)
def pixel(self, x, y, col):
self.framebuf.pixel(x, y, col)
def scroll(self, dx, dy):
self.framebuf.scroll(dx, dy)
def text(self, string, x, y, col=1):
self.framebuf.text(string, x, y, col)
class SSD1306_I2C(SSD1306):
def __init__(self, width, height, i2c, addr=0x3c, external_vcc=False):
self.i2c = i2c
self.addr = addr
self.temp = bytearray(2)
# Add an extra byte to the data buffer to hold an I2C data/command byte
# to use hardware-compatible I2C transactions. A memoryview of the
# buffer is used to mask this byte from the framebuffer operations
# (without a major memory hit as memoryview doesn't copy to a separate
# buffer).
self.buffer = bytearray(((height // 8) * width) + 1)
self.buffer[0] = 0x40 # Set first byte of data buffer to Co=0, D/C=1
self.framebuf = framebuf.FrameBuffer1(memoryview(self.buffer)[1:], width, height)
super().__init__(width, height, external_vcc)
def write_cmd(self, cmd):
self.temp[0] = 0x80 # Co=1, D/C#=0
self.temp[1] = cmd
self.i2c.writeto(self.addr, self.temp)
def write_framebuf(self):
# Blast out the frame buffer using a single I2C transaction to support
# hardware I2C interfaces.
self.i2c.writeto(self.addr, self.buffer)
def poweron(self):
pass
class SSD1306_SPI(SSD1306):
def __init__(self, width, height, spi, dc, res, cs, external_vcc=False):
self.rate = 10 * 1024 * 1024
dc.init(dc.OUT, value=0)
res.init(res.OUT, value=0)
cs.init(cs.OUT, value=1)
self.spi = spi
self.dc = dc
self.res = res
self.cs = cs
self.buffer = bytearray((height // 8) * width)
self.framebuf = framebuf.FrameBuffer1(self.buffer, width, height)
super().__init__(width, height, external_vcc)
def write_cmd(self, cmd):
self.spi.init(baudrate=self.rate, polarity=0, phase=0)
self.cs.high()
self.dc.low()
self.cs.low()
self.spi.write(bytearray([cmd]))
self.cs.high()
def write_framebuf(self):
self.spi.init(baudrate=self.rate, polarity=0, phase=0)
self.cs.high()
self.dc.high()
self.cs.low()
self.spi.write(self.buffer)
self.cs.high()
def poweron(self):
self.res.high()
time.sleep_ms(1)
self.res.low()
time.sleep_ms(10)
self.res.high()
2. Buka menu File > Save As kemudian pilih MicroPython device sebagai lokasi penyimpanan.
3. Beri nama file tersebut ssd1306.py, lalu klik OK untuk menyimpannya ke dalam ESP filesystem.
Dengan langkah ini, pustaka telah berhasil diunggah ke papan Anda.
Modul MicroPython `BME280.py`
1. Salin kode berikut ke file baru pada Thonny IDE.
from machine import I2C
import time
# BME280 default address.
BME280_I2CADDR = 0x76
# Operating Modes
BME280_OSAMPLE_1 = 1
BME280_OSAMPLE_2 = 2
BME280_OSAMPLE_4 = 3
BME280_OSAMPLE_8 = 4
BME280_OSAMPLE_16 = 5
# BME280 Registers
BME280_REGISTER_DIG_T1 = 0x88 # Trimming parameter registers
BME280_REGISTER_DIG_T2 = 0x8A
BME280_REGISTER_DIG_T3 = 0x8C
BME280_REGISTER_DIG_P1 = 0x8E
BME280_REGISTER_DIG_P2 = 0x90
BME280_REGISTER_DIG_P3 = 0x92
BME280_REGISTER_DIG_P4 = 0x94
BME280_REGISTER_DIG_P5 = 0x96
BME280_REGISTER_DIG_P6 = 0x98
BME280_REGISTER_DIG_P7 = 0x9A
BME280_REGISTER_DIG_P8 = 0x9C
BME280_REGISTER_DIG_P9 = 0x9E
BME280_REGISTER_DIG_H1 = 0xA1
BME280_REGISTER_DIG_H2 = 0xE1
BME280_REGISTER_DIG_H3 = 0xE3
BME280_REGISTER_DIG_H4 = 0xE4
BME280_REGISTER_DIG_H5 = 0xE5
BME280_REGISTER_DIG_H6 = 0xE6
BME280_REGISTER_DIG_H7 = 0xE7
BME280_REGISTER_CHIPID = 0xD0
BME280_REGISTER_VERSION = 0xD1
BME280_REGISTER_SOFTRESET = 0xE0
BME280_REGISTER_CONTROL_HUM = 0xF2
BME280_REGISTER_CONTROL = 0xF4
BME280_REGISTER_CONFIG = 0xF5
BME280_REGISTER_PRESSURE_DATA = 0xF7
BME280_REGISTER_TEMP_DATA = 0xFA
BME280_REGISTER_HUMIDITY_DATA = 0xFD
class Device:
"""Class for communicating with an I2C device.
Allows reading and writing 8-bit, 16-bit, and byte array values to
registers on the device."""
def __init__(self, address, i2c):
"""Create an instance of the I2C device at the specified address using
the specified I2C interface object."""
self._address = address
self._i2c = i2c
def writeRaw8(self, value):
"""Write an 8-bit value on the bus (without register)."""
value = value & 0xFF
self._i2c.writeto(self._address, value)
def write8(self, register, value):
"""Write an 8-bit value to the specified register."""
b=bytearray(1)
b[0]=value & 0xFF
self._i2c.writeto_mem(self._address, register, b)
def write16(self, register, value):
"""Write a 16-bit value to the specified register."""
value = value & 0xFFFF
b=bytearray(2)
b[0]= value & 0xFF
b[1]= (value>>8) & 0xFF
self.i2c.writeto_mem(self._address, register, value)
def readRaw8(self):
"""Read an 8-bit value on the bus (without register)."""
return int.from_bytes(self._i2c.readfrom(self._address, 1),'little') & 0xFF
def readU8(self, register):
"""Read an unsigned byte from the specified register."""
return int.from_bytes(
self._i2c.readfrom_mem(self._address, register, 1),'little') & 0xFF
def readS8(self, register):
"""Read a signed byte from the specified register."""
result = self.readU8(register)
if result > 127:
result -= 256
return result
def readU16(self, register, little_endian=True):
"""Read an unsigned 16-bit value from the specified register, with the
specified endianness (default little endian, or least significant byte
first)."""
result = int.from_bytes(
self._i2c.readfrom_mem(self._address, register, 2),'little') & 0xFFFF
if not little_endian:
result = ((result << 8) & 0xFF00) + (result >> 8)
return result
def readS16(self, register, little_endian=True):
"""Read a signed 16-bit value from the specified register, with the
specified endianness (default little endian, or least significant byte
first)."""
result = self.readU16(register, little_endian)
if result > 32767:
result -= 65536
return result
def readU16LE(self, register):
"""Read an unsigned 16-bit value from the specified register, in little
endian byte order."""
return self.readU16(register, little_endian=True)
def readU16BE(self, register):
"""Read an unsigned 16-bit value from the specified register, in big
endian byte order."""
return self.readU16(register, little_endian=False)
def readS16LE(self, register):
"""Read a signed 16-bit value from the specified register, in little
endian byte order."""
return self.readS16(register, little_endian=True)
def readS16BE(self, register):
"""Read a signed 16-bit value from the specified register, in big
endian byte order."""
return self.readS16(register, little_endian=False)
class BME280:
def __init__(self, mode=BME280_OSAMPLE_1, address=BME280_I2CADDR, i2c=None,
**kwargs):
# Check that mode is valid.
if mode not in [BME280_OSAMPLE_1, BME280_OSAMPLE_2, BME280_OSAMPLE_4,
BME280_OSAMPLE_8, BME280_OSAMPLE_16]:
raise ValueError(
'Unexpected mode value {0}. Set mode to one of '
'BME280_ULTRALOWPOWER, BME280_STANDARD, BME280_HIGHRES, or '
'BME280_ULTRAHIGHRES'.format(mode))
self._mode = mode
# Create I2C device.
if i2c is None:
raise ValueError('An I2C object is required.')
self._device = Device(address, i2c)
# Load calibration values.
self._load_calibration()
self._device.write8(BME280_REGISTER_CONTROL, 0x3F)
self.t_fine = 0
def _load_calibration(self):
self.dig_T1 = self._device.readU16LE(BME280_REGISTER_DIG_T1)
self.dig_T2 = self._device.readS16LE(BME280_REGISTER_DIG_T2)
self.dig_T3 = self._device.readS16LE(BME280_REGISTER_DIG_T3)
self.dig_P1 = self._device.readU16LE(BME280_REGISTER_DIG_P1)
self.dig_P2 = self._device.readS16LE(BME280_REGISTER_DIG_P2)
self.dig_P3 = self._device.readS16LE(BME280_REGISTER_DIG_P3)
self.dig_P4 = self._device.readS16LE(BME280_REGISTER_DIG_P4)
self.dig_P5 = self._device.readS16LE(BME280_REGISTER_DIG_P5)
self.dig_P6 = self._device.readS16LE(BME280_REGISTER_DIG_P6)
self.dig_P7 = self._device.readS16LE(BME280_REGISTER_DIG_P7)
self.dig_P8 = self._device.readS16LE(BME280_REGISTER_DIG_P8)
self.dig_P9 = self._device.readS16LE(BME280_REGISTER_DIG_P9)
self.dig_H1 = self._device.readU8(BME280_REGISTER_DIG_H1)
self.dig_H2 = self._device.readS16LE(BME280_REGISTER_DIG_H2)
self.dig_H3 = self._device.readU8(BME280_REGISTER_DIG_H3)
self.dig_H6 = self._device.readS8(BME280_REGISTER_DIG_H7)
h4 = self._device.readS8(BME280_REGISTER_DIG_H4)
h4 = (h4 << 24) >> 20
self.dig_H4 = h4 | (self._device.readU8(BME280_REGISTER_DIG_H5) & 0x0F)
h5 = self._device.readS8(BME280_REGISTER_DIG_H6)
h5 = (h5 << 24) >> 20
self.dig_H5 = h5 | (
self._device.readU8(BME280_REGISTER_DIG_H5) >> 4 & 0x0F)
def read_raw_temp(self):
"""Reads the raw (uncompensated) temperature from the sensor."""
meas = self._mode
self._device.write8(BME280_REGISTER_CONTROL_HUM, meas)
meas = self._mode << 5 | self._mode << 2 | 1
self._device.write8(BME280_REGISTER_CONTROL, meas)
sleep_time = 1250 + 2300 * (1 << self._mode)
sleep_time = sleep_time + 2300 * (1 << self._mode) + 575
sleep_time = sleep_time + 2300 * (1 << self._mode) + 575
time.sleep_us(sleep_time) # Wait the required time
msb = self._device.readU8(BME280_REGISTER_TEMP_DATA)
lsb = self._device.readU8(BME280_REGISTER_TEMP_DATA + 1)
xlsb = self._device.readU8(BME280_REGISTER_TEMP_DATA + 2)
raw = ((msb << 16) | (lsb << 8) | xlsb) >> 4
return raw
def read_raw_pressure(self):
"""Reads the raw (uncompensated) pressure level from the sensor."""
"""Assumes that the temperature has already been read """
"""i.e. that enough delay has been provided"""
msb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA)
lsb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA + 1)
xlsb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA + 2)
raw = ((msb << 16) | (lsb << 8) | xlsb) >> 4
return raw
def read_raw_humidity(self):
"""Assumes that the temperature has already been read """
"""i.e. that enough delay has been provided"""
msb = self._device.readU8(BME280_REGISTER_HUMIDITY_DATA)
lsb = self._device.readU8(BME280_REGISTER_HUMIDITY_DATA + 1)
raw = (msb << 8) | lsb
return raw
def read_temperature(self):
"""Get the compensated temperature in 0.01 of a degree celsius."""
adc = self.read_raw_temp()
var1 = ((adc >> 3) - (self.dig_T1 << 1)) * (self.dig_T2 >> 11)
var2 = ((
(((adc >> 4) - self.dig_T1) * ((adc >> 4) - self.dig_T1)) >> 12) *
self.dig_T3) >> 14
self.t_fine = var1 + var2
return (self.t_fine * 5 + 128) >> 8
def read_pressure(self):
"""Gets the compensated pressure in Pascals."""
adc = self.read_raw_pressure()
var1 = self.t_fine - 128000
var2 = var1 * var1 * self.dig_P6
var2 = var2 + ((var1 * self.dig_P5) << 17)
var2 = var2 + (self.dig_P4 << 35)
var1 = (((var1 * var1 * self.dig_P3) >> 8) +
((var1 * self.dig_P2) >> 12))
var1 = (((1 << 47) + var1) * self.dig_P1) >> 33
if var1 == 0:
return 0
p = 1048576 - adc
p = (((p << 31) - var2) * 3125) // var1
var1 = (self.dig_P9 * (p >> 13) * (p >> 13)) >> 25
var2 = (self.dig_P8 * p) >> 19
return ((p + var1 + var2) >> 8) + (self.dig_P7 << 4)
def read_humidity(self):
adc = self.read_raw_humidity()
# print 'Raw humidity = {0:d}'.format (adc)
h = self.t_fine - 76800
h = (((((adc << 14) - (self.dig_H4 << 20) - (self.dig_H5 * h)) +
16384) >> 15) * (((((((h * self.dig_H6) >> 10) * (((h *
self.dig_H3) >> 11) + 32768)) >> 10) + 2097152) *
self.dig_H2 + 8192) >> 14))
h = h - (((((h >> 15) * (h >> 15)) >> 7) * self.dig_H1) >> 4)
h = 0 if h < 0 else h
h = 419430400 if h > 419430400 else h
return h >> 12
@property
def temperature(self):
"Return the temperature in degrees."
t = self.read_temperature()
ti = t // 100
td = t - ti * 100
return "{}.{:02d}C".format(ti, td)
@property
def pressure(self):
"Return the temperature in hPa."
p = self.read_pressure() // 256
pi = p // 100
pd = p - pi * 100
return "{}.{:02d}hPa".format(pi, pd)
@property
def humidity(self):
"Return the humidity in percent."
h = self.read_humidity()
hi = h // 1024
hd = h * 100 // 1024 - hi * 100
return "{}.{:02d}%".format(hi, hd)
2. Buka menu File > Save As…
3. Pilih opsi penyimpanan MicroPython device.
4. Beri nama file BME280.py, kemudian tekan tombol OK.
Dengan langkah tersebut, pustaka telah berhasil diunggah ke papan ESP32 Anda.
Kode MicroPython: Pertukaran Data Sensor BME280 melalui ESP-NOW pada ESP32
Setelah seluruh modul yang diperlukan berhasil diunggah ke masing-masing papan, Anda dapat mengunggah kode berikut ke kedua papan ESP32.
Penting: pastikan Anda telah memasukkan alamat MAC perangkat penerima pada bagian kode yang sesuai.
# Rui Santos & Sara Santos - Random Nerd Tutorials
# Complete project details at https://RandomNerdTutorials.com/micropython-esp32-esp-now-two-way/
import network
import aioespnow
import asyncio
import time
import ujson
from machine import Pin, I2C
import BME280
import ssd1306
# Initialize I2C for BME280 and SSD1306
i2c = I2C(0, scl=Pin(22), sda=Pin(21))
# Initialize BME280 sensor
try:
bme = BME280.BME280(i2c=i2c, address=0x76)
print("BME280 initialized")
except Exception as err:
print("Failed to initialize BME280:", err)
raise
# Initialize SSD1306 OLED display
try:
display = ssd1306.SSD1306_I2C(128, 64, i2c, addr=0x3C)
print("SSD1306 initialized")
except Exception as err:
print("Failed to initialize SSD1306:", err)
raise
# Initialize Wi-Fi in station mode
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.config(channel=1) # set Wi-Fi channel for more stable communication
sta.disconnect()
print("Wi-Fi initialized")
# Initialize AIOESPNow
e = aioespnow.AIOESPNow()
try:
e.active(True)
print("AIOESPNow initialized")
except OSError as err:
print("Failed to initialize AIOESPNow:", err)
raise
# Receiver MAC address (the board you want to send data to)
peer_mac = b'\xff\xff\xff\xff\xff\xff'
# Add peer
try:
e.add_peer(peer_mac)
except OSError as err:
print("Failed to add peer:", err)
raise
# Variables to store readings and status
last_send_status = " "
incoming_readings = {'temp': 0.0, 'hum': 0.0, 'pres': 0.0}
# Function to get BME280 readings
def get_readings():
try:
temp = float(bme.temperature[:-1]) # Remove 'C'
hum = float(bme.humidity[:-1]) # Remove '%'
pres = float(bme.pressure[:-3]) # Remove 'hPa'
print("BME280 readings:", temp, hum, pres)
return temp, hum, pres
except Exception as err:
print("Error reading BME280:", err)
return 0.0, 0.0, 0.0
# Function to update OLED display
def update_display():
try:
display.fill(0)
display.text("INCOM. READINGS", 0, 0)
display.text("Temp: {:.1f} C".format(incoming_readings['temp']), 0, 15)
display.text("Hum: {:.1f} %".format(incoming_readings['hum']), 0, 25)
display.text("Pres: {:.1f} hPa".format(incoming_readings['pres']), 0, 35)
display.text(last_send_status, 0, 55)
display.show()
print("Display updated")
except Exception as err:
print("Error updating display:", err)
# Async function to send messages
async def send_messages(e, peer):
global last_send_status
while True:
try:
print("Sending data")
temp, hum, pres = get_readings()
# Create JSON string
data_dict = {"temp": temp, "hum": hum, "pres": pres}
json_str = ujson.dumps(data_dict)
data = json_str.encode('utf-8') # Convert to bytes
print("Sending JSON:", json_str)
if await e.asend(peer, data, sync=True):
print("Sent with success")
last_send_status = "Delivery Success :)"
else:
print("Send failed")
last_send_status = "Delivery Fail :("
update_display()
print("Sending task complete")
await asyncio.sleep(10) # Send every 10 seconds
except OSError as err:
print("Send error:", err)
last_send_status = "Delivery Fail :("
update_display()
await asyncio.sleep(0.1) # Shorter delay in case of error
# Async function to receive messages
async def receive_messages(e):
global incoming_readings
while True:
try:
print("Checking for messages")
async for mac, msg in e:
try:
# Decode bytes to string and parse JSON
json_str = msg.decode('utf-8')
data_dict = ujson.loads(json_str)
temp = data_dict['temp']
hum = data_dict['hum']
pres = data_dict['pres']
incoming_readings['temp'] = temp
incoming_readings['hum'] = hum
incoming_readings['pres'] = pres
print("\nINCOMING READINGS")
print("Temperature: {:.1f} ÂșC".format(temp))
print("Humidity: {:.1f} %".format(hum))
print("Pressure: {:.1f} hPa".format(pres))
update_display()
except (ValueError, KeyError) as err:
print("Error parsing JSON:", err)
await asyncio.sleep(0.01) # Yield if no received messages
except OSError as err:
print("Receive error:", err)
await asyncio.sleep(0.1) # Shorter delay in case of error
# Main async function
async def main(e, peer):
print("Starting main loop")
await asyncio.gather(send_messages(e, peer), receive_messages(e))
# Run the async program
try:
print("Starting transceiver...")
asyncio.run(main(e, peer_mac))
except KeyboardInterrupt:
print("Stopping transceiver...")
e.active(False)
sta.active(False)
except Exception as err:
print("Main loop error:", err)
finally:
print("Cleaning up...")
e.active(False)
sta.active(False)
Cara Kerja Kode
Berikut adalah penjelasan singkat mengenai alur kerja kode. Anda dapat melanjutkan ke bagian demonstrasi jika tidak memerlukan penjelasan detail.
Mengimpor Library
Kita mulai dengan mengimpor modul-modul yang diperlukan.
import network
import aioespnow
import asyncio
import time
import ujson
from machine import Pin, I2C
import BME280
import ssd1306
Komunikasi I2C
Inisialisasikan komunikasi I2C pada GPIO 22 dan GPIO 21. Sesuaikan konfigurasi jika Anda menggunakan pin yang berbeda.
i2c = I2C(0, scl=Pin(22), sda=Pin(21))
BME280
Inisialisasikan sensor BME280 untuk mulai mengambil data pembacaan.
# Initialize BME280 sensor
try:
bme = BME280.BME280(i2c=i2c, address=0x76)
print("BME280 initialized")
except Exception as err:
print("Failed to initialize BME280:", err)
raise
OLED Display
Inisialisasikan modul layar OLED agar siap menampilkan data.
# Initialize SSD1306 OLED display
try:
display = ssd1306.SSD1306_I2C(128, 64, i2c, addr=0x3C)
print("SSD1306 initialized")
except Exception as err:
print("Failed to initialize SSD1306:", err)
raise
Inisialisasi Antarmuka Wi-Fi
Selanjutnya, kita perlu menginisialisasi modul Wi-Fi (meskipun tidak digunakan untuk koneksi internet) karena ESP-NOW memerlukan Wi-Fi tetap aktif. Antarmuka dapat dijalankan dalam mode station (STA_IF) atau access point (AP_IF).
# Initialize Wi-Fi in station mode
sta = network.WLAN(network.STA_IF)
sta.active(True)
sta.config(channel=1) # Set channel
sta.disconnect()
Inisialisasi ESP-NOW
Selanjutnya, lakukan inisialisasi ESP-NOW. Buat sebuah instance `aioespnow` dengan nama `e`, kemudian aktifkan dengan memanggil metode `active()` dan memberikan argumen `True`.
# Initialize AIOESPNow
e = aioespnow.AIOESPNow()
try:
e.active(True)
print("AIOESPNow initialized")
except OSError as err:
print("Failed to initialize AIOESPNow:", err)
raise
Aktivasi ESP-NOW dilakukan di dalam blok `try` dan `except` untuk menangani kemungkinan error jika proses inisialisasi gagal.
Menambahkan Peer ESP-NOW
Masukkan alamat MAC perangkat peer (alamat MAC papan yang akan menerima data) untuk menambahkan peer ESP-NOW.
# Receiver MAC address (the board you want to send data to)
peer_mac = b'\x68\xb6\xb3\x22\x9e\x60'
Selanjutnya, tambahkan alamat MAC perangkat penerima sebagai peer dengan menggunakan metode `add_peer()`.
try:
e.add_peer(peer_mac)
except OSError as err:
print("Failed to add peer:", err)
raise
Variabel
Buat variabel untuk menyimpan data sensor yang diterima serta status proses pengiriman. Data sensor yang masuk disimpan dalam struktur dictionary.
last_send_status = " "
incoming_readings = {'temp': 0.0, 'hum': 0.0, 'pres': 0.0}
Membaca Data Sensor BME280
Fungsi `get_readings()` mengembalikan nilai pembacaan sensor BME280 dalam urutan berikut: suhu, kelembapan, dan tekanan. Fungsi-fungsi dari pustaka BME280 mengembalikan hasil beserta karakter satuan, sehingga kita menghapus karakter tersebut dari akhir nilai pembacaan.
Jika terjadi kesalahan saat membaca sensor, fungsi ini akan mengembalikan nilai 0.0 untuk semua parameter.
# Function to get BME280 readings
def get_readings():
try:
temp = float(bme.temperature[:-1]) # Remove 'C'
hum = float(bme.humidity[:-1]) # Remove '%'
pres = float(bme.pressure[:-3]) # Remove 'hPa'
print("BME280 readings:", temp, hum, pres)
return temp, hum, pres
except Exception as err:
print("Error reading BME280:", err)
return 0.0, 0.0, 0.0
Memperbarui Tampilan OLED
Fungsi berikut bertugas memperbarui layar OLED dengan data pembacaan sensor terbaru yang diterima. Selain itu, fungsi ini juga menampilkan status pengiriman terakhir.
# Function to update OLED display
def update_display():
try:
display.fill(0)
display.text("INCOM. READINGS", 0, 0)
display.text("Temp: {:.1f} C".format(incoming_readings['temp']), 0, 15)
display.text("Hum: {:.1f} %".format(incoming_readings['hum']), 0, 25)
display.text("Pres: {:.1f} hPa".format(incoming_readings['pres']), 0, 35)
display.text(last_send_status, 0, 55)
display.show()
print("Display updated")
except Exception as err:
print("Error updating display:", err)
Fungsi send_message(e, peer) bertugas mengirimkan pesan ke peer yang ditentukan menggunakan instance ESP-NOW bernama e.
# Async function to send messages
async def send_messages(e, peer):
global last_send_status
while True:
try:
print("Sending data")
temp, hum, pres = get_readings()
# Create JSON string
data_dict = {"temp": temp, "hum": hum, "pres": pres}
json_str = ujson.dumps(data_dict)
data = json_str.encode('utf-8') # Convert to bytes
print("Sending JSON:", json_str)
if await e.asend(peer, data, sync=True):
print("Sent with success")
last_send_status = "Delivery Success :)"
else:
print("Send failed")
last_send_status = "Delivery Fail :("
update_display()
print("Sending task complete")
await asyncio.sleep(10) # Send every 10 seconds
except OSError as err:
print("Send error:", err)
last_send_status = "Delivery Fail :("
update_display()
await asyncio.sleep(0.1) # Shorter delay in case of error
Pada fungsi ini, kita memulai dengan memanggil `get_readings()` untuk memperoleh nilai suhu, kelembapan, dan tekanan terkini. Selanjutnya, data tersebut diperbarui ke dalam dictionary, kemudian dikonversi menjadi JSON string dan diubah lagi menjadi format bytes.
temp, hum, pres = get_readings()
# Create JSON string
data_dict = {"temp": temp, "hum": hum, "pres": pres}
json_str = ujson.dumps(data_dict)
data = json_str.encode('utf-8') # Convert to bytes
Selanjutnya, kita menggunakan prosedur yang sama seperti sebelumnya untuk mengirim data. Selama proses ini, layar OLED juga diperbarui untuk menampilkan hasil pengiriman data.
if await e.asend(peer, data, sync=True):
print("Sent with success")
last_send_status = "Delivery Success :)"
else:
print("Send failed")
last_send_status = "Delivery Fail :("
update_display()
print("Sending task complete")
await asyncio.sleep(10) # Send every 10 seconds
except OSError as err:
print("Send error:", err)
last_send_status = "Delivery Fail :("
update_display()
await asyncio.sleep(0.1) # Shorter delay in case of error
Menerima Data Masuk
Fungsi `receive_messages()` bertugas untuk menerima data yang dikirim dari papan ESP32 lain.
# Async function to receive messages
async def receive_messages(e):
global incoming_readings
while True:
try:
print("Checking for messages")
async for mac, msg in e:
try:
# Decode bytes to string and parse JSON
json_str = msg.decode('utf-8')
data_dict = ujson.loads(json_str)
temp = data_dict['temp']
hum = data_dict['hum']
pres = data_dict['pres']
incoming_readings['temp'] = temp
incoming_readings['hum'] = hum
incoming_readings['pres'] = pres
print("\nINCOMING READINGS")
print("Temperature: {:.1f} ÂșC".format(temp))
print("Humidity: {:.1f} %".format(hum))
print("Pressure: {:.1f} hPa".format(pres))
update_display()
except (ValueError, KeyError) as err:
print("Error parsing JSON:", err)
await asyncio.sleep(0.01) # Yield if no received messages
except OSError as err:
print("Receive error:", err)
await asyncio.sleep(0.1) # Shorter delay in case of error
Saat data diterima, kita mengonversinya menjadi JSON string, kemudian memperbarui dictionary `incoming_readings` dengan data yang diterima.
Selanjutnya, pembacaan tersebut ditampilkan di serial monitor dan fungsi `update_display()` dipanggil untuk memperbarui layar OLED dengan data sensor terbaru.
Membuat Fungsi Loop Asinkron dan Menjalankan Program
Terakhir, kita membuat fungsi loop asinkron utama (main) dan mengeksekusi program secara asinkron seperti pada contoh sebelumnya.
# Run the async program
try:
print("Starting transceiver...")
asyncio.run(main(e, peer_mac))
except KeyboardInterrupt:
print("Stopping transceiver...")
e.active(False)
sta.active(False)
except Exception as err:
print("Main loop error:", err)
finally:
print("Cleaning up...")
e.active(False)
sta.active(False)
Mengunggah Kode ke Papan ESP32
Catatan penting: menjalankan file langsung melalui Thonny tidak menyimpannya secara permanen ke filesystem papan. Artinya, jika papan dicabut dari komputer dan diberi daya, program tidak akan berjalan karena tidak ada file Python yang tersimpan di papan. Fungsi Run di Thonny berguna untuk pengujian, namun untuk menyimpan kode secara permanen, Anda harus membuat dan menyimpan file ke filesystem papan.
Untuk menjalankan kode pada papan tanpa terhubung ke komputer, unggah file tersebut ke filesystem papan dengan nama main.py.
Langkahnya: buka File > Save As…, lalu pilih MicroPython Device sebagai lokasi penyimpanan.
Beri nama file tersebut main.py dan simpan ke dalam filesystem papan.
Demonstrasi
Setelah kode diunggah ke kedua papan, layar OLED akan menampilkan data sensor yang diterima dari papan lainnya, sekaligus menampilkan pesan 'Delivery Success' sebagai tanda pengiriman berhasil.
Siap Untuk Membuat Proyek Impianmu Menjadi Kenyataan?
Klik di sini untuk chat langsung via WhatsApp dan dapatkan dukungan langsung dari tim ahli kami!






0 on: "Cara Membuat Komunikasi Dua Arah ESP32 Menggunakan ESP-NOW di MicroPython"