| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 |
- #!/usr/bin/env python3
- """
- 发送解析任务到 Kafka(供 Flink 消费)
- """
- import json
- import time
- import sys
- import os
# Kafka settings. Each value may be overridden by an environment variable of the
# same name; the defaults are the values this script was originally written for.
# main() splits KAFKA_BOOTSTRAP_SERVERS on "," so several brokers can be listed.
KAFKA_BOOTSTRAP_SERVERS = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "10.192.72.13:9092")
KAFKA_TASK_TOPIC = os.environ.get("KAFKA_TASK_TOPIC", "parse-tasks")  # topic task messages are published to

# Files submitted as parse tasks. The defaults are absolute paths on one developer's
# machine, so allow overriding them via PARSE_TEST_FILES: a list of paths separated
# by os.pathsep (":" on POSIX, ";" on Windows). Empty entries are ignored.
_DEFAULT_TEST_FILES = [
    "/Users/chaizi/Work/Projects/yusys/ai-study/code/four-level-schedule/duomotai/examples/5.docx",
    "/Users/chaizi/Work/Projects/yusys/ai-study/code/four-level-schedule/duomotai/examples/20以内口算.pdf",
]
TEST_FILES = [
    path for path in os.environ.get("PARSE_TEST_FILES", "").split(os.pathsep) if path
] or list(_DEFAULT_TEST_FILES)
def send_task_to_kafka(producer, file_path, task_id=None, *, topic=None, timeout=10):
    """Publish one parse-task message for *file_path* and wait for the send to finish.

    Args:
        producer: Object exposing ``send(topic, value)`` that returns a future with
            ``get(timeout=...)``. ``main()`` passes a ``KafkaProducer`` whose
            value serializer JSON-encodes the message dict.
        file_path: Path of the file to parse; forwarded unchanged in the message.
        task_id: Task identifier. When None, a random ``task-<8 hex chars>`` id is generated.
        topic: Destination topic. When None, ``KAFKA_TASK_TOPIC`` is used.
        timeout: Seconds to wait on the send future before treating the send as failed.

    Returns:
        True if the send future resolved, False if sending raised (the error is printed).
    """
    import uuid

    if topic is None:
        topic = KAFKA_TASK_TOPIC
    if task_id is None:
        task_id = f"task-{uuid.uuid4().hex[:8]}"

    task_message = {
        "taskId": task_id,
        "filePath": file_path,
        "fileName": os.path.basename(file_path),
        "timestamp": int(time.time() * 1000),  # epoch milliseconds
        "priority": "normal",
    }
    print(f"发送任务: taskId={task_id}, file={task_message['fileName']}")
    try:
        # Wait on the future so a failed send is attributed to this specific task.
        producer.send(topic, task_message).get(timeout=timeout)
    except Exception as e:
        # Best-effort script: report the error and let the caller count the failure.
        print(f" ❌ 发送失败: {e}")
        return False
    print(f" ✅ 任务已发送到 Kafka topic: {topic}")
    return True
def _print_banner():
    """Print the title box and the pipeline overview line."""
    print("\n")
    print("╔" + "=" * 58 + "╗")
    # The title occupies 20 terminal columns (7 double-width CJK chars + " Kafka"),
    # so 10 + 20 + 28 fills the 58-column interior of the box.
    print("║" + " " * 10 + "发送解析任务到 Kafka" + " " * 28 + "║")
    print("╚" + "=" * 58 + "╝")
    print()
    print(f"流程: 任务消息 → Kafka ({KAFKA_TASK_TOPIC}) → Flink → parse-service → ...")
    print()


def _check_files(file_paths):
    """Return the entries of *file_paths* that exist, printing a status line for each."""
    valid_files = []
    for file_path in file_paths:
        if os.path.exists(file_path):
            valid_files.append(file_path)
            print(f"✅ 文件存在: {os.path.basename(file_path)}")
        else:
            print(f"❌ 文件不存在: {file_path}")
    return valid_files


def _create_producer():
    """Build the JSON-serializing KafkaProducer, or print the reason and return None."""
    print("\n初始化 Kafka Producer...")
    try:
        from kafka import KafkaProducer
    except ImportError as e:
        # Only a missing/broken client library warrants the install hint.
        print(f"❌ Kafka Producer 初始化失败: {e}")
        print(" 请确保 kafka-python 已安装: pip install kafka-python")
        return None
    try:
        producer = KafkaProducer(
            bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS.split(","),
            value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode("utf-8"),
            retries=3,
            acks="all",
        )
    except Exception as e:
        # Broker unreachable, bad configuration, etc.: the library is installed,
        # so the pip hint would be misleading here.
        print(f"❌ Kafka Producer 初始化失败: {e}")
        return None
    print(f"✅ Kafka Producer 初始化成功: {KAFKA_BOOTSTRAP_SERVERS}")
    return producer


def _print_summary(total, success_count):
    """Print the send totals and the follow-up steps."""
    print()
    print("=" * 60)
    print("发送完成!")
    print("=" * 60)
    print(f"总计: {total}")
    print(f"成功: {success_count}")
    print()
    print("接下来:")
    print(f" 1. 启动 Flink Job 消费 topic: {KAFKA_TASK_TOPIC}")
    print(" 2. Flink 会调用 parse-service 解析文件")
    print(" 3. 解析结果发送到 embedding-topic")
    print(" 4. schedule-embedding-api 消费并索引到 ES")
    print()


def main():
    """Send one parse task per existing file in TEST_FILES to KAFKA_TASK_TOPIC.

    Returns:
        0 if every existing file was sent successfully; 1 if no file exists,
        the producer could not be created, or any send failed.
    """
    _print_banner()

    valid_files = _check_files(TEST_FILES)
    if not valid_files:
        print("\n没有有效的测试文件!")
        return 1

    producer = _create_producer()
    if producer is None:
        return 1

    print()
    print("=" * 60)
    print(f"发送 {len(valid_files)} 个任务到 Kafka...")
    print("=" * 60)
    print()

    success_count = 0
    try:
        for index, file_path in enumerate(valid_files):
            if send_task_to_kafka(producer, file_path):
                success_count += 1
            # Pause between sends only; sleeping after the last one just delays exit.
            if index < len(valid_files) - 1:
                time.sleep(0.5)
    finally:
        # Release the producer even if the loop is interrupted.
        producer.close()

    _print_summary(len(valid_files), success_count)
    return 0 if success_count == len(valid_files) else 1
if __name__ == "__main__":
    # Raise SystemExit directly with main()'s result (0 = all tasks sent, 1 = otherwise)
    # so it becomes the process exit status.
    raise SystemExit(main())
|