第473集一致性有哪些级别?强一致/最终一致怎么落地?
|字数总计:5.9k|阅读时长:24分钟|阅读量:
一致性有哪些级别?强一致/最终一致怎么落地?
1. 概述
1.1 一致性的重要性
一致性是分布式系统设计的核心问题之一,决定了系统的数据正确性和用户体验。
一致性的意义:
- 数据正确性:保证数据的准确性和完整性
- 用户体验:保证用户看到的数据是正确的
- 系统可靠性:保证系统在各种情况下都能正常工作
1.2 一致性级别分类
一致性级别从强到弱:
- 强一致性(Strong Consistency)
- 线性一致性(Linearizability)
- 顺序一致性(Sequential Consistency)
- 因果一致性(Causal Consistency)
- 会话一致性(Session Consistency)
- 最终一致性(Eventual Consistency)
1.3 本文内容结构
本文将从以下几个方面全面解析一致性级别:
- 一致性级别分类:各种一致性级别的定义和特点
- 强一致性实现:强一致性的实现方式和落地实践
- 最终一致性实现:最终一致性的实现方式和落地实践
- 其他一致性级别:其他一致性级别的实现方式
- 实战案例:实际项目中的一致性实现
2. 一致性级别分类
2.1 强一致性(Strong Consistency)
2.1.1 定义
强一致性:所有节点在同一时刻看到的数据是完全一致的。
特点:
- 所有节点数据完全相同
- 读操作总是返回最新数据
- 写操作立即在所有节点生效
- 性能较低
适用场景:
2.1.2 实现方式
实现方式:
- 两阶段提交(2PC)
- 三阶段提交(3PC)
- 分布式事务
- 分布式锁
2.2 线性一致性(Linearizability)
2.2.1 定义
线性一致性:所有操作看起来像是按照某个全局顺序执行的,且每个操作都在其调用和返回之间原子性地完成。
特点:
- 所有操作有全局顺序
- 操作是原子的
- 读操作能看到之前所有写操作的结果
与强一致性的区别:
2.2.2 实现方式
实现方式:
- 单主复制:所有写操作都通过主节点
- 共识算法:Raft、Paxos
- 分布式锁:保证操作的原子性
2.3 顺序一致性(Sequential Consistency)
2.3.1 定义
顺序一致性:所有进程看到的操作顺序是一致的,但不需要是实时的。
特点:
- 所有进程看到相同的操作顺序
- 不需要实时性
- 允许延迟
与线性一致性的区别:
2.3.2 实现方式
实现方式:
- 向量时钟:记录操作的因果关系
- 逻辑时钟:记录操作的顺序
- 版本向量:记录数据版本
2.4 因果一致性(Causal Consistency)
2.4.1 定义
因果一致性:有因果关系的事件必须按照因果关系顺序被所有进程看到。
特点:
- 保证因果关系的顺序
- 不保证无因果关系事件的顺序
- 性能较高
适用场景:
2.4.2 实现方式
实现方式:
- 向量时钟:记录因果关系
- 版本向量:记录数据版本
- 依赖追踪:追踪操作的依赖关系
2.5 会话一致性(Session Consistency)
2.5.1 定义
会话一致性:同一会话内的操作保证一致性。
特点:
适用场景:
2.5.2 实现方式
实现方式:
- 粘性会话:同一会话路由到同一节点
- 会话缓存:缓存会话数据
- 会话复制:复制会话数据
2.6 最终一致性(Eventual Consistency)
2.6.1 定义
最终一致性:系统最终会达到一致状态,但不保证立即一致。
特点:
适用场景:
2.6.2 实现方式
实现方式:
- 主从复制:异步复制
- 消息队列:异步处理
- 事件溯源:事件驱动
2.7 其他一致性级别
2.7.1 读写一致性(Read-Your-Writes Consistency)
读写一致性:用户读取自己写入的数据时,总是能看到最新值。
特点:
- 保证用户看到自己的写操作
- 不保证看到其他用户的写操作
- 性能较高
2.7.2 单调读一致性(Monotonic Read Consistency)
单调读一致性:用户不会看到数据回退,即如果用户读取到某个值,后续读取不会看到更旧的值。
特点:
2.7.3 单调写一致性(Monotonic Write Consistency)
单调写一致性:用户的写操作按顺序执行。
特点:
3. 强一致性实现
3.1 两阶段提交(2PC)
3.1.1 原理
两阶段提交(2PC):协调者协调所有参与者完成事务。
阶段1:准备阶段(Prepare)
- 协调者向所有参与者发送Prepare请求
- 参与者执行事务,但不提交
- 参与者返回Yes/No
阶段2:提交阶段(Commit)
- 如果所有参与者都返回Yes,协调者发送Commit
- 如果任何参与者返回No,协调者发送Abort
- 参与者提交或回滚事务
3.1.2 实现代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| @Service public class TwoPhaseCommitService { @Autowired private List<TransactionParticipant> participants; public boolean executeTransaction(Transaction transaction) { boolean allPrepared = true; for (TransactionParticipant participant : participants) { if (!participant.prepare(transaction)) { allPrepared = false; break; } } if (allPrepared) { for (TransactionParticipant participant : participants) { participant.commit(transaction); } return true; } else { for (TransactionParticipant participant : participants) { participant.rollback(transaction); } return false; } } }
@Component public class TransactionParticipant { @Transactional public boolean prepare(Transaction transaction) { try { executeTransaction(transaction); return true; } catch (Exception e) { return false; } } public void commit(Transaction transaction) { transactionManager.commit(); } public void rollback(Transaction transaction) { transactionManager.rollback(); } }
|
3.1.3 优缺点
优点:
缺点:
3.2 三阶段提交(3PC)
3.2.1 原理
三阶段提交(3PC):在2PC基础上增加CanCommit阶段,减少阻塞。
阶段1:CanCommit
- 协调者向所有参与者发送CanCommit请求
- 参与者检查是否可以提交
- 参与者返回Yes/No
阶段2:PreCommit
- 如果所有参与者都返回Yes,协调者发送PreCommit
- 参与者执行事务,但不提交
- 参与者返回Ack
阶段3:DoCommit
- 协调者发送DoCommit
- 参与者提交事务
- 参与者返回Ack
3.2.2 实现代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| @Service public class ThreePhaseCommitService { @Autowired private List<TransactionParticipant> participants; public boolean executeTransaction(Transaction transaction) { boolean allCanCommit = true; for (TransactionParticipant participant : participants) { if (!participant.canCommit(transaction)) { allCanCommit = false; break; } } if (!allCanCommit) { for (TransactionParticipant participant : participants) { participant.abort(transaction); } return false; } boolean allPreCommitted = true; for (TransactionParticipant participant : participants) { if (!participant.preCommit(transaction)) { allPreCommitted = false; break; } } if (!allPreCommitted) { for (TransactionParticipant participant : participants) { participant.abort(transaction); } return false; } for (TransactionParticipant participant : participants) { participant.doCommit(transaction); } return true; } }
|
3.2.3 优缺点
优点:
缺点:
3.3 分布式事务(Seata)
3.3.1 原理
Seata:阿里巴巴开源的分布式事务解决方案。
模式:
- AT模式:自动补偿
- TCC模式:Try-Confirm-Cancel
- Saga模式:长事务
3.3.2 实现代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| @Service public class OrderService { @GlobalTransactional(rollbackFor = Exception.class) public void createOrder(Order order) { orderMapper.insert(order); inventoryService.deductStock(order.getSkuId(), order.getQuantity()); accountService.deductBalance(order.getUserId(), order.getAmount()); } }
@Service public class InventoryService { @Transactional public void deductStock(Long skuId, Integer quantity) { Inventory inventory = inventoryMapper.selectById(skuId); if (inventory.getStock() < quantity) { throw new BusinessException("库存不足"); } inventory.setStock(inventory.getStock() - quantity); inventoryMapper.updateById(inventory); } }
|
3.4 分布式锁(Redis)
3.4.1 原理
分布式锁:使用Redis实现分布式锁,保证操作的原子性。
实现方式:
- SETNX:设置键值,如果不存在
- Redisson:Java客户端,提供分布式锁
- ZooKeeper:使用ZooKeeper实现分布式锁
3.4.2 实现代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| @Service public class AccountService { @Autowired private RedissonClient redissonClient; public void transfer(Long fromAccount, Long toAccount, BigDecimal amount) { RLock lock = redissonClient.getLock("account:lock:" + fromAccount); try { boolean locked = lock.tryLock(10, 30, TimeUnit.SECONDS); if (!locked) { throw new BusinessException("系统繁忙,请稍后再试"); } Account fromAccountObj = accountMapper.selectById(fromAccount); if (fromAccountObj.getBalance().compareTo(amount) < 0) { throw new BusinessException("余额不足"); } fromAccountObj.setBalance(fromAccountObj.getBalance().subtract(amount)); accountMapper.updateById(fromAccountObj); Account toAccountObj = accountMapper.selectById(toAccount); toAccountObj.setBalance(toAccountObj.getBalance().add(amount)); accountMapper.updateById(toAccountObj); } finally { if (lock.isHeldByCurrentThread()) { lock.unlock(); } } } }
|
3.5 共识算法(Raft)
3.5.1 原理
Raft:分布式一致性算法,保证强一致性。
角色:
- Leader:处理所有客户端请求
- Follower:跟随Leader
- Candidate:选举过程中的候选者
流程:
- 选举:选举Leader
- 日志复制:Leader复制日志到Follower
- 提交:大多数节点确认后提交
3.5.2 实现代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
|
@Service public class ConfigService { @Autowired private HazelcastInstance hazelcastInstance; public void setConfig(String key, String value) { IMap<String, String> configMap = hazelcastInstance.getMap("config"); configMap.put(key, value); } public String getConfig(String key) { IMap<String, String> configMap = hazelcastInstance.getMap("config"); return configMap.get(key); } }
|
4. 最终一致性实现
4.1 主从复制
4.1.1 原理
主从复制:主节点写入,从节点异步复制。
特点:
4.1.2 MySQL主从复制
配置主库:
1 2 3 4 5
| [mysqld] server-id = 1 log-bin = mysql-bin binlog-format = ROW
|
配置从库:
1 2 3 4 5
| [mysqld] server-id = 2 relay-log = mysql-relay-bin read-only = 1
|
Java代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| @Service public class OrderService { @Autowired @Qualifier("masterDataSource") private DataSource masterDataSource; @Autowired @Qualifier("slaveDataSource") private DataSource slaveDataSource; public void createOrder(Order order) { JdbcTemplate masterTemplate = new JdbcTemplate(masterDataSource); masterTemplate.update("INSERT INTO orders (id, user_id, amount) VALUES (?, ?, ?)", order.getId(), order.getUserId(), order.getAmount()); } public Order getOrder(Long orderId) { JdbcTemplate slaveTemplate = new JdbcTemplate(slaveDataSource); return slaveTemplate.queryForObject( "SELECT * FROM orders WHERE id = ?", new Object[]{orderId}, new BeanPropertyRowMapper<>(Order.class) ); } }
|
4.2 消息队列
4.2.1 原理
消息队列:使用消息队列实现异步处理,保证最终一致性。
流程:
- 写入本地数据库
- 发送消息到消息队列
- 消费者异步处理
- 最终达到一致
4.2.2 Kafka实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| @Service public class OrderService { @Autowired private OrderMapper orderMapper; @Autowired private KafkaTemplate<String, String> kafkaTemplate; public void createOrder(Order order) { orderMapper.insert(order); OrderEvent event = new OrderEvent(); event.setOrderId(order.getId()); event.setUserId(order.getUserId()); event.setAmount(order.getAmount()); event.setEventType("ORDER_CREATED"); kafkaTemplate.send("order-events", JSON.toJSONString(event)); } }
@Component public class OrderEventConsumer { @Autowired private InventoryService inventoryService; @Autowired private AccountService accountService; @KafkaListener(topics = "order-events", groupId = "order-processor") public void processOrderEvent(String message) { OrderEvent event = JSON.parseObject(message, OrderEvent.class); switch (event.getEventType()) { case "ORDER_CREATED": inventoryService.deductStockAsync(event.getOrderId()); accountService.deductBalanceAsync(event.getOrderId()); break; } } }
|
4.3 事件溯源(Event Sourcing)
4.3.1 原理
事件溯源:将状态变化记录为事件序列,通过重放事件恢复状态。
特点:
- 所有状态变化都是事件
- 通过重放事件恢复状态
- 保证最终一致性
4.3.2 实现代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| @Entity public class OrderEvent { @Id private String eventId; private String orderId; private String eventType; private String eventData; private LocalDateTime eventTime; }
@Service public class OrderService { @Autowired private OrderEventRepository eventRepository; @Autowired private KafkaTemplate<String, String> kafkaTemplate; public void createOrder(Order order) { OrderEvent event = new OrderEvent(); event.setEventId(UUID.randomUUID().toString()); event.setOrderId(order.getId()); event.setEventType("ORDER_CREATED"); event.setEventData(JSON.toJSONString(order)); event.setEventTime(LocalDateTime.now()); eventRepository.save(event); kafkaTemplate.send("order-events", JSON.toJSONString(event)); } public Order getOrder(Long orderId) { List<OrderEvent> events = eventRepository.findByOrderId(orderId); Order order = new Order(); for (OrderEvent event : events) { applyEvent(order, event); } return order; } private void applyEvent(Order order, OrderEvent event) { switch (event.getEventType()) { case "ORDER_CREATED": Order orderData = JSON.parseObject(event.getEventData(), Order.class); order.setId(orderData.getId()); order.setUserId(orderData.getUserId()); order.setAmount(orderData.getAmount()); break; case "ORDER_PAID": order.setStatus(OrderStatus.PAID); break; case "ORDER_SHIPPED": order.setStatus(OrderStatus.SHIPPED); break; } } }
|
4.4 版本向量(Version Vector)
4.4.1 原理
版本向量:记录每个节点的版本号,用于检测冲突和解决冲突。
特点:
4.4.2 实现代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| public class VersionVector { private Map<String, Long> versions = new HashMap<>(); public void increment(String nodeId) { versions.put(nodeId, versions.getOrDefault(nodeId, 0L) + 1); } public boolean happensBefore(VersionVector other) { for (String nodeId : versions.keySet()) { if (versions.get(nodeId) > other.versions.getOrDefault(nodeId, 0L)) { return false; } } return true; } public boolean isConcurrent(VersionVector other) { return !happensBefore(other) && !other.happensBefore(this); } }
@Entity public class Document { @Id private String id; private String content; private VersionVector version; public void merge(Document other) { if (version.isConcurrent(other.version)) { resolveConflict(other); } else if (version.happensBefore(other.version)) { this.content = other.content; this.version = other.version; } } private void resolveConflict(Document other) { if (version.getMaxTimestamp() < other.version.getMaxTimestamp()) { this.content = other.content; this.version = other.version; } } }
|
4.5 向量时钟(Vector Clock)
4.5.1 原理
向量时钟:记录每个节点的逻辑时钟,用于确定事件的因果关系。
特点:
4.5.2 实现代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| public class VectorClock { private Map<String, Long> clock = new HashMap<>(); public void tick(String nodeId) { clock.put(nodeId, clock.getOrDefault(nodeId, 0L) + 1); } public void update(VectorClock other) { for (String nodeId : other.clock.keySet()) { clock.put(nodeId, Math.max( clock.getOrDefault(nodeId, 0L), other.clock.get(nodeId) )); } } public boolean happensBefore(VectorClock other) { boolean strictlyLess = false; for (String nodeId : clock.keySet()) { long thisTime = clock.get(nodeId); long otherTime = other.clock.getOrDefault(nodeId, 0L); if (thisTime > otherTime) { return false; } if (thisTime < otherTime) { strictlyLess = true; } } return strictlyLess; } }
@Entity public class Message { @Id private String id; private String content; private VectorClock clock; public boolean canDeliver(List<Message> deliveredMessages) { for (Message delivered : deliveredMessages) { if (delivered.clock.happensBefore(this.clock)) { return false; } } return true; } }
|
5. 其他一致性级别实现
5.1 读写一致性(Read-Your-Writes)
5.1.1 实现方式
读写一致性:用户读取自己写入的数据时,总是能看到最新值。
实现方式:
- 粘性会话:同一用户路由到同一节点
- 版本号:记录用户最后写入的版本
- 缓存:缓存用户最后写入的数据
5.1.2 实现代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| @Service public class PostService { @Autowired private PostMapper postMapper; @Autowired private RedisTemplate<String, String> redisTemplate; public void createPost(Post post) { postMapper.insert(post); String cacheKey = "user:last_write:" + post.getUserId(); redisTemplate.opsForValue().set(cacheKey, post.getId(), 1, TimeUnit.HOURS); } public Post getPost(Long postId, Long userId) { String cacheKey = "user:last_write:" + userId; String lastWriteId = redisTemplate.opsForValue().get(cacheKey); if (lastWriteId != null && lastWriteId.equals(postId.toString())) { return postMapper.selectById(postId); } else { return postMapper.selectByIdFromSlave(postId); } } }
|
5.2 单调读一致性(Monotonic Read)
5.2.1 实现方式
单调读一致性:用户不会看到数据回退。
实现方式:
- 版本号:记录用户读取的版本
- 粘性会话:同一用户路由到同一节点
- 缓存:缓存用户读取的数据
5.2.2 实现代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| @Service public class ProductService { @Autowired private ProductMapper productMapper; @Autowired private RedisTemplate<String, String> redisTemplate; public Product getProduct(Long productId, Long userId) { String cacheKey = "user:last_read:" + userId + ":" + productId; String lastReadVersion = redisTemplate.opsForValue().get(cacheKey); Product product = productMapper.selectById(productId); if (lastReadVersion != null) { if (product.getVersion() < Long.parseLong(lastReadVersion)) { product = productMapper.selectByVersion(productId, Long.parseLong(lastReadVersion)); } } redisTemplate.opsForValue().set(cacheKey, String.valueOf(product.getVersion()), 1, TimeUnit.HOURS); return product; } }
|
5.3 会话一致性(Session Consistency)
5.3.1 实现方式
会话一致性:同一会话内的操作保证一致性。
实现方式:
- 粘性会话:同一会话路由到同一节点
- 会话复制:复制会话数据到所有节点
- 会话缓存:缓存会话数据
5.3.2 实现代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| @Service public class ShoppingCartService { @Autowired private ShoppingCartMapper cartMapper; @Autowired private RedisTemplate<String, String> redisTemplate; public void addToCart(Long userId, Long productId, Integer quantity) { String sessionId = getSessionId(); String nodeId = getNodeIdBySession(sessionId); ShoppingCart cart = cartMapper.selectByUserId(userId); if (cart == null) { cart = new ShoppingCart(); cart.setUserId(userId); cartMapper.insert(cart); } CartItem item = new CartItem(); item.setCartId(cart.getId()); item.setProductId(productId); item.setQuantity(quantity); cartMapper.insertItem(item); String cacheKey = "session:cart:" + sessionId; redisTemplate.opsForValue().set(cacheKey, JSON.toJSONString(cart), 1, TimeUnit.HOURS); } public ShoppingCart getCart(Long userId) { String sessionId = getSessionId(); String cacheKey = "session:cart:" + sessionId; String cartJson = redisTemplate.opsForValue().get(cacheKey); if (cartJson != null) { return JSON.parseObject(cartJson, ShoppingCart.class); } return cartMapper.selectByUserId(userId); } }
|
6. 实战案例
6.1 案例1:支付系统(强一致性)
6.1.1 场景
需求:支付系统,账户余额必须准确。
一致性要求:强一致性
实现方案:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| @Service public class PaymentService { @Autowired private RedissonClient redissonClient; @Autowired private AccountMapper accountMapper; @Transactional public void pay(Long accountId, BigDecimal amount) { RLock lock = redissonClient.getLock("account:lock:" + accountId); try { boolean locked = lock.tryLock(10, 30, TimeUnit.SECONDS); if (!locked) { throw new BusinessException("系统繁忙,请稍后再试"); } Account account = accountMapper.selectById(accountId); if (account.getBalance().compareTo(amount) < 0) { throw new BusinessException("余额不足"); } account.setBalance(account.getBalance().subtract(amount)); accountMapper.updateById(account); PaymentRecord record = new PaymentRecord(); record.setAccountId(accountId); record.setAmount(amount); record.setStatus(PaymentStatus.SUCCESS); paymentRecordMapper.insert(record); } finally { if (lock.isHeldByCurrentThread()) { lock.unlock(); } } } }
|
6.2 案例2:电商系统(最终一致性)
6.2.1 场景
需求:电商系统,订单创建后异步处理库存和账户。
一致性要求:最终一致性
实现方案:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| @Service public class OrderService { @Autowired private OrderMapper orderMapper; @Autowired private KafkaTemplate<String, String> kafkaTemplate; public void createOrder(Order order) { order.setStatus(OrderStatus.PENDING); orderMapper.insert(order); OrderEvent event = new OrderEvent(); event.setOrderId(order.getId()); event.setUserId(order.getUserId()); event.setSkuId(order.getSkuId()); event.setQuantity(order.getQuantity()); event.setAmount(order.getAmount()); event.setEventType("ORDER_CREATED"); kafkaTemplate.send("order-events", JSON.toJSONString(event)); } }
@Component public class OrderEventConsumer { @Autowired private InventoryService inventoryService; @Autowired private AccountService accountService; @KafkaListener(topics = "order-events", groupId = "order-processor") public void processOrderEvent(String message) { OrderEvent event = JSON.parseObject(message, OrderEvent.class); try { inventoryService.deductStock(event.getSkuId(), event.getQuantity()); accountService.deductBalance(event.getUserId(), event.getAmount()); Order order = orderMapper.selectById(event.getOrderId()); order.setStatus(OrderStatus.CONFIRMED); orderMapper.updateById(order); } catch (Exception e) { Order order = orderMapper.selectById(event.getOrderId()); order.setStatus(OrderStatus.FAILED); order.setFailureReason(e.getMessage()); orderMapper.updateById(order); } } }
|
6.3 案例3:社交网络(因果一致性)
6.3.1 场景
需求:社交网络,评论必须按照因果关系顺序显示。
一致性要求:因果一致性
实现方案:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| @Service public class CommentService { @Autowired private CommentMapper commentMapper; @Autowired private VectorClockService vectorClockService; public void createComment(Comment comment) { VectorClock clock = vectorClockService.getCurrentClock(); clock.tick(getNodeId()); if (comment.getParentId() != null) { Comment parent = commentMapper.selectById(comment.getParentId()); clock.update(parent.getClock()); } comment.setClock(clock); commentMapper.insert(comment); } public List<Comment> getComments(Long postId) { List<Comment> comments = commentMapper.selectByPostId(postId); comments.sort((c1, c2) -> { if (c1.getClock().happensBefore(c2.getClock())) { return -1; } else if (c2.getClock().happensBefore(c1.getClock())) { return 1; } else { return c1.getCreateTime().compareTo(c2.getCreateTime()); } }); return comments; } }
|
7. 一致性级别选择
7.1 选择原则
7.1.1 业务需求
根据业务需求选择一致性级别:
- 金融系统:强一致性
- 电商系统:最终一致性
- 社交网络:因果一致性
- 配置中心:强一致性
7.1.2 性能要求
根据性能要求选择一致性级别:
- 高性能:最终一致性
- 中等性能:会话一致性、读写一致性
- 低性能:强一致性
7.1.3 可用性要求
根据可用性要求选择一致性级别:
- 高可用:最终一致性
- 中等可用:会话一致性
- 低可用:强一致性
7.2 混合使用
7.2.1 读写分离
读写分离:写操作强一致性,读操作最终一致性。
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Service public class ProductService { public void updateProduct(Product product) { productMapper.updateById(product); } public Product getProduct(Long productId) { return productMapper.selectByIdFromSlave(productId); } }
|
7.2.2 关键路径强一致
关键路径强一致:关键操作强一致性,非关键操作最终一致性。
1 2 3 4 5 6 7 8 9 10 11
| @Service public class OrderService { public void createOrder(Order order) { orderMapper.insert(order); notificationService.sendOrderNotificationAsync(order); } }
|
8. 总结
8.1 核心要点
- 一致性级别:从强一致性到最终一致性,有多种级别
- 强一致性:使用2PC、3PC、分布式事务、分布式锁、共识算法
- 最终一致性:使用主从复制、消息队列、事件溯源、版本向量
- 其他一致性:读写一致性、单调读一致性、会话一致性、因果一致性
- 选择原则:根据业务需求、性能要求、可用性要求选择
8.2 关键理解
- 强一致性:保证数据立即一致,但性能较低
- 最终一致性:允许暂时不一致,但性能较高
- 混合使用:不同场景使用不同的一致性级别
- 业务驱动:根据业务需求选择一致性级别
8.3 最佳实践
- 金融系统:强一致性(分布式锁、分布式事务)
- 电商系统:最终一致性(消息队列、主从复制)
- 社交网络:因果一致性(向量时钟)
- 配置中心:强一致性(共识算法)
- 混合使用:关键路径强一致,非关键路径最终一致
相关文章: