Python3 两个数据库触发器、存储过程、函数、视图、表结构、索引对比验证

背景

早期有一个saas服务的数据库,各租户的表结构内容完全相同,由于某大客户租户的业务较广,从集中管理的租户中独立出来做私有化的业务(gitlab代码仓库、db 数据库、服务器来单独部署)。

几年间,私有化的租户db和其他租户db管理就有了较大差异化。

由于业务变更,私有化的管理方式要切换回集中租户管理方式,所以要评估出将私有化的业务和db表结构之前差异点。

问题

这里跳过其他业务内容对比,说一下db之间怎么整理出差异点,然后由差异点来生成 alter 修改表结构的语句,判断哪些需要调整修改,哪些可以直接跳过来处理。

实现方案

常见的方案有很多,在 RDS 云服务上一般都会附有比较工具,另外可以找一下开源的对比工具,把各自的数据源和要对比的内容配置上即可。

只不过常规的对比工具结果,可能就不是那么的直观来表达出差异点,还需要人工二次主观意识的判断验证。如果表的数量和字段数量很少的情况下,一个个表人工比对结果还可以,那如果是上千张表呢如果有多套环境需要验证多次呢

需要遇到的场景有如下

  • From 表多字段、From 多表

  • To 表多字段、To 多表

  • 同表,From 多索引

  • 同表,To 多索引

  • 同表,索引内容一致,索引名不同

  • 同表同字段,类型相同,长度不同,注释相同

  • 同表同字段,类型相同,长度不同,注释不同

  • 同表同字段,类型相同,长度相同,注释不同

  • 同表同字段,类型不同,长度相同,注释相同

  • 同表同字段,类型不同,长度不同,注释不同

  • 同表同字段,类型不同,长度不同,注释相同

  • 同表同字段,类型相同,长度相同,注释相同,字段在表里的上下位置不同

  • ...

如下通过比较工具,两个数据库 From_onlineTo_online 对比结果,给出两边的修改建议

场景1:多字段

场景1:多字段

# 左表 From 和右表 To 比较出现异常
# Comparing `F_online`.`cs_tag` to `T_online`.`cs_tag`   [FAIL]
# Transformation for --changes-for=server2:
#

# 右表 To 建议修改语句
ALTER TABLE `T_online`.`cs_tag` 
    ADD COLUMN remark varchar(100) NULL COMMENT '备注' AFTER type;

#
# Transformation for reverse changes (--changes-for=server1):
#
# 右表 From 建议修改语句
# ALTER TABLE `F_online`.`cs_tag` 
#   DROP COLUMN remark;
#

很明显,这里是 From 数据库表,多一个 remark 字段,而 To 表建议添加,所以主观分析,只需要修改 To 表一方即可

场景2:字段名大小写不同

# 场景2:字段大小写不同

# Comparing `F_online`.`cs_rule` to `T_online`.`cs_rule`   [FAIL]
# Transformation for --changes-for=server2:
#

ALTER TABLE `T_online`.`cs_rule` 
DROP COLUMN SpecialDayTime,
ADD COLUMN specialDayTime datetime NULL COMMENT '特殊日子' AFTER conditionValue;

#
# Transformation for reverse changes (--changes-for=server1):
#
# ALTER TABLE `F_online`.`cs_rule` 
#   DROP COLUMN specialDayTime,
#   ADD COLUMN SpecialDayTime datetime NULL COMMENT '特殊日子' AFTER conditionValue;
#

这个也很简单,只是字段的大小写不同而已,mysql 大小写不敏感,可选择性的修改

场景3:类型相同,值不同,索引不同,注释不同

如下场景3,加个难度

# 场景3:类型相同,枚举值不同,索引数量不同,索引名大小写不同,注释不同

# Comparing `F_online`.`cs_order` to `T_online`.`cs_order`   [FAIL]
# Transformation for --changes-for=server2:
#

ALTER TABLE `T_online`.`cs_order` 
    DROP INDEX IDX_CARPARK_CON,
    DROP INDEX IDX_CARPARK_ORDERNUMBER,
    ADD INDEX idx_CARPARK_ORDERNUMBER (orderNumber),
    CHANGE COLUMN limitPeriod limitPeriod enum('YEAR','MOUTH','DAY','WEEK') NOT NULL COMMENT '周期 YEAR-年 MONTH-月 DAY-日 WEEK-周';

#
# Transformation for reverse changes (--changes-for=server1):
#
# ALTER TABLE `F_online`.`cs_order` 
#   ADD INDEX IDX_CARPARK_CON (carparkOrderNumber),
#   DROP INDEX idx_CARPARK_ORDERNUMBER,
#   ADD INDEX IDX_CARPARK_ORDERNUMBER (orderNumber),
#   CHANGE COLUMN limitPeriod limitPeriod enum('YEAR','MONTH','DAY') NOT NULL COMMENT '周期 YEAR-年 MOUTH-月 DAY-日';
#

分析

  • 原本 F_online 表,有索引 idx_CARPARK_ORDERNUMBER,表字段  limitPeriod 注数据类型 enum('YEAR','MOUTH','DAY','WEEK')

  • 原本 T_online 表,有索引 IDX_CARPARK_ORDERNUMBERIDX_CARPARK_CON,表字段  limitPeriod 注数据类型 enum('YEAR','MONTH','DAY')

分析结果

  • 有索引相同,大小写命名不同而已,可以不用修改

  • 有索引类型相同,索引的内容不同,甚至还有错别字的情况

需要做的是两边数据库,首先保证数据类型能够全覆盖,修改业务把错别字情况改正,两边做修改调整

 ALTER TABLE `F_online`.`cs_order` 
   ADD INDEX IDX_CARPARK_CON (carparkOrderNumber),
   CHANGE COLUMN limitPeriod limitPeriod enum('YEAR','MONTH','DAY', 'WEEK') NOT NULL COMMENT '周期 YEAR-年 MOUTH-月 DAY-日 WEEK-周';

 ALTER TABLE `T_online`.`cs_order` 
   ADD INDEX IDX_CARPARK_CON (carparkOrderNumber),
   CHANGE COLUMN limitPeriod limitPeriod enum('YEAR','MONTH','DAY', 'WEEK') NOT NULL COMMENT '周期 YEAR-年 MOUTH-月 DAY-日 WEEK-周';

场景4:类型不同,是否为空不同,默认值不同,注释不同

场景4:类型不同,允许为空不同,默认值不同,注释不同

# Comparing `F_online`.`cs_task` to `T_online`.`cs_task`   [FAIL]
# Transformation for --changes-for=server2:
#

ALTER TABLE `T_online`.`cs_task` 
    CHANGE COLUMN taskStatus taskStatus tinyint(5) NULL DEFAULT 1 COMMENT '状态0-处理中,1-成功,2-失败,3-已经删除,4-部分成功';

#
# Transformation for reverse changes (--changes-for=server1):
#
# ALTER TABLE `F_online`.`cs_task` 
#   CHANGE COLUMN taskStatus taskStatus int(4) NOT NULL DEFAULT 0 COMMENT '状态0-处理中,1-成功,2-失败,3-已经删除';
#

分析

  • 原本 F_online.cs_task 表,字段  taskStatus 注数据类型是 tinyint(5)允许为Null,默认值为 1,注释是‘状态0-处理中,1-成功,2-失败,3-已经删除,4-部分成功

  • 原本 T_online.cs_task 表,字段  taskStatus 注数据类型是 int(4)不允许为Null,默认值为 0,注释是‘状态0-处理中,1-成功,2-失败,3-已经删除’ 

需要做的是两边数据库,首先保证数据类型和长度是向上兼容的,注释保留最全的一部分,默认值需要根据业务场景来具体分析,之后再把建议修改的sql,两边做修改调整

分析结果:两边都做修改来把数据库齐平

ALTER TABLE `F_online`.`cs_task` 
  CHANGE COLUMN taskStatus taskStatus int(4) NULL DEFAULT 0 COMMENT '状态0-处理中,1-成功,2-失败,3-已经删除,4-部分成功';

ALTER TABLE `T_online`.`cs_task` 
  CHANGE COLUMN taskStatus taskStatus int(4) NULL DEFAULT 0 COMMENT '状态0-处理中,1-成功,2-失败,3-已经删除,4-部分成功';

场景5:各种混合不同

...

由此可以看出,加入你有多个环境需要比对,多个数据库,多个表需要这么一点点的人工比对,工作量是有多大的?如果你是幸运儿的话,表的结构差异性很少还行吧。

自定义实现方案

常言道工具提升生产力,是效率必不可少的关键一环。

思考一下 MySql 本身的 schema 表中,已经完全包含了所有数据库,所有的表信息,表字段信息,索引信息,函数信息,存储过程信息。

Snipaste_2022-06-26_22-11-44.jpg

function_sql = "select `name` from mysql.proc where db = '库' and `type` = 'FUNCTION'"
procedure_sql = "select `name` from mysql.proc where db = '库' and `type` = 'PROCEDURE'"
view_sql = "select table_name from information_schema.VIEWS where table_schema = '表'"
trigger_sql = "show triggers from 库"
schema_sql = "select column_name, is_nullable, data_type, column_default, column_type, column_key, column_comment from information_schema.COLUMNS where table_name = '表' and table_schema = '库' "

那既然如此,搞一个脚本来查询两边数据库的结果,然后做统计分析,再给出两边的 SQL 修改建议,然后导出Excel文件,很直观方式查看差异点,岂不是很快哉?

Snipaste_2022-06-25_17-17-00.jpg

第一版脚本1:找出差异点

#!/usr/bin/python3
# -*- coding: UTF-8 -*-

import os
import sys
import pymysql
from openpyxl import Workbook
from openpyxl.styles import NamedStyle, Font, Alignment, PatternFill, colors


wb_result = Workbook()
sheet = wb_result.active


def result_excel_init():
    """
    初始化结果excel样式
    """
    # 样式
    highlight = NamedStyle(name="highlight")
    highlight.font = Font(name='Arial', size=13, color=colors.BLACK, bold=True)
    highlight.alignment = Alignment(horizontal='center', vertical='center', wrap_text=True)
    highlight.fill = PatternFill("solid", fgColor="ACB9C9")

    sheet.append(['表名', '{} 多字段'.format(s_db), '{} 多字段'.format(t_db), '字段名', '允许 Null 对比', '默认值对比', '数据类型对比', '长度对比', '索引类型对比', '注释对比'])

    sheet.row_dimensions[1].height = 25
    sheet.column_dimensions['A'].width = 38
    sheet.column_dimensions['B'].width = 40
    sheet.column_dimensions['C'].width = 40
    sheet.column_dimensions['D'].width = 20
    sheet.column_dimensions['E'].width = 20
    sheet.column_dimensions['F'].width = 25
    sheet.column_dimensions['G'].width = 30
    sheet.column_dimensions['H'].width = 60
    sheet.column_dimensions['I'].width = 20
    sheet.column_dimensions['J'].width = 200

    for cell in list(sheet.rows)[0]:
        cell.style = highlight


def result_excel_end():
    style = NamedStyle(name="style")
    v_center = Alignment(vertical='center', wrap_text=True)
    hv_list = [1]
    for i, row in enumerate(list(sheet.rows)):
        if i == 0:
            continue
        for cell in row:
            if cell.column in hv_list:
                pass
            else:
                style.alignment = v_center
            cell.style = style


def export_excel_file(data_list, file_name):
    print('\n--- 对比结束,导出 Excel 文件 ---\n')
    result_excel_init()
    for data in data_list:
        print(data)
        sheet.append([data.get('表名'), data.get('{} 多字段'.format(s_db)), data.get('{} 多字段'.format(t_db)), data.get('字段名'), data.get('允许Null'),
                      data.get('默认值'), data.get('数据类型'), data.get('类型长度'), data.get('索引类型'), data.get('注释')])
    result_excel_end()
    wb_result.save(file_name)
    print('\n执行导出结果完成\n')


def s_exec_sql(sql, db_name):
    db = pymysql.connect(host='ip1', port=3306, user='用户1', password='密码1', database=db_name)
    cursor = db.cursor()
    cursor.execute(sql)
    results = cursor.fetchall()
    if results:
        return results


def t_exec_sql(sql, db_name):
    db = pymysql.connect(host='ip2', port=3306, user='用户2', password='密码2', database=db_name)
    cursor = db.cursor()
    cursor.execute(sql)
    results = cursor.fetchall()
    if results:
        return results


if __name__ == '__main__':
    s_db = 'F_online'
    t_db = 'T_online'

    # 函数、存储过程、触发器、表结构
    function_sql = "select `name` from mysql.proc where db = '{}' and `type` = 'FUNCTION'"
    procedure_sql = "select `name` from mysql.proc where db = '{}' and `type` = 'PROCEDURE'"
    view_sql = "select table_name from information_schema.VIEWS where table_schema = '{}'"
    trigger_sql = "show triggers from {}"
    schema_sql = "select column_name, is_nullable, data_type, column_default, column_type, column_key, column_comment from information_schema.COLUMNS where table_name = '{}' and table_schema = '{}' "

    print('\n---------- 触发器对比 ----------')
    s_trigger_result = s_exec_sql(trigger_sql.format(s_db), s_db)
    t_trigger_result = t_exec_sql(trigger_sql.format(t_db), t_db)
    print('{} 触发器 {} 个 {}'.format(s_db, len(s_trigger_result) if s_trigger_result else 0, s_trigger_result if s_trigger_result else ''))
    print('{} 触发器 {} 个 {}'.format(t_db, len(t_trigger_result) if t_trigger_result else 0, t_trigger_result if t_trigger_result else ''))

    print('\n---------- 存储过程对比 ----------')
    s_procedure_result = s_exec_sql(procedure_sql.format(s_db), s_db)
    t_procedure_result = t_exec_sql(procedure_sql.format(t_db), t_db)
    print('{} 存储过程 {} 个 {}'.format(s_db, len(s_procedure_result) if s_procedure_result else 0, s_procedure_result if s_procedure_result else ''))
    print('{} 存储过程 {} 个 {}'.format(t_db, len(t_procedure_result) if t_procedure_result else 0, t_procedure_result if t_procedure_result else ''))

    print('\n---------- 函数对比 ----------')
    s_function_result = s_exec_sql(function_sql.format(s_db), s_db)
    t_function_result = t_exec_sql(function_sql.format(t_db), t_db)
    print('{} 函数 {} 个 {}'.format(s_db, len(s_function_result) if s_function_result else 0, [v for (v,) in s_function_result] if s_function_result else ''))
    print('{} 函数 {} 个 {}'.format(t_db, len(t_function_result) if t_function_result else 0, [v for (v,) in t_function_result] if t_function_result else ''))

    print('\n---------- 视图对比 ----------')
    s_view_result = s_exec_sql(view_sql.format(s_db), s_db)
    t_view_result = t_exec_sql(view_sql.format(t_db), t_db)
    print('{} 视图 {} 个 {}'.format(s_db, len(s_view_result) if s_view_result else 0, [v for (v,) in s_view_result] if s_view_result else ''))
    print('{} 视图 {} 个 {}'.format(t_db, len(t_view_result) if t_view_result else 0, [v for (v,) in t_view_result] if t_view_result else ''))

    print('\n---------- 表结构对比 ----------')
    source_tables_result = s_exec_sql('SHOW TABLES from {}'.format(s_db), s_db)
    target_tables_result = t_exec_sql('SHOW TABLES from {}'.format(t_db), t_db)
    print('{} 表 {} 个'.format(s_db, len(source_tables_result) if source_tables_result else 0))
    print('{} 表 {} 个'.format(t_db, len(target_tables_result) if target_tables_result else 0))

    source_tables = set([tb for (tb,) in source_tables_result])
    target_tables = set([tb for (tb,) in target_tables_result])

    intersection_tables = set(source_tables) & set(target_tables)
    s_dif_tables = set(source_tables) - set(target_tables)
    t_dif_tables = set(target_tables) - set(source_tables)
    print('\n{} & {} 共有表 {} 个'.format(s_db, t_db, len(intersection_tables)))
    print('{} 缺失表 {} 个 {}'.format(t_db, len(s_dif_tables), s_dif_tables))
    print('{} 缺失表 {} 个 {}\n'.format(s_db, len(t_dif_tables), t_dif_tables))

    comp_data_list = list()
    for i, table in enumerate(sorted(intersection_tables)):
        if table:
            # 查询 schema
            s_idx_result = s_exec_sql(schema_sql.format(table, s_db), s_db)
            t_idx_result = t_exec_sql(schema_sql.format(table, t_db), t_db)

            if s_idx_result == t_idx_result:
                # print(table, '完全一致')
                continue

            s_dic_data = {}
            t_dic_data = {}
            for d in s_idx_result:
                s_dic_data[d[0]] = d
            for d in t_idx_result:
                t_dic_data[d[0]] = d

            print('\n------ table: {} ------'.format(table))
            s_column_names = sorted(s_dic_data.keys())
            t_column_names = sorted(t_dic_data.keys())
            if s_column_names != t_column_names:
                dif_columns = set(s_column_names) - set(t_column_names)
                if dif_columns:
                    print('`{}`.`{}` 多字段 {}'.format(s_db, table, dif_columns))
                    comp_data_list.append({
                        "表名": table,
                        "{} 多字段".format(s_db): ','.join(dif_columns)
                    })
                else:
                    t_dif = set(t_column_names) - set(s_column_names)
                    print('`{}`.`{}` 多字段 {}'.format(t_db, table, t_dif))
                    comp_data_list.append({
                        "表名": table,
                        "{} 多字段".format(t_db): ','.join(t_dif)
                    })

            for k, v in s_dic_data.items():
                bv = t_dic_data.get(k)
                # 两边有值但不相等
                if bv and bv != v:
                    s_is_null, t_is_null = v[1], bv[1]
                    s_data_type, t_data_type = v[2], bv[2]
                    s_def_value, t_def_value = v[3], bv[3]
                    s_data_len, t_data_len = v[4], bv[4]
                    s_idx_type, t_idx_type = v[5], bv[5]
                    s_commend, t_commend = v[6], bv[6]

                    comp_result = {
                        "表名": table,
                        "字段名": k
                    }

                    if s_is_null != t_is_null:
                        comp_result['允许Null'] = '{}={} \n{}={}'.format(s_db, s_is_null, t_db, t_is_null)
                    if s_data_type != t_data_type:
                        comp_result['数据类型'] = '{}={} \n{}={}'.format(s_db, s_data_type, t_db, t_data_type)
                    if s_def_value != t_def_value:
                        sv = s_def_value if s_def_value else ''
                        tv = t_def_value if t_def_value else ''
                        if sv != tv:
                            comp_result['默认值'] = '{}={} \n{}={}'.format(s_db, sv, t_db, tv)
                    if s_data_len != t_data_len:
                        comp_result['类型长度'] = '{}={} \n{}={}'.format(s_db, s_data_len, t_db, t_data_len)
                    if s_idx_type != t_idx_type:
                        comp_result['索引类型'] = '{}={} \n{}={}'.format(s_db, s_idx_type, t_db, t_idx_type)
                    if s_commend != t_commend:
                        com_len = len(str(s_commend).replace("\r\n", ' '))
                        comp_result['注释'] = '{}={}{} \n{}={}'.format(s_db, str(s_commend).replace("\r\n", ' '), '\n' if com_len > 150 else '', t_db, str(t_commend).replace("\r\n", ' '))
                    comp_data_list.append(comp_result)

    # 导出 Excel 文件
    export_excel_file(comp_data_list, '{}->{}-表结构对比2.xlsx'.format(s_db, t_db))

执行日志

---------- 触发器对比 ----------
F_online 触发器 0 个 
T_online 触发器 0 个 

---------- 存储过程对比 ----------
F_online 存储过程 0 个 
T_online 存储过程 0 个 

---------- 函数对比 ----------
F_online 函数 4 个 ['currval', 'func_checkVersion', 'nextval', 'setval']
T_online 函数 7 个 ['currval', 'func_checkVersion', 'getCategoryName', 'getRegionName', 'nextval', 'region', 'setval']

---------- 视图对比 ----------
F_online 视图 9 个 ['cp_consume_verification_view', 'cp_registration_view', ...]
T_online 视图 9 个 ['cp_consume_verification_view', 'cp_registration_view', ...]

---------- 表结构对比 ----------
F_online 表 1163 个
T_online 表 1145 个

F_online & T_online 共有表 1134 个
T_online 缺失表 29 个 {'cs_pm_service_channel_config', 'cs_delivery_time_rule_period', ...}
F_online 缺失表 11 个 {'im_ps_member', 'cs_deliveryregion_regions_bak', 'cs_goods_bak', ...}


------ table: cs_am_reward_record_detail ------

------ table: cs_cp_definition ------
`F_online`.`cs_cp_definition` 多字段 {'target', 'guideProvide'}

------ table: ... ------


--- 对比结束,导出 Excel 文件 ---

{'表名': 'cs_am_reward_record_detail', '字段名': 'nickName', '允许Null': 'F_online=YES \nT_online=NO', '默认值': 'F_online= \nT_online='}
{'表名': 'cs_am_share_record', '字段名': 'code', '索引类型': 'F_online= \nT_online=MUL'}
{'表名': 'cs_batch_dispose_task', '字段名': 'taskStatus', '允许Null': 'F_online=NO \nT_online=YES', '注释': 'F_online=任务状态0-处理中,1-成功,2-失败,3-已经删除 \nT_online=任务状态0-处理中,1-成功,2-失败,3-已经删除,4-部分成功'}
...

执行导出结果完成

第二版脚本2:找出差异点,并给出 To 的修改建议

在脚本1的基础上进行改进,增加条件控制,过滤出一些 From 和 To 比较之后,To 不需要修改的一些内容,实现起来也较简单,有兴趣的可以通过开关来测试一下导出的结果

  • 忽略字段类型一致 To 长度大于 From 的数据(类型为 'tinyint', 'varchar', 'bigint', 'int',其他类型excel人为判断)

  • 忽略 To 字段类型大于 From 字段类型(类型为'tinyint', 'varchar', 'bigint', 'int','decimal','enum')

  • 忽略 From 注释为空的字段

#!/usr/bin/python3
# -*- coding: UTF-8 -*-

import os
import sys
import re
import pymysql
from openpyxl import Workbook
from openpyxl.styles import NamedStyle, Font, Alignment, PatternFill, colors

wb_result = Workbook()
sheet = wb_result.active


def result_excel_init():
    highlight = NamedStyle(name="highlight")
    highlight.font = Font(name='Arial', size=13, color=colors.BLACK, bold=True)
    highlight.alignment = Alignment(horizontal='center', vertical='center', wrap_text=True)
    highlight.fill = PatternFill("solid", fgColor="ACB9C9")

    sheet.append(['表名', '{} 多字段'.format(s_db), '{} 多字段'.format(t_db), '字段名', '允许 Null 对比', '默认值对比', '数据类型对比', '长度对比', '索引类型对比', '注释对比', '{} 修改命令'.format(t_db), '脚本类型'])

    sheet.row_dimensions[1].height = 25
    sheet.column_dimensions['A'].width = 38
    sheet.column_dimensions['B'].width = 40
    sheet.column_dimensions['C'].width = 50
    sheet.column_dimensions['D'].width = 25
    sheet.column_dimensions['E'].width = 20
    sheet.column_dimensions['F'].width = 25
    sheet.column_dimensions['G'].width = 30
    sheet.column_dimensions['H'].width = 65
    sheet.column_dimensions['I'].width = 20
    sheet.column_dimensions['J'].width = 150
    sheet.column_dimensions['K'].width = 200
    sheet.column_dimensions['L'].width = 10
    for cell in list(sheet.rows)[0]:
        cell.style = highlight


def result_excel_end():
    style = NamedStyle(name="style")
    v_center = Alignment(vertical='center', wrap_text=True)
    hv_list = [5]
    for i, row in enumerate(list(sheet.rows)):
        if i == 0:
            continue
        for cell in row:
            if cell.column in hv_list:
                pass
            else:
                style.alignment = v_center
            cell.style = style


def export_excel_file(data_list, file_name):
    print('\n--- 对比结束,导出 Excel 文件 ---\n')
    result_excel_init()
    for data in data_list:
        print(data)
        sheet.append([data.get('表名'), data.get('{} 多字段'.format(s_db)), data.get('{} 多字段'.format(t_db)), data.get('字段名'), data.get('允许Null'), data.get('默认值'),
                      data.get('数据类型'), data.get('类型长度'), data.get('索引类型'), data.get('注释'), data.get('{} 修改命令'.format(t_db)), data.get('脚本类型')])
    result_excel_end()
    wb_result.save(file_name)
    print('\n执行导出结果完成 {}\n'.format(file_name))


def s_exec_sql(sql, db_name):
    db = pymysql.connect(host='Ip1', port=3306, user='用户1', password='密码2', database=db_name)
    cursor = db.cursor()
    cursor.execute(sql)
    results = cursor.fetchall()
    cursor.close()
    db.close()
    if results:
        return results


def t_exec_sql(sql, db_name):
    db = pymysql.connect(host='Ip2', port=3306, user='用户1', password='密码2', database=db_name)
    cursor = db.cursor()
    cursor.execute(sql)
    results = cursor.fetchall()
    cursor.close()
    db.close()
    if results:
        return results


def ignore_check_type(s_type, t_type):
    """ 忽略原始对比库的小类型数据 """
    if s_type == 'tinyint':
        return True
    elif s_type == 'int' and t_type in ['bigint', 'decimal']:
        return True
    elif s_type == 'varchar' and t_type in ['text']:
        return True
    return False


def alter_add_sql(db, tb_name, col_name, data_type, is_null, def_value, comment=''):
    # 'ALTER TABLE 表名 ADD COLUMN 列名 数据类型 允许为空 DEFAULT(默认) COMMENT(注释) AFTER(位置)'
    if 'bit' not in data_type:
        d_v = "DEFAULT '{}'".format(def_value) if def_value else ''
    else:
        d_v = "DEFAULT {}".format(def_value) if def_value else ''

    allow_null = '' if is_null == 'YES' else 'NOT NULL'
    cmd = "ALTER TABLE `{}`.`{}`\n      ADD COLUMN `{}` {} {} {} {};".format(db, tb_name, col_name, data_type, allow_null, d_v, "COMMENT '{}'".format(comment) if comment else '')
    return cmd.replace("\r\n", ' ').replace('  ', ' ').replace(' ', ' ').replace(' ;', ';')


def alter_modify_sql(db, tb_name, col_name, data_type, is_null, def_value, comment=''):
    if 'bit' not in data_type:
        d_v = "DEFAULT '{}'".format(def_value) if def_value else ''
    else:
        d_v = "DEFAULT {}".format(def_value) if def_value else ''
    allow_null = '' if is_null == 'YES' else 'NOT NULL'
    cmd = "ALTER TABLE `{}`.`{}`\n      MODIFY COLUMN `{}` {} {} {} {};".format(db, tb_name, col_name, data_type, allow_null, d_v, "COMMENT '{}'".format(comment) if comment else '')
    return cmd.replace("\r\n", ' ').replace('  ', ' ').replace(' ', ' ').replace(' ;', ';')


def get_bracket_str(s):
    """获取小括号内容"""
    return re.findall(r"[(](.*?)[)]", s)


if __name__ == '__main__':
    s_db = 'From'
    t_db = 'To'

    # 函数、存储过程、触发器、表结构
    function_sql = "select `name` from mysql.proc where db = '{}' and `type` = 'FUNCTION'"
    procedure_sql = "select `name` from mysql.proc where db = '{}' and `type` = 'PROCEDURE'"
    view_sql = "select table_name from information_schema.VIEWS where table_schema = '{}'"
    trigger_sql = "show triggers from {}"
    schema_sql = "select column_name, is_nullable, data_type, column_default, column_type, column_key, column_comment from information_schema.COLUMNS where table_name = '{}' and table_schema = '{}' "

    ignore_left_no_comment = True      # 忽略左边注释为空的
    ignore_left_col_len_lower = True   # 忽略左边相同数据类型下,长度较小的
    ignore_left_col_type_lower = True  # 忽略左边同类型的数据类型小于左边的
    ignore_left_no_idx = True          # 忽略左边字段无索引
    ignore_table = ['cp_view']         # 忽略将要比较的表结构

    print('\n---------- 触发器对比 ----------')
    s_trigger_result = s_exec_sql(trigger_sql.format(s_db), s_db)
    t_trigger_result = t_exec_sql(trigger_sql.format(t_db), t_db)
    print('{} 触发器 {} 个 {}'.format(s_db, len(s_trigger_result) if s_trigger_result else 0, s_trigger_result if s_trigger_result else ''))
    print('{} 触发器 {} 个 {}'.format(t_db, len(t_trigger_result) if t_trigger_result else 0, t_trigger_result if t_trigger_result else ''))

    print('\n---------- 存储过程对比 ----------')
    s_procedure_result = s_exec_sql(procedure_sql.format(s_db), s_db)
    t_procedure_result = t_exec_sql(procedure_sql.format(t_db), t_db)
    print('{} 存储过程 {} 个 {}'.format(s_db, len(s_procedure_result) if s_procedure_result else 0, s_procedure_result if s_procedure_result else ''))
    print('{} 存储过程 {} 个 {}'.format(t_db, len(t_procedure_result) if t_procedure_result else 0, t_procedure_result if t_procedure_result else ''))

    print('\n---------- 函数对比 ----------')
    s_function_result = s_exec_sql(function_sql.format(s_db), s_db)
    t_function_result = t_exec_sql(function_sql.format(t_db), t_db)
    print('{} 函数 {} 个 {}'.format(s_db, len(s_function_result) if s_function_result else 0, [v for (v,) in s_function_result] if s_function_result else ''))
    print('{} 函数 {} 个 {}'.format(t_db, len(t_function_result) if t_function_result else 0, [v for (v,) in t_function_result] if t_function_result else ''))

    print('\n---------- 视图对比 ----------')
    s_view_result = s_exec_sql(view_sql.format(s_db), s_db)
    t_view_result = t_exec_sql(view_sql.format(t_db), t_db)
    print('{} 视图 {} 个 {}'.format(s_db, len(s_view_result) if s_view_result else 0, [v for (v,) in s_view_result] if s_view_result else ''))
    print('{} 视图 {} 个 {}'.format(t_db, len(t_view_result) if t_view_result else 0, [v for (v,) in t_view_result] if t_view_result else ''))

    print('\n---------- 表结构对比 ----------')
    source_tables_result = s_exec_sql('SHOW TABLES from {}'.format(s_db), s_db)
    target_tables_result = t_exec_sql('SHOW TABLES from {}'.format(t_db), t_db)
    print('{} 表 {} 个'.format(s_db, len(source_tables_result) if source_tables_result else 0))
    print('{} 表 {} 个'.format(t_db, len(target_tables_result) if target_tables_result else 0))

    source_tables = set([tb for (tb,) in source_tables_result])
    target_tables = set([tb for (tb,) in target_tables_result])

    intersection_tables = set(source_tables) & set(target_tables)
    s_dif_tables = set(source_tables) - set(target_tables)
    t_dif_tables = set(target_tables) - set(source_tables)
    print('\n{} & {} 共有表 {} 个'.format(s_db, t_db, len(intersection_tables)))
    print('{} 缺失表 {} 个 {}'.format(t_db, len(s_dif_tables), s_dif_tables))
    print('{} 缺失表 {} 个 {}\n'.format(s_db, len(t_dif_tables), t_dif_tables))

    comp_data_list = list()
    for i, table in enumerate(sorted(intersection_tables)):
        if table and table not in ignore_table:
            s_rows = s_exec_sql(schema_sql.format(table, s_db), s_db)
            t_rows = t_exec_sql(schema_sql.format(table, t_db), t_db)
            if s_rows == t_rows:
                continue

            # 字段名为key,整行数据为value
            s_dic_data, t_dic_data = {}, {}
            for d in s_rows:
                s_dic_data[str(d[0])] = d
            for d in t_rows:
                t_dic_data[str(d[0])] = d

            # print('\n------ table: {} ------'.format(table))
            s_column_names = sorted(s_dic_data.keys())
            t_column_names = sorted(t_dic_data.keys())
            if s_column_names != t_column_names:
                dif_columns = set(s_column_names) - set(t_column_names)
                if dif_columns:
                    print('`{}`.`{}` 多字段 {}'.format(s_db, table, dif_columns))
                else:
                    t_dif = set(t_column_names) - set(s_column_names)
                    comp_data_list.append({
                        "表名": table,
                        "{} 多字段".format(t_db): ','.join(t_dif)
                    })

            for k, v in s_dic_data.items():
                bv = t_dic_data.get(k)
                # 两边有值但不相等
                if bv:
                    if bv != v:
                        comp_result = {
                            "表名": table,
                            "字段名": k,
                            "脚本类型": 'MODIFY'
                        }

                        comp_dict = {}
                        idx_dict = {}
                        s_is_null, t_is_null = v[1], bv[1]
                        s_data_type, t_data_type = v[2], bv[2]
                        s_def_value, t_def_value = v[3], bv[3]
                        s_data_len, t_data_len = v[4], bv[4]
                        s_idx_type, t_idx_type = v[5], bv[5]
                        s_commend, t_commend = v[6], bv[6]

                        # 要修改的值
                        c_is_null, c_def_value, c_data_len, c_commend = '', '', t_data_len, ''

                        if s_is_null != t_is_null:
                            comp_dict['允许Null'] = '{}={} \n{}={}'.format(s_db, s_is_null, t_db, t_is_null)
                            c_is_null = s_is_null

                        if s_data_type != t_data_type:
                            # 忽略小类型
                            if not ignore_left_col_type_lower:
                                comp_dict['数据类型'] = '{}={} \n{}={}'.format(s_db, s_data_type, t_db, t_data_type)
                            else:
                                if not ignore_check_type(s_data_type, t_data_type):
                                    comp_dict['数据类型'] = '{}={} \n{}={}'.format(s_db, s_data_type, t_db, t_data_type)

                        if s_def_value != t_def_value:
                            if s_def_value and t_def_value:
                                comp_dict['默认值'] = '{}={} \n{}={}'.format(s_db, s_def_value, t_db, t_def_value)
                                c_def_value = s_def_value

                        if s_data_len != t_data_len:
                            # 忽略小长度
                            if not ignore_left_col_len_lower:
                                # 忽略小类型
                                if not ignore_left_col_type_lower:
                                    comp_dict['类型长度'] = '{}={} \n{}={}'.format(s_db, s_data_len, t_db, t_data_len)
                                    c_data_len = s_data_len
                                else:
                                    if not ignore_check_type(s_data_type, t_data_type):
                                        comp_dict['类型长度'] = '{}={} \n{}={}'.format(s_db, s_data_len, t_db, t_data_len)
                                        c_data_len = s_data_len
                            else:
                                # 相同数据类型比较两的长度
                                if s_data_type != t_data_type:
                                    if not ignore_left_col_type_lower:
                                        comp_dict['类型长度'] = '{}={} \n{}={}'.format(s_db, s_data_len, t_db, t_data_len)
                                        c_data_len = s_data_len
                                    else:
                                        if not ignore_check_type(s_data_type, t_data_type):
                                            comp_dict['类型长度'] = '{}={} \n{}={}'.format(s_db, s_data_len, t_db, t_data_len)
                                            c_data_len = s_data_len
                                else:
                                    if s_data_type in ['tinyint', 'varchar', 'bigint', 'int']:
                                        s_len = int(s_data_len.split('(')[1].split(')')[0])
                                        t_len = int(t_data_len.split('(')[1].split(')')[0])
                                        if s_len > t_len:
                                            comp_dict['类型长度'] = '{}={} \n{}={}'.format(s_db, s_data_len, t_db, t_data_len)
                                            c_data_len = s_data_len
                                    elif s_data_type == 'decimal':
                                        s_int_part = get_bracket_str(s_data_len)[0].split(',')[0]
                                        t_int_part = get_bracket_str(t_data_len)[0].split(',')[0]
                                        s_dec_part = get_bracket_str(s_data_len)[0].split(',')[1]
                                        t_dec_part = get_bracket_str(t_data_len)[0].split(',')[1]
                                        if int(s_int_part) > int(t_int_part):
                                            comp_dict['类型长度'] = '{}={} \n{}={}'.format(s_db, s_data_len, t_db, t_data_len)
                                            c_data_len = s_data_len
                                        elif int(s_dec_part) > int(t_dec_part):
                                            comp_dict['类型长度'] = '{}={} \n{}={}'.format(s_db, s_data_len, t_db, t_data_len)
                                            c_data_len = s_data_len
                                    elif s_data_type == 'enum':
                                        s_enum = set(get_bracket_str(s_data_len)[0].split(','))
                                        v_enum = set(get_bracket_str(t_data_len)[0].split(','))
                                        if s_enum - v_enum:
                                            comp_dict['类型长度'] = '{}={} \n{}={}'.format(s_db, s_data_len, t_db, t_data_len)
                                            c_data_len = s_data_len
                                    else:
                                        comp_dict['类型长度'] = '{}={} \n{}={}'.format(s_db, s_data_len, t_db, t_data_len)
                                        c_data_len = s_data_len

                        if s_idx_type != t_idx_type:
                            if ignore_left_no_idx and not s_idx_type:
                                pass
                            else:
                                idx_dict['索引类型'] = '{}={} \n{}={}'.format(s_db, s_idx_type, t_db, t_idx_type)

                        if s_commend != t_commend:
                            if ignore_left_no_comment and not s_commend:
                                pass
                            else:
                                comp_dict['注释'] = '{}={} \n{}={}'.format(s_db, str(s_commend).replace("\r\n", ' '), t_db, str(t_commend).replace("\r\n", ' '))
                                c_commend = s_commend
                        else:
                            c_commend = s_commend

                        if comp_dict:
                            if idx_dict:
                                comp_dict.update(idx_dict)
                            comp_dict['{} 修改命令'.format(t_db)] = alter_modify_sql(t_db, table, k, c_data_len, c_is_null, c_def_value, c_commend)
                            comp_result.update(comp_dict)
                            comp_data_list.append(comp_result)
                else:
                    print('`{}`.`{}` 多字段 {}={}'.format(s_db, table, k, v))
                    c_is_null, c_def_value, c_data_len, c_commend = v[1], v[3], v[4], v[6]
                    comp_result = {
                        '表名': table,
                        '{} 多字段'.format(s_db): k,
                        '{} 修改命令'.format(t_db): alter_add_sql(t_db, table, k, c_data_len, c_is_null, c_def_value, c_commend),
                        "脚本类型": 'ADD'
                    }
                    comp_data_list.append(comp_result)

    # 导出 Excel 文件
    export_excel_file(comp_data_list, '{}->{}-表结构对比.xlsx'.format(s_db, t_db))

Snipaste_2022-06-26_23-15-36.jpg

说明:对于两边数据库一次修改拉平,有两种方案

  • 自定义实现将脚本2进行二次修改

  • 现将 From 和 To 进行比较,将推荐修改的结果先在 To 的一方库执行,然后再与 To 和 From 两个库比较,再修改 From 一方库即可拉平

以上结果,是不是更为直观呢?

最后说一点,凡事都要经过测试,可以将线上的数据库结构经过测试环境验证之后,再做修改。



未经允许请勿转载:程序喵 » Python3 两个数据库触发器、存储过程、函数、视图、表结构、索引对比验证

点  赞 (2) 打  赏
分享到: