1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228
| import pymysql import time import logging import smtplib from email.mime.text import MIMEText from datetime import datetime, timedelta
class BackupMonitor: def __init__(self, db_config, backup_dir, alert_config): self.db_config = db_config self.backup_dir = backup_dir self.alert_config = alert_config self.logger = logging.getLogger(__name__) def check_backup_status(self): """检查备份状态""" try: backup_status = { 'full_backup': self.check_full_backup(), 'incremental_backup': self.check_incremental_backup(), 'binlog_backup': self.check_binlog_backup(), 'storage_usage': self.check_storage_usage() } return backup_status except Exception as e: self.logger.error(f"检查备份状态失败: {str(e)}") return None def check_full_backup(self): """检查全量备份""" try: full_backups = [] for root, dirs, files in os.walk(self.backup_dir): for file in files: if 'full_backup' in file and file.endswith('.sql'): full_backups.append(os.path.join(root, file)) if not full_backups: return {'status': 'failed', 'message': '未找到全量备份'} latest_backup = max(full_backups, key=os.path.getmtime) backup_time = datetime.fromtimestamp(os.path.getmtime(latest_backup)) if datetime.now() - backup_time > timedelta(hours=24): return {'status': 'warning', 'message': f'全量备份超过24小时: {backup_time}'} return {'status': 'success', 'message': f'全量备份正常: {backup_time}'} except Exception as e: return {'status': 'error', 'message': f'检查全量备份失败: {str(e)}'} def check_incremental_backup(self): """检查增量备份""" try: incremental_backups = [] for root, dirs, files in os.walk(self.backup_dir): for file in files: if 'incremental_backup' in file and file.endswith('.sql'): incremental_backups.append(os.path.join(root, file)) if not incremental_backups: return {'status': 'warning', 'message': '未找到增量备份'} latest_backup = max(incremental_backups, key=os.path.getmtime) backup_time = datetime.fromtimestamp(os.path.getmtime(latest_backup)) if datetime.now() - backup_time > timedelta(hours=1): return {'status': 'warning', 'message': f'增量备份超过1小时: {backup_time}'} return {'status': 'success', 'message': f'增量备份正常: {backup_time}'} except Exception as e: return {'status': 'error', 'message': f'检查增量备份失败: {str(e)}'} def check_binlog_backup(self): """检查binlog备份""" try: binlog_backups = [] for root, dirs, files in os.walk(self.backup_dir): for file in files: if file.startswith('mysql-bin.') and file.endswith('.log'): binlog_backups.append(os.path.join(root, file)) if not binlog_backups: return {'status': 'warning', 'message': '未找到binlog备份'} latest_backup = max(binlog_backups, key=os.path.getmtime) backup_time = datetime.fromtimestamp(os.path.getmtime(latest_backup)) if datetime.now() - backup_time > timedelta(minutes=10): return {'status': 'warning', 'message': f'binlog备份超过10分钟: {backup_time}'} return {'status': 'success', 'message': f'binlog备份正常: {backup_time}'} except Exception as e: return {'status': 'error', 'message': f'检查binlog备份失败: {str(e)}'} def check_storage_usage(self): """检查存储使用率""" try: import shutil total, used, free = shutil.disk_usage(self.backup_dir) usage_percent = (used / total) * 100 if usage_percent > 90: return {'status': 'critical', 'message': f'存储使用率过高: {usage_percent:.2f}%'} elif usage_percent > 80: return {'status': 'warning', 'message': f'存储使用率较高: {usage_percent:.2f}%'} else: return {'status': 'success', 'message': f'存储使用率正常: {usage_percent:.2f}%'} except Exception as e: return {'status': 'error', 'message': f'检查存储使用率失败: {str(e)}'} def send_alert(self, alert_type, message): """发送告警""" try: if self.alert_config.get('email_enabled'): self.send_email_alert(alert_type, message) if self.alert_config.get('sms_enabled'): self.send_sms_alert(alert_type, message) if self.alert_config.get('dingtalk_enabled'): self.send_dingtalk_alert(alert_type, message) self.logger.info(f"告警已发送: {alert_type} - {message}") except Exception as e: self.logger.error(f"发送告警失败: {str(e)}") def send_email_alert(self, alert_type, message): """发送邮件告警""" try: msg = MIMEText(f"数据库备份告警\n\n类型: {alert_type}\n消息: {message}", 'plain', 'utf-8') msg['From'] = self.alert_config['email_from'] msg['To'] = self.alert_config['email_to'] msg['Subject'] = f"数据库备份告警 - {alert_type}" server = smtplib.SMTP(self.alert_config['smtp_server']) server.send_message(msg) server.quit() except Exception as e: self.logger.error(f"发送邮件告警失败: {str(e)}") def send_sms_alert(self, alert_type, message): """发送短信告警""" pass def send_dingtalk_alert(self, alert_type, message): """发送钉钉告警""" try: import requests webhook_url = self.alert_config['dingtalk_webhook'] alert_message = { "msgtype": "text", "text": { "content": f"数据库备份告警\n类型: {alert_type}\n消息: {message}" } } requests.post(webhook_url, json=alert_message) except Exception as e: self.logger.error(f"发送钉钉告警失败: {str(e)}") def start_monitoring(self): """启动监控""" self.logger.info("启动备份监控...") while True: try: backup_status = self.check_backup_status() if backup_status: for backup_type, status in backup_status.items(): if status['status'] in ['failed', 'critical']: self.send_alert('critical', f"{backup_type}: {status['message']}") elif status['status'] == 'warning': self.send_alert('warning', f"{backup_type}: {status['message']}") time.sleep(300) except KeyboardInterrupt: self.logger.info("备份监控已停止") break except Exception as e: self.logger.error(f"备份监控异常: {str(e)}") time.sleep(60)
if __name__ == "__main__": db_config = { 'host': '192.168.1.10', 'user': 'monitor', 'password': 'monitor_password', 'database': 'mysql' } backup_dir = '/backup/mysql' alert_config = { 'email_enabled': True, 'email_from': 'monitor@company.com', 'email_to': 'admin@company.com', 'smtp_server': 'smtp.company.com', 'sms_enabled': False, 'dingtalk_enabled': True, 'dingtalk_webhook': 'https://oapi.dingtalk.com/robot/send?access_token=xxx' } monitor = BackupMonitor(db_config, backup_dir, alert_config) monitor.start_monitoring()
|