ppstructurev3_dual_gpu_optimized.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453
  1. # zhch/ppstructurev3_dual_gpu_optimized.py
  2. import json
  3. import time
  4. import os
  5. import glob
  6. import traceback
  7. from pathlib import Path
  8. from typing import List, Dict, Any, Tuple
  9. from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
  10. from multiprocessing import Queue, Manager, Process
  11. import cv2
  12. import numpy as np
  13. from paddlex import create_pipeline
  14. from tqdm import tqdm
  15. import threading
  16. import queue
  17. import paddle
  18. from dotenv import load_dotenv
# Load variables from a .env file into the process environment at import time.
# override=True makes values from the .env file replace variables that are
# already set in the environment (python-dotenv semantics).
load_dotenv(override=True)
  20. class PPStructureV3DualGPUPredictor:
  21. """
  22. PP-StructureV3双GPU并行预测器
  23. """
  24. def __init__(self, pipeline_config_path: str = "PP-StructureV3", output_path: str = "output", gpu_id: int = 0):
  25. """
  26. 初始化预测器
  27. Args:
  28. pipeline_config_path: PaddleX pipeline配置文件路径
  29. output_path: 输出路径
  30. gpu_id: GPU设备ID (0 或 1)
  31. """
  32. self.pipeline_config = pipeline_config_path
  33. self.pipeline = None # 延迟初始化
  34. self.output_path = output_path
  35. self.gpu_id = gpu_id
  36. self.device = f"gpu:{gpu_id}"
  37. def _ensure_pipeline(self):
  38. """确保pipeline已初始化(线程安全)"""
  39. if self.pipeline is None:
  40. # 设置当前GPU
  41. paddle.device.set_device(f"gpu:{self.gpu_id}")
  42. self.pipeline = create_pipeline(pipeline=self.pipeline_config)
  43. print(f"Pipeline初始化完成 - GPU:{self.gpu_id}")
  44. def process_single_image(self, image_path: str) -> Dict[str, Any]:
  45. """
  46. 处理单张图像
  47. Args:
  48. image_path: 图像路径
  49. Returns:
  50. 处理结果{"image_path": str, "success": bool, "processing_time": float, "error": str}
  51. """
  52. try:
  53. # 确保pipeline已初始化
  54. self._ensure_pipeline()
  55. # 读取图像获取尺寸信息
  56. image = cv2.imread(image_path)
  57. if image is None:
  58. return {
  59. "image_path": Path(image_path).name,
  60. "error": "无法读取图像",
  61. "success": False,
  62. "processing_time": 0,
  63. "gpu_id": self.gpu_id
  64. }
  65. height, width = image.shape[:2]
  66. # 运行PaddleX pipeline
  67. start_time = time.time()
  68. output = self.pipeline.predict(
  69. input=image_path,
  70. device=self.device,
  71. use_doc_orientation_classify=True,
  72. use_doc_unwarping=False,
  73. use_seal_recognition=True,
  74. use_chart_recognition=True,
  75. use_table_recognition=True,
  76. use_formula_recognition=True,
  77. )
  78. # 保存结果
  79. for res in output:
  80. res.save_to_json(save_path=self.output_path)
  81. res.save_to_markdown(save_path=self.output_path)
  82. process_time = time.time() - start_time
  83. # 返回处理结果
  84. return {
  85. "image_path": Path(image_path).name,
  86. "processing_time": process_time,
  87. "success": True,
  88. "gpu_id": self.gpu_id
  89. }
  90. except Exception as e:
  91. return {
  92. "image_path": Path(image_path).name,
  93. "error": str(e),
  94. "success": False,
  95. "processing_time": 0,
  96. "gpu_id": self.gpu_id
  97. }
  98. def process_batch(self, image_paths: List[str]) -> List[Dict[str, Any]]:
  99. """
  100. 批处理图像
  101. Args:
  102. image_paths: 图像路径列表
  103. Returns:
  104. 结果列表
  105. """
  106. results = []
  107. for image_path in image_paths:
  108. result = self.process_single_image(image_path)
  109. results.append(result)
  110. return results
  111. class DualGPUThreadWorker:
  112. """双GPU线程工作器 - 每个线程维护自己的pipeline实例"""
  113. def __init__(self, pipeline_config: str, output_path: str, gpu_id: int, worker_id: int):
  114. self.worker_id = worker_id
  115. self.gpu_id = gpu_id
  116. self.predictor = PPStructureV3DualGPUPredictor(
  117. pipeline_config,
  118. output_path=f"{output_path}/gpu{gpu_id}_worker_{worker_id}",
  119. gpu_id=gpu_id
  120. )
  121. self.task_queue = queue.Queue()
  122. self.result_queue = queue.Queue()
  123. self.running = True
  124. def add_batch(self, batch: List[str]):
  125. """添加批处理任务"""
  126. self.task_queue.put(batch)
  127. def get_results(self) -> List[Dict[str, Any]]:
  128. """获取处理结果"""
  129. results = []
  130. while not self.result_queue.empty():
  131. try:
  132. result = self.result_queue.get_nowait()
  133. results.extend(result)
  134. except queue.Empty:
  135. break
  136. return results
  137. def worker_loop(self):
  138. """工作循环"""
  139. print(f"GPU{self.gpu_id} Worker{self.worker_id} 开始工作")
  140. while self.running:
  141. try:
  142. batch = self.task_queue.get(timeout=1.0)
  143. if batch is None: # 结束信号
  144. break
  145. # 处理批次
  146. batch_results = self.predictor.process_batch(batch)
  147. self.result_queue.put(batch_results)
  148. except queue.Empty:
  149. continue
  150. except Exception as e:
  151. print(f"GPU{self.gpu_id} Worker{self.worker_id} 处理出错: {e}")
  152. def stop(self):
  153. """停止工作线程"""
  154. self.running = False
  155. self.task_queue.put(None) # 发送结束信号
  156. def parallel_process_with_dual_gpu(image_paths: List[str],
  157. batch_size: int = 4,
  158. workers_per_gpu: int = 2, # 每个GPU的worker数量
  159. pipeline_config: str = "PP-StructureV3",
  160. output_path: str = "./output") -> List[Dict[str, Any]]:
  161. """
  162. 使用双GPU优化的多线程并行处理
  163. Args:
  164. image_paths: 图像路径列表
  165. batch_size: 批处理大小
  166. workers_per_gpu: 每个GPU的worker数量(推荐2个)
  167. pipeline_config: pipeline配置
  168. output_path: 输出路径
  169. Returns:
  170. 处理结果列表
  171. """
  172. # 确保输出目录存在
  173. os.makedirs(output_path, exist_ok=True)
  174. # 检查可用GPU
  175. try:
  176. gpu_count = paddle.device.cuda.device_count()
  177. print(f"检测到 {gpu_count} 个GPU")
  178. if gpu_count < 2:
  179. print("警告:检测到的GPU数量少于2个,建议检查CUDA配置")
  180. available_gpus = list(range(gpu_count))
  181. else:
  182. available_gpus = [0, 1] # 使用GPU 0和1
  183. except Exception as e:
  184. print(f"GPU检测失败: {e}")
  185. available_gpus = [0] # 降级为单GPU
  186. total_workers = len(available_gpus) * workers_per_gpu
  187. print(f"使用GPU: {available_gpus}")
  188. print(f"每GPU Worker数: {workers_per_gpu}")
  189. print(f"总Worker数: {total_workers}")
  190. # 将图像路径分批
  191. batches = [image_paths[i:i + batch_size] for i in range(0, len(image_paths), batch_size)]
  192. # 创建工作线程
  193. workers = []
  194. threads = []
  195. worker_id = 0
  196. for gpu_id in available_gpus:
  197. for i in range(workers_per_gpu):
  198. worker = DualGPUThreadWorker(pipeline_config, output_path, gpu_id, worker_id)
  199. workers.append(worker)
  200. thread = threading.Thread(target=worker.worker_loop, name=f"GPU{gpu_id}_Worker{worker_id}")
  201. thread.daemon = True
  202. thread.start()
  203. threads.append(thread)
  204. worker_id += 1
  205. print(f"启动了 {len(workers)} 个工作线程,分布在 {len(available_gpus)} 个GPU上")
  206. # 分发任务
  207. all_results = []
  208. total_images = len(image_paths)
  209. completed_count = 0
  210. try:
  211. with tqdm(total=total_images, desc="双GPU处理图像", unit="张") as pbar:
  212. # 轮流分发批次到不同的worker
  213. for i, batch in enumerate(batches):
  214. worker_id = i % len(workers)
  215. workers[worker_id].add_batch(batch)
  216. # 等待所有任务完成
  217. while completed_count < total_images:
  218. time.sleep(0.1) # 短暂等待
  219. # 收集结果
  220. for worker in workers:
  221. batch_results = worker.get_results()
  222. if batch_results:
  223. all_results.extend(batch_results)
  224. completed_count += len(batch_results)
  225. pbar.update(len(batch_results))
  226. # 更新进度条
  227. success_count = sum(1 for r in batch_results if r.get('success', False))
  228. # 按GPU统计
  229. gpu_stats = {}
  230. for r in all_results:
  231. gpu_id = r.get('gpu_id', 'unknown')
  232. if gpu_id not in gpu_stats:
  233. gpu_stats[gpu_id] = {'success': 0, 'total': 0}
  234. gpu_stats[gpu_id]['total'] += 1
  235. if r.get('success', False):
  236. gpu_stats[gpu_id]['success'] += 1
  237. gpu_info = ', '.join([f"GPU{k}:{v['success']}/{v['total']}" for k, v in gpu_stats.items()])
  238. pbar.set_postfix({
  239. 'recent_success': f"{success_count}/{len(batch_results)}",
  240. 'gpu_distribution': gpu_info
  241. })
  242. finally:
  243. # 停止所有工作线程
  244. for worker in workers:
  245. worker.stop()
  246. # 等待线程结束
  247. for thread in threads:
  248. thread.join(timeout=3.0)
  249. return all_results
  250. def monitor_gpu_memory():
  251. """监控GPU内存使用情况"""
  252. try:
  253. for gpu_id in [0, 1]:
  254. paddle.device.set_device(f"gpu:{gpu_id}")
  255. allocated = paddle.device.cuda.memory_allocated() / 1024**3
  256. reserved = paddle.device.cuda.memory_reserved() / 1024**3
  257. print(f"GPU {gpu_id} - 已分配: {allocated:.2f}GB, 已预留: {reserved:.2f}GB")
  258. except Exception as e:
  259. print(f"GPU内存监控失败: {e}")
  260. def main():
  261. """主函数 - 双GPU优化的并行处理"""
  262. # 配置参数
  263. dataset_path = "../../OmniDocBench/OpenDataLab___OmniDocBench/images"
  264. output_dir = "./OmniDocBench_Results_DualGPU"
  265. pipeline_config = "PP-StructureV3"
  266. # 双GPU处理参数
  267. batch_size = 4 # 批处理大小
  268. workers_per_gpu = 1 # 每个GPU的worker数量(24GB GPU推荐2个)
  269. # 确保输出目录存在
  270. print(f"输出目录: {Path(output_dir).absolute()}")
  271. os.makedirs(output_dir, exist_ok=True)
  272. dataset_path = Path(dataset_path).resolve()
  273. output_dir = Path(output_dir).resolve()
  274. print("="*60)
  275. print("OmniDocBench 双GPU优化并行处理开始")
  276. print("="*60)
  277. print(f"数据集路径: {dataset_path}")
  278. print(f"输出目录: {output_dir}")
  279. print(f"批处理大小: {batch_size}")
  280. print(f"每GPU Worker数: {workers_per_gpu}")
  281. print(f"总Worker数: {workers_per_gpu * 2}")
  282. # 监控初始GPU状态
  283. print("\n初始GPU内存状态:")
  284. monitor_gpu_memory()
  285. # 查找所有图像文件
  286. image_extensions = ['*.jpg', '*.jpeg', '*.png', '*.bmp', '*.tiff']
  287. image_files = []
  288. for ext in image_extensions:
  289. image_files.extend(glob.glob(os.path.join(dataset_path, ext)))
  290. print(f"\n找到 {len(image_files)} 个图像文件")
  291. if not image_files:
  292. print("未找到任何图像文件,程序终止")
  293. return
  294. # 限制处理数量用于测试
  295. # image_files = image_files[:40] # 取消注释以限制处理数量
  296. # 开始处理
  297. start_time = time.time()
  298. try:
  299. print("\n使用双GPU优化并行处理...")
  300. results = parallel_process_with_dual_gpu(
  301. image_files,
  302. batch_size,
  303. workers_per_gpu,
  304. pipeline_config,
  305. str(output_dir)
  306. )
  307. total_time = time.time() - start_time
  308. # 统计信息
  309. success_count = sum(1 for r in results if r.get('success', False))
  310. error_count = len(results) - success_count
  311. total_processing_time = sum(r.get('processing_time', 0) for r in results if r.get('success', False))
  312. avg_processing_time = total_processing_time / success_count if success_count > 0 else 0
  313. # 按GPU统计
  314. gpu_stats = {}
  315. for r in results:
  316. gpu_id = r.get('gpu_id', 'unknown')
  317. if gpu_id not in gpu_stats:
  318. gpu_stats[gpu_id] = {'success': 0, 'total': 0, 'total_time': 0}
  319. gpu_stats[gpu_id]['total'] += 1
  320. if r.get('success', False):
  321. gpu_stats[gpu_id]['success'] += 1
  322. gpu_stats[gpu_id]['total_time'] += r.get('processing_time', 0)
  323. # 保存结果统计
  324. stats = {
  325. "total_files": len(image_files),
  326. "success_count": success_count,
  327. "error_count": error_count,
  328. "success_rate": success_count / len(image_files),
  329. "total_time": total_time,
  330. "avg_processing_time": avg_processing_time,
  331. "throughput": len(image_files) / total_time,
  332. "batch_size": batch_size,
  333. "workers_per_gpu": workers_per_gpu,
  334. "total_workers": workers_per_gpu * 2,
  335. "gpu_stats": gpu_stats,
  336. "optimization": "双GPU多线程并行"
  337. }
  338. # 保存最终结果
  339. output_file = os.path.join(output_dir, f"OmniDocBench_DualGPU_batch{batch_size}_workers{workers_per_gpu}.json")
  340. final_results = {
  341. "results": results,
  342. "stats": stats
  343. }
  344. with open(output_file, 'w', encoding='utf-8') as f:
  345. json.dump(final_results, f, ensure_ascii=False, indent=2)
  346. print("\n" + "="*60)
  347. print("双GPU优化并行处理完成!")
  348. print("="*60)
  349. print(f"总文件数: {len(image_files)}")
  350. print(f"成功处理: {success_count}")
  351. print(f"失败数量: {error_count}")
  352. print(f"成功率: {success_count / len(image_files) * 100:.2f}%")
  353. print(f"总耗时: {total_time:.2f}秒")
  354. print(f"平均处理时间: {avg_processing_time:.2f}秒/张")
  355. print(f"吞吐量: {len(image_files) / total_time:.2f}张/秒")
  356. print(f"Worker数: {workers_per_gpu * 2} (每GPU {workers_per_gpu}个)")
  357. # GPU统计
  358. print(f"\nGPU分布统计:")
  359. for gpu_id, stat in gpu_stats.items():
  360. if stat['total'] > 0:
  361. gpu_success_rate = stat['success'] / stat['total'] * 100
  362. gpu_avg_time = stat['total_time'] / stat['success'] if stat['success'] > 0 else 0
  363. print(f" GPU {gpu_id}: {stat['success']}/{stat['total']} 成功 "
  364. f"({gpu_success_rate:.1f}%), 平均 {gpu_avg_time:.2f}s/张")
  365. print(f"\n结果保存至: {output_file}")
  366. # 监控最终GPU状态
  367. print("\n最终GPU内存状态:")
  368. monitor_gpu_memory()
  369. except Exception as e:
  370. print(f"处理过程中发生错误: {str(e)}")
  371. traceback.print_exc()
# Script entry point: run the batch job only when executed directly, not when
# this module is imported.
if __name__ == "__main__":
    main()