用户服务完整逻辑代码Java微服务后端架构实战

1. 架构概述

用户服务系统是电商平台的基础业务模块,需要支持用户注册、登录、信息管理、积分管理、等级管理、认证授权和用户统计等功能。本篇文章将深入讲解如何基于Java微服务架构实现一个完整的用户服务系统,包含用户服务、认证服务、积分服务、通知服务等多个微服务的完整业务逻辑代码。

1.1 系统架构图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
用户服务完整架构
├── 用户服务 (User Service)
│ ├── 用户注册
│ ├── 用户登录
│ ├── 用户信息管理
│ ├── 用户查询
│ └── 用户统计

├── 认证服务 (Auth Service)
│ ├── 用户认证
│ ├── Token生成
│ ├── Token验证
│ ├── 权限验证
│ └── 会话管理

├── 积分服务 (Points Service)
│ ├── 积分查询
│ ├── 积分增加
│ ├── 积分扣减
│ ├── 积分记录
│ └── 积分统计

├── 通知服务 (Notification Service)
│ ├── 短信通知
│ ├── 邮件通知
│ ├── 站内消息
│ └── 推送通知

├── Kafka消息队列
│ ├── 用户注册Topic
│ ├── 用户登录Topic
│ ├── 用户信息变更Topic
│ ├── 积分变更Topic
│ └── 用户统计Topic

└── 数据库/缓存
├── MySQL - 用户信息、积分记录、登录记录
└── Redis - 用户会话、Token缓存、分布式锁

1.2 核心组件

  • 用户服务(User Service):负责用户注册、登录、信息管理、查询、统计
  • 认证服务(Auth Service):负责用户认证、Token生成、验证、权限管理
  • 积分服务(Points Service):负责积分查询、增加、扣减、记录、统计
  • 通知服务(Notification Service):负责短信、邮件、站内消息、推送通知
  • Kafka消息队列:负责用户、认证、积分的异步处理和消息通知
  • 数据库(MySQL):持久化用户信息、积分记录、登录记录
  • 缓存(Redis):缓存用户会话、Token信息、分布式锁

2. 用户服务实现

2.1 用户服务控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
/**
* 用户服务控制器
* 提供用户管理接口
*/
@RestController
@RequestMapping("/api/user")
@Slf4j
public class UserController {

@Autowired
private UserService userService;

@Autowired
private UserQueryService userQueryService;

@Autowired
private AuthService authService;

/**
* 用户注册
*/
@PostMapping("/register")
public Result<UserInfo> register(@RequestBody @Valid UserRegisterRequest request) {
try {
UserInfo userInfo = userService.register(request);
return Result.success(userInfo);
} catch (Exception e) {
log.error("用户注册失败: error={}", e.getMessage(), e);
return Result.error("用户注册失败: " + e.getMessage());
}
}

/**
* 用户登录
*/
@PostMapping("/login")
public Result<LoginResponse> login(@RequestBody @Valid UserLoginRequest request) {
try {
LoginResponse loginResponse = authService.login(request);
return Result.success(loginResponse);
} catch (Exception e) {
log.error("用户登录失败: error={}", e.getMessage(), e);
return Result.error("用户登录失败: " + e.getMessage());
}
}

/**
* 获取用户信息
*/
@GetMapping("/{userId}")
public Result<UserInfo> getUserInfo(@PathVariable String userId) {
try {
UserInfo userInfo = userQueryService.getUserInfo(userId);
return Result.success(userInfo);
} catch (Exception e) {
log.error("获取用户信息失败: userId={}, error={}",
userId, e.getMessage(), e);
return Result.error("获取用户信息失败: " + e.getMessage());
}
}

/**
* 更新用户信息
*/
@PutMapping("/{userId}")
public Result<Void> updateUserInfo(
@PathVariable String userId,
@RequestBody @Valid UserUpdateRequest request) {
try {
userService.updateUserInfo(userId, request);
return Result.success();
} catch (Exception e) {
log.error("更新用户信息失败: userId={}, error={}",
userId, e.getMessage(), e);
return Result.error("更新用户信息失败: " + e.getMessage());
}
}

/**
* 修改密码
*/
@PostMapping("/{userId}/password")
public Result<Void> changePassword(
@PathVariable String userId,
@RequestBody @Valid PasswordChangeRequest request) {
try {
userService.changePassword(userId, request);
return Result.success();
} catch (Exception e) {
log.error("修改密码失败: userId={}, error={}",
userId, e.getMessage(), e);
return Result.error("修改密码失败: " + e.getMessage());
}
}

/**
* 获取用户列表
*/
@GetMapping("/list")
public Result<PageResult<UserInfo>> getUserList(
@RequestParam(required = false) String keyword,
@RequestParam(required = false) String status,
@RequestParam(required = false) String startTime,
@RequestParam(required = false) String endTime,
@RequestParam(defaultValue = "1") Integer pageNum,
@RequestParam(defaultValue = "10") Integer pageSize) {
try {
PageResult<UserInfo> result = userQueryService.getUserList(
keyword, status, startTime, endTime, pageNum, pageSize);
return Result.success(result);
} catch (Exception e) {
log.error("获取用户列表失败: error={}", e.getMessage(), e);
return Result.error("获取用户列表失败: " + e.getMessage());
}
}

/**
* 获取用户统计
*/
@GetMapping("/statistics")
public Result<UserStatistics> getUserStatistics(
@RequestParam(required = false) String startTime,
@RequestParam(required = false) String endTime) {
try {
UserStatistics statistics = userQueryService.getUserStatistics(startTime, endTime);
return Result.success(statistics);
} catch (Exception e) {
log.error("获取用户统计失败: error={}", e.getMessage(), e);
return Result.error("获取用户统计失败: " + e.getMessage());
}
}
}

2.2 用户服务实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
/**
* 用户服务实现
* 负责用户注册、信息管理、密码修改
*/
@Service
@Slf4j
public class UserService {

@Autowired
private UserMapper userMapper;

@Autowired
private UserProfileMapper userProfileMapper;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@Autowired
private PointsServiceClient pointsServiceClient;

@Autowired
private NotificationServiceClient notificationServiceClient;

@Autowired
private DistributedLock distributedLock;

@Autowired
private PasswordEncoder passwordEncoder;

/**
* 用户注册
*/
@Transactional(rollbackFor = Exception.class)
public UserInfo register(UserRegisterRequest request) {
String lockKey = "user:register:" + request.getUsername();
String lockValue = UUID.randomUUID().toString();

try {
// 1. 获取分布式锁
boolean lockAcquired = distributedLock.tryLock(lockKey, lockValue, 10, TimeUnit.SECONDS);
if (!lockAcquired) {
throw new BusinessException("注册中,请稍候");
}

// 2. 验证用户名是否已存在
User existingUser = userMapper.selectOne(
new LambdaQueryWrapper<User>()
.eq(User::getUsername, request.getUsername()));
if (existingUser != null) {
throw new BusinessException("用户名已存在");
}

// 3. 验证手机号是否已存在
if (request.getPhone() != null && !request.getPhone().isEmpty()) {
User existingPhoneUser = userMapper.selectOne(
new LambdaQueryWrapper<User>()
.eq(User::getPhone, request.getPhone()));
if (existingPhoneUser != null) {
throw new BusinessException("手机号已存在");
}
}

// 4. 验证邮箱是否已存在
if (request.getEmail() != null && !request.getEmail().isEmpty()) {
User existingEmailUser = userMapper.selectOne(
new LambdaQueryWrapper<User>()
.eq(User::getEmail, request.getEmail()));
if (existingEmailUser != null) {
throw new BusinessException("邮箱已存在");
}
}

// 5. 生成用户ID
String userId = generateUserId();

// 6. 加密密码
String encodedPassword = passwordEncoder.encode(request.getPassword());

// 7. 创建用户
User user = new User();
user.setUserId(userId);
user.setUsername(request.getUsername());
user.setPassword(encodedPassword);
user.setPhone(request.getPhone());
user.setEmail(request.getEmail());
user.setStatus("ACTIVE");
user.setCreateTime(LocalDateTime.now());
user.setUpdateTime(LocalDateTime.now());

userMapper.insert(user);

// 8. 创建用户资料
UserProfile userProfile = new UserProfile();
userProfile.setUserId(userId);
userProfile.setNickname(request.getNickname());
userProfile.setAvatar(request.getAvatar());
userProfile.setGender(request.getGender());
userProfile.setBirthday(request.getBirthday());
userProfile.setCreateTime(LocalDateTime.now());
userProfile.setUpdateTime(LocalDateTime.now());

userProfileMapper.insert(userProfile);

// 9. 初始化用户积分
pointsServiceClient.initUserPoints(userId);

// 10. 构建用户信息
UserInfo userInfo = convertToUserInfo(user, userProfile);

// 11. 缓存用户信息
String cacheKey = "user:info:" + userId;
redisTemplate.opsForValue().set(cacheKey, userInfo, 1, TimeUnit.HOURS);

// 12. 发送用户注册事件
sendUserRegisterEvent(userInfo);

// 13. 发送注册通知
notificationServiceClient.sendRegisterNotification(userId, request.getPhone(), request.getEmail());

log.info("用户注册成功: userId={}, username={}", userId, request.getUsername());

return userInfo;

} catch (Exception e) {
log.error("用户注册失败: username={}, error={}",
request.getUsername(), e.getMessage(), e);
throw new BusinessException("用户注册失败: " + e.getMessage());
} finally {
// 释放分布式锁
distributedLock.releaseLock(lockKey, lockValue);
}
}

/**
* 更新用户信息
*/
@Transactional(rollbackFor = Exception.class)
public void updateUserInfo(String userId, UserUpdateRequest request) {
try {
// 1. 获取用户
User user = userMapper.selectOne(
new LambdaQueryWrapper<User>()
.eq(User::getUserId, userId));
if (user == null) {
throw new BusinessException("用户不存在");
}

// 2. 更新用户基本信息
if (request.getPhone() != null) {
// 验证手机号是否已被其他用户使用
User existingPhoneUser = userMapper.selectOne(
new LambdaQueryWrapper<User>()
.eq(User::getPhone, request.getPhone())
.ne(User::getUserId, userId));
if (existingPhoneUser != null) {
throw new BusinessException("手机号已被使用");
}
user.setPhone(request.getPhone());
}

if (request.getEmail() != null) {
// 验证邮箱是否已被其他用户使用
User existingEmailUser = userMapper.selectOne(
new LambdaQueryWrapper<User>()
.eq(User::getEmail, request.getEmail())
.ne(User::getUserId, userId));
if (existingEmailUser != null) {
throw new BusinessException("邮箱已被使用");
}
user.setEmail(request.getEmail());
}

user.setUpdateTime(LocalDateTime.now());
userMapper.updateById(user);

// 3. 更新用户资料
UserProfile userProfile = userProfileMapper.selectOne(
new LambdaQueryWrapper<UserProfile>()
.eq(UserProfile::getUserId, userId));
if (userProfile == null) {
userProfile = new UserProfile();
userProfile.setUserId(userId);
userProfile.setCreateTime(LocalDateTime.now());
}

if (request.getNickname() != null) {
userProfile.setNickname(request.getNickname());
}
if (request.getAvatar() != null) {
userProfile.setAvatar(request.getAvatar());
}
if (request.getGender() != null) {
userProfile.setGender(request.getGender());
}
if (request.getBirthday() != null) {
userProfile.setBirthday(request.getBirthday());
}
userProfile.setUpdateTime(LocalDateTime.now());

if (userProfile.getId() == null) {
userProfileMapper.insert(userProfile);
} else {
userProfileMapper.updateById(userProfile);
}

// 4. 清除缓存
String cacheKey = "user:info:" + userId;
redisTemplate.delete(cacheKey);

// 5. 发送用户信息变更事件
sendUserInfoChangeEvent(userId);

log.info("更新用户信息成功: userId={}", userId);

} catch (Exception e) {
log.error("更新用户信息失败: userId={}, error={}",
userId, e.getMessage(), e);
throw new BusinessException("更新用户信息失败: " + e.getMessage());
}
}

/**
* 修改密码
*/
@Transactional(rollbackFor = Exception.class)
public void changePassword(String userId, PasswordChangeRequest request) {
try {
// 1. 获取用户
User user = userMapper.selectOne(
new LambdaQueryWrapper<User>()
.eq(User::getUserId, userId));
if (user == null) {
throw new BusinessException("用户不存在");
}

// 2. 验证旧密码
if (!passwordEncoder.matches(request.getOldPassword(), user.getPassword())) {
throw new BusinessException("旧密码错误");
}

// 3. 加密新密码
String encodedPassword = passwordEncoder.encode(request.getNewPassword());

// 4. 更新密码
user.setPassword(encodedPassword);
user.setUpdateTime(LocalDateTime.now());
userMapper.updateById(user);

// 5. 清除用户会话
String sessionKey = "user:session:" + userId;
redisTemplate.delete(sessionKey);

// 6. 发送密码修改事件
sendPasswordChangeEvent(userId);

// 7. 发送密码修改通知
notificationServiceClient.sendPasswordChangeNotification(userId, user.getPhone(), user.getEmail());

log.info("修改密码成功: userId={}", userId);

} catch (Exception e) {
log.error("修改密码失败: userId={}, error={}",
userId, e.getMessage(), e);
throw new BusinessException("修改密码失败: " + e.getMessage());
}
}

/**
* 生成用户ID
*/
private String generateUserId() {
return "U" + System.currentTimeMillis() +
String.format("%06d", new Random().nextInt(1000000));
}

/**
* 转换为用户信息
*/
private UserInfo convertToUserInfo(User user, UserProfile userProfile) {
UserInfo userInfo = new UserInfo();
userInfo.setUserId(user.getUserId());
userInfo.setUsername(user.getUsername());
userInfo.setPhone(user.getPhone());
userInfo.setEmail(user.getEmail());
userInfo.setStatus(user.getStatus());
userInfo.setCreateTime(user.getCreateTime());

if (userProfile != null) {
userInfo.setNickname(userProfile.getNickname());
userInfo.setAvatar(userProfile.getAvatar());
userInfo.setGender(userProfile.getGender());
userInfo.setBirthday(userProfile.getBirthday());
}

return userInfo;
}

/**
* 发送用户注册事件
*/
private void sendUserRegisterEvent(UserInfo userInfo) {
try {
UserEvent event = new UserEvent();
event.setEventType("USER_REGISTER");
event.setUserId(userInfo.getUserId());
event.setUserInfo(userInfo);
event.setEventTime(LocalDateTime.now());

String message = JSON.toJSONString(event);
kafkaTemplate.send("user.register", userInfo.getUserId(), message);

} catch (Exception e) {
log.error("发送用户注册事件失败: error={}", e.getMessage(), e);
}
}

/**
* 发送用户信息变更事件
*/
private void sendUserInfoChangeEvent(String userId) {
try {
UserInfoChangeEvent event = new UserInfoChangeEvent();
event.setEventType("USER_INFO_CHANGE");
event.setUserId(userId);
event.setEventTime(LocalDateTime.now());

String message = JSON.toJSONString(event);
kafkaTemplate.send("user.info.change", userId, message);

} catch (Exception e) {
log.error("发送用户信息变更事件失败: error={}", e.getMessage(), e);
}
}

/**
* 发送密码修改事件
*/
private void sendPasswordChangeEvent(String userId) {
try {
PasswordChangeEvent event = new PasswordChangeEvent();
event.setEventType("PASSWORD_CHANGE");
event.setUserId(userId);
event.setEventTime(LocalDateTime.now());

String message = JSON.toJSONString(event);
kafkaTemplate.send("user.password.change", userId, message);

} catch (Exception e) {
log.error("发送密码修改事件失败: error={}", e.getMessage(), e);
}
}
}

2.3 用户查询服务实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
/**
* 用户查询服务实现
* 负责用户查询、统计
*/
@Service
@Slf4j
public class UserQueryService {

@Autowired
private UserMapper userMapper;

@Autowired
private UserProfileMapper userProfileMapper;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

/**
* 获取用户信息
*/
public UserInfo getUserInfo(String userId) {
try {
// 1. 从缓存获取
String cacheKey = "user:info:" + userId;
UserInfo cachedInfo = (UserInfo) redisTemplate.opsForValue().get(cacheKey);
if (cachedInfo != null) {
return cachedInfo;
}

// 2. 从数据库查询
User user = userMapper.selectOne(
new LambdaQueryWrapper<User>()
.eq(User::getUserId, userId));
if (user == null) {
throw new BusinessException("用户不存在");
}

// 3. 查询用户资料
UserProfile userProfile = userProfileMapper.selectOne(
new LambdaQueryWrapper<UserProfile>()
.eq(UserProfile::getUserId, userId));

// 4. 构建用户信息
UserInfo userInfo = convertToUserInfo(user, userProfile);

// 5. 缓存用户信息
redisTemplate.opsForValue().set(cacheKey, userInfo, 1, TimeUnit.HOURS);

return userInfo;

} catch (Exception e) {
log.error("获取用户信息失败: userId={}, error={}",
userId, e.getMessage(), e);
throw new BusinessException("获取用户信息失败: " + e.getMessage());
}
}

/**
* 获取用户列表
*/
public PageResult<UserInfo> getUserList(String keyword, String status,
String startTime, String endTime,
Integer pageNum, Integer pageSize) {
try {
// 1. 构建查询条件
LambdaQueryWrapper<User> wrapper = new LambdaQueryWrapper<>();
if (keyword != null && !keyword.isEmpty()) {
wrapper.and(w -> w.like(User::getUsername, keyword)
.or().like(User::getPhone, keyword)
.or().like(User::getEmail, keyword));
}
if (status != null && !status.isEmpty()) {
wrapper.eq(User::getStatus, status);
}
if (startTime != null && !startTime.isEmpty()) {
wrapper.ge(User::getCreateTime, LocalDateTime.parse(startTime));
}
if (endTime != null && !endTime.isEmpty()) {
wrapper.le(User::getCreateTime, LocalDateTime.parse(endTime));
}
wrapper.orderByDesc(User::getCreateTime);

// 2. 分页查询
Page<User> page = new Page<>(pageNum, pageSize);
Page<User> result = userMapper.selectPage(page, wrapper);

// 3. 转换为用户信息列表
List<UserInfo> userInfoList = result.getRecords().stream()
.map(user -> {
UserProfile userProfile = userProfileMapper.selectOne(
new LambdaQueryWrapper<UserProfile>()
.eq(UserProfile::getUserId, user.getUserId()));
return convertToUserInfo(user, userProfile);
})
.collect(Collectors.toList());

// 4. 构建分页结果
PageResult<UserInfo> pageResult = new PageResult<>();
pageResult.setList(userInfoList);
pageResult.setTotal(result.getTotal());
pageResult.setPageNum(pageNum);
pageResult.setPageSize(pageSize);

return pageResult;

} catch (Exception e) {
log.error("获取用户列表失败: error={}", e.getMessage(), e);
throw new BusinessException("获取用户列表失败: " + e.getMessage());
}
}

/**
* 获取用户统计
*/
public UserStatistics getUserStatistics(String startTime, String endTime) {
try {
// 1. 构建查询条件
LambdaQueryWrapper<User> wrapper = new LambdaQueryWrapper<>();
if (startTime != null && !startTime.isEmpty()) {
wrapper.ge(User::getCreateTime, LocalDateTime.parse(startTime));
}
if (endTime != null && !endTime.isEmpty()) {
wrapper.le(User::getCreateTime, LocalDateTime.parse(endTime));
}

// 2. 查询用户列表
List<User> users = userMapper.selectList(wrapper);

// 3. 统计用户数据
UserStatistics statistics = new UserStatistics();
statistics.setTotalCount(users.size());

Map<String, Long> statusCount = users.stream()
.collect(Collectors.groupingBy(User::getStatus, Collectors.counting()));
statistics.setStatusCount(statusCount);

// 4. 按日期统计注册用户数
Map<String, Long> dailyRegisterCount = users.stream()
.collect(Collectors.groupingBy(
user -> user.getCreateTime().toLocalDate().toString(),
Collectors.counting()));
statistics.setDailyRegisterCount(dailyRegisterCount);

return statistics;

} catch (Exception e) {
log.error("获取用户统计失败: error={}", e.getMessage(), e);
throw new BusinessException("获取用户统计失败: " + e.getMessage());
}
}

/**
* 转换为用户信息
*/
private UserInfo convertToUserInfo(User user, UserProfile userProfile) {
UserInfo userInfo = new UserInfo();
userInfo.setUserId(user.getUserId());
userInfo.setUsername(user.getUsername());
userInfo.setPhone(user.getPhone());
userInfo.setEmail(user.getEmail());
userInfo.setStatus(user.getStatus());
userInfo.setCreateTime(user.getCreateTime());
userInfo.setUpdateTime(user.getUpdateTime());

if (userProfile != null) {
userInfo.setNickname(userProfile.getNickname());
userInfo.setAvatar(userProfile.getAvatar());
userInfo.setGender(userProfile.getGender());
userInfo.setBirthday(userProfile.getBirthday());
}

return userInfo;
}
}

3. 认证服务实现

3.1 认证服务控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/**
* 认证服务控制器
* 提供认证管理接口
*/
@RestController
@RequestMapping("/api/auth")
@Slf4j
public class AuthController {

@Autowired
private AuthService authService;

/**
* 用户登录
*/
@PostMapping("/login")
public Result<LoginResponse> login(@RequestBody @Valid UserLoginRequest request) {
try {
LoginResponse loginResponse = authService.login(request);
return Result.success(loginResponse);
} catch (Exception e) {
log.error("用户登录失败: error={}", e.getMessage(), e);
return Result.error("用户登录失败: " + e.getMessage());
}
}

/**
* 用户登出
*/
@PostMapping("/logout")
public Result<Void> logout(@RequestHeader("Authorization") String token) {
try {
authService.logout(token);
return Result.success();
} catch (Exception e) {
log.error("用户登出失败: error={}", e.getMessage(), e);
return Result.error("用户登出失败: " + e.getMessage());
}
}

/**
* 刷新Token
*/
@PostMapping("/refresh")
public Result<LoginResponse> refreshToken(@RequestBody @Valid RefreshTokenRequest request) {
try {
LoginResponse loginResponse = authService.refreshToken(request.getRefreshToken());
return Result.success(loginResponse);
} catch (Exception e) {
log.error("刷新Token失败: error={}", e.getMessage(), e);
return Result.error("刷新Token失败: " + e.getMessage());
}
}

/**
* 验证Token
*/
@GetMapping("/verify")
public Result<UserInfo> verifyToken(@RequestHeader("Authorization") String token) {
try {
UserInfo userInfo = authService.verifyToken(token);
return Result.success(userInfo);
} catch (Exception e) {
log.error("验证Token失败: error={}", e.getMessage(), e);
return Result.error("验证Token失败: " + e.getMessage());
}
}
}

3.2 认证服务实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
/**
* 认证服务实现
* 负责用户认证、Token生成、验证
*/
@Service
@Slf4j
public class AuthService {

@Autowired
private UserMapper userMapper;

@Autowired
private UserQueryService userQueryService;

@Autowired
private LoginRecordMapper loginRecordMapper;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@Autowired
private PasswordEncoder passwordEncoder;

@Autowired
private JwtTokenUtil jwtTokenUtil;

@Value("${jwt.token.expiration:3600}")
private Long tokenExpiration;

@Value("${jwt.refresh.token.expiration:86400}")
private Long refreshTokenExpiration;

/**
* 用户登录
*/
@Transactional(rollbackFor = Exception.class)
public LoginResponse login(UserLoginRequest request) {
try {
// 1. 查询用户
User user = userMapper.selectOne(
new LambdaQueryWrapper<User>()
.eq(User::getUsername, request.getUsername())
.or()
.eq(User::getPhone, request.getUsername())
.or()
.eq(User::getEmail, request.getUsername()));
if (user == null) {
throw new BusinessException("用户名或密码错误");
}

// 2. 验证用户状态
if (!"ACTIVE".equals(user.getStatus())) {
throw new BusinessException("用户已被禁用");
}

// 3. 验证密码
if (!passwordEncoder.matches(request.getPassword(), user.getPassword())) {
// 记录登录失败
recordLoginFailure(user.getUserId(), request.getIp(), "密码错误");
throw new BusinessException("用户名或密码错误");
}

// 4. 生成Token
String accessToken = jwtTokenUtil.generateToken(user.getUserId(), user.getUsername(), tokenExpiration);
String refreshToken = jwtTokenUtil.generateToken(user.getUserId(), user.getUsername(), refreshTokenExpiration);

// 5. 缓存Token
String tokenKey = "user:token:" + user.getUserId();
redisTemplate.opsForValue().set(tokenKey, accessToken, tokenExpiration, TimeUnit.SECONDS);

String refreshTokenKey = "user:refresh:token:" + user.getUserId();
redisTemplate.opsForValue().set(refreshTokenKey, refreshToken, refreshTokenExpiration, TimeUnit.SECONDS);

// 6. 缓存用户会话
String sessionKey = "user:session:" + user.getUserId();
UserSession session = new UserSession();
session.setUserId(user.getUserId());
session.setUsername(user.getUsername());
session.setAccessToken(accessToken);
session.setRefreshToken(refreshToken);
session.setLoginTime(LocalDateTime.now());
session.setLoginIp(request.getIp());
redisTemplate.opsForValue().set(sessionKey, session, tokenExpiration, TimeUnit.SECONDS);

// 7. 记录登录成功
recordLoginSuccess(user.getUserId(), request.getIp());

// 8. 获取用户信息
UserInfo userInfo = userQueryService.getUserInfo(user.getUserId());

// 9. 构建登录响应
LoginResponse loginResponse = new LoginResponse();
loginResponse.setAccessToken(accessToken);
loginResponse.setRefreshToken(refreshToken);
loginResponse.setTokenType("Bearer");
loginResponse.setExpiresIn(tokenExpiration);
loginResponse.setUserInfo(userInfo);

// 10. 发送登录事件
sendLoginEvent(user.getUserId(), request.getIp());

log.info("用户登录成功: userId={}, username={}", user.getUserId(), user.getUsername());

return loginResponse;

} catch (Exception e) {
log.error("用户登录失败: username={}, error={}",
request.getUsername(), e.getMessage(), e);
throw new BusinessException("用户登录失败: " + e.getMessage());
}
}

/**
* 用户登出
*/
public void logout(String token) {
try {
// 1. 解析Token
String userId = jwtTokenUtil.getUserIdFromToken(token);
if (userId == null) {
throw new BusinessException("Token无效");
}

// 2. 清除Token缓存
String tokenKey = "user:token:" + userId;
redisTemplate.delete(tokenKey);

String refreshTokenKey = "user:refresh:token:" + userId;
redisTemplate.delete(refreshTokenKey);

// 3. 清除用户会话
String sessionKey = "user:session:" + userId;
redisTemplate.delete(sessionKey);

// 4. 发送登出事件
sendLogoutEvent(userId);

log.info("用户登出成功: userId={}", userId);

} catch (Exception e) {
log.error("用户登出失败: error={}", e.getMessage(), e);
throw new BusinessException("用户登出失败: " + e.getMessage());
}
}

/**
* 刷新Token
*/
public LoginResponse refreshToken(String refreshToken) {
try {
// 1. 验证Refresh Token
if (!jwtTokenUtil.validateToken(refreshToken)) {
throw new BusinessException("Refresh Token无效");
}

// 2. 获取用户ID
String userId = jwtTokenUtil.getUserIdFromToken(refreshToken);
if (userId == null) {
throw new BusinessException("Refresh Token无效");
}

// 3. 验证Refresh Token是否在缓存中
String refreshTokenKey = "user:refresh:token:" + userId;
String cachedRefreshToken = (String) redisTemplate.opsForValue().get(refreshTokenKey);
if (!refreshToken.equals(cachedRefreshToken)) {
throw new BusinessException("Refresh Token无效");
}

// 4. 获取用户信息
User user = userMapper.selectOne(
new LambdaQueryWrapper<User>()
.eq(User::getUserId, userId));
if (user == null) {
throw new BusinessException("用户不存在");
}

// 5. 生成新的Token
String newAccessToken = jwtTokenUtil.generateToken(user.getUserId(), user.getUsername(), tokenExpiration);
String newRefreshToken = jwtTokenUtil.generateToken(user.getUserId(), user.getUsername(), refreshTokenExpiration);

// 6. 更新Token缓存
String tokenKey = "user:token:" + userId;
redisTemplate.opsForValue().set(tokenKey, newAccessToken, tokenExpiration, TimeUnit.SECONDS);
redisTemplate.opsForValue().set(refreshTokenKey, newRefreshToken, refreshTokenExpiration, TimeUnit.SECONDS);

// 7. 获取用户信息
UserInfo userInfo = userQueryService.getUserInfo(userId);

// 8. 构建登录响应
LoginResponse loginResponse = new LoginResponse();
loginResponse.setAccessToken(newAccessToken);
loginResponse.setRefreshToken(newRefreshToken);
loginResponse.setTokenType("Bearer");
loginResponse.setExpiresIn(tokenExpiration);
loginResponse.setUserInfo(userInfo);

log.info("刷新Token成功: userId={}", userId);

return loginResponse;

} catch (Exception e) {
log.error("刷新Token失败: error={}", e.getMessage(), e);
throw new BusinessException("刷新Token失败: " + e.getMessage());
}
}

/**
* 验证Token
*/
public UserInfo verifyToken(String token) {
try {
// 1. 验证Token
if (!jwtTokenUtil.validateToken(token)) {
throw new BusinessException("Token无效");
}

// 2. 获取用户ID
String userId = jwtTokenUtil.getUserIdFromToken(token);
if (userId == null) {
throw new BusinessException("Token无效");
}

// 3. 验证Token是否在缓存中
String tokenKey = "user:token:" + userId;
String cachedToken = (String) redisTemplate.opsForValue().get(tokenKey);
if (!token.equals(cachedToken)) {
throw new BusinessException("Token已失效");
}

// 4. 获取用户信息
UserInfo userInfo = userQueryService.getUserInfo(userId);

return userInfo;

} catch (Exception e) {
log.error("验证Token失败: error={}", e.getMessage(), e);
throw new BusinessException("验证Token失败: " + e.getMessage());
}
}

/**
* 记录登录成功
*/
private void recordLoginSuccess(String userId, String ip) {
try {
LoginRecord record = new LoginRecord();
record.setUserId(userId);
record.setLoginIp(ip);
record.setLoginTime(LocalDateTime.now());
record.setStatus("SUCCESS");
record.setCreateTime(LocalDateTime.now());

loginRecordMapper.insert(record);

} catch (Exception e) {
log.error("记录登录成功失败: error={}", e.getMessage(), e);
}
}

/**
* 记录登录失败
*/
private void recordLoginFailure(String userId, String ip, String reason) {
try {
LoginRecord record = new LoginRecord();
record.setUserId(userId);
record.setLoginIp(ip);
record.setLoginTime(LocalDateTime.now());
record.setStatus("FAILED");
record.setFailureReason(reason);
record.setCreateTime(LocalDateTime.now());

loginRecordMapper.insert(record);

} catch (Exception e) {
log.error("记录登录失败失败: error={}", e.getMessage(), e);
}
}

/**
* 发送登录事件
*/
private void sendLoginEvent(String userId, String ip) {
try {
LoginEvent event = new LoginEvent();
event.setEventType("USER_LOGIN");
event.setUserId(userId);
event.setLoginIp(ip);
event.setEventTime(LocalDateTime.now());

String message = JSON.toJSONString(event);
kafkaTemplate.send("user.login", userId, message);

} catch (Exception e) {
log.error("发送登录事件失败: error={}", e.getMessage(), e);
}
}

/**
* 发送登出事件
*/
private void sendLogoutEvent(String userId) {
try {
LogoutEvent event = new LogoutEvent();
event.setEventType("USER_LOGOUT");
event.setUserId(userId);
event.setEventTime(LocalDateTime.now());

String message = JSON.toJSONString(event);
kafkaTemplate.send("user.logout", userId, message);

} catch (Exception e) {
log.error("发送登出事件失败: error={}", e.getMessage(), e);
}
}
}

4. 积分服务实现

4.1 积分服务控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
/**
* 积分服务控制器
* 提供积分管理接口
*/
@RestController
@RequestMapping("/api/points")
@Slf4j
public class PointsController {

@Autowired
private PointsService pointsService;

@Autowired
private PointsQueryService pointsQueryService;

/**
* 获取用户积分
*/
@GetMapping("/{userId}")
public Result<PointsInfo> getUserPoints(@PathVariable String userId) {
try {
PointsInfo pointsInfo = pointsQueryService.getUserPoints(userId);
return Result.success(pointsInfo);
} catch (Exception e) {
log.error("获取用户积分失败: userId={}, error={}",
userId, e.getMessage(), e);
return Result.error("获取用户积分失败: " + e.getMessage());
}
}

/**
* 增加积分
*/
@PostMapping("/add")
public Result<Void> addPoints(@RequestBody @Valid PointsAddRequest request) {
try {
pointsService.addPoints(request);
return Result.success();
} catch (Exception e) {
log.error("增加积分失败: error={}", e.getMessage(), e);
return Result.error("增加积分失败: " + e.getMessage());
}
}

/**
* 扣减积分
*/
@PostMapping("/deduct")
public Result<Void> deductPoints(@RequestBody @Valid PointsDeductRequest request) {
try {
pointsService.deductPoints(request);
return Result.success();
} catch (Exception e) {
log.error("扣减积分失败: error={}", e.getMessage(), e);
return Result.error("扣减积分失败: " + e.getMessage());
}
}

/**
* 获取积分记录
*/
@GetMapping("/{userId}/records")
public Result<PageResult<PointsRecord>> getPointsRecords(
@PathVariable String userId,
@RequestParam(required = false) String changeType,
@RequestParam(defaultValue = "1") Integer pageNum,
@RequestParam(defaultValue = "10") Integer pageSize) {
try {
PageResult<PointsRecord> result = pointsQueryService.getPointsRecords(
userId, changeType, pageNum, pageSize);
return Result.success(result);
} catch (Exception e) {
log.error("获取积分记录失败: userId={}, error={}",
userId, e.getMessage(), e);
return Result.error("获取积分记录失败: " + e.getMessage());
}
}
}

4.2 积分服务实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
/**
* 积分服务实现
* 负责积分增加、扣减、查询
*/
@Service
@Slf4j
public class PointsService {

@Autowired
private UserPointsMapper userPointsMapper;

@Autowired
private PointsRecordMapper pointsRecordMapper;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@Autowired
private DistributedLock distributedLock;

/**
* 初始化用户积分
*/
@Transactional(rollbackFor = Exception.class)
public void initUserPoints(String userId) {
try {
// 检查是否已初始化
UserPoints existingPoints = userPointsMapper.selectOne(
new LambdaQueryWrapper<UserPoints>()
.eq(UserPoints::getUserId, userId));
if (existingPoints != null) {
return;
}

// 创建用户积分记录
UserPoints userPoints = new UserPoints();
userPoints.setUserId(userId);
userPoints.setTotalPoints(0);
userPoints.setAvailablePoints(0);
userPoints.setUsedPoints(0);
userPoints.setCreateTime(LocalDateTime.now());
userPoints.setUpdateTime(LocalDateTime.now());

userPointsMapper.insert(userPoints);

log.info("初始化用户积分成功: userId={}", userId);

} catch (Exception e) {
log.error("初始化用户积分失败: userId={}, error={}",
userId, e.getMessage(), e);
throw new BusinessException("初始化用户积分失败: " + e.getMessage());
}
}

/**
* 增加积分
*/
@Transactional(rollbackFor = Exception.class)
public void addPoints(PointsAddRequest request) {
String lockKey = "points:add:" + request.getUserId();
String lockValue = UUID.randomUUID().toString();

try {
// 1. 获取分布式锁
boolean lockAcquired = distributedLock.tryLock(lockKey, lockValue, 10, TimeUnit.SECONDS);
if (!lockAcquired) {
throw new BusinessException("积分增加中,请稍候");
}

// 2. 查询用户积分
UserPoints userPoints = userPointsMapper.selectOne(
new LambdaQueryWrapper<UserPoints>()
.eq(UserPoints::getUserId, request.getUserId()));
if (userPoints == null) {
initUserPoints(request.getUserId());
userPoints = userPointsMapper.selectOne(
new LambdaQueryWrapper<UserPoints>()
.eq(UserPoints::getUserId, request.getUserId()));
}

// 3. 增加积分
userPoints.setTotalPoints(userPoints.getTotalPoints() + request.getPoints());
userPoints.setAvailablePoints(userPoints.getAvailablePoints() + request.getPoints());
userPoints.setUpdateTime(LocalDateTime.now());

userPointsMapper.updateById(userPoints);

// 4. 记录积分变更
PointsRecord record = new PointsRecord();
record.setUserId(request.getUserId());
record.setChangeType("ADD");
record.setChangePoints(request.getPoints());
record.setBeforePoints(userPoints.getAvailablePoints() - request.getPoints());
record.setAfterPoints(userPoints.getAvailablePoints());
record.setSourceType(request.getSourceType());
record.setSourceId(request.getSourceId());
record.setRemark(request.getRemark());
record.setCreateTime(LocalDateTime.now());

pointsRecordMapper.insert(record);

// 5. 清除缓存
String cacheKey = "points:info:" + request.getUserId();
redisTemplate.delete(cacheKey);

// 6. 发送积分增加事件
sendPointsAddEvent(request.getUserId(), request.getPoints(), request.getSourceType());

log.info("增加积分成功: userId={}, points={}, sourceType={}",
request.getUserId(), request.getPoints(), request.getSourceType());

} catch (Exception e) {
log.error("增加积分失败: userId={}, error={}",
request.getUserId(), e.getMessage(), e);
throw new BusinessException("增加积分失败: " + e.getMessage());
} finally {
// 释放分布式锁
distributedLock.releaseLock(lockKey, lockValue);
}
}

/**
* 扣减积分
*/
@Transactional(rollbackFor = Exception.class)
public void deductPoints(PointsDeductRequest request) {
String lockKey = "points:deduct:" + request.getUserId();
String lockValue = UUID.randomUUID().toString();

try {
// 1. 获取分布式锁
boolean lockAcquired = distributedLock.tryLock(lockKey, lockValue, 10, TimeUnit.SECONDS);
if (!lockAcquired) {
throw new BusinessException("积分扣减中,请稍候");
}

// 2. 查询用户积分
UserPoints userPoints = userPointsMapper.selectOne(
new LambdaQueryWrapper<UserPoints>()
.eq(UserPoints::getUserId, request.getUserId()));
if (userPoints == null) {
throw new BusinessException("用户积分不存在");
}

// 3. 验证积分是否足够
if (userPoints.getAvailablePoints() < request.getPoints()) {
throw new BusinessException("积分不足: availablePoints=" + userPoints.getAvailablePoints() + ", points=" + request.getPoints());
}

// 4. 扣减积分
userPoints.setAvailablePoints(userPoints.getAvailablePoints() - request.getPoints());
userPoints.setUsedPoints(userPoints.getUsedPoints() + request.getPoints());
userPoints.setUpdateTime(LocalDateTime.now());

userPointsMapper.updateById(userPoints);

// 5. 记录积分变更
PointsRecord record = new PointsRecord();
record.setUserId(request.getUserId());
record.setChangeType("DEDUCT");
record.setChangePoints(request.getPoints());
record.setBeforePoints(userPoints.getAvailablePoints() + request.getPoints());
record.setAfterPoints(userPoints.getAvailablePoints());
record.setSourceType(request.getSourceType());
record.setSourceId(request.getSourceId());
record.setRemark(request.getRemark());
record.setCreateTime(LocalDateTime.now());

pointsRecordMapper.insert(record);

// 6. 清除缓存
String cacheKey = "points:info:" + request.getUserId();
redisTemplate.delete(cacheKey);

// 7. 发送积分扣减事件
sendPointsDeductEvent(request.getUserId(), request.getPoints(), request.getSourceType());

log.info("扣减积分成功: userId={}, points={}, sourceType={}",
request.getUserId(), request.getPoints(), request.getSourceType());

} catch (Exception e) {
log.error("扣减积分失败: userId={}, error={}",
request.getUserId(), e.getMessage(), e);
throw new BusinessException("扣减积分失败: " + e.getMessage());
} finally {
// 释放分布式锁
distributedLock.releaseLock(lockKey, lockValue);
}
}

/**
* 发送积分增加事件
*/
private void sendPointsAddEvent(String userId, Integer points, String sourceType) {
try {
PointsEvent event = new PointsEvent();
event.setEventType("POINTS_ADD");
event.setUserId(userId);
event.setPoints(points);
event.setSourceType(sourceType);
event.setEventTime(LocalDateTime.now());

String message = JSON.toJSONString(event);
kafkaTemplate.send("points.add", userId, message);

} catch (Exception e) {
log.error("发送积分增加事件失败: error={}", e.getMessage(), e);
}
}

/**
* 发送积分扣减事件
*/
private void sendPointsDeductEvent(String userId, Integer points, String sourceType) {
try {
PointsEvent event = new PointsEvent();
event.setEventType("POINTS_DEDUCT");
event.setUserId(userId);
event.setPoints(points);
event.setSourceType(sourceType);
event.setEventTime(LocalDateTime.now());

String message = JSON.toJSONString(event);
kafkaTemplate.send("points.deduct", userId, message);

} catch (Exception e) {
log.error("发送积分扣减事件失败: error={}", e.getMessage(), e);
}
}
}

4.3 积分查询服务实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
/**
* 积分查询服务实现
* 负责积分查询
*/
@Service
@Slf4j
public class PointsQueryService {

@Autowired
private UserPointsMapper userPointsMapper;

@Autowired
private PointsRecordMapper pointsRecordMapper;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

/**
* 获取用户积分
*/
public PointsInfo getUserPoints(String userId) {
try {
// 1. 从缓存获取
String cacheKey = "points:info:" + userId;
PointsInfo cachedInfo = (PointsInfo) redisTemplate.opsForValue().get(cacheKey);
if (cachedInfo != null) {
return cachedInfo;
}

// 2. 从数据库查询
UserPoints userPoints = userPointsMapper.selectOne(
new LambdaQueryWrapper<UserPoints>()
.eq(UserPoints::getUserId, userId));
if (userPoints == null) {
throw new BusinessException("用户积分不存在");
}

// 3. 构建积分信息
PointsInfo pointsInfo = new PointsInfo();
pointsInfo.setUserId(userPoints.getUserId());
pointsInfo.setTotalPoints(userPoints.getTotalPoints());
pointsInfo.setAvailablePoints(userPoints.getAvailablePoints());
pointsInfo.setUsedPoints(userPoints.getUsedPoints());

// 4. 缓存积分信息
redisTemplate.opsForValue().set(cacheKey, pointsInfo, 1, TimeUnit.HOURS);

return pointsInfo;

} catch (Exception e) {
log.error("获取用户积分失败: userId={}, error={}",
userId, e.getMessage(), e);
throw new BusinessException("获取用户积分失败: " + e.getMessage());
}
}

/**
* 获取积分记录
*/
public PageResult<PointsRecord> getPointsRecords(String userId, String changeType,
Integer pageNum, Integer pageSize) {
try {
// 1. 构建查询条件
LambdaQueryWrapper<PointsRecord> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(PointsRecord::getUserId, userId);
if (changeType != null && !changeType.isEmpty()) {
wrapper.eq(PointsRecord::getChangeType, changeType);
}
wrapper.orderByDesc(PointsRecord::getCreateTime);

// 2. 分页查询
Page<PointsRecord> page = new Page<>(pageNum, pageSize);
Page<PointsRecord> result = pointsRecordMapper.selectPage(page, wrapper);

// 3. 构建分页结果
PageResult<PointsRecord> pageResult = new PageResult<>();
pageResult.setList(result.getRecords());
pageResult.setTotal(result.getTotal());
pageResult.setPageNum(pageNum);
pageResult.setPageSize(pageSize);

return pageResult;

} catch (Exception e) {
log.error("获取积分记录失败: userId={}, error={}",
userId, e.getMessage(), e);
throw new BusinessException("获取积分记录失败: " + e.getMessage());
}
}
}

5. Kafka消费者实现

5.1 用户事件消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
/**
* 用户事件消费者
* 消费Kafka用户事件消息
*/
@Component
@Slf4j
public class UserEventConsumer {

@Autowired
private UserService userService;

@Autowired
private PointsServiceClient pointsServiceClient;

/**
* 消费用户注册事件
*/
@KafkaListener(topics = "user.register", groupId = "user-event-group")
public void consumeUserRegister(String message) {
try {
UserEvent event = JSON.parseObject(message, UserEvent.class);

// 处理用户注册事件,如发送欢迎消息、赠送积分等
log.info("消费用户注册事件: userId={}", event.getUserId());

// 注册赠送积分
PointsAddRequest pointsRequest = new PointsAddRequest();
pointsRequest.setUserId(event.getUserId());
pointsRequest.setPoints(100);
pointsRequest.setSourceType("REGISTER");
pointsRequest.setSourceId(event.getUserId());
pointsRequest.setRemark("注册赠送积分");
pointsServiceClient.addPoints(pointsRequest);

} catch (Exception e) {
log.error("消费用户注册事件失败: error={}", e.getMessage(), e);
}
}

/**
* 消费用户登录事件
*/
@KafkaListener(topics = "user.login", groupId = "user-event-group")
public void consumeUserLogin(String message) {
try {
LoginEvent event = JSON.parseObject(message, LoginEvent.class);

// 处理用户登录事件,如更新最后登录时间、发送通知等
log.info("消费用户登录事件: userId={}, loginIp={}",
event.getUserId(), event.getLoginIp());

} catch (Exception e) {
log.error("消费用户登录事件失败: error={}", e.getMessage(), e);
}
}

/**
* 消费用户信息变更事件
*/
@KafkaListener(topics = "user.info.change", groupId = "user-event-group")
public void consumeUserInfoChange(String message) {
try {
UserInfoChangeEvent event = JSON.parseObject(message, UserInfoChangeEvent.class);

// 处理用户信息变更事件,如更新缓存、发送通知等
log.info("消费用户信息变更事件: userId={}", event.getUserId());

} catch (Exception e) {
log.error("消费用户信息变更事件失败: error={}", e.getMessage(), e);
}
}
}

5.2 积分事件消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* 积分事件消费者
* 消费Kafka积分事件消息
*/
@Component
@Slf4j
public class PointsEventConsumer {

@Autowired
private PointsService pointsService;

/**
* 消费积分增加事件
*/
@KafkaListener(topics = "points.add", groupId = "points-event-group")
public void consumePointsAdd(String message) {
try {
PointsEvent event = JSON.parseObject(message, PointsEvent.class);

// 处理积分增加事件,如发送通知、更新统计等
log.info("消费积分增加事件: userId={}, points={}, sourceType={}",
event.getUserId(), event.getPoints(), event.getSourceType());

} catch (Exception e) {
log.error("消费积分增加事件失败: error={}", e.getMessage(), e);
}
}

/**
* 消费积分扣减事件
*/
@KafkaListener(topics = "points.deduct", groupId = "points-event-group")
public void consumePointsDeduct(String message) {
try {
PointsEvent event = JSON.parseObject(message, PointsEvent.class);

// 处理积分扣减事件,如发送通知、更新统计等
log.info("消费积分扣减事件: userId={}, points={}, sourceType={}",
event.getUserId(), event.getPoints(), event.getSourceType());

} catch (Exception e) {
log.error("消费积分扣减事件失败: error={}", e.getMessage(), e);
}
}
}

6. 数据模型定义

6.1 用户实体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 用户实体
*/
@Data
@TableName("t_user")
public class User {

@TableId(type = IdType.AUTO)
private Long id;

private String userId;

private String username;

private String password;

private String phone;

private String email;

private String status;

private LocalDateTime createTime;

private LocalDateTime updateTime;
}

6.2 用户资料实体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 用户资料实体
*/
@Data
@TableName("t_user_profile")
public class UserProfile {

@TableId(type = IdType.AUTO)
private Long id;

private String userId;

private String nickname;

private String avatar;

private String gender;

private LocalDate birthday;

private LocalDateTime createTime;

private LocalDateTime updateTime;
}

6.3 用户积分实体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* 用户积分实体
*/
@Data
@TableName("t_user_points")
public class UserPoints {

@TableId(type = IdType.AUTO)
private Long id;

private String userId;

private Integer totalPoints;

private Integer availablePoints;

private Integer usedPoints;

private LocalDateTime createTime;

private LocalDateTime updateTime;
}

6.4 积分记录实体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* 积分记录实体
*/
@Data
@TableName("t_points_record")
public class PointsRecord {

@TableId(type = IdType.AUTO)
private Long id;

private String userId;

private String changeType;

private Integer changePoints;

private Integer beforePoints;

private Integer afterPoints;

private String sourceType;

private String sourceId;

private String remark;

private LocalDateTime createTime;
}

6.5 登录记录实体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* 登录记录实体
*/
@Data
@TableName("t_login_record")
public class LoginRecord {

@TableId(type = IdType.AUTO)
private Long id;

private String userId;

private String loginIp;

private LocalDateTime loginTime;

private String status;

private String failureReason;

private LocalDateTime createTime;
}

7. 数据库设计

7.1 用户表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
CREATE TABLE `t_user` (
`id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT 'ID',
`user_id` VARCHAR(64) NOT NULL COMMENT '用户ID',
`username` VARCHAR(64) NOT NULL COMMENT '用户名',
`password` VARCHAR(128) NOT NULL COMMENT '密码',
`phone` VARCHAR(32) DEFAULT NULL COMMENT '手机号',
`email` VARCHAR(128) DEFAULT NULL COMMENT '邮箱',
`status` VARCHAR(32) NOT NULL DEFAULT 'ACTIVE' COMMENT '状态:ACTIVE-激活, DISABLED-禁用, LOCKED-锁定',
`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_user_id` (`user_id`),
UNIQUE KEY `uk_username` (`username`),
KEY `idx_phone` (`phone`),
KEY `idx_email` (`email`),
KEY `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户表';

7.2 用户资料表

1
2
3
4
5
6
7
8
9
10
11
12
CREATE TABLE `t_user_profile` (
`id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT 'ID',
`user_id` VARCHAR(64) NOT NULL COMMENT '用户ID',
`nickname` VARCHAR(64) DEFAULT NULL COMMENT '昵称',
`avatar` VARCHAR(512) DEFAULT NULL COMMENT '头像',
`gender` VARCHAR(8) DEFAULT NULL COMMENT '性别:MALE-男, FEMALE-女',
`birthday` DATE DEFAULT NULL COMMENT '生日',
`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_user_id` (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户资料表';

7.3 用户积分表

1
2
3
4
5
6
7
8
9
10
11
CREATE TABLE `t_user_points` (
`id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT 'ID',
`user_id` VARCHAR(64) NOT NULL COMMENT '用户ID',
`total_points` INT(11) NOT NULL DEFAULT 0 COMMENT '总积分',
`available_points` INT(11) NOT NULL DEFAULT 0 COMMENT '可用积分',
`used_points` INT(11) NOT NULL DEFAULT 0 COMMENT '已用积分',
`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_user_id` (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户积分表';

7.4 积分记录表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
CREATE TABLE `t_points_record` (
`id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT 'ID',
`user_id` VARCHAR(64) NOT NULL COMMENT '用户ID',
`change_type` VARCHAR(32) NOT NULL COMMENT '变更类型:ADD-增加, DEDUCT-扣减',
`change_points` INT(11) NOT NULL COMMENT '变更积分',
`before_points` INT(11) NOT NULL COMMENT '变更前积分',
`after_points` INT(11) NOT NULL COMMENT '变更后积分',
`source_type` VARCHAR(32) DEFAULT NULL COMMENT '来源类型:REGISTER-注册, ORDER-订单, ACTIVITY-活动',
`source_id` VARCHAR(64) DEFAULT NULL COMMENT '来源ID',
`remark` VARCHAR(512) DEFAULT NULL COMMENT '备注',
`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
PRIMARY KEY (`id`),
KEY `idx_user_id` (`user_id`),
KEY `idx_change_type` (`change_type`),
KEY `idx_create_time` (`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='积分记录表';

7.5 登录记录表

1
2
3
4
5
6
7
8
9
10
11
12
13
CREATE TABLE `t_login_record` (
`id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT 'ID',
`user_id` VARCHAR(64) NOT NULL COMMENT '用户ID',
`login_ip` VARCHAR(64) DEFAULT NULL COMMENT '登录IP',
`login_time` DATETIME NOT NULL COMMENT '登录时间',
`status` VARCHAR(32) NOT NULL COMMENT '状态:SUCCESS-成功, FAILED-失败',
`failure_reason` VARCHAR(512) DEFAULT NULL COMMENT '失败原因',
`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
PRIMARY KEY (`id`),
KEY `idx_user_id` (`user_id`),
KEY `idx_status` (`status`),
KEY `idx_login_time` (`login_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='登录记录表';

8. 配置类

8.1 Kafka配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# application.yml
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
acks: all
retries: 3
consumer:
group-id: user-service-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
enable-auto-commit: false
auto-offset-reset: latest

8.2 Redis配置

1
2
3
4
5
6
7
8
9
10
11
12
13
# application.yml
spring:
redis:
host: localhost
port: 6379
password:
database: 0
timeout: 3000
lettuce:
pool:
max-active: 8
max-idle: 8
min-idle: 0

8.3 JWT配置

1
2
3
4
5
6
7
8
# application.yml
jwt:
secret: your-secret-key
token:
expiration: 3600
refresh:
token:
expiration: 86400

9. 总结

本文深入讲解了用户服务完整逻辑代码的Java微服务后端架构实战,涵盖了以下核心内容:

  1. 用户服务:实现用户注册、登录、信息管理、查询、统计
  2. 认证服务:实现用户认证、Token生成、验证、权限管理、会话管理
  3. 积分服务:实现积分查询、增加、扣减、记录、统计
  4. 通知服务:实现短信、邮件、站内消息、推送通知
  5. Kafka消息队列:实现用户、认证、积分的异步处理和消息通知
  6. 多服务协作:通过Kafka和Feign实现用户服务、认证服务、积分服务、通知服务之间的解耦和协作
  7. 分布式事务:通过分布式锁、事务管理保证数据一致性
  8. 数据库设计:完整的用户、用户资料、积分、积分记录、登录记录表设计
  9. 缓存优化:通过Redis缓存用户信息、Token信息、分布式锁
  10. 性能优化:通过连接池、批量处理、异步处理提升系统性能

通过本文的学习,读者可以掌握如何基于Java微服务架构实现一个完整的用户服务系统,包含用户注册、登录、信息管理、积分管理、多服务协作等完整业务逻辑代码,为实际项目的用户服务开发提供参考和指导。