一致性有哪些级别?强一致/最终一致怎么落地?

1. 概述

1.1 一致性的重要性

一致性是分布式系统设计的核心问题之一,决定了系统的数据正确性和用户体验。

一致性的意义

  • 数据正确性:保证数据的准确性和完整性
  • 用户体验:保证用户看到的数据是正确的
  • 系统可靠性:保证系统在各种情况下都能正常工作

1.2 一致性级别分类

一致性级别从强到弱:

  1. 强一致性(Strong Consistency)
  2. 线性一致性(Linearizability)
  3. 顺序一致性(Sequential Consistency)
  4. 因果一致性(Causal Consistency)
  5. 会话一致性(Session Consistency)
  6. 最终一致性(Eventual Consistency)

1.3 本文内容结构

本文将从以下几个方面全面解析一致性级别:

  1. 一致性级别分类:各种一致性级别的定义和特点
  2. 强一致性实现:强一致性的实现方式和落地实践
  3. 最终一致性实现:最终一致性的实现方式和落地实践
  4. 其他一致性级别:其他一致性级别的实现方式
  5. 实战案例:实际项目中的一致性实现

2. 一致性级别分类

2.1 强一致性(Strong Consistency)

2.1.1 定义

强一致性:所有节点在同一时刻看到的数据是完全一致的。

特点

  • 所有节点数据完全相同
  • 读操作总是返回最新数据
  • 写操作立即在所有节点生效
  • 性能较低

适用场景

  • 金融系统
  • 支付系统
  • 库存系统

2.1.2 实现方式

实现方式

  • 两阶段提交(2PC)
  • 三阶段提交(3PC)
  • 分布式事务
  • 分布式锁

2.2 线性一致性(Linearizability)

2.2.1 定义

线性一致性:所有操作看起来像是按照某个全局顺序执行的,且每个操作都在其调用和返回之间原子性地完成。

特点

  • 所有操作有全局顺序
  • 操作是原子的
  • 读操作能看到之前所有写操作的结果

与强一致性的区别

  • 线性一致性更严格
  • 强一致性是线性一致性的特例

2.2.2 实现方式

实现方式

  • 单主复制:所有写操作都通过主节点
  • 共识算法:Raft、Paxos
  • 分布式锁:保证操作的原子性

2.3 顺序一致性(Sequential Consistency)

2.3.1 定义

顺序一致性:所有进程看到的操作顺序是一致的,但不需要是实时的。

特点

  • 所有进程看到相同的操作顺序
  • 不需要实时性
  • 允许延迟

与线性一致性的区别

  • 顺序一致性不需要实时性
  • 线性一致性需要实时性

2.3.2 实现方式

实现方式

  • 向量时钟:记录操作的因果关系
  • 逻辑时钟:记录操作的顺序
  • 版本向量:记录数据版本

2.4 因果一致性(Causal Consistency)

2.4.1 定义

因果一致性:有因果关系的事件必须按照因果关系顺序被所有进程看到。

特点

  • 保证因果关系的顺序
  • 不保证无因果关系事件的顺序
  • 性能较高

适用场景

  • 社交网络
  • 评论系统
  • 消息系统

2.4.2 实现方式

实现方式

  • 向量时钟:记录因果关系
  • 版本向量:记录数据版本
  • 依赖追踪:追踪操作的依赖关系

2.5 会话一致性(Session Consistency)

2.5.1 定义

会话一致性:同一会话内的操作保证一致性。

特点

  • 同一会话内一致
  • 不同会话间可能不一致
  • 性能较高

适用场景

  • Web应用
  • 移动应用
  • 用户会话

2.5.2 实现方式

实现方式

  • 粘性会话:同一会话路由到同一节点
  • 会话缓存:缓存会话数据
  • 会话复制:复制会话数据

2.6 最终一致性(Eventual Consistency)

2.6.1 定义

最终一致性:系统最终会达到一致状态,但不保证立即一致。

特点

  • 允许暂时不一致
  • 最终会一致
  • 性能最高

适用场景

  • 电商系统
  • 推荐系统
  • 搜索系统

2.6.2 实现方式

实现方式

  • 主从复制:异步复制
  • 消息队列:异步处理
  • 事件溯源:事件驱动

2.7 其他一致性级别

2.7.1 读写一致性(Read-Your-Writes Consistency)

读写一致性:用户读取自己写入的数据时,总是能看到最新值。

特点

  • 保证用户看到自己的写操作
  • 不保证看到其他用户的写操作
  • 性能较高

2.7.2 单调读一致性(Monotonic Read Consistency)

单调读一致性:用户不会看到数据回退,即如果用户读取到某个值,后续读取不会看到更旧的值。

特点

  • 保证数据不回退
  • 不保证看到最新值
  • 性能较高

2.7.3 单调写一致性(Monotonic Write Consistency)

单调写一致性:用户的写操作按顺序执行。

特点

  • 保证写操作顺序
  • 不保证实时性
  • 性能较高

3. 强一致性实现

3.1 两阶段提交(2PC)

3.1.1 原理

两阶段提交(2PC):协调者协调所有参与者完成事务。

阶段1:准备阶段(Prepare)

  1. 协调者向所有参与者发送Prepare请求
  2. 参与者执行事务,但不提交
  3. 参与者返回Yes/No

阶段2:提交阶段(Commit)

  1. 如果所有参与者都返回Yes,协调者发送Commit
  2. 如果任何参与者返回No,协调者发送Abort
  3. 参与者提交或回滚事务

3.1.2 实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@Service
public class TwoPhaseCommitService {

@Autowired
private List<TransactionParticipant> participants;

public boolean executeTransaction(Transaction transaction) {
// 阶段1:准备阶段
boolean allPrepared = true;
for (TransactionParticipant participant : participants) {
if (!participant.prepare(transaction)) {
allPrepared = false;
break;
}
}

// 阶段2:提交阶段
if (allPrepared) {
// 所有参与者都准备成功,提交
for (TransactionParticipant participant : participants) {
participant.commit(transaction);
}
return true;
} else {
// 有参与者准备失败,回滚
for (TransactionParticipant participant : participants) {
participant.rollback(transaction);
}
return false;
}
}
}

@Component
public class TransactionParticipant {

@Transactional
public boolean prepare(Transaction transaction) {
try {
// 执行事务操作,但不提交
// 例如:扣减库存、冻结账户等
executeTransaction(transaction);
return true;
} catch (Exception e) {
return false;
}
}

public void commit(Transaction transaction) {
// 提交事务
transactionManager.commit();
}

public void rollback(Transaction transaction) {
// 回滚事务
transactionManager.rollback();
}
}

3.1.3 优缺点

优点

  • 保证强一致性
  • 实现相对简单

缺点

  • 性能较低
  • 阻塞时间长
  • 单点故障风险

3.2 三阶段提交(3PC)

3.2.1 原理

三阶段提交(3PC):在2PC基础上增加CanCommit阶段,减少阻塞。

阶段1:CanCommit

  1. 协调者向所有参与者发送CanCommit请求
  2. 参与者检查是否可以提交
  3. 参与者返回Yes/No

阶段2:PreCommit

  1. 如果所有参与者都返回Yes,协调者发送PreCommit
  2. 参与者执行事务,但不提交
  3. 参与者返回Ack

阶段3:DoCommit

  1. 协调者发送DoCommit
  2. 参与者提交事务
  3. 参与者返回Ack

3.2.2 实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@Service
public class ThreePhaseCommitService {

@Autowired
private List<TransactionParticipant> participants;

public boolean executeTransaction(Transaction transaction) {
// 阶段1:CanCommit
boolean allCanCommit = true;
for (TransactionParticipant participant : participants) {
if (!participant.canCommit(transaction)) {
allCanCommit = false;
break;
}
}

if (!allCanCommit) {
// 有参与者不能提交,中止
for (TransactionParticipant participant : participants) {
participant.abort(transaction);
}
return false;
}

// 阶段2:PreCommit
boolean allPreCommitted = true;
for (TransactionParticipant participant : participants) {
if (!participant.preCommit(transaction)) {
allPreCommitted = false;
break;
}
}

if (!allPreCommitted) {
// 有参与者预提交失败,中止
for (TransactionParticipant participant : participants) {
participant.abort(transaction);
}
return false;
}

// 阶段3:DoCommit
for (TransactionParticipant participant : participants) {
participant.doCommit(transaction);
}
return true;
}
}

3.2.3 优缺点

优点

  • 减少阻塞时间
  • 提高可用性

缺点

  • 实现复杂
  • 仍然有性能问题

3.3 分布式事务(Seata)

3.3.1 原理

Seata:阿里巴巴开源的分布式事务解决方案。

模式

  • AT模式:自动补偿
  • TCC模式:Try-Confirm-Cancel
  • Saga模式:长事务

3.3.2 实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Service
public class OrderService {

@GlobalTransactional(rollbackFor = Exception.class)
public void createOrder(Order order) {
// 1. 创建订单
orderMapper.insert(order);

// 2. 扣减库存
inventoryService.deductStock(order.getSkuId(), order.getQuantity());

// 3. 扣减账户余额
accountService.deductBalance(order.getUserId(), order.getAmount());

// 如果任何步骤失败,所有操作都会回滚
}
}

@Service
public class InventoryService {

@Transactional
public void deductStock(Long skuId, Integer quantity) {
Inventory inventory = inventoryMapper.selectById(skuId);
if (inventory.getStock() < quantity) {
throw new BusinessException("库存不足");
}
inventory.setStock(inventory.getStock() - quantity);
inventoryMapper.updateById(inventory);
}
}

3.4 分布式锁(Redis)

3.4.1 原理

分布式锁:使用Redis实现分布式锁,保证操作的原子性。

实现方式

  • SETNX:设置键值,如果不存在
  • Redisson:Java客户端,提供分布式锁
  • ZooKeeper:使用ZooKeeper实现分布式锁

3.4.2 实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Service
public class AccountService {

@Autowired
private RedissonClient redissonClient;

public void transfer(Long fromAccount, Long toAccount, BigDecimal amount) {
// 获取分布式锁
RLock lock = redissonClient.getLock("account:lock:" + fromAccount);

try {
// 尝试加锁,最多等待10秒,锁定30秒
boolean locked = lock.tryLock(10, 30, TimeUnit.SECONDS);
if (!locked) {
throw new BusinessException("系统繁忙,请稍后再试");
}

// 执行转账操作(强一致性)
Account fromAccountObj = accountMapper.selectById(fromAccount);
if (fromAccountObj.getBalance().compareTo(amount) < 0) {
throw new BusinessException("余额不足");
}

fromAccountObj.setBalance(fromAccountObj.getBalance().subtract(amount));
accountMapper.updateById(fromAccountObj);

Account toAccountObj = accountMapper.selectById(toAccount);
toAccountObj.setBalance(toAccountObj.getBalance().add(amount));
accountMapper.updateById(toAccountObj);
} finally {
// 释放锁
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
}

3.5 共识算法(Raft)

3.5.1 原理

Raft:分布式一致性算法,保证强一致性。

角色

  • Leader:处理所有客户端请求
  • Follower:跟随Leader
  • Candidate:选举过程中的候选者

流程

  1. 选举:选举Leader
  2. 日志复制:Leader复制日志到Follower
  3. 提交:大多数节点确认后提交

3.5.2 实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 使用Raft算法保证强一致性
// 这里使用Hazelcast作为Raft实现

@Service
public class ConfigService {

@Autowired
private HazelcastInstance hazelcastInstance;

public void setConfig(String key, String value) {
// 使用Raft保证强一致性
IMap<String, String> configMap = hazelcastInstance.getMap("config");
configMap.put(key, value);
}

public String getConfig(String key) {
// 读取时保证强一致性
IMap<String, String> configMap = hazelcastInstance.getMap("config");
return configMap.get(key);
}
}

4. 最终一致性实现

4.1 主从复制

4.1.1 原理

主从复制:主节点写入,从节点异步复制。

特点

  • 主节点处理写操作
  • 从节点异步复制
  • 最终达到一致

4.1.2 MySQL主从复制

配置主库

1
2
3
4
5
# my.cnf
[mysqld]
server-id = 1
log-bin = mysql-bin
binlog-format = ROW

配置从库

1
2
3
4
5
# my.cnf
[mysqld]
server-id = 2
relay-log = mysql-relay-bin
read-only = 1

Java代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Service
public class OrderService {

@Autowired
@Qualifier("masterDataSource")
private DataSource masterDataSource;

@Autowired
@Qualifier("slaveDataSource")
private DataSource slaveDataSource;

public void createOrder(Order order) {
// 写入主库(强一致性)
JdbcTemplate masterTemplate = new JdbcTemplate(masterDataSource);
masterTemplate.update("INSERT INTO orders (id, user_id, amount) VALUES (?, ?, ?)",
order.getId(), order.getUserId(), order.getAmount());
}

public Order getOrder(Long orderId) {
// 从从库读取(最终一致性)
JdbcTemplate slaveTemplate = new JdbcTemplate(slaveDataSource);
return slaveTemplate.queryForObject(
"SELECT * FROM orders WHERE id = ?",
new Object[]{orderId},
new BeanPropertyRowMapper<>(Order.class)
);
}
}

4.2 消息队列

4.2.1 原理

消息队列:使用消息队列实现异步处理,保证最终一致性。

流程

  1. 写入本地数据库
  2. 发送消息到消息队列
  3. 消费者异步处理
  4. 最终达到一致

4.2.2 Kafka实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@Service
public class OrderService {

@Autowired
private OrderMapper orderMapper;

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void createOrder(Order order) {
// 1. 写入本地数据库
orderMapper.insert(order);

// 2. 发送消息到Kafka(异步处理,最终一致)
OrderEvent event = new OrderEvent();
event.setOrderId(order.getId());
event.setUserId(order.getUserId());
event.setAmount(order.getAmount());
event.setEventType("ORDER_CREATED");

kafkaTemplate.send("order-events", JSON.toJSONString(event));
}
}

@Component
public class OrderEventConsumer {

@Autowired
private InventoryService inventoryService;

@Autowired
private AccountService accountService;

@KafkaListener(topics = "order-events", groupId = "order-processor")
public void processOrderEvent(String message) {
OrderEvent event = JSON.parseObject(message, OrderEvent.class);

// 异步处理(最终一致性)
switch (event.getEventType()) {
case "ORDER_CREATED":
// 扣减库存(最终一致)
inventoryService.deductStockAsync(event.getOrderId());
// 扣减账户余额(最终一致)
accountService.deductBalanceAsync(event.getOrderId());
break;
}
}
}

4.3 事件溯源(Event Sourcing)

4.3.1 原理

事件溯源:将状态变化记录为事件序列,通过重放事件恢复状态。

特点

  • 所有状态变化都是事件
  • 通过重放事件恢复状态
  • 保证最终一致性

4.3.2 实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@Entity
public class OrderEvent {
@Id
private String eventId;
private String orderId;
private String eventType;
private String eventData;
private LocalDateTime eventTime;
}

@Service
public class OrderService {

@Autowired
private OrderEventRepository eventRepository;

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void createOrder(Order order) {
// 1. 创建事件
OrderEvent event = new OrderEvent();
event.setEventId(UUID.randomUUID().toString());
event.setOrderId(order.getId());
event.setEventType("ORDER_CREATED");
event.setEventData(JSON.toJSONString(order));
event.setEventTime(LocalDateTime.now());

// 2. 保存事件(事件溯源)
eventRepository.save(event);

// 3. 发布事件(最终一致性)
kafkaTemplate.send("order-events", JSON.toJSONString(event));
}

public Order getOrder(Long orderId) {
// 通过重放事件恢复订单状态
List<OrderEvent> events = eventRepository.findByOrderId(orderId);
Order order = new Order();

for (OrderEvent event : events) {
applyEvent(order, event);
}

return order;
}

private void applyEvent(Order order, OrderEvent event) {
switch (event.getEventType()) {
case "ORDER_CREATED":
Order orderData = JSON.parseObject(event.getEventData(), Order.class);
order.setId(orderData.getId());
order.setUserId(orderData.getUserId());
order.setAmount(orderData.getAmount());
break;
case "ORDER_PAID":
order.setStatus(OrderStatus.PAID);
break;
case "ORDER_SHIPPED":
order.setStatus(OrderStatus.SHIPPED);
break;
}
}
}

4.4 版本向量(Version Vector)

4.4.1 原理

版本向量:记录每个节点的版本号,用于检测冲突和解决冲突。

特点

  • 记录每个节点的版本
  • 检测数据冲突
  • 解决数据冲突

4.4.2 实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class VersionVector {
private Map<String, Long> versions = new HashMap<>();

public void increment(String nodeId) {
versions.put(nodeId, versions.getOrDefault(nodeId, 0L) + 1);
}

public boolean happensBefore(VersionVector other) {
// 检查是否发生在other之前
for (String nodeId : versions.keySet()) {
if (versions.get(nodeId) > other.versions.getOrDefault(nodeId, 0L)) {
return false;
}
}
return true;
}

public boolean isConcurrent(VersionVector other) {
// 检查是否并发
return !happensBefore(other) && !other.happensBefore(this);
}
}

@Entity
public class Document {
@Id
private String id;
private String content;
private VersionVector version;

public void merge(Document other) {
if (version.isConcurrent(other.version)) {
// 并发冲突,需要解决
resolveConflict(other);
} else if (version.happensBefore(other.version)) {
// other更新,使用other的内容
this.content = other.content;
this.version = other.version;
}
// 否则,当前版本更新,不需要修改
}

private void resolveConflict(Document other) {
// 冲突解决策略:最后写入获胜(Last Write Wins)
if (version.getMaxTimestamp() < other.version.getMaxTimestamp()) {
this.content = other.content;
this.version = other.version;
}
}
}

4.5 向量时钟(Vector Clock)

4.5.1 原理

向量时钟:记录每个节点的逻辑时钟,用于确定事件的因果关系。

特点

  • 记录因果关系
  • 检测并发事件
  • 保证因果一致性

4.5.2 实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class VectorClock {
private Map<String, Long> clock = new HashMap<>();

public void tick(String nodeId) {
clock.put(nodeId, clock.getOrDefault(nodeId, 0L) + 1);
}

public void update(VectorClock other) {
for (String nodeId : other.clock.keySet()) {
clock.put(nodeId, Math.max(
clock.getOrDefault(nodeId, 0L),
other.clock.get(nodeId)
));
}
}

public boolean happensBefore(VectorClock other) {
boolean strictlyLess = false;
for (String nodeId : clock.keySet()) {
long thisTime = clock.get(nodeId);
long otherTime = other.clock.getOrDefault(nodeId, 0L);
if (thisTime > otherTime) {
return false;
}
if (thisTime < otherTime) {
strictlyLess = true;
}
}
return strictlyLess;
}
}

@Entity
public class Message {
@Id
private String id;
private String content;
private VectorClock clock;

public boolean canDeliver(List<Message> deliveredMessages) {
// 检查是否可以投递(因果一致性)
for (Message delivered : deliveredMessages) {
if (delivered.clock.happensBefore(this.clock)) {
// 有依赖的消息还未投递,不能投递当前消息
return false;
}
}
return true;
}
}

5. 其他一致性级别实现

5.1 读写一致性(Read-Your-Writes)

5.1.1 实现方式

读写一致性:用户读取自己写入的数据时,总是能看到最新值。

实现方式

  • 粘性会话:同一用户路由到同一节点
  • 版本号:记录用户最后写入的版本
  • 缓存:缓存用户最后写入的数据

5.1.2 实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Service
public class PostService {

@Autowired
private PostMapper postMapper;

@Autowired
private RedisTemplate<String, String> redisTemplate;

public void createPost(Post post) {
// 1. 写入数据库
postMapper.insert(post);

// 2. 缓存用户最后写入的版本(读写一致性)
String cacheKey = "user:last_write:" + post.getUserId();
redisTemplate.opsForValue().set(cacheKey, post.getId(), 1, TimeUnit.HOURS);
}

public Post getPost(Long postId, Long userId) {
// 检查是否是用户自己写入的
String cacheKey = "user:last_write:" + userId;
String lastWriteId = redisTemplate.opsForValue().get(cacheKey);

if (lastWriteId != null && lastWriteId.equals(postId.toString())) {
// 用户读取自己写入的数据,从主库读取(读写一致性)
return postMapper.selectById(postId);
} else {
// 读取其他数据,从从库读取(最终一致性)
return postMapper.selectByIdFromSlave(postId);
}
}
}

5.2 单调读一致性(Monotonic Read)

5.2.1 实现方式

单调读一致性:用户不会看到数据回退。

实现方式

  • 版本号:记录用户读取的版本
  • 粘性会话:同一用户路由到同一节点
  • 缓存:缓存用户读取的数据

5.2.2 实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Service
public class ProductService {

@Autowired
private ProductMapper productMapper;

@Autowired
private RedisTemplate<String, String> redisTemplate;

public Product getProduct(Long productId, Long userId) {
// 获取用户最后读取的版本(单调读一致性)
String cacheKey = "user:last_read:" + userId + ":" + productId;
String lastReadVersion = redisTemplate.opsForValue().get(cacheKey);

Product product = productMapper.selectById(productId);

if (lastReadVersion != null) {
// 如果当前版本小于最后读取的版本,使用最后读取的版本(单调读)
if (product.getVersion() < Long.parseLong(lastReadVersion)) {
product = productMapper.selectByVersion(productId, Long.parseLong(lastReadVersion));
}
}

// 更新用户最后读取的版本
redisTemplate.opsForValue().set(cacheKey, String.valueOf(product.getVersion()), 1, TimeUnit.HOURS);

return product;
}
}

5.3 会话一致性(Session Consistency)

5.3.1 实现方式

会话一致性:同一会话内的操作保证一致性。

实现方式

  • 粘性会话:同一会话路由到同一节点
  • 会话复制:复制会话数据到所有节点
  • 会话缓存:缓存会话数据

5.3.2 实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Service
public class ShoppingCartService {

@Autowired
private ShoppingCartMapper cartMapper;

@Autowired
private RedisTemplate<String, String> redisTemplate;

public void addToCart(Long userId, Long productId, Integer quantity) {
// 获取会话ID(会话一致性)
String sessionId = getSessionId();

// 确保同一会话路由到同一节点
String nodeId = getNodeIdBySession(sessionId);

// 写入购物车
ShoppingCart cart = cartMapper.selectByUserId(userId);
if (cart == null) {
cart = new ShoppingCart();
cart.setUserId(userId);
cartMapper.insert(cart);
}

CartItem item = new CartItem();
item.setCartId(cart.getId());
item.setProductId(productId);
item.setQuantity(quantity);
cartMapper.insertItem(item);

// 缓存会话数据(会话一致性)
String cacheKey = "session:cart:" + sessionId;
redisTemplate.opsForValue().set(cacheKey, JSON.toJSONString(cart), 1, TimeUnit.HOURS);
}

public ShoppingCart getCart(Long userId) {
// 获取会话ID
String sessionId = getSessionId();

// 从缓存读取(会话一致性)
String cacheKey = "session:cart:" + sessionId;
String cartJson = redisTemplate.opsForValue().get(cacheKey);

if (cartJson != null) {
return JSON.parseObject(cartJson, ShoppingCart.class);
}

// 缓存未命中,从数据库读取
return cartMapper.selectByUserId(userId);
}
}

6. 实战案例

6.1 案例1:支付系统(强一致性)

6.1.1 场景

需求:支付系统,账户余额必须准确。

一致性要求强一致性

实现方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Service
public class PaymentService {

@Autowired
private RedissonClient redissonClient;

@Autowired
private AccountMapper accountMapper;

@Transactional
public void pay(Long accountId, BigDecimal amount) {
// 获取分布式锁(强一致性)
RLock lock = redissonClient.getLock("account:lock:" + accountId);

try {
boolean locked = lock.tryLock(10, 30, TimeUnit.SECONDS);
if (!locked) {
throw new BusinessException("系统繁忙,请稍后再试");
}

// 检查余额(强一致性)
Account account = accountMapper.selectById(accountId);
if (account.getBalance().compareTo(amount) < 0) {
throw new BusinessException("余额不足");
}

// 扣减余额(强一致性)
account.setBalance(account.getBalance().subtract(amount));
accountMapper.updateById(account);

// 记录支付记录
PaymentRecord record = new PaymentRecord();
record.setAccountId(accountId);
record.setAmount(amount);
record.setStatus(PaymentStatus.SUCCESS);
paymentRecordMapper.insert(record);
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
}

6.2 案例2:电商系统(最终一致性)

6.2.1 场景

需求:电商系统,订单创建后异步处理库存和账户。

一致性要求最终一致性

实现方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@Service
public class OrderService {

@Autowired
private OrderMapper orderMapper;

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void createOrder(Order order) {
// 1. 创建订单(快速响应,保证可用性)
order.setStatus(OrderStatus.PENDING);
orderMapper.insert(order);

// 2. 发送消息到Kafka(异步处理,最终一致性)
OrderEvent event = new OrderEvent();
event.setOrderId(order.getId());
event.setUserId(order.getUserId());
event.setSkuId(order.getSkuId());
event.setQuantity(order.getQuantity());
event.setAmount(order.getAmount());
event.setEventType("ORDER_CREATED");

kafkaTemplate.send("order-events", JSON.toJSONString(event));
}
}

@Component
public class OrderEventConsumer {

@Autowired
private InventoryService inventoryService;

@Autowired
private AccountService accountService;

@KafkaListener(topics = "order-events", groupId = "order-processor")
public void processOrderEvent(String message) {
OrderEvent event = JSON.parseObject(message, OrderEvent.class);

try {
// 异步处理(最终一致性)
// 1. 扣减库存
inventoryService.deductStock(event.getSkuId(), event.getQuantity());

// 2. 扣减账户余额
accountService.deductBalance(event.getUserId(), event.getAmount());

// 3. 更新订单状态
Order order = orderMapper.selectById(event.getOrderId());
order.setStatus(OrderStatus.CONFIRMED);
orderMapper.updateById(order);
} catch (Exception e) {
// 处理失败,更新订单状态为失败
Order order = orderMapper.selectById(event.getOrderId());
order.setStatus(OrderStatus.FAILED);
order.setFailureReason(e.getMessage());
orderMapper.updateById(order);
}
}
}

6.3 案例3:社交网络(因果一致性)

6.3.1 场景

需求:社交网络,评论必须按照因果关系顺序显示。

一致性要求因果一致性

实现方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Service
public class CommentService {

@Autowired
private CommentMapper commentMapper;

@Autowired
private VectorClockService vectorClockService;

public void createComment(Comment comment) {
// 获取当前节点的向量时钟
VectorClock clock = vectorClockService.getCurrentClock();
clock.tick(getNodeId());

// 如果有父评论,更新向量时钟(因果一致性)
if (comment.getParentId() != null) {
Comment parent = commentMapper.selectById(comment.getParentId());
clock.update(parent.getClock());
}

comment.setClock(clock);
commentMapper.insert(comment);
}

public List<Comment> getComments(Long postId) {
// 获取所有评论
List<Comment> comments = commentMapper.selectByPostId(postId);

// 按照向量时钟排序(因果一致性)
comments.sort((c1, c2) -> {
if (c1.getClock().happensBefore(c2.getClock())) {
return -1;
} else if (c2.getClock().happensBefore(c1.getClock())) {
return 1;
} else {
// 并发,按照时间戳排序
return c1.getCreateTime().compareTo(c2.getCreateTime());
}
});

return comments;
}
}

7. 一致性级别选择

7.1 选择原则

7.1.1 业务需求

根据业务需求选择一致性级别

  • 金融系统:强一致性
  • 电商系统:最终一致性
  • 社交网络:因果一致性
  • 配置中心:强一致性

7.1.2 性能要求

根据性能要求选择一致性级别

  • 高性能:最终一致性
  • 中等性能:会话一致性、读写一致性
  • 低性能:强一致性

7.1.3 可用性要求

根据可用性要求选择一致性级别

  • 高可用:最终一致性
  • 中等可用:会话一致性
  • 低可用:强一致性

7.2 混合使用

7.2.1 读写分离

读写分离:写操作强一致性,读操作最终一致性。

1
2
3
4
5
6
7
8
9
10
11
12
13
@Service
public class ProductService {

public void updateProduct(Product product) {
// 写操作:强一致性
productMapper.updateById(product);
}

public Product getProduct(Long productId) {
// 读操作:最终一致性(从从库读取)
return productMapper.selectByIdFromSlave(productId);
}
}

7.2.2 关键路径强一致

关键路径强一致:关键操作强一致性,非关键操作最终一致性。

1
2
3
4
5
6
7
8
9
10
11
@Service
public class OrderService {

public void createOrder(Order order) {
// 关键操作:创建订单(强一致性)
orderMapper.insert(order);

// 非关键操作:发送通知(最终一致性)
notificationService.sendOrderNotificationAsync(order);
}
}

8. 总结

8.1 核心要点

  1. 一致性级别:从强一致性到最终一致性,有多种级别
  2. 强一致性:使用2PC、3PC、分布式事务、分布式锁、共识算法
  3. 最终一致性:使用主从复制、消息队列、事件溯源、版本向量
  4. 其他一致性:读写一致性、单调读一致性、会话一致性、因果一致性
  5. 选择原则:根据业务需求、性能要求、可用性要求选择

8.2 关键理解

  1. 强一致性:保证数据立即一致,但性能较低
  2. 最终一致性:允许暂时不一致,但性能较高
  3. 混合使用:不同场景使用不同的一致性级别
  4. 业务驱动:根据业务需求选择一致性级别

8.3 最佳实践

  1. 金融系统:强一致性(分布式锁、分布式事务)
  2. 电商系统:最终一致性(消息队列、主从复制)
  3. 社交网络:因果一致性(向量时钟)
  4. 配置中心:强一致性(共识算法)
  5. 混合使用:关键路径强一致,非关键路径最终一致

相关文章