Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-07-22 Thread via GitHub


gaobosince1987 commented on PR #15625:
URL: https://github.com/apache/kafka/pull/15625#issuecomment-2243384804

   @abhijeetk88 trying the quota feature in test cluster now from trunk, 
primarily to save CPU when multiple consumer pull at the same time. Is there a 
document related with how to set quota for remote store? Latest trunk build 
does not seem to throttle remote fetch for me. Let me know if there is extra 
setup needed for this feature to work.
   
   I have a 6 broker test cluster, 1 test topic with local.retention.ms=1 so 
all read goes to remote. However consumer are still able to fetch 300MB per 
broker from remote. With the configuration, I would expect close to zero remote 
fetch.
[root@kafka-test-02-zookeeper-0 kafka]#./bin/kafka-configs.sh 
--bootstrap-server kafka-test-02-kafka-bootstrap:9092 --entity-type brokers 
--entity-default --alter --add-config 
'remote.log.manager.fetch.max.bytes.per.second=1,remote.log.manager.copy.max.bytes.per.second=1428800'
   Completed updating default config for brokers in the cluster.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-05-30 Thread via GitHub


abhijeetk88 commented on PR #15625:
URL: https://github.com/apache/kafka/pull/15625#issuecomment-2141192632

   Thanks @chia7712 @jolshan . Apologies for the miss.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-05-30 Thread via GitHub


junrao commented on PR #15625:
URL: https://github.com/apache/kafka/pull/15625#issuecomment-2140919323

   @jolshan : Thanks for pointing this out. Sorry that I didn't look at the 
test results carefully before merging.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-05-30 Thread via GitHub


chia7712 commented on PR #15625:
URL: https://github.com/apache/kafka/pull/15625#issuecomment-2140866041

   @jolshan I file https://github.com/apache/kafka/pull/16146 to fix it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-05-30 Thread via GitHub


jolshan commented on PR #15625:
URL: https://github.com/apache/kafka/pull/15625#issuecomment-2140804438

   Can we look at testCopyQuotaManagerConfig() – 
kafka.log.remote.RemoteLogManagerTest? It seems like it is failing pretty 
consistently. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-05-30 Thread via GitHub


junrao merged PR #15625:
URL: https://github.com/apache/kafka/pull/15625


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-05-29 Thread via GitHub


abhijeetk88 commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1619973284


##
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java:
##
@@ -143,6 +143,38 @@ public final class RemoteLogManagerConfig {
 "less than or equal to `log.retention.bytes` value.";
 public static final Long DEFAULT_LOG_LOCAL_RETENTION_BYTES = -2L;
 
+public static final String 
REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP = 
"remote.log.manager.copy.max.bytes.per.second";
+public static final String 
REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_DOC = "The maximum number of bytes 
that can be copied from local storage to remote storage per second. " +
+"This is a global limit for all the partitions that are being 
copied from remote storage to local storage. " +
+"The default value is Long.MAX_VALUE, which means there is no 
limit on the number of bytes that can be copied per second.";
+public static final Long 
DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND = Long.MAX_VALUE;
+
+public static final String REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP = 
"remote.log.manager.copy.quota.window.num";
+public static final String REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_DOC = 
"The number of samples to retain in memory for remote copy quota management. " +
+"The default value is 61, which means there are 60 whole windows + 
1 current window.";
+public static final int DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM = 
61;
+
+public static final String 
REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP = 
"remote.log.manager.copy.quota.window.size.seconds";
+public static final String 
REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_DOC = "The time span of each 
sample for remote copy quota management. " +
+"The default value is 1 second.";
+public static final int 
DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS = 1;
+
+public static final String 
REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP = 
"remote.log.manager.fetch.max.bytes.per.second";
+public static final String 
REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_DOC = "The maximum number of 
bytes that can be fetched from remote storage to local storage per second. " +
+"This is a global limit for all the partitions that are being 
fetched from remote storage to local storage. " +
+"The default value is Long.MAX_VALUE, which means there is no 
limit on the number of bytes that can be fetched per second.";
+public static final Long 
DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND = Long.MAX_VALUE;
+
+public static final String REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP 
= "remote.log.manager.fetch.quota.window.num";
+public static final String REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_DOC = 
"The number of samples to retain in memory for remote fetch quota management. " 
+
+"The default value is 11, which means there are 10 whole windows + 
1 current window.";
+public static final int DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM 
= 11;

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-05-29 Thread via GitHub


junrao commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1619445831


##
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java:
##
@@ -143,6 +143,38 @@ public final class RemoteLogManagerConfig {
 "less than or equal to `log.retention.bytes` value.";
 public static final Long DEFAULT_LOG_LOCAL_RETENTION_BYTES = -2L;
 
+public static final String 
REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP = 
"remote.log.manager.copy.max.bytes.per.second";
+public static final String 
REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_DOC = "The maximum number of bytes 
that can be copied from local storage to remote storage per second. " +
+"This is a global limit for all the partitions that are being 
copied from remote storage to local storage. " +
+"The default value is Long.MAX_VALUE, which means there is no 
limit on the number of bytes that can be copied per second.";
+public static final Long 
DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND = Long.MAX_VALUE;
+
+public static final String REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP = 
"remote.log.manager.copy.quota.window.num";
+public static final String REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_DOC = 
"The number of samples to retain in memory for remote copy quota management. " +
+"The default value is 61, which means there are 60 whole windows + 
1 current window.";
+public static final int DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM = 
61;
+
+public static final String 
REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP = 
"remote.log.manager.copy.quota.window.size.seconds";
+public static final String 
REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_DOC = "The time span of each 
sample for remote copy quota management. " +
+"The default value is 1 second.";
+public static final int 
DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS = 1;
+
+public static final String 
REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP = 
"remote.log.manager.fetch.max.bytes.per.second";
+public static final String 
REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_DOC = "The maximum number of 
bytes that can be fetched from remote storage to local storage per second. " +
+"This is a global limit for all the partitions that are being 
fetched from remote storage to local storage. " +
+"The default value is Long.MAX_VALUE, which means there is no 
limit on the number of bytes that can be fetched per second.";
+public static final Long 
DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND = Long.MAX_VALUE;
+
+public static final String REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP 
= "remote.log.manager.fetch.quota.window.num";
+public static final String REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_DOC = 
"The number of samples to retain in memory for remote fetch quota management. " 
+
+"The default value is 11, which means there are 10 whole windows + 
1 current window.";
+public static final int DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM 
= 11;

Review Comment:
   If there is no good reach, perhaps it's better to use the same default 
window number for copy too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-05-29 Thread via GitHub


showuon commented on PR #15625:
URL: https://github.com/apache/kafka/pull/15625#issuecomment-2137244557

   @junrao , since this PR blocks other follow-up PRs and v3.8.0 release date 
is approaching, I'd like to merge it tomorrow if you don't have any other 
comments. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-05-28 Thread via GitHub


abhijeetk88 commented on PR #15625:
URL: https://github.com/apache/kafka/pull/15625#issuecomment-2136524934

   > @abhijeetk88 , there is a merge conflict. Please help resolve it. Thanks.
   
   done
   
   
   > Do we plan to change the default copy quota window num samples from 61 to 
11?
   
   Waiting for a confirmation from Jun.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-05-28 Thread via GitHub


abhijeetk88 commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1618186590


##
core/src/main/java/kafka/log/remote/quota/RLMQuotaManagerConfig.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+public class RLMQuotaManagerConfig {
+public static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600;
+
+private final long quotaBytesPerSecond;
+private final int numQuotaSamples;
+private final int quotaWindowSizeSeconds;
+
+public long quotaBytesPerSecond() {
+return quotaBytesPerSecond;
+}
+
+public int numQuotaSamples() {
+return numQuotaSamples;
+}
+
+public int quotaWindowSizeSeconds() {
+return quotaWindowSizeSeconds;
+}
+
+/**
+ * Configuration settings for quota management
+ *
+ * @param quotaBytesPerSecondThe quota in bytes per second
+ * @param numQuotaSamplesThe number of samples to retain in memory
+ * @param quotaWindowSizeSeconds The time span of each sample
+ */
+public RLMQuotaManagerConfig(long quotaBytesPerSecond, int 
numQuotaSamples, int quotaWindowSizeSeconds) {

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-05-28 Thread via GitHub


showuon commented on PR #15625:
URL: https://github.com/apache/kafka/pull/15625#issuecomment-2136406740

   @abhijeetk88 , there is a merge conflict. Please help resolve it. Thanks.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-05-28 Thread via GitHub


kamalcph commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1617015938


##
core/src/main/java/kafka/log/remote/quota/RLMQuotaManagerConfig.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+public class RLMQuotaManagerConfig {
+public static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600;
+
+private final long quotaBytesPerSecond;
+private final int numQuotaSamples;
+private final int quotaWindowSizeSeconds;
+
+public long quotaBytesPerSecond() {
+return quotaBytesPerSecond;
+}
+
+public int numQuotaSamples() {
+return numQuotaSamples;
+}
+
+public int quotaWindowSizeSeconds() {
+return quotaWindowSizeSeconds;
+}
+
+/**
+ * Configuration settings for quota management
+ *
+ * @param quotaBytesPerSecondThe quota in bytes per second
+ * @param numQuotaSamplesThe number of samples to retain in memory
+ * @param quotaWindowSizeSeconds The time span of each sample
+ */
+public RLMQuotaManagerConfig(long quotaBytesPerSecond, int 
numQuotaSamples, int quotaWindowSizeSeconds) {

Review Comment:
   nit: 
   
   place constructor before the getter methods



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-05-26 Thread via GitHub


abhijeetk88 commented on PR #15625:
URL: https://github.com/apache/kafka/pull/15625#issuecomment-2132600646

   @showuon @junrao I have addressed your comments. Please take a look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-05-24 Thread via GitHub


abhijeetk88 commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1613033249


##
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java:
##
@@ -143,6 +143,38 @@ public final class RemoteLogManagerConfig {
 "less than or equal to `log.retention.bytes` value.";
 public static final Long DEFAULT_LOG_LOCAL_RETENTION_BYTES = -2L;
 
+public static final String 
REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP = 
"remote.log.manager.copy.max.bytes.per.second";
+public static final String 
REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_DOC = "The maximum number of bytes 
that can be copied from local storage to remote storage per second. " +
+"This is a global limit for all the partitions that are being 
copied from remote storage to local storage. " +
+"The default value is Long.MAX_VALUE, which means there is no 
limit on the number of bytes that can be copied per second.";
+public static final Long 
DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND = Long.MAX_VALUE;
+
+public static final String REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP = 
"remote.log.manager.copy.quota.window.num";
+public static final String REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_DOC = 
"The number of samples to retain in memory for remote copy quota management. " +
+"The default value is 61, which means there are 60 whole windows + 
1 current window.";
+public static final int DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM = 
61;
+
+public static final String 
REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP = 
"remote.log.manager.copy.quota.window.size.seconds";
+public static final String 
REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_DOC = "The time span of each 
sample for remote copy quota management. " +
+"The default value is 1 second.";
+public static final int 
DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS = 1;
+
+public static final String 
REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP = 
"remote.log.manager.fetch.max.bytes.per.second";
+public static final String 
REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_DOC = "The maximum number of 
bytes that can be fetched from remote storage to local storage per second. " +
+"This is a global limit for all the partitions that are being 
fetched from remote storage to local storage. " +
+"The default value is Long.MAX_VALUE, which means there is no 
limit on the number of bytes that can be fetched per second.";
+public static final Long 
DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND = Long.MAX_VALUE;
+
+public static final String REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP 
= "remote.log.manager.fetch.quota.window.num";
+public static final String REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_DOC = 
"The number of samples to retain in memory for remote fetch quota management. " 
+
+"The default value is 11, which means there are 10 whole windows + 
1 current window.";
+public static final int DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM 
= 11;

Review Comment:
   For fetches, the default window size was chosen to match the default window 
size used for other quotas, such as ClientQuota and ReplicationQuota.
   
   Using an 11-second (10 whole + 1 current) window size for copies, similar to 
other quotas, does seem to be a better option. Consider this:
   
   The broker-level quota for copying may be set to 250 MBps. The RLM task 
records the log segment size with the quota manager when uploading a log 
segment. The typical log segment size is 500 MB, meaning only one log segment 
can be uploaded every 2 seconds without breaching the quota. If uploads occur 
faster, the quota will be exceeded. Therefore, as long as the window size is 
greater than 2 seconds, either a 10-second or 60-second (whole) window should 
work.
   
   However, a shorter window (10 seconds) has advantages. It tracks data 
uploads more precisely and prevents large spikes in data upload more 
effectively. For example:
   
   With a 10-second window:
   
   Buckets: b1, b2, ..., b10
   In the 10th second, 5 segments can be uploaded without breaching the average 
quota (5 * 500 MB / 10 seconds = 250 MBps), though the spike will be 2.5 GB in 
that second.
   With a 60-second window:
   
   Buckets: b1, b2, ..., b60
   In the 60th second, 30 segments can be uploaded without breaching the 
average quota (30 * 500 MB / 60 seconds = 250 MBps), but the spike will be 15 
GB in that second.
   Given the need to avoid quota breaches, a 10-second window is preferable to 
a 60-second window.
   
   Let me know if it makes sense. I can change the default copy window to be 
the same as the default fetch window.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use

Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-05-23 Thread via GitHub


abhijeetk88 commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1612931258


##
core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java:
##
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+import kafka.server.QuotaType;
+import kafka.server.SensorAccess;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.BoxedUnit;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class RLMQuotaManager {
+private static final Logger LOGGER = 
LoggerFactory.getLogger(RLMQuotaManager.class);
+
+private final RLMQuotaManagerConfig config;
+private final Metrics metrics;
+private final QuotaType quotaType;
+private final String description;
+private final Time time;
+
+private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+private final SensorAccess sensorAccess;
+private Quota quota;
+
+public RLMQuotaManager(RLMQuotaManagerConfig config, Metrics metrics, 
QuotaType quotaType, String description, Time time) {
+this.config = config;
+this.metrics = metrics;
+this.quotaType = quotaType;
+this.description = description;
+this.time = time;
+
+this.quota = new Quota(config.getQuotaBytesPerSecond(), true);
+this.sensorAccess = new SensorAccess(lock, metrics);
+}
+
+public void updateQuota(Quota newQuota) {
+lock.writeLock().lock();
+try {
+this.quota = newQuota;
+
+Map allMetrics = metrics.metrics();
+MetricName quotaMetricName = metricName();
+KafkaMetric metric = allMetrics.get(quotaMetricName);
+if (metric != null) {
+LOGGER.warn("Sensor for quota-id {} already exists. Setting 
quota to {} in MetricConfig", quotaMetricName, newQuota);
+metric.config(getQuotaMetricConfig(newQuota));
+}
+} finally {
+lock.writeLock().unlock();
+}
+}
+
+public boolean isQuotaExceeded() {
+Sensor sensorInstance = sensor();
+try {
+sensorInstance.checkQuotas();
+} catch (QuotaViolationException qve) {
+LOGGER.debug("Quota violated for sensor ({}), metric: ({}), 
metric-value: ({}), bound: ({})",
+sensorInstance.name(), qve.metric().metricName(), qve.value(), 
qve.bound());
+return true;
+}
+return false;
+}
+
+public void record(double value) {
+sensor().record(value, time.milliseconds(), false);

Review Comment:
   In KIP-956, we do not utilize the throttle time provided by the quota 
manager to regulate fetches and copies. For fetch operations, we initially 
verify quota availability before initiating the retrieval of remote data. If 
the quota is unavailable, our priority is to serve partitions requiring local 
data, rather than throttling the client. Therefore, we focus on fulfilling data 
requests for other partitions in the queue, eliminating the need for throttle 
time in fetch operations.
   
   Similarly, when a RLM Task attempts to copy a segment, it first checks if 
the write quota is available. If the quota is not available, the thread waits 
until the quota becomes available. As a result, we do not require throttle time 
for copies either.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service,

Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-05-23 Thread via GitHub


showuon commented on PR #15625:
URL: https://github.com/apache/kafka/pull/15625#issuecomment-2126707276

   @abhijeetk88 , do we have any update on this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-05-12 Thread via GitHub


abhijeetk88 commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1597682003


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -181,7 +189,8 @@ public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
 Time time,
 Function> 
fetchLog,
 BiConsumer 
updateRemoteLogStartOffset,
-BrokerTopicStats brokerTopicStats) throws 
IOException {
+BrokerTopicStats brokerTopicStats,
+Metrics metrics) throws IOException {

Review Comment:
   Sure, will add.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-05-12 Thread via GitHub


abhijeetk88 commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1597667627


##
core/src/main/java/kafka/log/remote/quota/RLMQuotaManagerConfig.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+public class RLMQuotaManagerConfig {
+public static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600;
+
+private final long quotaBytesPerSecond;
+private final int numQuotaSamples;
+private final int quotaWindowSizeSeconds;
+
+public long getQuotaBytesPerSecond() {

Review Comment:
   Sure, will change these.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-05-12 Thread via GitHub


abhijeetk88 commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1597667613


##
core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java:
##
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+import kafka.server.QuotaType;
+import kafka.server.SensorAccess;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.BoxedUnit;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class RLMQuotaManager {

Review Comment:
   Created https://issues.apache.org/jira/browse/KAFKA-16706



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-05-12 Thread via GitHub


abhijeetk88 commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1597666914


##
core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java:
##
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+import kafka.server.QuotaType;
+import kafka.server.SensorAccess;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.BoxedUnit;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class RLMQuotaManager {
+private static final Logger LOGGER = 
LoggerFactory.getLogger(RLMQuotaManager.class);
+
+private final RLMQuotaManagerConfig config;
+private final Metrics metrics;
+private final QuotaType quotaType;
+private final String description;
+private final Time time;
+
+private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+private final SensorAccess sensorAccess;
+private Quota quota;
+
+public RLMQuotaManager(RLMQuotaManagerConfig config, Metrics metrics, 
QuotaType quotaType, String description, Time time) {
+this.config = config;
+this.metrics = metrics;
+this.quotaType = quotaType;
+this.description = description;
+this.time = time;
+
+this.quota = new Quota(config.getQuotaBytesPerSecond(), true);
+this.sensorAccess = new SensorAccess(lock, metrics);
+}
+
+public void updateQuota(Quota newQuota) {
+lock.writeLock().lock();
+try {
+this.quota = newQuota;
+
+Map allMetrics = metrics.metrics();
+MetricName quotaMetricName = metricName();
+KafkaMetric metric = allMetrics.get(quotaMetricName);
+if (metric != null) {
+LOGGER.warn("Sensor for quota-id {} already exists. Setting 
quota to {} in MetricConfig", quotaMetricName, newQuota);

Review Comment:
   On second thought, INFO should be the right level, because quota update is a 
significant change in the application state and will decide how fast 
copies/fetches from remote storage can happen. Also, quota updates are 
infrequent, hence it will not cause excessive logging.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-05-12 Thread via GitHub


abhijeetk88 commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1597664809


##
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java:
##
@@ -143,6 +143,38 @@ public final class RemoteLogManagerConfig {
 "less than or equal to `log.retention.bytes` value.";
 public static final Long DEFAULT_LOG_LOCAL_RETENTION_BYTES = -2L;
 
+public static final String 
REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP = 
"remote.log.manager.copy.max.bytes.per.second";
+public static final String 
REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_DOC = "The maximum number of bytes 
that can be copied from local storage to remote storage per second. " +
+"This is a global limit for all the partitions that are being 
copied from remote storage to local storage. " +

Review Comment:
   My bad. Thanks for catching this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-05-03 Thread via GitHub


junrao commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1589688863


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -181,7 +189,8 @@ public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
 Time time,
 Function> 
fetchLog,
 BiConsumer 
updateRemoteLogStartOffset,
-BrokerTopicStats brokerTopicStats) throws 
IOException {
+BrokerTopicStats brokerTopicStats,
+Metrics metrics) throws IOException {

Review Comment:
   Could we add the javadoc for the new param?



##
core/src/main/java/kafka/log/remote/quota/RLMQuotaManagerConfig.java:
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+public class RLMQuotaManagerConfig {
+public static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600;
+
+private final long quotaBytesPerSecond;
+private final int numQuotaSamples;
+private final int quotaWindowSizeSeconds;
+
+public long getQuotaBytesPerSecond() {

Review Comment:
   For consistency, we don't typically use getters. So this can just be 
quotaBytesPerSecond. Ditto below.



##
core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java:
##
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+import kafka.server.QuotaType;
+import kafka.server.SensorAccess;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.BoxedUnit;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class RLMQuotaManager {
+private static final Logger LOGGER = 
LoggerFactory.getLogger(RLMQuotaManager.class);
+
+private final RLMQuotaManagerConfig config;
+private final Metrics metrics;
+private final QuotaType quotaType;
+private final String description;
+private final Time time;
+
+private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+private final SensorAccess sensorAccess;
+private Quota quota;
+
+public RLMQuotaManager(RLMQuotaManagerConfig config, Metrics metrics, 
QuotaType quotaType, String description, Time time) {
+this.config = config;
+this.metrics = metrics;
+this.quotaType = quotaType;
+this.description = description;
+this.time = time;
+
+this.quota = new Quota(config.getQuotaBytesPerSecond(), true);
+this.sensorAccess = new SensorAccess(lock, metrics);
+}
+
+public void updateQuota(Quota newQuota) {
+lock.writeLock().lock();
+try {
+this.quota = newQuota;
+
+Map allMetrics = metrics.metrics();
+MetricName quotaMetricName = metricName();
+K

Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-05-03 Thread via GitHub


showuon commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1588939433


##
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java:
##
@@ -143,6 +143,38 @@ public final class RemoteLogManagerConfig {
 "less than or equal to `log.retention.bytes` value.";
 public static final Long DEFAULT_LOG_LOCAL_RETENTION_BYTES = -2L;
 
+public static final String 
REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP = 
"remote.log.manager.copy.max.bytes.per.second";
+public static final String 
REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_DOC = "The maximum number of bytes 
that can be copied from local storage to remote storage per second. " +
+"This is a global limit for all the partitions that are being 
copied from remote storage to local storage. " +

Review Comment:
   `This is a global limit for all the partitions that are being copied from 
remote storage to local storage.` <-- is it right? Copied from local storage to 
remote storage?



##
core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java:
##
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+import kafka.server.QuotaType;
+import kafka.server.SensorAccess;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.BoxedUnit;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class RLMQuotaManager {
+private static final Logger LOGGER = 
LoggerFactory.getLogger(RLMQuotaManager.class);
+
+private final RLMQuotaManagerConfig config;
+private final Metrics metrics;
+private final QuotaType quotaType;
+private final String description;
+private final Time time;
+
+private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+private final SensorAccess sensorAccess;
+private Quota quota;
+
+public RLMQuotaManager(RLMQuotaManagerConfig config, Metrics metrics, 
QuotaType quotaType, String description, Time time) {
+this.config = config;
+this.metrics = metrics;
+this.quotaType = quotaType;
+this.description = description;
+this.time = time;
+
+this.quota = new Quota(config.getQuotaBytesPerSecond(), true);
+this.sensorAccess = new SensorAccess(lock, metrics);
+}
+
+public void updateQuota(Quota newQuota) {
+lock.writeLock().lock();
+try {
+this.quota = newQuota;
+
+Map allMetrics = metrics.metrics();
+MetricName quotaMetricName = metricName();
+KafkaMetric metric = allMetrics.get(quotaMetricName);
+if (metric != null) {
+LOGGER.warn("Sensor for quota-id {} already exists. Setting 
quota to {} in MetricConfig", quotaMetricName, newQuota);

Review Comment:
   I'd like to know why we set WARN logs here. It looks to me if we want to 
update quota dynamically, it is expected the metric is already existed, right? 
If so, I don't think this is unexpected. So maybe INFO or DEBUG level, WDYT?



##
core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java:
##
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *ht

Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-04-25 Thread via GitHub


abhijeetk88 commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1580557688


##
core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+import kafka.server.QuotaType;
+import kafka.server.SensorAccess;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.BoxedUnit;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class RLMQuotaManager {
+private static final Logger LOGGER = 
LoggerFactory.getLogger(RLMQuotaManager.class);
+
+private final RLMQuotaManagerConfig config;
+private final Metrics metrics;
+private final QuotaType quotaType;
+private final String description;
+private final Time time;
+
+private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+private final SensorAccess sensorAccess;
+private Quota quota;
+
+public RLMQuotaManager(RLMQuotaManagerConfig config, Metrics metrics, 
QuotaType quotaType, String description, Time time) {
+this.config = config;
+this.metrics = metrics;
+this.quotaType = quotaType;
+this.description = description;
+this.time = time;
+
+this.quota = new Quota(config.getQuotaBytesPerSecond(), true);
+this.sensorAccess = new SensorAccess(lock, metrics);
+}
+
+public void updateQuota(Quota newQuota) {
+lock.writeLock().lock();
+try {
+this.quota = newQuota;
+
+Map allMetrics = metrics.metrics();
+MetricName quotaMetricName = metricName();
+allMetrics.forEach((metricName, metric) -> {
+if (metricName.name().equals(quotaMetricName.name()) && 
metricName.group().equals(quotaMetricName.group())) {
+Map metricTags = metricName.tags();
+LOGGER.info("Sensor for quota-id {} already exists. 
Setting quota to {} in MetricConfig", metricTags, newQuota);
+metric.config(getQuotaMetricConfig(newQuota));
+}
+});
+} finally {
+lock.writeLock().unlock();
+}
+}
+
+public boolean isQuotaExceeded() {
+Sensor sensorInstance = sensor();
+try {
+sensorInstance.checkQuotas();
+} catch (QuotaViolationException qve) {
+LOGGER.debug("Quota violated for sensor ({}), metric: ({}), 
metric-value: ({}), bound: ({})",

Review Comment:
   quota type is already being captured in the sensor instance name. So we can 
skip logging it separately.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-04-25 Thread via GitHub


abhijeetk88 commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1580549399


##
core/src/main/java/kafka/log/remote/quota/RLMQuotaManagerConfig.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+public class RLMQuotaManagerConfig {

Review Comment:
   Added



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-04-25 Thread via GitHub


abhijeetk88 commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1580545776


##
core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+import kafka.server.QuotaType;
+import kafka.server.SensorAccess;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.BoxedUnit;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class RLMQuotaManager {
+private static final Logger LOGGER = 
LoggerFactory.getLogger(RLMQuotaManager.class);
+
+private final RLMQuotaManagerConfig config;
+private final Metrics metrics;
+private final QuotaType quotaType;
+private final String description;
+private final Time time;
+
+private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+private final SensorAccess sensorAccess;
+private Quota quota;
+
+public RLMQuotaManager(RLMQuotaManagerConfig config, Metrics metrics, 
QuotaType quotaType, String description, Time time) {
+this.config = config;
+this.metrics = metrics;
+this.quotaType = quotaType;
+this.description = description;
+this.time = time;
+
+this.quota = new Quota(config.getQuotaBytesPerSecond(), true);
+this.sensorAccess = new SensorAccess(lock, metrics);
+}
+
+public void updateQuota(Quota newQuota) {
+lock.writeLock().lock();
+try {
+this.quota = newQuota;
+
+Map allMetrics = metrics.metrics();
+MetricName quotaMetricName = metricName();
+allMetrics.forEach((metricName, metric) -> {
+if (metricName.name().equals(quotaMetricName.name()) && 
metricName.group().equals(quotaMetricName.group())) {
+Map metricTags = metricName.tags();
+LOGGER.info("Sensor for quota-id {} already exists. 
Setting quota to {} in MetricConfig", metricTags, newQuota);
+metric.config(getQuotaMetricConfig(newQuota));

Review Comment:
   Ack will change this. Also, metricTags is empty. Will fix the log statement 
as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-04-25 Thread via GitHub


abhijeetk88 commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1580515079


##
core/src/main/java/kafka/log/remote/quota/RLMQuotaManagerConfig.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+public class RLMQuotaManagerConfig {

Review Comment:
   Will add.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-04-25 Thread via GitHub


abhijeetk88 commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1580499863


##
core/src/test/java/kafka/log/remote/quota/RLMQuotaManagerTest.java:
##
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+import kafka.server.QuotaType;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.utils.MockTime;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RLMQuotaManagerTest {
+private final MockTime time = new MockTime();
+private final Metrics metrics = new Metrics(new MetricConfig(), 
Collections.emptyList(), time);
+private static final QuotaType QUOTA_TYPE = QuotaType.RLMFetch$.MODULE$;
+private static final String DESCRIPTION = "Tracking byte rate";
+
+@Test
+public void testQuotaExceeded() {
+RLMQuotaManager quotaManager = new RLMQuotaManager(
+new RLMQuotaManagerConfig(50, 11, 1), metrics, QUOTA_TYPE, 
DESCRIPTION, time);
+
+assertFalse(quotaManager.isQuotaExceeded());
+quotaManager.record(500);
+// Move clock by 1 sec, quota is violated
+moveClock(1);
+assertTrue(quotaManager.isQuotaExceeded());
+
+// Move clock by another 8 secs, quota is still violated for the window
+moveClock(8);
+assertTrue(quotaManager.isQuotaExceeded());
+
+// Move clock by 1 sec, quota is no more violated
+moveClock(1);
+assertFalse(quotaManager.isQuotaExceeded());
+}
+
+@Test
+public void testQuotaUpdate() {
+RLMQuotaManager quotaManager = new RLMQuotaManager(
+new RLMQuotaManagerConfig(50, 11, 1), metrics, QUOTA_TYPE, 
DESCRIPTION, time);
+
+assertFalse(quotaManager.isQuotaExceeded());
+quotaManager.record(51);
+assertTrue(quotaManager.isQuotaExceeded());
+
+Map fetchQuotaMetrics = 
metrics.metrics().entrySet().stream()
+.filter(entry -> entry.getKey().name().equals("byte-rate") && 
entry.getKey().group().equals(QUOTA_TYPE.toString()))
+.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+Map nonQuotaMetrics = 
metrics.metrics().entrySet().stream()
+.filter(entry -> !entry.getKey().name().equals("byte-rate") || 
!entry.getKey().group().equals(QUOTA_TYPE.toString()))
+.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+assertEquals(1, fetchQuotaMetrics.size());
+assertFalse(nonQuotaMetrics.isEmpty());
+
+Map configForQuotaMetricsBeforeUpdate = 
extractMetricConfig(fetchQuotaMetrics);
+Map configForNonQuotaMetricsBeforeUpdate = 
extractMetricConfig(nonQuotaMetrics);
+
+// Update quota to 60, quota is no more violated
+Quota quota60Bytes = new Quota(60, true);
+quotaManager.updateQuota(quota60Bytes);
+assertFalse(quotaManager.isQuotaExceeded());
+
+// Verify quota metrics were updated
+Map configForQuotaMetricsAfterFirstUpdate = 
extractMetricConfig(fetchQuotaMetrics);
+assertNotEquals(configForQuotaMetricsBeforeUpdate, 
configForQuotaMetricsAfterFirstUpdate);
+fetchQuotaMetrics.values().forEach(metric -> 
assertEquals(metric.config().quota(), quota60Bytes));

Review Comment:
   Yes, thanks for pointing out.



##
core/src/test/java/kafka/log/remote/quota/RLMQuotaManagerTest.java:
##
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.

Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-04-24 Thread via GitHub


kamalcph commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1577552070


##
core/src/main/java/kafka/log/remote/quota/RLMQuotaManagerConfig.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+public class RLMQuotaManagerConfig {

Review Comment:
   do we need to add `toString()`? 



##
core/src/test/java/kafka/log/remote/quota/RLMQuotaManagerTest.java:
##
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+import kafka.server.QuotaType;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.utils.MockTime;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RLMQuotaManagerTest {
+private final MockTime time = new MockTime();
+private final Metrics metrics = new Metrics(new MetricConfig(), 
Collections.emptyList(), time);
+private static final QuotaType QUOTA_TYPE = QuotaType.RLMFetch$.MODULE$;
+private static final String DESCRIPTION = "Tracking byte rate";
+
+@Test
+public void testQuotaExceeded() {
+RLMQuotaManager quotaManager = new RLMQuotaManager(
+new RLMQuotaManagerConfig(50, 11, 1), metrics, QUOTA_TYPE, 
DESCRIPTION, time);
+
+assertFalse(quotaManager.isQuotaExceeded());
+quotaManager.record(500);
+// Move clock by 1 sec, quota is violated
+moveClock(1);
+assertTrue(quotaManager.isQuotaExceeded());
+
+// Move clock by another 8 secs, quota is still violated for the window
+moveClock(8);
+assertTrue(quotaManager.isQuotaExceeded());
+
+// Move clock by 1 sec, quota is no more violated
+moveClock(1);
+assertFalse(quotaManager.isQuotaExceeded());
+}
+
+@Test
+public void testQuotaUpdate() {
+RLMQuotaManager quotaManager = new RLMQuotaManager(
+new RLMQuotaManagerConfig(50, 11, 1), metrics, QUOTA_TYPE, 
DESCRIPTION, time);
+
+assertFalse(quotaManager.isQuotaExceeded());
+quotaManager.record(51);
+assertTrue(quotaManager.isQuotaExceeded());
+
+Map fetchQuotaMetrics = 
metrics.metrics().entrySet().stream()
+.filter(entry -> entry.getKey().name().equals("byte-rate") && 
entry.getKey().group().equals(QUOTA_TYPE.toString()))
+.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+Map nonQuotaMetrics = 
metrics.metrics().entrySet().stream()
+.filter(entry -> !entry.getKey().name().equals("byte-rate") || 
!entry.getKey().group().equals(QUOTA_TYPE.toString()))
+.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+assertEquals(1, fetchQuotaMetrics.size());
+assertFalse(nonQuotaMetrics.isEmpty());
+
+Map configForQuotaMetricsBeforeUpdate = 
extractMetricConfig(fetchQuo

Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-04-23 Thread via GitHub


abhijeetk88 commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1576007574


##
core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java:
##


Review Comment:
   Yes, the integration of the quota manager will come in the follow-up PRs. I 
have mentioned it in the description of the PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-04-23 Thread via GitHub


abhijeetk88 commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1576005665


##
core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+import kafka.server.QuotaType;
+import kafka.server.SensorAccess;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.BoxedUnit;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class RLMQuotaManager {
+private static final Logger LOGGER = 
LoggerFactory.getLogger(RLMQuotaManager.class);
+
+private final RLMQuotaManagerConfig config;
+private final Metrics metrics;
+private final QuotaType quotaType;
+private final String description;
+private final Time time;
+
+private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+private final SensorAccess sensorAccess;
+private Quota quota;
+
+public RLMQuotaManager(RLMQuotaManagerConfig config, Metrics metrics, 
QuotaType quotaType, String description, Time time) {
+this.config = config;
+this.metrics = metrics;
+this.quotaType = quotaType;
+this.description = description;
+this.time = time;
+
+this.quota = new Quota(config.getQuotaBytesPerSecond(), true);
+this.sensorAccess = new SensorAccess(lock, metrics);
+}
+
+public void updateQuota(Quota newQuota) {
+lock.writeLock().lock();
+try {
+this.quota = newQuota;
+
+Map allMetrics = metrics.metrics();
+MetricName quotaMetricName = metricName();
+allMetrics.forEach((metricName, metric) -> {
+if (metricName.name().equals(quotaMetricName.name()) && 
metricName.group().equals(quotaMetricName.group())) {
+Map metricTags = metricName.tags();
+LOGGER.info("Sensor for quota-id {} already exists. 
Setting quota to {} in MetricConfig", metricTags, newQuota);

Review Comment:
   Makes sense, will change this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-04-19 Thread via GitHub


funky-eyes commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1572073595


##
core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java:
##


Review Comment:
   I have a question, in fact, what this PR does is to provide a standard 
configuration and generate corresponding rate limiters and related monitoring 
indicators, right? Then it needs to be used in the corresponding 
RemoteStorageManager, correct?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-04-15 Thread via GitHub


HenryCaiHaiying commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1566566286


##
core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+import kafka.server.QuotaType;
+import kafka.server.SensorAccess;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.BoxedUnit;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class RLMQuotaManager {
+private static final Logger LOGGER = 
LoggerFactory.getLogger(RLMQuotaManager.class);
+
+private final RLMQuotaManagerConfig config;
+private final Metrics metrics;
+private final QuotaType quotaType;
+private final String description;
+private final Time time;
+
+private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+private final SensorAccess sensorAccess;
+private Quota quota;
+
+public RLMQuotaManager(RLMQuotaManagerConfig config, Metrics metrics, 
QuotaType quotaType, String description, Time time) {
+this.config = config;
+this.metrics = metrics;
+this.quotaType = quotaType;
+this.description = description;
+this.time = time;
+
+this.quota = new Quota(config.getQuotaBytesPerSecond(), true);
+this.sensorAccess = new SensorAccess(lock, metrics);
+}
+
+public void updateQuota(Quota newQuota) {
+lock.writeLock().lock();
+try {
+this.quota = newQuota;
+
+Map allMetrics = metrics.metrics();
+MetricName quotaMetricName = metricName();
+allMetrics.forEach((metricName, metric) -> {
+if (metricName.name().equals(quotaMetricName.name()) && 
metricName.group().equals(quotaMetricName.group())) {
+Map metricTags = metricName.tags();
+LOGGER.info("Sensor for quota-id {} already exists. 
Setting quota to {} in MetricConfig", metricTags, newQuota);

Review Comment:
   LOGGER.warn ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org