import copy import json import os from loguru import logger from magic_pdf.data.data_reader_writer import FileBasedDataWriter from magic_pdf.libs.draw_bbox import draw_layout_bbox, draw_span_bbox from magic_pdf.pipe.OCRPipe import OCRPipe from magic_pdf.pipe.TXTPipe import TXTPipe from magic_pdf.pipe.UNIPipe import UNIPipe import pandas as pd from zhch.html_zhch import read_html_zhch # todo: 设备类型选择 (?) from dotenv import load_dotenv; load_dotenv() print(f"os.environ['CUDA_VISIBLE_DEVICES']: {os.environ['CUDA_VISIBLE_DEVICES']}") print(f"os.environ['MINERU_TOOLS_CONFIG_JSON']: {os.environ['MINERU_TOOLS_CONFIG_JSON']}") def json_md_dump( pipe, md_writer, pdf_name, content_list, md_content, orig_model_list, ): # 写入模型结果到 model.json md_writer.write_string( f'{pdf_name}_model.json', json.dumps(orig_model_list, ensure_ascii=False, indent=4) ) # 写入中间结果到 middle.json md_writer.write_string( f'{pdf_name}_middle.json', json.dumps(pipe.pdf_mid_data, ensure_ascii=False, indent=4) ) # text文本结果写入到 conent_list.json md_writer.write_string( f'{pdf_name}_content_list.json', json.dumps(content_list, ensure_ascii=False, indent=4) ) # 写入结果到 .md 文件中 md_writer.write_string( f'{pdf_name}.md', md_content, ) # 使用Pydantic定义report数据结构 from pydantic import BaseModel import re from magic_pdf.config.ocr_content_type import BlockType class Report(BaseModel): sheet_name: str dataframe: pd.DataFrame last_available_label: bool first_available_label: bool # Pydantic 无法为 pandas.core.frame.DataFrame 类型生成 schema,所以需要手动设置 class Config: arbitrary_types_allowed = True def save_report( pipe: UNIPipe, excel_path: str ): """ 保存报表数据 """ def merge_tables(prev_report: Report, next_report: Report) -> pd.DataFrame: # ... existing code ... if prev_report.dataframe is not None and next_report.dataframe is not None: # 判断前一个table是否是最后一个有效标签,下一个table是否是第一个有效标签 if prev_report.last_available_label and next_report.first_available_label: # 判断2个dataframe的列数是否相同 if prev_report.dataframe.shape[1] == next_report.dataframe.shape[1]: # 列数相同,则合并, 使用prev_report.dataframe.columns next_report.dataframe.columns = prev_report.dataframe.columns next_report.dataframe.reset_index(drop=True, inplace=True) # next_table_reindexed = next_report.dataframe.reindex(columns=prev_report.dataframe.columns) merged_table = pd.concat([prev_report.dataframe, next_report.dataframe], axis=0, ignore_index=True) return merged_table else: logger.error(f"列数不同,无法合并: {prev_report.sheet_name}(report.dataframe.shape[1]) 和 {next_report.sheet_name}(next_report.dataframe.shape[1])") return None report_list = [] # 初始化 report_list 为空列表 pdf_info_list = pipe.pdf_mid_data['pdf_info'] # 遍历pdf_info_list,获取每页的page_info for page_info in pdf_info_list: paras_of_layout = page_info.get('para_blocks') page_idx = page_info.get('page_idx') if not paras_of_layout: continue # 遍历每页的para_block, 每页有多个para_block,每个para_block只包含一个或0个table for block_idx, para_block in enumerate(paras_of_layout): para_type = para_block['type'] if para_type == BlockType.Table: sheet_name = None dataframe = None for block in para_block['blocks']: # 遍历每个block,找到table_body和table_caption if block['type'] == BlockType.TableBody: # 将html转换为dataframe # dataframe = pd.read_html(block['lines'][0]['spans'][0]['html'])[0] dataframe = read_html_zhch(block['lines'][0]['spans'][0]['html'], custom_args={ "colspan_single": ["header", "body"], "number_strip": True })[0] elif block['type'] == BlockType.TableCaption: title = block['lines'][0]['spans'][0]['content'] # 如果title不为空,且title的最后一个字符是“表” 或者 结尾lowcase是“table” if title is not None and title != '' and (title[-1] == '表' or title.lower().endswith('table')): sheet_name = title if sheet_name is None: # 向上查找,类型是Title的para_block for title_block in reversed(paras_of_layout[:paras_of_layout.index(para_block)]): if title_block['type'] == BlockType.Title: title = title_block['lines'][0]['spans'][0]['content'].strip() # 如果title不为空,且title的最后一个字符是“表” 或者 结尾lowcase是“table” if title is not None and title != '' and (title[-1] == '表' or title.lower().endswith('table')): sheet_name = title break if dataframe is None: continue if sheet_name is None: sheet_name = f"Sheet_{page_idx}.{block_idx}" # 替换非法字符 sheet_name = re.sub(r'[\[\]:*?/\\]', '', sheet_name) report = Report(sheet_name=sheet_name, dataframe=dataframe, last_available_label=False, first_available_label=False) if para_block == paras_of_layout[-1]: report.last_available_label = True if para_block == paras_of_layout[0]: report.first_available_label = True report_list.append(report) excel_writer = pd.ExcelWriter(excel_path, engine='xlsxwriter') for report in report_list: if report.dataframe is not None: # report.dataframe.to_excel(excel_writer, sheet_name=report.sheet_name, index=False, float_format="%.2f") report.dataframe.to_excel(excel_writer, sheet_name=report.sheet_name, index=False) excel_writer.close() merged_report_list = [] prev_report = None for report in report_list: if prev_report is not None and prev_report.dataframe is not None: merged_table = merge_tables(prev_report, report) if merged_table is not None: prev_report.dataframe = merged_table continue else: merged_report_list.append(prev_report) prev_report = report merged_report_list.append(prev_report) merged_excel_path = excel_path.replace(".xlsx", "_merged.xlsx") excel_writer = pd.ExcelWriter(merged_excel_path, engine='xlsxwriter') for report in merged_report_list: report.dataframe.to_excel(excel_writer, sheet_name=report.sheet_name, index=False) logger.debug(f"保存报表: {report}") excel_writer.close() # 可视化 def draw_visualization_bbox(pdf_info, pdf_bytes, local_md_dir, pdf_file_name): # 画布局框,附带排序结果 draw_layout_bbox(pdf_info, pdf_bytes, local_md_dir, pdf_file_name) # 画 span 框 draw_span_bbox(pdf_info, pdf_bytes, local_md_dir, pdf_file_name) def pdf_parse_main( pdf_path: str, parse_method: str = 'auto', model_json_path: str = None, is_json_md_dump: bool = True, is_draw_visualization_bbox: bool = True, output_dir: str = None ): """执行从 pdf 转换到 json、md 的过程,输出 md 和 json 文件到 pdf 文件所在的目录. :param pdf_path: .pdf 文件的路径,可以是相对路径,也可以是绝对路径 :param parse_method: 解析方法, 共 auto、ocr、txt 三种,默认 auto,如果效果不好,可以尝试 ocr :param model_json_path: 已经存在的模型数据文件,如果为空则使用内置模型,pdf 和 model_json 务必对应 :param is_json_md_dump: 是否将解析后的数据写入到 .json 和 .md 文件中,默认 True,会将不同阶段的数据写入到不同的 .json 文件中(共3个.json文件),md内容会保存到 .md 文件中 :param is_draw_visualization_bbox: 是否绘制可视化边界框,默认 True,会生成布局框和 span 框的图像 :param output_dir: 输出结果的目录地址,会生成一个以 pdf 文件名命名的文件夹并保存所有结果 """ try: pdf_name = os.path.basename(pdf_path).split('.')[0] pdf_path_parent = os.path.dirname(pdf_path) if output_dir: output_path = os.path.join(output_dir, pdf_name) else: output_path = os.path.join(pdf_path_parent, pdf_name) output_image_path = os.path.join(output_path, 'images') # 获取图片的父路径,为的是以相对路径保存到 .md 和 conent_list.json 文件中 image_path_parent = os.path.basename(output_image_path) pdf_bytes = open(pdf_path, 'rb').read() # 读取 pdf 文件的二进制数据 orig_model_list = [] if model_json_path: # 读取已经被模型解析后的pdf文件的 json 原始数据,list 类型 model_json = json.loads(open(model_json_path, 'r', encoding='utf-8').read()) orig_model_list = copy.deepcopy(model_json) else: model_json = [] # 执行解析步骤 image_writer, md_writer = FileBasedDataWriter(output_image_path), FileBasedDataWriter(output_path) # 选择解析方式 if parse_method == 'auto': jso_useful_key = {'_pdf_type': '', 'model_list': model_json} pipe = UNIPipe(pdf_bytes, jso_useful_key, image_writer) elif parse_method == 'txt': pipe = TXTPipe(pdf_bytes, model_json, image_writer) elif parse_method == 'ocr': pipe = OCRPipe(pdf_bytes, model_json, image_writer) else: logger.error('unknown parse method, only auto, ocr, txt allowed') exit(1) # 执行分类 pipe.pipe_classify() # 如果没有传入模型数据,则使用内置模型解析 if len(model_json) == 0: pipe.pipe_analyze() # 解析 orig_model_list = copy.deepcopy(pipe.model_list) # 执行解析 pipe.pipe_parse() # 保存 text 和 md 格式的结果 content_list = pipe.pipe_mk_uni_format(image_path_parent, drop_mode='none') md_content = pipe.pipe_mk_markdown(image_path_parent, drop_mode='none') # 保存报表 save_report(pipe, os.path.join(output_path, f'{pdf_name}.xlsx')) if is_json_md_dump: json_md_dump(pipe, md_writer, pdf_name, content_list, md_content, orig_model_list) if is_draw_visualization_bbox: draw_visualization_bbox(pipe.pdf_mid_data['pdf_info'], pdf_bytes, output_path, pdf_name) except Exception as e: logger.exception(e) # 测试 if __name__ == '__main__': current_script_dir = os.path.dirname(os.path.abspath(__file__)) # demo_names = ['demo1', 'demo2', 'small_ocr'] demo_names = ['600916_中国黄金_2002年报_83_94'] for name in demo_names: file_path = os.path.join(current_script_dir, f'{name}.pdf') pdf_parse_main(file_path, output_dir='./output.demo')