Episode 8: Redis Master-Slave Architecture in Java, Principles and Hands-On Setup Guide
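Introduction
In enterprise Java applications, Redis is a high-performance in-memory database widely used for caching, session management, distributed locks, and similar scenarios. A master-slave architecture is a common way to improve both the availability and the read throughput of such a system. Drawing on hands-on Java experience, this article explains how Redis master-slave replication works and walks through setting it up and integrating it with a Java application.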
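How Redis master-slave architecture works
What is a master-slave architecture
Redis master-slave architecture is a data replication scheme in which one master node and one or more slave nodes cooperate to provide:
- Data redundancy: each slave holds a copy of the master's data
- Read/write splitting: the master handles writes, the slaves handle reads
- Load sharing: read requests are spread across several slaves
- Failure recovery: when the master fails, a slave can be promoted to take over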
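To make the read/write splitting and load-sharing points concrete, here is a minimal sketch of a router that sends writes to the master and rotates reads over the slaves. The class `ReadWriteRouter` and its method names are hypothetical, introduced only for illustration; it works on plain address strings and does not talk to Redis.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Routes writes to the master and spreads reads across slaves round-robin. */
class ReadWriteRouter {
    private final String master;
    private final List<String> slaves;
    private final AtomicInteger counter = new AtomicInteger();

    ReadWriteRouter(String master, List<String> slaves) {
        this.master = master;
        this.slaves = List.copyOf(slaves);
    }

    /** Every write goes to the single master. */
    String nodeForWrite() {
        return master;
    }

    /** Reads rotate over the slaves; with no slave configured, fall back to the master. */
    String nodeForRead() {
        if (slaves.isEmpty()) {
            return master;
        }
        // floorMod keeps the index valid even after the counter overflows
        int index = Math.floorMod(counter.getAndIncrement(), slaves.size());
        return slaves.get(index);
    }
}

class ReadWriteRouterDemo {
    public static void main(String[] args) {
        ReadWriteRouter router = new ReadWriteRouter(
                "192.168.1.100:6379",
                List.of("192.168.1.101:6379", "192.168.1.102:6379"));
        System.out.println("write -> " + router.nodeForWrite());
        for (int i = 0; i < 4; i++) {
            System.out.println("read  -> " + router.nodeForRead());
        }
    }
}
```

Round-robin is only one possible policy; a real deployment might also skip slaves that are unhealthy or lagging too far behind.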
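Master-slave replication flow
1. Connection phase
```java
import redis.clients.jedis.Jedis;

public class RedisReplicationClient {
    private final String masterHost;
    private final int masterPort;
    private final String masterAuth; // may be null when the master has no password

    public RedisReplicationClient(String masterHost, int masterPort, String masterAuth) {
        this.masterHost = masterHost;
        this.masterPort = masterPort;
        this.masterAuth = masterAuth;
    }

    public void connectToMaster() {
        // try-with-resources closes the connection even if AUTH or PING fails
        try (Jedis masterJedis = new Jedis(masterHost, masterPort)) {
            if (masterAuth != null) {
                masterJedis.auth(masterAuth);
            }
            // PING verifies that the master is reachable and responding
            String pong = masterJedis.ping();
            if ("PONG".equals(pong)) {
                System.out.println("Connected to the master node");
            }
        } catch (Exception e) {
            System.err.println("Failed to connect to the master node: " + e.getMessage());
        }
    }
}
```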
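2. Data synchronization phase
```java
import redis.clients.jedis.Jedis;

public class DataSyncProcess {

    public void syncData(Jedis master, Jedis slave) throws InterruptedException {
        // LASTSAVE returns the Unix timestamp of the last successful RDB save
        long lastSave = master.lastsave();
        System.out.println("Master last save time: " + lastSave);

        // Write on the master, then read the same key back from the slave
        master.set("sync_test", "Hello Master-Slave");
        // Replication is asynchronous: pause briefly so the slave can catch up.
        // The value may still be null if the slave lags longer than this.
        Thread.sleep(100);
        String value = slave.get("sync_test");
        System.out.println("Value read from the slave: " + value);
    }
}
```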
3. Command propagation phase
```java
import redis.clients.jedis.Jedis;

public class CommandPropagation {

    public void monitorReplication(Jedis master, Jedis slave) {
        // INFO replication reports role, connected slaves and replication offsets
        String masterInfo = master.info("replication");
        String slaveInfo = slave.info("replication");

        System.out.println("Master replication info:");
        System.out.println(masterInfo);
        System.out.println("Slave replication info:");
        System.out.println(slaveInfo);
    }
}
```
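The three phases above can also be pictured with a small in-memory model. The sketch below is a toy, not how Redis implements replication: `ToyMaster` and `ToySlave` are hypothetical classes, and the offset here counts logged writes, whereas real Redis offsets count bytes of the replication stream.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy master: keeps data plus a log of every write it has accepted. */
class ToyMaster {
    final Map<String, String> data = new HashMap<>();
    final List<String[]> commandLog = new ArrayList<>();

    void set(String key, String value) {
        data.put(key, value);
        commandLog.add(new String[] {key, value});
    }

    /** In this toy, the replication offset is simply the number of logged writes. */
    int offset() {
        return commandLog.size();
    }
}

/** Toy slave: copies a snapshot once, then replays writes it has not seen yet. */
class ToySlave {
    final Map<String, String> data = new HashMap<>();
    int offset = 0;

    /** Data synchronization phase: full copy from a snapshot of the master. */
    void fullSync(ToyMaster master) {
        data.clear();
        data.putAll(master.data);
        offset = master.offset();
    }

    /** Command propagation phase: apply only writes logged after our offset. */
    void applyPropagated(ToyMaster master) {
        for (int i = offset; i < master.commandLog.size(); i++) {
            String[] cmd = master.commandLog.get(i);
            data.put(cmd[0], cmd[1]);
        }
        offset = master.offset();
    }
}

class ReplicationFlowDemo {
    public static void main(String[] args) {
        ToyMaster master = new ToyMaster();
        master.set("a", "1");

        ToySlave slave = new ToySlave();
        slave.fullSync(master);          // snapshot copy
        master.set("b", "2");            // write that arrives after the snapshot
        System.out.println("lag before propagation: " + (master.offset() - slave.offset));
        slave.applyPropagated(master);   // incremental catch-up
        System.out.println("slave data: " + slave.data);
    }
}
```

The gap between the two offsets before `applyPropagated` runs is the same quantity that the replication-lag check later in this article measures.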
Integrating Redis master-slave architecture with Java
Spring Boot configuration
1. Add dependencies
```xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <!-- No version here: it is expected to come from Spring Boot's dependency management -->
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
    </dependency>
</dependencies>
```
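2. Configuration file
```yaml
spring:
  redis:
    host: 192.168.1.100   # master node
    port: 6379
    password: your_password
    timeout: 2000ms
    jedis:
      pool:
        max-active: 20
        max-idle: 10
        min-idle: 5
        max-wait: 2000    # milliseconds; a plain number also binds to a long field
    # Slave nodes: 192.168.1.101:6379 and 192.168.1.102:6379.
    # They are not listed under spring.redis.cluster.nodes, because that key
    # switches Spring Boot to Redis Cluster mode, which is a different
    # deployment from master-slave replication.
```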
3. Redis configuration class
```java
import java.time.Duration;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.jedis.JedisClientConfiguration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

@Configuration
@EnableCaching
public class RedisConfig {

    // The slave address is hard-coded in this example
    private static final String SLAVE_HOST = "192.168.1.101";
    private static final int SLAVE_PORT = 6379;
    private static final int TIMEOUT_MILLIS = 2000;

    @Value("${spring.redis.host}")
    private String masterHost;

    @Value("${spring.redis.port}")
    private int masterPort;

    @Value("${spring.redis.password}")
    private String password;

    @Value("${spring.redis.jedis.pool.max-active}")
    private int maxActive;

    @Value("${spring.redis.jedis.pool.max-idle}")
    private int maxIdle;

    @Value("${spring.redis.jedis.pool.min-idle}")
    private int minIdle;

    // Duration accepts both "2000ms" and a plain "2000"; a long field would reject "2000ms"
    @Value("${spring.redis.jedis.pool.max-wait}")
    private Duration maxWait;

    /** Pool for write traffic, connected to the master. */
    @Bean("masterJedisPool")
    public JedisPool masterJedisPool() {
        return new JedisPool(createPoolConfig(), masterHost, masterPort, TIMEOUT_MILLIS, password);
    }

    /** Pool for read traffic, connected to the slave. */
    @Bean("slaveJedisPool")
    public JedisPool slaveJedisPool() {
        return new JedisPool(createPoolConfig(), SLAVE_HOST, SLAVE_PORT, TIMEOUT_MILLIS, password);
    }

    /** Connection factory for RedisTemplate; as a bean, Spring manages its lifecycle. */
    @Bean
    public JedisConnectionFactory redisConnectionFactory() {
        RedisStandaloneConfiguration server = new RedisStandaloneConfiguration(masterHost, masterPort);
        server.setPassword(RedisPassword.of(password));
        JedisClientConfiguration client = JedisClientConfiguration.builder()
                .usePooling()
                .poolConfig(createPoolConfig())
                .build();
        return new JedisConnectionFactory(server, client);
    }

    @Bean
    public RedisTemplate<String, Object> redisTemplate(JedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);

        // String keys, JSON values
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
        return template;
    }

    /** Shared pool settings, built fresh for each pool. */
    private JedisPoolConfig createPoolConfig() {
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(maxActive);
        config.setMaxIdle(maxIdle);
        config.setMinIdle(minIdle);
        config.setMaxWaitMillis(maxWait.toMillis());
        config.setTestOnBorrow(true);
        config.setTestOnReturn(true);
        return config;
    }
}
```
Implementing read/write splitting
1. Read/write splitting service
```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;

@Service
public class RedisReadWriteService {

    @Autowired
    @Qualifier("masterJedisPool")
    private JedisPool masterPool;

    @Autowired
    @Qualifier("slaveJedisPool")
    private JedisPool slavePool;

    /** Exposed because the cache and session services below use the master pool directly. */
    public JedisPool getMasterPool() {
        return masterPool;
    }

    /** Writes always go to the master. */
    public void write(String key, String value) {
        try (Jedis jedis = masterPool.getResource()) {
            jedis.set(key, value);
            System.out.println("Wrote to master: " + key + " = " + value);
        } catch (Exception e) {
            System.err.println("Write to master failed: " + e.getMessage());
            throw new RuntimeException("Write failed", e);
        }
    }

    /** Reads go to the slave first and fall back to the master if the slave fails. */
    public String read(String key) {
        try (Jedis jedis = slavePool.getResource()) {
            String value = jedis.get(key);
            System.out.println("Read from slave: " + key + " = " + value);
            return value;
        } catch (Exception e) {
            System.err.println("Read from slave failed: " + e.getMessage());
            return readFromMaster(key);
        }
    }

    private String readFromMaster(String key) {
        try (Jedis jedis = masterPool.getResource()) {
            String value = jedis.get(key);
            System.out.println("Fallback read from master: " + key + " = " + value);
            return value;
        } catch (Exception e) {
            System.err.println("Read from master also failed: " + e.getMessage());
            throw new RuntimeException("Read failed", e);
        }
    }

    /** Batch write through a pipeline: one round trip for all SET commands. */
    public void batchWrite(Map<String, String> keyValues) {
        try (Jedis jedis = masterPool.getResource()) {
            Pipeline pipeline = jedis.pipelined();
            for (Map.Entry<String, String> entry : keyValues.entrySet()) {
                pipeline.set(entry.getKey(), entry.getValue());
            }
            pipeline.sync();
            System.out.println("Batch write finished, " + keyValues.size() + " records");
        } catch (Exception e) {
            System.err.println("Batch write failed: " + e.getMessage());
            throw new RuntimeException("Batch write failed", e);
        }
    }

    /** Batch read from the slave; keys that do not exist are left out of the result. */
    public Map<String, String> batchRead(List<String> keys) {
        Map<String, String> result = new HashMap<>();
        try (Jedis jedis = slavePool.getResource()) {
            Pipeline pipeline = jedis.pipelined();
            List<Response<String>> responses = new ArrayList<>();
            for (String key : keys) {
                responses.add(pipeline.get(key));
            }
            pipeline.sync(); // responses are only readable after sync()

            for (int i = 0; i < keys.size(); i++) {
                String value = responses.get(i).get();
                if (value != null) {
                    result.put(keys.get(i), value);
                }
            }
            System.out.println("Batch read finished, " + result.size() + " records");
        } catch (Exception e) {
            System.err.println("Batch read failed: " + e.getMessage());
            throw new RuntimeException("Batch read failed", e);
        }
        return result;
    }
}
```
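The slave-first, master-fallback read used above is a general pattern that can be isolated from Jedis. Here is a minimal sketch, assuming the two read paths are passed in as `Supplier`s; `FallbackReader` is a hypothetical helper, not part of any library.

```java
import java.util.function.Supplier;

/** Tries the primary read source first and degrades to the fallback on failure. */
class FallbackReader {
    static <T> T readWithFallback(Supplier<T> primary, Supplier<T> fallback) {
        try {
            return primary.get();
        } catch (RuntimeException e) {
            // The primary (slave) read failed, e.g. timeout or refused connection:
            // degrade to the fallback (master) read.
            return fallback.get();
        }
    }
}

class FallbackReaderDemo {
    public static void main(String[] args) {
        String value = FallbackReader.readWithFallback(
                () -> { throw new IllegalStateException("slave unreachable"); },
                () -> "value-from-master");
        System.out.println(value);
    }
}
```

Note that only an exception triggers the fallback. A `null` returned by the slave because a key has not been replicated yet is passed through unchanged, so code that must read its own writes may prefer to read from the master directly.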
2. Cache service
```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

// JSON.toJSONString / JSON.parseObject match fastjson's API; this needs the
// fastjson dependency, which the dependency list above does not include.
import com.alibaba.fastjson.JSON;

import redis.clients.jedis.Jedis;

// UserInfo and ProductInfo are application domain classes that are not shown here.
@Service
public class CacheService {

    private static final int USER_TTL_SECONDS = 3600;
    private static final int PRODUCT_TTL_SECONDS = 1800;

    @Autowired
    private RedisReadWriteService redisService;

    /** Caches user info for one hour. */
    public void cacheUserInfo(String userId, UserInfo userInfo) {
        try (Jedis jedis = redisService.getMasterPool().getResource()) {
            String key = "user:info:" + userId;
            // SETEX stores the value and its TTL atomically, so a crash between
            // SET and EXPIRE cannot leave a key without expiry
            jedis.setex(key, USER_TTL_SECONDS, JSON.toJSONString(userInfo));
        } catch (Exception e) {
            System.err.println("Failed to cache user info: " + e.getMessage());
        }
    }

    /** Returns the cached user info, or null on a cache miss or error. */
    public UserInfo getUserInfo(String userId) {
        try {
            String value = redisService.read("user:info:" + userId);
            if (value != null) {
                return JSON.parseObject(value, UserInfo.class);
            }
        } catch (Exception e) {
            System.err.println("Failed to get user info: " + e.getMessage());
        }
        return null;
    }

    /** Caches product info for 30 minutes. */
    public void cacheProductInfo(String productId, ProductInfo productInfo) {
        try (Jedis jedis = redisService.getMasterPool().getResource()) {
            String key = "product:info:" + productId;
            jedis.setex(key, PRODUCT_TTL_SECONDS, JSON.toJSONString(productInfo));
        } catch (Exception e) {
            System.err.println("Failed to cache product info: " + e.getMessage());
        }
    }

    /** Returns the cached product info, or null on a cache miss or error. */
    public ProductInfo getProductInfo(String productId) {
        try {
            String value = redisService.read("product:info:" + productId);
            if (value != null) {
                return JSON.parseObject(value, ProductInfo.class);
            }
        } catch (Exception e) {
            System.err.println("Failed to get product info: " + e.getMessage());
        }
        return null;
    }
}
```
Monitoring and health checks
1. Health check service
```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

@Component
public class RedisHealthChecker {

    @Autowired
    @Qualifier("masterJedisPool")
    private JedisPool masterPool;

    @Autowired
    @Qualifier("slaveJedisPool")
    private JedisPool slavePool;

    public boolean checkMasterHealth() {
        return checkHealth(masterPool, "Master");
    }

    public boolean checkSlaveHealth() {
        return checkHealth(slavePool, "Slave");
    }

    /** A node is healthy when it answers PING with PONG. */
    private boolean checkHealth(JedisPool pool, String label) {
        try (Jedis jedis = pool.getResource()) {
            if ("PONG".equals(jedis.ping())) {
                System.out.println(label + " health check passed");
                System.out.println(label + " replication info: " + jedis.info("replication"));
                return true;
            }
        } catch (Exception e) {
            System.err.println(label + " health check failed: " + e.getMessage());
        }
        return false;
    }

    /** Returns the replication lag in bytes, or -1 if it could not be determined. */
    public long checkReplicationLag() {
        try (Jedis masterJedis = masterPool.getResource();
             Jedis slaveJedis = slavePool.getResource()) {

            String masterInfo = masterJedis.info("replication");
            String slaveInfo = slaveJedis.info("replication");

            // The master reports master_repl_offset; the slave reports how far
            // it has processed in slave_repl_offset
            long masterOffset = parseOffset(masterInfo, "master_repl_offset");
            long slaveOffset = parseOffset(slaveInfo, "slave_repl_offset");

            long lag = masterOffset - slaveOffset;
            System.out.println("Replication lag: " + lag + " bytes");
            return lag;
        } catch (Exception e) {
            System.err.println("Failed to check replication lag: " + e.getMessage());
            return -1;
        }
    }

    /** Extracts a numeric field from INFO output; returns 0 if the field is absent. */
    private long parseOffset(String info, String field) {
        String prefix = field + ":";
        for (String line : info.split("\r\n")) {
            if (line.startsWith(prefix)) {
                return Long.parseLong(line.substring(prefix.length()).trim());
            }
        }
        return 0;
    }
}
```
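The lag calculation can be tried without a running server by feeding it sample `INFO replication` text. In the sketch below, `ReplicationInfo` is a hypothetical helper and the sample strings are hand-written for illustration, not captured from a real instance; the field names (`role`, `master_repl_offset`, `slave_repl_offset`) are the ones Redis reports.

```java
import java.util.HashMap;
import java.util.Map;

class ReplicationInfo {
    /** Parses "key:value" lines from INFO output, skipping "# Section" headers. */
    static Map<String, String> parse(String info) {
        Map<String, String> fields = new HashMap<>();
        for (String line : info.split("\\r?\\n")) {
            int colon = line.indexOf(':');
            if (line.startsWith("#") || colon < 0) {
                continue;
            }
            fields.put(line.substring(0, colon).trim(), line.substring(colon + 1).trim());
        }
        return fields;
    }

    /** Lag in bytes: master_repl_offset on the master minus slave_repl_offset on the slave. */
    static long lagBytes(String masterInfo, String slaveInfo) {
        long masterOffset = Long.parseLong(parse(masterInfo).getOrDefault("master_repl_offset", "0"));
        long slaveOffset = Long.parseLong(parse(slaveInfo).getOrDefault("slave_repl_offset", "0"));
        return masterOffset - slaveOffset;
    }
}

class ReplicationInfoDemo {
    public static void main(String[] args) {
        // Hand-written sample text, for illustration only
        String masterInfo = "# Replication\r\nrole:master\r\nconnected_slaves:1\r\nmaster_repl_offset:1500\r\n";
        String slaveInfo = "# Replication\r\nrole:slave\r\nmaster_link_status:up\r\nslave_repl_offset:1200\r\n";
        System.out.println("lag = " + ReplicationInfo.lagBytes(masterInfo, slaveInfo) + " bytes");
    }
}
```

Because the two `INFO` calls are not taken at the same instant, the result is an approximation; a small non-zero value on a busy master is normal.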
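2. Scheduled monitoring task
```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

// @Scheduled only fires when scheduling is enabled, e.g. with
// @EnableScheduling on a configuration class.
@Component
public class RedisMonitorTask {

    private static final long LAG_WARN_BYTES = 1024 * 1024; // 1 MB

    @Autowired
    private RedisHealthChecker healthChecker;

    /** Runs every 30 seconds. */
    @Scheduled(fixedRate = 30000)
    public void monitorRedisHealth() {
        System.out.println("=== Redis health check started ===");

        boolean masterHealthy = healthChecker.checkMasterHealth();
        System.out.println("Master status: " + (masterHealthy ? "healthy" : "unhealthy"));

        boolean slaveHealthy = healthChecker.checkSlaveHealth();
        System.out.println("Slave status: " + (slaveHealthy ? "healthy" : "unhealthy"));

        long lag = healthChecker.checkReplicationLag();
        if (lag < 0) {
            // -1 means the lag could not be measured
            System.out.println("Warning: replication lag is unknown");
        } else if (lag > 0) {
            System.out.println("Replication lag: " + lag + " bytes");
            if (lag > LAG_WARN_BYTES) {
                System.out.println("Warning: replication lag is too large!");
            }
        }

        System.out.println("=== Redis health check finished ===\n");
    }
}
```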
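Failure handling and failover
1. Failure detection
```java
import java.util.concurrent.atomic.AtomicBoolean;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

@Component
public class RedisFailoverHandler {

    @Autowired
    @Qualifier("masterJedisPool")
    private JedisPool masterPool;

    @Autowired
    @Qualifier("slaveJedisPool")
    private JedisPool slavePool;

    private volatile boolean masterDown = false;
    // Guards against promoting the slave again on every scheduled run
    private final AtomicBoolean failoverDone = new AtomicBoolean(false);

    public boolean isMasterDown() {
        return masterDown;
    }

    /** Returns true when the master does not answer PING. */
    public boolean detectMasterFailure() {
        try (Jedis jedis = masterPool.getResource()) {
            jedis.ping();
            masterDown = false;
            return false;
        } catch (Exception e) {
            masterDown = true;
            System.err.println("Master failure detected: " + e.getMessage());
            return true;
        }
    }

    public void handleFailover() {
        if (!detectMasterFailure() || !failoverDone.compareAndSet(false, true)) {
            return; // master is healthy, or failover has already run
        }
        System.out.println("Master failure detected, starting failover...");

        // 1. Stop writes
        System.out.println("Stopping write operations");
        // 2. Notify the application layer
        notifyApplicationLayer();
        // 3. Wait for the slave to finish applying what it has already received
        waitForDataSync();
        // 4. Promote the slave
        promoteSlaveToMaster();

        // Note: masterJedisPool still points at the old master address;
        // the application must be redirected to the promoted node separately.
        System.out.println("Failover finished");
    }

    private void notifyApplicationLayer() {
        System.out.println("Notifying application layer: master is down, switching to read-only mode");
    }

    private void waitForDataSync() {
        System.out.println("Waiting for data synchronization to finish...");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void promoteSlaveToMaster() {
        try (Jedis slaveJedis = slavePool.getResource()) {
            // Sends SLAVEOF NO ONE, which turns the slave into a standalone master
            slaveJedis.slaveofNoOne();
            System.out.println("Slave has been promoted to master");
        } catch (Exception e) {
            failoverDone.set(false); // allow a retry on the next run
            System.err.println("Failed to promote the slave: " + e.getMessage());
        }
    }
}
```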
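2. Automatic failover
```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class AutoFailoverService {

    @Autowired
    private RedisFailoverHandler failoverHandler;

    /** Checks the master every 10 seconds (requires scheduling to be enabled). */
    @Scheduled(fixedRate = 10000)
    public void checkAndHandleFailover() {
        try {
            failoverHandler.handleFailover();
        } catch (Exception e) {
            System.err.println("Automatic failover check failed: " + e.getMessage());
        }
    }
}
```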
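The handler above reacts to a single failed `PING`, so one dropped packet could trigger a promotion. A common safeguard is to require several consecutive failures first. The sketch below shows that idea in isolation; `ConsecutiveFailureDetector` and the threshold of 3 are assumptions made for illustration.

```java
/** Declares a node down only after a run of consecutive failed probes. */
class ConsecutiveFailureDetector {
    private final int threshold;
    private int consecutiveFailures = 0;

    ConsecutiveFailureDetector(int threshold) {
        if (threshold < 1) {
            throw new IllegalArgumentException("threshold must be >= 1");
        }
        this.threshold = threshold;
    }

    /** Records one probe result and returns true once the node should be treated as down. */
    synchronized boolean record(boolean probeSucceeded) {
        if (probeSucceeded) {
            consecutiveFailures = 0;   // any success resets the streak
        } else {
            consecutiveFailures++;
        }
        return consecutiveFailures >= threshold;
    }
}

class ConsecutiveFailureDetectorDemo {
    public static void main(String[] args) {
        ConsecutiveFailureDetector detector = new ConsecutiveFailureDetector(3);
        boolean[] probes = {false, true, false, false, false};
        for (boolean ok : probes) {
            System.out.println("probe ok=" + ok + " -> down=" + detector.record(ok));
        }
    }
}
```

With a 10-second check interval and a threshold of 3, the master would have to be unreachable for roughly 20 to 30 seconds before failover starts, which trades detection speed for fewer false alarms.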
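Performance optimization
1. Connection pool tuning
```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

@Configuration
public class OptimizedRedisConfig {

    @Bean("optimizedMasterPool")
    public JedisPool optimizedMasterPool() {
        JedisPoolConfig config = new JedisPoolConfig();

        // Pool size
        config.setMaxTotal(50);
        config.setMaxIdle(20);
        config.setMinIdle(10);
        config.setMaxWaitMillis(3000);

        // Connection validation; each enabled check costs an extra PING
        config.setTestOnBorrow(true);
        config.setTestOnReturn(true);
        config.setTestWhileIdle(true);

        // Idle connection eviction
        config.setTimeBetweenEvictionRunsMillis(30000);
        config.setMinEvictableIdleTimeMillis(60000);
        config.setNumTestsPerEvictionRun(3);

        // Host and password are placeholders; load them from configuration in real code
        return new JedisPool(config, "192.168.1.100", 6379, 2000, "password");
    }
}
```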
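As a rough starting point for choosing `maxTotal`, Little's law says the average number of connections in use equals the request rate multiplied by the time each request holds a connection. The sketch below applies that formula; `PoolSizeEstimator`, the headroom factor, and the sample numbers are all assumptions for illustration, and any estimate should be checked against measurements from the real workload.

```java
class PoolSizeEstimator {
    /**
     * Little's law: connections in use = requests per second x seconds each
     * request holds a connection. The headroom factor leaves room for bursts.
     */
    static int estimate(double requestsPerSecond, double avgHoldMillis, double headroomFactor) {
        double concurrent = requestsPerSecond * avgHoldMillis / 1000.0;
        return (int) Math.ceil(concurrent * headroomFactor);
    }
}

class PoolSizeEstimatorDemo {
    public static void main(String[] args) {
        // Hypothetical workload: 5000 requests/s, each holding a connection for 2 ms,
        // with 2x headroom for bursts
        int size = PoolSizeEstimator.estimate(5000, 2.0, 2.0);
        System.out.println("suggested maxTotal: " + size);
    }
}
```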
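2. Batch operation optimization
```java
import java.util.List;
import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Pipeline;

@Service
public class OptimizedRedisService {

    private static final int TTL_SECONDS = 3600;

    @Autowired
    @Qualifier("optimizedMasterPool")
    private JedisPool masterPool;

    /** Writes all entries with a one-hour TTL in a single pipeline round trip. */
    public void optimizedBatchWrite(Map<String, String> data) {
        try (Jedis jedis = masterPool.getResource()) {
            Pipeline pipeline = jedis.pipelined();

            // SETEX sets value and TTL in one command, halving the command
            // count compared with a SET followed by a separate EXPIRE
            for (Map.Entry<String, String> entry : data.entrySet()) {
                pipeline.setex(entry.getKey(), TTL_SECONDS, entry.getValue());
            }

            List<Object> results = pipeline.syncAndReturnAll();
            System.out.println("Batch finished, executed " + results.size() + " commands");
        } catch (Exception e) {
            System.err.println("Batch write failed: " + e.getMessage());
            throw new RuntimeException("Batch write failed", e);
        }
    }

    /** Runs SET, EXPIRE and GET atomically on the server with a Lua script. */
    public void optimizedComplexOperation(String key, String value) {
        String luaScript =
            "local key = KEYS[1]\n" +
            "local value = ARGV[1]\n" +
            "local ttl = ARGV[2]\n" +
            "redis.call('SET', key, value)\n" +
            "redis.call('EXPIRE', key, ttl)\n" +
            "return redis.call('GET', key)\n";

        try (Jedis jedis = masterPool.getResource()) {
            // eval(script, keyCount, params...): the first keyCount params are KEYS, the rest ARGV
            Object result = jedis.eval(luaScript, 1, key, value, String.valueOf(TTL_SECONDS));
            System.out.println("Lua script result: " + result);
        } catch (Exception e) {
            System.err.println("Lua script execution failed: " + e.getMessage());
        }
    }
}
```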
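A pipeline keeps every reply queued until it is flushed, so a very large batch costs memory on both client and server. One way to bound this is to split the input into chunks and run one pipeline per chunk. The sketch below shows only the splitting step; `BatchPartitioner` is a hypothetical helper and the chunk size is an arbitrary example value.

```java
import java.util.ArrayList;
import java.util.List;

class BatchPartitioner {
    /** Splits items into consecutive chunks of at most chunkSize elements. */
    static <T> List<List<T>> partition(List<T> items, int chunkSize) {
        if (chunkSize < 1) {
            throw new IllegalArgumentException("chunkSize must be >= 1");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < items.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, items.size());
            // Copy the view so each chunk is independent of the source list
            chunks.add(new ArrayList<>(items.subList(start, end)));
        }
        return chunks;
    }
}

class BatchPartitionerDemo {
    public static void main(String[] args) {
        List<String> keys = List.of("k1", "k2", "k3", "k4", "k5");
        for (List<String> chunk : BatchPartitioner.partition(keys, 2)) {
            // In real code, each chunk would be sent through its own pipeline
            System.out.println("chunk: " + chunk);
        }
    }
}
```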
Real-world examples
1. E-commerce caching
```java
import java.util.ArrayList;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

// fastjson; requires the fastjson dependency
import com.alibaba.fastjson.JSON;

import redis.clients.jedis.Jedis;

// ProductDetail and CartItem are application domain classes that are not shown here.
@Service
public class EcommerceCacheService {

    private static final int PRODUCT_TTL_SECONDS = 1800;

    @Autowired
    private RedisReadWriteService redisService;

    /** Caches product details for 30 minutes. */
    public void cacheProductDetail(String productId, ProductDetail product) {
        String key = "product:detail:" + productId;
        String value = JSON.toJSONString(product);

        try (Jedis jedis = redisService.getMasterPool().getResource()) {
            // SETEX writes the value and its TTL in one atomic command
            jedis.setex(key, PRODUCT_TTL_SECONDS, value);
        }
    }

    /** Reads from the cache first and falls back to the database on a miss. */
    public ProductDetail getProductDetail(String productId) {
        String key = "product:detail:" + productId;
        String value = redisService.read(key);

        if (value != null) {
            return JSON.parseObject(value, ProductDetail.class);
        }
        return loadFromDatabase(productId);
    }

    /** Stores the user's cart; no TTL is set, so it stays until overwritten. */
    public void cacheUserCart(String userId, List<CartItem> cartItems) {
        String key = "user:cart:" + userId;
        String value = JSON.toJSONString(cartItems);
        redisService.write(key, value);
    }

    /** Returns the cached cart, or an empty list if none is cached. */
    public List<CartItem> getUserCart(String userId) {
        String key = "user:cart:" + userId;
        String value = redisService.read(key);

        if (value != null) {
            return JSON.parseArray(value, CartItem.class);
        }
        return new ArrayList<>();
    }

    /** Placeholder for the real database lookup. */
    private ProductDetail loadFromDatabase(String productId) {
        return new ProductDetail();
    }
}
```
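2. User session management
```java
import java.util.UUID;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

// fastjson; requires the fastjson dependency
import com.alibaba.fastjson.JSON;

import redis.clients.jedis.Jedis;

// UserSession is an application domain class that is not shown here.
@Service
public class SessionManagementService {

    private static final int SESSION_TTL_SECONDS = 1800;

    @Autowired
    private RedisReadWriteService redisService;

    /** Creates a session that expires after 30 minutes and returns its id. */
    public String createUserSession(String userId, UserSession session) {
        String sessionId = UUID.randomUUID().toString();
        String key = "session:" + sessionId;
        String value = JSON.toJSONString(session);

        try (Jedis jedis = redisService.getMasterPool().getResource()) {
            jedis.setex(key, SESSION_TTL_SECONDS, value);
            // Give the user-to-session mapping the same TTL so it does not
            // outlive the session it points to
            jedis.setex("user:session:" + userId, SESSION_TTL_SECONDS, sessionId);
        }
        return sessionId;
    }

    /**
     * Returns the session, or null if it does not exist or has expired.
     * The read goes to a slave, so a session created a moment ago may not
     * be visible yet because of replication lag.
     */
    public UserSession getUserSession(String sessionId) {
        String key = "session:" + sessionId;
        String value = redisService.read(key);

        if (value != null) {
            return JSON.parseObject(value, UserSession.class);
        }
        return null;
    }

    /** Resets the session TTL to 30 minutes. */
    public void extendSession(String sessionId) {
        String key = "session:" + sessionId;
        try (Jedis jedis = redisService.getMasterPool().getResource()) {
            jedis.expire(key, SESSION_TTL_SECONDS);
        }
    }

    /** Deletes the session; the user-to-session mapping expires on its own. */
    public void destroyUserSession(String sessionId) {
        String key = "session:" + sessionId;
        try (Jedis jedis = redisService.getMasterPool().getResource()) {
            jedis.del(key);
        }
    }
}
```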
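Summary
This article covered the following.
Key points
- How Redis master-slave architecture works: data replication, read/write splitting, failure recovery
- Java integration: Spring Boot configuration, connection pool management, read/write splitting
- Monitoring and health checks: tracking master and slave status, measuring replication lag
- Failure handling: failure detection, failover, fallback reads
- Performance optimization: connection pool tuning, batch operations, Lua scripts
Best practices
- Size the connection pool sensibly: adjust it to the actual traffic
- Use read/write splitting: improve read throughput and take load off the master
- Monitor replication lag: detect when slaves fall behind and reads become stale
- Prepare for failover: improve system availability
- Optimize batch operations: use pipelines and Lua scripts
Caveats
- Master-slave replication is asynchronous and has lag; evaluate whether that is acceptable for each business case
- Plain master-slave replication has no built-in failover and must be switched manually; consider combining it with Redis Sentinel for automatic failover
- Monitoring and alerting are essential
- Back up regularly and rehearse the failure recovery procedure
Applying Redis master-slave architecture well in Java can noticeably improve a system's read performance and reliability, and gives high-concurrency applications a solid foundation.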