"""
Directory browsing API routes.

Provides file-system browsing and scanning for image + JSON file pairs.
"""

import os
import re
from pathlib import Path
from typing import List, Optional, Dict

from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel

from models.schemas import HomeDirectoryResponse

router = APIRouter(prefix="/api/directories", tags=["directories"])


class DirectoryItem(BaseModel):
    """A single entry (file or sub-directory) inside a browsed directory."""

    name: str
    path: str
    is_dir: bool
    # File size in bytes; None for directories or when stat() failed.
    size: Optional[int] = None


class BrowseResponse(BaseModel):
    """Response payload of the browse endpoint."""

    current_path: str
    # None when the current directory is the file-system root.
    parent_path: Optional[str]
    items: List[DirectoryItem]


class FilePair(BaseModel):
    """An image file together with its matching JSON file."""

    index: int
    image_path: str
    image_name: str
    json_path: str
    json_name: str
    has_structure: bool = False  # whether a structure file already exists
    structure_path: Optional[str] = None


class ScanRequest(BaseModel):
    """Request payload of the scan endpoint."""

    image_dir: str
    json_dir: str
    image_pattern: Optional[str] = r".*\.(png|jpg|jpeg)$"
    json_pattern: Optional[str] = r".*\.json$"
    output_dir: Optional[str] = None  # used to detect existing structure files


class ScanResponse(BaseModel):
    """Response payload of the scan endpoint."""

    total: int
    pairs: List[FilePair]
    labeled_count: int  # number of pairs that already have a structure file


@router.get("/browse", response_model=BrowseResponse)
async def browse_directory(
    path: str = Query(default="~", description="要浏览的目录路径"),
    show_hidden: bool = Query(default=False, description="是否显示隐藏文件"),
):
    """
    Browse the contents of a directory.

    Args:
        path: Directory path; a leading ``~`` is expanded to the user's home.
        show_hidden: Whether to include entries whose name starts with ``.``.

    Returns:
        BrowseResponse with the resolved path, its parent (None at the root)
        and the entries, directories first, then case-insensitively by name.

    Raises:
        HTTPException: 404 if the path does not exist, 400 if it is not a
            directory, 403 if the directory cannot be listed.
    """
    # NOTE(review): this coroutine performs blocking file-system calls.
    # Expand the user's home directory.
    if path.startswith("~"):
        path = os.path.expanduser(path)

    target_path = Path(path).resolve()

    if not target_path.exists():
        raise HTTPException(status_code=404, detail=f"目录不存在: {path}")

    if not target_path.is_dir():
        raise HTTPException(status_code=400, detail=f"不是目录: {path}")

    items = []
    try:
        # Pre-sorting by path keeps the final (stable) sort deterministic for
        # names that differ only by case.
        for item in sorted(target_path.iterdir()):
            # Skip hidden entries unless explicitly requested.
            if not show_hidden and item.name.startswith("."):
                continue

            try:
                is_dir = item.is_dir()
                size = None
                if not is_dir:
                    try:
                        size = item.stat().st_size
                    except OSError:
                        # Best effort: broken symlinks or unreadable files are
                        # still listed, just without a size.
                        pass

                items.append(DirectoryItem(
                    name=item.name,
                    path=str(item),
                    is_dir=is_dir,
                    size=size,
                ))
            except PermissionError:
                # An entry that cannot be inspected is left out of the listing.
                continue
    except PermissionError as exc:
        raise HTTPException(status_code=403, detail=f"无权限访问目录: {path}") from exc

    # Directories come first, then case-insensitive name order.
    items.sort(key=lambda x: (not x.is_dir, x.name.lower()))

    # The root directory is its own parent; report None in that case.
    parent = target_path.parent
    parent_path = str(parent) if parent != target_path else None

    return BrowseResponse(
        current_path=str(target_path),
        parent_path=parent_path,
        items=items,
    )


def _list_matching_files(directory: Path, pattern, display_path: str) -> List[Path]:
    """Return the regular files in *directory* whose name matches *pattern*, sorted by name."""
    matched = []
    try:
        for entry in directory.iterdir():
            try:
                if entry.is_file() and pattern.match(entry.name):
                    matched.append(entry)
            except PermissionError:
                # Same policy as the browse endpoint: skip unreadable entries.
                continue
    except PermissionError as exc:
        raise HTTPException(
            status_code=403, detail=f"无权限访问目录: {display_path}"
        ) from exc

    matched.sort(key=lambda f: f.name)
    return matched


def _find_json_for(base_name: str, json_files: List[Path], json_by_stem: Dict[str, Path]) -> Optional[Path]:
    """Return the JSON file for *base_name*: exact stem match first, then first prefix match."""
    exact = json_by_stem.get(base_name)
    if exact is not None:
        return exact

    # Prefix fallback in either direction; json_files is name-sorted so the
    # chosen file does not depend on directory iteration order.
    for candidate in json_files:
        stem = candidate.stem
        if stem.startswith(base_name) or base_name.startswith(stem):
            return candidate
    return None


@router.post("/scan", response_model=ScanResponse)
async def scan_directory(request: ScanRequest):
    """
    Scan directories for image + JSON file pairs.

    Files in ``image_dir`` whose name matches ``image_pattern`` are collected,
    and for each one a JSON file is looked up in ``json_dir`` among the files
    matching ``json_pattern``. An exact stem match is preferred; otherwise the
    first JSON file (by name) whose stem is a prefix of the image stem, or
    vice versa, is used. Images without a JSON file are omitted.

    Args:
        request: Directories, optional regex patterns (matched
            case-insensitively from the start of the file name) and an
            optional ``output_dir`` checked for ``<stem>_structure.json``.

    Returns:
        ScanResponse with the pairs found, their count, and how many of them
        already have a structure file.

    Raises:
        HTTPException: 404 if a directory does not exist, 400 if a path is
            not a directory or a pattern is not a valid regular expression,
            403 if a directory cannot be listed.
    """
    image_dir = Path(request.image_dir)
    json_dir = Path(request.json_dir)

    if not image_dir.exists():
        raise HTTPException(status_code=404, detail=f"图片目录不存在: {request.image_dir}")

    if not json_dir.exists():
        raise HTTPException(status_code=404, detail=f"JSON目录不存在: {request.json_dir}")

    # Reject plain files up front instead of failing inside iterdir().
    if not image_dir.is_dir():
        raise HTTPException(status_code=400, detail=f"不是目录: {request.image_dir}")

    if not json_dir.is_dir():
        raise HTTPException(status_code=400, detail=f"不是目录: {request.json_dir}")

    # Compile the regular expressions.
    # NOTE(review): patterns are client-supplied; a pathological pattern could
    # be slow to evaluate.
    try:
        image_re = re.compile(request.image_pattern or r".*\.(png|jpg|jpeg)$", re.IGNORECASE)
        json_re = re.compile(request.json_pattern or r".*\.json$", re.IGNORECASE)
    except re.error as e:
        raise HTTPException(status_code=400, detail=f"无效的正则表达式: {e}")

    # List each directory exactly once.
    image_files = _list_matching_files(image_dir, image_re, request.image_dir)
    json_files = _list_matching_files(json_dir, json_re, request.json_dir)

    # Exact-match lookup table; on duplicate stems the first file by name wins.
    json_by_stem: Dict[str, Path] = {}
    for jf in json_files:
        json_by_stem.setdefault(jf.stem, jf)

    output_dir = Path(request.output_dir) if request.output_dir else None
    can_check_structure = output_dir is not None and output_dir.exists()

    # Build the file pairs.
    pairs = []
    labeled_count = 0
    for idx, image_file in enumerate(image_files):
        base_name = image_file.stem
        json_file = _find_json_for(base_name, json_files, json_by_stem)
        if json_file is None:
            continue

        # Check whether a structure file already exists for this image.
        has_structure = False
        structure_path = None
        if can_check_structure:
            structure_file = output_dir / f"{base_name}_structure.json"
            if structure_file.exists():
                has_structure = True
                structure_path = str(structure_file)
                labeled_count += 1

        pairs.append(FilePair(
            # 1-based position of the image among all matched images, so
            # indices can have gaps when some images have no JSON file.
            index=idx + 1,
            image_path=str(image_file),
            image_name=image_file.name,
            json_path=str(json_file),
            json_name=json_file.name,
            has_structure=has_structure,
            structure_path=structure_path,
        ))

    return ScanResponse(
        total=len(pairs),
        pairs=pairs,
        labeled_count=labeled_count,
    )


@router.get("/home", response_model=HomeDirectoryResponse)
async def get_home_directory():
    """Return the current user's home directory."""
    home = os.path.expanduser("~")
    return HomeDirectoryResponse(path=home)