| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500 |
- # zhch/ppstructurev3_parallel_predict_optimized.py
- import json
- import time
- import os
- import glob
- import traceback
- from pathlib import Path
- from typing import List, Dict, Any, Tuple
- from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
- from multiprocessing import Queue, Manager
- import cv2
- import numpy as np
- from paddlex import create_pipeline
- from tqdm import tqdm
- import threading
- import queue
- from dotenv import load_dotenv
- load_dotenv(override=True)
- class PPStructureV3ParallelPredictor:
- """
- PP-StructureV3并行预测器,支持多进程批处理
- """
- def __init__(self, pipeline_config_path: str = "PP-StructureV3", output_path: str = "output", use_gpu: bool = True):
- """
- 初始化预测器
-
- Args:
- pipeline_config_path: PaddleX pipeline配置文件路径
- """
- self.pipeline_config = pipeline_config_path
- self.pipeline = None # 延迟初始化
- self.output_path = output_path
- self.use_gpu = use_gpu
- def _ensure_pipeline(self):
- """确保pipeline已初始化(线程安全)"""
- if self.pipeline is None:
- self.pipeline = create_pipeline(pipeline=self.pipeline_config)
- def process_single_image(self, image_path: str) -> Dict[str, Any]:
- """
- 处理单张图像
-
- Args:
- image_path: 图像路径
-
- Returns:
- 处理结果{"image_path": str, "success": bool, "processing_time": float, "error": str}
- """
- try:
- # 确保pipeline已初始化
- self._ensure_pipeline()
-
- # 读取图像获取尺寸信息
- image = cv2.imread(image_path)
- if image is None:
- return {
- "image_path": Path(image_path).name,
- "error": "无法读取图像",
- "success": False,
- "processing_time": 0
- }
-
- height, width = image.shape[:2]
-
- # 运行PaddleX pipeline
- start_time = time.time()
-
- output = self.pipeline.predict(
- input=image_path,
- device="gpu" if self.use_gpu else "cpu",
- use_doc_orientation_classify=True,
- use_doc_unwarping=False,
- use_seal_recognition=True,
- use_chart_recognition=True,
- use_table_recognition=True,
- use_formula_recognition=True,
- )
-
- # 保存结果
- for res in output:
- res.save_to_json(save_path=self.output_path)
- res.save_to_markdown(save_path=self.output_path)
-
- process_time = time.time() - start_time
-
- # 返回处理结果
- return {
- "image_path": Path(image_path).name,
- "processing_time": process_time,
- "success": True
- }
-
- except Exception as e:
- return {
- "image_path": Path(image_path).name,
- "error": str(e),
- "success": False,
- "processing_time": 0
- }
-
- def process_batch(self, image_paths: List[str]) -> List[Dict[str, Any]]:
- """
- 批处理图像
-
- Args:
- image_paths: 图像路径列表
-
- Returns:
- 结果列表
- """
- results = []
-
- for image_path in image_paths:
- result = self.process_single_image(image_path)
- results.append(result)
-
- return results
- class ThreadWorker:
- """线程工作器 - 每个线程维护自己的pipeline实例"""
-
- def __init__(self, pipeline_config: str, output_path: str, use_gpu: bool, worker_id: int):
- self.worker_id = worker_id
- self.predictor = PPStructureV3ParallelPredictor(
- pipeline_config,
- output_path=f"{output_path}/worker_{worker_id}",
- use_gpu=use_gpu
- )
- self.task_queue = queue.Queue()
- self.result_queue = queue.Queue()
- self.running = True
-
- def add_batch(self, batch: List[str]):
- """添加批处理任务"""
- self.task_queue.put(batch)
-
- def get_results(self) -> List[Dict[str, Any]]:
- """获取处理结果"""
- results = []
- while not self.result_queue.empty():
- try:
- result = self.result_queue.get_nowait()
- results.extend(result)
- except queue.Empty:
- break
- return results
-
- def worker_loop(self):
- """工作循环"""
- while self.running:
- try:
- batch = self.task_queue.get(timeout=1.0)
- if batch is None: # 结束信号
- break
-
- # 处理批次
- batch_results = self.predictor.process_batch(batch)
- self.result_queue.put(batch_results)
-
- except queue.Empty:
- continue
- except Exception as e:
- print(f"工作线程 {self.worker_id} 处理出错: {e}")
-
- def stop(self):
- """停止工作线程"""
- self.running = False
- self.task_queue.put(None) # 发送结束信号
- def parallel_process_with_optimized_threading(image_paths: List[str],
- batch_size: int = 4,
- max_workers: int = 2, # GPU限制为2个worker
- pipeline_config: str = "PP-StructureV3",
- output_path: str = "./output",
- use_gpu: bool = True) -> List[Dict[str, Any]]:
- """
- 使用优化的多线程并行处理(每个线程一个pipeline实例)
-
- Args:
- image_paths: 图像路径列表
- batch_size: 批处理大小
- max_workers: 最大工作线程数(GPU推荐2个)
- pipeline_config: pipeline配置
- output_path: 输出路径
- use_gpu: 是否使用GPU
-
- Returns:
- 处理结果列表
- """
- # 确保输出目录存在
- os.makedirs(output_path, exist_ok=True)
-
- # 将图像路径分批
- batches = [image_paths[i:i + batch_size] for i in range(0, len(image_paths), batch_size)]
-
- # 创建工作线程
- workers = []
- threads = []
-
- for i in range(max_workers):
- worker = ThreadWorker(pipeline_config, output_path, use_gpu, i)
- workers.append(worker)
-
- thread = threading.Thread(target=worker.worker_loop)
- thread.daemon = True
- thread.start()
- threads.append(thread)
-
- print(f"启动了 {max_workers} 个工作线程,每个线程独立的pipeline实例")
-
- # 分发任务
- all_results = []
- total_images = len(image_paths)
- completed_count = 0
-
- try:
- with tqdm(total=total_images, desc="处理图像", unit="张") as pbar:
- # 轮流分发批次到不同的worker
- for i, batch in enumerate(batches):
- worker_id = i % max_workers
- workers[worker_id].add_batch(batch)
-
- # 等待所有任务完成
- while completed_count < total_images:
- time.sleep(0.1) # 短暂等待
-
- # 收集结果
- for worker in workers:
- batch_results = worker.get_results()
- if batch_results:
- all_results.extend(batch_results)
- completed_count += len(batch_results)
- pbar.update(len(batch_results))
-
- # 更新进度条
- success_count = sum(1 for r in batch_results if r.get('success', False))
- pbar.set_postfix({
- 'recent_success': f"{success_count}/{len(batch_results)}",
- 'total_success': f"{sum(1 for r in all_results if r.get('success', False))}/{completed_count}"
- })
-
- finally:
- # 停止所有工作线程
- for worker in workers:
- worker.stop()
-
- # 等待线程结束
- for thread in threads:
- thread.join(timeout=2.0)
-
- return all_results
- def process_batch_worker_optimized(worker_id: int,
- task_queue: Queue,
- result_queue: Queue,
- pipeline_config: str,
- output_path: str,
- use_gpu: bool):
- """
- 优化的多进程工作函数 - 每个进程只初始化一次pipeline
- """
- try:
- # 每个进程创建自己的输出目录
- worker_output = f"{output_path}/worker_{worker_id}"
- os.makedirs(worker_output, exist_ok=True)
-
- # 只初始化一次pipeline
- predictor = PPStructureV3ParallelPredictor(
- pipeline_config,
- output_path=worker_output,
- use_gpu=use_gpu
- )
-
- print(f"进程 {worker_id} 初始化完成")
-
- # 持续处理任务
- while True:
- try:
- batch = task_queue.get(timeout=2.0)
- if batch is None: # 结束信号
- break
-
- # 处理批次
- batch_results = predictor.process_batch(batch)
- result_queue.put(batch_results)
-
- except Exception as e:
- print(f"进程 {worker_id} 处理批次时出错: {e}")
- continue
-
- except Exception as e:
- print(f"进程 {worker_id} 初始化失败: {e}")
- traceback.print_exc()
- def parallel_process_with_optimized_multiprocessing(image_paths: List[str],
- batch_size: int = 4,
- max_workers: int = 4,
- pipeline_config: str = "PP-StructureV3",
- output_path: str = "./output",
- use_gpu: bool = False) -> List[Dict[str, Any]]:
- """
- 使用优化的多进程并行处理(每个进程一个pipeline实例)
-
- Args:
- image_paths: 图像路径列表
- batch_size: 批处理大小
- max_workers: 最大工作进程数
- pipeline_config: pipeline配置
- output_path: 输出路径
- use_gpu: 是否使用GPU
-
- Returns:
- 处理结果列表
- """
- # 确保输出目录存在
- os.makedirs(output_path, exist_ok=True)
-
- # 将图像路径分批
- batches = [image_paths[i:i + batch_size] for i in range(0, len(image_paths), batch_size)]
-
- # 创建进程间通信队列
- manager = Manager()
- task_queue = manager.Queue()
- result_queue = manager.Queue()
-
- # 启动工作进程
- processes = []
- for i in range(max_workers):
- p = Process(
- target=process_batch_worker_optimized,
- args=(i, task_queue, result_queue, pipeline_config, output_path, use_gpu)
- )
- p.start()
- processes.append(p)
-
- print(f"启动了 {max_workers} 个工作进程,每个进程独立的pipeline实例")
-
- # 分发任务
- for batch in batches:
- task_queue.put(batch)
-
- # 发送结束信号
- for _ in range(max_workers):
- task_queue.put(None)
-
- # 收集结果
- all_results = []
- total_images = len(image_paths)
- completed_count = 0
-
- with tqdm(total=total_images, desc="处理图像", unit="张") as pbar:
- # 等待所有结果
- expected_batches = len(batches)
- received_batches = 0
-
- while received_batches < expected_batches:
- try:
- batch_results = result_queue.get(timeout=30.0)
- all_results.extend(batch_results)
- completed_count += len(batch_results)
- received_batches += 1
-
- pbar.update(len(batch_results))
-
- # 更新进度条
- success_count = sum(1 for r in batch_results if r.get('success', False))
- pbar.set_postfix({
- 'batch_success': f"{success_count}/{len(batch_results)}",
- 'total_success': f"{sum(1 for r in all_results if r.get('success', False))}/{completed_count}"
- })
-
- except Exception as e:
- print(f"等待结果时出错: {e}")
- break
-
- # 等待所有进程结束
- for p in processes:
- p.join(timeout=10.0)
- if p.is_alive():
- p.terminate()
-
- return all_results
- def main():
- """主函数 - 优化的并行处理"""
-
- # 配置参数
- dataset_path = "../../OmniDocBench/OpenDataLab___OmniDocBench/images"
- output_dir = "./OmniDocBench_Results_Optimized"
- pipeline_config = "PP-StructureV3"
-
- # 并行处理参数
- batch_size = 4 # 批处理大小
- use_gpu = True # 是否使用GPU
- max_workers = 4 # CPU可以用更多进程
- use_multiprocessing = False # CPU用进程
-
- # 确保输出目录存在
- print(f"输出目录: {Path(output_dir).absolute()}")
- os.makedirs(output_dir, exist_ok=True)
-
- dataset_path = Path(dataset_path).resolve()
- output_dir = Path(output_dir).resolve()
-
- print("="*60)
- print("OmniDocBench 优化并行处理开始")
- print("="*60)
- print(f"数据集路径: {dataset_path}")
- print(f"输出目录: {output_dir}")
- print(f"批处理大小: {batch_size}")
- print(f"最大工作线程/进程数: {max_workers}")
- print(f"使用GPU: {use_gpu}")
- print(f"并行方式: {'多进程' if use_multiprocessing else '多线程'}")
- print(f"Pipeline实例数: {max_workers} (每个进程/线程一个)")
-
- # 查找所有图像文件
- image_extensions = ['*.jpg', '*.jpeg', '*.png', '*.bmp', '*.tiff']
- image_files = []
-
- for ext in image_extensions:
- image_files.extend(glob.glob(os.path.join(dataset_path, ext)))
-
- print(f"找到 {len(image_files)} 个图像文件")
-
- if not image_files:
- print("未找到任何图像文件,程序终止")
- return
-
- # 限制处理数量用于测试
- # image_files = image_files[:20] # 取消注释以限制处理数量
-
- # 开始处理
- start_time = time.time()
-
- try:
- if use_multiprocessing:
- # 多进程处理(推荐用于CPU)
- print("使用优化的多进程并行处理...")
- results = parallel_process_with_optimized_multiprocessing(
- image_files, batch_size, max_workers, pipeline_config, str(output_dir), use_gpu
- )
- else:
- # 多线程处理(推荐用于GPU)
- print("使用优化的多线程并行处理...")
- results = parallel_process_with_optimized_threading(
- image_files, batch_size, max_workers, pipeline_config, str(output_dir), use_gpu
- )
-
- total_time = time.time() - start_time
-
- # 统计信息
- success_count = sum(1 for r in results if r.get('success', False))
- error_count = len(results) - success_count
- total_processing_time = sum(r.get('processing_time', 0) for r in results if r.get('success', False))
- avg_processing_time = total_processing_time / success_count if success_count > 0 else 0
-
- # 保存结果统计
- stats = {
- "total_files": len(image_files),
- "success_count": success_count,
- "error_count": error_count,
- "success_rate": success_count / len(image_files),
- "total_time": total_time,
- "avg_processing_time": avg_processing_time,
- "throughput": len(image_files) / total_time,
- "batch_size": batch_size,
- "max_workers": max_workers,
- "use_gpu": use_gpu,
- "use_multiprocessing": use_multiprocessing,
- "optimization": "单进程/线程单pipeline实例"
- }
- results['stats'] = stats
- # 保存最终结果
- output_file = os.path.join(output_dir, f"OmniDocBench_PPStructureV3_batch{batch_size}.json")
- with open(output_file, 'w', encoding='utf-8') as f:
- json.dump(results, f, ensure_ascii=False, indent=2)
- print("\n" + "="*60)
- print("优化并行处理完成!")
- print("="*60)
- print(f"总文件数: {len(image_files)}")
- print(f"成功处理: {success_count}")
- print(f"失败数量: {error_count}")
- print(f"成功率: {success_count / len(image_files) * 100:.2f}%")
- print(f"总耗时: {total_time:.2f}秒")
- print(f"平均处理时间: {avg_processing_time:.2f}秒/张")
- print(f"吞吐量: {len(image_files) / total_time:.2f}张/秒")
- print(f"Pipeline实例数: {max_workers}")
- print(f"统计信息保存至: {output_file}")
-
- except Exception as e:
- print(f"处理过程中发生错误: {str(e)}")
- traceback.print_exc()
- if __name__ == "__main__":
- main()
|