| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
7037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048 |
- """
- OCR验证工具的工具函数模块
- 包含数据处理、图像处理、统计分析等功能
- """
- import json
- import pandas as pd
- import numpy as np
- from pathlib import Path
- from PIL import Image, ImageDraw
- from typing import Dict, List, Optional, Tuple, Union
- from io import StringIO, BytesIO
- import re
- from html import unescape
- import yaml
- import base64
- from urllib.parse import urlparse
- import cv2
- import os
def load_config(config_path: str = "config.yaml") -> Dict:
    """Load the YAML configuration file, falling back to built-in defaults.

    Args:
        config_path: Path of the YAML file to read.

    Returns:
        The parsed configuration mapping. The result of
        ``get_default_config()`` is returned instead when the file cannot be
        read or parsed, or when its top-level YAML value is not a mapping
        (for example an empty file, which ``yaml.safe_load`` parses to
        ``None``).
    """
    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            loaded = yaml.safe_load(f)
    except Exception:
        # Deliberately best-effort: any read/parse problem means "use defaults".
        return get_default_config()

    # Callers index the result like a dict, so never hand back None or a scalar.
    if not isinstance(loaded, dict):
        return get_default_config()
    return loaded
def get_default_config() -> Dict:
    """Return the built-in default configuration (mirrors config.yaml).

    A fresh nested dict is built on every call, so callers may mutate the
    result without affecting later calls.
    """
    # Visual settings: a single font size, the colour palette and layout sizes.
    styles = {
        'font_size': 8,
        'colors': {
            'primary': '#0288d1',
            'secondary': '#ff9800',
            'success': '#4caf50',
            'error': '#f44336',
            'warning': '#ff9800',
            'background': '#fafafa',
            'text': '#333333',
        },
        'layout': {
            'default_zoom': 1.0,
            'default_height': 800,
            'sidebar_width': 1,
            'content_width': 0.7,
        },
    }

    # Page-level UI settings.
    ui = {
        'page_title': 'OCR可视化校验工具',
        'page_icon': '🔍',
        'layout': 'wide',
        'sidebar_state': 'expanded',
    }

    # Input/output locations; these are the paths used in config.yaml.
    paths = {
        'ocr_out_dir': '/Users/zhch158/workspace/data/至远彩色印刷工业有限公司/data_DotsOCR_Results',
        'src_img_dir': '/Users/zhch158/workspace/data/至远彩色印刷工业有限公司/data_PPStructureV3_Results/2023年度报告母公司',
        'supported_image_formats': ['.png', '.jpg', '.jpeg'],
    }

    # Per-tool field mappings; 'rotation' says whether the tool's coordinates
    # are already expressed in the rotated image.
    dots_ocr_tool = {
        'name': 'Dots OCR',
        'description': '专业VLM OCR',
        'json_structure': 'array',
        'text_field': 'text',
        'bbox_field': 'bbox',
        'category_field': 'category',
        'confidence_field': 'confidence',
        'rotation': {
            'coordinates_are_pre_rotated': False,
        },
    }
    ppstructv3_tool = {
        'name': 'PPStructV3',
        'description': 'PaddleOCR PP-StructureV3',
        'json_structure': 'object',
        'parsing_results_field': 'parsing_res_list',
        'text_field': 'block_content',
        'bbox_field': 'block_bbox',
        'category_field': 'block_label',
        'confidence_field': 'confidence',
        'rotation': {
            'coordinates_are_pre_rotated': True,
        },
    }

    ocr = {
        'min_text_length': 2,
        'default_confidence': 1.0,
        'exclude_texts': ['Picture', ''],
        # Image orientation detection settings.
        'orientation_detection': {
            'enabled': True,
            'confidence_threshold': 0.3,
            'methods': ['opencv_analysis'],
            'cache_results': True,
        },
        'tools': {
            'dots_ocr': dots_ocr_tool,
            'ppstructv3': ppstructv3_tool,
        },
        # Rules are evaluated in order by the tool-type detection.
        'auto_detection': {
            'enabled': True,
            'rules': [
                {'field_exists': 'parsing_res_list', 'tool_type': 'ppstructv3'},
                {'json_is_array': True, 'tool_type': 'dots_ocr'},
            ],
        },
    }

    return {
        'styles': styles,
        'ui': ui,
        'paths': paths,
        'ocr': ocr,
    }
def load_css_styles(css_path: str = "styles.css") -> str:
    """Read the CSS stylesheet, or return a minimal built-in stylesheet.

    Args:
        css_path: Path of the CSS file to read.

    Returns:
        The file's text, or a small fallback stylesheet (white background,
        dark text) when the file cannot be read for any reason.
    """
    fallback_css = (
        "\n"
        ".main > div { background-color: white !important; color: #333333 !important; }\n"
        ".stApp { background-color: white !important; }\n"
        ".block-container { background-color: white !important; color: #333333 !important; }\n"
    )
    try:
        with open(css_path, 'r', encoding='utf-8') as css_file:
            css_text = css_file.read()
    except Exception:
        return fallback_css
    return css_text
def rotate_image_and_coordinates(
    image: Image.Image,
    angle: float,
    coordinates_list: List[List[int]],
    rotate_coordinates: bool = True
) -> Tuple[Image.Image, List[List[int]]]:
    """Rotate an image and, optionally, the bounding boxes that belong to it.

    Args:
        image: Source image.
        angle: Rotation in degrees. 270 is applied as -90; every other value
            is passed to ``image.rotate`` unchanged.
        coordinates_list: Boxes in ``[x1, y1, x2, y2]`` form.
        rotate_coordinates: When False only the image is rotated and
            ``coordinates_list`` is returned untouched (for OCR tools whose
            coordinates already refer to the rotated image).

    Returns:
        ``(rotated_image, rotated_coordinates)``. For ``angle == 0`` the
        inputs are returned as-is. Entries with fewer than four values are
        passed through; boxes with negative origin or non-positive extent
        are reported on stdout and replaced by ``[0, 0, 50, 50]``.
    """
    if angle == 0:
        return image, coordinates_list

    # Only 270 is remapped (to -90); 90 and 180 map to themselves.
    rotation_angle = {270: -90, 90: 90, 180: 180}.get(angle, angle)

    rotated_image = image.rotate(rotation_angle, expand=True)
    if not rotate_coordinates:
        return rotated_image, coordinates_list

    orig_width, orig_height = image.size
    new_width, new_height = rotated_image.size

    transformed = []
    for box in coordinates_list:
        if len(box) < 4:
            transformed.append(box)
            continue

        left, top, right, bottom = box[:4]

        # Reject boxes with a negative origin or zero/negative extent.
        if left < 0 or top < 0 or right <= left or bottom <= top:
            print(f"警告: 无效坐标 {box}")
            transformed.append([0, 0, 50, 50])
            continue

        if rotation_angle == -90:
            # (x, y) -> (orig_height - y, x)
            moved = (orig_height - bottom, left, orig_height - top, right)
        elif rotation_angle == 90:
            # (x, y) -> (y, orig_width - x)
            moved = (top, orig_width - right, bottom, orig_width - left)
        elif rotation_angle == 180:
            # (x, y) -> (orig_width - x, orig_height - y)
            moved = (
                orig_width - right,
                orig_height - bottom,
                orig_width - left,
                orig_height - top,
            )
        else:
            # Arbitrary angle: rotate the four corners about the original
            # centre, re-centre them on the new canvas, then take the
            # axis-aligned bounds of the result.
            theta = np.radians(rotation_angle)
            cos_t = np.cos(theta)
            sin_t = np.sin(theta)

            centre_x = orig_width / 2
            centre_y = orig_height / 2
            new_centre_x = new_width / 2
            new_centre_y = new_height / 2

            offsets = [
                (left - centre_x, top - centre_y),      # top-left
                (right - centre_x, top - centre_y),     # top-right
                (right - centre_x, bottom - centre_y),  # bottom-right
                (left - centre_x, bottom - centre_y),   # bottom-left
            ]
            # Matrix [[cos, sin], [-sin, cos]] applied to each offset.
            xs = [dx * cos_t + dy * sin_t + new_centre_x for dx, dy in offsets]
            ys = [-dx * sin_t + dy * cos_t + new_centre_y for dx, dy in offsets]

            moved = (int(min(xs)), int(min(ys)), int(max(xs)), int(max(ys)))

        new_x1, new_y1, new_x2, new_y2 = moved

        # Clamp to the rotated canvas.
        new_x1 = max(0, min(new_width, new_x1))
        new_y1 = max(0, min(new_height, new_y1))
        new_x2 = max(0, min(new_width, new_x2))
        new_y2 = max(0, min(new_height, new_y2))

        # Keep the corners ordered as (min, max).
        if new_x1 > new_x2:
            new_x1, new_x2 = new_x2, new_x1
        if new_y1 > new_y2:
            new_y1, new_y2 = new_y2, new_y1

        transformed.append([new_x1, new_y1, new_x2, new_y2])

    return rotated_image, transformed
def detect_ocr_tool_type(data: Union[List, Dict], config: Dict) -> str:
    """Guess which OCR tool produced ``data`` using the configured rules.

    Rules under ``config['ocr']['auto_detection']['rules']`` are tried in
    order. A ``field_exists`` rule matches a dict containing that key; a
    truthy ``json_is_array`` rule matches a list.

    Returns:
        The ``tool_type`` of the first matching rule, or ``'dots_ocr'`` when
        detection is disabled or nothing matches.
    """
    fallback_tool = 'dots_ocr'

    detection = config['ocr']['auto_detection']
    if not detection['enabled']:
        return fallback_tool

    data_is_mapping = isinstance(data, dict)
    data_is_array = isinstance(data, list)

    for rule in detection['rules']:
        if 'field_exists' in rule and data_is_mapping and rule['field_exists'] in data:
            return rule['tool_type']
        if 'json_is_array' in rule and rule['json_is_array'] and data_is_array:
            return rule['tool_type']

    return fallback_tool
def parse_dots_ocr_data(data: List, config: Dict) -> List[Dict]:
    """Convert Dots OCR output into the common record format.

    Field names are taken from ``config['ocr']['tools']['dots_ocr']``.
    Non-dict entries and entries without text or without at least four bbox
    values are skipped.

    Returns:
        A list of dicts with keys ``text``, ``bbox`` (first four values),
        ``category``, ``confidence`` and ``source_tool`` (``'dots_ocr'``).
    """
    tool_cfg = config['ocr']['tools']['dots_ocr']
    records = []

    for entry in data:
        if not isinstance(entry, dict):
            continue

        text = entry.get(tool_cfg['text_field'], '')
        bbox = entry.get(tool_cfg['bbox_field'], [])
        category = entry.get(tool_cfg['category_field'], 'Text')
        confidence = entry.get(
            tool_cfg.get('confidence_field', 'confidence'),
            config['ocr']['default_confidence'],
        )

        if not text or not bbox or len(bbox) < 4:
            continue

        records.append({
            'text': str(text).strip(),
            'bbox': bbox[:4],  # keep only the first four coordinates
            'category': category,
            'confidence': confidence,
            'source_tool': 'dots_ocr',
        })

    return records
def parse_ppstructv3_data(data: Dict, config: Dict) -> List[Dict]:
    """Convert PPStructV3 output into the common record format.

    Layout blocks are read from the list stored under the configured
    ``parsing_results_field``; if that key is missing or is not a list, an
    empty list is returned immediately. When ``overall_ocr_res`` provides
    ``rec_texts`` and ``rec_boxes``, those line-level results are appended
    with category ``'OCR_Text'`` and source tool ``'ppstructv3_ocr'``.

    Returns:
        A list of dicts with keys ``text``, ``bbox`` (first four values),
        ``category``, ``confidence`` and ``source_tool``.
    """
    tool_cfg = config['ocr']['tools']['ppstructv3']
    records = []

    results_key = tool_cfg['parsing_results_field']
    if results_key not in data:
        return records

    blocks = data[results_key]
    if not isinstance(blocks, list):
        return records

    # Layout blocks.
    for block in blocks:
        if not isinstance(block, dict):
            continue

        text = block.get(tool_cfg['text_field'], '')
        bbox = block.get(tool_cfg['bbox_field'], [])
        category = block.get(tool_cfg['category_field'], 'text')
        confidence = block.get(
            tool_cfg.get('confidence_field', 'confidence'),
            config['ocr']['default_confidence'],
        )

        if not text or not bbox or len(bbox) < 4:
            continue

        records.append({
            'text': str(text).strip(),
            'bbox': bbox[:4],  # keep only the first four coordinates
            'category': category,
            'confidence': confidence,
            'source_tool': 'ppstructv3',
        })

    # Line-level recognition results, when present.
    overall = data['overall_ocr_res'] if 'overall_ocr_res' in data else None
    if isinstance(overall, dict) and 'rec_texts' in overall and 'rec_boxes' in overall:
        line_texts = overall['rec_texts']
        line_boxes = overall['rec_boxes']
        line_scores = overall.get('rec_scores', [])

        for idx, (text, box) in enumerate(zip(line_texts, line_boxes)):
            if not text or len(box) < 4:
                continue
            # Scores may be shorter than the text list; fall back to the default.
            if idx < len(line_scores):
                confidence = line_scores[idx]
            else:
                confidence = config['ocr']['default_confidence']
            records.append({
                'text': str(text).strip(),
                'bbox': box[:4],
                'category': 'OCR_Text',
                'confidence': confidence,
                'source_tool': 'ppstructv3_ocr',
            })

    return records
def normalize_ocr_data(raw_data: Union[List, Dict], config: Dict) -> List[Dict]:
    """Normalise raw OCR output from any supported tool into common records.

    The tool type is auto-detected from ``raw_data`` and the matching parser
    is applied.

    Raises:
        ValueError: If the detected tool type has no parser.
    """
    detected = detect_ocr_tool_type(raw_data, config)

    if detected == 'ppstructv3':
        return parse_ppstructv3_data(raw_data, config)
    if detected == 'dots_ocr':
        return parse_dots_ocr_data(raw_data, config)

    raise ValueError(f"不支持的OCR工具类型: {detected}")
def get_rotation_angle_from_ppstructv3(data: Dict) -> float:
    """Return the page rotation angle recorded in PPStructV3 output.

    The value is read from ``data['doc_preprocessor_res']['angle']``.

    Returns:
        The angle as a float, or ``0.0`` when the preprocessor result is
        absent, is not a dict, or has no ``angle`` key.
    """
    if 'doc_preprocessor_res' not in data:
        return 0.0

    preprocessor = data['doc_preprocessor_res']
    has_angle = isinstance(preprocessor, dict) and 'angle' in preprocessor
    return float(preprocessor['angle']) if has_angle else 0.0
def find_image_in_multiple_locations(img_src: str, json_path: str) -> Optional[str]:
    """Locate an image referenced from OCR output by probing likely folders.

    Candidates are checked in this order: ``img_src`` itself when absolute;
    ``img_src`` relative to the JSON file's directory and to its parent;
    the image's base name inside ``imgs/`` and ``images/`` under both of
    those directories; and finally inside a directory named after the JSON
    file (without extension) next to it.

    Returns:
        The first candidate path that exists, or ``None``.
    """
    json_dir = os.path.dirname(json_path)
    parent_dir = os.path.dirname(json_dir)
    img_name = os.path.basename(img_src)
    json_stem = os.path.splitext(os.path.basename(json_path))[0]

    # An absolute reference is tried before anything else.
    candidates = [img_src] if os.path.isabs(img_src) else []
    candidates += [
        os.path.join(json_dir, img_src),
        os.path.join(parent_dir, img_src),
        os.path.join(json_dir, 'imgs', img_name),
        os.path.join(parent_dir, 'imgs', img_name),
        os.path.join(json_dir, 'images', img_name),
        os.path.join(parent_dir, 'images', img_name),
        os.path.join(json_dir, json_stem, img_name),
    ]

    return next((path for path in candidates if os.path.exists(path)), None)
def process_html_images(html_content: str, json_path: str) -> str:
    """Inline local images referenced by HTML ``<img>`` tags as base64 data URLs.

    Each ``<img ... src="...">`` tag whose source is neither a ``data:image``
    URL nor starts with ``http`` is looked up with
    ``find_image_in_multiple_locations``. If the file is found, the tag's
    ``src`` is replaced by a data URL and all other attributes are kept. If
    it is not found, or reading it fails, the tag is replaced by a small
    styled error box.

    Args:
        html_content: Text that may contain HTML image tags.
        json_path: Path of the OCR JSON file; used to resolve relative
            image references.

    Returns:
        The content with local images inlined.
    """
    # Local import: only ``unescape`` is imported from ``html`` at file level.
    from html import escape

    img_pattern = r'<img\s+[^>]*src\s*=\s*["\']([^"\']+)["\'][^>]*/?>'
    mime_types = {
        '.png': 'image/png',
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.gif': 'image/gif',
        '.bmp': 'image/bmp',
        '.webp': 'image/webp',
    }

    def error_box(color, background, body_lines):
        """Build the HTML placeholder shown in place of an unusable image."""
        style = "\n".join([
            "",
            f"color: {color};",
            f"border: 2px dashed {color};",
            "padding: 10px;",
            "margin: 10px 0;",
            "border-radius: 5px;",
            f"background-color: {background};",
            "text-align: center;",
            "",
        ])
        return f'\n<div style="{style}">\n' + "\n".join(body_lines) + "\n</div>\n"

    def replace_html_image(match):
        full_tag = match.group(0)
        img_src = match.group(1)

        # Already inlined, or a remote image: leave the tag untouched.
        if img_src.startswith(('data:image', 'http')):
            return full_tag

        full_img_path = find_image_in_multiple_locations(img_src, json_path)

        try:
            if not (full_img_path and os.path.exists(full_img_path)):
                # Paths come from document content, so escape them before
                # embedding them in HTML.
                return error_box('#d32f2f', '#ffebee', [
                    "<strong>🖼️ 图片无法加载</strong><br>",
                    f"<small>原始路径: {escape(img_src)}</small><br>",
                    f"<small>JSON文件: {escape(os.path.basename(json_path))}</small><br>",
                    "<em>请检查图片文件是否存在</em>",
                ])

            with open(full_img_path, 'rb') as img_file:
                img_data = img_file.read()

            # Unknown extensions are served as JPEG.
            ext = os.path.splitext(full_img_path)[1].lower()
            mime_type = mime_types.get(ext, 'image/jpeg')

            img_base64 = base64.b64encode(img_data).decode('utf-8')
            data_url = f"data:{mime_type};base64,{img_base64}"

            # Swap only the src attribute; every other attribute is kept.
            return re.sub(
                r'src\s*=\s*["\'][^"\']+["\']',
                f'src="{data_url}"',
                full_tag
            )
        except Exception as e:
            return error_box('#f57c00', '#fff3e0', [
                "<strong>⚠️ 图片处理失败</strong><br>",
                f"<small>文件: {escape(img_src)}</small><br>",
                f"<small>错误: {escape(str(e))}</small>",
            ])

    return re.sub(img_pattern, replace_html_image, html_content, flags=re.IGNORECASE)
def process_markdown_images(md_content: str, json_path: str) -> str:
    """Inline local images referenced with Markdown syntax as base64 data URLs.

    Every ``![alt](path)`` reference whose path is neither a ``data:image``
    URL nor starts with ``http`` is resolved (relative paths are taken
    relative to the directory of ``json_path``) and rewritten as
    ``![alt](data:<mime>;base64,...)``. A reference whose file does not
    exist is kept and followed by a short warning; a reference that fails
    to load is kept unchanged.

    Args:
        md_content: Markdown text.
        json_path: Path of the OCR JSON file; used to resolve relative
            image references.

    Returns:
        The Markdown text with local images inlined.
    """
    # Markdown image syntax: "!" + "[alt text]" + "(path)".
    img_pattern = r'!\[([^\]]*)\]\(([^)]+)\)'
    mime_types = {
        '.png': 'image/png',
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.gif': 'image/gif',
        '.bmp': 'image/bmp',
        '.webp': 'image/webp',
    }

    def replace_image(match):
        original_ref = match.group(0)
        alt_text = match.group(1)
        img_path = match.group(2)

        # Already inlined, or a remote image: leave the reference untouched.
        if img_path.startswith(('data:image', 'http')):
            return original_ref

        if os.path.isabs(img_path):
            full_img_path = img_path
        else:
            full_img_path = os.path.join(os.path.dirname(json_path), img_path)

        try:
            if not os.path.exists(full_img_path):
                # Keep the reference so nothing is lost, and flag it.
                return f'{original_ref} ⚠️ 图片文件不存在'

            with open(full_img_path, 'rb') as img_file:
                img_data = img_file.read()

            # Unknown extensions are served as JPEG.
            ext = os.path.splitext(full_img_path)[1].lower()
            mime_type = mime_types.get(ext, 'image/jpeg')

            img_base64 = base64.b64encode(img_data).decode('utf-8')
            data_url = f"data:{mime_type};base64,{img_base64}"

            return f'![{alt_text}]({data_url})'
        except Exception:
            # Best effort: on any read failure keep the original reference.
            return original_ref

    return re.sub(img_pattern, replace_image, md_content)
def process_all_images_in_content(content: str, json_path: str) -> str:
    """Inline every local image reference in ``content``.

    HTML ``<img>`` tags are processed first and Markdown image references
    second; both passes resolve paths using ``json_path``.
    """
    html_done = process_html_images(content, json_path)
    return process_markdown_images(html_done, json_path)
- # 修改 load_ocr_data_file 函数
def load_ocr_data_file(json_path: str, config: Dict) -> Tuple[List, str, str]:
    """Load one OCR result together with its Markdown and source image.

    Args:
        json_path: Path of the OCR JSON file.
        config: Application configuration; ``config['paths']`` supplies
            ``src_img_dir`` and ``supported_image_formats``.

    Returns:
        ``(ocr_data, md_content, image_path)`` where ``ocr_data`` is the
        list of normalised records (each tagged with ``rotation_angle`` when
        the JSON reports a non-zero angle), ``md_content`` is the text of
        the sibling ``.md`` file with images inlined (``""`` if there is
        none), and ``image_path`` is the first existing image candidate
        (``""`` if none exists).

    Raises:
        Exception: If the JSON file cannot be read or normalised. The
            original error is attached as ``__cause__``.
    """
    json_file = Path(json_path)

    # --- OCR records -----------------------------------------------------
    try:
        with open(json_file, 'r', encoding='utf-8') as f:
            raw_data = json.load(f)

        ocr_data = normalize_ocr_data(raw_data, config)

        # Only dict-shaped output can carry a rotation angle.
        rotation_angle = 0.0
        if isinstance(raw_data, dict):
            rotation_angle = get_rotation_angle_from_ppstructv3(raw_data)

        # Record the angle on each item for later image handling.
        if rotation_angle != 0:
            for item in ocr_data:
                item['rotation_angle'] = rotation_angle

    except Exception as e:
        # Chain the cause so the underlying traceback is not lost.
        raise Exception(f"加载JSON文件失败: {e}") from e

    # --- Markdown companion file ------------------------------------------
    md_content = ""
    md_file = json_file.with_suffix('.md')
    if md_file.exists():
        with open(md_file, 'r', encoding='utf-8') as f:
            raw_md_content = f.read()
        md_content = process_all_images_in_content(raw_md_content, str(json_file))

    # --- Source image -------------------------------------------------------
    image_name = json_file.stem
    src_img_dir = Path(config['paths']['src_img_dir'])
    has_page_suffix = '_page_' in image_name

    image_candidates = []
    for ext in config['paths']['supported_image_formats']:
        image_candidates.append(src_img_dir / f"{image_name}{ext}")
        image_candidates.append(json_file.parent / f"{image_name}{ext}")
        if has_page_suffix:
            # Also try the name with the "_page_..." suffix removed.
            image_candidates.append(src_img_dir / f"{image_name.split('_page_')[0]}{ext}")

    image_path = ""
    for candidate in image_candidates:
        if candidate.exists():
            image_path = str(candidate)
            break

    return ocr_data, md_content, image_path
def process_ocr_data(ocr_data: List, config: Dict) -> Dict[str, List]:
    """Index OCR records by their text.

    A record is kept when its stripped text is non-empty, is not listed in
    ``config['ocr']['exclude_texts']``, has at least
    ``config['ocr']['min_text_length']`` characters, and its ``bbox`` is a
    list of exactly four values.

    Returns:
        A dict mapping each text to a list of occurrences; every occurrence
        holds ``bbox``, ``category``, ``index`` (position in ``ocr_data``),
        ``confidence``, ``source_tool`` and ``rotation_angle``. An empty
        dict is returned when ``ocr_data`` is not a list.
    """
    mapping = {}
    excluded = config['ocr']['exclude_texts']
    min_length = config['ocr']['min_text_length']

    if not isinstance(ocr_data, list):
        return mapping

    for position, entry in enumerate(ocr_data):
        if not isinstance(entry, dict):
            continue

        text = str(entry['text']).strip()
        if not text or text in excluded or len(text) < min_length:
            continue

        bbox = entry['bbox']
        if not (isinstance(bbox, list) and len(bbox) == 4):
            continue

        mapping.setdefault(text, []).append({
            'bbox': bbox,
            'category': entry.get('category', 'Text'),
            'index': position,
            'confidence': entry.get('confidence', config['ocr']['default_confidence']),
            'source_tool': entry.get('source_tool', 'unknown'),
            'rotation_angle': entry.get('rotation_angle', 0.0),
        })

    return mapping
def find_available_ocr_files(ocr_out_dir: str) -> List[Dict]:
    """Find OCR JSON files under a directory and describe them by page.

    The directory is searched recursively for ``*.json`` files. Paths are
    processed in sorted order so the result does not depend on filesystem
    enumeration order.

    Args:
        ocr_out_dir: Root directory of the OCR output.

    Returns:
        A list of dicts sorted by ``page``, each with ``path``, ``page`` and
        ``display_name``. For a name such as ``report_page_001`` the page is
        parsed from the text after the last ``page_`` and the display name
        is ``第{page}页``. Otherwise the page is the file's 1-based position
        in the list built so far and the display name is the file stem. An
        empty list is returned if the directory does not exist.
    """
    search_dir = Path(ocr_out_dir)
    if not search_dir.exists():
        return []

    json_paths = sorted(str(found) for found in search_dir.rglob("*.json"))

    file_info = []
    for file_path in json_paths:
        file_name = Path(file_path).stem

        # Try to read a page number from e.g. "..._page_001".
        page_num = None
        if 'page_' in file_name:
            try:
                page_num = int(file_name.split('page_')[-1])
            except ValueError:
                page_num = None

        if page_num is None:
            # No usable page number: number sequentially, show the stem.
            file_info.append({
                'path': file_path,
                'page': len(file_info) + 1,
                'display_name': file_name,
            })
        else:
            file_info.append({
                'path': file_path,
                'page': page_num,
                'display_name': f"第{page_num}页",
            })

    file_info.sort(key=lambda info: info['page'])
    return file_info
def get_ocr_tool_info(ocr_data: List) -> Dict:
    """Count OCR records per source tool.

    Non-dict entries are ignored; records without ``source_tool`` are
    counted under ``'unknown'``.

    Returns:
        A dict mapping tool name to number of records.
    """
    counts = {}
    tool_names = (
        entry.get('source_tool', 'unknown')
        for entry in ocr_data
        if isinstance(entry, dict)
    )
    for tool_name in tool_names:
        if tool_name in counts:
            counts[tool_name] += 1
        else:
            counts[tool_name] = 1
    return counts
def draw_bbox_on_image(image: Image.Image, bbox: List[int], color: str = "red", width: int = 3) -> Image.Image:
    """Return a copy of ``image`` with ``bbox`` outlined and lightly tinted.

    Args:
        image: Source image; it is not modified.
        bbox: Box as ``[x1, y1, x2, y2]``.
        color: Outline colour. ``"red"``, ``"blue"`` and ``"green"`` get a
            matching translucent fill; any other colour gets a yellow fill.
        width: Outline width passed to the rectangle drawing call.

    Returns:
        A new RGB image.
    """
    left, top, right, bottom = bbox
    box = [left, top, right, bottom]

    # Translucent fill (alpha 30) matching the outline colour.
    tints = {
        "red": (255, 0, 0, 30),
        "blue": (0, 0, 255, 30),
        "green": (0, 255, 0, 30),
    }
    tint = tints.get(color, (255, 255, 0, 30))

    # Outline is drawn directly on the copy.
    canvas = image.copy()
    ImageDraw.Draw(canvas).rectangle(box, outline=color, width=width)

    # The fill goes on a transparent layer that is composited on top.
    overlay = Image.new('RGBA', canvas.size, (0, 0, 0, 0))
    ImageDraw.Draw(overlay).rectangle(box, fill=tint)

    return Image.alpha_composite(canvas.convert('RGBA'), overlay).convert('RGB')
def get_ocr_statistics(ocr_data: List, text_bbox_mapping: Dict, marked_errors: set) -> Dict:
    """Summarise OCR records for display.

    Args:
        ocr_data: Normalised OCR records.
        text_bbox_mapping: Text-to-occurrences mapping; its size is the
            number of clickable texts.
        marked_errors: Texts the user marked as wrong.

    Returns:
        A dict with ``total_texts``, ``clickable_texts``, ``marked_errors``,
        ``categories`` (count per category), ``accuracy_rate`` (percentage
        of clickable texts not marked as errors, 0 when there are none) and
        ``tool_info`` (count per source tool). All-zero/empty values are
        returned when ``ocr_data`` is not a non-empty list.
    """
    if not (isinstance(ocr_data, list) and ocr_data):
        return {
            'total_texts': 0, 'clickable_texts': 0, 'marked_errors': 0,
            'categories': {}, 'accuracy_rate': 0, 'tool_info': {}
        }

    clickable = len(text_bbox_mapping)
    error_count = len(marked_errors)

    # Count records per category.
    category_counts = {}
    for entry in ocr_data:
        if not isinstance(entry, dict):
            continue
        label = entry.get('category', 'Unknown')
        category_counts[label] = category_counts.get(label, 0) + 1

    tool_counts = get_ocr_tool_info(ocr_data)

    if clickable > 0:
        accuracy = (clickable - error_count) / clickable * 100
    else:
        accuracy = 0

    return {
        'total_texts': len(ocr_data),
        'clickable_texts': clickable,
        'marked_errors': error_count,
        'categories': category_counts,
        'accuracy_rate': accuracy,
        'tool_info': tool_counts,
    }
def convert_html_table_to_markdown(content: str) -> str:
    """Replace every HTML ``<table>`` in ``content`` with a Markdown table.

    For each table the ``<tr>`` rows and their ``<th>``/``<td>`` cells are
    extracted. Cell text has tags removed and entities unescaped; runs of
    whitespace (including line breaks and ``<br>``) are collapsed to one
    space so a cell never spans lines; text longer than 30 characters is
    cut to 27 characters plus ``...``; and ``|`` is escaped so it cannot
    split a column. Rows are padded to the widest row, and a separator is
    inserted after the first row. Tables with more than 8 columns are
    preceded by a note about horizontal scrolling. A table with no rows or
    no cells is left unchanged.

    Args:
        content: Text that may contain HTML tables.

    Returns:
        The text with tables converted.
    """
    max_cell_length = 30
    wide_table_columns = 8

    def clean_cell(cell_html):
        """Turn one cell's inner HTML into single-line Markdown-safe text."""
        text = re.sub(r'<br\s*/?>', ' ', cell_html, flags=re.IGNORECASE)
        text = unescape(re.sub(r'<[^>]+>', '', text))
        # Collapse all whitespace: a newline would end the Markdown row.
        text = ' '.join(text.split())
        if len(text) > max_cell_length:
            text = text[:max_cell_length - 3] + "..."
        # Escape after truncating so the backslashes do not count as length.
        text = text.replace('|', '\\|')
        return text or " "  # empty cells become a single space

    def replace_table(match):
        table_html = match.group(0)

        rows = re.findall(r'<tr[^>]*>(.*?)</tr>', table_html, re.DOTALL | re.IGNORECASE)
        if not rows:
            return table_html

        processed_rows = []
        for row in rows:
            # \b keeps <thead> from being mistaken for a <th> cell.
            cells = re.findall(r'<t[hd]\b[^>]*>(.*?)</t[hd]>', row, re.DOTALL | re.IGNORECASE)
            if cells:
                processed_rows.append([clean_cell(cell) for cell in cells])

        if not processed_rows:
            return table_html

        max_cols = max(len(cells) for cells in processed_rows)

        markdown_rows = []
        for row_index, row_cells in enumerate(processed_rows):
            padded = row_cells + [" "] * (max_cols - len(row_cells))
            markdown_rows.append('| ' + ' | '.join(padded) + ' |')
            if row_index == 0:
                # Header separator goes right after the first row.
                markdown_rows.append('| ' + ' | '.join(['---'] * max_cols) + ' |')

        table_md = '\n'.join(markdown_rows)
        if max_cols > wide_table_columns:
            scroll_note = "\n> 📋 **提示**: 此表格列数较多,在某些视图中可能需要横向滚动查看完整内容。\n"
            return scroll_note + table_md
        return table_md

    return re.sub(r'<table[^>]*>.*?</table>', replace_table, content, flags=re.DOTALL | re.IGNORECASE)
def parse_html_tables(html_content: str) -> List[pd.DataFrame]:
    """Parse every HTML table in ``html_content`` into a DataFrame.

    Returns:
        The DataFrames produced by ``pandas.read_html``, or an empty list
        when parsing raises for any reason (for example when the text
        contains no tables).
    """
    try:
        frames = pd.read_html(StringIO(html_content))
    except Exception:
        return []
    return frames or []
def create_dynamic_css(config: Dict, font_size_key: str, height: int) -> str:
    """Build the ``<style>`` block for the scrollable content pane.

    The font size is taken from ``config['styles']['font_sizes'][font_size_key]``
    when that mapping provides the key. Otherwise the scalar
    ``config['styles']['font_size']`` is used, which is the shape produced
    by this module's default configuration.

    Args:
        config: Application configuration with a ``styles`` section that
            contains ``colors`` (``background``, ``text``, ``success``,
            ``error``).
        font_size_key: Name of the entry to look up in ``font_sizes``.
        height: Pane height in pixels.

    Returns:
        A ``<style>...</style>`` string.

    Raises:
        KeyError: If neither ``font_sizes[font_size_key]`` nor ``font_size``
            is configured, or a required colour is missing.
    """
    styles = config['styles']
    colors = styles['colors']

    font_sizes = styles.get('font_sizes')
    if isinstance(font_sizes, dict) and font_size_key in font_sizes:
        font_size = font_sizes[font_size_key]
    elif 'font_size' in styles:
        # Single-value layout: one size for every key.
        font_size = styles['font_size']
    else:
        raise KeyError(font_size_key)

    css_lines = [
        "",
        "<style>",
        ".dynamic-content {",
        f"height: {height}px;",
        f"font-size: {font_size}px !important;",
        "line-height: 1.4;",
        f"background-color: {colors['background']} !important;",
        f"color: {colors['text']} !important;",
        "border: 1px solid #ddd;",
        "padding: 10px;",
        "border-radius: 5px;",
        "}",
        "",
        ".highlight-selected {",
        f"background-color: {colors['success']} !important;",
        "color: white !important;",
        "}",
        "",
        ".highlight-error {",
        f"background-color: {colors['error']} !important;",
        "color: white !important;",
        "}",
        "</style>",
        "",
    ]
    return "\n".join(css_lines)
def export_tables_to_excel(tables: List[pd.DataFrame], filename: str = "ocr_tables.xlsx") -> BytesIO:
    """Write the tables to an in-memory Excel workbook, one sheet per table.

    Sheets are named ``Table_1``, ``Table_2``, ... in input order. When
    ``tables`` is empty a single empty ``Table_1`` sheet is written, because
    a workbook must contain at least one sheet.

    Args:
        tables: DataFrames to export.
        filename: Unused by this function; kept for interface compatibility.

    Returns:
        A ``BytesIO`` holding the workbook, positioned at the start so it
        can be read directly.
    """
    output = BytesIO()
    with pd.ExcelWriter(output, engine='openpyxl') as writer:
        if not tables:
            pd.DataFrame().to_excel(writer, sheet_name='Table_1', index=False)
        for sheet_number, table in enumerate(tables, start=1):
            table.to_excel(writer, sheet_name=f'Table_{sheet_number}', index=False)

    # Rewind: after writing, the position is at the end and read() would be empty.
    output.seek(0)
    return output
def get_table_statistics(tables: List[pd.DataFrame]) -> List[Dict]:
    """Describe each table by its size and number of numeric columns.

    Returns:
        One dict per table, in input order, with ``table_index`` (1-based),
        ``rows``, ``columns`` and ``numeric_columns``.
    """
    return [
        {
            'table_index': position,
            'rows': len(frame),
            'columns': len(frame.columns),
            'numeric_columns': len(frame.select_dtypes(include=[np.number]).columns),
        }
        for position, frame in enumerate(tables, start=1)
    ]
def group_texts_by_category(text_bbox_mapping: Dict[str, List]) -> Dict[str, List[str]]:
    """Group texts by the category of their first occurrence.

    Args:
        text_bbox_mapping: Mapping of text to a list of occurrence dicts;
            only the first occurrence's ``category`` is consulted.

    Returns:
        A dict mapping category to the texts in that category, in the
        iteration order of ``text_bbox_mapping``.
    """
    grouped = {}
    for text, occurrences in text_bbox_mapping.items():
        grouped.setdefault(occurrences[0]['category'], []).append(text)
    return grouped
def get_ocr_tool_rotation_config(ocr_data: List, config: Dict) -> Dict:
    """Return the rotation settings of the tool that produced ``ocr_data``.

    The tool is identified by ``source_tool`` of the first record
    (``'dots_ocr'`` if absent) and looked up under
    ``config['ocr']['tools']``.

    Returns:
        The tool's ``rotation`` dict, or
        ``{'coordinates_are_pre_rotated': False}`` when ``ocr_data`` is empty
        or not a list, the tool is not configured, or it has no ``rotation``
        entry.
    """
    def default_rotation():
        # A fresh dict per call so callers cannot share mutations.
        return {'coordinates_are_pre_rotated': False}

    if not (ocr_data and isinstance(ocr_data, list)):
        return default_rotation()

    tool_name = ocr_data[0].get('source_tool', 'dots_ocr')
    configured_tools = config.get('ocr', {}).get('tools', {})

    if tool_name not in configured_tools:
        return default_rotation()

    return configured_tools[tool_name].get('rotation', default_rotation())
def detect_image_orientation_by_opencv(image_path: str) -> Dict:
    """Estimate an image's rotation from the dominant direction of its lines.

    The image is converted to grayscale, edges are found with Canny, and
    straight lines with the Hough transform. Line angles are folded into
    (-90, 90], binned into 36 bins of 5 degrees, and the left edge of the
    fullest bin is mapped to one of 0, 90, 180 or 270 degrees.

    Args:
        image_path: Path of the image file.

    Returns:
        A dict with ``detected_angle``, ``confidence`` (share of lines in
        the fullest bin, capped at 1.0), ``method`` and ``message``; plus
        ``line_count`` and ``dominant_angle`` on success, or ``error`` when
        an exception occurred. When no lines are found, or on any error,
        ``detected_angle`` and ``confidence`` are both ``0.0``.
    """
    try:
        image = cv2.imread(image_path)
        if image is None:
            raise ValueError("无法读取图像文件")

        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

        # Simple edge detection followed by straight-line detection.
        edges = cv2.Canny(gray, 50, 150, apertureSize=3)
        lines = cv2.HoughLines(edges, 1, np.pi/180, threshold=100)

        if lines is None:
            return {
                'detected_angle': 0.0,
                'confidence': 0.0,
                'method': 'opencv_analysis',
                'message': '未检测到足够的直线特征'
            }

        # Convert each line's theta to degrees and fold values above 90
        # into the negative range.
        angles = []
        for _rho, theta in lines[:, 0]:
            degrees = theta * 180 / np.pi
            angles.append(degrees - 180 if degrees > 90 else degrees)

        # Find the fullest 5-degree bin; its left edge is the dominant angle.
        angle_hist = np.histogram(angles, bins=36, range=(-90, 90))[0]
        dominant_angle_idx = np.argmax(angle_hist)
        dominant_angle = -90 + dominant_angle_idx * 5

        # Snap the dominant angle to one of the four standard rotations.
        if -22.5 <= dominant_angle <= 22.5:
            detected_angle = 0.0
        elif 22.5 < dominant_angle <= 67.5:
            detected_angle = 270.0
        elif 67.5 < dominant_angle <= 90 or -90 <= dominant_angle < -67.5:
            detected_angle = 90.0
        else:
            detected_angle = 180.0

        line_count = len(lines)
        confidence = angle_hist[dominant_angle_idx] / line_count if line_count > 0 else 0.0

        return {
            'detected_angle': detected_angle,
            'confidence': min(1.0, confidence),
            'method': 'opencv_analysis',
            'line_count': line_count,
            'dominant_angle': dominant_angle,
            'message': f'基于{line_count}条直线检测到旋转角度: {detected_angle}°'
        }

    except Exception as e:
        return {
            'detected_angle': 0.0,
            'confidence': 0.0,
            'method': 'opencv_analysis',
            'error': str(e),
            'message': f'OpenCV检测过程中发生错误: {str(e)}'
        }
|