#!/usr/bin/env python3
"""
Send document-parsing tasks to Kafka (to be consumed by Flink).

Each task is published as a UTF-8 JSON message to ``KAFKA_TASK_TOPIC``.
The downstream flow described by this script's output is:
Kafka -> Flink -> parse-service -> embedding-topic -> schedule-embedding-api -> ES.

Usage:
    Run with no arguments to send the built-in TEST_FILES, or pass one or
    more file paths as arguments to send those instead.

Environment:
    KAFKA_BOOTSTRAP_SERVERS  comma-separated broker list (overrides the default)
"""
import json
import os
import sys
import time
import uuid

# Kafka configuration. The broker list may be overridden from the environment
# so another cluster can be targeted without editing the source.
KAFKA_BOOTSTRAP_SERVERS = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "10.192.72.13:9092")
KAFKA_TASK_TOPIC = "parse-tasks"  # topic carrying the task messages

# Seconds to wait for the broker to acknowledge each message.
SEND_TIMEOUT_SECONDS = 10
# Pause between consecutive sends, in seconds.
SEND_INTERVAL_SECONDS = 0.5

# Default test files, used when no paths are given on the command line.
TEST_FILES = [
    "/Users/chaizi/Work/Projects/yusys/ai-study/code/four-level-schedule/duomotai/examples/5.docx",
    "/Users/chaizi/Work/Projects/yusys/ai-study/code/four-level-schedule/duomotai/examples/20以内口算.pdf",
]


def _build_task_message(file_path, task_id=None):
    """Build the JSON-serializable task payload for ``file_path``.

    Args:
        file_path: Path of the file to parse; sent verbatim as ``filePath``.
        task_id: Explicit task id. When None, a random id of the form
            ``task-<8 hex chars>`` is generated.

    Returns:
        A dict with keys taskId, filePath, fileName, timestamp (epoch
        milliseconds) and priority.
    """
    if task_id is None:
        task_id = f"task-{uuid.uuid4().hex[:8]}"
    return {
        "taskId": task_id,
        "filePath": file_path,
        "fileName": os.path.basename(file_path),
        "timestamp": int(time.time() * 1000),
        "priority": "normal",
    }


def send_task_to_kafka(producer, file_path, task_id=None):
    """Publish one parse task to ``KAFKA_TASK_TOPIC`` and wait for the ack.

    Args:
        producer: A KafkaProducer (or compatible object) whose value
            serializer accepts a dict.
        file_path: The file to be parsed.
        task_id: Optional task id; generated when omitted.

    Returns:
        True if the broker acknowledged the message within
        SEND_TIMEOUT_SECONDS, False otherwise (the error is printed).
    """
    task_message = _build_task_message(file_path, task_id)
    print(f"发送任务: taskId={task_message['taskId']}, file={task_message['fileName']}")

    try:
        # send() is asynchronous; get() blocks until the broker acknowledges
        # the record or the timeout expires, so failures surface here.
        future = producer.send(KAFKA_TASK_TOPIC, task_message)
        future.get(timeout=SEND_TIMEOUT_SECONDS)
    except Exception as e:  # per-task boundary: report and let the caller continue
        print(f" ❌ 发送失败: {e}")
        return False

    print(f" ✅ 任务已发送到 Kafka topic: {KAFKA_TASK_TOPIC}")
    return True


def _print_banner():
    """Print the start-up banner and a one-line description of the flow."""
    print("\n")
    print("╔" + "=" * 58 + "╗")
    print("║" + " " * 10 + "发送解析任务到 Kafka" + " " * 26 + "║")
    print("╚" + "=" * 58 + "╝")
    print()
    print(f"流程: 任务消息 → Kafka ({KAFKA_TASK_TOPIC}) → Flink → parse-service → ...")
    print()


def _collect_valid_files(file_paths):
    """Return the entries of ``file_paths`` that are existing regular files.

    Each path is reported as present or missing.
    """
    valid_files = []
    for file_path in file_paths:
        # isfile() rather than exists(): a directory would pass exists()
        # but is not a file that can be sent for parsing.
        if os.path.isfile(file_path):
            valid_files.append(file_path)
            print(f"✅ 文件存在: {os.path.basename(file_path)}")
        else:
            print(f"❌ 文件不存在: {file_path}")
    return valid_files


def _create_producer():
    """Create a KafkaProducer for KAFKA_BOOTSTRAP_SERVERS.

    Returns:
        The producer, or None if kafka-python is missing or the producer
        could not be created (the reason is printed).
    """
    print("\n初始化 Kafka Producer...")
    try:
        # Imported lazily so the script can still validate files (and be
        # imported) without kafka-python installed.
        from kafka import KafkaProducer
    except ImportError as e:
        print(f"❌ Kafka Producer 初始化失败: {e}")
        print(" 请确保 kafka-python 已安装: pip install kafka-python")
        return None

    # Tolerate "host1:9092, host2:9092" and stray trailing commas.
    servers = [s.strip() for s in KAFKA_BOOTSTRAP_SERVERS.split(",") if s.strip()]
    try:
        producer = KafkaProducer(
            bootstrap_servers=servers,
            value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode("utf-8"),
            retries=3,
            acks="all",
        )
    except Exception as e:  # e.g. brokers unreachable
        print(f"❌ Kafka Producer 初始化失败: {e}")
        return None

    print(f"✅ Kafka Producer 初始化成功: {KAFKA_BOOTSTRAP_SERVERS}")
    return producer


def _print_summary(total, success_count):
    """Print the send totals and the next steps of the pipeline."""
    print()
    print("=" * 60)
    print("发送完成!")
    print("=" * 60)
    print(f"总计: {total}")
    print(f"成功: {success_count}")
    print()
    print("接下来:")
    print(f" 1. 启动 Flink Job 消费 topic: {KAFKA_TASK_TOPIC}")
    print(" 2. Flink 会调用 parse-service 解析文件")
    print(" 3. 解析结果发送到 embedding-topic")
    print(" 4. schedule-embedding-api 消费并索引到 ES")
    print()


def main(file_paths=None):
    """Validate the input files, send one task per file, and print a summary.

    Args:
        file_paths: Files to send. Defaults to TEST_FILES when None.

    Returns:
        Process exit code: 0 if every task was acknowledged; 1 if there were
        no valid files, the producer could not be created, or any send failed.
    """
    if file_paths is None:
        file_paths = TEST_FILES

    _print_banner()

    valid_files = _collect_valid_files(file_paths)
    if not valid_files:
        print("\n没有有效的测试文件!")
        return 1

    producer = _create_producer()
    if producer is None:
        return 1

    print()
    print("=" * 60)
    print(f"发送 {len(valid_files)} 个任务到 Kafka...")
    print("=" * 60)
    print()

    success_count = 0
    try:
        for index, file_path in enumerate(valid_files):
            # Pause only between sends, not after the last one.
            if index:
                time.sleep(SEND_INTERVAL_SECONDS)
            if send_task_to_kafka(producer, file_path):
                success_count += 1
    finally:
        # Close the producer even if the loop is interrupted (e.g. Ctrl+C).
        producer.close()

    _print_summary(len(valid_files), success_count)
    return 0 if success_count == len(valid_files) else 1


if __name__ == "__main__":
    # Paths given on the command line replace the built-in TEST_FILES.
    sys.exit(main(sys.argv[1:] or None))