# audio_parser.py
import asyncio
import os
import shutil
import subprocess
import tempfile
import traceback

import aiohttp

from core.router import Parser
from models.result import ParseResult
from utils.ffmpeg_wrapper import FFmpegWrapper
from utils.logger import log
  11. class AudioParser(Parser):
  12. """
  13. 音频文件解析器(支持长音频、自动切分)
  14. """
  15. # === ASR 参数(稳定工业值)===
  16. ASR_SAMPLE_RATE = 16000
  17. ASR_CHANNELS = 1
  18. SEGMENT_SECONDS = 180 # 3 分钟一段(≈8–10MB)
  19. REQUEST_TIMEOUT = 600
  20. def __init__(self):
  21. self.ffmpeg = FFmpegWrapper()
  22. self.qwen_asr_api_url = "http://10.192.72.13:7283/v1/audio/transcriptions"
  23. self.model_path = "/data/shared/Qwen3-ASR/qwen/Qwen3-ASR-1.7B"
  24. log.info("音频解析器初始化完成(支持长音频分片 + Qwen3-ASR)")
  25. async def parse(self, file_path: str) -> ParseResult:
  26. log.info(f"开始解析音频文件: {file_path}")
  27. work_dir = tempfile.mkdtemp(prefix="audio_asr_")
  28. asr_wav = os.path.join(work_dir, "asr.wav")
  29. segments_dir = os.path.join(work_dir, "segments")
  30. all_text = []
  31. try:
  32. # =========================================================
  33. # 1. 转换为 ASR 标准 WAV(16k / mono / s16)
  34. # =========================================================
  35. log.info("转换音频为 ASR 标准格式(16kHz / mono)")
  36. subprocess.run(
  37. [
  38. "ffmpeg", "-y",
  39. "-i", file_path,
  40. "-ac", str(self.ASR_CHANNELS),
  41. "-ar", str(self.ASR_SAMPLE_RATE),
  42. "-sample_fmt", "s16",
  43. "-threads", "4", # 利用多线程加速转换
  44. asr_wav
  45. ],
  46. check=True
  47. )
  48. wav_size_mb = os.path.getsize(asr_wav) / (1024 * 1024)
  49. log.info(f"标准 WAV 大小: {wav_size_mb:.2f} MB")
  50. # =========================================================
  51. # 2. 按时长切分
  52. # =========================================================
  53. os.makedirs(segments_dir, exist_ok=True)
  54. subprocess.run(
  55. [
  56. "ffmpeg",
  57. "-i", asr_wav,
  58. "-f", "segment",
  59. "-segment_time", str(self.SEGMENT_SECONDS),
  60. "-c", "copy",
  61. os.path.join(segments_dir, "chunk_%03d.wav")
  62. ],
  63. check=True
  64. )
  65. chunks = sorted(
  66. os.path.join(segments_dir, f)
  67. for f in os.listdir(segments_dir)
  68. if f.endswith(".wav")
  69. )
  70. log.info(f"音频切分完成,共 {len(chunks)} 段")
  71. # =========================================================
  72. # 3. 并行调用 Qwen3-ASR
  73. # =========================================================
  74. async def process_chunk(idx, chunk):
  75. """异步处理单个分片"""
  76. size_mb = os.path.getsize(chunk) / (1024 * 1024)
  77. log.info(f"[{idx+1}/{len(chunks)}] 处理分片: {chunk} ({size_mb:.2f} MB)")
  78. try:
  79. with open(chunk, "rb") as f:
  80. # 构建multipart/form-data
  81. form = aiohttp.FormData()
  82. form.add_field('file', f, filename='audio.wav', content_type='audio/wav')
  83. form.add_field('model', self.model_path)
  84. form.add_field('language', 'zh')
  85. form.add_field('response_format', 'json')
  86. async with aiohttp.ClientSession(trust_env=False) as session:
  87. async with session.post(
  88. self.qwen_asr_api_url,
  89. data=form,
  90. timeout=aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT)
  91. ) as response:
  92. if response.status != 200:
  93. log.warning(f"ASR 返回异常: {response.status} - {await response.text()}")
  94. return idx, ""
  95. result = await response.json()
  96. text = result.get("text", "").strip()
  97. return idx, text
  98. except Exception as e:
  99. log.error(f"分片 ASR 失败: {chunk} - {str(e)}")
  100. return idx, ""
  101. # 并行处理所有分片
  102. tasks = [process_chunk(idx, chunk) for idx, chunk in enumerate(chunks)]
  103. results = await asyncio.gather(*tasks)
  104. # 按原始顺序排序结果
  105. results.sort(key=lambda x: x[0])
  106. for idx, text in results:
  107. if text:
  108. all_text.append(text)
  109. # =========================================================
  110. # 4. 汇总结果
  111. # =========================================================
  112. final_text = "\n".join(all_text)
  113. if not final_text:
  114. log.warning("所有分片识别失败")
  115. final_text = "语音识别失败:未获取到有效文本"
  116. return ParseResult(
  117. content=final_text,
  118. metadata={
  119. "parser": "Qwen3-ASR",
  120. "file_size": os.path.getsize(file_path),
  121. "segments": len(chunks),
  122. "api_url": self.qwen_asr_api_url
  123. },
  124. file_type="audio"
  125. )
  126. except Exception as e:
  127. log.error(f"音频解析失败: {str(e)}")
  128. import traceback
  129. log.error(traceback.format_exc())
  130. return ParseResult(
  131. content="",
  132. metadata={"error": str(e)},
  133. file_type="audio"
  134. )
  135. finally:
  136. # =========================================================
  137. # 5. 清理临时目录
  138. # =========================================================
  139. try:
  140. shutil.rmtree(work_dir)
  141. log.info(f"清理临时目录: {work_dir}")
  142. except Exception:
  143. pass