1. MySQL主从复制概述

MySQL主从复制是构建高可用、高性能数据库架构的核心技术,通过读写分离可以显著提升系统性能。本文将详细介绍基于Java的MySQL主从复制与读写分离架构设计,包括主从配置、读写分离、故障转移、数据同步的完整解决方案。

1.1 核心功能

  1. 主从复制: 通过binlog将主库的数据变更同步到从库(默认为异步复制,存在一定延迟)
  2. 读写分离: 读操作分散到从库,写操作集中在主库
  3. 故障转移: 从库不可用时读请求自动回退到主库;主库故障时的切换需借助MHA、Orchestrator等外部工具
  4. 负载均衡: 多个从库间通过轮询实现负载均衡
  5. 数据一致性: 通过监控复制延迟保障主从数据的最终一致性

1.2 技术架构

应用层 → 读写分离中间件 → 主库(写) → 从库(读) → 数据同步
↓ ↓ ↓ ↓ ↓
负载均衡 → 路由规则 → 事务日志 → 复制线程 → 数据一致性

2. Maven依赖配置

<!-- pom.xml -->
<project>
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.mysql</groupId>
    <artifactId>mysql-master-slave-demo</artifactId>
    <version>1.0.0</version>

    <!-- Versions of the unversioned dependencies below are inherited from this parent -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.0</version>
    </parent>

    <properties>
        <java.version>11</java.version>
    </properties>

    <dependencies>
        <!-- Spring Boot web starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- MySQL JDBC driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>

        <!-- MyBatis Plus -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.5.2</version>
        </dependency>

        <!-- Database connection pool -->
        <dependency>
            <groupId>com.zaxxer</groupId>
            <artifactId>HikariCP</artifactId>
        </dependency>

        <!-- Redis -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <!-- Lombok (compile-time only, not propagated to dependants) -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- Test dependencies -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

3. 数据源配置

# application.yml
# Indentation is significant in YAML: the nesting below is what binds
# spring.datasource.master / spring.datasource.slaves[*] to the configuration class.
server:
  port: 8080

spring:
  application:
    name: mysql-master-slave-demo

  # Master/replica data source configuration
  datasource:
    # Master (write) database
    master:
      jdbc-url: jdbc:mysql://localhost:3306/test_db?useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=utf8
      username: root
      password: root
      driver-class-name: com.mysql.cj.jdbc.Driver
      hikari:
        maximum-pool-size: 20
        minimum-idle: 5
        connection-timeout: 30000
        idle-timeout: 600000
        max-lifetime: 1800000

    # Replica (read) databases, one list entry per replica
    slaves:
      - jdbc-url: jdbc:mysql://localhost:3307/test_db?useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=utf8
        username: root
        password: root
        driver-class-name: com.mysql.cj.jdbc.Driver
        hikari:
          maximum-pool-size: 15
          minimum-idle: 3
          connection-timeout: 30000
          idle-timeout: 600000
          max-lifetime: 1800000
      - jdbc-url: jdbc:mysql://localhost:3308/test_db?useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=utf8
        username: root
        password: root
        driver-class-name: com.mysql.cj.jdbc.Driver
        hikari:
          maximum-pool-size: 15
          minimum-idle: 3
          connection-timeout: 30000
          idle-timeout: 600000
          max-lifetime: 1800000

  # Redis configuration
  redis:
    host: localhost
    port: 6379
    database: 0
    timeout: 5000
    lettuce:
      pool:
        max-active: 20
        max-idle: 10
        min-idle: 5

# MyBatis Plus configuration
mybatis-plus:
  configuration:
    map-underscore-to-camel-case: true
    cache-enabled: false
  global-config:
    db-config:
      id-type: ASSIGN_ID
      logic-delete-field: deleted
      logic-delete-value: 1
      logic-not-delete-value: 0

# Monitoring configuration
# NOTE(review): "*" exposes every web endpoint; presumably demo-only - restrict before production.
management:
  endpoints:
    web:
      exposure:
        include: "*"
  metrics:
    export:
      prometheus:
        enabled: true

4. 数据源配置类

package com.mysql.config;

import com.zaxxer.hikari.HikariDataSource;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import javax.sql.DataSource;
import java.util.List;

/**
* 数据源配置类
* @author Java实战
*/
@Data
@Configuration
@ConfigurationProperties(prefix = "spring.datasource")
public class DataSourceConfig {

/**
* 主库配置
*/
private MasterConfig master;

/**
* 从库配置列表
*/
private List<SlaveConfig> slaves;

/**
* 主库数据源
*/
@Bean("masterDataSource")
@Primary
public DataSource masterDataSource() {
HikariDataSource dataSource = new HikariDataSource();
dataSource.setJdbcUrl(master.getJdbcUrl());
dataSource.setUsername(master.getUsername());
dataSource.setPassword(master.getPassword());
dataSource.setDriverClassName(master.getDriverClassName());

// 连接池配置
dataSource.setMaximumPoolSize(master.getHikari().getMaximumPoolSize());
dataSource.setMinimumIdle(master.getHikari().getMinimumIdle());
dataSource.setConnectionTimeout(master.getHikari().getConnectionTimeout());
dataSource.setIdleTimeout(master.getHikari().getIdleTimeout());
dataSource.setMaxLifetime(master.getHikari().getMaxLifetime());

return dataSource;
}

/**
* 从库数据源列表
*/
@Bean("slaveDataSources")
public List<DataSource> slaveDataSources() {
return slaves.stream()
.map(this::createSlaveDataSource)
.collect(java.util.stream.Collectors.toList());
}

/**
* 创建从库数据源
*/
private DataSource createSlaveDataSource(SlaveConfig slaveConfig) {
HikariDataSource dataSource = new HikariDataSource();
dataSource.setJdbcUrl(slaveConfig.getJdbcUrl());
dataSource.setUsername(slaveConfig.getUsername());
dataSource.setPassword(slaveConfig.getPassword());
dataSource.setDriverClassName(slaveConfig.getDriverClassName());

// 连接池配置
dataSource.setMaximumPoolSize(slaveConfig.getHikari().getMaximumPoolSize());
dataSource.setMinimumIdle(slaveConfig.getHikari().getMinimumIdle());
dataSource.setConnectionTimeout(slaveConfig.getHikari().getConnectionTimeout());
dataSource.setIdleTimeout(slaveConfig.getHikari().getIdleTimeout());
dataSource.setMaxLifetime(slaveConfig.getHikari().getMaxLifetime());

return dataSource;
}

@Data
public static class MasterConfig {
private String jdbcUrl;
private String username;
private String password;
private String driverClassName;
private HikariConfig hikari;
}

@Data
public static class SlaveConfig {
private String jdbcUrl;
private String username;
private String password;
private String driverClassName;
private HikariConfig hikari;
}

@Data
public static class HikariConfig {
private int maximumPoolSize;
private int minimumIdle;
private long connectionTimeout;
private long idleTimeout;
private long maxLifetime;
}
}

5. 读写分离服务

package com.mysql.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hands out JDBC connections to the master (writes) or to one of the replicas (reads),
 * and offers simple health/info probes for both.
 *
 * @author Java实战
 */
@Slf4j
@Service
public class ReadWriteSplitService {

    @Autowired
    @Qualifier("masterDataSource")
    private DataSource masterDataSource;

    @Autowired
    @Qualifier("slaveDataSources")
    private List<DataSource> slaveDataSources;

    /** Round-robin counter used to pick the next replica. */
    private final AtomicInteger slaveCounter = new AtomicInteger(0);

    /**
     * Opens a connection to the master database (intended for write operations).
     *
     * @return a new connection; the caller must close it
     * @throws SQLException if the master data source cannot provide a connection
     */
    public Connection getMasterConnection() throws SQLException {
        try {
            Connection connection = masterDataSource.getConnection();
            log.debug("获取主库连接成功");
            return connection;
        } catch (SQLException e) {
            log.error("获取主库连接失败", e);
            throw e;
        }
    }

    /**
     * Opens a connection to a replica chosen round-robin (intended for read operations).
     * Falls back to the master when there are no replicas or the chosen replica fails.
     *
     * @return a new connection; the caller must close it
     * @throws SQLException if neither the replica nor the master can provide a connection
     */
    public Connection getSlaveConnection() throws SQLException {
        // Checked outside the try block so a master failure here is not mislabelled
        // as a replica failure and retried a second time.
        if (slaveDataSources.isEmpty()) {
            log.warn("没有可用的从库,使用主库连接");
            return getMasterConnection();
        }

        // floorMod keeps the index non-negative after the counter wraps past Integer.MAX_VALUE.
        int index = Math.floorMod(slaveCounter.getAndIncrement(), slaveDataSources.size());
        try {
            Connection connection = slaveDataSources.get(index).getConnection();
            log.debug("获取从库连接成功,从库索引: {}", index);
            return connection;
        } catch (SQLException e) {
            log.error("获取从库连接失败", e);
            log.warn("从库连接失败,尝试使用主库连接");
            return getMasterConnection();
        }
    }

    /**
     * Returns the replica data sources in configuration order.
     *
     * @return a read-only view of the replica data sources
     */
    public List<DataSource> getSlaveDataSources() {
        return java.util.Collections.unmodifiableList(slaveDataSources);
    }

    /**
     * Checks whether the master answers a validity probe within 5 seconds.
     *
     * @return {@code true} if a connection could be obtained and is valid
     */
    public boolean isMasterHealthy() {
        try (Connection connection = getMasterConnection()) {
            return connection.isValid(5);
        } catch (SQLException e) {
            log.error("主库健康检查失败", e);
            return false;
        }
    }

    /**
     * Checks whether the replica at the given position answers a validity probe within 5 seconds.
     *
     * @param slaveIndex zero-based position in the replica list
     * @return {@code true} if healthy; {@code false} if unhealthy or the index is out of range
     */
    public boolean isSlaveHealthy(int slaveIndex) {
        if (slaveIndex < 0 || slaveIndex >= slaveDataSources.size()) {
            return false;
        }
        try (Connection connection = slaveDataSources.get(slaveIndex).getConnection()) {
            return connection.isValid(5);
        } catch (SQLException e) {
            log.error("从库健康检查失败,索引: {}", slaveIndex, e);
            return false;
        }
    }

    /**
     * Probes every replica.
     *
     * @return one health flag per replica, in replica order
     */
    public List<Boolean> getAllSlavesHealth() {
        return slaveDataSources.stream()
                .map(this::probeSlave)
                .collect(java.util.stream.Collectors.toList());
    }

    /**
     * Returns the number of configured replicas.
     *
     * @return replica count
     */
    public int getSlaveCount() {
        return slaveDataSources.size();
    }

    /**
     * Reads the JDBC URL the master connection reports.
     *
     * @return the URL, or {@code "未知"} if it cannot be determined
     */
    public String getMasterInfo() {
        // try-with-resources returns the borrowed connection to the pool.
        try (Connection connection = masterDataSource.getConnection()) {
            return connection.getMetaData().getURL();
        } catch (SQLException e) {
            log.error("获取主库信息失败", e);
            return "未知";
        }
    }

    /**
     * Reads the JDBC URL each replica connection reports.
     *
     * @return one URL per replica, in replica order; {@code "未知"} for replicas that fail
     */
    public List<String> getSlavesInfo() {
        return slaveDataSources.stream()
                .map(this::describeSlave)
                .collect(java.util.stream.Collectors.toList());
    }

    /** Runs the 5-second validity probe against one replica. */
    private boolean probeSlave(DataSource dataSource) {
        try (Connection connection = dataSource.getConnection()) {
            return connection.isValid(5);
        } catch (SQLException e) {
            log.error("从库健康检查失败", e);
            return false;
        }
    }

    /** Reads the JDBC URL of one replica, releasing the connection afterwards. */
    private String describeSlave(DataSource dataSource) {
        try (Connection connection = dataSource.getConnection()) {
            return connection.getMetaData().getURL();
        } catch (SQLException e) {
            log.error("获取从库信息失败", e);
            return "未知";
        }
    }
}

6. 主从复制监控服务

package com.mysql.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

/**
 * Reports the health of the master and replicas and the replication lag of each replica,
 * both on a schedule (logging) and on demand (as a {@link ReplicationStatus} snapshot).
 *
 * @author Java实战
 */
@Slf4j
@Service
public class ReplicationMonitorService {

    /** Lag value reported when the lag cannot be determined. */
    private static final long UNKNOWN_LAG = -1L;

    /** Lag, in seconds, above which a warning is logged. */
    private static final long LAG_WARN_THRESHOLD_SECONDS = 60L;

    @Autowired
    private ReadWriteSplitService readWriteSplitService;

    /**
     * Logs a full monitoring report: master health, replica health and replication lag.
     * Scheduled at a fixed rate of 30 seconds; never throws.
     *
     * <p>NOTE(review): scheduling only fires if scheduling is enabled elsewhere in the
     * application - that switch is not visible here, confirm.
     */
    @Scheduled(fixedRate = 30000)
    public void monitorReplicationStatus() {
        try {
            log.info("=== 主从复制监控报告 ===");

            monitorMasterStatus();
            monitorSlavesStatus();
            monitorReplicationLag();

            log.info("=== 监控报告结束 ===");
        } catch (Exception e) {
            log.error("主从复制监控异常", e);
        }
    }

    /** Logs whether the master passes its health probe. */
    private void monitorMasterStatus() {
        try {
            log.info("监控主库状态");

            if (readWriteSplitService.isMasterHealthy()) {
                log.info("主库状态: 健康");
            } else {
                log.error("主库状态: 异常");
            }
        } catch (Exception e) {
            log.error("监控主库状态失败", e);
        }
    }

    /** Logs, per replica, whether it passes its health probe. */
    private void monitorSlavesStatus() {
        try {
            log.info("监控从库状态");

            List<Boolean> slavesHealth = readWriteSplitService.getAllSlavesHealth();
            for (int i = 0; i < slavesHealth.size(); i++) {
                if (slavesHealth.get(i)) {
                    log.info("从库{}状态: 健康", i);
                } else {
                    log.error("从库{}状态: 异常", i);
                }
            }
        } catch (Exception e) {
            log.error("监控从库状态失败", e);
        }
    }

    /** Logs the replication lag of every replica and warns when it is unknown or too high. */
    private void monitorReplicationLag() {
        try {
            log.info("监控复制延迟");

            List<DataSource> slaveDataSources = readWriteSplitService.getSlaveDataSources();
            for (int i = 0; i < slaveDataSources.size(); i++) {
                try {
                    long lag = getReplicationLag(slaveDataSources.get(i));
                    if (lag < 0) {
                        // An unknown lag must not be read as "0 seconds behind".
                        log.warn("从库{}复制延迟未知,复制线程可能未运行", i);
                        continue;
                    }

                    log.info("从库{}复制延迟: {}秒", i, lag);
                    if (lag > LAG_WARN_THRESHOLD_SECONDS) {
                        log.warn("从库{}复制延迟过高: {}秒", i, lag);
                    }
                } catch (Exception e) {
                    log.error("获取从库{}复制延迟失败", i, e);
                }
            }
        } catch (Exception e) {
            log.error("监控复制延迟失败", e);
        }
    }

    /**
     * Reads {@code Seconds_Behind_Master} from {@code SHOW SLAVE STATUS} on the given replica.
     *
     * <p>NOTE(review): newer MySQL releases rename this statement and column
     * (SHOW REPLICA STATUS / Seconds_Behind_Source) - verify against the server version in use.
     *
     * @param dataSource the replica to query
     * @return the lag in seconds, or {@code -1} when the statement returns no row or the column is NULL
     * @throws SQLException if the connection or the query fails
     */
    private long getReplicationLag(DataSource dataSource) throws SQLException {
        try (Connection connection = dataSource.getConnection();
             PreparedStatement statement = connection.prepareStatement("SHOW SLAVE STATUS");
             ResultSet resultSet = statement.executeQuery()) {

            if (!resultSet.next()) {
                return UNKNOWN_LAG;
            }

            long secondsBehindMaster = resultSet.getLong("Seconds_Behind_Master");
            // getLong() yields 0 for SQL NULL; wasNull() distinguishes "no lag" from "unknown".
            return resultSet.wasNull() ? UNKNOWN_LAG : secondsBehindMaster;
        }
    }

    /**
     * Builds a snapshot of master health, replica health, connection info and replication lags.
     *
     * @return the snapshot, or {@code null} if it could not be assembled
     */
    public ReplicationStatus getReplicationStatus() {
        try {
            ReplicationStatus status = new ReplicationStatus();

            // Master
            status.setMasterHealthy(readWriteSplitService.isMasterHealthy());
            status.setMasterInfo(readWriteSplitService.getMasterInfo());

            // Replicas
            status.setSlavesHealth(readWriteSplitService.getAllSlavesHealth());
            status.setSlavesInfo(readWriteSplitService.getSlavesInfo());
            status.setSlaveCount(readWriteSplitService.getSlaveCount());

            // Replication lag, one entry per replica; -1 marks a replica whose lag is unavailable
            List<Long> replicationLags = new ArrayList<>();
            for (DataSource dataSource : readWriteSplitService.getSlaveDataSources()) {
                try {
                    replicationLags.add(getReplicationLag(dataSource));
                } catch (Exception e) {
                    log.warn("获取从库复制延迟失败", e);
                    replicationLags.add(UNKNOWN_LAG);
                }
            }

            status.setReplicationLags(replicationLags);
            status.setTimestamp(LocalDateTime.now());

            return status;
        } catch (Exception e) {
            log.error("获取主从复制状态失败", e);
            return null;
        }
    }

    /** Snapshot of the replication topology at {@link #getTimestamp()}. */
    @lombok.Data
    public static class ReplicationStatus {
        private boolean masterHealthy;
        private String masterInfo;
        private List<Boolean> slavesHealth;
        private List<String> slavesInfo;
        private int slaveCount;
        private List<Long> replicationLags;
        private LocalDateTime timestamp;
    }
}

7. 用户实体定义

package com.mysql.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.time.LocalDateTime;

/**
 * Persistent user record, mapped to the {@code user} table.
 *
 * @author Java实战
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@TableName("user")
public class User {

    /** Primary key, assigned by the persistence layer ({@code ASSIGN_ID}). */
    @TableId(type = IdType.ASSIGN_ID)
    private Long id;

    /** Login name. */
    private String username;

    /** E-mail address. */
    private String email;

    /** Phone number. */
    private String phone;

    /** Account status: ACTIVE, INACTIVE or BANNED. */
    private String status;

    /** Creation time. */
    private LocalDateTime createTime;

    /** Last update time. */
    private LocalDateTime updateTime;

    /** Logical-delete marker. */
    private Integer deleted;
}

8. 用户服务

package com.mysql.service;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.mysql.entity.User;
import com.mysql.mapper.UserMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.time.LocalDateTime;
import java.util.List;

/**
 * CRUD operations on {@link User} records.
 *
 * <p>NOTE(review): the operations are meant to be split between master (writes) and
 * replicas (reads), but nothing in this class selects a data source - confirm that
 * routing is configured elsewhere, otherwise every call uses the primary data source.
 *
 * @author Java实战
 */
@Slf4j
@Service
public class UserService extends ServiceImpl<UserMapper, User> {

    @Autowired
    private UserMapper userMapper;

    /**
     * Inserts a new user. Creation/update time, status ({@code ACTIVE}) and the delete
     * marker ({@code 0}) are overwritten before the insert.
     *
     * @param user the user to create
     * @return the same instance after the insert
     * @throws RuntimeException if the insert fails
     */
    @Transactional
    public User createUser(User user) {
        try {
            log.info("创建用户: {}", user.getUsername());

            user.setCreateTime(LocalDateTime.now());
            user.setUpdateTime(LocalDateTime.now());
            user.setStatus("ACTIVE");
            user.setDeleted(0);

            userMapper.insert(user);

            log.info("用户创建成功: id={}", user.getId());
            return user;
        } catch (Exception e) {
            log.error("创建用户失败", e);
            throw new RuntimeException("创建用户失败", e);
        }
    }

    /**
     * Updates a user by id, refreshing its update time.
     *
     * @param user the user carrying the id and the new field values
     * @return the result of {@code updateById}
     * @throws RuntimeException if the update fails
     */
    @Transactional
    public boolean updateUser(User user) {
        try {
            log.info("更新用户: id={}", user.getId());

            user.setUpdateTime(LocalDateTime.now());
            boolean result = updateById(user);

            log.info("用户更新结果: id={}, result={}", user.getId(), result);
            return result;
        } catch (Exception e) {
            log.error("更新用户失败: id={}", user.getId(), e);
            throw new RuntimeException("更新用户失败", e);
        }
    }

    /**
     * Removes a user by id.
     *
     * @param userId id of the user to remove
     * @return the result of {@code removeById}
     * @throws RuntimeException if the removal fails
     */
    @Transactional
    public boolean deleteUser(Long userId) {
        try {
            log.info("删除用户: id={}", userId);

            boolean result = removeById(userId);

            log.info("用户删除结果: id={}, result={}", userId, result);
            return result;
        } catch (Exception e) {
            log.error("删除用户失败: id={}", userId, e);
            throw new RuntimeException("删除用户失败", e);
        }
    }

    /**
     * Looks up a user by id.
     *
     * @param userId id to look up
     * @return the user, or {@code null} if none was found
     * @throws RuntimeException if the query fails
     */
    public User getUserById(Long userId) {
        try {
            log.debug("查询用户: id={}", userId);

            User user = getById(userId);

            log.debug("用户查询结果: id={}, username={}", userId,
                    user != null ? user.getUsername() : "null");
            return user;
        } catch (Exception e) {
            log.error("查询用户失败: id={}", userId, e);
            throw new RuntimeException("查询用户失败", e);
        }
    }

    /**
     * Looks up a non-deleted user by user name.
     *
     * @param username user name to look up
     * @return the user, or {@code null} if none was found
     * @throws RuntimeException if the query fails
     */
    public User getUserByUsername(String username) {
        try {
            log.debug("根据用户名查询用户: username={}", username);

            QueryWrapper<User> queryWrapper = new QueryWrapper<>();
            queryWrapper.eq("username", username);
            queryWrapper.eq("deleted", 0);

            User user = getOne(queryWrapper);

            log.debug("用户查询结果: username={}, id={}", username,
                    user != null ? user.getId() : "null");
            return user;
        } catch (Exception e) {
            log.error("根据用户名查询用户失败: username={}", username, e);
            throw new RuntimeException("查询用户失败", e);
        }
    }

    /**
     * Returns one page of non-deleted users, newest first.
     *
     * @param page one-based page number; must be at least 1
     * @param size page size; must not be negative
     * @return the users on that page
     * @throws IllegalArgumentException if {@code page < 1} or {@code size < 0}
     * @throws RuntimeException         if the query fails
     */
    public List<User> getUserList(int page, int size) {
        // Reject values that would otherwise be spliced into the LIMIT clause as a negative number.
        if (page < 1 || size < 0) {
            throw new IllegalArgumentException("分页参数非法: page=" + page + ", size=" + size);
        }
        try {
            log.debug("查询用户列表: page={}, size={}", page, size);

            // Computed in long so a large page number cannot overflow into a negative offset.
            long offset = (long) (page - 1) * size;

            QueryWrapper<User> queryWrapper = new QueryWrapper<>();
            queryWrapper.eq("deleted", 0);
            queryWrapper.orderByDesc("create_time");
            queryWrapper.last("LIMIT " + offset + ", " + size);

            List<User> users = list(queryWrapper);

            log.debug("用户列表查询结果: count={}", users.size());
            return users;
        } catch (Exception e) {
            log.error("查询用户列表失败", e);
            throw new RuntimeException("查询用户列表失败", e);
        }
    }

    /**
     * Sets the status of a user and refreshes its update time.
     *
     * @param userId id of the user
     * @param status new status value
     * @return the result of {@code updateById}
     * @throws RuntimeException if the update fails
     */
    @Transactional
    public boolean updateUserStatus(Long userId, String status) {
        try {
            log.info("更新用户状态: id={}, status={}", userId, status);

            // Only the fields set here are non-null on the update entity.
            User user = new User();
            user.setId(userId);
            user.setStatus(status);
            user.setUpdateTime(LocalDateTime.now());

            boolean result = updateById(user);

            log.info("用户状态更新结果: id={}, status={}, result={}", userId, status, result);
            return result;
        } catch (Exception e) {
            log.error("更新用户状态失败: id={}, status={}", userId, status, e);
            throw new RuntimeException("更新用户状态失败", e);
        }
    }
}

9. 控制器

package com.mysql.controller;

import com.mysql.entity.User;
import com.mysql.service.ReplicationMonitorService;
import com.mysql.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.util.List;

/**
 * REST endpoints under {@code /api/mysql} for user CRUD and replication monitoring.
 * Every handler answers 500 with an empty body when the underlying service throws.
 *
 * @author Java实战
 */
@Slf4j
@RestController
@RequestMapping("/api/mysql")
public class MySQLController {

    @Autowired
    private UserService userService;

    @Autowired
    private ReplicationMonitorService replicationMonitorService;

    /**
     * Creates a user.
     *
     * @param user the user to create, from the request body
     * @return 200 with the created user
     */
    @PostMapping("/user")
    public ResponseEntity<User> createUser(@RequestBody User user) {
        try {
            User createdUser = userService.createUser(user);
            return ResponseEntity.ok(createdUser);
        } catch (Exception e) {
            log.error("创建用户失败", e);
            return ResponseEntity.internalServerError().build();
        }
    }

    /**
     * Updates a user.
     *
     * @param user the user to update, from the request body
     * @return 200 with the service's boolean result
     */
    @PutMapping("/user")
    public ResponseEntity<Boolean> updateUser(@RequestBody User user) {
        try {
            boolean result = userService.updateUser(user);
            return ResponseEntity.ok(result);
        } catch (Exception e) {
            log.error("更新用户失败", e);
            return ResponseEntity.internalServerError().build();
        }
    }

    /**
     * Deletes a user.
     *
     * @param userId id of the user to delete
     * @return 200 with the service's boolean result
     */
    @DeleteMapping("/user/{userId}")
    public ResponseEntity<Boolean> deleteUser(@PathVariable Long userId) {
        try {
            boolean result = userService.deleteUser(userId);
            return ResponseEntity.ok(result);
        } catch (Exception e) {
            log.error("删除用户失败", e);
            return ResponseEntity.internalServerError().build();
        }
    }

    /**
     * Fetches a user by id.
     *
     * @param userId id to look up
     * @return 200 with the user, or 404 if the service returns {@code null}
     */
    @GetMapping("/user/{userId}")
    public ResponseEntity<User> getUserById(@PathVariable Long userId) {
        try {
            User user = userService.getUserById(userId);
            if (user == null) {
                return ResponseEntity.notFound().build();
            }
            return ResponseEntity.ok(user);
        } catch (Exception e) {
            log.error("查询用户失败", e);
            return ResponseEntity.internalServerError().build();
        }
    }

    /**
     * Fetches a user by user name.
     *
     * @param username user name to look up
     * @return 200 with the user, or 404 if the service returns {@code null}
     */
    @GetMapping("/user/username/{username}")
    public ResponseEntity<User> getUserByUsername(@PathVariable String username) {
        try {
            User user = userService.getUserByUsername(username);
            if (user == null) {
                return ResponseEntity.notFound().build();
            }
            return ResponseEntity.ok(user);
        } catch (Exception e) {
            log.error("根据用户名查询用户失败", e);
            return ResponseEntity.internalServerError().build();
        }
    }

    /**
     * Lists one page of users.
     *
     * @param page one-based page number, default 1
     * @param size page size, default 10
     * @return 200 with the page, or 400 if {@code page < 1} or {@code size < 0}
     */
    @GetMapping("/users")
    public ResponseEntity<List<User>> getUserList(
            @RequestParam(defaultValue = "1") int page,
            @RequestParam(defaultValue = "10") int size) {
        // Invalid paging is a client error; answer 400 instead of letting it surface as a 500.
        if (page < 1 || size < 0) {
            log.warn("分页参数非法: page={}, size={}", page, size);
            return ResponseEntity.badRequest().build();
        }
        try {
            List<User> users = userService.getUserList(page, size);
            return ResponseEntity.ok(users);
        } catch (Exception e) {
            log.error("查询用户列表失败", e);
            return ResponseEntity.internalServerError().build();
        }
    }

    /**
     * Changes the status of a user.
     *
     * @param userId id of the user
     * @param status new status value
     * @return 200 with the service's boolean result
     */
    @PutMapping("/user/{userId}/status")
    public ResponseEntity<Boolean> updateUserStatus(
            @PathVariable Long userId,
            @RequestParam String status) {
        try {
            boolean result = userService.updateUserStatus(userId, status);
            return ResponseEntity.ok(result);
        } catch (Exception e) {
            log.error("更新用户状态失败", e);
            return ResponseEntity.internalServerError().build();
        }
    }

    /**
     * Returns the current replication status snapshot.
     *
     * @return 200 with the snapshot, or 500 if the service returns {@code null}
     */
    @GetMapping("/replication/status")
    public ResponseEntity<ReplicationMonitorService.ReplicationStatus> getReplicationStatus() {
        try {
            ReplicationMonitorService.ReplicationStatus status =
                    replicationMonitorService.getReplicationStatus();
            if (status == null) {
                return ResponseEntity.internalServerError().build();
            }
            return ResponseEntity.ok(status);
        } catch (Exception e) {
            log.error("获取主从复制状态失败", e);
            return ResponseEntity.internalServerError().build();
        }
    }

    /**
     * Runs the replication monitoring report once, outside its schedule.
     *
     * @return 200 with a confirmation message
     */
    @PostMapping("/replication/monitor")
    public ResponseEntity<String> triggerReplicationMonitor() {
        try {
            replicationMonitorService.monitorReplicationStatus();
            return ResponseEntity.ok("主从复制监控已触发");
        } catch (Exception e) {
            log.error("触发主从复制监控失败", e);
            return ResponseEntity.internalServerError().build();
        }
    }
}

10. 总结

MySQL主从复制与读写分离是构建高可用、高性能数据库架构的核心技术。通过本文的详细介绍,我们了解了:

  1. 主从复制: 通过binlog将主库的数据变更同步到从库(默认为异步复制,存在一定延迟)
  2. 读写分离: 读操作分散到从库,写操作集中在主库
  3. 故障转移: 从库不可用时读请求自动回退到主库;主库故障时的切换需借助MHA、Orchestrator等外部工具
  4. 负载均衡: 多个从库间通过轮询实现负载均衡
  5. 性能监控: 定时(每30秒)监控主从库健康状态与复制延迟

通过合理的架构设计和实现,可以为应用提供高可用、高性能的数据库服务。


Java实战要点:

  • 数据源配置实现主从分离
  • 读写分离中间件提高查询性能
  • 连接池优化提高数据库连接效率
  • 定时监控确保主从复制稳定
  • 异常处理保证系统可靠性

代码注解说明:

  • @Primary: 主数据源注解
  • @Transactional: 事务管理注解
  • @Scheduled: 定时任务注解
  • @ConfigurationProperties: 配置属性注入
  • 读写分离: 根据操作类型选择数据源
  • 健康检查: 定期检查主从库状态