# zhch/ppstructurev3_multi_gpu_multiprocess.py import json import time import os import glob import traceback from pathlib import Path from typing import List, Dict, Any, Tuple from multiprocessing import Queue, Manager, Process import cv2 import numpy as np from paddlex import create_pipeline from tqdm import tqdm import paddle from dotenv import load_dotenv load_dotenv(override=True) class PPStructureV3MultiGPUPredictor: """ PP-StructureV3多GPU多进程预测器 """ def __init__(self, pipeline_config_path: str = "PP-StructureV3", output_path: str = "output", gpu_id: int = 0, process_id: int = 0): """ 初始化预测器 Args: pipeline_config_path: PaddleX pipeline配置文件路径 output_path: 输出路径 gpu_id: GPU设备ID process_id: 进程ID """ self.pipeline_config = pipeline_config_path self.pipeline = None # 延迟初始化 self.output_path = output_path self.gpu_id = gpu_id self.process_id = process_id self.device = f"gpu:{gpu_id}" def _ensure_pipeline(self): """确保pipeline已初始化""" if self.pipeline is None: try: # 设置当前GPU paddle.device.set_device(f"gpu:{self.gpu_id}") self.pipeline = create_pipeline(pipeline=self.pipeline_config) print(f"进程 {self.process_id} - Pipeline初始化完成 - GPU:{self.gpu_id}") except Exception as e: print(f"进程 {self.process_id} - Pipeline初始化失败 - GPU:{self.gpu_id}, 错误: {e}") raise e def process_single_image(self, image_path: str) -> Dict[str, Any]: """ 处理单张图像 Args: image_path: 图像路径 Returns: 处理结果 """ try: # 确保pipeline已初始化 self._ensure_pipeline() # 读取图像获取尺寸信息 image = cv2.imread(image_path) if image is None: return { "image_path": Path(image_path).name, "error": "无法读取图像", "success": False, "processing_time": 0, "gpu_id": self.gpu_id, "process_id": self.process_id } # 运行PaddleX pipeline start_time = time.time() output = self.pipeline.predict( input=image_path, device=self.device, use_doc_orientation_classify=True, use_doc_unwarping=False, use_seal_recognition=True, use_chart_recognition=True, use_table_recognition=True, use_formula_recognition=True, ) # 保存结果 for res in output: res.save_to_json(save_path=self.output_path) res.save_to_markdown(save_path=self.output_path) process_time = time.time() - start_time # 返回处理结果 return { "image_path": Path(image_path).name, "processing_time": process_time, "success": True, "gpu_id": self.gpu_id, "process_id": self.process_id } except Exception as e: return { "image_path": Path(image_path).name, "error": str(e), "success": False, "processing_time": 0, "gpu_id": self.gpu_id, "process_id": self.process_id } def process_batch(self, image_paths: List[str]) -> List[Dict[str, Any]]: """ 批处理图像 Args: image_paths: 图像路径列表 Returns: 结果列表 """ results = [] for image_path in image_paths: result = self.process_single_image(image_path) results.append(result) return results def multi_gpu_process_worker(process_id: int, gpu_id: int, task_queue: Queue, result_queue: Queue, pipeline_config: str, output_path: str): """ 多GPU多进程工作函数 Args: process_id: 进程ID gpu_id: GPU设备ID task_queue: 任务队列 result_queue: 结果队列 pipeline_config: pipeline配置 output_path: 输出路径 """ try: # 每个进程创建自己的输出目录 worker_output = f"{output_path}/gpu{gpu_id}_process_{process_id}" os.makedirs(worker_output, exist_ok=True) # 初始化预测器(每个进程只初始化一次) predictor = PPStructureV3MultiGPUPredictor( pipeline_config, output_path=worker_output, gpu_id=gpu_id, process_id=process_id ) print(f"进程 {process_id} (GPU {gpu_id}) 初始化完成") # 持续处理任务 while True: try: batch = task_queue.get(timeout=2.0) if batch is None: # 结束信号 print(f"进程 {process_id} (GPU {gpu_id}) 收到结束信号") break # 处理批次 batch_results = predictor.process_batch(batch) result_queue.put(batch_results) print(f"进程 {process_id} (GPU {gpu_id}) 完成批次处理: {len(batch)} 张图像") except Exception as e: print(f"进程 {process_id} (GPU {gpu_id}) 处理批次时出错: {e}") continue except Exception as e: print(f"进程 {process_id} (GPU {gpu_id}) 初始化失败: {e}") traceback.print_exc() finally: print(f"进程 {process_id} (GPU {gpu_id}) 结束") def parallel_process_with_multi_gpu(image_paths: List[str], batch_size: int = 4, gpu_ids: List[int] = [0, 1], pipelines_per_gpu: int = 1, pipeline_config: str = "PP-StructureV3", output_path: str = "./output") -> List[Dict[str, Any]]: """ 使用多GPU多进程并行处理 Args: image_paths: 图像路径列表 batch_size: 批处理大小 gpu_ids: 要使用的GPU ID列表 pipelines_per_gpu: 每个GPU的pipeline实例数 pipeline_config: pipeline配置 output_path: 输出路径 Returns: 处理结果列表 """ # 确保输出目录存在 os.makedirs(output_path, exist_ok=True) # 检查可用GPU try: available_gpu_count = paddle.device.cuda.device_count() print(f"系统检测到 {available_gpu_count} 个GPU") # 验证指定的GPU是否可用 valid_gpu_ids = [] for gpu_id in gpu_ids: if gpu_id < available_gpu_count: valid_gpu_ids.append(gpu_id) else: print(f"警告:GPU {gpu_id} 不可用,跳过") if not valid_gpu_ids: print("错误:没有可用的GPU") return [] gpu_ids = valid_gpu_ids except Exception as e: print(f"GPU检测失败: {e}") gpu_ids = [0] # 降级为单GPU pipelines_per_gpu = 1 total_processes = len(gpu_ids) * pipelines_per_gpu print(f"使用GPU: {gpu_ids}") print(f"每GPU Pipeline数: {pipelines_per_gpu}") print(f"总进程数: {total_processes}") # 将图像路径分批 batches = [image_paths[i:i + batch_size] for i in range(0, len(image_paths), batch_size)] print(f"总批次数: {len(batches)}") # 创建进程间通信队列 manager = Manager() task_queue = manager.Queue() result_queue = manager.Queue() # 分发任务到队列 for batch in batches: task_queue.put(batch) print(f"任务已分发到队列") # 启动工作进程 processes = [] process_id = 0 for gpu_id in gpu_ids: for pipeline_idx in range(pipelines_per_gpu): p = Process( target=multi_gpu_process_worker, args=(process_id, gpu_id, task_queue, result_queue, pipeline_config, output_path), name=f"GPU{gpu_id}_Process{process_id}" ) p.start() processes.append(p) process_id += 1 print(f"启动了 {len(processes)} 个工作进程") # 发送结束信号 for _ in range(total_processes): task_queue.put(None) # 收集结果 all_results = [] total_images = len(image_paths) completed_count = 0 with tqdm(total=total_images, desc="多GPU多进程处理", unit="张") as pbar: # 等待所有结果 expected_batches = len(batches) received_batches = 0 while received_batches < expected_batches: try: batch_results = result_queue.get(timeout=60.0) # 增加超时时间 all_results.extend(batch_results) completed_count += len(batch_results) received_batches += 1 pbar.update(len(batch_results)) # 更新进度条 success_count = sum(1 for r in batch_results if r.get('success', False)) # 按GPU统计 gpu_stats = {} for r in all_results: gpu_id = r.get('gpu_id', 'unknown') if gpu_id not in gpu_stats: gpu_stats[gpu_id] = {'success': 0, 'total': 0} gpu_stats[gpu_id]['total'] += 1 if r.get('success', False): gpu_stats[gpu_id]['success'] += 1 gpu_info = ', '.join([f"GPU{k}:{v['success']}/{v['total']}" for k, v in gpu_stats.items()]) pbar.set_postfix({ 'batch_success': f"{success_count}/{len(batch_results)}", 'gpu_stats': gpu_info }) except Exception as e: print(f"等待结果时出错: {e}") break # 等待所有进程结束 print("等待所有进程结束...") for p in processes: p.join(timeout=10.0) if p.is_alive(): print(f"强制终止进程: {p.name}") p.terminate() return all_results def detect_available_gpus() -> List[int]: """检测可用的GPU""" try: gpu_count = paddle.device.cuda.device_count() available_gpus = list(range(gpu_count)) print(f"检测到 {gpu_count} 个可用GPU: {available_gpus}") return available_gpus except Exception as e: print(f"GPU检测失败: {e}") return [] def main(): """主函数 - 多GPU多进程并行处理""" # 配置参数 dataset_path = "../../OmniDocBench/OpenDataLab___OmniDocBench/images" output_dir = "./OmniDocBench_Results_MultiGPU_MultiProcess" pipeline_config = "PP-StructureV3" # 多GPU多进程参数(可配置) batch_size = 4 # 批处理大小 gpu_ids = [0, 1] # 指定使用的GPU ID列表 - 可修改 pipelines_per_gpu = 1 # 每个GPU的pipeline实例数 - 可修改 # 如果想要自动检测所有可用GPU,取消下面的注释 # available_gpus = detect_available_gpus() # if available_gpus: # gpu_ids = available_gpus # 确保输出目录存在 print(f"输出目录: {Path(output_dir).absolute()}") os.makedirs(output_dir, exist_ok=True) dataset_path = Path(dataset_path).resolve() output_dir = Path(output_dir).resolve() print("="*70) print("OmniDocBench 多GPU多进程并行处理开始") print("="*70) print(f"数据集路径: {dataset_path}") print(f"输出目录: {output_dir}") print(f"批处理大小: {batch_size}") print(f"指定GPU ID: {gpu_ids}") print(f"每GPU Pipeline数: {pipelines_per_gpu}") print(f"总进程数: {len(gpu_ids) * pipelines_per_gpu}") # 查找所有图像文件 image_extensions = ['*.jpg', '*.jpeg', '*.png', '*.bmp', '*.tiff'] image_files = [] for ext in image_extensions: image_files.extend(glob.glob(os.path.join(dataset_path, ext))) print(f"\n找到 {len(image_files)} 个图像文件") if not image_files: print("未找到任何图像文件,程序终止") return # 限制处理数量用于测试 # image_files = image_files[:20] # 取消注释以限制处理数量 # 开始处理 start_time = time.time() try: print(f"\n使用多GPU多进程并行处理...") print(f"处理配置: {len(gpu_ids)}个GPU, 每GPU {pipelines_per_gpu}个进程") results = parallel_process_with_multi_gpu( image_files, batch_size=batch_size, gpu_ids=gpu_ids, pipelines_per_gpu=pipelines_per_gpu, pipeline_config=pipeline_config, output_path=str(output_dir) ) total_time = time.time() - start_time # 统计信息 success_count = sum(1 for r in results if r.get('success', False)) error_count = len(results) - success_count total_processing_time = sum(r.get('processing_time', 0) for r in results if r.get('success', False)) avg_processing_time = total_processing_time / success_count if success_count > 0 else 0 # 按GPU和进程统计 gpu_stats = {} process_stats = {} for r in results: gpu_id = r.get('gpu_id', 'unknown') process_id = r.get('process_id', 'unknown') # GPU统计 if gpu_id not in gpu_stats: gpu_stats[gpu_id] = {'success': 0, 'total': 0, 'total_time': 0} gpu_stats[gpu_id]['total'] += 1 if r.get('success', False): gpu_stats[gpu_id]['success'] += 1 gpu_stats[gpu_id]['total_time'] += r.get('processing_time', 0) # 进程统计 if process_id not in process_stats: process_stats[process_id] = {'success': 0, 'total': 0, 'gpu_id': gpu_id} process_stats[process_id]['total'] += 1 if r.get('success', False): process_stats[process_id]['success'] += 1 # 保存结果统计 stats = { "total_files": len(image_files), "success_count": success_count, "error_count": error_count, "success_rate": success_count / len(image_files), "total_time": total_time, "avg_processing_time": avg_processing_time, "throughput": len(image_files) / total_time, "batch_size": batch_size, "gpu_ids": gpu_ids, "pipelines_per_gpu": pipelines_per_gpu, "total_processes": len(gpu_ids) * pipelines_per_gpu, "gpu_stats": gpu_stats, "process_stats": process_stats, "optimization": "多GPU多进程并行" } # 保存最终结果 output_file = os.path.join(output_dir, f"OmniDocBench_MultiGPU_batch{batch_size}_gpus{len(gpu_ids)}_ppg{pipelines_per_gpu}.json") final_results = { "configuration": { "gpu_ids": gpu_ids, "pipelines_per_gpu": pipelines_per_gpu, "batch_size": batch_size, "total_processes": len(gpu_ids) * pipelines_per_gpu }, "results": results, "stats": stats } with open(output_file, 'w', encoding='utf-8') as f: json.dump(final_results, f, ensure_ascii=False, indent=2) print("\n" + "="*70) print("多GPU多进程并行处理完成!") print("="*70) print(f"总文件数: {len(image_files)}") print(f"成功处理: {success_count}") print(f"失败数量: {error_count}") print(f"成功率: {success_count / len(image_files) * 100:.2f}%") print(f"总耗时: {total_time:.2f}秒") print(f"平均处理时间: {avg_processing_time:.2f}秒/张") print(f"吞吐量: {len(image_files) / total_time:.2f}张/秒") print(f"配置: {len(gpu_ids)}个GPU, 每GPU {pipelines_per_gpu}个进程") # GPU统计 print(f"\nGPU分布统计:") for gpu_id, stat in gpu_stats.items(): if stat['total'] > 0: gpu_success_rate = stat['success'] / stat['total'] * 100 gpu_avg_time = stat['total_time'] / stat['success'] if stat['success'] > 0 else 0 print(f" GPU {gpu_id}: {stat['success']}/{stat['total']} 成功 " f"({gpu_success_rate:.1f}%), 平均 {gpu_avg_time:.2f}s/张") # 进程统计 print(f"\n进程分布统计:") for process_id, stat in process_stats.items(): if stat['total'] > 0: process_success_rate = stat['success'] / stat['total'] * 100 print(f" 进程 {process_id} (GPU {stat['gpu_id']}): {stat['success']}/{stat['total']} " f"({process_success_rate:.1f}%)") print(f"\n结果保存至: {output_file}") except Exception as e: print(f"处理过程中发生错误: {str(e)}") traceback.print_exc() if __name__ == "__main__": main()