import asyncio
import functools
import os
import shutil
import subprocess
import tempfile
import traceback
from typing import List, Optional

import aiohttp

from core.router import Parser
from models.result import ParseResult
from utils.logger import log
from utils.ffmpeg_wrapper import FFmpegWrapper


class AudioParser(Parser):
    """Audio file parser backed by a Qwen3-ASR HTTP transcription service.

    Long recordings are handled by normalising the input to 16 kHz / mono /
    16-bit WAV, cutting it into fixed-length segments with ffmpeg, and
    transcribing the segments concurrently (at most ``MAX_CONCURRENCY``
    requests in flight). Segment texts are joined in original order.
    """

    # === ASR parameters ===
    ASR_SAMPLE_RATE = 16000
    ASR_CHANNELS = 1
    # 3 minutes per segment: 180 s * 16000 Hz * 2 bytes ~= 5.5 MiB of PCM.
    SEGMENT_SECONDS = 180
    # Total timeout (seconds) for a single ASR request.
    REQUEST_TIMEOUT = 600
    # Upper bound on simultaneous ASR requests per parse() call, so a long
    # recording does not flood the ASR server with hundreds of requests.
    MAX_CONCURRENCY = 4
    ASR_LANGUAGE = "zh"

    DEFAULT_API_URL = "http://10.192.72.13:7283/v1/audio/transcriptions"
    DEFAULT_MODEL_PATH = "/data/shared/Qwen3-ASR/qwen/Qwen3-ASR-1.7B"

    def __init__(self, api_url: Optional[str] = None,
                 model_path: Optional[str] = None):
        """Create the parser.

        Args:
            api_url: Transcription endpoint; defaults to ``DEFAULT_API_URL``.
            model_path: Value sent as the ``model`` form field; defaults to
                ``DEFAULT_MODEL_PATH``.
        """
        self.ffmpeg = FFmpegWrapper()
        self.qwen_asr_api_url = api_url or self.DEFAULT_API_URL
        self.model_path = model_path or self.DEFAULT_MODEL_PATH
        log.info("音频解析器初始化完成(支持长音频分片 + Qwen3-ASR)")

    async def parse(self, file_path: str) -> ParseResult:
        """Transcribe the audio file at ``file_path``.

        Processing errors are not raised: on failure the returned result has
        empty ``content`` and ``metadata["error"]`` set. If every segment
        fails to transcribe, ``content`` holds a fixed failure message. The
        temporary working directory is always removed.

        Args:
            file_path: Path to any audio file ffmpeg can decode.

        Returns:
            ParseResult with ``file_type="audio"``.
        """
        log.info(f"开始解析音频文件: {file_path}")
        work_dir = tempfile.mkdtemp(prefix="audio_asr_")
        asr_wav = os.path.join(work_dir, "asr.wav")
        segments_dir = os.path.join(work_dir, "segments")

        try:
            # 1. Normalise to ASR-standard WAV (16 kHz / mono / s16).
            log.info("转换音频为 ASR 标准格式(16kHz / mono)")
            await self._convert_to_asr_wav(file_path, asr_wav)
            wav_size_mb = os.path.getsize(asr_wav) / (1024 * 1024)
            log.info(f"标准 WAV 大小: {wav_size_mb:.2f} MB")

            # 2. Split by duration.
            chunks = await self._split_segments(asr_wav, segments_dir)
            log.info(f"音频切分完成,共 {len(chunks)} 段")

            # 3. Transcribe concurrently; results come back in chunk order.
            texts = await self._transcribe_chunks(chunks)

            # 4. Aggregate, skipping segments that produced no text.
            final_text = "\n".join(text for text in texts if text)
            if not final_text:
                log.warning("所有分片识别失败")
                final_text = "语音识别失败:未获取到有效文本"

            return ParseResult(
                content=final_text,
                metadata={
                    "parser": "Qwen3-ASR",
                    "file_size": os.path.getsize(file_path),
                    "segments": len(chunks),
                    "api_url": self.qwen_asr_api_url,
                },
                file_type="audio",
            )
        except Exception as e:
            log.error(f"音频解析失败: {str(e)}")
            log.error(traceback.format_exc())
            return ParseResult(
                content="",
                metadata={"error": str(e)},
                file_type="audio",
            )
        finally:
            # 5. Remove the temporary working directory.
            self._cleanup(work_dir)

    async def _run_ffmpeg(self, args: List[str]) -> None:
        """Run ffmpeg with ``args`` in a worker thread.

        Running in an executor keeps the event loop responsive during long
        conversions. ``-nostdin`` stops ffmpeg from reading (or blocking on)
        the server's stdin. On a non-zero exit, ffmpeg's stderr is logged and
        ``subprocess.CalledProcessError`` is re-raised.
        """
        cmd = ["ffmpeg", "-nostdin", "-hide_banner", "-loglevel", "error", *args]
        loop = asyncio.get_running_loop()
        try:
            await loop.run_in_executor(
                None,
                functools.partial(
                    subprocess.run,
                    cmd,
                    check=True,
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.PIPE,
                ),
            )
        except subprocess.CalledProcessError as e:
            stderr = (e.stderr or b"").decode("utf-8", errors="replace").strip()
            log.error(f"ffmpeg 执行失败 (exit {e.returncode}): {stderr}")
            raise

    async def _convert_to_asr_wav(self, src: str, dst: str) -> None:
        """Convert ``src`` to a mono 16-bit WAV at ``ASR_SAMPLE_RATE`` in ``dst``."""
        await self._run_ffmpeg([
            "-y",
            "-i", src,
            "-ac", str(self.ASR_CHANNELS),
            "-ar", str(self.ASR_SAMPLE_RATE),
            "-sample_fmt", "s16",
            "-threads", "4",  # multi-threaded conversion
            dst,
        ])

    async def _split_segments(self, wav_path: str, segments_dir: str) -> List[str]:
        """Cut ``wav_path`` into ``SEGMENT_SECONDS`` pieces; return their paths in order."""
        os.makedirs(segments_dir, exist_ok=True)
        await self._run_ffmpeg([
            "-y",
            "-i", wav_path,
            "-f", "segment",
            "-segment_time", str(self.SEGMENT_SECONDS),
            "-c", "copy",
            # Zero-padded to 5 digits so lexicographic sort == numeric order
            # (3 digits mis-sorted once a recording exceeded 999 segments).
            os.path.join(segments_dir, "chunk_%05d.wav"),
        ])
        return sorted(
            os.path.join(segments_dir, name)
            for name in os.listdir(segments_dir)
            if name.endswith(".wav")
        )

    async def _transcribe_chunks(self, chunks: List[str]) -> List[str]:
        """Transcribe all ``chunks`` with bounded concurrency; texts keep chunk order."""
        if not chunks:
            return []
        semaphore = asyncio.Semaphore(max(1, self.MAX_CONCURRENCY))
        timeout = aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT)
        total = len(chunks)

        # One session (and connection pool) shared by every request.
        async with aiohttp.ClientSession(trust_env=False, timeout=timeout) as session:

            async def bounded(idx: int, chunk: str) -> str:
                async with semaphore:
                    return await self._transcribe_chunk(session, idx, chunk, total)

            # gather() returns results in argument order, so no re-sort is needed.
            return list(await asyncio.gather(
                *(bounded(idx, chunk) for idx, chunk in enumerate(chunks))
            ))

    async def _transcribe_chunk(self, session: "aiohttp.ClientSession",
                                idx: int, chunk: str, total: int) -> str:
        """POST one WAV segment to the ASR endpoint; return its text or "" on failure."""
        size_mb = os.path.getsize(chunk) / (1024 * 1024)
        log.info(f"[{idx+1}/{total}] 处理分片: {chunk} ({size_mb:.2f} MB)")
        try:
            with open(chunk, "rb") as f:
                # multipart/form-data body in the OpenAI-style transcription format
                form = aiohttp.FormData()
                form.add_field("file", f, filename="audio.wav", content_type="audio/wav")
                form.add_field("model", self.model_path)
                form.add_field("language", self.ASR_LANGUAGE)
                form.add_field("response_format", "json")

                async with session.post(self.qwen_asr_api_url, data=form) as response:
                    if response.status != 200:
                        log.warning(f"ASR 返回异常: {response.status} - {await response.text()}")
                        return ""
                    # content_type=None: accept JSON even if the server omits
                    # the application/json header.
                    result = await response.json(content_type=None)
            # `or ""` guards against an explicit "text": null in the response.
            return (result.get("text") or "").strip()
        except Exception as e:
            # Best-effort per segment: one failed chunk must not sink the file.
            log.error(f"分片 ASR 失败: {chunk} - {str(e)}")
            return ""

    @staticmethod
    def _cleanup(work_dir: str) -> None:
        """Remove ``work_dir``; failures are logged, never raised."""
        try:
            shutil.rmtree(work_dir)
            log.info(f"清理临时目录: {work_dir}")
        except Exception as e:
            log.warning(f"清理临时目录失败: {work_dir} - {e}")