|
|
@@ -0,0 +1,654 @@
|
|
|
+package cn.com.yusys.monitor.service;
|
|
|
+
|
|
|
+import org.apache.kafka.clients.admin.AdminClient;
|
|
|
+import org.apache.kafka.clients.admin.AdminClientConfig;
|
|
|
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
|
|
|
+import org.apache.kafka.clients.admin.CreatePartitionsResult;
|
|
|
+import org.apache.kafka.clients.admin.CreateTopicsResult;
|
|
|
+import org.apache.kafka.clients.admin.DeleteConsumerGroupsResult;
|
|
|
+import org.apache.kafka.clients.admin.DeleteTopicsResult;
|
|
|
+import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
|
|
|
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
|
|
|
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
|
|
|
+import org.apache.kafka.clients.admin.ConsumerGroupListing;
|
|
|
+import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
|
|
|
+import org.apache.kafka.clients.admin.ListTopicsResult;
|
|
|
+import org.apache.kafka.clients.admin.MemberDescription;
|
|
|
+import org.apache.kafka.clients.admin.NewTopic;
|
|
|
+import org.apache.kafka.clients.admin.NewPartitions;
|
|
|
+import org.apache.kafka.clients.admin.TopicDescription;
|
|
|
+import org.apache.kafka.clients.consumer.ConsumerConfig;
|
|
|
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
|
|
|
+import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
+import org.apache.kafka.clients.consumer.ConsumerRecords;
|
|
|
+import org.apache.kafka.clients.consumer.KafkaConsumer;
|
|
|
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
|
|
+import org.apache.kafka.common.TopicPartition;
|
|
|
+import org.apache.kafka.common.serialization.StringDeserializer;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import javax.annotation.PostConstruct;
|
|
|
+import javax.annotation.PreDestroy;
|
|
|
+import java.time.Duration;
|
|
|
+import java.util.*;
|
|
|
+import java.util.Date;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+import java.util.concurrent.ExecutionException;
|
|
|
+
|
|
|
+@Service
|
|
|
+public class KafkaMonitorService {
|
|
|
+
|
|
|
    // SLF4J logger for this service.
    private static final Logger log = LoggerFactory.getLogger(KafkaMonitorService.class);

    // Kafka cluster address list, injected from application configuration.
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    // Name of the default test topic, injected from application configuration.
    @Value("${kafka.topics.test-topic}")
    private String testTopic;

    // Shared admin client, created in init() and closed in close().
    private AdminClient adminClient;
    // Shared consumer (group "monitor-group") used for offset/metadata lookups.
    // NOTE(review): KafkaConsumer is not thread-safe; concurrent requests into
    // this service would need external synchronization — confirm usage pattern.
    private KafkaConsumer<String, String> consumer;
|
|
|
+
|
|
|
+ @PostConstruct
|
|
|
+ public void init() {
|
|
|
+ // 初始化AdminClient
|
|
|
+ Properties adminProps = new Properties();
|
|
|
+ adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
|
|
+ adminClient = AdminClient.create(adminProps);
|
|
|
+
|
|
|
+ // 初始化KafkaConsumer用于获取消费组信息
|
|
|
+ Properties consumerProps = new Properties();
|
|
|
+ consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
|
|
+ consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "monitor-group");
|
|
|
+ consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
|
|
|
+ consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
|
|
|
+ consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
|
|
+ consumer = new KafkaConsumer<>(consumerProps);
|
|
|
+ }
|
|
|
+
|
|
|
+ @PreDestroy
|
|
|
+ public void close() {
|
|
|
+ if (adminClient != null) {
|
|
|
+ adminClient.close();
|
|
|
+ }
|
|
|
+ if (consumer != null) {
|
|
|
+ consumer.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取指定topic的分区信息
|
|
|
+ */
|
|
|
+ public Map<String, Object> getTopicPartitions(String topic) {
|
|
|
+ Map<String, Object> result = new HashMap<>();
|
|
|
+ try {
|
|
|
+ DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singleton(topic));
|
|
|
+ TopicDescription topicDescription = describeTopicsResult.values().get(topic).get();
|
|
|
+
|
|
|
+ result.put("topic", topic);
|
|
|
+ result.put("partitionCount", topicDescription.partitions().size());
|
|
|
+
|
|
|
+ List<Map<String, Object>> partitions = new ArrayList<>();
|
|
|
+ for (org.apache.kafka.common.TopicPartitionInfo partitionInfo : topicDescription.partitions()) {
|
|
|
+ Map<String, Object> partitionData = new HashMap<>();
|
|
|
+ partitionData.put("partition", partitionInfo.partition());
|
|
|
+ partitionData.put("leader", partitionInfo.leader().id());
|
|
|
+ partitionData.put("replicas", partitionInfo.replicas().size());
|
|
|
+ partitionData.put("inSyncReplicas", partitionInfo.isr().size());
|
|
|
+ partitions.add(partitionData);
|
|
|
+ }
|
|
|
+ result.put("partitions", partitions);
|
|
|
+ } catch (InterruptedException | ExecutionException e) {
|
|
|
+ log.error("Error getting topic partitions: {}", e.getMessage());
|
|
|
+ result.put("error", e.getMessage());
|
|
|
+ }
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
    /**
     * Reports per-partition "task" progress for a topic.
     *
     * Total tasks per partition = log end offset; completed tasks = offset
     * committed by this service's shared consumer group ("monitor-group");
     * pending tasks = the difference. This is lag accounting, so the numbers
     * are only meaningful if the monitored workload commits through that group —
     * NOTE(review): confirm that assumption against the actual consumers.
     *
     * @param topic topic whose partitions are inspected
     * @return map with topic/groupId/partitionStatus, or "error" on failure
     */
    public Map<String, Object> getPartitionTaskStatus(String topic) {
        Map<String, Object> result = new HashMap<>();
        try {
            // Subscribe the shared consumer. This mutates shared state:
            // KafkaConsumer is not thread-safe, so concurrent calls into this
            // service would need external synchronization.
            consumer.subscribe(Collections.singleton(topic));
            // Poll once so the consumer joins the group and may get an assignment.
            consumer.poll(Duration.ofMillis(100));

            // Group used for the "completed" side of the computation.
            ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
            String groupId = groupMetadata.groupId();

            // Collect this topic's partitions that were actually assigned to us.
            Set<TopicPartition> partitions = new HashSet<>();
            for (TopicPartition partition : consumer.assignment()) {
                if (partition.topic().equals(topic)) {
                    partitions.add(partition);
                }
            }

            // The single 100 ms poll may return before the rebalance completes,
            // leaving the assignment empty; fall back to enumerating all
            // partitions through the admin client.
            if (partitions.isEmpty()) {
                DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singleton(topic));
                TopicDescription topicDescription = describeTopicsResult.values().get(topic).get();
                for (org.apache.kafka.common.TopicPartitionInfo partitionInfo : topicDescription.partitions()) {
                    partitions.add(new TopicPartition(topic, partitionInfo.partition()));
                }
            }

            // Log-end offset per partition ("total tasks").
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
            // Committed offset per partition ("completed tasks"); values may be null.
            Map<TopicPartition, OffsetAndMetadata> committedOffsets = consumer.committed(partitions);

            List<Map<String, Object>> partitionStatus = new ArrayList<>();
            for (TopicPartition partition : partitions) {
                Map<String, Object> status = new HashMap<>();
                status.put("partition", partition.partition());

                long endOffset = endOffsets.get(partition);
                status.put("totalTasks", endOffset);

                // No committed offset yet means nothing consumed: treat as 0.
                OffsetAndMetadata committedOffset = committedOffsets.get(partition);
                long consumedOffset = committedOffset != null ? committedOffset.offset() : 0;
                status.put("completedTasks", consumedOffset);
                status.put("pendingTasks", endOffset - consumedOffset);

                partitionStatus.add(status);
            }

            result.put("topic", topic);
            result.put("groupId", groupId);
            result.put("partitionStatus", partitionStatus);
        } catch (Exception e) {
            log.error("Error getting partition task status: {}", e.getMessage());
            result.put("error", e.getMessage());
        }
        return result;
    }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取默认测试topic的分区情况
|
|
|
+ */
|
|
|
+ public Map<String, Object> getTestTopicPartitions() {
|
|
|
+ return getTopicPartitions(testTopic);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取默认测试topic的任务执行情况
|
|
|
+ */
|
|
|
+ public Map<String, Object> getTestTopicTaskStatus() {
|
|
|
+ return getPartitionTaskStatus(testTopic);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取所有Kafka topic及其消息数和副本数(带分页)
|
|
|
+ */
|
|
|
+ public Map<String, Object> getAllTopics(int page, int pageSize) {
|
|
|
+ Map<String, Object> result = new HashMap<>();
|
|
|
+ List<Map<String, Object>> topicsWithCount = new ArrayList<>();
|
|
|
+ try {
|
|
|
+ ListTopicsResult listTopicsResult = adminClient.listTopics();
|
|
|
+ Set<String> topicSet = listTopicsResult.names().get();
|
|
|
+
|
|
|
+ List<String> topicList = new ArrayList<>(topicSet);
|
|
|
+ int totalTopics = topicList.size();
|
|
|
+ int totalPages = (int) Math.ceil((double) totalTopics / pageSize);
|
|
|
+
|
|
|
+ int fromIndex = (page - 1) * pageSize;
|
|
|
+ int toIndex = Math.min(fromIndex + pageSize, totalTopics);
|
|
|
+
|
|
|
+ List<String> pagedTopics = topicList.subList(fromIndex, toIndex);
|
|
|
+
|
|
|
+ for (String topic : pagedTopics) {
|
|
|
+ Map<String, Object> topicData = new HashMap<>();
|
|
|
+ topicData.put("topic", topic);
|
|
|
+
|
|
|
+ // 获取分区信息
|
|
|
+ DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singleton(topic));
|
|
|
+ TopicDescription topicDescription = describeTopicsResult.values().get(topic).get();
|
|
|
+
|
|
|
+ // 获取每个分区的消息数
|
|
|
+ List<TopicPartition> partitions = new ArrayList<>();
|
|
|
+ for (org.apache.kafka.common.TopicPartitionInfo partitionInfo : topicDescription.partitions()) {
|
|
|
+ partitions.add(new TopicPartition(topic, partitionInfo.partition()));
|
|
|
+ }
|
|
|
+
|
|
|
+ // 获取分区末尾偏移量(消息总数)
|
|
|
+ Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
|
|
|
+ long totalMessages = endOffsets.values().stream().mapToLong(Long::longValue).sum();
|
|
|
+
|
|
|
+ // 获取副本数(取第一个分区的副本数,所有分区副本数相同)
|
|
|
+ int replicationFactor = topicDescription.partitions().get(0).replicas().size();
|
|
|
+
|
|
|
+ topicData.put("messageCount", totalMessages);
|
|
|
+ topicData.put("partitionCount", topicDescription.partitions().size());
|
|
|
+ topicData.put("replicationFactor", replicationFactor);
|
|
|
+ topicsWithCount.add(topicData);
|
|
|
+ }
|
|
|
+
|
|
|
+ result.put("data", topicsWithCount);
|
|
|
+ result.put("total", totalTopics);
|
|
|
+ result.put("page", page);
|
|
|
+ result.put("pageSize", pageSize);
|
|
|
+ result.put("totalPages", totalPages);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error getting topics with message count: {}", e.getMessage());
|
|
|
+ result.put("error", e.getMessage());
|
|
|
+ }
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 根据topic名称查询详情
|
|
|
+ */
|
|
|
+ public Map<String, Object> getTopicDetail(String topic) {
|
|
|
+ Map<String, Object> topicDetail = new HashMap<>();
|
|
|
+ try {
|
|
|
+ // 获取分区信息
|
|
|
+ DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singleton(topic));
|
|
|
+ TopicDescription topicDescription = describeTopicsResult.values().get(topic).get();
|
|
|
+
|
|
|
+ topicDetail.put("topic", topic);
|
|
|
+ topicDetail.put("partitionCount", topicDescription.partitions().size());
|
|
|
+
|
|
|
+ // 获取副本数(取第一个分区的副本数,所有分区副本数相同)
|
|
|
+ int replicationFactor = topicDescription.partitions().get(0).replicas().size();
|
|
|
+ topicDetail.put("replicationFactor", replicationFactor);
|
|
|
+
|
|
|
+ // 获取每个分区的消息数
|
|
|
+ List<TopicPartition> partitions = new ArrayList<>();
|
|
|
+ for (org.apache.kafka.common.TopicPartitionInfo partitionInfo : topicDescription.partitions()) {
|
|
|
+ partitions.add(new TopicPartition(topic, partitionInfo.partition()));
|
|
|
+ }
|
|
|
+
|
|
|
+ // 获取分区末尾偏移量(消息总数)
|
|
|
+ Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
|
|
|
+ long totalMessages = endOffsets.values().stream().mapToLong(Long::longValue).sum();
|
|
|
+ topicDetail.put("messageCount", totalMessages);
|
|
|
+
|
|
|
+ // 获取分区详细信息
|
|
|
+ List<Map<String, Object>> partitionsDetail = new ArrayList<>();
|
|
|
+ for (org.apache.kafka.common.TopicPartitionInfo partitionInfo : topicDescription.partitions()) {
|
|
|
+ Map<String, Object> partitionData = new HashMap<>();
|
|
|
+ partitionData.put("partition", partitionInfo.partition());
|
|
|
+ partitionData.put("leader", partitionInfo.leader().host() + ":" + partitionInfo.leader().port());
|
|
|
+
|
|
|
+ List<String> replicas = new ArrayList<>();
|
|
|
+ for (org.apache.kafka.common.Node replica : partitionInfo.replicas()) {
|
|
|
+ replicas.add(replica.host() + ":" + replica.port());
|
|
|
+ }
|
|
|
+ partitionData.put("replicas", replicas);
|
|
|
+
|
|
|
+ List<String> isr = new ArrayList<>();
|
|
|
+ for (org.apache.kafka.common.Node node : partitionInfo.isr()) {
|
|
|
+ isr.add(node.host() + ":" + node.port());
|
|
|
+ }
|
|
|
+ partitionData.put("isr", isr);
|
|
|
+
|
|
|
+ // 获取该分区的消息数
|
|
|
+ TopicPartition topicPartition = new TopicPartition(topic, partitionInfo.partition());
|
|
|
+ Long partitionMessageCount = endOffsets.get(topicPartition);
|
|
|
+ partitionData.put("messageCount", partitionMessageCount);
|
|
|
+ partitionData.put("endOffset", partitionMessageCount);
|
|
|
+
|
|
|
+ // 获取当前消费者组的offset
|
|
|
+ OffsetAndMetadata currentOffset = consumer.committed(topicPartition);
|
|
|
+ if (currentOffset != null) {
|
|
|
+ partitionData.put("currentOffset", currentOffset.offset());
|
|
|
+ partitionData.put("lag", partitionMessageCount - currentOffset.offset());
|
|
|
+ } else {
|
|
|
+ partitionData.put("currentOffset", 0);
|
|
|
+ partitionData.put("lag", partitionMessageCount);
|
|
|
+ }
|
|
|
+
|
|
|
+ partitionsDetail.add(partitionData);
|
|
|
+ }
|
|
|
+ topicDetail.put("partitionsDetail", partitionsDetail);
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error getting topic detail: {}", e.getMessage());
|
|
|
+ topicDetail.put("status", "error");
|
|
|
+ topicDetail.put("message", "Topic not found or error occurred: " + e.getMessage());
|
|
|
+ }
|
|
|
+ return topicDetail;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取分区消息列表
|
|
|
+ */
|
|
|
+ public List<Map<String, Object>> getPartitionMessages(String topic, int partition, long startOffset, int maxRecords) {
|
|
|
+ List<Map<String, Object>> messages = new ArrayList<>();
|
|
|
+ KafkaConsumer<String, String> partitionConsumer = null;
|
|
|
+
|
|
|
+ try {
|
|
|
+ // 创建临时消费者
|
|
|
+ Properties props = new Properties();
|
|
|
+ props.put("bootstrap.servers", "10.192.72.13:9092");
|
|
|
+ props.put("group.id", "monitor-group-temp");
|
|
|
+ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
|
|
|
+ props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
|
|
|
+ props.put("auto.offset.reset", "earliest");
|
|
|
+ props.put("enable.auto.commit", "false");
|
|
|
+
|
|
|
+ partitionConsumer = new KafkaConsumer<>(props);
|
|
|
+
|
|
|
+ // 订阅指定分区
|
|
|
+ TopicPartition topicPartition = new TopicPartition(topic, partition);
|
|
|
+ partitionConsumer.assign(Collections.singletonList(topicPartition));
|
|
|
+
|
|
|
+ // 定位到指定的起始offset
|
|
|
+ partitionConsumer.seek(topicPartition, startOffset);
|
|
|
+
|
|
|
+ // 消费消息
|
|
|
+ ConsumerRecords<String, String> records = partitionConsumer.poll(Duration.ofMillis(10000));
|
|
|
+ int count = 0;
|
|
|
+
|
|
|
+ for (ConsumerRecord<String, String> record : records) {
|
|
|
+ if (count >= maxRecords) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ Map<String, Object> message = new HashMap<>();
|
|
|
+ message.put("offset", record.offset());
|
|
|
+ message.put("timestamp", new Date(record.timestamp()).toString());
|
|
|
+ message.put("key", record.key());
|
|
|
+ message.put("value", record.value());
|
|
|
+ message.put("partition", record.partition());
|
|
|
+
|
|
|
+ messages.add(message);
|
|
|
+ count++;
|
|
|
+ }
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error getting partition messages: {}", e.getMessage());
|
|
|
+ } finally {
|
|
|
+ if (partitionConsumer != null) {
|
|
|
+ partitionConsumer.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return messages;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取消费组列表
|
|
|
+ */
|
|
|
+ public List<Map<String, Object>> getConsumerGroups() {
|
|
|
+ List<Map<String, Object>> consumerGroups = new ArrayList<>();
|
|
|
+ try {
|
|
|
+ ListConsumerGroupsResult listConsumerGroupsResult = adminClient.listConsumerGroups();
|
|
|
+ Collection<ConsumerGroupListing> consumerGroupListings = listConsumerGroupsResult.all().get();
|
|
|
+
|
|
|
+ for (ConsumerGroupListing group : consumerGroupListings) {
|
|
|
+ Map<String, Object> groupData = new HashMap<>();
|
|
|
+ groupData.put("groupId", group.groupId());
|
|
|
+
|
|
|
+ // 获取消费组成员信息
|
|
|
+ List<Map<String, Object>> members = new ArrayList<>();
|
|
|
+ try {
|
|
|
+ DescribeConsumerGroupsResult describeConsumerGroupsResult = adminClient.describeConsumerGroups(Collections.singletonList(group.groupId()));
|
|
|
+ Map<String, ConsumerGroupDescription> groupDescriptions = describeConsumerGroupsResult.all().get();
|
|
|
+
|
|
|
+ ConsumerGroupDescription description = groupDescriptions.get(group.groupId());
|
|
|
+ if (description != null) {
|
|
|
+ for (MemberDescription member : description.members()) {
|
|
|
+ Map<String, Object> memberData = new HashMap<>();
|
|
|
+ memberData.put("consumerId", member.consumerId());
|
|
|
+ memberData.put("clientId", member.clientId());
|
|
|
+ memberData.put("host", member.host());
|
|
|
+
|
|
|
+ // 获取成员分配的分区
|
|
|
+ List<Map<String, Object>> topicPartitions = new ArrayList<>();
|
|
|
+ for (TopicPartition tp : member.assignment().topicPartitions()) {
|
|
|
+ Map<String, Object> tpData = new HashMap<>();
|
|
|
+ tpData.put("topic", tp.topic());
|
|
|
+ tpData.put("partition", tp.partition());
|
|
|
+ topicPartitions.add(tpData);
|
|
|
+ }
|
|
|
+ memberData.put("topicPartitions", topicPartitions);
|
|
|
+ members.add(memberData);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error getting consumer group members: {}", e.getMessage());
|
|
|
+ }
|
|
|
+
|
|
|
+ groupData.put("members", members);
|
|
|
+ consumerGroups.add(groupData);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error getting consumer groups: {}", e.getMessage());
|
|
|
+ }
|
|
|
+ return consumerGroups;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取指定分区的日志末尾偏移量
|
|
|
+ */
|
|
|
+ private long getEndOffset(String topic, int partition) {
|
|
|
+ KafkaConsumer<String, String> consumer = null;
|
|
|
+ try {
|
|
|
+ Properties props = new Properties();
|
|
|
+ props.put("bootstrap.servers", "10.192.72.13:9092");
|
|
|
+ props.put("group.id", "monitor-group-temp");
|
|
|
+ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
|
|
|
+ props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
|
|
|
+ props.put("auto.offset.reset", "latest");
|
|
|
+
|
|
|
+ consumer = new KafkaConsumer<>(props);
|
|
|
+ TopicPartition tp = new TopicPartition(topic, partition);
|
|
|
+ consumer.assign(Collections.singletonList(tp));
|
|
|
+ consumer.seekToEnd(Collections.singletonList(tp));
|
|
|
+ return consumer.position(tp);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error getting end offset: {}", e.getMessage());
|
|
|
+ return 0;
|
|
|
+ } finally {
|
|
|
+ if (consumer != null) {
|
|
|
+ consumer.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取消费组详情
|
|
|
+ */
|
|
|
+ public Map<String, Object> getConsumerGroupDetail(String groupId) {
|
|
|
+ Map<String, Object> groupDetail = new HashMap<>();
|
|
|
+ try {
|
|
|
+ DescribeConsumerGroupsResult describeConsumerGroupsResult = adminClient.describeConsumerGroups(Collections.singletonList(groupId));
|
|
|
+ Map<String, ConsumerGroupDescription> groupDescriptions = describeConsumerGroupsResult.all().get();
|
|
|
+
|
|
|
+ ConsumerGroupDescription description = groupDescriptions.get(groupId);
|
|
|
+ if (description != null) {
|
|
|
+ groupDetail.put("groupId", description.groupId());
|
|
|
+ groupDetail.put("state", description.state());
|
|
|
+ groupDetail.put("isSimpleConsumerGroup", description.isSimpleConsumerGroup());
|
|
|
+
|
|
|
+ List<Map<String, Object>> members = new ArrayList<>();
|
|
|
+ for (MemberDescription member : description.members()) {
|
|
|
+ Map<String, Object> memberData = new HashMap<>();
|
|
|
+ memberData.put("consumerId", member.consumerId());
|
|
|
+ memberData.put("clientId", member.clientId());
|
|
|
+ memberData.put("host", member.host());
|
|
|
+
|
|
|
+ // 获取成员分配的分区
|
|
|
+ Map<TopicPartition, OffsetAndMetadata> assignments = member.assignment().topicPartitions().stream()
|
|
|
+ .collect(Collectors.toMap(tp -> tp, tp -> new OffsetAndMetadata(0)));
|
|
|
+ List<Map<String, Object>> topicPartitions = new ArrayList<>();
|
|
|
+ for (TopicPartition tp : member.assignment().topicPartitions()) {
|
|
|
+ Map<String, Object> tpData = new HashMap<>();
|
|
|
+ tpData.put("topic", tp.topic());
|
|
|
+ tpData.put("partition", tp.partition());
|
|
|
+ topicPartitions.add(tpData);
|
|
|
+ }
|
|
|
+ memberData.put("topicPartitions", topicPartitions);
|
|
|
+ members.add(memberData);
|
|
|
+ }
|
|
|
+ groupDetail.put("members", members);
|
|
|
+
|
|
|
+ // 获取消费组的偏移量
|
|
|
+ ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = adminClient.listConsumerGroupOffsets(groupId);
|
|
|
+ Map<TopicPartition, OffsetAndMetadata> offsets = listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata().get();
|
|
|
+ List<Map<String, Object>> offsetDetails = new ArrayList<>();
|
|
|
+ for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
|
|
|
+ TopicPartition tp = entry.getKey();
|
|
|
+ long currentOffset = entry.getValue().offset();
|
|
|
+ long endOffset = getEndOffset(tp.topic(), tp.partition());
|
|
|
+ long lag = endOffset - currentOffset;
|
|
|
+
|
|
|
+ Map<String, Object> offsetData = new HashMap<>();
|
|
|
+ offsetData.put("topic", tp.topic());
|
|
|
+ offsetData.put("partition", tp.partition());
|
|
|
+ offsetData.put("currentOffset", currentOffset);
|
|
|
+ offsetData.put("endOffset", endOffset);
|
|
|
+ offsetData.put("lag", lag);
|
|
|
+ offsetDetails.add(offsetData);
|
|
|
+ }
|
|
|
+ groupDetail.put("offsets", offsetDetails);
|
|
|
+ } else {
|
|
|
+ groupDetail.put("status", "error");
|
|
|
+ groupDetail.put("message", "Consumer group not found");
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error getting consumer group detail: {}", e.getMessage());
|
|
|
+ groupDetail.put("status", "error");
|
|
|
+ groupDetail.put("message", "Error getting consumer group detail: " + e.getMessage());
|
|
|
+ }
|
|
|
+ return groupDetail;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 删除消费组
|
|
|
+ */
|
|
|
+ public Map<String, Object> deleteConsumerGroup(String groupId) {
|
|
|
+ Map<String, Object> result = new HashMap<>();
|
|
|
+ try {
|
|
|
+ DeleteConsumerGroupsResult deleteConsumerGroupsResult = adminClient.deleteConsumerGroups(Collections.singletonList(groupId));
|
|
|
+ deleteConsumerGroupsResult.all().get();
|
|
|
+ result.put("status", "success");
|
|
|
+ result.put("message", "Consumer group deleted successfully");
|
|
|
+ result.put("groupId", groupId);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error deleting consumer group: {}", e.getMessage());
|
|
|
+ result.put("status", "error");
|
|
|
+ result.put("message", "Error deleting consumer group: " + e.getMessage());
|
|
|
+ result.put("groupId", groupId);
|
|
|
+ }
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取所有Kafka topic及其分区情况
|
|
|
+ */
|
|
|
+ public Map<String, Object> getAllTopicsWithPartitions() {
|
|
|
+ Map<String, Object> result = new HashMap<>();
|
|
|
+ try {
|
|
|
+ ListTopicsResult listTopicsResult = adminClient.listTopics();
|
|
|
+ Set<String> topicSet = listTopicsResult.names().get();
|
|
|
+
|
|
|
+ List<Map<String, Object>> topicList = new ArrayList<>();
|
|
|
+ for (String topic : topicSet) {
|
|
|
+ Map<String, Object> topicData = new HashMap<>();
|
|
|
+ topicData.put("topic", topic);
|
|
|
+
|
|
|
+ DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singleton(topic));
|
|
|
+ TopicDescription topicDescription = describeTopicsResult.values().get(topic).get();
|
|
|
+
|
|
|
+ topicData.put("partitionCount", topicDescription.partitions().size());
|
|
|
+
|
|
|
+ List<Map<String, Object>> partitions = new ArrayList<>();
|
|
|
+ for (org.apache.kafka.common.TopicPartitionInfo partitionInfo : topicDescription.partitions()) {
|
|
|
+ Map<String, Object> partitionData = new HashMap<>();
|
|
|
+ partitionData.put("partition", partitionInfo.partition());
|
|
|
+ partitionData.put("leader", partitionInfo.leader().id());
|
|
|
+ partitionData.put("replicas", partitionInfo.replicas().size());
|
|
|
+ partitionData.put("inSyncReplicas", partitionInfo.isr().size());
|
|
|
+ partitions.add(partitionData);
|
|
|
+ }
|
|
|
+ topicData.put("partitions", partitions);
|
|
|
+ topicList.add(topicData);
|
|
|
+ }
|
|
|
+
|
|
|
+ result.put("topics", topicList);
|
|
|
+ result.put("totalTopics", topicSet.size());
|
|
|
+ } catch (InterruptedException | ExecutionException e) {
|
|
|
+ log.error("Error getting all topics with partitions: {}", e.getMessage());
|
|
|
+ result.put("error", e.getMessage());
|
|
|
+ }
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 创建新的topic
|
|
|
+ */
|
|
|
+ public Map<String, Object> createTopic(String topicName, int partitions, short replicationFactor) {
|
|
|
+ Map<String, Object> result = new HashMap<>();
|
|
|
+ try {
|
|
|
+ NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
|
|
|
+ CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singleton(newTopic));
|
|
|
+ createTopicsResult.all().get();
|
|
|
+
|
|
|
+ result.put("status", "success");
|
|
|
+ result.put("message", "Topic created successfully");
|
|
|
+ result.put("topic", topicName);
|
|
|
+ result.put("partitions", partitions);
|
|
|
+ result.put("replicationFactor", replicationFactor);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error creating topic: {}", e.getMessage());
|
|
|
+ result.put("status", "error");
|
|
|
+ result.put("message", e.getMessage());
|
|
|
+ }
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 删除topic
|
|
|
+ */
|
|
|
+ public Map<String, Object> deleteTopic(String topicName) {
|
|
|
+ Map<String, Object> result = new HashMap<>();
|
|
|
+ try {
|
|
|
+ DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Collections.singleton(topicName));
|
|
|
+ deleteTopicsResult.all().get();
|
|
|
+
|
|
|
+ result.put("status", "success");
|
|
|
+ result.put("message", "Topic deleted successfully");
|
|
|
+ result.put("topic", topicName);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error deleting topic: {}", e.getMessage());
|
|
|
+ result.put("status", "error");
|
|
|
+ result.put("message", e.getMessage());
|
|
|
+ }
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 增加topic的分区数量(只能增加,不能减少)
|
|
|
+ */
|
|
|
+ public Map<String, Object> increasePartitions(String topicName, int newPartitionCount) {
|
|
|
+ Map<String, Object> result = new HashMap<>();
|
|
|
+ try {
|
|
|
+ Map<String, NewPartitions> partitionsMap = new HashMap<>();
|
|
|
+ NewPartitions newPartitions = NewPartitions.increaseTo(newPartitionCount);
|
|
|
+ partitionsMap.put(topicName, newPartitions);
|
|
|
+
|
|
|
+ CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(partitionsMap);
|
|
|
+ createPartitionsResult.all().get();
|
|
|
+
|
|
|
+ result.put("status", "success");
|
|
|
+ result.put("message", "Topic partitions increased successfully");
|
|
|
+ result.put("topic", topicName);
|
|
|
+ result.put("newPartitionCount", newPartitionCount);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Error increasing partitions: {}", e.getMessage());
|
|
|
+ result.put("status", "error");
|
|
|
+ result.put("message", e.getMessage());
|
|
|
+ }
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+}
|