import re import os from pathlib import Path def _normalize_amount_token(token: str) -> str: """ 规范单个金额 token 中逗号/小数点的用法,统一输出美式格式(千分位逗号 + 点小数)。 算法: 1. 找小数分隔符:优先取最后一个 '.'(若其后恰好为 1-2 位纯数字), 次选最后一个 ','(同条件);均不满足则视为纯整数。 2. 整数部分去除所有逗号和点,得到纯数字串,重新按三位一组插入千分位逗号。 3. 与小数部分拼接,统一输出 xxx,xxx.xx 格式。 """ if not token: return token # 只处理含分隔符的数字串,避免误改年份/ID 等纯数字 if not re.fullmatch(r"[+-]?\d[\d,\.]*\d", token): return token if "," not in token and "." not in token: return token sign = "" core = token if core[0] in "+-": sign, core = core[0], core[1:] # 条件1:去符号后为纯整数(无分隔符),无需处理 if core.isdigit(): return token # 条件2:最后一个小数点之前无逗号/小数点(整数部分是纯数字)→ 已是正确小数格式,直接返回 dot_pos = core.rfind('.') if dot_pos != -1 and core[:dot_pos].isdigit() and core[dot_pos + 1:].isdigit(): return token # 步骤 1:确定小数分隔符('.' 优先于 ',') dec_digits: str | None = None int_part = core for sep in (".", ","): pos = core.rfind(sep) if pos == -1: continue after = core[pos + 1 :] if 1 <= len(after) <= 2 and after.isdigit(): dec_digits = after int_part = core[:pos] break # 步骤 2:整数部分去除所有分隔符,得到纯数字串 int_digits = re.sub(r"[,.]", "", int_part) if not int_digits or not int_digits.isdigit(): return token # 无法解析,保留原样 # 步骤 2.5:整数部分本身没有分隔符(如 1101,55 中的 1101) # → 原数字未使用千分位,只修正小数点符号,不添加千分位 if int_part == int_digits: result = sign + int_digits if dec_digits is not None: result += "." + dec_digits return result # 步骤 3:重新做千分位分组 n = len(int_digits) rem = n % 3 or 3 groups = [int_digits[:rem]] + [int_digits[i : i + 3] for i in range(rem, n, 3)] result = sign + ",".join(groups) if dec_digits is not None: result += "." + dec_digits return result def normalize_financial_numbers(text: str) -> str: """ 标准化财务数字:将全角字符转换为半角字符,并纠正常见的逗号/小数点错用。 """ if not text: return text # 定义全角到半角的映射 fullwidth_to_halfwidth = { '0': '0', '1': '1', '2': '2', '3': '3', '4': '4', '5': '5', '6': '6', '7': '7', '8': '8', '9': '9', ',': ',', # 全角逗号转半角逗号 '。': '.', # 全角句号转半角句号 '.': '.', # 全角句点转半角句点 ':': ':', # 全角冒号转半角冒号 ';': ';', # 全角分号转半角分号 '(': '(', # 全角左括号转半角左括号 ')': ')', # 全角右括号转半角右括号 '-': '-', # 全角减号转半角减号 '+': '+', # 全角加号转半角加号 '%': '%', # 全角百分号转半角百分号 } # 第一步:执行基础字符替换(全角 -> 半角) normalized_text = text for fullwidth, halfwidth in fullwidth_to_halfwidth.items(): normalized_text = normalized_text.replace(fullwidth, halfwidth) # 第二步:处理数字序列中的空格和分隔符(保留原有逻辑) number_sequence_pattern = r'(\d+(?:\s*[,,]\s*\d+)*(?:\s*[。..]\s*\d+)?)' def normalize_number_sequence(match): sequence = match.group(1) sequence = re.sub(r'(\d)\s*[,,]\s*(\d)', r'\1,\2', sequence) sequence = re.sub(r'(\d)\s*[。..]\s*(\d)', r'\1.\2', sequence) return sequence normalized_text = re.sub(number_sequence_pattern, normalize_number_sequence, normalized_text) # 第三步:对疑似金额 token 做逗号/小数点纠错 amount_pattern = r'(?P[+-]?\d[\d,\.]*\d)' def _amount_sub(m: re.Match) -> str: tok = m.group('tok') return _normalize_amount_token(tok) normalized_text = re.sub(amount_pattern, _amount_sub, normalized_text) return normalized_text def normalize_markdown_table(markdown_content: str) -> str: """ 专门处理Markdown表格中的数字标准化 注意:保留原始markdown中的换行符,只替换表格内的文本内容 Args: markdown_content: Markdown内容 Returns: 标准化后的Markdown内容 """ # 使用BeautifulSoup处理HTML表格 from bs4 import BeautifulSoup, Tag import re # 使用正则表达式找到所有表格的位置,并保留其前后的内容 # 匹配完整的HTML表格标签(包括嵌套) table_pattern = r'(]*>.*?)' def normalize_table_match(match): """处理单个表格匹配,保留原始格式,并追加数字标准化说明注释。""" table_html = match.group(1) original_table_html = table_html # 保存原始HTML用于比较 # 解析表格HTML soup = BeautifulSoup(table_html, 'html.parser') tables = soup.find_all('table') # 记录本表格中所有数值修改 changes: list[dict] = [] for table in tables: if not isinstance(table, Tag): continue # 通过 tr / td(th) 计算行列位置 for row_idx, tr in enumerate(table.find_all('tr')): # type: ignore[reportAttributeAccessIssue] cells = tr.find_all(['td', 'th']) # type: ignore[reportAttributeAccessIssue] for col_idx, cell in enumerate(cells): if not isinstance(cell, Tag): continue # 与 normalize_json_table 一致:整格取文本、只标准化一次、再写回 original_text = cell.get_text() normalized_text = normalize_financial_numbers(original_text) if original_text == normalized_text: continue # 记录一条修改 changes.append( { "row": row_idx, "col": col_idx, "old": original_text, "new": normalized_text, } ) # 整格替换为标准化后的文本(与 normalize_json_table 的 cell.string = normalized_text 一致) cell.string = normalized_text # 如果没有任何数值修改,直接返回原始 HTML if not changes: return original_table_html # 获取修改后的HTML modified_html = str(soup) # 在表格后追加注释,说明哪些单元格被修改 lines = ["") comment = "\n".join(lines) return modified_html + "\n\n" + comment # 使用正则替换,只替换表格内容,保留其他部分(包括换行符)不变 normalized_content = re.sub(table_pattern, normalize_table_match, markdown_content, flags=re.DOTALL) return normalized_content def normalize_json_table( json_content: str, *, table_type_key: str = "category", table_type_value: str = "Table", html_key: str = "text", cells_key: str | None = None, ) -> str: """ 专门处理JSON格式OCR结果中表格的数字标准化。 通过参数指定提取用的 key,以兼容不同 OCR 工具的 JSON 结构。 Args: json_content: JSON格式的OCR结果内容(字符串或已解析的 list) table_type_key: 用于判断“是否为表格”的字段名,如 "type" 或 "category" table_type_value: 上述字段等于该值时视为表格,如 "table" 或 "Table" html_key: 存放表格 HTML 的字段名,如 "table_body" 或 "text" cells_key: 存放单元格列表的字段名,如 "table_cells";为 None 则不处理 cells, 仅标准化 html_key 中的表格 Returns: 标准化后的JSON内容(字符串) 常见格式示例: - 旧格式: category="Table", html 在 "text" normalize_json_table(s) # 默认即此 - mineru_vllm_results_cell_bbox: type="table", html 在 "table_body", cells 在 "table_cells" normalize_json_table(s, table_type_key="type", table_type_value="table", html_key="table_body", cells_key="table_cells") """ import json from ast import literal_eval try: data = json.loads(json_content) if isinstance(json_content, str) else json_content if not isinstance(data, list): return json_content for item in data: if not isinstance(item, dict): continue # 按参数判断是否为表格项,且包含 HTML if item.get(table_type_key) != table_type_value or html_key not in item: continue table_html = item[html_key] if not table_html or not isinstance(table_html, str): continue from bs4 import BeautifulSoup, Tag soup = BeautifulSoup(table_html, "html.parser") tables = soup.find_all("table") table_changes: list[dict] = [] for table in tables: if not isinstance(table, Tag): continue for row_idx, tr in enumerate(table.find_all("tr")): # type: ignore[reportAttributeAccessIssue] cells_tag = tr.find_all(["td", "th"]) # type: ignore[reportAttributeAccessIssue] for col_idx, cell in enumerate(cells_tag): if not isinstance(cell, Tag): continue original_text = cell.get_text() normalized_text = normalize_financial_numbers(original_text) if original_text == normalized_text: continue change: dict[str, object] = { "row": row_idx, "col": col_idx, "old": original_text, "new": normalized_text, } bbox_attr = cell.get("data-bbox") if isinstance(bbox_attr, str): try: change["bbox"] = literal_eval(bbox_attr) except Exception: change["bbox"] = bbox_attr table_changes.append(change) cell.string = normalized_text # 写回 HTML item[html_key] = str(soup) if table_changes: item["number_normalization_changes"] = table_changes # 若指定了 cells_key,同时标准化 cells 中每格的 text(及 matched_text) # for key in ("text", "matched_text"): table_cell_text_keys = ["text"] if cells_key and cells_key in item and isinstance(item[cells_key], list): for cell in item[cells_key]: if not isinstance(cell, dict): continue for key in table_cell_text_keys: if key not in cell or not isinstance(cell[key], str): continue orig = cell[key] norm = normalize_financial_numbers(orig) if norm != orig: cell[key] = norm return json.dumps(data, ensure_ascii=False, indent=2) except json.JSONDecodeError as e: print(f"⚠️ JSON解析失败: {e}") return json_content except Exception as e: print(f"⚠️ JSON表格标准化失败: {e}") return json_content def normalize_json_file( file_path: str, output_path: str | None = None, *, table_type_key: str = "category", table_type_value: str = "Table", html_key: str = "text", cells_key: str | None = None, ) -> str: """ 标准化JSON文件中的表格数字。 提取表格时使用的 key 可通过参数指定,以兼容不同 OCR 工具。 Args: file_path: 输入JSON文件路径 output_path: 输出文件路径,如果为None则覆盖原文件 table_type_key: 判断表格的字段名(见 normalize_json_table) table_type_value: 判断表格的字段值 html_key: 表格 HTML 所在字段名 cells_key: 单元格列表所在字段名,None 表示不处理 cells Returns: 标准化后的JSON内容 """ input_file = Path(file_path) output_file = Path(output_path) if output_path else input_file if not input_file.exists(): raise FileNotFoundError(f"找不到文件: {file_path}") with open(input_file, "r", encoding="utf-8") as f: original_content = f.read() print(f"🔧 正在标准化JSON文件: {input_file.name}") normalized_content = normalize_json_table( original_content, table_type_key=table_type_key, table_type_value=table_type_value, html_key=html_key, cells_key=cells_key, ) # 保存标准化后的文件 with open(output_file, 'w', encoding='utf-8') as f: f.write(normalized_content) # 统计变化 changes = sum(1 for o, n in zip(original_content, normalized_content) if o != n) if changes > 0: print(f"✅ 标准化了 {changes} 个字符") # 如果输出路径不同,也保存原始版本 if output_path and output_path != file_path: original_backup = Path(output_path).parent / f"{Path(output_path).stem}_original.json" with open(original_backup, 'w', encoding='utf-8') as f: f.write(original_content) print(f"📄 原始版本已保存到: {original_backup}") else: print("ℹ️ 无需标准化(已是标准格式)") print(f"📄 标准化结果已保存到: {output_file}") return normalized_content if __name__ == "__main__": """ 简单验证:构造一份“故意打乱逗号/小数点”的 JSON / Markdown 示例, 并打印标准化前后的差异。 """ import json print("=== JSON 示例:金额格式纠错 + 变更记录 ===") demo_json_data = [ { "category": "Table", "text": ( "" "" "" # 故意打乱的数字:应为 12,123,456.00 和 1,234,567.89 "" "" "" "" "" "" "" "" "
项目2023 年12 月31 日
测试金额A12.123,456,00
测试金额B1,234,567,89
测试金额C301,55
测试金额D1.068.987,094.02
" ), } ] demo_json_str = json.dumps(demo_json_data, ensure_ascii=False, indent=2) print("原始 JSON:") print(demo_json_str) normalized_json_str = normalize_json_table(demo_json_str) print("\n标准化后 JSON:") print(normalized_json_str) print("\n=== Markdown 示例:金额格式纠错 + 注释说明 ===") demo_md = """
项目2023 年12 月31 日
测试金额A12.123,456,00
测试金额B1,234,567,89
测试金额C301,55
测试金额D1.068.987,094.02
""" print("原始 Markdown:") print(demo_md) normalized_md = normalize_markdown_table(demo_md) print("\n标准化后 Markdown:") print(normalized_md) cases = [ # A 类:标准美式格式,不应被修改 ("10,000.00", "10,000.00"), ("67,455.00", "67,455.00"), ("89,400.00", "89,400.00"), ("100,200.00", "100,200.00"), ("494,339.63", "494,339.63"), ("1,179.05", "1,179.05"), ("27,396.05", "27,396.05"), # B 类:混合/大数格式,需被修正 ("19.879,111.45", "19,879,111.45"), ("27.072,795.05", "27,072,795.05"), ("468.348,422.85", "468,348,422.85"), ("4740,251.56", "4,740,251.56"), # C 类:多余分隔符 ("585,515.936.19", "585,515,936.19"), ("22,240.761.60", "22,240,761.60"), ("198,757.280.38", "198,757,280.38"), ("618,846.219.71", "618,846,219.71"), # 原 demo 案例 ("12.123,456,00", "12,123,456.00"), ("1,234,567,89", "1,234,567.89"), ("301,55", "301.55"), ("1.068.987,094.02", "1,068,987,094.02"), # 标准欧洲格式 ("1.234,56", "1,234.56"), ("1.234.567,89", "1,234,567.89"), ] ok = fail = 0 for inp, expected in cases: got = _normalize_amount_token(inp) status = "✅" if got == expected else "❌" if got != expected: fail += 1 print(f"{status} {inp!r:30s} → {got!r}" + (f" (期望 {expected!r})" if got != expected else "")) print(f"\n共 {ok+fail} 个,通过 {len(cases)-fail},失败 {fail}")