abbccdda commented on a change in pull request #9579: URL: https://github.com/apache/kafka/pull/9579#discussion_r570396673
########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -1113,82 +1097,36 @@ class KafkaApis(val requestChannel: RequestChannel, .setPartitions(partitionData) } - private def createInternalTopic(topic: String): MetadataResponseTopic = { - if (topic == null) - throw new IllegalArgumentException("topic must not be null") - - val aliveBrokers = metadataCache.getAliveBrokers - - topic match { - case GROUP_METADATA_TOPIC_NAME => - if (aliveBrokers.size < config.offsetsTopicReplicationFactor) { - error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the required replication factor " + - s"'${config.offsetsTopicReplicationFactor}' for the offsets topic (configured via " + - s"'${KafkaConfig.OffsetsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " + - s"and not all brokers are up yet.") - metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, util.Collections.emptyList()) - } else { - createTopic(topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor.toInt, - groupCoordinator.offsetsTopicConfigs) - } - case TRANSACTION_STATE_TOPIC_NAME => - if (aliveBrokers.size < config.transactionTopicReplicationFactor) { - error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the required replication factor " + - s"'${config.transactionTopicReplicationFactor}' for the transactions state topic (configured via " + - s"'${KafkaConfig.TransactionsTopicReplicationFactorProp}'). 
This error can be ignored if the cluster is starting up " + - s"and not all brokers are up yet.") - metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, util.Collections.emptyList()) - } else { - createTopic(topic, config.transactionTopicPartitions, config.transactionTopicReplicationFactor.toInt, - txnCoordinator.transactionTopicConfigs) - } - case _ => throw new IllegalArgumentException(s"Unexpected internal topic name: $topic") - } - } - - private def getOrCreateInternalTopic(topic: String, listenerName: ListenerName): MetadataResponseData.MetadataResponseTopic = { - val topicMetadata = metadataCache.getTopicMetadata(Set(topic), listenerName) - topicMetadata.headOption.getOrElse(createInternalTopic(topic)) - } - - private def getTopicMetadata(allowAutoTopicCreation: Boolean, isFetchAllMetadata: Boolean, - topics: Set[String], listenerName: ListenerName, + private def getTopicMetadata(isFetchAllMetadata: Boolean, + topics: Set[String], + listenerName: ListenerName, errorUnavailableEndpoints: Boolean, - errorUnavailableListeners: Boolean): Seq[MetadataResponseTopic] = { + errorUnavailableListeners: Boolean): (Seq[MetadataResponseTopic], Seq[MetadataResponseTopic]) = { val topicResponses = metadataCache.getTopicMetadata(topics, listenerName, errorUnavailableEndpoints, errorUnavailableListeners) if (topics.isEmpty || topicResponses.size == topics.size) { - topicResponses + (topicResponses, Seq.empty[MetadataResponseTopic]) } else { val nonExistentTopics = topics.diff(topicResponses.map(_.name).toSet) val responsesForNonExistentTopics = nonExistentTopics.flatMap { topic => - if (isInternal(topic)) { - val topicMetadata = createInternalTopic(topic) - Some( - if (topicMetadata.errorCode == Errors.COORDINATOR_NOT_AVAILABLE.code) - metadataResponseTopic(Errors.INVALID_REPLICATION_FACTOR, topic, true, util.Collections.emptyList()) - else - topicMetadata - ) - } else if (isFetchAllMetadata) { + if (isFetchAllMetadata) { Review comment: Yes, I think so. 
########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -1234,19 +1171,28 @@ class KafkaApis(val requestChannel: RequestChannel, // In versions 5 and below, we returned LEADER_NOT_AVAILABLE if a matching listener was not found on the leader. // From version 6 onwards, we return LISTENER_NOT_FOUND to enable diagnosis of configuration errors. val errorUnavailableListeners = requestVersion >= 6 - val topicMetadata = + val (topicMetadata, nonExistTopicMetadata) = if (authorizedTopics.isEmpty) - Seq.empty[MetadataResponseTopic] - else { - getTopicMetadata( - metadataRequest.allowAutoTopicCreation, - metadataRequest.isAllTopics, - authorizedTopics, - request.context.listenerName, - errorUnavailableEndpoints, - errorUnavailableListeners - ) + (Seq.empty[MetadataResponseTopic], Seq.empty[MetadataResponseTopic]) + else + getTopicMetadata(metadataRequest.isAllTopics, authorizedTopics, + request.context.listenerName, errorUnavailableEndpoints, errorUnavailableListeners) + + nonExistTopicMetadata.foreach(metadata => + try { + // Validate topic name and propagate error if failed + Topic.validate(metadata.name()) Review comment: Actually after looking into the zk admin manager logic, I don't think it's necessary to do the topic validation here. ########## File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala ########## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import java.util.concurrent.ConcurrentHashMap + +import kafka.controller.KafkaController +import kafka.utils.Logging +import org.apache.kafka.clients.ClientResponse +import org.apache.kafka.common.message.CreateTopicsRequestData +import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic +import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.requests.CreateTopicsRequest +import org.apache.kafka.common.utils.Time + +import scala.collection.Map + +trait AutoTopicCreationManager { + + def createTopics( + topicNames: Set[CreatableTopic], + controllerMutationQuota: ControllerMutationQuota + ): Unit + + def start(): Unit = {} + + def shutdown(): Unit = {} +} + +object AutoTopicCreationManager { + + def apply( + config: KafkaConfig, + metadataCache: MetadataCache, + time: Time, + metrics: Metrics, + threadNamePrefix: Option[String], + adminManager: ZkAdminManager, + controller: KafkaController, + enableForwarding: Boolean + ): AutoTopicCreationManager = { + + val channelManager = + if (enableForwarding) + Some(new BrokerToControllerChannelManager( + controllerNodeProvider = MetadataCacheControllerNodeProvider( + config, metadataCache), + time = time, + metrics = metrics, + config = config, + channelName = "autoTopicCreationChannel", + threadNamePrefix = threadNamePrefix, + retryTimeoutMs = config.requestTimeoutMs.longValue + )) + else + None + new AutoTopicCreationManagerImpl(channelManager, adminManager, controller, config.requestTimeoutMs) + } +} + +class 
AutoTopicCreationManagerImpl( + channelManager: Option[BrokerToControllerChannelManager], + adminManager: ZkAdminManager, + controller: KafkaController, + requestTimeout: Int +) extends AutoTopicCreationManager with Logging { + + private val inflightTopics = new ConcurrentHashMap[String, CreatableTopic] + + override def start(): Unit = { + channelManager.foreach(_.start()) + } + + override def shutdown(): Unit = { + channelManager.foreach(_.shutdown()) + } + + override def createTopics(topics: Set[CreatableTopic], + controllerMutationQuota: ControllerMutationQuota): Unit = { + val topicConfigs = topics + .filter(topic => !inflightTopics.contains(topic.name())) Review comment: You mean omit () for `topic.name()`? ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -1234,19 +1171,28 @@ class KafkaApis(val requestChannel: RequestChannel, // In versions 5 and below, we returned LEADER_NOT_AVAILABLE if a matching listener was not found on the leader. // From version 6 onwards, we return LISTENER_NOT_FOUND to enable diagnosis of configuration errors. 
val errorUnavailableListeners = requestVersion >= 6 - val topicMetadata = + val (topicMetadata, nonExistTopicMetadata) = if (authorizedTopics.isEmpty) - Seq.empty[MetadataResponseTopic] - else { - getTopicMetadata( - metadataRequest.allowAutoTopicCreation, - metadataRequest.isAllTopics, - authorizedTopics, - request.context.listenerName, - errorUnavailableEndpoints, - errorUnavailableListeners - ) + (Seq.empty[MetadataResponseTopic], Seq.empty[MetadataResponseTopic]) + else + getTopicMetadata(metadataRequest.isAllTopics, authorizedTopics, + request.context.listenerName, errorUnavailableEndpoints, errorUnavailableListeners) + + nonExistTopicMetadata.foreach(metadata => + try { + // Validate topic name and propagate error if failed + Topic.validate(metadata.name()) + } catch { + case e: Exception => + metadata.setErrorCode(Errors.forException(e).code) } + ) + + if (nonExistTopicMetadata.nonEmpty && metadataRequest.allowAutoTopicCreation && config.autoCreateTopicsEnable) { + val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6) + autoTopicCreationManager.createTopics( + nonExistTopicMetadata.map(metadata => getTopicConfigs(metadata.name())).toSet, controllerMutationQuota) Review comment: I guess we could rely on the admin manager to do the validation for us.
########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -1084,24 +1087,9 @@ class KafkaApis(val requestChannel: RequestChannel, (responseTopics ++ unauthorizedResponseStatus).toList } - private def createTopic(topic: String, - numPartitions: Int, - replicationFactor: Int, - properties: util.Properties = new util.Properties()): MetadataResponseTopic = { - try { - adminZkClient.createTopic(topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe) - info("Auto creation of topic %s with %d partitions and replication factor %d is successful" - .format(topic, numPartitions, replicationFactor)) - metadataResponseTopic(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic), util.Collections.emptyList()) - } catch { - case _: TopicExistsException => // let it go, possibly another broker created this topic Review comment: The problem is that `ZkAdminManager.createTopics` only takes a callback; it does not tell you synchronously whether we hit `TopicExistsException`. Since we now create the topic asynchronously, unless this really needs to be fixed (today we would just return UNKNOWN_PARTITION, which seems semantically similar to LEADER_NOT_AVAILABLE), I think we could simply return unknown partition immediately without waiting for the async creation?
########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -1370,55 +1345,164 @@ class KafkaApis(val requestChannel: RequestChannel, !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key)) requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception) else { - // get metadata (and create the topic if necessary) - val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match { + val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match { case CoordinatorType.GROUP => - val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key) - val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName) - (partition, metadata) + (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME) case CoordinatorType.TRANSACTION => - val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key) - val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName) - (partition, metadata) + (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME) + } - case _ => - throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request") + val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName) + def createFindCoordinatorResponse(error: Errors, + node: Node, + requestThrottleMs: Int, + errorMessage: Option[String] = None): FindCoordinatorResponse = { + new FindCoordinatorResponse( + new FindCoordinatorResponseData() + .setErrorCode(error.code) + .setErrorMessage(errorMessage.getOrElse(error.message)) + .setNodeId(node.id) + .setHost(node.host) + .setPort(node.port) + .setThrottleTimeMs(requestThrottleMs)) } - def createResponse(requestThrottleMs: Int): 
AbstractResponse = { - def createFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = { - new FindCoordinatorResponse( - new FindCoordinatorResponseData() - .setErrorCode(error.code) - .setErrorMessage(error.message) - .setNodeId(node.id) - .setHost(node.host) - .setPort(node.port) - .setThrottleTimeMs(requestThrottleMs)) + val topicCreationNeeded = topicMetadata.headOption.isEmpty + if (topicCreationNeeded) { + if (hasEnoughAliveBrokers(internalTopicName)) { Review comment: Sounds good ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org