Apache KafkaによるリアルタイムIoTデータ処理ノード
この文書は AI によって翻訳されています。内容に不正確な点や改善すべき点がございましたら、文書下部のコメント欄または以下の Issue ページにてご報告ください。
https://github.com/Seeed-Studio/wiki-documents/issues
最先端の処理ノードであるKafka-ESP32は、Apache KafkaとESP32C6マイクロコントローラーのパワーを組み合わせ、IoTデータストリームを効率的に処理するソリューションを提供します。XIAO ESP32C6とDHT20環境センサーを使用することで、データを収集し、ESP32C6を介してシームレスにApache Kafkaに送信します。Kafkaの高スループット、低レイテンシのメッセージング機能により、リアルタイムのデータ処理と分析が可能になり、その分散アーキテクチャによりスケーラビリティも容易です。Kafka-ESP32は、カスタムアプリケーションや統合の開発を可能にし、データ駆動型の現代におけるIoT資産の管理と活用方法を革新します。
必要な材料
この例では、XIAO ESP32C6とGrove DHT20温湿度センサーを使用して、AWS IoT CoreのSageMakerタスクを完了する方法を紹介します。このルーチンを完了するために必要なすべてのハードウェアデバイスは以下の通りです。
Docker インストール
なぜ Docker を使用するのか?Docker を使用すると、1台のマシン上で複数のコンピュータ環境をシミュレートし、アプリケーションを非常に簡単にデプロイできます。そのため、このプロジェクトでは Docker を使用して環境をセットアップし、効率を向上させます。
ステップ 1. Docker をダウンロード
お使いのコンピュータに応じて異なる種類のインストーラーをダウンロードしてください。こちらをクリックして移動します。

お使いのコンピュータが Windows の場合、ステップ 2 を完了するまで Docker をインストールしないでください。
ステップ 2. WSL(Windows Subsystem for Linux)のインストール
このステップは Windows 用です。お使いのコンピュータが Mac または Linux の場合、このステップをスキップできます。
- 管理者として以下のコードを実行します。

dism.exe /online /enable-feature /featurename:Microsoft-Windows-Subsystem-Linux /all /norestart
dism.exe /online /enable-feature /featurename:VirtualMachinePlatform /all /norestart
こちらからツールをダウンロードし、ダブルクリックしてインストールします。
Microsoft Store に移動して、好きな Linux バージョンを検索してダウンロードします。ここでは Ubuntu をインストールしました。

- Linux をインストールした後、開いてユーザー名とパスワードを設定し、初期化が完了するまで数分待つ必要があります。

- 以下の指示を実行して WSL を使用します。

- WSL をインストールした後、Docker インストーラーをダブルクリックしてインストールできます。以下の画像が表示されたら、正常に動作しています。

サービスのデプロイ
開始する前に、このプロジェクトで使用する各サービスの機能を紹介します。

以下は、このプロジェクトのディレクトリ構造の参考例です。次のステップでこれらのファイルを1つずつ作成します。各ファイルの位置は非常に重要です。このディレクトリ構造を参考にすることを強くお勧めします。kafka_xiao_project ディレクトリを作成し、これらのファイルを含めます。

ステップ 3. Python サーバーのデプロイ
MCU デバイスの性能不足により、Kafka のクライアントとして直接使用することはできません。そのため、データ転送を行うサーバーを構築する必要があります。このステップでは、Python を使用して簡単なサーバーを構築します。XIAO ESP32C6 は主に DHT20 から環境データを収集し、それをサーバーに送信します。
- まず app.py ファイルを作成します。これはサーバーの動作を定義します。
from flask import Flask
from kafka import KafkaProducer, KafkaConsumer
app = Flask(__name__)
@app.route('/favicon.ico')
def favicon():
return '', 204
@app.route('/<temperature>/<humidity>')
def send_data(temperature, humidity):
producer = KafkaProducer(bootstrap_servers='kafka:9092')
data = f'Temperature: {temperature}, Humidity: {humidity}'
producer.send('my_topic', data.encode('utf-8'))
return f'Temperature: {temperature}, Humidity: {humidity}'
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5001)
- requirements.txt を作成します。これは依存ライブラリを定義します。
flask
kafka-python
- Dockerfile を作成します。
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["python", "app.py"]
- 上記の3つのファイルを作成した後、以下のコードを実行して Docker イメージをビルドします。
docker build -t pyserver .

ステップ 4. Jupyter Notebook のデプロイ
Jupyter Notebook は主にデバッグに使用される非常に便利なツールです。また、Python を使用して Kafka を操作することもできます。
- まず Dockerfile を作成します。
FROM python:3.9
RUN pip install jupyter
WORKDIR /notebook
EXPOSE 8888
CMD ["jupyter", "notebook", "--ip=0.0.0.0", "--port=8888", "--no-browser", "--allow-root"]
- Jupyter の Docker イメージをビルドします。
docker build -t jupyter .

ステップ 5. Docker クラスターの起動
docker-compose.yml を使用して Docker クラスターを構築できます。docker-compose 内の各サービスは独立したコンピュータを表し、kafka-net を使用してそれらを相互接続します。
- まず docker-compose.yml を作成します。
services:
zookeeper:
container_name: zookeeper
hostname: zookeeper
image: docker.io/bitnami/zookeeper
ports:
- "2181:2181"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
networks:
- kafka-net
kafka:
container_name: kafka
hostname: kafka
image: docker.io/bitnami/kafka
ports:
- "9092:9092"
- "9093:9093"
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CFG_BROKER_ID=0
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_LISTENERS=INTERNAL://kafka:9092,EXTERNAL://localhost:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka:9092,EXTERNAL://localhost:9093
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
depends_on:
- zookeeper
networks:
- kafka-net
jupyter:
image: jupyter:latest
depends_on:
- kafka
volumes:
- ./myjupyter:/notebook
ports:
- "8888:8888"
environment:
- JUPYTER_ENABLE_LAB=yes
networks:
- kafka-net
pyserver:
image: pyserver:latest
depends_on:
- kafka
volumes:
- ./myserver/app.py:/app/app.py
ports:
- "5001:5001"
networks:
- kafka-net
networks:
kafka-net:
driver: bridge
- 次に、以下のコマンドを実行してこの Docker クラスターを起動します。
docker-compose up -d
ポートが使用中の場合があります。その場合はポートを5001から5002などに変更するか、ポートを占有しているアプリケーションを終了してください。

- 次のいくつかの操作では、正常に動作しているかどうかをテストします。まず、Docker Desktopソフトウェアを開き、pyserverをクリックします。

- 現在、サーバーは http://127.0.0.1:5001 で動作しています。このリンクをクリックして開きます。

- 次に、以下の形式で2つのパラメータを入力して、Dockerクラスターが正常に動作しているかどうかをテストします。

- Kafka内に入り、データがKafkaに送信されたかどうかを確認します。
docker exec -it kafka bash
cd opt/bitnami/kafka/bin/
kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my_topic --from-beginning

- 別のパラメータで再度試してみると、データが即座にKafkaに送信されたことが確認できます。おめでとうございます!Dockerクラスターが完璧に動作しています。

ステップ7. PythonでKafkaをテストする
このステップは主にPythonを使用してKafkaを操作する方法についてです。このステップをスキップしても、プロジェクト全体の操作には影響しません。
- Docker Desktopを開き、jupyterをクリックします。

- このリンクをクリックしてjupyterにアクセスします。

- jupyterに正常にアクセスすると、以下のページが表示されます。

- 右クリックしてNew Notebookを作成し、Python3(ipykernel)を使用します。

pip install kafka-python
を実行してkafka-pythonライブラリをインストールします。

- ライブラリをインストールした後、jupyterを再起動する必要があります。

- 次に、以下のコードを実行してPythonを使用してKafkaにデータを送信します。
from kafka import KafkaProducer, KafkaConsumer
# プロデューサーを初期化
producer = KafkaProducer(bootstrap_servers='localhost:9093')
# メッセージを送信
producer.send('my_topic', b'Hello, Kafka2')

- また、Kafka内でこれらのデータを確認することもできます。
from kafka import KafkaConsumer
# コンシューマーを初期化
consumer = KafkaConsumer(
'my_topic',
bootstrap_servers='localhost:9093',
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='group1'
)
# データを受信して表示
for message in consumer:
print(f"Received message: {message.value.decode('utf-8')}")

XIAO ESP32C6 と Apache Kafka
Kafka は、スケールに応じたデータストリームのリアルタイム処理を可能にする分散型ストリーミングプラットフォームです。システム間でのデータのパブリッシュ-サブスクライブ型メッセージングを可能にし、フォールトトレランス、永続性、高スループットを提供します。Kafka は、さまざまな分野でリアルタイムデータパイプラインやストリーミングアプリケーションを構築するために広く使用されています。
ここでは、XIAO ESP32C6 と DHT20 温湿度センサーを使用してデータを収集し、リアルタイムで Kafka に送信します。
ステップ 8. データを収集して Apache Kafka に送信する
- 以下のコードを Arduino IDE にコピーしてください。
#include <WiFi.h>
#include <HTTPClient.h>
// WiFi の名前とパスワードをここに変更してください。
const char* ssid = "Maker_2.4G";
const char* password = "15935700";
// コンピュータの IP アドレスとサーバーポートをここに変更してください。
const char* serverUrl = "http://192.168.1.175:5001";
void setup() {
Serial.begin(115200);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(1000);
Serial.println("WiFi に接続中...");
}
Serial.println("WiFi に接続しました");
}
void loop() {
if (WiFi.status() == WL_CONNECTED) {
HTTPClient http;
// アクセスリンクを作成
String url = serverUrl;
url += "/";
url += "30.532"; // 温度
url += "/";
url += "60.342"; // 湿度
http.begin(url);
int httpResponseCode = http.GET();
// HTTP レスポンスを取得して出力
if (httpResponseCode == 200) {
String response = http.getString();
Serial.println("サーバーの応答:");
Serial.println(response);
} else {
Serial.print("HTTP エラーコード: ");
Serial.println(httpResponseCode);
}
http.end();
} else {
Serial.println("WiFi が切断されました");
}
delay(5000); // 5 秒ごとにサーバーにアクセス
}
コンピュータの IP アドレスがわからない場合は、ipconfig
(Windows)または ifconfig | grep net
(Mac または Linux)を実行して確認できます。

- Type-C ケーブルを使用してコンピュータと C6 を接続し、Grove ケーブルを使用して XIAO 拡張ボードの I2C ポート と DHT20 センサーを接続します。

- 開発ボードを選択します。

- コードをアップロードし、シリアルモニターを開きます。

- Kafka を実行している Windows PowerShell を開きます。これで環境データが Kafka に送信されているのが確認できます。おめでとうございます!このプロジェクトを正常に実行できました!

リソース
- [リンク] Apache Kafka 入門
技術サポートと製品ディスカッション
弊社製品をお選びいただきありがとうございます!製品をご利用いただく際に、できる限りスムーズな体験を提供するため、さまざまなサポートを提供しております。異なる好みやニーズに対応するため、複数のコミュニケーションチャネルをご用意しています。