前言

线程池异常处理是企业级并发编程的核心技能之一,直接影响系统的稳定性和可靠性。借助优雅的异常处理策略和完善的监控告警机制,可以确保线程池任务稳定执行,及时发现并处理异常情况,从而保障企业级应用的高可用性。本文从异常捕获机制到监控告警,从基础原理到企业级实践,系统梳理线程池异常处理的完整解决方案。

一、线程池异常处理架构设计

1.1 异常处理整体架构

1.2 异常处理核心组件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
/**
* 线程池异常处理核心组件
*/
@Component
public class ThreadPoolExceptionHandler {

@Autowired
private ExceptionClassifier exceptionClassifier;

@Autowired
private ExceptionProcessor exceptionProcessor;

@Autowired
private ExceptionMonitor exceptionMonitor;

@Autowired
private ExceptionLogger exceptionLogger;

/**
* 处理线程池异常
*/
public void handleException(ThreadPoolExecutor executor,
Runnable task,
Throwable throwable) {
try {
// 1. 异常分类
ExceptionType type = exceptionClassifier.classify(throwable);

// 2. 记录异常日志
exceptionLogger.logException(task, throwable, type);

// 3. 异常处理
ExceptionHandleResult result = exceptionProcessor.process(throwable, type);

// 4. 监控异常
exceptionMonitor.recordException(throwable, type, result);

// 5. 根据处理结果决定后续动作
handleExceptionResult(executor, task, throwable, result);

} catch (Exception e) {
log.error("异常处理失败", e);
// 降级处理
handleFallback(executor, task, throwable);
}
}

/**
* 处理异常结果
*/
private void handleExceptionResult(ThreadPoolExecutor executor,
Runnable task,
Throwable throwable,
ExceptionHandleResult result) {
switch (result.getAction()) {
case RETRY:
retryTask(executor, task, result.getRetryCount());
break;
case SKIP:
log.warn("跳过任务执行: {}", task);
break;
case FAIL:
log.error("任务执行失败: {}", task, throwable);
break;
case DEGRADE:
degradeTask(executor, task);
break;
case RECOVER:
recoverTask(executor, task);
break;
}
}

/**
* 重试任务
*/
private void retryTask(ThreadPoolExecutor executor, Runnable task, int retryCount) {
if (retryCount > 0) {
try {
Thread.sleep(1000); // 延迟重试
executor.submit(task);
log.info("任务重试提交成功,剩余重试次数: {}", retryCount - 1);
} catch (Exception e) {
log.error("任务重试失败", e);
}
}
}

/**
* 降级处理
*/
private void degradeTask(ThreadPoolExecutor executor, Runnable task) {
try {
// 执行降级任务
Runnable degradedTask = createDegradedTask(task);
executor.submit(degradedTask);
log.info("任务降级处理完成");
} catch (Exception e) {
log.error("任务降级失败", e);
}
}

/**
* 恢复任务
*/
private void recoverTask(ThreadPoolExecutor executor, Runnable task) {
try {
// 执行恢复逻辑
Runnable recoveredTask = createRecoveredTask(task);
executor.submit(recoveredTask);
log.info("任务恢复处理完成");
} catch (Exception e) {
log.error("任务恢复失败", e);
}
}

/**
* 降级处理
*/
private void handleFallback(ThreadPoolExecutor executor, Runnable task, Throwable throwable) {
log.error("异常处理降级,任务: {}, 异常: {}", task, throwable.getMessage());
// 记录到死信队列或持久化存储
recordToDeadLetterQueue(task, throwable);
}
}

二、异常分类与处理策略

2.1 异常分类器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
/**
 * Maps a {@link Throwable} to a coarse {@link ExceptionType}.
 *
 * <p>Lookup order:
 * <ol>
 *   <li>the exception's own class, then its superclasses, against a registered class table;</li>
 *   <li>keywords in the exception message;</li>
 *   <li>otherwise {@link ExceptionType#UNKNOWN}.</li>
 * </ol>
 * The table is populated once in the constructor and only read afterwards.
 */
@Component
public class ExceptionClassifier {

    private final Map<Class<? extends Throwable>, ExceptionType> exceptionTypeMap;

    public ExceptionClassifier() {
        this.exceptionTypeMap = new HashMap<>();
        initializeExceptionTypeMap();
    }

    /**
     * Registers the class → type table.
     *
     * <p>Each class must appear only once: {@code Map.put} silently overwrites. Previously
     * {@code OutOfMemoryError} was registered both as SYSTEM_ERROR and later as RESOURCE_ERROR.
     * The later entry won, routing OOMs to the RECOVER strategy. OOM is now a SYSTEM_ERROR only,
     * matching that enum constant's description.
     */
    private void initializeExceptionTypeMap() {
        // System errors
        exceptionTypeMap.put(OutOfMemoryError.class, ExceptionType.SYSTEM_ERROR);
        exceptionTypeMap.put(StackOverflowError.class, ExceptionType.SYSTEM_ERROR);
        exceptionTypeMap.put(NoClassDefFoundError.class, ExceptionType.SYSTEM_ERROR);

        // Runtime errors
        exceptionTypeMap.put(NullPointerException.class, ExceptionType.RUNTIME_ERROR);
        exceptionTypeMap.put(IllegalArgumentException.class, ExceptionType.RUNTIME_ERROR);
        exceptionTypeMap.put(IllegalStateException.class, ExceptionType.RUNTIME_ERROR);
        exceptionTypeMap.put(IndexOutOfBoundsException.class, ExceptionType.RUNTIME_ERROR);
        exceptionTypeMap.put(ConcurrentModificationException.class, ExceptionType.RUNTIME_ERROR);

        // Network errors. NOTE(review): IOException is a broad catch-all here, so e.g. file I/O
        // errors are also reported as NETWORK_ERROR — confirm this is intended.
        exceptionTypeMap.put(ConnectException.class, ExceptionType.NETWORK_ERROR);
        exceptionTypeMap.put(SocketTimeoutException.class, ExceptionType.NETWORK_ERROR);
        exceptionTypeMap.put(UnknownHostException.class, ExceptionType.NETWORK_ERROR);
        exceptionTypeMap.put(IOException.class, ExceptionType.NETWORK_ERROR);

        // Database errors
        exceptionTypeMap.put(SQLException.class, ExceptionType.DATABASE_ERROR);
        exceptionTypeMap.put(DataAccessException.class, ExceptionType.DATABASE_ERROR);
        exceptionTypeMap.put(TransactionException.class, ExceptionType.DATABASE_ERROR);

        // Business errors
        exceptionTypeMap.put(BusinessException.class, ExceptionType.BUSINESS_ERROR);
        exceptionTypeMap.put(ValidationException.class, ExceptionType.BUSINESS_ERROR);
        exceptionTypeMap.put(AuthorizationException.class, ExceptionType.BUSINESS_ERROR);

        // Timeout errors
        exceptionTypeMap.put(TimeoutException.class, ExceptionType.TIMEOUT_ERROR);
        exceptionTypeMap.put(InterruptedException.class, ExceptionType.TIMEOUT_ERROR);

        // Resource errors
        exceptionTypeMap.put(RejectedExecutionException.class, ExceptionType.RESOURCE_ERROR);
    }

    /**
     * Classifies {@code throwable}.
     *
     * @param throwable the failure; {@code null} yields {@link ExceptionType#UNKNOWN}
     * @return the type of the nearest registered class in the hierarchy, else a message-based guess
     */
    public ExceptionType classify(Throwable throwable) {
        if (throwable == null) {
            return ExceptionType.UNKNOWN;
        }

        // Walk the class itself and then its superclasses; the nearest registered class wins.
        // (Interfaces are not consulted: an interface can never extend Throwable, so it can
        // never be a key of this table.)
        for (Class<?> c = throwable.getClass(); c != null && !c.equals(Object.class); c = c.getSuperclass()) {
            ExceptionType type = exceptionTypeMap.get(c);
            if (type != null) {
                return type;
            }
        }

        return classifyByMessage(throwable.getMessage());
    }

    /**
     * Heuristic classification from message keywords (English and Chinese).
     * Lower-casing uses {@link java.util.Locale#ROOT} so results do not depend on the JVM's
     * default locale (e.g. Turkish maps 'I' to a dotless 'ı').
     */
    private ExceptionType classifyByMessage(String message) {
        if (message == null) {
            return ExceptionType.UNKNOWN;
        }

        String lowerMessage = message.toLowerCase(java.util.Locale.ROOT);

        if (lowerMessage.contains("timeout") || lowerMessage.contains("超时")) {
            return ExceptionType.TIMEOUT_ERROR;
        } else if (lowerMessage.contains("connection") || lowerMessage.contains("连接")) {
            return ExceptionType.NETWORK_ERROR;
        } else if (lowerMessage.contains("database") || lowerMessage.contains("数据库")) {
            return ExceptionType.DATABASE_ERROR;
        } else if (lowerMessage.contains("memory") || lowerMessage.contains("内存")) {
            return ExceptionType.RESOURCE_ERROR;
        } else if (lowerMessage.contains("permission") || lowerMessage.contains("权限")) {
            return ExceptionType.BUSINESS_ERROR;
        } else if (lowerMessage.contains("null") || lowerMessage.contains("空指针")) {
            return ExceptionType.RUNTIME_ERROR;
        }

        return ExceptionType.UNKNOWN;
    }

    /**
     * Exception categories, each with a display name and description.
     */
    public enum ExceptionType {
        SYSTEM_ERROR("系统错误", "系统级别的错误,如内存溢出、栈溢出等"),
        RUNTIME_ERROR("运行时错误", "程序运行时错误,如空指针、数组越界等"),
        NETWORK_ERROR("网络错误", "网络连接相关错误"),
        DATABASE_ERROR("数据库错误", "数据库操作相关错误"),
        BUSINESS_ERROR("业务错误", "业务逻辑相关错误"),
        TIMEOUT_ERROR("超时错误", "操作超时相关错误"),
        RESOURCE_ERROR("资源错误", "资源不足或资源访问错误"),
        UNKNOWN("未知错误", "无法分类的异常");

        private final String name;
        private final String description;

        ExceptionType(String name, String description) {
            this.name = name;
            this.description = description;
        }

        public String getName() {
            return name;
        }

        public String getDescription() {
            return description;
        }
    }
}

2.2 异常处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
/**
 * Chooses and applies a handling strategy for a classified exception.
 *
 * <p>The returned {@link ExceptionHandleResult} carries the chosen action. It also carries
 * {@code processTime}: the time spent inside {@link #process}, in milliseconds. That value is a
 * duration, not a timestamp, because the monitoring service records it with a {@code Timer}.
 */
@Component
public class ExceptionProcessor {

    @Autowired
    private RetryStrategyService retryStrategyService;

    @Autowired
    private DegradeStrategyService degradeStrategyService;

    @Autowired
    private RecoverStrategyService recoverStrategyService;

    /**
     * Processes an exception.
     *
     * @param throwable the failure
     * @param type      its classification; {@code null} is treated as {@code UNKNOWN}
     * @return the handling decision; never {@code null}. If a strategy service throws, the result
     *         has {@code success=false} and action {@code FAIL}.
     */
    public ExceptionHandleResult process(Throwable throwable, ExceptionClassifier.ExceptionType type) {
        long startNanos = System.nanoTime();
        ExceptionHandleResult result = new ExceptionHandleResult();

        try {
            // 1. Pick the strategy for this exception type.
            HandleStrategy strategy = selectHandleStrategy(type);
            result.setStrategy(strategy);

            // 2. Apply it.
            switch (strategy) {
                case RETRY:
                    RetryResult retryResult = retryStrategyService.handleRetry(throwable, type);
                    if (retryResult.getRetryCount() > 0) {
                        result.setAction(ActionType.RETRY);
                        result.setRetryCount(retryResult.getRetryCount());
                        result.setRetryDelay(retryResult.getRetryDelay());
                    } else {
                        // The retry policy refused (count 0). Reporting RETRY here would make the
                        // handler silently do nothing, so report the task as failed instead.
                        result.setAction(ActionType.FAIL);
                        result.setFailReason("重试条件不满足");
                    }
                    break;

                case DEGRADE:
                    DegradeResult degradeResult = degradeStrategyService.handleDegrade(throwable, type);
                    result.setAction(ActionType.DEGRADE);
                    result.setDegradeLevel(degradeResult.getDegradeLevel());
                    break;

                case RECOVER:
                    RecoverResult recoverResult = recoverStrategyService.handleRecover(throwable, type);
                    result.setAction(ActionType.RECOVER);
                    result.setRecoverMethod(recoverResult.getRecoverMethod());
                    break;

                case SKIP:
                    result.setAction(ActionType.SKIP);
                    result.setSkipReason("异常类型不支持重试");
                    break;

                case FAIL:
                default:
                    result.setAction(ActionType.FAIL);
                    result.setFailReason("异常处理失败");
                    break;
            }

            result.setSuccess(true);
            return result;

        } catch (Exception e) {
            log.error("异常处理失败", e);
            result.setSuccess(false);
            result.setAction(ActionType.FAIL);
            result.setFailReason("异常处理异常: " + e.getMessage());
            return result;
        } finally {
            // Elapsed handling time in ms (set on both success and failure paths).
            result.setProcessTime((System.nanoTime() - startNanos) / 1_000_000L);
        }
    }

    /**
     * Maps an exception type to a handling strategy; {@code null} and UNKNOWN map to FAIL.
     */
    private HandleStrategy selectHandleStrategy(ExceptionClassifier.ExceptionType type) {
        if (type == null) {
            return HandleStrategy.FAIL;
        }
        switch (type) {
            case NETWORK_ERROR:
            case TIMEOUT_ERROR:
            case DATABASE_ERROR:
                return HandleStrategy.RETRY;

            case BUSINESS_ERROR:
                return HandleStrategy.DEGRADE;

            case RESOURCE_ERROR:
                return HandleStrategy.RECOVER;

            case RUNTIME_ERROR:
                return HandleStrategy.SKIP;

            case SYSTEM_ERROR:
            case UNKNOWN:
            default:
                return HandleStrategy.FAIL;
        }
    }

    /**
     * Handling strategies.
     */
    public enum HandleStrategy {
        RETRY("重试", "对可重试的异常进行重试处理"),
        DEGRADE("降级", "对业务异常进行降级处理"),
        RECOVER("恢复", "对资源异常进行恢复处理"),
        SKIP("跳过", "跳过不可恢复的异常"),
        FAIL("失败", "标记任务执行失败");

        private final String name;
        private final String description;

        HandleStrategy(String name, String description) {
            this.name = name;
            this.description = description;
        }

        public String getName() {
            return name;
        }

        public String getDescription() {
            return description;
        }
    }

    /**
     * Concrete actions reported to the exception handler.
     */
    public enum ActionType {
        RETRY("重试"),
        DEGRADE("降级"),
        RECOVER("恢复"),
        SKIP("跳过"),
        FAIL("失败");

        private final String name;

        ActionType(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }
    }
}

2.3 重试策略服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
/**
 * Decides whether, how many times and with what delay a failed task should be retried.
 */
@Service
public class RetryStrategyService {

    @Autowired
    private RetryConfigService retryConfigService;

    /**
     * Computes the retry decision for {@code throwable}.
     *
     * @return a result whose retry count is 0 when no retry should happen (including when the
     *         configuration lookup or evaluation throws)
     */
    public RetryResult handleRetry(Throwable throwable, ExceptionClassifier.ExceptionType type) {
        RetryResult result = new RetryResult();

        try {
            // 1. Load the per-type retry configuration.
            RetryConfig config = retryConfigService.getRetryConfig(type);

            // 2. Bail out if retrying is not allowed.
            if (!shouldRetry(throwable, config)) {
                result.setRetryCount(0);
                result.setRetryDelay(0);
                return result;
            }

            // 3. Retry budget.
            int retryCount = calculateRetryCount(throwable, config);
            result.setRetryCount(retryCount);

            // 4. Retry delay.
            long retryDelay = calculateRetryDelay(retryCount, config);
            result.setRetryDelay(retryDelay);

            // 5. Log the decision.
            logRetryInfo(throwable, retryCount, retryDelay);

            return result;

        } catch (Exception e) {
            log.error("重试策略处理失败", e);
            result.setRetryCount(0);
            result.setRetryDelay(0);
            return result;
        }
    }

    /**
     * Retry only when the config allows it, the exception type is retryable, and the message
     * contains none of the non-retryable keywords.
     */
    private boolean shouldRetry(Throwable throwable, RetryConfig config) {
        if (config.getMaxRetryCount() <= 0) {
            return false;
        }
        if (config.getRetryInterval() <= 0) {
            return false;
        }
        if (!isRetryableType(throwable, config)) {
            return false;
        }
        return !containsNonRetryableKeyword(throwable.getMessage(), config);
    }

    /**
     * True when the exception's class is a configured retryable type or a subclass of one.
     * Previously an exact-class match was required, so e.g. a configured {@code IOException}
     * did not match {@code SocketTimeoutException}.
     * NOTE(review): assumes getRetryableExceptionTypes() holds Class objects — confirm.
     */
    private boolean isRetryableType(Throwable throwable, RetryConfig config) {
        Class<?> actual = throwable.getClass();
        for (Class<?> retryable : config.getRetryableExceptionTypes()) {
            if (retryable != null && retryable.isAssignableFrom(actual)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Case-insensitive (locale-independent) keyword check. Null keywords are ignored.
     */
    private boolean containsNonRetryableKeyword(String message, RetryConfig config) {
        if (message == null) {
            return false;
        }
        String lowerMessage = message.toLowerCase(java.util.Locale.ROOT);
        for (String keyword : config.getNonRetryableKeywords()) {
            if (keyword != null && lowerMessage.contains(keyword.toLowerCase(java.util.Locale.ROOT))) {
                return true;
            }
        }
        return false;
    }

    /**
     * Caps the configured maximum for a few specific exception types.
     */
    private int calculateRetryCount(Throwable throwable, RetryConfig config) {
        int baseRetryCount = config.getMaxRetryCount();

        if (throwable instanceof TimeoutException) {
            return Math.min(baseRetryCount, 3); // timeouts: at most 3
        } else if (throwable instanceof ConnectException) {
            return Math.min(baseRetryCount, 5); // connection failures: at most 5
        } else if (throwable instanceof SQLException) {
            return Math.min(baseRetryCount, 2); // database errors: at most 2
        }

        return baseRetryCount;
    }

    /**
     * Fixed delay, or {@code interval * 2^(retryCount-1)} with exponential backoff.
     * The exponential result saturates at {@link Long#MAX_VALUE} instead of overflowing
     * (the previous {@code (long) Math.pow(...)} product could wrap to a negative delay).
     */
    private long calculateRetryDelay(int retryCount, RetryConfig config) {
        long baseDelay = config.getRetryInterval();

        if (!config.isExponentialBackoff() || retryCount <= 1) {
            return baseDelay;
        }

        int exponent = retryCount - 1;
        if (exponent >= Long.SIZE - 1) {
            return Long.MAX_VALUE;
        }
        long factor = 1L << exponent;
        return (baseDelay > Long.MAX_VALUE / factor) ? Long.MAX_VALUE : baseDelay * factor;
    }

    /**
     * Logs the retry decision.
     */
    private void logRetryInfo(Throwable throwable, int retryCount, long retryDelay) {
        log.warn("异常重试处理 - 异常类型: {}, 重试次数: {}, 重试延迟: {}ms, 异常信息: {}",
                throwable.getClass().getSimpleName(), retryCount, retryDelay, throwable.getMessage());
    }
}

三、自定义线程池异常处理

3.1 自定义线程池执行器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/**
 * {@link ThreadPoolExecutor} that records task start/completion, decorates the worker thread
 * while a task runs, and forwards task failures to an {@link ExceptionHandler}.
 *
 * <p>Failures are detected both for tasks passed to {@code execute} (via the {@code Throwable}
 * argument of {@code afterExecute}) and for tasks passed to {@code submit}. For {@code submit},
 * the exception is captured by the wrapping {@code Future} and never passed to
 * {@code afterExecute}, so it is extracted from the Future. This follows the pattern in the
 * {@code ThreadPoolExecutor.afterExecute} Javadoc.
 */
public class CustomThreadPoolExecutor extends ThreadPoolExecutor {

    /** Worker name before {@link #setThreadContext} decorated it; restored after the task. */
    private static final ThreadLocal<String> ORIGINAL_THREAD_NAME = new ThreadLocal<>();

    /** Disambiguates task ids generated by the same thread within the same millisecond. */
    private final java.util.concurrent.atomic.AtomicLong taskSequence =
            new java.util.concurrent.atomic.AtomicLong();

    private final ExceptionHandler exceptionHandler;
    private final TaskMonitor taskMonitor;

    public CustomThreadPoolExecutor(int corePoolSize,
                                    int maximumPoolSize,
                                    long keepAliveTime,
                                    TimeUnit unit,
                                    BlockingQueue<Runnable> workQueue,
                                    ThreadFactory threadFactory,
                                    RejectedExecutionHandler handler,
                                    ExceptionHandler exceptionHandler,
                                    TaskMonitor taskMonitor) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.exceptionHandler = exceptionHandler;
        this.taskMonitor = taskMonitor;
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);

        try {
            // Record task start.
            taskMonitor.recordTaskStart(r);

            // Decorate the worker thread and populate the per-task context.
            setThreadContext(t, r);

        } catch (Exception e) {
            log.error("任务执行前处理失败", e);
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);

        try {
            // Record task completion.
            taskMonitor.recordTaskComplete(r);

            // t is null for submit()ed tasks even when they failed; look inside the Future.
            Throwable failure = (t != null) ? t : extractFutureFailure(r);
            if (failure != null) {
                exceptionHandler.handleException(this, r, failure);
            }

        } catch (Exception e) {
            log.error("任务执行后处理失败", e);
        } finally {
            // Always clean up: pool threads are reused, so leftover ThreadLocal state or a
            // decorated name would leak into the next task.
            try {
                clearThreadContext();
            } catch (Exception e) {
                log.error("任务执行后处理失败", e);
            }
        }
    }

    @Override
    protected void terminated() {
        super.terminated();

        try {
            taskMonitor.recordThreadPoolTerminated();

        } catch (Exception e) {
            log.error("线程池终止处理失败", e);
        }
    }

    /**
     * Returns the exception captured by a completed {@code Future} task, or {@code null}.
     * Cancellation is deliberately not reported as a failure.
     */
    private Throwable extractFutureFailure(Runnable r) {
        if (!(r instanceof java.util.concurrent.Future<?>)) {
            return null;
        }
        java.util.concurrent.Future<?> future = (java.util.concurrent.Future<?>) r;
        if (!future.isDone()) {
            return null;
        }
        try {
            future.get();
            return null;
        } catch (java.util.concurrent.CancellationException e) {
            return null;
        } catch (java.util.concurrent.ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /**
     * Appends the task's simple class name to the worker name (remembering the original) and
     * fills {@code ThreadLocalContext} with task id, start time and task.
     */
    private void setThreadContext(Thread thread, Runnable task) {
        String originalName = thread.getName();
        ORIGINAL_THREAD_NAME.set(originalName);
        thread.setName(originalName + "-" + task.getClass().getSimpleName());

        ThreadLocalContext.setTaskId(generateTaskId());
        ThreadLocalContext.setStartTime(System.currentTimeMillis());
        ThreadLocalContext.setTask(task);
    }

    /**
     * Restores the worker's original name and clears {@code ThreadLocalContext}. Without the
     * restore, the name grew by one suffix per task executed on that worker.
     */
    private void clearThreadContext() {
        String originalName = ORIGINAL_THREAD_NAME.get();
        if (originalName != null) {
            Thread.currentThread().setName(originalName);
            ORIGINAL_THREAD_NAME.remove();
        }
        ThreadLocalContext.clear();
    }

    /**
     * Builds a task id from the time, the executing thread id and a per-pool sequence number.
     */
    private String generateTaskId() {
        return "task-" + System.currentTimeMillis() + "-" + Thread.currentThread().getId()
                + "-" + taskSequence.incrementAndGet();
    }
}

3.2 异常处理器接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
 * Callback contract for reacting to failures around a {@link ThreadPoolExecutor}.
 *
 * <p>Implementations receive three kinds of events: a task that threw, a thread that died
 * with an uncaught throwable, and a task that the pool refused to accept.
 */
public interface ExceptionHandler {

    /**
     * Called when {@code task}, running in {@code executor}, failed with {@code throwable}.
     *
     * @param executor  the pool the task ran in
     * @param task      the failed task
     * @param throwable the failure
     */
    void handleException(ThreadPoolExecutor executor, Runnable task, Throwable throwable);

    /**
     * Called when {@code thread} terminated because of an uncaught {@code throwable}.
     *
     * @param thread    the affected thread
     * @param throwable the uncaught failure
     */
    void handleUncaughtException(Thread thread, Throwable throwable);

    /**
     * Called when {@code executor} refused to accept {@code task}.
     *
     * @param task     the rejected task
     * @param executor the pool that rejected it
     */
    void handleRejectedExecution(Runnable task, ThreadPoolExecutor executor);
}

3.3 默认异常处理器实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
/**
* 默认异常处理器实现
*/
@Component
public class DefaultExceptionHandler implements ExceptionHandler {

@Autowired
private ExceptionClassifier exceptionClassifier;

@Autowired
private ExceptionProcessor exceptionProcessor;

@Autowired
private ExceptionLogger exceptionLogger;

@Autowired
private ExceptionMonitor exceptionMonitor;

@Override
public void handleException(ThreadPoolExecutor executor, Runnable task, Throwable throwable) {
try {
// 1. 异常分类
ExceptionClassifier.ExceptionType type = exceptionClassifier.classify(throwable);

// 2. 记录异常日志
exceptionLogger.logException(task, throwable, type);

// 3. 异常处理
ExceptionHandleResult result = exceptionProcessor.process(throwable, type);

// 4. 监控异常
exceptionMonitor.recordException(throwable, type, result);

// 5. 根据处理结果决定后续动作
handleExceptionResult(executor, task, throwable, result);

} catch (Exception e) {
log.error("异常处理失败", e);
// 降级处理
handleFallback(executor, task, throwable);
}
}

@Override
public void handleUncaughtException(Thread thread, Throwable throwable) {
try {
log.error("线程未捕获异常: {}", thread.getName(), throwable);

// 记录异常信息
exceptionLogger.logUncaughtException(thread, throwable);

// 监控异常
exceptionMonitor.recordUncaughtException(thread, throwable);

// 根据异常类型决定是否终止线程
if (shouldTerminateThread(throwable)) {
log.error("严重异常,终止线程: {}", thread.getName());
thread.interrupt();
}

} catch (Exception e) {
log.error("未捕获异常处理失败", e);
}
}

@Override
public void handleRejectedExecution(Runnable task, ThreadPoolExecutor executor) {
try {
log.warn("任务被拒绝执行: {}, 线程池状态: active={}, queue={}",
task.getClass().getSimpleName(),
executor.getActiveCount(),
executor.getQueue().size());

// 记录拒绝执行信息
exceptionLogger.logRejectedExecution(task, executor);

// 监控拒绝执行
exceptionMonitor.recordRejectedExecution(task, executor);

// 尝试降级处理
handleRejectedExecutionFallback(task, executor);

} catch (Exception e) {
log.error("拒绝执行处理失败", e);
}
}

/**
* 处理异常结果
*/
private void handleExceptionResult(ThreadPoolExecutor executor,
Runnable task,
Throwable throwable,
ExceptionHandleResult result) {
switch (result.getAction()) {
case RETRY:
retryTask(executor, task, result.getRetryCount(), result.getRetryDelay());
break;
case SKIP:
log.warn("跳过任务执行: {}", task);
break;
case FAIL:
log.error("任务执行失败: {}", task, throwable);
break;
case DEGRADE:
degradeTask(executor, task);
break;
case RECOVER:
recoverTask(executor, task);
break;
}
}

/**
* 重试任务
*/
private void retryTask(ThreadPoolExecutor executor, Runnable task, int retryCount, long retryDelay) {
if (retryCount > 0) {
try {
// 延迟重试
Thread.sleep(retryDelay);

// 提交重试任务
executor.submit(task);

log.info("任务重试提交成功,剩余重试次数: {}", retryCount - 1);

} catch (Exception e) {
log.error("任务重试失败", e);
}
}
}

/**
* 降级处理
*/
private void degradeTask(ThreadPoolExecutor executor, Runnable task) {
try {
// 创建降级任务
Runnable degradedTask = createDegradedTask(task);

// 提交降级任务
executor.submit(degradedTask);

log.info("任务降级处理完成");

} catch (Exception e) {
log.error("任务降级失败", e);
}
}

/**
* 恢复任务
*/
private void recoverTask(ThreadPoolExecutor executor, Runnable task) {
try {
// 创建恢复任务
Runnable recoveredTask = createRecoveredTask(task);

// 提交恢复任务
executor.submit(recoveredTask);

log.info("任务恢复处理完成");

} catch (Exception e) {
log.error("任务恢复失败", e);
}
}

/**
* 处理拒绝执行降级
*/
private void handleRejectedExecutionFallback(Runnable task, ThreadPoolExecutor executor) {
try {
// 尝试使用备用线程池
if (hasBackupThreadPool()) {
submitToBackupThreadPool(task);
log.info("任务提交到备用线程池");
return;
}

// 尝试直接执行
if (canExecuteDirectly(task)) {
executeDirectly(task);
log.info("任务直接执行");
return;
}

// 记录到死信队列
recordToDeadLetterQueue(task, new RejectedExecutionException("任务被拒绝执行"));
log.warn("任务记录到死信队列");

} catch (Exception e) {
log.error("拒绝执行降级处理失败", e);
}
}

/**
* 检查是否应该终止线程
*/
private boolean shouldTerminateThread(Throwable throwable) {
// 系统级异常需要终止线程
if (throwable instanceof OutOfMemoryError) {
return true;
}

if (throwable instanceof StackOverflowError) {
return true;
}

if (throwable instanceof NoClassDefFoundError) {
return true;
}

return false;
}

/**
* 创建降级任务
*/
private Runnable createDegradedTask(Runnable originalTask) {
return () -> {
try {
log.info("执行降级任务: {}", originalTask.getClass().getSimpleName());
// 实现降级逻辑
// 例如:返回默认值、使用缓存数据等
} catch (Exception e) {
log.error("降级任务执行失败", e);
}
};
}

/**
* 创建恢复任务
*/
private Runnable createRecoveredTask(Runnable originalTask) {
return () -> {
try {
log.info("执行恢复任务: {}", originalTask.getClass().getSimpleName());
// 实现恢复逻辑
// 例如:重新初始化资源、清理状态等
} catch (Exception e) {
log.error("恢复任务执行失败", e);
}
};
}
}

四、异常监控与告警

4.1 异常监控服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
/**
 * Records thread-pool exception metrics with Micrometer and raises alerts when per-type
 * thresholds are exceeded.
 *
 * <p>Exception counts go to a single meter name, {@code threadpool.exception.count}, tagged by
 * exception type, class and action. Micrometer counters are identified by name plus tags, so a
 * tagged counter is obtained per call; there is no {@code Counter.increment(Tags)}. Per-type
 * totals are read back through {@link MeterRegistry#find(String)}.
 */
@Service
public class ExceptionMonitorService {

    /** Meter name shared by all tagged exception counters. */
    private static final String EXCEPTION_COUNT_METER = "threadpool.exception.count";

    private final MeterRegistry meterRegistry;
    private final Timer exceptionTimer;
    private final Gauge exceptionGauge;

    public ExceptionMonitorService(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.exceptionTimer = Timer.builder("threadpool.exception.duration")
                .description("线程池异常处理耗时")
                .register(meterRegistry);
        // Gauge.builder takes the state object and the value function up front.
        this.exceptionGauge = Gauge.builder("threadpool.exception.rate", this, ExceptionMonitorService::getExceptionRate)
                .description("线程池异常率")
                .register(meterRegistry);
    }

    /**
     * Records one handled exception: tagged count, handling time, details, and threshold check.
     * Failures inside monitoring are logged and never propagate to the caller.
     */
    public void recordException(Throwable throwable,
                                ExceptionClassifier.ExceptionType type,
                                ExceptionHandleResult result) {
        try {
            // 1. Tagged count.
            Counter.builder(EXCEPTION_COUNT_METER)
                    .description("线程池异常计数")
                    .tags(Tags.of(
                            "exception_type", type.name(),
                            "exception_class", throwable.getClass().getSimpleName(),
                            "action", result.getAction().name()))
                    .register(meterRegistry)
                    .increment();

            // 2. Handling time. NOTE(review): assumes processTime is an elapsed duration in ms,
            //    not a wall-clock timestamp — confirm with ExceptionProcessor.
            exceptionTimer.record(result.getProcessTime(), TimeUnit.MILLISECONDS);

            // 3. Details.
            recordExceptionDetails(throwable, type, result);

            // 4. Thresholds.
            checkExceptionThreshold(type);

        } catch (Exception e) {
            log.error("异常监控记录失败", e);
        }
    }

    /**
     * Builds and stores an {@code ExceptionDetail} for this exception.
     */
    private void recordExceptionDetails(Throwable throwable,
                                        ExceptionClassifier.ExceptionType type,
                                        ExceptionHandleResult result) {
        ExceptionDetail detail = new ExceptionDetail();
        detail.setExceptionType(type);
        detail.setExceptionClass(throwable.getClass().getName());
        detail.setExceptionMessage(throwable.getMessage());
        detail.setStackTrace(getStackTrace(throwable));
        detail.setAction(result.getAction());
        detail.setProcessTime(result.getProcessTime());
        detail.setTimestamp(System.currentTimeMillis());

        storeExceptionDetail(detail);
    }

    /**
     * Sends at most one alert when either the rate or the count threshold is exceeded.
     * Previously both checks alerted independently, producing duplicate alerts.
     * NOTE(review): the count is cumulative since startup, so once crossed it stays crossed.
     */
    private void checkExceptionThreshold(ExceptionClassifier.ExceptionType type) {
        try {
            ExceptionStatistics stats = getExceptionStatistics(type);

            boolean rateExceeded = stats.getExceptionRate() > getExceptionRateThreshold(type);
            boolean countExceeded = stats.getExceptionCount() > getExceptionCountThreshold(type);
            if (rateExceeded || countExceeded) {
                sendExceptionAlert(type, stats);
            }

        } catch (Exception e) {
            log.error("异常阈值检查失败", e);
        }
    }

    /**
     * Collects count (summed over all tag combinations for this type), rate and trend.
     */
    private ExceptionStatistics getExceptionStatistics(ExceptionClassifier.ExceptionType type) {
        ExceptionStatistics stats = new ExceptionStatistics();

        double total = 0;
        for (Counter counter : meterRegistry.find(EXCEPTION_COUNT_METER)
                .tag("exception_type", type.name())
                .counters()) {
            total += counter.count();
        }
        long exceptionCount = (long) total;
        stats.setExceptionCount(exceptionCount);

        double exceptionRate = getExceptionRate();
        stats.setExceptionRate(exceptionRate);

        ExceptionTrend trend = getExceptionTrend(type);
        stats.setTrend(trend);

        return stats;
    }

    /**
     * Exception rate (placeholder: always 0.0).
     */
    private double getExceptionRate() {
        // Exception-rate calculation goes here.
        return 0.0;
    }

    /**
     * Exception trend (placeholder: always STABLE with 0 change).
     */
    private ExceptionTrend getExceptionTrend(ExceptionClassifier.ExceptionType type) {
        ExceptionTrend trend = new ExceptionTrend();

        // Trend analysis goes here.
        trend.setDirection(TrendDirection.STABLE);
        trend.setChangeRate(0.0);

        return trend;
    }

    /**
     * Builds and sends an alert; failures are logged.
     */
    private void sendExceptionAlert(ExceptionClassifier.ExceptionType type, ExceptionStatistics stats) {
        try {
            ExceptionAlert alert = new ExceptionAlert();
            alert.setExceptionType(type);
            alert.setStatistics(stats);
            alert.setTimestamp(System.currentTimeMillis());
            alert.setSeverity(calculateAlertSeverity(type, stats));

            sendAlert(alert);

        } catch (Exception e) {
            log.error("异常告警发送失败", e);
        }
    }

    /**
     * Severity from the exception rate: >0.5 critical, >0.2 high, >0.1 medium, else low.
     */
    private AlertSeverity calculateAlertSeverity(ExceptionClassifier.ExceptionType type, ExceptionStatistics stats) {
        if (stats.getExceptionRate() > 0.5) {
            return AlertSeverity.CRITICAL;
        } else if (stats.getExceptionRate() > 0.2) {
            return AlertSeverity.HIGH;
        } else if (stats.getExceptionRate() > 0.1) {
            return AlertSeverity.MEDIUM;
        } else {
            return AlertSeverity.LOW;
        }
    }

    /**
     * Alert delivery (placeholder: logs the alert).
     */
    private void sendAlert(ExceptionAlert alert) {
        log.warn("发送异常告警: {}", alert);
    }

    /**
     * Detail storage (placeholder: no-op).
     */
    private void storeExceptionDetail(ExceptionDetail detail) {
        // Storage logic goes here.
    }

    /**
     * Renders the full stack trace of {@code throwable}. Previously called here but not defined
     * in this class.
     */
    private String getStackTrace(Throwable throwable) {
        java.io.StringWriter sw = new java.io.StringWriter();
        throwable.printStackTrace(new java.io.PrintWriter(sw, true));
        return sw.toString();
    }

    /**
     * Per-type exception-rate alert threshold.
     */
    private double getExceptionRateThreshold(ExceptionClassifier.ExceptionType type) {
        switch (type) {
            case SYSTEM_ERROR:
                return 0.01; // 1%
            case RUNTIME_ERROR:
                return 0.05; // 5%
            case NETWORK_ERROR:
                return 0.1; // 10%
            case DATABASE_ERROR:
                return 0.05; // 5%
            case BUSINESS_ERROR:
                return 0.2; // 20%
            default:
                return 0.1; // 10%
        }
    }

    /**
     * Per-type exception-count alert threshold.
     */
    private long getExceptionCountThreshold(ExceptionClassifier.ExceptionType type) {
        switch (type) {
            case SYSTEM_ERROR:
                return 10;
            case RUNTIME_ERROR:
                return 50;
            case NETWORK_ERROR:
                return 100;
            case DATABASE_ERROR:
                return 50;
            case BUSINESS_ERROR:
                return 200;
            default:
                return 100;
        }
    }
}

4.2 异常日志服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
/**
* 异常日志服务
*/
@Service
public class ExceptionLoggerService {

private final Logger logger = LoggerFactory.getLogger(ExceptionLoggerService.class);

@Autowired
private LogStorageService logStorageService;

/**
* 记录异常日志
*/
public void logException(Runnable task, Throwable throwable, ExceptionClassifier.ExceptionType type) {
try {
// 1. 构建异常日志
ExceptionLog exceptionLog = buildExceptionLog(task, throwable, type);

// 2. 记录到日志文件
logToFile(exceptionLog);

// 3. 存储到数据库
logStorageService.storeExceptionLog(exceptionLog);

// 4. 发送到日志收集系统
sendToLogCollector(exceptionLog);

} catch (Exception e) {
logger.error("异常日志记录失败", e);
}
}

/**
* 记录未捕获异常日志
*/
public void logUncaughtException(Thread thread, Throwable throwable) {
try {
UncaughtExceptionLog log = new UncaughtExceptionLog();
log.setThreadName(thread.getName());
log.setThreadId(thread.getId());
log.setExceptionClass(throwable.getClass().getName());
log.setExceptionMessage(throwable.getMessage());
log.setStackTrace(getStackTrace(throwable));
log.setTimestamp(System.currentTimeMillis());

// 记录日志
logger.error("线程未捕获异常: {}", log);

// 存储日志
logStorageService.storeUncaughtExceptionLog(log);

} catch (Exception e) {
logger.error("未捕获异常日志记录失败", e);
}
}

/**
* 记录拒绝执行日志
*/
public void logRejectedExecution(Runnable task, ThreadPoolExecutor executor) {
try {
RejectedExecutionLog log = new RejectedExecutionLog();
log.setTaskClass(task.getClass().getName());
log.setThreadPoolName(executor.toString());
log.setActiveCount(executor.getActiveCount());
log.setQueueSize(executor.getQueue().size());
log.setCorePoolSize(executor.getCorePoolSize());
log.setMaximumPoolSize(executor.getMaximumPoolSize());
log.setTimestamp(System.currentTimeMillis());

// 记录日志
logger.warn("任务被拒绝执行: {}", log);

// 存储日志
logStorageService.storeRejectedExecutionLog(log);

} catch (Exception e) {
logger.error("拒绝执行日志记录失败", e);
}
}

/**
* 构建异常日志
*/
private ExceptionLog buildExceptionLog(Runnable task, Throwable throwable, ExceptionClassifier.ExceptionType type) {
ExceptionLog log = new ExceptionLog();

// 基本信息
log.setTaskClass(task.getClass().getName());
log.setExceptionType(type);
log.setExceptionClass(throwable.getClass().getName());
log.setExceptionMessage(throwable.getMessage());
log.setStackTrace(getStackTrace(throwable));
log.setTimestamp(System.currentTimeMillis());

// 线程信息
Thread currentThread = Thread.currentThread();
log.setThreadName(currentThread.getName());
log.setThreadId(currentThread.getId());

// 任务信息
log.setTaskId(ThreadLocalContext.getTaskId());
log.setTaskStartTime(ThreadLocalContext.getStartTime());
log.setTaskDuration(System.currentTimeMillis() - ThreadLocalContext.getStartTime());

// 系统信息
log.setHostName(getHostName());
log.setProcessId(getProcessId());

return log;
}

/**
* 记录到日志文件
*/
private void logToFile(ExceptionLog log) {
if (log.getExceptionType() == ExceptionClassifier.ExceptionType.SYSTEM_ERROR) {
logger.error("系统异常: {}", log);
} else if (log.getExceptionType() == ExceptionClassifier.ExceptionType.RUNTIME_ERROR) {
logger.error("运行时异常: {}", log);
} else if (log.getExceptionType() == ExceptionClassifier.ExceptionType.NETWORK_ERROR) {
logger.warn("网络异常: {}", log);
} else if (log.getExceptionType() == ExceptionClassifier.ExceptionType.DATABASE_ERROR) {
logger.warn("数据库异常: {}", log);
} else if (log.getExceptionType() == ExceptionClassifier.ExceptionType.BUSINESS_ERROR) {
logger.info("业务异常: {}", log);
} else {
logger.warn("未知异常: {}", log);
}
}

/**
* 发送到日志收集系统
*/
private void sendToLogCollector(ExceptionLog log) {
try {
// 实现日志收集系统发送逻辑
// 例如:发送到ELK、Fluentd等
} catch (Exception e) {
logger.error("发送到日志收集系统失败", e);
}
}

/**
 * Renders the full stack trace of {@code throwable} as a string.
 *
 * @param throwable the throwable to render; must not be null
 * @return the text {@code printStackTrace} would print, including causes
 */
private String getStackTrace(Throwable throwable) {
    StringWriter buffer = new StringWriter();
    // Closing the PrintWriter also closes the StringWriter, which is a no-op for its contents.
    try (PrintWriter writer = new PrintWriter(buffer)) {
        throwable.printStackTrace(writer);
    }
    return buffer.toString();
}

/**
 * Returns the local host name, or {@code "unknown"} if it cannot be determined.
 *
 * <p>{@code InetAddress.getLocalHost()} may perform a name-service lookup.
 *
 * @return the host name or {@code "unknown"}
 */
private String getHostName() {
    String hostName = "unknown";
    try {
        hostName = InetAddress.getLocalHost().getHostName();
    } catch (Exception ignored) {
        // Host lookup failures are non-fatal for logging; keep the placeholder.
    }
    return hostName;
}

/**
 * Returns the current process id, or {@code "unknown"} on failure.
 *
 * <p>Takes the part of the runtime MXBean name before the first {@code '@'}, or the whole name
 * if it contains no {@code '@'}. NOTE(review): the {@code pid@host} format is a JVM
 * convention, not a guarantee of the API.
 *
 * @return the process id text or {@code "unknown"}
 */
private String getProcessId() {
    try {
        String runtimeName = ManagementFactory.getRuntimeMXBean().getName();
        int separator = runtimeName.indexOf('@');
        return separator < 0 ? runtimeName : runtimeName.substring(0, separator);
    } catch (Exception e) {
        return "unknown";
    }
}
}

五、企业级线程池异常处理方案

5.1 线程池工厂

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
/**
 * Thread-pool factory.
 *
 * <p>Builds {@code CustomThreadPoolExecutor} instances from a {@code ThreadPoolConfig}. Each pool
 * is wired with:
 * <ul>
 *   <li>a named thread factory;</li>
 *   <li>the configured rejection policy;</li>
 *   <li>the injected {@code ExceptionHandler} and {@code TaskMonitor}.</li>
 * </ul>
 */
@Component
public class ThreadPoolFactory {

    /**
     * Uncaught-exception handler installed on every worker thread. It holds no state, so one
     * instance is shared instead of allocating a new one per thread.
     */
    private static final Thread.UncaughtExceptionHandler UNCAUGHT_EXCEPTION_HANDLER =
            new CustomUncaughtExceptionHandler();

    @Autowired
    private ExceptionHandler exceptionHandler;

    @Autowired
    private TaskMonitor taskMonitor;

    /**
     * Creates a custom thread pool from {@code config}.
     *
     * <p>NOTE(review): the executor uses {@code config.getWorkQueue()} as-is. Building two pools
     * from the same config object would make them share one queue.
     *
     * @param config pool settings; must not be null
     * @return the new executor
     * @throws NullPointerException if {@code config} is null
     */
    public CustomThreadPoolExecutor createCustomThreadPool(ThreadPoolConfig config) {
        if (config == null) {
            throw new NullPointerException("config must not be null");
        }

        // 1. Thread factory
        ThreadFactory threadFactory = createThreadFactory(config);

        // 2. Rejected-execution handler
        RejectedExecutionHandler rejectedHandler = createRejectedExecutionHandler(config);

        // 3. Executor
        CustomThreadPoolExecutor executor = new CustomThreadPoolExecutor(
                config.getCorePoolSize(),
                config.getMaximumPoolSize(),
                config.getKeepAliveTime(),
                config.getTimeUnit(),
                config.getWorkQueue(),
                threadFactory,
                rejectedHandler,
                exceptionHandler,
                taskMonitor
        );

        // 4. Remaining pool properties
        executor.setThreadNamePrefix(config.getThreadNamePrefix());
        executor.setAllowCoreThreadTimeOut(config.isAllowCoreThreadTimeOut());

        return executor;
    }

    /**
     * Creates the thread factory for a pool; threads are named {@code <prefix>-<n>}.
     */
    private ThreadFactory createThreadFactory(ThreadPoolConfig config) {
        return new CustomThreadFactory(config.getThreadNamePrefix());
    }

    /**
     * Maps the configured rejection policy to a {@link RejectedExecutionHandler}.
     * An unset (null) policy falls back to {@code CallerRunsPolicy}, the same as the
     * {@code default} branch.
     */
    private RejectedExecutionHandler createRejectedExecutionHandler(ThreadPoolConfig config) {
        if (config.getRejectedExecutionPolicy() == null) {
            // Switching on a null enum throws NullPointerException; use the default instead.
            return new ThreadPoolExecutor.CallerRunsPolicy();
        }
        switch (config.getRejectedExecutionPolicy()) {
            case CALLER_RUNS:
                return new ThreadPoolExecutor.CallerRunsPolicy();
            case ABORT:
                return new ThreadPoolExecutor.AbortPolicy();
            case DISCARD:
                return new ThreadPoolExecutor.DiscardPolicy();
            case DISCARD_OLDEST:
                return new ThreadPoolExecutor.DiscardOldestPolicy();
            case CUSTOM:
                return new CustomRejectedExecutionHandler(exceptionHandler);
            default:
                return new ThreadPoolExecutor.CallerRunsPolicy();
        }
    }

    /**
     * Thread factory producing non-daemon threads named {@code <prefix>-<n>} (n starts at 1),
     * each with the shared uncaught-exception handler.
     */
    private static class CustomThreadFactory implements ThreadFactory {
        private final String threadNamePrefix;
        private final AtomicInteger threadNumber = new AtomicInteger(1);

        public CustomThreadFactory(String threadNamePrefix) {
            this.threadNamePrefix = threadNamePrefix;
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r, threadNamePrefix + "-" + threadNumber.getAndIncrement());
            thread.setDaemon(false);
            thread.setUncaughtExceptionHandler(UNCAUGHT_EXCEPTION_HANDLER);
            return thread;
        }
    }

    /**
     * Logs exceptions that escape a worker thread.
     *
     * <p>With a standard {@code ThreadPoolExecutor}, tasks passed to {@code submit()} are wrapped
     * in a {@code FutureTask}, which stores the exception in the {@code Future}. Such failures
     * never reach this handler; only failures escaping {@code execute()}d tasks do.
     */
    private static class CustomUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            log.error("线程未捕获异常: {}", t.getName(), e);
            // Additional handling (alerting, metrics) can be added here.
        }
    }

    /**
     * Rejection handler that delegates to {@code ExceptionHandler.handleRejectedExecution}.
     */
    private static class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
        private final ExceptionHandler exceptionHandler;

        public CustomRejectedExecutionHandler(ExceptionHandler exceptionHandler) {
            this.exceptionHandler = exceptionHandler;
        }

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            exceptionHandler.handleRejectedExecution(r, executor);
        }
    }
}

5.2 线程池配置管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
/**
 * Thread-pool configuration management.
 *
 * <p>Stores per-pool {@link ThreadPoolConfig} objects keyed by pool name in a
 * {@link ConcurrentHashMap}. Pools without a registered entry get a freshly built default
 * configuration.
 */
@Service
public class ThreadPoolConfigService {

    private final Map<String, ThreadPoolConfig> configMap = new ConcurrentHashMap<>();

    /**
     * Returns the configuration registered for {@code poolName}. If none is registered, or
     * {@code poolName} is null, returns a new default configuration.
     *
     * <p>NOTE(review): a registered config is returned as the same instance on every call,
     * including its {@code workQueue}. Executors built from two calls would share one queue.
     *
     * @param poolName the pool name; may be null
     * @return the registered or default configuration, never null
     */
    public ThreadPoolConfig getConfig(String poolName) {
        // ConcurrentHashMap rejects null keys, so treat a null name as "not registered".
        ThreadPoolConfig registered = poolName == null ? null : configMap.get(poolName);
        // Build the default lazily: getOrDefault(key, getDefaultConfig()) would allocate a full
        // default config (including a new queue) on every call, even when a config exists.
        return registered != null ? registered : getDefaultConfig();
    }

    /**
     * Registers (or replaces) the configuration for {@code poolName}.
     *
     * @param poolName the pool name; must not be null
     * @param config   the configuration; must not be null
     */
    public void setConfig(String poolName, ThreadPoolConfig config) {
        configMap.put(poolName, config);
    }

    /**
     * Builds a new default configuration with these settings:
     * <ul>
     *   <li>10 core threads, 20 max threads;</li>
     *   <li>60 s keep-alive;</li>
     *   <li>a bounded queue of 100;</li>
     *   <li>name prefix {@code default-pool}, no core timeout;</li>
     *   <li>caller-runs rejection.</li>
     * </ul>
     * A new queue is created on every call.
     */
    private ThreadPoolConfig getDefaultConfig() {
        ThreadPoolConfig config = new ThreadPoolConfig();
        config.setCorePoolSize(10);
        config.setMaximumPoolSize(20);
        config.setKeepAliveTime(60L);
        config.setTimeUnit(TimeUnit.SECONDS);
        config.setWorkQueue(new LinkedBlockingQueue<>(100));
        config.setThreadNamePrefix("default-pool");
        config.setAllowCoreThreadTimeOut(false);
        config.setRejectedExecutionPolicy(RejectedExecutionPolicy.CALLER_RUNS);
        return config;
    }

    /**
     * Mutable thread-pool configuration bean.
     */
    public static class ThreadPoolConfig {
        private int corePoolSize;
        private int maximumPoolSize;
        private long keepAliveTime;
        private TimeUnit timeUnit;
        private BlockingQueue<Runnable> workQueue;
        private String threadNamePrefix;
        private boolean allowCoreThreadTimeOut;
        private RejectedExecutionPolicy rejectedExecutionPolicy;

        // Getters and setters
        public int getCorePoolSize() {
            return corePoolSize;
        }

        public void setCorePoolSize(int corePoolSize) {
            this.corePoolSize = corePoolSize;
        }

        public int getMaximumPoolSize() {
            return maximumPoolSize;
        }

        public void setMaximumPoolSize(int maximumPoolSize) {
            this.maximumPoolSize = maximumPoolSize;
        }

        public long getKeepAliveTime() {
            return keepAliveTime;
        }

        public void setKeepAliveTime(long keepAliveTime) {
            this.keepAliveTime = keepAliveTime;
        }

        public TimeUnit getTimeUnit() {
            return timeUnit;
        }

        public void setTimeUnit(TimeUnit timeUnit) {
            this.timeUnit = timeUnit;
        }

        public BlockingQueue<Runnable> getWorkQueue() {
            return workQueue;
        }

        public void setWorkQueue(BlockingQueue<Runnable> workQueue) {
            this.workQueue = workQueue;
        }

        public String getThreadNamePrefix() {
            return threadNamePrefix;
        }

        public void setThreadNamePrefix(String threadNamePrefix) {
            this.threadNamePrefix = threadNamePrefix;
        }

        public boolean isAllowCoreThreadTimeOut() {
            return allowCoreThreadTimeOut;
        }

        public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
            this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
        }

        public RejectedExecutionPolicy getRejectedExecutionPolicy() {
            return rejectedExecutionPolicy;
        }

        public void setRejectedExecutionPolicy(RejectedExecutionPolicy rejectedExecutionPolicy) {
            this.rejectedExecutionPolicy = rejectedExecutionPolicy;
        }
    }

    /**
     * Rejected-execution policies, each with a human-readable (Chinese) description.
     */
    public enum RejectedExecutionPolicy {
        CALLER_RUNS("调用者运行"),
        ABORT("中止"),
        DISCARD("丢弃"),
        DISCARD_OLDEST("丢弃最老的"),
        CUSTOM("自定义");

        private final String description;

        RejectedExecutionPolicy(String description) {
            this.description = description;
        }

        public String getDescription() {
            return description;
        }
    }
}

5.3 任务监控服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
/**
 * Task monitoring service.
 *
 * <p>Publishes Micrometer metrics for thread-pool tasks:
 * <ul>
 *   <li>{@code threadpool.task.count}: a counter tagged by task class and status;</li>
 *   <li>{@code threadpool.task.duration}: a timer of task execution time;</li>
 *   <li>{@code threadpool.task.queue.size}: a gauge of queue size.</li>
 * </ul>
 */
@Service
public class TaskMonitorService {

    private final MeterRegistry meterRegistry;
    private final Timer taskTimer;
    private final Gauge taskGauge;

    public TaskMonitorService(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.taskTimer = Timer.builder("threadpool.task.duration")
                .description("线程池任务执行耗时")
                .register(meterRegistry);
        // Micrometer's gauge builder takes the observed object and value function up front;
        // register(...) only takes the registry.
        this.taskGauge = Gauge.builder("threadpool.task.queue.size", this, TaskMonitorService::getQueueSize)
                .description("线程池任务队列大小")
                .register(meterRegistry);
    }

    /**
     * Records that {@code task} started and stores the start time in {@code ThreadLocalContext}.
     * Failures are logged, never propagated.
     */
    public void recordTaskStart(Runnable task) {
        try {
            // Task counter
            incrementTaskCounter(task, "start");

            // Task start time.
            // NOTE(review): this writes via setTaskStartTime while readers use getStartTime;
            // confirm both refer to the same ThreadLocalContext slot. The value is never
            // cleared here, so a pooled thread keeps it for the next task.
            ThreadLocalContext.setTaskStartTime(System.currentTimeMillis());

        } catch (Exception e) {
            log.error("任务开始记录失败", e);
        }
    }

    /**
     * Records that {@code task} completed and, if a start time was recorded on this thread,
     * its duration in milliseconds. Failures are logged, never propagated.
     */
    public void recordTaskComplete(Runnable task) {
        try {
            // Task counter
            incrementTaskCounter(task, "complete");

            // Task duration
            long startTime = ThreadLocalContext.getStartTime();
            if (startTime > 0) {
                long duration = System.currentTimeMillis() - startTime;
                taskTimer.record(duration, TimeUnit.MILLISECONDS);
            }

        } catch (Exception e) {
            log.error("任务完成记录失败", e);
        }
    }

    /**
     * Records that the thread pool terminated (currently only logs).
     */
    public void recordThreadPoolTerminated() {
        try {
            log.info("线程池终止");
            // Additional termination monitoring can be added here.

        } catch (Exception e) {
            log.error("线程池终止记录失败", e);
        }
    }

    /**
     * Increments {@code threadpool.task.count} for the given task class and status.
     *
     * <p>Micrometer counters carry their tags in their identity, so the tagged counter is
     * looked up or created through the registry. {@code Counter.increment()} does not accept
     * tags. The registry returns the existing meter for an already-registered name/tag set.
     */
    private void incrementTaskCounter(Runnable task, String status) {
        Counter.builder("threadpool.task.count")
                .description("线程池任务计数")
                .tags("task_class", task.getClass().getSimpleName(), "status", status)
                .register(meterRegistry)
                .increment();
    }

    /**
     * Queue-size gauge value. Placeholder: always returns 0.0 until wired to a real queue.
     */
    private double getQueueSize() {
        // Implement actual queue-size lookup here.
        return 0.0;
    }
}

六、最佳实践与总结

6.1 线程池异常处理最佳实践

  1. 异常分类处理

    • 根据异常类型选择不同的处理策略
    • 系统异常需要立即处理
    • 业务异常可以进行降级处理
  2. 重试机制设计

    • 实现指数退避策略
    • 设置合理的重试次数和延迟
    • 避免无限重试
  3. 监控告警体系

    • 建立完善的异常监控
    • 设置合理的告警阈值
    • 实现异常趋势分析
  4. 日志记录规范

    • 记录详细的异常信息
    • 包含上下文信息
    • 便于问题排查

6.2 架构师级异常处理技能

  1. 系统性思维

    • 从全局角度设计异常处理机制
    • 考虑异常处理的性能和影响
    • 设计可扩展的异常处理架构
  2. 异常处理策略

    • 制定合理的异常处理策略
    • 实现优雅的降级机制
    • 保障系统的稳定性
  3. 监控运维能力

    • 建立完善的监控体系
    • 实现智能告警
    • 快速定位和解决问题
  4. 团队协作能力

    • 制定异常处理规范
    • 培训团队成员
    • 建立知识分享机制

6.3 持续改进建议

  1. 异常处理优化

    • 持续优化异常处理策略
    • 改进重试机制
    • 提升异常处理效率
  2. 监控体系完善

    • 完善监控指标
    • 优化告警策略
    • 提升监控精度
  3. 知识积累

    • 建立异常案例库
    • 总结处理经验
    • 形成最佳实践

总结

线程池异常处理是企业级并发编程的核心技能,通过优雅的异常处理策略、完善的监控告警机制和系统化的处理流程,能够确保线程池任务的稳定执行,及时发现和处理异常情况,保障企业级应用的高可用性。本文从异常捕获机制到监控告警,从基础原理到企业级实践,系统梳理了线程池异常处理的完整解决方案。

关键要点:

  1. 异常分类处理:根据异常类型选择不同的处理策略
  2. 重试机制设计:实现指数退避和合理重试
  3. 监控告警体系:建立完善的异常监控和告警
  4. 企业级实践:自定义线程池、配置管理、任务监控

通过深入理解这些技术要点,架构师能够设计出完善的线程池异常处理机制,提升系统的稳定性和可靠性,确保企业级应用的高可用性。