utils.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. import multiprocessing as mp
  2. import threading
  3. from concurrent.futures import (ProcessPoolExecutor, ThreadPoolExecutor,
  4. as_completed)
  5. import fitz
  6. import numpy as np
  7. from loguru import logger
  8. from magic_pdf.utils.annotations import ImportPIL
  9. @ImportPIL
  10. def fitz_doc_to_image(doc, dpi=200) -> dict:
  11. """Convert fitz.Document to image, Then convert the image to numpy array.
  12. Args:
  13. doc (_type_): pymudoc page
  14. dpi (int, optional): reset the dpi of dpi. Defaults to 200.
  15. Returns:
  16. dict: {'img': numpy array, 'width': width, 'height': height }
  17. """
  18. from PIL import Image
  19. mat = fitz.Matrix(dpi / 72, dpi / 72)
  20. pm = doc.get_pixmap(matrix=mat, alpha=False)
  21. # If the width or height exceeds 4500 after scaling, do not scale further.
  22. if pm.width > 4500 or pm.height > 4500:
  23. pm = doc.get_pixmap(matrix=fitz.Matrix(1, 1), alpha=False)
  24. img = Image.frombytes('RGB', (pm.width, pm.height), pm.samples)
  25. img = np.array(img)
  26. img_dict = {'img': img, 'width': pm.width, 'height': pm.height}
  27. return img_dict
  28. @ImportPIL
  29. def load_images_from_pdf(pdf_bytes: bytes, dpi=200, start_page_id=0, end_page_id=None) -> list:
  30. from PIL import Image
  31. images = []
  32. with fitz.open('pdf', pdf_bytes) as doc:
  33. pdf_page_num = doc.page_count
  34. end_page_id = (
  35. end_page_id
  36. if end_page_id is not None and end_page_id >= 0
  37. else pdf_page_num - 1
  38. )
  39. if end_page_id > pdf_page_num - 1:
  40. logger.warning('end_page_id is out of range, use images length')
  41. end_page_id = pdf_page_num - 1
  42. for index in range(0, doc.page_count):
  43. if start_page_id <= index <= end_page_id:
  44. page = doc[index]
  45. mat = fitz.Matrix(dpi / 72, dpi / 72)
  46. pm = page.get_pixmap(matrix=mat, alpha=False)
  47. # If the width or height exceeds 4500 after scaling, do not scale further.
  48. if pm.width > 4500 or pm.height > 4500:
  49. pm = page.get_pixmap(matrix=fitz.Matrix(1, 1), alpha=False)
  50. img = Image.frombytes('RGB', (pm.width, pm.height), pm.samples)
  51. img = np.array(img)
  52. img_dict = {'img': img, 'width': pm.width, 'height': pm.height}
  53. else:
  54. img_dict = {'img': [], 'width': 0, 'height': 0}
  55. images.append(img_dict)
  56. return images
  57. def convert_page(bytes_page):
  58. pdfs = fitz.open('pdf', bytes_page)
  59. page = pdfs[0]
  60. return fitz_doc_to_image(page)
  61. def parallel_process_pdf_safe(pages, num_workers=None, **kwargs):
  62. """Process PDF pages in parallel with serialization-safe approach."""
  63. if num_workers is None:
  64. num_workers = mp.cpu_count()
  65. # Process the extracted page data in parallel
  66. with ProcessPoolExecutor(max_workers=num_workers) as executor:
  67. # Process the page data
  68. results = list(
  69. executor.map(convert_page, pages)
  70. )
  71. return results
  72. def threaded_process_pdf(pdf_path, num_threads=4, **kwargs):
  73. """Process all pages of a PDF using multiple threads.
  74. Parameters:
  75. -----------
  76. pdf_path : str
  77. Path to the PDF file
  78. num_threads : int
  79. Number of threads to use
  80. **kwargs :
  81. Additional arguments for fitz_doc_to_image
  82. Returns:
  83. --------
  84. images : list
  85. List of processed images, in page order
  86. """
  87. # Open the PDF
  88. doc = fitz.open(pdf_path)
  89. num_pages = len(doc)
  90. # Create a list to store results in the correct order
  91. results = [None] * num_pages
  92. # Create a thread pool
  93. with ThreadPoolExecutor(max_workers=num_threads) as executor:
  94. # Submit all tasks
  95. futures = {}
  96. for page_num in range(num_pages):
  97. page = doc[page_num]
  98. future = executor.submit(fitz_doc_to_image, page, **kwargs)
  99. futures[future] = page_num
  100. # Process results as they complete with progress bar
  101. for future in as_completed(futures):
  102. page_num = futures[future]
  103. try:
  104. results[page_num] = future.result()
  105. except Exception as e:
  106. print(f'Error processing page {page_num}: {e}')
  107. results[page_num] = None
  108. # Close the document
  109. doc.close()
  110. if __name__ == '__main__':
  111. pdf = fitz.open('/tmp/[MS-DOC].pdf')
  112. pdf_page = [fitz.open() for i in range(pdf.page_count)]
  113. [pdf_page[i].insert_pdf(pdf, from_page=i, to_page=i) for i in range(pdf.page_count)]
  114. pdf_page = [v.tobytes() for v in pdf_page]
  115. results = parallel_process_pdf_safe(pdf_page, num_workers=16)
  116. # threaded_process_pdf('/tmp/[MS-DOC].pdf', num_threads=16)
  117. """ benchmark results of multi-threaded processing (fitz page to image)
  118. total page nums: 578
  119. thread nums, time cost
  120. 1 7.351 sec
  121. 2 6.334 sec
  122. 4 5.968 sec
  123. 8 6.728 sec
  124. 16 8.085 sec
  125. """
  126. """ benchmark results of multi-processor processing (fitz page to image)
  127. total page nums: 578
  128. processor nums, time cost
  129. 1 17.170 sec
  130. 2 10.170 sec
  131. 4 7.841 sec
  132. 8 7.900 sec
  133. 16 7.984 sec
  134. """