import os
import subprocess
from typing import Optional

# OpenCV and NumPy are optional: they are only required by the
# frame-difference filtering in FFmpegWrapper.extract_keyframes.
# A broad except is kept so that any failure while importing them
# simply disables that feature instead of breaking the module import.
try:
    import cv2
    import numpy as np
except Exception:
    cv2 = None
    np = None


class FFmpegWrapper:
    """Thin wrapper around the ``ffmpeg`` command-line tool."""

    # (width, height) every frame is scaled to before differencing, so the
    # comparison is cheap and independent of the source resolution.
    _DIFF_FRAME_SIZE = (320, 240)

    def __init__(self):
        # Assumes the ffmpeg executable is reachable through the system PATH.
        self.ffmpeg_path = "ffmpeg"

    def _run(self, args, failure_label: str) -> None:
        """Run ffmpeg with *args*; raise ``Exception`` on a non-zero exit.

        The raised message is ``"<failure_label>: <ffmpeg stderr>"`` and the
        original ``CalledProcessError`` is attached as ``__cause__``.
        """
        cmd = [self.ffmpeg_path, *args]
        try:
            subprocess.run(cmd, check=True, capture_output=True, text=True)
        except subprocess.CalledProcessError as e:
            raise Exception(f"{failure_label}: {e.stderr}") from e

    def extract_audio(self, video_path: str, output_audio_path: str) -> bool:
        """Extract the audio track of a video as 16 kHz / 16-bit / mono PCM.

        Args:
            video_path: Path of the input video file.
            output_audio_path: Path of the audio file to write (overwritten).

        Returns:
            bool: ``True`` when ffmpeg finished successfully.

        Raises:
            Exception: ffmpeg exited with a non-zero status; the message
                carries ffmpeg's stderr.
        """
        self._run(
            [
                "-i", video_path,
                "-vn",                   # drop the video stream
                "-acodec", "pcm_s16le",  # 16-bit PCM
                "-ar", "16000",          # 16 kHz sample rate
                "-ac", "1",              # mono
                "-y",                    # overwrite the output file
                output_audio_path,
            ],
            "音频提取失败",
        )
        return True

    def convert_audio(self, input_audio_path: str, output_audio_path: str) -> bool:
        """Convert an audio file to 16 kHz / 16-bit / mono PCM.

        Args:
            input_audio_path: Path of the input audio file.
            output_audio_path: Path of the audio file to write (overwritten).

        Returns:
            bool: ``True`` when ffmpeg finished successfully.

        Raises:
            Exception: ffmpeg exited with a non-zero status; the message
                carries ffmpeg's stderr.
        """
        self._run(
            [
                "-i", input_audio_path,
                "-acodec", "pcm_s16le",
                "-ar", "16000",
                "-ac", "1",
                "-y",
                output_audio_path,
            ],
            "音频转换失败",
        )
        return True

    def extract_keyframes(self, video_path: str, output_dir: str, interval: int = 60,
                          diff_threshold: Optional[float] = None) -> list:
        """Export one frame every *interval* seconds, optionally filtered.

        Frames are written to ``output_dir`` as ``frame_%06d.jpg``. When
        *diff_threshold* is given, only frames that differ enough from the
        previously kept frame are returned.

        NOTE: the result is built by scanning ``output_dir`` for
        ``frame_*.jpg``, so files left there by an earlier run are included
        unless this run overwrote them.

        Args:
            video_path: Path of the input video file.
            output_dir: Directory receiving the frames (created if missing).
            interval: Seconds between exported frames; must be positive.
            diff_threshold: Minimum mean absolute grayscale difference
                (computed on frames scaled to 320x240) against the last kept
                frame for a frame to be kept. ``None`` disables filtering.

        Returns:
            list: Sorted paths of the kept frame files.

        Raises:
            ValueError: *interval* is not positive.
            Exception: ffmpeg failed, or *diff_threshold* was given while
                OpenCV / NumPy are not installed.
        """
        if interval <= 0:
            # fps=1/0 (or a negative rate) would only fail inside ffmpeg with
            # an opaque message, so reject it up front.
            raise ValueError(f"interval must be positive, got {interval!r}")

        # Fail before the (potentially long) ffmpeg run when the filtering
        # step could not be carried out anyway.
        if diff_threshold is not None and (cv2 is None or np is None):
            raise Exception("OpenCV (opencv-python) 和 numpy 需要安装以启用帧差法(pip install opencv-python numpy)")

        os.makedirs(output_dir, exist_ok=True)

        output_pattern = os.path.join(output_dir, "frame_%06d.jpg")
        self._run(
            [
                "-i", video_path,
                "-vf", f"fps=1/{interval}",  # one frame every `interval` seconds
                "-y",
                output_pattern,
            ],
            "关键帧提取失败",
        )

        frames = sorted(
            os.path.join(output_dir, name)
            for name in os.listdir(output_dir)
            if name.startswith("frame_") and name.endswith(".jpg")
        )

        if diff_threshold is None:
            return frames
        return self._filter_by_difference(frames, float(diff_threshold))

    def _filter_by_difference(self, frames, threshold: float) -> list:
        """Keep the first readable frame and every frame that differs from
        the previously kept one by at least *threshold*."""
        kept = []
        reference = None  # preprocessed grayscale of the last kept frame
        for frame_path in frames:
            image = self._read_image(frame_path)
            if image is None:
                continue  # unreadable frames are ignored
            gray = self._to_small_gray(image)
            if not kept:
                # The first readable frame is always a keyframe.
                kept.append(frame_path)
                reference = gray
                continue
            if gray is None or reference is None:
                continue  # a failed conversion makes the comparison impossible
            if float(np.mean(np.abs(gray - reference))) >= threshold:
                kept.append(frame_path)
                reference = gray
        return kept

    @staticmethod
    def _read_image(frame_path: str):
        """Read a frame with OpenCV; return ``None`` when it cannot be read."""
        try:
            return cv2.imread(frame_path)  # BGR, or None on failure
        except Exception:
            return None

    @classmethod
    def _to_small_gray(cls, image):
        """Return *image* as a downscaled signed grayscale array, or ``None``
        when the conversion fails."""
        try:
            gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
            gray = cv2.resize(gray, cls._DIFF_FRAME_SIZE, interpolation=cv2.INTER_AREA)
            # Signed type so that the subtraction of two frames cannot wrap.
            return gray.astype(np.int16)
        except Exception:
            return None