litserve_worker.py 21 KB


  1. """
  2. MinerU Tianshu - LitServe Worker
  3. 天枢 LitServe Worker
  4. 使用 LitServe 实现 GPU 资源的自动负载均衡
  5. Worker 主动循环拉取任务并处理
  6. """
  7. import os
  8. import json
  9. import sys
  10. import time
  11. import threading
  12. import signal
  13. import atexit
  14. from pathlib import Path
  15. import litserve as ls
  16. from loguru import logger
  17. # 添加父目录到路径以导入 MinerU
  18. sys.path.insert(0, str(Path(__file__).parent.parent.parent))
  19. from task_db import TaskDB
  20. from mineru.cli.common import do_parse, read_fn
  21. from mineru.utils.config_reader import get_device
  22. from mineru.utils.model_utils import get_vram, clean_memory
  23. # 尝试导入 markitdown
  24. try:
  25. from markitdown import MarkItDown
  26. MARKITDOWN_AVAILABLE = True
  27. except ImportError:
  28. MARKITDOWN_AVAILABLE = False
  29. logger.warning("⚠️ markitdown not available, Office format parsing will be disabled")
  30. class MinerUWorkerAPI(ls.LitAPI):
  31. """
  32. LitServe API Worker
  33. Worker 主动循环拉取任务,利用 LitServe 的自动 GPU 负载均衡
  34. 支持两种解析方式:
  35. - PDF/图片 -> MinerU 解析(GPU 加速)
  36. - 其他所有格式 -> MarkItDown 解析(快速处理)
  37. 新模式:每个 worker 启动后持续循环拉取任务,处理完一个立即拉取下一个
  38. """
  39. # 支持的文件格式定义
  40. # MinerU 专用格式:PDF 和图片
  41. PDF_IMAGE_FORMATS = {'.pdf', '.png', '.jpg', '.jpeg', '.bmp', '.tiff', '.tif', '.webp'}
  42. # 其他所有格式都使用 MarkItDown 解析
  43. def __init__(self, output_dir='/tmp/mineru_tianshu_output', worker_id_prefix='tianshu',
  44. poll_interval=0.5, enable_worker_loop=True):
  45. super().__init__()
  46. self.output_dir = Path(output_dir)
  47. self.output_dir.mkdir(parents=True, exist_ok=True)
  48. self.worker_id_prefix = worker_id_prefix
  49. self.poll_interval = poll_interval # Worker 拉取任务的间隔(秒)
  50. self.enable_worker_loop = enable_worker_loop # 是否启用 worker 循环拉取
  51. self.db = TaskDB()
  52. self.worker_id = None
  53. self.markitdown = None
  54. self.running = False # Worker 运行状态
  55. self.worker_thread = None # Worker 线程
  56. def setup(self, device):
  57. """
  58. 初始化环境(每个 worker 进程调用一次)
  59. 关键修复:使用 CUDA_VISIBLE_DEVICES 确保每个进程只使用分配的 GPU
  60. Args:
  61. device: LitServe 分配的设备 (cuda:0, cuda:1, etc.)
  62. """
  63. # 生成唯一的 worker_id
  64. import socket
  65. hostname = socket.gethostname()
  66. pid = os.getpid()
  67. self.worker_id = f"{self.worker_id_prefix}-{hostname}-{device}-{pid}"
  68. logger.info(f"⚙️ Worker {self.worker_id} setting up on device: {device}")
  69. # 关键修复:设置 CUDA_VISIBLE_DEVICES 限制进程只能看到分配的 GPU
  70. # 这样可以防止一个进程占用多张卡的显存
  71. if device != 'auto' and device != 'cpu' and ':' in str(device):
  72. # 从 'cuda:0' 提取设备ID '0'
  73. device_id = str(device).split(':')[-1]
  74. os.environ['CUDA_VISIBLE_DEVICES'] = device_id
  75. # 设置为 cuda:0,因为对进程来说只能看到一张卡(逻辑ID变为0)
  76. os.environ['MINERU_DEVICE_MODE'] = 'cuda:0'
  77. device_mode = os.environ['MINERU_DEVICE_MODE']
  78. logger.info(f"🔒 CUDA_VISIBLE_DEVICES={device_id} (Physical GPU {device_id} → Logical GPU 0)")
  79. else:
  80. # 配置 MinerU 环境
  81. if os.getenv('MINERU_DEVICE_MODE', None) is None:
  82. os.environ['MINERU_DEVICE_MODE'] = device if device != 'auto' else get_device()
  83. device_mode = os.environ['MINERU_DEVICE_MODE']
  84. # 配置显存
  85. if os.getenv('MINERU_VIRTUAL_VRAM_SIZE', None) is None:
  86. if device_mode.startswith("cuda") or device_mode.startswith("npu"):
  87. try:
  88. vram = round(get_vram(device_mode))
  89. os.environ['MINERU_VIRTUAL_VRAM_SIZE'] = str(vram)
  90. except:
  91. os.environ['MINERU_VIRTUAL_VRAM_SIZE'] = '8' # 默认值
  92. else:
  93. os.environ['MINERU_VIRTUAL_VRAM_SIZE'] = '1'
  94. # 初始化 MarkItDown(如果可用)
  95. if MARKITDOWN_AVAILABLE:
  96. self.markitdown = MarkItDown()
  97. logger.info(f"✅ MarkItDown initialized for Office format parsing")
  98. logger.info(f"✅ Worker {self.worker_id} ready")
  99. logger.info(f" Device: {device_mode}")
  100. logger.info(f" VRAM: {os.environ['MINERU_VIRTUAL_VRAM_SIZE']}GB")
  101. # 启动 worker 循环拉取任务(在独立线程中)
  102. if self.enable_worker_loop:
  103. self.running = True
  104. self.worker_thread = threading.Thread(
  105. target=self._worker_loop,
  106. daemon=True,
  107. name=f"Worker-{self.worker_id}"
  108. )
  109. self.worker_thread.start()
  110. logger.info(f"🔄 Worker loop started (poll_interval={self.poll_interval}s)")
  111. def teardown(self):
  112. """
  113. 优雅关闭 Worker
  114. 设置 running 标志为 False,等待 worker 线程完成当前任务后退出。
  115. 这避免了守护线程可能导致的任务处理不完整或数据库操作不一致问题。
  116. """
  117. if self.enable_worker_loop and self.worker_thread and self.worker_thread.is_alive():
  118. logger.info(f"🛑 Shutting down worker {self.worker_id}...")
  119. self.running = False
  120. # 等待线程完成当前任务(最多等待 poll_interval * 2 秒)
  121. timeout = self.poll_interval * 2
  122. self.worker_thread.join(timeout=timeout)
  123. if self.worker_thread.is_alive():
  124. logger.warning(f"⚠️ Worker thread did not stop within {timeout}s, forcing exit")
  125. else:
  126. logger.info(f"✅ Worker {self.worker_id} shut down gracefully")
  127. def _worker_loop(self):
  128. """
  129. Worker 主循环:持续拉取并处理任务
  130. 这个方法在独立线程中运行,让每个 worker 主动拉取任务
  131. 而不是被动等待调度器触发
  132. """
  133. logger.info(f"🔁 {self.worker_id} started task polling loop")
  134. idle_count = 0
  135. while self.running:
  136. try:
  137. # 从数据库获取任务
  138. task = self.db.get_next_task(self.worker_id)
  139. if task:
  140. idle_count = 0 # 重置空闲计数
  141. # 处理任务
  142. task_id = task['task_id']
  143. logger.info(f"🔄 {self.worker_id} picked up task {task_id}")
  144. try:
  145. self._process_task(task)
  146. except Exception as e:
  147. logger.error(f"❌ {self.worker_id} failed to process task {task_id}: {e}")
  148. success = self.db.update_task_status(
  149. task_id, 'failed',
  150. error_message=str(e),
  151. worker_id=self.worker_id
  152. )
  153. if not success:
  154. logger.warning(f"⚠️ Task {task_id} was modified by another process during failure update")
  155. else:
  156. # 没有任务时,增加空闲计数
  157. idle_count += 1
  158. # 只在第一次空闲时记录日志,避免刷屏
  159. if idle_count == 1:
  160. logger.debug(f"💤 {self.worker_id} is idle, waiting for tasks...")
  161. # 空闲时等待一段时间再拉取
  162. time.sleep(self.poll_interval)
  163. except Exception as e:
  164. logger.error(f"❌ {self.worker_id} loop error: {e}")
  165. time.sleep(self.poll_interval)
  166. logger.info(f"⏹️ {self.worker_id} stopped task polling loop")
  167. def _process_task(self, task: dict):
  168. """
  169. 处理单个任务
  170. Args:
  171. task: 任务字典
  172. """
  173. task_id = task['task_id']
  174. file_path = task['file_path']
  175. file_name = task['file_name']
  176. backend = task['backend']
  177. options = json.loads(task['options'])
  178. logger.info(f"🔄 Processing task {task_id}: {file_name}")
  179. try:
  180. # 准备输出目录
  181. output_path = self.output_dir / task_id
  182. output_path.mkdir(parents=True, exist_ok=True)
  183. # 判断文件类型并选择解析方式
  184. file_type = self._get_file_type(file_path)
  185. if file_type == 'pdf_image':
  186. # 使用 MinerU 解析 PDF 和图片
  187. self._parse_with_mineru(
  188. file_path=Path(file_path),
  189. file_name=file_name,
  190. task_id=task_id,
  191. backend=backend,
  192. options=options,
  193. output_path=output_path
  194. )
  195. parse_method = 'MinerU'
  196. else: # file_type == 'markitdown'
  197. # 使用 markitdown 解析所有其他格式
  198. self._parse_with_markitdown(
  199. file_path=Path(file_path),
  200. file_name=file_name,
  201. output_path=output_path
  202. )
  203. parse_method = 'MarkItDown'
  204. # 更新状态为成功
  205. success = self.db.update_task_status(
  206. task_id, 'completed',
  207. result_path=str(output_path),
  208. worker_id=self.worker_id
  209. )
  210. if success:
  211. logger.info(f"✅ Task {task_id} completed by {self.worker_id}")
  212. logger.info(f" Parser: {parse_method}")
  213. logger.info(f" Output: {output_path}")
  214. else:
  215. logger.warning(
  216. f"⚠️ Task {task_id} was modified by another process. "
  217. f"Worker {self.worker_id} completed the work but status update was rejected."
  218. )
  219. finally:
  220. # 清理临时文件
  221. try:
  222. if Path(file_path).exists():
  223. Path(file_path).unlink()
  224. except Exception as e:
  225. logger.warning(f"Failed to clean up temp file {file_path}: {e}")
  226. def decode_request(self, request):
  227. """
  228. 解码请求
  229. 现在主要用于健康检查和手动触发(兼容旧接口)
  230. """
  231. return request.get('action', 'poll')
  232. def _get_file_type(self, file_path: str) -> str:
  233. """
  234. 判断文件类型
  235. Args:
  236. file_path: 文件路径
  237. Returns:
  238. 'pdf_image': PDF 或图片格式,使用 MinerU 解析
  239. 'markitdown': 其他所有格式,使用 markitdown 解析
  240. """
  241. suffix = Path(file_path).suffix.lower()
  242. if suffix in self.PDF_IMAGE_FORMATS:
  243. return 'pdf_image'
  244. else:
  245. # 所有非 PDF/图片格式都使用 markitdown
  246. return 'markitdown'
  247. def _parse_with_mineru(self, file_path: Path, file_name: str, task_id: str,
  248. backend: str, options: dict, output_path: Path):
  249. """
  250. 使用 MinerU 解析 PDF 和图片格式
  251. Args:
  252. file_path: 文件路径
  253. file_name: 文件名
  254. task_id: 任务ID
  255. backend: 后端类型
  256. options: 解析选项
  257. output_path: 输出路径
  258. """
  259. logger.info(f"📄 Using MinerU to parse: {file_name}")
  260. try:
  261. # 读取文件
  262. pdf_bytes = read_fn(file_path)
  263. # 执行解析(MinerU 的 ModelSingleton 会自动复用模型)
  264. do_parse(
  265. output_dir=str(output_path),
  266. pdf_file_names=[Path(file_name).stem],
  267. pdf_bytes_list=[pdf_bytes],
  268. p_lang_list=[options.get('lang', 'ch')],
  269. backend=backend,
  270. parse_method=options.get('method', 'auto'),
  271. formula_enable=options.get('formula_enable', True),
  272. table_enable=options.get('table_enable', True),
  273. )
  274. finally:
  275. # 使用 MinerU 自带的内存清理函数
  276. # 这个函数只清理推理产生的中间结果,不会卸载模型
  277. try:
  278. clean_memory()
  279. except Exception as e:
  280. logger.debug(f"Memory cleanup failed for task {task_id}: {e}")
  281. def _parse_with_markitdown(self, file_path: Path, file_name: str,
  282. output_path: Path):
  283. """
  284. 使用 markitdown 解析文档(支持 Office、HTML、文本等多种格式)
  285. Args:
  286. file_path: 文件路径
  287. file_name: 文件名
  288. output_path: 输出路径
  289. """
  290. if not MARKITDOWN_AVAILABLE or self.markitdown is None:
  291. raise RuntimeError("markitdown is not available. Please install it: pip install markitdown")
  292. logger.info(f"📊 Using MarkItDown to parse: {file_name}")
  293. # 使用 markitdown 转换文档
  294. result = self.markitdown.convert(str(file_path))
  295. # 保存为 markdown 文件
  296. output_file = output_path / f"{Path(file_name).stem}.md"
  297. output_file.write_text(result.text_content, encoding='utf-8')
  298. logger.info(f"📝 Markdown saved to: {output_file}")
  299. def predict(self, action):
  300. """
  301. HTTP 接口(主要用于健康检查和监控)
  302. 现在任务由 worker 循环自动拉取处理,这个接口主要用于:
  303. 1. 健康检查
  304. 2. 获取 worker 状态
  305. 3. 兼容旧的手动触发模式(当 enable_worker_loop=False 时)
  306. """
  307. if action == 'health':
  308. # 健康检查
  309. stats = self.db.get_queue_stats()
  310. return {
  311. 'status': 'healthy',
  312. 'worker_id': self.worker_id,
  313. 'worker_loop_enabled': self.enable_worker_loop,
  314. 'worker_running': self.running,
  315. 'queue_stats': stats
  316. }
  317. elif action == 'poll':
  318. if not self.enable_worker_loop:
  319. # 兼容模式:手动触发任务拉取
  320. task = self.db.get_next_task(self.worker_id)
  321. if not task:
  322. return {
  323. 'status': 'idle',
  324. 'message': 'No pending tasks in queue',
  325. 'worker_id': self.worker_id
  326. }
  327. try:
  328. self._process_task(task)
  329. return {
  330. 'status': 'completed',
  331. 'task_id': task['task_id'],
  332. 'worker_id': self.worker_id
  333. }
  334. except Exception as e:
  335. return {
  336. 'status': 'failed',
  337. 'task_id': task['task_id'],
  338. 'error': str(e),
  339. 'worker_id': self.worker_id
  340. }
  341. else:
  342. # Worker 循环模式:返回状态信息
  343. return {
  344. 'status': 'auto_mode',
  345. 'message': 'Worker is running in auto-loop mode, tasks are processed automatically',
  346. 'worker_id': self.worker_id,
  347. 'worker_running': self.running
  348. }
  349. else:
  350. return {
  351. 'status': 'error',
  352. 'message': f'Invalid action: {action}. Use "health" or "poll".',
  353. 'worker_id': self.worker_id
  354. }
  355. def encode_response(self, response):
  356. """编码响应"""
  357. return response
  358. def start_litserve_workers(
  359. output_dir='/tmp/mineru_tianshu_output',
  360. accelerator='auto',
  361. devices='auto',
  362. workers_per_device=1,
  363. port=9000,
  364. poll_interval=0.5,
  365. enable_worker_loop=True
  366. ):
  367. """
  368. 启动 LitServe Worker Pool
  369. Args:
  370. output_dir: 输出目录
  371. accelerator: 加速器类型 (auto/cuda/cpu/mps)
  372. devices: 使用的设备 (auto/[0,1,2])
  373. workers_per_device: 每个 GPU 的 worker 数量
  374. port: 服务端口
  375. poll_interval: Worker 拉取任务的间隔(秒)
  376. enable_worker_loop: 是否启用 worker 自动循环拉取任务
  377. """
  378. logger.info("=" * 60)
  379. logger.info("🚀 Starting MinerU Tianshu LitServe Worker Pool")
  380. logger.info("=" * 60)
  381. logger.info(f"📂 Output Directory: {output_dir}")
  382. logger.info(f"🎮 Accelerator: {accelerator}")
  383. logger.info(f"💾 Devices: {devices}")
  384. logger.info(f"👷 Workers per Device: {workers_per_device}")
  385. logger.info(f"🔌 Port: {port}")
  386. logger.info(f"🔄 Worker Loop: {'Enabled' if enable_worker_loop else 'Disabled'}")
  387. if enable_worker_loop:
  388. logger.info(f"⏱️ Poll Interval: {poll_interval}s")
  389. logger.info("=" * 60)
  390. # 创建 LitServe 服务器
  391. api = MinerUWorkerAPI(
  392. output_dir=output_dir,
  393. poll_interval=poll_interval,
  394. enable_worker_loop=enable_worker_loop
  395. )
  396. server = ls.LitServer(
  397. api,
  398. accelerator=accelerator,
  399. devices=devices,
  400. workers_per_device=workers_per_device,
  401. timeout=False, # 不设置超时
  402. )
  403. # 注册优雅关闭处理器
  404. def graceful_shutdown(signum=None, frame=None):
  405. """处理关闭信号,优雅地停止 worker"""
  406. logger.info("🛑 Received shutdown signal, gracefully stopping workers...")
  407. # 注意:LitServe 会为每个设备创建多个 worker 实例
  408. # 这里的 api 只是模板,实际的 worker 实例由 LitServe 管理
  409. # teardown 会在每个 worker 进程中被调用
  410. if hasattr(api, 'teardown'):
  411. api.teardown()
  412. sys.exit(0)
  413. # 注册信号处理器(Ctrl+C 等)
  414. signal.signal(signal.SIGINT, graceful_shutdown)
  415. signal.signal(signal.SIGTERM, graceful_shutdown)
  416. # 注册 atexit 处理器(正常退出时调用)
  417. atexit.register(lambda: api.teardown() if hasattr(api, 'teardown') else None)
  418. logger.info(f"✅ LitServe worker pool initialized")
  419. logger.info(f"📡 Listening on: http://0.0.0.0:{port}/predict")
  420. if enable_worker_loop:
  421. logger.info(f"🔁 Workers will continuously poll and process tasks")
  422. else:
  423. logger.info(f"🔄 Workers will wait for scheduler triggers")
  424. logger.info("=" * 60)
  425. # 启动服务器
  426. server.run(port=port, generate_client_file=False)
  427. if __name__ == '__main__':
  428. import argparse
  429. parser = argparse.ArgumentParser(description='MinerU Tianshu LitServe Worker Pool')
  430. parser.add_argument('--output-dir', type=str, default='/tmp/mineru_tianshu_output',
  431. help='Output directory for processed files')
  432. parser.add_argument('--accelerator', type=str, default='auto',
  433. choices=['auto', 'cuda', 'cpu', 'mps'],
  434. help='Accelerator type')
  435. parser.add_argument('--devices', type=str, default='auto',
  436. help='Devices to use (auto or comma-separated list like 0,1,2)')
  437. parser.add_argument('--workers-per-device', type=int, default=1,
  438. help='Number of workers per device')
  439. parser.add_argument('--port', type=int, default=9000,
  440. help='Server port')
  441. parser.add_argument('--poll-interval', type=float, default=0.5,
  442. help='Worker poll interval in seconds (default: 0.5)')
  443. parser.add_argument('--disable-worker-loop', action='store_true',
  444. help='Disable worker auto-loop mode (use scheduler-driven mode)')
  445. args = parser.parse_args()
  446. # 处理 devices 参数
  447. devices = args.devices
  448. if devices != 'auto':
  449. try:
  450. devices = [int(d) for d in devices.split(',')]
  451. except:
  452. logger.warning(f"Invalid devices format: {devices}, using 'auto'")
  453. devices = 'auto'
  454. start_litserve_workers(
  455. output_dir=args.output_dir,
  456. accelerator=args.accelerator,
  457. devices=devices,
  458. workers_per_device=args.workers_per_device,
  459. port=args.port,
  460. poll_interval=args.poll_interval,
  461. enable_worker_loop=not args.disable_worker_loop
  462. )