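IoT TCP in Practice

1. Overview

1.1 Why TCP Matters for IoT Communication

The Internet of Things (IoT) bridges the physical and digital worlds, and TCP, as a reliable transport-layer protocol, plays an important role in IoT communication.

What this article covers

  • TCP fundamentals: how the protocol works and what characterizes it
  • Device connections: how devices establish TCP connections
  • Data transfer: how data is sent and received
  • Heartbeats: designing heartbeats that keep a connection alive
  • Reconnection: reconnection strategies and their implementation
  • Case study: a complete IoT TCP communication example

1.2 Structure of This Article

This article works through IoT TCP in practice in the following order:

  1. TCP fundamentals: how the protocol works and what characterizes it
  2. Connection management: establishing and maintaining TCP connections
  3. Data transfer: sending and receiving data
  4. Heartbeats: keep-alive design
  5. Reconnection: implementing a reconnection strategy
  6. Message encoding and decoding: message format design
  7. Case study: a complete IoT TCP communication system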

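2. TCP Fundamentals

2.1 Characteristics of TCP

2.1.1 TCP Overview

TCP (Transmission Control Protocol) is a connection-oriented, reliable, byte-stream-based transport-layer protocol.

Characteristics of TCP

  • Connection-oriented: a connection must be established before data is exchanged
  • Reliable delivery: data arrives in order and intact
  • Full duplex: data flows in both directions at the same time
  • Flow control: keeps the sender from overwhelming the receiver
  • Congestion control: lowers the sending rate when the network is congested

TCP vs UDP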

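// TCP: connection-oriented, reliable
// Pros: reliable, ordered delivery, flow control
// Cons: higher overhead, relatively slower

// UDP: connectionless, unreliable
// Pros: fast, low overhead
// Cons: no delivery guarantee, no ordering, no flow control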

2.2 The TCP Three-Way Handshake

2.2.1 Connection Establishment

TCP three-way handshake

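// Three-way handshake
// 1. The client sends SYN
// 2. The server replies with SYN-ACK
// 3. The client sends ACK
//
// Conceptual illustration only: the operating system's TCP stack performs
// these steps when a Socket connects and before ServerSocket.accept() returns.
// Application code never sends SYN or ACK segments itself.

public class TCPHandshake {

    // Client side
    public void connect() {
        // 1. Send SYN
        sendSYN();

        // 2. Receive SYN-ACK
        receiveSYNACK();

        // 3. Send ACK
        sendACK();

        // Connection established
    }

    // Server side
    public void accept() {
        // 1. Receive SYN
        receiveSYN();

        // 2. Send SYN-ACK
        sendSYNACK();

        // 3. Receive ACK
        receiveACK();

        // Connection established
    }

    // Placeholder steps so the illustration compiles; each one only logs.
    private void sendSYN() { System.out.println("client -> server: SYN"); }
    private void receiveSYNACK() { System.out.println("client <- server: SYN-ACK"); }
    private void sendACK() { System.out.println("client -> server: ACK"); }
    private void receiveSYN() { System.out.println("server <- client: SYN"); }
    private void sendSYNACK() { System.out.println("server -> client: SYN-ACK"); }
    private void receiveACK() { System.out.println("server <- client: ACK"); }
}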
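
In Java the handshake is carried out by the operating system rather than by application code. The sketch below uses a hypothetical HandshakeDemo class and a loopback listener on an ephemeral port to suggest that a client's connect call can complete before the server ever calls accept(), because the kernel finishes the handshake and parks the connection in the listen backlog. The 2-second connect timeout and the backlog of 50 are arbitrary values chosen for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical demo class; not part of the article's code.
final class HandshakeDemo {

    private HandshakeDemo() {}

    /** Returns true if a loopback client is connected although accept() was never called. */
    static boolean connectsBeforeAccept() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Port 0 asks the OS for any free port; 50 is the listen backlog.
        try (ServerSocket server = new ServerSocket(0, 50, loopback);
             Socket client = new Socket()) {
            // connect() returns once the kernel has completed SYN / SYN-ACK / ACK.
            client.connect(new InetSocketAddress(loopback, server.getLocalPort()), 2000);
            return client.isConnected();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Passing an explicit timeout to connect is usually worth keeping in device code, since the one-argument Socket constructors can block for a long time when the server is unreachable.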

3. Connection Management

3.1 Server Implementation

3.1.1 TCP Server

TCP server implementation

import java.io.*;
import java.net.*;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.*;

public class IoTTCPServer {

    private final ServerSocket serverSocket;
    private final ExecutorService executorService;
    private final Map<String, Socket> deviceConnections;

    public IoTTCPServer(int port) throws IOException {
        this.serverSocket = new ServerSocket(port);
        this.executorService = Executors.newCachedThreadPool();
        this.deviceConnections = new ConcurrentHashMap<>();
    }

    public void start() {
        System.out.println("IoT TCP Server started on port: " + serverSocket.getLocalPort());

        while (true) {
            try {
                // Accept a client connection
                Socket clientSocket = serverSocket.accept();
                String deviceId = getDeviceId(clientSocket);

                System.out.println("Device connected: " + deviceId);

                // Remember the device connection
                deviceConnections.put(deviceId, clientSocket);

                // Handle each device on its own pooled thread
                executorService.submit(new DeviceHandler(clientSocket, deviceId));

            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private String getDeviceId(Socket socket) {
        // Derive the device ID from the connection. This version uses the
        // remote address; the "DEVICE_ID:..." line sent by the client
        // arrives later as an ordinary message.
        return socket.getRemoteSocketAddress().toString();
    }

    // Per-device handler
    private class DeviceHandler implements Runnable {
        private final Socket socket;
        private final String deviceId;
        private BufferedReader reader;
        private PrintWriter writer;

        public DeviceHandler(Socket socket, String deviceId) {
            this.socket = socket;
            this.deviceId = deviceId;
        }

        @Override
        public void run() {
            try {
                reader = new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)
                );
                writer = new PrintWriter(
                    new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8),
                    true
                );

                // One message per line; readLine() returns null when the peer closes
                String message;
                while ((message = reader.readLine()) != null) {
                    // Handle the device message
                    handleDeviceMessage(deviceId, message);
                }

            } catch (IOException e) {
                System.out.println("Device disconnected: " + deviceId);
            } finally {
                cleanup();
            }
        }

        private void handleDeviceMessage(String deviceId, String message) {
            System.out.println("Received from " + deviceId + ": " + message);
            // Business logic goes here
        }

        private void cleanup() {
            // Remove the mapping first so it is cleared even if close() fails
            deviceConnections.remove(deviceId, socket);
            try {
                if (reader != null) reader.close();
                if (writer != null) writer.close();
                if (socket != null) socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public void sendToDevice(String deviceId, String message) {
        Socket socket = deviceConnections.get(deviceId);
        if (socket != null && !socket.isClosed()) {
            // Serialize writers so lines from different threads do not interleave
            synchronized (socket) {
                try {
                    PrintWriter writer = new PrintWriter(
                        new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8),
                        true
                    );
                    writer.println(message);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
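
The server above keys connections by remote address, while the client in section 3.2 sends a DEVICE_ID:<id> line as its first message. One possible way to reconcile the two is to parse that first line and use the ID it carries. The sketch below is a minimal illustration; the DeviceRegistration class and parseDeviceId method are hypothetical names, and treating a blank ID as invalid is an assumption.

```java
import java.util.Optional;

// Hypothetical helper; not part of the article's server.
final class DeviceRegistration {

    private static final String PREFIX = "DEVICE_ID:";

    private DeviceRegistration() {}

    /** Returns the device ID carried by a registration line, or empty if the line is not one. */
    static Optional<String> parseDeviceId(String firstLine) {
        if (firstLine == null || !firstLine.startsWith(PREFIX)) {
            return Optional.empty();
        }
        String id = firstLine.substring(PREFIX.length()).trim();
        // Assumption: a registration line with a blank ID is rejected.
        return id.isEmpty() ? Optional.<String>empty() : Optional.of(id);
    }
}
```

A handler could call this on the first line it reads and close the socket when the result is empty, so that the connection map is keyed by a stable device ID instead of an address that changes on every reconnect.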

3.2 Client Implementation

3.2.1 TCP Client

TCP client implementation

import java.io.*;
import java.net.*;
import java.nio.charset.StandardCharsets;

public class IoTTCPClient {

    private Socket socket;
    private BufferedReader reader;
    private PrintWriter writer;
    private final String serverHost;
    private final int serverPort;
    private final String deviceId;
    private volatile boolean connected = false;

    public IoTTCPClient(String serverHost, int serverPort, String deviceId) {
        this.serverHost = serverHost;
        this.serverPort = serverPort;
        this.deviceId = deviceId;
    }

    public boolean connect() {
        try {
            socket = new Socket(serverHost, serverPort);
            socket.setKeepAlive(true);
            // 30-second read timeout: a read that stays idle longer throws
            // SocketTimeoutException, which receiveMessages() treats as a lost connection
            socket.setSoTimeout(30000);

            reader = new BufferedReader(
                new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)
            );
            writer = new PrintWriter(
                new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8),
                true
            );

            // Mark the connection as up before sending: sendMessage() is a
            // no-op while connected is false
            connected = true;

            // Send the device ID
            sendMessage("DEVICE_ID:" + deviceId);

            System.out.println("Connected to server: " + serverHost + ":" + serverPort);

            // Start the receiver thread
            new Thread(this::receiveMessages).start();

            return true;

        } catch (IOException e) {
            System.out.println("Failed to connect: " + e.getMessage());
            connected = false;
            return false;
        }
    }

    public void sendMessage(String message) {
        if (connected && writer != null) {
            writer.println(message);
        }
    }

    private void receiveMessages() {
        try {
            String message;
            while (connected && (message = reader.readLine()) != null) {
                handleServerMessage(message);
            }
        } catch (IOException e) {
            System.out.println("Connection lost: " + e.getMessage());
            connected = false;
        }
    }

    private void handleServerMessage(String message) {
        System.out.println("Received from server: " + message);
        // Handle the server message
    }

    public void disconnect() {
        connected = false;
        try {
            if (reader != null) reader.close();
            if (writer != null) writer.close();
            if (socket != null) socket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public boolean isConnected() {
        return connected && socket != null && !socket.isClosed();
    }
}
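
One detail of the client above is easy to miss: PrintWriter never throws IOException, so sendMessage cannot tell whether a line actually reached the socket. PrintWriter does expose checkError(), which flushes and reports whether any earlier write failed. The sketch below shows one way to surface that; CheckedLineWriter and BrokenOutputStream are hypothetical names, and the latter merely simulates a socket stream whose peer has gone away.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;

// Hypothetical wrapper; not part of the article's client.
final class CheckedLineWriter {

    private final PrintWriter writer;

    CheckedLineWriter(OutputStream out) {
        this.writer = new PrintWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8), true);
    }

    /** Writes one line and returns false once the underlying stream has reported an error. */
    boolean sendLine(String line) {
        writer.println(line);
        // checkError() flushes and returns true if any write so far has failed.
        return !writer.checkError();
    }
}

/** Stand-in for a socket output stream with a dead peer (illustration only). */
final class BrokenOutputStream extends OutputStream {
    @Override
    public void write(int b) throws IOException {
        throw new IOException("simulated broken pipe");
    }
}
```

A client built this way could treat a false return as a signal to mark the connection as lost and hand over to its reconnection logic. Note that the error flag is sticky: once set, it stays set for that writer.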

4. Data Transfer

4.1 Message Format Design

4.1.1 Message Protocol

Message format design

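// Message format: type|length|body
// Example: DATA|100|{...}
// length is the body size in UTF-8 bytes. Messages travel one per line,
// so the body must not contain line breaks.

import java.nio.charset.StandardCharsets;

public class IoTMessage {

    public enum MessageType {
        HEARTBEAT, // heartbeat
        DATA,      // data
        COMMAND,   // command
        RESPONSE,  // response
        ERROR      // error
    }

    private final MessageType type;
    private final int length;
    private final String body;
    private final long timestamp;

    public IoTMessage(MessageType type, String body) {
        this.type = type;
        this.body = body;
        // Use an explicit charset; the platform default differs between machines
        this.length = body.getBytes(StandardCharsets.UTF_8).length;
        this.timestamp = System.currentTimeMillis();
    }

    // Encode
    public String encode() {
        return type.name() + "|" + length + "|" + body;
    }

    // Decode
    public static IoTMessage decode(String message) {
        // Limit 3 keeps any '|' characters inside the body intact
        String[] parts = message.split("\\|", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException("Invalid message format");
        }

        MessageType type = MessageType.valueOf(parts[0]);
        int length = Integer.parseInt(parts[1]);
        String body = parts[2];

        // The timestamp is not on the wire; it is set to the decode time
        IoTMessage msg = new IoTMessage(type, body);
        if (msg.getLength() != length) {
            throw new IllegalArgumentException(
                "Length mismatch: declared " + length + ", actual " + msg.getLength());
        }
        return msg;
    }

    // Getters
    public MessageType getType() { return type; }
    public int getLength() { return length; }
    public String getBody() { return body; }
    public long getTimestamp() { return timestamp; }
}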
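
Two properties of the type|length|body format are worth checking by hand: the length counts bytes rather than characters, and a body may itself contain the | separator because decoding splits on the first two separators only. The following self-contained sketch illustrates both under the assumption that the length is measured in UTF-8 bytes; TextFrame is a hypothetical stand-in for the message class, reduced to the two fields that appear on the wire.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical, reduced stand-in for the article's message class.
final class TextFrame {

    final String type;
    final String body;

    TextFrame(String type, String body) {
        this.type = type;
        this.body = body;
    }

    /** Encodes as type|length|body, with length in UTF-8 bytes (assumption). */
    String encode() {
        int length = body.getBytes(StandardCharsets.UTF_8).length;
        return type + "|" + length + "|" + body;
    }

    /** Decodes one line and rejects it when the declared length does not match the body. */
    static TextFrame decode(String line) {
        // Limit 3: everything after the second '|' belongs to the body.
        String[] parts = line.split("\\|", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException("Invalid message format");
        }
        int declared = Integer.parseInt(parts[1]);
        int actual = parts[2].getBytes(StandardCharsets.UTF_8).length;
        if (declared != actual) {
            throw new IllegalArgumentException("Length mismatch: " + declared + " vs " + actual);
        }
        return new TextFrame(parts[0], parts[2]);
    }
}
```

For example, a body of two CJK characters encodes with a length of 6, not 2, and a body such as a|b survives a round trip unchanged.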

4.2 Sending Data

4.2.1 Send Mechanism

Data sender implementation

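import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class DataSender {

    private final IoTTCPClient client;
    private final BlockingQueue<IoTMessage> messageQueue;

    public DataSender(IoTTCPClient client) {
        this.client = client;
        this.messageQueue = new LinkedBlockingQueue<>();
    }

    // Queue a data message; the sender thread transmits it later
    public void sendData(String data) {
        IoTMessage message = new IoTMessage(
            IoTMessage.MessageType.DATA,
            data
        );
        messageQueue.offer(message);
    }

    public void start() {
        new Thread(() -> {
            while (client.isConnected()) {
                try {
                    // Poll with a timeout so the loop notices a disconnect
                    // instead of blocking forever on an empty queue
                    IoTMessage message = messageQueue.poll(1, TimeUnit.SECONDS);
                    if (message == null) {
                        continue;
                    }
                    String encoded = message.encode();
                    client.sendMessage(encoded);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }).start();
    }
}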
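
The sender above queues messages without any limit, so a device that stays offline for a long time keeps accumulating readings in memory. On constrained devices a bounded queue with an explicit overflow policy may be a better fit. The sketch below shows a drop-oldest policy; BoundedOutbox is a hypothetical class, and whether dropping the oldest reading is acceptable depends on the application.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical bounded send queue with a drop-oldest overflow policy.
final class BoundedOutbox<T> {

    private final BlockingQueue<T> queue;
    private long dropped;

    BoundedOutbox(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Enqueues the item, evicting the oldest entries while the outbox is full. */
    synchronized void add(T item) {
        while (!queue.offer(item)) {
            if (queue.poll() != null) {
                dropped++;
            }
        }
    }

    /** Returns the next item to send, or null when the outbox is empty. */
    T poll() {
        return queue.poll();
    }

    synchronized long droppedCount() {
        return dropped;
    }

    int size() {
        return queue.size();
    }
}
```

With a capacity of 2, adding a, b and c leaves b and c queued and reports one dropped item. Exposing the drop counter makes it possible to log or report data loss instead of hiding it.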

5. Heartbeats

5.1 Heartbeat Design

5.1.1 Heartbeat Keep-Alive

Heartbeat implementation

import java.util.concurrent.*;

public class HeartbeatManager {

    private final IoTTCPClient client;
    private final ScheduledExecutorService scheduler;
    private final long heartbeatInterval = 30; // seconds between heartbeats
    private final long timeout = 90;           // seconds without a reply before giving up
    // Written by the receiver thread, read by the scheduler thread
    private volatile long lastHeartbeatTime;
    private volatile boolean running = false;

    public HeartbeatManager(IoTTCPClient client) {
        this.client = client;
        this.scheduler = Executors.newScheduledThreadPool(2);
        this.lastHeartbeatTime = System.currentTimeMillis();
    }

    public void start() {
        running = true;

        // Send a heartbeat at a fixed rate
        scheduler.scheduleAtFixedRate(
            this::sendHeartbeat,
            0,
            heartbeatInterval,
            TimeUnit.SECONDS
        );

        // Check for a timeout once per heartbeat interval, so a dead
        // connection is noticed at most one interval after the deadline
        scheduler.scheduleAtFixedRate(
            this::checkTimeout,
            heartbeatInterval,
            heartbeatInterval,
            TimeUnit.SECONDS
        );
    }

    private void sendHeartbeat() {
        if (client.isConnected() && running) {
            IoTMessage heartbeat = new IoTMessage(
                IoTMessage.MessageType.HEARTBEAT,
                "PING"
            );
            client.sendMessage(heartbeat.encode());
        }
    }

    // Must be called whenever a heartbeat reply (PONG) arrives from the
    // server; otherwise the timeout fires even on a healthy connection
    public void receiveHeartbeat() {
        lastHeartbeatTime = System.currentTimeMillis();
    }

    private void checkTimeout() {
        long elapsed = System.currentTimeMillis() - lastHeartbeatTime;
        if (elapsed > timeout * 1000) {
            System.out.println("Heartbeat timeout, connection may be lost");
            // Drop the connection so that reconnection can take over
            client.disconnect();
        }
    }

    public void stop() {
        running = false;
        scheduler.shutdown();
    }
}
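
The timeout rule used above (no reply for longer than the timeout means the link is presumed dead) is easier to reason about and to test when the clock is passed in rather than read inside the method. The sketch below isolates that rule; HeartbeatTracker is a hypothetical class, and the caller is assumed to supply timestamps in milliseconds from a single consistent source.

```java
// Hypothetical helper that isolates the heartbeat timeout rule.
final class HeartbeatTracker {

    private final long timeoutMillis;
    private volatile long lastSeenMillis;

    HeartbeatTracker(long timeoutMillis, long nowMillis) {
        this.timeoutMillis = timeoutMillis;
        this.lastSeenMillis = nowMillis;
    }

    /** Records that a heartbeat reply arrived at the given time. */
    void onReply(long nowMillis) {
        lastSeenMillis = nowMillis;
    }

    /** Returns true when more than timeoutMillis have passed since the last reply. */
    boolean isTimedOut(long nowMillis) {
        return nowMillis - lastSeenMillis > timeoutMillis;
    }
}
```

With a 90-second timeout and a last reply at time 0, the tracker reports a timeout at 90 001 ms but not at exactly 90 000 ms. In production code a monotonic source such as System.nanoTime() converted to milliseconds may be preferable to the wall clock, which can jump when the device synchronizes its time.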

5.2 Server-Side Heartbeat Handling

5.2.1 Server Heartbeat Response

Server-side heartbeat handling

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ServerHeartbeatHandler {

    private final Map<String, Long> deviceLastHeartbeat;

    public ServerHeartbeatHandler() {
        this.deviceLastHeartbeat = new ConcurrentHashMap<>();
    }

    public void handleHeartbeat(String deviceId, IoTMessage message) {
        if (message.getType() == IoTMessage.MessageType.HEARTBEAT) {
            // Record the time of the latest heartbeat
            deviceLastHeartbeat.put(deviceId, System.currentTimeMillis());

            // Build the heartbeat reply
            IoTMessage response = new IoTMessage(
                IoTMessage.MessageType.HEARTBEAT,
                "PONG"
            );
            // Sending is left to the caller, for example
            // IoTTCPServer.sendToDevice(deviceId, response.encode())
        }
    }

    // Intended to be called periodically, for example from a scheduler
    public void checkDeviceHealth() {
        long currentTime = System.currentTimeMillis();
        long timeout = 120000; // 2-minute timeout

        deviceLastHeartbeat.entrySet().removeIf(entry -> {
            if (currentTime - entry.getValue() > timeout) {
                System.out.println("Device timeout: " + entry.getKey());
                // Clean up the device connection here
                return true;
            }
            return false;
        });
    }
}
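
The health check above drops stale entries but leaves the actual connection cleanup as a placeholder. One way to connect the two is to have the eviction step return the IDs it removed, so the caller can close the matching sockets. The sketch below is an illustration under that assumption; DeviceLivenessRegistry and its method names are hypothetical, and time is passed in explicitly to keep the logic testable.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry that reports which devices it evicted.
final class DeviceLivenessRegistry {

    private final Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();
    private final long timeoutMillis;

    DeviceLivenessRegistry(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Records a heartbeat from the device at the given time. */
    void touch(String deviceId, long nowMillis) {
        lastHeartbeat.put(deviceId, nowMillis);
    }

    boolean isTracked(String deviceId) {
        return lastHeartbeat.containsKey(deviceId);
    }

    /** Removes devices silent for longer than the timeout and returns their IDs. */
    List<String> evictStale(long nowMillis) {
        List<String> evicted = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() > timeoutMillis) {
                evicted.add(entry.getKey());
                it.remove();
            }
        }
        return evicted;
    }
}
```

A scheduled task could call evictStale once per interval and close the socket of every returned device, which also frees the handler thread blocked in readLine().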

6. Reconnection

6.1 Reconnection Strategy

6.1.1 Automatic Reconnection

Reconnection implementation

import java.util.concurrent.*;

public class ReconnectionManager {

    private final IoTTCPClient client;
    private final ScheduledExecutorService scheduler;
    private final int maxRetries = 10;
    private final long initialDelay = 1; // 1 second
    private final long maxDelay = 60;    // 60 seconds
    // Updated on the scheduler thread, reset from caller threads
    private volatile int currentRetry = 0;

    public ReconnectionManager(IoTTCPClient client) {
        this.client = client;
        this.scheduler = Executors.newScheduledThreadPool(1);
    }

    public void startReconnect() {
        if (currentRetry >= maxRetries) {
            System.out.println("Max retries reached, stop reconnecting");
            return;
        }

        // Exponential backoff: 1, 2, 4, ... seconds, capped at maxDelay
        long delay = Math.min(
            initialDelay * (long) Math.pow(2, currentRetry),
            maxDelay
        );

        System.out.println("Reconnecting in " + delay + " seconds... (attempt " + (currentRetry + 1) + ")");

        scheduler.schedule(() -> {
            if (!client.isConnected()) {
                boolean success = client.connect();
                if (success) {
                    System.out.println("Reconnected successfully");
                    currentRetry = 0;
                } else {
                    // Schedule the next attempt with a longer delay
                    currentRetry++;
                    startReconnect();
                }
            }
        }, delay, TimeUnit.SECONDS);
    }

    // Reset the retry counter, for example after a manual reconnect
    public void reset() {
        currentRetry = 0;
    }
}
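
The backoff schedule above is deterministic, so a fleet of devices that lose the same server at the same moment will also retry at the same moments. A common refinement is to add random jitter to each delay. The sketch below computes the capped exponential delay with integer arithmetic and offers a jittered variant; the Backoff class and its method names are hypothetical, and picking uniformly between 1 second and the capped delay is only one of several possible jitter schemes.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper for capped exponential backoff with optional jitter.
final class Backoff {

    private Backoff() {}

    /** Returns min(initialDelay * 2^attempt, maxDelay) without overflowing. */
    static long delaySeconds(int attempt, long initialDelay, long maxDelay) {
        if (attempt < 0 || initialDelay <= 0 || maxDelay < initialDelay) {
            throw new IllegalArgumentException("invalid backoff parameters");
        }
        if (attempt >= 62) {
            return maxDelay; // 2^attempt would not fit in a long
        }
        long factor = 1L << attempt;
        // Compare by division so the multiplication below cannot overflow.
        if (initialDelay > maxDelay / factor) {
            return maxDelay;
        }
        return Math.min(initialDelay * factor, maxDelay);
    }

    /** Returns a random delay between 1 second and the capped exponential delay. */
    static long jitteredDelaySeconds(int attempt, long initialDelay, long maxDelay) {
        long ceiling = delaySeconds(attempt, initialDelay, maxDelay);
        return 1 + ThreadLocalRandom.current().nextLong(ceiling);
    }
}
```

With an initial delay of 1 second and a cap of 60 seconds, attempts 0, 3 and 6 yield 1, 8 and 60 seconds respectively, and every later attempt stays at 60.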

6.2 Connection State Monitoring

6.2.1 State Monitoring

Connection state monitor

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ConnectionMonitor {

    private final IoTTCPClient client;
    private final ScheduledExecutorService scheduler;

    public ConnectionMonitor(IoTTCPClient client) {
        this.client = client;
        this.scheduler = Executors.newScheduledThreadPool(1);
    }

    public void start() {
        // Check the connection every 10 seconds
        scheduler.scheduleAtFixedRate(
            this::checkConnection,
            10,
            10,
            TimeUnit.SECONDS
        );
    }

    private void checkConnection() {
        if (!client.isConnected()) {
            System.out.println("Connection lost, triggering reconnect");
            // Trigger reconnection here, for example
            // ReconnectionManager.startReconnect()
        }
    }

    public void stop() {
        scheduler.shutdown();
    }
}
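
A monitor that fires on every failed check would start a new reconnection attempt every 10 seconds while the link is down, stacking attempts on top of the backoff schedule. One option is to make the check edge-triggered, so the callback runs once per outage. The sketch below shows that idea in isolation; ConnectionWatcher is a hypothetical class, and the connection test and the reaction are injected as a BooleanSupplier and a Runnable so the example stays independent of the client class.

```java
import java.util.function.BooleanSupplier;

// Hypothetical edge-triggered watcher: reports each outage only once.
final class ConnectionWatcher {

    private final BooleanSupplier isConnected;
    private final Runnable onLost;
    private boolean lossReported = false;

    ConnectionWatcher(BooleanSupplier isConnected, Runnable onLost) {
        this.isConnected = isConnected;
        this.onLost = onLost;
    }

    /** Runs one check; invokes onLost only on the transition from up to down. */
    synchronized void checkOnce() {
        if (isConnected.getAsBoolean()) {
            lossReported = false; // re-arm for the next outage
            return;
        }
        if (!lossReported) {
            lossReported = true;
            onLost.run();
        }
    }
}
```

A scheduler could call checkOnce at a fixed rate with client::isConnected as the test and a reconnection trigger as the callback.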

7. Message Encoding and Decoding

7.1 JSON Codec

7.1.1 JSON Message Format

JSON message codec

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

public class MessageCodec {

    private final Gson gson;

    public MessageCodec() {
        this.gson = new GsonBuilder()
            .setDateFormat("yyyy-MM-dd HH:mm:ss")
            .create();
    }

    // Encode an object as JSON
    public String encode(Object obj) {
        return gson.toJson(obj);
    }

    // Decode JSON into an object of the given class
    public <T> T decode(String json, Class<T> clazz) {
        return gson.fromJson(json, clazz);
    }
}

// Device data model (a public class, so it belongs in its own file, DeviceData.java)
public class DeviceData {
    private String deviceId;
    private String sensorType;
    private double value;
    private long timestamp;

    // Constructors, getters, setters
    public DeviceData(String deviceId, String sensorType, double value) {
        this.deviceId = deviceId;
        this.sensorType = sensorType;
        this.value = value;
        this.timestamp = System.currentTimeMillis();
    }

    // Getters and setters
    public String getDeviceId() { return deviceId; }
    public void setDeviceId(String deviceId) { this.deviceId = deviceId; }
    public String getSensorType() { return sensorType; }
    public void setSensorType(String sensorType) { this.sensorType = sensorType; }
    public double getValue() { return value; }
    public void setValue(double value) { this.value = value; }
    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
}

7.2 Binary Codec

7.2.1 Binary Protocol

Binary message codec

import java.io.*;
import java.nio.charset.StandardCharsets;

public class BinaryCodec {

    // Encode
    public byte[] encode(IoTMessage message) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(baos);

        try {
            // Message type (1 byte)
            dos.writeByte(message.getType().ordinal());

            // Timestamp (8 bytes)
            dos.writeLong(message.getTimestamp());

            // Body length (4 bytes)
            byte[] bodyBytes = message.getBody().getBytes(StandardCharsets.UTF_8);
            dos.writeInt(bodyBytes.length);

            // Body
            dos.write(bodyBytes);

            return baos.toByteArray();

        } catch (IOException e) {
            throw new RuntimeException("Encode error", e);
        }
    }

    // Decode one complete frame
    public IoTMessage decode(byte[] data) {
        ByteArrayInputStream bais = new ByteArrayInputStream(data);
        DataInputStream dis = new DataInputStream(bais);

        try {
            // Message type
            int typeOrdinal = dis.readByte();
            IoTMessage.MessageType[] types = IoTMessage.MessageType.values();
            if (typeOrdinal < 0 || typeOrdinal >= types.length) {
                throw new IOException("Unknown message type: " + typeOrdinal);
            }
            IoTMessage.MessageType type = types[typeOrdinal];

            // Timestamp (read to advance the stream; the IoTMessage
            // constructor below stamps the message with the decode time)
            long timestamp = dis.readLong();

            // Body length, checked against the bytes actually present
            int bodyLength = dis.readInt();
            if (bodyLength < 0 || bodyLength > dis.available()) {
                throw new IOException("Invalid body length: " + bodyLength);
            }

            // Body
            byte[] bodyBytes = new byte[bodyLength];
            dis.readFully(bodyBytes);
            String body = new String(bodyBytes, StandardCharsets.UTF_8);

            IoTMessage message = new IoTMessage(type, body);
            return message;

        } catch (IOException e) {
            throw new RuntimeException("Decode error", e);
        }
    }
}
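
The decoder above expects a byte array that holds exactly one frame, but TCP delivers a byte stream with no message boundaries: several frames may arrive back to back, or one frame may arrive in pieces. Because the header has a fixed size (1-byte type, 8-byte timestamp, 4-byte length), a reader can recover the boundaries by reading the header first and then exactly as many body bytes as it announces. The sketch below illustrates this; BinaryFrames, its methods, and the 64 KiB body limit are assumptions made for the example.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that splits a byte stream into frames.
final class BinaryFrames {

    static final int MAX_BODY_BYTES = 64 * 1024; // assumed sanity limit

    private BinaryFrames() {}

    /** Builds one frame: type (1 byte), timestamp (8), body length (4), body. */
    static byte[] frame(int typeOrdinal, long timestamp, String body) {
        byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(out)) {
            dos.writeByte(typeOrdinal);
            dos.writeLong(timestamp);
            dos.writeInt(bodyBytes.length);
            dos.write(bodyBytes);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    /** Reads frames until a clean end of stream and returns their bodies in order. */
    static List<String> readBodies(InputStream in) {
        DataInputStream dis = new DataInputStream(in);
        List<String> bodies = new ArrayList<>();
        try {
            while (true) {
                int type = dis.read(); // -1 means the stream ended between frames
                if (type < 0) {
                    return bodies;
                }
                dis.readLong(); // timestamp, unused in this sketch
                int length = dis.readInt();
                if (length < 0 || length > MAX_BODY_BYTES) {
                    throw new IOException("Bad body length: " + length);
                }
                byte[] body = new byte[length];
                dis.readFully(body); // blocks until the whole body has arrived
                bodies.add(new String(body, StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            // Includes EOFException when the stream ends in the middle of a frame
            throw new UncheckedIOException(e);
        }
    }
}
```

On a real connection the same loop could run over socket.getInputStream(), with the length limit protecting the server from a corrupt or hostile header that announces a huge body.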

8. Case Study

8.1 A Complete IoT TCP System

8.1.1 System Architecture

Complete IoT TCP communication system

import java.io.IOException;

// Entry point: server (IoTTCPServerMain.java)
public class IoTTCPServerMain {

    public static void main(String[] args) {
        try {
            IoTTCPServer server = new IoTTCPServer(8888);
            server.start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

// Entry point: client (IoTTCPClientMain.java)
public class IoTTCPClientMain {

    public static void main(String[] args) {
        String deviceId = "DEVICE_001";
        IoTTCPClient client = new IoTTCPClient("localhost", 8888, deviceId);

        // Connect
        if (client.connect()) {
            // Start the heartbeat
            HeartbeatManager heartbeat = new HeartbeatManager(client);
            heartbeat.start();

            // Start the data sender
            DataSender sender = new DataSender(client);
            sender.start();

            try {
                // Send simulated readings, one every 5 seconds
                MessageCodec codec = new MessageCodec();
                for (int i = 0; i < 10; i++) {
                    DeviceData data = new DeviceData(
                        deviceId,
                        "TEMPERATURE",
                        25.5 + i
                    );
                    String json = codec.encode(data);
                    sender.sendData(json);

                    Thread.sleep(5000);
                }

                // Keep running for another minute
                Thread.sleep(60000);
            } catch (InterruptedException e) {
                // Thread.sleep throws a checked exception; restore the flag and exit
                Thread.currentThread().interrupt();
            } finally {
                client.disconnect();
                heartbeat.stop();
            }
        }
    }
}

8.2 Device Management

8.2.1 Device Connection Management

Device connection management

import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class DeviceManager {

    private final Map<String, DeviceInfo> devices;
    private final IoTTCPServer server;

    public DeviceManager(IoTTCPServer server) {
        this.devices = new ConcurrentHashMap<>();
        this.server = server;
    }

    // Record a newly connected device as online
    public void registerDevice(String deviceId, Socket socket) {
        DeviceInfo info = new DeviceInfo();
        info.setDeviceId(deviceId);
        info.setSocket(socket);
        info.setConnectTime(System.currentTimeMillis());
        info.setStatus(DeviceStatus.ONLINE);

        devices.put(deviceId, info);
    }

    // Remove a device and mark it offline
    public void unregisterDevice(String deviceId) {
        DeviceInfo info = devices.remove(deviceId);
        if (info != null) {
            info.setStatus(DeviceStatus.OFFLINE);
        }
    }

    public void sendCommand(String deviceId, String command) {
        server.sendToDevice(deviceId, command);
    }

    // Returns a snapshot copy, so callers can iterate safely
    public List<DeviceInfo> getAllDevices() {
        return new ArrayList<>(devices.values());
    }

    public DeviceInfo getDevice(String deviceId) {
        return devices.get(deviceId);
    }
}

// Device information (a public class, so it belongs in its own file, DeviceInfo.java)
public class DeviceInfo {
    private String deviceId;
    private Socket socket;
    private DeviceStatus status;
    private long connectTime;
    private long lastHeartbeatTime;

    // Getters and setters
    public String getDeviceId() { return deviceId; }
    public void setDeviceId(String deviceId) { this.deviceId = deviceId; }
    public Socket getSocket() { return socket; }
    public void setSocket(Socket socket) { this.socket = socket; }
    public DeviceStatus getStatus() { return status; }
    public void setStatus(DeviceStatus status) { this.status = status; }
    public long getConnectTime() { return connectTime; }
    public void setConnectTime(long connectTime) { this.connectTime = connectTime; }
    public long getLastHeartbeatTime() { return lastHeartbeatTime; }
    public void setLastHeartbeatTime(long lastHeartbeatTime) { this.lastHeartbeatTime = lastHeartbeatTime; }
}

enum DeviceStatus {
    ONLINE,
    OFFLINE,
    ERROR
}
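
When a device reconnects before the server has noticed that its old connection is dead, two connections exist for the same device ID. Registering the new one should then close the old one, and the late cleanup of the old connection must not remove the new entry. The sketch below illustrates that rule; ConnectionRegistry is a hypothetical class that works with any Closeable (java.net.Socket implements Closeable), which keeps the example free of network code.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry that handles duplicate registrations for one device ID.
final class ConnectionRegistry {

    private final Map<String, Closeable> connections = new ConcurrentHashMap<>();

    /** Registers the connection; closes any older one and returns true if one was replaced. */
    boolean register(String deviceId, Closeable connection) {
        Closeable previous = connections.put(deviceId, connection);
        if (previous != null && previous != connection) {
            try {
                previous.close();
            } catch (IOException ignored) {
                // The old connection is being discarded anyway.
            }
            return true;
        }
        return false;
    }

    /** Removes the entry only if it still refers to this connection. */
    boolean unregister(String deviceId, Closeable connection) {
        return connections.remove(deviceId, connection);
    }
}
```

The two-argument remove is the important part: the handler of the stale connection can call unregister during its cleanup without evicting the connection that replaced it.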

9. Performance Optimization

9.1 Connection Pool Management

9.1.1 Connection Pool

Connection pool management

import java.io.IOException;
import java.net.Socket;
import java.util.concurrent.*;

public class ConnectionPool {

    private final BlockingQueue<Socket> pool;
    private final int maxSize;
    private final String host;
    private final int port;

    public ConnectionPool(String host, int port, int maxSize) {
        this.host = host;
        this.port = port;
        this.maxSize = maxSize;
        this.pool = new LinkedBlockingQueue<>(maxSize);

        // Fill the pool
        initializePool();
    }

    private void initializePool() {
        for (int i = 0; i < maxSize; i++) {
            try {
                Socket socket = new Socket(host, port);
                pool.offer(socket);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    // Blocks until a connection is available. Throws IOException if a
    // replacement connection cannot be opened, instead of handing back a
    // closed socket.
    public Socket borrowConnection() throws InterruptedException, IOException {
        Socket socket = pool.take();
        // isClosed() and isConnected() reflect local state only; they do not
        // detect that the peer has gone away
        if (socket.isClosed() || !socket.isConnected()) {
            // Open a fresh connection
            socket = new Socket(host, port);
        }
        return socket;
    }

    public void returnConnection(Socket socket) {
        if (socket != null && !socket.isClosed()) {
            pool.offer(socket);
        }
    }
}
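
A pool that simply discards broken connections on return shrinks over time, and once it is empty every borrower blocks. One way to keep the size stable is to replace an unusable item with a fresh one at return time and to bound the wait when borrowing. The sketch below shows the pattern on a generic type so it can run without a server; SimplePool, its factory and its usability check are hypothetical, and for sockets the factory would open a new connection.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical generic pool that keeps its size stable.
final class SimplePool<T> {

    private final BlockingQueue<T> idle;
    private final Supplier<T> factory;
    private final Predicate<T> isUsable;

    SimplePool(int size, Supplier<T> factory, Predicate<T> isUsable) {
        this.idle = new ArrayBlockingQueue<>(size);
        this.factory = factory;
        this.isUsable = isUsable;
        for (int i = 0; i < size; i++) {
            idle.add(factory.get());
        }
    }

    /** Borrows an item, waiting up to the timeout; replaces it if it is no longer usable. */
    T borrow(long timeoutMillis) {
        try {
            T item = idle.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (item == null) {
                throw new IllegalStateException("Pool exhausted");
            }
            return isUsable.test(item) ? item : factory.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the pool", e);
        }
    }

    /** Returns an item; an unusable one is swapped for a fresh one so the pool does not shrink. */
    void giveBack(T item) {
        idle.offer(isUsable.test(item) ? item : factory.get());
    }

    int idleCount() {
        return idle.size();
    }
}
```

Callers would typically pair borrow and giveBack in a try/finally block so that an exception during use cannot leak a pooled item.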

9.2 Asynchronous Processing

9.2.1 Asynchronous Message Handling

Asynchronous message processor

import java.util.concurrent.*;

public class AsyncMessageProcessor {

    private final ExecutorService executor;
    private final BlockingQueue<IoTMessage> messageQueue;

    public AsyncMessageProcessor(int threadCount) {
        this.executor = Executors.newFixedThreadPool(threadCount);
        this.messageQueue = new LinkedBlockingQueue<>();

        // Start the worker loops
        for (int i = 0; i < threadCount; i++) {
            executor.submit(this::processMessages);
        }
    }

    public void submit(IoTMessage message) {
        messageQueue.offer(message);
    }

    // Interrupts the workers, which then leave their loops
    public void shutdown() {
        executor.shutdownNow();
    }

    private void processMessages() {
        while (true) {
            try {
                IoTMessage message = messageQueue.take();
                handleMessage(message);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    private void handleMessage(IoTMessage message) {
        // Dispatch by message type
        switch (message.getType()) {
            case DATA:
                processData(message);
                break;
            case COMMAND:
                processCommand(message);
                break;
            case HEARTBEAT:
                processHeartbeat(message);
                break;
            default:
                // RESPONSE and ERROR are not handled in this example
                break;
        }
    }

    private void processData(IoTMessage message) {
        // Handle a data message
    }

    private void processCommand(IoTMessage message) {
        // Handle a command message
    }

    private void processHeartbeat(IoTMessage message) {
        // Handle a heartbeat message
    }
}
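
With several workers draining one shared queue, two messages from the same device can be handled by different threads and finish out of order. If per-device ordering matters, one possible approach is to route each device to a fixed single-threaded lane chosen by the hash of its ID. The sketch below illustrates the idea; KeyedExecutor is a hypothetical class, and the lane count is a tuning parameter left to the caller.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical executor that preserves submission order per key.
final class KeyedExecutor {

    private final ExecutorService[] lanes;

    KeyedExecutor(int laneCount) {
        lanes = new ExecutorService[laneCount];
        for (int i = 0; i < laneCount; i++) {
            lanes[i] = Executors.newSingleThreadExecutor();
        }
    }

    /** Runs the task on the lane owned by the key; tasks with the same key run in order. */
    void submit(String key, Runnable task) {
        // floorMod keeps the index non-negative for negative hash codes.
        lanes[Math.floorMod(key.hashCode(), lanes.length)].execute(task);
    }

    /** Stops accepting tasks and waits for queued ones; returns false on timeout or interrupt. */
    boolean shutdownAndWait(long timeoutMillis) {
        for (ExecutorService lane : lanes) {
            lane.shutdown();
        }
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        try {
            for (ExecutorService lane : lanes) {
                long remaining = Math.max(deadline - System.nanoTime(), 0L);
                if (!lane.awaitTermination(remaining, TimeUnit.NANOSECONDS)) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Different devices still run in parallel across lanes, so throughput scales with the lane count while each device sees its own messages handled in arrival order.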

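10. Summary

10.1 Key Points

  1. TCP: a reliable transport-layer protocol well suited to IoT communication
  2. Connection management: establishing and maintaining TCP connections
  3. Data transfer: message format design and encoding/decoding
  4. Heartbeats: keeping connections alive
  5. Reconnection: automatic reconnection strategies
  6. Performance: connection pooling and asynchronous processing

10.2 Key Takeaways

  1. TCP reliability: data arrives in order and intact
  2. Connection management: manage the full connection lifecycle correctly
  3. Heartbeat keep-alive: keeps intermediate devices such as NAT gateways and firewalls from dropping idle connections
  4. Reconnection strategy: reconnect with exponential backoff
  5. Message format: use one uniform message format

10.3 Best Practices

  1. Connection reuse: manage connections with a connection pool
  2. Asynchronous processing: handle messages asynchronously to improve throughput
  3. Heartbeats: send heartbeats periodically to keep the connection alive
  4. Error handling: handle errors thoroughly and retry where appropriate
  5. Monitoring and alerting: monitor connection state and message processing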
