فهرست منبع

feat: 添加优化的PP-StructureV3并行预测器,支持多线程和多进程处理

zhch158_admin 3 ماه پیش
والد
کامیت
53d2de7cf9
1فایلهای تغییر یافته به همراه503 افزوده شده و 0 حذف شده
  1. 503 0
      zhch/ppstructurev3_parallel_predict_optimized.py

+ 503 - 0
zhch/ppstructurev3_parallel_predict_optimized.py

@@ -0,0 +1,503 @@
+# zhch/ppstructurev3_parallel_predict_optimized.py
+import json
+import time
+import os
+import glob
+import traceback
+from pathlib import Path
+from typing import List, Dict, Any, Tuple
+from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
+from multiprocessing import Queue, Manager
+import cv2
+import numpy as np
+from paddlex import create_pipeline
+from tqdm import tqdm
+import threading
+import queue
+
+class PPStructureV3ParallelPredictor:
+    """
+    PP-StructureV3并行预测器,支持多进程批处理
+    """
+
+    def __init__(self, pipeline_config_path: str = "PP-StructureV3", output_path: str = "output", use_gpu: bool = True):
+        """
+        初始化预测器
+        
+        Args:
+            pipeline_config_path: PaddleX pipeline配置文件路径
+        """
+        self.pipeline_config = pipeline_config_path
+        self.pipeline = None  # 延迟初始化
+        self.output_path = output_path
+        self.use_gpu = use_gpu
+
+    def _ensure_pipeline(self):
+        """确保pipeline已初始化(线程安全)"""
+        if self.pipeline is None:
+            self.pipeline = create_pipeline(pipeline=self.pipeline_config)
+
+    def process_single_image(self, image_path: str) -> Dict[str, Any]:
+        """
+        处理单张图像
+        
+        Args:
+            image_path: 图像路径
+            
+        Returns:
+            处理结果{"image_path": str, "success": bool, "processing_time": float, "error": str}
+        """
+        try:
+            # 确保pipeline已初始化
+            self._ensure_pipeline()
+            
+            # 读取图像获取尺寸信息
+            image = cv2.imread(image_path)
+            if image is None:
+                return {
+                    "image_path": Path(image_path).name,
+                    "error": "无法读取图像",
+                    "success": False,
+                    "processing_time": 0
+                }
+                
+            height, width = image.shape[:2]
+            
+            # 运行PaddleX pipeline
+            start_time = time.time()
+            
+            output = self.pipeline.predict(
+                input=image_path,
+                device="gpu" if self.use_gpu else "cpu",
+                use_doc_orientation_classify=True,
+                use_doc_unwarping=False,
+                use_seal_recognition=True,
+                use_chart_recognition=True,
+                use_table_recognition=True,
+                use_formula_recognition=True,
+            )
+            
+            # 保存结果
+            for res in output:
+                res.save_to_json(save_path=self.output_path)
+                res.save_to_markdown(save_path=self.output_path)
+            
+            process_time = time.time() - start_time
+            
+            # 返回处理结果
+            return {
+                "image_path": Path(image_path).name,
+                "processing_time": process_time,
+                "success": True
+            }
+            
+        except Exception as e:
+            return {
+                "image_path": Path(image_path).name,
+                "error": str(e),
+                "success": False,
+                "processing_time": 0
+            }
+    
+    def process_batch(self, image_paths: List[str]) -> List[Dict[str, Any]]:
+        """
+        批处理图像
+        
+        Args:
+            image_paths: 图像路径列表
+            
+        Returns:
+            结果列表
+        """
+        results = []
+        
+        for image_path in image_paths:
+            result = self.process_single_image(image_path)
+            results.append(result)
+        
+        return results
+
+class ThreadWorker:
+    """线程工作器 - 每个线程维护自己的pipeline实例"""
+    
+    def __init__(self, pipeline_config: str, output_path: str, use_gpu: bool, worker_id: int):
+        self.worker_id = worker_id
+        self.predictor = PPStructureV3ParallelPredictor(
+            pipeline_config, 
+            output_path=f"{output_path}/worker_{worker_id}", 
+            use_gpu=use_gpu
+        )
+        self.task_queue = queue.Queue()
+        self.result_queue = queue.Queue()
+        self.running = True
+        
+    def add_batch(self, batch: List[str]):
+        """添加批处理任务"""
+        self.task_queue.put(batch)
+    
+    def get_results(self) -> List[Dict[str, Any]]:
+        """获取处理结果"""
+        results = []
+        while not self.result_queue.empty():
+            try:
+                result = self.result_queue.get_nowait()
+                results.extend(result)
+            except queue.Empty:
+                break
+        return results
+    
+    def worker_loop(self):
+        """工作循环"""
+        while self.running:
+            try:
+                batch = self.task_queue.get(timeout=1.0)
+                if batch is None:  # 结束信号
+                    break
+                    
+                # 处理批次
+                batch_results = self.predictor.process_batch(batch)
+                self.result_queue.put(batch_results)
+                
+            except queue.Empty:
+                continue
+            except Exception as e:
+                print(f"工作线程 {self.worker_id} 处理出错: {e}")
+    
+    def stop(self):
+        """停止工作线程"""
+        self.running = False
+        self.task_queue.put(None)  # 发送结束信号
+
+def parallel_process_with_optimized_threading(image_paths: List[str],
+                                            batch_size: int = 4,
+                                            max_workers: int = 2,  # GPU限制为2个worker
+                                            pipeline_config: str = "PP-StructureV3",
+                                            output_path: str = "./output",
+                                            use_gpu: bool = True) -> List[Dict[str, Any]]:
+    """
+    使用优化的多线程并行处理(每个线程一个pipeline实例)
+    
+    Args:
+        image_paths: 图像路径列表
+        batch_size: 批处理大小
+        max_workers: 最大工作线程数(GPU推荐2个)
+        pipeline_config: pipeline配置
+        output_path: 输出路径
+        use_gpu: 是否使用GPU
+        
+    Returns:
+        处理结果列表
+    """
+    # 确保输出目录存在
+    os.makedirs(output_path, exist_ok=True)
+    
+    # 将图像路径分批
+    batches = [image_paths[i:i + batch_size] for i in range(0, len(image_paths), batch_size)]
+    
+    # 创建工作线程
+    workers = []
+    threads = []
+    
+    for i in range(max_workers):
+        worker = ThreadWorker(pipeline_config, output_path, use_gpu, i)
+        workers.append(worker)
+        
+        thread = threading.Thread(target=worker.worker_loop)
+        thread.daemon = True
+        thread.start()
+        threads.append(thread)
+    
+    print(f"启动了 {max_workers} 个工作线程,每个线程独立的pipeline实例")
+    
+    # 分发任务
+    all_results = []
+    total_images = len(image_paths)
+    completed_count = 0
+    
+    try:
+        with tqdm(total=total_images, desc="处理图像", unit="张") as pbar:
+            # 轮流分发批次到不同的worker
+            for i, batch in enumerate(batches):
+                worker_id = i % max_workers
+                workers[worker_id].add_batch(batch)
+            
+            # 等待所有任务完成
+            while completed_count < total_images:
+                time.sleep(0.1)  # 短暂等待
+                
+                # 收集结果
+                for worker in workers:
+                    batch_results = worker.get_results()
+                    if batch_results:
+                        all_results.extend(batch_results)
+                        completed_count += len(batch_results)
+                        pbar.update(len(batch_results))
+                        
+                        # 更新进度条
+                        success_count = sum(1 for r in batch_results if r.get('success', False))
+                        pbar.set_postfix({
+                            'recent_success': f"{success_count}/{len(batch_results)}",
+                            'total_success': f"{sum(1 for r in all_results if r.get('success', False))}/{completed_count}"
+                        })
+    
+    finally:
+        # 停止所有工作线程
+        for worker in workers:
+            worker.stop()
+        
+        # 等待线程结束
+        for thread in threads:
+            thread.join(timeout=2.0)
+    
+    return all_results
+
+def process_batch_worker_optimized(worker_id: int, 
+                                 task_queue: Queue, 
+                                 result_queue: Queue,
+                                 pipeline_config: str, 
+                                 output_path: str, 
+                                 use_gpu: bool):
+    """
+    优化的多进程工作函数 - 每个进程只初始化一次pipeline
+    """
+    try:
+        # 每个进程创建自己的输出目录
+        worker_output = f"{output_path}/worker_{worker_id}"
+        os.makedirs(worker_output, exist_ok=True)
+        
+        # 只初始化一次pipeline
+        predictor = PPStructureV3ParallelPredictor(
+            pipeline_config, 
+            output_path=worker_output, 
+            use_gpu=use_gpu
+        )
+        
+        print(f"进程 {worker_id} 初始化完成")
+        
+        # 持续处理任务
+        while True:
+            try:
+                batch = task_queue.get(timeout=2.0)
+                if batch is None:  # 结束信号
+                    break
+                
+                # 处理批次
+                batch_results = predictor.process_batch(batch)
+                result_queue.put(batch_results)
+                
+            except Exception as e:
+                print(f"进程 {worker_id} 处理批次时出错: {e}")
+                continue
+                
+    except Exception as e:
+        print(f"进程 {worker_id} 初始化失败: {e}")
+        traceback.print_exc()
+
+def parallel_process_with_optimized_multiprocessing(image_paths: List[str],
+                                                  batch_size: int = 4,
+                                                  max_workers: int = 4,
+                                                  pipeline_config: str = "PP-StructureV3",
+                                                  output_path: str = "./output",
+                                                  use_gpu: bool = False) -> List[Dict[str, Any]]:
+    """
+    使用优化的多进程并行处理(每个进程一个pipeline实例)
+    
+    Args:
+        image_paths: 图像路径列表
+        batch_size: 批处理大小
+        max_workers: 最大工作进程数
+        pipeline_config: pipeline配置
+        output_path: 输出路径
+        use_gpu: 是否使用GPU
+        
+    Returns:
+        处理结果列表
+    """
+    # 确保输出目录存在
+    os.makedirs(output_path, exist_ok=True)
+    
+    # 将图像路径分批
+    batches = [image_paths[i:i + batch_size] for i in range(0, len(image_paths), batch_size)]
+    
+    # 创建进程间通信队列
+    manager = Manager()
+    task_queue = manager.Queue()
+    result_queue = manager.Queue()
+    
+    # 启动工作进程
+    processes = []
+    for i in range(max_workers):
+        p = Process(
+            target=process_batch_worker_optimized,
+            args=(i, task_queue, result_queue, pipeline_config, output_path, use_gpu)
+        )
+        p.start()
+        processes.append(p)
+    
+    print(f"启动了 {max_workers} 个工作进程,每个进程独立的pipeline实例")
+    
+    # 分发任务
+    for batch in batches:
+        task_queue.put(batch)
+    
+    # 发送结束信号
+    for _ in range(max_workers):
+        task_queue.put(None)
+    
+    # 收集结果
+    all_results = []
+    total_images = len(image_paths)
+    completed_count = 0
+    
+    with tqdm(total=total_images, desc="处理图像", unit="张") as pbar:
+        # 等待所有结果
+        expected_batches = len(batches)
+        received_batches = 0
+        
+        while received_batches < expected_batches:
+            try:
+                batch_results = result_queue.get(timeout=30.0)
+                all_results.extend(batch_results)
+                completed_count += len(batch_results)
+                received_batches += 1
+                
+                pbar.update(len(batch_results))
+                
+                # 更新进度条
+                success_count = sum(1 for r in batch_results if r.get('success', False))
+                pbar.set_postfix({
+                    'batch_success': f"{success_count}/{len(batch_results)}",
+                    'total_success': f"{sum(1 for r in all_results if r.get('success', False))}/{completed_count}"
+                })
+                
+            except Exception as e:
+                print(f"等待结果时出错: {e}")
+                break
+    
+    # 等待所有进程结束
+    for p in processes:
+        p.join(timeout=10.0)
+        if p.is_alive():
+            p.terminate()
+    
+    return all_results
+
+def main():
+    """主函数 - 优化的并行处理"""
+    
+    # 配置参数
+    dataset_path = "../../OmniDocBench/OpenDataLab___OmniDocBench/images"
+    output_dir = "./OmniDocBench_Results_Optimized"
+    pipeline_config = "PP-StructureV3"
+    
+    # 并行处理参数
+    batch_size = 4          # 批处理大小
+    use_gpu = True          # 是否使用GPU
+    
+    # GPU限制:最多2个实例,CPU可以更多
+    if use_gpu:
+        max_workers = 2     # GPU推荐2个线程
+        use_multiprocessing = False  # GPU用线程
+    else:
+        max_workers = 4     # CPU可以用更多进程
+        use_multiprocessing = True   # CPU用进程
+    
+    # 确保输出目录存在
+    print(f"输出目录: {Path(output_dir).absolute()}")
+    os.makedirs(output_dir, exist_ok=True)
+    
+    dataset_path = Path(dataset_path).resolve()
+    output_dir = Path(output_dir).resolve()
+    
+    print("="*60)
+    print("OmniDocBench 优化并行处理开始")
+    print("="*60)
+    print(f"数据集路径: {dataset_path}")
+    print(f"输出目录: {output_dir}")
+    print(f"批处理大小: {batch_size}")
+    print(f"最大工作线程/进程数: {max_workers}")
+    print(f"使用GPU: {use_gpu}")
+    print(f"并行方式: {'多进程' if use_multiprocessing else '多线程'}")
+    print(f"Pipeline实例数: {max_workers} (每个进程/线程一个)")
+    
+    # 查找所有图像文件
+    image_extensions = ['*.jpg', '*.jpeg', '*.png', '*.bmp', '*.tiff']
+    image_files = []
+    
+    for ext in image_extensions:
+        image_files.extend(glob.glob(os.path.join(dataset_path, ext)))
+    
+    print(f"找到 {len(image_files)} 个图像文件")
+    
+    if not image_files:
+        print("未找到任何图像文件,程序终止")
+        return
+    
+    # 限制处理数量用于测试
+    # image_files = image_files[:20]  # 取消注释以限制处理数量
+    
+    # 开始处理
+    start_time = time.time()
+    
+    try:
+        if use_multiprocessing:
+            # 多进程处理(推荐用于CPU)
+            print("使用优化的多进程并行处理...")
+            results = parallel_process_with_optimized_multiprocessing(
+                image_files, batch_size, max_workers, pipeline_config, str(output_dir), use_gpu
+            )
+        else:
+            # 多线程处理(推荐用于GPU)
+            print("使用优化的多线程并行处理...")
+            results = parallel_process_with_optimized_threading(
+                image_files, batch_size, max_workers, pipeline_config, str(output_dir), use_gpu
+            )
+        
+        total_time = time.time() - start_time
+        
+        # 统计信息
+        success_count = sum(1 for r in results if r.get('success', False))
+        error_count = len(results) - success_count
+        total_processing_time = sum(r.get('processing_time', 0) for r in results if r.get('success', False))
+        avg_processing_time = total_processing_time / success_count if success_count > 0 else 0
+        
+        # 保存结果统计
+        stats = {
+            "total_files": len(image_files),
+            "success_count": success_count,
+            "error_count": error_count,
+            "success_rate": success_count / len(image_files),
+            "total_time": total_time,
+            "avg_processing_time": avg_processing_time,
+            "throughput": len(image_files) / total_time,
+            "batch_size": batch_size,
+            "max_workers": max_workers,
+            "use_gpu": use_gpu,
+            "use_multiprocessing": use_multiprocessing,
+            "optimization": "单进程/线程单pipeline实例"
+        }
+        results['stats'] = stats
+        # 保存最终结果
+        output_file = os.path.join(output_dir, f"OmniDocBench_PPStructureV3_batch{batch_size}.json")
+        with open(output_file, 'w', encoding='utf-8') as f:
+            json.dump(results, f, ensure_ascii=False, indent=2)        
+
+        print("\n" + "="*60)
+        print("优化并行处理完成!")
+        print("="*60)
+        print(f"总文件数: {len(image_files)}")
+        print(f"成功处理: {success_count}")
+        print(f"失败数量: {error_count}")
+        print(f"成功率: {success_count / len(image_files) * 100:.2f}%")
+        print(f"总耗时: {total_time:.2f}秒")
+        print(f"平均处理时间: {avg_processing_time:.2f}秒/张")
+        print(f"吞吐量: {len(image_files) / total_time:.2f}张/秒")
+        print(f"Pipeline实例数: {max_workers}")
+        print(f"统计信息保存至: {output_file}")
+        
+    except Exception as e:
+        print(f"处理过程中发生错误: {str(e)}")
+        traceback.print_exc()
+
+if __name__ == "__main__":
+    main()