import base64
import os
import tempfile
from typing import List, Optional, Tuple

import requests

from core.router import Parser
from models.result import ParseResult
from utils.logger import log
from utils.ffmpeg_wrapper import FFmpegWrapper
from parsers.audio_parser import AudioParser


class VideoParser(Parser):
    """Parser for video files.

    The audio track is extracted with ffmpeg and handed to ``AudioParser``;
    keyframes are extracted with ffmpeg and each one is described by posting
    it to the Qwen3-VL endpoint. Both parts are merged into one
    Markdown-style text.
    """

    # Seconds assumed to separate two consecutive keyframes. The timestamp of
    # keyframe ``i`` is reported as ``i * KEYFRAME_INTERVAL_SECONDS``.
    # NOTE(review): this must match the interval used by
    # FFmpegWrapper.extract_keyframes — confirm against that helper.
    KEYFRAME_INTERVAL_SECONDS = 10

    def __init__(self):
        self.ffmpeg = FFmpegWrapper()
        self.audio_parser = AudioParser()
        # Qwen3-VL model configuration (chat-completions endpoint).
        self.qwen_api_url = "http://10.192.72.13:7280/v1/chat/completions"

    async def parse(self, file_path: str) -> ParseResult:
        """Parse a video file into combined audio and visual text.

        Args:
            file_path: Path of the video file.

        Returns:
            ParseResult: On success, ``content`` holds the audio section
            followed by one section per described keyframe, and ``metadata``
            records the parser names, the file size and the keyframe count.
            On any ``Exception``, ``content`` is empty and ``metadata``
            carries the error message under ``"error"``.
        """
        log.info(f"开始解析视频文件: {file_path}")

        # Assigned before the ``try`` so the ``finally`` clause can always
        # tell whether a temporary file was created.
        temp_audio_path: Optional[str] = None
        try:
            # 1. Extract the audio track. The file is created with
            #    delete=False so it outlives the ``with`` block; it is
            #    removed in ``finally`` below.
            with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as temp_file:
                temp_audio_path = temp_file.name
            self.ffmpeg.extract_audio(file_path, temp_audio_path)
            log.info(f"音频提取完成: {temp_audio_path}")

            # 2. Parse the audio with AudioParser.
            audio_result = await self.audio_parser.parse(temp_audio_path)
            log.info("音频解析完成")

            # 3 + 4. Extract keyframes and describe them with Qwen3-VL.
            frame_results, keyframe_count = self._describe_keyframes(file_path)

            # 5. Merge the results.
            merged = self._merge_content(audio_result.content, frame_results)

            return ParseResult(
                content=merged,
                metadata={
                    "parser": "VideoParser",
                    "file_size": os.path.getsize(file_path),
                    "audio_parser": "Qwen3-ASR",
                    "visual_parser": "Qwen3-VL",
                    "keyframe_count": keyframe_count
                },
                file_type="video"
            )
        except Exception as e:
            log.error(f"视频文件解析失败: {str(e)}")
            return ParseResult(
                content="",
                metadata={"error": str(e)},
                file_type="video"
            )
        finally:
            # Runs on success, on failure, and also when the task is
            # cancelled while awaiting the audio parser (CancelledError is a
            # BaseException and is not caught by ``except Exception``), so
            # the temporary WAV file is never left behind.
            self._remove_temp_file(temp_audio_path)

    def _describe_keyframes(self, file_path: str) -> Tuple[List[Tuple[int, str]], int]:
        """Extract keyframes and describe each one with Qwen3-VL.

        A failure on a single frame is logged and skipped so that the other
        frames are still processed.

        NOTE(review): ``extract_keyframes`` and the HTTP request are called
        synchronously from the async ``parse``; if they block, the event
        loop is blocked for their duration.

        Args:
            file_path: Path of the video file.

        Returns:
            A pair ``(frame_results, keyframe_count)`` where
            ``frame_results`` is a list of ``(second, description)`` tuples
            for the frames that produced a non-empty description, and
            ``keyframe_count`` is the number of keyframes extracted.
        """
        frame_results: List[Tuple[int, str]] = []
        # The keyframe images only exist while the temporary directory
        # does, so every frame is processed inside this ``with`` block.
        with tempfile.TemporaryDirectory() as temp_dir:
            keyframes = self.ffmpeg.extract_keyframes(file_path, temp_dir)
            log.info(f"关键帧提取完成,共{len(keyframes)}张")

            for i, frame_path in enumerate(keyframes):
                try:
                    frame_content = self._parse_frame_with_qwen(frame_path)
                    log.info(f"解析关键帧 {i+1} 结果长度: {len(frame_content) if frame_content else 0}")
                    if frame_content:
                        # The timestamp is derived from the frame index, so
                        # skipped frames do not shift later timestamps.
                        time_second = i * self.KEYFRAME_INTERVAL_SECONDS
                        frame_results.append((time_second, frame_content))
                        log.info(f"添加关键帧 {i+1} 到结果列表")
                    else:
                        log.warning(f"关键帧 {i+1} 解析结果为空")
                except Exception as e:
                    log.warning(f"解析关键帧 {i+1} 失败: {str(e)}")

        log.info(f"关键帧解析完成,frame_results长度: {len(frame_results)}")
        return frame_results, len(keyframes)

    @staticmethod
    def _merge_content(audio_content: str, frame_results: List[Tuple[int, str]]) -> str:
        """Join the audio text and the per-frame descriptions into one text.

        Args:
            audio_content: Text produced by the audio parser.
            frame_results: ``(second, description)`` tuples, in output order.

        Returns:
            The sections joined with newlines: an audio heading and text,
            then, if any frames were described, a visual heading followed by
            one sub-heading and description per frame.
        """
        content = ["# 音频内容", audio_content]

        if frame_results:
            log.info("开始添加画面内容到结果")
            content.append("\n# 画面内容")
            for time_second, frame_content in frame_results:
                content.append(f"\n## 第{time_second}秒")
                content.append(frame_content)
                log.info(f"添加第{time_second}秒画面内容,长度: {len(frame_content)}")
        else:
            log.warning("没有画面内容可以添加")

        return "\n".join(content)

    @staticmethod
    def _remove_temp_file(path: Optional[str]) -> None:
        """Delete a temporary file, treating failure as non-fatal.

        Args:
            path: File to delete; ``None`` or an empty string is ignored.
        """
        if not path:
            return
        try:
            os.remove(path)
        except FileNotFoundError:
            # Already gone; nothing to clean up.
            pass
        except OSError as e:
            # A failed cleanup must not discard the parse result.
            log.warning(f"清理临时文件失败: {path}: {str(e)}")

    def _parse_frame_with_qwen(self, image_path: str) -> str:
        """Describe an image with the Qwen3-VL model.

        Args:
            image_path: Path of the image file.

        Returns:
            str: The text of the first choice in the model response.

        Raises:
            requests.HTTPError: If the endpoint answers with an error status
                (raised by ``raise_for_status``).
        """
        log.info(f"使用Qwen3-VL解析图片: {image_path}")

        # Encode the image as base64 so it can be embedded in a data URL.
        with open(image_path, "rb") as f:
            base64_image = base64.b64encode(f.read()).decode("utf-8")

        # Build and send the request.
        # NOTE(review): the data URL always declares image/jpeg — confirm
        # that extract_keyframes only produces JPEG files.
        payload = {
            "model": "/model",
            "messages": [{
                "role": "user",
                "content": [
                    {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}},
                    {"type": "text", "text": "详细描述这张图片的内容,包括人物、物体、场景、文字等所有可见信息"}
                ]
            }],
            "max_tokens": 512
        }

        response = requests.post(self.qwen_api_url, json=payload, timeout=120)
        response.raise_for_status()
        result = response.json()
        return result["choices"][0]["message"]["content"]