| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224 |
- # Copyright (c) Opendatalab. All rights reserved.
- import io
- import json
- import os
- import copy
- from pathlib import Path
- import pypdfium2 as pdfium
- from loguru import logger
- from mineru.backend.pipeline.pipeline_middle_json_mkcontent import union_make as pipeline_union_make
- from mineru.backend.pipeline.model_json_to_middle_json import result_to_middle_json as pipeline_result_to_middle_json
- from mineru.backend.vlm.vlm_middle_json_mkcontent import union_make as vlm_union_make
- from mineru.backend.vlm.vlm_analyze import doc_analyze as vlm_doc_analyze
- from mineru.backend.pipeline.pipeline_analyze import doc_analyze as pipeline_doc_analyze
- from mineru.data.data_reader_writer import FileBasedDataWriter
- from mineru.utils.draw_bbox import draw_layout_bbox, draw_span_bbox
- from mineru.utils.enum_class import MakeMode
- from mineru.utils.pdf_image_tools import images_bytes_to_pdf_bytes
# File suffixes (with leading dot) that read_fn returns unchanged as PDF bytes.
pdf_suffixes = [".pdf"]
# File suffixes that read_fn converts to PDF bytes before returning them.
image_suffixes = [".png", ".jpeg", ".jpg"]
def read_fn(path: Path):
    """Read a PDF or image file and return its content as PDF bytes.

    Args:
        path: Location of the input file. A ``str`` or other path-like value
            is accepted as well and converted to ``Path``.

    Returns:
        The raw file bytes when the suffix is listed in ``pdf_suffixes``, or
        the result of ``images_bytes_to_pdf_bytes`` when the suffix is listed
        in ``image_suffixes``.

    Raises:
        Exception: If the suffix is in neither list.
    """
    path = Path(path)
    # Compare case-insensitively so that "scan.PDF" or "photo.JPG" is accepted.
    suffix = path.suffix.lower()
    file_bytes = path.read_bytes()
    if suffix in image_suffixes:
        return images_bytes_to_pdf_bytes(file_bytes)
    if suffix in pdf_suffixes:
        return file_bytes
    raise Exception(f"Unknown file suffix: {path.suffix}")
def prepare_env(output_dir, pdf_file_name, parse_method):
    """Create the output directories for one document and return their paths.

    The layout is ``<output_dir>/<pdf_file_name>/<parse_method>/images``.

    Args:
        output_dir: Root directory for all outputs.
        pdf_file_name: Name of the per-document subdirectory.
        parse_method: Name of the per-method subdirectory.

    Returns:
        A ``(local_image_dir, local_md_dir)`` tuple of path strings, where
        ``local_image_dir`` is the ``images`` directory inside
        ``local_md_dir``.
    """
    local_md_dir = os.path.join(output_dir, pdf_file_name, parse_method)
    local_image_dir = os.path.join(local_md_dir, "images")
    # Creating the images directory also creates every missing parent,
    # including local_md_dir, so a single call is sufficient.
    os.makedirs(local_image_dir, exist_ok=True)
    return local_image_dir, local_md_dir
def convert_pdf_bytes_to_bytes_by_pypdfium2(pdf_bytes, start_page_id=0, end_page_id=None):
    """Return a new PDF, as bytes, holding a page range of the input PDF.

    Args:
        pdf_bytes: Content of the source PDF.
        start_page_id: Zero-based index of the first page to keep.
        end_page_id: Zero-based index of the last page to keep (inclusive).
            ``None`` or a negative value selects the last page of the
            document; a value past the end is clamped to the last page and a
            warning is logged.

    Returns:
        The bytes of a newly created PDF containing the selected pages.
    """
    # Load the source PDF from the in-memory bytes.
    pdf = pdfium.PdfDocument(pdf_bytes)
    try:
        # Determine the last page to keep.
        last_page_id = len(pdf) - 1
        if end_page_id is None or end_page_id < 0:
            end_page_id = last_page_id
        elif end_page_id > last_page_id:
            logger.warning("end_page_id is out of range, use pdf_docs length")
            end_page_id = last_page_id

        # Create a new, empty PDF document to receive the pages.
        output_pdf = pdfium.PdfDocument.new()
        try:
            # NOTE(review): when start_page_id > end_page_id this list is
            # empty; confirm how import_pages treats an empty selection.
            page_indices = list(range(start_page_id, end_page_id + 1))
            # Copy the selected pages from the source PDF into the new one.
            output_pdf.import_pages(pdf, page_indices)
            # Serialize the new PDF into an in-memory buffer.
            output_buffer = io.BytesIO()
            output_pdf.save(output_buffer)
            return output_buffer.getvalue()
        finally:
            # Release the new document even if import or save fails.
            output_pdf.close()
    finally:
        # Release the source document; closed after the document that
        # imported pages from it.
        pdf.close()
def _write_common_outputs(
    pdf_file_name,
    pdf_bytes,
    middle_json,
    local_image_dir,
    local_md_dir,
    md_writer,
    union_make,
    f_draw_layout_bbox,
    f_draw_span_bbox,
    f_dump_orig_pdf,
    f_dump_md,
    f_dump_content_list,
    f_dump_middle_json,
    f_make_md_mode,
):
    """Write the per-document artifacts that both backends produce.

    ``union_make`` is the backend-specific function that renders
    ``middle_json["pdf_info"]`` into markdown or a content list.
    """
    pdf_info = middle_json["pdf_info"]
    # Only the directory's base name is handed to union_make.
    image_dir = str(os.path.basename(local_image_dir))

    if f_draw_layout_bbox:
        draw_layout_bbox(pdf_info, pdf_bytes, local_md_dir, f"{pdf_file_name}_layout.pdf")

    if f_draw_span_bbox:
        draw_span_bbox(pdf_info, pdf_bytes, local_md_dir, f"{pdf_file_name}_span.pdf")

    if f_dump_orig_pdf:
        md_writer.write(
            f"{pdf_file_name}_origin.pdf",
            pdf_bytes,
        )

    if f_dump_md:
        md_content_str = union_make(pdf_info, f_make_md_mode, image_dir)
        md_writer.write_string(
            f"{pdf_file_name}.md",
            md_content_str,
        )

    if f_dump_content_list:
        content_list = union_make(pdf_info, MakeMode.STANDARD_FORMAT, image_dir)
        md_writer.write_string(
            f"{pdf_file_name}_content_list.json",
            json.dumps(content_list, ensure_ascii=False, indent=4),
        )

    if f_dump_middle_json:
        md_writer.write_string(
            f"{pdf_file_name}_middle.json",
            json.dumps(middle_json, ensure_ascii=False, indent=4),
        )


def do_parse(
    output_dir,
    pdf_file_names: list[str],
    pdf_bytes_list: list[bytes],
    p_lang_list: list[str],
    backend="pipeline",
    model_path="jinzhenj/OEEzRkQ3RTAtMDMx-0415",  # TODO: change to formal path after release.
    parse_method="auto",
    p_formula_enable=True,
    p_table_enable=True,
    server_url=None,
    f_draw_layout_bbox=True,
    f_draw_span_bbox=True,
    f_dump_md=True,
    f_dump_middle_json=True,
    f_dump_model_output=True,
    f_dump_orig_pdf=True,
    f_dump_content_list=True,
    f_make_md_mode=MakeMode.MM_MD,
    start_page_id=0,
    end_page_id=None,
):
    """Analyze a batch of PDFs and write the requested outputs to disk.

    Each document is first cut down to pages ``start_page_id``..``end_page_id``
    and then analyzed. With ``backend == "pipeline"`` all documents are
    analyzed in one ``pipeline_doc_analyze`` call; with any other value each
    document is analyzed separately by ``vlm_doc_analyze``. Outputs for a
    document are written under ``<output_dir>/<pdf_file_name>/<parse_method>``,
    where the VLM branch always uses ``"vlm"`` as the last component.

    Args:
        output_dir: Root directory for all outputs.
        pdf_file_names: Output base name for each document, index-aligned
            with ``pdf_bytes_list``.
        pdf_bytes_list: PDF content of each document. The list is not
            modified.
        p_lang_list: Per-document languages passed to ``pipeline_doc_analyze``
            (pipeline backend only).
        backend: ``"pipeline"`` or a value forwarded to ``vlm_doc_analyze``.
        model_path: Forwarded to ``vlm_doc_analyze`` (VLM branch only).
        parse_method: Forwarded to ``pipeline_doc_analyze`` and used as the
            output subdirectory name (pipeline backend only).
        p_formula_enable: Forwarded as ``formula_enable`` (pipeline only).
        p_table_enable: Forwarded as ``table_enable`` (pipeline only).
        server_url: Forwarded to ``vlm_doc_analyze`` (VLM branch only).
        f_draw_layout_bbox: Write ``<name>_layout.pdf``.
        f_draw_span_bbox: Write ``<name>_span.pdf`` (pipeline backend only;
            the VLM branch always disables it).
        f_dump_md: Write ``<name>.md``.
        f_dump_middle_json: Write ``<name>_middle.json``.
        f_dump_model_output: Write ``<name>_model.json`` (pipeline) or
            ``<name>_model_output.txt`` (VLM).
        f_dump_orig_pdf: Write the page-trimmed PDF as ``<name>_origin.pdf``.
        f_dump_content_list: Write ``<name>_content_list.json``.
        f_make_md_mode: Mode passed to ``union_make`` for the markdown output.
        start_page_id: Zero-based index of the first page to process.
        end_page_id: Zero-based index of the last page to process
            (inclusive); ``None`` means up to the last page.
    """
    if backend == "pipeline":
        # Build a new list rather than overwriting the caller's entries, so
        # the input list still holds the untrimmed documents afterwards.
        trimmed_pdf_bytes_list = [
            convert_pdf_bytes_to_bytes_by_pypdfium2(pdf_bytes, start_page_id, end_page_id)
            for pdf_bytes in pdf_bytes_list
        ]

        infer_results, all_image_lists, all_pdf_docs, lang_list, ocr_enabled_list = pipeline_doc_analyze(
            trimmed_pdf_bytes_list,
            p_lang_list,
            parse_method=parse_method,
            formula_enable=p_formula_enable,
            table_enable=p_table_enable,
        )

        for idx, model_list in enumerate(infer_results):
            # Snapshot the model output before it is handed to
            # pipeline_result_to_middle_json; the copy is only needed when
            # the model output is dumped.
            model_json = copy.deepcopy(model_list) if f_dump_model_output else None
            pdf_file_name = pdf_file_names[idx]

            local_image_dir, local_md_dir = prepare_env(output_dir, pdf_file_name, parse_method)
            image_writer = FileBasedDataWriter(local_image_dir)
            md_writer = FileBasedDataWriter(local_md_dir)

            middle_json = pipeline_result_to_middle_json(
                model_list,
                all_image_lists[idx],
                all_pdf_docs[idx],
                image_writer,
                lang_list[idx],
                ocr_enabled_list[idx],
            )

            _write_common_outputs(
                pdf_file_name=pdf_file_name,
                pdf_bytes=trimmed_pdf_bytes_list[idx],
                middle_json=middle_json,
                local_image_dir=local_image_dir,
                local_md_dir=local_md_dir,
                md_writer=md_writer,
                union_make=pipeline_union_make,
                f_draw_layout_bbox=f_draw_layout_bbox,
                f_draw_span_bbox=f_draw_span_bbox,
                f_dump_orig_pdf=f_dump_orig_pdf,
                f_dump_md=f_dump_md,
                f_dump_content_list=f_dump_content_list,
                f_dump_middle_json=f_dump_middle_json,
                f_make_md_mode=f_make_md_mode,
            )

            if f_dump_model_output:
                md_writer.write_string(
                    f"{pdf_file_name}_model.json",
                    json.dumps(model_json, ensure_ascii=False, indent=4),
                )

            logger.info(f"local output dir is {local_md_dir}")
    else:
        # The VLM branch never draws span boxes and always writes into a
        # "vlm" subdirectory, regardless of the arguments passed in.
        f_draw_span_bbox = False
        parse_method = "vlm"

        for idx, pdf_bytes in enumerate(pdf_bytes_list):
            pdf_file_name = pdf_file_names[idx]
            pdf_bytes = convert_pdf_bytes_to_bytes_by_pypdfium2(pdf_bytes, start_page_id, end_page_id)

            local_image_dir, local_md_dir = prepare_env(output_dir, pdf_file_name, parse_method)
            image_writer = FileBasedDataWriter(local_image_dir)
            md_writer = FileBasedDataWriter(local_md_dir)

            middle_json, infer_result = vlm_doc_analyze(
                pdf_bytes,
                image_writer=image_writer,
                backend=backend,
                model_path=model_path,
                server_url=server_url,
            )

            _write_common_outputs(
                pdf_file_name=pdf_file_name,
                pdf_bytes=pdf_bytes,
                middle_json=middle_json,
                local_image_dir=local_image_dir,
                local_md_dir=local_md_dir,
                md_writer=md_writer,
                union_make=vlm_union_make,
                f_draw_layout_bbox=f_draw_layout_bbox,
                f_draw_span_bbox=f_draw_span_bbox,
                f_dump_orig_pdf=f_dump_orig_pdf,
                f_dump_md=f_dump_md,
                f_dump_content_list=f_dump_content_list,
                f_dump_middle_json=f_dump_middle_json,
                f_make_md_mode=f_make_md_mode,
            )

            if f_dump_model_output:
                # Join the entries of infer_result with a dashed separator line.
                model_output = ("\n" + "-" * 50 + "\n").join(infer_result)
                md_writer.write_string(
                    f"{pdf_file_name}_model_output.txt",
                    model_output,
                )

            logger.info(f"local output dir is {local_md_dir}")
if __name__ == "__main__":
    import sys

    # Manual smoke run: parse one document into ./output, up to page index 20.
    # The input file is taken from the first command-line argument when one is
    # given; otherwise the original sample path is used.
    default_pdf_path = "C:/Users/zhaoxiaomeng/Downloads/数学新星问题征解第一期(2014.03).pdf"
    pdf_path = Path(sys.argv[1] if len(sys.argv) > 1 else default_pdf_path)
    try:
        do_parse("./output", [pdf_path.stem], [read_fn(pdf_path)], ["ch"], end_page_id=20)
    except Exception as e:
        # Top-level boundary: log the full traceback instead of crashing.
        logger.exception(e)
|