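1. Indirect Communication Architecture Overview

Indirect communication is a core design pattern in modern distributed systems: components exchange data through intermediaries such as message queues, event buses, and publish-subscribe channels instead of calling each other directly, which keeps them loosely coupled. For an architect, understanding its principles, implementation approaches, and use cases is essential to building highly available, scalable, and maintainable enterprise systems. This article examines the implementation principles, design patterns, and best practices of indirect communication from an architect's perspective, with the aim of offering a reference solution for enterprise applications.

1.1 Indirect Communication Architecture Design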

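┌─────────────────────────────────────────────────────────────┐
│                      Application Layer                      │
│      (business logic, service calls, event publishing)      │
├─────────────────────────────────────────────────────────────┤
│                     Communication Layer                     │
│       (message queues, event bus, publish-subscribe)        │
├─────────────────────────────────────────────────────────────┤
│                        Routing Layer                        │
│      (message routing, event dispatch, load balancing)      │
├─────────────────────────────────────────────────────────────┤
│                        Storage Layer                        │
│   (message persistence, event storage, state management)    │
├─────────────────────────────────────────────────────────────┤
│                       Transport Layer                       │
│ (network protocols, serialization, compression, encryption) │
└─────────────────────────────────────────────────────────────┘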
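
To make the idea of loose coupling concrete, here is a minimal sketch in which a producer and a consumer share only an in-process channel and never hold a reference to each other. It collapses the communication, routing, and storage layers into a single `BlockingQueue`, which is a simplification; the class name `IndirectCommunicationDemo`, the `__END__` sentinel, and the `order.created:` message format are illustrative assumptions rather than part of the framework presented later.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo: producer and consumer know only the channel, not each other.
final class IndirectCommunicationDemo {
    private static final String END = "__END__"; // sentinel that tells the consumer to stop

    // Producer side: hands messages to the intermediary.
    static void produce(BlockingQueue<String> channel, List<String> orderIds) throws InterruptedException {
        for (String orderId : orderIds) {
            channel.put("order.created:" + orderId);
        }
        channel.put(END);
    }

    // Consumer side: drains the channel until the sentinel arrives.
    static List<String> consume(BlockingQueue<String> channel) throws InterruptedException {
        List<String> received = new ArrayList<>();
        for (String message = channel.take(); !END.equals(message); message = channel.take()) {
            received.add(message);
        }
        return received;
    }

    // Runs the producer on its own thread and returns what the consumer received.
    static List<String> run(List<String> orderIds) {
        BlockingQueue<String> channel = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                produce(channel, orderIds);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            List<String> received = consume(channel);
            producer.join();
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while consuming", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("A-1", "A-2")));
        // prints [order.created:A-1, order.created:A-2]
    }
}
```

Because both sides depend only on the channel, either one can be replaced, scaled out, or restarted without changing the other, which is the property the layered design is meant to preserve.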

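1.2 Key Metrics for Indirect Communication

  1. Reliability: message delivery guarantees, failure recovery, data consistency
  2. Performance: throughput, latency, concurrent processing capacity
  3. Scalability: horizontal scaling, load distribution, capacity planning
  4. Decoupling: degree of coupling between systems, dependency relationships, interface stability
  5. Maintainability: monitoring capability, ease of debugging, operational complexity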
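
The two performance metrics in the list, throughput and latency, can be derived from per-message timestamps. The sketch below shows one way to compute them, assuming each processed message records when it was enqueued and when handling finished. `QueueMetricsSketch` and `Sample` are hypothetical names introduced for illustration, and measuring throughput over the span from the first enqueue to the last completion is just one reasonable definition.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for deriving latency and throughput from recorded timestamps.
final class QueueMetricsSketch {

    // One processed message: enqueue time and completion time, both in epoch milliseconds.
    static final class Sample {
        final long enqueuedAtMillis;
        final long completedAtMillis;

        Sample(long enqueuedAtMillis, long completedAtMillis) {
            this.enqueuedAtMillis = enqueuedAtMillis;
            this.completedAtMillis = completedAtMillis;
        }
    }

    // Mean end-to-end latency per message; 0.0 when there are no samples.
    static double averageLatencyMillis(List<Sample> samples) {
        return samples.stream()
                .mapToLong(s -> s.completedAtMillis - s.enqueuedAtMillis)
                .average()
                .orElse(0.0);
    }

    // Messages per second over the window from the first enqueue to the last completion.
    static double throughputPerSecond(List<Sample> samples) {
        if (samples.isEmpty()) {
            return 0.0;
        }
        long start = samples.stream().mapToLong(s -> s.enqueuedAtMillis).min().getAsLong();
        long end = samples.stream().mapToLong(s -> s.completedAtMillis).max().getAsLong();
        long windowMillis = Math.max(1, end - start); // avoid division by zero
        return samples.size() * 1000.0 / windowMillis;
    }

    public static void main(String[] args) {
        List<Sample> samples = Arrays.asList(
                new Sample(0, 100), new Sample(200, 400), new Sample(700, 1000));
        System.out.println("average latency (ms): " + averageLatencyMillis(samples)); // 200.0
        System.out.println("throughput (msg/s): " + throughputPerSecond(samples));    // 3.0
    }
}
```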

2. Message Queue Implementation in Depth

2.1 Message Queue Framework

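import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

/**
* Message queue framework
* Implements the core functions of a message queue
*/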
public class MessageQueueFramework {

/**
* Message interface
*/
public interface Message {
String getId();
String getTopic();
Object getPayload();
Map<String, Object> getHeaders();
long getTimestamp();
int getPriority();
}

/**
* Message implementation
*/
public static class MessageImpl implements Message {
private String id;
private String topic;
private Object payload;
private Map<String, Object> headers;
private long timestamp;
private int priority;

public MessageImpl(String topic, Object payload) {
this.id = UUID.randomUUID().toString();
this.topic = topic;
this.payload = payload;
this.headers = new HashMap<>();
this.timestamp = System.currentTimeMillis();
this.priority = 0;
}

public MessageImpl(String topic, Object payload, int priority) {
this(topic, payload);
this.priority = priority;
}

// getters and setters
public String getId() { return id; }
public String getTopic() { return topic; }
public Object getPayload() { return payload; }
public Map<String, Object> getHeaders() { return headers; }
public long getTimestamp() { return timestamp; }
public int getPriority() { return priority; }

public void setHeader(String key, Object value) {
headers.put(key, value);
}

public Object getHeader(String key) {
return headers.get(key);
}
}

/**
* Message handler interface
*/
public interface MessageHandler {
void handle(Message message);
String getTopic();
int getPriority();
}

/**
* Message queue interface
*/
public interface MessageQueue {
void publish(Message message);
void subscribe(String topic, MessageHandler handler);
void unsubscribe(String topic, MessageHandler handler);
void start();
void stop();
QueueStatus getStatus();
}

/**
* Queue status
*/
public static class QueueStatus {
private boolean running;
private int messageCount;
private int subscriberCount;
private long totalMessagesProcessed;
private double averageProcessingTime;

// getters and setters
public boolean isRunning() { return running; }
public void setRunning(boolean running) { this.running = running; }
public int getMessageCount() { return messageCount; }
public void setMessageCount(int messageCount) { this.messageCount = messageCount; }
public int getSubscriberCount() { return subscriberCount; }
public void setSubscriberCount(int subscriberCount) { this.subscriberCount = subscriberCount; }
public long getTotalMessagesProcessed() { return totalMessagesProcessed; }
public void setTotalMessagesProcessed(long totalMessagesProcessed) { this.totalMessagesProcessed = totalMessagesProcessed; }
public double getAverageProcessingTime() { return averageProcessingTime; }
public void setAverageProcessingTime(double averageProcessingTime) { this.averageProcessingTime = averageProcessingTime; }
}

/**
* In-memory message queue implementation
*/
public static class InMemoryMessageQueue implements MessageQueue {
private final Map<String, List<MessageHandler>> subscribers;
private final Map<String, BlockingQueue<Message>> topicQueues;
// Topics that already have a worker, so each topic is consumed by exactly one task
private final Set<String> activeTopics;
private final ExecutorService executorService;
private final AtomicBoolean running;
private final AtomicLong totalProcessed;
private final AtomicLong totalProcessingTime;

public InMemoryMessageQueue() {
this.subscribers = new ConcurrentHashMap<>();
this.topicQueues = new ConcurrentHashMap<>();
this.activeTopics = ConcurrentHashMap.newKeySet();
this.executorService = Executors.newCachedThreadPool();
this.running = new AtomicBoolean(false);
this.totalProcessed = new AtomicLong(0);
this.totalProcessingTime = new AtomicLong(0);
}

@Override
public void publish(Message message) {
if (!running.get()) {
throw new IllegalStateException("Message queue is not running");
}

String topic = message.getTopic();
BlockingQueue<Message> queue = topicQueues.computeIfAbsent(topic, k -> new LinkedBlockingQueue<>());

try {
// offer() returns false on timeout; report that instead of silently dropping the message
if (!queue.offer(message, 5, TimeUnit.SECONDS)) {
throw new IllegalStateException("Timed out publishing message " + message.getId());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Failed to publish message", e);
}
}

@Override
public void subscribe(String topic, MessageHandler handler) {
// CopyOnWriteArrayList lets the worker iterate safely while handlers are added or removed
subscribers.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>()).add(handler);

// Start the processing task for this topic if the queue is already running
if (running.get()) {
startMessageProcessing(topic);
}
}

@Override
public void unsubscribe(String topic, MessageHandler handler) {
List<MessageHandler> handlers = subscribers.get(topic);
if (handlers != null) {
handlers.remove(handler);
}
}

@Override
public void start() {
if (running.compareAndSet(false, true)) {
// Start message processing for every subscribed topic
for (String topic : subscribers.keySet()) {
startMessageProcessing(topic);
}
}
}

@Override
public void stop() {
if (running.compareAndSet(true, false)) {
executorService.shutdown();
try {
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}
}

private void startMessageProcessing(String topic) {
// Skip if a worker for this topic already exists; otherwise every subscribe() would add another consumer
if (!activeTopics.add(topic)) {
return;
}
executorService.submit(() -> {
BlockingQueue<Message> queue = topicQueues.computeIfAbsent(topic, k -> new LinkedBlockingQueue<>());

while (running.get()) {
try {
// Poll with a timeout so the loop notices stop() instead of blocking forever in take()
Message message = queue.poll(1, TimeUnit.SECONDS);
if (message == null) {
continue;
}
List<MessageHandler> handlers = subscribers.get(topic);
if (handlers != null) {
for (MessageHandler handler : handlers) {
long startTime = System.currentTimeMillis();
try {
handler.handle(message);
totalProcessed.incrementAndGet();
} catch (Exception e) {
System.err.println("Error processing message: " + e.getMessage());
} finally {
long processingTime = System.currentTimeMillis() - startTime;
totalProcessingTime.addAndGet(processingTime);
}
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
}

@Override
public QueueStatus getStatus() {
QueueStatus status = new QueueStatus();
status.setRunning(running.get());
status.setMessageCount(topicQueues.values().stream().mapToInt(Queue::size).sum());
status.setSubscriberCount(subscribers.values().stream().mapToInt(List::size).sum());
status.setTotalMessagesProcessed(totalProcessed.get());

if (totalProcessed.get() > 0) {
status.setAverageProcessingTime((double) totalProcessingTime.get() / totalProcessed.get());
}

return status;
}
}

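/**
* Persistent message queue implementation
* Delivery is at-least-once: a message may be redelivered after a failure, so handlers should be idempotent
*/
public static class PersistentMessageQueue implements MessageQueue {
private final MessageStore messageStore;
private final InMemoryMessageQueue inMemoryQueue;
private final ScheduledExecutorService scheduler;
// Maps each subscribed handler to its acknowledging wrapper so unsubscribe() can find it
private final Map<MessageHandler, MessageHandler> wrappers = new ConcurrentHashMap<>();
// IDs currently in the in-memory queue; recovery must not enqueue them a second time
private final Set<String> inFlight = ConcurrentHashMap.newKeySet();
// Topics that already have a recovery task scheduled
private final Set<String> recoveryTopics = ConcurrentHashMap.newKeySet();

public PersistentMessageQueue(MessageStore messageStore) {
this.messageStore = messageStore;
this.inMemoryQueue = new InMemoryMessageQueue();
this.scheduler = Executors.newScheduledThreadPool(2);
}

@Override
public void publish(Message message) {
// Persist the message first
messageStore.store(message);

// Then publish it to the in-memory queue
enqueue(message);
}

@Override
public void subscribe(String topic, MessageHandler handler) {
inMemoryQueue.subscribe(topic, wrappers.computeIfAbsent(handler, this::acknowledging));

// Schedule the message recovery task once per topic
if (recoveryTopics.add(topic)) {
scheduler.scheduleAtFixedRate(() -> recoverMessages(topic), 0, 30, TimeUnit.SECONDS);
}
}

@Override
public void unsubscribe(String topic, MessageHandler handler) {
MessageHandler wrapper = wrappers.get(handler);
if (wrapper != null) {
inMemoryQueue.unsubscribe(topic, wrapper);
}
}

@Override
public void start() {
inMemoryQueue.start();
}

@Override
public void stop() {
// Stop recovery first so it does not publish to a queue that is shutting down
scheduler.shutdown();
inMemoryQueue.stop();
}

@Override
public QueueStatus getStatus() {
QueueStatus status = inMemoryQueue.getStatus();
// Persistence-related status could be added here
return status;
}

// Wraps a handler so that successful handling marks the message as processed in the store
private MessageHandler acknowledging(MessageHandler delegate) {
return new MessageHandler() {
@Override
public void handle(Message message) {
try {
delegate.handle(message);
messageStore.markAsProcessed(message.getId());
} finally {
// After a failure the ID is no longer in flight, so the next recovery run retries it
inFlight.remove(message.getId());
}
}

@Override
public String getTopic() { return delegate.getTopic(); }

@Override
public int getPriority() { return delegate.getPriority(); }
};
}

private void enqueue(Message message) {
if (!inFlight.add(message.getId())) {
return; // already queued
}
try {
inMemoryQueue.publish(message);
} catch (RuntimeException e) {
inFlight.remove(message.getId());
throw e;
}
}

private void recoverMessages(String topic) {
try {
// Re-enqueue stored messages that are neither processed nor already queued
for (Message message : messageStore.getUnprocessedMessages(topic)) {
enqueue(message);
}
} catch (RuntimeException e) {
// An uncaught exception would cancel every later run of this scheduled task
System.err.println("Message recovery failed: " + e.getMessage());
}
}
}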

/**
* Message store interface
*/
public interface MessageStore {
void store(Message message);
List<Message> getUnprocessedMessages(String topic);
void markAsProcessed(String messageId);
void deleteMessage(String messageId);
}

/**
* In-memory message store implementation
*/
public static class InMemoryMessageStore implements MessageStore {
private final Map<String, Message> messages;
private final Map<String, Set<String>> topicMessages;
private final Set<String> processedMessages;

public InMemoryMessageStore() {
this.messages = new ConcurrentHashMap<>();
this.topicMessages = new ConcurrentHashMap<>();
this.processedMessages = ConcurrentHashMap.newKeySet();
}

@Override
public void store(Message message) {
messages.put(message.getId(), message);
topicMessages.computeIfAbsent(message.getTopic(), k -> ConcurrentHashMap.newKeySet()).add(message.getId());
}

@Override
public List<Message> getUnprocessedMessages(String topic) {
Set<String> messageIds = topicMessages.get(topic);
if (messageIds == null) {
return new ArrayList<>();
}

return messageIds.stream()
.filter(id -> !processedMessages.contains(id))
.map(messages::get)
.filter(Objects::nonNull)
.collect(Collectors.toList());
}

@Override
public void markAsProcessed(String messageId) {
processedMessages.add(messageId);
}

@Override
public void deleteMessage(String messageId) {
Message message = messages.remove(messageId);
if (message != null) {
Set<String> topicMessageIds = topicMessages.get(message.getTopic());
if (topicMessageIds != null) {
topicMessageIds.remove(messageId);
}
processedMessages.remove(messageId);
}
}
}
}

/**
* User service message handler
*/
@Component
public class UserServiceMessageHandler implements MessageQueueFramework.MessageHandler {
private final UserService userService;
private final EventBus eventBus;

public UserServiceMessageHandler(UserService userService, EventBus eventBus) {
this.userService = userService;
this.eventBus = eventBus;
}

@Override
public void handle(MessageQueueFramework.Message message) {
try {
String topic = message.getTopic();
Object payload = message.getPayload();

switch (topic) {
case "user.created":
handleUserCreated(payload);
break;
case "user.updated":
handleUserUpdated(payload);
break;
case "user.deleted":
handleUserDeleted(payload);
break;
default:
System.out.println("Unknown topic: " + topic);
}
} catch (Exception e) {
System.err.println("Error handling message: " + e.getMessage());
e.printStackTrace();
}
}

private void handleUserCreated(Object payload) {
// Handle the user-created message
System.out.println("Processing user created: " + payload);

// Publish the domain event
eventBus.publish(new UserCreatedEvent((String) payload));
}

private void handleUserUpdated(Object payload) {
// Handle the user-updated message
System.out.println("Processing user updated: " + payload);

// Publish the domain event
eventBus.publish(new UserUpdatedEvent((String) payload));
}

private void handleUserDeleted(Object payload) {
// Handle the user-deleted message
System.out.println("Processing user deleted: " + payload);

// Publish the domain event
eventBus.publish(new UserDeletedEvent((String) payload));
}

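@Override
public String getTopic() {
// Pattern-style name only: InMemoryMessageQueue matches topics exactly, so subscribe once per concrete topic
return "user.*";
}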

@Override
public int getPriority() {
return 1;
}
}

/**
* Order service message handler
*/
@Component
public class OrderServiceMessageHandler implements MessageQueueFramework.MessageHandler {
private final OrderService orderService;
private final InventoryService inventoryService;
private final PaymentService paymentService;

public OrderServiceMessageHandler(OrderService orderService,
InventoryService inventoryService,
PaymentService paymentService) {
this.orderService = orderService;
this.inventoryService = inventoryService;
this.paymentService = paymentService;
}

@Override
public void handle(MessageQueueFramework.Message message) {
try {
String topic = message.getTopic();
Object payload = message.getPayload();

switch (topic) {
case "order.created":
handleOrderCreated(payload);
break;
case "order.paid":
handleOrderPaid(payload);
break;
case "order.cancelled":
handleOrderCancelled(payload);
break;
default:
System.out.println("Unknown topic: " + topic);
}
} catch (Exception e) {
System.err.println("Error handling message: " + e.getMessage());
e.printStackTrace();
}
}

private void handleOrderCreated(Object payload) {
// Handle the order-created message
System.out.println("Processing order created: " + payload);

// Reserve inventory
inventoryService.reserveInventory((String) payload);
}

private void handleOrderPaid(Object payload) {
// Handle the order-paid message
System.out.println("Processing order paid: " + payload);

// Confirm the inventory reservation
inventoryService.confirmReservation((String) payload);
}

private void handleOrderCancelled(Object payload) {
// Handle the order-cancelled message
System.out.println("Processing order cancelled: " + payload);

// Release the reserved inventory
inventoryService.releaseReservation((String) payload);
}

@Override
public String getTopic() {
return "order.*";
}

@Override
public int getPriority() {
return 2;
}
}
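
The two handlers above return patterns such as "user.*" and "order.*" from getTopic(), while InMemoryMessageQueue looks up subscribers by exact topic name. If wildcard subscriptions are wanted, a matcher along the following lines could be used when selecting handlers. This is a sketch under stated assumptions: `TopicMatcher` is a hypothetical helper, and the rule that `*` stands for exactly one dot-separated segment is an assumed convention (loosely modeled on AMQP-style topic patterns), not behavior defined by the framework above.

```java
// Hypothetical helper: matches dot-separated topics against patterns where "*" is one segment.
final class TopicMatcher {

    // Returns true when every pattern segment equals the topic segment or is the "*" wildcard.
    static boolean matches(String pattern, String topic) {
        String[] patternParts = pattern.split("\\.", -1);
        String[] topicParts = topic.split("\\.", -1);
        if (patternParts.length != topicParts.length) {
            return false; // "*" never spans more than one segment
        }
        for (int i = 0; i < patternParts.length; i++) {
            if (!patternParts[i].equals("*") && !patternParts[i].equals(topicParts[i])) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(matches("user.*", "user.created"));         // true
        System.out.println(matches("user.*", "order.created"));        // false
        System.out.println(matches("user.*", "user.profile.updated")); // false
    }
}
```

With such a matcher, a queue could keep its subscriptions keyed by pattern and, for each published message, deliver to every handler whose pattern matches the message topic. The cost is a scan over patterns per message, which is usually acceptable for a small number of subscriptions.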

2.2 Advanced Message Queue Features

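import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/**
* Advanced message queue features
* Implements message acknowledgment, retry, dead-letter queue, and related functions
*/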
public class AdvancedMessageQueueFeatures {

/**
* Message acknowledgment mechanism
*/
public static class MessageAcknowledgment {
private final Map<String, MessageQueueFramework.Message> pendingMessages;
private final Map<String, Long> messageTimestamps;
private final ScheduledExecutorService scheduler;

public MessageAcknowledgment() {
this.pendingMessages = new ConcurrentHashMap<>();
this.messageTimestamps = new ConcurrentHashMap<>();
this.scheduler = Executors.newScheduledThreadPool(1);

// Start the periodic timeout check
scheduler.scheduleAtFixedRate(this::checkTimeouts, 0, 30, TimeUnit.SECONDS);
}

public void sendMessage(MessageQueueFramework.Message message, MessageQueueFramework.MessageHandler handler) {
String messageId = message.getId();
pendingMessages.put(messageId, message);
messageTimestamps.put(messageId, System.currentTimeMillis());

try {
handler.handle(message);
acknowledge(messageId);
} catch (Exception e) {
// Processing failed: the message stays pending until the timeout check requeues it
System.err.println("Message processing failed: " + e.getMessage());
}
}

public void acknowledge(String messageId) {
pendingMessages.remove(messageId);
messageTimestamps.remove(messageId);
}

public void reject(String messageId, boolean requeue) {
MessageQueueFramework.Message message = pendingMessages.remove(messageId);
messageTimestamps.remove(messageId);

if (requeue && message != null) {
// Put the message back on the queue
requeueMessage(message);
}
}

private void requeueMessage(MessageQueueFramework.Message message) {
// Requeue logic (placeholder: only logs the message ID)
System.out.println("Requeuing message: " + message.getId());
}

private void checkTimeouts() {
long currentTime = System.currentTimeMillis();
long timeout = 30000; // 30-second timeout

List<String> timeoutMessages = messageTimestamps.entrySet().stream()
.filter(entry -> currentTime - entry.getValue() > timeout)
.map(Map.Entry::getKey)
.collect(Collectors.toList());

for (String messageId : timeoutMessages) {
System.err.println("Message timeout: " + messageId);
reject(messageId, true);
}
}
}

/**
* Message retry mechanism
*/
public static class MessageRetry {
private final Map<String, Integer> retryCounts;
private final Map<String, Long> retryTimestamps;
private final int maxRetries;
private final long retryDelay;

public MessageRetry(int maxRetries, long retryDelay) {
this.retryCounts = new ConcurrentHashMap<>();
this.retryTimestamps = new ConcurrentHashMap<>();
this.maxRetries = maxRetries;
this.retryDelay = retryDelay;
}

public boolean shouldRetry(String messageId) {
int retryCount = retryCounts.getOrDefault(messageId, 0);
return retryCount < maxRetries;
}

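public void recordRetry(String messageId) {
// merge() increments atomically; a separate get-then-put could lose updates under concurrency
retryCounts.merge(messageId, 1, Integer::sum);
retryTimestamps.put(messageId, System.currentTimeMillis());
}

public long getRetryDelay(String messageId) {
int retryCount = retryCounts.getOrDefault(messageId, 0);
// Exponential backoff; the exponent is capped so the delay cannot overflow for realistic base delays
int exponent = Math.min(retryCount, 20);
return retryDelay * (1L << exponent);
}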

public void resetRetry(String messageId) {
retryCounts.remove(messageId);
retryTimestamps.remove(messageId);
}
}

/**
* Dead-letter queue
*/
public static class DeadLetterQueue {
private final BlockingQueue<MessageQueueFramework.Message> deadLetterMessages;
private final MessageQueueFramework.MessageHandler deadLetterHandler;

public DeadLetterQueue(MessageQueueFramework.MessageHandler deadLetterHandler) {
this.deadLetterMessages = new LinkedBlockingQueue<>();
this.deadLetterHandler = deadLetterHandler;
}

public void sendToDeadLetter(MessageQueueFramework.Message message, String reason) {
// Record the dead-letter reason in the message headers. The Message interface exposes
// headers only through getHeaders(), so write to that map directly.
message.getHeaders().put("deadLetterReason", reason);
message.getHeaders().put("deadLetterTimestamp", System.currentTimeMillis());

deadLetterMessages.offer(message);

// Handle the dead-letter message asynchronously
CompletableFuture.runAsync(() -> {
try {
deadLetterHandler.handle(message);
} catch (Exception e) {
System.err.println("Error processing dead letter message: " + e.getMessage());
}
});
}

public List<MessageQueueFramework.Message> getDeadLetterMessages() {
return new ArrayList<>(deadLetterMessages);
}

public int getDeadLetterCount() {
return deadLetterMessages.size();
}
}

/**
* Message filter
*/
public static class MessageFilter {
private final Map<String, Predicate<MessageQueueFramework.Message>> filters;

public MessageFilter() {
this.filters = new ConcurrentHashMap<>();
}

public void addFilter(String topic, Predicate<MessageQueueFramework.Message> filter) {
filters.put(topic, filter);
}

public boolean shouldProcess(String topic, MessageQueueFramework.Message message) {
Predicate<MessageQueueFramework.Message> filter = filters.get(topic);
return filter == null || filter.test(message);
}

public void removeFilter(String topic) {
filters.remove(topic);
}
}

/**
* Message router
*/
public static class MessageRouter {
private final Map<String, List<String>> routingRules;
private final MessageQueueFramework.MessageQueue messageQueue;

public MessageRouter(MessageQueueFramework.MessageQueue messageQueue) {
this.routingRules = new ConcurrentHashMap<>();
this.messageQueue = messageQueue;
}

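public void addRoutingRule(String sourceTopic, String targetTopic) {
// CopyOnWriteArrayList keeps routeMessage() safe while rules are added or removed concurrently
routingRules.computeIfAbsent(sourceTopic, k -> new CopyOnWriteArrayList<>()).add(targetTopic);
}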

public void routeMessage(MessageQueueFramework.Message message) {
List<String> targetTopics = routingRules.get(message.getTopic());
if (targetTopics != null) {
for (String targetTopic : targetTopics) {
MessageQueueFramework.Message routedMessage = new MessageQueueFramework.MessageImpl(targetTopic, message.getPayload());
messageQueue.publish(routedMessage);
}
}
}

public void removeRoutingRule(String sourceTopic, String targetTopic) {
List<String> targetTopics = routingRules.get(sourceTopic);
if (targetTopics != null) {
targetTopics.remove(targetTopic);
}
}
}

/**
* Message compression
*/
public static class MessageCompression {
private final CompressionAlgorithm algorithm;

public MessageCompression(CompressionAlgorithm algorithm) {
this.algorithm = algorithm;
}

public byte[] compress(Object payload) {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
// try-with-resources closes the object stream even if serialization fails
try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
oos.writeObject(payload);
}
return algorithm.compress(baos.toByteArray());
} catch (Exception e) {
throw new RuntimeException("Failed to compress message", e);
}
}

public Object decompress(byte[] compressedData) {
try {
byte[] data = algorithm.decompress(compressedData);
// Java deserialization should only be applied to data from trusted sources
try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
return ois.readObject();
}
} catch (Exception e) {
throw new RuntimeException("Failed to decompress message", e);
}
}
}

/**
* Compression algorithm interface
*/
public interface CompressionAlgorithm {
byte[] compress(byte[] data);
byte[] decompress(byte[] compressedData);
}

/**
* GZIP compression algorithm implementation
*/
public static class GzipCompression implements CompressionAlgorithm {
@Override
public byte[] compress(byte[] data) {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
// Closing the GZIP stream writes the trailer, so read the bytes only afterwards
try (GZIPOutputStream gzos = new GZIPOutputStream(baos)) {
gzos.write(data);
}
return baos.toByteArray();
} catch (Exception e) {
throw new RuntimeException("GZIP compression failed", e);
}
}

@Override
public byte[] decompress(byte[] compressedData) {
try (GZIPInputStream gzis = new GZIPInputStream(new ByteArrayInputStream(compressedData));
ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
byte[] buffer = new byte[1024];
int len;
while ((len = gzis.read(buffer)) != -1) {
baos.write(buffer, 0, len);
}
return baos.toByteArray();
} catch (Exception e) {
throw new RuntimeException("GZIP decompression failed", e);
}
}
}
}
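
MessageRetry and DeadLetterQueue are defined above as separate building blocks. The sketch below illustrates one possible way to combine the two ideas: a message is retried with capped exponential backoff, and once the retry budget is exhausted it is moved to a dead-letter list. `RetryWithDeadLetterSketch`, `backoffMillis`, and `deliver` are hypothetical names, the dead-letter store is reduced to a plain list, and the actual waiting between attempts is left out so the example runs instantly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: retry with capped exponential backoff, then dead-letter.
final class RetryWithDeadLetterSketch {

    // Delay before the given attempt (1-based): base, 2*base, 4*base, ... capped at maxDelayMillis.
    static long backoffMillis(long baseDelayMillis, int attempt, long maxDelayMillis) {
        int shift = Math.min(Math.max(attempt - 1, 0), 62);
        if (baseDelayMillis > (maxDelayMillis >> shift)) {
            return maxDelayMillis; // doubling would exceed the cap (also guards against overflow)
        }
        return baseDelayMillis << shift;
    }

    // Tries the handler once plus up to maxRetries retries; returns the number of attempts made.
    // When every attempt fails, the message is appended to deadLetters.
    static int deliver(String message, Consumer<String> handler, int maxRetries, List<String> deadLetters) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                handler.accept(message);
                return attempts;
            } catch (RuntimeException e) {
                if (attempts > maxRetries) {
                    deadLetters.add(message);
                    return attempts;
                }
                // A real consumer would wait backoffMillis(base, attempts, max) here before retrying.
            }
        }
    }

    public static void main(String[] args) {
        for (int attempt = 1; attempt <= 5; attempt++) {
            System.out.println("attempt " + attempt + " -> " + backoffMillis(100, attempt, 1000) + " ms");
        }
        List<String> deadLetters = new ArrayList<>();
        int attempts = deliver("m1", m -> { throw new IllegalStateException("boom"); }, 2, deadLetters);
        System.out.println(attempts + " attempts, dead letters: " + deadLetters);
    }
}
```

Capping the delay keeps a long-failing message from being postponed indefinitely, and bounding the retry count ensures that a permanently failing message ends up in the dead-letter store, where it can be inspected, instead of blocking the queue.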

3. Event Bus Implementation

3.1 Event Bus Framework

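import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
* Event bus framework
* Implements the core mechanism of event-driven communication
*/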
public class EventBusFramework {

/**
* Event interface
*/
public interface Event {
String getEventId();
String getEventType();
long getTimestamp();
String getSource();
Map<String, Object> getMetadata();
}

/**
* Event implementation
*/
public static class EventImpl implements Event {
private String eventId;
private String eventType;
private long timestamp;
private String source;
private Map<String, Object> metadata;
private Object payload;

public EventImpl(String eventType, String source, Object payload) {
this.eventId = UUID.randomUUID().toString();
this.eventType = eventType;
this.timestamp = System.currentTimeMillis();
this.source = source;
this.payload = payload;
this.metadata = new HashMap<>();
}

// getters and setters
public String getEventId() { return eventId; }
public String getEventType() { return eventType; }
public long getTimestamp() { return timestamp; }
public String getSource() { return source; }
public Map<String, Object> getMetadata() { return metadata; }
public Object getPayload() { return payload; }

public void setMetadata(String key, Object value) {
metadata.put(key, value);
}

public Object getMetadata(String key) {
return metadata.get(key);
}
}

/**
* Event handler interface
*/
public interface EventHandler {
void handle(Event event);
String getEventType();
int getPriority();
boolean canHandle(Event event);
}

/**
* Event bus
*/
public static class EventBus {
private final Map<String, List<EventHandler>> handlers;
private final ExecutorService executorService;
private final EventStore eventStore;
private final EventFilter eventFilter;
private final AtomicBoolean running;

public EventBus(EventStore eventStore) {
this.handlers = new ConcurrentHashMap<>();
this.executorService = Executors.newCachedThreadPool();
this.eventStore = eventStore;
this.eventFilter = new EventFilter();
this.running = new AtomicBoolean(false);
}

public void start() {
running.set(true);
}

public void stop() {
running.set(false);
executorService.shutdown();
}

public void publish(Event event) {
if (!running.get()) {
throw new IllegalStateException("Event bus is not running");
}

// Store the event
eventStore.store(event);

// Apply the event filters
if (!eventFilter.shouldProcess(event)) {
return;
}

// Process the event asynchronously
executorService.submit(() -> processEvent(event));
}

public void publishSync(Event event) {
if (!running.get()) {
throw new IllegalStateException("Event bus is not running");
}

// Store the event
eventStore.store(event);

// Apply the event filters
if (!eventFilter.shouldProcess(event)) {
return;
}

// Process the event synchronously on the caller's thread
processEvent(event);
}

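public void subscribe(String eventType, EventHandler handler) {
// Copy-on-write: build a new list sorted by priority so dispatch never sees a list mid-sort
handlers.compute(eventType, (k, existing) -> {
List<EventHandler> updated = existing == null ? new ArrayList<>() : new ArrayList<>(existing);
updated.add(handler);
updated.sort(Comparator.comparingInt(EventHandler::getPriority));
return updated;
});
}

public void unsubscribe(String eventType, EventHandler handler) {
handlers.computeIfPresent(eventType, (k, existing) -> {
List<EventHandler> updated = new ArrayList<>(existing);
updated.remove(handler);
return updated;
});
}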

private void processEvent(Event event) {
List<EventHandler> eventHandlers = handlers.get(event.getEventType());
if (eventHandlers != null) {
for (EventHandler handler : eventHandlers) {
if (handler.canHandle(event)) {
try {
handler.handle(event);
} catch (Exception e) {
System.err.println("Error handling event: " + e.getMessage());
e.printStackTrace();
}
}
}
}
}

public EventFilter getEventFilter() {
return eventFilter;
}
}

/**
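* Event filter
*/
public static class EventFilter {
private final List<Predicate<Event>> filters;

public EventFilter() {
// Thread-safe list: filters are read on every publish and may be changed concurrently
this.filters = new CopyOnWriteArrayList<>();
}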

public void addFilter(Predicate<Event> filter) {
filters.add(filter);
}

public boolean shouldProcess(Event event) {
return filters.stream().allMatch(filter -> filter.test(event));
}

public void removeFilter(Predicate<Event> filter) {
filters.remove(filter);
}

public void clearFilters() {
filters.clear();
}
}

/**
* Event store interface
*/
public interface EventStore {
void store(Event event);
List<Event> getEvents(String eventType);
List<Event> getEvents(String eventType, long fromTimestamp);
Event getEvent(String eventId);
void deleteEvent(String eventId);
}

/**
* In-memory event store implementation
*/
public static class InMemoryEventStore implements EventStore {
private final Map<String, Event> events;
private final Map<String, List<String>> eventTypeIndex;
private final List<Event> eventList;

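public InMemoryEventStore() {
this.events = new ConcurrentHashMap<>();
this.eventTypeIndex = new ConcurrentHashMap<>();
// Thread-safe list: store() can be called concurrently from EventBus.publish()
this.eventList = new CopyOnWriteArrayList<>();
}

@Override
public void store(Event event) {
// Index each event ID once, so replaying a stored event through the bus does not duplicate it
if (events.putIfAbsent(event.getEventId(), event) != null) {
return;
}
eventTypeIndex.computeIfAbsent(event.getEventType(), k -> new CopyOnWriteArrayList<>()).add(event.getEventId());
eventList.add(event);
}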

@Override
public List<Event> getEvents(String eventType) {
List<String> eventIds = eventTypeIndex.get(eventType);
if (eventIds == null) {
return new ArrayList<>();
}

return eventIds.stream()
.map(events::get)
.filter(Objects::nonNull)
.collect(Collectors.toList());
}

@Override
public List<Event> getEvents(String eventType, long fromTimestamp) {
return getEvents(eventType).stream()
.filter(event -> event.getTimestamp() >= fromTimestamp)
.collect(Collectors.toList());
}

@Override
public Event getEvent(String eventId) {
return events.get(eventId);
}

@Override
public void deleteEvent(String eventId) {
Event event = events.remove(eventId);
if (event != null) {
List<String> eventIds = eventTypeIndex.get(event.getEventType());
if (eventIds != null) {
eventIds.remove(eventId);
}
eventList.remove(event);
}
}

public List<Event> getAllEvents() {
return new ArrayList<>(eventList);
}
}

/**
* Event replay
*/
public static class EventReplay {
private final EventStore eventStore;
private final EventBus eventBus;

public EventReplay(EventStore eventStore, EventBus eventBus) {
this.eventStore = eventStore;
this.eventBus = eventBus;
}

public void replayEvents(String eventType) {
List<Event> events = eventStore.getEvents(eventType);
for (Event event : events) {
eventBus.publish(event);
}
}

public void replayEvents(String eventType, long fromTimestamp) {
List<Event> events = eventStore.getEvents(eventType, fromTimestamp);
for (Event event : events) {
eventBus.publish(event);
}
}

public void replayAllEvents() {
// Logic for replaying all events goes here (not implemented)
}
}
}

/**
* User event handler
*/
@Component
public class UserEventHandler implements EventBusFramework.EventHandler {
private final EmailService emailService;
private final NotificationService notificationService;
private final AuditService auditService;

public UserEventHandler(EmailService emailService,
NotificationService notificationService,
AuditService auditService) {
this.emailService = emailService;
this.notificationService = notificationService;
this.auditService = auditService;
}

@Override
public void handle(EventBusFramework.Event event) {
try {
switch (event.getEventType()) {
case "user.created":
handleUserCreated(event);
break;
case "user.updated":
handleUserUpdated(event);
break;
case "user.deleted":
handleUserDeleted(event);
break;
default:
System.out.println("Unknown event type: " + event.getEventType());
}
} catch (Exception e) {
System.err.println("Error handling event: " + e.getMessage());
e.printStackTrace();
}
}

private void handleUserCreated(EventBusFramework.Event event) {
// Send the welcome email
emailService.sendWelcomeEmail((String) event.getMetadata("email"), "User");

// Send a notification
notificationService.sendNotification("user.created", event.getEventId());

// Write the audit log entry
auditService.logEvent("USER_CREATED", event.getEventId(), event);
}

private void handleUserUpdated(EventBusFramework.Event event) {
// Send the update notification
notificationService.sendNotification("user.updated", event.getEventId());

// Write the audit log entry
auditService.logEvent("USER_UPDATED", event.getEventId(), event);
}

private void handleUserDeleted(EventBusFramework.Event event) {
// Send the deletion notification
notificationService.sendNotification("user.deleted", event.getEventId());

// Write the audit log entry
auditService.logEvent("USER_DELETED", event.getEventId(), event);
}

@Override
public String getEventType() {
return "user.*";
}

@Override
public int getPriority() {
return 1;
}

@Override
public boolean canHandle(EventBusFramework.Event event) {
return event.getEventType().startsWith("user.");
}
}

/**
* Order event handler
*/
@Component
public class OrderEventHandler implements EventBusFramework.EventHandler {
private final InventoryService inventoryService;
private final PaymentService paymentService;
private final ShippingService shippingService;
private final NotificationService notificationService;

public OrderEventHandler(InventoryService inventoryService,
PaymentService paymentService,
ShippingService shippingService,
NotificationService notificationService) {
this.inventoryService = inventoryService;
this.paymentService = paymentService;
this.shippingService = shippingService;
this.notificationService = notificationService;
}

@Override
public void handle(EventBusFramework.Event event) {
try {
switch (event.getEventType()) {
case "order.created":
handleOrderCreated(event);
break;
case "order.paid":
handleOrderPaid(event);
break;
case "order.shipped":
handleOrderShipped(event);
break;
case "order.delivered":
handleOrderDelivered(event);
break;
case "order.cancelled":
handleOrderCancelled(event);
break;
default:
System.out.println("Unknown event type: " + event.getEventType());
}
} catch (Exception e) {
System.err.println("Error handling event: " + e.getMessage());
e.printStackTrace();
}
}

private void handleOrderCreated(EventBusFramework.Event event) {
// Reserve inventory
inventoryService.reserveInventory((String) event.getMetadata("productId"),
(Integer) event.getMetadata("quantity"));

// Send the order confirmation notification
notificationService.sendNotification("order.created", event.getEventId());
}

private void handleOrderPaid(EventBusFramework.Event event) {
// Process the payment
paymentService.processPayment((String) event.getMetadata("paymentInfo"));

// Prepare the shipment
shippingService.prepareShipment(event.getEventId());

// Send the payment confirmation notification
notificationService.sendNotification("order.paid", event.getEventId());
}

private void handleOrderShipped(EventBusFramework.Event event) {
// Update inventory: the reserved stock leaves the warehouse
inventoryService.releaseReservedInventory((String) event.getMetadata("productId"),
(Integer) event.getMetadata("quantity"));

// Send the shipping notification
notificationService.sendShippingNotification(event.getEventId(),
(String) event.getMetadata("trackingNumber"));
}

private void handleOrderDelivered(EventBusFramework.Event event) {
// Send the delivery confirmation
notificationService.sendDeliveryConfirmation(event.getEventId());

// Trigger the review request flow
notificationService.sendReviewRequest(event.getEventId());
}

private void handleOrderCancelled(EventBusFramework.Event event) {
// Release the reserved inventory
inventoryService.releaseReservedInventory((String) event.getMetadata("productId"),
(Integer) event.getMetadata("quantity"));

// Process the refund, if a payment was recorded
if (event.getMetadata("paymentInfo") != null) {
paymentService.processRefund((String) event.getMetadata("paymentInfo"));
}

// Send the cancellation notification
notificationService.sendNotification("order.cancelled", event.getEventId());
}

@Override
public String getEventType() {
return "order.*";
}

@Override
public int getPriority() {
return 2;
}

@Override
public boolean canHandle(EventBusFramework.Event event) {
return event.getEventType().startsWith("order.");
}
}
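
Replaying stored events through `EventReplay` republishes them to the same handlers, so side effects such as the welcome email in `handleUserCreated` would run a second time. One common guard is to make handlers idempotent by remembering which event IDs have already been processed. The sketch below is a minimal in-memory illustration and is not part of the framework above: `IdempotentReplayDemo`, `SimpleEvent`, and `IdempotentHandler` are hypothetical names, and a real system would likely keep the processed IDs in durable storage.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class IdempotentReplayDemo {

    // Hypothetical stand-in for the article's Event type: only the ID and type are needed here.
    static final class SimpleEvent {
        final String eventId;
        final String eventType;

        SimpleEvent(String eventId, String eventType) {
            this.eventId = eventId;
            this.eventType = eventType;
        }
    }

    // Wraps a handler so that each event ID reaches the delegate at most once.
    static final class IdempotentHandler implements Consumer<SimpleEvent> {
        private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
        private final Consumer<SimpleEvent> delegate;

        IdempotentHandler(Consumer<SimpleEvent> delegate) {
            this.delegate = delegate;
        }

        @Override
        public void accept(SimpleEvent event) {
            // Set.add returns false when the ID is already present, i.e. for a replayed event.
            if (processedIds.add(event.eventId)) {
                delegate.accept(event);
            }
        }
    }

    // Delivers the same event twice and returns how often the side effect actually ran.
    static int deliverTwice() {
        AtomicInteger emailsSent = new AtomicInteger();
        IdempotentHandler handler = new IdempotentHandler(e -> emailsSent.incrementAndGet());
        SimpleEvent created = new SimpleEvent("evt-1", "user.created");
        handler.accept(created); // first delivery runs the side effect
        handler.accept(created); // replayed delivery is skipped
        return emailsSent.get();
    }

    public static void main(String[] args) {
        System.out.println("side effects executed: " + deliverTwice());
    }
}
```

Note that this sketch records the ID before the delegate runs, so a delivery that fails is not retried on replay; whether that trade-off is acceptable depends on the handler.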

4. Publish-Subscribe Pattern

4.1 Publish-Subscribe Framework

/**
* Publish-subscribe framework
* Implements the core functionality of the publish-subscribe pattern
*/
public class PubSubFramework {

/**
* Subscriber interface
*/
public interface Subscriber {
void onMessage(String topic, Object message);
String getSubscriberId();
List<String> getSubscribedTopics();
}

/**
* Publisher interface
*/
public interface Publisher {
void publish(String topic, Object message);
void publish(String topic, Object message, Map<String, Object> metadata);
}

/**
* Publish-subscribe manager
*/
public static class PubSubManager implements Publisher {
private final Map<String, List<Subscriber>> subscribers;
private final Map<String, Subscriber> subscriberRegistry;
private final ExecutorService executorService;
private final MessageQueueFramework.MessageQueue messageQueue;
private final AtomicBoolean running;

public PubSubManager(MessageQueueFramework.MessageQueue messageQueue) {
this.subscribers = new ConcurrentHashMap<>();
this.subscriberRegistry = new ConcurrentHashMap<>();
this.executorService = Executors.newCachedThreadPool();
this.messageQueue = messageQueue;
this.running = new AtomicBoolean(false);
}

public void start() {
running.set(true);
}

public void stop() {
running.set(false);
executorService.shutdown();
}

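public void subscribe(String topic, Subscriber subscriber) {
// Track whether this call created the topic entry, so the queue bridge is registered only once
AtomicBoolean firstSubscriber = new AtomicBoolean(false);
// A copy-on-write list lets distributeMessage iterate safely while subscriptions change
subscribers.computeIfAbsent(topic, k -> {
firstSubscriber.set(true);
return new java.util.concurrent.CopyOnWriteArrayList<>();
}).add(subscriber);
subscriberRegistry.put(subscriber.getSubscriberId(), subscriber);

// Subscribe to the message queue once per topic; registering a handler per
// subscriber would deliver every message to all subscribers multiple times
if (firstSubscriber.get()) {
messageQueue.subscribe(topic, new MessageQueueFramework.MessageHandler() {
@Override
public void handle(MessageQueueFramework.Message message) {
distributeMessage(topic, message.getPayload());
}

@Override
public String getTopic() {
return topic;
}

@Override
public int getPriority() {
return 1;
}
});
}
}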

public void unsubscribe(String topic, Subscriber subscriber) {
List<Subscriber> topicSubscribers = subscribers.get(topic);
if (topicSubscribers != null) {
topicSubscribers.remove(subscriber);
}
subscriberRegistry.remove(subscriber.getSubscriberId());
}

public void unsubscribeAll(Subscriber subscriber) {
for (String topic : subscriber.getSubscribedTopics()) {
unsubscribe(topic, subscriber);
}
}

@Override
public void publish(String topic, Object message) {
publish(topic, message, new HashMap<>());
}

@Override
public void publish(String topic, Object message, Map<String, Object> metadata) {
if (!running.get()) {
throw new IllegalStateException("PubSub manager is not running");
}

// Create the message
MessageQueueFramework.Message msg = new MessageQueueFramework.MessageImpl(topic, message);

// Attach the metadata as headers
for (Map.Entry<String, Object> entry : metadata.entrySet()) {
msg.setHeader(entry.getKey(), entry.getValue());
}

// Publish to the message queue
messageQueue.publish(msg);
}

private void distributeMessage(String topic, Object message) {
List<Subscriber> topicSubscribers = subscribers.get(topic);
if (topicSubscribers != null) {
for (Subscriber subscriber : topicSubscribers) {
executorService.submit(() -> {
try {
subscriber.onMessage(topic, message);
} catch (Exception e) {
System.err.println("Error delivering message to subscriber: " + e.getMessage());
e.printStackTrace();
}
});
}
}
}

public List<Subscriber> getSubscribers(String topic) {
return subscribers.getOrDefault(topic, new ArrayList<>());
}

public List<String> getTopics() {
return new ArrayList<>(subscribers.keySet());
}

public int getSubscriberCount(String topic) {
List<Subscriber> topicSubscribers = subscribers.get(topic);
return topicSubscribers != null ? topicSubscribers.size() : 0;
}
}

/**
* Topic filter
*/
public static class TopicFilter {
private final Map<String, Predicate<Object>> filters;

public TopicFilter() {
this.filters = new ConcurrentHashMap<>();
}

public void addFilter(String topic, Predicate<Object> filter) {
filters.put(topic, filter);
}

public boolean shouldProcess(String topic, Object message) {
Predicate<Object> filter = filters.get(topic);
return filter == null || filter.test(message);
}

public void removeFilter(String topic) {
filters.remove(topic);
}
}

/**
* Message router
*/
public static class MessageRouter {
private final Map<String, List<String>> routingRules;
private final PubSubManager pubSubManager;

public MessageRouter(PubSubManager pubSubManager) {
this.routingRules = new ConcurrentHashMap<>();
this.pubSubManager = pubSubManager;
}

public void addRoutingRule(String sourceTopic, String targetTopic) {
routingRules.computeIfAbsent(sourceTopic, k -> new ArrayList<>()).add(targetTopic);
}

public void routeMessage(String sourceTopic, Object message) {
List<String> targetTopics = routingRules.get(sourceTopic);
if (targetTopics != null) {
for (String targetTopic : targetTopics) {
pubSubManager.publish(targetTopic, message);
}
}
}

public void removeRoutingRule(String sourceTopic, String targetTopic) {
List<String> targetTopics = routingRules.get(sourceTopic);
if (targetTopics != null) {
targetTopics.remove(targetTopic);
}
}
}

/**
* Message persistence
*/
public static class MessagePersistence {
// Pairs each message with its own storage time so time-based queries filter per message
private static final class StoredMessage {
final long timestamp;
final Object payload;

StoredMessage(long timestamp, Object payload) {
this.timestamp = timestamp;
this.payload = payload;
}
}

private final Map<String, List<StoredMessage>> persistentMessages;

public MessagePersistence() {
this.persistentMessages = new ConcurrentHashMap<>();
}

public void storeMessage(String topic, Object message) {
persistentMessages.computeIfAbsent(topic, k -> new java.util.concurrent.CopyOnWriteArrayList<>())
.add(new StoredMessage(System.currentTimeMillis(), message));
}

public List<Object> getMessages(String topic) {
return persistentMessages.getOrDefault(topic, new ArrayList<>()).stream()
.map(stored -> stored.payload)
.collect(Collectors.toList());
}

public List<Object> getMessages(String topic, long fromTimestamp) {
// Filter on each message's own timestamp rather than on the topic's last write time
return persistentMessages.getOrDefault(topic, new ArrayList<>()).stream()
.filter(stored -> stored.timestamp >= fromTimestamp)
.map(stored -> stored.payload)
.collect(Collectors.toList());
}

public void clearMessages(String topic) {
persistentMessages.remove(topic);
}
}
}

/**
* User service subscriber
*/
@Component
public class UserServiceSubscriber implements PubSubFramework.Subscriber {
private final UserService userService;
private final String subscriberId;
private final List<String> subscribedTopics;

public UserServiceSubscriber(UserService userService) {
this.userService = userService;
this.subscriberId = "user-service-" + UUID.randomUUID().toString();
this.subscribedTopics = Arrays.asList("user.created", "user.updated", "user.deleted");
}

@Override
public void onMessage(String topic, Object message) {
try {
switch (topic) {
case "user.created":
handleUserCreated(message);
break;
case "user.updated":
handleUserUpdated(message);
break;
case "user.deleted":
handleUserDeleted(message);
break;
default:
System.out.println("Unknown topic: " + topic);
}
} catch (Exception e) {
System.err.println("Error processing message: " + e.getMessage());
e.printStackTrace();
}
}

private void handleUserCreated(Object message) {
// Handle the user-created message
System.out.println("User service processing user created: " + message);

// Run the business logic that follows user creation
userService.processUserCreated((String) message);
}

private void handleUserUpdated(Object message) {
// Handle the user-updated message
System.out.println("User service processing user updated: " + message);

// Run the business logic that follows a user update
userService.processUserUpdated((String) message);
}

private void handleUserDeleted(Object message) {
// Handle the user-deleted message
System.out.println("User service processing user deleted: " + message);

// Run the business logic that follows user deletion
userService.processUserDeleted((String) message);
}

@Override
public String getSubscriberId() {
return subscriberId;
}

@Override
public List<String> getSubscribedTopics() {
return subscribedTopics;
}
}

/**
* Order service subscriber
*/
@Component
public class OrderServiceSubscriber implements PubSubFramework.Subscriber {
private final OrderService orderService;
private final String subscriberId;
private final List<String> subscribedTopics;

public OrderServiceSubscriber(OrderService orderService) {
this.orderService = orderService;
this.subscriberId = "order-service-" + UUID.randomUUID().toString();
this.subscribedTopics = Arrays.asList("order.created", "order.paid", "order.cancelled");
}

@Override
public void onMessage(String topic, Object message) {
try {
switch (topic) {
case "order.created":
handleOrderCreated(message);
break;
case "order.paid":
handleOrderPaid(message);
break;
case "order.cancelled":
handleOrderCancelled(message);
break;
default:
System.out.println("Unknown topic: " + topic);
}
} catch (Exception e) {
System.err.println("Error processing message: " + e.getMessage());
e.printStackTrace();
}
}

private void handleOrderCreated(Object message) {
// Handle the order-created message
System.out.println("Order service processing order created: " + message);

// Run the business logic that follows order creation
orderService.processOrderCreated((String) message);
}

private void handleOrderPaid(Object message) {
// Handle the order-paid message
System.out.println("Order service processing order paid: " + message);

// Run the business logic that follows order payment
orderService.processOrderPaid((String) message);
}

private void handleOrderCancelled(Object message) {
// Handle the order-cancelled message
System.out.println("Order service processing order cancelled: " + message);

// Run the business logic that follows order cancellation
orderService.processOrderCancelled((String) message);
}

@Override
public String getSubscriberId() {
return subscriberId;
}

@Override
public List<String> getSubscribedTopics() {
return subscribedTopics;
}
}

/**
* Notification service subscriber
*/
@Component
public class NotificationServiceSubscriber implements PubSubFramework.Subscriber {
private final NotificationService notificationService;
private final String subscriberId;
private final List<String> subscribedTopics;

public NotificationServiceSubscriber(NotificationService notificationService) {
this.notificationService = notificationService;
this.subscriberId = "notification-service-" + UUID.randomUUID().toString();
this.subscribedTopics = Arrays.asList("user.*", "order.*");
}

@Override
public void onMessage(String topic, Object message) {
try {
// Send a different notification depending on the topic type
if (topic.startsWith("user.")) {
handleUserNotification(topic, message);
} else if (topic.startsWith("order.")) {
handleOrderNotification(topic, message);
}
} catch (Exception e) {
System.err.println("Error processing notification: " + e.getMessage());
e.printStackTrace();
}
}

private void handleUserNotification(String topic, Object message) {
// Handle user-related notifications
System.out.println("Notification service processing user notification: " + topic + " - " + message);

// Send the user notification
notificationService.sendUserNotification(topic, (String) message);
}

private void handleOrderNotification(String topic, Object message) {
// Handle order-related notifications
System.out.println("Notification service processing order notification: " + topic + " - " + message);

// Send the order notification
notificationService.sendOrderNotification(topic, (String) message);
}

@Override
public String getSubscriberId() {
return subscriberId;
}

@Override
public List<String> getSubscribedTopics() {
return subscribedTopics;
}
}
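
`NotificationServiceSubscriber` lists `user.*` and `order.*` as its topics, yet `PubSubManager` looks subscribers up by exact topic name, so a pattern subscription would only receive messages published to a topic literally called `user.*`. The following is a minimal sketch of pattern-aware matching that a manager could apply when distributing messages. It assumes that `*` matches exactly one dot-separated segment, which the framework above does not define; `TopicPatterns`, `matches`, and `matchingPatterns` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TopicPatterns {

    // Returns true when the topic matches the pattern segment by segment.
    // Assumption: "*" matches exactly one segment; every other segment must be equal.
    static boolean matches(String pattern, String topic) {
        String[] patternParts = pattern.split("\\.", -1);
        String[] topicParts = topic.split("\\.", -1);
        if (patternParts.length != topicParts.length) {
            return false;
        }
        for (int i = 0; i < patternParts.length; i++) {
            if (!patternParts[i].equals("*") && !patternParts[i].equals(topicParts[i])) {
                return false;
            }
        }
        return true;
    }

    // Selects the subscription patterns that should receive a message for the given topic.
    static List<String> matchingPatterns(List<String> patterns, String topic) {
        List<String> result = new ArrayList<>();
        for (String pattern : patterns) {
            if (matches(pattern, topic)) {
                result.add(pattern);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> patterns = Arrays.asList("user.*", "order.*", "user.created");
        System.out.println(matchingPatterns(patterns, "user.created")); // [user.*, user.created]
        System.out.println(matchingPatterns(patterns, "order.paid"));   // [order.*]
    }
}
```

A linear scan over all patterns is fine for a handful of subscriptions; with many patterns, a trie keyed by topic segment would probably be a better fit.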

5. Asynchronous Processing

5.1 Asynchronous Processing Framework

/**
* Asynchronous processing framework
* Implements asynchronous task processing and result callbacks
*/
public class AsyncProcessingFramework {

/**
* Asynchronous task interface
*/
public interface AsyncTask<T> {
T execute() throws Exception;
String getTaskId();
int getPriority();
long getTimeout();
}

/**
* Asynchronous task implementation
*/
public static class AsyncTaskImpl<T> implements AsyncTask<T> {
private String taskId;
private Callable<T> task;
private int priority;
private long timeout;

public AsyncTaskImpl(String taskId, Callable<T> task) {
this.taskId = taskId;
this.task = task;
this.priority = 0;
this.timeout = 30000; // default timeout of 30 seconds
}

public AsyncTaskImpl(String taskId, Callable<T> task, int priority, long timeout) {
this.taskId = taskId;
this.task = task;
this.priority = priority;
this.timeout = timeout;
}

@Override
public T execute() throws Exception {
return task.call();
}

@Override
public String getTaskId() {
return taskId;
}

@Override
public int getPriority() {
return priority;
}

@Override
public long getTimeout() {
return timeout;
}
}

/**
* Asynchronous result
*/
public static class AsyncResult<T> {
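private final String taskId;
// Written by worker and timeout threads, read by polling callers: volatile for visibility
private volatile T result;
private volatile Exception exception;
private volatile boolean completed;
private volatile long completionTime;

public AsyncResult(String taskId) {
this.taskId = taskId;
this.completed = false;
}

// The first completion wins; a later call (for example a late timeout) is ignored
public synchronized void setResult(T result) {
if (completed) {
return;
}
this.result = result;
this.completionTime = System.currentTimeMillis();
this.completed = true;
}

public synchronized void setException(Exception exception) {
if (completed) {
return;
}
this.exception = exception;
this.completionTime = System.currentTimeMillis();
this.completed = true;
}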

public boolean isCompleted() {
return completed;
}

public T getResult() {
return result;
}

public Exception getException() {
return exception;
}

public long getCompletionTime() {
return completionTime;
}

public String getTaskId() {
return taskId;
}
}

/**
* Asynchronous processor
*/
public static class AsyncProcessor {
private final ExecutorService executorService;
private final Map<String, AsyncResult<?>> results;
private final ScheduledExecutorService scheduler;
private final AtomicBoolean running;

public AsyncProcessor(int threadPoolSize) {
this.executorService = Executors.newFixedThreadPool(threadPoolSize);
this.results = new ConcurrentHashMap<>();
this.scheduler = Executors.newScheduledThreadPool(2);
this.running = new AtomicBoolean(false);
}

public void start() {
running.set(true);
}

public void stop() {
running.set(false);
executorService.shutdown();
scheduler.shutdown();
}

public <T> CompletableFuture<T> submit(AsyncTask<T> task) {
if (!running.get()) {
throw new IllegalStateException("Async processor is not running");
}

AsyncResult<T> result = new AsyncResult<>(task.getTaskId());
results.put(task.getTaskId(), result);

CompletableFuture<T> future = CompletableFuture.supplyAsync(() -> {
try {
T taskResult = task.execute();
result.setResult(taskResult);
return taskResult;
} catch (Exception e) {
result.setException(e);
throw new RuntimeException(e);
}
}, executorService);

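// Enforce the timeout: fail both the tracked result and the returned future.
// The worker thread itself is not interrupted and may still finish the task.
scheduler.schedule(() -> {
if (!result.isCompleted()) {
TimeoutException timeout = new TimeoutException("Task timeout: " + task.getTaskId());
result.setException(timeout);
future.completeExceptionally(timeout);
}
}, task.getTimeout(), TimeUnit.MILLISECONDS);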

return future;
}

public <T> AsyncResult<T> getResult(String taskId) {
@SuppressWarnings("unchecked")
AsyncResult<T> result = (AsyncResult<T>) results.get(taskId);
return result;
}

public <T> T getResult(String taskId, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
AsyncResult<T> result = getResult(taskId);
if (result == null) {
throw new IllegalArgumentException("Task not found: " + taskId);
}

long startTime = System.currentTimeMillis();
while (!result.isCompleted()) {
if (System.currentTimeMillis() - startTime > unit.toMillis(timeout)) {
throw new TimeoutException("Timeout waiting for result");
}
Thread.sleep(100);
}

if (result.getException() != null) {
throw new ExecutionException(result.getException());
}

return result.getResult();
}

public void removeResult(String taskId) {
results.remove(taskId);
}

public List<String> getCompletedTasks() {
return results.entrySet().stream()
.filter(entry -> entry.getValue().isCompleted())
.map(Map.Entry::getKey)
.collect(Collectors.toList());
}

public List<String> getPendingTasks() {
return results.entrySet().stream()
.filter(entry -> !entry.getValue().isCompleted())
.map(Map.Entry::getKey)
.collect(Collectors.toList());
}
}

/**
* Task queue
*/
public static class TaskQueue {
private final PriorityBlockingQueue<AsyncTask<?>> queue;
private final AsyncProcessor processor;
private final AtomicBoolean running;

public TaskQueue(AsyncProcessor processor) {
this.queue = new PriorityBlockingQueue<>(11, Comparator.comparingInt(AsyncTask::getPriority));
this.processor = processor;
this.running = new AtomicBoolean(false);
}

public void start() {
running.set(true);
// Start the task-processing thread
Thread taskProcessor = new Thread(this::processTasks);
taskProcessor.setDaemon(true);
taskProcessor.start();
}

public void stop() {
running.set(false);
}

public void enqueue(AsyncTask<?> task) {
queue.offer(task);
}

private void processTasks() {
while (running.get()) {
try {
AsyncTask<?> task = queue.take();
processor.submit(task);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}

public int getQueueSize() {
return queue.size();
}

public boolean isEmpty() {
return queue.isEmpty();
}
}

/**
* Task scheduler
*/
public static class TaskScheduler {
private final ScheduledExecutorService scheduler;
private final AsyncProcessor processor;
private final Map<String, ScheduledFuture<?>> scheduledTasks;

public TaskScheduler(AsyncProcessor processor) {
this.scheduler = Executors.newScheduledThreadPool(4);
this.processor = processor;
this.scheduledTasks = new ConcurrentHashMap<>();
}

public void schedule(AsyncTask<?> task, long delay, TimeUnit unit) {
ScheduledFuture<?> future = scheduler.schedule(() -> {
processor.submit(task);
}, delay, unit);

scheduledTasks.put(task.getTaskId(), future);
}

public void scheduleAtFixedRate(AsyncTask<?> task, long initialDelay, long period, TimeUnit unit) {
ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
processor.submit(task);
}, initialDelay, period, unit);

scheduledTasks.put(task.getTaskId(), future);
}

public void scheduleWithFixedDelay(AsyncTask<?> task, long initialDelay, long delay, TimeUnit unit) {
ScheduledFuture<?> future = scheduler.scheduleWithFixedDelay(() -> {
processor.submit(task);
}, initialDelay, delay, unit);

scheduledTasks.put(task.getTaskId(), future);
}

public void cancelTask(String taskId) {
ScheduledFuture<?> future = scheduledTasks.remove(taskId);
if (future != null) {
future.cancel(false);
}
}

public void shutdown() {
scheduler.shutdown();
}
}
}

/**
* User service asynchronous processor
*/
@Component
public class UserServiceAsyncProcessor {
private final AsyncProcessingFramework.AsyncProcessor processor;
private final AsyncProcessingFramework.TaskQueue taskQueue;
private final AsyncProcessingFramework.TaskScheduler taskScheduler;

public UserServiceAsyncProcessor() {
this.processor = new AsyncProcessingFramework.AsyncProcessor(10);
this.taskQueue = new AsyncProcessingFramework.TaskQueue(processor);
this.taskScheduler = new AsyncProcessingFramework.TaskScheduler(processor);

// Start the processor and the task queue
processor.start();
taskQueue.start();
}

public CompletableFuture<String> createUserAsync(String username, String email) {
AsyncProcessingFramework.AsyncTask<String> task = new AsyncProcessingFramework.AsyncTaskImpl<>(
"create-user-" + UUID.randomUUID().toString(),
() -> {
// Simulate the user creation process
Thread.sleep(2000);
return "User created: " + username;
},
1,
10000
);

return processor.submit(task);
}

public CompletableFuture<String> updateUserAsync(String userId, String email) {
AsyncProcessingFramework.AsyncTask<String> task = new AsyncProcessingFramework.AsyncTaskImpl<>(
"update-user-" + UUID.randomUUID().toString(),
() -> {
// Simulate the user update process
Thread.sleep(1500);
return "User updated: " + userId;
},
2,
8000
);

return processor.submit(task);
}

public CompletableFuture<String> deleteUserAsync(String userId) {
AsyncProcessingFramework.AsyncTask<String> task = new AsyncProcessingFramework.AsyncTaskImpl<>(
"delete-user-" + UUID.randomUUID().toString(),
() -> {
// Simulate the user deletion process
Thread.sleep(1000);
return "User deleted: " + userId;
},
3,
5000
);

return processor.submit(task);
}

public void scheduleUserCleanup() {
AsyncProcessingFramework.AsyncTask<String> task = new AsyncProcessingFramework.AsyncTaskImpl<>(
"user-cleanup-" + UUID.randomUUID().toString(),
() -> {
// Simulate the user cleanup process
Thread.sleep(5000);
return "User cleanup completed";
},
0,
30000
);

// Run the user cleanup once a day
taskScheduler.scheduleAtFixedRate(task, 0, 24, TimeUnit.HOURS);
}

public void shutdown() {
processor.stop();
taskQueue.stop();
taskScheduler.shutdown();
}
}

/**
* Order service asynchronous processor
*/
@Component
public class OrderServiceAsyncProcessor {
private final AsyncProcessingFramework.AsyncProcessor processor;
private final AsyncProcessingFramework.TaskQueue taskQueue;
private final AsyncProcessingFramework.TaskScheduler taskScheduler;

public OrderServiceAsyncProcessor() {
this.processor = new AsyncProcessingFramework.AsyncProcessor(15);
this.taskQueue = new AsyncProcessingFramework.TaskQueue(processor);
this.taskScheduler = new AsyncProcessingFramework.TaskScheduler(processor);

// Start the processor and the task queue
processor.start();
taskQueue.start();
}

public CompletableFuture<String> processOrderAsync(String orderId) {
AsyncProcessingFramework.AsyncTask<String> task = new AsyncProcessingFramework.AsyncTaskImpl<>(
"process-order-" + UUID.randomUUID().toString(),
() -> {
// Simulate the order processing
Thread.sleep(3000);
return "Order processed: " + orderId;
},
1,
15000
);

return processor.submit(task);
}

public CompletableFuture<String> calculateOrderTotalAsync(String orderId) {
AsyncProcessingFramework.AsyncTask<String> task = new AsyncProcessingFramework.AsyncTaskImpl<>(
"calculate-total-" + UUID.randomUUID().toString(),
() -> {
// Simulate the order total calculation
Thread.sleep(1000);
return "Order total calculated: " + orderId;
},
2,
5000
);

return processor.submit(task);
}

public CompletableFuture<String> generateInvoiceAsync(String orderId) {
AsyncProcessingFramework.AsyncTask<String> task = new AsyncProcessingFramework.AsyncTaskImpl<>(
"generate-invoice-" + UUID.randomUUID().toString(),
() -> {
// Simulate the invoice generation
Thread.sleep(2000);
return "Invoice generated: " + orderId;
},
3,
10000
);

return processor.submit(task);
}

public void scheduleOrderExpirationCheck() {
AsyncProcessingFramework.AsyncTask<String> task = new AsyncProcessingFramework.AsyncTaskImpl<>(
"order-expiration-check-" + UUID.randomUUID().toString(),
() -> {
// Simulate the order expiration check
Thread.sleep(2000);
return "Order expiration check completed";
},
0,
20000
);

// Check for expired orders once an hour
taskScheduler.scheduleAtFixedRate(task, 0, 1, TimeUnit.HOURS);
}

public void shutdown() {
processor.stop();
taskQueue.stop();
taskScheduler.shutdown();
}
}
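
`TaskQueue` orders its `PriorityBlockingQueue` with `Comparator.comparingInt(AsyncTask::getPriority)`, which means the task with the smallest priority number is dequeued first. Under that ordering, a cleanup task created with priority 0 would be taken before a user-creation task with priority 1. The sketch below illustrates this and shows how `reversed()` flips the order if larger numbers are meant to be more urgent. `PriorityOrderDemo` and its `Task` class are hypothetical stand-ins for `AsyncTask`; the task names simply mirror the ones used above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

class PriorityOrderDemo {

    // Hypothetical minimal task: a name plus a numeric priority.
    static final class Task {
        final String name;
        final int priority;

        Task(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }
    }

    // Fills a queue built with the given comparator and returns the task names in dequeue order.
    static List<String> drainOrder(Comparator<Task> comparator) {
        PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>(11, comparator);
        queue.offer(new Task("delete-user", 3));
        queue.offer(new Task("user-cleanup", 0));
        queue.offer(new Task("create-user", 1));

        List<String> order = new ArrayList<>();
        Task next;
        while ((next = queue.poll()) != null) {
            order.add(next.name);
        }
        return order;
    }

    public static void main(String[] args) {
        Comparator<Task> ascending = Comparator.comparingInt(t -> t.priority);
        // Smallest number first, as in TaskQueue
        System.out.println(drainOrder(ascending));            // [user-cleanup, create-user, delete-user]
        // Largest number first
        System.out.println(drainOrder(ascending.reversed())); // [delete-user, create-user, user-cleanup]
    }
}
```

Whichever convention is chosen, it is worth documenting it next to `getPriority()`, since the two readings lead to opposite scheduling behavior.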

6. Enterprise Indirect Communication Architecture

6.1 Enterprise Communication Manager

/**
* Enterprise indirect communication manager
* Integrates the message queue, event bus, publish-subscribe, and asynchronous processing
*/
@Component
public class EnterpriseIndirectCommunicationManager {
private final MessageQueueFramework.MessageQueue messageQueue;
private final EventBusFramework.EventBus eventBus;
private final PubSubFramework.PubSubManager pubSubManager;
private final AsyncProcessingFramework.AsyncProcessor asyncProcessor;
private final CommunicationMonitor communicationMonitor;
private final CommunicationConfig communicationConfig;

public EnterpriseIndirectCommunicationManager() {
// Initialize the message queue
this.messageQueue = new MessageQueueFramework.InMemoryMessageQueue();

// Initialize the event bus
EventBusFramework.EventStore eventStore = new EventBusFramework.InMemoryEventStore();
this.eventBus = new EventBusFramework.EventBus(eventStore);

// Initialize the publish-subscribe manager
this.pubSubManager = new PubSubFramework.PubSubManager(messageQueue);

// Initialize the asynchronous processor
this.asyncProcessor = new AsyncProcessingFramework.AsyncProcessor(20);

// Initialize the monitor
this.communicationMonitor = new CommunicationMonitor();

// Initialize the configuration
this.communicationConfig = new CommunicationConfig();

// Start all components
startComponents();
}

private void startComponents() {
messageQueue.start();
eventBus.start();
pubSubManager.start();
asyncProcessor.start();
communicationMonitor.start();
}

/**
* Publish a message
*/
public void publishMessage(String topic, Object payload) {
try {
MessageQueueFramework.Message message = new MessageQueueFramework.MessageImpl(topic, payload);
messageQueue.publish(message);

// 记录监控数据
communicationMonitor.recordMessagePublished(topic, payload);
} catch (Exception e) {
System.err.println("Error publishing message: " + e.getMessage());
communicationMonitor.recordError("message_publish", e);
}
}

/**
* 发布事件
*/
public void publishEvent(String eventType, String source, Object payload) {
try {
EventBusFramework.Event event = new EventBusFramework.EventImpl(eventType, source, payload);
eventBus.publish(event);

// 记录监控数据
communicationMonitor.recordEventPublished(eventType, payload);
} catch (Exception e) {
System.err.println("Error publishing event: " + e.getMessage());
communicationMonitor.recordError("event_publish", e);
}
}

/**
* 发布订阅消息
*/
public void publishPubSub(String topic, Object message) {
try {
pubSubManager.publish(topic, message);

// 记录监控数据
communicationMonitor.recordPubSubPublished(topic, message);
} catch (Exception e) {
System.err.println("Error publishing pub/sub message: " + e.getMessage());
communicationMonitor.recordError("pubsub_publish", e);
}
}

/**
* 提交异步任务
*/
public <T> CompletableFuture<T> submitAsyncTask(AsyncProcessingFramework.AsyncTask<T> task) {
try {
CompletableFuture<T> future = asyncProcessor.submit(task);

// 记录监控数据
communicationMonitor.recordAsyncTaskSubmitted(task.getTaskId());

return future;
} catch (Exception e) {
System.err.println("Error submitting async task: " + e.getMessage());
communicationMonitor.recordError("async_task_submit", e);
throw e;
}
}

    /**
     * Subscribes a handler to a message-queue topic.
     */
    public void subscribeMessage(String topic, MessageQueueFramework.MessageHandler handler) {
        messageQueue.subscribe(topic, handler);
        communicationMonitor.recordMessageSubscription(topic);
    }

    /**
     * Subscribes a handler to an event type.
     */
    public void subscribeEvent(String eventType, EventBusFramework.EventHandler handler) {
        eventBus.subscribe(eventType, handler);
        communicationMonitor.recordEventSubscription(eventType);
    }

    /**
     * Subscribes a subscriber to a pub/sub topic.
     */
    public void subscribePubSub(String topic, PubSubFramework.Subscriber subscriber) {
        pubSubManager.subscribe(topic, subscriber);
        communicationMonitor.recordPubSubSubscription(topic);
    }

    /**
     * Collects a status snapshot from every component.
     */
    public CommunicationStatus getCommunicationStatus() {
        CommunicationStatus status = new CommunicationStatus();

        // Message queue status
        status.setMessageQueueStatus(messageQueue.getStatus());

        // Event bus status. Note: a non-null event filter is only a rough proxy
        // for "running"; it does not reflect whether start() or stop() was called.
        status.setEventBusRunning(eventBus.getEventFilter() != null);

        // Pub/sub topics
        status.setPubSubTopics(pubSubManager.getTopics());

        // Async processing status
        status.setCompletedTasks(asyncProcessor.getCompletedTasks());
        status.setPendingTasks(asyncProcessor.getPendingTasks());

        // Monitoring counters
        status.setMonitoringData(communicationMonitor.getMonitoringData());

        return status;
    }

    /**
     * Replaces the current communication parameters with those of the given config.
     */
    public void configureCommunication(CommunicationConfig config) {
        this.communicationConfig.update(config);

        // Apply the new configuration
        applyConfiguration();
    }

    private void applyConfiguration() {
        // Placeholder: only logs the configuration; running components are not reconfigured
        System.out.println("Applying communication configuration: " + communicationConfig);
    }

    /**
     * Shuts down the communication manager and all of its components.
     */
    public void shutdown() {
        messageQueue.stop();
        eventBus.stop();
        pubSubManager.stop();
        asyncProcessor.stop();
        communicationMonitor.stop();
    }
}

/**
 * Communication monitor: thread-safe counters for published messages,
 * subscriptions, async tasks, and errors.
 */
public class CommunicationMonitor {
    private final Map<String, Long> messageCounts;
    private final Map<String, Long> eventCounts;
    private final Map<String, Long> pubSubCounts;
    private final Map<String, Long> asyncTaskCounts;
    private final List<Exception> errors;
    private final AtomicBoolean running;

    public CommunicationMonitor() {
        this.messageCounts = new ConcurrentHashMap<>();
        this.eventCounts = new ConcurrentHashMap<>();
        this.pubSubCounts = new ConcurrentHashMap<>();
        this.asyncTaskCounts = new ConcurrentHashMap<>();
        // Note: this list is never trimmed, so it grows with every recorded error
        this.errors = Collections.synchronizedList(new ArrayList<>());
        this.running = new AtomicBoolean(false);
    }

    public void start() {
        running.set(true);
    }

    public void stop() {
        running.set(false);
    }

    // merge(key, 1L, Long::sum) is an atomic increment on a ConcurrentHashMap
    public void recordMessagePublished(String topic, Object payload) {
        messageCounts.merge(topic, 1L, Long::sum);
    }

    public void recordEventPublished(String eventType, Object payload) {
        eventCounts.merge(eventType, 1L, Long::sum);
    }

    public void recordPubSubPublished(String topic, Object message) {
        pubSubCounts.merge(topic, 1L, Long::sum);
    }

    public void recordAsyncTaskSubmitted(String taskId) {
        // Only the total is tracked; the task id itself is not stored
        asyncTaskCounts.merge("submitted", 1L, Long::sum);
    }

    // Subscription counts share a map with publish counts under a "_subscriptions" key suffix
    public void recordMessageSubscription(String topic) {
        messageCounts.merge(topic + "_subscriptions", 1L, Long::sum);
    }

    public void recordEventSubscription(String eventType) {
        eventCounts.merge(eventType + "_subscriptions", 1L, Long::sum);
    }

    public void recordPubSubSubscription(String topic) {
        pubSubCounts.merge(topic + "_subscriptions", 1L, Long::sum);
    }

    public void recordError(String operation, Exception error) {
        errors.add(error);
        System.err.println("Communication error in " + operation + ": " + error.getMessage());
    }

    /** Returns a point-in-time copy of all counters. */
    public Map<String, Object> getMonitoringData() {
        Map<String, Object> data = new HashMap<>();
        data.put("messageCounts", new HashMap<>(messageCounts));
        data.put("eventCounts", new HashMap<>(eventCounts));
        data.put("pubSubCounts", new HashMap<>(pubSubCounts));
        data.put("asyncTaskCounts", new HashMap<>(asyncTaskCounts));
        data.put("errorCount", errors.size());
        data.put("running", running.get());
        return data;
    }
}

/**
 * Communication configuration.
 * Timeouts are presumably in milliseconds; the original does not state the unit.
 */
public class CommunicationConfig {
    private int messageQueueThreads;
    private int eventBusThreads;
    private int pubSubThreads;
    private int asyncProcessorThreads;
    private long messageTimeout;
    private long eventTimeout;
    private long asyncTaskTimeout;
    private boolean enableCompression;
    private boolean enableEncryption;

    public CommunicationConfig() {
        this.messageQueueThreads = 10;
        this.eventBusThreads = 5;
        this.pubSubThreads = 8;
        this.asyncProcessorThreads = 20;
        this.messageTimeout = 30000;
        this.eventTimeout = 15000;
        this.asyncTaskTimeout = 60000;
        this.enableCompression = false;
        this.enableEncryption = false;
    }

    /** Copies every field from the given config into this one. */
    public void update(CommunicationConfig config) {
        this.messageQueueThreads = config.messageQueueThreads;
        this.eventBusThreads = config.eventBusThreads;
        this.pubSubThreads = config.pubSubThreads;
        this.asyncProcessorThreads = config.asyncProcessorThreads;
        this.messageTimeout = config.messageTimeout;
        this.eventTimeout = config.eventTimeout;
        this.asyncTaskTimeout = config.asyncTaskTimeout;
        this.enableCompression = config.enableCompression;
        this.enableEncryption = config.enableEncryption;
    }

    // Added so that logging the config prints its values instead of an object hash
    @Override
    public String toString() {
        return "CommunicationConfig{messageQueueThreads=" + messageQueueThreads
                + ", eventBusThreads=" + eventBusThreads
                + ", pubSubThreads=" + pubSubThreads
                + ", asyncProcessorThreads=" + asyncProcessorThreads
                + ", messageTimeout=" + messageTimeout
                + ", eventTimeout=" + eventTimeout
                + ", asyncTaskTimeout=" + asyncTaskTimeout
                + ", enableCompression=" + enableCompression
                + ", enableEncryption=" + enableEncryption + "}";
    }

    // Getters and setters
    public int getMessageQueueThreads() { return messageQueueThreads; }
    public void setMessageQueueThreads(int messageQueueThreads) { this.messageQueueThreads = messageQueueThreads; }
    public int getEventBusThreads() { return eventBusThreads; }
    public void setEventBusThreads(int eventBusThreads) { this.eventBusThreads = eventBusThreads; }
    public int getPubSubThreads() { return pubSubThreads; }
    public void setPubSubThreads(int pubSubThreads) { this.pubSubThreads = pubSubThreads; }
    public int getAsyncProcessorThreads() { return asyncProcessorThreads; }
    public void setAsyncProcessorThreads(int asyncProcessorThreads) { this.asyncProcessorThreads = asyncProcessorThreads; }
    public long getMessageTimeout() { return messageTimeout; }
    public void setMessageTimeout(long messageTimeout) { this.messageTimeout = messageTimeout; }
    public long getEventTimeout() { return eventTimeout; }
    public void setEventTimeout(long eventTimeout) { this.eventTimeout = eventTimeout; }
    public long getAsyncTaskTimeout() { return asyncTaskTimeout; }
    public void setAsyncTaskTimeout(long asyncTaskTimeout) { this.asyncTaskTimeout = asyncTaskTimeout; }
    public boolean isEnableCompression() { return enableCompression; }
    public void setEnableCompression(boolean enableCompression) { this.enableCompression = enableCompression; }
    public boolean isEnableEncryption() { return enableEncryption; }
    public void setEnableEncryption(boolean enableEncryption) { this.enableEncryption = enableEncryption; }
}

/**
 * Communication status: a snapshot assembled by getCommunicationStatus().
 */
public class CommunicationStatus {
    private MessageQueueFramework.QueueStatus messageQueueStatus;
    private boolean eventBusRunning;
    private List<String> pubSubTopics;
    private List<String> completedTasks;
    private List<String> pendingTasks;
    private Map<String, Object> monitoringData;

    // Getters and setters
    public MessageQueueFramework.QueueStatus getMessageQueueStatus() { return messageQueueStatus; }
    public void setMessageQueueStatus(MessageQueueFramework.QueueStatus messageQueueStatus) { this.messageQueueStatus = messageQueueStatus; }
    public boolean isEventBusRunning() { return eventBusRunning; }
    public void setEventBusRunning(boolean eventBusRunning) { this.eventBusRunning = eventBusRunning; }
    public List<String> getPubSubTopics() { return pubSubTopics; }
    public void setPubSubTopics(List<String> pubSubTopics) { this.pubSubTopics = pubSubTopics; }
    public List<String> getCompletedTasks() { return completedTasks; }
    public void setCompletedTasks(List<String> completedTasks) { this.completedTasks = completedTasks; }
    public List<String> getPendingTasks() { return pendingTasks; }
    public void setPendingTasks(List<String> pendingTasks) { this.pendingTasks = pendingTasks; }
    public Map<String, Object> getMonitoringData() { return monitoringData; }
    public void setMonitoringData(Map<String, Object> monitoringData) { this.monitoringData = monitoringData; }
}
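
The monitor in the listing counts everything with `merge(key, 1L, Long::sum)` on a `ConcurrentHashMap`, which is what lets several publisher threads update the same counter without losing increments. The standalone sketch below illustrates just that property; the class name `MergeCounterSketch`, the `"orders"` key, and the thread counts are illustrative assumptions and are not part of the framework above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class MergeCounterSketch {
    /** Increments one counter from several threads and returns the final counts. */
    static Map<String, Long> countConcurrently(int threads, int incrementsPerThread) {
        Map<String, Long> counts = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    // merge runs atomically per key, so concurrent increments are not lost
                    counts.merge("orders", 1L, Long::sum);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countConcurrently(8, 10_000)); // prints {orders=80000}
    }
}
```

A plain `HashMap` with a get-then-put increment would typically lose updates under the same load, which is why the monitor can stay lock-free only as long as it sticks to `merge`.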

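7. Summary

This article examined indirect communication from an architect's perspective: message queues, the event bus, publish/subscribe, asynchronous processing, and best practices for an enterprise-level indirect-communication architecture.

Key technical points: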

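  1. Message queue

    • Message publishing and subscription, acknowledgement, retry
    • Dead-letter queues, message filtering, message routing
    • Message compression, persistent storage
  2. Event bus

    • Event publishing and subscription, event storage, event replay
    • Event filters, asynchronous event handling
    • Event handlers, event routing
  3. Publish/subscribe

    • Topic subscription, message distribution, subscriber management
    • Message routing, message persistence
    • Topic filters
  4. Asynchronous processing

    • Async task submission, result callbacks, timeout handling
    • Task queues, task scheduling, priority handling
    • Async result management, task monitoring
  5. Enterprise architecture

    • Unified communication management, monitoring and alerting, configuration management
    • Performance optimization, fault handling, scalability
    • Integration of multiple communication modes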
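
Of the points above, event storage and replay is perhaps the easiest to show in a few lines. The following is a minimal sketch, assuming an append-only in-memory log with store-assigned sequence numbers. `EventReplaySketch`, `StoredEvent`, and `InMemoryStore` are hypothetical names chosen for illustration and do not reproduce the `EventBusFramework` API from the earlier sections.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class EventReplaySketch {
    /** Immutable event; the sequence number is assigned by the store on append. */
    static final class StoredEvent {
        final long sequence;
        final String type;
        final String payload;

        StoredEvent(long sequence, String type, String payload) {
            this.sequence = sequence;
            this.type = type;
            this.payload = payload;
        }
    }

    /** Append-only store; replay re-delivers events in their original order. */
    static final class InMemoryStore {
        private final List<StoredEvent> log = new ArrayList<>();

        synchronized StoredEvent append(String type, String payload) {
            StoredEvent event = new StoredEvent(log.size() + 1, type, payload);
            log.add(event);
            return event;
        }

        /** Re-delivers events of the given type whose sequence is greater than afterSequence. */
        synchronized int replay(String type, long afterSequence, Consumer<StoredEvent> handler) {
            int delivered = 0;
            for (StoredEvent event : log) {
                if (event.sequence > afterSequence && event.type.equals(type)) {
                    handler.accept(event);
                    delivered++;
                }
            }
            return delivered;
        }
    }

    public static void main(String[] args) {
        InMemoryStore store = new InMemoryStore();
        store.append("order.created", "A");
        store.append("order.paid", "A");
        store.append("order.created", "B");

        List<String> seen = new ArrayList<>();
        int delivered = store.replay("order.created", 1, e -> seen.add(e.payload));
        System.out.println(delivered + " " + seen); // prints "1 [B]"
    }
}
```

Replaying from a sequence number rather than from the start lets a subscriber that recorded its last processed position catch up after a restart without reprocessing everything.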

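Architecture design principles:

  • Decoupling: indirect communication keeps systems loosely coupled
  • Reliability: acknowledgement, retry, and dead-letter queues guard against message loss
  • Scalability: support for horizontal scaling and load distribution
  • Maintainability: monitoring, debugging, and operations support
  • Performance: asynchronous processing, message compression, and similar techniques raise throughput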
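
The reliability principle combines three mechanisms: acknowledgement, bounded retry, and a dead-letter queue for messages that never succeed. The sketch below shows how they fit together under simplifying assumptions: a single-threaded consumer, no backoff between attempts, and a handler that returns `true` to acknowledge. `RetryDlqSketch`, `Outcome`, and `drain` are hypothetical names and are not taken from the `MessageQueueFramework` shown earlier.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Predicate;

final class RetryDlqSketch {
    /** Result of draining a queue: acknowledged messages and dead-lettered ones. */
    static final class Outcome {
        final List<String> acked = new ArrayList<>();
        final List<String> deadLetters = new ArrayList<>();
    }

    /**
     * Delivers each message up to maxAttempts times. The handler returns true to
     * acknowledge; returning false or throwing counts as a failed attempt.
     */
    static Outcome drain(Queue<String> queue, Predicate<String> handler, int maxAttempts) {
        Outcome outcome = new Outcome();
        String message;
        while ((message = queue.poll()) != null) {
            boolean acked = false;
            for (int attempt = 1; attempt <= maxAttempts && !acked; attempt++) {
                try {
                    acked = handler.test(message);
                } catch (RuntimeException e) {
                    // Treated as a negative acknowledgement; retry until attempts run out
                }
            }
            if (acked) {
                outcome.acked.add(message);
            } else {
                outcome.deadLetters.add(message); // retries exhausted: park for inspection
            }
        }
        return outcome;
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>(List.of("ok", "flaky", "poison"));
        int[] flakyCalls = {0};
        Outcome result = drain(queue, m -> {
            if (m.equals("poison")) throw new IllegalStateException("cannot process");
            if (m.equals("flaky")) return ++flakyCalls[0] >= 2; // succeeds on the second attempt
            return true;
        }, 3);
        System.out.println(result.acked + " " + result.deadLetters); // prints "[ok, flaky] [poison]"
    }
}
```

A production consumer would usually add a delay or exponential backoff between attempts and persist the dead-letter queue; both are left out here to keep the control flow visible.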

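As architects, we need to understand how indirect communication works and how it is implemented, know the characteristics of each communication mode, and choose the one that best fits the business requirements. The worked examples in this article are meant to show the role indirect communication plays in enterprise applications.

Indirect communication is a core technique of modern distributed system architecture: message queues, event buses, and publish/subscribe are the mechanisms that keep systems loosely coupled. Understanding what each mechanism guarantees, and what it does not, is the basis for designing a sound distributed architecture.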