doc_analyze_by_custom_model.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. import time
  2. import fitz
  3. import numpy as np
  4. from loguru import logger
  5. from magic_pdf.libs.config_reader import get_local_models_dir, get_device, get_table_mode
  6. from magic_pdf.model.model_list import MODEL
  7. import magic_pdf.model as model_config
  8. def dict_compare(d1, d2):
  9. return d1.items() == d2.items()
  10. def remove_duplicates_dicts(lst):
  11. unique_dicts = []
  12. for dict_item in lst:
  13. if not any(
  14. dict_compare(dict_item, existing_dict) for existing_dict in unique_dicts
  15. ):
  16. unique_dicts.append(dict_item)
  17. return unique_dicts
  18. def load_images_from_pdf(pdf_bytes: bytes, dpi=200) -> list:
  19. try:
  20. from PIL import Image
  21. except ImportError:
  22. logger.error("Pillow not installed, please install by pip.")
  23. exit(1)
  24. images = []
  25. with fitz.open("pdf", pdf_bytes) as doc:
  26. for index in range(0, doc.page_count):
  27. page = doc[index]
  28. mat = fitz.Matrix(dpi / 72, dpi / 72)
  29. pm = page.get_pixmap(matrix=mat, alpha=False)
  30. # if width or height > 3000 pixels, don't enlarge the image
  31. if pm.width > 3000 or pm.height > 3000:
  32. pm = page.get_pixmap(matrix=fitz.Matrix(1, 1), alpha=False)
  33. img = Image.frombytes("RGB", (pm.width, pm.height), pm.samples)
  34. img = np.array(img)
  35. img_dict = {"img": img, "width": pm.width, "height": pm.height}
  36. images.append(img_dict)
  37. return images
  38. class ModelSingleton:
  39. _instance = None
  40. _models = {}
  41. def __new__(cls, *args, **kwargs):
  42. if cls._instance is None:
  43. cls._instance = super().__new__(cls)
  44. return cls._instance
  45. def get_model(self, ocr: bool, show_log: bool):
  46. key = (ocr, show_log)
  47. if key not in self._models:
  48. self._models[key] = custom_model_init(ocr=ocr, show_log=show_log)
  49. return self._models[key]
  50. def custom_model_init(ocr: bool = False, show_log: bool = False):
  51. model = None
  52. if model_config.__model_mode__ == "lite":
  53. model = MODEL.Paddle
  54. elif model_config.__model_mode__ == "full":
  55. model = MODEL.PEK
  56. if model_config.__use_inside_model__:
  57. model_init_start = time.time()
  58. if model == MODEL.Paddle:
  59. from magic_pdf.model.pp_structure_v2 import CustomPaddleModel
  60. custom_model = CustomPaddleModel(ocr=ocr, show_log=show_log)
  61. elif model == MODEL.PEK:
  62. from magic_pdf.model.pdf_extract_kit import CustomPEKModel
  63. # 从配置文件读取model-dir和device
  64. local_models_dir = get_local_models_dir()
  65. device = get_device()
  66. table_mode = get_table_mode()
  67. model_input = {"ocr": ocr,
  68. "show_log": show_log,
  69. "models_dir": local_models_dir,
  70. "device": device,
  71. "table_mode": table_mode}
  72. custom_model = CustomPEKModel(**model_input)
  73. else:
  74. logger.error("Not allow model_name!")
  75. exit(1)
  76. model_init_cost = time.time() - model_init_start
  77. logger.info(f"model init cost: {model_init_cost}")
  78. else:
  79. logger.error("use_inside_model is False, not allow to use inside model")
  80. exit(1)
  81. return custom_model
  82. def doc_analyze(pdf_bytes: bytes, ocr: bool = False, show_log: bool = False):
  83. model_manager = ModelSingleton()
  84. custom_model = model_manager.get_model(ocr, show_log)
  85. images = load_images_from_pdf(pdf_bytes)
  86. model_json = []
  87. doc_analyze_start = time.time()
  88. for index, img_dict in enumerate(images):
  89. img = img_dict["img"]
  90. page_width = img_dict["width"]
  91. page_height = img_dict["height"]
  92. result = custom_model(img)
  93. page_info = {"page_no": index, "height": page_height, "width": page_width}
  94. page_dict = {"layout_dets": result, "page_info": page_info}
  95. model_json.append(page_dict)
  96. doc_analyze_cost = time.time() - doc_analyze_start
  97. logger.info(f"doc analyze cost: {doc_analyze_cost}")
  98. return model_json