| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250 |
- import re
- import os
- from pathlib import Path
- def normalize_financial_numbers(text: str) -> str:
- """
- 标准化财务数字:将全角字符转换为半角字符
-
- Args:
- text: 原始文本
-
- Returns:
- 标准化后的文本
- """
- if not text:
- return text
-
- # 定义全角到半角的映射
- fullwidth_to_halfwidth = {
- '0': '0', '1': '1', '2': '2', '3': '3', '4': '4',
- '5': '5', '6': '6', '7': '7', '8': '8', '9': '9',
- ',': ',', # 全角逗号转半角逗号
- '。': '.', # 全角句号转半角句号
- '.': '.', # 全角句点转半角句点
- ':': ':', # 全角冒号转半角冒号
- ';': ';', # 全角分号转半角分号
- '(': '(', # 全角左括号转半角左括号
- ')': ')', # 全角右括号转半角右括号
- '-': '-', # 全角减号转半角减号
- '+': '+', # 全角加号转半角加号
- '%': '%', # 全角百分号转半角百分号
- }
-
- # 第一步:执行基础字符替换
- normalized_text = text
- for fullwidth, halfwidth in fullwidth_to_halfwidth.items():
- normalized_text = normalized_text.replace(fullwidth, halfwidth)
-
- # 第二步:处理数字序列中的空格和分隔符
- # 修改正则表达式以匹配完整的数字序列,包括空格
- # 匹配模式:数字 + (空格? + 逗号 + 空格? + 数字)* + (空格? + 小数点 + 数字+)?
- number_sequence_pattern = r'(\d+(?:\s*[,,]\s*\d+)*(?:\s*[。..]\s*\d+)?)'
-
- def normalize_number_sequence(match):
- sequence = match.group(1)
-
- # 处理千分位分隔符周围的空格
- # 将 "数字 + 空格 + 逗号 + 空格 + 数字" 标准化为 "数字,数字"
- sequence = re.sub(r'(\d)\s*[,,]\s*(\d)', r'\1,\2', sequence)
-
- # 处理小数点周围的空格
- # 将 "数字 + 空格 + 小数点 + 空格 + 数字" 标准化为 "数字.数字"
- sequence = re.sub(r'(\d)\s*[。..]\s*(\d)', r'\1.\2', sequence)
-
- return sequence
-
- normalized_text = re.sub(number_sequence_pattern, normalize_number_sequence, normalized_text)
- return normalized_text
-
- def normalize_markdown_table(markdown_content: str) -> str:
- """
- 专门处理Markdown表格中的数字标准化
-
- Args:
- markdown_content: Markdown内容
-
- Returns:
- 标准化后的Markdown内容
- """
- # 使用BeautifulSoup处理HTML表格
- from bs4 import BeautifulSoup, Tag
-
- soup = BeautifulSoup(markdown_content, 'html.parser')
- tables = soup.find_all('table')
-
- for table in tables:
- if isinstance(table, Tag):
- cells = table.find_all(['td', 'th'])
- for cell in cells:
- if isinstance(cell, Tag):
- original_text = cell.get_text()
- normalized_text = normalize_financial_numbers(original_text)
-
- # 如果内容发生了变化,更新单元格内容
- if original_text != normalized_text:
- cell.string = normalized_text
-
- # 返回更新后的HTML
- return str(soup)
- def normalize_json_table(json_content: str) -> str:
- """
- 专门处理JSON格式OCR结果中表格的数字标准化
-
- Args:
- json_content: JSON格式的OCR结果内容
-
- Returns:
- 标准化后的JSON内容
- """
- """
- json_content 示例:
- [
- {
- "category": "Table",
- "text": "<table>...</table>"
- },
- {
- "category": "Text",
- "text": "Some other text"
- }
- ]
- """
- import json
-
- try:
- # 解析JSON内容
- data = json.loads(json_content) if isinstance(json_content, str) else json_content
-
- # 确保data是列表格式
- if not isinstance(data, list):
- return json_content
-
- # 遍历所有OCR结果项
- for item in data:
- if not isinstance(item, dict):
- continue
-
- # 检查是否是表格类型
- if item.get('category') == 'Table' and 'text' in item:
- table_html = item['text']
-
- # 使用BeautifulSoup处理HTML表格
- from bs4 import BeautifulSoup, Tag
-
- soup = BeautifulSoup(table_html, 'html.parser')
- tables = soup.find_all('table')
-
- for table in tables:
- if isinstance(table, Tag):
- cells = table.find_all(['td', 'th'])
- for cell in cells:
- if isinstance(cell, Tag):
- original_text = cell.get_text()
-
- # 应用数字标准化
- normalized_text = normalize_financial_numbers(original_text)
-
- # 如果内容发生了变化,更新单元格内容
- if original_text != normalized_text:
- cell.string = normalized_text
-
- # 更新item中的表格内容
- item['text'] = str(soup)
-
- # 同时标准化普通文本中的数字(如果需要)
- # elif 'text' in item:
- # original_text = item['text']
- # normalized_text = normalize_financial_numbers(original_text)
- # if original_text != normalized_text:
- # item['text'] = normalized_text
-
- # 返回标准化后的JSON字符串
- return json.dumps(data, ensure_ascii=False, indent=2)
-
- except json.JSONDecodeError as e:
- print(f"⚠️ JSON解析失败: {e}")
- return json_content
- except Exception as e:
- print(f"⚠️ JSON表格标准化失败: {e}")
- return json_content
- def normalize_json_file(file_path: str, output_path: str | None = None) -> str:
- """
- 标准化JSON文件中的表格数字
-
- Args:
- file_path: 输入JSON文件路径
- output_path: 输出文件路径,如果为None则覆盖原文件
-
- Returns:
- 标准化后的JSON内容
- """
- input_file = Path(file_path)
- output_file = Path(output_path) if output_path else input_file
-
- if not input_file.exists():
- raise FileNotFoundError(f"找不到文件: {file_path}")
-
- # 读取原始JSON文件
- with open(input_file, 'r', encoding='utf-8') as f:
- original_content = f.read()
-
- print(f"🔧 正在标准化JSON文件: {input_file.name}")
-
- # 标准化内容
- normalized_content = normalize_json_table(original_content)
-
- # 保存标准化后的文件
- with open(output_file, 'w', encoding='utf-8') as f:
- f.write(normalized_content)
-
- # 统计变化
- changes = sum(1 for o, n in zip(original_content, normalized_content) if o != n)
- if changes > 0:
- print(f"✅ 标准化了 {changes} 个字符")
-
- # 如果输出路径不同,也保存原始版本
- if output_path and output_path != file_path:
- original_backup = Path(output_path).parent / f"{Path(output_path).stem}_original.json"
- with open(original_backup, 'w', encoding='utf-8') as f:
- f.write(original_content)
- print(f"📄 原始版本已保存到: {original_backup}")
- else:
- print("ℹ️ 无需标准化(已是标准格式)")
-
- print(f"📄 标准化结果已保存到: {output_file}")
- return normalized_content
- if __name__ == "__main__":
- # 简单测试
- test_strings = [
- "28, 239, 305.48",
- "2023年净利润为28,239,305.48元",
- "总资产为1,234,567.89元",
- "负债总额为500,000.00元",
- "收入增长了10.5%,达到1,200,000元",
- "费用为300,000元",
- "利润率为15.2%",
- "现金流量为-50,000元",
- "股东权益为2,500,000.00元",
- "每股收益为3.25元",
- "市盈率为20.5倍",
- "营业收入为750,000元",
- "净资产收益率为12.3%",
- "总负债为1,200,000元",
- "流动比率为1.5倍",
- "速动比率为1.2倍",
- "资产负债率为40%",
- "存货周转率为6次/年",
- "应收账款周转率为8次/年",
- "固定资产周转率为2次/年",
- "总资产周转率为1.2次/年",
- "经营活动产生的现金流量净额为200,000元"
- ]
-
- for s in test_strings:
- print("原始: ", s)
- print("标准化: ", normalize_financial_numbers(s))
- print("-" * 50)
|