# magic_pdf_parse_main_zhch.py
  1. import copy
  2. import json
  3. import os
  4. from loguru import logger
  5. from magic_pdf.data.data_reader_writer import FileBasedDataWriter
  6. from magic_pdf.libs.draw_bbox import draw_layout_bbox, draw_span_bbox
  7. from magic_pdf.pipe.OCRPipe import OCRPipe
  8. from magic_pdf.pipe.TXTPipe import TXTPipe
  9. from magic_pdf.pipe.UNIPipe import UNIPipe
  10. # todo: 设备类型选择 (?)
  11. from dotenv import load_dotenv; load_dotenv()
  12. print(f"os.environ['CUDA_VISIBLE_DEVICES']: {os.environ['CUDA_VISIBLE_DEVICES']}")
  13. print(f"os.environ['MINERU_TOOLS_CONFIG_JSON']: {os.environ['MINERU_TOOLS_CONFIG_JSON']}")
  14. def json_md_dump(
  15. pipe,
  16. md_writer,
  17. pdf_name,
  18. content_list,
  19. md_content,
  20. orig_model_list,
  21. ):
  22. # 写入模型结果到 model.json
  23. md_writer.write_string(
  24. f'{pdf_name}_model.json',
  25. json.dumps(orig_model_list, ensure_ascii=False, indent=4)
  26. )
  27. # 写入中间结果到 middle.json
  28. md_writer.write_string(
  29. f'{pdf_name}_middle.json',
  30. json.dumps(pipe.pdf_mid_data, ensure_ascii=False, indent=4)
  31. )
  32. # text文本结果写入到 conent_list.json
  33. md_writer.write_string(
  34. f'{pdf_name}_content_list.json',
  35. json.dumps(content_list, ensure_ascii=False, indent=4)
  36. )
  37. # 写入结果到 .md 文件中
  38. md_writer.write_string(
  39. f'{pdf_name}.md',
  40. md_content,
  41. )
  42. # 使用Pydantic定义report数据结构
  43. from pydantic import BaseModel
  44. import pandas as pd
  45. import re
  46. from magic_pdf.config.ocr_content_type import BlockType
class Report(BaseModel):
    """One table extracted from the PDF, plus hints for cross-page merging."""
    # Target Excel sheet name (from a table caption or a nearby Title block).
    sheet_name: str
    # Parsed table content converted from the table-body HTML.
    dataframe: pd.DataFrame
    # True when this table is the last para block on its page (merge tail candidate).
    last_available_label: bool
    # True when this table is the first para block on its page (merge head candidate).
    first_available_label: bool

    # Pydantic cannot generate a schema for pandas.core.frame.DataFrame,
    # so arbitrary (non-validated) field types must be allowed explicitly.
    class Config:
        arbitrary_types_allowed = True
  55. def save_report(
  56. pipe: UNIPipe,
  57. excel_path: str
  58. ):
  59. """
  60. 保存报表数据
  61. """
  62. def merge_tables(prev_report: Report, next_report: Report) -> pd.DataFrame:
  63. # ... existing code ...
  64. if prev_report.dataframe is not None and next_report.dataframe is not None:
  65. # 判断前一个table是否是最后一个有效标签,下一个table是否是第一个有效标签
  66. if prev_report.last_available_label and next_report.first_available_label:
  67. # 判断2个dataframe的列数是否相同
  68. if prev_report.dataframe.shape[1] == next_report.dataframe.shape[1]:
  69. # 列数相同,则合并, 使用prev_report.dataframe.columns
  70. next_report.dataframe.columns = prev_report.dataframe.columns
  71. next_report.dataframe.reset_index(drop=True, inplace=True)
  72. # next_table_reindexed = next_report.dataframe.reindex(columns=prev_report.dataframe.columns)
  73. merged_table = pd.concat([prev_report.dataframe, next_report.dataframe], axis=0, ignore_index=True)
  74. return merged_table
  75. else:
  76. logger.error(f"列数不同,无法合并: {prev_report.sheet_name}(report.dataframe.shape[1]) 和 {next_report.sheet_name}(next_report.dataframe.shape[1])")
  77. return None
  78. report_list = [] # 初始化 report_list 为空列表
  79. pdf_info_list = pipe.pdf_mid_data['pdf_info']
  80. # 遍历pdf_info_list,获取每页的page_info
  81. for page_info in pdf_info_list:
  82. paras_of_layout = page_info.get('para_blocks')
  83. page_idx = page_info.get('page_idx')
  84. if not paras_of_layout:
  85. continue
  86. # 遍历每页的para_block, 每页有多个para_block,每个para_block只包含一个或0个table
  87. for block_idx, para_block in enumerate(paras_of_layout):
  88. para_type = para_block['type']
  89. if para_type == BlockType.Table:
  90. sheet_name = None
  91. dataframe = None
  92. for block in para_block['blocks']:
  93. # 遍历每个block,找到table_body和table_caption
  94. if block['type'] == BlockType.TableBody:
  95. # 将html转换为dataframe
  96. dataframe = pd.read_html(block['lines'][0]['spans'][0]['html'])[0]
  97. elif block['type'] == BlockType.TableCaption:
  98. title = block['lines'][0]['spans'][0]['content']
  99. # 如果title不为空,且title的最后一个字符是“表” 或者 结尾lowcase是“table”
  100. if title is not None and title != '' and (title[-1] == '表' or title.lower().endswith('table')):
  101. sheet_name = title
  102. if sheet_name is None:
  103. # 向上查找,类型是Title的para_block
  104. for title_block in reversed(paras_of_layout[:paras_of_layout.index(para_block)]):
  105. if title_block['type'] == BlockType.Title:
  106. title = title_block['lines'][0]['spans'][0]['content'].strip()
  107. # 如果title不为空,且title的最后一个字符是“表” 或者 结尾lowcase是“table”
  108. if title is not None and title != '' and (title[-1] == '表' or title.lower().endswith('table')):
  109. sheet_name = title
  110. break
  111. if dataframe is None:
  112. continue
  113. if sheet_name is None:
  114. sheet_name = f"Sheet_{page_idx}.{block_idx}"
  115. # 替换非法字符
  116. sheet_name = re.sub(r'[\[\]:*?/\\]', '', sheet_name)
  117. report = Report(sheet_name=sheet_name, dataframe=dataframe, last_available_label=False, first_available_label=False)
  118. if para_block == paras_of_layout[-1]:
  119. report.last_available_label = True
  120. if para_block == paras_of_layout[0]:
  121. report.first_available_label = True
  122. report_list.append(report)
  123. excel_writer = pd.ExcelWriter(excel_path, engine='xlsxwriter')
  124. for report in report_list:
  125. if report.dataframe is not None:
  126. report.dataframe.to_excel(excel_writer, sheet_name=report.sheet_name, index=False)
  127. excel_writer.close()
  128. merged_report_list = []
  129. prev_report = None
  130. for report in report_list:
  131. if prev_report is not None and prev_report.dataframe is not None:
  132. merged_table = merge_tables(prev_report, report)
  133. if merged_table is not None:
  134. prev_report.dataframe = merged_table
  135. continue
  136. else:
  137. merged_report_list.append(prev_report)
  138. prev_report = report
  139. merged_report_list.append(prev_report)
  140. merged_excel_path = excel_path.replace(".xlsx", "_merged.xlsx")
  141. excel_writer = pd.ExcelWriter(merged_excel_path, engine='xlsxwriter')
  142. for report in merged_report_list:
  143. report.dataframe.to_excel(excel_writer, sheet_name=report.sheet_name, index=False)
  144. logger.debug(f"保存报表: {report}")
  145. excel_writer.close()
  146. # 可视化
  147. def draw_visualization_bbox(pdf_info, pdf_bytes, local_md_dir, pdf_file_name):
  148. # 画布局框,附带排序结果
  149. draw_layout_bbox(pdf_info, pdf_bytes, local_md_dir, pdf_file_name)
  150. # 画 span 框
  151. draw_span_bbox(pdf_info, pdf_bytes, local_md_dir, pdf_file_name)
  152. def pdf_parse_main(
  153. pdf_path: str,
  154. parse_method: str = 'auto',
  155. model_json_path: str = None,
  156. is_json_md_dump: bool = True,
  157. is_draw_visualization_bbox: bool = True,
  158. output_dir: str = None
  159. ):
  160. """执行从 pdf 转换到 json、md 的过程,输出 md 和 json 文件到 pdf 文件所在的目录.
  161. :param pdf_path: .pdf 文件的路径,可以是相对路径,也可以是绝对路径
  162. :param parse_method: 解析方法, 共 auto、ocr、txt 三种,默认 auto,如果效果不好,可以尝试 ocr
  163. :param model_json_path: 已经存在的模型数据文件,如果为空则使用内置模型,pdf 和 model_json 务必对应
  164. :param is_json_md_dump: 是否将解析后的数据写入到 .json 和 .md 文件中,默认 True,会将不同阶段的数据写入到不同的 .json 文件中(共3个.json文件),md内容会保存到 .md 文件中
  165. :param is_draw_visualization_bbox: 是否绘制可视化边界框,默认 True,会生成布局框和 span 框的图像
  166. :param output_dir: 输出结果的目录地址,会生成一个以 pdf 文件名命名的文件夹并保存所有结果
  167. """
  168. try:
  169. pdf_name = os.path.basename(pdf_path).split('.')[0]
  170. pdf_path_parent = os.path.dirname(pdf_path)
  171. if output_dir:
  172. output_path = os.path.join(output_dir, pdf_name)
  173. else:
  174. output_path = os.path.join(pdf_path_parent, pdf_name)
  175. output_image_path = os.path.join(output_path, 'images')
  176. # 获取图片的父路径,为的是以相对路径保存到 .md 和 conent_list.json 文件中
  177. image_path_parent = os.path.basename(output_image_path)
  178. pdf_bytes = open(pdf_path, 'rb').read() # 读取 pdf 文件的二进制数据
  179. orig_model_list = []
  180. if model_json_path:
  181. # 读取已经被模型解析后的pdf文件的 json 原始数据,list 类型
  182. model_json = json.loads(open(model_json_path, 'r', encoding='utf-8').read())
  183. orig_model_list = copy.deepcopy(model_json)
  184. else:
  185. model_json = []
  186. # 执行解析步骤
  187. image_writer, md_writer = FileBasedDataWriter(output_image_path), FileBasedDataWriter(output_path)
  188. # 选择解析方式
  189. if parse_method == 'auto':
  190. jso_useful_key = {'_pdf_type': '', 'model_list': model_json}
  191. pipe = UNIPipe(pdf_bytes, jso_useful_key, image_writer)
  192. elif parse_method == 'txt':
  193. pipe = TXTPipe(pdf_bytes, model_json, image_writer)
  194. elif parse_method == 'ocr':
  195. pipe = OCRPipe(pdf_bytes, model_json, image_writer)
  196. else:
  197. logger.error('unknown parse method, only auto, ocr, txt allowed')
  198. exit(1)
  199. # 执行分类
  200. pipe.pipe_classify()
  201. # 如果没有传入模型数据,则使用内置模型解析
  202. if len(model_json) == 0:
  203. pipe.pipe_analyze() # 解析
  204. orig_model_list = copy.deepcopy(pipe.model_list)
  205. # 执行解析
  206. pipe.pipe_parse()
  207. # 保存 text 和 md 格式的结果
  208. content_list = pipe.pipe_mk_uni_format(image_path_parent, drop_mode='none')
  209. md_content = pipe.pipe_mk_markdown(image_path_parent, drop_mode='none')
  210. # 保存报表
  211. save_report(pipe, os.path.join(output_path, f'{pdf_name}.xlsx'))
  212. if is_json_md_dump:
  213. json_md_dump(pipe, md_writer, pdf_name, content_list, md_content, orig_model_list)
  214. if is_draw_visualization_bbox:
  215. draw_visualization_bbox(pipe.pdf_mid_data['pdf_info'], pdf_bytes, output_path, pdf_name)
  216. except Exception as e:
  217. logger.exception(e)
  218. # 测试
  219. if __name__ == '__main__':
  220. current_script_dir = os.path.dirname(os.path.abspath(__file__))
  221. # demo_names = ['demo1', 'demo2', 'small_ocr']
  222. demo_names = ['600916_中国黄金_2002年报_83_94']
  223. for name in demo_names:
  224. file_path = os.path.join(current_script_dir, f'{name}.pdf')
  225. pdf_parse_main(file_path, output_dir='./output.demo')