Repository: kafka
Updated Branches:
  refs/heads/trunk afaaea809 -> c8c6ab632


KAFKA-5735; KIP-190: Handle client-ids consistently

Developed with edoardocomar

Author: Mickael Maison <mickael.mai...@gmail.com>

Reviewers: Edoardo Comar <eco...@uk.ibm.com>, Rajini Sivaram 
<rajinisiva...@googlemail.com>

Closes #3906 from mimaison/KAFKA-5735


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c8c6ab63
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c8c6ab63
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c8c6ab63

Branch: refs/heads/trunk
Commit: c8c6ab63248639b167350642efdfc4341fa3ce37
Parents: afaaea8
Author: Mickael Maison <mickael.mai...@gmail.com>
Authored: Thu Sep 21 21:16:57 2017 +0100
Committer: Rajini Sivaram <rajinisiva...@googlemail.com>
Committed: Thu Sep 21 21:16:57 2017 +0100

----------------------------------------------------------------------
 .../kafka/clients/admin/KafkaAdminClient.java   | 16 ++--
 .../kafka/clients/consumer/KafkaConsumer.java   |  8 +-
 .../kafka/clients/producer/KafkaProducer.java   | 15 ++--
 .../apache/kafka/common/metrics/Sanitizer.java  | 61 +++++++++++++
 .../org/apache/kafka/common/utils/Utils.java    |  4 +-
 .../kafka/common/metrics/SanitizerTest.java     | 35 ++++++++
 .../src/main/scala/kafka/admin/AdminUtils.scala |  6 +-
 .../main/scala/kafka/admin/ConfigCommand.scala  |  8 +-
 .../scala/kafka/network/RequestChannel.scala    |  3 +-
 .../scala/kafka/server/ClientQuotaManager.scala | 94 ++++++++------------
 .../server/ClientRequestQuotaManager.scala      |  4 +-
 .../main/scala/kafka/server/ConfigHandler.scala | 24 ++---
 .../integration/kafka/api/BaseQuotaTest.scala   | 12 +--
 .../kafka/api/ClientIdQuotaTest.scala           |  9 +-
 .../kafka/api/UserClientIdQuotaTest.scala       | 13 +--
 .../integration/kafka/api/UserQuotaTest.scala   |  3 +-
 .../unit/kafka/admin/ConfigCommandTest.scala    | 37 ++++----
 .../kafka/server/ClientQuotaManagerTest.scala   | 23 +++--
 docs/upgrade.html                               |  1 +
 19 files changed, 235 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ab63/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 27c4b18..2447909 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -55,6 +55,7 @@ import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.Sanitizer;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.network.Selector;
@@ -289,6 +290,7 @@ public class KafkaAdminClient extends AdminClient {
         NetworkClient networkClient = null;
         Time time = Time.SYSTEM;
         String clientId = generateClientId(config);
+        String sanitizedClientId = Sanitizer.sanitize(clientId);
         ChannelBuilder channelBuilder = null;
         Selector selector = null;
         ApiVersions apiVersions = new ApiVersions();
@@ -301,7 +303,7 @@ public class KafkaAdminClient extends AdminClient {
                     config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), 
true);
             List<MetricsReporter> reporters = 
config.getConfiguredInstances(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG,
                 MetricsReporter.class);
-            Map<String, String> metricTags = 
Collections.singletonMap("client-id", clientId);
+            Map<String, String> metricTags = 
Collections.singletonMap("client-id", sanitizedClientId);
             MetricConfig metricConfig = new 
MetricConfig().samples(config.getInt(AdminClientConfig.METRICS_NUM_SAMPLES_CONFIG))
                 
.timeWindow(config.getLong(AdminClientConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), 
TimeUnit.MILLISECONDS)
                 
.recordLevel(Sensor.RecordingLevel.forName(config.getString(AdminClientConfig.METRICS_RECORDING_LEVEL_CONFIG)))
@@ -326,7 +328,7 @@ public class KafkaAdminClient extends AdminClient {
                 true,
                 apiVersions,
                 logContext);
-            return new KafkaAdminClient(config, clientId, time, metadata, 
metrics, networkClient,
+            return new KafkaAdminClient(config, clientId, sanitizedClientId, 
time, metadata, metrics, networkClient,
                 timeoutProcessorFactory, logContext);
         } catch (Throwable exc) {
             closeQuietly(metrics, "Metrics");
@@ -343,7 +345,7 @@ public class KafkaAdminClient extends AdminClient {
 
         try {
             metrics = new Metrics(new MetricConfig(), new 
LinkedList<MetricsReporter>(), time);
-            return new KafkaAdminClient(config, clientId, time, metadata, 
metrics, client, null,
+            return new KafkaAdminClient(config, clientId, 
Sanitizer.sanitize(clientId), time, metadata, metrics, client, null,
                     createLogContext(clientId));
         } catch (Throwable exc) {
             closeQuietly(metrics, "Metrics");
@@ -355,7 +357,7 @@ public class KafkaAdminClient extends AdminClient {
         return new LogContext("[AdminClient clientId=" + clientId + "] ");
     }
 
-    private KafkaAdminClient(AdminClientConfig config, String clientId, Time 
time, Metadata metadata,
+    private KafkaAdminClient(AdminClientConfig config, String clientId, String 
sanitizedClientId, Time time, Metadata metadata,
                      Metrics metrics, KafkaClient client, 
TimeoutProcessorFactory timeoutProcessorFactory,
                      LogContext logContext) {
         this.defaultTimeoutMs = 
config.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG);
@@ -369,13 +371,13 @@ public class KafkaAdminClient extends AdminClient {
         this.metrics = metrics;
         this.client = client;
         this.runnable = new AdminClientRunnable();
-        String threadName = "kafka-admin-client-thread" + (clientId.length() > 
0 ? " | " + clientId : "");
+        String threadName = "kafka-admin-client-thread | " + clientId;
         this.thread = new KafkaThread(threadName, runnable, true);
         this.timeoutProcessorFactory = (timeoutProcessorFactory == null) ?
             new TimeoutProcessorFactory() : timeoutProcessorFactory;
         this.maxRetries = config.getInt(AdminClientConfig.RETRIES_CONFIG);
         config.logUnused();
-        AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
+        AppInfoParser.registerAppInfo(JMX_PREFIX, sanitizedClientId);
         log.debug("Kafka admin client initialized");
         thread.start();
     }
@@ -416,7 +418,7 @@ public class KafkaAdminClient extends AdminClient {
             // Wait for the thread to be joined.
             thread.join();
 
-            AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId);
+            AppInfoParser.unregisterAppInfo(JMX_PREFIX, 
Sanitizer.sanitize(clientId));
 
             log.debug("Kafka admin client closed.");
         } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ab63/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 3ea0394..f4af39c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -42,6 +42,7 @@ import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.Sanitizer;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.network.Selector;
@@ -645,6 +646,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             if (clientId.isEmpty())
                 clientId = "consumer-" + 
CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
             this.clientId = clientId;
+            String sanitizedClientId = Sanitizer.sanitize(this.clientId);
             String groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
 
             LogContext logContext = new LogContext("[Consumer clientId=" + 
clientId + ", groupId=" + groupId + "] ");
@@ -658,7 +660,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                 throw new 
ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater 
than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + 
ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
             this.time = Time.SYSTEM;
 
-            Map<String, String> metricsTags = 
Collections.singletonMap("client-id", clientId);
+            Map<String, String> metricsTags = 
Collections.singletonMap("client-id", sanitizedClientId);
             MetricConfig metricConfig = new 
MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
                     
.timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), 
TimeUnit.MILLISECONDS)
                     
.recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
@@ -769,7 +771,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     isolationLevel);
 
             config.logUnused();
-            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
+            AppInfoParser.registerAppInfo(JMX_PREFIX, sanitizedClientId);
 
             log.debug("Kafka consumer initialized");
         } catch (Throwable t) {
@@ -1724,7 +1726,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
         ClientUtils.closeQuietly(client, "consumer network client", 
firstException);
         ClientUtils.closeQuietly(keyDeserializer, "consumer key deserializer", 
firstException);
         ClientUtils.closeQuietly(valueDeserializer, "consumer value 
deserializer", firstException);
-        AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId);
+        AppInfoParser.unregisterAppInfo(JMX_PREFIX, 
Sanitizer.sanitize(clientId));
         log.debug("Kafka consumer has been closed");
         Throwable exception = firstException.get();
         if (exception != null && !swallowException) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ab63/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 66760e2..8a1c2b7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -51,6 +51,7 @@ import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.Sanitizer;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.network.Selector;
@@ -233,7 +234,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     private static final String JMX_PREFIX = "kafka.producer";
     public static final String NETWORK_THREAD_PREFIX = 
"kafka-producer-network-thread";
 
-    private String clientId;
+    private final String clientId;
     // Visible for testing
     final Metrics metrics;
     private final Partitioner partitioner;
@@ -312,9 +313,11 @@ public class KafkaProducer<K, V> implements Producer<K, V> 
{
             Map<String, Object> userProvidedConfigs = config.originals();
             this.producerConfig = config;
             this.time = Time.SYSTEM;
-            clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
+            String clientId = 
config.getString(ProducerConfig.CLIENT_ID_CONFIG);
             if (clientId.length() <= 0)
                 clientId = "producer-" + 
PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
+            this.clientId = clientId;
+            String sanitizedClientId = Sanitizer.sanitize(clientId);
 
             String transactionalId = 
userProvidedConfigs.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG) ?
                     (String) 
userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG) : null;
@@ -326,7 +329,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             log = logContext.logger(KafkaProducer.class);
             log.trace("Starting the Kafka producer");
 
-            Map<String, String> metricTags = 
Collections.singletonMap("client-id", clientId);
+            Map<String, String> metricTags = 
Collections.singletonMap("client-id", sanitizedClientId);
             MetricConfig metricConfig = new 
MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                     
.timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), 
TimeUnit.MILLISECONDS)
                     
.recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
@@ -420,12 +423,12 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
                     config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
                     this.transactionManager,
                     apiVersions);
-            String ioThreadName = NETWORK_THREAD_PREFIX + (clientId.length() > 
0 ? " | " + clientId : "");
+            String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
             this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
             this.ioThread.start();
             this.errors = this.metrics.sensor("errors");
             config.logUnused();
-            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
+            AppInfoParser.registerAppInfo(JMX_PREFIX, sanitizedClientId);
             log.debug("Kafka producer started");
         } catch (Throwable t) {
             // call close methods if internal objects are already constructed 
this is to prevent resource leak. see KAFKA-2121
@@ -1073,7 +1076,7 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
         ClientUtils.closeQuietly(keySerializer, "producer keySerializer", 
firstException);
         ClientUtils.closeQuietly(valueSerializer, "producer valueSerializer", 
firstException);
         ClientUtils.closeQuietly(partitioner, "producer partitioner", 
firstException);
-        AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId);
+        AppInfoParser.unregisterAppInfo(JMX_PREFIX, 
Sanitizer.sanitize(clientId));
         log.debug("Kafka producer has been closed");
         if (firstException.get() != null && !swallowException)
             throw new KafkaException("Failed to close kafka producer", 
firstException.get());

http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ab63/clients/src/main/java/org/apache/kafka/common/metrics/Sanitizer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/metrics/Sanitizer.java 
b/clients/src/main/java/org/apache/kafka/common/metrics/Sanitizer.java
new file mode 100644
index 0000000..b98a426
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sanitizer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.kafka.common.KafkaException;
+
+/**
+ * Utility class for sanitizing/desanitizing user principal and client-ids
+ * to a safe value for use in MetricName and as Zookeeper node name
+ */
+public class Sanitizer {
+
+    public static String sanitize(String name) {
+        String encoded = "";
+        try {
+            encoded = URLEncoder.encode(name, StandardCharsets.UTF_8.name());
+            StringBuilder builder = new StringBuilder();
+            for (int i = 0; i < encoded.length(); i++) {
+                char c = encoded.charAt(i);
+                if (c == '*') {         // Metric ObjectName treats * as 
pattern
+                    builder.append("%2A");
+                } else if (c == '+') {  // Space URL-encoded as +, replace 
with percent encoding
+                    builder.append("%20");
+                } else {
+                    builder.append(c);
+                }
+            }
+            return builder.toString();
+        } catch (UnsupportedEncodingException e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    public static String desanitize(String name) {
+        try {
+            return URLDecoder.decode(name, StandardCharsets.UTF_8.name());
+        } catch (UnsupportedEncodingException e) {
+            throw new KafkaException(e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ab63/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 1137045..82b12c3 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -322,12 +322,12 @@ public class Utils {
         Class<?>[] argTypes = new Class<?>[params.length / 2];
         Object[] args = new Object[params.length / 2];
         try {
-            Class c = Class.forName(className, true, 
Utils.getContextOrKafkaClassLoader());
+            Class<?> c = Class.forName(className, true, 
Utils.getContextOrKafkaClassLoader());
             for (int i = 0; i < params.length / 2; i++) {
                 argTypes[i] = (Class<?>) params[2 * i];
                 args[i] = params[(2 * i) + 1];
             }
-            Constructor<T> constructor = c.getConstructor(argTypes);
+            Constructor<T> constructor = (Constructor<T>) 
c.getConstructor(argTypes);
             return constructor.newInstance(args);
         } catch (NoSuchMethodException e) {
             throw new ClassNotFoundException(String.format("Failed to find " +

http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ab63/clients/src/test/java/org/apache/kafka/common/metrics/SanitizerTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/metrics/SanitizerTest.java 
b/clients/src/test/java/org/apache/kafka/common/metrics/SanitizerTest.java
new file mode 100644
index 0000000..d66bda1
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/SanitizerTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.UnsupportedEncodingException;
+
+import org.junit.Test;
+
+public class SanitizerTest {
+
+    @Test
+    public void testSanitize() throws UnsupportedEncodingException {
+        String principal = "CN=Some characters !@#$%&*()_-+=';:,/~";
+        String sanitizedPrincipal = Sanitizer.sanitize(principal);
+        assertTrue(sanitizedPrincipal.replace('%', 
'_').matches("[a-zA-Z0-9\\._\\-]+"));
+        assertEquals(principal, Sanitizer.desanitize(sanitizedPrincipal));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ab63/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala 
b/core/src/main/scala/kafka/admin/AdminUtils.scala
index b6c9afe..8c873f7 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -542,14 +542,14 @@ object AdminUtils extends Logging with AdminUtilities {
    * and <user> configs are not specified.
    *
    * @param zkUtils Zookeeper utilities used to write the config to ZK
-   * @param clientId: The clientId for which configs are being changed
+   * @param sanitizedClientId: The sanitized clientId for which configs are 
being changed
    * @param configs: The final set of configs that will be applied to the 
topic. If any new configs need to be added or
    *                 existing configs need to be deleted, it should be done 
prior to invoking this API
    *
    */
-  def changeClientIdConfig(zkUtils: ZkUtils, clientId: String, configs: 
Properties) {
+  def changeClientIdConfig(zkUtils: ZkUtils, sanitizedClientId: String, 
configs: Properties) {
     DynamicConfig.Client.validate(configs)
-    changeEntityConfig(zkUtils, ConfigType.Client, clientId, configs)
+    changeEntityConfig(zkUtils, ConfigType.Client, sanitizedClientId, configs)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ab63/core/src/main/scala/kafka/admin/ConfigCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala 
b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 366667b..bd193c7 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -28,6 +28,7 @@ import kafka.utils.Implicits._
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.scram._
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.metrics.Sanitizer
 import scala.collection._
 import scala.collection.JavaConverters._
 
@@ -184,7 +185,7 @@ object ConfigCommand extends Config {
       sanitizedName match {
         case Some(ConfigEntityName.Default) => "default " + typeName
         case Some(n) =>
-          val desanitized = if (entityType == ConfigType.User) 
QuotaId.desanitize(n) else n
+          val desanitized = if (entityType == ConfigType.User || entityType == 
ConfigType.Client) Sanitizer.desanitize(n) else n
           s"$typeName '$desanitized'"
         case None => entityType
       }
@@ -265,10 +266,7 @@ object ConfigCommand extends Config {
         ConfigEntityName.Default
       else {
         entityType match {
-          case ConfigType.User => QuotaId.sanitize(name)
-          case ConfigType.Client =>
-            validateChars("Client-id", name)
-            name
+          case ConfigType.User | ConfigType.Client => Sanitizer.sanitize(name)
           case _ => throw new IllegalArgumentException("Invalid entity type " 
+ entityType)
         }
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ab63/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala 
b/core/src/main/scala/kafka/network/RequestChannel.scala
index 1128fd3..e71a06b 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -27,6 +27,7 @@ import kafka.network.RequestChannel.{ShutdownRequest, 
BaseRequest}
 import kafka.server.QuotaId
 import kafka.utils.{Logging, NotNothing}
 import org.apache.kafka.common.memory.MemoryPool
+import org.apache.kafka.common.metrics.Sanitizer
 import org.apache.kafka.common.network.Send
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests._
@@ -45,7 +46,7 @@ object RequestChannel extends Logging {
   case object ShutdownRequest extends BaseRequest
 
   case class Session(principal: KafkaPrincipal, clientAddress: InetAddress) {
-    val sanitizedUser = QuotaId.sanitize(principal.getName)
+    val sanitizedUser = Sanitizer.sanitize(principal.getName)
   }
 
   class Request(val processor: Int,

http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ab63/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala 
b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index e1d5249..c84fbcb 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -75,36 +75,9 @@ object QuotaTypes {
   val UserClientIdQuotaEnabled = 4
 }
 
-object QuotaId {
+case class QuotaId(sanitizedUser: Option[String], sanitizedClientId: 
Option[String])
 
-  /**
-   * Sanitizes user principal to a safe value for use in MetricName
-   * and as Zookeeper node name
-   */
-  def sanitize(user: String): String = {
-    val encoded = URLEncoder.encode(user, StandardCharsets.UTF_8.name)
-    val builder = new StringBuilder
-    for (i <- 0 until encoded.length) {
-      encoded.charAt(i) match {
-        case '*' => builder.append("%2A") // Metric ObjectName treats * as 
pattern
-        case '+' => builder.append("%20") // Space URL-encoded as +, replace 
with percent encoding
-        case c => builder.append(c)
-      }
-    }
-    builder.toString
-  }
-
-  /**
-   * Decodes sanitized user principal
-   */
-  def desanitize(sanitizedUser: String): String = {
-    URLDecoder.decode(sanitizedUser, StandardCharsets.UTF_8.name)
-  }
-}
-
-case class QuotaId(sanitizedUser: Option[String], clientId: Option[String])
-
-case class QuotaEntity(quotaId: QuotaId, sanitizedUser: String, clientId: 
String, quota: Quota)
+case class QuotaEntity(quotaId: QuotaId, sanitizedUser: String, 
sanitizedClientId: String, quota: Quota)
 
 /**
  * Helper class that records per-client metrics. It is also responsible for 
maintaining Quota usage statistics
@@ -216,7 +189,7 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
       case _: QuotaViolationException =>
         // Compute the delay
         val clientQuotaEntity = clientSensors.quotaEntity
-        val clientMetric = 
metrics.metrics().get(clientRateMetricName(clientQuotaEntity.sanitizedUser, 
clientQuotaEntity.clientId))
+        val clientMetric = 
metrics.metrics().get(clientRateMetricName(clientQuotaEntity.sanitizedUser, 
clientQuotaEntity.sanitizedClientId))
         throttleTimeMs = throttleTime(clientMetric, 
getQuotaMetricConfig(clientQuotaEntity.quota)).toInt
         clientSensors.throttleTimeSensor.record(throttleTimeMs)
         // If delayed, add the element to the delayQueue
@@ -242,17 +215,17 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
    * and the associated quota override or default quota.
    *
    */
-  private def quotaEntity(sanitizedUser: String, clientId: String) : 
QuotaEntity = {
+  private def quotaEntity(sanitizedUser: String, sanitizedClientId: String) : 
QuotaEntity = {
     quotaTypesEnabled match {
       case QuotaTypes.NoQuotas | QuotaTypes.ClientIdQuotaEnabled =>
-        val quotaId = QuotaId(None, Some(clientId))
+        val quotaId = QuotaId(None, Some(sanitizedClientId))
         var quota = overriddenQuota.get(quotaId)
         if (quota == null) {
           quota = 
overriddenQuota.get(ClientQuotaManagerConfig.DefaultClientIdQuotaId)
           if (quota == null)
             quota = staticConfigClientIdQuota
         }
-        QuotaEntity(quotaId, "", clientId, quota)
+        QuotaEntity(quotaId, "", sanitizedClientId, quota)
       case QuotaTypes.UserQuotaEnabled =>
         val quotaId = QuotaId(Some(sanitizedUser), None)
         var quota = overriddenQuota.get(quotaId)
@@ -263,12 +236,12 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
         }
         QuotaEntity(quotaId, sanitizedUser, "", quota)
       case QuotaTypes.UserClientIdQuotaEnabled =>
-        val quotaId = QuotaId(Some(sanitizedUser), Some(clientId))
+        val quotaId = QuotaId(Some(sanitizedUser), Some(sanitizedClientId))
         var quota = overriddenQuota.get(quotaId)
         if (quota == null) {
           quota = overriddenQuota.get(QuotaId(Some(sanitizedUser), 
Some(ConfigEntityName.Default)))
           if (quota == null) {
-            quota = 
overriddenQuota.get(QuotaId(Some(ConfigEntityName.Default), Some(clientId)))
+            quota = 
overriddenQuota.get(QuotaId(Some(ConfigEntityName.Default), 
Some(sanitizedClientId)))
             if (quota == null) {
               quota = 
overriddenQuota.get(ClientQuotaManagerConfig.DefaultUserClientIdQuotaId)
               if (quota == null)
@@ -276,17 +249,17 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
             }
           }
         }
-        QuotaEntity(quotaId, sanitizedUser, clientId, quota)
+        QuotaEntity(quotaId, sanitizedUser, sanitizedClientId, quota)
       case _ =>
-        quotaEntityWithMultipleQuotaLevels(sanitizedUser, clientId)
+        quotaEntityWithMultipleQuotaLevels(sanitizedUser, sanitizedClientId)
     }
   }
 
-  private def quotaEntityWithMultipleQuotaLevels(sanitizedUser: String, 
clientId: String) : QuotaEntity = {
-    val userClientQuotaId = QuotaId(Some(sanitizedUser), Some(clientId))
+  private def quotaEntityWithMultipleQuotaLevels(sanitizedUser: String, 
sanitizerClientId: String) : QuotaEntity = {
+    val userClientQuotaId = QuotaId(Some(sanitizedUser), 
Some(sanitizerClientId))
 
     val userQuotaId = QuotaId(Some(sanitizedUser), None)
-    val clientQuotaId = QuotaId(None, Some(clientId))
+    val clientQuotaId = QuotaId(None, Some(sanitizerClientId))
     var quotaId = userClientQuotaId
     var quotaConfigId = userClientQuotaId
     // 1) /config/users/<user>/clients/<client-id>
@@ -306,7 +279,7 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
         if (quota == null) {
           // 4) /config/users/<default>/clients/<client-id>
           quotaId = userClientQuotaId
-          quotaConfigId = QuotaId(Some(ConfigEntityName.Default), 
Some(clientId))
+          quotaConfigId = QuotaId(Some(ConfigEntityName.Default), 
Some(sanitizerClientId))
           quota = overriddenQuota.get(quotaConfigId)
 
           if (quota == null) {
@@ -324,7 +297,7 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
               if (quota == null) {
                 // 7) /config/clients/<client-id>
                 quotaId = clientQuotaId
-                quotaConfigId = QuotaId(None, Some(clientId))
+                quotaConfigId = QuotaId(None, Some(sanitizerClientId))
                 quota = overriddenQuota.get(quotaConfigId)
 
                 if (quota == null) {
@@ -346,15 +319,17 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
       }
     }
     val quotaUser = if (quotaId == clientQuotaId) "" else sanitizedUser
-    val quotaClientId = if (quotaId == userQuotaId) "" else clientId
+    val quotaClientId = if (quotaId == userQuotaId) "" else sanitizerClientId
     QuotaEntity(quotaId, quotaUser, quotaClientId, quota)
   }
 
   /**
    * Returns the quota for the client with the specified (non-encoded) user 
principal and client-id.
+   * 
+   * Note: this method is expensive, it is meant to be used by tests only
    */
   def quota(user: String, clientId: String) = {
-    quotaEntity(QuotaId.sanitize(user), clientId).quota
+    quotaEntity(Sanitizer.sanitize(user), Sanitizer.sanitize(clientId)).quota
   }
 
   /*
@@ -387,14 +362,15 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
    * First sensor of the tuple is the quota enforcement sensor. Second one is 
the throttle time sensor
    */
   def getOrCreateQuotaSensors(sanitizedUser: String, clientId: String): 
ClientSensors = {
-    val clientQuotaEntity = quotaEntity(sanitizedUser, clientId)
+    val sanitizedClientId = Sanitizer.sanitize(clientId)
+    val clientQuotaEntity = quotaEntity(sanitizedUser, sanitizedClientId)
     // Names of the sensors to access
     ClientSensors(
       clientQuotaEntity,
       sensorAccessor.getOrCreate(
         getQuotaSensorName(clientQuotaEntity.quotaId),
         ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds,
-        clientRateMetricName(clientQuotaEntity.sanitizedUser, 
clientQuotaEntity.clientId),
+        clientRateMetricName(clientQuotaEntity.sanitizedUser, 
clientQuotaEntity.sanitizedClientId),
         Some(getQuotaMetricConfig(clientQuotaEntity.quota)),
         new Rate
       ),
@@ -407,9 +383,9 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
     )
   }
 
-  private def getThrottleTimeSensorName(quotaId: QuotaId): String = quotaType 
+ "ThrottleTime-" + quotaId.sanitizedUser.getOrElse("") + ':' + 
quotaId.clientId.getOrElse("")
+  private def getThrottleTimeSensorName(quotaId: QuotaId): String = quotaType 
+ "ThrottleTime-" + quotaId.sanitizedUser.getOrElse("") + ':' + 
quotaId.sanitizedClientId.getOrElse("")
 
-  private def getQuotaSensorName(quotaId: QuotaId): String = quotaType + "-" + 
quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.clientId.getOrElse("")
+  private def getQuotaSensorName(quotaId: QuotaId): String = quotaType + "-" + 
quotaId.sanitizedUser.getOrElse("") + ':' + 
quotaId.sanitizedClientId.getOrElse("")
 
   protected def getQuotaMetricConfig(quota: Quota): MetricConfig = {
     new MetricConfig()
@@ -432,10 +408,10 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
    * Overrides quotas for <user>, <client-id> or <user, client-id> or the 
dynamic defaults
    * for any of these levels.
    * @param sanitizedUser user to override if quota applies to <user> or 
<user, client-id>
-   * @param clientId client to override if quota applies to <client-id> or 
<user, client-id>
+   * @param sanitizedClientId client to override if quota applies to 
<client-id> or <user, client-id>
    * @param quota custom quota to apply or None if quota override is being 
removed
    */
-  def updateQuota(sanitizedUser: Option[String], clientId: Option[String], 
quota: Option[Quota]) {
+  def updateQuota(sanitizedUser: Option[String], sanitizedClientId: 
Option[String], quota: Option[Quota]) {
     /*
      * Acquire the write lock to apply changes in the quota objects.
      * This method changes the quota in the overriddenQuota map and applies 
the update on the actual KafkaMetric object (if it exists).
@@ -445,13 +421,13 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
      */
     lock.writeLock().lock()
     try {
-      val quotaId = QuotaId(sanitizedUser, clientId)
+      val quotaId = QuotaId(sanitizedUser, sanitizedClientId)
       val userInfo = sanitizedUser match {
         case Some(ConfigEntityName.Default) => "default user "
         case Some(user) => "user " + user + " "
         case None => ""
       }
-      val clientIdInfo = clientId match {
+      val clientIdInfo = sanitizedClientId match {
         case Some(ConfigEntityName.Default) => "default client-id"
         case Some(id) => "client-id " + id
         case None => ""
@@ -460,7 +436,7 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
         case Some(newQuota) =>
           logger.info(s"Changing ${quotaType} quota for 
${userInfo}${clientIdInfo} to $newQuota.bound}")
           overriddenQuota.put(quotaId, newQuota)
-          (sanitizedUser, clientId) match {
+          (sanitizedUser, sanitizedClientId) match {
             case (Some(_), Some(_)) => quotaTypesEnabled |= 
QuotaTypes.UserClientIdQuotaEnabled
             case (Some(_), None) => quotaTypesEnabled |= 
QuotaTypes.UserQuotaEnabled
             case (None, Some(_)) => quotaTypesEnabled |= 
QuotaTypes.ClientIdQuotaEnabled
@@ -471,21 +447,21 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
           overriddenQuota.remove(quotaId)
       }
 
-      val quotaMetricName = clientRateMetricName(sanitizedUser.getOrElse(""), 
clientId.getOrElse(""))
+      val quotaMetricName = clientRateMetricName(sanitizedUser.getOrElse(""), 
sanitizedClientId.getOrElse(""))
       val allMetrics = metrics.metrics()
 
       // If multiple-levels of quotas are defined or if this is a default 
quota update, traverse metrics
       // to find all affected values. Otherwise, update just the single 
matching one.
       val singleUpdate = quotaTypesEnabled match {
         case QuotaTypes.NoQuotas | QuotaTypes.ClientIdQuotaEnabled | 
QuotaTypes.UserQuotaEnabled | QuotaTypes.UserClientIdQuotaEnabled =>
-          !sanitizedUser.filter(_ == ConfigEntityName.Default).isDefined && 
!clientId.filter(_ == ConfigEntityName.Default).isDefined
+          !sanitizedUser.filter(_ == ConfigEntityName.Default).isDefined && 
!sanitizedClientId.filter(_ == ConfigEntityName.Default).isDefined
         case _ => false
       }
       if (singleUpdate) {
           // Change the underlying metric config if the sensor has been created
           val metric = allMetrics.get(quotaMetricName)
           if (metric != null) {
-            val metricConfigEntity = quotaEntity(sanitizedUser.getOrElse(""), 
clientId.getOrElse(""))
+            val metricConfigEntity = quotaEntity(sanitizedUser.getOrElse(""), 
sanitizedClientId.getOrElse(""))
             val newQuota = metricConfigEntity.quota
             logger.info(s"Sensor for ${userInfo}${clientIdInfo} already 
exists. Changing quota to ${newQuota.bound()} in MetricConfig")
             metric.config(getQuotaMetricConfig(newQuota))
@@ -509,11 +485,11 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
     }
   }
 
-  protected def clientRateMetricName(sanitizedUser: String, clientId: String): 
MetricName = {
+  protected def clientRateMetricName(sanitizedUser: String, sanitizedClientId: 
String): MetricName = {
     metrics.metricName("byte-rate", quotaType.toString,
                    "Tracking byte-rate per user/client-id",
                    "user", sanitizedUser,
-                   "client-id", clientId)
+                   "client-id", sanitizedClientId)
   }
 
   private def throttleMetricName(quotaEntity: QuotaEntity): MetricName = {
@@ -521,7 +497,7 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
                        quotaType.toString,
                        "Tracking average throttle-time per user/client-id",
                        "user", quotaEntity.sanitizedUser,
-                       "client-id", quotaEntity.clientId)
+                       "client-id", quotaEntity.sanitizedClientId)
   }
 
   def shutdown() = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ab63/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala 
b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
index f454483..d2114dc 100644
--- a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
@@ -64,11 +64,11 @@ class ClientRequestQuotaManager(private val config: 
ClientQuotaManagerConfig,
     math.min(super.throttleTime(clientMetric, config), maxThrottleTimeMs)
   }
 
-  override protected def clientRateMetricName(sanitizedUser: String, clientId: 
String): MetricName = {
+  override protected def clientRateMetricName(sanitizedUser: String, 
sanitizedClientId: String): MetricName = {
     metrics.metricName("request-time", QuotaType.Request.toString,
                    "Tracking request-time per user/client-id",
                    "user", sanitizedUser,
-                   "client-id", clientId)
+                   "client-id", sanitizedClientId)
   }
 
   private def exemptMetricName: MetricName = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ab63/core/src/main/scala/kafka/server/ConfigHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala 
b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 79ffde8..6f85801 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -29,7 +29,7 @@ import kafka.utils.Implicits._
 import kafka.utils.Logging
 import org.apache.kafka.common.config.ConfigDef.Validator
 import org.apache.kafka.common.config.ConfigException
-import org.apache.kafka.common.metrics.Quota
+import org.apache.kafka.common.metrics.{Quota, Sanitizer}
 import org.apache.kafka.common.metrics.Quota._
 
 import scala.collection.JavaConverters._
@@ -117,25 +117,25 @@ class TopicConfigHandler(private val logManager: 
LogManager, kafkaConfig: KafkaC
  */
 class QuotaConfigHandler(private val quotaManagers: QuotaManagers) {
 
-  def updateQuotaConfig(sanitizedUser: Option[String], clientId: 
Option[String], config: Properties) {
+  def updateQuotaConfig(sanitizedUser: Option[String], sanitizedClientId: 
Option[String], config: Properties) {
     val producerQuota =
       if 
(config.containsKey(DynamicConfig.Client.ProducerByteRateOverrideProp))
         Some(new 
Quota(config.getProperty(DynamicConfig.Client.ProducerByteRateOverrideProp).toLong,
 true))
       else
         None
-    quotaManagers.produce.updateQuota(sanitizedUser, clientId, producerQuota)
+    quotaManagers.produce.updateQuota(sanitizedUser, sanitizedClientId, 
producerQuota)
     val consumerQuota =
       if 
(config.containsKey(DynamicConfig.Client.ConsumerByteRateOverrideProp))
         Some(new 
Quota(config.getProperty(DynamicConfig.Client.ConsumerByteRateOverrideProp).toLong,
 true))
       else
         None
-    quotaManagers.fetch.updateQuota(sanitizedUser, clientId, consumerQuota)
+    quotaManagers.fetch.updateQuota(sanitizedUser, sanitizedClientId, 
consumerQuota)
     val requestQuota =
       if 
(config.containsKey(DynamicConfig.Client.RequestPercentageOverrideProp))
         Some(new 
Quota(config.getProperty(DynamicConfig.Client.RequestPercentageOverrideProp).toDouble,
 true))
       else
         None
-    quotaManagers.request.updateQuota(sanitizedUser, clientId, requestQuota)
+    quotaManagers.request.updateQuota(sanitizedUser, sanitizedClientId, 
requestQuota)
   }
 }
 
@@ -145,14 +145,14 @@ class QuotaConfigHandler(private val quotaManagers: 
QuotaManagers) {
  */
 class ClientIdConfigHandler(private val quotaManagers: QuotaManagers) extends 
QuotaConfigHandler(quotaManagers) with ConfigHandler {
 
-  def processConfigChanges(clientId: String, clientConfig: Properties) {
-    updateQuotaConfig(None, Some(clientId), clientConfig)
+  def processConfigChanges(sanitizedClientId: String, clientConfig: 
Properties) {
+    updateQuotaConfig(None, Some(sanitizedClientId), clientConfig)
   }
 }
 
 /**
  * The UserConfigHandler will process <user> and <user, client-id> quota 
changes in ZK.
- * The callback provides the node name containing sanitized user principal, 
client-id if this is
+ * The callback provides the node name containing sanitized user principal, 
sanitized client-id if this is
  * a <user, client-id> update and the full properties set read from ZK.
  */
 class UserConfigHandler(private val quotaManagers: QuotaManagers, val 
credentialProvider: CredentialProvider) extends 
QuotaConfigHandler(quotaManagers) with ConfigHandler {
@@ -163,10 +163,10 @@ class UserConfigHandler(private val quotaManagers: 
QuotaManagers, val credential
     if (entities.length != 1 && entities.length != 3)
       throw new IllegalArgumentException("Invalid quota entity path: " + 
quotaEntityPath)
     val sanitizedUser = entities(0)
-    val clientId = if (entities.length == 3) Some(entities(2)) else None
-    updateQuotaConfig(Some(sanitizedUser), clientId, config)
-    if (!clientId.isDefined && sanitizedUser != ConfigEntityName.Default)
-      credentialProvider.updateCredentials(QuotaId.desanitize(sanitizedUser), 
config)
+    val sanitizedClientId = if (entities.length == 3) Some(entities(2)) else 
None
+    updateQuotaConfig(Some(sanitizedUser), sanitizedClientId, config)
+    if (!sanitizedClientId.isDefined && sanitizedUser != 
ConfigEntityName.Default)
+      
credentialProvider.updateCredentials(Sanitizer.desanitize(sanitizedUser), 
config)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ab63/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala 
b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index c69c9a4..e8967d1 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -22,7 +22,7 @@ import org.apache.kafka.clients.consumer.{ConsumerConfig, 
KafkaConsumer}
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.apache.kafka.common.{MetricName, TopicPartition}
-import org.apache.kafka.common.metrics.{KafkaMetric, Quota}
+import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sanitizer}
 import org.junit.Assert._
 import org.junit.{Before, Test}
 
@@ -39,8 +39,8 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
   val consumerCount = 1
 
   private val producerBufferSize = 300000
-  protected val producerClientId = "QuotasTestProducer-1"
-  protected val consumerClientId = "QuotasTestConsumer-1"
+  protected def producerClientId = "QuotasTestProducer-1"
+  protected def consumerClientId = "QuotasTestConsumer-1"
 
   this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, 
"false")
   this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, 
"2")
@@ -210,7 +210,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness 
{
 
   private def verifyProducerThrottleTimeMetric(producer: KafkaProducer[_, _]) {
     val tags = new HashMap[String, String]
-    tags.put("client-id", producerClientId)
+    tags.put("client-id", Sanitizer.sanitize(producerClientId))
     val avgMetric = producer.metrics.get(new 
MetricName("produce-throttle-time-avg", "producer-metrics", "", tags))
     val maxMetric = producer.metrics.get(new 
MetricName("produce-throttle-time-max", "producer-metrics", "", tags))
 
@@ -220,7 +220,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness 
{
 
   private def verifyConsumerThrottleTimeMetric(consumer: KafkaConsumer[_, _], 
maxThrottleTime: Option[Double] = None) {
     val tags = new HashMap[String, String]
-    tags.put("client-id", consumerClientId)
+    tags.put("client-id", Sanitizer.sanitize(consumerClientId))
     val avgMetric = consumer.metrics.get(new 
MetricName("fetch-throttle-time-avg", "consumer-fetch-manager-metrics", "", 
tags))
     val maxMetric = consumer.metrics.get(new 
MetricName("fetch-throttle-time-max", "consumer-fetch-manager-metrics", "", 
tags))
 
@@ -234,7 +234,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness 
{
                                   quotaType.toString,
                                   "Tracking throttle-time per user/client-id",
                                   "user", quotaId.sanitizedUser.getOrElse(""),
-                                  "client-id", quotaId.clientId.getOrElse(""))
+                                  "client-id", 
quotaId.sanitizedClientId.getOrElse(""))
   }
 
   def throttleMetric(quotaType: QuotaType, quotaId: QuotaId): KafkaMetric = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ab63/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala 
b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
index f8594e1..f5a2cf5 100644
--- a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
@@ -18,14 +18,17 @@ import java.util.Properties
 
 import kafka.admin.AdminUtils
 import kafka.server.{DynamicConfig, KafkaConfig, QuotaId}
+import org.apache.kafka.common.metrics.Sanitizer
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.junit.Before
 
 class ClientIdQuotaTest extends BaseQuotaTest {
 
   override val userPrincipal = KafkaPrincipal.ANONYMOUS.getName
-  override val producerQuotaId = QuotaId(None, Some(producerClientId))
-  override val consumerQuotaId = QuotaId(None, Some(consumerClientId))
+  override def producerClientId = "QuotasTestProducer-!@#$%^&*()"
+  override def consumerClientId = "QuotasTestConsumer-!@#$%^&*()"
+  override val producerQuotaId = QuotaId(None, 
Some(Sanitizer.sanitize(producerClientId)))
+  override val consumerQuotaId = QuotaId(None, 
Some(Sanitizer.sanitize(consumerClientId)))
 
   @Before
   override def setUp() {
@@ -51,6 +54,6 @@ class ClientIdQuotaTest extends BaseQuotaTest {
   }
 
   private def updateQuotaOverride(clientId: String, properties: Properties) {
-    AdminUtils.changeClientIdConfig(zkUtils, clientId, properties)
+    AdminUtils.changeClientIdConfig(zkUtils, Sanitizer.sanitize(clientId), 
properties)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ab63/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala 
b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
index 333c851..cd9437c 100644
--- a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
@@ -23,6 +23,7 @@ import kafka.server._
 
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.junit.Before
+import org.apache.kafka.common.metrics.Sanitizer
 
 class UserClientIdQuotaTest extends BaseQuotaTest {
 
@@ -30,8 +31,10 @@ class UserClientIdQuotaTest extends BaseQuotaTest {
   override protected lazy val trustStoreFile = 
Some(File.createTempFile("truststore", ".jks"))
 
   override val userPrincipal = "O=A client,CN=localhost"
-  override def producerQuotaId = 
QuotaId(Some(QuotaId.sanitize(userPrincipal)), Some(producerClientId))
-  override def consumerQuotaId = 
QuotaId(Some(QuotaId.sanitize(userPrincipal)), Some(consumerClientId))
+  override def producerClientId = "QuotasTestProducer-!@#$%^&*()"
+  override def consumerClientId = "QuotasTestConsumer-!@#$%^&*()"
+  override def producerQuotaId = 
QuotaId(Some(Sanitizer.sanitize(userPrincipal)), 
Some(Sanitizer.sanitize(producerClientId)))
+  override def consumerQuotaId = 
QuotaId(Some(Sanitizer.sanitize(userPrincipal)), 
Some(Sanitizer.sanitize(consumerClientId)))
 
   @Before
   override def setUp() {
@@ -58,11 +61,11 @@ class UserClientIdQuotaTest extends BaseQuotaTest {
 
   override def removeQuotaOverrides() {
     val emptyProps = new Properties
-    AdminUtils.changeUserOrUserClientIdConfig(zkUtils, 
QuotaId.sanitize(userPrincipal) + "/clients/" + producerClientId, emptyProps)
-    AdminUtils.changeUserOrUserClientIdConfig(zkUtils, 
QuotaId.sanitize(userPrincipal) + "/clients/" + consumerClientId, emptyProps)
+    AdminUtils.changeUserOrUserClientIdConfig(zkUtils, 
Sanitizer.sanitize(userPrincipal) + "/clients/" + 
Sanitizer.sanitize(producerClientId), emptyProps)
+    AdminUtils.changeUserOrUserClientIdConfig(zkUtils, 
Sanitizer.sanitize(userPrincipal) + "/clients/" + 
Sanitizer.sanitize(consumerClientId), emptyProps)
   }
 
   private def updateQuotaOverride(userPrincipal: String, clientId: String, 
properties: Properties) {
-    AdminUtils.changeUserOrUserClientIdConfig(zkUtils, 
QuotaId.sanitize(userPrincipal) + "/clients/" + clientId, properties)
+    AdminUtils.changeUserOrUserClientIdConfig(zkUtils, 
Sanitizer.sanitize(userPrincipal) + "/clients/" + Sanitizer.sanitize(clientId), 
properties)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ab63/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala 
b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
index 4ad6265..330c1e0 100644
--- a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
@@ -22,6 +22,7 @@ import kafka.server.{ConfigEntityName, KafkaConfig, QuotaId}
 import kafka.utils.JaasTestUtils
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.junit.{After, Before}
+import org.apache.kafka.common.metrics.Sanitizer
 
 class UserQuotaTest extends BaseQuotaTest with SaslSetup {
 
@@ -66,6 +67,6 @@ class UserQuotaTest extends BaseQuotaTest with SaslSetup {
   }
 
   private def updateQuotaOverride(properties: Properties) {
-    AdminUtils.changeUserOrUserClientIdConfig(zkUtils, 
QuotaId.sanitize(userPrincipal), properties)
+    AdminUtils.changeUserOrUserClientIdConfig(zkUtils, 
Sanitizer.sanitize(userPrincipal), properties)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ab63/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala 
b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index af77c67..bb17d74 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -30,6 +30,7 @@ import org.junit.Assert._
 import org.junit.Test
 import scala.collection.mutable
 import scala.collection.JavaConverters._
+import org.apache.kafka.common.metrics.Sanitizer
 
 class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
   @Test
@@ -326,9 +327,9 @@ class ConfigCommandTest extends ZooKeeperTestHarness with 
Logging {
 
     // <user> quota
     val principal = "CN=ConfigCommandTest,O=Apache,L=<default>"
-    val sanitizedPrincipal = QuotaId.sanitize(principal)
+    val sanitizedPrincipal = Sanitizer.sanitize(principal)
     assertEquals(-1, sanitizedPrincipal.indexOf('='))
-    assertEquals(principal, QuotaId.desanitize(sanitizedPrincipal))
+    assertEquals(principal, Sanitizer.desanitize(sanitizedPrincipal))
     for (opts <- Seq(describeOpts, alterOpts)) {
       checkEntity("users", Some(principal), sanitizedPrincipal, opts)
       checkEntity("users", Some(""), ConfigEntityName.Default, opts)
@@ -362,38 +363,36 @@ class ConfigCommandTest extends ZooKeeperTestHarness with 
Logging {
       assertEquals(expectedEntityName, entity.fullSanitizedName)
     }
 
-    // <default> is a valid user principal (can be handled with URL-encoding),
-    // but an invalid client-id (cannot be handled since client-ids are not 
encoded)
-    checkEntity("users", QuotaId.sanitize("<default>"),
+    // <default> is a valid user principal and client-id (can be handled with 
URL-encoding),
+    checkEntity("users", Sanitizer.sanitize("<default>"),
         "--entity-type", "users", "--entity-name", "<default>",
         "--alter", "--add-config", "a=b,c=d")
-    try {
-      checkEntity("clients", QuotaId.sanitize("<default>"),
-          "--entity-type", "clients", "--entity-name", "<default>",
-          "--alter", "--add-config", "a=b,c=d")
-      fail("Did not fail with invalid client-id")
-    } catch {
-      case _: InvalidConfigException => // expected
-    }
+    checkEntity("clients", Sanitizer.sanitize("<default>"),
+        "--entity-type", "clients", "--entity-name", "<default>",
+        "--alter", "--add-config", "a=b,c=d")
 
-    checkEntity("users", QuotaId.sanitize("CN=user1") + "/clients/client1",
+
+    checkEntity("users", Sanitizer.sanitize("CN=user1") + "/clients/client1",
         "--entity-type", "users", "--entity-name", "CN=user1", 
"--entity-type", "clients", "--entity-name", "client1",
         "--alter", "--add-config", "a=b,c=d")
-    checkEntity("users", QuotaId.sanitize("CN=user1") + "/clients/client1",
+    checkEntity("users", Sanitizer.sanitize("CN=user1") + "/clients/client1",
         "--entity-name", "CN=user1", "--entity-type", "users", 
"--entity-name", "client1", "--entity-type", "clients",
         "--alter", "--add-config", "a=b,c=d")
-    checkEntity("users", QuotaId.sanitize("CN=user1") + "/clients/client1",
+    checkEntity("users", Sanitizer.sanitize("CN=user1") + "/clients/client1",
         "--entity-type", "clients", "--entity-name", "client1", 
"--entity-type", "users", "--entity-name", "CN=user1",
         "--alter", "--add-config", "a=b,c=d")
-    checkEntity("users", QuotaId.sanitize("CN=user1") + "/clients/client1",
+    checkEntity("users", Sanitizer.sanitize("CN=user1") + "/clients/client1",
         "--entity-name", "client1", "--entity-type", "clients", 
"--entity-name", "CN=user1", "--entity-type", "users",
         "--alter", "--add-config", "a=b,c=d")
-    checkEntity("users", QuotaId.sanitize("CN=user1") + "/clients",
+    checkEntity("users", Sanitizer.sanitize("CN=user1") + "/clients",
         "--entity-type", "clients", "--entity-name", "CN=user1", 
"--entity-type", "users",
         "--describe")
     checkEntity("users", "/clients",
         "--entity-type", "clients", "--entity-type", "users",
         "--describe")
+    checkEntity("users", Sanitizer.sanitize("CN=user1") + "/clients/" + 
Sanitizer.sanitize("client1?@%"),
+        "--entity-name", "client1?@%", "--entity-type", "clients", 
"--entity-name", "CN=user1", "--entity-type", "users",
+        "--alter", "--add-config", "a=b,c=d")
   }
 
   @Test
@@ -413,7 +412,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with 
Logging {
 
     val clientId = "a-client"
     val principal = "CN=ConfigCommandTest.testQuotaDescribeEntities , 
O=Apache, L=<default>"
-    val sanitizedPrincipal = QuotaId.sanitize(principal)
+    val sanitizedPrincipal = Sanitizer.sanitize(principal)
     val userClient = sanitizedPrincipal + "/clients/" + clientId
 
     var opts = Array("--entity-type", "clients", "--entity-name", clientId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ab63/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index f4a55ab..7c8e5bd 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -18,7 +18,7 @@ package kafka.server
 
 import java.util.Collections
 
-import org.apache.kafka.common.metrics.{MetricConfig, Metrics, Quota}
+import org.apache.kafka.common.metrics.{MetricConfig, Metrics, Quota, 
Sanitizer}
 import org.apache.kafka.common.utils.MockTime
 import org.junit.Assert.{assertEquals, assertTrue}
 import org.junit.{Before, Test}
@@ -374,12 +374,21 @@ class ClientQuotaManagerTest {
   }
 
   @Test
-  def testQuotaUserSanitize() {
-    val principal = "CN=Some characters !@#$%&*()_-+=';:,/~"
-    val sanitizedPrincipal = QuotaId.sanitize(principal)
-    // Apart from % used in percent-encoding all characters of sanitized 
principal must be characters allowed in client-id
-    ConfigCommand.validateChars("sanitized-principal", 
sanitizedPrincipal.replace('%', '_'))
-    assertEquals(principal, QuotaId.desanitize(sanitizedPrincipal))
+  def testSanitizeClientId() {
+    val metrics = newMetrics
+    val clientMetrics = new ClientQuotaManager(config, metrics, 
QuotaType.Produce, time)
+    val clientId = "client@#$%"
+    try {
+      clientMetrics.maybeRecordAndThrottle("ANONYMOUS", clientId, 100, 
callback)
+      
+      val throttleTimeSensor = metrics.getSensor("ProduceThrottleTime-:" + 
Sanitizer.sanitize(clientId))
+      assertTrue("Throttle time sensor should exist", throttleTimeSensor != 
null)
+
+      val byteRateSensor = metrics.getSensor("Produce-:"  + 
Sanitizer.sanitize(clientId))
+      assertTrue("Byte rate sensor should exist", byteRateSensor != null)
+    } finally {
+      clientMetrics.shutdown()
+    }
   }
 
   def newMetrics: Metrics = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c8c6ab63/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index ed0f9cf..a98bdea 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -65,6 +65,7 @@
         <code>OffsetFetchRequest</code>, <code>OffsetRequest</code>, 
<code>ProducerRequest</code>, and <code>TopicMetadataRequest</code>.
         This was only intended for use on the broker, but it is no longer in 
use and the implementations have not been maintained.
         A stub implementation has been retained for binary compatibility.</li>
+    <li>The Java clients and tools now accept any string as a client-id.</li>
 </ul>
 
 <h5><a id="upgrade_100_new_protocols" href="#upgrade_100_new_protocols">New 
Protocol Versions</a></h5>

Reply via email to