用户取换还电Java微服务后端架构实战

1. 架构概述

用户取换还电系统是hd平台的核心模块,需要支持换电柜查询、换电操作(取电/换电/退电)、设备控制指令下发、订单资产更新等关键功能。本篇文章将深入讲解如何基于Java微服务架构实现一个高性能、高可用、实时响应的用户取换还电系统。

1.1 系统架构图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
用户端 → 用户网关 → 设备服务 → 数据库

身份认证

附近换电柜分布/换电柜详情请求处理

返回数据结果

用户端 → 用户网关 → 订单服务 → 设备服务 → 接入服务

身份认证

请求是否允许换电操作

换电操作请求处理(与接入端交互)

下发设备控制指令

接收设备控制指令响应

请求更新订单信息, 资产状态,记录换电操作记录

返回换电操作已处理提示

用户端 → 用户网关 → 设备服务

身份认证

查询换电操作结果请求处理

返回换电操作查询结果

1.2 核心组件

  • 用户网关(User Gateway):负责用户请求的接入、身份认证、请求路由、流程编排
  • 设备服务(Device Service):负责换电柜管理、换电操作处理、设备控制指令管理
  • 订单服务(Order Service):负责订单管理、换电权限校验、订单资产更新、换电操作记录
  • 接入服务(Access Service):负责与物理设备交互、设备控制指令下发、设备响应接收
  • 数据库(MySQL):持久化换电柜信息、换电操作记录、订单资产信息
  • 消息队列(Kafka/RocketMQ):异步处理设备响应和订单更新
  • MQTT服务:与物理设备实时通信
  • 分布式锁(Redisson):保证换电操作的并发安全

2. 用户网关服务实现

2.1 用户网关服务核心代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
/**
* 用户网关服务
* 负责用户请求的接入、身份认证、请求路由、流程编排
*/
@RestController
@RequestMapping("/api/user/gateway")
@Slf4j
public class UserGatewayController {

@Autowired
private AuthService authService;

@Autowired
private DeviceServiceClient deviceServiceClient;

@Autowired
private OrderServiceClient orderServiceClient;

/**
* 附近换电柜分布/换电柜详情请求
* 流程:身份认证 → 附近换电柜分布/换电柜详情请求处理 → 返回数据结果
*/
@PostMapping("/cabinet/nearby")
public Result<CabinetListResult> getNearbyCabinets(
@RequestHeader("Authorization") String token,
@RequestBody CabinetQueryRequest request) {

try {
// 1. 身份认证
UserInfo userInfo = authService.authenticate(token);
if (userInfo == null) {
return Result.error("身份认证失败");
}

// 2. 调用设备服务查询换电柜
CabinetListResult result = deviceServiceClient.getNearbyCabinets(request);

return Result.success(result);

} catch (Exception e) {
log.error("查询附近换电柜失败: error={}", e.getMessage(), e);
return Result.error("查询失败: " + e.getMessage());
}
}

/**
* 换电柜详情请求
*/
@GetMapping("/cabinet/{cabinetId}")
public Result<CabinetDetailResult> getCabinetDetail(
@RequestHeader("Authorization") String token,
@PathVariable Long cabinetId) {

try {
// 1. 身份认证
UserInfo userInfo = authService.authenticate(token);
if (userInfo == null) {
return Result.error("身份认证失败");
}

// 2. 调用设备服务查询换电柜详情
CabinetDetailResult result = deviceServiceClient.getCabinetDetail(cabinetId);

return Result.success(result);

} catch (Exception e) {
log.error("查询换电柜详情失败: cabinetId={}, error={}",
cabinetId, e.getMessage(), e);
return Result.error("查询失败: " + e.getMessage());
}
}

/**
* 换电操作(取/换/退)请求
* 流程:身份认证 → 请求是否允许换电操作 → 换电操作请求处理 →
* 下发设备控制指令 → 接收设备控制指令响应 →
* 请求更新订单信息, 资产状态,记录换电操作记录 → 返回换电操作已处理提示
*/
@PostMapping("/battery/operation")
public Result<BatteryOperationResult> batteryOperation(
@RequestHeader("Authorization") String token,
@RequestBody BatteryOperationRequest request) {

try {
// 1. 身份认证
UserInfo userInfo = authService.authenticate(token);
if (userInfo == null) {
return Result.error("身份认证失败");
}

// 2. 设置用户ID
request.setUserId(userInfo.getUserId());

// 3. 请求是否允许换电操作
BatteryOperationCheckRequest checkRequest = new BatteryOperationCheckRequest();
checkRequest.setUserId(userInfo.getUserId());
checkRequest.setOperationType(request.getOperationType());
checkRequest.setCabinetId(request.getCabinetId());

BatteryOperationCheckResult checkResult =
orderServiceClient.checkBatteryOperationAllowed(checkRequest);
if (!checkResult.isAllowed()) {
return Result.error(checkResult.getMessage());
}

// 4. 换电操作请求处理(与接入端交互)
/**
* 创建电池操作处理请求对象
* 并设置相关操作参数
*/
// 创建电池操作处理请求对象实例
BatteryOperationProcessRequest processRequest = new BatteryOperationProcessRequest();
// 设置用户ID,从用户信息对象中获取
processRequest.setUserId(userInfo.getUserId());
// 设置操作类型,从请求对象中获取
processRequest.setOperationType(request.getOperationType());
// 设置机柜ID,从请求对象中获取
processRequest.setCabinetId(request.getCabinetId());
// 设置插槽编号,从请求对象中获取
processRequest.setSlotNo(request.getSlotNo());
// 设置电池ID,从请求对象中获取
processRequest.setBatteryId(request.getBatteryId());

BatteryOperationResult result = deviceServiceClient.processBatteryOperation(processRequest);

if (!result.isSuccess()) {
return Result.error(result.getMessage());
}

log.info("换电操作处理成功: userId={}, operationType={}, cabinetId={}, operationId={}",
userInfo.getUserId(), request.getOperationType(),
request.getCabinetId(), result.getOperationId());

return Result.success(result);

} catch (Exception e) {
log.error("换电操作失败: operationType={}, cabinetId={}, error={}",
request.getOperationType(), request.getCabinetId(), e.getMessage(), e);
return Result.error("换电操作失败: " + e.getMessage());
}
}

/**
* 查询换电操作结果请求
* 流程:身份认证 → 查询换电操作结果请求处理 → 返回换电操作查询结果
*/
@GetMapping("/battery/operation/{operationId}")
public Result<BatteryOperationQueryResult> getBatteryOperationResult(
@RequestHeader("Authorization") String token,
@PathVariable Long operationId) {

try {
// 1. 身份认证
UserInfo userInfo = authService.authenticate(token);
if (userInfo == null) {
return Result.error("身份认证失败");
}

// 2. 调用设备服务查询换电操作结果
BatteryOperationQueryResult result =
deviceServiceClient.getBatteryOperationResult(operationId, userInfo.getUserId());

return Result.success(result);

} catch (Exception e) {
log.error("查询换电操作结果失败: operationId={}, error={}",
operationId, e.getMessage(), e);
return Result.error("查询失败: " + e.getMessage());
}
}
}

2.2 身份认证服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/**
* 身份认证服务
*/
@Service
@Slf4j
public class AuthService {

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Autowired
private UserServiceClient userServiceClient;

/**
* 身份认证
* 验证Token有效性
*/
public UserInfo authenticate(String token) {
try {
// 1. 从Token中解析用户信息
String userId = parseToken(token);
if (StringUtils.isEmpty(userId)) {
return null;
}

// 2. 从缓存中获取用户信息
String userCacheKey = "user:info:" + userId;
UserInfo userInfo = (UserInfo) redisTemplate.opsForValue().get(userCacheKey);

if (userInfo != null) {
return userInfo;
}

// 3. 缓存未命中,调用用户服务查询
userInfo = userServiceClient.getUserInfo(userId);

// 4. 将用户信息写入缓存
if (userInfo != null) {
redisTemplate.opsForValue().set(userCacheKey, userInfo, 30, TimeUnit.MINUTES);
}

return userInfo;

} catch (Exception e) {
log.error("身份认证失败: token={}, error={}", token, e.getMessage(), e);
return null;
}
}

/**
* 解析Token
*/
private String parseToken(String token) {
// JWT Token解析逻辑
if (StringUtils.isEmpty(token) || !token.startsWith("Bearer ")) {
return null;
}

String jwtToken = token.substring(7);
// 解析JWT获取userId
return extractUserIdFromJWT(jwtToken);
}

private String extractUserIdFromJWT(String jwtToken) {
// JWT解析实现
// 实际实现需要使用JWT库
return "user123";
}
}

3. 设备服务实现

3.1 设备服务核心代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
/**
* 设备服务
* 负责换电柜管理、换电操作处理、设备控制指令管理
*/
@Service
@Slf4j
public class DeviceService {

@Autowired
private BatteryCabinetMapper batteryCabinetMapper;

@Autowired
private BatteryOperationMapper batteryOperationMapper;

@Autowired
private AccessServiceClient accessServiceClient;

@Autowired
private OrderServiceClient orderServiceClient;

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@Autowired
private RedissonClient redissonClient;

/**
* 附近换电柜分布/换电柜详情请求处理
*/
public CabinetListResult getNearbyCabinets(CabinetQueryRequest request) {
try {
// 1. 根据地理位置查询附近换电柜
List<BatteryCabinet> cabinets = batteryCabinetMapper.selectNearbyCabinets(
request.getLatitude(), request.getLongitude(), request.getRadius());

// 2. 转换为返回对象
List<CabinetInfo> cabinetInfoList = cabinets.stream()
.map(this::convertToCabinetInfo)
.collect(Collectors.toList());

// 3. 构建返回结果
CabinetListResult result = new CabinetListResult();
result.setSuccess(true);
result.setCabinets(cabinetInfoList);
result.setTotal(cabinetInfoList.size());
result.setMessage("查询成功");

return result;

} catch (Exception e) {
log.error("查询附近换电柜失败: error={}", e.getMessage(), e);
return CabinetListResult.failed("查询失败: " + e.getMessage());
}
}

/**
* 换电柜详情请求处理
*/
public CabinetDetailResult getCabinetDetail(Long cabinetId) {
try {
// 1. 查询换电柜信息
BatteryCabinet cabinet = batteryCabinetMapper.selectById(cabinetId);
if (cabinet == null) {
return CabinetDetailResult.failed("换电柜不存在");
}

// 2. 查询换电柜槽位信息
List<CabinetSlot> slots = batteryCabinetMapper.selectCabinetSlots(cabinetId);

// 3. 构建返回结果
CabinetDetailResult result = new CabinetDetailResult();
result.setSuccess(true);
result.setCabinet(convertToCabinetInfo(cabinet));
result.setSlots(slots);
result.setMessage("查询成功");

return result;

} catch (Exception e) {
log.error("查询换电柜详情失败: cabinetId={}, error={}",
cabinetId, e.getMessage(), e);
return CabinetDetailResult.failed("查询失败: " + e.getMessage());
}
}

/**
* 换电操作请求处理(与接入端交互)
*/
@Transactional(rollbackFor = Exception.class)
public BatteryOperationResult processBatteryOperation(
BatteryOperationProcessRequest request) {

String lockKey = "battery:operation:lock:" + request.getCabinetId() + ":" + request.getSlotNo();
RLock lock = redissonClient.getLock(lockKey);

try {
if (lock.tryLock(10, TimeUnit.SECONDS)) {
// 1. 创建换电操作记录
BatteryOperation operation = createBatteryOperation(request);
batteryOperationMapper.insert(operation);

// 2. 下发设备控制指令
DeviceControlCommand command = buildDeviceControlCommand(request, operation);
DeviceControlResult controlResult = accessServiceClient.sendDeviceCommand(command);

if (!controlResult.isSuccess()) {
// 更新操作状态为失败
operation.setStatus(BatteryOperationStatus.FAILED.getCode());
operation.setErrorMessage(controlResult.getMessage());
operation.setUpdateTime(new Date());
batteryOperationMapper.updateById(operation);

return BatteryOperationResult.failed("设备控制指令下发失败: " + controlResult.getMessage());
}

// 3. 更新操作状态为处理中
// 设置电池操作状态为"处理中"状态
operation.setStatus(BatteryOperationStatus.PROCESSING.getCode());
// 设置操作的命令ID,使用从控制结果中获取的命令ID
operation.setCommandId(controlResult.getCommandId());
operation.setUpdateTime(new Date());
batteryOperationMapper.updateById(operation);

// 4. 异步处理设备响应和订单更新
asyncProcessDeviceResponse(operation, controlResult);

log.info("换电操作处理成功: operationId={}, operationType={}, cabinetId={}",
operation.getId(), request.getOperationType(), request.getCabinetId());

// 5. 构建返回结果
BatteryOperationResult result = new BatteryOperationResult();
result.setSuccess(true);
result.setOperationId(operation.getId());
result.setOperationNo(operation.getOperationNo());
result.setStatus(BatteryOperationStatus.PROCESSING.getCode());
result.setMessage("换电操作已提交,处理中");

return result;

} else {
return BatteryOperationResult.failed("换电操作超时,请稍后再试");
}
} catch (Exception e) {
log.error("换电操作处理失败: operationType={}, cabinetId={}, error={}",
request.getOperationType(), request.getCabinetId(), e.getMessage(), e);
return BatteryOperationResult.failed("换电操作失败: " + e.getMessage());
} finally {
// 检查当前线程是否持有锁
if (lock.isHeldByCurrentThread()) {
// 如果当前线程持有锁,则释放该锁
lock.unlock();
}
}
}

/**
* 创建换电操作记录
*/
private BatteryOperation createBatteryOperation(BatteryOperationProcessRequest request) {
BatteryOperation operation = new BatteryOperation();
/**
* 设置操作对象的各项属性
* 包括操作编号、用户ID、机柜ID、槽位号、电池ID、操作类型、状态、创建时间和更新时间
*/
operation.setOperationNo(generateOperationNo()); // 设置操作编号,通过generateOperationNo()方法生成
operation.setUserId(request.getUserId()); // 设置用户ID,从请求对象中获取
operation.setCabinetId(request.getCabinetId()); // 设置机柜ID,从请求对象中获取
operation.setSlotNo(request.getSlotNo()); // 设置槽位号,从请求对象中获取
operation.setBatteryId(request.getBatteryId()); // 设置电池ID,从请求对象中获取
operation.setOperationType(request.getOperationType()); // 设置操作类型,从请求对象中获取
operation.setStatus(BatteryOperationStatus.PENDING.getCode()); // 设置操作状态为"待处理"
operation.setCreateTime(new Date()); // 设置创建时间为当前时间
operation.setUpdateTime(new Date()); // 设置更新时间为当前时间

return operation;
}

/**
* 构建设备控制指令
*/
private DeviceControlCommand buildDeviceControlCommand(
BatteryOperationProcessRequest request, BatteryOperation operation) {
DeviceControlCommand command = new DeviceControlCommand();
/**
* 设置命令对象的各项属性
* 这些属性来源于请求对象和操作对象
*/
command.setCabinetId(request.getCabinetId()); // 设置机柜ID,来自请求对象
command.setSlotNo(request.getSlotNo()); // 设置槽位号,来自请求对象
command.setOperationType(request.getOperationType()); // 设置操作类型,来自请求对象
command.setOperationId(operation.getId()); // 设置操作ID,来自操作对象
command.setOperationNo(operation.getOperationNo()); // 设置操作编号,来自操作对象
command.setBatteryId(request.getBatteryId()); // 设置电池ID,来自请求对象
command.setTimestamp(new Date()); // 设置时间戳为当前系统时间

return command;
}

/**
* 异步处理设备响应和订单更新
*/
@Async("batteryOperationExecutor")
public void asyncProcessDeviceResponse(BatteryOperation operation,
DeviceControlResult controlResult) {
try {
// 发送到消息队列,等待设备响应
BatteryOperationMessage message = new BatteryOperationMessage();
/**
* 设置消息对象的各项属性值
* 该代码段主要用于将操作对象和控制结果对象中的相关信息
* 设置到消息对象中,以便进行后续的消息传递和处理
*/
message.setOperationId(operation.getId()); // 设置消息的操作ID,来源于操作对象的ID
message.setOperationNo(operation.getOperationNo()); // 设置消息的操作编号,来源于操作对象的操作编号
message.setCommandId(controlResult.getCommandId()); // 设置消息的命令ID,来源于控制结果对象的命令ID
message.setCabinetId(operation.getCabinetId()); // 设置消息的机柜ID,来源于操作对象的机柜ID
message.setOperationType(operation.getOperationType()); // 设置消息的操作类型,来源于操作对象的操作类型

kafkaTemplate.send("battery.operation.response",
operation.getOperationNo(), JSON.toJSONString(message));

} catch (Exception e) {
log.error("异步处理设备响应失败: operationId={}, error={}",
operation.getId(), e.getMessage(), e);
}
}

/**
* 接收设备控制指令响应
*/
@KafkaListener(topics = "device.control.response", groupId = "device-control-group")
public void receiveDeviceControlResponse(String message) {
try {
DeviceControlResponse response = JSON.parseObject(message, DeviceControlResponse.class);

// 1. 查询换电操作记录
BatteryOperation operation = batteryOperationMapper.selectByCommandId(
response.getCommandId());
if (operation == null) {
log.warn("换电操作记录不存在: commandId={}", response.getCommandId());
return;
}

// 2. 更新操作状态
if (response.isSuccess()) {
operation.setStatus(BatteryOperationStatus.SUCCESS.getCode());
} else {
operation.setStatus(BatteryOperationStatus.FAILED.getCode());
operation.setErrorMessage(response.getErrorMessage());
}
operation.setResponseTime(new Date());
operation.setUpdateTime(new Date());
batteryOperationMapper.updateById(operation);

// 3. 请求更新订单信息, 资产状态,记录换电操作记录
if (response.isSuccess()) {
updateOrderAndAsset(operation, response);
}

log.info("设备控制指令响应处理完成: operationId={}, success={}",
operation.getId(), response.isSuccess());

} catch (Exception e) {
log.error("接收设备控制指令响应失败: error={}", e.getMessage(), e);
}
}

/**
* 请求更新订单信息, 资产状态,记录换电操作记录
*/
private void updateOrderAndAsset(BatteryOperation operation,
DeviceControlResponse response) {
try {
OrderAssetUpdateRequest updateRequest = new OrderAssetUpdateRequest();
// 设置更新请求的用户ID
updateRequest.setUserId(operation.getUserId());
// 设置更新请求的操作ID
updateRequest.setOperationId(operation.getId());
// 设置更新请求的操作类型
updateRequest.setOperationType(operation.getOperationType());
// 设置更新请求的机柜ID
updateRequest.setCabinetId(operation.getCabinetId());
// 设置更新请求的槽位号
updateRequest.setSlotNo(operation.getSlotNo());
// 设置更新请求的电池ID,从响应中获取
updateRequest.setBatteryId(response.getBatteryId());
// 设置更新请求的执行状态为成功
updateRequest.setSuccess(true);

OrderAssetUpdateResult updateResult = orderServiceClient.updateOrderAndAsset(updateRequest);

if (!updateResult.isSuccess()) {
log.error("更新订单资产失败: operationId={}, error={}",
operation.getId(), updateResult.getMessage());
}

} catch (Exception e) {
log.error("更新订单资产异常: operationId={}, error={}",
operation.getId(), e.getMessage(), e);
}
}

/**
* 查询换电操作结果请求处理
*/
public BatteryOperationQueryResult getBatteryOperationResult(Long operationId, Long userId) {
try {
// 1. 查询换电操作记录
BatteryOperation operation = batteryOperationMapper.selectById(operationId);
if (operation == null) {
return BatteryOperationQueryResult.failed("换电操作记录不存在");
}

// 2. 检查权限
if (!operation.getUserId().equals(userId)) {
return BatteryOperationQueryResult.failed("无权限查询该换电操作");
}

// 3. 构建返回结果
BatteryOperationQueryResult result = new BatteryOperationQueryResult();
/**
* 设置操作结果对象,将操作信息复制到结果对象中
* 并设置操作状态为成功,返回成功消息
*/
result.setSuccess(true); // 设置操作结果为成功状态
result.setOperationId(operation.getId()); // 设置操作ID
result.setOperationNo(operation.getOperationNo()); // 设置操作编号
result.setOperationType(operation.getOperationType()); // 设置操作类型
result.setStatus(operation.getStatus()); // 设置操作状态
result.setCabinetId(operation.getCabinetId()); // 设置机柜ID
result.setSlotNo(operation.getSlotNo()); // 设置插槽号
result.setBatteryId(operation.getBatteryId()); // 设置电池ID
result.setCreateTime(operation.getCreateTime()); // 设置创建时间
result.setResponseTime(operation.getResponseTime()); // 设置响应时间
result.setErrorMessage(operation.getErrorMessage()); // 设置错误信息
result.setMessage("查询成功"); // 设置返回消息为"查询成功"

return result;

} catch (Exception e) {
log.error("查询换电操作结果失败: operationId={}, error={}",
operationId, e.getMessage(), e);
return BatteryOperationQueryResult.failed("查询失败: " + e.getMessage());
}
}

/**
* 转换换电柜为换电柜信息
*/
private CabinetInfo convertToCabinetInfo(BatteryCabinet cabinet) {
CabinetInfo info = new CabinetInfo();
info.setCabinetId(cabinet.getId());
info.setCabinetName(cabinet.getCabinetName());
info.setCabinetCode(cabinet.getCabinetCode());
info.setLatitude(cabinet.getLatitude());
info.setLongitude(cabinet.getLongitude());
info.setAddress(cabinet.getAddress());
info.setStatus(cabinet.getStatus());
info.setAvailableSlots(cabinet.getAvailableSlots());
info.setTotalSlots(cabinet.getTotalSlots());
return info;
}

/**
* 生成操作号
*/
private String generateOperationNo() {
return "OPR" + System.currentTimeMillis() + RandomUtils.nextInt(10000, 99999);
}
}

4. 订单服务实现

4.1 订单服务核心代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
/**
* 订单服务
* 负责订单管理、换电权限校验、订单资产更新、换电操作记录
*/
@Service
@Slf4j
public class OrderService {

@Autowired
private OrderMapper orderMapper;

@Autowired
private UserAssetMapper userAssetMapper;

@Autowired
private BatteryOperationRecordMapper batteryOperationRecordMapper;

@Autowired
private RedissonClient redissonClient;

/**
* 请求是否允许换电操作
* 是否允许换电请求处理
*/
public BatteryOperationCheckResult checkBatteryOperationAllowed(
BatteryOperationCheckRequest request) {

try {
// 1. 查询用户有效订单
List<Order> validOrders = orderMapper.selectValidOrdersByUserId(request.getUserId());
if (validOrders.isEmpty()) {
return BatteryOperationCheckResult.notAllowed("用户不存在有效订单,不允许换电操作");
}

// 2. 检查订单状态
Order order = validOrders.get(0);
// 检查订单状态是否允许进行换电操作
// 如果订单状态不是已支付(PAID)也不是已完成(COMPLETED),则不允许换电
if (!OrderStatus.PAID.getCode().equals(order.getStatus()) &&
!OrderStatus.COMPLETED.getCode().equals(order.getStatus())) {
// 返回不允许换电的操作结果,并提示当前订单状态
return BatteryOperationCheckResult.notAllowed("订单状态不允许换电操作,当前状态: " + order.getStatus());
}

// 3. 检查换电柜是否可用
// 这里可以添加换电柜状态检查逻辑

// 4. 构建返回结果
/**
* 创建一个新的电池操作检查结果对象
* 该对象用于存储和表示电池操作检查的结果信息
*/
BatteryOperationCheckResult result = new BatteryOperationCheckResult();
// 设置检查结果为允许状态
// 即电池操作检查通过,可以进行相关操作
result.setAllowed(true);
result.setOrderId(order.getId());
result.setOrderNo(order.getOrderNo());
result.setMessage("允许换电操作");

return result;

} catch (Exception e) {
log.error("检查换电操作权限失败: userId={}, error={}",
request.getUserId(), e.getMessage(), e);
return BatteryOperationCheckResult.notAllowed("检查失败: " + e.getMessage());
}
}

/**
* 请求更新订单信息, 资产状态,记录换电操作记录
* 请求更新订单资产信息处理
*/
@Transactional(rollbackFor = Exception.class)
public OrderAssetUpdateResult updateOrderAndAsset(OrderAssetUpdateRequest request) {
String lockKey = "order:asset:lock:" + request.getUserId();
RLock lock = redissonClient.getLock(lockKey);

try {
if (lock.tryLock(10, TimeUnit.SECONDS)) {
// 1. 查询用户订单
List<Order> orders = orderMapper.selectValidOrdersByUserId(request.getUserId());
if (orders.isEmpty()) {
return OrderAssetUpdateResult.failed("用户不存在有效订单");
}

Order order = orders.get(0);

// 2. 更新订单信息
order.setLastOperationTime(new Date());
order.setLastOperationType(request.getOperationType());
order.setUpdateTime(new Date());
orderMapper.updateById(order);

// 3. 更新资产状态
updateUserAsset(request, order);

// 4. 记录换电操作记录
recordBatteryOperation(request, order);

log.info("订单资产更新成功: userId={}, operationId={}, operationType={}",
request.getUserId(), request.getOperationId(), request.getOperationType());

// 5. 构建返回结果
OrderAssetUpdateResult result = new OrderAssetUpdateResult();
result.setSuccess(true);
result.setOrderId(order.getId());
result.setMessage("订单资产更新成功");

return result;

} else {
return OrderAssetUpdateResult.failed("订单资产更新超时,请稍后再试");
}
} catch (Exception e) {
log.error("更新订单资产失败: userId={}, operationId={}, error={}",
request.getUserId(), request.getOperationId(), e.getMessage(), e);
return OrderAssetUpdateResult.failed("更新失败: " + e.getMessage());
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}

/**
* 更新用户资产状态
*/
private void updateUserAsset(OrderAssetUpdateRequest request, Order order) {
// 1. 查询用户资产
UserAsset asset = userAssetMapper.selectByUserId(request.getUserId());
if (asset == null) {
// 创建用户资产
asset = createUserAsset(request.getUserId());
}

// 2. 根据操作类型更新资产
if ("PICKUP".equals(request.getOperationType())) {
// 取电:增加电池数量
asset.setBatteryCount(asset.getBatteryCount() + 1);
asset.setBatteryId(request.getBatteryId());
} else if ("EXCHANGE".equals(request.getOperationType())) {
// 换电:更新电池ID
asset.setBatteryId(request.getBatteryId());
} else if ("RETURN".equals(request.getOperationType())) {
// 退电:减少电池数量
asset.setBatteryCount(asset.getBatteryCount() - 1);
asset.setBatteryId(null);
}

asset.setUpdateTime(new Date());
userAssetMapper.updateById(asset);
}

/**
* 创建用户资产
*/
private UserAsset createUserAsset(Long userId) {
UserAsset asset = new UserAsset();
asset.setUserId(userId);
asset.setBatteryCount(0);
asset.setBatteryId(null);
asset.setCreateTime(new Date());
asset.setUpdateTime(new Date());
userAssetMapper.insert(asset);
return asset;
}

/**
* 记录换电操作记录
*/
private void recordBatteryOperation(OrderAssetUpdateRequest request, Order order) {
BatteryOperationRecord record = new BatteryOperationRecord();
record.setUserId(request.getUserId());
record.setOrderId(order.getId());
record.setOrderNo(order.getOrderNo());
record.setOperationId(request.getOperationId());
record.setOperationType(request.getOperationType());
record.setCabinetId(request.getCabinetId());
record.setSlotNo(request.getSlotNo());
record.setBatteryId(request.getBatteryId());
record.setSuccess(request.isSuccess());
record.setCreateTime(new Date());

batteryOperationRecordMapper.insert(record);
}
}

5. 接入服务实现

5.1 接入服务核心代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/**
* 接入服务
* 负责与物理设备交互、设备控制指令下发、设备响应接收
*/
@Service
@Slf4j
public class AccessService {

@Autowired
private MqttService mqttService;

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

/**
* 下发设备控制指令
*/
public DeviceControlResult sendDeviceCommand(DeviceControlCommand command) {
try {
// 1. 构建MQTT消息
String topic = "device/" + command.getCabinetId() + "/command";
String message = JSON.toJSONString(command);

// 2. 发送MQTT消息
mqttService.publish(topic, message);

// 3. 生成命令ID
String commandId = generateCommandId();

log.info("设备控制指令下发成功: cabinetId={}, commandId={}, operationType={}",
command.getCabinetId(), commandId, command.getOperationType());

// 4. 构建返回结果
DeviceControlResult result = new DeviceControlResult();
result.setSuccess(true);
result.setCommandId(commandId);
result.setMessage("设备控制指令下发成功");

return result;

} catch (Exception e) {
log.error("设备控制指令下发失败: cabinetId={}, error={}",
command.getCabinetId(), e.getMessage(), e);
return DeviceControlResult.failed("设备控制指令下发失败: " + e.getMessage());
}
}

/**
* 接收设备控制指令响应
*/
@MqttListener(topics = "device/+/response")
public void receiveDeviceResponse(String topic, String message) {
try {
// 1. 解析设备响应
DeviceControlResponse response = JSON.parseObject(message, DeviceControlResponse.class);

// 2. 发送到Kafka消息队列
kafkaTemplate.send("device.control.response",
response.getCommandId(), JSON.toJSONString(response));

log.info("设备控制指令响应接收成功: commandId={}, success={}",
response.getCommandId(), response.isSuccess());

} catch (Exception e) {
log.error("接收设备控制指令响应失败: topic={}, error={}",
topic, e.getMessage(), e);
}
}

/**
* 生成命令ID
*/
private String generateCommandId() {
return "CMD" + System.currentTimeMillis() + RandomUtils.nextInt(10000, 99999);
}
}

6. 数据模型定义

6.1 换电操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
/**
* 换电操作
*/
@Data
@TableName("battery_operation")
public class BatteryOperation {
/**
* 主键ID
*/
@TableId(type = IdType.AUTO)
private Long id;

/**
* 操作号
*/
private String operationNo;

/**
* 用户ID
*/
private Long userId;

/**
* 换电柜ID
*/
private Long cabinetId;

/**
* 槽位号
*/
private Integer slotNo;

/**
* 电池ID
*/
private String batteryId;

/**
* 操作类型:PICKUP-取电, EXCHANGE-换电, RETURN-退电
*/
private String operationType;

/**
* 操作状态:PENDING-待处理, PROCESSING-处理中, SUCCESS-成功, FAILED-失败
*/
private String status;

/**
* 命令ID
*/
private String commandId;

/**
* 错误信息
*/
private String errorMessage;

/**
* 响应时间
*/
private Date responseTime;

/**
* 创建时间
*/
private Date createTime;

/**
* 更新时间
*/
private Date updateTime;
}

6.2 换电柜

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
* 换电柜
*/
@Data
@TableName("battery_cabinet")
public class BatteryCabinet {
/**
* 主键ID
*/
@TableId(type = IdType.AUTO)
private Long id;

/**
* 换电柜名称
*/
private String cabinetName;

/**
* 换电柜编码
*/
private String cabinetCode;

/**
* 纬度
*/
private BigDecimal latitude;

/**
* 经度
*/
private BigDecimal longitude;

/**
* 地址
*/
private String address;

/**
* 状态:ONLINE-在线, OFFLINE-离线, MAINTENANCE-维护中
*/
private String status;

/**
* 可用槽位数
*/
private Integer availableSlots;

/**
* 总槽位数
*/
private Integer totalSlots;

/**
* 创建时间
*/
private Date createTime;

/**
* 更新时间
*/
private Date updateTime;
}

6.3 换电操作记录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
* 换电操作记录
*/
@Data
@TableName("battery_operation_record")
public class BatteryOperationRecord {
/**
* 主键ID
*/
@TableId(type = IdType.AUTO)
private Long id;

/**
* 用户ID
*/
private Long userId;

/**
* 订单ID
*/
private Long orderId;

/**
* 订单号
*/
private String orderNo;

/**
* 操作ID
*/
private Long operationId;

/**
* 操作类型
*/
private String operationType;

/**
* 换电柜ID
*/
private Long cabinetId;

/**
* 槽位号
*/
private Integer slotNo;

/**
* 电池ID
*/
private String batteryId;

/**
* 是否成功
*/
private Boolean success;

/**
* 创建时间
*/
private Date createTime;
}

6.4 用户资产

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* 用户资产
*/
@Data
@TableName("user_asset")
public class UserAsset {
/**
* 主键ID
*/
@TableId(type = IdType.AUTO)
private Long id;

/**
* 用户ID
*/
private Long userId;

/**
* 电池数量
*/
private Integer batteryCount;

/**
* 当前电池ID
*/
private String batteryId;

/**
* 创建时间
*/
private Date createTime;

/**
* 更新时间
*/
private Date updateTime;
}

6.5 换电操作类型枚举

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 换电操作类型枚举
*/
public enum BatteryOperationType {
PICKUP("PICKUP", "取电"),
EXCHANGE("EXCHANGE", "换电"),
RETURN("RETURN", "退电");

private final String code;
private final String name;

BatteryOperationType(String code, String name) {
this.code = code;
this.name = name;
}

public String getCode() {
return code;
}

public String getName() {
return name;
}
}

6.6 换电操作状态枚举

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 换电操作状态枚举
*/
public enum BatteryOperationStatus {
PENDING("PENDING", "待处理"),
PROCESSING("PROCESSING", "处理中"),
SUCCESS("SUCCESS", "成功"),
FAILED("FAILED", "失败");

private final String code;
private final String name;

BatteryOperationStatus(String code, String name) {
this.code = code;
this.name = name;
}

public String getCode() {
return code;
}

public String getName() {
return name;
}
}

7. 数据库Mapper实现

7.1 BatteryCabinetMapper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* 换电柜Mapper
*/
@Mapper
public interface BatteryCabinetMapper extends BaseMapper<BatteryCabinet> {

/**
* 查询附近换电柜
*/
@Select("SELECT *, " +
"(6371 * acos(cos(radians(#{latitude})) * cos(radians(latitude)) * " +
"cos(radians(longitude) - radians(#{longitude})) + " +
"sin(radians(#{latitude})) * sin(radians(latitude)))) AS distance " +
"FROM battery_cabinet " +
"WHERE status = 'ONLINE' AND deleted = 0 " +
"HAVING distance <= #{radius} " +
"ORDER BY distance ASC")
List<BatteryCabinet> selectNearbyCabinets(
@Param("latitude") BigDecimal latitude,
@Param("longitude") BigDecimal longitude,
@Param("radius") Double radius);

/**
* 查询换电柜槽位
*/
@Select("SELECT * FROM cabinet_slot WHERE cabinet_id = #{cabinetId} AND deleted = 0")
List<CabinetSlot> selectCabinetSlots(@Param("cabinetId") Long cabinetId);
}

7.2 BatteryOperationMapper

1
2
3
4
5
6
7
8
9
10
11
12
/**
* 换电操作Mapper
*/
@Mapper
public interface BatteryOperationMapper extends BaseMapper<BatteryOperation> {

/**
* 根据命令ID查询操作
*/
@Select("SELECT * FROM battery_operation WHERE command_id = #{commandId}")
BatteryOperation selectByCommandId(@Param("commandId") String commandId);
}

8. 数据库表设计

8.1 换电柜表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
-- 换电柜表
CREATE TABLE `battery_cabinet` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`cabinet_name` varchar(100) NOT NULL COMMENT '换电柜名称',
`cabinet_code` varchar(64) NOT NULL COMMENT '换电柜编码',
`latitude` decimal(10,7) NOT NULL COMMENT '纬度',
`longitude` decimal(10,7) NOT NULL COMMENT '经度',
`address` varchar(200) DEFAULT NULL COMMENT '地址',
`status` varchar(20) NOT NULL DEFAULT 'ONLINE' COMMENT '状态:ONLINE-在线, OFFLINE-离线, MAINTENANCE-维护中',
`available_slots` int(11) NOT NULL DEFAULT '0' COMMENT '可用槽位数',
`total_slots` int(11) NOT NULL DEFAULT '0' COMMENT '总槽位数',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
`deleted` tinyint(1) NOT NULL DEFAULT '0' COMMENT '是否删除',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_cabinet_code` (`cabinet_code`),
KEY `idx_status` (`status`),
KEY `idx_location` (`latitude`, `longitude`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='换电柜表';

8.2 换电操作表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
-- 换电操作表
CREATE TABLE `battery_operation` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`operation_no` varchar(64) NOT NULL COMMENT '操作号',
`user_id` bigint(20) NOT NULL COMMENT '用户ID',
`cabinet_id` bigint(20) NOT NULL COMMENT '换电柜ID',
`slot_no` int(11) NOT NULL COMMENT '槽位号',
`battery_id` varchar(64) DEFAULT NULL COMMENT '电池ID',
`operation_type` varchar(20) NOT NULL COMMENT '操作类型:PICKUP-取电, EXCHANGE-换电, RETURN-退电',
`status` varchar(20) NOT NULL DEFAULT 'PENDING' COMMENT '操作状态:PENDING-待处理, PROCESSING-处理中, SUCCESS-成功, FAILED-失败',
`command_id` varchar(64) DEFAULT NULL COMMENT '命令ID',
`error_message` varchar(500) DEFAULT NULL COMMENT '错误信息',
`response_time` datetime DEFAULT NULL COMMENT '响应时间',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_operation_no` (`operation_no`),
KEY `idx_user_id` (`user_id`),
KEY `idx_cabinet_id` (`cabinet_id`),
KEY `idx_status` (`status`),
KEY `idx_command_id` (`command_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='换电操作表';

8.3 换电操作记录表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
-- 换电操作记录表
CREATE TABLE `battery_operation_record` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`user_id` bigint(20) NOT NULL COMMENT '用户ID',
`order_id` bigint(20) NOT NULL COMMENT '订单ID',
`order_no` varchar(64) NOT NULL COMMENT '订单号',
`operation_id` bigint(20) NOT NULL COMMENT '操作ID',
`operation_type` varchar(20) NOT NULL COMMENT '操作类型',
`cabinet_id` bigint(20) NOT NULL COMMENT '换电柜ID',
`slot_no` int(11) NOT NULL COMMENT '槽位号',
`battery_id` varchar(64) DEFAULT NULL COMMENT '电池ID',
`success` tinyint(1) NOT NULL DEFAULT '0' COMMENT '是否成功',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
PRIMARY KEY (`id`),
KEY `idx_user_id` (`user_id`),
KEY `idx_order_id` (`order_id`),
KEY `idx_operation_id` (`operation_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='换电操作记录表';

8.4 用户资产表

1
2
3
4
5
6
7
8
9
10
11
-- 用户资产表
CREATE TABLE `user_asset` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`user_id` bigint(20) NOT NULL COMMENT '用户ID',
`battery_count` int(11) NOT NULL DEFAULT '0' COMMENT '电池数量',
`battery_id` varchar(64) DEFAULT NULL COMMENT '当前电池ID',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_user_id` (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户资产表';

9. 消息队列配置

9.1 Kafka配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* Kafka配置
*/
@Configuration
@EnableKafka
public class KafkaConfig {

@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}

@Bean
public ProducerFactory<String, String> producerFactory() {
/**
* 创建并返回一个Kafka生产者工厂实例
* 配置了Kafka生产者的基本参数,包括服务器地址、序列化方式和确认机制
*
* @return DefaultKafkaProducerFactory<String> 配置好的Kafka生产者工厂实例
*/
Map<String, Object> configProps = new HashMap<>(); // 创建一个HashMap用于存储配置属性
// 设置Kafka服务器地址,这里配置为本地localhost的9092端口
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 设置键(key)的序列化器为StringSerializer,用于序列化消息的键
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 设置值(value)的序列化器为StringSerializer,用于序列化消息的内容
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 设置确认机制为1,表示leader副本收到消息后即认为发送成功
// 设置确认机制为-1,表示leader和所有follower副本都收到消息后才认为发送成功
configProps.put(ProducerConfig.ACKS_CONFIG, "1");
// 使用配置好的属性创建并返回一个DefaultKafkaProducerFactory实例
return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
/*
* 配置Kafka消费者属性并创建消费者工厂
* 设置Kafka服务器地址、消费者组ID以及序列化器
*/
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // 设置Kafka服务器地址为localhost:9092
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "device-control-group"); // 设置消费者组ID为device-control-group
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // 设置键的序列化器为StringDeserializer
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // 设置值的序列化器为StringDeserializer
return new DefaultKafkaConsumerFactory<>(configProps); // 返回一个使用上述配置的DefaultKafkaConsumerFactory实例
}
}

10. 总结

本文详细介绍了用户取换还电的Java微服务架构实现,包括:

  1. 用户网关服务:负责用户请求接入、身份认证、请求路由、流程编排
  2. 设备服务:负责换电柜管理、换电操作处理、设备控制指令管理
  3. 订单服务:负责订单管理、换电权限校验、订单资产更新、换电操作记录
  4. 接入服务:负责与物理设备交互、设备控制指令下发、设备响应接收
  5. 三个主要流程
    • 查询附近换电柜分布/换电柜详情
    • 换电操作(取/换/退)请求
    • 查询换电操作结果
  6. 换电操作流程
    • 身份认证
    • 请求是否允许换电操作
    • 换电操作请求处理(与接入端交互)
    • 下发设备控制指令
    • 接收设备控制指令响应
    • 请求更新订单信息, 资产状态,记录换电操作记录
    • 返回换电操作已处理提示
  7. 消息队列:使用Kafka异步处理设备响应和订单更新
  8. 设备控制:通过MQTT与物理设备通信

该架构具有以下优势:

  • 实时性:MQTT实时通信,设备响应延迟<1s
  • 高性能:消息队列异步处理,支持高并发
  • 高可用:数据库持久化,支持数据恢复
  • 可扩展:微服务架构,支持水平扩展
  • 完整性:完整的换电操作流程和记录
  • 可靠性:分布式锁保证并发安全,消息队列保证消息不丢失

通过本文的实战代码,可以快速搭建一个高性能、高可用、实时响应的用户取换还电系统。