- """
- 多GPU多进程推理始终有问题,多个进程启动后,paddle底层报错
- 目前无法定位原因
- """

import json
import time
import os
import argparse
import sys
import subprocess
import tempfile
from pathlib import Path
from typing import List, Dict, Any, Tuple
from concurrent.futures import ProcessPoolExecutor, as_completed
import threading
from queue import Queue, Empty
from tqdm import tqdm

from utils import (
    get_image_files_from_dir,
    get_image_files_from_list,
    get_image_files_from_csv,
    split_files,
    create_temp_file_list,
    collect_pid_files
)
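
# Convention (see the startup banner in __main__): every worker writes its
# per-file statistics to <its output_dir>/process_<process_id>.json, and
# collect_processed_files() below relies on exactly that file name.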
def collect_processed_files(results: List[Dict[str, Any]]) -> List[Tuple[str, str]]:
    """
    Collect the per-file outcomes from the per-process results.

    Args:
        results: list of per-process result dicts

    Returns:
        List of (file_path, status) tuples.
    """
    processed_files = []
    for result in results:
        # Locate each process's result file via output_dir + process_id, e.g.:
        # {
        #     "process_id": 1,
        #     "success": true,
        #     "processing_time": 42.744526386260986,
        #     "file_count": 5,
        #     "device": "gpu:1",
        #     "output_dir": "/home/ubuntu/zhch/PaddleX/zhch/OmniDocBench_Results_Scheduler/process_1",
        #     ...
        # }
        pid_output_file = Path(result.get("output_dir", "")) / f"process_{result['process_id']}.json"
        if not result.get("success", False):
            # The whole process failed: mark every file in its chunk as failed
            process_failed_files = result.get("failed_files", [])
            processed_files.extend([(f, "fail") for f in process_failed_files if f])
        elif not pid_output_file.exists():
            print(f"⚠️ Warning: Output file not found for process {result['process_id']}: {pid_output_file}")
        else:
            pid_files = collect_pid_files(str(pid_output_file))
            processed_files.extend(pid_files)

    return processed_files
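
# Note: collect_pid_files() comes from utils; it is assumed to return a list of
# (image_path, status) tuples parsed from a process_<id>.json file, so its entries
# can be mixed with the ("<path>", "fail") tuples produced above.
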
def run_single_process(args: Tuple[List[str], Dict[str, Any], int]) -> Dict[str, Any]:
    """
    Run a single ppstructurev3_single_process.py worker as a subprocess.

    Args:
        args: (file_chunk, config, process_id)

    Returns:
        Dict describing the outcome of this process.
    """
    file_chunk, config, process_id = args

    if not file_chunk:
        return {"process_id": process_id, "success": False, "error": "Empty file chunk"}

    # Create a temporary file that lists this chunk's inputs
    temp_file_list = create_temp_file_list(file_chunk)

    # Build the process-specific output directory path up front so the exception
    # handlers below can always reference it, even if setup fails
    process_output_dir = Path(config["output_dir"]) / f"process_{process_id}_{time.strftime('%Y%m%d_%H%M%S')}"

    try:
        process_output_dir.mkdir(parents=True, exist_ok=True)

        # Build the command line for the worker script
        cmd = [
            sys.executable,
            config["single_process_script"],
            "--input_file_list", temp_file_list,  # the single-process script must support a file list
            "--output_dir", str(process_output_dir),
            "--pipeline", config["pipeline"],
            "--device", config["device"],
        ]
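        # The resulting command looks roughly like (paths illustrative):
        #   python ppstructurev3_single_process.py --input_file_list /tmp/tmpXXXX.txt \
        #       --output_dir <output_dir>/process_0_20250101_120000 \
        #       --pipeline PP-StructureV3 --device gpu:0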

        # Optional arguments
        if config.get("test_mode", False):
            cmd.append("--test_mode")

        print(f"Process {process_id} starting with {len(file_chunk)} files on device {config['device']}")

        # Run the worker subprocess
        start_time = time.time()
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=config.get("timeout", 3600)  # default: 1 hour timeout
        )

        processing_time = time.time() - start_time

        if result.returncode == 0:
            print(f"Process {process_id} completed successfully in {processing_time:.2f}s")

            # Collect the result files written by the worker
            result_files = list(process_output_dir.glob("*.json"))

            return {
                "process_id": process_id,
                "success": True,
                "processing_time": processing_time,
                "file_count": len(file_chunk),
                "device": config["device"],
                "output_dir": str(process_output_dir),
                "result_files": [str(f) for f in result_files],
                "stdout": result.stdout,
                "stderr": result.stderr
            }
        else:
            print(f"Process {process_id} failed with return code {result.returncode}")
            return {
                "process_id": process_id,
                "success": False,
                "error": f"Process failed with return code {result.returncode}",
                "processing_time": processing_time,
                "file_count": len(file_chunk),
                "device": config["device"],
                "output_dir": str(process_output_dir),
                "failed_files": [str(f) for f in file_chunk],
                "stdout": result.stdout,
                "stderr": result.stderr
            }

    except subprocess.TimeoutExpired:
        print(f"Process {process_id} timed out")
        return {
            "process_id": process_id,
            "success": False,
            "error": "Process timeout",
            "device": config["device"],
            "output_dir": str(process_output_dir),
            "failed_files": [str(f) for f in file_chunk]
        }
    except Exception as e:
        print(f"Process {process_id} error: {e}")
        return {
            "process_id": process_id,
            "success": False,
            "error": str(e),
            "device": config["device"],
            "output_dir": str(process_output_dir),
            "failed_files": [str(f) for f in file_chunk]
        }
    finally:
        # Clean up the temporary file list
        try:
            os.unlink(temp_file_list)
        except OSError:
            pass


def monitor_progress(total_files: int, completed_queue: Queue):
    """
    Monitor overall processing progress.
    """
    with tqdm(total=total_files, desc="Total Progress", unit="files") as pbar:
        completed_count = 0
        while completed_count < total_files:
            try:
                batch_count = completed_queue.get(timeout=1)
                completed_count += batch_count
                pbar.update(batch_count)
            except Empty:
                continue
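
# Note: only successful worker processes push their file counts onto the queue
# (see the result-collection loop in main()), so the bar may stop short of 100%
# when a process fails; the monitor runs as a daemon thread, so it never blocks exit.
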
def main():
    """Main entry point."""
    parser = argparse.ArgumentParser(description="PaddleX PP-StructureV3 Multi-Process Scheduler")

    # Input/output arguments
    input_group = parser.add_mutually_exclusive_group(required=True)
    input_group.add_argument("--input_dir", type=str, help="Input directory")
    input_group.add_argument("--input_file_list", type=str, help="Input file list (one file per line)")
    input_group.add_argument("--input_csv", type=str, help="Input CSV file with image_path and status columns")
    parser.add_argument("--output_dir", type=str, required=True, help="Output directory")
    parser.add_argument("--single_process_script", type=str,
                        default="./ppstructurev3_single_process.py",
                        help="Path to single process script")

    # Parallelism arguments
    parser.add_argument("--num_processes", type=int, default=4, help="Number of parallel processes")
    parser.add_argument("--devices", type=str, default="gpu:0,gpu:1,gpu:2,gpu:3",
                        help="Device list (comma separated)")

    # Pipeline arguments
    parser.add_argument("--pipeline", type=str, default="PP-StructureV3", help="Pipeline name")
    parser.add_argument("--timeout", type=int, default=3600, help="Process timeout in seconds")

    # Other arguments
    parser.add_argument("--test_mode", action="store_true", help="Test mode")
    parser.add_argument("--max_files", type=int, default=None, help="Maximum files to process")

    args = parser.parse_args()

    try:
        # Build the list of image files
        if args.input_csv:
            # Read from a CSV file, keeping only rows whose status is "fail"
            image_files = get_image_files_from_csv(args.input_csv, "fail")
            print(f"📊 Loaded {len(image_files)} files from CSV with status filter: fail")
        elif args.input_file_list:
            # Read from a plain file list
            image_files = get_image_files_from_list(args.input_file_list)
        else:
            # Read from a directory
            input_dir = Path(args.input_dir).resolve()
            print(f"📁 Input dir: {input_dir}")

            if not input_dir.exists():
                print(f"❌ Input directory does not exist: {input_dir}")
                return 1
            image_files = get_image_files_from_dir(input_dir, args.max_files)

        output_dir = Path(args.output_dir).resolve()
        print(f"Output dir: {output_dir}")

        # Limit the number of files
        if args.max_files:
            image_files = image_files[:args.max_files]

        if args.test_mode:
            image_files = image_files[:20]
            print(f"Test mode: processing only {len(image_files)} images")

        print(f"Found {len(image_files)} image files")

        # Parse the device list
        devices = [d.strip() for d in args.devices.split(',')]
        if len(devices) < args.num_processes:
            # Fewer devices than processes: reuse devices round-robin
            devices = devices * ((args.num_processes // len(devices)) + 1)
            devices = devices[:args.num_processes]
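            # e.g. --devices "gpu:0,gpu:1" with --num_processes 4 yields
            # [gpu:0, gpu:1, gpu:0, gpu:1]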

        print(f"Using {args.num_processes} processes with devices: {devices}")

        # Split the file list into per-process chunks
        file_chunks = split_files(image_files, args.num_processes)
        print(f"Split into {len(file_chunks)} chunks: {[len(chunk) for chunk in file_chunks]}")

        # Create the output directory
        output_dir.mkdir(parents=True, exist_ok=True)

        # Prepare per-process arguments
        process_configs = []
        for i, (chunk, device) in enumerate(zip(file_chunks, devices)):
            config = {
                "single_process_script": str(Path(args.single_process_script).resolve()),
                "output_dir": str(output_dir),
                "pipeline": args.pipeline,
                "device": device,
                "timeout": args.timeout,
                "test_mode": args.test_mode
            }
            process_configs.append((chunk, config, i))

        # Start the progress-monitoring thread
        completed_queue = Queue()
        progress_thread = threading.Thread(
            target=monitor_progress,
            args=(len(image_files), completed_queue)
        )
        progress_thread.daemon = True
        progress_thread.start()

        # Run the workers in parallel
        start_time = time.time()
        results = []

        with ProcessPoolExecutor(max_workers=args.num_processes) as executor:
            # Submit all tasks
            future_to_process = {
                executor.submit(run_single_process, config): i
                for i, config in enumerate(process_configs)
            }

            # Collect results as they complete
            for future in as_completed(future_to_process):
                process_id = future_to_process[future]
                try:
                    result = future.result()
                    results.append(result)

                    # Update progress
                    if result.get("success", False):
                        completed_queue.put(result.get("file_count", 0))

                    print(f"Process {process_id} finished: {result.get('success', False)}")

                except Exception as e:
                    print(f"Process {process_id} generated an exception: {e}")
                    results.append({
                        "process_id": process_id,
                        "success": False,
                        "error": str(e)
                    })

        total_time = time.time() - start_time

        # Summarize the results
        successful_processes = sum(1 for r in results if r.get('success', False))
        total_processed_files = sum(r.get('file_count', 0) for r in results if r.get('success', False))

        print(f"\n" + "="*60)
        print(f"🎉 Parallel processing completed!")
        print(f"📊 Statistics:")
        print(f"   Total processes: {len(results)}")
        print(f"   Successful processes: {successful_processes}")
        print(f"   Total files processed: {total_processed_files}/{len(image_files)}")
        if image_files:
            print(f"   Success rate: {total_processed_files/len(image_files)*100:.2f}%")
        print(f"⏱️ Performance:")
        print(f"   Total time: {total_time:.2f} seconds")
        if total_time > 0:
            print(f"   Throughput: {total_processed_files/total_time:.2f} files/second")
        if total_processed_files > 0:
            print(f"   Avg time per file: {total_time/total_processed_files:.2f} seconds")

        # Save the scheduler-level results
        scheduler_stats = {
            "total_files": len(image_files),
            "total_processes": len(results),
            "successful_processes": successful_processes,
            "total_processed_files": total_processed_files,
            "success_rate": total_processed_files / len(image_files) if len(image_files) > 0 else 0,
            "total_time": total_time,
            "throughput": total_processed_files / total_time if total_time > 0 else 0,
            "avg_time_per_file": total_time / total_processed_files if total_processed_files > 0 else 0,
            "num_processes": args.num_processes,
            "devices": devices,
            "pipeline": args.pipeline,
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
        }

        final_results = {
            "scheduler_stats": scheduler_stats,
            "process_results": results
        }

        # Write the combined results to disk
        output_file = output_dir / f"scheduler_results_{args.num_processes}procs_{time.strftime('%Y%m%d_%H%M%S')}.json"
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(final_results, f, ensure_ascii=False, indent=2)

        print(f"💾 Scheduler results saved to: {output_file}")

        # Collect per-file outcomes and write them to a CSV
        processed_files = collect_processed_files(results)
        output_file_processed = output_dir / f"processed_files_{args.num_processes}procs_{time.strftime('%Y%m%d_%H%M%S')}.csv"
        with open(output_file_processed, 'w', encoding='utf-8') as f:
            f.write("image_path,status\n")
            for file_path, status in processed_files:
                f.write(f"{file_path},{status}\n")
        print(f"💾 Processed files saved to: {output_file_processed}")

        return 0 if successful_processes == len(results) else 1

    except Exception as e:
        print(f"❌ Scheduler failed: {e}")
        import traceback
        traceback.print_exc()
        return 1


if __name__ == "__main__":
    print("🚀 Starting the multi-process scheduler... By convention each worker writes its per-process statistics to process_{process_id}.json")
    if len(sys.argv) == 1:
        # Default configuration used when the script is run without arguments
        default_config = {
            "input_dir": "../../OmniDocBench/OpenDataLab___OmniDocBench/images",
            "output_dir": "./OmniDocBench_Results_Scheduler",
            "pipeline": "./my_config/PP-StructureV3.yaml",
            "num_processes": 3,
            "devices": "gpu:0,gpu:1,gpu:2,gpu:3",
        }

        # default_config = {
        #     "input_csv": "./OmniDocBench_Results_Scheduler/processed_files_4procs_20250814_213138.csv",
        #     "output_dir": "./OmniDocBench_Results_Scheduler",
        #     "pipeline": "./my_config/PP-StructureV3.yaml",
        #     "num_processes": 4,
        #     "devices": "gpu:0,gpu:1,gpu:2,gpu:3",
        # }

        sys.argv = [sys.argv[0]]
        for key, value in default_config.items():
            sys.argv.extend([f"--{key}", str(value)])

        sys.argv.append("--test_mode")

    sys.exit(main())
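
# Example invocation from the command line (illustrative; the script file name is assumed):
#   python ppstructurev3_scheduler.py \
#       --input_dir ../../OmniDocBench/OpenDataLab___OmniDocBench/images \
#       --output_dir ./OmniDocBench_Results_Scheduler \
#       --pipeline ./my_config/PP-StructureV3.yaml \
#       --num_processes 4 --devices gpu:0,gpu:1,gpu:2,gpu:3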