|
|
@@ -0,0 +1,453 @@
|
|
|
+# zhch/ppstructurev3_dual_gpu_optimized.py
|
|
|
+import json
|
|
|
+import time
|
|
|
+import os
|
|
|
+import glob
|
|
|
+import traceback
|
|
|
+from pathlib import Path
|
|
|
+from typing import List, Dict, Any, Tuple
|
|
|
+from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
|
|
|
+from multiprocessing import Queue, Manager, Process
|
|
|
+import cv2
|
|
|
+import numpy as np
|
|
|
+from paddlex import create_pipeline
|
|
|
+from tqdm import tqdm
|
|
|
+import threading
|
|
|
+import queue
|
|
|
+import paddle
|
|
|
+
|
|
|
+from dotenv import load_dotenv
|
|
|
+load_dotenv(override=True)
|
|
|
+
|
|
|
class PPStructureV3DualGPUPredictor:
    """PP-StructureV3 predictor bound to a single GPU.

    The PaddleX pipeline is created lazily on the first prediction.  The lazy
    initialization is not synchronized, so one instance should be driven by
    one thread only.
    """

    def __init__(self, pipeline_config_path: str = "PP-StructureV3", output_path: str = "output", gpu_id: int = 0):
        """Store the configuration; the pipeline itself is created on first use.

        Args:
            pipeline_config_path: PaddleX pipeline name or config file path.
            output_path: Directory handed to the result objects when saving.
            gpu_id: Index of the GPU this predictor runs on.
        """
        self.pipeline_config = pipeline_config_path
        self.pipeline = None  # created lazily by _ensure_pipeline()
        self.output_path = output_path
        self.gpu_id = gpu_id
        self.device = f"gpu:{gpu_id}"

    def _ensure_pipeline(self):
        """Create the pipeline on first use (no locking is performed)."""
        if self.pipeline is not None:
            return
        paddle.device.set_device(self.device)
        # Bind the pipeline to its GPU explicitly so the placement does not
        # depend only on the current device selected by set_device(), which
        # another predictor initializing concurrently may have changed.
        self.pipeline = create_pipeline(pipeline=self.pipeline_config, device=self.device)
        print(f"Pipeline初始化完成 - GPU:{self.gpu_id}")

    def _failure_result(self, image_path: str, error: str) -> Dict[str, Any]:
        """Build the result record reported for an image that was not processed."""
        return {
            "image_path": Path(image_path).name,
            "error": error,
            "success": False,
            "processing_time": 0,
            "gpu_id": self.gpu_id,
        }

    def process_single_image(self, image_path: str) -> Dict[str, Any]:
        """Run the pipeline on one image and save its JSON/Markdown output.

        Args:
            image_path: Path of the image to process.

        Returns:
            A dict with "image_path" (file name only), "success",
            "processing_time" (seconds spent predicting and saving) and
            "gpu_id"; failed images additionally carry "error".  Exceptions
            raised during processing are converted into a failure record.
        """
        try:
            self._ensure_pipeline()

            # Decode once up front so unreadable files are reported with a
            # clear message before the pipeline is invoked.
            if cv2.imread(image_path) is None:
                return self._failure_result(image_path, "无法读取图像")

            start_time = time.time()

            output = self.pipeline.predict(
                input=image_path,
                device=self.device,
                use_doc_orientation_classify=True,
                use_doc_unwarping=False,
                use_seal_recognition=True,
                use_chart_recognition=True,
                use_table_recognition=True,
                use_formula_recognition=True,
            )

            # Results are consumed and written here, inside the timed section.
            for res in output:
                res.save_to_json(save_path=self.output_path)
                res.save_to_markdown(save_path=self.output_path)

            process_time = time.time() - start_time

            return {
                "image_path": Path(image_path).name,
                "processing_time": process_time,
                "success": True,
                "gpu_id": self.gpu_id,
            }

        except Exception as e:
            return self._failure_result(image_path, str(e))

    def process_batch(self, image_paths: List[str]) -> List[Dict[str, Any]]:
        """Process the images one after another.

        Args:
            image_paths: Paths of the images to process.

        Returns:
            One result record per input path, in input order.
        """
        return [self.process_single_image(image_path) for image_path in image_paths]
|
|
|
+
|
|
|
class DualGPUThreadWorker:
    """Queue-driven worker that owns one predictor instance on one GPU.

    Batches are submitted with add_batch(), processed by worker_loop() (meant
    to be the target of a thread) and collected with get_results().
    """

    def __init__(self, pipeline_config: str, output_path: str, gpu_id: int, worker_id: int):
        """Create the worker, its predictor and its task/result queues.

        Args:
            pipeline_config: Pipeline name or config path for the predictor.
            output_path: Base output directory; the predictor writes into the
                sub-directory "gpu{gpu_id}_worker_{worker_id}".
            gpu_id: GPU index the predictor is bound to.
            worker_id: Identifier used in log messages and the output path.
        """
        self.worker_id = worker_id
        self.gpu_id = gpu_id
        self.predictor = PPStructureV3DualGPUPredictor(
            pipeline_config,
            output_path=f"{output_path}/gpu{gpu_id}_worker_{worker_id}",
            gpu_id=gpu_id,
        )
        self.task_queue = queue.Queue()
        self.result_queue = queue.Queue()
        self.running = True

    def add_batch(self, batch: List[str]):
        """Queue a batch of image paths for processing."""
        self.task_queue.put(batch)

    def get_results(self) -> List[Dict[str, Any]]:
        """Return all result records available right now without blocking."""
        results = []
        while True:
            try:
                results.extend(self.result_queue.get_nowait())
            except queue.Empty:
                return results

    def worker_loop(self):
        """Process queued batches until stop() is called or None is received.

        Every batch taken from the queue produces exactly one list of result
        records with one entry per image, even if the predictor raises, so a
        consumer counting results per image cannot wait forever.
        """
        print(f"GPU{self.gpu_id} Worker{self.worker_id} 开始工作")

        while self.running:
            try:
                # The timeout lets the loop re-check self.running regularly.
                batch = self.task_queue.get(timeout=1.0)
            except queue.Empty:
                continue

            if batch is None:  # shutdown sentinel
                break

            try:
                batch_results = self.predictor.process_batch(batch)
            except Exception as e:
                print(f"GPU{self.gpu_id} Worker{self.worker_id} 处理出错: {e}")
                # Report the whole batch as failed instead of dropping it.
                batch_results = [
                    {
                        "image_path": Path(str(image_path)).name,
                        "error": str(e),
                        "success": False,
                        "processing_time": 0,
                        "gpu_id": self.gpu_id,
                    }
                    for image_path in batch
                ]

            self.result_queue.put(batch_results)

    def stop(self):
        """Ask the worker loop to exit and wake it with the sentinel."""
        self.running = False
        self.task_queue.put(None)
|
|
|
+
|
|
|
def parallel_process_with_dual_gpu(image_paths: List[str],
                                   batch_size: int = 4,
                                   workers_per_gpu: int = 2,
                                   pipeline_config: str = "PP-StructureV3",
                                   output_path: str = "./output") -> List[Dict[str, Any]]:
    """Process images with worker threads spread over up to two GPUs.

    The paths are split into batches that are handed round-robin to the
    workers; results are polled from the workers until every image has a
    result record.

    Args:
        image_paths: Paths of the images to process.
        batch_size: Number of images per batch handed to a worker (>= 1).
        workers_per_gpu: Number of worker threads started per GPU (>= 1).
        pipeline_config: Pipeline name or config path given to each worker.
        output_path: Base output directory (created if missing).

    Returns:
        The collected result records, in completion order.  The list is
        shorter than ``image_paths`` only if all worker threads exited
        before reporting every image.

    Raises:
        ValueError: If ``batch_size`` or ``workers_per_gpu`` is below 1.
        RuntimeError: If GPU detection succeeds but reports no GPU.
    """
    # A non-positive batch size would produce no batches (or an obscure
    # range() error) and zero workers a ZeroDivisionError; fail early instead.
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")
    if workers_per_gpu < 1:
        raise ValueError(f"workers_per_gpu must be >= 1, got {workers_per_gpu}")

    os.makedirs(output_path, exist_ok=True)

    image_paths = list(image_paths)
    if not image_paths:
        # Nothing to do: avoid starting threads for an empty job.
        return []

    # Detect the GPUs to use; at most GPU 0 and GPU 1 are used.
    try:
        gpu_count = paddle.device.cuda.device_count()
        print(f"检测到 {gpu_count} 个GPU")
        if gpu_count < 2:
            print("警告:检测到的GPU数量少于2个,建议检查CUDA配置")
        available_gpus = list(range(min(gpu_count, 2)))
    except Exception as e:
        print(f"GPU检测失败: {e}")
        available_gpus = [0]  # fall back to a single GPU

    if not available_gpus:
        raise RuntimeError("No GPU detected; at least one GPU is required")

    total_workers = len(available_gpus) * workers_per_gpu
    print(f"使用GPU: {available_gpus}")
    print(f"每GPU Worker数: {workers_per_gpu}")
    print(f"总Worker数: {total_workers}")

    batches = [image_paths[i:i + batch_size] for i in range(0, len(image_paths), batch_size)]

    workers = []
    threads = []
    all_results = []
    total_images = len(image_paths)
    gpu_stats = {}  # gpu_id -> {'success': n, 'total': n}, updated incrementally

    try:
        # Workers are created inside the try block so that the ones already
        # started are stopped if a later construction fails.
        for gpu_id in available_gpus:
            for _ in range(workers_per_gpu):
                worker_id = len(workers)
                worker = DualGPUThreadWorker(pipeline_config, output_path, gpu_id, worker_id)
                thread = threading.Thread(target=worker.worker_loop, name=f"GPU{gpu_id}_Worker{worker_id}")
                thread.daemon = True
                thread.start()
                workers.append(worker)
                threads.append(thread)

        print(f"启动了 {len(workers)} 个工作线程,分布在 {len(available_gpus)} 个GPU上")

        with tqdm(total=total_images, desc="双GPU处理图像", unit="张") as pbar:
            # Hand the batches to the workers round-robin.
            for i, batch in enumerate(batches):
                workers[i % len(workers)].add_batch(batch)

            while len(all_results) < total_images:
                time.sleep(0.1)

                # Sample liveness before draining: anything a thread posted
                # before it exited is then guaranteed to be collected below.
                any_alive = any(thread.is_alive() for thread in threads)

                for worker in workers:
                    batch_results = worker.get_results()
                    if not batch_results:
                        continue

                    all_results.extend(batch_results)
                    pbar.update(len(batch_results))

                    success_count = sum(1 for r in batch_results if r.get('success', False))

                    for r in batch_results:
                        stat = gpu_stats.setdefault(r.get('gpu_id', 'unknown'), {'success': 0, 'total': 0})
                        stat['total'] += 1
                        if r.get('success', False):
                            stat['success'] += 1

                    gpu_info = ', '.join([f"GPU{k}:{v['success']}/{v['total']}" for k, v in gpu_stats.items()])

                    pbar.set_postfix({
                        'recent_success': f"{success_count}/{len(batch_results)}",
                        'gpu_distribution': gpu_info
                    })

                if not any_alive and len(all_results) < total_images:
                    # No thread is left to produce the missing results.
                    missing = total_images - len(all_results)
                    print(f"警告: 所有工作线程已退出, 仍有 {missing} 张图像没有结果")
                    break

    finally:
        for worker in workers:
            worker.stop()

        # Threads are daemons, so a bounded wait is enough here.
        for thread in threads:
            thread.join(timeout=3.0)

    return all_results
|
|
|
+
|
|
|
def monitor_gpu_memory():
    """Print allocated and reserved memory (in GB) for GPU 0 and, if present, GPU 1.

    The memory is queried per device id, so the current device of the process
    is left untouched.  Any failure is reported on stdout and not raised.
    """
    try:
        # Only as many GPUs as actually exist, capped at the two used here.
        gpu_count = min(paddle.device.cuda.device_count(), 2)
        for gpu_id in range(gpu_count):
            allocated = paddle.device.cuda.memory_allocated(gpu_id) / 1024**3
            reserved = paddle.device.cuda.memory_reserved(gpu_id) / 1024**3
            print(f"GPU {gpu_id} - 已分配: {allocated:.2f}GB, 已预留: {reserved:.2f}GB")
    except Exception as e:
        print(f"GPU内存监控失败: {e}")
|
|
|
+
|
|
|
def main():
    """Run the OmniDocBench images through the dual-GPU pipeline and report.

    Collects the image files of the dataset directory, processes them with
    parallel_process_with_dual_gpu(), writes the per-image results together
    with summary statistics to a JSON file and prints the summary.
    """
    # Run configuration
    pipeline_config = "PP-StructureV3"
    batch_size = 4        # images per batch
    workers_per_gpu = 2   # worker threads per GPU
    dataset_path = "../../OmniDocBench/OpenDataLab___OmniDocBench/images"
    output_dir = "./OmniDocBench_Results_DualGPU"

    print(f"输出目录: {Path(output_dir).absolute()}")
    os.makedirs(output_dir, exist_ok=True)

    dataset_path = Path(dataset_path).resolve()
    output_dir = Path(output_dir).resolve()

    separator = "=" * 60
    worker_total = workers_per_gpu * 2

    print(separator)
    print("OmniDocBench 双GPU优化并行处理开始")
    print(separator)
    print(f"数据集路径: {dataset_path}")
    print(f"输出目录: {output_dir}")
    print(f"批处理大小: {batch_size}")
    print(f"每GPU Worker数: {workers_per_gpu}")
    print(f"总Worker数: {worker_total}")

    print("\n初始GPU内存状态:")
    monitor_gpu_memory()

    # Gather the images, one glob pattern after the other.
    image_files = [
        found
        for pattern in ('*.jpg', '*.jpeg', '*.png', '*.bmp', '*.tiff')
        for found in glob.glob(os.path.join(dataset_path, pattern))
    ]
    file_count = len(image_files)

    print(f"\n找到 {file_count} 个图像文件")

    if file_count == 0:
        print("未找到任何图像文件,程序终止")
        return

    # For a quick trial run, truncate image_files here (e.g. to the first 40).

    start_time = time.time()

    try:
        print("\n使用双GPU优化并行处理...")
        results = parallel_process_with_dual_gpu(
            image_files,
            batch_size,
            workers_per_gpu,
            pipeline_config,
            str(output_dir)
        )

        total_time = time.time() - start_time

        # Overall figures
        succeeded = [r for r in results if r.get('success', False)]
        success_count = len(succeeded)
        error_count = len(results) - success_count
        total_processing_time = sum(r.get('processing_time', 0) for r in succeeded)
        if success_count > 0:
            avg_processing_time = total_processing_time / success_count
        else:
            avg_processing_time = 0

        # Per-GPU figures
        gpu_stats = {}
        for record in results:
            bucket = gpu_stats.setdefault(
                record.get('gpu_id', 'unknown'),
                {'success': 0, 'total': 0, 'total_time': 0},
            )
            bucket['total'] += 1
            if record.get('success', False):
                bucket['success'] += 1
                bucket['total_time'] += record.get('processing_time', 0)

        success_rate = success_count / file_count
        throughput = file_count / total_time

        stats = {
            "total_files": file_count,
            "success_count": success_count,
            "error_count": error_count,
            "success_rate": success_rate,
            "total_time": total_time,
            "avg_processing_time": avg_processing_time,
            "throughput": throughput,
            "batch_size": batch_size,
            "workers_per_gpu": workers_per_gpu,
            "total_workers": worker_total,
            "gpu_stats": gpu_stats,
            "optimization": "双GPU多线程并行"
        }

        # Persist results and statistics together
        output_file = os.path.join(
            output_dir,
            f"OmniDocBench_DualGPU_batch{batch_size}_workers{workers_per_gpu}.json",
        )
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump({"results": results, "stats": stats}, f, ensure_ascii=False, indent=2)

        print("\n" + separator)
        print("双GPU优化并行处理完成!")
        print(separator)
        print(f"总文件数: {file_count}")
        print(f"成功处理: {success_count}")
        print(f"失败数量: {error_count}")
        print(f"成功率: {success_rate * 100:.2f}%")
        print(f"总耗时: {total_time:.2f}秒")
        print(f"平均处理时间: {avg_processing_time:.2f}秒/张")
        print(f"吞吐量: {throughput:.2f}张/秒")
        print(f"Worker数: {worker_total} (每GPU {workers_per_gpu}个)")

        print("\nGPU分布统计:")
        for gpu_id, stat in gpu_stats.items():
            if stat['total'] <= 0:
                continue
            gpu_success_rate = stat['success'] / stat['total'] * 100
            if stat['success'] > 0:
                gpu_avg_time = stat['total_time'] / stat['success']
            else:
                gpu_avg_time = 0
            print(f" GPU {gpu_id}: {stat['success']}/{stat['total']} 成功 "
                  f"({gpu_success_rate:.1f}%), 平均 {gpu_avg_time:.2f}s/张")

        print(f"\n结果保存至: {output_file}")

        print("\n最终GPU内存状态:")
        monitor_gpu_memory()

    except Exception as e:
        # Top-level boundary: report the failure with its traceback.
        print(f"处理过程中发生错误: {e!s}")
        traceback.print_exc()
|
|
|
+
|
|
|
# Script entry point: run the benchmark only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
|