""" 表格模板应用器 将人工标注的表格结构应用到其他页面 """ import json from pathlib import Path from PIL import Image, ImageDraw from typing import Dict, List, Tuple import numpy as np import argparse import sys # 添加父目录到路径 sys.path.insert(0, str(Path(__file__).parent)) try: from editor.data_processor import get_structure_from_ocr from table_line_generator import TableLineGenerator except ImportError: from .editor.data_processor import get_structure_from_ocr from .table_line_generator import TableLineGenerator class TableTemplateApplier: """表格模板应用器(混合模式)""" def __init__(self, template_config_path: str): """初始化时只提取列信息和表头信息""" with open(template_config_path, 'r', encoding='utf-8') as f: self.template = json.load(f) # ✅ 只提取列宽(固定) self.col_widths = self.template['col_widths'] # ✅ 计算列的相对位置 self.col_offsets = [0] for width in self.col_widths: self.col_offsets.append(self.col_offsets[-1] + width) # ✅ 提取表头高度(通常固定) rows = self.template['rows'] if rows: self.header_height = rows[0]['y_end'] - rows[0]['y_start'] else: self.header_height = 40 # ✅ 计算数据行高度(用于固定行高模式) if len(rows) > 1: data_row_heights = [row['y_end'] - row['y_start'] for row in rows[1:]] # 使用中位数作为典型行高 self.row_height = int(np.median(data_row_heights)) if data_row_heights else 40 # 兜底行高(同样使用中位数) self.fallback_row_height = self.row_height else: # 如果只有表头,使用默认值 self.row_height = 40 self.fallback_row_height = 40 print(f"\n✅ 加载模板配置:") print(f" 列数: {len(self.col_widths)}") print(f" 列宽: {self.col_widths}") print(f" 表头高度: {self.header_height}px") print(f" 数据行高: {self.row_height}px (用于固定行高模式)") print(f" 兜底行高: {self.fallback_row_height}px (OCR失败时使用)") def detect_table_anchor(self, ocr_data: List[Dict]) -> Tuple[int, int]: """ 检测表格的锚点位置(表头左上角) 策略: 1. 找到Y坐标最小的文本框(表头第一行) 2. 找到X坐标最小的文本框(第一列) Args: ocr_data: OCR识别结果 Returns: (anchor_x, anchor_y): 表格左上角坐标 """ if not ocr_data: return (0, 0) # 找到最小的X和Y坐标 min_x = min(item['bbox'][0] for item in ocr_data) min_y = min(item['bbox'][1] for item in ocr_data) return (min_x, min_y) def detect_table_rows(self, ocr_data: List[Dict], header_y: int) -> int: """ 检测表格的行数(包括表头) 策略: 1. 找到Y坐标最大的文本框 2. 根据数据行高计算行数 3. 加上表头行 Args: ocr_data: OCR识别结果 header_y: 表头起始Y坐标 Returns: 总行数(包括表头) """ if not ocr_data: return 1 # 至少有表头 max_y = max(item['bbox'][3] for item in ocr_data) # 🔧 计算数据区的高度(排除表头) data_start_y = header_y + self.header_height data_height = max_y - data_start_y # 计算数据行数 num_data_rows = max(int(data_height / self.row_height), 0) # 总行数 = 1行表头 + n行数据 total_rows = 1 + num_data_rows print(f"📊 行数计算:") print(f" 表头Y: {header_y}, 数据区起始Y: {data_start_y}") print(f" 最大Y: {max_y}, 数据区高度: {data_height}px") print(f" 数据行数: {num_data_rows}, 总行数: {total_rows}") return total_rows def apply_template_fixed(self, image: Image.Image, ocr_data: List[Dict], anchor_x: int = None, anchor_y: int = None, num_rows: int = None, line_width: int = 2, line_color: Tuple[int, int, int] = (0, 0, 0)) -> Tuple[Image.Image, Dict]: """ 将模板应用到图片 Args: image: 目标图片 ocr_data: OCR识别结果(用于自动检测锚点) anchor_x: 表格起始X坐标(None=自动检测) anchor_y: 表头起始Y坐标(None=自动检测) num_rows: 总行数(None=自动检测) line_width: 线条宽度 line_color: 线条颜色 Returns: 绘制了表格线的图片 """ img_with_lines = image.copy() draw = ImageDraw.Draw(img_with_lines) # 🔍 自动检测锚点 if anchor_x is None or anchor_y is None: detected_x, detected_y = self.detect_table_anchor(ocr_data) anchor_x = anchor_x or detected_x anchor_y = anchor_y or detected_y # 🔍 自动检测行数 if num_rows is None: num_rows = self.detect_table_rows(ocr_data, anchor_y) print(f"\n📍 表格锚点: ({anchor_x}, {anchor_y})") print(f"📊 总行数: {num_rows} (1表头 + {num_rows-1}数据)") # 🎨 生成横线坐标 horizontal_lines = [] # 第1条线:表头顶部 horizontal_lines.append(anchor_y) # 第2条线:表头底部/数据区顶部 horizontal_lines.append(anchor_y + self.header_height) # 后续横线:数据行分隔线 current_y = anchor_y + self.header_height for i in range(num_rows - 1): # 减1因为表头已经占了1行 current_y += self.row_height horizontal_lines.append(current_y) # 🎨 生成竖线坐标 vertical_lines = [] for offset in self.col_offsets: x = anchor_x + offset vertical_lines.append(x) print(f"📏 横线坐标: {horizontal_lines[:3]}... (共{len(horizontal_lines)}条)") print(f"📏 竖线坐标: {vertical_lines[:3]}... (共{len(vertical_lines)}条)") # 🖊️ 绘制横线 x_start = vertical_lines[0] x_end = vertical_lines[-1] for y in horizontal_lines: draw.line([(x_start, y), (x_end, y)], fill=line_color, width=line_width) # 🖊️ 绘制竖线 y_start = horizontal_lines[0] y_end = horizontal_lines[-1] for x in vertical_lines: draw.line([(x, y_start), (x, y_end)], fill=line_color, width=line_width) print(f"✅ 表格绘制完成: {len(horizontal_lines)}行 × {len(vertical_lines)-1}列") # 🔑 生成结构信息 structure = self._build_structure( horizontal_lines, vertical_lines, anchor_x, anchor_y, mode='fixed' ) return img_with_lines, structure def apply_template_hybrid(self, image: Image.Image, ocr_data_dict: Dict, use_ocr_rows: bool = True, anchor_x: int = None, anchor_y: int = None, y_tolerance: int = 5, line_width: int = 2, line_color: Tuple[int, int, int] = (0, 0, 0)) -> Tuple[Image.Image, Dict]: """ 混合模式:使用模板的列 + OCR的行 Args: image: 目标图片 ocr_data: OCR识别结果(用于检测行) use_ocr_rows: 是否使用OCR检测的行(True=自适应行高) anchor_x: 表格起始X坐标(None=自动检测) anchor_y: 表头起始Y坐标(None=自动检测) y_tolerance: Y轴聚类容差(像素) line_width: 线条宽度 line_color: 线条颜色 Returns: 绘制了表格线的图片, 结构信息 """ img_with_lines = image.copy() draw = ImageDraw.Draw(img_with_lines) ocr_data = ocr_data_dict.get('text_boxes', []) # 🔍 自动检测锚点 if anchor_x is None or anchor_y is None: detected_x, detected_y = self.detect_table_anchor(ocr_data) anchor_x = anchor_x or detected_x anchor_y = anchor_y or detected_y print(f"\n📍 表格锚点: ({anchor_x}, {anchor_y})") # ✅ 竖线:使用模板的列宽(固定) vertical_lines = [anchor_x + offset for offset in self.col_offsets] print(f"📏 竖线坐标: {vertical_lines} (使用模板,共{len(vertical_lines)}条)") # ✅ 横线:根据模式选择 if use_ocr_rows and ocr_data: horizontal_lines = self._detect_rows_from_ocr( ocr_data, anchor_y, y_tolerance ) print(f"📏 横线坐标: 使用OCR检测 (共{len(horizontal_lines)}条,自适应行高)") else: num_rows = self.detect_table_rows(ocr_data, anchor_y) if ocr_data else 10 horizontal_lines = self._generate_fixed_rows(anchor_y, num_rows) print(f"📏 横线坐标: 使用固定行高 (共{len(horizontal_lines)}条)") # 🖊️ 绘制横线 x_start = vertical_lines[0] x_end = vertical_lines[-1] for y in horizontal_lines: draw.line([(x_start, y), (x_end, y)], fill=line_color, width=line_width) # 🖊️ 绘制竖线 y_start = horizontal_lines[0] y_end = horizontal_lines[-1] for x in vertical_lines: draw.line([(x, y_start), (x, y_end)], fill=line_color, width=line_width) print(f"✅ 表格绘制完成: {len(horizontal_lines)}行 × {len(vertical_lines)-1}列") # 🔑 生成结构信息 structure = self._build_structure( horizontal_lines, vertical_lines, anchor_x, anchor_y, mode='hybrid' ) return img_with_lines, structure def _detect_rows_from_ocr(self, ocr_data: List[Dict], anchor_y: int, y_tolerance: int = 5) -> List[int]: """ 从OCR结果中检测行(自适应行高) 复用 get_structure_from_ocr 统一接口 Args: ocr_data: OCR识别结果(MinerU 格式的 text_boxes) anchor_y: 表格起始Y坐标 y_tolerance: Y轴聚类容差(未使用,保留参数兼容性) Returns: 横线 y 坐标列表 """ if not ocr_data: return [anchor_y, anchor_y + self.header_height] print(f"\n🔍 OCR行检测 (使用 MinerU 算法):") print(f" 有效文本框数: {len(ocr_data)}") # 🔑 验证是否为 MinerU 格式 has_cell_index = any('row' in item and 'col' in item for item in ocr_data) if not has_cell_index: print(" ⚠️ 警告: OCR数据不包含 row/col 索引,可能不是 MinerU 格式") print(" ⚠️ 混合模式需要 MinerU 格式的 JSON 文件") return [anchor_y, anchor_y + self.header_height] # 🔑 重构原始数据格式(MinerU 需要完整的 table 结构) raw_data = { 'type': 'table', 'table_cells': ocr_data } try: # ✅ 使用统一接口解析和分析(无需 dummy_image) table_bbox, structure = get_structure_from_ocr( raw_data, tool="mineru" ) if not structure or 'horizontal_lines' not in structure: print(" ⚠️ MinerU 分析失败,使用兜底方案") return [anchor_y, anchor_y + self.header_height] # 🔑 获取横线坐标 horizontal_lines = structure['horizontal_lines'] # 🔑 调整第一条线到 anchor_y(表头顶部) if horizontal_lines: offset = anchor_y - horizontal_lines[0] horizontal_lines = [y + offset for y in horizontal_lines] print(f" 检测到行数: {len(horizontal_lines) - 1}") # 🔑 分析行高分布 if len(horizontal_lines) > 1: row_heights = [] for i in range(len(horizontal_lines) - 1): h = horizontal_lines[i+1] - horizontal_lines[i] row_heights.append(h) if len(row_heights) > 1: import numpy as np print(f" 行高分布: min={min(row_heights)}, " f"median={int(np.median(row_heights))}, " f"max={max(row_heights)}") return horizontal_lines except Exception as e: print(f" ⚠️ 解析失败: {e}") import traceback traceback.print_exc() return [anchor_y, anchor_y + self.header_height] def _generate_fixed_rows(self, anchor_y: int, num_rows: int) -> List[int]: """生成固定行高的横线(兜底方案)""" horizontal_lines = [anchor_y] # 表头 horizontal_lines.append(anchor_y + self.header_height) # 数据行 current_y = anchor_y + self.header_height for i in range(num_rows - 1): current_y += self.fallback_row_height horizontal_lines.append(current_y) return horizontal_lines def _build_structure(self, horizontal_lines: List[int], vertical_lines: List[int], anchor_x: int, anchor_y: int, mode: str = 'fixed') -> Dict: """构建表格结构信息(统一)""" # 生成行区间 rows = [] for i in range(len(horizontal_lines) - 1): rows.append({ 'y_start': horizontal_lines[i], 'y_end': horizontal_lines[i + 1], 'bboxes': [] }) # 生成列区间 columns = [] for i in range(len(vertical_lines) - 1): columns.append({ 'x_start': vertical_lines[i], 'x_end': vertical_lines[i + 1] }) # ✅ 根据模式设置正确的 mode 值 if mode == 'hybrid': mode_value = 'hybrid' elif mode == 'fixed': mode_value = 'fixed' else: mode_value = mode # 保留原始值 return { 'rows': rows, 'columns': columns, 'horizontal_lines': horizontal_lines, 'vertical_lines': vertical_lines, 'col_widths': self.col_widths, 'row_height': self.row_height if mode == 'fixed' else None, 'table_bbox': [ vertical_lines[0], horizontal_lines[0], vertical_lines[-1], horizontal_lines[-1] ], 'mode': mode_value, # ✅ 确保有 mode 字段 'anchor': {'x': anchor_x, 'y': anchor_y}, 'modified_h_lines': [], # ✅ 添加修改记录字段 'modified_v_lines': [] # ✅ 添加修改记录字段 } def apply_template_to_single_file( applier: TableTemplateApplier, image_file: Path, json_file: Path, output_dir: Path, structure_suffix: str = "_structure.json", use_hybrid_mode: bool = True, line_width: int = 2, line_color: Tuple[int, int, int] = (0, 0, 0) ) -> bool: """ 应用模板到单个文件 Args: applier: 模板应用器实例 image_file: 图片文件路径 json_file: OCR JSON文件路径 output_dir: 输出目录 use_hybrid_mode: 是否使用混合模式(需要 MinerU 格式) line_width: 线条宽度 line_color: 线条颜色 Returns: 是否成功 """ print(f"📄 处理: {image_file.name}") try: # 加载OCR数据 with open(json_file, 'r', encoding='utf-8') as f: raw_data = json.load(f) # 🔑 自动检测 OCR 格式 ocr_format = None if 'parsing_res_list' in raw_data and 'overall_ocr_res' in raw_data: # PPStructure 格式 ocr_format = 'ppstructure' elif isinstance(raw_data, (list, dict)): # 尝试提取 MinerU 格式 table_data = None if isinstance(raw_data, list): for item in raw_data: if isinstance(item, dict) and item.get('type') == 'table': table_data = item break elif isinstance(raw_data, dict) and raw_data.get('type') == 'table': table_data = raw_data if table_data and 'table_cells' in table_data: ocr_format = 'mineru' else: raise ValueError("未识别的 OCR 格式") else: raise ValueError("未识别的 OCR 格式(仅支持 PPStructure 或 MinerU)") table_bbox, ocr_data = TableLineGenerator.parse_ocr_data( raw_data, tool=ocr_format ) text_boxes = ocr_data.get('text_boxes', []) print(f" ✅ 加载OCR数据: {len(text_boxes)} 个文本框") print(f" 📋 OCR格式: {ocr_format}") # 加载图片 image = Image.open(image_file) print(f" ✅ 加载图片: {image.size}") # 🔑 验证混合模式的格式要求 if use_hybrid_mode and ocr_format != 'mineru': print(f" ⚠️ 警告: 混合模式需要 MinerU 格式,当前格式为 {ocr_format}") print(f" ℹ️ 自动切换到完全模板模式") use_hybrid_mode = False # 🆕 根据模式选择处理方式 if use_hybrid_mode: print(f" 🔧 使用混合模式 (模板列 + MinerU 行)") img_with_lines, structure = applier.apply_template_hybrid( image, ocr_data, use_ocr_rows=True, line_width=line_width, line_color=line_color ) else: print(f" 🔧 使用完全模板模式 (固定行高)") img_with_lines, structure = applier.apply_template_fixed( image, text_boxes, line_width=line_width, line_color=line_color ) # 保存图片 output_file = output_dir / f"{image_file.stem}.png" img_with_lines.save(output_file) # 保存结构配置 structure_file = output_dir / f"{image_file.stem}{structure_suffix}" with open(structure_file, 'w', encoding='utf-8') as f: json.dump(structure, f, indent=2, ensure_ascii=False) print(f" ✅ 保存图片: {output_file.name}") print(f" ✅ 保存配置: {structure_file.name}") print(f" 📊 表格: {len(structure['rows'])}行 x {len(structure['columns'])}列") return True except Exception as e: print(f" ❌ 处理失败: {e}") import traceback traceback.print_exc() return False def apply_template_batch( template_config_path: str, image_dir: str, json_dir: str, output_dir: str, structure_suffix: str = "_structure.json", use_hybrid_mode: bool = False, line_width: int = 2, line_color: Tuple[int, int, int] = (0, 0, 0) ): """ 批量应用模板到所有图片 Args: template_config_path: 模板配置路径 image_dir: 图片目录 json_dir: OCR JSON目录 output_dir: 输出目录 line_width: 线条宽度 line_color: 线条颜色 """ applier = TableTemplateApplier(template_config_path) image_path = Path(image_dir) json_path = Path(json_dir) output_path = Path(output_dir) output_path.mkdir(parents=True, exist_ok=True) # 查找所有图片 image_files = list(image_path.glob("*.jpg")) + list(image_path.glob("*.png")) image_files.sort() print(f"\n🔍 找到 {len(image_files)} 个图片文件") print(f"📂 图片目录: {image_dir}") print(f"📂 JSON目录: {json_dir}") print(f"📂 输出目录: {output_dir}\n") results = [] success_count = 0 failed_count = 0 for idx, image_file in enumerate(image_files, 1): print(f"\n{'='*60}") print(f"[{idx}/{len(image_files)}] 处理: {image_file.name}") print(f"{'='*60}") # 查找对应的JSON文件 json_file = json_path / f"{image_file.stem}.json" if not json_file.exists(): print(f"⚠️ 找不到OCR结果: {json_file.name}") results.append({ 'source': str(image_file), 'status': 'skipped', 'reason': 'no_json' }) failed_count += 1 continue if apply_template_to_single_file( applier, image_file, json_file, output_path, structure_suffix, use_hybrid_mode, line_width, line_color ): results.append({ 'source': str(image_file), 'json': str(json_file), 'status': 'success' }) success_count += 1 else: results.append({ 'source': str(image_file), 'json': str(json_file), 'status': 'error' }) failed_count += 1 print() # 保存批处理结果 result_file = output_path / "batch_results.json" with open(result_file, 'w', encoding='utf-8') as f: json.dump(results, f, indent=2, ensure_ascii=False) # 统计 skipped_count = sum(1 for r in results if r['status'] == 'skipped') print(f"\n{'='*60}") print(f"🎉 批处理完成!") print(f"{'='*60}") print(f"✅ 成功: {success_count}") print(f"❌ 失败: {failed_count}") print(f"⚠️ 跳过: {skipped_count}") print(f"📊 总计: {len(results)}") print(f"📄 结果保存: {result_file}") def main(): """主函数""" parser = argparse.ArgumentParser( description='应用表格模板到其他页面(支持混合模式)', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 示例用法: 1. 混合模式(推荐,自适应行高): python table_template_applier.py \\ --template template.json \\ --image-dir /path/to/images \\ --json-dir /path/to/jsons \\ --output-dir /path/to/output \\ --structure-suffix _structure.json \\ --hybrid 2. 完全模板模式(固定行高): python table_template_applier.py \\ --template template.json \\ --image-file page.png \\ --json-file page.json \\ --output-dir /path/to/output \\ --structure-suffix _structure.json \\ 模式说明: - 混合模式(--hybrid): 列宽使用模板,行高根据OCR自适应 - 完全模板模式: 列宽和行高都使用模板(适合固定格式表格) """ ) # 模板参数 parser.add_argument( '-t', '--template', type=str, required=True, help='模板配置文件路径(人工标注的第一页结构)' ) # 文件参数组 file_group = parser.add_argument_group('文件参数(单文件模式)') file_group.add_argument( '--image-file', type=str, help='图片文件路径' ) file_group.add_argument( '--json-file', type=str, help='OCR JSON文件路径' ) # 目录参数组 dir_group = parser.add_argument_group('目录参数(批量模式)') dir_group.add_argument( '--image-dir', type=str, help='图片目录' ) dir_group.add_argument( '--json-dir', type=str, help='OCR JSON目录' ) # 输出参数组 output_group = parser.add_argument_group('输出参数') output_group.add_argument( '-o', '--output-dir', type=str, required=True, help='输出目录(必需)' ) output_group.add_argument( '--structure-suffix', type=str, default='_structure.json', help='输出结构配置文件后缀(默认: _structure.json)' ) # 绘图参数组 draw_group = parser.add_argument_group('绘图参数') draw_group.add_argument( '-w', '--width', type=int, default=2, help='线条宽度(默认: 2)' ) draw_group.add_argument( '-c', '--color', default='black', choices=['black', 'blue', 'red'], help='线条颜色(默认: black)' ) # 🆕 新增模式参数 mode_group = parser.add_argument_group('模式参数') mode_group.add_argument( '--hybrid', action='store_true', help='使用混合模式(模板列 + OCR行,自适应行高,推荐)' ) args = parser.parse_args() # 颜色映射 color_map = { 'black': (0, 0, 0), 'blue': (0, 0, 255), 'red': (255, 0, 0) } line_color = color_map[args.color] # 验证模板文件 template_path = Path(args.template) if not template_path.exists(): print(f"❌ 错误: 模板文件不存在: {template_path}") return output_path = Path(args.output_dir) output_path.mkdir(parents=True, exist_ok=True) # 判断模式 if args.image_file and args.json_file: # 单文件模式 image_file = Path(args.image_file) json_file = Path(args.json_file) if not image_file.exists(): print(f"❌ 错误: 图片文件不存在: {image_file}") return if not json_file.exists(): print(f"❌ 错误: JSON文件不存在: {json_file}") return print("\n🔧 单文件处理模式") print(f"📄 模板: {template_path.name}") print(f"📄 图片: {image_file.name}") print(f"📄 JSON: {json_file.name}") print(f"📂 输出: {output_path}\n") applier = TableTemplateApplier(str(template_path)) success = apply_template_to_single_file( applier, image_file, json_file, output_path, use_hybrid_mode=args.hybrid, # 🆕 传递混合模式参数 line_width=args.width, line_color=line_color ) if success: print("\n✅ 处理完成!") else: print("\n❌ 处理失败!") elif args.image_dir and args.json_dir: # 批量模式 image_dir = Path(args.image_dir) json_dir = Path(args.json_dir) if not image_dir.exists(): print(f"❌ 错误: 图片目录不存在: {image_dir}") return if not json_dir.exists(): print(f"❌ 错误: JSON目录不存在: {json_dir}") return print("\n🔧 批量处理模式") print(f"📄 模板: {template_path.name}") apply_template_batch( str(template_path), str(image_dir), str(json_dir), str(output_path), structure_suffix=args.structure_suffix, use_hybrid_mode=args.hybrid, # 🆕 传递混合模式参数 line_width=args.width, line_color=line_color, ) else: parser.print_help() print("\n❌ 错误: 请指定单文件模式或批量模式的参数") print("\n提示:") print(" 单文件模式: --image-file + --json-file") print(" 批量模式: --image-dir + --json-dir") if __name__ == "__main__": print("🚀 启动表格模板批量应用程序...") import sys if len(sys.argv) == 1: # 如果没有命令行参数,使用默认配置运行 print("ℹ️ 未提供命令行参数,使用默认配置运行...") # 默认配置 default_config = { "template": "/Users/zhch158/workspace/data/流水分析/康强_北京农村商业银行.wiredtable/康强_北京农村商业银行_page_001_structure.json", "image-file": "/Users/zhch158/workspace/data/流水分析/康强_北京农村商业银行/ppstructurev3_client_results/康强_北京农村商业银行/康强_北京农村商业银行_page_002.png", "json-file": "/Users/zhch158/workspace/data/流水分析/康强_北京农村商业银行/ppstructurev3_client_results/康强_北京农村商业银行_page_002.json", "output-dir": "output/batch_results", "width": "2", "color": "black" } # default_config = { # "template": "/Users/zhch158/workspace/data/流水分析/B用户_扫描流水.wiredtable/B用户_扫描流水_page_001_structure.json", # "image-file": "/Users/zhch158/workspace/data/流水分析/B用户_扫描流水/mineru_vllm_results/B用户_扫描流水/B用户_扫描流水_page_002.png", # "json-file": "/Users/zhch158/workspace/data/流水分析/B用户_扫描流水/mineru_vllm_results_cell_bbox/B用户_扫描流水_page_002.json", # "output-dir": "output/batch_results", # "width": "2", # "color": "black" # } print("⚙️ 默认参数:") for key, value in default_config.items(): print(f" --{key}: {value}") # 构造参数 sys.argv = [sys.argv[0]] for key, value in default_config.items(): sys.argv.extend([f"--{key}", str(value)]) sys.argv.append("--hybrid") # 使用混合模式 sys.exit(main())