justramesh2000 commented on code in PR #21090:
URL: https://github.com/apache/kafka/pull/21090#discussion_r2595802337


##########
storage/src/main/java/org/apache/kafka/server/log/remote/quota/RLMQuotaManager.java:
##########
@@ -78,13 +85,32 @@ public void updateQuota(Quota newQuota) {
     }
 
     public long getThrottleTimeMs() {
+        return getThrottleTimeMsWithTopic(null);
+    }
+
+    public long getThrottleTimeMsWithTopic(String topic) {
         Sensor sensorInstance = sensor();
         try {
             sensorInstance.checkQuotas();
         } catch (QuotaViolationException qve) {
+            long throttleTimeMs = QuotaUtils.throttleTime(qve, time.milliseconds());
+
             LOGGER.debug("Quota violated for sensor ({}), metric: ({}), metric-value: ({}), bound: ({})",
                 sensorInstance.name(), qve.metric().metricName(), qve.value(), qve.bound());
-            return QuotaUtils.throttleTime(qve, time.milliseconds());
+
+            // Record throttling metrics if topic provided
+            if (brokerTopicStats != null && topic != null) {

Review Comment:
   IMO more defense could be added here: if this additional code throws an
exception (today or in the future), the throttle time will never be returned to
the caller. It is worth adding another try/catch around it so that this is safe
and, in the worst case, the method still returns
QuotaUtils.throttleTime(qve, time.milliseconds()).
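
A minimal sketch of the guard suggested above, written as a self-contained stand-in rather than as the actual RLMQuotaManager change. The names `ThrottleGuardSketch`, `ThrottleRecorder`, and `throttleTimeWithGuard` are hypothetical and exist only for illustration; in the PR, the recorder's role would be played by whatever `brokerTopicStats` call records the per-topic throttling metrics, and the `throttleTimeMs` argument would be the value already computed by `QuotaUtils.throttleTime(qve, time.milliseconds())`.

```java
// Illustrative stand-in only: none of these types are part of Kafka.
final class ThrottleGuardSketch {

    /** Hypothetical hook standing in for the per-topic metrics recording. */
    interface ThrottleRecorder {
        void record(String topic, long throttleTimeMs);
    }

    /**
     * Returns the already-computed throttle time, treating metrics recording
     * as best-effort: a failure while recording is logged and swallowed so
     * the caller always receives the throttle time.
     */
    static long throttleTimeWithGuard(long throttleTimeMs, String topic, ThrottleRecorder recorder) {
        if (recorder != null && topic != null) {
            try {
                recorder.record(topic, throttleTimeMs);
            } catch (RuntimeException e) {
                // Assumption: a real implementation would use the class logger here.
                System.err.println("Failed to record throttle metrics for topic " + topic + ": " + e);
            }
        }
        return throttleTimeMs;
    }

    public static void main(String[] args) {
        // A recorder that always fails, to show the throttle time still comes back.
        ThrottleRecorder failing = (topic, ms) -> {
            throw new IllegalStateException("metrics unavailable");
        };
        System.out.println(throttleTimeWithGuard(250L, "orders", failing));
    }
}
```

The design choice is that the throttle time is computed before any metrics work and returned outside the try block, so recording can only ever add a log line, never change what the caller sees. Whether to catch `RuntimeException` or something narrower is a judgment call for the PR author.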



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
