utils.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. import multiprocessing as mp
  2. import threading
  3. from concurrent.futures import (ProcessPoolExecutor, ThreadPoolExecutor,
  4. as_completed)
  5. import fitz
  6. import numpy as np
  7. from loguru import logger
  8. def fitz_doc_to_image(page, dpi=200) -> dict:
  9. """Convert fitz.Document to image, Then convert the image to numpy array.
  10. Args:
  11. page (_type_): pymudoc page
  12. dpi (int, optional): reset the dpi of dpi. Defaults to 200.
  13. Returns:
  14. dict: {'img': numpy array, 'width': width, 'height': height }
  15. """
  16. mat = fitz.Matrix(dpi / 72, dpi / 72)
  17. pm = page.get_pixmap(matrix=mat, alpha=False)
  18. # If the width or height exceeds 4500 after scaling, do not scale further.
  19. if pm.width > 4500 or pm.height > 4500:
  20. pm = page.get_pixmap(matrix=fitz.Matrix(1, 1), alpha=False)
  21. # Convert pixmap samples directly to numpy array
  22. img = np.frombuffer(pm.samples, dtype=np.uint8).reshape(pm.height, pm.width, 3)
  23. img_dict = {'img': img, 'width': pm.width, 'height': pm.height}
  24. return img_dict
  25. def load_images_from_pdf(pdf_bytes: bytes, dpi=200, start_page_id=0, end_page_id=None) -> list:
  26. images = []
  27. with fitz.open('pdf', pdf_bytes) as doc:
  28. pdf_page_num = doc.page_count
  29. end_page_id = (
  30. end_page_id
  31. if end_page_id is not None and end_page_id >= 0
  32. else pdf_page_num - 1
  33. )
  34. if end_page_id > pdf_page_num - 1:
  35. logger.warning('end_page_id is out of range, use images length')
  36. end_page_id = pdf_page_num - 1
  37. for index in range(0, doc.page_count):
  38. if start_page_id <= index <= end_page_id:
  39. page = doc[index]
  40. mat = fitz.Matrix(dpi / 72, dpi / 72)
  41. pm = page.get_pixmap(matrix=mat, alpha=False)
  42. # If the width or height exceeds 4500 after scaling, do not scale further.
  43. if pm.width > 4500 or pm.height > 4500:
  44. pm = page.get_pixmap(matrix=fitz.Matrix(1, 1), alpha=False)
  45. # Convert pixmap samples directly to numpy array
  46. img = np.frombuffer(pm.samples, dtype=np.uint8).reshape(pm.height, pm.width, 3)
  47. img_dict = {'img': img, 'width': pm.width, 'height': pm.height}
  48. else:
  49. img_dict = {'img': [], 'width': 0, 'height': 0}
  50. images.append(img_dict)
  51. return images
  52. def convert_page(bytes_page):
  53. pdfs = fitz.open('pdf', bytes_page)
  54. page = pdfs[0]
  55. return fitz_doc_to_image(page)
  56. def parallel_process_pdf_safe(pages, num_workers=None, **kwargs):
  57. """Process PDF pages in parallel with serialization-safe approach."""
  58. if num_workers is None:
  59. num_workers = mp.cpu_count()
  60. # Process the extracted page data in parallel
  61. with ProcessPoolExecutor(max_workers=num_workers) as executor:
  62. # Process the page data
  63. results = list(
  64. executor.map(convert_page, pages)
  65. )
  66. return results
  67. def threaded_process_pdf(pdf_path, num_threads=4, **kwargs):
  68. """Process all pages of a PDF using multiple threads.
  69. Parameters:
  70. -----------
  71. pdf_path : str
  72. Path to the PDF file
  73. num_threads : int
  74. Number of threads to use
  75. **kwargs :
  76. Additional arguments for fitz_doc_to_image
  77. Returns:
  78. --------
  79. images : list
  80. List of processed images, in page order
  81. """
  82. # Open the PDF
  83. doc = fitz.open(pdf_path)
  84. num_pages = len(doc)
  85. # Create a list to store results in the correct order
  86. results = [None] * num_pages
  87. # Create a thread pool
  88. with ThreadPoolExecutor(max_workers=num_threads) as executor:
  89. # Submit all tasks
  90. futures = {}
  91. for page_num in range(num_pages):
  92. page = doc[page_num]
  93. future = executor.submit(fitz_doc_to_image, page, **kwargs)
  94. futures[future] = page_num
  95. # Process results as they complete with progress bar
  96. for future in as_completed(futures):
  97. page_num = futures[future]
  98. try:
  99. results[page_num] = future.result()
  100. except Exception as e:
  101. print(f'Error processing page {page_num}: {e}')
  102. results[page_num] = None
  103. # Close the document
  104. doc.close()
  105. if __name__ == '__main__':
  106. pdf = fitz.open('/tmp/[MS-DOC].pdf')
  107. pdf_page = [fitz.open() for i in range(pdf.page_count)]
  108. [pdf_page[i].insert_pdf(pdf, from_page=i, to_page=i) for i in range(pdf.page_count)]
  109. pdf_page = [v.tobytes() for v in pdf_page]
  110. results = parallel_process_pdf_safe(pdf_page, num_workers=16)
  111. # threaded_process_pdf('/tmp/[MS-DOC].pdf', num_threads=16)
  112. """ benchmark results of multi-threaded processing (fitz page to image)
  113. total page nums: 578
  114. thread nums, time cost
  115. 1 7.351 sec
  116. 2 6.334 sec
  117. 4 5.968 sec
  118. 8 6.728 sec
  119. 16 8.085 sec
  120. """
  121. """ benchmark results of multi-processor processing (fitz page to image)
  122. total page nums: 578
  123. processor nums, time cost
  124. 1 17.170 sec
  125. 2 10.170 sec
  126. 4 7.841 sec
  127. 8 7.900 sec
  128. 16 7.984 sec
  129. """