Skip to main content

基于 Apache Kafka 的实时物联网数据处理节点

我们的前沿处理节点 Kafka-ESP32,结合了 Apache Kafka 和 ESP32C6 微控制器的强大功能,为处理物联网数据流提供了高效的解决方案。通过使用 XIAO ESP32C6 配合 DHT20 环境传感器,数据被收集并通过 ESP32C6 无缝发送到 Apache Kafka。Kafka 的高吞吐量、低延迟消息传递能力实现了实时数据处理和分析,而其分布式架构允许轻松扩展。Kafka-ESP32 使您能够开发自定义应用程序和集成,彻底改变您在当今数据驱动环境中管理和利用物联网资产的方式。

所需材料

本示例将介绍如何使用 XIAO ESP32C6 配合 Grove DHT20 温湿度传感器来完成 AWS IoT Core 的 SageMaker 任务。以下是完成此例程所需的所有硬件设备。

Docker 安装

为什么使用 Docker?因为 Docker 可以在单台机器上模拟多台计算机的环境,并且可以非常轻松地部署应用程序。因此,在这个项目中,我们将使用 Docker 来设置环境并提高效率。

步骤 1. 下载 Docker

根据您的计算机下载不同类型的安装程序。点击这里跳转。

tip

如果您的计算机是 Windows,请在完成步骤 2之前不要安装 docker。

步骤 2. 安装 WSL(Windows 子系统 for Linux)

tip

此步骤适用于 Windows。如果您的计算机是 Mac 或 Linux,可以跳过此步骤。

  1. 以管理员身份运行以下代码。
dism.exe /online /enable-feature /featurename:Microsoft-Windows-Subsystem-Linux /all /norestart
dism.exe /online /enable-feature /featurename:VirtualMachinePlatform /all /norestart
  1. 这里下载此工具并双击安装。

  2. 前往您的 Microsoft Store 搜索并下载您喜欢的 linux 版本,这里我安装了 Ubuntu。

  1. 安装 Linux 后,您需要打开它并设置您的用户名和密码,然后需要等待一分钟进行初始化。
  1. 运行以下指令来使用 WSL
  1. 安装 WSL 后,现在您可以双击您的 docker 安装程序来安装它。当您看到以下图像时,表示它正在工作。

部署服务

在我们开始之前,我想介绍一下这个项目中每个服务的功能。

这是此项目的目录结构供您参考。我将在以下步骤中逐一创建这些文件。每个文件的位置都非常重要。我强烈建议您参考此目录结构。创建一个 kafka_xiao_project 目录并包含这些文件。

步骤 3. 部署 Python 服务器

由于 MCU 设备性能不足,无法直接用作 kafka 的客户端。因此您需要构建一个服务器来进行数据传输。此步骤是使用 python 构建一个简单的服务器。XIAO ESP32C6 主要用于从 DHT20 收集环境数据并将其发送到服务器。

  1. 首先我们需要创建 app.py 文件,这是服务器要做的事情。
from flask import Flask
from kafka import KafkaProducer, KafkaConsumer

app = Flask(__name__)

@app.route('/favicon.ico')
def favicon():
return '', 204

@app.route('/<temperature>/<humidity>')
def send_data(temperature, humidity):
producer = KafkaProducer(bootstrap_servers='kafka:9092')
data = f'Temperature: {temperature}, Humidity: {humidity}'
producer.send('my_topic', data.encode('utf-8'))
return f'Temperature: {temperature}, Humidity: {humidity}'

if __name__ == '__main__':
app.run(host='0.0.0.0', port=5001)
  1. 创建 requirements.txt,这是依赖库。
flask
kafka-python
  1. 创建 Dockerfile
FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["python", "app.py"]
  1. 创建这 3 个文件后,我们可以通过运行以下代码来构建 docker 镜像。
docker build -t pyserver .

步骤 4. 部署 Jupyter Notebook

Jupyter Notebook 主要用于调试,它是一个非常好用的工具。同时我们可以使用 python 来操作 Kafka。

  1. 首先创建 Dockerfile
FROM python:3.9

RUN pip install jupyter

WORKDIR /notebook

EXPOSE 8888

CMD ["jupyter", "notebook", "--ip=0.0.0.0", "--port=8888", "--no-browser", "--allow-root"]
  1. 构建 jupyter docker 镜像。
docker build -t jupyter .

步骤 5. 启动 Docker 集群

我们可以使用 docker-compose.yml 来构建 docker 集群。docker-compose 中的每个服务代表一台独立的计算机,我们使用 kafka-net 将它们相互连接。

  1. 所以首先我们需要创建 docker-compose.yml
services:
zookeeper:
container_name: zookeeper
hostname: zookeeper
image: docker.io/bitnami/zookeeper
ports:
- "2181:2181"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
networks:
- kafka-net

kafka:
container_name: kafka
hostname: kafka
image: docker.io/bitnami/kafka
ports:
- "9092:9092"
- "9093:9093"
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CFG_BROKER_ID=0
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_LISTENERS=INTERNAL://kafka:9092,EXTERNAL://localhost:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka:9092,EXTERNAL://localhost:9093
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
depends_on:
- zookeeper
networks:
- kafka-net

jupyter:
image: jupyter:latest
depends_on:
- kafka
volumes:
- ./myjupyter:/notebook
ports:
- "8888:8888"
environment:
- JUPYTER_ENABLE_LAB=yes
networks:
- kafka-net

pyserver:
image: pyserver:latest
depends_on:
- kafka
volumes:
- ./myserver/app.py:/app/app.py
ports:
- "5001:5001"
networks:
- kafka-net

networks:
kafka-net:
driver: bridge
  1. 然后我们通过运行以下命令来启动这个 docker 集群。
docker-compose up -d
tip

端口可能被占用,你可以将端口从 5001 更改为 5002 等,或者关闭占用该端口的应用程序。

  1. 在接下来的几个操作中,我们将测试它是否工作正常。首先我们打开软件 docker desktop 并点击进入 pyserver
  1. 现在服务器运行在 http://127.0.0.1:5001 。点击此链接打开它。
  1. 然后按照这种格式输入两个参数来测试 docker 集群是否工作正常。
  1. 我们进入 Kafka 内部查看数据是否已发送到 Kafka。
docker exec -it kafka bash

cd opt/bitnami/kafka/bin/

kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic my_topic --from-beginning
  1. 我们可以用不同的参数再试一次,你可以看到数据立即发送到了 Kafka。现在,恭喜!你的 docker 集群工作完美。

步骤 7. 通过 Python 测试 Kafka

tip

这一步主要是关于如何使用 Python 操作 Kafka。你也可以跳过这一步。对整体项目操作没有影响。

  1. 打开 docker desktop 并点击进入 jupyter。
  1. 点击此链接访问 jupyter。
  1. 当你成功访问 jupyter 时,你将看到这个页面。
  1. 点击鼠标右键并创建 New Notebook,使用 Python3(ipykernel)。
  1. 通过运行 pip install kafka-python 安装 kafka-python 库。
  1. 安装该库后,我们需要重启 jupyter。
  1. 现在运行以下代码通过 Python 向 Kafka 发送一些数据。
from kafka import KafkaProducer, KafkaConsumer

#initialize producer
producer = KafkaProducer(bootstrap_servers='localhost:9093')
#send message
producer.send('my_topic', b'Hello, Kafka2')
  1. 你也可以在 kafka 中检查这些数据。
from kafka import KafkaConsumer

# initialize consumer
consumer = KafkaConsumer(
'my_topic',
bootstrap_servers='localhost:9093',
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='group1'
)

# receive data and print
for message in consumer:
print(f"Received message: {message.value.decode('utf-8')}")

XIAO ESP32C6 和 Apache Kafka

Kafka 是一个分布式流处理平台,能够大规模实时处理数据流。它允许系统之间进行数据的发布-订阅消息传递,提供容错性、持久性和高吞吐量。Kafka 广泛用于构建各个领域的实时数据管道和流处理应用程序。

现在,我们将使用 XIAO ESP32C6 和 DHT20 温湿度传感器来收集数据并实时发送到 Kafka。

步骤 8. 收集数据并发送到 Apache Kafka

  1. 将以下代码复制到您的 Arduino IDE 中。
#include <WiFi.h>
#include <HTTPClient.h>

//在此处更改为您的 wifi 名称和密码。
const char* ssid = "Maker_2.4G";
const char* password = "15935700";

//在此处更改为您的计算机 IP 地址和服务器端口。
const char* serverUrl = "http://192.168.1.175:5001";

void setup() {
Serial.begin(115200);

WiFi.begin(ssid, password);

while (WiFi.status() != WL_CONNECTED) {
delay(1000);
Serial.println("Connecting to WiFi...");
}

Serial.println("Connected to WiFi");
}

void loop() {
if (WiFi.status() == WL_CONNECTED) {
HTTPClient http;

//创建访问链接
String url = serverUrl;
url += "/";
url += "30.532"; // 温度
url += "/";
url += "60.342"; // 湿度

http.begin(url);

int httpResponseCode = http.GET();

//获取 http 响应并打印
if (httpResponseCode == 200) {
String response = http.getString();
Serial.println("Server response:");
Serial.println(response);
} else {
Serial.print("HTTP error code: ");
Serial.println(httpResponseCode);
}

http.end();
} else {
Serial.println("WiFi disconnected");
}

delay(5000); // 每 5 秒访问一次服务器。
}

如果您不知道您的计算机 IP 地址是什么。您可以运行 ipconfig(Windows)或 ifconfig | grep net(Mac 或 Linux)来检查它。

  1. 使用 Type-C 线缆将您的计算机连接到 C6,并使用 Grove 线缆将 XIAO 扩展板的 I2C 端口 连接到 DHT20 传感器。
  1. 选择您的开发板。
  1. 上传代码并打开串口监视器。
  1. 打开正在运行 kafka 的 Windows PowerShell。现在您将看到环境数据正在发送到 Kafka。恭喜!您成功运行了这个项目!

资源

技术支持与产品讨论

感谢您选择我们的产品!我们在这里为您提供不同的支持,以确保您使用我们产品的体验尽可能顺畅。我们提供多种沟通渠道,以满足不同的偏好和需求。

Loading Comments...