Skip to content

Commit dd2e29d

Browse files
author
zengqiao
committed
bugfix, fix collect consumer metrics task
1 parent 74b5700 commit dd2e29d

5 files changed

Lines changed: 149 additions & 133 deletions

File tree

Lines changed: 41 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
package com.xiaojukeji.kafka.manager.common.entity.dto.consumer;
22

3-
import com.xiaojukeji.kafka.manager.common.entity.zookeeper.PartitionState;
4-
5-
import java.util.List;
63
import java.util.Map;
74

85
/**
@@ -11,20 +8,33 @@
118
* @date 2015/11/12
129
*/
1310
public class ConsumerDTO {
14-
/**
15-
* 消费group名
16-
*/
11+
private Long clusterId;
12+
13+
private String topicName;
14+
1715
private String consumerGroup;
1816

19-
/**
20-
* 消费类型,一般为static
21-
*/
2217
private String location;
2318

24-
/**
25-
* 订阅的每个topic的partition状态列表
26-
*/
27-
private Map<String, List<PartitionState>> topicPartitionMap;
19+
private Map<Integer, Long> partitionOffsetMap;
20+
21+
private Map<Integer, Long> consumerOffsetMap;
22+
23+
public Long getClusterId() {
24+
return clusterId;
25+
}
26+
27+
public void setClusterId(Long clusterId) {
28+
this.clusterId = clusterId;
29+
}
30+
31+
public String getTopicName() {
32+
return topicName;
33+
}
34+
35+
public void setTopicName(String topicName) {
36+
this.topicName = topicName;
37+
}
2838

2939
public String getConsumerGroup() {
3040
return consumerGroup;
@@ -42,20 +52,31 @@ public void setLocation(String location) {
4252
this.location = location;
4353
}
4454

45-
public Map<String, List<PartitionState>> getTopicPartitionMap() {
46-
return topicPartitionMap;
55+
public Map<Integer, Long> getPartitionOffsetMap() {
56+
return partitionOffsetMap;
57+
}
58+
59+
public void setPartitionOffsetMap(Map<Integer, Long> partitionOffsetMap) {
60+
this.partitionOffsetMap = partitionOffsetMap;
61+
}
62+
63+
public Map<Integer, Long> getConsumerOffsetMap() {
64+
return consumerOffsetMap;
4765
}
4866

49-
public void setTopicPartitionMap(Map<String, List<PartitionState>> topicPartitionMap) {
50-
this.topicPartitionMap = topicPartitionMap;
67+
public void setConsumerOffsetMap(Map<Integer, Long> consumerOffsetMap) {
68+
this.consumerOffsetMap = consumerOffsetMap;
5169
}
5270

5371
@Override
5472
public String toString() {
55-
return "Consumer{" +
56-
"consumerGroup='" + consumerGroup + '\'' +
73+
return "ConsumerDTO{" +
74+
"clusterId=" + clusterId +
75+
", topicName='" + topicName + '\'' +
76+
", consumerGroup='" + consumerGroup + '\'' +
5777
", location='" + location + '\'' +
58-
", topicPartitionMap=" + topicPartitionMap +
78+
", partitionOffsetMap=" + partitionOffsetMap +
79+
", consumerOffsetMap=" + consumerOffsetMap +
5980
'}';
6081
}
6182
}

service/src/main/java/com/xiaojukeji/kafka/manager/service/collector/CollectConsumerMetricsTask.java

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@
22

33
import com.xiaojukeji.kafka.manager.common.constant.Constant;
44
import com.xiaojukeji.kafka.manager.common.entity.ConsumerMetrics;
5-
import com.xiaojukeji.kafka.manager.common.entity.zookeeper.PartitionState;
65
import com.xiaojukeji.kafka.manager.common.entity.dto.consumer.ConsumerDTO;
76
import com.xiaojukeji.kafka.manager.common.entity.po.ClusterDO;
87
import com.xiaojukeji.kafka.manager.service.cache.ClusterMetadataManager;
98
import com.xiaojukeji.kafka.manager.service.cache.KafkaMetricsCache;
109
import com.xiaojukeji.kafka.manager.service.service.ConsumerService;
10+
import org.apache.kafka.common.TopicPartition;
1111
import org.slf4j.Logger;
1212
import org.slf4j.LoggerFactory;
1313

@@ -34,8 +34,8 @@ public void collect() {
3434
if (clusterDO == null) {
3535
return;
3636
}
37-
Map<String, List<PartitionState>> topicNamePartitionStateListMap = new HashMap<>();
38-
List<ConsumerDTO> consumerDTOList = consumerService.getMonitoredConsumerList(clusterDO, topicNamePartitionStateListMap);
37+
Map<TopicPartition, Long> allPartitionOffsetMap = new HashMap<>();
38+
List<ConsumerDTO> consumerDTOList = consumerService.getMonitoredConsumerList(clusterDO, allPartitionOffsetMap);
3939

4040
List<ConsumerMetrics> consumerMetricsList = convert2ConsumerMetrics(consumerDTOList);
4141
KafkaMetricsCache.putConsumerMetricsToCache(clusterId, consumerMetricsList);
@@ -47,23 +47,27 @@ public void collect() {
4747
private List<ConsumerMetrics> convert2ConsumerMetrics(List<ConsumerDTO> consumerDTOList) {
4848
List<ConsumerMetrics> consumerMetricsList = new ArrayList<>();
4949
for (ConsumerDTO consumerDTO : consumerDTOList) {
50-
Map<String, List<PartitionState>> topicNamePartitionStateListMap = consumerDTO.getTopicPartitionMap();
51-
for(Map.Entry<String, List<PartitionState>> entry : topicNamePartitionStateListMap.entrySet()){
52-
String topicName = entry.getKey();
53-
List<PartitionState> partitionStateList = entry.getValue();
54-
ConsumerMetrics consumerMetrics = new ConsumerMetrics();
55-
consumerMetrics.setClusterId(clusterId);
56-
consumerMetrics.setConsumerGroup(consumerDTO.getConsumerGroup());
57-
consumerMetrics.setLocation(consumerDTO.getLocation());
58-
consumerMetrics.setTopicName(topicName);
59-
long sumLag = 0;
60-
for (PartitionState partitionState : partitionStateList) {
61-
Map.Entry<Long, Long> offsetEntry = new AbstractMap.SimpleEntry<>(partitionState.getOffset(), partitionState.getConsumeOffset());
62-
sumLag += (offsetEntry.getKey() - offsetEntry.getValue() > 0 ? offsetEntry.getKey() - offsetEntry.getValue(): 0);
50+
if (consumerDTO.getPartitionOffsetMap() == null || consumerDTO.getConsumerOffsetMap() == null) {
51+
continue;
52+
}
53+
54+
ConsumerMetrics consumerMetrics = new ConsumerMetrics();
55+
consumerMetrics.setClusterId(consumerDTO.getClusterId());
56+
consumerMetrics.setConsumerGroup(consumerDTO.getConsumerGroup());
57+
consumerMetrics.setLocation(consumerDTO.getLocation());
58+
consumerMetrics.setTopicName(consumerDTO.getTopicName());
59+
60+
long sumLag = 0;
61+
for(Map.Entry<Integer, Long> entry : consumerDTO.getPartitionOffsetMap().entrySet()){
62+
Long partitionOffset = entry.getValue();
63+
Long consumerOffset = consumerDTO.getConsumerOffsetMap().get(entry.getKey());
64+
if (partitionOffset == null || consumerOffset == null) {
65+
continue;
6366
}
64-
consumerMetrics.setSumLag(sumLag);
65-
consumerMetricsList.add(consumerMetrics);
67+
sumLag += Math.max(partitionOffset - consumerOffset, 0);
6668
}
69+
consumerMetrics.setSumLag(sumLag);
70+
consumerMetricsList.add(consumerMetrics);
6771
}
6872
return consumerMetricsList;
6973
}

service/src/main/java/com/xiaojukeji/kafka/manager/service/service/ConsumerService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.xiaojukeji.kafka.manager.common.entity.dto.PartitionOffsetDTO;
88
import com.xiaojukeji.kafka.manager.common.entity.dto.consumer.ConsumeDetailDTO;
99
import com.xiaojukeji.kafka.manager.common.entity.po.ClusterDO;
10+
import org.apache.kafka.common.TopicPartition;
1011

1112
import java.util.List;
1213
import java.util.Map;
@@ -57,7 +58,7 @@ public interface ConsumerService {
5758
* @return
5859
*/
5960
List<ConsumerDTO> getMonitoredConsumerList(ClusterDO clusterDO,
60-
Map<String, List<PartitionState>> topicNamePartitionStateListMap);
61+
Map<TopicPartition, Long> partitionOffsetMap);
6162

6263
/**
6364
* 重置offset

0 commit comments

Comments
 (0)