Jelajahi Sumber

feat: 添加 PDF 批量处理脚本,支持多种处理器和配置文件驱动

zhch158_admin 2 minggu lalu
induk
melakukan
83ad1265f4
1 mengubah file dengan 657 tambahan dan 0 penghapusan
  1. 657 0
      zhch/batch_process_pdf.py

+ 657 - 0
zhch/batch_process_pdf.py

@@ -0,0 +1,657 @@
+#!/usr/bin/env python3
+"""
+PDF 批量处理脚本
+支持多种处理器,配置文件驱动
+"""
+
+import os
+import sys
+import argparse
+import subprocess
+import json
+import yaml
+from pathlib import Path
+from datetime import datetime
+from typing import List, Dict, Optional, Any
+from dataclasses import dataclass, field
+import logging
+from tqdm import tqdm
+import time
+
+# ============================================================================
+# 数据类定义
+# ============================================================================
+
+@dataclass
+class ProcessorConfig:
+    """处理器配置"""
+    name: str
+    script: str
+    input_arg: str = "--input_file"
+    output_arg: str = "--output_dir"
+    extra_args: List[str] = field(default_factory=list)
+    output_subdir: str = "results"  # 新增:每个处理器独立的输出目录
+    description: str = ""
+
+
+@dataclass
+class ProcessResult:
+    """处理结果"""
+    pdf_file: str
+    success: bool
+    duration: float
+    error_message: str = ""
+
+
+# ============================================================================
+# 配置管理
+# ============================================================================
+
+class ConfigManager:
+    """配置管理器"""
+    
+    DEFAULT_CONFIG = {
+        'processors': {
+            'paddleocr_vl_single_process': {
+                'script': 'paddleocr_vl_single_process.py',
+                'input_arg': '--input_file',
+                'output_arg': '--output_dir',
+                'extra_args': [
+                    '--pipeline=./my_config/PaddleOCR-VL-Client.yaml',
+                    '--no-adapter'
+                ],
+                'output_subdir': 'paddleocr_vl_results',
+                'description': 'PaddleOCR-VL 处理器'
+            },
+            'ppstructurev3_single_process': {
+                'script': 'ppstructurev3_single_process.py',
+                'input_arg': '--input_file',
+                'output_arg': '--output_dir',
+                'extra_args': [
+                    '--pipeline=./my_config/PP-StructureV3.yaml'
+                ],
+                'output_subdir': 'ppstructurev3_results',
+                'description': 'PP-StructureV3 处理器'
+            },
+            'ppstructurev3_single_client': {
+                'script': 'ppstructurev3_single_client.py',
+                'input_arg': '--input_file',
+                'output_arg': '--output_dir',
+                'extra_args': [
+                    '--api_url=http://10.192.72.11:8111/layout-parsing',
+                    '--timeout=300'
+                ],
+                'output_subdir': 'ppstructurev3_client_results',
+                'description': 'PP-StructureV3 HTTP API 客户端'
+            }
+        },
+        'global': {
+            'base_dir': '/Users/zhch158/workspace/data/流水分析',
+            'output_subdir': 'results'
+        }
+    }
+    
+    def __init__(self, config_file: Optional[str] = None):
+        self.config_file = config_file
+        self.config = self._load_config()
+    
+    def _load_config(self) -> Dict:
+        """加载配置文件"""
+        if self.config_file and Path(self.config_file).exists():
+            with open(self.config_file, 'r', encoding='utf-8') as f:
+                if self.config_file.endswith('.yaml') or self.config_file.endswith('.yml'):
+                    return yaml.safe_load(f)
+                else:
+                    return json.load(f)
+        return self.DEFAULT_CONFIG.copy()
+    
+    def get_processor_config(self, processor_name: str) -> ProcessorConfig:
+        """获取处理器配置"""
+        if processor_name not in self.config['processors']:
+            raise ValueError(f"处理器 '{processor_name}' 不存在")
+        
+        proc_config = self.config['processors'][processor_name]
+        return ProcessorConfig(
+            name=processor_name,
+            script=proc_config['script'],
+            input_arg=proc_config.get('input_arg', '--input_file'),
+            output_arg=proc_config.get('output_arg', '--output_dir'),
+            extra_args=proc_config.get('extra_args', []),
+            output_subdir=proc_config.get('output_subdir', processor_name + '_results'),
+            description=proc_config.get('description', '')
+        )
+    
+    def get_global_config(self, key: str, default=None):
+        """获取全局配置"""
+        return self.config.get('global', {}).get(key, default)
+    
+    def list_processors(self) -> List[str]:
+        """列出所有可用的处理器"""
+        return list(self.config['processors'].keys())
+
+
+# ============================================================================
+# PDF 文件查找器
+# ============================================================================
+
+class PDFFileFinder:
+    """PDF 文件查找器"""
+    
+    def __init__(self, base_dir: str):
+        self.base_dir = Path(base_dir)
+    
+    def from_file_list(self, list_file: str) -> List[Path]:
+        """从文件列表读取"""
+        pdf_files = []
+        
+        with open(list_file, 'r', encoding='utf-8') as f:
+            for line in f:
+                # 跳过空行和注释
+                line = line.strip()
+                if not line or line.startswith('#'):
+                    continue
+                
+                # 构建完整路径
+                pdf_path = self._resolve_path(line)
+                if pdf_path:
+                    pdf_files.append(pdf_path)
+        
+        return pdf_files
+    
+    def from_list(self, pdf_list: List[str]) -> List[Path]:
+        """从列表读取"""
+        pdf_files = []
+        
+        for pdf in pdf_list:
+            pdf_path = self._resolve_path(pdf.strip())
+            if pdf_path:
+                pdf_files.append(pdf_path)
+        
+        return pdf_files
+    
+    def find_all(self) -> List[Path]:
+        """查找基础目录下所有 PDF"""
+        return sorted(self.base_dir.rglob('*.pdf'))
+    
+    def _resolve_path(self, path_str: str) -> Optional[Path]:
+        """解析路径"""
+        path = Path(path_str)
+        
+        # 绝对路径
+        if path.is_absolute():
+            return path if path.exists() else path  # 返回路径,即使不存在
+        
+        # 相对路径
+        # 1. 尝试完整相对路径
+        candidate1 = self.base_dir / path
+        if candidate1.exists():
+            return candidate1
+        
+        # 2. 尝试在同名子目录下查找
+        if '/' not in path_str:
+            pdf_name = path.stem
+            candidate2 = self.base_dir / pdf_name / path.name
+            if candidate2.exists():
+                return candidate2
+        
+        # 3. 使用 glob 搜索
+        matches = list(self.base_dir.rglob(path.name))
+        if matches:
+            return matches[0]
+        
+        # 返回候选路径(即使不存在)
+        return candidate1
+
+
+# ============================================================================
+# PDF 批处理器
+# ============================================================================
+
+class PDFBatchProcessor:
+    """PDF 批处理器"""
+    
+    def __init__(
+        self,
+        processor_config: ProcessorConfig,
+        output_subdir: Optional[str] = None,
+        dry_run: bool = False
+    ):
+        self.processor_config = processor_config
+        # 如果指定了output_subdir,使用指定的;否则使用处理器配置中的
+        self.output_subdir = output_subdir or processor_config.output_subdir
+        self.dry_run = dry_run
+        
+        # 设置日志
+        self.logger = self._setup_logger()
+        
+        # 统计信息
+        self.results: List[ProcessResult] = []
+    
+    def _setup_logger(self) -> logging.Logger:
+        """设置日志"""
+        logger = logging.getLogger('PDFBatchProcessor')
+        logger.setLevel(logging.INFO)
+        
+        # 避免重复添加handler
+        if not logger.handlers:
+            # 控制台输出
+            console_handler = logging.StreamHandler()
+            console_handler.setLevel(logging.INFO)
+            console_format = logging.Formatter(
+                '%(asctime)s - %(levelname)s - %(message)s',
+                datefmt='%Y-%m-%d %H:%M:%S'
+            )
+            console_handler.setFormatter(console_format)
+            logger.addHandler(console_handler)
+        
+        return logger
+    
+    def process_files(self, pdf_files: List[Path]) -> Dict[str, Any]:
+        """批量处理文件"""
+        self.logger.info(f"开始处理 {len(pdf_files)} 个文件")
+        self.logger.info(f"处理器: {self.processor_config.description}")
+        self.logger.info(f"脚本: {self.processor_config.script}")
+        self.logger.info(f"输出目录: {self.output_subdir}")
+        
+        start_time = time.time()
+        
+        # 使用进度条
+        with tqdm(total=len(pdf_files), desc="处理进度", unit="file") as pbar:
+            for pdf_file in pdf_files:
+                result = self._process_single_file(pdf_file)
+                self.results.append(result)
+                pbar.update(1)
+                
+                # 更新进度条描述
+                success_count = sum(1 for r in self.results if r.success)
+                pbar.set_postfix({
+                    'success': success_count,
+                    'failed': len(self.results) - success_count
+                })
+        
+        total_duration = time.time() - start_time
+        
+        # 生成统计信息
+        stats = self._generate_stats(total_duration)
+        
+        # 保存日志
+        self._save_log(stats)
+        
+        return stats
+    
+    def _process_single_file(self, pdf_file: Path) -> ProcessResult:
+        """处理单个文件"""
+        self.logger.info(f"处理: {pdf_file}")
+        
+        # 检查文件是否存在
+        if not pdf_file.exists():
+            self.logger.warning(f"跳过: 文件不存在 - {pdf_file}")
+            return ProcessResult(
+                pdf_file=str(pdf_file),
+                success=False,
+                duration=0,
+                error_message="文件不存在"
+            )
+        
+        # 确定输出目录
+        output_dir = pdf_file.parent / pdf_file.stem / self.output_subdir
+        
+        # if not self.dry_run:
+        #     output_dir.mkdir(parents=True, exist_ok=True)
+        
+        # 构建命令
+        cmd = self._build_command(pdf_file, output_dir)
+        
+        self.logger.debug(f"执行命令: {' '.join(cmd)}")
+        
+        if self.dry_run:
+            self.logger.info(f"[DRY RUN] 将执行: {' '.join(cmd)}")
+            return ProcessResult(
+                pdf_file=str(pdf_file),
+                success=True,
+                duration=0,
+                error_message=""
+            )
+        
+        # 执行命令
+        start_time = time.time()
+        try:
+            result = subprocess.run(
+                cmd,
+                capture_output=True,
+                text=True,
+                check=True
+            )
+            duration = time.time() - start_time
+            
+            self.logger.info(f"✓ 成功 (耗时: {duration:.2f}秒)")
+            
+            return ProcessResult(
+                pdf_file=str(pdf_file),
+                success=True,
+                duration=duration,
+                error_message=""
+            )
+            
+        except subprocess.CalledProcessError as e:
+            duration = time.time() - start_time
+            error_msg = e.stderr if e.stderr else str(e)
+            
+            self.logger.error(f"✗ 失败 (耗时: {duration:.2f}秒)")
+            self.logger.error(f"错误信息: {error_msg}")
+            
+            return ProcessResult(
+                pdf_file=str(pdf_file),
+                success=False,
+                duration=duration,
+                error_message=error_msg
+            )
+    
+    def _build_command(self, pdf_file: Path, output_dir: Path) -> List[str]:
+        """构建执行命令"""
+        cmd = [
+            sys.executable,  # 使用当前 Python 解释器
+            self.processor_config.script,
+            self.processor_config.input_arg, str(pdf_file),
+            self.processor_config.output_arg, str(output_dir)
+        ]
+        
+        # 添加额外参数
+        cmd.extend(self.processor_config.extra_args)
+        
+        return cmd
+    
+    def _generate_stats(self, total_duration: float) -> Dict[str, Any]:
+        """生成统计信息"""
+        success_count = sum(1 for r in self.results if r.success)
+        failed_count = len(self.results) - success_count
+        
+        failed_files = [r.pdf_file for r in self.results if not r.success]
+        
+        stats = {
+            'total': len(self.results),
+            'success': success_count,
+            'failed': failed_count,
+            'total_duration': total_duration,
+            'failed_files': failed_files,
+            'results': [
+                {
+                    'file': r.pdf_file,
+                    'success': r.success,
+                    'duration': r.duration,
+                    'error': r.error_message
+                }
+                for r in self.results
+            ]
+        }
+        
+        return stats
+    
+    def _save_log(self, stats: Dict[str, Any]):
+        """保存日志"""
+        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
+        log_file = f"batch_process_{self.processor_config.name}_{timestamp}.log"
+        
+        with open(log_file, 'w', encoding='utf-8') as f:
+            f.write("PDF 批量处理日志\n")
+            f.write("=" * 80 + "\n\n")
+            
+            f.write(f"处理器: {self.processor_config.description}\n")
+            f.write(f"处理器名称: {self.processor_config.name}\n")
+            f.write(f"脚本: {self.processor_config.script}\n")
+            f.write(f"输出目录: {self.output_subdir}\n")
+            f.write(f"开始时间: {datetime.now()}\n")
+            f.write(f"总耗时: {stats['total_duration']:.2f} 秒\n\n")
+            
+            f.write("统计信息:\n")
+            f.write(f"  总文件数: {stats['total']}\n")
+            f.write(f"  成功: {stats['success']}\n")
+            f.write(f"  失败: {stats['failed']}\n\n")
+            
+            if stats['failed_files']:
+                f.write("失败的文件:\n")
+                for file in stats['failed_files']:
+                    f.write(f"  - {file}\n")
+                f.write("\n")
+            
+            f.write("详细结果:\n")
+            for result in stats['results']:
+                status = "✓" if result['success'] else "✗"
+                f.write(f"{status} {result['file']} ({result['duration']:.2f}s)\n")
+                if result['error']:
+                    f.write(f"   错误: {result['error']}\n")
+        
+        self.logger.info(f"日志已保存: {log_file}")
+
+
+# ============================================================================
+# 命令行接口
+# ============================================================================
+
+def create_parser() -> argparse.ArgumentParser:
+    """创建命令行参数解析器"""
+    parser = argparse.ArgumentParser(
+        description='PDF 批量处理工具',
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog="""
+示例用法:
+
+  1. 使用配置文件中的处理器:
+     python batch_process_pdf.py -p paddleocr_vl_single_process -f pdf_list.txt
+
+  2. 处理指定目录下所有 PDF:
+     python batch_process_pdf.py -p ppstructurev3_single_client -d /path/to/pdfs
+
+  3. 手动指定脚本和参数:
+     python batch_process_pdf.py \\
+         -s ppstructurev3_single_client.py \\
+         -d /path/to/pdfs \\
+         -f pdf_list.txt \\
+         -e "--api_url=http://localhost:8111 --timeout=600"
+
+  4. 列出所有可用的处理器:
+     python batch_process_pdf.py --list-processors
+
+  5. 查看配置文件内容:
+     python batch_process_pdf.py --show-config
+
+  6. 覆盖默认输出目录:
+     python batch_process_pdf.py -p ppstructurev3_single_process -f pdf_list.txt -o custom_output
+        """
+    )
+    
+    # 处理器选择
+    parser.add_argument(
+        '-p', '--processor',
+        help='处理器名称 (如: paddleocr_vl_single_process, ppstructurev3_single_process, ppstructurev3_single_client)'
+    )
+    
+    # 配置文件
+    parser.add_argument(
+        '-c', '--config',
+        default='processor_configs.yaml',
+        help='配置文件路径 (默认: processor_configs.yaml)'
+    )
+    
+    # 手动指定脚本
+    parser.add_argument(
+        '-s', '--script',
+        help='Python 脚本路径 (覆盖配置文件)'
+    )
+    
+    # 目录和文件
+    parser.add_argument(
+        '-d', '--base-dir',
+        help='PDF 文件基础目录'
+    )
+    
+    parser.add_argument(
+        '-o', '--output-subdir',
+        help='输出子目录名称 (覆盖处理器默认配置)'
+    )
+    
+    parser.add_argument(
+        '-f', '--file-list',
+        help='PDF 文件列表文件路径'
+    )
+    
+    parser.add_argument(
+        '-l', '--pdf-list',
+        nargs='+',
+        help='PDF 文件列表 (空格分隔)'
+    )
+    
+    # 额外参数
+    parser.add_argument(
+        '-e', '--extra-args',
+        help='额外参数 (覆盖配置文件)'
+    )
+    
+    # 工具选项
+    parser.add_argument(
+        '--list-processors',
+        action='store_true',
+        help='列出所有可用的处理器'
+    )
+    
+    parser.add_argument(
+        '--show-config',
+        action='store_true',
+        help='显示配置文件内容'
+    )
+    
+    parser.add_argument(
+        '--dry-run',
+        action='store_true',
+        help='模拟运行,不实际执行'
+    )
+    
+    parser.add_argument(
+        '-v', '--verbose',
+        action='store_true',
+        help='详细输出'
+    )
+    
+    return parser
+
+
+def main():
+    """主函数"""
+    parser = create_parser()
+    args = parser.parse_args()
+    
+    # 设置日志级别
+    if args.verbose:
+        logging.getLogger().setLevel(logging.DEBUG)
+    
+    # 加载配置
+    config_manager = ConfigManager(args.config if Path(args.config).exists() else None)
+    
+    # 列出处理器
+    if args.list_processors:
+        print("可用的处理器:")
+        for name in config_manager.list_processors():
+            proc_config = config_manager.get_processor_config(name)
+            print(f"  • {name}")
+            print(f"    描述: {proc_config.description}")
+            print(f"    脚本: {proc_config.script}")
+            print(f"    输出目录: {proc_config.output_subdir}")
+            print()
+        return 0
+    
+    # 显示配置
+    if args.show_config:
+        print(yaml.dump(config_manager.config, allow_unicode=True))
+        return 0
+    
+    # 获取处理器配置
+    if args.processor:
+        processor_config = config_manager.get_processor_config(args.processor)
+    elif args.script:
+        # 手动指定脚本
+        processor_config = ProcessorConfig(
+            name='manual',
+            script=args.script,
+            extra_args=args.extra_args.split() if args.extra_args else [],
+            output_subdir=args.output_subdir or 'manual_results'
+        )
+    else:
+        parser.error("必须指定 -p 或 -s 参数")
+    
+    # 覆盖额外参数
+    if args.extra_args and args.processor:
+        processor_config.extra_args = args.extra_args.split()
+    
+    # 获取基础目录
+    base_dir = args.base_dir or config_manager.get_global_config('base_dir')
+    if not base_dir:
+        parser.error("必须指定 -d 参数或在配置文件中设置 base_dir")
+    
+    # 查找 PDF 文件
+    finder = PDFFileFinder(base_dir)
+    
+    if args.file_list:
+        pdf_files = finder.from_file_list(args.file_list)
+    elif args.pdf_list:
+        pdf_files = finder.from_list(args.pdf_list)
+    else:
+        pdf_files = finder.find_all()
+    
+    if not pdf_files:
+        print("❌ 未找到任何 PDF 文件")
+        return 1
+    
+    # print(f"\n找到 {len(pdf_files)} 个 PDF 文件")
+    valid_file_paths = [f.as_posix() for f in pdf_files if f.exists()]
+    if valid_file_paths:
+        print("\n".join(valid_file_paths))    
+
+    # 验证文件
+    valid_files = [f for f in pdf_files if f.exists()]
+    invalid_files = [f for f in pdf_files if not f.exists()]
+    
+    if invalid_files:
+        print(f"\n⚠️  警告: {len(invalid_files)} 个文件不存在:")
+        for f in invalid_files[:5]:  # 只显示前5个
+            print(f"  - {f}")
+        if len(invalid_files) > 5:
+            print(f"  ... 还有 {len(invalid_files) - 5} 个")
+    
+    # 确认执行
+    if not args.dry_run and valid_files:
+        confirm = input(f"\n是否继续处理 {len(valid_files)} 个文件? [Y/n]: ")
+        if confirm.lower() not in ['', 'y', 'yes']:
+            print("已取消")
+            return 0
+    
+    # 批量处理
+    processor = PDFBatchProcessor(
+        processor_config=processor_config,
+        output_subdir=args.output_subdir,  # 传递命令行指定的输出目录
+        dry_run=args.dry_run
+    )
+    
+    stats = processor.process_files(valid_files)
+    
+    # 显示统计信息
+    print("\n" + "=" * 80)
+    print("处理完成")
+    print("=" * 80)
+    print(f"\n📊 统计信息:")
+    print(f"  处理器: {processor_config.description}")
+    print(f"  输出目录: {processor.output_subdir}")
+    print(f"  总文件数: {stats['total']}")
+    print(f"  ✓ 成功: {stats['success']}")
+    print(f"  ✗ 失败: {stats['failed']}")
+    print(f"  ⏱️  总耗时: {stats['total_duration']:.2f} 秒")
+    
+    if stats['failed_files']:
+        print(f"\n失败的文件:")
+        for file in stats['failed_files']:
+            print(f"  ✗ {file}")
+    
+    return 0 if stats['failed'] == 0 else 1
+
+
+if __name__ == '__main__':
+    sys.exit(main())