# magic_pdf_parse_main_zhch.py
  1. import copy
  2. import json
  3. import os
  4. from loguru import logger
  5. from magic_pdf.data.data_reader_writer import FileBasedDataWriter
  6. from magic_pdf.libs.draw_bbox import draw_layout_bbox, draw_span_bbox
  7. from magic_pdf.pipe.OCRPipe import OCRPipe
  8. from magic_pdf.pipe.TXTPipe import TXTPipe
  9. from magic_pdf.pipe.UNIPipe import UNIPipe
  10. import pandas as pd
  11. from zhch.html_zhch import read_html_zhch
  12. # todo: 设备类型选择 (?)
  13. from dotenv import load_dotenv; load_dotenv()
  14. print(f"os.environ['CUDA_VISIBLE_DEVICES']: {os.environ['CUDA_VISIBLE_DEVICES']}")
  15. print(f"os.environ['MINERU_TOOLS_CONFIG_JSON']: {os.environ['MINERU_TOOLS_CONFIG_JSON']}")
  16. def json_md_dump(
  17. pipe,
  18. md_writer,
  19. pdf_name,
  20. content_list,
  21. md_content,
  22. orig_model_list,
  23. ):
  24. # 写入模型结果到 model.json
  25. md_writer.write_string(
  26. f'{pdf_name}_model.json',
  27. json.dumps(orig_model_list, ensure_ascii=False, indent=4)
  28. )
  29. # 写入中间结果到 middle.json
  30. md_writer.write_string(
  31. f'{pdf_name}_middle.json',
  32. json.dumps(pipe.pdf_mid_data, ensure_ascii=False, indent=4)
  33. )
  34. # text文本结果写入到 conent_list.json
  35. md_writer.write_string(
  36. f'{pdf_name}_content_list.json',
  37. json.dumps(content_list, ensure_ascii=False, indent=4)
  38. )
  39. # 写入结果到 .md 文件中
  40. md_writer.write_string(
  41. f'{pdf_name}.md',
  42. md_content,
  43. )
  44. # 使用Pydantic定义report数据结构
  45. from pydantic import BaseModel
  46. import re
  47. from magic_pdf.config.ocr_content_type import BlockType
# One extracted table destined for a single worksheet in the output workbook.
class Report(BaseModel):
    # Target Excel worksheet name (from the table caption or a nearby Title block).
    sheet_name: str
    # Parsed table content.
    dataframe: pd.DataFrame
    # True when this table is the last para_block on its page (candidate to be
    # continued by the next page's first table).
    last_available_label: bool
    # True when this table is the first para_block on its page (candidate
    # continuation of the previous page's last table).
    first_available_label: bool

    # Pydantic cannot generate a schema for pandas.core.frame.DataFrame,
    # so arbitrary types must be allowed explicitly.
    class Config:
        arbitrary_types_allowed = True
  56. def save_report(
  57. pipe: UNIPipe,
  58. excel_path: str
  59. ):
  60. """
  61. 保存报表数据
  62. """
  63. def merge_tables(prev_report: Report, next_report: Report) -> pd.DataFrame:
  64. # ... existing code ...
  65. if prev_report.dataframe is not None and next_report.dataframe is not None:
  66. # 判断前一个table是否是最后一个有效标签,下一个table是否是第一个有效标签
  67. if prev_report.last_available_label and next_report.first_available_label:
  68. # 判断2个dataframe的列数是否相同
  69. if prev_report.dataframe.shape[1] == next_report.dataframe.shape[1]:
  70. # 列数相同,则合并, 使用prev_report.dataframe.columns
  71. next_report.dataframe.columns = prev_report.dataframe.columns
  72. next_report.dataframe.reset_index(drop=True, inplace=True)
  73. # next_table_reindexed = next_report.dataframe.reindex(columns=prev_report.dataframe.columns)
  74. merged_table = pd.concat([prev_report.dataframe, next_report.dataframe], axis=0, ignore_index=True)
  75. return merged_table
  76. else:
  77. logger.error(f"列数不同,无法合并: {prev_report.sheet_name}(report.dataframe.shape[1]) 和 {next_report.sheet_name}(next_report.dataframe.shape[1])")
  78. return None
  79. report_list = [] # 初始化 report_list 为空列表
  80. pdf_info_list = pipe.pdf_mid_data['pdf_info']
  81. # 遍历pdf_info_list,获取每页的page_info
  82. for page_info in pdf_info_list:
  83. paras_of_layout = page_info.get('para_blocks')
  84. page_idx = page_info.get('page_idx')
  85. if not paras_of_layout:
  86. continue
  87. # 遍历每页的para_block, 每页有多个para_block,每个para_block只包含一个或0个table
  88. for block_idx, para_block in enumerate(paras_of_layout):
  89. para_type = para_block['type']
  90. if para_type == BlockType.Table:
  91. sheet_name = None
  92. dataframe = None
  93. for block in para_block['blocks']:
  94. # 遍历每个block,找到table_body和table_caption
  95. if block['type'] == BlockType.TableBody:
  96. # 将html转换为dataframe
  97. # dataframe = pd.read_html(block['lines'][0]['spans'][0]['html'])[0]
  98. dataframe = read_html_zhch(block['lines'][0]['spans'][0]['html'], custom_args={
  99. "colspan_single": ["header", "body"],
  100. "number_strip": True
  101. })[0]
  102. elif block['type'] == BlockType.TableCaption:
  103. title = block['lines'][0]['spans'][0]['content']
  104. # 如果title不为空,且title的最后一个字符是“表” 或者 结尾lowcase是“table”
  105. if title is not None and title != '' and (title[-1] == '表' or title.lower().endswith('table')):
  106. sheet_name = title
  107. if sheet_name is None:
  108. # 向上查找,类型是Title的para_block
  109. for title_block in reversed(paras_of_layout[:paras_of_layout.index(para_block)]):
  110. if title_block['type'] == BlockType.Title:
  111. title = title_block['lines'][0]['spans'][0]['content'].strip()
  112. # 如果title不为空,且title的最后一个字符是“表” 或者 结尾lowcase是“table”
  113. if title is not None and title != '' and (title[-1] == '表' or title.lower().endswith('table')):
  114. sheet_name = title
  115. break
  116. if dataframe is None:
  117. continue
  118. if sheet_name is None:
  119. sheet_name = f"Sheet_{page_idx}.{block_idx}"
  120. # 替换非法字符
  121. sheet_name = re.sub(r'[\[\]:*?/\\]', '', sheet_name)
  122. report = Report(sheet_name=sheet_name, dataframe=dataframe, last_available_label=False, first_available_label=False)
  123. if para_block == paras_of_layout[-1]:
  124. report.last_available_label = True
  125. if para_block == paras_of_layout[0]:
  126. report.first_available_label = True
  127. report_list.append(report)
  128. excel_writer = pd.ExcelWriter(excel_path, engine='xlsxwriter')
  129. for report in report_list:
  130. if report.dataframe is not None:
  131. # report.dataframe.to_excel(excel_writer, sheet_name=report.sheet_name, index=False, float_format="%.2f")
  132. report.dataframe.to_excel(excel_writer, sheet_name=report.sheet_name, index=False)
  133. excel_writer.close()
  134. merged_report_list = []
  135. prev_report = None
  136. for report in report_list:
  137. if prev_report is not None and prev_report.dataframe is not None:
  138. merged_table = merge_tables(prev_report, report)
  139. if merged_table is not None:
  140. prev_report.dataframe = merged_table
  141. continue
  142. else:
  143. merged_report_list.append(prev_report)
  144. prev_report = report
  145. merged_report_list.append(prev_report)
  146. merged_excel_path = excel_path.replace(".xlsx", "_merged.xlsx")
  147. excel_writer = pd.ExcelWriter(merged_excel_path, engine='xlsxwriter')
  148. for report in merged_report_list:
  149. report.dataframe.to_excel(excel_writer, sheet_name=report.sheet_name, index=False)
  150. logger.debug(f"保存报表: {report}")
  151. excel_writer.close()
  152. # 可视化
  153. def draw_visualization_bbox(pdf_info, pdf_bytes, local_md_dir, pdf_file_name):
  154. # 画布局框,附带排序结果
  155. draw_layout_bbox(pdf_info, pdf_bytes, local_md_dir, pdf_file_name)
  156. # 画 span 框
  157. draw_span_bbox(pdf_info, pdf_bytes, local_md_dir, pdf_file_name)
  158. def pdf_parse_main(
  159. pdf_path: str,
  160. parse_method: str = 'auto',
  161. model_json_path: str = None,
  162. is_json_md_dump: bool = True,
  163. is_draw_visualization_bbox: bool = True,
  164. output_dir: str = None
  165. ):
  166. """执行从 pdf 转换到 json、md 的过程,输出 md 和 json 文件到 pdf 文件所在的目录.
  167. :param pdf_path: .pdf 文件的路径,可以是相对路径,也可以是绝对路径
  168. :param parse_method: 解析方法, 共 auto、ocr、txt 三种,默认 auto,如果效果不好,可以尝试 ocr
  169. :param model_json_path: 已经存在的模型数据文件,如果为空则使用内置模型,pdf 和 model_json 务必对应
  170. :param is_json_md_dump: 是否将解析后的数据写入到 .json 和 .md 文件中,默认 True,会将不同阶段的数据写入到不同的 .json 文件中(共3个.json文件),md内容会保存到 .md 文件中
  171. :param is_draw_visualization_bbox: 是否绘制可视化边界框,默认 True,会生成布局框和 span 框的图像
  172. :param output_dir: 输出结果的目录地址,会生成一个以 pdf 文件名命名的文件夹并保存所有结果
  173. """
  174. try:
  175. pdf_name = os.path.basename(pdf_path).split('.')[0]
  176. pdf_path_parent = os.path.dirname(pdf_path)
  177. if output_dir:
  178. output_path = os.path.join(output_dir, pdf_name)
  179. else:
  180. output_path = os.path.join(pdf_path_parent, pdf_name)
  181. output_image_path = os.path.join(output_path, 'images')
  182. # 获取图片的父路径,为的是以相对路径保存到 .md 和 conent_list.json 文件中
  183. image_path_parent = os.path.basename(output_image_path)
  184. pdf_bytes = open(pdf_path, 'rb').read() # 读取 pdf 文件的二进制数据
  185. orig_model_list = []
  186. if model_json_path:
  187. # 读取已经被模型解析后的pdf文件的 json 原始数据,list 类型
  188. model_json = json.loads(open(model_json_path, 'r', encoding='utf-8').read())
  189. orig_model_list = copy.deepcopy(model_json)
  190. else:
  191. model_json = []
  192. # 执行解析步骤
  193. image_writer, md_writer = FileBasedDataWriter(output_image_path), FileBasedDataWriter(output_path)
  194. # 选择解析方式
  195. if parse_method == 'auto':
  196. jso_useful_key = {'_pdf_type': '', 'model_list': model_json}
  197. pipe = UNIPipe(pdf_bytes, jso_useful_key, image_writer)
  198. elif parse_method == 'txt':
  199. pipe = TXTPipe(pdf_bytes, model_json, image_writer)
  200. elif parse_method == 'ocr':
  201. pipe = OCRPipe(pdf_bytes, model_json, image_writer)
  202. else:
  203. logger.error('unknown parse method, only auto, ocr, txt allowed')
  204. exit(1)
  205. # 执行分类
  206. pipe.pipe_classify()
  207. # 如果没有传入模型数据,则使用内置模型解析
  208. if len(model_json) == 0:
  209. pipe.pipe_analyze() # 解析
  210. orig_model_list = copy.deepcopy(pipe.model_list)
  211. # 执行解析
  212. pipe.pipe_parse()
  213. # 保存 text 和 md 格式的结果
  214. content_list = pipe.pipe_mk_uni_format(image_path_parent, drop_mode='none')
  215. md_content = pipe.pipe_mk_markdown(image_path_parent, drop_mode='none')
  216. # 保存报表
  217. save_report(pipe, os.path.join(output_path, f'{pdf_name}.xlsx'))
  218. if is_json_md_dump:
  219. json_md_dump(pipe, md_writer, pdf_name, content_list, md_content, orig_model_list)
  220. if is_draw_visualization_bbox:
  221. draw_visualization_bbox(pipe.pdf_mid_data['pdf_info'], pdf_bytes, output_path, pdf_name)
  222. except Exception as e:
  223. logger.exception(e)
  224. # 测试
  225. if __name__ == '__main__':
  226. current_script_dir = os.path.dirname(os.path.abspath(__file__))
  227. # demo_names = ['demo1', 'demo2', 'small_ocr']
  228. demo_names = ['600916_中国黄金_2002年报_83_94']
  229. for name in demo_names:
  230. file_path = os.path.join(current_script_dir, f'{name}.pdf')
  231. pdf_parse_main(file_path, output_dir='./output.demo')