""" 文件处理工具模块 提供文件处理相关功能: - 输入文件获取(支持文件/目录/列表/CSV) - PDF转图片 - 文件列表处理 """ import tempfile import re from pathlib import Path from typing import List, Tuple, Optional, Set import json import traceback from loguru import logger try: from mineru.utils.pdf_image_tools import load_images_from_pdf from mineru.utils.enum_class import ImageType MINERU_AVAILABLE = True except ImportError: MINERU_AVAILABLE = False load_images_from_pdf = None ImageType = None def parse_page_range(page_range: Optional[str], total_pages: int) -> Set[int]: """ 解析页面范围字符串 支持格式: - "1-5" → {0, 1, 2, 3, 4}(页码从1开始,内部转为0-based索引) - "3" → {2} - "1-5,7,9-12" → {0, 1, 2, 3, 4, 6, 8, 9, 10, 11} - "1-" → 从第1页到最后 - "-5" → 从第1页到第5页 Args: page_range: 页面范围字符串(页码从1开始) total_pages: 总页数 Returns: 页面索引集合(0-based) """ if not page_range or not page_range.strip(): return set(range(total_pages)) pages = set() parts = page_range.replace(' ', '').split(',') for part in parts: part = part.strip() if not part: continue if '-' in part: # 范围格式 match = re.match(r'^(\d*)-(\d*)$', part) if match: start_str, end_str = match.groups() start = int(start_str) if start_str else 1 end = int(end_str) if end_str else total_pages # 转换为 0-based 索引 start = max(0, start - 1) end = min(total_pages, end) pages.update(range(start, end)) else: # 单页 try: page_num = int(part) if 1 <= page_num <= total_pages: pages.add(page_num - 1) # 转换为 0-based 索引 except ValueError: logger.warning(f"Invalid page number: {part}") return pages def split_files(file_list: List[str], num_splits: int) -> List[List[str]]: """ 将文件列表分割成指定数量的子列表 Args: file_list: 文件路径列表 num_splits: 分割数量 Returns: 分割后的文件列表 """ if num_splits <= 0: return [file_list] chunk_size = len(file_list) // num_splits remainder = len(file_list) % num_splits chunks = [] start = 0 for i in range(num_splits): # 前remainder个chunk多分配一个文件 current_chunk_size = chunk_size + (1 if i < remainder else 0) if current_chunk_size > 0: chunks.append(file_list[start:start + current_chunk_size]) start += current_chunk_size return [chunk for chunk in chunks if chunk] # 过滤空列表 def create_temp_file_list(file_chunk: List[str]) -> str: """ 创建临时文件列表文件 Args: file_chunk: 文件路径列表 Returns: 临时文件路径 """ with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f: for file_path in file_chunk: f.write(f"{file_path}\n") return f.name def get_image_files_from_dir(input_dir: Path, pattern: str = "*", max_files: int | None = None) -> List[str]: """ 从目录获取图像文件列表 Args: input_dir: 输入目录 pattern: 文件名模式 max_files: 最大文件数量限制 Returns: 图像文件路径列表 """ image_extensions = ['.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif'] image_files = [] for ext in image_extensions: image_files.extend(list(input_dir.glob(f"{pattern}{ext}"))) image_files.extend(list(input_dir.glob(f"{pattern}{ext.upper()}"))) # 去重并排序 image_files = sorted(list(set(str(f) for f in image_files))) # 限制文件数量 if max_files: image_files = image_files[:max_files] return image_files def get_image_files_from_list(file_list_path: str) -> List[str]: """ 从文件列表获取图像文件列表 Args: file_list_path: 文件列表路径 Returns: 图像文件路径列表 """ logger.info(f"📄 Reading file list from: {file_list_path}") with open(file_list_path, 'r', encoding='utf-8') as f: image_files = [line.strip() for line in f if line.strip()] # 验证文件存在性 valid_files = [] missing_files = [] for file_path in image_files: if Path(file_path).exists(): valid_files.append(file_path) else: missing_files.append(file_path) if missing_files: logger.warning(f"⚠️ Warning: {len(missing_files)} files not found:") for missing_file in missing_files[:5]: # 只显示前5个 logger.warning(f" - {missing_file}") if len(missing_files) > 5: logger.warning(f" ... and {len(missing_files) - 5} more") logger.info(f"✅ Found {len(valid_files)} valid files out of {len(image_files)} in list") return valid_files def get_image_files_from_csv(csv_file: str, status_filter: str = "fail") -> List[str]: """ 从CSV文件获取图像文件列表 Args: csv_file: CSV文件路径 status_filter: 状态过滤器 Returns: 图像文件路径列表 """ logger.info(f"📄 Reading image files from CSV: {csv_file}") # 读取CSV文件, 表头:image_path,status image_files = [] with open(csv_file, 'r', encoding='utf-8') as f: for line in f: # 需要去掉表头, 按","分割,读取文件名,状态 parts = line.strip().split(",") if len(parts) >= 2: image_file, status = parts[0], parts[1] if status.lower() == status_filter.lower(): image_files.append(image_file) return image_files def collect_pid_files(pid_output_file: str) -> List[Tuple[str, str]]: """ 从进程输出文件中收集文件 Args: pid_output_file: 进程输出文件路径 Returns: 文件列表(文件路径,处理结果) """ """ 单进程结果统计文件格式 "results": [ { "image_path": "docstructbench_dianzishu_zhongwenzaixian-o.O-61520612.pdf_140.jpg", "processing_time": 2.0265579223632812e-06, "success": true, "device": "gpu:3", "output_json": "/home/ubuntu/zhch/PaddleX/zhch/OmniDocBench_Results_Scheduler/process_3/docstructbench_dianzishu_zhongwenzaixian-o.O-61520612.pdf_140.json", "output_md": "/home/ubuntu/zhch/PaddleX/zhch/OmniDocBench_Results_Scheduler/process_3/docstructbench_dianzishu_zhongwenzaixian-o.O-61520612.pdf_140.md" }, ... """ if not Path(pid_output_file).exists(): logger.warning(f"⚠️ Warning: PID output file not found: {pid_output_file}") return [] with open(pid_output_file, 'r', encoding='utf-8') as f: data = json.load(f) if not isinstance(data, dict) or "results" not in data: logger.warning(f"⚠️ Warning: Invalid PID output file format: {pid_output_file}") return [] # 返回文件路径和处理状态, 如果"success": True, 则状态为"success", 否则为"fail" file_list = [] for file_result in data.get("results", []): image_path = file_result.get("image_path", "") status = "success" if file_result.get("success", False) else "fail" file_list.append((image_path, status)) return file_list def convert_pdf_to_images( pdf_file: str, output_dir: str | None = None, dpi: int = 200, page_range: str | None = None ) -> List[str]: """ 将PDF转换为图像文件,支持页面范围过滤 Args: pdf_file: PDF文件路径 output_dir: 输出目录 dpi: 图像分辨率 page_range: 页面范围字符串,如 "1-5,7,9-12" Returns: 生成的图像文件路径列表 """ pdf_path = Path(pdf_file) if not pdf_path.exists() or pdf_path.suffix.lower() != '.pdf': logger.error(f"❌ Invalid PDF file: {pdf_path}") return [] # 如果没有指定输出目录,使用PDF同名目录 if output_dir is None: output_path = pdf_path.parent / f"{pdf_path.stem}" else: output_path = Path(output_dir) / f"{pdf_path.stem}" output_path = output_path.resolve() output_path.mkdir(parents=True, exist_ok=True) try: # 优先使用 MinerU 的函数(如果可用) if MINERU_AVAILABLE and load_images_from_pdf is not None and ImageType is not None: images, _ = load_images_from_pdf( pdf_path.read_bytes(), dpi=dpi, image_type=ImageType.PIL # 返回包含 img_pil 的字典列表 ) # 应用页面范围过滤 selected_pages = None if page_range: total_pages = len(images) selected_pages = parse_page_range(page_range, total_pages) if selected_pages: images = [images[i] for i in sorted(selected_pages)] logger.info(f"📋 PDF 共 {total_pages} 页,选择处理 {len(images)} 页") else: logger.warning(f"⚠️ 页面范围 '{page_range}' 没有匹配到任何有效页面") return [] else: selected_pages = None image_paths = [] # 需要跟踪原始页码索引,以便正确命名文件 original_indices = sorted(selected_pages) if selected_pages else list(range(len(images))) for idx, image in enumerate(images): # 获取原始页码索引(用于文件命名) original_idx = original_indices[idx] if selected_pages else idx # 生成图像文件名(使用原始页码,从1开始) image_filename = f"{pdf_path.stem}_page_{original_idx + 1:03d}.png" image_path = output_path / image_filename # 保存图像 - 从字典中提取 img_pil if isinstance(image, dict): pil_image = image.get('img_pil') if pil_image is None: logger.error(f"❌ Image dict at index {idx} does not contain 'img_pil' key") continue pil_image.save(str(image_path)) else: # 如果不是字典,假设是直接的 PIL Image image.save(str(image_path)) image_paths.append(str(image_path)) logger.info(f"✅ Converted {len(images)} pages from {pdf_path.name} to images (using MinerU)") return image_paths else: # Fallback: 使用 pypdfium2(PaddleX 环境中可用) logger.info("ℹ️ MinerU 不可用,使用 pypdfium2 进行 PDF 转图像") try: import pypdfium2 as pdfium except ImportError: logger.error("❌ pypdfium2 未安装,无法转换 PDF。请安装: pip install pypdfium2") return [] pdf_doc = pdfium.PdfDocument(pdf_path) try: total_pages = len(pdf_doc) # 解析页面范围(使用本地函数,不依赖 PDFUtils) selected_pages = parse_page_range(page_range, total_pages) if not selected_pages: logger.warning(f"⚠️ 页面范围 '{page_range}' 没有匹配到任何有效页面") return [] if page_range: logger.info(f"📋 PDF 共 {total_pages} 页,选择处理 {len(selected_pages)} 页") # 计算缩放比例(DPI 转换) # pypdfium2 的 scale 参数:1.0 = 72 DPI,所以 dpi/72 = scale scale = dpi / 72.0 image_paths = [] for page_idx in sorted(selected_pages): page = pdf_doc[page_idx] # 渲染页面为图像 bitmap = page.render(scale=scale) pil_image = bitmap.to_pil() # 生成图像文件名(页码从1开始) image_filename = f"{pdf_path.stem}_page_{page_idx + 1:03d}.png" image_path = output_path / image_filename # 保存图像 pil_image.save(str(image_path)) image_paths.append(str(image_path)) logger.info(f"✅ Converted {len(image_paths)} pages from {pdf_path.name} to images (using pypdfium2)") return image_paths finally: pdf_doc.close() except Exception as e: logger.error(f"❌ Error converting PDF {pdf_path}: {e}") traceback.print_exc() return [] def get_input_files(args, page_range: str | None = None) -> List[str]: """ 获取输入文件列表,统一处理PDF和图像文件,支持页面范围过滤 支持自动判断输入类型: - 如果是文件路径,判断是PDF还是图片 - 如果是目录,扫描所有PDF和图片文件 - 如果是CSV文件,读取文件列表 - 如果是文本文件,读取文件列表 Args: args: 命令行参数对象,需要包含 input, output_dir, pdf_dpi 属性 page_range: 页面范围字符串(可选),如 "1-5,7,9-12" Returns: 处理后的图像文件路径列表 """ input_files = [] input_path = Path(args.input) if not input_path.exists(): logger.error(f"❌ Input path does not exist: {input_path}") return [] # 判断输入类型 if input_path.is_file(): # 单个文件 if input_path.suffix.lower() == '.pdf': # PDF文件:转换为图片 logger.info(f"📄 Processing PDF: {input_path.name}") pdf_images = convert_pdf_to_images( str(input_path), getattr(args, 'output_dir', None), dpi=getattr(args, 'pdf_dpi', 200), page_range=page_range # 传递页面范围参数 ) input_files.extend(pdf_images) elif input_path.suffix.lower() in ['.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif']: # 图片文件:直接添加 input_files.append(str(input_path)) elif input_path.suffix.lower() == '.csv': # CSV文件:读取文件列表 input_files = get_image_files_from_csv(str(input_path), "fail") elif input_path.suffix.lower() in ['.txt', '.list']: # 文本文件:读取文件列表 input_files = get_image_files_from_list(str(input_path)) else: logger.warning(f"⚠️ Unsupported file type: {input_path.suffix}") elif input_path.is_dir(): # 目录:扫描所有PDF和图片文件 image_extensions = ['.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif'] pdf_extensions = ['.pdf'] raw_files = [] for ext in image_extensions + pdf_extensions: raw_files.extend(list(input_path.glob(f"*{ext}"))) raw_files.extend(list(input_path.glob(f"*{ext.upper()}"))) # 分离PDF和图像文件 pdf_files = [f for f in sorted(set(raw_files)) if f.suffix.lower() == '.pdf'] image_files = [f for f in sorted(set(raw_files)) if f.suffix.lower() in image_extensions] # 对于图片目录,应用页面范围过滤 if page_range and image_files: total_pages = len(image_files) selected_pages = parse_page_range(page_range, total_pages) if selected_pages: image_files = [image_files[i] for i in sorted(selected_pages)] logger.info(f"📋 图片目录共 {total_pages} 张,选择处理 {len(image_files)} 张") else: logger.warning(f"⚠️ 页面范围 '{page_range}' 没有匹配到任何有效图片") image_files = [] # 分别处理PDF和图像文件 pdf_count = 0 image_count = 0 for file_path in pdf_files: # 转换PDF为图像 logger.info(f"📄 Processing PDF: {file_path.name}") pdf_images = convert_pdf_to_images( str(file_path), getattr(args, 'output_dir', None), dpi=getattr(args, 'pdf_dpi', 200), page_range=page_range # 传递页面范围参数 ) input_files.extend(pdf_images) pdf_count += 1 for file_path in image_files: # 直接添加图像文件 input_files.append(str(file_path)) image_count += 1 logger.info(f"📊 Input summary:") logger.info(f" PDF files processed: {pdf_count}") logger.info(f" Image files found: {image_count}") logger.info(f"📊 Total image files to process: {len(input_files)}") return sorted(list(set(str(f) for f in input_files)))