# task_scheduler.py
"""
MinerU Tianshu - Task Scheduler (Optional)

When workers run in auto-loop mode, the scheduler is mainly used for:
1. Monitoring queue status (every 5 minutes by default)
2. Health checks (every 15 minutes by default)
3. Statistics collection
4. Fault recovery (resetting stale tasks)

Notes:
- If workers have auto-loop mode enabled (the default), the scheduler is not
  needed to trigger task processing.
- Workers already pull work on their own; the scheduler only occasionally
  checks overall system state.
- The long intervals minimize system overhead while keeping the necessary
  monitoring capability.
- 5-minute monitoring / 15-minute health checks are timely enough for a
  self-running system.
"""
  15. import asyncio
  16. import aiohttp
  17. from loguru import logger
  18. from task_db import TaskDB
  19. import signal
class TaskScheduler:
    """
    Task scheduler (optional).

    Responsibilities (when workers run in auto-loop mode):
    1. Monitor the SQLite task queue status
    2. Health-check the workers
    3. Fault recovery (reset stale/timed-out tasks)
    4. Collect and report statistics

    Responsibilities (in the legacy, scheduler-driven mode):
    1. Trigger workers to pull tasks
    """

    def __init__(
        self,
        litserve_url: str = 'http://localhost:9000/predict',
        monitor_interval: int = 300,
        health_check_interval: int = 900,
        stale_task_timeout: int = 60,
        cleanup_old_files_days: int = 7,
        cleanup_old_records_days: int = 0,
        worker_auto_mode: bool = True
    ):
        """
        Initialize the scheduler.

        Args:
            litserve_url: URL of the LitServe worker endpoint.
            monitor_interval: monitoring interval in seconds (default 300 s = 5 min).
            health_check_interval: health-check interval in seconds (default 900 s = 15 min).
            stale_task_timeout: timeout in MINUTES after which a stuck task is reset.
            cleanup_old_files_days: delete result files older than N days (0 = disabled, default 7).
            cleanup_old_records_days: delete DB records older than N days (0 = disabled; deletion not recommended).
            worker_auto_mode: whether workers run their own auto-loop.
        """
        self.litserve_url = litserve_url
        self.monitor_interval = monitor_interval
        self.health_check_interval = health_check_interval
        self.stale_task_timeout = stale_task_timeout
        self.cleanup_old_files_days = cleanup_old_files_days
        self.cleanup_old_records_days = cleanup_old_records_days
        self.worker_auto_mode = worker_auto_mode
        # Project-local SQLite-backed task queue (see task_db module).
        self.db = TaskDB()
        # Cooperative shutdown flag; cleared by the SIGINT/SIGTERM handler in start().
        self.running = True

    async def check_worker_health(self, session: aiohttp.ClientSession):
        """
        Check worker health by POSTing {'action': 'health'} to the LitServe URL.

        Returns:
            The parsed JSON response on HTTP 200, otherwise None (non-200,
            timeout, or any client error — all are logged, never raised).
        """
        try:
            async with session.post(
                self.litserve_url,
                json={'action': 'health'},
                timeout=aiohttp.ClientTimeout(total=10)
            ) as resp:
                if resp.status == 200:
                    result = await resp.json()
                    return result
                else:
                    logger.error(f"Health check failed with status {resp.status}")
                    return None
        except asyncio.TimeoutError:
            logger.warning("Health check timeout")
            return None
        except Exception as e:
            logger.error(f"Health check error: {e}")
            return None

    async def schedule_loop(self):
        """
        Main monitoring loop.

        Runs until self.running is cleared. Each cycle (every monitor_interval
        seconds): logs queue stats, and — on longer multiples of the cycle —
        performs a health check, resets stale tasks, and cleans up old
        files/records. Errors inside a cycle are logged and the loop continues.
        """
        logger.info("🔄 Task scheduler started")
        logger.info(f" LitServe URL: {self.litserve_url}")
        logger.info(f" Worker Mode: {'Auto-Loop' if self.worker_auto_mode else 'Scheduler-Driven'}")
        logger.info(f" Monitor Interval: {self.monitor_interval}s")
        logger.info(f" Health Check Interval: {self.health_check_interval}s")
        logger.info(f" Stale Task Timeout: {self.stale_task_timeout}m")
        if self.cleanup_old_files_days > 0:
            logger.info(f" Cleanup Old Files: {self.cleanup_old_files_days} days")
        else:
            logger.info(f" Cleanup Old Files: Disabled")
        if self.cleanup_old_records_days > 0:
            logger.info(f" Cleanup Old Records: {self.cleanup_old_records_days} days (Not Recommended)")
        else:
            logger.info(f" Cleanup Old Records: Disabled (Keep Forever)")
        # Cycle counters: each sub-task fires when (counter * monitor_interval)
        # reaches its own longer interval, then resets.
        health_check_counter = 0
        stale_task_counter = 0
        cleanup_counter = 0
        async with aiohttp.ClientSession() as session:
            while self.running:
                try:
                    # 1. Monitor queue status
                    stats = self.db.get_queue_stats()
                    pending_count = stats.get('pending', 0)
                    processing_count = stats.get('processing', 0)
                    completed_count = stats.get('completed', 0)
                    failed_count = stats.get('failed', 0)
                    # Only log when there is active work, to keep logs quiet.
                    if pending_count > 0 or processing_count > 0:
                        logger.info(
                            f"📊 Queue: {pending_count} pending, {processing_count} processing, "
                            f"{completed_count} completed, {failed_count} failed"
                        )
                    # 2. Periodic health check
                    health_check_counter += 1
                    if health_check_counter * self.monitor_interval >= self.health_check_interval:
                        health_check_counter = 0
                        logger.info("🏥 Performing health check...")
                        health_result = await self.check_worker_health(session)
                        if health_result:
                            logger.info(f"✅ Workers healthy: {health_result}")
                        else:
                            logger.warning("⚠️ Workers health check failed")
                    # 3. Periodically reset stale (timed-out) tasks
                    # stale_task_timeout is in minutes, hence the * 60.
                    stale_task_counter += 1
                    if stale_task_counter * self.monitor_interval >= self.stale_task_timeout * 60:
                        stale_task_counter = 0
                        reset_count = self.db.reset_stale_tasks(self.stale_task_timeout)
                        if reset_count > 0:
                            logger.warning(f"⚠️ Reset {reset_count} stale tasks (timeout: {self.stale_task_timeout}m)")
                    # 4. Periodically clean up old task files and records
                    cleanup_counter += 1
                    # Run cleanup once every 24 hours (expressed in monitor cycles).
                    cleanup_interval_cycles = (24 * 3600) / self.monitor_interval
                    if cleanup_counter >= cleanup_interval_cycles:
                        cleanup_counter = 0
                        # Delete old result files (database records are kept).
                        if self.cleanup_old_files_days > 0:
                            logger.info(f"🧹 Cleaning up result files older than {self.cleanup_old_files_days} days...")
                            file_count = self.db.cleanup_old_task_files(days=self.cleanup_old_files_days)
                            if file_count > 0:
                                logger.info(f"✅ Cleaned up {file_count} result directories (DB records kept)")
                        # Delete very old database records (optional; disabled by default).
                        if self.cleanup_old_records_days > 0:
                            logger.warning(
                                f"🗑️ Cleaning up database records older than {self.cleanup_old_records_days} days..."
                            )
                            record_count = self.db.cleanup_old_task_records(days=self.cleanup_old_records_days)
                            if record_count > 0:
                                logger.warning(f"⚠️ Deleted {record_count} task records permanently")
                    # Wait for the next monitoring cycle.
                    await asyncio.sleep(self.monitor_interval)
                except Exception as e:
                    # Keep the scheduler alive on any per-cycle failure.
                    logger.error(f"Scheduler loop error: {e}")
                    await asyncio.sleep(self.monitor_interval)
        logger.info("⏹️ Task scheduler stopped")

    def start(self):
        """Start the scheduler: install signal handlers and run the loop to completion."""
        logger.info("🚀 Starting MinerU Tianshu Task Scheduler...")

        # Install signal handlers for graceful shutdown.
        # NOTE(review): the handler only clears the flag, so shutdown may lag
        # by up to monitor_interval while the loop is sleeping.
        def signal_handler(sig, frame):
            logger.info("\n🛑 Received stop signal, shutting down...")
            self.running = False

        signal.signal(signal.SIGINT, signal_handler)
        signal.signal(signal.SIGTERM, signal_handler)
        # Run the scheduling loop (blocks until self.running is cleared).
        asyncio.run(self.schedule_loop())

    def stop(self):
        """Stop the scheduler (the loop exits at its next wakeup)."""
        self.running = False
  175. async def health_check(litserve_url: str) -> bool:
  176. """
  177. 健康检查:验证 LitServe Worker 是否可用
  178. """
  179. try:
  180. async with aiohttp.ClientSession() as session:
  181. async with session.get(
  182. litserve_url.replace('/predict', '/health'),
  183. timeout=aiohttp.ClientTimeout(total=5)
  184. ) as resp:
  185. return resp.status == 200
  186. except:
  187. return False
  188. if __name__ == '__main__':
  189. import argparse
  190. parser = argparse.ArgumentParser(description='MinerU Tianshu Task Scheduler (Optional)')
  191. parser.add_argument('--litserve-url', type=str, default='http://localhost:9000/predict',
  192. help='LitServe worker URL')
  193. parser.add_argument('--monitor-interval', type=int, default=300,
  194. help='Monitor interval in seconds (default: 300s = 5 minutes)')
  195. parser.add_argument('--health-check-interval', type=int, default=900,
  196. help='Health check interval in seconds (default: 900s = 15 minutes)')
  197. parser.add_argument('--stale-task-timeout', type=int, default=60,
  198. help='Timeout for stale tasks in minutes (default: 60)')
  199. parser.add_argument('--cleanup-old-files-days', type=int, default=7,
  200. help='Delete result files older than N days (0=disable, default: 7)')
  201. parser.add_argument('--cleanup-old-records-days', type=int, default=0,
  202. help='Delete DB records older than N days (0=disable, NOT recommended)')
  203. parser.add_argument('--wait-for-workers', action='store_true',
  204. help='Wait for workers to be ready before starting')
  205. parser.add_argument('--no-worker-auto-mode', action='store_true',
  206. help='Disable worker auto-loop mode assumption')
  207. args = parser.parse_args()
  208. # 等待 workers 就绪(可选)
  209. if args.wait_for_workers:
  210. logger.info("⏳ Waiting for LitServe workers to be ready...")
  211. import time
  212. max_retries = 30
  213. for i in range(max_retries):
  214. if asyncio.run(health_check(args.litserve_url)):
  215. logger.info("✅ LitServe workers are ready!")
  216. break
  217. time.sleep(2)
  218. if i == max_retries - 1:
  219. logger.error("❌ LitServe workers not responding, starting anyway...")
  220. # 创建并启动调度器
  221. scheduler = TaskScheduler(
  222. litserve_url=args.litserve_url,
  223. monitor_interval=args.monitor_interval,
  224. health_check_interval=args.health_check_interval,
  225. stale_task_timeout=args.stale_task_timeout,
  226. cleanup_old_files_days=args.cleanup_old_files_days,
  227. cleanup_old_records_days=args.cleanup_old_records_days,
  228. worker_auto_mode=not args.no_worker_auto_mode
  229. )
  230. try:
  231. scheduler.start()
  232. except KeyboardInterrupt:
  233. logger.info("👋 Scheduler interrupted by user")