Python3 两个数据库所有表索引的对比验证
接着上一篇文章继续说明:Python3 两个数据库触发器、存储过程、函数、视图、表结构、索引对比验证
实现结果如下
对比脚本实现(脚本1)
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
"""Compare the indexes of every table shared by two MySQL schemas.

For each table present in both schemas, one ``ALTER TABLE ... ADD ...``
statement per index is generated from ``information_schema.STATISTICS``.
Statements found on one side only are reported as missing on the other
side and exported to an Excel workbook.

The helper functions read the module-level names ``s_db`` and ``t_db``
(source and target schema names), which are assigned in the
``__main__`` section below.
"""
import os
import sys

import pymysql
from openpyxl import Workbook
from openpyxl.styles import NamedStyle, Font, Alignment, PatternFill, colors

wb_result = Workbook()
sheet = wb_result.active


def result_excel_init():
    """Write the header row and apply the header style and column widths."""
    highlight = NamedStyle(name="highlight")
    highlight.font = Font(name='Arial', size=13, color=colors.BLACK, bold=True)
    highlight.alignment = Alignment(horizontal='center', vertical='center', wrap_text=True)
    highlight.fill = PatternFill("solid", fgColor="ACB9C9")

    sheet.append(['表名', '{} 缺索引'.format(s_db), '{} 缺索引'.format(t_db)])
    sheet.row_dimensions[1].height = 25
    sheet.column_dimensions['A'].width = 50
    sheet.column_dimensions['B'].width = 150
    sheet.column_dimensions['C'].width = 150

    for cell in sheet[1]:
        cell.style = highlight


def result_excel_end():
    """Apply a vertically centred, wrapping style to the data cells.

    The header row and the columns listed in ``skip_columns`` (the table
    name column) are left untouched.
    """
    style = NamedStyle(name="style")
    style.alignment = Alignment(vertical='center', wrap_text=True)
    skip_columns = [1]

    for row in sheet.iter_rows(min_row=2):
        for cell in row:
            if cell.column not in skip_columns:
                cell.style = style


def export_excel_file(data_list, file_name):
    """Write the comparison rows to the workbook and save it.

    :param data_list: list of dicts keyed by the header captions
        ('表名', '<source> 缺索引', '<target> 缺索引').
    :param file_name: path of the ``.xlsx`` file to write.
    """
    print('\n--- 对比结束,导出 Excel 文件 ---\n')
    result_excel_init()

    s_col = '{} 缺索引'.format(s_db)
    t_col = '{} 缺索引'.format(t_db)
    for data in data_list:
        print(data)
        # NOTE(review): as written this replace() swaps a space for a space,
        # i.e. it is a no-op; presumably the first argument was a different
        # whitespace character that was lost when the script was published.
        sheet.append([
            data.get('表名'),
            data.get(s_col, '').replace(' ', ' '),
            data.get(t_col, '').replace(' ', ' '),
        ])

    result_excel_end()
    wb_result.save(file_name)
    print('\n执行导出结果完成 {}\n'.format(file_name))


def exec_sql(sql, db_name, tb=''):
    """Run ``sql`` against ``db_name`` and return all rows.

    :param sql: complete SQL statement to execute.
    :param db_name: schema to connect to.
    :param tb: unused; kept so existing callers keep working.
    :return: tuple of row tuples, or ``None`` when the query returns no rows.
    """
    db = pymysql.connect(host='Ip', port=3306, user='用户', password='密码', database=db_name)
    try:
        with db.cursor() as cursor:
            cursor.execute(sql)
            results = cursor.fetchall()
    finally:
        # This function is called twice per table; always release the
        # connection, even when the query fails.
        db.close()
    if results:
        return results
    return None


if __name__ == '__main__':
    s_db = 'From'
    t_db = 'To'
    # Builds one "ALTER TABLE <table> ADD ... (<columns>);" statement per index.
    schema_sql = """
        SELECT CONCAT(
            'ALTER TABLE ', TABLE_NAME, '', ' ADD ',
            IF(NON_UNIQUE = 1,
                CASE UPPER( INDEX_TYPE )
                    WHEN 'FULLTEXT' THEN 'FULLTEXT INDEX'
                    WHEN 'SPATIAL' THEN 'SPATIAL INDEX'
                    ELSE CONCAT( 'INDEX ', INDEX_NAME, ' USING ', INDEX_TYPE )
                END,
                IF( UPPER( INDEX_NAME ) = 'PRIMARY',
                    CONCAT( 'PRIMARY KEY USING ', INDEX_TYPE ),
                    CONCAT( 'UNIQUE INDEX ', INDEX_NAME, ' USING ', INDEX_TYPE ) ) ),
            '(',
            GROUP_CONCAT( DISTINCT CONCAT( '', COLUMN_NAME, '' ) ORDER BY SEQ_IN_INDEX ASC SEPARATOR ', ' ),
            ');'
        ) AS 'Show_Add_Indexes'
        FROM information_schema.STATISTICS
        WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}'
        GROUP BY TABLE_NAME, INDEX_NAME
        ORDER BY TABLE_NAME ASC, INDEX_NAME ASC;
    """

    # exec_sql() returns None for an empty result, so fall back to an empty
    # tuple to keep the comprehensions below from raising on an empty schema.
    source_tables_result = exec_sql('SHOW TABLES from {}'.format(s_db), s_db) or ()
    target_tables_result = exec_sql('SHOW TABLES from {}'.format(t_db), t_db) or ()

    source_tables = {tb for (tb,) in source_tables_result}
    target_tables = {tb for (tb,) in target_tables_result}

    intersection_tables = source_tables & target_tables
    s_dif_tables = source_tables - target_tables
    t_dif_tables = target_tables - source_tables

    print('\n{} & {} 共有表 {} 个'.format(s_db, t_db, len(intersection_tables)))
    print('{} 缺失表 {} 个 {}'.format(t_db, len(s_dif_tables), s_dif_tables))
    print('{} 缺失表 {} 个 {}\n'.format(s_db, len(t_dif_tables), t_dif_tables))

    comp_data_list = []
    for table in sorted(intersection_tables):
        s_result = exec_sql(schema_sql.format(s_db, table), s_db, table)
        t_result = exec_sql(schema_sql.format(t_db, table), t_db, table)
        if s_result == t_result:
            continue

        # A statement is NULL when CONCAT() receives a NULL argument (e.g. an
        # index made only of functional key parts, whose COLUMN_NAME is NULL);
        # drop those rows so '\n'.join() below cannot fail on None.
        s_rows = {data for (data,) in s_result if data} if s_result else set()
        t_rows = {data for (data,) in t_result if data} if t_result else set()

        t_dif_rows = s_rows - t_rows  # present in source, missing in target
        s_dif_rows = t_rows - s_rows  # present in target, missing in source

        if t_dif_rows:
            print(t_db, '缺索引:', t_dif_rows)
            comp_data_list.append({
                '表名': table,
                # sorted() keeps the exported cell content deterministic
                '{} 缺索引'.format(t_db): '\n'.join(sorted(t_dif_rows)),
            })
        if s_dif_rows:
            print(s_db, '缺索引', table, s_dif_rows)
            comp_data_list.append({
                '表名': table,
                '{} 缺索引'.format(s_db): '\n'.join(sorted(s_dif_rows)),
            })

    print("\n------对比完成------\n")

    # Export the comparison result to an Excel file
    export_excel_file(comp_data_list, '{}->{}-索引-对比.xlsx'.format(s_db, t_db))
脚本2
细心的你如果仔细思考一下索引对比的结果就会发现:比如 From 和 To 中索引名称仅大小写不同,但索引内容相同;或者索引名称不同,但索引的字段完全相同;
这类场景,其实并不需要两边都做修改:创建索引的耗时和数据量是成正比的,仅仅名字不同没必要重复建索引、徒增空间占用。所以,脚本还有改进的空间:将名字不同、但内容完全相同的索引自动排除掉,而非人工验证,岂不很好?
你也可以基于以上脚本1进行自己扩展实现,如下给出示例之一
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
"""Compare the indexes of every table shared by two MySQL schemas.

For each table present in both schemas, one ``ALTER TABLE ... ADD ...``
statement per index is generated from ``information_schema.STATISTICS``.
Compared with a plain statement diff, this version can also

* ignore indexes whose names differ but whose column lists are equal, and
* report indexes that share a name but cover different columns.

The helper functions read the module-level names ``s_db`` and ``t_db``
(source and target schema names), which are assigned in the
``__main__`` section below.
"""
import os
import sys

import pymysql
from openpyxl import Workbook
from openpyxl.styles import NamedStyle, Font, Alignment, PatternFill, colors

wb_result = Workbook()
sheet = wb_result.active


def result_excel_init():
    """Write the header row and apply the header style and column widths."""
    highlight = NamedStyle(name="highlight")
    highlight.font = Font(name='Arial', size=13, color=colors.BLACK, bold=True)
    highlight.alignment = Alignment(horizontal='center', vertical='center', wrap_text=True)
    highlight.fill = PatternFill("solid", fgColor="ACB9C9")

    sheet.append(['表名', '{} 缺索引'.format(s_db), '{} 缺索引'.format(t_db), '索引名相同内容不同'])
    sheet.row_dimensions[1].height = 25
    sheet.column_dimensions['A'].width = 50
    sheet.column_dimensions['B'].width = 150
    sheet.column_dimensions['C'].width = 150
    sheet.column_dimensions['D'].width = 150

    for cell in sheet[1]:
        cell.style = highlight


def result_excel_end():
    """Apply a vertically centred, wrapping style to the data cells.

    The header row and the columns listed in ``skip_columns`` (the table
    name column) are left untouched.
    """
    style = NamedStyle(name="style")
    style.alignment = Alignment(vertical='center', wrap_text=True)
    skip_columns = [1]

    for row in sheet.iter_rows(min_row=2):
        for cell in row:
            if cell.column not in skip_columns:
                cell.style = style


def export_excel_file(data_list, file_name):
    """Write the comparison rows to the workbook and save it.

    :param data_list: list of dicts keyed by the header captions ('表名',
        '<source> 缺索引', '<target> 缺索引', '索引名相同内容不同').
    :param file_name: path of the ``.xlsx`` file to write.
    """
    print('\n--- 对比结束,导出 Excel 文件 ---\n')
    result_excel_init()

    s_col = '{} 缺索引'.format(s_db)
    t_col = '{} 缺索引'.format(t_db)
    for data in data_list:
        print(data)
        # NOTE(review): as written this replace() swaps a space for a space,
        # i.e. it is a no-op; presumably the first argument was a different
        # whitespace character that was lost when the script was published.
        sheet.append([
            data.get('表名'),
            data.get(s_col, '').replace(' ', ' '),
            data.get(t_col, '').replace(' ', ' '),
            data.get('索引名相同内容不同'),
        ])

    result_excel_end()
    wb_result.save(file_name)
    print('\n执行导出结果完成 {}\n'.format(file_name))


def exec_sql(sql, db_name, tb=''):
    """Run ``sql`` against ``db_name`` and return all rows.

    :param sql: complete SQL statement to execute.
    :param db_name: schema to connect to.
    :param tb: unused; kept so existing callers keep working.
    :return: tuple of row tuples, or ``None`` when the query returns no rows.
    """
    db = pymysql.connect(host='IP', port=3306, user='用户', password='密码', database=db_name)
    try:
        with db.cursor() as cursor:
            cursor.execute(sql)
            results = cursor.fetchall()
    finally:
        # Release the connection even when the query raises.
        db.close()
    if results:
        return results
    return None


def get_table_idx(rows):
    """Index the generated ALTER statements of one table.

    :param rows: iterable of ``ALTER TABLE ... ADD ...(<columns>);`` strings
        as produced by the schema query in the ``__main__`` section.
    :return: dict whose key is ``'<index name>#<alter statement>'`` and whose
        value is the comma-separated column list with spaces removed. The
        primary key is stored under the name ``pri_key``.
    """
    tb_idx = {}
    for stmt in rows or ():
        if not stmt:
            # NULL statement returned by the query; nothing to parse
            continue
        fields = stmt.split('(', 1)[1].replace(');', '').replace(' ', '')
        if ' PRIMARY KEY ' in stmt:
            name = 'pri_key'
        else:
            # "... ADD [UNIQUE|FULLTEXT|SPATIAL] INDEX <name> ..."
            name = stmt.split(' INDEX ', 1)[1].split(' ')[0]
        tb_idx[name + '#' + stmt] = fields
    return tb_idx


def _fields_by_name(tb_idx):
    """Map each index name in a get_table_idx() result to its column list."""
    return {key.split('#', 1)[0]: fields for key, fields in tb_idx.items()}


def _find_same_fields(tb_idx, fields):
    """Return the name of the first index covering exactly ``fields``, or None."""
    for key, value in tb_idx.items():
        if value == fields:
            return key.split('#', 1)[0]
    return None


if __name__ == '__main__':
    s_db = 'From'
    t_db = 'To'
    # Builds one "ALTER TABLE <table> ADD ... (<columns>);" statement per index.
    # FULLTEXT and SPATIAL indexes carry their name as well (followed by a
    # space) so that get_table_idx() can parse them like every other index.
    schema_sql = """
        SELECT CONCAT(
            'ALTER TABLE ', TABLE_NAME, '', ' ADD ',
            IF(NON_UNIQUE = 1,
                CASE UPPER( INDEX_TYPE )
                    WHEN 'FULLTEXT' THEN CONCAT( 'FULLTEXT INDEX ', INDEX_NAME, ' ' )
                    WHEN 'SPATIAL' THEN CONCAT( 'SPATIAL INDEX ', INDEX_NAME, ' ' )
                    ELSE CONCAT( 'INDEX ', INDEX_NAME, ' USING ', INDEX_TYPE )
                END,
                IF( UPPER( INDEX_NAME ) = 'PRIMARY',
                    CONCAT( 'PRIMARY KEY USING ', INDEX_TYPE ),
                    CONCAT( 'UNIQUE INDEX ', INDEX_NAME, ' USING ', INDEX_TYPE ) ) ),
            '(',
            GROUP_CONCAT( DISTINCT CONCAT( '', COLUMN_NAME, '' ) ORDER BY SEQ_IN_INDEX ASC SEPARATOR ', ' ),
            ');'
        ) AS 'Show_Add_Indexes'
        FROM information_schema.STATISTICS
        WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}'
        GROUP BY TABLE_NAME, INDEX_NAME
        ORDER BY TABLE_NAME ASC, INDEX_NAME ASC;
    """

    # exec_sql() returns None for an empty result, so fall back to an empty
    # tuple to keep the comprehensions below from raising on an empty schema.
    source_tables_result = exec_sql('SHOW TABLES from {}'.format(s_db), s_db) or ()
    target_tables_result = exec_sql('SHOW TABLES from {}'.format(t_db), t_db) or ()

    source_tables = {tb for (tb,) in source_tables_result}
    target_tables = {tb for (tb,) in target_tables_result}

    intersection_tables = source_tables & target_tables
    s_dif_tables = source_tables - target_tables
    t_dif_tables = target_tables - source_tables

    print('\n{} & {} 共有表 {} 个'.format(s_db, t_db, len(intersection_tables)))
    print('{} 缺失表 {} 个 {}'.format(t_db, len(s_dif_tables), s_dif_tables))
    print('{} 缺失表 {} 个 {}\n'.format(s_db, len(t_dif_tables), t_dif_tables))

    # Skip indexes whose names differ but whose column lists are identical.
    # NOTE(review): only column lists are compared, so two indexes that differ
    # solely in uniqueness or index type are treated as equivalent.
    ignore_difName_sameFields = True

    comp_data_list = []
    for table in sorted(intersection_tables):
        s_result = exec_sql(schema_sql.format(s_db, table), s_db, table)
        t_result = exec_sql(schema_sql.format(t_db, table), t_db, table)
        if s_result == t_result:
            continue

        # A statement is NULL when CONCAT() receives a NULL argument (e.g. an
        # index made only of functional key parts, whose COLUMN_NAME is NULL).
        s_rows = {data for (data,) in s_result if data} if s_result else set()
        t_rows = {data for (data,) in t_result if data} if t_result else set()

        s_tb_idx = get_table_idx(s_rows)
        t_tb_idx = get_table_idx(t_rows)
        s_fields_by_name = _fields_by_name(s_tb_idx)
        t_fields_by_name = _fields_by_name(t_tb_idx)

        print('\n------ table: {} ------'.format(table))

        # Keys embed the whole statement, so keys present on both sides are
        # identical indexes. Only keys found on exactly one side need a look.
        for k in sorted(s_tb_idx.keys() ^ t_tb_idx.keys()):
            k_name, alter_sql = k.split('#', 1)

            if k in t_tb_idx:
                # Statement exists in the target only.
                tv = t_tb_idx[k]
                sv = s_fields_by_name.get(k_name)
                if sv is not None and sv != tv:
                    # Same name, different columns: reported once, when the
                    # source-side key of this index name is visited.
                    continue
                twin = _find_same_fields(s_tb_idx, tv) if ignore_difName_sameFields else None
                if twin is not None:
                    print('{} 索引名不同,{} <-> {} 字段内容相同 {} 忽略'.format(table, k_name, twin, tv))
                else:
                    print('`{}`.`{}` 缺索引 {}'.format(s_db, table, k_name))
                    comp_data_list.append({'表名': table, '{} 缺索引'.format(s_db): alter_sql})
            else:
                # Statement exists in the source only.
                sv = s_tb_idx[k]
                tv = t_fields_by_name.get(k_name)
                if tv is not None and tv != sv:
                    print('{} 索引名相同 {} 字段不同 {} <-> {} '.format(table, k_name, sv, tv))
                    comp_data_list.append({
                        '表名': table,
                        '索引名相同内容不同': '{}={}\n{}={}'.format(s_db, sv, t_db, tv),
                    })
                    continue
                twin = _find_same_fields(t_tb_idx, sv) if ignore_difName_sameFields else None
                if twin is not None:
                    print('{} 索引名不同 {} <-> {} 字段内容相同 {} 忽略'.format(table, k_name, twin, sv))
                else:
                    print('`{}`.`{}` 缺索引 {}'.format(t_db, table, k_name))
                    comp_data_list.append({'表名': table, '{} 缺索引'.format(t_db): alter_sql})

    print("\n------对比完成------\n")

    # Export the comparison result to an Excel file
    export_excel_file(comp_data_list, '{}->{}-索引-对比2.xlsx'.format(s_db, t_db))
未经允许请勿转载:程序喵 » Python3 两个数据库所有表索引的对比验证