Blog Archive

Arduino Indonesia. Gambar tema oleh Storman. Diberdayakan oleh Blogger.

Supported by Electronics 3 in 1

1. Jasa pencetakan PCB single layer dengan harga paling murah.

(Metode Pembuatan dengan Transfer Toner)
>PCB design sendiri (siap cetak) : Rp.150,-/Cm2
>PCB design dari kami : Rp.250,-/Cm2

(Metode Sablon Full Masking dan Silk Screen minimal pemesanan 100 Pcs)
>PCB design sendiri (siap cetak) : Rp.200,-/Cm2
>PCB design dari kami : Rp.250,-/Cm2

2. Jasa perancangan, perakitan, dan pembuatan trainer pembelajaran elektronika untuk SMK dan Mahasiswa.

3. Jasa perancangan, perakitan, dan pembuatan berbagai macam kontroller, sensor, aktuator, dan tranduser.
>Design Rangkaian / Sistem Elektronika
>Design Rangkaian / Sistem Instrumentasi
>Design Rangkaian / Sistem Kendali
>Kerjasama Riset (data atau peralatan)
>Kerjasama Produksi Produk-Produk KIT Elektronika
>Produksi Instrumentasi Elektronika

4. Jasa Pembuatan Proyek, Tugas Akhir, Tugas Laboratorium, PKM, Karya Ilmiah, SKRIPSI, dll

Like My Facebook

Popular Posts

Rabu, 10 Desember 2025

Cara Membuat Komunikasi Dua Arah ESP32 Menggunakan ESP-NOW di MicroPython

Pelajari cara membangun komunikasi dua arah antara dua papan ESP32 dengan memanfaatkan protokol ESP-NOW. Pada tahap awal, kita akan menguji contoh sederhana untuk menunjukkan cara mengimplementasikan komunikasi dua arah. Selanjutnya, kita akan membuat proyek yang lebih kompleks, di mana kedua papan saling bertukar data sensor dan menampilkan hasilnya pada layar OLED.

Pengenalan ESP-NOW

ESP-NOW adalah protokol komunikasi nirkabel yang dikembangkan oleh Espressif, memungkinkan beberapa papan ESP32 atau ESP8266 saling bertukar data berukuran kecil tanpa perlu koneksi Wi-Fi atau Bluetooth. Meskipun tidak memerlukan koneksi Wi-Fi penuh, modul Wi-Fi tetap harus aktif sehingga protokol ini sangat cocok untuk aplikasi berdaya rendah dan berlatensi rendah seperti jaringan sensor, perangkat kontrol jarak jauh, atau pertukaran data antar-papan.

 

ESP-NOW menggunakan model komunikasi tanpa koneksi (connectionless), sehingga perangkat dapat mengirim dan menerima data tanpa terhubung ke router maupun membuat access point, berbeda dengan komunikasi HTTP antar perangkat. Protokol ini mendukung mode unicast (mengirim data ke perangkat tertentu menggunakan alamat MAC) dan broadcast (mengirim data ke seluruh perangkat di sekitar menggunakan alamat MAC broadcast).

Mendapatkan Alamat MAC pada ESP32

Untuk menggunakan ESP-NOW, setiap papan ESP32 harus saling mengetahui alamat MAC-nya. Untuk membaca alamat MAC perangkat, jalankan kode berikut pada Thonny IDE menggunakan papan ESP32 Anda.

 

# Rui Santos & Sara Santos - Random Nerd Tutorials

# Complete project details at https://RandomNerdTutorials.com/micropython-esp-now-esp32/

 

import network


wlan = network.WLAN(network.STA_IF)

wlan.active(True)


# Get MAC address (returns bytes)

mac = wlan.config('mac')


# Convert to human-readable format

mac_address = ':'.join('%02x' % b for b in mac)


print("MAC Address:", mac_address)

 

Setelah kode dijalankan, alamat MAC papan akan ditampilkan pada jendela Shell.

 


 

Setelah program dieksekusi, alamat MAC dari papan ESP32 akan muncul pada output di jendela Shell.



 

Kita akan menggunakan papan ESP32 DOIT V1 dan ESP32 S3 DevKitC, namun contoh ini kompatibel dengan hampir semua varian ESP32.

Komunikasi Dua Arah ESP-NOW Antar Dua Papan ESP32 (MicroPython)

Pada bagian ini, kita akan mendemonstrasikan contoh dasar pertukaran pesan sederhana antar papan ESP32 menggunakan protokol ESP-NOW. Setiap papan berperan sebagai penerima sekaligus pengirim, sehingga dapat dikategorikan sebagai transceiver.

 

Firmware MicroPython terbaru sudah menyertakan dua pustaka ESP-NOW bawaan, yaitu `espnow` dan `aioespnow`. Pada contoh ini, kita akan menggunakan `aioespnow`, yaitu versi asinkron dari pustaka `espnow`.

 


Salin kode berikut ke Thonny IDE untuk dijalankan pada papan ESP32.


import time


# Initialize Wi-Fi in station mode

sta = network.WLAN(network.STA_IF)

sta.active(True)

sta.config(channel=1)  # Set channel explicitly if packets are not received

sta.disconnect()


# Initialize AIOESPNow

e = aioespnow.AIOESPNow()

try:

    e.active(True)

except OSError as err:

    print("Failed to initialize AIOESPNow:", err)

    raise


# Peer MAC address (replace with the actual MAC of the other board)

peer_mac = b'\xff\xff\xff\xff\xff\xff'  # Example peer MAC for unicast


# Add peer for unicast reliability

try:

    e.add_peer(peer_mac)

except OSError as err:

    print("Failed to add peer:", err)

    raise


# Stats tracking

last_stats_time = time.time()

stats_interval = 10  # Print stats every 10 seconds


# Async function to send messages

async def send_messages(e, peer):

    message_count = 0

    while True:

        try:

            message = f"Hello from ESP32 #{message_count}"

            if await e.asend(peer, message, sync=True):

                print(f"Sent message: {message}")

            else:

                print("Failed to send message")

            message_count += 1

            await asyncio.sleep(1)  # Send every 1 second

        except OSError as err:

            print("Send error:", err)

            await asyncio.sleep(5)


# Async function to receive messages

async def receive_messages(e):

    while True:

        try:

            async for mac, msg in e:

                print(f"Received from {mac.hex()}: {msg.decode()}")

        except OSError as err:

            print("Receive error:", err)

            await asyncio.sleep(5)


# Async function to print stats periodically

async def print_stats(e):

    global last_stats_time

    while True:

        if time.time() - last_stats_time >= stats_interval:

            stats = e.stats()

            print("\nESP-NOW Statistics:")

            print(f"  Packets Sent: {stats[0]}")

            print(f"  Packets Delivered: {stats[1]}")

            print(f"  Packets Dropped (TX): {stats[2]}")

            print(f"  Packets Received: {stats[3]}")

            print(f"  Packets Dropped (RX): {stats[4]}")

            last_stats_time = time.time()

        await asyncio.sleep(1)  # Check every second


# Main async function

async def main(e, peer):

    # Run send, receive, and stats tasks concurrently

    await asyncio.gather(send_messages(e, peer), receive_messages(e), print_stats(e))


# Run the async program

try:

    asyncio.run(main(e, peer_mac))

except KeyboardInterrupt:

    print("Stopping transceiver...")

    e.active(False)

    sta.active(False)

 

Kode berikut mengonfigurasi papan ESP32 sebagai penerima sekaligus pengirim (transceiver) menggunakan ESP-NOW. Pada bagian kode, Anda perlu memasukkan alamat MAC dari papan tujuan yang akan menerima data.

 

Sebagai contoh, jika alamat MAC papan penerima adalah `30:AE:A4:F6:7D:4C`, maka alamat tersebut harus dikonversi ke format bytes sebagai berikut:

- `30:AE:A4:F6:7D:4C` → `b'\x30\xae\xa4\xf6\x7d\x4c'`

 

# Peer MAC address

receiver_mac = b'\x30\xae\xa4\xf6\x7d\x4c'

 

Unggah kode ini ke kedua papan, namun pastikan Anda menyesuaikan alamat MAC sesuai perangkat tujuan.

Penjelasan Cara Kerja Kode

Untuk memahami alur program secara lebih mendalam, disarankan Anda terlebih dahulu memahami konsep pemrograman asinkron di MicroPython.

Mengimpor Modul

Langkah pertama adalah mengimpor modul-modul yang diperlukan.

 

import network

import aioespnow

import asyncio

import time

 

Inisialisasi Antarmuka Wi-Fi

Selanjutnya, kita perlu menginisialisasi modul Wi-Fi (meskipun tidak digunakan untuk koneksi internet) karena ESP-NOW memerlukan Wi-Fi untuk aktif. Antarmuka dapat dijalankan dalam mode station (STA_IF) maupun access point (AP_IF).

 

# Initialize Wi-Fi in station mode

sta = network.WLAN(network.STA_IF)

sta.active(True)

sta.config(channel=1)  # Set channel 

sta.disconnect()

 

Inisialisasi ESP-NOW

Berikutnya, lakukan inisialisasi ESP-NOW. Buat instance `aioespnow` dengan nama `e`, kemudian aktifkan dengan memanggil metode `active()` dan memberikan argumen `True`.

 

# Initialize AIOESPNow

e = aioespnow.AIOESPNow()

try:

    e.active(True)

    print("AIOESPNow initialized")

except OSError as err:

    print("Failed to initialize AIOESPNow:", err)

    raise

 

Aktivasi ESP-NOW ditempatkan di dalam blok `try` dan `except` untuk menangani kemungkinan terjadinya error apabila proses inisialisasi gagal.

Menambahkan Peer ESP-NOW

Masukkan alamat MAC dari perangkat tujuan (papan ESP32 yang akan menerima data) sebagai peer ESP-NOW.

 

# Receiver MAC address (the board you want to send data to)

peer_mac = b'\x68\xb6\xb3\x22\x9e\x60'

 

Selanjutnya, tambahkan alamat MAC perangkat penerima sebagai peer dengan menggunakan metode `add_peer()`.

 

try:

    e.add_peer(peer_mac)

except OSError as err:

    print("Failed to add peer:", err)

    raise

 

Fungsi untuk Mengirim Pesan ESP-NOW

Fungsi berikut digunakan untuk mengirim pesan melalui ESP-NOW ke perangkat peer. Fungsi ini menerima instance ESP-NOW `e` dan objek peer sebagai argumen. Pada contoh ini, pesan yang dikirim berupa teks “Hello from ESP32” yang diikuti dengan nilai penghitung (counter). Pengiriman dilakukan secara berkala setiap satu detik.

 

# Async function to send messages

async def send_messages(e, peer):

    message_count = 0

    while True:

        try:

            message = f"Hello from ESP32 #{message_count}"

            if await e.asend(peer, message, sync=True):

                print(f"Sent message: {message}")

            else:

                print("Failed to send message")

            message_count += 1

            await asyncio.sleep(1)  # Send every 1 second

        except OSError as err:

            print("Send error:", err)

            await asyncio.sleep(5)

 

Fungsi untuk Menerima Pesan ESP-NOW

Kita juga perlu membuat fungsi untuk menerima pesan yang dikirim melalui ESP-NOW. Saat pesan baru diterima, data tersebut akan ditampilkan pada jendela Shell MicroPython.

 

# Async function to receive messages

async def receive_messages(e):

    while True:

        try:

            async for mac, msg in e:

                print(f"Received from {mac.hex()}: {msg.decode()}")

        except OSError as err:

            print("Receive error:", err)

            await asyncio.sleep(5)

 

Mencetak Statistik ESP-NOW

Selanjutnya, kita membuat fungsi `print_stats()` yang akan dipanggil di bagian program berikutnya untuk menampilkan statistik paket ESP-NOW. Untuk memperoleh jumlah paket yang terkirim, diterima, maupun hilang, kita dapat memanggil metode `stats()` pada objek ESP-NOW `e`.

 

Metode tersebut akan mengembalikan nilai berupa 5-tuple yang berisi informasi jumlah paket yang dikirim, diterima, dan hilang.

 

(tx_pkts, tx_responses, tx_failures, rx_packets, rx_dropped_packets)

 

Berikut merupakan fungsi asinkron lengkap yang digunakan dalam proses ini.

 

async def print_stats(e):

    global last_stats_time

    while True:

        if time.time() - last_stats_time >= stats_interval:

            stats = e.stats()

            print("\nESP-NOW Statistics:")

            print(f"  Packets Sent: {stats[0]}")

            print(f"  Packets Delivered: {stats[1]}")

            print(f"  Packets Dropped (TX): {stats[2]}")

            print(f"  Packets Received: {stats[3]}")

            print(f"  Packets Dropped (RX): {stats[4]}")

            last_stats_time = time.time()

        await asyncio.sleep(1)  # Check every second

 

Fungsi Asinkron Utama

Baris berikut mendefinisikan fungsi asinkron `main()` yang menerima objek ESP-NOW `e` serta alamat MAC peer sebagai argumen.

 

async def main(e, peer):

 

Selanjutnya, kita menggunakan `asyncio.gather` untuk menjalankan tiga tugas asinkron secara bersamaan, yaitu:

- `send_messages(e, peer)`: mengirim data sensor BME280 ke perangkat peer;

- `receive_messages(e)`: memantau dan menerima data yang dikirim dari peer;

- `print_stats(e)`: menampilkan statistik ESP-NOW secara berkala.

 

await asyncio.gather(send_messages(e, peer), receive_messages(e), print_stats(e))

 

Menjalankan Program Asinkron

Terakhir, baris berikut digunakan untuk mengeksekusi program secara asinkron.

 

# Run the async program

try:

    asyncio.run(main(e, peer_mac))

except KeyboardInterrupt:

    print("Stopping transceiver...")

    e.active(False)

    sta.active(False)

 

Perintah `asyncio.run(main(e, peer_mac))` akan memulai event loop asyncio pada MicroPython dan mengeksekusi fungsi `main()`.

 

Pemanggilan ini ditempatkan di dalam blok `try–except` untuk menangani interupsi manual (KeyboardInterrupt). Saat interupsi terjadi, program akan menonaktifkan ESP-NOW serta mematikan modul Wi-Fi. 

Menjalankan Kode

Setelah papan ESP32 terhubung ke komputer dan komunikasi dengan Thonny IDE berhasil, Anda dapat mengunggah kode sebagai main.py ke papan atau menjalankannya langsung menggunakan tombol Run berwarna hijau. Pastikan Anda telah memasukkan alamat MAC perangkat penerima pada bagian kode yang sesuai.

 


Program akan mulai menampilkan pesan pada jendela Shell. Pengiriman pesan akan gagal selama papan yang satu belum siap menerima komunikasi.

Buka instance Thonny IDE lainnya dan lakukan koneksi ke papan ESP32 yang satunya.

Mengaktifkan Dua Instance Thonny IDE

Masuk ke menu Tools > Options, kemudian nonaktifkan opsi Allow only single Thonny instance.

Salin kode yang sama ke papan kedua, namun pastikan Anda memasukkan alamat MAC dari papan pasangannya. Setelah beberapa detik, kedua perangkat akan mulai saling mengirim dan menerima pesan melalui ESP-NOW.

 


Cuplikan berikut menampilkan output Shell MicroPython dari masing-masing papan.



Setelah kedua papan dinyalakan, keduanya akan mulai saling bertukar pesan secara otomatis.

ESP32 ESP-NOW Komunikasi Dua Arah – Pertukaran Data Sensor dan Tampilan pada OLED

Pada bagian ini, kita akan membangun contoh aplikasi nyata di mana dua papan ESP32 saling bertukar data sensor dan menampilkan data yang diterima pada layar OLED.

 


Gambaran Proyek

Diagram berikut menunjukkan arsitektur tingkat tinggi dari proyek yang akan kita bangun.

 


Dalam proyek ini, kita menggunakan dua papan ESP32. Setiap papan terhubung dengan sebuah layar OLED dan sensor BME280.

- Masing-masing papan membaca nilai suhu, kelembapan, dan tekanan dari sensornya.

- Setiap papan mengirimkan data pembacaan tersebut ke papan lain melalui ESP-NOW.

- Ketika sebuah papan menerima data dari pasangannya, data tersebut akan ditampilkan pada layar OLED.

- Setelah mengirim data, papan akan menampilkan status pengiriman pada OLED, apakah pesan berhasil dikirim atau tidak.

- Masing-masing papan harus mengetahui alamat MAC pasangannya untuk dapat mengirimkan data.

Pada contoh ini, komunikasi dua arah dilakukan antara dua papan, namun konfigurasi dapat diperluas dengan menambahkan lebih banyak papan agar seluruh perangkat dapat saling berkomunikasi.

Komponen yang Diperlukan

Untuk mengikuti tutorial ini, Anda memerlukan:

- 2x papan ESP32

- 2x sensor BME280

- 2x layar OLED 0.96 inci

- Breadboard

- Kabel jumper

Diagram Skematik

Hubungkan masing-masing papan ESP32 dengan satu layar OLED dan satu sensor BME280. Ikuti diagram skematik berikut, dan sesuaikan jika Anda menggunakan varian ESP32 dengan konfigurasi pin yang berbeda.

 


Mengimpor Pustaka

Pustaka untuk mengendalikan layar OLED dan membaca data dari sensor BME280 tidak termasuk dalam paket standar MicroPython. Oleh karena itu, kita perlu menambahkan modul-modul tersebut ke papan ESP32.

Modul MicroPython `ssd1306.py`

1. Salin kode berikut ke file baru pada Thonny IDE.

 

# MicroPython SSD1306 OLED driver, I2C and SPI interfaces created by Adafruit


import time

import framebuf


# register definitions

SET_CONTRAST        = const(0x81)

SET_ENTIRE_ON       = const(0xa4)

SET_NORM_INV        = const(0xa6)

SET_DISP            = const(0xae)

SET_MEM_ADDR        = const(0x20)

SET_COL_ADDR        = const(0x21)

SET_PAGE_ADDR       = const(0x22)

SET_DISP_START_LINE = const(0x40)

SET_SEG_REMAP       = const(0xa0)

SET_MUX_RATIO       = const(0xa8)

SET_COM_OUT_DIR     = const(0xc0)

SET_DISP_OFFSET     = const(0xd3)

SET_COM_PIN_CFG     = const(0xda)

SET_DISP_CLK_DIV    = const(0xd5)

SET_PRECHARGE       = const(0xd9)

SET_VCOM_DESEL      = const(0xdb)

SET_CHARGE_PUMP     = const(0x8d)



class SSD1306:

    def __init__(self, width, height, external_vcc):

        self.width = width

        self.height = height

        self.external_vcc = external_vcc

        self.pages = self.height // 8

        # Note the subclass must initialize self.framebuf to a framebuffer.

        # This is necessary because the underlying data buffer is different

        # between I2C and SPI implementations (I2C needs an extra byte).

        self.poweron()

        self.init_display()


    def init_display(self):

        for cmd in (

            SET_DISP | 0x00, # off

            # address setting

            SET_MEM_ADDR, 0x00, # horizontal

            # resolution and layout

            SET_DISP_START_LINE | 0x00,

            SET_SEG_REMAP | 0x01, # column addr 127 mapped to SEG0

            SET_MUX_RATIO, self.height - 1,

            SET_COM_OUT_DIR | 0x08, # scan from COM[N] to COM0

            SET_DISP_OFFSET, 0x00,

            SET_COM_PIN_CFG, 0x02 if self.height == 32 else 0x12,

            # timing and driving scheme

            SET_DISP_CLK_DIV, 0x80,

            SET_PRECHARGE, 0x22 if self.external_vcc else 0xf1,

            SET_VCOM_DESEL, 0x30, # 0.83*Vcc

            # display

            SET_CONTRAST, 0xff, # maximum

            SET_ENTIRE_ON, # output follows RAM contents

            SET_NORM_INV, # not inverted

            # charge pump

            SET_CHARGE_PUMP, 0x10 if self.external_vcc else 0x14,

            SET_DISP | 0x01): # on

            self.write_cmd(cmd)

        self.fill(0)

        self.show()


    def poweroff(self):

        self.write_cmd(SET_DISP | 0x00)


    def contrast(self, contrast):

        self.write_cmd(SET_CONTRAST)

        self.write_cmd(contrast)


    def invert(self, invert):

        self.write_cmd(SET_NORM_INV | (invert & 1))


    def show(self):

        x0 = 0

        x1 = self.width - 1

        if self.width == 64:

            # displays with width of 64 pixels are shifted by 32

            x0 += 32

            x1 += 32

        self.write_cmd(SET_COL_ADDR)

        self.write_cmd(x0)

        self.write_cmd(x1)

        self.write_cmd(SET_PAGE_ADDR)

        self.write_cmd(0)

        self.write_cmd(self.pages - 1)

        self.write_framebuf()


    def fill(self, col):

        self.framebuf.fill(col)


    def pixel(self, x, y, col):

        self.framebuf.pixel(x, y, col)


    def scroll(self, dx, dy):

        self.framebuf.scroll(dx, dy)


    def text(self, string, x, y, col=1):

        self.framebuf.text(string, x, y, col)



class SSD1306_I2C(SSD1306):

    def __init__(self, width, height, i2c, addr=0x3c, external_vcc=False):

        self.i2c = i2c

        self.addr = addr

        self.temp = bytearray(2)

        # Add an extra byte to the data buffer to hold an I2C data/command byte

        # to use hardware-compatible I2C transactions.  A memoryview of the

        # buffer is used to mask this byte from the framebuffer operations

        # (without a major memory hit as memoryview doesn't copy to a separate

        # buffer).

        self.buffer = bytearray(((height // 8) * width) + 1)

        self.buffer[0] = 0x40  # Set first byte of data buffer to Co=0, D/C=1

        self.framebuf = framebuf.FrameBuffer1(memoryview(self.buffer)[1:], width, height)

        super().__init__(width, height, external_vcc)


    def write_cmd(self, cmd):

        self.temp[0] = 0x80 # Co=1, D/C#=0

        self.temp[1] = cmd

        self.i2c.writeto(self.addr, self.temp)


    def write_framebuf(self):

        # Blast out the frame buffer using a single I2C transaction to support

        # hardware I2C interfaces.

        self.i2c.writeto(self.addr, self.buffer)


    def poweron(self):

        pass



class SSD1306_SPI(SSD1306):

    def __init__(self, width, height, spi, dc, res, cs, external_vcc=False):

        self.rate = 10 * 1024 * 1024

        dc.init(dc.OUT, value=0)

        res.init(res.OUT, value=0)

        cs.init(cs.OUT, value=1)

        self.spi = spi

        self.dc = dc

        self.res = res

        self.cs = cs

        self.buffer = bytearray((height // 8) * width)

        self.framebuf = framebuf.FrameBuffer1(self.buffer, width, height)

        super().__init__(width, height, external_vcc)


    def write_cmd(self, cmd):

        self.spi.init(baudrate=self.rate, polarity=0, phase=0)

        self.cs.high()

        self.dc.low()

        self.cs.low()

        self.spi.write(bytearray([cmd]))

        self.cs.high()


    def write_framebuf(self):

        self.spi.init(baudrate=self.rate, polarity=0, phase=0)

        self.cs.high()

        self.dc.high()

        self.cs.low()

        self.spi.write(self.buffer)

        self.cs.high()


    def poweron(self):

        self.res.high()

        time.sleep_ms(1)

        self.res.low()

        time.sleep_ms(10)

        self.res.high()

 

2. Buka menu File > Save As kemudian pilih MicroPython device sebagai lokasi penyimpanan.

 


3. Beri nama file tersebut ssd1306.py, lalu klik OK untuk menyimpannya ke dalam ESP filesystem.

Dengan langkah ini, pustaka telah berhasil diunggah ke papan Anda.

Modul MicroPython `BME280.py`

1. Salin kode berikut ke file baru pada Thonny IDE.

 

from machine import I2C

import time


# BME280 default address.

BME280_I2CADDR = 0x76


# Operating Modes

BME280_OSAMPLE_1 = 1

BME280_OSAMPLE_2 = 2

BME280_OSAMPLE_4 = 3

BME280_OSAMPLE_8 = 4

BME280_OSAMPLE_16 = 5


# BME280 Registers


BME280_REGISTER_DIG_T1 = 0x88  # Trimming parameter registers

BME280_REGISTER_DIG_T2 = 0x8A

BME280_REGISTER_DIG_T3 = 0x8C


BME280_REGISTER_DIG_P1 = 0x8E

BME280_REGISTER_DIG_P2 = 0x90

BME280_REGISTER_DIG_P3 = 0x92

BME280_REGISTER_DIG_P4 = 0x94

BME280_REGISTER_DIG_P5 = 0x96

BME280_REGISTER_DIG_P6 = 0x98

BME280_REGISTER_DIG_P7 = 0x9A

BME280_REGISTER_DIG_P8 = 0x9C

BME280_REGISTER_DIG_P9 = 0x9E


BME280_REGISTER_DIG_H1 = 0xA1

BME280_REGISTER_DIG_H2 = 0xE1

BME280_REGISTER_DIG_H3 = 0xE3

BME280_REGISTER_DIG_H4 = 0xE4

BME280_REGISTER_DIG_H5 = 0xE5

BME280_REGISTER_DIG_H6 = 0xE6

BME280_REGISTER_DIG_H7 = 0xE7


BME280_REGISTER_CHIPID = 0xD0

BME280_REGISTER_VERSION = 0xD1

BME280_REGISTER_SOFTRESET = 0xE0


BME280_REGISTER_CONTROL_HUM = 0xF2

BME280_REGISTER_CONTROL = 0xF4

BME280_REGISTER_CONFIG = 0xF5

BME280_REGISTER_PRESSURE_DATA = 0xF7

BME280_REGISTER_TEMP_DATA = 0xFA

BME280_REGISTER_HUMIDITY_DATA = 0xFD



class Device:

  """Class for communicating with an I2C device.


  Allows reading and writing 8-bit, 16-bit, and byte array values to

  registers on the device."""


  def __init__(self, address, i2c):

    """Create an instance of the I2C device at the specified address using

    the specified I2C interface object."""

    self._address = address

    self._i2c = i2c


  def writeRaw8(self, value):

    """Write an 8-bit value on the bus (without register)."""

    value = value & 0xFF

    self._i2c.writeto(self._address, value)


  def write8(self, register, value):

    """Write an 8-bit value to the specified register."""

    b=bytearray(1)

    b[0]=value & 0xFF

    self._i2c.writeto_mem(self._address, register, b)


  def write16(self, register, value):

    """Write a 16-bit value to the specified register."""

    value = value & 0xFFFF

    b=bytearray(2)

    b[0]= value & 0xFF

    b[1]= (value>>8) & 0xFF

    self.i2c.writeto_mem(self._address, register, value)


  def readRaw8(self):

    """Read an 8-bit value on the bus (without register)."""

    return int.from_bytes(self._i2c.readfrom(self._address, 1),'little') & 0xFF


  def readU8(self, register):

    """Read an unsigned byte from the specified register."""

    return int.from_bytes(

        self._i2c.readfrom_mem(self._address, register, 1),'little') & 0xFF


  def readS8(self, register):

    """Read a signed byte from the specified register."""

    result = self.readU8(register)

    if result > 127:

      result -= 256

    return result


  def readU16(self, register, little_endian=True):

    """Read an unsigned 16-bit value from the specified register, with the

    specified endianness (default little endian, or least significant byte

    first)."""

    result = int.from_bytes(

        self._i2c.readfrom_mem(self._address, register, 2),'little') & 0xFFFF

    if not little_endian:

      result = ((result << 8) & 0xFF00) + (result >> 8)

    return result


  def readS16(self, register, little_endian=True):

    """Read a signed 16-bit value from the specified register, with the

    specified endianness (default little endian, or least significant byte

    first)."""

    result = self.readU16(register, little_endian)

    if result > 32767:

      result -= 65536

    return result


  def readU16LE(self, register):

    """Read an unsigned 16-bit value from the specified register, in little

    endian byte order."""

    return self.readU16(register, little_endian=True)


  def readU16BE(self, register):

    """Read an unsigned 16-bit value from the specified register, in big

    endian byte order."""

    return self.readU16(register, little_endian=False)


  def readS16LE(self, register):

    """Read a signed 16-bit value from the specified register, in little

    endian byte order."""

    return self.readS16(register, little_endian=True)


  def readS16BE(self, register):

    """Read a signed 16-bit value from the specified register, in big

    endian byte order."""

    return self.readS16(register, little_endian=False)



class BME280:

  def __init__(self, mode=BME280_OSAMPLE_1, address=BME280_I2CADDR, i2c=None,

               **kwargs):

    # Check that mode is valid.

    if mode not in [BME280_OSAMPLE_1, BME280_OSAMPLE_2, BME280_OSAMPLE_4,

                    BME280_OSAMPLE_8, BME280_OSAMPLE_16]:

        raise ValueError(

            'Unexpected mode value {0}. Set mode to one of '

            'BME280_ULTRALOWPOWER, BME280_STANDARD, BME280_HIGHRES, or '

            'BME280_ULTRAHIGHRES'.format(mode))

    self._mode = mode

    # Create I2C device.

    if i2c is None:

      raise ValueError('An I2C object is required.')

    self._device = Device(address, i2c)

    # Load calibration values.

    self._load_calibration()

    self._device.write8(BME280_REGISTER_CONTROL, 0x3F)

    self.t_fine = 0


  def _load_calibration(self):


    self.dig_T1 = self._device.readU16LE(BME280_REGISTER_DIG_T1)

    self.dig_T2 = self._device.readS16LE(BME280_REGISTER_DIG_T2)

    self.dig_T3 = self._device.readS16LE(BME280_REGISTER_DIG_T3)


    self.dig_P1 = self._device.readU16LE(BME280_REGISTER_DIG_P1)

    self.dig_P2 = self._device.readS16LE(BME280_REGISTER_DIG_P2)

    self.dig_P3 = self._device.readS16LE(BME280_REGISTER_DIG_P3)

    self.dig_P4 = self._device.readS16LE(BME280_REGISTER_DIG_P4)

    self.dig_P5 = self._device.readS16LE(BME280_REGISTER_DIG_P5)

    self.dig_P6 = self._device.readS16LE(BME280_REGISTER_DIG_P6)

    self.dig_P7 = self._device.readS16LE(BME280_REGISTER_DIG_P7)

    self.dig_P8 = self._device.readS16LE(BME280_REGISTER_DIG_P8)

    self.dig_P9 = self._device.readS16LE(BME280_REGISTER_DIG_P9)


    self.dig_H1 = self._device.readU8(BME280_REGISTER_DIG_H1)

    self.dig_H2 = self._device.readS16LE(BME280_REGISTER_DIG_H2)

    self.dig_H3 = self._device.readU8(BME280_REGISTER_DIG_H3)

    self.dig_H6 = self._device.readS8(BME280_REGISTER_DIG_H7)


    h4 = self._device.readS8(BME280_REGISTER_DIG_H4)

    h4 = (h4 << 24) >> 20

    self.dig_H4 = h4 | (self._device.readU8(BME280_REGISTER_DIG_H5) & 0x0F)


    h5 = self._device.readS8(BME280_REGISTER_DIG_H6)

    h5 = (h5 << 24) >> 20

    self.dig_H5 = h5 | (

        self._device.readU8(BME280_REGISTER_DIG_H5) >> 4 & 0x0F)


  def read_raw_temp(self):

    """Reads the raw (uncompensated) temperature from the sensor."""

    meas = self._mode

    self._device.write8(BME280_REGISTER_CONTROL_HUM, meas)

    meas = self._mode << 5 | self._mode << 2 | 1

    self._device.write8(BME280_REGISTER_CONTROL, meas)

    sleep_time = 1250 + 2300 * (1 << self._mode)


    sleep_time = sleep_time + 2300 * (1 << self._mode) + 575

    sleep_time = sleep_time + 2300 * (1 << self._mode) + 575

    time.sleep_us(sleep_time)  # Wait the required time

    msb = self._device.readU8(BME280_REGISTER_TEMP_DATA)

    lsb = self._device.readU8(BME280_REGISTER_TEMP_DATA + 1)

    xlsb = self._device.readU8(BME280_REGISTER_TEMP_DATA + 2)

    raw = ((msb << 16) | (lsb << 8) | xlsb) >> 4

    return raw


  def read_raw_pressure(self):

    """Reads the raw (uncompensated) pressure level from the sensor."""

    """Assumes that the temperature has already been read """

    """i.e. that enough delay has been provided"""

    msb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA)

    lsb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA + 1)

    xlsb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA + 2)

    raw = ((msb << 16) | (lsb << 8) | xlsb) >> 4

    return raw


  def read_raw_humidity(self):

    """Assumes that the temperature has already been read """

    """i.e. that enough delay has been provided"""

    msb = self._device.readU8(BME280_REGISTER_HUMIDITY_DATA)

    lsb = self._device.readU8(BME280_REGISTER_HUMIDITY_DATA + 1)

    raw = (msb << 8) | lsb

    return raw


  def read_temperature(self):

    """Get the compensated temperature in 0.01 of a degree celsius."""

    adc = self.read_raw_temp()

    var1 = ((adc >> 3) - (self.dig_T1 << 1)) * (self.dig_T2 >> 11)

    var2 = ((

        (((adc >> 4) - self.dig_T1) * ((adc >> 4) - self.dig_T1)) >> 12) *

        self.dig_T3) >> 14

    self.t_fine = var1 + var2

    return (self.t_fine * 5 + 128) >> 8


  def read_pressure(self):

    """Gets the compensated pressure in Pascals."""

    adc = self.read_raw_pressure()

    var1 = self.t_fine - 128000

    var2 = var1 * var1 * self.dig_P6

    var2 = var2 + ((var1 * self.dig_P5) << 17)

    var2 = var2 + (self.dig_P4 << 35)

    var1 = (((var1 * var1 * self.dig_P3) >> 8) +

            ((var1 * self.dig_P2) >> 12))

    var1 = (((1 << 47) + var1) * self.dig_P1) >> 33

    if var1 == 0:

      return 0

    p = 1048576 - adc

    p = (((p << 31) - var2) * 3125) // var1

    var1 = (self.dig_P9 * (p >> 13) * (p >> 13)) >> 25

    var2 = (self.dig_P8 * p) >> 19

    return ((p + var1 + var2) >> 8) + (self.dig_P7 << 4)


  def read_humidity(self):

    adc = self.read_raw_humidity()

    # print 'Raw humidity = {0:d}'.format (adc)

    h = self.t_fine - 76800

    h = (((((adc << 14) - (self.dig_H4 << 20) - (self.dig_H5 * h)) +

         16384) >> 15) * (((((((h * self.dig_H6) >> 10) * (((h *

                          self.dig_H3) >> 11) + 32768)) >> 10) + 2097152) *

                          self.dig_H2 + 8192) >> 14))

    h = h - (((((h >> 15) * (h >> 15)) >> 7) * self.dig_H1) >> 4)

    h = 0 if h < 0 else h

    h = 419430400 if h > 419430400 else h

    return h >> 12


  @property

  def temperature(self):

    "Return the temperature in degrees."

    t = self.read_temperature()

    ti = t // 100

    td = t - ti * 100

    return "{}.{:02d}C".format(ti, td)


  @property

  def pressure(self):

    "Return the temperature in hPa."

    p = self.read_pressure() // 256

    pi = p // 100

    pd = p - pi * 100

    return "{}.{:02d}hPa".format(pi, pd)


  @property

  def humidity(self):

    "Return the humidity in percent."

    h = self.read_humidity()

    hi = h // 1024

    hd = h * 100 // 1024 - hi * 100

    return "{}.{:02d}%".format(hi, hd)

 

2. Buka menu File > Save As…



3. Pilih opsi penyimpanan MicroPython device.



4. Beri nama file BME280.py, kemudian tekan tombol OK.

 


Dengan langkah tersebut, pustaka telah berhasil diunggah ke papan ESP32 Anda.

Kode MicroPython: Pertukaran Data Sensor BME280 melalui ESP-NOW pada ESP32

Setelah seluruh modul yang diperlukan berhasil diunggah ke masing-masing papan, Anda dapat mengunggah kode berikut ke kedua papan ESP32.

 

Penting: pastikan Anda telah memasukkan alamat MAC perangkat penerima pada bagian kode yang sesuai.

 

# Rui Santos & Sara Santos - Random Nerd Tutorials

# Complete project details at https://RandomNerdTutorials.com/micropython-esp32-esp-now-two-way/


import network

import aioespnow

import asyncio

import time

import ujson

from machine import Pin, I2C

import BME280

import ssd1306


# Initialize I2C for BME280 and SSD1306

i2c = I2C(0, scl=Pin(22), sda=Pin(21))


# Initialize BME280 sensor

try:

    bme = BME280.BME280(i2c=i2c, address=0x76)

    print("BME280 initialized")

except Exception as err:

    print("Failed to initialize BME280:", err)

    raise


# Initialize SSD1306 OLED display

try:

    display = ssd1306.SSD1306_I2C(128, 64, i2c, addr=0x3C)

    print("SSD1306 initialized")

except Exception as err:

    print("Failed to initialize SSD1306:", err)

    raise


# Initialize Wi-Fi in station mode

sta = network.WLAN(network.STA_IF)

sta.active(True)

sta.config(channel=1) # set Wi-Fi channel for more stable communication

sta.disconnect()

print("Wi-Fi initialized")


# Initialize AIOESPNow

e = aioespnow.AIOESPNow()

try:

    e.active(True)

    print("AIOESPNow initialized")

except OSError as err:

    print("Failed to initialize AIOESPNow:", err)

    raise


# Receiver MAC address (the board you want to send data to)

peer_mac = b'\xff\xff\xff\xff\xff\xff'


# Add peer

try:

    e.add_peer(peer_mac)

except OSError as err:

    print("Failed to add peer:", err)

    raise


# Variables to store readings and status

last_send_status = " "

incoming_readings = {'temp': 0.0, 'hum': 0.0, 'pres': 0.0}


# Function to get BME280 readings

def get_readings():

    try:

        temp = float(bme.temperature[:-1]) # Remove 'C'

        hum = float(bme.humidity[:-1])     # Remove '%'

        pres = float(bme.pressure[:-3])    # Remove 'hPa'

        print("BME280 readings:", temp, hum, pres)

        return temp, hum, pres

    except Exception as err:

        print("Error reading BME280:", err)

        return 0.0, 0.0, 0.0


# Function to update OLED display

def update_display():

    try:

        display.fill(0)

        display.text("INCOM. READINGS", 0, 0)

        display.text("Temp: {:.1f} C".format(incoming_readings['temp']), 0, 15)

        display.text("Hum: {:.1f} %".format(incoming_readings['hum']), 0, 25)

        display.text("Pres: {:.1f} hPa".format(incoming_readings['pres']), 0, 35)

        display.text(last_send_status, 0, 55)

        display.show()

        print("Display updated")

    except Exception as err:

        print("Error updating display:", err)


# Async function to send messages

async def send_messages(e, peer):

    global last_send_status

    while True:

        try:

            print("Sending data")

            temp, hum, pres = get_readings()

            # Create JSON string

            data_dict = {"temp": temp, "hum": hum, "pres": pres}

            json_str = ujson.dumps(data_dict)

            data = json_str.encode('utf-8')  # Convert to bytes

            print("Sending JSON:", json_str)

            if await e.asend(peer, data, sync=True):

                print("Sent with success")

                last_send_status = "Delivery Success :)"

            else:

                print("Send failed")

                last_send_status = "Delivery Fail :("

            update_display()

            print("Sending task complete")

            await asyncio.sleep(10)  # Send every 10 seconds

        except OSError as err:

            print("Send error:", err)

            last_send_status = "Delivery Fail :("

            update_display()

            await asyncio.sleep(0.1)  # Shorter delay in case of error


# Async function to receive messages

async def receive_messages(e):

    global incoming_readings

    while True:

        try:

            print("Checking for messages")

            async for mac, msg in e:

                try:

                    # Decode bytes to string and parse JSON

                    json_str = msg.decode('utf-8')

                    data_dict = ujson.loads(json_str)

                    temp = data_dict['temp']

                    hum = data_dict['hum']

                    pres = data_dict['pres']

                    incoming_readings['temp'] = temp

                    incoming_readings['hum'] = hum

                    incoming_readings['pres'] = pres

                    print("\nINCOMING READINGS")

                    print("Temperature: {:.1f} ÂșC".format(temp))

                    print("Humidity: {:.1f} %".format(hum))

                    print("Pressure: {:.1f} hPa".format(pres))

                    update_display()

                except (ValueError, KeyError) as err:

                    print("Error parsing JSON:", err)

            await asyncio.sleep(0.01)  # Yield if no received messages

        except OSError as err:

            print("Receive error:", err)

            await asyncio.sleep(0.1)  # Shorter delay in case of error


# Main async function

async def main(e, peer):

    print("Starting main loop")

    await asyncio.gather(send_messages(e, peer), receive_messages(e))


# Run the async program

try:

    print("Starting transceiver...")

    asyncio.run(main(e, peer_mac))

except KeyboardInterrupt:

    print("Stopping transceiver...")

    e.active(False)

    sta.active(False)

except Exception as err:

    print("Main loop error:", err)

finally:

    print("Cleaning up...")

    e.active(False)

    sta.active(False)

 

Cara Kerja Kode

Berikut adalah penjelasan singkat mengenai alur kerja kode. Anda dapat melanjutkan ke bagian demonstrasi jika tidak memerlukan penjelasan detail.

Mengimpor Library

Kita mulai dengan mengimpor modul-modul yang diperlukan.

 

import network

import aioespnow

import asyncio

import time

import ujson

from machine import Pin, I2C

import BME280

import ssd1306

Komunikasi I2C

Inisialisasikan komunikasi I2C pada GPIO 22 dan GPIO 21. Sesuaikan konfigurasi jika Anda menggunakan pin yang berbeda.

 

i2c = I2C(0, scl=Pin(22), sda=Pin(21))

 

BME280

Inisialisasikan sensor BME280 untuk mulai mengambil data pembacaan.

 

# Initialize BME280 sensor

try:

    bme = BME280.BME280(i2c=i2c, address=0x76)

    print("BME280 initialized")

except Exception as err:

    print("Failed to initialize BME280:", err)

    raise

 

OLED Display

Inisialisasikan modul layar OLED agar siap menampilkan data.

 

# Initialize SSD1306 OLED display

try:

    display = ssd1306.SSD1306_I2C(128, 64, i2c, addr=0x3C)

    print("SSD1306 initialized")

except Exception as err:

    print("Failed to initialize SSD1306:", err)

    raise

 

Inisialisasi Antarmuka Wi-Fi

Selanjutnya, kita perlu menginisialisasi modul Wi-Fi (meskipun tidak digunakan untuk koneksi internet) karena ESP-NOW memerlukan Wi-Fi tetap aktif. Antarmuka dapat dijalankan dalam mode station (STA_IF) atau access point (AP_IF).

 

# Initialize Wi-Fi in station mode

sta = network.WLAN(network.STA_IF)

sta.active(True)

sta.config(channel=1)  # Set channel 

sta.disconnect()

 

Inisialisasi ESP-NOW

Selanjutnya, lakukan inisialisasi ESP-NOW. Buat sebuah instance `aioespnow` dengan nama `e`, kemudian aktifkan dengan memanggil metode `active()` dan memberikan argumen `True`.

 

# Initialize AIOESPNow

e = aioespnow.AIOESPNow()

try:

    e.active(True)

    print("AIOESPNow initialized")

except OSError as err:

    print("Failed to initialize AIOESPNow:", err)

    raise

 

Aktivasi ESP-NOW dilakukan di dalam blok `try` dan `except` untuk menangani kemungkinan error jika proses inisialisasi gagal.

Menambahkan Peer ESP-NOW

Masukkan alamat MAC perangkat peer (alamat MAC papan yang akan menerima data) untuk menambahkan peer ESP-NOW.

 

# Receiver MAC address (the board you want to send data to)

peer_mac = b'\x68\xb6\xb3\x22\x9e\x60'

 

Selanjutnya, tambahkan alamat MAC perangkat penerima sebagai peer dengan menggunakan metode `add_peer()`.

 

try:

    e.add_peer(peer_mac)

except OSError as err:

    print("Failed to add peer:", err)

    raise

Variabel

Buat variabel untuk menyimpan data sensor yang diterima serta status proses pengiriman. Data sensor yang masuk disimpan dalam struktur dictionary.

 

last_send_status = " "

incoming_readings = {'temp': 0.0, 'hum': 0.0, 'pres': 0.0}

 

Membaca Data Sensor BME280

Fungsi `get_readings()` mengembalikan nilai pembacaan sensor BME280 dalam urutan berikut: suhu, kelembapan, dan tekanan. Fungsi-fungsi dari pustaka BME280 mengembalikan hasil beserta karakter satuan, sehingga kita menghapus karakter tersebut dari akhir nilai pembacaan.

 

Jika terjadi kesalahan saat membaca sensor, fungsi ini akan mengembalikan nilai 0.0 untuk semua parameter.

 

# Function to get BME280 readings

def get_readings():

    try:

        temp = float(bme.temperature[:-1]) # Remove 'C'

        hum = float(bme.humidity[:-1])     # Remove '%'

        pres = float(bme.pressure[:-3])    # Remove 'hPa'

        print("BME280 readings:", temp, hum, pres)

        return temp, hum, pres

    except Exception as err:

        print("Error reading BME280:", err)

        return 0.0, 0.0, 0.0

 

Memperbarui Tampilan OLED

Fungsi berikut bertugas memperbarui layar OLED dengan data pembacaan sensor terbaru yang diterima. Selain itu, fungsi ini juga menampilkan status pengiriman terakhir.

 

# Function to update OLED display

def update_display():

    try:

        display.fill(0)

        display.text("INCOM. READINGS", 0, 0)

        display.text("Temp: {:.1f} C".format(incoming_readings['temp']), 0, 15)

        display.text("Hum: {:.1f} %".format(incoming_readings['hum']), 0, 25)

        display.text("Pres: {:.1f} hPa".format(incoming_readings['pres']), 0, 35)

        display.text(last_send_status, 0, 55)

        display.show()

        print("Display updated")

    except Exception as err:

        print("Error updating display:", err)

 

Fungsi send_message(e, peer) bertugas mengirimkan pesan ke peer yang ditentukan menggunakan instance ESP-NOW bernama e.

 

# Async function to send messages

async def send_messages(e, peer):

    global last_send_status

    while True:

        try:

            print("Sending data")

            temp, hum, pres = get_readings()

            # Create JSON string

            data_dict = {"temp": temp, "hum": hum, "pres": pres}

            json_str = ujson.dumps(data_dict)

            data = json_str.encode('utf-8')  # Convert to bytes

            print("Sending JSON:", json_str)

            if await e.asend(peer, data, sync=True):

                print("Sent with success")

                last_send_status = "Delivery Success :)"

            else:

                print("Send failed")

                last_send_status = "Delivery Fail :("

            update_display()

            print("Sending task complete")

            await asyncio.sleep(10)  # Send every 10 seconds

        except OSError as err:

            print("Send error:", err)

            last_send_status = "Delivery Fail :("

            update_display()

            await asyncio.sleep(0.1)  # Shorter delay in case of error

 

Pada fungsi ini, kita memulai dengan memanggil `get_readings()` untuk memperoleh nilai suhu, kelembapan, dan tekanan terkini. Selanjutnya, data tersebut diperbarui ke dalam dictionary, kemudian dikonversi menjadi JSON string dan diubah lagi menjadi format bytes.

 

temp, hum, pres = get_readings()

# Create JSON string

data_dict = {"temp": temp, "hum": hum, "pres": pres}

json_str = ujson.dumps(data_dict)

data = json_str.encode('utf-8')  # Convert to bytes

 

Selanjutnya, kita menggunakan prosedur yang sama seperti sebelumnya untuk mengirim data. Selama proses ini, layar OLED juga diperbarui untuk menampilkan hasil pengiriman data.

 

if await e.asend(peer, data, sync=True):

    print("Sent with success")

    last_send_status = "Delivery Success :)"

else:

    print("Send failed")

    last_send_status = "Delivery Fail :("

    update_display()

    print("Sending task complete")

    await asyncio.sleep(10)  # Send every 10 seconds

except OSError as err:

    print("Send error:", err)

    last_send_status = "Delivery Fail :("

    update_display()

    await asyncio.sleep(0.1)  # Shorter delay in case of error

 

Menerima Data Masuk

Fungsi `receive_messages()` bertugas untuk menerima data yang dikirim dari papan ESP32 lain.

 

# Async function to receive messages

async def receive_messages(e):

    global incoming_readings

    while True:

        try:

            print("Checking for messages")

            async for mac, msg in e:

                try:

                    # Decode bytes to string and parse JSON

                    json_str = msg.decode('utf-8')

                    data_dict = ujson.loads(json_str)

                    temp = data_dict['temp']

                    hum = data_dict['hum']

                    pres = data_dict['pres']

                    incoming_readings['temp'] = temp

                    incoming_readings['hum'] = hum

                    incoming_readings['pres'] = pres

                    print("\nINCOMING READINGS")

                    print("Temperature: {:.1f} ÂșC".format(temp))

                    print("Humidity: {:.1f} %".format(hum))

                    print("Pressure: {:.1f} hPa".format(pres))

                    update_display()

                except (ValueError, KeyError) as err:

                    print("Error parsing JSON:", err)

            await asyncio.sleep(0.01)  # Yield if no received messages

        except OSError as err:

            print("Receive error:", err)

            await asyncio.sleep(0.1)  # Shorter delay in case of error

 

Saat data diterima, kita mengonversinya menjadi JSON string, kemudian memperbarui dictionary `incoming_readings` dengan data yang diterima.

 

Selanjutnya, pembacaan tersebut ditampilkan di serial monitor dan fungsi `update_display()` dipanggil untuk memperbarui layar OLED dengan data sensor terbaru.

Membuat Fungsi Loop Asinkron dan Menjalankan Program

Terakhir, kita membuat fungsi loop asinkron utama (main) dan mengeksekusi program secara asinkron seperti pada contoh sebelumnya.

 

# Run the async program

try:

    print("Starting transceiver...")

    asyncio.run(main(e, peer_mac))

except KeyboardInterrupt:

    print("Stopping transceiver...")

    e.active(False)

    sta.active(False)

except Exception as err:

    print("Main loop error:", err)

finally:

    print("Cleaning up...")

    e.active(False)

    sta.active(False)

 

Mengunggah Kode ke Papan ESP32

Catatan penting: menjalankan file langsung melalui Thonny tidak menyimpannya secara permanen ke filesystem papan. Artinya, jika papan dicabut dari komputer dan diberi daya, program tidak akan berjalan karena tidak ada file Python yang tersimpan di papan. Fungsi Run di Thonny berguna untuk pengujian, namun untuk menyimpan kode secara permanen, Anda harus membuat dan menyimpan file ke filesystem papan.

 

Untuk menjalankan kode pada papan tanpa terhubung ke komputer, unggah file tersebut ke filesystem papan dengan nama main.py.

Langkahnya: buka File > Save As…, lalu pilih MicroPython Device sebagai lokasi penyimpanan.

 


Beri nama file tersebut main.py dan simpan ke dalam filesystem papan.

 


Demonstrasi

Setelah kode diunggah ke kedua papan, layar OLED akan menampilkan data sensor yang diterima dari papan lainnya, sekaligus menampilkan pesan 'Delivery Success' sebagai tanda pengiriman berhasil.

 













Siap Untuk Membuat Proyek Impianmu Menjadi Kenyataan?

Klik di sini untuk chat langsung via WhatsApp dan dapatkan dukungan langsung dari tim ahli kami! 

 

0 on: "Cara Membuat Komunikasi Dua Arah ESP32 Menggunakan ESP-NOW di MicroPython"