ocr_comparator.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352
  1. import os
  2. import sys
  3. from typing import Dict, List, Tuple
  4. from datetime import datetime
  5. from pathlib import Path
# Add the ocr_platform root directory to the Python import path.
# resolve() makes the path absolute first; on a relative path parents[1]
# may not exist, which would raise IndexError.
_file_path = Path(__file__).resolve()
ocr_platform_root = _file_path.parents[1]  # ocr_comparator.py -> ocr_comparator -> ocr_platform
if str(ocr_platform_root) not in sys.path:
    # Prepend so modules under the platform root take precedence on import.
    sys.path.insert(0, str(ocr_platform_root))
  12. try:
  13. from .content_extractor import ContentExtractor
  14. from .table_comparator import TableComparator
  15. from .paragraph_comparator import ParagraphComparator
  16. except ImportError:
  17. from content_extractor import ContentExtractor
  18. from table_comparator import TableComparator
  19. from paragraph_comparator import ParagraphComparator
  20. class OCRResultComparator:
  21. """OCR结果比较器主类"""
  22. def __init__(self):
  23. self.content_extractor = ContentExtractor()
  24. self.table_comparator = TableComparator()
  25. self.paragraph_comparator = ParagraphComparator()
  26. self.differences = []
  27. self.paragraph_match_threshold = 80
  28. self.content_similarity_threshold = 95
  29. self.max_paragraph_window = 6
  30. self.table_comparison_mode = 'standard'
  31. self.header_similarity_threshold = 90
  32. def compare_files(self, file1_path: str, file2_path: str) -> Dict:
  33. """比较两个OCR结果文件"""
  34. print(f"\n📖 读取文件...")
  35. # 读取文件内容
  36. with open(file1_path, 'r', encoding='utf-8') as f:
  37. content1 = f.read()
  38. with open(file2_path, 'r', encoding='utf-8') as f:
  39. content2 = f.read()
  40. print(f"✅ 文件读取完成")
  41. print(f" 文件1大小: {len(content1)} 字符")
  42. print(f" 文件2大小: {len(content2)} 字符")
  43. # ✅ 提取结构化内容(包含位置信息)
  44. print(f"\n📊 提取结构化内容...")
  45. structured_content1 = self.content_extractor.extract_structured_content(content1)
  46. structured_content2 = self.content_extractor.extract_structured_content(content2)
  47. print(f" 文件1: {len(structured_content1['tables'])}个表格, {len(structured_content1['paragraph_blocks'])}个段落块")
  48. print(f" 文件2: {len(structured_content2['tables'])}个表格, {len(structured_content2['paragraph_blocks'])}个段落块")
  49. # 初始化差异列表
  50. all_differences = []
  51. # ✅ 智能表格匹配与比较
  52. print(f"\n🔍 开始表格智能匹配...")
  53. tables1 = structured_content1['tables']
  54. tables2 = structured_content2['tables']
  55. # 记录匹配的表格对
  56. table_matches = []
  57. if tables1 and tables2:
  58. # 找到匹配的表格对
  59. table_matches = self.table_comparator.find_matching_tables(
  60. [t['data'] for t in tables1],
  61. [t['data'] for t in tables2]
  62. )
  63. if not table_matches:
  64. print(f" ⚠️ 未找到匹配的表格")
  65. all_differences.append({
  66. 'type': 'table_structure',
  67. 'position': '表格匹配',
  68. 'file1_value': f'{len(tables1)}个表格',
  69. 'file2_value': f'{len(tables2)}个表格',
  70. 'description': '未找到可匹配的表格',
  71. 'severity': 'high'
  72. })
  73. else:
  74. # 比较每对匹配的表格
  75. for idx1, idx2, similarity in table_matches:
  76. print(f"\n 📋 对比匹配的表格: 表格{idx1+1} vs 表格{idx2+1}")
  77. if self.table_comparison_mode == 'flow_list':
  78. table_diffs = self.table_comparator.compare_table_flow_list(
  79. tables1[idx1]['data'], tables2[idx2]['data']
  80. )
  81. else:
  82. table_diffs = self.table_comparator.compare_tables(
  83. tables1[idx1]['data'], tables2[idx2]['data']
  84. )
  85. # 为每个差异添加表格标识
  86. for diff in table_diffs:
  87. diff['table_pair'] = f'表格{idx1+1}↔表格{idx2+1}'
  88. diff['table_similarity'] = similarity
  89. all_differences.extend(table_diffs)
  90. print(f" 发现 {len(table_diffs)} 个差异")
  91. # 检查未匹配的表格
  92. matched_tables1 = {m[0] for m in table_matches}
  93. matched_tables2 = {m[1] for m in table_matches}
  94. for i in range(len(tables1)):
  95. if i not in matched_tables1:
  96. all_differences.append({
  97. 'type': 'table_unmatched',
  98. 'position': f'文件1表格{i+1}',
  99. 'file1_value': f'表格{i+1} (无匹配)',
  100. 'file2_value': '',
  101. 'description': f'文件1的表格{i+1}在文件2中无匹配表格',
  102. 'severity': 'medium'
  103. })
  104. for j in range(len(tables2)):
  105. if j not in matched_tables2:
  106. all_differences.append({
  107. 'type': 'table_unmatched',
  108. 'position': f'文件2表格{j+1}',
  109. 'file1_value': '',
  110. 'file2_value': f'表格{j+1} (无匹配)',
  111. 'description': f'文件2的表格{j+1}在文件1中无匹配表格',
  112. 'severity': 'medium'
  113. })
  114. elif tables1 and not tables2:
  115. all_differences.append({
  116. 'type': 'table_structure',
  117. 'position': '表格结构',
  118. 'file1_value': f'包含{len(tables1)}个表格',
  119. 'file2_value': '无表格',
  120. 'description': '文件1包含表格但文件2无表格',
  121. 'severity': 'high'
  122. })
  123. elif not tables1 and tables2:
  124. all_differences.append({
  125. 'type': 'table_structure',
  126. 'position': '表格结构',
  127. 'file1_value': '无表格',
  128. 'file2_value': f'包含{len(tables2)}个表格',
  129. 'description': '文件2包含表格但文件1无表格',
  130. 'severity': 'high'
  131. })
  132. # ✅ 根据表格匹配结果对齐段落块
  133. print(f"\n🔍 开始段落对比(基于表格位置对齐)...")
  134. paragraph_blocks1 = structured_content1['paragraph_blocks']
  135. paragraph_blocks2 = structured_content2['paragraph_blocks']
  136. # ✅ 构建段落块对应关系
  137. aligned_blocks = self._align_paragraph_blocks(
  138. paragraph_blocks1, paragraph_blocks2,
  139. tables1, tables2, table_matches
  140. )
  141. for block_pair in aligned_blocks:
  142. block1 = block_pair['block1']
  143. block2 = block_pair['block2']
  144. position_desc = block_pair['position']
  145. paragraphs1 = block1['paragraphs'] if block1 else []
  146. paragraphs2 = block2['paragraphs'] if block2 else []
  147. if not paragraphs1 and not paragraphs2:
  148. continue
  149. print(f" 📦 {position_desc}: 文件1有{len(paragraphs1)}个段落, 文件2有{len(paragraphs2)}个段落")
  150. # 每个段落块独立对比,指针重新初始化
  151. block_diffs = self.paragraph_comparator.compare_paragraphs(
  152. paragraphs1, paragraphs2
  153. )
  154. # 为每个差异添加段落块标识
  155. for diff in block_diffs:
  156. diff['paragraph_block'] = position_desc
  157. all_differences.extend(block_diffs)
  158. total_paragraph_diffs = len([d for d in all_differences if d['type'] == 'paragraph'])
  159. print(f"✅ 段落对比完成,共发现 {total_paragraph_diffs} 个差异")
  160. print(f"\n✅ 对比完成")
  161. # 统计差异
  162. stats = {
  163. 'total_differences': len(all_differences),
  164. 'table_differences': len([d for d in all_differences if d['type'].startswith('table')]),
  165. 'paragraph_differences': len([d for d in all_differences if d['type'] == 'paragraph']),
  166. 'amount_differences': len([d for d in all_differences if d['type'] == 'table_amount']),
  167. 'datetime_differences': len([d for d in all_differences if d['type'] == 'table_datetime']),
  168. 'text_differences': len([d for d in all_differences if d['type'] == 'table_text']),
  169. 'table_pre_header': len([d for d in all_differences if d['type'] == 'table_pre_header']),
  170. 'table_header_mismatch': len([d for d in all_differences if d['type'] == 'table_header_mismatch']),
  171. 'table_header_critical': len([d for d in all_differences if d['type'] == 'table_header_critical']),
  172. 'table_header_position': len([d for d in all_differences if d['type'] == 'table_header_position']),
  173. 'table_row_missing': len([d for d in all_differences if d['type'] == 'table_row_missing']),
  174. 'high_severity': len([d for d in all_differences if d.get('severity') in ['critical', 'high']]),
  175. 'medium_severity': len([d for d in all_differences if d.get('severity') == 'medium']),
  176. 'low_severity': len([d for d in all_differences if d.get('severity') == 'low'])
  177. }
  178. # ✅ 构建返回结果
  179. result = {
  180. 'differences': all_differences,
  181. 'statistics': stats,
  182. 'file1_tables': len(tables1),
  183. 'file2_tables': len(tables2),
  184. 'file1_paragraphs': sum(len(b['paragraphs']) for b in paragraph_blocks1),
  185. 'file2_paragraphs': sum(len(b['paragraphs']) for b in paragraph_blocks2),
  186. 'file1_path': file1_path,
  187. 'file2_path': file2_path,
  188. 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
  189. }
  190. print(f"\n" + "="*60)
  191. print(f"📊 对比结果汇总")
  192. print(f"="*60)
  193. print(f"总差异数: {result['statistics']['total_differences']}")
  194. print(f" - 段落差异: {result['statistics']['paragraph_differences']}")
  195. print(f" - 表格差异: {result['statistics']['table_differences']}")
  196. print(f" - 金额: {result['statistics']['amount_differences']}")
  197. print(f" - 日期: {result['statistics']['datetime_differences']}")
  198. print(f" - 文本: {result['statistics']['text_differences']}")
  199. print(f"\n严重级别分布:")
  200. print(f" 🔴 高: {result['statistics']['high_severity']}")
  201. print(f" 🟡 中: {result['statistics']['medium_severity']}")
  202. print(f" 🟢 低: {result['statistics']['low_severity']}")
  203. print(f"="*60)
  204. return result
  205. def _align_paragraph_blocks(self, blocks1: List[Dict], blocks2: List[Dict],
  206. tables1: List[Dict], tables2: List[Dict],
  207. table_matches: List[Tuple[int, int, float]]) -> List[Dict]:
  208. """
  209. 根据表格位置对齐段落块
  210. Returns:
  211. [
  212. {'block1': dict, 'block2': dict, 'position': str},
  213. ...
  214. ]
  215. """
  216. aligned = []
  217. # 如果没有表格,直接对比所有段落块
  218. if not tables1 and not tables2:
  219. max_blocks = max(len(blocks1), len(blocks2))
  220. for i in range(max_blocks):
  221. aligned.append({
  222. 'block1': blocks1[i] if i < len(blocks1) else None,
  223. 'block2': blocks2[i] if i < len(blocks2) else None,
  224. 'position': f'段落块{i+1}'
  225. })
  226. return aligned
  227. # 构建表格索引映射
  228. table_map = {idx1: idx2 for idx1, idx2, _ in table_matches}
  229. # ✅ 策略:根据表格位置划分段落块
  230. # 1. 第一个表格前的段落块
  231. # 2. 每对匹配表格之间的段落块
  232. # 3. 最后一个表格后的段落块
  233. # 第一个表格前的段落块
  234. if blocks1 or blocks2:
  235. first_table_idx1 = min(table_map.keys()) if table_map else len(blocks1)
  236. first_table_idx2 = min(table_map.values()) if table_map else len(blocks2)
  237. # 找到第一个表格前的所有段落块
  238. pre_blocks1 = [b for b in blocks1 if b['end_pos'] <= (tables1[first_table_idx1]['start_pos'] if first_table_idx1 < len(tables1) else float('inf'))]
  239. pre_blocks2 = [b for b in blocks2 if b['end_pos'] <= (tables2[first_table_idx2]['start_pos'] if first_table_idx2 < len(tables2) else float('inf'))]
  240. if pre_blocks1 or pre_blocks2:
  241. # 合并所有表格前的段落
  242. merged_block1 = self._merge_paragraph_blocks(pre_blocks1) if pre_blocks1 else None
  243. merged_block2 = self._merge_paragraph_blocks(pre_blocks2) if pre_blocks2 else None
  244. aligned.append({
  245. 'block1': merged_block1,
  246. 'block2': merged_block2,
  247. 'position': '文档开头(表格前)'
  248. })
  249. # 每对匹配表格之间的段落块
  250. sorted_matches = sorted(table_matches, key=lambda x: x[0])
  251. for i, (idx1, idx2, _) in enumerate(sorted_matches):
  252. # 当前表格后、下一个表格前的段落块
  253. table1_end = tables1[idx1]['end_pos']
  254. table2_end = tables2[idx2]['end_pos']
  255. # 下一个表格的开始位置
  256. if i + 1 < len(sorted_matches):
  257. next_idx1 = sorted_matches[i + 1][0]
  258. next_idx2 = sorted_matches[i + 1][1]
  259. next_table1_start = tables1[next_idx1]['start_pos']
  260. next_table2_start = tables2[next_idx2]['start_pos']
  261. else:
  262. next_table1_start = float('inf')
  263. next_table2_start = float('inf')
  264. # 找到这个范围内的段落块
  265. between_blocks1 = [b for b in blocks1
  266. if b['start_pos'] >= table1_end and b['end_pos'] <= next_table1_start]
  267. between_blocks2 = [b for b in blocks2
  268. if b['start_pos'] >= table2_end and b['end_pos'] <= next_table2_start]
  269. if between_blocks1 or between_blocks2:
  270. merged_block1 = self._merge_paragraph_blocks(between_blocks1) if between_blocks1 else None
  271. merged_block2 = self._merge_paragraph_blocks(between_blocks2) if between_blocks2 else None
  272. aligned.append({
  273. 'block1': merged_block1,
  274. 'block2': merged_block2,
  275. 'position': f'表格{idx1+1}↔表格{idx2+1} 之后'
  276. })
  277. return aligned
  278. def _merge_paragraph_blocks(self, blocks: List[Dict]) -> Dict:
  279. """合并多个段落块为一个"""
  280. if not blocks:
  281. return None
  282. if len(blocks) == 1:
  283. return blocks[0]
  284. all_paragraphs = []
  285. for block in blocks:
  286. all_paragraphs.extend(block['paragraphs'])
  287. return {
  288. 'start_pos': blocks[0]['start_pos'],
  289. 'end_pos': blocks[-1]['end_pos'],
  290. 'paragraphs': all_paragraphs
  291. }