Episode 499: Distributed System Design
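1. Overview
1.1 Why Distributed Systems Matter
A distributed system is a set of independent computers (nodes) connected by a network that cooperate to accomplish a common task. With the growth of the Internet, distributed systems have become the mainstream architecture for modern software.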
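1.2 Structure of This Article
This article covers distributed system design from the following angles:
- Distributed system fundamentals: core concepts and challenges
- The CAP theorem: the trade-off among consistency, availability, and partition tolerance
- Consistency: strong and eventual consistency models, and the Raft protocol
- Distributed transactions: two-phase commit (2PC), TCC, and Saga
- Service governance: service discovery, load balancing, circuit breaking and degradation, and rate limiting
- High-availability design: fault tolerance, redundancy, and failure recovery
- Distributed locks: Redis-based and ZooKeeper-based implementations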
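2. Distributed System Fundamentals
2.1 Distributed System Concepts
2.1.1 What Is a Distributed System
Definition of a distributed system:
- Multiple independent computer nodes
- Connected by a network
- Cooperating to accomplish a common task
- Transparent to users, appearing as a single system
Characteristics of distributed systems:
- Concurrency: multiple nodes process requests at the same time
- No global clock: each node has its own clock and the clocks drift apart, so nodes cannot agree exactly on the order of events from timestamps alone
- Independent failures: nodes fail independently, so part of the system can fail while the rest keeps running
- Network communication: nodes communicate only by exchanging messages over the network, which can delay or lose them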
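2.2 Challenges of Distributed Systems
2.2.1 Core Challenges
The core challenges are network partitions, node failures, clock synchronization, and data consistency: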
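```java
// Each class names one core challenge; the method bodies are placeholders.

// 1. Network partition: messages between nodes are lost or delayed,
//    splitting the cluster into groups that cannot reach each other.
public class NetworkPartition {
    public void handlePartition() {
        // e.g. detect the partition via timeouts, then either reject requests (CP)
        // or keep serving possibly stale data (AP); see Section 3
    }
}

// 2. Node failure: any node can crash at any time, independently of the others.
public class NodeFailure {
    public void handleFailure() {
        // e.g. detect the failure with heartbeats and fail over to a replica (Section 7)
    }
}

// 3. Clock synchronization: there is no global clock, and local clocks drift.
public class ClockSync {
    public void handleClockSync() {
        // e.g. NTP for wall-clock time; logical clocks when only event order matters
    }
}

// 4. Data consistency: replicas of the same data can diverge.
public class DataConsistency {
    public void handleConsistency() {
        // e.g. choose a consistency model and a protocol to enforce it (Section 4)
    }
}
```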
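Node failures are usually detected with heartbeats and timeouts. Over a network, a crashed node and a very slow one look the same, so the result is only a suspicion. The following is a minimal sketch built on a hypothetical `HeartbeatFailureDetector` class. The millisecond clock is injectable, so the detector can be tested without sleeping:

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Hypothetical timeout-based failure detector: a node is *suspected* (not proven) dead
// when no heartbeat has arrived from it within timeoutMs.
class HeartbeatFailureDetector {
    private final Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();
    private final long timeoutMs;
    private final LongSupplier clock;   // current time in milliseconds; injectable for tests

    HeartbeatFailureDetector(long timeoutMs, LongSupplier clock) {
        this.timeoutMs = timeoutMs;
        this.clock = clock;
    }

    // Called whenever a heartbeat message from nodeId arrives
    void onHeartbeat(String nodeId) {
        lastHeartbeat.put(nodeId, clock.getAsLong());
    }

    // Nodes whose most recent heartbeat is older than the timeout
    Set<String> suspectedNodes() {
        long now = clock.getAsLong();
        Set<String> suspected = new HashSet<>();
        lastHeartbeat.forEach((node, ts) -> {
            if (now - ts > timeoutMs) {
                suspected.add(node);
            }
        });
        return suspected;
    }
}
```

A suspected node may only be slow or cut off by a partition. For that reason, failover logic has to tolerate false suspicions.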
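3. The CAP Theorem
3.1 CAP Theorem Overview
3.1.1 What the CAP Theorem States
The CAP theorem: a distributed system cannot guarantee consistency (C), availability (A), and partition tolerance (P) all at the same time. Network partitions cannot be ruled out in practice, so the real choice is what to give up while a partition lasts: consistency or availability.
The three properties of CAP:
- Consistency: every read sees the most recent write, as if all nodes held the same data at the same time
- Availability: every request to a non-failed node receives a response
- Partition tolerance: the system keeps working even when the network drops or delays messages between nodes
CAP combinations:
- CA: give up partition tolerance; only realistic when there is no network to partition (e.g. a single-node system)
- CP: give up availability during a partition to preserve consistency
- AP: give up consistency during a partition to stay available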
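3.2 Applying CAP
3.2.1 CAP Choices in Different Systems
During a partition, a CP system refuses requests it cannot serve consistently. An AP system keeps serving, possibly with stale data: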
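```java
// CP system: during a partition, refuse requests rather than risk returning inconsistent data
public abstract class CPSystem {
    public String handleRequest(String key) {
        if (isPartitioned()) {
            // Give up availability: fail the request instead of answering
            throw new IllegalStateException("Service unavailable due to partition");
        }
        return readConsistent(key);
    }

    protected abstract boolean isPartitioned();            // placeholder: partition detection
    protected abstract String readConsistent(String key);  // placeholder: read that reflects the latest write
}

// AP system: during a partition, keep answering, even if the data may be stale
public abstract class APSystem {
    public String handleRequest(String key) {
        return readLocal(key);   // always respond from the local replica
    }

    protected abstract String readLocal(String key);       // placeholder: local, possibly stale read
}
```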
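The next example is a toy model of a single replica that makes the trade-off concrete. During a partition, it either refuses reads (CP) or answers from its local copy (AP). The `Replica` class, its `Mode` enum, and the partition flag are hypothetical and exist only for illustration. For simplicity, replicated writes that arrive during a partition are treated as lost:

```java
import java.util.HashMap;
import java.util.Map;

// Toy replica: keeps a local copy of the data and knows whether it can reach its peers
class Replica {
    enum Mode { CP, AP }

    private final Mode mode;
    private final Map<String, String> localData = new HashMap<>();
    private boolean partitioned = false;

    Replica(Mode mode) {
        this.mode = mode;
    }

    void setPartitioned(boolean partitioned) {
        this.partitioned = partitioned;
    }

    // Replication from peers; in this toy model, updates cannot arrive during a partition
    void applyReplicatedWrite(String key, String value) {
        if (!partitioned) {
            localData.put(key, value);
        }
    }

    String read(String key) {
        if (partitioned && mode == Mode.CP) {
            // CP: cannot confirm the local copy is current, so give up availability
            throw new IllegalStateException("Service unavailable due to partition");
        }
        // AP (or no partition): answer from the local copy, which may be stale
        return localData.get(key);
    }
}
```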
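4. Consistency
4.1 Consistency Models
4.1.1 Strong Consistency
Strong consistency: once a write completes, every subsequent read on any node returns that value.
One way to implement it is to serialize writers with a distributed lock and acknowledge a write only after every node has applied it: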
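```java
import org.springframework.beans.factory.annotation.Autowired;

// Strong consistency: writers are serialized, and a write returns only after all nodes have applied it
public class StrongConsistency {
    @Autowired
    private DistributedLock lock;   // hypothetical lock abstraction, e.g. backed by Redis or ZooKeeper (Section 8)

    public void updateData(String key, String value) {
        lock.lock(key);                  // one writer per key at a time
        try {
            updateAllNodes(key, value);  // send the update to every replica
            waitForAllNodes();           // block until every replica acknowledges
        } finally {
            lock.unlock(key);            // always release, even if replication fails
        }
    }

    private void updateAllNodes(String key, String value) { /* placeholder */ }

    private void waitForAllNodes() { /* placeholder */ }
}

interface DistributedLock {
    void lock(String key);
    void unlock(String key);
}
```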
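The following single-process sketch shows the same idea. The write reaches every replica before `write` returns, so a later read from any replica sees it. `SyncReplicatedStore` is a hypothetical class, and a local `ReentrantLock` stands in for the distributed lock. A real system would also have to handle replicas that fail to acknowledge.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

class SyncReplicatedStore {
    private final List<Map<String, String>> replicas = new ArrayList<>();
    private final ReentrantLock lock = new ReentrantLock();  // stands in for a distributed lock

    SyncReplicatedStore(int replicaCount) {
        for (int i = 0; i < replicaCount; i++) {
            replicas.add(new HashMap<>());
        }
    }

    // Returns only after every replica has applied the write
    void write(String key, String value) {
        lock.lock();
        try {
            for (Map<String, String> replica : replicas) {
                replica.put(key, value);
            }
        } finally {
            lock.unlock();
        }
    }

    // Any replica can serve the read, because all of them were updated synchronously
    String readFrom(int replicaIndex, String key) {
        lock.lock();
        try {
            return replicas.get(replicaIndex).get(key);
        } finally {
            lock.unlock();
        }
    }
}
```

The cost shows up directly in the code. A write is only as fast as the slowest replica, and it cannot complete while any replica is unreachable, which is the CP side of the CAP trade-off.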
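4.2 Eventual Consistency
4.2.1 The Eventual Consistency Model
Eventual consistency: replicas may disagree temporarily, but once updates stop arriving, all replicas converge to the same state.
Implementing eventual consistency (a local update plus asynchronous propagation through a message queue):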
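```java
import org.springframework.beans.factory.annotation.Autowired;

// Eventual consistency: commit locally first, then propagate the change asynchronously
public class EventualConsistency {
    @Autowired
    private MessageQueue messageQueue;   // hypothetical MQ client abstraction (Kafka, RocketMQ, ...)

    public void updateData(String key, String value) {
        updateLocalNode(key, value);                      // 1. update the local node and return quickly
        UpdateEvent event = new UpdateEvent(key, value);
        messageQueue.send(event);                         // 2. other nodes consume the event and catch up later
        // If send() fails after the local update, the nodes never converge,
        // so in practice the send has to be made reliable (e.g. retried until it succeeds).
    }

    private void updateLocalNode(String key, String value) { /* placeholder */ }
}

interface MessageQueue {
    void send(UpdateEvent event);
}

class UpdateEvent {
    final String key;
    final String value;

    UpdateEvent(String key, String value) {
        this.key = key;
        this.value = value;
    }
}
```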
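The asynchronous path can be simulated with an in-memory queue. The primary applies the write at once, while the replica applies it only when it drains the queue, so the two disagree for a while. `AsyncReplicatedStore` and its `drain` method are hypothetical, and a `BlockingQueue` plays the role of the message queue:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class AsyncReplicatedStore {
    // An update event carried by the simulated message queue
    static final class UpdateEvent {
        final String key;
        final String value;

        UpdateEvent(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    final Map<String, String> primary = new HashMap<>();
    final Map<String, String> replica = new HashMap<>();
    private final BlockingQueue<UpdateEvent> queue = new LinkedBlockingQueue<>();

    // Write locally, then publish the change; the caller does not wait for the replica
    void updateData(String key, String value) {
        primary.put(key, value);
        queue.offer(new UpdateEvent(key, value));
    }

    // Replica-side consumer: apply queued events in order
    void drain() {
        UpdateEvent event;
        while ((event = queue.poll()) != null) {
            replica.put(event.key, event.value);
        }
    }
}
```

Convergence here relies on events being applied in order and never lost. With a real broker, consumers typically also need idempotency or version checks to cope with redelivered messages.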
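4.3 Consistency Protocols
4.3.1 The Raft Protocol
Raft is a consensus algorithm that keeps a replicated log identical across a cluster of nodes.
Core Raft roles:
- Leader: handles all client requests and replicates log entries to the followers
- Follower: passively accepts log entries from the leader and votes in elections
- Candidate: a follower that hears nothing from a leader within its election timeout becomes a candidate and asks the other nodes for votes
Raft example: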
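```java
public abstract class RaftNode {
    private RaftState state = RaftState.FOLLOWER;  // every node starts as a follower
    private long currentTerm = 0;                  // latest term this node has seen
    private String votedFor;                       // candidate this node voted for in currentTerm
    private String leaderId;                       // known leader, used to redirect clients
    private final String nodeId;

    protected RaftNode(String nodeId) { this.nodeId = nodeId; }

    public void handleRequest(Request request) {
        if (state == RaftState.LEADER) {
            appendToLog(request);                          // 1. append the command to the leader's log
            int acks = 1 + replicateToFollowers(request);  // 2. AppendEntries; the leader counts itself
            if (acks > clusterSize() / 2) {                // 3. committed once stored on a majority
                applyToStateMachine(request);
            }
        } else {
            forwardToLeader(request, leaderId);            // non-leaders redirect clients to the leader
        }
    }

    // Called when the election timeout expires without hearing from a leader
    public void startElection() {
        state = RaftState.CANDIDATE;
        currentTerm++;              // start a new term
        votedFor = nodeId;          // vote for itself
        requestVotes(currentTerm);  // send RequestVote RPCs to all other nodes
    }

    // Placeholders for log storage and RPCs
    protected abstract void appendToLog(Request request);
    protected abstract int replicateToFollowers(Request request);  // followers that acknowledged
    protected abstract void applyToStateMachine(Request request);
    protected abstract void forwardToLeader(Request request, String leaderId);
    protected abstract void requestVotes(long term);
    protected abstract int clusterSize();
}

enum RaftState { FOLLOWER, CANDIDATE, LEADER }

class Request { /* placeholder: a client command */ }
```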
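The part of an election that matters most for safety is how a node decides whether to grant its vote. The following sketch follows the RequestVote rules described for Raft:

- A node grants at most one vote per term.
- It votes only for a candidate whose log is at least as up to date as its own.

`RaftVoter` and its fields are hypothetical names. Persisting `currentTerm` and `votedFor` to stable storage, which Raft requires, is omitted here.

```java
// Hypothetical vote-granting logic of a single Raft node
class RaftVoter {
    private long currentTerm = 0;
    private String votedFor = null;   // candidate voted for in currentTerm, or null
    private final long lastLogTerm;   // term of this node's last log entry
    private final long lastLogIndex;  // index of this node's last log entry

    RaftVoter(long lastLogTerm, long lastLogIndex) {
        this.lastLogTerm = lastLogTerm;
        this.lastLogIndex = lastLogIndex;
    }

    synchronized boolean handleRequestVote(long term, String candidateId,
                                           long candidateLastLogTerm, long candidateLastLogIndex) {
        if (term < currentTerm) {
            return false;             // candidate is from an old term
        }
        if (term > currentTerm) {
            currentTerm = term;       // adopt the newer term and forget any earlier vote
            votedFor = null;          // (a leader or candidate would also step down to follower here)
        }
        // The candidate's log must be at least as up to date as ours
        boolean logOk = candidateLastLogTerm > lastLogTerm
                || (candidateLastLogTerm == lastLogTerm && candidateLastLogIndex >= lastLogIndex);
        if ((votedFor == null || votedFor.equals(candidateId)) && logOk) {
            votedFor = candidateId;
            return true;
        }
        return false;
    }

    synchronized long currentTerm() {
        return currentTerm;
    }

    // A candidate wins once it holds votes from a strict majority of the cluster
    static boolean hasMajority(int votes, int clusterSize) {
        return votes > clusterSize / 2;
    }
}
```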
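5. Distributed Transactions
5.1 Two-Phase Commit (2PC)
5.1.1 The 2PC Flow
Two-phase commit (2PC):
- Prepare phase: the coordinator asks every participant whether it can commit; each participant does the work, holds its locks, and votes yes or no
- Commit phase: if every participant voted yes, the coordinator tells all of them to commit; otherwise it tells all of them to roll back
2PC implementation: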
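```java
import java.util.List;

public class TwoPhaseCommit {
    private final List<Participant> participants;

    public TwoPhaseCommit(List<Participant> participants) {
        this.participants = participants;
    }

    // Returns true if the transaction committed
    public boolean executeTransaction(Transaction transaction) {
        // Phase 1 (prepare): collect votes; a single "no" is enough to abort
        boolean allPrepared = true;
        for (Participant participant : participants) {
            if (!participant.prepare(transaction)) {
                allPrepared = false;
                break;
            }
        }
        // Phase 2 (commit/rollback): send the same decision to every participant.
        // A real coordinator writes the decision to a durable log first, so it can finish after a crash.
        for (Participant participant : participants) {
            if (allPrepared) {
                participant.commit(transaction);
            } else {
                participant.rollback(transaction);   // must be harmless for participants that never prepared
            }
        }
        return allPrepared;
    }
}

interface Participant {
    boolean prepare(Transaction transaction);   // do the work, hold locks, vote yes/no
    void commit(Transaction transaction);
    void rollback(Transaction transaction);
}

class Transaction { /* placeholder: transaction ID and payload */ }
```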
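Problems with 2PC:
- Synchronous blocking: participants hold their locks from the prepare phase until they receive the decision, which blocks other transactions
- Single point of failure: if the coordinator fails after the prepare phase, participants that voted yes cannot decide on their own and stay blocked
- Data inconsistency: if the coordinator or the network fails partway through the commit phase, some participants may commit while others never receive the decision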
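One common way to bound blocking on the coordinator side is a prepare timeout. A participant that does not answer within the timeout is treated as a "no" vote, which the coordinator may do because it has not yet decided. The following minimal sketch uses a hypothetical `Participant` interface and a hypothetical `TimeoutCoordinator` class. It does not help participants that have already voted "yes" and then lose contact with the coordinator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

interface Participant {
    boolean prepare();   // vote: true means "yes, I can commit"
    void commit();
    void rollback();
}

class TimeoutCoordinator {
    private final ExecutorService pool = Executors.newCachedThreadPool();
    private final long prepareTimeoutMs;

    TimeoutCoordinator(long prepareTimeoutMs) {
        this.prepareTimeoutMs = prepareTimeoutMs;
    }

    // Returns true if the transaction committed
    boolean run(List<Participant> participants) {
        // Phase 1: send prepare to every participant in parallel
        List<Future<Boolean>> votes = new ArrayList<>();
        for (Participant p : participants) {
            Callable<Boolean> prepare = p::prepare;
            votes.add(pool.submit(prepare));
        }
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(prepareTimeoutMs);
        boolean commit = true;
        for (Future<Boolean> vote : votes) {
            try {
                long remaining = Math.max(0, deadline - System.nanoTime());
                if (!vote.get(remaining, TimeUnit.NANOSECONDS)) {
                    commit = false;
                }
            } catch (TimeoutException | ExecutionException e) {
                commit = false;   // no answer in time, or prepare threw: count it as "no"
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                commit = false;
            }
            if (!commit) {
                break;            // decision is already "abort"
            }
        }
        // Phase 2: broadcast the decision (a real coordinator logs it durably first)
        for (Participant p : participants) {
            if (commit) {
                p.commit();
            } else {
                p.rollback();
            }
        }
        return commit;
    }

    void shutdown() {
        pool.shutdownNow();
    }
}
```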
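5.2 The TCC Pattern
5.2.1 TCC Transactions
TCC (Try-Confirm-Cancel):
- Try: check business conditions and reserve resources
- Confirm: carry out the business operation using the reserved resources
- Cancel: release the reserved resources
TCC implementation: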
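```java
public class TCCTransaction {
    public void execute(TransactionContext context) {
        boolean reserved;
        try {
            reserved = tryExecute(context);   // Try: check conditions and reserve resources
        } catch (Exception e) {
            reserved = false;
        }
        if (!reserved) {
            cancel(context);                  // Cancel: release whatever Try managed to reserve
            return;
        }
        // Confirm: use the reserved resources. Once every Try has succeeded the transaction
        // must go forward, so a failed Confirm is retried rather than cancelled;
        // Confirm and Cancel therefore have to be idempotent.
        confirm(context);
    }

    private boolean tryExecute(TransactionContext context) { return true; }  // placeholder

    private void confirm(TransactionContext context) { }                     // placeholder

    private void cancel(TransactionContext context) { }                      // placeholder
}

class TransactionContext { /* placeholder: global transaction ID and business data */ }
```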
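A typical TCC participant is an account service:

- Try freezes the funds.
- Confirm deducts the frozen amount.
- Cancel unfreezes the funds.

The in-memory `TccAccount` class below is a hypothetical sketch. A set of finished transaction IDs makes Confirm and Cancel idempotent, which matters because the coordinator may retry them. The same set also stops a late Try from reserving funds after Cancel has already run.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class TccAccount {
    private long available;                                          // spendable balance
    private long frozen;                                             // reserved by Try, not yet confirmed
    private final Map<String, Long> reservations = new HashMap<>();  // txId -> frozen amount
    private final Set<String> finished = new HashSet<>();            // txIds already confirmed or cancelled

    TccAccount(long available) {
        this.available = available;
    }

    // Try: check the balance and reserve the amount
    synchronized boolean tryDebit(String txId, long amount) {
        if (finished.contains(txId)) {
            return false;                 // late Try after Confirm/Cancel: do not reserve again
        }
        if (available < amount) {
            return false;
        }
        available -= amount;
        frozen += amount;
        reservations.put(txId, amount);
        return true;
    }

    // Confirm: the reserved money is actually spent
    synchronized void confirm(String txId) {
        if (!finished.add(txId)) {
            return;                       // idempotent: already handled
        }
        Long amount = reservations.remove(txId);
        if (amount != null) {
            frozen -= amount;
        }
    }

    // Cancel: give the reserved money back
    synchronized void cancel(String txId) {
        if (!finished.add(txId)) {
            return;                       // idempotent: already handled
        }
        Long amount = reservations.remove(txId);
        if (amount != null) {             // nothing reserved: cancelling is a no-op
            frozen -= amount;
            available += amount;
        }
    }

    synchronized long available() { return available; }

    synchronized long frozen() { return frozen; }
}
```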
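5.3 The Saga Pattern
5.3.1 Saga Transactions
Saga pattern: a long-running transaction is split into a sequence of local transactions, each paired with a compensating action. If a step fails, the compensations of the steps that already completed run in reverse order.
Saga implementation: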
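```java
import java.util.ArrayList;
import java.util.List;

public class SagaTransaction {
    private final List<SagaStep> steps;

    public SagaTransaction(List<SagaStep> steps) {
        this.steps = steps;
    }

    public void execute() {
        List<SagaStep> executedSteps = new ArrayList<>();
        try {
            for (SagaStep step : steps) {
                step.execute();            // each step is a local transaction that commits on its own
                executedSteps.add(step);
            }
        } catch (Exception e) {
            // Undo the completed steps in reverse order using their compensating actions
            for (int i = executedSteps.size() - 1; i >= 0; i--) {
                executedSteps.get(i).compensate();
            }
            throw new IllegalStateException("Saga failed and was compensated", e);  // tell the caller
        }
    }
}

interface SagaStep {
    void execute();      // forward local transaction
    void compensate();   // semantic undo of execute(); should be idempotent so it can be retried
}
```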
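The next example is a runnable version of the same flow. It uses lambdas for the steps of a hypothetical order saga: create the order, reserve stock, then charge the payment. When the payment step fails, the completed steps are compensated newest first. The `Saga` class, its `Step` type, and the step names are illustrative only.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

class Saga {
    static final class Step {
        final String name;
        final Runnable action;        // forward local transaction
        final Runnable compensation;  // undoes action

        Step(String name, Runnable action, Runnable compensation) {
            this.name = name;
            this.action = action;
            this.compensation = compensation;
        }
    }

    // Runs the steps in order; on failure, compensates completed steps newest first and returns false
    static boolean run(List<Step> steps) {
        Deque<Step> completed = new ArrayDeque<>();
        for (Step step : steps) {
            try {
                step.action.run();
                completed.push(step);                     // most recent step ends up on top
            } catch (RuntimeException e) {
                while (!completed.isEmpty()) {
                    completed.pop().compensation.run();   // undo in reverse order
                }
                return false;
            }
        }
        return true;
    }
}
```

Compensations can fail too. Saga frameworks typically persist the saga's progress and retry compensations, which this sketch leaves out.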
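6. Service Governance
6.1 Service Discovery
6.1.1 Service Registration and Discovery
Service discovery: service providers register their instances with a registry, and service consumers query the registry to find them.
Service discovery implementation (Spring Cloud):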
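```java
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.stereotype.Service;

// Spring Cloud's DiscoveryClient is read-only: it has getInstances() but no register().
// Instances are normally registered automatically at startup by the registry client
// (Eureka, Nacos, Consul, ...).
@Service
public class ServiceRegistry {
    @Autowired
    private DiscoveryClient discoveryClient;

    public List<ServiceInstance> discover(String serviceName) {
        return discoveryClient.getInstances(serviceName);   // all registered instances of the service
    }
}

@Service
public class ServiceDiscovery {
    @Autowired
    private ServiceRegistry serviceRegistry;
    @Autowired
    private LoadBalancer loadBalancer;   // application-defined strategy, assumed to expose choose(List)

    public ServiceInstance getInstance(String serviceName) {
        List<ServiceInstance> instances = serviceRegistry.discover(serviceName);
        if (instances.isEmpty()) {
            throw new IllegalStateException("No instances of " + serviceName);
        }
        return loadBalancer.choose(instances);   // pick one instance for this call
    }
}
```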
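Registries such as Eureka keep an instance listed only while it keeps renewing its lease by sending heartbeats. The following in-memory sketch shows that idea. The `LeaseRegistry` class, its TTL, the injectable clock, and the example addresses are assumptions for illustration, not any product's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

class LeaseRegistry {
    // serviceName -> (instance address -> time of last registration/renewal, in ms)
    private final Map<String, Map<String, Long>> services = new ConcurrentHashMap<>();
    private final long ttlMs;
    private final LongSupplier clock;

    LeaseRegistry(long ttlMs, LongSupplier clock) {
        this.ttlMs = ttlMs;
        this.clock = clock;
    }

    // Provider side: call at startup, then periodically as a heartbeat to renew the lease
    void register(String serviceName, String address) {
        services.computeIfAbsent(serviceName, name -> new ConcurrentHashMap<>())
                .put(address, clock.getAsLong());
    }

    // Provider side: graceful shutdown
    void deregister(String serviceName, String address) {
        Map<String, Long> instances = services.get(serviceName);
        if (instances != null) {
            instances.remove(address);
        }
    }

    // Consumer side: only instances whose lease has not expired
    List<String> discover(String serviceName) {
        long now = clock.getAsLong();
        List<String> alive = new ArrayList<>();
        services.getOrDefault(serviceName, Map.of()).forEach((address, renewedAt) -> {
            if (now - renewedAt <= ttlMs) {
                alive.add(address);
            }
        });
        return alive;
    }
}
```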
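6.2 Load Balancing
6.2.1 Load Balancing Strategies
Common load balancing strategies are round robin, random, weighted random, and least connections: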
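```java
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

// ServiceInstance here is an application-defined type with getWeight() and
// getActiveConnections() (assumed), not Spring Cloud's ServiceInstance.
public class LoadBalancer {
    private final AtomicInteger counter = new AtomicInteger();

    // Round robin: each call takes the next instance in turn
    public ServiceInstance roundRobin(List<ServiceInstance> instances) {
        int index = Math.floorMod(counter.getAndIncrement(), instances.size());  // stays non-negative after overflow
        return instances.get(index);
    }

    // Random: every instance is equally likely
    public ServiceInstance random(List<ServiceInstance> instances) {
        return instances.get(ThreadLocalRandom.current().nextInt(instances.size()));
    }

    // Weighted random: an instance is chosen with probability weight / totalWeight
    public ServiceInstance weightedRandom(List<ServiceInstance> instances) {
        int totalWeight = instances.stream()
                .mapToInt(ServiceInstance::getWeight)
                .sum();
        int random = ThreadLocalRandom.current().nextInt(totalWeight);
        int current = 0;
        for (ServiceInstance instance : instances) {
            current += instance.getWeight();
            if (random < current) {
                return instance;
            }
        }
        return instances.get(0);  // not reached when all weights are positive
    }

    // Least connections: the instance currently handling the fewest requests
    public ServiceInstance leastConnection(List<ServiceInstance> instances) {
        return instances.stream()
                .min(Comparator.comparingInt(ServiceInstance::getActiveConnections))
                .orElseThrow(() -> new IllegalStateException("No instances available"));
    }
}
```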
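The weighted strategy above picks an instance at random in proportion to its weight. A deterministic weighted round robin can use the "smooth" algorithm from Nginx's upstream balancing, which spreads a heavy server's turns out instead of giving it a run of consecutive requests. The following self-contained sketch works over plain server names, and `SmoothWeightedRoundRobin` is a hypothetical class name:

```java
import java.util.LinkedHashMap;
import java.util.Map;

class SmoothWeightedRoundRobin {
    private final Map<String, Integer> weights;                                  // server -> configured weight
    private final Map<String, Integer> currentWeights = new LinkedHashMap<>();   // server -> running score
    private final int totalWeight;

    SmoothWeightedRoundRobin(Map<String, Integer> weights) {
        this.weights = new LinkedHashMap<>(weights);
        this.totalWeight = this.weights.values().stream().mapToInt(Integer::intValue).sum();
        this.weights.keySet().forEach(server -> currentWeights.put(server, 0));
    }

    synchronized String next() {
        String best = null;
        for (Map.Entry<String, Integer> e : weights.entrySet()) {
            String server = e.getKey();
            int score = currentWeights.get(server) + e.getValue();  // 1. every server gains its weight
            currentWeights.put(server, score);
            if (best == null || score > currentWeights.get(best)) {
                best = server;                                       // 2. remember the highest score
            }
        }
        currentWeights.put(best, currentWeights.get(best) - totalWeight);  // 3. the winner pays the total
        return best;
    }
}
```

With weights 5:1:1 for `a`, `b`, and `c`, seven consecutive picks yield `a a b a c a a`. That is five picks of `a` and one each of `b` and `c`, interleaved rather than bunched.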
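6.3 Circuit Breaking and Degradation
6.3.1 The Circuit Breaker Pattern
Circuit breaker pattern: stop calling a dependency that keeps failing and fail fast instead, so that one failing service does not trigger a cascade of failures (a "service avalanche").
Circuit breaker implementation (CLOSED → OPEN after repeated failures, OPEN → HALF_OPEN after a timeout, HALF_OPEN → CLOSED after a successful trial call):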
```java
import java.util.function.Supplier;

public class CircuitBreaker {
    private static final int FAILURE_THRESHOLD = 5;   // consecutive failures before opening
    private static final long TIMEOUT = 60000;        // ms to stay open before allowing a trial call

    private CircuitState state = CircuitState.CLOSED;
    private int failureCount = 0;
    private long lastFailureTime = 0;

    public <T> T execute(Supplier<T> supplier) {
        beforeCall();
        try {
            T result = supplier.get();
            onSuccess();
            return result;
        } catch (RuntimeException e) {
            onFailure();
            throw e;
        }
    }

    // State changes are synchronized because many threads share one breaker.
    // This simple version does not limit how many requests pass while HALF_OPEN.
    private synchronized void beforeCall() {
        if (state == CircuitState.OPEN) {
            if (System.currentTimeMillis() - lastFailureTime > TIMEOUT) {
                state = CircuitState.HALF_OPEN;   // let a trial request through
            } else {
                throw new CircuitBreakerOpenException("Circuit breaker is open");  // fail fast
            }
        }
    }

    private synchronized void onSuccess() {
        failureCount = 0;
        state = CircuitState.CLOSED;              // a successful call (or trial) closes the breaker
    }

    private synchronized void onFailure() {
        failureCount++;
        lastFailureTime = System.currentTimeMillis();
        if (state == CircuitState.HALF_OPEN || failureCount >= FAILURE_THRESHOLD) {
            state = CircuitState.OPEN;            // trial failed, or too many failures: (re)open
        }
    }
}

enum CircuitState { CLOSED, OPEN, HALF_OPEN }

class CircuitBreakerOpenException extends RuntimeException {
    CircuitBreakerOpenException(String message) {
        super(message);
    }
}
```
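Degradation is the other half of this section. While the breaker is open, the caller returns a fallback (a cached value, a default, or a "please try again later" message) instead of an error. The following compact, self-contained sketch takes a fallback supplier and an injectable clock so it can be tested. `FallbackCircuitBreaker` and its parameters are hypothetical.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

class FallbackCircuitBreaker {
    private final int failureThreshold;
    private final long openTimeoutMs;
    private final LongSupplier clock;   // milliseconds; injectable for testing
    private int failures = 0;
    private boolean open = false;
    private long openedAt = 0;

    FallbackCircuitBreaker(int failureThreshold, long openTimeoutMs, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.openTimeoutMs = openTimeoutMs;
        this.clock = clock;
    }

    // synchronized for simplicity; this serializes all calls, which a production breaker would avoid
    synchronized <T> T call(Supplier<T> action, Supplier<T> fallback) {
        if (open && clock.getAsLong() - openedAt < openTimeoutMs) {
            return fallback.get();   // open: skip the failing dependency entirely
        }
        // Closed, or open long enough that one trial call is allowed (half-open)
        try {
            T result = action.get();
            failures = 0;
            open = false;            // success closes the breaker
            return result;
        } catch (RuntimeException e) {
            failures++;
            if (open || failures >= failureThreshold) {   // trial failed, or too many failures
                open = true;
                openedAt = clock.getAsLong();
            }
            return fallback.get();   // degrade instead of propagating the error
        }
    }

    synchronized boolean isOpen() {
        return open;
    }
}
```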
6.4 Rate Limiting
6.4.1 Rate Limiting Algorithms
Two rate limiting algorithms, the token bucket and the leaky bucket:
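```java
// Token bucket: tokens accumulate at refillRate per second up to capacity;
// each request takes one token, so bursts of up to capacity are allowed.
public class TokenBucket {
    private final int capacity;
    private final int refillRate;     // tokens per second
    private int tokens;
    private long lastRefillTime;

    public TokenBucket(int capacity, int refillRate) {
        this.capacity = capacity;
        this.refillRate = refillRate;
        this.tokens = capacity;       // start full
        this.lastRefillTime = System.currentTimeMillis();
    }

    public synchronized boolean tryAcquire() {
        refill();
        if (tokens > 0) {
            tokens--;
            return true;
        }
        return false;
    }

    private void refill() {
        long now = System.currentTimeMillis();
        long tokensToAdd = (now - lastRefillTime) * refillRate / 1000;
        // Advance the timestamp only once whole tokens were earned; resetting it on every
        // call would discard partial progress, and frequent callers would never get a refill.
        if (tokensToAdd > 0) {
            tokens = (int) Math.min(capacity, tokens + tokensToAdd);
            lastRefillTime = now;
        }
    }
}

// Leaky bucket ("meter" variant): each request adds one unit of water, the bucket drains
// at leakRate per second, and requests are rejected while the bucket is full.
public class LeakyBucket {
    private final int capacity;
    private final int leakRate;       // units drained per second
    private int water;
    private long lastLeakTime;

    public LeakyBucket(int capacity, int leakRate) {
        this.capacity = capacity;
        this.leakRate = leakRate;
        this.lastLeakTime = System.currentTimeMillis();
    }

    public synchronized boolean tryAcquire() {
        leak();
        if (water < capacity) {
            water++;
            return true;
        }
        return false;
    }

    private void leak() {
        long now = System.currentTimeMillis();
        long waterToLeak = (now - lastLeakTime) * leakRate / 1000;
        if (waterToLeak > 0) {        // same reasoning as refill() above
            water = (int) Math.max(0, water - waterToLeak);
            lastLeakTime = now;
        }
    }
}
```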
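The next variant makes the token bucket's behavior deterministic: a burst of up to `capacity`, then a steady `refillPerSecond`. It takes the current time as a parameter. It also advances its timestamp only by the time actually converted into tokens, so fractional progress carries over between frequent calls. `ClockedTokenBucket` is a hypothetical name.

```java
class ClockedTokenBucket {
    private final long capacity;
    private final long refillPerSecond;
    private long tokens;
    private long lastRefillMs;

    ClockedTokenBucket(long capacity, long refillPerSecond, long nowMs) {
        this.capacity = capacity;
        this.refillPerSecond = refillPerSecond;
        this.tokens = capacity;      // start full, so an initial burst is allowed
        this.lastRefillMs = nowMs;
    }

    synchronized boolean tryAcquire(long nowMs) {
        long earned = (nowMs - lastRefillMs) * refillPerSecond / 1000;
        if (earned > 0) {
            if (tokens + earned >= capacity) {
                tokens = capacity;
                lastRefillMs = nowMs;                              // bucket full: leftover time is discarded
            } else {
                tokens += earned;
                lastRefillMs += earned * 1000 / refillPerSecond;   // keep the unused remainder
            }
        }
        if (tokens > 0) {
            tokens--;
            return true;
        }
        return false;
    }
}
```

Consider a bucket polled every 50 ms at 10 tokens/s. If it reset its timestamp on every call, including calls that earned no whole token, it would never refill. Carrying the remainder forward avoids that.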
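7. High-Availability Design
7.1 Fault-Tolerant Design
7.1.1 Fault Isolation
Fault isolation: confine a failure to the component where it occurs, so that it does not spread to the rest of the system.
Fault isolation implementation (thread-pool isolation and semaphore isolation):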
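```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Thread-pool isolation: each downstream service gets its own bounded pool, so a slow
// user service cannot use up the threads needed to call the order service.
public class ThreadPoolIsolation {
    private final ExecutorService userServicePool = newBoundedPool(10, 100);
    private final ExecutorService orderServicePool = newBoundedPool(10, 100);

    public Future<?> callUserService(Runnable task) {
        return userServicePool.submit(task);   // RejectedExecutionException once threads and queue are full
    }

    public Future<?> callOrderService(Runnable task) {
        return orderServicePool.submit(task);
    }

    // Executors.newFixedThreadPool uses an unbounded queue; a bounded queue makes overload visible
    private static ExecutorService newBoundedPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity));
    }
}

// Semaphore isolation: limit concurrent calls per service, running on the caller's own thread
public class SemaphoreIsolation {
    private final Semaphore userServiceSemaphore = new Semaphore(10);
    private final Semaphore orderServiceSemaphore = new Semaphore(10);

    public void callUserService(Runnable task) throws InterruptedException {
        callWithPermit(userServiceSemaphore, task);
    }

    public void callOrderService(Runnable task) throws InterruptedException {
        callWithPermit(orderServiceSemaphore, task);
    }

    private static void callWithPermit(Semaphore semaphore, Runnable task) throws InterruptedException {
        semaphore.acquire();   // waits while 10 calls are already in flight
        try {
            task.run();
        } finally {
            semaphore.release();
        }
    }
}
```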
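`Semaphore.acquire()` makes callers wait once a dependency's quota is used up. A bulkhead is often configured to reject immediately instead, so the caller can degrade right away. The following minimal sketch uses `tryAcquire()`, and the `Bulkhead` class and its fallback parameter are hypothetical:

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

class Bulkhead {
    private final Semaphore permits;

    Bulkhead(int maxConcurrentCalls) {
        this.permits = new Semaphore(maxConcurrentCalls);
    }

    <T> T call(Supplier<T> task, Supplier<T> fallback) {
        if (!permits.tryAcquire()) {
            return fallback.get();   // quota for this dependency exhausted: fail fast
        }
        try {
            return task.get();
        } finally {
            permits.release();
        }
    }
}
```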
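7.2 Redundancy Design
7.2.1 Multiple Replicas
Redundancy: keep multiple replicas so that the service survives the loss of any single node.
Multi-replica implementation (a primary with asynchronous replication and failover):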
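```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class Replication {
    private volatile ServiceInstance primaryNode;   // handles all writes
    private final List<ServiceInstance> replicas;   // receive copies of every write

    public Replication(ServiceInstance primaryNode, List<ServiceInstance> replicas) {
        this.primaryNode = primaryNode;
        this.replicas = replicas;
    }

    public Response execute(Request request) {
        Response response = primaryNode.execute(request);
        asyncReplicate(request, response);   // respond without waiting for the replicas
        return response;
    }

    // Writes acknowledged here but not yet replicated are lost if the primary fails
    private void asyncReplicate(Request request, Response response) {
        for (ServiceInstance replica : replicas) {
            CompletableFuture.runAsync(() -> replica.replicate(request, response));
        }
    }

    // Failover: promote the first healthy replica to primary
    public ServiceInstance failover() {
        primaryNode = replicas.stream()
                .filter(ServiceInstance::isHealthy)
                .findFirst()
                .orElseThrow(NoAvailableInstanceException::new);
        return primaryNode;
    }
}

// Hypothetical node abstraction; Request and Response are placeholder types
interface ServiceInstance {
    Response execute(Request request);
    void replicate(Request request, Response response);
    boolean isHealthy();
}

class Request { }
class Response { }
class NoAvailableInstanceException extends RuntimeException { }
```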
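7.3 Failure Recovery
7.3.1 Automatic Recovery
Failure recovery: automatically detect failed instances and bring them back.
Failure recovery implementation: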
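```java
import java.util.List;
import org.springframework.scheduling.annotation.Scheduled;

// Periodic health check with automatic recovery (@Scheduled requires @EnableScheduling in Spring).
// ServiceInstance here is a hypothetical type with isHealthy(), markUnhealthy(), restart(), markHealthy().
public abstract class FailureRecovery {
    @Scheduled(fixedRate = 5000)   // run every 5 seconds
    public void healthCheck() {
        for (ServiceInstance instance : getAllInstances()) {
            if (!instance.isHealthy()) {
                instance.markUnhealthy();   // stop routing traffic to it first
                tryRecover(instance);
            }
        }
    }

    private void tryRecover(ServiceInstance instance) {
        instance.restart();            // e.g. restart the process or container
        waitForRecovery(instance);     // give it time to start before checking again
        if (instance.isHealthy()) {
            instance.markHealthy();    // put it back into rotation
        }
    }

    protected abstract List<ServiceInstance> getAllInstances();          // placeholder: e.g. from the registry
    protected abstract void waitForRecovery(ServiceInstance instance);   // placeholder
}
```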
8. Distributed Locks
8.1 Implementing Distributed Locks
8.1.1 Redis-Based Distributed Lock
Redis distributed lock:
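```java
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;

@Component
public class RedisDistributedLock {
    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    // SET key value NX PX expireTime: only one client can create the key, and the expiry
    // frees the lock if the holder crashes. value should be unique per caller (e.g. a UUID).
    public boolean tryLock(String key, String value, long expireTime) {
        Boolean result = redisTemplate.opsForValue()
                .setIfAbsent(key, value, expireTime, TimeUnit.MILLISECONDS);
        return Boolean.TRUE.equals(result);
    }

    // Compare and delete in one Lua script (executed atomically by Redis), so a client only
    // deletes its own lock, never one another client acquired after this one expired.
    public boolean unlock(String key, String value) {
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then "
                + "return redis.call('del', KEYS[1]) "
                + "else return 0 end";
        Long result = redisTemplate.execute(
                new DefaultRedisScript<>(script, Long.class),
                Collections.singletonList(key),
                value);
        return result != null && result == 1L;
    }
}
```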
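Why does `unlock` compare the value before deleting? The in-memory simulation below mimics `SET key value NX PX ttl` and the compare-and-delete script in the following scenario:

1. Client A's lock expires while A is still working.
2. Client B acquires the lock.
3. A's late unlock must not delete B's lock.

`FakeRedisLock` is a hypothetical stand-in, not a Redis client.

```java
import java.util.HashMap;
import java.util.Map;

class FakeRedisLock {
    private static final class Entry {
        final String value;
        final long expiresAt;

        Entry(String value, long expiresAt) {
            this.value = value;
            this.expiresAt = expiresAt;
        }
    }

    private final Map<String, Entry> store = new HashMap<>();

    // Like SET key value NX PX ttlMs: succeeds only if the key is absent or expired
    synchronized boolean tryLock(String key, String value, long ttlMs, long nowMs) {
        Entry e = store.get(key);
        if (e != null && e.expiresAt > nowMs) {
            return false;
        }
        store.put(key, new Entry(value, nowMs + ttlMs));
        return true;
    }

    // Like the Lua script: delete only if the stored value is still the caller's
    synchronized boolean unlock(String key, String value, long nowMs) {
        Entry e = store.get(key);
        if (e == null || e.expiresAt <= nowMs || !e.value.equals(value)) {
            return false;
        }
        store.remove(key);
        return true;
    }
}
```

The simulation also shows the risk that remains: A kept working after its lock expired. The TTL therefore has to comfortably exceed the critical section, or the lock has to be renewed while it is held.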
8.2 ZooKeeper-Based Distributed Lock
8.2.1 ZooKeeper Distributed Lock
ZooKeeper distributed lock (using ephemeral sequential nodes):
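```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

// Follows the lock recipe from the ZooKeeper documentation; Apache Curator's
// InterProcessMutex is a production-ready implementation of the same idea.
public class ZooKeeperDistributedLock {
    private final ZooKeeper zookeeper;
    private String nodePath;   // the sequential node created by this client

    public ZooKeeperDistributedLock(ZooKeeper zookeeper) {
        this.zookeeper = zookeeper;
    }

    // Blocks until the lock is held. Assumes the parent node /locks/<lockKey> already exists.
    public void lock(String lockKey) throws Exception {
        String lockPath = "/locks/" + lockKey;
        // EPHEMERAL: deleted automatically if this client's session ends, so a crashed holder frees the lock.
        // SEQUENTIAL: ZooKeeper appends an increasing counter that defines the waiting order.
        nodePath = zookeeper.create(lockPath + "/lock-", new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        String currentNode = nodePath.substring(lockPath.length() + 1);

        while (true) {
            List<String> children = zookeeper.getChildren(lockPath, false);
            Collections.sort(children);
            int index = children.indexOf(currentNode);
            if (index == 0) {
                return;   // lowest sequence number: this client holds the lock
            }
            // Watch only the immediate predecessor, so a release wakes one waiter, not all of them
            String prevNode = lockPath + "/" + children.get(index - 1);
            CountDownLatch latch = new CountDownLatch(1);
            Stat stat = zookeeper.exists(prevNode, event -> {
                if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                    latch.countDown();
                }
            });
            if (stat != null) {
                latch.await();   // predecessor still exists: wait for it to be deleted
            }
            // Loop and re-check: the predecessor may have vanished because its session expired
            // while a node ahead of it still holds the lock
        }
    }

    public void unlock() throws Exception {
        zookeeper.delete(nodePath, -1);   // delete only this client's own node
    }
}
```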
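9. Summary
9.1 Key Points
- CAP theorem: consistency, availability, and partition tolerance cannot all be guaranteed at once; during a partition, a system must choose between consistency and availability
- Consistency: strong consistency and eventual consistency
- Distributed transactions: 2PC, TCC, Saga, and similar approaches
- Service governance: service discovery, load balancing, circuit breaking and degradation
- High availability: fault tolerance, redundancy, failure recovery
9.2 Key Takeaways
- Trade-offs: choose the CAP combination that fits the business scenario
- Eventual consistency: most scenarios can tolerate eventual consistency
- Failure handling: build thorough failure detection and recovery mechanisms
- Service governance: service discovery, load balancing, and circuit breaking/degradation are the foundation
9.3 Best Practices
- Choose the consistency model that the business requirements call for
- Comprehensive monitoring: monitor system state in real time
- Failure drills: rehearse failures regularly
- Incremental evolution: start simple and add complexity step by step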