|
|
@@ -1,512 +0,0 @@
|
|
|
-# zhch/ppstructurev3_multi_gpu_multiprocess.py
|
|
|
-import json
|
|
|
-import time
|
|
|
-import os
|
|
|
-import glob
|
|
|
-import traceback
|
|
|
-from pathlib import Path
|
|
|
-from typing import List, Dict, Any, Tuple
|
|
|
-from multiprocessing import Queue, Manager, Process
|
|
|
-import cv2
|
|
|
-import numpy as np
|
|
|
-from paddlex import create_pipeline
|
|
|
-from tqdm import tqdm
|
|
|
-import paddle
|
|
|
-
|
|
|
-from dotenv import load_dotenv
|
|
|
-load_dotenv(override=True)
|
|
|
-
|
|
|
-class PPStructureV3MultiGPUPredictor:
|
|
|
- """
|
|
|
- PP-StructureV3多GPU多进程预测器
|
|
|
- """
|
|
|
-
|
|
|
- def __init__(self, pipeline_config_path: str = "PP-StructureV3", output_path: str = "output", gpu_id: int = 0, process_id: int = 0):
|
|
|
- """
|
|
|
- 初始化预测器
|
|
|
-
|
|
|
- Args:
|
|
|
- pipeline_config_path: PaddleX pipeline配置文件路径
|
|
|
- output_path: 输出路径
|
|
|
- gpu_id: GPU设备ID
|
|
|
- process_id: 进程ID
|
|
|
- """
|
|
|
- self.pipeline_config = pipeline_config_path
|
|
|
- self.pipeline = None # 延迟初始化
|
|
|
- self.output_path = output_path
|
|
|
- self.gpu_id = gpu_id
|
|
|
- self.process_id = process_id
|
|
|
- self.device = f"gpu:{gpu_id}"
|
|
|
-
|
|
|
- def _ensure_pipeline(self):
|
|
|
- """确保pipeline已初始化"""
|
|
|
- if self.pipeline is None:
|
|
|
- try:
|
|
|
- # 设置当前GPU
|
|
|
- paddle.device.set_device(f"gpu:{self.gpu_id}")
|
|
|
- self.pipeline = create_pipeline(pipeline=self.pipeline_config)
|
|
|
- print(f"进程 {self.process_id} - Pipeline初始化完成 - GPU:{self.gpu_id}")
|
|
|
- except Exception as e:
|
|
|
- print(f"进程 {self.process_id} - Pipeline初始化失败 - GPU:{self.gpu_id}, 错误: {e}")
|
|
|
- raise e
|
|
|
-
|
|
|
- def process_single_image(self, image_path: str) -> Dict[str, Any]:
|
|
|
- """
|
|
|
- 处理单张图像
|
|
|
-
|
|
|
- Args:
|
|
|
- image_path: 图像路径
|
|
|
-
|
|
|
- Returns:
|
|
|
- 处理结果
|
|
|
- """
|
|
|
- try:
|
|
|
- # 确保pipeline已初始化
|
|
|
- self._ensure_pipeline()
|
|
|
-
|
|
|
- # 读取图像获取尺寸信息
|
|
|
- image = cv2.imread(image_path)
|
|
|
- if image is None:
|
|
|
- return {
|
|
|
- "image_path": Path(image_path).name,
|
|
|
- "error": "无法读取图像",
|
|
|
- "success": False,
|
|
|
- "processing_time": 0,
|
|
|
- "gpu_id": self.gpu_id,
|
|
|
- "process_id": self.process_id
|
|
|
- }
|
|
|
-
|
|
|
- # 运行PaddleX pipeline
|
|
|
- start_time = time.time()
|
|
|
-
|
|
|
- output = self.pipeline.predict(
|
|
|
- input=image_path,
|
|
|
- device=self.device,
|
|
|
- use_doc_orientation_classify=True,
|
|
|
- use_doc_unwarping=False,
|
|
|
- use_seal_recognition=True,
|
|
|
- use_chart_recognition=True,
|
|
|
- use_table_recognition=True,
|
|
|
- use_formula_recognition=True,
|
|
|
- )
|
|
|
-
|
|
|
- # 保存结果
|
|
|
- for res in output:
|
|
|
- res.save_to_json(save_path=self.output_path)
|
|
|
- res.save_to_markdown(save_path=self.output_path)
|
|
|
-
|
|
|
- process_time = time.time() - start_time
|
|
|
-
|
|
|
- # 返回处理结果
|
|
|
- return {
|
|
|
- "image_path": Path(image_path).name,
|
|
|
- "processing_time": process_time,
|
|
|
- "success": True,
|
|
|
- "gpu_id": self.gpu_id,
|
|
|
- "process_id": self.process_id
|
|
|
- }
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- return {
|
|
|
- "image_path": Path(image_path).name,
|
|
|
- "error": str(e),
|
|
|
- "success": False,
|
|
|
- "processing_time": 0,
|
|
|
- "gpu_id": self.gpu_id,
|
|
|
- "process_id": self.process_id
|
|
|
- }
|
|
|
-
|
|
|
- def process_batch(self, image_paths: List[str]) -> List[Dict[str, Any]]:
|
|
|
- """
|
|
|
- 批处理图像
|
|
|
-
|
|
|
- Args:
|
|
|
- image_paths: 图像路径列表
|
|
|
-
|
|
|
- Returns:
|
|
|
- 结果列表
|
|
|
- """
|
|
|
- results = []
|
|
|
-
|
|
|
- for image_path in image_paths:
|
|
|
- result = self.process_single_image(image_path)
|
|
|
- results.append(result)
|
|
|
-
|
|
|
- return results
|
|
|
-
|
|
|
-def multi_gpu_process_worker(process_id: int,
|
|
|
- gpu_id: int,
|
|
|
- task_queue: Queue,
|
|
|
- result_queue: Queue,
|
|
|
- pipeline_config: str,
|
|
|
- output_path: str):
|
|
|
- """
|
|
|
- 多GPU多进程工作函数
|
|
|
-
|
|
|
- Args:
|
|
|
- process_id: 进程ID
|
|
|
- gpu_id: GPU设备ID
|
|
|
- task_queue: 任务队列
|
|
|
- result_queue: 结果队列
|
|
|
- pipeline_config: pipeline配置
|
|
|
- output_path: 输出路径
|
|
|
- """
|
|
|
- try:
|
|
|
- # 每个进程创建自己的输出目录
|
|
|
- worker_output = f"{output_path}/gpu{gpu_id}_process_{process_id}"
|
|
|
- os.makedirs(worker_output, exist_ok=True)
|
|
|
-
|
|
|
- # 初始化预测器(每个进程只初始化一次)
|
|
|
- predictor = PPStructureV3MultiGPUPredictor(
|
|
|
- pipeline_config,
|
|
|
- output_path=worker_output,
|
|
|
- gpu_id=gpu_id,
|
|
|
- process_id=process_id
|
|
|
- )
|
|
|
-
|
|
|
- print(f"进程 {process_id} (GPU {gpu_id}) 初始化完成")
|
|
|
-
|
|
|
- # 持续处理任务
|
|
|
- while True:
|
|
|
- try:
|
|
|
- batch = task_queue.get(timeout=2.0)
|
|
|
- if batch is None: # 结束信号
|
|
|
- print(f"进程 {process_id} (GPU {gpu_id}) 收到结束信号")
|
|
|
- break
|
|
|
-
|
|
|
- # 处理批次
|
|
|
- batch_results = predictor.process_batch(batch)
|
|
|
- result_queue.put(batch_results)
|
|
|
-
|
|
|
- print(f"进程 {process_id} (GPU {gpu_id}) 完成批次处理: {len(batch)} 张图像")
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- print(f"进程 {process_id} (GPU {gpu_id}) 处理批次时出错: {e}")
|
|
|
- continue
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- print(f"进程 {process_id} (GPU {gpu_id}) 初始化失败: {e}")
|
|
|
- traceback.print_exc()
|
|
|
- finally:
|
|
|
- print(f"进程 {process_id} (GPU {gpu_id}) 结束")
|
|
|
-
|
|
|
-def parallel_process_with_multi_gpu(image_paths: List[str],
|
|
|
- batch_size: int = 4,
|
|
|
- gpu_ids: List[int] = [0, 1],
|
|
|
- pipelines_per_gpu: int = 1,
|
|
|
- pipeline_config: str = "PP-StructureV3",
|
|
|
- output_path: str = "./output") -> List[Dict[str, Any]]:
|
|
|
- """
|
|
|
- 使用多GPU多进程并行处理
|
|
|
-
|
|
|
- Args:
|
|
|
- image_paths: 图像路径列表
|
|
|
- batch_size: 批处理大小
|
|
|
- gpu_ids: 要使用的GPU ID列表
|
|
|
- pipelines_per_gpu: 每个GPU的pipeline实例数
|
|
|
- pipeline_config: pipeline配置
|
|
|
- output_path: 输出路径
|
|
|
-
|
|
|
- Returns:
|
|
|
- 处理结果列表
|
|
|
- """
|
|
|
- # 确保输出目录存在
|
|
|
- os.makedirs(output_path, exist_ok=True)
|
|
|
-
|
|
|
- # 检查可用GPU
|
|
|
- try:
|
|
|
- available_gpu_count = paddle.device.cuda.device_count()
|
|
|
- print(f"系统检测到 {available_gpu_count} 个GPU")
|
|
|
-
|
|
|
- # 验证指定的GPU是否可用
|
|
|
- valid_gpu_ids = []
|
|
|
- for gpu_id in gpu_ids:
|
|
|
- if gpu_id < available_gpu_count:
|
|
|
- valid_gpu_ids.append(gpu_id)
|
|
|
- else:
|
|
|
- print(f"警告:GPU {gpu_id} 不可用,跳过")
|
|
|
-
|
|
|
- if not valid_gpu_ids:
|
|
|
- print("错误:没有可用的GPU")
|
|
|
- return []
|
|
|
-
|
|
|
- gpu_ids = valid_gpu_ids
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- print(f"GPU检测失败: {e}")
|
|
|
- gpu_ids = [0] # 降级为单GPU
|
|
|
- pipelines_per_gpu = 1
|
|
|
-
|
|
|
- total_processes = len(gpu_ids) * pipelines_per_gpu
|
|
|
- print(f"使用GPU: {gpu_ids}")
|
|
|
- print(f"每GPU Pipeline数: {pipelines_per_gpu}")
|
|
|
- print(f"总进程数: {total_processes}")
|
|
|
-
|
|
|
- # 将图像路径分批
|
|
|
- batches = [image_paths[i:i + batch_size] for i in range(0, len(image_paths), batch_size)]
|
|
|
- print(f"总批次数: {len(batches)}")
|
|
|
-
|
|
|
- # 创建进程间通信队列
|
|
|
- manager = Manager()
|
|
|
- task_queue = manager.Queue()
|
|
|
- result_queue = manager.Queue()
|
|
|
-
|
|
|
- # 分发任务到队列
|
|
|
- for batch in batches:
|
|
|
- task_queue.put(batch)
|
|
|
- print(f"任务已分发到队列")
|
|
|
-
|
|
|
- # 启动工作进程
|
|
|
- processes = []
|
|
|
- process_id = 0
|
|
|
-
|
|
|
- for gpu_id in gpu_ids:
|
|
|
- for pipeline_idx in range(pipelines_per_gpu):
|
|
|
- p = Process(
|
|
|
- target=multi_gpu_process_worker,
|
|
|
- args=(process_id, gpu_id, task_queue, result_queue, pipeline_config, output_path),
|
|
|
- name=f"GPU{gpu_id}_Process{process_id}"
|
|
|
- )
|
|
|
- p.start()
|
|
|
- processes.append(p)
|
|
|
- process_id += 1
|
|
|
-
|
|
|
- print(f"启动了 {len(processes)} 个工作进程")
|
|
|
-
|
|
|
- # 发送结束信号
|
|
|
- for _ in range(total_processes):
|
|
|
- task_queue.put(None)
|
|
|
-
|
|
|
- # 收集结果
|
|
|
- all_results = []
|
|
|
- total_images = len(image_paths)
|
|
|
- completed_count = 0
|
|
|
-
|
|
|
- with tqdm(total=total_images, desc="多GPU多进程处理", unit="张") as pbar:
|
|
|
- # 等待所有结果
|
|
|
- expected_batches = len(batches)
|
|
|
- received_batches = 0
|
|
|
-
|
|
|
- while received_batches < expected_batches:
|
|
|
- try:
|
|
|
- batch_results = result_queue.get(timeout=60.0) # 增加超时时间
|
|
|
- all_results.extend(batch_results)
|
|
|
- completed_count += len(batch_results)
|
|
|
- received_batches += 1
|
|
|
-
|
|
|
- pbar.update(len(batch_results))
|
|
|
-
|
|
|
- # 更新进度条
|
|
|
- success_count = sum(1 for r in batch_results if r.get('success', False))
|
|
|
-
|
|
|
- # 按GPU统计
|
|
|
- gpu_stats = {}
|
|
|
- for r in all_results:
|
|
|
- gpu_id = r.get('gpu_id', 'unknown')
|
|
|
- if gpu_id not in gpu_stats:
|
|
|
- gpu_stats[gpu_id] = {'success': 0, 'total': 0}
|
|
|
- gpu_stats[gpu_id]['total'] += 1
|
|
|
- if r.get('success', False):
|
|
|
- gpu_stats[gpu_id]['success'] += 1
|
|
|
-
|
|
|
- gpu_info = ', '.join([f"GPU{k}:{v['success']}/{v['total']}" for k, v in gpu_stats.items()])
|
|
|
-
|
|
|
- pbar.set_postfix({
|
|
|
- 'batch_success': f"{success_count}/{len(batch_results)}",
|
|
|
- 'gpu_stats': gpu_info
|
|
|
- })
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- print(f"等待结果时出错: {e}")
|
|
|
- break
|
|
|
-
|
|
|
- # 等待所有进程结束
|
|
|
- print("等待所有进程结束...")
|
|
|
- for p in processes:
|
|
|
- p.join(timeout=10.0)
|
|
|
- if p.is_alive():
|
|
|
- print(f"强制终止进程: {p.name}")
|
|
|
- p.terminate()
|
|
|
-
|
|
|
- return all_results
|
|
|
-
|
|
|
-def detect_available_gpus() -> List[int]:
|
|
|
- """检测可用的GPU"""
|
|
|
- try:
|
|
|
- gpu_count = paddle.device.cuda.device_count()
|
|
|
- available_gpus = list(range(gpu_count))
|
|
|
- print(f"检测到 {gpu_count} 个可用GPU: {available_gpus}")
|
|
|
- return available_gpus
|
|
|
- except Exception as e:
|
|
|
- print(f"GPU检测失败: {e}")
|
|
|
- return []
|
|
|
-
|
|
|
-def main():
|
|
|
- """主函数 - 多GPU多进程并行处理"""
|
|
|
-
|
|
|
- # 配置参数
|
|
|
- dataset_path = "../../OmniDocBench/OpenDataLab___OmniDocBench/images"
|
|
|
- output_dir = "./OmniDocBench_Results_MultiGPU_MultiProcess"
|
|
|
- pipeline_config = "PP-StructureV3"
|
|
|
-
|
|
|
- # 多GPU多进程参数(可配置)
|
|
|
- batch_size = 4 # 批处理大小
|
|
|
- gpu_ids = [0, 1, 2, 3] # 指定使用的GPU ID列表 - 可修改
|
|
|
- pipelines_per_gpu = 1 # 每个GPU的pipeline实例数 - 可修改
|
|
|
-
|
|
|
- # 如果想要自动检测所有可用GPU,取消下面的注释
|
|
|
- # available_gpus = detect_available_gpus()
|
|
|
- # if available_gpus:
|
|
|
- # gpu_ids = available_gpus
|
|
|
-
|
|
|
- # 确保输出目录存在
|
|
|
- print(f"输出目录: {Path(output_dir).absolute()}")
|
|
|
- os.makedirs(output_dir, exist_ok=True)
|
|
|
-
|
|
|
- dataset_path = Path(dataset_path).resolve()
|
|
|
- output_dir = Path(output_dir).resolve()
|
|
|
-
|
|
|
- print("="*70)
|
|
|
- print("OmniDocBench 多GPU多进程并行处理开始")
|
|
|
- print("="*70)
|
|
|
- print(f"数据集路径: {dataset_path}")
|
|
|
- print(f"输出目录: {output_dir}")
|
|
|
- print(f"批处理大小: {batch_size}")
|
|
|
- print(f"指定GPU ID: {gpu_ids}")
|
|
|
- print(f"每GPU Pipeline数: {pipelines_per_gpu}")
|
|
|
- print(f"总进程数: {len(gpu_ids) * pipelines_per_gpu}")
|
|
|
-
|
|
|
- # 查找所有图像文件
|
|
|
- image_extensions = ['*.jpg', '*.jpeg', '*.png', '*.bmp', '*.tiff']
|
|
|
- image_files = []
|
|
|
-
|
|
|
- for ext in image_extensions:
|
|
|
- image_files.extend(glob.glob(os.path.join(dataset_path, ext)))
|
|
|
-
|
|
|
- print(f"\n找到 {len(image_files)} 个图像文件")
|
|
|
-
|
|
|
- if not image_files:
|
|
|
- print("未找到任何图像文件,程序终止")
|
|
|
- return
|
|
|
-
|
|
|
- # 限制处理数量用于测试
|
|
|
- # image_files = image_files[:20] # 取消注释以限制处理数量
|
|
|
-
|
|
|
- # 开始处理
|
|
|
- start_time = time.time()
|
|
|
-
|
|
|
- try:
|
|
|
- print(f"\n使用多GPU多进程并行处理...")
|
|
|
- print(f"处理配置: {len(gpu_ids)}个GPU, 每GPU {pipelines_per_gpu}个进程")
|
|
|
-
|
|
|
- results = parallel_process_with_multi_gpu(
|
|
|
- image_files,
|
|
|
- batch_size=batch_size,
|
|
|
- gpu_ids=gpu_ids,
|
|
|
- pipelines_per_gpu=pipelines_per_gpu,
|
|
|
- pipeline_config=pipeline_config,
|
|
|
- output_path=str(output_dir)
|
|
|
- )
|
|
|
-
|
|
|
- total_time = time.time() - start_time
|
|
|
-
|
|
|
- # 统计信息
|
|
|
- success_count = sum(1 for r in results if r.get('success', False))
|
|
|
- error_count = len(results) - success_count
|
|
|
- total_processing_time = sum(r.get('processing_time', 0) for r in results if r.get('success', False))
|
|
|
- avg_processing_time = total_processing_time / success_count if success_count > 0 else 0
|
|
|
-
|
|
|
- # 按GPU和进程统计
|
|
|
- gpu_stats = {}
|
|
|
- process_stats = {}
|
|
|
-
|
|
|
- for r in results:
|
|
|
- gpu_id = r.get('gpu_id', 'unknown')
|
|
|
- process_id = r.get('process_id', 'unknown')
|
|
|
-
|
|
|
- # GPU统计
|
|
|
- if gpu_id not in gpu_stats:
|
|
|
- gpu_stats[gpu_id] = {'success': 0, 'total': 0, 'total_time': 0}
|
|
|
- gpu_stats[gpu_id]['total'] += 1
|
|
|
- if r.get('success', False):
|
|
|
- gpu_stats[gpu_id]['success'] += 1
|
|
|
- gpu_stats[gpu_id]['total_time'] += r.get('processing_time', 0)
|
|
|
-
|
|
|
- # 进程统计
|
|
|
- if process_id not in process_stats:
|
|
|
- process_stats[process_id] = {'success': 0, 'total': 0, 'gpu_id': gpu_id}
|
|
|
- process_stats[process_id]['total'] += 1
|
|
|
- if r.get('success', False):
|
|
|
- process_stats[process_id]['success'] += 1
|
|
|
-
|
|
|
- # 保存结果统计
|
|
|
- stats = {
|
|
|
- "total_files": len(image_files),
|
|
|
- "success_count": success_count,
|
|
|
- "error_count": error_count,
|
|
|
- "success_rate": success_count / len(image_files),
|
|
|
- "total_time": total_time,
|
|
|
- "avg_processing_time": avg_processing_time,
|
|
|
- "throughput": len(image_files) / total_time,
|
|
|
- "batch_size": batch_size,
|
|
|
- "gpu_ids": gpu_ids,
|
|
|
- "pipelines_per_gpu": pipelines_per_gpu,
|
|
|
- "total_processes": len(gpu_ids) * pipelines_per_gpu,
|
|
|
- "gpu_stats": gpu_stats,
|
|
|
- "process_stats": process_stats,
|
|
|
- "optimization": "多GPU多进程并行"
|
|
|
- }
|
|
|
-
|
|
|
- # 保存最终结果
|
|
|
- output_file = os.path.join(output_dir, f"OmniDocBench_MultiGPU_batch{batch_size}_gpus{len(gpu_ids)}_ppg{pipelines_per_gpu}.json")
|
|
|
- final_results = {
|
|
|
- "configuration": {
|
|
|
- "gpu_ids": gpu_ids,
|
|
|
- "pipelines_per_gpu": pipelines_per_gpu,
|
|
|
- "batch_size": batch_size,
|
|
|
- "total_processes": len(gpu_ids) * pipelines_per_gpu
|
|
|
- },
|
|
|
- "results": results,
|
|
|
- "stats": stats
|
|
|
- }
|
|
|
-
|
|
|
- with open(output_file, 'w', encoding='utf-8') as f:
|
|
|
- json.dump(final_results, f, ensure_ascii=False, indent=2)
|
|
|
-
|
|
|
- print("\n" + "="*70)
|
|
|
- print("多GPU多进程并行处理完成!")
|
|
|
- print("="*70)
|
|
|
- print(f"总文件数: {len(image_files)}")
|
|
|
- print(f"成功处理: {success_count}")
|
|
|
- print(f"失败数量: {error_count}")
|
|
|
- print(f"成功率: {success_count / len(image_files) * 100:.2f}%")
|
|
|
- print(f"总耗时: {total_time:.2f}秒")
|
|
|
- print(f"平均处理时间: {avg_processing_time:.2f}秒/张")
|
|
|
- print(f"吞吐量: {len(image_files) / total_time:.2f}张/秒")
|
|
|
- print(f"配置: {len(gpu_ids)}个GPU, 每GPU {pipelines_per_gpu}个进程")
|
|
|
-
|
|
|
- # GPU统计
|
|
|
- print(f"\nGPU分布统计:")
|
|
|
- for gpu_id, stat in gpu_stats.items():
|
|
|
- if stat['total'] > 0:
|
|
|
- gpu_success_rate = stat['success'] / stat['total'] * 100
|
|
|
- gpu_avg_time = stat['total_time'] / stat['success'] if stat['success'] > 0 else 0
|
|
|
- print(f" GPU {gpu_id}: {stat['success']}/{stat['total']} 成功 "
|
|
|
- f"({gpu_success_rate:.1f}%), 平均 {gpu_avg_time:.2f}s/张")
|
|
|
-
|
|
|
- # 进程统计
|
|
|
- print(f"\n进程分布统计:")
|
|
|
- for process_id, stat in process_stats.items():
|
|
|
- if stat['total'] > 0:
|
|
|
- process_success_rate = stat['success'] / stat['total'] * 100
|
|
|
- print(f" 进程 {process_id} (GPU {stat['gpu_id']}): {stat['success']}/{stat['total']} "
|
|
|
- f"({process_success_rate:.1f}%)")
|
|
|
-
|
|
|
- print(f"\n结果保存至: {output_file}")
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- print(f"处理过程中发生错误: {str(e)}")
|
|
|
- traceback.print_exc()
|
|
|
-
|
|
|
-if __name__ == "__main__":
|
|
|
- main()
|