# ppstructurev3_scheduler.py
  1. """
  2. 多GPU多进程推理始终有问题,多个进程启动后,paddle底层报错
  3. 目前无法定位原因
  4. """

import json
import time
import os
import argparse
import sys
import subprocess
import tempfile
from pathlib import Path
from typing import List, Dict, Any, Tuple
from concurrent.futures import ProcessPoolExecutor, as_completed
import threading
from queue import Queue, Empty
from tqdm import tqdm

from utils import (
    get_image_files_from_dir,
    get_image_files_from_list,
    get_image_files_from_csv,
    split_files,
    create_temp_file_list,
)
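
# The helpers imported from utils are defined elsewhere. Minimal sketches of
# the two the scheduler's correctness depends on, under assumed contracts
# (illustrative only; the real implementations live in utils.py):
#
#     def split_files(files: List[str], n: int) -> List[List[str]]:
#         """Split `files` into n near-equal contiguous chunks."""
#         chunk = (len(files) + n - 1) // n
#         return [files[i:i + chunk] for i in range(0, len(files), chunk)]
#
#     def create_temp_file_list(files: List[str]) -> str:
#         """Write one path per line to a temp file; return its path."""
#         fd, path = tempfile.mkstemp(suffix=".txt", text=True)
#         with os.fdopen(fd, "w", encoding="utf-8") as f:
#             f.write("\n".join(str(p) for p in files))
#         return path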


def collect_pid_files(pid_output_file: str) -> List[Tuple[str, str]]:
    """
    Collect files from a single process's output file.

    Args:
        pid_output_file: path to the per-process stats file

    Returns:
        List of (file path, processing status) tuples.

    The per-process stats file has the form:
        "results": [
            {
                "image_path": "docstructbench_dianzishu_zhongwenzaixian-o.O-61520612.pdf_140.jpg",
                "processing_time": 2.0265579223632812e-06,
                "success": true,
                "device": "gpu:3",
                "output_json": "/home/ubuntu/zhch/PaddleX/zhch/OmniDocBench_Results_Scheduler/process_3/docstructbench_dianzishu_zhongwenzaixian-o.O-61520612.pdf_140.json",
                "output_md": "/home/ubuntu/zhch/PaddleX/zhch/OmniDocBench_Results_Scheduler/process_3/docstructbench_dianzishu_zhongwenzaixian-o.O-61520612.pdf_140.md"
            },
            ...
        ]
    """
    if not Path(pid_output_file).exists():
        print(f"⚠️ Warning: PID output file not found: {pid_output_file}")
        return []

    with open(pid_output_file, 'r', encoding='utf-8') as f:
        data = json.load(f)

    if not isinstance(data, dict) or "results" not in data:
        print(f"⚠️ Warning: Invalid PID output file format: {pid_output_file}")
        return []

    # Map each entry to (path, status): "success" if "success" is true, else "fail"
    file_list = []
    for file_result in data.get("results", []):
        image_path = file_result.get("image_path", "")
        status = "success" if file_result.get("success", False) else "fail"
        file_list.append((image_path, status))
    return file_list
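
# Illustrative call (paths and file names hypothetical):
#     collect_pid_files(".../process_1_20250101_120000/process_1.json")
#     -> [("page_001.jpg", "success"), ("page_002.jpg", "fail"), ...]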


def collect_processed_files(results: List[Dict[str, Any]]) -> List[Tuple[str, str]]:
    """
    Collect files from the per-process results.

    Args:
        results: list of per-process result dicts

    Returns:
        List of (file path, processing status) tuples.
    """
    processed_files = []
    for result in results:
        # Locate each process's stats file via output_dir + process_id.
        # A per-process result dict looks like:
        # {
        #     "process_id": 1,
        #     "success": true,
        #     "processing_time": 42.744526386260986,
        #     "file_count": 5,
        #     "device": "gpu:1",
        #     "output_dir": "/home/ubuntu/zhch/PaddleX/zhch/OmniDocBench_Results_Scheduler/process_1",
        #     ...
        # }
        pid_output_file = Path(result.get("output_dir", "")) / f"process_{result['process_id']}.json"
        if not pid_output_file.exists():
            print(f"⚠️ Warning: Output file not found for process {result['process_id']}: {pid_output_file}")
        if not result.get("success", False):
            # The whole process failed: mark every file in its chunk as failed
            process_failed_files = result.get("failed_files", [])
            processed_files.extend([(f, "fail") for f in process_failed_files if f])
        else:
            pid_files = collect_pid_files(str(pid_output_file))
            processed_files.extend(pid_files)
    return processed_files
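
# Note: the stats file name process_{process_id}.json is a convention shared
# with ppstructurev3_single_process.py (see the startup banner in __main__).
# run_single_process() below creates a timestamped directory per process, and
# the single-process script is expected to drop its stats file there.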


def run_single_process(args: Tuple[List[str], Dict[str, Any], int]) -> Dict[str, Any]:
    """
    Run a single ppstructurev3_single_process.py subprocess.

    Args:
        args: (file_chunk, config, process_id)

    Returns:
        A dict describing the outcome of the subprocess run.
    """
    file_chunk, config, process_id = args
    if not file_chunk:
        return {"process_id": process_id, "success": False, "error": "Empty file chunk"}

    # Create a temporary file listing this chunk's inputs
    temp_file_list = create_temp_file_list(file_chunk)
    # Compute the process-specific output directory before entering the try
    # block so the exception handlers below can always reference it
    process_output_dir = Path(config["output_dir"]) / f"process_{process_id}_{time.strftime('%Y%m%d_%H%M%S')}"
    try:
        process_output_dir.mkdir(parents=True, exist_ok=True)

        # Build the command line
        cmd = [
            sys.executable,
            config["single_process_script"],
            "--input_file_list", temp_file_list,  # the single-process script must accept a file list
            "--output_dir", str(process_output_dir),
            "--pipeline", config["pipeline"],
            "--device", config["device"],
            "--batch_size", str(config["batch_size"]),
        ]
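
        # Illustrative resulting invocation (paths are examples):
        #     python ppstructurev3_single_process.py \
        #         --input_file_list /tmp/tmpab12cd.txt \
        #         --output_dir <output_dir>/process_0_20250101_120000 \
        #         --pipeline PP-StructureV3 --device gpu:0 --batch_size 4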

        # Optional flags
        if config.get("test_mode", False):
            cmd.append("--test_mode")

        print(f"Process {process_id} starting with {len(file_chunk)} files on device {config['device']}")

        # Run the subprocess
        start_time = time.time()
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=config.get("timeout", 3600)  # default: one hour
        )
        processing_time = time.time() - start_time

        if result.returncode == 0:
            print(f"Process {process_id} completed successfully in {processing_time:.2f}s")
            # Collect the result files written by the subprocess
            result_files = list(process_output_dir.glob("*.json"))
            return {
                "process_id": process_id,
                "success": True,
                "processing_time": processing_time,
                "file_count": len(file_chunk),
                "device": config["device"],
                "output_dir": str(process_output_dir),
                "result_files": [str(f) for f in result_files],
                "stdout": result.stdout,
                "stderr": result.stderr
            }
        else:
            print(f"Process {process_id} failed with return code {result.returncode}")
            return {
                "process_id": process_id,
                "success": False,
                "error": f"Process failed with return code {result.returncode}",
                "processing_time": processing_time,
                "file_count": len(file_chunk),
                "device": config["device"],
                "output_dir": str(process_output_dir),
                "failed_files": [str(f) for f in file_chunk],
                "stdout": result.stdout,
                "stderr": result.stderr
            }
    except subprocess.TimeoutExpired:
        print(f"Process {process_id} timed out")
        return {
            "process_id": process_id,
            "success": False,
            "error": "Process timeout",
            "device": config["device"],
            "output_dir": str(process_output_dir),
            "failed_files": [str(f) for f in file_chunk]
        }
    except Exception as e:
        print(f"Process {process_id} error: {e}")
        return {
            "process_id": process_id,
            "success": False,
            "error": str(e),
            "device": config["device"],
            "output_dir": str(process_output_dir),
            "failed_files": [str(f) for f in file_chunk]
        }
    finally:
        # Clean up the temporary file list
        try:
            os.unlink(temp_file_list)
        except OSError:
            pass


def monitor_progress(total_files: int, completed_queue: Queue):
    """Monitor overall processing progress."""
    with tqdm(total=total_files, desc="Total Progress", unit="files") as pbar:
        completed_count = 0
        while completed_count < total_files:
            try:
                batch_count = completed_queue.get(timeout=1)
                completed_count += batch_count
                pbar.update(batch_count)
            except Empty:
                continue
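
# Note: completed_queue is only fed from the main thread after an entire
# process finishes (see main() below), so the bar advances in chunk-sized
# jumps rather than per file.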


def main():
    """Entry point."""
    parser = argparse.ArgumentParser(description="PaddleX PP-StructureV3 Multi-Process Scheduler")

    # Input/output arguments
    input_group = parser.add_mutually_exclusive_group(required=True)
    input_group.add_argument("--input_dir", type=str, help="Input directory")
    input_group.add_argument("--input_file_list", type=str, help="Input file list (one file per line)")
    input_group.add_argument("--input_csv", type=str, help="Input CSV file with image_path and status columns")
    parser.add_argument("--output_dir", type=str, required=True, help="Output directory")
    parser.add_argument("--single_process_script", type=str,
                        default="./ppstructurev3_single_process.py",
                        help="Path to single process script")

    # Parallelism arguments
    parser.add_argument("--num_processes", type=int, default=4, help="Number of parallel processes")
    parser.add_argument("--devices", type=str, default="gpu:0,gpu:1,gpu:2,gpu:3",
                        help="Device list (comma separated)")

    # Pipeline arguments
    parser.add_argument("--pipeline", type=str, default="PP-StructureV3", help="Pipeline name")
    parser.add_argument("--batch_size", type=int, default=4, help="Batch size per process")
    parser.add_argument("--timeout", type=int, default=3600, help="Process timeout in seconds")

    # Other arguments
    parser.add_argument("--test_mode", action="store_true", help="Test mode")
    parser.add_argument("--max_files", type=int, default=None, help="Maximum files to process")

    args = parser.parse_args()

    try:
        # Build the list of input image files
        if args.input_csv:
            # Read from a CSV file, keeping only rows whose status is "fail"
            image_files = get_image_files_from_csv(args.input_csv, "fail")
            print(f"📊 Loaded {len(image_files)} files from CSV with status filter: fail")
        elif args.input_file_list:
            # Read from a plain file list
            image_files = get_image_files_from_list(args.input_file_list)
        else:
            # Read from a directory
            input_dir = Path(args.input_dir).resolve()
            print(f"📁 Input dir: {input_dir}")
            if not input_dir.exists():
                print(f"❌ Input directory does not exist: {input_dir}")
                return 1
            image_files = get_image_files_from_dir(input_dir, args.max_files)

        output_dir = Path(args.output_dir).resolve()
        print(f"Output dir: {output_dir}")

        # Cap the number of files
        if args.max_files:
            image_files = image_files[:args.max_files]
        if args.test_mode:
            image_files = image_files[:20]
            print(f"Test mode: processing only {len(image_files)} images")
        print(f"Found {len(image_files)} image files")

        # Parse the device list
        devices = [d.strip() for d in args.devices.split(',')]
        if len(devices) < args.num_processes:
            # Fewer devices than processes: reuse devices round-robin
            devices = devices * ((args.num_processes // len(devices)) + 1)
        devices = devices[:args.num_processes]
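        # e.g. --devices "gpu:0,gpu:1" with --num_processes 4
        # yields ['gpu:0', 'gpu:1', 'gpu:0', 'gpu:1']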
        print(f"Using {args.num_processes} processes with devices: {devices}")

        # Split the file list into per-process chunks
        file_chunks = split_files(image_files, args.num_processes)
        print(f"Split into {len(file_chunks)} chunks: {[len(chunk) for chunk in file_chunks]}")

        # Create the output directory
        output_dir.mkdir(parents=True, exist_ok=True)

        # Prepare per-process arguments
        process_configs = []
        for i, (chunk, device) in enumerate(zip(file_chunks, devices)):
            config = {
                "single_process_script": str(Path(args.single_process_script).resolve()),
                "output_dir": str(output_dir),
                "pipeline": args.pipeline,
                "device": device,
                "batch_size": args.batch_size,
                "timeout": args.timeout,
                "test_mode": args.test_mode
            }
            process_configs.append((chunk, config, i))

        # Start the progress monitor thread
        completed_queue = Queue()
        progress_thread = threading.Thread(
            target=monitor_progress,
            args=(len(image_files), completed_queue)
        )
        progress_thread.daemon = True
        progress_thread.start()

        # Run the chunks in parallel
        start_time = time.time()
        results = []
        with ProcessPoolExecutor(max_workers=args.num_processes) as executor:
            # Submit all tasks
            future_to_process = {
                executor.submit(run_single_process, config): i
                for i, config in enumerate(process_configs)
            }
            # Collect results as they complete
            for future in as_completed(future_to_process):
                process_id = future_to_process[future]
                try:
                    result = future.result()
                    results.append(result)
                    # Update the progress bar
                    if result.get("success", False):
                        completed_queue.put(result.get("file_count", 0))
                    print(f"Process {process_id} finished: {result.get('success', False)}")
                except Exception as e:
                    print(f"Process {process_id} generated an exception: {e}")
                    results.append({
                        "process_id": process_id,
                        "success": False,
                        "error": str(e)
                    })

        total_time = time.time() - start_time

        # Summarize the results
        successful_processes = sum(1 for r in results if r.get('success', False))
        total_processed_files = sum(r.get('file_count', 0) for r in results if r.get('success', False))

        print("\n" + "=" * 60)
        print("🎉 Parallel processing completed!")
        print("📊 Statistics:")
        print(f"  Total processes: {len(results)}")
        print(f"  Successful processes: {successful_processes}")
        print(f"  Total files processed: {total_processed_files}/{len(image_files)}")
        if image_files:
            print(f"  Success rate: {total_processed_files / len(image_files) * 100:.2f}%")
        print("⏱️ Performance:")
        print(f"  Total time: {total_time:.2f} seconds")
        if total_time > 0:
            print(f"  Throughput: {total_processed_files / total_time:.2f} files/second")
        if total_processed_files > 0:
            print(f"  Avg time per file: {total_time / total_processed_files:.2f} seconds")

        # Scheduler-level statistics (ratios guarded against division by zero)
        scheduler_stats = {
            "total_files": len(image_files),
            "total_processes": len(results),
            "successful_processes": successful_processes,
            "total_processed_files": total_processed_files,
            "success_rate": total_processed_files / len(image_files) if len(image_files) > 0 else 0,
            "total_time": total_time,
            "throughput": total_processed_files / total_time if total_time > 0 else 0,
            "avg_time_per_file": total_time / total_processed_files if total_processed_files > 0 else 0,
            "num_processes": args.num_processes,
            "devices": devices,
            "batch_size": args.batch_size,
            "pipeline": args.pipeline,
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
        }

        final_results = {
            "scheduler_stats": scheduler_stats,
            "process_results": results
        }

        # Save the scheduler results
        output_file = output_dir / f"scheduler_results_{args.num_processes}procs_{time.strftime('%Y%m%d_%H%M%S')}.json"
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(final_results, f, ensure_ascii=False, indent=2)
        print(f"💾 Scheduler results saved to: {output_file}")

        # Collect per-file processing results
        processed_files = collect_processed_files(results)
        output_file_processed = output_dir / f"processed_files_{args.num_processes}procs_{time.strftime('%Y%m%d_%H%M%S')}.csv"
        with open(output_file_processed, 'w', encoding='utf-8') as f:
            f.write("image_path,status\n")
            for file_path, status in processed_files:
                f.write(f"{file_path},{status}\n")
        print(f"💾 Processed files saved to: {output_file_processed}")
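
        # The CSV written above can be fed back in via --input_csv to retry
        # failed files; its shape (values hypothetical):
        #     image_path,status
        #     page_001.jpg,success
        #     page_002.jpg,fail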

        return 0 if successful_processes == len(results) else 1
    except Exception as e:
        print(f"❌ Scheduler failed: {e}")
        import traceback
        traceback.print_exc()
        return 1


if __name__ == "__main__":
    print("🚀 Starting the multi-process scheduler... By convention, each "
          "process writes its stats file as: process_{process_id}.json")
    if len(sys.argv) == 1:
        # Default configuration
        default_config = {
            "input_dir": "../../OmniDocBench/OpenDataLab___OmniDocBench/images",
            "output_dir": "./OmniDocBench_Results_Scheduler",
            "num_processes": 2,
            "devices": "gpu:0,gpu:1,gpu:2,gpu:3",
            "batch_size": 2,
        }
        # default_config = {
        #     "input_csv": "./OmniDocBench_Results_Scheduler/processed_files_4procs_20250814_213138.csv",
        #     "output_dir": "./OmniDocBench_Results_Scheduler",
        #     "num_processes": 4,
        #     "devices": "gpu:0,gpu:1,gpu:2,gpu:3",
        #     "batch_size": 2,
        # }
        sys.argv = [sys.argv[0]]
        for key, value in default_config.items():
            sys.argv.extend([f"--{key}", str(value)])
        sys.argv.append("--test_mode")
    sys.exit(main())