""" OCR验证工具的工具函数模块 包含数据处理、图像处理、统计分析等功能 """ import json import pandas as pd import numpy as np from pathlib import Path from PIL import Image, ImageDraw from typing import Dict, List, Optional, Tuple, Union from io import StringIO, BytesIO import re from html import unescape import yaml import base64 from urllib.parse import urlparse import cv2 import os def load_config(config_path: str = "config.yaml") -> Dict: """加载配置文件""" try: with open(config_path, 'r', encoding='utf-8') as f: return yaml.safe_load(f) except Exception as e: # 返回默认配置 return get_default_config() def get_default_config() -> Dict: """获取默认配置 - 与config.yaml保持一致""" return { 'styles': { 'font_size': 8, # 修改:从字典改为单个数值 'colors': { 'primary': '#0288d1', 'secondary': '#ff9800', 'success': '#4caf50', 'error': '#f44336', 'warning': '#ff9800', 'background': '#fafafa', 'text': '#333333' }, 'layout': { 'default_zoom': 1.0, 'default_height': 800, # 修改:从600改为800 'sidebar_width': 1, # 修改:从0.3改为1 'content_width': 0.7 } }, 'ui': { 'page_title': 'OCR可视化校验工具', 'page_icon': '🔍', 'layout': 'wide', 'sidebar_state': 'expanded' # 移除:default_font_size和default_layout }, 'paths': { # 修改:使用config.yaml中的实际路径 'ocr_out_dir': '/Users/zhch158/workspace/data/至远彩色印刷工业有限公司/data_DotsOCR_Results', 'src_img_dir': '/Users/zhch158/workspace/data/至远彩色印刷工业有限公司/data_PPStructureV3_Results/2023年度报告母公司', 'supported_image_formats': ['.png', '.jpg', '.jpeg'] }, 'ocr': { 'min_text_length': 2, 'default_confidence': 1.0, 'exclude_texts': ['Picture', ''], # 新增:图片方向检测配置 'orientation_detection': { 'enabled': True, 'confidence_threshold': 0.3, 'methods': ['opencv_analysis'], 'cache_results': True }, 'tools': { 'dots_ocr': { 'name': 'Dots OCR', 'description': '专业VLM OCR', # 新增描述 'json_structure': 'array', 'text_field': 'text', 'bbox_field': 'bbox', 'category_field': 'category', 'confidence_field': 'confidence', # 新增置信度字段 # 新增:旋转处理配置 'rotation': { 'coordinates_are_pre_rotated': False } }, 'ppstructv3': { 'name': 'PPStructV3', 'description': 'PaddleOCR PP-StructureV3', # 新增描述 'json_structure': 'object', 'parsing_results_field': 'parsing_res_list', 'text_field': 'block_content', 'bbox_field': 'block_bbox', 'category_field': 'block_label', 'confidence_field': 'confidence', # 新增置信度字段 # 新增:旋转处理配置 'rotation': { 'coordinates_are_pre_rotated': True } } }, 'auto_detection': { 'enabled': True, 'rules': [ { 'field_exists': 'parsing_res_list', 'tool_type': 'ppstructv3' }, { 'json_is_array': True, 'tool_type': 'dots_ocr' } ] } } } def load_css_styles(css_path: str = "styles.css") -> str: """加载CSS样式文件""" try: with open(css_path, 'r', encoding='utf-8') as f: return f.read() except Exception: # 返回基本样式 return """ .main > div { background-color: white !important; color: #333333 !important; } .stApp { background-color: white !important; } .block-container { background-color: white !important; color: #333333 !important; } """ def rotate_image_and_coordinates( image: Image.Image, angle: float, coordinates_list: List[List[int]], rotate_coordinates: bool = True ) -> Tuple[Image.Image, List[List[int]]]: """ 根据角度旋转图像和坐标 - 修正版本 Args: image: 原始图像 angle: 旋转角度(度数) coordinates_list: 坐标列表,每个坐标为[x1, y1, x2, y2]格式 rotate_coordinates: 是否需要旋转坐标(针对不同OCR工具的处理方式) Returns: rotated_image: 旋转后的图像 rotated_coordinates: 处理后的坐标列表 """ if angle == 0: return image, coordinates_list # 标准化旋转角度 if angle == 270: rotation_angle = -90 # 顺时针90度 elif angle == 90: rotation_angle = 90 # 逆时针90度 elif angle == 180: rotation_angle = 180 # 180度 else: rotation_angle = angle # 旋转图像 rotated_image = image.rotate(rotation_angle, expand=True) # 如果不需要旋转坐标,直接返回原坐标 if not rotate_coordinates: return rotated_image, coordinates_list # 获取原始和旋转后的图像尺寸 orig_width, orig_height = image.size new_width, new_height = rotated_image.size # 计算旋转后的坐标 rotated_coordinates = [] for coord in coordinates_list: if len(coord) < 4: rotated_coordinates.append(coord) continue x1, y1, x2, y2 = coord[:4] # 验证原始坐标是否有效 if x1 < 0 or y1 < 0 or x2 <= x1 or y2 <= y1: print(f"警告: 无效坐标 {coord}") rotated_coordinates.append([0, 0, 50, 50]) # 使用默认坐标 continue # 根据旋转角度变换坐标 if rotation_angle == -90: # 顺时针90度 (270度逆时针) # 变换公式: (x, y) -> (orig_height - y, x) new_x1 = orig_height - y2 # 这里是y2 new_y1 = x1 new_x2 = orig_height - y1 # 这里是y1 new_y2 = x2 elif rotation_angle == 90: # 逆时针90度 # 变换公式: (x, y) -> (y, orig_width - x) new_x1 = y1 new_y1 = orig_width - x2 # 这里是x2 new_x2 = y2 new_y2 = orig_width - x1 # 这里是x1 elif rotation_angle == 180: # 180度 # 变换公式: (x, y) -> (orig_width - x, orig_height - y) new_x1 = orig_width - x2 new_y1 = orig_height - y2 new_x2 = orig_width - x1 new_y2 = orig_height - y1 else: # 任意角度算法 - 修正版本 # 将角度转换为弧度 angle_rad = np.radians(rotation_angle) cos_angle = np.cos(angle_rad) sin_angle = np.sin(angle_rad) # 原图像中心点 orig_center_x = orig_width / 2 orig_center_y = orig_height / 2 # 旋转后图像中心点 new_center_x = new_width / 2 new_center_y = new_height / 2 # 将bbox的四个角点转换为相对于原图像中心的坐标 corners = [ (x1 - orig_center_x, y1 - orig_center_y), # 左上角 (x2 - orig_center_x, y1 - orig_center_y), # 右上角 (x2 - orig_center_x, y2 - orig_center_y), # 右下角 (x1 - orig_center_x, y2 - orig_center_y) # 左下角 ] # 应用修正后的旋转矩阵变换每个角点 rotated_corners = [] for x, y in corners: # 修正后的旋转矩阵: [cos(θ) sin(θ)] [x] # [-sin(θ) cos(θ)] [y] rotated_x = x * cos_angle + y * sin_angle rotated_y = -x * sin_angle + y * cos_angle # 转换回绝对坐标(相对于新图像) abs_x = rotated_x + new_center_x abs_y = rotated_y + new_center_y rotated_corners.append((abs_x, abs_y)) # 从旋转后的四个角点计算新的边界框 x_coords = [corner[0] for corner in rotated_corners] y_coords = [corner[1] for corner in rotated_corners] new_x1 = int(min(x_coords)) new_y1 = int(min(y_coords)) new_x2 = int(max(x_coords)) new_y2 = int(max(y_coords)) # 确保坐标在有效范围内 new_x1 = max(0, min(new_width, new_x1)) new_y1 = max(0, min(new_height, new_y1)) new_x2 = max(0, min(new_width, new_x2)) new_y2 = max(0, min(new_height, new_y2)) # 确保x1 < x2, y1 < y2 if new_x1 > new_x2: new_x1, new_x2 = new_x2, new_x1 if new_y1 > new_y2: new_y1, new_y2 = new_y2, new_y1 rotated_coordinates.append([new_x1, new_y1, new_x2, new_y2]) return rotated_image, rotated_coordinates def detect_ocr_tool_type(data: Union[List, Dict], config: Dict) -> str: """自动检测OCR工具类型""" if not config['ocr']['auto_detection']['enabled']: return 'dots_ocr' # 默认类型 rules = config['ocr']['auto_detection']['rules'] for rule in rules: if 'field_exists' in rule: field_name = rule['field_exists'] if isinstance(data, dict) and field_name in data: return rule['tool_type'] if 'json_is_array' in rule: if rule['json_is_array'] and isinstance(data, list): return rule['tool_type'] # 默认返回dots_ocr return 'dots_ocr' def parse_dots_ocr_data(data: List, config: Dict) -> List[Dict]: """解析Dots OCR格式的数据""" tool_config = config['ocr']['tools']['dots_ocr'] parsed_data = [] for item in data: if not isinstance(item, dict): continue # 提取字段 text = item.get(tool_config['text_field'], '') bbox = item.get(tool_config['bbox_field'], []) category = item.get(tool_config['category_field'], 'Text') confidence = item.get(tool_config.get('confidence_field', 'confidence'), config['ocr']['default_confidence']) if text and bbox and len(bbox) >= 4: parsed_data.append({ 'text': str(text).strip(), 'bbox': bbox[:4], # 确保只取前4个坐标 'category': category, 'confidence': confidence, 'source_tool': 'dots_ocr' }) return parsed_data def parse_ppstructv3_data(data: Dict, config: Dict) -> List[Dict]: """解析PPStructV3格式的数据""" tool_config = config['ocr']['tools']['ppstructv3'] parsed_data = [] # 获取解析结果列表 parsing_results_field = tool_config['parsing_results_field'] if parsing_results_field not in data: return parsed_data parsing_results = data[parsing_results_field] if not isinstance(parsing_results, list): return parsed_data for item in parsing_results: if not isinstance(item, dict): continue # 提取字段 text = item.get(tool_config['text_field'], '') bbox = item.get(tool_config['bbox_field'], []) category = item.get(tool_config['category_field'], 'text') confidence = item.get(tool_config.get('confidence_field', 'confidence'), config['ocr']['default_confidence']) if text and bbox and len(bbox) >= 4: parsed_data.append({ 'text': str(text).strip(), 'bbox': bbox[:4], # 确保只取前4个坐标 'category': category, 'confidence': confidence, 'source_tool': 'ppstructv3' }) # 如果有OCR文本识别结果,也添加进来 if 'overall_ocr_res' in data: ocr_res = data['overall_ocr_res'] if isinstance(ocr_res, dict) and 'rec_texts' in ocr_res and 'rec_boxes' in ocr_res: texts = ocr_res['rec_texts'] boxes = ocr_res['rec_boxes'] scores = ocr_res.get('rec_scores', []) for i, (text, box) in enumerate(zip(texts, boxes)): if text and len(box) >= 4: confidence = scores[i] if i < len(scores) else config['ocr']['default_confidence'] parsed_data.append({ 'text': str(text).strip(), 'bbox': box[:4], 'category': 'OCR_Text', 'confidence': confidence, 'source_tool': 'ppstructv3_ocr' }) return parsed_data def normalize_ocr_data(raw_data: Union[List, Dict], config: Dict) -> List[Dict]: """统一不同OCR工具的数据格式""" # 自动检测OCR工具类型 tool_type = detect_ocr_tool_type(raw_data, config) if tool_type == 'dots_ocr': return parse_dots_ocr_data(raw_data, config) elif tool_type == 'ppstructv3': return parse_ppstructv3_data(raw_data, config) else: raise ValueError(f"不支持的OCR工具类型: {tool_type}") def get_rotation_angle_from_ppstructv3(data: Dict) -> float: """从PPStructV3数据中获取旋转角度""" if 'doc_preprocessor_res' in data: doc_res = data['doc_preprocessor_res'] if isinstance(doc_res, dict) and 'angle' in doc_res: return float(doc_res['angle']) return 0.0 def find_image_in_multiple_locations(img_src: str, json_path: str) -> Optional[str]: """ 在多个可能的位置查找图片文件 """ json_dir = os.path.dirname(json_path) # 可能的搜索路径 search_paths = [ # 相对于JSON文件的路径 os.path.join(json_dir, img_src), # 相对于JSON文件父目录的路径 os.path.join(os.path.dirname(json_dir), img_src), # imgs目录(常见的图片目录) os.path.join(json_dir, 'imgs', os.path.basename(img_src)), os.path.join(os.path.dirname(json_dir), 'imgs', os.path.basename(img_src)), # images目录 os.path.join(json_dir, 'images', os.path.basename(img_src)), os.path.join(os.path.dirname(json_dir), 'images', os.path.basename(img_src)), # 同名目录 os.path.join(json_dir, os.path.splitext(os.path.basename(json_path))[0], os.path.basename(img_src)), ] # 如果是绝对路径,也加入搜索 if os.path.isabs(img_src): search_paths.insert(0, img_src) # 查找存在的文件 for path in search_paths: if os.path.exists(path): return path return None def process_html_images(html_content: str, json_path: str) -> str: """ 处理HTML内容中的图片引用,将本地图片转换为base64 - 增强版 """ import re # 匹配HTML图片标签: img_pattern = r']*src\s*=\s*["\']([^"\']+)["\'][^>]*/?>' def replace_html_image(match): full_tag = match.group(0) img_src = match.group(1) # 如果已经是base64或者网络链接,直接返回 if img_src.startswith('data:image') or img_src.startswith('http'): return full_tag # 增强的图片查找 full_img_path = find_image_in_multiple_locations(img_src, json_path) # 尝试转换为base64 try: if full_img_path and os.path.exists(full_img_path): with open(full_img_path, 'rb') as img_file: img_data = img_file.read() # 获取文件扩展名确定MIME类型 ext = os.path.splitext(full_img_path)[1].lower() mime_type = { '.png': 'image/png', '.jpg': 'image/jpeg', '.jpeg': 'image/jpeg', '.gif': 'image/gif', '.bmp': 'image/bmp', '.webp': 'image/webp' }.get(ext, 'image/jpeg') # 转换为base64 img_base64 = base64.b64encode(img_data).decode('utf-8') data_url = f"data:{mime_type};base64,{img_base64}" # 替换src属性,保持其他属性不变 updated_tag = re.sub( r'src\s*=\s*["\'][^"\']+["\']', f'src="{data_url}"', full_tag ) return updated_tag else: # 文件不存在,显示详细的错误信息 search_info = f"搜索路径: {img_src}" if full_img_path: search_info += f" -> {full_img_path}" error_content = f"""
🖼️ 图片无法加载
原始路径: {img_src}
JSON文件: {os.path.basename(json_path)}
请检查图片文件是否存在
""" return error_content except Exception as e: # 转换失败,返回错误信息 error_content = f"""
⚠️ 图片处理失败
文件: {img_src}
错误: {str(e)}
""" return error_content # 替换所有HTML图片标签 processed_content = re.sub(img_pattern, replace_html_image, html_content, flags=re.IGNORECASE) return processed_content def process_markdown_images(md_content: str, json_path: str) -> str: """ 处理Markdown中的图片引用,将本地图片转换为base64 """ import re # 匹配Markdown图片语法: ![alt](path) img_pattern = r'!\[([^\]]*)\]\(([^)]+)\)' def replace_image(match): alt_text = match.group(1) img_path = match.group(2) # 如果已经是base64或者网络链接,直接返回 if img_path.startswith('data:image') or img_path.startswith('http'): return match.group(0) # 处理相对路径 if not os.path.isabs(img_path): # 相对于JSON文件的路径 json_dir = os.path.dirname(json_path) full_img_path = os.path.join(json_dir, img_path) else: full_img_path = img_path # 尝试转换为base64 try: if os.path.exists(full_img_path): with open(full_img_path, 'rb') as img_file: img_data = img_file.read() # 获取文件扩展名确定MIME类型 ext = os.path.splitext(full_img_path)[1].lower() mime_type = { '.png': 'image/png', '.jpg': 'image/jpeg', '.jpeg': 'image/jpeg', '.gif': 'image/gif', '.bmp': 'image/bmp', '.webp': 'image/webp' }.get(ext, 'image/jpeg') # 转换为base64 img_base64 = base64.b64encode(img_data).decode('utf-8') data_url = f"data:{mime_type};base64,{img_base64}" return f'![{alt_text}]({data_url})' else: # 文件不存在,返回原始链接但添加警告 return f'![{alt_text} (文件不存在)]({img_path})' except Exception as e: # 转换失败,返回原始链接 return f'![{alt_text} (加载失败)]({img_path})' # 替换所有图片引用 processed_content = re.sub(img_pattern, replace_image, md_content) return processed_content def process_all_images_in_content(content: str, json_path: str) -> str: """ 处理内容中的所有图片引用(包括Markdown和HTML格式) """ # 先处理HTML图片 content = process_html_images(content, json_path) # 再处理Markdown图片 content = process_markdown_images(content, json_path) return content # 修改 load_ocr_data_file 函数 def load_ocr_data_file(json_path: str, config: Dict) -> Tuple[List, str, str]: """加载OCR数据文件 - 支持多数据源配置""" json_file = Path(json_path) if not json_file.exists(): raise FileNotFoundError(f"找不到JSON文件: {json_path}") # 加载JSON数据 try: with open(json_file, 'r', encoding='utf-8') as f: raw_data = json.load(f) # 统一数据格式 ocr_data = normalize_ocr_data(raw_data, config) # 检查是否需要处理图像旋转 rotation_angle = 0.0 if isinstance(raw_data, dict): rotation_angle = get_rotation_angle_from_ppstructv3(raw_data) # 如果有旋转角度,记录下来供后续图像处理使用 if rotation_angle != 0: for item in ocr_data: item['rotation_angle'] = rotation_angle except Exception as e: raise Exception(f"加载JSON文件失败: {e}") # 加载MD文件 md_file = json_file.with_suffix('.md') md_content = "" if md_file.exists(): with open(md_file, 'r', encoding='utf-8') as f: md_content = f.read() # 查找对应的图片文件 image_path = find_corresponding_image(json_file, config) return ocr_data, md_content, image_path def find_corresponding_image(json_file: Path, config: Dict) -> str: """查找对应的图片文件 - 支持多数据源""" # 从配置中获取图片目录 src_img_dir = config.get('paths', {}).get('src_img_dir', '') if not src_img_dir: # 如果没有配置图片目录,尝试在JSON文件同级目录查找 src_img_dir = json_file.parent src_img_path = Path(src_img_dir) # 支持多种图片格式 image_extensions = ['.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif'] for ext in image_extensions: image_file = src_img_path / f"{json_file.stem}{ext}" if image_file.exists(): return str(image_file) # 如果找不到,返回空字符串 return "" def process_ocr_data(ocr_data: List, config: Dict) -> Dict[str, List]: """处理OCR数据,建立文本到bbox的映射""" text_bbox_mapping = {} exclude_texts = config['ocr']['exclude_texts'] min_text_length = config['ocr']['min_text_length'] if not isinstance(ocr_data, list): return text_bbox_mapping for i, item in enumerate(ocr_data): if not isinstance(item, dict): continue text = str(item['text']).strip() if text and text not in exclude_texts and len(text) >= min_text_length: bbox = item['bbox'] if isinstance(bbox, list) and len(bbox) == 4: if text not in text_bbox_mapping: text_bbox_mapping[text] = [] text_bbox_mapping[text].append({ 'bbox': bbox, 'category': item.get('category', 'Text'), 'index': i, 'confidence': item.get('confidence', config['ocr']['default_confidence']), 'source_tool': item.get('source_tool', 'unknown'), 'rotation_angle': item.get('rotation_angle', 0.0) # 添加旋转角度信息 }) return text_bbox_mapping def find_available_ocr_files(ocr_out_dir: str) -> List[str]: """查找可用的OCR文件""" available_files = [] # 搜索多个可能的目录 search_dirs = [ Path(ocr_out_dir), ] for search_dir in search_dirs: if search_dir.exists(): # 递归搜索JSON文件 for json_file in search_dir.rglob("*.json"): if re.match(r'.*_page_\d+\.json$', json_file.name, re.IGNORECASE): available_files.append(str(json_file)) # 去重并排序 # available_files = sorted(list(set(available_files))) # 解析文件名并提取页码信息 file_info = [] for file_path in available_files: file_name = Path(file_path).stem # 提取页码 (例如从 "2023年度报告母公司_page_001" 中提取 "001") if 'page_' in file_name: try: page_part = file_name.split('page_')[-1] page_num = int(page_part) file_info.append({ 'path': file_path, 'page': page_num, 'display_name': f"第{page_num}页" }) except ValueError: # 如果无法解析页码,使用文件名 file_info.append({ 'path': file_path, 'page': len(file_info) + 1, 'display_name': Path(file_path).stem }) else: # 对于没有page_的文件,按顺序编号 file_info.append({ 'path': file_path, 'page': len(file_info) + 1, 'display_name': Path(file_path).stem }) # 按页码排序 file_info.sort(key=lambda x: x['page']) return file_info def get_ocr_tool_info(ocr_data: List) -> Dict: """获取OCR工具信息统计""" tool_counts = {} for item in ocr_data: if isinstance(item, dict): source_tool = item.get('source_tool', 'unknown') tool_counts[source_tool] = tool_counts.get(source_tool, 0) + 1 return tool_counts def draw_bbox_on_image(image: Image.Image, bbox: List[int], color: str = "red", width: int = 3) -> Image.Image: """在图片上绘制bbox框""" img_copy = image.copy() draw = ImageDraw.Draw(img_copy) x1, y1, x2, y2 = bbox # 绘制矩形框 draw.rectangle([x1, y1, x2, y2], outline=color, width=width) # 添加半透明填充 overlay = Image.new('RGBA', img_copy.size, (0, 0, 0, 0)) overlay_draw = ImageDraw.Draw(overlay) color_map = { "red": (255, 0, 0, 30), "blue": (0, 0, 255, 30), "green": (0, 255, 0, 30) } fill_color = color_map.get(color, (255, 255, 0, 30)) overlay_draw.rectangle([x1, y1, x2, y2], fill=fill_color) img_copy = Image.alpha_composite(img_copy.convert('RGBA'), overlay).convert('RGB') return img_copy def get_ocr_statistics(ocr_data: List, text_bbox_mapping: Dict, marked_errors: set) -> Dict: """获取OCR数据统计信息""" if not isinstance(ocr_data, list) or not ocr_data: return { 'total_texts': 0, 'clickable_texts': 0, 'marked_errors': 0, 'categories': {}, 'accuracy_rate': 0, 'tool_info': {} } total_texts = len(ocr_data) clickable_texts = len(text_bbox_mapping) marked_errors_count = len(marked_errors) # 按类别统计 categories = {} for item in ocr_data: if isinstance(item, dict): category = item.get('category', 'Unknown') categories[category] = categories.get(category, 0) + 1 # OCR工具信息统计 tool_info = get_ocr_tool_info(ocr_data) accuracy_rate = (clickable_texts - marked_errors_count) / clickable_texts * 100 if clickable_texts > 0 else 0 return { 'total_texts': total_texts, 'clickable_texts': clickable_texts, 'marked_errors': marked_errors_count, 'categories': categories, 'accuracy_rate': accuracy_rate, 'tool_info': tool_info } def convert_html_table_to_markdown(content: str) -> str: """将HTML表格转换为Markdown表格格式 - 支持横向滚动的增强版本""" def replace_table(match): table_html = match.group(0) # 提取所有行 rows = re.findall(r']*>(.*?)', table_html, re.DOTALL | re.IGNORECASE) if not rows: return table_html markdown_rows = [] max_cols = 0 # 处理所有行,找出最大列数 processed_rows = [] for row in rows: # 提取单元格,支持 th 和 td cells = re.findall(r']*>(.*?)', row, re.DOTALL | re.IGNORECASE) if cells: clean_cells = [] for cell in cells: cell_text = re.sub(r'<[^>]+>', '', cell).strip() cell_text = unescape(cell_text) # 限制单元格长度,避免表格过宽 if len(cell_text) > 30: cell_text = cell_text[:27] + "..." clean_cells.append(cell_text or " ") # 空单元格用空格替代 processed_rows.append(clean_cells) max_cols = max(max_cols, len(clean_cells)) # 统一所有行的列数 for i, row_cells in enumerate(processed_rows): while len(row_cells) < max_cols: row_cells.append(" ") # 构建Markdown行 markdown_row = '| ' + ' | '.join(row_cells) + ' |' markdown_rows.append(markdown_row) # 在第一行后添加分隔符 if i == 0: separator = '| ' + ' | '.join(['---'] * max_cols) + ' |' markdown_rows.append(separator) # 添加滚动提示 if max_cols > 8: scroll_note = "\n> 📋 **提示**: 此表格列数较多,在某些视图中可能需要横向滚动查看完整内容。\n" return scroll_note + '\n'.join(markdown_rows) if markdown_rows else table_html return '\n'.join(markdown_rows) if markdown_rows else table_html # 替换所有HTML表格 converted = re.sub(r']*>.*?', replace_table, content, flags=re.DOTALL | re.IGNORECASE) return converted def parse_html_tables(html_content: str) -> List[pd.DataFrame]: """解析HTML内容中的表格为DataFrame列表""" try: tables = pd.read_html(StringIO(html_content)) return tables if tables else [] except Exception: return [] def create_dynamic_css(config: Dict, font_size_key: str, height: int) -> str: """根据配置动态创建CSS样式""" colors = config['styles']['colors'] font_size = config['styles']['font_sizes'][font_size_key] return f""" """ def export_tables_to_excel(tables: List[pd.DataFrame], filename: str = "ocr_tables.xlsx") -> BytesIO: """导出表格数据到Excel""" output = BytesIO() with pd.ExcelWriter(output, engine='openpyxl') as writer: for i, table in enumerate(tables): table.to_excel(writer, sheet_name=f'Table_{i+1}', index=False) return output def get_table_statistics(tables: List[pd.DataFrame]) -> List[Dict]: """获取表格统计信息""" stats = [] for i, table in enumerate(tables): numeric_cols = len(table.select_dtypes(include=[np.number]).columns) stats.append({ 'table_index': i + 1, 'rows': len(table), 'columns': len(table.columns), 'numeric_columns': numeric_cols }) return stats def group_texts_by_category(text_bbox_mapping: Dict[str, List]) -> Dict[str, List[str]]: """按类别对文本进行分组""" categories = {} for text, info_list in text_bbox_mapping.items(): category = info_list[0]['category'] if category not in categories: categories[category] = [] categories[category].append(text) return categories def get_ocr_tool_rotation_config(ocr_data: List, config: Dict) -> Dict: """获取OCR工具的旋转配置""" if not ocr_data or not isinstance(ocr_data, list): # 默认配置 return { 'coordinates_are_pre_rotated': False } # 从第一个OCR数据项获取工具类型 first_item = ocr_data[0] if ocr_data else {} source_tool = first_item.get('source_tool', 'dots_ocr') # 获取工具配置 tools_config = config.get('ocr', {}).get('tools', {}) if source_tool in tools_config: tool_config = tools_config[source_tool] return tool_config.get('rotation', { 'coordinates_are_pre_rotated': False }) else: # 默认配置 return { 'coordinates_are_pre_rotated': False } def detect_image_orientation_by_opencv(image_path: str) -> Dict: """ 使用OpenCV的文本检测来判断图片方向 """ try: # 读取图像 image = cv2.imread(image_path) if image is None: raise ValueError("无法读取图像文件") height, width = image.shape[:2] gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) # 使用EAST文本检测器或其他方法 # 这里使用简单的边缘检测和轮廓分析 edges = cv2.Canny(gray, 50, 150, apertureSize=3) # 检测直线 lines = cv2.HoughLines(edges, 1, np.pi/180, threshold=100) if lines is None: return { 'detected_angle': 0.0, 'confidence': 0.0, 'method': 'opencv_analysis', 'message': '未检测到足够的直线特征' } # 分析直线角度 angles = [] for rho, theta in lines[:, 0]: angle = theta * 180 / np.pi # 将角度标准化到0-180度 if angle > 90: angle = angle - 180 angles.append(angle) # 统计主要角度 angle_hist = np.histogram(angles, bins=36, range=(-90, 90))[0] dominant_angle_idx = np.argmax(angle_hist) dominant_angle = -90 + dominant_angle_idx * 5 # 每个bin 5度 # 将角度映射到标准旋转角度 if -22.5 <= dominant_angle <= 22.5: detected_angle = 0.0 elif 22.5 < dominant_angle <= 67.5: detected_angle = 270.0 elif 67.5 < dominant_angle <= 90 or -90 <= dominant_angle < -67.5: detected_angle = 90.0 else: detected_angle = 180.0 confidence = angle_hist[dominant_angle_idx] / len(lines) if len(lines) > 0 else 0.0 return { 'detected_angle': detected_angle, 'confidence': min(1.0, confidence), 'method': 'opencv_analysis', 'line_count': len(lines), 'dominant_angle': dominant_angle, 'message': f'基于{len(lines)}条直线检测到旋转角度: {detected_angle}°' } except Exception as e: return { 'detected_angle': 0.0, 'confidence': 0.0, 'method': 'opencv_analysis', 'error': str(e), 'message': f'OpenCV检测过程中发生错误: {str(e)}' } # ocr_validator_utils.py def find_available_ocr_files_multi_source(config: Dict) -> Dict[str, List[Dict]]: """查找多个数据源的OCR文件""" all_sources = {} for source in config.get('data_sources', []): source_name = source['name'] ocr_tool = source['ocr_tool'] source_key = f"{source_name}_{ocr_tool}" # 创建唯一标识 ocr_out_dir = source['ocr_out_dir'] if Path(ocr_out_dir).exists(): files = find_available_ocr_files(ocr_out_dir) # 为每个文件添加数据源信息 for file_info in files: file_info.update({ 'source_name': source_name, 'ocr_tool': ocr_tool, 'description': source.get('description', ''), 'src_img_dir': source.get('src_img_dir', ''), 'ocr_out_dir': ocr_out_dir }) all_sources[source_key] = { 'files': files, 'config': source } print(f"📁 找到数据源: {source_key} - {len(files)} 个文件") return all_sources def get_data_source_display_name(source_config: Dict) -> str: """生成数据源的显示名称""" name = source_config['name'] tool = source_config['ocr_tool'] description = source_config.get('description', '') # 获取工具的友好名称 tool_name_map = { 'dots_ocr': 'Dots OCR', 'ppstructv3': 'PPStructV3' } tool_display = tool_name_map.get(tool, tool) return f"{name} ({tool_display})"