hachikuji commented on a change in pull request #10005: URL: https://github.com/apache/kafka/pull/10005#discussion_r567202218
########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -119,7 +122,21 @@ class KafkaApis(val requestChannel: RequestChannel, type FetchResponseStats = Map[TopicPartition, RecordConversionStats] this.logIdent = "[KafkaApi-%d] ".format(brokerId) - val adminZkClient = new AdminZkClient(zkClient) + + val usingRaftMetadataQuorum = if (adminManager != null && controller != null && zkClient != null) { Review comment: Can we use `config.requiresZookeeper` here? ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -3316,8 +3440,8 @@ class KafkaApis(val requestChannel: RequestChannel, } private def parseForwardedClientAddress( - address: Array[Byte] - ): InetAddress = { + address: Array[Byte] Review comment: nit: fix indentation ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -119,7 +122,21 @@ class KafkaApis(val requestChannel: RequestChannel, type FetchResponseStats = Map[TopicPartition, RecordConversionStats] this.logIdent = "[KafkaApi-%d] ".format(brokerId) - val adminZkClient = new AdminZkClient(zkClient) + + val usingRaftMetadataQuorum = if (adminManager != null && controller != null && zkClient != null) { + false + } else { // at least one is null, so make sure they are all null, plus we must have a forwardingManager + if (adminManager != null || controller != null || zkClient != null) { Review comment: This kind of logic might be better suited for a factory. Something like this: ```scala object KafkaApis { def buildZk(zkClient: KafkaZkClient, ...): KafkaApis = { // verify adminManager, controller, etc } def buildSelfManaged(...): KafkaApis = { // verify forwarding manager, etc. } } ``` ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -150,7 +167,11 @@ class KafkaApis(val requestChannel: RequestChannel, } forwardingManager match { - case Some(mgr) if !request.isForwarded && !controller.isActive => + case Some(mgr) if request.isForwarded && usingRaftMetadataQuorum => Review comment: The extra checks in here seem unnecessary. If the forwarding manager is provided, we can use it. ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -1654,6 +1702,7 @@ class KafkaApis(val requestChannel: RequestChannel, .format(response, request.header.correlationId, request.header.clientId)) response } + Review comment: nit: do we need all these additional newlines? they are a tad distracting ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -2609,14 +2672,27 @@ class KafkaApis(val requestChannel: RequestChannel, case rt => throw new InvalidRequestException(s"Unexpected resource type $rt") } } - val authorizedResult = adminManager.alterConfigs(authorizedResources, alterConfigsRequest.validateOnly) - val unauthorizedResult = unauthorizedResources.keys.map { resource => - resource -> configsAuthorizationApiError(resource) + // Should never get here when using a Raft metadata quorum since it would be forwarded, Review comment: If you think the protection is worthwhile, then can we do something like `rejectWithUnsupportedVersionIfUsingRaftMetadataQuorum`? ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -3008,7 +3107,8 @@ class KafkaApis(val requestChannel: RequestChannel, }) } - if (!authHelper.authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME)) { + // not yet supported when using a Raft-based metadata quorum + if (usingRaftMetadataQuorum || !authHelper.authorize(request.context, ALTER, CLUSTER, CLUSTER_NAME)) { Review comment: `CLUSTER_AUTHORIZATION_FAILED` seems like a surprising error for an unsupported operation. Any of the following errors might be better: - `UNKNOWN_SERVER_ERROR` - `OPERATION_NOT_ATTEMPTED` - `UNSUPPORTE_VERSION` ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -2965,10 +3064,10 @@ class KafkaApis(val requestChannel: RequestChannel, val electionRequest = request.body[ElectLeadersRequest] def sendResponseCallback( - error: ApiError - )( - results: Map[TopicPartition, ApiError] - ): Unit = { + error: ApiError Review comment: nit: not sure what happened with the indentation here ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -3450,7 +3574,187 @@ class KafkaApis(val requestChannel: RequestChannel, brokerEpochInRequest < controller.brokerEpoch } } +} + +class ConfigHelper(metadataCache: MetadataCache, config: KafkaConfig, configRepository: ConfigRepository) extends Logging { Review comment: Can we move this to a separate file? ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -249,6 +270,7 @@ class KafkaApis(val requestChannel: RequestChannel, } def handleLeaderAndIsrRequest(request: RequestChannel.Request): Unit = { + rejectWithUnsupportedVersionIfUsingRaftMetadataQuorum(request) Review comment: Do we excludes these APIs from the ApiVersions response when raft is enabled? ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -97,12 +100,12 @@ import scala.annotation.nowarn */ class KafkaApis(val requestChannel: RequestChannel, val replicaManager: ReplicaManager, - val adminManager: ZkAdminManager, + adminManager: ZkAdminManager, val groupCoordinator: GroupCoordinator, val txnCoordinator: TransactionCoordinator, - val controller: KafkaController, - val forwardingManager: Option[ForwardingManager], - val zkClient: KafkaZkClient, + controller: KafkaController, + forwardingManager: Option[ForwardingManager], + zkClient: KafkaZkClient, Review comment: Can we use `Option` for all of these types that might be null? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org