| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653 |
- """
- 表格模板应用器
- 将人工标注的表格结构应用到其他页面
- """
- import json
- from pathlib import Path
- from PIL import Image, ImageDraw
- from typing import Dict, List, Tuple
- import numpy as np
- import argparse
- try:
- from table_line_generator import TableLineGenerator
- except ImportError:
- from .table_line_generator import TableLineGenerator
class TableTemplateApplier:
    """Project a manually annotated table layout onto other pages.

    The template JSON describes one reference page: its column widths and
    the vertical span of every annotated row. From it the applier derives a
    header height, a standard data-row height and cumulative column offsets.
    That grid is then re-projected onto another page from an anchor point
    (the table's top-left corner) and a total row count, both of which can
    be given explicitly or detected from OCR boxes.
    """

    def __init__(self, template_config_path: str):
        """Load the template file and derive the fixed layout parameters.

        Args:
            template_config_path: Path to the template JSON (the manually
                annotated result). It must contain ``col_widths`` and
                ``rows`` (dicts with ``y_start`` / ``y_end``). The optional
                ``row_height`` key is only used as a fallback when fewer
                than two rows are annotated.

        Raises:
            OSError: The template file cannot be opened.
            KeyError: ``col_widths`` or ``rows`` is missing.
        """
        with open(template_config_path, 'r', encoding='utf-8') as f:
            self.template = json.load(f)

        # Fixed parameters taken straight from the annotation.
        self.col_widths = self.template['col_widths']
        rows = self.template['rows']

        if len(rows) > 1:
            row_heights = [row['y_end'] - row['y_start'] for row in rows]

            # The first row is assumed to be the header; every other row is
            # a data row. The median is used so a few unusually tall or
            # short rows do not skew the standard height.
            data_row_heights = row_heights[1:]
            self.row_height = int(np.median(data_row_heights))
            self.header_height = row_heights[0]

            print(f"📏 表头高度: {self.header_height}px")
            print(f"📏 数据行高度: {self.row_height}px")
            print(f" (从 {len(data_row_heights)} 行数据中计算,中位数)")
        else:
            # Fallback: not enough rows to tell header and data apart.
            self.row_height = self.template.get('row_height', 60)
            self.header_height = self.row_height

        # Column boundaries relative to the left edge of the first column.
        self.col_offsets = [0]
        for width in self.col_widths:
            self.col_offsets.append(self.col_offsets[-1] + width)

        # Y coordinate of the header on the template page (reference only).
        self.template_header_y = rows[0]['y_start'] if rows else 0

        print(f"\n✅ 加载模板配置:")
        print(f" 表头高度: {self.header_height}px")
        print(f" 数据行高度: {self.row_height}px")
        print(f" 列数: {len(self.col_widths)}")
        print(f" 列宽: {self.col_widths}")

    def detect_table_anchor(self, ocr_data: List[Dict]) -> Tuple[int, int]:
        """Detect the table anchor (top-left corner of the header).

        The anchor is the smallest ``bbox[0]`` and the smallest ``bbox[1]``
        over all OCR items; the two minima may come from different items.

        Args:
            ocr_data: OCR items, each a dict with a ``bbox`` sequence whose
                first two entries are the left and top coordinates.

        Returns:
            ``(anchor_x, anchor_y)``, or ``(0, 0)`` when ``ocr_data`` is
            empty.
        """
        if not ocr_data:
            return (0, 0)

        min_x = min(item['bbox'][0] for item in ocr_data)
        min_y = min(item['bbox'][1] for item in ocr_data)

        return (min_x, min_y)

    def detect_table_rows(self, ocr_data: List[Dict], header_y: int) -> int:
        """Estimate the total number of rows, header included.

        The data area starts right below the header and extends to the
        largest ``bbox[3]`` among the OCR items; its height divided by the
        standard row height (truncated) gives the number of data rows.

        Args:
            ocr_data: OCR items, each a dict with a ``bbox`` sequence whose
                fourth entry is the bottom coordinate.
            header_y: Y coordinate where the header starts.

        Returns:
            Total row count (1 header row + detected data rows). Returns 1
            when there is no OCR data or when the standard row height is
            not positive, since no data row count can be derived then.
        """
        if not ocr_data:
            return 1  # at least the header

        # A zero row height would otherwise raise ZeroDivisionError below.
        if self.row_height <= 0:
            return 1

        max_y = max(item['bbox'][3] for item in ocr_data)

        # Height of the data area, excluding the header.
        data_start_y = header_y + self.header_height
        data_height = max_y - data_start_y

        # Clamp at zero: the text may end above the header's bottom edge.
        num_data_rows = max(int(data_height / self.row_height), 0)
        total_rows = 1 + num_data_rows

        print(f"📊 行数计算:")
        print(f" 表头Y: {header_y}, 数据区起始Y: {data_start_y}")
        print(f" 最大Y: {max_y}, 数据区高度: {data_height}px")
        print(f" 数据行数: {num_data_rows}, 总行数: {total_rows}")

        return total_rows

    def _resolve_layout(self, ocr_data, anchor_x, anchor_y, num_rows):
        """Fill in whichever of anchor_x / anchor_y / num_rows is None."""
        if anchor_x is None or anchor_y is None:
            detected_x, detected_y = self.detect_table_anchor(ocr_data)
            # Compare against None explicitly: 0 is a valid coordinate and
            # must not be replaced by the detected value.
            if anchor_x is None:
                anchor_x = detected_x
            if anchor_y is None:
                anchor_y = detected_y

        if num_rows is None:
            num_rows = self.detect_table_rows(ocr_data, anchor_y)

        return anchor_x, anchor_y, num_rows

    def _grid_lines(self, anchor_x, anchor_y, num_rows):
        """Return (horizontal_lines, vertical_lines) for the given anchor."""
        # Header top, then header bottom (= top of the data area).
        current_y = anchor_y + self.header_height
        horizontal_lines = [anchor_y, current_y]

        # One separator per data row; the header already used one row.
        for _ in range(num_rows - 1):
            current_y += self.row_height
            horizontal_lines.append(current_y)

        vertical_lines = [anchor_x + offset for offset in self.col_offsets]
        return horizontal_lines, vertical_lines

    def apply_to_image(self,
                       image: "Image.Image",
                       ocr_data: List[Dict],
                       anchor_x: int = None,
                       anchor_y: int = None,
                       num_rows: int = None,
                       line_width: int = 2,
                       line_color: Tuple[int, int, int] = (0, 0, 0)) -> "Image.Image":
        """Draw the template grid onto a copy of ``image``.

        Args:
            image: Target page image; it is not modified.
            ocr_data: OCR items, used only for auto-detection.
            anchor_x: X coordinate of the table's left edge (None = detect).
            anchor_y: Y coordinate of the header's top edge (None = detect).
            num_rows: Total row count including the header (None = detect).
            line_width: Width of the drawn lines in pixels.
            line_color: Colour of the drawn lines.

        Returns:
            A copy of ``image`` with the table lines drawn on it.
        """
        img_with_lines = image.copy()
        draw = ImageDraw.Draw(img_with_lines)

        anchor_x, anchor_y, num_rows = self._resolve_layout(
            ocr_data, anchor_x, anchor_y, num_rows
        )

        print(f"\n📍 表格锚点: ({anchor_x}, {anchor_y})")
        print(f"📊 总行数: {num_rows} (1表头 + {num_rows-1}数据)")

        horizontal_lines, vertical_lines = self._grid_lines(
            anchor_x, anchor_y, num_rows
        )

        print(f"📏 横线坐标: {horizontal_lines[:3]}... (共{len(horizontal_lines)}条)")
        print(f"📏 竖线坐标: {vertical_lines[:3]}... (共{len(vertical_lines)}条)")

        # Horizontal lines span from the first to the last column boundary.
        x_start = vertical_lines[0]
        x_end = vertical_lines[-1]
        for y in horizontal_lines:
            draw.line([(x_start, y), (x_end, y)], fill=line_color, width=line_width)

        # Vertical lines span from the header top to the last row bottom.
        y_start = horizontal_lines[0]
        y_end = horizontal_lines[-1]
        for x in vertical_lines:
            draw.line([(x, y_start), (x, y_end)], fill=line_color, width=line_width)

        return img_with_lines

    def generate_structure_for_image(self,
                                     ocr_data: List[Dict],
                                     anchor_x: int = None,
                                     anchor_y: int = None,
                                     num_rows: int = None) -> Dict:
        """Build the table-structure configuration for a new page.

        Args:
            ocr_data: OCR items, used only for auto-detection.
            anchor_x: X coordinate of the table's left edge (None = detect).
            anchor_y: Y coordinate of the header's top edge (None = detect).
            num_rows: Total row count including the header (None = detect).

        Returns:
            A dict with the keys ``rows``, ``columns``,
            ``horizontal_lines``, ``vertical_lines``, ``header_height``,
            ``row_height``, ``col_widths``, ``table_bbox``
            (``[x0, y0, x1, y1]``), ``anchor`` and ``num_rows``.
        """
        anchor_x, anchor_y, num_rows = self._resolve_layout(
            ocr_data, anchor_x, anchor_y, num_rows
        )
        horizontal_lines, vertical_lines = self._grid_lines(
            anchor_x, anchor_y, num_rows
        )

        # Row i lies between horizontal line i and i + 1.
        rows = [
            {
                'y_start': horizontal_lines[i],
                'y_end': horizontal_lines[i + 1],
                'bboxes': []
            }
            for i in range(num_rows)
        ]

        # Column i lies between vertical line i and i + 1.
        columns = [
            {'x_start': left, 'x_end': right}
            for left, right in zip(vertical_lines, vertical_lines[1:])
        ]

        return {
            'rows': rows,
            'columns': columns,
            'horizontal_lines': horizontal_lines,
            'vertical_lines': vertical_lines,
            'header_height': self.header_height,
            'row_height': self.row_height,
            'col_widths': self.col_widths,
            'table_bbox': [
                vertical_lines[0],
                horizontal_lines[0],
                vertical_lines[-1],
                horizontal_lines[-1]
            ],
            'anchor': {'x': anchor_x, 'y': anchor_y},
            'num_rows': num_rows
        }
def apply_template_to_single_file(
    applier: "TableTemplateApplier",
    image_file: Path,
    json_file: Path,
    output_dir: Path,
    line_width: int = 2,
    line_color: Tuple[int, int, int] = (0, 0, 0)
) -> bool:
    """Apply the template to one page and write the results.

    Two files are written into ``output_dir``:
    ``<stem>_with_lines.png`` (the page with table lines drawn on it) and
    ``<stem>_structure.json`` (the generated table structure).

    Args:
        applier: Template applier instance.
        image_file: Path of the page image.
        json_file: Path of the OCR result JSON; it must contain both
            ``parsing_res_list`` and ``overall_ocr_res``.
        output_dir: Existing directory that receives the output files.
        line_width: Width of the drawn lines in pixels.
        line_color: Colour of the drawn lines.

    Returns:
        True on success. Any exception is reported on the console together
        with its traceback and turned into a False return value, so one bad
        page does not abort a batch run.
    """
    print(f"📄 处理: {image_file.name}")

    try:
        with open(json_file, 'r', encoding='utf-8') as f:
            raw_data = json.load(f)

        # Only the PPStructure result layout is supported.
        if 'parsing_res_list' in raw_data and 'overall_ocr_res' in raw_data:
            _table_bbox, ocr_data = TableLineGenerator.parse_ppstructure_result(raw_data)
        else:
            raise ValueError("不是PPStructure格式的OCR结果")

        print(f" ✅ 加载OCR数据: {len(ocr_data)} 个文本框")

        # The context manager closes the underlying file handle;
        # apply_to_image works on a copy, so the result stays valid after it.
        with Image.open(image_file) as image:
            print(f" ✅ 加载图片: {image.size}")

            # Detect anchor and row count once and reuse them for drawing,
            # so the picture and the saved structure always agree.
            structure = applier.generate_structure_for_image(ocr_data)

            img_with_lines = applier.apply_to_image(
                image,
                ocr_data,
                anchor_x=structure['anchor']['x'],
                anchor_y=structure['anchor']['y'],
                num_rows=structure['num_rows'],
                line_width=line_width,
                line_color=line_color
            )

        output_file = output_dir / f"{image_file.stem}_with_lines.png"
        img_with_lines.save(output_file)

        structure_file = output_dir / f"{image_file.stem}_structure.json"
        with open(structure_file, 'w', encoding='utf-8') as f:
            json.dump(structure, f, indent=2, ensure_ascii=False)

        print(f" ✅ 保存图片: {output_file.name}")
        print(f" ✅ 保存配置: {structure_file.name}")
        print(f" 📊 表格: {structure['num_rows']}行 x {len(structure['columns'])}列")

        return True

    except Exception as e:
        # Per-file boundary: report the failure and let the caller continue.
        print(f" ❌ 处理失败: {e}")
        import traceback
        traceback.print_exc()
        return False
def apply_template_batch(
    template_config_path: str,
    image_dir: str,
    json_dir: str,
    output_dir: str,
    line_width: int = 2,
    line_color: Tuple[int, int, int] = (0, 0, 0)
):
    """Apply the template to every ``*.jpg`` / ``*.png`` image in a directory.

    For each image the OCR result ``<stem>.json`` is looked up in
    ``json_dir``. Images without one are recorded as skipped. A per-file
    status list is written to ``batch_results.json`` in ``output_dir`` and
    a summary is printed.

    Args:
        template_config_path: Path of the template configuration.
        image_dir: Directory containing the page images.
        json_dir: Directory containing the OCR JSON files.
        output_dir: Output directory (created if missing).
        line_width: Width of the drawn lines in pixels.
        line_color: Colour of the drawn lines.
    """
    applier = TableTemplateApplier(template_config_path)

    image_path = Path(image_dir)
    json_path = Path(json_dir)
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    image_files = sorted(
        list(image_path.glob("*.jpg")) + list(image_path.glob("*.png"))
    )

    print(f"\n🔍 找到 {len(image_files)} 个图片文件")
    print(f"📂 图片目录: {image_dir}")
    print(f"📂 JSON目录: {json_dir}")
    print(f"📂 输出目录: {output_dir}\n")

    results = []
    # Three disjoint outcomes, so that
    # success + failed + skipped == len(results).
    success_count = 0
    failed_count = 0
    skipped_count = 0

    for idx, image_file in enumerate(image_files, 1):
        print(f"\n{'='*60}")
        print(f"[{idx}/{len(image_files)}] 处理: {image_file.name}")
        print(f"{'='*60}")

        # The OCR result is expected to share the image's stem.
        json_file = json_path / f"{image_file.stem}.json"

        if not json_file.exists():
            print(f"⚠️ 找不到OCR结果: {json_file.name}")
            results.append({
                'source': str(image_file),
                'status': 'skipped',
                'reason': 'no_json'
            })
            # A missing OCR file is a skip, not a processing failure.
            skipped_count += 1
            continue

        succeeded = apply_template_to_single_file(
            applier, image_file, json_file, output_path,
            line_width, line_color
        )
        results.append({
            'source': str(image_file),
            'json': str(json_file),
            'status': 'success' if succeeded else 'error'
        })
        if succeeded:
            success_count += 1
        else:
            failed_count += 1

        print()

    result_file = output_path / "batch_results.json"
    with open(result_file, 'w', encoding='utf-8') as f:
        json.dump(results, f, indent=2, ensure_ascii=False)

    print(f"\n{'='*60}")
    print(f"🎉 批处理完成!")
    print(f"{'='*60}")
    print(f"✅ 成功: {success_count}")
    print(f"❌ 失败: {failed_count}")
    print(f"⚠️ 跳过: {skipped_count}")
    print(f"📊 总计: {len(results)}")
    print(f"📄 结果保存: {result_file}")
def main() -> int:
    """Command-line entry point.

    Two modes are supported, selected by the arguments given:
    single-file mode (``--image-file`` + ``--json-file``) and batch mode
    (``--image-dir`` + ``--json-dir``). Single-file mode wins when both
    pairs are supplied.

    Returns:
        Process exit code: 0 on success, 1 when an input path is missing
        or single-file processing fails, 2 when neither mode's arguments
        were supplied. Batch mode returns 0 once the batch has run;
        per-file outcomes are reported in ``batch_results.json``.
    """
    parser = argparse.ArgumentParser(
        description='应用表格模板到其他页面',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例用法:
1. 批量处理整个目录:
python table_template_applier.py \\
--template output/康强_北京农村商业银行_page_001_structure.json \\
--image-dir /path/to/images \\
--json-dir /path/to/jsons \\
--output-dir /path/to/output
2. 处理单个文件:
python table_template_applier.py \\
--template output/康强_北京农村商业银行_page_001_structure.json \\
--image-file /path/to/page_002.png \\
--json-file /path/to/page_002.json \\
--output-dir /path/to/output
输出内容:
- {name}_with_lines.png: 带表格线的图片
- {name}_structure.json: 表格结构配置
- batch_results.json: 批处理统计结果
"""
    )

    # Template argument
    parser.add_argument(
        '-t', '--template',
        type=str,
        required=True,
        help='模板配置文件路径(人工标注的第一页结构)'
    )

    # Single-file mode arguments
    file_group = parser.add_argument_group('文件参数(单文件模式)')
    file_group.add_argument(
        '--image-file',
        type=str,
        help='图片文件路径'
    )
    file_group.add_argument(
        '--json-file',
        type=str,
        help='OCR JSON文件路径'
    )

    # Batch mode arguments
    dir_group = parser.add_argument_group('目录参数(批量模式)')
    dir_group.add_argument(
        '--image-dir',
        type=str,
        help='图片目录'
    )
    dir_group.add_argument(
        '--json-dir',
        type=str,
        help='OCR JSON目录'
    )

    # Output arguments
    output_group = parser.add_argument_group('输出参数')
    output_group.add_argument(
        '-o', '--output-dir',
        type=str,
        required=True,
        help='输出目录(必需)'
    )

    # Drawing arguments
    draw_group = parser.add_argument_group('绘图参数')
    draw_group.add_argument(
        '-w', '--width',
        type=int,
        default=2,
        help='线条宽度(默认: 2)'
    )
    draw_group.add_argument(
        '-c', '--color',
        default='black',
        choices=['black', 'blue', 'red'],
        help='线条颜色(默认: black)'
    )

    args = parser.parse_args()

    # argparse restricts --color to these keys via `choices`.
    color_map = {
        'black': (0, 0, 0),
        'blue': (0, 0, 255),
        'red': (255, 0, 0)
    }
    line_color = color_map[args.color]

    # Validate the template before creating any output directory.
    template_path = Path(args.template)
    if not template_path.exists():
        print(f"❌ 错误: 模板文件不存在: {template_path}")
        return 1

    output_path = Path(args.output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    if args.image_file and args.json_file:
        # Single-file mode
        image_file = Path(args.image_file)
        json_file = Path(args.json_file)

        if not image_file.exists():
            print(f"❌ 错误: 图片文件不存在: {image_file}")
            return 1

        if not json_file.exists():
            print(f"❌ 错误: JSON文件不存在: {json_file}")
            return 1

        print("\n🔧 单文件处理模式")
        print(f"📄 模板: {template_path.name}")
        print(f"📄 图片: {image_file.name}")
        print(f"📄 JSON: {json_file.name}")
        print(f"📂 输出: {output_path}\n")

        applier = TableTemplateApplier(str(template_path))

        success = apply_template_to_single_file(
            applier, image_file, json_file, output_path,
            args.width, line_color
        )

        if success:
            print("\n✅ 处理完成!")
            return 0

        print("\n❌ 处理失败!")
        return 1

    if args.image_dir and args.json_dir:
        # Batch mode
        image_dir = Path(args.image_dir)
        json_dir = Path(args.json_dir)

        if not image_dir.exists():
            print(f"❌ 错误: 图片目录不存在: {image_dir}")
            return 1

        if not json_dir.exists():
            print(f"❌ 错误: JSON目录不存在: {json_dir}")
            return 1

        print("\n🔧 批量处理模式")
        print(f"📄 模板: {template_path.name}")

        apply_template_batch(
            str(template_path),
            str(image_dir),
            str(json_dir),
            str(output_path),
            args.width,
            line_color
        )
        return 0

    # Neither complete argument pair was supplied: usage error.
    parser.print_help()
    print("\n❌ 错误: 请指定单文件模式或批量模式的参数")
    print("\n提示:")
    print(" 单文件模式: --image-file + --json-file")
    print(" 批量模式: --image-dir + --json-dir")
    return 2
if __name__ == "__main__":
    import sys

    print("🚀 启动表格模板批量应用程序...")

    # Only the script name is present: fall back to the built-in
    # single-file demo arguments below.
    if len(sys.argv) == 1:
        print("ℹ️ 未提供命令行参数,使用默认配置运行...")

        # Option name -> value, in the order they are put on the command line.
        default_config = {
            "template": "output/table_structures/康强_北京农村商业银行_page_001_structure.json",
            "image-file": "/Users/zhch158/workspace/data/流水分析/康强_北京农村商业银行/ppstructurev3_client_results/康强_北京农村商业银行/康强_北京农村商业银行_page_002.png",
            "json-file": "/Users/zhch158/workspace/data/流水分析/康强_北京农村商业银行/ppstructurev3_client_results/康强_北京农村商业银行_page_002.json",
            "output-dir": "output/batch_results",
            "width": "2",
            "color": "black"
        }

        print("⚙️ 默认参数:")
        for option, setting in default_config.items():
            print(f" --{option}: {setting}")

        # Rebuild argv as: script name, then "--option value" pairs, so
        # main() parses the defaults exactly like real arguments.
        injected_argv = sys.argv[:1]
        for option, setting in default_config.items():
            injected_argv += [f"--{option}", str(setting)]
        sys.argv = injected_argv

    sys.exit(main())
|