1. Keepalive架构概述

Keepalived(下文简称Keepalive)是一个基于VRRP协议的高可用解决方案，主要用于实现服务的自动故障转移，并可通过管理LVS实现负载均衡。它借助虚拟IP(VIP)漂移机制，确保服务的高可用性和连续性。本文将从架构师的角度深入分析Keepalived的原理、配置、优化策略以及生产环境中的最佳实践。

1.1 Keepalive核心特性

  1. VRRP协议: 实现虚拟路由器冗余协议
  2. 健康检查: 支持多种健康检查机制
  3. 负载均衡: 通过管理LVS(IPVS)虚拟服务器实现四层负载均衡
  4. 故障转移: 自动检测故障并切换服务
  5. VIP管理: 虚拟IP的自动分配和回收

1.2 Keepalive架构层次

1
2
3
4
5
6
7
8
9
10
11
12
13
┌─────────────────────────────────────────────────────────┐
│ 应用层 │
│ (Web服务、API服务、数据库服务等) │
├─────────────────────────────────────────────────────────┤
│ Keepalive层 │
│ (VRRP协议、健康检查、故障转移、VIP管理) │
├─────────────────────────────────────────────────────────┤
│ LVS层 │
│ (负载均衡、流量分发、会话保持) │
├─────────────────────────────────────────────────────────┤
│ 网络层 │
│ (IP路由、网络接口、物理网络) │
└─────────────────────────────────────────────────────────┘

2. Keepalive核心组件

2.1 VRRP协议实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
// VRRP protocol manager
/**
 * Drives a keepalived-backed VRRP instance: renders a {@code vrrp_instance} section,
 * writes it to {@code /etc/keepalived/keepalived.conf}, reloads or starts keepalived,
 * registers a health check and samples VRRP status every 5 seconds.
 *
 * <p>NOTE(review): the configuration file is rewritten as a whole, which also drops any
 * {@code virtual_server} sections appended to the same file by other components.
 */
@Component
@Slf4j
public class VRRPProtocolManager {

    /** keepalived configuration file written and loaded by this manager. */
    private static final String KEEPALIVED_CONF = "/etc/keepalived/keepalived.conf";

    /** Upper bound on how long an external command (systemctl, keepalived, pgrep) may run. */
    private static final long COMMAND_TIMEOUT_SECONDS = 30;

    private final KeepaliveConfig keepaliveConfig;
    private final NetworkInterfaceManager networkManager;
    private final HealthCheckManager healthCheckManager;
    private final MeterRegistry meterRegistry;

    /** One shared daemon scheduler: no thread pool per instance, and it never keeps the JVM alive. */
    private final ScheduledExecutorService heartbeatScheduler;

    /** Heartbeat task per instance id, so re-initialising an instance replaces its task instead of stacking one. */
    private final Map<String, ScheduledFuture<?>> heartbeatTasks = new ConcurrentHashMap<>();

    /*
     * Gauge holders. A Micrometer gauge observes a live Number that must stay strongly
     * referenced; registering a fresh boxed Integer on every sample never updates the
     * already-registered gauge. The gauges are registered once and updated via set().
     */
    private final AtomicInteger processRunningGauge;
    private final AtomicInteger vipActiveGauge;
    private final AtomicInteger interfaceUpGauge;

    public VRRPProtocolManager(KeepaliveConfig keepaliveConfig,
                               NetworkInterfaceManager networkManager,
                               HealthCheckManager healthCheckManager,
                               MeterRegistry meterRegistry) {
        this.keepaliveConfig = keepaliveConfig;
        this.networkManager = networkManager;
        this.healthCheckManager = healthCheckManager;
        this.meterRegistry = meterRegistry;
        this.heartbeatScheduler = Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "vrrp-heartbeat");
            thread.setDaemon(true);
            return thread;
        });
        this.processRunningGauge = meterRegistry.gauge("vrrp.process.running", new AtomicInteger(0));
        this.vipActiveGauge = meterRegistry.gauge("vrrp.vip.active", new AtomicInteger(0));
        this.interfaceUpGauge = meterRegistry.gauge("vrrp.interface.up", new AtomicInteger(0));
    }

    /**
     * Initialises a VRRP instance: writes and applies its configuration, ensures the
     * keepalived daemon is running, registers its health check and starts status sampling.
     *
     * @param instance the VRRP instance description
     * @throws RuntimeException if any step fails (the cause is attached)
     */
    public void initializeVRRPInstance(VRRPInstance instance) {
        log.info("Initializing VRRP instance: {}", instance.getInstanceId());

        try {
            configureVRRPParameters(instance);
            startVRRPDaemon(instance);
            registerHealthCheck(instance);
            startHeartbeatDetection(instance);

            meterRegistry.counter("vrrp.instance.initialized").increment();
            log.info("VRRP instance initialized successfully: {}", instance.getInstanceId());

        } catch (Exception e) {
            log.error("Error initializing VRRP instance: {}", instance.getInstanceId(), e);
            meterRegistry.counter("vrrp.instance.init_error").increment();
            throw new RuntimeException("Failed to initialize VRRP instance", e);
        }
    }

    /** Builds the VRRP configuration from the instance and applies it. */
    private void configureVRRPParameters(VRRPInstance instance) {
        // NOTE(review): state and interface are not copied into the configuration here, so
        // generateVRRPConfig() omits them; keepalived requires "interface" — confirm the
        // VRRPConfiguration builder method for it and populate it from instance.getInterface().
        VRRPConfiguration config = VRRPConfiguration.builder()
                .instanceId(instance.getInstanceId())
                .virtualRouterId(instance.getVirtualRouterId())
                .priority(instance.getPriority())
                .advertInterval(instance.getAdvertInterval())
                .preempt(instance.isPreempt())
                .authentication(instance.getAuthentication())
                .virtualIpAddresses(instance.getVirtualIpAddresses())
                .build();

        applyVRRPConfiguration(config);
    }

    /** Renders, writes and (if keepalived is running) reloads the configuration. */
    private void applyVRRPConfiguration(VRRPConfiguration config) {
        try {
            String configContent = generateVRRPConfig(config);
            writeVRRPConfigFile(configContent);
            reloadVRRPConfiguration();

            log.info("VRRP configuration applied successfully");

        } catch (Exception e) {
            log.error("Error applying VRRP configuration", e);
            throw new RuntimeException("Failed to apply VRRP configuration", e);
        }
    }

    /** Renders a keepalived {@code vrrp_instance} block for the given configuration. */
    private String generateVRRPConfig(VRRPConfiguration config) {
        StringBuilder configBuilder = new StringBuilder();

        configBuilder.append("vrrp_instance ").append(config.getInstanceId()).append(" {\n");
        // Never write a literal "null": keepalived would reject the whole file.
        if (config.getState() != null) {
            configBuilder.append(" state ").append(config.getState()).append("\n");
        }
        if (config.getInterface() != null) {
            configBuilder.append(" interface ").append(config.getInterface()).append("\n");
        }
        configBuilder.append(" virtual_router_id ").append(config.getVirtualRouterId()).append("\n");
        configBuilder.append(" priority ").append(config.getPriority()).append("\n");
        configBuilder.append(" advert_int ").append(config.getAdvertInterval()).append("\n");
        // keepalived preempts by default; disabling it is the bare "nopreempt" keyword
        // ("preempt on/off" is not valid keepalived syntax).
        if (!config.isPreempt()) {
            configBuilder.append(" nopreempt\n");
        }

        if (config.getAuthentication() != null) {
            configBuilder.append(" authentication {\n");
            configBuilder.append(" auth_type ").append(config.getAuthentication().getAuthType()).append("\n");
            configBuilder.append(" auth_pass ").append(config.getAuthentication().getAuthPass()).append("\n");
            configBuilder.append(" }\n");
        }

        configBuilder.append(" virtual_ipaddress {\n");
        if (config.getVirtualIpAddresses() != null) {
            for (String vip : config.getVirtualIpAddresses()) {
                configBuilder.append(" ").append(vip).append("\n");
            }
        }
        configBuilder.append(" }\n");

        configBuilder.append("}\n");

        return configBuilder.toString();
    }

    /**
     * Writes the configuration as UTF-8 to a sibling temp file and atomically renames it
     * over the target, so a concurrent reload never reads a half-written file.
     */
    private void writeVRRPConfigFile(String configContent) {
        try {
            Path configPath = Paths.get(KEEPALIVED_CONF);
            Path tempPath = configPath.resolveSibling(configPath.getFileName() + ".tmp");
            Files.write(tempPath, configContent.getBytes(StandardCharsets.UTF_8));
            Files.move(tempPath, configPath,
                    StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);

            log.info("VRRP configuration file written successfully");

        } catch (Exception e) {
            log.error("Error writing VRRP configuration file", e);
            throw new RuntimeException("Failed to write VRRP configuration file", e);
        }
    }

    /** Reloads keepalived via systemd; skipped when keepalived is not running yet. */
    private void reloadVRRPConfiguration() {
        try {
            if (!isVRRPProcessRunning()) {
                // Nothing to reload: the daemon reads the freshly written file when it starts.
                log.info("keepalived is not running; skipping configuration reload");
                return;
            }

            int exitCode = runCommand("systemctl", "reload", "keepalived");
            if (exitCode != 0) {
                throw new RuntimeException("Failed to reload VRRP configuration");
            }
            log.info("VRRP configuration reloaded successfully");

        } catch (Exception e) {
            restoreInterruptIfNeeded(e);
            log.error("Error reloading VRRP configuration", e);
            throw new RuntimeException("Failed to reload VRRP configuration", e);
        }
    }

    /**
     * Starts keepalived unless it is already running. Without {@code -n} keepalived forks and
     * the launched process exits, so success means exit code 0 followed by a running daemon
     * ({@code -D} only enables detailed logging).
     */
    private void startVRRPDaemon(VRRPInstance instance) {
        try {
            if (isVRRPProcessRunning()) {
                log.info("VRRP daemon already running, not starting another for instance: {}",
                        instance.getInstanceId());
                return;
            }

            int exitCode = runCommand("keepalived", "-D", "-f", KEEPALIVED_CONF);

            // Give the forked daemon time to parse its configuration (it exits on errors).
            Thread.sleep(2000);

            if (exitCode == 0 && isVRRPProcessRunning()) {
                log.info("VRRP daemon started successfully for instance: {}", instance.getInstanceId());
            } else {
                throw new RuntimeException("Failed to start VRRP daemon");
            }

        } catch (Exception e) {
            restoreInterruptIfNeeded(e);
            log.error("Error starting VRRP daemon", e);
            throw new RuntimeException("Failed to start VRRP daemon", e);
        }
    }

    /** Registers the instance's health check with the HealthCheckManager. */
    private void registerHealthCheck(VRRPInstance instance) {
        try {
            HealthCheckConfig healthCheckConfig = HealthCheckConfig.builder()
                    .instanceId(instance.getInstanceId())
                    .checkType(instance.getHealthCheckType())
                    .checkInterval(instance.getCheckInterval())
                    .checkTimeout(instance.getCheckTimeout())
                    .maxRetries(instance.getMaxRetries())
                    .build();

            healthCheckManager.registerHealthCheck(healthCheckConfig);

            log.info("Health check registered for VRRP instance: {}", instance.getInstanceId());

        } catch (Exception e) {
            log.error("Error registering health check", e);
            throw new RuntimeException("Failed to register health check", e);
        }
    }

    /** Samples the instance's VRRP status every 5 seconds on the shared scheduler. */
    private void startHeartbeatDetection(VRRPInstance instance) {
        ScheduledFuture<?> task = heartbeatScheduler.scheduleAtFixedRate(() -> {
            try {
                VRRPStatus status = checkVRRPStatus(instance);
                updateVRRPMetrics(status);
                handleVRRPStatusChange(status);

            } catch (Exception e) {
                log.error("Error in heartbeat detection", e);
            }
        }, 0, 5, TimeUnit.SECONDS);

        // Re-initialising the same instance replaces its previous heartbeat task.
        ScheduledFuture<?> previous = heartbeatTasks.put(instance.getInstanceId(), task);
        if (previous != null) {
            previous.cancel(false);
        }

        log.info("Heartbeat detection started for VRRP instance: {}", instance.getInstanceId());
    }

    /** Collects process, VIP and interface state; on error returns an all-down status with the message. */
    private VRRPStatus checkVRRPStatus(VRRPInstance instance) {
        try {
            boolean processRunning = isVRRPProcessRunning();
            boolean vipActive = isVIPActive(instance.getVirtualIpAddresses());
            boolean interfaceUp = networkManager.isInterfaceUp(instance.getInterface());

            return VRRPStatus.builder()
                    .instanceId(instance.getInstanceId())
                    .processRunning(processRunning)
                    .vipActive(vipActive)
                    .interfaceUp(interfaceUp)
                    .timestamp(LocalDateTime.now())
                    .build();

        } catch (Exception e) {
            log.error("Error checking VRRP status", e);
            return VRRPStatus.builder()
                    .instanceId(instance.getInstanceId())
                    .processRunning(false)
                    .vipActive(false)
                    .interfaceUp(false)
                    .timestamp(LocalDateTime.now())
                    .error(e.getMessage())
                    .build();
        }
    }

    /** Returns true if a process named exactly "keepalived" exists (pgrep exit code 0). */
    private boolean isVRRPProcessRunning() {
        try {
            return runCommand("pgrep", "-x", "keepalived") == 0;

        } catch (Exception e) {
            restoreInterruptIfNeeded(e);
            log.error("Error checking VRRP process", e);
            return false;
        }
    }

    /** Returns true if at least one of the VIPs is active on this host. */
    private boolean isVIPActive(List<String> virtualIpAddresses) {
        if (virtualIpAddresses == null) {
            return false;
        }
        try {
            for (String vip : virtualIpAddresses) {
                if (networkManager.isIPAddressActive(vip)) {
                    return true;
                }
            }
            return false;

        } catch (Exception e) {
            log.error("Error checking VIP status", e);
            return false;
        }
    }

    /** Publishes the latest status through the gauges registered in the constructor. */
    private void updateVRRPMetrics(VRRPStatus status) {
        processRunningGauge.set(status.isProcessRunning() ? 1 : 0);
        vipActiveGauge.set(status.isVipActive() ? 1 : 0);
        interfaceUpGauge.set(status.isInterfaceUp() ? 1 : 0);
    }

    /** Hook for reacting to status changes (alerts, audit logging); intentionally empty. */
    private void handleVRRPStatusChange(VRRPStatus status) {
        // Status-change handling (alerting, logging, ...) can be implemented here.
    }

    /**
     * Runs an external command and waits, bounded by {@link #COMMAND_TIMEOUT_SECONDS}, for it to exit.
     *
     * @return the process exit code
     * @throws IOException if the process cannot be started
     * @throws InterruptedException if interrupted while waiting
     * @throws IllegalStateException if the command does not finish in time (it is killed)
     */
    private int runCommand(String... command) throws IOException, InterruptedException {
        Process process = new ProcessBuilder(command).start();
        if (!process.waitFor(COMMAND_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
            process.destroyForcibly();
            throw new IllegalStateException("Command timed out: " + String.join(" ", command));
        }
        return process.exitValue();
    }

    /** Re-asserts the thread's interrupt flag when a broad catch swallowed an InterruptedException. */
    private static void restoreInterruptIfNeeded(Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }
}

2.2 健康检查机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
// 健康检查管理器
@Component
@Slf4j
public class HealthCheckManager {

private final Map<String, HealthCheckConfig> healthCheckConfigs = new ConcurrentHashMap<>();
private final Map<String, HealthCheckResult> healthCheckResults = new ConcurrentHashMap<>();
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10);
private final MeterRegistry meterRegistry;

public HealthCheckManager(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}

// 注册健康检查
public void registerHealthCheck(HealthCheckConfig config) {
log.info("Registering health check: {}", config.getInstanceId());

try {
healthCheckConfigs.put(config.getInstanceId(), config);

// 启动健康检查任务
startHealthCheckTask(config);

meterRegistry.counter("health_check.registered").increment();
log.info("Health check registered successfully: {}", config.getInstanceId());

} catch (Exception e) {
log.error("Error registering health check: {}", config.getInstanceId(), e);
meterRegistry.counter("health_check.register_error").increment();
}
}

// 启动健康检查任务
private void startHealthCheckTask(HealthCheckConfig config) {
scheduler.scheduleAtFixedRate(() -> {
try {
HealthCheckResult result = performHealthCheck(config);
healthCheckResults.put(config.getInstanceId(), result);

// 更新指标
updateHealthCheckMetrics(result);

// 处理健康检查结果
handleHealthCheckResult(result);

} catch (Exception e) {
log.error("Error performing health check: {}", config.getInstanceId(), e);
}
}, 0, config.getCheckInterval(), TimeUnit.SECONDS);
}

// 执行健康检查
private HealthCheckResult performHealthCheck(HealthCheckConfig config) {
long startTime = System.currentTimeMillis();

try {
boolean healthy = false;
String errorMessage = null;

switch (config.getCheckType()) {
case HTTP:
healthy = performHttpHealthCheck(config);
break;
case TCP:
healthy = performTcpHealthCheck(config);
break;
case SCRIPT:
healthy = performScriptHealthCheck(config);
break;
case PING:
healthy = performPingHealthCheck(config);
break;
default:
throw new IllegalArgumentException("Unsupported health check type: " + config.getCheckType());
}

long duration = System.currentTimeMillis() - startTime;

return HealthCheckResult.builder()
.instanceId(config.getInstanceId())
.healthy(healthy)
.duration(duration)
.timestamp(LocalDateTime.now())
.errorMessage(errorMessage)
.build();

} catch (Exception e) {
long duration = System.currentTimeMillis() - startTime;

return HealthCheckResult.builder()
.instanceId(config.getInstanceId())
.healthy(false)
.duration(duration)
.timestamp(LocalDateTime.now())
.errorMessage(e.getMessage())
.build();
}
}

// HTTP健康检查
private boolean performHttpHealthCheck(HealthCheckConfig config) {
try {
URL url = new URL(config.getCheckUrl());
HttpURLConnection connection = (HttpURLConnection) url.openConnection();

connection.setRequestMethod("GET");
connection.setConnectTimeout(config.getCheckTimeout() * 1000);
connection.setReadTimeout(config.getCheckTimeout() * 1000);

int responseCode = connection.getResponseCode();
return responseCode >= 200 && responseCode < 300;

} catch (Exception e) {
log.error("HTTP health check failed: {}", config.getCheckUrl(), e);
return false;
}
}

// TCP健康检查
private boolean performTcpHealthCheck(HealthCheckConfig config) {
try {
Socket socket = new Socket();
socket.connect(new InetSocketAddress(config.getCheckHost(), config.getCheckPort()),
config.getCheckTimeout() * 1000);
socket.close();
return true;

} catch (Exception e) {
log.error("TCP health check failed: {}:{}", config.getCheckHost(), config.getCheckPort(), e);
return false;
}
}

// 脚本健康检查
private boolean performScriptHealthCheck(HealthCheckConfig config) {
try {
ProcessBuilder pb = new ProcessBuilder(config.getCheckScript());
Process process = pb.start();

int exitCode = process.waitFor();
return exitCode == 0;

} catch (Exception e) {
log.error("Script health check failed: {}", config.getCheckScript(), e);
return false;
}
}

// Ping健康检查
private boolean performPingHealthCheck(HealthCheckConfig config) {
try {
ProcessBuilder pb = new ProcessBuilder("ping", "-c", "1", "-W",
String.valueOf(config.getCheckTimeout()), config.getCheckHost());
Process process = pb.start();

int exitCode = process.waitFor();
return exitCode == 0;

} catch (Exception e) {
log.error("Ping health check failed: {}", config.getCheckHost(), e);
return false;
}
}

// 更新健康检查指标
private void updateHealthCheckMetrics(HealthCheckResult result) {
meterRegistry.gauge("health_check.healthy", result.isHealthy() ? 1 : 0)
.tag("instance", result.getInstanceId()).register();
meterRegistry.gauge("health_check.duration", result.getDuration())
.tag("instance", result.getInstanceId()).register();

if (result.isHealthy()) {
meterRegistry.counter("health_check.success").increment();
} else {
meterRegistry.counter("health_check.failure").increment();
}
}

// 处理健康检查结果
private void handleHealthCheckResult(HealthCheckResult result) {
if (!result.isHealthy()) {
log.warn("Health check failed for instance: {}, error: {}",
result.getInstanceId(), result.getErrorMessage());

// 触发故障转移
triggerFailover(result.getInstanceId());
}
}

// 触发故障转移
private void triggerFailover(String instanceId) {
log.info("Triggering failover for instance: {}", instanceId);

try {
// 这里可以实现故障转移逻辑
// 比如降低优先级、发送通知等

meterRegistry.counter("health_check.failover_triggered").increment();

} catch (Exception e) {
log.error("Error triggering failover", e);
}
}

// 获取健康检查结果
public HealthCheckResult getHealthCheckResult(String instanceId) {
return healthCheckResults.get(instanceId);
}

// 获取所有健康检查结果
public Map<String, HealthCheckResult> getAllHealthCheckResults() {
return new HashMap<>(healthCheckResults);
}
}

3. LVS负载均衡集成

3.1 LVS配置管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
// LVS configuration manager
/**
 * Manages LVS (IPVS) virtual servers through keepalived configuration and {@code ipvsadm}:
 * renders {@code virtual_server} sections, adds/removes real servers and parses
 * {@code ipvsadm -L -n} output into status objects.
 */
@Component
@Slf4j
public class LVSConfigurationManager {

    private final KeepaliveConfig keepaliveConfig;
    private final NetworkInterfaceManager networkManager;
    private final MeterRegistry meterRegistry;

    public LVSConfigurationManager(KeepaliveConfig keepaliveConfig,
                                   NetworkInterfaceManager networkManager,
                                   MeterRegistry meterRegistry) {
        this.keepaliveConfig = keepaliveConfig;
        this.networkManager = networkManager;
        this.meterRegistry = meterRegistry;
    }

    /**
     * Renders the virtual server, appends it to the keepalived configuration and makes
     * keepalived pick it up.
     *
     * @throws RuntimeException if any step fails (the cause is attached)
     */
    public void configureLVSVirtualServer(LVSVirtualServer virtualServer) {
        log.info("Configuring LVS virtual server: {}", virtualServer.getName());

        try {
            String lvsConfig = generateLVSConfig(virtualServer);
            applyLVSConfiguration(lvsConfig);
            startLVSService();

            meterRegistry.counter("lvs.virtual_server.configured").increment();
            log.info("LVS virtual server configured successfully: {}", virtualServer.getName());

        } catch (Exception e) {
            log.error("Error configuring LVS virtual server: {}", virtualServer.getName(), e);
            meterRegistry.counter("lvs.virtual_server.config_error").increment();
            throw new RuntimeException("Failed to configure LVS virtual server", e);
        }
    }

    /** Renders a keepalived {@code virtual_server} block with a TCP_CHECK per real server. */
    private String generateLVSConfig(LVSVirtualServer virtualServer) {
        StringBuilder configBuilder = new StringBuilder();

        configBuilder.append("virtual_server ").append(virtualServer.getVirtualIp())
                .append(" ").append(virtualServer.getVirtualPort()).append(" {\n");
        configBuilder.append(" delay_loop ").append(virtualServer.getDelayLoop()).append("\n");
        configBuilder.append(" lb_algo ").append(virtualServer.getLbAlgo()).append("\n");
        configBuilder.append(" lb_kind ").append(virtualServer.getLbKind()).append("\n");
        configBuilder.append(" protocol ").append(virtualServer.getProtocol()).append("\n");

        for (LVSRealServer realServer : virtualServer.getRealServers()) {
            configBuilder.append(" real_server ").append(realServer.getIp())
                    .append(" ").append(realServer.getPort()).append(" {\n");
            configBuilder.append(" weight ").append(realServer.getWeight()).append("\n");
            configBuilder.append(" TCP_CHECK {\n");
            configBuilder.append(" connect_timeout ").append(realServer.getConnectTimeout()).append("\n");
            configBuilder.append(" nb_get_retry ").append(realServer.getNbGetRetry()).append("\n");
            configBuilder.append(" delay_before_retry ").append(realServer.getDelayBeforeRetry()).append("\n");
            configBuilder.append(" }\n");
            configBuilder.append(" }\n");
        }

        configBuilder.append("}\n");

        return configBuilder.toString();
    }

    /**
     * Appends the rendered section (UTF-8) to keepalived.conf, creating the file if needed.
     * NOTE(review): repeated calls append duplicate sections for the same virtual server.
     */
    private void applyLVSConfiguration(String lvsConfig) {
        try {
            Path configPath = Paths.get("/etc/keepalived/keepalived.conf");
            Files.write(configPath, lvsConfig.getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);

            log.info("LVS configuration applied successfully");

        } catch (Exception e) {
            log.error("Error applying LVS configuration", e);
            throw new RuntimeException("Failed to apply LVS configuration", e);
        }
    }

    /**
     * Makes keepalived load the new configuration: reloads it when running, starts it
     * otherwise ("systemctl start" would be a no-op on a running daemon).
     */
    private void startLVSService() {
        try {
            Process process = new ProcessBuilder("systemctl", "reload-or-restart", "keepalived").start();

            int exitCode = process.waitFor();
            if (exitCode == 0) {
                log.info("LVS service started successfully");
            } else {
                throw new RuntimeException("Failed to start LVS service");
            }

        } catch (Exception e) {
            restoreInterruptIfNeeded(e);
            log.error("Error starting LVS service", e);
            throw new RuntimeException("Failed to start LVS service", e);
        }
    }

    /**
     * Adds a real server to a TCP virtual service via ipvsadm.
     *
     * @param virtualServerName the ipvsadm service address, i.e. {@code "VIP:port"} (passed to {@code -t})
     * @param realServer        the real server (address, port, weight)
     */
    public void addRealServer(String virtualServerName, LVSRealServer realServer) {
        log.info("Adding real server to virtual server: {} -> {}:{}",
                virtualServerName, realServer.getIp(), realServer.getPort());

        try {
            ProcessBuilder pb = new ProcessBuilder("ipvsadm", "-a", "-t",
                    virtualServerName, "-r", realServer.getIp() + ":" + realServer.getPort(),
                    "-w", String.valueOf(realServer.getWeight()));

            Process process = pb.start();
            int exitCode = process.waitFor();

            if (exitCode == 0) {
                meterRegistry.counter("lvs.real_server.added").increment();
                log.info("Real server added successfully");
            } else {
                throw new RuntimeException("Failed to add real server");
            }

        } catch (Exception e) {
            restoreInterruptIfNeeded(e);
            log.error("Error adding real server", e);
            meterRegistry.counter("lvs.real_server.add_error").increment();
            throw new RuntimeException("Failed to add real server", e);
        }
    }

    /**
     * Removes a real server from a TCP virtual service via ipvsadm.
     *
     * @param virtualServerName the ipvsadm service address, i.e. {@code "VIP:port"} (passed to {@code -t})
     */
    public void removeRealServer(String virtualServerName, String realServerIp, int realServerPort) {
        log.info("Removing real server from virtual server: {} -> {}:{}",
                virtualServerName, realServerIp, realServerPort);

        try {
            ProcessBuilder pb = new ProcessBuilder("ipvsadm", "-d", "-t",
                    virtualServerName, "-r", realServerIp + ":" + realServerPort);

            Process process = pb.start();
            int exitCode = process.waitFor();

            if (exitCode == 0) {
                meterRegistry.counter("lvs.real_server.removed").increment();
                log.info("Real server removed successfully");
            } else {
                throw new RuntimeException("Failed to remove real server");
            }

        } catch (Exception e) {
            restoreInterruptIfNeeded(e);
            log.error("Error removing real server", e);
            meterRegistry.counter("lvs.real_server.remove_error").increment();
            throw new RuntimeException("Failed to remove real server", e);
        }
    }

    /** Returns the current virtual and real server status; on failure a status carrying the error. */
    public LVSStatus getLVSStatus() {
        try {
            List<LVSVirtualServerStatus> virtualServers = getVirtualServerStatus();
            List<LVSRealServerStatus> realServers = getRealServerStatus();

            return LVSStatus.builder()
                    .virtualServers(virtualServers)
                    .realServers(realServers)
                    .timestamp(LocalDateTime.now())
                    .build();

        } catch (Exception e) {
            log.error("Error getting LVS status", e);
            return LVSStatus.builder()
                    .timestamp(LocalDateTime.now())
                    .error(e.getMessage())
                    .build();
        }
    }

    /** Parses the virtual-service rows of {@code ipvsadm -L -n}; returns what was parsed on error. */
    private List<LVSVirtualServerStatus> getVirtualServerStatus() {
        List<LVSVirtualServerStatus> statusList = new ArrayList<>();

        try {
            for (String line : readIpvsadmListing()) {
                if (isVirtualServerLine(line)) {
                    LVSVirtualServerStatus status = parseVirtualServerStatus(line);
                    if (status != null) {
                        statusList.add(status);
                    }
                }
            }

        } catch (Exception e) {
            restoreInterruptIfNeeded(e);
            log.error("Error getting virtual server status", e);
        }

        return statusList;
    }

    /** Parses the real-server rows of {@code ipvsadm -L -n}; returns what was parsed on error. */
    private List<LVSRealServerStatus> getRealServerStatus() {
        List<LVSRealServerStatus> statusList = new ArrayList<>();

        try {
            for (String line : readIpvsadmListing()) {
                if (isRealServerLine(line)) {
                    LVSRealServerStatus status = parseRealServerStatus(line);
                    if (status != null) {
                        statusList.add(status);
                    }
                }
            }

        } catch (Exception e) {
            restoreInterruptIfNeeded(e);
            log.error("Error getting real server status", e);
        }

        return statusList;
    }

    /**
     * Runs {@code ipvsadm -L -n} and returns its stdout lines. The reader is closed and the
     * process reaped.
     *
     * @throws IllegalStateException if ipvsadm exits non-zero
     */
    private List<String> readIpvsadmListing() throws IOException, InterruptedException {
        Process process = new ProcessBuilder("ipvsadm", "-L", "-n").start();

        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }

        int exitCode = process.waitFor();
        if (exitCode != 0) {
            throw new IllegalStateException("ipvsadm -L -n exited with code " + exitCode);
        }
        return lines;
    }

    /**
     * Virtual-service rows start in column 0 with the protocol (e.g. {@code "TCP  10.0.0.1:80 rr"}).
     * The banner ("IP Virtual Server ...") and column header ("Prot ...") are excluded.
     */
    private static boolean isVirtualServerLine(String line) {
        return !line.isEmpty()
                && !Character.isWhitespace(line.charAt(0))
                && !line.startsWith("IP Virtual Server")
                && !line.startsWith("Prot ");
    }

    /** Real-server rows are indented and start with "->"; the "-> RemoteAddress:Port ..." header is excluded. */
    private static boolean isRealServerLine(String line) {
        String trimmed = line.trim();
        return trimmed.startsWith("->") && !trimmed.contains("RemoteAddress");
    }

    /** Parses {@code "<proto> <ip:port> <scheduler> [flags]"}; returns null if the row is malformed. */
    private LVSVirtualServerStatus parseVirtualServerStatus(String line) {
        try {
            String[] parts = line.trim().split("\\s+");
            if (parts.length >= 3) {
                String[] hostPort = splitHostPort(parts[1]);
                return LVSVirtualServerStatus.builder()
                        .virtualIp(hostPort[0])
                        .virtualPort(hostPort[1])
                        .protocol(parts[0])
                        .build();
            }
        } catch (Exception e) {
            log.error("Error parsing virtual server status", e);
        }

        return null;
    }

    /**
     * Parses {@code "-> <ip:port> <forward> <weight> <activeConn> <inActConn>"}; returns null if
     * the row is malformed.
     */
    private LVSRealServerStatus parseRealServerStatus(String line) {
        try {
            String trimmed = line.trim();
            if (trimmed.startsWith("->")) {
                trimmed = trimmed.substring(2).trim();
            }
            String[] parts = trimmed.split("\\s+");
            if (parts.length >= 4) {
                String[] hostPort = splitHostPort(parts[0]);
                return LVSRealServerStatus.builder()
                        .realIp(hostPort[0])
                        .realPort(hostPort[1])
                        .weight(Integer.parseInt(parts[2]))
                        .activeConnections(Integer.parseInt(parts[3]))
                        .build();
            }
        } catch (Exception e) {
            log.error("Error parsing real server status", e);
        }

        return null;
    }

    /**
     * Splits {@code "ip:port"} at the last ':' (so bracketed IPv6 works). Without a ':'
     * (e.g. an FWM mark), the whole token is the address and the port is null.
     */
    private static String[] splitHostPort(String address) {
        int colon = address.lastIndexOf(':');
        if (colon < 0) {
            return new String[] {address, null};
        }
        return new String[] {address.substring(0, colon), address.substring(colon + 1)};
    }

    /** Re-asserts the thread's interrupt flag when a broad catch swallowed an InterruptedException. */
    private static void restoreInterruptIfNeeded(Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }
}

3.2 负载均衡算法实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
// Load-balancing algorithm manager
/**
 * Maps LVS scheduler names (rr, wrr, lc, wlc, sh, dh) to in-process selection algorithms
 * and records which server each selection picked.
 */
@Component
@Slf4j
public class LoadBalancingAlgorithmManager {

    private final Map<String, LoadBalancingAlgorithm> algorithms = new HashMap<>();
    private final MeterRegistry meterRegistry;

    public LoadBalancingAlgorithmManager(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        initializeAlgorithms();
    }

    /** Registers the built-in algorithms under their LVS scheduler names. */
    private void initializeAlgorithms() {
        algorithms.put("rr", new RoundRobinAlgorithm());
        algorithms.put("wrr", new WeightedRoundRobinAlgorithm());
        algorithms.put("lc", new LeastConnectionsAlgorithm());
        algorithms.put("wlc", new WeightedLeastConnectionsAlgorithm());
        algorithms.put("sh", new SourceHashingAlgorithm());
        algorithms.put("dh", new DestinationHashingAlgorithm());

        log.info("Load balancing algorithms initialized: {}", algorithms.keySet());
    }

    /**
     * Selects a server with the named algorithm.
     *
     * @return the selected server address, or null if none is eligible or selection fails
     */
    public String selectServer(String algorithm, List<ServerInfo> servers, String clientIp) {
        try {
            LoadBalancingAlgorithm algo = algorithms.get(algorithm);
            if (algo == null) {
                throw new IllegalArgumentException("Unsupported algorithm: " + algorithm);
            }

            String selectedServer = algo.selectServer(servers, clientIp);

            // Counters take tags as key/value varargs; Micrometer rejects null tag values,
            // so an empty selection is tagged "none".
            meterRegistry.counter("load_balancer.server_selected",
                    "algorithm", algorithm,
                    "server", selectedServer != null ? selectedServer : "none")
                    .increment();

            return selectedServer;

        } catch (Exception e) {
            log.error("Error selecting server with algorithm: {}", algorithm, e);
            meterRegistry.counter("load_balancer.selection_error",
                    "algorithm", String.valueOf(algorithm))
                    .increment();
            return null;
        }
    }

    // Round robin. floorMod keeps the index valid after the counter overflows to negative values.
    private static class RoundRobinAlgorithm implements LoadBalancingAlgorithm {
        private final AtomicInteger counter = new AtomicInteger(0);

        @Override
        public String selectServer(List<ServerInfo> servers, String clientIp) {
            if (servers.isEmpty()) {
                return null;
            }

            int index = Math.floorMod(counter.getAndIncrement(), servers.size());
            return servers.get(index).getAddress();
        }
    }

    // Weighted round robin (non-smooth: a server receives its whole weight share consecutively).
    private static class WeightedRoundRobinAlgorithm implements LoadBalancingAlgorithm {
        private final AtomicInteger counter = new AtomicInteger(0);

        @Override
        public String selectServer(List<ServerInfo> servers, String clientIp) {
            if (servers.isEmpty()) {
                return null;
            }

            int totalWeight = servers.stream().mapToInt(ServerInfo::getWeight).sum();
            if (totalWeight <= 0) {
                // Every server is quiesced (weight 0); nothing is eligible.
                return null;
            }
            int currentWeight = Math.floorMod(counter.getAndIncrement(), totalWeight);

            int weightSum = 0;
            for (ServerInfo server : servers) {
                weightSum += server.getWeight();
                if (currentWeight < weightSum) {
                    return server.getAddress();
                }
            }

            return servers.get(0).getAddress();
        }
    }

    // Least connections: fewest active connections wins, first server on ties.
    private static class LeastConnectionsAlgorithm implements LoadBalancingAlgorithm {
        @Override
        public String selectServer(List<ServerInfo> servers, String clientIp) {
            if (servers.isEmpty()) {
                return null;
            }

            return servers.stream()
                    .min(Comparator.comparingInt(ServerInfo::getActiveConnections))
                    .map(ServerInfo::getAddress)
                    .orElse(null);
        }
    }

    // Weighted least connections: minimises connections/weight, compared by cross-multiplication
    // (as LVS does) to avoid division. Weight-0 servers are quiesced and skipped; ties keep the first.
    private static class WeightedLeastConnectionsAlgorithm implements LoadBalancingAlgorithm {
        @Override
        public String selectServer(List<ServerInfo> servers, String clientIp) {
            if (servers.isEmpty()) {
                return null;
            }

            ServerInfo best = null;
            for (ServerInfo server : servers) {
                if (server.getWeight() <= 0) {
                    continue;
                }
                if (best == null
                        || (long) server.getActiveConnections() * best.getWeight()
                        < (long) best.getActiveConnections() * server.getWeight()) {
                    best = server;
                }
            }
            return best == null ? null : best.getAddress();
        }
    }

    // Source hashing: the same client IP maps to the same server while the list is unchanged.
    private static class SourceHashingAlgorithm implements LoadBalancingAlgorithm {
        @Override
        public String selectServer(List<ServerInfo> servers, String clientIp) {
            if (servers.isEmpty()) {
                return null;
            }

            int hash = clientIp == null ? 0 : clientIp.hashCode();
            // floorMod, unlike Math.abs, stays non-negative for Integer.MIN_VALUE.
            int index = Math.floorMod(hash, servers.size());
            return servers.get(index).getAddress();
        }
    }

    // Destination hashing. The destination address is not available through this interface,
    // so this falls back to plain round robin.
    private static class DestinationHashingAlgorithm implements LoadBalancingAlgorithm {
        private final AtomicInteger counter = new AtomicInteger(0);

        @Override
        public String selectServer(List<ServerInfo> servers, String clientIp) {
            if (servers.isEmpty()) {
                return null;
            }

            int index = Math.floorMod(counter.getAndIncrement(), servers.size());
            return servers.get(index).getAddress();
        }
    }
}

4. VIP漂移机制

4.1 VIP管理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
// VIP管理器
@Component
@Slf4j
public class VIPManager {

private final NetworkInterfaceManager networkManager;
private final KeepaliveConfig keepaliveConfig;
private final MeterRegistry meterRegistry;

public VIPManager(NetworkInterfaceManager networkManager,
                  KeepaliveConfig keepaliveConfig,
                  MeterRegistry meterRegistry) {
    // Only stores collaborators; no I/O happens at construction time.
    this.meterRegistry = meterRegistry;
    this.keepaliveConfig = keepaliveConfig;
    this.networkManager = networkManager;
}

/**
 * Allocates a VIP on a network interface.
 *
 * <p>Steps: reject a VIP that is already active, add the address, add a host route and
 * record the allocation. Any failure (including "already allocated") is counted under
 * {@code vip.allocation_error} and rethrown wrapped.
 *
 * @param vip           the virtual IP address (added as a /32)
 * @param interfaceName the device to attach it to
 * @throws RuntimeException "Failed to allocate VIP" with the underlying cause
 */
public void allocateVIP(String vip, String interfaceName) {
    log.info("Allocating VIP: {} on interface: {}", vip, interfaceName);

    try {
        boolean alreadyTaken = isVIPAllocated(vip);
        if (alreadyTaken) {
            throw new RuntimeException("VIP already allocated: " + vip);
        }

        addVIPToInterface(vip, interfaceName);
        updateRoutingTable(vip, interfaceName);
        recordVIPAllocation(vip, interfaceName);

        meterRegistry.counter("vip.allocated").increment();
        log.info("VIP allocated successfully: {} on {}", vip, interfaceName);
    } catch (Exception failure) {
        log.error("Error allocating VIP: {}", vip, failure);
        meterRegistry.counter("vip.allocation_error").increment();
        throw new RuntimeException("Failed to allocate VIP", failure);
    }
}

/**
 * Releases a VIP from a network interface: removes the address, removes its host route and
 * records the release. Failures are counted under {@code vip.release_error} and rethrown wrapped.
 *
 * @param vip           the virtual IP address (removed as a /32)
 * @param interfaceName the device it is attached to
 * @throws RuntimeException "Failed to release VIP" with the underlying cause
 */
public void releaseVIP(String vip, String interfaceName) {
    log.info("Releasing VIP: {} from interface: {}", vip, interfaceName);

    try {
        removeVIPFromInterface(vip, interfaceName);
        removeFromRoutingTable(vip, interfaceName);
        recordVIPRelease(vip, interfaceName);

        meterRegistry.counter("vip.released").increment();
        log.info("VIP released successfully: {} from {}", vip, interfaceName);
    } catch (Exception failure) {
        log.error("Error releasing VIP: {}", vip, failure);
        meterRegistry.counter("vip.release_error").increment();
        throw new RuntimeException("Failed to release VIP", failure);
    }
}

// 检查VIP是否已分配
private boolean isVIPAllocated(String vip) {
try {
return networkManager.isIPAddressActive(vip);
} catch (Exception e) {
log.error("Error checking VIP allocation: {}", vip, e);
return false;
}
}

// 添加VIP到网络接口
private void addVIPToInterface(String vip, String interfaceName) {
try {
ProcessBuilder pb = new ProcessBuilder("ip", "addr", "add", vip + "/32", "dev", interfaceName);
Process process = pb.start();

int exitCode = process.waitFor();
if (exitCode != 0) {
throw new RuntimeException("Failed to add VIP to interface");
}

} catch (Exception e) {
log.error("Error adding VIP to interface", e);
throw new RuntimeException("Failed to add VIP to interface", e);
}
}

// 从网络接口移除VIP
private void removeVIPFromInterface(String vip, String interfaceName) {
try {
ProcessBuilder pb = new ProcessBuilder("ip", "addr", "del", vip + "/32", "dev", interfaceName);
Process process = pb.start();

int exitCode = process.waitFor();
if (exitCode != 0) {
throw new RuntimeException("Failed to remove VIP from interface");
}

} catch (Exception e) {
log.error("Error removing VIP from interface", e);
throw new RuntimeException("Failed to remove VIP from interface", e);
}
}

// 更新路由表
private void updateRoutingTable(String vip, String interfaceName) {
try {
// 添加路由规则
ProcessBuilder pb = new ProcessBuilder("ip", "route", "add", vip + "/32", "dev", interfaceName);
Process process = pb.start();

int exitCode = process.waitFor();
if (exitCode != 0) {
log.warn("Failed to add route for VIP: {}", vip);
}

} catch (Exception e) {
log.error("Error updating routing table", e);
}
}

// 从路由表移除
private void removeFromRoutingTable(String vip, String interfaceName) {
try {
// 移除路由规则
ProcessBuilder pb = new ProcessBuilder("ip", "route", "del", vip + "/32", "dev", interfaceName);
Process process = pb.start();

int exitCode = process.waitFor();
if (exitCode != 0) {
log.warn("Failed to remove route for VIP: {}", vip);
}

} catch (Exception e) {
log.error("Error removing from routing table", e);
}
}

// 记录VIP分配
private void recordVIPAllocation(String vip, String interfaceName) {
try {
VIPAllocationRecord record = VIPAllocationRecord.builder()
.vip(vip)
.interfaceName(interfaceName)
.allocatedAt(LocalDateTime.now())
.status(VIPStatus.ALLOCATED)
.build();

// 这里可以将记录保存到数据库或文件
log.info("VIP allocation recorded: {}", record);

} catch (Exception e) {
log.error("Error recording VIP allocation", e);
}
}

// 记录VIP释放
private void recordVIPRelease(String vip, String interfaceName) {
try {
VIPAllocationRecord record = VIPAllocationRecord.builder()
.vip(vip)
.interfaceName(interfaceName)
.releasedAt(LocalDateTime.now())
.status(VIPStatus.RELEASED)
.build();

// 这里可以将记录保存到数据库或文件
log.info("VIP release recorded: {}", record);

} catch (Exception e) {
log.error("Error recording VIP release", e);
}
}

// 获取VIP状态
public VIPStatusInfo getVIPStatus(String vip) {
try {
boolean isActive = networkManager.isIPAddressActive(vip);
String interfaceName = networkManager.getInterfaceForIP(vip);

return VIPStatusInfo.builder()
.vip(vip)
.active(isActive)
.interfaceName(interfaceName)
.timestamp(LocalDateTime.now())
.build();

} catch (Exception e) {
log.error("Error getting VIP status: {}", vip, e);
return VIPStatusInfo.builder()
.vip(vip)
.active(false)
.timestamp(LocalDateTime.now())
.error(e.getMessage())
.build();
}
}

// 获取所有VIP状态
public List<VIPStatusInfo> getAllVIPStatus() {
List<VIPStatusInfo> statusList = new ArrayList<>();

try {
// 获取配置中的所有VIP
List<String> configuredVIPs = keepaliveConfig.getConfiguredVIPs();

for (String vip : configuredVIPs) {
VIPStatusInfo status = getVIPStatus(vip);
statusList.add(status);
}

} catch (Exception e) {
log.error("Error getting all VIP status", e);
}

return statusList;
}
}

4.2 故障转移机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
// 故障转移管理器
@Component
@Slf4j
public class FailoverManager {

private final VIPManager vipManager;
private final HealthCheckManager healthCheckManager;
private final NotificationService notificationService;
private final MeterRegistry meterRegistry;

public FailoverManager(VIPManager vipManager,
HealthCheckManager healthCheckManager,
NotificationService notificationService,
MeterRegistry meterRegistry) {
this.vipManager = vipManager;
this.healthCheckManager = healthCheckManager;
this.notificationService = notificationService;
this.meterRegistry = meterRegistry;
}

// 触发故障转移
public void triggerFailover(String instanceId, String reason) {
log.warn("Triggering failover for instance: {}, reason: {}", instanceId, reason);

try {
// 记录故障转移开始
FailoverEvent failoverEvent = FailoverEvent.builder()
.instanceId(instanceId)
.reason(reason)
.startTime(LocalDateTime.now())
.status(FailoverStatus.IN_PROGRESS)
.build();

// 执行故障转移步骤
executeFailoverSteps(failoverEvent);

// 记录故障转移完成
failoverEvent.setEndTime(LocalDateTime.now());
failoverEvent.setStatus(FailoverStatus.COMPLETED);

// 发送通知
sendFailoverNotification(failoverEvent);

meterRegistry.counter("failover.triggered").increment();
log.info("Failover completed successfully for instance: {}", instanceId);

} catch (Exception e) {
log.error("Error during failover for instance: {}", instanceId, e);
meterRegistry.counter("failover.error").increment();

// 发送失败通知
sendFailoverFailureNotification(instanceId, reason, e.getMessage());
}
}

// 执行故障转移步骤
private void executeFailoverSteps(FailoverEvent failoverEvent) {
try {
// 步骤1:降低当前节点优先级
lowerCurrentNodePriority(failoverEvent.getInstanceId());

// 步骤2:释放VIP
releaseVIPFromCurrentNode(failoverEvent.getInstanceId());

// 步骤3:等待VIP释放完成
waitForVIPRelease(failoverEvent.getInstanceId());

// 步骤4:通知其他节点接管
notifyOtherNodesToTakeover(failoverEvent.getInstanceId());

// 步骤5:验证故障转移结果
verifyFailoverResult(failoverEvent.getInstanceId());

} catch (Exception e) {
log.error("Error executing failover steps", e);
throw new RuntimeException("Failed to execute failover steps", e);
}
}

// 降低当前节点优先级
private void lowerCurrentNodePriority(String instanceId) {
log.info("Lowering priority for instance: {}", instanceId);

try {
// 这里可以实现降低优先级的逻辑
// 比如修改keepalived配置文件中的priority值

meterRegistry.counter("failover.priority_lowered").increment();

} catch (Exception e) {
log.error("Error lowering priority", e);
throw new RuntimeException("Failed to lower priority", e);
}
}

// 从当前节点释放VIP
private void releaseVIPFromCurrentNode(String instanceId) {
log.info("Releasing VIP from current node for instance: {}", instanceId);

try {
// 获取当前节点的VIP列表
List<String> currentVIPs = getCurrentNodeVIPs(instanceId);

// 释放所有VIP
for (String vip : currentVIPs) {
vipManager.releaseVIP(vip, getCurrentNodeInterface());
}

meterRegistry.counter("failover.vip_released").increment();

} catch (Exception e) {
log.error("Error releasing VIP from current node", e);
throw new RuntimeException("Failed to release VIP from current node", e);
}
}

// 等待VIP释放完成
private void waitForVIPRelease(String instanceId) {
log.info("Waiting for VIP release completion for instance: {}", instanceId);

try {
int maxWaitTime = 30; // 30秒
int waitInterval = 1; // 1秒

for (int i = 0; i < maxWaitTime; i += waitInterval) {
Thread.sleep(waitInterval * 1000);

// 检查VIP是否已释放
if (isVIPReleased(instanceId)) {
log.info("VIP release completed for instance: {}", instanceId);
return;
}
}

throw new RuntimeException("VIP release timeout");

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("VIP release wait interrupted", e);
} catch (Exception e) {
log.error("Error waiting for VIP release", e);
throw new RuntimeException("Failed to wait for VIP release", e);
}
}

// 通知其他节点接管
private void notifyOtherNodesToTakeover(String instanceId) {
log.info("Notifying other nodes to takeover for instance: {}", instanceId);

try {
// 获取其他节点列表
List<String> otherNodes = getOtherNodes(instanceId);

// 通知每个节点
for (String node : otherNodes) {
notifyNodeToTakeover(node, instanceId);
}

meterRegistry.counter("failover.nodes_notified").increment();

} catch (Exception e) {
log.error("Error notifying other nodes", e);
throw new RuntimeException("Failed to notify other nodes", e);
}
}

// 验证故障转移结果
private void verifyFailoverResult(String instanceId) {
log.info("Verifying failover result for instance: {}", instanceId);

try {
// 检查VIP是否已转移到其他节点
if (isVIPTransferred(instanceId)) {
log.info("Failover verification successful for instance: {}", instanceId);
} else {
throw new RuntimeException("VIP transfer verification failed");
}

} catch (Exception e) {
log.error("Error verifying failover result", e);
throw new RuntimeException("Failed to verify failover result", e);
}
}

// 检查VIP是否已释放
private boolean isVIPReleased(String instanceId) {
try {
List<String> currentVIPs = getCurrentNodeVIPs(instanceId);

for (String vip : currentVIPs) {
VIPStatusInfo status = vipManager.getVIPStatus(vip);
if (status.isActive()) {
return false;
}
}

return true;

} catch (Exception e) {
log.error("Error checking VIP release status", e);
return false;
}
}

// 检查VIP是否已转移
private boolean isVIPTransferred(String instanceId) {
try {
List<String> configuredVIPs = getConfiguredVIPs(instanceId);

for (String vip : configuredVIPs) {
VIPStatusInfo status = vipManager.getVIPStatus(vip);
if (!status.isActive()) {
return false;
}
}

return true;

} catch (Exception e) {
log.error("Error checking VIP transfer status", e);
return false;
}
}

// 获取当前节点VIP列表
private List<String> getCurrentNodeVIPs(String instanceId) {
// 这里可以从配置中获取VIP列表
return Arrays.asList("192.168.1.100", "192.168.1.101");
}

// 获取当前节点网络接口
private String getCurrentNodeInterface() {
return "eth0";
}

// 获取其他节点列表
private List<String> getOtherNodes(String instanceId) {
// 这里可以从配置中获取其他节点列表
return Arrays.asList("node2", "node3");
}

// 通知节点接管
private void notifyNodeToTakeover(String node, String instanceId) {
log.info("Notifying node {} to takeover instance {}", node, instanceId);

try {
// 这里可以实现通知逻辑
// 比如发送HTTP请求、消息队列等

} catch (Exception e) {
log.error("Error notifying node to takeover", e);
}
}

// 获取配置的VIP列表
private List<String> getConfiguredVIPs(String instanceId) {
// 这里可以从配置中获取VIP列表
return Arrays.asList("192.168.1.100", "192.168.1.101");
}

// 发送故障转移通知
private void sendFailoverNotification(FailoverEvent failoverEvent) {
try {
notificationService.sendFailoverNotification(failoverEvent);
meterRegistry.counter("failover.notification_sent").increment();
} catch (Exception e) {
log.error("Error sending failover notification", e);
}
}

// 发送故障转移失败通知
private void sendFailoverFailureNotification(String instanceId, String reason, String error) {
try {
FailoverFailureEvent failureEvent = FailoverFailureEvent.builder()
.instanceId(instanceId)
.reason(reason)
.error(error)
.timestamp(LocalDateTime.now())
.build();

notificationService.sendFailoverFailureNotification(failureEvent);
meterRegistry.counter("failover.failure_notification_sent").increment();
} catch (Exception e) {
log.error("Error sending failover failure notification", e);
}
}
}

5. 监控与告警

5.1 Keepalive监控配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// Keepalive monitoring configuration: registers one enabled AlertRule bean per monitored condition.
@Configuration
public class KeepaliveMonitoringConfig {

    /** CRITICAL: the keepalived process is not running. */
    @Bean
    public AlertRule keepaliveProcessDownAlertRule() {
        return enabledRule("Keepalive Process Down",
                "Keepalive process is not running",
                "vrrp.process.running == 0",
                AlertSeverity.CRITICAL);
    }

    /** CRITICAL: a virtual IP is not active. */
    @Bean
    public AlertRule vipNotActiveAlertRule() {
        return enabledRule("VIP Not Active",
                "Virtual IP is not active",
                "vrrp.vip.active == 0",
                AlertSeverity.CRITICAL);
    }

    /** WARNING: a health check reports unhealthy. */
    @Bean
    public AlertRule healthCheckFailureAlertRule() {
        return enabledRule("Health Check Failure",
                "Health check is failing",
                "health_check.healthy == 0",
                AlertSeverity.WARNING);
    }

    /**
     * WARNING: a failover was triggered.
     * NOTE(review): "failover.triggered" is published via MeterRegistry.counter, which only ever grows;
     * if the condition is evaluated against the raw total, this alert stays firing after the first
     * failover — confirm whether the rule engine applies a rate/increase window.
     */
    @Bean
    public AlertRule failoverTriggeredAlertRule() {
        return enabledRule("Failover Triggered",
                "Failover has been triggered",
                "failover.triggered > 0",
                AlertSeverity.WARNING);
    }

    /** CRITICAL: the LVS service is not running. */
    @Bean
    public AlertRule lvsServiceDownAlertRule() {
        return enabledRule("LVS Service Down",
                "LVS service is not running",
                "lvs.service.running == 0",
                AlertSeverity.CRITICAL);
    }

    /** Builds an enabled rule from its four varying attributes (plain helper, not a bean). */
    private static AlertRule enabledRule(String name, String description,
                                         String condition, AlertSeverity severity) {
        return AlertRule.builder()
                .name(name)
                .description(description)
                .condition(condition)
                .severity(severity)
                .enabled(true)
                .build();
    }
}

5.2 健康检查器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
// Keepalive health indicator: combines keepalived-process, VIP, health-check and LVS status into one Health.
@Component
@Slf4j
public class KeepaliveHealthChecker implements HealthIndicator {

    /** Maximum time to wait for `pgrep`; a hung probe must not block the health endpoint. */
    private static final long PROCESS_CHECK_TIMEOUT_SECONDS = 5;

    // NOTE(review): vrrpManager is injected but not consulted by any check below.
    private final VRRPProtocolManager vrrpManager;
    private final VIPManager vipManager;
    private final HealthCheckManager healthCheckManager;
    private final LVSConfigurationManager lvsManager;

    public KeepaliveHealthChecker(VRRPProtocolManager vrrpManager,
                                  VIPManager vipManager,
                                  HealthCheckManager healthCheckManager,
                                  LVSConfigurationManager lvsManager) {
        this.vrrpManager = vrrpManager;
        this.vipManager = vipManager;
        this.healthCheckManager = healthCheckManager;
        this.lvsManager = lvsManager;
    }

    /**
     * Reports UP only when all four sub-checks pass; each sub-check's result is exposed as an
     * "UP"/"DOWN" detail ("vrrp", "vip", "health_check", "lvs").
     */
    @Override
    public Health health() {
        Health.Builder builder = new Health.Builder();

        try {
            boolean vrrpHealthy = checkVRRPHealth();
            builder.withDetail("vrrp", vrrpHealthy ? "UP" : "DOWN");

            boolean vipHealthy = checkVIPHealth();
            builder.withDetail("vip", vipHealthy ? "UP" : "DOWN");

            boolean healthCheckHealthy = checkHealthCheckStatus();
            builder.withDetail("health_check", healthCheckHealthy ? "UP" : "DOWN");

            boolean lvsHealthy = checkLVSHealth();
            builder.withDetail("lvs", lvsHealthy ? "UP" : "DOWN");

            boolean overallHealthy = vrrpHealthy && vipHealthy && healthCheckHealthy && lvsHealthy;

            if (overallHealthy) {
                builder.up();
            } else {
                builder.down();
            }

        } catch (Exception e) {
            log.error("Error checking keepalive health", e);
            // down(Throwable) stores the "error" detail as "<class>: <message>". Passing e.getMessage()
            // to withDetail would fail for exceptions without a message (null detail values are rejected).
            builder.down(e);
        }

        return builder.build();
    }

    /**
     * Returns true when `pgrep keepalived` finds a matching process (exit code 0).
     * The probe is killed and reported unhealthy if it exceeds {@link #PROCESS_CHECK_TIMEOUT_SECONDS};
     * an interrupt is propagated by restoring the thread's interrupt flag.
     * NOTE(review): pgrep matches by pattern, not exact process name.
     */
    private boolean checkVRRPHealth() {
        Process process = null;
        try {
            process = new ProcessBuilder("pgrep", "keepalived").start();

            if (!process.waitFor(PROCESS_CHECK_TIMEOUT_SECONDS, java.util.concurrent.TimeUnit.SECONDS)) {
                process.destroyForcibly();
                log.warn("pgrep keepalived did not finish within {}s", PROCESS_CHECK_TIMEOUT_SECONDS);
                return false;
            }
            return process.exitValue() == 0;

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            if (process != null) {
                process.destroyForcibly();
            }
            log.warn("Interrupted while checking VRRP health");
            return false;
        } catch (Exception e) {
            log.error("Error checking VRRP health", e);
            return false;
        }
    }

    /**
     * Returns true when every configured VIP is reported active (vacuously true for an empty list).
     * NOTE(review): on a node that does not currently hold the VIPs this may report DOWN, depending on
     * NetworkInterfaceManager.isIPAddressActive semantics — confirm that is intended.
     */
    private boolean checkVIPHealth() {
        try {
            List<VIPStatusInfo> vipStatusList = vipManager.getAllVIPStatus();

            for (VIPStatusInfo status : vipStatusList) {
                if (!status.isActive()) {
                    return false;
                }
            }

            return true;

        } catch (Exception e) {
            log.error("Error checking VIP health", e);
            return false;
        }
    }

    /** Returns true when every result held by the health-check manager is healthy. */
    private boolean checkHealthCheckStatus() {
        try {
            Map<String, HealthCheckResult> results = healthCheckManager.getAllHealthCheckResults();

            for (HealthCheckResult result : results.values()) {
                if (!result.isHealthy()) {
                    return false;
                }
            }

            return true;

        } catch (Exception e) {
            log.error("Error checking health check status", e);
            return false;
        }
    }

    /** Returns true when the LVS status carries no error. */
    private boolean checkLVSHealth() {
        try {
            LVSStatus lvsStatus = lvsManager.getLVSStatus();
            return lvsStatus.getError() == null;

        } catch (Exception e) {
            log.error("Error checking LVS health", e);
            return false;
        }
    }
}

6. 总结

Keepalive作为高可用架构的核心组件,通过VRRP协议、健康检查、LVS负载均衡等技术,为企业级应用提供了可靠的高可用保障。本文从架构师的角度深入分析了Keepalive的实现原理和最佳实践。

6.1 技术优势

  1. 高可用性: 通过VRRP协议实现自动故障转移
  2. 负载均衡: 集成LVS提供强大的负载均衡能力
  3. 健康检查: 多种健康检查机制确保服务可用性
  4. VIP管理: 虚拟IP的自动分配和回收
  5. 监控完善: 全面的监控和告警机制

6.2 实施要点

  1. 配置优化: 根据业务需求合理配置Keepalive参数
  2. 网络规划: 确保网络架构支持VIP漂移
  3. 健康检查: 设计有效的健康检查策略
  4. 监控告警: 建立完善的监控和告警体系
  5. 故障演练: 定期进行故障转移演练

6.3 最佳实践

  1. 多节点部署: 至少部署2个节点(一主一备)组成VRRP组,关键业务可增加备用节点以提高冗余度
  2. 网络隔离: 使用专用网络进行心跳检测
  3. 配置管理: 使用配置管理工具统一管理配置
  4. 日志监控: 建立完善的日志收集和分析系统
  5. 文档维护: 维护完整的架构和运维文档

通过本文的学习,您应该已经掌握了Keepalive的核心技术,能够设计和实现高可用的企业级架构,为业务系统提供可靠的技术保障。