一、背景
作为 java 工程师,少不了配置 maven 依赖,有依赖必然会在本地配置仓库,默认的目录地址为 ~/.m2/repository。
这本身没有什么问题,但随着项目的逐步增多、版本的逐步升级迭代(按每周稳步迭代计算,一年将会产生 52 个版本的 jar 包),仓库目录的占用空间将不断膨胀,甚至能达到 10G 以上,而往往很多的依赖版本已经废弃很久,未来也可能用不上。设想一下,你的项目都统一用 spring-core 5.x 版本了,你的仓库里还存了不少 spring-core 4.x/3.x/2.x 版本的 jar 包……从另一个角度讲,对于一台 250G 的 mac 电脑来说,这也算是磁盘危机了。
二、优化方案
1、实现方式:用两种方式来实现
对于外部依赖,用目录的创建时间来判断,比如删除掉一年以前创建的目录;
对于公司自己项目,已知版本号,用目录上的版本号做限制,小于这个版本的数据直接删除目录;
2、语言选择:使用 python 脚本,代码简洁易懂
迭代指定的目录路径,遇到版本号的目录停止迭代,判断时间/判断版本号
符合条件,执行删除
三、代码实现
基于版本号删除
递归文件路径
符合版本号的条件,加入 list 集合
迭代 list 集合,放入多线程删除
同时删除 maven-metadata-local.xml 文件,此文件本地打包时会自动创建
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
"""Remove outdated version directories from a local Maven repository.

The tree below ``m2_path`` is walked.  Every directory whose name ends with
``dir_suffix`` is treated as a version directory and is queued for deletion
when its version is lower than ``minVersion``.  Every file named ``del_file``
found outside those version directories is queued as well.  Queued entries
are then deleted permanently by a thread pool (nothing goes to the trash).

``m2_path``, ``max_workers``, ``minVersion``, ``dir_suffix`` and ``del_file``
are module globals assigned in the ``__main__`` section at the bottom.
"""
import os
import sys
import time
import functools
import threading
import logging
import shutil
from os.path import join, getsize
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED

__author__ = 'TingFeng'
__date__ = '2022/08/13'

_root_file_path = os.path.abspath(".")
_py_filename = os.path.basename(sys.argv[0]).split(".")[0]
_log_file = '{}/logs/{}.log'.format(_root_file_path, _py_filename)
try:
    # basicConfig opens the log file right away, so ./logs must exist first.
    os.makedirs(os.path.dirname(_log_file), exist_ok=True)
    logging.basicConfig(filename=_log_file, filemode='w', format='%(message)s', level=logging.INFO)
except OSError:
    # No writable log directory: keep running with console output only.
    logging.getLogger().setLevel(logging.INFO)
console_handler = logging.StreamHandler()
console_handler.setLevel('INFO')
logging.getLogger().addHandler(console_handler)

# Every directory path / file path that is going to be deleted.
paths = []
file_paths = []


def compare_version(v1=None, v2=None, split_flag="."):
    """Compare two version strings section by section, numerically.

    ``None`` is treated like an empty string.  When all shared sections are
    equal, the version that still has sections left is the greater one
    (``"1.0"`` is greater than ``"1"``).

    :param v1: first version, e.g. ``"4.89"``
    :param v2: second version
    :param split_flag: separator between the sections
    :return: ``0`` if both are equal, ``1`` if ``v1`` is greater,
             ``2`` if ``v2`` is greater
    :raises ValueError: if a compared section is not an integer
    """
    left = v1 or ""
    right = v2 or ""
    while True:
        if left == right:
            return 0
        if not left:
            return 2
        if not right:
            return 1
        left_head, _, left = left.partition(split_flag)
        right_head, _, right = right.partition(split_flag)
        left_number = int(left_head)
        right_number = int(right_head)
        if left_number > right_number:
            return 1
        if left_number < right_number:
            return 2


def get_dir_size(path):
    """Return the total size in bytes of all files below ``path``.

    Subdirectories are included recursively.  Files that cannot be stat'ed
    are skipped; a missing ``path`` yields ``0``.
    """
    total_size = 0
    for current_dir, _dir_names, file_names in os.walk(path):
        for file_name in file_names:
            try:
                total_size += os.path.getsize(os.path.join(current_dir, file_name))
            except OSError:
                # e.g. a dangling symlink; it occupies no measurable space here
                continue
    return total_size


def get_dir(path):
    """Collect deletion candidates below ``path`` into ``paths`` / ``file_paths``.

    Directories ending with ``dir_suffix`` are not descended into; they are
    appended to ``paths`` when their version is lower than ``minVersion``.
    Files named ``del_file`` are appended to ``file_paths``.
    """
    try:
        entries = os.listdir(path)
    except PermissionError:
        # Only this directory is skipped; the caller keeps scanning its siblings.
        logging.warning('无权限访问,跳过 {}'.format(path))
        return
    for name in entries:
        full_path = os.path.join(path, name)
        if os.path.isdir(full_path):
            if not full_path.endswith(dir_suffix):
                get_dir(full_path)
                continue
            version = name.replace(dir_suffix, '')
            try:
                is_outdated = compare_version(version, minVersion) == 2
            except ValueError:
                # A version we cannot parse is kept rather than deleted blindly.
                logging.warning('无法解析版本号,跳过 {}'.format(full_path))
                continue
            if is_outdated:
                paths.append(full_path)
        elif name == del_file:
            file_paths.append(full_path)


def rm_target(_path: str):
    """Permanently delete the file or directory tree at ``_path``.

    A missing path is not an error.  Failures are logged, never raised, so a
    single bad entry does not stop the other workers.
    """
    # Only used for the log line, which keeps the format of earlier runs;
    # the deletion itself no longer goes through a shell.
    command = 'rm -rf {}'.format(_path)
    try:
        if os.path.isdir(_path) and not os.path.islink(_path):
            shutil.rmtree(_path)
        elif os.path.lexists(_path):
            os.remove(_path)
        logging.info('{} [{}] 完毕'.format(threading.current_thread().name, command))
    except Exception:
        logging.exception('执行异常,{}'.format(_path))


def time_me(info='耗时'):
    """Build a decorator that prints the wall-clock seconds a call took.

    :param info: label printed between the function name and the duration
    :return: a decorator; the decorated function returns what the original returns
    """
    def _time_me(fn):
        @functools.wraps(fn)
        def _wrapper(*args, **kwargs):
            start = time.perf_counter()
            result = fn(*args, **kwargs)
            print('{} {} {}'.format(fn.__name__, info, int(time.perf_counter() - start)), ' 秒')
            return result

        return _wrapper

    return _time_me


def _format_size(num_bytes):
    """Convert a byte count into a ``(value, unit)`` pair in MB or GB (decimal units)."""
    megabytes = num_bytes / 1000 / 1000
    if megabytes >= 1000:
        return round(megabytes / 1000, 2), "GB"
    return round(megabytes, 2), "MB"


@time_me()
def main():
    """Scan ``m2_path``, delete every collected entry and log a summary."""
    # Collect the candidates.
    get_dir(m2_path)

    all_task = []
    sum_size = 0
    with ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix='code-read-exec') as pool:
        # Version directories: measure first, then delete.
        for p in paths:
            size = get_dir_size(p)
            sum_size += size
            logging.info('{} {}'.format(p, size))
            all_task.append(pool.submit(rm_target, p))

        # Single files.
        for p in file_paths:
            logging.info(p)
            all_task.append(pool.submit(rm_target, p))

        # Wait until every deletion has finished.
        wait(all_task, return_when=ALL_COMPLETED)

    logging.info("\n------所有项目处理完毕--------")
    amount, unit = _format_size(sum_size)
    logging.info('共 {} 个项目, 约占用磁盘大小 {} {}'.format(len(paths), amount, unit))


if __name__ == '__main__':
    m2_path = '/Users/liurenkui/.m2/repository/com/tingfeng'
    # Maximum number of worker threads
    max_workers = 20
    # Minimum version to keep; version directories below it are deleted
    minVersion = '4.89'
    dir_suffix = '-SNAPSHOT'
    del_file = 'maven-metadata-local.xml'
    main()
基于时间删除
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
"""Remove old version directories from a local Maven repository by age.

The tree below ``m2_path`` is walked.  A directory whose name starts with a
version number is treated as a version directory; it is queued for deletion
when its creation timestamp is earlier than ``min_time_clean``.  All other
directories are descended into.  Queued directories are then deleted
permanently by a thread pool (nothing goes to the trash).

``m2_path``, ``max_workers``, ``min_time_clean`` and ``move_to_trash`` are
module globals assigned in the ``__main__`` section at the bottom.
"""
import os
import sys
import time
import functools
import threading
import logging
import re
import shutil
from os.path import join, getsize
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED

__author__ = 'TingFeng'
__date__ = '2022/08/13'

_root_file_path = os.path.abspath(".")
_py_filename = os.path.basename(sys.argv[0]).split(".")[0]
_log_file = '{}/logs/{}.log'.format(_root_file_path, _py_filename)
try:
    # basicConfig opens the log file right away, so ./logs must exist first.
    os.makedirs(os.path.dirname(_log_file), exist_ok=True)
    logging.basicConfig(filename=_log_file, filemode='w', format='%(message)s', level=logging.INFO)
except OSError:
    # No writable log directory: keep running with console output only.
    logging.getLogger().setLevel(logging.INFO)
console_handler = logging.StreamHandler()
console_handler.setLevel('INFO')
logging.getLogger().addHandler(console_handler)

# Every directory path that is going to be deleted.
paths = []

# A version directory name starts with a digit ("1.2.3", "2.0-SNAPSHOT").
# Anchoring at the start keeps artifact names that merely contain a digit
# ("log4j", "commons-lang3") from being mistaken for versions.
_VERSION_PREFIX = re.compile(r'(\d[\d.]*)')


def get_dir_size(path):
    """Return the total size in bytes of all files below ``path``.

    Subdirectories are included recursively.  Files that cannot be stat'ed
    are skipped; a missing ``path`` yields ``0``.
    """
    total_size = 0
    for current_dir, _dir_names, file_names in os.walk(path):
        for file_name in file_names:
            try:
                total_size += os.path.getsize(os.path.join(current_dir, file_name))
            except OSError:
                # e.g. a dangling symlink; it occupies no measurable space here
                continue
    return total_size


def get_dir(path):
    """Collect version directories older than ``min_time_clean`` into ``paths``.

    Version directories are never descended into; every other directory is
    scanned recursively.
    """
    try:
        entries = os.listdir(path)
    except PermissionError:
        # Only this directory is skipped; the caller keeps scanning its siblings.
        logging.warning('无权限访问,跳过 {}'.format(path))
        return
    for name in entries:
        full_path = os.path.join(path, name)
        if not os.path.isdir(full_path):
            continue
        if get_version(name):
            # Both sides are 'YYYY-MM-DD...' strings, so plain string
            # comparison orders them chronologically.
            if min_time_clean > get_ctime(full_path):
                paths.append(full_path)
        else:
            get_dir(full_path)


def rm_target(_path: str):
    """Permanently delete the file or directory tree at ``_path``.

    When ``move_to_trash`` is set nothing is deleted, because moving to the
    trash is not implemented.  A missing path is not an error.  Failures are
    logged, never raised, so one bad entry does not stop the other workers.
    """
    try:
        if move_to_trash:
            logging.warning('回收站功能暂未实现,跳过 {}'.format(_path))
            return
        # Only used for the log line, which keeps the format of earlier runs;
        # the deletion itself no longer goes through a shell.
        command = 'rm -rf {}'.format(_path)
        if os.path.isdir(_path) and not os.path.islink(_path):
            shutil.rmtree(_path)
        elif os.path.lexists(_path):
            os.remove(_path)
        logging.info('{} [{}] 完毕'.format(threading.current_thread().name, command))
    except Exception:
        logging.exception('执行异常,{}'.format(_path))


def time_me(info='耗时'):
    """Build a decorator that prints the wall-clock seconds a call took.

    :param info: label printed between the function name and the duration
    :return: a decorator; the decorated function returns what the original returns
    """
    def _time_me(fn):
        @functools.wraps(fn)
        def _wrapper(*args, **kwargs):
            start = time.perf_counter()
            result = fn(*args, **kwargs)
            print('{} {} {}'.format(fn.__name__, info, int(time.perf_counter() - start)), ' 秒')
            return result

        return _wrapper

    return _time_me


def get_version(str):
    """Return the leading run of digits and dots of a name, or ``None``.

    The parameter keeps its original name for callers that pass it by keyword.

    :param str: a directory name such as ``"1.2.3-SNAPSHOT"``
    :return: e.g. ``"1.2.3"``; ``None`` when the name does not start with a digit
    """
    found = _VERSION_PREFIX.match(str)
    if found:
        return found.group(1)
    return None


def get_ctime(str):
    """Return the creation time of a path as ``'YYYY-MM-DD HH:MM:SS'`` (local time).

    ``st_birthtime`` is used where the platform provides it; elsewhere the
    function falls back to ``st_ctime``.

    :param str: path to inspect (name kept for keyword callers)
    """
    info = os.stat(str)
    created = getattr(info, 'st_birthtime', info.st_ctime)
    return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(created))


def _format_size(num_bytes):
    """Convert a byte count into a ``(value, unit)`` pair in MB or GB (decimal units)."""
    megabytes = num_bytes / 1000 / 1000
    if megabytes >= 1000:
        return round(megabytes / 1000, 2), "GB"
    return round(megabytes, 2), "MB"


@time_me()
def main():
    """Scan ``m2_path``, delete every collected directory and log a summary."""
    # Collect the candidates.
    get_dir(m2_path)

    all_task = []
    sum_size = 0
    with ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix='code-read-exec') as pool:
        # Measure each directory first, then delete it.
        for p in paths:
            size = get_dir_size(p)
            sum_size += size
            logging.info('{} | {}'.format(p, size))
            all_task.append(pool.submit(rm_target, p))

        # Wait until every deletion has finished.
        wait(all_task, return_when=ALL_COMPLETED)

    logging.info("\n------所有项目处理完毕--------")
    amount, unit = _format_size(sum_size)
    logging.info('共 {} 个项目, 约占用磁盘大小 {} {}'.format(len(paths), amount, unit))


if __name__ == '__main__':
    m2_path = '/Users/liurenkui/.m2/repository'
    # Maximum number of worker threads
    max_workers = 20
    # Version directories created before this date are deleted
    min_time_clean = '2022-01-31'
    # Whether to go through the trash; not implemented yet (the send2trash
    # module failed on the author's machine), so True deletes nothing
    move_to_trash = False
    main()
四、风险点说明
以上实现方案中,会直接永久删除(相当于 rm -rf),不经过回收站,所以删除前请务必确认!!!
python 有第三方的 send2trash 模块,提供放入回收站的功能,但本人在 macos 系统中用 python 3.7 和 python 3.8 测试,均出现 No module named 'Foundation' 异常,有朋友解决的话还请告知方案,谢谢
附异常issues
https://github.com/arsenetar/send2trash/issues/24
五、优化体验
由原本的 10G+ 成功瘦身到 3G+,心情舒畅
未经允许请勿转载:程序喵 » Python3 清理 .m2/repository 目录中废弃的 maven 依赖包