ffmpeg_wrapper.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. import subprocess
  2. import os
  3. from typing import Optional
  4. try:
  5. import cv2
  6. import numpy as np
  7. except Exception:
  8. cv2 = None
  9. np = None
  10. class FFmpegWrapper:
  11. """FFmpeg命令行包装工具"""
  12. def __init__(self):
  13. self.ffmpeg_path = "ffmpeg" # 假设ffmpeg已在系统PATH中
  14. def extract_audio(self, video_path: str, output_audio_path: str) -> bool:
  15. """
  16. 从视频中提取音频轨道
  17. Args:
  18. video_path: 视频文件路径
  19. output_audio_path: 输出音频文件路径
  20. Returns:
  21. bool: 操作是否成功
  22. """
  23. try:
  24. cmd = [
  25. self.ffmpeg_path,
  26. "-i", video_path,
  27. "-vn", # 禁用视频
  28. "-acodec", "pcm_s16le", # 16位PCM
  29. "-ar", "16000", # 16kHz采样率
  30. "-ac", "1", # 单声道
  31. "-y", # 覆盖输出文件
  32. output_audio_path
  33. ]
  34. subprocess.run(cmd, check=True, capture_output=True, text=True)
  35. return True
  36. except subprocess.CalledProcessError as e:
  37. raise Exception(f"音频提取失败: {e.stderr}")
  38. def convert_audio(self, input_audio_path: str, output_audio_path: str) -> bool:
  39. """
  40. 转换音频格式为16k/16bit/mono wav
  41. Args:
  42. input_audio_path: 输入音频文件路径
  43. output_audio_path: 输出音频文件路径
  44. Returns:
  45. bool: 操作是否成功
  46. """
  47. try:
  48. cmd = [
  49. self.ffmpeg_path,
  50. "-i", input_audio_path,
  51. "-acodec", "pcm_s16le",
  52. "-ar", "16000",
  53. "-ac", "1",
  54. "-y",
  55. output_audio_path
  56. ]
  57. subprocess.run(cmd, check=True, capture_output=True, text=True)
  58. return True
  59. except subprocess.CalledProcessError as e:
  60. raise Exception(f"音频转换失败: {e.stderr}")
  61. def extract_keyframes(self, video_path: str, output_dir: str, interval: int = 60, diff_threshold: Optional[float] = None) -> list:
  62. """
  63. 从视频中提取关键帧
  64. Args:
  65. video_path: 视频文件路径
  66. output_dir: 输出目录
  67. interval: 提取间隔(秒)
  68. Returns:
  69. list: 提取的关键帧文件路径列表
  70. """
  71. try:
  72. # 确保输出目录存在
  73. os.makedirs(output_dir, exist_ok=True)
  74. # 提取关键帧(按固定频率导出帧)
  75. output_pattern = os.path.join(output_dir, "frame_%06d.jpg")
  76. cmd = [
  77. self.ffmpeg_path,
  78. "-i", video_path,
  79. "-vf", f"fps=1/{interval}", # 每 interval 秒一张
  80. "-y",
  81. output_pattern
  82. ]
  83. subprocess.run(cmd, check=True, capture_output=True, text=True)
  84. # 收集提取的帧
  85. frames = []
  86. for file in os.listdir(output_dir):
  87. if file.startswith("frame_") and file.endswith(".jpg"):
  88. frames.append(os.path.join(output_dir, file))
  89. frames = sorted(frames)
  90. # 如果未提供差异阈值,直接返回所有按固定频率提取的帧
  91. if diff_threshold is None:
  92. return frames
  93. # 检查依赖
  94. if cv2 is None or np is None:
  95. raise Exception("OpenCV (opencv-python) 和 numpy 需要安装以启用帧差法(pip install opencv-python numpy)")
  96. # 使用 OpenCV 的灰度图像计算帧差,比较当前帧与上一个边界帧(pre),当差异>=阈值时标记为关键帧
  97. # 首先读取所有帧(彩色),以便能使用 cvtColor 按要求比较
  98. imgs = []
  99. for frame_path in frames:
  100. try:
  101. img = cv2.imread(frame_path) # BGR
  102. imgs.append(img)
  103. except Exception:
  104. imgs.append(None)
  105. filtered = []
  106. # 为了加速计算,统一缩放尺寸 (width, height)
  107. resize_to = (320, 240)
  108. # 找到第一个有效帧作为初始关键帧
  109. pre = None
  110. for idx, img in enumerate(imgs):
  111. if img is not None:
  112. filtered.append(frames[idx])
  113. pre = idx
  114. break
  115. if pre is None:
  116. return []
  117. # 从下一个帧开始,比较当前帧与 imgs[pre]
  118. for i in range(pre + 1, len(imgs)):
  119. curr = imgs[i]
  120. if curr is None:
  121. continue
  122. prev = imgs[pre]
  123. try:
  124. prev_gray = cv2.cvtColor(prev, cv2.COLOR_BGR2GRAY)
  125. curr_gray = cv2.cvtColor(curr, cv2.COLOR_BGR2GRAY)
  126. if resize_to is not None:
  127. prev_gray = cv2.resize(prev_gray, resize_to, interpolation=cv2.INTER_AREA)
  128. curr_gray = cv2.resize(curr_gray, resize_to, interpolation=cv2.INTER_AREA)
  129. diff_val = np.mean(np.abs(curr_gray.astype(int) - prev_gray.astype(int)))
  130. except Exception:
  131. continue
  132. if diff_val >= float(diff_threshold):
  133. filtered.append(frames[i])
  134. pre = i
  135. return filtered
  136. except subprocess.CalledProcessError as e:
  137. raise Exception(f"关键帧提取失败: {e.stderr}")