|
|
@@ -1,451 +0,0 @@
|
|
|
-# zhch/ppstructurev3_multi_gpu_multiprocess_official.py
|
|
|
-"""
|
|
|
-多GPU多进程推理始终有问题,多个进程启动后,paddle底层报错
|
|
|
-目前无法定位原因
|
|
|
-"""
|
|
|
-import json
|
|
|
-import time
|
|
|
-import os
|
|
|
-import glob
|
|
|
-import traceback
|
|
|
-import argparse
|
|
|
-import sys
|
|
|
-from pathlib import Path
|
|
|
-from typing import List, Dict, Any, Tuple
|
|
|
-from multiprocessing import Manager, Process, Queue
|
|
|
-from queue import Empty
|
|
|
-import cv2
|
|
|
-import numpy as np
|
|
|
-from paddlex import create_pipeline
|
|
|
-from paddlex.utils.device import constr_device, parse_device
|
|
|
-from tqdm import tqdm
|
|
|
-# import paddle # ❌ 不要在主模块导入paddle
|
|
|
-# from cuda_utils import detect_available_gpus, monitor_gpu_memory # ❌ 不要在主进程使用
|
|
|
-
|
|
|
-from dotenv import load_dotenv
|
|
|
-load_dotenv(override=True)
|
|
|
-
|
|
|
-def worker(pipeline_name_or_config_path: str,
|
|
|
- device: str,
|
|
|
- task_queue: Queue,
|
|
|
- result_queue: Queue,
|
|
|
- batch_size: int,
|
|
|
- output_dir: str,
|
|
|
- worker_id: int):
|
|
|
- """
|
|
|
- 工作进程函数 - 基于官方parallel_inference.md实现
|
|
|
-
|
|
|
- Args:
|
|
|
- pipeline_name_or_config_path: Pipeline名称或配置路径
|
|
|
- device: 设备字符串
|
|
|
- task_queue: 任务队列
|
|
|
- result_queue: 结果队列
|
|
|
- batch_size: 批处理大小
|
|
|
- output_dir: 输出目录
|
|
|
- worker_id: 工作进程ID
|
|
|
- """
|
|
|
- try:
|
|
|
- # 在子进程中导入paddle,避免主进程CUDA冲突
|
|
|
- import paddle
|
|
|
- import os
|
|
|
-
|
|
|
- # 设置子进程的CUDA设备
|
|
|
- device_id = device.split(':')[1] if ':' in device else '0'
|
|
|
- os.environ['CUDA_VISIBLE_DEVICES'] = device_id
|
|
|
-
|
|
|
- # 设置paddle使用单精度,避免混合精度问题
|
|
|
- # paddle.set_default_dtype("float32")
|
|
|
-
|
|
|
- # 清理GPU缓存
|
|
|
- if paddle.device.cuda.device_count() > 0:
|
|
|
- paddle.device.cuda.empty_cache()
|
|
|
-
|
|
|
- # 直接创建pipeline,让PaddleX自动处理设备初始化
|
|
|
- pipeline = create_pipeline(pipeline_name_or_config_path, device=device)
|
|
|
- print(f"Worker {worker_id} initialized with device {device}")
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- print(f"Worker {worker_id} ({device}) initialization failed: {e}", file=sys.stderr)
|
|
|
- traceback.print_exc()
|
|
|
- # 发送错误信息到结果队列
|
|
|
- result_queue.put([{
|
|
|
- "error": f"Worker initialization failed: {str(e)}",
|
|
|
- "worker_id": worker_id,
|
|
|
- "device": device,
|
|
|
- "success": False
|
|
|
- }])
|
|
|
- return
|
|
|
-
|
|
|
- try:
|
|
|
- should_end = False
|
|
|
- batch = []
|
|
|
- processed_count = 0
|
|
|
-
|
|
|
- while not should_end:
|
|
|
- try:
|
|
|
- input_path = task_queue.get_nowait()
|
|
|
- except Empty:
|
|
|
- should_end = True
|
|
|
- except Exception as e:
|
|
|
- # 处理其他可能的异常
|
|
|
- print(f"Unexpected error while getting task: {e}", file=sys.stderr)
|
|
|
- traceback.print_exc()
|
|
|
- should_end = True
|
|
|
- else:
|
|
|
- # 检查是否为结束信号
|
|
|
- if input_path is None:
|
|
|
- should_end = True
|
|
|
- else:
|
|
|
- batch.append(input_path)
|
|
|
-
|
|
|
- if batch and (len(batch) == batch_size or should_end):
|
|
|
- try:
|
|
|
- start_time = time.time()
|
|
|
-
|
|
|
- # 使用pipeline预测
|
|
|
- results = pipeline.predict(
|
|
|
- batch,
|
|
|
- use_doc_orientation_classify=True,
|
|
|
- use_doc_unwarping=False,
|
|
|
- use_seal_recognition=True,
|
|
|
- use_chart_recognition=True,
|
|
|
- use_table_recognition=True,
|
|
|
- use_formula_recognition=True,
|
|
|
- )
|
|
|
-
|
|
|
- batch_processing_time = time.time() - start_time
|
|
|
- batch_results = []
|
|
|
-
|
|
|
- for result in results:
|
|
|
- try:
|
|
|
- input_path = Path(result["input_path"])
|
|
|
-
|
|
|
- # 保存结果
|
|
|
- if result.get("page_index") is not None:
|
|
|
- output_filename = f"{input_path.stem}_{result['page_index']}"
|
|
|
- else:
|
|
|
- output_filename = f"{input_path.stem}"
|
|
|
-
|
|
|
- # 保存JSON和Markdown
|
|
|
- json_output_path = str(Path(output_dir, f"{output_filename}.json"))
|
|
|
- md_output_path = str(Path(output_dir, f"{output_filename}.md"))
|
|
|
-
|
|
|
- result.save_to_json(json_output_path)
|
|
|
- result.save_to_markdown(md_output_path)
|
|
|
-
|
|
|
- # 记录处理结果
|
|
|
- batch_results.append({
|
|
|
- "image_path": input_path.name,
|
|
|
- "processing_time": batch_processing_time / len(batch), # 平均时间
|
|
|
- "success": True,
|
|
|
- "device": device,
|
|
|
- "worker_id": worker_id,
|
|
|
- "output_json": json_output_path,
|
|
|
- "output_md": md_output_path
|
|
|
- })
|
|
|
-
|
|
|
- processed_count += 1
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- traceback.print_exc()
|
|
|
- batch_results.append({
|
|
|
- "image_path": Path(result["input_path"]).name,
|
|
|
- "processing_time": 0,
|
|
|
- "success": False,
|
|
|
- "device": device,
|
|
|
- "worker_id": worker_id,
|
|
|
- "error": str(e)
|
|
|
- })
|
|
|
-
|
|
|
- # 将结果放入结果队列
|
|
|
- result_queue.put(batch_results)
|
|
|
-
|
|
|
- # print(f"Worker {worker_id} ({device}) processed batch of {len(batch)} files. Total: {processed_count}")
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- # 批处理失败
|
|
|
- error_results = []
|
|
|
- for img_path in batch:
|
|
|
- error_results.append({
|
|
|
- "image_path": Path(img_path).name,
|
|
|
- "processing_time": 0,
|
|
|
- "success": False,
|
|
|
- "device": device,
|
|
|
- "worker_id": worker_id,
|
|
|
- "error": str(e)
|
|
|
- })
|
|
|
- result_queue.put(error_results)
|
|
|
-
|
|
|
- print(f"Error processing batch {batch} on {device}: {e}", file=sys.stderr)
|
|
|
- traceback.print_exc()
|
|
|
-
|
|
|
- batch.clear()
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- print(f"Worker {worker_id} ({device}) initialization failed: {e}", file=sys.stderr)
|
|
|
- traceback.print_exc()
|
|
|
- finally:
|
|
|
- # 清理GPU缓存
|
|
|
- try:
|
|
|
- paddle.device.cuda.empty_cache()
|
|
|
- except Exception as e:
|
|
|
- print(f"Error clearing GPU cache: {e}", file=sys.stderr)
|
|
|
- print(f"Worker {worker_id} ({device}) finished")
|
|
|
-
|
|
|
-def parallel_process_with_official_approach(image_paths: List[str],
|
|
|
- pipeline_name: str = "PP-StructureV3",
|
|
|
- device_str: str = "gpu:0,1",
|
|
|
- instances_per_device: int = 1,
|
|
|
- batch_size: int = 1,
|
|
|
- output_dir: str = "./output") -> List[Dict[str, Any]]:
|
|
|
- """
|
|
|
- 使用官方推荐的方法进行多GPU多进程并行处理
|
|
|
-
|
|
|
- Args:
|
|
|
- image_paths: 图像路径列表
|
|
|
- pipeline_name: Pipeline名称
|
|
|
- device_str: 设备字符串,如"gpu:0,1,2,3"
|
|
|
- instances_per_device: 每个设备的实例数
|
|
|
- batch_size: 批处理大小
|
|
|
- output_dir: 输出目录
|
|
|
-
|
|
|
- Returns:
|
|
|
- 处理结果列表
|
|
|
- """
|
|
|
- # 创建输出目录
|
|
|
- output_path = Path(output_dir)
|
|
|
- output_path.mkdir(parents=True, exist_ok=True)
|
|
|
-
|
|
|
- # 解析设备 - 不要在主进程中初始化paddle
|
|
|
- try:
|
|
|
- device_type, device_ids = parse_device(device_str)
|
|
|
- if device_ids is None or len(device_ids) < 1:
|
|
|
- print("No valid devices specified.", file=sys.stderr)
|
|
|
- return []
|
|
|
-
|
|
|
- print(f"Parsed devices: {device_type}:{device_ids}")
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- print(f"Failed to parse device string '{device_str}': {e}", file=sys.stderr)
|
|
|
- return []
|
|
|
-
|
|
|
- # 验证批处理大小
|
|
|
- if batch_size <= 0:
|
|
|
- print("Batch size must be greater than 0.", file=sys.stderr)
|
|
|
- return []
|
|
|
-
|
|
|
- total_instances = len(device_ids) * instances_per_device
|
|
|
- print(f"Configuration:")
|
|
|
- print(f" Devices: {device_ids}")
|
|
|
- print(f" Instances per device: {instances_per_device}")
|
|
|
- print(f" Total instances: {total_instances}")
|
|
|
- print(f" Batch size: {batch_size}")
|
|
|
- print(f" Total images: {len(image_paths)}")
|
|
|
-
|
|
|
- # 使用Manager创建队列
|
|
|
- with Manager() as manager:
|
|
|
- task_queue = manager.Queue()
|
|
|
- result_queue = manager.Queue()
|
|
|
-
|
|
|
- # 将任务放入队列
|
|
|
- for img_path in image_paths:
|
|
|
- task_queue.put(str(img_path))
|
|
|
-
|
|
|
- print(f"Added {len(image_paths)} tasks to queue")
|
|
|
-
|
|
|
- # 创建并启动工作进程
|
|
|
- processes = []
|
|
|
- worker_id = 0
|
|
|
-
|
|
|
- for device_id in device_ids:
|
|
|
- for _ in range(instances_per_device):
|
|
|
- device = constr_device(device_type, [device_id])
|
|
|
- p = Process(
|
|
|
- target=worker,
|
|
|
- args=(
|
|
|
- pipeline_name,
|
|
|
- device,
|
|
|
- task_queue,
|
|
|
- result_queue,
|
|
|
- batch_size,
|
|
|
- str(output_path),
|
|
|
- worker_id,
|
|
|
- ),
|
|
|
- )
|
|
|
- p.start()
|
|
|
- processes.append(p)
|
|
|
- worker_id += 1
|
|
|
-
|
|
|
- print(f"Started {len(processes)} worker processes")
|
|
|
-
|
|
|
- # 发送结束信号
|
|
|
- for _ in range(total_instances):
|
|
|
- task_queue.put(None)
|
|
|
-
|
|
|
- # 收集结果
|
|
|
- all_results = []
|
|
|
- total_images = len(image_paths)
|
|
|
-
|
|
|
- with tqdm(total=total_images, desc="Processing images", unit="img") as pbar:
|
|
|
- completed_count = 0
|
|
|
-
|
|
|
- while completed_count < total_images:
|
|
|
- try:
|
|
|
- batch_results = result_queue.get(timeout=600) # 10分钟超时
|
|
|
- all_results.extend(batch_results)
|
|
|
-
|
|
|
- # 更新进度条
|
|
|
- batch_success_count = sum(1 for r in batch_results if r.get('success', False))
|
|
|
- completed_count += len(batch_results)
|
|
|
- pbar.update(len(batch_results))
|
|
|
-
|
|
|
- # 显示当前批次状态
|
|
|
- pbar.set_postfix({
|
|
|
- 'batch_success': f"{batch_success_count}/{len(batch_results)}",
|
|
|
- 'total_success': f"{sum(1 for r in all_results if r.get('success', False))}/{completed_count}"
|
|
|
- })
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- print(f"Error collecting results: {e}")
|
|
|
- break
|
|
|
-
|
|
|
- # 等待所有进程结束
|
|
|
- for p in processes:
|
|
|
- p.join()
|
|
|
-
|
|
|
- return all_results
|
|
|
-
|
|
|
-def main():
|
|
|
- """主函数"""
|
|
|
- parser = argparse.ArgumentParser(description="PaddleX PP-StructureV3 Multi-GPU Parallel Processing")
|
|
|
-
|
|
|
- # 必需参数
|
|
|
- parser.add_argument("--input_dir", type=str, default="../../OmniDocBench/OpenDataLab___OmniDocBench/images", help="Input directory")
|
|
|
- parser.add_argument("--output_dir", type=str, default="./OmniDocBench_Results_Official", help="Output directory")
|
|
|
- parser.add_argument("--pipeline", type=str, default="PP-StructureV3", help="Pipeline name")
|
|
|
- parser.add_argument("--device", type=str, default="gpu:0", help="Device string")
|
|
|
- parser.add_argument("--instances_per_device", type=int, default=1, help="Instances per device")
|
|
|
- parser.add_argument("--batch_size", type=int, default=4, help="Batch size")
|
|
|
- parser.add_argument("--input_pattern", type=str, default="*", help="Input file pattern")
|
|
|
- parser.add_argument("--test_mode", action="store_true", help="Test mode (process only 20 images)")
|
|
|
-
|
|
|
- args = parser.parse_args()
|
|
|
-
|
|
|
- try:
|
|
|
- # 获取图像文件列表
|
|
|
- input_dir = Path(args.input_dir).resolve()
|
|
|
- output_dir = Path(args.output_dir).resolve()
|
|
|
- print(f"Input dir: {input_dir}, Output dir: {output_dir}")
|
|
|
- if not input_dir.exists():
|
|
|
- print(f"Input directory does not exist: {input_dir}")
|
|
|
- return 1
|
|
|
-
|
|
|
- # 查找图像文件
|
|
|
- image_extensions = ['.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif']
|
|
|
- image_files = []
|
|
|
- for ext in image_extensions:
|
|
|
- image_files.extend(list(input_dir.glob(f"*{ext}")))
|
|
|
- image_files.extend(list(input_dir.glob(f"*{ext.upper()}")))
|
|
|
-
|
|
|
- if not image_files:
|
|
|
- print(f"No image files found in {input_dir}")
|
|
|
- return 1
|
|
|
-
|
|
|
- image_files = [str(f) for f in image_files]
|
|
|
- print(f"Found {len(image_files)} image files")
|
|
|
-
|
|
|
- if args.test_mode:
|
|
|
- image_files = image_files[:20]
|
|
|
- print(f"Test mode: processing only {len(image_files)} images")
|
|
|
-
|
|
|
- # 开始处理
|
|
|
- start_time = time.time()
|
|
|
- results = parallel_process_with_official_approach(
|
|
|
- image_files,
|
|
|
- args.pipeline,
|
|
|
- args.device,
|
|
|
- args.instances_per_device,
|
|
|
- args.batch_size,
|
|
|
- str(output_dir)
|
|
|
- )
|
|
|
- total_time = time.time() - start_time
|
|
|
-
|
|
|
- # 统计结果
|
|
|
- success_count = sum(1 for r in results if r.get('success', False))
|
|
|
- error_count = len(results) - success_count
|
|
|
-
|
|
|
- print(f"\n" + "="*50)
|
|
|
- print(f"Processing completed!")
|
|
|
- print(f"Total files: {len(image_files)}")
|
|
|
- print(f"Successful: {success_count}")
|
|
|
- print(f"Failed: {error_count}")
|
|
|
- print(f"Success rate: {success_count / len(image_files) * 100:.2f}%")
|
|
|
- print(f"Total time: {total_time:.2f} seconds")
|
|
|
- print(f"Throughput: {len(image_files) / total_time:.2f} images/second")
|
|
|
-
|
|
|
- # 保存结果统计
|
|
|
- stats = {
|
|
|
- "total_files": len(image_files),
|
|
|
- "success_count": success_count,
|
|
|
- "error_count": error_count,
|
|
|
- "success_rate": success_count / len(image_files),
|
|
|
- "total_time": total_time,
|
|
|
- "throughput": len(image_files) / total_time,
|
|
|
- "batch_size": args.batch_size,
|
|
|
- "gpu_ids": args.device,
|
|
|
- "pipelines_per_gpu": args.instances_per_device
|
|
|
- }
|
|
|
-
|
|
|
- # 保存最终结果
|
|
|
- output_file = os.path.join(output_dir, f"OmniDocBench_MultiGPU_batch{args.batch_size}.json")
|
|
|
- final_results = {
|
|
|
- "stats": stats,
|
|
|
- "results": results
|
|
|
- }
|
|
|
-
|
|
|
- with open(output_file, 'w', encoding='utf-8') as f:
|
|
|
- json.dump(final_results, f, ensure_ascii=False, indent=2)
|
|
|
-
|
|
|
- return 0
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- print(f"Processing failed: {e}", file=sys.stderr)
|
|
|
- traceback.print_exc()
|
|
|
- return 1
|
|
|
-
|
|
|
-if __name__ == "__main__":
|
|
|
- # ❌ 移除所有主进程CUDA操作
|
|
|
- # print(f"🚀 启动OCR程序...")
|
|
|
- # print(f"CUDA 版本: {paddle.device.cuda.get_device_name()}")
|
|
|
- # print(f"CUDA_VISIBLE_DEVICES: {os.environ.get('CUDA_VISIBLE_DEVICES')}")
|
|
|
- # available_gpus = detect_available_gpus()
|
|
|
- # monitor_gpu_memory(available_gpus)
|
|
|
-
|
|
|
- # ✅ 只进行简单的环境检查
|
|
|
- print(f"🚀 启动OCR程序...")
|
|
|
- print(f"CUDA_VISIBLE_DEVICES: {os.environ.get('CUDA_VISIBLE_DEVICES')}")
|
|
|
-
|
|
|
- if len(sys.argv) == 1:
|
|
|
- # 如果没有命令行参数,使用默认配置运行
|
|
|
- print("No command line arguments provided. Running with default configuration...")
|
|
|
-
|
|
|
- # 默认配置
|
|
|
- default_config = {
|
|
|
- "input_dir": "../../OmniDocBench/OpenDataLab___OmniDocBench/images",
|
|
|
- "output_dir": "./OmniDocBench_Results_Official",
|
|
|
- "pipeline": "PP-StructureV3",
|
|
|
- "device": "gpu:3",
|
|
|
- "instances_per_device": 1,
|
|
|
- "batch_size": 1,
|
|
|
- # "test_mode": False
|
|
|
- }
|
|
|
-
|
|
|
- # 构造参数
|
|
|
- sys.argv = [sys.argv[0]]
|
|
|
- for key, value in default_config.items():
|
|
|
- sys.argv.extend([f"--{key}", str(value)])
|
|
|
-
|
|
|
- # 测试模式
|
|
|
- sys.argv.append("--test_mode")
|
|
|
-
|
|
|
- sys.exit(main())
|