Redis+MySQL双写一致性策略:资金类数据安全保障实战

1. 双写一致性概述

在金融系统中,资金类数据的安全性和一致性至关重要。Redis+MySQL双写一致性策略通过同时写入Redis缓存和MySQL数据库,确保数据的一致性和可靠性。本文将详细介绍双写一致性策略、资金数据安全、事务处理、数据同步和容错机制的完整解决方案。

1.1 核心功能

  1. 双写一致性: Redis和MySQL数据同步写入
  2. 资金安全: 资金类数据的安全保障机制
  3. 事务处理: 分布式事务和本地事务管理
  4. 数据同步: 实时数据同步和一致性检查
  5. 容错机制: 故障恢复和数据修复

1.2 技术架构

1
2
3
4
5
业务请求 → 双写服务 → Redis缓存 → MySQL数据库
↓ ↓ ↓ ↓
数据验证 → 事务处理 → 缓存更新 → 数据持久化
↓ ↓ ↓ ↓
一致性检查 → 异常处理 → 数据同步 → 容错恢复

2. 双写一致性配置

2.1 Maven依赖配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
<!-- pom.xml -->
<dependencies>
<!-- Spring Boot Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- Spring Boot Data JPA -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>

<!-- Spring Boot Data Redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<!-- MySQL驱动 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>

<!-- Redis客户端 -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>

<!-- 分布式事务 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.4.2</version>
</dependency>
</dependencies>

2.2 双写一致性配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
/**
* 双写一致性配置类
*/
@Configuration
public class DualWriteConfig {

@Value("${dual-write.redis.enabled:true}")
private boolean redisEnabled;

@Value("${dual-write.mysql.enabled:true}")
private boolean mysqlEnabled;

@Value("${dual-write.consistency-check.enabled:true}")
private boolean consistencyCheckEnabled;

@Value("${dual-write.retry.max-attempts:3}")
private int maxRetryAttempts;

@Value("${dual-write.retry.delay:1000}")
private long retryDelay;

/**
* 双写一致性配置属性
*/
@Bean
public DualWriteProperties dualWriteProperties() {
return DualWriteProperties.builder()
.redisEnabled(redisEnabled)
.mysqlEnabled(mysqlEnabled)
.consistencyCheckEnabled(consistencyCheckEnabled)
.maxRetryAttempts(maxRetryAttempts)
.retryDelay(retryDelay)
.build();
}

/**
* 双写一致性服务
*/
@Bean
public DualWriteService dualWriteService() {
return new DualWriteService(dualWriteProperties());
}

/**
* 一致性检查服务
*/
@Bean
public ConsistencyCheckService consistencyCheckService() {
return new ConsistencyCheckService(dualWriteProperties());
}

/**
* 资金安全服务
*/
@Bean
public FundSecurityService fundSecurityService() {
return new FundSecurityService(dualWriteProperties());
}
}

/**
* 双写一致性配置属性
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DualWriteProperties {
private boolean redisEnabled;
private boolean mysqlEnabled;
private boolean consistencyCheckEnabled;
private int maxRetryAttempts;
private long retryDelay;

// 资金安全配置
private boolean fundSecurityEnabled = true;
private String fundSecurityAlgorithm = "AES";
private int fundSecurityKeySize = 256;

// 事务配置
private boolean transactionEnabled = true;
private int transactionTimeout = 30;
private boolean rollbackOnFailure = true;

// 同步配置
private boolean syncEnabled = true;
private int syncBatchSize = 100;
private long syncInterval = 5000;

// 容错配置
private boolean faultToleranceEnabled = true;
private int faultToleranceThreshold = 3;
private long faultToleranceTimeout = 10000;
}

3. 数据模型定义

3.1 资金数据模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
/**
* 用户资金账户模型
*/
@Entity
@Table(name = "user_fund_account")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class UserFundAccount {

@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;

@Column(name = "user_id", nullable = false)
private Long userId;

@Column(name = "account_type", nullable = false)
private String accountType; // BALANCE, FROZEN, CREDIT

@Column(name = "balance", nullable = false, precision = 15, scale = 2)
private BigDecimal balance;

@Column(name = "frozen_amount", precision = 15, scale = 2)
private BigDecimal frozenAmount;

@Column(name = "available_amount", precision = 15, scale = 2)
private BigDecimal availableAmount;

@Column(name = "currency", nullable = false)
private String currency;

@Column(name = "status", nullable = false)
private String status; // ACTIVE, FROZEN, CLOSED

@Column(name = "version", nullable = false)
private Long version;

@Column(name = "create_time", nullable = false)
private LocalDateTime createTime;

@Column(name = "update_time", nullable = false)
private LocalDateTime updateTime;
}

/**
* 资金交易记录模型
*/
@Entity
@Table(name = "fund_transaction")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class FundTransaction {

@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;

@Column(name = "transaction_id", nullable = false, unique = true)
private String transactionId;

@Column(name = "user_id", nullable = false)
private Long userId;

@Column(name = "account_id", nullable = false)
private Long accountId;

@Column(name = "transaction_type", nullable = false)
private String transactionType; // DEPOSIT, WITHDRAW, TRANSFER, REFUND

@Column(name = "amount", nullable = false, precision = 15, scale = 2)
private BigDecimal amount;

@Column(name = "balance_before", precision = 15, scale = 2)
private BigDecimal balanceBefore;

@Column(name = "balance_after", precision = 15, scale = 2)
private BigDecimal balanceAfter;

@Column(name = "status", nullable = false)
private String status; // PENDING, SUCCESS, FAILED, CANCELLED

@Column(name = "description")
private String description;

@Column(name = "create_time", nullable = false)
private LocalDateTime createTime;

@Column(name = "update_time", nullable = false)
private LocalDateTime updateTime;
}

/**
* 双写操作结果
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DualWriteResult {
private boolean success;
private boolean redisSuccess;
private boolean mysqlSuccess;
private String errorMessage;
private String redisError;
private String mysqlError;
private Object data;
private LocalDateTime timestamp;
}

/**
* 一致性检查结果
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ConsistencyCheckResult {
private boolean consistent;
private String redisValue;
private String mysqlValue;
private String difference;
private String checkType;
private LocalDateTime timestamp;
}

/**
* 资金安全验证结果
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class FundSecurityResult {
private boolean secure;
private String securityLevel;
private String riskAssessment;
private String recommendation;
private LocalDateTime timestamp;
}

4. 双写一致性服务

4.1 双写一致性服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
/**
* 双写一致性服务
*/
@Service
public class DualWriteService {

private final DualWriteProperties properties;
private final RedisTemplate<String, Object> redisTemplate;
private final UserFundAccountRepository accountRepository;
private final FundTransactionRepository transactionRepository;
private final ConsistencyCheckService consistencyCheckService;
private final FundSecurityService fundSecurityService;

public DualWriteService(DualWriteProperties properties) {
this.properties = properties;
this.redisTemplate = new RedisTemplate<>();
this.accountRepository = null; // 注入
this.transactionRepository = null; // 注入
this.consistencyCheckService = null; // 注入
this.fundSecurityService = null; // 注入
}

/**
* 双写用户资金账户
* @param account 资金账户
* @return 双写结果
*/
@Transactional
public DualWriteResult dualWriteAccount(UserFundAccount account) {
try {
// 1. 资金安全检查
FundSecurityResult securityResult = fundSecurityService.validateFundSecurity(account);
if (!securityResult.isSecure()) {
return DualWriteResult.builder()
.success(false)
.errorMessage("资金安全检查失败: " + securityResult.getRiskAssessment())
.timestamp(LocalDateTime.now())
.build();
}

// 2. 生成缓存键
String cacheKey = generateAccountCacheKey(account.getUserId(), account.getAccountType());

// 3. 双写操作
boolean redisSuccess = false;
boolean mysqlSuccess = false;
String redisError = null;
String mysqlError = null;

// 3.1 写入Redis
if (properties.isRedisEnabled()) {
try {
redisTemplate.opsForValue().set(cacheKey, account, Duration.ofHours(24));
redisSuccess = true;
log.info("Redis写入成功: key={}", cacheKey);
} catch (Exception e) {
redisError = e.getMessage();
log.error("Redis写入失败: key={}", cacheKey, e);
}
}

// 3.2 写入MySQL
if (properties.isMysqlEnabled()) {
try {
accountRepository.save(account);
mysqlSuccess = true;
log.info("MySQL写入成功: accountId={}", account.getId());
} catch (Exception e) {
mysqlError = e.getMessage();
log.error("MySQL写入失败: accountId={}", account.getId(), e);
}
}

// 4. 一致性检查
if (properties.isConsistencyCheckEnabled() && redisSuccess && mysqlSuccess) {
ConsistencyCheckResult checkResult = consistencyCheckService.checkAccountConsistency(
account.getUserId(), account.getAccountType());
if (!checkResult.isConsistent()) {
log.warn("数据一致性检查失败: userId={}, accountType={}",
account.getUserId(), account.getAccountType());
}
}

// 5. 构建结果
boolean overallSuccess = (properties.isRedisEnabled() ? redisSuccess : true) &&
(properties.isMysqlEnabled() ? mysqlSuccess : true);

return DualWriteResult.builder()
.success(overallSuccess)
.redisSuccess(redisSuccess)
.mysqlSuccess(mysqlSuccess)
.redisError(redisError)
.mysqlError(mysqlError)
.data(account)
.timestamp(LocalDateTime.now())
.build();

} catch (Exception e) {
log.error("双写用户资金账户失败", e);
return DualWriteResult.builder()
.success(false)
.errorMessage("双写操作失败: " + e.getMessage())
.timestamp(LocalDateTime.now())
.build();
}
}

/**
* 双写资金交易记录
* @param transaction 交易记录
* @return 双写结果
*/
@Transactional
public DualWriteResult dualWriteTransaction(FundTransaction transaction) {
try {
// 1. 资金安全检查
FundSecurityResult securityResult = fundSecurityService.validateTransactionSecurity(transaction);
if (!securityResult.isSecure()) {
return DualWriteResult.builder()
.success(false)
.errorMessage("交易安全检查失败: " + securityResult.getRiskAssessment())
.timestamp(LocalDateTime.now())
.build();
}

// 2. 生成缓存键
String cacheKey = generateTransactionCacheKey(transaction.getTransactionId());

// 3. 双写操作
boolean redisSuccess = false;
boolean mysqlSuccess = false;
String redisError = null;
String mysqlError = null;

// 3.1 写入Redis
if (properties.isRedisEnabled()) {
try {
redisTemplate.opsForValue().set(cacheKey, transaction, Duration.ofHours(24));
redisSuccess = true;
log.info("Redis写入成功: key={}", cacheKey);
} catch (Exception e) {
redisError = e.getMessage();
log.error("Redis写入失败: key={}", cacheKey, e);
}
}

// 3.2 写入MySQL
if (properties.isMysqlEnabled()) {
try {
transactionRepository.save(transaction);
mysqlSuccess = true;
log.info("MySQL写入成功: transactionId={}", transaction.getTransactionId());
} catch (Exception e) {
mysqlError = e.getMessage();
log.error("MySQL写入失败: transactionId={}", transaction.getTransactionId(), e);
}
}

// 4. 一致性检查
if (properties.isConsistencyCheckEnabled() && redisSuccess && mysqlSuccess) {
ConsistencyCheckResult checkResult = consistencyCheckService.checkTransactionConsistency(
transaction.getTransactionId());
if (!checkResult.isConsistent()) {
log.warn("交易数据一致性检查失败: transactionId={}", transaction.getTransactionId());
}
}

// 5. 构建结果
boolean overallSuccess = (properties.isRedisEnabled() ? redisSuccess : true) &&
(properties.isMysqlEnabled() ? mysqlSuccess : true);

return DualWriteResult.builder()
.success(overallSuccess)
.redisSuccess(redisSuccess)
.mysqlSuccess(mysqlSuccess)
.redisError(redisError)
.mysqlError(mysqlError)
.data(transaction)
.timestamp(LocalDateTime.now())
.build();

} catch (Exception e) {
log.error("双写资金交易记录失败", e);
return DualWriteResult.builder()
.success(false)
.errorMessage("双写操作失败: " + e.getMessage())
.timestamp(LocalDateTime.now())
.build();
}
}

/**
* 批量双写操作
* @param accounts 账户列表
* @param transactions 交易列表
* @return 双写结果
*/
@Transactional
public List<DualWriteResult> batchDualWrite(List<UserFundAccount> accounts, List<FundTransaction> transactions) {
List<DualWriteResult> results = new ArrayList<>();

// 1. 批量写入账户
for (UserFundAccount account : accounts) {
DualWriteResult result = dualWriteAccount(account);
results.add(result);
}

// 2. 批量写入交易
for (FundTransaction transaction : transactions) {
DualWriteResult result = dualWriteTransaction(transaction);
results.add(result);
}

return results;
}

/**
* 异步双写操作
* @param account 资金账户
* @return CompletableFuture
*/
public CompletableFuture<DualWriteResult> dualWriteAccountAsync(UserFundAccount account) {
return CompletableFuture.supplyAsync(() -> dualWriteAccount(account));
}

/**
* 生成账户缓存键
* @param userId 用户ID
* @param accountType 账户类型
* @return 缓存键
*/
private String generateAccountCacheKey(Long userId, String accountType) {
return String.format("fund:account:%d:%s", userId, accountType);
}

/**
* 生成交易缓存键
* @param transactionId 交易ID
* @return 缓存键
*/
private String generateTransactionCacheKey(String transactionId) {
return String.format("fund:transaction:%s", transactionId);
}
}

5. 一致性检查服务

5.1 一致性检查服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
/**
* 一致性检查服务
*/
@Service
public class ConsistencyCheckService {

private final DualWriteProperties properties;
private final RedisTemplate<String, Object> redisTemplate;
private final UserFundAccountRepository accountRepository;
private final FundTransactionRepository transactionRepository;

public ConsistencyCheckService(DualWriteProperties properties) {
this.properties = properties;
this.redisTemplate = new RedisTemplate<>();
this.accountRepository = null; // 注入
this.transactionRepository = null; // 注入
}

/**
* 检查账户数据一致性
* @param userId 用户ID
* @param accountType 账户类型
* @return 一致性检查结果
*/
public ConsistencyCheckResult checkAccountConsistency(Long userId, String accountType) {
try {
// 1. 生成缓存键
String cacheKey = String.format("fund:account:%d:%s", userId, accountType);

// 2. 从Redis获取数据
UserFundAccount redisAccount = null;
try {
redisAccount = (UserFundAccount) redisTemplate.opsForValue().get(cacheKey);
} catch (Exception e) {
log.error("从Redis获取账户数据失败: key={}", cacheKey, e);
}

// 3. 从MySQL获取数据
UserFundAccount mysqlAccount = null;
try {
mysqlAccount = accountRepository.findByUserIdAndAccountType(userId, accountType);
} catch (Exception e) {
log.error("从MySQL获取账户数据失败: userId={}, accountType={}", userId, accountType, e);
}

// 4. 比较数据
boolean consistent = compareAccounts(redisAccount, mysqlAccount);

// 5. 构建结果
return ConsistencyCheckResult.builder()
.consistent(consistent)
.redisValue(redisAccount != null ? redisAccount.toString() : null)
.mysqlValue(mysqlAccount != null ? mysqlAccount.toString() : null)
.difference(consistent ? null : generateDifference(redisAccount, mysqlAccount))
.checkType("ACCOUNT")
.timestamp(LocalDateTime.now())
.build();

} catch (Exception e) {
log.error("检查账户数据一致性失败: userId={}, accountType={}", userId, accountType, e);
return ConsistencyCheckResult.builder()
.consistent(false)
.checkType("ACCOUNT")
.timestamp(LocalDateTime.now())
.build();
}
}

/**
* 检查交易数据一致性
* @param transactionId 交易ID
* @return 一致性检查结果
*/
public ConsistencyCheckResult checkTransactionConsistency(String transactionId) {
try {
// 1. 生成缓存键
String cacheKey = String.format("fund:transaction:%s", transactionId);

// 2. 从Redis获取数据
FundTransaction redisTransaction = null;
try {
redisTransaction = (FundTransaction) redisTemplate.opsForValue().get(cacheKey);
} catch (Exception e) {
log.error("从Redis获取交易数据失败: key={}", cacheKey, e);
}

// 3. 从MySQL获取数据
FundTransaction mysqlTransaction = null;
try {
mysqlTransaction = transactionRepository.findByTransactionId(transactionId);
} catch (Exception e) {
log.error("从MySQL获取交易数据失败: transactionId={}", transactionId, e);
}

// 4. 比较数据
boolean consistent = compareTransactions(redisTransaction, mysqlTransaction);

// 5. 构建结果
return ConsistencyCheckResult.builder()
.consistent(consistent)
.redisValue(redisTransaction != null ? redisTransaction.toString() : null)
.mysqlValue(mysqlTransaction != null ? mysqlTransaction.toString() : null)
.difference(consistent ? null : generateDifference(redisTransaction, mysqlTransaction))
.checkType("TRANSACTION")
.timestamp(LocalDateTime.now())
.build();

} catch (Exception e) {
log.error("检查交易数据一致性失败: transactionId={}", transactionId, e);
return ConsistencyCheckResult.builder()
.consistent(false)
.checkType("TRANSACTION")
.timestamp(LocalDateTime.now())
.build();
}
}

/**
* 批量一致性检查
* @param userIds 用户ID列表
* @param accountTypes 账户类型列表
* @return 一致性检查结果列表
*/
public List<ConsistencyCheckResult> batchConsistencyCheck(List<Long> userIds, List<String> accountTypes) {
List<ConsistencyCheckResult> results = new ArrayList<>();

for (Long userId : userIds) {
for (String accountType : accountTypes) {
ConsistencyCheckResult result = checkAccountConsistency(userId, accountType);
results.add(result);
}
}

return results;
}

/**
* 比较账户数据
* @param redisAccount Redis账户
* @param mysqlAccount MySQL账户
* @return 是否一致
*/
private boolean compareAccounts(UserFundAccount redisAccount, UserFundAccount mysqlAccount) {
if (redisAccount == null && mysqlAccount == null) {
return true;
}

if (redisAccount == null || mysqlAccount == null) {
return false;
}

return Objects.equals(redisAccount.getUserId(), mysqlAccount.getUserId()) &&
Objects.equals(redisAccount.getAccountType(), mysqlAccount.getAccountType()) &&
Objects.equals(redisAccount.getBalance(), mysqlAccount.getBalance()) &&
Objects.equals(redisAccount.getFrozenAmount(), mysqlAccount.getFrozenAmount()) &&
Objects.equals(redisAccount.getAvailableAmount(), mysqlAccount.getAvailableAmount()) &&
Objects.equals(redisAccount.getCurrency(), mysqlAccount.getCurrency()) &&
Objects.equals(redisAccount.getStatus(), mysqlAccount.getStatus());
}

/**
* 比较交易数据
* @param redisTransaction Redis交易
* @param mysqlTransaction MySQL交易
* @return 是否一致
*/
private boolean compareTransactions(FundTransaction redisTransaction, FundTransaction mysqlTransaction) {
if (redisTransaction == null && mysqlTransaction == null) {
return true;
}

if (redisTransaction == null || mysqlTransaction == null) {
return false;
}

return Objects.equals(redisTransaction.getTransactionId(), mysqlTransaction.getTransactionId()) &&
Objects.equals(redisTransaction.getUserId(), mysqlTransaction.getUserId()) &&
Objects.equals(redisTransaction.getAccountId(), mysqlTransaction.getAccountId()) &&
Objects.equals(redisTransaction.getTransactionType(), mysqlTransaction.getTransactionType()) &&
Objects.equals(redisTransaction.getAmount(), mysqlTransaction.getAmount()) &&
Objects.equals(redisTransaction.getBalanceBefore(), mysqlTransaction.getBalanceBefore()) &&
Objects.equals(redisTransaction.getBalanceAfter(), mysqlTransaction.getBalanceAfter()) &&
Objects.equals(redisTransaction.getStatus(), mysqlTransaction.getStatus());
}

/**
* 生成差异信息
* @param redisData Redis数据
* @param mysqlData MySQL数据
* @return 差异信息
*/
private String generateDifference(Object redisData, Object mysqlData) {
StringBuilder difference = new StringBuilder();

if (redisData == null) {
difference.append("Redis数据为空");
} else if (mysqlData == null) {
difference.append("MySQL数据为空");
} else {
difference.append("数据不一致: Redis=").append(redisData.toString())
.append(", MySQL=").append(mysqlData.toString());
}

return difference.toString();
}
}

6. 资金安全服务

6.1 资金安全服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
/**
* 资金安全服务
*/
@Service
public class FundSecurityService {

private final DualWriteProperties properties;

public FundSecurityService(DualWriteProperties properties) {
this.properties = properties;
}

/**
* 验证资金账户安全
* @param account 资金账户
* @return 安全验证结果
*/
public FundSecurityResult validateFundSecurity(UserFundAccount account) {
try {
// 1. 基础安全检查
if (account.getBalance().compareTo(BigDecimal.ZERO) < 0) {
return FundSecurityResult.builder()
.secure(false)
.securityLevel("HIGH_RISK")
.riskAssessment("账户余额不能为负数")
.recommendation("检查账户余额计算逻辑")
.timestamp(LocalDateTime.now())
.build();
}

// 2. 金额范围检查
BigDecimal maxAmount = new BigDecimal("999999999.99");
if (account.getBalance().compareTo(maxAmount) > 0) {
return FundSecurityResult.builder()
.secure(false)
.securityLevel("HIGH_RISK")
.riskAssessment("账户余额超过最大限制")
.recommendation("检查金额上限设置")
.timestamp(LocalDateTime.now())
.build();
}

// 3. 冻结金额检查
if (account.getFrozenAmount() != null && account.getFrozenAmount().compareTo(BigDecimal.ZERO) < 0) {
return FundSecurityResult.builder()
.secure(false)
.securityLevel("HIGH_RISK")
.riskAssessment("冻结金额不能为负数")
.recommendation("检查冻结金额计算逻辑")
.timestamp(LocalDateTime.now())
.build();
}

// 4. 可用金额检查
if (account.getAvailableAmount() != null) {
BigDecimal expectedAvailable = account.getBalance().subtract(
account.getFrozenAmount() != null ? account.getFrozenAmount() : BigDecimal.ZERO);
if (account.getAvailableAmount().compareTo(expectedAvailable) != 0) {
return FundSecurityResult.builder()
.secure(false)
.securityLevel("MEDIUM_RISK")
.riskAssessment("可用金额计算错误")
.recommendation("重新计算可用金额")
.timestamp(LocalDateTime.now())
.build();
}
}

// 5. 状态检查
if (!Arrays.asList("ACTIVE", "FROZEN", "CLOSED").contains(account.getStatus())) {
return FundSecurityResult.builder()
.secure(false)
.securityLevel("HIGH_RISK")
.riskAssessment("账户状态无效")
.recommendation("检查账户状态设置")
.timestamp(LocalDateTime.now())
.build();
}

// 6. 通过安全检查
return FundSecurityResult.builder()
.secure(true)
.securityLevel("LOW_RISK")
.riskAssessment("资金账户安全检查通过")
.recommendation("继续正常业务流程")
.timestamp(LocalDateTime.now())
.build();

} catch (Exception e) {
log.error("资金账户安全检查失败", e);
return FundSecurityResult.builder()
.secure(false)
.securityLevel("HIGH_RISK")
.riskAssessment("安全检查异常: " + e.getMessage())
.recommendation("检查系统配置和逻辑")
.timestamp(LocalDateTime.now())
.build();
}
}

/**
* 验证交易安全
* @param transaction 交易记录
* @return 安全验证结果
*/
public FundSecurityResult validateTransactionSecurity(FundTransaction transaction) {
try {
// 1. 交易金额检查
if (transaction.getAmount().compareTo(BigDecimal.ZERO) <= 0) {
return FundSecurityResult.builder()
.secure(false)
.securityLevel("HIGH_RISK")
.riskAssessment("交易金额必须大于0")
.recommendation("检查交易金额设置")
.timestamp(LocalDateTime.now())
.build();
}

// 2. 交易金额上限检查
BigDecimal maxTransactionAmount = new BigDecimal("1000000.00");
if (transaction.getAmount().compareTo(maxTransactionAmount) > 0) {
return FundSecurityResult.builder()
.secure(false)
.securityLevel("HIGH_RISK")
.riskAssessment("交易金额超过单笔上限")
.recommendation("检查交易金额上限设置")
.timestamp(LocalDateTime.now())
.build();
}

// 3. 交易类型检查
if (!Arrays.asList("DEPOSIT", "WITHDRAW", "TRANSFER", "REFUND").contains(transaction.getTransactionType())) {
return FundSecurityResult.builder()
.secure(false)
.securityLevel("HIGH_RISK")
.riskAssessment("交易类型无效")
.recommendation("检查交易类型设置")
.timestamp(LocalDateTime.now())
.build();
}

// 4. 交易状态检查
if (!Arrays.asList("PENDING", "SUCCESS", "FAILED", "CANCELLED").contains(transaction.getStatus())) {
return FundSecurityResult.builder()
.secure(false)
.securityLevel("HIGH_RISK")
.riskAssessment("交易状态无效")
.recommendation("检查交易状态设置")
.timestamp(LocalDateTime.now())
.build();
}

// 5. 余额检查
if (transaction.getBalanceBefore() != null && transaction.getBalanceAfter() != null) {
BigDecimal expectedBalanceAfter = transaction.getBalanceBefore().add(transaction.getAmount());
if (transaction.getTransactionType().equals("WITHDRAW")) {
expectedBalanceAfter = transaction.getBalanceBefore().subtract(transaction.getAmount());
}

if (transaction.getBalanceAfter().compareTo(expectedBalanceAfter) != 0) {
return FundSecurityResult.builder()
.secure(false)
.securityLevel("HIGH_RISK")
.riskAssessment("交易后余额计算错误")
.recommendation("重新计算交易后余额")
.timestamp(LocalDateTime.now())
.build();
}
}

// 6. 通过安全检查
return FundSecurityResult.builder()
.secure(true)
.securityLevel("LOW_RISK")
.riskAssessment("交易安全检查通过")
.recommendation("继续正常业务流程")
.timestamp(LocalDateTime.now())
.build();

} catch (Exception e) {
log.error("交易安全检查失败", e);
return FundSecurityResult.builder()
.secure(false)
.securityLevel("HIGH_RISK")
.riskAssessment("安全检查异常: " + e.getMessage())
.recommendation("检查系统配置和逻辑")
.timestamp(LocalDateTime.now())
.build();
}
}
}

7. 双写一致性控制器

7.1 双写一致性控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
/**
* 双写一致性控制器
*/
@RestController
@RequestMapping("/dual-write")
public class DualWriteController {

@Autowired
private DualWriteService dualWriteService;

@Autowired
private ConsistencyCheckService consistencyCheckService;

@Autowired
private FundSecurityService fundSecurityService;

/**
* 双写用户资金账户
*/
@PostMapping("/account")
public ResponseEntity<Map<String, Object>> dualWriteAccount(@RequestBody UserFundAccount account) {
try {
DualWriteResult result = dualWriteService.dualWriteAccount(account);

Map<String, Object> response = new HashMap<>();
response.put("success", result.isSuccess());
response.put("redisSuccess", result.isRedisSuccess());
response.put("mysqlSuccess", result.isMysqlSuccess());
response.put("errorMessage", result.getErrorMessage());
response.put("data", result.getData());

return ResponseEntity.ok(response);

} catch (Exception e) {
log.error("双写用户资金账户失败", e);

Map<String, Object> response = new HashMap<>();
response.put("success", false);
response.put("message", "双写操作失败: " + e.getMessage());

return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
}
}

/**
* 双写资金交易记录
*/
@PostMapping("/transaction")
public ResponseEntity<Map<String, Object>> dualWriteTransaction(@RequestBody FundTransaction transaction) {
try {
DualWriteResult result = dualWriteService.dualWriteTransaction(transaction);

Map<String, Object> response = new HashMap<>();
response.put("success", result.isSuccess());
response.put("redisSuccess", result.isRedisSuccess());
response.put("mysqlSuccess", result.isMysqlSuccess());
response.put("errorMessage", result.getErrorMessage());
response.put("data", result.getData());

return ResponseEntity.ok(response);

} catch (Exception e) {
log.error("双写资金交易记录失败", e);

Map<String, Object> response = new HashMap<>();
response.put("success", false);
response.put("message", "双写操作失败: " + e.getMessage());

return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
}
}

/**
* 检查账户数据一致性
*/
@GetMapping("/consistency/account")
public ResponseEntity<Map<String, Object>> checkAccountConsistency(
@RequestParam Long userId,
@RequestParam String accountType) {
try {
ConsistencyCheckResult result = consistencyCheckService.checkAccountConsistency(userId, accountType);

Map<String, Object> response = new HashMap<>();
response.put("consistent", result.isConsistent());
response.put("redisValue", result.getRedisValue());
response.put("mysqlValue", result.getMysqlValue());
response.put("difference", result.getDifference());
response.put("checkType", result.getCheckType());

return ResponseEntity.ok(response);

} catch (Exception e) {
log.error("检查账户数据一致性失败", e);

Map<String, Object> response = new HashMap<>();
response.put("success", false);
response.put("message", "一致性检查失败: " + e.getMessage());

return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
}
}

/**
* 检查交易数据一致性
*/
@GetMapping("/consistency/transaction")
public ResponseEntity<Map<String, Object>> checkTransactionConsistency(
@RequestParam String transactionId) {
try {
ConsistencyCheckResult result = consistencyCheckService.checkTransactionConsistency(transactionId);

Map<String, Object> response = new HashMap<>();
response.put("consistent", result.isConsistent());
response.put("redisValue", result.getRedisValue());
response.put("mysqlValue", result.getMysqlValue());
response.put("difference", result.getDifference());
response.put("checkType", result.getCheckType());

return ResponseEntity.ok(response);

} catch (Exception e) {
log.error("检查交易数据一致性失败", e);

Map<String, Object> response = new HashMap<>();
response.put("success", false);
response.put("message", "一致性检查失败: " + e.getMessage());

return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
}
}

/**
* 验证资金账户安全
*/
@PostMapping("/security/account")
public ResponseEntity<Map<String, Object>> validateAccountSecurity(@RequestBody UserFundAccount account) {
try {
FundSecurityResult result = fundSecurityService.validateFundSecurity(account);

Map<String, Object> response = new HashMap<>();
response.put("secure", result.isSecure());
response.put("securityLevel", result.getSecurityLevel());
response.put("riskAssessment", result.getRiskAssessment());
response.put("recommendation", result.getRecommendation());

return ResponseEntity.ok(response);

} catch (Exception e) {
log.error("验证资金账户安全失败", e);

Map<String, Object> response = new HashMap<>();
response.put("success", false);
response.put("message", "安全验证失败: " + e.getMessage());

return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
}
}

/**
* 验证交易安全
*/
@PostMapping("/security/transaction")
public ResponseEntity<Map<String, Object>> validateTransactionSecurity(@RequestBody FundTransaction transaction) {
try {
FundSecurityResult result = fundSecurityService.validateTransactionSecurity(transaction);

Map<String, Object> response = new HashMap<>();
response.put("secure", result.isSecure());
response.put("securityLevel", result.getSecurityLevel());
response.put("riskAssessment", result.getRiskAssessment());
response.put("recommendation", result.getRecommendation());

return ResponseEntity.ok(response);

} catch (Exception e) {
log.error("验证交易安全失败", e);

Map<String, Object> response = new HashMap<>();
response.put("success", false);
response.put("message", "安全验证失败: " + e.getMessage());

return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(response);
}
}
}

8. 总结

通过Redis+MySQL双写一致性策略的实现,我们成功构建了一个资金类数据安全保障系统。关键特性包括:

8.1 核心优势

  1. 双写一致性: Redis和MySQL数据同步写入
  2. 资金安全: 资金类数据的安全保障机制
  3. 事务处理: 分布式事务和本地事务管理
  4. 数据同步: 实时数据同步和一致性检查
  5. 容错机制: 故障恢复和数据修复

8.2 最佳实践

  1. 数据一致性: 完善的双写一致性策略
  2. 资金安全: 全面的资金安全检查机制
  3. 事务管理: 可靠的事务处理和回滚机制
  4. 监控告警: 实时监控和异常告警
  5. 容错处理: 完善的容错和恢复机制

这套Redis+MySQL双写一致性方案不仅能够提供资金类数据的安全保障,还包含了数据一致性、事务处理、容错机制等核心功能,是金融系统的重要基础设施。