Java线程全面解析:从基础到架构实战

1. 概述

1.1 Java线程的重要性

Java线程是Java并发编程的核心,理解线程机制对于开发高性能、高并发的应用程序至关重要。从Java 1.0到Java最新版本,线程API和并发框架经历了巨大的发展和变化。

线程的价值

  • 性能提升:充分利用多核CPU,提高程序执行效率
  • 响应性:避免阻塞,提高用户体验
  • 资源利用:合理利用系统资源
  • 并发处理:处理大量并发请求

1.2 Java线程发展历程

Java版本 主要特性 时间
Java 1.0 Thread、Runnable 1996
Java 1.2 synchronized优化 1998
Java 1.5 java.util.concurrent包 2004
Java 1.6 并发性能优化 2006
Java 1.7 ForkJoinPool 2011
Java 1.8 CompletableFuture、Stream并行 2014
Java 9+ 模块化、响应式编程 2017+

1.3 本文内容结构

本文将从以下几个方面全面解析Java线程:

  1. 线程基础:线程概念、生命周期、创建方式
  2. 线程同步:synchronized、Lock、volatile
  3. 线程通信:wait/notify、Condition
  4. 并发工具类:CountDownLatch、CyclicBarrier等
  5. 线程池:ThreadPoolExecutor、ForkJoinPool
  6. JUC框架:java.util.concurrent包详解
  7. 底层源码:JVM线程模型、源码分析
  8. 架构实战:高并发场景下的线程应用

2. 线程基础

2.1 什么是线程

2.1.1 进程与线程

进程(Process)

  • 操作系统资源分配的基本单位
  • 每个进程有独立的地址空间
  • 进程间通信需要特殊机制(IPC)

线程(Thread)

  • CPU调度的基本单位
  • 同一进程内的线程共享内存空间
  • 线程间通信更简单、高效

关系

  • 一个进程可以包含多个线程
  • 线程是进程内的执行单元
  • 线程共享进程的资源

2.1.2 Java线程模型

Java线程与操作系统线程

  • Java线程是JVM层面的抽象
  • 在Linux/Windows上,Java线程映射到操作系统线程(1:1模型)
  • JVM负责线程的创建、调度和管理

2.2 线程生命周期

2.2.1 线程状态

Java线程的6种状态(Thread.State枚举):

1
2
3
4
5
6
7
8
public enum State {
NEW, // 新建
RUNNABLE, // 可运行
BLOCKED, // 阻塞
WAITING, // 等待
TIMED_WAITING, // 定时等待
TERMINATED // 终止
}

状态转换图

1
2
3
4
5
6
7
8
9
10
11
12
13
NEW
↓ start()
RUNNABLE
↓ wait() / join() / LockSupport.park()
WAITING / TIMED_WAITING
↓ notify() / notifyAll() / unpark()
RUNNABLE
↓ synchronized / I/O阻塞
BLOCKED
↓ 获取锁 / I/O完成
RUNNABLE
↓ run()结束
TERMINATED

2.2.2 状态说明

NEW(新建)

  • 线程对象已创建,但尚未调用start()方法

RUNNABLE(可运行)

  • 线程正在JVM中执行
  • 可能正在等待CPU时间片

BLOCKED(阻塞)

  • 线程等待获取监视器锁(synchronized)
  • 等待进入同步代码块

WAITING(等待)

  • 无限期等待其他线程执行特定操作
  • 调用wait()、join()、LockSupport.park()

TIMED_WAITING(定时等待)

  • 在指定时间内等待
  • 调用sleep()、wait(timeout)、join(timeout)

TERMINATED(终止)

  • 线程执行完毕或异常退出

2.3 线程创建方式

2.3.1 方式一:继承Thread类

Java 1.0开始支持

1
2
3
4
5
6
7
8
9
10
11
// 继承Thread类
public class MyThread extends Thread {
@Override
public void run() {
System.out.println("线程执行: " + Thread.currentThread().getName());
}
}

// 使用
MyThread thread = new MyThread();
thread.start(); // 启动线程

特点

  • 简单直接
  • 单继承限制
  • 不推荐使用(违反面向对象设计原则)

2.3.2 方式二:实现Runnable接口

Java 1.0开始支持

1
2
3
4
5
6
7
8
9
10
11
// 实现Runnable接口
public class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("线程执行: " + Thread.currentThread().getName());
}
}

// 使用
Thread thread = new Thread(new MyRunnable());
thread.start();

Lambda表达式(Java 8+)

1
2
3
4
5
// 使用Lambda表达式
Thread thread = new Thread(() -> {
System.out.println("线程执行: " + Thread.currentThread().getName());
});
thread.start();

特点

  • 推荐方式
  • 实现接口,更灵活
  • 可以共享Runnable实例

2.3.3 方式三:实现Callable接口

Java 1.5开始支持

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

// 实现Callable接口
public class MyCallable implements Callable<String> {
@Override
public String call() throws Exception {
return "线程执行结果: " + Thread.currentThread().getName();
}
}

// 使用
Callable<String> callable = new MyCallable();
FutureTask<String> futureTask = new FutureTask<>(callable);
Thread thread = new Thread(futureTask);
thread.start();

// 获取结果
String result = futureTask.get(); // 阻塞等待结果
System.out.println(result);

特点

  • 可以有返回值
  • 可以抛出异常
  • 需要配合Future使用

2.3.4 方式四:使用线程池

Java 1.5开始支持

1
2
3
4
5
6
7
8
9
10
11
12
13
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(10);

// 提交任务
executor.submit(() -> {
System.out.println("线程执行: " + Thread.currentThread().getName());
});

// 关闭线程池
executor.shutdown();

特点

  • 推荐的生产环境方式
  • 线程复用,性能好
  • 便于管理和监控

2.3.5 方式五:使用CompletableFuture(Java 8+)

1
2
3
4
5
6
7
8
9
import java.util.concurrent.CompletableFuture;

// 异步执行
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "线程执行结果: " + Thread.currentThread().getName();
});

// 获取结果
future.thenAccept(result -> System.out.println(result));

3. 线程同步

3.1 synchronized关键字

3.1.1 synchronized的演变

Java 1.0

  • 重量级锁,性能较差
  • 需要操作系统互斥量支持

Java 1.2

  • 优化了synchronized性能
  • 引入偏向锁、轻量级锁

**Java 1.6+**:

  • 锁升级机制:无锁 → 偏向锁 → 轻量级锁 → 重量级锁
  • 大幅提升性能

3.1.2 synchronized使用方式

方式1:同步方法

1
2
3
4
5
6
7
8
9
10
11
12
13
public class Counter {
private int count = 0;

// 同步实例方法
public synchronized void increment() {
count++;
}

// 同步静态方法
public static synchronized void staticMethod() {
// ...
}
}

方式2:同步代码块

1
2
3
4
5
6
7
8
9
10
public class Counter {
private int count = 0;
private final Object lock = new Object();

public void increment() {
synchronized (lock) {
count++;
}
}
}

3.1.3 synchronized底层原理

对象头结构(64位JVM):

1
| Mark Word (64 bits) | Klass Word (64 bits) |

Mark Word结构(不同锁状态):

1
2
3
4
5
6
7
8
9
10
11
无锁状态:
| unused:25 | identity_hashcode:31 | unused:1 | age:4 | biased_lock:1 | lock:2 |

偏向锁:
| thread:54 | epoch:2 | unused:1 | age:4 | biased_lock:1 | lock:2 |

轻量级锁:
| ptr_to_lock_record:62 | lock:2 |

重量级锁:
| ptr_to_monitor:62 | lock:2 |

锁升级过程

  1. 无锁:初始状态
  2. 偏向锁:第一个线程访问,CAS设置线程ID
  3. 轻量级锁:有竞争,CAS自旋
  4. 重量级锁:竞争激烈,升级为重量级锁

3.2 Lock接口

3.2.1 ReentrantLock

Java 1.5引入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import java.util.concurrent.locks.ReentrantLock;

public class Counter {
private int count = 0;
private final ReentrantLock lock = new ReentrantLock();

public void increment() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
}

ReentrantLock vs synchronized

特性 synchronized ReentrantLock
锁类型 JVM内置锁 API锁
自动释放 否(需手动unlock)
可中断
公平锁 是(可配置)
条件变量 wait/notify Condition
性能 Java 1.6+优化后性能相当 性能相当

3.2.2 公平锁 vs 非公平锁

1
2
3
4
5
// 非公平锁(默认)
ReentrantLock lock = new ReentrantLock();

// 公平锁
ReentrantLock fairLock = new ReentrantLock(true);

公平锁

  • 按照线程等待时间分配锁
  • 性能略低,但更公平

非公平锁

  • 允许插队,性能更好
  • 可能导致线程饥饿

3.2.3 可中断锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import java.util.concurrent.locks.Lock;

Lock lock = new ReentrantLock();

try {
// 可中断的获取锁
lock.lockInterruptibly();
try {
// 业务逻辑
} finally {
lock.unlock();
}
} catch (InterruptedException e) {
// 处理中断
Thread.currentThread().interrupt();
}

3.3 volatile关键字

3.3.1 volatile的作用

volatile保证

  1. 可见性:修改立即对其他线程可见
  2. 有序性:禁止指令重排序

不保证

  • 原子性(需要synchronized或Lock)

3.3.2 volatile使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
public class VolatileExample {
private volatile boolean flag = false;

public void writer() {
flag = true; // 写操作
}

public void reader() {
if (flag) { // 读操作
// ...
}
}
}

3.3.3 volatile底层原理

内存屏障

  • 写操作:StoreStore + StoreLoad
  • 读操作:LoadLoad + LoadStore

CPU缓存一致性协议(MESI)

  • Modified(修改)
  • Exclusive(独占)
  • Shared(共享)
  • Invalid(无效)

4. 线程通信

4.1 wait/notify机制

4.1.1 wait/notify使用

Java 1.0开始支持

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class WaitNotifyExample {
private final Object lock = new Object();
private boolean condition = false;

public void waitMethod() throws InterruptedException {
synchronized (lock) {
while (!condition) {
lock.wait(); // 等待
}
// 条件满足,继续执行
}
}

public void notifyMethod() {
synchronized (lock) {
condition = true;
lock.notify(); // 唤醒一个等待线程
// 或
lock.notifyAll(); // 唤醒所有等待线程
}
}
}

注意事项

  • 必须在synchronized块中使用
  • wait()会释放锁
  • notify()不会释放锁,需要退出synchronized块

4.1.2 生产者消费者示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import java.util.LinkedList;
import java.util.Queue;

public class ProducerConsumer {
private final Queue<Integer> queue = new LinkedList<>();
private final int CAPACITY = 10;
private final Object lock = new Object();

public void produce(int item) throws InterruptedException {
synchronized (lock) {
while (queue.size() == CAPACITY) {
lock.wait(); // 队列满,等待
}
queue.offer(item);
lock.notifyAll(); // 通知消费者
}
}

public int consume() throws InterruptedException {
synchronized (lock) {
while (queue.isEmpty()) {
lock.wait(); // 队列空,等待
}
int item = queue.poll();
lock.notifyAll(); // 通知生产者
return item;
}
}
}

4.2 Condition接口

4.2.1 Condition使用

Java 1.5引入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionExample {
private final ReentrantLock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private boolean flag = false;

public void awaitMethod() throws InterruptedException {
lock.lock();
try {
while (!flag) {
condition.await(); // 等待
}
} finally {
lock.unlock();
}
}

public void signalMethod() {
lock.lock();
try {
flag = true;
condition.signal(); // 唤醒一个
// 或
condition.signalAll(); // 唤醒所有
} finally {
lock.unlock();
}
}
}

Condition vs wait/notify

特性 wait/notify Condition
锁类型 synchronized Lock
多个条件 不支持 支持(多个Condition)
可中断
超时等待 是(更灵活)

4.2.2 多条件示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class BoundedBuffer {
private final ReentrantLock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
private final Object[] items = new Object[100];
private int putptr, takeptr, count;

public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length) {
notFull.await(); // 队列满,等待
}
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal(); // 通知消费者
} finally {
lock.unlock();
}
}

public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
notEmpty.await(); // 队列空,等待
}
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal(); // 通知生产者
return x;
} finally {
lock.unlock();
}
}
}

5. 并发工具类

5.1 CountDownLatch

5.1.1 CountDownLatch使用

Java 1.5引入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import java.util.concurrent.CountDownLatch;

public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
int threadCount = 5;
CountDownLatch latch = new CountDownLatch(threadCount);

for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
try {
// 执行任务
System.out.println("线程执行: " + Thread.currentThread().getName());
} finally {
latch.countDown(); // 计数减1
}
}).start();
}

latch.await(); // 等待所有线程完成
System.out.println("所有线程执行完毕");
}
}

应用场景

  • 等待多个线程完成后再继续
  • 主线程等待子线程初始化完成

5.2 CyclicBarrier

5.2.1 CyclicBarrier使用

Java 1.5引入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierExample {
public static void main(String[] args) {
int threadCount = 5;
CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
System.out.println("所有线程到达屏障");
});

for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
try {
System.out.println("线程到达屏障: " + Thread.currentThread().getName());
barrier.await(); // 等待其他线程
System.out.println("线程继续执行: " + Thread.currentThread().getName());
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}

CyclicBarrier vs CountDownLatch

特性 CountDownLatch CyclicBarrier
计数 只能使用一次 可以重复使用
等待 一个或多个线程等待 多个线程相互等待
用途 等待事件 同步多个线程

5.3 Semaphore

5.3.1 Semaphore使用

Java 1.5引入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import java.util.concurrent.Semaphore;

public class SemaphoreExample {
public static void main(String[] args) {
int permits = 3; // 允许3个线程同时访问
Semaphore semaphore = new Semaphore(permits);

for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
semaphore.acquire(); // 获取许可
System.out.println("线程执行: " + Thread.currentThread().getName());
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(); // 释放许可
}
}).start();
}
}
}

应用场景

  • 控制并发访问数量
  • 限流

5.4 Exchanger

5.4.1 Exchanger使用

Java 1.5引入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import java.util.concurrent.Exchanger;

public class ExchangerExample {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();

new Thread(() -> {
try {
String data = "Data from Thread 1";
String received = exchanger.exchange(data);
System.out.println("Thread 1 received: " + received);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();

new Thread(() -> {
try {
String data = "Data from Thread 2";
String received = exchanger.exchange(data);
System.out.println("Thread 2 received: " + received);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}

应用场景

  • 两个线程交换数据
  • 管道通信

6. 线程池

6.1 为什么需要线程池

6.1.1 线程创建的开销

问题

  • 线程创建和销毁开销大
  • 线程数量过多会导致资源耗尽
  • 难以管理和监控

解决方案

  • 线程池:复用线程,控制线程数量

6.2 ThreadPoolExecutor

6.2.1 核心参数

Java 1.5引入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;

ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, // corePoolSize: 核心线程数
10, // maximumPoolSize: 最大线程数
60L, // keepAliveTime: 空闲线程存活时间
TimeUnit.SECONDS, // unit: 时间单位
new LinkedBlockingQueue<>(), // workQueue: 工作队列
new ThreadFactory() { // threadFactory: 线程工厂
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("MyThread-" + t.getId());
return t;
}
},
new ThreadPoolExecutor.CallerRunsPolicy() // handler: 拒绝策略
);

参数说明

参数 说明 示例值
corePoolSize 核心线程数 5
maximumPoolSize 最大线程数 10
keepAliveTime 空闲线程存活时间 60秒
workQueue 工作队列 LinkedBlockingQueue
threadFactory 线程工厂 自定义ThreadFactory
handler 拒绝策略 CallerRunsPolicy

6.2.2 线程池执行流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
提交任务

核心线程是否已满?
↓ 否
创建核心线程执行
↓ 是
工作队列是否已满?
↓ 否
加入工作队列
↓ 是
线程数是否达到最大值?
↓ 否
创建非核心线程执行
↓ 是
执行拒绝策略

6.2.3 拒绝策略

Java提供的4种拒绝策略

1
2
3
4
5
6
7
8
9
10
11
// 1. AbortPolicy(默认):抛出异常
new ThreadPoolExecutor.AbortPolicy()

// 2. CallerRunsPolicy:调用者运行
new ThreadPoolExecutor.CallerRunsPolicy()

// 3. DiscardPolicy:直接丢弃
new ThreadPoolExecutor.DiscardPolicy()

// 4. DiscardOldestPolicy:丢弃最老的任务
new ThreadPoolExecutor.DiscardOldestPolicy()

自定义拒绝策略

1
2
3
4
5
6
7
public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 自定义处理逻辑
System.out.println("任务被拒绝: " + r);
}
}

6.3 Executors工具类

6.3.1 常用线程池

Java 1.5引入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;

// 1. 固定大小线程池
ExecutorService fixedPool = Executors.newFixedThreadPool(10);

// 2. 单线程线程池
ExecutorService singlePool = Executors.newSingleThreadExecutor();

// 3. 缓存线程池
ExecutorService cachedPool = Executors.newCachedThreadPool();

// 4. 定时任务线程池
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(5);

不推荐使用Executors的原因

  • newFixedThreadPoolnewSingleThreadExecutor:使用无界队列,可能导致OOM
  • newCachedThreadPool:最大线程数为Integer.MAX_VALUE,可能导致线程过多
  • 推荐手动创建ThreadPoolExecutor

6.4 ForkJoinPool

6.4.1 ForkJoinPool使用

Java 1.7引入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinExample {
static class SumTask extends RecursiveTask<Long> {
private final int[] array;
private final int start;
private final int end;
private static final int THRESHOLD = 100;

public SumTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}

@Override
protected Long compute() {
if (end - start <= THRESHOLD) {
// 直接计算
long sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
} else {
// 分而治之
int mid = (start + end) / 2;
SumTask left = new SumTask(array, start, mid);
SumTask right = new SumTask(array, mid, end);
left.fork(); // 异步执行
long rightResult = right.compute(); // 同步执行
long leftResult = left.join(); // 等待结果
return leftResult + rightResult;
}
}
}

public static void main(String[] args) {
int[] array = new int[1000];
for (int i = 0; i < array.length; i++) {
array[i] = i;
}

ForkJoinPool pool = new ForkJoinPool();
SumTask task = new SumTask(array, 0, array.length);
long result = pool.invoke(task);
System.out.println("结果: " + result);
}
}

应用场景

  • 分而治之的算法
  • 并行计算
  • 大数据处理

7. JUC框架详解

7.1 java.util.concurrent包结构

7.1.1 主要组件

并发集合

  • ConcurrentHashMap:线程安全的HashMap
  • ConcurrentLinkedQueue:线程安全的队列
  • CopyOnWriteArrayList:写时复制的List

同步工具

  • CountDownLatch:倒计时门闩
  • CyclicBarrier:循环屏障
  • Semaphore:信号量
  • Exchanger:交换器

执行器框架

  • Executor:执行器接口
  • ExecutorService:执行器服务
  • ThreadPoolExecutor:线程池执行器
  • ForkJoinPool:分叉连接池

Future框架

  • Future:异步计算结果
  • FutureTask:可取消的异步任务
  • CompletableFuture:可完成的Future(Java 8)

锁框架

  • Lock:锁接口
  • ReentrantLock:可重入锁
  • ReadWriteLock:读写锁
  • StampedLock:戳记锁(Java 8)

7.2 ConcurrentHashMap

7.2.1 ConcurrentHashMap演变

Java 1.5

  • 分段锁(Segment)
  • 16个Segment,每个Segment独立锁

Java 1.8

  • 取消分段锁
  • 使用CAS + synchronized
  • 性能大幅提升

7.2.2 ConcurrentHashMap使用

1
2
3
4
5
6
7
8
9
10
11
12
import java.util.concurrent.ConcurrentHashMap;

ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();

// 线程安全的操作
map.put("key1", "value1");
String value = map.get("key1");
map.remove("key1");

// 原子操作
map.putIfAbsent("key2", "value2");
map.replace("key2", "value2", "newValue");

7.2.3 ConcurrentHashMap源码分析

Java 1.8实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 关键字段
transient volatile Node<K,V>[] table; // 哈希表
private transient volatile int sizeCtl; // 控制标识符

// put操作(简化版)
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 1. 计算hash值
int hash = spread(key.hashCode());

// 2. 循环插入
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable(); // 初始化
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 3. CAS插入
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
break;
} else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f); // 协助扩容
else {
// 4. synchronized锁住链表头节点
synchronized (f) {
// 插入或更新
}
}
}
}

7.3 CompletableFuture

7.3.1 CompletableFuture使用

Java 8引入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// 创建CompletableFuture
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "Hello";
});

// 链式调用
CompletableFuture<String> result = CompletableFuture
.supplyAsync(() -> "Hello")
.thenApply(s -> s + " World")
.thenApply(String::toUpperCase);

// 组合多个Future
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<String> combined = future1.thenCombine(future2, (s1, s2) -> s1 + " " + s2);

// 异常处理
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
if (true) throw new RuntimeException("Error");
return "Success";
})
.exceptionally(ex -> "Error: " + ex.getMessage());

应用场景

  • 异步编程
  • 响应式编程
  • 组合多个异步操作

8. 底层源码分析

8.1 JVM线程模型

8.1.1 线程实现方式

1:1模型(Java采用)

  • 一个Java线程对应一个操作系统线程
  • 优点:简单,性能好
  • 缺点:线程创建受限于操作系统

N:M模型

  • N个Java线程映射到M个操作系统线程
  • 优点:可以创建大量线程
  • 缺点:实现复杂

混合模型

  • 结合1:1和N:M的优点

8.1.2 线程栈

线程栈结构

1
2
3
4
5
6
7
8
9
高地址

局部变量

方法参数

返回地址

低地址

栈大小

  • 默认:1MB(Linux x64)
  • 可通过-Xss参数调整

8.2 Thread源码分析

8.2.1 Thread类结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class Thread implements Runnable {
// 线程名称
private volatile String name;

// 线程优先级
private int priority;

// 线程状态
private volatile int threadStatus = 0;

// 线程组
private ThreadGroup group;

// 目标Runnable
private Runnable target;

// 线程本地变量
ThreadLocal.ThreadLocalMap threadLocals;

// 启动线程
public synchronized void start() {
if (threadStatus != 0)
throw new IllegalThreadStateException();

group.add(this);
boolean started = false;
try {
start0(); // 本地方法,创建操作系统线程
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
}
}
}

private native void start0(); // JNI调用
}

8.2.2 start()方法分析

关键步骤

  1. 检查线程状态
  2. 添加到线程组
  3. 调用本地方法start0()创建操作系统线程
  4. 设置启动标志

**start() vs run()**:

  • start():创建新线程执行
  • run():在当前线程执行

8.3 synchronized底层实现

8.3.1 对象头分析

对象内存布局

1
2
3
4
5
6
7
8
9
对象头 (Header)

Mark Word (64 bits)

Klass Word (64 bits)

实例数据 (Instance Data)

对齐填充 (Padding)

Mark Word在不同锁状态下的结构

1
2
3
4
5
6
7
8
9
10
11
// 无锁状态
| unused:25 | identity_hashcode:31 | unused:1 | age:4 | biased_lock:1 | lock:2 |

// 偏向锁
| thread:54 | epoch:2 | unused:1 | age:4 | biased_lock:1 | lock:2 |

// 轻量级锁
| ptr_to_lock_record:62 | lock:2 |

// 重量级锁
| ptr_to_monitor:62 | lock:2 |

8.3.2 锁升级过程

1. 偏向锁

1
2
3
4
5
// 第一次访问,CAS设置线程ID
if (mark == 无锁状态) {
CAS(mark, threadId);
return 偏向锁;
}

2. 轻量级锁

1
2
3
4
5
6
// 有竞争,CAS自旋
if (mark == 偏向锁 && threadId != currentThread) {
// 撤销偏向锁
// CAS获取轻量级锁
return 轻量级锁;
}

3. 重量级锁

1
2
3
4
5
6
// 竞争激烈,升级为重量级锁
if (自旋失败) {
// 创建Monitor对象
// 线程进入等待队列
return 重量级锁;
}

8.4 AQS(AbstractQueuedSynchronizer)

8.4.1 AQS概述

AQS

  • Java并发包的基础框架
  • 提供了锁和同步器的实现基础
  • ReentrantLock、Semaphore等都基于AQS

8.4.2 AQS核心原理

CLH队列

  • 双向链表实现的等待队列
  • 每个节点代表一个等待的线程

关键方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public abstract class AbstractQueuedSynchronizer {
// 尝试获取锁(子类实现)
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

// 获取锁(模板方法)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

// 释放锁(模板方法)
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
}

9. 架构实战

9.1 高并发场景设计

9.1.1 线程池配置

CPU密集型任务

1
2
3
4
5
6
7
8
9
10
// 线程数 = CPU核心数 + 1
int corePoolSize = Runtime.getRuntime().availableProcessors() + 1;
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
corePoolSize,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy()
);

IO密集型任务

1
2
3
4
5
6
7
8
9
10
// 线程数 = CPU核心数 × 2
int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
corePoolSize * 2,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy()
);

9.1.2 线程池监控

1
2
3
4
5
6
7
8
9
10
11
12
13
public class ThreadPoolMonitor {
private final ThreadPoolExecutor executor;

public void monitor() {
System.out.println("核心线程数: " + executor.getCorePoolSize());
System.out.println("最大线程数: " + executor.getMaximumPoolSize());
System.out.println("当前线程数: " + executor.getPoolSize());
System.out.println("活跃线程数: " + executor.getActiveCount());
System.out.println("已完成任务数: " + executor.getCompletedTaskCount());
System.out.println("总任务数: " + executor.getTaskCount());
System.out.println("队列大小: " + executor.getQueue().size());
}
}

9.2 实战案例

9.2.1 异步任务处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncTaskProcessor {
private final ExecutorService executor = Executors.newFixedThreadPool(10);

public CompletableFuture<String> processAsync(String data) {
return CompletableFuture.supplyAsync(() -> {
// 异步处理
return processData(data);
}, executor);
}

private String processData(String data) {
// 业务逻辑
return "Processed: " + data;
}

public void shutdown() {
executor.shutdown();
}
}

9.2.2 并发计数器

1
2
3
4
5
6
7
8
9
10
11
12
13
import java.util.concurrent.atomic.AtomicLong;

public class ConcurrentCounter {
private final AtomicLong count = new AtomicLong(0);

public void increment() {
count.incrementAndGet();
}

public long get() {
return count.get();
}
}

10. 总结

10.1 核心要点

  1. 线程基础:线程生命周期、创建方式
  2. 线程同步:synchronized、Lock、volatile
  3. 线程通信:wait/notify、Condition
  4. 并发工具:CountDownLatch、CyclicBarrier等
  5. 线程池:ThreadPoolExecutor、ForkJoinPool
  6. JUC框架:ConcurrentHashMap、CompletableFuture等
  7. 底层原理:JVM线程模型、锁机制、AQS

10.2 架构师建议

  1. 线程池使用

    • 生产环境使用ThreadPoolExecutor
    • 根据任务类型配置线程数
    • 实现线程池监控
  2. 并发安全

    • 优先使用JUC并发集合
    • 合理使用锁,避免死锁
    • 注意可见性和有序性
  3. 性能优化

    • 减少锁竞争
    • 使用无锁数据结构
    • 合理使用线程池

10.3 最佳实践

  1. 避免直接创建线程:使用线程池
  2. 合理使用锁:减少锁粒度,避免死锁
  3. 线程安全:使用并发集合和原子类
  4. 监控和调优:监控线程状态,优化线程配置

相关文章