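Episode 233: Spring Boot Large-Transaction Optimization in Practice: Transaction Splitting, Asynchronous Processing, and Distributed Transactions for the Enterprise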

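Preface

In today's high-concurrency, data-intensive enterprise applications, large transactions are a major bottleneck for performance and stability. Handled the traditional way, a large transaction tends to hold locks for a long time, tie up connections and other resources, and slow the whole system down, which hurts both user experience and availability. A Spring Boot based optimization architecture addresses this by splitting large transactions into smaller ones, moving work to asynchronous processing, coordinating distributed transactions, and managing transactions more deliberately. As microservices and distributed systems become the norm, building a scalable, high-performance transaction-processing framework is a core skill for enterprise architects.

This article examines the architecture and practical application of large-transaction optimization in Spring Boot, from transaction splitting to asynchronous processing and from distributed transactions to performance tuning, as technical guidance for building stable, efficient transaction processing.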

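I. Spring Boot Large-Transaction Optimization: Architecture Overview and Core Principles

1.1 Architecture Design

The optimization system uses a layered architecture. A request passes through transaction analysis, splitting, scheduling, asynchronous processing, distributed coordination, and result aggregation, supported by a monitoring layer.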

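graph TB
A[Large transaction request] --> B[Transaction analyzer]
B --> C[Transaction splitter]
C --> D[Transaction scheduler]
D --> E[Async processor]
E --> F[Distributed coordinator]
F --> G[Result aggregator]
G --> H[Response]

I[Splitting strategies] --> J[By business]
I --> K[By data]
I --> L[By time]
I --> M[By priority]

N[Async processing modes] --> O[Message queue]
N --> P[Event-driven]
N --> Q[Callback]
N --> R[Batch processing]

S[Distributed transactions] --> T[Two-phase commit]
S --> U[Saga pattern]
S --> V[TCC pattern]
S --> W[Eventual consistency]

X[Monitoring] --> Y[Transaction monitoring]
X --> Z[Performance monitoring]
X --> AA[Error monitoring]
X --> BB[Resource monitoring]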

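1.2 Core Features

1.2.1 Transaction Splitting Strategies

  • Business splitting: split a large transaction into several small ones along business-logic boundaries
  • Data splitting: split a large transaction into sub-transactions by data range
  • Time splitting: split a large transaction into stages along the time dimension
  • Priority splitting: order the processing of sub-transactions by business priority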
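
As a small illustration of data splitting, the sketch below cuts one large list of records into fixed-size batches so that each batch can be committed in its own short transaction. `BatchSplitter` and `partition` are hypothetical names introduced here, not part of the article's code, and the batch size would have to be tuned for the actual workload.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits one large unit of work into fixed-size batches. */
final class BatchSplitter {

    private BatchSplitter() {
    }

    /**
     * Partitions the items into consecutive batches of at most batchSize elements.
     * Each batch is meant to be processed in its own short transaction.
     */
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < items.size(); from += batchSize) {
            int to = Math.min(from + batchSize, items.size());
            // Copy the slice so each batch is independent of the source list.
            batches.add(new ArrayList<>(items.subList(from, to)));
        }
        return batches;
    }
}
```

With this approach a failure only rolls back the batch that was in flight, so the caller needs a way to resume or compensate the batches that already committed.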

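1.2.2 Asynchronous Processing

  • Message queue: hand the transaction to a message queue and process it asynchronously
  • Event-driven: trigger asynchronous processing from events
  • Callback: notify the caller through a callback once asynchronous processing finishes
  • Batch processing: process transactions in batches to raise throughput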
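
The callback idea can be sketched with the JDK's `CompletableFuture` alone: the work runs on a caller-supplied executor, and a completion handler turns the outcome into a status string. `AsyncCallbackSketch` and `submit` are illustrative names assumed for this example; a real system would more likely update a status record or call the client's callback URL.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

/** Hypothetical sketch: run work asynchronously and report the outcome in a completion callback. */
final class AsyncCallbackSketch {

    private AsyncCallbackSketch() {
    }

    /** Runs the work on the given executor and maps the outcome to a status string. */
    static <T> CompletableFuture<String> submit(Supplier<T> work, Executor executor) {
        return CompletableFuture.supplyAsync(work, executor)
                .handle((value, error) -> {
                    if (error == null) {
                        return "COMPLETED:" + value;
                    }
                    // supplyAsync wraps the thrown exception in a CompletionException.
                    Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                            ? error.getCause()
                            : error;
                    return "FAILED:" + cause.getMessage();
                });
    }
}
```

Passing an explicit executor keeps blocking work off the shared `ForkJoinPool.commonPool()`, which is what `supplyAsync` uses when no executor is given.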

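1.2.3 Distributed Transaction Coordination

  • Two-phase commit: the classic distributed transaction protocol
  • Saga pattern: a long-running transaction modeled as a sequence of local transactions with compensations
  • TCC pattern: the Try-Confirm-Cancel compensation pattern
  • Eventual consistency: message-based guarantee that all parties converge to a consistent state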
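
To make the Saga pattern concrete, here is a minimal in-memory sketch: steps run in order, and when one fails, the steps that already completed are compensated in reverse order. `SagaSketch`, `Step`, and the journal list are assumptions made for this illustration; a production Saga would also persist its progress so that compensation survives a crash.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

/** Hypothetical in-memory Saga runner; not a durable implementation. */
final class SagaSketch {

    /** One saga step: a forward action plus the compensation that undoes it. */
    interface Step {
        String name();

        void execute();

        void compensate();
    }

    private SagaSketch() {
    }

    /** Convenience factory that builds a step from two Runnables. */
    static Step step(String name, Runnable action, Runnable compensation) {
        return new Step() {
            @Override
            public String name() {
                return name;
            }

            @Override
            public void execute() {
                action.run();
            }

            @Override
            public void compensate() {
                compensation.run();
            }
        };
    }

    /**
     * Runs the steps in order and records what happened in the journal.
     * Returns true if every step succeeded; otherwise compensates the
     * completed steps in reverse order and returns false.
     */
    static boolean run(List<Step> steps, List<String> journal) {
        Deque<Step> completed = new ArrayDeque<>();
        for (Step step : steps) {
            try {
                step.execute();
                journal.add("done:" + step.name());
                completed.push(step);
            } catch (RuntimeException e) {
                journal.add("failed:" + step.name());
                // Undo the most recently completed step first.
                while (!completed.isEmpty()) {
                    Step done = completed.pop();
                    done.compensate();
                    journal.add("compensated:" + done.name());
                }
                return false;
            }
        }
        return true;
    }
}
```

The failing step itself is not compensated in this sketch, on the assumption that a failed local transaction has already rolled back its own changes.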

II. Core Implementation of Large-Transaction Optimization in Spring Boot

2.1 Transaction Splitting Service

// Transaction splitting service
@Service
@Slf4j
public class TransactionSplitService {

    @Autowired
    private TransactionAnalyzer transactionAnalyzer;

    @Autowired
    private TransactionExecutor transactionExecutor;

    @Autowired
    private TransactionMonitorService monitorService;

    /**
     * Splits a large transaction into sub-transactions, executes them, and aggregates the results.
     */
    public <T> CompletableFuture<TransactionResult<T>> splitLargeTransaction(
            LargeTransactionRequest<T> request) {

        long startTime = System.currentTimeMillis();
        String transactionId = generateTransactionId();

        try {
            // 1. Analyze the transaction
            TransactionAnalysis analysis = transactionAnalyzer.analyzeTransaction(request);

            // 2. Split the transaction. splitTransaction is not part of this listing; it presumably
            //    dispatches to splitByBusiness, splitByData, or splitByTime based on request.getSplitStrategy().
            List<SubTransaction> subTransactions = splitTransaction(analysis, request);

            // 3. Execute the sub-transactions
            CompletableFuture<List<SubTransactionResult>> subResults = executeSubTransactions(subTransactions);

            // 4. Aggregate the results
            CompletableFuture<TransactionResult<T>> finalResult = subResults.thenApply(results -> {
                try {
                    TransactionResult<T> aggregatedResult = aggregateResults(results, request);

                    // 5. Record monitoring data
                    long executionTime = System.currentTimeMillis() - startTime;
                    monitorService.recordTransactionExecution(transactionId, executionTime, subTransactions.size());

                    return aggregatedResult;

                } catch (Exception e) {
                    log.error("Failed to aggregate transaction results: transactionId={}", transactionId, e);
                    throw new RuntimeException("Failed to aggregate transaction results", e);
                }
            });

            return finalResult;

        } catch (Exception e) {
            log.error("Failed to split large transaction: transactionId={}", transactionId, e);
            // CompletableFuture.failedFuture requires Java 9 or later
            return CompletableFuture.failedFuture(e);
        }
    }

    /**
     * Splits a transaction by business module.
     */
    public List<SubTransaction> splitByBusiness(LargeTransactionRequest<?> request) {
        List<SubTransaction> subTransactions = new ArrayList<>();

        // 1. Identify the business modules involved
        List<BusinessModule> modules = identifyBusinessModules(request);

        // 2. Create one sub-transaction per module
        for (BusinessModule module : modules) {
            SubTransaction subTransaction = new SubTransaction();
            subTransaction.setId(generateSubTransactionId());
            subTransaction.setModule(module);
            subTransaction.setPriority(module.getPriority());
            subTransaction.setDependencies(module.getDependencies());

            subTransactions.add(subTransaction);
        }

        // 3. Order the sub-transactions by their dependencies
        return sortSubTransactionsByDependency(subTransactions);
    }

    /**
     * Splits a transaction by data shard.
     */
    public List<SubTransaction> splitByData(LargeTransactionRequest<?> request) {
        List<SubTransaction> subTransactions = new ArrayList<>();

        // 1. Look up the data shards
        List<DataShard> shards = getDataShards(request.getDataSource(), request.getShardKey());

        // 2. Create one sub-transaction per shard
        for (DataShard shard : shards) {
            SubTransaction subTransaction = new SubTransaction();
            subTransaction.setId(generateSubTransactionId());
            subTransaction.setDataShard(shard);
            subTransaction.setPriority(1); // all shard sub-transactions share the same priority

            subTransactions.add(subTransaction);
        }

        return subTransactions;
    }

    /**
     * Splits a transaction by time window.
     */
    public List<SubTransaction> splitByTime(LargeTransactionRequest<?> request) {
        List<SubTransaction> subTransactions = new ArrayList<>();

        // 1. Compute the time windows
        List<TimeWindow> windows = calculateTimeWindows(request.getTimeRange(), request.getWindowSize());

        // 2. Create one sub-transaction per time window
        for (TimeWindow window : windows) {
            SubTransaction subTransaction = new SubTransaction();
            subTransaction.setId(generateSubTransactionId());
            subTransaction.setTimeWindow(window);
            subTransaction.setPriority(window.getPriority());

            subTransactions.add(subTransaction);
        }

        return subTransactions;
    }

    /**
     * Executes the sub-transactions: priority groups run one after another,
     * and the sub-transactions inside a group run in parallel.
     * Only the priority decides the order here; the dependency order produced
     * by splitByBusiness is not consulted.
     */
    private CompletableFuture<List<SubTransactionResult>> executeSubTransactions(
            List<SubTransaction> subTransactions) {

        // 1. Group by priority
        Map<Integer, List<SubTransaction>> priorityGroups = subTransactions.stream()
                .collect(Collectors.groupingBy(SubTransaction::getPriority));

        // 2. Run the groups in ascending priority order (a smaller number runs first)
        CompletableFuture<List<SubTransactionResult>> result = CompletableFuture.completedFuture(new ArrayList<>());

        List<Integer> sortedPriorities = priorityGroups.keySet().stream()
                .sorted()
                .collect(Collectors.toList());

        for (Integer priority : sortedPriorities) {
            List<SubTransaction> currentGroup = priorityGroups.get(priority);

            result = result.thenCompose(previousResults -> {
                // Run sub-transactions of the same priority in parallel
                List<CompletableFuture<SubTransactionResult>> futures = currentGroup.stream()
                        .map(this::executeSubTransaction)
                        .collect(Collectors.toList());

                return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                        .thenApply(v -> {
                            List<SubTransactionResult> currentResults = futures.stream()
                                    .map(CompletableFuture::join)
                                    .collect(Collectors.toList());

                            List<SubTransactionResult> allResults = new ArrayList<>(previousResults);
                            allResults.addAll(currentResults);
                            return allResults;
                        });
            });
        }

        return result;
    }

    /**
     * Executes a single sub-transaction. A failure is converted into a failed result
     * instead of an exception, so one failure does not abort the whole chain.
     */
    private CompletableFuture<SubTransactionResult> executeSubTransaction(SubTransaction subTransaction) {
        // supplyAsync without an executor runs on ForkJoinPool.commonPool();
        // blocking database work usually deserves a dedicated thread pool.
        return CompletableFuture.supplyAsync(() -> {
            try {
                return transactionExecutor.executeSubTransaction(subTransaction);
            } catch (Exception e) {
                log.error("Sub-transaction failed: subTransactionId={}", subTransaction.getId(), e);
                return SubTransactionResult.failed(subTransaction.getId(), e.getMessage());
            }
        });
    }

    /**
     * Aggregates the sub-transaction results into a single result.
     * Sub-transactions that already committed are not rolled back here;
     * compensating them is left to the caller.
     */
    private <T> TransactionResult<T> aggregateResults(List<SubTransactionResult> results,
            LargeTransactionRequest<T> request) {
        TransactionResult<T> aggregatedResult = new TransactionResult<>();

        // 1. Fail the whole transaction if any sub-transaction failed
        List<SubTransactionResult> failedResults = results.stream()
                .filter(result -> !result.isSuccess())
                .collect(Collectors.toList());

        if (!failedResults.isEmpty()) {
            aggregatedResult.setSuccess(false);
            aggregatedResult.setErrorMessage("Number of failed sub-transactions: " + failedResults.size());
            return aggregatedResult;
        }

        // 2. Merge the data. SubTransactionResult carries List<Object>, so each element is
        //    cast to T through the request's resultType (assumed to be non-null).
        List<T> allData = results.stream()
                .flatMap(result -> result.getData().stream())
                .map(request.getResultType()::cast)
                .collect(Collectors.toList());

        aggregatedResult.setSuccess(true);
        aggregatedResult.setData(allData);
        aggregatedResult.setSubTransactionCount(results.size());

        return aggregatedResult;
    }

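    /**
     * Identifies the business modules touched by the request.
     */
    private List<BusinessModule> identifyBusinessModules(LargeTransactionRequest<?> request) {
        // Placeholder: implement business-module identification
        return new ArrayList<>();
    }

    /**
     * Looks up the data shards for a data source and shard key.
     */
    private List<DataShard> getDataShards(String dataSource, String shardKey) {
        // Placeholder: implement shard lookup
        return new ArrayList<>();
    }

    /**
     * Computes the time windows covering the given time range.
     */
    private List<TimeWindow> calculateTimeWindows(TimeRange timeRange, Duration windowSize) {
        // Placeholder: implement time-window calculation
        return new ArrayList<>();
    }

    /**
     * Orders sub-transactions by their dependencies.
     */
    private List<SubTransaction> sortSubTransactionsByDependency(List<SubTransaction> subTransactions) {
        // Placeholder: implement a topological sort
        return subTransactions;
    }

    /**
     * Generates a transaction ID. A timestamp plus a 4-digit random suffix is not
     * guaranteed to be unique under concurrency; a UUID is a safer choice.
     * RandomUtils comes from Apache Commons Lang 3.
     */
    private String generateTransactionId() {
        return "TX" + System.currentTimeMillis() + RandomUtils.nextInt(1000, 9999);
    }

    /**
     * Generates a sub-transaction ID (same uniqueness caveat as above).
     */
    private String generateSubTransactionId() {
        return "STX" + System.currentTimeMillis() + RandomUtils.nextInt(1000, 9999);
    }
}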

// Large transaction request
public class LargeTransactionRequest<T> {
    private String transactionId;
    private String dataSource;
    private String shardKey;
    private TimeRange timeRange;
    private Duration windowSize;
    private TransactionSplitStrategy splitStrategy;
    private Class<T> resultType;
    private Map<String, Object> parameters;

    // Constructors and getters/setters
}

// Sub-transaction
public class SubTransaction {
    private String id;
    private BusinessModule module;
    private DataShard dataShard;
    private TimeWindow timeWindow;
    private int priority;
    private List<String> dependencies;
    private Map<String, Object> parameters;

    // Constructors and getters/setters
}

// Sub-transaction result
public class SubTransactionResult {
    private String subTransactionId;
    private boolean success;
    private List<Object> data;
    private String errorMessage;
    private long executionTime;

    public static SubTransactionResult success(String subTransactionId, List<Object> data) {
        SubTransactionResult result = new SubTransactionResult();
        result.subTransactionId = subTransactionId;
        result.success = true;
        result.data = data;
        return result;
    }

    public static SubTransactionResult failed(String subTransactionId, String errorMessage) {
        SubTransactionResult result = new SubTransactionResult();
        result.subTransactionId = subTransactionId;
        result.success = false;
        result.errorMessage = errorMessage;
        result.data = new ArrayList<>();
        return result;
    }

    // Constructors and getters/setters
}

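// Transaction result
public class TransactionResult<T> {
    private boolean success;
    private List<T> data;
    private String errorMessage;
    private int subTransactionCount;
    private long totalExecutionTime;
    private Map<String, Object> metadata;

    // Static factories; AsyncTransactionProcessor calls TransactionResult.success(...)
    // and TransactionResult.error(...), which the original listing did not define.
    public static <T> TransactionResult<T> success(List<T> data) {
        TransactionResult<T> result = new TransactionResult<>();
        result.success = true;
        result.data = data;
        return result;
    }

    public static <T> TransactionResult<T> error(String errorMessage) {
        TransactionResult<T> result = new TransactionResult<>();
        result.success = false;
        result.errorMessage = errorMessage;
        return result;
    }

    // Constructors and getters/setters
}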
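
The listing above leaves `sortSubTransactionsByDependency` as a stub that is meant to hold a topological sort. One possible way to fill it in is Kahn's algorithm, sketched below on plain string IDs. `DependencySorter` and its map-based input are assumptions for this example; adapting it to `SubTransaction` would mean building the map from each sub-transaction's `id` and `dependencies`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: orders task IDs so that each task comes after its dependencies. */
final class DependencySorter {

    private DependencySorter() {
    }

    /**
     * Kahn's algorithm.
     *
     * @param dependencies maps each task ID to the IDs it depends on
     * @return the IDs in an order where every task follows all of its dependencies
     * @throws IllegalStateException if the dependencies contain a cycle
     */
    static List<String> sort(Map<String, List<String>> dependencies) {
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        Map<String, List<String>> dependents = new HashMap<>();

        for (Map.Entry<String, List<String>> entry : dependencies.entrySet()) {
            inDegree.putIfAbsent(entry.getKey(), 0);
            for (String dependency : entry.getValue()) {
                inDegree.putIfAbsent(dependency, 0);
                inDegree.merge(entry.getKey(), 1, Integer::sum);
                dependents.computeIfAbsent(dependency, k -> new ArrayList<>()).add(entry.getKey());
            }
        }

        // Start with the tasks that depend on nothing.
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> entry : inDegree.entrySet()) {
            if (entry.getValue() == 0) {
                ready.add(entry.getKey());
            }
        }

        List<String> ordered = new ArrayList<>();
        while (!ready.isEmpty()) {
            String id = ready.poll();
            ordered.add(id);
            // Releasing a task may make its dependents ready.
            for (String next : dependents.getOrDefault(id, Collections.emptyList())) {
                if (inDegree.merge(next, -1, Integer::sum) == 0) {
                    ready.add(next);
                }
            }
        }

        if (ordered.size() != inDegree.size()) {
            throw new IllegalStateException("Dependency cycle detected");
        }
        return ordered;
    }
}
```

Rejecting cycles up front matters here: a cyclic dependency between sub-transactions would otherwise leave some of them waiting forever.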

2.2 Asynchronous Transaction Processor

// Asynchronous transaction processor
@Service
@Slf4j
public class AsyncTransactionProcessor {

    @Autowired
    private MessageQueueService messageQueueService;

    @Autowired
    private TransactionCallbackService callbackService;

    @Autowired
    private TransactionRetryService retryService;

    // 10 core threads, at most 50 threads, 60 s keep-alive, queue capacity 1000.
    // Threads beyond the core size are only created once the queue is full;
    // after that, CallerRunsPolicy makes the submitting thread run the task.
    private final ThreadPoolExecutor asyncExecutor = new ThreadPoolExecutor(
            10, 50, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(1000),
            new ThreadFactory() {
                private final AtomicInteger threadNumber = new AtomicInteger(1);

                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r, "async-transaction-" + threadNumber.getAndIncrement());
                    thread.setDaemon(false);
                    return thread;
                }
            },
            new ThreadPoolExecutor.CallerRunsPolicy()
    );

    /**
     * Accepts a transaction for asynchronous processing and returns immediately.
     */
    public CompletableFuture<AsyncTransactionResult> processAsyncTransaction(
            AsyncTransactionRequest request) {

        String transactionId = generateTransactionId();

        try {
            // 1. Create the async transaction context. It is not stored anywhere in this
            //    listing; a real implementation would persist it so the status can be queried.
            AsyncTransactionContext context = createAsyncTransactionContext(transactionId, request);

            // 2. Publish the message that triggers asynchronous processing
            AsyncTransactionMessage message = new AsyncTransactionMessage();
            message.setTransactionId(transactionId);
            message.setRequest(request);
            message.setCreateTime(System.currentTimeMillis());

            messageQueueService.sendAsyncTransactionMessage(message);

            // 3. Return an acknowledgement; the work itself has not run yet
            AsyncTransactionResult result = new AsyncTransactionResult();
            result.setTransactionId(transactionId);
            result.setStatus(AsyncTransactionStatus.PROCESSING);
            result.setMessage("Transaction submitted for asynchronous processing");

            return CompletableFuture.completedFuture(result);

        } catch (Exception e) {
            log.error("Failed to submit async transaction: transactionId={}", transactionId, e);
            return CompletableFuture.failedFuture(e);
        }
    }

    /**
     * Handles one async transaction message from the queue.
     */
    @RabbitListener(queues = "async.transaction.queue")
    public void handleAsyncTransactionMessage(AsyncTransactionMessage message) {
        String transactionId = message.getTransactionId();

        try {
            log.info("Processing async transaction: transactionId={}", transactionId);

            // 1. Mark the transaction as processing
            updateTransactionStatus(transactionId, AsyncTransactionStatus.PROCESSING);

            // 2. Execute the transaction
            TransactionResult<?> result = executeAsyncTransaction(message.getRequest());

            // 3. Record the outcome
            if (result.isSuccess()) {
                updateTransactionStatus(transactionId, AsyncTransactionStatus.COMPLETED);
                log.info("Async transaction completed: transactionId={}", transactionId);
            } else {
                updateTransactionStatus(transactionId, AsyncTransactionStatus.FAILED);
                log.error("Async transaction failed: transactionId={}, error={}",
                        transactionId, result.getErrorMessage());

                // 4. Retry or give up
                handleTransactionFailure(transactionId, result.getErrorMessage());
            }

            // 5. Invoke the callback (for failures too, even when a retry was scheduled)
            executeCallback(transactionId, result);

        } catch (Exception e) {
            log.error("Unexpected error while processing async transaction: transactionId={}", transactionId, e);

            // Mark the transaction as failed
            updateTransactionStatus(transactionId, AsyncTransactionStatus.FAILED);

            // Retry or give up
            handleTransactionFailure(transactionId, e.getMessage());
        }
    }

    /**
     * Processes pending async transactions in batches every 5 seconds.
     * This reuses the listener method above, so the batch source must not deliver
     * messages the listener also receives, or the handler must be idempotent.
     */
    @Scheduled(fixedDelay = 5000)
    public void processBatchAsyncTransactions() {
        try {
            // 1. Fetch up to 100 pending messages
            List<AsyncTransactionMessage> batchMessages = messageQueueService.getBatchAsyncTransactionMessages(100);

            if (batchMessages.isEmpty()) {
                return;
            }

            log.info("Starting batch of async transactions: count={}", batchMessages.size());

            // 2. Process the batch in parallel on the dedicated pool
            List<CompletableFuture<Void>> futures = batchMessages.stream()
                    .map(message -> CompletableFuture.runAsync(() -> {
                        try {
                            handleAsyncTransactionMessage(message);
                        } catch (Exception e) {
                            log.error("Batch transaction failed: transactionId={}", message.getTransactionId(), e);
                        }
                    }, asyncExecutor))
                    .collect(Collectors.toList());

            // 3. Wait for the whole batch to finish
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

            log.info("Batch of async transactions finished: count={}", batchMessages.size());

        } catch (Exception e) {
            log.error("Unexpected error while processing async transaction batch", e);
        }
    }

    /**
     * Executes an async transaction according to its type.
     */
    private TransactionResult<?> executeAsyncTransaction(AsyncTransactionRequest request) {
        try {
            // Dispatch on the transaction type
            switch (request.getTransactionType()) {
                case DATA_PROCESSING:
                    return executeDataProcessingTransaction(request);
                case FILE_PROCESSING:
                    return executeFileProcessingTransaction(request);
                case BATCH_UPDATE:
                    return executeBatchUpdateTransaction(request);
                case REPORT_GENERATION:
                    return executeReportGenerationTransaction(request);
                default:
                    return TransactionResult.error("Unsupported transaction type: " + request.getTransactionType());
            }

        } catch (Exception e) {
            log.error("Async transaction execution failed: transactionType={}", request.getTransactionType(), e);
            return TransactionResult.error("Async transaction execution failed: " + e.getMessage());
        }
    }

    /**
     * Executes a data-processing transaction.
     */
    private TransactionResult<?> executeDataProcessingTransaction(AsyncTransactionRequest request) {
        // Placeholder: implement data processing
        return TransactionResult.success(new ArrayList<>());
    }

    /**
     * Executes a file-processing transaction.
     */
    private TransactionResult<?> executeFileProcessingTransaction(AsyncTransactionRequest request) {
        // Placeholder: implement file processing
        return TransactionResult.success(new ArrayList<>());
    }

    /**
     * Executes a batch-update transaction.
     */
    private TransactionResult<?> executeBatchUpdateTransaction(AsyncTransactionRequest request) {
        // Placeholder: implement batch update
        return TransactionResult.success(new ArrayList<>());
    }

    /**
     * Executes a report-generation transaction.
     */
    private TransactionResult<?> executeReportGenerationTransaction(AsyncTransactionRequest request) {
        // Placeholder: implement report generation
        return TransactionResult.success(new ArrayList<>());
    }

    /**
     * Handles a failed transaction: schedules a retry or marks it as permanently failed.
     */
    private void handleTransactionFailure(String transactionId, String errorMessage) {
        try {
            // 1. Ask the retry service whether another attempt is allowed
            if (retryService.shouldRetry(transactionId)) {
                // 2. Schedule the retry
                retryService.scheduleRetry(transactionId, errorMessage);
            } else {
                // 3. Mark as permanently failed
                updateTransactionStatus(transactionId, AsyncTransactionStatus.FAILED_FINAL);
                log.error("Transaction permanently failed: transactionId={}, error={}", transactionId, errorMessage);
            }

        } catch (Exception e) {
            log.error("Error while handling transaction failure: transactionId={}", transactionId, e);
        }
    }

    /**
     * Invokes the completion callback; callback errors are logged, not propagated.
     */
    private void executeCallback(String transactionId, TransactionResult<?> result) {
        try {
            callbackService.executeCallback(transactionId, result);
        } catch (Exception e) {
            log.error("Callback failed: transactionId={}", transactionId, e);
        }
    }

    /**
     * Updates the transaction status.
     */
    private void updateTransactionStatus(String transactionId, AsyncTransactionStatus status) {
        // Placeholder: persist the status change
        log.debug("Updating transaction status: transactionId={}, status={}", transactionId, status);
    }

    /**
     * Creates the async transaction context.
     */
    private AsyncTransactionContext createAsyncTransactionContext(String transactionId, AsyncTransactionRequest request) {
        AsyncTransactionContext context = new AsyncTransactionContext();
        context.setTransactionId(transactionId);
        context.setRequest(request);
        context.setCreateTime(System.currentTimeMillis());
        context.setStatus(AsyncTransactionStatus.PROCESSING);
        return context;
    }

    /**
     * Generates a transaction ID (timestamp plus random suffix; not guaranteed unique).
     */
    private String generateTransactionId() {
        return "ATX" + System.currentTimeMillis() + RandomUtils.nextInt(1000, 9999);
    }

    /**
     * Shuts the pool down gracefully: wait up to 30 seconds, then force termination.
     */
    @PreDestroy
    public void destroy() {
        asyncExecutor.shutdown();
        try {
            if (!asyncExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
                asyncExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            asyncExecutor.shutdownNow();
            Thread.currentThread().interrupt();
        }

        log.info("Async transaction processor shut down");
    }
}

// Async transaction request
public class AsyncTransactionRequest {
    private String requestId;
    private AsyncTransactionType transactionType;
    private Map<String, Object> parameters;
    private String callbackUrl;
    private int priority;
    private long timeout;

    // Constructors and getters/setters
}

// Async transaction message
public class AsyncTransactionMessage {
    private String transactionId;
    private AsyncTransactionRequest request;
    private long createTime;
    private int retryCount;

    // Constructors and getters/setters
}

// Async transaction result
public class AsyncTransactionResult {
    private String transactionId;
    private AsyncTransactionStatus status;
    private String message;
    private long createTime;
    private long completeTime;

    // Constructors and getters/setters
}

// Async transaction status
public enum AsyncTransactionStatus {
    PENDING,       // waiting to be processed
    PROCESSING,    // in progress
    COMPLETED,     // finished successfully
    FAILED,        // failed, may still be retried
    FAILED_FINAL,  // failed permanently
    CANCELLED      // cancelled
}

// Async transaction type
public enum AsyncTransactionType {
    DATA_PROCESSING,   // data processing
    FILE_PROCESSING,   // file processing
    BATCH_UPDATE,      // batch update
    REPORT_GENERATION  // report generation
}
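
The processor above delegates the retry decision to `retryService.shouldRetry` and `scheduleRetry`, whose implementation is not shown. One common policy behind such a service is a bounded number of attempts with exponential backoff, sketched below. `RetryPolicy` and its parameters are assumptions for illustration and not the article's `TransactionRetryService`.

```java
/** Hypothetical retry policy: bounded attempts with capped exponential backoff. */
final class RetryPolicy {

    private final int maxAttempts;
    private final long baseDelayMillis;
    private final long maxDelayMillis;

    RetryPolicy(int maxAttempts, long baseDelayMillis, long maxDelayMillis) {
        if (maxAttempts < 0 || baseDelayMillis <= 0 || maxDelayMillis < baseDelayMillis) {
            throw new IllegalArgumentException("invalid retry settings");
        }
        this.maxAttempts = maxAttempts;
        this.baseDelayMillis = baseDelayMillis;
        this.maxDelayMillis = maxDelayMillis;
    }

    /** Returns true while the number of attempts made so far is below the limit. */
    boolean shouldRetry(int attemptsSoFar) {
        return attemptsSoFar < maxAttempts;
    }

    /** Delay before the next attempt: base * 2^attemptsSoFar, capped at maxDelayMillis. */
    long delayMillis(int attemptsSoFar) {
        long delay = baseDelayMillis;
        for (int i = 0; i < attemptsSoFar; i++) {
            // Stop doubling once the cap is reached; this also avoids overflow.
            if (delay >= maxDelayMillis / 2) {
                return maxDelayMillis;
            }
            delay *= 2;
        }
        return Math.min(delay, maxDelayMillis);
    }
}
```

The attempt count would naturally come from the `retryCount` field of `AsyncTransactionMessage`; adding random jitter to the delay is a common refinement when many transactions fail at the same moment.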

2.3 Distributed Transaction Coordinator

// Distributed transaction coordinator
@Service
@Slf4j
public class DistributedTransactionCoordinator {

    @Autowired
    private TransactionLogService transactionLogService;

    @Autowired
    private MessageQueueService messageQueueService;

    @Autowired
    private TransactionMonitorService monitorService;

    /**
     * Runs a two-phase commit across all participants.
     */
    public CompletableFuture<DistributedTransactionResult> executeTwoPhaseCommit(
            TwoPhaseCommitRequest request) {

        String transactionId = generateTransactionId();

        try {
            // 1. Write the transaction-start log record
            transactionLogService.recordTransactionStart(transactionId, request);

            // 2. Phase one: prepare
            CompletableFuture<List<PrepareResult>> prepareResults = executePreparePhase(request);

            // 3. Phase two: commit or roll back
            CompletableFuture<DistributedTransactionResult> finalResult = prepareResults.thenCompose(prepareResultsList -> {
                try {
                    // Commit only if every participant prepared successfully
                    boolean allPrepared = prepareResultsList.stream().allMatch(PrepareResult::isSuccess);

                    if (allPrepared) {
                        // Commit
                        return executeCommitPhase(request, prepareResultsList);
                    } else {
                        // Roll back
                        return executeRollbackPhase(request, prepareResultsList);
                    }

                } catch (Exception e) {
                    log.error("Two-phase commit failed: transactionId={}", transactionId, e);
                    return CompletableFuture.failedFuture(e);
                }
            });

            return finalResult;

        } catch (Exception e) {
            log.error("Two-phase commit initialization failed: transactionId={}", transactionId, e);
            return CompletableFuture.failedFuture(e);
        }
    }

    /**
     * Runs a transaction in Saga mode.
     */
    public CompletableFuture<DistributedTransactionResult> executeSagaMode(SagaRequest request) {
        String transactionId = generateTransactionId();

        try {
            // 1. Write the Saga-start log record
            transactionLogService.recordSagaTransactionStart(transactionId, request);

            // 2. Execute the Saga steps
            CompletableFuture<DistributedTransactionResult> result = executeSagaSteps(request);

            // 3. Log completion (only runs when the future completes normally)
            result.thenAccept(transactionResult -> {
                transactionLogService.recordTransactionComplete(transactionId, transactionResult);
            });

            return result;

        } catch (Exception e) {
            log.error("Saga execution failed: transactionId={}", transactionId, e);
            return CompletableFuture.failedFuture(e);
        }
    }

    /**
     * Runs a transaction in TCC (Try-Confirm-Cancel) mode.
     */
    public CompletableFuture<DistributedTransactionResult> executeTccMode(TccRequest request) {
        String transactionId = generateTransactionId();

        try {
            // 1. Write the TCC-start log record
            transactionLogService.recordTccTransactionStart(transactionId, request);

            // 2. Try phase
            CompletableFuture<List<TryResult>> tryResults = executeTryPhase(request);

            // 3. Confirm or Cancel phase
            CompletableFuture<DistributedTransactionResult> finalResult = tryResults.thenCompose(tryResultsList -> {
                try {
                    // Confirm only if every Try succeeded
                    boolean allTried = tryResultsList.stream().allMatch(TryResult::isSuccess);

                    if (allTried) {
                        // Confirm
                        return executeConfirmPhase(request, tryResultsList);
                    } else {
                        // Cancel
                        return executeCancelPhase(request, tryResultsList);
                    }

                } catch (Exception e) {
                    log.error("TCC execution failed: transactionId={}", transactionId, e);
                    return CompletableFuture.failedFuture(e);
                }
            });

            return finalResult;

        } catch (Exception e) {
            log.error("TCC initialization failed: transactionId={}", transactionId, e);
            return CompletableFuture.failedFuture(e);
        }
    }

    /**
     * Prepare phase: asks every participant to prepare, in parallel.
     */
    private CompletableFuture<List<PrepareResult>> executePreparePhase(TwoPhaseCommitRequest request) {
        List<CompletableFuture<PrepareResult>> prepareFutures = request.getParticipants().stream()
                .map(participant -> CompletableFuture.supplyAsync(() -> {
                    try {
                        return executeParticipantPrepare(participant);
                    } catch (Exception e) {
                        log.error("Participant prepare failed: participantId={}", participant.getId(), e);
                        return PrepareResult.failed(participant.getId(), e.getMessage());
                    }
                }))
                .collect(Collectors.toList());

        return CompletableFuture.allOf(prepareFutures.toArray(new CompletableFuture[0]))
                .thenApply(v -> prepareFutures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    /**
     * Commit phase: asks every participant to commit, in parallel.
     * A partial commit failure is only reported here; recovering from it
     * (for example by retrying the commit) is not implemented in this listing.
     */
    private CompletableFuture<DistributedTransactionResult> executeCommitPhase(
            TwoPhaseCommitRequest request, List<PrepareResult> prepareResults) {

        List<CompletableFuture<CommitResult>> commitFutures = request.getParticipants().stream()
                .map(participant -> CompletableFuture.supplyAsync(() -> {
                    try {
                        return executeParticipantCommit(participant);
                    } catch (Exception e) {
                        log.error("Participant commit failed: participantId={}", participant.getId(), e);
                        return CommitResult.failed(participant.getId(), e.getMessage());
                    }
                }))
                .collect(Collectors.toList());

        return CompletableFuture.allOf(commitFutures.toArray(new CompletableFuture[0]))
                .thenApply(v -> {
                    List<CommitResult> commitResults = commitFutures.stream()
                            .map(CompletableFuture::join)
                            .collect(Collectors.toList());

                    boolean allCommitted = commitResults.stream().allMatch(CommitResult::isSuccess);

                    DistributedTransactionResult result = new DistributedTransactionResult();
                    result.setSuccess(allCommitted);
                    result.setCommitResults(commitResults);

                    if (!allCommitted) {
                        result.setErrorMessage("Some participants failed to commit");
                    }

                    return result;
                });
    }

    /**
     * Rollback phase: asks every participant to roll back, in parallel.
     */
    private CompletableFuture<DistributedTransactionResult> executeRollbackPhase(
            TwoPhaseCommitRequest request, List<PrepareResult> prepareResults) {

        List<CompletableFuture<RollbackResult>> rollbackFutures = request.getParticipants().stream()
                .map(participant -> CompletableFuture.supplyAsync(() -> {
                    try {
                        return executeParticipantRollback(participant);
                    } catch (Exception e) {
                        log.error("Participant rollback failed: participantId={}", participant.getId(), e);
                        return RollbackResult.failed(participant.getId(), e.getMessage());
                    }
                }))
                .collect(Collectors.toList());

        return CompletableFuture.allOf(rollbackFutures.toArray(new CompletableFuture[0]))
                .thenApply(v -> {
                    List<RollbackResult> rollbackResults = rollbackFutures.stream()
                            .map(CompletableFuture::join)
                            .collect(Collectors.toList());

                    boolean allRolledBack = rollbackResults.stream().allMatch(RollbackResult::isSuccess);

                    DistributedTransactionResult result = new DistributedTransactionResult();
                    // The transaction as a whole was aborted, so it is never reported as
                    // successful, even when every participant rolled back cleanly.
                    result.setSuccess(false);
                    result.setRollbackResults(rollbackResults);
                    result.setErrorMessage(allRolledBack
                            ? "Prepare phase failed; all participants rolled back"
                            : "Prepare phase failed; some participants failed to roll back");

                    return result;
                });
    }

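    /**
     * Executes all Saga steps in order, stopping at the first failure.
     */
    private CompletableFuture<DistributedTransactionResult> executeSagaSteps(SagaRequest request) {
        // Seed the chain with a successful result. A freshly constructed result would
        // presumably have success == false, and every step would then be skipped.
        DistributedTransactionResult initial = new DistributedTransactionResult();
        initial.setSuccess(true);
        CompletableFuture<DistributedTransactionResult> result = CompletableFuture.completedFuture(initial);

        for (SagaStep step : request.getSteps()) {
            result = result.thenCompose(previousResult -> {
                if (!previousResult.isSuccess()) {
                    // An earlier step failed: skip the remaining steps
                    return CompletableFuture.completedFuture(previousResult);
                }

                return executeSagaStep(step);
            });
        }

        return result;
    }

    /**
     * Executes a single Saga step and triggers its compensation on failure.
     * Only the failing step is compensated here; a complete Saga would also
     * compensate the steps that already succeeded, in reverse order.
     */
    private CompletableFuture<DistributedTransactionResult> executeSagaStep(SagaStep step) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                // Run the step
                StepResult stepResult = executeStep(step);

                if (stepResult.isSuccess()) {
                    DistributedTransactionResult result = new DistributedTransactionResult();
                    result.setSuccess(true);
                    result.setStepResults(List.of(stepResult));
                    return result;
                } else {
                    // Run the compensation
                    return executeCompensation(step);
                }

            } catch (Exception e) {
                log.error("Saga step failed: stepId={}", step.getId(), e);
                return executeCompensation(step);
            }
        });
    }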

    /**
     * Try phase: asks every participant to reserve its resources, in parallel.
     */
    private CompletableFuture<List<TryResult>> executeTryPhase(TccRequest request) {
        List<CompletableFuture<TryResult>> tryFutures = request.getParticipants().stream()
                .map(participant -> CompletableFuture.supplyAsync(() -> {
                    try {
                        return executeParticipantTry(participant);
                    } catch (Exception e) {
                        log.error("Participant Try failed: participantId={}", participant.getId(), e);
                        return TryResult.failed(participant.getId(), e.getMessage());
                    }
                }))
                .collect(Collectors.toList());

        return CompletableFuture.allOf(tryFutures.toArray(new CompletableFuture[0]))
                .thenApply(v -> tryFutures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    /**
     * Confirm phase: asks every participant to confirm, in parallel.
     */
    private CompletableFuture<DistributedTransactionResult> executeConfirmPhase(
            TccRequest request, List<TryResult> tryResults) {

        List<CompletableFuture<ConfirmResult>> confirmFutures = request.getParticipants().stream()
                .map(participant -> CompletableFuture.supplyAsync(() -> {
                    try {
                        return executeParticipantConfirm(participant);
                    } catch (Exception e) {
                        log.error("Participant Confirm failed: participantId={}", participant.getId(), e);
                        return ConfirmResult.failed(participant.getId(), e.getMessage());
                    }
                }))
                .collect(Collectors.toList());

        return CompletableFuture.allOf(confirmFutures.toArray(new CompletableFuture[0]))
                .thenApply(v -> {
                    List<ConfirmResult> confirmResults = confirmFutures.stream()
                            .map(CompletableFuture::join)
                            .collect(Collectors.toList());

                    boolean allConfirmed = confirmResults.stream().allMatch(ConfirmResult::isSuccess);

                    DistributedTransactionResult result = new DistributedTransactionResult();
                    result.setSuccess(allConfirmed);
                    result.setConfirmResults(confirmResults);

                    if (!allConfirmed) {
                        result.setErrorMessage("Some participants failed to confirm");
                    }

                    return result;
                });
    }

    /**
     * Cancel phase: asks every participant to cancel, in parallel. Participants whose
     * Try failed are cancelled too, so Cancel must tolerate a missing reservation.
     */
    private CompletableFuture<DistributedTransactionResult> executeCancelPhase(
            TccRequest request, List<TryResult> tryResults) {

        List<CompletableFuture<CancelResult>> cancelFutures = request.getParticipants().stream()
                .map(participant -> CompletableFuture.supplyAsync(() -> {
                    try {
                        return executeParticipantCancel(participant);
                    } catch (Exception e) {
                        log.error("Participant Cancel failed: participantId={}", participant.getId(), e);
                        return CancelResult.failed(participant.getId(), e.getMessage());
                    }
                }))
                .collect(Collectors.toList());

        return CompletableFuture.allOf(cancelFutures.toArray(new CompletableFuture[0]))
                .thenApply(v -> {
                    List<CancelResult> cancelResults = cancelFutures.stream()
                            .map(CompletableFuture::join)
                            .collect(Collectors.toList());

                    boolean allCancelled = cancelResults.stream().allMatch(CancelResult::isSuccess);

                    DistributedTransactionResult result = new DistributedTransactionResult();
                    // A cancelled transaction did not succeed, even when every Cancel call did.
                    result.setSuccess(false);
                    result.setCancelResults(cancelResults);
                    result.setErrorMessage(allCancelled
                            ? "Try phase failed; all participants cancelled"
                            : "Try phase failed; some participants failed to cancel");

                    return result;
                });
    }

    /**
     * Asks one participant to prepare.
     */
    private PrepareResult executeParticipantPrepare(TransactionParticipant participant) {
        // Placeholder: implement the participant's prepare call
        return PrepareResult.success(participant.getId());
    }

    /**
     * Asks one participant to commit.
     */
    private CommitResult executeParticipantCommit(TransactionParticipant participant) {
        // Placeholder: implement the participant's commit call
        return CommitResult.success(participant.getId());
    }

    /**
     * Asks one participant to roll back.
     */
    private RollbackResult executeParticipantRollback(TransactionParticipant participant) {
        // Placeholder: implement the participant's rollback call
        return RollbackResult.success(participant.getId());
    }

    /**
     * Runs Try on one participant.
     */
    private TryResult executeParticipantTry(TransactionParticipant participant) {
        // Placeholder: implement the participant's Try call
        return TryResult.success(participant.getId());
    }

    /**
     * Runs Confirm on one participant.
     */
    private ConfirmResult executeParticipantConfirm(TransactionParticipant participant) {
        // Placeholder: implement the participant's Confirm call
        return ConfirmResult.success(participant.getId());
    }

    /**
     * Runs Cancel on one participant.
     */
    private CancelResult executeParticipantCancel(TransactionParticipant participant) {
        // Placeholder: implement the participant's Cancel call
        return CancelResult.success(participant.getId());
    }

    /**
     * Executes one Saga step.
     */
    private StepResult executeStep(SagaStep step) {
        // Placeholder: implement the step's forward action
        return StepResult.success(step.getId());
    }

    /**
     * Runs the compensation for a failed Saga step and reports the Saga as failed.
     */
    private DistributedTransactionResult executeCompensation(SagaStep step) {
        // Placeholder: implement the compensation call
        DistributedTransactionResult result = new DistributedTransactionResult();
        result.setSuccess(false);
        result.setErrorMessage("Saga step failed; compensation was triggered");
        return result;
    }

    /**
     * Generates a transaction ID (timestamp plus random suffix; not guaranteed unique).
     */
    private String generateTransactionId() {
        return "DTX" + System.currentTimeMillis() + RandomUtils.nextInt(1000, 9999);
    }
}

// Two-phase commit request
public class TwoPhaseCommitRequest {
private String requestId;
private List<TransactionParticipant> participants;
private Map<String, Object> parameters;

// Constructors and getters/setters omitted
}

// Saga request
public class SagaRequest {
private String requestId;
private List<SagaStep> steps;
private Map<String, Object> parameters;

// Constructors and getters/setters omitted
}

// TCC request
public class TccRequest {
private String requestId;
private List<TransactionParticipant> participants;
private Map<String, Object> parameters;

// Constructors and getters/setters omitted
}

// Transaction participant
public class TransactionParticipant {
private String id;
private String serviceName;
private String methodName;
private Map<String, Object> parameters;

// Constructors and getters/setters omitted
}

// Saga step
public class SagaStep {
private String id;
private String serviceName;
private String methodName;
private String compensationMethod;
private Map<String, Object> parameters;

// Constructors and getters/setters omitted
}

// Distributed transaction result
public class DistributedTransactionResult {
private boolean success;
private String errorMessage;
private List<PrepareResult> prepareResults;
private List<CommitResult> commitResults;
private List<RollbackResult> rollbackResults;
private List<TryResult> tryResults;
private List<ConfirmResult> confirmResults;
private List<CancelResult> cancelResults;
private List<StepResult> stepResults;

// Constructors and getters/setters omitted
}

// Result classes
public class PrepareResult {
private String participantId;
private boolean success;
private String errorMessage;

public static PrepareResult success(String participantId) {
PrepareResult result = new PrepareResult();
result.participantId = participantId;
result.success = true;
return result;
}

public static PrepareResult failed(String participantId, String errorMessage) {
PrepareResult result = new PrepareResult();
result.participantId = participantId;
result.success = false;
result.errorMessage = errorMessage;
return result;
}

// Constructors and getters/setters omitted
}

// The remaining result classes follow the same pattern as PrepareResult
public class CommitResult { /* same structure as PrepareResult */ }
public class RollbackResult { /* same structure as PrepareResult */ }
public class TryResult { /* same structure as PrepareResult */ }
public class ConfirmResult { /* same structure as PrepareResult */ }
public class CancelResult { /* same structure as PrepareResult */ }
public class StepResult { /* same structure as PrepareResult */ }

3. Transaction Optimization Configuration and Monitoring

3.1 Transaction Configuration Management

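import java.util.Properties;

import javax.sql.DataSource;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.Advisor;
import org.springframework.aop.aspectj.AspectJExpressionPointcut;
import org.springframework.aop.support.DefaultPointcutAdvisor;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.interceptor.TransactionInterceptor;
import org.springframework.transaction.jta.JtaTransactionManager;

// Transaction configuration
@Configuration
@EnableTransactionManagement
@Slf4j
public class TransactionConfig {

    /**
     * Configure the primary (local JDBC) transaction manager
     */
    @Bean
    @Primary
    public PlatformTransactionManager transactionManager(DataSource dataSource) {
        DataSourceTransactionManager transactionManager = new DataSourceTransactionManager();
        transactionManager.setDataSource(dataSource);

        // Default transaction timeout, in seconds
        transactionManager.setDefaultTimeout(30);

        // Allow nested transactions (implemented with JDBC savepoints)
        transactionManager.setNestedTransactionAllowed(true);

        // Fail early once the transaction has been globally marked rollback-only
        transactionManager.setFailEarlyOnGlobalRollbackOnly(true);

        return transactionManager;
    }

    /**
     * Configure the JTA transaction manager (for distributed transactions).
     * A JTA provider (UserTransaction / TransactionManager) must be available,
     * otherwise this bean fails to initialize.
     */
    @Bean
    public JtaTransactionManager jtaTransactionManager() {
        JtaTransactionManager jtaTransactionManager = new JtaTransactionManager();

        // Default transaction timeout, in seconds
        jtaTransactionManager.setDefaultTimeout(60);

        // Do not allow nested transactions
        jtaTransactionManager.setNestedTransactionAllowed(false);

        return jtaTransactionManager;
    }

    /**
     * Configure the transaction interceptor
     */
    @Bean
    public TransactionInterceptor transactionInterceptor(PlatformTransactionManager transactionManager) {
        TransactionInterceptor interceptor = new TransactionInterceptor();
        interceptor.setTransactionManager(transactionManager);

        // Transaction attributes keyed by method-name pattern
        Properties transactionAttributes = new Properties();

        // Query methods use read-only transactions
        transactionAttributes.setProperty("get*", "PROPAGATION_REQUIRED,readOnly");
        transactionAttributes.setProperty("find*", "PROPAGATION_REQUIRED,readOnly");
        transactionAttributes.setProperty("query*", "PROPAGATION_REQUIRED,readOnly");
        transactionAttributes.setProperty("select*", "PROPAGATION_REQUIRED,readOnly");

        // Write methods use read-write transactions
        transactionAttributes.setProperty("save*", "PROPAGATION_REQUIRED");
        transactionAttributes.setProperty("update*", "PROPAGATION_REQUIRED");
        transactionAttributes.setProperty("delete*", "PROPAGATION_REQUIRED");
        transactionAttributes.setProperty("insert*", "PROPAGATION_REQUIRED");

        // Batch operations run in a new transaction
        transactionAttributes.setProperty("batch*", "PROPAGATION_REQUIRES_NEW");

        // Asynchronous operations run in a new transaction
        transactionAttributes.setProperty("async*", "PROPAGATION_REQUIRES_NEW");

        interceptor.setTransactionAttributes(transactionAttributes);

        return interceptor;
    }

    /**
     * Configure the transaction advisor
     */
    @Bean
    public Advisor transactionAdvisor(TransactionInterceptor transactionInterceptor) {
        AspectJExpressionPointcut pointcut = new AspectJExpressionPointcut();
        pointcut.setExpression("execution(* com.example.service..*(..))");

        DefaultPointcutAdvisor advisor = new DefaultPointcutAdvisor();
        advisor.setPointcut(pointcut);
        advisor.setAdvice(transactionInterceptor);

        return advisor;
    }

    /**
     * Configure the connection pool.
     * The values below are defaults; properties under spring.datasource.hikari.*
     * (including the required jdbc-url) are bound onto the returned object.
     */
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.hikari")
    public HikariConfig hikariConfig() {
        HikariConfig config = new HikariConfig();

        // Pool sizing and timeouts (milliseconds)
        config.setMaximumPoolSize(20);
        config.setMinimumIdle(5);
        config.setConnectionTimeout(30000);
        config.setIdleTimeout(600000);
        config.setMaxLifetime(1800000);

        // Transaction-related settings
        config.setAutoCommit(false);
        config.setTransactionIsolation("TRANSACTION_READ_COMMITTED");

        // Connection leak detection threshold (milliseconds)
        config.setLeakDetectionThreshold(60000);

        return config;
    }

    /**
     * Configure the data source
     */
    @Bean
    @Primary
    public DataSource dataSource(HikariConfig hikariConfig) {
        return new HikariDataSource(hikariConfig);
    }
}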

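// Transaction properties.
// Registered as a bean with @Component so it can be injected
// (@EnableConfigurationProperties on a configuration class is an alternative).
@Component
@ConfigurationProperties(prefix = "transaction")
@Data
public class TransactionProperties {

    /**
     * Default transaction timeout (seconds)
     */
    private int defaultTimeout = 30;

    /**
     * Whether nested transactions are allowed
     */
    private boolean nestedTransactionAllowed = true;

    /**
     * Whether to fail early when the transaction is globally marked rollback-only
     */
    private boolean failEarlyOnGlobalRollbackOnly = true;

    /**
     * Large-transaction threshold (milliseconds)
     */
    private long largeTransactionThreshold = 5000;

    /**
     * Whether large-transaction splitting is enabled
     */
    private boolean enableLargeTransactionSplit = true;

    /**
     * Whether asynchronous transaction processing is enabled
     */
    private boolean enableAsyncTransaction = true;

    /**
     * Whether distributed transactions are enabled
     */
    private boolean enableDistributedTransaction = false;

    /**
     * Transaction monitoring settings
     */
    private TransactionMonitorProperties monitor = new TransactionMonitorProperties();

    @Data
    public static class TransactionMonitorProperties {
        /**
         * Whether transaction monitoring is enabled
         */
        private boolean enabled = true;

        /**
         * Retention period for monitoring data (hours)
         */
        private int retentionHours = 24;

        /**
         * Slow-transaction threshold (milliseconds)
         */
        private long slowTransactionThreshold = 1000;

        /**
         * Whether transaction alerting is enabled
         */
        private boolean enableAlert = true;
    }
}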

3.2 Transaction Monitoring Service

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

// Transaction monitoring service
@Service
@Slf4j
public class TransactionMonitorService {

    private final Map<String, TransactionExecutionInfo> executionInfos = new ConcurrentHashMap<>();
    private final AtomicLong totalTransactions = new AtomicLong(0);
    private final AtomicLong successfulTransactions = new AtomicLong(0);
    private final AtomicLong failedTransactions = new AtomicLong(0);
    private final AtomicLong slowTransactions = new AtomicLong(0);

    @Autowired
    private TransactionProperties transactionProperties;

    /**
     * Record a successful transaction execution
     */
    public void recordTransactionExecution(String transactionId, long executionTime, int subTransactionCount) {
        TransactionExecutionInfo info = new TransactionExecutionInfo();
        info.setTransactionId(transactionId);
        info.setExecutionTime(executionTime);
        info.setSubTransactionCount(subTransactionCount);
        info.setEndTime(System.currentTimeMillis());
        info.setSuccess(true);

        executionInfos.put(transactionId, info);

        totalTransactions.incrementAndGet();
        successfulTransactions.incrementAndGet();

        // Check whether this is a slow transaction
        if (executionTime > transactionProperties.getMonitor().getSlowTransactionThreshold()) {
            slowTransactions.incrementAndGet();
            log.warn("Slow transaction detected: transactionId={}, executionTime={}ms", transactionId, executionTime);
        }

        log.info("Transaction completed: transactionId={}, executionTime={}ms, subTransactionCount={}",
                transactionId, executionTime, subTransactionCount);
    }

    /**
     * Record a transaction failure
     */
    public void recordTransactionFailure(String transactionId, String errorMessage) {
        // Create the entry if the transaction failed before any execution was recorded,
        // so that the failure is not silently dropped
        TransactionExecutionInfo info = executionInfos.computeIfAbsent(transactionId, id -> {
            TransactionExecutionInfo created = new TransactionExecutionInfo();
            created.setTransactionId(id);
            return created;
        });
        info.setSuccess(false);
        info.setErrorMessage(errorMessage);
        info.setEndTime(System.currentTimeMillis());

        totalTransactions.incrementAndGet();
        failedTransactions.incrementAndGet();

        log.error("Transaction failed: transactionId={}, error={}", transactionId, errorMessage);
    }

    /**
     * Record a large-transaction split
     */
    public void recordLargeTransactionSplit(String transactionId, int originalSize, int splitCount) {
        TransactionExecutionInfo info = executionInfos.get(transactionId);
        if (info != null) {
            info.setOriginalSize(originalSize);
            info.setSplitCount(splitCount);
            info.setLargeTransactionSplit(true);
        }

        log.info("Large transaction split: transactionId={}, originalSize={}, splitCount={}",
                transactionId, originalSize, splitCount);
    }

    /**
     * Record asynchronous transaction processing
     */
    public void recordAsyncTransactionProcessing(String transactionId, AsyncTransactionType type) {
        TransactionExecutionInfo info = executionInfos.get(transactionId);
        if (info != null) {
            info.setAsyncTransaction(true);
            info.setAsyncTransactionType(type);
        }

        log.info("Async transaction processing: transactionId={}, type={}", transactionId, type);
    }

    /**
     * Record a distributed transaction
     */
    public void recordDistributedTransaction(String transactionId, DistributedTransactionType type, int participantCount) {
        TransactionExecutionInfo info = executionInfos.get(transactionId);
        if (info != null) {
            info.setDistributedTransaction(true);
            info.setDistributedTransactionType(type);
            info.setParticipantCount(participantCount);
        }

        log.info("Distributed transaction: transactionId={}, type={}, participantCount={}",
                transactionId, type, participantCount);
    }

    /**
     * Get transaction statistics
     */
    public TransactionStatistics getTransactionStatistics() {
        TransactionStatistics stats = new TransactionStatistics();
        stats.setTotalTransactions(totalTransactions.get());
        stats.setSuccessfulTransactions(successfulTransactions.get());
        stats.setFailedTransactions(failedTransactions.get());
        stats.setSlowTransactions(slowTransactions.get());
        stats.setSuccessRate(calculateSuccessRate());
        stats.setAverageExecutionTime(calculateAverageExecutionTime());
        stats.setMaxExecutionTime(calculateMaxExecutionTime());
        stats.setMinExecutionTime(calculateMinExecutionTime());

        return stats;
    }

    /**
     * Get the execution info for one transaction
     */
    public TransactionExecutionInfo getTransactionExecutionInfo(String transactionId) {
        return executionInfos.get(transactionId);
    }

    /**
     * Get a snapshot copy of all transaction execution info
     */
    public Map<String, TransactionExecutionInfo> getAllTransactionExecutionInfos() {
        return new HashMap<>(executionInfos);
    }

    /**
     * Get slow transactions, slowest first
     */
    public List<TransactionExecutionInfo> getSlowTransactions() {
        long threshold = transactionProperties.getMonitor().getSlowTransactionThreshold();

        return executionInfos.values().stream()
                .filter(info -> info.getExecutionTime() > threshold)
                .sorted((a, b) -> Long.compare(b.getExecutionTime(), a.getExecutionTime()))
                .collect(Collectors.toList());
    }

    /**
     * Get transactions that were split as large transactions, slowest first
     */
    public List<TransactionExecutionInfo> getLargeTransactions() {
        return executionInfos.values().stream()
                .filter(TransactionExecutionInfo::isLargeTransactionSplit)
                .sorted((a, b) -> Long.compare(b.getExecutionTime(), a.getExecutionTime()))
                .collect(Collectors.toList());
    }

    /**
     * Clean up expired data.
     * Runs only when scheduling is enabled (for example via @EnableScheduling).
     */
    @Scheduled(fixedRate = 3600000) // once per hour
    public void cleanupExpiredData() {
        try {
            long retentionTime = transactionProperties.getMonitor().getRetentionHours() * 3600000L;
            long cutoffTime = System.currentTimeMillis() - retentionTime;

            List<String> expiredKeys = executionInfos.entrySet().stream()
                    .filter(entry -> entry.getValue().getEndTime() < cutoffTime)
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toList());

            expiredKeys.forEach(executionInfos::remove);

            if (!expiredKeys.isEmpty()) {
                log.info("Cleaned up expired transaction data: count={}", expiredKeys.size());
            }

        } catch (Exception e) {
            log.error("Failed to clean up expired data", e);
        }
    }

    /**
     * Calculate the success rate (0.0 to 1.0)
     */
    private double calculateSuccessRate() {
        long total = totalTransactions.get();
        return total > 0 ? (double) successfulTransactions.get() / total : 0.0;
    }

    /**
     * Calculate the average execution time (milliseconds)
     */
    private double calculateAverageExecutionTime() {
        return executionInfos.values().stream()
                .mapToLong(TransactionExecutionInfo::getExecutionTime)
                .average()
                .orElse(0.0);
    }

    /**
     * Calculate the maximum execution time (milliseconds)
     */
    private long calculateMaxExecutionTime() {
        return executionInfos.values().stream()
                .mapToLong(TransactionExecutionInfo::getExecutionTime)
                .max()
                .orElse(0L);
    }

    /**
     * Calculate the minimum execution time (milliseconds)
     */
    private long calculateMinExecutionTime() {
        return executionInfos.values().stream()
                .mapToLong(TransactionExecutionInfo::getExecutionTime)
                .min()
                .orElse(0L);
    }
}

// Transaction execution info
public class TransactionExecutionInfo {
    private String transactionId;
    private long executionTime;
    private int subTransactionCount;
    private int originalSize;
    private int splitCount;
    private boolean largeTransactionSplit;
    private boolean asyncTransaction;
    private AsyncTransactionType asyncTransactionType;
    private boolean distributedTransaction;
    private DistributedTransactionType distributedTransactionType;
    private int participantCount;
    private long startTime;
    private long endTime;
    private boolean success;
    private String errorMessage;

    // Constructors and getters/setters omitted
}

// Transaction statistics
public class TransactionStatistics {
    private long totalTransactions;
    private long successfulTransactions;
    private long failedTransactions;
    private long slowTransactions;
    private double successRate;
    private double averageExecutionTime;
    private long maxExecutionTime;
    private long minExecutionTime;

    // Constructors and getters/setters omitted
}

// Distributed transaction type
public enum DistributedTransactionType {
    TWO_PHASE_COMMIT, // two-phase commit
    SAGA,             // Saga pattern
    TCC               // TCC pattern
}
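
The monitoring service's calculateAverageExecutionTime, calculateMaxExecutionTime and calculateMinExecutionTime methods each stream over the recorded data separately. As a minimal sketch that assumes only the JDK, the same three figures can be collected in one pass with LongSummaryStatistics. The class name ExecutionTimeSummary and its method are hypothetical and not part of the original service.

```java
import java.util.List;
import java.util.LongSummaryStatistics;

// Hypothetical helper, not part of the original TransactionMonitorService.
final class ExecutionTimeSummary {

    private ExecutionTimeSummary() {
    }

    // Aggregates execution times (milliseconds) in a single pass over the input.
    static LongSummaryStatistics summarize(List<Long> executionTimesMs) {
        return executionTimesMs.stream()
                .mapToLong(Long::longValue)
                .summaryStatistics();
    }
}
```

One caveat if this approach is adopted: for an empty input, getAverage() returns 0.0 but getMin() and getMax() return Long.MAX_VALUE and Long.MIN_VALUE, so getCount() should be checked before reporting them.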

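4. Best Practices and Summary

4.1 Best Practices for Large-Transaction Optimization in Spring Boot

4.1.1 Transaction Splitting Strategies

  • Business splitting: split a large transaction along business-module boundaries
  • Data splitting: split a large transaction by data range, for example into fixed-size batches
  • Time splitting: split a large transaction along the time dimension into successive stages
  • Priority splitting: order the resulting transactions by business priority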
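
Data splitting can be sketched in plain Java as follows. This is an illustration under stated assumptions, not the article's splitter: BatchSplitter, partition and processInBatches are hypothetical names, and the handler stands in for whatever short transaction processes one batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper illustrating data-range splitting; names are not from the article.
final class BatchSplitter {

    private BatchSplitter() {
    }

    // Splits items into consecutive batches of at most batchSize elements.
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    // Runs the handler once per batch and returns the number of batches that failed.
    // In a real service the handler would wrap each batch in its own short transaction.
    static <T> int processInBatches(List<T> items, int batchSize, Consumer<List<T>> batchHandler) {
        int failed = 0;
        for (List<T> batch : partition(items, batchSize)) {
            try {
                batchHandler.accept(batch);
            } catch (RuntimeException e) {
                failed++; // a failed batch does not undo batches that already completed
            }
        }
        return failed;
    }
}
```

In a Spring application the per-batch handler would typically be a method on a separate bean annotated with @Transactional(propagation = Propagation.REQUIRES_NEW), because a call to a method on the same object bypasses the transactional proxy. The trade-off is that atomicity now holds per batch rather than for the whole data set.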

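4.1.2 Asynchronous Processing Strategies

  • Message queues: hand work off to a message queue so it is processed asynchronously
  • Event-driven: trigger asynchronous processing from published events
  • Callbacks: notify the caller through a callback once asynchronous processing completes
  • Batch processing: group work into batches to improve throughput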
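
The fan-out and fan-in shape shared by these strategies can be sketched with CompletableFuture, the same JDK class the article's distributed transaction code uses. AsyncBatchRunner and runAll are hypothetical names; the executor is supplied by the caller.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical helper; a sketch of fan-out/fan-in asynchronous processing.
final class AsyncBatchRunner {

    private AsyncBatchRunner() {
    }

    // Submits one task per input to the executor and completes with the results,
    // in input order, once every task has finished.
    static <I, O> CompletableFuture<List<O>> runAll(List<I> inputs, Function<I, O> task, ExecutorService executor) {
        List<CompletableFuture<O>> futures = inputs.stream()
                .map(input -> CompletableFuture.supplyAsync(() -> task.apply(input), executor))
                .collect(Collectors.toList());

        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join) // does not block: all futures are complete here
                        .collect(Collectors.toList()));
    }
}
```

If any task throws, the returned future completes exceptionally and join() on it raises a CompletionException. Passing an explicit executor keeps this work off the shared ForkJoinPool.commonPool(), which matters when the tasks block on database calls.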

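4.1.3 Distributed Transaction Strategies

  • Two-phase commit: suits strong-consistency requirements, at the cost of participants holding resources until the commit phase
  • Saga pattern: suits long-running transactions; each step has a compensating action
  • TCC pattern: suits high-concurrency scenarios; each participant implements Try, Confirm and Cancel
  • Eventual consistency: suits scenarios that can tolerate temporary inconsistency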
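
To make the Saga pattern concrete, here is a minimal in-process sketch. It is not the article's distributed transaction service: SimpleSaga, Step and addStep are hypothetical names, steps are plain Runnable instances, and a RuntimeException from a step is treated as that step failing.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical minimal Saga runner; not the article's implementation.
final class SimpleSaga {

    // One saga step: a forward action plus the compensation that undoes it.
    static final class Step {
        final String name;
        final Runnable action;
        final Runnable compensation;

        Step(String name, Runnable action, Runnable compensation) {
            this.name = name;
            this.action = action;
            this.compensation = compensation;
        }
    }

    private final List<Step> steps = new ArrayList<>();

    SimpleSaga addStep(String name, Runnable action, Runnable compensation) {
        steps.add(new Step(name, action, compensation));
        return this;
    }

    // Runs steps in order. On the first failure, compensates the already completed
    // steps in reverse order and returns false; returns true if every step succeeds.
    boolean execute() {
        Deque<Step> completed = new ArrayDeque<>();
        for (Step step : steps) {
            try {
                step.action.run();
                completed.push(step);
            } catch (RuntimeException e) {
                while (!completed.isEmpty()) {
                    completed.pop().compensation.run();
                }
                return false;
            }
        }
        return true;
    }
}
```

The failed step itself is not compensated, since it never completed. This sketch also lets an exception from a compensation propagate; a production Saga would usually persist progress and retry compensations until they succeed.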

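4.1.4 Performance Optimization Strategies

  • Connection pool tuning: size the database connection pool to match the workload
  • Transaction timeouts: set a reasonable timeout so a stuck transaction cannot hold locks indefinitely
  • Batch operations: use batch operations to reduce round trips to the database
  • Index optimization: tune database indexes to speed up queries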

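4.2 Architecture Evolution Recommendations

4.2.1 Evolving Toward Microservices

  • Service decomposition: split a large transaction across multiple microservices
  • Service governance: implement service registration, discovery and load balancing
  • Data consistency: use distributed transactions to keep data consistent across services
  • Fault tolerance: implement circuit breaking, degradation and retry mechanisms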
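
Of the fault-tolerance mechanisms listed, retry is the simplest to illustrate. The following is a sketch with exponential backoff, assuming only the JDK; RetryExecutor and callWithRetry are hypothetical names, and a real system would more likely rely on an established resilience library.

```java
import java.util.concurrent.Callable;

// Hypothetical retry helper; a sketch, not a replacement for a resilience library.
final class RetryExecutor {

    private RetryExecutor() {
    }

    // Calls the task up to maxAttempts times, doubling the delay after each failure.
    // Rethrows the last exception when every attempt fails.
    static <T> T callWithRetry(Callable<T> task, int maxAttempts, long initialDelayMs) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long delayMs = initialDelayMs;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // do not retry after interruption
                throw e;
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(delayMs); // an InterruptedException here propagates to the caller
                    delayMs *= 2;
                }
            }
        }
        throw last;
    }
}
```

Retrying is only safe when the operation is idempotent, for example when each request carries a unique transaction ID that the receiver can deduplicate on.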

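4.2.2 Evolving Toward Cloud-Native Architecture

  • Containerized deployment: deploy with container technology such as Docker
  • Elastic scaling: scale out and in automatically based on load
  • Service mesh: adopt a service mesh such as Istio
  • Cloud-native storage: use cloud-native storage services

4.2.3 Intelligent Transaction Management

  • AI-driven optimization: use machine learning to tune transaction strategies
  • Adaptive transactions: adjust the transaction strategy automatically according to system load
  • Predictive transactions: forecast transaction demand and prepare resources in advance
  • Intelligent monitoring: AI-based monitoring and alerting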

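4.3 Summary

Optimizing large transactions in Spring Boot is an important lever for improving the performance of enterprise applications. Sensible transaction splitting, well-designed asynchronous processing and reliable distributed transaction coordination together make transaction handling efficient and stable. As microservice and cloud-native architectures spread, transaction management is likely to become more intelligent and automated.

Going forward, teams need to keep tracking these trends and keep refining their transaction strategies as business requirements and the technical environment change. The analysis and practical guidance in this article are intended as a reference for building a sound large-transaction optimization solution that keeps enterprise applications stable under transaction-heavy workloads.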