第450集Java线程全面解析从基础到架构实战
|字数总计:5.6k|阅读时长:25分钟|阅读量:
Java线程全面解析:从基础到架构实战
1. 概述
1.1 Java线程的重要性
Java线程是Java并发编程的核心,理解线程机制对于开发高性能、高并发的应用程序至关重要。从Java 1.0到Java最新版本,线程API和并发框架经历了巨大的发展和变化。
线程的价值:
- 性能提升:充分利用多核CPU,提高程序执行效率
- 响应性:避免阻塞,提高用户体验
- 资源利用:合理利用系统资源
- 并发处理:处理大量并发请求
1.2 Java线程发展历程
| Java版本 |
主要特性 |
时间 |
| Java 1.0 |
Thread、Runnable |
1996 |
| Java 1.2 |
synchronized优化 |
1998 |
| Java 1.5 |
java.util.concurrent包 |
2004 |
| Java 1.6 |
并发性能优化 |
2006 |
| Java 1.7 |
ForkJoinPool |
2011 |
| Java 1.8 |
CompletableFuture、Stream并行 |
2014 |
| Java 9+ |
模块化、响应式编程 |
2017+ |
1.3 本文内容结构
本文将从以下几个方面全面解析Java线程:
- 线程基础:线程概念、生命周期、创建方式
- 线程同步:synchronized、Lock、volatile
- 线程通信:wait/notify、Condition
- 并发工具类:CountDownLatch、CyclicBarrier等
- 线程池:ThreadPoolExecutor、ForkJoinPool
- JUC框架:java.util.concurrent包详解
- 底层源码:JVM线程模型、源码分析
- 架构实战:高并发场景下的线程应用
2. 线程基础
2.1 什么是线程
2.1.1 进程与线程
进程(Process):
- 操作系统资源分配的基本单位
- 每个进程有独立的地址空间
- 进程间通信需要特殊机制(IPC)
线程(Thread):
- CPU调度的基本单位
- 同一进程内的线程共享内存空间
- 线程间通信更简单、高效
关系:
- 一个进程可以包含多个线程
- 线程是进程内的执行单元
- 线程共享进程的资源
2.1.2 Java线程模型
Java线程与操作系统线程:
- Java线程是JVM层面的抽象
- 在Linux/Windows上,Java线程映射到操作系统线程(1:1模型)
- JVM负责线程的创建、调度和管理
2.2 线程生命周期
2.2.1 线程状态
Java线程的6种状态(Thread.State枚举):
1 2 3 4 5 6 7 8
| public enum State { NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED }
|
状态转换图:
1 2 3 4 5 6 7 8 9 10 11 12 13
| NEW ↓ start() RUNNABLE ↓ wait() / join() / LockSupport.park() WAITING / TIMED_WAITING ↓ notify() / notifyAll() / unpark() RUNNABLE ↓ synchronized / I/O阻塞 BLOCKED ↓ 获取锁 / I/O完成 RUNNABLE ↓ run()结束 TERMINATED
|
2.2.2 状态说明
NEW(新建):
RUNNABLE(可运行):
BLOCKED(阻塞):
- 线程等待获取监视器锁(synchronized)
- 等待进入同步代码块
WAITING(等待):
- 无限期等待其他线程执行特定操作
- 调用wait()、join()、LockSupport.park()
TIMED_WAITING(定时等待):
- 在指定时间内等待
- 调用sleep()、wait(timeout)、join(timeout)
TERMINATED(终止):
2.3 线程创建方式
2.3.1 方式一:继承Thread类
Java 1.0开始支持:
1 2 3 4 5 6 7 8 9 10 11
| public class MyThread extends Thread { @Override public void run() { System.out.println("线程执行: " + Thread.currentThread().getName()); } }
MyThread thread = new MyThread(); thread.start();
|
特点:
- 简单直接
- 单继承限制
- 不推荐使用(违反面向对象设计原则)
2.3.2 方式二:实现Runnable接口
Java 1.0开始支持:
1 2 3 4 5 6 7 8 9 10 11
| public class MyRunnable implements Runnable { @Override public void run() { System.out.println("线程执行: " + Thread.currentThread().getName()); } }
Thread thread = new Thread(new MyRunnable()); thread.start();
|
Lambda表达式(Java 8+):
1 2 3 4 5
| Thread thread = new Thread(() -> { System.out.println("线程执行: " + Thread.currentThread().getName()); }); thread.start();
|
特点:
- 推荐方式
- 实现接口,更灵活
- 可以共享Runnable实例
2.3.3 方式三:实现Callable接口
Java 1.5开始支持:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| import java.util.concurrent.Callable; import java.util.concurrent.FutureTask;
public class MyCallable implements Callable<String> { @Override public String call() throws Exception { return "线程执行结果: " + Thread.currentThread().getName(); } }
Callable<String> callable = new MyCallable(); FutureTask<String> futureTask = new FutureTask<>(callable); Thread thread = new Thread(futureTask); thread.start();
String result = futureTask.get(); System.out.println(result);
|
特点:
- 可以有返回值
- 可以抛出异常
- 需要配合Future使用
2.3.4 方式四:使用线程池
Java 1.5开始支持:
1 2 3 4 5 6 7 8 9 10 11 12 13
| import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
ExecutorService executor = Executors.newFixedThreadPool(10);
executor.submit(() -> { System.out.println("线程执行: " + Thread.currentThread().getName()); });
executor.shutdown();
|
特点:
- 推荐的生产环境方式
- 线程复用,性能好
- 便于管理和监控
2.3.5 方式五:使用CompletableFuture(Java 8+)
1 2 3 4 5 6 7 8 9
| import java.util.concurrent.CompletableFuture;
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { return "线程执行结果: " + Thread.currentThread().getName(); });
future.thenAccept(result -> System.out.println(result));
|
3. 线程同步
3.1 synchronized关键字
3.1.1 synchronized的演变
Java 1.0:
Java 1.2:
- 优化了synchronized性能
- 引入偏向锁、轻量级锁
**Java 1.6+**:
- 锁升级机制:无锁 → 偏向锁 → 轻量级锁 → 重量级锁
- 大幅提升性能
3.1.2 synchronized使用方式
方式1:同步方法:
1 2 3 4 5 6 7 8 9 10 11 12 13
| public class Counter { private int count = 0; public synchronized void increment() { count++; } public static synchronized void staticMethod() { } }
|
方式2:同步代码块:
1 2 3 4 5 6 7 8 9 10
| public class Counter { private int count = 0; private final Object lock = new Object(); public void increment() { synchronized (lock) { count++; } } }
|
3.1.3 synchronized底层原理
对象头结构(64位JVM):
1
| | Mark Word (64 bits) | Klass Word (64 bits) |
|
Mark Word结构(不同锁状态):
1 2 3 4 5 6 7 8 9 10 11
| 无锁状态: | unused:25 | identity_hashcode:31 | unused:1 | age:4 | biased_lock:1 | lock:2 |
偏向锁: | thread:54 | epoch:2 | unused:1 | age:4 | biased_lock:1 | lock:2 |
轻量级锁: | ptr_to_lock_record:62 | lock:2 |
重量级锁: | ptr_to_monitor:62 | lock:2 |
|
锁升级过程:
- 无锁:初始状态
- 偏向锁:第一个线程访问,CAS设置线程ID
- 轻量级锁:有竞争,CAS自旋
- 重量级锁:竞争激烈,升级为重量级锁
3.2 Lock接口
3.2.1 ReentrantLock
Java 1.5引入:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| import java.util.concurrent.locks.ReentrantLock;
public class Counter { private int count = 0; private final ReentrantLock lock = new ReentrantLock(); public void increment() { lock.lock(); try { count++; } finally { lock.unlock(); } } }
|
ReentrantLock vs synchronized:
| 特性 |
synchronized |
ReentrantLock |
| 锁类型 |
JVM内置锁 |
API锁 |
| 自动释放 |
是 |
否(需手动unlock) |
| 可中断 |
否 |
是 |
| 公平锁 |
否 |
是(可配置) |
| 条件变量 |
wait/notify |
Condition |
| 性能 |
Java 1.6+优化后性能相当 |
性能相当 |
3.2.2 公平锁 vs 非公平锁
1 2 3 4 5
| ReentrantLock lock = new ReentrantLock();
ReentrantLock fairLock = new ReentrantLock(true);
|
公平锁:
非公平锁:
3.2.3 可中断锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| import java.util.concurrent.locks.Lock;
Lock lock = new ReentrantLock();
try { lock.lockInterruptibly(); try { } finally { lock.unlock(); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
|
3.3 volatile关键字
3.3.1 volatile的作用
volatile保证:
- 可见性:修改立即对其他线程可见
- 有序性:禁止指令重排序
不保证:
3.3.2 volatile使用示例
1 2 3 4 5 6 7 8 9 10 11 12 13
| public class VolatileExample { private volatile boolean flag = false; public void writer() { flag = true; } public void reader() { if (flag) { } } }
|
3.3.3 volatile底层原理
内存屏障:
- 写操作:StoreStore + StoreLoad
- 读操作:LoadLoad + LoadStore
CPU缓存一致性协议(MESI):
- Modified(修改)
- Exclusive(独占)
- Shared(共享)
- Invalid(无效)
4. 线程通信
4.1 wait/notify机制
4.1.1 wait/notify使用
Java 1.0开始支持:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public class WaitNotifyExample { private final Object lock = new Object(); private boolean condition = false; public void waitMethod() throws InterruptedException { synchronized (lock) { while (!condition) { lock.wait(); } } } public void notifyMethod() { synchronized (lock) { condition = true; lock.notify(); lock.notifyAll(); } } }
|
注意事项:
- 必须在synchronized块中使用
- wait()会释放锁
- notify()不会释放锁,需要退出synchronized块
4.1.2 生产者消费者示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| import java.util.LinkedList; import java.util.Queue;
public class ProducerConsumer { private final Queue<Integer> queue = new LinkedList<>(); private final int CAPACITY = 10; private final Object lock = new Object(); public void produce(int item) throws InterruptedException { synchronized (lock) { while (queue.size() == CAPACITY) { lock.wait(); } queue.offer(item); lock.notifyAll(); } } public int consume() throws InterruptedException { synchronized (lock) { while (queue.isEmpty()) { lock.wait(); } int item = queue.poll(); lock.notifyAll(); return item; } } }
|
4.2 Condition接口
4.2.1 Condition使用
Java 1.5引入:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock;
public class ConditionExample { private final ReentrantLock lock = new ReentrantLock(); private final Condition condition = lock.newCondition(); private boolean flag = false; public void awaitMethod() throws InterruptedException { lock.lock(); try { while (!flag) { condition.await(); } } finally { lock.unlock(); } } public void signalMethod() { lock.lock(); try { flag = true; condition.signal(); condition.signalAll(); } finally { lock.unlock(); } } }
|
Condition vs wait/notify:
| 特性 |
wait/notify |
Condition |
| 锁类型 |
synchronized |
Lock |
| 多个条件 |
不支持 |
支持(多个Condition) |
| 可中断 |
是 |
是 |
| 超时等待 |
是 |
是(更灵活) |
4.2.2 多条件示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| public class BoundedBuffer { private final ReentrantLock lock = new ReentrantLock(); private final Condition notFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition(); private final Object[] items = new Object[100]; private int putptr, takeptr, count; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) { notFull.await(); } items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) { notEmpty.await(); } Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); return x; } finally { lock.unlock(); } } }
|
5. 并发工具类
5.1 CountDownLatch
5.1.1 CountDownLatch使用
Java 1.5引入:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample { public static void main(String[] args) throws InterruptedException { int threadCount = 5; CountDownLatch latch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++) { new Thread(() -> { try { System.out.println("线程执行: " + Thread.currentThread().getName()); } finally { latch.countDown(); } }).start(); } latch.await(); System.out.println("所有线程执行完毕"); } }
|
应用场景:
- 等待多个线程完成后再继续
- 主线程等待子线程初始化完成
5.2 CyclicBarrier
5.2.1 CyclicBarrier使用
Java 1.5引入:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample { public static void main(String[] args) { int threadCount = 5; CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> { System.out.println("所有线程到达屏障"); }); for (int i = 0; i < threadCount; i++) { new Thread(() -> { try { System.out.println("线程到达屏障: " + Thread.currentThread().getName()); barrier.await(); System.out.println("线程继续执行: " + Thread.currentThread().getName()); } catch (Exception e) { e.printStackTrace(); } }).start(); } } }
|
CyclicBarrier vs CountDownLatch:
| 特性 |
CountDownLatch |
CyclicBarrier |
| 计数 |
只能使用一次 |
可以重复使用 |
| 等待 |
一个或多个线程等待 |
多个线程相互等待 |
| 用途 |
等待事件 |
同步多个线程 |
5.3 Semaphore
5.3.1 Semaphore使用
Java 1.5引入:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| import java.util.concurrent.Semaphore;
public class SemaphoreExample { public static void main(String[] args) { int permits = 3; Semaphore semaphore = new Semaphore(permits); for (int i = 0; i < 10; i++) { new Thread(() -> { try { semaphore.acquire(); System.out.println("线程执行: " + Thread.currentThread().getName()); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); } }).start(); } } }
|
应用场景:
5.4 Exchanger
5.4.1 Exchanger使用
Java 1.5引入:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| import java.util.concurrent.Exchanger;
public class ExchangerExample { public static void main(String[] args) { Exchanger<String> exchanger = new Exchanger<>(); new Thread(() -> { try { String data = "Data from Thread 1"; String received = exchanger.exchange(data); System.out.println("Thread 1 received: " + received); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(() -> { try { String data = "Data from Thread 2"; String received = exchanger.exchange(data); System.out.println("Thread 2 received: " + received); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }
|
应用场景:
6. 线程池
6.1 为什么需要线程池
6.1.1 线程创建的开销
问题:
- 线程创建和销毁开销大
- 线程数量过多会导致资源耗尽
- 难以管理和监控
解决方案:
6.2 ThreadPoolExecutor
6.2.1 核心参数
Java 1.5引入:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.LinkedBlockingQueue;
ThreadPoolExecutor executor = new ThreadPoolExecutor( 5, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("MyThread-" + t.getId()); return t; } }, new ThreadPoolExecutor.CallerRunsPolicy() );
|
参数说明:
| 参数 |
说明 |
示例值 |
corePoolSize |
核心线程数 |
5 |
maximumPoolSize |
最大线程数 |
10 |
keepAliveTime |
空闲线程存活时间 |
60秒 |
workQueue |
工作队列 |
LinkedBlockingQueue |
threadFactory |
线程工厂 |
自定义ThreadFactory |
handler |
拒绝策略 |
CallerRunsPolicy |
6.2.2 线程池执行流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| 提交任务 ↓ 核心线程是否已满? ↓ 否 创建核心线程执行 ↓ 是 工作队列是否已满? ↓ 否 加入工作队列 ↓ 是 线程数是否达到最大值? ↓ 否 创建非核心线程执行 ↓ 是 执行拒绝策略
|
6.2.3 拒绝策略
Java提供的4种拒绝策略:
1 2 3 4 5 6 7 8 9 10 11
| new ThreadPoolExecutor.AbortPolicy()
new ThreadPoolExecutor.CallerRunsPolicy()
new ThreadPoolExecutor.DiscardPolicy()
new ThreadPoolExecutor.DiscardOldestPolicy()
|
自定义拒绝策略:
1 2 3 4 5 6 7
| public class CustomRejectedExecutionHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("任务被拒绝: " + r); } }
|
6.3 Executors工具类
6.3.1 常用线程池
Java 1.5引入:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService;
ExecutorService fixedPool = Executors.newFixedThreadPool(10);
ExecutorService singlePool = Executors.newSingleThreadExecutor();
ExecutorService cachedPool = Executors.newCachedThreadPool();
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(5);
|
不推荐使用Executors的原因:
newFixedThreadPool和newSingleThreadExecutor:使用无界队列,可能导致OOM
newCachedThreadPool:最大线程数为Integer.MAX_VALUE,可能导致线程过多
- 推荐手动创建ThreadPoolExecutor
6.4 ForkJoinPool
6.4.1 ForkJoinPool使用
Java 1.7引入:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask;
public class ForkJoinExample { static class SumTask extends RecursiveTask<Long> { private final int[] array; private final int start; private final int end; private static final int THRESHOLD = 100; public SumTask(int[] array, int start, int end) { this.array = array; this.start = start; this.end = end; } @Override protected Long compute() { if (end - start <= THRESHOLD) { long sum = 0; for (int i = start; i < end; i++) { sum += array[i]; } return sum; } else { int mid = (start + end) / 2; SumTask left = new SumTask(array, start, mid); SumTask right = new SumTask(array, mid, end); left.fork(); long rightResult = right.compute(); long leftResult = left.join(); return leftResult + rightResult; } } } public static void main(String[] args) { int[] array = new int[1000]; for (int i = 0; i < array.length; i++) { array[i] = i; } ForkJoinPool pool = new ForkJoinPool(); SumTask task = new SumTask(array, 0, array.length); long result = pool.invoke(task); System.out.println("结果: " + result); } }
|
应用场景:
7. JUC框架详解
7.1 java.util.concurrent包结构
7.1.1 主要组件
并发集合:
ConcurrentHashMap:线程安全的HashMap
ConcurrentLinkedQueue:线程安全的队列
CopyOnWriteArrayList:写时复制的List
同步工具:
CountDownLatch:倒计时门闩
CyclicBarrier:循环屏障
Semaphore:信号量
Exchanger:交换器
执行器框架:
Executor:执行器接口
ExecutorService:执行器服务
ThreadPoolExecutor:线程池执行器
ForkJoinPool:分叉连接池
Future框架:
Future:异步计算结果
FutureTask:可取消的异步任务
CompletableFuture:可完成的Future(Java 8)
锁框架:
Lock:锁接口
ReentrantLock:可重入锁
ReadWriteLock:读写锁
StampedLock:戳记锁(Java 8)
7.2 ConcurrentHashMap
7.2.1 ConcurrentHashMap演变
Java 1.5:
- 分段锁(Segment)
- 16个Segment,每个Segment独立锁
Java 1.8:
- 取消分段锁
- 使用CAS + synchronized
- 性能大幅提升
7.2.2 ConcurrentHashMap使用
1 2 3 4 5 6 7 8 9 10 11 12
| import java.util.concurrent.ConcurrentHashMap;
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
map.put("key1", "value1"); String value = map.get("key1"); map.remove("key1");
map.putIfAbsent("key2", "value2"); map.replace("key2", "value2", "newValue");
|
7.2.3 ConcurrentHashMap源码分析
Java 1.8实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| transient volatile Node<K,V>[] table; private transient volatile int sizeCtl;
final V putVal(K key, V value, boolean onlyIfAbsent) { int hash = spread(key.hashCode()); for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { synchronized (f) { } } } }
|
7.3 CompletableFuture
7.3.1 CompletableFuture使用
Java 8引入:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException;
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { return "Hello"; });
CompletableFuture<String> result = CompletableFuture .supplyAsync(() -> "Hello") .thenApply(s -> s + " World") .thenApply(String::toUpperCase);
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World"); CompletableFuture<String> combined = future1.thenCombine(future2, (s1, s2) -> s1 + " " + s2);
CompletableFuture<String> future = CompletableFuture .supplyAsync(() -> { if (true) throw new RuntimeException("Error"); return "Success"; }) .exceptionally(ex -> "Error: " + ex.getMessage());
|
应用场景:
8. 底层源码分析
8.1 JVM线程模型
8.1.1 线程实现方式
1:1模型(Java采用):
- 一个Java线程对应一个操作系统线程
- 优点:简单,性能好
- 缺点:线程创建受限于操作系统
N:M模型:
- N个Java线程映射到M个操作系统线程
- 优点:可以创建大量线程
- 缺点:实现复杂
混合模型:
8.1.2 线程栈
线程栈结构:
1 2 3 4 5 6 7 8 9
| 高地址 ↓ 局部变量 ↓ 方法参数 ↓ 返回地址 ↓ 低地址
|
栈大小:
- 默认:1MB(Linux x64)
- 可通过
-Xss参数调整
8.2 Thread源码分析
8.2.1 Thread类结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| public class Thread implements Runnable { private volatile String name; private int priority; private volatile int threadStatus = 0; private ThreadGroup group; private Runnable target; ThreadLocal.ThreadLocalMap threadLocals; public synchronized void start() { if (threadStatus != 0) throw new IllegalThreadStateException(); group.add(this); boolean started = false; try { start0(); started = true; } finally { try { if (!started) { group.threadStartFailed(this); } } catch (Throwable ignore) { } } } private native void start0(); }
|
8.2.2 start()方法分析
关键步骤:
- 检查线程状态
- 添加到线程组
- 调用本地方法
start0()创建操作系统线程
- 设置启动标志
**start() vs run()**:
start():创建新线程执行
run():在当前线程执行
8.3 synchronized底层实现
8.3.1 对象头分析
对象内存布局:
1 2 3 4 5 6 7 8 9
| 对象头 (Header) ↓ Mark Word (64 bits) ↓ Klass Word (64 bits) ↓ 实例数据 (Instance Data) ↓ 对齐填充 (Padding)
|
Mark Word在不同锁状态下的结构:
1 2 3 4 5 6 7 8 9 10 11
| | unused:25 | identity_hashcode:31 | unused:1 | age:4 | biased_lock:1 | lock:2 |
| thread:54 | epoch:2 | unused:1 | age:4 | biased_lock:1 | lock:2 |
| ptr_to_lock_record:62 | lock:2 |
| ptr_to_monitor:62 | lock:2 |
|
8.3.2 锁升级过程
1. 偏向锁:
1 2 3 4 5
| if (mark == 无锁状态) { CAS(mark, threadId); return 偏向锁; }
|
2. 轻量级锁:
1 2 3 4 5 6
| if (mark == 偏向锁 && threadId != currentThread) { return 轻量级锁; }
|
3. 重量级锁:
1 2 3 4 5 6
| if (自旋失败) { return 重量级锁; }
|
8.4 AQS(AbstractQueuedSynchronizer)
8.4.1 AQS概述
AQS:
- Java并发包的基础框架
- 提供了锁和同步器的实现基础
- ReentrantLock、Semaphore等都基于AQS
8.4.2 AQS核心原理
CLH队列:
- 双向链表实现的等待队列
- 每个节点代表一个等待的线程
关键方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public abstract class AbstractQueuedSynchronizer { protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } }
|
9. 架构实战
9.1 高并发场景设计
9.1.1 线程池配置
CPU密集型任务:
1 2 3 4 5 6 7 8 9 10
| int corePoolSize = Runtime.getRuntime().availableProcessors() + 1; ThreadPoolExecutor executor = new ThreadPoolExecutor( corePoolSize, corePoolSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy() );
|
IO密集型任务:
1 2 3 4 5 6 7 8 9 10
| int corePoolSize = Runtime.getRuntime().availableProcessors() * 2; ThreadPoolExecutor executor = new ThreadPoolExecutor( corePoolSize, corePoolSize * 2, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy() );
|
9.1.2 线程池监控
1 2 3 4 5 6 7 8 9 10 11 12 13
| public class ThreadPoolMonitor { private final ThreadPoolExecutor executor; public void monitor() { System.out.println("核心线程数: " + executor.getCorePoolSize()); System.out.println("最大线程数: " + executor.getMaximumPoolSize()); System.out.println("当前线程数: " + executor.getPoolSize()); System.out.println("活跃线程数: " + executor.getActiveCount()); System.out.println("已完成任务数: " + executor.getCompletedTaskCount()); System.out.println("总任务数: " + executor.getTaskCount()); System.out.println("队列大小: " + executor.getQueue().size()); } }
|
9.2 实战案例
9.2.1 异步任务处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
public class AsyncTaskProcessor { private final ExecutorService executor = Executors.newFixedThreadPool(10); public CompletableFuture<String> processAsync(String data) { return CompletableFuture.supplyAsync(() -> { return processData(data); }, executor); } private String processData(String data) { return "Processed: " + data; } public void shutdown() { executor.shutdown(); } }
|
9.2.2 并发计数器
1 2 3 4 5 6 7 8 9 10 11 12 13
| import java.util.concurrent.atomic.AtomicLong;
public class ConcurrentCounter { private final AtomicLong count = new AtomicLong(0); public void increment() { count.incrementAndGet(); } public long get() { return count.get(); } }
|
10. 总结
10.1 核心要点
- 线程基础:线程生命周期、创建方式
- 线程同步:synchronized、Lock、volatile
- 线程通信:wait/notify、Condition
- 并发工具:CountDownLatch、CyclicBarrier等
- 线程池:ThreadPoolExecutor、ForkJoinPool
- JUC框架:ConcurrentHashMap、CompletableFuture等
- 底层原理:JVM线程模型、锁机制、AQS
10.2 架构师建议
线程池使用:
- 生产环境使用ThreadPoolExecutor
- 根据任务类型配置线程数
- 实现线程池监控
并发安全:
- 优先使用JUC并发集合
- 合理使用锁,避免死锁
- 注意可见性和有序性
性能优化:
10.3 最佳实践
- 避免直接创建线程:使用线程池
- 合理使用锁:减少锁粒度,避免死锁
- 线程安全:使用并发集合和原子类
- 监控和调优:监控线程状态,优化线程配置
相关文章: