1. Java线程概述

Java线程是Java并发编程的核心,通过多线程实现并发执行、提高程序性能、增强用户体验。本文将详细介绍线程创建、线程池管理、线程同步、线程通信和性能优化的完整解决方案。

1.1 核心功能

  1. 线程创建: Thread、Runnable、Callable、线程工厂
  2. 线程池管理: ThreadPoolExecutor、ForkJoinPool、ScheduledExecutorService
  3. 线程同步: synchronized、Lock、Semaphore、CountDownLatch
  4. 线程通信: wait/notify、Condition、BlockingQueue、Exchanger
  5. 性能优化: 线程池调优、并发控制、资源管理

1.2 技术架构

1
2
3
4
5
Java应用 → 线程创建 → 线程池管理 → 线程同步 → 性能优化
↓ ↓ ↓ ↓ ↓
并发执行 → 任务调度 → 资源管理 → 数据安全 → 系统优化
↓ ↓ ↓ ↓ ↓
多线程 → 线程复用 → 同步控制 → 通信机制 → 性能提升

2. Java线程配置

2.1 Maven依赖配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
<!-- pom.xml -->
<dependencies>
<!-- Spring Boot Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- Spring Boot Actuator -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

<!-- Micrometer -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>

<!-- Apache Commons Lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>

<!-- Guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
</dependency>
</dependencies>

2.2 Java线程配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
/**
* Java线程配置类
*/
@Configuration
public class JavaThreadConfig {

@Value("${java-thread.core-pool-size:10}")
private int corePoolSize;

@Value("${java-thread.max-pool-size:50}")
private int maxPoolSize;

@Value("${java-thread.queue-capacity:1000}")
private int queueCapacity;

/**
* Java线程配置属性
*/
@Bean
public JavaThreadProperties javaThreadProperties() {
return JavaThreadProperties.builder()
.corePoolSize(corePoolSize)
.maxPoolSize(maxPoolSize)
.queueCapacity(queueCapacity)
.build();
}

/**
* 线程池管理器
*/
@Bean
public ThreadPoolManager threadPoolManager() {
return new ThreadPoolManager(javaThreadProperties());
}

/**
* 线程监控器
*/
@Bean
public ThreadMonitor threadMonitor() {
return new ThreadMonitor(javaThreadProperties());
}

/**
* 线程性能分析器
*/
@Bean
public ThreadPerformanceAnalyzer threadPerformanceAnalyzer() {
return new ThreadPerformanceAnalyzer(javaThreadProperties());
}
}

/**
* Java线程配置属性
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class JavaThreadProperties {
private int corePoolSize;
private int maxPoolSize;
private int queueCapacity;

// 线程池配置
private long keepAliveTime = 60; // 秒
private String threadNamePrefix = "Thread-";
private boolean allowCoreThreadTimeOut = false;
private String rejectedExecutionHandler = "CallerRunsPolicy";

// 线程监控配置
private boolean enableThreadMonitoring = true;
private int monitoringInterval = 60; // 秒
private boolean enableThreadMetrics = true;

// 性能配置
private boolean enableThreadOptimization = true;
private int maxConcurrentThreads = 100;
private boolean enableThreadPoolTuning = true;
}

3. 线程创建与管理

3.1 线程创建与管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
/**
* 线程创建服务
*/
@Service
public class ThreadCreationService {

private final JavaThreadProperties properties;
private final ThreadPoolManager threadPoolManager;

public ThreadCreationService(JavaThreadProperties properties) {
this.properties = properties;
this.threadPoolManager = null; // 注入
}

/**
* 创建线程(继承Thread)
* @param taskName 任务名称
* @param task 任务
* @return 线程
*/
public Thread createThread(String taskName, Runnable task) {
Thread thread = new Thread(task, taskName);
thread.setDaemon(false);
return thread;
}

/**
* 创建线程(实现Runnable)
* @param taskName 任务名称
* @param task 任务
* @return 线程
*/
public Thread createRunnableThread(String taskName, Runnable task) {
Thread thread = new Thread(task, taskName);
thread.setDaemon(false);
return thread;
}

/**
* 创建线程(实现Callable)
* @param taskName 任务名称
* @param task 任务
* @param <T> 返回类型
* @return Future
*/
public <T> Future<T> createCallableThread(String taskName, Callable<T> task) {
ExecutorService executor = threadPoolManager.getExecutorService();
return executor.submit(task);
}

/**
* 创建线程组
* @param groupName 组名称
* @return 线程组
*/
public ThreadGroup createThreadGroup(String groupName) {
return new ThreadGroup(groupName);
}

/**
* 创建线程工厂
* @param namePrefix 名称前缀
* @return 线程工厂
*/
public ThreadFactory createThreadFactory(String namePrefix) {
return new CustomThreadFactory(namePrefix);
}

/**
* 执行任务
* @param task 任务
* @return Future
*/
public Future<?> executeTask(Runnable task) {
ExecutorService executor = threadPoolManager.getExecutorService();
return executor.submit(task);
}

/**
* 执行任务(带返回值)
* @param task 任务
* @param <T> 返回类型
* @return Future
*/
public <T> Future<T> executeTask(Callable<T> task) {
ExecutorService executor = threadPoolManager.getExecutorService();
return executor.submit(task);
}

/**
* 批量执行任务
* @param tasks 任务列表
* @return Future列表
*/
public List<Future<?>> executeTasks(List<Runnable> tasks) {
ExecutorService executor = threadPoolManager.getExecutorService();
List<Future<?>> futures = new ArrayList<>();

for (Runnable task : tasks) {
futures.add(executor.submit(task));
}

return futures;
}

/**
* 等待所有任务完成
* @param futures Future列表
* @param timeout 超时时间
* @param unit 时间单位
* @return 是否全部完成
*/
public boolean waitForAllTasks(List<Future<?>> futures, long timeout, TimeUnit unit) {
try {
for (Future<?> future : futures) {
future.get(timeout, unit);
}
return true;
} catch (Exception e) {
log.error("等待任务完成失败", e);
return false;
}
}
}

/**
* 自定义线程工厂
*/
public class CustomThreadFactory implements ThreadFactory {
private final String namePrefix;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final ThreadGroup group;

public CustomThreadFactory(String namePrefix) {
this.namePrefix = namePrefix;
this.group = Thread.currentThread().getThreadGroup();
}

@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);

if (thread.isDaemon()) {
thread.setDaemon(false);
}

if (thread.getPriority() != Thread.NORM_PRIORITY) {
thread.setPriority(Thread.NORM_PRIORITY);
}

return thread;
}
}

/**
* 线程任务
*/
public class ThreadTask implements Runnable {
private final String taskName;
private final int duration; // 毫秒

public ThreadTask(String taskName, int duration) {
this.taskName = taskName;
this.duration = duration;
}

@Override
public void run() {
try {
log.info("任务开始执行: {}", taskName);
Thread.sleep(duration);
log.info("任务执行完成: {}", taskName);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("任务被中断: {}", taskName);
} catch (Exception e) {
log.error("任务执行失败: {}", taskName, e);
}
}
}

/**
* 线程任务(带返回值)
*/
public class CallableTask<T> implements Callable<T> {
private final String taskName;
private final int duration; // 毫秒
private final T result;

public CallableTask(String taskName, int duration, T result) {
this.taskName = taskName;
this.duration = duration;
this.result = result;
}

@Override
public T call() throws Exception {
try {
log.info("任务开始执行: {}", taskName);
Thread.sleep(duration);
log.info("任务执行完成: {}", taskName);
return result;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("任务被中断: {}", taskName);
throw e;
} catch (Exception e) {
log.error("任务执行失败: {}", taskName, e);
throw e;
}
}
}

4. 线程池管理器

4.1 线程池管理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
/**
* 线程池管理器
*/
@Component
public class ThreadPoolManager {

private final JavaThreadProperties properties;
private final Map<String, ExecutorService> threadPools = new ConcurrentHashMap<>();
private final Map<String, ScheduledExecutorService> scheduledThreadPools = new ConcurrentHashMap<>();

public ThreadPoolManager(JavaThreadProperties properties) {
this.properties = properties;
initializeDefaultThreadPools();
}

/**
* 初始化默认线程池
*/
private void initializeDefaultThreadPools() {
// 创建默认线程池
ExecutorService defaultPool = createThreadPool("default");
threadPools.put("default", defaultPool);

// 创建定时任务线程池
ScheduledExecutorService scheduledPool = createScheduledThreadPool("scheduled");
scheduledThreadPools.put("scheduled", scheduledPool);
}

/**
* 创建线程池
* @param poolName 线程池名称
* @return 线程池
*/
public ExecutorService createThreadPool(String poolName) {
ThreadFactory threadFactory = new CustomThreadFactory(poolName + "-");

ThreadPoolExecutor executor = new ThreadPoolExecutor(
properties.getCorePoolSize(),
properties.getMaxPoolSize(),
properties.getKeepAliveTime(),
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(properties.getQueueCapacity()),
threadFactory,
createRejectedExecutionHandler(properties.getRejectedExecutionHandler())
);

executor.allowCoreThreadTimeOut(properties.isAllowCoreThreadTimeOut());

return executor;
}

/**
* 创建定时任务线程池
* @param poolName 线程池名称
* @return 定时任务线程池
*/
public ScheduledExecutorService createScheduledThreadPool(String poolName) {
ThreadFactory threadFactory = new CustomThreadFactory(poolName + "-scheduled-");

return new ScheduledThreadPoolExecutor(
properties.getCorePoolSize(),
threadFactory,
createRejectedExecutionHandler(properties.getRejectedExecutionHandler())
);
}

/**
* 创建拒绝执行处理器
* @param handlerType 处理器类型
* @return 拒绝执行处理器
*/
private RejectedExecutionHandler createRejectedExecutionHandler(String handlerType) {
switch (handlerType.toLowerCase()) {
case "abortpolicy":
return new ThreadPoolExecutor.AbortPolicy();
case "callerrunspolicy":
return new ThreadPoolExecutor.CallerRunsPolicy();
case "discardpolicy":
return new ThreadPoolExecutor.DiscardPolicy();
case "discardoldestpolicy":
return new ThreadPoolExecutor.DiscardOldestPolicy();
default:
return new ThreadPoolExecutor.CallerRunsPolicy();
}
}

/**
* 获取线程池
* @param poolName 线程池名称
* @return 线程池
*/
public ExecutorService getThreadPool(String poolName) {
return threadPools.computeIfAbsent(poolName, this::createThreadPool);
}

/**
* 获取默认线程池
* @return 线程池
*/
public ExecutorService getExecutorService() {
return getThreadPool("default");
}

/**
* 获取定时任务线程池
* @param poolName 线程池名称
* @return 定时任务线程池
*/
public ScheduledExecutorService getScheduledThreadPool(String poolName) {
return scheduledThreadPools.computeIfAbsent(poolName, this::createScheduledThreadPool);
}

/**
* 获取默认定时任务线程池
* @return 定时任务线程池
*/
public ScheduledExecutorService getScheduledExecutorService() {
return getScheduledThreadPool("scheduled");
}

/**
* 执行任务
* @param task 任务
* @return Future
*/
public Future<?> execute(Runnable task) {
return getExecutorService().submit(task);
}

/**
* 执行任务(带返回值)
* @param task 任务
* @param <T> 返回类型
* @return Future
*/
public <T> Future<T> execute(Callable<T> task) {
return getExecutorService().submit(task);
}

/**
* 延迟执行任务
* @param task 任务
* @param delay 延迟时间
* @param unit 时间单位
* @return ScheduledFuture
*/
public ScheduledFuture<?> schedule(Runnable task, long delay, TimeUnit unit) {
return getScheduledExecutorService().schedule(task, delay, unit);
}

/**
* 定时执行任务
* @param task 任务
* @param initialDelay 初始延迟
* @param period 执行周期
* @param unit 时间单位
* @return ScheduledFuture
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) {
return getScheduledExecutorService().scheduleAtFixedRate(task, initialDelay, period, unit);
}

/**
* 获取线程池状态
* @param poolName 线程池名称
* @return 线程池状态
*/
public ThreadPoolStatus getThreadPoolStatus(String poolName) {
ExecutorService executor = threadPools.get(poolName);
if (executor instanceof ThreadPoolExecutor) {
ThreadPoolExecutor tpe = (ThreadPoolExecutor) executor;

return ThreadPoolStatus.builder()
.poolName(poolName)
.corePoolSize(tpe.getCorePoolSize())
.maximumPoolSize(tpe.getMaximumPoolSize())
.currentPoolSize(tpe.getPoolSize())
.activeThreadCount(tpe.getActiveCount())
.completedTaskCount(tpe.getCompletedTaskCount())
.totalTaskCount(tpe.getTaskCount())
.queueSize(tpe.getQueue().size())
.isShutdown(tpe.isShutdown())
.isTerminated(tpe.isTerminated())
.build();
}

return ThreadPoolStatus.builder().poolName(poolName).build();
}

/**
* 关闭线程池
* @param poolName 线程池名称
* @param timeout 超时时间
* @param unit 时间单位
* @return 是否成功关闭
*/
public boolean shutdownThreadPool(String poolName, long timeout, TimeUnit unit) {
try {
ExecutorService executor = threadPools.get(poolName);
if (executor != null) {
executor.shutdown();
return executor.awaitTermination(timeout, unit);
}
return true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
}

/**
* 线程池状态
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ThreadPoolStatus {
private String poolName;
private int corePoolSize;
private int maximumPoolSize;
private int currentPoolSize;
private int activeThreadCount;
private long completedTaskCount;
private long totalTaskCount;
private int queueSize;
private boolean isShutdown;
private boolean isTerminated;
}

5. 线程同步与通信

5.1 线程同步与通信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
/**
* 线程同步服务
*/
@Service
public class ThreadSynchronizationService {

private final JavaThreadProperties properties;
private final Map<String, Object> locks = new ConcurrentHashMap<>();
private final Map<String, Semaphore> semaphores = new ConcurrentHashMap<>();
private final Map<String, CountDownLatch> countDownLatches = new ConcurrentHashMap<>();

public ThreadSynchronizationService(JavaThreadProperties properties) {
this.properties = properties;
}

/**
* 使用synchronized同步
* @param lockKey 锁键
* @param task 任务
* @param <T> 返回类型
* @return 执行结果
*/
public <T> T executeWithSynchronized(String lockKey, Supplier<T> task) {
Object lock = locks.computeIfAbsent(lockKey, k -> new Object());

synchronized (lock) {
return task.get();
}
}

/**
* 使用ReentrantLock同步
* @param lockKey 锁键
* @param task 任务
* @param <T> 返回类型
* @return 执行结果
*/
public <T> T executeWithReentrantLock(String lockKey, Supplier<T> task) {
ReentrantLock lock = (ReentrantLock) locks.computeIfAbsent(lockKey, k -> new ReentrantLock());

lock.lock();
try {
return task.get();
} finally {
lock.unlock();
}
}

/**
* 使用信号量控制并发
* @param semaphoreKey 信号量键
* @param permits 许可数
* @param task 任务
* @param <T> 返回类型
* @return 执行结果
*/
public <T> T executeWithSemaphore(String semaphoreKey, int permits, Supplier<T> task) {
Semaphore semaphore = semaphores.computeIfAbsent(semaphoreKey, k -> new Semaphore(permits));

try {
semaphore.acquire();
return task.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("获取信号量被中断", e);
} finally {
semaphore.release();
}
}

/**
* 使用CountDownLatch等待
* @param latchKey 门闩键
* @param count 计数
* @param task 任务
* @param <T> 返回类型
* @return 执行结果
*/
public <T> T executeWithCountDownLatch(String latchKey, int count, Supplier<T> task) {
CountDownLatch latch = countDownLatches.computeIfAbsent(latchKey, k -> new CountDownLatch(count));

try {
latch.await();
return task.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("等待门闩被中断", e);
}
}

/**
* 减少CountDownLatch计数
* @param latchKey 门闩键
*/
public void countDown(String latchKey) {
CountDownLatch latch = countDownLatches.get(latchKey);
if (latch != null) {
latch.countDown();
}
}

/**
* 使用CyclicBarrier同步
* @param barrierKey 屏障键
* @param parties 参与方数
* @param task 任务
* @param <T> 返回类型
* @return 执行结果
*/
public <T> T executeWithCyclicBarrier(String barrierKey, int parties, Supplier<T> task) {
CyclicBarrier barrier = (CyclicBarrier) locks.computeIfAbsent(barrierKey, k -> new CyclicBarrier(parties));

try {
barrier.await();
return task.get();
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("等待屏障被中断", e);
}
}
}

/**
* 线程通信服务
*/
@Service
public class ThreadCommunicationService {

private final Map<String, BlockingQueue<Object>> queues = new ConcurrentHashMap<>();
private final Map<String, Exchanger<Object>> exchangers = new ConcurrentHashMap<>();
private final Map<String, Object> waitNotifyObjects = new ConcurrentHashMap<>();

/**
* 使用BlockingQueue通信
* @param queueKey 队列键
* @param capacity 队列容量
* @param data 数据
* @return 是否成功
*/
public boolean sendMessage(String queueKey, int capacity, Object data) {
BlockingQueue<Object> queue = queues.computeIfAbsent(queueKey, k -> new LinkedBlockingQueue<>(capacity));

try {
return queue.offer(data, 1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}

/**
* 接收消息
* @param queueKey 队列键
* @param timeout 超时时间
* @param unit 时间单位
* @return 消息
*/
public Object receiveMessage(String queueKey, long timeout, TimeUnit unit) {
BlockingQueue<Object> queue = queues.get(queueKey);
if (queue != null) {
try {
return queue.poll(timeout, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
}
return null;
}

/**
* 使用Exchanger交换数据
* @param exchangerKey 交换器键
* @param data 数据
* @return 交换后的数据
*/
public Object exchangeData(String exchangerKey, Object data) {
Exchanger<Object> exchanger = exchangers.computeIfAbsent(exchangerKey, k -> new Exchanger<>());

try {
return exchanger.exchange(data);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("数据交换被中断", e);
}
}

/**
* 使用wait/notify通信
* @param objectKey 对象键
* @param timeout 超时时间
* @param unit 时间单位
* @return 是否被通知
*/
public boolean waitForNotification(String objectKey, long timeout, TimeUnit unit) {
Object obj = waitNotifyObjects.computeIfAbsent(objectKey, k -> new Object());

synchronized (obj) {
try {
obj.wait(unit.toMillis(timeout));
return true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
}

/**
* 发送通知
* @param objectKey 对象键
*/
public void sendNotification(String objectKey) {
Object obj = waitNotifyObjects.get(objectKey);
if (obj != null) {
synchronized (obj) {
obj.notify();
}
}
}

/**
* 发送通知给所有等待的线程
* @param objectKey 对象键
*/
public void sendNotificationToAll(String objectKey) {
Object obj = waitNotifyObjects.get(objectKey);
if (obj != null) {
synchronized (obj) {
obj.notifyAll();
}
}
}
}

6. 线程监控器

6.1 线程监控器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
/**
* 线程监控器
*/
@Component
public class ThreadMonitor {

private final JavaThreadProperties properties;
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private final List<ThreadSnapshot> snapshots = new ArrayList<>();
private final Map<String, ThreadStats> threadStatsMap = new ConcurrentHashMap<>();

public ThreadMonitor(JavaThreadProperties properties) {
this.properties = properties;
if (properties.isEnableThreadMonitoring()) {
startThreadMonitoring();
}
}

/**
* 开始线程监控
*/
private void startThreadMonitoring() {
scheduler.scheduleAtFixedRate(
this::takeThreadSnapshot,
0,
properties.getMonitoringInterval(),
TimeUnit.SECONDS
);
}

/**
* 获取线程快照
*/
private void takeThreadSnapshot() {
try {
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds());

List<ThreadInfo> activeThreads = Arrays.stream(threadInfos)
.filter(Objects::nonNull)
.collect(Collectors.toList());

ThreadSnapshot snapshot = ThreadSnapshot.builder()
.timestamp(System.currentTimeMillis())
.totalThreadCount(threadMXBean.getThreadCount())
.peakThreadCount(threadMXBean.getPeakThreadCount())
.daemonThreadCount(threadMXBean.getDaemonThreadCount())
.activeThreadCount(activeThreads.size())
.threadInfos(activeThreads)
.build();

snapshots.add(snapshot);

// 保持最近100个快照
if (snapshots.size() > 100) {
snapshots.remove(0);
}

// 更新线程统计信息
updateThreadStats(activeThreads);

} catch (Exception e) {
log.error("获取线程快照失败", e);
}
}

/**
* 更新线程统计信息
* @param threadInfos 线程信息列表
*/
private void updateThreadStats(List<ThreadInfo> threadInfos) {
for (ThreadInfo threadInfo : threadInfos) {
String threadName = threadInfo.getThreadName();
ThreadStats stats = threadStatsMap.computeIfAbsent(threadName, k -> new ThreadStats());

stats.setThreadName(threadName);
stats.setThreadState(threadInfo.getThreadState().toString());
stats.setLastUpdateTime(System.currentTimeMillis());

if (threadInfo.getThreadState() == Thread.State.RUNNABLE) {
stats.setRunnableCount(stats.getRunnableCount() + 1);
} else if (threadInfo.getThreadState() == Thread.State.BLOCKED) {
stats.setBlockedCount(stats.getBlockedCount() + 1);
} else if (threadInfo.getThreadState() == Thread.State.WAITING) {
stats.setWaitingCount(stats.getWaitingCount() + 1);
}
}
}

/**
* 获取线程监控报告
* @return 线程监控报告
*/
public ThreadMonitorReport getThreadMonitorReport() {
try {
if (snapshots.isEmpty()) {
return ThreadMonitorReport.builder().build();
}

ThreadSnapshot latest = snapshots.get(snapshots.size() - 1);
ThreadSnapshot previous = snapshots.size() > 1 ?
snapshots.get(snapshots.size() - 2) : latest;

// 计算线程趋势
int threadCountTrend = latest.getTotalThreadCount() - previous.getTotalThreadCount();
int activeThreadTrend = latest.getActiveThreadCount() - previous.getActiveThreadCount();

return ThreadMonitorReport.builder()
.currentSnapshot(latest)
.threadCountTrend(threadCountTrend)
.activeThreadTrend(activeThreadTrend)
.snapshotCount(snapshots.size())
.threadStatsMap(new HashMap<>(threadStatsMap))
.build();

} catch (Exception e) {
log.error("获取线程监控报告失败", e);
return ThreadMonitorReport.builder().build();
}
}

/**
* 获取线程告警
* @return 线程告警列表
*/
public List<ThreadAlert> getThreadAlerts() {
List<ThreadAlert> alerts = new ArrayList<>();

try {
ThreadMonitorReport report = getThreadMonitorReport();

// 检查线程数量告警
if (report.getCurrentSnapshot().getTotalThreadCount() > properties.getMaxConcurrentThreads()) {
alerts.add(new ThreadAlert(
"线程数量过多",
"当前线程数量超过最大限制",
"HIGH"
));
}

// 检查线程趋势告警
if (report.getThreadCountTrend() > 10) {
alerts.add(new ThreadAlert(
"线程数量增长过快",
"线程数量增长超过阈值",
"MEDIUM"
));
}

// 检查阻塞线程告警
long blockedThreadCount = report.getThreadStatsMap().values().stream()
.mapToLong(ThreadStats::getBlockedCount)
.sum();

if (blockedThreadCount > 5) {
alerts.add(new ThreadAlert(
"阻塞线程过多",
"当前阻塞线程数量过多",
"MEDIUM"
));
}

} catch (Exception e) {
log.error("获取线程告警失败", e);
}

return alerts;
}

/**
* 获取线程堆栈信息
* @param threadId 线程ID
* @return 线程堆栈信息
*/
public String getThreadStackTrace(long threadId) {
try {
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
ThreadInfo threadInfo = threadMXBean.getThreadInfo(threadId, Integer.MAX_VALUE);

if (threadInfo != null) {
StringBuilder sb = new StringBuilder();
sb.append("线程: ").append(threadInfo.getThreadName()).append("\n");
sb.append("状态: ").append(threadInfo.getThreadState()).append("\n");

StackTraceElement[] stackTrace = threadInfo.getStackTrace();
for (StackTraceElement element : stackTrace) {
sb.append(" ").append(element.toString()).append("\n");
}

return sb.toString();
}

} catch (Exception e) {
log.error("获取线程堆栈信息失败: threadId={}", threadId, e);
}

return "无法获取线程堆栈信息";
}
}

/**
* 线程快照
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ThreadSnapshot {
private long timestamp;
private int totalThreadCount;
private int peakThreadCount;
private int daemonThreadCount;
private int activeThreadCount;
private List<ThreadInfo> threadInfos;
}

/**
* 线程统计信息
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ThreadStats {
private String threadName;
private String threadState;
private long lastUpdateTime;
private long runnableCount;
private long blockedCount;
private long waitingCount;
}

/**
* 线程监控报告
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ThreadMonitorReport {
private ThreadSnapshot currentSnapshot;
private int threadCountTrend;
private int activeThreadTrend;
private int snapshotCount;
private Map<String, ThreadStats> threadStatsMap;
}

/**
* 线程告警
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ThreadAlert {
private String title;
private String description;
private String severity; // HIGH, MEDIUM, LOW
}

7. Java线程控制器

7.1 Java线程控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
/**
* Java线程控制器
*/
@RestController
@RequestMapping("/api/v1/java-thread")
public class JavaThreadController {

@Autowired
private ThreadCreationService threadCreationService;

@Autowired
private ThreadPoolManager threadPoolManager;

@Autowired
private ThreadSynchronizationService synchronizationService;

@Autowired
private ThreadCommunicationService communicationService;

@Autowired
private ThreadMonitor threadMonitor;

/**
* 创建线程
*/
@PostMapping("/create")
public ResponseEntity<Map<String, Object>> createThread(@RequestBody ThreadRequest request) {
try {
ThreadTask task = new ThreadTask(request.getTaskName(), request.getDuration());
Future<?> future = threadCreationService.executeTask(task);

Map<String, Object> response = new HashMap<>();
response.put("success", true);
response.put("message", "线程创建成功");
response.put("taskName", request.getTaskName());

return ResponseEntity.ok(response);

} catch (Exception e) {
log.error("创建线程失败", e);

Map<String, Object> response = new HashMap<>();
response.put("success", false);
response.put("message", "创建线程失败: " + e.getMessage());

return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
}
}

/**
* 获取线程池状态
*/
@GetMapping("/pool/status")
public ResponseEntity<Map<String, Object>> getThreadPoolStatus(@RequestParam String poolName) {
try {
ThreadPoolStatus status = threadPoolManager.getThreadPoolStatus(poolName);

Map<String, Object> response = new HashMap<>();
response.put("success", true);
response.put("status", status);

return ResponseEntity.ok(response);

} catch (Exception e) {
log.error("获取线程池状态失败", e);

Map<String, Object> response = new HashMap<>();
response.put("success", false);
response.put("message", "获取线程池状态失败: " + e.getMessage());

return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
}
}

/**
* 线程同步测试
*/
@PostMapping("/synchronization")
public ResponseEntity<Map<String, Object>> testSynchronization(@RequestBody SynchronizationRequest request) {
try {
String result = synchronizationService.executeWithSynchronized(
request.getLockKey(),
() -> {
try {
Thread.sleep(request.getDuration());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "同步执行完成: " + request.getTaskName();
}
);

Map<String, Object> response = new HashMap<>();
response.put("success", true);
response.put("result", result);
response.put("lockKey", request.getLockKey());

return ResponseEntity.ok(response);

} catch (Exception e) {
log.error("线程同步测试失败", e);

Map<String, Object> response = new HashMap<>();
response.put("success", false);
response.put("message", "线程同步测试失败: " + e.getMessage());

return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
}
}

/**
* 线程通信测试
*/
@PostMapping("/communication")
public ResponseEntity<Map<String, Object>> testCommunication(@RequestBody CommunicationRequest request) {
try {
boolean success = communicationService.sendMessage(
request.getQueueKey(),
request.getCapacity(),
request.getMessage()
);

Map<String, Object> response = new HashMap<>();
response.put("success", success);
response.put("message", success ? "消息发送成功" : "消息发送失败");
response.put("queueKey", request.getQueueKey());

return ResponseEntity.ok(response);

} catch (Exception e) {
log.error("线程通信测试失败", e);

Map<String, Object> response = new HashMap<>();
response.put("success", false);
response.put("message", "线程通信测试失败: " + e.getMessage());

return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
}
}

/**
* 获取线程监控报告
*/
@GetMapping("/monitor")
public ResponseEntity<Map<String, Object>> getThreadMonitorReport() {
try {
ThreadMonitorReport report = threadMonitor.getThreadMonitorReport();
List<ThreadAlert> alerts = threadMonitor.getThreadAlerts();

Map<String, Object> response = new HashMap<>();
response.put("success", true);
response.put("report", report);
response.put("alerts", alerts);

return ResponseEntity.ok(response);

} catch (Exception e) {
log.error("获取线程监控报告失败", e);

Map<String, Object> response = new HashMap<>();
response.put("success", false);
response.put("message", "获取线程监控报告失败: " + e.getMessage());

return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
}
}

/**
* 获取线程堆栈信息
*/
@GetMapping("/stacktrace/{threadId}")
public ResponseEntity<Map<String, Object>> getThreadStackTrace(@PathVariable long threadId) {
try {
String stackTrace = threadMonitor.getThreadStackTrace(threadId);

Map<String, Object> response = new HashMap<>();
response.put("success", true);
response.put("threadId", threadId);
response.put("stackTrace", stackTrace);

return ResponseEntity.ok(response);

} catch (Exception e) {
log.error("获取线程堆栈信息失败", e);

Map<String, Object> response = new HashMap<>();
response.put("success", false);
response.put("message", "获取线程堆栈信息失败: " + e.getMessage());

return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
}
}
}

/**
* 线程请求模型
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ThreadRequest {
private String taskName;
private int duration; // 毫秒
}

/**
* 同步请求模型
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class SynchronizationRequest {
private String lockKey;
private String taskName;
private int duration; // 毫秒
}

/**
* 通信请求模型
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class CommunicationRequest {
private String queueKey;
private int capacity;
private String message;
}

8. 总结

通过Java线程的实现,我们成功构建了一个完整的线程管理框架。关键特性包括:

8.1 核心优势

  1. 线程创建: Thread、Runnable、Callable、线程工厂
  2. 线程池管理: ThreadPoolExecutor、ForkJoinPool、ScheduledExecutorService
  3. 线程同步: synchronized、Lock、Semaphore、CountDownLatch
  4. 线程通信: wait/notify、Condition、BlockingQueue、Exchanger
  5. 性能优化: 线程池调优、并发控制、资源管理

8.2 最佳实践

  1. 线程创建: 合理使用线程池、避免频繁创建线程
  2. 线程池管理: 合理配置线程池参数、监控线程池状态
  3. 线程同步: 选择合适的同步机制、避免死锁
  4. 线程通信: 使用合适的通信机制、避免竞态条件
  5. 性能优化: 线程池调优、并发控制、资源管理

这套Java线程方案不仅能够提供完整的线程管理能力,还包含了线程同步、线程通信、线程监控等核心功能,是企业级Java应用的重要技术基础。