第307集:数据库崩溃应急处理架构师实战:故障诊断、数据恢复与高可用切换策略

前言

数据库崩溃是生产环境中最严重的故障之一,如何快速诊断、有效恢复并确保业务连续性,是每个架构师必须掌握的核心技能。本文将深入解析数据库崩溃的应急处理流程,从故障诊断到数据恢复,提供完整的高可用切换策略与灾难恢复方案。

一、数据库崩溃类型与诊断方法

1.1 崩溃类型分类

硬件故障诊断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#!/bin/bash
# 硬件故障诊断脚本

echo "=== CPU检查 ==="
# 检查CPU使用率和温度
top -bn1 | grep "Cpu(s)"
sensors 2>/dev/null || echo "sensors命令不可用"

echo "=== 内存检查 ==="
# 检查内存使用情况
free -h
cat /proc/meminfo | grep -E "(MemTotal|MemFree|MemAvailable)"

echo "=== 磁盘检查 ==="
# 检查磁盘使用情况
df -h
# 检查磁盘I/O
iostat -x 1 3

echo "=== 网络检查 ==="
# 检查网络连接
netstat -tuln | grep :3306
ss -tuln | grep :3306

echo "=== 系统负载检查 ==="
# 检查系统负载
uptime
cat /proc/loadavg

软件故障诊断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
-- MySQL进程状态检查
SHOW PROCESSLIST;

-- 检查MySQL错误日志
-- 查看错误日志文件位置
SHOW VARIABLES LIKE 'log_error';

-- 检查InnoDB状态
SHOW ENGINE INNODB STATUS\G

-- 检查锁等待情况
SELECT * FROM information_schema.INNODB_LOCKS;
SELECT * FROM information_schema.INNODB_LOCK_WAITS;

-- 检查死锁情况
SELECT * FROM information_schema.INNODB_TRX;

1.2 故障检测与告警

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# 数据库健康检查脚本
import pymysql
import psutil
import time
import logging
import smtplib
from email.mime.text import MIMEText

class DatabaseHealthChecker:
def __init__(self, db_config, alert_config):
self.db_config = db_config
self.alert_config = alert_config
self.logger = logging.getLogger(__name__)

def check_database_connection(self):
"""检查数据库连接"""
try:
conn = pymysql.connect(**self.db_config)
cursor = conn.cursor()
cursor.execute("SELECT 1")
result = cursor.fetchone()
conn.close()
return result[0] == 1
except Exception as e:
self.logger.error(f"数据库连接检查失败: {str(e)}")
return False

def check_database_performance(self):
"""检查数据库性能"""
try:
conn = pymysql.connect(**self.db_config)
cursor = conn.cursor()

# 检查连接数
cursor.execute("SHOW STATUS LIKE 'Threads_connected'")
connections = cursor.fetchone()[1]

# 检查慢查询
cursor.execute("SHOW STATUS LIKE 'Slow_queries'")
slow_queries = cursor.fetchone()[1]

# 检查锁等待
cursor.execute("SHOW STATUS LIKE 'Innodb_row_lock_waits'")
lock_waits = cursor.fetchone()[1]

conn.close()

return {
'connections': int(connections),
'slow_queries': int(slow_queries),
'lock_waits': int(lock_waits)
}
except Exception as e:
self.logger.error(f"数据库性能检查失败: {str(e)}")
return None

def check_system_resources(self):
"""检查系统资源"""
return {
'cpu_percent': psutil.cpu_percent(interval=1),
'memory_percent': psutil.virtual_memory().percent,
'disk_percent': psutil.disk_usage('/').percent,
'load_avg': psutil.getloadavg()[0]
}

def send_alert(self, message):
"""发送告警"""
try:
msg = MIMEText(message, 'plain', 'utf-8')
msg['From'] = self.alert_config['from_email']
msg['To'] = self.alert_config['to_email']
msg['Subject'] = '数据库健康检查告警'

server = smtplib.SMTP(self.alert_config['smtp_server'])
server.send_message(msg)
server.quit()

self.logger.info("告警邮件发送成功")
except Exception as e:
self.logger.error(f"告警邮件发送失败: {str(e)}")

def run_health_check(self):
"""运行健康检查"""
# 检查数据库连接
if not self.check_database_connection():
self.send_alert("数据库连接失败,请立即检查!")
return False

# 检查数据库性能
perf_data = self.check_database_performance()
if perf_data:
if perf_data['connections'] > 800:
self.send_alert(f"数据库连接数过高: {perf_data['connections']}")

if perf_data['slow_queries'] > 100:
self.send_alert(f"慢查询数量过多: {perf_data['slow_queries']}")

if perf_data['lock_waits'] > 50:
self.send_alert(f"锁等待过多: {perf_data['lock_waits']}")

# 检查系统资源
sys_data = self.check_system_resources()
if sys_data['cpu_percent'] > 90:
self.send_alert(f"CPU使用率过高: {sys_data['cpu_percent']}%")

if sys_data['memory_percent'] > 90:
self.send_alert(f"内存使用率过高: {sys_data['memory_percent']}%")

if sys_data['disk_percent'] > 90:
self.send_alert(f"磁盘使用率过高: {sys_data['disk_percent']}%")

return True

# 使用示例
if __name__ == "__main__":
db_config = {
'host': '192.168.1.10',
'user': 'monitor',
'password': 'monitor_password',
'database': 'mysql'
}

alert_config = {
'from_email': 'monitor@company.com',
'to_email': 'admin@company.com',
'smtp_server': 'smtp.company.com'
}

checker = DatabaseHealthChecker(db_config, alert_config)

while True:
checker.run_health_check()
time.sleep(60) # 每分钟检查一次

二、数据库崩溃应急处理流程

2.1 应急响应流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
#!/bin/bash
# 数据库崩溃应急处理脚本

# 配置参数
DB_HOST="192.168.1.10"
DB_USER="root"
DB_PASSWORD="password"
BACKUP_DIR="/backup/mysql"
LOG_FILE="/var/log/mysql_emergency.log"

# 日志函数
log_message() {
echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" | tee -a $LOG_FILE
}

# 检查数据库状态
check_database_status() {
log_message "检查数据库状态..."

# 检查MySQL进程
if ! pgrep mysqld > /dev/null; then
log_message "MySQL进程未运行"
return 1
fi

# 检查端口监听
if ! netstat -tuln | grep :3306 > /dev/null; then
log_message "MySQL端口3306未监听"
return 1
fi

# 检查数据库连接
if ! mysql -h $DB_HOST -u $DB_USER -p$DB_PASSWORD -e "SELECT 1" > /dev/null 2>&1; then
log_message "数据库连接失败"
return 1
fi

log_message "数据库状态正常"
return 0
}

# 诊断故障类型
diagnose_failure() {
log_message "开始故障诊断..."

# 检查错误日志
ERROR_LOG=$(mysql -h $DB_HOST -u $DB_USER -p$DB_PASSWORD -e "SHOW VARIABLES LIKE 'log_error'" 2>/dev/null | grep log_error | awk '{print $2}')

if [ -f "$ERROR_LOG" ]; then
log_message "检查错误日志: $ERROR_LOG"
tail -50 $ERROR_LOG | tee -a $LOG_FILE
fi

# 检查系统资源
log_message "检查系统资源..."
echo "CPU使用率: $(top -bn1 | grep "Cpu(s)" | awk '{print $2}' | cut -d'%' -f1)%"
echo "内存使用率: $(free | grep Mem | awk '{printf "%.2f%%", $3/$2 * 100.0}')"
echo "磁盘使用率: $(df -h / | awk 'NR==2{printf "%s", $5}')"

# 检查InnoDB状态
log_message "检查InnoDB状态..."
mysql -h $DB_HOST -u $DB_USER -p$DB_PASSWORD -e "SHOW ENGINE INNODB STATUS\G" 2>/dev/null | tee -a $LOG_FILE
}

# 尝试修复数据库
repair_database() {
log_message "尝试修复数据库..."

# 停止MySQL服务
systemctl stop mysql
log_message "MySQL服务已停止"

# 检查数据文件完整性
log_message "检查数据文件完整性..."
mysqlcheck --all-databases --check --extended 2>/dev/null

# 尝试修复损坏的表
log_message "尝试修复损坏的表..."
mysqlcheck --all-databases --repair --extended 2>/dev/null

# 启动MySQL服务
systemctl start mysql
log_message "MySQL服务已启动"

# 等待服务启动
sleep 10

# 检查服务状态
if check_database_status; then
log_message "数据库修复成功"
return 0
else
log_message "数据库修复失败"
return 1
fi
}

# 从备份恢复
restore_from_backup() {
log_message "从备份恢复数据库..."

# 查找最新备份
LATEST_BACKUP=$(ls -t $BACKUP_DIR/*.sql | head -1)

if [ -z "$LATEST_BACKUP" ]; then
log_message "未找到备份文件"
return 1
fi

log_message "使用备份文件: $LATEST_BACKUP"

# 停止MySQL服务
systemctl stop mysql

# 备份当前数据目录
mv /var/lib/mysql /var/lib/mysql.backup.$(date +%Y%m%d_%H%M%S)

# 重新初始化数据目录
mysql_install_db --user=mysql --datadir=/var/lib/mysql

# 启动MySQL服务
systemctl start mysql

# 恢复数据
mysql -h $DB_HOST -u $DB_USER -p$DB_PASSWORD < $LATEST_BACKUP

log_message "数据库恢复完成"
}

# 主流程
main() {
log_message "开始数据库崩溃应急处理..."

# 检查数据库状态
if check_database_status; then
log_message "数据库状态正常,无需处理"
exit 0
fi

# 诊断故障
diagnose_failure

# 尝试修复
if repair_database; then
log_message "数据库修复成功"
exit 0
fi

# 从备份恢复
if restore_from_backup; then
log_message "数据库恢复成功"
exit 0
fi

log_message "数据库恢复失败,需要人工干预"
exit 1
}

# 执行主流程
main "$@"

2.2 故障分类处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# 故障分类处理脚本
import subprocess
import time
import logging

class DatabaseFailureHandler:
def __init__(self):
self.logger = logging.getLogger(__name__)

def handle_hardware_failure(self):
"""处理硬件故障"""
self.logger.info("处理硬件故障...")

# 检查硬件状态
hardware_status = self.check_hardware_status()

if hardware_status['cpu_failed']:
self.logger.error("CPU故障,需要更换硬件")
return False

if hardware_status['memory_failed']:
self.logger.error("内存故障,需要更换硬件")
return False

if hardware_status['disk_failed']:
self.logger.error("磁盘故障,需要更换硬件")
return False

return True

def handle_software_failure(self):
"""处理软件故障"""
self.logger.info("处理软件故障...")

# 检查MySQL进程
if not self.check_mysql_process():
self.logger.info("MySQL进程异常,尝试重启...")
return self.restart_mysql()

# 检查配置文件
if not self.check_mysql_config():
self.logger.error("MySQL配置文件错误")
return False

# 检查权限
if not self.check_mysql_permissions():
self.logger.error("MySQL权限问题")
return False

return True

def handle_storage_failure(self):
"""处理存储故障"""
self.logger.info("处理存储故障...")

# 检查磁盘空间
if not self.check_disk_space():
self.logger.error("磁盘空间不足")
return False

# 检查磁盘I/O
if not self.check_disk_io():
self.logger.error("磁盘I/O异常")
return False

# 检查文件系统
if not self.check_filesystem():
self.logger.error("文件系统损坏")
return False

return True

def handle_network_failure(self):
"""处理网络故障"""
self.logger.info("处理网络故障...")

# 检查网络连接
if not self.check_network_connection():
self.logger.error("网络连接异常")
return False

# 检查防火墙
if not self.check_firewall():
self.logger.error("防火墙阻塞")
return False

# 检查DNS
if not self.check_dns():
self.logger.error("DNS解析问题")
return False

return True

def check_hardware_status(self):
"""检查硬件状态"""
try:
# 检查CPU
cpu_info = subprocess.run(['cat', '/proc/cpuinfo'], capture_output=True, text=True)
cpu_failed = 'processor' not in cpu_info.stdout

# 检查内存
mem_info = subprocess.run(['cat', '/proc/meminfo'], capture_output=True, text=True)
memory_failed = 'MemTotal' not in mem_info.stdout

# 检查磁盘
disk_info = subprocess.run(['df', '-h'], capture_output=True, text=True)
disk_failed = len(disk_info.stdout.split('\n')) < 2

return {
'cpu_failed': cpu_failed,
'memory_failed': memory_failed,
'disk_failed': disk_failed
}
except Exception as e:
self.logger.error(f"硬件状态检查失败: {str(e)}")
return {
'cpu_failed': True,
'memory_failed': True,
'disk_failed': True
}

def check_mysql_process(self):
"""检查MySQL进程"""
try:
result = subprocess.run(['pgrep', 'mysqld'], capture_output=True)
return result.returncode == 0
except Exception as e:
self.logger.error(f"MySQL进程检查失败: {str(e)}")
return False

def restart_mysql(self):
"""重启MySQL"""
try:
# 停止MySQL
subprocess.run(['systemctl', 'stop', 'mysql'], check=True)
time.sleep(5)

# 启动MySQL
subprocess.run(['systemctl', 'start', 'mysql'], check=True)
time.sleep(10)

# 检查状态
result = subprocess.run(['systemctl', 'is-active', 'mysql'], capture_output=True, text=True)
return result.stdout.strip() == 'active'
except Exception as e:
self.logger.error(f"MySQL重启失败: {str(e)}")
return False

def check_disk_space(self):
"""检查磁盘空间"""
try:
result = subprocess.run(['df', '-h', '/'], capture_output=True, text=True)
lines = result.stdout.strip().split('\n')
if len(lines) > 1:
usage = lines[1].split()[4].rstrip('%')
return int(usage) < 90
return False
except Exception as e:
self.logger.error(f"磁盘空间检查失败: {str(e)}")
return False

def check_network_connection(self):
"""检查网络连接"""
try:
result = subprocess.run(['netstat', '-tuln'], capture_output=True, text=True)
return ':3306' in result.stdout
except Exception as e:
self.logger.error(f"网络连接检查失败: {str(e)}")
return False

# 使用示例
if __name__ == "__main__":
handler = DatabaseFailureHandler()

# 根据故障类型选择处理方式
failure_type = "software" # 可以从监控系统获取

if failure_type == "hardware":
success = handler.handle_hardware_failure()
elif failure_type == "software":
success = handler.handle_software_failure()
elif failure_type == "storage":
success = handler.handle_storage_failure()
elif failure_type == "network":
success = handler.handle_network_failure()
else:
success = False

if success:
print("故障处理成功")
else:
print("故障处理失败,需要人工干预")

三、高可用切换策略

3.1 自动故障切换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
# 自动故障切换脚本
import pymysql
import time
import logging
import subprocess

class AutoFailoverManager:
def __init__(self, master_config, slave_configs, vip_config):
self.master_config = master_config
self.slave_configs = slave_configs
self.vip_config = vip_config
self.logger = logging.getLogger(__name__)
self.current_master = master_config['host']

def check_master_health(self):
"""检查主库健康状态"""
try:
conn = pymysql.connect(**self.master_config)
cursor = conn.cursor()

# 检查基本连接
cursor.execute("SELECT 1")
result = cursor.fetchone()

# 检查复制状态
cursor.execute("SHOW MASTER STATUS")
master_status = cursor.fetchone()

# 检查InnoDB状态
cursor.execute("SHOW ENGINE INNODB STATUS")
innodb_status = cursor.fetchone()

conn.close()

return {
'connection_ok': result[0] == 1,
'master_status': master_status is not None,
'innodb_ok': innodb_status is not None
}
except Exception as e:
self.logger.error(f"主库健康检查失败: {str(e)}")
return {
'connection_ok': False,
'master_status': False,
'innodb_ok': False
}

def select_new_master(self):
"""选择新的主库"""
best_slave = None
min_lag = float('inf')

for slave_config in self.slave_configs:
try:
conn = pymysql.connect(**slave_config)
cursor = conn.cursor()

# 检查从库状态
cursor.execute("SHOW SLAVE STATUS")
result = cursor.fetchone()

if result:
slave_io_running = result[10]
slave_sql_running = result[11]
seconds_behind_master = result[32]

if (slave_io_running == 'Yes' and
slave_sql_running == 'Yes' and
seconds_behind_master < min_lag):
min_lag = seconds_behind_master
best_slave = slave_config

conn.close()

except Exception as e:
self.logger.error(f"从库 {slave_config['host']} 检查失败: {str(e)}")

return best_slave

def promote_slave_to_master(self, new_master_config):
"""提升从库为主库"""
try:
conn = pymysql.connect(**new_master_config)
cursor = conn.cursor()

# 停止从库复制
cursor.execute("STOP SLAVE")

# 重置从库状态
cursor.execute("RESET SLAVE ALL")

# 启用写操作
cursor.execute("SET GLOBAL read_only = 0")
cursor.execute("SET GLOBAL super_read_only = 0")

conn.close()

self.logger.info(f"从库 {new_master_config['host']} 已提升为主库")
return True

except Exception as e:
self.logger.error(f"提升从库为主库失败: {str(e)}")
return False

def update_slave_configs(self, new_master_config):
"""更新其他从库配置"""
for slave_config in self.slave_configs:
if slave_config['host'] != new_master_config['host']:
try:
conn = pymysql.connect(**slave_config)
cursor = conn.cursor()

# 停止复制
cursor.execute("STOP SLAVE")

# 更新主库配置
cursor.execute(f"""
CHANGE MASTER TO
MASTER_HOST='{new_master_config['host']}',
MASTER_USER='{new_master_config['user']}',
MASTER_PASSWORD='{new_master_config['password']}',
MASTER_AUTO_POSITION=1
""")

# 启动复制
cursor.execute("START SLAVE")

conn.close()

self.logger.info(f"从库 {slave_config['host']} 已更新主库配置")

except Exception as e:
self.logger.error(f"更新从库 {slave_config['host']} 配置失败: {str(e)}")

def update_load_balancer(self, new_master_config):
"""更新负载均衡器配置"""
try:
# 更新HAProxy配置
haproxy_config = f"""
global
daemon
maxconn 4096

defaults
mode tcp
timeout connect 5000ms
timeout client 50000ms
timeout server 50000ms

listen mysql-cluster
bind 0.0.0.0:3306
mode tcp
balance roundrobin
option mysql-check user haproxy_check

server mysql-master {new_master_config['host']}:3306 check weight 3
"""

# 写入配置文件
with open('/etc/haproxy/haproxy.cfg', 'w') as f:
f.write(haproxy_config)

# 重新加载HAProxy
subprocess.run(['systemctl', 'reload', 'haproxy'], check=True)

self.logger.info("负载均衡器配置已更新")
return True

except Exception as e:
self.logger.error(f"更新负载均衡器配置失败: {str(e)}")
return False

def execute_failover(self):
"""执行故障切换"""
self.logger.info("开始执行故障切换...")

# 检查主库健康状态
master_health = self.check_master_health()
if master_health['connection_ok']:
self.logger.info("主库健康,无需切换")
return True

# 选择新的主库
new_master = self.select_new_master()
if not new_master:
self.logger.error("未找到可用的从库")
return False

self.logger.info(f"选择新主库: {new_master['host']}")

# 提升从库为主库
if not self.promote_slave_to_master(new_master):
return False

# 更新其他从库配置
self.update_slave_configs(new_master)

# 更新负载均衡器配置
self.update_load_balancer(new_master)

# 更新当前主库
self.current_master = new_master['host']

self.logger.info("故障切换完成")
return True

def start_monitoring(self):
"""启动监控"""
while True:
try:
self.execute_failover()
time.sleep(30) # 每30秒检查一次
except KeyboardInterrupt:
self.logger.info("监控已停止")
break
except Exception as e:
self.logger.error(f"监控异常: {str(e)}")
time.sleep(60) # 异常时等待1分钟

# 使用示例
if __name__ == "__main__":
master_config = {
'host': '192.168.1.10',
'user': 'root',
'password': 'password',
'database': 'mysql'
}

slave_configs = [
{
'host': '192.168.1.11',
'user': 'root',
'password': 'password',
'database': 'mysql'
},
{
'host': '192.168.1.12',
'user': 'root',
'password': 'password',
'database': 'mysql'
}
]

vip_config = {
'vip': '192.168.1.100',
'interface': 'eth0'
}

failover_manager = AutoFailoverManager(master_config, slave_configs, vip_config)
failover_manager.start_monitoring()

3.2 手动故障切换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
#!/bin/bash
# 手动故障切换脚本

# 配置参数
MASTER_HOST="192.168.1.10"
SLAVE_HOSTS=("192.168.1.11" "192.168.1.12")
NEW_MASTER=""
VIP="192.168.1.100"

# 日志函数
log_message() {
echo "$(date '+%Y-%m-%d %H:%M:%S') - $1"
}

# 检查主库状态
check_master_status() {
log_message "检查主库状态: $MASTER_HOST"

# 检查网络连通性
if ! ping -c 3 $MASTER_HOST > /dev/null 2>&1; then
log_message "主库网络不可达"
return 1
fi

# 检查MySQL服务
if ! mysql -h $MASTER_HOST -u root -p$MYSQL_PASSWORD -e "SELECT 1" > /dev/null 2>&1; then
log_message "主库MySQL服务异常"
return 1
fi

log_message "主库状态正常"
return 0
}

# 选择新主库
select_new_master() {
log_message "选择新主库..."

for slave in "${SLAVE_HOSTS[@]}"; do
log_message "检查从库: $slave"

# 检查从库状态
slave_status=$(mysql -h $slave -u root -p$MYSQL_PASSWORD -e "SHOW SLAVE STATUS\G" 2>/dev/null)

if [ $? -eq 0 ]; then
# 检查复制延迟
lag=$(echo "$slave_status" | grep "Seconds_Behind_Master" | awk '{print $2}')

if [ "$lag" = "0" ]; then
log_message "选择 $slave 作为新主库"
NEW_MASTER=$slave
return 0
else
log_message "$slave 复制延迟: ${lag}秒"
fi
else
log_message "$slave 连接失败"
fi
done

log_message "未找到合适的新主库"
return 1
}

# 提升从库为主库
promote_slave_to_master() {
log_message "提升 $NEW_MASTER 为主库..."

# 停止从库复制
mysql -h $NEW_MASTER -u root -p$MYSQL_PASSWORD -e "STOP SLAVE;" 2>/dev/null

# 重置从库状态
mysql -h $NEW_MASTER -u root -p$MYSQL_PASSWORD -e "RESET SLAVE ALL;" 2>/dev/null

# 启用写操作
mysql -h $NEW_MASTER -u root -p$MYSQL_PASSWORD -e "SET GLOBAL read_only = 0;" 2>/dev/null
mysql -h $NEW_MASTER -u root -p$MYSQL_PASSWORD -e "SET GLOBAL super_read_only = 0;" 2>/dev/null

log_message "$NEW_MASTER 已提升为主库"
}

# 更新其他从库配置
update_slave_configs() {
log_message "更新其他从库配置..."

for slave in "${SLAVE_HOSTS[@]}"; do
if [ "$slave" != "$NEW_MASTER" ]; then
log_message "更新从库 $slave 配置"

# 停止复制
mysql -h $slave -u root -p$MYSQL_PASSWORD -e "STOP SLAVE;" 2>/dev/null

# 更新主库配置
mysql -h $slave -u root -p$MYSQL_PASSWORD -e "
CHANGE MASTER TO
MASTER_HOST='$NEW_MASTER',
MASTER_USER='repl',
MASTER_PASSWORD='repl_password',
MASTER_AUTO_POSITION=1;
" 2>/dev/null

# 启动复制
mysql -h $slave -u root -p$MYSQL_PASSWORD -e "START SLAVE;" 2>/dev/null

log_message "从库 $slave 配置已更新"
fi
done
}

# 更新负载均衡器
update_load_balancer() {
log_message "更新负载均衡器配置..."

# 更新HAProxy配置
cat > /etc/haproxy/haproxy.cfg << EOF
global
daemon
maxconn 4096

defaults
mode tcp
timeout connect 5000ms
timeout client 50000ms
timeout server 50000ms

listen mysql-cluster
bind 0.0.0.0:3306
mode tcp
balance roundrobin
option mysql-check user haproxy_check

server mysql-master $NEW_MASTER:3306 check weight 3
EOF

# 重新加载HAProxy
systemctl reload haproxy

log_message "负载均衡器配置已更新"
}

# 主流程
main() {
log_message "开始手动故障切换..."

# 检查主库状态
if check_master_status; then
log_message "主库状态正常,无需切换"
exit 0
fi

# 选择新主库
if ! select_new_master; then
log_message "无法选择新主库,切换失败"
exit 1
fi

# 提升从库为主库
promote_slave_to_master

# 更新其他从库配置
update_slave_configs

# 更新负载均衡器
update_load_balancer

log_message "手动故障切换完成"
log_message "新主库: $NEW_MASTER"
}

# 执行主流程
main "$@"

四、数据恢复策略

4.1 备份策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#!/bin/bash
# MySQL备份策略脚本

# 配置参数
DB_HOST="192.168.1.10"
DB_USER="root"
DB_PASSWORD="password"
BACKUP_DIR="/backup/mysql"
DATE=$(date +%Y%m%d_%H%M%S)
RETENTION_DAYS=30

# 创建备份目录
mkdir -p $BACKUP_DIR/$DATE

# 全量备份
full_backup() {
log_message "开始全量备份..."

mysqldump -h $DB_HOST -u $DB_USER -p$DB_PASSWORD \
--single-transaction \
--routines \
--triggers \
--events \
--all-databases > $BACKUP_DIR/$DATE/full_backup.sql

if [ $? -eq 0 ]; then
log_message "全量备份完成"
return 0
else
log_message "全量备份失败"
return 1
fi
}

# 增量备份
incremental_backup() {
log_message "开始增量备份..."

# 刷新日志
mysql -h $DB_HOST -u $DB_USER -p$DB_PASSWORD -e "FLUSH LOGS;"

# 备份binlog
cp /var/lib/mysql/mysql-bin.* $BACKUP_DIR/$DATE/

if [ $? -eq 0 ]; then
log_message "增量备份完成"
return 0
else
log_message "增量备份失败"
return 1
fi
}

# 表级备份
table_backup() {
local database=$1
local table=$2

log_message "开始表级备份: $database.$table"

mysqldump -h $DB_HOST -u $DB_USER -p$DB_PASSWORD \
--single-transaction \
$database $table > $BACKUP_DIR/$DATE/${database}_${table}.sql

if [ $? -eq 0 ]; then
log_message "表级备份完成: $database.$table"
return 0
else
log_message "表级备份失败: $database.$table"
return 1
fi
}

# 清理旧备份
cleanup_old_backups() {
log_message "清理旧备份..."

find $BACKUP_DIR -type d -mtime +$RETENTION_DAYS -exec rm -rf {} \;

log_message "旧备份清理完成"
}

# 压缩备份
compress_backup() {
log_message "压缩备份文件..."

cd $BACKUP_DIR
tar -czf $DATE.tar.gz $DATE/
rm -rf $DATE/

log_message "备份文件压缩完成: $DATE.tar.gz"
}

# 主流程
main() {
log_message "开始备份流程..."

# 全量备份
if full_backup; then
# 增量备份
incremental_backup

# 压缩备份
compress_backup

# 清理旧备份
cleanup_old_backups

log_message "备份流程完成"
else
log_message "备份流程失败"
exit 1
fi
}

# 执行主流程
main "$@"

4.2 数据恢复策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
# 数据恢复策略脚本
import pymysql
import subprocess
import os
import time
import logging

class DataRecoveryManager:
def __init__(self, db_config, backup_dir):
self.db_config = db_config
self.backup_dir = backup_dir
self.logger = logging.getLogger(__name__)

def full_recovery(self, backup_file):
"""全量恢复"""
self.logger.info(f"开始全量恢复: {backup_file}")

try:
# 停止MySQL服务
subprocess.run(['systemctl', 'stop', 'mysql'], check=True)

# 备份当前数据目录
backup_time = time.strftime('%Y%m%d_%H%M%S')
subprocess.run(['mv', '/var/lib/mysql', f'/var/lib/mysql.backup.{backup_time}'], check=True)

# 重新初始化数据目录
subprocess.run(['mysql_install_db', '--user=mysql', '--datadir=/var/lib/mysql'], check=True)

# 启动MySQL服务
subprocess.run(['systemctl', 'start', 'mysql'], check=True)

# 等待服务启动
time.sleep(10)

# 恢复数据
with open(backup_file, 'r') as f:
subprocess.run(['mysql', '-h', self.db_config['host'],
'-u', self.db_config['user'],
'-p' + self.db_config['password']],
stdin=f, check=True)

self.logger.info("全量恢复完成")
return True

except Exception as e:
self.logger.error(f"全量恢复失败: {str(e)}")
return False

def incremental_recovery(self, backup_dir, target_time):
"""增量恢复"""
self.logger.info(f"开始增量恢复到时间点: {target_time}")

try:
# 获取binlog文件列表
binlog_files = [f for f in os.listdir(backup_dir) if f.startswith('mysql-bin.')]
binlog_files.sort()

# 恢复binlog
for binlog_file in binlog_files:
binlog_path = os.path.join(backup_dir, binlog_file)

# 使用mysqlbinlog恢复
cmd = ['mysqlbinlog', '--stop-datetime=' + target_time, binlog_path]
result = subprocess.run(cmd, capture_output=True, text=True)

if result.returncode == 0:
# 执行恢复
subprocess.run(['mysql', '-h', self.db_config['host'],
'-u', self.db_config['user'],
'-p' + self.db_config['password']],
input=result.stdout, text=True, check=True)

self.logger.info("增量恢复完成")
return True

except Exception as e:
self.logger.error(f"增量恢复失败: {str(e)}")
return False

def table_recovery(self, database, table, backup_file):
"""表级恢复"""
self.logger.info(f"开始表级恢复: {database}.{table}")

try:
# 备份当前表
backup_time = time.strftime('%Y%m%d_%H%M%S')
backup_cmd = [
'mysqldump', '-h', self.db_config['host'],
'-u', self.db_config['user'],
'-p' + self.db_config['password'],
database, table
]

with open(f'/tmp/{database}_{table}_backup_{backup_time}.sql', 'w') as f:
subprocess.run(backup_cmd, stdout=f, check=True)

# 删除原表
conn = pymysql.connect(**self.db_config)
cursor = conn.cursor()
cursor.execute(f"DROP TABLE IF EXISTS {database}.{table}")
conn.close()

# 恢复表
with open(backup_file, 'r') as f:
subprocess.run(['mysql', '-h', self.db_config['host'],
'-u', self.db_config['user'],
'-p' + self.db_config['password']],
stdin=f, check=True)

self.logger.info("表级恢复完成")
return True

except Exception as e:
self.logger.error(f"表级恢复失败: {str(e)}")
return False

def point_in_time_recovery(self, target_time):
"""点对点恢复"""
self.logger.info(f"开始点对点恢复到: {target_time}")

try:
# 查找最近的备份
latest_backup = self.find_latest_backup()
if not latest_backup:
self.logger.error("未找到备份文件")
return False

# 全量恢复
if not self.full_recovery(latest_backup):
return False

# 增量恢复
backup_dir = os.path.dirname(latest_backup)
if not self.incremental_recovery(backup_dir, target_time):
return False

self.logger.info("点对点恢复完成")
return True

except Exception as e:
self.logger.error(f"点对点恢复失败: {str(e)}")
return False

def find_latest_backup(self):
"""查找最新备份"""
try:
backup_files = []
for root, dirs, files in os.walk(self.backup_dir):
for file in files:
if file.endswith('.sql'):
backup_files.append(os.path.join(root, file))

if backup_files:
latest_backup = max(backup_files, key=os.path.getmtime)
return latest_backup

return None

except Exception as e:
self.logger.error(f"查找备份文件失败: {str(e)}")
return None

def verify_recovery(self):
"""验证恢复结果"""
self.logger.info("验证恢复结果...")

try:
conn = pymysql.connect(**self.db_config)
cursor = conn.cursor()

# 检查数据库连接
cursor.execute("SELECT 1")
result = cursor.fetchone()

if result[0] != 1:
self.logger.error("数据库连接验证失败")
return False

# 检查表完整性
cursor.execute("SHOW DATABASES")
databases = cursor.fetchall()

for db in databases:
db_name = db[0]
if db_name not in ['information_schema', 'performance_schema', 'mysql', 'sys']:
cursor.execute(f"USE {db_name}")
cursor.execute("SHOW TABLES")
tables = cursor.fetchall()

for table in tables:
table_name = table[0]
cursor.execute(f"CHECK TABLE {db_name}.{table_name}")
check_result = cursor.fetchone()

if check_result[2] != 'OK':
self.logger.error(f"表 {db_name}.{table_name} 检查失败")
return False

conn.close()

self.logger.info("恢复结果验证通过")
return True

except Exception as e:
self.logger.error(f"恢复结果验证失败: {str(e)}")
return False

# 使用示例
if __name__ == "__main__":
db_config = {
'host': '192.168.1.10',
'user': 'root',
'password': 'password',
'database': 'mysql'
}

backup_dir = '/backup/mysql'

recovery_manager = DataRecoveryManager(db_config, backup_dir)

# 选择恢复策略
recovery_type = "point_in_time" # 可以从命令行参数获取

if recovery_type == "full":
success = recovery_manager.full_recovery('/backup/mysql/20231201_120000/full_backup.sql')
elif recovery_type == "incremental":
success = recovery_manager.incremental_recovery('/backup/mysql/20231201_120000', '2023-12-01 12:00:00')
elif recovery_type == "point_in_time":
success = recovery_manager.point_in_time_recovery('2023-12-01 12:00:00')
else:
success = False

if success:
# 验证恢复结果
if recovery_manager.verify_recovery():
print("数据恢复成功")
else:
print("数据恢复验证失败")
else:
print("数据恢复失败")

五、监控与告警系统

5.1 实时监控

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# 数据库实时监控系统
import pymysql
import time
import json
import logging
from datetime import datetime

class DatabaseMonitor:
def __init__(self, db_configs, alert_config):
self.db_configs = db_configs
self.alert_config = alert_config
self.logger = logging.getLogger(__name__)
self.metrics_history = []

def collect_metrics(self, db_config):
"""收集数据库指标"""
try:
conn = pymysql.connect(**db_config)
cursor = conn.cursor()

# 基础指标
cursor.execute("SHOW STATUS")
status_vars = dict(cursor.fetchall())

# 连接数
connections = int(status_vars.get('Threads_connected', 0))
max_connections = int(status_vars.get('Max_used_connections', 0))

# 查询统计
queries = int(status_vars.get('Queries', 0))
slow_queries = int(status_vars.get('Slow_queries', 0))

# 复制状态
cursor.execute("SHOW SLAVE STATUS")
slave_status = cursor.fetchone()

replication_lag = 0
if slave_status:
replication_lag = slave_status[32] or 0

# InnoDB状态
cursor.execute("SHOW ENGINE INNODB STATUS")
innodb_status = cursor.fetchone()

conn.close()

return {
'timestamp': datetime.now().isoformat(),
'host': db_config['host'],
'connections': connections,
'max_connections': max_connections,
'queries': queries,
'slow_queries': slow_queries,
'replication_lag': replication_lag,
'innodb_status': innodb_status is not None
}

except Exception as e:
self.logger.error(f"收集指标失败 {db_config['host']}: {str(e)}")
return None

def check_alerts(self, metrics):
"""检查告警条件"""
alerts = []

# 连接数告警
if metrics['connections'] > 800:
alerts.append({
'level': 'warning',
'message': f"连接数过高: {metrics['connections']}",
'host': metrics['host']
})

# 慢查询告警
if metrics['slow_queries'] > 100:
alerts.append({
'level': 'warning',
'message': f"慢查询过多: {metrics['slow_queries']}",
'host': metrics['host']
})

# 复制延迟告警
if metrics['replication_lag'] > 60:
alerts.append({
'level': 'critical',
'message': f"复制延迟过高: {metrics['replication_lag']}秒",
'host': metrics['host']
})

# InnoDB状态告警
if not metrics['innodb_status']:
alerts.append({
'level': 'critical',
'message': "InnoDB状态异常",
'host': metrics['host']
})

return alerts

def send_alert(self, alert):
"""发送告警"""
try:
# 发送邮件
if self.alert_config.get('email_enabled'):
self.send_email_alert(alert)

# 发送短信
if self.alert_config.get('sms_enabled'):
self.send_sms_alert(alert)

# 发送钉钉
if self.alert_config.get('dingtalk_enabled'):
self.send_dingtalk_alert(alert)

self.logger.info(f"告警已发送: {alert['message']}")

except Exception as e:
self.logger.error(f"发送告警失败: {str(e)}")

def send_email_alert(self, alert):
"""发送邮件告警"""
import smtplib
from email.mime.text import MIMEText

msg = MIMEText(f"数据库告警\n\n级别: {alert['level']}\n主机: {alert['host']}\n消息: {alert['message']}", 'plain', 'utf-8')
msg['From'] = self.alert_config['email_from']
msg['To'] = self.alert_config['email_to']
msg['Subject'] = f"数据库告警 - {alert['level']}"

server = smtplib.SMTP(self.alert_config['smtp_server'])
server.send_message(msg)
server.quit()

def send_sms_alert(self, alert):
"""发送短信告警"""
# 这里可以集成短信服务商的API
pass

def send_dingtalk_alert(self, alert):
"""发送钉钉告警"""
import requests

webhook_url = self.alert_config['dingtalk_webhook']
message = {
"msgtype": "text",
"text": {
"content": f"数据库告警\n级别: {alert['level']}\n主机: {alert['host']}\n消息: {alert['message']}"
}
}

requests.post(webhook_url, json=message)

def start_monitoring(self):
"""启动监控"""
self.logger.info("启动数据库监控...")

while True:
try:
for db_config in self.db_configs:
# 收集指标
metrics = self.collect_metrics(db_config)

if metrics:
# 存储指标历史
self.metrics_history.append(metrics)

# 检查告警
alerts = self.check_alerts(metrics)

# 发送告警
for alert in alerts:
self.send_alert(alert)

# 清理历史数据(保留最近1000条)
if len(self.metrics_history) > 1000:
self.metrics_history = self.metrics_history[-1000:]

time.sleep(30) # 每30秒监控一次

except KeyboardInterrupt:
self.logger.info("监控已停止")
break
except Exception as e:
self.logger.error(f"监控异常: {str(e)}")
time.sleep(60)

# 使用示例
if __name__ == "__main__":
db_configs = [
{
'host': '192.168.1.10',
'user': 'monitor',
'password': 'monitor_password',
'database': 'mysql'
},
{
'host': '192.168.1.11',
'user': 'monitor',
'password': 'monitor_password',
'database': 'mysql'
}
]

alert_config = {
'email_enabled': True,
'email_from': 'monitor@company.com',
'email_to': 'admin@company.com',
'smtp_server': 'smtp.company.com',
'sms_enabled': False,
'dingtalk_enabled': True,
'dingtalk_webhook': 'https://oapi.dingtalk.com/robot/send?access_token=xxx'
}

monitor = DatabaseMonitor(db_configs, alert_config)
monitor.start_monitoring()

5.2 告警配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# Prometheus告警规则配置
groups:
- name: mysql_alerts
rules:
- alert: MySQLDown
expr: mysql_up == 0
for: 1m
labels:
severity: critical
annotations:
summary: "MySQL实例宕机"
description: "MySQL实例 {{ $labels.instance }} 已宕机超过1分钟"

- alert: MySQLHighConnections
expr: mysql_global_status_threads_connected > 800
for: 5m
labels:
severity: warning
annotations:
summary: "MySQL连接数过高"
description: "MySQL实例 {{ $labels.instance }} 连接数: {{ $value }}"

- alert: MySQLSlowQueries
expr: rate(mysql_global_status_slow_queries[5m]) > 10
for: 5m
labels:
severity: warning
annotations:
summary: "MySQL慢查询过多"
description: "MySQL实例 {{ $labels.instance }} 慢查询率: {{ $value }}"

- alert: MySQLReplicationLag
expr: mysql_slave_lag_seconds > 60
for: 5m
labels:
severity: critical
annotations:
summary: "MySQL复制延迟过高"
description: "MySQL实例 {{ $labels.instance }} 复制延迟: {{ $value }}秒"

- alert: MySQLDiskSpace
expr: mysql_global_status_innodb_data_file_size_bytes / mysql_global_status_innodb_data_file_size_bytes > 0.9
for: 5m
labels:
severity: warning
annotations:
summary: "MySQL磁盘空间不足"
description: "MySQL实例 {{ $labels.instance }} 磁盘使用率: {{ $value }}%"

六、最佳实践与运维建议

6.1 预防措施

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
-- 数据库健康检查配置
-- 1. 启用慢查询日志
SET GLOBAL slow_query_log = 1;
SET GLOBAL long_query_time = 2;
SET GLOBAL log_queries_not_using_indexes = 1;

-- 2. 配置连接数限制
SET GLOBAL max_connections = 1000;
SET GLOBAL max_user_connections = 800;

-- 3. 配置InnoDB参数
SET GLOBAL innodb_buffer_pool_size = 2G;
SET GLOBAL innodb_log_file_size = 256M;
SET GLOBAL innodb_log_buffer_size = 16M;
SET GLOBAL innodb_flush_log_at_trx_commit = 1;

-- 4. 配置复制参数
SET GLOBAL slave_parallel_workers = 4;
SET GLOBAL slave_parallel_type = 'LOGICAL_CLOCK';
SET GLOBAL slave_net_timeout = 60;

6.2 应急响应流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
#!/bin/bash
# 应急响应流程脚本

# 1. 故障检测
detect_failure() {
echo "检测数据库故障..."

# 检查MySQL进程
if ! pgrep mysqld > /dev/null; then
echo "MySQL进程异常"
return 1
fi

# 检查端口监听
if ! netstat -tuln | grep :3306 > /dev/null; then
echo "MySQL端口异常"
return 1
fi

# 检查数据库连接
if ! mysql -e "SELECT 1" > /dev/null 2>&1; then
echo "数据库连接异常"
return 1
fi

echo "数据库状态正常"
return 0
}

# 2. 故障诊断
diagnose_failure() {
echo "诊断故障原因..."

# 检查错误日志
ERROR_LOG=$(mysql -e "SHOW VARIABLES LIKE 'log_error'" | grep log_error | awk '{print $2}')
if [ -f "$ERROR_LOG" ]; then
echo "错误日志:"
tail -20 "$ERROR_LOG"
fi

# 检查系统资源
echo "系统资源:"
echo "CPU: $(top -bn1 | grep "Cpu(s)" | awk '{print $2}')"
echo "内存: $(free | grep Mem | awk '{printf "%.2f%%", $3/$2 * 100.0}')"
echo "磁盘: $(df -h / | awk 'NR==2{printf "%s", $5}')"
}

# 3. 应急处理
emergency_response() {
echo "执行应急处理..."

# 停止MySQL服务
systemctl stop mysql

# 检查数据文件
mysqlcheck --all-databases --check --extended

# 尝试修复
mysqlcheck --all-databases --repair --extended

# 启动MySQL服务
systemctl start mysql

# 等待服务启动
sleep 10

# 检查服务状态
if systemctl is-active mysql > /dev/null; then
echo "MySQL服务恢复成功"
return 0
else
echo "MySQL服务恢复失败"
return 1
fi
}

# 4. 数据恢复
data_recovery() {
echo "执行数据恢复..."

# 从备份恢复
LATEST_BACKUP=$(ls -t /backup/mysql/*.sql | head -1)

if [ -n "$LATEST_BACKUP" ]; then
echo "使用备份文件: $LATEST_BACKUP"

# 停止MySQL服务
systemctl stop mysql

# 备份当前数据
mv /var/lib/mysql /var/lib/mysql.backup.$(date +%Y%m%d_%H%M%S)

# 重新初始化
mysql_install_db --user=mysql --datadir=/var/lib/mysql

# 启动MySQL服务
systemctl start mysql

# 恢复数据
mysql < "$LATEST_BACKUP"

echo "数据恢复完成"
return 0
else
echo "未找到备份文件"
return 1
fi
}

# 5. 服务验证
verify_service() {
echo "验证服务状态..."

# 检查MySQL服务
if systemctl is-active mysql > /dev/null; then
echo "MySQL服务运行正常"
else
echo "MySQL服务异常"
return 1
fi

# 检查数据库连接
if mysql -e "SELECT 1" > /dev/null 2>&1; then
echo "数据库连接正常"
else
echo "数据库连接异常"
return 1
fi

# 检查数据完整性
mysqlcheck --all-databases --check --extended

echo "服务验证完成"
return 0
}

# 主流程
main() {
echo "开始应急响应流程..."

# 故障检测
if detect_failure; then
echo "数据库状态正常,无需处理"
exit 0
fi

# 故障诊断
diagnose_failure

# 应急处理
if emergency_response; then
echo "应急处理成功"
else
echo "应急处理失败,尝试数据恢复"
if data_recovery; then
echo "数据恢复成功"
else
echo "数据恢复失败,需要人工干预"
exit 1
fi
fi

# 服务验证
if verify_service; then
echo "应急响应流程完成"
else
echo "服务验证失败,需要人工干预"
exit 1
fi
}

# 执行主流程
main "$@"

6.3 运维最佳实践

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 数据库运维最佳实践配置
database_ops:
# 监控配置
monitoring:
- 实时监控数据库状态
- 监控系统资源使用情况
- 监控复制状态和延迟
- 监控慢查询和锁等待

# 备份策略
backup:
- 每日全量备份
- 每小时增量备份
- 保留30天备份
- 定期测试备份恢复

# 维护计划
maintenance:
- 定期清理日志文件
- 定期优化表结构
- 定期更新统计信息
- 定期检查数据完整性

# 安全措施
security:
- 定期更新密码
- 限制数据库访问权限
- 启用SSL连接
- 定期安全审计

# 性能优化
performance:
- 定期分析慢查询
- 优化索引结构
- 调整缓存参数
- 监控连接数使用

七、总结

数据库崩溃应急处理是架构师必须掌握的核心技能,本文深入解析了数据库崩溃的应急处理流程,从故障诊断到数据恢复,提供了完整的高可用切换策略与灾难恢复方案。

关键要点:

  1. 故障诊断:快速识别故障类型,分析根本原因
  2. 应急响应:建立标准化的应急处理流程
  3. 高可用切换:实现自动和手动故障切换
  4. 数据恢复:制定完整的备份和恢复策略
  5. 监控告警:建立实时监控和告警系统
  6. 预防措施:通过配置优化和定期维护预防故障

通过本文的学习和实践,架构师可以构建稳定可靠的数据库应急处理体系,确保业务的高可用性和数据安全性。


作者简介:资深架构师,专注于数据库高可用架构设计与应急处理,拥有丰富的数据库崩溃处理实战经验。

技术交流:欢迎关注我的技术博客,分享更多数据库应急处理经验。