| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212 |
- """
- 目录浏览 API 路由
- 提供文件系统浏览和图片+JSON文件扫描功能
- """
- import os
- import re
- from pathlib import Path
- from fastapi import APIRouter, HTTPException, Query
- from pydantic import BaseModel
- from typing import List, Optional, Dict
- from models.schemas import HomeDirectoryResponse
# Router that groups every directory-browsing endpoint under /api/directories.
router = APIRouter(tags=["directories"], prefix="/api/directories")
class DirectoryItem(BaseModel):
    """One entry (file or sub-directory) returned by the browse endpoint."""

    # Entry name without any directory component.
    name: str
    # Full path of the entry, as a string.
    path: str
    # True when the entry is a directory.
    is_dir: bool
    # Size in bytes as reported by stat(); None for directories or when
    # the size could not be read.
    size: Optional[int] = None
class BrowseResponse(BaseModel):
    """Result of listing a single directory."""

    # Resolved path of the directory that was listed.
    current_path: str
    # Path of the parent directory; None when the listed directory is its
    # own parent (the filesystem root).
    parent_path: Optional[str]
    # Entries found in the directory.
    items: List[DirectoryItem]
class FilePair(BaseModel):
    """An image file together with the JSON file matched to it."""

    # Position assigned by the scan endpoint to the image.
    index: int
    # Full path and bare file name of the image.
    image_path: str
    image_name: str
    # Full path and bare file name of the matched JSON file.
    json_path: str
    json_name: str
    # Whether a structure file already exists for this image.
    has_structure: bool = False
    # Path of that structure file; None when there is none.
    structure_path: Optional[str] = None
class ScanRequest(BaseModel):
    """Parameters of a scan for image + JSON file pairs."""

    # Directory that holds the image files.
    image_dir: str
    # Directory that holds the JSON files.
    json_dir: str
    # Regular expression an image file name must match.
    image_pattern: Optional[str] = r".*\.(png|jpg|jpeg)$"
    # Regular expression a JSON file name must match.
    json_pattern: Optional[str] = r".*\.json$"
    # Optional directory inspected for already existing structure files.
    output_dir: Optional[str] = None
class ScanResponse(BaseModel):
    """Outcome of a scan for image + JSON file pairs."""

    # Number of pairs found.
    total: int
    # The matched pairs.
    pairs: List[FilePair]
    # How many of the pairs are already labeled.
    labeled_count: int
@router.get("/browse", response_model=BrowseResponse)
async def browse_directory(
    path: str = Query(default="~", description="要浏览的目录路径"),
    show_hidden: bool = Query(default=False, description="是否显示隐藏文件")
):
    """
    List the immediate children of a directory.

    Args:
        path: Directory to list. A leading ``~`` is expanded to the user's
            home directory before the path is resolved.
        show_hidden: When False, entries whose name starts with ``.`` are
            left out.

    Returns:
        BrowseResponse holding the resolved directory path, its parent
        (None at the filesystem root) and the entries, directories first
        and then ordered case-insensitively by name.

    Raises:
        HTTPException: 404 if the path does not exist, 400 if it is not a
            directory, 403 if the directory cannot be listed.
    """
    if path.startswith("~"):
        path = os.path.expanduser(path)

    target_path = Path(path).resolve()

    if not target_path.exists():
        raise HTTPException(status_code=404, detail=f"目录不存在: {path}")

    if not target_path.is_dir():
        raise HTTPException(status_code=400, detail=f"不是目录: {path}")

    items = []
    try:
        for entry in target_path.iterdir():
            if not show_hidden and entry.name.startswith('.'):
                continue

            try:
                is_dir = entry.is_dir()
            except PermissionError:
                # Entries that cannot even be inspected are skipped.
                continue

            size = None
            if not is_dir:
                try:
                    size = entry.stat().st_size
                except OSError:
                    # Best effort: a broken symlink or a file that vanished
                    # or is unreadable is still listed, just without a size.
                    size = None

            items.append(DirectoryItem(
                name=entry.name,
                path=str(entry),
                is_dir=is_dir,
                size=size
            ))
    except PermissionError as exc:
        raise HTTPException(status_code=403, detail=f"无权限访问目录: {path}") from exc

    # Directories first, then case-insensitive by name; the raw name is the
    # final tie-breaker so the order never depends on directory-scan order.
    items.sort(key=lambda item: (not item.is_dir, item.name.lower(), item.name))

    # The root directory is its own parent and therefore has no parent link.
    parent = target_path.parent
    parent_path = str(parent) if parent != target_path else None

    return BrowseResponse(
        current_path=str(target_path),
        parent_path=parent_path,
        items=items
    )
def _list_matching_files(directory: Path, pattern: "re.Pattern[str]") -> List[Path]:
    """Return the regular files in *directory* whose name matches *pattern*, sorted by name."""
    matches = [f for f in directory.iterdir() if f.is_file() and pattern.match(f.name)]
    matches.sort(key=lambda f: f.name)
    return matches


def _find_json_for_stem(
    stem: str,
    json_by_stem: Dict[str, Path],
    json_files: List[Path],
) -> Optional[Path]:
    """Pick the JSON file for an image stem: exact stem match first, else the first prefix match."""
    exact = json_by_stem.get(stem)
    if exact is not None:
        return exact
    for candidate in json_files:
        if candidate.stem.startswith(stem) or stem.startswith(candidate.stem):
            return candidate
    return None


@router.post("/scan", response_model=ScanResponse)
async def scan_directory(request: ScanRequest):
    """
    Pair every image in ``image_dir`` with a JSON file from ``json_dir``.

    Images are the regular files in ``image_dir`` whose name matches
    ``image_pattern`` (case-insensitive), processed in name order. For each
    image a JSON file matching ``json_pattern`` is looked up by stem: an
    exact stem match wins; otherwise the first JSON file (in name order)
    whose stem is a prefix of the image stem, or vice versa, is used.
    Images without any matching JSON file are left out of the result.

    When ``output_dir`` is given and exists, a pair counts as labeled if
    ``<image stem>_structure.json`` exists in that directory.

    Raises:
        HTTPException: 404 if either directory does not exist, 400 if
            either path is not a directory or a pattern is not a valid
            regular expression.
    """
    image_dir = Path(request.image_dir)
    json_dir = Path(request.json_dir)

    if not image_dir.exists():
        raise HTTPException(status_code=404, detail=f"图片目录不存在: {request.image_dir}")

    if not json_dir.exists():
        raise HTTPException(status_code=404, detail=f"JSON目录不存在: {request.json_dir}")

    # A plain file would otherwise blow up in iterdir() with an unhandled error.
    if not image_dir.is_dir():
        raise HTTPException(status_code=400, detail=f"不是目录: {request.image_dir}")

    if not json_dir.is_dir():
        raise HTTPException(status_code=400, detail=f"不是目录: {request.json_dir}")

    try:
        image_re = re.compile(request.image_pattern or r".*\.(png|jpg|jpeg)$", re.IGNORECASE)
        json_re = re.compile(request.json_pattern or r".*\.json$", re.IGNORECASE)
    except re.error as e:
        raise HTTPException(status_code=400, detail=f"无效的正则表达式: {e}")

    image_files = _list_matching_files(image_dir, image_re)

    # List the JSON directory once instead of once per image.
    json_files = _list_matching_files(json_dir, json_re)
    json_by_stem: Dict[str, Path] = {}
    for json_candidate in json_files:
        # With duplicate stems the first file in name order is kept.
        json_by_stem.setdefault(json_candidate.stem, json_candidate)

    output_dir = Path(request.output_dir) if request.output_dir else None
    output_dir_available = output_dir is not None and output_dir.exists()

    pairs = []
    labeled_count = 0

    # The index is the 1-based position of the image among all matched
    # images, so it may have gaps when an image has no JSON counterpart.
    for position, image_file in enumerate(image_files, start=1):
        base_name = image_file.stem
        json_file = _find_json_for_stem(base_name, json_by_stem, json_files)
        if json_file is None:
            continue

        structure_path = None
        if output_dir_available:
            structure_file = output_dir / f"{base_name}_structure.json"
            if structure_file.exists():
                structure_path = str(structure_file)
                labeled_count += 1

        pairs.append(FilePair(
            index=position,
            image_path=str(image_file),
            image_name=image_file.name,
            json_path=str(json_file),
            json_name=json_file.name,
            has_structure=structure_path is not None,
            structure_path=structure_path
        ))

    return ScanResponse(
        total=len(pairs),
        pairs=pairs,
        labeled_count=labeled_count
    )
@router.get("/home", response_model=HomeDirectoryResponse)
async def get_home_directory():
    """Return the current user's home directory, i.e. the expansion of ``~``."""
    return HomeDirectoryResponse(path=os.path.expanduser("~"))
|