apoorvmittal10 commented on code in PR #14621: URL: https://github.com/apache/kafka/pull/14621#discussion_r1378825692
########## core/src/main/java/kafka/metrics/ClientMetricsConfigs.java: ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.metrics; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.errors.InvalidConfigurationException; +import org.apache.kafka.common.errors.InvalidRequestException; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; + +/** + * Client metric configuration related parameters and the supporting methods like validation, etc. are + * defined in this class. + * <p> + * { + * <ul> + * <li> name: Name supplied by CLI during the creation of the client metric subscription. + * <li> metrics: List of metric prefixes + * <li> intervalMs: A positive integer value >=0 tells the client that how often a client can push the metrics + * <li> match: List of client matching patterns, that are used by broker to match the client instance + * with the subscription. + * </ul> + * } + * <p> + * At present, CLI can pass the following parameters in request to add/delete/update the client metrics + * subscription: + * <ul> + * <li> "name" is a unique name for the subscription. This is used to identify the subscription in + * the broker. Ex: "METRICS-SUB" + * <li> "metrics" value should be comma separated metrics list. A prefix match on the requested metrics + * is performed in clients to determine subscribed metrics. An empty list means no metrics subscribed. + * A list containing just an empty string means all metrics subscribed. + * Ex: "org.apache.kafka.producer.partition.queue.,org.apache.kafka.producer.partition.latency" + * + * <li> "interval.ms" should be between 100 and 3600000 (1 hour). This is the interval at which the client + * should push the metrics to the broker. + * + * <li> "match" is a comma separated list of client match patterns, in case if there is no matching + * pattern specified then broker considers that as all match which means the associated metrics + * applies to all the clients. Ex: "client_software_name = Java, client_software_version = 11.1.*" + * which means all Java clients with any sub versions of 11.1 will be matched i.e. 11.1.1, 11.1.2 etc. + * </ul> + * For more information please look at kip-714: + * https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Clientmetricsconfiguration + */ +public class ClientMetricsConfigs { + + public static final String SUBSCRIPTION_METRICS = "metrics"; + public static final String PUSH_INTERVAL_MS = "interval.ms"; + public static final String CLIENT_MATCH_PATTERN = "match"; + + public static final String CLIENT_INSTANCE_ID = "client_instance_id"; + public static final String CLIENT_ID = "client_id"; + public static final String CLIENT_SOFTWARE_NAME = "client_software_name"; + public static final String CLIENT_SOFTWARE_VERSION = "client_software_version"; + public static final String CLIENT_SOURCE_ADDRESS = "client_source_address"; + public static final String CLIENT_SOURCE_PORT = "client_source_port"; + + public static final int DEFAULT_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes + private static final int MIN_INTERVAL_MS = 100; // 100ms + private static final int MAX_INTERVAL_MS = 3600000; // 1 hour + + private static final Set<String> ALLOWED_MATCH_PARAMS = new HashSet<>(Arrays.asList( + CLIENT_INSTANCE_ID, + CLIENT_ID, + CLIENT_SOFTWARE_NAME, + CLIENT_SOFTWARE_VERSION, + CLIENT_SOURCE_ADDRESS, + CLIENT_SOURCE_PORT + )); + + private static final ConfigDef CONFIG = new ConfigDef() + .define(SUBSCRIPTION_METRICS, Type.LIST, Importance.MEDIUM, "Subscription metrics list") + .define(PUSH_INTERVAL_MS, Type.INT, Importance.MEDIUM, "Push interval in milliseconds") + .define(CLIENT_MATCH_PATTERN, Type.LIST, Importance.MEDIUM, "Client match pattern list"); + + public static ConfigDef configDef() { + return CONFIG; + } + + public static Set<String> names() { + return CONFIG.names(); + } + + public static void validate(String subscriptionName, Properties properties) { + if (subscriptionName == null || subscriptionName.isEmpty()) { + throw new InvalidRequestException("Subscription name can't be empty"); + } + + validateProperties(properties); + } + + private static void validateProperties(Properties properties) { + // Make sure that all the properties are valid + properties.forEach((key, value) -> { + if (!names().contains(key)) { + throw new InvalidRequestException("Unknown client metrics configuration: " + key); + } + }); + + // Make sure that push interval is between 100ms and 1 hour. + if (properties.containsKey(PUSH_INTERVAL_MS)) { + int pushIntervalMs = Integer.parseInt(properties.getProperty(PUSH_INTERVAL_MS)); + if (pushIntervalMs < MIN_INTERVAL_MS || pushIntervalMs > MAX_INTERVAL_MS) { + throw new InvalidRequestException("Invalid value for interval.ms, interval must be between 100 and 3600000 (1 hour)"); Review Comment: Added in the message. ########## core/src/main/java/kafka/metrics/ClientMetricsConfigs.java: ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.metrics; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.errors.InvalidConfigurationException; +import org.apache.kafka.common.errors.InvalidRequestException; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; + +/** + * Client metric configuration related parameters and the supporting methods like validation, etc. are + * defined in this class. + * <p> + * { + * <ul> + * <li> name: Name supplied by CLI during the creation of the client metric subscription. + * <li> metrics: List of metric prefixes + * <li> intervalMs: A positive integer value >=0 tells the client that how often a client can push the metrics + * <li> match: List of client matching patterns, that are used by broker to match the client instance + * with the subscription. + * </ul> + * } + * <p> + * At present, CLI can pass the following parameters in request to add/delete/update the client metrics + * subscription: + * <ul> + * <li> "name" is a unique name for the subscription. This is used to identify the subscription in + * the broker. Ex: "METRICS-SUB" + * <li> "metrics" value should be comma separated metrics list. A prefix match on the requested metrics + * is performed in clients to determine subscribed metrics. An empty list means no metrics subscribed. + * A list containing just an empty string means all metrics subscribed. + * Ex: "org.apache.kafka.producer.partition.queue.,org.apache.kafka.producer.partition.latency" + * + * <li> "interval.ms" should be between 100 and 3600000 (1 hour). This is the interval at which the client + * should push the metrics to the broker. + * + * <li> "match" is a comma separated list of client match patterns, in case if there is no matching + * pattern specified then broker considers that as all match which means the associated metrics + * applies to all the clients. Ex: "client_software_name = Java, client_software_version = 11.1.*" + * which means all Java clients with any sub versions of 11.1 will be matched i.e. 11.1.1, 11.1.2 etc. + * </ul> + * For more information please look at kip-714: + * https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Clientmetricsconfiguration + */ +public class ClientMetricsConfigs { + + public static final String SUBSCRIPTION_METRICS = "metrics"; + public static final String PUSH_INTERVAL_MS = "interval.ms"; + public static final String CLIENT_MATCH_PATTERN = "match"; + + public static final String CLIENT_INSTANCE_ID = "client_instance_id"; + public static final String CLIENT_ID = "client_id"; + public static final String CLIENT_SOFTWARE_NAME = "client_software_name"; + public static final String CLIENT_SOFTWARE_VERSION = "client_software_version"; + public static final String CLIENT_SOURCE_ADDRESS = "client_source_address"; + public static final String CLIENT_SOURCE_PORT = "client_source_port"; + + public static final int DEFAULT_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes + private static final int MIN_INTERVAL_MS = 100; // 100ms + private static final int MAX_INTERVAL_MS = 3600000; // 1 hour + + private static final Set<String> ALLOWED_MATCH_PARAMS = new HashSet<>(Arrays.asList( + CLIENT_INSTANCE_ID, + CLIENT_ID, + CLIENT_SOFTWARE_NAME, + CLIENT_SOFTWARE_VERSION, + CLIENT_SOURCE_ADDRESS, + CLIENT_SOURCE_PORT + )); + + private static final ConfigDef CONFIG = new ConfigDef() + .define(SUBSCRIPTION_METRICS, Type.LIST, Importance.MEDIUM, "Subscription metrics list") + .define(PUSH_INTERVAL_MS, Type.INT, Importance.MEDIUM, "Push interval in milliseconds") + .define(CLIENT_MATCH_PATTERN, Type.LIST, Importance.MEDIUM, "Client match pattern list"); + + public static ConfigDef configDef() { + return CONFIG; + } + + public static Set<String> names() { + return CONFIG.names(); + } + + public static void validate(String subscriptionName, Properties properties) { + if (subscriptionName == null || subscriptionName.isEmpty()) { + throw new InvalidRequestException("Subscription name can't be empty"); + } + + validateProperties(properties); + } + + private static void validateProperties(Properties properties) { + // Make sure that all the properties are valid + properties.forEach((key, value) -> { + if (!names().contains(key)) { + throw new InvalidRequestException("Unknown client metrics configuration: " + key); + } + }); + + // Make sure that push interval is between 100ms and 1 hour. + if (properties.containsKey(PUSH_INTERVAL_MS)) { + int pushIntervalMs = Integer.parseInt(properties.getProperty(PUSH_INTERVAL_MS)); + if (pushIntervalMs < MIN_INTERVAL_MS || pushIntervalMs > MAX_INTERVAL_MS) { + throw new InvalidRequestException("Invalid value for interval.ms, interval must be between 100 and 3600000 (1 hour)"); + } + } + + // Make sure that client match patterns are valid by parsing them. + if (properties.containsKey(CLIENT_MATCH_PATTERN)) { + List<String> patterns = Arrays.asList(properties.getProperty(CLIENT_MATCH_PATTERN).split(",")); + // Parse the client matching patterns to validate if the patterns are valid. + parseMatchingPatterns(patterns); + } + } + + /** + * Parses the client matching patterns and builds a map with entries that has + * (PatternName, PatternValue) as the entries. + * Ex: "VERSION=1.2.3" would be converted to a map entry of (Version, 1.2.3) + * <p> + * NOTES: + * Client match pattern splits the input into two parts separated by first occurrence of the character '=' + * + * @param patterns List of client matching pattern strings + * @return map of client matching pattern entries + */ + public static Map<String, String> parseMatchingPatterns(List<String> patterns) { + Map<String, String> patternsMap = new HashMap<>(); + if (patterns != null) { + patterns.forEach(pattern -> { + String[] nameValuePair = pattern.split("="); + if (nameValuePair.length == 2 && isValidParam(nameValuePair[0].trim()) && Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org