apoorvmittal10 commented on code in PR #22382:
URL: https://github.com/apache/kafka/pull/22382#discussion_r3310166102
##########
server/src/main/java/org/apache/kafka/server/share/metrics/ShareGroupMetrics.java:
##########
@@ -127,13 +147,43 @@ public Histogram topicPartitionsAcquireTimeMs(String
groupId) {
return topicPartitionsAcquireTimeMs.get(groupId);
}
+ public void recordDLQRecordWrite(String shareGroupId, int count) {
+ dlqRecordCountPerGroup.computeIfAbsent(shareGroupId, k ->
metricsGroup.newMeter(
+ DEAD_LETTER_QUEUE_RECORD_COUNT,
+ "dlq-records",
Review Comment:
Would it be better to classify this as requests?
```suggestion
"requests",
```
##########
server/src/main/java/org/apache/kafka/server/share/metrics/ShareGroupMetrics.java:
##########
@@ -127,13 +147,43 @@ public Histogram topicPartitionsAcquireTimeMs(String
groupId) {
return topicPartitionsAcquireTimeMs.get(groupId);
}
+ public void recordDLQRecordWrite(String shareGroupId, int count) {
+ dlqRecordCountPerGroup.computeIfAbsent(shareGroupId, k ->
metricsGroup.newMeter(
+ DEAD_LETTER_QUEUE_RECORD_COUNT,
+ "dlq-records",
+ TimeUnit.SECONDS,
+ Map.of(GROUP_ID_TAG, k)
+ )).mark(count);
+ }
+
+ public void recordDLQProduce(String shareGroupId) {
+ dlqProduceTotalPerGroup.computeIfAbsent(shareGroupId, k ->
metricsGroup.newMeter(
+ DEAD_LETTER_QUEUE_TOTAL_PRODUCE_REQ_PER_SEC,
+ "dlq-produce",
+ TimeUnit.SECONDS,
+ Map.of(GROUP_ID_TAG, k)
+ )).mark();
+ }
+
+ public void recordDLQProduceFailed(String shareGroupId) {
+ dlqProduceFailedPerGroup.computeIfAbsent(shareGroupId, k ->
metricsGroup.newMeter(
+ DEAD_LETTER_QUEUE_FAILED_PRODUCE_REQ_PER_SEC,
+ "dlq-produce",
+ TimeUnit.SECONDS,
+ Map.of(GROUP_ID_TAG, k)
+ )).mark();
+ }
+
@Override
public void close() throws Exception {
Arrays.stream(AcknowledgeType.values()).forEach(
m -> metricsGroup.removeMetric(RECORD_ACKNOWLEDGEMENTS_PER_SEC,
Map.of(ACK_TYPE_TAG, m.toString())));
metricsGroup.removeMetric(PARTITION_LOAD_TIME_MS);
- topicPartitionsFetchRatio.forEach((k, v) ->
metricsGroup.removeMetric(TOPIC_PARTITIONS_FETCH_RATIO, Map.of("group", k)));
- topicPartitionsAcquireTimeMs.forEach((k, v) ->
metricsGroup.removeMetric(TOPIC_PARTITIONS_ACQUIRE_TIME_MS, Map.of("group",
k)));
Review Comment:
Can you please also replace the string literal "group" with GROUP_ID_TAG
everywhere else it is used in this file?
##########
server/src/main/java/org/apache/kafka/server/share/metrics/ShareGroupMetrics.java:
##########
@@ -127,13 +147,43 @@ public Histogram topicPartitionsAcquireTimeMs(String
groupId) {
return topicPartitionsAcquireTimeMs.get(groupId);
}
+ public void recordDLQRecordWrite(String shareGroupId, int count) {
+ dlqRecordCountPerGroup.computeIfAbsent(shareGroupId, k ->
metricsGroup.newMeter(
+ DEAD_LETTER_QUEUE_RECORD_COUNT,
+ "dlq-records",
+ TimeUnit.SECONDS,
+ Map.of(GROUP_ID_TAG, k)
+ )).mark(count);
+ }
+
+ public void recordDLQProduce(String shareGroupId) {
+ dlqProduceTotalPerGroup.computeIfAbsent(shareGroupId, k ->
metricsGroup.newMeter(
+ DEAD_LETTER_QUEUE_TOTAL_PRODUCE_REQ_PER_SEC,
+ "dlq-produce",
+ TimeUnit.SECONDS,
+ Map.of(GROUP_ID_TAG, k)
+ )).mark();
+ }
+
+ public void recordDLQProduceFailed(String shareGroupId) {
+ dlqProduceFailedPerGroup.computeIfAbsent(shareGroupId, k ->
metricsGroup.newMeter(
+ DEAD_LETTER_QUEUE_FAILED_PRODUCE_REQ_PER_SEC,
+ "dlq-produce",
Review Comment:
Would it be better to classify this as errors?
```suggestion
"errors",
```
##########
server/src/main/java/org/apache/kafka/server/share/metrics/ShareGroupMetrics.java:
##########
@@ -40,6 +40,10 @@ public class ShareGroupMetrics implements AutoCloseable {
private static final String TOPIC_PARTITIONS_FETCH_RATIO =
"RequestTopicPartitionsFetchRatio";
private static final String TOPIC_PARTITIONS_ACQUIRE_TIME_MS =
"TopicPartitionsAcquireTimeMs";
private static final String ACK_TYPE_TAG = "ackType";
+ private static final String DEAD_LETTER_QUEUE_RECORD_COUNT =
"DeadLetterQueueRecordCount";
+ private static final String DEAD_LETTER_QUEUE_TOTAL_PRODUCE_REQ_PER_SEC =
"DeadLetterQueueTotalProduceRequestsPerSec";
+ private static final String DEAD_LETTER_QUEUE_FAILED_PRODUCE_REQ_PER_SEC =
"DeadLetterQueueFailedProduceRequestsPerSec";
+ private static final String GROUP_ID_TAG = "group";
Review Comment:
nit: Place GROUP_ID_TAG and ACK_TYPE_TAG next to each other for readability,
separated by a blank line from the metric name constants, which are defined together.
##########
server/src/main/java/org/apache/kafka/server/share/metrics/ShareGroupMetrics.java:
##########
@@ -127,13 +147,43 @@ public Histogram topicPartitionsAcquireTimeMs(String
groupId) {
return topicPartitionsAcquireTimeMs.get(groupId);
}
+ public void recordDLQRecordWrite(String shareGroupId, int count) {
+ dlqRecordCountPerGroup.computeIfAbsent(shareGroupId, k ->
metricsGroup.newMeter(
+ DEAD_LETTER_QUEUE_RECORD_COUNT,
+ "dlq-records",
+ TimeUnit.SECONDS,
+ Map.of(GROUP_ID_TAG, k)
+ )).mark(count);
+ }
+
+ public void recordDLQProduce(String shareGroupId) {
+ dlqProduceTotalPerGroup.computeIfAbsent(shareGroupId, k ->
metricsGroup.newMeter(
+ DEAD_LETTER_QUEUE_TOTAL_PRODUCE_REQ_PER_SEC,
+ "dlq-produce",
Review Comment:
Would it be better to classify this as requests?
```suggestion
"requests",
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]