#!/usr/bin/env python3 """ PDF 批量处理脚本 支持多种处理器,配置文件驱动 """ import os import sys import argparse import subprocess import json import yaml from pathlib import Path from datetime import datetime from typing import List, Dict, Optional, Any from dataclasses import dataclass, field import logging from tqdm import tqdm import time # ============================================================================ # 数据类定义 # ============================================================================ @dataclass class ProcessorConfig: """处理器配置""" name: str script: str input_arg: str = "--input_file" output_arg: str = "--output_dir" extra_args: List[str] = field(default_factory=list) output_subdir: str = "results" # 新增:每个处理器独立的输出目录 description: str = "" @dataclass class ProcessResult: """处理结果""" pdf_file: str success: bool duration: float error_message: str = "" # ============================================================================ # 配置管理 # ============================================================================ class ConfigManager: """配置管理器""" DEFAULT_CONFIG = { 'processors': { 'paddleocr_vl_single_process': { 'script': 'paddleocr_vl_single_process.py', 'input_arg': '--input_file', 'output_arg': '--output_dir', 'extra_args': [ '--pipeline=./my_config/PaddleOCR-VL-Client.yaml', '--no-adapter' ], 'output_subdir': 'paddleocr_vl_results', 'description': 'PaddleOCR-VL 处理器' }, 'ppstructurev3_single_process': { 'script': 'ppstructurev3_single_process.py', 'input_arg': '--input_file', 'output_arg': '--output_dir', 'extra_args': [ '--pipeline=./my_config/PP-StructureV3.yaml' ], 'output_subdir': 'ppstructurev3_results', 'description': 'PP-StructureV3 处理器' }, 'ppstructurev3_single_client': { 'script': 'ppstructurev3_single_client.py', 'input_arg': '--input_file', 'output_arg': '--output_dir', 'extra_args': [ '--api_url=http://10.192.72.11:8111/layout-parsing', '--timeout=300' ], 'output_subdir': 'ppstructurev3_client_results', 'description': 'PP-StructureV3 HTTP API 客户端' } }, 'global': { 'base_dir': '/Users/zhch158/workspace/data/流水分析', 'output_subdir': 'results' } } def __init__(self, config_file: Optional[str] = None): self.config_file = config_file self.config = self._load_config() def _load_config(self) -> Dict: """加载配置文件""" if self.config_file and Path(self.config_file).exists(): with open(self.config_file, 'r', encoding='utf-8') as f: if self.config_file.endswith('.yaml') or self.config_file.endswith('.yml'): return yaml.safe_load(f) else: return json.load(f) return self.DEFAULT_CONFIG.copy() def get_processor_config(self, processor_name: str) -> ProcessorConfig: """获取处理器配置""" if processor_name not in self.config['processors']: raise ValueError(f"处理器 '{processor_name}' 不存在") proc_config = self.config['processors'][processor_name] return ProcessorConfig( name=processor_name, script=proc_config['script'], input_arg=proc_config.get('input_arg', '--input_file'), output_arg=proc_config.get('output_arg', '--output_dir'), extra_args=proc_config.get('extra_args', []), output_subdir=proc_config.get('output_subdir', processor_name + '_results'), description=proc_config.get('description', '') ) def get_global_config(self, key: str, default=None): """获取全局配置""" return self.config.get('global', {}).get(key, default) def list_processors(self) -> List[str]: """列出所有可用的处理器""" return list(self.config['processors'].keys()) # ============================================================================ # PDF 文件查找器 # ============================================================================ class PDFFileFinder: """PDF 文件查找器""" def __init__(self, base_dir: str): self.base_dir = Path(base_dir) def from_file_list(self, list_file: str) -> List[Path]: """从文件列表读取""" pdf_files = [] with open(list_file, 'r', encoding='utf-8') as f: for line in f: # 跳过空行和注释 line = line.strip() if not line or line.startswith('#'): continue # 构建完整路径 pdf_path = self._resolve_path(line) if pdf_path: pdf_files.append(pdf_path) return pdf_files def from_list(self, pdf_list: List[str]) -> List[Path]: """从列表读取""" pdf_files = [] for pdf in pdf_list: pdf_path = self._resolve_path(pdf.strip()) if pdf_path: pdf_files.append(pdf_path) return pdf_files def find_all(self) -> List[Path]: """查找基础目录下所有 PDF""" return sorted(self.base_dir.rglob('*.pdf')) def _resolve_path(self, path_str: str) -> Optional[Path]: """解析路径""" path = Path(path_str) # 绝对路径 if path.is_absolute(): return path if path.exists() else path # 返回路径,即使不存在 # 相对路径 # 1. 尝试完整相对路径 candidate1 = self.base_dir / path if candidate1.exists(): return candidate1 # 2. 尝试在同名子目录下查找 if '/' not in path_str: pdf_name = path.stem candidate2 = self.base_dir / pdf_name / path.name if candidate2.exists(): return candidate2 # 3. 使用 glob 搜索 matches = list(self.base_dir.rglob(path.name)) if matches: return matches[0] # 返回候选路径(即使不存在) return candidate1 # ============================================================================ # PDF 批处理器 # ============================================================================ class PDFBatchProcessor: """PDF 批处理器""" def __init__( self, processor_config: ProcessorConfig, output_subdir: Optional[str] = None, dry_run: bool = False ): self.processor_config = processor_config # 如果指定了output_subdir,使用指定的;否则使用处理器配置中的 self.output_subdir = output_subdir or processor_config.output_subdir self.dry_run = dry_run # 设置日志 self.logger = self._setup_logger() # 统计信息 self.results: List[ProcessResult] = [] def _setup_logger(self) -> logging.Logger: """设置日志""" logger = logging.getLogger('PDFBatchProcessor') logger.setLevel(logging.INFO) # 避免重复添加handler if not logger.handlers: # 控制台输出 console_handler = logging.StreamHandler() console_handler.setLevel(logging.INFO) console_format = logging.Formatter( '%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) console_handler.setFormatter(console_format) logger.addHandler(console_handler) return logger def process_files(self, pdf_files: List[Path]) -> Dict[str, Any]: """批量处理文件""" self.logger.info(f"开始处理 {len(pdf_files)} 个文件") self.logger.info(f"处理器: {self.processor_config.description}") self.logger.info(f"脚本: {self.processor_config.script}") self.logger.info(f"输出目录: {self.output_subdir}") start_time = time.time() # 使用进度条 with tqdm(total=len(pdf_files), desc="处理进度", unit="file") as pbar: for pdf_file in pdf_files: result = self._process_single_file(pdf_file) self.results.append(result) pbar.update(1) # 更新进度条描述 success_count = sum(1 for r in self.results if r.success) pbar.set_postfix({ 'success': success_count, 'failed': len(self.results) - success_count }) total_duration = time.time() - start_time # 生成统计信息 stats = self._generate_stats(total_duration) # 保存日志 self._save_log(stats) return stats def _process_single_file(self, pdf_file: Path) -> ProcessResult: """处理单个文件""" self.logger.info(f"处理: {pdf_file}") # 检查文件是否存在 if not pdf_file.exists(): self.logger.warning(f"跳过: 文件不存在 - {pdf_file}") return ProcessResult( pdf_file=str(pdf_file), success=False, duration=0, error_message="文件不存在" ) # 确定输出目录 output_dir = pdf_file.parent / pdf_file.stem / self.output_subdir # if not self.dry_run: # output_dir.mkdir(parents=True, exist_ok=True) # 构建命令 cmd = self._build_command(pdf_file, output_dir) self.logger.debug(f"执行命令: {' '.join(cmd)}") if self.dry_run: self.logger.info(f"[DRY RUN] 将执行: {' '.join(cmd)}") return ProcessResult( pdf_file=str(pdf_file), success=True, duration=0, error_message="" ) # 执行命令 start_time = time.time() try: result = subprocess.run( cmd, capture_output=True, text=True, check=True ) duration = time.time() - start_time self.logger.info(f"✓ 成功 (耗时: {duration:.2f}秒)") return ProcessResult( pdf_file=str(pdf_file), success=True, duration=duration, error_message="" ) except subprocess.CalledProcessError as e: duration = time.time() - start_time error_msg = e.stderr if e.stderr else str(e) self.logger.error(f"✗ 失败 (耗时: {duration:.2f}秒)") self.logger.error(f"错误信息: {error_msg}") return ProcessResult( pdf_file=str(pdf_file), success=False, duration=duration, error_message=error_msg ) def _build_command(self, pdf_file: Path, output_dir: Path) -> List[str]: """构建执行命令""" cmd = [ sys.executable, # 使用当前 Python 解释器 self.processor_config.script, self.processor_config.input_arg, str(pdf_file), self.processor_config.output_arg, str(output_dir) ] # 添加额外参数 cmd.extend(self.processor_config.extra_args) return cmd def _generate_stats(self, total_duration: float) -> Dict[str, Any]: """生成统计信息""" success_count = sum(1 for r in self.results if r.success) failed_count = len(self.results) - success_count failed_files = [r.pdf_file for r in self.results if not r.success] stats = { 'total': len(self.results), 'success': success_count, 'failed': failed_count, 'total_duration': total_duration, 'failed_files': failed_files, 'results': [ { 'file': r.pdf_file, 'success': r.success, 'duration': r.duration, 'error': r.error_message } for r in self.results ] } return stats def _save_log(self, stats: Dict[str, Any]): """保存日志""" timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') log_file = f"batch_process_{self.processor_config.name}_{timestamp}.log" with open(log_file, 'w', encoding='utf-8') as f: f.write("PDF 批量处理日志\n") f.write("=" * 80 + "\n\n") f.write(f"处理器: {self.processor_config.description}\n") f.write(f"处理器名称: {self.processor_config.name}\n") f.write(f"脚本: {self.processor_config.script}\n") f.write(f"输出目录: {self.output_subdir}\n") f.write(f"开始时间: {datetime.now()}\n") f.write(f"总耗时: {stats['total_duration']:.2f} 秒\n\n") f.write("统计信息:\n") f.write(f" 总文件数: {stats['total']}\n") f.write(f" 成功: {stats['success']}\n") f.write(f" 失败: {stats['failed']}\n\n") if stats['failed_files']: f.write("失败的文件:\n") for file in stats['failed_files']: f.write(f" - {file}\n") f.write("\n") f.write("详细结果:\n") for result in stats['results']: status = "✓" if result['success'] else "✗" f.write(f"{status} {result['file']} ({result['duration']:.2f}s)\n") if result['error']: f.write(f" 错误: {result['error']}\n") self.logger.info(f"日志已保存: {log_file}") # ============================================================================ # 命令行接口 # ============================================================================ def create_parser() -> argparse.ArgumentParser: """创建命令行参数解析器""" parser = argparse.ArgumentParser( description='PDF 批量处理工具', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 示例用法: 1. 使用配置文件中的处理器: python batch_process_pdf.py -p paddleocr_vl_single_process -f pdf_list.txt 2. 处理指定目录下所有 PDF: python batch_process_pdf.py -p ppstructurev3_single_client -d /path/to/pdfs 3. 手动指定脚本和参数: python batch_process_pdf.py \\ -s ppstructurev3_single_client.py \\ -d /path/to/pdfs \\ -f pdf_list.txt \\ -e "--api_url=http://localhost:8111 --timeout=600" 4. 列出所有可用的处理器: python batch_process_pdf.py --list-processors 5. 查看配置文件内容: python batch_process_pdf.py --show-config 6. 覆盖默认输出目录: python batch_process_pdf.py -p ppstructurev3_single_process -f pdf_list.txt -o custom_output """ ) # 处理器选择 parser.add_argument( '-p', '--processor', help='处理器名称 (如: paddleocr_vl_single_process, ppstructurev3_single_process, ppstructurev3_single_client)' ) # 配置文件 parser.add_argument( '-c', '--config', default='processor_configs.yaml', help='配置文件路径 (默认: processor_configs.yaml)' ) # 手动指定脚本 parser.add_argument( '-s', '--script', help='Python 脚本路径 (覆盖配置文件)' ) # 目录和文件 parser.add_argument( '-d', '--base-dir', help='PDF 文件基础目录' ) parser.add_argument( '-o', '--output-subdir', help='输出子目录名称 (覆盖处理器默认配置)' ) parser.add_argument( '-f', '--file-list', help='PDF 文件列表文件路径' ) parser.add_argument( '-l', '--pdf-list', nargs='+', help='PDF 文件列表 (空格分隔)' ) # 额外参数 parser.add_argument( '-e', '--extra-args', help='额外参数 (覆盖配置文件)' ) # 工具选项 parser.add_argument( '--list-processors', action='store_true', help='列出所有可用的处理器' ) parser.add_argument( '--show-config', action='store_true', help='显示配置文件内容' ) parser.add_argument( '--dry-run', action='store_true', help='模拟运行,不实际执行' ) parser.add_argument( '-v', '--verbose', action='store_true', help='详细输出' ) return parser def main(): """主函数""" parser = create_parser() args = parser.parse_args() # 设置日志级别 if args.verbose: logging.getLogger().setLevel(logging.DEBUG) # 加载配置 config_manager = ConfigManager(args.config if Path(args.config).exists() else None) # 列出处理器 if args.list_processors: print("可用的处理器:") for name in config_manager.list_processors(): proc_config = config_manager.get_processor_config(name) print(f" • {name}") print(f" 描述: {proc_config.description}") print(f" 脚本: {proc_config.script}") print(f" 输出目录: {proc_config.output_subdir}") print() return 0 # 显示配置 if args.show_config: print(yaml.dump(config_manager.config, allow_unicode=True)) return 0 # 获取处理器配置 if args.processor: processor_config = config_manager.get_processor_config(args.processor) elif args.script: # 手动指定脚本 processor_config = ProcessorConfig( name='manual', script=args.script, extra_args=args.extra_args.split() if args.extra_args else [], output_subdir=args.output_subdir or 'manual_results' ) else: parser.error("必须指定 -p 或 -s 参数") # 覆盖额外参数 if args.extra_args and args.processor: processor_config.extra_args = args.extra_args.split() # 获取基础目录 base_dir = args.base_dir or config_manager.get_global_config('base_dir') if not base_dir: parser.error("必须指定 -d 参数或在配置文件中设置 base_dir") # 查找 PDF 文件 finder = PDFFileFinder(base_dir) if args.file_list: pdf_files = finder.from_file_list(args.file_list) elif args.pdf_list: pdf_files = finder.from_list(args.pdf_list) else: pdf_files = finder.find_all() if not pdf_files: print("❌ 未找到任何 PDF 文件") return 1 # print(f"\n找到 {len(pdf_files)} 个 PDF 文件") valid_file_paths = [f.as_posix() for f in pdf_files if f.exists()] if valid_file_paths: print("\n".join(valid_file_paths)) # 验证文件 valid_files = [f for f in pdf_files if f.exists()] invalid_files = [f for f in pdf_files if not f.exists()] if invalid_files: print(f"\n⚠️ 警告: {len(invalid_files)} 个文件不存在:") for f in invalid_files[:5]: # 只显示前5个 print(f" - {f}") if len(invalid_files) > 5: print(f" ... 还有 {len(invalid_files) - 5} 个") # 确认执行 if not args.dry_run and valid_files: confirm = input(f"\n是否继续处理 {len(valid_files)} 个文件? [Y/n]: ") if confirm.lower() not in ['', 'y', 'yes']: print("已取消") return 0 # 批量处理 processor = PDFBatchProcessor( processor_config=processor_config, output_subdir=args.output_subdir, # 传递命令行指定的输出目录 dry_run=args.dry_run ) stats = processor.process_files(valid_files) # 显示统计信息 print("\n" + "=" * 80) print("处理完成") print("=" * 80) print(f"\n📊 统计信息:") print(f" 处理器: {processor_config.description}") print(f" 输出目录: {processor.output_subdir}") print(f" 总文件数: {stats['total']}") print(f" ✓ 成功: {stats['success']}") print(f" ✗ 失败: {stats['failed']}") print(f" ⏱️ 总耗时: {stats['total_duration']:.2f} 秒") if stats['failed_files']: print(f"\n失败的文件:") for file in stats['failed_files']: print(f" ✗ {file}") return 0 if stats['failed'] == 0 else 1 if __name__ == '__main__': sys.exit(main())