file_utils.py

  1. """
  2. 文件处理工具模块
  3. 提供文件处理相关功能:
  4. - 输入文件获取(支持文件/目录/列表/CSV)
  5. - PDF转图片
  6. - 文件列表处理
  7. """
  8. import tempfile
  9. from pathlib import Path
  10. from typing import List, Tuple
  11. import json
  12. import traceback
  13. from loguru import logger
  14. try:
  15. from mineru.utils.pdf_image_tools import load_images_from_pdf
  16. from mineru.utils.enum_class import ImageType
  17. MINERU_AVAILABLE = True
  18. except ImportError:
  19. MINERU_AVAILABLE = False
  20. load_images_from_pdf = None
  21. ImageType = None


def split_files(file_list: List[str], num_splits: int) -> List[List[str]]:
    """
    Split a file list into the requested number of sub-lists.

    Args:
        file_list: List of file paths.
        num_splits: Number of chunks to split into.

    Returns:
        The list of non-empty chunks.
    """
    if num_splits <= 0:
        return [file_list]

    chunk_size = len(file_list) // num_splits
    remainder = len(file_list) % num_splits

    chunks = []
    start = 0
    for i in range(num_splits):
        # The first `remainder` chunks each get one extra file.
        current_chunk_size = chunk_size + (1 if i < remainder else 0)
        if current_chunk_size > 0:
            chunks.append(file_list[start:start + current_chunk_size])
            start += current_chunk_size

    return [chunk for chunk in chunks if chunk]  # drop empty chunks
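
# Usage sketch (illustrative): with 10 files and 3 splits, the remainder goes to the
# leading chunks, so the chunk sizes come out as 4, 3, 3.
#
#   chunks = split_files([f"img_{i}.jpg" for i in range(10)], 3)
#   assert [len(c) for c in chunks] == [4, 3, 3]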


def create_temp_file_list(file_chunk: List[str]) -> str:
    """
    Write a chunk of file paths into a temporary list file.

    Args:
        file_chunk: List of file paths.

    Returns:
        Path of the temporary list file (the caller is responsible for deleting it).
    """
    with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
        for file_path in file_chunk:
            f.write(f"{file_path}\n")
        return f.name
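
# Usage sketch (illustrative; all_images is a placeholder list of paths): pair
# split_files() with create_temp_file_list() to give each worker its own list file.
# Because delete=False is used, the caller removes the temp files once the workers
# are done.
#
#   import os
#   list_files = [create_temp_file_list(chunk) for chunk in split_files(all_images, 4)]
#   ...  # launch one worker per list file
#   for path in list_files:
#       os.unlink(path)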


def get_image_files_from_dir(input_dir: Path, pattern: str = "*", max_files: int | None = None) -> List[str]:
    """
    Collect image files from a directory.

    Args:
        input_dir: Input directory.
        pattern: File-name glob pattern.
        max_files: Optional cap on the number of files returned.

    Returns:
        List of image file paths.
    """
    image_extensions = ['.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif']

    image_files = []
    for ext in image_extensions:
        image_files.extend(list(input_dir.glob(f"{pattern}{ext}")))
        image_files.extend(list(input_dir.glob(f"{pattern}{ext.upper()}")))

    # Deduplicate and sort.
    image_files = sorted(set(str(f) for f in image_files))

    # Apply the file-count limit.
    if max_files:
        image_files = image_files[:max_files]

    return image_files
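
# Usage sketch (illustrative; directory and pattern are placeholders): collect at most
# 100 images whose names start with "scan_" from ./scans.
#
#   files = get_image_files_from_dir(Path("./scans"), pattern="scan_*", max_files=100)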


def get_image_files_from_list(file_list_path: str) -> List[str]:
    """
    Read image file paths from a list file (one path per line).

    Args:
        file_list_path: Path of the list file.

    Returns:
        List of the listed image file paths that actually exist.
    """
    logger.info(f"📄 Reading file list from: {file_list_path}")

    with open(file_list_path, 'r', encoding='utf-8') as f:
        image_files = [line.strip() for line in f if line.strip()]

    # Check that the listed files exist.
    valid_files = []
    missing_files = []
    for file_path in image_files:
        if Path(file_path).exists():
            valid_files.append(file_path)
        else:
            missing_files.append(file_path)

    if missing_files:
        logger.warning(f"⚠️ Warning: {len(missing_files)} files not found:")
        for missing_file in missing_files[:5]:  # show at most the first 5
            logger.warning(f"  - {missing_file}")
        if len(missing_files) > 5:
            logger.warning(f"  ... and {len(missing_files) - 5} more")

    logger.info(f"✅ Found {len(valid_files)} valid files out of {len(image_files)} in list")
    return valid_files


def get_image_files_from_csv(csv_file: str, status_filter: str = "fail") -> List[str]:
    """
    Read image file paths from a CSV file with the header ``image_path,status``.

    Args:
        csv_file: Path of the CSV file.
        status_filter: Keep only rows whose status equals this value (case-insensitive).

    Returns:
        List of image file paths.
    """
    logger.info(f"📄 Reading image files from CSV: {csv_file}")

    image_files = []
    with open(csv_file, 'r', encoding='utf-8') as f:
        for line in f:
            # Split on "," and read the file path and status; the header row is
            # skipped implicitly because its status column never matches the filter.
            parts = line.strip().split(",")
            if len(parts) >= 2:
                image_file, status = parts[0], parts[1]
                if status.lower() == status_filter.lower():
                    image_files.append(image_file)

    return image_files


def collect_pid_files(pid_output_file: str) -> List[Tuple[str, str]]:
    """
    Collect processed files from a per-process output file.

    Expected format of a single process's result file::

        "results": [
            {
                "image_path": "docstructbench_dianzishu_zhongwenzaixian-o.O-61520612.pdf_140.jpg",
                "processing_time": 2.0265579223632812e-06,
                "success": true,
                "device": "gpu:3",
                "output_json": "/home/ubuntu/zhch/PaddleX/zhch/OmniDocBench_Results_Scheduler/process_3/docstructbench_dianzishu_zhongwenzaixian-o.O-61520612.pdf_140.json",
                "output_md": "/home/ubuntu/zhch/PaddleX/zhch/OmniDocBench_Results_Scheduler/process_3/docstructbench_dianzishu_zhongwenzaixian-o.O-61520612.pdf_140.md"
            },
            ...
        ]

    Args:
        pid_output_file: Path of the per-process output file.

    Returns:
        List of (file path, status) tuples.
    """
    if not Path(pid_output_file).exists():
        logger.warning(f"⚠️ Warning: PID output file not found: {pid_output_file}")
        return []

    with open(pid_output_file, 'r', encoding='utf-8') as f:
        data = json.load(f)

    if not isinstance(data, dict) or "results" not in data:
        logger.warning(f"⚠️ Warning: Invalid PID output file format: {pid_output_file}")
        return []

    # Return (image_path, status); status is "success" when "success" is true, otherwise "fail".
    file_list = []
    for file_result in data.get("results", []):
        image_path = file_result.get("image_path", "")
        status = "success" if file_result.get("success", False) else "fail"
        file_list.append((image_path, status))

    return file_list
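
# Usage sketch (illustrative; file names are placeholders): the (image_path, status)
# pairs collected here can be written out in the "image_path,status" CSV format that
# get_image_files_from_csv() reads, so failed images can be re-queued on a later run.
#
#   rows = collect_pid_files("process_3_results.json")
#   with open("retry_list.csv", "w", encoding="utf-8") as f:
#       f.write("image_path,status\n")
#       for image_path, status in rows:
#           f.write(f"{image_path},{status}\n")
#   failed = get_image_files_from_csv("retry_list.csv", status_filter="fail")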


def convert_pdf_to_images(
    pdf_file: str,
    output_dir: str | None = None,
    dpi: int = 200,
    page_range: str | None = None
) -> List[str]:
    """
    Convert a PDF into image files, optionally restricted to a page range.

    Args:
        pdf_file: Path of the PDF file.
        output_dir: Output directory.
        dpi: Image resolution.
        page_range: Page-range string such as "1-5,7,9-12".

    Returns:
        List of generated image file paths.
    """
    pdf_path = Path(pdf_file)
    if not pdf_path.exists() or pdf_path.suffix.lower() != '.pdf':
        logger.error(f"❌ Invalid PDF file: {pdf_path}")
        return []

    # Default to a directory named after the PDF when no output directory is given.
    if output_dir is None:
        output_path = pdf_path.parent / f"{pdf_path.stem}"
    else:
        output_path = Path(output_dir) / f"{pdf_path.stem}"
    output_path = output_path.resolve()
    output_path.mkdir(parents=True, exist_ok=True)

    try:
        # Load the PDF pages with MinerU's helper.
        if not MINERU_AVAILABLE or load_images_from_pdf is None or ImageType is None:
            logger.error("❌ MinerU components not available for PDF to image conversion")
            return []

        images, _ = load_images_from_pdf(
            pdf_path.read_bytes(),
            dpi=dpi,
            image_type=ImageType.PIL  # returns a list of dicts containing img_pil
        )

        # Apply the page-range filter.
        selected_pages = None
        if page_range:
            from .pdf_utils import PDFUtils
            total_pages = len(images)
            selected_pages = PDFUtils.parse_page_range(page_range, total_pages)
            if selected_pages:
                images = [images[i] for i in sorted(selected_pages)]
                logger.info(f"📋 PDF has {total_pages} pages, {len(images)} selected for processing")
            else:
                logger.warning(f"⚠️ Page range '{page_range}' did not match any valid pages")
                return []

        image_paths = []
        # Track the original page indices so output files keep the original page numbers.
        original_indices = sorted(selected_pages) if selected_pages else list(range(len(images)))

        for idx, image in enumerate(images):
            # Original page index (used only for file naming).
            original_idx = original_indices[idx] if selected_pages else idx

            # Build the image file name (1-based original page number).
            image_filename = f"{pdf_path.stem}_page_{original_idx + 1:03d}.png"
            image_path = output_path / image_filename

            # Save the image; extract img_pil when the loader returned a dict.
            if isinstance(image, dict):
                pil_image = image.get('img_pil')
                if pil_image is None:
                    logger.error(f"❌ Image dict at index {idx} does not contain 'img_pil' key")
                    continue
                pil_image.save(str(image_path))
            else:
                # Not a dict: assume it is a PIL Image.
                image.save(str(image_path))

            image_paths.append(str(image_path))

        logger.info(f"✅ Converted {len(images)} pages from {pdf_path.name} to images")
        return image_paths

    except Exception as e:
        logger.error(f"❌ Error converting PDF {pdf_path}: {e}")
        traceback.print_exc()
        return []
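
# Usage sketch (illustrative; the PDF path is a placeholder): rasterize the first five
# pages of a PDF at 300 DPI into ./pages/<pdf-stem>/.
#
#   page_images = convert_pdf_to_images("sample.pdf", output_dir="./pages", dpi=300, page_range="1-5")
#   for path in page_images:
#       logger.info(f"rendered {path}")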


def get_input_files(args, page_range: str | None = None) -> List[str]:
    """
    Build the input file list, handling PDFs and images uniformly and supporting
    page-range filtering.

    The input type is detected automatically:
    - a file path is treated as a PDF or an image depending on its extension
    - a directory is scanned for all PDF and image files
    - a CSV file is read as a file list
    - a text file is read as a file list

    Args:
        args: Command-line argument object; must provide the attributes
            ``input``, ``output_dir`` and ``pdf_dpi``.
        page_range: Optional page-range string such as "1-5,7,9-12".

    Returns:
        List of image file paths to process.
    """
    input_files = []
    input_path = Path(args.input)

    if not input_path.exists():
        logger.error(f"❌ Input path does not exist: {input_path}")
        return []

    # Detect the input type.
    if input_path.is_file():
        # Single file.
        if input_path.suffix.lower() == '.pdf':
            # PDF: convert to images.
            logger.info(f"📄 Processing PDF: {input_path.name}")
            pdf_images = convert_pdf_to_images(
                str(input_path),
                getattr(args, 'output_dir', None),
                dpi=getattr(args, 'pdf_dpi', 200),
                page_range=page_range  # forward the page-range filter
            )
            input_files.extend(pdf_images)
        elif input_path.suffix.lower() in ['.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif']:
            # Image: add directly.
            input_files.append(str(input_path))
        elif input_path.suffix.lower() == '.csv':
            # CSV: read the file list.
            input_files = get_image_files_from_csv(str(input_path), "fail")
        elif input_path.suffix.lower() in ['.txt', '.list']:
            # Text file: read the file list.
            input_files = get_image_files_from_list(str(input_path))
        else:
            logger.warning(f"⚠️ Unsupported file type: {input_path.suffix}")

    elif input_path.is_dir():
        # Directory: scan for all PDF and image files.
        image_extensions = ['.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif']
        pdf_extensions = ['.pdf']

        raw_files = []
        for ext in image_extensions + pdf_extensions:
            raw_files.extend(list(input_path.glob(f"*{ext}")))
            raw_files.extend(list(input_path.glob(f"*{ext.upper()}")))

        # Separate PDFs from images.
        pdf_files = [f for f in sorted(set(raw_files)) if f.suffix.lower() == '.pdf']
        image_files = [f for f in sorted(set(raw_files)) if f.suffix.lower() in image_extensions]

        # For image directories, apply the page-range filter to the image list.
        if page_range and image_files:
            from .pdf_utils import PDFUtils
            total_pages = len(image_files)
            selected_pages = PDFUtils.parse_page_range(page_range, total_pages)
            if selected_pages:
                image_files = [image_files[i] for i in sorted(selected_pages)]
                logger.info(f"📋 Directory contains {total_pages} images, {len(image_files)} selected for processing")
            else:
                logger.warning(f"⚠️ Page range '{page_range}' did not match any valid images")
                image_files = []

        # Process PDFs and images separately.
        pdf_count = 0
        image_count = 0

        for file_path in pdf_files:
            # Convert each PDF to images.
            logger.info(f"📄 Processing PDF: {file_path.name}")
            pdf_images = convert_pdf_to_images(
                str(file_path),
                getattr(args, 'output_dir', None),
                dpi=getattr(args, 'pdf_dpi', 200),
                page_range=page_range  # forward the page-range filter
            )
            input_files.extend(pdf_images)
            pdf_count += 1

        for file_path in image_files:
            # Add image files directly.
            input_files.append(str(file_path))
            image_count += 1

        logger.info("📊 Input summary:")
        logger.info(f"  PDF files processed: {pdf_count}")
        logger.info(f"  Image files found: {image_count}")

    logger.info(f"📊 Total image files to process: {len(input_files)}")
    return sorted(set(str(f) for f in input_files))
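

if __name__ == "__main__":
    # Minimal smoke-test entry point (an illustrative addition, not part of the original
    # callers): get_input_files() only reads the ``input``, ``output_dir`` and ``pdf_dpi``
    # attributes from its args object, so a small argparse namespace is enough.
    import argparse

    parser = argparse.ArgumentParser(description="List the image files that would be processed")
    parser.add_argument("input", help="PDF/image file, directory, .txt/.list file, or CSV")
    parser.add_argument("--output_dir", default=None, help="Directory for rendered PDF page images")
    parser.add_argument("--pdf_dpi", type=int, default=200, help="DPI used when rasterizing PDFs")
    parser.add_argument("--page_range", default=None, help='Page range such as "1-5,7,9-12"')
    cli_args = parser.parse_args()

    for path in get_input_files(cli_args, page_range=cli_args.page_range):
        logger.info(path)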