ocr_comparator.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. import os
  2. from typing import Dict, List, Tuple
  3. from datetime import datetime
  4. try:
  5. from .content_extractor import ContentExtractor
  6. from .table_comparator import TableComparator
  7. from .paragraph_comparator import ParagraphComparator
  8. except ImportError:
  9. from content_extractor import ContentExtractor
  10. from table_comparator import TableComparator
  11. from paragraph_comparator import ParagraphComparator
  12. class OCRResultComparator:
  13. """OCR结果比较器主类"""
  14. def __init__(self):
  15. self.content_extractor = ContentExtractor()
  16. self.table_comparator = TableComparator()
  17. self.paragraph_comparator = ParagraphComparator()
  18. self.differences = []
  19. self.paragraph_match_threshold = 80
  20. self.content_similarity_threshold = 95
  21. self.max_paragraph_window = 6
  22. self.table_comparison_mode = 'standard'
  23. self.header_similarity_threshold = 90
  24. def compare_files(self, file1_path: str, file2_path: str) -> Dict:
  25. """比较两个OCR结果文件"""
  26. print(f"\n📖 读取文件...")
  27. # 读取文件内容
  28. with open(file1_path, 'r', encoding='utf-8') as f:
  29. content1 = f.read()
  30. with open(file2_path, 'r', encoding='utf-8') as f:
  31. content2 = f.read()
  32. print(f"✅ 文件读取完成")
  33. print(f" 文件1大小: {len(content1)} 字符")
  34. print(f" 文件2大小: {len(content2)} 字符")
  35. # ✅ 提取结构化内容(包含位置信息)
  36. print(f"\n📊 提取结构化内容...")
  37. structured_content1 = self.content_extractor.extract_structured_content(content1)
  38. structured_content2 = self.content_extractor.extract_structured_content(content2)
  39. print(f" 文件1: {len(structured_content1['tables'])}个表格, {len(structured_content1['paragraph_blocks'])}个段落块")
  40. print(f" 文件2: {len(structured_content2['tables'])}个表格, {len(structured_content2['paragraph_blocks'])}个段落块")
  41. # 初始化差异列表
  42. all_differences = []
  43. # ✅ 智能表格匹配与比较
  44. print(f"\n🔍 开始表格智能匹配...")
  45. tables1 = structured_content1['tables']
  46. tables2 = structured_content2['tables']
  47. # 记录匹配的表格对
  48. table_matches = []
  49. if tables1 and tables2:
  50. # 找到匹配的表格对
  51. table_matches = self.table_comparator.find_matching_tables(
  52. [t['data'] for t in tables1],
  53. [t['data'] for t in tables2]
  54. )
  55. if not table_matches:
  56. print(f" ⚠️ 未找到匹配的表格")
  57. all_differences.append({
  58. 'type': 'table_structure',
  59. 'position': '表格匹配',
  60. 'file1_value': f'{len(tables1)}个表格',
  61. 'file2_value': f'{len(tables2)}个表格',
  62. 'description': '未找到可匹配的表格',
  63. 'severity': 'high'
  64. })
  65. else:
  66. # 比较每对匹配的表格
  67. for idx1, idx2, similarity in table_matches:
  68. print(f"\n 📋 对比匹配的表格: 表格{idx1+1} vs 表格{idx2+1}")
  69. if self.table_comparison_mode == 'flow_list':
  70. table_diffs = self.table_comparator.compare_table_flow_list(
  71. tables1[idx1]['data'], tables2[idx2]['data']
  72. )
  73. else:
  74. table_diffs = self.table_comparator.compare_tables(
  75. tables1[idx1]['data'], tables2[idx2]['data']
  76. )
  77. # 为每个差异添加表格标识
  78. for diff in table_diffs:
  79. diff['table_pair'] = f'表格{idx1+1}↔表格{idx2+1}'
  80. diff['table_similarity'] = similarity
  81. all_differences.extend(table_diffs)
  82. print(f" 发现 {len(table_diffs)} 个差异")
  83. # 检查未匹配的表格
  84. matched_tables1 = {m[0] for m in table_matches}
  85. matched_tables2 = {m[1] for m in table_matches}
  86. for i in range(len(tables1)):
  87. if i not in matched_tables1:
  88. all_differences.append({
  89. 'type': 'table_unmatched',
  90. 'position': f'文件1表格{i+1}',
  91. 'file1_value': f'表格{i+1} (无匹配)',
  92. 'file2_value': '',
  93. 'description': f'文件1的表格{i+1}在文件2中无匹配表格',
  94. 'severity': 'medium'
  95. })
  96. for j in range(len(tables2)):
  97. if j not in matched_tables2:
  98. all_differences.append({
  99. 'type': 'table_unmatched',
  100. 'position': f'文件2表格{j+1}',
  101. 'file1_value': '',
  102. 'file2_value': f'表格{j+1} (无匹配)',
  103. 'description': f'文件2的表格{j+1}在文件1中无匹配表格',
  104. 'severity': 'medium'
  105. })
  106. elif tables1 and not tables2:
  107. all_differences.append({
  108. 'type': 'table_structure',
  109. 'position': '表格结构',
  110. 'file1_value': f'包含{len(tables1)}个表格',
  111. 'file2_value': '无表格',
  112. 'description': '文件1包含表格但文件2无表格',
  113. 'severity': 'high'
  114. })
  115. elif not tables1 and tables2:
  116. all_differences.append({
  117. 'type': 'table_structure',
  118. 'position': '表格结构',
  119. 'file1_value': '无表格',
  120. 'file2_value': f'包含{len(tables2)}个表格',
  121. 'description': '文件2包含表格但文件1无表格',
  122. 'severity': 'high'
  123. })
  124. # ✅ 根据表格匹配结果对齐段落块
  125. print(f"\n🔍 开始段落对比(基于表格位置对齐)...")
  126. paragraph_blocks1 = structured_content1['paragraph_blocks']
  127. paragraph_blocks2 = structured_content2['paragraph_blocks']
  128. # ✅ 构建段落块对应关系
  129. aligned_blocks = self._align_paragraph_blocks(
  130. paragraph_blocks1, paragraph_blocks2,
  131. tables1, tables2, table_matches
  132. )
  133. for block_pair in aligned_blocks:
  134. block1 = block_pair['block1']
  135. block2 = block_pair['block2']
  136. position_desc = block_pair['position']
  137. paragraphs1 = block1['paragraphs'] if block1 else []
  138. paragraphs2 = block2['paragraphs'] if block2 else []
  139. if not paragraphs1 and not paragraphs2:
  140. continue
  141. print(f" 📦 {position_desc}: 文件1有{len(paragraphs1)}个段落, 文件2有{len(paragraphs2)}个段落")
  142. # 每个段落块独立对比,指针重新初始化
  143. block_diffs = self.paragraph_comparator.compare_paragraphs(
  144. paragraphs1, paragraphs2
  145. )
  146. # 为每个差异添加段落块标识
  147. for diff in block_diffs:
  148. diff['paragraph_block'] = position_desc
  149. all_differences.extend(block_diffs)
  150. total_paragraph_diffs = len([d for d in all_differences if d['type'] == 'paragraph'])
  151. print(f"✅ 段落对比完成,共发现 {total_paragraph_diffs} 个差异")
  152. print(f"\n✅ 对比完成")
  153. # 统计差异
  154. stats = {
  155. 'total_differences': len(all_differences),
  156. 'table_differences': len([d for d in all_differences if d['type'].startswith('table')]),
  157. 'paragraph_differences': len([d for d in all_differences if d['type'] == 'paragraph']),
  158. 'amount_differences': len([d for d in all_differences if d['type'] == 'table_amount']),
  159. 'datetime_differences': len([d for d in all_differences if d['type'] == 'table_datetime']),
  160. 'text_differences': len([d for d in all_differences if d['type'] == 'table_text']),
  161. 'table_pre_header': len([d for d in all_differences if d['type'] == 'table_pre_header']),
  162. 'table_header_mismatch': len([d for d in all_differences if d['type'] == 'table_header_mismatch']),
  163. 'table_header_critical': len([d for d in all_differences if d['type'] == 'table_header_critical']),
  164. 'table_header_position': len([d for d in all_differences if d['type'] == 'table_header_position']),
  165. 'table_row_missing': len([d for d in all_differences if d['type'] == 'table_row_missing']),
  166. 'high_severity': len([d for d in all_differences if d.get('severity') in ['critical', 'high']]),
  167. 'medium_severity': len([d for d in all_differences if d.get('severity') == 'medium']),
  168. 'low_severity': len([d for d in all_differences if d.get('severity') == 'low'])
  169. }
  170. # ✅ 构建返回结果
  171. result = {
  172. 'differences': all_differences,
  173. 'statistics': stats,
  174. 'file1_tables': len(tables1),
  175. 'file2_tables': len(tables2),
  176. 'file1_paragraphs': sum(len(b['paragraphs']) for b in paragraph_blocks1),
  177. 'file2_paragraphs': sum(len(b['paragraphs']) for b in paragraph_blocks2),
  178. 'file1_path': file1_path,
  179. 'file2_path': file2_path,
  180. 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
  181. }
  182. print(f"\n" + "="*60)
  183. print(f"📊 对比结果汇总")
  184. print(f"="*60)
  185. print(f"总差异数: {result['statistics']['total_differences']}")
  186. print(f" - 段落差异: {result['statistics']['paragraph_differences']}")
  187. print(f" - 表格差异: {result['statistics']['table_differences']}")
  188. print(f" - 金额: {result['statistics']['amount_differences']}")
  189. print(f" - 日期: {result['statistics']['datetime_differences']}")
  190. print(f" - 文本: {result['statistics']['text_differences']}")
  191. print(f"\n严重级别分布:")
  192. print(f" 🔴 高: {result['statistics']['high_severity']}")
  193. print(f" 🟡 中: {result['statistics']['medium_severity']}")
  194. print(f" 🟢 低: {result['statistics']['low_severity']}")
  195. print(f"="*60)
  196. return result
  197. def _align_paragraph_blocks(self, blocks1: List[Dict], blocks2: List[Dict],
  198. tables1: List[Dict], tables2: List[Dict],
  199. table_matches: List[Tuple[int, int, float]]) -> List[Dict]:
  200. """
  201. 根据表格位置对齐段落块
  202. Returns:
  203. [
  204. {'block1': dict, 'block2': dict, 'position': str},
  205. ...
  206. ]
  207. """
  208. aligned = []
  209. # 如果没有表格,直接对比所有段落块
  210. if not tables1 and not tables2:
  211. max_blocks = max(len(blocks1), len(blocks2))
  212. for i in range(max_blocks):
  213. aligned.append({
  214. 'block1': blocks1[i] if i < len(blocks1) else None,
  215. 'block2': blocks2[i] if i < len(blocks2) else None,
  216. 'position': f'段落块{i+1}'
  217. })
  218. return aligned
  219. # 构建表格索引映射
  220. table_map = {idx1: idx2 for idx1, idx2, _ in table_matches}
  221. # ✅ 策略:根据表格位置划分段落块
  222. # 1. 第一个表格前的段落块
  223. # 2. 每对匹配表格之间的段落块
  224. # 3. 最后一个表格后的段落块
  225. # 第一个表格前的段落块
  226. if blocks1 or blocks2:
  227. first_table_idx1 = min(table_map.keys()) if table_map else len(blocks1)
  228. first_table_idx2 = min(table_map.values()) if table_map else len(blocks2)
  229. # 找到第一个表格前的所有段落块
  230. pre_blocks1 = [b for b in blocks1 if b['end_pos'] <= (tables1[first_table_idx1]['start_pos'] if first_table_idx1 < len(tables1) else float('inf'))]
  231. pre_blocks2 = [b for b in blocks2 if b['end_pos'] <= (tables2[first_table_idx2]['start_pos'] if first_table_idx2 < len(tables2) else float('inf'))]
  232. if pre_blocks1 or pre_blocks2:
  233. # 合并所有表格前的段落
  234. merged_block1 = self._merge_paragraph_blocks(pre_blocks1) if pre_blocks1 else None
  235. merged_block2 = self._merge_paragraph_blocks(pre_blocks2) if pre_blocks2 else None
  236. aligned.append({
  237. 'block1': merged_block1,
  238. 'block2': merged_block2,
  239. 'position': '文档开头(表格前)'
  240. })
  241. # 每对匹配表格之间的段落块
  242. sorted_matches = sorted(table_matches, key=lambda x: x[0])
  243. for i, (idx1, idx2, _) in enumerate(sorted_matches):
  244. # 当前表格后、下一个表格前的段落块
  245. table1_end = tables1[idx1]['end_pos']
  246. table2_end = tables2[idx2]['end_pos']
  247. # 下一个表格的开始位置
  248. if i + 1 < len(sorted_matches):
  249. next_idx1 = sorted_matches[i + 1][0]
  250. next_idx2 = sorted_matches[i + 1][1]
  251. next_table1_start = tables1[next_idx1]['start_pos']
  252. next_table2_start = tables2[next_idx2]['start_pos']
  253. else:
  254. next_table1_start = float('inf')
  255. next_table2_start = float('inf')
  256. # 找到这个范围内的段落块
  257. between_blocks1 = [b for b in blocks1
  258. if b['start_pos'] >= table1_end and b['end_pos'] <= next_table1_start]
  259. between_blocks2 = [b for b in blocks2
  260. if b['start_pos'] >= table2_end and b['end_pos'] <= next_table2_start]
  261. if between_blocks1 or between_blocks2:
  262. merged_block1 = self._merge_paragraph_blocks(between_blocks1) if between_blocks1 else None
  263. merged_block2 = self._merge_paragraph_blocks(between_blocks2) if between_blocks2 else None
  264. aligned.append({
  265. 'block1': merged_block1,
  266. 'block2': merged_block2,
  267. 'position': f'表格{idx1+1}↔表格{idx2+1} 之后'
  268. })
  269. return aligned
  270. def _merge_paragraph_blocks(self, blocks: List[Dict]) -> Dict:
  271. """合并多个段落块为一个"""
  272. if not blocks:
  273. return None
  274. if len(blocks) == 1:
  275. return blocks[0]
  276. all_paragraphs = []
  277. for block in blocks:
  278. all_paragraphs.extend(block['paragraphs'])
  279. return {
  280. 'start_pos': blocks[0]['start_pos'],
  281. 'end_pos': blocks[-1]['end_pos'],
  282. 'paragraphs': all_paragraphs
  283. }