hachikuji commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r561451602



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1370,55 +1345,164 @@ class KafkaApis(val requestChannel: RequestChannel,
         !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, 
findCoordinatorRequest.data.key))
       requestHelper.sendErrorResponseMaybeThrottle(request, 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
     else {
-      // get metadata (and create the topic if necessary)
-      val (partition, topicMetadata) = 
CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+      val (partition, internalTopicName) = 
CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
         case CoordinatorType.GROUP =>
-          val partition = 
groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, 
request.context.listenerName)
-          (partition, metadata)
+          (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), 
GROUP_METADATA_TOPIC_NAME)
 
         case CoordinatorType.TRANSACTION =>
-          val partition = 
txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = 
getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, 
request.context.listenerName)
-          (partition, metadata)
+          (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), 
TRANSACTION_STATE_TOPIC_NAME)
+      }
 
-        case _ =>
-          throw new InvalidRequestException("Unknown coordinator type in 
FindCoordinator request")
+      val topicMetadata = 
metadataCache.getTopicMetadata(Set(internalTopicName), 
request.context.listenerName)
+      def createFindCoordinatorResponse(error: Errors,
+                                        node: Node,
+                                        requestThrottleMs: Int,
+                                        errorMessage: Option[String] = None): 
FindCoordinatorResponse = {
+        new FindCoordinatorResponse(
+          new FindCoordinatorResponseData()
+            .setErrorCode(error.code)
+            .setErrorMessage(errorMessage.getOrElse(error.message))
+            .setNodeId(node.id)
+            .setHost(node.host)
+            .setPort(node.port)
+            .setThrottleTimeMs(requestThrottleMs))
       }
 
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        def createFindCoordinatorResponse(error: Errors, node: Node): 
FindCoordinatorResponse = {
-          new FindCoordinatorResponse(
-              new FindCoordinatorResponseData()
-                .setErrorCode(error.code)
-                .setErrorMessage(error.message)
-                .setNodeId(node.id)
-                .setHost(node.host)
-                .setPort(node.port)
-                .setThrottleTimeMs(requestThrottleMs))
+      val topicCreationNeeded = topicMetadata.headOption.isEmpty
+      if (topicCreationNeeded) {
+        if (hasEnoughAliveBrokers(internalTopicName)) {

Review comment:
       In the case of forwarding, maybe we can let the controller decide whether 
there are enough alive brokers, rather than checking `hasEnoughAliveBrokers` 
here on the broker.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1370,55 +1345,164 @@ class KafkaApis(val requestChannel: RequestChannel,
         !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, 
findCoordinatorRequest.data.key))
       requestHelper.sendErrorResponseMaybeThrottle(request, 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
     else {
-      // get metadata (and create the topic if necessary)
-      val (partition, topicMetadata) = 
CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+      val (partition, internalTopicName) = 
CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
         case CoordinatorType.GROUP =>
-          val partition = 
groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, 
request.context.listenerName)
-          (partition, metadata)
+          (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), 
GROUP_METADATA_TOPIC_NAME)
 
         case CoordinatorType.TRANSACTION =>
-          val partition = 
txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-          val metadata = 
getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, 
request.context.listenerName)
-          (partition, metadata)
+          (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), 
TRANSACTION_STATE_TOPIC_NAME)
+      }
 
-        case _ =>
-          throw new InvalidRequestException("Unknown coordinator type in 
FindCoordinator request")
+      val topicMetadata = 
metadataCache.getTopicMetadata(Set(internalTopicName), 
request.context.listenerName)
+      def createFindCoordinatorResponse(error: Errors,
+                                        node: Node,
+                                        requestThrottleMs: Int,
+                                        errorMessage: Option[String] = None): 
FindCoordinatorResponse = {
+        new FindCoordinatorResponse(
+          new FindCoordinatorResponseData()
+            .setErrorCode(error.code)
+            .setErrorMessage(errorMessage.getOrElse(error.message))
+            .setNodeId(node.id)
+            .setHost(node.host)
+            .setPort(node.port)
+            .setThrottleTimeMs(requestThrottleMs))
       }
 
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        def createFindCoordinatorResponse(error: Errors, node: Node): 
FindCoordinatorResponse = {
-          new FindCoordinatorResponse(
-              new FindCoordinatorResponseData()
-                .setErrorCode(error.code)
-                .setErrorMessage(error.message)
-                .setNodeId(node.id)
-                .setHost(node.host)
-                .setPort(node.port)
-                .setThrottleTimeMs(requestThrottleMs))
+      val topicCreationNeeded = topicMetadata.headOption.isEmpty
+      if (topicCreationNeeded) {
+        if (hasEnoughAliveBrokers(internalTopicName)) {
+          if (shouldForwardRequest(request)) {
+            forwardingManager.sendInterBrokerRequest(
+              getCreateTopicsRequest(Seq(internalTopicName)),
+              _ => ())
+          } else {
+            val controllerMutationQuota = 
quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6)
+
+            val topicConfigs = Map(internalTopicName -> 
getTopicConfigs(internalTopicName))
+            adminManager.createTopics(
+              config.requestTimeoutMs,
+              validateOnly = false,
+              topicConfigs,
+              Map.empty,
+              controllerMutationQuota,
+              _ => ())
+          }
         }
-        val responseBody = if (topicMetadata.errorCode != Errors.NONE.code) {
-          createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, 
Node.noNode)
-        } else {
-          val coordinatorEndpoint = topicMetadata.partitions.asScala
-            .find(_.partitionIndex == partition)
-            .filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
-            .flatMap(metadata => 
metadataCache.getAliveBroker(metadata.leaderId))
-            .flatMap(_.getNode(request.context.listenerName))
-            .filterNot(_.isEmpty)
-
-          coordinatorEndpoint match {
-            case Some(endpoint) =>
-              createFindCoordinatorResponse(Errors.NONE, endpoint)
-            case _ =>
-              createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, 
Node.noNode)
+
+        requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => 
createFindCoordinatorResponse(
+          Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs))
+      } else {
+        def createResponse(requestThrottleMs: Int): AbstractResponse = {
+          val responseBody = if (topicMetadata.head.errorCode != 
Errors.NONE.code) {
+            createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, 
Node.noNode, requestThrottleMs)
+          } else {
+            val coordinatorEndpoint = topicMetadata.head.partitions.asScala
+              .find(_.partitionIndex == partition)
+              .filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
+              .flatMap(metadata => 
metadataCache.getAliveBroker(metadata.leaderId))
+              .flatMap(_.getNode(request.context.listenerName))
+              .filterNot(_.isEmpty)
+
+            coordinatorEndpoint match {
+              case Some(endpoint) =>
+                createFindCoordinatorResponse(Errors.NONE, endpoint, 
requestThrottleMs)
+              case _ =>
+                
createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, 
requestThrottleMs)
+            }
           }
+          trace("Sending FindCoordinator response %s for correlation id %d to 
client %s."
+            .format(responseBody, request.header.correlationId, 
request.header.clientId))
+          responseBody
         }
-        trace("Sending FindCoordinator response %s for correlation id %d to 
client %s."
-          .format(responseBody, request.header.correlationId, 
request.header.clientId))
-        responseBody
+
+        requestHelper.sendResponseMaybeThrottle(request, createResponse)
+      }
+    }
+  }
+
+  private def getCreateTopicsRequest(topics: Seq[String]): 
CreateTopicsRequest.Builder = {
+    val topicCollection = new CreateTopicsRequestData.CreatableTopicCollection
+    topics.foreach(topic => {
+      topicCollection.add(getTopicConfigs(topic))
+    })
+
+    new CreateTopicsRequest.Builder(
+      new CreateTopicsRequestData()
+        .setTimeoutMs(config.requestTimeoutMs)
+        .setTopics(topicCollection)
+    )
+  }
+
+  private def getTopicConfigs(topic: String): CreatableTopic = {
+    topic match {
+      case GROUP_METADATA_TOPIC_NAME =>
+        new CreatableTopic()
+          .setName(topic)
+          .setNumPartitions(config.offsetsTopicPartitions)

Review comment:
       The controller should have these configurations as well. Perhaps it is 
better to use -1 for the number of partitions and for the replication factor, 
and let the controller fill them in?

##########
File path: 
core/src/test/scala/unit/kafka/server/MetadataRequestWithForwardingTest.scala
##########
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package unit.kafka.server
+
+import java.util.Properties
+
+import kafka.server.{KafkaConfig, MetadataRequestTest}
+import org.junit.jupiter.api.Test
+
+class MetadataRequestWithForwardingTest extends MetadataRequestTest {
+
+  override def brokerPropertyOverrides(properties: Properties): Unit = {
+    super.brokerPropertyOverrides(properties)
+    properties.put(KafkaConfig.EnableMetadataQuorumProp, true.toString)
+  }
+
+  @Test
+  override def testAutoTopicCreation(): Unit = {
+    super.testAutoTopicCreation()
+  }
+
+  @Test
+  override def testAutoCreateOfCollidingTopics(): Unit = {
+    super.testAutoCreateOfCollidingTopics()
+  }
+
+  @Test
+  override def testAutoCreateTopicWithInvalidReplicationFactor(): Unit = {
+    super.testAutoCreateTopicWithInvalidReplicationFactor()
+  }
+
+  /* the rest of tests are not enabled */

Review comment:
       An alternative that we have used elsewhere would be to introduce an 
`AbstractMetadataRequestTest` into which we can pull the common cases. A more 
elegant option might be to figure out how to use `@ParameterizedTest` so that 
we can provide config overrides. This would be a little difficult at the moment 
because we initialize brokers in a `@Before` method, which probably means we 
need to move away from this approach in the long term. For now, the abstract 
class seems preferable. The same applies to `CreateTopicsRequestWithForwardingTest`.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1110,82 +1098,39 @@ class KafkaApis(val requestChannel: RequestChannel,
       .setPartitions(partitionData)
   }
 
-  private def createInternalTopic(topic: String): MetadataResponseTopic = {
-    if (topic == null)
-      throw new IllegalArgumentException("topic must not be null")
-
-    val aliveBrokers = metadataCache.getAliveBrokers
-
-    topic match {
-      case GROUP_METADATA_TOPIC_NAME =>
-        if (aliveBrokers.size < config.offsetsTopicReplicationFactor) {
-          error(s"Number of alive brokers '${aliveBrokers.size}' does not meet 
the required replication factor " +
-            s"'${config.offsetsTopicReplicationFactor}' for the offsets topic 
(configured via " +
-            s"'${KafkaConfig.OffsetsTopicReplicationFactorProp}'). This error 
can be ignored if the cluster is starting up " +
-            s"and not all brokers are up yet.")
-          metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, 
util.Collections.emptyList())
-        } else {
-          createTopic(topic, config.offsetsTopicPartitions, 
config.offsetsTopicReplicationFactor.toInt,
-            groupCoordinator.offsetsTopicConfigs)
-        }
-      case TRANSACTION_STATE_TOPIC_NAME =>
-        if (aliveBrokers.size < config.transactionTopicReplicationFactor) {
-          error(s"Number of alive brokers '${aliveBrokers.size}' does not meet 
the required replication factor " +
-            s"'${config.transactionTopicReplicationFactor}' for the 
transactions state topic (configured via " +
-            s"'${KafkaConfig.TransactionsTopicReplicationFactorProp}'). This 
error can be ignored if the cluster is starting up " +
-            s"and not all brokers are up yet.")
-          metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, 
util.Collections.emptyList())
-        } else {
-          createTopic(topic, config.transactionTopicPartitions, 
config.transactionTopicReplicationFactor.toInt,
-            txnCoordinator.transactionTopicConfigs)
-        }
-      case _ => throw new IllegalArgumentException(s"Unexpected internal topic 
name: $topic")
-    }
-  }
-
-  private def getOrCreateInternalTopic(topic: String, listenerName: 
ListenerName): MetadataResponseData.MetadataResponseTopic = {
-    val topicMetadata = metadataCache.getTopicMetadata(Set(topic), 
listenerName)
-    topicMetadata.headOption.getOrElse(createInternalTopic(topic))
-  }
-
-  private def getTopicMetadata(allowAutoTopicCreation: Boolean, 
isFetchAllMetadata: Boolean,
-                               topics: Set[String], listenerName: ListenerName,
+  private def getTopicMetadata(allowAutoTopicCreation: Boolean,
+                               isFetchAllMetadata: Boolean,
+                               topics: Set[String],
+                               listenerName: ListenerName,
                                errorUnavailableEndpoints: Boolean,
-                               errorUnavailableListeners: Boolean): 
Seq[MetadataResponseTopic] = {
+                               errorUnavailableListeners: Boolean): 
(Seq[MetadataResponseTopic], Seq[MetadataResponseTopic]) = {
     val topicResponses = metadataCache.getTopicMetadata(topics, listenerName,
         errorUnavailableEndpoints, errorUnavailableListeners)
 
     if (topics.isEmpty || topicResponses.size == topics.size) {
-      topicResponses
+      (topicResponses, Seq.empty[MetadataResponseTopic])
     } else {
       val nonExistentTopics = topics.diff(topicResponses.map(_.name).toSet)
       val responsesForNonExistentTopics = nonExistentTopics.flatMap { topic =>
-        if (isInternal(topic)) {
-          val topicMetadata = createInternalTopic(topic)
-          Some(
-            if (topicMetadata.errorCode == 
Errors.COORDINATOR_NOT_AVAILABLE.code)
-              metadataResponseTopic(Errors.INVALID_REPLICATION_FACTOR, topic, 
true, util.Collections.emptyList())
-            else
-              topicMetadata
-          )
-        } else if (isFetchAllMetadata) {
+       if (isFetchAllMetadata) {
           // A metadata request for all topics should never result in topic 
auto creation, but a topic may be deleted
           // in between the creation of the topics parameter and 
topicResponses, so make sure to return None for this case.
           None
-        } else if (allowAutoTopicCreation && config.autoCreateTopicsEnable) {
-          Some(createTopic(topic, config.numPartitions, 
config.defaultReplicationFactor))
-        } else {
-          Some(metadataResponseTopic(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, 
false, util.Collections.emptyList()))
+       } else {
+        Some(metadataResponseTopic(

Review comment:
       nit: the indentation of this `if (isFetchAllMetadata)` block seems misaligned.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1110,82 +1098,39 @@ class KafkaApis(val requestChannel: RequestChannel,
       .setPartitions(partitionData)
   }
 
-  private def createInternalTopic(topic: String): MetadataResponseTopic = {
-    if (topic == null)
-      throw new IllegalArgumentException("topic must not be null")
-
-    val aliveBrokers = metadataCache.getAliveBrokers
-
-    topic match {
-      case GROUP_METADATA_TOPIC_NAME =>
-        if (aliveBrokers.size < config.offsetsTopicReplicationFactor) {
-          error(s"Number of alive brokers '${aliveBrokers.size}' does not meet 
the required replication factor " +
-            s"'${config.offsetsTopicReplicationFactor}' for the offsets topic 
(configured via " +
-            s"'${KafkaConfig.OffsetsTopicReplicationFactorProp}'). This error 
can be ignored if the cluster is starting up " +
-            s"and not all brokers are up yet.")
-          metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, 
util.Collections.emptyList())
-        } else {
-          createTopic(topic, config.offsetsTopicPartitions, 
config.offsetsTopicReplicationFactor.toInt,
-            groupCoordinator.offsetsTopicConfigs)
-        }
-      case TRANSACTION_STATE_TOPIC_NAME =>
-        if (aliveBrokers.size < config.transactionTopicReplicationFactor) {
-          error(s"Number of alive brokers '${aliveBrokers.size}' does not meet 
the required replication factor " +
-            s"'${config.transactionTopicReplicationFactor}' for the 
transactions state topic (configured via " +
-            s"'${KafkaConfig.TransactionsTopicReplicationFactorProp}'). This 
error can be ignored if the cluster is starting up " +
-            s"and not all brokers are up yet.")
-          metadataResponseTopic(Errors.COORDINATOR_NOT_AVAILABLE, topic, true, 
util.Collections.emptyList())
-        } else {
-          createTopic(topic, config.transactionTopicPartitions, 
config.transactionTopicReplicationFactor.toInt,
-            txnCoordinator.transactionTopicConfigs)
-        }
-      case _ => throw new IllegalArgumentException(s"Unexpected internal topic 
name: $topic")
-    }
-  }
-
-  private def getOrCreateInternalTopic(topic: String, listenerName: 
ListenerName): MetadataResponseData.MetadataResponseTopic = {
-    val topicMetadata = metadataCache.getTopicMetadata(Set(topic), 
listenerName)
-    topicMetadata.headOption.getOrElse(createInternalTopic(topic))
-  }
-
-  private def getTopicMetadata(allowAutoTopicCreation: Boolean, 
isFetchAllMetadata: Boolean,
-                               topics: Set[String], listenerName: ListenerName,
+  private def getTopicMetadata(allowAutoTopicCreation: Boolean,
+                               isFetchAllMetadata: Boolean,
+                               topics: Set[String],
+                               listenerName: ListenerName,
                                errorUnavailableEndpoints: Boolean,
-                               errorUnavailableListeners: Boolean): 
Seq[MetadataResponseTopic] = {
+                               errorUnavailableListeners: Boolean): 
(Seq[MetadataResponseTopic], Seq[MetadataResponseTopic]) = {
     val topicResponses = metadataCache.getTopicMetadata(topics, listenerName,
         errorUnavailableEndpoints, errorUnavailableListeners)
 
     if (topics.isEmpty || topicResponses.size == topics.size) {
-      topicResponses
+      (topicResponses, Seq.empty[MetadataResponseTopic])
     } else {
       val nonExistentTopics = topics.diff(topicResponses.map(_.name).toSet)
       val responsesForNonExistentTopics = nonExistentTopics.flatMap { topic =>
-        if (isInternal(topic)) {
-          val topicMetadata = createInternalTopic(topic)
-          Some(
-            if (topicMetadata.errorCode == 
Errors.COORDINATOR_NOT_AVAILABLE.code)
-              metadataResponseTopic(Errors.INVALID_REPLICATION_FACTOR, topic, 
true, util.Collections.emptyList())
-            else
-              topicMetadata
-          )
-        } else if (isFetchAllMetadata) {
+       if (isFetchAllMetadata) {
           // A metadata request for all topics should never result in topic 
auto creation, but a topic may be deleted
           // in between the creation of the topics parameter and 
topicResponses, so make sure to return None for this case.
           None
-        } else if (allowAutoTopicCreation && config.autoCreateTopicsEnable) {
-          Some(createTopic(topic, config.numPartitions, 
config.defaultReplicationFactor))
-        } else {
-          Some(metadataResponseTopic(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, 
false, util.Collections.emptyList()))
+       } else {
+        Some(metadataResponseTopic(
+          if (!hasEnoughAliveBrokers(topic))
+            Errors.INVALID_REPLICATION_FACTOR
+          else if (allowAutoTopicCreation && config.autoCreateTopicsEnable)
+            Errors.LEADER_NOT_AVAILABLE

Review comment:
       Hmm... In the old logic, we would first attempt topic creation through 
ZooKeeper. Then, if the topic was created successfully, we would return 
LEADER_NOT_AVAILABLE to give the controller time to elect a leader. Now we 
return LEADER_NOT_AVAILABLE immediately and send the CreateTopics request 
asynchronously, so we don't know whether it will ultimately succeed. Perhaps 
it would be better to keep returning `UNKNOWN_TOPIC_OR_PARTITION` until we 
see that the topic exists.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1084,24 +1087,9 @@ class KafkaApis(val requestChannel: RequestChannel,
     (responseTopics ++ unauthorizedResponseStatus).toList
   }
 
-  private def createTopic(topic: String,
-                          numPartitions: Int,
-                          replicationFactor: Int,
-                          properties: util.Properties = new 
util.Properties()): MetadataResponseTopic = {
-    try {
-      adminZkClient.createTopic(topic, numPartitions, replicationFactor, 
properties, RackAwareMode.Safe)
-      info("Auto creation of topic %s with %d partitions and replication 
factor %d is successful"
-        .format(topic, numPartitions, replicationFactor))
-      metadataResponseTopic(Errors.LEADER_NOT_AVAILABLE, topic, 
isInternal(topic), util.Collections.emptyList())
-    } catch {
-      case _: TopicExistsException => // let it go, possibly another broker 
created this topic

Review comment:
       We seem to have lost the `TopicExistsException` handling (the case where 
another broker has already created the topic). Or am I missing something?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to