分布式系统设计

1. 概述

1.1 分布式系统的重要性

分布式系统是由多个独立的计算机节点通过网络连接,协同工作完成共同任务的系统。随着互联网的发展,分布式系统已成为现代软件架构的主流。

本文内容

  • 分布式系统基础:核心概念和挑战
  • CAP定理:一致性、可用性、分区容错性
  • 一致性:强一致性、最终一致性
  • 分布式事务:两阶段提交、TCC、Saga
  • 服务治理:服务发现、负载均衡、熔断降级
  • 高可用设计:容错、冗余、故障恢复

1.2 本文内容结构

本文将从以下几个方面深入探讨分布式系统设计:

  1. 分布式系统基础:核心概念和挑战
  2. CAP定理:分布式系统的权衡
  3. 一致性:一致性模型和实现
  4. 分布式事务:事务处理方案
  5. 服务治理:服务发现和治理
  6. 高可用设计:高可用架构设计

2. 分布式系统基础

2.1 分布式系统概念

2.1.1 什么是分布式系统

分布式系统定义

  • 多个独立的计算机节点
  • 通过网络连接
  • 协同工作完成共同任务
  • 对用户透明,表现为单一系统

分布式系统特点

  1. 并发性:多个节点同时处理请求
  2. 无全局时钟:节点间时钟不同步
  3. 故障独立性:节点故障相互独立
  4. 网络通信:节点间通过网络通信

2.2 分布式系统挑战

2.2.1 核心挑战

分布式系统挑战

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 1. 网络分区
// 节点间网络断开,导致通信失败
public class NetworkPartition {
public void handlePartition() {
// 网络分区时,系统需要做出选择:
// - 保证一致性(停止服务)
// - 保证可用性(继续服务,可能数据不一致)
}
}

// 2. 节点故障
// 节点可能随时故障
public class NodeFailure {
public void handleFailure() {
// 需要容错机制:
// - 故障检测
// - 故障恢复
// - 数据备份
}
}

// 3. 时钟不同步
// 节点间时钟不一致
public class ClockSync {
public void handleClockSync() {
// 使用逻辑时钟或向量时钟
// 而不是依赖物理时钟
}
}

// 4. 数据一致性
// 多个节点数据可能不一致
public class DataConsistency {
public void handleConsistency() {
// 需要一致性协议:
// - 强一致性
// - 最终一致性
}
}

3. CAP定理

3.1 CAP定理概述

3.1.1 CAP定理内容

CAP定理:在分布式系统中,一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)三者不能同时满足。

CAP三要素

  1. 一致性(Consistency):所有节点同时看到相同的数据
  2. 可用性(Availability):系统持续可用,响应请求
  3. 分区容错性(Partition tolerance):系统在网络分区时仍能工作

CAP组合

  • CA:放弃分区容错性(单机系统)
  • CP:放弃可用性(保证一致性)
  • AP:放弃一致性(保证可用性)

3.2 CAP应用

3.2.1 不同系统的CAP选择

不同系统的CAP选择

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// CP系统:保证一致性和分区容错性
// 例如:ZooKeeper、etcd
public class CPSystem {
// 网络分区时,停止服务,保证一致性
public void handleRequest() {
if (isPartitioned()) {
return Error("Service unavailable due to partition");
}
// 处理请求,保证一致性
}
}

// AP系统:保证可用性和分区容错性
// 例如:Cassandra、DynamoDB
public class APSystem {
// 网络分区时,继续服务,可能数据不一致
public void handleRequest() {
// 处理请求,优先保证可用性
// 数据最终会一致(最终一致性)
}
}

4. 一致性

4.1 一致性模型

4.1.1 强一致性

强一致性:所有节点同时看到相同的数据。

强一致性实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 使用分布式锁保证强一致性
public class StrongConsistency {

@Autowired
private DistributedLock lock;

public void updateData(String key, String value) {
lock.lock(key);
try {
// 更新所有节点
updateAllNodes(key, value);
// 等待所有节点确认
waitForAllNodes();
} finally {
lock.unlock(key);
}
}
}

4.2 最终一致性

4.2.1 最终一致性模型

最终一致性:系统最终会达到一致状态,但允许短暂不一致。

最终一致性实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 使用消息队列实现最终一致性
public class EventualConsistency {

@Autowired
private MessageQueue messageQueue;

public void updateData(String key, String value) {
// 1. 更新本地节点
updateLocalNode(key, value);

// 2. 发送更新消息到其他节点
UpdateEvent event = new UpdateEvent(key, value);
messageQueue.send(event);

// 3. 其他节点异步更新(最终一致)
}
}

4.3 一致性协议

4.3.1 Raft协议

Raft协议:分布式一致性算法。

Raft核心概念

  • Leader:领导者,处理所有请求
  • Follower:跟随者,接收Leader的日志
  • Candidate:候选者,选举Leader

Raft示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class RaftNode {

private RaftState state = RaftState.FOLLOWER;
private int term = 0;
private String leaderId;

public void handleRequest(Request request) {
if (state == RaftState.LEADER) {
// Leader处理请求
processRequest(request);
// 复制到Follower
replicateToFollowers(request);
} else {
// 转发到Leader
forwardToLeader(request);
}
}

public void startElection() {
state = RaftState.CANDIDATE;
term++;
// 发起选举
requestVote();
}
}

5. 分布式事务

5.1 两阶段提交(2PC)

5.1.1 2PC流程

两阶段提交(2PC)

  1. 准备阶段:协调者询问所有参与者是否可以提交
  2. 提交阶段:根据参与者响应,决定提交或回滚

2PC实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class TwoPhaseCommit {

private List<Participant> participants;
private Coordinator coordinator;

public void executeTransaction(Transaction transaction) {
// 阶段1:准备阶段
List<Boolean> prepareResults = new ArrayList<>();
for (Participant participant : participants) {
boolean prepared = participant.prepare(transaction);
prepareResults.add(prepared);
}

// 阶段2:提交阶段
if (prepareResults.stream().allMatch(r -> r)) {
// 所有参与者准备成功,提交
for (Participant participant : participants) {
participant.commit(transaction);
}
} else {
// 有参与者准备失败,回滚
for (Participant participant : participants) {
participant.rollback(transaction);
}
}
}
}

2PC问题

  • 同步阻塞:参与者需要等待
  • 单点故障:协调者故障影响整个事务
  • 数据不一致:网络故障可能导致数据不一致

5.2 TCC模式

5.2.1 TCC事务

TCC(Try-Confirm-Cancel)

  1. Try:尝试执行,预留资源
  2. Confirm:确认执行,提交资源
  3. Cancel:取消执行,释放资源

TCC实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class TCCTransaction {

public void execute(TransactionContext context) {
try {
// Try阶段:尝试执行
boolean tryResult = tryExecute(context);
if (!tryResult) {
cancel(context);
return;
}

// Confirm阶段:确认执行
confirm(context);
} catch (Exception e) {
// Cancel阶段:取消执行
cancel(context);
}
}

private boolean tryExecute(TransactionContext context) {
// 尝试执行,预留资源
return true;
}

private void confirm(TransactionContext context) {
// 确认执行,提交资源
}

private void cancel(TransactionContext context) {
// 取消执行,释放资源
}
}

5.3 Saga模式

5.3.1 Saga事务

Saga模式:长事务分解为多个本地事务,每个本地事务有补偿操作。

Saga实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class SagaTransaction {

private List<SagaStep> steps;

public void execute() {
List<SagaStep> executedSteps = new ArrayList<>();

try {
for (SagaStep step : steps) {
step.execute();
executedSteps.add(step);
}
} catch (Exception e) {
// 补偿已执行的步骤
for (int i = executedSteps.size() - 1; i >= 0; i--) {
executedSteps.get(i).compensate();
}
}
}
}

class SagaStep {
public void execute() {
// 执行本地事务
}

public void compensate() {
// 补偿操作
}
}

6. 服务治理

6.1 服务发现

6.1.1 服务注册与发现

服务发现:服务提供者注册服务,服务消费者发现服务。

服务发现实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 服务注册
@Service
public class ServiceRegistry {

@Autowired
private DiscoveryClient discoveryClient;

public void register(ServiceInstance instance) {
// 注册服务实例
discoveryClient.register(instance);
}

public List<ServiceInstance> discover(String serviceName) {
// 发现服务实例
return discoveryClient.getInstances(serviceName);
}
}

// 服务发现
@Service
public class ServiceDiscovery {

@Autowired
private LoadBalancer loadBalancer;

public ServiceInstance getInstance(String serviceName) {
List<ServiceInstance> instances = serviceRegistry.discover(serviceName);
return loadBalancer.choose(instances);
}
}

6.2 负载均衡

6.2.1 负载均衡策略

负载均衡策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class LoadBalancer {

// 轮询
public ServiceInstance roundRobin(List<ServiceInstance> instances) {
int index = (int) (System.currentTimeMillis() % instances.size());
return instances.get(index);
}

// 随机
public ServiceInstance random(List<ServiceInstance> instances) {
int index = new Random().nextInt(instances.size());
return instances.get(index);
}

// 加权轮询
public ServiceInstance weightedRoundRobin(List<ServiceInstance> instances) {
int totalWeight = instances.stream()
.mapToInt(ServiceInstance::getWeight)
.sum();
int random = new Random().nextInt(totalWeight);

int current = 0;
for (ServiceInstance instance : instances) {
current += instance.getWeight();
if (random < current) {
return instance;
}
}
return instances.get(0);
}

// 最少连接
public ServiceInstance leastConnection(List<ServiceInstance> instances) {
return instances.stream()
.min(Comparator.comparingInt(ServiceInstance::getActiveConnections))
.orElse(instances.get(0));
}
}

6.3 熔断降级

6.3.1 熔断器模式

熔断器模式:防止服务雪崩,快速失败。

熔断器实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class CircuitBreaker {

private CircuitState state = CircuitState.CLOSED;
private int failureCount = 0;
private long lastFailureTime = 0;
private static final int FAILURE_THRESHOLD = 5;
private static final long TIMEOUT = 60000; // 60秒

public <T> T execute(Supplier<T> supplier) {
if (state == CircuitState.OPEN) {
if (System.currentTimeMillis() - lastFailureTime > TIMEOUT) {
state = CircuitState.HALF_OPEN;
} else {
throw new CircuitBreakerOpenException("Circuit breaker is open");
}
}

try {
T result = supplier.get();
onSuccess();
return result;
} catch (Exception e) {
onFailure();
throw e;
}
}

private void onSuccess() {
failureCount = 0;
if (state == CircuitState.HALF_OPEN) {
state = CircuitState.CLOSED;
}
}

private void onFailure() {
failureCount++;
lastFailureTime = System.currentTimeMillis();

if (failureCount >= FAILURE_THRESHOLD) {
state = CircuitState.OPEN;
}
}
}

enum CircuitState {
CLOSED, // 关闭:正常状态
OPEN, // 打开:快速失败
HALF_OPEN // 半开:尝试恢复
}

6.4 限流

6.4.1 限流算法

限流算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// 令牌桶算法
public class TokenBucket {

private int capacity; // 桶容量
private int tokens; // 当前令牌数
private long lastRefillTime;
private int refillRate; // 令牌补充速率

public synchronized boolean tryAcquire() {
refill();
if (tokens > 0) {
tokens--;
return true;
}
return false;
}

private void refill() {
long now = System.currentTimeMillis();
long elapsed = now - lastRefillTime;
int tokensToAdd = (int) (elapsed * refillRate / 1000);
tokens = Math.min(capacity, tokens + tokensToAdd);
lastRefillTime = now;
}
}

// 漏桶算法
public class LeakyBucket {

private int capacity;
private int water;
private int leakRate;
private long lastLeakTime;

public synchronized boolean tryAcquire() {
leak();
if (water < capacity) {
water++;
return true;
}
return false;
}

private void leak() {
long now = System.currentTimeMillis();
long elapsed = now - lastLeakTime;
int waterToLeak = (int) (elapsed * leakRate / 1000);
water = Math.max(0, water - waterToLeak);
lastLeakTime = now;
}
}

7. 高可用设计

7.1 容错设计

7.1.1 故障隔离

故障隔离:防止故障扩散。

故障隔离实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 线程池隔离
public class ThreadPoolIsolation {

private ExecutorService userServicePool = Executors.newFixedThreadPool(10);
private ExecutorService orderServicePool = Executors.newFixedThreadPool(10);

public void callUserService(Runnable task) {
userServicePool.submit(task);
}

public void callOrderService(Runnable task) {
orderServicePool.submit(task);
}
}

// 信号量隔离
public class SemaphoreIsolation {

private Semaphore userServiceSemaphore = new Semaphore(10);
private Semaphore orderServiceSemaphore = new Semaphore(10);

public void callUserService(Runnable task) throws InterruptedException {
userServiceSemaphore.acquire();
try {
task.run();
} finally {
userServiceSemaphore.release();
}
}
}

7.2 冗余设计

7.2.1 多副本

冗余设计:多副本保证高可用。

多副本实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class Replication {

private List<ServiceInstance> replicas;

public Response execute(Request request) {
// 主节点处理
Response response = primaryNode.execute(request);

// 异步复制到副本
asyncReplicate(request, response);

return response;
}

private void asyncReplicate(Request request, Response response) {
for (ServiceInstance replica : replicas) {
replica.replicate(request, response);
}
}

// 故障转移
public ServiceInstance failover() {
// 选择健康的副本作为新的主节点
return replicas.stream()
.filter(ServiceInstance::isHealthy)
.findFirst()
.orElseThrow(() -> new NoAvailableInstanceException());
}
}

7.3 故障恢复

7.3.1 自动恢复

故障恢复:自动检测和恢复故障。

故障恢复实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class FailureRecovery {

@Scheduled(fixedRate = 5000)
public void healthCheck() {
List<ServiceInstance> instances = getAllInstances();

for (ServiceInstance instance : instances) {
if (!instance.isHealthy()) {
// 标记为不健康
instance.markUnhealthy();

// 尝试恢复
tryRecover(instance);
}
}
}

private void tryRecover(ServiceInstance instance) {
// 重启服务
instance.restart();

// 等待恢复
waitForRecovery(instance);

// 验证恢复
if (instance.isHealthy()) {
instance.markHealthy();
}
}
}

8. 分布式锁

8.1 分布式锁实现

8.1.1 基于Redis的分布式锁

Redis分布式锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class RedisDistributedLock {

@Autowired
private RedisTemplate<String, String> redisTemplate;

public boolean tryLock(String key, String value, long expireTime) {
Boolean result = redisTemplate.opsForValue()
.setIfAbsent(key, value, expireTime, TimeUnit.MILLISECONDS);
return Boolean.TRUE.equals(result);
}

public void unlock(String key, String value) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then " +
"return redis.call('del', KEYS[1]) " +
"else return 0 end";
redisTemplate.execute(
new DefaultRedisScript<>(script, Long.class),
Collections.singletonList(key),
value
);
}
}

8.2 基于ZooKeeper的分布式锁

8.2.1 ZooKeeper分布式锁

ZooKeeper分布式锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class ZooKeeperDistributedLock {

private ZooKeeper zookeeper;
private String lockPath;

public boolean tryLock(String lockKey) throws Exception {
lockPath = "/locks/" + lockKey;

// 创建临时顺序节点
String nodePath = zookeeper.create(
lockPath + "/",
null,
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL
);

// 获取所有子节点
List<String> children = zookeeper.getChildren(lockPath, false);
Collections.sort(children);

// 判断是否是最小节点
String currentNode = nodePath.substring(lockPath.length() + 1);
if (currentNode.equals(children.get(0))) {
return true; // 获得锁
}

// 监听前一个节点
String prevNode = lockPath + "/" + children.get(children.indexOf(currentNode) - 1);
CountDownLatch latch = new CountDownLatch(1);
zookeeper.exists(prevNode, event -> {
if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
latch.countDown();
}
});
latch.await();

return true;
}

public void unlock() throws Exception {
zookeeper.delete(lockPath, -1);
}
}

9. 总结

9.1 核心要点

  1. CAP定理:一致性、可用性、分区容错性不能同时满足
  2. 一致性:强一致性和最终一致性
  3. 分布式事务:2PC、TCC、Saga等方案
  4. 服务治理:服务发现、负载均衡、熔断降级
  5. 高可用:容错、冗余、故障恢复

9.2 关键理解

  1. 权衡取舍:根据业务场景选择CAP组合
  2. 最终一致性:大多数场景可以接受最终一致性
  3. 故障处理:完善的故障检测和恢复机制
  4. 服务治理:服务发现、负载均衡、熔断降级是基础

9.3 最佳实践

  1. 合理选择一致性模型:根据业务需求选择
  2. 完善监控:实时监控系统状态
  3. 故障演练:定期进行故障演练
  4. 渐进式演进:从简单到复杂,逐步演进

相关文章