第113集Condition条件变量与线程协调Java实战
|字数总计:4.8k|阅读时长:24分钟|阅读量:
1. Condition概述
Condition是Java并发包中提供的高级线程协调机制,它提供了比Object.wait()和notify()更灵活和强大的线程等待和通知功能。本文将详细介绍Condition的使用方法,包括等待通知机制、生产者消费者模式、线程同步的完整解决方案。
1.1 核心功能
- 等待机制: 线程可以等待特定条件满足
- 通知机制: 可以精确通知等待的线程
- 超时等待: 支持带超时的等待操作
- 中断响应: 支持等待操作的中断
- 多条件: 一个锁可以关联多个条件
1.2 技术架构
1 2 3 4
| 线程A → 等待条件 → Condition.await() → 条件满足 → 继续执行 线程B → 修改条件 → Condition.signal() → 通知线程A → 唤醒等待 ↓ ↓ ↓ ↓ 条件检查 → 等待队列 → 条件变化 → 通知机制
|
2. Maven依赖配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| <project> <modelVersion>4.0.0</modelVersion> <groupId>com.condition</groupId> <artifactId>condition-demo</artifactId> <version>1.0.0</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.0</version> </parent> <properties> <java.version>11</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>31.1-jre</version> </dependency> <dependency> <groupId>io.micrometer</groupId> <artifactId>micrometer-registry-prometheus</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> </project>
|
3. 配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| server: port: 8080
spring: application: name: condition-demo
condition: producer-consumer: buffer-size: 10 producer-count: 3 consumer-count: 2 timeout: default-timeout: 5000 max-timeout: 30000 monitor: enabled: true interval: 10000
management: endpoints: web: exposure: include: "*" metrics: export: prometheus: enabled: true
|
4. 基础条件变量服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
| package com.condition.service;
import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock;
@Slf4j @Service public class BasicConditionService { private final ReentrantLock lock = new ReentrantLock(); private final Condition condition = lock.newCondition();
private boolean ready = false; private int value = 0;
public void waitForCondition() throws InterruptedException { lock.lock(); try { log.info("线程 {} 开始等待条件", Thread.currentThread().getName()); while (!ready) { log.info("线程 {} 等待条件满足", Thread.currentThread().getName()); condition.await(); } log.info("线程 {} 条件满足,继续执行", Thread.currentThread().getName()); } finally { lock.unlock(); } }
public boolean waitForConditionWithTimeout(long timeout, TimeUnit unit) throws InterruptedException { lock.lock(); try { log.info("线程 {} 开始超时等待条件,超时时间: {} {}", Thread.currentThread().getName(), timeout, unit); boolean result = condition.await(timeout, unit); if (result) { log.info("线程 {} 条件满足", Thread.currentThread().getName()); } else { log.warn("线程 {} 等待超时", Thread.currentThread().getName()); } return result; } finally { lock.unlock(); } }
public void setConditionAndNotify(int newValue) { lock.lock(); try { log.info("线程 {} 设置条件,值: {}", Thread.currentThread().getName(), newValue); value = newValue; ready = true; condition.signal(); log.info("线程 {} 通知等待的线程", Thread.currentThread().getName()); } finally { lock.unlock(); } }
public void notifyAllWaiting() { lock.lock(); try { log.info("线程 {} 通知所有等待的线程", Thread.currentThread().getName()); condition.signalAll(); } finally { lock.unlock(); } }
public void resetCondition() { lock.lock(); try { log.info("线程 {} 重置条件", Thread.currentThread().getName()); ready = false; value = 0; } finally { lock.unlock(); } }
public ConditionStatus getStatus() { lock.lock(); try { return ConditionStatus.builder() .ready(ready) .value(value) .isLocked(lock.isLocked()) .queueLength(lock.getQueueLength()) .build(); } finally { lock.unlock(); } }
@lombok.Data @lombok.Builder public static class ConditionStatus { private boolean ready; private int value; private boolean isLocked; private int queueLength; } }
|
5. 生产者消费者服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218
| package com.condition.service;
import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock;
@Slf4j @Service public class ProducerConsumerService { private final ReentrantLock lock = new ReentrantLock(); private final Condition notFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition(); @Value("${condition.producer-consumer.buffer-size:10}") private int bufferSize;
private final Object[] buffer = new Object[bufferSize]; private int count = 0; private int putIndex = 0; private int takeIndex = 0;
public void produce(Object item) throws InterruptedException { lock.lock(); try { log.info("生产者 {} 尝试生产数据: {}", Thread.currentThread().getName(), item); while (count == bufferSize) { log.info("生产者 {} 等待缓冲区空间", Thread.currentThread().getName()); notFull.await(); } buffer[putIndex] = item; putIndex = (putIndex + 1) % bufferSize; count++; log.info("生产者 {} 生产数据成功: {}, 当前缓冲区大小: {}", Thread.currentThread().getName(), item, count); notEmpty.signal(); } finally { lock.unlock(); } }
public Object consume() throws InterruptedException { lock.lock(); try { log.info("消费者 {} 尝试消费数据", Thread.currentThread().getName()); while (count == 0) { log.info("消费者 {} 等待数据", Thread.currentThread().getName()); notEmpty.await(); } Object item = buffer[takeIndex]; buffer[takeIndex] = null; takeIndex = (takeIndex + 1) % bufferSize; count--; log.info("消费者 {} 消费数据成功: {}, 当前缓冲区大小: {}", Thread.currentThread().getName(), item, count); notFull.signal(); return item; } finally { lock.unlock(); } }
public boolean tryProduce(Object item, long timeout, TimeUnit unit) throws InterruptedException { lock.lock(); try { log.info("生产者 {} 尝试超时生产数据: {}", Thread.currentThread().getName(), item); if (!notFull.await(timeout, unit)) { log.warn("生产者 {} 等待缓冲区空间超时", Thread.currentThread().getName()); return false; } buffer[putIndex] = item; putIndex = (putIndex + 1) % bufferSize; count++; log.info("生产者 {} 超时生产数据成功: {}, 当前缓冲区大小: {}", Thread.currentThread().getName(), item, count); notEmpty.signal(); return true; } finally { lock.unlock(); } }
public Object tryConsume(long timeout, TimeUnit unit) throws InterruptedException { lock.lock(); try { log.info("消费者 {} 尝试超时消费数据", Thread.currentThread().getName()); if (!notEmpty.await(timeout, unit)) { log.warn("消费者 {} 等待数据超时", Thread.currentThread().getName()); return null; } Object item = buffer[takeIndex]; buffer[takeIndex] = null; takeIndex = (takeIndex + 1) % bufferSize; count--; log.info("消费者 {} 超时消费数据成功: {}, 当前缓冲区大小: {}", Thread.currentThread().getName(), item, count); notFull.signal(); return item; } finally { lock.unlock(); } }
public BufferStatus getBufferStatus() { lock.lock(); try { return BufferStatus.builder() .count(count) .capacity(bufferSize) .putIndex(putIndex) .takeIndex(takeIndex) .isFull(count == bufferSize) .isEmpty(count == 0) .build(); } finally { lock.unlock(); } }
public void clearBuffer() { lock.lock(); try { log.info("清空缓冲区"); for (int i = 0; i < bufferSize; i++) { buffer[i] = null; } count = 0; putIndex = 0; takeIndex = 0; notFull.signalAll(); } finally { lock.unlock(); } }
@lombok.Data @lombok.Builder public static class BufferStatus { private int count; private int capacity; private int putIndex; private int takeIndex; private boolean isFull; private boolean isEmpty; } }
|
6. 多条件协调服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226
| package com.condition.service;
import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock;
@Slf4j @Service public class MultiConditionService { private final ReentrantLock lock = new ReentrantLock(); private final Condition dataReady = lock.newCondition(); private final Condition processingComplete = lock.newCondition(); private final Condition errorOccurred = lock.newCondition();
private boolean dataReadyFlag = false; private boolean processingCompleteFlag = false; private boolean errorOccurredFlag = false;
private String data = null; private String result = null; private String error = null;
public void prepareData(String newData) throws InterruptedException { lock.lock(); try { log.info("数据准备线程 {} 开始准备数据: {}", Thread.currentThread().getName(), newData); Thread.sleep(1000); data = newData; dataReadyFlag = true; log.info("数据准备完成: {}", data); dataReady.signal(); } finally { lock.unlock(); } }
public void processData() throws InterruptedException { lock.lock(); try { log.info("数据处理线程 {} 等待数据准备", Thread.currentThread().getName()); while (!dataReadyFlag) { dataReady.await(); } log.info("数据处理线程 {} 开始处理数据: {}", Thread.currentThread().getName(), data); try { Thread.sleep(2000); result = "处理结果: " + data.toUpperCase(); processingCompleteFlag = true; log.info("数据处理完成: {}", result); processingComplete.signal(); } catch (Exception e) { log.error("数据处理出错", e); error = "处理错误: " + e.getMessage(); errorOccurredFlag = true; errorOccurred.signal(); } } finally { lock.unlock(); } }
public String getResult() throws InterruptedException { lock.lock(); try { log.info("结果获取线程 {} 等待处理完成", Thread.currentThread().getName()); while (!processingCompleteFlag && !errorOccurredFlag) { long remainingNanos = processingComplete.awaitNanos(TimeUnit.SECONDS.toNanos(5)); if (remainingNanos <= 0) { log.warn("等待处理完成超时"); return "等待超时"; } } if (errorOccurredFlag) { log.error("获取到错误结果: {}", error); return error; } else { log.info("获取到处理结果: {}", result); return result; } } finally { lock.unlock(); } }
public String handleError() throws InterruptedException { lock.lock(); try { log.info("错误处理线程 {} 等待错误发生", Thread.currentThread().getName()); while (!errorOccurredFlag) { errorOccurred.await(); } log.info("错误处理线程 {} 处理错误: {}", Thread.currentThread().getName(), error); Thread.sleep(1000); String errorHandled = "错误已处理: " + error; errorOccurredFlag = false; error = null; return errorHandled; } finally { lock.unlock(); } }
public void resetAll() { lock.lock(); try { log.info("重置所有状态"); dataReadyFlag = false; processingCompleteFlag = false; errorOccurredFlag = false; data = null; result = null; error = null; dataReady.signalAll(); processingComplete.signalAll(); errorOccurred.signalAll(); } finally { lock.unlock(); } }
public MultiConditionStatus getAllStatus() { lock.lock(); try { return MultiConditionStatus.builder() .dataReady(dataReadyFlag) .processingComplete(processingCompleteFlag) .errorOccurred(errorOccurredFlag) .data(data) .result(result) .error(error) .build(); } finally { lock.unlock(); } }
@lombok.Data @lombok.Builder public static class MultiConditionStatus { private boolean dataReady; private boolean processingComplete; private boolean errorOccurred; private String data; private String result; private String error; } }
|
7. 条件监控服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
| package com.condition.service;
import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j @Service public class ConditionMonitorService { private final ReentrantLock monitorLock = new ReentrantLock();
private long awaitCount = 0; private long signalCount = 0; private long signalAllCount = 0; private long timeoutCount = 0;
@Scheduled(fixedRate = 10000) public void monitorConditionUsage() { log.info("=== 条件变量监控报告 ==="); monitorLock.lock(); try { log.info("等待操作次数: {}", awaitCount); log.info("信号通知次数: {}", signalCount); log.info("广播通知次数: {}", signalAllCount); log.info("超时次数: {}", timeoutCount); long totalOperations = awaitCount + signalCount + signalAllCount; if (totalOperations > 0) { double successRate = (double) (signalCount + signalAllCount) / totalOperations * 100; log.info("通知成功率: {:.2f}%", successRate); } } finally { monitorLock.unlock(); } log.info("=== 监控报告结束 ==="); }
public void recordAwait() { monitorLock.lock(); try { awaitCount++; } finally { monitorLock.unlock(); } }
public void recordSignal() { monitorLock.lock(); try { signalCount++; } finally { monitorLock.unlock(); } }
public void recordSignalAll() { monitorLock.lock(); try { signalAllCount++; } finally { monitorLock.unlock(); } }
public void recordTimeout() { monitorLock.lock(); try { timeoutCount++; } finally { monitorLock.unlock(); } }
public ConditionStatistics getStatistics() { monitorLock.lock(); try { return ConditionStatistics.builder() .awaitCount(awaitCount) .signalCount(signalCount) .signalAllCount(signalAllCount) .timeoutCount(timeoutCount) .build(); } finally { monitorLock.unlock(); } }
@lombok.Data @lombok.Builder public static class ConditionStatistics { private long awaitCount; private long signalCount; private long signalAllCount; private long timeoutCount; } }
|
8. 控制器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250
| package com.condition.controller;
import com.condition.service.*; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*;
import java.util.concurrent.TimeUnit;
@Slf4j @RestController @RequestMapping("/api/condition") public class ConditionController { @Autowired private BasicConditionService basicConditionService; @Autowired private ProducerConsumerService producerConsumerService; @Autowired private MultiConditionService multiConditionService; @Autowired private ConditionMonitorService conditionMonitorService;
@PostMapping("/wait") public ResponseEntity<String> waitForCondition() { try { basicConditionService.waitForCondition(); return ResponseEntity.ok("条件等待成功"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("条件等待被中断", e); return ResponseEntity.internalServerError().build(); } }
@PostMapping("/wait-timeout") public ResponseEntity<Boolean> waitForConditionWithTimeout(@RequestParam long timeout) { try { boolean result = basicConditionService.waitForConditionWithTimeout(timeout, TimeUnit.MILLISECONDS); return ResponseEntity.ok(result); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("超时等待被中断", e); return ResponseEntity.internalServerError().build(); } }
@PostMapping("/set-notify") public ResponseEntity<String> setConditionAndNotify(@RequestParam int value) { try { basicConditionService.setConditionAndNotify(value); return ResponseEntity.ok("条件设置并通知成功"); } catch (Exception e) { log.error("设置条件失败", e); return ResponseEntity.internalServerError().build(); } }
@PostMapping("/notify-all") public ResponseEntity<String> notifyAllWaiting() { try { basicConditionService.notifyAllWaiting(); return ResponseEntity.ok("通知所有等待线程成功"); } catch (Exception e) { log.error("通知所有线程失败", e); return ResponseEntity.internalServerError().build(); } }
@PostMapping("/produce") public ResponseEntity<String> produceData(@RequestParam String data) { try { producerConsumerService.produce(data); return ResponseEntity.ok("数据生产成功: " + data); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("数据生产被中断", e); return ResponseEntity.internalServerError().build(); } }
@PostMapping("/consume") public ResponseEntity<Object> consumeData() { try { Object data = producerConsumerService.consume(); return ResponseEntity.ok(data); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("数据消费被中断", e); return ResponseEntity.internalServerError().build(); } }
@PostMapping("/try-produce") public ResponseEntity<Boolean> tryProduceData(@RequestParam String data, @RequestParam long timeout) { try { boolean success = producerConsumerService.tryProduce(data, timeout, TimeUnit.MILLISECONDS); return ResponseEntity.ok(success); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("超时生产被中断", e); return ResponseEntity.internalServerError().build(); } }
@PostMapping("/try-consume") public ResponseEntity<Object> tryConsumeData(@RequestParam long timeout) { try { Object data = producerConsumerService.tryConsume(timeout, TimeUnit.MILLISECONDS); return ResponseEntity.ok(data); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("超时消费被中断", e); return ResponseEntity.internalServerError().build(); } }
@PostMapping("/prepare-data") public ResponseEntity<String> prepareData(@RequestParam String data) { try { multiConditionService.prepareData(data); return ResponseEntity.ok("数据准备成功: " + data); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("数据准备被中断", e); return ResponseEntity.internalServerError().build(); } }
@PostMapping("/process-data") public ResponseEntity<String> processData() { try { multiConditionService.processData(); return ResponseEntity.ok("数据处理成功"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("数据处理被中断", e); return ResponseEntity.internalServerError().build(); } }
@GetMapping("/result") public ResponseEntity<String> getResult() { try { String result = multiConditionService.getResult(); return ResponseEntity.ok(result); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("获取结果被中断", e); return ResponseEntity.internalServerError().build(); } }
@GetMapping("/basic-status") public ResponseEntity<BasicConditionService.ConditionStatus> getBasicStatus() { try { BasicConditionService.ConditionStatus status = basicConditionService.getStatus(); return ResponseEntity.ok(status); } catch (Exception e) { log.error("获取基础状态失败", e); return ResponseEntity.internalServerError().build(); } }
@GetMapping("/buffer-status") public ResponseEntity<ProducerConsumerService.BufferStatus> getBufferStatus() { try { ProducerConsumerService.BufferStatus status = producerConsumerService.getBufferStatus(); return ResponseEntity.ok(status); } catch (Exception e) { log.error("获取缓冲区状态失败", e); return ResponseEntity.internalServerError().build(); } }
@GetMapping("/multi-status") public ResponseEntity<MultiConditionService.MultiConditionStatus> getMultiStatus() { try { MultiConditionService.MultiConditionStatus status = multiConditionService.getAllStatus(); return ResponseEntity.ok(status); } catch (Exception e) { log.error("获取多条件状态失败", e); return ResponseEntity.internalServerError().build(); } }
@GetMapping("/statistics") public ResponseEntity<ConditionMonitorService.ConditionStatistics> getStatistics() { try { ConditionMonitorService.ConditionStatistics statistics = conditionMonitorService.getStatistics(); return ResponseEntity.ok(statistics); } catch (Exception e) { log.error("获取监控统计失败", e); return ResponseEntity.internalServerError().build(); } } }
|
9. 测试类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179
| package com.condition.service;
import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit;
@Slf4j @SpringBootTest public class ConditionTest { @Autowired private BasicConditionService basicConditionService; @Autowired private ProducerConsumerService producerConsumerService; @Autowired private MultiConditionService multiConditionService;
@Test public void testBasicCondition() throws InterruptedException { log.info("开始测试基础条件变量"); ExecutorService executor = Executors.newFixedThreadPool(2); CountDownLatch latch = new CountDownLatch(2); executor.submit(() -> { try { basicConditionService.waitForCondition(); log.info("等待线程被唤醒"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { latch.countDown(); } }); executor.submit(() -> { try { Thread.sleep(1000); basicConditionService.setConditionAndNotify(100); log.info("通知线程完成"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { latch.countDown(); } }); latch.await(30, TimeUnit.SECONDS); executor.shutdown(); log.info("基础条件变量测试完成"); }
@Test public void testProducerConsumer() throws InterruptedException { log.info("开始测试生产者消费者"); ExecutorService executor = Executors.newFixedThreadPool(4); CountDownLatch latch = new CountDownLatch(10); for (int i = 0; i < 5; i++) { final int index = i; executor.submit(() -> { try { producerConsumerService.produce("数据" + index); log.info("生产数据: 数据{}", index); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { latch.countDown(); } }); } for (int i = 0; i < 5; i++) { executor.submit(() -> { try { Object data = producerConsumerService.consume(); log.info("消费数据: {}", data); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { latch.countDown(); } }); } latch.await(30, TimeUnit.SECONDS); executor.shutdown(); log.info("生产者消费者测试完成"); }
@Test public void testMultiCondition() throws InterruptedException { log.info("开始测试多条件协调"); ExecutorService executor = Executors.newFixedThreadPool(4); CountDownLatch latch = new CountDownLatch(4); executor.submit(() -> { try { multiConditionService.prepareData("测试数据"); log.info("数据准备完成"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { latch.countDown(); } }); executor.submit(() -> { try { multiConditionService.processData(); log.info("数据处理完成"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { latch.countDown(); } }); executor.submit(() -> { try { String result = multiConditionService.getResult(); log.info("获取结果: {}", result); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { latch.countDown(); } }); executor.submit(() -> { try { String errorHandled = multiConditionService.handleError(); log.info("错误处理: {}", errorHandled); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { latch.countDown(); } }); latch.await(30, TimeUnit.SECONDS); executor.shutdown(); log.info("多条件协调测试完成"); } }
|
10. 总结
Condition是Java并发编程中强大的线程协调工具。通过本文的详细介绍,我们了解了:
- 基础条件变量: 等待和通知机制的基本使用
- 生产者消费者模式: 使用多个条件实现复杂的线程协调
- 多条件协调: 处理复杂的多线程协作场景
- 超时机制: 避免无限等待的问题
- 监控和统计: 监控条件变量的使用情况
通过合理使用Condition,可以实现精确的线程协调和同步。
Java实战要点:
- Condition提供比Object.wait()/notify()更灵活的线程协调
- 使用while循环避免虚假唤醒
- 多个条件可以精确控制不同的等待场景
- 超时机制避免线程无限等待
- 合理使用signal()和signalAll()提高性能
代码注解说明:
- await(): 等待条件满足
- await(timeout, unit): 超时等待条件
- signal(): 通知一个等待的线程
- signalAll(): 通知所有等待的线程
- awaitNanos(): 纳秒级超时等待
- 使用while循环避免虚假唤醒