utils.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. import multiprocessing as mp
  2. import threading
  3. import fitz
  4. import numpy as np
  5. from loguru import logger
  6. from magic_pdf.utils.annotations import ImportPIL
  7. from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
  8. @ImportPIL
  9. def fitz_doc_to_image(doc, dpi=200) -> dict:
  10. """Convert fitz.Document to image, Then convert the image to numpy array.
  11. Args:
  12. doc (_type_): pymudoc page
  13. dpi (int, optional): reset the dpi of dpi. Defaults to 200.
  14. Returns:
  15. dict: {'img': numpy array, 'width': width, 'height': height }
  16. """
  17. from PIL import Image
  18. mat = fitz.Matrix(dpi / 72, dpi / 72)
  19. pm = doc.get_pixmap(matrix=mat, alpha=False)
  20. # If the width or height exceeds 4500 after scaling, do not scale further.
  21. if pm.width > 4500 or pm.height > 4500:
  22. pm = doc.get_pixmap(matrix=fitz.Matrix(1, 1), alpha=False)
  23. img = Image.frombytes('RGB', (pm.width, pm.height), pm.samples)
  24. img = np.array(img)
  25. img_dict = {'img': img, 'width': pm.width, 'height': pm.height}
  26. return img_dict
  27. @ImportPIL
  28. def load_images_from_pdf(pdf_bytes: bytes, dpi=200, start_page_id=0, end_page_id=None) -> list:
  29. from PIL import Image
  30. images = []
  31. with fitz.open('pdf', pdf_bytes) as doc:
  32. pdf_page_num = doc.page_count
  33. end_page_id = (
  34. end_page_id
  35. if end_page_id is not None and end_page_id >= 0
  36. else pdf_page_num - 1
  37. )
  38. if end_page_id > pdf_page_num - 1:
  39. logger.warning('end_page_id is out of range, use images length')
  40. end_page_id = pdf_page_num - 1
  41. for index in range(0, doc.page_count):
  42. if start_page_id <= index <= end_page_id:
  43. page = doc[index]
  44. mat = fitz.Matrix(dpi / 72, dpi / 72)
  45. pm = page.get_pixmap(matrix=mat, alpha=False)
  46. # If the width or height exceeds 4500 after scaling, do not scale further.
  47. if pm.width > 4500 or pm.height > 4500:
  48. pm = page.get_pixmap(matrix=fitz.Matrix(1, 1), alpha=False)
  49. img = Image.frombytes('RGB', (pm.width, pm.height), pm.samples)
  50. img = np.array(img)
  51. img_dict = {'img': img, 'width': pm.width, 'height': pm.height}
  52. else:
  53. img_dict = {'img': [], 'width': 0, 'height': 0}
  54. images.append(img_dict)
  55. return images
  56. def convert_page(bytes_page):
  57. pdfs = fitz.open('pdf', bytes_page)
  58. page = pdfs[0]
  59. return fitz_doc_to_image(page)
  60. def parallel_process_pdf_safe(pages, num_workers=None, **kwargs):
  61. """Process PDF pages in parallel with serialization-safe approach"""
  62. if num_workers is None:
  63. num_workers = mp.cpu_count()
  64. # Process the extracted page data in parallel
  65. with ProcessPoolExecutor(max_workers=num_workers) as executor:
  66. # Process the page data
  67. results = list(
  68. executor.map(convert_page, pages)
  69. )
  70. return results
  71. def threaded_process_pdf(pdf_path, num_threads=4, **kwargs):
  72. """
  73. Process all pages of a PDF using multiple threads
  74. Parameters:
  75. -----------
  76. pdf_path : str
  77. Path to the PDF file
  78. num_threads : int
  79. Number of threads to use
  80. **kwargs :
  81. Additional arguments for fitz_doc_to_image
  82. Returns:
  83. --------
  84. images : list
  85. List of processed images, in page order
  86. """
  87. # Open the PDF
  88. doc = fitz.open(pdf_path)
  89. num_pages = len(doc)
  90. # Create a list to store results in the correct order
  91. results = [None] * num_pages
  92. # Create a thread pool
  93. with ThreadPoolExecutor(max_workers=num_threads) as executor:
  94. # Submit all tasks
  95. futures = {}
  96. for page_num in range(num_pages):
  97. page = doc[page_num]
  98. future = executor.submit(fitz_doc_to_image, page, **kwargs)
  99. futures[future] = page_num
  100. # Process results as they complete with progress bar
  101. for future in as_completed(futures):
  102. page_num = futures[future]
  103. try:
  104. results[page_num] = future.result()
  105. except Exception as e:
  106. print(f"Error processing page {page_num}: {e}")
  107. results[page_num] = None
  108. # Close the document
  109. doc.close()
  110. if __name__ == "__main__":
  111. pdf = fitz.open('/tmp/[MS-DOC].pdf')
  112. pdf_page = [fitz.open() for i in range(pdf.page_count)]
  113. [pdf_page[i].insert_pdf(pdf, from_page=i, to_page=i) for i in range(pdf.page_count)]
  114. pdf_page = [v.tobytes() for v in pdf_page]
  115. results = parallel_process_pdf_safe(pdf_page, num_workers=16)
  116. # threaded_process_pdf('/tmp/[MS-DOC].pdf', num_threads=16)
  117. """ benchmark results of multi-threaded processing (fitz page to image)
  118. total page nums: 578
  119. thread nums, time cost
  120. 1 7.351 sec
  121. 2 6.334 sec
  122. 4 5.968 sec
  123. 8 6.728 sec
  124. 16 8.085 sec
  125. """
  126. """ benchmark results of multi-processor processing (fitz page to image)
  127. total page nums: 578
  128. processor nums, time cost
  129. 1 17.170 sec
  130. 2 10.170 sec
  131. 4 7.841 sec
  132. 8 7.900 sec
  133. 16 7.984 sec
  134. """