| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270 |
- import copy
- import json
- import os
- from loguru import logger
- from magic_pdf.data.data_reader_writer import FileBasedDataWriter
- from magic_pdf.libs.draw_bbox import draw_layout_bbox, draw_span_bbox
- from magic_pdf.pipe.OCRPipe import OCRPipe
- from magic_pdf.pipe.TXTPipe import TXTPipe
- from magic_pdf.pipe.UNIPipe import UNIPipe
- import pandas as pd
- from zhch.html_zhch import read_html_zhch
# TODO: device type selection (?)
from dotenv import load_dotenv

load_dotenv()
# Fixed: use .get() so a missing variable prints "None" instead of raising KeyError.
print(f"os.environ['CUDA_VISIBLE_DEVICES']: {os.environ.get('CUDA_VISIBLE_DEVICES')}")
print(f"os.environ['MINERU_TOOLS_CONFIG_JSON']: {os.environ.get('MINERU_TOOLS_CONFIG_JSON')}")
def json_md_dump(
    pipe,
    md_writer,
    pdf_name,
    content_list,
    md_content,
    orig_model_list,
):
    """Dump the pipeline results to disk through *md_writer*.

    Writes four sibling files: the raw model output (``<name>_model.json``),
    the intermediate parse data (``<name>_middle.json``), the text content
    list (``<name>_content_list.json``) and the final markdown (``<name>.md``).
    """
    def _as_json(obj):
        # Pretty-printed, with non-ASCII (e.g. Chinese) characters kept readable.
        return json.dumps(obj, ensure_ascii=False, indent=4)

    outputs = [
        (f'{pdf_name}_model.json', _as_json(orig_model_list)),
        (f'{pdf_name}_middle.json', _as_json(pipe.pdf_mid_data)),
        (f'{pdf_name}_content_list.json', _as_json(content_list)),
        (f'{pdf_name}.md', md_content),
    ]
    for file_name, payload in outputs:
        md_writer.write_string(file_name, payload)
- # 使用Pydantic定义report数据结构
- from pydantic import BaseModel
- import re
- from magic_pdf.config.ocr_content_type import BlockType
class Report(BaseModel):
    """One extracted table plus flags describing its position on its page."""
    # Excel sheet name derived from the table caption / nearest title.
    sheet_name: str
    # Table contents parsed from the table block's HTML.
    dataframe: pd.DataFrame
    # True when the table is the last block on its page — candidate to be
    # continued by the first table of the following page.
    last_available_label: bool
    # True when the table is the first block on its page.
    first_available_label: bool

    # Pydantic cannot generate a schema for pandas.core.frame.DataFrame,
    # so arbitrary (non-pydantic) field types must be allowed explicitly.
    class Config:
        arbitrary_types_allowed = True
def save_report(
    pipe: UNIPipe,
    excel_path: str
):
    """Extract the table blocks from the parsed PDF and save them to Excel.

    Writes two workbooks: ``excel_path`` with every detected table on its own
    sheet, and a ``*_merged.xlsx`` variant where tables that appear to
    continue across page boundaries are concatenated.

    :param pipe: pipeline object whose ``pdf_mid_data['pdf_info']`` holds the
        per-page parse results
    :param excel_path: path of the .xlsx file to write; the merged variant's
        path is derived from it
    """

    def merge_tables(prev_report: Report, next_report: Report) -> pd.DataFrame:
        """Concatenate two tables when *prev* ends its page, *next* starts its
        page, and both have the same column count; return the merged
        DataFrame, or None when they cannot be merged."""
        if prev_report.dataframe is not None and next_report.dataframe is not None:
            # Only a page-final table followed by a page-initial table is a
            # plausible continuation.
            if prev_report.last_available_label and next_report.first_available_label:
                if prev_report.dataframe.shape[1] == next_report.dataframe.shape[1]:
                    # Same column count: adopt the previous table's header.
                    next_report.dataframe.columns = prev_report.dataframe.columns
                    next_report.dataframe.reset_index(drop=True, inplace=True)
                    return pd.concat(
                        [prev_report.dataframe, next_report.dataframe],
                        axis=0, ignore_index=True
                    )
                else:
                    # Fixed: actually interpolate the column counts (the
                    # original logged the expressions as literal text and
                    # referenced an undefined name `report`).
                    logger.error(
                        f"列数不同,无法合并: "
                        f"{prev_report.sheet_name}({prev_report.dataframe.shape[1]}) "
                        f"和 {next_report.sheet_name}({next_report.dataframe.shape[1]})"
                    )
        return None

    def _is_table_title(title) -> bool:
        # A caption/heading counts as a table title when it ends with the
        # Chinese character '表' or with 'table' (case-insensitive).
        return bool(title) and (title[-1] == '表' or title.lower().endswith('table'))

    report_list = []
    pdf_info_list = pipe.pdf_mid_data['pdf_info']
    # Walk every page; each para_block contains at most one table.
    for page_info in pdf_info_list:
        paras_of_layout = page_info.get('para_blocks')
        page_idx = page_info.get('page_idx')
        if not paras_of_layout:
            continue
        for block_idx, para_block in enumerate(paras_of_layout):
            if para_block['type'] != BlockType.Table:
                continue
            sheet_name = None
            dataframe = None
            for block in para_block['blocks']:
                # Find the table body (HTML) and its caption.
                if block['type'] == BlockType.TableBody:
                    dataframe = read_html_zhch(
                        block['lines'][0]['spans'][0]['html'],
                        custom_args={
                            "colspan_single": ["header", "body"],
                            "number_strip": True
                        }
                    )[0]
                elif block['type'] == BlockType.TableCaption:
                    title = block['lines'][0]['spans'][0]['content']
                    if _is_table_title(title):
                        sheet_name = title
            if sheet_name is None:
                # No usable caption: walk backwards over the preceding blocks
                # on this page and reuse the nearest Title block that looks
                # like a table heading. Fixed: slice by block_idx instead of
                # paras_of_layout.index(para_block), which rescans the list
                # and matches the wrong block when two blocks compare equal.
                for title_block in reversed(paras_of_layout[:block_idx]):
                    if title_block['type'] == BlockType.Title:
                        title = title_block['lines'][0]['spans'][0]['content'].strip()
                        if _is_table_title(title):
                            sheet_name = title
                            break
            if dataframe is None:
                continue
            if sheet_name is None:
                sheet_name = f"Sheet_{page_idx}.{block_idx}"
            # Strip characters Excel forbids in sheet names; Excel also caps
            # sheet names at 31 characters — truncate so xlsxwriter does not
            # raise on long Chinese titles.
            sheet_name = re.sub(r'[\[\]:*?/\\]', '', sheet_name)[:31]
            report = Report(
                sheet_name=sheet_name,
                dataframe=dataframe,
                # Fixed: positional comparison instead of dict equality
                # against paras_of_layout[-1] / [0].
                last_available_label=(block_idx == len(paras_of_layout) - 1),
                first_available_label=(block_idx == 0),
            )
            report_list.append(report)

    # Raw output: every detected table on its own sheet.
    with pd.ExcelWriter(excel_path, engine='xlsxwriter') as excel_writer:
        for report in report_list:
            if report.dataframe is not None:
                report.dataframe.to_excel(excel_writer, sheet_name=report.sheet_name, index=False)

    # Merge runs of tables that continue each other across page boundaries.
    merged_report_list = []
    prev_report = None
    for report in report_list:
        if prev_report is not None and prev_report.dataframe is not None:
            merged_table = merge_tables(prev_report, report)
            if merged_table is not None:
                # Current table continues the previous one: extend it and
                # keep looking for further continuations.
                prev_report.dataframe = merged_table
                continue
            merged_report_list.append(prev_report)
        prev_report = report
    # Fixed: guard against an empty report_list — the original appended None
    # unconditionally and then crashed on report.dataframe below.
    if prev_report is not None:
        merged_report_list.append(prev_report)

    merged_excel_path = excel_path.replace(".xlsx", "_merged.xlsx")
    with pd.ExcelWriter(merged_excel_path, engine='xlsxwriter') as excel_writer:
        for report in merged_report_list:
            report.dataframe.to_excel(excel_writer, sheet_name=report.sheet_name, index=False)
            logger.debug(f"保存报表: {report}")
def draw_visualization_bbox(pdf_info, pdf_bytes, local_md_dir, pdf_file_name):
    """Render debugging overlays for the parsed PDF.

    Produces the layout-box image (with reading-order annotations) and the
    span-box image in *local_md_dir*.
    """
    # Layout boxes first (carry the ordering result), then the finer span boxes.
    for draw in (draw_layout_bbox, draw_span_bbox):
        draw(pdf_info, pdf_bytes, local_md_dir, pdf_file_name)
def pdf_parse_main(
    pdf_path: str,
    parse_method: str = 'auto',
    model_json_path: str = None,
    is_json_md_dump: bool = True,
    is_draw_visualization_bbox: bool = True,
    output_dir: str = None
):
    """Convert a PDF to json/md, writing all outputs to a per-PDF directory.

    :param pdf_path: path of the .pdf file, relative or absolute
    :param parse_method: one of 'auto', 'ocr', 'txt'; 'auto' by default —
        try 'ocr' if results are poor
    :param model_json_path: pre-computed model data for this exact PDF; when
        empty the built-in model is run instead
    :param is_json_md_dump: whether to write the parsed data to .json/.md
        files (three .json stages plus the markdown), default True
    :param is_draw_visualization_bbox: whether to render the layout/span
        bounding-box images, default True
    :param output_dir: output directory; a folder named after the PDF is
        created inside it (defaults to the PDF's own directory)
    """
    try:
        # Fixed: splitext keeps interior dots ("a.b.pdf" -> "a.b"); the
        # original split('.')[0] truncated such names to "a".
        pdf_name = os.path.splitext(os.path.basename(pdf_path))[0]
        pdf_path_parent = os.path.dirname(pdf_path)

        if output_dir:
            output_path = os.path.join(output_dir, pdf_name)
        else:
            output_path = os.path.join(pdf_path_parent, pdf_name)
        output_image_path = os.path.join(output_path, 'images')
        # Images are referenced by this relative path from the .md and
        # content_list.json outputs.
        image_path_parent = os.path.basename(output_image_path)

        # Fixed: close file handles deterministically with context managers.
        with open(pdf_path, 'rb') as f:
            pdf_bytes = f.read()

        orig_model_list = []
        if model_json_path:
            # Raw model output previously produced for this PDF (list type);
            # the PDF and the model json must correspond to each other.
            with open(model_json_path, 'r', encoding='utf-8') as f:
                model_json = json.load(f)
            orig_model_list = copy.deepcopy(model_json)
        else:
            model_json = []

        image_writer, md_writer = FileBasedDataWriter(output_image_path), FileBasedDataWriter(output_path)

        # Select the pipeline implementation.
        if parse_method == 'auto':
            jso_useful_key = {'_pdf_type': '', 'model_list': model_json}
            pipe = UNIPipe(pdf_bytes, jso_useful_key, image_writer)
        elif parse_method == 'txt':
            pipe = TXTPipe(pdf_bytes, model_json, image_writer)
        elif parse_method == 'ocr':
            pipe = OCRPipe(pdf_bytes, model_json, image_writer)
        else:
            logger.error('unknown parse method, only auto, ocr, txt allowed')
            exit(1)

        # Classification stage.
        pipe.pipe_classify()

        # Without externally supplied model data, run the built-in model.
        if len(model_json) == 0:
            pipe.pipe_analyze()
            orig_model_list = copy.deepcopy(pipe.model_list)

        # Parsing stage.
        pipe.pipe_parse()

        # Produce the text/markdown results.
        content_list = pipe.pipe_mk_uni_format(image_path_parent, drop_mode='none')
        md_content = pipe.pipe_mk_markdown(image_path_parent, drop_mode='none')

        # Export the extracted tables to Excel.
        save_report(pipe, os.path.join(output_path, f'{pdf_name}.xlsx'))

        if is_json_md_dump:
            json_md_dump(pipe, md_writer, pdf_name, content_list, md_content, orig_model_list)
        if is_draw_visualization_bbox:
            draw_visualization_bbox(pipe.pdf_mid_data['pdf_info'], pdf_bytes, output_path, pdf_name)
    except Exception as e:
        logger.exception(e)
# Manual test driver.
if __name__ == '__main__':
    script_dir = os.path.dirname(os.path.abspath(__file__))
    # demo_names = ['demo1', 'demo2', 'small_ocr']
    demo_names = ['600916_中国黄金_2002年报_83_94']
    for demo_name in demo_names:
        demo_path = os.path.join(script_dir, f'{demo_name}.pdf')
        pdf_parse_main(demo_path, output_dir='./output.demo')
|