| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171 |
- from core.router import Parser
- from models.result import ParseResult
- from utils.logger import log
- from utils.ffmpeg_wrapper import FFmpegWrapper
- import os
- import tempfile
- import shutil
- import subprocess
- import asyncio
- import aiohttp
class AudioParser(Parser):
    """Audio file parser backed by a Qwen3-ASR HTTP transcription endpoint.

    Long recordings are supported by converting the input to a standard
    16 kHz / mono / s16 WAV with ffmpeg, splitting it into fixed-length
    segments, transcribing the segments concurrently (bounded by
    ``MAX_CONCURRENCY``) and joining the texts in their original order.
    """

    # === ASR parameters ===
    ASR_SAMPLE_RATE = 16000
    ASR_CHANNELS = 1
    # 3 minutes per segment: 180 s * 16000 Hz * 2 bytes ≈ 5.5 MiB of PCM.
    SEGMENT_SECONDS = 180
    # Per-request total timeout in seconds (starts when the request is sent,
    # not while waiting for a concurrency slot).
    REQUEST_TIMEOUT = 600
    # Maximum number of ASR requests in flight at once, so a long recording
    # does not hit the ASR server with every segment simultaneously.
    # Raise it if the server can absorb more parallel work.
    MAX_CONCURRENCY = 4

    DEFAULT_API_URL = "http://10.192.72.13:7283/v1/audio/transcriptions"
    DEFAULT_MODEL_PATH = "/data/shared/Qwen3-ASR/qwen/Qwen3-ASR-1.7B"

    def __init__(self, api_url: str = DEFAULT_API_URL, model_path: str = DEFAULT_MODEL_PATH):
        """Create the parser.

        Args:
            api_url: URL that segment files are POSTed to as multipart form data.
            model_path: value sent in the ``model`` form field of each request.
        """
        # Not used by parse(); kept in case other code references the attribute.
        self.ffmpeg = FFmpegWrapper()
        self.qwen_asr_api_url = api_url
        self.model_path = model_path
        log.info("音频解析器初始化完成(支持长音频分片 + Qwen3-ASR)")

    @staticmethod
    async def _run_ffmpeg(args):
        """Run ``ffmpeg`` with *args* without blocking the event loop.

        Raises:
            FileNotFoundError: if the ffmpeg executable cannot be found.
            subprocess.CalledProcessError: if ffmpeg exits with a non-zero
                status; its captured stderr is logged and attached.
        """
        cmd = ["ffmpeg", "-hide_banner", "-loglevel", "error", *args]
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            _, stderr = await proc.communicate()
        except asyncio.CancelledError:
            # Don't leave an orphaned ffmpeg writing into a work directory
            # that the caller's cleanup is about to delete.
            if proc.returncode is None:
                proc.kill()
            raise
        if proc.returncode != 0:
            err_text = stderr.decode("utf-8", "replace").strip()
            log.error(f"ffmpeg 执行失败 (exit {proc.returncode}): {err_text}")
            raise subprocess.CalledProcessError(proc.returncode, cmd, stderr=err_text)

    async def _transcribe_chunk(self, session, semaphore, idx, total, chunk):
        """Transcribe a single WAV segment.

        Returns:
            ``(idx, text)``; *text* is ``""`` when the request fails or the
            server answers with a non-200 status (the failure is logged).
        """
        async with semaphore:
            size_mb = os.path.getsize(chunk) / (1024 * 1024)
            log.info(f"[{idx+1}/{total}] 处理分片: {chunk} ({size_mb:.2f} MB)")
            try:
                with open(chunk, "rb") as f:
                    # multipart/form-data request body
                    form = aiohttp.FormData()
                    form.add_field('file', f, filename='audio.wav', content_type='audio/wav')
                    form.add_field('model', self.model_path)
                    form.add_field('language', 'zh')
                    form.add_field('response_format', 'json')

                    async with session.post(self.qwen_asr_api_url, data=form) as response:
                        if response.status != 200:
                            body = await response.text()
                            log.warning(f"ASR 返回异常: {response.status} - {body}")
                            return idx, ""
                        # content_type=None: accept a JSON body even when the
                        # server labels it with a non-JSON Content-Type.
                        result = await response.json(content_type=None)
                # `or ""` also covers an explicit `"text": null` in the reply.
                text = (result.get("text") or "").strip()
                return idx, text
            except Exception as e:
                # repr() so that message-less errors (e.g. timeouts) stay visible.
                log.error(f"分片 ASR 失败: {chunk} - {e!r}")
                return idx, ""

    async def parse(self, file_path: str) -> ParseResult:
        """Transcribe the audio file at *file_path*.

        Returns:
            A ``ParseResult`` with ``file_type="audio"``. On success, ``content``
            is the non-empty segment texts joined by newlines in segment order,
            or a fixed failure message if no segment produced text; ``metadata``
            records the parser name, input file size, segment count and API
            URL. If conversion/splitting (or anything else) raises, the error
            is logged and a result with empty ``content`` and
            ``metadata={"error": ...}`` is returned instead.
        """
        log.info(f"开始解析音频文件: {file_path}")
        work_dir = tempfile.mkdtemp(prefix="audio_asr_")
        asr_wav = os.path.join(work_dir, "asr.wav")
        segments_dir = os.path.join(work_dir, "segments")
        try:
            # 1. Convert to the ASR input format (16 kHz / mono / s16 WAV).
            log.info("转换音频为 ASR 标准格式(16kHz / mono)")
            await self._run_ffmpeg([
                "-y",
                "-i", file_path,
                "-ac", str(self.ASR_CHANNELS),
                "-ar", str(self.ASR_SAMPLE_RATE),
                "-sample_fmt", "s16",
                "-threads", "4",  # multi-threaded conversion
                asr_wav,
            ])
            wav_size_mb = os.path.getsize(asr_wav) / (1024 * 1024)
            log.info(f"标准 WAV 大小: {wav_size_mb:.2f} MB")

            # 2. Split by duration. A 5-digit index keeps lexicographic order
            #    equal to numeric order for any realistic recording length.
            os.makedirs(segments_dir, exist_ok=True)
            await self._run_ffmpeg([
                "-i", asr_wav,
                "-f", "segment",
                "-segment_time", str(self.SEGMENT_SECONDS),
                "-c", "copy",
                os.path.join(segments_dir, "chunk_%05d.wav"),
            ])
            chunks = sorted(
                os.path.join(segments_dir, name)
                for name in os.listdir(segments_dir)
                if name.endswith(".wav")
            )
            log.info(f"音频切分完成,共 {len(chunks)} 段")

            # 3. Transcribe segments concurrently over one shared HTTP session.
            semaphore = asyncio.Semaphore(max(1, self.MAX_CONCURRENCY))
            timeout = aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT)
            async with aiohttp.ClientSession(trust_env=False, timeout=timeout) as session:
                results = await asyncio.gather(*(
                    self._transcribe_chunk(session, semaphore, idx, len(chunks), chunk)
                    for idx, chunk in enumerate(chunks)
                ))

            # 4. Merge. gather() returns results in argument order, i.e. chunk order.
            final_text = "\n".join(text for _, text in results if text)
            if not final_text:
                log.warning("所有分片识别失败")
                final_text = "语音识别失败:未获取到有效文本"
            return ParseResult(
                content=final_text,
                metadata={
                    "parser": "Qwen3-ASR",
                    "file_size": os.path.getsize(file_path),
                    "segments": len(chunks),
                    "api_url": self.qwen_asr_api_url
                },
                file_type="audio"
            )
        except Exception as e:
            log.error(f"音频解析失败: {str(e)}")
            import traceback
            log.error(traceback.format_exc())
            return ParseResult(
                content="",
                metadata={"error": str(e)},
                file_type="audio"
            )
        finally:
            # 5. Remove the temporary work directory; failure here must not
            #    mask the parse result, but it should not go unnoticed either.
            try:
                shutil.rmtree(work_dir)
                log.info(f"清理临时目录: {work_dir}")
            except OSError as e:
                log.warning(f"清理临时目录失败: {work_dir} - {e}")
|