基于CentOS Stream 8的物联网数据采集与展示方案
2025-08-22 16:06:45,

系统架构全景图

图表  

 

一、系统平台优化(CentOS Stream 8)

1. 系统基础配置

bash
# 1. 系统更新与加固
sudo dnf update -y sudo dnf install epel-release -y sudo dnf install fail2ban firewalld -y # 2. 创建专用运维账户 sudo useradd -m -s /bin/bash iotadmin sudo passwd iotadmin sudo usermod -aG wheel iotadmin # 3. SSH安全加固 sudo sed -i 's/^PermitRootLogin yes/PermitRootLogin no/' /etc/ssh/sshd_config sudo sed -i 's/^PasswordAuthentication yes/PasswordAuthentication no/' /etc/ssh/sshd_config sudo systemctl restart sshd # 4. 防火墙配置 sudo systemctl enable --now firewalld sudo firewall-cmd --permanent --add-port=1883/tcp # MQTT sudo firewall-cmd --permanent --add-port=8883/tcp # MQTT/SSL sudo firewall-cmd --permanent --add-port=9092/tcp # Kafka sudo firewall-cmd --permanent --add-port=3000/tcp # Grafana sudo firewall-cmd --reload

2. 内核参数优化(/etc/sysctl.conf)

conf
# 网络性能优化
net.core.somaxconn = 65535
net.core.netdev_max_backlog = 65536
net.ipv4.tcp_max_syn_backlog = 65536

# 文件句柄限制
fs.file-max = 2097152
fs.nr_open = 2097152

# MQTT连接优化
net.ipv4.tcp_keepalive_time = 600
net.ipv4.tcp_keepalive_probes = 3
net.ipv4.tcp_keepalive_intvl = 15

二、MQTT Broker集群部署(EMQX企业版)

1. 集群化部署

bash
# 安装EMQX企业版
curl -s https://assets.emqx.com/scripts/install-emqx-rpm.sh | sudo bash sudo dnf install emqx-enterprise -y # 配置集群(3节点示例) # 节点1(10.0.0.1): echo "cluster.name = iot_platform" >> /etc/emqx/emqx.conf echo "node.name = emqx@10.0.0.1" >> /etc/emqx/emqx.conf # 节点2(10.0.0.2): emqx_ctl cluster join emqx@10.0.0.1

2. 安全加固配置

bash
# 1. 启用TLS加密
sudo mkdir /etc/emqx/certs sudo certbot certonly --standalone -d mqtt.example.com sudo cp /etc/letsencrypt/live/mqtt.example.com/* /etc/emqx/certs/ # 2. 配置EMQX(/etc/emqx/emqx.conf) listeners.ssl.default { bind = "0.0.0.0:8883" max_connections = 100000 ssl_options { keyfile = "/etc/emqx/certs/privkey.pem" certfile = "/etc/emqx/certs/fullchain.pem" } } # 3. 设备级认证 emqx_ctl users add device_001 $6$rounds=10000$somesalt$hashed_password

3. 主题权限控制

conf
# /etc/emqx/acl.conf
{allow, {user, "device_001"}, publish, ["sensors/001/#"]}
{allow, {user, "backend"}, subscribe, ["sensors/#"]}
{deny, all}

三、数据处理与存储架构

1. 消息队列缓冲(Kafka)

bash
# 安装Kafka
wget https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz tar -xzf kafka_2.13-3.4.0.tgz # 配置集群(3节点) # server.properties broker.id=1 listeners=PLAINTEXT://:9092 advertised.listeners=PLAINTEXT://node1:9092 zookeeper.connect=node1:2181,node2:2181,node3:2181

2. 时序数据库(TimescaleDB)

bash
# 安装PostgreSQL 15 + TimescaleDB
sudo dnf install https://download.postgresql.org/pub/repos/yum/reporpms/EL-8-x86_64/pgdg-redhat-repo-latest.noarch.rpm sudo dnf module disable postgresql sudo dnf install postgresql15-server postgresql15-contrib timescaledb-2-postgresql-15 # 初始化数据库 sudo /usr/pgsql-15/bin/postgresql-15-setup initdb sudo systemctl enable --now postgresql-15 # 创建超级表 CREATE TABLE sensor_data ( time TIMESTAMPTZ NOT NULL, device_id TEXT NOT NULL, value DOUBLE PRECISION NOT NULL ); SELECT create_hypertable('sensor_data', 'time');

3. 数据清洗服务(Python示例)

python
from kafka import KafkaConsumer import psycopg2 # Kafka消费者 consumer = KafkaConsumer( 'raw_sensor_data'Bootstrap_servers=['kafka1:9092''kafka2:9092'], security_protocol='SSL', ssl_cafile='ca.pem' ) # TimescaleDB连接 conn = psycopg2.connect("dbname=tsdb user=tsdbadmin") cursor = conn.cursor() for message in consumer: data = json.loads(message.value) # 数据验证 if not validate_sensor_data(data): continue # 数据清洗 cleaned = clean_data(data) # 写入数据库 cursor.execute( "INSERT INTO sensor_data (time, device_id, value) VALUES (%s, %s, %s)"(cleaned['timestamp'], cleaned['device_id'], cleaned['value']) ) conn.commit() # 更新缓存 Redis.set(f"latest:{cleaned['device_id']}", json.dumps(cleaned))

四、安全加固体系

1. 传输层安全

协议 端口 加密方式 证书管理
MQTT 8883 TLS 1.3 Let's Encrypt自动更新
HTTPS 443 TLS 1.3 企业级证书
Database 5432 TLS双向认证 自签名CA

2. 数据加密策略

python
# 设备端数据加密示例
from cryptography.hazmat.primitives.ciphers import Cipher, alGorithms, modes from cryptography.hazmat.backends import default_backend def encrypt_data(data, key): iv = os.urandom(12) cipher = Cipher( algorithms.AES(key), modes.GCM(iv), backend=default_backend() ) encryptor = cipher.encryptor() ciphertext = encryptor.update(data) + encryptor.finalize() return iv + encryptor.tag + ciphertext

3. 访问控制矩阵

角色 MQTT权限 DB访问 API权限
设备 发布特定主题
数据服务 订阅所有主题 只写 内部网络访问
前端应用 只读 JWT认证+RBAC
管理员 管理主题 读写 管理员权限

五、运维监控体系

1. 监控组件部署

bash
# Prometheus安装
sudo dnf install prometheus # Node Exporter sudo dnf install node_exporter # Grafana sudo dnf install grafana

2. 关键监控指标

yaml
# 网络性能优化
net.core.somaxconn = 65535
net.core.netdev_max_backlog = 65536
net.ipv4.tcp_max_syn_backlog = 65536

# 文件句柄限制
fs.file-max = 2097152
fs.nr_open = 2097152

# MQTT连接优化
net.ipv4.tcp_keepalive_time = 600
net.ipv4.tcp_keepalive_probes = 3
net.ipv4.tcp_keepalive_intvl = 150

3. 告警规则示例

yaml
# 网络性能优化
net.core.somaxconn = 65535
net.core.netdev_max_backlog = 65536
net.ipv4.tcp_max_syn_backlog = 65536

# 文件句柄限制
fs.file-max = 2097152
fs.nr_open = 2097152

# MQTT连接优化
net.ipv4.tcp_keepalive_time = 600
net.ipv4.tcp_keepalive_probes = 3
net.ipv4.tcp_keepalive_intvl = 151

六、设备接入与扩展方案

1. 设备接入流程

图表

 

2. 设备管理API设计

python
# 网络性能优化
net.core.somaxconn = 65535
net.core.netdev_max_backlog = 65536
net.ipv4.tcp_max_syn_backlog = 65536

# 文件句柄限制
fs.file-max = 2097152
fs.nr_open = 2097152

# MQTT连接优化
net.ipv4.tcp_keepalive_time = 600
net.ipv4.tcp_keepalive_probes = 3
net.ipv4.tcp_keepalive_intvl = 152

3. 多协议支持方案

协议 转换方式 适用场景
HTTP EMQX Webhook 传统设备改造
CoAP CoAP-MQTT代理网关 低功耗设备
Modbus 边缘计算转换 工业设备
LoRaWAN 网络服务器集成 长距离物联网

七、日常运维手册

1. 备份策略

bash
# 网络性能优化
net.core.somaxconn = 65535
net.core.netdev_max_backlog = 65536
net.ipv4.tcp_max_syn_backlog = 65536

# 文件句柄限制
fs.file-max = 2097152
fs.nr_open = 2097152

# MQTT连接优化
net.ipv4.tcp_keepalive_time = 600
net.ipv4.tcp_keepalive_probes = 3
net.ipv4.tcp_keepalive_intvl = 153

2. 灾难恢复流程

  1. 恢复最新数据库备份

  2. 重建EMQX集群

  3. 恢复Kafka偏移量

  4. 验证数据完整性

  5. 逐步恢复设备连接

3. 性能调优命令

bash
# 网络性能优化
net.core.somaxconn = 65535
net.core.netdev_max_backlog = 65536
net.ipv4.tcp_max_syn_backlog = 65536

# 文件句柄限制
fs.file-max = 2097152
fs.nr_open = 2097152

# MQTT连接优化
net.ipv4.tcp_keepalive_time = 600
net.ipv4.tcp_keepalive_probes = 3
net.ipv4.tcp_keepalive_intvl = 154

八、扩展架构设计

1. 边缘计算集成

图表

 

2. 数据管道扩展

python
# 网络性能优化
net.core.somaxconn = 65535
net.core.netdev_max_backlog = 65536
net.ipv4.tcp_max_syn_backlog = 65536

# 文件句柄限制
fs.file-max = 2097152
fs.nr_open = 2097152

# MQTT连接优化
net.ipv4.tcp_keepalive_time = 600
net.ipv4.tcp_keepalive_probes = 3
net.ipv4.tcp_keepalive_intvl = 155

3. 多区域部署

bash
# 网络性能优化
net.core.somaxconn = 65535
net.core.netdev_max_backlog = 65536
net.ipv4.tcp_max_syn_backlog = 65536

# 文件句柄限制
fs.file-max = 2097152
fs.nr_open = 2097152

# MQTT连接优化
net.ipv4.tcp_keepalive_time = 600
net.ipv4.tcp_keepalive_probes = 3
net.ipv4.tcp_keepalive_intvl = 156

九、前端展示系统

1. 实时数据大屏

JavaScript
# 网络性能优化
net.core.somaxconn = 65535
net.core.netdev_max_backlog = 65536
net.ipv4.tcp_max_syn_backlog = 65536

# 文件句柄限制
fs.file-max = 2097152
fs.nr_open = 2097152

# MQTT连接优化
net.ipv4.tcp_keepalive_time = 600
net.ipv4.tcp_keepalive_probes = 3
net.ipv4.tcp_keepalive_intvl = 157

2. 设备管理界面功能

  1. 设备状态监控(在线/离线)

  2. 实时数据曲线(Chart.js)

  3. 历史数据查询(时间范围选择)

  4. 告警管理(阈值设置)

  5. 固件OTA升级

3. 移动APP集成

Kotlin
# 网络性能优化
net.core.somaxconn = 65535
net.core.netdev_max_backlog = 65536
net.ipv4.tcp_max_syn_backlog = 65536

# 文件句柄限制
fs.file-max = 2097152
fs.nr_open = 2097152

# MQTT连接优化
net.ipv4.tcp_keepalive_time = 600
net.ipv4.tcp_keepalive_probes = 3
net.ipv4.tcp_keepalive_intvl = 158

十、持续演进路线

  1. 阶段1(基础平台)

    • EMQX集群部署

    • 核心数据处理流水线

    • 基础监控

  2. 阶段2(安全加固)

    • 设备证书管理

    • 数据端到端加密

    • 审计日志

  3. 阶段3(智能扩展)

    • 边缘计算节点

    • AI异常检测

    • 预测性维护

  4. 阶段4(全球化部署)

    • 多区域集群

    • 数据主权合规

    • 跨云架构

本方案基于CentOS Stream 8构建了企业级物联网平台,通过多层次安全加固、全链路监控、弹性扩展架构,支持从数百到数百万设备的平滑扩展,日均处理能力可达亿级数据点,满足工业4.0场景需求。