| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118 |
- """
- FileLoc Timing Hook - 轻量级 timing 数据采集 hook
- 该模块提供与 fileLoc 侧 timing 收集的接口。
- 当 fileLoc 未安装时,该模块为空操作,不影响主流程。
- 被 pipeline_manager_v2.py 调用,侵入量:1行导入 + 1行调用 = 2行。
- """
- import threading
- from typing import Dict, Any, Optional
- # Thread-safe storage: timing records keyed by task_id. The functions in this
- # module read and modify _timing_store only while holding _timing_lock.
- _timing_store: Dict[str, Dict[str, Any]] = {}
- _timing_lock = threading.Lock()
def _coerce_seconds(value):
    """Return *value* as a float, or 0.0 when it is missing or not numeric."""
    try:
        # Falsy values (None, 0, '', []) collapse to 0.0 before conversion.
        return float(value or 0.0)
    except (TypeError, ValueError, OverflowError):
        # A malformed timing value must not break the caller's pipeline.
        return 0.0


def _extract_tables_from_page_elements(elements):
    """
    [SC2-B fix-high-PR-1] Derive table timing entries from page elements (fallback).

    Background (from the original note): the OCR engine's _process_single_page
    never writes a 'timing' key into page_result, so on_page_result had nothing
    to record. This fallback keeps the elements whose type is 'table' or
    'table_body' so that model_timing in the downstream /ocr/process response
    is no longer null.

    Args:
        elements: Expected to be a list of element dicts. Anything that is not
            a list yields an empty result; non-dict items are skipped.

    Returns:
        A list with one dict per table element, holding 'table_idx' (the
        element's position in *elements*), 'bbox', 'table_type',
        'classification_time', 'recognition_time', 'recognition_method' and
        'total_time'. The three *_time fields are always floats; missing or
        non-numeric values become 0.0 instead of raising.
    """
    if not isinstance(elements, list):
        return []

    tables = []
    for idx, elem in enumerate(elements):
        if not isinstance(elem, dict):
            continue
        if elem.get('type', '') not in ('table', 'table_body'):
            continue

        raw_content = elem.get('content')
        content = raw_content if isinstance(raw_content, dict) else {}

        tables.append({
            'table_idx': idx,
            'bbox': elem.get('bbox'),
            'table_type': content.get('table_type'),
            # [ISSUE-20260626-001 v15] Keep the timing injected by the wrapper
            # instead of hard-overwriting it with 0.
            'classification_time': _coerce_seconds(content.get('classification_time')),
            'recognition_time': _coerce_seconds(content.get('recognition_time')),
            'recognition_method': content.get('recognition_method'),
            'total_time': _coerce_seconds(content.get('total_time')),
        })
    return tables
def on_page_result(task_id: str, page_result: Dict[str, Any]) -> None:
    """
    Page-completed callback, invoked by pipeline_manager_v2.py.

    Appends one timing entry for the page to the module-level store under
    *task_id*. Nothing is recorded when *task_id* is falsy, when *page_result*
    is empty or not a dict, or when neither timing data nor table elements are
    available.

    Args:
        task_id: Task identifier (per the original note, e.g. "task_001_1",
            where the last digit is the page number).
        page_result: Page processing result. Its 'timing' dict is used when
            present and non-empty; otherwise table entries are derived from
            its 'elements' list.
    """
    # A truthy non-dict (list, str, ...) would make .get() raise into the
    # caller's pipeline, so reject it along with empty inputs.
    if not task_id or not page_result or not isinstance(page_result, dict):
        return

    timing = page_result.get('timing', {})
    if not isinstance(timing, dict) or not timing:
        # [SC2-B fix-high-PR-1] Fallback: with no timing on page_result,
        # derive the table list from its elements.
        fallback_tables = _extract_tables_from_page_elements(page_result.get('elements', []))
        if not fallback_tables:
            return
        timing = {
            'page_idx': page_result.get('page_idx'),
            'preprocessor_time': 0.0,
            'layout_detection_time': 0.0,
            'ocr_time': 0.0,
            'tables': fallback_tables,
        }

    # Build the entry before taking the lock to keep the critical section short.
    page_entry = {
        'page_idx': timing.get('page_idx'),
        'preprocessor_time': timing.get('preprocessor_time', 0.0),
        'layout_detection_time': timing.get('layout_detection_time', 0.0),
        'ocr_time': timing.get('ocr_time', 0.0),
        'tables': timing.get('tables', []),
    }

    with _timing_lock:
        task_entry = _timing_store.setdefault(task_id, {'pages': [], 'doc_timing': {}})
        task_entry['pages'].append(page_entry)
def on_document_result(task_id: str, doc_result: Dict[str, Any]) -> None:
    """
    Document-completed callback (optional).

    Currently a no-op: both arguments are accepted and ignored.

    Args:
        task_id: Task identifier.
        doc_result: Document processing result.
    """
    return None
def get_timing_data(task_id: str) -> Optional[Dict[str, Any]]:
    """
    Look up the timing data recorded for a task (intended for fileLoc).

    Args:
        task_id: Task identifier.

    Returns:
        The stored timing dict for *task_id*, or None when nothing is stored.
        The returned object is the stored dict itself, not a copy.
    """
    with _timing_lock:
        task_entry = _timing_store.get(task_id)
    return task_entry
def clear_timing_data(task_id: str) -> None:
    """Drop the timing data stored for *task_id*; do nothing if there is none."""
    with _timing_lock:
        if task_id in _timing_store:
            del _timing_store[task_id]
|