pdf_utils.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. """
  2. PDF处理工具模块
  3. 提供PDF相关处理功能:
  4. - PDF加载与分类
  5. - PDF文本提取
  6. - 跨页表格合并
  7. - 页面范围解析与过滤
  8. """
  9. from typing import Dict, List, Any, Optional, Tuple, Set
  10. from pathlib import Path
  11. from PIL import Image
  12. from loguru import logger
  13. import re
  14. # 导入页面范围解析函数(不依赖 MinerU)
  15. from .file_utils import parse_page_range
  16. # 导入 MinerU 组件
  17. try:
  18. from mineru.utils.pdf_classify import classify as pdf_classify
  19. from mineru.utils.pdf_image_tools import load_images_from_pdf
  20. from mineru.utils.enum_class import ImageType
  21. from mineru.utils.pdf_text_tool import get_page as pdf_get_page_text
  22. MINERU_AVAILABLE = True
  23. except ImportError:
  24. raise ImportError("MinerU components not available for PDF processing")
  25. class PDFUtils:
  26. """PDF处理工具类"""
  27. @staticmethod
  28. def parse_page_range(page_range: Optional[str], total_pages: int) -> Set[int]:
  29. """
  30. 解析页面范围字符串(向后兼容包装函数)
  31. 此方法是对 file_utils.parse_page_range 的包装,保持向后兼容性。
  32. 新代码应直接使用 file_utils.parse_page_range。
  33. 支持格式:
  34. - "1-5" → {0, 1, 2, 3, 4}(页码从1开始,内部转为0-based索引)
  35. - "3" → {2}
  36. - "1-5,7,9-12" → {0, 1, 2, 3, 4, 6, 8, 9, 10, 11}
  37. - "1-" → 从第1页到最后
  38. - "-5" → 从第1页到第5页
  39. Args:
  40. page_range: 页面范围字符串(页码从1开始)
  41. total_pages: 总页数
  42. Returns:
  43. 页面索引集合(0-based)
  44. """
  45. return parse_page_range(page_range, total_pages)
  46. @staticmethod
  47. def load_and_classify_document(
  48. document_path: Path,
  49. dpi: int = 200,
  50. page_range: Optional[str] = None
  51. ) -> Tuple[List[Dict], str, Optional[Any]]:
  52. """
  53. 加载文档并分类,支持页面范围过滤
  54. Args:
  55. document_path: 文档路径
  56. dpi: PDF渲染DPI
  57. page_range: 页面范围字符串,如 "1-5,7,9-12"
  58. - PDF:按页码(从1开始)
  59. - 图片目录:按文件名排序后的位置(从1开始)
  60. Returns:
  61. (images_list, pdf_type, pdf_doc)
  62. - images_list: 图像列表,每个元素包含 {'img_pil': PIL.Image, 'scale': float, 'page_idx': int}
  63. - pdf_type: 'ocr' 或 'txt'
  64. - pdf_doc: PDF文档对象(如果是PDF)
  65. """
  66. pdf_doc = None
  67. pdf_type = 'ocr' # 默认使用OCR模式
  68. all_images = []
  69. if document_path.is_dir():
  70. # 处理目录:遍历所有图片
  71. image_extensions = {'.png', '.jpg', '.jpeg', '.bmp', '.tiff', '.tif'}
  72. image_files = sorted([
  73. f for f in document_path.iterdir()
  74. if f.suffix.lower() in image_extensions
  75. ])
  76. # 解析页面范围
  77. total_pages = len(image_files)
  78. selected_pages = parse_page_range(page_range, total_pages)
  79. if page_range:
  80. logger.info(f"📋 图片目录共 {total_pages} 张,选择处理 {len(selected_pages)} 张")
  81. for idx, img_file in enumerate(image_files):
  82. if idx not in selected_pages:
  83. continue
  84. img = Image.open(img_file)
  85. if img.mode != 'RGB':
  86. img = img.convert('RGB')
  87. all_images.append({
  88. 'img_pil': img,
  89. 'scale': 1.0,
  90. 'source_path': str(img_file),
  91. 'page_idx': idx, # 原始索引
  92. 'page_name': img_file.stem # 文件名(不含扩展名)
  93. })
  94. pdf_type = 'ocr' # 图片目录始终使用OCR模式
  95. elif document_path.suffix.lower() == '.pdf':
  96. # 处理PDF文件
  97. if not MINERU_AVAILABLE:
  98. raise RuntimeError("MinerU components not available for PDF processing")
  99. with open(document_path, 'rb') as f:
  100. pdf_bytes = f.read()
  101. # PDF分类
  102. pdf_type = pdf_classify(pdf_bytes)
  103. logger.info(f"📋 PDF classified as: {pdf_type}")
  104. # 加载图像
  105. images_list, pdf_doc = load_images_from_pdf(
  106. pdf_bytes,
  107. dpi=dpi,
  108. image_type=ImageType.PIL
  109. )
  110. # 解析页面范围
  111. total_pages = len(images_list)
  112. selected_pages = parse_page_range(page_range, total_pages)
  113. if page_range:
  114. logger.info(f"📋 PDF 共 {total_pages} 页,选择处理 {len(selected_pages)} 页")
  115. for idx, img_dict in enumerate(images_list):
  116. if idx not in selected_pages:
  117. continue
  118. all_images.append({
  119. 'img_pil': img_dict['img_pil'],
  120. 'scale': img_dict.get('scale', dpi / 72),
  121. 'source_path': str(document_path),
  122. 'page_idx': idx # 原始页码索引
  123. })
  124. elif document_path.suffix.lower() in ['.png', '.jpg', '.jpeg', '.bmp', '.tiff', '.tif']:
  125. # 处理单个图片
  126. img = Image.open(document_path)
  127. if img.mode != 'RGB':
  128. img = img.convert('RGB')
  129. all_images.append({
  130. 'img_pil': img,
  131. 'scale': 1.0,
  132. 'source_path': str(document_path),
  133. 'page_idx': 0,
  134. 'page_name': document_path.stem
  135. })
  136. pdf_type = 'ocr'
  137. else:
  138. raise ValueError(f"Unsupported file format: {document_path.suffix}")
  139. return all_images, pdf_type, pdf_doc
  140. @staticmethod
  141. def extract_text_from_pdf(
  142. pdf_doc: Any,
  143. page_idx: int,
  144. bbox: List[float],
  145. scale: float
  146. ) -> Tuple[str, bool]:
  147. """
  148. 从PDF直接提取文本(使用 MinerU 的 pypdfium2 方式)
  149. Args:
  150. pdf_doc: pypdfium2 的 PdfDocument 对象
  151. page_idx: 页码索引
  152. bbox: 目标区域的bbox(图像坐标)
  153. scale: 图像与PDF的缩放比例
  154. Returns:
  155. (text, success)
  156. """
  157. if not MINERU_AVAILABLE or pdf_get_page_text is None:
  158. logger.debug("MinerU pdf_text_tool not available")
  159. return "", False
  160. try:
  161. page = pdf_doc[page_idx]
  162. # 将图像坐标转换为PDF坐标
  163. pdf_bbox = [
  164. bbox[0] / scale,
  165. bbox[1] / scale,
  166. bbox[2] / scale,
  167. bbox[3] / scale
  168. ]
  169. # 使用 MinerU 的方式获取页面文本信息
  170. page_dict = pdf_get_page_text(page)
  171. # 从 blocks 中提取与 bbox 重叠的文本
  172. text_parts = []
  173. for block in page_dict.get('blocks', []):
  174. for line in block.get('lines', []):
  175. line_bbox = line.get('bbox')
  176. if line_bbox and hasattr(line_bbox, 'bbox'):
  177. line_bbox = line_bbox.bbox # pdftext 的 BBox 对象
  178. elif isinstance(line_bbox, (list, tuple)) and len(line_bbox) >= 4:
  179. line_bbox = list(line_bbox)
  180. else:
  181. continue
  182. # 检查 line 是否与目标 bbox 重叠
  183. if PDFUtils._bbox_overlap(pdf_bbox, line_bbox):
  184. for span in line.get('spans', []):
  185. span_text = span.get('text', '')
  186. if span_text:
  187. text_parts.append(span_text)
  188. text = ' '.join(text_parts)
  189. return text.strip(), bool(text.strip())
  190. except Exception as e:
  191. import traceback
  192. logger.debug(f"PDF text extraction error: {e}")
  193. logger.debug(traceback.format_exc())
  194. return "", False
  195. @staticmethod
  196. def _bbox_overlap(bbox1: List[float], bbox2: List[float]) -> bool:
  197. """检查两个 bbox 是否重叠"""
  198. if len(bbox1) < 4 or len(bbox2) < 4:
  199. return False
  200. x1_1, y1_1, x2_1, y2_1 = bbox1[:4]
  201. x1_2, y1_2, x2_2, y2_2 = bbox2[:4]
  202. if x2_1 < x1_2 or x2_2 < x1_1:
  203. return False
  204. if y2_1 < y1_2 or y2_2 < y1_1:
  205. return False
  206. return True
  207. @staticmethod
  208. def merge_cross_page_tables(results: Dict[str, Any]) -> Dict[str, Any]:
  209. """
  210. 合并跨页表格
  211. TODO: 实现跨页表格合并逻辑
  212. 可以参考 MinerU 的 cross_page_table_merge 实现
  213. Args:
  214. results: 处理结果字典
  215. Returns:
  216. 合并后的结果
  217. """
  218. # TODO: 实现跨页表格合并逻辑
  219. return results