Python3 爬虫搜索 gitlab 项目

背景

在项目需要重构时,需要检索公司中的项目的影响范围,比如项目需要摘除 pom.xml 中依赖 old-order-api 的依赖包,检索出所有项目那里用到了这个包;再比如之前公司项目的搜索引擎有大量使用 solr 的实现,现在需要重构把所有 solr 的地方,统统修改为 elasticsearch,这也需要检索所有项目。

实现方案

Snipaste_2022-08-14_16-25-46.jpg

方案1:通过 gitlab 在线一个个项目检索,缺点很明显,速度慢,不可复用

方案2:通过脚本爬虫方式来检索,缺点,自己写程序;优点,可复用,可扩展,可以把结果导出报表;注意点,项目爬取速度慢可以采用多线程进行优化

方案3:通过脚本爬虫方式把所有项目 clone 到本地,写脚本方式结合 grep 正则命令来读取所有文件内容,缺点,占内存,维护更新分支代码,优点,本地检索速度极快,查询内容较多时推荐。

代码实现

这里采用方案2,这里扩展把结果进行报表导出

(1)单线程检索

#!/usr/bin/python3
# -*- coding: UTF-8 -*-

import os
import sys
import logging
import requests
from datetime import datetime
from bs4 import BeautifulSoup
from requests_pkcs12 import Pkcs12Adapter
from pyecharts.charts import Bar
from pyecharts import options as opts
from pyecharts.globals import ThemeType


__author__ = 'TingFeng'
__date__ = '2021/12/13'


'''
组分页查询:https://gitlab.co-xxx.com/api/v4/groups/551/projects?simple=True&per_page=100&page=3
项目列表:https://gitlab.co-xxx.com/api/v4/groups/551/projects.json?search=&per_page=50
页面检索:https://gitlab.co-xxx.com/search?utf8=%E2%9C%93&snippets=&scope=&search=elasticsearch&group_id=551&project_id=3492
'''

# 指定双向认证网站的pfx证书文件,并附带证书的密码,公司gitlab证书定期维护
session = requests.session()
BASE_HOST = 'https://gitlab.co-xxx.com'
HEADERS = {'PRIVATE-TOKEN': 'xxxxxxx'}
session.mount(BASE_HOST, Pkcs12Adapter(pkcs12_filename='cert/client-20220719.p12', pkcs12_password='xxxxx'))

# 读取那个分组的所有项目
group_id = '551'
# 检索词
search_word = 'elasticsearch'

# 日志文件
_root_file_path = os.path.abspath(".")
_py_filename = os.path.basename(sys.argv[0]).split(".")[0]
_log_file = '{}/logs/{}-{}.log'.format(_root_file_path, _py_filename, search_word)
_html_file = '{}/html/{}-{}.html'.format(_root_file_path, _py_filename, search_word)
logging.basicConfig(filename=_log_file, filemode='w', format='%(message)s', level=logging.INFO)
console_handler = logging.StreamHandler()
console_handler.setLevel('INFO')
logging.getLogger().addHandler(console_handler)


def blank(num: int):
    return ''.rjust(num)


def list_all_projects():
    _projects = []
    page = 1
    while True:
        rsp = session.get(
            BASE_HOST + '/api/v4/groups/' + group_id + '/projects',
            headers=HEADERS,
            params={'simple': True, 'per_page': 100, 'page': page}
        )
        print(rsp.url)
        projects_page = rsp.json()
        if len(projects_page) == 0:
            break
        else:
            _projects.extend(projects_page)
            page = page + 1
    return _projects


def search_web(_project_id):
    rsp = session.get(
        BASE_HOST + '/search?utf8=%E2%9C%93&snippets=&scope=&search={}&group_id={}&project_id={}'.format(search_word, group_id, _project_id),
        headers=HEADERS
    )
    # print('search-web:' + rsp.url)
    return {'text': rsp.text, 'web_url': rsp.url}


def bar_charts(_data: list):
    bar = Bar(init_opts=opts.InitOpts(theme=ThemeType.DARK))
    bar.width = '1800px'
    bar.height = '900px'
    bar.add_xaxis(['项目'])
    bar.set_global_opts(
        title_opts=opts.TitleOpts(title="关键字 {} 结果统计图".format(search_word), subtitle=""),
        legend_opts=opts.LegendOpts(orient='vertical'))
    for d in _data:
        bar.add_yaxis(d['项目'], [d['找到数']])
    bar.render(_html_file)


def search_task(projects: list):
    _all_datas = []

    for project in projects:
        _project_id = project['id']

        _last_activity_at = datetime.strptime(project['last_activity_at'], "%Y-%m-%dT%H:%M:%S.%f+08:00")
        _last_activity_at = _last_activity_at.strftime('%Y-%m-%d %H:%M:%S')

        _search_data = {'项目': project['name'], '分支': project['default_branch'], '最后活动时间': _last_activity_at}

        # 搜索结果
        _data_vo = search_web(_project_id)
        soup = BeautifulSoup(_data_vo['text'], 'lxml')
        for body in soup.select('#content-body .row-content-block'):
            logging.info('\n#################################################################################################')
            logging.info(blank(2) + body.text.replace('\n', '\t'))
            logging.info('#################################################################################################\n')

            _search_data['web_url'] = _data_vo['web_url']
            _search_data['找到数'] = soup.select_one('.nav-links .active span').text.replace("\n", "")

            _file_names = []
            for index, result in enumerate(soup.select('.search-results .blob-result')):
                logging.info('----' + str(index + 1) + '----')
                blob = BeautifulSoup(str(result), 'lxml')
                _file_name = blob.select_one('.js-file-title').text.replace('\n', '')
                _file_names.append(_file_name)
                logging.info('文件名:{}'.format(_file_name))
                logging.info(blob.select_one('.blob-content').text.replace('\n\n', '\n'))
            _search_data['文件列表'] = _file_names

            _all_datas.append(_search_data)
    return _all_datas


if __name__ == '__main__':
    projects = list_all_projects()
    logging.info('项目总数 {} 检索关键字 {} 请等待....'.format(len(projects), search_word))

    all_datas = search_task(projects)
    logging.info('\n检索完成, 打印最终结果\n')

    sort_data_list = sorted(all_datas, key=lambda i: int(i["找到数"]))
    for index, data in enumerate(sort_data_list):
        logging.info('序号 {}, {}'.format(str(index + 1), data))

    logging.info('\n生成统计结果\n')
    bar_charts(sort_data_list)

执行结果

Snipaste_2022-08-14_16-15-22.jpg

Snipaste_2022-08-14_16-35-52.jpg

(2)多线程检索

在以上单线程代码中改造,增加批量检索功能和导出图表功能、增加日志导出 .md 文件(导出pdf),增加导出 excel 功能

创建 keyword_list.txt,列出需要查询的多个词

Snipaste_2022-08-14_16-42-06.jpg

#!/usr/bin/python3
# -*- coding: UTF-8 -*-

import os
import sys
import requests
import time
import functools
import threading
from datetime import datetime
from bs4 import BeautifulSoup
from requests_pkcs12 import Pkcs12Adapter
from urllib.parse import unquote
from pyecharts.charts import Bar
from pyecharts import options as opts
from pyecharts.globals import ThemeType
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
from openpyxl import Workbook
from openpyxl.styles import NamedStyle, Font, Alignment, PatternFill, colors


__author__ = 'TingFeng'
__date__ = '2021/12/13'


'''
组分页查询:https://gitlab.co-xxx.com/api/v4/groups/551/projects?simple=True&per_page=100&page=3
项目列表:https://gitlab.co-xxx.com/api/v4/groups/551/projects.json?search=&per_page=50
页面检索:https://gitlab.co-xxx.com/search?utf8=%E2%9C%93&snippets=&scope=&search=elasticsearch&group_id=551&project_id=3492
python 中关于 sys.stdout 的一些用法:https://zhuanlan.zhihu.com/p/377418978
python信号量与线程池: https://blog.csdn.net/qq_41854146/article/details/107512424
'''

# 指定双向认证网站的pfx证书文件,并附带证书的密码,公司gitlab证书定期维护
session = requests.session()
BASE_HOST = 'https://gitlab.co-xxx.com'
HEADERS = {'PRIVATE-TOKEN': 'xxxxxxxxxx'}
session.mount(BASE_HOST, Pkcs12Adapter(pkcs12_filename='cert/client-20220719.p12', pkcs12_password='xxxxxx'))

# 导出excel
_root_file_path = os.path.abspath(".")
_py_filename = os.path.basename(sys.argv[0]).split(".")[0]
wb_result = Workbook()
sheet = wb_result.active

local = threading.local()


def time_me(info='耗时'):
    def _time_me(fn):
        @functools.wraps(fn)
        def _wrapper(*args, **kwargs):
            start = time.perf_counter()
            fn(*args, **kwargs)
            print('{} {} {}'.format(fn.__name__, info, int(time.perf_counter() - start)), ' 秒')

        return _wrapper

    return _time_me


class Logger(object):
    def __init__(self, filename='default.md', stream=sys.stdout):
        self.terminal = stream
        self.log = open(filename, 'w+')

    def write(self, message):
        self.terminal.write(message)
        self.log.write(message)

    def flush(self):
        self.terminal.flush()
        self.log.flush()
        pass


def blank(num: int):
    return ''.rjust(num)


def result_excel_init():
    highlight = NamedStyle(name="highlight")
    highlight.font = Font(name='Arial', size=13, color=colors.BLACK, bold=True)
    highlight.alignment = Alignment(horizontal='center', vertical='center', wrap_text=True)
    highlight.fill = PatternFill("solid", fgColor="ACB9C9")

    sheet.append(['关键字', '项目名称', '文件数', '文件列表', '分支', '最后活动时间', '链接'])

    sheet.row_dimensions[1].height = 25
    sheet.column_dimensions['A'].width = 30
    sheet.column_dimensions['B'].width = 35
    sheet.column_dimensions['C'].width = 10
    sheet.column_dimensions['D'].width = 200
    sheet.column_dimensions['E'].width = 10
    sheet.column_dimensions['F'].width = 25
    sheet.column_dimensions['G'].width = 120

    for cell in list(sheet.rows)[0]:
        cell.style = highlight


def result_excel_end():
    style = NamedStyle(name="style")
    v_center = Alignment(vertical='center', wrap_text=True)
    hv_center = Alignment(horizontal='center', vertical='center', wrap_text=True)
    hv_list = [1, 3, 5, 6]
    for i, row in enumerate(list(sheet.rows)):
        if i == 0:
            continue
        for cell in row:
            if cell.column in hv_list:
                style.alignment = hv_center
            else:
                style.alignment = v_center
            cell.style = style


def list_all_projects():
    _projects = []
    page = 1
    while True:
        rsp = session.get(
            BASE_HOST + '/api/v4/groups/' + group_id + '/projects',
            headers=HEADERS,
            params={'simple': True, 'per_page': 100, 'page': page}
        )
        # print(rsp.url)

        projects_page = rsp.json()
        if len(projects_page) == 0:
            break
        else:
            _projects.extend(projects_page)
            page = page + 1
    return _projects


def search_web(_project_id: str, page=1):
    rsp = session.get(
        BASE_HOST + '/search?search={}&page={}&group_id={}&project_id={}'.format(local.search_word, page, group_id, _project_id),
        headers=HEADERS
    )
    # print(threading.currentThread().getName() + ' url:' + unquote(rsp.url))
    return {'text': rsp.text, 'link': rsp.url}


def bar_charts(_data: list, search_word: str):
    # print('\n生成 {} 统计结果\n'.format(unquote(search_word)))
    bar = Bar(init_opts=opts.InitOpts(theme=ThemeType.DARK))
    bar.width = '1800px'
    bar.height = '900px'
    bar.add_xaxis(['项目'])
    bar.set_global_opts(
        title_opts=opts.TitleOpts(title='关键字 {} 结果统计图'.format(unquote(search_word)), subtitle=''),
        legend_opts=opts.LegendOpts(orient='vertical'))
    for d in _data:
        bar.add_yaxis(d['项目'], [d['文件数']])
    bar.render("html/{}.html".format(search_word))


def select_project_data(project: list, search_word: str):
    local.search_word = search_word

    _project_id = project['id']
    _project_name = project['name']
    _branch = project['default_branch']
    _last_activity_at = datetime.strptime(project['last_activity_at'], '%Y-%m-%dT%H:%M:%S.%f+08:00')
    _last_activity_at = _last_activity_at.strftime('%Y-%m-%d %H:%M:%S')

    # 搜索结果
    _data_vo = search_web(_project_id)
    soup = BeautifulSoup(_data_vo['text'], 'lxml')
    _link = unquote(_data_vo['link'])

    # 过滤不查询的项目
    if _project_id in exclude_project:
        return

    # 计算总页数,默认每页20条
    badge = soup.select_one('.nav-links .active .badge')
    if badge:
        find_count = int(badge.text.replace('\n', ''))
        if find_count == 0:
            return
    else:
        return
    pages = 1 if find_count % 20 == 0 else int(find_count / 20) + 1

    # 输出到第几个文件
    local.line = 1
    local.print_project = False
    local.project_name = _project_name
    local.link = _link
    local.branch = _branch
    local.last_activity_at = _last_activity_at

    # 文件列表
    _file_names = []
    _file_names.extend(get_file_list(soup))

    # 跳过第一次查询,查询下一页
    for page in range(2, pages + 1):
        _data_vo = search_web(_project_id, page)
        soup = BeautifulSoup(_data_vo['text'], 'lxml')
        _file_names.extend(get_file_list(soup))

    # 文件列表为空不返回
    if _file_names:
        # 文件列表去重,然后按字符串长度升序
        _files = sorted(list(set(_file_names)), key=lambda i: len(i))
        _search_data = {
            '关键字': unquote(local.search_word),
            '项目': _project_name,
            '分支': _branch,
            '文件数': len(_files),
            '文件列表': _files,
            '最后活动时间': _last_activity_at,
            'link': _link
        }
        return _search_data


def get_file_list(soup):
    """
    获取代码段中的文件列表
    """
    _file_names = []

    # 代码段
    for d in soup.select('.search-results .blob-result'):
        blob = BeautifulSoup(str(d), 'lxml')
        _file_name = blob.select_one('.js-file-title').text.replace('\n', '')

        # 过滤测试类
        if filter_Tests and 'src/test/' in _file_name:
            continue
        # 过滤排除的文件类型
        _extension = _file_name[_file_name.rfind('.'):]
        if _extension in exclude_file:
            continue
        # 过滤排除的文件名称
        if _file_name[_file_name.rfind('/') + 1:] in exclude_file_name:
            continue

        # 打印一次文件名
        if not local.print_project:
            print('\n#### {}\n'.format(local.project_name))
            print('- 项目名称:{}'.format(local.project_name))
            print('- 默认分支:{}'.format(local.branch))
            print('- 最后活跃时间:{}'.format(local.last_activity_at))
            print('- link:{}\n'.format(local.link))
            local.print_project = True

        print('\n({}) 文件名:{}\n'.format(local.line, _file_name))
        code_snippet = '```{}{}```'.format(_extension[1:], blob.select_one('.blob-content').text.replace('\n\n', '\n'))
        code_snippet = get_code(code_snippet)
        print(code_snippet)
        _file_names.append(_file_name)
        local.line = local.line + 1

    return _file_names


def get_code(code_snippet: str):
    """
    格式化空格
    """
    # 计算空格开头数量的集合
    blanks = []
    for _line in code_snippet.split('\n'):
        if _line.startswith(' '):
            _i = 0
            for s in _line:
                if s.strip() == '':
                    _i = _i + 1
                else:
                    blanks.append(_i)
                    break

    if not blanks:
        return code_snippet

    # 计算最小空格数量
    min_no = min(blanks)
    # 从新生成字符串
    return '\n'.join([i.replace(blank(min_no), '', 1) for i in code_snippet.split('\n')])


def search_task(projects: list, search_word: str):
    # 设置输出的文件及设置
    sys.stdout = Logger(filename='logs/{}.md'.format(search_word))
    print('\n### {} \n'.format(unquote(search_word)))
    # print('\n关键字 {} 开始查询...\n'.format(unquote(search_word)))

    # 创建线程池
    all_task = []
    pool = ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix='search-project-exec')
    for project in projects:
        all_task.append(pool.submit(select_project_data, project, search_word))

    # 等待所有完成
    wait(all_task, return_when=ALL_COMPLETED)
    print('\n\n关键字 {} 查询完成...'.format(unquote(search_word)))

    # 打印结果
    _all_datas = list(filter(None, [i.result() for i in all_task]))
    if _all_datas:
        _result = sorted(_all_datas, key=lambda i: int(i['文件数']))
        print('关键字 {} 匹配结果 {}'.format(unquote(search_word), ['{} ({})'.format(d['项目'], d['文件数']) for d in _result]))

        # 生成统计图
        bar_charts(_result, search_word)
        return _result

    sys.stdout = default_log


@time_me()
def main():
    projects = list_all_projects()
    print('项目总数 {}\n'.format(len(projects)))

    with open('keyword_list.txt') as f:
        word_list = f.readlines()

    # 过滤空行,转义斜线内容
    word_list = list(filter(None, [i.replace('\n', '').replace('/', '%2F').replace('{', '%7B').replace('}', '%7D').strip() for i in word_list]))
    dtime = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    word_len = len(word_list)
    print('{} 检索 {} 个关键字'.format(dtime, word_len))

    all_result = []
    for _i, _word in enumerate(word_list):
        # 加入线程池执行
        print('\n第 {} 个关键词 {}'.format((_i + 1), unquote(_word)))
        all_result.append(search_task(projects, _word))

    print('所有关键字检索完成, 打印最终结果\n')

    print('```json')
    for result in list(filter(None, all_result)):
        print('')
        for index, data in enumerate(result):
            print('序号 {}, {}'.format(str(index + 1), data))
    print('```')

    # # 导出excel
    result_excel_init()
    for result in list(filter(None, all_result)):
        for data in result:
            # 文件长度升序,然后在拼接换行符
            files = '\n'.join(data['文件列表'])
            sheet.append([data['关键字'], data['项目'], int(data['文件数']), str(files), data['分支'], data['最后活动时间'], data['link']])
    result_excel_end()

    wb_result.save('excel/all-data-{}.xlsx'.format(datetime.now().strftime("%Y-%m-%d")))
    print('\n执行导出结果完成\n')

    # 删除无结果的文件
    for _word in word_list:
        _path = 'logs/{}.md'.format(_word)
        if os.path.exists(_path):
            with open(_path) as f:
                _txt_list = f.readlines()
            _has = False
            for _txt in [i.replace('\n', '').strip() for i in _txt_list]:
                if '文件名' in _txt:
                    _has = True
                    break
            if not _has:
                print('无结果文件删除\n', _path)
                os.remove(_path)


if __name__ == '__main__':
    group_id = '551'  # gitlab 组
    max_workers = 20  # 最大线程数量
    word_add_left_bracket = True  # 在关键字后面追加左小括号'('
    exclude_project = [2846]  # 过滤项目不查询
    exclude_file = ['.md', '.properties', '.css', '.sql', '.log', '.txt']  # 排除检查的文件类型
    exclude_file_name = ['solrconfig.xml', 'schema.xml', 'elevate.xml', 'managed-schema', 'yarn.lock']  # 排除检查的文件
    filter_Tests = True  # 过滤测试类
    default_log = Logger(filename='logs/{}.md'.format('all-data'), stream=sys.stdout)
    sys.stdout = default_log

    try:
        main()
    except Exception as e:
        print('查询异常:', e)

执行结果

- 日志省略,.md文件如下、报表如下,excel 如下

Snipaste_2022-08-14_16-50-50.jpg

Snipaste_2022-08-14_16-44-45.jpg

Snipaste_2022-08-14_16-54-43.jpg







未经允许请勿转载:程序喵 » Python3 爬虫搜索 gitlab 项目

点  赞 (1) 打  赏
分享到: