1. 高并发MySQL架构概述

在互联网应用中,如何设计一个支持每日亿级写入和10万QPS的MySQL架构是一个极具挑战性的问题。本文将详细介绍基于Java的高并发MySQL架构设计,包括分库分表、读写分离、缓存优化、性能调优的完整解决方案。

1.1 性能指标分析

  • 每日亿级写入: 1亿次/86400秒 ≈ 平均约1157次/秒写入操作(峰值会更高)
  • 10万QPS: 每秒10万次查询操作
  • 数据量: 假设单条记录1KB,每日新增约100GB数据
  • 存储需求: 年增长约36TB数据

1.2 技术架构

1
2
3
应用层 → 分库分表中间件 → 读写分离 → MySQL集群 → 缓存层
↓ ↓ ↓ ↓ ↓
负载均衡 → 路由规则 → 主从复制 → 分片存储 → Redis集群

2. Maven依赖配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
<!-- pom.xml — build descriptor for the high-concurrency MySQL demo.
     Versions for unversioned dependencies are managed by the Spring Boot 2.7.0 parent BOM. -->
<project>
<modelVersion>4.0.0</modelVersion>
<groupId>com.mysql</groupId>
<artifactId>high-concurrency-mysql-demo</artifactId>
<version>1.0.0</version>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.0</version>
</parent>

<properties>
<java.version>11</java.version>
<sharding-sphere.version>5.1.2</sharding-sphere.version>
</properties>

<dependencies>
<!-- Spring Boot web starter (embedded Tomcat + Spring MVC) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- ShardingSphere JDBC: database/table sharding middleware -->
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-jdbc-core-spring-boot-starter</artifactId>
<version>${sharding-sphere.version}</version>
</dependency>

<!-- MySQL JDBC driver (version managed by the Boot BOM) -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>

<!-- MyBatis Plus: ORM layer used by the mappers/entities -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.2</version>
</dependency>

<!-- Redis client (Lettuce) for the cache layer -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<!-- HikariCP connection pool -->
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
</dependency>

<!-- Lombok (compile-time only, hence optional) -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>

<!-- Test dependencies -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

3. 分库分表配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# application.yml
#
# Reconstructed with proper YAML nesting (the published version had all
# indentation stripped and was not loadable).
#
# FIX: the original pointed user_order's table-strategy at the same
# "table-inline" algorithm as user_info, whose expression expands to
# user_info_$->{...} — order rows would have been routed to user_info tables.
# A dedicated "order-table-inline" algorithm is defined below.
server:
  port: 8080

spring:
  application:
    name: high-concurrency-mysql-demo

  # ShardingSphere: 4 physical databases (ds0..ds3) x 16 tables per logical table,
  # sharded by user_id.
  shardingsphere:
    # Physical data sources
    datasource:
      names: ds0,ds1,ds2,ds3
      ds0:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://localhost:3306/user_db_0?useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=utf8
        username: root
        password: root
        # NOTE(review): ShardingSphere hands extra data-source properties straight
        # to the pool implementation; confirm against the ShardingSphere docs
        # whether this nested "hikari:" block is honored or whether the pool
        # settings must be flattened onto the data-source node.
        hikari:
          maximum-pool-size: 20
          minimum-idle: 5
          connection-timeout: 30000
          idle-timeout: 600000
          max-lifetime: 1800000
      ds1:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://localhost:3306/user_db_1?useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=utf8
        username: root
        password: root
        hikari:
          maximum-pool-size: 20
          minimum-idle: 5
          connection-timeout: 30000
          idle-timeout: 600000
          max-lifetime: 1800000
      ds2:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://localhost:3306/user_db_2?useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=utf8
        username: root
        password: root
        hikari:
          maximum-pool-size: 20
          minimum-idle: 5
          connection-timeout: 30000
          idle-timeout: 600000
          max-lifetime: 1800000
      ds3:
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.mysql.cj.jdbc.Driver
        jdbc-url: jdbc:mysql://localhost:3306/user_db_3?useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=utf8
        username: root
        password: root
        hikari:
          maximum-pool-size: 20
          minimum-idle: 5
          connection-timeout: 30000
          idle-timeout: 600000
          max-lifetime: 1800000

    # Sharding rules
    rules:
      sharding:
        # Database routing: user_id % 4
        default-database-strategy:
          standard:
            sharding-column: user_id
            sharding-algorithm-name: database-inline
        tables:
          user_info:
            actual-data-nodes: ds$->{0..3}.user_info_$->{0..15}
            database-strategy:
              standard:
                sharding-column: user_id
                sharding-algorithm-name: database-inline
            table-strategy:
              standard:
                sharding-column: user_id
                sharding-algorithm-name: table-inline
            key-generate-strategy:
              column: id
              key-generator-name: snowflake
          user_order:
            actual-data-nodes: ds$->{0..3}.user_order_$->{0..15}
            database-strategy:
              standard:
                sharding-column: user_id
                sharding-algorithm-name: database-inline
            table-strategy:
              standard:
                sharding-column: user_id
                # Must NOT reuse "table-inline" (it expands to user_info_*).
                sharding-algorithm-name: order-table-inline
            key-generate-strategy:
              column: id
              key-generator-name: snowflake

        # Sharding algorithms
        sharding-algorithms:
          database-inline:
            type: INLINE
            props:
              algorithm-expression: ds$->{user_id % 4}
          table-inline:
            type: INLINE
            props:
              algorithm-expression: user_info_$->{user_id % 16}
          order-table-inline:
            type: INLINE
            props:
              algorithm-expression: user_order_$->{user_id % 16}

        # Distributed primary-key generators
        key-generators:
          snowflake:
            type: SNOWFLAKE
            props:
              worker-id: 1

  # Redis (cache layer)
  redis:
    host: localhost
    port: 6379
    database: 0
    timeout: 5000
    lettuce:
      pool:
        max-active: 20
        max-idle: 10
        min-idle: 5

# MyBatis Plus
mybatis-plus:
  configuration:
    map-underscore-to-camel-case: true
    cache-enabled: false
  global-config:
    db-config:
      id-type: ASSIGN_ID
      logic-delete-field: deleted
      logic-delete-value: 1
      logic-not-delete-value: 0

# Actuator / metrics exposure
management:
  endpoints:
    web:
      exposure:
        include: "*"
  metrics:
    export:
      prometheus:
        enabled: true

4. 用户实体定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package com.mysql.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.time.LocalDateTime;

/**
 * User row mapped onto the sharded {@code user_info_*} tables.
 *
 * <p>{@code userId} is the sharding column (database: user_id % 4,
 * table: user_id % 16); {@code id} is the snowflake-generated primary key.
 *
 * @author Java实战
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@TableName("user_info")
public class UserInfo {

    /** Primary key, assigned by the snowflake generator. */
    @TableId(type = IdType.ASSIGN_ID)
    private Long id;

    /** Business user id — the sharding key for both database and table routing. */
    private Long userId;

    /** Login/display name. */
    private String username;

    /** Contact e-mail address. */
    private String email;

    /** Mobile phone number. */
    private String phone;

    /** Account state: ACTIVE, INACTIVE or BANNED. */
    private String status;

    /** Row creation timestamp. */
    private LocalDateTime createTime;

    /** Last modification timestamp. */
    private LocalDateTime updateTime;

    /** Soft-delete flag (0 = live, 1 = deleted; see mybatis-plus global config). */
    private Integer deleted;
}

5. 订单实体定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package com.mysql.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.math.BigDecimal;
import java.time.LocalDateTime;

/**
 * Order row mapped onto the sharded {@code user_order_*} tables.
 *
 * <p>{@code userId} is the sharding column, so all orders of one user live in
 * the same physical table; {@code id} is the snowflake-generated primary key.
 *
 * @author Java实战
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@TableName("user_order")
public class UserOrder {

    /** Primary key, assigned by the snowflake generator. */
    @TableId(type = IdType.ASSIGN_ID)
    private Long id;

    /** Business user id — the sharding key. */
    private Long userId;

    /** Human-readable order number. */
    private String orderNo;

    /** Purchased product id. */
    private Long productId;

    /** Product name snapshot at order time. */
    private String productName;

    /** Order amount (monetary — BigDecimal, never double). */
    private BigDecimal amount;

    /** Order state: PENDING, PAID, CANCELLED or REFUNDED. */
    private String status;

    /** Row creation timestamp. */
    private LocalDateTime createTime;

    /** Last modification timestamp. */
    private LocalDateTime updateTime;

    /** Payment timestamp; null until the order is paid. */
    private LocalDateTime payTime;

    /** Soft-delete flag (0 = live, 1 = deleted; see mybatis-plus global config). */
    private Integer deleted;
}

6. 分库分表服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
package com.mysql.service;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.mysql.entity.UserInfo;
import com.mysql.entity.UserOrder;
import com.mysql.mapper.UserInfoMapper;
import com.mysql.mapper.UserOrderMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Sharding service: routes user/order reads and writes across the
 * 4-database x 16-table layout (sharding key: user_id).
 *
 * <p>Fix over the original version: batch inserts used to be dispatched to a
 * private thread pool via {@code CompletableFuture.runAsync} from inside a
 * {@code @Transactional} method. Those inserts ran on other threads and
 * connections — i.e. OUTSIDE the transaction — so the method could commit and
 * return success before any row was written, and batch failures were only
 * logged. Batches now run synchronously on the calling thread so they commit
 * or roll back together with the surrounding transaction.
 *
 * @author Java实战
 */
@Slf4j
@Service
public class ShardingService extends ServiceImpl<UserInfoMapper, UserInfo> {

    /** Rows per multi-row INSERT; keeps each statement below MySQL packet limits. */
    private static final int BATCH_SIZE = 1000;

    @Autowired
    private UserInfoMapper userInfoMapper;

    @Autowired
    private UserOrderMapper userOrderMapper;

    /**
     * Inserts users in chunks of {@link #BATCH_SIZE} inside the current
     * transaction.
     *
     * @param users rows to insert; an empty list is a no-op
     * @throws RuntimeException if any batch fails (the transaction rolls back)
     */
    @Transactional
    public void batchInsertUsers(List<UserInfo> users) {
        try {
            log.info("开始批量插入用户信息,数量: {}", users.size());

            for (int from = 0; from < users.size(); from += BATCH_SIZE) {
                int to = Math.min(from + BATCH_SIZE, users.size());
                // subList is a view — cheap, no copy.
                userInfoMapper.batchInsert(users.subList(from, to));
                log.info("批量插入用户信息完成,批次: {}-{}", from, to);
            }

        } catch (Exception e) {
            log.error("批量插入用户信息失败", e);
            throw new RuntimeException("批量插入用户信息失败", e);
        }
    }

    /**
     * Inserts orders in chunks of {@link #BATCH_SIZE} inside the current
     * transaction.
     *
     * @param orders rows to insert; an empty list is a no-op
     * @throws RuntimeException if any batch fails (the transaction rolls back)
     */
    @Transactional
    public void batchInsertOrders(List<UserOrder> orders) {
        try {
            log.info("开始批量插入订单信息,数量: {}", orders.size());

            for (int from = 0; from < orders.size(); from += BATCH_SIZE) {
                int to = Math.min(from + BATCH_SIZE, orders.size());
                userOrderMapper.batchInsert(orders.subList(from, to));
                log.info("批量插入订单信息完成,批次: {}-{}", from, to);
            }

        } catch (Exception e) {
            log.error("批量插入订单信息失败", e);
            throw new RuntimeException("批量插入订单信息失败", e);
        }
    }

    /**
     * Loads a single user by the sharding key; the query is routed to exactly
     * one database and one table.
     */
    public UserInfo getUserById(Long userId) {
        try {
            log.debug("查询用户信息: userId={}", userId);

            QueryWrapper<UserInfo> queryWrapper = new QueryWrapper<>();
            queryWrapper.eq("user_id", userId);
            return userInfoMapper.selectOne(queryWrapper);

        } catch (Exception e) {
            log.error("查询用户信息失败: userId={}", userId, e);
            throw new RuntimeException("查询用户信息失败", e);
        }
    }

    /**
     * Loads all orders for one user, newest first. Single-shard query because
     * user_id is the sharding key for user_order as well.
     */
    public List<UserOrder> getOrdersByUserId(Long userId) {
        try {
            log.debug("查询用户订单列表: userId={}", userId);

            QueryWrapper<UserOrder> queryWrapper = new QueryWrapper<>();
            queryWrapper.eq("user_id", userId);
            queryWrapper.orderByDesc("create_time");
            return userOrderMapper.selectList(queryWrapper);

        } catch (Exception e) {
            log.error("查询用户订单列表失败: userId={}", userId, e);
            throw new RuntimeException("查询用户订单列表失败", e);
        }
    }

    /**
     * Updates a user's status (and updateTime) by sharding key.
     *
     * @return true if at least one row was changed
     */
    @Transactional
    public boolean updateUserStatus(Long userId, String status) {
        try {
            log.info("更新用户状态: userId={}, status={}", userId, status);

            QueryWrapper<UserInfo> queryWrapper = new QueryWrapper<>();
            queryWrapper.eq("user_id", userId);

            UserInfo userInfo = new UserInfo();
            userInfo.setStatus(status);
            userInfo.setUpdateTime(LocalDateTime.now());

            int result = userInfoMapper.update(userInfo, queryWrapper);
            log.info("更新用户状态完成: userId={}, result={}", userId, result);
            return result > 0;

        } catch (Exception e) {
            log.error("更新用户状态失败: userId={}", userId, e);
            throw new RuntimeException("更新用户状态失败", e);
        }
    }

    /**
     * Updates an order's status; stamps payTime when moving to PAID.
     *
     * <p>NOTE(review): the filter is on {@code id} (not the sharding key), so
     * ShardingSphere must broadcast this update to every shard — acceptable for
     * snowflake ids but worth knowing.
     *
     * @return true if at least one row was changed
     */
    @Transactional
    public boolean updateOrderStatus(Long orderId, String status) {
        try {
            log.info("更新订单状态: orderId={}, status={}", orderId, status);

            QueryWrapper<UserOrder> queryWrapper = new QueryWrapper<>();
            queryWrapper.eq("id", orderId);

            UserOrder order = new UserOrder();
            order.setStatus(status);
            order.setUpdateTime(LocalDateTime.now());
            if ("PAID".equals(status)) {
                order.setPayTime(LocalDateTime.now());
            }

            int result = userOrderMapper.update(order, queryWrapper);
            log.info("更新订单状态完成: orderId={}, result={}", orderId, result);
            return result > 0;

        } catch (Exception e) {
            log.error("更新订单状态失败: orderId={}", orderId, e);
            throw new RuntimeException("更新订单状态失败", e);
        }
    }

    /**
     * Human-readable description of which database/table a user id maps to.
     * Mirrors the inline expressions in application.yml (user_id % 4 / % 16).
     */
    public String getShardInfo(Long userId) {
        try {
            int dbIndex = (int) (userId % 4);
            int tableIndex = (int) (userId % 16);
            return String.format("数据库: ds%d, 表: user_info_%d", dbIndex, tableIndex);

        } catch (Exception e) {
            log.error("获取分片信息失败: userId={}", userId, e);
            return "未知分片";
        }
    }
}

7. 缓存服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
package com.mysql.service;

import com.mysql.entity.UserInfo;
import com.mysql.entity.UserOrder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * Redis cache-aside layer for user info, user orders and user status.
 *
 * <p>All cache operations are best-effort: Redis failures are logged and
 * swallowed so the database path still works when the cache is down.
 *
 * <p>Fix over the original version: {@link #evictUserCache(Long)} issued three
 * separate DEL round trips; it now deletes all three keys in one call.
 *
 * @author Java实战
 */
@Slf4j
@Service
public class CacheService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /** Key prefixes; full key = prefix + userId. */
    private static final String USER_CACHE_KEY = "user:info:";
    private static final String ORDER_CACHE_KEY = "user:orders:";
    private static final String USER_STATUS_KEY = "user:status:";

    /** Caches one user's profile for 30 minutes. */
    public void cacheUserInfo(UserInfo userInfo) {
        try {
            String key = USER_CACHE_KEY + userInfo.getUserId();
            redisTemplate.opsForValue().set(key, userInfo, Duration.ofMinutes(30));
            log.debug("缓存用户信息: userId={}", userInfo.getUserId());
        } catch (Exception e) {
            log.error("缓存用户信息失败: userId={}", userInfo.getUserId(), e);
        }
    }

    /**
     * @return the cached profile, or null on miss / wrong type / Redis error
     */
    public UserInfo getCachedUserInfo(Long userId) {
        try {
            Object cached = redisTemplate.opsForValue().get(USER_CACHE_KEY + userId);
            if (cached instanceof UserInfo) {
                log.debug("从缓存获取用户信息: userId={}", userId);
                return (UserInfo) cached;
            }
        } catch (Exception e) {
            log.error("获取缓存用户信息失败: userId={}", userId, e);
        }
        return null;
    }

    /** Caches one user's order list for 15 minutes. */
    public void cacheUserOrders(Long userId, List<UserOrder> orders) {
        try {
            String key = ORDER_CACHE_KEY + userId;
            redisTemplate.opsForValue().set(key, orders, Duration.ofMinutes(15));
            log.debug("缓存用户订单列表: userId={}, count={}", userId, orders.size());
        } catch (Exception e) {
            log.error("缓存用户订单列表失败: userId={}", userId, e);
        }
    }

    /**
     * @return the cached order list, or null on miss / Redis error.
     *     The element-type cast is unchecked — the serializer guarantees it
     *     only as long as cacheUserOrders is the sole writer of this key.
     */
    @SuppressWarnings("unchecked")
    public List<UserOrder> getCachedUserOrders(Long userId) {
        try {
            Object cached = redisTemplate.opsForValue().get(ORDER_CACHE_KEY + userId);
            if (cached instanceof List) {
                log.debug("从缓存获取用户订单列表: userId={}", userId);
                return (List<UserOrder>) cached;
            }
        } catch (Exception e) {
            log.error("获取缓存用户订单列表失败: userId={}", userId, e);
        }
        return null;
    }

    /** Caches one user's status string for 1 hour. */
    public void cacheUserStatus(Long userId, String status) {
        try {
            String key = USER_STATUS_KEY + userId;
            redisTemplate.opsForValue().set(key, status, Duration.ofHours(1));
            log.debug("缓存用户状态: userId={}, status={}", userId, status);
        } catch (Exception e) {
            log.error("缓存用户状态失败: userId={}", userId, e);
        }
    }

    /**
     * @return the cached status, or null on miss / Redis error
     */
    public String getCachedUserStatus(Long userId) {
        try {
            Object cached = redisTemplate.opsForValue().get(USER_STATUS_KEY + userId);
            if (cached instanceof String) {
                log.debug("从缓存获取用户状态: userId={}", userId);
                return (String) cached;
            }
        } catch (Exception e) {
            log.error("获取缓存用户状态失败: userId={}", userId, e);
        }
        return null;
    }

    /**
     * Evicts all three cache entries of one user with a single multi-key
     * DELETE (was three separate round trips).
     */
    public void evictUserCache(Long userId) {
        try {
            redisTemplate.delete(List.of(
                    USER_CACHE_KEY + userId,
                    ORDER_CACHE_KEY + userId,
                    USER_STATUS_KEY + userId));
            log.debug("删除用户缓存: userId={}", userId);
        } catch (Exception e) {
            log.error("删除用户缓存失败: userId={}", userId, e);
        }
    }

    /** Evicts the caches of many users; each user's eviction is best-effort. */
    public void batchEvictUserCache(List<Long> userIds) {
        try {
            for (Long userId : userIds) {
                evictUserCache(userId);
            }
            log.info("批量删除用户缓存完成,数量: {}", userIds.size());
        } catch (Exception e) {
            log.error("批量删除用户缓存失败", e);
        }
    }
}

8. 性能监控服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package com.mysql.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.time.LocalDateTime;
import java.util.concurrent.atomic.AtomicLong;

/**
 * In-process performance counters plus a 30-second scheduled report.
 *
 * <p>Fixes over the original version:
 * <ul>
 *   <li>QPS/WPS were computed as {@code cumulativeTotal / 30}, which is only
 *       correct during the first interval; the report now uses the delta since
 *       the previous report.</li>
 *   <li>The error-rate line used a {@code {:.2f}} placeholder, which SLF4J
 *       does not support (the braces were printed literally); the value is now
 *       pre-formatted with {@code String.format}.</li>
 * </ul>
 *
 * @author Java实战
 */
@Slf4j
@Service
public class PerformanceMonitorService {

    /** Report period in seconds; must match the @Scheduled fixedRate below. */
    private static final long REPORT_INTERVAL_SECONDS = 30;

    @Autowired
    private DataSource dataSource;

    /** Monotonic lifetime counters, bumped by the controller layer. */
    private final AtomicLong totalQueries = new AtomicLong(0);
    private final AtomicLong totalWrites = new AtomicLong(0);
    private final AtomicLong totalErrors = new AtomicLong(0);

    /** Snapshots of the counters at the previous report, for per-interval rates. */
    private final AtomicLong lastQueries = new AtomicLong(0);
    private final AtomicLong lastWrites = new AtomicLong(0);

    /**
     * Emits the full performance report every 30 seconds.
     */
    @Scheduled(fixedRate = 30000)
    public void monitorDatabasePerformance() {
        try {
            log.info("=== 数据库性能监控报告 ===");
            monitorConnectionPool();
            monitorQueryPerformance();
            monitorWritePerformance();
            monitorErrorRate();
            log.info("=== 监控报告结束 ===");
        } catch (Exception e) {
            log.error("数据库性能监控异常", e);
        }
    }

    /**
     * Logs HikariCP pool gauges when the injected DataSource is a Hikari pool.
     * NOTE(review): with ShardingSphere the injected DataSource is typically a
     * ShardingSphereDataSource, not HikariCP, so this branch may never fire —
     * verify what bean is actually injected.
     */
    private void monitorConnectionPool() {
        try {
            if (dataSource instanceof com.zaxxer.hikari.HikariDataSource) {
                com.zaxxer.hikari.HikariDataSource hikariDataSource =
                        (com.zaxxer.hikari.HikariDataSource) dataSource;

                log.info("连接池状态 - 活跃连接: {}, 空闲连接: {}, 总连接: {}, 等待连接: {}",
                        hikariDataSource.getHikariPoolMXBean().getActiveConnections(),
                        hikariDataSource.getHikariPoolMXBean().getIdleConnections(),
                        hikariDataSource.getHikariPoolMXBean().getTotalConnections(),
                        hikariDataSource.getHikariPoolMXBean().getThreadsAwaitingConnection());
            }
        } catch (Exception e) {
            log.error("监控连接池状态失败", e);
        }
    }

    /** Logs total queries and queries-per-second over the last interval. */
    private void monitorQueryPerformance() {
        try {
            long current = totalQueries.get();
            // Delta since the previous report, not the lifetime average.
            long delta = current - lastQueries.getAndSet(current);
            log.info("查询性能 - 总查询数: {}, QPS: {}", current, delta / REPORT_INTERVAL_SECONDS);
        } catch (Exception e) {
            log.error("监控查询性能失败", e);
        }
    }

    /** Logs total writes and writes-per-second over the last interval. */
    private void monitorWritePerformance() {
        try {
            long current = totalWrites.get();
            long delta = current - lastWrites.getAndSet(current);
            log.info("写入性能 - 总写入数: {}, WPS: {}", current, delta / REPORT_INTERVAL_SECONDS);
        } catch (Exception e) {
            log.error("监控写入性能失败", e);
        }
    }

    /** Logs the lifetime error rate as a percentage of all operations. */
    private void monitorErrorRate() {
        try {
            long currentErrors = totalErrors.get();
            long totalOperations = totalQueries.get() + totalWrites.get();

            if (totalOperations > 0) {
                double errorRate = (double) currentErrors / totalOperations * 100;
                // SLF4J only understands "{}" — format the number up front.
                log.info("错误率 - 总错误数: {}, 错误率: {}%",
                        currentErrors, String.format("%.2f", errorRate));
            }
        } catch (Exception e) {
            log.error("监控错误率失败", e);
        }
    }

    /** Records one query operation. Thread-safe. */
    public void recordQuery() {
        totalQueries.incrementAndGet();
    }

    /** Records one write operation. Thread-safe. */
    public void recordWrite() {
        totalWrites.incrementAndGet();
    }

    /** Records one failed operation. Thread-safe. */
    public void recordError() {
        totalErrors.incrementAndGet();
    }

    /**
     * @return an immutable snapshot of the lifetime counters
     */
    public PerformanceStats getPerformanceStats() {
        return PerformanceStats.builder()
                .totalQueries(totalQueries.get())
                .totalWrites(totalWrites.get())
                .totalErrors(totalErrors.get())
                .timestamp(LocalDateTime.now())
                .build();
    }

    /**
     * Lifetime counter snapshot returned by {@link #getPerformanceStats()}.
     */
    @lombok.Data
    @lombok.Builder
    public static class PerformanceStats {
        private long totalQueries;
        private long totalWrites;
        private long totalErrors;
        private LocalDateTime timestamp;
    }
}

9. 控制器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
package com.mysql.controller;

import com.mysql.entity.UserInfo;
import com.mysql.entity.UserOrder;
import com.mysql.service.CacheService;
import com.mysql.service.PerformanceMonitorService;
import com.mysql.service.ShardingService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.util.List;

/**
 * REST API for the sharded user/order store, with cache-aside reads and
 * performance counters on every endpoint.
 *
 * <p>Fix over the original version: {@code GET /user/{userId}} returned
 * 200 with an empty body when the user did not exist; it now returns 404.
 *
 * @author Java实战
 */
@Slf4j
@RestController
@RequestMapping("/api/mysql")
public class HighConcurrencyMySQLController {

    @Autowired
    private ShardingService shardingService;

    @Autowired
    private CacheService cacheService;

    @Autowired
    private PerformanceMonitorService performanceMonitorService;

    /**
     * Bulk-inserts users. The whole request is one transaction in the service.
     */
    @PostMapping("/users/batch")
    public ResponseEntity<String> batchInsertUsers(@RequestBody List<UserInfo> users) {
        try {
            performanceMonitorService.recordWrite();
            shardingService.batchInsertUsers(users);
            return ResponseEntity.ok("批量插入用户成功");
        } catch (Exception e) {
            performanceMonitorService.recordError();
            log.error("批量插入用户失败", e);
            return ResponseEntity.internalServerError().build();
        }
    }

    /**
     * Bulk-inserts orders. The whole request is one transaction in the service.
     */
    @PostMapping("/orders/batch")
    public ResponseEntity<String> batchInsertOrders(@RequestBody List<UserOrder> orders) {
        try {
            performanceMonitorService.recordWrite();
            shardingService.batchInsertOrders(orders);
            return ResponseEntity.ok("批量插入订单成功");
        } catch (Exception e) {
            performanceMonitorService.recordError();
            log.error("批量插入订单失败", e);
            return ResponseEntity.internalServerError().build();
        }
    }

    /**
     * Reads one user: cache first, then database (populating the cache on a
     * hit from the database). 404 when the user does not exist.
     */
    @GetMapping("/user/{userId}")
    public ResponseEntity<UserInfo> getUserInfo(@PathVariable Long userId) {
        try {
            performanceMonitorService.recordQuery();

            UserInfo cachedUser = cacheService.getCachedUserInfo(userId);
            if (cachedUser != null) {
                return ResponseEntity.ok(cachedUser);
            }

            UserInfo userInfo = shardingService.getUserById(userId);
            if (userInfo == null) {
                // Previously answered 200 with an empty body.
                return ResponseEntity.notFound().build();
            }
            cacheService.cacheUserInfo(userInfo);
            return ResponseEntity.ok(userInfo);
        } catch (Exception e) {
            performanceMonitorService.recordError();
            log.error("查询用户信息失败", e);
            return ResponseEntity.internalServerError().build();
        }
    }

    /**
     * Reads one user's orders: cache first, then database. Only non-empty
     * lists are cached, so misses for empty users are re-checked each time.
     */
    @GetMapping("/user/{userId}/orders")
    public ResponseEntity<List<UserOrder>> getUserOrders(@PathVariable Long userId) {
        try {
            performanceMonitorService.recordQuery();

            List<UserOrder> cachedOrders = cacheService.getCachedUserOrders(userId);
            if (cachedOrders != null) {
                return ResponseEntity.ok(cachedOrders);
            }

            List<UserOrder> orders = shardingService.getOrdersByUserId(userId);
            if (orders != null && !orders.isEmpty()) {
                cacheService.cacheUserOrders(userId, orders);
            }
            return ResponseEntity.ok(orders);
        } catch (Exception e) {
            performanceMonitorService.recordError();
            log.error("查询用户订单失败", e);
            return ResponseEntity.internalServerError().build();
        }
    }

    /**
     * Updates a user's status, refreshing the status cache and invalidating
     * the profile/order caches on success.
     */
    @PutMapping("/user/{userId}/status")
    public ResponseEntity<Boolean> updateUserStatus(@PathVariable Long userId, @RequestParam String status) {
        try {
            performanceMonitorService.recordWrite();

            boolean success = shardingService.updateUserStatus(userId, status);
            if (success) {
                // Evict first-stale profile data, then repopulate the status key.
                // NOTE(review): evictUserCache also deletes the status key, so the
                // order of these two calls matters.
                cacheService.cacheUserStatus(userId, status);
                cacheService.evictUserCache(userId);
            }
            return ResponseEntity.ok(success);
        } catch (Exception e) {
            performanceMonitorService.recordError();
            log.error("更新用户状态失败", e);
            return ResponseEntity.internalServerError().build();
        }
    }

    /**
     * Debug endpoint: which database/table a user id shards to.
     */
    @GetMapping("/shard/{userId}")
    public ResponseEntity<String> getShardInfo(@PathVariable Long userId) {
        try {
            return ResponseEntity.ok(shardingService.getShardInfo(userId));
        } catch (Exception e) {
            log.error("获取分片信息失败", e);
            return ResponseEntity.internalServerError().build();
        }
    }

    /**
     * Lifetime query/write/error counter snapshot.
     */
    @GetMapping("/performance")
    public ResponseEntity<PerformanceMonitorService.PerformanceStats> getPerformanceStats() {
        try {
            return ResponseEntity.ok(performanceMonitorService.getPerformanceStats());
        } catch (Exception e) {
            log.error("获取性能统计失败", e);
            return ResponseEntity.internalServerError().build();
        }
    }
}

10. 总结

设计一个支持每日亿级写入和10万QPS的MySQL架构需要综合考虑多个方面。通过本文的详细介绍,我们了解了:

  1. 分库分表: 使用ShardingSphere实现水平分片
  2. 读写分离: 主从复制提高查询性能
  3. 缓存优化: Redis缓存减少数据库压力
  4. 连接池优化: HikariCP提供高性能连接池
  5. 性能监控: 实时监控数据库性能指标

通过合理的架构设计和优化,可以实现高并发、高可用的MySQL系统。


Java实战要点:

  • ShardingSphere提供完整的分库分表解决方案
  • Redis缓存显著提升查询性能
  • 连接池优化提高数据库连接效率
  • 异步处理提高系统吞吐量
  • 性能监控确保系统稳定运行

代码注解说明:

  • @Transactional: 事务管理注解
  • @Scheduled: 定时任务注解
  • @TableName: MyBatis Plus表名注解
  • @TableId: 主键注解
  • 分片算法: 基于用户ID的取模分片
  • 缓存策略: 多级缓存提高性能