1. Condition概述

Condition是Java并发包中提供的高级线程协调机制,它提供了比Object.wait()和notify()更灵活和强大的线程等待和通知功能。本文将详细介绍Condition的使用方法,包括等待通知机制、生产者消费者模式、线程同步的完整解决方案。

1.1 核心功能

  1. 等待机制: 线程可以等待特定条件满足
  2. 通知机制: 可以精确通知等待的线程
  3. 超时等待: 支持带超时的等待操作
  4. 中断响应: 支持等待操作的中断
  5. 多条件: 一个锁可以关联多个条件

1.2 技术架构

1
2
3
4
线程A → 等待条件 → Condition.await() → 条件满足 → 继续执行
线程B → 修改条件 → Condition.signal() → 通知线程A → 唤醒等待
↓ ↓ ↓ ↓
条件检查 → 等待队列 → 条件变化 → 通知机制

2. Maven依赖配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
<!-- pom.xml -->
<project>
<modelVersion>4.0.0</modelVersion>
<groupId>com.condition</groupId>
<artifactId>condition-demo</artifactId>
<version>1.0.0</version>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.0</version>
</parent>

<properties>
<java.version>11</java.version>
</properties>

<dependencies>
<!-- Spring Boot Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>

<!-- Commons工具 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>

<!-- Guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
</dependency>

<!-- Micrometer(监控指标) -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>

<!-- 测试依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

3. 配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# application.yml
server:
port: 8080

spring:
application:
name: condition-demo

# 条件变量配置
condition:
# 生产者消费者配置
producer-consumer:
buffer-size: 10
producer-count: 3
consumer-count: 2

# 等待超时配置
timeout:
default-timeout: 5000
max-timeout: 30000

# 监控配置
monitor:
enabled: true
interval: 10000

# 监控配置
management:
endpoints:
web:
exposure:
include: "*"
metrics:
export:
prometheus:
enabled: true

4. 基础条件变量服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package com.condition.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
* 基础条件变量服务
* @author Java实战
*/
@Slf4j
@Service
public class BasicConditionService {

private final ReentrantLock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();

/**
* 共享状态
*/
private boolean ready = false;
private int value = 0;

/**
* 等待条件满足
*/
public void waitForCondition() throws InterruptedException {
lock.lock();
try {
log.info("线程 {} 开始等待条件", Thread.currentThread().getName());

// 使用while循环避免虚假唤醒
while (!ready) {
log.info("线程 {} 等待条件满足", Thread.currentThread().getName());
condition.await();
}

log.info("线程 {} 条件满足,继续执行", Thread.currentThread().getName());

} finally {
lock.unlock();
}
}

/**
* 超时等待条件
*/
public boolean waitForConditionWithTimeout(long timeout, TimeUnit unit) throws InterruptedException {
lock.lock();
try {
log.info("线程 {} 开始超时等待条件,超时时间: {} {}",
Thread.currentThread().getName(), timeout, unit);

boolean result = condition.await(timeout, unit);

if (result) {
log.info("线程 {} 条件满足", Thread.currentThread().getName());
} else {
log.warn("线程 {} 等待超时", Thread.currentThread().getName());
}

return result;

} finally {
lock.unlock();
}
}

/**
* 设置条件并通知
*/
public void setConditionAndNotify(int newValue) {
lock.lock();
try {
log.info("线程 {} 设置条件,值: {}", Thread.currentThread().getName(), newValue);

value = newValue;
ready = true;

// 通知等待的线程
condition.signal();
log.info("线程 {} 通知等待的线程", Thread.currentThread().getName());

} finally {
lock.unlock();
}
}

/**
* 通知所有等待的线程
*/
public void notifyAllWaiting() {
lock.lock();
try {
log.info("线程 {} 通知所有等待的线程", Thread.currentThread().getName());
condition.signalAll();
} finally {
lock.unlock();
}
}

/**
* 重置条件
*/
public void resetCondition() {
lock.lock();
try {
log.info("线程 {} 重置条件", Thread.currentThread().getName());
ready = false;
value = 0;
} finally {
lock.unlock();
}
}

/**
* 获取当前状态
*/
public ConditionStatus getStatus() {
lock.lock();
try {
return ConditionStatus.builder()
.ready(ready)
.value(value)
.isLocked(lock.isLocked())
.queueLength(lock.getQueueLength())
.build();
} finally {
lock.unlock();
}
}

/**
* 条件状态
*/
@lombok.Data
@lombok.Builder
public static class ConditionStatus {
private boolean ready;
private int value;
private boolean isLocked;
private int queueLength;
}
}

5. 生产者消费者服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
package com.condition.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
* 生产者消费者服务
* @author Java实战
*/
@Slf4j
@Service
public class ProducerConsumerService {

private final ReentrantLock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition(); // 缓冲区未满条件
private final Condition notEmpty = lock.newCondition(); // 缓冲区非空条件

@Value("${condition.producer-consumer.buffer-size:10}")
private int bufferSize;

/**
* 缓冲区
*/
private final Object[] buffer = new Object[bufferSize];
private int count = 0;
private int putIndex = 0;
private int takeIndex = 0;

/**
* 生产者:生产数据
*/
public void produce(Object item) throws InterruptedException {
lock.lock();
try {
log.info("生产者 {} 尝试生产数据: {}", Thread.currentThread().getName(), item);

// 等待缓冲区未满
while (count == bufferSize) {
log.info("生产者 {} 等待缓冲区空间", Thread.currentThread().getName());
notFull.await();
}

// 生产数据
buffer[putIndex] = item;
putIndex = (putIndex + 1) % bufferSize;
count++;

log.info("生产者 {} 生产数据成功: {}, 当前缓冲区大小: {}",
Thread.currentThread().getName(), item, count);

// 通知消费者
notEmpty.signal();

} finally {
lock.unlock();
}
}

/**
* 消费者:消费数据
*/
public Object consume() throws InterruptedException {
lock.lock();
try {
log.info("消费者 {} 尝试消费数据", Thread.currentThread().getName());

// 等待缓冲区非空
while (count == 0) {
log.info("消费者 {} 等待数据", Thread.currentThread().getName());
notEmpty.await();
}

// 消费数据
Object item = buffer[takeIndex];
buffer[takeIndex] = null;
takeIndex = (takeIndex + 1) % bufferSize;
count--;

log.info("消费者 {} 消费数据成功: {}, 当前缓冲区大小: {}",
Thread.currentThread().getName(), item, count);

// 通知生产者
notFull.signal();

return item;

} finally {
lock.unlock();
}
}

/**
* 超时生产数据
*/
public boolean tryProduce(Object item, long timeout, TimeUnit unit) throws InterruptedException {
lock.lock();
try {
log.info("生产者 {} 尝试超时生产数据: {}", Thread.currentThread().getName(), item);

// 超时等待缓冲区未满
if (!notFull.await(timeout, unit)) {
log.warn("生产者 {} 等待缓冲区空间超时", Thread.currentThread().getName());
return false;
}

// 生产数据
buffer[putIndex] = item;
putIndex = (putIndex + 1) % bufferSize;
count++;

log.info("生产者 {} 超时生产数据成功: {}, 当前缓冲区大小: {}",
Thread.currentThread().getName(), item, count);

// 通知消费者
notEmpty.signal();

return true;

} finally {
lock.unlock();
}
}

/**
* 超时消费数据
*/
public Object tryConsume(long timeout, TimeUnit unit) throws InterruptedException {
lock.lock();
try {
log.info("消费者 {} 尝试超时消费数据", Thread.currentThread().getName());

// 超时等待缓冲区非空
if (!notEmpty.await(timeout, unit)) {
log.warn("消费者 {} 等待数据超时", Thread.currentThread().getName());
return null;
}

// 消费数据
Object item = buffer[takeIndex];
buffer[takeIndex] = null;
takeIndex = (takeIndex + 1) % bufferSize;
count--;

log.info("消费者 {} 超时消费数据成功: {}, 当前缓冲区大小: {}",
Thread.currentThread().getName(), item, count);

// 通知生产者
notFull.signal();

return item;

} finally {
lock.unlock();
}
}

/**
* 获取缓冲区状态
*/
public BufferStatus getBufferStatus() {
lock.lock();
try {
return BufferStatus.builder()
.count(count)
.capacity(bufferSize)
.putIndex(putIndex)
.takeIndex(takeIndex)
.isFull(count == bufferSize)
.isEmpty(count == 0)
.build();
} finally {
lock.unlock();
}
}

/**
* 清空缓冲区
*/
public void clearBuffer() {
lock.lock();
try {
log.info("清空缓冲区");

for (int i = 0; i < bufferSize; i++) {
buffer[i] = null;
}

count = 0;
putIndex = 0;
takeIndex = 0;

// 通知所有等待的生产者
notFull.signalAll();

} finally {
lock.unlock();
}
}

/**
* 缓冲区状态
*/
@lombok.Data
@lombok.Builder
public static class BufferStatus {
private int count;
private int capacity;
private int putIndex;
private int takeIndex;
private boolean isFull;
private boolean isEmpty;
}
}

6. 多条件协调服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
package com.condition.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
* 多条件协调服务
* @author Java实战
*/
@Slf4j
@Service
public class MultiConditionService {

private final ReentrantLock lock = new ReentrantLock();

// 多个条件
private final Condition dataReady = lock.newCondition(); // 数据准备就绪
private final Condition processingComplete = lock.newCondition(); // 处理完成
private final Condition errorOccurred = lock.newCondition(); // 错误发生

/**
* 数据状态
*/
private boolean dataReadyFlag = false;
private boolean processingCompleteFlag = false;
private boolean errorOccurredFlag = false;

/**
* 数据
*/
private String data = null;
private String result = null;
private String error = null;

/**
* 数据准备线程
*/
public void prepareData(String newData) throws InterruptedException {
lock.lock();
try {
log.info("数据准备线程 {} 开始准备数据: {}", Thread.currentThread().getName(), newData);

// 模拟数据准备
Thread.sleep(1000);

data = newData;
dataReadyFlag = true;

log.info("数据准备完成: {}", data);

// 通知数据准备就绪
dataReady.signal();

} finally {
lock.unlock();
}
}

/**
* 数据处理线程
*/
public void processData() throws InterruptedException {
lock.lock();
try {
log.info("数据处理线程 {} 等待数据准备", Thread.currentThread().getName());

// 等待数据准备就绪
while (!dataReadyFlag) {
dataReady.await();
}

log.info("数据处理线程 {} 开始处理数据: {}", Thread.currentThread().getName(), data);

try {
// 模拟数据处理
Thread.sleep(2000);

// 模拟处理结果
result = "处理结果: " + data.toUpperCase();
processingCompleteFlag = true;

log.info("数据处理完成: {}", result);

// 通知处理完成
processingComplete.signal();

} catch (Exception e) {
log.error("数据处理出错", e);

error = "处理错误: " + e.getMessage();
errorOccurredFlag = true;

// 通知错误发生
errorOccurred.signal();
}

} finally {
lock.unlock();
}
}

/**
* 结果获取线程
*/
public String getResult() throws InterruptedException {
lock.lock();
try {
log.info("结果获取线程 {} 等待处理完成", Thread.currentThread().getName());

// 等待处理完成
while (!processingCompleteFlag && !errorOccurredFlag) {
// 使用awaitNanos实现超时等待
long remainingNanos = processingComplete.awaitNanos(TimeUnit.SECONDS.toNanos(5));
if (remainingNanos <= 0) {
log.warn("等待处理完成超时");
return "等待超时";
}
}

if (errorOccurredFlag) {
log.error("获取到错误结果: {}", error);
return error;
} else {
log.info("获取到处理结果: {}", result);
return result;
}

} finally {
lock.unlock();
}
}

/**
* 错误处理线程
*/
public String handleError() throws InterruptedException {
lock.lock();
try {
log.info("错误处理线程 {} 等待错误发生", Thread.currentThread().getName());

// 等待错误发生
while (!errorOccurredFlag) {
errorOccurred.await();
}

log.info("错误处理线程 {} 处理错误: {}", Thread.currentThread().getName(), error);

// 模拟错误处理
Thread.sleep(1000);

String errorHandled = "错误已处理: " + error;

// 重置错误状态
errorOccurredFlag = false;
error = null;

return errorHandled;

} finally {
lock.unlock();
}
}

/**
* 重置所有状态
*/
public void resetAll() {
lock.lock();
try {
log.info("重置所有状态");

dataReadyFlag = false;
processingCompleteFlag = false;
errorOccurredFlag = false;

data = null;
result = null;
error = null;

// 通知所有等待的线程
dataReady.signalAll();
processingComplete.signalAll();
errorOccurred.signalAll();

} finally {
lock.unlock();
}
}

/**
* 获取所有状态
*/
public MultiConditionStatus getAllStatus() {
lock.lock();
try {
return MultiConditionStatus.builder()
.dataReady(dataReadyFlag)
.processingComplete(processingCompleteFlag)
.errorOccurred(errorOccurredFlag)
.data(data)
.result(result)
.error(error)
.build();
} finally {
lock.unlock();
}
}

/**
* 多条件状态
*/
@lombok.Data
@lombok.Builder
public static class MultiConditionStatus {
private boolean dataReady;
private boolean processingComplete;
private boolean errorOccurred;
private String data;
private String result;
private String error;
}
}

7. 条件监控服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package com.condition.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import java.util.concurrent.locks.ReentrantLock;

/**
* 条件监控服务
* @author Java实战
*/
@Slf4j
@Service
public class ConditionMonitorService {

private final ReentrantLock monitorLock = new ReentrantLock();

/**
* 监控统计
*/
private long awaitCount = 0;
private long signalCount = 0;
private long signalAllCount = 0;
private long timeoutCount = 0;

/**
* 监控条件使用情况(每10秒)
*/
@Scheduled(fixedRate = 10000)
public void monitorConditionUsage() {
log.info("=== 条件变量监控报告 ===");

monitorLock.lock();
try {
log.info("等待操作次数: {}", awaitCount);
log.info("信号通知次数: {}", signalCount);
log.info("广播通知次数: {}", signalAllCount);
log.info("超时次数: {}", timeoutCount);

// 计算成功率
long totalOperations = awaitCount + signalCount + signalAllCount;
if (totalOperations > 0) {
double successRate = (double) (signalCount + signalAllCount) / totalOperations * 100;
log.info("通知成功率: {:.2f}%", successRate);
}

} finally {
monitorLock.unlock();
}

log.info("=== 监控报告结束 ===");
}

/**
* 记录等待操作
*/
public void recordAwait() {
monitorLock.lock();
try {
awaitCount++;
} finally {
monitorLock.unlock();
}
}

/**
* 记录信号通知
*/
public void recordSignal() {
monitorLock.lock();
try {
signalCount++;
} finally {
monitorLock.unlock();
}
}

/**
* 记录广播通知
*/
public void recordSignalAll() {
monitorLock.lock();
try {
signalAllCount++;
} finally {
monitorLock.unlock();
}
}

/**
* 记录超时
*/
public void recordTimeout() {
monitorLock.lock();
try {
timeoutCount++;
} finally {
monitorLock.unlock();
}
}

/**
* 获取监控统计
*/
public ConditionStatistics getStatistics() {
monitorLock.lock();
try {
return ConditionStatistics.builder()
.awaitCount(awaitCount)
.signalCount(signalCount)
.signalAllCount(signalAllCount)
.timeoutCount(timeoutCount)
.build();
} finally {
monitorLock.unlock();
}
}

/**
* 条件统计信息
*/
@lombok.Data
@lombok.Builder
public static class ConditionStatistics {
private long awaitCount;
private long signalCount;
private long signalAllCount;
private long timeoutCount;
}
}

8. 控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
package com.condition.controller;

import com.condition.service.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.util.concurrent.TimeUnit;

/**
* Condition控制器
* @author Java实战
*/
@Slf4j
@RestController
@RequestMapping("/api/condition")
public class ConditionController {

@Autowired
private BasicConditionService basicConditionService;

@Autowired
private ProducerConsumerService producerConsumerService;

@Autowired
private MultiConditionService multiConditionService;

@Autowired
private ConditionMonitorService conditionMonitorService;

/**
* 等待条件
*/
@PostMapping("/wait")
public ResponseEntity<String> waitForCondition() {
try {
basicConditionService.waitForCondition();
return ResponseEntity.ok("条件等待成功");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("条件等待被中断", e);
return ResponseEntity.internalServerError().build();
}
}

/**
* 超时等待条件
*/
@PostMapping("/wait-timeout")
public ResponseEntity<Boolean> waitForConditionWithTimeout(@RequestParam long timeout) {
try {
boolean result = basicConditionService.waitForConditionWithTimeout(timeout, TimeUnit.MILLISECONDS);
return ResponseEntity.ok(result);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("超时等待被中断", e);
return ResponseEntity.internalServerError().build();
}
}

/**
* 设置条件并通知
*/
@PostMapping("/set-notify")
public ResponseEntity<String> setConditionAndNotify(@RequestParam int value) {
try {
basicConditionService.setConditionAndNotify(value);
return ResponseEntity.ok("条件设置并通知成功");
} catch (Exception e) {
log.error("设置条件失败", e);
return ResponseEntity.internalServerError().build();
}
}

/**
* 通知所有等待线程
*/
@PostMapping("/notify-all")
public ResponseEntity<String> notifyAllWaiting() {
try {
basicConditionService.notifyAllWaiting();
return ResponseEntity.ok("通知所有等待线程成功");
} catch (Exception e) {
log.error("通知所有线程失败", e);
return ResponseEntity.internalServerError().build();
}
}

/**
* 生产数据
*/
@PostMapping("/produce")
public ResponseEntity<String> produceData(@RequestParam String data) {
try {
producerConsumerService.produce(data);
return ResponseEntity.ok("数据生产成功: " + data);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("数据生产被中断", e);
return ResponseEntity.internalServerError().build();
}
}

/**
* 消费数据
*/
@PostMapping("/consume")
public ResponseEntity<Object> consumeData() {
try {
Object data = producerConsumerService.consume();
return ResponseEntity.ok(data);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("数据消费被中断", e);
return ResponseEntity.internalServerError().build();
}
}

/**
* 超时生产数据
*/
@PostMapping("/try-produce")
public ResponseEntity<Boolean> tryProduceData(@RequestParam String data, @RequestParam long timeout) {
try {
boolean success = producerConsumerService.tryProduce(data, timeout, TimeUnit.MILLISECONDS);
return ResponseEntity.ok(success);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("超时生产被中断", e);
return ResponseEntity.internalServerError().build();
}
}

/**
* 超时消费数据
*/
@PostMapping("/try-consume")
public ResponseEntity<Object> tryConsumeData(@RequestParam long timeout) {
try {
Object data = producerConsumerService.tryConsume(timeout, TimeUnit.MILLISECONDS);
return ResponseEntity.ok(data);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("超时消费被中断", e);
return ResponseEntity.internalServerError().build();
}
}

/**
* 准备数据
*/
@PostMapping("/prepare-data")
public ResponseEntity<String> prepareData(@RequestParam String data) {
try {
multiConditionService.prepareData(data);
return ResponseEntity.ok("数据准备成功: " + data);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("数据准备被中断", e);
return ResponseEntity.internalServerError().build();
}
}

/**
* 处理数据
*/
@PostMapping("/process-data")
public ResponseEntity<String> processData() {
try {
multiConditionService.processData();
return ResponseEntity.ok("数据处理成功");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("数据处理被中断", e);
return ResponseEntity.internalServerError().build();
}
}

/**
* 获取结果
*/
@GetMapping("/result")
public ResponseEntity<String> getResult() {
try {
String result = multiConditionService.getResult();
return ResponseEntity.ok(result);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("获取结果被中断", e);
return ResponseEntity.internalServerError().build();
}
}

/**
* 获取基础条件状态
*/
@GetMapping("/basic-status")
public ResponseEntity<BasicConditionService.ConditionStatus> getBasicStatus() {
try {
BasicConditionService.ConditionStatus status = basicConditionService.getStatus();
return ResponseEntity.ok(status);
} catch (Exception e) {
log.error("获取基础状态失败", e);
return ResponseEntity.internalServerError().build();
}
}

/**
* 获取缓冲区状态
*/
@GetMapping("/buffer-status")
public ResponseEntity<ProducerConsumerService.BufferStatus> getBufferStatus() {
try {
ProducerConsumerService.BufferStatus status = producerConsumerService.getBufferStatus();
return ResponseEntity.ok(status);
} catch (Exception e) {
log.error("获取缓冲区状态失败", e);
return ResponseEntity.internalServerError().build();
}
}

/**
* 获取多条件状态
*/
@GetMapping("/multi-status")
public ResponseEntity<MultiConditionService.MultiConditionStatus> getMultiStatus() {
try {
MultiConditionService.MultiConditionStatus status = multiConditionService.getAllStatus();
return ResponseEntity.ok(status);
} catch (Exception e) {
log.error("获取多条件状态失败", e);
return ResponseEntity.internalServerError().build();
}
}

/**
* 获取监控统计
*/
@GetMapping("/statistics")
public ResponseEntity<ConditionMonitorService.ConditionStatistics> getStatistics() {
try {
ConditionMonitorService.ConditionStatistics statistics = conditionMonitorService.getStatistics();
return ResponseEntity.ok(statistics);
} catch (Exception e) {
log.error("获取监控统计失败", e);
return ResponseEntity.internalServerError().build();
}
}
}

9. 测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package com.condition.service;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* Condition测试类
* @author Java实战
*/
@Slf4j
@SpringBootTest
public class ConditionTest {

@Autowired
private BasicConditionService basicConditionService;

@Autowired
private ProducerConsumerService producerConsumerService;

@Autowired
private MultiConditionService multiConditionService;

/**
* 测试基础条件变量
*/
@Test
public void testBasicCondition() throws InterruptedException {
log.info("开始测试基础条件变量");

ExecutorService executor = Executors.newFixedThreadPool(2);
CountDownLatch latch = new CountDownLatch(2);

// 等待线程
executor.submit(() -> {
try {
basicConditionService.waitForCondition();
log.info("等待线程被唤醒");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown();
}
});

// 通知线程
executor.submit(() -> {
try {
Thread.sleep(1000);
basicConditionService.setConditionAndNotify(100);
log.info("通知线程完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown();
}
});

latch.await(30, TimeUnit.SECONDS);
executor.shutdown();

log.info("基础条件变量测试完成");
}

/**
* 测试生产者消费者
*/
@Test
public void testProducerConsumer() throws InterruptedException {
log.info("开始测试生产者消费者");

ExecutorService executor = Executors.newFixedThreadPool(4);
CountDownLatch latch = new CountDownLatch(10);

// 生产者线程
for (int i = 0; i < 5; i++) {
final int index = i;
executor.submit(() -> {
try {
producerConsumerService.produce("数据" + index);
log.info("生产数据: 数据{}", index);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown();
}
});
}

// 消费者线程
for (int i = 0; i < 5; i++) {
executor.submit(() -> {
try {
Object data = producerConsumerService.consume();
log.info("消费数据: {}", data);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown();
}
});
}

latch.await(30, TimeUnit.SECONDS);
executor.shutdown();

log.info("生产者消费者测试完成");
}

/**
* 测试多条件协调
*/
@Test
public void testMultiCondition() throws InterruptedException {
log.info("开始测试多条件协调");

ExecutorService executor = Executors.newFixedThreadPool(4);
CountDownLatch latch = new CountDownLatch(4);

// 数据准备线程
executor.submit(() -> {
try {
multiConditionService.prepareData("测试数据");
log.info("数据准备完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown();
}
});

// 数据处理线程
executor.submit(() -> {
try {
multiConditionService.processData();
log.info("数据处理完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown();
}
});

// 结果获取线程
executor.submit(() -> {
try {
String result = multiConditionService.getResult();
log.info("获取结果: {}", result);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown();
}
});

// 错误处理线程
executor.submit(() -> {
try {
String errorHandled = multiConditionService.handleError();
log.info("错误处理: {}", errorHandled);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown();
}
});

latch.await(30, TimeUnit.SECONDS);
executor.shutdown();

log.info("多条件协调测试完成");
}
}

10. 总结

Condition是Java并发编程中强大的线程协调工具。通过本文的详细介绍,我们了解了:

  1. 基础条件变量: 等待和通知机制的基本使用
  2. 生产者消费者模式: 使用多个条件实现复杂的线程协调
  3. 多条件协调: 处理复杂的多线程协作场景
  4. 超时机制: 避免无限等待的问题
  5. 监控和统计: 监控条件变量的使用情况

通过合理使用Condition,可以实现精确的线程协调和同步。


Java实战要点:

  • Condition提供比Object.wait()/notify()更灵活的线程协调
  • 使用while循环避免虚假唤醒
  • 多个条件可以精确控制不同的等待场景
  • 超时机制避免线程无限等待
  • 合理使用signal()和signalAll()提高性能

代码注解说明:

  • await(): 等待条件满足
  • await(timeout, unit): 超时等待条件
  • signal(): 通知一个等待的线程
  • signalAll(): 通知所有等待的线程
  • awaitNanos(): 纳秒级超时等待
  • 使用while循环避免虚假唤醒