pdf_utils.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. """
  2. PDF处理工具模块
  3. 提供PDF相关处理功能:
  4. - PDF加载与分类
  5. - PDF文本提取
  6. - 跨页表格合并
  7. - 页面范围解析与过滤
  8. """
  9. from typing import Dict, List, Any, Optional, Tuple, Set
  10. from pathlib import Path
  11. from PIL import Image
  12. from loguru import logger
  13. import re
  14. # 导入 MinerU 组件
  15. try:
  16. from mineru.utils.pdf_classify import classify as pdf_classify
  17. from mineru.utils.pdf_image_tools import load_images_from_pdf
  18. from mineru.utils.enum_class import ImageType
  19. from mineru.utils.pdf_text_tool import get_page as pdf_get_page_text
  20. MINERU_AVAILABLE = True
  21. except ImportError:
  22. raise ImportError("MinerU components not available for PDF processing")
  23. class PDFUtils:
  24. """PDF处理工具类"""
  25. @staticmethod
  26. def parse_page_range(page_range: Optional[str], total_pages: int) -> Set[int]:
  27. """
  28. 解析页面范围字符串
  29. 支持格式:
  30. - "1-5" → {0, 1, 2, 3, 4}(页码从1开始,内部转为0-based索引)
  31. - "3" → {2}
  32. - "1-5,7,9-12" → {0, 1, 2, 3, 4, 6, 8, 9, 10, 11}
  33. - "1-" → 从第1页到最后
  34. - "-5" → 从第1页到第5页
  35. Args:
  36. page_range: 页面范围字符串(页码从1开始)
  37. total_pages: 总页数
  38. Returns:
  39. 页面索引集合(0-based)
  40. """
  41. if not page_range or not page_range.strip():
  42. return set(range(total_pages))
  43. pages = set()
  44. parts = page_range.replace(' ', '').split(',')
  45. for part in parts:
  46. part = part.strip()
  47. if not part:
  48. continue
  49. if '-' in part:
  50. # 范围格式
  51. match = re.match(r'^(\d*)-(\d*)$', part)
  52. if match:
  53. start_str, end_str = match.groups()
  54. start = int(start_str) if start_str else 1
  55. end = int(end_str) if end_str else total_pages
  56. # 转换为 0-based 索引
  57. start = max(0, start - 1)
  58. end = min(total_pages, end)
  59. pages.update(range(start, end))
  60. else:
  61. # 单页
  62. try:
  63. page_num = int(part)
  64. if 1 <= page_num <= total_pages:
  65. pages.add(page_num - 1) # 转换为 0-based 索引
  66. except ValueError:
  67. logger.warning(f"Invalid page number: {part}")
  68. return pages
  69. @staticmethod
  70. def load_and_classify_document(
  71. document_path: Path,
  72. dpi: int = 200,
  73. page_range: Optional[str] = None
  74. ) -> Tuple[List[Dict], str, Optional[Any]]:
  75. """
  76. 加载文档并分类,支持页面范围过滤
  77. Args:
  78. document_path: 文档路径
  79. dpi: PDF渲染DPI
  80. page_range: 页面范围字符串,如 "1-5,7,9-12"
  81. - PDF:按页码(从1开始)
  82. - 图片目录:按文件名排序后的位置(从1开始)
  83. Returns:
  84. (images_list, pdf_type, pdf_doc)
  85. - images_list: 图像列表,每个元素包含 {'img_pil': PIL.Image, 'scale': float, 'page_idx': int}
  86. - pdf_type: 'ocr' 或 'txt'
  87. - pdf_doc: PDF文档对象(如果是PDF)
  88. """
  89. pdf_doc = None
  90. pdf_type = 'ocr' # 默认使用OCR模式
  91. all_images = []
  92. if document_path.is_dir():
  93. # 处理目录:遍历所有图片
  94. image_extensions = {'.png', '.jpg', '.jpeg', '.bmp', '.tiff', '.tif'}
  95. image_files = sorted([
  96. f for f in document_path.iterdir()
  97. if f.suffix.lower() in image_extensions
  98. ])
  99. # 解析页面范围
  100. total_pages = len(image_files)
  101. selected_pages = PDFUtils.parse_page_range(page_range, total_pages)
  102. if page_range:
  103. logger.info(f"📋 图片目录共 {total_pages} 张,选择处理 {len(selected_pages)} 张")
  104. for idx, img_file in enumerate(image_files):
  105. if idx not in selected_pages:
  106. continue
  107. img = Image.open(img_file)
  108. if img.mode != 'RGB':
  109. img = img.convert('RGB')
  110. all_images.append({
  111. 'img_pil': img,
  112. 'scale': 1.0,
  113. 'source_path': str(img_file),
  114. 'page_idx': idx, # 原始索引
  115. 'page_name': img_file.stem # 文件名(不含扩展名)
  116. })
  117. pdf_type = 'ocr' # 图片目录始终使用OCR模式
  118. elif document_path.suffix.lower() == '.pdf':
  119. # 处理PDF文件
  120. if not MINERU_AVAILABLE:
  121. raise RuntimeError("MinerU components not available for PDF processing")
  122. with open(document_path, 'rb') as f:
  123. pdf_bytes = f.read()
  124. # PDF分类
  125. pdf_type = pdf_classify(pdf_bytes)
  126. logger.info(f"📋 PDF classified as: {pdf_type}")
  127. # 加载图像
  128. images_list, pdf_doc = load_images_from_pdf(
  129. pdf_bytes,
  130. dpi=dpi,
  131. image_type=ImageType.PIL
  132. )
  133. # 解析页面范围
  134. total_pages = len(images_list)
  135. selected_pages = PDFUtils.parse_page_range(page_range, total_pages)
  136. if page_range:
  137. logger.info(f"📋 PDF 共 {total_pages} 页,选择处理 {len(selected_pages)} 页")
  138. for idx, img_dict in enumerate(images_list):
  139. if idx not in selected_pages:
  140. continue
  141. all_images.append({
  142. 'img_pil': img_dict['img_pil'],
  143. 'scale': img_dict.get('scale', dpi / 72),
  144. 'source_path': str(document_path),
  145. 'page_idx': idx # 原始页码索引
  146. })
  147. elif document_path.suffix.lower() in ['.png', '.jpg', '.jpeg', '.bmp', '.tiff', '.tif']:
  148. # 处理单个图片
  149. img = Image.open(document_path)
  150. if img.mode != 'RGB':
  151. img = img.convert('RGB')
  152. all_images.append({
  153. 'img_pil': img,
  154. 'scale': 1.0,
  155. 'source_path': str(document_path),
  156. 'page_idx': 0,
  157. 'page_name': document_path.stem
  158. })
  159. pdf_type = 'ocr'
  160. else:
  161. raise ValueError(f"Unsupported file format: {document_path.suffix}")
  162. return all_images, pdf_type, pdf_doc
  163. @staticmethod
  164. def extract_text_from_pdf(
  165. pdf_doc: Any,
  166. page_idx: int,
  167. bbox: List[float],
  168. scale: float
  169. ) -> Tuple[str, bool]:
  170. """
  171. 从PDF直接提取文本(使用 MinerU 的 pypdfium2 方式)
  172. Args:
  173. pdf_doc: pypdfium2 的 PdfDocument 对象
  174. page_idx: 页码索引
  175. bbox: 目标区域的bbox(图像坐标)
  176. scale: 图像与PDF的缩放比例
  177. Returns:
  178. (text, success)
  179. """
  180. if not MINERU_AVAILABLE or pdf_get_page_text is None:
  181. logger.debug("MinerU pdf_text_tool not available")
  182. return "", False
  183. try:
  184. page = pdf_doc[page_idx]
  185. # 将图像坐标转换为PDF坐标
  186. pdf_bbox = [
  187. bbox[0] / scale,
  188. bbox[1] / scale,
  189. bbox[2] / scale,
  190. bbox[3] / scale
  191. ]
  192. # 使用 MinerU 的方式获取页面文本信息
  193. page_dict = pdf_get_page_text(page)
  194. # 从 blocks 中提取与 bbox 重叠的文本
  195. text_parts = []
  196. for block in page_dict.get('blocks', []):
  197. for line in block.get('lines', []):
  198. line_bbox = line.get('bbox')
  199. if line_bbox and hasattr(line_bbox, 'bbox'):
  200. line_bbox = line_bbox.bbox # pdftext 的 BBox 对象
  201. elif isinstance(line_bbox, (list, tuple)) and len(line_bbox) >= 4:
  202. line_bbox = list(line_bbox)
  203. else:
  204. continue
  205. # 检查 line 是否与目标 bbox 重叠
  206. if PDFUtils._bbox_overlap(pdf_bbox, line_bbox):
  207. for span in line.get('spans', []):
  208. span_text = span.get('text', '')
  209. if span_text:
  210. text_parts.append(span_text)
  211. text = ' '.join(text_parts)
  212. return text.strip(), bool(text.strip())
  213. except Exception as e:
  214. import traceback
  215. logger.debug(f"PDF text extraction error: {e}")
  216. logger.debug(traceback.format_exc())
  217. return "", False
  218. @staticmethod
  219. def _bbox_overlap(bbox1: List[float], bbox2: List[float]) -> bool:
  220. """检查两个 bbox 是否重叠"""
  221. if len(bbox1) < 4 or len(bbox2) < 4:
  222. return False
  223. x1_1, y1_1, x2_1, y2_1 = bbox1[:4]
  224. x1_2, y1_2, x2_2, y2_2 = bbox2[:4]
  225. if x2_1 < x1_2 or x2_2 < x1_1:
  226. return False
  227. if y2_1 < y1_2 or y2_2 < y1_1:
  228. return False
  229. return True
  230. @staticmethod
  231. def merge_cross_page_tables(results: Dict[str, Any]) -> Dict[str, Any]:
  232. """
  233. 合并跨页表格
  234. TODO: 实现跨页表格合并逻辑
  235. 可以参考 MinerU 的 cross_page_table_merge 实现
  236. Args:
  237. results: 处理结果字典
  238. Returns:
  239. 合并后的结果
  240. """
  241. # TODO: 实现跨页表格合并逻辑
  242. return results