""" FileLoc Timing Hook - 轻量级 timing 数据采集 hook 该模块提供与 fileLoc 侧 timing 收集的接口。 当 fileLoc 未安装时,该模块为空操作,不影响主流程。 被 pipeline_manager_v2.py 调用,侵入量:1行导入 + 1行调用 = 2行。 """ import threading from typing import Dict, Any, Optional # 线程安全存储 _timing_store: Dict[str, Dict[str, Any]] = {} _timing_lock = threading.Lock() def _extract_tables_from_page_elements(elements): """ [SC2-B fix-high-PR-1] 从 page_result['elements'] 推算 tables(fallback) 根因:OCR 引擎的 _process_single_page 从未在 page_result 写 timing 键, 导致 on_page_result 拿不到 timing 退出。这里加 fallback:从 elements 过滤 type in ('table', 'table_body') 推算 tables,让下游 /ocr/process 响应里 model_timing 不再是 null。 """ if not isinstance(elements, list): return [] tables = [] for idx, elem in enumerate(elements): if not isinstance(elem, dict): continue elem_type = elem.get('type', '') if elem_type not in ('table', 'table_body'): continue content = elem.get('content', {}) if isinstance(elem.get('content'), dict) else {} tables.append({ 'table_idx': idx, 'bbox': elem.get('bbox'), 'table_type': content.get('table_type'), # [ISSUE-20260626-001 v15] 保留 wrap 注入的 timing, 不再硬覆盖为 0 'classification_time': float(content.get('classification_time', 0.0) or 0.0), 'recognition_time': float(content.get('recognition_time', 0.0) or 0.0), 'recognition_method': content.get('recognition_method'), 'total_time': float(content.get('total_time', 0.0) or 0.0), }) return tables def on_page_result(task_id: str, page_result: Dict[str, Any]) -> None: """ 页面处理完成回调 - 被 pipeline_manager_v2.py 调用 Args: task_id: 任务ID (如 "task_001_1" 其中最后一位是页码) page_result: 页面处理结果,包含 timing 数据 """ if not task_id or not page_result: return timing = page_result.get('timing', {}) if not isinstance(timing, dict) or not timing: # [SC2-B fix-high-PR-1] Fallback:page_result 没有 timing 时,从 elements 推算 fallback_tables = _extract_tables_from_page_elements(page_result.get('elements', [])) if not fallback_tables: return timing = { 'page_idx': page_result.get('page_idx'), 'preprocessor_time': 0.0, 'layout_detection_time': 0.0, 'ocr_time': 0.0, 'tables': fallback_tables, } with _timing_lock: if task_id not in _timing_store: _timing_store[task_id] = { 'pages': [], 'doc_timing': {} } _timing_store[task_id]['pages'].append({ 'page_idx': timing.get('page_idx'), 'preprocessor_time': timing.get('preprocessor_time', 0.0), 'layout_detection_time': timing.get('layout_detection_time', 0.0), 'ocr_time': timing.get('ocr_time', 0.0), 'tables': timing.get('tables', []) }) def on_document_result(task_id: str, doc_result: Dict[str, Any]) -> None: """ 文档处理完成回调 (可选) Args: task_id: 任务ID doc_result: 文档处理结果 """ pass def get_timing_data(task_id: str) -> Optional[Dict[str, Any]]: """ 获取指定任务的 timing 数据 (供 fileLoc 调用) Args: task_id: 任务ID Returns: timing 数据字典,如果不存在返回 None """ with _timing_lock: return _timing_store.get(task_id) def clear_timing_data(task_id: str) -> None: """清除指定任务的 timing 数据""" with _timing_lock: _timing_store.pop(task_id, None)