| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414 |
- import re
- import os
- from pathlib import Path
- from decimal import Decimal, InvalidOperation
- def _normalize_amount_token(token: str) -> str:
- """
- 规范单个金额 token 中逗号/小数点的用法。
- 仅在形态明显为金额时进行纠错,其他情况原样返回。
- """
- if not token:
- return token
- # 只处理包含数字的简单 token,避免带字母/其他符号的误改
- if not re.fullmatch(r"[+-]?\d[\d,\.]*\d", token):
- return token
- sign = ""
- core = token
- if core[0] in "+-":
- sign, core = core[0], core[1:]
- has_dot = "." in core
- has_comma = "," in core
- # 辅助: 尝试解析为 Decimal;失败则认为不安全,回退原值
- def _safe_decimal(s: str) -> bool:
- try:
- Decimal(s.replace(",", ""))
- return True
- except (InvalidOperation, ValueError):
- return False
- # 规则A:同时包含 . 和 ,,最后一个分隔符是逗号,且其后为 1-2 位数字
- if has_dot and has_comma:
- last_comma = core.rfind(",")
- last_dot = core.rfind(".")
- if last_comma > last_dot and last_comma != -1:
- frac = core[last_comma + 1 :]
- if 1 <= len(frac) <= 2 and frac.isdigit():
- # 先把所有点当作千分位逗号,再把最后一个逗号当作小数点
- temp = core.replace(".", ",")
- idx = temp.rfind(",")
- if idx != -1:
- candidate = temp[:idx] + "." + temp[idx + 1 :]
- if _safe_decimal(candidate):
- return sign + candidate
- # 规则B:只有 .,多个点,最后一段视为小数,其余为千分位
- if has_dot and not has_comma:
- parts = core.split(".")
- if len(parts) >= 3:
- last = parts[-1]
- ints = parts[:-1]
- if 1 <= len(last) <= 2 and all(len(p) == 3 for p in ints[1:]):
- candidate = ",".join(ints) + "." + last
- if _safe_decimal(candidate):
- return sign + candidate
- # 规则C:只有 ,,多个逗号,最后一段长度为 1-2 且前面为 3 位分组
- if has_comma and not has_dot:
- parts = core.split(",")
- if len(parts) >= 3:
- last = parts[-1]
- ints = parts[:-1]
- if 1 <= len(last) <= 2 and all(len(p) == 3 for p in ints[1:]):
- # 将最后一个逗号视为小数点
- idx = core.rfind(",")
- candidate = core[:idx] + "." + core[idx + 1 :]
- if _safe_decimal(candidate):
- return sign + candidate
- # 没有需要纠错的典型形态,直接返回原 token
- return token
def normalize_financial_numbers(text: str) -> str:
    """Normalize financial numbers found in *text*.

    Three passes are applied in order:

    1. Full-width digits and a fixed set of full-width punctuation marks are
       mapped to their ASCII counterparts. The ideographic full stop
       (U+3002) is mapped to ``.`` as well; this applies to the whole text,
       not only to positions between digits.
    2. Whitespace around ``,`` and ``.`` separators inside a digit sequence
       is removed (``1 , 234 . 5`` -> ``1,234.5``).
    3. Every amount-like token is passed through
       ``_normalize_amount_token`` to repair misplaced separators.

    Args:
        text: Arbitrary text, typically the content of a table cell.

    Returns:
        The normalized text; empty / falsy input is returned unchanged.
    """
    if not text:
        return text

    # Pass 1: full-width -> half-width. Keys are given as code points so the
    # table keeps working even if this source file is Unicode-normalized.
    fullwidth_to_halfwidth = {0xFF10 + digit: str(digit) for digit in range(10)}
    fullwidth_to_halfwidth.update(
        {
            0xFF0C: ",",  # fullwidth comma
            0x3002: ".",  # ideographic full stop
            0xFF0E: ".",  # fullwidth full stop
            0xFF1A: ":",  # fullwidth colon
            0xFF1B: ";",  # fullwidth semicolon
            0xFF08: "(",  # fullwidth left parenthesis
            0xFF09: ")",  # fullwidth right parenthesis
            0xFF0D: "-",  # fullwidth hyphen-minus
            0xFF0B: "+",  # fullwidth plus sign
            0xFF05: "%",  # fullwidth percent sign
        }
    )
    normalized_text = text.translate(fullwidth_to_halfwidth)

    # Pass 2: tighten separators inside digit sequences. Pass 1 has already
    # converted the full-width separators, so only "," and "." remain.
    number_sequence_pattern = r"(\d+(?:\s*,\s*\d+)*(?:\s*\.\s*\d+)?)"

    def normalize_number_sequence(match: re.Match) -> str:
        sequence = match.group(1)
        # Lookarounds keep the neighbouring digits out of the match, so that
        # consecutive single-digit groups ("1 , 2 , 3") are all tightened.
        sequence = re.sub(r"(?<=\d)\s*,\s*(?=\d)", ",", sequence)
        return re.sub(r"(?<=\d)\s*\.\s*(?=\d)", ".", sequence)

    normalized_text = re.sub(
        number_sequence_pattern, normalize_number_sequence, normalized_text
    )

    # Pass 3: repair comma / decimal-point mix-ups in amount-like tokens.
    amount_pattern = r"(?P<tok>[+-]?\d[\d,\.]*\d)"

    def _amount_sub(m: re.Match) -> str:
        return _normalize_amount_token(m.group("tok"))

    return re.sub(amount_pattern, _amount_sub, normalized_text)
-
def normalize_markdown_table(markdown_content: str) -> str:
    """Normalize the numbers inside HTML tables embedded in Markdown.

    Everything outside ``<table>...</table>`` spans (including line breaks)
    is left untouched. Inside a table, the text of each ``td`` / ``th`` cell
    is passed through ``normalize_financial_numbers``. When at least one cell
    of a table was modified, the table is re-serialized and followed by an
    HTML comment listing every modified cell (row, column, old text, new
    text); otherwise the original table HTML is returned verbatim.

    Args:
        markdown_content: Markdown text that may contain HTML tables.

    Returns:
        The Markdown text with normalized table cells.
    """
    # NOTE: the match is non-greedy, so for nested tables it ends at the
    # first closing </table> tag.
    table_pattern = r'(<table[^>]*>.*?</table>)'

    def normalize_table_match(match):
        """Normalize one matched table and append the change-log comment."""
        # Imported lazily so that content without any table does not need
        # BeautifulSoup at all.
        from bs4 import BeautifulSoup, Tag
        from bs4.element import NavigableString

        original_table_html = match.group(1)
        soup = BeautifulSoup(original_table_html, 'html.parser')

        # One entry per cell whose text was actually modified.
        changes: list[dict] = []

        for table in soup.find_all('table'):
            if not isinstance(table, Tag):
                continue
            # Row / column indices are derived from the tr and td/th order.
            for row_idx, tr in enumerate(table.find_all('tr')):  # type: ignore[reportAttributeAccessIssue]
                cells = tr.find_all(['td', 'th'])  # type: ignore[reportAttributeAccessIssue]
                for col_idx, cell in enumerate(cells):
                    if not isinstance(cell, Tag):
                        continue
                    original_text = cell.get_text()
                    if normalize_financial_numbers(original_text) == original_text:
                        continue

                    # Replace text node by text node so that nested markup
                    # and surrounding whitespace are preserved.
                    for text_node in cell.find_all(string=True, recursive=True):
                        if not isinstance(text_node, NavigableString):
                            continue
                        text_str = str(text_node)
                        stripped = text_str.strip()
                        if not stripped:
                            continue
                        normalized = normalize_financial_numbers(stripped)
                        if normalized == stripped:
                            continue
                        leading_ws = text_str[: len(text_str) - len(text_str.lstrip())]
                        trailing_ws = text_str[len(text_str.rstrip()):]
                        text_node.replace_with(leading_ws + normalized + trailing_ws)

                    # Log what was really written to the cell. If no text
                    # node changed (e.g. the number is split across several
                    # nodes), nothing is reported for this cell.
                    applied_text = cell.get_text()
                    if applied_text == original_text:
                        continue
                    changes.append(
                        {
                            "row": row_idx,
                            "col": col_idx,
                            "old": original_text,
                            "new": applied_text,
                        }
                    )

        # Nothing modified: keep the original HTML byte for byte.
        if not changes:
            return original_table_html

        # Append a comment describing which cells were modified.
        lines = ["<!-- 数字标准化说明:"]
        for ch in changes:
            lines.append(
                f" - [row={ch['row']},col={ch['col']}] {ch['old']} -> {ch['new']}"
            )
        lines.append("-->")

        return str(soup) + "\n\n" + "\n".join(lines)

    # Substitute only the table spans; all other content stays as it is.
    return re.sub(table_pattern, normalize_table_match, markdown_content, flags=re.DOTALL)
def normalize_json_table(json_content: str) -> str:
    """Normalize the numbers inside table items of a JSON OCR result.

    The expected input is a JSON list of objects such as::

        [
          {"category": "Table", "text": "<table>...</table>"},
          {"category": "Text", "text": "Some other text"}
        ]

    For every item whose ``category`` is ``"Table"``, the text of each
    ``td`` / ``th`` cell of its HTML is passed through
    ``normalize_financial_numbers``. When cells changed, the item's ``text``
    is replaced by the re-serialized HTML and a
    ``number_normalization_changes`` list is attached, with one entry per
    modified cell (``row``, ``col``, ``old``, ``new`` and, if the cell has a
    ``data-bbox`` attribute, ``bbox``). Other items are left untouched.

    Args:
        json_content: OCR result as a JSON string (an already parsed object
            is accepted too).

    Returns:
        The result serialized with ``ensure_ascii=False, indent=2``. The
        input is returned unchanged when it is not a JSON list, cannot be
        parsed, or processing fails.
    """
    import json
    from ast import literal_eval

    try:
        data = json.loads(json_content) if isinstance(json_content, str) else json_content

        # Only the list-of-items layout is supported.
        if not isinstance(data, list):
            return json_content

        for item in data:
            if not isinstance(item, dict):
                continue
            if item.get('category') != 'Table' or 'text' not in item:
                continue

            table_html = item['text']
            # Skip items without HTML text (e.g. null) instead of letting the
            # parser raise and abort the normalization of every other item.
            if not isinstance(table_html, str):
                continue

            from bs4 import BeautifulSoup, Tag

            soup = BeautifulSoup(table_html, 'html.parser')
            table_changes: list[dict] = []

            for table in soup.find_all('table'):
                if not isinstance(table, Tag):
                    continue
                # Row / column indices are derived from the tr and td/th order.
                for row_idx, tr in enumerate(table.find_all('tr')):  # type: ignore[reportAttributeAccessIssue]
                    cells = tr.find_all(['td', 'th'])  # type: ignore[reportAttributeAccessIssue]
                    for col_idx, cell in enumerate(cells):
                        if not isinstance(cell, Tag):
                            continue
                        original_text = cell.get_text()
                        normalized_text = normalize_financial_numbers(original_text)
                        if original_text == normalized_text:
                            continue

                        change: dict[str, object] = {
                            "row": row_idx,
                            "col": col_idx,
                            "old": original_text,
                            "new": normalized_text,
                        }
                        bbox_attr = cell.get("data-bbox")
                        if isinstance(bbox_attr, str):
                            # Prefer the parsed literal (e.g. a list of
                            # ints); keep the raw string if it is not one.
                            try:
                                change["bbox"] = literal_eval(bbox_attr)
                            except Exception:
                                change["bbox"] = bbox_attr
                        table_changes.append(change)

                        # Plain overwrite of the cell text.
                        # NOTE(review): assigning .string replaces all
                        # children of the cell, so nested markup is dropped.
                        cell.string = normalized_text

            # Touch the item only when something changed, so unmodified
            # tables keep their original HTML serialization.
            if table_changes:
                item['text'] = str(soup)
                item['number_normalization_changes'] = table_changes

        return json.dumps(data, ensure_ascii=False, indent=2)

    except json.JSONDecodeError as e:
        print(f"⚠️ JSON解析失败: {e}")
        return json_content
    except Exception as e:
        # Best effort: any failure leaves the caller with the original input.
        print(f"⚠️ JSON表格标准化失败: {e}")
        return json_content
- def normalize_json_file(file_path: str, output_path: str | None = None) -> str:
- """
- 标准化JSON文件中的表格数字
-
- Args:
- file_path: 输入JSON文件路径
- output_path: 输出文件路径,如果为None则覆盖原文件
-
- Returns:
- 标准化后的JSON内容
- """
- input_file = Path(file_path)
- output_file = Path(output_path) if output_path else input_file
-
- if not input_file.exists():
- raise FileNotFoundError(f"找不到文件: {file_path}")
-
- # 读取原始JSON文件
- with open(input_file, 'r', encoding='utf-8') as f:
- original_content = f.read()
-
- print(f"🔧 正在标准化JSON文件: {input_file.name}")
-
- # 标准化内容
- normalized_content = normalize_json_table(original_content)
-
- # 保存标准化后的文件
- with open(output_file, 'w', encoding='utf-8') as f:
- f.write(normalized_content)
-
- # 统计变化
- changes = sum(1 for o, n in zip(original_content, normalized_content) if o != n)
- if changes > 0:
- print(f"✅ 标准化了 {changes} 个字符")
-
- # 如果输出路径不同,也保存原始版本
- if output_path and output_path != file_path:
- original_backup = Path(output_path).parent / f"{Path(output_path).stem}_original.json"
- with open(original_backup, 'w', encoding='utf-8') as f:
- f.write(original_content)
- print(f"📄 原始版本已保存到: {original_backup}")
- else:
- print("ℹ️ 无需标准化(已是标准格式)")
-
- print(f"📄 标准化结果已保存到: {output_file}")
- return normalized_content
-
def _run_demo() -> None:
    """Print a before/after demo for the JSON and the Markdown normalizers.

    Both samples contain amounts whose commas / decimal points were
    deliberately scrambled: the intended values are 12,123,456.00 and
    1,234,567.89.
    """
    import json

    # (label, value) pairs shared by both samples; the first row is the header.
    sample_rows = [
        ("项目", "2023 年12 月31 日"),
        ("测试金额A", "12.123,456,00"),
        ("测试金额B", "1,234,567,89"),
    ]

    print("=== JSON 示例:金额格式纠错 + 变更记录 ===")
    json_rows = []
    for index, (label, value) in enumerate(sample_rows):
        top, bottom = index * 10, index * 10 + 10
        json_rows.append(
            f'<tr><td data-bbox="[0,{top},10,{bottom}]">{label}</td>'
            f'<td data-bbox="[10,{top},20,{bottom}]">{value}</td></tr>'
        )
    demo_json_data = [
        {
            "category": "Table",
            "text": "<table><tbody>" + "".join(json_rows) + "</tbody></table>",
        }
    ]
    demo_json_str = json.dumps(demo_json_data, ensure_ascii=False, indent=2)
    print("原始 JSON:")
    print(demo_json_str)
    print("\n标准化后 JSON:")
    print(normalize_json_table(demo_json_str))

    print("\n=== Markdown 示例:金额格式纠错 + 注释说明 ===")
    md_rows = "".join(
        f"<tr><td>{label}</td><td>{value}</td></tr>\n" for label, value in sample_rows
    )
    demo_md = "<table><tbody>\n" + md_rows + "</tbody></table>\n"
    print("原始 Markdown:")
    print(demo_md)
    print("\n标准化后 Markdown:")
    print(normalize_markdown_table(demo_md))


if __name__ == "__main__":
    _run_demo()
|