This is an automated email from the ASF dual-hosted git repository. gwenshap pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new a99e011 KAFKA-7800; Dynamic log levels admin API (KIP-412) a99e011 is described below commit a99e0111114d1cb8c762494ac195cf84e6425bb3 Author: Stanislav Kozlovski <familyguyuser...@windowslive.com> AuthorDate: Fri Aug 2 11:51:35 2019 -0700 KAFKA-7800; Dynamic log levels admin API (KIP-412) <!-- Are there any breaking changes? If so, this is a major release, make sure '#major' is in at least one commit message to get CI to bump the major. This will prevent automatic downstream dependency bumping / consuming. For more information about semantic versioning see: https://semver.org/ Suggested PR template: Fill/delete/add sections as needed. Optionally delete any commented block. --> What ---- <!-- Briefly describe **what** you have changed and **why**. Optionally include implementation strategy. --> References ---------- [**KIP-412**](https://cwiki.apache.org/confluence/display/KAFKA/KIP-412%3A+Extend+Admin+API+to+support+dynamic+application+log+levels) [**KAFKA-7800**](https://issues.apache.org/jira/browse/KAFKA-7800) [**Discussion Thread**](http://mail-archives.apache.org/mod_mbox/kafka-dev/201901.mbox/%3CCANZZNGyeVw8q%3Dx9uOQS-18wL3FEmnOwpBnpJ9x3iMLdXY3gEug%40mail.gmail.com%3E) [**Vote Thread**](http://mail-archives.apache.org/mod_mbox/kafka-dev/201902.mbox/%3CCANZZNGzpTJg5YX1Gpe5S%3DHSr%3DXGvmxvYLTdA3jWq_qwH-UvorQ%40mail.gmail.com%3E) <!-- Copy&paste links: to Jira ticket, other PRs, issues, Slack conversations... For code bumps: link to PR, tag or GitHub `/compare/master...master` --> Test&Review ------------ Test cases covered: * DescribeConfigs * Alter the log level with and without validateOnly, validate the results with DescribeConfigs Open questions / Follow-ups -------------------------- If you're a reviewer, I'd appreciate your thoughts on these questions I have open: 1. Should we add synchronization to the Log4jController methods? - Seems like we don't get much value from it 2. 
Should we instantiate a new Log4jController instead of it having static methods? - All operations are stateless, so I thought static methods would do well 3. A logger which does not have a set value returns "null" (as seen in the unit tests). Should we just return the Root logger's level? Author: Stanislav Kozlovski <familyguyuser...@windowslive.com> Reviewers: Gwen Shapira Closes #6903 from stanislavkozlovski/KAFKA-7800-dynamic-log-levels-admin-ap --- .../apache/kafka/clients/admin/ConfigEntry.java | 1 + .../kafka/clients/admin/KafkaAdminClient.java | 17 +- .../apache/kafka/common/config/ConfigResource.java | 2 +- .../apache/kafka/common/config/LogLevelConfig.java | 71 +++++++ .../common/requests/DescribeConfigsResponse.java | 3 +- .../src/main/scala/kafka/admin/ConfigCommand.scala | 138 +++++++++---- .../src/main/scala/kafka/server/AdminManager.scala | 69 ++++++- core/src/main/scala/kafka/server/KafkaApis.scala | 8 +- .../main/scala/kafka/utils/Log4jController.scala | 92 ++++++--- .../kafka/api/AdminClientIntegrationTest.scala | 229 ++++++++++++++++++++- .../kafka/api/AuthorizerIntegrationTest.scala | 34 ++- .../scala/unit/kafka/admin/ConfigCommandTest.scala | 164 +++++++++++++-- 12 files changed, 719 insertions(+), 109 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java index 7775b6a..42cc627 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java @@ -189,6 +189,7 @@ public class ConfigEntry { */ public enum ConfigSource { DYNAMIC_TOPIC_CONFIG, // dynamic topic config that is configured for a specific topic + DYNAMIC_BROKER_LOGGER_CONFIG, // dynamic broker logger config that is configured for a specific broker DYNAMIC_BROKER_CONFIG, // dynamic broker config that is configured for a specific broker DYNAMIC_DEFAULT_BROKER_CONFIG, // dynamic broker 
config that is configured as default for all brokers in the cluster STATIC_BROKER_CONFIG, // static broker config provided as broker properties at start up (e.g. server.properties file) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 227a03b..8092eec 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -1762,7 +1762,7 @@ public class KafkaAdminClient extends AdminClient { final Collection<ConfigResource> unifiedRequestResources = new ArrayList<>(configResources.size()); for (ConfigResource resource : configResources) { - if (resource.type() == ConfigResource.Type.BROKER && !resource.isDefault()) { + if (dependsOnSpecificNode(resource)) { brokerFutures.put(resource, new KafkaFutureImpl<>()); brokerResources.add(resource); } else { @@ -1887,6 +1887,9 @@ public class KafkaAdminClient extends AdminClient { case STATIC_BROKER_CONFIG: configSource = ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG; break; + case DYNAMIC_BROKER_LOGGER_CONFIG: + configSource = ConfigEntry.ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG; + break; case DEFAULT_CONFIG: configSource = ConfigEntry.ConfigSource.DEFAULT_CONFIG; break; @@ -1906,7 +1909,7 @@ public class KafkaAdminClient extends AdminClient { final Collection<ConfigResource> unifiedRequestResources = new ArrayList<>(); for (ConfigResource resource : configs.keySet()) { - if (resource.type() == ConfigResource.Type.BROKER && !resource.isDefault()) { + if (dependsOnSpecificNode(resource)) { NodeProvider nodeProvider = new ConstantNodeIdProvider(Integer.parseInt(resource.name())); allFutures.putAll(alterConfigs(configs, options, Collections.singleton(resource), nodeProvider)); } else @@ -1971,7 +1974,7 @@ public class KafkaAdminClient extends AdminClient { final Collection<ConfigResource> 
unifiedRequestResources = new ArrayList<>(); for (ConfigResource resource : configs.keySet()) { - if (resource.type() == ConfigResource.Type.BROKER && !resource.isDefault()) { + if (dependsOnSpecificNode(resource)) { NodeProvider nodeProvider = new ConstantNodeIdProvider(Integer.parseInt(resource.name())); allFutures.putAll(incrementalAlterConfigs(configs, options, Collections.singleton(resource), nodeProvider)); } else @@ -3070,4 +3073,12 @@ public class KafkaAdminClient extends AdminClient { return new ElectLeadersResult(electionFuture); } + + /** + * Returns a boolean indicating whether the resource needs to go to a specific node + */ + private boolean dependsOnSpecificNode(ConfigResource resource) { + return (resource.type() == ConfigResource.Type.BROKER && !resource.isDefault()) + || resource.type() == ConfigResource.Type.BROKER_LOGGER; + } } diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java index 5343a6b..8870238 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java @@ -33,7 +33,7 @@ public final class ConfigResource { * Type of resource. 
*/ public enum Type { - BROKER((byte) 4), TOPIC((byte) 2), UNKNOWN((byte) 0); + BROKER_LOGGER((byte) 8), BROKER((byte) 4), TOPIC((byte) 2), UNKNOWN((byte) 0); private static final Map<Byte, Type> TYPES = Collections.unmodifiableMap( Arrays.stream(values()).collect(Collectors.toMap(Type::id, Function.identity())) diff --git a/clients/src/main/java/org/apache/kafka/common/config/LogLevelConfig.java b/clients/src/main/java/org/apache/kafka/common/config/LogLevelConfig.java new file mode 100644 index 0000000..fe7e2eb --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/config/LogLevelConfig.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.config; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +/** + * This class holds definitions for log level configurations related to Kafka's application logging. See KIP-412 for additional information + */ +public class LogLevelConfig { + /* + * NOTE: DO NOT CHANGE EITHER CONFIG NAMES AS THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE. + */ + + /** + * The <code>FATAL</code> level designates a very severe error + * that will lead the Kafka broker to abort. 
+ */ + public static final String FATAL_LOG_LEVEL = "FATAL"; + + /** + * The <code>ERROR</code> level designates error events that + * might still allow the broker to continue running. + */ + public static final String ERROR_LOG_LEVEL = "ERROR"; + + /** + * The <code>WARN</code> level designates potentially harmful situations. + */ + public static final String WARN_LOG_LEVEL = "WARN"; + + /** + * The <code>INFO</code> level designates informational messages + * that highlight normal Kafka events at a coarse-grained level + */ + public static final String INFO_LOG_LEVEL = "INFO"; + + /** + * The <code>DEBUG</code> level designates fine-grained + * informational events that are most useful to debug Kafka + */ + public static final String DEBUG_LOG_LEVEL = "DEBUG"; + + /** + * The <code>TRACE</code> level designates finer-grained + * informational events than the <code>DEBUG</code> level. + */ + public static final String TRACE_LOG_LEVEL = "TRACE"; + + public static final Set<String> VALID_LOG_LEVELS = new HashSet<>(Arrays.asList( + FATAL_LOG_LEVEL, ERROR_LOG_LEVEL, WARN_LOG_LEVEL, + INFO_LOG_LEVEL, DEBUG_LOG_LEVEL, TRACE_LOG_LEVEL + )); +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java index 51c35d5..6d424f2 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java @@ -179,7 +179,8 @@ public class DescribeConfigsResponse extends AbstractResponse { DYNAMIC_BROKER_CONFIG((byte) 2), DYNAMIC_DEFAULT_BROKER_CONFIG((byte) 3), STATIC_BROKER_CONFIG((byte) 4), - DEFAULT_CONFIG((byte) 5); + DEFAULT_CONFIG((byte) 5), + DYNAMIC_BROKER_LOGGER_CONFIG((byte) 6); final byte id; private static final ConfigSource[] VALUES = values(); diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala 
b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 7edc4a4..781cc1a 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -28,8 +28,8 @@ import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, PasswordEncod import kafka.utils.Implicits._ import kafka.zk.{AdminZkClient, KafkaZkClient} import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.clients.admin.{Admin, AlterConfigsOptions, ConfigEntry, DescribeConfigsOptions, AdminClient => JAdminClient, Config => JConfig} -import org.apache.kafka.common.config.ConfigResource +import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeConfigsOptions, AdminClient => JAdminClient, Config => JConfig} +import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig} import org.apache.kafka.common.config.types.Password import org.apache.kafka.common.errors.InvalidConfigurationException import org.apache.kafka.common.security.JaasUtils @@ -41,7 +41,6 @@ import scala.collection._ /** - * This script can be used to change configs for topics/clients/brokers dynamically * This script can be used to change configs for topics/clients/users/brokers dynamically * An entity described or altered by the command may be one of: * <ul> @@ -50,12 +49,15 @@ import scala.collection._ * <li> user: --entity-type users --entity-name <user-principal> * <li> <user, client>: --entity-type users --entity-name <user-principal> --entity-type clients --entity-name <client-id> * <li> broker: --entity-type brokers --entity-name <broker> + * <li> broker-logger: --entity-type broker-loggers --entity-name <broker> * </ul> * --entity-default may be used instead of --entity-name when describing or altering default configuration for users and clients. 
* */ object ConfigCommand extends Config { + val BrokerLoggerConfigType = "broker-loggers" + val BrokerSupportedConfigTypes = Seq(ConfigType.Broker, BrokerLoggerConfigType) val DefaultScramIterations = 4096 // Dynamic broker configs can only be updated using the new AdminClient once brokers have started // so that configs may be fully validated. Prior to starting brokers, updates may be performed using @@ -274,49 +276,61 @@ object ConfigCommand extends Config { val adminClient = JAdminClient.create(props) val entityName = if (opts.options.has(opts.entityName)) opts.options.valueOf(opts.entityName) - else if (opts.options.has(opts.entityDefault)) + else // default entity "" - else - throw new IllegalArgumentException("At least one of --entity-name or --entity-default must be specified with --bootstrap-server") val entityTypes = opts.options.valuesOf(opts.entityType).asScala if (entityTypes.size != 1) - throw new IllegalArgumentException("Exactly one --entity-type must be specified with --bootstrap-server") - if (entityTypes.head != ConfigType.Broker) - throw new IllegalArgumentException(s"--zookeeper option must be specified for entity-type $entityTypes") + throw new IllegalArgumentException(s"Exactly one --entity-type (out of ${BrokerSupportedConfigTypes.mkString(",")}) must be specified with --bootstrap-server") try { if (opts.options.has(opts.alterOpt)) - alterBrokerConfig(adminClient, opts, entityName) + alterBrokerConfig(adminClient, opts, entityTypes.head, entityName) else if (opts.options.has(opts.describeOpt)) - describeBrokerConfig(adminClient, opts, entityName) + describeBrokerConfig(adminClient, opts, entityTypes.head, entityName) } finally { adminClient.close() } } - private[admin] def alterBrokerConfig(adminClient: Admin, opts: ConfigCommandOptions, entityName: String) { + private[admin] def alterBrokerConfig(adminClient: Admin, opts: ConfigCommandOptions, + entityType: String, entityName: String) { val configsToBeAdded = 
parseConfigsToBeAdded(opts).asScala.map { case (k, v) => (k, new ConfigEntry(k, v)) } val configsToBeDeleted = parseConfigsToBeDeleted(opts) - // compile the final set of configs - val configResource = new ConfigResource(ConfigResource.Type.BROKER, entityName) - val oldConfig = brokerConfig(adminClient, entityName, includeSynonyms = false) + if (entityType == ConfigType.Broker) { + val configResource = new ConfigResource(ConfigResource.Type.BROKER, entityName) + val oldConfig = brokerConfig(adminClient, entityName, includeSynonyms = false) .map { entry => (entry.name, entry) }.toMap - // fail the command if any of the configs to be deleted does not exist - val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains) - if (invalidConfigs.nonEmpty) - throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}") - - val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted - val sensitiveEntries = newEntries.filter(_._2.value == null) - if (sensitiveEntries.nonEmpty) - throw new InvalidConfigurationException(s"All sensitive broker config entries must be specified for --alter, missing entries: ${sensitiveEntries.keySet}") - val newConfig = new JConfig(newEntries.asJava.values) - - val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false) - adminClient.alterConfigs(Map(configResource -> newConfig).asJava, alterOptions).all().get(60, TimeUnit.SECONDS) + // fail the command if any of the configs to be deleted does not exist + val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains) + if (invalidConfigs.nonEmpty) + throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}") + + val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted + val sensitiveEntries = newEntries.filter(_._2.value == null) + if (sensitiveEntries.nonEmpty) + throw new InvalidConfigurationException(s"All sensitive broker config entries must be specified for --alter, 
missing entries: ${sensitiveEntries.keySet}") + val newConfig = new JConfig(newEntries.asJava.values) + + val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false) + adminClient.alterConfigs(Map(configResource -> newConfig).asJava, alterOptions).all().get(60, TimeUnit.SECONDS) + } else if (entityType == BrokerLoggerConfigType) { + val configResource = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, entityName) + val validLoggers = brokerLoggerConfigs(adminClient, entityName).map(_.name) + // fail the command if any of the configured broker loggers do not exist + val invalidBrokerLoggers = configsToBeDeleted.filterNot(validLoggers.contains) ++ configsToBeAdded.keys.filterNot(validLoggers.contains) + if (invalidBrokerLoggers.nonEmpty) + throw new InvalidConfigurationException(s"Invalid broker logger(s): ${invalidBrokerLoggers.mkString(",")}") + + val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false) + val alterLogLevelEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET)) + ++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) } + ).asJavaCollection + + adminClient.incrementalAlterConfigs(Map(configResource -> alterLogLevelEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS) + } if (entityName.nonEmpty) println(s"Completed updating config for broker: $entityName.") @@ -324,8 +338,13 @@ object ConfigCommand extends Config { println(s"Completed updating default config for brokers in the cluster,") } - private def describeBrokerConfig(adminClient: Admin, opts: ConfigCommandOptions, entityName: String) { - val configs = brokerConfig(adminClient, entityName, includeSynonyms = true) + private def describeBrokerConfig(adminClient: Admin, opts: ConfigCommandOptions, + entityType: String, entityName: String) { + val configs = if (entityType == ConfigType.Broker) + brokerConfig(adminClient, entityName, includeSynonyms = true) + 
else // broker logger + brokerLoggerConfigs(adminClient, entityName) + if (entityName.nonEmpty) println(s"Configs for broker $entityName are:") else @@ -349,6 +368,15 @@ object ConfigCommand extends Config { .toSeq } + /** + * Returns all the valid broker logger configurations + */ + private def brokerLoggerConfigs(adminClient: Admin, entityName: String): Seq[ConfigEntry] = { + val configResource = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, entityName) + val configs = adminClient.describeConfigs(Collections.singleton(configResource)).all.get(30, TimeUnit.SECONDS) + configs.get(configResource).entries.asScala.toSeq + } + case class Entity(entityType: String, sanitizedName: Option[String]) { val entityPath = sanitizedName match { case Some(n) => entityType + "/" + n @@ -445,7 +473,7 @@ object ConfigCommand extends Config { if (opts.options.has(opts.alterOpt) && names.size != types.size) throw new IllegalArgumentException("--entity-name or --entity-default must be specified with each --entity-type for --alter") - val reverse = types.size == 2 && types(0) == ConfigType.Client + val reverse = types.size == 2 && types.head == ConfigType.Client val entityTypes = if (reverse) types.reverse else types val sortedNames = (if (reverse && names.length == 2) names.reverse else names).iterator @@ -483,7 +511,7 @@ object ConfigCommand extends Config { .ofType(classOf[String]) val alterOpt = parser.accepts("alter", "Alter the configuration for the entity.") val describeOpt = parser.accepts("describe", "List configs for the given entity.") - val entityType = parser.accepts("entity-type", "Type of entity (topics/clients/users/brokers)") + val entityType = parser.accepts("entity-type", "Type of entity (topics/clients/users/brokers/broker-loggers)") .withRequiredArg .ofType(classOf[String]) val entityName = parser.accepts("entity-name", "Name of entity (topic name/client id/user principal name/broker id)") @@ -514,36 +542,58 @@ object ConfigCommand extends Config { val 
actions = Seq(alterOpt, describeOpt).count(options.has _) if(actions != 1) CommandLineUtils.printUsageAndDie(parser, "Command must include exactly one action: --describe, --alter") - // check required args CommandLineUtils.checkInvalidArgs(parser, options, alterOpt, Set(describeOpt)) CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, Set(alterOpt, addConfig, deleteConfig)) + val entityTypeVals = options.valuesOf(entityType).asScala + val (allowedEntityTypes, connectOptString) = if (options.has(bootstrapServerOpt)) + (BrokerSupportedConfigTypes, "--bootstrap-server") + else + (ConfigType.all, "--zookeeper") + + entityTypeVals.foreach(entityTypeVal => + if (!allowedEntityTypes.contains(entityTypeVal)) + throw new IllegalArgumentException(s"Invalid entity-type $entityTypeVal, --entity-type must be one of ${allowedEntityTypes.mkString(",")} with the $connectOptString argument") + ) + if (entityTypeVals.isEmpty) + throw new IllegalArgumentException("At least one --entity-type must be specified") + else if (entityTypeVals.size > 1 && !entityTypeVals.toSet.equals(Set(ConfigType.User, ConfigType.Client))) + throw new IllegalArgumentException(s"Only '${ConfigType.User}' and '${ConfigType.Client}' entity types may be specified together") - if (options.has(bootstrapServerOpt) == options.has(zkConnectOpt)) + if (!options.has(bootstrapServerOpt) && !options.has(zkConnectOpt)) + throw new IllegalArgumentException("One of the required --bootstrap-server or --zookeeper arguments must be specified") + else if (options.has(bootstrapServerOpt) && options.has(zkConnectOpt)) throw new IllegalArgumentException("Only one of --bootstrap-server or --zookeeper must be specified") + else if (options.has(bootstrapServerOpt) && !options.has(entityName) && !options.has(entityDefault)) + throw new IllegalArgumentException(s"At least one of --entity-name or --entity-default must be specified with --bootstrap-server") + + if (options.has(entityName) && 
(entityTypeVals.contains(ConfigType.Broker) || entityTypeVals.contains(BrokerLoggerConfigType))) { + val brokerId = options.valueOf(entityName) + try brokerId.toInt catch { + case _: NumberFormatException => + throw new IllegalArgumentException(s"The entity name for ${entityTypeVals.head} must be a valid integer broker id , but it is: $brokerId") + } + } + if (entityTypeVals.contains(ConfigType.Client) || entityTypeVals.contains(ConfigType.Topic) || entityTypeVals.contains(ConfigType.User)) CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, entityType) - if(options.has(alterOpt)) { + + if (options.has(describeOpt) && entityTypeVals.contains(BrokerLoggerConfigType) && !options.has(entityName)) + throw new IllegalArgumentException(s"--entity-name must be specified with --describe of ${entityTypeVals.mkString(",")}") + + if (options.has(alterOpt)) { if (entityTypeVals.contains(ConfigType.User) || entityTypeVals.contains(ConfigType.Client) || entityTypeVals.contains(ConfigType.Broker)) { if (!options.has(entityName) && !options.has(entityDefault)) throw new IllegalArgumentException("--entity-name or --entity-default must be specified with --alter of users, clients or brokers") } else if (!options.has(entityName)) - throw new IllegalArgumentException(s"--entity-name must be specified with --alter of ${entityTypeVals}") + throw new IllegalArgumentException(s"--entity-name must be specified with --alter of ${entityTypeVals.mkString(",")}") val isAddConfigPresent: Boolean = options.has(addConfig) val isDeleteConfigPresent: Boolean = options.has(deleteConfig) if(! isAddConfigPresent && ! 
isDeleteConfigPresent) throw new IllegalArgumentException("At least one of --add-config or --delete-config must be specified with --alter") } - entityTypeVals.foreach(entityTypeVal => - if (!ConfigType.all.contains(entityTypeVal)) - throw new IllegalArgumentException(s"Invalid entity-type ${entityTypeVal}, --entity-type must be one of ${ConfigType.all}") - ) - if (entityTypeVals.isEmpty) - throw new IllegalArgumentException("At least one --entity-type must be specified") - else if (entityTypeVals.size > 1 && !entityTypeVals.toSet.equals(Set(ConfigType.User, ConfigType.Client))) - throw new IllegalArgumentException(s"Only '${ConfigType.User}' and '${ConfigType.Client}' entity types may be specified together") } } } diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala index 1ed55fb..1daaeec 100644 --- a/core/src/main/scala/kafka/server/AdminManager.scala +++ b/core/src/main/scala/kafka/server/AdminManager.scala @@ -21,13 +21,14 @@ import java.util.{Collections, Properties} import kafka.admin.{AdminOperationException, AdminUtils} import kafka.common.TopicAlreadyMarkedForDeletionException import kafka.log.LogConfig +import kafka.utils.Log4jController import kafka.metrics.KafkaMetricsGroup import kafka.utils._ import kafka.zk.{AdminZkClient, KafkaZkClient} import org.apache.kafka.clients.admin.AlterConfigOp import org.apache.kafka.clients.admin.AlterConfigOp.OpType import org.apache.kafka.common.config.ConfigDef.ConfigKey -import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource} +import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource, LogLevelConfig} import org.apache.kafka.common.errors.{ApiException, InvalidConfigurationException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, UnknownTopicOrPartitionException} import 
org.apache.kafka.common.internals.Topic import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic @@ -347,8 +348,16 @@ class AdminManager(val config: KafkaConfig, createResponseConfig(allConfigs(config), createBrokerConfigEntry(perBrokerConfig = true, includeSynonyms)) else - throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId} or empty string, but received $resource.name") + throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId} or empty string, but received ${resource.name}") + case ConfigResource.Type.BROKER_LOGGER => + if (resource.name == null || resource.name.isEmpty) + throw new InvalidRequestException("Broker id must not be empty") + else if (resourceNameToBrokerId(resource.name) != config.brokerId) + throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId} but received ${resource.name}") + else + createResponseConfig(Log4jController.loggers, + (name, value) => new DescribeConfigsResponse.ConfigEntry(name, value.toString, ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG, false, false, List.empty.asJava)) case resourceType => throw new InvalidRequestException(s"Unsupported resource type: $resourceType") } resource -> resourceConfig @@ -428,13 +437,24 @@ class AdminManager(val config: KafkaConfig, resource -> ApiError.NONE } + private def alterLogLevelConfigs(alterConfigOps: List[AlterConfigOp]): Unit = { + alterConfigOps.foreach { alterConfigOp => + val loggerName = alterConfigOp.configEntry().name() + val logLevel = alterConfigOp.configEntry().value() + alterConfigOp.opType() match { + case OpType.SET => Log4jController.logLevel(loggerName, logLevel) + case OpType.DELETE => Log4jController.unsetLogLevel(loggerName) + } + } + } + private def getBrokerId(resource: ConfigResource) = { if (resource.name == null || resource.name.isEmpty) None else { val id = resourceNameToBrokerId(resource.name) if (id != this.config.brokerId) - throw new 
InvalidRequestException(s"Unexpected broker id, expected ${this.config.brokerId}, but received $resource.name") + throw new InvalidRequestException(s"Unexpected broker id, expected ${this.config.brokerId}, but received ${resource.name}") Some(id) } } @@ -451,7 +471,7 @@ class AdminManager(val config: KafkaConfig, def incrementalAlterConfigs(configs: Map[ConfigResource, List[AlterConfigOp]], validateOnly: Boolean): Map[ConfigResource, ApiError] = { configs.map { case (resource, alterConfigOps) => try { - //throw InvalidRequestException if any duplicate keys + // throw InvalidRequestException if any duplicate keys val duplicateKeys = alterConfigOps.groupBy(config => config.configEntry().name()) .mapValues(_.size).filter(_._2 > 1).keys.toSet if (duplicateKeys.nonEmpty) @@ -475,6 +495,14 @@ class AdminManager(val config: KafkaConfig, val configProps = this.config.dynamicConfig.fromPersistentProps(persistentProps, perBrokerConfig) prepareIncrementalConfigs(alterConfigOps, configProps, KafkaConfig.configKeys) alterBrokerConfigs(resource, validateOnly, configProps, configEntriesMap) + + case ConfigResource.Type.BROKER_LOGGER => + getBrokerId(resource) + validateLogLevelConfigs(alterConfigOps) + + if (!validateOnly) + alterLogLevelConfigs(alterConfigOps) + resource -> ApiError.NONE case resourceType => throw new InvalidRequestException(s"AlterConfigs is only supported for topics and brokers, but resource type is $resourceType") } @@ -495,6 +523,35 @@ class AdminManager(val config: KafkaConfig, }.toMap } + private def validateLogLevelConfigs(alterConfigOps: List[AlterConfigOp]): Unit = { + def validateLoggerNameExists(loggerName: String): Unit = { + if (!Log4jController.loggerExists(loggerName)) + throw new ConfigException(s"Logger $loggerName does not exist!") + } + + alterConfigOps.foreach { alterConfigOp => + val loggerName = alterConfigOp.configEntry().name() + alterConfigOp.opType() match { + case OpType.SET => + validateLoggerNameExists(loggerName) + val logLevel = 
alterConfigOp.configEntry().value() + if (!LogLevelConfig.VALID_LOG_LEVELS.contains(logLevel)) { + val validLevelsStr = LogLevelConfig.VALID_LOG_LEVELS.asScala.mkString(", ") + throw new ConfigException( + s"Cannot set the log level of $loggerName to $logLevel as it is not a supported log level. " + + s"Valid log levels are $validLevelsStr" + ) + } + case OpType.DELETE => + validateLoggerNameExists(loggerName) + if (loggerName == Log4jController.ROOT_LOGGER) + throw new InvalidRequestException(s"Removing the log level of the ${Log4jController.ROOT_LOGGER} logger is not allowed") + case OpType.APPEND => throw new InvalidRequestException(s"${OpType.APPEND} operation is not allowed for the ${ConfigResource.Type.BROKER_LOGGER} resource") + case OpType.SUBTRACT => throw new InvalidRequestException(s"${OpType.SUBTRACT} operation is not allowed for the ${ConfigResource.Type.BROKER_LOGGER} resource") + } + } + } + private def prepareIncrementalConfigs(alterConfigOps: List[AlterConfigOp], configProps: Properties, configKeys: Map[String, ConfigKey]): Unit = { def listType(configName: String, configKeys: Map[String, ConfigKey]): Boolean = { @@ -512,14 +569,14 @@ class AdminManager(val config: KafkaConfig, if (!listType(alterConfigOp.configEntry().name(), configKeys)) throw new InvalidRequestException(s"Config value append is not allowed for config key: ${alterConfigOp.configEntry().name()}") val oldValueList = configProps.getProperty(alterConfigOp.configEntry().name()).split(",").toList - val newValueList = oldValueList ::: alterConfigOp.configEntry().value().split(",").toList + val newValueList = oldValueList ::: alterConfigOp.configEntry().value().split(",").toList configProps.setProperty(alterConfigOp.configEntry().name(), newValueList.mkString(",")) } case OpType.SUBTRACT => { if (!listType(alterConfigOp.configEntry().name(), configKeys)) throw new InvalidRequestException(s"Config value subtract is not allowed for config key: ${alterConfigOp.configEntry().name()}") val 
oldValueList = configProps.getProperty(alterConfigOp.configEntry().name()).split(",").toList - val newValueList = oldValueList.diff(alterConfigOp.configEntry().value().split(",").toList) + val newValueList = oldValueList.diff(alterConfigOp.configEntry().value().split(",").toList) configProps.setProperty(alterConfigOp.configEntry().name(), newValueList.mkString(",")) } } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 3ec6b23..a88cd92 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -2288,6 +2288,8 @@ class KafkaApis(val requestChannel: RequestChannel, val alterConfigsRequest = request.body[AlterConfigsRequest] val (authorizedResources, unauthorizedResources) = alterConfigsRequest.configs.asScala.partition { case (resource, _) => resource.`type` match { + case ConfigResource.Type.BROKER_LOGGER => + throw new InvalidRequestException(s"AlterConfigs is deprecated and does not support the resource type ${ConfigResource.Type.BROKER_LOGGER}") case ConfigResource.Type.BROKER => authorize(request.session, AlterConfigs, Resource.ClusterResource) case ConfigResource.Type.TOPIC => @@ -2331,7 +2333,7 @@ class KafkaApis(val requestChannel: RequestChannel, private def configsAuthorizationApiError(session: RequestChannel.Session, resource: ConfigResource): ApiError = { val error = resource.`type` match { - case ConfigResource.Type.BROKER => Errors.CLUSTER_AUTHORIZATION_FAILED + case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER => Errors.CLUSTER_AUTHORIZATION_FAILED case ConfigResource.Type.TOPIC => Errors.TOPIC_AUTHORIZATION_FAILED case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.name}") } @@ -2349,7 +2351,7 @@ class KafkaApis(val requestChannel: RequestChannel, val (authorizedResources, unauthorizedResources) = configs.partition { case (resource, _) => resource.`type` 
match { - case ConfigResource.Type.BROKER => + case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER => authorize(request.session, AlterConfigs, Resource.ClusterResource) case ConfigResource.Type.TOPIC => authorize(request.session, AlterConfigs, Resource(Topic, resource.name, LITERAL)) @@ -2370,7 +2372,7 @@ class KafkaApis(val requestChannel: RequestChannel, val describeConfigsRequest = request.body[DescribeConfigsRequest] val (authorizedResources, unauthorizedResources) = describeConfigsRequest.resources.asScala.partition { resource => resource.`type` match { - case ConfigResource.Type.BROKER => authorize(request.session, DescribeConfigs, Resource.ClusterResource) + case ConfigResource.Type.BROKER | ConfigResource.Type.BROKER_LOGGER => authorize(request.session, DescribeConfigs, Resource.ClusterResource) case ConfigResource.Type.TOPIC => authorize(request.session, DescribeConfigs, Resource(Topic, resource.name, LITERAL)) case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.name}") diff --git a/core/src/main/scala/kafka/utils/Log4jController.scala b/core/src/main/scala/kafka/utils/Log4jController.scala index 95d0733..ba0649c 100755 --- a/core/src/main/scala/kafka/utils/Log4jController.scala +++ b/core/src/main/scala/kafka/utils/Log4jController.scala @@ -22,69 +22,95 @@ import java.util.Locale import org.apache.log4j.{Level, LogManager, Logger} +import scala.collection.mutable +import scala.collection.JavaConverters._ -/** - * An MBean that allows the user to dynamically alter log4j levels at runtime. - * The companion object contains the singleton instance of this class and - * registers the MBean. The [[kafka.utils.Logging]] trait forces initialization - * of the companion object. 
- */ -private class Log4jController extends Log4jControllerMBean { - def getLoggers = { - val lst = new util.ArrayList[String]() - lst.add("root=" + existingLogger("root").getLevel.toString) +object Log4jController { + val ROOT_LOGGER = "root" + + /** + * Returns a map of the log4j loggers and their assigned log level. + * If a logger does not have a log level assigned, we return the root logger's log level + */ + def loggers: mutable.Map[String, String] = { + val logs = new mutable.HashMap[String, String]() + val rootLoggerLvl = existingLogger(ROOT_LOGGER).getLevel.toString + logs.put(ROOT_LOGGER, rootLoggerLvl) + val loggers = LogManager.getCurrentLoggers while (loggers.hasMoreElements) { val logger = loggers.nextElement().asInstanceOf[Logger] if (logger != null) { - val level = if (logger != null) logger.getLevel else null - lst.add("%s=%s".format(logger.getName, if (level != null) level.toString else "null")) + val level = if (logger.getLevel != null) logger.getLevel.toString else rootLoggerLvl + logs.put(logger.getName, level) } } - lst + logs } + /** + * Sets the log level of a particular logger + */ + def logLevel(loggerName: String, logLevel: String): Boolean = { + val log = existingLogger(loggerName) + if (!loggerName.trim.isEmpty && !logLevel.trim.isEmpty && log != null) { + log.setLevel(Level.toLevel(logLevel.toUpperCase(Locale.ROOT))) + true + } + else false + } - private def newLogger(loggerName: String) = - if (loggerName == "root") - LogManager.getRootLogger - else LogManager.getLogger(loggerName) + def unsetLogLevel(loggerName: String): Boolean = { + val log = existingLogger(loggerName) + if (!loggerName.trim.isEmpty && log != null) { + log.setLevel(null) + true + } + else false + } + def loggerExists(loggerName: String): Boolean = existingLogger(loggerName) != null private def existingLogger(loggerName: String) = - if (loggerName == "root") + if (loggerName == ROOT_LOGGER) LogManager.getRootLogger else LogManager.exists(loggerName) +} +/** + * An 
MBean that allows the user to dynamically alter log4j levels at runtime. + * The companion object contains the singleton instance of this class and + * registers the MBean. The [[kafka.utils.Logging]] trait forces initialization + * of the companion object. + */ +class Log4jController extends Log4jControllerMBean { - def getLogLevel(loggerName: String) = { - val log = existingLogger(loggerName) + def getLoggers: util.List[String] = { + Log4jController.loggers.map { + case (logger, level) => s"$logger=$level" + }.toList.asJava + } + + + def getLogLevel(loggerName: String): String = { + val log = Log4jController.existingLogger(loggerName) if (log != null) { val level = log.getLevel if (level != null) log.getLevel.toString - else "Null log level." + else + Log4jController.existingLogger(Log4jController.ROOT_LOGGER).getLevel.toString } else "No such logger." } - - def setLogLevel(loggerName: String, level: String) = { - val log = newLogger(loggerName) - if (!loggerName.trim.isEmpty && !level.trim.isEmpty && log != null) { - log.setLevel(Level.toLevel(level.toUpperCase(Locale.ROOT))) - true - } - else false - } - + def setLogLevel(loggerName: String, level: String): Boolean = Log4jController.logLevel(loggerName, level) } -private trait Log4jControllerMBean { +trait Log4jControllerMBean { def getLoggers: java.util.List[String] def getLogLevel(logger: String): String def setLogLevel(logger: String, level: String): Boolean } - diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala index 7f04de1..ff8e379 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala @@ -24,12 +24,13 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit} import java.util.{Collections, 
Properties} import java.{time, util} + import kafka.log.LogConfig import kafka.security.auth.{Cluster, Group, Topic} import kafka.server.{Defaults, KafkaConfig, KafkaServer} import kafka.utils.Implicits._ import kafka.utils.TestUtils._ -import kafka.utils.{Logging, TestUtils} +import kafka.utils.{Log4jController, Logging, TestUtils} import kafka.zk.KafkaZkClient import org.apache.kafka.clients.admin._ import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} @@ -40,7 +41,7 @@ import org.apache.kafka.common.ElectionType import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.TopicPartitionReplica import org.apache.kafka.common.acl._ -import org.apache.kafka.common.config.ConfigResource +import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig} import org.apache.kafka.common.errors._ import org.apache.kafka.common.requests.{DeleteRecordsRequest, MetadataResponse} import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceType} @@ -49,6 +50,7 @@ import org.junit.Assert._ import org.junit.rules.Timeout import org.junit.{After, Before, Rule, Test} import org.scalatest.Assertions.intercept + import scala.collection.JavaConverters._ import scala.collection.Seq import scala.compat.java8.OptionConverters._ @@ -68,6 +70,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { def globalTimeout = Timeout.millis(120000) var client: Admin = null + var brokerLoggerConfigResource: ConfigResource = null + var changedBrokerLoggers = scala.collection.mutable.Set[String]() val topic = "topic" val partition = 0 @@ -77,10 +81,12 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { override def setUp(): Unit = { super.setUp TestUtils.waitUntilBrokerMetadataIsPropagated(servers) + brokerLoggerConfigResource = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, servers.head.config.brokerId.toString) } @After override def tearDown(): Unit = { + 
teardownBrokerLoggers() if (client != null) Utils.closeQuietly(client, "AdminClient") super.tearDown() @@ -1819,6 +1825,225 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { classOf[InvalidTopicException]) client.close() } + + @Test + def testDescribeConfigsForLog4jLogLevels(): Unit = { + client = AdminClient.create(createConfig()) + + val loggerConfig = describeBrokerLoggers() + val rootLogLevel = loggerConfig.get(Log4jController.ROOT_LOGGER).value() + val logCleanerLogLevelConfig = loggerConfig.get("kafka.cluster.Replica") + assertEquals(rootLogLevel, logCleanerLogLevelConfig.value()) // we expect an undefined log level to be the same as the root logger + assertEquals("kafka.cluster.Replica", logCleanerLogLevelConfig.name()) + assertEquals(ConfigEntry.ConfigSource.DYNAMIC_BROKER_LOGGER_CONFIG, logCleanerLogLevelConfig.source()) + assertEquals(false, logCleanerLogLevelConfig.isReadOnly) + assertEquals(false, logCleanerLogLevelConfig.isSensitive) + assertTrue(logCleanerLogLevelConfig.synonyms().isEmpty) + } + + @Test + def testIncrementalAlterConfigsForLog4jLogLevels(): Unit = { + client = AdminClient.create(createConfig()) + + val initialLoggerConfig = describeBrokerLoggers() + val initialRootLogLevel = initialLoggerConfig.get(Log4jController.ROOT_LOGGER).value() + assertEquals(initialRootLogLevel, initialLoggerConfig.get("kafka.controller.KafkaController").value()) + assertEquals(initialRootLogLevel, initialLoggerConfig.get("kafka.log.LogCleaner").value()) + assertEquals(initialRootLogLevel, initialLoggerConfig.get("kafka.server.ReplicaManager").value()) + + val newRootLogLevel = LogLevelConfig.DEBUG_LOG_LEVEL + val alterRootLoggerEntry = Seq( + new AlterConfigOp(new ConfigEntry(Log4jController.ROOT_LOGGER, newRootLogLevel), AlterConfigOp.OpType.SET) + ).asJavaCollection + // Test validateOnly does not change anything + alterBrokerLoggers(alterRootLoggerEntry, validateOnly = true) + val validatedLoggerConfig = 
describeBrokerLoggers() + assertEquals(initialRootLogLevel, validatedLoggerConfig.get(Log4jController.ROOT_LOGGER).value()) + assertEquals(initialRootLogLevel, validatedLoggerConfig.get("kafka.controller.KafkaController").value()) + assertEquals(initialRootLogLevel, validatedLoggerConfig.get("kafka.log.LogCleaner").value()) + assertEquals(initialRootLogLevel, validatedLoggerConfig.get("kafka.server.ReplicaManager").value()) + assertEquals(initialRootLogLevel, validatedLoggerConfig.get("kafka.zookeeper.ZooKeeperClient").value()) + + // test that we can change them and unset loggers still use the root's log level + alterBrokerLoggers(alterRootLoggerEntry) + val changedRootLoggerConfig = describeBrokerLoggers() + assertEquals(newRootLogLevel, changedRootLoggerConfig.get(Log4jController.ROOT_LOGGER).value()) + assertEquals(newRootLogLevel, changedRootLoggerConfig.get("kafka.controller.KafkaController").value()) + assertEquals(newRootLogLevel, changedRootLoggerConfig.get("kafka.log.LogCleaner").value()) + assertEquals(newRootLogLevel, changedRootLoggerConfig.get("kafka.server.ReplicaManager").value()) + assertEquals(newRootLogLevel, changedRootLoggerConfig.get("kafka.zookeeper.ZooKeeperClient").value()) + + // alter the ZK client's logger so we can later test resetting it + val alterZKLoggerEntry = Seq( + new AlterConfigOp(new ConfigEntry("kafka.zookeeper.ZooKeeperClient", LogLevelConfig.ERROR_LOG_LEVEL), AlterConfigOp.OpType.SET) + ).asJavaCollection + alterBrokerLoggers(alterZKLoggerEntry) + val changedZKLoggerConfig = describeBrokerLoggers() + assertEquals(LogLevelConfig.ERROR_LOG_LEVEL, changedZKLoggerConfig.get("kafka.zookeeper.ZooKeeperClient").value()) + + // properly test various set operations and one delete + val alterLogLevelsEntries = Seq( + new AlterConfigOp(new ConfigEntry("kafka.controller.KafkaController", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET), + new AlterConfigOp(new ConfigEntry("kafka.log.LogCleaner", 
LogLevelConfig.ERROR_LOG_LEVEL), AlterConfigOp.OpType.SET), + new AlterConfigOp(new ConfigEntry("kafka.server.ReplicaManager", LogLevelConfig.TRACE_LOG_LEVEL), AlterConfigOp.OpType.SET), + new AlterConfigOp(new ConfigEntry("kafka.zookeeper.ZooKeeperClient", ""), AlterConfigOp.OpType.DELETE) // should reset to the root logger level + ).asJavaCollection + alterBrokerLoggers(alterLogLevelsEntries) + val alteredLoggerConfig = describeBrokerLoggers() + assertEquals(newRootLogLevel, alteredLoggerConfig.get(Log4jController.ROOT_LOGGER).value()) + assertEquals(LogLevelConfig.INFO_LOG_LEVEL, alteredLoggerConfig.get("kafka.controller.KafkaController").value()) + assertEquals(LogLevelConfig.ERROR_LOG_LEVEL, alteredLoggerConfig.get("kafka.log.LogCleaner").value()) + assertEquals(LogLevelConfig.TRACE_LOG_LEVEL, alteredLoggerConfig.get("kafka.server.ReplicaManager").value()) + assertEquals(newRootLogLevel, alteredLoggerConfig.get("kafka.zookeeper.ZooKeeperClient").value()) + } + + /** + * 1. Assume ROOT logger == TRACE + * 2. Change kafka.controller.KafkaController logger to INFO + * 3. Unset kafka.controller.KafkaController via AlterConfigOp.OpType.DELETE (resets it to the root logger - TRACE) + * 4. Change ROOT logger to ERROR + * 5. 
Ensure the kafka.controller.KafkaController logger's level is ERROR (the current root logger level) + */ + @Test + def testIncrementalAlterConfigsForLog4jLogLevelsCanResetLoggerToCurrentRoot(): Unit = { + client = AdminClient.create(createConfig()) + // step 1 - configure root logger + val initialRootLogLevel = LogLevelConfig.TRACE_LOG_LEVEL + val alterRootLoggerEntry = Seq( + new AlterConfigOp(new ConfigEntry(Log4jController.ROOT_LOGGER, initialRootLogLevel), AlterConfigOp.OpType.SET) + ).asJavaCollection + alterBrokerLoggers(alterRootLoggerEntry) + val initialLoggerConfig = describeBrokerLoggers() + assertEquals(initialRootLogLevel, initialLoggerConfig.get(Log4jController.ROOT_LOGGER).value()) + assertEquals(initialRootLogLevel, initialLoggerConfig.get("kafka.controller.KafkaController").value()) + + // step 2 - change KafkaController logger to INFO + val alterControllerLoggerEntry = Seq( + new AlterConfigOp(new ConfigEntry("kafka.controller.KafkaController", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET) + ).asJavaCollection + alterBrokerLoggers(alterControllerLoggerEntry) + val changedControllerLoggerConfig = describeBrokerLoggers() + assertEquals(initialRootLogLevel, changedControllerLoggerConfig.get(Log4jController.ROOT_LOGGER).value()) + assertEquals(LogLevelConfig.INFO_LOG_LEVEL, changedControllerLoggerConfig.get("kafka.controller.KafkaController").value()) + + // step 3 - unset KafkaController logger + val deleteControllerLoggerEntry = Seq( + new AlterConfigOp(new ConfigEntry("kafka.controller.KafkaController", ""), AlterConfigOp.OpType.DELETE) + ).asJavaCollection + alterBrokerLoggers(deleteControllerLoggerEntry) + val deletedControllerLoggerConfig = describeBrokerLoggers() + assertEquals(initialRootLogLevel, deletedControllerLoggerConfig.get(Log4jController.ROOT_LOGGER).value()) + assertEquals(initialRootLogLevel, deletedControllerLoggerConfig.get("kafka.controller.KafkaController").value()) + + val newRootLogLevel = 
LogLevelConfig.ERROR_LOG_LEVEL + val newAlterRootLoggerEntry = Seq( + new AlterConfigOp(new ConfigEntry(Log4jController.ROOT_LOGGER, newRootLogLevel), AlterConfigOp.OpType.SET) + ).asJavaCollection + alterBrokerLoggers(newAlterRootLoggerEntry) + val newRootLoggerConfig = describeBrokerLoggers() + assertEquals(newRootLogLevel, newRootLoggerConfig.get(Log4jController.ROOT_LOGGER).value()) + assertEquals(newRootLogLevel, newRootLoggerConfig.get("kafka.controller.KafkaController").value()) + } + + @Test + def testIncrementalAlterConfigsForLog4jLogLevelsCannotResetRootLogger(): Unit = { + client = AdminClient.create(createConfig()) + val deleteRootLoggerEntry = Seq( + new AlterConfigOp(new ConfigEntry(Log4jController.ROOT_LOGGER, ""), AlterConfigOp.OpType.DELETE) + ).asJavaCollection + + assertTrue(intercept[ExecutionException](alterBrokerLoggers(deleteRootLoggerEntry)).getCause.isInstanceOf[InvalidRequestException]) + } + + @Test + def testIncrementalAlterConfigsForLog4jLogLevelsDoesNotWorkWithInvalidConfigs(): Unit = { + client = AdminClient.create(createConfig()) + val validLoggerName = "kafka.server.KafkaRequestHandler" + val expectedValidLoggerLogLevel = describeBrokerLoggers().get(validLoggerName) + def assertLogLevelDidNotChange(): Unit = { + assertEquals( + expectedValidLoggerLogLevel, + describeBrokerLoggers().get(validLoggerName) + ) + } + + val appendLogLevelEntries = Seq( + new AlterConfigOp(new ConfigEntry("kafka.server.KafkaRequestHandler", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET), // valid + new AlterConfigOp(new ConfigEntry("kafka.network.SocketServer", LogLevelConfig.ERROR_LOG_LEVEL), AlterConfigOp.OpType.APPEND) // append is not supported + ).asJavaCollection + assertTrue(intercept[ExecutionException](alterBrokerLoggers(appendLogLevelEntries)).getCause.isInstanceOf[InvalidRequestException]) + assertLogLevelDidNotChange() + + val subtractLogLevelEntries = Seq( + new AlterConfigOp(new ConfigEntry("kafka.server.KafkaRequestHandler", 
LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET), // valid + new AlterConfigOp(new ConfigEntry("kafka.network.SocketServer", LogLevelConfig.ERROR_LOG_LEVEL), AlterConfigOp.OpType.SUBTRACT) // subtract is not supported + ).asJavaCollection + assertTrue(intercept[ExecutionException](alterBrokerLoggers(subtractLogLevelEntries)).getCause.isInstanceOf[InvalidRequestException]) + assertLogLevelDidNotChange() + + val invalidLogLevelLogLevelEntries = Seq( + new AlterConfigOp(new ConfigEntry("kafka.server.KafkaRequestHandler", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET), // valid + new AlterConfigOp(new ConfigEntry("kafka.network.SocketServer", "OFF"), AlterConfigOp.OpType.SET) // OFF is not a valid log level + ).asJavaCollection + assertTrue(intercept[ExecutionException](alterBrokerLoggers(invalidLogLevelLogLevelEntries)).getCause.isInstanceOf[InvalidRequestException]) + assertLogLevelDidNotChange() + + val invalidLoggerNameLogLevelEntries = Seq( + new AlterConfigOp(new ConfigEntry("kafka.server.KafkaRequestHandler", LogLevelConfig.INFO_LOG_LEVEL), AlterConfigOp.OpType.SET), // valid + new AlterConfigOp(new ConfigEntry("Some Other LogCleaner", LogLevelConfig.ERROR_LOG_LEVEL), AlterConfigOp.OpType.SET) // invalid logger name is not supported + ).asJavaCollection + assertTrue(intercept[ExecutionException](alterBrokerLoggers(invalidLoggerNameLogLevelEntries)).getCause.isInstanceOf[InvalidRequestException]) + assertLogLevelDidNotChange() + } + + /** + * The AlterConfigs API is deprecated and should not support altering log levels + */ + @Test + def testAlterConfigsForLog4jLogLevelsDoesNotWork(): Unit = { + client = AdminClient.create(createConfig()) + + val alterLogLevelsEntries = Seq( + new ConfigEntry("kafka.controller.KafkaController", LogLevelConfig.INFO_LOG_LEVEL) + ).asJavaCollection + val alterResult = client.alterConfigs(Map(brokerLoggerConfigResource -> new Config(alterLogLevelsEntries)).asJava) + 
assertTrue(intercept[ExecutionException](alterResult.values.get(brokerLoggerConfigResource).get).getCause.isInstanceOf[InvalidRequestException]) + } + + def alterBrokerLoggers(entries: util.Collection[AlterConfigOp], validateOnly: Boolean = false): Unit = { + if (!validateOnly) { + for (entry <- entries.asScala) + changedBrokerLoggers.add(entry.configEntry().name()) + } + + client.incrementalAlterConfigs(Map(brokerLoggerConfigResource -> entries).asJava, new AlterConfigsOptions().validateOnly(validateOnly)) + .values.get(brokerLoggerConfigResource).get() + } + + def describeBrokerLoggers(): Config = + client.describeConfigs(Collections.singletonList(brokerLoggerConfigResource)).values.get(brokerLoggerConfigResource).get() + + /** + * Due to the fact that log4j is not re-initialized across tests, changing a logger's log level persists across test classes. + * We need to clean up the changes done while testing. + */ + def teardownBrokerLoggers(): Unit = { + if (changedBrokerLoggers.nonEmpty) { + val validLoggers = describeBrokerLoggers().entries().asScala.filterNot(_.name().equals(Log4jController.ROOT_LOGGER)).map(_.name).toSet + val unsetBrokerLoggersEntries = changedBrokerLoggers + .intersect(validLoggers) + .map { logger => new AlterConfigOp(new ConfigEntry(logger, ""), AlterConfigOp.OpType.DELETE) } + .asJavaCollection + + // ensure that we first reset the root logger to an arbitrary log level. 
Note that we cannot reset it to its original value + alterBrokerLoggers(List( + new AlterConfigOp(new ConfigEntry(Log4jController.ROOT_LOGGER, LogLevelConfig.FATAL_LOG_LEVEL), AlterConfigOp.OpType.SET) + ).asJavaCollection) + alterBrokerLoggers(unsetBrokerLoggersEntries) + + changedBrokerLoggers.clear() + } + } } object AdminClientIntegrationTest { diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 8f3f24f..387a7f9 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -31,7 +31,7 @@ import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener import org.apache.kafka.clients.producer._ import org.apache.kafka.common.ElectionType import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType} -import org.apache.kafka.common.config.ConfigResource +import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig} import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME import org.apache.kafka.common.message.ControlledShutdownRequestData @@ -101,6 +101,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val clusterCreateAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create))) val clusterAlterAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Alter))) val clusterDescribeAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Describe))) + val clusterAlterConfigsAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, AlterConfigs))) val clusterIdempotentWriteAcl = Map(Resource.ClusterResource -> Set(new Acl(userPrincipal, 
Allow, Acl.WildCardHost, IdempotentWrite))) val topicCreateAcl = Map(createTopicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Create))) val topicReadAcl = Map(topicResource -> Set(new Acl(userPrincipal, Allow, Acl.WildCardHost, Read))) @@ -215,8 +216,13 @@ class AuthorizerIntegrationTest extends BaseRequestTest { if (resp.logDirInfos.size() > 0) resp.logDirInfos.asScala.head._2.error else Errors.CLUSTER_AUTHORIZATION_FAILED), ApiKeys.CREATE_PARTITIONS -> ((resp: CreatePartitionsResponse) => resp.errors.asScala.find(_._1 == topic).get._2.error), ApiKeys.ELECT_LEADERS -> ((resp: ElectLeadersResponse) => Errors.forCode(resp.data().errorCode())), - ApiKeys.INCREMENTAL_ALTER_CONFIGS -> ((resp: IncrementalAlterConfigsResponse) => - IncrementalAlterConfigsResponse.fromResponseData(resp.data()).get(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic)).error), + ApiKeys.INCREMENTAL_ALTER_CONFIGS -> ((resp: IncrementalAlterConfigsResponse) => { + val topicResourceError = IncrementalAlterConfigsResponse.fromResponseData(resp.data()).get(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic)) + if (topicResourceError == null) + IncrementalAlterConfigsResponse.fromResponseData(resp.data()).get(new ConfigResource(ConfigResource.Type.BROKER_LOGGER, brokerId.toString)).error + else + topicResourceError.error() + }), ApiKeys.ALTER_PARTITION_REASSIGNMENTS -> ((resp: AlterPartitionReassignmentsResponse) => Errors.forCode(resp.data().errorCode())), ApiKeys.LIST_PARTITION_REASSIGNMENTS -> ((resp: ListPartitionReassignmentsResponse) => Errors.forCode(resp.data().errorCode())) ) @@ -651,6 +657,28 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } @Test + def testIncrementalAlterConfigsRequestRequiresClusterPermissionForBrokerLogger(): Unit = { + val data = new IncrementalAlterConfigsRequestData + val alterableConfig = new AlterableConfig().setName("kafka.controller.KafkaController"). 
+ setValue(LogLevelConfig.DEBUG_LOG_LEVEL).setConfigOperation(AlterConfigOp.OpType.DELETE.id()) + val alterableConfigSet = new AlterableConfigCollection + alterableConfigSet.add(alterableConfig) + data.resources().add(new AlterConfigsResource(). + setResourceName(brokerId.toString).setResourceType(ConfigResource.Type.BROKER_LOGGER.id()). + setConfigs(alterableConfigSet)) + val key = ApiKeys.INCREMENTAL_ALTER_CONFIGS + val request = new IncrementalAlterConfigsRequest.Builder(data).build() + + removeAllAcls() + val resources = Set(topicResource.resourceType, Resource.ClusterResource.resourceType) + sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = false) + + val clusterAcls = clusterAlterConfigsAcl(Resource.ClusterResource) + addAndVerifyAcls(clusterAcls, Resource.ClusterResource) + sendRequestAndVerifyResponseError(key, request, resources, isAuthorized = true) + } + + @Test def testOffsetsForLeaderEpochClusterPermission(): Unit = { val key = ApiKeys.OFFSET_FOR_LEADER_EPOCH val request = offsetsForLeaderEpochRequest diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala index bd26a61..e3396bb 100644 --- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala @@ -22,7 +22,7 @@ import java.util.Properties import kafka.admin.ConfigCommand.ConfigCommandOptions import kafka.api.ApiVersion import kafka.cluster.{Broker, EndPoint} -import kafka.server.{ConfigEntityName, KafkaConfig} +import kafka.server.{ConfigEntityName, ConfigType, KafkaConfig} import kafka.utils.{Exit, Logging} import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient, ZooKeeperTestHarness} import org.apache.kafka.clients.admin._ @@ -101,33 +101,44 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { testArgumentParse("brokers") } - def testArgumentParse(entityType: String) = { + @Test + def 
shouldParseArgumentsForBrokerLoggersEntityType() { + testArgumentParse("broker-loggers", + zkConfig = false) + } + + def testArgumentParse(entityType: String, zkConfig: Boolean=true): Unit = { + val connectOpts = if (zkConfig) + ("--zookeeper", zkConnect) + else + ("--bootstrap-server", "localhost:9092") + // Should parse correctly - var createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect, - "--entity-name", "x", + var createOpts = new ConfigCommandOptions(Array(connectOpts._1, connectOpts._2, + "--entity-name", "1", "--entity-type", entityType, "--describe")) createOpts.checkArgs() // For --alter and added config - createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect, - "--entity-name", "x", + createOpts = new ConfigCommandOptions(Array(connectOpts._1, connectOpts._2, + "--entity-name", "1", "--entity-type", entityType, "--alter", "--add-config", "a=b,c=d")) createOpts.checkArgs() // For alter and deleted config - createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect, - "--entity-name", "x", + createOpts = new ConfigCommandOptions(Array(connectOpts._1, connectOpts._2, + "--entity-name", "1", "--entity-type", entityType, "--alter", "--delete-config", "a,b,c")) createOpts.checkArgs() // For alter and both added, deleted config - createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect, - "--entity-name", "x", + createOpts = new ConfigCommandOptions(Array(connectOpts._1, connectOpts._2, + "--entity-name", "1", "--entity-type", entityType, "--alter", "--add-config", "a=b,c=d", @@ -143,8 +154,8 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { assertEquals(1, deletedProps.size) assertEquals("a", deletedProps.head) - createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect, - "--entity-name", "x", + createOpts = new ConfigCommandOptions(Array(connectOpts._1, connectOpts._2, + "--entity-name", "1", "--entity-type", entityType, "--alter", "--add-config", "a=b,c=,d=e,f=")) @@ 
-165,6 +176,13 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
     ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient))
   }

+  @Test(expected = classOf[IllegalArgumentException])
+  def shouldFailIfBrokerEntityTypeIsNotAnInteger(): Unit = {
+    val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
+      "--entity-name", "A", "--entity-type", "brokers", "--alter", "--add-config", "a=b,c=d"))
+    ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient))
+  }
+
   @Test
   def shouldAddClientConfig(): Unit = {
     val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
@@ -229,6 +247,71 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
   }

   @Test
+  def shouldAddBrokerLoggerConfig(): Unit = {
+    val node = new Node(1, "localhost", 9092)
+    verifyAlterBrokerLoggerConfig(node, "1", "1", List(
+      new ConfigEntry("kafka.log.LogCleaner", "INFO"),
+      new ConfigEntry("kafka.server.ReplicaManager", "INFO"),
+      new ConfigEntry("kafka.server.KafkaApi", "INFO")
+    ))
+  }
+
+  @Test
+  def testNoSpecifiedEntityOptionWithDescribeBrokersInZKIsAllowed(): Unit = {
+    val optsList = List("--zookeeper", "localhost:9092",
+      "--entity-type", ConfigType.Broker,
+      "--describe"
+    )
+
+    new ConfigCommandOptions(optsList.toArray).checkArgs()
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testNoSpecifiedEntityOptionWithDescribeBrokersInBootstrapServerIsNotAllowed(): Unit = {
+    val optsList = List("--bootstrap-server", "localhost:9092",
+      "--entity-type", ConfigType.Broker,
+      "--describe"
+    )
+
+    new ConfigCommandOptions(optsList.toArray).checkArgs()
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testEntityDefaultOptionWithDescribeBrokerLoggerIsNotAllowed(): Unit = {
+    val node = new Node(1, "localhost", 9092)
+    val optsList = List("--bootstrap-server", "localhost:9092",
+      "--entity-type", ConfigCommand.BrokerLoggerConfigType,
+      "--entity-default",
+      "--describe"
+    )
+
+    new ConfigCommandOptions(optsList.toArray).checkArgs()
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testEntityDefaultOptionWithAlterBrokerLoggerIsNotAllowed(): Unit = {
+    val node = new Node(1, "localhost", 9092)
+    val optsList = List("--bootstrap-server", "localhost:9092",
+      "--entity-type", ConfigCommand.BrokerLoggerConfigType,
+      "--entity-default",
+      "--alter",
+      "--add-config", "kafka.log.LogCleaner=DEBUG"
+    )
+
+    new ConfigCommandOptions(optsList.toArray).checkArgs()
+  }
+
+  @Test(expected = classOf[InvalidConfigurationException])
+  def shouldRaiseInvalidConfigurationExceptionWhenAddingInvalidBrokerLoggerConfig(): Unit = {
+    val node = new Node(1, "localhost", 9092)
+    // verifyAlterBrokerLoggerConfig tries to alter kafka.log.LogCleaner, kafka.server.ReplicaManager and kafka.server.KafkaApi
+    // yet, we make it so DescribeConfigs returns only one logger, implying that kafka.server.ReplicaManager and kafka.log.LogCleaner are invalid
+    verifyAlterBrokerLoggerConfig(node, "1", "1", List(
+      new ConfigEntry("kafka.server.KafkaApi", "INFO")
+    ))
+  }
+
+  @Test
   def shouldAddDefaultBrokerDynamicConfig(): Unit = {
     val node = new Node(1, "localhost", 9092)
     verifyAlterBrokerConfig(node, "", List("--entity-default"))
@@ -274,11 +357,66 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
       }
     }
     EasyMock.replay(alterResult, describeResult)
-    ConfigCommand.alterBrokerConfig(mockAdminClient, alterOpts, resourceName)
+    ConfigCommand.alterBrokerConfig(mockAdminClient, alterOpts, ConfigType.Broker, resourceName)
     assertEquals(Map("message.max.bytes" -> "10", "num.io.threads" -> "5"), brokerConfigs.toMap)
     EasyMock.reset(alterResult, describeResult)
   }

+  def verifyAlterBrokerLoggerConfig(node: Node, resourceName: String, entityName: String,
+                                    describeConfigEntries: List[ConfigEntry]): Unit = {
+    val optsList = List("--bootstrap-server", "localhost:9092",
+      "--entity-type", ConfigCommand.BrokerLoggerConfigType,
+      "--alter",
+      "--entity-name", entityName,
+      "--add-config", "kafka.log.LogCleaner=DEBUG",
+      "--delete-config", "kafka.server.ReplicaManager,kafka.server.KafkaApi")
+    val alterOpts = new ConfigCommandOptions(optsList.toArray)
+    var alteredConfigs = false
+
+    val resource = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, resourceName)
+    val future = new KafkaFutureImpl[util.Map[ConfigResource, Config]]
+    future.complete(util.Collections.singletonMap(resource, new Config(describeConfigEntries.asJava)))
+    val describeResult: DescribeConfigsResult = EasyMock.createNiceMock(classOf[DescribeConfigsResult])
+    EasyMock.expect(describeResult.all()).andReturn(future).once()
+
+    val alterFuture = new KafkaFutureImpl[Void]
+    alterFuture.complete(null)
+    val alterResult: AlterConfigsResult = EasyMock.createNiceMock(classOf[AlterConfigsResult])
+    EasyMock.expect(alterResult.all()).andReturn(alterFuture)
+
+    val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) {
+      override def describeConfigs(resources: util.Collection[ConfigResource]): DescribeConfigsResult = {
+        assertEquals(1, resources.size)
+        val resource = resources.iterator.next
+        assertEquals(ConfigResource.Type.BROKER_LOGGER, resource.`type`)
+        assertEquals(resourceName, resource.name)
+        describeResult
+      }
+
+      override def incrementalAlterConfigs(configs: util.Map[ConfigResource, util.Collection[AlterConfigOp]], options: AlterConfigsOptions): AlterConfigsResult = {
+        assertEquals(1, configs.size)
+        val entry = configs.entrySet.iterator.next
+        val resource = entry.getKey
+        val alterConfigOps = entry.getValue
+        assertEquals(ConfigResource.Type.BROKER_LOGGER, resource.`type`)
+        assertEquals(3, alterConfigOps.size)
+
+        val expectedConfigOps = List(
+          new AlterConfigOp(new ConfigEntry("kafka.log.LogCleaner", "DEBUG"), AlterConfigOp.OpType.SET),
+          new AlterConfigOp(new ConfigEntry("kafka.server.ReplicaManager", ""), AlterConfigOp.OpType.DELETE),
+          new AlterConfigOp(new ConfigEntry("kafka.server.KafkaApi", ""), AlterConfigOp.OpType.DELETE)
+        )
+        assertEquals(expectedConfigOps, alterConfigOps.asScala.toList)
+        alteredConfigs = true
+        alterResult
+      }
+    }
+    EasyMock.replay(alterResult, describeResult)
+    ConfigCommand.alterBrokerConfig(mockAdminClient, alterOpts, ConfigCommand.BrokerLoggerConfigType, resourceName)
+    assertTrue(alteredConfigs)
+    EasyMock.reset(alterResult, describeResult)
+  }
+
   @Test
   def shouldSupportCommaSeparatedValues(): Unit = {
     val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,