读写一致性怎么做?延迟双删、订阅 binlog、消息通知

1. 概述

1.1 读写一致性的重要性

读写一致性是分布式系统设计的核心问题之一,特别是在使用缓存时,如何保证缓存和数据库的一致性是一个关键挑战。

读写一致性的挑战

  • 缓存更新时机:何时更新缓存
  • 并发更新:并发场景下的数据一致性
  • 缓存失效:缓存失效策略
  • 数据同步:缓存和数据库的数据同步

1.2 常见问题场景

常见问题场景

  • 写后读不一致:写入数据库后,读取缓存可能还是旧数据
  • 并发更新不一致:多个请求同时更新,导致数据不一致
  • 缓存穿透:缓存失效后,大量请求直接访问数据库
  • 缓存雪崩:大量缓存同时失效,导致数据库压力过大

1.3 本文内容结构

本文将从以下几个方面全面解析读写一致性:

  1. 读写一致性问题:问题场景、原因分析
  2. 延迟双删方案:原理、实现、优缺点
  3. 订阅Binlog方案:原理、实现、优缺点
  4. 消息通知方案:原理、实现、优缺点
  5. 方案对比:各方案的优缺点和适用场景
  6. 最佳实践:实际项目中的最佳实践

2. 读写一致性问题

2.1 问题场景

2.1.1 写后读不一致

场景描述

  1. 用户A更新数据,写入数据库
  2. 删除缓存
  3. 用户B读取数据,缓存未命中,从数据库读取
  4. 用户B读取到旧数据,写入缓存
  5. 用户A再次读取,从缓存读取到旧数据

问题:写后读可能读到旧数据。

2.1.2 并发更新不一致

场景描述

  1. 用户A更新数据,写入数据库,删除缓存
  2. 用户B更新数据,写入数据库,删除缓存
  3. 用户C读取数据,缓存未命中,从数据库读取
  4. 用户C读取到用户B的数据,写入缓存
  5. 用户A再次读取,从缓存读取到用户B的数据

问题:并发更新可能导致数据不一致。

2.2 问题原因

2.2.1 缓存更新策略

常见缓存更新策略

  • 先更新数据库,再删除缓存:可能导致读不一致
  • 先删除缓存,再更新数据库:可能导致写不一致
  • 先更新数据库,再更新缓存:可能导致并发不一致

2.2.2 时序问题

时序问题

  • 读写时序:读和写的时序问题
  • 并发时序:并发操作的时序问题
  • 网络延迟:网络延迟导致的操作顺序问题

3. 延迟双删方案

3.1 原理

3.1.1 基本思路

延迟双删(Delayed Double Delete)

  1. 第一次删除:更新数据库前,先删除缓存
  2. 更新数据库:更新数据库
  3. 延迟删除:延迟一段时间后,再次删除缓存

目的:确保缓存中的数据是最终一致的数据。

3.1.2 为什么需要延迟

延迟的原因

  • 主从延迟:如果使用主从复制,可能存在主从延迟
  • 异步处理:数据库更新可能是异步的
  • 并发读取:延迟期间可能有其他请求读取并写入缓存

3.2 实现代码

3.2.1 基础实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Service
public class UserService {

@Autowired
private UserMapper userMapper;

@Autowired
private RedisTemplate<String, String> redisTemplate;

@Autowired
private ScheduledExecutorService scheduledExecutor;

/**
* 更新用户(延迟双删)
*/
@Transactional
public void updateUser(User user) {
String cacheKey = "user:" + user.getId();

// 1. 第一次删除缓存
redisTemplate.delete(cacheKey);

// 2. 更新数据库
userMapper.updateById(user);

// 3. 延迟删除缓存(延迟500ms)
scheduledExecutor.schedule(() -> {
redisTemplate.delete(cacheKey);
}, 500, TimeUnit.MILLISECONDS);
}
}

3.2.2 优化实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
@Service
public class UserService {

@Autowired
private UserMapper userMapper;

@Autowired
private RedisTemplate<String, String> redisTemplate;

@Autowired
private ScheduledExecutorService scheduledExecutor;

/**
* 更新用户(延迟双删优化版)
*/
@Transactional
public void updateUser(User user) {
String cacheKey = "user:" + user.getId();

// 1. 第一次删除缓存
redisTemplate.delete(cacheKey);

// 2. 更新数据库
userMapper.updateById(user);

// 3. 延迟删除缓存(根据主从延迟动态调整)
long delay = calculateDelay();
scheduledExecutor.schedule(() -> {
// 再次删除缓存
redisTemplate.delete(cacheKey);

// 可选:再次验证数据库和缓存的一致性
verifyConsistency(user.getId());
}, delay, TimeUnit.MILLISECONDS);
}

/**
* 计算延迟时间
*/
private long calculateDelay() {
// 方法1:固定延迟(500ms)
// return 500;

// 方法2:根据主从延迟动态调整
long replicationDelay = getReplicationDelay();
return Math.max(500, replicationDelay + 100);
}

/**
* 获取主从延迟
*/
private long getReplicationDelay() {
// 从监控系统获取主从延迟
// 这里简化处理,返回固定值
return 200;
}

/**
* 验证一致性
*/
private void verifyConsistency(Long userId) {
String cacheKey = "user:" + userId;
String cacheValue = redisTemplate.opsForValue().get(cacheKey);

if (cacheValue != null) {
User cachedUser = JSON.parseObject(cacheValue, User.class);
User dbUser = userMapper.selectById(userId);

// 如果缓存和数据库不一致,删除缓存
if (!cachedUser.equals(dbUser)) {
redisTemplate.delete(cacheKey);
}
}
}
}

3.3 优缺点

3.3.1 优点

优点

  • 实现简单:实现相对简单
  • 成本低:不需要额外的中间件
  • 适合简单场景:适合业务逻辑简单的场景

3.3.2 缺点

缺点

  • 延迟不确定:延迟时间难以确定,可能过长或过短
  • 不能完全保证一致性:仍然可能出现短暂的不一致
  • 性能影响:延迟删除可能影响性能
  • 主从延迟依赖:依赖主从延迟的准确测量

4. 订阅Binlog方案

4.1 原理

4.1.1 基本思路

订阅Binlog方案

  1. 数据库更新:更新数据库,写入Binlog
  2. 订阅Binlog:订阅数据库的Binlog变更
  3. 解析Binlog:解析Binlog,获取变更数据
  4. 更新缓存:根据Binlog变更,更新或删除缓存

优势

  • 实时性:实时同步数据库变更
  • 可靠性:基于数据库的Binlog,可靠性高
  • 解耦:缓存更新与业务逻辑解耦

4.1.2 Binlog格式

Binlog格式

  • Statement:记录SQL语句
  • Row:记录行变更(推荐)
  • Mixed:混合模式

4.2 实现代码

4.2.1 Canal实现

Canal:阿里巴巴开源的MySQL Binlog增量订阅&消费组件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
@Configuration
public class CanalConfig {

@Bean
public CanalConnector canalConnector() {
// 创建Canal连接
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111),
"example",
"",
""
);

connector.connect();
connector.subscribe(".*\\..*"); // 订阅所有表
connector.rollback();

return connector;
}
}

@Component
public class CanalBinlogConsumer {

@Autowired
private CanalConnector canalConnector;

@Autowired
private RedisTemplate<String, String> redisTemplate;

@PostConstruct
public void start() {
// 启动Binlog消费线程
new Thread(this::consumeBinlog).start();
}

/**
* 消费Binlog
*/
private void consumeBinlog() {
while (true) {
try {
// 获取批量数据
Message message = canalConnector.getWithoutAck(100);
long batchId = message.getId();
int size = message.getEntries().size();

if (batchId == -1 || size == 0) {
Thread.sleep(1000);
continue;
}

// 处理Binlog事件
List<Entry> entries = message.getEntries();
for (Entry entry : entries) {
if (entry.getEntryType() == EntryType.ROWDATA) {
handleRowChange(entry);
}
}

// 提交确认
canalConnector.ack(batchId);
} catch (Exception e) {
log.error("Consume binlog failed", e);
canalConnector.rollback();
}
}
}

/**
* 处理行变更
*/
private void handleRowChange(Entry entry) {
RowChange rowChange = null;
try {
rowChange = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
log.error("Parse row change failed", e);
return;
}

EventType eventType = rowChange.getEventType();
String tableName = entry.getHeader().getTableName();

// 只处理用户表的变更
if (!"user".equals(tableName)) {
return;
}

for (RowData rowData : rowChange.getRowDatasList()) {
if (eventType == EventType.DELETE) {
// 删除操作
handleDelete(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT || eventType == EventType.UPDATE) {
// 插入或更新操作
handleInsertOrUpdate(rowData.getAfterColumnsList());
}
}
}

/**
* 处理删除操作
*/
private void handleDelete(List<Column> columns) {
Long userId = getUserId(columns);
if (userId != null) {
String cacheKey = "user:" + userId;
redisTemplate.delete(cacheKey);
log.info("Delete cache: {}", cacheKey);
}
}

/**
* 处理插入或更新操作
*/
private void handleInsertOrUpdate(List<Column> columns) {
Long userId = getUserId(columns);
if (userId != null) {
// 删除缓存,下次读取时从数据库加载
String cacheKey = "user:" + userId;
redisTemplate.delete(cacheKey);
log.info("Invalidate cache: {}", cacheKey);
}
}

/**
* 获取用户ID
*/
private Long getUserId(List<Column> columns) {
for (Column column : columns) {
if ("id".equals(column.getName())) {
return Long.parseLong(column.getValue());
}
}
return null;
}
}

4.2.2 Debezium实现

Debezium:Red Hat开源的分布式平台,用于捕获数据库变更。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
@Configuration
public class DebeziumConfig {

@Bean
public io.debezium.config.Configuration debeziumConfig() {
return io.debezium.config.Configuration.create()
.with("connector.class", "io.debezium.connector.mysql.MySqlConnector")
.with("database.hostname", "localhost")
.with("database.port", 3306)
.with("database.user", "root")
.with("database.password", "password")
.with("database.server.id", "1")
.with("database.server.name", "mysql-server")
.with("database.whitelist", "test")
.with("table.whitelist", "test.user")
.with("database.history.kafka.bootstrap.servers", "localhost:9092")
.with("database.history.kafka.topic", "dbhistory.test")
.build();
}
}

@Component
public class DebeziumBinlogConsumer {

@Autowired
private RedisTemplate<String, String> redisTemplate;

@KafkaListener(topics = "mysql-server.test.user")
public void consumeBinlog(ConsumerRecord<String, String> record) {
try {
// 解析Binlog消息
String value = record.value();
JSONObject json = JSON.parseObject(value);

String op = json.getString("op");
JSONObject after = json.getJSONObject("after");
JSONObject before = json.getJSONObject("before");

if ("d".equals(op)) {
// 删除操作
handleDelete(before);
} else if ("c".equals(op) || "u".equals(op)) {
// 插入或更新操作
handleInsertOrUpdate(after);
}
} catch (Exception e) {
log.error("Consume binlog failed", e);
}
}

private void handleDelete(JSONObject before) {
Long userId = before.getLong("id");
if (userId != null) {
String cacheKey = "user:" + userId;
redisTemplate.delete(cacheKey);
}
}

private void handleInsertOrUpdate(JSONObject after) {
Long userId = after.getLong("id");
if (userId != null) {
String cacheKey = "user:" + userId;
redisTemplate.delete(cacheKey);
}
}
}

4.3 优缺点

4.3.1 优点

优点

  • 实时性强:实时同步数据库变更
  • 可靠性高:基于数据库Binlog,可靠性高
  • 解耦:缓存更新与业务逻辑解耦
  • 支持多种数据库:支持MySQL、PostgreSQL等

4.3.2 缺点

缺点

  • 实现复杂:需要引入Canal或Debezium等组件
  • 运维成本:需要维护Binlog订阅服务
  • 延迟:可能存在一定的延迟(通常很小)
  • 资源消耗:Binlog订阅会消耗一定的资源

5. 消息通知方案

5.1 原理

5.1.1 基本思路

消息通知方案

  1. 更新数据库:更新数据库
  2. 发送消息:发送消息到消息队列
  3. 消费消息:消费消息,更新缓存
  4. 异步处理:异步处理,不影响主流程

优势

  • 解耦:缓存更新与业务逻辑解耦
  • 可靠性:基于消息队列,可靠性高
  • 可扩展:可以支持多个消费者

5.1.2 消息队列选择

常用消息队列

  • Kafka:高吞吐量,适合大数据场景
  • RocketMQ:阿里开源,适合电商场景
  • RabbitMQ:功能丰富,适合复杂场景

5.2 实现代码

5.2.1 基于Kafka实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@Service
public class UserService {

@Autowired
private UserMapper userMapper;

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

/**
* 更新用户(消息通知方案)
*/
@Transactional
public void updateUser(User user) {
// 1. 更新数据库
userMapper.updateById(user);

// 2. 发送消息到Kafka
CacheUpdateEvent event = new CacheUpdateEvent();
event.setEventType("UPDATE");
event.setTableName("user");
event.setEntityId(user.getId().toString());
event.setEntity(user);

kafkaTemplate.send("cache-update", JSON.toJSONString(event));
}
}

@Component
public class CacheUpdateConsumer {

@Autowired
private RedisTemplate<String, String> redisTemplate;

@KafkaListener(topics = "cache-update", groupId = "cache-updater")
public void handleCacheUpdate(String message) {
try {
CacheUpdateEvent event = JSON.parseObject(message, CacheUpdateEvent.class);

if ("user".equals(event.getTableName())) {
String cacheKey = "user:" + event.getEntityId();

if ("DELETE".equals(event.getEventType())) {
// 删除缓存
redisTemplate.delete(cacheKey);
} else if ("UPDATE".equals(event.getEventType()) || "INSERT".equals(event.getEventType())) {
// 删除缓存,下次读取时从数据库加载
redisTemplate.delete(cacheKey);
}
}
} catch (Exception e) {
log.error("Handle cache update failed", e);
// 可以发送到死信队列
}
}
}

@Data
public class CacheUpdateEvent {
private String eventType; // INSERT, UPDATE, DELETE
private String tableName;
private String entityId;
private Object entity;
}

5.2.2 基于RocketMQ实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@Service
public class UserService {

@Autowired
private UserMapper userMapper;

@Autowired
private RocketMQTemplate rocketMQTemplate;

/**
* 更新用户(RocketMQ消息通知)
*/
@Transactional
public void updateUser(User user) {
// 1. 更新数据库
userMapper.updateById(user);

// 2. 发送消息到RocketMQ
CacheUpdateEvent event = new CacheUpdateEvent();
event.setEventType("UPDATE");
event.setTableName("user");
event.setEntityId(user.getId().toString());

rocketMQTemplate.convertAndSend("cache-update-topic", event);
}
}

@Component
@RocketMQMessageListener(topic = "cache-update-topic", consumerGroup = "cache-updater")
public class CacheUpdateConsumer implements RocketMQListener<CacheUpdateEvent> {

@Autowired
private RedisTemplate<String, String> redisTemplate;

@Override
public void onMessage(CacheUpdateEvent event) {
try {
if ("user".equals(event.getTableName())) {
String cacheKey = "user:" + event.getEntityId();
redisTemplate.delete(cacheKey);
}
} catch (Exception e) {
log.error("Handle cache update failed", e);
}
}
}

5.2.3 基于本地消息表实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
@Service
public class UserService {

@Autowired
private UserMapper userMapper;

@Autowired
private CacheUpdateMessageMapper messageMapper;

/**
* 更新用户(本地消息表方案)
*/
@Transactional
public void updateUser(User user) {
// 1. 更新数据库
userMapper.updateById(user);

// 2. 插入本地消息表(同一事务)
CacheUpdateMessage message = new CacheUpdateMessage();
message.setId(UUID.randomUUID().toString());
message.setEventType("UPDATE");
message.setTableName("user");
message.setEntityId(user.getId().toString());
message.setStatus(0); // 待处理
message.setCreateTime(LocalDateTime.now());
messageMapper.insert(message);

// 3. 异步发送消息(定时任务扫描)
}
}

@Component
public class CacheUpdateMessageScheduler {

@Autowired
private CacheUpdateMessageMapper messageMapper;

@Autowired
private RedisTemplate<String, String> redisTemplate;

@Scheduled(fixedDelay = 1000) // 每秒执行一次
public void processMessages() {
// 查询待处理的消息
List<CacheUpdateMessage> messages = messageMapper.selectByStatus(0);

for (CacheUpdateMessage message : messages) {
try {
// 处理消息
handleCacheUpdate(message);

// 更新消息状态为已处理
message.setStatus(1);
message.setUpdateTime(LocalDateTime.now());
messageMapper.updateById(message);
} catch (Exception e) {
log.error("Process message failed: {}", message.getId(), e);
// 更新消息状态为失败
message.setStatus(2);
messageMapper.updateById(message);
}
}
}

private void handleCacheUpdate(CacheUpdateMessage message) {
if ("user".equals(message.getTableName())) {
String cacheKey = "user:" + message.getEntityId();
redisTemplate.delete(cacheKey);
}
}
}

5.3 优缺点

5.3.1 优点

优点

  • 解耦:缓存更新与业务逻辑解耦
  • 可靠性:基于消息队列,可靠性高
  • 可扩展:可以支持多个消费者
  • 灵活性:可以灵活处理各种场景

5.3.2 缺点

缺点

  • 延迟:消息处理可能存在延迟
  • 复杂度:需要引入消息队列
  • 一致性:最终一致性,不是强一致性
  • 消息丢失:需要处理消息丢失的情况

6. 方案对比

6.1 对比表

方案 实时性 可靠性 复杂度 成本 适用场景
延迟双删 简单场景,对一致性要求不高
订阅Binlog 复杂场景,对实时性要求高
消息通知 需要解耦,支持多消费者

6.2 选择建议

6.2.1 延迟双删

适用场景

  • 业务逻辑简单
  • 对一致性要求不高
  • 成本敏感
  • 主从延迟可控

6.2.2 订阅Binlog

适用场景

  • 对实时性要求高
  • 需要支持多种数据库
  • 需要解耦业务逻辑
  • 有运维能力

6.2.3 消息通知

适用场景

  • 需要解耦业务逻辑
  • 需要支持多个消费者
  • 已有消息队列基础设施
  • 可以接受最终一致性

7. 最佳实践

7.1 组合使用

7.1.1 多级缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Service
public class UserService {

@Autowired
private UserMapper userMapper;

@Autowired
private RedisTemplate<String, String> redisTemplate;

@Autowired
private CaffeineCache localCache;

/**
* 获取用户(多级缓存)
*/
public User getUser(Long userId) {
String cacheKey = "user:" + userId;

// 1. 从本地缓存获取
User user = localCache.getIfPresent(cacheKey);
if (user != null) {
return user;
}

// 2. 从Redis获取
String userJson = redisTemplate.opsForValue().get(cacheKey);
if (userJson != null) {
user = JSON.parseObject(userJson, User.class);
// 回填本地缓存
localCache.put(cacheKey, user);
return user;
}

// 3. 从数据库获取
user = userMapper.selectById(userId);
if (user != null) {
// 写入Redis
redisTemplate.opsForValue().set(cacheKey, JSON.toJSONString(user), 1, TimeUnit.HOURS);
// 写入本地缓存
localCache.put(cacheKey, user);
}

return user;
}
}

7.2 监控和告警

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Component
public class CacheConsistencyMonitor {

@Autowired
private RedisTemplate<String, String> redisTemplate;

@Autowired
private UserMapper userMapper;

/**
* 监控缓存一致性
*/
@Scheduled(fixedDelay = 60000) // 每分钟检查一次
public void monitorConsistency() {
// 随机抽样检查
List<Long> userIds = getRandomUserIds(100);

for (Long userId : userIds) {
String cacheKey = "user:" + userId;
String cacheValue = redisTemplate.opsForValue().get(cacheKey);

if (cacheValue != null) {
User cachedUser = JSON.parseObject(cacheValue, User.class);
User dbUser = userMapper.selectById(userId);

if (!cachedUser.equals(dbUser)) {
// 发现不一致,记录告警
log.warn("Cache inconsistency detected: userId={}", userId);
alertService.sendAlert("缓存不一致", "userId: " + userId);

// 修复:删除缓存
redisTemplate.delete(cacheKey);
}
}
}
}
}

8. 总结

8.1 核心要点

  1. 延迟双删:实现简单,成本低,适合简单场景
  2. 订阅Binlog:实时性强,可靠性高,适合复杂场景
  3. 消息通知:解耦性好,可扩展,适合需要解耦的场景
  4. 方案选择:根据业务特点、一致性要求、成本等因素选择
  5. 最佳实践:组合使用、监控告警、多级缓存

8.2 关键理解

  1. 没有完美的方案:每种方案都有优缺点,需要权衡
  2. 最终一致性:大多数场景下,最终一致性即可
  3. 监控重要:需要监控缓存一致性,及时发现问题
  4. 组合使用:可以组合使用多种方案

8.3 最佳实践

  1. 根据场景选择:根据业务特点选择合适的方案
  2. 监控一致性:定期检查缓存一致性
  3. 多级缓存:使用多级缓存提高性能
  4. 快速修复:发现不一致时快速修复

相关文章