feat: refactor content extraction logic; strengthen text normalization and paragraph extraction

zhch158_admin, 4 weeks ago
parent commit be7c23a036
1 file changed, 163 insertions(+), 45 deletions(-)

comparator/content_extractor.py  +163 −45

@@ -1,7 +1,7 @@
 import re
-from typing import List
+from typing import List, Dict
 from bs4 import BeautifulSoup
-# Support both relative and absolute imports
+
 try:
     from .text_processor import TextProcessor
 except ImportError:
@@ -14,85 +14,203 @@ class ContentExtractor:
     def __init__(self):
         self.text_processor = TextProcessor()
     
-    def extract_table_data(self, md_content: str) -> List[List[List[str]]]:
-        """从Markdown中提取表格数据"""
+    def _normalize_text(self, text: str) -> str:
+        """标准化文本:去除多余空格、回车等无效字符"""
+        if not text:
+            return ""
+        # Collapse runs of whitespace into single spaces
+        text = re.sub(r'\s+', ' ', text.strip())
+        # Drop spaces around CJK punctuation
+        text = re.sub(r'\s*([,。:;!?、])\s*', r'\1', text)
+        return text
+    
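
A minimal standalone sketch of the two substitutions above (the character class covers fullwidth CJK punctuation):

```python
import re

def normalize(text: str) -> str:
    # Collapse whitespace runs, then drop spaces around CJK punctuation
    text = re.sub(r'\s+', ' ', text.strip())
    return re.sub(r'\s*([,。:;!?、])\s*', r'\1', text)

print(normalize("合 计 : 1,200  元\n"))  # -> 合 计:1,200 元
```

Note that the ASCII comma in "1,200" survives: the class only lists fullwidth marks.
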
+    def _is_image_reference(self, text: str) -> bool:
+        """判断是否为图片引用或描述"""
+        image_keywords = [
+            '图', '图片', '图像', 'image', 'figure', 'fig',
+            '照片', '截图', '示意图', '流程图', '结构图'
+        ]
+        # Check for image-related keywords
+        for keyword in image_keywords:
+            if keyword in text.lower():
+                return True
+        
+        # Check for Markdown image syntax
+        if re.search(r'!\[.*?\]\(.*?\)', text):
+            return True
+            
+        # Check for an HTML <img> tag
+        if re.search(r'<img[^>]*>', text, re.IGNORECASE):
+            return True
+            
+        return False
+
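
The three detection paths, condensed into a self-contained check (keyword list shortened here; the full list is `image_keywords` above). Plain substring matching means any text that merely mentions '图' is flagged:

```python
import re

def looks_like_image(text: str) -> bool:
    return (
        any(k in text.lower() for k in ('图', 'image', 'figure'))     # keyword hit
        or re.search(r'!\[.*?\]\(.*?\)', text) is not None            # Markdown image
        or re.search(r'<img[^>]*>', text, re.IGNORECASE) is not None  # HTML <img>
    )

print(looks_like_image("![扫描件](p1.png)"))       # True  (Markdown syntax)
print(looks_like_image("资产负债结构见下图"))      # True  (keyword '图')
print(looks_like_image("营业收入合计 1,200 万元"))  # False
```
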
+    def extract_structured_content(self, content: str) -> Dict:
+        """
+        Extract structured content, returning tables and paragraph blocks.
+        
+        Returns:
+            {
+                'tables': [
+                    {'start_pos': int, 'end_pos': int, 'data': List[List[str]]},
+                    ...
+                ],
+                'paragraph_blocks': [
+                    {'start_pos': int, 'end_pos': int, 'paragraphs': List[str]},
+                    ...
+                ]
+            }
+        """
+        # Locate every table in the content
+        table_pattern = r'<table>.*?</table>'
         tables = []
+        paragraph_blocks = []
         
-        soup = BeautifulSoup(md_content, 'html.parser')
-        html_tables = soup.find_all('table')
+        last_pos = 0
         
-        for table in html_tables:
-            table_data = []
-            rows = table.find_all('tr')
+        for match in re.finditer(table_pattern, content, re.DOTALL):
+            start_pos = match.start()
+            end_pos = match.end()
+            
+            # Extract the paragraph block that precedes this table
+            if start_pos > last_pos:
+                # [last_pos, start_pos) -- a half-open interval
+                before_table_content = content[last_pos:start_pos]
+                paragraphs = self.extract_paragraphs(before_table_content)
+                if paragraphs:
+                    paragraph_blocks.append({
+                        'start_pos': last_pos,
+                        'end_pos': start_pos,
+                        'paragraphs': paragraphs
+                    })
             
-            for row in rows:
-                cells = row.find_all(['td', 'th'])
-                row_data = []
-                for cell in cells:
-                    cell_text = self.text_processor.normalize_text(cell.get_text())
-                    if not self.text_processor.is_image_reference(cell_text):
-                        row_data.append(cell_text)
-                    else:
-                        row_data.append("[图片内容-忽略]")
-                        
-                if row_data:
-                    table_data.append(row_data)
+            # Extract the table data itself
+            table_html = match.group()
+            table_data = self._parse_table_html(table_html)
+            tables.append({
+                'start_pos': start_pos,
+                'end_pos': end_pos,
+                'data': table_data
+            })
             
-            if table_data:
-                tables.append(table_data)
+            last_pos = end_pos
         
-        return tables
+        # Extract paragraphs after the final table
+        if last_pos < len(content):
+            after_table_content = content[last_pos:]
+            paragraphs = self.extract_paragraphs(after_table_content)
+            if paragraphs:
+                paragraph_blocks.append({
+                    'start_pos': last_pos,
+                    'end_pos': len(content),
+                    'paragraphs': paragraphs
+                })
+        
+        return {
+            'tables': tables,
+            'paragraph_blocks': paragraph_blocks
+        }
+    
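
A usage sketch of the returned structure, assuming the package is importable as `comparator` (content and expected output are illustrative):

```python
from comparator.content_extractor import ContentExtractor

extractor = ContentExtractor()
content = (
    "一、主要数据\n营业收入同比增长。\n"
    "<table><tr><th>项目</th><th>金额</th></tr>"
    "<tr><td>收入</td><td>100</td></tr></table>\n"
    "二、其他说明\n以上数据未经审计。"
)
result = extractor.extract_structured_content(content)
print([t['data'] for t in result['tables']])
# -> [[['项目', '金额'], ['收入', '100']]]
print([b['paragraphs'] for b in result['paragraph_blocks']])
# -> [['一、主要数据', '营业收入同比增长。'], ['二、其他说明', '以上数据未经审计。']]
```

The `extract_table_data` wrapper that follows simply projects out the `data` fields, so existing callers keep working.
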
+    def extract_table_data(self, content: str) -> List[List[List[str]]]:
+        """提取所有表格数据(保持原有接口兼容)"""
+        structured = self.extract_structured_content(content)
+        return [t['data'] for t in structured['tables']]
     
-    def extract_paragraphs(self, md_content: str) -> List[str]:
-        """提取段落文本"""
-        content = re.sub(r'<table[^>]*>.*?</table>', '', md_content, flags=re.DOTALL | re.IGNORECASE)
-        content = re.sub(r'<[^>]+>', '', content)
-        content = re.sub(r'<!--.*?-->', '', content, flags=re.DOTALL)
+    def _parse_table_html(self, html: str) -> List[List[str]]:
+        """
+        Parse an HTML table into a 2-D array
         
-        paragraphs = []
-        lines = content.split('\n')
-        merged_lines = self._merge_split_paragraphs(lines)
+        Args:
+            html: the HTML table string
         
-        for line in merged_lines:
-            normalized = self.text_processor.normalize_text(line)
-            if normalized:
-                paragraphs.append(normalized)
+        Returns:
+            2-D array; each element is one cell's text
+        """
+        soup = BeautifulSoup(html, 'html.parser')
+        table = soup.find('table')
+        
+        if not table:
+            return []
+        
+        table_data = []
+        rows = table.find_all('tr')
         
-        return paragraphs
+        for row in rows:
+            cells = row.find_all(['td', 'th'])
+            row_data = []
+            for cell in cells:
+                cell_text = self._normalize_text(cell.get_text())
+                # Skip image content
+                if not self._is_image_reference(cell_text):
+                    row_data.append(cell_text)
+                else:
+                    row_data.append("[图片内容-忽略]")
+                    
+            if row_data:  # only keep non-empty rows
+                table_data.append(row_data)
+        
+        return table_data
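
One behavior worth noting: because `_is_image_reference` keys on substrings such as '图', a cell that merely mentions a figure is replaced wholesale by the placeholder. A quick sketch (calling the private helper directly, for illustration only):

```python
from comparator.content_extractor import ContentExtractor

extractor = ContentExtractor()
html = ("<table><tr><th>科目</th><th>金额</th></tr>"
        "<tr><td>差旅费(见附图1)</td><td>3,500</td></tr></table>")
print(extractor._parse_table_html(html))
# -> [['科目', '金额'], ['[图片内容-忽略]', '3,500']]
```
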
     
-    def _merge_split_paragraphs(self, lines: List[str]) -> List[str]:
-        """合并连续的非空行作为一个段落"""
+    def merge_split_paragraphs(self, lines: List[str]) -> List[str]:
+        """Merge consecutive non-empty lines into one paragraph, filtering out image content"""
         merged_lines = []
         current_paragraph = ""
-        
-        for line in lines:
+        for line in lines:
+            # A blank line ends the current paragraph
             if not line:
                 if current_paragraph:
                     merged_lines.append(current_paragraph)
                     current_paragraph = ""
                 continue
-            
-            if self.text_processor.is_image_reference(line):
+            # Skip image content
+            if self._is_image_reference(line):
                 continue
 
+            # Heading check: starts with a Chinese numeral, a numbered marker, or '#'
             is_title = (
                 line.startswith(('一、', '二、', '三、', '四、', '五、', '六、', '七、', '八、', '九、', '十、')) or
                 line.startswith(('1.', '2.', '3.', '4.', '5.', '6.', '7.', '8.', '9.')) or
                 line.startswith('#')
             )
-            
+            # A heading terminates the current paragraph
             if is_title:
                 if current_paragraph:
                     merged_lines.append(current_paragraph)
                     current_paragraph = ""
                 merged_lines.append(line)
             else:
+                # Merge with the previous line when the current paragraph is non-empty and does not end in whitespace
                 if current_paragraph and not current_paragraph.endswith((' ', '\t')):
                     current_paragraph += line
                 else:
+                    if current_paragraph:
+                        merged_lines.append(current_paragraph)
                     current_paragraph = line
         
+        # Flush the final paragraph
         if current_paragraph:
             merged_lines.append(current_paragraph)
         
-        return merged_lines
+        return merged_lines
+    
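
The merge behavior in brief: wrapped lines are joined, image lines dropped, and headings kept on their own. An illustrative trace:

```python
from comparator.content_extractor import ContentExtractor

extractor = ContentExtractor()
lines = ["一、财务概况", "本期营业收入较上年", "增长 12%。", "",
         "![图1](assets/fig1.png)", "二、其他事项"]
print(extractor.merge_split_paragraphs(lines))
# -> ['一、财务概况', '本期营业收入较上年增长 12%。', '二、其他事项']
```
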
+    def extract_paragraphs(self, content: str) -> List[str]:
+        """提取段落内容"""
+        # Strip bbox comments first (re.DOTALL so multi-line comments match),
+        # before tag removal can split a comment containing '>'
+        content_no_bbox = re.sub(r'<!--.*?-->', '', content, flags=re.DOTALL)
+        
+        # Strip the remaining HTML tags
+        content_no_html = re.sub(r'<[^>]+>', '', content_no_bbox)
+        
+        # Split into lines and merge wrapped paragraphs
+        paragraphs = []
+        lines = content_no_html.split('\n')
+        merged_lines = self.merge_split_paragraphs(lines)
+        
+        for line in merged_lines:
+            normalized = self._normalize_text(line)
+            if normalized:
+                paragraphs.append(normalized)
+            else:
+                print(f"跳过的内容无效或图片段落: {line[0:30] if line else ''}...")        
+
+        return paragraphs
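
Putting it together, a minimal end-to-end sketch (the bbox comment format is an assumption based on the stripping step above):

```python
from comparator.content_extractor import ContentExtractor

extractor = ContentExtractor()
md = ("# 摘要\n"
      "<!-- bbox: 12,40,300,80 -->\n"
      "报告期内经营情况良好,\n"
      "无重大诉讼事项。\n")
print(extractor.extract_paragraphs(md))
# -> ['# 摘要', '报告期内经营情况良好,无重大诉讼事项。']
```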