Pārlūkot izejas kodu

refactor: modularize PDF processing and enhance async support for VLM backend

myhloli 4 mēneši atpakaļ
vecāks
revīzija
e7649e432c
2 mainītis faili ar 292 papildinājumiem un 142 dzēšanām
  1. 7 7
      mineru/backend/vlm/vlm_analyze.py
  2. 285 135
      mineru/cli/common.py

+ 7 - 7
mineru/backend/vlm/vlm_analyze.py

@@ -75,15 +75,15 @@ async def aio_doc_analyze(
     if predictor is None:
         predictor = ModelSingleton().get_model(backend, model_path, server_url)
 
-    load_images_start = time.time()
+    # load_images_start = time.time()
     images_list, pdf_doc = load_images_from_pdf(pdf_bytes)
     images_base64_list = [image_dict["img_base64"] for image_dict in images_list]
-    load_images_time = round(time.time() - load_images_start, 2)
-    logger.info(f"load images cost: {load_images_time}, speed: {round(len(images_base64_list)/load_images_time, 3)} images/s")
+    # load_images_time = round(time.time() - load_images_start, 2)
+    # logger.info(f"load images cost: {load_images_time}, speed: {round(len(images_base64_list)/load_images_time, 3)} images/s")
 
-    infer_start = time.time()
+    # infer_start = time.time()
     results = await predictor.aio_batch_predict(images=images_base64_list)
-    infer_time = round(time.time() - infer_start, 2)
-    logger.info(f"infer finished, cost: {infer_time}, speed: {round(len(results)/infer_time, 3)} page/s")
+    # infer_time = round(time.time() - infer_start, 2)
+    # logger.info(f"infer finished, cost: {infer_time}, speed: {round(len(results)/infer_time, 3)} page/s")
     middle_json = result_to_middle_json(results, images_list, pdf_doc, image_writer)
-    return middle_json
+    return middle_json, results

+ 285 - 135
mineru/cli/common.py

@@ -14,6 +14,7 @@ from mineru.utils.enum_class import MakeMode
 from mineru.utils.pdf_image_tools import images_bytes_to_pdf_bytes
 from mineru.backend.vlm.vlm_middle_json_mkcontent import union_make as vlm_union_make
 from mineru.backend.vlm.vlm_analyze import doc_analyze as vlm_doc_analyze
+from mineru.backend.vlm.vlm_analyze import aio_doc_analyze as aio_vlm_doc_analyze
 
 pdf_suffixes = [".pdf"]
 image_suffixes = [".png", ".jpeg", ".jpg"]
@@ -73,155 +74,304 @@ def convert_pdf_bytes_to_bytes_by_pypdfium2(pdf_bytes, start_page_id=0, end_page
     return output_bytes
 
 
-def do_parse(
-    output_dir,
-    pdf_file_names: list[str],
-    pdf_bytes_list: list[bytes],
-    p_lang_list: list[str],
-    backend="pipeline",
-    parse_method="auto",
-    p_formula_enable=True,
-    p_table_enable=True,
-    server_url=None,
-    f_draw_layout_bbox=True,
-    f_draw_span_bbox=True,
-    f_dump_md=True,
-    f_dump_middle_json=True,
-    f_dump_model_output=True,
-    f_dump_orig_pdf=True,
-    f_dump_content_list=True,
-    f_make_md_mode=MakeMode.MM_MD,
-    start_page_id=0,
-    end_page_id=None,
+def _prepare_pdf_bytes(pdf_bytes_list, start_page_id, end_page_id):
+    """准备处理PDF字节数据"""
+    result = []
+    for pdf_bytes in pdf_bytes_list:
+        new_pdf_bytes = convert_pdf_bytes_to_bytes_by_pypdfium2(pdf_bytes, start_page_id, end_page_id)
+        result.append(new_pdf_bytes)
+    return result
+
+
+def _process_output(
+        pdf_info,
+        pdf_bytes,
+        pdf_file_name,
+        local_md_dir,
+        local_image_dir,
+        md_writer,
+        f_draw_layout_bbox,
+        f_draw_span_bbox,
+        f_dump_orig_pdf,
+        f_dump_md,
+        f_dump_content_list,
+        f_dump_middle_json,
+        f_dump_model_output,
+        f_make_md_mode,
+        middle_json,
+        model_output=None,
+        is_pipeline=True
 ):
+    from mineru.backend.pipeline.pipeline_middle_json_mkcontent import union_make as pipeline_union_make
+    """处理输出文件"""
+    if f_draw_layout_bbox:
+        draw_layout_bbox(pdf_info, pdf_bytes, local_md_dir, f"{pdf_file_name}_layout.pdf")
+
+    if f_draw_span_bbox:
+        draw_span_bbox(pdf_info, pdf_bytes, local_md_dir, f"{pdf_file_name}_span.pdf")
+
+    if f_dump_orig_pdf:
+        md_writer.write(
+            f"{pdf_file_name}_origin.pdf",
+            pdf_bytes,
+        )
+
+    image_dir = str(os.path.basename(local_image_dir))
+
+    if f_dump_md:
+        make_func = pipeline_union_make if is_pipeline else vlm_union_make
+        md_content_str = make_func(pdf_info, f_make_md_mode, image_dir)
+        md_writer.write_string(
+            f"{pdf_file_name}.md",
+            md_content_str,
+        )
+
+    if f_dump_content_list:
+        make_func = pipeline_union_make if is_pipeline else vlm_union_make
+        content_list = make_func(pdf_info, MakeMode.CONTENT_LIST, image_dir)
+        md_writer.write_string(
+            f"{pdf_file_name}_content_list.json",
+            json.dumps(content_list, ensure_ascii=False, indent=4),
+        )
+
+    if f_dump_middle_json:
+        md_writer.write_string(
+            f"{pdf_file_name}_middle.json",
+            json.dumps(middle_json, ensure_ascii=False, indent=4),
+        )
+
+    if f_dump_model_output:
+        if is_pipeline:
+            md_writer.write_string(
+                f"{pdf_file_name}_model.json",
+                json.dumps(model_output, ensure_ascii=False, indent=4),
+            )
+        else:
+            output_text = ("\n" + "-" * 50 + "\n").join(model_output)
+            md_writer.write_string(
+                f"{pdf_file_name}_model_output.txt",
+                output_text,
+            )
+
+    logger.info(f"local output dir is {local_md_dir}")
+
+
+def _process_pipeline(
+        output_dir,
+        pdf_file_names,
+        pdf_bytes_list,
+        p_lang_list,
+        parse_method,
+        p_formula_enable,
+        p_table_enable,
+        f_draw_layout_bbox,
+        f_draw_span_bbox,
+        f_dump_md,
+        f_dump_middle_json,
+        f_dump_model_output,
+        f_dump_orig_pdf,
+        f_dump_content_list,
+        f_make_md_mode,
+):
+    """处理pipeline后端逻辑"""
+    from mineru.backend.pipeline.model_json_to_middle_json import result_to_middle_json as pipeline_result_to_middle_json
+    from mineru.backend.pipeline.pipeline_analyze import doc_analyze as pipeline_doc_analyze
+
+    infer_results, all_image_lists, all_pdf_docs, lang_list, ocr_enabled_list = (
+        pipeline_doc_analyze(
+            pdf_bytes_list, p_lang_list, parse_method=parse_method,
+            formula_enable=p_formula_enable, table_enable=p_table_enable
+        )
+    )
+
+    for idx, model_list in enumerate(infer_results):
+        model_json = copy.deepcopy(model_list)
+        pdf_file_name = pdf_file_names[idx]
+        local_image_dir, local_md_dir = prepare_env(output_dir, pdf_file_name, parse_method)
+        image_writer, md_writer = FileBasedDataWriter(local_image_dir), FileBasedDataWriter(local_md_dir)
+
+        images_list = all_image_lists[idx]
+        pdf_doc = all_pdf_docs[idx]
+        _lang = lang_list[idx]
+        _ocr_enable = ocr_enabled_list[idx]
+
+        middle_json = pipeline_result_to_middle_json(
+            model_list, images_list, pdf_doc, image_writer,
+            _lang, _ocr_enable, p_formula_enable
+        )
+
+        pdf_info = middle_json["pdf_info"]
+        pdf_bytes = pdf_bytes_list[idx]
+
+        _process_output(
+            pdf_info, pdf_bytes, pdf_file_name, local_md_dir, local_image_dir,
+            md_writer, f_draw_layout_bbox, f_draw_span_bbox, f_dump_orig_pdf,
+            f_dump_md, f_dump_content_list, f_dump_middle_json, f_dump_model_output,
+            f_make_md_mode, middle_json, model_json, is_pipeline=True
+        )
+
+
+async def _async_process_vlm(
+        output_dir,
+        pdf_file_names,
+        pdf_bytes_list,
+        backend,
+        f_draw_layout_bbox,
+        f_draw_span_bbox,
+        f_dump_md,
+        f_dump_middle_json,
+        f_dump_model_output,
+        f_dump_orig_pdf,
+        f_dump_content_list,
+        f_make_md_mode,
+        server_url=None,
+):
+    """异步处理VLM后端逻辑"""
+    parse_method = "vlm"
+    f_draw_span_bbox = False
+
+    for idx, pdf_bytes in enumerate(pdf_bytes_list):
+        pdf_file_name = pdf_file_names[idx]
+        local_image_dir, local_md_dir = prepare_env(output_dir, pdf_file_name, parse_method)
+        image_writer, md_writer = FileBasedDataWriter(local_image_dir), FileBasedDataWriter(local_md_dir)
+
+        middle_json, infer_result = await aio_vlm_doc_analyze(
+            pdf_bytes, image_writer=image_writer, backend=backend, server_url=server_url
+        )
+
+        pdf_info = middle_json["pdf_info"]
+
+        _process_output(
+            pdf_info, pdf_bytes, pdf_file_name, local_md_dir, local_image_dir,
+            md_writer, f_draw_layout_bbox, f_draw_span_bbox, f_dump_orig_pdf,
+            f_dump_md, f_dump_content_list, f_dump_middle_json, f_dump_model_output,
+            f_make_md_mode, middle_json, infer_result, is_pipeline=False
+        )
+
+
+def _process_vlm(
+        output_dir,
+        pdf_file_names,
+        pdf_bytes_list,
+        backend,
+        f_draw_layout_bbox,
+        f_draw_span_bbox,
+        f_dump_md,
+        f_dump_middle_json,
+        f_dump_model_output,
+        f_dump_orig_pdf,
+        f_dump_content_list,
+        f_make_md_mode,
+        server_url=None,
+):
+    """同步处理VLM后端逻辑"""
+    parse_method = "vlm"
+    f_draw_span_bbox = False
 
-    if backend == "pipeline":
-
-        from mineru.backend.pipeline.pipeline_middle_json_mkcontent import union_make as pipeline_union_make
-        from mineru.backend.pipeline.model_json_to_middle_json import result_to_middle_json as pipeline_result_to_middle_json
-        from mineru.backend.pipeline.pipeline_analyze import doc_analyze as pipeline_doc_analyze
-
-        for idx, pdf_bytes in enumerate(pdf_bytes_list):
-            new_pdf_bytes = convert_pdf_bytes_to_bytes_by_pypdfium2(pdf_bytes, start_page_id, end_page_id)
-            pdf_bytes_list[idx] = new_pdf_bytes
-
-        infer_results, all_image_lists, all_pdf_docs, lang_list, ocr_enabled_list = pipeline_doc_analyze(pdf_bytes_list, p_lang_list, parse_method=parse_method, formula_enable=p_formula_enable,table_enable=p_table_enable)
-
-        for idx, model_list in enumerate(infer_results):
-            model_json = copy.deepcopy(model_list)
-            pdf_file_name = pdf_file_names[idx]
-            local_image_dir, local_md_dir = prepare_env(output_dir, pdf_file_name, parse_method)
-            image_writer, md_writer = FileBasedDataWriter(local_image_dir), FileBasedDataWriter(local_md_dir)
-
-            images_list = all_image_lists[idx]
-            pdf_doc = all_pdf_docs[idx]
-            _lang = lang_list[idx]
-            _ocr_enable = ocr_enabled_list[idx]
-
-            middle_json = pipeline_result_to_middle_json(model_list, images_list, pdf_doc, image_writer, _lang, _ocr_enable, p_formula_enable)
-
-            pdf_info = middle_json["pdf_info"]
-
-            pdf_bytes = pdf_bytes_list[idx]
-            if f_draw_layout_bbox:
-                draw_layout_bbox(pdf_info, pdf_bytes, local_md_dir, f"{pdf_file_name}_layout.pdf")
-
-            if f_draw_span_bbox:
-                draw_span_bbox(pdf_info, pdf_bytes, local_md_dir, f"{pdf_file_name}_span.pdf")
+    for idx, pdf_bytes in enumerate(pdf_bytes_list):
+        pdf_file_name = pdf_file_names[idx]
+        local_image_dir, local_md_dir = prepare_env(output_dir, pdf_file_name, parse_method)
+        image_writer, md_writer = FileBasedDataWriter(local_image_dir), FileBasedDataWriter(local_md_dir)
 
-            if f_dump_orig_pdf:
-                md_writer.write(
-                    f"{pdf_file_name}_origin.pdf",
-                    pdf_bytes,
-                )
+        middle_json, infer_result = vlm_doc_analyze(
+            pdf_bytes, image_writer=image_writer, backend=backend, server_url=server_url
+        )
 
-            if f_dump_md:
-                image_dir = str(os.path.basename(local_image_dir))
-                md_content_str = pipeline_union_make(pdf_info, f_make_md_mode, image_dir)
-                md_writer.write_string(
-                    f"{pdf_file_name}.md",
-                    md_content_str,
-                )
+        pdf_info = middle_json["pdf_info"]
 
-            if f_dump_content_list:
-                image_dir = str(os.path.basename(local_image_dir))
-                content_list = pipeline_union_make(pdf_info, MakeMode.CONTENT_LIST, image_dir)
-                md_writer.write_string(
-                    f"{pdf_file_name}_content_list.json",
-                    json.dumps(content_list, ensure_ascii=False, indent=4),
-                )
+        _process_output(
+            pdf_info, pdf_bytes, pdf_file_name, local_md_dir, local_image_dir,
+            md_writer, f_draw_layout_bbox, f_draw_span_bbox, f_dump_orig_pdf,
+            f_dump_md, f_dump_content_list, f_dump_middle_json, f_dump_model_output,
+            f_make_md_mode, middle_json, infer_result, is_pipeline=False
+        )
 
-            if f_dump_middle_json:
-                md_writer.write_string(
-                    f"{pdf_file_name}_middle.json",
-                    json.dumps(middle_json, ensure_ascii=False, indent=4),
-                )
 
-            if f_dump_model_output:
-                md_writer.write_string(
-                    f"{pdf_file_name}_model.json",
-                    json.dumps(model_json, ensure_ascii=False, indent=4),
-                )
+def do_parse(
+        output_dir,
+        pdf_file_names: list[str],
+        pdf_bytes_list: list[bytes],
+        p_lang_list: list[str],
+        backend="pipeline",
+        parse_method="auto",
+        p_formula_enable=True,
+        p_table_enable=True,
+        server_url=None,
+        f_draw_layout_bbox=True,
+        f_draw_span_bbox=True,
+        f_dump_md=True,
+        f_dump_middle_json=True,
+        f_dump_model_output=True,
+        f_dump_orig_pdf=True,
+        f_dump_content_list=True,
+        f_make_md_mode=MakeMode.MM_MD,
+        start_page_id=0,
+        end_page_id=None,
+):
+    # 预处理PDF字节数据
+    pdf_bytes_list = _prepare_pdf_bytes(pdf_bytes_list, start_page_id, end_page_id)
 
-            logger.info(f"local output dir is {local_md_dir}")
+    if backend == "pipeline":
+        _process_pipeline(
+            output_dir, pdf_file_names, pdf_bytes_list, p_lang_list,
+            parse_method, p_formula_enable, p_table_enable,
+            f_draw_layout_bbox, f_draw_span_bbox, f_dump_md, f_dump_middle_json,
+            f_dump_model_output, f_dump_orig_pdf, f_dump_content_list, f_make_md_mode
+        )
     else:
-
         if backend.startswith("vlm-"):
             backend = backend[4:]
 
-        f_draw_span_bbox = False
-        parse_method = "vlm"
-        for idx, pdf_bytes in enumerate(pdf_bytes_list):
-            pdf_file_name = pdf_file_names[idx]
-            pdf_bytes = convert_pdf_bytes_to_bytes_by_pypdfium2(pdf_bytes, start_page_id, end_page_id)
-            local_image_dir, local_md_dir = prepare_env(output_dir, pdf_file_name, parse_method)
-            image_writer, md_writer = FileBasedDataWriter(local_image_dir), FileBasedDataWriter(local_md_dir)
-            middle_json, infer_result = vlm_doc_analyze(pdf_bytes, image_writer=image_writer, backend=backend, server_url=server_url)
-
-            pdf_info = middle_json["pdf_info"]
-
-            if f_draw_layout_bbox:
-                draw_layout_bbox(pdf_info, pdf_bytes, local_md_dir, f"{pdf_file_name}_layout.pdf")
-
-            if f_draw_span_bbox:
-                draw_span_bbox(pdf_info, pdf_bytes, local_md_dir, f"{pdf_file_name}_span.pdf")
-
-            if f_dump_orig_pdf:
-                md_writer.write(
-                    f"{pdf_file_name}_origin.pdf",
-                    pdf_bytes,
-                )
-
-            if f_dump_md:
-                image_dir = str(os.path.basename(local_image_dir))
-                md_content_str = vlm_union_make(pdf_info, f_make_md_mode, image_dir)
-                md_writer.write_string(
-                    f"{pdf_file_name}.md",
-                    md_content_str,
-                )
-
-            if f_dump_content_list:
-                image_dir = str(os.path.basename(local_image_dir))
-                content_list = vlm_union_make(pdf_info, MakeMode.CONTENT_LIST, image_dir)
-                md_writer.write_string(
-                    f"{pdf_file_name}_content_list.json",
-                    json.dumps(content_list, ensure_ascii=False, indent=4),
-                )
-
-            if f_dump_middle_json:
-                md_writer.write_string(
-                    f"{pdf_file_name}_middle.json",
-                    json.dumps(middle_json, ensure_ascii=False, indent=4),
-                )
+        _process_vlm(
+            output_dir, pdf_file_names, pdf_bytes_list, backend,
+            f_draw_layout_bbox, f_draw_span_bbox, f_dump_md, f_dump_middle_json,
+            f_dump_model_output, f_dump_orig_pdf, f_dump_content_list, f_make_md_mode,
+            server_url
+        )
+
+
+async def aio_do_parse(
+        output_dir,
+        pdf_file_names: list[str],
+        pdf_bytes_list: list[bytes],
+        p_lang_list: list[str],
+        backend="pipeline",
+        parse_method="auto",
+        p_formula_enable=True,
+        p_table_enable=True,
+        server_url=None,
+        f_draw_layout_bbox=True,
+        f_draw_span_bbox=True,
+        f_dump_md=True,
+        f_dump_middle_json=True,
+        f_dump_model_output=True,
+        f_dump_orig_pdf=True,
+        f_dump_content_list=True,
+        f_make_md_mode=MakeMode.MM_MD,
+        start_page_id=0,
+        end_page_id=None,
+):
+    # 预处理PDF字节数据
+    pdf_bytes_list = _prepare_pdf_bytes(pdf_bytes_list, start_page_id, end_page_id)
 
-            if f_dump_model_output:
-                model_output = ("\n" + "-" * 50 + "\n").join(infer_result)
-                md_writer.write_string(
-                    f"{pdf_file_name}_model_output.txt",
-                    model_output,
-                )
+    if backend == "pipeline":
+        # pipeline模式暂不支持异步,使用同步处理方式
+        _process_pipeline(
+            output_dir, pdf_file_names, pdf_bytes_list, p_lang_list,
+            parse_method, p_formula_enable, p_table_enable,
+            f_draw_layout_bbox, f_draw_span_bbox, f_dump_md, f_dump_middle_json,
+            f_dump_model_output, f_dump_orig_pdf, f_dump_content_list, f_make_md_mode
+        )
+    else:
+        if backend.startswith("vlm-"):
+            backend = backend[4:]
 
-            logger.info(f"local output dir is {local_md_dir}")
+        await _async_process_vlm(
+            output_dir, pdf_file_names, pdf_bytes_list, backend,
+            f_draw_layout_bbox, f_draw_span_bbox, f_dump_md, f_dump_middle_json,
+            f_dump_model_output, f_dump_orig_pdf, f_dump_content_list, f_make_md_mode,
+            server_url
+        )