batch_build_dataset.py

import concurrent.futures

import fitz  # PyMuPDF

from magic_pdf.data.dataset import PymuDocDataset
from magic_pdf.data.utils import fitz_doc_to_image


def partition_array_greedy(arr, k):
    """Partition an array into k parts using a simple greedy approach.

    Parameters:
    -----------
    arr : list of tuples
        The input array; each element is a (item, weight) pair and the
        partitions are balanced by the weights arr[i][1]
    k : int
        Number of partitions to create

    Returns:
    --------
    partitions : list of lists
        The k partitions, each a list of original indices into arr
    """
    # Handle edge cases
    if k <= 0:
        raise ValueError('k must be a positive integer')
    if k > len(arr):
        k = len(arr)  # Adjust k if it's too large
    if k == 1:
        return [list(range(len(arr)))]
    if k == len(arr):
        return [[i] for i in range(len(arr))]

    # Sort indices by weight in descending order
    sorted_indices = sorted(range(len(arr)), key=lambda i: arr[i][1], reverse=True)

    # Initialize k empty partitions
    partitions = [[] for _ in range(k)]
    partition_sums = [0] * k

    # Assign each element to the partition with the smallest current sum
    for idx in sorted_indices:
        # Find the partition with the smallest sum
        min_sum_idx = partition_sums.index(min(partition_sums))
        # Add the element to this partition
        partitions[min_sum_idx].append(idx)  # Store the original index
        partition_sums[min_sum_idx] += arr[idx][1]

    return partitions
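
# Example with hypothetical weights: four PDFs with page counts 10, 7, 5 and 2,
# split into two balanced partitions of original indices:
#
#   >>> partition_array_greedy([('a.pdf', 10), ('b.pdf', 7),
#   ...                         ('c.pdf', 5), ('d.pdf', 2)], 2)
#   [[0, 3], [1, 2]]  # weight sums: 10 + 2 = 12 and 7 + 5 = 12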


def process_pdf_batch(pdf_jobs, idx):
    """Render every page of each PDF in a batch to an image.

    Parameters:
    -----------
    pdf_jobs : list of tuples
        List of (pdf_path, page_count) tuples
    idx : int
        Index of this batch, returned with the images so the caller can
        reassemble results in submission order

    Returns:
    --------
    (idx, images) : tuple
        The batch index and, per PDF, a list of rendered page images
    """
    images = []
    for pdf_path, _ in pdf_jobs:
        doc = fitz.open(pdf_path)
        tmp = []
        for page_num in range(len(doc)):
            page = doc[page_num]
            tmp.append(fitz_doc_to_image(page))
        images.append(tmp)
    return (idx, images)
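
# Example (hypothetical paths, assuming the files exist): rendering two PDFs
# in one batch. Only the path in each tuple is read here; the page count
# travels along because the caller partitions (path, page_count) tuples.
#
#   batch_idx, page_images = process_pdf_batch([('a.pdf', 10), ('b.pdf', 7)], 0)
#   # page_images[0] holds the rendered pages of a.pdf,
#   # page_images[1] those of b.pdf.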


def batch_build_dataset(pdf_paths, k, lang=None):
    """Build a PymuDocDataset for each PDF.

    The active implementation reads each PDF sequentially; ``k`` is kept for
    compatibility with the parallel variant commented out below, which
    partitions the PDFs into k balanced parts and renders each part in its
    own process.

    Parameters:
    -----------
    pdf_paths : list
        List of paths to PDF files
    k : int
        Number of partitions to create (unused in the sequential path)
    lang : str or None
        Language hint forwarded to PymuDocDataset

    Returns:
    --------
    results : list
        One PymuDocDataset per input path, in input order
    """
    results = []
    for pdf_path in pdf_paths:
        with open(pdf_path, 'rb') as f:
            pdf_bytes = f.read()
        dataset = PymuDocDataset(pdf_bytes, lang=lang)
        results.append(dataset)
    return results
    #
    # # Get page counts for each PDF
    # pdf_info = []
    # total_pages = 0
    #
    # for pdf_path in pdf_paths:
    #     try:
    #         doc = fitz.open(pdf_path)
    #         num_pages = len(doc)
    #         pdf_info.append((pdf_path, num_pages))
    #         total_pages += num_pages
    #         doc.close()
    #     except Exception as e:
    #         print(f'Error opening {pdf_path}: {e}')
    #
    # # Partition the jobs based on page count
    # partitions = partition_array_greedy(pdf_info, k)
    #
    # # Process each partition in parallel
    # all_images_h = {}
    #
    # with concurrent.futures.ProcessPoolExecutor(max_workers=k) as executor:
    #     # Submit one task per partition
    #     futures = []
    #     for sn, partition in enumerate(partitions):
    #         # Get the jobs for this partition
    #         partition_jobs = [pdf_info[idx] for idx in partition]
    #
    #         # Submit the task
    #         future = executor.submit(
    #             process_pdf_batch,
    #             partition_jobs,
    #             sn
    #         )
    #         futures.append(future)
    #
    #     # Process results as they complete
    #     for i, future in enumerate(concurrent.futures.as_completed(futures)):
    #         try:
    #             idx, images = future.result()
    #             all_images_h[idx] = images
    #         except Exception as e:
    #             print(f'Error processing partition: {e}')
    #
    # results = [None] * len(pdf_paths)
    # for i in range(len(partitions)):
    #     partition = partitions[i]
    #     for j in range(len(partition)):
    #         with open(pdf_info[partition[j]][0], 'rb') as f:
    #             pdf_bytes = f.read()
    #         dataset = PymuDocDataset(pdf_bytes, lang=lang)
    #         dataset.set_images(all_images_h[i][j])
    #         results[partition[j]] = dataset
    # return results
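

# Minimal usage sketch (hypothetical file names; assumes the PDFs exist on
# disk). Builds one PymuDocDataset per input; k matters only for the parallel
# variant commented out above.
if __name__ == '__main__':
    datasets = batch_build_dataset(['a.pdf', 'b.pdf'], k=2, lang='en')
    print(f'built {len(datasets)} datasets')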