Episode 308: MySQL Master-Slave Replication in Practice for Architects: GTID + Parallel Replication, Lag Optimization, and Consistency Guarantee Strategies

Introduction

MySQL master-slave replication is the core technology behind highly available database architectures, and a solid grasp of advanced features such as GTID and parallel replication is essential for architects. This article dissects the core mechanisms of MySQL replication, from basic principles to advanced tuning, and lays out complete strategies for lag optimization and consistency guarantees.

1. Core Principles of MySQL Replication in Depth

1.1 The Traditional Replication Mechanism

MySQL replication is built on the binlog (binary log) and involves three core threads: the binlog dump thread on the master, plus the IO thread and the SQL thread on the replica:

-- Check replication status on the replica
SHOW SLAVE STATUS\G

-- Key fields:
-- Slave_IO_Running: IO thread state
-- Slave_SQL_Running: SQL thread state
-- Master_Log_File: master binlog file currently being read
-- Read_Master_Log_Pos: master binlog position read so far
-- Relay_Log_File: current relay log file on the replica
-- Relay_Log_Pos: current relay log position
-- Exec_Master_Log_Pos: master binlog position executed so far
-- Seconds_Behind_Master: replication lag (seconds)
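
On MySQL 5.7 and later, the same information is also exposed through the Performance Schema replication tables, which are the recommended interface going forward (MySQL 8.0.22 additionally renames the command to SHOW REPLICA STATUS). For example:

-- Modern alternative (MySQL 5.7+): Performance Schema replication tables
SELECT * FROM performance_schema.replication_connection_status\G
SELECT * FROM performance_schema.replication_applier_status_by_worker\G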

How the IO Thread Works

# Simulation of the IO thread workflow
import pymysql

class MySQLIOThread:
    def __init__(self, master_config):
        self.master_config = master_config
        self.connection = None
        self.current_log_file = None
        self.current_log_pos = 0

    def connect_to_master(self):
        """Connect to the master."""
        try:
            self.connection = pymysql.connect(**self.master_config)
            return True
        except Exception as e:
            print(f"Failed to connect to master: {str(e)}")
            return False

    def request_binlog_dump(self):
        """Request a binlog stream from the master."""
        try:
            cursor = self.connection.cursor()

            # A real IO thread sends the COM_BINLOG_DUMP protocol command;
            # modeled here only as a dict for illustration
            dump_command = {
                'command': 'COM_BINLOG_DUMP',
                'binlog_pos': self.current_log_pos,
                'flags': 0,
                'server_id': self.master_config['server_id']
            }

            cursor.execute("SHOW MASTER STATUS")
            master_status = cursor.fetchone()

            if master_status:
                self.current_log_file = master_status[0]
                self.current_log_pos = master_status[1]

            return True

        except Exception as e:
            print(f"Failed to request binlog: {str(e)}")
            return False

    def receive_binlog_events(self):
        """Receive binlog events (simulated)."""
        try:
            while True:
                # Receive the next binlog event (placeholder: pymysql has no
                # recv() API; a real implementation would use a binlog client
                # library such as python-mysql-replication)
                event = self.connection.recv()

                if event:
                    # Append the event to the relay log
                    self.write_to_relay_log(event)

                    # Advance the position
                    self.current_log_pos = event.log_pos

                # Check whether we should stop
                if self.should_stop():
                    break

        except Exception as e:
            print(f"Failed to receive binlog events: {str(e)}")

    def write_to_relay_log(self, event):
        """Write an event to the relay log (stub)."""
        pass

    def should_stop(self):
        """Check whether the thread should stop (stub)."""
        return False

How the SQL Thread Works

# Simulation of the SQL thread workflow
import pymysql

class MySQLSQLThread:
    def __init__(self, slave_config):
        self.slave_config = slave_config
        self.connection = None
        self.current_relay_log_file = None
        self.current_relay_log_pos = 0

    def connect_to_slave(self):
        """Connect to the replica."""
        try:
            self.connection = pymysql.connect(**self.slave_config)
            return True
        except Exception as e:
            print(f"Failed to connect to replica: {str(e)}")
            return False

    def read_relay_log_events(self):
        """Read and apply events from the relay log."""
        try:
            while True:
                # Read the next relay log event
                event = self.read_next_event()

                if event:
                    # Apply the event
                    self.execute_sql_event(event)

                    # Advance the position
                    self.current_relay_log_pos = event.log_pos

                # Check whether we should stop
                if self.should_stop():
                    break

        except Exception as e:
            print(f"Failed to read relay log events: {str(e)}")

    def execute_sql_event(self, event):
        """Apply a single event according to its binlog event type."""
        try:
            cursor = self.connection.cursor()

            # Dispatch on the event type
            if event.event_type == 'QUERY_EVENT':
                cursor.execute(event.sql)
            elif event.event_type == 'WRITE_ROWS_EVENT':
                self.execute_insert(event)
            elif event.event_type == 'UPDATE_ROWS_EVENT':
                self.execute_update(event)
            elif event.event_type == 'DELETE_ROWS_EVENT':
                self.execute_delete(event)

            self.connection.commit()

        except Exception as e:
            print(f"Failed to apply event: {str(e)}")
            # Error handling: depending on the error, the event may need to
            # be skipped or replication stopped

    def read_next_event(self):
        """Read the next relay log event (stub)."""
        pass

    def should_stop(self):
        """Check whether the thread should stop (stub)."""
        return False

1.2 The GTID Mechanism in Depth

A GTID (Global Transaction Identifier), introduced in MySQL 5.6, assigns every committed transaction a globally unique identifier.

GTID Structure

-- GTID format: source_id:transaction_id
-- Example: 3E11FA47-71CA-11E1-9E33-C80AA9429562:23

-- Inspect GTID-related variables
SHOW VARIABLES LIKE '%gtid%';

-- Key variables:
-- gtid_mode: GTID mode (ON/OFF)
-- gtid_executed: set of GTIDs executed on this server
-- gtid_purged: set of GTIDs already purged from the binlog
-- enforce_gtid_consistency: allow only GTID-safe statements
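
Note that gtid_executed is reported as a GTID set, where consecutive transactions from the same source collapse into ranges:

-- Inspect the executed GTID set (range notation)
SELECT @@global.gtid_executed;
-- Sample output: 3E11FA47-71CA-11E1-9E33-C80AA9429562:1-23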

Configuring and Enabling GTID

-- 1. Enable GTID mode online (MySQL 5.7+: gtid_mode cannot jump from OFF
--    to ON; it must be stepped one value at a time, with consistency
--    enforcement enabled first)
SET GLOBAL enforce_gtid_consistency = ON;
SET GLOBAL gtid_mode = OFF_PERMISSIVE;
SET GLOBAL gtid_mode = ON_PERMISSIVE;
SET GLOBAL gtid_mode = ON;

-- 2. Master my.cnf
[mysqld]
server-id = 1
log-bin = mysql-bin
gtid_mode = ON
enforce_gtid_consistency = ON
log-slave-updates = ON

-- 3. Replica my.cnf
[mysqld]
server-id = 2
gtid_mode = ON
enforce_gtid_consistency = ON
log-slave-updates = ON
Configuring GTID-Based Replication

-- Configure replication with GTID auto-positioning
CHANGE MASTER TO
MASTER_HOST = '192.168.1.10',
MASTER_USER = 'repl',
MASTER_PASSWORD = 'repl_password',
MASTER_AUTO_POSITION = 1;

-- Start replication
START SLAVE;

-- Check GTID status
SHOW SLAVE STATUS\G
-- Fields to watch:
-- Retrieved_Gtid_Set: GTIDs fetched by the IO thread
-- Executed_Gtid_Set: GTIDs executed on the replica
-- Auto_Position: whether auto-positioning is enabled
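
MySQL also ships built-in functions for working with GTID sets, which are useful when checking whether a replica has caught up (the GTID values below are placeholders):

-- TRUE if every GTID in the first set is contained in the second
SELECT GTID_SUBSET('3E11FA47-71CA-11E1-9E33-C80AA9429562:1-23',
                   @@global.gtid_executed);

-- GTIDs present in the first set but missing from the second
SELECT GTID_SUBTRACT('3E11FA47-71CA-11E1-9E33-C80AA9429562:1-23',
                     @@global.gtid_executed);

-- Block up to 5 seconds until the given GTID set is applied locally
SELECT WAIT_FOR_EXECUTED_GTID_SET('3E11FA47-71CA-11E1-9E33-C80AA9429562:23', 5);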

1.3 Parallel Replication

MySQL 5.7 introduced group-commit-based parallel replication (LOGICAL_CLOCK), which significantly improves apply throughput on the replica.

Parallel Replication Types

-- Note: changing slave_parallel_type requires the SQL thread to be stopped

-- 1. DATABASE: transactions on different schemas apply in parallel
SET GLOBAL slave_parallel_type = 'DATABASE';
SET GLOBAL slave_parallel_workers = 4;

-- 2. LOGICAL_CLOCK: transactions that group-committed together on the
--    master apply in parallel (MySQL 5.7+)
SET GLOBAL slave_parallel_type = 'LOGICAL_CLOCK';
SET GLOBAL slave_parallel_workers = 8;

-- 3. WRITESET dependency tracking is set on the MASTER (MySQL 5.7.22+);
--    the replica keeps slave_parallel_type = LOGICAL_CLOCK and gains much
--    finer-grained parallelism from the write-set information
SET GLOBAL binlog_transaction_dependency_tracking = 'WRITESET';

Tuning Parallel Replication

-- Master my.cnf
[mysqld]
# Group commit: wait up to 1000 microseconds (or 10 queued transactions)
# to batch more transactions into each binlog flush
binlog_group_commit_sync_delay = 1000
binlog_group_commit_sync_no_delay_count = 10

# Write-set dependency tracking (MySQL 5.7.22+)
binlog_transaction_dependency_tracking = WRITESET
transaction_write_set_extraction = XXHASH64

-- Replica my.cnf
[mysqld]
# Parallel replication
slave_parallel_type = LOGICAL_CLOCK
slave_parallel_workers = 8
slave_preserve_commit_order = 1

# Network: reconnect if no data arrives from the master for 60 seconds
slave_net_timeout = 60
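
Whether the parallel workers are actually active can be verified on the replica through the Performance Schema; the columns below are available from MySQL 5.7 (8.0 adds further per-transaction detail columns):

-- One row per parallel applier worker on the replica
SELECT CHANNEL_NAME, WORKER_ID, THREAD_ID, SERVICE_STATE, LAST_ERROR_MESSAGE
FROM performance_schema.replication_applier_status_by_worker;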

2. Replication Configuration and Optimization in Practice

2.1 Master Configuration

-- Core master configuration
[mysqld]
# Basics
server-id = 1
log-bin = mysql-bin
binlog-format = ROW
expire_logs_days = 7          # use binlog_expire_logs_seconds on MySQL 8.0
max_binlog_size = 100M

# GTID
gtid_mode = ON
enforce_gtid_consistency = ON
log-slave-updates = ON

# Performance
innodb_buffer_pool_size = 2G
innodb_log_file_size = 256M
innodb_log_buffer_size = 16M
innodb_flush_log_at_trx_commit = 1

# Group commit tuning
binlog_group_commit_sync_delay = 1000
binlog_group_commit_sync_no_delay_count = 10

# Write-set dependency tracking
binlog_transaction_dependency_tracking = WRITESET
transaction_write_set_extraction = XXHASH64

# Connections
max_connections = 1000
max_user_connections = 800

2.2 Replica Configuration

-- Core replica configuration
[mysqld]
# Basics
server-id = 2
relay-log = relay-bin
relay-log-index = relay-bin.index
relay_log_purge = 1

# GTID
gtid_mode = ON
enforce_gtid_consistency = ON
log-slave-updates = ON

# Parallel replication
slave_parallel_type = LOGICAL_CLOCK
slave_parallel_workers = 8
slave_preserve_commit_order = 1

# Performance (relaxed redo durability is acceptable on a rebuildable replica)
innodb_buffer_pool_size = 2G
innodb_log_file_size = 256M
innodb_log_buffer_size = 16M
innodb_flush_log_at_trx_commit = 2

# Network
slave_net_timeout = 60

# Read-only protection
read_only = 1
super_read_only = 1

2.3 Monitoring Replication Performance

# Replication monitoring script
import pymysql
import time
import logging

class ReplicationMonitor:
    def __init__(self, master_config, slave_configs):
        self.master_config = master_config
        self.slave_configs = slave_configs
        self.logger = logging.getLogger(__name__)

    def get_master_status(self):
        """Fetch the binlog position and GTID state from the master."""
        try:
            conn = pymysql.connect(**self.master_config)
            cursor = conn.cursor()

            # Current binlog file and position
            cursor.execute("SHOW MASTER STATUS")
            master_status = cursor.fetchone()

            # Executed GTID set
            cursor.execute("SELECT @@global.gtid_executed")
            gtid_executed = cursor.fetchone()

            conn.close()

            return {
                'file': master_status[0] if master_status else None,
                'position': master_status[1] if master_status else None,
                'gtid_executed': gtid_executed[0] if gtid_executed else None
            }

        except Exception as e:
            self.logger.error(f"Failed to fetch master status: {str(e)}")
            return None

    def get_slave_status(self, slave_config):
        """Fetch replication status from a replica."""
        try:
            conn = pymysql.connect(**slave_config)
            cursor = conn.cursor()

            # Replica status: take the column names from cursor.description
            # of this very statement, not from an unrelated table
            cursor.execute("SHOW SLAVE STATUS")
            result = cursor.fetchone()

            if result:
                columns = [desc[0] for desc in cursor.description]
                status = dict(zip(columns, result))

                # Parallel applier status, one row per worker
                cursor.execute(
                    "SELECT * FROM performance_schema.replication_applier_status_by_worker")
                workers = cursor.fetchall()

                conn.close()

                return {
                    'status': status,
                    'workers': workers
                }

            conn.close()
            return None

        except Exception as e:
            self.logger.error(f"Failed to fetch replica status: {str(e)}")
            return None

    def calculate_replication_lag(self, master_status, slave_status):
        """Estimate replication lag in bytes of binlog."""
        if not master_status or not slave_status:
            return None

        master_file = master_status['file']
        master_pos = master_status['position']

        slave_status_dict = slave_status['status']

        # Only directly comparable when both point at the same binlog file
        if slave_status_dict['Master_Log_File'] == master_file:
            lag = master_pos - slave_status_dict['Read_Master_Log_Pos']
        else:
            # Different files: a byte-level comparison needs the file sizes
            lag = None

        return lag

    def monitor_replication_performance(self):
        """Main monitoring loop."""
        while True:
            try:
                master_status = self.get_master_status()

                for slave_config in self.slave_configs:
                    slave_status = self.get_slave_status(slave_config)

                    if slave_status:
                        lag = self.calculate_replication_lag(master_status, slave_status)
                        seconds_behind = slave_status['status']['Seconds_Behind_Master']

                        print(f"Replica {slave_config['host']} status:")
                        print(f"  IO thread: {slave_status['status']['Slave_IO_Running']}")
                        print(f"  SQL thread: {slave_status['status']['Slave_SQL_Running']}")
                        print(f"  Lag: {seconds_behind} seconds")
                        print(f"  Parallel workers: {len(slave_status['workers'])}")

                        # Alerting (Seconds_Behind_Master is NULL when the
                        # SQL thread is not running)
                        if seconds_behind is not None and seconds_behind > 60:
                            self.logger.warning(
                                f"Replica {slave_config['host']} lag too high")

                        if slave_status['status']['Slave_IO_Running'] != 'Yes':
                            self.logger.error(
                                f"Replica {slave_config['host']} IO thread down")

                        if slave_status['status']['Slave_SQL_Running'] != 'Yes':
                            self.logger.error(
                                f"Replica {slave_config['host']} SQL thread down")

                time.sleep(30)  # poll every 30 seconds

            except KeyboardInterrupt:
                self.logger.info("Monitoring stopped")
                break
            except Exception as e:
                self.logger.error(f"Monitoring error: {str(e)}")
                time.sleep(60)

# Usage example
if __name__ == "__main__":
    master_config = {
        'host': '192.168.1.10',
        'user': 'monitor',
        'password': 'monitor_password',
        'database': 'mysql'
    }

    slave_configs = [
        {
            'host': '192.168.1.11',
            'user': 'monitor',
            'password': 'monitor_password',
            'database': 'mysql'
        },
        {
            'host': '192.168.1.12',
            'user': 'monitor',
            'password': 'monitor_password',
            'database': 'mysql'
        }
    ]

    monitor = ReplicationMonitor(master_config, slave_configs)
    monitor.monitor_replication_performance()

3. Lag Optimization Strategies

3.1 Network Optimization

#!/bin/bash
# Network tuning script

# 1. TCP tuning
echo "Tuning TCP parameters..."

# Increase TCP buffer sizes
echo 'net.core.rmem_max = 16777216' >> /etc/sysctl.conf
echo 'net.core.wmem_max = 16777216' >> /etc/sysctl.conf
echo 'net.ipv4.tcp_rmem = 4096 87380 16777216' >> /etc/sysctl.conf
echo 'net.ipv4.tcp_wmem = 4096 65536 16777216' >> /etc/sysctl.conf

# Enable TCP window scaling
echo 'net.ipv4.tcp_window_scaling = 1' >> /etc/sysctl.conf

# Use BBR congestion control
echo 'net.ipv4.tcp_congestion_control = bbr' >> /etc/sysctl.conf

# Apply the settings
sysctl -p

# 2. Network interface tuning
echo "Tuning network interface..."

# Lengthen the device input queue
echo 'net.core.netdev_max_backlog = 5000' >> /etc/sysctl.conf

# Raise the per-poll packet-processing budget
echo 'net.core.netdev_budget = 600' >> /etc/sysctl.conf

# Apply the settings
sysctl -p

echo "Network tuning complete"

3.2 Storage Optimization

-- Storage optimization
-- 1. Use SSD storage
-- 2. Configure RAID
-- 3. Tune the filesystem

-- InnoDB I/O tuning (these variables are dynamic)
SET GLOBAL innodb_io_capacity = 2000;       -- reasonable starting point for SSD
SET GLOBAL innodb_io_capacity_max = 4000;
SET GLOBAL innodb_flush_neighbors = 0;      -- neighbor flushing only helps on HDD

-- The buffer pool is resizable online since MySQL 5.7 (value in bytes)
SET GLOBAL innodb_buffer_pool_size = 2147483648;  -- 2G

-- The following are NOT dynamic; set them in my.cnf and restart:
-- innodb_log_file_size = 256M
-- innodb_log_files_in_group = 2
-- innodb_log_buffer_size = 16M
-- innodb_buffer_pool_instances = 8

3.3 Parameter Tuning

# Replication parameter tuning script
import pymysql
import time

class ReplicationOptimizer:
    def __init__(self, master_config, slave_configs):
        self.master_config = master_config
        self.slave_configs = slave_configs

    def optimize_master_parameters(self):
        """Apply master-side tuning."""
        try:
            conn = pymysql.connect(**self.master_config)
            cursor = conn.cursor()

            # Group commit batching
            cursor.execute("SET GLOBAL binlog_group_commit_sync_delay = 1000")
            cursor.execute("SET GLOBAL binlog_group_commit_sync_no_delay_count = 10")

            # Write-set dependency tracking
            cursor.execute("SET GLOBAL binlog_transaction_dependency_tracking = 'WRITESET'")
            cursor.execute("SET GLOBAL transaction_write_set_extraction = 'XXHASH64'")

            # Durable binlog settings
            cursor.execute("SET GLOBAL sync_binlog = 1")
            cursor.execute("SET GLOBAL innodb_flush_log_at_trx_commit = 1")

            conn.close()
            print("Master parameter tuning complete")

        except Exception as e:
            print(f"Master parameter tuning failed: {str(e)}")

    def optimize_slave_parameters(self, slave_config):
        """Apply replica-side tuning."""
        try:
            conn = pymysql.connect(**slave_config)
            cursor = conn.cursor()

            # Parallel replication: slave_parallel_type and
            # slave_preserve_commit_order can only be changed while the
            # applier (SQL thread) is stopped
            cursor.execute("STOP SLAVE SQL_THREAD")
            cursor.execute("SET GLOBAL slave_parallel_type = 'LOGICAL_CLOCK'")
            cursor.execute("SET GLOBAL slave_parallel_workers = 8")
            cursor.execute("SET GLOBAL slave_preserve_commit_order = 1")
            cursor.execute("START SLAVE SQL_THREAD")

            # Network
            cursor.execute("SET GLOBAL slave_net_timeout = 60")

            # InnoDB I/O
            cursor.execute("SET GLOBAL innodb_io_capacity = 2000")
            cursor.execute("SET GLOBAL innodb_io_capacity_max = 4000")
            cursor.execute("SET GLOBAL innodb_flush_neighbors = 0")

            conn.close()
            print(f"Replica {slave_config['host']} parameter tuning complete")

        except Exception as e:
            print(f"Replica {slave_config['host']} parameter tuning failed: {str(e)}")

    def optimize_all_parameters(self):
        """Tune the master and every replica."""
        self.optimize_master_parameters()

        for slave_config in self.slave_configs:
            self.optimize_slave_parameters(slave_config)

    def monitor_optimization_effect(self):
        """Watch replication lag after tuning."""
        while True:
            try:
                for slave_config in self.slave_configs:
                    conn = pymysql.connect(**slave_config)
                    cursor = conn.cursor()

                    cursor.execute("SHOW SLAVE STATUS")
                    result = cursor.fetchone()

                    if result:
                        # Look the column up by name rather than relying on
                        # a fixed index such as result[32]
                        columns = [desc[0] for desc in cursor.description]
                        status = dict(zip(columns, result))
                        lag = status['Seconds_Behind_Master']
                        print(f"Replica {slave_config['host']} lag: {lag} seconds")

                    conn.close()

                time.sleep(60)  # check once a minute

            except KeyboardInterrupt:
                print("Monitoring stopped")
                break
            except Exception as e:
                print(f"Monitoring error: {str(e)}")
                time.sleep(60)

# Usage example
if __name__ == "__main__":
    master_config = {
        'host': '192.168.1.10',
        'user': 'root',
        'password': 'password',
        'database': 'mysql'
    }

    slave_configs = [
        {
            'host': '192.168.1.11',
            'user': 'root',
            'password': 'password',
            'database': 'mysql'
        }
    ]

    optimizer = ReplicationOptimizer(master_config, slave_configs)
    optimizer.optimize_all_parameters()
    optimizer.monitor_optimization_effect()

4. Consistency Guarantee Strategies

4.1 Data Consistency Checks

# Data consistency check script
import pymysql
import subprocess
import time

class DataConsistencyChecker:
    def __init__(self, master_config, slave_configs):
        self.master_config = master_config
        self.slave_configs = slave_configs

    def get_table_checksum(self, config, database, table):
        """Compute a row count and checksum for one table."""
        try:
            conn = pymysql.connect(**config)
            cursor = conn.cursor()

            # Row count
            cursor.execute(f"SELECT COUNT(*) FROM `{database}`.`{table}`")
            row_count = cursor.fetchone()[0]

            # Built-in table checksum (note: locks the table while it runs)
            cursor.execute(f"CHECKSUM TABLE `{database}`.`{table}`")
            checksum = cursor.fetchone()[1]

            conn.close()

            return {
                'row_count': row_count,
                'data_hash': checksum
            }

        except Exception as e:
            print(f"Failed to checksum table: {str(e)}")
            return None

    def check_table_consistency(self, database, table):
        """Compare one table between the master and every replica."""
        master_checksum = self.get_table_checksum(self.master_config, database, table)

        if not master_checksum:
            return False

        inconsistencies = []

        for slave_config in self.slave_configs:
            slave_checksum = self.get_table_checksum(slave_config, database, table)

            if not slave_checksum:
                inconsistencies.append({
                    'slave_host': slave_config['host'],
                    'error': 'checksum unavailable'
                })
                continue

            # Compare counts and checksums
            if (master_checksum['row_count'] != slave_checksum['row_count'] or
                    master_checksum['data_hash'] != slave_checksum['data_hash']):

                inconsistencies.append({
                    'slave_host': slave_config['host'],
                    'master_row_count': master_checksum['row_count'],
                    'slave_row_count': slave_checksum['row_count'],
                    'master_hash': master_checksum['data_hash'],
                    'slave_hash': slave_checksum['data_hash']
                })

        return inconsistencies

    def check_all_tables_consistency(self):
        """Walk every user table and compare it across servers."""
        try:
            conn = pymysql.connect(**self.master_config)
            cursor = conn.cursor()

            cursor.execute("SHOW DATABASES")
            databases = [row[0] for row in cursor.fetchall()]

            all_inconsistencies = {}

            for database in databases:
                # Skip system schemas
                if database in ['information_schema', 'performance_schema', 'mysql', 'sys']:
                    continue

                cursor.execute(f"SHOW TABLES FROM `{database}`")
                tables = [row[0] for row in cursor.fetchall()]

                for table in tables:
                    print(f"Checking table {database}.{table}...")
                    inconsistencies = self.check_table_consistency(database, table)

                    if inconsistencies:
                        all_inconsistencies[f"{database}.{table}"] = inconsistencies

            conn.close()
            return all_inconsistencies

        except Exception as e:
            print(f"Consistency check failed: {str(e)}")
            return None

    def repair_inconsistency(self, database, table, slave_config):
        """Rebuild a divergent replica table from the master's copy."""
        try:
            print(f"Repairing table {database}.{table} on replica {slave_config['host']}")

            # Stop replication on the replica
            conn = pymysql.connect(**slave_config)
            cursor = conn.cursor()
            cursor.execute("STOP SLAVE")

            # Back up the current table first
            backup_time = time.strftime('%Y%m%d_%H%M%S')
            backup_cmd = [
                'mysqldump', '-h', slave_config['host'],
                '-u', slave_config['user'],
                '-p' + slave_config['password'],
                database, table
            ]

            with open(f'/tmp/{database}_{table}_backup_{backup_time}.sql', 'w') as f:
                subprocess.run(backup_cmd, stdout=f, check=True)

            # Drop the divergent copy
            cursor.execute(f"DROP TABLE IF EXISTS `{database}`.`{table}`")

            # Recreate it with the master's definition (SHOW CREATE TABLE
            # output is not schema-qualified, so select the database first)
            master_conn = pymysql.connect(**self.master_config)
            master_cursor = master_conn.cursor()

            master_cursor.execute(f"SHOW CREATE TABLE `{database}`.`{table}`")
            create_table_sql = master_cursor.fetchone()[1]

            cursor.execute(f"USE `{database}`")
            cursor.execute(create_table_sql)

            # Copy the data row by row (fine for small tables; use mysqldump
            # or a physical clone for large ones)
            master_cursor.execute(f"SELECT * FROM `{database}`.`{table}`")
            rows = master_cursor.fetchall()

            for row in rows:
                placeholders = ','.join(['%s'] * len(row))
                cursor.execute(
                    f"INSERT INTO `{database}`.`{table}` VALUES ({placeholders})", row)

            master_conn.close()
            conn.commit()
            conn.close()

            print(f"Table {database}.{table} repaired")
            return True

        except Exception as e:
            print(f"Failed to repair table {database}.{table}: {str(e)}")
            return False

# Usage example
if __name__ == "__main__":
    master_config = {
        'host': '192.168.1.10',
        'user': 'root',
        'password': 'password',
        'database': 'mysql'
    }

    slave_configs = [
        {
            'host': '192.168.1.11',
            'user': 'root',
            'password': 'password',
            'database': 'mysql'
        }
    ]

    checker = DataConsistencyChecker(master_config, slave_configs)
    inconsistencies = checker.check_all_tables_consistency()

    if inconsistencies:
        print("Inconsistencies found:")
        for table, issues in inconsistencies.items():
            print(f"Table {table}:")
            for issue in issues:
                print(f"  Replica {issue['slave_host']}: {issue}")
    else:
        print("All tables consistent")

4.2 Transaction Consistency Guarantees

-- Transaction consistency settings

-- 1. Allow only GTID-safe statements
SET GLOBAL enforce_gtid_consistency = ON;

-- 2. Transaction isolation level
SET GLOBAL transaction_isolation = 'REPEATABLE-READ';

-- 3. Binlog transaction compression (MySQL 8.0.20+)
SET GLOBAL binlog_transaction_compression = ON;

-- 4. Preserve commit order on the replica
SET GLOBAL slave_preserve_commit_order = 1;

-- 5. Retry transient applier failures (e.g. lock wait timeouts)
SET GLOBAL slave_transaction_retries = 128;
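
A practical application of GTIDs for consistency is session-level read-your-writes: after a commit on the master, the client captures the transaction's GTID (for example via the session_track_gtids session tracker) and blocks on the replica until it has been applied before reading. A minimal sketch with a placeholder GTID:

-- On the replica, before the dependent read; returns 0 once the GTID
-- has been applied locally, or 1 on timeout (here: 1 second)
SELECT WAIT_FOR_EXECUTED_GTID_SET('3E11FA47-71CA-11E1-9E33-C80AA9429562:23', 1);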

4.3 Failure Recovery Consistency

# Failure recovery consistency script
import pymysql
import time

class ConsistencyRecoveryManager:
    def __init__(self, master_config, slave_configs):
        self.master_config = master_config
        self.slave_configs = slave_configs

    def check_gtid_consistency(self):
        """Collect the executed GTID set from the master and every replica."""
        try:
            master_conn = pymysql.connect(**self.master_config)
            master_cursor = master_conn.cursor()

            # Master's executed GTID set
            master_cursor.execute("SELECT @@global.gtid_executed")
            master_gtid = master_cursor.fetchone()[0]

            master_conn.close()

            slave_gtids = {}
            for slave_config in self.slave_configs:
                slave_conn = pymysql.connect(**slave_config)
                slave_cursor = slave_conn.cursor()

                # Replica's executed GTID set
                slave_cursor.execute("SELECT @@global.gtid_executed")
                slave_gtid = slave_cursor.fetchone()[0]

                slave_gtids[slave_config['host']] = slave_gtid
                slave_conn.close()

            return {
                'master_gtid': master_gtid,
                'slave_gtids': slave_gtids
            }

        except Exception as e:
            print(f"GTID consistency check failed: {str(e)}")
            return None

    def recover_slave_consistency(self, slave_config):
        """Re-point a replica at the master using GTID auto-positioning."""
        try:
            print(f"Recovering replica {slave_config['host']}...")

            conn = pymysql.connect(**slave_config)
            cursor = conn.cursor()

            # Stop replication
            cursor.execute("STOP SLAVE")

            # Discard the old replication configuration
            cursor.execute("RESET SLAVE ALL")

            # Reconfigure with GTID auto-positioning
            cursor.execute(f"""
                CHANGE MASTER TO
                MASTER_HOST='{self.master_config['host']}',
                MASTER_USER='repl',
                MASTER_PASSWORD='repl_password',
                MASTER_AUTO_POSITION=1
            """)

            # Restart replication
            cursor.execute("START SLAVE")

            conn.close()

            print(f"Replica {slave_config['host']} recovery complete")
            return True

        except Exception as e:
            print(f"Failed to recover replica {slave_config['host']}: {str(e)}")
            return False

    def verify_consistency_recovery(self):
        """Verify every replica has caught up with the master's GTID set."""
        time.sleep(30)  # give replication time to catch up

        gtid_status = self.check_gtid_consistency()
        if not gtid_status:
            return False

        # GTID sets are set-valued, so compare with GTID_SUBSET rather than
        # string equality
        for slave_config in self.slave_configs:
            conn = pymysql.connect(**slave_config)
            cursor = conn.cursor()
            cursor.execute(
                "SELECT GTID_SUBSET(%s, @@global.gtid_executed)",
                (gtid_status['master_gtid'],))
            caught_up = cursor.fetchone()[0]
            conn.close()

            if not caught_up:
                print(f"Replica {slave_config['host']} GTID set is behind")
                return False

        print("All replicas' GTID sets are consistent")
        return True

    def full_consistency_recovery(self):
        """Run the full recovery workflow."""
        print("Starting full consistency recovery...")

        # Baseline GTID state
        gtid_status = self.check_gtid_consistency()
        if not gtid_status:
            print("Unable to check GTID consistency")
            return False

        # Recover each replica
        for slave_config in self.slave_configs:
            if not self.recover_slave_consistency(slave_config):
                print(f"Replica {slave_config['host']} recovery failed")
                return False

        # Verify the result
        if self.verify_consistency_recovery():
            print("Consistency recovery succeeded")
            return True
        else:
            print("Consistency recovery verification failed")
            return False

# Usage example
if __name__ == "__main__":
    master_config = {
        'host': '192.168.1.10',
        'user': 'root',
        'password': 'password',
        'database': 'mysql'
    }

    slave_configs = [
        {
            'host': '192.168.1.11',
            'user': 'root',
            'password': 'password',
            'database': 'mysql'
        }
    ]

    recovery_manager = ConsistencyRecoveryManager(master_config, slave_configs)
    recovery_manager.full_consistency_recovery()

5. Advanced Features and Best Practices

5.1 Semi-Synchronous Replication

-- Semi-synchronous replication

-- Master my.cnf
[mysqld]
# Load the semi-sync master plugin at startup
plugin-load = "rpl_semi_sync_master=semisync_master.so"
rpl_semi_sync_master_enabled = 1
rpl_semi_sync_master_timeout = 10000    # ms before falling back to async
rpl_semi_sync_master_wait_for_slave_count = 1

-- Replica my.cnf
[mysqld]
# Load the semi-sync slave plugin at startup
plugin-load = "rpl_semi_sync_slave=semisync_slave.so"
rpl_semi_sync_slave_enabled = 1

-- Or install the plugins at runtime
INSTALL PLUGIN rpl_semi_sync_master SONAME 'semisync_master.so';
INSTALL PLUGIN rpl_semi_sync_slave SONAME 'semisync_slave.so';

-- Enable semi-sync (on the master / replica respectively)
SET GLOBAL rpl_semi_sync_master_enabled = 1;
SET GLOBAL rpl_semi_sync_slave_enabled = 1;
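
Whether semi-sync is actually in effect (and how often it falls back to asynchronous mode) can be checked on the master through status counters:

-- On the master: semi-sync health indicators
SHOW STATUS LIKE 'Rpl_semi_sync_master_status';    -- ON = semi-sync active
SHOW STATUS LIKE 'Rpl_semi_sync_master_clients';   -- connected semi-sync replicas
SHOW STATUS LIKE 'Rpl_semi_sync_master_yes_tx';    -- commits acknowledged in time
SHOW STATUS LIKE 'Rpl_semi_sync_master_no_tx';     -- commits not acknowledged in time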

5.2 Group Replication

-- Group Replication

-- Node 1 my.cnf
[mysqld]
server-id = 1
gtid_mode = ON
enforce_gtid_consistency = ON
log-bin = mysql-bin
log-slave-updates = ON
binlog-checksum = NONE
master-info-repository = TABLE
relay-log-info-repository = TABLE
transaction-write-set-extraction = XXHASH64
loose-group_replication_group_name = "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
loose-group_replication_start_on_boot = OFF
loose-group_replication_local_address = "127.0.0.1:24901"
loose-group_replication_group_seeds = "127.0.0.1:24901,127.0.0.1:24902,127.0.0.1:24903"
loose-group_replication_bootstrap_group = OFF
loose-group_replication_single_primary_mode = ON
loose-group_replication_enforce_update_everywhere_checks = OFF

-- Bootstrap the group (first node only)
SET GLOBAL group_replication_bootstrap_group = ON;
START GROUP_REPLICATION;
SET GLOBAL group_replication_bootstrap_group = OFF;
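
Group membership and each node's state can be inspected from any member (the MEMBER_ROLE column, which shows PRIMARY/SECONDARY in single-primary mode, requires MySQL 8.0):

-- View group members and their state
SELECT MEMBER_HOST, MEMBER_PORT, MEMBER_STATE, MEMBER_ROLE
FROM performance_schema.replication_group_members;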

5.3 Best Practices Summary

# MySQL replication best practices
replication_best_practices:
  # Configuration
  configuration:
    - Use GTID mode
    - Enable parallel replication
    - Tune parameters for the workload
    - Use SSD storage

  # Monitoring
  monitoring:
    - Monitor replication lag in real time
    - Monitor GTID state
    - Monitor parallel applier performance
    - Set alert thresholds

  # Maintenance
  maintenance:
    - Check data consistency regularly
    - Optimize table structures regularly
    - Purge old log files regularly
    - Rehearse failure recovery regularly

  # Security (see the SQL sketch after this list)
  security:
    - Use a dedicated replication user
    - Restrict the replication user's privileges
    - Require SSL connections
    - Rotate passwords regularly

  # Performance
  performance:
    - Tune the network
    - Tune storage
    - Tune server parameters
    - Use read/write splitting
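
The security items above translate into a small amount of SQL on the master; a sketch of a dedicated, least-privilege replication account with TLS enforced (the subnet and password are placeholders):

-- Dedicated replication user, restricted to the replica subnet, TLS required
CREATE USER 'repl'@'192.168.1.%' IDENTIFIED BY 'repl_password' REQUIRE SSL;
GRANT REPLICATION SLAVE ON *.* TO 'repl'@'192.168.1.%';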

6. Summary

MySQL master-slave replication is the core technology behind highly available database architectures. This article dissected its core mechanisms, from fundamentals to advanced tuning, and laid out complete strategies for lag optimization and consistency guarantees.

Key takeaways:

  1. Core mechanisms: understand how binlog, GTID, and parallel replication work
  2. Configuration: parameter tuning and optimization for the master and replicas
  3. Lag optimization: strategies at the network, storage, and parameter levels
  4. Consistency guarantees: data consistency checks and failure recovery mechanisms
  5. Advanced features: semi-synchronous replication, Group Replication, and more
  6. Best practices: configuration, monitoring, maintenance, security, and performance

With the material and practice in this article, an architect can build an efficient, stable MySQL replication architecture that keeps data highly available and consistent.


About the author: senior architect focused on MySQL high-availability architecture design and optimization, with extensive hands-on experience in MySQL replication.

Get in touch: follow my tech blog for more hands-on MySQL replication material.