引言

ConcurrentHashMap是Java并发包中最重要的数据结构之一,它提供了高效的线程安全哈希表实现。在多线程环境下,ConcurrentHashMap不仅保证了线程安全,还提供了比Hashtable和synchronized HashMap更好的性能。

本文将深入探讨ConcurrentHashMap的线程安全实现机制,从设计原理到具体实现,从分段锁到CAS操作,帮助开发者全面理解Java并发编程的核心技术。

ConcurrentHashMap线程安全机制

1. 整体设计原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// Overview notes on ConcurrentHashMap's thread-safety mechanisms (printed as text).
public class ConcurrentHashMapAnalysis {

    // Shared helper: prints each line of a note in order.
    private void printLines(String[] lines) {
        for (String line : lines) {
            System.out.println(line);
        }
    }

    /** Prints the core design principles behind ConcurrentHashMap. */
    public void explainDesignPrinciples() {
        printLines(new String[] {
            "ConcurrentHashMap设计原理:",
            "1. 分段锁机制:将整个哈希表分成多个段,每个段独立加锁",
            "2. CAS操作:使用Compare-And-Swap实现无锁操作",
            "3. 读写分离:读操作不加锁,写操作使用细粒度锁",
            "4. 扩容优化:支持并发扩容,减少阻塞时间"
        });
    }

    /** Prints how the implementation evolved across JDK versions. */
    public void analyzeVersionEvolution() {
        printLines(new String[] {
            "ConcurrentHashMap版本演进:",
            "JDK 1.7:使用分段锁(Segment)",
            "JDK 1.8:使用CAS + synchronized",
            "JDK 1.8+:优化了扩容和并发性能"
        });
    }

    /** Prints which guarantees (visibility/atomicity/ordering) are provided and how. */
    public void explainThreadSafety() {
        printLines(new String[] {
            "线程安全保证:",
            "1. 可见性:使用volatile关键字保证内存可见性",
            "2. 原子性:使用CAS操作保证原子性",
            "3. 有序性:使用内存屏障保证有序性",
            "4. 死锁避免:细粒度锁避免死锁"
        });
    }
}

// Usage example (article snippet: these statements belong inside a method body,
// e.g. a main method — they are not valid at the top level of a Java file)
ConcurrentHashMapAnalysis analysis = new ConcurrentHashMapAnalysis();
analysis.explainDesignPrinciples();
analysis.analyzeVersionEvolution();
analysis.explainThreadSafety();

2. JDK 1.7分段锁实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
// JDK 1.7-style segment-lock implementation (simplified for illustration).
// Fixes relative to the original article code:
//  - the class now declares its <K,V> type parameters (its fields already used them),
//  - HashEntry is constructed with arguments in declaration order (hash, key, next, value);
//    the original passed (key, hash, ...) and would not compile,
//  - each segment starts with a non-empty, power-of-two table; the original used
//    new HashEntry[0], which made "hash & (tab.length - 1)" index out of bounds.
// Rehashing past the threshold is intentionally omitted for brevity: chains simply
// grow, which keeps the example correct but not performant at scale.
public class SegmentBasedConcurrentHashMap<K,V> {

    // One lock-protected partition of the table. Reads are lock-free; writes
    // lock only this segment, so writes to different segments never contend.
    static class Segment<K,V> extends ReentrantLock implements Serializable {
        private static final long serialVersionUID = 2249069246763182397L;

        // Hash table for this segment; volatile so unlocked readers observe a
        // fully constructed array after publication.
        transient volatile HashEntry<K,V>[] table;

        // Number of entries in this segment.
        transient int count;

        // Structural modification count (would back fail-fast iteration).
        transient int modCount;

        // Resize threshold (informational here: rehashing is omitted).
        transient int threshold;

        // Load factor used to derive the threshold.
        final float loadFactor;

        Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
            this.loadFactor = lf;
            this.threshold = threshold;
            this.table = tab;
        }

        // Lock-free read: the volatile table reference plus volatile
        // value/next fields in HashEntry give visibility of completed writes.
        V get(Object key, int hash) {
            HashEntry<K,V>[] tab = table;
            HashEntry<K,V> e = tab[hash & (tab.length - 1)];

            while (e != null) {
                if (e.hash == hash && key.equals(e.key)) {
                    return e.value;
                }
                e = e.next;
            }
            return null;
        }

        // Locked write. Returns the previous value for the key, or null if
        // the key was absent.
        V put(K key, int hash, V value, boolean onlyIfAbsent) {
            lock();
            try {
                HashEntry<K,V>[] tab = table;
                int index = hash & (tab.length - 1);
                HashEntry<K,V> first = tab[index];

                // Replace in place if the key already exists.
                for (HashEntry<K,V> e = first; e != null; e = e.next) {
                    if (e.hash == hash && key.equals(e.key)) {
                        V oldValue = e.value;
                        if (!onlyIfAbsent) {
                            e.value = value;
                        }
                        return oldValue;
                    }
                }

                // Prepend a new node (arguments in declaration order:
                // hash, key, next, value — this was swapped in the original).
                HashEntry<K,V> newEntry = new HashEntry<>(hash, key, first, value);
                tab[index] = newEntry;
                ++modCount;  // record the structural change
                count++;
                return null;
            } finally {
                unlock();
            }
        }
    }

    // Immutable-key chain node; value and next are volatile so lock-free
    // readers see writes made under the segment lock.
    static class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;

        HashEntry(int hash, K key, HashEntry<K,V> next, V value) {
            this.hash = hash;
            this.key = key;
            this.next = next;
            this.value = value;
        }
    }

    // The fixed array of segments (concurrency level, rounded to a power of two).
    final Segment<K,V>[] segments;

    // Mask selecting a segment from the high bits of the hash.
    final int segmentMask;

    // Shift distance bringing the segment-selecting bits down to the low end.
    final int segmentShift;

    /**
     * Creates the map.
     *
     * @param initialCapacity  total expected capacity, divided across segments
     * @param loadFactor       per-segment load factor (threshold = capacity * loadFactor)
     * @param concurrencyLevel desired number of concurrently writable segments
     * @throws IllegalArgumentException if any argument is non-positive
     */
    @SuppressWarnings("unchecked")
    public SegmentBasedConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
        if (initialCapacity < 0 || loadFactor <= 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();

        // Round the segment count up to a power of two.
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;

        // Per-segment capacity: ceil(initialCapacity / ssize), rounded up to a
        // power of two and at least 1 so "hash & (cap - 1)" is always a valid index.
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = 1;
        while (cap < c)
            cap <<= 1;

        // Generic array creation is unchecked by necessity.
        this.segments = new Segment[ssize];
        for (int i = 0; i < ssize; ++i) {
            segments[i] = new Segment<>(loadFactor,
                    (int)(cap * loadFactor), new HashEntry[cap]);
        }
    }

    // Routes a hash to its segment using the top bits (the bottom bits pick
    // the bucket inside the segment, so using different bits avoids clustering).
    private Segment<K,V> segmentFor(int hash) {
        return segments[(hash >>> segmentShift) & segmentMask];
    }

    /** Returns the value mapped to {@code key}, or null if absent. */
    public V get(Object key) {
        int hash = hash(key);
        return segmentFor(hash).get(key, hash);
    }

    /**
     * Associates {@code value} with {@code key}; returns the previous value or null.
     * Null keys and values are rejected, matching java.util.concurrent.ConcurrentHashMap.
     */
    public V put(K key, V value) {
        if (key == null || value == null)
            throw new NullPointerException();
        int hash = hash(key);
        return segmentFor(hash).put(key, hash, value, false);
    }

    // Supplemental hash: spreads the impact of higher bits so power-of-two
    // masking distributes keys with poor low-bit entropy.
    private int hash(Object key) {
        int h = key.hashCode();
        h ^= (h >>> 20) ^ (h >>> 12);
        h ^= (h >>> 7) ^ (h >>> 4);
        return h;
    }
}

3. JDK 1.8 CAS + synchronized实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
// JDK 1.8-style CAS + synchronized implementation (excerpt for analysis).
// NOTE(review): this snippet is adapted from java.util.concurrent.ConcurrentHashMap
// and references members that are NOT defined here (TreeBin, Node.find, initTable,
// helpTransfer, addCount, treeifyBin, TREEIFY_THRESHOLD, CounterCell). It will not
// compile standalone; it is meant to be read alongside the real JDK source.
public class CASBasedConcurrentHashMap<K,V> {

    // Basic hash-bin node. val and next are volatile so lock-free readers
    // observe writes made under the bin's monitor lock.
    static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;

        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }

        public final K getKey() { return key; }
        public final V getValue() { return val; }
        // Entries exposed by this map are read-only views.
        public final V setValue(V value) { throw new UnsupportedOperationException(); }

        // Standard Map.Entry equality: equal key and equal (non-null) value.
        public final boolean equals(Object o) {
            Object k, v, u; Map.Entry<?,?> e;
            return ((o instanceof Map.Entry) &&
                    (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
                    (v = e.getValue()) != null &&
                    (k == key || k.equals(key)) &&
                    (v == (u = val) || v.equals(u)));
        }

        public final int hashCode() {
            return Objects.hashCode(key) ^ Objects.hashCode(val);
        }
    }

    // Red-black tree node, used once a long bin has been treeified.
    static final class TreeNode<K,V> extends Node<K,V> {
        TreeNode<K,V> parent;
        TreeNode<K,V> left;
        TreeNode<K,V> right;
        TreeNode<K,V> prev;   // needed to unlink on deletion
        boolean red;

        TreeNode(int hash, K key, V val, Node<K,V> next, TreeNode<K,V> parent) {
            super(hash, key, val, next);
            this.parent = parent;
        }
    }

    // Forwarding node installed in bins whose contents have already been
    // moved to the new table during a resize; readers follow nextTable.
    static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;

        ForwardingNode(Node<K,V>[] tab) {
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }
    }

    // The bin array; lazily initialized, always a power-of-two size.
    transient volatile Node<K,V>[] table;

    // The next table to transfer bins into while resizing; non-null only then.
    private transient volatile Node<K,V>[] nextTable;

    // Base element count, updated via CAS when there is no contention.
    private transient volatile long baseCount;

    // Table initialization/resize control: negative while initializing (-1)
    // or resizing; otherwise holds the next resize threshold.
    private transient volatile int sizeCtl;

    // The next table index (plus one) to split among resizing threads.
    private transient volatile int transferIndex;

    // Special hash values for non-data nodes (all negative).
    static final int MOVED = -1; // hash of a forwarding node
    static final int TREEBIN = -2; // hash of a tree-bin root
    static final int RESERVED = -3; // hash of a transient reservation node
    static final int HASH_BITS = 0x7fffffff; // usable bits of a normal node hash

    // Lock-free read path: volatile table/bin reads plus volatile node fields
    // make gets safe without any locking.
    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());

        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {

            // First node matches directly.
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0) {
                // Negative hash: tree bin or forwarding node — delegate to find.
                // NOTE(review): Node.find is not defined in this excerpt.
                return (p = e.find(h, key)) != null ? p.val : null;
            }

            // Otherwise walk the chain.
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

    // Write path: CAS for empty bins, synchronized on the bin head otherwise.
    public V put(K key, V value) {
        return putVal(key, value, false);
    }

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        // Null keys and values are rejected (unlike HashMap).
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;

        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                // Empty bin: a single CAS installs the new node, no lock at all.
                if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                    break;
            }
            else if ((fh = f.hash) == MOVED)
                // A resize is in progress; help move bins, then retry.
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                // Lock only this bin's head node — other bins stay writable.
                synchronized (f) {
                    // Re-check: the head may have changed before we locked it.
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            // Ordinary chain: search for the key, else append.
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key, value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            // Tree bin: delegate to the red-black tree insert.
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    // Chain grew long enough? Convert it to a tree.
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        // New mapping added: bump the count, possibly triggering a resize.
        addCount(1L, binCount);
        return null;
    }

    // CAS of bin i from c to v — the lock-free insertion primitive.
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }

    // Volatile read of bin i (array elements are not otherwise volatile).
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }

    // Volatile write of bin i.
    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
    }

    // Spreads higher hash bits downward and clears the sign bit
    // (negative hashes are reserved for the special node types above).
    static final int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

    // Unsafe machinery: raw field/array offsets for the CAS and volatile
    // accessors above. NOTE(review): copied from the JDK source — it reflects
    // over ConcurrentHashMap.class and CounterCell, which belong to the real
    // JDK class, not to this excerpt.
    private static final sun.misc.Unsafe U;
    private static final long SIZECTL;
    private static final long TRANSFERINDEX;
    private static final long BASECOUNT;
    private static final long CELLSBUSY;
    private static final long CELLVALUE;
    private static final long ABASE;
    private static final int ASHIFT;

    static {
        try {
            U = sun.misc.Unsafe.getUnsafe();
            Class<?> k = ConcurrentHashMap.class;
            SIZECTL = U.objectFieldOffset(k.getDeclaredField("sizeCtl"));
            TRANSFERINDEX = U.objectFieldOffset(k.getDeclaredField("transferIndex"));
            BASECOUNT = U.objectFieldOffset(k.getDeclaredField("baseCount"));
            CELLSBUSY = U.objectFieldOffset(k.getDeclaredField("cellsBusy"));
            CELLVALUE = U.objectFieldOffset(CounterCell.class.getDeclaredField("value"));
            ABASE = U.arrayBaseOffset(Node[].class);
            int scale = U.arrayIndexScale(Node[].class);
            // The shift trick below requires a power-of-two element size.
            if ((scale & (scale - 1)) != 0)
                throw new Error("data type scale not a power of two");
            ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

CAS操作详解

1. CAS原理和实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// Explanatory notes on CAS (Compare-And-Swap), printed as text.
public class CASOperationAnalysis {

    // Shared helper: prints each note line in order.
    private void emit(String[] lines) {
        for (String line : lines) {
            System.out.println(line);
        }
    }

    /** Prints the basic mechanics of a CAS operation. */
    public void explainCASPrinciple() {
        emit(new String[] {
            "CAS操作原理:",
            "1. Compare:比较当前值与期望值",
            "2. And:如果相等,则执行操作",
            "3. Swap:交换新值与当前值",
            "4. 原子性:整个操作是原子的,不会被中断"
        });
    }

    /** Prints the advantages CAS has over locking. */
    public void explainCASAdvantages() {
        emit(new String[] {
            "CAS操作优势:",
            "1. 无锁:不需要加锁,提高并发性能",
            "2. 原子性:保证操作的原子性",
            "3. 高效:避免了锁的开销",
            "4. 无死锁:不会产生死锁问题"
        });
    }

    /** Prints the known drawbacks of CAS (ABA, spinning, single-variable scope). */
    public void explainCASProblems() {
        emit(new String[] {
            "CAS操作问题:",
            "1. ABA问题:值被修改后又改回原值",
            "2. 循环时间长:如果CAS失败,会一直重试",
            "3. 只能保证一个共享变量的原子操作",
            "4. 内存开销:需要额外的内存空间"
        });
    }
}

// Demonstrates hand-rolled CAS retry loops on top of AtomicInteger.
// (The explicit loops are the point of the example; AtomicInteger's own
// incrementAndGet/decrementAndGet do the same thing internally.)
public class CASExample {

    // Shared counter; every mutation goes through a CAS retry loop below.
    private AtomicInteger count = new AtomicInteger(0);

    /** Atomically adds one, retrying until the CAS succeeds. */
    public void increment() {
        for (;;) {
            int observed = count.get();
            if (count.compareAndSet(observed, observed + 1)) {
                return;
            }
        }
    }

    /** Atomically subtracts one, retrying until the CAS succeeds. */
    public void decrement() {
        for (;;) {
            int observed = count.get();
            if (count.compareAndSet(observed, observed - 1)) {
                return;
            }
        }
    }

    /** Returns the current counter value. */
    public int get() {
        return count.get();
    }

    /** Unconditionally overwrites the counter (volatile write, no CAS needed). */
    public void set(int value) {
        count.set(value);
    }
}

// Usage example (article snippet: these statements belong inside a method body)
CASExample example = new CASExample();
example.increment();
example.increment();
System.out.println("当前值: " + example.get()); // prints: 2

2. ABA问题解决方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// Approaches to the ABA problem (a CAS sees the same value twice and cannot
// tell whether it was changed and changed back in between).
public class ABASolution {

    // Immutable (value, version) pair, mirroring what AtomicStampedReference tracks.
    public static class VersionedReference<T> {
        private final T reference;
        private final int version;

        public VersionedReference(T reference, int version) {
            this.reference = reference;
            this.version = version;
        }

        public T getReference() {
            return reference;
        }

        public int getVersion() {
            return version;
        }

        // Fix: without toString(), demonstrateABA() below printed the default
        // "ABASolution$VersionedReference@<hash>" instead of anything useful.
        @Override
        public String toString() {
            return "value=" + reference + ", version=" + version;
        }
    }

    // Uses AtomicStampedReference: every successful update also bumps an
    // integer stamp, so an A->B->A sequence is distinguishable from "unchanged".
    public static class ABAExample {
        private AtomicStampedReference<Integer> atomicStampedRef =
                new AtomicStampedReference<>(0, 0);

        // Single CAS attempt at value+1, bumping the stamp. Returns false if
        // another thread changed the value or stamp between the snapshot read
        // and the CAS — callers that need success must retry in a loop.
        public boolean safeIncrement() {
            int[] stamp = new int[1];
            Integer current = atomicStampedRef.get(stamp);
            int newStamp = stamp[0] + 1;
            return atomicStampedRef.compareAndSet(current, current + 1, stamp[0], newStamp);
        }

        // Single CAS attempt at value-1, bumping the stamp (same retry caveat).
        public boolean safeDecrement() {
            int[] stamp = new int[1];
            Integer current = atomicStampedRef.get(stamp);
            int newStamp = stamp[0] + 1;
            return atomicStampedRef.compareAndSet(current, current - 1, stamp[0], newStamp);
        }

        // Atomically reads the current value together with its stamp.
        public VersionedReference<Integer> getVersionedValue() {
            int[] stamp = new int[1];
            Integer value = atomicStampedRef.get(stamp);
            return new VersionedReference<>(value, stamp[0]);
        }
    }

    // Walks through increment/decrement/increment, printing value and version
    // at each step. The value returns to an earlier state but the version keeps
    // growing — which is exactly what defeats ABA.
    public static void demonstrateABA() {
        ABAExample example = new ABAExample();

        System.out.println("初始值: " + example.getVersionedValue());

        // First update: 0 -> 1.
        example.safeIncrement();
        System.out.println("第一次修改后: " + example.getVersionedValue());

        // Second update: back to 0 (the "A -> B -> A" shape).
        example.safeDecrement();
        System.out.println("第二次修改后: " + example.getVersionedValue());

        // Third update: 0 -> 1 again, with a still-higher version.
        example.safeIncrement();
        System.out.println("第三次修改后: " + example.getVersionedValue());
    }
}

性能对比分析

1. 不同实现方式的性能对比

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
// Performance comparison across Map implementations.
// NOTE(review): timings from a hand-rolled harness like this are rough
// indications only (no JVM warm-up, wall-clock timing, GC noise); prefer JMH
// for real benchmarks.
public class PerformanceComparison {

    // Multi-threaded put/get micro-benchmark harness.
    public static class PerformanceTest {
        private static final int THREAD_COUNT = 10;
        private static final int OPERATION_COUNT = 100000;

        // Times HashMap.
        // WARNING: HashMap is not thread-safe; writing to it from many threads
        // can corrupt the table or hang. Kept here only to illustrate why a
        // concurrent map is needed — do not copy this into production code.
        public long testHashMap() {
            Map<Integer, String> map = new HashMap<>();
            return testMap(map, "HashMap");
        }

        // Times Hashtable (every public method synchronized on the whole table).
        public long testHashtable() {
            Map<Integer, String> map = new Hashtable<>();
            return testMap(map, "Hashtable");
        }

        // Times ConcurrentHashMap.
        public long testConcurrentHashMap() {
            Map<Integer, String> map = new ConcurrentHashMap<>();
            return testMap(map, "ConcurrentHashMap");
        }

        // Times a Collections.synchronizedMap wrapper around HashMap.
        public long testSynchronizedHashMap() {
            Map<Integer, String> map = Collections.synchronizedMap(new HashMap<>());
            return testMap(map, "SynchronizedHashMap");
        }

        // Shared driver: THREAD_COUNT threads each perform OPERATION_COUNT
        // put+get pairs on disjoint key ranges; returns elapsed milliseconds.
        // NOTE: System.nanoTime() would be the preferred elapsed-time clock;
        // currentTimeMillis() is wall-clock and can jump.
        private long testMap(Map<Integer, String> map, String mapType) {
            long startTime = System.currentTimeMillis();

            // Build the worker threads; keys are disjoint per thread.
            Thread[] threads = new Thread[THREAD_COUNT];
            for (int i = 0; i < THREAD_COUNT; i++) {
                final int threadId = i;
                threads[i] = new Thread(() -> {
                    for (int j = 0; j < OPERATION_COUNT; j++) {
                        int key = threadId * OPERATION_COUNT + j;
                        map.put(key, "value" + key);
                        map.get(key);
                    }
                });
            }

            // Start every worker.
            for (Thread thread : threads) {
                thread.start();
            }

            // Join all workers, restoring the interrupt flag if interrupted.
            for (Thread thread : threads) {
                try {
                    thread.join();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }

            long endTime = System.currentTimeMillis();
            long duration = endTime - startTime;

            System.out.println(mapType + " 测试完成,耗时: " + duration + "ms");
            return duration;
        }
    }

    // Runs all four benchmarks and prints relative improvements.
    public static void analyzePerformance() {
        PerformanceTest test = new PerformanceTest();

        System.out.println("=== 性能对比测试 ===");
        System.out.println("线程数: " + PerformanceTest.THREAD_COUNT);
        System.out.println("每线程操作数: " + PerformanceTest.OPERATION_COUNT);
        System.out.println("总操作数: " + (PerformanceTest.THREAD_COUNT * PerformanceTest.OPERATION_COUNT));
        System.out.println();

        // Run each benchmark once (see the warm-up caveat above).
        long hashMapTime = test.testHashMap();
        long hashtableTime = test.testHashtable();
        long concurrentHashMapTime = test.testConcurrentHashMap();
        long synchronizedHashMapTime = test.testSynchronizedHashMap();

        // Print raw timings.
        System.out.println("\n=== 性能分析 ===");
        System.out.println("HashMap: " + hashMapTime + "ms (非线程安全)");
        System.out.println("Hashtable: " + hashtableTime + "ms");
        System.out.println("ConcurrentHashMap: " + concurrentHashMapTime + "ms");
        System.out.println("SynchronizedHashMap: " + synchronizedHashMapTime + "ms");

        // Improvement of ConcurrentHashMap over each lock-everything variant,
        // as a percentage of that variant's time.
        double hashtableImprovement = (double)(hashtableTime - concurrentHashMapTime) / hashtableTime * 100;
        double synchronizedImprovement = (double)(synchronizedHashMapTime - concurrentHashMapTime) / synchronizedHashMapTime * 100;

        System.out.println("\n=== 性能提升 ===");
        System.out.println("ConcurrentHashMap vs Hashtable: " + String.format("%.2f", hashtableImprovement) + "%");
        System.out.println("ConcurrentHashMap vs SynchronizedHashMap: " + String.format("%.2f", synchronizedImprovement) + "%");
    }
}

2. 内存使用分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// Memory-usage comparison across Map implementations.
// NOTE(review): System.gc() is only a hint to the JVM and heap deltas include
// allocator/GC noise, so the numbers below are rough approximations at best.
public class MemoryUsageAnalysis {

    // Measures the heap growth caused by filling each Map type.
    public static void analyzeMemoryUsage() {
        System.out.println("=== 内存使用分析 ===");

        // Same measurement for each implementation.
        testMemoryUsage("HashMap", new HashMap<>());
        testMemoryUsage("Hashtable", new Hashtable<>());
        testMemoryUsage("ConcurrentHashMap", new ConcurrentHashMap<>());
        testMemoryUsage("SynchronizedHashMap", Collections.synchronizedMap(new HashMap<>()));
    }

    // Fills one map with 100k entries and reports the used-heap delta.
    private static void testMemoryUsage(String mapType, Map<Integer, String> map) {
        Runtime runtime = Runtime.getRuntime();

        // Request a GC so the "before" reading is as clean as possible
        // (not guaranteed — see class note).
        System.gc();
        long beforeMemory = runtime.totalMemory() - runtime.freeMemory();

        // Populate the map.
        for (int i = 0; i < 100000; i++) {
            map.put(i, "value" + i);
        }

        // Request a GC again before the "after" reading.
        System.gc();
        long afterMemory = runtime.totalMemory() - runtime.freeMemory();

        long memoryUsed = afterMemory - beforeMemory;
        System.out.println(mapType + " 内存使用: " + (memoryUsed / 1024 / 1024) + "MB");
    }

    // Prints a qualitative summary of the memory characteristics.
    public static void explainMemoryCharacteristics() {
        System.out.println("\n=== 内存使用特点 ===");
        System.out.println("HashMap: 内存使用最少,但非线程安全");
        System.out.println("Hashtable: 内存使用较多,使用synchronized关键字");
        System.out.println("ConcurrentHashMap: 内存使用适中,使用分段锁或CAS");
        System.out.println("SynchronizedHashMap: 内存使用与HashMap相同,但性能较差");
    }
}

使用示例和最佳实践

1. 基本使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
// ConcurrentHashMap usage examples.
public class ConcurrentHashMapExample {

    // Basic put/get/containsKey/remove/iterate operations.
    public static void basicOperations() {
        System.out.println("=== 基本操作示例 ===");

        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

        // Insert entries.
        map.put("apple", 10);
        map.put("banana", 20);
        map.put("orange", 30);

        // Read an entry.
        System.out.println("apple数量: " + map.get("apple"));

        // Membership check.
        System.out.println("包含apple: " + map.containsKey("apple"));

        // Overwrite an existing entry.
        map.put("apple", 15);
        System.out.println("更新后apple数量: " + map.get("apple"));

        // Remove an entry.
        map.remove("banana");
        System.out.println("删除banana后大小: " + map.size());

        // Iterate (weakly consistent: safe even during concurrent updates;
        // iteration order is unspecified).
        System.out.println("所有元素:");
        map.forEach((key, value) -> System.out.println(key + ": " + value));
    }

    // Atomic per-key operations: computeIfAbsent installs the counter exactly
    // once even under races; the AtomicInteger then handles the increments.
    public static void atomicOperations() {
        System.out.println("\n=== 原子操作示例 ===");

        ConcurrentHashMap<String, AtomicInteger> map = new ConcurrentHashMap<>();

        // Create-then-increment: the second call reuses the existing counter.
        map.computeIfAbsent("counter", k -> new AtomicInteger(0)).incrementAndGet();
        map.computeIfAbsent("counter", k -> new AtomicInteger(0)).incrementAndGet();

        System.out.println("计数器值: " + map.get("counter").get());

        // Atomic read-modify-write of the mapped value (value is non-null
        // here because computeIfAbsent above installed it).
        map.compute("counter", (key, value) -> {
            if (value != null) {
                value.addAndGet(10);
            }
            return value;
        });

        System.out.println("更新后计数器值: " + map.get("counter").get());
    }

    // Five writer threads inserting disjoint keys concurrently; the final
    // size is deterministic (5000) because the key ranges never overlap.
    public static void concurrentOperations() {
        System.out.println("\n=== 并发操作示例 ===");

        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

        // Build the workers: thread i owns keys [i*1000, i*1000+999].
        Thread[] threads = new Thread[5];
        for (int i = 0; i < 5; i++) {
            final int threadId = i;
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    String key = "key" + (threadId * 1000 + j);
                    map.put(key, threadId * 1000 + j);
                }
            });
        }

        // Start every worker.
        for (Thread thread : threads) {
            thread.start();
        }

        // Join all workers, restoring the interrupt flag if interrupted.
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        System.out.println("并发操作完成,最终大小: " + map.size());
    }
}

2. 最佳实践

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
// ConcurrentHashMap best practices, each demonstrated by a small method.
public class ConcurrentHashMapBestPractices {

    // 1. Choose an initial capacity that avoids early resizes:
    //    capacity = expectedSize / loadFactor + 1.
    public static void chooseInitialCapacity() {
        System.out.println("=== 初始容量选择 ===");

        // Size the table for the expected element count up front.
        int expectedSize = 10000;
        int initialCapacity = (int)(expectedSize / 0.75f) + 1;

        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>(initialCapacity);
        System.out.println("预期大小: " + expectedSize);
        System.out.println("初始容量: " + initialCapacity);
    }

    // 2. Pick a concurrency level from the available parallelism.
    //    NOTE(review): since JDK 1.8 this constructor argument is only a
    //    sizing hint, not a hard segment count.
    public static void chooseConcurrencyLevel() {
        System.out.println("\n=== 并发级别选择 ===");

        // Derive the hint from the CPU count.
        int threadCount = Runtime.getRuntime().availableProcessors();
        int concurrencyLevel = threadCount * 2;

        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>(16, 0.75f, concurrencyLevel);
        System.out.println("CPU核心数: " + threadCount);
        System.out.println("并发级别: " + concurrencyLevel);
    }

    // 3. Use the map's atomic compound operations instead of
    //    check-then-act sequences.
    public static void useAtomicOperations() {
        System.out.println("\n=== 原子操作使用 ===");

        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

        // Atomically install an initial value if absent.
        map.computeIfAbsent("counter", k -> 0);

        // Atomic read-modify-write (value is non-null here because the
        // computeIfAbsent above installed 0).
        map.compute("counter", (key, value) -> value + 1);

        // Atomic "add or combine": inserts 1 if absent, otherwise sums.
        map.merge("counter", 1, Integer::sum);

        System.out.println("计数器值: " + map.get("counter"));
    }

    // 4. Don't wrap ConcurrentHashMap calls in synchronized blocks —
    //    the second half of this method is the ANTI-pattern, kept to contrast.
    public static void avoidUnnecessarySynchronization() {
        System.out.println("\n=== 避免不必要的同步 ===");

        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

        // Good: the map's own methods are already thread-safe.
        map.put("key", 1);
        Integer value = map.get("key");

        // Bad (for illustration only): external locking adds contention
        // without adding any safety for these single operations.
        synchronized (map) {
            map.put("key", 2);
            value = map.get("key");
        }

        System.out.println("值: " + value);
    }

    // 5. Iteration options; all are weakly consistent on ConcurrentHashMap.
    public static void useAppropriateIteration() {
        System.out.println("\n=== 合适的遍历方式 ===");

        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        map.put("a", 1);
        map.put("b", 2);
        map.put("c", 3);

        // forEach: most concise (recommended).
        System.out.println("使用forEach遍历:");
        map.forEach((key, value) -> System.out.println(key + ": " + value));

        // entrySet: key and value together, no extra lookup.
        System.out.println("使用entrySet遍历:");
        for (Map.Entry<String, Integer> entry : map.entrySet()) {
            System.out.println(entry.getKey() + ": " + entry.getValue());
        }

        // keySet: note the extra get() per key compared to entrySet.
        System.out.println("使用keySet遍历:");
        for (String key : map.keySet()) {
            System.out.println(key + ": " + map.get(key));
        }
    }

    // 6. ConcurrentHashMap rejects null keys and null values; both puts
    //    below throw NullPointerException, caught here to demonstrate it.
    public static void handleNullValues() {
        System.out.println("\n=== 处理空值 ===");

        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

        // Null key is rejected.
        try {
            map.put(null, 1);
        } catch (NullPointerException e) {
            System.out.println("不能添加null键: " + e.getMessage());
        }

        // Null value is rejected too.
        try {
            map.put("key", null);
        } catch (NullPointerException e) {
            System.out.println("不能添加null值: " + e.getMessage());
        }

        // A missing key returns null from get(); Optional makes that explicit.
        Optional<Integer> value = Optional.ofNullable(map.get("nonexistent"));
        System.out.println("使用Optional处理空值: " + value.orElse(0));
    }
}

源码分析

1. 关键方法源码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
// Step-by-step walkthrough notes for key ConcurrentHashMap internals, printed as text.
public class ConcurrentHashMapSourceAnalysis {

    // Shared helper: prints a header followed by its step lines.
    private static void printSteps(String header, String[] steps) {
        System.out.println(header);
        for (String step : steps) {
            System.out.println(step);
        }
    }

    /** Outlines the control flow of putVal. */
    public static void analyzePutVal() {
        printSteps("=== putVal方法分析 ===", new String[] {
            "1. 计算哈希值",
            "2. 检查表是否初始化",
            "3. 检查桶是否为空",
            "4. 使用CAS插入新节点",
            "5. 检查是否需要扩容",
            "6. 使用synchronized处理冲突",
            "7. 更新计数器"
        });
    }

    /** Outlines the control flow of get. */
    public static void analyzeGet() {
        printSteps("\n=== get方法分析 ===", new String[] {
            "1. 计算哈希值",
            "2. 获取桶中的第一个节点",
            "3. 检查节点类型",
            "4. 处理链表节点",
            "5. 处理红黑树节点",
            "6. 处理转发节点"
        });
    }

    /** Outlines the resize (transfer) procedure. */
    public static void analyzeResize() {
        printSteps("\n=== 扩容方法分析 ===", new String[] {
            "1. 检查是否需要扩容",
            "2. 创建新的哈希表",
            "3. 设置转发节点",
            "4. 并发转移数据",
            "5. 更新表引用"
        });
    }

    /** Outlines how the size counter works. */
    public static void analyzeCounter() {
        printSteps("\n=== 计数器分析 ===", new String[] {
            "1. 使用LongAdder实现",
            "2. 分段计数减少竞争",
            "3. 使用CAS更新计数",
            "4. 支持并发计数"
        });
    }
}

// Key excerpts from the real ConcurrentHashMap source.
// NOTE(review): this is an excerpt, not a standalone class — it references
// U (sun.misc.Unsafe), the ASHIFT/ABASE/SIZECTL offsets, HASH_BITS,
// DEFAULT_CAPACITY, Node, the table/sizeCtl fields, and the K/V type
// parameters of the enclosing ConcurrentHashMap; none of these are declared
// in this snippet, so it will not compile on its own.
public class ConcurrentHashMapKeyParts {

    // Spreads higher hash bits downward and clears the sign bit
    // (negative hashes are reserved for special node types).
    static final int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

    // Volatile read of bin i (array elements are not otherwise volatile).
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }

    // CAS of bin i from c to v — the lock-free insertion primitive.
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }

    // Volatile write of bin i.
    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
    }

    // Lazy table initialization. Exactly one thread wins the CAS that sets
    // sizeCtl to -1; losers spin-yield until the winner publishes the table.
    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // another thread is initializing; back off
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    // Re-check: someone may have initialized before our CAS.
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2); // next threshold = 0.75 * n
                    }
                } finally {
                    // Publish the threshold (or restore sc on failure).
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }
}

总结

ConcurrentHashMap是Java并发编程中的重要数据结构,它通过以下机制实现了高效的线程安全:

  1. 分段锁机制:JDK 1.7使用分段锁,将整个哈希表分成多个段,每个段独立加锁
  2. CAS + synchronized:JDK 1.8使用CAS操作和synchronized关键字,提供更好的并发性能
  3. 无锁读操作:读操作不需要加锁,提高了读操作的性能
  4. 并发扩容:支持并发扩容,减少了扩容时的阻塞时间
  5. 原子操作:使用CAS操作保证原子性,避免了锁的开销

通过深入理解ConcurrentHashMap的实现原理,开发者可以更好地使用这个强大的并发数据结构,构建高性能的多线程应用程序。

参考资料

  1. 《Java并发编程实战》
  2. 《Java并发编程的艺术》
  3. 《深入理解Java虚拟机》
  4. 《Java并发包源码解析》
  5. Oracle官方文档