abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r563003389



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1110,82 +1098,39 @@ class KafkaApis(val requestChannel: RequestChannel,
       .setPartitions(partitionData)
   }
 
-  private def createInternalTopic(topic: String): MetadataResponseTopic = {
-    if (topic == null)
-      throw new IllegalArgumentException("topic must not be null")
-
-    val aliveBrokers = metadataCache.getAliveBrokers
-
-    topic match {
-      case GROUP_METADATA_TOPIC_NAME =>
-        if (aliveBrokers.size < config.offsetsTopicReplicationFactor) {
-          error(s"Number of alive brokers '${aliveBrokers.size}' does not meet 
the required replication factor " +
-            s"'${config.offsetsTopicReplicationFactor}' for the offsets topic 
(configured via " +
-            s"'${KafkaConfig.OffsetsTopicReplicationFactorProp}'). This error 
can be ignored if the cluster is starting up " +
-            s"and not all brokers are up yet.")
-          metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, 
util.Collections.emptyList())
-        } else {
-          createTopic(topic, config.offsetsTopicPartitions, 
config.offsetsTopicReplicationFactor.toInt,
-            groupCoordinator.offsetsTopicConfigs)
-        }
-      case TRANSACTION_STATE_TOPIC_NAME =>
-        if (aliveBrokers.size < config.transactionTopicReplicationFactor) {
-          error(s"Number of alive brokers '${aliveBrokers.size}' does not meet 
the required replication factor " +
-            s"'${config.transactionTopicReplicationFactor}' for the 
transactions state topic (configured via " +
-            s"'${KafkaConfig.TransactionsTopicReplicationFactorProp}'). This 
error can be ignored if the cluster is starting up " +
-            s"and not all brokers are up yet.")
-          metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, 
util.Collections.emptyList())
-        } else {
-          createTopic(topic, config.transactionTopicPartitions, 
config.transactionTopicReplicationFactor.toInt,
-            txnCoordinator.transactionTopicConfigs)
-        }
-      case _ => throw new IllegalArgumentException(s"Unexpected internal topic 
name: $topic")
-    }
-  }
-
-  private def getOrCreateInternalTopic(topic: String, listenerName: 
ListenerName): MetadataResponseData.MetadataResponseTopic = {
-    val topicMetadata = metadataCache.getTopicMetadata(Set(topic), 
listenerName)
-    topicMetadata.headOption.getOrElse(createInternalTopic(topic))
-  }
-
-  private def getTopicMetadata(allowAutoTopicCreation: Boolean, 
isFetchAllMetadata: Boolean,
-                               topics: Set[String], listenerName: ListenerName,
+  private def getTopicMetadata(allowAutoTopicCreation: Boolean,
+                               isFetchAllMetadata: Boolean,
+                               topics: Set[String],
+                               listenerName: ListenerName,
                                errorUnavailableEndpoints: Boolean,
-                               errorUnavailableListeners: Boolean): 
Seq[MetadataResponseTopic] = {
+                               errorUnavailableListeners: Boolean): 
(Seq[MetadataResponseTopic], Seq[MetadataResponseTopic]) = {
     val topicResponses = metadataCache.getTopicMetadata(topics, listenerName,
         errorUnavailableEndpoints, errorUnavailableListeners)
 
     if (topics.isEmpty || topicResponses.size == topics.size) {
-      topicResponses
+      (topicResponses, Seq.empty[MetadataResponseTopic])
     } else {
       val nonExistentTopics = topics.diff(topicResponses.map(_.name).toSet)
       val responsesForNonExistentTopics = nonExistentTopics.flatMap { topic =>
-        if (isInternal(topic)) {
-          val topicMetadata = createInternalTopic(topic)
-          Some(
-            if (topicMetadata.errorCode == 
Errors.COORDINATOR_NOT_AVAILABLE.code)
-              metadataResponseTopic(Errors.INVALID_REPLICATION_FACTOR, topic, 
true, util.Collections.emptyList())
-            else
-              topicMetadata
-          )
-        } else if (isFetchAllMetadata) {
+       if (isFetchAllMetadata) {
           // A metadata request for all topics should never result in topic 
auto creation, but a topic may be deleted
           // in between the creation of the topics parameter and 
topicResponses, so make sure to return None for this case.
           None
-        } else if (allowAutoTopicCreation && config.autoCreateTopicsEnable) {
-          Some(createTopic(topic, config.numPartitions, 
config.defaultReplicationFactor))
-        } else {
-          Some(metadataResponseTopic(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, 
false, util.Collections.emptyList()))
+       } else {
+        Some(metadataResponseTopic(
+          if (!hasEnoughAliveBrokers(topic))
+            Errors.INVALID_REPLICATION_FACTOR
+          else if (allowAutoTopicCreation && config.autoCreateTopicsEnable)
+            Errors.LEADER_NOT_AVAILABLE

Review comment:
       That makes sense




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to