引言

随着业务规模的不断扩大,单机Redis已经无法满足高并发、高可用的需求。Redis分布式架构成为解决这一问题的关键方案。通过合理的分布式设计,可以实现数据的高可用、负载均衡和水平扩展。

本文将深入探讨Redis分布式架构的各个方面,从基础的主从复制到高级的Redis Cluster,从分布式锁到一致性哈希,提供完整的分布式解决方案。

Redis分布式架构概述

1. 分布式架构的必要性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// Redis分布式架构必要性分析
public class RedisDistributedNecessity {

/**
* 单机Redis的局限性
*/
public static void analyzeSingleInstanceLimitations() {
System.out.println("=== 单机Redis的局限性 ===");

System.out.println("1. 容量限制:");
System.out.println(" - 单机内存容量有限");
System.out.println(" - 无法存储海量数据");
System.out.println(" - 数据增长受限");

System.out.println("\n2. 性能瓶颈:");
System.out.println(" - 单机CPU性能有限");
System.out.println(" - 无法充分利用多核CPU");
System.out.println(" - 并发处理能力受限");

System.out.println("\n3. 可用性问题:");
System.out.println(" - 单点故障风险");
System.out.println(" - 服务不可用影响业务");
System.out.println(" - 缺乏容错能力");

System.out.println("\n4. 扩展性问题:");
System.out.println(" - 无法水平扩展");
System.out.println(" - 升级维护困难");
System.out.println(" - 资源利用率低");
}

/**
* 分布式架构的优势
*/
public static void analyzeDistributedAdvantages() {
System.out.println("\n=== 分布式架构的优势 ===");

System.out.println("1. 高可用性:");
System.out.println(" - 多节点冗余");
System.out.println(" - 故障自动切换");
System.out.println(" - 服务持续可用");

System.out.println("\n2. 高性能:");
System.out.println(" - 负载均衡");
System.out.println(" - 并行处理");
System.out.println(" - 资源充分利用");

System.out.println("\n3. 可扩展性:");
System.out.println(" - 水平扩展");
System.out.println(" - 动态扩容");
System.out.println(" - 弹性伸缩");

System.out.println("\n4. 容错性:");
System.out.println(" - 故障隔离");
System.out.println(" - 自动恢复");
System.out.println(" - 数据备份");
}
}

2. Redis分布式架构模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// Redis分布式架构模式
public class RedisDistributedPatterns {

/**
* Redis分布式架构模式分析
*/
public static void analyzeDistributedPatterns() {
System.out.println("=== Redis分布式架构模式 ===");

System.out.println("1. 主从复制模式(Master-Slave):");
System.out.println(" - 一个主节点,多个从节点");
System.out.println(" - 主节点负责写操作");
System.out.println(" - 从节点负责读操作");
System.out.println(" - 数据异步复制");

System.out.println("\n2. 哨兵模式(Sentinel):");
System.out.println(" - 监控主从节点状态");
System.out.println(" - 自动故障转移");
System.out.println(" - 配置管理");
System.out.println(" - 通知机制");

System.out.println("\n3. 集群模式(Cluster):");
System.out.println(" - 多主多从架构");
System.out.println(" - 数据分片存储");
System.out.println(" - 自动故障转移");
System.out.println(" - 无中心化设计");

System.out.println("\n4. 分片模式(Sharding):");
System.out.println(" - 客户端分片");
System.out.println(" - 代理分片");
System.out.println(" - 数据水平切分");
System.out.println(" - 负载均衡");
}

/**
* 架构模式选择指南
*/
public static void architectureSelectionGuide() {
System.out.println("\n=== 架构模式选择指南 ===");

System.out.println("1. 主从复制模式适用场景:");
System.out.println(" - 读多写少");
System.out.println(" - 数据量不大");
System.out.println(" - 对一致性要求不高");
System.out.println(" - 预算有限");

System.out.println("\n2. 哨兵模式适用场景:");
System.out.println(" - 需要高可用");
System.out.println(" - 自动故障转移");
System.out.println(" - 监控告警");
System.out.println(" - 配置管理");

System.out.println("\n3. 集群模式适用场景:");
System.out.println(" - 大数据量");
System.out.println(" - 高并发");
System.out.println(" - 水平扩展");
System.out.println(" - 企业级应用");

System.out.println("\n4. 分片模式适用场景:");
System.out.println(" - 数据量巨大");
System.out.println(" - 需要自定义分片");
System.out.println(" - 复杂业务逻辑");
System.out.println(" - 特殊需求");
}
}

Redis主从复制架构

1. 主从复制原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
// Redis主从复制原理
@Service
public class RedisMasterSlaveReplication {

private static final Logger logger = LoggerFactory.getLogger(RedisMasterSlaveReplication.class);

@Autowired
private RedisTemplate<String, String> redisTemplate;

/**
* 主从复制配置
*/
@Configuration
public static class MasterSlaveConfig {

/**
* 主节点配置
*/
@Bean
public LettuceConnectionFactory masterConnectionFactory() {
RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();
config.setHostName("127.0.0.1");
config.setPort(6379);
config.setPassword("password");

LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder()
.commandTimeout(Duration.ofSeconds(2))
.poolConfig(createPoolConfig())
.build();

return new LettuceConnectionFactory(config, clientConfig);
}

/**
* 从节点配置
*/
@Bean
public LettuceConnectionFactory slaveConnectionFactory() {
RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();
config.setHostName("127.0.0.1");
config.setPort(6380);
config.setPassword("password");

LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder()
.commandTimeout(Duration.ofSeconds(2))
.poolConfig(createPoolConfig())
.build();

return new LettuceConnectionFactory(config, clientConfig);
}

/**
* 连接池配置
*/
private GenericObjectPoolConfig<?> createPoolConfig() {
GenericObjectPoolConfig<?> poolConfig = new GenericObjectPoolConfig<>();
poolConfig.setMaxTotal(20);
poolConfig.setMaxIdle(10);
poolConfig.setMinIdle(5);
poolConfig.setMaxWaitMillis(3000);
return poolConfig;
}
}

/**
* 主从复制监控
*/
public void monitorReplication() {
logger.info("开始监控Redis主从复制...");

try {
// 获取主节点信息
Properties masterInfo = getMasterInfo();
logMasterInfo(masterInfo);

// 获取从节点信息
Properties slaveInfo = getSlaveInfo();
logSlaveInfo(slaveInfo);

// 检查复制状态
checkReplicationStatus(masterInfo, slaveInfo);

} catch (Exception e) {
logger.error("监控主从复制失败: {}", e.getMessage(), e);
}
}

/**
* 获取主节点信息
*/
private Properties getMasterInfo() {
RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
Properties info = connection.info();
connection.close();
return info;
}

/**
* 获取从节点信息
*/
private Properties getSlaveInfo() {
// 这里需要连接到从节点
// 实际应用中需要配置多个连接工厂
return new Properties();
}

/**
* 记录主节点信息
*/
private void logMasterInfo(Properties info) {
logger.info("=== 主节点信息 ===");
logger.info("角色: {}", info.getProperty("role"));
logger.info("连接数: {}", info.getProperty("connected_clients"));
logger.info("内存使用: {}", info.getProperty("used_memory_human"));
logger.info("键数量: {}", info.getProperty("db0"));
}

/**
* 记录从节点信息
*/
private void logSlaveInfo(Properties info) {
logger.info("=== 从节点信息 ===");
logger.info("角色: {}", info.getProperty("role"));
logger.info("主节点: {}", info.getProperty("master_host"));
logger.info("主节点端口: {}", info.getProperty("master_port"));
logger.info("复制状态: {}", info.getProperty("master_link_status"));
}

/**
* 检查复制状态
*/
private void checkReplicationStatus(Properties masterInfo, Properties slaveInfo) {
logger.info("=== 复制状态检查 ===");

String masterRole = masterInfo.getProperty("role");
String slaveRole = slaveInfo.getProperty("role");

if ("master".equals(masterRole) && "slave".equals(slaveRole)) {
logger.info("主从复制配置正确");

// 检查复制延迟
String masterOffset = masterInfo.getProperty("master_repl_offset");
String slaveOffset = slaveInfo.getProperty("slave_repl_offset");

if (masterOffset != null && slaveOffset != null) {
long masterOff = Long.parseLong(masterOffset);
long slaveOff = Long.parseLong(slaveOffset);
long lag = masterOff - slaveOff;

logger.info("复制延迟: {} 字节", lag);

if (lag > 1024 * 1024) { // 1MB
logger.warn("复制延迟过大,建议检查网络连接");
}
}
} else {
logger.error("主从复制配置错误");
}
}
}

2. 主从复制优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
// Redis主从复制优化
@Service
public class RedisReplicationOptimization {

private static final Logger logger = LoggerFactory.getLogger(RedisReplicationOptimization.class);

/**
* 复制优化配置
*/
@Configuration
public static class ReplicationOptimizationConfig {

/**
* 优化后的主节点配置
*/
@Bean
public LettuceConnectionFactory optimizedMasterConnectionFactory() {
RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();
config.setHostName("127.0.0.1");
config.setPort(6379);
config.setPassword("password");

// 优化配置
LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder()
.commandTimeout(Duration.ofSeconds(1)) // 减少超时时间
.poolConfig(createOptimizedPoolConfig())
.build();

return new LettuceConnectionFactory(config, clientConfig);
}

/**
* 优化后的连接池配置
*/
private GenericObjectPoolConfig<?> createOptimizedPoolConfig() {
GenericObjectPoolConfig<?> poolConfig = new GenericObjectPoolConfig<>();
poolConfig.setMaxTotal(50); // 增加最大连接数
poolConfig.setMaxIdle(20); // 增加最大空闲连接数
poolConfig.setMinIdle(10); // 增加最小空闲连接数
poolConfig.setMaxWaitMillis(1000); // 减少等待时间
poolConfig.setTestOnBorrow(true); // 借用时测试连接
poolConfig.setTestOnReturn(true); // 归还时测试连接
poolConfig.setTestWhileIdle(true); // 空闲时测试连接
return poolConfig;
}
}

/**
* 复制性能优化
*/
public void optimizeReplicationPerformance() {
logger.info("开始优化Redis复制性能...");

// 1. 调整复制缓冲区大小
adjustReplicationBuffer();

// 2. 优化网络配置
optimizeNetworkConfig();

// 3. 调整复制策略
adjustReplicationStrategy();

// 4. 监控复制性能
monitorReplicationPerformance();
}

/**
* 调整复制缓冲区大小
*/
private void adjustReplicationBuffer() {
logger.info("调整复制缓冲区大小...");

// 通过Redis配置调整
// repl-backlog-size 1gb
// repl-backlog-ttl 3600

logger.info("建议配置:");
logger.info("- repl-backlog-size: 1gb (根据内存情况调整)");
logger.info("- repl-backlog-ttl: 3600 (1小时)");
logger.info("- repl-ping-slave-period: 10 (10秒)");
logger.info("- repl-timeout: 60 (60秒)");
}

/**
* 优化网络配置
*/
private void optimizeNetworkConfig() {
logger.info("优化网络配置...");

logger.info("网络优化建议:");
logger.info("- 使用内网连接");
logger.info("- 调整TCP参数");
logger.info("- 启用TCP_NODELAY");
logger.info("- 调整网络缓冲区大小");
}

/**
* 调整复制策略
*/
private void adjustReplicationStrategy() {
logger.info("调整复制策略...");

logger.info("复制策略建议:");
logger.info("- 使用异步复制");
logger.info("- 调整复制频率");
logger.info("- 优化复制命令");
logger.info("- 使用压缩传输");
}

/**
* 监控复制性能
*/
private void monitorReplicationPerformance() {
logger.info("监控复制性能...");

// 定期检查复制状态
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(() -> {
try {
checkReplicationHealth();
} catch (Exception e) {
logger.error("监控复制性能失败: {}", e.getMessage());
}
}, 0, 30, TimeUnit.SECONDS);
}

/**
* 检查复制健康状态
*/
private void checkReplicationHealth() {
logger.debug("检查复制健康状态...");

// 检查复制延迟
// 检查连接状态
// 检查内存使用
// 检查网络状况

logger.debug("复制健康状态检查完成");
}
}

Redis Cluster集群架构

1. Redis Cluster原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
// Redis Cluster集群架构
@Service
public class RedisClusterArchitecture {

private static final Logger logger = LoggerFactory.getLogger(RedisClusterArchitecture.class);

/**
* Redis Cluster配置
*/
@Configuration
public static class RedisClusterConfig {

/**
* 创建Redis Cluster连接工厂
*/
@Bean
public LettuceConnectionFactory clusterConnectionFactory() {
// 集群节点配置
List<RedisNode> nodes = Arrays.asList(
new RedisNode("127.0.0.1", 7000),
new RedisNode("127.0.0.1", 7001),
new RedisNode("127.0.0.1", 7002),
new RedisNode("127.0.0.1", 7003),
new RedisNode("127.0.0.1", 7004),
new RedisNode("127.0.0.1", 7005)
);

RedisClusterConfiguration clusterConfig = new RedisClusterConfiguration(nodes);
clusterConfig.setPassword("password");
clusterConfig.setMaxRedirects(3);

LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder()
.commandTimeout(Duration.ofSeconds(2))
.poolConfig(createClusterPoolConfig())
.build();

return new LettuceConnectionFactory(clusterConfig, clientConfig);
}

/**
* 集群连接池配置
*/
private GenericObjectPoolConfig<?> createClusterPoolConfig() {
GenericObjectPoolConfig<?> poolConfig = new GenericObjectPoolConfig<>();
poolConfig.setMaxTotal(100);
poolConfig.setMaxIdle(50);
poolConfig.setMinIdle(10);
poolConfig.setMaxWaitMillis(3000);
return poolConfig;
}
}

/**
* 集群管理服务
*/
@Service
public static class ClusterManagementService {

@Autowired
private LettuceConnectionFactory clusterConnectionFactory;

/**
* 获取集群信息
*/
public void getClusterInfo() {
logger.info("获取Redis Cluster信息...");

try {
RedisConnection connection = clusterConnectionFactory.getConnection();

// 获取集群节点信息
Properties info = connection.info();
logClusterInfo(info);

// 获取集群拓扑
getClusterTopology();

connection.close();

} catch (Exception e) {
logger.error("获取集群信息失败: {}", e.getMessage(), e);
}
}

/**
* 记录集群信息
*/
private void logClusterInfo(Properties info) {
logger.info("=== Redis Cluster信息 ===");
logger.info("集群状态: {}", info.getProperty("cluster_enabled"));
logger.info("节点数量: {}", info.getProperty("cluster_known_nodes"));
logger.info("槽位数量: {}", info.getProperty("cluster_size"));
logger.info("当前节点: {}", info.getProperty("cluster_my_epoch"));
}

/**
* 获取集群拓扑
*/
private void getClusterTopology() {
logger.info("=== 集群拓扑信息 ===");

// 这里需要实现获取集群拓扑的逻辑
// 实际应用中可以使用Redis Cluster客户端API

logger.info("主节点: 127.0.0.1:7000, 127.0.0.1:7001, 127.0.0.1:7002");
logger.info("从节点: 127.0.0.1:7003, 127.0.0.1:7004, 127.0.0.1:7005");
logger.info("槽位分布: 0-5460, 5461-10922, 10923-16383");
}

/**
* 检查集群健康状态
*/
public void checkClusterHealth() {
logger.info("检查Redis Cluster健康状态...");

try {
// 检查所有节点状态
checkNodeStatus();

// 检查槽位分布
checkSlotDistribution();

// 检查主从关系
checkMasterSlaveRelation();

} catch (Exception e) {
logger.error("检查集群健康状态失败: {}", e.getMessage(), e);
}
}

/**
* 检查节点状态
*/
private void checkNodeStatus() {
logger.info("检查节点状态...");

// 实现节点状态检查逻辑
logger.info("所有节点状态正常");
}

/**
* 检查槽位分布
*/
private void checkSlotDistribution() {
logger.info("检查槽位分布...");

// 实现槽位分布检查逻辑
logger.info("槽位分布正常");
}

/**
* 检查主从关系
*/
private void checkMasterSlaveRelation() {
logger.info("检查主从关系...");

// 实现主从关系检查逻辑
logger.info("主从关系正常");
}
}
}

2. 集群数据分片

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
// Redis Cluster数据分片
@Service
public class RedisClusterSharding {

private static final Logger logger = LoggerFactory.getLogger(RedisClusterSharding.class);

/**
* 一致性哈希分片
*/
public static class ConsistentHashSharding {

private final TreeMap<Long, String> circle = new TreeMap<>();
private final int virtualNodes = 160;

/**
* 初始化一致性哈希环
*/
public ConsistentHashSharding(List<String> nodes) {
for (String node : nodes) {
addNode(node);
}
}

/**
* 添加节点
*/
public void addNode(String node) {
for (int i = 0; i < virtualNodes; i++) {
String virtualNode = node + "#" + i;
long hash = hash(virtualNode);
circle.put(hash, node);
}
}

/**
* 移除节点
*/
public void removeNode(String node) {
for (int i = 0; i < virtualNodes; i++) {
String virtualNode = node + "#" + i;
long hash = hash(virtualNode);
circle.remove(hash);
}
}

/**
* 获取节点
*/
public String getNode(String key) {
if (circle.isEmpty()) {
return null;
}

long hash = hash(key);
Map.Entry<Long, String> entry = circle.ceilingEntry(hash);

if (entry == null) {
entry = circle.firstEntry();
}

return entry.getValue();
}

/**
* 计算哈希值
*/
private long hash(String key) {
return key.hashCode();
}
}

/**
* 槽位分片
*/
public static class SlotSharding {

private static final int SLOT_COUNT = 16384;
private final Map<Integer, String> slotToNode = new HashMap<>();

/**
* 初始化槽位分片
*/
public SlotSharding(Map<String, List<Integer>> nodeSlots) {
for (Map.Entry<String, List<Integer>> entry : nodeSlots.entrySet()) {
String node = entry.getKey();
List<Integer> slots = entry.getValue();

for (Integer slot : slots) {
slotToNode.put(slot, node);
}
}
}

/**
* 根据键获取节点
*/
public String getNode(String key) {
int slot = calculateSlot(key);
return slotToNode.get(slot);
}

/**
* 计算槽位
*/
private int calculateSlot(String key) {
// 使用CRC16算法计算槽位
return crc16(key) % SLOT_COUNT;
}

/**
* CRC16算法实现
*/
private int crc16(String key) {
int crc = 0;
for (byte b : key.getBytes()) {
crc = ((crc << 8) ^ crc16Table[((crc >> 8) ^ b) & 0xFF]) & 0xFFFF;
}
return crc;
}

/**
* CRC16查找表
*/
private static final int[] crc16Table = {
0x0000, 0x1021, 0x2042, 0x3063, 0x4084, 0x50A5, 0x60C6, 0x70E7,
0x8108, 0x9129, 0xA14A, 0xB16B, 0xC18C, 0xD1AD, 0xE1CE, 0xF1EF,
// ... 完整的CRC16查找表
};
}

/**
* 分片策略选择
*/
public static class ShardingStrategySelector {

/**
* 选择分片策略
*/
public static ShardingStrategy selectStrategy(ShardingType type) {
switch (type) {
case CONSISTENT_HASH:
return new ConsistentHashStrategy();
case SLOT_BASED:
return new SlotBasedStrategy();
case ROUND_ROBIN:
return new RoundRobinStrategy();
default:
return new ConsistentHashStrategy();
}
}

/**
* 分片策略接口
*/
public interface ShardingStrategy {
String getNode(String key);
}

/**
* 一致性哈希策略
*/
public static class ConsistentHashStrategy implements ShardingStrategy {
@Override
public String getNode(String key) {
// 实现一致性哈希逻辑
return "node1";
}
}

/**
* 槽位策略
*/
public static class SlotBasedStrategy implements ShardingStrategy {
@Override
public String getNode(String key) {
// 实现槽位分片逻辑
return "node1";
}
}

/**
* 轮询策略
*/
public static class RoundRobinStrategy implements ShardingStrategy {
private int currentIndex = 0;
private final String[] nodes = {"node1", "node2", "node3"};

@Override
public String getNode(String key) {
String node = nodes[currentIndex];
currentIndex = (currentIndex + 1) % nodes.length;
return node;
}
}

/**
* 分片类型枚举
*/
public enum ShardingType {
CONSISTENT_HASH,
SLOT_BASED,
ROUND_ROBIN
}
}
}

分布式锁实现

1. 基于Redis的分布式锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
// Redis分布式锁实现
@Service
public class RedisDistributedLock {

private static final Logger logger = LoggerFactory.getLogger(RedisDistributedLock.class);

@Autowired
private RedisTemplate<String, String> redisTemplate;

private static final String LOCK_PREFIX = "distributed:lock:";
private static final int DEFAULT_EXPIRE_TIME = 30; // 30秒
private static final int DEFAULT_WAIT_TIME = 10; // 10秒

/**
* 获取分布式锁
*/
public boolean acquireLock(String lockKey, String lockValue, int expireTime) {
String key = LOCK_PREFIX + lockKey;

try {
// 使用SET命令的NX和EX选项实现原子性
Boolean result = redisTemplate.opsForValue().setIfAbsent(key, lockValue, Duration.ofSeconds(expireTime));

if (Boolean.TRUE.equals(result)) {
logger.info("成功获取分布式锁: {}", lockKey);
return true;
} else {
logger.warn("获取分布式锁失败: {}", lockKey);
return false;
}

} catch (Exception e) {
logger.error("获取分布式锁异常: {}", e.getMessage(), e);
return false;
}
}

/**
* 释放分布式锁
*/
public boolean releaseLock(String lockKey, String lockValue) {
String key = LOCK_PREFIX + lockKey;

try {
// 使用Lua脚本确保原子性
String luaScript =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";

Long result = redisTemplate.execute(
(RedisCallback<Long>) connection ->
connection.eval(luaScript.getBytes(),
ReturnType.INTEGER, 1,
key.getBytes(),
lockValue.getBytes())
);

if (result != null && result == 1) {
logger.info("成功释放分布式锁: {}", lockKey);
return true;
} else {
logger.warn("释放分布式锁失败: {}", lockKey);
return false;
}

} catch (Exception e) {
logger.error("释放分布式锁异常: {}", e.getMessage(), e);
return false;
}
}

/**
* 尝试获取锁(带重试)
*/
public boolean tryLock(String lockKey, String lockValue, int expireTime, int waitTime) {
long startTime = System.currentTimeMillis();
long endTime = startTime + waitTime * 1000;

while (System.currentTimeMillis() < endTime) {
if (acquireLock(lockKey, lockValue, expireTime)) {
return true;
}

try {
Thread.sleep(100); // 等待100ms后重试
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}

return false;
}

/**
* 锁续期
*/
public boolean renewLock(String lockKey, String lockValue, int expireTime) {
String key = LOCK_PREFIX + lockKey;

try {
// 使用Lua脚本检查锁是否存在并续期
String luaScript =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('expire', KEYS[1], ARGV[2]) " +
"else " +
" return 0 " +
"end";

Long result = redisTemplate.execute(
(RedisCallback<Long>) connection ->
connection.eval(luaScript.getBytes(),
ReturnType.INTEGER, 1,
key.getBytes(),
lockValue.getBytes(),
String.valueOf(expireTime).getBytes())
);

return result != null && result == 1;

} catch (Exception e) {
logger.error("锁续期异常: {}", e.getMessage(), e);
return false;
}
}

/**
* 分布式锁工具类
*/
public static class DistributedLockUtil {

/**
* 执行带锁的操作
*/
public static <T> T executeWithLock(RedisDistributedLock lockService,
String lockKey,
String lockValue,
int expireTime,
Supplier<T> operation) {

boolean acquired = false;
try {
acquired = lockService.acquireLock(lockKey, lockValue, expireTime);
if (acquired) {
return operation.get();
} else {
throw new RuntimeException("获取锁失败");
}
} finally {
if (acquired) {
lockService.releaseLock(lockKey, lockValue);
}
}
}

/**
* 执行带锁的操作(无返回值)
*/
public static void executeWithLock(RedisDistributedLock lockService,
String lockKey,
String lockValue,
int expireTime,
Runnable operation) {

boolean acquired = false;
try {
acquired = lockService.acquireLock(lockKey, lockValue, expireTime);
if (acquired) {
operation.run();
} else {
throw new RuntimeException("获取锁失败");
}
} finally {
if (acquired) {
lockService.releaseLock(lockKey, lockValue);
}
}
}
}
}

2. 分布式锁优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
// 分布式锁优化
@Service
public class DistributedLockOptimization {

private static final Logger logger = LoggerFactory.getLogger(DistributedLockOptimization.class);

@Autowired
private RedisDistributedLock distributedLock;

/**
* 可重入分布式锁
*/
public static class ReentrantDistributedLock {

private final RedisDistributedLock lockService;
private final ThreadLocal<Map<String, Integer>> lockCount = new ThreadLocal<>();

public ReentrantDistributedLock(RedisDistributedLock lockService) {
this.lockService = lockService;
}

/**
* 获取可重入锁
*/
public boolean acquireReentrantLock(String lockKey, String lockValue, int expireTime) {
Map<String, Integer> countMap = lockCount.get();
if (countMap == null) {
countMap = new HashMap<>();
lockCount.set(countMap);
}

Integer count = countMap.get(lockKey);
if (count != null && count > 0) {
// 重入锁,增加计数
countMap.put(lockKey, count + 1);
return true;
}

// 尝试获取锁
boolean acquired = lockService.acquireLock(lockKey, lockValue, expireTime);
if (acquired) {
countMap.put(lockKey, 1);
}

return acquired;
}

/**
* 释放可重入锁
*/
public boolean releaseReentrantLock(String lockKey, String lockValue) {
Map<String, Integer> countMap = lockCount.get();
if (countMap == null) {
return false;
}

Integer count = countMap.get(lockKey);
if (count == null || count <= 0) {
return false;
}

if (count == 1) {
// 最后一次释放,真正释放锁
countMap.remove(lockKey);
return lockService.releaseLock(lockKey, lockValue);
} else {
// 减少计数
countMap.put(lockKey, count - 1);
return true;
}
}
}

/**
* 读写锁
*/
public static class ReadWriteDistributedLock {

private final RedisDistributedLock lockService;
private static final String READ_LOCK_PREFIX = "read:lock:";
private static final String WRITE_LOCK_PREFIX = "write:lock:";

public ReadWriteDistributedLock(RedisDistributedLock lockService) {
this.lockService = lockService;
}

/**
* 获取读锁
*/
public boolean acquireReadLock(String lockKey, String lockValue, int expireTime) {
String readLockKey = READ_LOCK_PREFIX + lockKey;
return lockService.acquireLock(readLockKey, lockValue, expireTime);
}

/**
* 释放读锁
*/
public boolean releaseReadLock(String lockKey, String lockValue) {
String readLockKey = READ_LOCK_PREFIX + lockKey;
return lockService.releaseLock(readLockKey, lockValue);
}

/**
* 获取写锁
*/
public boolean acquireWriteLock(String lockKey, String lockValue, int expireTime) {
String writeLockKey = WRITE_LOCK_PREFIX + lockKey;
return lockService.acquireLock(writeLockKey, lockValue, expireTime);
}

/**
* 释放写锁
*/
public boolean releaseWriteLock(String lockKey, String lockValue) {
String writeLockKey = WRITE_LOCK_PREFIX + lockKey;
return lockService.releaseLock(writeLockKey, lockValue);
}
}

/**
* 锁监控
*/
public void monitorLocks() {
logger.info("开始监控分布式锁...");

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(() -> {
try {
checkLockStatus();
} catch (Exception e) {
logger.error("监控分布式锁失败: {}", e.getMessage());
}
}, 0, 30, TimeUnit.SECONDS);
}

/**
* 检查锁状态
*/
private void checkLockStatus() {
logger.debug("检查分布式锁状态...");

// 检查锁的过期时间
// 检查锁的持有者
// 检查锁的等待队列

logger.debug("分布式锁状态检查完成");
}
}

总结

Redis分布式架构是现代高并发系统的重要组成部分,通过合理的架构设计可以实现:

  1. 高可用性:多节点冗余,故障自动切换
  2. 高性能:负载均衡,并行处理
  3. 可扩展性:水平扩展,动态扩容
  4. 容错性:故障隔离,自动恢复

在实际应用中,需要根据业务需求选择合适的分布式架构模式,并做好性能监控和优化工作。

参考资料

  1. 《Redis设计与实现》
  2. 《Redis实战》
  3. Redis官方文档
  4. 《分布式系统概念与设计》
  5. 《高性能MySQL》