| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 |
- # Copyright (c) Opendatalab. All rights reserved.
- import time
- from loguru import logger
- from .model_output_to_middle_json import result_to_middle_json
- from ...data.data_reader_writer import DataWriter
- from mineru.utils.pdf_image_tools import load_images_from_pdf
- from ...utils.enum_class import ImageType
- from ...utils.models_download_utils import auto_download_and_get_model_root_path
- from mineru_vl_utils import MinerUClient
class ModelSingleton:
    """Shared cache of ``MinerUClient`` objects.

    ``__new__`` hands back the same instance on every construction, and the
    clients themselves live in the class-level ``_models`` dict, keyed by
    ``(backend, model_path, server_url)``.
    """

    _instance = None
    # Maps (backend, model_path, server_url) -> MinerUClient. Extra **kwargs
    # given to get_model() are not part of the key.
    _models = {}

    def __new__(cls, *args, **kwargs):
        """Return the single shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def get_model(
        self,
        backend: str,
        model_path: str | None,
        server_url: str | None,
        **kwargs,
    ) -> MinerUClient:
        """Return the cached client for the given key, building it on a miss.

        Args:
            backend: Backend name. ``"transformers"`` and ``"vllm-engine"``
                load a local model here; any other value is passed straight
                through to ``MinerUClient``.
            model_path: Local model location. When falsy for one of the two
                local backends, it is obtained from
                ``auto_download_and_get_model_root_path("/", "vlm")``.
            server_url: Forwarded unchanged to ``MinerUClient``.
            **kwargs: Used as the ``vllm.LLM`` constructor arguments for the
                ``"vllm-engine"`` backend; unused otherwise.

        Returns:
            The ``MinerUClient`` stored under ``(backend, model_path,
            server_url)``, using the ``model_path`` value as passed in.

        Raises:
            ValueError: A local backend still has no model path after the
                auto-download step.
            ImportError: The package required by the selected local backend
                is not installed.
        """
        key = (backend, model_path, server_url)
        if key in self._models:
            return self._models[key]

        start_time = time.time()
        model = None
        processor = None
        vllm_llm = None

        if backend in ("transformers", "vllm-engine") and not model_path:
            model_path = auto_download_and_get_model_root_path("/", "vlm")

        # NOTE(review): the source this block was recovered from had lost its
        # indentation, so whether the backend branches below were nested under
        # the auto-download step could not be confirmed. Here they run for
        # both auto-downloaded and caller-supplied paths -- please confirm.
        if backend == "transformers":
            model, processor = self._load_transformers(model_path)
        elif backend == "vllm-engine":
            vllm_llm = self._load_vllm(model_path, kwargs)

        client = MinerUClient(
            backend=backend,
            model=model,
            processor=processor,
            vllm_llm=vllm_llm,
            server_url=server_url,
        )
        self._models[key] = client
        elapsed = round(time.time() - start_time, 2)
        logger.info(f"get {backend} predictor cost: {elapsed}s")
        return client

    @staticmethod
    def _load_transformers(model_path):
        """Load the Qwen2-VL model and its processor with transformers."""
        if not model_path:
            raise ValueError("model_path must be provided when model or processor is None.")
        try:
            from transformers import (
                AutoProcessor,
                Qwen2VLForConditionalGeneration,
            )
            from transformers import __version__ as transformers_version
        except ImportError as exc:
            # Chain the original failure so the real import problem stays
            # visible in the traceback.
            raise ImportError(
                "Please install transformers to use the transformers backend."
            ) from exc
        from packaging import version

        # The dtype keyword passed to from_pretrained depends on the installed
        # transformers version: "dtype" from 4.56.0 on, "torch_dtype" before.
        if version.parse(transformers_version) >= version.parse("4.56.0"):
            dtype_key = "dtype"
        else:
            dtype_key = "torch_dtype"
        model = Qwen2VLForConditionalGeneration.from_pretrained(
            model_path,
            device_map="auto",
            **{dtype_key: "auto"},  # type: ignore
        )
        processor = AutoProcessor.from_pretrained(
            model_path,
            use_fast=True,
        )
        return model, processor

    @staticmethod
    def _load_vllm(model_path, engine_kwargs):
        """Build a ``vllm.LLM`` engine from ``engine_kwargs`` plus defaults."""
        if not model_path:
            raise ValueError("model_path must be provided when vllm_llm is None.")
        try:
            import vllm
        except ImportError as exc:
            raise ImportError(
                "Please install vllm to use the vllm-engine backend."
            ) from exc
        # Caller-supplied values win; only missing entries are filled in.
        engine_kwargs.setdefault("gpu_memory_utilization", 0.5)
        engine_kwargs.setdefault("model", model_path)
        # The kwargs are used as the vllm initialization parameters.
        return vllm.LLM(**engine_kwargs)
def doc_analyze(
    pdf_bytes,
    image_writer: DataWriter | None,
    predictor: MinerUClient | None = None,
    backend="transformers",
    model_path: str | None = None,
    server_url: str | None = None,
    **kwargs,
):
    """Run two-step extraction on the pages of a PDF and build the middle JSON.

    Args:
        pdf_bytes: PDF content handed to ``load_images_from_pdf``.
        image_writer: Forwarded to ``result_to_middle_json``.
        predictor: Client to use. When ``None``, one is fetched from
            ``ModelSingleton`` using ``backend``, ``model_path``,
            ``server_url`` and ``**kwargs``.
        backend: Backend name used only when ``predictor`` is ``None``.
        model_path: Model location used only when ``predictor`` is ``None``.
        server_url: Server address used only when ``predictor`` is ``None``.
        **kwargs: Extra arguments forwarded to ``ModelSingleton.get_model``.

    Returns:
        A ``(middle_json, results)`` tuple: the output of
        ``result_to_middle_json`` and the raw predictor results.
    """
    if predictor is None:
        registry = ModelSingleton()
        predictor = registry.get_model(backend, model_path, server_url, **kwargs)

    # Render the pages as PIL images and keep only the image objects for the
    # predictor; the full per-page entries are still needed further down.
    page_entries, pdf_doc = load_images_from_pdf(pdf_bytes, image_type=ImageType.PIL)
    pil_pages = [entry["img_pil"] for entry in page_entries]

    results = predictor.batch_two_step_extract(images=pil_pages)

    return result_to_middle_json(results, page_entries, pdf_doc, image_writer), results
async def aio_doc_analyze(
    pdf_bytes,
    image_writer: DataWriter | None,
    predictor: MinerUClient | None = None,
    backend="transformers",
    model_path: str | None = None,
    server_url: str | None = None,
    **kwargs,
):
    """Coroutine that runs two-step extraction on a PDF and builds the middle JSON.

    Args:
        pdf_bytes: PDF content handed to ``load_images_from_pdf``.
        image_writer: Forwarded to ``result_to_middle_json``.
        predictor: Client to use. When ``None``, one is fetched from
            ``ModelSingleton`` using ``backend``, ``model_path``,
            ``server_url`` and ``**kwargs``.
        backend: Backend name used only when ``predictor`` is ``None``.
        model_path: Model location used only when ``predictor`` is ``None``.
        server_url: Server address used only when ``predictor`` is ``None``.
        **kwargs: Extra arguments forwarded to ``ModelSingleton.get_model``.

    Returns:
        A ``(middle_json, results)`` tuple: the output of
        ``result_to_middle_json`` and the raw predictor results.
    """
    if predictor is None:
        registry = ModelSingleton()
        predictor = registry.get_model(backend, model_path, server_url, **kwargs)

    # Render the pages as PIL images and keep only the image objects for the
    # predictor; the full per-page entries are still needed further down.
    page_entries, pdf_doc = load_images_from_pdf(pdf_bytes, image_type=ImageType.PIL)
    pil_pages = [entry["img_pil"] for entry in page_entries]

    # Only the extraction call is awaited; the steps around it are plain calls.
    results = await predictor.aio_batch_two_step_extract(images=pil_pages)

    return result_to_middle_json(results, page_entries, pdf_doc, image_writer), results
|