背景
早期有一个saas服务的数据库,各租户的表结构内容完全相同,由于某大客户租户的业务较广,从集中管理的租户中独立出来做私有化的业务(gitlab代码仓库、db 数据库、服务器来单独部署)。
几年间,私有化的租户db和其他租户db管理就有了较大差异化。
由于业务变更,私有化的管理方式要切换回集中租户管理方式,所以要评估出将私有化的业务和db表结构之前差异点。
问题
这里跳过其他业务内容对比,说一下db之间怎么整理出差异点,然后由差异点来生成 alter 修改表结构的语句,判断哪些需要调整修改,哪些可以直接跳过来处理。
实现方案
常见的方案有很多,在 RDS 云服务上一般都会附有比较工具,另外可以找一下开源的对比工具,把各自的数据源和要对比的内容配置上即可。
只不过常规的对比工具结果,可能就不是那么的直观来表达出差异点,还需要人工二次主观意识的判断验证。如果表的数量和字段数量很少的情况下,一个个表人工比对结果还可以,那如果是上千张表呢?如果有多套环境需要验证多次呢?
需要遇到的场景有如下
From 表多字段、From 多表
To 表多字段、To 多表
同表,From 多索引
同表,To 多索引
同表,索引内容一致,索引名不同
同表同字段,类型相同,长度不同,注释相同
同表同字段,类型相同,长度不同,注释不同
同表同字段,类型相同,长度相同,注释不同
同表同字段,类型不同,长度相同,注释相同
同表同字段,类型不同,长度不同,注释不同
同表同字段,类型不同,长度不同,注释相同
同表同字段,类型相同,长度相同,注释相同,字段在表里的上下位置不同
...
如下通过比较工具,两个数据库 From_online 和 To_online 对比结果,给出两边的修改建议
场景1:多字段
场景1:多字段 # 左表 From 和右表 To 比较出现异常 # Comparing `F_online`.`cs_tag` to `T_online`.`cs_tag` [FAIL] # Transformation for --changes-for=server2: # # 右表 To 建议修改语句 ALTER TABLE `T_online`.`cs_tag` ADD COLUMN remark varchar(100) NULL COMMENT '备注' AFTER type; # # Transformation for reverse changes (--changes-for=server1): # # 右表 From 建议修改语句 # ALTER TABLE `F_online`.`cs_tag` # DROP COLUMN remark; #
很明显,这里是 From 数据库表,多一个 remark 字段,而 To 表建议添加,所以主观分析,只需要修改 To 表一方即可。
场景2:字段名大小写不同
# 场景2:字段大小写不同 # Comparing `F_online`.`cs_rule` to `T_online`.`cs_rule` [FAIL] # Transformation for --changes-for=server2: # ALTER TABLE `T_online`.`cs_rule` DROP COLUMN SpecialDayTime, ADD COLUMN specialDayTime datetime NULL COMMENT '特殊日子' AFTER conditionValue; # # Transformation for reverse changes (--changes-for=server1): # # ALTER TABLE `F_online`.`cs_rule` # DROP COLUMN specialDayTime, # ADD COLUMN SpecialDayTime datetime NULL COMMENT '特殊日子' AFTER conditionValue; #
这个也很简单,只是字段的大小写不同而已,mysql 大小写不敏感,可选择性的修改。
场景3:类型相同,值不同,索引不同,注释不同
如下场景3,加个难度
# 场景3:类型相同,枚举值不同,索引数量不同,索引名大小写不同,注释不同 # Comparing `F_online`.`cs_order` to `T_online`.`cs_order` [FAIL] # Transformation for --changes-for=server2: # ALTER TABLE `T_online`.`cs_order` DROP INDEX IDX_CARPARK_CON, DROP INDEX IDX_CARPARK_ORDERNUMBER, ADD INDEX idx_CARPARK_ORDERNUMBER (orderNumber), CHANGE COLUMN limitPeriod limitPeriod enum('YEAR','MOUTH','DAY','WEEK') NOT NULL COMMENT '周期 YEAR-年 MONTH-月 DAY-日 WEEK-周'; # # Transformation for reverse changes (--changes-for=server1): # # ALTER TABLE `F_online`.`cs_order` # ADD INDEX IDX_CARPARK_CON (carparkOrderNumber), # DROP INDEX idx_CARPARK_ORDERNUMBER, # ADD INDEX IDX_CARPARK_ORDERNUMBER (orderNumber), # CHANGE COLUMN limitPeriod limitPeriod enum('YEAR','MONTH','DAY') NOT NULL COMMENT '周期 YEAR-年 MOUTH-月 DAY-日'; #
分析
原本 F_online 表,有索引 idx_CARPARK_ORDERNUMBER,表字段 limitPeriod 注数据类型 enum('YEAR','MOUTH','DAY','WEEK'),
原本 T_online 表,有索引 IDX_CARPARK_ORDERNUMBER、IDX_CARPARK_CON,表字段 limitPeriod 注数据类型 enum('YEAR','MONTH','DAY')
分析结果
有索引相同,大小写命名不同而已,可以不用修改
有索引类型相同,索引的内容不同,甚至还有错别字的情况
需要做的是两边数据库,首先保证数据类型能够全覆盖,修改业务把错别字情况改正,两边做修改调整
ALTER TABLE `F_online`.`cs_order` ADD INDEX IDX_CARPARK_CON (carparkOrderNumber), CHANGE COLUMN limitPeriod limitPeriod enum('YEAR','MONTH','DAY', 'WEEK') NOT NULL COMMENT '周期 YEAR-年 MOUTH-月 DAY-日 WEEK-周'; ALTER TABLE `T_online`.`cs_order` ADD INDEX IDX_CARPARK_CON (carparkOrderNumber), CHANGE COLUMN limitPeriod limitPeriod enum('YEAR','MONTH','DAY', 'WEEK') NOT NULL COMMENT '周期 YEAR-年 MOUTH-月 DAY-日 WEEK-周';
场景4:类型不同,是否为空不同,默认值不同,注释不同
场景4:类型不同,允许为空不同,默认值不同,注释不同 # Comparing `F_online`.`cs_task` to `T_online`.`cs_task` [FAIL] # Transformation for --changes-for=server2: # ALTER TABLE `T_online`.`cs_task` CHANGE COLUMN taskStatus taskStatus tinyint(5) NULL DEFAULT 1 COMMENT '状态0-处理中,1-成功,2-失败,3-已经删除,4-部分成功'; # # Transformation for reverse changes (--changes-for=server1): # # ALTER TABLE `F_online`.`cs_task` # CHANGE COLUMN taskStatus taskStatus int(4) NOT NULL DEFAULT 0 COMMENT '状态0-处理中,1-成功,2-失败,3-已经删除'; #
分析
原本 F_online.cs_task 表,字段 taskStatus 注数据类型是 tinyint(5),允许为Null,默认值为 1,注释是‘状态0-处理中,1-成功,2-失败,3-已经删除,4-部分成功’
原本 T_online.cs_task 表,字段 taskStatus 注数据类型是 int(4),不允许为Null,默认值为 0,注释是‘状态0-处理中,1-成功,2-失败,3-已经删除’
需要做的是两边数据库,首先保证数据类型和长度是向上兼容的,注释保留最全的一部分,默认值需要根据业务场景来具体分析,之后再把建议修改的sql,两边做修改调整
分析结果:两边都做修改来把数据库齐平 ALTER TABLE `F_online`.`cs_task` CHANGE COLUMN taskStatus taskStatus int(4) NULL DEFAULT 0 COMMENT '状态0-处理中,1-成功,2-失败,3-已经删除,4-部分成功'; ALTER TABLE `T_online`.`cs_task` CHANGE COLUMN taskStatus taskStatus int(4) NULL DEFAULT 0 COMMENT '状态0-处理中,1-成功,2-失败,3-已经删除,4-部分成功';
场景5:各种混合不同
...
由此可以看出,加入你有多个环境需要比对,多个数据库,多个表需要这么一点点的人工比对,工作量是有多大的?如果你是幸运儿的话,表的结构差异性很少还行吧。
自定义实现方案
常言道工具提升生产力,是效率必不可少的关键一环。
思考一下 MySql 本身的 schema 表中,已经完全包含了所有数据库,所有的表信息,表字段信息,索引信息,函数信息,存储过程信息。
function_sql = "select `name` from mysql.proc where db = '库' and `type` = 'FUNCTION'" procedure_sql = "select `name` from mysql.proc where db = '库' and `type` = 'PROCEDURE'" view_sql = "select table_name from information_schema.VIEWS where table_schema = '表'" trigger_sql = "show triggers from 库" schema_sql = "select column_name, is_nullable, data_type, column_default, column_type, column_key, column_comment from information_schema.COLUMNS where table_name = '表' and table_schema = '库' "
那既然如此,搞一个脚本来查询两边数据库的结果,然后做统计分析,再给出两边的 SQL 修改建议,然后导出Excel文件,很直观方式查看差异点,岂不是很快哉?
第一版脚本1:找出差异点
#!/usr/bin/python3 # -*- coding: UTF-8 -*- import os import sys import pymysql from openpyxl import Workbook from openpyxl.styles import NamedStyle, Font, Alignment, PatternFill, colors wb_result = Workbook() sheet = wb_result.active def result_excel_init(): """ 初始化结果excel样式 """ # 样式 highlight = NamedStyle(name="highlight") highlight.font = Font(name='Arial', size=13, color=colors.BLACK, bold=True) highlight.alignment = Alignment(horizontal='center', vertical='center', wrap_text=True) highlight.fill = PatternFill("solid", fgColor="ACB9C9") sheet.append(['表名', '{} 多字段'.format(s_db), '{} 多字段'.format(t_db), '字段名', '允许 Null 对比', '默认值对比', '数据类型对比', '长度对比', '索引类型对比', '注释对比']) sheet.row_dimensions[1].height = 25 sheet.column_dimensions['A'].width = 38 sheet.column_dimensions['B'].width = 40 sheet.column_dimensions['C'].width = 40 sheet.column_dimensions['D'].width = 20 sheet.column_dimensions['E'].width = 20 sheet.column_dimensions['F'].width = 25 sheet.column_dimensions['G'].width = 30 sheet.column_dimensions['H'].width = 60 sheet.column_dimensions['I'].width = 20 sheet.column_dimensions['J'].width = 200 for cell in list(sheet.rows)[0]: cell.style = highlight def result_excel_end(): style = NamedStyle(name="style") v_center = Alignment(vertical='center', wrap_text=True) hv_list = [1] for i, row in enumerate(list(sheet.rows)): if i == 0: continue for cell in row: if cell.column in hv_list: pass else: style.alignment = v_center cell.style = style def export_excel_file(data_list, file_name): print('\n--- 对比结束,导出 Excel 文件 ---\n') result_excel_init() for data in data_list: print(data) sheet.append([data.get('表名'), data.get('{} 多字段'.format(s_db)), data.get('{} 多字段'.format(t_db)), data.get('字段名'), data.get('允许Null'), data.get('默认值'), data.get('数据类型'), data.get('类型长度'), data.get('索引类型'), data.get('注释')]) result_excel_end() wb_result.save(file_name) print('\n执行导出结果完成\n') def s_exec_sql(sql, db_name): db = pymysql.connect(host='ip1', port=3306, user='用户1', password='密码1', database=db_name) cursor = db.cursor() cursor.execute(sql) results = cursor.fetchall() if results: return results def t_exec_sql(sql, db_name): db = pymysql.connect(host='ip2', port=3306, user='用户2', password='密码2', database=db_name) cursor = db.cursor() cursor.execute(sql) results = cursor.fetchall() if results: return results if __name__ == '__main__': s_db = 'F_online' t_db = 'T_online' # 函数、存储过程、触发器、表结构 function_sql = "select `name` from mysql.proc where db = '{}' and `type` = 'FUNCTION'" procedure_sql = "select `name` from mysql.proc where db = '{}' and `type` = 'PROCEDURE'" view_sql = "select table_name from information_schema.VIEWS where table_schema = '{}'" trigger_sql = "show triggers from {}" schema_sql = "select column_name, is_nullable, data_type, column_default, column_type, column_key, column_comment from information_schema.COLUMNS where table_name = '{}' and table_schema = '{}' " print('\n---------- 触发器对比 ----------') s_trigger_result = s_exec_sql(trigger_sql.format(s_db), s_db) t_trigger_result = t_exec_sql(trigger_sql.format(t_db), t_db) print('{} 触发器 {} 个 {}'.format(s_db, len(s_trigger_result) if s_trigger_result else 0, s_trigger_result if s_trigger_result else '')) print('{} 触发器 {} 个 {}'.format(t_db, len(t_trigger_result) if t_trigger_result else 0, t_trigger_result if t_trigger_result else '')) print('\n---------- 存储过程对比 ----------') s_procedure_result = s_exec_sql(procedure_sql.format(s_db), s_db) t_procedure_result = t_exec_sql(procedure_sql.format(t_db), t_db) print('{} 存储过程 {} 个 {}'.format(s_db, len(s_procedure_result) if s_procedure_result else 0, s_procedure_result if s_procedure_result else '')) print('{} 存储过程 {} 个 {}'.format(t_db, len(t_procedure_result) if t_procedure_result else 0, t_procedure_result if t_procedure_result else '')) print('\n---------- 函数对比 ----------') s_function_result = s_exec_sql(function_sql.format(s_db), s_db) t_function_result = t_exec_sql(function_sql.format(t_db), t_db) print('{} 函数 {} 个 {}'.format(s_db, len(s_function_result) if s_function_result else 0, [v for (v,) in s_function_result] if s_function_result else '')) print('{} 函数 {} 个 {}'.format(t_db, len(t_function_result) if t_function_result else 0, [v for (v,) in t_function_result] if t_function_result else '')) print('\n---------- 视图对比 ----------') s_view_result = s_exec_sql(view_sql.format(s_db), s_db) t_view_result = t_exec_sql(view_sql.format(t_db), t_db) print('{} 视图 {} 个 {}'.format(s_db, len(s_view_result) if s_view_result else 0, [v for (v,) in s_view_result] if s_view_result else '')) print('{} 视图 {} 个 {}'.format(t_db, len(t_view_result) if t_view_result else 0, [v for (v,) in t_view_result] if t_view_result else '')) print('\n---------- 表结构对比 ----------') source_tables_result = s_exec_sql('SHOW TABLES from {}'.format(s_db), s_db) target_tables_result = t_exec_sql('SHOW TABLES from {}'.format(t_db), t_db) print('{} 表 {} 个'.format(s_db, len(source_tables_result) if source_tables_result else 0)) print('{} 表 {} 个'.format(t_db, len(target_tables_result) if target_tables_result else 0)) source_tables = set([tb for (tb,) in source_tables_result]) target_tables = set([tb for (tb,) in target_tables_result]) intersection_tables = set(source_tables) & set(target_tables) s_dif_tables = set(source_tables) - set(target_tables) t_dif_tables = set(target_tables) - set(source_tables) print('\n{} & {} 共有表 {} 个'.format(s_db, t_db, len(intersection_tables))) print('{} 缺失表 {} 个 {}'.format(t_db, len(s_dif_tables), s_dif_tables)) print('{} 缺失表 {} 个 {}\n'.format(s_db, len(t_dif_tables), t_dif_tables)) comp_data_list = list() for i, table in enumerate(sorted(intersection_tables)): if table: # 查询 schema s_idx_result = s_exec_sql(schema_sql.format(table, s_db), s_db) t_idx_result = t_exec_sql(schema_sql.format(table, t_db), t_db) if s_idx_result == t_idx_result: # print(table, '完全一致') continue s_dic_data = {} t_dic_data = {} for d in s_idx_result: s_dic_data[d[0]] = d for d in t_idx_result: t_dic_data[d[0]] = d print('\n------ table: {} ------'.format(table)) s_column_names = sorted(s_dic_data.keys()) t_column_names = sorted(t_dic_data.keys()) if s_column_names != t_column_names: dif_columns = set(s_column_names) - set(t_column_names) if dif_columns: print('`{}`.`{}` 多字段 {}'.format(s_db, table, dif_columns)) comp_data_list.append({ "表名": table, "{} 多字段".format(s_db): ','.join(dif_columns) }) else: t_dif = set(t_column_names) - set(s_column_names) print('`{}`.`{}` 多字段 {}'.format(t_db, table, t_dif)) comp_data_list.append({ "表名": table, "{} 多字段".format(t_db): ','.join(t_dif) }) for k, v in s_dic_data.items(): bv = t_dic_data.get(k) # 两边有值但不相等 if bv and bv != v: s_is_null, t_is_null = v[1], bv[1] s_data_type, t_data_type = v[2], bv[2] s_def_value, t_def_value = v[3], bv[3] s_data_len, t_data_len = v[4], bv[4] s_idx_type, t_idx_type = v[5], bv[5] s_commend, t_commend = v[6], bv[6] comp_result = { "表名": table, "字段名": k } if s_is_null != t_is_null: comp_result['允许Null'] = '{}={} \n{}={}'.format(s_db, s_is_null, t_db, t_is_null) if s_data_type != t_data_type: comp_result['数据类型'] = '{}={} \n{}={}'.format(s_db, s_data_type, t_db, t_data_type) if s_def_value != t_def_value: sv = s_def_value if s_def_value else '' tv = t_def_value if t_def_value else '' if sv != tv: comp_result['默认值'] = '{}={} \n{}={}'.format(s_db, sv, t_db, tv) if s_data_len != t_data_len: comp_result['类型长度'] = '{}={} \n{}={}'.format(s_db, s_data_len, t_db, t_data_len) if s_idx_type != t_idx_type: comp_result['索引类型'] = '{}={} \n{}={}'.format(s_db, s_idx_type, t_db, t_idx_type) if s_commend != t_commend: com_len = len(str(s_commend).replace("\r\n", ' ')) comp_result['注释'] = '{}={}{} \n{}={}'.format(s_db, str(s_commend).replace("\r\n", ' '), '\n' if com_len > 150 else '', t_db, str(t_commend).replace("\r\n", ' ')) comp_data_list.append(comp_result) # 导出 Excel 文件 export_excel_file(comp_data_list, '{}->{}-表结构对比2.xlsx'.format(s_db, t_db))
执行日志
---------- 触发器对比 ---------- F_online 触发器 0 个 T_online 触发器 0 个 ---------- 存储过程对比 ---------- F_online 存储过程 0 个 T_online 存储过程 0 个 ---------- 函数对比 ---------- F_online 函数 4 个 ['currval', 'func_checkVersion', 'nextval', 'setval'] T_online 函数 7 个 ['currval', 'func_checkVersion', 'getCategoryName', 'getRegionName', 'nextval', 'region', 'setval'] ---------- 视图对比 ---------- F_online 视图 9 个 ['cp_consume_verification_view', 'cp_registration_view', ...] T_online 视图 9 个 ['cp_consume_verification_view', 'cp_registration_view', ...] ---------- 表结构对比 ---------- F_online 表 1163 个 T_online 表 1145 个 F_online & T_online 共有表 1134 个 T_online 缺失表 29 个 {'cs_pm_service_channel_config', 'cs_delivery_time_rule_period', ...} F_online 缺失表 11 个 {'im_ps_member', 'cs_deliveryregion_regions_bak', 'cs_goods_bak', ...} ------ table: cs_am_reward_record_detail ------ ------ table: cs_cp_definition ------ `F_online`.`cs_cp_definition` 多字段 {'target', 'guideProvide'} ------ table: ... ------ --- 对比结束,导出 Excel 文件 --- {'表名': 'cs_am_reward_record_detail', '字段名': 'nickName', '允许Null': 'F_online=YES \nT_online=NO', '默认值': 'F_online= \nT_online='} {'表名': 'cs_am_share_record', '字段名': 'code', '索引类型': 'F_online= \nT_online=MUL'} {'表名': 'cs_batch_dispose_task', '字段名': 'taskStatus', '允许Null': 'F_online=NO \nT_online=YES', '注释': 'F_online=任务状态0-处理中,1-成功,2-失败,3-已经删除 \nT_online=任务状态0-处理中,1-成功,2-失败,3-已经删除,4-部分成功'} ... 执行导出结果完成
第二版脚本2:找出差异点,并给出 To 的修改建议
在脚本1的基础上进行改进,增加条件控制,过滤出一些 From 和 To 比较之后,To 不需要修改的一些内容,实现起来也较简单,有兴趣的可以通过开关来测试一下导出的结果
忽略字段类型一致 To 长度大于 From 的数据(类型为 'tinyint', 'varchar', 'bigint', 'int',其他类型excel人为判断)
忽略 To 字段类型大于 From 字段类型(类型为'tinyint', 'varchar', 'bigint', 'int','decimal','enum')
忽略 From 注释为空的字段
#!/usr/bin/python3 # -*- coding: UTF-8 -*- import os import sys import re import pymysql from openpyxl import Workbook from openpyxl.styles import NamedStyle, Font, Alignment, PatternFill, colors wb_result = Workbook() sheet = wb_result.active def result_excel_init(): highlight = NamedStyle(name="highlight") highlight.font = Font(name='Arial', size=13, color=colors.BLACK, bold=True) highlight.alignment = Alignment(horizontal='center', vertical='center', wrap_text=True) highlight.fill = PatternFill("solid", fgColor="ACB9C9") sheet.append(['表名', '{} 多字段'.format(s_db), '{} 多字段'.format(t_db), '字段名', '允许 Null 对比', '默认值对比', '数据类型对比', '长度对比', '索引类型对比', '注释对比', '{} 修改命令'.format(t_db), '脚本类型']) sheet.row_dimensions[1].height = 25 sheet.column_dimensions['A'].width = 38 sheet.column_dimensions['B'].width = 40 sheet.column_dimensions['C'].width = 50 sheet.column_dimensions['D'].width = 25 sheet.column_dimensions['E'].width = 20 sheet.column_dimensions['F'].width = 25 sheet.column_dimensions['G'].width = 30 sheet.column_dimensions['H'].width = 65 sheet.column_dimensions['I'].width = 20 sheet.column_dimensions['J'].width = 150 sheet.column_dimensions['K'].width = 200 sheet.column_dimensions['L'].width = 10 for cell in list(sheet.rows)[0]: cell.style = highlight def result_excel_end(): style = NamedStyle(name="style") v_center = Alignment(vertical='center', wrap_text=True) hv_list = [5] for i, row in enumerate(list(sheet.rows)): if i == 0: continue for cell in row: if cell.column in hv_list: pass else: style.alignment = v_center cell.style = style def export_excel_file(data_list, file_name): print('\n--- 对比结束,导出 Excel 文件 ---\n') result_excel_init() for data in data_list: print(data) sheet.append([data.get('表名'), data.get('{} 多字段'.format(s_db)), data.get('{} 多字段'.format(t_db)), data.get('字段名'), data.get('允许Null'), data.get('默认值'), data.get('数据类型'), data.get('类型长度'), data.get('索引类型'), data.get('注释'), data.get('{} 修改命令'.format(t_db)), data.get('脚本类型')]) result_excel_end() wb_result.save(file_name) print('\n执行导出结果完成 {}\n'.format(file_name)) def s_exec_sql(sql, db_name): db = pymysql.connect(host='Ip1', port=3306, user='用户1', password='密码2', database=db_name) cursor = db.cursor() cursor.execute(sql) results = cursor.fetchall() cursor.close() db.close() if results: return results def t_exec_sql(sql, db_name): db = pymysql.connect(host='Ip2', port=3306, user='用户1', password='密码2', database=db_name) cursor = db.cursor() cursor.execute(sql) results = cursor.fetchall() cursor.close() db.close() if results: return results def ignore_check_type(s_type, t_type): """ 忽略原始对比库的小类型数据 """ if s_type == 'tinyint': return True elif s_type == 'int' and t_type in ['bigint', 'decimal']: return True elif s_type == 'varchar' and t_type in ['text']: return True return False def alter_add_sql(db, tb_name, col_name, data_type, is_null, def_value, comment=''): # 'ALTER TABLE 表名 ADD COLUMN 列名 数据类型 允许为空 DEFAULT(默认) COMMENT(注释) AFTER(位置)' if 'bit' not in data_type: d_v = "DEFAULT '{}'".format(def_value) if def_value else '' else: d_v = "DEFAULT {}".format(def_value) if def_value else '' allow_null = '' if is_null == 'YES' else 'NOT NULL' cmd = "ALTER TABLE `{}`.`{}`\n ADD COLUMN `{}` {} {} {} {};".format(db, tb_name, col_name, data_type, allow_null, d_v, "COMMENT '{}'".format(comment) if comment else '') return cmd.replace("\r\n", ' ').replace(' ', ' ').replace(' ', ' ').replace(' ;', ';') def alter_modify_sql(db, tb_name, col_name, data_type, is_null, def_value, comment=''): if 'bit' not in data_type: d_v = "DEFAULT '{}'".format(def_value) if def_value else '' else: d_v = "DEFAULT {}".format(def_value) if def_value else '' allow_null = '' if is_null == 'YES' else 'NOT NULL' cmd = "ALTER TABLE `{}`.`{}`\n MODIFY COLUMN `{}` {} {} {} {};".format(db, tb_name, col_name, data_type, allow_null, d_v, "COMMENT '{}'".format(comment) if comment else '') return cmd.replace("\r\n", ' ').replace(' ', ' ').replace(' ', ' ').replace(' ;', ';') def get_bracket_str(s): """获取小括号内容""" return re.findall(r"[(](.*?)[)]", s) if __name__ == '__main__': s_db = 'From' t_db = 'To' # 函数、存储过程、触发器、表结构 function_sql = "select `name` from mysql.proc where db = '{}' and `type` = 'FUNCTION'" procedure_sql = "select `name` from mysql.proc where db = '{}' and `type` = 'PROCEDURE'" view_sql = "select table_name from information_schema.VIEWS where table_schema = '{}'" trigger_sql = "show triggers from {}" schema_sql = "select column_name, is_nullable, data_type, column_default, column_type, column_key, column_comment from information_schema.COLUMNS where table_name = '{}' and table_schema = '{}' " ignore_left_no_comment = True # 忽略左边注释为空的 ignore_left_col_len_lower = True # 忽略左边相同数据类型下,长度较小的 ignore_left_col_type_lower = True # 忽略左边同类型的数据类型小于左边的 ignore_left_no_idx = True # 忽略左边字段无索引 ignore_table = ['cp_view'] # 忽略将要比较的表结构 print('\n---------- 触发器对比 ----------') s_trigger_result = s_exec_sql(trigger_sql.format(s_db), s_db) t_trigger_result = t_exec_sql(trigger_sql.format(t_db), t_db) print('{} 触发器 {} 个 {}'.format(s_db, len(s_trigger_result) if s_trigger_result else 0, s_trigger_result if s_trigger_result else '')) print('{} 触发器 {} 个 {}'.format(t_db, len(t_trigger_result) if t_trigger_result else 0, t_trigger_result if t_trigger_result else '')) print('\n---------- 存储过程对比 ----------') s_procedure_result = s_exec_sql(procedure_sql.format(s_db), s_db) t_procedure_result = t_exec_sql(procedure_sql.format(t_db), t_db) print('{} 存储过程 {} 个 {}'.format(s_db, len(s_procedure_result) if s_procedure_result else 0, s_procedure_result if s_procedure_result else '')) print('{} 存储过程 {} 个 {}'.format(t_db, len(t_procedure_result) if t_procedure_result else 0, t_procedure_result if t_procedure_result else '')) print('\n---------- 函数对比 ----------') s_function_result = s_exec_sql(function_sql.format(s_db), s_db) t_function_result = t_exec_sql(function_sql.format(t_db), t_db) print('{} 函数 {} 个 {}'.format(s_db, len(s_function_result) if s_function_result else 0, [v for (v,) in s_function_result] if s_function_result else '')) print('{} 函数 {} 个 {}'.format(t_db, len(t_function_result) if t_function_result else 0, [v for (v,) in t_function_result] if t_function_result else '')) print('\n---------- 视图对比 ----------') s_view_result = s_exec_sql(view_sql.format(s_db), s_db) t_view_result = t_exec_sql(view_sql.format(t_db), t_db) print('{} 视图 {} 个 {}'.format(s_db, len(s_view_result) if s_view_result else 0, [v for (v,) in s_view_result] if s_view_result else '')) print('{} 视图 {} 个 {}'.format(t_db, len(t_view_result) if t_view_result else 0, [v for (v,) in t_view_result] if t_view_result else '')) print('\n---------- 表结构对比 ----------') source_tables_result = s_exec_sql('SHOW TABLES from {}'.format(s_db), s_db) target_tables_result = t_exec_sql('SHOW TABLES from {}'.format(t_db), t_db) print('{} 表 {} 个'.format(s_db, len(source_tables_result) if source_tables_result else 0)) print('{} 表 {} 个'.format(t_db, len(target_tables_result) if target_tables_result else 0)) source_tables = set([tb for (tb,) in source_tables_result]) target_tables = set([tb for (tb,) in target_tables_result]) intersection_tables = set(source_tables) & set(target_tables) s_dif_tables = set(source_tables) - set(target_tables) t_dif_tables = set(target_tables) - set(source_tables) print('\n{} & {} 共有表 {} 个'.format(s_db, t_db, len(intersection_tables))) print('{} 缺失表 {} 个 {}'.format(t_db, len(s_dif_tables), s_dif_tables)) print('{} 缺失表 {} 个 {}\n'.format(s_db, len(t_dif_tables), t_dif_tables)) comp_data_list = list() for i, table in enumerate(sorted(intersection_tables)): if table and table not in ignore_table: s_rows = s_exec_sql(schema_sql.format(table, s_db), s_db) t_rows = t_exec_sql(schema_sql.format(table, t_db), t_db) if s_rows == t_rows: continue # 字段名为key,整行数据为value s_dic_data, t_dic_data = {}, {} for d in s_rows: s_dic_data[str(d[0])] = d for d in t_rows: t_dic_data[str(d[0])] = d # print('\n------ table: {} ------'.format(table)) s_column_names = sorted(s_dic_data.keys()) t_column_names = sorted(t_dic_data.keys()) if s_column_names != t_column_names: dif_columns = set(s_column_names) - set(t_column_names) if dif_columns: print('`{}`.`{}` 多字段 {}'.format(s_db, table, dif_columns)) else: t_dif = set(t_column_names) - set(s_column_names) comp_data_list.append({ "表名": table, "{} 多字段".format(t_db): ','.join(t_dif) }) for k, v in s_dic_data.items(): bv = t_dic_data.get(k) # 两边有值但不相等 if bv: if bv != v: comp_result = { "表名": table, "字段名": k, "脚本类型": 'MODIFY' } comp_dict = {} idx_dict = {} s_is_null, t_is_null = v[1], bv[1] s_data_type, t_data_type = v[2], bv[2] s_def_value, t_def_value = v[3], bv[3] s_data_len, t_data_len = v[4], bv[4] s_idx_type, t_idx_type = v[5], bv[5] s_commend, t_commend = v[6], bv[6] # 要修改的值 c_is_null, c_def_value, c_data_len, c_commend = '', '', t_data_len, '' if s_is_null != t_is_null: comp_dict['允许Null'] = '{}={} \n{}={}'.format(s_db, s_is_null, t_db, t_is_null) c_is_null = s_is_null if s_data_type != t_data_type: # 忽略小类型 if not ignore_left_col_type_lower: comp_dict['数据类型'] = '{}={} \n{}={}'.format(s_db, s_data_type, t_db, t_data_type) else: if not ignore_check_type(s_data_type, t_data_type): comp_dict['数据类型'] = '{}={} \n{}={}'.format(s_db, s_data_type, t_db, t_data_type) if s_def_value != t_def_value: if s_def_value and t_def_value: comp_dict['默认值'] = '{}={} \n{}={}'.format(s_db, s_def_value, t_db, t_def_value) c_def_value = s_def_value if s_data_len != t_data_len: # 忽略小长度 if not ignore_left_col_len_lower: # 忽略小类型 if not ignore_left_col_type_lower: comp_dict['类型长度'] = '{}={} \n{}={}'.format(s_db, s_data_len, t_db, t_data_len) c_data_len = s_data_len else: if not ignore_check_type(s_data_type, t_data_type): comp_dict['类型长度'] = '{}={} \n{}={}'.format(s_db, s_data_len, t_db, t_data_len) c_data_len = s_data_len else: # 相同数据类型比较两的长度 if s_data_type != t_data_type: if not ignore_left_col_type_lower: comp_dict['类型长度'] = '{}={} \n{}={}'.format(s_db, s_data_len, t_db, t_data_len) c_data_len = s_data_len else: if not ignore_check_type(s_data_type, t_data_type): comp_dict['类型长度'] = '{}={} \n{}={}'.format(s_db, s_data_len, t_db, t_data_len) c_data_len = s_data_len else: if s_data_type in ['tinyint', 'varchar', 'bigint', 'int']: s_len = int(s_data_len.split('(')[1].split(')')[0]) t_len = int(t_data_len.split('(')[1].split(')')[0]) if s_len > t_len: comp_dict['类型长度'] = '{}={} \n{}={}'.format(s_db, s_data_len, t_db, t_data_len) c_data_len = s_data_len elif s_data_type == 'decimal': s_int_part = get_bracket_str(s_data_len)[0].split(',')[0] t_int_part = get_bracket_str(t_data_len)[0].split(',')[0] s_dec_part = get_bracket_str(s_data_len)[0].split(',')[1] t_dec_part = get_bracket_str(t_data_len)[0].split(',')[1] if int(s_int_part) > int(t_int_part): comp_dict['类型长度'] = '{}={} \n{}={}'.format(s_db, s_data_len, t_db, t_data_len) c_data_len = s_data_len elif int(s_dec_part) > int(t_dec_part): comp_dict['类型长度'] = '{}={} \n{}={}'.format(s_db, s_data_len, t_db, t_data_len) c_data_len = s_data_len elif s_data_type == 'enum': s_enum = set(get_bracket_str(s_data_len)[0].split(',')) v_enum = set(get_bracket_str(t_data_len)[0].split(',')) if s_enum - v_enum: comp_dict['类型长度'] = '{}={} \n{}={}'.format(s_db, s_data_len, t_db, t_data_len) c_data_len = s_data_len else: comp_dict['类型长度'] = '{}={} \n{}={}'.format(s_db, s_data_len, t_db, t_data_len) c_data_len = s_data_len if s_idx_type != t_idx_type: if ignore_left_no_idx and not s_idx_type: pass else: idx_dict['索引类型'] = '{}={} \n{}={}'.format(s_db, s_idx_type, t_db, t_idx_type) if s_commend != t_commend: if ignore_left_no_comment and not s_commend: pass else: comp_dict['注释'] = '{}={} \n{}={}'.format(s_db, str(s_commend).replace("\r\n", ' '), t_db, str(t_commend).replace("\r\n", ' ')) c_commend = s_commend else: c_commend = s_commend if comp_dict: if idx_dict: comp_dict.update(idx_dict) comp_dict['{} 修改命令'.format(t_db)] = alter_modify_sql(t_db, table, k, c_data_len, c_is_null, c_def_value, c_commend) comp_result.update(comp_dict) comp_data_list.append(comp_result) else: print('`{}`.`{}` 多字段 {}={}'.format(s_db, table, k, v)) c_is_null, c_def_value, c_data_len, c_commend = v[1], v[3], v[4], v[6] comp_result = { '表名': table, '{} 多字段'.format(s_db): k, '{} 修改命令'.format(t_db): alter_add_sql(t_db, table, k, c_data_len, c_is_null, c_def_value, c_commend), "脚本类型": 'ADD' } comp_data_list.append(comp_result) # 导出 Excel 文件 export_excel_file(comp_data_list, '{}->{}-表结构对比.xlsx'.format(s_db, t_db))
说明:对于两边数据库一次修改拉平,有两种方案
自定义实现将脚本2进行二次修改
现将 From 和 To 进行比较,将推荐修改的结果先在 To 的一方库执行,然后再与 To 和 From 两个库比较,再修改 From 一方库即可拉平
以上结果,是不是更为直观呢?
最后说一点,凡事都要经过测试,可以将线上的数据库结构经过测试环境验证之后,再做修改。
未经允许请勿转载:程序喵 » Python3 两个数据库触发器、存储过程、函数、视图、表结构、索引对比验证