""" 表格模板应用器 将人工标注的表格结构应用到其他页面 """ import json from pathlib import Path from PIL import Image, ImageDraw from typing import Dict, List, Tuple import numpy as np import argparse try: from table_line_generator import TableLineGenerator except ImportError: from .table_line_generator import TableLineGenerator class TableTemplateApplier: """表格模板应用器""" def __init__(self, template_config_path: str): """ 初始化模板应用器 Args: template_config_path: 模板配置文件路径(人工标注的结果) """ with open(template_config_path, 'r', encoding='utf-8') as f: self.template = json.load(f) # 🎯 从标注结果提取固定参数 self.col_widths = self.template['col_widths'] # 🔧 计算数据行的标准行高(排除表头) rows = self.template['rows'] if len(rows) > 1: # 计算每行的实际高度 row_heights = [row['y_end'] - row['y_start'] for row in rows] # 🎯 假设第一行是表头,从第二行开始计算 data_row_heights = row_heights[1:] if len(row_heights) > 1 else row_heights # 使用中位数作为标准行高(更稳健) self.row_height = int(np.median(data_row_heights)) self.header_height = row_heights[0] if row_heights else self.row_height print(f"📏 表头高度: {self.header_height}px") print(f"📏 数据行高度: {self.row_height}px") print(f" (从 {len(data_row_heights)} 行数据中计算,中位数)") else: # 兜底方案 self.row_height = self.template.get('row_height', 60) self.header_height = self.row_height # 🎯 计算列的相对位置(从第一列开始的偏移量) self.col_offsets = [0] for width in self.col_widths: self.col_offsets.append(self.col_offsets[-1] + width) # 🎯 提取表头的Y坐标(作为参考) self.template_header_y = rows[0]['y_start'] if rows else 0 print(f"\n✅ 加载模板配置:") print(f" 表头高度: {self.header_height}px") print(f" 数据行高度: {self.row_height}px") print(f" 列数: {len(self.col_widths)}") print(f" 列宽: {self.col_widths}") def detect_table_anchor(self, ocr_data: List[Dict]) -> Tuple[int, int]: """ 检测表格的锚点位置(表头左上角) 策略: 1. 找到Y坐标最小的文本框(表头第一行) 2. 找到X坐标最小的文本框(第一列) Args: ocr_data: OCR识别结果 Returns: (anchor_x, anchor_y): 表格左上角坐标 """ if not ocr_data: return (0, 0) # 找到最小的X和Y坐标 min_x = min(item['bbox'][0] for item in ocr_data) min_y = min(item['bbox'][1] for item in ocr_data) return (min_x, min_y) def detect_table_rows(self, ocr_data: List[Dict], header_y: int) -> int: """ 检测表格的行数(包括表头) 策略: 1. 找到Y坐标最大的文本框 2. 根据数据行高计算行数 3. 加上表头行 Args: ocr_data: OCR识别结果 header_y: 表头起始Y坐标 Returns: 总行数(包括表头) """ if not ocr_data: return 1 # 至少有表头 max_y = max(item['bbox'][3] for item in ocr_data) # 🔧 计算数据区的高度(排除表头) data_start_y = header_y + self.header_height data_height = max_y - data_start_y # 计算数据行数 num_data_rows = max(int(data_height / self.row_height), 0) # 总行数 = 1行表头 + n行数据 total_rows = 1 + num_data_rows print(f"📊 行数计算:") print(f" 表头Y: {header_y}, 数据区起始Y: {data_start_y}") print(f" 最大Y: {max_y}, 数据区高度: {data_height}px") print(f" 数据行数: {num_data_rows}, 总行数: {total_rows}") return total_rows def apply_to_image(self, image: Image.Image, ocr_data: List[Dict], anchor_x: int = None, anchor_y: int = None, num_rows: int = None, line_width: int = 2, line_color: Tuple[int, int, int] = (0, 0, 0)) -> Image.Image: """ 将模板应用到图片 Args: image: 目标图片 ocr_data: OCR识别结果(用于自动检测锚点) anchor_x: 表格起始X坐标(None=自动检测) anchor_y: 表头起始Y坐标(None=自动检测) num_rows: 总行数(None=自动检测) line_width: 线条宽度 line_color: 线条颜色 Returns: 绘制了表格线的图片 """ img_with_lines = image.copy() draw = ImageDraw.Draw(img_with_lines) # 🔍 自动检测锚点 if anchor_x is None or anchor_y is None: detected_x, detected_y = self.detect_table_anchor(ocr_data) anchor_x = anchor_x or detected_x anchor_y = anchor_y or detected_y # 🔍 自动检测行数 if num_rows is None: num_rows = self.detect_table_rows(ocr_data, anchor_y) print(f"\n📍 表格锚点: ({anchor_x}, {anchor_y})") print(f"📊 总行数: {num_rows} (1表头 + {num_rows-1}数据)") # 🎨 生成横线坐标 horizontal_lines = [] # 第1条线:表头顶部 horizontal_lines.append(anchor_y) # 第2条线:表头底部/数据区顶部 horizontal_lines.append(anchor_y + self.header_height) # 后续横线:数据行分隔线 current_y = anchor_y + self.header_height for i in range(num_rows - 1): # 减1因为表头已经占了1行 current_y += self.row_height horizontal_lines.append(current_y) # 🎨 生成竖线坐标 vertical_lines = [] for offset in self.col_offsets: x = anchor_x + offset vertical_lines.append(x) print(f"📏 横线坐标: {horizontal_lines[:3]}... (共{len(horizontal_lines)}条)") print(f"📏 竖线坐标: {vertical_lines[:3]}... (共{len(vertical_lines)}条)") # 🖊️ 绘制横线 x_start = vertical_lines[0] x_end = vertical_lines[-1] for y in horizontal_lines: draw.line([(x_start, y), (x_end, y)], fill=line_color, width=line_width) # 🖊️ 绘制竖线 y_start = horizontal_lines[0] y_end = horizontal_lines[-1] for x in vertical_lines: draw.line([(x, y_start), (x, y_end)], fill=line_color, width=line_width) return img_with_lines def generate_structure_for_image(self, ocr_data: List[Dict], anchor_x: int = None, anchor_y: int = None, num_rows: int = None) -> Dict: """ 为新图片生成表格结构配置 Args: ocr_data: OCR识别结果 anchor_x: 表格起始X坐标(None=自动检测) anchor_y: 表头起始Y坐标(None=自动检测) num_rows: 总行数(None=自动检测) Returns: 表格结构配置 """ # 🔍 自动检测锚点 if anchor_x is None or anchor_y is None: detected_x, detected_y = self.detect_table_anchor(ocr_data) anchor_x = anchor_x or detected_x anchor_y = anchor_y or detected_y # 🔍 自动检测行数 if num_rows is None: num_rows = self.detect_table_rows(ocr_data, anchor_y) # 🎨 生成横线坐标 horizontal_lines = [] horizontal_lines.append(anchor_y) horizontal_lines.append(anchor_y + self.header_height) current_y = anchor_y + self.header_height for i in range(num_rows - 1): current_y += self.row_height horizontal_lines.append(current_y) # 🎨 生成竖线坐标 vertical_lines = [] for offset in self.col_offsets: x = anchor_x + offset vertical_lines.append(x) # 🎨 生成行区间 rows = [] for i in range(num_rows): rows.append({ 'y_start': horizontal_lines[i], 'y_end': horizontal_lines[i + 1], 'bboxes': [] }) # 🎨 生成列区间 columns = [] for i in range(len(vertical_lines) - 1): columns.append({ 'x_start': vertical_lines[i], 'x_end': vertical_lines[i + 1] }) return { 'rows': rows, 'columns': columns, 'horizontal_lines': horizontal_lines, 'vertical_lines': vertical_lines, 'header_height': self.header_height, 'row_height': self.row_height, 'col_widths': self.col_widths, 'table_bbox': [ vertical_lines[0], horizontal_lines[0], vertical_lines[-1], horizontal_lines[-1] ], 'anchor': {'x': anchor_x, 'y': anchor_y}, 'num_rows': num_rows } def apply_template_to_single_file( applier: TableTemplateApplier, image_file: Path, json_file: Path, output_dir: Path, line_width: int = 2, line_color: Tuple[int, int, int] = (0, 0, 0) ) -> bool: """ 应用模板到单个文件 Args: applier: 模板应用器实例 image_file: 图片文件路径 json_file: OCR JSON文件路径 output_dir: 输出目录 line_width: 线条宽度 line_color: 线条颜色 Returns: 是否成功 """ print(f"📄 处理: {image_file.name}") try: # 加载OCR数据 with open(json_file, 'r', encoding='utf-8') as f: raw_data = json.load(f) # 🔧 解析OCR数据(支持PPStructure格式) if 'parsing_res_list' in raw_data and 'overall_ocr_res' in raw_data: table_bbox, ocr_data = TableLineGenerator.parse_ppstructure_result(raw_data) else: raise ValueError("不是PPStructure格式的OCR结果") print(f" ✅ 加载OCR数据: {len(ocr_data)} 个文本框") # 加载图片 image = Image.open(image_file) print(f" ✅ 加载图片: {image.size}") # 🎯 应用模板 img_with_lines = applier.apply_to_image( image, ocr_data, line_width=line_width, line_color=line_color ) # 保存图片 output_file = output_dir / f"{image_file.stem}_with_lines.png" img_with_lines.save(output_file) # 🆕 生成并保存结构配置 structure = applier.generate_structure_for_image(ocr_data) structure_file = output_dir / f"{image_file.stem}_structure.json" with open(structure_file, 'w', encoding='utf-8') as f: json.dump(structure, f, indent=2, ensure_ascii=False) print(f" ✅ 保存图片: {output_file.name}") print(f" ✅ 保存配置: {structure_file.name}") print(f" 📊 表格: {structure['num_rows']}行 x {len(structure['columns'])}列") return True except Exception as e: print(f" ❌ 处理失败: {e}") import traceback traceback.print_exc() return False def apply_template_batch( template_config_path: str, image_dir: str, json_dir: str, output_dir: str, line_width: int = 2, line_color: Tuple[int, int, int] = (0, 0, 0) ): """ 批量应用模板到所有图片 Args: template_config_path: 模板配置路径 image_dir: 图片目录 json_dir: OCR JSON目录 output_dir: 输出目录 line_width: 线条宽度 line_color: 线条颜色 """ applier = TableTemplateApplier(template_config_path) image_path = Path(image_dir) json_path = Path(json_dir) output_path = Path(output_dir) output_path.mkdir(parents=True, exist_ok=True) # 查找所有图片 image_files = list(image_path.glob("*.jpg")) + list(image_path.glob("*.png")) image_files.sort() print(f"\n🔍 找到 {len(image_files)} 个图片文件") print(f"📂 图片目录: {image_dir}") print(f"📂 JSON目录: {json_dir}") print(f"📂 输出目录: {output_dir}\n") results = [] success_count = 0 failed_count = 0 for idx, image_file in enumerate(image_files, 1): print(f"\n{'='*60}") print(f"[{idx}/{len(image_files)}] 处理: {image_file.name}") print(f"{'='*60}") # 查找对应的JSON文件 json_file = json_path / f"{image_file.stem}.json" if not json_file.exists(): print(f"⚠️ 找不到OCR结果: {json_file.name}") results.append({ 'source': str(image_file), 'status': 'skipped', 'reason': 'no_json' }) failed_count += 1 continue if apply_template_to_single_file( applier, image_file, json_file, output_path, line_width, line_color ): results.append({ 'source': str(image_file), 'json': str(json_file), 'status': 'success' }) success_count += 1 else: results.append({ 'source': str(image_file), 'json': str(json_file), 'status': 'error' }) failed_count += 1 print() # 保存批处理结果 result_file = output_path / "batch_results.json" with open(result_file, 'w', encoding='utf-8') as f: json.dump(results, f, indent=2, ensure_ascii=False) # 统计 skipped_count = sum(1 for r in results if r['status'] == 'skipped') print(f"\n{'='*60}") print(f"🎉 批处理完成!") print(f"{'='*60}") print(f"✅ 成功: {success_count}") print(f"❌ 失败: {failed_count}") print(f"⚠️ 跳过: {skipped_count}") print(f"📊 总计: {len(results)}") print(f"📄 结果保存: {result_file}") def main(): """主函数""" parser = argparse.ArgumentParser( description='应用表格模板到其他页面', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 示例用法: 1. 批量处理整个目录: python table_template_applier.py \\ --template output/康强_北京农村商业银行_page_001_structure.json \\ --image-dir /path/to/images \\ --json-dir /path/to/jsons \\ --output-dir /path/to/output 2. 处理单个文件: python table_template_applier.py \\ --template output/康强_北京农村商业银行_page_001_structure.json \\ --image-file /path/to/page_002.png \\ --json-file /path/to/page_002.json \\ --output-dir /path/to/output 输出内容: - {name}_with_lines.png: 带表格线的图片 - {name}_structure.json: 表格结构配置 - batch_results.json: 批处理统计结果 """ ) # 模板参数 parser.add_argument( '-t', '--template', type=str, required=True, help='模板配置文件路径(人工标注的第一页结构)' ) # 文件参数组 file_group = parser.add_argument_group('文件参数(单文件模式)') file_group.add_argument( '--image-file', type=str, help='图片文件路径' ) file_group.add_argument( '--json-file', type=str, help='OCR JSON文件路径' ) # 目录参数组 dir_group = parser.add_argument_group('目录参数(批量模式)') dir_group.add_argument( '--image-dir', type=str, help='图片目录' ) dir_group.add_argument( '--json-dir', type=str, help='OCR JSON目录' ) # 输出参数组 output_group = parser.add_argument_group('输出参数') output_group.add_argument( '-o', '--output-dir', type=str, required=True, help='输出目录(必需)' ) # 绘图参数组 draw_group = parser.add_argument_group('绘图参数') draw_group.add_argument( '-w', '--width', type=int, default=2, help='线条宽度(默认: 2)' ) draw_group.add_argument( '-c', '--color', default='black', choices=['black', 'blue', 'red'], help='线条颜色(默认: black)' ) args = parser.parse_args() # 颜色映射 color_map = { 'black': (0, 0, 0), 'blue': (0, 0, 255), 'red': (255, 0, 0) } line_color = color_map[args.color] # 验证模板文件 template_path = Path(args.template) if not template_path.exists(): print(f"❌ 错误: 模板文件不存在: {template_path}") return output_path = Path(args.output_dir) output_path.mkdir(parents=True, exist_ok=True) # 判断模式 if args.image_file and args.json_file: # 单文件模式 image_file = Path(args.image_file) json_file = Path(args.json_file) if not image_file.exists(): print(f"❌ 错误: 图片文件不存在: {image_file}") return if not json_file.exists(): print(f"❌ 错误: JSON文件不存在: {json_file}") return print("\n🔧 单文件处理模式") print(f"📄 模板: {template_path.name}") print(f"📄 图片: {image_file.name}") print(f"📄 JSON: {json_file.name}") print(f"📂 输出: {output_path}\n") applier = TableTemplateApplier(str(template_path)) success = apply_template_to_single_file( applier, image_file, json_file, output_path, args.width, line_color ) if success: print("\n✅ 处理完成!") else: print("\n❌ 处理失败!") elif args.image_dir and args.json_dir: # 批量模式 image_dir = Path(args.image_dir) json_dir = Path(args.json_dir) if not image_dir.exists(): print(f"❌ 错误: 图片目录不存在: {image_dir}") return if not json_dir.exists(): print(f"❌ 错误: JSON目录不存在: {json_dir}") return print("\n🔧 批量处理模式") print(f"📄 模板: {template_path.name}") apply_template_batch( str(template_path), str(image_dir), str(json_dir), str(output_path), args.width, line_color ) else: parser.print_help() print("\n❌ 错误: 请指定单文件模式或批量模式的参数") print("\n提示:") print(" 单文件模式: --image-file + --json-file") print(" 批量模式: --image-dir + --json-dir") if __name__ == "__main__": print("🚀 启动表格模板批量应用程序...") import sys if len(sys.argv) == 1: # 如果没有命令行参数,使用默认配置运行 print("ℹ️ 未提供命令行参数,使用默认配置运行...") # 默认配置 default_config = { "template": "output/table_structures/康强_北京农村商业银行_page_001_structure.json", "image-file": "/Users/zhch158/workspace/data/流水分析/康强_北京农村商业银行/ppstructurev3_client_results/康强_北京农村商业银行/康强_北京农村商业银行_page_002.png", "json-file": "/Users/zhch158/workspace/data/流水分析/康强_北京农村商业银行/ppstructurev3_client_results/康强_北京农村商业银行_page_002.json", "output-dir": "output/batch_results", "width": "2", "color": "black" } print("⚙️ 默认参数:") for key, value in default_config.items(): print(f" --{key}: {value}") # 构造参数 sys.argv = [sys.argv[0]] for key, value in default_config.items(): sys.argv.extend([f"--{key}", str(value)]) sys.exit(main())