# content_extractor.py

import re
from typing import List, Dict

from bs4 import BeautifulSoup

try:
    from .text_processor import TextProcessor
except ImportError:
    from text_processor import TextProcessor

class ContentExtractor:
    """Extract tables and paragraph blocks from Markdown content."""

    def __init__(self):
        self.text_processor = TextProcessor()

    def _normalize_text(self, text: str) -> str:
        """Normalize text: strip extra spaces, line breaks, and other stray characters."""
        if not text:
            return ""
        # Collapse runs of whitespace into single spaces
        text = re.sub(r'\s+', ' ', text.strip())
        # Remove spaces around full-width (CJK) punctuation
        text = re.sub(r'\s*([,。:;!?、])\s*', r'\1', text)
        return text
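    # Example (a sketch, not from the original source):
    # _normalize_text("  值 : 10 , 单位  ") collapses whitespace runs and
    # strips spaces around full-width punctuation, yielding "值:10,单位".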

    def _is_image_reference(self, text: str) -> bool:
        """Return True if the text looks like an image reference or caption."""
        image_keywords = [
            '图', '图片', '图像', 'image', 'figure', 'fig',
            '照片', '截图', '示意图', '流程图', '结构图'
        ]
        # Check for image-related keywords (Chinese terms for picture, photo,
        # screenshot, diagram, flowchart, etc.); the bare '图' entry makes
        # this check deliberately broad
        lowered = text.lower()
        for keyword in image_keywords:
            if keyword in lowered:
                return True
        # Check for Markdown image syntax: ![alt](url)
        if re.search(r'!\[.*?\]\(.*?\)', text):
            return True
        # Check for HTML <img> tags
        if re.search(r'<img[^>]*>', text, re.IGNORECASE):
            return True
        return False
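    # Illustrative checks (sketch), all treated as images by the rules above:
    #   _is_image_reference("![alt](photo.png)")  -> True  (Markdown syntax)
    #   _is_image_reference('<img src="a.png">')  -> True  (HTML <img> tag)
    #   _is_image_reference("如图3所示")           -> True  (contains '图')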

    def extract_structured_content(self, content: str) -> Dict:
        """
        Extract structured content: tables plus the paragraph blocks between them.

        Returns:
            {
                'tables': [
                    {'start_pos': int, 'end_pos': int, 'data': List[List[str]]},
                    ...
                ],
                'paragraph_blocks': [
                    {'start_pos': int, 'end_pos': int, 'paragraphs': List[str]},
                    ...
                ]
            }
        """
        # Locate every <table> element in the content
        table_pattern = r'<table>.*?</table>'
        tables = []
        paragraph_blocks = []
        last_pos = 0
        for match in re.finditer(table_pattern, content, re.DOTALL):
            start_pos = match.start()
            end_pos = match.end()
            # Collect the paragraph block that precedes this table
            if start_pos > last_pos:
                # [last_pos:start_pos) is a half-open interval
                before_table_content = content[last_pos:start_pos]
                paragraphs = self.extract_paragraphs(before_table_content)
                if paragraphs:
                    paragraph_blocks.append({
                        'start_pos': last_pos,
                        'end_pos': start_pos,
                        'paragraphs': paragraphs
                    })
            # Parse the table itself
            table_html = match.group()
            table_data = self._parse_table_html(table_html)
            tables.append({
                'start_pos': start_pos,
                'end_pos': end_pos,
                'data': table_data
            })
            last_pos = end_pos
        # Collect the paragraphs after the last table
        if last_pos < len(content):
            after_table_content = content[last_pos:]
            paragraphs = self.extract_paragraphs(after_table_content)
            if paragraphs:
                paragraph_blocks.append({
                    'start_pos': last_pos,
                    'end_pos': len(content),
                    'paragraphs': paragraphs
                })
        return {
            'tables': tables,
            'paragraph_blocks': paragraph_blocks
        }

    def extract_table_data(self, content: str) -> List[List[List[str]]]:
        """Extract all table data (kept for backward interface compatibility)."""
        structured = self.extract_structured_content(content)
        return [t['data'] for t in structured['tables']]

    def _parse_table_html(self, html: str) -> List[List[str]]:
        """
        Parse an HTML table into a 2D array.

        Args:
            html: HTML table string

        Returns:
            2D array where each element is the cell text
        """
        soup = BeautifulSoup(html, 'html.parser')
        table = soup.find('table')
        if not table:
            return []
        table_data = []
        rows = table.find_all('tr')
        for row in rows:
            cells = row.find_all(['td', 'th'])
            row_data = []
            for cell in cells:
                cell_text = self._normalize_text(cell.get_text())
                # Replace image content with a placeholder
                if not self._is_image_reference(cell_text):
                    row_data.append(cell_text)
                else:
                    row_data.append("[图片内容-忽略]")  # "[image content - ignored]"
            if row_data:  # Only keep non-empty rows
                table_data.append(row_data)
        return table_data
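    # Example (sketch): _parse_table_html(
    #     "<table><tr><th>名称</th><th>值</th></tr>"
    #     "<tr><td>A</td><td>1</td></tr></table>")
    # returns [['名称', '值'], ['A', '1']].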

    def merge_split_paragraphs(self, lines: List[str]) -> List[str]:
        """Merge consecutive non-empty lines into paragraphs, filtering out image content."""
        merged_lines = []
        current_paragraph = ""
        for line in lines:
            # A blank line ends the current paragraph
            if not line:
                if current_paragraph:
                    merged_lines.append(current_paragraph)
                    current_paragraph = ""
                continue
            # Skip image content
            if self._is_image_reference(line):
                continue
            # Heading detection: Chinese-numbered headings ('一、' ...),
            # Arabic-numbered headings ('1.' ...), or Markdown '#' headings
            is_title = (
                line.startswith(('一、', '二、', '三、', '四、', '五、', '六、', '七、', '八、', '九、', '十、')) or
                line.startswith(('1.', '2.', '3.', '4.', '5.', '6.', '7.', '8.', '9.')) or
                line.startswith('#')
            )
            # A heading closes the current paragraph and stands on its own
            if is_title:
                if current_paragraph:
                    merged_lines.append(current_paragraph)
                    current_paragraph = ""
                merged_lines.append(line)
            else:
                # Merge with the previous line only when the current paragraph
                # is non-empty and does not end in trailing whitespace
                if current_paragraph and not current_paragraph.endswith((' ', '\t')):
                    current_paragraph += line
                else:
                    if current_paragraph:
                        merged_lines.append(current_paragraph)
                    current_paragraph = line
        # Flush the final paragraph
        if current_paragraph:
            merged_lines.append(current_paragraph)
        return merged_lines
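    # Example (sketch): merge_split_paragraphs(
    #     ["第一行", "接上一行", "", "1. 标题", "正文"])
    # returns ["第一行接上一行", "1. 标题", "正文"] — wrapped lines are joined,
    # the heading stands alone, and the blank line splits paragraphs.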

    def extract_paragraphs(self, content: str) -> List[str]:
        """Extract paragraph text from the given content."""
        # Strip HTML tags
        content_no_html = re.sub(r'<[^>]+>', '', content)
        # Strip bbox comments (<!-- ... -->)
        content_no_bbox = re.sub(r'<!--.*?-->', '', content_no_html)
        # Split on newlines, then merge wrapped lines back into paragraphs
        paragraphs = []
        lines = content_no_bbox.split('\n')
        merged_lines = self.merge_split_paragraphs(lines)
        for line in merged_lines:
            normalized = self._normalize_text(line)
            if normalized:
                paragraphs.append(normalized)
            else:
                print(f"Skipping invalid or image-only paragraph: {line[0:30] if line else ''}...")
        return paragraphs
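

if __name__ == "__main__":
    # Minimal usage sketch, not part of the original module. It assumes a
    # text_processor module providing TextProcessor is importable, since
    # ContentExtractor.__init__ instantiates one. The sample content below
    # is purely illustrative.
    sample = (
        "第一段的开头。\n"
        "同一段落的后半句。\n"
        "\n"
        "<table><tr><th>名称</th><th>数值</th></tr>"
        "<tr><td>A</td><td>90</td></tr></table>\n"
        "表格之后的结尾段落。\n"
    )
    extractor = ContentExtractor()
    result = extractor.extract_structured_content(sample)
    print(result['tables'])            # one table whose 'data' is [['名称', '数值'], ['A', '90']]
    print(result['paragraph_blocks'])  # merged paragraph blocks before and after the table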