  1. #!/usr/bin/env python3
  2. """
  3. 发送解析任务到 Kafka(供 Flink 消费)
  4. """
  5. import json
  6. import time
  7. import sys
  8. import os
  9. # Kafka 配置
  10. KAFKA_BOOTSTRAP_SERVERS = "10.192.72.13:9092"
  11. KAFKA_TASK_TOPIC = "parse-tasks" # 任务消息 topic
  12. # 测试文件
  13. TEST_FILES = [
  14. "/Users/chaizi/Work/Projects/yusys/ai-study/code/four-level-schedule/duomotai/examples/5.docx",
  15. "/Users/chaizi/Work/Projects/yusys/ai-study/code/four-level-schedule/duomotai/examples/20以内口算.pdf",
  16. ]
  17. def send_task_to_kafka(producer, file_path, task_id=None):
  18. """发送单个任务到 Kafka"""
  19. import uuid
  20. if task_id is None:
  21. task_id = f"task-{uuid.uuid4().hex[:8]}"
  22. task_message = {
  23. "taskId": task_id,
  24. "filePath": file_path,
  25. "fileName": os.path.basename(file_path),
  26. "timestamp": int(time.time() * 1000),
  27. "priority": "normal"
  28. }
  29. print(f"发送任务: taskId={task_id}, file={task_message['fileName']}")
  30. try:
  31. future = producer.send(KAFKA_TASK_TOPIC, task_message)
  32. future.get(timeout=10)
  33. print(f" ✅ 任务已发送到 Kafka topic: {KAFKA_TASK_TOPIC}")
  34. return True
  35. except Exception as e:
  36. print(f" ❌ 发送失败: {e}")
  37. return False
  38. def main():
  39. print("\n")
  40. print("╔" + "=" * 58 + "╗")
  41. print("║" + " " * 10 + "发送解析任务到 Kafka" + " " * 26 + "║")
  42. print("╚" + "=" * 58 + "╝")
  43. print()
  44. print(f"流程: 任务消息 → Kafka ({KAFKA_TASK_TOPIC}) → Flink → parse-service → ...")
  45. print()
  46. # 检查文件
  47. valid_files = []
  48. for file_path in TEST_FILES:
  49. if os.path.exists(file_path):
  50. valid_files.append(file_path)
  51. print(f"✅ 文件存在: {os.path.basename(file_path)}")
  52. else:
  53. print(f"❌ 文件不存在: {file_path}")
  54. if not valid_files:
  55. print("\n没有有效的测试文件!")
  56. return 1
  57. # 初始化 Kafka Producer
  58. print("\n初始化 Kafka Producer...")
  59. try:
  60. from kafka import KafkaProducer
  61. producer = KafkaProducer(
  62. bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS.split(","),
  63. value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode("utf-8"),
  64. retries=3,
  65. acks="all"
  66. )
  67. print(f"✅ Kafka Producer 初始化成功: {KAFKA_BOOTSTRAP_SERVERS}")
  68. except Exception as e:
  69. print(f"❌ Kafka Producer 初始化失败: {e}")
  70. print(" 请确保 kafka-python 已安装: pip install kafka-python")
  71. return 1
  72. # 发送任务
  73. print()
  74. print("=" * 60)
  75. print(f"发送 {len(valid_files)} 个任务到 Kafka...")
  76. print("=" * 60)
  77. print()
  78. success_count = 0
  79. for file_path in valid_files:
  80. if send_task_to_kafka(producer, file_path):
  81. success_count += 1
  82. time.sleep(0.5)
  83. producer.close()
  84. # 总结
  85. print()
  86. print("=" * 60)
  87. print("发送完成!")
  88. print("=" * 60)
  89. print(f"总计: {len(valid_files)}")
  90. print(f"成功: {success_count}")
  91. print()
  92. print("接下来:")
  93. print(f" 1. 启动 Flink Job 消费 topic: {KAFKA_TASK_TOPIC}")
  94. print(" 2. Flink 会调用 parse-service 解析文件")
  95. print(" 3. 解析结果发送到 embedding-topic")
  96. print(" 4. schedule-embedding-api 消费并索引到 ES")
  97. print()
  98. return 0 if success_count == len(valid_files) else 1
  99. if __name__ == "__main__":
  100. sys.exit(main())