AndrewJSchofield commented on code in PR #16488:
URL: https://github.com/apache/kafka/pull/16488#discussion_r1666522093


##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -696,4 +722,99 @@ public ShareFetchPartitionData(FetchParams fetchParams, 
String groupId, String m
             this.partitionMaxBytes = partitionMaxBytes;
         }
     }
+
+    static class ShareGroupMetrics {
+        /**
+         * share-acknowledgement (share-acknowledgement-rate and 
share-acknowledgement-count) - The total number of offsets acknowledged for 
share groups (requests to be ack).
+         * record-acknowledgement (record-acknowledgement-rate and 
record-acknowledgement-count) - The number of records acknowledged per 
acknowledgement type.
+         * partition-load-time (partition-load-time-avg and 
partition-load-time-max) - The time taken to load the share partitions.
+         */
+
+        public static final String METRICS_GROUP_NAME = "share-group-metrics";
+
+        public static final String SHARE_ACK_SENSOR = 
"share-acknowledgement-sensor";
+        public static final String SHARE_ACK_RATE = 
"share-acknowledgement-rate";
+        public static final String SHARE_ACK_COUNT = 
"share-acknowledgement-count";
+
+        public static final String RECORD_ACK_SENSOR_PREFIX = 
"record-acknowledgement";
+        public static final String RECORD_ACK_RATE = 
"record-acknowledgement-rate";
+        public static final String RECORD_ACK_COUNT = 
"record-acknowledgement-count";
+        public static final String ACK_TYPE = "ack-type";
+
+        public static final String PARTITION_LOAD_TIME_SENSOR = 
"partition-load-time-sensor";
+        public static final String PARTITION_LOAD_TIME_AVG = 
"partition-load-time-avg";
+        public static final String PARTITION_LOAD_TIME_MAX = 
"partition-load-time-max";
+
+        public static final Map<Byte, String> RECORD_ACKS_MAP = new 
HashMap<>();
+        
+        private final Time time;
+        private final Sensor shareAcknowledgementSensor;
+        private final Map<Byte, Sensor> recordAcksSensorMap = new HashMap<>();
+        private final Sensor partitionLoadTimeSensor;
+
+        static {
+            RECORD_ACKS_MAP.put((byte) 1, AcknowledgeType.ACCEPT.toString());
+            RECORD_ACKS_MAP.put((byte) 2, AcknowledgeType.RELEASE.toString());
+            RECORD_ACKS_MAP.put((byte) 3, AcknowledgeType.REJECT.toString());
+        }
+
+        public ShareGroupMetrics(Metrics metrics, Time time) {
+            this.time = time;
+
+            shareAcknowledgementSensor = metrics.sensor(SHARE_ACK_SENSOR);
+            shareAcknowledgementSensor.add(new Meter(
+                metrics.metricName(
+                    SHARE_ACK_RATE,
+                    METRICS_GROUP_NAME,
+                    "The rate of number of acknowledge requests."),
+                metrics.metricName(
+                    SHARE_ACK_COUNT,
+                    METRICS_GROUP_NAME,
+                    "The number of acknowledge requests.")));
+
+            for (Map.Entry<Byte, String> entry : RECORD_ACKS_MAP.entrySet()) {
+                recordAcksSensorMap.put(entry.getKey(), 
metrics.sensor(String.format("%s-%s-sensor", RECORD_ACK_SENSOR_PREFIX, 
entry.getValue())));
+                recordAcksSensorMap.get(entry.getKey())
+                    .add(new Meter(
+                        metrics.metricName(
+                            RECORD_ACK_RATE,
+                            METRICS_GROUP_NAME,
+                            "The rate of number of records acknowledged per 
acknowledgement type.",
+                            ACK_TYPE, entry.getValue()),
+                        metrics.metricName(
+                            RECORD_ACK_COUNT,
+                            METRICS_GROUP_NAME,
+                            "The number of records acknowledged per 
acknowledgement type.",
+                            ACK_TYPE, entry.getValue())));
+            }
+
+            partitionLoadTimeSensor = 
metrics.sensor(PARTITION_LOAD_TIME_SENSOR);
+            partitionLoadTimeSensor.add(metrics.metricName(
+                    PARTITION_LOAD_TIME_AVG,
+                    METRICS_GROUP_NAME,
+                    "The average time in milliseconds to load the share 
partitions."),
+                new Avg());
+            partitionLoadTimeSensor.add(metrics.metricName(
+                    PARTITION_LOAD_TIME_MAX,
+                    METRICS_GROUP_NAME,
+                    "The maximum time in milliseconds to load the share 
partitions."),
+                new Max());
+        }
+
+        void shareAcknowledgement() {
+            shareAcknowledgementSensor.record();
+        }
+
+        void recordAcknowledgement(byte ackType) {
+            if (recordAcksSensorMap.containsKey(ackType)) {
+                recordAcksSensorMap.get(ackType).record();
+            } else {

Review Comment:
   The ackType can be zero when a gap is being acknowledged. For example, when 
a batch of transactional records is received by the client, it responds with 0 
for the offsets which correspond to the transactional control records. This 
will cause an error to be logged by the broker for each of the records.



##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -696,4 +722,99 @@ public ShareFetchPartitionData(FetchParams fetchParams, 
String groupId, String m
             this.partitionMaxBytes = partitionMaxBytes;
         }
     }
+
+    static class ShareGroupMetrics {
+        /**
+         * share-acknowledgement (share-acknowledgement-rate and 
share-acknowledgement-count) - The total number of offsets acknowledged for 
share groups (requests to be ack).
+         * record-acknowledgement (record-acknowledgement-rate and 
record-acknowledgement-count) - The number of records acknowledged per 
acknowledgement type.
+         * partition-load-time (partition-load-time-avg and 
partition-load-time-max) - The time taken to load the share partitions.
+         */
+
+        public static final String METRICS_GROUP_NAME = "share-group-metrics";
+
+        public static final String SHARE_ACK_SENSOR = 
"share-acknowledgement-sensor";
+        public static final String SHARE_ACK_RATE = 
"share-acknowledgement-rate";
+        public static final String SHARE_ACK_COUNT = 
"share-acknowledgement-count";
+
+        public static final String RECORD_ACK_SENSOR_PREFIX = 
"record-acknowledgement";
+        public static final String RECORD_ACK_RATE = 
"record-acknowledgement-rate";
+        public static final String RECORD_ACK_COUNT = 
"record-acknowledgement-count";
+        public static final String ACK_TYPE = "ack-type";
+
+        public static final String PARTITION_LOAD_TIME_SENSOR = 
"partition-load-time-sensor";
+        public static final String PARTITION_LOAD_TIME_AVG = 
"partition-load-time-avg";
+        public static final String PARTITION_LOAD_TIME_MAX = 
"partition-load-time-max";
+
+        public static final Map<Byte, String> RECORD_ACKS_MAP = new 
HashMap<>();
+        
+        private final Time time;
+        private final Sensor shareAcknowledgementSensor;
+        private final Map<Byte, Sensor> recordAcksSensorMap = new HashMap<>();
+        private final Sensor partitionLoadTimeSensor;
+
+        static {
+            RECORD_ACKS_MAP.put((byte) 1, AcknowledgeType.ACCEPT.toString());
+            RECORD_ACKS_MAP.put((byte) 2, AcknowledgeType.RELEASE.toString());
+            RECORD_ACKS_MAP.put((byte) 3, AcknowledgeType.REJECT.toString());
+        }
+
+        public ShareGroupMetrics(Metrics metrics, Time time) {
+            this.time = time;
+
+            shareAcknowledgementSensor = metrics.sensor(SHARE_ACK_SENSOR);
+            shareAcknowledgementSensor.add(new Meter(
+                metrics.metricName(
+                    SHARE_ACK_RATE,
+                    METRICS_GROUP_NAME,
+                    "The rate of number of acknowledge requests."),

Review Comment:
   "Rate of number" sounds odd. "Rate of acknowledge requests" or "acknowledge 
request rate" would be better.



##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -696,4 +722,99 @@ public ShareFetchPartitionData(FetchParams fetchParams, 
String groupId, String m
             this.partitionMaxBytes = partitionMaxBytes;
         }
     }
+
+    static class ShareGroupMetrics {
+        /**
+         * share-acknowledgement (share-acknowledgement-rate and 
share-acknowledgement-count) - The total number of offsets acknowledged for 
share groups (requests to be ack).
+         * record-acknowledgement (record-acknowledgement-rate and 
record-acknowledgement-count) - The number of records acknowledged per 
acknowledgement type.
+         * partition-load-time (partition-load-time-avg and 
partition-load-time-max) - The time taken to load the share partitions.
+         */
+
+        public static final String METRICS_GROUP_NAME = "share-group-metrics";
+
+        public static final String SHARE_ACK_SENSOR = 
"share-acknowledgement-sensor";
+        public static final String SHARE_ACK_RATE = 
"share-acknowledgement-rate";
+        public static final String SHARE_ACK_COUNT = 
"share-acknowledgement-count";
+
+        public static final String RECORD_ACK_SENSOR_PREFIX = 
"record-acknowledgement";
+        public static final String RECORD_ACK_RATE = 
"record-acknowledgement-rate";
+        public static final String RECORD_ACK_COUNT = 
"record-acknowledgement-count";
+        public static final String ACK_TYPE = "ack-type";
+
+        public static final String PARTITION_LOAD_TIME_SENSOR = 
"partition-load-time-sensor";
+        public static final String PARTITION_LOAD_TIME_AVG = 
"partition-load-time-avg";
+        public static final String PARTITION_LOAD_TIME_MAX = 
"partition-load-time-max";
+
+        public static final Map<Byte, String> RECORD_ACKS_MAP = new 
HashMap<>();
+        
+        private final Time time;
+        private final Sensor shareAcknowledgementSensor;
+        private final Map<Byte, Sensor> recordAcksSensorMap = new HashMap<>();
+        private final Sensor partitionLoadTimeSensor;
+
+        static {
+            RECORD_ACKS_MAP.put((byte) 1, AcknowledgeType.ACCEPT.toString());
+            RECORD_ACKS_MAP.put((byte) 2, AcknowledgeType.RELEASE.toString());
+            RECORD_ACKS_MAP.put((byte) 3, AcknowledgeType.REJECT.toString());
+        }
+
+        public ShareGroupMetrics(Metrics metrics, Time time) {
+            this.time = time;
+
+            shareAcknowledgementSensor = metrics.sensor(SHARE_ACK_SENSOR);
+            shareAcknowledgementSensor.add(new Meter(
+                metrics.metricName(
+                    SHARE_ACK_RATE,
+                    METRICS_GROUP_NAME,
+                    "The rate of number of acknowledge requests."),
+                metrics.metricName(
+                    SHARE_ACK_COUNT,
+                    METRICS_GROUP_NAME,
+                    "The number of acknowledge requests.")));
+
+            for (Map.Entry<Byte, String> entry : RECORD_ACKS_MAP.entrySet()) {
+                recordAcksSensorMap.put(entry.getKey(), 
metrics.sensor(String.format("%s-%s-sensor", RECORD_ACK_SENSOR_PREFIX, 
entry.getValue())));
+                recordAcksSensorMap.get(entry.getKey())
+                    .add(new Meter(
+                        metrics.metricName(
+                            RECORD_ACK_RATE,
+                            METRICS_GROUP_NAME,
+                            "The rate of number of records acknowledged per 
acknowledgement type.",

Review Comment:
   "Rate of number" sounds odd. "Rate of records acknowledged per 
acknowledgement type" sounds better I think.



##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -696,4 +722,99 @@ public ShareFetchPartitionData(FetchParams fetchParams, 
String groupId, String m
             this.partitionMaxBytes = partitionMaxBytes;
         }
     }
+
+    static class ShareGroupMetrics {
+        /**
+         * share-acknowledgement (share-acknowledgement-rate and 
share-acknowledgement-count) - The total number of offsets acknowledged for 
share groups (requests to be ack).
+         * record-acknowledgement (record-acknowledgement-rate and 
record-acknowledgement-count) - The number of records acknowledged per 
acknowledgement type.
+         * partition-load-time (partition-load-time-avg and 
partition-load-time-max) - The time taken to load the share partitions.
+         */
+
+        public static final String METRICS_GROUP_NAME = "share-group-metrics";
+
+        public static final String SHARE_ACK_SENSOR = 
"share-acknowledgement-sensor";
+        public static final String SHARE_ACK_RATE = 
"share-acknowledgement-rate";
+        public static final String SHARE_ACK_COUNT = 
"share-acknowledgement-count";
+
+        public static final String RECORD_ACK_SENSOR_PREFIX = 
"record-acknowledgement";
+        public static final String RECORD_ACK_RATE = 
"record-acknowledgement-rate";
+        public static final String RECORD_ACK_COUNT = 
"record-acknowledgement-count";
+        public static final String ACK_TYPE = "ack-type";
+
+        public static final String PARTITION_LOAD_TIME_SENSOR = 
"partition-load-time-sensor";
+        public static final String PARTITION_LOAD_TIME_AVG = 
"partition-load-time-avg";
+        public static final String PARTITION_LOAD_TIME_MAX = 
"partition-load-time-max";
+
+        public static final Map<Byte, String> RECORD_ACKS_MAP = new 
HashMap<>();
+        
+        private final Time time;
+        private final Sensor shareAcknowledgementSensor;
+        private final Map<Byte, Sensor> recordAcksSensorMap = new HashMap<>();
+        private final Sensor partitionLoadTimeSensor;
+
+        static {
+            RECORD_ACKS_MAP.put((byte) 1, AcknowledgeType.ACCEPT.toString());
+            RECORD_ACKS_MAP.put((byte) 2, AcknowledgeType.RELEASE.toString());
+            RECORD_ACKS_MAP.put((byte) 3, AcknowledgeType.REJECT.toString());
+        }
+
+        public ShareGroupMetrics(Metrics metrics, Time time) {
+            this.time = time;
+
+            shareAcknowledgementSensor = metrics.sensor(SHARE_ACK_SENSOR);
+            shareAcknowledgementSensor.add(new Meter(
+                metrics.metricName(
+                    SHARE_ACK_RATE,
+                    METRICS_GROUP_NAME,
+                    "The rate of number of acknowledge requests."),

Review Comment:
   Probably "Rate of acknowledge requests" is best given my next comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to