junrao commented on code in PR #21005:
URL: https://github.com/apache/kafka/pull/21005#discussion_r2790599836
##########
core/src/main/scala/kafka/server/ControllerApis.scala:
##########
@@ -715,50 +715,41 @@ class ControllerApis(
val alterConfigsRequest = request.body[IncrementalAlterConfigsRequest]
val context = new ControllerRequestContext(request.context.header.data,
request.context.principal,
OptionalLong.empty())
- val duplicateResources = new util.HashSet[ConfigResource]
val configChanges = new util.HashMap[ConfigResource,
util.Map[String, Entry[AlterConfigOp.OpType, String]]]()
+ val duplicateResources = new util.HashSet[ConfigResource]
val brokerLoggerResponses = new
util.ArrayList[AlterConfigsResourceResponse](1)
alterConfigsRequest.data.resources.forEach { resource =>
- val configResource = new ConfigResource(
- ConfigResource.Type.forId(resource.resourceType),
resource.resourceName())
- if (configResource.`type`().equals(ConfigResource.Type.BROKER_LOGGER)) {
- val apiError = try {
+ ConfigAdminManager.processConfigResource(
+ resource,
+ () => {
runtimeLoggerManager.applyChangesForResource(
authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER,
CLUSTER_NAME),
alterConfigsRequest.data().validateOnly(),
resource)
- ApiError.NONE
- } catch {
- case t: Throwable => ApiError.fromThrowable(t)
- }
- brokerLoggerResponses.add(new AlterConfigsResourceResponse().
- setResourceName(resource.resourceName()).
- setResourceType(resource.resourceType()).
- setErrorCode(apiError.error().code()).
- setErrorMessage(if (apiError.isFailure)
apiError.messageWithFallback() else null))
- } else if (configResource.`type`().equals(ConfigResource.Type.UNKNOWN)) {
- response.responses().add(new AlterConfigsResourceResponse().
- setErrorCode(UNSUPPORTED_VERSION.code()).
- setErrorMessage("Unknown resource type " + resource.resourceType() +
".").
- setResourceName(resource.resourceName()).
- setResourceType(resource.resourceType()))
- } else if (!duplicateResources.contains(configResource)) {
- val altersByName = new util.HashMap[String,
Entry[AlterConfigOp.OpType, String]]()
- resource.configs.forEach { config =>
- altersByName.put(config.name, new
util.AbstractMap.SimpleEntry[AlterConfigOp.OpType, String](
- AlterConfigOp.OpType.forId(config.configOperation), config.value))
+ brokerLoggerResponses.add(new AlterConfigsResourceResponse()
+ .setResourceName(resource.resourceName())
+ .setResourceType(resource.resourceType())
+ .setErrorCode(ApiError.NONE.error().code())
+ .setErrorMessage(null))
+ },
+ configResource => {
+ ConfigAdminManager.validateDynamicControllerConfigChange(resource,
configResource)
Review Comment:
Could we put the broker-level validation in ControllerConfigurationValidator?
##########
core/src/main/scala/kafka/server/ConfigAdminManager.scala:
##########
@@ -112,48 +112,33 @@ class ConfigAdminManager(nodeId: Int,
})
request.resources().forEach(resource => {
if (!results.containsKey(resource)) {
- val resourceType = ConfigResource.Type.forId(resource.resourceType())
- val configResource = new ConfigResource(resourceType,
resource.resourceName())
- try {
- if (containsDuplicates(resource.configs().asScala.map(_.name()))) {
- throw new InvalidRequestException("Error due to duplicate config
keys")
- }
- val nullUpdates = new util.ArrayList[String]()
- resource.configs().forEach { config =>
- if (config.configOperation() != AlterConfigOp.OpType.DELETE.id() &&
- config.value() == null) {
- nullUpdates.add(config.name())
+ processConfigResource(
Review Comment:
Since all incrementalAlterConfigs requests are forwarded to the controller,
it's probably simpler to validate the request only in the controller. The logic
in preprocess() would then only need to handle the broker logger config.
##########
core/src/main/scala/kafka/server/ConfigAdminManager.scala:
##########
@@ -451,4 +436,80 @@ object ConfigAdminManager {
}
}
}
+
+ def processConfigResource(
+ resource: IncrementalAlterConfigsRequestData.AlterConfigsResource,
+ onBrokerLogger: () => Unit,
+ onBroker: ConfigResource => Unit,
+ onOtherTypes: ConfigResource => Unit,
+ onError: (ConfigResource, Throwable) => Unit
+ ): Unit = {
+ val resourceType = ConfigResource.Type.forId(resource.resourceType())
+ val configResource = new ConfigResource(resourceType,
resource.resourceName())
+ try {
+ if (containsDuplicates(resource.configs().asScala.map(_.name()))) {
+ throw new InvalidRequestException("Error due to duplicate config keys")
+ }
+ validateNullValue(resource)
+ validateUnknownConfigTypeError(configResource, resource)
+ resourceType match {
+ case BROKER_LOGGER => onBrokerLogger()
+ case BROKER => onBroker(configResource)
+ case _ => onOtherTypes(configResource)
+ }
+ } catch {
+ case t: Throwable => onError(configResource, t)
+ }
+ }
+
+ private def validateNullValue(
+ resource: IncrementalAlterConfigsRequestData.AlterConfigsResource,
+ ): Unit = {
+ val nullUpdates = new util.ArrayList[String]()
+ resource.configs.forEach { config =>
+ if (config.configOperation() != AlterConfigOp.OpType.DELETE.id() &&
+ config.value() == null) {
+ nullUpdates.add(config.name())
+ }
+ }
+ if (!nullUpdates.isEmpty) {
+ val exception = new InvalidRequestException("Null value not supported
for : " + String.join(", ", nullUpdates))
+ log.error(s"Error on processing incrementalAlterConfigs request on
${resource.resourceName()}", exception)
+ throw exception
+ }
+ }
+
+ private def validateUnknownConfigTypeError(
+ configResource: ConfigResource,
+ resource: IncrementalAlterConfigsRequestData.AlterConfigsResource,
+ ): Unit = {
+ if (configResource.`type`().equals(ConfigResource.Type.UNKNOWN)) {
+ val exception = new InvalidRequestException(s"Unknown resource type
${resource.resourceType()}.")
+ log.error(s"Error on processing request on ${configResource}", exception)
+ throw exception
+ }
+ }
+
+ def validateDynamicControllerConfigChange(
Review Comment:
It seems that we are duplicating the code from validateBrokerConfigChange().
Where is it needed?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]