| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496 |
- """
- 文件处理工具模块
- 提供文件处理相关功能:
- - 输入文件获取(支持文件/目录/列表/CSV)
- - PDF转图片
- - 文件列表处理
- """
- import tempfile
- import re
- from pathlib import Path
- from typing import List, Tuple, Optional, Set
- import json
- import traceback
- from loguru import logger
- try:
- from mineru.utils.pdf_image_tools import load_images_from_pdf
- from mineru.utils.enum_class import ImageType
- MINERU_AVAILABLE = True
- except ImportError:
- MINERU_AVAILABLE = False
- load_images_from_pdf = None
- ImageType = None
- def parse_page_range(page_range: Optional[str], total_pages: int) -> Set[int]:
- """
- 解析页面范围字符串
-
- 支持格式:
- - "1-5" → {0, 1, 2, 3, 4}(页码从1开始,内部转为0-based索引)
- - "3" → {2}
- - "1-5,7,9-12" → {0, 1, 2, 3, 4, 6, 8, 9, 10, 11}
- - "1-" → 从第1页到最后
- - "-5" → 从第1页到第5页
-
- Args:
- page_range: 页面范围字符串(页码从1开始)
- total_pages: 总页数
-
- Returns:
- 页面索引集合(0-based)
- """
- if not page_range or not page_range.strip():
- return set(range(total_pages))
-
- pages = set()
- parts = page_range.replace(' ', '').split(',')
-
- for part in parts:
- part = part.strip()
- if not part:
- continue
-
- if '-' in part:
- # 范围格式
- match = re.match(r'^(\d*)-(\d*)$', part)
- if match:
- start_str, end_str = match.groups()
- start = int(start_str) if start_str else 1
- end = int(end_str) if end_str else total_pages
-
- # 转换为 0-based 索引
- start = max(0, start - 1)
- end = min(total_pages, end)
-
- pages.update(range(start, end))
- else:
- # 单页
- try:
- page_num = int(part)
- if 1 <= page_num <= total_pages:
- pages.add(page_num - 1) # 转换为 0-based 索引
- except ValueError:
- logger.warning(f"Invalid page number: {part}")
-
- return pages
- def split_files(file_list: List[str], num_splits: int) -> List[List[str]]:
- """
- 将文件列表分割成指定数量的子列表
-
- Args:
- file_list: 文件路径列表
- num_splits: 分割数量
-
- Returns:
- 分割后的文件列表
- """
- if num_splits <= 0:
- return [file_list]
-
- chunk_size = len(file_list) // num_splits
- remainder = len(file_list) % num_splits
-
- chunks = []
- start = 0
-
- for i in range(num_splits):
- # 前remainder个chunk多分配一个文件
- current_chunk_size = chunk_size + (1 if i < remainder else 0)
- if current_chunk_size > 0:
- chunks.append(file_list[start:start + current_chunk_size])
- start += current_chunk_size
-
- return [chunk for chunk in chunks if chunk] # 过滤空列表
- def create_temp_file_list(file_chunk: List[str]) -> str:
- """
- 创建临时文件列表文件
-
- Args:
- file_chunk: 文件路径列表
-
- Returns:
- 临时文件路径
- """
- with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
- for file_path in file_chunk:
- f.write(f"{file_path}\n")
- return f.name
- def get_image_files_from_dir(input_dir: Path, pattern: str = "*", max_files: int | None = None) -> List[str]:
- """
- 从目录获取图像文件列表
-
- Args:
- input_dir: 输入目录
- pattern: 文件名模式
- max_files: 最大文件数量限制
-
- Returns:
- 图像文件路径列表
- """
- image_extensions = ['.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif']
- image_files = []
-
- for ext in image_extensions:
- image_files.extend(list(input_dir.glob(f"{pattern}{ext}")))
- image_files.extend(list(input_dir.glob(f"{pattern}{ext.upper()}")))
-
- # 去重并排序
- image_files = sorted(list(set(str(f) for f in image_files)))
-
- # 限制文件数量
- if max_files:
- image_files = image_files[:max_files]
-
- return image_files
- def get_image_files_from_list(file_list_path: str) -> List[str]:
- """
- 从文件列表获取图像文件列表
-
- Args:
- file_list_path: 文件列表路径
-
- Returns:
- 图像文件路径列表
- """
- logger.info(f"📄 Reading file list from: {file_list_path}")
-
- with open(file_list_path, 'r', encoding='utf-8') as f:
- image_files = [line.strip() for line in f if line.strip()]
-
- # 验证文件存在性
- valid_files = []
- missing_files = []
-
- for file_path in image_files:
- if Path(file_path).exists():
- valid_files.append(file_path)
- else:
- missing_files.append(file_path)
-
- if missing_files:
- logger.warning(f"⚠️ Warning: {len(missing_files)} files not found:")
- for missing_file in missing_files[:5]: # 只显示前5个
- logger.warning(f" - {missing_file}")
- if len(missing_files) > 5:
- logger.warning(f" ... and {len(missing_files) - 5} more")
-
- logger.info(f"✅ Found {len(valid_files)} valid files out of {len(image_files)} in list")
- return valid_files
- def get_image_files_from_csv(csv_file: str, status_filter: str = "fail") -> List[str]:
- """
- 从CSV文件获取图像文件列表
- Args:
- csv_file: CSV文件路径
- status_filter: 状态过滤器
- Returns:
- 图像文件路径列表
- """
- logger.info(f"📄 Reading image files from CSV: {csv_file}")
- # 读取CSV文件, 表头:image_path,status
- image_files = []
- with open(csv_file, 'r', encoding='utf-8') as f:
- for line in f:
- # 需要去掉表头, 按","分割,读取文件名,状态
- parts = line.strip().split(",")
- if len(parts) >= 2:
- image_file, status = parts[0], parts[1]
- if status.lower() == status_filter.lower():
- image_files.append(image_file)
- return image_files
- def collect_pid_files(pid_output_file: str) -> List[Tuple[str, str]]:
- """
- 从进程输出文件中收集文件
- Args:
- pid_output_file: 进程输出文件路径
- Returns:
- 文件列表(文件路径,处理结果)
- """
- """
- 单进程结果统计文件格式
- "results": [
- {
- "image_path": "docstructbench_dianzishu_zhongwenzaixian-o.O-61520612.pdf_140.jpg",
- "processing_time": 2.0265579223632812e-06,
- "success": true,
- "device": "gpu:3",
- "output_json": "/home/ubuntu/zhch/PaddleX/zhch/OmniDocBench_Results_Scheduler/process_3/docstructbench_dianzishu_zhongwenzaixian-o.O-61520612.pdf_140.json",
- "output_md": "/home/ubuntu/zhch/PaddleX/zhch/OmniDocBench_Results_Scheduler/process_3/docstructbench_dianzishu_zhongwenzaixian-o.O-61520612.pdf_140.md"
- },
- ...
- """
- if not Path(pid_output_file).exists():
- logger.warning(f"⚠️ Warning: PID output file not found: {pid_output_file}")
- return []
- with open(pid_output_file, 'r', encoding='utf-8') as f:
- data = json.load(f)
- if not isinstance(data, dict) or "results" not in data:
- logger.warning(f"⚠️ Warning: Invalid PID output file format: {pid_output_file}")
- return []
- # 返回文件路径和处理状态, 如果"success": True, 则状态为"success", 否则为"fail"
- file_list = []
- for file_result in data.get("results", []):
- image_path = file_result.get("image_path", "")
- status = "success" if file_result.get("success", False) else "fail"
- file_list.append((image_path, status))
- return file_list
- def convert_pdf_to_images(
- pdf_file: str,
- output_dir: str | None = None,
- dpi: int = 200,
- page_range: str | None = None
- ) -> List[str]:
- """
- 将PDF转换为图像文件,支持页面范围过滤
-
- Args:
- pdf_file: PDF文件路径
- output_dir: 输出目录
- dpi: 图像分辨率
- page_range: 页面范围字符串,如 "1-5,7,9-12"
-
- Returns:
- 生成的图像文件路径列表
- """
- pdf_path = Path(pdf_file)
- if not pdf_path.exists() or pdf_path.suffix.lower() != '.pdf':
- logger.error(f"❌ Invalid PDF file: {pdf_path}")
- return []
- # 如果没有指定输出目录,使用PDF同名目录
- if output_dir is None:
- output_path = pdf_path.parent / f"{pdf_path.stem}"
- else:
- output_path = Path(output_dir) / f"{pdf_path.stem}"
- output_path = output_path.resolve()
- output_path.mkdir(parents=True, exist_ok=True)
- try:
- # 优先使用 MinerU 的函数(如果可用)
- if MINERU_AVAILABLE and load_images_from_pdf is not None and ImageType is not None:
- images, _ = load_images_from_pdf(
- pdf_path.read_bytes(),
- dpi=dpi,
- image_type=ImageType.PIL # 返回包含 img_pil 的字典列表
- )
-
- # 应用页面范围过滤
- selected_pages = None
- if page_range:
- total_pages = len(images)
- selected_pages = parse_page_range(page_range, total_pages)
- if selected_pages:
- images = [images[i] for i in sorted(selected_pages)]
- logger.info(f"📋 PDF 共 {total_pages} 页,选择处理 {len(images)} 页")
- else:
- logger.warning(f"⚠️ 页面范围 '{page_range}' 没有匹配到任何有效页面")
- return []
- else:
- selected_pages = None
-
- image_paths = []
- # 需要跟踪原始页码索引,以便正确命名文件
- original_indices = sorted(selected_pages) if selected_pages else list(range(len(images)))
-
- for idx, image in enumerate(images):
- # 获取原始页码索引(用于文件命名)
- original_idx = original_indices[idx] if selected_pages else idx
- # 生成图像文件名(使用原始页码,从1开始)
- image_filename = f"{pdf_path.stem}_page_{original_idx + 1:03d}.png"
- image_path = output_path / image_filename
- # 保存图像 - 从字典中提取 img_pil
- if isinstance(image, dict):
- pil_image = image.get('img_pil')
- if pil_image is None:
- logger.error(f"❌ Image dict at index {idx} does not contain 'img_pil' key")
- continue
- pil_image.save(str(image_path))
- else:
- # 如果不是字典,假设是直接的 PIL Image
- image.save(str(image_path))
- image_paths.append(str(image_path))
-
- logger.info(f"✅ Converted {len(images)} pages from {pdf_path.name} to images (using MinerU)")
- return image_paths
-
- else:
- # Fallback: 使用 pypdfium2(PaddleX 环境中可用)
- logger.info("ℹ️ MinerU 不可用,使用 pypdfium2 进行 PDF 转图像")
- try:
- import pypdfium2 as pdfium
- except ImportError:
- logger.error("❌ pypdfium2 未安装,无法转换 PDF。请安装: pip install pypdfium2")
- return []
-
- pdf_doc = pdfium.PdfDocument(pdf_path)
- try:
- total_pages = len(pdf_doc)
-
- # 解析页面范围(使用本地函数,不依赖 PDFUtils)
- selected_pages = parse_page_range(page_range, total_pages)
- if not selected_pages:
- logger.warning(f"⚠️ 页面范围 '{page_range}' 没有匹配到任何有效页面")
- return []
-
- if page_range:
- logger.info(f"📋 PDF 共 {total_pages} 页,选择处理 {len(selected_pages)} 页")
-
- # 计算缩放比例(DPI 转换)
- # pypdfium2 的 scale 参数:1.0 = 72 DPI,所以 dpi/72 = scale
- scale = dpi / 72.0
-
- image_paths = []
- for page_idx in sorted(selected_pages):
- page = pdf_doc[page_idx]
-
- # 渲染页面为图像
- bitmap = page.render(scale=scale)
- pil_image = bitmap.to_pil()
-
- # 生成图像文件名(页码从1开始)
- image_filename = f"{pdf_path.stem}_page_{page_idx + 1:03d}.png"
- image_path = output_path / image_filename
-
- # 保存图像
- pil_image.save(str(image_path))
- image_paths.append(str(image_path))
-
- logger.info(f"✅ Converted {len(image_paths)} pages from {pdf_path.name} to images (using pypdfium2)")
- return image_paths
-
- finally:
- pdf_doc.close()
-
- except Exception as e:
- logger.error(f"❌ Error converting PDF {pdf_path}: {e}")
- traceback.print_exc()
- return []
- def get_input_files(args, page_range: str | None = None) -> List[str]:
- """
- 获取输入文件列表,统一处理PDF和图像文件,支持页面范围过滤
-
- 支持自动判断输入类型:
- - 如果是文件路径,判断是PDF还是图片
- - 如果是目录,扫描所有PDF和图片文件
- - 如果是CSV文件,读取文件列表
- - 如果是文本文件,读取文件列表
-
- Args:
- args: 命令行参数对象,需要包含 input, output_dir, pdf_dpi 属性
- page_range: 页面范围字符串(可选),如 "1-5,7,9-12"
-
- Returns:
- 处理后的图像文件路径列表
- """
- input_files = []
- input_path = Path(args.input)
-
- if not input_path.exists():
- logger.error(f"❌ Input path does not exist: {input_path}")
- return []
-
- # 判断输入类型
- if input_path.is_file():
- # 单个文件
- if input_path.suffix.lower() == '.pdf':
- # PDF文件:转换为图片
- logger.info(f"📄 Processing PDF: {input_path.name}")
- pdf_images = convert_pdf_to_images(
- str(input_path),
- getattr(args, 'output_dir', None),
- dpi=getattr(args, 'pdf_dpi', 200),
- page_range=page_range # 传递页面范围参数
- )
- input_files.extend(pdf_images)
- elif input_path.suffix.lower() in ['.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif']:
- # 图片文件:直接添加
- input_files.append(str(input_path))
- elif input_path.suffix.lower() == '.csv':
- # CSV文件:读取文件列表
- input_files = get_image_files_from_csv(str(input_path), "fail")
- elif input_path.suffix.lower() in ['.txt', '.list']:
- # 文本文件:读取文件列表
- input_files = get_image_files_from_list(str(input_path))
- else:
- logger.warning(f"⚠️ Unsupported file type: {input_path.suffix}")
-
- elif input_path.is_dir():
- # 目录:扫描所有PDF和图片文件
- image_extensions = ['.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.tif']
- pdf_extensions = ['.pdf']
-
- raw_files = []
- for ext in image_extensions + pdf_extensions:
- raw_files.extend(list(input_path.glob(f"*{ext}")))
- raw_files.extend(list(input_path.glob(f"*{ext.upper()}")))
-
- # 分离PDF和图像文件
- pdf_files = [f for f in sorted(set(raw_files)) if f.suffix.lower() == '.pdf']
- image_files = [f for f in sorted(set(raw_files)) if f.suffix.lower() in image_extensions]
-
- # 对于图片目录,应用页面范围过滤
- if page_range and image_files:
- total_pages = len(image_files)
- selected_pages = parse_page_range(page_range, total_pages)
- if selected_pages:
- image_files = [image_files[i] for i in sorted(selected_pages)]
- logger.info(f"📋 图片目录共 {total_pages} 张,选择处理 {len(image_files)} 张")
- else:
- logger.warning(f"⚠️ 页面范围 '{page_range}' 没有匹配到任何有效图片")
- image_files = []
-
- # 分别处理PDF和图像文件
- pdf_count = 0
- image_count = 0
-
- for file_path in pdf_files:
- # 转换PDF为图像
- logger.info(f"📄 Processing PDF: {file_path.name}")
- pdf_images = convert_pdf_to_images(
- str(file_path),
- getattr(args, 'output_dir', None),
- dpi=getattr(args, 'pdf_dpi', 200),
- page_range=page_range # 传递页面范围参数
- )
- input_files.extend(pdf_images)
- pdf_count += 1
-
- for file_path in image_files:
- # 直接添加图像文件
- input_files.append(str(file_path))
- image_count += 1
-
- logger.info(f"📊 Input summary:")
- logger.info(f" PDF files processed: {pdf_count}")
- logger.info(f" Image files found: {image_count}")
-
- logger.info(f"📊 Total image files to process: {len(input_files)}")
-
- return sorted(list(set(str(f) for f in input_files)))
|