fileLoc_timing_hook.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. """
  2. FileLoc Timing Hook - 轻量级 timing 数据采集 hook
  3. 该模块提供与 fileLoc 侧 timing 收集的接口。
  4. 当 fileLoc 未安装时,该模块为空操作,不影响主流程。
  5. 被 pipeline_manager_v2.py 调用,侵入量:1行导入 + 1行调用 = 2行。
  6. """
  7. import threading
  8. from typing import Dict, Any, Optional
  9. # 线程安全存储
  10. _timing_store: Dict[str, Dict[str, Any]] = {}
  11. _timing_lock = threading.Lock()
  12. def _extract_tables_from_page_elements(elements):
  13. """
  14. [SC2-B fix-high-PR-1] 从 page_result['elements'] 推算 tables(fallback)
  15. 根因:OCR 引擎的 _process_single_page 从未在 page_result 写 timing 键,
  16. 导致 on_page_result 拿不到 timing 退出。这里加 fallback:从 elements
  17. 过滤 type in ('table', 'table_body') 推算 tables,让下游 /ocr/process
  18. 响应里 model_timing 不再是 null。
  19. """
  20. if not isinstance(elements, list):
  21. return []
  22. tables = []
  23. for idx, elem in enumerate(elements):
  24. if not isinstance(elem, dict):
  25. continue
  26. elem_type = elem.get('type', '')
  27. if elem_type not in ('table', 'table_body'):
  28. continue
  29. content = elem.get('content', {}) if isinstance(elem.get('content'), dict) else {}
  30. tables.append({
  31. 'table_idx': idx,
  32. 'bbox': elem.get('bbox'),
  33. 'table_type': content.get('table_type'),
  34. # [ISSUE-20260626-001 v15] 保留 wrap 注入的 timing, 不再硬覆盖为 0
  35. 'classification_time': float(content.get('classification_time', 0.0) or 0.0),
  36. 'recognition_time': float(content.get('recognition_time', 0.0) or 0.0),
  37. 'recognition_method': content.get('recognition_method'),
  38. 'total_time': float(content.get('total_time', 0.0) or 0.0),
  39. })
  40. return tables
  41. def on_page_result(task_id: str, page_result: Dict[str, Any]) -> None:
  42. """
  43. 页面处理完成回调 - 被 pipeline_manager_v2.py 调用
  44. Args:
  45. task_id: 任务ID (如 "task_001_1" 其中最后一位是页码)
  46. page_result: 页面处理结果,包含 timing 数据
  47. """
  48. if not task_id or not page_result:
  49. return
  50. timing = page_result.get('timing', {})
  51. if not isinstance(timing, dict) or not timing:
  52. # [SC2-B fix-high-PR-1] Fallback:page_result 没有 timing 时,从 elements 推算
  53. fallback_tables = _extract_tables_from_page_elements(page_result.get('elements', []))
  54. if not fallback_tables:
  55. return
  56. timing = {
  57. 'page_idx': page_result.get('page_idx'),
  58. 'preprocessor_time': 0.0,
  59. 'layout_detection_time': 0.0,
  60. 'ocr_time': 0.0,
  61. 'tables': fallback_tables,
  62. }
  63. with _timing_lock:
  64. if task_id not in _timing_store:
  65. _timing_store[task_id] = {
  66. 'pages': [],
  67. 'doc_timing': {}
  68. }
  69. _timing_store[task_id]['pages'].append({
  70. 'page_idx': timing.get('page_idx'),
  71. 'preprocessor_time': timing.get('preprocessor_time', 0.0),
  72. 'layout_detection_time': timing.get('layout_detection_time', 0.0),
  73. 'ocr_time': timing.get('ocr_time', 0.0),
  74. 'tables': timing.get('tables', [])
  75. })
  76. def on_document_result(task_id: str, doc_result: Dict[str, Any]) -> None:
  77. """
  78. 文档处理完成回调 (可选)
  79. Args:
  80. task_id: 任务ID
  81. doc_result: 文档处理结果
  82. """
  83. pass
  84. def get_timing_data(task_id: str) -> Optional[Dict[str, Any]]:
  85. """
  86. 获取指定任务的 timing 数据 (供 fileLoc 调用)
  87. Args:
  88. task_id: 任务ID
  89. Returns:
  90. timing 数据字典,如果不存在返回 None
  91. """
  92. with _timing_lock:
  93. return _timing_store.get(task_id)
  94. def clear_timing_data(task_id: str) -> None:
  95. """清除指定任务的 timing 数据"""
  96. with _timing_lock:
  97. _timing_store.pop(task_id, None)