חיבור באמצעות MQTT מ-esp32 לרספברי פיי

שידור מידע דרך ה-WiFi בפרוטוקול MQTT ממיקרובקר אל רספברי פיי וההיפך.

בפוסט הקודם יצרתי עציץ תבוני שיכול לדבר וכתבתי בסוף שיש בעיה עם רספברי פיי וחיישנים אנלוגיים, ובכלל – רספברי פיי הוא כבד וחזק אבל בגדול מה שאנחנו צריכים לשלוח מהעציץ התבוני שלנו זה מידע אנלוגי ואפשר לעשות את זה עם מיקרו בקר מסוג esp32. במקרה הזה אפשר גם לשים כמה בקרים בכמה עציצים ולהפוך את כל הסלון לתבוני. הרספברי פיי יהיה המחשב שמריץ את הכל, אבל הטריגרים יהיו המיקרו בקרים.

יש כמובן עוד לא מעט תרחישים שבהם תקשורת כזו תהיה שימושית. בגדול – אין שום מניעה לפתוח API גם ברספברי וגם ב-ESP32 ולתקשר ב-REST. אבל ב-IoT מקובל להשתמש בפרוטוקול שנקרא MQTT – ראשי תבות של Message Queuing Telemetry Transport. יש לו תמיכה מעולה ברספברי פיי ובמיקרופייתון ובאמת כדאי להכיר.

הידע הנדרש להבנת פוסט זה הוא ידע מספק להתחברות אל רספברי פיי (אפשר להתחיל עם המדריך להתחברות מרחוק לרספברי פיי) וידע בסיסי ב-esp32 (אפשר לקרוא את הפוסט על חיבור ל esp32).

מה זה MQTT?

בגדול MQTT הוא pub\sub. כלומר יש לי ברוקר (שאותו אני מתקין על הרספברי פיי) והוא זה שמקבל הודעות. מתכנתי ווב? זה עובד באופן זהה לקפקא שגם אותו אפשר להתקין אם ממש רוצים על רספברי פיי. רגע, לא להכנס ללחץ!

איך pub\sub עובד? יש שרת מרכזי שאליו אפשר להרשם כדי לקבל הודעות (רק צריך לפרט את סוג ההודעות) ואליו אפשר גם לשלוח הודעות (רק צריך לציין את סוג ההודעה ששולחים). באמת הכי פשוט בעולם. אני למשל שולח הודעה בסוג ״ahla" ורק מי שמאזין לסוג ההודעות ahla יקבל את ההודעה שלי – התוכן כמובן תלוי בי. כאשר יכולים להיות כמה מאזינים לאותה הודעה. במקרה שלנו, אנחנו רוצים אפשרות שכמה יוכלו לשלוח הודעות לרספברי פיי – וזה מתאים בדיוק למה שאנחנו צריכים.

התקנה של הברוקר והקליינט על רספברי פיי

אז אנחנו נתקין על רספברי פיי שתי תוכנות – התוכנה הראשונה היא המוח – הברוקר – זה שהתפקיד שלו הוא לקבל את כל ההודעות שנשלחות לפרסום ולשלוח אותם רק לאלו שמאזינים לנושא הזה.
התוכנה השניה היא זו שמאזינה להודעות ויכולה לשלוח הודעות ואת שתיהן נתקין כמובן על הרספברי פיי, שיישא בנטל של הברוקר.

אני בחרתי ב-Mosquitto שהיא קלה מאד להפעלה ולהתקנה ומשמשת לכל הפעולות.

את התוכנות נתקין כך:

sudo apt install -y mosquitto mosquitto-clients

נמתין לסיום ההתקנה ואז נבקש מהרספברי פיי להעלות את מוסקיטו תמיד בכל העלאה:

sudo systemctl enable mosquitto.service

השלב הבא הוא לבדוק שהכל תקין עם:

mosquitto -v

אם יש תקלה מסוג: Error: Address already in use – לא להבהל. יש להרוג את התהליך של mosquitto ואז להפעיל שוב.

ps -ef | grep mosquitto
# Choosing mosquitto_pid
sudo kill mosquitto_pid

וכך זה נראה אצלי:

Error: Address already in use
ואז הרצתי את 
ps -ef | grep mosquitto
קיבלתי שני תהליכים. אחד מהם היה
mosquit+ 28407     1  0 18:32 ?        00:00:00 /usr/sbin/mosquitto -c /etc/mosquitto/mosquitto.conf
אז הרגתי אותו עם
sudo kill 28407

ואז mosquitto -v יעבוד.

עכשיו נכנס להגדרות. נכתוב:

sudo nano /etc/mosquitto/mosquitto.conf

נכניס שתי שורות בתחתית הקובץ – בגדול אנו מגדירים בהן שאנו מאזינים לפורט מסוים ואנו לא משתמשים באימות משתמשים (כרגע).

listener 1883
allow_anonymous true

נאתחל את השירות באמצעות:

sudo systemctl restart mosquitto

זהו! זה השלב הקשה. עכשיו בואו ונבדוק את העניין. נפתח שני חלונות ב-shell. אחד יאזין והשני ישדר. זה קצת מטופש, אבל יאפשר לנו לבדוק שהכל עובד. כמו לחייג לעצמך.

בחלון הראשון נקליד

mosquitto_sub -d -t testTopic

אנו נראה שמתחיל לרוץ לאיטו לוג. זה הלוג שמאזין לכל ההודעות שהנושא שלהן הוא testTopic. אין כרגע אירועים.

בחלון השני נשלח אירוע! איך? עם הפקודה:

mosquitto_pub -d -t testTopic -m "Hello world!"

אם נעבור לחלון השני, נראה את ה-Hello world! אם נחזור ונשדר, אולי עם טקסט אחר כמו wassap אז נראה גם את ההודעה הזו.

המחשה של השליחה של ההודעות. שתי פקודות שמפבלשות ל testTopic עם התוכן: Hello World ו wassap.
המחשה של הקבלה של ההודעות. לוג שיש בו קבלה של שתי הודעות: Hello World ו wassap.

הכל עובד? מעולה! עכשיו זה השלב של ה-esp32!

התקנת MQTT על ESP32

ניגש אל ה-esp32 ונצרוב קושחה חדשה.

אנו ניצור קובץ חדש בשם umqttsimple.py ונכניס אליו את התוכן הבא, זה בעצם ה״דרייבר״ שבאמצעותו מתחברים:

    import usocket as socket
    import socket
import ustruct as struct
from ubinascii import hexlify

class MQTTException(Exception):

class MQTTClient:

    def __init__(self, client_id, server, port=0, user=None, password=None, keepalive=0,
                 ssl=False, ssl_params={}):
        if port == 0:
            port = 8883 if ssl else 1883
        self.client_id = client_id
        self.sock = None
        self.server = server
        self.port = port
        self.ssl = ssl
        self.ssl_params = ssl_params
        self.pid = 0
        self.cb = None
        self.user = user
        self.pswd = password
        self.keepalive = keepalive
        self.lw_topic = None
        self.lw_msg = None
        self.lw_qos = 0
        self.lw_retain = False

    def _send_str(self, s):
        self.sock.write(struct.pack("!H", len(s)))

    def _recv_len(self):
        n = 0
        sh = 0
        while 1:
            b = self.sock.read(1)[0]
            n |= (b & 0x7f) << sh
            if not b & 0x80:
                return n
            sh += 7

    def set_callback(self, f):
        self.cb = f

    def set_last_will(self, topic, msg, retain=False, qos=0):
        assert 0 <= qos <= 2
        assert topic
        self.lw_topic = topic
        self.lw_msg = msg
        self.lw_qos = qos
        self.lw_retain = retain

    def connect(self, clean_session=True):
        self.sock = socket.socket()
        addr = socket.getaddrinfo(self.server, self.port)[0][-1]
        if self.ssl:
            import ussl
            self.sock = ussl.wrap_socket(self.sock, **self.ssl_params)
        premsg = bytearray(b"\x10
        if self.lw_topic:
        if self.user is not None:
        resp = self.sock.read(4)
        assert resp[0] == 0x20 and resp[1] == 0x02
        if resp[3] != 0:
            raise MQTTException(resp[3])
        return resp[2] & 1

    def disconnect(self):

    def ping(self):

    def publish(self, topic, msg, retain=False, qos=0):
        pkt = bytearray(b"\x30\0\0\0")
        pkt[0] |= qos << 1 | retain
        sz = 2 + len(topic) + len(msg)
        if qos > 0:
            sz += 2
        assert sz < 2097152
        i = 1
        while sz > 0x7f:
            pkt[i] = (sz & 0x7f) | 0x80
            sz >>= 7
            i += 1
        pkt[i] = sz
        #print(hex(len(pkt)), hexlify(pkt, ":"))
        self.sock.write(pkt, i + 1)
        if qos > 0:
            self.pid += 1
            pid = self.pid
            struct.pack_into("!H", pkt, 0, pid)
            self.sock.write(pkt, 2)
        if qos == 1:
            while 1:
                op = self.wait_msg()
                if op == 0x40:
                    sz = self.sock.read(1)
                    assert sz == b"\x02"
                    rcv_pid = self.sock.read(2)
                    rcv_pid = rcv_pid[0] << 8 | rcv_pid[1]
                    if pid == rcv_pid:
        elif qos == 2:
            assert 0

    def subscribe(self, topic, qos=0):
        assert self.cb is not None, "Subscribe callback is not set"
        pkt = bytearray(b"\x82\0\0\0")
        self.pid += 1
        struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, self.pid)
        #print(hex(len(pkt)), hexlify(pkt, ":"))
        self.sock.write(qos.to_bytes(1, "little"))
        while 1:
            op = self.wait_msg()
            if op == 0x90:
                resp = self.sock.read(4)
                assert resp[1] == pkt[2] and resp[2] == pkt[3]
                if resp[3] == 0x80:
                    raise MQTTException(resp[3])

    # Wait for a single incoming MQTT message and process it.
    # Subscribed messages are delivered to a callback previously
    # set by .set_callback() method. Other (internal) MQTT
    # messages processed internally.
    def wait_msg(self):
        res = self.sock.read(1)
        if res is None:
            return None
        if res == b"":
            raise OSError(-1)
        if res == b"\xd0":  # PINGRESP
            sz = self.sock.read(1)[0]
            assert sz == 0
            return None
        op = res[0]
        if op & 0xf0 != 0x30:
            return op
        sz = self._recv_len()
        topic_len = self.sock.read(2)
        topic_len = (topic_len[0] << 8) | topic_len[1]
        topic = self.sock.read(topic_len)
        sz -= topic_len + 2
        if op & 6:
            pid = self.sock.read(2)
            pid = pid[0] << 8 | pid[1]
            sz -= 2
        msg = self.sock.read(sz)
        self.cb(topic, msg)
        if op & 6 == 2:
            pkt = bytearray(b"\x40\x02\0\0")
            struct.pack_into("!H", pkt, 2, pid)
        elif op & 6 == 4:
            assert 0

    # Checks whether a pending message from server is available.
    # If not, returns immediately with None. Otherwise, does
    # the same processing as wait_msg.
    def check_msg(self):
        return self.wait_msg()
השלב הבא הוא ללכת ל boot.py שכבר קיים ולהכניס לשם קוד שעושה כמה דברים – הראשון הוא להכריז על כתובת ה-IP של השרת שלנו, שאותה מצאנו קודם. אנחנו צריכים הרי לדעת לאן להתחבר. השניה היא להתחבר ל-wifi 🙂

החליפו בקוד הזה את YOUR_MQTT_SERVER_IP ב-IP של הרספברי פיי, את ה-YOUR_SSID בשם של הרשת שלכם ואת YOUR_PASSWORD בסיסמה של הרשת שלכם. שימרו את הקובץ. אפשר גם להריץ אותו כדי לוודא חיבור.

# This file is executed on every boot (including wake-boot from deepsleep)
#import esp
#import webrepl
import time
from umqttsimple import MQTTClient
import ubinascii
import machine
import micropython
import network
import esp
import gc

mqtt_server = 'YOUR_MQTT_SERVER_IP'

client_id = ubinascii.hexlify(machine.unique_id())
topic_sub = b'from_mothership'
topic_pub = b'to_mothership'

last_message = 0
message_interval = 5
counter = 0

wlan = network.WLAN(network.STA_IF)

# might already be connected somehow.
if wlan.isconnected() == False:
    wlan.connect("YOUR_SSID", "YOUR_PASSWORD")

# Wait for connection.
while wlan.isconnected() == False:


השלב הבא והאחרון הוא ליצור קובץ בשם main.py ונכניס בו את הקוד הזה:

# Complete project details at https://RandomNerdTutorials.com

def sub_cb(topic, msg):
  print((topic, msg))
  if topic == b'from_mothership' and msg == b'received':
    print('ESP received notification from mothership!')

def connect_and_subscribe():
  global client_id, mqtt_server, topic_sub
  client = MQTTClient(client_id, mqtt_server)
  print('Connected to %s MQTT broker, subscribed to %s topic' % (mqtt_server, topic_sub))
  return client

def restart_and_reconnect():
  print('Failed to connect to MQTT broker. Reconnecting...')

  client = connect_and_subscribe()
except OSError as e:

while True:
    if (time.time() - last_message) > message_interval:
      msg = b'Test to mothership #%d' % counter
      client.publish(topic_pub, msg)
      last_message = time.time()
      counter += 1
  except OSError as e:

מה שאני יוצר פה זה קוד שמאזין לנושא from_mothership וגם קוד ששולח הודעות כל 5 שניות לנושא to_mothership. הקונטקסט ברור – אני מאזין לכל מה שמגיע מספינת האם (הרספברי פיי) ושולח הודעות למה שהרספברי פיי מאזין לו.

הבדיקה: שידור מרספברי אל ESP32 ומ-ESP32 אל הרספברי

עכשיו הבדיקה הגדולה! אני אחזור חזרה אל הרספברי פיי ואקליד:

mosquitto_pub -d -t from_mothership -m "wassap!"

אם הכל כשורה – אני אראה את הטקסט בלוג של טוני!

צילום מסך של ה-shell של thonny עם הטקסט המצופה:
(b'from_mothership', b'wassap!')

השלב הבא הוא ללכת לחלון השני של הרספברי פיי ולהאזין ל "to_mothership״ באמצעות הקוד הזה:

mosquitto_sub -d -t to_mothership

אם הכל כשורה – אני אראה שידור כל 5 שניות אל ספינת האם! מה-esp32 אל הרספברי פיי!

צילום מסך של ה-shell של רספברי פיי שמקבל הודעות כאלו:
Client mosqsub|28795-raspberry received PUBLISH (d0, q0, r0, m0, 'to_mothership', ... (21 bytes))
Test to mothership #0
Client mosqsub|28795-raspberry received PUBLISH (d0, q0, r0, m0, 'to_mothership', ... (21 bytes))
Test to mothership #1
Client mosqsub|28795-raspberry received PUBLISH (d0, q0, r0, m0, 'to_mothership', ... (21 bytes))
Test to mothership #2
Client mosqsub|28795-raspberry received PUBLISH (d0, q0, r0, m0, 'to_mothership', ... (21 bytes))
Test to mothership #3

וזו הנקודה שאפשר קצת להשען אחורה ולהנות. יש כמובן עוד המון מה ללמוד בנושא, אבל בשביל מטרות פשוטות הידע הזה יספיק. בפוסט הבא נראה איך באמצעות פייתון אפשר לקבל מידע ממיקרו בקר ואז לבצע הפעלה ברספברי פיי.

