| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165 |
- import subprocess
- import os
- from typing import Optional
- try:
- import cv2
- import numpy as np
- except Exception:
- cv2 = None
- np = None
class FFmpegWrapper:
    """Thin wrapper around the ``ffmpeg`` command-line tool.

    Provides audio extraction/conversion to 16 kHz, 16-bit, mono WAV and
    fixed-interval frame extraction with optional frame-difference filtering.
    """

    # (width, height) every frame is scaled to before frames are compared;
    # a small fixed size keeps the difference computation cheap.
    _DIFF_RESIZE = (320, 240)
    # Naming scheme of the frames written by extract_keyframes.
    _FRAME_PREFIX = "frame_"
    _FRAME_SUFFIX = ".jpg"

    def __init__(self):
        # Assumes the ffmpeg executable is reachable through the system PATH.
        self.ffmpeg_path = "ffmpeg"

    def _run_ffmpeg(self, args, error_prefix):
        """Run ffmpeg with ``args``; wrap a non-zero exit in ``Exception``."""
        cmd = [self.ffmpeg_path, *args]
        try:
            # errors="replace": ffmpeg may echo file names or metadata that are
            # not valid in the locale encoding; decoding must not mask the
            # real outcome of the command with a UnicodeDecodeError.
            subprocess.run(
                cmd,
                check=True,
                capture_output=True,
                text=True,
                errors="replace",
            )
        except subprocess.CalledProcessError as e:
            raise Exception(f"{error_prefix}: {e.stderr}") from e

    def _transcode_to_wav(self, input_path, output_path, error_prefix, extra_args=()):
        """Transcode ``input_path`` to 16 kHz / 16-bit PCM / mono audio."""
        self._run_ffmpeg(
            [
                "-i", input_path,
                *extra_args,
                "-acodec", "pcm_s16le",  # 16-bit PCM
                "-ar", "16000",          # 16 kHz sample rate
                "-ac", "1",              # mono
                "-y",                    # overwrite the output file
                output_path,
            ],
            error_prefix,
        )
        return True

    def extract_audio(self, video_path: str, output_audio_path: str) -> bool:
        """
        Extract the audio track of a video as 16 kHz / 16-bit / mono PCM.

        Args:
            video_path: Path of the input video file.
            output_audio_path: Path of the audio file to write (overwritten
                if it already exists).

        Returns:
            bool: True when ffmpeg finished successfully.

        Raises:
            Exception: ffmpeg exited with a non-zero status; the message
                contains ffmpeg's stderr output.
        """
        # "-vn" disables the video stream so only audio is written.
        return self._transcode_to_wav(
            video_path, output_audio_path, "音频提取失败", extra_args=("-vn",)
        )

    def convert_audio(self, input_audio_path: str, output_audio_path: str) -> bool:
        """
        Convert an audio file to 16 kHz / 16-bit / mono PCM.

        Args:
            input_audio_path: Path of the input audio file.
            output_audio_path: Path of the audio file to write (overwritten
                if it already exists).

        Returns:
            bool: True when ffmpeg finished successfully.

        Raises:
            Exception: ffmpeg exited with a non-zero status; the message
                contains ffmpeg's stderr output.
        """
        return self._transcode_to_wav(
            input_audio_path, output_audio_path, "音频转换失败"
        )

    @classmethod
    def _list_frames(cls, output_dir):
        """Return ``frame_<digits>.jpg`` paths in ``output_dir`` by frame index."""
        prefix, suffix = cls._FRAME_PREFIX, cls._FRAME_SUFFIX
        indexed = []
        for name in os.listdir(output_dir):
            if not (name.startswith(prefix) and name.endswith(suffix)):
                continue
            stem = name[len(prefix):-len(suffix)]
            if stem.isdecimal():
                indexed.append((int(stem), name))
        # Sort numerically: "%06d" grows past six digits for very long runs,
        # where plain string ordering would no longer match frame order.
        indexed.sort()
        return [os.path.join(output_dir, name) for _, name in indexed]

    @classmethod
    def _load_gray_thumbnail(cls, frame_path):
        """Load a frame as a downscaled grayscale image, or None if unusable."""
        image = cv2.imread(frame_path)  # BGR; None when the file is unreadable
        if image is None:
            return None
        try:
            gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
            return cv2.resize(gray, cls._DIFF_RESIZE, interpolation=cv2.INTER_AREA)
        except cv2.error:
            return None

    def _filter_by_frame_diff(self, frames, threshold):
        """Keep frames whose mean gray difference to the last kept frame >= threshold."""
        kept = []
        reference = None  # thumbnail of the most recently kept frame
        for frame_path in frames:
            current = self._load_gray_thumbnail(frame_path)
            if current is None:
                continue  # unreadable frames are skipped, never kept
            # The first readable frame is always kept; later frames only when
            # they differ enough from the last kept one.
            if reference is None or float(
                np.mean(cv2.absdiff(current, reference))
            ) >= threshold:
                kept.append(frame_path)
                reference = current
        return kept

    def extract_keyframes(
        self,
        video_path: str,
        output_dir: str,
        interval: int = 60,
        diff_threshold: Optional[float] = None,
    ) -> list:
        """
        Extract frames from a video at a fixed interval, optionally keeping
        only those that differ enough from the previously kept frame.

        Frames are written to ``output_dir`` as ``frame_%06d.jpg``. Numbered
        frames already present in ``output_dir`` are removed first so that
        leftovers from an earlier, longer run are not reported as belonging
        to this video.

        Args:
            video_path: Path of the input video file.
            output_dir: Directory the frames are written to (created if
                missing).
            interval: Sampling interval in seconds; must be positive.
            diff_threshold: When None, every sampled frame is returned.
                Otherwise the first readable frame is kept, and each later
                frame is kept only if the mean absolute grayscale difference
                (computed on 320x240 thumbnails) to the last kept frame is
                >= this value. Requires OpenCV and numpy. Frames that are
                filtered out stay on disk.

        Returns:
            list: Paths of the selected frame files, in frame order.

        Raises:
            ValueError: ``interval`` is not positive.
            Exception: ``diff_threshold`` was given but OpenCV/numpy are not
                installed, or ffmpeg exited with a non-zero status.
        """
        if interval <= 0:
            raise ValueError(
                f"interval must be a positive number of seconds, got {interval!r}"
            )

        threshold = None
        if diff_threshold is not None:
            # Fail before the (potentially long) ffmpeg pass when the
            # optional dependencies needed for filtering are missing.
            if cv2 is None or np is None:
                raise Exception("OpenCV (opencv-python) 和 numpy 需要安装以启用帧差法(pip install opencv-python numpy)")
            threshold = float(diff_threshold)

        os.makedirs(output_dir, exist_ok=True)
        for stale_frame in self._list_frames(output_dir):
            os.remove(stale_frame)

        output_pattern = os.path.join(output_dir, "frame_%06d.jpg")
        self._run_ffmpeg(
            [
                "-i", video_path,
                "-vf", f"fps=1/{interval}",  # one frame every `interval` seconds
                "-y",
                output_pattern,
            ],
            "关键帧提取失败",
        )

        frames = self._list_frames(output_dir)
        if threshold is None:
            return frames
        return self._filter_by_frame_diff(frames, threshold)
|