Repository: kafka Updated Branches: refs/heads/trunk afaaea809 -> c8c6ab632
KAFKA-5735; KIP-190: Handle client-ids consistently Developed with edoardocomar Author: Mickael Maison <mickael.mai...@gmail.com> Reviewers: Edoardo Comar <eco...@uk.ibm.com>, Rajini Sivaram <rajinisiva...@googlemail.com> Closes #3906 from mimaison/KAFKA-5735 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c8c6ab63 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c8c6ab63 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c8c6ab63 Branch: refs/heads/trunk Commit: c8c6ab63248639b167350642efdfc4341fa3ce37 Parents: afaaea8 Author: Mickael Maison <mickael.mai...@gmail.com> Authored: Thu Sep 21 21:16:57 2017 +0100 Committer: Rajini Sivaram <rajinisiva...@googlemail.com> Committed: Thu Sep 21 21:16:57 2017 +0100 ---------------------------------------------------------------------- .../kafka/clients/admin/KafkaAdminClient.java | 16 ++-- .../kafka/clients/consumer/KafkaConsumer.java | 8 +- .../kafka/clients/producer/KafkaProducer.java | 15 ++-- .../apache/kafka/common/metrics/Sanitizer.java | 61 +++++++++++++ .../org/apache/kafka/common/utils/Utils.java | 4 +- .../kafka/common/metrics/SanitizerTest.java | 35 ++++++++ .../src/main/scala/kafka/admin/AdminUtils.scala | 6 +- .../main/scala/kafka/admin/ConfigCommand.scala | 8 +- .../scala/kafka/network/RequestChannel.scala | 3 +- .../scala/kafka/server/ClientQuotaManager.scala | 94 ++++++++------------ .../server/ClientRequestQuotaManager.scala | 4 +- .../main/scala/kafka/server/ConfigHandler.scala | 24 ++--- .../integration/kafka/api/BaseQuotaTest.scala | 12 +-- .../kafka/api/ClientIdQuotaTest.scala | 9 +- .../kafka/api/UserClientIdQuotaTest.scala | 13 +-- .../integration/kafka/api/UserQuotaTest.scala | 3 +- .../unit/kafka/admin/ConfigCommandTest.scala | 37 ++++---- .../kafka/server/ClientQuotaManagerTest.scala | 23 +++-- docs/upgrade.html | 1 + 19 files changed, 235 insertions(+), 141 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ab63/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 27c4b18..2447909 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -55,6 +55,7 @@ import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.metrics.Sanitizer; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.network.ChannelBuilder; import org.apache.kafka.common.network.Selector; @@ -289,6 +290,7 @@ public class KafkaAdminClient extends AdminClient { NetworkClient networkClient = null; Time time = Time.SYSTEM; String clientId = generateClientId(config); + String sanitizedClientId = Sanitizer.sanitize(clientId); ChannelBuilder channelBuilder = null; Selector selector = null; ApiVersions apiVersions = new ApiVersions(); @@ -301,7 +303,7 @@ public class KafkaAdminClient extends AdminClient { config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), true); List<MetricsReporter> reporters = config.getConfiguredInstances(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); - Map<String, String> metricTags = Collections.singletonMap("client-id", clientId); + Map<String, String> metricTags = Collections.singletonMap("client-id", sanitizedClientId); MetricConfig metricConfig = new MetricConfig().samples(config.getInt(AdminClientConfig.METRICS_NUM_SAMPLES_CONFIG)) .timeWindow(config.getLong(AdminClientConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS) .recordLevel(Sensor.RecordingLevel.forName(config.getString(AdminClientConfig.METRICS_RECORDING_LEVEL_CONFIG))) @@ -326,7 +328,7 @@ public class KafkaAdminClient extends AdminClient { true, apiVersions, logContext); - return new KafkaAdminClient(config, clientId, time, metadata, metrics, networkClient, + return new KafkaAdminClient(config, clientId, sanitizedClientId, time, metadata, metrics, networkClient, timeoutProcessorFactory, logContext); } catch (Throwable exc) { closeQuietly(metrics, "Metrics"); @@ -343,7 +345,7 @@ public class KafkaAdminClient extends AdminClient { try { metrics = new Metrics(new MetricConfig(), new LinkedList<MetricsReporter>(), time); - return new KafkaAdminClient(config, clientId, time, metadata, metrics, client, null, + return new KafkaAdminClient(config, clientId, Sanitizer.sanitize(clientId), time, metadata, metrics, client, null, createLogContext(clientId)); } catch (Throwable exc) { closeQuietly(metrics, "Metrics"); @@ -355,7 +357,7 @@ public class KafkaAdminClient extends AdminClient { return new LogContext("[AdminClient clientId=" + clientId + "] "); } - private KafkaAdminClient(AdminClientConfig config, String clientId, Time time, Metadata metadata, + private KafkaAdminClient(AdminClientConfig config, String clientId, String sanitizedClientId, Time time, Metadata metadata, Metrics metrics, KafkaClient client, TimeoutProcessorFactory timeoutProcessorFactory, LogContext logContext) { this.defaultTimeoutMs = config.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG); @@ -369,13 +371,13 @@ public class KafkaAdminClient extends AdminClient { this.metrics = metrics; this.client = client; this.runnable = new AdminClientRunnable(); - String threadName = "kafka-admin-client-thread" + (clientId.length() > 0 ? " | " + clientId : ""); + String threadName = "kafka-admin-client-thread | " + clientId; this.thread = new KafkaThread(threadName, runnable, true); this.timeoutProcessorFactory = (timeoutProcessorFactory == null) ? new TimeoutProcessorFactory() : timeoutProcessorFactory; this.maxRetries = config.getInt(AdminClientConfig.RETRIES_CONFIG); config.logUnused(); - AppInfoParser.registerAppInfo(JMX_PREFIX, clientId); + AppInfoParser.registerAppInfo(JMX_PREFIX, sanitizedClientId); log.debug("Kafka admin client initialized"); thread.start(); } @@ -416,7 +418,7 @@ public class KafkaAdminClient extends AdminClient { // Wait for the thread to be joined. thread.join(); - AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId); + AppInfoParser.unregisterAppInfo(JMX_PREFIX, Sanitizer.sanitize(clientId)); log.debug("Kafka admin client closed."); } catch (InterruptedException e) { http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ab63/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 3ea0394..f4af39c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -42,6 +42,7 @@ import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.metrics.Sanitizer; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.network.ChannelBuilder; import org.apache.kafka.common.network.Selector; @@ -645,6 +646,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { if (clientId.isEmpty()) clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement(); this.clientId = clientId; + String sanitizedClientId = Sanitizer.sanitize(this.clientId); String groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG); LogContext logContext = new LogContext("[Consumer clientId=" + clientId + ", groupId=" + groupId + "] "); @@ -658,7 +660,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG); this.time = Time.SYSTEM; - Map<String, String> metricsTags = Collections.singletonMap("client-id", clientId); + Map<String, String> metricsTags = Collections.singletonMap("client-id", sanitizedClientId); MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG)) .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS) .recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG))) @@ -769,7 +771,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { isolationLevel); config.logUnused(); - AppInfoParser.registerAppInfo(JMX_PREFIX, clientId); + AppInfoParser.registerAppInfo(JMX_PREFIX, sanitizedClientId); log.debug("Kafka consumer initialized"); } catch (Throwable t) { @@ -1724,7 +1726,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { ClientUtils.closeQuietly(client, "consumer network client", firstException); ClientUtils.closeQuietly(keyDeserializer, "consumer key deserializer", firstException); ClientUtils.closeQuietly(valueDeserializer, "consumer value deserializer", firstException); - AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId); + AppInfoParser.unregisterAppInfo(JMX_PREFIX, Sanitizer.sanitize(clientId)); log.debug("Kafka consumer has been closed"); Throwable exception = firstException.get(); if (exception != null && !swallowException) { http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ab63/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 66760e2..8a1c2b7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -51,6 +51,7 @@ import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsReporter; +import org.apache.kafka.common.metrics.Sanitizer; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.network.ChannelBuilder; import org.apache.kafka.common.network.Selector; @@ -233,7 +234,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { private static final String JMX_PREFIX = "kafka.producer"; public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread"; - private String clientId; + private final String clientId; // Visible for testing final Metrics metrics; private final Partitioner partitioner; @@ -312,9 +313,11 @@ public class KafkaProducer<K, V> implements Producer<K, V> { Map<String, Object> userProvidedConfigs = config.originals(); this.producerConfig = config; this.time = Time.SYSTEM; - clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG); + String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG); if (clientId.length() <= 0) clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement(); + this.clientId = clientId; + String sanitizedClientId = Sanitizer.sanitize(clientId); String transactionalId = userProvidedConfigs.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG) ? (String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG) : null; @@ -326,7 +329,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { log = logContext.logger(KafkaProducer.class); log.trace("Starting the Kafka producer"); - Map<String, String> metricTags = Collections.singletonMap("client-id", clientId); + Map<String, String> metricTags = Collections.singletonMap("client-id", sanitizedClientId); MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)) .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS) .recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG))) @@ -420,12 +423,12 @@ public class KafkaProducer<K, V> implements Producer<K, V> { config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), this.transactionManager, apiVersions); - String ioThreadName = NETWORK_THREAD_PREFIX + (clientId.length() > 0 ? " | " + clientId : ""); + String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId; this.ioThread = new KafkaThread(ioThreadName, this.sender, true); this.ioThread.start(); this.errors = this.metrics.sensor("errors"); config.logUnused(); - AppInfoParser.registerAppInfo(JMX_PREFIX, clientId); + AppInfoParser.registerAppInfo(JMX_PREFIX, sanitizedClientId); log.debug("Kafka producer started"); } catch (Throwable t) { // call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121 @@ -1073,7 +1076,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { ClientUtils.closeQuietly(keySerializer, "producer keySerializer", firstException); ClientUtils.closeQuietly(valueSerializer, "producer valueSerializer", firstException); ClientUtils.closeQuietly(partitioner, "producer partitioner", firstException); - AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId); + AppInfoParser.unregisterAppInfo(JMX_PREFIX, Sanitizer.sanitize(clientId)); log.debug("Kafka producer has been closed"); if (firstException.get() != null && !swallowException) throw new KafkaException("Failed to close kafka producer", firstException.get()); http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ab63/clients/src/main/java/org/apache/kafka/common/metrics/Sanitizer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sanitizer.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sanitizer.java new file mode 100644 index 0000000..b98a426 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sanitizer.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.metrics; + +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; + +import org.apache.kafka.common.KafkaException; + +/** + * Utility class for sanitizing/desanitizing user principal and client-ids + * to a safe value for use in MetricName and as Zookeeper node name + */ +public class Sanitizer { + + public static String sanitize(String name) { + String encoded = ""; + try { + encoded = URLEncoder.encode(name, StandardCharsets.UTF_8.name()); + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < encoded.length(); i++) { + char c = encoded.charAt(i); + if (c == '*') { // Metric ObjectName treats * as pattern + builder.append("%2A"); + } else if (c == '+') { // Space URL-encoded as +, replace with percent encoding + builder.append("%20"); + } else { + builder.append(c); + } + } + return builder.toString(); + } catch (UnsupportedEncodingException e) { + throw new KafkaException(e); + } + } + + public static String desanitize(String name) { + try { + return URLDecoder.decode(name, StandardCharsets.UTF_8.name()); + } catch (UnsupportedEncodingException e) { + throw new KafkaException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ab63/clients/src/main/java/org/apache/kafka/common/utils/Utils.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 1137045..82b12c3 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -322,12 +322,12 @@ public class Utils { Class<?>[] argTypes = new Class<?>[params.length / 2]; Object[] args = new Object[params.length / 2]; try { - Class c = Class.forName(className, true, Utils.getContextOrKafkaClassLoader()); + Class<?> c = Class.forName(className, true, Utils.getContextOrKafkaClassLoader()); for (int i = 0; i < params.length / 2; i++) { argTypes[i] = (Class<?>) params[2 * i]; args[i] = params[(2 * i) + 1]; } - Constructor<T> constructor = c.getConstructor(argTypes); + Constructor<T> constructor = (Constructor<T>) c.getConstructor(argTypes); return constructor.newInstance(args); } catch (NoSuchMethodException e) { throw new ClassNotFoundException(String.format("Failed to find " + http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ab63/clients/src/test/java/org/apache/kafka/common/metrics/SanitizerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/SanitizerTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/SanitizerTest.java new file mode 100644 index 0000000..d66bda1 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/metrics/SanitizerTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.metrics; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.UnsupportedEncodingException; + +import org.junit.Test; + +public class SanitizerTest { + + @Test + public void testSanitize() throws UnsupportedEncodingException { + String principal = "CN=Some characters !@#$%&*()_-+=';:,/~"; + String sanitizedPrincipal = Sanitizer.sanitize(principal); + assertTrue(sanitizedPrincipal.replace('%', '_').matches("[a-zA-Z0-9\\._\\-]+")); + assertEquals(principal, Sanitizer.desanitize(sanitizedPrincipal)); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ab63/core/src/main/scala/kafka/admin/AdminUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index b6c9afe..8c873f7 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -542,14 +542,14 @@ object AdminUtils extends Logging with AdminUtilities { * and <user> configs are not specified. * * @param zkUtils Zookeeper utilities used to write the config to ZK - * @param clientId: The clientId for which configs are being changed + * @param sanitizedClientId: The sanitized clientId for which configs are being changed * @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or * existing configs need to be deleted, it should be done prior to invoking this API * */ - def changeClientIdConfig(zkUtils: ZkUtils, clientId: String, configs: Properties) { + def changeClientIdConfig(zkUtils: ZkUtils, sanitizedClientId: String, configs: Properties) { DynamicConfig.Client.validate(configs) - changeEntityConfig(zkUtils, ConfigType.Client, clientId, configs) + changeEntityConfig(zkUtils, ConfigType.Client, sanitizedClientId, configs) } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ab63/core/src/main/scala/kafka/admin/ConfigCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 366667b..bd193c7 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -28,6 +28,7 @@ import kafka.utils.Implicits._ import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.security.scram._ import org.apache.kafka.common.utils.Utils +import org.apache.kafka.common.metrics.Sanitizer import scala.collection._ import scala.collection.JavaConverters._ @@ -184,7 +185,7 @@ object ConfigCommand extends Config { sanitizedName match { case Some(ConfigEntityName.Default) => "default " + typeName case Some(n) => - val desanitized = if (entityType == ConfigType.User) QuotaId.desanitize(n) else n + val desanitized = if (entityType == ConfigType.User || entityType == ConfigType.Client) Sanitizer.desanitize(n) else n s"$typeName '$desanitized'" case None => entityType } @@ -265,10 +266,7 @@ object ConfigCommand extends Config { ConfigEntityName.Default else { entityType match { - case ConfigType.User => QuotaId.sanitize(name) - case ConfigType.Client => - validateChars("Client-id", name) - name + case ConfigType.User | ConfigType.Client => Sanitizer.sanitize(name) case _ => throw new IllegalArgumentException("Invalid entity type " + entityType) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ab63/core/src/main/scala/kafka/network/RequestChannel.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 1128fd3..e71a06b 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -27,6 +27,7 @@ import kafka.network.RequestChannel.{ShutdownRequest, BaseRequest} import kafka.server.QuotaId import kafka.utils.{Logging, NotNothing} import org.apache.kafka.common.memory.MemoryPool +import org.apache.kafka.common.metrics.Sanitizer import org.apache.kafka.common.network.Send import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.requests._ @@ -45,7 +46,7 @@ object RequestChannel extends Logging { case object ShutdownRequest extends BaseRequest case class Session(principal: KafkaPrincipal, clientAddress: InetAddress) { - val sanitizedUser = QuotaId.sanitize(principal.getName) + val sanitizedUser = Sanitizer.sanitize(principal.getName) } class Request(val processor: Int, http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ab63/core/src/main/scala/kafka/server/ClientQuotaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala index e1d5249..c84fbcb 100644 --- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala @@ -75,36 +75,9 @@ object QuotaTypes { val UserClientIdQuotaEnabled = 4 } -object QuotaId { +case class QuotaId(sanitizedUser: Option[String], sanitizedClientId: Option[String]) - /** - * Sanitizes user principal to a safe value for use in MetricName - * and as Zookeeper node name - */ - def sanitize(user: String): String = { - val encoded = URLEncoder.encode(user, StandardCharsets.UTF_8.name) - val builder = new StringBuilder - for (i <- 0 until encoded.length) { - encoded.charAt(i) match { - case '*' => builder.append("%2A") // Metric ObjectName treats * as pattern - case '+' => builder.append("%20") // Space URL-encoded as +, replace with percent encoding - case c => builder.append(c) - } - } - builder.toString - } - - /** - * Decodes sanitized user principal - */ - def desanitize(sanitizedUser: String): String = { - URLDecoder.decode(sanitizedUser, StandardCharsets.UTF_8.name) - } -} - -case class QuotaId(sanitizedUser: Option[String], clientId: Option[String]) - -case class QuotaEntity(quotaId: QuotaId, sanitizedUser: String, clientId: String, quota: Quota) +case class QuotaEntity(quotaId: QuotaId, sanitizedUser: String, sanitizedClientId: String, quota: Quota) /** * Helper class that records per-client metrics. It is also responsible for maintaining Quota usage statistics @@ -216,7 +189,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, case _: QuotaViolationException => // Compute the delay val clientQuotaEntity = clientSensors.quotaEntity - val clientMetric = metrics.metrics().get(clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId)) + val clientMetric = metrics.metrics().get(clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.sanitizedClientId)) throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(clientQuotaEntity.quota)).toInt clientSensors.throttleTimeSensor.record(throttleTimeMs) // If delayed, add the element to the delayQueue @@ -242,17 +215,17 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, * and the associated quota override or default quota. * */ - private def quotaEntity(sanitizedUser: String, clientId: String) : QuotaEntity = { + private def quotaEntity(sanitizedUser: String, sanitizedClientId: String) : QuotaEntity = { quotaTypesEnabled match { case QuotaTypes.NoQuotas | QuotaTypes.ClientIdQuotaEnabled => - val quotaId = QuotaId(None, Some(clientId)) + val quotaId = QuotaId(None, Some(sanitizedClientId)) var quota = overriddenQuota.get(quotaId) if (quota == null) { quota = overriddenQuota.get(ClientQuotaManagerConfig.DefaultClientIdQuotaId) if (quota == null) quota = staticConfigClientIdQuota } - QuotaEntity(quotaId, "", clientId, quota) + QuotaEntity(quotaId, "", sanitizedClientId, quota) case QuotaTypes.UserQuotaEnabled => val quotaId = QuotaId(Some(sanitizedUser), None) var quota = overriddenQuota.get(quotaId) @@ -263,12 +236,12 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, } QuotaEntity(quotaId, sanitizedUser, "", quota) case QuotaTypes.UserClientIdQuotaEnabled => - val quotaId = QuotaId(Some(sanitizedUser), Some(clientId)) + val quotaId = QuotaId(Some(sanitizedUser), Some(sanitizedClientId)) var quota = overriddenQuota.get(quotaId) if (quota == null) { quota = overriddenQuota.get(QuotaId(Some(sanitizedUser), Some(ConfigEntityName.Default))) if (quota == null) { - quota = overriddenQuota.get(QuotaId(Some(ConfigEntityName.Default), Some(clientId))) + quota = overriddenQuota.get(QuotaId(Some(ConfigEntityName.Default), Some(sanitizedClientId))) if (quota == null) { quota = overriddenQuota.get(ClientQuotaManagerConfig.DefaultUserClientIdQuotaId) if (quota == null) @@ -276,17 +249,17 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, } } } - QuotaEntity(quotaId, sanitizedUser, clientId, quota) + QuotaEntity(quotaId, sanitizedUser, sanitizedClientId, quota) case _ => - quotaEntityWithMultipleQuotaLevels(sanitizedUser, clientId) + quotaEntityWithMultipleQuotaLevels(sanitizedUser, sanitizedClientId) } } - private def quotaEntityWithMultipleQuotaLevels(sanitizedUser: String, clientId: String) : QuotaEntity = { - val userClientQuotaId = QuotaId(Some(sanitizedUser), Some(clientId)) + private def quotaEntityWithMultipleQuotaLevels(sanitizedUser: String, sanitizerClientId: String) : QuotaEntity = { + val userClientQuotaId = QuotaId(Some(sanitizedUser), Some(sanitizerClientId)) val userQuotaId = QuotaId(Some(sanitizedUser), None) - val clientQuotaId = QuotaId(None, Some(clientId)) + val clientQuotaId = QuotaId(None, Some(sanitizerClientId)) var quotaId = userClientQuotaId var quotaConfigId = userClientQuotaId // 1) /config/users/<user>/clients/<client-id> @@ -306,7 +279,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, if (quota == null) { // 4) /config/users/<default>/clients/<client-id> quotaId = userClientQuotaId - quotaConfigId = QuotaId(Some(ConfigEntityName.Default), Some(clientId)) + quotaConfigId = QuotaId(Some(ConfigEntityName.Default), Some(sanitizerClientId)) quota = overriddenQuota.get(quotaConfigId) if (quota == null) { @@ -324,7 +297,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, if (quota == null) { // 7) /config/clients/<client-id> quotaId = clientQuotaId - quotaConfigId = QuotaId(None, Some(clientId)) + quotaConfigId = QuotaId(None, Some(sanitizerClientId)) quota = overriddenQuota.get(quotaConfigId) if (quota == null) { @@ -346,15 +319,17 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, } } val quotaUser = if (quotaId == clientQuotaId) "" else sanitizedUser - val quotaClientId = if (quotaId == userQuotaId) "" else clientId + val quotaClientId = if (quotaId == userQuotaId) "" else sanitizerClientId QuotaEntity(quotaId, quotaUser, quotaClientId, quota) } /** * Returns the quota for the client with the specified (non-encoded) user principal and client-id. + * + * Note: this method is expensive, it is meant to be used by tests only */ def quota(user: String, clientId: String) = { - quotaEntity(QuotaId.sanitize(user), clientId).quota + quotaEntity(Sanitizer.sanitize(user), Sanitizer.sanitize(clientId)).quota } /* @@ -387,14 +362,15 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, * First sensor of the tuple is the quota enforcement sensor. Second one is the throttle time sensor */ def getOrCreateQuotaSensors(sanitizedUser: String, clientId: String): ClientSensors = { - val clientQuotaEntity = quotaEntity(sanitizedUser, clientId) + val sanitizedClientId = Sanitizer.sanitize(clientId) + val clientQuotaEntity = quotaEntity(sanitizedUser, sanitizedClientId) // Names of the sensors to access ClientSensors( clientQuotaEntity, sensorAccessor.getOrCreate( getQuotaSensorName(clientQuotaEntity.quotaId), ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds, - clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId), + clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.sanitizedClientId), Some(getQuotaMetricConfig(clientQuotaEntity.quota)), new Rate ), @@ -407,9 +383,9 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, ) } - private def getThrottleTimeSensorName(quotaId: QuotaId): String = quotaType + "ThrottleTime-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.clientId.getOrElse("") + private def getThrottleTimeSensorName(quotaId: QuotaId): String = quotaType + "ThrottleTime-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.sanitizedClientId.getOrElse("") - private def getQuotaSensorName(quotaId: QuotaId): String = quotaType + "-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.clientId.getOrElse("") + private def getQuotaSensorName(quotaId: QuotaId): String = quotaType + "-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.sanitizedClientId.getOrElse("") protected def getQuotaMetricConfig(quota: Quota): MetricConfig = { new MetricConfig() @@ -432,10 +408,10 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, * Overrides quotas for <user>, <client-id> or <user, client-id> or the dynamic defaults * for any of these levels. * @param sanitizedUser user to override if quota applies to <user> or <user, client-id> - * @param clientId client to override if quota applies to <client-id> or <user, client-id> + * @param sanitizedClientId client to override if quota applies to <client-id> or <user, client-id> * @param quota custom quota to apply or None if quota override is being removed */ - def updateQuota(sanitizedUser: Option[String], clientId: Option[String], quota: Option[Quota]) { + def updateQuota(sanitizedUser: Option[String], sanitizedClientId: Option[String], quota: Option[Quota]) { /* * Acquire the write lock to apply changes in the quota objects. * This method changes the quota in the overriddenQuota map and applies the update on the actual KafkaMetric object (if it exists). @@ -445,13 +421,13 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, */ lock.writeLock().lock() try { - val quotaId = QuotaId(sanitizedUser, clientId) + val quotaId = QuotaId(sanitizedUser, sanitizedClientId) val userInfo = sanitizedUser match { case Some(ConfigEntityName.Default) => "default user " case Some(user) => "user " + user + " " case None => "" } - val clientIdInfo = clientId match { + val clientIdInfo = sanitizedClientId match { case Some(ConfigEntityName.Default) => "default client-id" case Some(id) => "client-id " + id case None => "" @@ -460,7 +436,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, case Some(newQuota) => logger.info(s"Changing ${quotaType} quota for ${userInfo}${clientIdInfo} to $newQuota.bound}") overriddenQuota.put(quotaId, newQuota) - (sanitizedUser, clientId) match { + (sanitizedUser, sanitizedClientId) match { case (Some(_), Some(_)) => quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled case (Some(_), None) => quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled case (None, Some(_)) => quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled @@ -471,21 +447,21 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, overriddenQuota.remove(quotaId) } - val quotaMetricName = clientRateMetricName(sanitizedUser.getOrElse(""), clientId.getOrElse("")) + val quotaMetricName = clientRateMetricName(sanitizedUser.getOrElse(""), sanitizedClientId.getOrElse("")) val allMetrics = metrics.metrics() // If multiple-levels of quotas are defined or if this is a default quota update, traverse metrics // to find all affected values. Otherwise, update just the single matching one. val singleUpdate = quotaTypesEnabled match { case QuotaTypes.NoQuotas | QuotaTypes.ClientIdQuotaEnabled | QuotaTypes.UserQuotaEnabled | QuotaTypes.UserClientIdQuotaEnabled => - !sanitizedUser.filter(_ == ConfigEntityName.Default).isDefined && !clientId.filter(_ == ConfigEntityName.Default).isDefined + !sanitizedUser.filter(_ == ConfigEntityName.Default).isDefined && !sanitizedClientId.filter(_ == ConfigEntityName.Default).isDefined case _ => false } if (singleUpdate) { // Change the underlying metric config if the sensor has been created val metric = allMetrics.get(quotaMetricName) if (metric != null) { - val metricConfigEntity = quotaEntity(sanitizedUser.getOrElse(""), clientId.getOrElse("")) + val metricConfigEntity = quotaEntity(sanitizedUser.getOrElse(""), sanitizedClientId.getOrElse("")) val newQuota = metricConfigEntity.quota logger.info(s"Sensor for ${userInfo}${clientIdInfo} already exists. Changing quota to ${newQuota.bound()} in MetricConfig") metric.config(getQuotaMetricConfig(newQuota)) @@ -509,11 +485,11 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, } } - protected def clientRateMetricName(sanitizedUser: String, clientId: String): MetricName = { + protected def clientRateMetricName(sanitizedUser: String, sanitizedClientId: String): MetricName = { metrics.metricName("byte-rate", quotaType.toString, "Tracking byte-rate per user/client-id", "user", sanitizedUser, - "client-id", clientId) + "client-id", sanitizedClientId) } private def throttleMetricName(quotaEntity: QuotaEntity): MetricName = { @@ -521,7 +497,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, quotaType.toString, "Tracking average throttle-time per user/client-id", "user", quotaEntity.sanitizedUser, - "client-id", quotaEntity.clientId) + "client-id", quotaEntity.sanitizedClientId) } def shutdown() = { http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ab63/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala index f454483..d2114dc 100644 --- a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala @@ -64,11 +64,11 @@ class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig, math.min(super.throttleTime(clientMetric, config), maxThrottleTimeMs) } - override protected def clientRateMetricName(sanitizedUser: String, clientId: String): MetricName = { + override protected def clientRateMetricName(sanitizedUser: String, sanitizedClientId: String): MetricName = { metrics.metricName("request-time", QuotaType.Request.toString, "Tracking request-time per user/client-id", "user", sanitizedUser, - "client-id", clientId) + "client-id", sanitizedClientId) } private def exemptMetricName: MetricName = { http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ab63/core/src/main/scala/kafka/server/ConfigHandler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala index 79ffde8..6f85801 100644 --- a/core/src/main/scala/kafka/server/ConfigHandler.scala +++ b/core/src/main/scala/kafka/server/ConfigHandler.scala @@ -29,7 +29,7 @@ import kafka.utils.Implicits._ import kafka.utils.Logging import org.apache.kafka.common.config.ConfigDef.Validator import org.apache.kafka.common.config.ConfigException -import org.apache.kafka.common.metrics.Quota +import org.apache.kafka.common.metrics.{Quota, Sanitizer} import org.apache.kafka.common.metrics.Quota._ import scala.collection.JavaConverters._ @@ -117,25 +117,25 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC */ class QuotaConfigHandler(private val quotaManagers: QuotaManagers) { - def updateQuotaConfig(sanitizedUser: Option[String], clientId: Option[String], config: Properties) { + def updateQuotaConfig(sanitizedUser: Option[String], sanitizedClientId: Option[String], config: Properties) { val producerQuota = if (config.containsKey(DynamicConfig.Client.ProducerByteRateOverrideProp)) Some(new Quota(config.getProperty(DynamicConfig.Client.ProducerByteRateOverrideProp).toLong, true)) else None - quotaManagers.produce.updateQuota(sanitizedUser, clientId, producerQuota) + quotaManagers.produce.updateQuota(sanitizedUser, sanitizedClientId, producerQuota) val consumerQuota = if (config.containsKey(DynamicConfig.Client.ConsumerByteRateOverrideProp)) Some(new Quota(config.getProperty(DynamicConfig.Client.ConsumerByteRateOverrideProp).toLong, true)) else None - quotaManagers.fetch.updateQuota(sanitizedUser, clientId, consumerQuota) + quotaManagers.fetch.updateQuota(sanitizedUser, sanitizedClientId, consumerQuota) val requestQuota = if (config.containsKey(DynamicConfig.Client.RequestPercentageOverrideProp)) Some(new Quota(config.getProperty(DynamicConfig.Client.RequestPercentageOverrideProp).toDouble, true)) else None - quotaManagers.request.updateQuota(sanitizedUser, clientId, requestQuota) + quotaManagers.request.updateQuota(sanitizedUser, sanitizedClientId, requestQuota) } } @@ -145,14 +145,14 @@ class QuotaConfigHandler(private val quotaManagers: QuotaManagers) { */ class ClientIdConfigHandler(private val quotaManagers: QuotaManagers) extends QuotaConfigHandler(quotaManagers) with ConfigHandler { - def processConfigChanges(clientId: String, clientConfig: Properties) { - updateQuotaConfig(None, Some(clientId), clientConfig) + def processConfigChanges(sanitizedClientId: String, clientConfig: Properties) { + updateQuotaConfig(None, Some(sanitizedClientId), clientConfig) } } /** * The UserConfigHandler will process <user> and <user, client-id> quota changes in ZK. - * The callback provides the node name containing sanitized user principal, client-id if this is + * The callback provides the node name containing sanitized user principal, sanitized client-id if this is * a <user, client-id> update and the full properties set read from ZK. */ class UserConfigHandler(private val quotaManagers: QuotaManagers, val credentialProvider: CredentialProvider) extends QuotaConfigHandler(quotaManagers) with ConfigHandler { @@ -163,10 +163,10 @@ class UserConfigHandler(private val quotaManagers: QuotaManagers, val credential if (entities.length != 1 && entities.length != 3) throw new IllegalArgumentException("Invalid quota entity path: " + quotaEntityPath) val sanitizedUser = entities(0) - val clientId = if (entities.length == 3) Some(entities(2)) else None - updateQuotaConfig(Some(sanitizedUser), clientId, config) - if (!clientId.isDefined && sanitizedUser != ConfigEntityName.Default) - credentialProvider.updateCredentials(QuotaId.desanitize(sanitizedUser), config) + val sanitizedClientId = if (entities.length == 3) Some(entities(2)) else None + updateQuotaConfig(Some(sanitizedUser), sanitizedClientId, config) + if (!sanitizedClientId.isDefined && sanitizedUser != ConfigEntityName.Default) + credentialProvider.updateCredentials(Sanitizer.desanitize(sanitizedUser), config) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ab63/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala index c69c9a4..e8967d1 100644 --- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala @@ -22,7 +22,7 @@ import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} import org.apache.kafka.clients.producer._ import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback import org.apache.kafka.common.{MetricName, TopicPartition} -import org.apache.kafka.common.metrics.{KafkaMetric, Quota} +import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sanitizer} import org.junit.Assert._ import org.junit.{Before, Test} @@ -39,8 +39,8 @@ abstract class BaseQuotaTest extends IntegrationTestHarness { val consumerCount = 1 private val producerBufferSize = 300000 - protected val producerClientId = "QuotasTestProducer-1" - protected val consumerClientId = "QuotasTestConsumer-1" + protected def producerClientId = "QuotasTestProducer-1" + protected def consumerClientId = "QuotasTestConsumer-1" this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "2") @@ -210,7 +210,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness { private def verifyProducerThrottleTimeMetric(producer: KafkaProducer[_, _]) { val tags = new HashMap[String, String] - tags.put("client-id", producerClientId) + tags.put("client-id", Sanitizer.sanitize(producerClientId)) val avgMetric = producer.metrics.get(new MetricName("produce-throttle-time-avg", "producer-metrics", "", tags)) val maxMetric = producer.metrics.get(new MetricName("produce-throttle-time-max", "producer-metrics", "", tags)) @@ -220,7 +220,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness { private def verifyConsumerThrottleTimeMetric(consumer: KafkaConsumer[_, _], maxThrottleTime: Option[Double] = None) { val tags = new HashMap[String, String] - tags.put("client-id", consumerClientId) + tags.put("client-id", Sanitizer.sanitize(consumerClientId)) val avgMetric = consumer.metrics.get(new MetricName("fetch-throttle-time-avg", "consumer-fetch-manager-metrics", "", tags)) val maxMetric = consumer.metrics.get(new MetricName("fetch-throttle-time-max", "consumer-fetch-manager-metrics", "", tags)) @@ -234,7 +234,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness { quotaType.toString, "Tracking throttle-time per user/client-id", "user", quotaId.sanitizedUser.getOrElse(""), - "client-id", quotaId.clientId.getOrElse("")) + "client-id", quotaId.sanitizedClientId.getOrElse("")) } def throttleMetric(quotaType: QuotaType, quotaId: QuotaId): KafkaMetric = { http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ab63/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala index f8594e1..f5a2cf5 100644 --- a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala +++ b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala @@ -18,14 +18,17 @@ import java.util.Properties import kafka.admin.AdminUtils import kafka.server.{DynamicConfig, KafkaConfig, QuotaId} +import org.apache.kafka.common.metrics.Sanitizer import org.apache.kafka.common.security.auth.KafkaPrincipal import org.junit.Before class ClientIdQuotaTest extends BaseQuotaTest { override val userPrincipal = KafkaPrincipal.ANONYMOUS.getName - override val producerQuotaId = QuotaId(None, Some(producerClientId)) - override val consumerQuotaId = QuotaId(None, Some(consumerClientId)) + override def producerClientId = "QuotasTestProducer-!@#$%^&*()" + override def consumerClientId = "QuotasTestConsumer-!@#$%^&*()" + override val producerQuotaId = QuotaId(None, Some(Sanitizer.sanitize(producerClientId))) + override val consumerQuotaId = QuotaId(None, Some(Sanitizer.sanitize(consumerClientId))) @Before override def setUp() { @@ -51,6 +54,6 @@ class ClientIdQuotaTest extends BaseQuotaTest { } private def updateQuotaOverride(clientId: String, properties: Properties) { - AdminUtils.changeClientIdConfig(zkUtils, clientId, properties) + AdminUtils.changeClientIdConfig(zkUtils, Sanitizer.sanitize(clientId), properties) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ab63/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala index 333c851..cd9437c 100644 --- a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala +++ b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala @@ -23,6 +23,7 @@ import kafka.server._ import org.apache.kafka.common.protocol.SecurityProtocol import org.junit.Before +import org.apache.kafka.common.metrics.Sanitizer class UserClientIdQuotaTest extends BaseQuotaTest { @@ -30,8 +31,10 @@ class UserClientIdQuotaTest extends BaseQuotaTest { override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks")) override val userPrincipal = "O=A client,CN=localhost" - override def producerQuotaId = QuotaId(Some(QuotaId.sanitize(userPrincipal)), Some(producerClientId)) - override def consumerQuotaId = QuotaId(Some(QuotaId.sanitize(userPrincipal)), Some(consumerClientId)) + override def producerClientId = "QuotasTestProducer-!@#$%^&*()" + override def consumerClientId = "QuotasTestConsumer-!@#$%^&*()" + override def producerQuotaId = QuotaId(Some(Sanitizer.sanitize(userPrincipal)), Some(Sanitizer.sanitize(producerClientId))) + override def consumerQuotaId = QuotaId(Some(Sanitizer.sanitize(userPrincipal)), Some(Sanitizer.sanitize(consumerClientId))) @Before override def setUp() { @@ -58,11 +61,11 @@ class UserClientIdQuotaTest extends BaseQuotaTest { override def removeQuotaOverrides() { val emptyProps = new Properties - AdminUtils.changeUserOrUserClientIdConfig(zkUtils, QuotaId.sanitize(userPrincipal) + "/clients/" + producerClientId, emptyProps) - AdminUtils.changeUserOrUserClientIdConfig(zkUtils, QuotaId.sanitize(userPrincipal) + "/clients/" + consumerClientId, emptyProps) + AdminUtils.changeUserOrUserClientIdConfig(zkUtils, Sanitizer.sanitize(userPrincipal) + "/clients/" + Sanitizer.sanitize(producerClientId), emptyProps) + AdminUtils.changeUserOrUserClientIdConfig(zkUtils, Sanitizer.sanitize(userPrincipal) + "/clients/" + Sanitizer.sanitize(consumerClientId), emptyProps) } private def updateQuotaOverride(userPrincipal: String, clientId: String, properties: Properties) { - AdminUtils.changeUserOrUserClientIdConfig(zkUtils, QuotaId.sanitize(userPrincipal) + "/clients/" + clientId, properties) + AdminUtils.changeUserOrUserClientIdConfig(zkUtils, Sanitizer.sanitize(userPrincipal) + "/clients/" + Sanitizer.sanitize(clientId), properties) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ab63/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala index 4ad6265..330c1e0 100644 --- a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala +++ b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala @@ -22,6 +22,7 @@ import kafka.server.{ConfigEntityName, KafkaConfig, QuotaId} import kafka.utils.JaasTestUtils import org.apache.kafka.common.protocol.SecurityProtocol import org.junit.{After, Before} +import org.apache.kafka.common.metrics.Sanitizer class UserQuotaTest extends BaseQuotaTest with SaslSetup { @@ -66,6 +67,6 @@ class UserQuotaTest extends BaseQuotaTest with SaslSetup { } private def updateQuotaOverride(properties: Properties) { - AdminUtils.changeUserOrUserClientIdConfig(zkUtils, QuotaId.sanitize(userPrincipal), properties) + AdminUtils.changeUserOrUserClientIdConfig(zkUtils, Sanitizer.sanitize(userPrincipal), properties) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ab63/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala index af77c67..bb17d74 100644 --- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala @@ -30,6 +30,7 @@ import org.junit.Assert._ import org.junit.Test import scala.collection.mutable import scala.collection.JavaConverters._ +import org.apache.kafka.common.metrics.Sanitizer class ConfigCommandTest extends ZooKeeperTestHarness with Logging { @Test @@ -326,9 +327,9 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { // <user> quota val principal = "CN=ConfigCommandTest,O=Apache,L=<default>" - val sanitizedPrincipal = QuotaId.sanitize(principal) + val sanitizedPrincipal = Sanitizer.sanitize(principal) assertEquals(-1, sanitizedPrincipal.indexOf('=')) - assertEquals(principal, QuotaId.desanitize(sanitizedPrincipal)) + assertEquals(principal, Sanitizer.desanitize(sanitizedPrincipal)) for (opts <- Seq(describeOpts, alterOpts)) { checkEntity("users", Some(principal), sanitizedPrincipal, opts) checkEntity("users", Some(""), ConfigEntityName.Default, opts) @@ -362,38 +363,36 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { assertEquals(expectedEntityName, entity.fullSanitizedName) } - // <default> is a valid user principal (can be handled with URL-encoding), - // but an invalid client-id (cannot be handled since client-ids are not encoded) - checkEntity("users", QuotaId.sanitize("<default>"), + // <default> is a valid user principal and client-id (can be handled with URL-encoding), + checkEntity("users", Sanitizer.sanitize("<default>"), "--entity-type", "users", "--entity-name", "<default>", "--alter", "--add-config", "a=b,c=d") - try { - checkEntity("clients", QuotaId.sanitize("<default>"), - "--entity-type", "clients", "--entity-name", "<default>", - "--alter", "--add-config", "a=b,c=d") - fail("Did not fail with invalid client-id") - } catch { - case _: InvalidConfigException => // expected - } + checkEntity("clients", Sanitizer.sanitize("<default>"), + "--entity-type", "clients", "--entity-name", "<default>", + "--alter", "--add-config", "a=b,c=d") - checkEntity("users", QuotaId.sanitize("CN=user1") + "/clients/client1", + + checkEntity("users", Sanitizer.sanitize("CN=user1") + "/clients/client1", "--entity-type", "users", "--entity-name", "CN=user1", "--entity-type", "clients", "--entity-name", "client1", "--alter", "--add-config", "a=b,c=d") - checkEntity("users", QuotaId.sanitize("CN=user1") + "/clients/client1", + checkEntity("users", Sanitizer.sanitize("CN=user1") + "/clients/client1", "--entity-name", "CN=user1", "--entity-type", "users", "--entity-name", "client1", "--entity-type", "clients", "--alter", "--add-config", "a=b,c=d") - checkEntity("users", QuotaId.sanitize("CN=user1") + "/clients/client1", + checkEntity("users", Sanitizer.sanitize("CN=user1") + "/clients/client1", "--entity-type", "clients", "--entity-name", "client1", "--entity-type", "users", "--entity-name", "CN=user1", "--alter", "--add-config", "a=b,c=d") - checkEntity("users", QuotaId.sanitize("CN=user1") + "/clients/client1", + checkEntity("users", Sanitizer.sanitize("CN=user1") + "/clients/client1", "--entity-name", "client1", "--entity-type", "clients", "--entity-name", "CN=user1", "--entity-type", "users", "--alter", "--add-config", "a=b,c=d") - checkEntity("users", QuotaId.sanitize("CN=user1") + "/clients", + checkEntity("users", Sanitizer.sanitize("CN=user1") + "/clients", "--entity-type", "clients", "--entity-name", "CN=user1", "--entity-type", "users", "--describe") checkEntity("users", "/clients", "--entity-type", "clients", "--entity-type", "users", "--describe") + checkEntity("users", Sanitizer.sanitize("CN=user1") + "/clients/" + Sanitizer.sanitize("client1?@%"), + "--entity-name", "client1?@%", "--entity-type", "clients", "--entity-name", "CN=user1", "--entity-type", "users", + "--alter", "--add-config", "a=b,c=d") } @Test @@ -413,7 +412,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { val clientId = "a-client" val principal = "CN=ConfigCommandTest.testQuotaDescribeEntities , O=Apache, L=<default>" - val sanitizedPrincipal = QuotaId.sanitize(principal) + val sanitizedPrincipal = Sanitizer.sanitize(principal) val userClient = sanitizedPrincipal + "/clients/" + clientId var opts = Array("--entity-type", "clients", "--entity-name", clientId) http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ab63/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala index f4a55ab..7c8e5bd 100644 --- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala @@ -18,7 +18,7 @@ package kafka.server import java.util.Collections -import org.apache.kafka.common.metrics.{MetricConfig, Metrics, Quota} +import org.apache.kafka.common.metrics.{MetricConfig, Metrics, Quota, Sanitizer} import org.apache.kafka.common.utils.MockTime import org.junit.Assert.{assertEquals, assertTrue} import org.junit.{Before, Test} @@ -374,12 +374,21 @@ class ClientQuotaManagerTest { } @Test - def testQuotaUserSanitize() { - val principal = "CN=Some characters !@#$%&*()_-+=';:,/~" - val sanitizedPrincipal = QuotaId.sanitize(principal) - // Apart from % used in percent-encoding all characters of sanitized principal must be characters allowed in client-id - ConfigCommand.validateChars("sanitized-principal", sanitizedPrincipal.replace('%', '_')) - assertEquals(principal, QuotaId.desanitize(sanitizedPrincipal)) + def testSanitizeClientId() { + val metrics = newMetrics + val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time) + val clientId = "client@#$%" + try { + clientMetrics.maybeRecordAndThrottle("ANONYMOUS", clientId, 100, callback) + + val throttleTimeSensor = metrics.getSensor("ProduceThrottleTime-:" + Sanitizer.sanitize(clientId)) + assertTrue("Throttle time sensor should exist", throttleTimeSensor != null) + + val byteRateSensor = metrics.getSensor("Produce-:" + Sanitizer.sanitize(clientId)) + assertTrue("Byte rate sensor should exist", byteRateSensor != null) + } finally { + clientMetrics.shutdown() + } } def newMetrics: Metrics = { http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ab63/docs/upgrade.html ---------------------------------------------------------------------- diff --git a/docs/upgrade.html b/docs/upgrade.html index ed0f9cf..a98bdea 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -65,6 +65,7 @@ <code>OffsetFetchRequest</code>, <code>OffsetRequest</code>, <code>ProducerRequest</code>, and <code>TopicMetadataRequest</code>. This was only intended for use on the broker, but it is no longer in use and the implementations have not been maintained. A stub implementation has been retained for binary compatibility.</li> + <li>The Java clients and tools now accept any string as a client-id.</li> </ul> <h5><a id="upgrade_100_new_protocols" href="#upgrade_100_new_protocols">New Protocol Versions</a></h5>