Episode 200 — Redis Master-Slave Replication and High Availability in Practice: Production-Grade Solutions with Master-Slave Sync, Sentinel Mode, and Cluster Architecture
1. Overview of Redis Master-Slave Replication
Redis master-slave replication is the foundation of Redis high availability: data written to the master node is copied to one or more replica (slave) nodes, providing redundant copies and enabling read-write splitting. This article covers the core techniques in detail — how replication works and how to configure it, Sentinel mode, and the cluster architecture — together with best practices for production environments.
1.1 Benefits of Master-Slave Replication
- Data redundancy: replicas hold additional copies of the data, improving data safety
- Read-write splitting: the master handles writes while replicas serve reads
- Load balancing: read traffic can be spread across multiple replicas
- Failure recovery: if the master fails, a replica can be promoted to master
- Scalability: replicas can be added dynamically to scale read throughput
1.2 How Master-Slave Replication Works
```text
┌─────────────────────┐     ┌─────────────────────┐     ┌─────────────────────┐
│ Master              │     │ Slave 1             │     │ Slave 2             │
│  ┌───────────────┐  │     │  ┌───────────────┐  │     │  ┌───────────────┐  │
│  │ write ops     │  │     │  │ read ops      │  │     │  │ read ops      │  │
│  └───────────────┘  │     │  └───────────────┘  │     │  └───────────────┘  │
│  ┌───────────────┐  │ ──▶ │  ┌───────────────┐  │ ──▶ │  ┌───────────────┐  │
│  │ repl buffer   │  │     │  │ repl buffer   │  │     │  │ repl buffer   │  │
│  └───────────────┘  │     │  └───────────────┘  │     │  └───────────────┘  │
└─────────────────────┘     └─────────────────────┘     └─────────────────────┘
```
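The replica is the side that initiates replication: on first contact it receives a full snapshot from the master, after which the master continuously streams write commands and keeps a replication backlog so that briefly disconnected replicas can resynchronize partially. As a minimal sketch — assuming a master on 127.0.0.1:6379 protected by `master_password` (as in the configuration below) and a replica already running on port 6380 — replication can also be attached at runtime from the command line:

```bash
# On the replica: supply the master password, then point the node at the master
redis-cli -p 6380 config set masterauth master_password
redis-cli -p 6380 replicaof 127.0.0.1 6379

# Watch the initial full sync complete and the offsets start advancing
redis-cli -p 6380 info replication
```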
2. Configuring Master-Slave Replication
2.1 Master Node Configuration
```conf
# /etc/redis/redis-master.conf
port 6379
bind 0.0.0.0
protected-mode no

# RDB snapshots
save 900 1
save 300 10
save 60 10000
dbfilename dump.rdb
dir /var/lib/redis

# AOF persistence
appendonly yes
appendfilename "appendonly.aof"
appendfsync everysec

# Replication settings
repl-diskless-sync no
repl-diskless-sync-delay 5
repl-ping-slave-period 10
repl-timeout 60
repl-disable-tcp-nodelay no
repl-backlog-size 1mb
repl-backlog-ttl 3600

# Memory
maxmemory 2gb
maxmemory-policy allkeys-lru

# Logging
loglevel notice
logfile /var/log/redis/redis-master.log

# Authentication
requirepass master_password
```
2.2 Replica (Slave) Node Configuration
```conf
# /etc/redis/redis-slave.conf
port 6380
bind 0.0.0.0
protected-mode no

# Replication: point this node at the master and authenticate against it
replicaof 127.0.0.1 6379
masterauth master_password
replica-serve-stale-data yes
replica-read-only yes
repl-diskless-sync no
repl-diskless-sync-delay 5
repl-ping-slave-period 10
repl-timeout 60
repl-disable-tcp-nodelay no
repl-backlog-size 1mb
repl-backlog-ttl 3600

# RDB snapshots
save 900 1
save 300 10
save 60 10000
dbfilename dump-slave.rdb
dir /var/lib/redis-slave

# AOF persistence
appendonly yes
appendfilename "appendonly-slave.aof"
appendfsync everysec

# Memory
maxmemory 2gb
maxmemory-policy allkeys-lru

# Logging
loglevel notice
logfile /var/log/redis/redis-slave.log

# Authentication for clients connecting to this replica
requirepass slave_password
```
2.3 Starting the Master and Replica
```bash
# Start the master and the replica
redis-server /etc/redis/redis-master.conf
redis-server /etc/redis/redis-slave.conf

# Verify replication status on both nodes
# (add -a <password> if requirepass is enabled, as in the configs above)
redis-cli -p 6379 info replication
redis-cli -p 6380 info replication
```
3. Implementing Master-Slave Replication
3.1 Master-Slave Replication Workflow
```java
@Component
public class RedisReplicationManager {

    private final RedisTemplate<String, Object> masterRedisTemplate;
    private final RedisTemplate<String, Object> slaveRedisTemplate;
    private final RedisConnectionFactory masterConnectionFactory;
    private final RedisConnectionFactory slaveConnectionFactory;

    public RedisReplicationManager(
            @Qualifier("masterRedisTemplate") RedisTemplate<String, Object> masterRedisTemplate,
            @Qualifier("slaveRedisTemplate") RedisTemplate<String, Object> slaveRedisTemplate,
            @Qualifier("masterConnectionFactory") RedisConnectionFactory masterConnectionFactory,
            @Qualifier("slaveConnectionFactory") RedisConnectionFactory slaveConnectionFactory) {
        this.masterRedisTemplate = masterRedisTemplate;
        this.slaveRedisTemplate = slaveRedisTemplate;
        this.masterConnectionFactory = masterConnectionFactory;
        this.slaveConnectionFactory = slaveConnectionFactory;
    }

    // Writes are routed to the master
    public void set(String key, Object value) {
        masterRedisTemplate.opsForValue().set(key, value);
    }

    public void set(String key, Object value, Duration timeout) {
        masterRedisTemplate.opsForValue().set(key, value, timeout);
    }

    public void delete(String key) {
        masterRedisTemplate.delete(key);
    }

    public void delete(Collection<String> keys) {
        masterRedisTemplate.delete(keys);
    }

    // Reads are routed to the replica
    public Object get(String key) {
        return slaveRedisTemplate.opsForValue().get(key);
    }

    public List<Object> multiGet(Collection<String> keys) {
        return slaveRedisTemplate.opsForValue().multiGet(keys);
    }

    public Boolean exists(String key) {
        return slaveRedisTemplate.hasKey(key);
    }

    public Set<String> keys(String pattern) {
        return slaveRedisTemplate.keys(pattern);
    }

    // Collect INFO replication from both nodes
    public ReplicationInfo getReplicationInfo() {
        ReplicationInfo info = new ReplicationInfo();
        RedisConnection masterConnection = masterConnectionFactory.getConnection();
        Properties masterInfo = masterConnection.info("replication");
        info.setMasterInfo(parseReplicationInfo(masterInfo));
        RedisConnection slaveConnection = slaveConnectionFactory.getConnection();
        Properties slaveInfo = slaveConnection.info("replication");
        info.setSlaveInfo(parseReplicationInfo(slaveInfo));
        return info;
    }

    private ReplicationNodeInfo parseReplicationInfo(Properties info) {
        ReplicationNodeInfo nodeInfo = new ReplicationNodeInfo();
        nodeInfo.setRole(info.getProperty("role"));
        nodeInfo.setConnectedSlaves(Integer.parseInt(info.getProperty("connected_slaves", "0")));
        nodeInfo.setMasterReplOffset(Long.parseLong(info.getProperty("master_repl_offset", "0")));
        nodeInfo.setReplBacklogActive(Integer.parseInt(info.getProperty("repl_backlog_active", "0")));
        nodeInfo.setReplBacklogSize(Long.parseLong(info.getProperty("repl_backlog_size", "0")));
        nodeInfo.setReplBacklogFirstByteOffset(Long.parseLong(info.getProperty("repl_backlog_first_byte_offset", "0")));
        nodeInfo.setReplBacklogHistlen(Long.parseLong(info.getProperty("repl_backlog_histlen", "0")));
        return nodeInfo;
    }

    // Replication lag, measured as the difference between master and replica offsets
    public long getReplicationLag() {
        RedisConnection masterConnection = masterConnectionFactory.getConnection();
        Properties masterInfo = masterConnection.info("replication");
        long masterOffset = Long.parseLong(masterInfo.getProperty("master_repl_offset", "0"));
        RedisConnection slaveConnection = slaveConnectionFactory.getConnection();
        Properties slaveInfo = slaveConnection.info("replication");
        long slaveOffset = Long.parseLong(slaveInfo.getProperty("master_repl_offset", "0"));
        return masterOffset - slaveOffset;
    }

    public void forceSync() {
        RedisConnection masterConnection = masterConnectionFactory.getConnection();
        masterConnection.sync();
    }
}
```
3.2 Replication Monitoring
```java
@Service
@Slf4j
public class RedisReplicationMonitor {

    private final RedisReplicationManager replicationManager;
    private final MeterRegistry meterRegistry;

    public RedisReplicationMonitor(RedisReplicationManager replicationManager,
                                   MeterRegistry meterRegistry) {
        this.replicationManager = replicationManager;
        this.meterRegistry = meterRegistry;
    }

    @Scheduled(fixedDelay = 30000)
    public void monitorReplication() {
        try {
            ReplicationInfo info = replicationManager.getReplicationInfo();
            long lag = replicationManager.getReplicationLag();
            meterRegistry.gauge("redis.replication.lag", lag);
            meterRegistry.gauge("redis.master.connected_slaves", info.getMasterInfo().getConnectedSlaves());
            if (lag > 1000000) {
                log.warn("Redis replication lag is high: {} bytes", lag);
                alertHighReplicationLag(lag);
            }
            if (info.getMasterInfo().getConnectedSlaves() == 0) {
                log.error("No slaves connected to master!");
                alertNoSlavesConnected();
            }
            log.debug("Replication status - Master: {}, Slave: {}, Lag: {} bytes",
                    info.getMasterInfo().getRole(), info.getSlaveInfo().getRole(), lag);
        } catch (Exception e) {
            log.error("Error monitoring Redis replication", e);
        }
    }

    private void alertHighReplicationLag(long lag) {
        log.error("ALERT: High Redis replication lag detected: {} bytes", lag);
    }

    private void alertNoSlavesConnected() {
        log.error("ALERT: No slaves connected to Redis master!");
    }

    public ReplicationStats getReplicationStats() {
        ReplicationInfo info = replicationManager.getReplicationInfo();
        long lag = replicationManager.getReplicationLag();
        return ReplicationStats.builder()
                .masterRole(info.getMasterInfo().getRole())
                .slaveRole(info.getSlaveInfo().getRole())
                .connectedSlaves(info.getMasterInfo().getConnectedSlaves())
                .replicationLag(lag)
                .masterOffset(info.getMasterInfo().getMasterReplOffset())
                .slaveOffset(info.getSlaveInfo().getMasterReplOffset())
                .backlogSize(info.getMasterInfo().getReplBacklogSize())
                .backlogActive(info.getMasterInfo().getReplBacklogActive())
                .timestamp(LocalDateTime.now())
                .build();
    }
}
```
4. Redis Sentinel Mode
4.1 Sentinel Configuration
```conf
# /etc/redis/sentinel.conf
port 26379
bind 0.0.0.0
protected-mode no

# Monitor the master "mymaster"; a quorum of 2 sentinels must agree before failover
sentinel monitor mymaster 127.0.0.1 6379 2
sentinel auth-pass mymaster master_password
sentinel down-after-milliseconds mymaster 30000
sentinel parallel-syncs mymaster 1
sentinel failover-timeout mymaster 180000
sentinel deny-scripts-reconfig yes

# Logging
logfile /var/log/redis/sentinel.log
loglevel notice

# Notification and client-reconfiguration scripts
sentinel notification-script mymaster /etc/redis/notify.sh
sentinel client-reconfig-script mymaster /etc/redis/reconfig.sh

# Hostname handling
sentinel resolve-hostnames no
sentinel announce-hostnames no
```
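On the application side, clients should connect through the sentinels rather than to a fixed master address, so that a failover is transparent. Below is a minimal sketch using Spring Data Redis with Lettuce; the bean names, the extra sentinel addresses on ports 26380/26381, and the shared `master_password` are assumptions for illustration (the sentinel config above only defines the 26379 instance):

```java
@Configuration
public class RedisSentinelClientConfig {

    @Bean
    public LettuceConnectionFactory sentinelConnectionFactory() {
        // The client asks the sentinels for the current master of "mymaster"
        RedisSentinelConfiguration sentinelConfig = new RedisSentinelConfiguration()
                .master("mymaster")
                .sentinel("127.0.0.1", 26379)
                .sentinel("127.0.0.1", 26380)
                .sentinel("127.0.0.1", 26381);
        sentinelConfig.setPassword(RedisPassword.of("master_password"));
        return new LettuceConnectionFactory(sentinelConfig);
    }

    @Bean
    public RedisTemplate<String, Object> sentinelRedisTemplate(LettuceConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        return template;
    }
}
```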
4.2 Sentinel Mode Implementation
```java
@Component
@Slf4j
public class RedisSentinelManager {

    private final RedisTemplate<String, Object> redisTemplate;
    private final RedisConnectionFactory connectionFactory;
    private final ApplicationEventPublisher eventPublisher;

    public RedisSentinelManager(RedisTemplate<String, Object> redisTemplate,
                                RedisConnectionFactory connectionFactory,
                                ApplicationEventPublisher eventPublisher) {
        this.redisTemplate = redisTemplate;
        this.connectionFactory = connectionFactory;
        this.eventPublisher = eventPublisher;
    }

    public SentinelInfo getSentinelInfo() {
        SentinelInfo info = new SentinelInfo();
        RedisConnection connection = connectionFactory.getConnection();
        Properties masterInfo = connection.info("replication");
        info.setMasterRole(masterInfo.getProperty("role"));
        info.setMasterHost(masterInfo.getProperty("master_host"));
        info.setMasterPort(Integer.parseInt(masterInfo.getProperty("master_port", "0")));
        Properties sentinelInfo = connection.info("sentinel");
        info.setSentinelMasters(Integer.parseInt(sentinelInfo.getProperty("sentinel_masters", "0")));
        return info;
    }

    public boolean isMasterHealthy() {
        try {
            RedisConnection connection = connectionFactory.getConnection();
            Properties info = connection.info("replication");
            String role = info.getProperty("role");
            return "master".equals(role);
        } catch (Exception e) {
            log.error("Error checking master health", e);
            return false;
        }
    }

    public void manualFailover() {
        try {
            RedisConnection connection = connectionFactory.getConnection();
            connection.sentinelFailover("mymaster");
            log.info("Manual failover initiated");
        } catch (Exception e) {
            log.error("Error initiating manual failover", e);
            throw new RuntimeException("Failed to initiate manual failover", e);
        }
    }

    public List<MasterInfo> getMonitoredMasters() {
        List<MasterInfo> masters = new ArrayList<>();
        try {
            RedisConnection connection = connectionFactory.getConnection();
            Properties info = connection.info("sentinel");
            String mastersInfo = info.getProperty("masters");
            if (mastersInfo != null) {
                // parsing of the master list is left as a placeholder
            }
        } catch (Exception e) {
            log.error("Error getting monitored masters", e);
        }
        return masters;
    }

    public void addMaster(String masterName, String host, int port, int quorum) {
        try {
            RedisConnection connection = connectionFactory.getConnection();
            connection.sentinelMonitor(masterName, host, port, quorum);
            log.info("Added master monitoring: {} {}:{}", masterName, host, port);
        } catch (Exception e) {
            log.error("Error adding master monitoring", e);
            throw new RuntimeException("Failed to add master monitoring", e);
        }
    }

    public void removeMaster(String masterName) {
        try {
            RedisConnection connection = connectionFactory.getConnection();
            connection.sentinelRemove(masterName);
            log.info("Removed master monitoring: {}", masterName);
        } catch (Exception e) {
            log.error("Error removing master monitoring", e);
            throw new RuntimeException("Failed to remove master monitoring", e);
        }
    }
}
```
4.3 Listening for Sentinel Events
```java
@Component
@Slf4j
public class RedisSentinelEventListener {

    private final RedisSentinelManager sentinelManager;
    private final ApplicationEventPublisher eventPublisher;

    public RedisSentinelEventListener(RedisSentinelManager sentinelManager,
                                      ApplicationEventPublisher eventPublisher) {
        this.sentinelManager = sentinelManager;
        this.eventPublisher = eventPublisher;
    }

    @EventListener
    public void handleSentinelEvent(SentinelEvent event) {
        log.info("Received sentinel event: {}", event.getType());
        switch (event.getType()) {
            case "+switch-master":
                handleMasterSwitch(event);
                break;
            case "+sdown":
                handleSubjectivelyDown(event);
                break;
            case "-sdown":
                handleSubjectivelyUp(event);
                break;
            case "+odown":
                handleObjectivelyDown(event);
                break;
            case "-odown":
                handleObjectivelyUp(event);
                break;
            case "+slave":
                handleSlaveAdded(event);
                break;
            case "-slave":
                handleSlaveRemoved(event);
                break;
            default:
                log.debug("Unhandled sentinel event: {}", event.getType());
        }
    }

    private void handleMasterSwitch(SentinelEvent event) {
        log.warn("Master switch detected: {}", event.getDetails());
        MasterSwitchEvent switchEvent = new MasterSwitchEvent(
                event.getMasterName(),
                event.getOldMaster(),
                event.getNewMaster(),
                LocalDateTime.now());
        eventPublisher.publishEvent(switchEvent);
        sendMasterSwitchNotification(switchEvent);
    }

    private void handleSubjectivelyDown(SentinelEvent event) {
        log.warn("Subjectively down detected: {}", event.getDetails());
        SubjectivelyDownEvent downEvent = new SubjectivelyDownEvent(
                event.getMasterName(),
                event.getDetails(),
                LocalDateTime.now());
        eventPublisher.publishEvent(downEvent);
    }

    // Recovery events: log that the down state has been cleared
    private void handleSubjectivelyUp(SentinelEvent event) {
        log.info("Subjectively down cleared: {}", event.getDetails());
    }

    private void handleObjectivelyUp(SentinelEvent event) {
        log.info("Objectively down cleared: {}", event.getDetails());
    }

    private void handleObjectivelyDown(SentinelEvent event) {
        log.error("Objectively down detected: {}", event.getDetails());
        ObjectivelyDownEvent downEvent = new ObjectivelyDownEvent(
                event.getMasterName(),
                event.getDetails(),
                LocalDateTime.now());
        eventPublisher.publishEvent(downEvent);
        sendEmergencyNotification(downEvent);
    }

    private void handleSlaveAdded(SentinelEvent event) {
        log.info("Slave added: {}", event.getDetails());
        SlaveAddedEvent addedEvent = new SlaveAddedEvent(
                event.getMasterName(),
                event.getDetails(),
                LocalDateTime.now());
        eventPublisher.publishEvent(addedEvent);
    }

    private void handleSlaveRemoved(SentinelEvent event) {
        log.warn("Slave removed: {}", event.getDetails());
        SlaveRemovedEvent removedEvent = new SlaveRemovedEvent(
                event.getMasterName(),
                event.getDetails(),
                LocalDateTime.now());
        eventPublisher.publishEvent(removedEvent);
    }

    private void sendMasterSwitchNotification(MasterSwitchEvent event) {
        log.info("Sending master switch notification: {} -> {}",
                event.getOldMaster(), event.getNewMaster());
    }

    private void sendEmergencyNotification(ObjectivelyDownEvent event) {
        log.error("Sending emergency notification for objectively down: {}",
                event.getMasterName());
    }
}
```
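The listener above consumes application-level SentinelEvent objects, but Sentinel itself announces these events (+switch-master, +sdown, +odown, and so on) over its own Pub/Sub channels. A minimal sketch of how those messages could be forwarded into the Spring event pipeline is shown below; the Lettuce client is used directly, the sentinel address is assumed to be 127.0.0.1:26379, and the SentinelEvent(channel, message) constructor is a hypothetical adapter for the event class used above:

```java
@Component
public class SentinelEventSubscriber {

    private final ApplicationEventPublisher eventPublisher;

    public SentinelEventSubscriber(ApplicationEventPublisher eventPublisher) {
        this.eventPublisher = eventPublisher;
    }

    @PostConstruct
    public void subscribe() {
        // Connect to the sentinel instance and subscribe to all of its event channels
        RedisClient client = RedisClient.create(RedisURI.create("redis://127.0.0.1:26379"));
        StatefulRedisPubSubConnection<String, String> connection = client.connectPubSub();
        connection.addListener(new RedisPubSubAdapter<String, String>() {
            @Override
            public void message(String pattern, String channel, String message) {
                // channel is the event type (e.g. "+switch-master"), message its payload
                eventPublisher.publishEvent(new SentinelEvent(channel, message));
            }
        });
        connection.sync().psubscribe("*");
    }
}
```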
5. Redis Cluster Architecture
5.1 Cluster Configuration
```conf
# Node 1 (port 7000)
port 7000
cluster-enabled yes
cluster-config-file nodes-7000.conf
cluster-node-timeout 5000
appendonly yes
appendfilename "appendonly-7000.aof"
dbfilename dump-7000.rdb
dir /var/lib/redis-cluster/7000
logfile /var/log/redis/cluster-7000.log

# Node 2 (port 7001)
port 7001
cluster-enabled yes
cluster-config-file nodes-7001.conf
cluster-node-timeout 5000
appendonly yes
appendfilename "appendonly-7001.aof"
dbfilename dump-7001.rdb
dir /var/lib/redis-cluster/7001
logfile /var/log/redis/cluster-7001.log

# Node 3 (port 7002)
port 7002
cluster-enabled yes
cluster-config-file nodes-7002.conf
cluster-node-timeout 5000
appendonly yes
appendfilename "appendonly-7002.aof"
dbfilename dump-7002.rdb
dir /var/lib/redis-cluster/7002
logfile /var/log/redis/cluster-7002.log
```
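Starting the three nodes only gives you three independent cluster-enabled instances; the cluster itself still has to be formed and the 16384 hash slots assigned. A minimal sketch, assuming the three local nodes above and no replicas:

```bash
# Form a 3-master cluster from the nodes above (no replicas in this minimal example)
redis-cli --cluster create 127.0.0.1:7000 127.0.0.1:7001 127.0.0.1:7002 --cluster-replicas 0

# Inspect slot assignment and node roles
redis-cli -p 7000 cluster info
redis-cli -p 7000 cluster nodes
```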
5.2 Cluster Management
```java
@Component
@Slf4j
public class RedisClusterManager {

    private final RedisClusterTemplate<String, Object> clusterTemplate;
    private final RedisClusterConnection clusterConnection;

    public RedisClusterManager(RedisClusterTemplate<String, Object> clusterTemplate,
                               RedisClusterConnection clusterConnection) {
        this.clusterTemplate = clusterTemplate;
        this.clusterConnection = clusterConnection;
    }

    public ClusterInfo getClusterInfo() {
        ClusterInfo info = new ClusterInfo();
        Set<RedisClusterNode> nodes = clusterConnection.clusterGetNodes();
        info.setNodes(nodes);
        Properties clusterInfo = clusterConnection.info("cluster");
        info.setClusterEnabled("1".equals(clusterInfo.getProperty("cluster_enabled")));
        info.setClusterKnownNodes(Integer.parseInt(clusterInfo.getProperty("cluster_known_nodes", "0")));
        info.setClusterSize(Integer.parseInt(clusterInfo.getProperty("cluster_size", "0")));
        info.setClusterCurrentEpoch(Long.parseLong(clusterInfo.getProperty("cluster_current_epoch", "0")));
        info.setClusterMyEpoch(Long.parseLong(clusterInfo.getProperty("cluster_my_epoch", "0")));
        return info;
    }

    public List<ClusterNodeInfo> getClusterNodes() {
        List<ClusterNodeInfo> nodeInfos = new ArrayList<>();
        Set<RedisClusterNode> nodes = clusterConnection.clusterGetNodes();
        for (RedisClusterNode node : nodes) {
            ClusterNodeInfo nodeInfo = new ClusterNodeInfo();
            nodeInfo.setNodeId(node.getId());
            nodeInfo.setHost(node.getHost());
            nodeInfo.setPort(node.getPort());
            nodeInfo.setFlags(node.getFlags());
            nodeInfo.setMasterId(node.getMasterId());
            nodeInfo.setPingSent(node.getPingSent());
            nodeInfo.setPongReceived(node.getPongReceived());
            nodeInfo.setConfigEpoch(node.getConfigEpoch());
            nodeInfo.setLinkState(node.getLinkState());
            nodeInfo.setSlots(node.getSlots());
            nodeInfos.add(nodeInfo);
        }
        return nodeInfos;
    }

    public void addNode(String host, int port) {
        try {
            clusterConnection.clusterMeet(host, port);
            log.info("Added node to cluster: {}:{}", host, port);
        } catch (Exception e) {
            log.error("Error adding node to cluster", e);
            throw new RuntimeException("Failed to add node to cluster", e);
        }
    }

    public void removeNode(String nodeId) {
        try {
            clusterConnection.clusterForget(nodeId);
            log.info("Removed node from cluster: {}", nodeId);
        } catch (Exception e) {
            log.error("Error removing node from cluster", e);
            throw new RuntimeException("Failed to remove node from cluster", e);
        }
    }

    public void reshard(int slotCount, String sourceNodeId, String targetNodeId) {
        try {
            clusterConnection.clusterReshard(slotCount, sourceNodeId, targetNodeId);
            log.info("Resharded {} slots from {} to {}", slotCount, sourceNodeId, targetNodeId);
        } catch (Exception e) {
            log.error("Error resharding cluster", e);
            throw new RuntimeException("Failed to reshard cluster", e);
        }
    }

    public Map<Integer, String> getSlotMapping() {
        Map<Integer, String> slotMapping = new HashMap<>();
        Set<RedisClusterNode> nodes = clusterConnection.clusterGetNodes();
        for (RedisClusterNode node : nodes) {
            if (node.isMaster()) {
                Set<Integer> slots = node.getSlots();
                for (Integer slot : slots) {
                    slotMapping.put(slot, node.getId());
                }
            }
        }
        return slotMapping;
    }

    public ClusterHealthStatus checkClusterHealth() {
        ClusterHealthStatus status = new ClusterHealthStatus();
        try {
            Set<RedisClusterNode> nodes = clusterConnection.clusterGetNodes();
            int totalNodes = nodes.size();
            int healthyNodes = 0;
            int masterNodes = 0;
            int slaveNodes = 0;
            for (RedisClusterNode node : nodes) {
                if (node.isConnected()) {
                    healthyNodes++;
                }
                if (node.isMaster()) {
                    masterNodes++;
                } else {
                    slaveNodes++;
                }
            }
            status.setTotalNodes(totalNodes);
            status.setHealthyNodes(healthyNodes);
            status.setMasterNodes(masterNodes);
            status.setSlaveNodes(slaveNodes);
            status.setHealthRatio((double) healthyNodes / totalNodes);
            status.setHealthy(healthyNodes == totalNodes);
        } catch (Exception e) {
            log.error("Error checking cluster health", e);
            status.setHealthy(false);
            status.setErrorMessage(e.getMessage());
        }
        return status;
    }
}
```
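For completeness, here is a minimal sketch of the client-side configuration that such a manager sits on top of. With Spring Data Redis and Lettuce, the client routes each key to the node that owns its hash slot and follows MOVED/ASK redirections transparently; the node addresses and bean names below are assumptions matching the three-node setup above:

```java
@Configuration
public class RedisClusterClientConfig {

    @Bean
    public LettuceConnectionFactory clusterConnectionFactory() {
        RedisClusterConfiguration clusterConfig = new RedisClusterConfiguration(
                List.of("127.0.0.1:7000", "127.0.0.1:7001", "127.0.0.1:7002"));
        clusterConfig.setMaxRedirects(3);  // follow at most 3 MOVED/ASK redirections
        return new LettuceConnectionFactory(clusterConfig);
    }

    @Bean
    public RedisTemplate<String, Object> clusterRedisTemplate(LettuceConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        return template;
    }
}
```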
6. Read-Write Splitting
6.1 Read-Write Splitting Configuration
```java
@Configuration
@EnableConfigurationProperties(RedisProperties.class)
public class RedisReadWriteSplitConfig {

    @Bean
    @Primary
    public RedisConnectionFactory masterConnectionFactory() {
        LettuceConnectionFactory factory = new LettuceConnectionFactory(
                new RedisStandaloneConfiguration("127.0.0.1", 6379));
        factory.setPassword("master_password");
        return factory;
    }

    @Bean
    public RedisConnectionFactory slaveConnectionFactory() {
        LettuceConnectionFactory factory = new LettuceConnectionFactory(
                new RedisStandaloneConfiguration("127.0.0.1", 6380));
        factory.setPassword("slave_password");
        return factory;
    }

    @Bean
    @Primary
    public RedisTemplate<String, Object> masterRedisTemplate() {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(masterConnectionFactory());
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
        return template;
    }

    @Bean
    public RedisTemplate<String, Object> slaveRedisTemplate() {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(slaveConnectionFactory());
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
        return template;
    }
}
```
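An alternative worth knowing about: instead of maintaining two templates, Lettuce can route reads to replicas by itself. The bean below is a minimal sketch that could live in the same configuration class; it assumes the master/replica addresses used above and that both nodes accept the same password (in the configs earlier in this article the replica uses a different `requirepass`, so that would need to be aligned first):

```java
@Bean
public LettuceConnectionFactory masterReplicaConnectionFactory() {
    // Prefer replicas for reads, fall back to the master if no replica is available
    LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder()
            .readFrom(ReadFrom.REPLICA_PREFERRED)
            .build();
    RedisStaticMasterReplicaConfiguration serverConfig =
            new RedisStaticMasterReplicaConfiguration("127.0.0.1", 6379);
    serverConfig.addNode("127.0.0.1", 6380);
    serverConfig.setPassword("master_password");
    return new LettuceConnectionFactory(serverConfig, clientConfig);
}
```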
6.2 Read-Write Splitting Service
```java
@Service
public class RedisReadWriteSplitService {

    private final RedisTemplate<String, Object> masterTemplate;
    private final RedisTemplate<String, Object> slaveTemplate;
    private final RedisReplicationManager replicationManager;

    public RedisReadWriteSplitService(
            @Qualifier("masterRedisTemplate") RedisTemplate<String, Object> masterTemplate,
            @Qualifier("slaveRedisTemplate") RedisTemplate<String, Object> slaveTemplate,
            RedisReplicationManager replicationManager) {
        this.masterTemplate = masterTemplate;
        this.slaveTemplate = slaveTemplate;
        this.replicationManager = replicationManager;
    }

    public void set(String key, Object value) {
        masterTemplate.opsForValue().set(key, value);
    }

    public void set(String key, Object value, Duration timeout) {
        masterTemplate.opsForValue().set(key, value, timeout);
    }

    public void delete(String key) {
        masterTemplate.delete(key);
    }

    public void delete(Collection<String> keys) {
        masterTemplate.delete(keys);
    }

    public void expire(String key, Duration timeout) {
        masterTemplate.expire(key, timeout);
    }

    public Object get(String key) {
        return slaveTemplate.opsForValue().get(key);
    }

    public List<Object> multiGet(Collection<String> keys) {
        return slaveTemplate.opsForValue().multiGet(keys);
    }

    public Boolean exists(String key) {
        return slaveTemplate.hasKey(key);
    }

    public Set<String> keys(String pattern) {
        return slaveTemplate.keys(pattern);
    }

    public Long ttl(String key) {
        return slaveTemplate.getExpire(key);
    }

    public Object getFromMaster(String key) {
        return masterTemplate.opsForValue().get(key);
    }

    public boolean isReplicationLagAcceptable(long maxLag) {
        long lag = replicationManager.getReplicationLag();
        return lag <= maxLag;
    }

    public Object getWithLagCheck(String key, long maxLag) {
        if (isReplicationLagAcceptable(maxLag)) {
            return get(key);
        } else {
            return getFromMaster(key);
        }
    }
}
```
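A short usage sketch of the service, with a hypothetical UserCacheService as the caller: writes always hit the master, while reads tolerate a bounded amount of replication lag before falling back to the master.

```java
@Service
public class UserCacheService {

    private final RedisReadWriteSplitService redis;

    public UserCacheService(RedisReadWriteSplitService redis) {
        this.redis = redis;
    }

    public void cacheUser(String userId, Object user) {
        redis.set("user:" + userId, user, Duration.ofMinutes(30));
    }

    public Object getUser(String userId) {
        // Read from the replica only if it is no more than 1024 bytes behind the master
        return redis.getWithLagCheck("user:" + userId, 1024);
    }
}
```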
7. Failover and Recovery
7.1 Failover Strategy
```java
@Component
@Slf4j
public class RedisFailoverStrategy {

    private final RedisSentinelManager sentinelManager;
    private final RedisReplicationManager replicationManager;
    private final ApplicationEventPublisher eventPublisher;

    public RedisFailoverStrategy(RedisSentinelManager sentinelManager,
                                 RedisReplicationManager replicationManager,
                                 ApplicationEventPublisher eventPublisher) {
        this.sentinelManager = sentinelManager;
        this.replicationManager = replicationManager;
        this.eventPublisher = eventPublisher;
    }

    @EventListener
    public void handleMasterFailure(MasterFailureEvent event) {
        log.warn("Master failure detected: {}", event.getMasterName());
        try {
            if (canPerformFailover()) {
                performFailover(event);
            } else {
                log.error("Cannot perform failover - insufficient conditions");
                handleFailoverFailure(event);
            }
        } catch (Exception e) {
            log.error("Error during failover", e);
            handleFailoverFailure(event);
        }
    }

    private boolean canPerformFailover() {
        try {
            SentinelInfo sentinelInfo = sentinelManager.getSentinelInfo();
            if (!sentinelInfo.isHealthy()) {
                log.warn("Sentinel is not healthy");
                return false;
            }
            ReplicationInfo replicationInfo = replicationManager.getReplicationInfo();
            if (replicationInfo.getMasterInfo().getConnectedSlaves() == 0) {
                log.warn("No slaves available for failover");
                return false;
            }
            return true;
        } catch (Exception e) {
            log.error("Error checking failover conditions", e);
            return false;
        }
    }

    private void performFailover(MasterFailureEvent event) {
        log.info("Starting failover for master: {}", event.getMasterName());
        try {
            sentinelManager.manualFailover();
            waitForFailoverCompletion(event.getMasterName());
            if (verifyFailoverSuccess()) {
                log.info("Failover completed successfully");
                publishFailoverSuccessEvent(event);
            } else {
                log.error("Failover verification failed");
                handleFailoverFailure(event);
            }
        } catch (Exception e) {
            log.error("Error during failover execution", e);
            handleFailoverFailure(event);
        }
    }

    private void waitForFailoverCompletion(String masterName) {
        int maxWaitTime = 300;
        int waitInterval = 5;
        for (int i = 0; i < maxWaitTime; i += waitInterval) {
            try {
                Thread.sleep(waitInterval * 1000);
                if (isFailoverComplete()) {
                    return;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Failover wait interrupted", e);
            }
        }
        throw new RuntimeException("Failover timeout");
    }

    private boolean isFailoverComplete() {
        try {
            SentinelInfo sentinelInfo = sentinelManager.getSentinelInfo();
            return sentinelInfo.isHealthy() && sentinelInfo.getMasterRole().equals("master");
        } catch (Exception e) {
            log.error("Error checking failover completion", e);
            return false;
        }
    }

    private boolean verifyFailoverSuccess() {
        try {
            boolean masterHealthy = sentinelManager.isMasterHealthy();
            ReplicationInfo replicationInfo = replicationManager.getReplicationInfo();
            boolean replicationHealthy = replicationInfo.getMasterInfo().getConnectedSlaves() > 0;
            return masterHealthy && replicationHealthy;
        } catch (Exception e) {
            log.error("Error verifying failover success", e);
            return false;
        }
    }

    private void handleFailoverFailure(MasterFailureEvent event) {
        log.error("Failover failed for master: {}", event.getMasterName());
        FailoverFailureEvent failureEvent = new FailoverFailureEvent(
                event.getMasterName(),
                event.getFailureTime(),
                LocalDateTime.now());
        eventPublisher.publishEvent(failureEvent);
        sendEmergencyNotification(failureEvent);
    }

    private void publishFailoverSuccessEvent(MasterFailureEvent event) {
        FailoverSuccessEvent successEvent = new FailoverSuccessEvent(
                event.getMasterName(),
                event.getFailureTime(),
                LocalDateTime.now());
        eventPublisher.publishEvent(successEvent);
    }

    private void sendEmergencyNotification(FailoverFailureEvent event) {
        log.error("Sending emergency notification for failed failover: {}", event.getMasterName());
    }
}
```
7.2 Data Recovery Strategy
```java
@Service
@Slf4j
public class RedisDataRecoveryStrategy {

    private final RedisReplicationManager replicationManager;
    private final RedisTemplate<String, Object> masterTemplate;
    private final RedisTemplate<String, Object> slaveTemplate;

    public RedisDataRecoveryStrategy(RedisReplicationManager replicationManager,
            @Qualifier("masterRedisTemplate") RedisTemplate<String, Object> masterTemplate,
            @Qualifier("slaveRedisTemplate") RedisTemplate<String, Object> slaveTemplate) {
        this.replicationManager = replicationManager;
        this.masterTemplate = masterTemplate;
        this.slaveTemplate = slaveTemplate;
    }

    public DataConsistencyReport checkDataConsistency() {
        DataConsistencyReport report = new DataConsistencyReport();
        try {
            Set<String> masterKeys = masterTemplate.keys("*");
            Set<String> slaveKeys = slaveTemplate.keys("*");
            Set<String> missingInSlave = new HashSet<>(masterKeys);
            missingInSlave.removeAll(slaveKeys);
            Set<String> extraInSlave = new HashSet<>(slaveKeys);
            extraInSlave.removeAll(masterKeys);
            List<String> inconsistentValues = new ArrayList<>();
            Set<String> commonKeys = new HashSet<>(masterKeys);
            commonKeys.retainAll(slaveKeys);
            for (String key : commonKeys) {
                Object masterValue = masterTemplate.opsForValue().get(key);
                Object slaveValue = slaveTemplate.opsForValue().get(key);
                if (!Objects.equals(masterValue, slaveValue)) {
                    inconsistentValues.add(key);
                }
            }
            report.setMasterKeyCount(masterKeys.size());
            report.setSlaveKeyCount(slaveKeys.size());
            report.setMissingInSlave(missingInSlave);
            report.setExtraInSlave(extraInSlave);
            report.setInconsistentValues(inconsistentValues);
            report.setConsistent(missingInSlave.isEmpty() && extraInSlave.isEmpty() && inconsistentValues.isEmpty());
            report.setCheckTime(LocalDateTime.now());
        } catch (Exception e) {
            log.error("Error checking data consistency", e);
            report.setConsistent(false);
            report.setErrorMessage(e.getMessage());
        }
        return report;
    }

    public void repairDataInconsistency(DataConsistencyReport report) {
        log.info("Starting data consistency repair");
        try {
            for (String key : report.getMissingInSlave()) {
                Object value = masterTemplate.opsForValue().get(key);
                slaveTemplate.opsForValue().set(key, value);
                log.debug("Repaired missing key: {}", key);
            }
            for (String key : report.getExtraInSlave()) {
                slaveTemplate.delete(key);
                log.debug("Removed extra key: {}", key);
            }
            for (String key : report.getInconsistentValues()) {
                Object masterValue = masterTemplate.opsForValue().get(key);
                slaveTemplate.opsForValue().set(key, masterValue);
                log.debug("Repaired inconsistent value for key: {}", key);
            }
            log.info("Data consistency repair completed");
        } catch (Exception e) {
            log.error("Error during data consistency repair", e);
            throw new RuntimeException("Failed to repair data consistency", e);
        }
    }

    public void forceSync() {
        log.info("Starting force sync");
        try {
            replicationManager.forceSync();
            waitForSyncCompletion();
            log.info("Force sync completed");
        } catch (Exception e) {
            log.error("Error during force sync", e);
            throw new RuntimeException("Failed to force sync", e);
        }
    }

    private void waitForSyncCompletion() {
        int maxWaitTime = 60;
        int waitInterval = 2;
        for (int i = 0; i < maxWaitTime; i += waitInterval) {
            try {
                Thread.sleep(waitInterval * 1000);
                long lag = replicationManager.getReplicationLag();
                if (lag == 0) {
                    return;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Sync wait interrupted", e);
            }
        }
        throw new RuntimeException("Sync timeout");
    }

    public void restoreFromBackup(String backupPath) {
        log.info("Starting restore from backup: {}", backupPath);
        try {
            // restore logic (e.g. loading an RDB/AOF file from backupPath) goes here
            log.info("Restore from backup completed");
        } catch (Exception e) {
            log.error("Error during restore from backup", e);
            throw new RuntimeException("Failed to restore from backup", e);
        }
    }
}
```
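One caveat with the consistency check above: `keys("*")` issues the blocking KEYS command, which can stall a production instance with many keys. A minimal sketch of a SCAN-based helper that could replace those calls inside RedisDataRecoveryStrategy (the method name is an assumption; it uses the same templates as above):

```java
private Set<String> scanAllKeys(RedisTemplate<String, Object> template) {
    return template.execute((RedisCallback<Set<String>>) connection -> {
        Set<String> keys = new HashSet<>();
        // Iterate the keyspace incrementally instead of blocking with KEYS
        ScanOptions options = ScanOptions.scanOptions().match("*").count(1000).build();
        try (Cursor<byte[]> cursor = connection.scan(options)) {
            while (cursor.hasNext()) {
                keys.add(new String(cursor.next(), StandardCharsets.UTF_8));
            }
        }
        return keys;
    });
}
```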
8. Monitoring and Alerting
8.1 Metrics Collection
```java
@Component
@Slf4j
public class RedisMetricsCollector {

    private final RedisReplicationManager replicationManager;
    private final RedisSentinelManager sentinelManager;
    private final MeterRegistry meterRegistry;

    public RedisMetricsCollector(RedisReplicationManager replicationManager,
                                 RedisSentinelManager sentinelManager,
                                 MeterRegistry meterRegistry) {
        this.replicationManager = replicationManager;
        this.sentinelManager = sentinelManager;
        this.meterRegistry = meterRegistry;
    }

    @Scheduled(fixedDelay = 10000)
    public void collectMetrics() {
        try {
            collectReplicationMetrics();
            collectSentinelMetrics();
            collectPerformanceMetrics();
        } catch (Exception e) {
            log.error("Error collecting Redis metrics", e);
        }
    }

    private void collectReplicationMetrics() {
        try {
            ReplicationInfo info = replicationManager.getReplicationInfo();
            long lag = replicationManager.getReplicationLag();
            meterRegistry.gauge("redis.replication.lag", lag);
            meterRegistry.gauge("redis.replication.connected_slaves", info.getMasterInfo().getConnectedSlaves());
            meterRegistry.gauge("redis.replication.master_offset", info.getMasterInfo().getMasterReplOffset());
            meterRegistry.gauge("redis.replication.slave_offset", info.getSlaveInfo().getMasterReplOffset());
            meterRegistry.gauge("redis.replication.backlog_size", info.getMasterInfo().getReplBacklogSize());
        } catch (Exception e) {
            log.error("Error collecting replication metrics", e);
        }
    }

    private void collectSentinelMetrics() {
        try {
            SentinelInfo info = sentinelManager.getSentinelInfo();
            meterRegistry.gauge("redis.sentinel.healthy", info.isHealthy() ? 1 : 0);
            meterRegistry.gauge("redis.sentinel.monitored_masters", info.getSentinelMasters());
        } catch (Exception e) {
            log.error("Error collecting sentinel metrics", e);
        }
    }

    private void collectPerformanceMetrics() {
        try {
            // additional performance metrics (memory usage, ops/sec, latency) can be collected here
        } catch (Exception e) {
            log.error("Error collecting performance metrics", e);
        }
    }
}
```
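A practical note on the Micrometer calls above: `meterRegistry.gauge(name, value)` registers a gauge that watches the supplied object through a weak reference, and only the first registration for a given name takes effect, so passing a freshly boxed primitive on every cycle can leave the gauge reporting stale or NaN values. A minimal sketch of the usual pattern — binding the gauges once to AtomicLong holders and updating only the values each cycle (the class name is an assumption, the metric names match the collector above):

```java
@Component
public class RedisReplicationGauges {

    private final AtomicLong replicationLag;
    private final AtomicLong connectedSlaves;

    public RedisReplicationGauges(MeterRegistry meterRegistry) {
        // Register each gauge once; the AtomicLong is the long-lived state it observes
        this.replicationLag = meterRegistry.gauge("redis.replication.lag", new AtomicLong(0));
        this.connectedSlaves = meterRegistry.gauge("redis.replication.connected_slaves", new AtomicLong(0));
    }

    // Called from the scheduled collector with freshly read values
    public void update(long lag, int slaves) {
        replicationLag.set(lag);
        connectedSlaves.set(slaves);
    }
}
```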
8.2 Alert Rule Configuration
```java
@Configuration
public class RedisAlertRules {

    @Bean
    public AlertRule replicationLagAlertRule() {
        return AlertRule.builder()
                .name("Redis Replication Lag High")
                .description("Redis replication lag is too high")
                .condition("redis.replication.lag > 1000000")
                .severity(AlertSeverity.WARNING)
                .enabled(true)
                .build();
    }

    @Bean
    public AlertRule noSlavesConnectedAlertRule() {
        return AlertRule.builder()
                .name("No Redis Slaves Connected")
                .description("No slaves connected to Redis master")
                .condition("redis.replication.connected_slaves == 0")
                .severity(AlertSeverity.CRITICAL)
                .enabled(true)
                .build();
    }

    @Bean
    public AlertRule sentinelUnhealthyAlertRule() {
        return AlertRule.builder()
                .name("Redis Sentinel Unhealthy")
                .description("Redis sentinel is not healthy")
                .condition("redis.sentinel.healthy == 0")
                .severity(AlertSeverity.CRITICAL)
                .enabled(true)
                .build();
    }

    @Bean
    public AlertRule masterFailureAlertRule() {
        return AlertRule.builder()
                .name("Redis Master Failure")
                .description("Redis master node has failed")
                .condition("redis.master.healthy == 0")
                .severity(AlertSeverity.CRITICAL)
                .enabled(true)
                .build();
    }
}
```
9. Summary
Redis master-slave replication is the foundation for building a highly available Redis architecture. Combined with Sentinel mode and the cluster architecture, it delivers high availability, data redundancy, and load balancing. The implementation approach described in this article provides a complete technical basis for an enterprise-grade Redis deployment.
9.1 Technical Advantages
- High availability: automatic failover through master-slave replication and Sentinel
- Data safety: multiple copies of the data guard against data loss
- Performance: read-write splitting improves overall system throughput
- Scalability: replicas can be added dynamically to scale read capacity
- Observability: comprehensive monitoring and alerting
9.2 Implementation Essentials
- Configuration tuning: set replication parameters and timeouts appropriately
- Network stability: ensure a stable network link between master and replica nodes
- Monitoring and alerting: build a complete monitoring and alerting system
- Failure drills: rehearse failover regularly
- Data consistency: check master-replica consistency on a regular basis
9.3 Best Practices
- Deployment topology: deploy across multiple data centers for disaster tolerance
- Capacity planning: size master and replica nodes for the expected load and data volume
- Security: enable password authentication and restrict network access
- Backup strategy: maintain a solid data backup and recovery process
- Version management: keep master and replica nodes on the same Redis version
After working through this article, you should have a solid grasp of the core techniques behind Redis master-slave replication and be able to design and operate a highly available Redis architecture that provides reliable data storage for your applications.