Przeglądaj źródła

增加接口将服务器的文件遍历,整合文件读取数据到kafaka的Topic

sunhy 2 tygodni temu
rodzic
commit
a5302676af

+ 20 - 0
schedule-producer/src/main/java/cn/com/yusys/producer/controller/ProducerController.java

@@ -2,6 +2,7 @@ package cn.com.yusys.producer.controller;
 
 import cn.com.yusys.producer.model.Message;
 import cn.com.yusys.producer.service.MessageSender;
+import cn.com.yusys.producer.service.FileScannerService;
 import org.springframework.web.bind.annotation.*;
 
 import javax.annotation.Resource;
@@ -15,6 +16,9 @@ public class ProducerController {
     @Resource
     private MessageSender messageSender;
 
+    @Resource
+    private FileScannerService fileScannerService;
+
     /**
      * 通用发送接口
      */
@@ -55,4 +59,20 @@ public class ProducerController {
         res.put("msg", "Messages sent to multiple topics: " + String.join(", ", topics));
         return res;
     }
+
+    /**
+     * 扫描文件夹并将文件信息发送到对应的Kafka Topic
+     */
+    @PostMapping("/scan-and-send")
+    public Map<String, Object> scanAndSend(@RequestParam("rootDir") String rootDir, @RequestBody Map<String, String> topicMap) {
+        return fileScannerService.scanAndSend(rootDir, topicMap);
+    }
+
+    /**
+     * 扫描文件夹并将文件信息发送到对应的Kafka Topic(测试模式)
+     */
+    @PostMapping("/scan-and-send/test")
+    public Map<String, Object> scanAndSendTest(@RequestParam("rootDir") String rootDir, @RequestBody Map<String, String> topicMap) {
+        return fileScannerService.scanAndSend(rootDir, topicMap, true);
+    }
 }

+ 116 - 0
schedule-producer/src/main/java/cn/com/yusys/producer/service/FileScannerService.java

@@ -0,0 +1,116 @@
+package cn.com.yusys.producer.service;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+@Service
+public class FileScannerService {
+    private static final Logger log = LoggerFactory.getLogger(FileScannerService.class);
+
+    @Resource
+    private MessageSender messageSender;
+
+    /**
+     * 扫描文件夹并将文件信息发送到对应的Kafka Topic
+     * @param rootDir 根目录路径
+     * @param topicMap 文件夹到Topic的映射关系
+     * @param testMode 是否启用测试模式
+     * @return 扫描结果
+     */
+    public Map<String, Object> scanAndSend(String rootDir, Map<String, String> topicMap, boolean testMode) {
+        Map<String, Object> result = new HashMap<>();
+        int totalFiles = 0;
+        int successCount = 0;
+        int failedCount = 0;
+
+        try {
+            File root = new File(rootDir);
+            if (!root.exists() || !root.isDirectory()) {
+                result.put("status", "error");
+                result.put("message", "Root directory does not exist or is not a directory");
+                return result;
+            }
+
+            // 遍历根目录下的所有文件夹(扁平化结构)
+            File[] dirs = root.listFiles(File::isDirectory);
+            if (dirs != null) {
+                int dirCount = 0;
+                for (File dir : dirs) {
+                    String dirName = dir.getName();
+                    // 使用topicMap中的映射关系,若不存在则使用文件夹名称作为默认值
+                    String topicName = topicMap.getOrDefault(dirName, dirName);
+
+                    // 遍历文件夹下的所有文件
+                    File[] files = dir.listFiles(File::isFile);
+                    if (files != null) {
+                        int fileCount = 0;
+                        for (File file : files) {
+                            totalFiles++;
+                            try {
+                                // 构建消息内容:直接使用文件路径作为消息体
+                                String fileName = file.getName();
+                                String filePath = file.getAbsolutePath();
+
+                                // 保持原始编码,Java字符串已经是Unicode,不需要额外转换
+                                // 直接使用原始文件路径和文件名
+                                log.info("Processing file: {}", filePath);
+
+                                // 发送消息到对应的Topic
+                                messageSender.send(topicName, fileName, filePath);
+                                successCount++;
+                            } catch (Exception e) {
+                                log.error("Failed to send file info: {}", file.getAbsolutePath(), e);
+                                failedCount++;
+                            }
+                            
+                            // // 测试模式下只处理3个文件
+                            // if (testMode) {
+                            //     fileCount++;
+                            //     if (fileCount >= 3) {
+                            //         break;
+                            //     }
+                            // }
+                        }
+                    }
+                    
+                    // 测试模式下只处理1个文件夹
+                    if (testMode) {
+                        dirCount++;
+                        if (dirCount >= 1) {
+                            break;
+                        }
+                    }
+                }
+            }
+
+            result.put("status", "success");
+            result.put("totalFiles", totalFiles);
+            result.put("successCount", successCount);
+            result.put("failedCount", failedCount);
+            result.put("message", "File scanning and sending completed" + (testMode ? " (test mode)" : ""));
+
+        } catch (Exception e) {
+            log.error("Error during file scanning and sending", e);
+            result.put("status", "error");
+            result.put("message", "Error: " + e.getMessage());
+        }
+
+        return result;
+    }
+    
+    /**
+     * 扫描文件夹并将文件信息发送到对应的Kafka Topic(默认非测试模式)
+     * @param rootDir 根目录路径
+     * @param topicMap 文件夹到Topic的映射关系
+     * @return 扫描结果
+     */
+    public Map<String, Object> scanAndSend(String rootDir, Map<String, String> topicMap) {
+        return scanAndSend(rootDir, topicMap, false);
+    }
+}