# zhch/ppstructurev3_parallel_predict_optimized.py
"""Batch-run the PaddleX PP-StructureV3 pipeline over a directory of images.

Two parallel strategies are provided, both with one pipeline instance per
worker:

* ``parallel_process_with_optimized_threading`` - worker threads (the default
  used by ``main`` together with GPU inference).
* ``parallel_process_with_optimized_multiprocessing`` - worker processes.

Every worker saves the pipeline's JSON/Markdown output under
``<output_path>/worker_<id>`` and reports one status dict per image; ``main``
aggregates those dicts into a summary JSON file.
"""
import glob
import json
import os
import queue
import threading
import time
import traceback
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
from multiprocessing import Process, Queue, Manager
from pathlib import Path
from typing import List, Dict, Any, Tuple

import cv2
import numpy as np
from dotenv import load_dotenv
from paddlex import create_pipeline
from tqdm import tqdm

load_dotenv(override=True)


def _failure_result(image_path: str, error: str) -> Dict[str, Any]:
    """Build the status dict reported for an image that could not be processed."""
    return {
        "image_path": Path(image_path).name,
        "error": error,
        "success": False,
        "processing_time": 0,
    }


def _count_success(results: List[Dict[str, Any]]) -> int:
    """Count status dicts whose ``success`` flag is truthy."""
    return sum(1 for r in results if r.get('success', False))


class PPStructureV3ParallelPredictor:
    """PP-StructureV3 predictor; each worker thread/process owns one instance."""

    def __init__(self, pipeline_config_path: str = "PP-StructureV3",
                 output_path: str = "output", use_gpu: bool = True):
        """
        Args:
            pipeline_config_path: PaddleX pipeline name or config file path.
            output_path: directory the JSON/Markdown results are saved into.
            use_gpu: build the pipeline on "gpu" if True, otherwise on "cpu".
        """
        self.pipeline_config = pipeline_config_path
        self.pipeline = None  # created lazily on first use
        self.output_path = output_path
        self.use_gpu = use_gpu

    def _ensure_pipeline(self):
        """Create the PaddleX pipeline on first use.

        Not thread-safe; this is fine because every worker owns its own
        predictor instance and never shares it.
        """
        if self.pipeline is None:
            # PaddleX selects the inference device when the pipeline is built.
            self.pipeline = create_pipeline(
                pipeline=self.pipeline_config,
                device="gpu" if self.use_gpu else "cpu",
            )

    def process_single_image(self, image_path: str) -> Dict[str, Any]:
        """Run the pipeline on one image and save its JSON/Markdown output.

        Args:
            image_path: path of the image file.

        Returns:
            ``{"image_path", "processing_time", "success": True}`` on success,
            otherwise ``{"image_path", "error", "success": False,
            "processing_time": 0}``. Exceptions are caught and reported in the
            returned dict rather than raised.
        """
        try:
            self._ensure_pipeline()

            # Cheap pre-check that the file decodes as an image.
            if cv2.imread(image_path) is None:
                return _failure_result(image_path, "无法读取图像")

            start_time = time.perf_counter()
            output = self.pipeline.predict(
                input=image_path,
                use_doc_orientation_classify=True,
                use_doc_unwarping=False,
                use_seal_recognition=True,
                use_chart_recognition=True,
                use_table_recognition=True,
                use_formula_recognition=True,
            )

            # The output is iterated here, so saving is part of the timed work.
            for res in output:
                res.save_to_json(save_path=self.output_path)
                res.save_to_markdown(save_path=self.output_path)

            process_time = time.perf_counter() - start_time
            return {
                "image_path": Path(image_path).name,
                "processing_time": process_time,
                "success": True,
            }
        except Exception as e:
            return _failure_result(image_path, str(e))

    def process_batch(self, image_paths: List[str]) -> List[Dict[str, Any]]:
        """Process images sequentially; returns one status dict per input path."""
        return [self.process_single_image(image_path) for image_path in image_paths]


class ThreadWorker:
    """Thread worker with its own predictor and private task/result queues."""

    def __init__(self, pipeline_config: str, output_path: str, use_gpu: bool, worker_id: int):
        self.worker_id = worker_id
        self.predictor = PPStructureV3ParallelPredictor(
            pipeline_config,
            output_path=f"{output_path}/worker_{worker_id}",
            use_gpu=use_gpu
        )
        self.task_queue = queue.Queue()
        self.result_queue = queue.Queue()
        self.running = True

    def add_batch(self, batch: List[str]):
        """Queue a batch of image paths for this worker."""
        self.task_queue.put(batch)

    def get_results(self) -> List[Dict[str, Any]]:
        """Drain every finished batch without blocking; returns a flat list."""
        results = []
        while True:
            try:
                results.extend(self.result_queue.get_nowait())
            except queue.Empty:
                return results

    def worker_loop(self):
        """Process queued batches until a ``None`` sentinel arrives or ``running`` is cleared."""
        while self.running:
            try:
                batch = self.task_queue.get(timeout=1.0)
            except queue.Empty:
                continue
            if batch is None:  # stop signal
                break
            try:
                batch_results = self.predictor.process_batch(batch)
            except Exception as e:
                print(f"工作线程 {self.worker_id} 处理出错: {e}")
                # Report the batch as failed so the collector's count still
                # reaches the total instead of waiting forever.
                batch_results = [_failure_result(p, str(e)) for p in batch]
            self.result_queue.put(batch_results)

    def stop(self):
        """Ask the worker loop to exit and wake it with the sentinel."""
        self.running = False
        self.task_queue.put(None)


def parallel_process_with_optimized_threading(image_paths: List[str],
                                              batch_size: int = 4,
                                              max_workers: int = 2,  # keep small when sharing one GPU
                                              pipeline_config: str = "PP-StructureV3",
                                              output_path: str = "./output",
                                              use_gpu: bool = True) -> List[Dict[str, Any]]:
    """Process images with worker threads, one pipeline instance per thread.

    Batches are assigned round-robin to the workers.

    Args:
        image_paths: image file paths.
        batch_size: number of images per batch.
        max_workers: number of worker threads (values below 1 are treated as 1).
        pipeline_config: PaddleX pipeline name or config path.
        output_path: root output directory.
        use_gpu: whether to run inference on the GPU.

    Returns:
        One status dict per processed image, in completion order. If all
        worker threads die early, the images they never finished are absent.
    """
    os.makedirs(output_path, exist_ok=True)
    max_workers = max(1, max_workers)

    batches = [image_paths[i:i + batch_size] for i in range(0, len(image_paths), batch_size)]

    workers = []
    threads = []
    for i in range(max_workers):
        worker = ThreadWorker(pipeline_config, output_path, use_gpu, i)
        workers.append(worker)
        thread = threading.Thread(target=worker.worker_loop, daemon=True)
        thread.start()
        threads.append(thread)

    print(f"启动了 {max_workers} 个工作线程,每个线程独立的pipeline实例")

    all_results = []
    total_images = len(image_paths)
    completed_count = 0
    success_total = 0  # running total avoids re-scanning all_results per batch

    try:
        with tqdm(total=total_images, desc="处理图像", unit="张") as pbar:
            for i, batch in enumerate(batches):
                workers[i % max_workers].add_batch(batch)

            while completed_count < total_images:
                time.sleep(0.1)
                # Snapshot liveness BEFORE draining: if every thread was
                # already dead, nothing can arrive after this drain.
                any_alive = any(t.is_alive() for t in threads)

                for worker in workers:
                    batch_results = worker.get_results()
                    if not batch_results:
                        continue
                    batch_success = _count_success(batch_results)
                    all_results.extend(batch_results)
                    completed_count += len(batch_results)
                    success_total += batch_success
                    pbar.update(len(batch_results))
                    pbar.set_postfix({
                        'recent_success': f"{batch_success}/{len(batch_results)}",
                        'total_success': f"{success_total}/{completed_count}"
                    })

                if not any_alive and completed_count < total_images:
                    print("所有工作线程已退出,但仍有图像未完成处理")
                    break
    finally:
        for worker in workers:
            worker.stop()
        for thread in threads:
            thread.join(timeout=2.0)

    return all_results


def process_batch_worker_optimized(worker_id: int, task_queue: Queue, result_queue: Queue,
                                   pipeline_config: str, output_path: str, use_gpu: bool):
    """Worker-process entry point: one predictor per process, many batches.

    Consumes batches from ``task_queue`` until a ``None`` sentinel, putting one
    list of status dicts per batch onto ``result_queue``. The pipeline itself
    is created lazily by the predictor on the first image.
    """
    try:
        worker_output = f"{output_path}/worker_{worker_id}"
        os.makedirs(worker_output, exist_ok=True)
        predictor = PPStructureV3ParallelPredictor(
            pipeline_config,
            output_path=worker_output,
            use_gpu=use_gpu
        )
        print(f"进程 {worker_id} 初始化完成")
    except Exception as e:
        print(f"进程 {worker_id} 初始化失败: {e}")
        traceback.print_exc()
        return

    while True:
        try:
            batch = task_queue.get(timeout=2.0)
        except queue.Empty:
            continue  # tasks may not have been queued yet
        except Exception as e:
            # e.g. the manager process went away; retrying would loop forever
            print(f"进程 {worker_id} 处理批次时出错: {e}")
            break

        if batch is None:  # stop signal
            break

        try:
            batch_results = predictor.process_batch(batch)
        except Exception as e:
            print(f"进程 {worker_id} 处理批次时出错: {e}")
            # Keep the collector's batch count consistent.
            batch_results = [_failure_result(p, str(e)) for p in batch]
        result_queue.put(batch_results)


def parallel_process_with_optimized_multiprocessing(image_paths: List[str],
                                                    batch_size: int = 4,
                                                    max_workers: int = 4,
                                                    pipeline_config: str = "PP-StructureV3",
                                                    output_path: str = "./output",
                                                    use_gpu: bool = False) -> List[Dict[str, Any]]:
    """Process images with worker processes, one pipeline instance per process.

    Args:
        image_paths: image file paths.
        batch_size: number of images per batch.
        max_workers: number of worker processes (values below 1 are treated as 1).
        pipeline_config: PaddleX pipeline name or config path.
        output_path: root output directory.
        use_gpu: whether to run inference on the GPU.

    Returns:
        One status dict per processed image, in completion order. Batches
        whose worker died before reporting are absent.
    """
    os.makedirs(output_path, exist_ok=True)
    max_workers = max(1, max_workers)

    batches = [image_paths[i:i + batch_size] for i in range(0, len(image_paths), batch_size)]

    all_results = []
    total_images = len(image_paths)
    completed_count = 0
    success_total = 0

    # The manager owns the shared queues; the context manager shuts it down.
    with Manager() as manager:
        task_queue = manager.Queue()
        result_queue = manager.Queue()

        processes = []
        for i in range(max_workers):
            p = Process(
                target=process_batch_worker_optimized,
                args=(i, task_queue, result_queue, pipeline_config, output_path, use_gpu)
            )
            p.start()
            processes.append(p)

        print(f"启动了 {max_workers} 个工作进程,每个进程独立的pipeline实例")

        for batch in batches:
            task_queue.put(batch)
        for _ in range(max_workers):
            task_queue.put(None)  # one stop signal per worker

        try:
            with tqdm(total=total_images, desc="处理图像", unit="张") as pbar:
                expected_batches = len(batches)
                received_batches = 0
                while received_batches < expected_batches:
                    # Snapshot liveness before waiting: if all workers were
                    # already dead and the queue stays empty, nothing more comes.
                    any_alive = any(p.is_alive() for p in processes)
                    try:
                        batch_results = result_queue.get(timeout=5.0)
                    except queue.Empty:
                        if not any_alive:
                            print("所有工作进程已退出,但仍有批次未返回结果")
                            break
                        continue  # workers still busy (model loading / slow batch)
                    except Exception as e:
                        print(f"等待结果时出错: {e}")
                        break

                    batch_success = _count_success(batch_results)
                    all_results.extend(batch_results)
                    completed_count += len(batch_results)
                    success_total += batch_success
                    received_batches += 1
                    pbar.update(len(batch_results))
                    pbar.set_postfix({
                        'batch_success': f"{batch_success}/{len(batch_results)}",
                        'total_success': f"{success_total}/{completed_count}"
                    })
        finally:
            # Join before the manager shuts down so workers never see dead proxies.
            for p in processes:
                p.join(timeout=10.0)
                if p.is_alive():
                    p.terminate()
                    p.join()

    return all_results


def main():
    """Run PP-StructureV3 over the OmniDocBench images and save results plus stats."""
    dataset_path = "../../OmniDocBench/OpenDataLab___OmniDocBench/images"
    output_dir = "./OmniDocBench_Results_Optimized"
    pipeline_config = "PP-StructureV3"

    batch_size = 4  # images per batch
    use_gpu = True
    max_workers = 4  # pipeline instances; CPU runs can afford more processes
    use_multiprocessing = False  # True: worker processes (CPU); False: worker threads (GPU)

    print(f"输出目录: {Path(output_dir).absolute()}")
    os.makedirs(output_dir, exist_ok=True)

    dataset_path = Path(dataset_path).resolve()
    output_dir = Path(output_dir).resolve()

    print("="*60)
    print("OmniDocBench 优化并行处理开始")
    print("="*60)
    print(f"数据集路径: {dataset_path}")
    print(f"输出目录: {output_dir}")
    print(f"批处理大小: {batch_size}")
    print(f"最大工作线程/进程数: {max_workers}")
    print(f"使用GPU: {use_gpu}")
    print(f"并行方式: {'多进程' if use_multiprocessing else '多线程'}")
    print(f"Pipeline实例数: {max_workers} (每个进程/线程一个)")

    image_extensions = ['*.jpg', '*.jpeg', '*.png', '*.bmp', '*.tiff']
    image_files = []
    for ext in image_extensions:
        image_files.extend(glob.glob(os.path.join(dataset_path, ext)))
    image_files.sort()  # glob order is arbitrary; keep runs reproducible

    print(f"找到 {len(image_files)} 个图像文件")

    if not image_files:
        print("未找到任何图像文件,程序终止")
        return

    # To test on a subset, uncomment:
    # image_files = image_files[:20]

    start_time = time.perf_counter()

    try:
        if use_multiprocessing:
            print("使用优化的多进程并行处理...")
            results = parallel_process_with_optimized_multiprocessing(
                image_files, batch_size, max_workers, pipeline_config, str(output_dir), use_gpu
            )
        else:
            print("使用优化的多线程并行处理...")
            results = parallel_process_with_optimized_threading(
                image_files, batch_size, max_workers, pipeline_config, str(output_dir), use_gpu
            )

        total_time = time.perf_counter() - start_time

        success_count = _count_success(results)
        # Images that never produced a result (lost batch) count as failures too.
        error_count = len(image_files) - success_count
        total_processing_time = sum(r.get('processing_time', 0) for r in results if r.get('success', False))
        avg_processing_time = total_processing_time / success_count if success_count > 0 else 0

        stats = {
            "total_files": len(image_files),
            "success_count": success_count,
            "error_count": error_count,
            "success_rate": success_count / len(image_files),
            "total_time": total_time,
            "avg_processing_time": avg_processing_time,
            "throughput": len(image_files) / total_time,
            "batch_size": batch_size,
            "max_workers": max_workers,
            "use_gpu": use_gpu,
            "use_multiprocessing": use_multiprocessing,
            "optimization": "单进程/线程单pipeline实例"
        }

        # `results` is a list, so stats and per-image results are saved side by side.
        output_data = {"stats": stats, "results": results}

        output_file = os.path.join(output_dir, f"OmniDocBench_PPStructureV3_batch{batch_size}.json")
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(output_data, f, ensure_ascii=False, indent=2)

        print("\n" + "="*60)
        print("优化并行处理完成!")
        print("="*60)
        print(f"总文件数: {len(image_files)}")
        print(f"成功处理: {success_count}")
        print(f"失败数量: {error_count}")
        print(f"成功率: {success_count / len(image_files) * 100:.2f}%")
        print(f"总耗时: {total_time:.2f}秒")
        print(f"平均处理时间: {avg_processing_time:.2f}秒/张")
        print(f"吞吐量: {len(image_files) / total_time:.2f}张/秒")
        print(f"Pipeline实例数: {max_workers}")
        print(f"统计信息保存至: {output_file}")

    except Exception as e:
        print(f"处理过程中发生错误: {str(e)}")
        traceback.print_exc()


if __name__ == "__main__":
    main()