Skip to main content

Raspberry Pi 上的 4G LTE HAT - TCP/IP 网络详解

简介

TCP(传输控制协议)对于使用 Raspberry Pi 4G HAT 的应用至关重要,因为它确保了通过互联网进行可靠、有序且经过错误检查的数据传输。这对于实时应用(如物联网设备)尤为重要,因为这些应用需要与云服务器或远程系统保持一致的通信。TCP 的可靠性使其非常适合发送关键数据(例如传感器读数、遥测数据)和接收命令,确保数据不会丢失或损坏。它支持多种应用,例如远程监控、消息系统和文件传输,Raspberry Pi 可以作为网关或客户端使用。

前置条件

硬件需求

Raspberry Pi 5Raspberry Pi 4G LTE CAT4 HAT

软件需求

通信驱动和工具

如果尚未安装相关驱动和通信工具,请先查看 指南

其他需求

您需要一个预先配置的 TCP 服务器,具体信息如下:

  • 主机:例如,您自己的服务器,或者用于测试的公共服务器 tcpbin.com。
  • 端口号:一个专用端口号,在本例中,我们将使用 tcpbin.com 的端口 4242

tcpbin.com 是一个公共 TCP 服务器,用于测试和调试 TCP/IP 通信。它会回显发送给它的消息,非常适合验证连接性和数据传输。

向 tcpbin.com 发送 AT 指令

在 Raspberry Pi 上打开 Minicom 或在 Windows 上通过 USB 打开 Qcom Tool。

sudo minicom -D /dev/ttyUSB2
步骤操作AT 指令预期响应
1配置移动网络提供商的 APN。AT+QICSGP=1,1,"dialogbb","","",1OK
2激活 PDP 上下文。AT+QIACT=1OK
3验证 PDP 上下文是否已激活。AT+QIACT?+QIACT: 1,1,1,"<Your_IP_Address>" OK
4打开与服务器的 TCP 连接。AT+QIOPEN=1,0,"TCP","tcpbin.com",4242,0,0+QIOPEN: 0,0 (表示连接成功)
5向服务器发送消息。AT+QISEND=0> (提示输入消息) 输入消息后:Hello TCPBin<Ctrl+Z> SEND OK
6读取服务器的响应。AT+QIRD=0,1500+QIRD: <length> Hello TCPBin OK
7关闭 TCP 连接。AT+QICLOSE=0OK

自动化 Python 脚本

以下是一个自动化的 Python 脚本,用于在 Raspberry Pi 上与 Quectel 4G LTE 模块交互。该脚本使用 serial 库向模块发送 AT 指令。您可以自定义 APN、端口、波特率、TCP 端口和地址等变量。

Python 代码实现

第一步:准备目录和虚拟环境

  • 在 Raspberry Pi 上打开终端。
  • 创建一个新的项目文件夹并进入该文件夹:
mkdir TCP_EX
cd TCP_EX
  • 设置 Python 虚拟环境:
python3 -m venv --system-site-packages env
  • 激活虚拟环境:
source env/bin/activate
  • 安装所需的库:
pip install pyserial 

第二步:准备 Python 脚本

  • 打开 Thonny Python IDE(Raspberry Pi 上预装)。

  • 在 Thonny 中创建一个新文件,并将提供的代码粘贴到编辑器中。

  • 更新 usb_port 参数以匹配 Raspberry Pi 上 4G HAT 的串口。通常是 /dev/ttyUSB2/dev/ttyUSB3。示例:

usb_port = "/dev/ttyUSB2"
  • 将文件保存为 test_mqtt.py,保存在 TCP_EX 文件夹中。
import serial
import time

# 配置变量
APN = "dialogbb" # 替换为您的网络 APN
PORT = "/dev/ttyUSB2" # 连接到 Quectel 模块的串口
BAUDRATE = 9600 # 通信波特率
TCP_ADDRESS = "tcpbin.com" # TCP 服务器地址
TCP_PORT = 4242 # TCP 服务器端口
MESSAGE = "Hello TCPBin" # 要发送的消息


def send_command(ser, command, wait_for="OK", timeout=5):
"""
向模块发送 AT 指令并等待响应。
"""
ser.write((command + "\r\n").encode())
time.sleep(0.5)
end_time = time.time() + timeout
response = b""
while time.time() < end_time:
if ser.in_waiting > 0:
response += ser.read(ser.in_waiting)
if wait_for.encode() in response:
break
print(f">> {command}")
print(response.decode().strip())
return response.decode().strip()


def main():
try:
# 打开串口连接
ser = serial.Serial(PORT, BAUDRATE, timeout=1)
time.sleep(2) # 允许模块初始化

# 1. 配置 APN
send_command(ser, f'AT+QICSGP=1,1,"{APN}","","",1')

# 2. 激活 PDP 上下文
send_command(ser, "AT+QIACT=1")

# 3. 检查 PDP 上下文状态
send_command(ser, "AT+QIACT?")

# 4. 打开 TCP 连接
send_command(ser, f'AT+QIOPEN=1,0,"TCP","{TCP_ADDRESS}",{TCP_PORT},0,0')
time.sleep(5) # 允许连接建立

# 5. 发送数据
send_command(ser, f"AT+QISEND=0")
ser.write((MESSAGE + "\x1A").encode()) # 发送消息并以 Ctrl+Z 结束
time.sleep(1) # 允许发送时间
print("消息已发送。")

# 6. 读取响应
response = send_command(ser, "AT+QIRD=0,1500")
print(f"服务器响应: {response}")

# 7. 关闭连接
send_command(ser, "AT+QICLOSE=0")
print("连接已关闭。")

# 关闭串口
ser.close()

except Exception as e:
print(f"错误: {e}")


if __name__ == "__main__":
main()


第三步:运行脚本

  • 打开终端,确保您位于项目目录中:
cd TCP_EX
  • 激活虚拟环境:
source env/bin/activate
  • 使用 Python 运行脚本:
python test_tcp.py
  • 输出结果

资源

技术支持与产品讨论

感谢您选择我们的产品!我们将为您提供多种支持,确保您在使用我们的产品时拥有尽可能顺畅的体验。我们提供多种沟通渠道,以满足不同的偏好和需求。

Loading Comments...