|
@@ -5,15 +5,18 @@ import cn.com.yusys.consumer.model.TaskRecordRequest;
|
|
|
import cn.com.yusys.consumer.util.ParseServiceClient;
|
|
import cn.com.yusys.consumer.util.ParseServiceClient;
|
|
|
import cn.com.yusys.consumer.util.TaskRecordClient;
|
|
import cn.com.yusys.consumer.util.TaskRecordClient;
|
|
|
import cn.com.yusys.consumer.util.response.ExecuteResponse;
|
|
import cn.com.yusys.consumer.util.response.ExecuteResponse;
|
|
|
|
|
+import org.apache.kafka.clients.consumer.ConsumerConfig;
|
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
import org.springframework.kafka.annotation.KafkaListener;
|
|
import org.springframework.kafka.annotation.KafkaListener;
|
|
|
|
|
+import org.springframework.kafka.core.ConsumerFactory;
|
|
|
import org.springframework.kafka.support.Acknowledgment;
|
|
import org.springframework.kafka.support.Acknowledgment;
|
|
|
import org.springframework.stereotype.Component;
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
|
|
|
|
+import java.util.Map;
|
|
|
import java.util.UUID;
|
|
import java.util.UUID;
|
|
|
|
|
|
|
|
@Component
|
|
@Component
|
|
@@ -30,22 +33,36 @@ public class MessageListener {
|
|
|
@Value("${kafka.topics.listen}")
|
|
@Value("${kafka.topics.listen}")
|
|
|
private String topicsConfig;
|
|
private String topicsConfig;
|
|
|
|
|
|
|
|
|
|
+ // 注入 ConsumerFactory
|
|
|
|
|
+ private final ConsumerFactory<?, ?> consumerFactory;
|
|
|
|
|
+
|
|
|
|
|
+ public MessageListener(ConsumerFactory<?, ?> consumerFactory) {
|
|
|
|
|
+ this.consumerFactory = consumerFactory;
|
|
|
|
|
+ Map<String, Object> configs = consumerFactory.getConfigurationProperties();
|
|
|
|
|
+
|
|
|
|
|
+ Object maxRecords = configs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
|
|
|
|
|
+ Object maxInterval = configs.get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
|
|
|
|
|
+
|
|
|
|
|
+ log.info(">>> [配置验证] Factory 中的配置 -> max.poll.records: {}, max.poll.interval.ms: {}", maxRecords, maxInterval);
|
|
|
|
|
+ }
|
|
|
/**
|
|
/**
|
|
|
* 监听多个 Topic
|
|
* 监听多个 Topic
|
|
|
* 注意:同一个 groupId 下的不同消费者实例会共同负载均衡消费这些 Topic 的所有分区。
|
|
* 注意:同一个 groupId 下的不同消费者实例会共同负载均衡消费这些 Topic 的所有分区。
|
|
|
*/
|
|
*/
|
|
|
@KafkaListener( topics = "#{'${kafka.topics.listen}'.split(',')}",
|
|
@KafkaListener( topics = "#{'${kafka.topics.listen}'.split(',')}",
|
|
|
- groupId = "${spring.kafka.consumer.group-id}", concurrency = "24"
|
|
|
|
|
|
|
+ groupId = "${spring.kafka.consumer.group-id}"
|
|
|
)
|
|
)
|
|
|
public void listen(String message, ConsumerRecord<?, ?> record, Acknowledgment acknowledgment) {
|
|
public void listen(String message, ConsumerRecord<?, ?> record, Acknowledgment acknowledgment) {
|
|
|
// 打印当前消息来自哪个 Topic,方便区分
|
|
// 打印当前消息来自哪个 Topic,方便区分
|
|
|
- log.info("=== [CONSUMER] 收到消息 | Topic: {} | Key: {} | Msg: {} | Offset: {} ===",
|
|
|
|
|
- record.topic(), record.key(), message, record.offset());
|
|
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+ log.info("=== [CONSUMER] 收到消息 | Topic: {} | Key: {} | Msg: {} | Offset: {} | Thread: {} ===",
|
|
|
|
|
+ record.topic(), record.key(), message, record.offset(), Thread.currentThread().getName());
|
|
|
|
|
|
|
|
ExecuteResponse executeResponse = processBusinessLogic(message, record);
|
|
ExecuteResponse executeResponse = processBusinessLogic(message, record);
|
|
|
|
|
|
|
|
if ( executeResponse.getCode()==200) {
|
|
if ( executeResponse.getCode()==200) {
|
|
|
- log.info("=== [CONSUMER] 任务处理成功 ===");
|
|
|
|
|
|
|
+ log.info("=== [CONSUMER] 任务处理成功 :{} {}===",message,Thread.currentThread().getName());
|
|
|
acknowledgment.acknowledge();
|
|
acknowledgment.acknowledge();
|
|
|
|
|
|
|
|
}else{
|
|
}else{
|
|
@@ -73,7 +90,7 @@ public class MessageListener {
|
|
|
Task task = new Task();
|
|
Task task = new Task();
|
|
|
task.setTaskId(taskId);
|
|
task.setTaskId(taskId);
|
|
|
task.setFilePath(message);
|
|
task.setFilePath(message);
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
// 创建任务记录
|
|
// 创建任务记录
|
|
|
String logPath = "logs/tasks/" + taskId + ".log";
|
|
String logPath = "logs/tasks/" + taskId + ".log";
|
|
|
TaskRecordRequest taskRecordRequest = new TaskRecordRequest();
|
|
TaskRecordRequest taskRecordRequest = new TaskRecordRequest();
|
|
@@ -86,11 +103,11 @@ public class MessageListener {
|
|
|
if (!recordCreated) {
|
|
if (!recordCreated) {
|
|
|
throw new RuntimeException("创建任务记录失败");
|
|
throw new RuntimeException("创建任务记录失败");
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
// 调用ParseServiceClient执行任务
|
|
// 调用ParseServiceClient执行任务
|
|
|
log.info("开始调用execute接口执行任务,消息内容:{}", message);
|
|
log.info("开始调用execute接口执行任务,消息内容:{}", message);
|
|
|
ExecuteResponse response = parseServiceClient.executeTask(task);
|
|
ExecuteResponse response = parseServiceClient.executeTask(task);
|
|
|
- log.info("任务执行完成,响应:{}", response.getMessage());
|
|
|
|
|
|
|
+ log.info("任务执行完成,响应:{}", response.getMsg());
|
|
|
return response;
|
|
return response;
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
log.error("处理消息任务失败", e);
|
|
log.error("处理消息任务失败", e);
|