litserve_worker.py 19 KB


  1. """
  2. MinerU Tianshu - LitServe Worker
  3. 天枢 LitServe Worker
  4. 使用 LitServe 实现 GPU 资源的自动负载均衡
  5. Worker 主动循环拉取任务并处理
  6. """
  7. import os
  8. import json
  9. import sys
  10. import time
  11. import threading
  12. from pathlib import Path
  13. import litserve as ls
  14. from loguru import logger
  15. # 添加父目录到路径以导入 MinerU
  16. sys.path.insert(0, str(Path(__file__).parent.parent.parent))
  17. from task_db import TaskDB
  18. from mineru.cli.common import do_parse, read_fn
  19. from mineru.utils.config_reader import get_device
  20. from mineru.utils.model_utils import get_vram, clean_memory
  21. # 尝试导入 markitdown
  22. try:
  23. from markitdown import MarkItDown
  24. MARKITDOWN_AVAILABLE = True
  25. except ImportError:
  26. MARKITDOWN_AVAILABLE = False
  27. logger.warning("⚠️ markitdown not available, Office format parsing will be disabled")
  28. class MinerUWorkerAPI(ls.LitAPI):
  29. """
  30. LitServe API Worker
  31. Worker 主动循环拉取任务,利用 LitServe 的自动 GPU 负载均衡
  32. 支持两种解析方式:
  33. - PDF/图片 -> MinerU 解析(GPU 加速)
  34. - 其他所有格式 -> MarkItDown 解析(快速处理)
  35. 新模式:每个 worker 启动后持续循环拉取任务,处理完一个立即拉取下一个
  36. """
  37. # 支持的文件格式定义
  38. # MinerU 专用格式:PDF 和图片
  39. PDF_IMAGE_FORMATS = {'.pdf', '.png', '.jpg', '.jpeg', '.bmp', '.tiff', '.tif', '.webp'}
  40. # 其他所有格式都使用 MarkItDown 解析
  41. def __init__(self, output_dir='/tmp/mineru_tianshu_output', worker_id_prefix='tianshu',
  42. poll_interval=0.5, enable_worker_loop=True):
  43. super().__init__()
  44. self.output_dir = Path(output_dir)
  45. self.output_dir.mkdir(parents=True, exist_ok=True)
  46. self.worker_id_prefix = worker_id_prefix
  47. self.poll_interval = poll_interval # Worker 拉取任务的间隔(秒)
  48. self.enable_worker_loop = enable_worker_loop # 是否启用 worker 循环拉取
  49. self.db = TaskDB()
  50. self.worker_id = None
  51. self.markitdown = None
  52. self.running = False # Worker 运行状态
  53. self.worker_thread = None # Worker 线程
  54. def setup(self, device):
  55. """
  56. 初始化环境(每个 worker 进程调用一次)
  57. 关键修复:使用 CUDA_VISIBLE_DEVICES 确保每个进程只使用分配的 GPU
  58. Args:
  59. device: LitServe 分配的设备 (cuda:0, cuda:1, etc.)
  60. """
  61. # 生成唯一的 worker_id
  62. import socket
  63. hostname = socket.gethostname()
  64. pid = os.getpid()
  65. self.worker_id = f"{self.worker_id_prefix}-{hostname}-{device}-{pid}"
  66. logger.info(f"⚙️ Worker {self.worker_id} setting up on device: {device}")
  67. # 关键修复:设置 CUDA_VISIBLE_DEVICES 限制进程只能看到分配的 GPU
  68. # 这样可以防止一个进程占用多张卡的显存
  69. if device != 'auto' and device != 'cpu' and ':' in str(device):
  70. # 从 'cuda:0' 提取设备ID '0'
  71. device_id = str(device).split(':')[-1]
  72. os.environ['CUDA_VISIBLE_DEVICES'] = device_id
  73. # 设置为 cuda:0,因为对进程来说只能看到一张卡(逻辑ID变为0)
  74. os.environ['MINERU_DEVICE_MODE'] = 'cuda:0'
  75. device_mode = 'cuda:0'
  76. logger.info(f"🔒 CUDA_VISIBLE_DEVICES={device_id} (Physical GPU {device_id} → Logical GPU 0)")
  77. else:
  78. # 配置 MinerU 环境
  79. if os.getenv('MINERU_DEVICE_MODE', None) is None:
  80. os.environ['MINERU_DEVICE_MODE'] = device if device != 'auto' else get_device()
  81. device_mode = os.environ['MINERU_DEVICE_MODE']
  82. # 配置显存
  83. if os.getenv('MINERU_VIRTUAL_VRAM_SIZE', None) is None:
  84. if device_mode.startswith("cuda") or device_mode.startswith("npu"):
  85. try:
  86. vram = round(get_vram(device_mode))
  87. os.environ['MINERU_VIRTUAL_VRAM_SIZE'] = str(vram)
  88. except:
  89. os.environ['MINERU_VIRTUAL_VRAM_SIZE'] = '8' # 默认值
  90. else:
  91. os.environ['MINERU_VIRTUAL_VRAM_SIZE'] = '1'
  92. # 初始化 MarkItDown(如果可用)
  93. if MARKITDOWN_AVAILABLE:
  94. self.markitdown = MarkItDown()
  95. logger.info(f"✅ MarkItDown initialized for Office format parsing")
  96. logger.info(f"✅ Worker {self.worker_id} ready")
  97. logger.info(f" Device: {device_mode}")
  98. logger.info(f" VRAM: {os.environ['MINERU_VIRTUAL_VRAM_SIZE']}GB")
  99. # 启动 worker 循环拉取任务(在独立线程中)
  100. if self.enable_worker_loop:
  101. self.running = True
  102. self.worker_thread = threading.Thread(
  103. target=self._worker_loop,
  104. daemon=True,
  105. name=f"Worker-{self.worker_id}"
  106. )
  107. self.worker_thread.start()
  108. logger.info(f"🔄 Worker loop started (poll_interval={self.poll_interval}s)")
  109. def _worker_loop(self):
  110. """
  111. Worker 主循环:持续拉取并处理任务
  112. 这个方法在独立线程中运行,让每个 worker 主动拉取任务
  113. 而不是被动等待调度器触发
  114. """
  115. logger.info(f"🔁 {self.worker_id} started task polling loop")
  116. idle_count = 0
  117. while self.running:
  118. try:
  119. # 从数据库获取任务
  120. task = self.db.get_next_task(self.worker_id)
  121. if task:
  122. idle_count = 0 # 重置空闲计数
  123. # 处理任务
  124. task_id = task['task_id']
  125. logger.info(f"🔄 {self.worker_id} picked up task {task_id}")
  126. try:
  127. self._process_task(task)
  128. except Exception as e:
  129. logger.error(f"❌ {self.worker_id} failed to process task {task_id}: {e}")
  130. success = self.db.update_task_status(
  131. task_id, 'failed',
  132. error_message=str(e),
  133. worker_id=self.worker_id
  134. )
  135. if not success:
  136. logger.warning(f"⚠️ Task {task_id} was modified by another process during failure update")
  137. else:
  138. # 没有任务时,增加空闲计数
  139. idle_count += 1
  140. # 只在第一次空闲时记录日志,避免刷屏
  141. if idle_count == 1:
  142. logger.debug(f"💤 {self.worker_id} is idle, waiting for tasks...")
  143. # 空闲时等待一段时间再拉取
  144. time.sleep(self.poll_interval)
  145. except Exception as e:
  146. logger.error(f"❌ {self.worker_id} loop error: {e}")
  147. time.sleep(self.poll_interval)
  148. logger.info(f"⏹️ {self.worker_id} stopped task polling loop")
  149. def _process_task(self, task: dict):
  150. """
  151. 处理单个任务
  152. Args:
  153. task: 任务字典
  154. """
  155. task_id = task['task_id']
  156. file_path = task['file_path']
  157. file_name = task['file_name']
  158. backend = task['backend']
  159. options = json.loads(task['options'])
  160. logger.info(f"🔄 Processing task {task_id}: {file_name}")
  161. try:
  162. # 准备输出目录
  163. output_path = self.output_dir / task_id
  164. output_path.mkdir(parents=True, exist_ok=True)
  165. # 判断文件类型并选择解析方式
  166. file_type = self._get_file_type(file_path)
  167. if file_type == 'pdf_image':
  168. # 使用 MinerU 解析 PDF 和图片
  169. self._parse_with_mineru(
  170. file_path=Path(file_path),
  171. file_name=file_name,
  172. task_id=task_id,
  173. backend=backend,
  174. options=options,
  175. output_path=output_path
  176. )
  177. parse_method = 'MinerU'
  178. else: # file_type == 'markitdown'
  179. # 使用 markitdown 解析所有其他格式
  180. self._parse_with_markitdown(
  181. file_path=Path(file_path),
  182. file_name=file_name,
  183. output_path=output_path
  184. )
  185. parse_method = 'MarkItDown'
  186. # 更新状态为成功
  187. success = self.db.update_task_status(
  188. task_id, 'completed',
  189. result_path=str(output_path),
  190. worker_id=self.worker_id
  191. )
  192. if success:
  193. logger.info(f"✅ Task {task_id} completed by {self.worker_id}")
  194. logger.info(f" Parser: {parse_method}")
  195. logger.info(f" Output: {output_path}")
  196. else:
  197. logger.warning(
  198. f"⚠️ Task {task_id} was modified by another process. "
  199. f"Worker {self.worker_id} completed the work but status update was rejected."
  200. )
  201. finally:
  202. # 清理临时文件
  203. try:
  204. if Path(file_path).exists():
  205. Path(file_path).unlink()
  206. except Exception as e:
  207. logger.warning(f"Failed to clean up temp file {file_path}: {e}")
  208. def decode_request(self, request):
  209. """
  210. 解码请求
  211. 现在主要用于健康检查和手动触发(兼容旧接口)
  212. """
  213. return request.get('action', 'poll')
  214. def _get_file_type(self, file_path: str) -> str:
  215. """
  216. 判断文件类型
  217. Args:
  218. file_path: 文件路径
  219. Returns:
  220. 'pdf_image': PDF 或图片格式,使用 MinerU 解析
  221. 'markitdown': 其他所有格式,使用 markitdown 解析
  222. """
  223. suffix = Path(file_path).suffix.lower()
  224. if suffix in self.PDF_IMAGE_FORMATS:
  225. return 'pdf_image'
  226. else:
  227. # 所有非 PDF/图片格式都使用 markitdown
  228. return 'markitdown'
  229. def _parse_with_mineru(self, file_path: Path, file_name: str, task_id: str,
  230. backend: str, options: dict, output_path: Path):
  231. """
  232. 使用 MinerU 解析 PDF 和图片格式
  233. Args:
  234. file_path: 文件路径
  235. file_name: 文件名
  236. task_id: 任务ID
  237. backend: 后端类型
  238. options: 解析选项
  239. output_path: 输出路径
  240. """
  241. logger.info(f"📄 Using MinerU to parse: {file_name}")
  242. try:
  243. # 读取文件
  244. pdf_bytes = read_fn(file_path)
  245. # 执行解析(MinerU 的 ModelSingleton 会自动复用模型)
  246. do_parse(
  247. output_dir=str(output_path),
  248. pdf_file_names=[Path(file_name).stem],
  249. pdf_bytes_list=[pdf_bytes],
  250. p_lang_list=[options.get('lang', 'ch')],
  251. backend=backend,
  252. parse_method=options.get('method', 'auto'),
  253. formula_enable=options.get('formula_enable', True),
  254. table_enable=options.get('table_enable', True),
  255. )
  256. finally:
  257. # 使用 MinerU 自带的内存清理函数
  258. # 这个函数只清理推理产生的中间结果,不会卸载模型
  259. try:
  260. clean_memory()
  261. except Exception as e:
  262. logger.debug(f"Memory cleanup: {e}")
  263. def _parse_with_markitdown(self, file_path: Path, file_name: str,
  264. output_path: Path):
  265. """
  266. 使用 markitdown 解析文档(支持 Office、HTML、文本等多种格式)
  267. Args:
  268. file_path: 文件路径
  269. file_name: 文件名
  270. output_path: 输出路径
  271. """
  272. if not MARKITDOWN_AVAILABLE or self.markitdown is None:
  273. raise RuntimeError("markitdown is not available. Please install it: pip install markitdown")
  274. logger.info(f"📊 Using MarkItDown to parse: {file_name}")
  275. # 使用 markitdown 转换文档
  276. result = self.markitdown.convert(str(file_path))
  277. # 保存为 markdown 文件
  278. output_file = output_path / f"{Path(file_name).stem}.md"
  279. output_file.write_text(result.text_content, encoding='utf-8')
  280. logger.info(f"📝 Markdown saved to: {output_file}")
  281. def predict(self, action):
  282. """
  283. HTTP 接口(主要用于健康检查和监控)
  284. 现在任务由 worker 循环自动拉取处理,这个接口主要用于:
  285. 1. 健康检查
  286. 2. 获取 worker 状态
  287. 3. 兼容旧的手动触发模式(当 enable_worker_loop=False 时)
  288. """
  289. if action == 'health':
  290. # 健康检查
  291. stats = self.db.get_queue_stats()
  292. return {
  293. 'status': 'healthy',
  294. 'worker_id': self.worker_id,
  295. 'worker_loop_enabled': self.enable_worker_loop,
  296. 'worker_running': self.running,
  297. 'queue_stats': stats
  298. }
  299. elif action == 'poll':
  300. if not self.enable_worker_loop:
  301. # 兼容模式:手动触发任务拉取
  302. task = self.db.get_next_task(self.worker_id)
  303. if not task:
  304. return {
  305. 'status': 'idle',
  306. 'message': 'No pending tasks in queue',
  307. 'worker_id': self.worker_id
  308. }
  309. try:
  310. self._process_task(task)
  311. return {
  312. 'status': 'completed',
  313. 'task_id': task['task_id'],
  314. 'worker_id': self.worker_id
  315. }
  316. except Exception as e:
  317. return {
  318. 'status': 'failed',
  319. 'task_id': task['task_id'],
  320. 'error': str(e),
  321. 'worker_id': self.worker_id
  322. }
  323. else:
  324. # Worker 循环模式:返回状态信息
  325. return {
  326. 'status': 'auto_mode',
  327. 'message': 'Worker is running in auto-loop mode, tasks are processed automatically',
  328. 'worker_id': self.worker_id,
  329. 'worker_running': self.running
  330. }
  331. else:
  332. return {
  333. 'status': 'error',
  334. 'message': f'Invalid action: {action}. Use "health" or "poll".',
  335. 'worker_id': self.worker_id
  336. }
  337. def encode_response(self, response):
  338. """编码响应"""
  339. return response
  340. def start_litserve_workers(
  341. output_dir='/tmp/mineru_tianshu_output',
  342. accelerator='auto',
  343. devices='auto',
  344. workers_per_device=1,
  345. port=9000,
  346. poll_interval=0.5,
  347. enable_worker_loop=True
  348. ):
  349. """
  350. 启动 LitServe Worker Pool
  351. Args:
  352. output_dir: 输出目录
  353. accelerator: 加速器类型 (auto/cuda/cpu/mps)
  354. devices: 使用的设备 (auto/[0,1,2])
  355. workers_per_device: 每个 GPU 的 worker 数量
  356. port: 服务端口
  357. poll_interval: Worker 拉取任务的间隔(秒)
  358. enable_worker_loop: 是否启用 worker 自动循环拉取任务
  359. """
  360. logger.info("=" * 60)
  361. logger.info("🚀 Starting MinerU Tianshu LitServe Worker Pool")
  362. logger.info("=" * 60)
  363. logger.info(f"📂 Output Directory: {output_dir}")
  364. logger.info(f"🎮 Accelerator: {accelerator}")
  365. logger.info(f"💾 Devices: {devices}")
  366. logger.info(f"👷 Workers per Device: {workers_per_device}")
  367. logger.info(f"🔌 Port: {port}")
  368. logger.info(f"🔄 Worker Loop: {'Enabled' if enable_worker_loop else 'Disabled'}")
  369. if enable_worker_loop:
  370. logger.info(f"⏱️ Poll Interval: {poll_interval}s")
  371. logger.info("=" * 60)
  372. # 创建 LitServe 服务器
  373. api = MinerUWorkerAPI(
  374. output_dir=output_dir,
  375. poll_interval=poll_interval,
  376. enable_worker_loop=enable_worker_loop
  377. )
  378. server = ls.LitServer(
  379. api,
  380. accelerator=accelerator,
  381. devices=devices,
  382. workers_per_device=workers_per_device,
  383. timeout=False, # 不设置超时
  384. )
  385. logger.info(f"✅ LitServe worker pool initialized")
  386. logger.info(f"📡 Listening on: http://0.0.0.0:{port}/predict")
  387. if enable_worker_loop:
  388. logger.info(f"🔁 Workers will continuously poll and process tasks")
  389. else:
  390. logger.info(f"🔄 Workers will wait for scheduler triggers")
  391. logger.info("=" * 60)
  392. # 启动服务器
  393. server.run(port=port, generate_client_file=False)
  394. if __name__ == '__main__':
  395. import argparse
  396. parser = argparse.ArgumentParser(description='MinerU Tianshu LitServe Worker Pool')
  397. parser.add_argument('--output-dir', type=str, default='/tmp/mineru_tianshu_output',
  398. help='Output directory for processed files')
  399. parser.add_argument('--accelerator', type=str, default='auto',
  400. choices=['auto', 'cuda', 'cpu', 'mps'],
  401. help='Accelerator type')
  402. parser.add_argument('--devices', type=str, default='auto',
  403. help='Devices to use (auto or comma-separated list like 0,1,2)')
  404. parser.add_argument('--workers-per-device', type=int, default=1,
  405. help='Number of workers per device')
  406. parser.add_argument('--port', type=int, default=9000,
  407. help='Server port')
  408. parser.add_argument('--poll-interval', type=float, default=0.5,
  409. help='Worker poll interval in seconds (default: 0.5)')
  410. parser.add_argument('--disable-worker-loop', action='store_true',
  411. help='Disable worker auto-loop mode (use scheduler-driven mode)')
  412. args = parser.parse_args()
  413. # 处理 devices 参数
  414. devices = args.devices
  415. if devices != 'auto':
  416. try:
  417. devices = [int(d) for d in devices.split(',')]
  418. except:
  419. logger.warning(f"Invalid devices format: {devices}, using 'auto'")
  420. devices = 'auto'
  421. start_litserve_workers(
  422. output_dir=args.output_dir,
  423. accelerator=args.accelerator,
  424. devices=devices,
  425. workers_per_device=args.workers_per_device,
  426. port=args.port,
  427. poll_interval=args.poll_interval,
  428. enable_worker_loop=not args.disable_worker_loop
  429. )