Blog Archive

Arduino Indonesia. Gambar tema oleh Storman. Diberdayakan oleh Blogger.

Supported by Electronics 3 in 1

1. Jasa pencetakan PCB single layer dengan harga paling murah.

(Metode Pembuatan dengan Transfer Toner)
>PCB design sendiri (siap cetak) : Rp.150,-/Cm2
>PCB design dari kami : Rp.250,-/Cm2

(Metode Sablon Full Masking dan Silk Screen minimal pemesanan 100 Pcs)
>PCB design sendiri (siap cetak) : Rp.200,-/Cm2
>PCB design dari kami : Rp.250,-/Cm2

2. Jasa perancangan, perakitan, dan pembuatan trainer pembelajaran elektronika untuk SMK dan Mahasiswa.

3. Jasa perancangan, perakitan, dan pembuatan berbagai macam kontroller, sensor, aktuator, dan tranduser.
>Design Rangkaian / Sistem Elektronika
>Design Rangkaian / Sistem Instrumentasi
>Design Rangkaian / Sistem Kendali
>Kerjasama Riset (data atau peralatan)
>Kerjasama Produksi Produk-Produk KIT Elektronika
>Produksi Instrumentasi Elektronika

4. Jasa Pembuatan Proyek, Tugas Akhir, Tugas Laboratorium, PKM, Karya Ilmiah, SKRIPSI, dll

Like My Facebook

Popular Posts

Selasa, 09 Desember 2025

Implementasi ESP-NOW Many-to-One di ESP32 dengan MicroPython

Dalam panduan MicroPython ini, kita akan menunjukkan cara mengonfigurasi sebuah ESP32 agar dapat menerima dan menampilkan data dari beberapa board ESP32 menggunakan protokol komunikasi ESP-NOW dalam konfigurasi many-to-one.

 

Kita akan membangun sebuah proyek contoh di mana board pengirim mengirimkan data sensor dalam format JSON, dan satu board penerima mengumpulkan data tersebut serta menampilkannya pada OLED display. Tutorial ini menggunakan dua board pengirim sebagai contoh, namun konfigurasi dapat dengan mudah diperluas untuk menampung lebih banyak board.

Memperkenalkan ESP-NOW

ESP-NOW adalah protokol komunikasi nirkabel yang dikembangkan oleh Espressif, memungkinkan beberapa board ESP32 atau ESP8266 untuk bertukar data dalam jumlah kecil tanpa memerlukan koneksi Wi-Fi atau Bluetooth penuh. Meskipun Wi-Fi controller harus diaktifkan, ESP-NOW tidak membutuhkan koneksi Wi-Fi sepenuhnya, sehingga sangat cocok untuk aplikasi low-power dan low-latency seperti jaringan sensor, remote control, atau pertukaran data antar board.


ESP-NOW menggunakan model komunikasi tanpa koneksi, artinya perangkat dapat mengirim dan menerima data tanpa harus terhubung ke router atau membuat access point (berbeda dengan komunikasi HTTP antar board). ESP-NOW mendukung dua jenis pengiriman data:

- Unicast: mengirim data ke perangkat tertentu menggunakan alamat MAC-nya.

- Broadcast: mengirim data ke semua perangkat di sekitar menggunakan alamat MAC broadcast.

Baru mengenal ESP-NOW? Simak panduan dasar kami: MicroPython: ESP-NOW dengan ESP32 (Getting Started).

Gambaran Proyek

Tutorial ini menunjukkan cara mengonfigurasi sebuah board ESP32 agar dapat menerima data dari beberapa board ESP32 lain melalui protokol komunikasi ESP-NOW dalam konfigurasi many-to-one, seperti terlihat pada gambar berikut.



- Satu board ESP32 berperan sebagai penerima.

- Beberapa board ESP32 berperan sebagai pengirim. Pada contoh ini, kita menggunakan dua board pengirim, namun Anda dapat menambahkan lebih banyak board sesuai kebutuhan.

- Board ESP32 penerima akan menerima pesan dari semua pengirim dan mengenali board mana yang mengirim pesan tersebut.

- Sebagai contoh, kita akan bertukar data sensor BME280 antar board. Proyek ini dapat dimodifikasi untuk menggunakan sensor lain atau menukar jenis data lain sesuai kebutuhan.

Komponen yang Dibutuhkan:

- 3x (atau lebih) board ESP32

- 2x sensor BME280

- 1x OLED Display SSD1306

- Breadboard

- Kabel jumper

ESP32: Mendapatkan Alamat MAC Board

Untuk berkomunikasi menggunakan ESP-NOW, Anda perlu mengetahui alamat MAC masing-masing board. Untuk mendapatkan alamat MAC board, salin kode berikut ke Thonny IDE dan jalankan pada board Anda.


# Rui Santos & Sara Santos - Random Nerd Tutorials

# Complete project details at https://RandomNerdTutorials.com/micropython-esp-now-esp32/

 

import network


wlan = network.WLAN(network.STA_IF)

wlan.active(True)


# Get MAC address (returns bytes)

mac = wlan.config('mac')


# Convert to human-readable format

mac_address = ':'.join('%02x' % b for b in mac)


print("MAC Address:", mac_address)

 

Setelah menjalankan kode, alamat MAC board akan ditampilkan di jendela shell.


 

Dapatkan alamat MAC untuk semua board Anda.

Sebagai contoh, saya mendapatkan:

- Board pengirim 1: 24:0A:C4:31:40:50

- Board pengirim 2: 30:AE:A4:F6:7D:4C

- Board penerima: 30:AE:A4:07:0D:64

Menyiapkan Board Pengirim

Dalam tutorial ini, kita akan mengirim data ke satu board ESP32 penerima menggunakan ESP-NOW dari dua board pengirim yang berbeda. Proyek ini dapat dimodifikasi untuk menambahkan lebih banyak board pengirim.

Setiap board akan diberi ID unik untuk identifikasi:

- ID = 1 untuk board1

- ID = 2 untuk board2

 

Menghubungkan Sensor BME280

Setiap board pengirim akan mengirim data lingkungan dari sensor BME280. Hubungkan sensor BME280 ke masing-masing board menggunakan pin I2C default ESP32.

 

Pelajari lebih lanjut tentang I2C pada ESP32 di: ESP32 I2C Communication: Set Pins, Multiple Bus Interfaces and Peripherals (Arduino IDE).



Jika Anda menggunakan model board yang berbeda, pin I2C default mungkin berbeda.

Mengimpor Library BME280

Library untuk mengakses sensor BME280 tidak termasuk dalam library standar MicroPython, sehingga Anda perlu mengunggah file library ini ke setiap board pengirim.

Module MicroPython BME280.py

1. Salin kode berikut ke file baru di Thonny IDE.

 

from machine import I2C

import time


# BME280 default address.

BME280_I2CADDR = 0x76


# Operating Modes

BME280_OSAMPLE_1 = 1

BME280_OSAMPLE_2 = 2

BME280_OSAMPLE_4 = 3

BME280_OSAMPLE_8 = 4

BME280_OSAMPLE_16 = 5


# BME280 Registers


BME280_REGISTER_DIG_T1 = 0x88  # Trimming parameter registers

BME280_REGISTER_DIG_T2 = 0x8A

BME280_REGISTER_DIG_T3 = 0x8C


BME280_REGISTER_DIG_P1 = 0x8E

BME280_REGISTER_DIG_P2 = 0x90

BME280_REGISTER_DIG_P3 = 0x92

BME280_REGISTER_DIG_P4 = 0x94

BME280_REGISTER_DIG_P5 = 0x96

BME280_REGISTER_DIG_P6 = 0x98

BME280_REGISTER_DIG_P7 = 0x9A

BME280_REGISTER_DIG_P8 = 0x9C

BME280_REGISTER_DIG_P9 = 0x9E


BME280_REGISTER_DIG_H1 = 0xA1

BME280_REGISTER_DIG_H2 = 0xE1

BME280_REGISTER_DIG_H3 = 0xE3

BME280_REGISTER_DIG_H4 = 0xE4

BME280_REGISTER_DIG_H5 = 0xE5

BME280_REGISTER_DIG_H6 = 0xE6

BME280_REGISTER_DIG_H7 = 0xE7


BME280_REGISTER_CHIPID = 0xD0

BME280_REGISTER_VERSION = 0xD1

BME280_REGISTER_SOFTRESET = 0xE0


BME280_REGISTER_CONTROL_HUM = 0xF2

BME280_REGISTER_CONTROL = 0xF4

BME280_REGISTER_CONFIG = 0xF5

BME280_REGISTER_PRESSURE_DATA = 0xF7

BME280_REGISTER_TEMP_DATA = 0xFA

BME280_REGISTER_HUMIDITY_DATA = 0xFD



class Device:

  """Class for communicating with an I2C device.


  Allows reading and writing 8-bit, 16-bit, and byte array values to

  registers on the device."""


  def __init__(self, address, i2c):

    """Create an instance of the I2C device at the specified address using

    the specified I2C interface object."""

    self._address = address

    self._i2c = i2c


  def writeRaw8(self, value):

    """Write an 8-bit value on the bus (without register)."""

    value = value & 0xFF

    self._i2c.writeto(self._address, value)


  def write8(self, register, value):

    """Write an 8-bit value to the specified register."""

    b=bytearray(1)

    b[0]=value & 0xFF

    self._i2c.writeto_mem(self._address, register, b)


  def write16(self, register, value):

    """Write a 16-bit value to the specified register."""

    value = value & 0xFFFF

    b=bytearray(2)

    b[0]= value & 0xFF

    b[1]= (value>>8) & 0xFF

    self.i2c.writeto_mem(self._address, register, value)


  def readRaw8(self):

    """Read an 8-bit value on the bus (without register)."""

    return int.from_bytes(self._i2c.readfrom(self._address, 1),'little') & 0xFF


  def readU8(self, register):

    """Read an unsigned byte from the specified register."""

    return int.from_bytes(

        self._i2c.readfrom_mem(self._address, register, 1),'little') & 0xFF


  def readS8(self, register):

    """Read a signed byte from the specified register."""

    result = self.readU8(register)

    if result > 127:

      result -= 256

    return result


  def readU16(self, register, little_endian=True):

    """Read an unsigned 16-bit value from the specified register, with the

    specified endianness (default little endian, or least significant byte

    first)."""

    result = int.from_bytes(

        self._i2c.readfrom_mem(self._address, register, 2),'little') & 0xFFFF

    if not little_endian:

      result = ((result << 8) & 0xFF00) + (result >> 8)

    return result


  def readS16(self, register, little_endian=True):

    """Read a signed 16-bit value from the specified register, with the

    specified endianness (default little endian, or least significant byte

    first)."""

    result = self.readU16(register, little_endian)

    if result > 32767:

      result -= 65536

    return result


  def readU16LE(self, register):

    """Read an unsigned 16-bit value from the specified register, in little

    endian byte order."""

    return self.readU16(register, little_endian=True)


  def readU16BE(self, register):

    """Read an unsigned 16-bit value from the specified register, in big

    endian byte order."""

    return self.readU16(register, little_endian=False)


  def readS16LE(self, register):

    """Read a signed 16-bit value from the specified register, in little

    endian byte order."""

    return self.readS16(register, little_endian=True)


  def readS16BE(self, register):

    """Read a signed 16-bit value from the specified register, in big

    endian byte order."""

    return self.readS16(register, little_endian=False)



class BME280:

  def __init__(self, mode=BME280_OSAMPLE_1, address=BME280_I2CADDR, i2c=None,

               **kwargs):

    # Check that mode is valid.

    if mode not in [BME280_OSAMPLE_1, BME280_OSAMPLE_2, BME280_OSAMPLE_4,

                    BME280_OSAMPLE_8, BME280_OSAMPLE_16]:

        raise ValueError(

            'Unexpected mode value {0}. Set mode to one of '

            'BME280_ULTRALOWPOWER, BME280_STANDARD, BME280_HIGHRES, or '

            'BME280_ULTRAHIGHRES'.format(mode))

    self._mode = mode

    # Create I2C device.

    if i2c is None:

      raise ValueError('An I2C object is required.')

    self._device = Device(address, i2c)

    # Load calibration values.

    self._load_calibration()

    self._device.write8(BME280_REGISTER_CONTROL, 0x3F)

    self.t_fine = 0


  def _load_calibration(self):


    self.dig_T1 = self._device.readU16LE(BME280_REGISTER_DIG_T1)

    self.dig_T2 = self._device.readS16LE(BME280_REGISTER_DIG_T2)

    self.dig_T3 = self._device.readS16LE(BME280_REGISTER_DIG_T3)


    self.dig_P1 = self._device.readU16LE(BME280_REGISTER_DIG_P1)

    self.dig_P2 = self._device.readS16LE(BME280_REGISTER_DIG_P2)

    self.dig_P3 = self._device.readS16LE(BME280_REGISTER_DIG_P3)

    self.dig_P4 = self._device.readS16LE(BME280_REGISTER_DIG_P4)

    self.dig_P5 = self._device.readS16LE(BME280_REGISTER_DIG_P5)

    self.dig_P6 = self._device.readS16LE(BME280_REGISTER_DIG_P6)

    self.dig_P7 = self._device.readS16LE(BME280_REGISTER_DIG_P7)

    self.dig_P8 = self._device.readS16LE(BME280_REGISTER_DIG_P8)

    self.dig_P9 = self._device.readS16LE(BME280_REGISTER_DIG_P9)


    self.dig_H1 = self._device.readU8(BME280_REGISTER_DIG_H1)

    self.dig_H2 = self._device.readS16LE(BME280_REGISTER_DIG_H2)

    self.dig_H3 = self._device.readU8(BME280_REGISTER_DIG_H3)

    self.dig_H6 = self._device.readS8(BME280_REGISTER_DIG_H7)


    h4 = self._device.readS8(BME280_REGISTER_DIG_H4)

    h4 = (h4 << 24) >> 20

    self.dig_H4 = h4 | (self._device.readU8(BME280_REGISTER_DIG_H5) & 0x0F)


    h5 = self._device.readS8(BME280_REGISTER_DIG_H6)

    h5 = (h5 << 24) >> 20

    self.dig_H5 = h5 | (

        self._device.readU8(BME280_REGISTER_DIG_H5) >> 4 & 0x0F)


  def read_raw_temp(self):

    """Reads the raw (uncompensated) temperature from the sensor."""

    meas = self._mode

    self._device.write8(BME280_REGISTER_CONTROL_HUM, meas)

    meas = self._mode << 5 | self._mode << 2 | 1

    self._device.write8(BME280_REGISTER_CONTROL, meas)

    sleep_time = 1250 + 2300 * (1 << self._mode)


    sleep_time = sleep_time + 2300 * (1 << self._mode) + 575

    sleep_time = sleep_time + 2300 * (1 << self._mode) + 575

    time.sleep_us(sleep_time)  # Wait the required time

    msb = self._device.readU8(BME280_REGISTER_TEMP_DATA)

    lsb = self._device.readU8(BME280_REGISTER_TEMP_DATA + 1)

    xlsb = self._device.readU8(BME280_REGISTER_TEMP_DATA + 2)

    raw = ((msb << 16) | (lsb << 8) | xlsb) >> 4

    return raw


  def read_raw_pressure(self):

    """Reads the raw (uncompensated) pressure level from the sensor."""

    """Assumes that the temperature has already been read """

    """i.e. that enough delay has been provided"""

    msb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA)

    lsb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA + 1)

    xlsb = self._device.readU8(BME280_REGISTER_PRESSURE_DATA + 2)

    raw = ((msb << 16) | (lsb << 8) | xlsb) >> 4

    return raw


  def read_raw_humidity(self):

    """Assumes that the temperature has already been read """

    """i.e. that enough delay has been provided"""

    msb = self._device.readU8(BME280_REGISTER_HUMIDITY_DATA)

    lsb = self._device.readU8(BME280_REGISTER_HUMIDITY_DATA + 1)

    raw = (msb << 8) | lsb

    return raw


  def read_temperature(self):

    """Get the compensated temperature in 0.01 of a degree celsius."""

    adc = self.read_raw_temp()

    var1 = ((adc >> 3) - (self.dig_T1 << 1)) * (self.dig_T2 >> 11)

    var2 = ((

        (((adc >> 4) - self.dig_T1) * ((adc >> 4) - self.dig_T1)) >> 12) *

        self.dig_T3) >> 14

    self.t_fine = var1 + var2

    return (self.t_fine * 5 + 128) >> 8


  def read_pressure(self):

    """Gets the compensated pressure in Pascals."""

    adc = self.read_raw_pressure()

    var1 = self.t_fine - 128000

    var2 = var1 * var1 * self.dig_P6

    var2 = var2 + ((var1 * self.dig_P5) << 17)

    var2 = var2 + (self.dig_P4 << 35)

    var1 = (((var1 * var1 * self.dig_P3) >> 8) +

            ((var1 * self.dig_P2) >> 12))

    var1 = (((1 << 47) + var1) * self.dig_P1) >> 33

    if var1 == 0:

      return 0

    p = 1048576 - adc

    p = (((p << 31) - var2) * 3125) // var1

    var1 = (self.dig_P9 * (p >> 13) * (p >> 13)) >> 25

    var2 = (self.dig_P8 * p) >> 19

    return ((p + var1 + var2) >> 8) + (self.dig_P7 << 4)


  def read_humidity(self):

    adc = self.read_raw_humidity()

    # print 'Raw humidity = {0:d}'.format (adc)

    h = self.t_fine - 76800

    h = (((((adc << 14) - (self.dig_H4 << 20) - (self.dig_H5 * h)) +

         16384) >> 15) * (((((((h * self.dig_H6) >> 10) * (((h *

                          self.dig_H3) >> 11) + 32768)) >> 10) + 2097152) *

                          self.dig_H2 + 8192) >> 14))

    h = h - (((((h >> 15) * (h >> 15)) >> 7) * self.dig_H1) >> 4)

    h = 0 if h < 0 else h

    h = 419430400 if h > 419430400 else h

    return h >> 12


  @property

  def temperature(self):

    "Return the temperature in degrees."

    t = self.read_temperature()

    ti = t // 100

    td = t - ti * 100

    return "{}.{:02d}C".format(ti, td)


  @property

  def pressure(self):

    "Return the temperature in hPa."

    p = self.read_pressure() // 256

    pi = p // 100

    pd = p - pi * 100

    return "{}.{:02d}hPa".format(pi, pd)


  @property

  def humidity(self):

    "Return the humidity in percent."

    h = self.read_humidity()

    hi = h // 1024

    hd = h * 100 // 1024 - hi * 100

    return "{}.{:02d}%".format(hi, hd)

 

2. Buka File > Save as…

 


3. Pilih MicroPython device sebagai lokasi penyimpanan


4. Beri nama file BME280.py, lalu klik OK

 


Selesai! Library BME280 sudah berhasil diunggah ke board Anda.

Ulangi proses ini untuk semua board pengirim.

ESP32 ESP-NOW Pengirim – Script MicroPython

Kode berikut akan membaca data dari sensor BME280 dan mengirimkannya melalui ESP-NOW dalam format JSON ke board ESP32 penerima. Pastikan untuk mengganti alamat MAC board penerima dan mengubah ID board untuk setiap board pengirim.

 

Dalam kode ini, kita menggunakan modul aioespnow, yang memungkinkan penggunaan ESP-NOW secara asinkron. 

 

# Rui Santos & Sara Santos - Random Nerd Tutorials

# Complete project details at https://RandomNerdTutorials.com/micropython-esp-now-esp32-many-to-one/


import network

import aioespnow

import asyncio

import time

import ujson

from machine import Pin, I2C

import BME280


# Board ID

BOARD_ID = 1


# Receiver's MAC address

peer_mac = b'\xff\xff\xff\xff\xff\xff'


# Interval for sending data (in seconds)

send_interval = 10 


# Initialize I2C and BME280

try:

    i2c = I2C(0, scl=Pin(22), sda=Pin(21))  # Adjust pins as needed

    bme = BME280.BME280(i2c=i2c, address=0x76)

except OSError as err:

    print("Failed to initialize BME280:", err)

    raise


# Initialize Wi-Fi in station mode

sta = network.WLAN(network.STA_IF)

try:

    sta.active(True)

    sta.config(channel=1)  # Set channel explicitly

    sta.disconnect()

except OSError as err:

    print("Failed to initialize Wi-Fi:", err)

    raise


# Initialize AIOESPNow

e = aioespnow.AIOESPNow()

try:

    e.active(True)

except OSError as err:

    print("Failed to initialize AIOESPNow:", err)

    raise


# Add peer

try:

    e.add_peer(peer_mac)

except OSError as err:

    print("Failed to add peer:", err)

    raise


# Counter for reading ID

reading_id = 0


def read_temperature():

    try:

        return float(bme.temperature[:-1])  # Remove 'C' from string

    except Exception as err:

        print("Error reading temperature:", err)

        return 0.0


def read_humidity():

    try:

        return float(bme.humidity[:-1])  # Remove '%' from string

    except Exception as err:

        print("Error reading humidity:", err)

        return 0.0


def prepare_sensor_data():

    global reading_id

    data = {

        'id': BOARD_ID,

        'temp': read_temperature(),

        'hum': read_humidity(),

        'readingId': reading_id

    }

    reading_id += 1

    # Serialize to JSON and encode to bytes

    return ujson.dumps(data).encode('utf-8')


async def send_messages(e, peer):

    while True:

        try:

            # Prepare and serialize sensor data

            message = prepare_sensor_data()

            # Send JSON bytes

            if await e.asend(peer, message, sync=True):

                print(f"Sent data: {message.decode('utf-8')}")

            else:

                print("Failed to send data")

        except OSError as err:

            print("Send error:", err)

            await asyncio.sleep(5)

        await asyncio.sleep(send_interval)  # Wait before next send


async def main(e, peer):

    try:

        await send_messages(e, peer)

    except Exception as err:

        print(f"Error in main: {err}")

        await asyncio.sleep(5)

        raise


# Run the async program

try:

    asyncio.run(main(e, peer_mac))

except KeyboardInterrupt:

    print("Stopping sender...")

    e.active(False)

    sta.active(False)

 

Bagaimana Cara Kerja Kode?

Mengimpor Library

Mulailah dengan mengimpor library yang dibutuhkan untuk menggunakan ESP-NOW, mengakses sensor BME280, dan mengelola variabel JSON.

 

import network

import aioespnow

import asyncio

import time

import ujson

from machine import Pin, I2C

import BME280

 

Menetapkan ID Board

Berikan ID unik pada setiap board agar board penerima dapat dengan mudah mengidentifikasinya. Pada contoh ini, kita hanya memberi nomor pada setiap board, namun Anda juga bisa menggunakan metode lain, misalnya memberi nama, atau langsung menggunakan alamat MAC sebagai identifikasi (dalam kasus ini, parameter ID tidak diperlukan).

 

# Board ID

BOARD_ID = 2

 

Alamat MAC Board Penerima

Masukkan alamat MAC board penerima. Sebagai contoh, alamat MAC board penerima saya adalah 30:AE:A4:07:0D:64. Oleh karena itu, alamat ini harus ditambahkan ke kode dalam format bytes seperti berikut:

 

# Receiver's MAC address

peer_mac = b'\x30\xae\xa4\x07\x0d\x64'

 

Interval Pengiriman

Data baru akan dikirim melalui ESP-NOW setiap 10 detik. Anda dapat menyesuaikan periode ini pada variabel send_interval.

 

# Interval for sending data (in seconds)

send_interval = 10 

 

Inisialisasi Sensor BME280

Mulai komunikasi I2C pada GPIO 22 (SCL) dan 21 (SDA). Sesuaikan jika menggunakan pin yang berbeda. Selanjutnya, lakukan inisialisasi sensor BME280.

 

# Initialize I2C and BME280

try:

    i2c = I2C(0, scl=Pin(22), sda=Pin(21))  # Adjust pins as needed

    bme = BME280.BME280(i2c=i2c, address=0x76)

except OSError as err:

    print("Failed to initialize BME280:", err)

    raise

 

Inisialisasi Antarmuka Wi-Fi dan ESP-NOW

Untuk menggunakan ESP-NOW, kita perlu mengaktifkan antarmuka Wi-Fi. Pada kode berikut, kita menginisialisasi Wi-Fi dan protokol komunikasi ESP-NOW menggunakan modul aioespnow.

 

# Initialize Wi-Fi in station mode

sta = network.WLAN(network.STA_IF)

try:

    sta.active(True)

    sta.config(channel=1)  # Set channel explicitly

    sta.disconnect()

except OSError as err:

    print("Failed to initialize Wi-Fi:", err)

    raise


# Initialize AIOESPNow

e = aioespnow.AIOESPNow()

try:

    e.active(True)

except OSError as err:

    print("Failed to initialize AIOESPNow:", err)

    raise

 

Menambahkan Peer

Pada baris berikutnya, kita menambahkan board penerima sebagai peer. Fungsi e.add_peer(peer_mac) diperlukan untuk memastikan komunikasi unicast yang andal menggunakan aioespnow, dengan mendaftarkan alamat MAC penerima agar pengirim dapat mengirim pesan ke board tersebut secara spesifik.

 

# Add peer

try:

    e.add_peer(peer_mac)

except OSError as err:

    print("Failed to add peer:", err)

    raise

 

Membaca Data Sensor BME280

Fungsi berikut mengembalikan nilai suhu dan kelembaban dari sensor BME280.

 

def read_temperature():

    try:

        return float(bme.temperature[:-1])  # Remove 'C' from string

    except Exception as err:

        print("Error reading temperature:", err)

        return 0.0


def read_humidity():

    try:

        return float(bme.humidity[:-1])  # Remove '%' from string

    except Exception as err:

        print("Error reading humidity:", err)

        return 0.0

 

Fungsi dari library BME280 mengembalikan hasil beserta satuan (unit), sehingga kita perlu menghapus karakter satuan pada akhir pembacaan.

Menambahkan Data ke Variabel JSON

Kita membuat fungsi prepare_sensor_data() yang mengembalikan variabel JSON berisi data yang akan dikirim. Pada contoh ini, data yang dikirim meliputi: ID board, suhu, kelembaban, dan ID pembacaan (sebuah angka untuk melacak jumlah pembacaan sejak program dijalankan).

 

def prepare_sensor_data():

    global reading_id

    data = {

        'id': BOARD_ID,

        'temp': read_temperature(),

        'hum': read_humidity(),

        'readingId': reading_id

    }

    reading_id += 1

    # Serialize to JSON and encode to bytes

    return ujson.dumps(data).encode('utf-8')

 

Mengirim Pesan melalui ESP-NOW

Selanjutnya, kita membuat fungsi asinkron untuk mengirim pesan ESP-NOW secara asinkron.

 

async def send_messages(e, peer):

    while True:

        try:

            # Prepare and serialize sensor data

            message = prepare_sensor_data()

            # Send JSON bytes

            if await e.asend(peer, message, sync=True):

                print(f"Sent data: {message.decode('utf-8')}")

            else:

                print("Failed to send data")

        except OSError as err:

            print("Send error:", err)

            await asyncio.sleep(5)

        await asyncio.sleep(send_interval)  # Wait before next send

 

Menjalankan Tugas Asinkron

Fungsi berikut menjalankan tugas send_messages untuk mengirim data melalui ESP-NOW secara asinkron.

 

async def main(e, peer):

    try:

        await send_messages(e, peer)

    except Exception as err:

        print(f"Error in main: {err}")

        await asyncio.sleep(5)

        raise

 

Terakhir, jalankan program secara asinkron.

 

# Run the async program

try:

    asyncio.run(main(e, peer_mac))

except KeyboardInterrupt:

    print("Stopping sender...")

    e.active(False)

    sta.active(False)

 

Menjalankan dan Mengunggah Kode ke Board

Setelah menyalin kode ke Thonny IDE dan melakukan penyesuaian yang diperlukan, uji kode dengan menjalankannya di Thonny IDE.

Setelah koneksi dengan board terhubung, klik tombol run berwarna hijau untuk menjalankan program.

 


Hasil yang muncul di MicroPython shell seharusnya mirip dengan gambar berikut.



Saat ini, pengiriman data akan gagal karena board penerima belum disiapkan.

Mengunggah Kode ke Board

Catatan penting: menjalankan file di Thonny IDE saja tidak menyimpan kode secara permanen ke filesystem board. Artinya, jika Anda mencabut board dari komputer dan memberi daya, program tidak akan berjalan karena belum ada file Python yang tersimpan di board. Fungsi Run di Thonny berguna untuk uji coba kode, tetapi untuk menjalankannya secara permanen, Anda harus membuat dan menyimpan file ke filesystem board.

Agar kode dapat dijalankan tanpa tersambung ke komputer, unggah kode ke filesystem board dengan nama main.py:

File > Save as… > MicroPython Device

 


Beri nama file tersebut `main.py` dan simpan di board.

 


Sekarang, jika Anda merestart board, program akan mulai berjalan secara otomatis. Setelah itu, Anda mungkin tidak bisa mengakses MicroPython Shell.

Menyiapkan Board Penerima

Board ESP32 penerima akan menerima data dari beberapa board pengirim dan menampilkannya pada OLED display. Pada contoh ini, kita menerima data dari dua board pengirim.

 

Menghubungkan OLED Display

Hubungkan OLED display ke board ESP32 menggunakan pin I2C default, yaitu GPIO 22 (SCL) dan GPIO 21 (SDA). Sesuaikan jika menggunakan model ESP32 dengan pinout berbeda.

 


Mengimpor Library SSD1306

Library untuk mengakses OLED display (SSD1306) tidak termasuk dalam library standar MicroPython, sehingga Anda perlu mengunggah file library ini ke board penerima.

Module MicroPython SSD1306.py

1. Salin kode berikut ke file baru di Thonny IDE.

 

# MicroPython SSD1306 OLED driver, I2C and SPI interfaces created by Adafruit


import time

import framebuf


# register definitions

SET_CONTRAST        = const(0x81)

SET_ENTIRE_ON       = const(0xa4)

SET_NORM_INV        = const(0xa6)

SET_DISP            = const(0xae)

SET_MEM_ADDR        = const(0x20)

SET_COL_ADDR        = const(0x21)

SET_PAGE_ADDR       = const(0x22)

SET_DISP_START_LINE = const(0x40)

SET_SEG_REMAP       = const(0xa0)

SET_MUX_RATIO       = const(0xa8)

SET_COM_OUT_DIR     = const(0xc0)

SET_DISP_OFFSET     = const(0xd3)

SET_COM_PIN_CFG     = const(0xda)

SET_DISP_CLK_DIV    = const(0xd5)

SET_PRECHARGE       = const(0xd9)

SET_VCOM_DESEL      = const(0xdb)

SET_CHARGE_PUMP     = const(0x8d)



class SSD1306:

    def __init__(self, width, height, external_vcc):

        self.width = width

        self.height = height

        self.external_vcc = external_vcc

        self.pages = self.height // 8

        # Note the subclass must initialize self.framebuf to a framebuffer.

        # This is necessary because the underlying data buffer is different

        # between I2C and SPI implementations (I2C needs an extra byte).

        self.poweron()

        self.init_display()


    def init_display(self):

        for cmd in (

            SET_DISP | 0x00, # off

            # address setting

            SET_MEM_ADDR, 0x00, # horizontal

            # resolution and layout

            SET_DISP_START_LINE | 0x00,

            SET_SEG_REMAP | 0x01, # column addr 127 mapped to SEG0

            SET_MUX_RATIO, self.height - 1,

            SET_COM_OUT_DIR | 0x08, # scan from COM[N] to COM0

            SET_DISP_OFFSET, 0x00,

            SET_COM_PIN_CFG, 0x02 if self.height == 32 else 0x12,

            # timing and driving scheme

            SET_DISP_CLK_DIV, 0x80,

            SET_PRECHARGE, 0x22 if self.external_vcc else 0xf1,

            SET_VCOM_DESEL, 0x30, # 0.83*Vcc

            # display

            SET_CONTRAST, 0xff, # maximum

            SET_ENTIRE_ON, # output follows RAM contents

            SET_NORM_INV, # not inverted

            # charge pump

            SET_CHARGE_PUMP, 0x10 if self.external_vcc else 0x14,

            SET_DISP | 0x01): # on

            self.write_cmd(cmd)

        self.fill(0)

        self.show()


    def poweroff(self):

        self.write_cmd(SET_DISP | 0x00)


    def contrast(self, contrast):

        self.write_cmd(SET_CONTRAST)

        self.write_cmd(contrast)


    def invert(self, invert):

        self.write_cmd(SET_NORM_INV | (invert & 1))


    def show(self):

        x0 = 0

        x1 = self.width - 1

        if self.width == 64:

            # displays with width of 64 pixels are shifted by 32

            x0 += 32

            x1 += 32

        self.write_cmd(SET_COL_ADDR)

        self.write_cmd(x0)

        self.write_cmd(x1)

        self.write_cmd(SET_PAGE_ADDR)

        self.write_cmd(0)

        self.write_cmd(self.pages - 1)

        self.write_framebuf()


    def fill(self, col):

        self.framebuf.fill(col)


    def pixel(self, x, y, col):

        self.framebuf.pixel(x, y, col)


    def scroll(self, dx, dy):

        self.framebuf.scroll(dx, dy)


    def text(self, string, x, y, col=1):

        self.framebuf.text(string, x, y, col)



class SSD1306_I2C(SSD1306):

    def __init__(self, width, height, i2c, addr=0x3c, external_vcc=False):

        self.i2c = i2c

        self.addr = addr

        self.temp = bytearray(2)

        # Add an extra byte to the data buffer to hold an I2C data/command byte

        # to use hardware-compatible I2C transactions.  A memoryview of the

        # buffer is used to mask this byte from the framebuffer operations

        # (without a major memory hit as memoryview doesn't copy to a separate

        # buffer).

        self.buffer = bytearray(((height // 8) * width) + 1)

        self.buffer[0] = 0x40  # Set first byte of data buffer to Co=0, D/C=1

        self.framebuf = framebuf.FrameBuffer1(memoryview(self.buffer)[1:], width, height)

        super().__init__(width, height, external_vcc)


    def write_cmd(self, cmd):

        self.temp[0] = 0x80 # Co=1, D/C#=0

        self.temp[1] = cmd

        self.i2c.writeto(self.addr, self.temp)


    def write_framebuf(self):

        # Blast out the frame buffer using a single I2C transaction to support

        # hardware I2C interfaces.

        self.i2c.writeto(self.addr, self.buffer)


    def poweron(self):

        pass



class SSD1306_SPI(SSD1306):

    def __init__(self, width, height, spi, dc, res, cs, external_vcc=False):

        self.rate = 10 * 1024 * 1024

        dc.init(dc.OUT, value=0)

        res.init(res.OUT, value=0)

        cs.init(cs.OUT, value=1)

        self.spi = spi

        self.dc = dc

        self.res = res

        self.cs = cs

        self.buffer = bytearray((height // 8) * width)

        self.framebuf = framebuf.FrameBuffer1(self.buffer, width, height)

        super().__init__(width, height, external_vcc)


    def write_cmd(self, cmd):

        self.spi.init(baudrate=self.rate, polarity=0, phase=0)

        self.cs.high()

        self.dc.low()

        self.cs.low()

        self.spi.write(bytearray([cmd]))

        self.cs.high()


    def write_framebuf(self):

        self.spi.init(baudrate=self.rate, polarity=0, phase=0)

        self.cs.high()

        self.dc.high()

        self.cs.low()

        self.spi.write(self.buffer)

        self.cs.high()


    def poweron(self):

        self.res.high()

        time.sleep_ms(1)

        self.res.low()

        time.sleep_ms(10)

        self.res.high()

 

2. Buka File > Save as… dan pilih MicroPython device sebagai lokasi penyimpanan.

 


3. Beri nama file ssd1306.py dan klik OK untuk menyimpan file ke filesystem ESP32.

Selesai! Library SSD1306 sudah berhasil diunggah ke board, dan Anda dapat menggunakan fungsionalitasnya dalam kode dengan mengimpor library ini.

ESP-NOW Board Penerima – Script MicroPython

Kode berikut akan mengonfigurasi board ESP32 Anda sebagai penerima ESP-NOW untuk menerima data dari dua board pengirim. Setelah data diterima, nilai akan ditampilkan pada OLED display.

Jangan lupa untuk menambahkan alamat MAC board pengirim pada kode sesuai konfigurasi Anda.

 

# Rui Santos & Sara Santos - Random Nerd Tutorials

# Complete project details at https://RandomNerdTutorials.com/micropython-esp-now-esp32-many-to-one/


import network

import aioespnow

import asyncio

import ujson

from machine import Pin, I2C

import ssd1306


# Initialize I2C for OLED

try:

    i2c = I2C(0, scl=Pin(22), sda=Pin(21))  # Adjust pins as needed

    display = ssd1306.SSD1306_I2C(128, 64, i2c, addr=0x3C)

    print("SSD1306 initialized")

except Exception as err:

    print("Failed to initialize SSD1306:", err)

    raise


# Initialize Wi-Fi in station mode

sta = network.WLAN(network.STA_IF)

try:

    sta.active(True)

    sta.config(channel=1)  # Set channel explicitly

    sta.disconnect()

except OSError as err:

    print("Failed to initialize Wi-Fi:", err)

    raise


# Initialize AIOESPNow

e = aioespnow.AIOESPNow()

try:

    e.active(True)

except OSError as err:

    print("Failed to initialize AIOESPNow:", err)

    raise


# Sender's MAC addresses (replace with actual sender MACs)

sender_mac_1 = b'\x24\x0a\xc4\x31\x40\x50'  # First sender's MAC (Board ID=1)

sender_mac_2 = b'\x30\xae\xa4\xf6\x7d\x4c'  # Second sender's MAC (Board ID=2)


# Add peers

try:

    e.add_peer(sender_mac_1)

except OSError as err:

    print(f"Failed to add peer {sender_mac_1.hex()}:", err)

    raise


try:

    e.add_peer(sender_mac_2)

except OSError as err:

    print(f"Failed to add peer {sender_mac_2.hex()}:", err)

    raise


# Dictionary to store latest readings for each board

board_readings = {

    1: {'temp': 0.0, 'hum': 0.0, 'readingId': 0},

    2: {'temp': 0.0, 'hum': 0.0, 'readingId': 0}

}


# Update OLED display with temperature and humidity for both boards on separate lines.

def update_display():

    try:

        display.fill(0)

        # Board 1 data

        display.text("Board 1:", 0, 0)

        display.text(f"Temp: {board_readings[1]['temp']:.1f} C", 0, 10)

        display.text(f"Hum: {board_readings[1]['hum']:.1f} %", 0, 20)

        # Board 2 data

        display.text("Board 2:", 0, 32)

        display.text(f"Temp: {board_readings[2]['temp']:.1f} C", 0, 42)

        display.text(f"Hum: {board_readings[2]['hum']:.1f} %", 0, 52)

        display.show()

        print("Display updated")

    except Exception as err:

        print("Error updating display:", err)


# Async function to receive and process messages.

async def receive_messages(e):

    print("Listening for ESP-NOW messages...")

    while True:

        try:

            async for mac, msg in e:

                try:

                    # Decode and parse JSON message

                    json_str = msg.decode('utf-8')

                    data = ujson.loads(json_str)

                    

                    # Extract parameters

                    board_id = data['id']

                    temperature = data['temp']

                    humidity = data['hum']

                    reading_id = data['readingId']

                    

                    # Store in board_readings dictionary

                    if board_id in (1, 2):

                        board_readings[board_id] = {

                            'temp': temperature,

                            'hum': humidity,

                            'readingId': reading_id

                        }

                        # Update OLED display

                        update_display()

                    

                    # Display on MicroPython terminal

                    print(f"\nReceived from {mac.hex()}:")

                    print(f"  Board ID: {board_id}")

                    print(f"  Temperature: {temperature} C")

                    print(f"  Humidity: {humidity} %")

                    print(f"  Reading ID: {reading_id}")

                except (ValueError, KeyError) as err:

                    print(f"Error parsing JSON: {err}")

        except OSError as err:

            print("Receive error:", err)

            await asyncio.sleep(5)


async def main(e):

    await receive_messages(e)


# Run the async program

try:

    asyncio.run(main(e))

except KeyboardInterrupt:

    print("Stopping receiver...")

    e.active(False)

    sta.active(False)

 

Bagaimana Cara Kerja Kode?

Mengimpor Library

Mulailah dengan mengimpor library yang dibutuhkan. Pastikan file ssd1306.py telah diunggah sebelumnya agar board dapat berinteraksi dengan OLED display.

 

import network

import aioespnow

import asyncio

import ujson

from machine import Pin, I2C

import ssd1306

 

Inisialisasi OLED Display

Lakukan inisialisasi komunikasi I2C dan OLED display menggunakan baris kode berikut.

 

# Initialize I2C for OLED

try:

    i2c = I2C(0, scl=Pin(22), sda=Pin(21))  # Adjust pins as needed

    display = ssd1306.SSD1306_I2C(128, 64, i2c, addr=0x3C)

    print("SSD1306 initialized")

except Exception as err:

    print("Failed to initialize SSD1306:", err)

    raise

 

Inisialisasi Antarmuka Wi-Fi dan ESP-NOW

Lakukan inisialisasi Wi-Fi dan protokol ESP-NOW.

 

# Initialize Wi-Fi in station mode

sta = network.WLAN(network.STA_IF)

try:

    sta.active(True)

    sta.config(channel=1)  # Set channel explicitly

    sta.disconnect()

except OSError as err:

    print("Failed to initialize Wi-Fi:", err)

    raise


# Initialize AIOESPNow

e = aioespnow.AIOESPNow()

try:

    e.active(True)

except OSError as err:

    print("Failed to initialize AIOESPNow:", err)

    raise

 

Menambahkan Board Pengirim sebagai Peer

Tambahkan board pengirim sebagai peer. Meskipun tidak wajib menambahkan board pengirim di sisi penerima, langkah ini menjamin komunikasi yang lebih andal.

 

Masukkan alamat MAC board pengirim pada variabel berikut (pastikan alamat MAC dalam format bytes).

 

# Sender's MAC addresses (replace with actual sender MACs)

sender_mac_1 = b'\x24\x0a\xc4\x31\x40\x50'  # First sender's MAC (Board ID=1)

sender_mac_2 = b'\x30\xae\xa4\xf6\x7d\x4c'  # Second sender's MAC (Board ID=2)

 

Selanjutnya, daftarkan board tersebut sebagai peer ESP-NOW:

 

# Add peers

try:

    e.add_peer(sender_mac_1)

except OSError as err:

    print(f"Failed to add peer {sender_mac_1.hex()}:", err)

    raise


try:

    e.add_peer(sender_mac_2)

except OSError as err:

    print(f"Failed to add peer {sender_mac_2.hex()}:", err)

    raise

 

Dictionary untuk Menyimpan Data Pembacaan

Buat sebuah dictionary untuk menyimpan data sensor terbaru dari setiap board. Menyimpan data dalam dictionary memungkinkan semua informasi diterima tersimpan dalam satu variabel.

 

board_readings = {

    1: {'temp': 0.0, 'hum': 0.0, 'readingId': 0},

    2: {'temp': 0.0, 'hum': 0.0, 'readingId': 0}

}

 

Memperbarui Tampilan

Fungsi update_display() mengambil data yang diterima dari setiap board, yang sudah tersimpan dalam variabel board_readings, dan menampilkannya pada OLED screen.

 

def update_display():

    try:

        display.fill(0)

        # Board 1 data

        display.text("Board 1:", 0, 0)

        display.text(f"Temp: {board_readings[1]['temp']:.1f} C", 0, 10)

        display.text(f"Hum: {board_readings[1]['hum']:.1f} %", 0, 20)

        # Board 2 data

        display.text("Board 2:", 0, 32)

        display.text(f"Temp: {board_readings[2]['temp']:.1f} C", 0, 42)

        display.text(f"Hum: {board_readings[2]['hum']:.1f} %", 0, 52)

        display.show()

        print("Display updated")

    except Exception as err:

        print("Error updating display:", err)

 

Menerima Pesan ESP-NOW

Kita membuat fungsi asinkron untuk menerima dan memproses pesan yang datang melalui ESP-NOW.

 

async def receive_messages(e):

    print("Listening for ESP-NOW messages...")

    while True:

        try:

            async for mac, msg in e:

 

Data diterima dalam format JSON. Pertama, kita decode dan parse pesan JSON tersebut.

 

try:

    # Decode and parse JSON message

    json_str = msg.decode('utf-8')

    data = ujson.loads(json_str)

 

Kita ekstrak setiap parameter ke dalam variabel terpisah.

 

# Extract parameters into individual variables

board_id = data['id']

temperature = data['temp']

humidity = data['hum']

reading_id = data['readingId']

 

Terakhir, kita menyimpan data ke posisi yang sesuai dalam dictionary board_readings berdasarkan ID board.

 

# Store in board_readings dictionary

if board_id in (1, 2):

    board_readings[board_id] = {

    'temp': temperature,

    'hum': humidity,

    'readingId': reading_id

}

 

Setelah itu, variabel board_readings telah diperbarui, sehingga kita dapat memanggil fungsi update_display() untuk menampilkan data terbaru di layar.

 

update_display()

 

Terakhir, kita menampilkan data yang diterima di MicroPython shell.

 

# Display on serial monitor

print(f"\nReceived from {mac.hex()}:")

print(f"  Board ID: {board_id}")

print(f"  Temperature: {temperature} C")

print(f"  Humidity: {humidity} %")

print(f"  Reading ID: {reading_id}")

 

Menjalankan Tugas Asinkron

Fungsi berikut menjalankan tugas receive_messages untuk menerima data melalui ESP-NOW secara asinkron.

 

async def main(e):

    await receive_messages(e)

 

Terakhir, jalankan program secara asinkron.

 

# Run the async program

try:

    asyncio.run(main(e, peer_mac))

except KeyboardInterrupt:

    print("Stopping sender...")

    e.active(False)

    sta.active(False)

 

Demonstrasi

Unggah dan jalankan kode sebelumnya pada board penerima. Jika tersambung ke Thonny IDE Terminal, data yang diterima akan ditampilkan di sana.

 

Secara bersamaan, pembacaan dari setiap board pengirim juga akan ditampilkan pada OLED screen.

 



 

 

 

 

 

 

 

 

Siap Untuk Membuat Proyek Impianmu Menjadi Kenyataan?

Klik di sini untuk chat langsung via WhatsApp dan dapatkan dukungan langsung dari tim ahli kami! 

 

0 on: "Implementasi ESP-NOW Many-to-One di ESP32 dengan MicroPython "