| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566 |
- # zhch/omnidocbench_parallel_eval.py
- import json
- import time
- import os
- import glob
- import traceback
- from pathlib import Path
- from typing import List, Dict, Any, Tuple
- from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
- from multiprocessing import Queue, Manager
- import cv2
- import numpy as np
- from paddlex import create_pipeline
- from tqdm import tqdm
- import threading
class OmniDocBenchParallelEvaluator:
    """
    Parallel OmniDocBench evaluator.

    Runs a PaddleX pipeline over batches of images (thread pool or process
    pool) and converts each page's output into an OmniDocBench-style dict.
    """

    # Keyword arguments forwarded to ``pipeline.predict`` for every image.
    _PREDICT_OPTIONS: Dict[str, bool] = {
        "use_doc_orientation_classify": True,
        "use_doc_unwarping": False,
        "use_seal_recognition": True,
        "use_chart_recognition": True,
        "use_table_recognition": True,
        "use_formula_recognition": True,
    }

    def __init__(self, pipeline_config_path: str = "PP-StructureV3"):
        """
        Initialize the evaluator.

        Args:
            pipeline_config_path: PaddleX pipeline name or config file path,
                forwarded to ``paddlex.create_pipeline``.
        """
        self.pipeline_config = pipeline_config_path
        self.category_mapping = self._get_category_mapping()
        # Per-thread pipeline cache: each thread builds its pipeline once and
        # reuses it for every image/batch it handles afterwards.
        self._thread_local = threading.local()

    def __getstate__(self) -> Dict[str, Any]:
        # threading.local (and any cached pipeline inside it) cannot be pickled;
        # drop the cache so the evaluator itself stays picklable.
        state = self.__dict__.copy()
        state.pop("_thread_local", None)
        return state

    def __setstate__(self, state: Dict[str, Any]) -> None:
        self.__dict__.update(state)
        self._thread_local = threading.local()

    def _get_category_mapping(self) -> Dict[str, str]:
        """Return the PaddleX layout label -> OmniDocBench category mapping.

        Labels missing from this table fall back to ``'text_block'`` in
        ``_convert_to_omnidocbench_format``.
        """
        return {
            'title': 'title',
            'text': 'text_block',
            'figure': 'figure',
            'figure_caption': 'figure_caption',
            'table': 'table',
            'table_caption': 'table_caption',
            'equation': 'equation_isolated',
            'header': 'header',
            'footer': 'footer',
            'reference': 'reference',
            'seal': 'abandon',
            'number': 'page_number',
            # Label names used by the PP-DocLayout family of layout models.
            # NOTE(review): confirm against the layout model configured in the pipeline.
            'doc_title': 'title',
            'paragraph_title': 'title',
            'image': 'figure',
            'chart': 'figure',
            'figure_title': 'figure_caption',
            'formula': 'equation_isolated',
            'footnote': 'page_footnote',
        }

    def create_pipeline(self):
        """Create a new pipeline instance (each thread/process needs its own)."""
        return create_pipeline(pipeline=self.pipeline_config)

    def _get_pipeline(self):
        """Return the calling thread's pipeline, creating it on first use."""
        pipeline = getattr(self._thread_local, "pipeline", None)
        if pipeline is None:
            pipeline = self.create_pipeline()
            self._thread_local.pipeline = pipeline
        return pipeline

    @staticmethod
    def _error_result(image_path: str, error: Any) -> Dict[str, Any]:
        """Build the failure record used for an image that could not be processed."""
        return {
            "image_path": Path(image_path).name,
            "error": str(error),
            "success": False,
            "processing_time": 0,
        }

    def process_single_image(self, image_path: str, use_gpu: bool = True) -> Dict[str, Any]:
        """
        Process a single image.

        Args:
            image_path: Path of the image.
            use_gpu: Whether to request the GPU device.

        Returns:
            The OmniDocBench-format result dict, or an error record
            (``success=False``) if anything fails, including unreadable images.
        """
        try:
            return self._process_with_pipeline(self._get_pipeline(), image_path, use_gpu)
        except Exception as e:
            return self._error_result(image_path, e)

    def process_batch(self, image_paths: List[str], use_gpu: bool = True) -> List[Dict[str, Any]]:
        """
        Process a batch of images with this thread's (cached) pipeline.

        Args:
            image_paths: Image paths to process.
            use_gpu: Whether to request the GPU device.

        Returns:
            One result per input path, in input order; failures are reported
            as error records rather than raised.

        Raises:
            Whatever ``create_pipeline`` raises if the pipeline cannot be built.
        """
        if not image_paths:
            return []
        pipeline = self._get_pipeline()

        results = []
        for image_path in image_paths:
            try:
                results.append(self._process_with_pipeline(pipeline, image_path, use_gpu))
            except Exception as e:
                results.append(self._error_result(image_path, e))
        return results

    def _process_with_pipeline(self, pipeline, image_path: str, use_gpu: bool) -> Dict[str, Any]:
        """Run ``pipeline`` on one image and convert its output.

        An image OpenCV cannot decode yields an error record instead of
        ``None``, so it is still counted as a failure downstream.
        """
        # The image is decoded here only to obtain its size for page_info.
        image = cv2.imread(image_path)
        if image is None:
            return self._error_result(image_path, "无法读取图像")
        height, width = image.shape[:2]

        start_time = time.perf_counter()
        # NOTE(review): the device is passed to predict(); PaddleX usually picks
        # the device in create_pipeline(), so confirm the installed version honours it.
        output = list(pipeline.predict(
            input=image_path,
            device="gpu" if use_gpu else "cpu",
            **self._PREDICT_OPTIONS,
        ))
        process_time = time.perf_counter() - start_time

        result = self._convert_to_omnidocbench_format(output, image_path, width, height)
        result["processing_time"] = process_time
        result["success"] = True
        return result

    def _convert_to_omnidocbench_format(self,
                                        paddlex_output: List,
                                        image_path: str,
                                        width: int,
                                        height: int) -> Dict[str, Any]:
        """Convert PaddleX pipeline results into one OmniDocBench page dict.

        Args:
            paddlex_output: Result objects from ``pipeline.predict``; each is
                read through ``res.json['res']['parsing_res_list']``.
            image_path: Source image path; only the file name is stored.
            width: Image width in pixels.
            height: Image height in pixels.

        Returns:
            Dict with ``layout_dets``, ``page_info`` and ``extra`` keys.
        """
        layout_dets = []

        for res in paddlex_output:
            res_json = res.json.get('res', {})
            for item in res_json.get('parsing_res_list', []):
                bbox = item.get('block_bbox')
                if bbox is None:
                    bbox = []
                category = item.get('block_label', 'text_block')
                content = item.get('block_content')
                if content is None:
                    content = ''
                elif not isinstance(content, str):
                    content = str(content)

                # Expand [x1, y1, x2, y2] into the 8-value polygon
                # [x1, y1, x2, y1, x2, y2, x1, y2]; other shapes pass through.
                if len(bbox) == 4:
                    x1, y1, x2, y2 = bbox
                    poly = [x1, y1, x2, y1, x2, y2, x1, y2]
                else:
                    poly = list(bbox)

                omni_category = self.category_mapping.get(category, 'text_block')

                # Detections are numbered in the order PaddleX emits them.
                anno_id = len(layout_dets)
                layout_det = {
                    "category_type": omni_category,
                    "poly": poly,
                    "ignore": False,
                    "order": anno_id,
                    "anno_id": anno_id,
                }

                # Tables keep their raw HTML; everything else stores stripped text.
                if content.strip():
                    if omni_category == 'table':
                        layout_det["html"] = content
                    else:
                        layout_det["text"] = content.strip()

                layout_det["attribute"] = self._extract_attributes(item, omni_category)
                layout_det["line_with_spans"] = []
                layout_dets.append(layout_det)

        return {
            "layout_dets": layout_dets,
            "page_info": {
                "page_no": 0,
                "height": height,
                "width": width,
                "image_path": Path(image_path).name,
                # Fixed placeholder attributes; they are not derived from the image.
                "page_attribute": {
                    "data_source": "research_report",
                    "language": "simplified_chinese",
                    "layout": "single_column",
                    "watermark": False,
                    "fuzzy_scan": False,
                    "colorful_backgroud": False
                }
            },
            "extra": {
                "relation": []
            }
        }

    def _extract_attributes(self, item: Dict, category: str) -> Dict:
        """Build the OmniDocBench ``attribute`` dict for one detection.

        Values are fixed per-category defaults. The only data-driven field is
        the table ``with_span`` flag, which is set when the content contains
        ``colspan`` or ``rowspan``. Categories without defaults get ``{}``.
        """
        if category == 'table':
            content = item.get('block_content')
            if content is None:
                content = ''
            elif not isinstance(content, str):
                content = str(content)
            return {
                "table_layout": "vertical",
                "with_span": 'colspan' in content or 'rowspan' in content,
                "line": "full_line",
                "language": "table_simplified_chinese",
                "include_equation": False,
                "include_backgroud": False,
                "table_vertical": False
            }

        if category in ['text_block', 'title']:
            return {
                "text_language": "text_simplified_chinese",
                "text_background": "white",
                "text_rotate": "normal"
            }

        if 'equation' in category:
            return {"formula_type": "print"}

        return {}

    @staticmethod
    def _split_batches(image_paths: List[str], batch_size: int) -> List[List[str]]:
        """Split ``image_paths`` into consecutive chunks of at most ``batch_size``."""
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")
        return [image_paths[i:i + batch_size] for i in range(0, len(image_paths), batch_size)]

    def _run_batches(self, batches: List[List[str]], submit) -> List[Dict[str, Any]]:
        """Submit every batch via ``submit(batch) -> Future`` and gather the results.

        Progress is reported as batches complete. The returned list follows
        the input order regardless of completion order. A batch whose future
        raises is replaced by one error record per image.
        """
        total_images = sum(len(batch) for batch in batches)
        ordered_results: List[List[Dict[str, Any]]] = [[] for _ in batches]
        completed_count = 0
        success_total = 0

        with tqdm(total=total_images, desc="处理图像", unit="张") as pbar:
            future_to_index = {submit(batch): index for index, batch in enumerate(batches)}

            for future in as_completed(future_to_index):
                index = future_to_index[future]
                batch = batches[index]
                try:
                    batch_results = future.result()
                except Exception as e:
                    print(f"批处理失败: {e}")
                    batch_results = [self._error_result(img_path, e) for img_path in batch]

                ordered_results[index] = batch_results
                completed_count += len(batch)
                # Running counters avoid rescanning all results after every batch.
                batch_success = sum(1 for r in batch_results if r.get('success', False))
                success_total += batch_success

                pbar.update(len(batch))
                pbar.set_postfix({
                    'batch_success': f"{batch_success}/{len(batch)}",
                    'total_success': f"{success_total}/{completed_count}"
                })

        return [result for batch_results in ordered_results for result in batch_results]

    def parallel_process_with_threading(self,
                                        image_paths: List[str],
                                        batch_size: int = 4,
                                        max_workers: int = 4,
                                        use_gpu: bool = True) -> List[Dict[str, Any]]:
        """
        Process images in parallel with a thread pool (recommended for GPU).

        Each worker thread builds its pipeline once and reuses it for every
        batch it receives.

        Args:
            image_paths: Image paths to process.
            batch_size: Number of images per submitted batch (must be >= 1).
            max_workers: Maximum number of worker threads.
            use_gpu: Whether to request the GPU device.

        Returns:
            One result per image, in input order.
        """
        batches = self._split_batches(image_paths, batch_size)
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            return self._run_batches(
                batches,
                lambda batch: executor.submit(self.process_batch, batch, use_gpu),
            )

    def parallel_process_with_multiprocessing(self,
                                              image_paths: List[str],
                                              batch_size: int = 4,
                                              max_workers: int = 4,
                                              use_gpu: bool = False) -> List[Dict[str, Any]]:
        """
        Process images in parallel with a process pool (recommended for CPU).

        Batches are dispatched to the module-level ``process_batch_worker``.

        Args:
            image_paths: Image paths to process.
            batch_size: Number of images per submitted batch (must be >= 1).
            max_workers: Maximum number of worker processes.
            use_gpu: Whether to request the GPU device.

        Returns:
            One result per image, in input order.
        """
        batches = self._split_batches(image_paths, batch_size)
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            return self._run_batches(
                batches,
                lambda batch: executor.submit(
                    process_batch_worker, batch, self.pipeline_config, use_gpu
                ),
            )

    def save_results_incrementally(self,
                                   results: List[Dict[str, Any]],
                                   output_file: str,
                                   save_interval: int = 50):
        """
        Checkpoint ``results`` to ``output_file`` every ``save_interval`` results.

        The file is written only when ``len(results)`` is a positive multiple of
        ``save_interval``; a non-positive interval disables saving. Data is
        written to a temporary file and then moved into place, so a failed
        write leaves the previous checkpoint intact.

        Args:
            results: Results accumulated so far.
            output_file: Destination JSON path.
            save_interval: Save every N results.
        """
        if save_interval <= 0 or not results or len(results) % save_interval != 0:
            return
        tmp_file = f"{output_file}.tmp"
        try:
            with open(tmp_file, 'w', encoding='utf-8') as f:
                json.dump(results, f, ensure_ascii=False, indent=2)
            os.replace(tmp_file, output_file)
            print(f"已保存 {len(results)} 个结果到 {output_file}")
        except Exception as e:
            print(f"保存结果时出错: {e}")
# Per-process cache of evaluators keyed by pipeline config. Each worker
# process reuses one evaluator across all batches it handles instead of
# constructing a new one per batch.
_WORKER_EVALUATORS: Dict[str, "OmniDocBenchParallelEvaluator"] = {}


def process_batch_worker(image_paths: List[str], pipeline_config: str, use_gpu: bool) -> List[Dict[str, Any]]:
    """
    Process one batch inside a process-pool worker.

    Args:
        image_paths: Image paths in this batch.
        pipeline_config: Pipeline name/config used to build the evaluator.
        use_gpu: Whether to request the GPU device.

    Returns:
        The evaluator's per-image results. If the evaluator cannot be created
        or the batch raises, one error record is returned per image instead.
    """
    try:
        evaluator = _WORKER_EVALUATORS.get(pipeline_config)
        if evaluator is None:
            evaluator = OmniDocBenchParallelEvaluator(pipeline_config)
            # Cache only after successful construction so a failure is retried.
            _WORKER_EVALUATORS[pipeline_config] = evaluator
        return evaluator.process_batch(image_paths, use_gpu)
    except Exception as e:
        return [
            {
                "image_path": Path(img_path).name,
                "error": str(e),
                "success": False,
                "processing_time": 0
            }
            for img_path in image_paths
        ]
# Image file suffixes (compared case-insensitively) picked up from the dataset folder.
IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.bmp', '.tif', '.tiff')


def find_image_files(dataset_path: str, extensions=IMAGE_EXTENSIONS) -> List[str]:
    """Return the sorted image file paths directly inside ``dataset_path``.

    Suffixes are matched case-insensitively (so ``A.JPG`` is found). Hidden
    files such as macOS ``._*`` sidecars are skipped, as a ``glob`` pattern
    would skip them. A missing directory yields an empty list.
    """
    root = Path(dataset_path)
    if not root.is_dir():
        return []
    wanted = {ext.lower() for ext in extensions}
    return sorted(
        str(path)
        for path in root.iterdir()
        if path.is_file()
        and not path.name.startswith('.')
        and path.suffix.lower() in wanted
    )


def main():
    """Run the parallel OmniDocBench evaluation with the settings below."""

    # Configuration
    dataset_path = "/Users/zhch158/workspace/repository.git/OmniDocBench/OpenDataLab___OmniDocBench/images"
    output_dir = "/Users/zhch158/workspace/repository.git/PaddleX/zhch/OmniDocBench_Results"
    pipeline_config = "PP-StructureV3"

    # Parallelism settings
    batch_size = 4  # images per submitted batch
    max_workers = 4  # maximum worker threads/processes
    use_gpu = True  # request the GPU device
    use_multiprocessing = False  # False = threads (GPU), True = processes (CPU)

    os.makedirs(output_dir, exist_ok=True)

    print("="*60)
    print("OmniDocBench 并行评估开始")
    print("="*60)
    print(f"数据集路径: {dataset_path}")
    print(f"输出目录: {output_dir}")
    print(f"批处理大小: {batch_size}")
    print(f"最大工作线程/进程数: {max_workers}")
    print(f"使用GPU: {use_gpu}")
    print(f"并行方式: {'多进程' if use_multiprocessing else '多线程'}")

    image_files = find_image_files(dataset_path)
    print(f"找到 {len(image_files)} 个图像文件")

    if not image_files:
        print("未找到任何图像文件,程序终止")
        return

    evaluator = OmniDocBenchParallelEvaluator(pipeline_config)

    start_time = time.perf_counter()

    if use_multiprocessing:
        print("使用多进程并行处理...")
        results = evaluator.parallel_process_with_multiprocessing(
            image_files, batch_size, max_workers, use_gpu
        )
    else:
        print("使用多线程并行处理...")
        results = evaluator.parallel_process_with_threading(
            image_files, batch_size, max_workers, use_gpu
        )

    total_time = time.perf_counter() - start_time

    output_file = os.path.join(output_dir, f"OmniDocBench_PPStructureV3_batch{batch_size}.json")
    try:
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(results, f, ensure_ascii=False, indent=2)

        print("\n" + "="*60)
        print("处理完成!")
        print("="*60)

        total_files = len(image_files)
        successes = [r for r in results if r.get('success', False)]
        success_count = len(successes)
        # Count against the input list so images that produced no result at
        # all are still reported as failures.
        error_count = total_files - success_count
        total_processing_time = sum(r.get('processing_time', 0) for r in successes)
        avg_processing_time = total_processing_time / success_count if success_count > 0 else 0
        throughput = total_files / total_time if total_time > 0 else 0.0

        print(f"总文件数: {total_files}")
        print(f"成功处理: {success_count}")
        print(f"失败数量: {error_count}")
        print(f"成功率: {success_count / total_files * 100:.2f}%")
        print(f"总耗时: {total_time:.2f}秒")
        print(f"平均处理时间: {avg_processing_time:.2f}秒/张")
        print(f"吞吐量: {throughput:.2f}张/秒")
        print(f"结果保存至: {output_file}")

        stats = {
            "total_files": total_files,
            "success_count": success_count,
            "error_count": error_count,
            "success_rate": success_count / total_files,
            "total_time": total_time,
            "avg_processing_time": avg_processing_time,
            "throughput": throughput,
            "batch_size": batch_size,
            "max_workers": max_workers,
            "use_gpu": use_gpu,
            "use_multiprocessing": use_multiprocessing
        }

        stats_file = os.path.join(output_dir, f"processing_stats_batch{batch_size}.json")
        with open(stats_file, 'w', encoding='utf-8') as f:
            json.dump(stats, f, ensure_ascii=False, indent=2)

        print(f"统计信息保存至: {stats_file}")

    except Exception as e:
        print(f"保存结果文件时发生错误: {str(e)}")
        traceback.print_exc()


if __name__ == "__main__":
    main()
|