基于 Apache Kafka 的实时物联网数据处理节点
我们的前沿处理节点 Kafka-ESP32,结合了 Apache Kafka 和 ESP32C6 微控制器的强大功能,为处理物联网数据流提供了高效的解决方案。通过使用 XIAO ESP32C6 配合 DHT20 环境传感器,数据被收集并通过 ESP32C6 无缝发送到 Apache Kafka。Kafka 的高吞吐量、低延迟消息传递能力实现了实时数据处理和分析,而其分布式架构允许轻松扩展。Kafka-ESP32 使您能够开发自定义应用程序和集成,彻底改变您在当今数据驱动环境中管理和利用物联网资产的方式。
所需材料
本示例将介绍如何使用 XIAO ESP32C6 配合 Grove DHT20 温湿度传感器来完成 AWS IoT Core 的 SageMaker 任务。以下是完成此例程所需的所有硬件设备。
Docker 安装
为什么使用 Docker?因为 Docker 可以在单台机器上模拟多台计算机的环境,并且可以非常轻松地部署应用程序。因此,在这个项目中,我们将使用 Docker 来设置环境并提高效率。
步骤 1. 下载 Docker
根据您的计算机下载不同类型的安装程序。点击这里跳转。

如果您的计算机是 Windows,请在完成步骤 2之前不要安装 docker。
步骤 2. 安装 WSL(Windows 子系统 for Linux)
此步骤适用于 Windows。如果您的计算机是 Mac 或 Linux,可以跳过此步骤。
- 以管理员身份运行以下代码。

dism.exe /online /enable-feature /featurename:Microsoft-Windows-Subsystem-Linux /all /norestart
dism.exe /online /enable-feature /featurename:VirtualMachinePlatform /all /norestart
-
从这里下载此工具并双击安装。
-
前往您的 Microsoft Store 搜索并下载您喜欢的 linux 版本,这里我安装了 Ubuntu。

- 安装 Linux 后,您需要打开它并设置您的用户名和密码,然后需要等待一分钟进行初始化。

- 运行以下指令来使用 WSL。

- 安装 WSL 后,现在您可以双击您的 docker 安装程序来安装它。当您看到以下图像时,表示它正在工作。

部署服务
在我们开始之前,我想介绍一下这个项目中每个服务的功能。

这是此项目的目录结构供您参考。我将在以下步骤中逐一创建这些文件。每个文件的位置都非常重要。我强烈建议您参考此目录结构。创建一个 kafka_xiao_project 目录并包含这些文件。

步骤 3. 部署 Python 服务器
由于 MCU 设备性能不足,无法直接用作 kafka 的客户端。因此您需要构建一个服务器来进行数据传输。此步骤是使用 python 构建一个简单的服务器。XIAO ESP32C6 主要用于从 DHT20 收集环境数据并将其发送到服务器。
- 首先我们需要创建 app.py 文件,这是服务器要做的事情。
from flask import Flask
from kafka import KafkaProducer, KafkaConsumer
app = Flask(__name__)
@app.route('/favicon.ico')
def favicon():
return '', 204
@app.route('/<temperature>/<humidity>')
def send_data(temperature, humidity):
producer = KafkaProducer(bootstrap_servers='kafka:9092')
data = f'Temperature: {temperature}, Humidity: {humidity}'
producer.send('my_topic', data.encode('utf-8'))
return f'Temperature: {temperature}, Humidity: {humidity}'
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5001)
- 创建 requirements.txt,这是依赖库。
flask
kafka-python
- 创建 Dockerfile
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["python", "app.py"]
- 创建这 3 个文件后,我们可以通过运行以下代码来构建 docker 镜像。
docker build -t pyserver .

步骤 4. 部署 Jupyter Notebook
Jupyter Notebook 主要用于调试,它是一个非常好用的工具。同时我们可以使用 python 来操作 Kafka。
- 首先创建 Dockerfile。
FROM python:3.9
RUN pip install jupyter
WORKDIR /notebook
EXPOSE 8888
CMD ["jupyter", "notebook", "--ip=0.0.0.0", "--port=8888", "--no-browser", "--allow-root"]
- 构建 jupyter docker 镜像。
docker build -t jupyter .

步骤 5. 启动 Docker 集群
我们可以使用 docker-compose.yml 来构建 docker 集群。docker-compose 中的每个服务代表一台独立的计算机,我们使用 kafka-net 将它们相互连接。
- 所以首先我们需要创建 docker-compose.yml。
services:
zookeeper:
container_name: zookeeper
hostname: zookeeper
image: docker.io/bitnami/zookeeper
ports:
- "2181:2181"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
networks:
- kafka-net
kafka:
container_name: kafka
hostname: kafka
image: docker.io/bitnami/kafka
ports:
- "9092:9092"
- "9093:9093"
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CFG_BROKER_ID=0
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_LISTENERS=INTERNAL://kafka:9092,EXTERNAL://localhost:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka:9092,EXTERNAL://localhost:9093
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
depends_on:
- zookeeper
networks:
- kafka-net
jupyter:
image: jupyter:latest
depends_on:
- kafka
volumes:
- ./myjupyter:/notebook
ports:
- "8888:8888"
environment:
- JUPYTER_ENABLE_LAB=yes
networks:
- kafka-net
pyserver:
image: pyserver:latest
depends_on:
- kafka
volumes:
- ./myserver/app.py:/app/app.py
ports:
- "5001:5001"
networks:
- kafka-net
networks:
kafka-net:
driver: bridge
- 然后我们通过运行以下命令来启动这个 docker 集群。
docker-compose up -d
端口可能被占用,你可以将端口从 5001 更改为 5002 等,或者关闭占用该端口的应用程序。

- 在接下来的几个操作中,我们将测试它是否工作正常。首先我们打开软件 docker desktop 并点击进入 pyserver。

- 现在服务器运行在
http://127.0.0.1:5001
。点击此链接打开它。

- 然后按照这种格式输入两个参数来测试 docker 集群是否工作正常。

- 我们进入 Kafka 内部查看数据是否已发送到 Kafka。
docker exec -it kafka bash
cd opt/bitnami/kafka/bin/
kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my_topic --from-beginning

- 我们可以用不同的参数再试一次,你可以看到数据立即发送到了 Kafka。现在,恭喜!你的 docker 集群工作完美。

步骤 7. 通过 Python 测试 Kafka
这一步主要是关于如何使用 Python 操作 Kafka。你也可以跳过这一步。对整体项目操作没有影响。
- 打开 docker desktop 并点击进入 jupyter。

- 点击此链接访问 jupyter。

- 当你成功访问 jupyter 时,你将看到这个页面。

- 点击鼠标右键并创建 New Notebook,使用 Python3(ipykernel)。

- 通过运行
pip install kafka-python
安装 kafka-python 库。

- 安装该库后,我们需要重启 jupyter。

- 现在运行以下代码通过 Python 向 Kafka 发送一些数据。
from kafka import KafkaProducer, KafkaConsumer
#initialize producer
producer = KafkaProducer(bootstrap_servers='localhost:9093')
#send message
producer.send('my_topic', b'Hello, Kafka2')

- 你也可以在 kafka 中检查这些数据。
from kafka import KafkaConsumer
# initialize consumer
consumer = KafkaConsumer(
'my_topic',
bootstrap_servers='localhost:9093',
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='group1'
)
# receive data and print
for message in consumer:
print(f"Received message: {message.value.decode('utf-8')}")

XIAO ESP32C6 和 Apache Kafka
Kafka 是一个分布式流处理平台,能够大规模实时处理数据流。它允许系统之间进行数据的发布-订阅消息传递,提供容错性、持久性和高吞吐量。Kafka 广泛用于构建各个领域的实时数据管道和流处理应用程序。
现在,我们将使用 XIAO ESP32C6 和 DHT20 温湿度传感器来收集数据并实时发送到 Kafka。
步骤 8. 收集数据并发送到 Apache Kafka
- 将以下代码复制到您的 Arduino IDE 中。
#include <WiFi.h>
#include <HTTPClient.h>
//在此处更改为您的 wifi 名称和密码。
const char* ssid = "Maker_2.4G";
const char* password = "15935700";
//在此处更改为您的计算机 IP 地址和服务器端口。
const char* serverUrl = "http://192.168.1.175:5001";
void setup() {
Serial.begin(115200);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(1000);
Serial.println("Connecting to WiFi...");
}
Serial.println("Connected to WiFi");
}
void loop() {
if (WiFi.status() == WL_CONNECTED) {
HTTPClient http;
//创建访问链接
String url = serverUrl;
url += "/";
url += "30.532"; // 温度
url += "/";
url += "60.342"; // 湿度
http.begin(url);
int httpResponseCode = http.GET();
//获取 http 响应并打印
if (httpResponseCode == 200) {
String response = http.getString();
Serial.println("Server response:");
Serial.println(response);
} else {
Serial.print("HTTP error code: ");
Serial.println(httpResponseCode);
}
http.end();
} else {
Serial.println("WiFi disconnected");
}
delay(5000); // 每 5 秒访问一次服务器。
}
如果您不知道您的计算机 IP 地址是什么。您可以运行 ipconfig
(Windows)或 ifconfig | grep net
(Mac 或 Linux)来检查它。

- 使用 Type-C 线缆将您的计算机连接到 C6,并使用 Grove 线缆将 XIAO 扩展板的 I2C 端口 连接到 DHT20 传感器。

- 选择您的开发板。

- 上传代码并打开串口监视器。

- 打开正在运行 kafka 的 Windows PowerShell。现在您将看到环境数据正在发送到 Kafka。恭喜!您成功运行了这个项目!

资源
- [链接] Apache Kafka 介绍
技术支持与产品讨论
感谢您选择我们的产品!我们在这里为您提供不同的支持,以确保您使用我们产品的体验尽可能顺畅。我们提供多种沟通渠道,以满足不同的偏好和需求。