doc_analyze_by_custom_model.py

import concurrent.futures as fut
import multiprocessing as mp
import os
import time

import numpy as np
import torch

os.environ['FLAGS_npu_jit_compile'] = '0'  # disable paddle's JIT compilation
os.environ['FLAGS_use_stride_kernel'] = '0'
os.environ['PYTORCH_ENABLE_MPS_FALLBACK'] = '1'  # allow MPS ops to fall back to CPU
os.environ['NO_ALBUMENTATIONS_UPDATE'] = '1'  # stop albumentations from checking for updates

from loguru import logger

from magic_pdf.model.sub_modules.model_utils import get_vram

try:
    import torchtext

    if torchtext.__version__ >= '0.18.0':
        torchtext.disable_torchtext_deprecation_warning()
except ImportError:
    pass

import magic_pdf.model as model_config
from magic_pdf.data.dataset import Dataset
from magic_pdf.libs.clean_memory import clean_memory
from magic_pdf.libs.config_reader import (get_device, get_formula_config,
                                          get_layout_config,
                                          get_local_models_dir,
                                          get_table_recog_config)
from magic_pdf.model.model_list import MODEL

# from magic_pdf.operators.models import InferenceResult

# minimum page count before a one-shot job is split across worker processes
MIN_BATCH_INFERENCE_SIZE = 100
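

# ModelSingleton keeps a single process-wide cache of initialized models,
# keyed by the full parameter combination, so repeated get_model() calls with
# the same settings reuse already-loaded weights instead of re-initializing.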
class ModelSingleton:
    _instance = None
    _models = {}

    def __new__(cls, *args, **kwargs):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def get_model(
        self,
        ocr: bool,
        show_log: bool,
        lang=None,
        layout_model=None,
        formula_enable=None,
        table_enable=None,
    ):
        key = (ocr, show_log, lang, layout_model, formula_enable, table_enable)
        if key not in self._models:
            self._models[key] = custom_model_init(
                ocr=ocr,
                show_log=show_log,
                lang=lang,
                layout_model=layout_model,
                formula_enable=formula_enable,
                table_enable=table_enable,
            )
        return self._models[key]
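

# custom_model_init builds the backend selected by magic_pdf's model mode:
# 'lite' maps to the PaddleOCR-based model, 'full' to the PDF-Extract-Kit model.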
def custom_model_init(
    ocr: bool = False,
    show_log: bool = False,
    lang=None,
    layout_model=None,
    formula_enable=None,
    table_enable=None,
):
    model = None

    if model_config.__model_mode__ == 'lite':
        logger.warning(
            'The Lite mode is provided for developers to conduct testing only, and the output quality is '
            'not guaranteed to be reliable.'
        )
        model = MODEL.Paddle
    elif model_config.__model_mode__ == 'full':
        model = MODEL.PEK

    if model_config.__use_inside_model__:
        model_init_start = time.time()
        if model == MODEL.Paddle:
            from magic_pdf.model.pp_structure_v2 import CustomPaddleModel
            custom_model = CustomPaddleModel(ocr=ocr, show_log=show_log, lang=lang)
        elif model == MODEL.PEK:
            from magic_pdf.model.pdf_extract_kit import CustomPEKModel
            # read models-dir and device from the config file
            local_models_dir = get_local_models_dir()
            device = get_device()

            layout_config = get_layout_config()
            if layout_model is not None:
                layout_config['model'] = layout_model

            formula_config = get_formula_config()
            if formula_enable is not None:
                formula_config['enable'] = formula_enable

            table_config = get_table_recog_config()
            if table_enable is not None:
                table_config['enable'] = table_enable

            model_input = {
                'ocr': ocr,
                'show_log': show_log,
                'models_dir': local_models_dir,
                'device': device,
                'table_config': table_config,
                'layout_config': layout_config,
                'formula_config': formula_config,
                'lang': lang,
            }

            custom_model = CustomPEKModel(**model_input)
        else:
            logger.error('Model name is not allowed!')
            exit(1)

        model_init_cost = time.time() - model_init_start
        logger.info(f'model init cost: {model_init_cost}')
    else:
        logger.error('use_inside_model is False, using the inside model is not allowed')
        exit(1)

    return custom_model
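

# doc_analyze runs model inference over the selected page range of a single
# dataset. With one_shot=True and at least MIN_BATCH_INFERENCE_SIZE pages, the
# images are split across worker processes; results are wrapped per page into
# an InferenceResult.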
def doc_analyze(
    dataset: Dataset,
    ocr: bool = False,
    show_log: bool = False,
    start_page_id=0,
    end_page_id=None,
    lang=None,
    layout_model=None,
    formula_enable=None,
    table_enable=None,
    one_shot: bool = True,
):
    end_page_id = (
        end_page_id
        if end_page_id is not None and end_page_id >= 0
        else len(dataset) - 1
    )

    parallel_count = None
    if os.environ.get('MINERU_PARALLEL_INFERENCE_COUNT'):
        parallel_count = int(os.environ['MINERU_PARALLEL_INFERENCE_COUNT'])

    images = []
    page_wh_list = []
    for index in range(len(dataset)):
        if start_page_id <= index <= end_page_id:
            page_data = dataset.get_page(index)
            img_dict = page_data.get_image()
            images.append(img_dict['img'])
            page_wh_list.append((img_dict['width'], img_dict['height']))

    if one_shot and len(images) >= MIN_BATCH_INFERENCE_SIZE:
        if parallel_count is None:
            parallel_count = 2  # should check the gpu memory first!
        # split images into parallel_count batches
        if parallel_count > 1:
            batch_size = (len(images) + parallel_count - 1) // parallel_count
            batch_images = [images[i:i + batch_size] for i in range(0, len(images), batch_size)]
        else:
            batch_images = [images]
        results = []
        parallel_count = len(batch_images)  # adjust to the real parallel count
        # using concurrent.futures to analyze
        """
        with fut.ProcessPoolExecutor(max_workers=parallel_count) as executor:
            futures = [executor.submit(may_batch_image_analyze, batch_image, sn, ocr, show_log, lang, layout_model, formula_enable, table_enable) for sn, batch_image in enumerate(batch_images)]
            for future in fut.as_completed(futures):
                sn, result = future.result()
                result_history[sn] = result

        for key in sorted(result_history.keys()):
            results.extend(result_history[key])
        """
        # use the pool as a context manager so workers are always cleaned up;
        # starmap returns results in submission order, so no re-sorting is needed
        with mp.Pool(processes=parallel_count) as pool:
            mapped_results = pool.starmap(
                may_batch_image_analyze,
                [(batch_image, sn, ocr, show_log, lang, layout_model, formula_enable, table_enable)
                 for sn, batch_image in enumerate(batch_images)])
        for sn, result in mapped_results:
            results.extend(result)
    else:
        _, results = may_batch_image_analyze(
            images,
            0,
            ocr,
            show_log,
            lang, layout_model, formula_enable, table_enable)

    model_json = []
    for index in range(len(dataset)):
        if start_page_id <= index <= end_page_id:
            result = results.pop(0)
            page_width, page_height = page_wh_list.pop(0)
        else:
            result = []
            page_height = 0
            page_width = 0

        page_info = {'page_no': index, 'width': page_width, 'height': page_height}
        page_dict = {'layout_dets': result, 'page_info': page_info}
        model_json.append(page_dict)

    from magic_pdf.operators.models import InferenceResult
    return InferenceResult(model_json, dataset)
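

# batch_doc_analyze is the multi-dataset variant of doc_analyze: it flattens
# all pages from all datasets into one image list, runs the same (possibly
# parallel) inference, then reassembles one InferenceResult per dataset.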
def batch_doc_analyze(
    datasets: list[Dataset],
    ocr: bool = False,
    show_log: bool = False,
    lang=None,
    layout_model=None,
    formula_enable=None,
    table_enable=None,
    one_shot: bool = True,
):
    parallel_count = None
    if os.environ.get('MINERU_PARALLEL_INFERENCE_COUNT'):
        parallel_count = int(os.environ['MINERU_PARALLEL_INFERENCE_COUNT'])

    images = []
    page_wh_list = []
    for dataset in datasets:
        for index in range(len(dataset)):
            page_data = dataset.get_page(index)
            img_dict = page_data.get_image()
            images.append(img_dict['img'])
            page_wh_list.append((img_dict['width'], img_dict['height']))

    if one_shot and len(images) >= MIN_BATCH_INFERENCE_SIZE:
        if parallel_count is None:
            parallel_count = 2  # should check the gpu memory first!
        # split images into parallel_count batches
        if parallel_count > 1:
            batch_size = (len(images) + parallel_count - 1) // parallel_count
            batch_images = [images[i:i + batch_size] for i in range(0, len(images), batch_size)]
        else:
            batch_images = [images]
        results = []
        parallel_count = len(batch_images)  # adjust to the real parallel count
        # using concurrent.futures to analyze
        """
        with fut.ProcessPoolExecutor(max_workers=parallel_count) as executor:
            futures = [executor.submit(may_batch_image_analyze, batch_image, sn, ocr, show_log, lang, layout_model, formula_enable, table_enable) for sn, batch_image in enumerate(batch_images)]
            for future in fut.as_completed(futures):
                sn, result = future.result()
                result_history[sn] = result

        for key in sorted(result_history.keys()):
            results.extend(result_history[key])
        """
        # same pattern as doc_analyze: pool as context manager, order preserved by starmap
        with mp.Pool(processes=parallel_count) as pool:
            mapped_results = pool.starmap(
                may_batch_image_analyze,
                [(batch_image, sn, ocr, show_log, lang, layout_model, formula_enable, table_enable)
                 for sn, batch_image in enumerate(batch_images)])
        for sn, result in mapped_results:
            results.extend(result)
    else:
        _, results = may_batch_image_analyze(
            images,
            0,
            ocr,
            show_log,
            lang, layout_model, formula_enable, table_enable)

    infer_results = []
    from magic_pdf.operators.models import InferenceResult
    for index in range(len(datasets)):
        dataset = datasets[index]
        model_json = []
        for i in range(len(dataset)):
            result = results.pop(0)
            page_width, page_height = page_wh_list.pop(0)
            page_info = {'page_no': i, 'width': page_width, 'height': page_height}
            page_dict = {'layout_dets': result, 'page_info': page_info}
            model_json.append(page_dict)
        infer_results.append(InferenceResult(model_json, dataset))
    return infer_results
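

# may_batch_image_analyze performs the actual inference over a list of page
# images. On a CUDA/NPU device with >= 8 GB VRAM it batches pages through
# BatchAnalyze (batch size scaled by VRAM); otherwise it runs per-image
# inference. Returns (idx, results) so parallel callers can keep batch order.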
def may_batch_image_analyze(
    images: list[np.ndarray],
    idx: int,
    ocr: bool = False,
    show_log: bool = False,
    lang=None,
    layout_model=None,
    formula_enable=None,
    table_enable=None):
    # os.environ['CUDA_VISIBLE_DEVICES'] = str(idx)

    # disable paddle's signal handler
    import paddle
    paddle.disable_signal_handler()

    from magic_pdf.model.batch_analyze import BatchAnalyze

    model_manager = ModelSingleton()
    custom_model = model_manager.get_model(
        ocr, show_log, lang, layout_model, formula_enable, table_enable
    )

    batch_analyze = False
    batch_ratio = 1
    device = get_device()

    npu_support = False
    if str(device).startswith('npu'):
        import torch_npu
        if torch_npu.npu.is_available():
            npu_support = True
            torch.npu.set_compile_mode(jit_compile=False)

    if (torch.cuda.is_available() and device != 'cpu') or npu_support:
        gpu_memory = int(os.getenv('VIRTUAL_VRAM_SIZE', round(get_vram(device))))
        if gpu_memory is not None and gpu_memory >= 8:
            # scale the batch ratio with the available VRAM
            if gpu_memory >= 20:
                batch_ratio = 16
            elif gpu_memory >= 15:
                batch_ratio = 8
            elif gpu_memory >= 10:
                batch_ratio = 4
            else:
                batch_ratio = 2
            logger.info(f'gpu_memory: {gpu_memory} GB, batch_ratio: {batch_ratio}')
            batch_analyze = True

    doc_analyze_start = time.time()

    if batch_analyze:
        """# batch analyze
        images = []
        page_wh_list = []
        for index in range(len(dataset)):
            if start_page_id <= index <= end_page_id:
                page_data = dataset.get_page(index)
                img_dict = page_data.get_image()
                images.append(img_dict['img'])
                page_wh_list.append((img_dict['width'], img_dict['height']))
        """
        batch_model = BatchAnalyze(model=custom_model, batch_ratio=batch_ratio)
        results = batch_model(images)
        """
        for index in range(len(dataset)):
            if start_page_id <= index <= end_page_id:
                result = analyze_result.pop(0)
                page_width, page_height = page_wh_list.pop(0)
            else:
                result = []
                page_height = 0
                page_width = 0

            page_info = {'page_no': index, 'width': page_width, 'height': page_height}
            page_dict = {'layout_dets': result, 'page_info': page_info}
            model_json.append(page_dict)
        """
    else:
        # single analyze
        """
        for index in range(len(dataset)):
            page_data = dataset.get_page(index)
            img_dict = page_data.get_image()
            img = img_dict['img']
            page_width = img_dict['width']
            page_height = img_dict['height']
            if start_page_id <= index <= end_page_id:
                page_start = time.time()
                result = custom_model(img)
                logger.info(f'-----page_id : {index}, page total time: {round(time.time() - page_start, 2)}-----')
            else:
                result = []

            page_info = {'page_no': index, 'width': page_width, 'height': page_height}
            page_dict = {'layout_dets': result, 'page_info': page_info}
            model_json.append(page_dict)
        """
        results = []
        for img_idx, img in enumerate(images):
            inference_start = time.time()
            result = custom_model(img)
            logger.info(f'-----image index : {img_idx}, image inference total time: {round(time.time() - inference_start, 2)}-----')
            results.append(result)

    gc_start = time.time()
    clean_memory(get_device())
    gc_time = round(time.time() - gc_start, 2)
    logger.info(f'gc time: {gc_time}')

    doc_analyze_time = round(time.time() - doc_analyze_start, 2)
    doc_analyze_speed = round(len(images) / doc_analyze_time, 2)
    logger.info(
        f'doc analyze time: {doc_analyze_time},'
        f' speed: {doc_analyze_speed} pages/second'
    )
    return (idx, results)
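

# Minimal usage sketch (not executed on import). It assumes magic_pdf's
# PymuDocDataset and an illustrative input path ('demo.pdf'):
#
#     from magic_pdf.data.dataset import PymuDocDataset
#
#     with open('demo.pdf', 'rb') as f:
#         ds = PymuDocDataset(f.read())
#     infer_result = doc_analyze(ds, ocr=True, lang='en')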