KAFKA-3492; Secure quotas for authenticated users Implementation and tests for secure quotas at <user> and <user, client-id> levels as described in KIP-55. Also adds dynamic default quotas for <client-id>, <user> and <user-client-id>. For each client connection, the most specific quota matching the connection is used, with user quota taking precedence over client-id quota.
Author: Rajini Sivaram <rajinisiva...@googlemail.com> Reviewers: Jun Rao <jun...@gmail.com> Closes #1753 from rajinisivaram/KAFKA-3492 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/69356fbc Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/69356fbc Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/69356fbc Branch: refs/heads/trunk Commit: 69356fbc6e76ab4291ff4957f0d6ea04e7245909 Parents: ecc1fb1 Author: Rajini Sivaram <rajinisiva...@googlemail.com> Authored: Sat Sep 17 10:06:05 2016 -0700 Committer: Jun Rao <jun...@gmail.com> Committed: Sat Sep 17 10:06:05 2016 -0700 ---------------------------------------------------------------------- .../kafka/common/metrics/KafkaMetric.java | 2 +- .../org/apache/kafka/common/metrics/Quota.java | 5 + .../src/main/scala/kafka/admin/AdminUtils.scala | 72 +++- .../main/scala/kafka/admin/ConfigCommand.scala | 192 +++++++++-- .../scala/kafka/network/RequestChannel.scala | 5 +- .../scala/kafka/server/ClientQuotaManager.scala | 332 ++++++++++++++++--- .../main/scala/kafka/server/ConfigHandler.scala | 66 ++-- .../kafka/server/DynamicConfigManager.scala | 94 ++++-- .../src/main/scala/kafka/server/KafkaApis.scala | 3 +- .../main/scala/kafka/server/KafkaConfig.scala | 6 +- .../main/scala/kafka/server/KafkaServer.scala | 7 +- core/src/main/scala/kafka/utils/ZkUtils.scala | 3 + .../integration/kafka/api/BaseQuotaTest.scala | 195 +++++++++++ .../kafka/api/ClientIdQuotaTest.scala | 55 +++ .../kafka/api/ClientQuotasTest.scala | 206 ------------ .../kafka/api/UserClientIdQuotaTest.scala | 66 ++++ .../integration/kafka/api/UserQuotaTest.scala | 61 ++++ .../test/scala/unit/kafka/admin/AdminTest.scala | 4 +- .../unit/kafka/admin/ConfigCommandTest.scala | 186 ++++++++++- .../scala/unit/kafka/admin/TestAdminUtils.scala | 1 + .../kafka/server/ClientQuotaManagerTest.scala | 217 ++++++++++-- .../kafka/server/DynamicConfigChangeTest.scala | 77 +++-- 22 files changed, 1449 insertions(+), 406 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java index e4d3ae8..86014e5 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java @@ -37,7 +37,7 @@ public final class KafkaMetric implements Metric { this.time = time; } - MetricConfig config() { + public MetricConfig config() { return this.config; } http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java b/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java index 8431e50..663b963 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java @@ -67,4 +67,9 @@ public final class Quota { Quota that = (Quota) obj; return (that.bound == this.bound) && (that.upper == this.upper); } + + @Override + public String toString() { + return (upper ? "upper=" : "lower=") + bound; + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/main/scala/kafka/admin/AdminUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 400cc47..b3f8e5c 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -41,6 +41,7 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException trait AdminUtilities { def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties) def changeClientIdConfig(zkUtils: ZkUtils, clientId: String, configs: Properties) + def changeUserOrUserClientIdConfig(zkUtils: ZkUtils, sanitizedEntityName: String, configs: Properties) def changeBrokerConfig(zkUtils: ZkUtils, brokerIds: Seq[Int], configs: Properties) def fetchEntityConfig(zkUtils: ZkUtils,entityType: String, entityName: String): Properties } @@ -449,7 +450,7 @@ object AdminUtils extends Logging with AdminUtilities { if (!update) { // write out the config if there is any, this isn't transactional with the partition assignments LogConfig.validate(config) - writeEntityConfig(zkUtils, ConfigType.Topic, topic, config) + writeEntityConfig(zkUtils, getEntityConfigPath(ConfigType.Topic, topic), config) } // create the partition assignment @@ -476,7 +477,9 @@ object AdminUtils extends Logging with AdminUtilities { } /** - * Update the config for a client and create a change notification so the change will propagate to other brokers + * Update the config for a client and create a change notification so the change will propagate to other brokers. + * If clientId is <default>, default clientId config is updated. ClientId configs are used only if <user, clientId> + * and <user> configs are not specified. * * @param zkUtils Zookeeper utilities used to write the config to ZK * @param clientId: The clientId for which configs are being changed @@ -489,6 +492,21 @@ object AdminUtils extends Logging with AdminUtilities { } /** + * Update the config for a <user> or <user, clientId> and create a change notification so the change will propagate to other brokers. + * User and/or clientId components of the path may be <default>, indicating that the configuration is the default + * value to be applied if a more specific override is not configured. + * + * @param zkUtils Zookeeper utilities used to write the config to ZK + * @param sanitizedEntityName: <sanitizedUserPrincipal> or <sanitizedUserPrincipal>/clients/<clientId> + * @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or + * existing configs need to be deleted, it should be done prior to invoking this API + * + */ + def changeUserOrUserClientIdConfig(zkUtils: ZkUtils, sanitizedEntityName: String, configs: Properties) { + changeEntityConfig(zkUtils, ConfigType.User, sanitizedEntityName, configs) + } + + /** * Update the config for an existing topic and create a change notification so the change will propagate to other brokers * * @param zkUtils Zookeeper utilities used to write the config to ZK @@ -520,37 +538,41 @@ object AdminUtils extends Logging with AdminUtilities { } } - private def changeEntityConfig(zkUtils: ZkUtils, entityType: String, entityName: String, configs: Properties) { + private def changeEntityConfig(zkUtils: ZkUtils, rootEntityType: String, fullSanitizedEntityName: String, configs: Properties) { + val sanitizedEntityPath = rootEntityType + '/' + fullSanitizedEntityName + val entityConfigPath = getEntityConfigPath(rootEntityType, fullSanitizedEntityName) // write the new config--may not exist if there were previously no overrides - writeEntityConfig(zkUtils, entityType, entityName, configs) + writeEntityConfig(zkUtils, entityConfigPath, configs) // create the change notification val seqNode = ZkUtils.EntityConfigChangesPath + "/" + EntityConfigChangeZnodePrefix - val content = Json.encode(getConfigChangeZnodeData(entityType, entityName)) + val content = Json.encode(getConfigChangeZnodeData(sanitizedEntityPath)) zkUtils.zkClient.createPersistentSequential(seqNode, content) } - def getConfigChangeZnodeData(entityType: String, entityName: String) : Map[String, Any] = { - Map("version" -> 1, "entity_type" -> entityType, "entity_name" -> entityName) + def getConfigChangeZnodeData(sanitizedEntityPath: String) : Map[String, Any] = { + Map("version" -> 2, "entity_path" -> sanitizedEntityPath) } /** - * Write out the topic config to zk, if there is any + * Write out the entity config to zk, if there is any */ - private def writeEntityConfig(zkUtils: ZkUtils, entityType: String, entityName: String, config: Properties) { + private def writeEntityConfig(zkUtils: ZkUtils, entityPath: String, config: Properties) { val configMap: mutable.Map[String, String] = { import JavaConversions._ config } val map = Map("version" -> 1, "config" -> configMap) - zkUtils.updatePersistentPath(getEntityConfigPath(entityType, entityName), Json.encode(map)) + zkUtils.updatePersistentPath(entityPath, Json.encode(map)) } /** - * Read the entity (topic or client) config (if any) from zk + * Read the entity (topic, broker, client, user or <user, client>) config (if any) from zk + * sanitizedEntityName is <topic>, <broker>, <client-id>, <user> or <user>/clients/<client-id>. */ - def fetchEntityConfig(zkUtils: ZkUtils, entityType: String, entity: String): Properties = { - val str: String = zkUtils.zkClient.readData(getEntityConfigPath(entityType, entity), true) + def fetchEntityConfig(zkUtils: ZkUtils, rootEntityType: String, sanitizedEntityName: String): Properties = { + val entityConfigPath = getEntityConfigPath(rootEntityType, sanitizedEntityName) + val str: String = zkUtils.zkClient.readData(entityConfigPath, true) val props = new Properties() if (str != null) { Json.parseFull(str) match { @@ -564,13 +586,12 @@ object AdminUtils extends Logging with AdminUtilities { configTup match { case (k: String, v: String) => props.setProperty(k, v) - case _ => throw new IllegalArgumentException("Invalid " + entityType + " config: " + str) + case _ => throw new IllegalArgumentException(s"Invalid ${entityConfigPath} config: ${str}") } - case _ => throw new IllegalArgumentException("Invalid " + entityType + " config: " + str) + case _ => throw new IllegalArgumentException(s"Invalid ${entityConfigPath} config: ${str}") } - case o => throw new IllegalArgumentException("Unexpected value in config:(%s), entity_type: (%s), entity: (%s)" - .format(str, entityType, entity)) + case o => throw new IllegalArgumentException(s"Unexpected value in config:(${str}), entity_config_path: ${entityConfigPath}") } } props @@ -582,6 +603,23 @@ object AdminUtils extends Logging with AdminUtilities { def fetchAllEntityConfigs(zkUtils: ZkUtils, entityType: String): Map[String, Properties] = zkUtils.getAllEntitiesWithConfig(entityType).map(entity => (entity, fetchEntityConfig(zkUtils, entityType, entity))).toMap + def fetchAllChildEntityConfigs(zkUtils: ZkUtils, rootEntityType: String, childEntityType: String): Map[String, Properties] = { + def entityPaths(zkUtils: ZkUtils, rootPath: Option[String]): Seq[String] = { + val root = rootPath match { + case Some(path) => rootEntityType + '/' + rootPath + case None => rootEntityType + } + val entityNames = zkUtils.getAllEntitiesWithConfig(root) + rootPath match { + case Some(path) => entityNames.map(entityName => path + '/' + entityName) + case None => entityNames + } + } + entityPaths(zkUtils, None) + .flatMap(entity => entityPaths(zkUtils, Some(entity + '/' + childEntityType))) + .map(entityPath => (entityPath, fetchEntityConfig(zkUtils, rootEntityType, entityPath))).toMap + } + def fetchTopicMetadataFromZk(topic: String, zkUtils: ZkUtils): MetadataResponse.TopicMetadata = fetchTopicMetadataFromZk(topic, zkUtils, new mutable.HashMap[Int, Broker]) http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/main/scala/kafka/admin/ConfigCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index ebf9e61..58bdb7a 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -21,8 +21,9 @@ import java.util.Properties import joptsimple._ import kafka.admin.TopicCommand._ +import kafka.common.Config import kafka.log.{Defaults, LogConfig} -import kafka.server.{KafkaConfig, ClientConfigOverride, ConfigType} +import kafka.server.{KafkaConfig, QuotaConfigOverride, ConfigType, ConfigEntityName, QuotaId} import kafka.utils.{CommandLineUtils, ZkUtils} import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.utils.Utils @@ -33,15 +34,26 @@ import scala.collection._ /** * This script can be used to change configs for topics/clients/brokers dynamically + * This script can be used to change configs for topics/clients/users/brokers dynamically + * An entity described or altered by the command may be one of: + * <ul> + * <li> topic: --entity-type topics --entity-name <topic> + * <li> client: --entity-type clients --entity-name <client-id> + * <li> user: --entity-type users --entity-name <user-principal> + * <li> <user, client>: --entity-type users --entity-name <user-principal> --entity-type clients --entity-name <client-id> + * <li> broker: --entity-type brokers --entity-name <broker> + * </ul> + * --entity-default may be used instead of --entity-name when describing or altering default configuration for users and clients. + * */ -object ConfigCommand { +object ConfigCommand extends Config { def main(args: Array[String]): Unit = { val opts = new ConfigCommandOptions(args) if(args.length == 0) - CommandLineUtils.printUsageAndDie(opts.parser, "Add/Remove entity config for a topic, client or broker") + CommandLineUtils.printUsageAndDie(opts.parser, "Add/Remove entity config for a topic, client, user or broker") opts.checkArgs() @@ -57,18 +69,19 @@ object ConfigCommand { describeConfig(zkUtils, opts) } catch { case e: Throwable => - println("Error while executing topic command " + e.getMessage) + println("Error while executing config command " + e.getMessage) println(Utils.stackTrace(e)) } finally { zkUtils.close() } } - def alterConfig(zkUtils: ZkUtils, opts: ConfigCommandOptions, utils: AdminUtilities = AdminUtils) { + private[admin] def alterConfig(zkUtils: ZkUtils, opts: ConfigCommandOptions, utils: AdminUtilities = AdminUtils) { val configsToBeAdded = parseConfigsToBeAdded(opts) val configsToBeDeleted = parseConfigsToBeDeleted(opts) - val entityType = opts.options.valueOf(opts.entityType) - val entityName = opts.options.valueOf(opts.entityName) + val entity = parseEntity(opts) + val entityType = entity.root.entityType + val entityName = entity.fullSanitizedName warnOnMaxMessagesChange(configsToBeAdded, opts.options.has(opts.forceOpt)) // compile the final set of configs @@ -77,12 +90,13 @@ object ConfigCommand { configsToBeDeleted.foreach(config => configs.remove(config)) entityType match { - case ConfigType.Topic => utils.changeTopicConfig(zkUtils, entityName, configs) - case ConfigType.Client => utils.changeClientIdConfig(zkUtils, entityName, configs) + case ConfigType.Topic => utils.changeTopicConfig(zkUtils, entityName, configs) + case ConfigType.Client => utils.changeClientIdConfig(zkUtils, entityName, configs) + case ConfigType.User => utils.changeUserOrUserClientIdConfig(zkUtils, entityName, configs) case ConfigType.Broker => utils.changeBrokerConfig(zkUtils, Seq(parseBroker(entityName)), configs) case _ => throw new IllegalArgumentException(s"$entityType is not a known entityType. Should be one of ${ConfigType.Topic}, ${ConfigType.Client}, ${ConfigType.Broker}") } - println(s"Updated config for EntityType:$entityType => EntityName:'$entityName'.") + println(s"Updated config for entity: $entity.") } def warnOnMaxMessagesChange(configs: Properties, force: Boolean): Unit = { @@ -107,17 +121,16 @@ object ConfigCommand { } private def describeConfig(zkUtils: ZkUtils, opts: ConfigCommandOptions) { - val entityType = opts.options.valueOf(opts.entityType) - val entityNames: Seq[String] = - if (opts.options.has(opts.entityName)) - Seq(opts.options.valueOf(opts.entityName)) - else - zkUtils.getAllEntitiesWithConfig(entityType) - - for (entityName <- entityNames) { - val configs = AdminUtils.fetchEntityConfig(zkUtils, entityType, entityName) - println("Configs for %s:%s are %s" - .format(entityType, entityName, configs.map(kv => kv._1 + "=" + kv._2).mkString(","))) + val configEntity = parseEntity(opts) + val describeAllUsers = configEntity.root.entityType == ConfigType.User && !configEntity.root.sanitizedName.isDefined && !configEntity.child.isDefined + val entities = configEntity.getAllEntities(zkUtils) + for (entity <- entities) { + val configs = AdminUtils.fetchEntityConfig(zkUtils, entity.root.entityType, entity.fullSanitizedName) + // When describing all users, don't include empty user nodes with only <user, client> quota overrides. + if (!configs.isEmpty || !describeAllUsers) { + println("Configs for %s are %s" + .format(entity, configs.map(kv => kv._1 + "=" + kv._2).mkString(","))) + } } } @@ -150,6 +163,115 @@ object ConfigCommand { Seq.empty } + case class Entity(entityType: String, sanitizedName: Option[String]) { + val entityPath = sanitizedName match { + case Some(n) => entityType + "/" + n + case None => entityType + } + override def toString: String = { + val typeName = entityType match { + case ConfigType.User => "user-principal" + case ConfigType.Client => "client-id" + case ConfigType.Topic => "topic" + case t => t + } + sanitizedName match { + case Some(ConfigEntityName.Default) => "default " + typeName + case Some(n) => + val desanitized = if (entityType == ConfigType.User) QuotaId.desanitize(n) else n + s"$typeName '$desanitized'" + case None => entityType + } + } + } + + case class ConfigEntity(root: Entity, child: Option[Entity]) { + val fullSanitizedName = root.sanitizedName.getOrElse("") + child.map(s => "/" + s.entityPath).getOrElse("") + + def getAllEntities(zkUtils: ZkUtils) : Seq[ConfigEntity] = { + // Describe option examples: + // Describe entity with specified name: + // --entity-type topics --entity-name topic1 (topic1) + // Describe all entities of a type (topics/brokers/users/clients): + // --entity-type topics (all topics) + // Describe <user, client> quotas: + // --entity-type users --entity-name user1 --entity-type clients --entity-name client2 (<user1, client2>) + // --entity-type users --entity-name userA --entity-type clients (all clients of userA) + // --entity-type users --entity-type clients (all <user, client>s)) + // Describe default quotas: + // --entity-type users --entity-default (Default user) + // --entity-type users --entity-default --entity-type clients --entity-default (Default <user, client>) + (root.sanitizedName, child) match { + case (None, _) => + val rootEntities = zkUtils.getAllEntitiesWithConfig(root.entityType) + .map(name => ConfigEntity(Entity(root.entityType, Some(name)), child)) + child match { + case Some (s) => + rootEntities.flatMap(rootEntity => + ConfigEntity(rootEntity.root, Some(Entity(s.entityType, None))).getAllEntities(zkUtils)) + case None => rootEntities + } + case (rootName, Some(childEntity)) => + childEntity.sanitizedName match { + case Some(subName) => Seq(this) + case None => + zkUtils.getAllEntitiesWithConfig(root.entityPath + "/" + childEntity.entityType) + .map(name => ConfigEntity(root, Some(Entity(childEntity.entityType, Some(name))))) + + } + case (rootName, None) => + Seq(this) + } + } + + override def toString: String = { + root.toString + child.map(s => ", " + s.toString).getOrElse("") + } + } + + private[admin] def parseEntity(opts: ConfigCommandOptions): ConfigEntity = { + val entityTypes = opts.options.valuesOf(opts.entityType) + if (entityTypes.head == ConfigType.User || entityTypes.head == ConfigType.Client) + parseQuotaEntity(opts) + else { + // Exactly one entity type and at-most one entity name expected for other entities + val name = if (opts.options.has(opts.entityName)) Some(opts.options.valueOf(opts.entityName)) else None + ConfigEntity(Entity(entityTypes.head, name), None) + } + } + + private def parseQuotaEntity(opts: ConfigCommandOptions): ConfigEntity = { + val types = opts.options.valuesOf(opts.entityType) + val namesIterator = opts.options.valuesOf(opts.entityName).iterator + val names = opts.options.specs + .filter(spec => spec.options.contains("entity-name") || spec.options.contains("entity-default")) + .map(spec => if (spec.options.contains("entity-name")) namesIterator.next else "") + + if (opts.options.has(opts.alterOpt) && names.size != types.size) + throw new IllegalArgumentException("--entity-name or --entity-default must be specified with each --entity-type for --alter") + + val reverse = types.size == 2 && types(0) == ConfigType.Client + val entityTypes = if (reverse) types.reverse else types.toBuffer + val sortedNames = (if (reverse && names.length == 2) names.reverse else names).iterator + + def sanitizeName(entityType: String, name: String) = { + if (name.isEmpty) + ConfigEntityName.Default + else { + entityType match { + case ConfigType.User => QuotaId.sanitize(name) + case ConfigType.Client => + validateChars("Client-id", name) + name + case _ => throw new IllegalArgumentException("Invalid entity type " + entityType) + } + } + } + + val entities = entityTypes.map(t => Entity(t, if (sortedNames.hasNext) Some(sanitizeName(t, sortedNames.next)) else None)) + ConfigEntity(entities.head, if (entities.size > 1) Some(entities(1)) else None) + } + class ConfigCommandOptions(args: Array[String]) { val parser = new OptionParser val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + @@ -159,19 +281,23 @@ object ConfigCommand { .ofType(classOf[String]) val alterOpt = parser.accepts("alter", "Alter the configuration for the entity.") val describeOpt = parser.accepts("describe", "List configs for the given entity.") - val entityType = parser.accepts("entity-type", "Type of entity (topics/clients/brokers)") + val entityType = parser.accepts("entity-type", "Type of entity (topics/clients/users/brokers)") .withRequiredArg .ofType(classOf[String]) - val entityName = parser.accepts("entity-name", "Name of entity (topic name/client id/broker id)") + val entityName = parser.accepts("entity-name", "Name of entity (topic name/client id/user principal name/broker id)") .withRequiredArg .ofType(classOf[String]) + val entityDefault = parser.accepts("entity-default", "Default entity name for clients/users (applies to corresponding entity type in command line)") val nl = System.getProperty("line.separator") val addConfig = parser.accepts("add-config", "Key Value pairs of configs to add. Square brackets can be used to group values which contain commas: 'k1=v1,k2=[v1,v2,v2],k3=v3'. The following is a list of valid configurations: " + "For entity_type '" + ConfigType.Topic + "': " + nl + LogConfig.configNames.map("\t" + _).mkString(nl) + nl + "For entity_type '" + ConfigType.Broker + "': " + nl + KafkaConfig.dynamicBrokerConfigs.map("\t" + _).mkString(nl) + nl + - "For entity_type '" + ConfigType.Client + "': " + nl + "\t" + ClientConfigOverride.ProducerOverride - + nl + "\t" + ClientConfigOverride.ConsumerOverride) + "For entity_type '" + ConfigType.Client + "': " + nl + "\t" + QuotaConfigOverride.ProducerOverride + + nl + "\t" + QuotaConfigOverride.ConsumerOverride + nl + + "For entity_type '" + ConfigType.User + "': " + nl + "\t" + QuotaConfigOverride.ProducerOverride + + nl + "\t" + QuotaConfigOverride.ConsumerOverride + nl + + s"Entity types '${ConfigType.User}' and '${ConfigType.Client}' may be specified together to update config for clients of a specific user.") .withRequiredArg .ofType(classOf[String]) val deleteConfig = parser.accepts("delete-config", "config keys to remove 'k1,k2'") @@ -194,15 +320,27 @@ object ConfigCommand { CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, entityType) CommandLineUtils.checkInvalidArgs(parser, options, alterOpt, Set(describeOpt)) CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, Set(alterOpt, addConfig, deleteConfig)) + val entityTypeVals = options.valuesOf(entityType) if(options.has(alterOpt)) { - require(options.has(entityName), "--entity-name must be specified with --alter") + if (entityTypeVals.contains(ConfigType.User) || entityTypeVals.contains(ConfigType.Client)) { + if (!options.has(entityName) && !options.has(entityDefault)) + throw new IllegalArgumentException("--entity-name or --entity-default must be specified with --alter of users/clients") + } else if (!options.has(entityName)) + throw new IllegalArgumentException(s"--entity-name must be specified with --alter of ${entityTypeVals}") val isAddConfigPresent: Boolean = options.has(addConfig) val isDeleteConfigPresent: Boolean = options.has(deleteConfig) if(! isAddConfigPresent && ! isDeleteConfigPresent) throw new IllegalArgumentException("At least one of --add-config or --delete-config must be specified with --alter") } - require(ConfigType.all.contains(options.valueOf(entityType)), s"--entity-type must be one of ${ConfigType.all}") + entityTypeVals.foreach(entityTypeVal => + if (!ConfigType.all.contains(entityTypeVal)) + throw new IllegalArgumentException(s"Invalid entity-type ${entityTypeVal}, --entity-type must be one of ${ConfigType.all}") + ) + if (entityTypeVals.isEmpty) + throw new IllegalArgumentException("At least one --entity-type must be specified") + else if (entityTypeVals.size > 1 && !entityTypeVals.toSet.equals(Set(ConfigType.User, ConfigType.Client))) + throw new IllegalArgumentException(s"Only '${ConfigType.User}' and '${ConfigType.Client}' entity types may be specified together") } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/main/scala/kafka/network/RequestChannel.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index cff7b1a..8aec2d2 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -25,6 +25,7 @@ import java.util.concurrent._ import com.yammer.metrics.core.Gauge import kafka.api._ import kafka.metrics.KafkaMetricsGroup +import kafka.server.QuotaId import kafka.utils.{Logging, SystemTime} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.InvalidRequestException @@ -44,7 +45,9 @@ object RequestChannel extends Logging { RequestSend.serialize(emptyRequestHeader, emptyProduceRequest.toStruct) } - case class Session(principal: KafkaPrincipal, clientAddress: InetAddress) + case class Session(principal: KafkaPrincipal, clientAddress: InetAddress) { + val sanitizedUser = QuotaId.sanitize(principal.getName) + } case class Request(processor: Int, connectionId: String, session: Session, private var buffer: ByteBuffer, startTimeMs: Long, securityProtocol: SecurityProtocol) { // These need to be volatile because the readers are in the network thread and the writers are in the request http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/main/scala/kafka/server/ClientQuotaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala index e6cac5d..c4472c6 100644 --- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala @@ -16,15 +16,19 @@ */ package kafka.server +import java.net.{URLEncoder, URLDecoder} +import java.nio.charset.StandardCharsets import java.util.concurrent.{ConcurrentHashMap, DelayQueue, TimeUnit} +import java.util.concurrent.locks.ReentrantReadWriteLock + import kafka.utils.{ShutdownableThread, Logging} import org.apache.kafka.common.MetricName import org.apache.kafka.common.metrics._ import org.apache.kafka.common.metrics.stats.{Total, Rate, Avg} -import java.util.concurrent.locks.ReentrantReadWriteLock - import org.apache.kafka.common.utils.Time +import scala.collection.JavaConversions._ + /** * Represents the sensors aggregated per client * @param quotaSensor @Sensor that tracks the quota @@ -34,7 +38,8 @@ private case class ClientSensors(quotaSensor: Sensor, throttleTimeSensor: Sensor /** * Configuration settings for quota management - * @param quotaBytesPerSecondDefault The default bytes per second quota allocated to any client + * @param quotaBytesPerSecondDefault The default bytes per second quota allocated to any client-id if + * dynamic defaults or user quotas are not set * @param numQuotaSamples The number of samples to retain in memory * @param quotaWindowSizeSeconds The time span of each sample * @@ -53,11 +58,72 @@ object ClientQuotaManagerConfig { val DefaultQuotaWindowSizeSeconds = 1 // Purge sensors after 1 hour of inactivity val InactiveSensorExpirationTimeSeconds = 3600 + + val UnlimitedQuota = Quota.upperBound(Long.MaxValue) + val DefaultClientIdQuotaId = QuotaId(None, Some(ConfigEntityName.Default)) + val DefaultUserQuotaId = QuotaId(Some(ConfigEntityName.Default), None) + val DefaultUserClientIdQuotaId = QuotaId(Some(ConfigEntityName.Default), Some(ConfigEntityName.Default)) +} + +object QuotaTypes { + val NoQuotas = 0 + val ClientIdQuotaEnabled = 1 + val UserQuotaEnabled = 2 + val UserClientIdQuotaEnabled = 4 } +object QuotaId { + + /** + * Sanitizes user principal to a safe value for use in MetricName + * and as Zookeeper node name + */ + def sanitize(user: String): String = { + val encoded = URLEncoder.encode(user, StandardCharsets.UTF_8.name) + val builder = new StringBuilder + for (i <- 0 until encoded.length) { + encoded.charAt(i) match { + case '*' => builder.append("%2A") // Metric ObjectName treats * as pattern + case '+' => builder.append("%20") // Space URL-encoded as +, replace with percent encoding + case c => builder.append(c) + } + } + builder.toString + } + + /** + * Decodes sanitized user principal + */ + def desanitize(sanitizedUser: String): String = { + URLDecoder.decode(sanitizedUser, StandardCharsets.UTF_8.name) + } +} + +case class QuotaId(sanitizedUser: Option[String], clientId: Option[String]) + +case class QuotaEntity(quotaId: QuotaId, sanitizedUser: String, clientId: String, quota: Quota) + /** * Helper class that records per-client metrics. It is also responsible for maintaining Quota usage statistics * for all clients. + * <p/> + * Quotas can be set at <user, client-id>, user or client-id levels. For a given client connection, + * the most specific quota matching the connection will be applied. For example, if both a <user, client-id> + * and a user quota match a connection, the <user, client-id> quota will be used. Otherwise, user quota takes + * precedence over client-id quota. The order of precedence is: + * <ul> + * <li>/config/users/<user>/clients/<client-id> + * <li>/config/users/<user>/clients/<default> + * <li>/config/users/<user> + * <li>/config/users/<default>/clients/<client-id> + * <li>/config/users/<default>/clients/<default> + * <li>/config/users/<default> + * <li>/config/clients/<client-id> + * <li>/config/clients/<default> + * </ul> + * Quota limits including defaults may be updated dynamically. The implementation is optimized for the case + * where a single level of quotas is configured. + * * @param config @ClientQuotaManagerConfig quota configs * @param metrics @Metrics Metrics instance * @param apiKey API Key for the request @@ -67,8 +133,9 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, private val metrics: Metrics, private val apiKey: QuotaType, private val time: Time) extends Logging { - private val overriddenQuota = new ConcurrentHashMap[String, Quota]() - private val defaultQuota = Quota.upperBound(config.quotaBytesPerSecondDefault) + private val overriddenQuota = new ConcurrentHashMap[QuotaId, Quota]() + private val staticConfigClientIdQuota = Quota.upperBound(config.quotaBytesPerSecondDefault) + private var quotaTypesEnabled = if (config.quotaBytesPerSecondDefault == Long.MaxValue) QuotaTypes.NoQuotas else QuotaTypes.ClientIdQuotaEnabled private val lock = new ReentrantReadWriteLock() private val delayQueue = new DelayQueue[ThrottledResponse]() private val sensorAccessor = new SensorAccess @@ -107,8 +174,9 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, * @return Number of milliseconds to delay the response in case of Quota violation. * Zero otherwise */ - def recordAndMaybeThrottle(clientId: String, value: Int, callback: Int => Unit): Int = { - val clientSensors = getOrCreateQuotaSensors(clientId) + def recordAndMaybeThrottle(sanitizedUser: String, clientId: String, value: Int, callback: Int => Unit): Int = { + val clientQuotaEntity = quotaEntity(sanitizedUser, clientId) + val clientSensors = getOrCreateQuotaSensors(clientQuotaEntity) var throttleTimeMs = 0 try { clientSensors.quotaSensor.record(value) @@ -117,8 +185,8 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, } catch { case qve: QuotaViolationException => // Compute the delay - val clientMetric = metrics.metrics().get(clientRateMetricName(clientId)) - throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(quota(clientId))) + val clientMetric = metrics.metrics().get(clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId)) + throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(clientQuotaEntity.quota)) clientSensors.throttleTimeSensor.record(throttleTimeMs) // If delayed, add the element to the delayQueue delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback)) @@ -128,6 +196,127 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, throttleTimeMs } + /** + * Determines the quota-id for the client with the specified user principal + * and client-id and returns the quota entity that encapsulates the quota-id + * and the associated quota override or default quota. + * + */ + private def quotaEntity(sanitizedUser: String, clientId: String) : QuotaEntity = { + quotaTypesEnabled match { + case QuotaTypes.NoQuotas | QuotaTypes.ClientIdQuotaEnabled => + val quotaId = QuotaId(None, Some(clientId)) + var quota = overriddenQuota.get(quotaId) + if (quota == null) { + quota = overriddenQuota.get(ClientQuotaManagerConfig.DefaultClientIdQuotaId) + if (quota == null) + quota = staticConfigClientIdQuota + } + QuotaEntity(quotaId, "", clientId, quota) + case QuotaTypes.UserQuotaEnabled => + val quotaId = QuotaId(Some(sanitizedUser), None) + var quota = overriddenQuota.get(quotaId) + if (quota == null) { + quota = overriddenQuota.get(ClientQuotaManagerConfig.DefaultUserQuotaId) + if (quota == null) + quota = ClientQuotaManagerConfig.UnlimitedQuota + } + QuotaEntity(quotaId, sanitizedUser, "", quota) + case QuotaTypes.UserClientIdQuotaEnabled => + val quotaId = QuotaId(Some(sanitizedUser), Some(clientId)) + var quota = overriddenQuota.get(quotaId) + if (quota == null) { + quota = overriddenQuota.get(QuotaId(Some(sanitizedUser), Some(ConfigEntityName.Default))) + if (quota == null) { + quota = overriddenQuota.get(QuotaId(Some(ConfigEntityName.Default), Some(clientId))) + if (quota == null) { + quota = overriddenQuota.get(ClientQuotaManagerConfig.DefaultUserClientIdQuotaId) + if (quota == null) + quota = ClientQuotaManagerConfig.UnlimitedQuota + } + } + } + QuotaEntity(quotaId, sanitizedUser, clientId, quota) + case _ => + quotaEntityWithMultipleQuotaLevels(sanitizedUser, clientId) + } + } + + private def quotaEntityWithMultipleQuotaLevels(sanitizedUser: String, clientId: String) : QuotaEntity = { + val userClientQuotaId = QuotaId(Some(sanitizedUser), Some(clientId)) + + val userQuotaId = QuotaId(Some(sanitizedUser), None) + val clientQuotaId = QuotaId(None, Some(clientId)) + var quotaId = userClientQuotaId + var quotaConfigId = userClientQuotaId + // 1) /config/users/<user>/clients/<client-id> + var quota = overriddenQuota.get(quotaConfigId) + if (quota == null) { + // 2) /config/users/<user>/clients/<default> + quotaId = userClientQuotaId + quotaConfigId = QuotaId(Some(sanitizedUser), Some(ConfigEntityName.Default)) + quota = overriddenQuota.get(quotaConfigId) + + if (quota == null) { + // 3) /config/users/<user> + quotaId = userQuotaId + quotaConfigId = quotaId + quota = overriddenQuota.get(quotaConfigId) + + if (quota == null) { + // 4) /config/users/<default>/clients/<client-id> + quotaId = userClientQuotaId + quotaConfigId = QuotaId(Some(ConfigEntityName.Default), Some(clientId)) + quota = overriddenQuota.get(quotaConfigId) + + if (quota == null) { + // 5) /config/users/<default>/clients/<default> + quotaId = userClientQuotaId + quotaConfigId = QuotaId(Some(ConfigEntityName.Default), Some(ConfigEntityName.Default)) + quota = overriddenQuota.get(quotaConfigId) + + if (quota == null) { + // 6) /config/users/<default> + quotaId = userQuotaId + quotaConfigId = QuotaId(Some(ConfigEntityName.Default), None) + quota = overriddenQuota.get(quotaConfigId) + + if (quota == null) { + // 7) /config/clients/<client-id> + quotaId = clientQuotaId + quotaConfigId = QuotaId(None, Some(clientId)) + quota = overriddenQuota.get(quotaConfigId) + + if (quota == null) { + // 8) /config/clients/<default> + quotaId = clientQuotaId + quotaConfigId = QuotaId(None, Some(ConfigEntityName.Default)) + quota = overriddenQuota.get(quotaConfigId) + + if (quota == null) { + quotaId = clientQuotaId + quotaConfigId = null + quota = staticConfigClientIdQuota + } + } + } + } + } + } + } + } + val quotaUser = if (quotaId == clientQuotaId) "" else sanitizedUser + val quotaClientId = if (quotaId == userQuotaId) "" else clientId + QuotaEntity(quotaId, quotaUser, quotaClientId, quota) + } + + /** + * Returns the quota for the client with the specified (non-encoded) user principal and client-id. + */ + def quota(user: String, clientId: String) = { + quotaEntity(QuotaId.sanitize(user), clientId).quota + } + /* * This calculates the amount of time needed to bring the metric within quota * assuming that no new metrics are recorded. @@ -153,40 +342,35 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, } } - /** - * Returns the quota for the specified clientId - */ - def quota(clientId: String): Quota = - if (overriddenQuota.containsKey(clientId)) overriddenQuota.get(clientId) else defaultQuota - /* * This function either returns the sensors for a given client id or creates them if they don't exist * First sensor of the tuple is the quota enforcement sensor. Second one is the throttle time sensor */ - private def getOrCreateQuotaSensors(clientId: String): ClientSensors = { + private def getOrCreateQuotaSensors(quotaEntity: QuotaEntity): ClientSensors = { + // Names of the sensors to access ClientSensors( sensorAccessor.getOrCreate( - getQuotaSensorName(clientId), + getQuotaSensorName(quotaEntity.quotaId), ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds, lock, metrics, - () => clientRateMetricName(clientId), - () => getQuotaMetricConfig(quota(clientId)), + () => clientRateMetricName(quotaEntity.sanitizedUser, quotaEntity.clientId), + () => getQuotaMetricConfig(quotaEntity.quota), () => new Rate() ), - sensorAccessor.getOrCreate(getThrottleTimeSensorName(clientId), + sensorAccessor.getOrCreate(getThrottleTimeSensorName(quotaEntity.quotaId), ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds, lock, metrics, - () => metrics.metricName("throttle-time", apiKey.toString, "Tracking average throttle-time per client", "client-id", clientId), + () => throttleMetricName(quotaEntity), () => null, () => new Avg() ) ) } - private def getThrottleTimeSensorName(clientId: String): String = apiKey + "ThrottleTime-" + clientId + private def getThrottleTimeSensorName(quotaId: QuotaId): String = apiKey + "ThrottleTime-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.clientId.getOrElse("") - private def getQuotaSensorName(clientId: String): String = apiKey + "-" + clientId + private def getQuotaSensorName(quotaId: QuotaId): String = apiKey + "-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.clientId.getOrElse("") private def getQuotaMetricConfig(quota: Quota): MetricConfig = { new MetricConfig() @@ -196,19 +380,13 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, } /** - * Reset quotas to the default value for the given clientId - * @param clientId client to override - */ - def resetQuota(clientId: String) = { - updateQuota(clientId, defaultQuota) - } - - /** - * Overrides quotas per clientId - * @param clientId client to override - * @param quota custom quota to apply + * Overrides quotas for <user>, <client-id> or <user, client-id> or the dynamic defaults + * for any of these levels. + * @param sanitizedUser user to override if quota applies to <user> or <user, client-id> + * @param clientId client to override if quota applies to <client-id> or <user, client-id> + * @param quota custom quota to apply or None if quota override is being removed */ - def updateQuota(clientId: String, quota: Quota) = { + def updateQuota(sanitizedUser: Option[String], clientId: Option[String], quota: Option[Quota]) { /* * Acquire the write lock to apply changes in the quota objects. * This method changes the quota in the overriddenQuota map and applies the update on the actual KafkaMetric object (if it exists). @@ -218,31 +396,85 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, */ lock.writeLock().lock() try { - logger.info(s"Changing quota for clientId $clientId to ${quota.bound()}") - - if (quota.equals(defaultQuota)) - this.overriddenQuota.remove(clientId) - else - this.overriddenQuota.put(clientId, quota) - - // Change the underlying metric config if the sensor has been created. - // Note the metric could be expired by another thread, so use a local variable and null check. - val metric = metrics.metrics.get(clientRateMetricName(clientId)) - if (metric != null) { - logger.info(s"Sensor for clientId $clientId already exists. Changing quota to ${quota.bound()} in MetricConfig") - metric.config(getQuotaMetricConfig(quota)) + val quotaId = QuotaId(sanitizedUser, clientId) + val userInfo = sanitizedUser match { + case Some(ConfigEntityName.Default) => "default user " + case Some(user) => "user " + user + " " + case None => "" + } + val clientIdInfo = clientId match { + case Some(ConfigEntityName.Default) => "default client-id" + case Some(id) => "client-id " + id + case None => "" + } + quota match { + case Some(newQuota) => + logger.info(s"Changing ${apiKey} quota for ${userInfo}${clientIdInfo} to ${newQuota.bound}") + overriddenQuota.put(quotaId, newQuota) + (sanitizedUser, clientId) match { + case (Some(u), Some(c)) => quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled + case (Some(u), None) => quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled + case (None, Some(c)) => quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled + case (None, None) => + } + case None => + logger.info(s"Removing ${apiKey} quota for ${userInfo}${clientIdInfo}") + overriddenQuota.remove(quotaId) } + + val quotaMetricName = clientRateMetricName(sanitizedUser.getOrElse(""), clientId.getOrElse("")) + val allMetrics = metrics.metrics() + + // If multiple-levels of quotas are defined or if this is a default quota update, traverse metrics + // to find all affected values. Otherwise, update just the single matching one. + val singleUpdate = quotaTypesEnabled match { + case QuotaTypes.NoQuotas | QuotaTypes.ClientIdQuotaEnabled | QuotaTypes.UserQuotaEnabled | QuotaTypes.UserClientIdQuotaEnabled => + !sanitizedUser.filter(_ == ConfigEntityName.Default).isDefined && !clientId.filter(_ == ConfigEntityName.Default).isDefined + case _ => false + } + if (singleUpdate) { + // Change the underlying metric config if the sensor has been created + val metric = allMetrics.get(quotaMetricName) + if (metric != null) { + val metricConfigEntity = quotaEntity(sanitizedUser.getOrElse(""), clientId.getOrElse("")) + val newQuota = metricConfigEntity.quota + logger.info(s"Sensor for ${userInfo}${clientIdInfo} already exists. Changing quota to ${newQuota.bound()} in MetricConfig") + metric.config(getQuotaMetricConfig(newQuota)) + } + } else { + allMetrics.filterKeys(n => n.name == quotaMetricName.name && n.group == quotaMetricName.group).foreach { + case (metricName, metric) => + val userTag = if (metricName.tags.containsKey("user")) metricName.tags.get("user") else "" + val clientIdTag = if (metricName.tags.containsKey("client-id")) metricName.tags.get("client-id") else "" + val metricConfigEntity = quotaEntity(userTag, clientIdTag) + if (metricConfigEntity.quota != metric.config.quota) { + val newQuota = metricConfigEntity.quota + logger.info(s"Sensor for quota-id ${metricConfigEntity.quotaId} already exists. Setting quota to ${newQuota.bound} in MetricConfig") + metric.config(getQuotaMetricConfig(newQuota)) + } + } + } + } finally { lock.writeLock().unlock() } } - private def clientRateMetricName(clientId: String): MetricName = { + private def clientRateMetricName(sanitizedUser: String, clientId: String): MetricName = { metrics.metricName("byte-rate", apiKey.toString, - "Tracking byte-rate per client", + "Tracking byte-rate per user/client-id", + "user", sanitizedUser, "client-id", clientId) } + private def throttleMetricName(quotaEntity: QuotaEntity): MetricName = { + metrics.metricName("throttle-time", + apiKey.toString, + "Tracking average throttle-time per user/client-id", + "user", quotaEntity.sanitizedUser, + "client-id", quotaEntity.clientId) + } + def shutdown() = { throttledRequestReaper.shutdown() } http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/main/scala/kafka/server/ConfigHandler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala index 67b74a7..5be9c12 100644 --- a/core/src/main/scala/kafka/server/ConfigHandler.scala +++ b/core/src/main/scala/kafka/server/ConfigHandler.scala @@ -96,31 +96,59 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC } } -object ClientConfigOverride { +object QuotaConfigOverride { val ProducerOverride = "producer_byte_rate" val ConsumerOverride = "consumer_byte_rate" } /** - * The ClientIdConfigHandler will process clientId config changes in ZK. - * The callback provides the clientId and the full properties set read from ZK. - * This implementation reports the overrides to the respective ClientQuotaManager objects - */ -class ClientIdConfigHandler(private val quotaManagers: QuotaManagers) extends ConfigHandler { + * Handles <client-id>, <user> or <user, client-id> quota config updates in ZK. + * This implementation reports the overrides to the respective ClientQuotaManager objects + */ +class QuotaConfigHandler(private val quotaManagers: QuotaManagers) { + + def updateQuotaConfig(sanitizedUser: Option[String], clientId: Option[String], config: Properties) { + val producerQuota = + if (config.containsKey(QuotaConfigOverride.ProducerOverride)) + Some(new Quota(config.getProperty(QuotaConfigOverride.ProducerOverride).toLong, true)) + else + None + quotaManagers.produce.updateQuota(sanitizedUser, clientId, producerQuota) + val consumerQuota = + if (config.containsKey(QuotaConfigOverride.ConsumerOverride)) + Some(new Quota(config.getProperty(QuotaConfigOverride.ConsumerOverride).toLong, true)) + else + None + quotaManagers.fetch.updateQuota(sanitizedUser, clientId, consumerQuota) + } +} + +/** + * The ClientIdConfigHandler will process clientId config changes in ZK. + * The callback provides the clientId and the full properties set read from ZK. + */ +class ClientIdConfigHandler(private val quotaManagers: QuotaManagers) extends QuotaConfigHandler(quotaManagers) with ConfigHandler { + def processConfigChanges(clientId: String, clientConfig: Properties) { - if (clientConfig.containsKey(ClientConfigOverride.ProducerOverride)) { - quotaManagers.produce.updateQuota(clientId, - new Quota(clientConfig.getProperty(ClientConfigOverride.ProducerOverride).toLong, true)) - } else { - quotaManagers.fetch.resetQuota(clientId) - } + updateQuotaConfig(None, Some(clientId), clientConfig) + } +} - if (clientConfig.containsKey(ClientConfigOverride.ConsumerOverride)) { - quotaManagers.fetch.updateQuota(clientId, - new Quota(clientConfig.getProperty(ClientConfigOverride.ConsumerOverride).toLong, true)) - } else { - quotaManagers.produce.resetQuota(clientId) - } +/** + * The UserConfigHandler will process <user> and <user, client-id> quota changes in ZK. + * The callback provides the node name containing sanitized user principal, client-id if this is + * a <user, client-id> update and the full properties set read from ZK. + */ +class UserConfigHandler(private val quotaManagers: QuotaManagers) extends QuotaConfigHandler(quotaManagers) with ConfigHandler { + + def processConfigChanges(quotaEntityPath: String, config: Properties) { + // Entity path is <user> or <user>/clients/<client> + val entities = quotaEntityPath.split("/") + if (entities.length != 1 && entities.length != 3) + throw new IllegalArgumentException("Invalid quota entity path: " + quotaEntityPath); + val sanitizedUser = entities(0) + val clientId = if (entities.length == 3) Some(entities(2)) else None + updateQuotaConfig(Some(sanitizedUser), clientId, config) } } @@ -151,4 +179,4 @@ object ThrottledReplicaValidator extends Validator { private def isValid(proposed: String): Boolean = { proposed.trim.equals("*") || proposed.trim.matches("([0-9]+:[0-9]+)?(,[0-9]+:[0-9]+)*") } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/main/scala/kafka/server/DynamicConfigManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala index 556534a..b31d838 100644 --- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala +++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala @@ -37,8 +37,13 @@ import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient} object ConfigType { val Topic = "topics" val Client = "clients" + val User = "users" val Broker = "brokers" - val all = Seq(Topic, Client, Broker) + val all = Seq(Topic, Client, User, Broker) +} + +object ConfigEntityName { + val Default = "<default>" } /** @@ -48,7 +53,9 @@ object ConfigType { * * Config is stored under the path: /config/entityType/entityName * E.g. /config/topics/<topic_name> and /config/clients/<clientId> - * This znode stores the overrides for this entity (but no defaults) in properties format. + * This znode stores the overrides for this entity in properties format with defaults stored using entityName "<default>". + * Multiple entity names may be specified (eg. <user, client-id> quotas) using a hierarchical path: + * E.g. /config/users/<user>/clients/<clientId> * * To avoid watching all topics for changes instead we have a notification path * /config/changes @@ -57,8 +64,10 @@ object ConfigType { * To update a config we first update the config properties. Then we create a new sequential * znode under the change path which contains the name of the entityType and entityName that was updated, say * /config/changes/config_change_13321 - * The sequential znode contains data in this format: {"version" : 1, "entityType":"topic/client", "entityName" : "topic_name/client_id"} + * The sequential znode contains data in this format: {"version" : 1, "entity_type":"topic/client", "entity_name" : "topic_name/client_id"} * This is just a notification--the actual config change is stored only once under the /config/entityType/entityName path. + * Version 2 of notifications has the format: {"version" : 2, "entity_path":"entity_type/entity_name"} + * Multiple entities may be specified as a hierarchical path (eg. users/<user>/clients/<clientId>). * * This will fire a watcher on all brokers. This watcher works as follows. It reads all the config change notifications. * It keeps track of the highest config change suffix number it has applied previously. For any previously applied change it finds @@ -89,30 +98,60 @@ class DynamicConfigManager(private val zkUtils: ZkUtils, case Some(mapAnon: Map[_, _]) => val map = mapAnon collect { case (k: String, v: Any) => k -> v } - require(map("version") == 1) - - val entityType = map.get("entity_type") match { - case Some(ConfigType.Topic) => ConfigType.Topic - case Some(ConfigType.Client) => ConfigType.Client - case Some(ConfigType.Broker) => ConfigType.Broker - case _ => throw new IllegalArgumentException(s"Config change notification must have 'entity_type' set to one of ${ConfigType.all}. Received: $json") - } - val entity = map.get("entity_name") match { - case Some(value: String) => value - case _ => throw new IllegalArgumentException("Config change notification does not specify 'entity_name'. Received: " + json) + map("version") match { + case 1 => processEntityConfigChangeVersion1(json, map) + case 2 => processEntityConfigChangeVersion2(json, map) + case _ => throw new IllegalArgumentException("Config change notification has an unsupported version " + map("version") + + "Supported versions are 1 and 2.") } - val entityConfig = AdminUtils.fetchEntityConfig(zkUtils, entityType, entity) - logger.info(s"Processing override for entityType: $entityType, entity: $entity with config: $entityConfig") - configHandlers(entityType).processConfigChanges(entity, entityConfig) case o => throw new IllegalArgumentException("Config change notification has an unexpected value. The format is:" + - "{\"version\" : 1," + - " \"entity_type\":\"topic/client\"," + - " \"entity_name\" : \"topic_name/client_id\"}." + + "{\"version\" : 1, \"entity_type\":\"topics/clients\", \"entity_name\" : \"topic_name/client_id\"}." + " or " + + "{\"version\" : 2, \"entity_path\":\"entity_type/entity_name\"}." + " Received: " + json) } } + + private def processEntityConfigChangeVersion1(json: String, map: Map[String, Any]) { + + val entityType = map.get("entity_type") match { + case Some(ConfigType.Topic) => ConfigType.Topic + case Some(ConfigType.Client) => ConfigType.Client + case _ => throw new IllegalArgumentException("Version 1 config change notification must have 'entity_type' set to 'clients' or 'topics'." + + " Received: " + json) + } + + val entity = map.get("entity_name") match { + case Some(value: String) => value + case _ => throw new IllegalArgumentException("Version 1 config change notification does not specify 'entity_name'. Received: " + json) + } + + val entityConfig = AdminUtils.fetchEntityConfig(zkUtils, entityType, entity) + logger.info(s"Processing override for entityType: $entityType, entity: $entity with config: $entityConfig") + configHandlers(entityType).processConfigChanges(entity, entityConfig) + + } + + private def processEntityConfigChangeVersion2(json: String, map: Map[String, Any]) { + + val entityPath = map.get("entity_path") match { + case Some(value: String) => value + case _ => throw new IllegalArgumentException("Version 2 config change notification does not specify 'entity_path'. Received: " + json) + } + + val index = entityPath.indexOf('/') + val rootEntityType = entityPath.substring(0, index) + if (index < 0 || !configHandlers.contains(rootEntityType)) + throw new IllegalArgumentException("Version 2 config change notification must have 'entity_path' starting with 'clients/', 'topics/' or 'users/'." + + " Received: " + json) + val fullSanitizedEntityName = entityPath.substring(index + 1) + + val entityConfig = AdminUtils.fetchEntityConfig(zkUtils, rootEntityType, fullSanitizedEntityName) + logger.info(s"Processing override for entityPath: $entityPath with config: $entityConfig") + configHandlers(rootEntityType).processConfigChanges(fullSanitizedEntityName, entityConfig) + + } } private val configChangeListener = new ZkNodeChangeNotificationListener(zkUtils, ZkUtils.EntityConfigChangesPath, AdminUtils.EntityConfigChangeZnodePrefix, ConfigChangedNotificationHandler) @@ -122,5 +161,20 @@ class DynamicConfigManager(private val zkUtils: ZkUtils, */ def startup(): Unit = { configChangeListener.init() + + // Apply all existing client/user configs to the ClientIdConfigHandler/UserConfigHandler to bootstrap the overrides + configHandlers.foreach { + case (ConfigType.User, handler) => + AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.User).foreach { + case (sanitizedUser, properties) => handler.processConfigChanges(sanitizedUser, properties) + } + AdminUtils.fetchAllChildEntityConfigs(zkUtils, ConfigType.User, ConfigType.Client).foreach { + case (sanitizedUserClientId, properties) => handler.processConfigChanges(sanitizedUserClientId, properties) + } + case (configType, handler) => + AdminUtils.fetchAllEntityConfigs(zkUtils, configType).foreach { + case (entityName, properties) => handler.processConfigChanges(entityName, properties) + } + } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 3008426..d3ba5ef 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -396,6 +396,7 @@ class KafkaApis(val requestChannel: RequestChannel, request.apiRemoteCompleteTimeMs = SystemTime.milliseconds quotas.produce.recordAndMaybeThrottle( + request.session.sanitizedUser, request.header.clientId, numBytesAppended, produceResponseCallback) @@ -494,7 +495,7 @@ class KafkaApis(val requestChannel: RequestChannel, fetchResponseCallback(0) } else { val size = FetchResponse.responseSize(mergedPartitionData.groupBy(_._1.topic), fetchRequest.versionId) - quotas.fetch.recordAndMaybeThrottle(fetchRequest.clientId, size, fetchResponseCallback) + quotas.fetch.recordAndMaybeThrottle(request.session.sanitizedUser, fetchRequest.clientId, size, fetchResponseCallback) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 3671297..b37be5b 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -523,8 +523,10 @@ object KafkaConfig { "or this timeout is reached. This is similar to the producer request timeout." val OffsetCommitRequiredAcksDoc = "The required acks before the commit can be accepted. In general, the default (-1) should not be overridden" /** ********* Quota Configuration ***********/ - val ProducerQuotaBytesPerSecondDefaultDoc = "Any producer distinguished by clientId will get throttled if it produces more bytes than this value per-second" - val ConsumerQuotaBytesPerSecondDefaultDoc = "Any consumer distinguished by clientId/consumer group will get throttled if it fetches more bytes than this value per-second" + val ProducerQuotaBytesPerSecondDefaultDoc = "DEPRECATED: Used only when dynamic default quotas are not configured for <user>, <client-id> or <user, client-id> in Zookeeper. " + + "Any producer distinguished by clientId will get throttled if it produces more bytes than this value per-second" + val ConsumerQuotaBytesPerSecondDefaultDoc = "DEPRECATED: Used only when dynamic default quotas are not configured for <user, <client-id> or <user, client-id> in Zookeeper. " + + "Any consumer distinguished by clientId/consumer group will get throttled if it fetches more bytes than this value per-second" val NumQuotaSamplesDoc = "The number of samples to retain in memory for client quotas" val NumReplicationQuotaSamplesDoc = "The number of samples to retain in memory for replication quotas" val QuotaWindowSizeSecondsDoc = "The time span of each sample for client quotas" http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index db92cb8..5055c87 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -251,14 +251,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr /* start dynamic config manager */ dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers), ConfigType.Client -> new ClientIdConfigHandler(quotaManagers), + ConfigType.User -> new UserConfigHandler(quotaManagers), ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers)) - // Apply all existing client configs to the ClientIdConfigHandler to bootstrap the overrides - // TODO: Move this logic to DynamicConfigManager - AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client).foreach { - case (clientId, properties) => dynamicConfigHandlers(ConfigType.Client).processConfigChanges(clientId, properties) - } - // Create the config manager. start listening to notifications dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers) dynamicConfigManager.startup() http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/main/scala/kafka/utils/ZkUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 503ed54..96779ff 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -119,6 +119,9 @@ object ZkUtils { def getEntityConfigPath(entityType: String, entity: String): String = getEntityConfigRootPath(entityType) + "/" + entity + def getEntityConfigPath(entityPath: String): String = + ZkUtils.EntityConfigPath + "/" + entityPath + def getDeleteTopicPath(topic: String): String = DeleteTopicsPath + "/" + topic http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala new file mode 100644 index 0000000..c9b7787 --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala @@ -0,0 +1,195 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package kafka.api + +import java.util.Properties + +import kafka.server.{QuotaConfigOverride, KafkaConfig, KafkaServer, QuotaId} +import kafka.utils.TestUtils +import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} +import org.apache.kafka.clients.producer._ +import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback +import org.apache.kafka.common.MetricName +import org.apache.kafka.common.metrics.{Quota, KafkaMetric} +import org.apache.kafka.common.protocol.ApiKeys +import org.junit.Assert._ +import org.junit.{After, Before, Test} + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import scala.collection.Map +import scala.collection.mutable + +abstract class BaseQuotaTest extends IntegrationTestHarness { + + def userPrincipal : String + def producerQuotaId : QuotaId + def consumerQuotaId : QuotaId + def overrideQuotas(producerQuota: Long, consumerQuota: Long) + def removeQuotaOverrides() + + override val serverCount = 2 + val producerCount = 1 + val consumerCount = 1 + + private val producerBufferSize = 300000 + protected val producerClientId = "QuotasTestProducer-1" + protected val consumerClientId = "QuotasTestConsumer-1" + + this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") + this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "2") + this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1") + this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "100") + this.serverConfig.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, "30000") + this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "0") + this.producerConfig.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, producerBufferSize.toString) + this.producerConfig.setProperty(ProducerConfig.CLIENT_ID_CONFIG, producerClientId) + this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "QuotasTest") + this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString) + this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + this.consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, consumerClientId) + + // Low enough quota that a producer sending a small payload in a tight loop should get throttled + val defaultProducerQuota = 8000 + val defaultConsumerQuota = 2500 + + var leaderNode: KafkaServer = null + var followerNode: KafkaServer = null + private val topic1 = "topic-1" + + @Before + override def setUp() { + super.setUp() + + val numPartitions = 1 + val leaders = TestUtils.createTopic(zkUtils, topic1, numPartitions, serverCount, servers) + leaderNode = if (leaders(0).get == servers.head.config.brokerId) servers.head else servers(1) + followerNode = if (leaders(0).get != servers.head.config.brokerId) servers.head else servers(1) + assertTrue("Leader of all partitions of the topic should exist", leaders.values.forall(leader => leader.isDefined)) + } + + @After + override def tearDown() { + super.tearDown() + } + + @Test + def testThrottledProducerConsumer() { + val allMetrics: mutable.Map[MetricName, KafkaMetric] = leaderNode.metrics.metrics().asScala + + val numRecords = 1000 + produce(producers.head, numRecords) + + val producerMetricName = throttleMetricName(ApiKeys.PRODUCE, producerQuotaId) + assertTrue("Should have been throttled", allMetrics(producerMetricName).value() > 0) + + // Consumer should read in a bursty manner and get throttled immediately + consume(consumers.head, numRecords) + val consumerMetricName = throttleMetricName(ApiKeys.FETCH, consumerQuotaId) + assertTrue("Should have been throttled", allMetrics(consumerMetricName).value() > 0) + } + + @Test + def testProducerConsumerOverrideUnthrottled() { + // Give effectively unlimited quota for producer and consumer + val props = new Properties() + props.put(QuotaConfigOverride.ProducerOverride, Long.MaxValue.toString) + props.put(QuotaConfigOverride.ConsumerOverride, Long.MaxValue.toString) + + overrideQuotas(Long.MaxValue, Long.MaxValue) + waitForQuotaUpdate(Long.MaxValue, Long.MaxValue) + + val allMetrics: mutable.Map[MetricName, KafkaMetric] = leaderNode.metrics.metrics().asScala + val numRecords = 1000 + produce(producers.head, numRecords) + val producerMetricName = throttleMetricName(ApiKeys.PRODUCE, producerQuotaId) + assertEquals("Should not have been throttled", 0.0, allMetrics(producerMetricName).value(), 0.0) + + // The "client" consumer does not get throttled. + consume(consumers.head, numRecords) + val consumerMetricName = throttleMetricName(ApiKeys.FETCH, consumerQuotaId) + assertEquals("Should not have been throttled", 0.0, allMetrics(consumerMetricName).value(), 0.0) + } + + @Test + def testQuotaOverrideDelete() { + // Override producer and consumer quotas to unlimited + overrideQuotas(Long.MaxValue, Long.MaxValue) + + val allMetrics: mutable.Map[MetricName, KafkaMetric] = leaderNode.metrics.metrics().asScala + val numRecords = 1000 + produce(producers.head, numRecords) + assertTrue("Should not have been throttled", allMetrics(throttleMetricName(ApiKeys.PRODUCE, producerQuotaId)).value() == 0) + consume(consumers.head, numRecords) + assertTrue("Should not have been throttled", allMetrics(throttleMetricName(ApiKeys.FETCH, consumerQuotaId)).value() == 0) + + // Delete producer and consumer quota overrides. Consumer and producer should now be + // throttled since broker defaults are very small + removeQuotaOverrides() + produce(producers.head, numRecords) + + assertTrue("Should have been throttled", allMetrics(throttleMetricName(ApiKeys.PRODUCE, producerQuotaId)).value() > 0) + consume(consumers.head, numRecords) + assertTrue("Should have been throttled", allMetrics(throttleMetricName(ApiKeys.FETCH, consumerQuotaId)).value() > 0) + } + + def produce(p: KafkaProducer[Array[Byte], Array[Byte]], count: Int): Int = { + var numBytesProduced = 0 + for (i <- 0 to count) { + val payload = i.toString.getBytes + numBytesProduced += payload.length + p.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, null, null, payload), + new ErrorLoggingCallback(topic1, null, null, true)).get() + Thread.sleep(1) + } + numBytesProduced + } + + def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int) { + consumer.subscribe(List(topic1)) + var numConsumed = 0 + while (numConsumed < numRecords) { + for (cr <- consumer.poll(100)) { + numConsumed += 1 + } + } + } + + def waitForQuotaUpdate(producerQuota: Long, consumerQuota: Long) { + TestUtils.retry(10000) { + val quotaManagers = leaderNode.apis.quotas + val overrideProducerQuota = quotaManagers.produce.quota(userPrincipal, producerClientId) + val overrideConsumerQuota = quotaManagers.fetch.quota(userPrincipal, consumerClientId) + + assertEquals(s"ClientId $producerClientId of user $userPrincipal must have producer quota", Quota.upperBound(producerQuota), overrideProducerQuota) + assertEquals(s"ClientId $consumerClientId of user $userPrincipal must have consumer quota", Quota.upperBound(consumerQuota), overrideConsumerQuota) + } + } + + private def throttleMetricName(apiKey: ApiKeys, quotaId: QuotaId): MetricName = { + leaderNode.metrics.metricName("throttle-time", + apiKey.name, + "Tracking throttle-time per user/client-id", + "user", quotaId.sanitizedUser.getOrElse(""), + "client-id", quotaId.clientId.getOrElse("")) + } + + def quotaProperties(producerQuota: Long, consumerQuota: Long): Properties = { + val props = new Properties() + props.put(QuotaConfigOverride.ProducerOverride, producerQuota.toString) + props.put(QuotaConfigOverride.ConsumerOverride, consumerQuota.toString) + props + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala new file mode 100644 index 0000000..7477f7f --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala @@ -0,0 +1,55 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package kafka.api + +import java.util.Properties + +import kafka.admin.AdminUtils +import kafka.server.{KafkaConfig, QuotaConfigOverride, QuotaId} +import org.apache.kafka.common.security.auth.KafkaPrincipal +import org.junit.Before + +class ClientIdQuotaTest extends BaseQuotaTest { + + override val userPrincipal = KafkaPrincipal.ANONYMOUS.getName + override val producerQuotaId = QuotaId(None, Some(producerClientId)) + override val consumerQuotaId = QuotaId(None, Some(consumerClientId)) + + @Before + override def setUp() { + this.serverConfig.setProperty(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, defaultProducerQuota.toString) + this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, defaultConsumerQuota.toString) + super.setUp() + } + + override def overrideQuotas(producerQuota: Long, consumerQuota: Long) { + val producerProps = new Properties() + producerProps.put(QuotaConfigOverride.ProducerOverride, producerQuota.toString) + updateQuotaOverride(producerClientId, producerProps) + + val consumerProps = new Properties() + consumerProps.put(QuotaConfigOverride.ConsumerOverride, consumerQuota.toString) + updateQuotaOverride(consumerClientId, consumerProps) + } + override def removeQuotaOverrides() { + val emptyProps = new Properties + updateQuotaOverride(producerClientId, emptyProps) + updateQuotaOverride(consumerClientId, emptyProps) + } + + private def updateQuotaOverride(clientId: String, properties: Properties) { + AdminUtils.changeClientIdConfig(zkUtils, clientId, properties) + } +}