当前 GPU 分配:
问题:
图片/扫描 PDF → Topic: parse-image → Flink Job A → parse-service (GPU 0,1)
音频 → Topic: parse-audio → Flink Job B → parse-service (GPU 7)
视频 → Topic: parse-video → Flink Job C → parse-service (GPU 5,6)
原生 PDF/Office → Topic: parse-document → Flink Job D → parse-service (CPU)
| 扩展名 | 类型 | Topic | GPU |
|---|---|---|---|
.jpg, .jpeg, .png, .tiff, .bmp, .gif, .webp |
图片 | parse-image |
0,1 |
.mp3, .wav, .flac, .aac, .m4a, .ogg |
音频 | parse-audio |
7 |
.mp4, .avi, .mov, .mkv, .flv, .wmv |
视频 | parse-video |
5,6 |
.pdf (扫描件) |
扫描 PDF | parse-image |
0,1 |
.pdf (原生) |
原生 PDF | parse-document |
CPU |
.doc, .docx, .ppt, .pptx, .xls, .xlsx, .txt |
Office | parse-document |
CPU |
关键逻辑:只看前 3 页,统计有效字符数
def is_scanned_pdf(file_path):
import fitz # PyMuPDF
doc = fitz.open(file_path)
total_chars = 0
for page_num in range(min(3, len(doc))):
page = doc[page_num]
text = page.get_text()
total_chars += len([c for c in text if c.isalnum()])
doc.close()
return total_chars < 50 # 少于 50 个字符认为是扫描件
| 文件 | 说明 |
|---|---|
send_task_to_kafka_by_type.py |
按类型分发任务到不同 Topic |
# 创建 4 个 Topic
kafka-topics.sh --create --topic parse-image --bootstrap-server 10.192.72.13:9092 --partitions 3 --replication-factor 1
kafka-topics.sh --create --topic parse-audio --bootstrap-server 10.192.72.13:9092 --partitions 3 --replication-factor 1
kafka-topics.sh --create --topic parse-video --bootstrap-server 10.192.72.13:9092 --partitions 3 --replication-factor 1
kafka-topics.sh --create --topic parse-document --bootstrap-server 10.192.72.13:9092 --partitions 3 --replication-factor 1
每个 Job 消费一个 Topic:
ImageParseFlinkJob - 消费 parse-imageAudioParseFlinkJob - 消费 parse-audioVideoParseFlinkJob - 消费 parse-videoDocumentParseFlinkJob - 消费 parse-document每个类型的任务绑定对应的 GPU 实例:
parse-imageparse-audioparse-videoparse-documentsend_task_to_kafka_by_type.py(新创建)send_task_to_kafka.py(旧版本,单 Topic)schedule-flink/src/main/java/com/yusys/flink/RealFlinkJob.java(需要扩展为多 Job)