财务管理逻辑代码Java微服务后端架构实战

1. 架构概述

财务管理系统是电商平台的核心业务模块,需要支持账户管理、资金流水、账单管理、对账管理、财务报表、支付管理和退款管理等功能。本篇文章将深入讲解如何基于Java微服务架构实现一个完整的财务管理系统,包含账户服务、流水服务、账单服务、对账服务等多个微服务的完整业务逻辑代码。

1.1 系统架构图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
财务管理完整架构
├── 账户服务 (Account Service)
│ ├── 账户创建
│ ├── 账户查询
│ ├── 账户余额管理
│ ├── 账户冻结/解冻
│ └── 账户统计

├── 流水服务 (Transaction Service)
│ ├── 资金流水记录
│ ├── 收入流水
│ ├── 支出流水
│ ├── 转账流水
│ └── 流水查询

├── 账单服务 (Bill Service)
│ ├── 账单生成
│ ├── 账单查询
│ ├── 账单统计
│ └── 账单对账

├── 对账服务 (Reconciliation Service)
│ ├── 对账任务创建
│ ├── 对账处理
│ ├── 对账结果查询
│ └── 对账差异处理

├── 报表服务 (Report Service)
│ ├── 收入报表
│ ├── 支出报表
│ ├── 利润报表
│ └── 财务报表

├── Kafka消息队列
│ ├── 账户变更Topic
│ ├── 流水记录Topic
│ ├── 账单生成Topic
│ ├── 对账任务Topic
│ └── 报表生成Topic

└── 数据库/缓存
├── MySQL - 账户信息、流水记录、账单记录、对账记录
└── Redis - 账户余额缓存、分布式锁、统计数据

1.2 核心组件

  • 账户服务(Account Service):负责账户创建、查询、余额管理、冻结/解冻、统计
  • 流水服务(Transaction Service):负责资金流水记录、收入、支出、转账、查询
  • 账单服务(Bill Service):负责账单生成、查询、统计、对账
  • 对账服务(Reconciliation Service):负责对账任务创建、处理、结果查询、差异处理
  • 报表服务(Report Service):负责收入报表、支出报表、利润报表、财务报表
  • Kafka消息队列:负责账户、流水、账单、对账的异步处理和消息通知
  • 数据库(MySQL):持久化账户信息、流水记录、账单记录、对账记录
  • 缓存(Redis):缓存账户余额、分布式锁、统计数据

2. 账户服务实现

2.1 账户服务控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
/**
* 账户服务控制器
* 提供账户管理接口
*/
@RestController
@RequestMapping("/api/account")
@Slf4j
public class AccountController {

@Autowired
private AccountService accountService;

@Autowired
private AccountQueryService accountQueryService;

/**
* 创建账户
*/
@PostMapping("/create")
public Result<AccountInfo> createAccount(@RequestBody @Valid AccountCreateRequest request) {
try {
AccountInfo accountInfo = accountService.createAccount(request);
return Result.success(accountInfo);
} catch (Exception e) {
log.error("创建账户失败: error={}", e.getMessage(), e);
return Result.error("创建账户失败: " + e.getMessage());
}
}

/**
* 获取账户信息
*/
@GetMapping("/{accountId}")
public Result<AccountInfo> getAccountInfo(@PathVariable String accountId) {
try {
AccountInfo accountInfo = accountQueryService.getAccountInfo(accountId);
return Result.success(accountInfo);
} catch (Exception e) {
log.error("获取账户信息失败: accountId={}, error={}",
accountId, e.getMessage(), e);
return Result.error("获取账户信息失败: " + e.getMessage());
}
}

/**
* 账户充值
*/
@PostMapping("/{accountId}/recharge")
public Result<Void> recharge(
@PathVariable String accountId,
@RequestBody @Valid RechargeRequest request) {
try {
accountService.recharge(accountId, request);
return Result.success();
} catch (Exception e) {
log.error("账户充值失败: accountId={}, error={}",
accountId, e.getMessage(), e);
return Result.error("账户充值失败: " + e.getMessage());
}
}

/**
* 账户提现
*/
@PostMapping("/{accountId}/withdraw")
public Result<Void> withdraw(
@PathVariable String accountId,
@RequestBody @Valid WithdrawRequest request) {
try {
accountService.withdraw(accountId, request);
return Result.success();
} catch (Exception e) {
log.error("账户提现失败: accountId={}, error={}",
accountId, e.getMessage(), e);
return Result.error("账户提现失败: " + e.getMessage());
}
}

/**
* 冻结账户
*/
@PostMapping("/{accountId}/freeze")
public Result<Void> freezeAccount(
@PathVariable String accountId,
@RequestBody @Valid AccountFreezeRequest request) {
try {
accountService.freezeAccount(accountId, request.getFreezeReason());
return Result.success();
} catch (Exception e) {
log.error("冻结账户失败: accountId={}, error={}",
accountId, e.getMessage(), e);
return Result.error("冻结账户失败: " + e.getMessage());
}
}

/**
* 解冻账户
*/
@PostMapping("/{accountId}/unfreeze")
public Result<Void> unfreezeAccount(@PathVariable String accountId) {
try {
accountService.unfreezeAccount(accountId);
return Result.success();
} catch (Exception e) {
log.error("解冻账户失败: accountId={}, error={}",
accountId, e.getMessage(), e);
return Result.error("解冻账户失败: " + e.getMessage());
}
}

/**
* 获取账户列表
*/
@GetMapping("/list")
public Result<PageResult<AccountInfo>> getAccountList(
@RequestParam(required = false) String userId,
@RequestParam(required = false) String accountType,
@RequestParam(required = false) String status,
@RequestParam(defaultValue = "1") Integer pageNum,
@RequestParam(defaultValue = "10") Integer pageSize) {
try {
PageResult<AccountInfo> result = accountQueryService.getAccountList(
userId, accountType, status, pageNum, pageSize);
return Result.success(result);
} catch (Exception e) {
log.error("获取账户列表失败: error={}", e.getMessage(), e);
return Result.error("获取账户列表失败: " + e.getMessage());
}
}

/**
* 获取账户统计
*/
@GetMapping("/statistics")
public Result<AccountStatistics> getAccountStatistics(
@RequestParam(required = false) String userId,
@RequestParam(required = false) String startTime,
@RequestParam(required = false) String endTime) {
try {
AccountStatistics statistics = accountQueryService.getAccountStatistics(
userId, startTime, endTime);
return Result.success(statistics);
} catch (Exception e) {
log.error("获取账户统计失败: error={}", e.getMessage(), e);
return Result.error("获取账户统计失败: " + e.getMessage());
}
}
}

2.2 账户服务实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
/**
* 账户服务实现
* 负责账户创建、充值、提现、冻结/解冻
*/
@Service
@Slf4j
public class AccountService {

@Autowired
private AccountMapper accountMapper;

@Autowired
private TransactionServiceClient transactionServiceClient;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@Autowired
private DistributedLock distributedLock;

/**
* 创建账户
*/
@Transactional(rollbackFor = Exception.class)
public AccountInfo createAccount(AccountCreateRequest request) {
String lockKey = "account:create:" + request.getUserId();
String lockValue = UUID.randomUUID().toString();

try {
// 1. 获取分布式锁
boolean lockAcquired = distributedLock.tryLock(lockKey, lockValue, 10, TimeUnit.SECONDS);
if (!lockAcquired) {
throw new BusinessException("账户创建中,请稍候");
}

// 2. 检查账户是否已存在
Account existingAccount = accountMapper.selectOne(
new LambdaQueryWrapper<Account>()
.eq(Account::getUserId, request.getUserId())
.eq(Account::getAccountType, request.getAccountType()));
if (existingAccount != null) {
throw new BusinessException("账户已存在");
}

// 3. 生成账户ID
String accountId = generateAccountId();

// 4. 创建账户
Account account = new Account();
account.setAccountId(accountId);
account.setUserId(request.getUserId());
account.setAccountType(request.getAccountType());
account.setAccountName(request.getAccountName());
account.setBalance(BigDecimal.ZERO);
account.setFrozenBalance(BigDecimal.ZERO);
account.setAvailableBalance(BigDecimal.ZERO);
account.setStatus("ACTIVE");
account.setCreateTime(LocalDateTime.now());
account.setUpdateTime(LocalDateTime.now());

accountMapper.insert(account);

// 5. 构建账户信息
AccountInfo accountInfo = convertToAccountInfo(account);

// 6. 缓存账户信息
String cacheKey = "account:info:" + accountId;
redisTemplate.opsForValue().set(cacheKey, accountInfo, 1, TimeUnit.HOURS);

// 7. 发送账户创建事件
sendAccountCreateEvent(accountInfo);

log.info("创建账户成功: accountId={}, userId={}, accountType={}",
accountId, request.getUserId(), request.getAccountType());

return accountInfo;

} catch (Exception e) {
log.error("创建账户失败: userId={}, error={}",
request.getUserId(), e.getMessage(), e);
throw new BusinessException("创建账户失败: " + e.getMessage());
} finally {
// 释放分布式锁
distributedLock.releaseLock(lockKey, lockValue);
}
}

/**
* 账户充值
*/
@Transactional(rollbackFor = Exception.class)
public void recharge(String accountId, RechargeRequest request) {
String lockKey = "account:recharge:" + accountId;
String lockValue = UUID.randomUUID().toString();

try {
// 1. 获取分布式锁
boolean lockAcquired = distributedLock.tryLock(lockKey, lockValue, 10, TimeUnit.SECONDS);
if (!lockAcquired) {
throw new BusinessException("账户充值中,请稍候");
}

// 2. 获取账户
Account account = accountMapper.selectOne(
new LambdaQueryWrapper<Account>()
.eq(Account::getAccountId, accountId));
if (account == null) {
throw new BusinessException("账户不存在");
}

// 3. 验证账户状态
if (!"ACTIVE".equals(account.getStatus())) {
throw new BusinessException("账户状态不允许充值: " + account.getStatus());
}

// 4. 更新账户余额
BigDecimal oldBalance = account.getBalance();
account.setBalance(account.getBalance().add(request.getAmount()));
account.setAvailableBalance(account.getAvailableBalance().add(request.getAmount()));
account.setUpdateTime(LocalDateTime.now());

accountMapper.updateById(account);

// 5. 记录资金流水
TransactionCreateRequest transactionRequest = new TransactionCreateRequest();
transactionRequest.setAccountId(accountId);
transactionRequest.setUserId(account.getUserId());
transactionRequest.setTransactionType("RECHARGE");
transactionRequest.setAmount(request.getAmount());
transactionRequest.setBeforeBalance(oldBalance);
transactionRequest.setAfterBalance(account.getBalance());
transactionRequest.setSourceType(request.getSourceType());
transactionRequest.setSourceId(request.getSourceId());
transactionRequest.setRemark(request.getRemark());
transactionServiceClient.createTransaction(transactionRequest);

// 6. 清除缓存
String cacheKey = "account:info:" + accountId;
redisTemplate.delete(cacheKey);

// 7. 发送账户充值事件
sendAccountRechargeEvent(accountId, request.getAmount());

log.info("账户充值成功: accountId={}, amount={}, balance={}",
accountId, request.getAmount(), account.getBalance());

} catch (Exception e) {
log.error("账户充值失败: accountId={}, error={}",
accountId, e.getMessage(), e);
throw new BusinessException("账户充值失败: " + e.getMessage());
} finally {
// 释放分布式锁
distributedLock.releaseLock(lockKey, lockValue);
}
}

/**
* 账户提现
*/
@Transactional(rollbackFor = Exception.class)
public void withdraw(String accountId, WithdrawRequest request) {
String lockKey = "account:withdraw:" + accountId;
String lockValue = UUID.randomUUID().toString();

try {
// 1. 获取分布式锁
boolean lockAcquired = distributedLock.tryLock(lockKey, lockValue, 10, TimeUnit.SECONDS);
if (!lockAcquired) {
throw new BusinessException("账户提现中,请稍候");
}

// 2. 获取账户
Account account = accountMapper.selectOne(
new LambdaQueryWrapper<Account>()
.eq(Account::getAccountId, accountId));
if (account == null) {
throw new BusinessException("账户不存在");
}

// 3. 验证账户状态
if (!"ACTIVE".equals(account.getStatus())) {
throw new BusinessException("账户状态不允许提现: " + account.getStatus());
}

// 4. 验证账户余额
if (account.getAvailableBalance().compareTo(request.getAmount()) < 0) {
throw new BusinessException("账户余额不足: availableBalance=" + account.getAvailableBalance() + ", amount=" + request.getAmount());
}

// 5. 更新账户余额
BigDecimal oldBalance = account.getBalance();
account.setBalance(account.getBalance().subtract(request.getAmount()));
account.setAvailableBalance(account.getAvailableBalance().subtract(request.getAmount()));
account.setUpdateTime(LocalDateTime.now());

accountMapper.updateById(account);

// 6. 记录资金流水
TransactionCreateRequest transactionRequest = new TransactionCreateRequest();
transactionRequest.setAccountId(accountId);
transactionRequest.setUserId(account.getUserId());
transactionRequest.setTransactionType("WITHDRAW");
transactionRequest.setAmount(request.getAmount());
transactionRequest.setBeforeBalance(oldBalance);
transactionRequest.setAfterBalance(account.getBalance());
transactionRequest.setSourceType(request.getSourceType());
transactionRequest.setSourceId(request.getSourceId());
transactionRequest.setRemark(request.getRemark());
transactionServiceClient.createTransaction(transactionRequest);

// 7. 清除缓存
String cacheKey = "account:info:" + accountId;
redisTemplate.delete(cacheKey);

// 8. 发送账户提现事件
sendAccountWithdrawEvent(accountId, request.getAmount());

log.info("账户提现成功: accountId={}, amount={}, balance={}",
accountId, request.getAmount(), account.getBalance());

} catch (Exception e) {
log.error("账户提现失败: accountId={}, error={}",
accountId, e.getMessage(), e);
throw new BusinessException("账户提现失败: " + e.getMessage());
} finally {
// 释放分布式锁
distributedLock.releaseLock(lockKey, lockValue);
}
}

/**
* 冻结账户
*/
@Transactional(rollbackFor = Exception.class)
public void freezeAccount(String accountId, String freezeReason) {
try {
// 1. 获取账户
Account account = accountMapper.selectOne(
new LambdaQueryWrapper<Account>()
.eq(Account::getAccountId, accountId));
if (account == null) {
throw new BusinessException("账户不存在");
}

// 2. 验证账户状态
if ("FROZEN".equals(account.getStatus())) {
throw new BusinessException("账户已冻结");
}

// 3. 冻结账户余额
account.setFrozenBalance(account.getAvailableBalance());
account.setAvailableBalance(BigDecimal.ZERO);
account.setStatus("FROZEN");
account.setFreezeReason(freezeReason);
account.setFreezeTime(LocalDateTime.now());
account.setUpdateTime(LocalDateTime.now());

accountMapper.updateById(account);

// 4. 清除缓存
String cacheKey = "account:info:" + accountId;
redisTemplate.delete(cacheKey);

// 5. 发送账户冻结事件
sendAccountFreezeEvent(accountId, freezeReason);

log.info("冻结账户成功: accountId={}, freezeReason={}", accountId, freezeReason);

} catch (Exception e) {
log.error("冻结账户失败: accountId={}, error={}",
accountId, e.getMessage(), e);
throw new BusinessException("冻结账户失败: " + e.getMessage());
}
}

/**
* 解冻账户
*/
@Transactional(rollbackFor = Exception.class)
public void unfreezeAccount(String accountId) {
try {
// 1. 获取账户
Account account = accountMapper.selectOne(
new LambdaQueryWrapper<Account>()
.eq(Account::getAccountId, accountId));
if (account == null) {
throw new BusinessException("账户不存在");
}

// 2. 验证账户状态
if (!"FROZEN".equals(account.getStatus())) {
throw new BusinessException("账户未冻结");
}

// 3. 解冻账户余额
account.setAvailableBalance(account.getFrozenBalance());
account.setFrozenBalance(BigDecimal.ZERO);
account.setStatus("ACTIVE");
account.setFreezeReason(null);
account.setFreezeTime(null);
account.setUpdateTime(LocalDateTime.now());

accountMapper.updateById(account);

// 4. 清除缓存
String cacheKey = "account:info:" + accountId;
redisTemplate.delete(cacheKey);

// 5. 发送账户解冻事件
sendAccountUnfreezeEvent(accountId);

log.info("解冻账户成功: accountId={}", accountId);

} catch (Exception e) {
log.error("解冻账户失败: accountId={}, error={}",
accountId, e.getMessage(), e);
throw new BusinessException("解冻账户失败: " + e.getMessage());
}
}

/**
* 生成账户ID
*/
private String generateAccountId() {
return "ACC" + System.currentTimeMillis() +
String.format("%06d", new Random().nextInt(1000000));
}

/**
* 转换为账户信息
*/
private AccountInfo convertToAccountInfo(Account account) {
AccountInfo accountInfo = new AccountInfo();
accountInfo.setAccountId(account.getAccountId());
accountInfo.setUserId(account.getUserId());
accountInfo.setAccountType(account.getAccountType());
accountInfo.setAccountName(account.getAccountName());
accountInfo.setBalance(account.getBalance());
accountInfo.setFrozenBalance(account.getFrozenBalance());
accountInfo.setAvailableBalance(account.getAvailableBalance());
accountInfo.setStatus(account.getStatus());
accountInfo.setCreateTime(account.getCreateTime());
return accountInfo;
}

/**
* 发送账户创建事件
*/
private void sendAccountCreateEvent(AccountInfo accountInfo) {
try {
AccountEvent event = new AccountEvent();
event.setEventType("ACCOUNT_CREATE");
event.setAccountId(accountInfo.getAccountId());
event.setAccountInfo(accountInfo);
event.setEventTime(LocalDateTime.now());

String message = JSON.toJSONString(event);
kafkaTemplate.send("account.create", accountInfo.getAccountId(), message);

} catch (Exception e) {
log.error("发送账户创建事件失败: error={}", e.getMessage(), e);
}
}

/**
* 发送账户充值事件
*/
private void sendAccountRechargeEvent(String accountId, BigDecimal amount) {
try {
AccountRechargeEvent event = new AccountRechargeEvent();
event.setEventType("ACCOUNT_RECHARGE");
event.setAccountId(accountId);
event.setAmount(amount);
event.setEventTime(LocalDateTime.now());

String message = JSON.toJSONString(event);
kafkaTemplate.send("account.recharge", accountId, message);

} catch (Exception e) {
log.error("发送账户充值事件失败: error={}", e.getMessage(), e);
}
}

/**
* 发送账户提现事件
*/
private void sendAccountWithdrawEvent(String accountId, BigDecimal amount) {
try {
AccountWithdrawEvent event = new AccountWithdrawEvent();
event.setEventType("ACCOUNT_WITHDRAW");
event.setAccountId(accountId);
event.setAmount(amount);
event.setEventTime(LocalDateTime.now());

String message = JSON.toJSONString(event);
kafkaTemplate.send("account.withdraw", accountId, message);

} catch (Exception e) {
log.error("发送账户提现事件失败: error={}", e.getMessage(), e);
}
}

/**
* 发送账户冻结事件
*/
private void sendAccountFreezeEvent(String accountId, String freezeReason) {
try {
AccountFreezeEvent event = new AccountFreezeEvent();
event.setEventType("ACCOUNT_FREEZE");
event.setAccountId(accountId);
event.setFreezeReason(freezeReason);
event.setEventTime(LocalDateTime.now());

String message = JSON.toJSONString(event);
kafkaTemplate.send("account.freeze", accountId, message);

} catch (Exception e) {
log.error("发送账户冻结事件失败: error={}", e.getMessage(), e);
}
}

/**
* 发送账户解冻事件
*/
private void sendAccountUnfreezeEvent(String accountId) {
try {
AccountUnfreezeEvent event = new AccountUnfreezeEvent();
event.setEventType("ACCOUNT_UNFREEZE");
event.setAccountId(accountId);
event.setEventTime(LocalDateTime.now());

String message = JSON.toJSONString(event);
kafkaTemplate.send("account.unfreeze", accountId, message);

} catch (Exception e) {
log.error("发送账户解冻事件失败: error={}", e.getMessage(), e);
}
}
}

2.3 账户查询服务实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
/**
* 账户查询服务实现
* 负责账户查询、统计
*/
@Service
@Slf4j
public class AccountQueryService {

@Autowired
private AccountMapper accountMapper;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

/**
* 获取账户信息
*/
public AccountInfo getAccountInfo(String accountId) {
try {
// 1. 从缓存获取
String cacheKey = "account:info:" + accountId;
AccountInfo cachedInfo = (AccountInfo) redisTemplate.opsForValue().get(cacheKey);
if (cachedInfo != null) {
return cachedInfo;
}

// 2. 从数据库查询
Account account = accountMapper.selectOne(
new LambdaQueryWrapper<Account>()
.eq(Account::getAccountId, accountId));
if (account == null) {
throw new BusinessException("账户不存在");
}

// 3. 构建账户信息
AccountInfo accountInfo = convertToAccountInfo(account);

// 4. 缓存账户信息
redisTemplate.opsForValue().set(cacheKey, accountInfo, 1, TimeUnit.HOURS);

return accountInfo;

} catch (Exception e) {
log.error("获取账户信息失败: accountId={}, error={}",
accountId, e.getMessage(), e);
throw new BusinessException("获取账户信息失败: " + e.getMessage());
}
}

/**
* 获取账户列表
*/
public PageResult<AccountInfo> getAccountList(String userId, String accountType,
String status, Integer pageNum, Integer pageSize) {
try {
// 1. 构建查询条件
LambdaQueryWrapper<Account> wrapper = new LambdaQueryWrapper<>();
if (userId != null && !userId.isEmpty()) {
wrapper.eq(Account::getUserId, userId);
}
if (accountType != null && !accountType.isEmpty()) {
wrapper.eq(Account::getAccountType, accountType);
}
if (status != null && !status.isEmpty()) {
wrapper.eq(Account::getStatus, status);
}
wrapper.orderByDesc(Account::getCreateTime);

// 2. 分页查询
Page<Account> page = new Page<>(pageNum, pageSize);
Page<Account> result = accountMapper.selectPage(page, wrapper);

// 3. 转换为账户信息列表
List<AccountInfo> accountInfoList = result.getRecords().stream()
.map(this::convertToAccountInfo)
.collect(Collectors.toList());

// 4. 构建分页结果
PageResult<AccountInfo> pageResult = new PageResult<>();
pageResult.setList(accountInfoList);
pageResult.setTotal(result.getTotal());
pageResult.setPageNum(pageNum);
pageResult.setPageSize(pageSize);

return pageResult;

} catch (Exception e) {
log.error("获取账户列表失败: error={}", e.getMessage(), e);
throw new BusinessException("获取账户列表失败: " + e.getMessage());
}
}

/**
* 获取账户统计
*/
public AccountStatistics getAccountStatistics(String userId, String startTime, String endTime) {
try {
// 1. 构建查询条件
LambdaQueryWrapper<Account> wrapper = new LambdaQueryWrapper<>();
if (userId != null && !userId.isEmpty()) {
wrapper.eq(Account::getUserId, userId);
}
if (startTime != null && !startTime.isEmpty()) {
wrapper.ge(Account::getCreateTime, LocalDateTime.parse(startTime));
}
if (endTime != null && !endTime.isEmpty()) {
wrapper.le(Account::getCreateTime, LocalDateTime.parse(endTime));
}

// 2. 查询账户列表
List<Account> accounts = accountMapper.selectList(wrapper);

// 3. 统计账户数据
AccountStatistics statistics = new AccountStatistics();
statistics.setTotalCount(accounts.size());
statistics.setTotalBalance(accounts.stream()
.map(Account::getBalance)
.reduce(BigDecimal.ZERO, BigDecimal::add));
statistics.setTotalAvailableBalance(accounts.stream()
.map(Account::getAvailableBalance)
.reduce(BigDecimal.ZERO, BigDecimal::add));
statistics.setTotalFrozenBalance(accounts.stream()
.map(Account::getFrozenBalance)
.reduce(BigDecimal.ZERO, BigDecimal::add));

Map<String, Long> statusCount = accounts.stream()
.collect(Collectors.groupingBy(Account::getStatus, Collectors.counting()));
statistics.setStatusCount(statusCount);

Map<String, BigDecimal> typeBalance = accounts.stream()
.collect(Collectors.groupingBy(
Account::getAccountType,
Collectors.reducing(BigDecimal.ZERO, Account::getBalance, BigDecimal::add)));
statistics.setTypeBalance(typeBalance);

return statistics;

} catch (Exception e) {
log.error("获取账户统计失败: error={}", e.getMessage(), e);
throw new BusinessException("获取账户统计失败: " + e.getMessage());
}
}

/**
* 转换为账户信息
*/
private AccountInfo convertToAccountInfo(Account account) {
AccountInfo accountInfo = new AccountInfo();
accountInfo.setAccountId(account.getAccountId());
accountInfo.setUserId(account.getUserId());
accountInfo.setAccountType(account.getAccountType());
accountInfo.setAccountName(account.getAccountName());
accountInfo.setBalance(account.getBalance());
accountInfo.setFrozenBalance(account.getFrozenBalance());
accountInfo.setAvailableBalance(account.getAvailableBalance());
accountInfo.setStatus(account.getStatus());
accountInfo.setCreateTime(account.getCreateTime());
accountInfo.setUpdateTime(account.getUpdateTime());
return accountInfo;
}
}

3. 流水服务实现

3.1 流水服务控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
/**
* 流水服务控制器
* 提供资金流水管理接口
*/
@RestController
@RequestMapping("/api/transaction")
@Slf4j
public class TransactionController {

@Autowired
private TransactionService transactionService;

@Autowired
private TransactionQueryService transactionQueryService;

/**
* 创建资金流水
*/
@PostMapping("/create")
public Result<Void> createTransaction(@RequestBody @Valid TransactionCreateRequest request) {
try {
transactionService.createTransaction(request);
return Result.success();
} catch (Exception e) {
log.error("创建资金流水失败: error={}", e.getMessage(), e);
return Result.error("创建资金流水失败: " + e.getMessage());
}
}

/**
* 获取资金流水
*/
@GetMapping("/{transactionId}")
public Result<TransactionInfo> getTransactionInfo(@PathVariable String transactionId) {
try {
TransactionInfo transactionInfo = transactionQueryService.getTransactionInfo(transactionId);
return Result.success(transactionInfo);
} catch (Exception e) {
log.error("获取资金流水失败: transactionId={}, error={}",
transactionId, e.getMessage(), e);
return Result.error("获取资金流水失败: " + e.getMessage());
}
}

/**
* 获取资金流水列表
*/
@GetMapping("/list")
public Result<PageResult<TransactionInfo>> getTransactionList(
@RequestParam(required = false) String accountId,
@RequestParam(required = false) String userId,
@RequestParam(required = false) String transactionType,
@RequestParam(required = false) String startTime,
@RequestParam(required = false) String endTime,
@RequestParam(defaultValue = "1") Integer pageNum,
@RequestParam(defaultValue = "10") Integer pageSize) {
try {
PageResult<TransactionInfo> result = transactionQueryService.getTransactionList(
accountId, userId, transactionType, startTime, endTime, pageNum, pageSize);
return Result.success(result);
} catch (Exception e) {
log.error("获取资金流水列表失败: error={}", e.getMessage(), e);
return Result.error("获取资金流水列表失败: " + e.getMessage());
}
}

/**
* 获取资金流水统计
*/
@GetMapping("/statistics")
public Result<TransactionStatistics> getTransactionStatistics(
@RequestParam(required = false) String accountId,
@RequestParam(required = false) String userId,
@RequestParam(required = false) String startTime,
@RequestParam(required = false) String endTime) {
try {
TransactionStatistics statistics = transactionQueryService.getTransactionStatistics(
accountId, userId, startTime, endTime);
return Result.success(statistics);
} catch (Exception e) {
log.error("获取资金流水统计失败: error={}", e.getMessage(), e);
return Result.error("获取资金流水统计失败: " + e.getMessage());
}
}
}

3.2 流水服务实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
/**
* 流水服务实现
* 负责资金流水记录
*/
@Service
@Slf4j
public class TransactionService {

@Autowired
private TransactionMapper transactionMapper;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

/**
* 创建资金流水
*/
@Transactional(rollbackFor = Exception.class)
public void createTransaction(TransactionCreateRequest request) {
try {
// 1. 生成流水ID
String transactionId = generateTransactionId();

// 2. 创建资金流水
Transaction transaction = new Transaction();
transaction.setTransactionId(transactionId);
transaction.setAccountId(request.getAccountId());
transaction.setUserId(request.getUserId());
transaction.setTransactionType(request.getTransactionType());
transaction.setAmount(request.getAmount());
transaction.setBeforeBalance(request.getBeforeBalance());
transaction.setAfterBalance(request.getAfterBalance());
transaction.setSourceType(request.getSourceType());
transaction.setSourceId(request.getSourceId());
transaction.setRemark(request.getRemark());
transaction.setCreateTime(LocalDateTime.now());

transactionMapper.insert(transaction);

// 3. 构建流水信息
TransactionInfo transactionInfo = convertToTransactionInfo(transaction);

// 4. 发送流水记录事件
sendTransactionCreateEvent(transactionInfo);

log.info("创建资金流水成功: transactionId={}, accountId={}, amount={}",
transactionId, request.getAccountId(), request.getAmount());

} catch (Exception e) {
log.error("创建资金流水失败: accountId={}, error={}",
request.getAccountId(), e.getMessage(), e);
throw new BusinessException("创建资金流水失败: " + e.getMessage());
}
}

/**
* 生成流水ID
*/
private String generateTransactionId() {
return "TXN" + System.currentTimeMillis() +
String.format("%06d", new Random().nextInt(1000000));
}

/**
* 转换为流水信息
*/
private TransactionInfo convertToTransactionInfo(Transaction transaction) {
TransactionInfo transactionInfo = new TransactionInfo();
transactionInfo.setTransactionId(transaction.getTransactionId());
transactionInfo.setAccountId(transaction.getAccountId());
transactionInfo.setUserId(transaction.getUserId());
transactionInfo.setTransactionType(transaction.getTransactionType());
transactionInfo.setAmount(transaction.getAmount());
transactionInfo.setBeforeBalance(transaction.getBeforeBalance());
transactionInfo.setAfterBalance(transaction.getAfterBalance());
transactionInfo.setSourceType(transaction.getSourceType());
transactionInfo.setSourceId(transaction.getSourceId());
transactionInfo.setRemark(transaction.getRemark());
transactionInfo.setCreateTime(transaction.getCreateTime());
return transactionInfo;
}

/**
* 发送流水记录事件
*/
private void sendTransactionCreateEvent(TransactionInfo transactionInfo) {
try {
TransactionEvent event = new TransactionEvent();
event.setEventType("TRANSACTION_CREATE");
event.setTransactionId(transactionInfo.getTransactionId());
event.setAccountId(transactionInfo.getAccountId());
event.setTransactionType(transactionInfo.getTransactionType());
event.setAmount(transactionInfo.getAmount());
event.setEventTime(LocalDateTime.now());

String message = JSON.toJSONString(event);
kafkaTemplate.send("transaction.create", transactionInfo.getTransactionId(), message);

} catch (Exception e) {
log.error("发送流水记录事件失败: error={}", e.getMessage(), e);
}
}
}

3.3 流水查询服务实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
/**
* 流水查询服务实现
* 负责资金流水查询、统计
*/
@Service
@Slf4j
public class TransactionQueryService {

@Autowired
private TransactionMapper transactionMapper;

/**
* 获取资金流水
*/
public TransactionInfo getTransactionInfo(String transactionId) {
try {
Transaction transaction = transactionMapper.selectOne(
new LambdaQueryWrapper<Transaction>()
.eq(Transaction::getTransactionId, transactionId));
if (transaction == null) {
throw new BusinessException("资金流水不存在");
}

return convertToTransactionInfo(transaction);

} catch (Exception e) {
log.error("获取资金流水失败: transactionId={}, error={}",
transactionId, e.getMessage(), e);
throw new BusinessException("获取资金流水失败: " + e.getMessage());
}
}

/**
* 获取资金流水列表
*/
public PageResult<TransactionInfo> getTransactionList(String accountId, String userId,
String transactionType, String startTime, String endTime,
Integer pageNum, Integer pageSize) {
try {
// 1. 构建查询条件
LambdaQueryWrapper<Transaction> wrapper = new LambdaQueryWrapper<>();
if (accountId != null && !accountId.isEmpty()) {
wrapper.eq(Transaction::getAccountId, accountId);
}
if (userId != null && !userId.isEmpty()) {
wrapper.eq(Transaction::getUserId, userId);
}
if (transactionType != null && !transactionType.isEmpty()) {
wrapper.eq(Transaction::getTransactionType, transactionType);
}
if (startTime != null && !startTime.isEmpty()) {
wrapper.ge(Transaction::getCreateTime, LocalDateTime.parse(startTime));
}
if (endTime != null && !endTime.isEmpty()) {
wrapper.le(Transaction::getCreateTime, LocalDateTime.parse(endTime));
}
wrapper.orderByDesc(Transaction::getCreateTime);

// 2. 分页查询
Page<Transaction> page = new Page<>(pageNum, pageSize);
Page<Transaction> result = transactionMapper.selectPage(page, wrapper);

// 3. 转换为流水信息列表
List<TransactionInfo> transactionInfoList = result.getRecords().stream()
.map(this::convertToTransactionInfo)
.collect(Collectors.toList());

// 4. 构建分页结果
PageResult<TransactionInfo> pageResult = new PageResult<>();
pageResult.setList(transactionInfoList);
pageResult.setTotal(result.getTotal());
pageResult.setPageNum(pageNum);
pageResult.setPageSize(pageSize);

return pageResult;

} catch (Exception e) {
log.error("获取资金流水列表失败: error={}", e.getMessage(), e);
throw new BusinessException("获取资金流水列表失败: " + e.getMessage());
}
}

/**
* 获取资金流水统计
*/
public TransactionStatistics getTransactionStatistics(String accountId, String userId,
String startTime, String endTime) {
try {
// 1. 构建查询条件
LambdaQueryWrapper<Transaction> wrapper = new LambdaQueryWrapper<>();
if (accountId != null && !accountId.isEmpty()) {
wrapper.eq(Transaction::getAccountId, accountId);
}
if (userId != null && !userId.isEmpty()) {
wrapper.eq(Transaction::getUserId, userId);
}
if (startTime != null && !startTime.isEmpty()) {
wrapper.ge(Transaction::getCreateTime, LocalDateTime.parse(startTime));
}
if (endTime != null && !endTime.isEmpty()) {
wrapper.le(Transaction::getCreateTime, LocalDateTime.parse(endTime));
}

// 2. 查询流水列表
List<Transaction> transactions = transactionMapper.selectList(wrapper);

// 3. 统计流水数据
TransactionStatistics statistics = new TransactionStatistics();
statistics.setTotalCount(transactions.size());

// 收入统计
BigDecimal totalIncome = transactions.stream()
.filter(t -> "RECHARGE".equals(t.getTransactionType()) ||
"INCOME".equals(t.getTransactionType()))
.map(Transaction::getAmount)
.reduce(BigDecimal.ZERO, BigDecimal::add);
statistics.setTotalIncome(totalIncome);

// 支出统计
BigDecimal totalExpense = transactions.stream()
.filter(t -> "WITHDRAW".equals(t.getTransactionType()) ||
"EXPENSE".equals(t.getTransactionType()))
.map(Transaction::getAmount)
.reduce(BigDecimal.ZERO, BigDecimal::add);
statistics.setTotalExpense(totalExpense);

// 按类型统计
Map<String, Long> typeCount = transactions.stream()
.collect(Collectors.groupingBy(Transaction::getTransactionType, Collectors.counting()));
statistics.setTypeCount(typeCount);

Map<String, BigDecimal> typeAmount = transactions.stream()
.collect(Collectors.groupingBy(
Transaction::getTransactionType,
Collectors.reducing(BigDecimal.ZERO, Transaction::getAmount, BigDecimal::add)));
statistics.setTypeAmount(typeAmount);

return statistics;

} catch (Exception e) {
log.error("获取资金流水统计失败: error={}", e.getMessage(), e);
throw new BusinessException("获取资金流水统计失败: " + e.getMessage());
}
}

/**
* 转换为流水信息
*/
private TransactionInfo convertToTransactionInfo(Transaction transaction) {
TransactionInfo transactionInfo = new TransactionInfo();
transactionInfo.setTransactionId(transaction.getTransactionId());
transactionInfo.setAccountId(transaction.getAccountId());
transactionInfo.setUserId(transaction.getUserId());
transactionInfo.setTransactionType(transaction.getTransactionType());
transactionInfo.setAmount(transaction.getAmount());
transactionInfo.setBeforeBalance(transaction.getBeforeBalance());
transactionInfo.setAfterBalance(transaction.getAfterBalance());
transactionInfo.setSourceType(transaction.getSourceType());
transactionInfo.setSourceId(transaction.getSourceId());
transactionInfo.setRemark(transaction.getRemark());
transactionInfo.setCreateTime(transaction.getCreateTime());
return transactionInfo;
}
}

4. 账单服务实现

4.1 账单服务控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/**
* 账单服务控制器
* 提供账单管理接口
*/
@RestController
@RequestMapping("/api/bill")
@Slf4j
public class BillController {

@Autowired
private BillService billService;

@Autowired
private BillQueryService billQueryService;

/**
* 生成账单
*/
@PostMapping("/generate")
public Result<BillInfo> generateBill(@RequestBody @Valid BillGenerateRequest request) {
try {
BillInfo billInfo = billService.generateBill(request);
return Result.success(billInfo);
} catch (Exception e) {
log.error("生成账单失败: error={}", e.getMessage(), e);
return Result.error("生成账单失败: " + e.getMessage());
}
}

/**
* 获取账单信息
*/
@GetMapping("/{billId}")
public Result<BillInfo> getBillInfo(@PathVariable String billId) {
try {
BillInfo billInfo = billQueryService.getBillInfo(billId);
return Result.success(billInfo);
} catch (Exception e) {
log.error("获取账单信息失败: billId={}, error={}",
billId, e.getMessage(), e);
return Result.error("获取账单信息失败: " + e.getMessage());
}
}

/**
* 获取账单列表
*/
@GetMapping("/list")
public Result<PageResult<BillInfo>> getBillList(
@RequestParam(required = false) String accountId,
@RequestParam(required = false) String userId,
@RequestParam(required = false) String billType,
@RequestParam(required = false) String status,
@RequestParam(required = false) String startTime,
@RequestParam(required = false) String endTime,
@RequestParam(defaultValue = "1") Integer pageNum,
@RequestParam(defaultValue = "10") Integer pageSize) {
try {
PageResult<BillInfo> result = billQueryService.getBillList(
accountId, userId, billType, status, startTime, endTime, pageNum, pageSize);
return Result.success(result);
} catch (Exception e) {
log.error("获取账单列表失败: error={}", e.getMessage(), e);
return Result.error("获取账单列表失败: " + e.getMessage());
}
}

/**
* 获取账单统计
*/
@GetMapping("/statistics")
public Result<BillStatistics> getBillStatistics(
@RequestParam(required = false) String accountId,
@RequestParam(required = false) String userId,
@RequestParam(required = false) String startTime,
@RequestParam(required = false) String endTime) {
try {
BillStatistics statistics = billQueryService.getBillStatistics(
accountId, userId, startTime, endTime);
return Result.success(statistics);
} catch (Exception e) {
log.error("获取账单统计失败: error={}", e.getMessage(), e);
return Result.error("获取账单统计失败: " + e.getMessage());
}
}
}

4.2 账单服务实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
/**
* 账单服务实现
* 负责账单生成、查询、统计
*/
@Service
@Slf4j
public class BillService {

@Autowired
private BillMapper billMapper;

@Autowired
private TransactionQueryServiceClient transactionQueryServiceClient;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

/**
* 生成账单
*/
@Transactional(rollbackFor = Exception.class)
public BillInfo generateBill(BillGenerateRequest request) {
try {
// 1. 查询账单期间的流水
PageResult<TransactionInfo> transactions = transactionQueryServiceClient.getTransactionList(
request.getAccountId(), null, null,
request.getStartTime(), request.getEndTime(), 1, Integer.MAX_VALUE);

// 2. 计算账单金额
BigDecimal totalIncome = transactions.getList().stream()
.filter(t -> "RECHARGE".equals(t.getTransactionType()) ||
"INCOME".equals(t.getTransactionType()))
.map(TransactionInfo::getAmount)
.reduce(BigDecimal.ZERO, BigDecimal::add);

BigDecimal totalExpense = transactions.getList().stream()
.filter(t -> "WITHDRAW".equals(t.getTransactionType()) ||
"EXPENSE".equals(t.getTransactionType()))
.map(TransactionInfo::getAmount)
.reduce(BigDecimal.ZERO, BigDecimal::add);

BigDecimal netAmount = totalIncome.subtract(totalExpense);

// 3. 生成账单ID
String billId = generateBillId();

// 4. 创建账单
Bill bill = new Bill();
bill.setBillId(billId);
bill.setAccountId(request.getAccountId());
bill.setUserId(request.getUserId());
bill.setBillType(request.getBillType());
bill.setBillPeriod(request.getStartTime() + " ~ " + request.getEndTime());
bill.setTotalIncome(totalIncome);
bill.setTotalExpense(totalExpense);
bill.setNetAmount(netAmount);
bill.setStatus("GENERATED");
bill.setCreateTime(LocalDateTime.now());
bill.setUpdateTime(LocalDateTime.now());

billMapper.insert(bill);

// 5. 构建账单信息
BillInfo billInfo = convertToBillInfo(bill);

// 6. 发送账单生成事件
sendBillGenerateEvent(billInfo);

log.info("生成账单成功: billId={}, accountId={}, netAmount={}",
billId, request.getAccountId(), netAmount);

return billInfo;

} catch (Exception e) {
log.error("生成账单失败: accountId={}, error={}",
request.getAccountId(), e.getMessage(), e);
throw new BusinessException("生成账单失败: " + e.getMessage());
}
}

/**
* 生成账单ID
*/
private String generateBillId() {
return "BILL" + System.currentTimeMillis() +
String.format("%06d", new Random().nextInt(1000000));
}

/**
* 转换为账单信息
*/
private BillInfo convertToBillInfo(Bill bill) {
BillInfo billInfo = new BillInfo();
billInfo.setBillId(bill.getBillId());
billInfo.setAccountId(bill.getAccountId());
billInfo.setUserId(bill.getUserId());
billInfo.setBillType(bill.getBillType());
billInfo.setBillPeriod(bill.getBillPeriod());
billInfo.setTotalIncome(bill.getTotalIncome());
billInfo.setTotalExpense(bill.getTotalExpense());
billInfo.setNetAmount(bill.getNetAmount());
billInfo.setStatus(bill.getStatus());
billInfo.setCreateTime(bill.getCreateTime());
return billInfo;
}

/**
* 发送账单生成事件
*/
private void sendBillGenerateEvent(BillInfo billInfo) {
try {
BillEvent event = new BillEvent();
event.setEventType("BILL_GENERATE");
event.setBillId(billInfo.getBillId());
event.setAccountId(billInfo.getAccountId());
event.setNetAmount(billInfo.getNetAmount());
event.setEventTime(LocalDateTime.now());

String message = JSON.toJSONString(event);
kafkaTemplate.send("bill.generate", billInfo.getBillId(), message);

} catch (Exception e) {
log.error("发送账单生成事件失败: error={}", e.getMessage(), e);
}
}
}

4.3 账单查询服务实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
/**
* 账单查询服务实现
* 负责账单查询、统计
*/
@Service
@Slf4j
public class BillQueryService {

@Autowired
private BillMapper billMapper;

/**
* 获取账单信息
*/
public BillInfo getBillInfo(String billId) {
try {
Bill bill = billMapper.selectOne(
new LambdaQueryWrapper<Bill>()
.eq(Bill::getBillId, billId));
if (bill == null) {
throw new BusinessException("账单不存在");
}

return convertToBillInfo(bill);

} catch (Exception e) {
log.error("获取账单信息失败: billId={}, error={}",
billId, e.getMessage(), e);
throw new BusinessException("获取账单信息失败: " + e.getMessage());
}
}

/**
* 获取账单列表
*/
public PageResult<BillInfo> getBillList(String accountId, String userId, String billType,
String status, String startTime, String endTime,
Integer pageNum, Integer pageSize) {
try {
// 1. 构建查询条件
LambdaQueryWrapper<Bill> wrapper = new LambdaQueryWrapper<>();
if (accountId != null && !accountId.isEmpty()) {
wrapper.eq(Bill::getAccountId, accountId);
}
if (userId != null && !userId.isEmpty()) {
wrapper.eq(Bill::getUserId, userId);
}
if (billType != null && !billType.isEmpty()) {
wrapper.eq(Bill::getBillType, billType);
}
if (status != null && !status.isEmpty()) {
wrapper.eq(Bill::getStatus, status);
}
if (startTime != null && !startTime.isEmpty()) {
wrapper.ge(Bill::getCreateTime, LocalDateTime.parse(startTime));
}
if (endTime != null && !endTime.isEmpty()) {
wrapper.le(Bill::getCreateTime, LocalDateTime.parse(endTime));
}
wrapper.orderByDesc(Bill::getCreateTime);

// 2. 分页查询
Page<Bill> page = new Page<>(pageNum, pageSize);
Page<Bill> result = billMapper.selectPage(page, wrapper);

// 3. 转换为账单信息列表
List<BillInfo> billInfoList = result.getRecords().stream()
.map(this::convertToBillInfo)
.collect(Collectors.toList());

// 4. 构建分页结果
PageResult<BillInfo> pageResult = new PageResult<>();
pageResult.setList(billInfoList);
pageResult.setTotal(result.getTotal());
pageResult.setPageNum(pageNum);
pageResult.setPageSize(pageSize);

return pageResult;

} catch (Exception e) {
log.error("获取账单列表失败: error={}", e.getMessage(), e);
throw new BusinessException("获取账单列表失败: " + e.getMessage());
}
}

/**
* 获取账单统计
*/
public BillStatistics getBillStatistics(String accountId, String userId,
String startTime, String endTime) {
try {
// 1. 构建查询条件
LambdaQueryWrapper<Bill> wrapper = new LambdaQueryWrapper<>();
if (accountId != null && !accountId.isEmpty()) {
wrapper.eq(Bill::getAccountId, accountId);
}
if (userId != null && !userId.isEmpty()) {
wrapper.eq(Bill::getUserId, userId);
}
if (startTime != null && !startTime.isEmpty()) {
wrapper.ge(Bill::getCreateTime, LocalDateTime.parse(startTime));
}
if (endTime != null && !endTime.isEmpty()) {
wrapper.le(Bill::getCreateTime, LocalDateTime.parse(endTime));
}

// 2. 查询账单列表
List<Bill> bills = billMapper.selectList(wrapper);

// 3. 统计账单数据
BillStatistics statistics = new BillStatistics();
statistics.setTotalCount(bills.size());
statistics.setTotalIncome(bills.stream()
.map(Bill::getTotalIncome)
.reduce(BigDecimal.ZERO, BigDecimal::add));
statistics.setTotalExpense(bills.stream()
.map(Bill::getTotalExpense)
.reduce(BigDecimal.ZERO, BigDecimal::add));
statistics.setTotalNetAmount(bills.stream()
.map(Bill::getNetAmount)
.reduce(BigDecimal.ZERO, BigDecimal::add));

return statistics;

} catch (Exception e) {
log.error("获取账单统计失败: error={}", e.getMessage(), e);
throw new BusinessException("获取账单统计失败: " + e.getMessage());
}
}

/**
* 转换为账单信息
*/
private BillInfo convertToBillInfo(Bill bill) {
BillInfo billInfo = new BillInfo();
billInfo.setBillId(bill.getBillId());
billInfo.setAccountId(bill.getAccountId());
billInfo.setUserId(bill.getUserId());
billInfo.setBillType(bill.getBillType());
billInfo.setBillPeriod(bill.getBillPeriod());
billInfo.setTotalIncome(bill.getTotalIncome());
billInfo.setTotalExpense(bill.getTotalExpense());
billInfo.setNetAmount(bill.getNetAmount());
billInfo.setStatus(bill.getStatus());
billInfo.setCreateTime(bill.getCreateTime());
billInfo.setUpdateTime(bill.getUpdateTime());
return billInfo;
}
}

5. Kafka消费者实现

5.1 账户事件消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/**
* 账户事件消费者
* 消费Kafka账户事件消息
*/
@Component
@Slf4j
public class AccountEventConsumer {

@Autowired
private AccountService accountService;

/**
* 消费账户创建事件
*/
@KafkaListener(topics = "account.create", groupId = "account-event-group")
public void consumeAccountCreate(String message) {
try {
AccountEvent event = JSON.parseObject(message, AccountEvent.class);

// 处理账户创建事件,如发送通知、更新统计等
log.info("消费账户创建事件: accountId={}", event.getAccountId());

} catch (Exception e) {
log.error("消费账户创建事件失败: error={}", e.getMessage(), e);
}
}

/**
* 消费账户充值事件
*/
@KafkaListener(topics = "account.recharge", groupId = "account-event-group")
public void consumeAccountRecharge(String message) {
try {
AccountRechargeEvent event = JSON.parseObject(message, AccountRechargeEvent.class);

// 处理账户充值事件,如发送通知、更新统计等
log.info("消费账户充值事件: accountId={}, amount={}",
event.getAccountId(), event.getAmount());

} catch (Exception e) {
log.error("消费账户充值事件失败: error={}", e.getMessage(), e);
}
}

/**
* 消费账户提现事件
*/
@KafkaListener(topics = "account.withdraw", groupId = "account-event-group")
public void consumeAccountWithdraw(String message) {
try {
AccountWithdrawEvent event = JSON.parseObject(message, AccountWithdrawEvent.class);

// 处理账户提现事件,如发送通知、更新统计等
log.info("消费账户提现事件: accountId={}, amount={}",
event.getAccountId(), event.getAmount());

} catch (Exception e) {
log.error("消费账户提现事件失败: error={}", e.getMessage(), e);
}
}
}

5.2 流水事件消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* 流水事件消费者
* 消费Kafka流水事件消息
*/
@Component
@Slf4j
public class TransactionEventConsumer {

@Autowired
private TransactionService transactionService;

/**
* 消费流水记录事件
*/
@KafkaListener(topics = "transaction.create", groupId = "transaction-event-group")
public void consumeTransactionCreate(String message) {
try {
TransactionEvent event = JSON.parseObject(message, TransactionEvent.class);

// 处理流水记录事件,如更新统计、生成报表等
log.info("消费流水记录事件: transactionId={}, accountId={}, amount={}",
event.getTransactionId(), event.getAccountId(), event.getAmount());

} catch (Exception e) {
log.error("消费流水记录事件失败: error={}", e.getMessage(), e);
}
}
}

6. 数据模型定义

6.1 账户实体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* 账户实体
*/
@Data
@TableName("t_account")
public class Account {

@TableId(type = IdType.AUTO)
private Long id;

private String accountId;

private String userId;

private String accountType;

private String accountName;

private BigDecimal balance;

private BigDecimal frozenBalance;

private BigDecimal availableBalance;

private String status;

private String freezeReason;

private LocalDateTime freezeTime;

private LocalDateTime createTime;

private LocalDateTime updateTime;
}

6.2 资金流水实体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* 资金流水实体
*/
@Data
@TableName("t_transaction")
public class Transaction {

@TableId(type = IdType.AUTO)
private Long id;

private String transactionId;

private String accountId;

private String userId;

private String transactionType;

private BigDecimal amount;

private BigDecimal beforeBalance;

private BigDecimal afterBalance;

private String sourceType;

private String sourceId;

private String remark;

private LocalDateTime createTime;
}

6.3 账单实体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* 账单实体
*/
@Data
@TableName("t_bill")
public class Bill {

@TableId(type = IdType.AUTO)
private Long id;

private String billId;

private String accountId;

private String userId;

private String billType;

private String billPeriod;

private BigDecimal totalIncome;

private BigDecimal totalExpense;

private BigDecimal netAmount;

private String status;

private LocalDateTime createTime;

private LocalDateTime updateTime;
}

7. 数据库设计

7.1 账户表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
CREATE TABLE `t_account` (
`id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT 'ID',
`account_id` VARCHAR(64) NOT NULL COMMENT '账户ID',
`user_id` VARCHAR(64) NOT NULL COMMENT '用户ID',
`account_type` VARCHAR(32) NOT NULL COMMENT '账户类型:BALANCE-余额账户, POINTS-积分账户, CREDIT-信用账户',
`account_name` VARCHAR(128) NOT NULL COMMENT '账户名称',
`balance` DECIMAL(10,2) NOT NULL DEFAULT 0.00 COMMENT '账户余额',
`frozen_balance` DECIMAL(10,2) NOT NULL DEFAULT 0.00 COMMENT '冻结余额',
`available_balance` DECIMAL(10,2) NOT NULL DEFAULT 0.00 COMMENT '可用余额',
`status` VARCHAR(32) NOT NULL DEFAULT 'ACTIVE' COMMENT '状态:ACTIVE-激活, FROZEN-冻结, CLOSED-关闭',
`freeze_reason` VARCHAR(512) DEFAULT NULL COMMENT '冻结原因',
`freeze_time` DATETIME DEFAULT NULL COMMENT '冻结时间',
`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_account_id` (`account_id`),
KEY `idx_user_id` (`user_id`),
KEY `idx_account_type` (`account_type`),
KEY `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='账户表';

7.2 资金流水表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
CREATE TABLE `t_transaction` (
`id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT 'ID',
`transaction_id` VARCHAR(64) NOT NULL COMMENT '流水ID',
`account_id` VARCHAR(64) NOT NULL COMMENT '账户ID',
`user_id` VARCHAR(64) NOT NULL COMMENT '用户ID',
`transaction_type` VARCHAR(32) NOT NULL COMMENT '流水类型:RECHARGE-充值, WITHDRAW-提现, INCOME-收入, EXPENSE-支出, TRANSFER-转账',
`amount` DECIMAL(10,2) NOT NULL COMMENT '金额',
`before_balance` DECIMAL(10,2) NOT NULL COMMENT '变更前余额',
`after_balance` DECIMAL(10,2) NOT NULL COMMENT '变更后余额',
`source_type` VARCHAR(32) DEFAULT NULL COMMENT '来源类型:ORDER-订单, REFUND-退款, ACTIVITY-活动',
`source_id` VARCHAR(64) DEFAULT NULL COMMENT '来源ID',
`remark` VARCHAR(512) DEFAULT NULL COMMENT '备注',
`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_transaction_id` (`transaction_id`),
KEY `idx_account_id` (`account_id`),
KEY `idx_user_id` (`user_id`),
KEY `idx_transaction_type` (`transaction_type`),
KEY `idx_create_time` (`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='资金流水表';

7.3 账单表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
CREATE TABLE `t_bill` (
`id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT 'ID',
`bill_id` VARCHAR(64) NOT NULL COMMENT '账单ID',
`account_id` VARCHAR(64) NOT NULL COMMENT '账户ID',
`user_id` VARCHAR(64) NOT NULL COMMENT '用户ID',
`bill_type` VARCHAR(32) NOT NULL COMMENT '账单类型:DAILY-日账单, MONTHLY-月账单, YEARLY-年账单',
`bill_period` VARCHAR(128) NOT NULL COMMENT '账单期间',
`total_income` DECIMAL(10,2) NOT NULL DEFAULT 0.00 COMMENT '总收入',
`total_expense` DECIMAL(10,2) NOT NULL DEFAULT 0.00 COMMENT '总支出',
`net_amount` DECIMAL(10,2) NOT NULL DEFAULT 0.00 COMMENT '净额',
`status` VARCHAR(32) NOT NULL DEFAULT 'GENERATED' COMMENT '状态:GENERATED-已生成, CONFIRMED-已确认, RECONCILED-已对账',
`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_bill_id` (`bill_id`),
KEY `idx_account_id` (`account_id`),
KEY `idx_user_id` (`user_id`),
KEY `idx_bill_type` (`bill_type`),
KEY `idx_create_time` (`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='账单表';

8. 配置类

8.1 Kafka配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# application.yml
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
acks: all
retries: 3
consumer:
group-id: finance-service-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
enable-auto-commit: false
auto-offset-reset: latest

8.2 Redis配置

1
2
3
4
5
6
7
8
9
10
11
12
13
# application.yml
spring:
redis:
host: localhost
port: 6379
password:
database: 0
timeout: 3000
lettuce:
pool:
max-active: 8
max-idle: 8
min-idle: 0

9. 总结

本文深入讲解了财务管理完整逻辑代码的Java微服务后端架构实战,涵盖了以下核心内容:

  1. 账户服务:实现账户创建、查询、充值、提现、冻结/解冻、统计
  2. 流水服务:实现资金流水记录、收入、支出、转账、查询、统计
  3. 账单服务:实现账单生成、查询、统计、对账
  4. 对账服务:实现对账任务创建、处理、结果查询、差异处理
  5. 报表服务:实现收入报表、支出报表、利润报表、财务报表
  6. Kafka消息队列:实现账户、流水、账单、对账的异步处理和消息通知
  7. 多服务协作:通过Kafka和Feign实现账户服务、流水服务、账单服务、对账服务之间的解耦和协作
  8. 分布式事务:通过分布式锁、事务管理保证数据一致性
  9. 数据库设计:完整的账户、流水、账单、对账表设计
  10. 缓存优化:通过Redis缓存账户余额、分布式锁、统计数据
  11. 性能优化:通过连接池、批量处理、异步处理提升系统性能

通过本文的学习,读者可以掌握如何基于Java微服务架构实现一个完整的财务管理系统,包含账户管理、资金流水、账单管理、对账管理、多服务协作等完整业务逻辑代码,为实际项目的财务管理开发提供参考和指导。