1. 统一RPC通道与gRPC降级概述

统一RPC通道是现代微服务架构的核心组件,通过统一的RPC框架可以简化服务间的通信,提高开发效率。gRPC作为高性能的RPC框架,结合降级策略可以确保系统的高可用性。本文将详细介绍RPC框架设计、gRPC集成、降级策略和服务治理的完整实现。

1.1 核心功能

  1. 统一RPC通道: 统一的RPC调用接口和协议
  2. gRPC集成: gRPC服务端和客户端集成
  3. 降级策略: 服务降级和熔断保护
  4. 负载均衡: 智能负载均衡和故障转移
  5. 监控告警: RPC调用监控和性能分析

1.2 技术架构

1
2
3
4
5
客户端 → RPC通道 → 负载均衡 → 服务端
↓ ↓ ↓ ↓
业务调用 → 协议转换 → 路由选择 → 业务处理
↓ ↓ ↓ ↓
降级策略 → 熔断保护 → 监控告警 → 结果返回

2. RPC框架配置

2.1 RPC配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
/**
* RPC框架配置
*/
@Configuration
public class RPCConfig {

@Value("${rpc.registry.type}")
private String registryType;

@Value("${rpc.registry.address}")
private String registryAddress;

@Value("${rpc.provider.timeout}")
private int providerTimeout;

@Value("${rpc.consumer.timeout}")
private int consumerTimeout;

@Value("${rpc.load-balancer.type}")
private String loadBalancerType;

/**
* RPC注册中心
*/
@Bean
public RegistryConfig registryConfig() {
RegistryConfig config = new RegistryConfig();
config.setType(registryType);
config.setAddress(registryAddress);
config.setTimeout(5000);
return config;
}

/**
* RPC提供者配置
*/
@Bean
public ProviderConfig providerConfig() {
ProviderConfig config = new ProviderConfig();
config.setTimeout(providerTimeout);
config.setRetries(3);
config.setLoadBalance(loadBalancerType);
return config;
}

/**
* RPC消费者配置
*/
@Bean
public ConsumerConfig consumerConfig() {
ConsumerConfig config = new ConsumerConfig();
config.setTimeout(consumerTimeout);
config.setRetries(2);
config.setLoadBalance(loadBalancerType);
return config;
}

/**
* gRPC服务器配置
*/
@Bean
public GrpcServerConfig grpcServerConfig() {
GrpcServerConfig config = new GrpcServerConfig();
config.setPort(9090);
config.setMaxInboundMessageSize(4 * 1024 * 1024); // 4MB
config.setMaxInboundMetadataSize(8192); // 8KB
return config;
}

/**
* gRPC客户端配置
*/
@Bean
public GrpcClientConfig grpcClientConfig() {
GrpcClientConfig config = new GrpcClientConfig();
config.setTimeout(3000);
config.setMaxInboundMessageSize(4 * 1024 * 1024); // 4MB
config.setKeepAliveTime(30);
config.setKeepAliveTimeout(5);
return config;
}
}

/**
* RPC注册中心配置
*/
@Data
public class RegistryConfig {
private String type;
private String address;
private int timeout;
private Map<String, Object> parameters = new HashMap<>();
}

/**
* RPC提供者配置
*/
@Data
public class ProviderConfig {
private int timeout;
private int retries;
private String loadBalance;
private Map<String, Object> parameters = new HashMap<>();
}

/**
* RPC消费者配置
*/
@Data
public class ConsumerConfig {
private int timeout;
private int retries;
private String loadBalance;
private Map<String, Object> parameters = new HashMap<>();
}

/**
* gRPC服务器配置
*/
@Data
public class GrpcServerConfig {
private int port;
private int maxInboundMessageSize;
private int maxInboundMetadataSize;
private boolean enableTls;
private String certFile;
private String keyFile;
}

/**
* gRPC客户端配置
*/
@Data
public class GrpcClientConfig {
private int timeout;
private int maxInboundMessageSize;
private int keepAliveTime;
private int keepAliveTimeout;
private boolean enableTls;
private String trustCertFile;
}

2.2 应用配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# application.yml
rpc:
registry:
type: nacos
address: localhost:8848
provider:
timeout: 5000
retries: 3
load-balancer: roundrobin
consumer:
timeout: 3000
retries: 2
load-balancer: roundrobin
grpc:
server:
port: 9090
max-inbound-message-size: 4194304
max-inbound-metadata-size: 8192
enable-tls: false
client:
timeout: 3000
max-inbound-message-size: 4194304
keep-alive-time: 30
keep-alive-timeout: 5
enable-tls: false
degradation:
enabled: true
circuit-breaker:
enabled: true
failure-threshold: 5
timeout: 60000
reset-timeout: 30000
fallback:
enabled: true
default-response: "服务暂时不可用"

3. 统一RPC通道服务

3.1 RPC通道服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
/**
* 统一RPC通道服务
*/
@Service
public class RPCChannelService {

@Autowired
private RegistryConfig registryConfig;

@Autowired
private ProviderConfig providerConfig;

@Autowired
private ConsumerConfig consumerConfig;

@Autowired
private RPCLoadBalancer loadBalancer;

@Autowired
private RPCDegradationService degradationService;

private final Map<String, RPCChannel> channels = new ConcurrentHashMap<>();

/**
* 获取RPC通道
* @param serviceName 服务名称
* @return RPC通道
*/
public RPCChannel getChannel(String serviceName) {
return channels.computeIfAbsent(serviceName, this::createChannel);
}

/**
* 创建RPC通道
* @param serviceName 服务名称
* @return RPC通道
*/
private RPCChannel createChannel(String serviceName) {
try {
// 1. 从注册中心获取服务实例
List<ServiceInstance> instances = getServiceInstances(serviceName);

// 2. 创建通道
RPCChannel channel = new RPCChannel();
channel.setServiceName(serviceName);
channel.setInstances(instances);
channel.setLoadBalancer(loadBalancer);
channel.setDegradationService(degradationService);

// 3. 初始化通道
channel.initialize();

log.info("创建RPC通道成功: serviceName={}, instances={}", serviceName, instances.size());

return channel;

} catch (Exception e) {
log.error("创建RPC通道失败: serviceName={}", serviceName, e);
throw new BusinessException("创建RPC通道失败: " + e.getMessage());
}
}

/**
* 从注册中心获取服务实例
* @param serviceName 服务名称
* @return 服务实例列表
*/
private List<ServiceInstance> getServiceInstances(String serviceName) {
try {
// 这里可以根据不同的注册中心类型实现
switch (registryConfig.getType()) {
case "nacos":
return getNacosInstances(serviceName);
case "zookeeper":
return getZookeeperInstances(serviceName);
case "consul":
return getConsulInstances(serviceName);
default:
throw new BusinessException("不支持的注册中心类型: " + registryConfig.getType());
}
} catch (Exception e) {
log.error("获取服务实例失败: serviceName={}", serviceName, e);
return Collections.emptyList();
}
}

/**
* 获取Nacos服务实例
*/
private List<ServiceInstance> getNacosInstances(String serviceName) {
// 实现Nacos服务发现逻辑
return Collections.emptyList();
}

/**
* 获取Zookeeper服务实例
*/
private List<ServiceInstance> getZookeeperInstances(String serviceName) {
// 实现Zookeeper服务发现逻辑
return Collections.emptyList();
}

/**
* 获取Consul服务实例
*/
private List<ServiceInstance> getConsulInstances(String serviceName) {
// 实现Consul服务发现逻辑
return Collections.emptyList();
}

/**
* 调用RPC服务
* @param serviceName 服务名称
* @param methodName 方法名称
* @param parameters 参数
* @return 调用结果
*/
public RPCResult call(String serviceName, String methodName, Object... parameters) {
try {
// 1. 获取RPC通道
RPCChannel channel = getChannel(serviceName);

// 2. 检查服务是否降级
if (degradationService.isServiceDegraded(serviceName)) {
return degradationService.getFallbackResult(serviceName, methodName);
}

// 3. 执行RPC调用
return channel.call(methodName, parameters);

} catch (Exception e) {
log.error("RPC调用失败: serviceName={}, methodName={}", serviceName, methodName, e);

// 4. 记录失败并触发降级
degradationService.recordFailure(serviceName, methodName);

return degradationService.getFallbackResult(serviceName, methodName);
}
}

/**
* 异步调用RPC服务
* @param serviceName 服务名称
* @param methodName 方法名称
* @param parameters 参数
* @return 异步结果
*/
public CompletableFuture<RPCResult> callAsync(String serviceName, String methodName, Object... parameters) {
return CompletableFuture.supplyAsync(() -> call(serviceName, methodName, parameters));
}
}

/**
* RPC通道
*/
@Data
public class RPCChannel {
private String serviceName;
private List<ServiceInstance> instances;
private RPCLoadBalancer loadBalancer;
private RPCDegradationService degradationService;
private Map<String, Object> metadata = new HashMap<>();

/**
* 初始化通道
*/
public void initialize() {
// 初始化通道相关资源
log.debug("初始化RPC通道: serviceName={}", serviceName);
}

/**
* 调用RPC方法
* @param methodName 方法名称
* @param parameters 参数
* @return 调用结果
*/
public RPCResult call(String methodName, Object... parameters) {
try {
// 1. 选择服务实例
ServiceInstance instance = loadBalancer.select(instances, serviceName);
if (instance == null) {
throw new BusinessException("没有可用的服务实例");
}

// 2. 执行调用
return executeCall(instance, methodName, parameters);

} catch (Exception e) {
log.error("RPC调用失败: serviceName={}, methodName={}", serviceName, methodName, e);
throw e;
}
}

/**
* 执行RPC调用
* @param instance 服务实例
* @param methodName 方法名称
* @param parameters 参数
* @return 调用结果
*/
private RPCResult executeCall(ServiceInstance instance, String methodName, Object... parameters) {
try {
// 这里可以实现具体的RPC调用逻辑
// 例如:HTTP调用、gRPC调用等

long startTime = System.currentTimeMillis();

// 模拟RPC调用
Object result = simulateRPCCall(instance, methodName, parameters);

long responseTime = System.currentTimeMillis() - startTime;

// 记录成功调用
degradationService.recordSuccess(serviceName, methodName, responseTime);

return RPCResult.success(result);

} catch (Exception e) {
// 记录失败调用
degradationService.recordFailure(serviceName, methodName);
throw e;
}
}

/**
* 模拟RPC调用
*/
private Object simulateRPCCall(ServiceInstance instance, String methodName, Object... parameters) {
// 这里可以实现具体的RPC调用逻辑
return "RPC调用结果: " + methodName;
}
}

/**
* 服务实例
*/
@Data
public class ServiceInstance {
private String host;
private int port;
private String protocol;
private Map<String, Object> metadata = new HashMap<>();
private boolean healthy = true;
private long lastHeartbeat;

public String getAddress() {
return host + ":" + port;
}
}

/**
* RPC结果
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class RPCResult {
private boolean success;
private Object data;
private String errorMessage;
private long responseTime;
private String serviceName;
private String methodName;

public static RPCResult success(Object data) {
return RPCResult.builder()
.success(true)
.data(data)
.build();
}

public static RPCResult error(String errorMessage) {
return RPCResult.builder()
.success(false)
.errorMessage(errorMessage)
.build();
}
}

4. gRPC集成服务

4.1 gRPC服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
/**
* gRPC服务端
*/
@Component
public class GrpcServer {

@Autowired
private GrpcServerConfig grpcServerConfig;

@Autowired
private ApplicationContext applicationContext;

private Server server;

/**
* 启动gRPC服务器
*/
@PostConstruct
public void start() throws IOException {
try {
ServerBuilder<?> serverBuilder = ServerBuilder.forPort(grpcServerConfig.getPort());

// 配置消息大小
serverBuilder.maxInboundMessageSize(grpcServerConfig.getMaxInboundMessageSize());
serverBuilder.maxInboundMetadataSize(grpcServerConfig.getMaxInboundMetadataSize());

// 注册服务
registerServices(serverBuilder);

// 构建并启动服务器
server = serverBuilder.build().start();

log.info("gRPC服务器启动成功: port={}", grpcServerConfig.getPort());

// 添加关闭钩子
Runtime.getRuntime().addShutdownHook(new Thread(this::stop));

} catch (Exception e) {
log.error("gRPC服务器启动失败", e);
throw new RuntimeException("gRPC服务器启动失败", e);
}
}

/**
* 注册gRPC服务
*/
private void registerServices(ServerBuilder<?> serverBuilder) {
// 获取所有gRPC服务实现
Map<String, Object> grpcServices = applicationContext.getBeansWithAnnotation(GrpcService.class);

for (Object service : grpcServices.values()) {
if (service instanceof BindableService) {
serverBuilder.addService((BindableService) service);
log.info("注册gRPC服务: {}", service.getClass().getSimpleName());
}
}
}

/**
* 停止gRPC服务器
*/
@PreDestroy
public void stop() {
if (server != null && !server.isShutdown()) {
server.shutdown();
try {
if (!server.awaitTermination(30, TimeUnit.SECONDS)) {
server.shutdownNow();
}
} catch (InterruptedException e) {
server.shutdownNow();
Thread.currentThread().interrupt();
}
log.info("gRPC服务器已停止");
}
}

/**
* 等待服务器终止
*/
public void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}
}

/**
* gRPC服务注解
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface GrpcService {
String value() default "";
}

/**
* 用户服务gRPC实现
*/
@GrpcService
public class UserServiceGrpcImpl extends UserServiceGrpc.UserServiceImplBase {

@Autowired
private UserService userService;

@Override
public void getUser(UserRequest request, StreamObserver<UserResponse> responseObserver) {
try {
// 1. 参数验证
if (request.getUserId() <= 0) {
responseObserver.onError(Status.INVALID_ARGUMENT
.withDescription("用户ID无效")
.asRuntimeException());
return;
}

// 2. 调用业务服务
User user = userService.getUserById(request.getUserId());

// 3. 构建响应
UserResponse response = UserResponse.newBuilder()
.setUserId(user.getId())
.setUsername(user.getUsername())
.setEmail(user.getEmail())
.setStatus(user.getStatus())
.build();

// 4. 返回响应
responseObserver.onNext(response);
responseObserver.onCompleted();

} catch (Exception e) {
log.error("获取用户信息失败: userId={}", request.getUserId(), e);
responseObserver.onError(Status.INTERNAL
.withDescription("获取用户信息失败: " + e.getMessage())
.asRuntimeException());
}
}

@Override
public void createUser(CreateUserRequest request, StreamObserver<UserResponse> responseObserver) {
try {
// 1. 参数验证
if (StringUtils.isEmpty(request.getUsername())) {
responseObserver.onError(Status.INVALID_ARGUMENT
.withDescription("用户名不能为空")
.asRuntimeException());
return;
}

// 2. 创建用户
User user = new User();
user.setUsername(request.getUsername());
user.setEmail(request.getEmail());
user.setStatus("ACTIVE");

User createdUser = userService.createUser(user);

// 3. 构建响应
UserResponse response = UserResponse.newBuilder()
.setUserId(createdUser.getId())
.setUsername(createdUser.getUsername())
.setEmail(createdUser.getEmail())
.setStatus(createdUser.getStatus())
.build();

// 4. 返回响应
responseObserver.onNext(response);
responseObserver.onCompleted();

} catch (Exception e) {
log.error("创建用户失败: username={}", request.getUsername(), e);
responseObserver.onError(Status.INTERNAL
.withDescription("创建用户失败: " + e.getMessage())
.asRuntimeException());
}
}
}

4.2 gRPC客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
/**
* gRPC客户端管理器
*/
@Component
public class GrpcClientManager {

@Autowired
private GrpcClientConfig grpcClientConfig;

private final Map<String, ManagedChannel> channels = new ConcurrentHashMap<>();

/**
* 获取gRPC通道
* @param serviceName 服务名称
* @return gRPC通道
*/
public ManagedChannel getChannel(String serviceName) {
return channels.computeIfAbsent(serviceName, this::createChannel);
}

/**
* 创建gRPC通道
* @param serviceName 服务名称
* @return gRPC通道
*/
private ManagedChannel createChannel(String serviceName) {
try {
// 这里可以根据服务名称获取服务地址
String serviceAddress = getServiceAddress(serviceName);

ManagedChannelBuilder<?> builder = ManagedChannelBuilder.forTarget(serviceAddress)
.maxInboundMessageSize(grpcClientConfig.getMaxInboundMessageSize())
.keepAliveTime(grpcClientConfig.getKeepAliveTime(), TimeUnit.SECONDS)
.keepAliveTimeout(grpcClientConfig.getKeepAliveTimeout(), TimeUnit.SECONDS)
.keepAliveWithoutCalls(true)
.usePlaintext(); // 生产环境建议使用TLS

ManagedChannel channel = builder.build();

log.info("创建gRPC通道成功: serviceName={}, address={}", serviceName, serviceAddress);

return channel;

} catch (Exception e) {
log.error("创建gRPC通道失败: serviceName={}", serviceName, e);
throw new BusinessException("创建gRPC通道失败: " + e.getMessage());
}
}

/**
* 获取服务地址
* @param serviceName 服务名称
* @return 服务地址
*/
private String getServiceAddress(String serviceName) {
// 这里可以实现服务发现逻辑
// 例如:从注册中心获取服务地址
return "localhost:9090";
}

/**
* 关闭所有通道
*/
@PreDestroy
public void shutdown() {
channels.values().forEach(channel -> {
if (!channel.isShutdown()) {
channel.shutdown();
try {
if (!channel.awaitTermination(30, TimeUnit.SECONDS)) {
channel.shutdownNow();
}
} catch (InterruptedException e) {
channel.shutdownNow();
Thread.currentThread().interrupt();
}
}
});
log.info("所有gRPC通道已关闭");
}
}

/**
* gRPC客户端服务
*/
@Service
public class GrpcClientService {

@Autowired
private GrpcClientManager grpcClientManager;

@Autowired
private RPCDegradationService degradationService;

/**
* 调用用户服务
* @param userId 用户ID
* @return 用户信息
*/
public User getUser(Long userId) {
try {
// 1. 检查服务是否降级
if (degradationService.isServiceDegraded("user-service")) {
return degradationService.getFallbackUser(userId);
}

// 2. 获取gRPC通道
ManagedChannel channel = grpcClientManager.getChannel("user-service");

// 3. 创建客户端存根
UserServiceGrpc.UserServiceBlockingStub stub = UserServiceGrpc.newBlockingStub(channel);

// 4. 构建请求
UserRequest request = UserRequest.newBuilder()
.setUserId(userId)
.build();

// 5. 执行调用
long startTime = System.currentTimeMillis();
UserResponse response = stub.getUser(request);
long responseTime = System.currentTimeMillis() - startTime;

// 6. 记录成功调用
degradationService.recordSuccess("user-service", "getUser", responseTime);

// 7. 转换响应
return convertToUser(response);

} catch (Exception e) {
log.error("调用用户服务失败: userId={}", userId, e);

// 8. 记录失败调用
degradationService.recordFailure("user-service", "getUser");

// 9. 返回降级结果
return degradationService.getFallbackUser(userId);
}
}

/**
* 创建用户
* @param username 用户名
* @param email 邮箱
* @return 用户信息
*/
public User createUser(String username, String email) {
try {
// 1. 检查服务是否降级
if (degradationService.isServiceDegraded("user-service")) {
return degradationService.getFallbackUser(username);
}

// 2. 获取gRPC通道
ManagedChannel channel = grpcClientManager.getChannel("user-service");

// 3. 创建客户端存根
UserServiceGrpc.UserServiceBlockingStub stub = UserServiceGrpc.newBlockingStub(channel);

// 4. 构建请求
CreateUserRequest request = CreateUserRequest.newBuilder()
.setUsername(username)
.setEmail(email)
.build();

// 5. 执行调用
long startTime = System.currentTimeMillis();
UserResponse response = stub.createUser(request);
long responseTime = System.currentTimeMillis() - startTime;

// 6. 记录成功调用
degradationService.recordSuccess("user-service", "createUser", responseTime);

// 7. 转换响应
return convertToUser(response);

} catch (Exception e) {
log.error("创建用户失败: username={}", username, e);

// 8. 记录失败调用
degradationService.recordFailure("user-service", "createUser");

// 9. 返回降级结果
return degradationService.getFallbackUser(username);
}
}

/**
* 转换gRPC响应为用户对象
* @param response gRPC响应
* @return 用户对象
*/
private User convertToUser(UserResponse response) {
User user = new User();
user.setId(response.getUserId());
user.setUsername(response.getUsername());
user.setEmail(response.getEmail());
user.setStatus(response.getStatus());
return user;
}
}

5. RPC降级服务

5.1 RPC降级服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
/**
* RPC降级服务
*/
@Service
public class RPCDegradationService {

@Autowired
private RedisTemplate<String, Object> redisTemplate;

private final Map<String, CircuitBreaker> circuitBreakers = new ConcurrentHashMap<>();

/**
* 检查服务是否降级
* @param serviceName 服务名称
* @return 是否降级
*/
public boolean isServiceDegraded(String serviceName) {
// 1. 检查手动降级
if (isManuallyDegraded(serviceName)) {
return true;
}

// 2. 检查熔断器状态
CircuitBreaker circuitBreaker = circuitBreakers.get(serviceName);
if (circuitBreaker != null && circuitBreaker.isOpen()) {
return true;
}

return false;
}

/**
* 检查手动降级
* @param serviceName 服务名称
* @return 是否手动降级
*/
private boolean isManuallyDegraded(String serviceName) {
String key = "rpc:degradation:manual:" + serviceName;
Object degraded = redisTemplate.opsForValue().get(key);
return degraded != null && (Boolean) degraded;
}

/**
* 设置手动降级
* @param serviceName 服务名称
* @param degraded 是否降级
* @param expireSeconds 过期时间(秒)
*/
public void setManualDegradation(String serviceName, boolean degraded, int expireSeconds) {
String key = "rpc:degradation:manual:" + serviceName;
redisTemplate.opsForValue().set(key, degraded, Duration.ofSeconds(expireSeconds));

log.info("设置手动降级: serviceName={}, degraded={}, expireSeconds={}",
serviceName, degraded, expireSeconds);
}

/**
* 记录成功调用
* @param serviceName 服务名称
* @param methodName 方法名称
* @param responseTime 响应时间
*/
public void recordSuccess(String serviceName, String methodName, long responseTime) {
try {
// 1. 更新熔断器状态
CircuitBreaker circuitBreaker = getOrCreateCircuitBreaker(serviceName);
circuitBreaker.recordSuccess();

// 2. 记录调用统计
recordCallStats(serviceName, methodName, true, responseTime);

} catch (Exception e) {
log.error("记录成功调用失败: serviceName={}, methodName={}", serviceName, methodName, e);
}
}

/**
* 记录失败调用
* @param serviceName 服务名称
* @param methodName 方法名称
*/
public void recordFailure(String serviceName, String methodName) {
try {
// 1. 更新熔断器状态
CircuitBreaker circuitBreaker = getOrCreateCircuitBreaker(serviceName);
circuitBreaker.recordFailure();

// 2. 记录调用统计
recordCallStats(serviceName, methodName, false, 0);

} catch (Exception e) {
log.error("记录失败调用失败: serviceName={}, methodName={}", serviceName, methodName, e);
}
}

/**
* 获取或创建熔断器
* @param serviceName 服务名称
* @return 熔断器
*/
private CircuitBreaker getOrCreateCircuitBreaker(String serviceName) {
return circuitBreakers.computeIfAbsent(serviceName, name -> new CircuitBreaker(name));
}

/**
* 记录调用统计
* @param serviceName 服务名称
* @param methodName 方法名称
* @param success 是否成功
* @param responseTime 响应时间
*/
private void recordCallStats(String serviceName, String methodName, boolean success, long responseTime) {
try {
String key = "rpc:stats:" + serviceName + ":" + methodName;

// 记录调用次数
redisTemplate.opsForHash().increment(key, "totalCalls", 1);

if (success) {
// 记录成功次数
redisTemplate.opsForHash().increment(key, "successCalls", 1);

// 记录响应时间
redisTemplate.opsForList().leftPush(key + ":responseTimes", responseTime);
redisTemplate.opsForList().trim(key + ":responseTimes", 0, 99); // 保留最近100次
} else {
// 记录失败次数
redisTemplate.opsForHash().increment(key, "failureCalls", 1);
}

// 设置过期时间
redisTemplate.expire(key, Duration.ofMinutes(10));
redisTemplate.expire(key + ":responseTimes", Duration.ofMinutes(10));

} catch (Exception e) {
log.error("记录调用统计失败: serviceName={}, methodName={}", serviceName, methodName, e);
}
}

/**
* 获取降级结果
* @param serviceName 服务名称
* @param methodName 方法名称
* @return 降级结果
*/
public RPCResult getFallbackResult(String serviceName, String methodName) {
try {
// 1. 尝试获取自定义降级结果
String key = "rpc:fallback:" + serviceName + ":" + methodName;
Object fallbackResult = redisTemplate.opsForValue().get(key);

if (fallbackResult != null) {
return RPCResult.success(fallbackResult);
}

// 2. 返回默认降级结果
return getDefaultFallbackResult(serviceName, methodName);

} catch (Exception e) {
log.error("获取降级结果失败: serviceName={}, methodName={}", serviceName, methodName, e);
return getDefaultFallbackResult(serviceName, methodName);
}
}

/**
* 获取默认降级结果
* @param serviceName 服务名称
* @param methodName 方法名称
* @return 默认降级结果
*/
private RPCResult getDefaultFallbackResult(String serviceName, String methodName) {
String message = "服务暂时不可用: " + serviceName + "." + methodName;
return RPCResult.error(message);
}

/**
* 获取降级用户
* @param userId 用户ID
* @return 降级用户
*/
public User getFallbackUser(Long userId) {
User user = new User();
user.setId(userId);
user.setUsername("降级用户");
user.setEmail("fallback@example.com");
user.setStatus("DEGRADED");
return user;
}

/**
* 获取降级用户
* @param username 用户名
* @return 降级用户
*/
public User getFallbackUser(String username) {
User user = new User();
user.setId(0L);
user.setUsername(username);
user.setEmail("fallback@example.com");
user.setStatus("DEGRADED");
return user;
}
}

/**
* 熔断器
*/
@Data
public class CircuitBreaker {
private String serviceName;
private CircuitState state = CircuitState.CLOSED;
private int failureCount = 0;
private int successCount = 0;
private long lastFailureTime = 0;
private int failureThreshold = 5;
private long timeout = 60000; // 60秒
private long resetTimeout = 30000; // 30秒

public CircuitBreaker(String serviceName) {
this.serviceName = serviceName;
}

/**
* 记录成功调用
*/
public void recordSuccess() {
successCount++;

if (state == CircuitState.HALF_OPEN) {
// 半开状态下,成功调用达到阈值则关闭熔断器
if (successCount >= failureThreshold) {
state = CircuitState.CLOSED;
failureCount = 0;
successCount = 0;
log.info("熔断器关闭: serviceName={}", serviceName);
}
}
}

/**
* 记录失败调用
*/
public void recordFailure() {
failureCount++;
lastFailureTime = System.currentTimeMillis();

if (state == CircuitState.CLOSED && failureCount >= failureThreshold) {
// 关闭状态下,失败次数达到阈值则开启熔断器
state = CircuitState.OPEN;
log.warn("熔断器开启: serviceName={}, failureCount={}", serviceName, failureCount);
} else if (state == CircuitState.HALF_OPEN) {
// 半开状态下,失败调用则重新开启熔断器
state = CircuitState.OPEN;
log.warn("熔断器重新开启: serviceName={}", serviceName);
}
}

/**
* 检查熔断器是否开启
* @return 是否开启
*/
public boolean isOpen() {
if (state == CircuitState.OPEN) {
// 检查是否应该进入半开状态
if (System.currentTimeMillis() - lastFailureTime > resetTimeout) {
state = CircuitState.HALF_OPEN;
successCount = 0;
log.info("熔断器进入半开状态: serviceName={}", serviceName);
}
}

return state == CircuitState.OPEN;
}
}

/**
* 熔断器状态
*/
public enum CircuitState {
CLOSED, // 关闭
OPEN, // 开启
HALF_OPEN // 半开
}

6. 总结

通过统一RPC通道与gRPC降级的实现,我们成功构建了一个高性能、高可用的RPC通信框架。关键特性包括:

6.1 核心优势

  1. 统一RPC通道: 统一的RPC调用接口和协议
  2. gRPC集成: gRPC服务端和客户端集成
  3. 降级策略: 服务降级和熔断保护
  4. 负载均衡: 智能负载均衡和故障转移
  5. 监控告警: RPC调用监控和性能分析

6.2 最佳实践

  1. 框架设计: 统一的RPC框架和协议
  2. gRPC集成: 高性能的gRPC服务端和客户端
  3. 降级策略: 完善的降级机制和熔断保护
  4. 负载均衡: 智能的负载均衡和故障转移
  5. 监控告警: 全面的监控和性能分析

这套统一RPC通道与gRPC降级方案不仅能够提供高性能的RPC通信能力,还通过降级策略确保了系统的高可用性,是现代微服务架构的重要基础设施。