import asyncio
import base64
import os
import tempfile

import requests

from core.router import Parser
from models.result import ParseResult
from parsers.audio_parser import AudioParser
from utils.ffmpeg_wrapper import FFmpegWrapper
from utils.logger import log


class VideoParser(Parser):
    """Parser for video files.

    Pipeline: extract the audio track with ffmpeg, transcribe it via
    AudioParser (Qwen3-ASR), sample frames and keep keyframes by a
    frame-difference filter, describe each keyframe with a Qwen3-VL
    endpoint, then merge everything into a single ParseResult.
    """

    def __init__(self):
        self.ffmpeg = FFmpegWrapper()
        self.audio_parser = AudioParser()
        # Qwen3-VL model endpoint (OpenAI-compatible chat-completions API)
        self.qwen_api_url = "http://10.192.72.13:7280/v1/chat/completions"

    async def parse(self, file_path: str) -> ParseResult:
        """Parse a video file into combined audio + visual content.

        Args:
            file_path: Path of the video file to parse.

        Returns:
            ParseResult: merged transcript and keyframe descriptions; on
            failure, an empty-content result with the error in metadata.
        """
        log.info(f"开始解析视频文件: {file_path}")

        # Create the temp audio file before the try-block so the single
        # finally-clause below can always clean it up (replaces the
        # original's duplicated success/except removal and its fragile
        # "'temp_audio_path' in locals()" check).
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as temp_file:
            temp_audio_path = temp_file.name

        try:
            # 1. Extract the audio track.
            self.ffmpeg.extract_audio(file_path, temp_audio_path)
            log.info(f"音频提取完成: {temp_audio_path}")

            # 2. Transcribe the audio with AudioParser.
            audio_result = await self.audio_parser.parse(temp_audio_path)
            log.info("音频解析完成")

            # 3+4. Extract keyframes and describe each with Qwen3-VL.
            interval_seconds = 10
            frame_results, keyframe_count = await self._parse_keyframes(
                file_path, interval_seconds)

            # 5. Merge transcript and per-frame descriptions.
            merged = self._merge_content(audio_result.content, frame_results)

            return ParseResult(
                content=merged,
                metadata={
                    "parser": "VideoParser",
                    "file_size": os.path.getsize(file_path),
                    "audio_parser": "Qwen3-ASR",
                    "visual_parser": "Qwen3-VL",
                    "keyframe_count": keyframe_count
                },
                file_type="video"
            )
        except Exception as e:
            log.error(f"视频文件解析失败: {str(e)}")
            return ParseResult(
                content="",
                metadata={"error": str(e)},
                file_type="video"
            )
        finally:
            # Single cleanup point for the extracted audio file.
            if os.path.exists(temp_audio_path):
                os.remove(temp_audio_path)

    async def _parse_keyframes(self, file_path: str, interval_seconds: int):
        """Extract keyframes and describe each one with Qwen3-VL.

        Args:
            file_path: Path of the video file.
            interval_seconds: Fixed sampling interval used by ffmpeg.

        Returns:
            tuple: ``(frame_results, keyframe_count)`` where frame_results
            is a list of ``(time_second, description)`` pairs.
        """
        frame_results = []
        with tempfile.TemporaryDirectory() as temp_dir:
            # Sample frames at a fixed interval first, then keep only the
            # ones whose frame-difference exceeds the threshold.
            diff_threshold = 15.0
            keyframes = self.ffmpeg.extract_keyframes(
                file_path, temp_dir,
                interval=interval_seconds, diff_threshold=diff_threshold)
            log.info(f"关键帧提取完成(帧差阈值={diff_threshold}),共{len(keyframes)}张")

            for idx, frame_path in enumerate(keyframes):
                try:
                    # Run the blocking HTTP call in a worker thread so the
                    # event loop is not stalled once per frame.
                    frame_content = await asyncio.to_thread(
                        self._parse_frame_with_qwen, frame_path)
                    log.info(f"解析关键帧 {idx+1} 结果长度: {len(frame_content) if frame_content else 0}")
                    if frame_content:
                        time_second = self._frame_time_seconds(
                            frame_path, idx, interval_seconds)
                        frame_results.append((time_second, frame_content))
                        log.info(f"添加关键帧 到结果列表,时间:{time_second}s")
                    else:
                        log.warning(f"关键帧 {idx+1} 解析结果为空")
                except Exception as e:
                    # Best-effort: a single bad frame must not abort the
                    # whole video parse.
                    log.warning(f"解析关键帧 {idx+1} 失败: {str(e)}")

        log.info(f"关键帧解析完成,frame_results长度: {len(frame_results)}")
        return frame_results, len(keyframes)

    @staticmethod
    def _frame_time_seconds(frame_path: str, idx: int, interval_seconds: int) -> int:
        """Derive a frame's timestamp (seconds) from its filename.

        Frame files are named like ``frame_000001.jpg``; frame N maps to
        second ``(N - 1) * interval``.  Falls back to the enumeration
        index when the filename does not match the expected pattern.
        """
        try:
            base = os.path.basename(frame_path)
            frame_index = int(base.split('_')[1].split('.')[0])
            return (frame_index - 1) * interval_seconds
        except (IndexError, ValueError):
            return idx * interval_seconds

    @staticmethod
    def _merge_content(audio_content: str, frame_results) -> str:
        """Join the audio transcript and timed frame descriptions."""
        content = ["# 音频内容", audio_content]
        if frame_results:
            log.info("开始添加画面内容到结果")
            content.append("\n# 画面内容")
            for time_second, frame_content in frame_results:
                content.append(f"\n## 第{time_second}秒")
                content.append(frame_content)
                log.info(f"添加第{time_second}秒画面内容,长度: {len(frame_content)}")
        else:
            log.warning("没有画面内容可以添加")
        return "\n".join(content)

    def _parse_frame_with_qwen(self, image_path: str) -> str:
        """Describe one image via the Qwen3-VL chat-completions endpoint.

        Args:
            image_path: Path of the JPEG frame to describe.

        Returns:
            str: The model's textual description of the image.

        Raises:
            requests.HTTPError: If the endpoint returns a non-2xx status.
        """
        log.info(f"使用Qwen3-VL解析图片: {image_path}")

        # Inline the frame as a base64 data URL in the request payload.
        with open(image_path, "rb") as f:
            base64_image = base64.b64encode(f.read()).decode("utf-8")

        payload = {
            "model": "/model",
            "messages": [{
                "role": "user",
                "content": [
                    {"type": "image_url",
                     "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}},
                    {"type": "text",
                     "text": "详细描述这张图片的内容,包括人物、物体、场景、文字等所有可见信息"}
                ]
            }],
            "max_tokens": 512
        }
        response = requests.post(self.qwen_api_url, json=payload, timeout=120)
        response.raise_for_status()
        result = response.json()
        return result["choices"][0]["message"]["content"]