abbccdda commented on a change in pull request #9579: URL: https://github.com/apache/kafka/pull/9579#discussion_r563003389
########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -1110,82 +1098,39 @@ class KafkaApis(val requestChannel: RequestChannel, .setPartitions(partitionData) } - private def createInternalTopic(topic: String): MetadataResponseTopic = { - if (topic == null) - throw new IllegalArgumentException("topic must not be null") - - val aliveBrokers = metadataCache.getAliveBrokers - - topic match { - case GROUP_METADATA_TOPIC_NAME => - if (aliveBrokers.size < config.offsetsTopicReplicationFactor) { - error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the required replication factor " + - s"'${config.offsetsTopicReplicationFactor}' for the offsets topic (configured via " + - s"'${KafkaConfig.OffsetsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " + - s"and not all brokers are up yet.") - metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, util.Collections.emptyList()) - } else { - createTopic(topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor.toInt, - groupCoordinator.offsetsTopicConfigs) - } - case TRANSACTION_STATE_TOPIC_NAME => - if (aliveBrokers.size < config.transactionTopicReplicationFactor) { - error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the required replication factor " + - s"'${config.transactionTopicReplicationFactor}' for the transactions state topic (configured via " + - s"'${KafkaConfig.TransactionsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " + - s"and not all brokers are up yet.") - metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, util.Collections.emptyList()) - } else { - createTopic(topic, config.transactionTopicPartitions, config.transactionTopicReplicationFactor.toInt, - txnCoordinator.transactionTopicConfigs) - } - case _ => throw new IllegalArgumentException(s"Unexpected internal topic name: $topic") - } - } - - private def getOrCreateInternalTopic(topic: String, listenerName: ListenerName): MetadataResponseData.MetadataResponseTopic = { - val topicMetadata = metadataCache.getTopicMetadata(Set(topic), listenerName) - topicMetadata.headOption.getOrElse(createInternalTopic(topic)) - } - - private def getTopicMetadata(allowAutoTopicCreation: Boolean, isFetchAllMetadata: Boolean, - topics: Set[String], listenerName: ListenerName, + private def getTopicMetadata(allowAutoTopicCreation: Boolean, + isFetchAllMetadata: Boolean, + topics: Set[String], + listenerName: ListenerName, errorUnavailableEndpoints: Boolean, - errorUnavailableListeners: Boolean): Seq[MetadataResponseTopic] = { + errorUnavailableListeners: Boolean): (Seq[MetadataResponseTopic], Seq[MetadataResponseTopic]) = { val topicResponses = metadataCache.getTopicMetadata(topics, listenerName, errorUnavailableEndpoints, errorUnavailableListeners) if (topics.isEmpty || topicResponses.size == topics.size) { - topicResponses + (topicResponses, Seq.empty[MetadataResponseTopic]) } else { val nonExistentTopics = topics.diff(topicResponses.map(_.name).toSet) val responsesForNonExistentTopics = nonExistentTopics.flatMap { topic => - if (isInternal(topic)) { - val topicMetadata = createInternalTopic(topic) - Some( - if (topicMetadata.errorCode == Errors.COORDINATOR_NOT_AVAILABLE.code) - metadataResponseTopic(Errors.INVALID_REPLICATION_FACTOR, topic, true, util.Collections.emptyList()) - else - topicMetadata - ) - } else if (isFetchAllMetadata) { + if (isFetchAllMetadata) { // A metadata request for all topics should never result in topic auto creation, but a topic may be deleted // in between the creation of the topics parameter and topicResponses, so make sure to return None for this case. None - } else if (allowAutoTopicCreation && config.autoCreateTopicsEnable) { - Some(createTopic(topic, config.numPartitions, config.defaultReplicationFactor)) - } else { - Some(metadataResponseTopic(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false, util.Collections.emptyList())) + } else { + Some(metadataResponseTopic( + if (!hasEnoughAliveBrokers(topic)) + Errors.INVALID_REPLICATION_FACTOR + else if (allowAutoTopicCreation && config.autoCreateTopicsEnable) + Errors.LEADER_NOT_AVAILABLE Review comment: That makes sense ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org