第8集Java实战Redis主从架构原理及搭建实操指南
|字数总计:3.9k|阅读时长:18分钟|阅读量:
引言
在Java企业级应用中,Redis作为高性能的内存数据库,广泛应用于缓存、会话管理、分布式锁等场景。为了提高系统的可用性和性能,Redis主从架构成为了不可或缺的技术方案。本文将结合Java实战经验,深入探讨Redis主从架构的原理,并提供完整的搭建和集成指南。
Redis主从架构原理
什么是主从架构
Redis主从架构是一种数据复制方案,通过一个主节点(Master)和多个从节点(Slave)的协作,实现:
- 数据冗余:多个从节点提供数据备份
- 读写分离:主节点处理写操作,从节点处理读操作
- 负载分担:读请求分散到多个从节点
- 故障恢复:主节点故障时可以快速切换
主从复制流程
1. 建立连接阶段
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public class RedisReplicationClient { private String masterHost; private int masterPort; private String masterAuth; public void connectToMaster() { try { Jedis masterJedis = new Jedis(masterHost, masterPort); if (masterAuth != null) { masterJedis.auth(masterAuth); } String pong = masterJedis.ping(); if ("PONG".equals(pong)) { System.out.println("成功连接到主节点"); } } catch (Exception e) { System.err.println("连接主节点失败: " + e.getMessage()); } } }
|
2. 数据同步阶段
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class DataSyncProcess { public void syncData(Jedis master, Jedis slave) { String lastSave = master.lastsave(); System.out.println("主节点最后保存时间: " + lastSave); master.set("sync_test", "Hello Master-Slave"); String value = slave.get("sync_test"); System.out.println("从节点获取到数据: " + value); } }
|
3. 命令传播阶段
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public class CommandPropagation { public void monitorReplication(Jedis master, Jedis slave) { String masterInfo = master.info("replication"); String slaveInfo = slave.info("replication"); System.out.println("主节点复制信息:"); System.out.println(masterInfo); System.out.println("从节点复制信息:"); System.out.println(slaveInfo); } }
|
Java集成Redis主从架构
Spring Boot配置
1. 添加依赖
1 2 3 4 5 6 7 8 9 10 11
<!-- Maven dependencies: the Spring Boot Redis starter plus the Jedis client. Neither entry declares a version element here. -->
| <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency> </dependencies>
|
2. 配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
# Connection settings for the Redis master node.
spring:
  redis:
    host: 192.168.1.100
    port: 6379
    password: your_password
    timeout: 2000ms
    jedis:
      pool:
        max-active: 20
        max-idle: 10
        min-idle: 5
        max-wait: 2000ms

# Replica addresses are kept under an application-specific prefix.
# They were previously listed under spring.redis.cluster.nodes, but that key
# configures Redis Cluster mode, which is a different deployment from
# master/replica replication.
app:
  redis:
    replicas:
      - host: 192.168.1.101
        port: 6379
      - host: 192.168.1.102
        port: 6379
|
3. Redis配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
| @Configuration @EnableCaching public class RedisConfig { @Value("${spring.redis.host}") private String masterHost; @Value("${spring.redis.port}") private int masterPort; @Value("${spring.redis.password}") private String password; @Value("${spring.redis.jedis.pool.max-active}") private int maxActive; @Value("${spring.redis.jedis.pool.max-idle}") private int maxIdle; @Value("${spring.redis.jedis.pool.min-idle}") private int minIdle; @Value("${spring.redis.jedis.pool.max-wait}") private long maxWaitMillis; @Bean("masterJedisPool") public JedisPool masterJedisPool() { JedisPoolConfig config = new JedisPoolConfig(); config.setMaxTotal(maxActive); config.setMaxIdle(maxIdle); config.setMinIdle(minIdle); config.setMaxWaitMillis(maxWaitMillis); config.setTestOnBorrow(true); config.setTestOnReturn(true); return new JedisPool(config, masterHost, masterPort, 2000, password); } @Bean("slaveJedisPool") public JedisPool slaveJedisPool() { JedisPoolConfig config = new JedisPoolConfig(); config.setMaxTotal(maxActive); config.setMaxIdle(maxIdle); config.setMinIdle(minIdle); config.setMaxWaitMillis(maxWaitMillis); config.setTestOnBorrow(true); config.setTestOnReturn(true); return new JedisPool(config, "192.168.1.101", 6379, 2000, password); } @Bean public RedisTemplate<String, Object> redisTemplate( @Qualifier("masterJedisPool") JedisPool masterPool) { RedisTemplate<String, Object> template = new RedisTemplate<>(); JedisConnectionFactory factory = new JedisConnectionFactory(); factory.setHostName(masterHost); factory.setPort(masterPort); factory.setPassword(password); factory.setPoolConfig(createPoolConfig()); factory.afterPropertiesSet(); template.setConnectionFactory(factory); template.setKeySerializer(new StringRedisSerializer()); template.setValueSerializer(new GenericJackson2JsonRedisSerializer()); template.setHashKeySerializer(new StringRedisSerializer()); template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer()); template.afterPropertiesSet(); return 
template; } private JedisPoolConfig createPoolConfig() { JedisPoolConfig config = new JedisPoolConfig(); config.setMaxTotal(maxActive); config.setMaxIdle(maxIdle); config.setMinIdle(minIdle); config.setMaxWaitMillis(maxWaitMillis); config.setTestOnBorrow(true); config.setTestOnReturn(true); return config; } }
|
读写分离实现
1. 读写分离服务类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
| @Service public class RedisReadWriteService { @Autowired @Qualifier("masterJedisPool") private JedisPool masterPool; @Autowired @Qualifier("slaveJedisPool") private JedisPool slavePool; public void write(String key, String value) { try (Jedis jedis = masterPool.getResource()) { jedis.set(key, value); System.out.println("写入主节点成功: " + key + " = " + value); } catch (Exception e) { System.err.println("写入主节点失败: " + e.getMessage()); throw new RuntimeException("写入失败", e); } } public String read(String key) { try (Jedis jedis = slavePool.getResource()) { String value = jedis.get(key); System.out.println("从从节点读取: " + key + " = " + value); return value; } catch (Exception e) { System.err.println("从从节点读取失败: " + e.getMessage()); return readFromMaster(key); } } private String readFromMaster(String key) { try (Jedis jedis = masterPool.getResource()) { String value = jedis.get(key); System.out.println("降级从主节点读取: " + key + " = " + value); return value; } catch (Exception e) { System.err.println("从主节点读取也失败: " + e.getMessage()); throw new RuntimeException("读取失败", e); } } public void batchWrite(Map<String, String> keyValues) { try (Jedis jedis = masterPool.getResource()) { Pipeline pipeline = jedis.pipelined(); for (Map.Entry<String, String> entry : keyValues.entrySet()) { pipeline.set(entry.getKey(), entry.getValue()); } pipeline.sync(); System.out.println("批量写入完成,共 " + keyValues.size() + " 条记录"); } catch (Exception e) { System.err.println("批量写入失败: " + e.getMessage()); throw new RuntimeException("批量写入失败", e); } } public Map<String, String> batchRead(List<String> keys) { Map<String, String> result = new HashMap<>(); try (Jedis jedis = slavePool.getResource()) { Pipeline pipeline = jedis.pipelined(); List<Response<String>> responses = new ArrayList<>(); for (String key : keys) { responses.add(pipeline.get(key)); } pipeline.sync(); for (int i = 0; i < keys.size(); i++) { String value = responses.get(i).get(); if (value != null) { result.put(keys.get(i), value); } } 
System.out.println("批量读取完成,共 " + result.size() + " 条记录"); } catch (Exception e) { System.err.println("批量读取失败: " + e.getMessage()); throw new RuntimeException("批量读取失败", e); } return result; } }
|
2. 缓存服务实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
| @Service public class CacheService { @Autowired private RedisReadWriteService redisService; public void cacheUserInfo(String userId, UserInfo userInfo) { try { String key = "user:info:" + userId; String value = JSON.toJSONString(userInfo); redisService.write(key, value); try (Jedis jedis = redisService.getMasterPool().getResource()) { jedis.expire(key, 3600); } } catch (Exception e) { System.err.println("缓存用户信息失败: " + e.getMessage()); } } public UserInfo getUserInfo(String userId) { try { String key = "user:info:" + userId; String value = redisService.read(key); if (value != null) { return JSON.parseObject(value, UserInfo.class); } } catch (Exception e) { System.err.println("获取用户信息失败: " + e.getMessage()); } return null; } public void cacheProductInfo(String productId, ProductInfo productInfo) { try { String key = "product:info:" + productId; String value = JSON.toJSONString(productInfo); redisService.write(key, value); try (Jedis jedis = redisService.getMasterPool().getResource()) { jedis.expire(key, 1800); } } catch (Exception e) { System.err.println("缓存商品信息失败: " + e.getMessage()); } } public ProductInfo getProductInfo(String productId) { try { String key = "product:info:" + productId; String value = redisService.read(key); if (value != null) { return JSON.parseObject(value, ProductInfo.class); } } catch (Exception e) { System.err.println("获取商品信息失败: " + e.getMessage()); } return null; } }
|
监控和健康检查
1. 健康检查服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
| @Component public class RedisHealthChecker { @Autowired @Qualifier("masterJedisPool") private JedisPool masterPool; @Autowired @Qualifier("slaveJedisPool") private JedisPool slavePool; public boolean checkMasterHealth() { try (Jedis jedis = masterPool.getResource()) { String pong = jedis.ping(); if ("PONG".equals(pong)) { String info = jedis.info("replication"); System.out.println("主节点健康检查通过"); System.out.println("主节点复制信息: " + info); return true; } } catch (Exception e) { System.err.println("主节点健康检查失败: " + e.getMessage()); } return false; } public boolean checkSlaveHealth() { try (Jedis jedis = slavePool.getResource()) { String pong = jedis.ping(); if ("PONG".equals(pong)) { String info = jedis.info("replication"); System.out.println("从节点健康检查通过"); System.out.println("从节点复制信息: " + info); return true; } } catch (Exception e) { System.err.println("从节点健康检查失败: " + e.getMessage()); } return false; } public long checkReplicationLag() { try (Jedis masterJedis = masterPool.getResource(); Jedis slaveJedis = slavePool.getResource()) { String masterInfo = masterJedis.info("replication"); String slaveInfo = slaveJedis.info("replication"); long masterOffset = parseOffset(masterInfo); long slaveOffset = parseOffset(slaveInfo); long lag = masterOffset - slaveOffset; System.out.println("主从复制延迟: " + lag + " bytes"); return lag; } catch (Exception e) { System.err.println("检查复制延迟失败: " + e.getMessage()); return -1; } } private long parseOffset(String info) { String[] lines = info.split("\r\n"); for (String line : lines) { if (line.startsWith("master_repl_offset:")) { return Long.parseLong(line.split(":")[1]); } } return 0; } }
|
2. 定时监控任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| @Component public class RedisMonitorTask { @Autowired private RedisHealthChecker healthChecker; @Scheduled(fixedRate = 30000) public void monitorRedisHealth() { System.out.println("=== Redis健康检查开始 ==="); boolean masterHealthy = healthChecker.checkMasterHealth(); System.out.println("主节点状态: " + (masterHealthy ? "健康" : "异常")); boolean slaveHealthy = healthChecker.checkSlaveHealth(); System.out.println("从节点状态: " + (slaveHealthy ? "健康" : "异常")); long lag = healthChecker.checkReplicationLag(); if (lag > 0) { System.out.println("复制延迟: " + lag + " bytes"); if (lag > 1024 * 1024) { System.out.println("警告: 复制延迟过大!"); } } System.out.println("=== Redis健康检查结束 ===\n"); } }
|
故障处理和切换
1. 故障检测
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
| @Component public class RedisFailoverHandler { @Autowired @Qualifier("masterJedisPool") private JedisPool masterPool; @Autowired @Qualifier("slaveJedisPool") private JedisPool slavePool; private volatile boolean masterDown = false; public boolean detectMasterFailure() { try (Jedis jedis = masterPool.getResource()) { jedis.ping(); masterDown = false; return false; } catch (Exception e) { masterDown = true; System.err.println("主节点故障检测: " + e.getMessage()); return true; } } public void handleFailover() { if (detectMasterFailure()) { System.out.println("检测到主节点故障,开始故障切换..."); System.out.println("停止写入操作"); notifyApplicationLayer(); waitForDataSync(); promoteSlaveToMaster(); System.out.println("故障切换完成"); } } private void notifyApplicationLayer() { System.out.println("通知应用层: 主节点故障,切换到只读模式"); } private void waitForDataSync() { System.out.println("等待数据同步完成..."); try { Thread.sleep(5000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } private void promoteSlaveToMaster() { try (Jedis slaveJedis = slavePool.getResource()) { slaveJedis.slaveofNoOne(); System.out.println("从节点已提升为主节点"); } catch (Exception e) { System.err.println("提升从节点失败: " + e.getMessage()); } } }
|
2. 自动故障切换
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Component public class AutoFailoverService { @Autowired private RedisFailoverHandler failoverHandler; @Scheduled(fixedRate = 10000) public void checkAndHandleFailover() { try { failoverHandler.handleFailover(); } catch (Exception e) { System.err.println("自动故障切换检查失败: " + e.getMessage()); } } }
|
性能优化
1. 连接池优化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @Configuration public class OptimizedRedisConfig { @Bean("optimizedMasterPool") public JedisPool optimizedMasterPool() { JedisPoolConfig config = new JedisPoolConfig(); config.setMaxTotal(50); config.setMaxIdle(20); config.setMinIdle(10); config.setMaxWaitMillis(3000); config.setTestOnBorrow(true); config.setTestOnReturn(true); config.setTestWhileIdle(true); config.setTimeBetweenEvictionRunsMillis(30000); config.setMinEvictableIdleTimeMillis(60000); config.setNumTestsPerEvictionRun(3); return new JedisPool(config, "192.168.1.100", 6379, 2000, "password"); } }
|
2. 批量操作优化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| @Service public class OptimizedRedisService { @Autowired @Qualifier("optimizedMasterPool") private JedisPool masterPool; public void optimizedBatchWrite(Map<String, String> data) { try (Jedis jedis = masterPool.getResource()) { Pipeline pipeline = jedis.pipelined(); for (Map.Entry<String, String> entry : data.entrySet()) { pipeline.set(entry.getKey(), entry.getValue()); } for (String key : data.keySet()) { pipeline.expire(key, 3600); } List<Object> results = pipeline.syncAndReturnAll(); System.out.println("批量操作完成,处理了 " + results.size() + " 个命令"); } catch (Exception e) { System.err.println("批量写入失败: " + e.getMessage()); throw new RuntimeException("批量写入失败", e); } } public void optimizedComplexOperation(String key, String value) { String luaScript = "local key = KEYS[1]\n" + "local value = ARGV[1]\n" + "local ttl = ARGV[2]\n" + "redis.call('SET', key, value)\n" + "redis.call('EXPIRE', key, ttl)\n" + "return redis.call('GET', key)\n"; try (Jedis jedis = masterPool.getResource()) { Object result = jedis.eval(luaScript, 1, key, value, "3600"); System.out.println("Lua脚本执行结果: " + result); } catch (Exception e) { System.err.println("Lua脚本执行失败: " + e.getMessage()); } } }
|
实际应用案例
1. 电商系统缓存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| @Service public class EcommerceCacheService { @Autowired private RedisReadWriteService redisService; public void cacheProductDetail(String productId, ProductDetail product) { String key = "product:detail:" + productId; String value = JSON.toJSONString(product); redisService.write(key, value); try (Jedis jedis = redisService.getMasterPool().getResource()) { jedis.expire(key, 1800); } } public ProductDetail getProductDetail(String productId) { String key = "product:detail:" + productId; String value = redisService.read(key); if (value != null) { return JSON.parseObject(value, ProductDetail.class); } return loadFromDatabase(productId); } public void cacheUserCart(String userId, List<CartItem> cartItems) { String key = "user:cart:" + userId; String value = JSON.toJSONString(cartItems); redisService.write(key, value); } public List<CartItem> getUserCart(String userId) { String key = "user:cart:" + userId; String value = redisService.read(key); if (value != null) { return JSON.parseArray(value, CartItem.class); } return new ArrayList<>(); } private ProductDetail loadFromDatabase(String productId) { return new ProductDetail(); } }
|
2. 用户会话管理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| @Service public class SessionManagementService { @Autowired private RedisReadWriteService redisService; public String createUserSession(String userId, UserSession session) { String sessionId = UUID.randomUUID().toString(); String key = "session:" + sessionId; String value = JSON.toJSONString(session); redisService.write(key, value); try (Jedis jedis = redisService.getMasterPool().getResource()) { jedis.expire(key, 1800); } String userSessionKey = "user:session:" + userId; redisService.write(userSessionKey, sessionId); return sessionId; } public UserSession getUserSession(String sessionId) { String key = "session:" + sessionId; String value = redisService.read(key); if (value != null) { return JSON.parseObject(value, UserSession.class); } return null; } public void extendSession(String sessionId) { String key = "session:" + sessionId; try (Jedis jedis = redisService.getMasterPool().getResource()) { jedis.expire(key, 1800); } } public void destroyUserSession(String sessionId) { String key = "session:" + sessionId; try (Jedis jedis = redisService.getMasterPool().getResource()) { jedis.del(key); } } }
|
总结
通过本文的详细介绍,我们了解了:
核心要点
- Redis主从架构原理:数据复制、读写分离、故障恢复
- Java集成方案:Spring Boot配置、连接池管理、读写分离实现
- 监控和健康检查:实时监控主从状态、复制延迟检测
- 故障处理机制:故障检测、自动切换、降级方案
- 性能优化策略:连接池优化、批量操作、Lua脚本
最佳实践
- 合理配置连接池:根据业务量调整连接池大小
- 实现读写分离:提高读性能,减轻主节点压力
- 监控复制延迟:及时发现主从数据不一致的风险
- 准备故障切换:提高系统可用性
- 优化批量操作:使用Pipeline和Lua脚本
注意事项
- 主从复制是异步的,存在延迟,从节点可能短暂读到旧数据,需要根据业务对一致性的要求评估哪些读请求可以走从节点
- Redis主从复制本身不会自动进行故障切换;本文中的切换代码仅作演示,生产环境建议结合Sentinel实现自动切换
- 监控和告警机制必不可少
- 定期备份和测试故障恢复流程
掌握Redis主从架构的Java实战应用,将大大提升系统的性能和可靠性,为高并发应用提供强有力的技术支撑。