第485集读写一致性怎么做?延迟双删、订阅 binlog、消息通知
|字数总计:4.2k|阅读时长:17分钟|阅读量:
读写一致性怎么做?延迟双删、订阅 binlog、消息通知
1. 概述
1.1 读写一致性的重要性
读写一致性是分布式系统设计的核心问题之一,特别是在使用缓存时,如何保证缓存和数据库的一致性是一个关键挑战。
读写一致性的挑战:
- 缓存更新时机:何时更新缓存
- 并发更新:并发场景下的数据一致性
- 缓存失效:缓存失效策略
- 数据同步:缓存和数据库的数据同步
1.2 常见问题场景
常见问题场景:
- 写后读不一致:写入数据库后,读取缓存可能还是旧数据
- 并发更新不一致:多个请求同时更新,导致数据不一致
- 缓存穿透:缓存失效后,大量请求直接访问数据库
- 缓存雪崩:大量缓存同时失效,导致数据库压力过大
1.3 本文内容结构
本文将从以下几个方面全面解析读写一致性:
- 读写一致性问题:问题场景、原因分析
- 延迟双删方案:原理、实现、优缺点
- 订阅Binlog方案:原理、实现、优缺点
- 消息通知方案:原理、实现、优缺点
- 方案对比:各方案的优缺点和适用场景
- 最佳实践:实际项目中的最佳实践
2. 读写一致性问题
2.1 问题场景
2.1.1 写后读不一致
场景描述:
- 用户A更新数据,写入数据库
- 删除缓存
- 用户B读取数据,缓存未命中,从数据库读取
- 用户B读取到旧数据,写入缓存
- 用户A再次读取,从缓存读取到旧数据
问题:写后读可能读到旧数据。
2.1.2 并发更新不一致
场景描述:
- 用户A更新数据,写入数据库,删除缓存
- 用户B更新数据,写入数据库,删除缓存
- 用户C读取数据,缓存未命中,从数据库读取
- 用户C读取到用户B的数据,写入缓存
- 用户A再次读取,从缓存读取到用户B的数据
问题:并发更新可能导致数据不一致。
2.2 问题原因
2.2.1 缓存更新策略
常见缓存更新策略:
- 先更新数据库,再删除缓存:可能导致读不一致
- 先删除缓存,再更新数据库:可能导致写不一致
- 先更新数据库,再更新缓存:可能导致并发不一致
2.2.2 时序问题
时序问题:
- 读写时序:读和写的时序问题
- 并发时序:并发操作的时序问题
- 网络延迟:网络延迟导致的操作顺序问题
3. 延迟双删方案
3.1 原理
3.1.1 基本思路
延迟双删(Delayed Double Delete):
- 第一次删除:更新数据库前,先删除缓存
- 更新数据库:更新数据库
- 延迟删除:延迟一段时间后,再次删除缓存
目的:确保缓存中的数据是最终一致的数据。
3.1.2 为什么需要延迟
延迟的原因:
- 主从延迟:如果使用主从复制,可能存在主从延迟
- 异步处理:数据库更新可能是异步的
- 并发读取:延迟期间可能有其他请求读取并写入缓存
3.2 实现代码
3.2.1 基础实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| @Service public class UserService { @Autowired private UserMapper userMapper; @Autowired private RedisTemplate<String, String> redisTemplate; @Autowired private ScheduledExecutorService scheduledExecutor;
@Transactional public void updateUser(User user) { String cacheKey = "user:" + user.getId(); redisTemplate.delete(cacheKey); userMapper.updateById(user); scheduledExecutor.schedule(() -> { redisTemplate.delete(cacheKey); }, 500, TimeUnit.MILLISECONDS); } }
|
3.2.2 优化实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
| @Service public class UserService { @Autowired private UserMapper userMapper; @Autowired private RedisTemplate<String, String> redisTemplate; @Autowired private ScheduledExecutorService scheduledExecutor;
@Transactional public void updateUser(User user) { String cacheKey = "user:" + user.getId(); redisTemplate.delete(cacheKey); userMapper.updateById(user); long delay = calculateDelay(); scheduledExecutor.schedule(() -> { redisTemplate.delete(cacheKey); verifyConsistency(user.getId()); }, delay, TimeUnit.MILLISECONDS); }
private long calculateDelay() { long replicationDelay = getReplicationDelay(); return Math.max(500, replicationDelay + 100); }
private long getReplicationDelay() { return 200; }
private void verifyConsistency(Long userId) { String cacheKey = "user:" + userId; String cacheValue = redisTemplate.opsForValue().get(cacheKey); if (cacheValue != null) { User cachedUser = JSON.parseObject(cacheValue, User.class); User dbUser = userMapper.selectById(userId); if (!cachedUser.equals(dbUser)) { redisTemplate.delete(cacheKey); } } } }
|
3.3 优缺点
3.3.1 优点
优点:
- 实现简单:实现相对简单
- 成本低:不需要额外的中间件
- 适合简单场景:适合业务逻辑简单的场景
3.3.2 缺点
缺点:
- 延迟不确定:延迟时间难以确定,可能过长或过短
- 不能完全保证一致性:仍然可能出现短暂的不一致
- 性能影响:延迟删除可能影响性能
- 主从延迟依赖:依赖主从延迟的准确测量
4. 订阅Binlog方案
4.1 原理
4.1.1 基本思路
订阅Binlog方案:
- 数据库更新:更新数据库,写入Binlog
- 订阅Binlog:订阅数据库的Binlog变更
- 解析Binlog:解析Binlog,获取变更数据
- 更新缓存:根据Binlog变更,更新或删除缓存
优势:
- 实时性:实时同步数据库变更
- 可靠性:基于数据库的Binlog,可靠性高
- 解耦:缓存更新与业务逻辑解耦
4.1.2 Binlog格式
Binlog格式:
- Statement:记录SQL语句
- Row:记录行变更(推荐)
- Mixed:混合模式
4.2 实现代码
4.2.1 Canal实现
Canal:阿里巴巴开源的MySQL Binlog增量订阅&消费组件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
| @Configuration public class CanalConfig { @Bean public CanalConnector canalConnector() { CanalConnector connector = CanalConnectors.newSingleConnector( new InetSocketAddress("127.0.0.1", 11111), "example", "", "" ); connector.connect(); connector.subscribe(".*\\..*"); connector.rollback(); return connector; } }
@Component public class CanalBinlogConsumer { @Autowired private CanalConnector canalConnector; @Autowired private RedisTemplate<String, String> redisTemplate; @PostConstruct public void start() { new Thread(this::consumeBinlog).start(); }
private void consumeBinlog() { while (true) { try { Message message = canalConnector.getWithoutAck(100); long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { Thread.sleep(1000); continue; } List<Entry> entries = message.getEntries(); for (Entry entry : entries) { if (entry.getEntryType() == EntryType.ROWDATA) { handleRowChange(entry); } } canalConnector.ack(batchId); } catch (Exception e) { log.error("Consume binlog failed", e); canalConnector.rollback(); } } }
private void handleRowChange(Entry entry) { RowChange rowChange = null; try { rowChange = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { log.error("Parse row change failed", e); return; } EventType eventType = rowChange.getEventType(); String tableName = entry.getHeader().getTableName(); if (!"user".equals(tableName)) { return; } for (RowData rowData : rowChange.getRowDatasList()) { if (eventType == EventType.DELETE) { handleDelete(rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT || eventType == EventType.UPDATE) { handleInsertOrUpdate(rowData.getAfterColumnsList()); } } }
private void handleDelete(List<Column> columns) { Long userId = getUserId(columns); if (userId != null) { String cacheKey = "user:" + userId; redisTemplate.delete(cacheKey); log.info("Delete cache: {}", cacheKey); } }
private void handleInsertOrUpdate(List<Column> columns) { Long userId = getUserId(columns); if (userId != null) { String cacheKey = "user:" + userId; redisTemplate.delete(cacheKey); log.info("Invalidate cache: {}", cacheKey); } }
private Long getUserId(List<Column> columns) { for (Column column : columns) { if ("id".equals(column.getName())) { return Long.parseLong(column.getValue()); } } return null; } }
|
4.2.2 Debezium实现
Debezium:Red Hat开源的分布式平台,用于捕获数据库变更。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
| @Configuration public class DebeziumConfig { @Bean public io.debezium.config.Configuration debeziumConfig() { return io.debezium.config.Configuration.create() .with("connector.class", "io.debezium.connector.mysql.MySqlConnector") .with("database.hostname", "localhost") .with("database.port", 3306) .with("database.user", "root") .with("database.password", "password") .with("database.server.id", "1") .with("database.server.name", "mysql-server") .with("database.whitelist", "test") .with("table.whitelist", "test.user") .with("database.history.kafka.bootstrap.servers", "localhost:9092") .with("database.history.kafka.topic", "dbhistory.test") .build(); } }
@Component public class DebeziumBinlogConsumer { @Autowired private RedisTemplate<String, String> redisTemplate; @KafkaListener(topics = "mysql-server.test.user") public void consumeBinlog(ConsumerRecord<String, String> record) { try { String value = record.value(); JSONObject json = JSON.parseObject(value); String op = json.getString("op"); JSONObject after = json.getJSONObject("after"); JSONObject before = json.getJSONObject("before"); if ("d".equals(op)) { handleDelete(before); } else if ("c".equals(op) || "u".equals(op)) { handleInsertOrUpdate(after); } } catch (Exception e) { log.error("Consume binlog failed", e); } } private void handleDelete(JSONObject before) { Long userId = before.getLong("id"); if (userId != null) { String cacheKey = "user:" + userId; redisTemplate.delete(cacheKey); } } private void handleInsertOrUpdate(JSONObject after) { Long userId = after.getLong("id"); if (userId != null) { String cacheKey = "user:" + userId; redisTemplate.delete(cacheKey); } } }
|
4.3 优缺点
4.3.1 优点
优点:
- 实时性强:实时同步数据库变更
- 可靠性高:基于数据库Binlog,可靠性高
- 解耦:缓存更新与业务逻辑解耦
- 支持多种数据库:支持MySQL、PostgreSQL等
4.3.2 缺点
缺点:
- 实现复杂:需要引入Canal或Debezium等组件
- 运维成本:需要维护Binlog订阅服务
- 延迟:可能存在一定的延迟(通常很小)
- 资源消耗:Binlog订阅会消耗一定的资源
5. 消息通知方案
5.1 原理
5.1.1 基本思路
消息通知方案:
- 更新数据库:更新数据库
- 发送消息:发送消息到消息队列
- 消费消息:消费消息,更新缓存
- 异步处理:异步处理,不影响主流程
优势:
- 解耦:缓存更新与业务逻辑解耦
- 可靠性:基于消息队列,可靠性高
- 可扩展:可以支持多个消费者
5.1.2 消息队列选择
常用消息队列:
- Kafka:高吞吐量,适合大数据场景
- RocketMQ:阿里开源,适合电商场景
- RabbitMQ:功能丰富,适合复杂场景
5.2 实现代码
5.2.1 基于Kafka实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| @Service public class UserService { @Autowired private UserMapper userMapper; @Autowired private KafkaTemplate<String, String> kafkaTemplate;
@Transactional public void updateUser(User user) { userMapper.updateById(user); CacheUpdateEvent event = new CacheUpdateEvent(); event.setEventType("UPDATE"); event.setTableName("user"); event.setEntityId(user.getId().toString()); event.setEntity(user); kafkaTemplate.send("cache-update", JSON.toJSONString(event)); } }
@Component public class CacheUpdateConsumer { @Autowired private RedisTemplate<String, String> redisTemplate; @KafkaListener(topics = "cache-update", groupId = "cache-updater") public void handleCacheUpdate(String message) { try { CacheUpdateEvent event = JSON.parseObject(message, CacheUpdateEvent.class); if ("user".equals(event.getTableName())) { String cacheKey = "user:" + event.getEntityId(); if ("DELETE".equals(event.getEventType())) { redisTemplate.delete(cacheKey); } else if ("UPDATE".equals(event.getEventType()) || "INSERT".equals(event.getEventType())) { redisTemplate.delete(cacheKey); } } } catch (Exception e) { log.error("Handle cache update failed", e); } } }
@Data public class CacheUpdateEvent { private String eventType; private String tableName; private String entityId; private Object entity; }
|
5.2.2 基于RocketMQ实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| @Service public class UserService { @Autowired private UserMapper userMapper; @Autowired private RocketMQTemplate rocketMQTemplate;
@Transactional public void updateUser(User user) { userMapper.updateById(user); CacheUpdateEvent event = new CacheUpdateEvent(); event.setEventType("UPDATE"); event.setTableName("user"); event.setEntityId(user.getId().toString()); rocketMQTemplate.convertAndSend("cache-update-topic", event); } }
@Component @RocketMQMessageListener(topic = "cache-update-topic", consumerGroup = "cache-updater") public class CacheUpdateConsumer implements RocketMQListener<CacheUpdateEvent> { @Autowired private RedisTemplate<String, String> redisTemplate; @Override public void onMessage(CacheUpdateEvent event) { try { if ("user".equals(event.getTableName())) { String cacheKey = "user:" + event.getEntityId(); redisTemplate.delete(cacheKey); } } catch (Exception e) { log.error("Handle cache update failed", e); } } }
|
5.2.3 基于本地消息表实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| @Service public class UserService { @Autowired private UserMapper userMapper; @Autowired private CacheUpdateMessageMapper messageMapper;
@Transactional public void updateUser(User user) { userMapper.updateById(user); CacheUpdateMessage message = new CacheUpdateMessage(); message.setId(UUID.randomUUID().toString()); message.setEventType("UPDATE"); message.setTableName("user"); message.setEntityId(user.getId().toString()); message.setStatus(0); message.setCreateTime(LocalDateTime.now()); messageMapper.insert(message); } }
@Component public class CacheUpdateMessageScheduler { @Autowired private CacheUpdateMessageMapper messageMapper; @Autowired private RedisTemplate<String, String> redisTemplate; @Scheduled(fixedDelay = 1000) public void processMessages() { List<CacheUpdateMessage> messages = messageMapper.selectByStatus(0); for (CacheUpdateMessage message : messages) { try { handleCacheUpdate(message); message.setStatus(1); message.setUpdateTime(LocalDateTime.now()); messageMapper.updateById(message); } catch (Exception e) { log.error("Process message failed: {}", message.getId(), e); message.setStatus(2); messageMapper.updateById(message); } } } private void handleCacheUpdate(CacheUpdateMessage message) { if ("user".equals(message.getTableName())) { String cacheKey = "user:" + message.getEntityId(); redisTemplate.delete(cacheKey); } } }
|
5.3 优缺点
5.3.1 优点
优点:
- 解耦:缓存更新与业务逻辑解耦
- 可靠性:基于消息队列,可靠性高
- 可扩展:可以支持多个消费者
- 灵活性:可以灵活处理各种场景
5.3.2 缺点
缺点:
- 延迟:消息处理可能存在延迟
- 复杂度:需要引入消息队列
- 一致性:最终一致性,不是强一致性
- 消息丢失:需要处理消息丢失的情况
6. 方案对比
6.1 对比表
| 方案 |
实时性 |
可靠性 |
复杂度 |
成本 |
适用场景 |
| 延迟双删 |
中 |
中 |
低 |
低 |
简单场景,对一致性要求不高 |
| 订阅Binlog |
高 |
高 |
高 |
中 |
复杂场景,对实时性要求高 |
| 消息通知 |
中 |
高 |
中 |
中 |
需要解耦,支持多消费者 |
6.2 选择建议
6.2.1 延迟双删
适用场景:
- 业务逻辑简单
- 对一致性要求不高
- 成本敏感
- 主从延迟可控
6.2.2 订阅Binlog
适用场景:
- 对实时性要求高
- 需要支持多种数据库
- 需要解耦业务逻辑
- 有运维能力
6.2.3 消息通知
适用场景:
- 需要解耦业务逻辑
- 需要支持多个消费者
- 已有消息队列基础设施
- 可以接受最终一致性
7. 最佳实践
7.1 组合使用
7.1.1 多级缓存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| @Service public class UserService { @Autowired private UserMapper userMapper; @Autowired private RedisTemplate<String, String> redisTemplate; @Autowired private CaffeineCache localCache;
public User getUser(Long userId) { String cacheKey = "user:" + userId; User user = localCache.getIfPresent(cacheKey); if (user != null) { return user; } String userJson = redisTemplate.opsForValue().get(cacheKey); if (userJson != null) { user = JSON.parseObject(userJson, User.class); localCache.put(cacheKey, user); return user; } user = userMapper.selectById(userId); if (user != null) { redisTemplate.opsForValue().set(cacheKey, JSON.toJSONString(user), 1, TimeUnit.HOURS); localCache.put(cacheKey, user); } return user; } }
|
7.2 监控和告警
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| @Component public class CacheConsistencyMonitor { @Autowired private RedisTemplate<String, String> redisTemplate; @Autowired private UserMapper userMapper;
@Scheduled(fixedDelay = 60000) public void monitorConsistency() { List<Long> userIds = getRandomUserIds(100); for (Long userId : userIds) { String cacheKey = "user:" + userId; String cacheValue = redisTemplate.opsForValue().get(cacheKey); if (cacheValue != null) { User cachedUser = JSON.parseObject(cacheValue, User.class); User dbUser = userMapper.selectById(userId); if (!cachedUser.equals(dbUser)) { log.warn("Cache inconsistency detected: userId={}", userId); alertService.sendAlert("缓存不一致", "userId: " + userId); redisTemplate.delete(cacheKey); } } } } }
|
8. 总结
8.1 核心要点
- 延迟双删:实现简单,成本低,适合简单场景
- 订阅Binlog:实时性强,可靠性高,适合复杂场景
- 消息通知:解耦性好,可扩展,适合需要解耦的场景
- 方案选择:根据业务特点、一致性要求、成本等因素选择
- 最佳实践:组合使用、监控告警、多级缓存
8.2 关键理解
- 没有完美的方案:每种方案都有优缺点,需要权衡
- 最终一致性:大多数场景下,最终一致性即可
- 监控重要:需要监控缓存一致性,及时发现问题
- 组合使用:可以组合使用多种方案
8.3 最佳实践
- 根据场景选择:根据业务特点选择合适的方案
- 监控一致性:定期检查缓存一致性
- 多级缓存:使用多级缓存提高性能
- 快速修复:发现不一致时快速修复
相关文章: