Raspberry Pi 上的 4G LTE HAT - TCP/IP 网络详解
简介
TCP(传输控制协议)对于使用 Raspberry Pi 4G HAT 的应用至关重要,因为它确保了通过互联网进行可靠、有序且经过错误检查的数据传输。这对于实时应用(如物联网设备)尤为重要,因为这些应用需要与云服务器或远程系统保持一致的通信。TCP 的可靠性使其非常适合发送关键数据(例如传感器读数、遥测数据)和接收命令,确保数据不会丢失或损坏。它支持多种应用,例如远程监控、消息系统和文件传输,Raspberry Pi 可以作为网关或客户端使用。
前置条件
硬件需求
软件需求
通信驱动和工具
如果尚未安装相关驱动和通信工具,请先查看 指南。
其他需求
您需要一个预先配置的 TCP 服务器,具体信息如下:
- 主机:例如,您自己的服务器,或者用于测试的公共服务器 tcpbin.com。
- 端口号:一个专用端口号,在本例中,我们将使用
tcpbin.com
的端口4242
。
tcpbin.com 是一个公共 TCP 服务器,用于测试和调试 TCP/IP 通信。它会回显发送给它的消息,非常适合验证连接性和数据传输。
向 tcpbin.com 发送 AT 指令
在 Raspberry Pi 上打开 Minicom 或在 Windows 上通过 USB 打开 Qcom Tool。
sudo minicom -D /dev/ttyUSB2
步骤 | 操作 | AT 指令 | 预期响应 |
---|---|---|---|
1 | 配置移动网络提供商的 APN。 | AT+QICSGP=1,1,"dialogbb","","",1 | OK |
2 | 激活 PDP 上下文。 | AT+QIACT=1 | OK |
3 | 验证 PDP 上下文是否已激活。 | AT+QIACT? | +QIACT: 1,1,1,"<Your_IP_Address>" OK |
4 | 打开与服务器的 TCP 连接。 | AT+QIOPEN=1,0,"TCP","tcpbin.com",4242,0,0 | +QIOPEN: 0,0 (表示连接成功) |
5 | 向服务器发送消息。 | AT+QISEND=0 | > (提示输入消息) 输入消息后:Hello TCPBin<Ctrl+Z> SEND OK |
6 | 读取服务器的响应。 | AT+QIRD=0,1500 | +QIRD: <length> Hello TCPBin OK |
7 | 关闭 TCP 连接。 | AT+QICLOSE=0 | OK |
自动化 Python 脚本
以下是一个自动化的 Python 脚本,用于在 Raspberry Pi 上与 Quectel 4G LTE 模块交互。该脚本使用 serial 库向模块发送 AT 指令。您可以自定义 APN、端口、波特率、TCP 端口和地址等变量。
Python 代码实现
第一步:准备目录和虚拟环境
- 在 Raspberry Pi 上打开终端。
- 创建一个新的项目文件夹并进入该文件夹:
mkdir TCP_EX
cd TCP_EX
- 设置 Python 虚拟环境:
python3 -m venv --system-site-packages env
- 激活虚拟环境:
source env/bin/activate
- 安装所需的库:
pip install pyserial
第二步:准备 Python 脚本
-
打开 Thonny Python IDE(Raspberry Pi 上预装)。
-
在 Thonny 中创建一个新文件,并将提供的代码粘贴到编辑器中。
-
更新 usb_port 参数以匹配 Raspberry Pi 上 4G HAT 的串口。通常是
/dev/ttyUSB2
或/dev/ttyUSB3
。示例:
usb_port = "/dev/ttyUSB2"
- 将文件保存为 test_mqtt.py,保存在 TCP_EX 文件夹中。
import serial
import time
# 配置变量
APN = "dialogbb" # 替换为您的网络 APN
PORT = "/dev/ttyUSB2" # 连接到 Quectel 模块的串口
BAUDRATE = 9600 # 通信波特率
TCP_ADDRESS = "tcpbin.com" # TCP 服务器地址
TCP_PORT = 4242 # TCP 服务器端口
MESSAGE = "Hello TCPBin" # 要发送的消息
def send_command(ser, command, wait_for="OK", timeout=5):
"""
向模块发送 AT 指令并等待响应。
"""
ser.write((command + "\r\n").encode())
time.sleep(0.5)
end_time = time.time() + timeout
response = b""
while time.time() < end_time:
if ser.in_waiting > 0:
response += ser.read(ser.in_waiting)
if wait_for.encode() in response:
break
print(f">> {command}")
print(response.decode().strip())
return response.decode().strip()
def main():
try:
# 打开串口连接
ser = serial.Serial(PORT, BAUDRATE, timeout=1)
time.sleep(2) # 允许模块初始化
# 1. 配置 APN
send_command(ser, f'AT+QICSGP=1,1,"{APN}","","",1')
# 2. 激活 PDP 上下文
send_command(ser, "AT+QIACT=1")
# 3. 检查 PDP 上下文状态
send_command(ser, "AT+QIACT?")
# 4. 打开 TCP 连接
send_command(ser, f'AT+QIOPEN=1,0,"TCP","{TCP_ADDRESS}",{TCP_PORT},0,0')
time.sleep(5) # 允许连接建立
# 5. 发送数据
send_command(ser, f"AT+QISEND=0")
ser.write((MESSAGE + "\x1A").encode()) # 发送消息并以 Ctrl+Z 结束
time.sleep(1) # 允许发送时间
print("消息已发送。")
# 6. 读取响应
response = send_command(ser, "AT+QIRD=0,1500")
print(f"服务器响应: {response}")
# 7. 关闭连接
send_command(ser, "AT+QICLOSE=0")
print("连接已关闭。")
# 关闭串口
ser.close()
except Exception as e:
print(f"错误: {e}")
if __name__ == "__main__":
main()
第三步:运行脚本
- 打开终端,确保您位于项目目录中:
cd TCP_EX
- 激活虚拟环境:
source env/bin/activate
- 使用 Python 运行脚本:
python test_tcp.py
- 输出结果
资源
- [PDF 书籍] TCP 应用指南
技术支持与产品讨论
感谢您选择我们的产品!我们将为您提供多种支持,确保您在使用我们的产品时拥有尽可能顺畅的体验。我们提供多种沟通渠道,以满足不同的偏好和需求。