Skip to main content

MQTT を使用した IIoT 向け 4G LTE 接続の設定

note

この文書は AI によって翻訳されています。内容に不正確な点や改善すべき点がございましたら、文書下部のコメント欄または以下の Issue ページにてご報告ください。
https://github.com/Seeed-Studio/wiki-documents/issues

はじめに

Raspberry Pi 用 4G LTE HAT は、信頼性の高い高速なセルラー接続を提供し、リモートおよび産業用 IoT (IIoT) アプリケーションに不可欠です。軽量なメッセージングプロトコルである MQTT を使用することで、デバイスは孤立した場所でもセルラーネットワークを介して効率的に通信できます。AT コマンドを使用した設定により、IoT デバイスをネットワークに接続するプロセスが簡素化されます。この 4G LTE と MQTT の組み合わせにより、リアルタイムのデータ伝送が強化され、スケーラブルな IIoT ソリューションとリモート管理機能が可能になります。

前提条件

ハードウェア要件

Raspberry Pi 5Raspberry Pi 4G LTE CAT4 HAT

ソフトウェア要件

通信ドライバーとツール

関連するドライバーや通信ツールをまだインストールしていない場合は、まず ガイド を確認してください。

Mosquitto Explorer

Mosquitto ブローカー、特に https://test.mosquitto.org で利用可能なテストブローカーを使用します。このブローカーは ユーザー名やパスワードを必要としません。利便性のために、テスト目的で Mosquitto をインストール することをお勧めします。

ATコマンドを使用してMQTTブローカーに接続する

ステップ 1: 受信モードを設定する

AT+QMTCFG="recv/mode",0,0,1

ステップ 2: MQTTクライアント用のネットワークを開く

AT+QMTOPEN=0,"test.mosquitto.org",1883

ステップ 3: MQTT接続ステータスを確認する(オプション)

AT+QMTOPEN?

ステップ 4: Mosquitto MQTTサーバーにクライアントを接続する

AT+QMTCONN=0,"clientExample"
note

クライアントIDは一意である必要があるため、非常にユニークなものを生成してください。Mosquittoのパブリックブローカーは、アクセスにユーザー名やパスワードを必要としません。

ステップ 5: トピックにメッセージを公開する

AT+QMTPUBEX=0,0,0,0,"test/topic",30 
note

>が表示されたら、メッセージを入力してCtrl+Zを押してください。

> This is test data, hello MQTT.

Mosquitto Explorerを開き、公開したトピックを入力します。そこで更新が表示されます。

ステップ 6: トピックを購読する

AT+QMTSUB=0,1,"test/topic",2

Mosquitto Explorerを開き、4Gモジュールから公開したいトピックとメッセージを入力します。

その後、4Gモジュール側で公開されたメッセージが正常に購読されたことが確認できます。

ステップ 7: トピックの購読を解除する

AT+QMTUNS=0,2,"test/topic"

ステップ 8: MQTTサーバーからクライアントを切断する

AT+QMTDISC=0

Pythonコードの実装

ステップ 1. ディレクトリと仮想環境の準備

  • Raspberry Piでターミナルを開きます。
  • 新しいプロジェクトフォルダを作成し、その中に移動します:
mkdir mqtt_EX
cd mqtt_EX
  • Python仮想環境をセットアップします:
python3 -m venv --system-site-packages env
  • 仮想環境を有効化します:
source env/bin/activate
  • 必要なライブラリをインストールします:
pip install pyserial 

ステップ 2. Pythonスクリプトの準備

  • Thonny Python IDE(Raspberry Piにプリインストール済み)を開きます。

  • Thonnyで新しいファイルを作成し、提供されたコードをエディタに貼り付けます。

  • 4G HATのシリアルポートに合わせてusb_portパラメータを更新します。通常、/dev/ttyUSB2または/dev/ttyUSB3である可能性があります。例:

usb_port = "/dev/ttyUSB2"
  • ファイルをmqtt_EXフォルダ内にtest_mqtt.pyとして保存します。
import serial
import time

# シリアルポートの設定
SERIAL_PORT = '/dev/ttyUSB2' # 環境に応じて調整してください
BAUD_RATE = 9600


def send_at_command(ser, command, delay=1):
"""
QuectelモジュールにATコマンドを送信し、応答を待ちます。
"""
ser.write((command + '\r\n').encode())
time.sleep(delay)
response = ser.read_all().decode()
print(f"Command: {command}\nResponse: {response}")
return response


def main():
# シリアル接続を開く
ser = serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=5)
if not ser.is_open:
ser.open()

try:
# MQTT受信モードを設定
send_at_command(ser, 'AT+QMTCFG="recv/mode",0,0,1')

# MQTT接続を開く
send_at_command(ser, 'AT+QMTOPEN=0,"test.mosquitto.org",1883')
send_at_command(ser, 'AT+QMTOPEN?') # 接続ステータスを確認

# MQTTブローカーに接続
send_at_command(ser, 'AT+QMTCONN=0,"clientExample"')

# トピックを購読
send_at_command(ser, 'AT+QMTSUB=0,1,"test/topic_subscribe",2')

print("メッセージを公開および購読中。停止するにはCtrl+Cを押してください。")

while True:
try:
# メッセージを公開
send_at_command(ser, 'AT+QMTPUBEX=0,0,0,0,"test/topic_publish",30')
send_at_command(ser, 'This is test data, hello MQTT.', delay=0.5)

# 購読トピックのメッセージを確認
print("購読トピックのメッセージを確認中...")
incoming = ser.read_all().decode()
if incoming:
print(f"Received: {incoming}")

# 操作間の遅延
time.sleep(2)
except KeyboardInterrupt:
print("ループを終了します...")
break

# トピックの購読を解除
send_at_command(ser, 'AT+QMTUNS=0,2,"test/topic_subscribe"')

# ブローカーから切断
send_at_command(ser, 'AT+QMTDISC=0')
finally:
# シリアル接続を閉じる
ser.close()


if __name__ == '__main__':
main()

ステップ 3. スクリプトを実行する

  • ターミナルを開き、プロジェクトディレクトリにいることを確認します:
cd mqtt_EX
  • 仮想環境を有効化します:
source env/bin/activate
  • Pythonを使用してスクリプトを実行します:
python test_mqtt.py
  • 出力結果

リソース

技術サポート & 製品ディスカッション

弊社の製品をお選びいただきありがとうございます!お客様が弊社製品をスムーズにご利用いただけるよう、さまざまなサポートを提供しております。お客様の好みやニーズに応じた複数のコミュニケーションチャネルをご用意しています。

Loading Comments...