1. Overview of Redis Master-Slave Replication

Redis master-slave replication is the foundation on which Redis high availability is built: data written to the master node is copied to one or more replica nodes, providing redundant copies and enabling read/write splitting. This article walks through how replication works, how to configure it, Sentinel mode, the cluster architecture, and best practices for production deployments.

1.1 Benefits of Master-Slave Replication

  1. Data redundancy: replicas keep extra copies of the data, improving data safety
  2. Read/write splitting: the master handles writes while replicas serve reads
  3. Load balancing: read traffic can be spread across multiple replicas
  4. Failure recovery: when the master fails, a replica can be promoted to master
  5. Scalability: replicas can be added dynamically to scale read performance

1.2 How Master-Slave Replication Works

┌─────────────────┐      ┌─────────────────┐      ┌─────────────────┐
│ Master          │      │ Replica 1       │      │ Replica 2       │
│  ┌───────────┐  │      │  ┌───────────┐  │      │  ┌───────────┐  │
│  │  Writes   │  │      │  │  Reads    │  │      │  │  Reads    │  │
│  └───────────┘  │      │  └───────────┘  │      │  └───────────┘  │
│  ┌───────────┐  │      │  ┌───────────┐  │      │  ┌───────────┐  │
│  │Repl buffer│  │─────▶│  │Repl buffer│  │─────▶│  │Repl buffer│  │
│  └───────────┘  │      │  └───────────┘  │      │  └───────────┘  │
└─────────────────┘      └─────────────────┘      └─────────────────┘
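
When a replica (re)connects, it sends PSYNC together with the master's replication ID and the offset it has already processed. If the missed writes still fit in the master's replication backlog, the master streams only the delta (partial resync); otherwise it produces an RDB snapshot, transfers it, and then replays the commands buffered during the transfer (full resync). The fields driving this decision are visible in INFO replication. A minimal standalone sketch using the Lettuce driver (the client Spring Boot uses by default); host, port and password follow the configuration shown in section 2:

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;

public class ReplicationOffsetProbe {
    public static void main(String[] args) {
        // "redis://password@host:port" authenticates against requirepass
        RedisClient client = RedisClient.create("redis://master_password@127.0.0.1:6379");
        try (StatefulRedisConnection<String, String> conn = client.connect()) {
            // master_replid and master_repl_offset decide whether a reconnecting
            // replica gets a partial resync (offset still inside the backlog)
            // or a full RDB-based resync.
            String info = conn.sync().info("replication");
            for (String line : info.split("\r\n")) {
                if (line.startsWith("master_replid") || line.startsWith("master_repl_offset")) {
                    System.out.println(line);
                }
            }
        } finally {
            client.shutdown();
        }
    }
}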

2. Configuring Master-Slave Replication

2.1 Master Node Configuration

# redis-master.conf
# Basic settings
port 6379
bind 0.0.0.0
protected-mode no

# Persistence: RDB snapshot triggers
save 900 1
save 300 10
save 60 10000

# RDB file settings
dbfilename dump.rdb
dir /var/lib/redis

# AOF settings
appendonly yes
appendfilename "appendonly.aof"
appendfsync everysec

# Replication settings
repl-diskless-sync no
repl-diskless-sync-delay 5
# repl-ping-replica-period is the preferred name since Redis 5; the old name still works
repl-ping-slave-period 10
repl-timeout 60
repl-disable-tcp-nodelay no
repl-backlog-size 1mb
repl-backlog-ttl 3600

# Memory settings
maxmemory 2gb
maxmemory-policy allkeys-lru

# Logging
loglevel notice
logfile /var/log/redis/redis-master.log

# Security
requirepass master_password
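
The repl-backlog-size of 1mb above is usually too small for a busy master: the backlog is a circular buffer, and a disconnected replica can only partial-resync if every write it missed still fits in it. A common rule of thumb is backlog ≥ write throughput × the longest disconnect you want to survive. A back-of-the-envelope sketch (the throughput and outage figures are assumptions, not measurements):

public class ReplBacklogSizing {
    public static void main(String[] args) {
        long writeBytesPerSecond = 1_000_000L; // assumed replication write rate: ~1 MB/s
        long maxDisconnectSeconds = 60L;       // longest replica outage to survive without a full resync
        long recommendedBacklogBytes = writeBytesPerSecond * maxDisconnectSeconds;
        // => 60,000,000 bytes, so something like repl-backlog-size 64mb would be safer here
        System.out.println("repl-backlog-size >= " + recommendedBacklogBytes + " bytes");
    }
}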

2.2 Replica (Slave) Node Configuration

# redis-slave.conf
# Basic settings
port 6380
bind 0.0.0.0
protected-mode no

# Replication settings
replicaof 127.0.0.1 6379
masterauth master_password
replica-serve-stale-data yes
replica-read-only yes
repl-diskless-sync no
repl-diskless-sync-delay 5
repl-ping-slave-period 10
repl-timeout 60
repl-disable-tcp-nodelay no
repl-backlog-size 1mb
repl-backlog-ttl 3600

# Persistence settings
save 900 1
save 300 10
save 60 10000
dbfilename dump-slave.rdb
dir /var/lib/redis-slave

# AOF settings
appendonly yes
appendfilename "appendonly-slave.aof"
appendfsync everysec

# Memory settings
maxmemory 2gb
maxmemory-policy allkeys-lru

# Logging
loglevel notice
logfile /var/log/redis/redis-slave.log

# Security
# Note: with Sentinel failover in mind, master and replicas usually share the same
# requirepass/masterauth so that any replica can be promoted.
requirepass slave_password

2.3 Starting the Master and Replica

# Start the master node
redis-server /etc/redis/redis-master.conf

# Start the replica node
redis-server /etc/redis/redis-slave.conf

# Verify the replication status (requirepass is set, hence -a)
redis-cli -p 6379 -a master_password info replication
redis-cli -p 6380 -a slave_password info replication
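
The same verification can be done from application code: a healthy replica reports role:slave and master_link_status:up in INFO replication. A minimal sketch with the Lettuce driver (port and password follow redis-slave.conf above):

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;

public class ReplicationLinkCheck {
    public static void main(String[] args) {
        RedisClient client = RedisClient.create("redis://slave_password@127.0.0.1:6380");
        try (StatefulRedisConnection<String, String> conn = client.connect()) {
            String info = conn.sync().info("replication");
            // master_link_status:up means the replica is connected and in sync with its master
            boolean linkUp = info.contains("master_link_status:up");
            System.out.println("replica link up: " + linkUp);
        } finally {
            client.shutdown();
        }
    }
}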

3. Implementing Master-Slave Replication

3.1 Replication Management

// Redis replication manager: routes writes to the master, reads to the replica,
// and exposes basic replication diagnostics.
@Component
public class RedisReplicationManager {

    private final RedisTemplate<String, Object> masterRedisTemplate;
    private final RedisTemplate<String, Object> slaveRedisTemplate;
    private final RedisConnectionFactory masterConnectionFactory;
    private final RedisConnectionFactory slaveConnectionFactory;

    public RedisReplicationManager(
            @Qualifier("masterRedisTemplate") RedisTemplate<String, Object> masterRedisTemplate,
            @Qualifier("slaveRedisTemplate") RedisTemplate<String, Object> slaveRedisTemplate,
            @Qualifier("masterConnectionFactory") RedisConnectionFactory masterConnectionFactory,
            @Qualifier("slaveConnectionFactory") RedisConnectionFactory slaveConnectionFactory) {
        this.masterRedisTemplate = masterRedisTemplate;
        this.slaveRedisTemplate = slaveRedisTemplate;
        this.masterConnectionFactory = masterConnectionFactory;
        this.slaveConnectionFactory = slaveConnectionFactory;
    }

    // Write operations (master node)
    public void set(String key, Object value) {
        masterRedisTemplate.opsForValue().set(key, value);
    }

    public void set(String key, Object value, Duration timeout) {
        masterRedisTemplate.opsForValue().set(key, value, timeout);
    }

    public void delete(String key) {
        masterRedisTemplate.delete(key);
    }

    public void delete(Collection<String> keys) {
        masterRedisTemplate.delete(keys);
    }

    // Read operations (replica node)
    public Object get(String key) {
        return slaveRedisTemplate.opsForValue().get(key);
    }

    public List<Object> multiGet(Collection<String> keys) {
        return slaveRedisTemplate.opsForValue().multiGet(keys);
    }

    public Boolean exists(String key) {
        return slaveRedisTemplate.hasKey(key);
    }

    public Set<String> keys(String pattern) {
        // KEYS is O(N) and blocks the server; prefer SCAN for large keyspaces
        return slaveRedisTemplate.keys(pattern);
    }

    // Fetch replication status from both nodes
    public ReplicationInfo getReplicationInfo() {
        ReplicationInfo info = new ReplicationInfo();

        RedisConnection masterConnection = masterConnectionFactory.getConnection();
        try {
            info.setMasterInfo(parseReplicationInfo(masterConnection.info("replication")));
        } finally {
            masterConnection.close();
        }

        RedisConnection slaveConnection = slaveConnectionFactory.getConnection();
        try {
            info.setSlaveInfo(parseReplicationInfo(slaveConnection.info("replication")));
        } finally {
            slaveConnection.close();
        }

        return info;
    }

    private ReplicationNodeInfo parseReplicationInfo(Properties info) {
        ReplicationNodeInfo nodeInfo = new ReplicationNodeInfo();
        nodeInfo.setRole(info.getProperty("role"));
        nodeInfo.setConnectedSlaves(Integer.parseInt(info.getProperty("connected_slaves", "0")));
        nodeInfo.setMasterReplOffset(Long.parseLong(info.getProperty("master_repl_offset", "0")));
        nodeInfo.setReplBacklogActive(Integer.parseInt(info.getProperty("repl_backlog_active", "0")));
        nodeInfo.setReplBacklogSize(Long.parseLong(info.getProperty("repl_backlog_size", "0")));
        nodeInfo.setReplBacklogFirstByteOffset(Long.parseLong(info.getProperty("repl_backlog_first_byte_offset", "0")));
        nodeInfo.setReplBacklogHistlen(Long.parseLong(info.getProperty("repl_backlog_histlen", "0")));
        return nodeInfo;
    }

    // Replication lag, measured as the difference between the replication offsets (in bytes)
    public long getReplicationLag() {
        long masterOffset;
        long slaveOffset;

        RedisConnection masterConnection = masterConnectionFactory.getConnection();
        try {
            masterOffset = Long.parseLong(
                    masterConnection.info("replication").getProperty("master_repl_offset", "0"));
        } finally {
            masterConnection.close();
        }

        RedisConnection slaveConnection = slaveConnectionFactory.getConnection();
        try {
            slaveOffset = Long.parseLong(
                    slaveConnection.info("replication").getProperty("slave_repl_offset", "0"));
        } finally {
            slaveConnection.close();
        }

        return masterOffset - slaveOffset;
    }

    // Ask the replica to (re)attach to its master, which triggers a resync if the
    // link was lost. Redis replicates automatically; SLAVEOF/REPLICAOF is only
    // re-issued here as an explicit "force sync" hook.
    public void forceSync() {
        RedisConnection slaveConnection = slaveConnectionFactory.getConnection();
        try {
            Properties info = slaveConnection.info("replication");
            String masterHost = info.getProperty("master_host", "127.0.0.1");
            int masterPort = Integer.parseInt(info.getProperty("master_port", "6379"));
            slaveConnection.slaveOf(masterHost, masterPort);
        } finally {
            slaveConnection.close();
        }
    }
}

3.2 Replication Monitoring

// Redis replication monitoring service
@Service
@Slf4j
public class RedisReplicationMonitor {

    private final RedisReplicationManager replicationManager;
    private final MeterRegistry meterRegistry;

    public RedisReplicationMonitor(RedisReplicationManager replicationManager,
                                   MeterRegistry meterRegistry) {
        this.replicationManager = replicationManager;
        this.meterRegistry = meterRegistry;
    }

    @Scheduled(fixedDelay = 30000) // check every 30 seconds
    public void monitorReplication() {
        try {
            ReplicationInfo info = replicationManager.getReplicationInfo();
            long lag = replicationManager.getReplicationLag();

            // Record the replication lag metric.
            // Note: MeterRegistry.gauge() keeps only a weak reference to the boxed value,
            // so a long-lived AtomicLong is preferable; see the sketch after this class.
            meterRegistry.gauge("redis.replication.lag", lag);

            // Record master/replica state
            meterRegistry.gauge("redis.master.connected_slaves",
                    info.getMasterInfo().getConnectedSlaves());

            // Alert when the lag gets too large
            if (lag > 1000000) { // threshold: roughly 1 MB of unreplicated data
                log.warn("Redis replication lag is high: {} bytes", lag);
                alertHighReplicationLag(lag);
            }

            // Alert when no replica is connected
            if (info.getMasterInfo().getConnectedSlaves() == 0) {
                log.error("No slaves connected to master!");
                alertNoSlavesConnected();
            }

            log.debug("Replication status - Master: {}, Slave: {}, Lag: {} bytes",
                    info.getMasterInfo().getRole(),
                    info.getSlaveInfo().getRole(),
                    lag);

        } catch (Exception e) {
            log.error("Error monitoring Redis replication", e);
        }
    }

    private void alertHighReplicationLag(long lag) {
        // Send an alert notification
        log.error("ALERT: High Redis replication lag detected: {} bytes", lag);
        // Hook up an alerting channel here (e-mail, SMS, chat, ...)
    }

    private void alertNoSlavesConnected() {
        // Send an alert notification
        log.error("ALERT: No slaves connected to Redis master!");
        // Hook up an alerting channel here
    }

    // Aggregate replication statistics
    public ReplicationStats getReplicationStats() {
        ReplicationInfo info = replicationManager.getReplicationInfo();
        long lag = replicationManager.getReplicationLag();

        return ReplicationStats.builder()
                .masterRole(info.getMasterInfo().getRole())
                .slaveRole(info.getSlaveInfo().getRole())
                .connectedSlaves(info.getMasterInfo().getConnectedSlaves())
                .replicationLag(lag)
                .masterOffset(info.getMasterInfo().getMasterReplOffset())
                .slaveOffset(info.getSlaveInfo().getMasterReplOffset())
                .backlogSize(info.getMasterInfo().getReplBacklogSize())
                .backlogActive(info.getMasterInfo().getReplBacklogActive())
                .timestamp(LocalDateTime.now())
                .build();
    }
}
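
One caveat about the metric registration used above: MeterRegistry.gauge(name, value) keeps only a weak reference to the number it is given, so re-registering a primitive on every scheduled run produces a gauge that never changes and may even disappear once the boxed value is garbage-collected. A more robust pattern is to register a long-lived AtomicLong once and update it from the monitor; a minimal sketch (bean and metric names are illustrative):

import io.micrometer.core.instrument.MeterRegistry;
import java.util.concurrent.atomic.AtomicLong;
import org.springframework.stereotype.Component;

@Component
public class ReplicationLagGauge {

    // Micrometer keeps a reference to this AtomicLong and samples it on every scrape.
    private final AtomicLong lagBytes = new AtomicLong(0);

    public ReplicationLagGauge(MeterRegistry meterRegistry) {
        meterRegistry.gauge("redis.replication.lag", lagBytes);
    }

    // Call this from the scheduled monitor instead of re-registering the gauge.
    public void update(long currentLagBytes) {
        lagBytes.set(currentLagBytes);
    }
}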

4. Redis Sentinel Mode

4.1 Sentinel Configuration

# sentinel.conf
# Basic settings
port 26379
bind 0.0.0.0
protected-mode no

# Monitoring settings
# quorum of 2: run at least three Sentinel processes in practice
sentinel monitor mymaster 127.0.0.1 6379 2
sentinel auth-pass mymaster master_password
sentinel down-after-milliseconds mymaster 30000
sentinel parallel-syncs mymaster 1
sentinel failover-timeout mymaster 180000
sentinel deny-scripts-reconfig yes

# Logging
logfile /var/log/redis/sentinel.log
loglevel notice

# Notification scripts
sentinel notification-script mymaster /etc/redis/notify.sh
sentinel client-reconfig-script mymaster /etc/redis/reconfig.sh

# Miscellaneous (Redis 6.2+)
sentinel resolve-hostnames no
sentinel announce-hostnames no

4.2 Sentinel Mode Implementation

// Redis Sentinel manager
@Component
@Slf4j
public class RedisSentinelManager {

    private final RedisTemplate<String, Object> redisTemplate;
    private final RedisConnectionFactory connectionFactory;
    private final ApplicationEventPublisher eventPublisher;

    public RedisSentinelManager(RedisTemplate<String, Object> redisTemplate,
                                RedisConnectionFactory connectionFactory,
                                ApplicationEventPublisher eventPublisher) {
        this.redisTemplate = redisTemplate;
        this.connectionFactory = connectionFactory;
        this.eventPublisher = eventPublisher;
    }

    // Collect Sentinel-related information
    public SentinelInfo getSentinelInfo() {
        SentinelInfo info = new SentinelInfo();

        RedisConnection connection = connectionFactory.getConnection();
        try {
            // Replication view of the data node we are connected to
            Properties masterInfo = connection.info("replication");
            info.setMasterRole(masterInfo.getProperty("role"));
            info.setMasterHost(masterInfo.getProperty("master_host"));
            info.setMasterPort(Integer.parseInt(masterInfo.getProperty("master_port", "0")));

            // The "sentinel" INFO section is only present when the connection
            // targets a Sentinel process (port 26379), not a data node.
            Properties sentinelInfo = connection.info("sentinel");
            info.setSentinelMasters(Integer.parseInt(sentinelInfo.getProperty("sentinel_masters", "0")));
        } finally {
            connection.close();
        }

        return info;
    }

    // Check whether the node we write to still reports itself as master
    public boolean isMasterHealthy() {
        try {
            RedisConnection connection = connectionFactory.getConnection();
            try {
                Properties info = connection.info("replication");
                return "master".equals(info.getProperty("role"));
            } finally {
                connection.close();
            }
        } catch (Exception e) {
            log.error("Error checking master health", e);
            return false;
        }
    }

    // Trigger a manual failover through Sentinel (SENTINEL FAILOVER mymaster)
    public void manualFailover() {
        try {
            RedisSentinelConnection sentinelConnection = connectionFactory.getSentinelConnection();
            // NamedNode only carries the name of the monitored master
            sentinelConnection.failover(() -> "mymaster");
            log.info("Manual failover initiated");
        } catch (Exception e) {
            log.error("Error initiating manual failover", e);
            throw new RuntimeException("Failed to initiate manual failover", e);
        }
    }

    // List the masters this Sentinel deployment is monitoring
    public List<MasterInfo> getMonitoredMasters() {
        List<MasterInfo> masters = new ArrayList<>();

        try {
            RedisSentinelConnection sentinelConnection = connectionFactory.getSentinelConnection();
            Collection<RedisServer> monitored = sentinelConnection.masters();
            // Map each RedisServer (name, host, port, quorum, ...) onto the
            // application's MasterInfo model here.
        } catch (Exception e) {
            log.error("Error getting monitored masters", e);
        }

        return masters;
    }

    // Start monitoring an additional master (SENTINEL MONITOR)
    public void addMaster(String masterName, String host, int port, int quorum) {
        try {
            // RedisServer carries the SENTINEL MONITOR parameters; setter names may
            // vary slightly across Spring Data Redis versions.
            RedisServer master = new RedisServer(host, port);
            master.setName(masterName);
            master.setQuorum((long) quorum);
            connectionFactory.getSentinelConnection().monitor(master);
            log.info("Added master monitoring: {} {}:{}", masterName, host, port);
        } catch (Exception e) {
            log.error("Error adding master monitoring", e);
            throw new RuntimeException("Failed to add master monitoring", e);
        }
    }

    // Stop monitoring a master (SENTINEL REMOVE)
    public void removeMaster(String masterName) {
        try {
            connectionFactory.getSentinelConnection().remove(() -> masterName);
            log.info("Removed master monitoring: {}", masterName);
        } catch (Exception e) {
            log.error("Error removing master monitoring", e);
            throw new RuntimeException("Failed to remove master monitoring", e);
        }
    }
}

4.3 Listening for Sentinel Events

// Redis Sentinel event listener
@Component
@Slf4j
public class RedisSentinelEventListener {

    private final RedisSentinelManager sentinelManager;
    private final ApplicationEventPublisher eventPublisher;

    public RedisSentinelEventListener(RedisSentinelManager sentinelManager,
                                      ApplicationEventPublisher eventPublisher) {
        this.sentinelManager = sentinelManager;
        this.eventPublisher = eventPublisher;
    }

    @EventListener
    public void handleSentinelEvent(SentinelEvent event) {
        log.info("Received sentinel event: {}", event.getType());

        switch (event.getType()) {
            case "+switch-master":
                handleMasterSwitch(event);
                break;
            case "+sdown":
                handleSubjectivelyDown(event);
                break;
            case "-sdown":
                handleSubjectivelyUp(event);
                break;
            case "+odown":
                handleObjectivelyDown(event);
                break;
            case "-odown":
                handleObjectivelyUp(event);
                break;
            case "+slave":
                handleSlaveAdded(event);
                break;
            case "-slave":
                handleSlaveRemoved(event);
                break;
            default:
                log.debug("Unhandled sentinel event: {}", event.getType());
        }
    }

    private void handleMasterSwitch(SentinelEvent event) {
        log.warn("Master switch detected: {}", event.getDetails());

        // Publish a master-switch application event
        MasterSwitchEvent switchEvent = new MasterSwitchEvent(
                event.getMasterName(),
                event.getOldMaster(),
                event.getNewMaster(),
                LocalDateTime.now()
        );
        eventPublisher.publishEvent(switchEvent);

        // Notify operators
        sendMasterSwitchNotification(switchEvent);
    }

    private void handleSubjectivelyDown(SentinelEvent event) {
        log.warn("Subjectively down detected: {}", event.getDetails());

        // Publish a subjectively-down (+sdown) event
        SubjectivelyDownEvent downEvent = new SubjectivelyDownEvent(
                event.getMasterName(),
                event.getDetails(),
                LocalDateTime.now()
        );
        eventPublisher.publishEvent(downEvent);
    }

    private void handleSubjectivelyUp(SentinelEvent event) {
        // The node is no longer considered subjectively down (-sdown)
        log.info("Subjectively down cleared: {}", event.getDetails());
    }

    private void handleObjectivelyDown(SentinelEvent event) {
        log.error("Objectively down detected: {}", event.getDetails());

        // Publish an objectively-down (+odown) event
        ObjectivelyDownEvent downEvent = new ObjectivelyDownEvent(
                event.getMasterName(),
                event.getDetails(),
                LocalDateTime.now()
        );
        eventPublisher.publishEvent(downEvent);

        // Send an emergency notification
        sendEmergencyNotification(downEvent);
    }

    private void handleObjectivelyUp(SentinelEvent event) {
        // The node is no longer considered objectively down (-odown)
        log.info("Objectively down cleared: {}", event.getDetails());
    }

    private void handleSlaveAdded(SentinelEvent event) {
        log.info("Slave added: {}", event.getDetails());

        // Publish a replica-added event
        SlaveAddedEvent addedEvent = new SlaveAddedEvent(
                event.getMasterName(),
                event.getDetails(),
                LocalDateTime.now()
        );
        eventPublisher.publishEvent(addedEvent);
    }

    private void handleSlaveRemoved(SentinelEvent event) {
        log.warn("Slave removed: {}", event.getDetails());

        // Publish a replica-removed event
        SlaveRemovedEvent removedEvent = new SlaveRemovedEvent(
                event.getMasterName(),
                event.getDetails(),
                LocalDateTime.now()
        );
        eventPublisher.publishEvent(removedEvent);
    }

    private void sendMasterSwitchNotification(MasterSwitchEvent event) {
        // Send a master-switch notification
        log.info("Sending master switch notification: {} -> {}",
                event.getOldMaster(), event.getNewMaster());
        // Hook up a notification channel here (e-mail, chat, ...)
    }

    private void sendEmergencyNotification(ObjectivelyDownEvent event) {
        // Send an emergency notification
        log.error("Sending emergency notification for objectively down: {}",
                event.getMasterName());
        // Hook up an emergency notification channel here
    }
}
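
The listener above consumes SentinelEvent objects from the Spring application context, but something still has to produce them. Sentinel announces every state change (+switch-master, +sdown, +odown, ...) on its Pub/Sub channels, so one option is a message listener container pointed at the Sentinel port that republishes each message as an application event. A minimal sketch; the channel names are Sentinel's own, while SentinelEvent and its (type, details) constructor are the application classes assumed by the listener above:

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

@Configuration
public class SentinelPubSubBridgeConfig {

    @Bean
    public RedisConnectionFactory sentinelProcessConnectionFactory() {
        // Connects to the Sentinel process itself (port 26379), not to a data node.
        return new LettuceConnectionFactory(new RedisStandaloneConfiguration("127.0.0.1", 26379));
    }

    @Bean
    public RedisMessageListenerContainer sentinelEventContainer(
            RedisConnectionFactory sentinelProcessConnectionFactory,
            ApplicationEventPublisher eventPublisher) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(sentinelProcessConnectionFactory);
        MessageListener listener = (message, pattern) -> {
            String type = new String(message.getChannel(), StandardCharsets.UTF_8);
            String details = new String(message.getBody(), StandardCharsets.UTF_8);
            // SentinelEvent(type, details) is the application's own event class.
            eventPublisher.publishEvent(new SentinelEvent(type, details));
        };
        container.addMessageListener(listener, Arrays.asList(
                new ChannelTopic("+switch-master"),
                new ChannelTopic("+sdown"),
                new ChannelTopic("-sdown"),
                new ChannelTopic("+odown"),
                new ChannelTopic("-odown"),
                new ChannelTopic("+slave")));
        return container;
    }
}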

5. Redis Cluster Architecture

5.1 Cluster Configuration

# redis-cluster-node-1.conf
port 7000
cluster-enabled yes
cluster-config-file nodes-7000.conf
cluster-node-timeout 5000
appendonly yes
appendfilename "appendonly-7000.aof"
dbfilename dump-7000.rdb
dir /var/lib/redis-cluster/7000
logfile /var/log/redis/cluster-7000.log

# redis-cluster-node-2.conf
port 7001
cluster-enabled yes
cluster-config-file nodes-7001.conf
cluster-node-timeout 5000
appendonly yes
appendfilename "appendonly-7001.aof"
dbfilename dump-7001.rdb
dir /var/lib/redis-cluster/7001
logfile /var/log/redis/cluster-7001.log

# redis-cluster-node-3.conf
port 7002
cluster-enabled yes
cluster-config-file nodes-7002.conf
cluster-node-timeout 5000
appendonly yes
appendfilename "appendonly-7002.aof"
dbfilename dump-7002.rdb
dir /var/lib/redis-cluster/7002
logfile /var/log/redis/cluster-7002.log
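
These files only start three cluster-enabled instances; the cluster itself is then formed with redis-cli --cluster create, which assigns the 16384 hash slots (and replicas, if --cluster-replicas is given). On the application side, Spring Data Redis connects through RedisClusterConfiguration; a minimal sketch using the ports above (a real deployment would list at least three masters plus their replicas):

import java.util.Arrays;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;

@Configuration
public class RedisClusterClientConfig {

    @Bean
    public RedisConnectionFactory clusterConnectionFactory() {
        RedisClusterConfiguration clusterConfig = new RedisClusterConfiguration(
                Arrays.asList("127.0.0.1:7000", "127.0.0.1:7001", "127.0.0.1:7002"));
        clusterConfig.setMaxRedirects(3); // follow up to 3 MOVED/ASK redirections
        return new LettuceConnectionFactory(clusterConfig);
    }
}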

5.2 Cluster Management

// Redis Cluster manager
@Component
@Slf4j
public class RedisClusterManager {

    private final RedisConnectionFactory connectionFactory;

    public RedisClusterManager(RedisConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    // Cluster-wide information
    public ClusterInfo getClusterInfo() {
        ClusterInfo info = new ClusterInfo();
        RedisClusterConnection clusterConnection = connectionFactory.getClusterConnection();

        // Known nodes
        Set<RedisClusterNode> nodes = new LinkedHashSet<>();
        clusterConnection.clusterGetNodes().forEach(nodes::add);
        info.setNodes(nodes);

        // cluster_enabled comes from the "cluster" section of INFO (depending on the
        // driver, the aggregated cluster-wide INFO may prefix keys per node)
        Properties serverInfo = clusterConnection.info("cluster");
        info.setClusterEnabled("1".equals(serverInfo.getProperty("cluster_enabled")));

        // The remaining counters come from CLUSTER INFO; cluster_my_epoch is
        // node-local, so read it per node if you need it.
        org.springframework.data.redis.connection.ClusterInfo clusterState =
                clusterConnection.clusterGetClusterInfo();
        info.setClusterKnownNodes(clusterState.getKnownNodes().intValue());
        info.setClusterSize(clusterState.getClusterSize().intValue());
        info.setClusterCurrentEpoch(clusterState.getCurrentEpoch());

        return info;
    }

    // Per-node status
    public List<ClusterNodeInfo> getClusterNodes() {
        List<ClusterNodeInfo> nodeInfos = new ArrayList<>();
        RedisClusterConnection clusterConnection = connectionFactory.getClusterConnection();

        for (RedisClusterNode node : clusterConnection.clusterGetNodes()) {
            ClusterNodeInfo nodeInfo = new ClusterNodeInfo();
            nodeInfo.setNodeId(node.getId());
            nodeInfo.setHost(node.getHost());
            nodeInfo.setPort(node.getPort());
            nodeInfo.setFlags(node.getFlags());
            nodeInfo.setMasterId(node.getMasterId());
            nodeInfo.setLinkState(node.getLinkState());
            nodeInfo.setSlots(node.getSlotRange().getSlots());
            // ping-sent / pong-received / config-epoch are not exposed by Spring Data
            // Redis' RedisClusterNode; parse CLUSTER NODES yourself if you need them.
            nodeInfos.add(nodeInfo);
        }

        return nodeInfos;
    }

    // Add a node to the cluster (CLUSTER MEET)
    public void addNode(String host, int port) {
        try {
            RedisClusterConnection clusterConnection = connectionFactory.getClusterConnection();
            clusterConnection.clusterMeet(new RedisClusterNode(host, port));
            log.info("Added node to cluster: {}:{}", host, port);
        } catch (Exception e) {
            log.error("Error adding node to cluster", e);
            throw new RuntimeException("Failed to add node to cluster", e);
        }
    }

    // Remove a node from the cluster (CLUSTER FORGET)
    public void removeNode(String nodeId) {
        try {
            RedisClusterConnection clusterConnection = connectionFactory.getClusterConnection();
            for (RedisClusterNode node : clusterConnection.clusterGetNodes()) {
                if (nodeId.equals(node.getId())) {
                    clusterConnection.clusterForget(node);
                    log.info("Removed node from cluster: {}", nodeId);
                    return;
                }
            }
            log.warn("Node {} not found in cluster", nodeId);
        } catch (Exception e) {
            log.error("Error removing node from cluster", e);
            throw new RuntimeException("Failed to remove node from cluster", e);
        }
    }

    // Resharding moves hash slots between masters. It is a multi-step
    // CLUSTER SETSLOT IMPORTING/MIGRATING + MIGRATE sequence that is normally driven
    // by redis-cli --cluster reshard; Spring Data Redis only exposes the low-level
    // building blocks, so this method is intentionally left as a stub.
    public void reshard(int slotCount, String sourceNodeId, String targetNodeId) {
        throw new UnsupportedOperationException(
                "Use redis-cli --cluster reshard (or drive CLUSTER SETSLOT/MIGRATE yourself) to move "
                        + slotCount + " slots from " + sourceNodeId + " to " + targetNodeId);
    }

    // Slot-to-node mapping
    public Map<Integer, String> getSlotMapping() {
        Map<Integer, String> slotMapping = new HashMap<>();
        RedisClusterConnection clusterConnection = connectionFactory.getClusterConnection();

        for (RedisClusterNode node : clusterConnection.clusterGetNodes()) {
            if (node.isMaster()) {
                for (Integer slot : node.getSlotRange().getSlots()) {
                    slotMapping.put(slot, node.getId());
                }
            }
        }

        return slotMapping;
    }

    // Cluster health summary
    public ClusterHealthStatus checkClusterHealth() {
        ClusterHealthStatus status = new ClusterHealthStatus();

        try {
            RedisClusterConnection clusterConnection = connectionFactory.getClusterConnection();
            int totalNodes = 0;
            int healthyNodes = 0;
            int masterNodes = 0;
            int slaveNodes = 0;

            for (RedisClusterNode node : clusterConnection.clusterGetNodes()) {
                totalNodes++;
                if (RedisClusterNode.LinkState.CONNECTED.equals(node.getLinkState())) {
                    healthyNodes++;
                }
                if (node.isMaster()) {
                    masterNodes++;
                } else {
                    slaveNodes++;
                }
            }

            status.setTotalNodes(totalNodes);
            status.setHealthyNodes(healthyNodes);
            status.setMasterNodes(masterNodes);
            status.setSlaveNodes(slaveNodes);
            status.setHealthRatio(totalNodes == 0 ? 0.0 : (double) healthyNodes / totalNodes);
            status.setHealthy(totalNodes > 0 && healthyNodes == totalNodes);

        } catch (Exception e) {
            log.error("Error checking cluster health", e);
            status.setHealthy(false);
            status.setErrorMessage(e.getMessage());
        }

        return status;
    }
}
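
The slot mapping above is easier to reason about once you know which slot a given key hashes to: Redis Cluster assigns every key to one of 16384 slots via CRC16(key) mod 16384, and keys sharing a {hash tag} are hashed on the tag alone so they land in the same slot. A minimal sketch using the SlotHash helper from the Lettuce driver (assumed to be on the classpath, as it is with Spring Boot's default Redis starter):

import io.lettuce.core.cluster.SlotHash;

public class SlotLookup {
    public static void main(String[] args) {
        // CRC16(key) mod 16384
        System.out.println(SlotHash.getSlot("user:1001"));       // some slot in 0..16383
        System.out.println(SlotHash.getSlot("{order}:2024:1"));  // hashed on "order" only
        System.out.println(SlotHash.getSlot("{order}:2024:2"));  // therefore the same slot as above
    }
}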

6. Implementing Read/Write Splitting

6.1 Read/Write Splitting Configuration

// Redis read/write splitting configuration
@Configuration
@EnableConfigurationProperties(RedisProperties.class)
public class RedisReadWriteSplitConfig {

    @Bean
    @Primary
    public RedisConnectionFactory masterConnectionFactory() {
        RedisStandaloneConfiguration config = new RedisStandaloneConfiguration("127.0.0.1", 6379);
        config.setPassword("master_password");
        return new LettuceConnectionFactory(config);
    }

    @Bean
    public RedisConnectionFactory slaveConnectionFactory() {
        RedisStandaloneConfiguration config = new RedisStandaloneConfiguration("127.0.0.1", 6380);
        config.setPassword("slave_password");
        return new LettuceConnectionFactory(config);
    }

    @Bean
    @Primary
    public RedisTemplate<String, Object> masterRedisTemplate() {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(masterConnectionFactory());
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
        return template;
    }

    @Bean
    public RedisTemplate<String, Object> slaveRedisTemplate() {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(slaveConnectionFactory());
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
        return template;
    }
}

6.2 Read/Write Splitting Service

// Redis read/write splitting service
@Service
public class RedisReadWriteSplitService {

    private final RedisTemplate<String, Object> masterTemplate;
    private final RedisTemplate<String, Object> slaveTemplate;
    private final RedisReplicationManager replicationManager;

    public RedisReadWriteSplitService(
            @Qualifier("masterRedisTemplate") RedisTemplate<String, Object> masterTemplate,
            @Qualifier("slaveRedisTemplate") RedisTemplate<String, Object> slaveTemplate,
            RedisReplicationManager replicationManager) {
        this.masterTemplate = masterTemplate;
        this.slaveTemplate = slaveTemplate;
        this.replicationManager = replicationManager;
    }

    // Write operations (master node)
    public void set(String key, Object value) {
        masterTemplate.opsForValue().set(key, value);
    }

    public void set(String key, Object value, Duration timeout) {
        masterTemplate.opsForValue().set(key, value, timeout);
    }

    public void delete(String key) {
        masterTemplate.delete(key);
    }

    public void delete(Collection<String> keys) {
        masterTemplate.delete(keys);
    }

    public void expire(String key, Duration timeout) {
        masterTemplate.expire(key, timeout);
    }

    // Read operations (replica node)
    public Object get(String key) {
        return slaveTemplate.opsForValue().get(key);
    }

    public List<Object> multiGet(Collection<String> keys) {
        return slaveTemplate.opsForValue().multiGet(keys);
    }

    public Boolean exists(String key) {
        return slaveTemplate.hasKey(key);
    }

    public Set<String> keys(String pattern) {
        // KEYS blocks the server; prefer SCAN for large keyspaces
        return slaveTemplate.keys(pattern);
    }

    public Long ttl(String key) {
        return slaveTemplate.getExpire(key);
    }

    // Read from the master (for reads with strict consistency requirements)
    public Object getFromMaster(String key) {
        return masterTemplate.opsForValue().get(key);
    }

    // Check whether the replication lag is within an acceptable bound
    public boolean isReplicationLagAcceptable(long maxLag) {
        long lag = replicationManager.getReplicationLag();
        return lag <= maxLag;
    }

    // Choose the node to read from based on the current replication lag
    public Object getWithLagCheck(String key, long maxLag) {
        if (isReplicationLagAcceptable(maxLag)) {
            return get(key);           // read from the replica
        } else {
            return getFromMaster(key); // fall back to the master
        }
    }
}

7. Failover and Recovery

7.1 Failover Strategy

// Redis failover strategy
@Component
@Slf4j
public class RedisFailoverStrategy {

    private final RedisSentinelManager sentinelManager;
    private final RedisReplicationManager replicationManager;
    private final ApplicationEventPublisher eventPublisher;

    public RedisFailoverStrategy(RedisSentinelManager sentinelManager,
                                 RedisReplicationManager replicationManager,
                                 ApplicationEventPublisher eventPublisher) {
        this.sentinelManager = sentinelManager;
        this.replicationManager = replicationManager;
        this.eventPublisher = eventPublisher;
    }

    // Automatic failover entry point
    @EventListener
    public void handleMasterFailure(MasterFailureEvent event) {
        log.warn("Master failure detected: {}", event.getMasterName());

        try {
            // Check whether a failover can be carried out at all
            if (canPerformFailover()) {
                // Run the failover
                performFailover(event);
            } else {
                log.error("Cannot perform failover - insufficient conditions");
                handleFailoverFailure(event);
            }
        } catch (Exception e) {
            log.error("Error during failover", e);
            handleFailoverFailure(event);
        }
    }

    // Check the preconditions for a failover
    private boolean canPerformFailover() {
        try {
            // Sentinel must be healthy
            SentinelInfo sentinelInfo = sentinelManager.getSentinelInfo();
            if (!sentinelInfo.isHealthy()) {
                log.warn("Sentinel is not healthy");
                return false;
            }

            // At least one replica must be available to promote
            ReplicationInfo replicationInfo = replicationManager.getReplicationInfo();
            if (replicationInfo.getMasterInfo().getConnectedSlaves() == 0) {
                log.warn("No slaves available for failover");
                return false;
            }

            return true;
        } catch (Exception e) {
            log.error("Error checking failover conditions", e);
            return false;
        }
    }

    // Execute the failover
    private void performFailover(MasterFailureEvent event) {
        log.info("Starting failover for master: {}", event.getMasterName());

        try {
            // Trigger the failover through Sentinel
            sentinelManager.manualFailover();

            // Wait for the failover to finish
            waitForFailoverCompletion(event.getMasterName());

            // Verify the result
            if (verifyFailoverSuccess()) {
                log.info("Failover completed successfully");
                publishFailoverSuccessEvent(event);
            } else {
                log.error("Failover verification failed");
                handleFailoverFailure(event);
            }

        } catch (Exception e) {
            log.error("Error during failover execution", e);
            handleFailoverFailure(event);
        }
    }

    // Wait for the failover to complete
    private void waitForFailoverCompletion(String masterName) {
        int maxWaitTime = 300; // 5 minutes
        int waitInterval = 5;  // 5 seconds

        for (int i = 0; i < maxWaitTime; i += waitInterval) {
            try {
                Thread.sleep(waitInterval * 1000);

                // Has the failover finished?
                if (isFailoverComplete()) {
                    return;
                }

            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Failover wait interrupted", e);
            }
        }

        throw new RuntimeException("Failover timeout");
    }

    // Check whether the failover has completed
    private boolean isFailoverComplete() {
        try {
            SentinelInfo sentinelInfo = sentinelManager.getSentinelInfo();
            return sentinelInfo.isHealthy() && "master".equals(sentinelInfo.getMasterRole());
        } catch (Exception e) {
            log.error("Error checking failover completion", e);
            return false;
        }
    }

    // Verify that the failover succeeded
    private boolean verifyFailoverSuccess() {
        try {
            // The newly promoted master must be healthy
            boolean masterHealthy = sentinelManager.isMasterHealthy();

            // Replication towards the remaining replicas must be running
            ReplicationInfo replicationInfo = replicationManager.getReplicationInfo();
            boolean replicationHealthy = replicationInfo.getMasterInfo().getConnectedSlaves() > 0;

            return masterHealthy && replicationHealthy;
        } catch (Exception e) {
            log.error("Error verifying failover success", e);
            return false;
        }
    }

    // Handle a failed failover
    private void handleFailoverFailure(MasterFailureEvent event) {
        log.error("Failover failed for master: {}", event.getMasterName());

        // Publish a failover-failure event
        FailoverFailureEvent failureEvent = new FailoverFailureEvent(
                event.getMasterName(),
                event.getFailureTime(),
                LocalDateTime.now()
        );
        eventPublisher.publishEvent(failureEvent);

        // Send an emergency notification
        sendEmergencyNotification(failureEvent);
    }

    // Publish a failover-success event
    private void publishFailoverSuccessEvent(MasterFailureEvent event) {
        FailoverSuccessEvent successEvent = new FailoverSuccessEvent(
                event.getMasterName(),
                event.getFailureTime(),
                LocalDateTime.now()
        );
        eventPublisher.publishEvent(successEvent);
    }

    // Send an emergency notification
    private void sendEmergencyNotification(FailoverFailureEvent event) {
        log.error("Sending emergency notification for failed failover: {}",
                event.getMasterName());
        // Hook up an emergency notification channel here
    }
}

7.2 Data Recovery Strategy

// Redis data recovery strategy
@Service
@Slf4j
public class RedisDataRecoveryStrategy {

    private final RedisReplicationManager replicationManager;
    private final RedisTemplate<String, Object> masterTemplate;
    private final RedisTemplate<String, Object> slaveTemplate;

    public RedisDataRecoveryStrategy(RedisReplicationManager replicationManager,
            @Qualifier("masterRedisTemplate") RedisTemplate<String, Object> masterTemplate,
            @Qualifier("slaveRedisTemplate") RedisTemplate<String, Object> slaveTemplate) {
        this.replicationManager = replicationManager;
        this.masterTemplate = masterTemplate;
        this.slaveTemplate = slaveTemplate;
    }

    // Consistency check between master and replica
    public DataConsistencyReport checkDataConsistency() {
        DataConsistencyReport report = new DataConsistencyReport();

        try {
            // Fetch the key sets of both nodes.
            // Note: KEYS * blocks the server and is only acceptable for small datasets;
            // for production-sized keyspaces use SCAN (see the sketch after this class).
            Set<String> masterKeys = masterTemplate.keys("*");
            Set<String> slaveKeys = slaveTemplate.keys("*");

            // Compare the key sets
            Set<String> missingInSlave = new HashSet<>(masterKeys);
            missingInSlave.removeAll(slaveKeys);

            Set<String> extraInSlave = new HashSet<>(slaveKeys);
            extraInSlave.removeAll(masterKeys);

            // Compare values for the keys both nodes have
            List<String> inconsistentValues = new ArrayList<>();
            Set<String> commonKeys = new HashSet<>(masterKeys);
            commonKeys.retainAll(slaveKeys);

            for (String key : commonKeys) {
                Object masterValue = masterTemplate.opsForValue().get(key);
                Object slaveValue = slaveTemplate.opsForValue().get(key);

                if (!Objects.equals(masterValue, slaveValue)) {
                    inconsistentValues.add(key);
                }
            }

            report.setMasterKeyCount(masterKeys.size());
            report.setSlaveKeyCount(slaveKeys.size());
            report.setMissingInSlave(missingInSlave);
            report.setExtraInSlave(extraInSlave);
            report.setInconsistentValues(inconsistentValues);
            report.setConsistent(missingInSlave.isEmpty() && extraInSlave.isEmpty() && inconsistentValues.isEmpty());
            report.setCheckTime(LocalDateTime.now());

        } catch (Exception e) {
            log.error("Error checking data consistency", e);
            report.setConsistent(false);
            report.setErrorMessage(e.getMessage());
        }

        return report;
    }

    // Repair inconsistencies found by the check above.
    // Note: writing to the replica requires replica-read-only no, and such local
    // writes are discarded again on the next full resync.
    public void repairDataInconsistency(DataConsistencyReport report) {
        log.info("Starting data consistency repair");

        try {
            // Copy keys that are missing on the replica
            for (String key : report.getMissingInSlave()) {
                Object value = masterTemplate.opsForValue().get(key);
                slaveTemplate.opsForValue().set(key, value);
                log.debug("Repaired missing key: {}", key);
            }

            // Remove keys that only exist on the replica
            for (String key : report.getExtraInSlave()) {
                slaveTemplate.delete(key);
                log.debug("Removed extra key: {}", key);
            }

            // Overwrite values that differ
            for (String key : report.getInconsistentValues()) {
                Object masterValue = masterTemplate.opsForValue().get(key);
                slaveTemplate.opsForValue().set(key, masterValue);
                log.debug("Repaired inconsistent value for key: {}", key);
            }

            log.info("Data consistency repair completed");

        } catch (Exception e) {
            log.error("Error during data consistency repair", e);
            throw new RuntimeException("Failed to repair data consistency", e);
        }
    }

    // Force a resynchronisation
    public void forceSync() {
        log.info("Starting force sync");

        try {
            // Ask the replica to re-attach to its master
            replicationManager.forceSync();

            // Wait until the replication lag disappears
            waitForSyncCompletion();

            log.info("Force sync completed");

        } catch (Exception e) {
            log.error("Error during force sync", e);
            throw new RuntimeException("Failed to force sync", e);
        }
    }

    // Wait for the resynchronisation to complete
    private void waitForSyncCompletion() {
        int maxWaitTime = 60;  // 1 minute
        int waitInterval = 2;  // 2 seconds

        for (int i = 0; i < maxWaitTime; i += waitInterval) {
            try {
                Thread.sleep(waitInterval * 1000);

                // Has the replica caught up?
                long lag = replicationManager.getReplicationLag();
                if (lag <= 0) {
                    return;
                }

            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Sync wait interrupted", e);
            }
        }

        throw new RuntimeException("Sync timeout");
    }

    // Restore from a backup
    public void restoreFromBackup(String backupPath) {
        log.info("Starting restore from backup: {}", backupPath);

        try {
            // Stop the replica
            // (implement according to how the Redis processes are managed)

            // Restore the backup data
            // (implement according to the backup format, e.g. copy the RDB file into place)

            // Restart the replica
            // (implement according to how the Redis processes are managed)

            log.info("Restore from backup completed");

        } catch (Exception e) {
            log.error("Error during restore from backup", e);
            throw new RuntimeException("Failed to restore from backup", e);
        }
    }
}
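
As noted in the consistency check, KEYS * walks the whole keyspace in one blocking call. For anything beyond small datasets, the cursor-based SCAN command is the safer way to enumerate keys; a minimal sketch using the connection-level API of Spring Data Redis (pattern and batch size are illustrative):

import java.util.HashSet;
import java.util.Set;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.ScanOptions;

public class KeyScanner {

    // Enumerates keys matching a pattern with SCAN, which iterates in small batches
    // instead of blocking the server the way KEYS does.
    public static Set<String> scanKeys(RedisConnectionFactory factory, String pattern) {
        Set<String> keys = new HashSet<>();
        RedisConnection connection = factory.getConnection();
        try (Cursor<byte[]> cursor = connection.scan(
                ScanOptions.scanOptions().match(pattern).count(1000).build())) {
            while (cursor.hasNext()) {
                keys.add(new String(cursor.next()));
            }
        } finally {
            connection.close();
        }
        return keys;
    }
}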

8. Monitoring and Alerting

8.1 Collecting Monitoring Metrics

// Redis monitoring metrics collector
@Component
@Slf4j
public class RedisMetricsCollector {

    private final RedisReplicationManager replicationManager;
    private final RedisSentinelManager sentinelManager;
    private final MeterRegistry meterRegistry;

    public RedisMetricsCollector(RedisReplicationManager replicationManager,
                                 RedisSentinelManager sentinelManager,
                                 MeterRegistry meterRegistry) {
        this.replicationManager = replicationManager;
        this.sentinelManager = sentinelManager;
        this.meterRegistry = meterRegistry;
    }

    @Scheduled(fixedDelay = 10000) // collect every 10 seconds
    public void collectMetrics() {
        try {
            // Replication metrics
            collectReplicationMetrics();

            // Sentinel metrics
            collectSentinelMetrics();

            // Performance metrics
            collectPerformanceMetrics();

        } catch (Exception e) {
            log.error("Error collecting Redis metrics", e);
        }
    }

    // Replication metrics (for gauges that keep updating, prefer the AtomicLong
    // registration pattern shown in section 3.2)
    private void collectReplicationMetrics() {
        try {
            ReplicationInfo info = replicationManager.getReplicationInfo();
            long lag = replicationManager.getReplicationLag();

            // Replication lag in bytes
            meterRegistry.gauge("redis.replication.lag", lag);

            // Number of connected replicas
            meterRegistry.gauge("redis.replication.connected_slaves",
                    info.getMasterInfo().getConnectedSlaves());

            // Master replication offset
            meterRegistry.gauge("redis.replication.master_offset",
                    info.getMasterInfo().getMasterReplOffset());

            // Replica replication offset
            meterRegistry.gauge("redis.replication.slave_offset",
                    info.getSlaveInfo().getMasterReplOffset());

            // Replication backlog size
            meterRegistry.gauge("redis.replication.backlog_size",
                    info.getMasterInfo().getReplBacklogSize());

        } catch (Exception e) {
            log.error("Error collecting replication metrics", e);
        }
    }

    // Sentinel metrics
    private void collectSentinelMetrics() {
        try {
            SentinelInfo info = sentinelManager.getSentinelInfo();

            // Sentinel health flag
            meterRegistry.gauge("redis.sentinel.healthy", info.isHealthy() ? 1 : 0);

            // Number of monitored masters
            meterRegistry.gauge("redis.sentinel.monitored_masters",
                    info.getSentinelMasters());

        } catch (Exception e) {
            log.error("Error collecting sentinel metrics", e);
        }
    }

    // Performance metrics
    private void collectPerformanceMetrics() {
        try {
            // Collect further server metrics here, e.g. memory usage, client
            // connections and command latency (see the sketch after this class).

        } catch (Exception e) {
            log.error("Error collecting performance metrics", e);
        }
    }
}
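
The collectPerformanceMetrics() stub above can be filled in from the standard INFO sections (memory, clients, stats, ...). A minimal sketch that samples used memory and connected clients into long-lived gauges, following the AtomicLong pattern from section 3.2 (metric names are illustrative):

import io.micrometer.core.instrument.MeterRegistry;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;

public class RedisServerMetricsSampler {

    private final AtomicLong usedMemoryBytes = new AtomicLong();
    private final AtomicLong connectedClients = new AtomicLong();

    public RedisServerMetricsSampler(MeterRegistry meterRegistry) {
        meterRegistry.gauge("redis.memory.used_bytes", usedMemoryBytes);
        meterRegistry.gauge("redis.clients.connected", connectedClients);
    }

    // Call periodically, e.g. from collectPerformanceMetrics() above.
    public void sample(RedisConnectionFactory factory) {
        RedisConnection connection = factory.getConnection();
        try {
            Properties memory = connection.info("memory");    // used_memory, maxmemory, ...
            Properties clients = connection.info("clients");  // connected_clients, blocked_clients, ...
            usedMemoryBytes.set(Long.parseLong(memory.getProperty("used_memory", "0")));
            connectedClients.set(Long.parseLong(clients.getProperty("connected_clients", "0")));
        } finally {
            connection.close();
        }
    }
}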

8.2 Alert Rule Configuration

// Redis alert rule configuration
@Configuration
public class RedisAlertRules {

    @Bean
    public AlertRule replicationLagAlertRule() {
        return AlertRule.builder()
                .name("Redis Replication Lag High")
                .description("Redis replication lag is too high")
                .condition("redis.replication.lag > 1000000")
                .severity(AlertSeverity.WARNING)
                .enabled(true)
                .build();
    }

    @Bean
    public AlertRule noSlavesConnectedAlertRule() {
        return AlertRule.builder()
                .name("No Redis Slaves Connected")
                .description("No slaves connected to Redis master")
                .condition("redis.replication.connected_slaves == 0")
                .severity(AlertSeverity.CRITICAL)
                .enabled(true)
                .build();
    }

    @Bean
    public AlertRule sentinelUnhealthyAlertRule() {
        return AlertRule.builder()
                .name("Redis Sentinel Unhealthy")
                .description("Redis sentinel is not healthy")
                .condition("redis.sentinel.healthy == 0")
                .severity(AlertSeverity.CRITICAL)
                .enabled(true)
                .build();
    }

    @Bean
    public AlertRule masterFailureAlertRule() {
        return AlertRule.builder()
                .name("Redis Master Failure")
                .description("Redis master node has failed")
                .condition("redis.master.healthy == 0")
                .severity(AlertSeverity.CRITICAL)
                .enabled(true)
                .build();
    }
}

9. Summary

Redis master-slave replication is the foundation of a highly available Redis architecture. Combined with Sentinel and Redis Cluster, it delivers high availability, data redundancy, and load balancing. The implementation approach described in this article provides a complete technical blueprint for enterprise-grade Redis deployments.

9.1 Technical Strengths

  1. High availability: replication plus Sentinel provides automatic failover
  2. Data safety: multiple copies of the data guard against data loss
  3. Performance: read/write splitting improves overall system throughput
  4. Scalability: replicas can be added dynamically to scale read capacity
  5. Observability: comprehensive monitoring and alerting mechanisms

9.2 Implementation Essentials

  1. Configuration tuning: choose replication parameters and timeouts carefully
  2. Network stability: ensure a stable network link between master and replicas
  3. Monitoring and alerting: build a complete monitoring and alerting pipeline
  4. Failover drills: rehearse failover regularly
  5. Data consistency: check master/replica consistency periodically

9.3 Best Practices

  1. Deployment topology: spread nodes across data centers to improve disaster tolerance
  2. Capacity planning: size master and replica nodes for both data volume and traffic
  3. Security: enable password authentication and restrict network access
  4. Backup strategy: maintain a solid backup and restore process
  5. Version management: keep master and replicas on the same Redis version

With the material in this article you should now have a solid grasp of Redis master-slave replication and be able to design and operate a highly available Redis architecture that provides reliable data storage for your applications.