Background
When a refactoring is planned, you need to work out its impact across all of the company's projects. For example, if the old-order-api dependency has to be removed from pom.xml, you first have to find every project that pulls in that package. Likewise, the company's search features used to rely heavily on Solr, and every Solr usage now has to be migrated to Elasticsearch; that too requires searching every project.
Implementation options
Option 1: search project by project through the GitLab web UI. The drawbacks are obvious: it is slow and nothing is reusable.
Option 2: search with a crawler script. Drawback: you have to write the program yourself. Advantages: it is reusable and extensible, and the results can be exported as reports. Note: crawling each project is slow, so it can be sped up with multiple threads.
Option 3: use a script to clone every project locally, then read all file contents with grep and regular expressions (a minimal sketch follows this list). Drawbacks: it takes up local storage and the branch code has to be maintained and kept up to date. Advantage: local searching is extremely fast, so this is the recommended approach when there is a lot of content to query.
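A minimal sketch of option 3, assuming the repository URLs are already known (the repository list, clone directory, and search pattern below are placeholders, not part of the original post):

```python
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
# Option 3 sketch: clone (or update) each repository locally, then grep it.
import os
import subprocess

repos = ['git@gitlab.co-xxx.com:group/old-order-service.git']  # placeholder list
clone_dir = 'repos'
pattern = 'elasticsearch'

os.makedirs(clone_dir, exist_ok=True)
for url in repos:
    name = url.rsplit('/', 1)[-1]
    name = name[:-4] if name.endswith('.git') else name
    path = os.path.join(clone_dir, name)
    if os.path.isdir(path):
        # refresh an existing clone so the default branch stays up to date
        subprocess.run(['git', '-C', path, 'pull', '--ff-only'], check=False)
    else:
        # a shallow clone is enough for text searching
        subprocess.run(['git', 'clone', '--depth', '1', url, path], check=False)
    # grep -rniI: recursive, with line numbers, case-insensitive, skipping binary files
    result = subprocess.run(['grep', '-rniI', pattern, path], capture_output=True, text=True)
    if result.stdout:
        print(result.stdout)
```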
Code implementation
Option 2 is used here, extended so that the results can also be exported as reports.
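Both scripts depend on a handful of third-party packages. The PyPI package names below are inferred from the import statements (they are not listed in the original post), so adjust them if your environment differs:

```
pip3 install requests requests-pkcs12 beautifulsoup4 lxml pyecharts openpyxl
```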
(1) Single-threaded search
```python
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import os
import sys
import logging
import requests
from datetime import datetime
from bs4 import BeautifulSoup
from requests_pkcs12 import Pkcs12Adapter
from pyecharts.charts import Bar
from pyecharts import options as opts
from pyecharts.globals import ThemeType

__author__ = 'TingFeng'
__date__ = '2021/12/13'

'''
Group pagination: https://gitlab.co-xxx.com/api/v4/groups/551/projects?simple=True&per_page=100&page=3
Project list:     https://gitlab.co-xxx.com/api/v4/groups/551/projects.json?search=&per_page=50
Page search:      https://gitlab.co-xxx.com/search?utf8=%E2%9C%93&snippets=&scope=&search=elasticsearch&group_id=551&project_id=3492
'''

# PKCS#12 (pfx) client certificate for the mutual-TLS GitLab site, plus its password;
# the company GitLab certificate is rotated periodically
session = requests.session()
BASE_HOST = 'https://gitlab.co-xxx.com'
HEADERS = {'PRIVATE-TOKEN': 'xxxxxxx'}
session.mount(BASE_HOST, Pkcs12Adapter(pkcs12_filename='cert/client-20220719.p12', pkcs12_password='xxxxx'))

# the GitLab group whose projects are listed
group_id = '551'
# search keyword
search_word = 'elasticsearch'

# log and chart output files
_root_file_path = os.path.abspath(".")
_py_filename = os.path.basename(sys.argv[0]).split(".")[0]
_log_file = '{}/logs/{}-{}.log'.format(_root_file_path, _py_filename, search_word)
_html_file = '{}/html/{}-{}.html'.format(_root_file_path, _py_filename, search_word)
logging.basicConfig(filename=_log_file, filemode='w', format='%(message)s', level=logging.INFO)
console_handler = logging.StreamHandler()
console_handler.setLevel('INFO')
logging.getLogger().addHandler(console_handler)


def blank(num: int):
    return ''.rjust(num)


def list_all_projects():
    """Page through all projects of the group via the GitLab API."""
    _projects = []
    page = 1
    while True:
        rsp = session.get(
            BASE_HOST + '/api/v4/groups/' + group_id + '/projects',
            headers=HEADERS,
            params={'simple': True, 'per_page': 100, 'page': page}
        )
        print(rsp.url)
        projects_page = rsp.json()
        if len(projects_page) == 0:
            break
        else:
            _projects.extend(projects_page)
            page = page + 1
    return _projects


def search_web(_project_id):
    """Fetch the HTML search page of one project for the keyword."""
    rsp = session.get(
        BASE_HOST + '/search?utf8=%E2%9C%93&snippets=&scope=&search={}&group_id={}&project_id={}'.format(
            search_word, group_id, _project_id),
        headers=HEADERS
    )
    # print('search-web:' + rsp.url)
    return {'text': rsp.text, 'web_url': rsp.url}


def bar_charts(_data: list):
    """Render a bar chart of hits per project."""
    bar = Bar(init_opts=opts.InitOpts(theme=ThemeType.DARK))
    bar.width = '1800px'
    bar.height = '900px'
    bar.add_xaxis(['项目'])
    bar.set_global_opts(
        title_opts=opts.TitleOpts(title="关键字 {} 结果统计图".format(search_word), subtitle=""),
        legend_opts=opts.LegendOpts(orient='vertical'))
    for d in _data:
        bar.add_yaxis(d['项目'], [d['找到数']])
    bar.render(_html_file)


def search_task(projects: list):
    _all_datas = []
    for project in projects:
        _project_id = project['id']
        _last_activity_at = datetime.strptime(project['last_activity_at'], "%Y-%m-%dT%H:%M:%S.%f+08:00")
        _last_activity_at = _last_activity_at.strftime('%Y-%m-%d %H:%M:%S')
        _search_data = {'项目': project['name'], '分支': project['default_branch'], '最后活动时间': _last_activity_at}
        # fetch the search result page
        _data_vo = search_web(_project_id)
        soup = BeautifulSoup(_data_vo['text'], 'lxml')
        for body in soup.select('#content-body .row-content-block'):
            logging.info('\n#############################################################################')
            logging.info(blank(2) + body.text.replace('\n', '\t'))
            logging.info('#############################################################################\n')
        _search_data['web_url'] = _data_vo['web_url']
        _search_data['找到数'] = soup.select_one('.nav-links .active span').text.replace("\n", "")
        _file_names = []
        for index, result in enumerate(soup.select('.search-results .blob-result')):
            logging.info('----' + str(index + 1) + '----')
            blob = BeautifulSoup(str(result), 'lxml')
            _file_name = blob.select_one('.js-file-title').text.replace('\n', '')
            _file_names.append(_file_name)
            logging.info('文件名:{}'.format(_file_name))
            logging.info(blob.select_one('.blob-content').text.replace('\n\n', '\n'))
        _search_data['文件列表'] = _file_names
        _all_datas.append(_search_data)
    return _all_datas


if __name__ == '__main__':
    projects = list_all_projects()
    logging.info('项目总数 {} 检索关键字 {} 请等待....'.format(len(projects), search_word))
    all_datas = search_task(projects)
    logging.info('\n检索完成, 打印最终结果\n')
    sort_data_list = sorted(all_datas, key=lambda i: int(i["找到数"]))
    for index, data in enumerate(sort_data_list):
        logging.info('序号 {}, {}'.format(str(index + 1), data))
    logging.info('\n生成统计结果\n')
    bar_charts(sort_data_list)
```
Execution results (log output and generated chart omitted)
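As an aside: instead of scraping the HTML search page, newer GitLab versions also expose a project-scoped search endpoint in API v4 (`GET /projects/:id/search?scope=blobs&search=...`). Whether the `blobs` scope and its exact result fields are available depends on the GitLab version and configuration, so the sketch below is an assumption rather than what the original script does:

```python
import requests

# Hypothetical alternative to HTML scraping: GitLab's project-level search API.
# Host and token are placeholders following the same conventions as the script above.
BASE_HOST = 'https://gitlab.co-xxx.com'
HEADERS = {'PRIVATE-TOKEN': 'xxxxxxx'}


def search_blobs_api(project_id, keyword):
    rsp = requests.get(
        '{}/api/v4/projects/{}/search'.format(BASE_HOST, project_id),
        headers=HEADERS,
        params={'scope': 'blobs', 'search': keyword, 'per_page': 100},
    )
    rsp.raise_for_status()
    # each hit is expected to carry the file path and the matching snippet ('data');
    # field names vary slightly across GitLab versions, hence the defensive .get()
    return [hit.get('path') or hit.get('filename') for hit in rsp.json()]
```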
(2) Multi-threaded search
This version builds on the single-threaded code above: it adds batch keyword search and chart export, writes the log as a .md file (which can then be exported to PDF), and adds Excel export.
Create keyword_list.txt and list the keywords to search for, one per line.
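For example, keyword_list.txt could look like this (the keywords below are only illustrative values taken from the scenarios in the background section):

```
elasticsearch
solr
old-order-api
```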
```python
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import os
import sys
import requests
import time
import functools
import threading
from datetime import datetime
from bs4 import BeautifulSoup
from requests_pkcs12 import Pkcs12Adapter
from urllib.parse import unquote
from pyecharts.charts import Bar
from pyecharts import options as opts
from pyecharts.globals import ThemeType
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
from openpyxl import Workbook
from openpyxl.styles import NamedStyle, Font, Alignment, PatternFill, colors

__author__ = 'TingFeng'
__date__ = '2021/12/13'

'''
Group pagination: https://gitlab.co-xxx.com/api/v4/groups/551/projects?simple=True&per_page=100&page=3
Project list:     https://gitlab.co-xxx.com/api/v4/groups/551/projects.json?search=&per_page=50
Page search:      https://gitlab.co-xxx.com/search?utf8=%E2%9C%93&snippets=&scope=&search=elasticsearch&group_id=551&project_id=3492
Notes on sys.stdout in Python:        https://zhuanlan.zhihu.com/p/377418978
Python semaphores and thread pools:   https://blog.csdn.net/qq_41854146/article/details/107512424
'''

# PKCS#12 (pfx) client certificate for the mutual-TLS GitLab site, plus its password;
# the company GitLab certificate is rotated periodically
session = requests.session()
BASE_HOST = 'https://gitlab.co-xxx.com'
HEADERS = {'PRIVATE-TOKEN': 'xxxxxxxxxx'}
session.mount(BASE_HOST, Pkcs12Adapter(pkcs12_filename='cert/client-20220719.p12', pkcs12_password='xxxxxx'))

# Excel export workbook
_root_file_path = os.path.abspath(".")
_py_filename = os.path.basename(sys.argv[0]).split(".")[0]
wb_result = Workbook()
sheet = wb_result.active
local = threading.local()


def time_me(info='耗时'):
    """Decorator that prints how long the wrapped function took."""
    def _time_me(fn):
        @functools.wraps(fn)
        def _wrapper(*args, **kwargs):
            start = time.perf_counter()
            fn(*args, **kwargs)
            print('{} {} {}'.format(fn.__name__, info, int(time.perf_counter() - start)), ' 秒')
        return _wrapper
    return _time_me


class Logger(object):
    """Tee stdout: write everything both to the terminal and to a markdown file."""

    def __init__(self, filename='default.md', stream=sys.stdout):
        self.terminal = stream
        self.log = open(filename, 'w+')

    def write(self, message):
        self.terminal.write(message)
        self.log.write(message)

    def flush(self):
        self.terminal.flush()
        self.log.flush()


def blank(num: int):
    return ''.rjust(num)


def result_excel_init():
    """Create the header row and column layout of the result sheet."""
    highlight = NamedStyle(name="highlight")
    highlight.font = Font(name='Arial', size=13, color=colors.BLACK, bold=True)
    highlight.alignment = Alignment(horizontal='center', vertical='center', wrap_text=True)
    highlight.fill = PatternFill("solid", fgColor="ACB9C9")
    sheet.append(['关键字', '项目名称', '文件数', '文件列表', '分支', '最后活动时间', '链接'])
    sheet.row_dimensions[1].height = 25
    sheet.column_dimensions['A'].width = 30
    sheet.column_dimensions['B'].width = 35
    sheet.column_dimensions['C'].width = 10
    sheet.column_dimensions['D'].width = 200
    sheet.column_dimensions['E'].width = 10
    sheet.column_dimensions['F'].width = 25
    sheet.column_dimensions['G'].width = 120
    for cell in list(sheet.rows)[0]:
        cell.style = highlight


def result_excel_end():
    """Apply alignment styles to all data rows."""
    style = NamedStyle(name="style")
    v_center = Alignment(vertical='center', wrap_text=True)
    hv_center = Alignment(horizontal='center', vertical='center', wrap_text=True)
    hv_list = [1, 3, 5, 6]
    for i, row in enumerate(list(sheet.rows)):
        if i == 0:
            continue
        for cell in row:
            if cell.column in hv_list:
                style.alignment = hv_center
            else:
                style.alignment = v_center
            cell.style = style


def list_all_projects():
    """Page through all projects of the group via the GitLab API."""
    _projects = []
    page = 1
    while True:
        rsp = session.get(
            BASE_HOST + '/api/v4/groups/' + group_id + '/projects',
            headers=HEADERS,
            params={'simple': True, 'per_page': 100, 'page': page}
        )
        # print(rsp.url)
        projects_page = rsp.json()
        if len(projects_page) == 0:
            break
        else:
            _projects.extend(projects_page)
            page = page + 1
    return _projects


def search_web(_project_id: str, page=1):
    """Fetch one page of the HTML search results for the current thread's keyword."""
    rsp = session.get(
        BASE_HOST + '/search?search={}&page={}&group_id={}&project_id={}'.format(
            local.search_word, page, group_id, _project_id),
        headers=HEADERS
    )
    # print(threading.currentThread().getName() + ' url:' + unquote(rsp.url))
    return {'text': rsp.text, 'link': rsp.url}


def bar_charts(_data: list, search_word: str):
    """Render a bar chart of matched file counts per project for one keyword."""
    bar = Bar(init_opts=opts.InitOpts(theme=ThemeType.DARK))
    bar.width = '1800px'
    bar.height = '900px'
    bar.add_xaxis(['项目'])
    bar.set_global_opts(
        title_opts=opts.TitleOpts(title='关键字 {} 结果统计图'.format(unquote(search_word)), subtitle=''),
        legend_opts=opts.LegendOpts(orient='vertical'))
    for d in _data:
        bar.add_yaxis(d['项目'], [d['文件数']])
    bar.render("html/{}.html".format(search_word))


def select_project_data(project: dict, search_word: str):
    """Search one project for the keyword; runs inside a worker thread."""
    local.search_word = search_word
    _project_id = project['id']
    _project_name = project['name']
    _branch = project['default_branch']
    _last_activity_at = datetime.strptime(project['last_activity_at'], '%Y-%m-%dT%H:%M:%S.%f+08:00')
    _last_activity_at = _last_activity_at.strftime('%Y-%m-%d %H:%M:%S')
    # fetch the first result page
    _data_vo = search_web(_project_id)
    soup = BeautifulSoup(_data_vo['text'], 'lxml')
    _link = unquote(_data_vo['link'])
    # skip excluded projects
    if _project_id in exclude_project:
        return
    # read the hit count badge; 20 results per page by default
    badge = soup.select_one('.nav-links .active .badge')
    if badge:
        find_count = int(badge.text.replace('\n', ''))
        if find_count == 0:
            return
    else:
        return
    # total number of result pages
    pages = find_count // 20 if find_count % 20 == 0 else find_count // 20 + 1
    # per-thread state used when printing matched files
    local.line = 1
    local.print_project = False
    local.project_name = _project_name
    local.link = _link
    local.branch = _branch
    local.last_activity_at = _last_activity_at
    # collect the matched file names
    _file_names = []
    _file_names.extend(get_file_list(soup))
    # the first page is already fetched; continue with the remaining pages
    for page in range(2, pages + 1):
        _data_vo = search_web(_project_id, page)
        soup = BeautifulSoup(_data_vo['text'], 'lxml')
        _file_names.extend(get_file_list(soup))
    # return nothing if the file list is empty
    if _file_names:
        # de-duplicate, then sort by string length ascending
        _files = sorted(list(set(_file_names)), key=lambda i: len(i))
        _search_data = {
            '关键字': unquote(local.search_word),
            '项目': _project_name,
            '分支': _branch,
            '文件数': len(_files),
            '文件列表': _files,
            '最后活动时间': _last_activity_at,
            'link': _link
        }
        return _search_data


def get_file_list(soup):
    """Extract the matched file list from the code-snippet results of one page."""
    _file_names = []
    # code snippets
    for d in soup.select('.search-results .blob-result'):
        blob = BeautifulSoup(str(d), 'lxml')
        _file_name = blob.select_one('.js-file-title').text.replace('\n', '')
        # skip test classes
        if filter_Tests and 'src/test/' in _file_name:
            continue
        # skip excluded file extensions
        _extension = _file_name[_file_name.rfind('.'):]
        if _extension in exclude_file:
            continue
        # skip excluded file names
        if _file_name[_file_name.rfind('/') + 1:] in exclude_file_name:
            continue
        # print the project header only once
        if not local.print_project:
            print('\n#### {}\n'.format(local.project_name))
            print('- 项目名称:{}'.format(local.project_name))
            print('- 默认分支:{}'.format(local.branch))
            print('- 最后活跃时间:{}'.format(local.last_activity_at))
            print('- link:{}\n'.format(local.link))
            local.print_project = True
        print('\n({}) 文件名:{}\n'.format(local.line, _file_name))
        code_snippet = '```{}{}```'.format(_extension[1:], blob.select_one('.blob-content').text.replace('\n\n', '\n'))
        code_snippet = get_code(code_snippet)
        print(code_snippet)
        _file_names.append(_file_name)
        local.line = local.line + 1
    return _file_names


def get_code(code_snippet: str):
    """Strip the common leading indentation from a code snippet."""
    # collect the leading-space counts of all indented lines
    blanks = []
    for _line in code_snippet.split('\n'):
        if _line.startswith(' '):
            _i = 0
            for s in _line:
                if s.strip() == '':
                    _i = _i + 1
                else:
                    blanks.append(_i)
                    break
    if not blanks:
        return code_snippet
    # minimum indentation
    min_no = min(blanks)
    # rebuild the string with the common indentation removed
    return '\n'.join([i.replace(blank(min_no), '', 1) for i in code_snippet.split('\n')])


def search_task(projects: list, search_word: str):
    """Search all projects for one keyword, writing output to logs/<keyword>.md."""
    # redirect stdout to this keyword's markdown log file
    sys.stdout = Logger(filename='logs/{}.md'.format(search_word))
    print('\n### {} \n'.format(unquote(search_word)))
    # create the thread pool and submit one task per project
    all_task = []
    pool = ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix='search-project-exec')
    for project in projects:
        all_task.append(pool.submit(select_project_data, project, search_word))
    # wait for all tasks to finish
    wait(all_task, return_when=ALL_COMPLETED)
    pool.shutdown()
    print('\n\n关键字 {} 查询完成...'.format(unquote(search_word)))
    # collect the non-empty results
    _all_datas = list(filter(None, [i.result() for i in all_task]))
    _result = None
    if _all_datas:
        _result = sorted(_all_datas, key=lambda i: int(i['文件数']))
        print('关键字 {} 匹配结果 {}'.format(unquote(search_word),
                                       ['{} ({})'.format(d['项目'], d['文件数']) for d in _result]))
        # render the bar chart
        bar_charts(_result, search_word)
    # switch back to the default (all-data) logger before returning
    sys.stdout = default_log
    return _result


@time_me()
def main():
    projects = list_all_projects()
    print('项目总数 {}\n'.format(len(projects)))
    with open('keyword_list.txt') as f:
        word_list = f.readlines()
    # drop blank lines and percent-encode characters (/, {, }) that break the search URL
    word_list = list(filter(None, [i.replace('\n', '').replace('/', '%2F').replace('{', '%7B').replace('}', '%7D').strip() for i in word_list]))
    dtime = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    word_len = len(word_list)
    print('{} 检索 {} 个关键字'.format(dtime, word_len))
    all_result = []
    for _i, _word in enumerate(word_list):
        # search this keyword (projects are searched concurrently inside search_task)
        print('\n第 {} 个关键词 {}'.format((_i + 1), unquote(_word)))
        all_result.append(search_task(projects, _word))
    print('所有关键字检索完成, 打印最终结果\n')
    print('```json')
    for result in list(filter(None, all_result)):
        print('')
        for index, data in enumerate(result):
            print('序号 {}, {}'.format(str(index + 1), data))
    print('```')
    # export to Excel
    result_excel_init()
    for result in list(filter(None, all_result)):
        for data in result:
            # file list is already sorted by length; join with line breaks
            files = '\n'.join(data['文件列表'])
            sheet.append([data['关键字'], data['项目'], int(data['文件数']), str(files),
                          data['分支'], data['最后活动时间'], data['link']])
    result_excel_end()
    wb_result.save('excel/all-data-{}.xlsx'.format(datetime.now().strftime("%Y-%m-%d")))
    print('\n执行导出结果完成\n')
    # delete the .md files that contain no matches
    for _word in word_list:
        _path = 'logs/{}.md'.format(_word)
        if os.path.exists(_path):
            with open(_path) as f:
                _txt_list = f.readlines()
            _has = False
            for _txt in [i.replace('\n', '').strip() for i in _txt_list]:
                if '文件名' in _txt:
                    _has = True
                    break
            if not _has:
                print('无结果文件删除\n', _path)
                os.remove(_path)


if __name__ == '__main__':
    group_id = '551'  # GitLab group id
    max_workers = 20  # maximum number of worker threads
    word_add_left_bracket = True  # append a left parenthesis '(' after each keyword
    exclude_project = [2846]  # project ids to skip
    exclude_file = ['.md', '.properties', '.css', '.sql', '.log', '.txt']  # file extensions to skip
    exclude_file_name = ['solrconfig.xml', 'schema.xml', 'elevate.xml', 'managed-schema', 'yarn.lock']  # file names to skip
    filter_Tests = True  # skip test classes
    default_log = Logger(filename='logs/{}.md'.format('all-data'), stream=sys.stdout)
    sys.stdout = default_log
    try:
        main()
    except Exception as e:
        print('查询异常:', e)
```
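One practical note before looking at the output: both scripts write to relative logs/, html/, and excel/ directories and read the client certificate from cert/, but they do not create these directories themselves. A small one-off helper (not part of the original post) avoids the resulting file-not-found errors:

```python
import os

# Create the directories the scripts expect; cert/ must additionally contain
# the PKCS#12 client certificate referenced in the configuration.
for d in ('logs', 'html', 'excel', 'cert'):
    os.makedirs(d, exist_ok=True)
```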
Execution results
- The console log is omitted; the generated .md file, the chart, and the Excel output are shown below.