背景
早期有一个saas服务的数据库,各租户的表结构内容完全相同,由于某大客户租户的业务较广,从集中管理的租户中独立出来做私有化的业务(gitlab代码仓库、db 数据库、服务器来单独部署)。
几年间,私有化的租户db和其他租户db管理就有了较大差异化。
由于业务变更,私有化的管理方式要切换回集中租户管理方式,所以要评估出将私有化的业务和db表结构之前差异点。
问题
这里跳过其他业务内容对比,说一下db之间怎么整理出差异点,然后由差异点来生成 alter 修改表结构的语句,判断哪些需要调整修改,哪些可以直接跳过来处理。
实现方案
常见的方案有很多,在 RDS 云服务上一般都会附有比较工具,另外可以找一下开源的对比工具,把各自的数据源和要对比的内容配置上即可。
只不过常规的对比工具结果,可能就不是那么的直观来表达出差异点,还需要人工二次主观意识的判断验证。如果表的数量和字段数量很少的情况下,一个个表人工比对结果还可以,那如果是上千张表呢?如果有多套环境需要验证多次呢?
需要遇到的场景有如下
From 表多字段、From 多表
To 表多字段、To 多表
同表,From 多索引
同表,To 多索引
同表,索引内容一致,索引名不同
同表同字段,类型相同,长度不同,注释相同
同表同字段,类型相同,长度不同,注释不同
同表同字段,类型相同,长度相同,注释不同
同表同字段,类型不同,长度相同,注释相同
同表同字段,类型不同,长度不同,注释不同
同表同字段,类型不同,长度不同,注释相同
同表同字段,类型相同,长度相同,注释相同,字段在表里的上下位置不同
...
如下通过比较工具,两个数据库 From_online 和 To_online 对比结果,给出两边的修改建议
场景1:多字段
场景1:多字段 # 左表 From 和右表 To 比较出现异常 # Comparing `F_online`.`cs_tag` to `T_online`.`cs_tag` [FAIL] # Transformation for --changes-for=server2: # # 右表 To 建议修改语句 ALTER TABLE `T_online`.`cs_tag` ADD COLUMN remark varchar(100) NULL COMMENT '备注' AFTER type; # # Transformation for reverse changes (--changes-for=server1): # # 右表 From 建议修改语句 # ALTER TABLE `F_online`.`cs_tag` # DROP COLUMN remark; #
很明显,这里是 From 数据库表,多一个 remark 字段,而 To 表建议添加,所以主观分析,只需要修改 To 表一方即可。
场景2:字段名大小写不同
# 场景2:字段大小写不同 # Comparing `F_online`.`cs_rule` to `T_online`.`cs_rule` [FAIL] # Transformation for --changes-for=server2: # ALTER TABLE `T_online`.`cs_rule` DROP COLUMN SpecialDayTime, ADD COLUMN specialDayTime datetime NULL COMMENT '特殊日子' AFTER conditionValue; # # Transformation for reverse changes (--changes-for=server1): # # ALTER TABLE `F_online`.`cs_rule` # DROP COLUMN specialDayTime, # ADD COLUMN SpecialDayTime datetime NULL COMMENT '特殊日子' AFTER conditionValue; #
这个也很简单,只是字段的大小写不同而已,mysql 大小写不敏感,可选择性的修改。
场景3:类型相同,值不同,索引不同,注释不同
如下场景3,加个难度
# 场景3:类型相同,枚举值不同,索引数量不同,索引名大小写不同,注释不同
# Comparing `F_online`.`cs_order` to `T_online`.`cs_order` [FAIL]
# Transformation for --changes-for=server2:
#
ALTER TABLE `T_online`.`cs_order`
DROP INDEX IDX_CARPARK_CON,
DROP INDEX IDX_CARPARK_ORDERNUMBER,
ADD INDEX idx_CARPARK_ORDERNUMBER (orderNumber),
CHANGE COLUMN limitPeriod limitPeriod enum('YEAR','MOUTH','DAY','WEEK') NOT NULL COMMENT '周期 YEAR-年 MONTH-月 DAY-日 WEEK-周';
#
# Transformation for reverse changes (--changes-for=server1):
#
# ALTER TABLE `F_online`.`cs_order`
# ADD INDEX IDX_CARPARK_CON (carparkOrderNumber),
# DROP INDEX idx_CARPARK_ORDERNUMBER,
# ADD INDEX IDX_CARPARK_ORDERNUMBER (orderNumber),
# CHANGE COLUMN limitPeriod limitPeriod enum('YEAR','MONTH','DAY') NOT NULL COMMENT '周期 YEAR-年 MOUTH-月 DAY-日';
#分析
原本 F_online 表,有索引 idx_CARPARK_ORDERNUMBER,表字段 limitPeriod 注数据类型 enum('YEAR','MOUTH','DAY','WEEK'),
原本 T_online 表,有索引 IDX_CARPARK_ORDERNUMBER、IDX_CARPARK_CON,表字段 limitPeriod 注数据类型 enum('YEAR','MONTH','DAY')
分析结果
有索引相同,大小写命名不同而已,可以不用修改
有索引类型相同,索引的内容不同,甚至还有错别字的情况
需要做的是两边数据库,首先保证数据类型能够全覆盖,修改业务把错别字情况改正,两边做修改调整
ALTER TABLE `F_online`.`cs_order`
ADD INDEX IDX_CARPARK_CON (carparkOrderNumber),
CHANGE COLUMN limitPeriod limitPeriod enum('YEAR','MONTH','DAY', 'WEEK') NOT NULL COMMENT '周期 YEAR-年 MOUTH-月 DAY-日 WEEK-周';
ALTER TABLE `T_online`.`cs_order`
ADD INDEX IDX_CARPARK_CON (carparkOrderNumber),
CHANGE COLUMN limitPeriod limitPeriod enum('YEAR','MONTH','DAY', 'WEEK') NOT NULL COMMENT '周期 YEAR-年 MOUTH-月 DAY-日 WEEK-周';场景4:类型不同,是否为空不同,默认值不同,注释不同
场景4:类型不同,允许为空不同,默认值不同,注释不同 # Comparing `F_online`.`cs_task` to `T_online`.`cs_task` [FAIL] # Transformation for --changes-for=server2: # ALTER TABLE `T_online`.`cs_task` CHANGE COLUMN taskStatus taskStatus tinyint(5) NULL DEFAULT 1 COMMENT '状态0-处理中,1-成功,2-失败,3-已经删除,4-部分成功'; # # Transformation for reverse changes (--changes-for=server1): # # ALTER TABLE `F_online`.`cs_task` # CHANGE COLUMN taskStatus taskStatus int(4) NOT NULL DEFAULT 0 COMMENT '状态0-处理中,1-成功,2-失败,3-已经删除'; #
分析
原本 F_online.cs_task 表,字段 taskStatus 注数据类型是 tinyint(5),允许为Null,默认值为 1,注释是‘状态0-处理中,1-成功,2-失败,3-已经删除,4-部分成功’
原本 T_online.cs_task 表,字段 taskStatus 注数据类型是 int(4),不允许为Null,默认值为 0,注释是‘状态0-处理中,1-成功,2-失败,3-已经删除’
需要做的是两边数据库,首先保证数据类型和长度是向上兼容的,注释保留最全的一部分,默认值需要根据业务场景来具体分析,之后再把建议修改的sql,两边做修改调整
分析结果:两边都做修改来把数据库齐平 ALTER TABLE `F_online`.`cs_task` CHANGE COLUMN taskStatus taskStatus int(4) NULL DEFAULT 0 COMMENT '状态0-处理中,1-成功,2-失败,3-已经删除,4-部分成功'; ALTER TABLE `T_online`.`cs_task` CHANGE COLUMN taskStatus taskStatus int(4) NULL DEFAULT 0 COMMENT '状态0-处理中,1-成功,2-失败,3-已经删除,4-部分成功';
场景5:各种混合不同
...
由此可以看出,加入你有多个环境需要比对,多个数据库,多个表需要这么一点点的人工比对,工作量是有多大的?如果你是幸运儿的话,表的结构差异性很少还行吧。
自定义实现方案
常言道工具提升生产力,是效率必不可少的关键一环。
思考一下 MySql 本身的 schema 表中,已经完全包含了所有数据库,所有的表信息,表字段信息,索引信息,函数信息,存储过程信息。
function_sql = "select `name` from mysql.proc where db = '库' and `type` = 'FUNCTION'" procedure_sql = "select `name` from mysql.proc where db = '库' and `type` = 'PROCEDURE'" view_sql = "select table_name from information_schema.VIEWS where table_schema = '表'" trigger_sql = "show triggers from 库" schema_sql = "select column_name, is_nullable, data_type, column_default, column_type, column_key, column_comment from information_schema.COLUMNS where table_name = '表' and table_schema = '库' "
那既然如此,搞一个脚本来查询两边数据库的结果,然后做统计分析,再给出两边的 SQL 修改建议,然后导出Excel文件,很直观方式查看差异点,岂不是很快哉?
第一版脚本1:找出差异点
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import os
import sys
import pymysql
from openpyxl import Workbook
from openpyxl.styles import NamedStyle, Font, Alignment, PatternFill, colors
wb_result = Workbook()
sheet = wb_result.active
def result_excel_init():
"""
初始化结果excel样式
"""
# 样式
highlight = NamedStyle(name="highlight")
highlight.font = Font(name='Arial', size=13, color=colors.BLACK, bold=True)
highlight.alignment = Alignment(horizontal='center', vertical='center', wrap_text=True)
highlight.fill = PatternFill("solid", fgColor="ACB9C9")
sheet.append(['表名', '{} 多字段'.format(s_db), '{} 多字段'.format(t_db), '字段名', '允许 Null 对比', '默认值对比', '数据类型对比', '长度对比', '索引类型对比', '注释对比'])
sheet.row_dimensions[1].height = 25
sheet.column_dimensions['A'].width = 38
sheet.column_dimensions['B'].width = 40
sheet.column_dimensions['C'].width = 40
sheet.column_dimensions['D'].width = 20
sheet.column_dimensions['E'].width = 20
sheet.column_dimensions['F'].width = 25
sheet.column_dimensions['G'].width = 30
sheet.column_dimensions['H'].width = 60
sheet.column_dimensions['I'].width = 20
sheet.column_dimensions['J'].width = 200
for cell in list(sheet.rows)[0]:
cell.style = highlight
def result_excel_end():
style = NamedStyle(name="style")
v_center = Alignment(vertical='center', wrap_text=True)
hv_list = [1]
for i, row in enumerate(list(sheet.rows)):
if i == 0:
continue
for cell in row:
if cell.column in hv_list:
pass
else:
style.alignment = v_center
cell.style = style
def export_excel_file(data_list, file_name):
print('\n--- 对比结束,导出 Excel 文件 ---\n')
result_excel_init()
for data in data_list:
print(data)
sheet.append([data.get('表名'), data.get('{} 多字段'.format(s_db)), data.get('{} 多字段'.format(t_db)), data.get('字段名'), data.get('允许Null'),
data.get('默认值'), data.get('数据类型'), data.get('类型长度'), data.get('索引类型'), data.get('注释')])
result_excel_end()
wb_result.save(file_name)
print('\n执行导出结果完成\n')
def s_exec_sql(sql, db_name):
db = pymysql.connect(host='ip1', port=3306, user='用户1', password='密码1', database=db_name)
cursor = db.cursor()
cursor.execute(sql)
results = cursor.fetchall()
if results:
return results
def t_exec_sql(sql, db_name):
db = pymysql.connect(host='ip2', port=3306, user='用户2', password='密码2', database=db_name)
cursor = db.cursor()
cursor.execute(sql)
results = cursor.fetchall()
if results:
return results
if __name__ == '__main__':
s_db = 'F_online'
t_db = 'T_online'
# 函数、存储过程、触发器、表结构
function_sql = "select `name` from mysql.proc where db = '{}' and `type` = 'FUNCTION'"
procedure_sql = "select `name` from mysql.proc where db = '{}' and `type` = 'PROCEDURE'"
view_sql = "select table_name from information_schema.VIEWS where table_schema = '{}'"
trigger_sql = "show triggers from {}"
schema_sql = "select column_name, is_nullable, data_type, column_default, column_type, column_key, column_comment from information_schema.COLUMNS where table_name = '{}' and table_schema = '{}' "
print('\n---------- 触发器对比 ----------')
s_trigger_result = s_exec_sql(trigger_sql.format(s_db), s_db)
t_trigger_result = t_exec_sql(trigger_sql.format(t_db), t_db)
print('{} 触发器 {} 个 {}'.format(s_db, len(s_trigger_result) if s_trigger_result else 0, s_trigger_result if s_trigger_result else ''))
print('{} 触发器 {} 个 {}'.format(t_db, len(t_trigger_result) if t_trigger_result else 0, t_trigger_result if t_trigger_result else ''))
print('\n---------- 存储过程对比 ----------')
s_procedure_result = s_exec_sql(procedure_sql.format(s_db), s_db)
t_procedure_result = t_exec_sql(procedure_sql.format(t_db), t_db)
print('{} 存储过程 {} 个 {}'.format(s_db, len(s_procedure_result) if s_procedure_result else 0, s_procedure_result if s_procedure_result else ''))
print('{} 存储过程 {} 个 {}'.format(t_db, len(t_procedure_result) if t_procedure_result else 0, t_procedure_result if t_procedure_result else ''))
print('\n---------- 函数对比 ----------')
s_function_result = s_exec_sql(function_sql.format(s_db), s_db)
t_function_result = t_exec_sql(function_sql.format(t_db), t_db)
print('{} 函数 {} 个 {}'.format(s_db, len(s_function_result) if s_function_result else 0, [v for (v,) in s_function_result] if s_function_result else ''))
print('{} 函数 {} 个 {}'.format(t_db, len(t_function_result) if t_function_result else 0, [v for (v,) in t_function_result] if t_function_result else ''))
print('\n---------- 视图对比 ----------')
s_view_result = s_exec_sql(view_sql.format(s_db), s_db)
t_view_result = t_exec_sql(view_sql.format(t_db), t_db)
print('{} 视图 {} 个 {}'.format(s_db, len(s_view_result) if s_view_result else 0, [v for (v,) in s_view_result] if s_view_result else ''))
print('{} 视图 {} 个 {}'.format(t_db, len(t_view_result) if t_view_result else 0, [v for (v,) in t_view_result] if t_view_result else ''))
print('\n---------- 表结构对比 ----------')
source_tables_result = s_exec_sql('SHOW TABLES from {}'.format(s_db), s_db)
target_tables_result = t_exec_sql('SHOW TABLES from {}'.format(t_db), t_db)
print('{} 表 {} 个'.format(s_db, len(source_tables_result) if source_tables_result else 0))
print('{} 表 {} 个'.format(t_db, len(target_tables_result) if target_tables_result else 0))
source_tables = set([tb for (tb,) in source_tables_result])
target_tables = set([tb for (tb,) in target_tables_result])
intersection_tables = set(source_tables) & set(target_tables)
s_dif_tables = set(source_tables) - set(target_tables)
t_dif_tables = set(target_tables) - set(source_tables)
print('\n{} & {} 共有表 {} 个'.format(s_db, t_db, len(intersection_tables)))
print('{} 缺失表 {} 个 {}'.format(t_db, len(s_dif_tables), s_dif_tables))
print('{} 缺失表 {} 个 {}\n'.format(s_db, len(t_dif_tables), t_dif_tables))
comp_data_list = list()
for i, table in enumerate(sorted(intersection_tables)):
if table:
# 查询 schema
s_idx_result = s_exec_sql(schema_sql.format(table, s_db), s_db)
t_idx_result = t_exec_sql(schema_sql.format(table, t_db), t_db)
if s_idx_result == t_idx_result:
# print(table, '完全一致')
continue
s_dic_data = {}
t_dic_data = {}
for d in s_idx_result:
s_dic_data[d[0]] = d
for d in t_idx_result:
t_dic_data[d[0]] = d
print('\n------ table: {} ------'.format(table))
s_column_names = sorted(s_dic_data.keys())
t_column_names = sorted(t_dic_data.keys())
if s_column_names != t_column_names:
dif_columns = set(s_column_names) - set(t_column_names)
if dif_columns:
print('`{}`.`{}` 多字段 {}'.format(s_db, table, dif_columns))
comp_data_list.append({
"表名": table,
"{} 多字段".format(s_db): ','.join(dif_columns)
})
else:
t_dif = set(t_column_names) - set(s_column_names)
print('`{}`.`{}` 多字段 {}'.format(t_db, table, t_dif))
comp_data_list.append({
"表名": table,
"{} 多字段".format(t_db): ','.join(t_dif)
})
for k, v in s_dic_data.items():
bv = t_dic_data.get(k)
# 两边有值但不相等
if bv and bv != v:
s_is_null, t_is_null = v[1], bv[1]
s_data_type, t_data_type = v[2], bv[2]
s_def_value, t_def_value = v[3], bv[3]
s_data_len, t_data_len = v[4], bv[4]
s_idx_type, t_idx_type = v[5], bv[5]
s_commend, t_commend = v[6], bv[6]
comp_result = {
"表名": table,
"字段名": k
}
if s_is_null != t_is_null:
comp_result['允许Null'] = '{}={} \n{}={}'.format(s_db, s_is_null, t_db, t_is_null)
if s_data_type != t_data_type:
comp_result['数据类型'] = '{}={} \n{}={}'.format(s_db, s_data_type, t_db, t_data_type)
if s_def_value != t_def_value:
sv = s_def_value if s_def_value else ''
tv = t_def_value if t_def_value else ''
if sv != tv:
comp_result['默认值'] = '{}={} \n{}={}'.format(s_db, sv, t_db, tv)
if s_data_len != t_data_len:
comp_result['类型长度'] = '{}={} \n{}={}'.format(s_db, s_data_len, t_db, t_data_len)
if s_idx_type != t_idx_type:
comp_result['索引类型'] = '{}={} \n{}={}'.format(s_db, s_idx_type, t_db, t_idx_type)
if s_commend != t_commend:
com_len = len(str(s_commend).replace("\r\n", ' '))
comp_result['注释'] = '{}={}{} \n{}={}'.format(s_db, str(s_commend).replace("\r\n", ' '), '\n' if com_len > 150 else '', t_db, str(t_commend).replace("\r\n", ' '))
comp_data_list.append(comp_result)
# 导出 Excel 文件
export_excel_file(comp_data_list, '{}->{}-表结构对比2.xlsx'.format(s_db, t_db))执行日志
---------- 触发器对比 ----------
F_online 触发器 0 个
T_online 触发器 0 个
---------- 存储过程对比 ----------
F_online 存储过程 0 个
T_online 存储过程 0 个
---------- 函数对比 ----------
F_online 函数 4 个 ['currval', 'func_checkVersion', 'nextval', 'setval']
T_online 函数 7 个 ['currval', 'func_checkVersion', 'getCategoryName', 'getRegionName', 'nextval', 'region', 'setval']
---------- 视图对比 ----------
F_online 视图 9 个 ['cp_consume_verification_view', 'cp_registration_view', ...]
T_online 视图 9 个 ['cp_consume_verification_view', 'cp_registration_view', ...]
---------- 表结构对比 ----------
F_online 表 1163 个
T_online 表 1145 个
F_online & T_online 共有表 1134 个
T_online 缺失表 29 个 {'cs_pm_service_channel_config', 'cs_delivery_time_rule_period', ...}
F_online 缺失表 11 个 {'im_ps_member', 'cs_deliveryregion_regions_bak', 'cs_goods_bak', ...}
------ table: cs_am_reward_record_detail ------
------ table: cs_cp_definition ------
`F_online`.`cs_cp_definition` 多字段 {'target', 'guideProvide'}
------ table: ... ------
--- 对比结束,导出 Excel 文件 ---
{'表名': 'cs_am_reward_record_detail', '字段名': 'nickName', '允许Null': 'F_online=YES \nT_online=NO', '默认值': 'F_online= \nT_online='}
{'表名': 'cs_am_share_record', '字段名': 'code', '索引类型': 'F_online= \nT_online=MUL'}
{'表名': 'cs_batch_dispose_task', '字段名': 'taskStatus', '允许Null': 'F_online=NO \nT_online=YES', '注释': 'F_online=任务状态0-处理中,1-成功,2-失败,3-已经删除 \nT_online=任务状态0-处理中,1-成功,2-失败,3-已经删除,4-部分成功'}
...
执行导出结果完成第二版脚本2:找出差异点,并给出 To 的修改建议
在脚本1的基础上进行改进,增加条件控制,过滤出一些 From 和 To 比较之后,To 不需要修改的一些内容,实现起来也较简单,有兴趣的可以通过开关来测试一下导出的结果
忽略字段类型一致 To 长度大于 From 的数据(类型为 'tinyint', 'varchar', 'bigint', 'int',其他类型excel人为判断)
忽略 To 字段类型大于 From 字段类型(类型为'tinyint', 'varchar', 'bigint', 'int','decimal','enum')
忽略 From 注释为空的字段
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import os
import sys
import re
import pymysql
from openpyxl import Workbook
from openpyxl.styles import NamedStyle, Font, Alignment, PatternFill, colors
wb_result = Workbook()
sheet = wb_result.active
def result_excel_init():
highlight = NamedStyle(name="highlight")
highlight.font = Font(name='Arial', size=13, color=colors.BLACK, bold=True)
highlight.alignment = Alignment(horizontal='center', vertical='center', wrap_text=True)
highlight.fill = PatternFill("solid", fgColor="ACB9C9")
sheet.append(['表名', '{} 多字段'.format(s_db), '{} 多字段'.format(t_db), '字段名', '允许 Null 对比', '默认值对比', '数据类型对比', '长度对比', '索引类型对比', '注释对比', '{} 修改命令'.format(t_db), '脚本类型'])
sheet.row_dimensions[1].height = 25
sheet.column_dimensions['A'].width = 38
sheet.column_dimensions['B'].width = 40
sheet.column_dimensions['C'].width = 50
sheet.column_dimensions['D'].width = 25
sheet.column_dimensions['E'].width = 20
sheet.column_dimensions['F'].width = 25
sheet.column_dimensions['G'].width = 30
sheet.column_dimensions['H'].width = 65
sheet.column_dimensions['I'].width = 20
sheet.column_dimensions['J'].width = 150
sheet.column_dimensions['K'].width = 200
sheet.column_dimensions['L'].width = 10
for cell in list(sheet.rows)[0]:
cell.style = highlight
def result_excel_end():
style = NamedStyle(name="style")
v_center = Alignment(vertical='center', wrap_text=True)
hv_list = [5]
for i, row in enumerate(list(sheet.rows)):
if i == 0:
continue
for cell in row:
if cell.column in hv_list:
pass
else:
style.alignment = v_center
cell.style = style
def export_excel_file(data_list, file_name):
print('\n--- 对比结束,导出 Excel 文件 ---\n')
result_excel_init()
for data in data_list:
print(data)
sheet.append([data.get('表名'), data.get('{} 多字段'.format(s_db)), data.get('{} 多字段'.format(t_db)), data.get('字段名'), data.get('允许Null'), data.get('默认值'),
data.get('数据类型'), data.get('类型长度'), data.get('索引类型'), data.get('注释'), data.get('{} 修改命令'.format(t_db)), data.get('脚本类型')])
result_excel_end()
wb_result.save(file_name)
print('\n执行导出结果完成 {}\n'.format(file_name))
def s_exec_sql(sql, db_name):
db = pymysql.connect(host='Ip1', port=3306, user='用户1', password='密码2', database=db_name)
cursor = db.cursor()
cursor.execute(sql)
results = cursor.fetchall()
cursor.close()
db.close()
if results:
return results
def t_exec_sql(sql, db_name):
db = pymysql.connect(host='Ip2', port=3306, user='用户1', password='密码2', database=db_name)
cursor = db.cursor()
cursor.execute(sql)
results = cursor.fetchall()
cursor.close()
db.close()
if results:
return results
def ignore_check_type(s_type, t_type):
""" 忽略原始对比库的小类型数据 """
if s_type == 'tinyint':
return True
elif s_type == 'int' and t_type in ['bigint', 'decimal']:
return True
elif s_type == 'varchar' and t_type in ['text']:
return True
return False
def alter_add_sql(db, tb_name, col_name, data_type, is_null, def_value, comment=''):
# 'ALTER TABLE 表名 ADD COLUMN 列名 数据类型 允许为空 DEFAULT(默认) COMMENT(注释) AFTER(位置)'
if 'bit' not in data_type:
d_v = "DEFAULT '{}'".format(def_value) if def_value else ''
else:
d_v = "DEFAULT {}".format(def_value) if def_value else ''
allow_null = '' if is_null == 'YES' else 'NOT NULL'
cmd = "ALTER TABLE `{}`.`{}`\n ADD COLUMN `{}` {} {} {} {};".format(db, tb_name, col_name, data_type, allow_null, d_v, "COMMENT '{}'".format(comment) if comment else '')
return cmd.replace("\r\n", ' ').replace(' ', ' ').replace(' ', ' ').replace(' ;', ';')
def alter_modify_sql(db, tb_name, col_name, data_type, is_null, def_value, comment=''):
if 'bit' not in data_type:
d_v = "DEFAULT '{}'".format(def_value) if def_value else ''
else:
d_v = "DEFAULT {}".format(def_value) if def_value else ''
allow_null = '' if is_null == 'YES' else 'NOT NULL'
cmd = "ALTER TABLE `{}`.`{}`\n MODIFY COLUMN `{}` {} {} {} {};".format(db, tb_name, col_name, data_type, allow_null, d_v, "COMMENT '{}'".format(comment) if comment else '')
return cmd.replace("\r\n", ' ').replace(' ', ' ').replace(' ', ' ').replace(' ;', ';')
def get_bracket_str(s):
"""获取小括号内容"""
return re.findall(r"[(](.*?)[)]", s)
if __name__ == '__main__':
s_db = 'From'
t_db = 'To'
# 函数、存储过程、触发器、表结构
function_sql = "select `name` from mysql.proc where db = '{}' and `type` = 'FUNCTION'"
procedure_sql = "select `name` from mysql.proc where db = '{}' and `type` = 'PROCEDURE'"
view_sql = "select table_name from information_schema.VIEWS where table_schema = '{}'"
trigger_sql = "show triggers from {}"
schema_sql = "select column_name, is_nullable, data_type, column_default, column_type, column_key, column_comment from information_schema.COLUMNS where table_name = '{}' and table_schema = '{}' "
ignore_left_no_comment = True # 忽略左边注释为空的
ignore_left_col_len_lower = True # 忽略左边相同数据类型下,长度较小的
ignore_left_col_type_lower = True # 忽略左边同类型的数据类型小于左边的
ignore_left_no_idx = True # 忽略左边字段无索引
ignore_table = ['cp_view'] # 忽略将要比较的表结构
print('\n---------- 触发器对比 ----------')
s_trigger_result = s_exec_sql(trigger_sql.format(s_db), s_db)
t_trigger_result = t_exec_sql(trigger_sql.format(t_db), t_db)
print('{} 触发器 {} 个 {}'.format(s_db, len(s_trigger_result) if s_trigger_result else 0, s_trigger_result if s_trigger_result else ''))
print('{} 触发器 {} 个 {}'.format(t_db, len(t_trigger_result) if t_trigger_result else 0, t_trigger_result if t_trigger_result else ''))
print('\n---------- 存储过程对比 ----------')
s_procedure_result = s_exec_sql(procedure_sql.format(s_db), s_db)
t_procedure_result = t_exec_sql(procedure_sql.format(t_db), t_db)
print('{} 存储过程 {} 个 {}'.format(s_db, len(s_procedure_result) if s_procedure_result else 0, s_procedure_result if s_procedure_result else ''))
print('{} 存储过程 {} 个 {}'.format(t_db, len(t_procedure_result) if t_procedure_result else 0, t_procedure_result if t_procedure_result else ''))
print('\n---------- 函数对比 ----------')
s_function_result = s_exec_sql(function_sql.format(s_db), s_db)
t_function_result = t_exec_sql(function_sql.format(t_db), t_db)
print('{} 函数 {} 个 {}'.format(s_db, len(s_function_result) if s_function_result else 0, [v for (v,) in s_function_result] if s_function_result else ''))
print('{} 函数 {} 个 {}'.format(t_db, len(t_function_result) if t_function_result else 0, [v for (v,) in t_function_result] if t_function_result else ''))
print('\n---------- 视图对比 ----------')
s_view_result = s_exec_sql(view_sql.format(s_db), s_db)
t_view_result = t_exec_sql(view_sql.format(t_db), t_db)
print('{} 视图 {} 个 {}'.format(s_db, len(s_view_result) if s_view_result else 0, [v for (v,) in s_view_result] if s_view_result else ''))
print('{} 视图 {} 个 {}'.format(t_db, len(t_view_result) if t_view_result else 0, [v for (v,) in t_view_result] if t_view_result else ''))
print('\n---------- 表结构对比 ----------')
source_tables_result = s_exec_sql('SHOW TABLES from {}'.format(s_db), s_db)
target_tables_result = t_exec_sql('SHOW TABLES from {}'.format(t_db), t_db)
print('{} 表 {} 个'.format(s_db, len(source_tables_result) if source_tables_result else 0))
print('{} 表 {} 个'.format(t_db, len(target_tables_result) if target_tables_result else 0))
source_tables = set([tb for (tb,) in source_tables_result])
target_tables = set([tb for (tb,) in target_tables_result])
intersection_tables = set(source_tables) & set(target_tables)
s_dif_tables = set(source_tables) - set(target_tables)
t_dif_tables = set(target_tables) - set(source_tables)
print('\n{} & {} 共有表 {} 个'.format(s_db, t_db, len(intersection_tables)))
print('{} 缺失表 {} 个 {}'.format(t_db, len(s_dif_tables), s_dif_tables))
print('{} 缺失表 {} 个 {}\n'.format(s_db, len(t_dif_tables), t_dif_tables))
comp_data_list = list()
for i, table in enumerate(sorted(intersection_tables)):
if table and table not in ignore_table:
s_rows = s_exec_sql(schema_sql.format(table, s_db), s_db)
t_rows = t_exec_sql(schema_sql.format(table, t_db), t_db)
if s_rows == t_rows:
continue
# 字段名为key,整行数据为value
s_dic_data, t_dic_data = {}, {}
for d in s_rows:
s_dic_data[str(d[0])] = d
for d in t_rows:
t_dic_data[str(d[0])] = d
# print('\n------ table: {} ------'.format(table))
s_column_names = sorted(s_dic_data.keys())
t_column_names = sorted(t_dic_data.keys())
if s_column_names != t_column_names:
dif_columns = set(s_column_names) - set(t_column_names)
if dif_columns:
print('`{}`.`{}` 多字段 {}'.format(s_db, table, dif_columns))
else:
t_dif = set(t_column_names) - set(s_column_names)
comp_data_list.append({
"表名": table,
"{} 多字段".format(t_db): ','.join(t_dif)
})
for k, v in s_dic_data.items():
bv = t_dic_data.get(k)
# 两边有值但不相等
if bv:
if bv != v:
comp_result = {
"表名": table,
"字段名": k,
"脚本类型": 'MODIFY'
}
comp_dict = {}
idx_dict = {}
s_is_null, t_is_null = v[1], bv[1]
s_data_type, t_data_type = v[2], bv[2]
s_def_value, t_def_value = v[3], bv[3]
s_data_len, t_data_len = v[4], bv[4]
s_idx_type, t_idx_type = v[5], bv[5]
s_commend, t_commend = v[6], bv[6]
# 要修改的值
c_is_null, c_def_value, c_data_len, c_commend = '', '', t_data_len, ''
if s_is_null != t_is_null:
comp_dict['允许Null'] = '{}={} \n{}={}'.format(s_db, s_is_null, t_db, t_is_null)
c_is_null = s_is_null
if s_data_type != t_data_type:
# 忽略小类型
if not ignore_left_col_type_lower:
comp_dict['数据类型'] = '{}={} \n{}={}'.format(s_db, s_data_type, t_db, t_data_type)
else:
if not ignore_check_type(s_data_type, t_data_type):
comp_dict['数据类型'] = '{}={} \n{}={}'.format(s_db, s_data_type, t_db, t_data_type)
if s_def_value != t_def_value:
if s_def_value and t_def_value:
comp_dict['默认值'] = '{}={} \n{}={}'.format(s_db, s_def_value, t_db, t_def_value)
c_def_value = s_def_value
if s_data_len != t_data_len:
# 忽略小长度
if not ignore_left_col_len_lower:
# 忽略小类型
if not ignore_left_col_type_lower:
comp_dict['类型长度'] = '{}={} \n{}={}'.format(s_db, s_data_len, t_db, t_data_len)
c_data_len = s_data_len
else:
if not ignore_check_type(s_data_type, t_data_type):
comp_dict['类型长度'] = '{}={} \n{}={}'.format(s_db, s_data_len, t_db, t_data_len)
c_data_len = s_data_len
else:
# 相同数据类型比较两的长度
if s_data_type != t_data_type:
if not ignore_left_col_type_lower:
comp_dict['类型长度'] = '{}={} \n{}={}'.format(s_db, s_data_len, t_db, t_data_len)
c_data_len = s_data_len
else:
if not ignore_check_type(s_data_type, t_data_type):
comp_dict['类型长度'] = '{}={} \n{}={}'.format(s_db, s_data_len, t_db, t_data_len)
c_data_len = s_data_len
else:
if s_data_type in ['tinyint', 'varchar', 'bigint', 'int']:
s_len = int(s_data_len.split('(')[1].split(')')[0])
t_len = int(t_data_len.split('(')[1].split(')')[0])
if s_len > t_len:
comp_dict['类型长度'] = '{}={} \n{}={}'.format(s_db, s_data_len, t_db, t_data_len)
c_data_len = s_data_len
elif s_data_type == 'decimal':
s_int_part = get_bracket_str(s_data_len)[0].split(',')[0]
t_int_part = get_bracket_str(t_data_len)[0].split(',')[0]
s_dec_part = get_bracket_str(s_data_len)[0].split(',')[1]
t_dec_part = get_bracket_str(t_data_len)[0].split(',')[1]
if int(s_int_part) > int(t_int_part):
comp_dict['类型长度'] = '{}={} \n{}={}'.format(s_db, s_data_len, t_db, t_data_len)
c_data_len = s_data_len
elif int(s_dec_part) > int(t_dec_part):
comp_dict['类型长度'] = '{}={} \n{}={}'.format(s_db, s_data_len, t_db, t_data_len)
c_data_len = s_data_len
elif s_data_type == 'enum':
s_enum = set(get_bracket_str(s_data_len)[0].split(','))
v_enum = set(get_bracket_str(t_data_len)[0].split(','))
if s_enum - v_enum:
comp_dict['类型长度'] = '{}={} \n{}={}'.format(s_db, s_data_len, t_db, t_data_len)
c_data_len = s_data_len
else:
comp_dict['类型长度'] = '{}={} \n{}={}'.format(s_db, s_data_len, t_db, t_data_len)
c_data_len = s_data_len
if s_idx_type != t_idx_type:
if ignore_left_no_idx and not s_idx_type:
pass
else:
idx_dict['索引类型'] = '{}={} \n{}={}'.format(s_db, s_idx_type, t_db, t_idx_type)
if s_commend != t_commend:
if ignore_left_no_comment and not s_commend:
pass
else:
comp_dict['注释'] = '{}={} \n{}={}'.format(s_db, str(s_commend).replace("\r\n", ' '), t_db, str(t_commend).replace("\r\n", ' '))
c_commend = s_commend
else:
c_commend = s_commend
if comp_dict:
if idx_dict:
comp_dict.update(idx_dict)
comp_dict['{} 修改命令'.format(t_db)] = alter_modify_sql(t_db, table, k, c_data_len, c_is_null, c_def_value, c_commend)
comp_result.update(comp_dict)
comp_data_list.append(comp_result)
else:
print('`{}`.`{}` 多字段 {}={}'.format(s_db, table, k, v))
c_is_null, c_def_value, c_data_len, c_commend = v[1], v[3], v[4], v[6]
comp_result = {
'表名': table,
'{} 多字段'.format(s_db): k,
'{} 修改命令'.format(t_db): alter_add_sql(t_db, table, k, c_data_len, c_is_null, c_def_value, c_commend),
"脚本类型": 'ADD'
}
comp_data_list.append(comp_result)
# 导出 Excel 文件
export_excel_file(comp_data_list, '{}->{}-表结构对比.xlsx'.format(s_db, t_db))说明:对于两边数据库一次修改拉平,有两种方案
自定义实现将脚本2进行二次修改
现将 From 和 To 进行比较,将推荐修改的结果先在 To 的一方库执行,然后再与 To 和 From 两个库比较,再修改 From 一方库即可拉平
以上结果,是不是更为直观呢?
最后说一点,凡事都要经过测试,可以将线上的数据库结构经过测试环境验证之后,再做修改。
未经允许请勿转载:程序喵 » Python3 两个数据库触发器、存储过程、函数、视图、表结构、索引对比验证
程序喵