hachikuji commented on a change in pull request #10005:
URL: https://github.com/apache/kafka/pull/10005#discussion_r567202218



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -119,7 +122,21 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   type FetchResponseStats = Map[TopicPartition, RecordConversionStats]
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
-  val adminZkClient = new AdminZkClient(zkClient)
+
+  val usingRaftMetadataQuorum = if (adminManager != null && controller != null 
&& zkClient != null) {

Review comment:
       Can we use `config.requiresZookeeper` here?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -3316,8 +3440,8 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   private def parseForwardedClientAddress(
-    address: Array[Byte]
-  ): InetAddress = {
+                                           address: Array[Byte]

Review comment:
       nit: fix indentation

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -119,7 +122,21 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   type FetchResponseStats = Map[TopicPartition, RecordConversionStats]
   this.logIdent = "[KafkaApi-%d] ".format(brokerId)
-  val adminZkClient = new AdminZkClient(zkClient)
+
+  val usingRaftMetadataQuorum = if (adminManager != null && controller != null 
&& zkClient != null) {
+    false
+  } else { // at least one is null, so make sure they are all null, plus we 
must have a forwardingManager
+    if (adminManager != null || controller != null || zkClient != null) {

Review comment:
       This kind of logic might be better suited for a factory. Something like 
this:
   ```scala
   object KafkaApis {
     def buildZk(zkClient: KafkaZkClient, ...): KafkaApis = {
        // verify adminManager, controller, etc
     }
     def buildSelfManaged(...): KafkaApis = {
       // verify forwarding manager, etc.
     }
   }
   ```

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -150,7 +167,11 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     forwardingManager match {
-      case Some(mgr) if !request.isForwarded && !controller.isActive =>
+      case Some(mgr) if request.isForwarded && usingRaftMetadataQuorum =>

Review comment:
       The extra checks in here seem unnecessary. If the forwarding manager is 
provided, we can use it.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1654,6 +1702,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           .format(response, request.header.correlationId, 
request.header.clientId))
         response
       }
+

Review comment:
       nit: do we need all these additional newlines? they are a tad distracting

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2609,14 +2672,27 @@ class KafkaApis(val requestChannel: RequestChannel,
         case rt => throw new InvalidRequestException(s"Unexpected resource 
type $rt")
       }
     }
-    val authorizedResult = adminManager.alterConfigs(authorizedResources, 
alterConfigsRequest.validateOnly)
-    val unauthorizedResult = unauthorizedResources.keys.map { resource =>
-      resource -> configsAuthorizationApiError(resource)
+    // Should never get here when using a Raft metadata quorum since it would 
be forwarded,

Review comment:
       If you think the protection is worthwhile, then can we do something like 
`rejectWithUnsupportedVersionIfUsingRaftMetadataQuorum`?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -3008,7 +3107,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       })
     }
 
-    if (!authHelper.authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME)) {
+    // not yet supported when using a Raft-based metadata quorum
+    if (usingRaftMetadataQuorum || !authHelper.authorize(request.context, 
ALTER, CLUSTER, CLUSTER_NAME)) {

Review comment:
       `CLUSTER_AUTHORIZATION_FAILED` seems like a surprising error for an 
unsupported operation. Any of the following errors might be better:
   -  `UNKNOWN_SERVER_ERROR` 
   - `OPERATION_NOT_ATTEMPTED`
   - `UNSUPPORTE_VERSION`

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2965,10 +3064,10 @@ class KafkaApis(val requestChannel: RequestChannel,
     val electionRequest = request.body[ElectLeadersRequest]
 
     def sendResponseCallback(
-      error: ApiError
-    )(
-      results: Map[TopicPartition, ApiError]
-    ): Unit = {
+                              error: ApiError

Review comment:
       nit: not sure what happened with the indentation here

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -3450,7 +3574,187 @@ class KafkaApis(val requestChannel: RequestChannel,
       brokerEpochInRequest < controller.brokerEpoch
     }
   }
+}
+
+class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, 
configRepository: ConfigRepository) extends Logging {

Review comment:
       Can we move this to a separate file? 

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -249,6 +270,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleLeaderAndIsrRequest(request: RequestChannel.Request): Unit = {
+    rejectWithUnsupportedVersionIfUsingRaftMetadataQuorum(request)

Review comment:
       Do we excludes these APIs from the ApiVersions response when raft is 
enabled?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -97,12 +100,12 @@ import scala.annotation.nowarn
  */
 class KafkaApis(val requestChannel: RequestChannel,
                 val replicaManager: ReplicaManager,
-                val adminManager: ZkAdminManager,
+                adminManager: ZkAdminManager,
                 val groupCoordinator: GroupCoordinator,
                 val txnCoordinator: TransactionCoordinator,
-                val controller: KafkaController,
-                val forwardingManager: Option[ForwardingManager],
-                val zkClient: KafkaZkClient,
+                controller: KafkaController,
+                forwardingManager: Option[ForwardingManager],
+                zkClient: KafkaZkClient,

Review comment:
       Can we use `Option` for all of these types that might be null?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to