content_extractor.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. import re
  2. from typing import List, Dict
  3. from bs4 import BeautifulSoup
  4. try:
  5. from .text_processor import TextProcessor
  6. except ImportError:
  7. from text_processor import TextProcessor
  8. class ContentExtractor:
  9. """从Markdown中提取表格和段落"""
  10. def __init__(self):
  11. self.text_processor = TextProcessor()
  12. def _normalize_text(self, text: str) -> str:
  13. """标准化文本:去除多余空格、回车等无效字符"""
  14. if not text:
  15. return ""
  16. # 去除多余的空白字符
  17. text = re.sub(r'\s+', ' ', text.strip())
  18. # 去除标点符号周围的空格
  19. text = re.sub(r'\s*([,。:;!?、])\s*', r'\1', text)
  20. return text
  21. def _strip_html_comments(self, content: str) -> str:
  22. """移除 HTML/Markdown 注释块(含多行),不参与段落提取与对比。"""
  23. return re.sub(r'<!--.*?-->', '', content, flags=re.DOTALL)
  24. def _is_image_reference(self, text: str) -> bool:
  25. """判断是否为图片引用或描述"""
  26. image_keywords = [
  27. '图', '图片', '图像', 'image', 'figure', 'fig',
  28. '照片', '截图', '示意图', '流程图', '结构图'
  29. ]
  30. # 检查是否包含图片相关关键词
  31. for keyword in image_keywords:
  32. if keyword in text.lower():
  33. return True
  34. # 检查是否为Markdown图片语法
  35. if re.search(r'!\[.*?\]\(.*?\)', text):
  36. return True
  37. # 检查是否为HTML图片标签
  38. if re.search(r'<img[^>]*>', text, re.IGNORECASE):
  39. return True
  40. return False
  41. def extract_structured_content(self, content: str) -> Dict:
  42. """
  43. 提取结构化内容,返回表格和段落块
  44. Returns:
  45. {
  46. 'tables': [
  47. {'start_pos': int, 'end_pos': int, 'data': List[List[str]]},
  48. ...
  49. ],
  50. 'paragraph_blocks': [
  51. {'start_pos': int, 'end_pos': int, 'paragraphs': List[str]},
  52. ...
  53. ]
  54. }
  55. """
  56. # 匹配一个可能带任意属性的 <table ...> 到对应的 </table> 区间
  57. table_pattern = r'<table\b[^>]*>.*?</table>'
  58. tables = []
  59. paragraph_blocks = []
  60. last_pos = 0
  61. for match in re.finditer(table_pattern, content, re.DOTALL | re.IGNORECASE):
  62. start_pos = match.start()
  63. end_pos = match.end()
  64. # 提取表格前的段落块
  65. if start_pos > last_pos:
  66. #[last_pos:start_pos) 左闭右开区间
  67. before_table_content = content[last_pos:start_pos]
  68. paragraphs = self.extract_paragraphs(before_table_content)
  69. if paragraphs:
  70. paragraph_blocks.append({
  71. 'start_pos': last_pos,
  72. 'end_pos': start_pos,
  73. 'paragraphs': paragraphs
  74. })
  75. # 提取表格数据
  76. table_html = match.group()
  77. table_data = self._parse_table_html(table_html)
  78. tables.append({
  79. 'start_pos': start_pos,
  80. 'end_pos': end_pos,
  81. 'data': table_data
  82. })
  83. last_pos = end_pos
  84. # 提取最后一个表格后的段落
  85. if last_pos < len(content):
  86. after_table_content = content[last_pos:]
  87. paragraphs = self.extract_paragraphs(after_table_content)
  88. if paragraphs:
  89. paragraph_blocks.append({
  90. 'start_pos': last_pos,
  91. 'end_pos': len(content),
  92. 'paragraphs': paragraphs
  93. })
  94. return {
  95. 'tables': tables,
  96. 'paragraph_blocks': paragraph_blocks
  97. }
  98. def extract_table_data(self, content: str) -> List[List[List[str]]]:
  99. """提取所有表格数据(保持原有接口兼容)"""
  100. structured = self.extract_structured_content(content)
  101. return [t['data'] for t in structured['tables']]
  102. def _parse_table_html(self, html: str) -> List[List[str]]:
  103. """
  104. 解析HTML表格为二维数组
  105. Args:
  106. html: HTML表格字符串
  107. Returns:
  108. 二维数组,每个元素为单元格文本
  109. """
  110. soup = BeautifulSoup(html, 'html.parser')
  111. table = soup.find('table')
  112. if not table:
  113. return []
  114. table_data = []
  115. rows = table.find_all('tr')
  116. for row in rows:
  117. cells = row.find_all(['td', 'th'])
  118. row_data = []
  119. for cell in cells:
  120. cell_text = self._normalize_text(cell.get_text())
  121. # 跳过图片内容
  122. if not self._is_image_reference(cell_text):
  123. row_data.append(cell_text)
  124. else:
  125. row_data.append("[图片内容-忽略]")
  126. if row_data: # 只添加非空行
  127. table_data.append(row_data)
  128. return table_data
  129. def merge_split_paragraphs(self, lines: List[str]) -> List[str]:
  130. # 合并连续的非空行作为一个段落,且过滤图片内容
  131. merged_lines = []
  132. current_paragraph = ""
  133. for i, line in enumerate(lines):
  134. # 跳过空行
  135. if not line:
  136. if current_paragraph:
  137. merged_lines.append(current_paragraph)
  138. current_paragraph = ""
  139. continue
  140. # 跳过图片内容
  141. if self._is_image_reference(line):
  142. continue
  143. # 检查是否是标题(以数字、中文数字或特殊标记开头)
  144. is_title = (
  145. line.startswith(('一、', '二、', '三、', '四、', '五、', '六、', '七、', '八、', '九、', '十、')) or
  146. line.startswith(('1.', '2.', '3.', '4.', '5.', '6.', '7.', '8.', '9.')) or
  147. line.startswith('#')
  148. )
  149. # 如果是标题,结束当前段落
  150. if is_title:
  151. if current_paragraph:
  152. merged_lines.append(current_paragraph)
  153. current_paragraph = ""
  154. merged_lines.append(line)
  155. else:
  156. # 检查是否应该与前一行合并 # 如果当前段落不为空,且当前段落最后一个字符非空白字符
  157. if current_paragraph and not current_paragraph.endswith((' ', '\t')):
  158. current_paragraph += line
  159. else:
  160. if current_paragraph:
  161. merged_lines.append(current_paragraph)
  162. current_paragraph = line
  163. # 处理最后一个段落
  164. if current_paragraph:
  165. merged_lines.append(current_paragraph)
  166. return merged_lines
  167. def extract_paragraphs(self, content: str) -> List[str]:
  168. """提取段落内容(HTML 注释、标准化说明元数据不参与对比)"""
  169. # 必须先去掉注释:多行 <!-- ... --> 无法用 <[^>]+> 或单行 .*? 一次清干净
  170. content_no_comments = self._strip_html_comments(content)
  171. content_no_html = re.sub(r'<[^>]+>', '', content_no_comments)
  172. paragraphs = []
  173. lines = content_no_html.split('\n')
  174. merged_lines = self.merge_split_paragraphs(lines)
  175. for line in merged_lines:
  176. normalized = self._normalize_text(line)
  177. if not normalized:
  178. continue
  179. paragraphs.append(normalized)
  180. return paragraphs