Python3 多线程批量拉取 Gitlab 项目代码

背景

可能你会遇到项目需要把你的项目组中所有的项目拉去到本地维护,如果项目不多一个一个手动的 git clone 还好,那如果项目有几百个呢?一个个 clone 本地可能就到下班时间了,第二天需要把这几百个项目在做 git pull 拉新代码,是不是很绝望?!

所以一个脚本辅助能帮到我们很多。

实现方式

第一步:现将组下所有项目输出到Excel文件中

Snipaste_2022-06-30_09-56-17.jpg

#!/usr/bin/python3
# -*- coding: UTF-8 -*-

import os
import sys
import requests
import time
import functools
from requests_pkcs12 import Pkcs12Adapter
from openpyxl import Workbook
from openpyxl.styles import NamedStyle, Font, Alignment, PatternFill, colors

# 指定双向认证网站的pfx证书文件,并附带证书的密码
session = requests.session()
BASE_HOST = 'https://gitlab.co-xxx.com'
HEADERS = {'PRIVATE-TOKEN': 'xxxxxx'}  # gitlab私人token
session.mount(BASE_HOST, Pkcs12Adapter(pkcs12_filename='cert/client-xxxxxx.p12', pkcs12_password='xxxxxx'))       # 带有证书的方式访问公司内部项目

# 导出excel
_root_file_path = os.path.abspath(".")
_py_filename = os.path.basename(sys.argv[0]).split(".")[0]
wb_result = Workbook()
sheet = wb_result.active


def time_me(info='耗时'):
    def _time_me(fn):
        @functools.wraps(fn)
        def _wrapper(*args, **kwargs):
            start = time.perf_counter()
            fn(*args, **kwargs)
            print('{} {} {}'.format(fn.__name__, info, int(time.perf_counter() - start)), ' 秒')
        return _wrapper
    return _time_me


def result_excel_init():
    highlight = NamedStyle(name="highlight")
    highlight.font = Font(name='Arial', size=13, color=colors.BLACK, bold=True)
    highlight.alignment = Alignment(horizontal='center', vertical='center', wrap_text=True)
    highlight.fill = PatternFill("solid", fgColor="ACB9C9")

    sheet.append(['关键字', '项目名称', '文件数', '文件列表', '分支', '最后活动时间', '链接'])

    sheet.row_dimensions[1].height = 25
    sheet.column_dimensions['A'].width = 30
    sheet.column_dimensions['B'].width = 35
    sheet.column_dimensions['C'].width = 10
    sheet.column_dimensions['D'].width = 200
    sheet.column_dimensions['E'].width = 10
    sheet.column_dimensions['F'].width = 25
    sheet.column_dimensions['G'].width = 120

    for cell in list(sheet.rows)[0]:
        cell.style = highlight


def result_excel_end():
    style = NamedStyle(name="style")
    v_center = Alignment(vertical='center', wrap_text=True)
    hv_center = Alignment(horizontal='center', vertical='center', wrap_text=True)
    hv_list = [1, 3, 5, 6]
    for i, row in enumerate(list(sheet.rows)):
        if i == 0:
            continue
        for cell in row:
            if cell.column in hv_list:
                style.alignment = hv_center
            else:
                style.alignment = v_center
            cell.style = style


def list_all_projects():
    _projects = []
    page = 1
    while True:
        rsp = session.get(
            BASE_HOST + '/api/v4/groups/' + group_id + '/projects',
            headers=HEADERS,
            params={'simple': True, 'per_page': 100, 'page': page}
        )
        projects_page = rsp.json()
        if len(projects_page) == 0:
            break
        else:
            _projects.extend(projects_page)
            page = page + 1
    return _projects


def result_excel_init():
    highlight = NamedStyle(name="highlight")
    highlight.font = Font(name='Arial', size=13, color=colors.BLACK, bold=True)
    highlight.alignment = Alignment(horizontal='center', vertical='center', wrap_text=True)
    highlight.fill = PatternFill("solid", fgColor="ACB9C9")

    sheet.append(['id', 'name', 'path_with_namespace', 'description', 'ssh_url_to_repo', 'default_branch', 'created_at', 'last_activity_at'])

    sheet.row_dimensions[1].height = 25
    sheet.column_dimensions['A'].width = 10
    sheet.column_dimensions['B'].width = 20
    sheet.column_dimensions['C'].width = 20
    sheet.column_dimensions['D'].width = 100
    sheet.column_dimensions['E'].width = 100
    sheet.column_dimensions['F'].width = 20
    sheet.column_dimensions['G'].width = 25
    sheet.column_dimensions['H'].width = 25

    for cell in list(sheet.rows)[0]:
        cell.style = highlight


@time_me()
def main():
    try:
        projects = list_all_projects()
        result_excel_init()
        print('项目总数 {}\n'.format(len(projects)))
        for project in projects:
            print(project)
            sheet.append([project['id'], project['name'], project['path_with_namespace'], project['description'], project['ssh_url_to_repo'], project['default_branch'], project['created_at'], project['last_activity_at']])
        wb_result.save('excel/projects-info-{}.xlsx'.format(group_id))
        print('\n执行导出结果完成\n')
    except Exception as e:
        print('查询异常:', e)


if __name__ == '__main__':
    group_id = '551'  # 项目组id
    main()

第二步:读取文件,批量拉取

#!/usr/bin/python3
# -*- coding: UTF-8 -*-

import os
import os.path
import sys
import requests
import time
import functools
import threading
import logging
from requests_pkcs12 import Pkcs12Adapter
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED

session = requests.session()
BASE_HOST = 'https://gitlab.co-xxxxxx.com'
HEADERS = {'PRIVATE-TOKEN': 'xxxxxxx'}
session.mount(BASE_HOST, Pkcs12Adapter(pkcs12_filename='cert/client-xxxxxx.p12', pkcs12_password='xxxxxx'))


def time_me(info='耗时'):
    def _time_me(fn):
        @functools.wraps(fn)
        def _wrapper(*args, **kwargs):
            start = time.perf_counter()
            fn(*args, **kwargs)
            print('{} {} {}'.format(fn.__name__, info, int(time.perf_counter() - start)), ' 秒')
        return _wrapper
    return _time_me


def batch_clone(name: str, namespace: str, default_branch: str, ssh_pash: str):
    try:
        # 查看目录是否存在
        _path = base_code_path + "/" + name
        if not os.path.exists(_path):
            print('{} 开始拉取 [{}]'.format(threading.currentThread().getName(), namespace))
            command = 'git clone -b {} {}'.format(default_branch, ssh_pash)
            f = os.popen(command)
            # print(f.readline())
            print(f.read())
            print('{} 拉取 [{}]完毕'.format(threading.currentThread().getName(), namespace))
        else:
            pass
            # print('目录存在', _path)
        return namespace
    except Exception as e:
        print('\nclone 异常 {},{} \n'.format(namespace, ssh_pash), e)


@time_me()
def main():
    # 创建线程池
    all_task = []
    pool = ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix='git-clone-exec')

    # 多线程下载
    for i, row in enumerate(ws_projects.rows):
        if i == 0:
            continue
        if not row[0].value:
            break

        os.chdir(base_code_path)

        _pid = str(row[0].value).strip()
        _name = str(row[1].value).strip()
        _namespace = str(row[2].value).strip()
        _ssh_url = str(row[4].value).strip()
        _default_branch = str(row[5].value).strip()

        if int(_pid) not in excluded_project:
            time.sleep(0.05)
            all_task.append(pool.submit(batch_clone, _name, _namespace, _default_branch, _ssh_url))

    # 等待所有完成
    wait(all_task, return_when=ALL_COMPLETED)
    print("\n------所有项目拉取完毕--------")
    print([i.result() for i in all_task])


if __name__ == '__main__':
    # 加载文件
    wb = openpyxl.load_workbook(project_excel_file)
    ws_projects = wb['Sheet']

    # 操作目录
    base_code_path = '/Users/liurenkui/workSpace/Java/all-code'
    os.chdir(base_code_path)

    # 排除不下载的项目id
    excluded_project = []

    # 最大线程数量
    max_workers = 20

    # 执行主方法
    main()


未经允许请勿转载:程序喵 » Python3 多线程批量拉取 Gitlab 项目代码

点  赞 (1) 打  赏
分享到: