TaiJuWu commented on code in PR #18432:
URL: https://github.com/apache/kafka/pull/18432#discussion_r1906509319


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -324,79 +324,6 @@ class KafkaApisTest extends Logging {
     assertEquals(propValue, describeConfigsResponseData.value)
   }
 
-  @Test
-  def testEnvelopeRequestHandlingAsController(): Unit = {

Review Comment:
   This block remove Envelop PRC and related 
https://github.com/apache/kafka/pull/18422



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2845,49 +2739,15 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    // Forwarding has not happened yet, so handle both ZK and KRaft cases here
     if (remaining.resources().isEmpty) {
       sendResponse(Some(new IncrementalAlterConfigsResponseData()))
-    } else if ((!request.isForwarded) && metadataSupport.canForward() && 
isKRaftController) {
+    } else if ((!request.isForwarded) && metadataSupport.canForward()) {
       metadataSupport.forwardingManager.get.forwardRequest(request,
         new IncrementalAlterConfigsRequest(remaining, 
request.header.apiVersion()),
         response => sendResponse(response.map(_.data())))
     } else {
-      sendResponse(Some(processIncrementalAlterConfigsRequest(request, 
remaining)))
-    }
-  }
-
-  def processIncrementalAlterConfigsRequest(
-    originalRequest: RequestChannel.Request,
-    data: IncrementalAlterConfigsRequestData
-  ): IncrementalAlterConfigsResponseData = {
-    val zkSupport = 
metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(originalRequest))

Review Comment:
   ZK related.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2686,42 +2631,10 @@ class KafkaApis(val requestChannel: RequestChannel,
         new AlterConfigsRequest(remaining, request.header.apiVersion()),
         response => sendResponse(response.map(_.data())))
     } else {
-      sendResponse(Some(processLegacyAlterConfigsRequest(request, remaining)))
+      throw KafkaApis.shouldAlwaysForward(request)
     }
   }
 
-  def processLegacyAlterConfigsRequest(
-    originalRequest: RequestChannel.Request,
-    data: AlterConfigsRequestData
-  ): AlterConfigsResponseData = {
-    val zkSupport = 
metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(originalRequest))

Review Comment:
   ZK related.



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -630,51 +514,31 @@ class KafkaApisTest extends Logging {
     val configs = Map(authorizedResource -> new 
AlterConfigsRequest.Config(configEntries))
 
     val requestHeader = new RequestHeader(ApiKeys.ALTER_CONFIGS, 
ApiKeys.ALTER_CONFIGS.latestVersion, clientId, 0)
-    val request = buildRequest(
-      new AlterConfigsRequest.Builder(configs.asJava, 
false).build(requestHeader.apiVersion))
+    val apiRequest = new AlterConfigsRequest.Builder(configs.asJava, 
false).build(requestHeader.apiVersion)
+    val request = buildRequest(apiRequest)
 
-    when(controller.isActive).thenReturn(false)
-    
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
-      any[Long])).thenReturn(0)
-    when(adminManager.alterConfigs(any(), ArgumentMatchers.eq(false)))
-      .thenReturn(Map(authorizedResource -> ApiError.NONE))
-    kafkaApis = createKafkaApis(authorizer = Some(authorizer))
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.LATEST_PRODUCTION)
+    kafkaApis = createKafkaApis(raftSupport = true)
     kafkaApis.handleAlterConfigsRequest(request)
-    val response = verifyNoThrottling[AlterConfigsResponse](request)
-    verifyAlterConfigResult(response, Map(subscriptionName -> Errors.NONE))
-    verify(authorizer, times(1)).authorize(any(), any())
-    verify(adminManager).alterConfigs(any(), anyBoolean())
+    testNewBodyForwardableApi(request)
   }
 
   @Test
   def testIncrementalClientMetricAlterConfigs(): Unit = {

Review Comment:
   ditto



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -630,51 +514,31 @@ class KafkaApisTest extends Logging {
     val configs = Map(authorizedResource -> new 
AlterConfigsRequest.Config(configEntries))
 
     val requestHeader = new RequestHeader(ApiKeys.ALTER_CONFIGS, 
ApiKeys.ALTER_CONFIGS.latestVersion, clientId, 0)
-    val request = buildRequest(
-      new AlterConfigsRequest.Builder(configs.asJava, 
false).build(requestHeader.apiVersion))
+    val apiRequest = new AlterConfigsRequest.Builder(configs.asJava, 
false).build(requestHeader.apiVersion)
+    val request = buildRequest(apiRequest)
 
-    when(controller.isActive).thenReturn(false)
-    
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
-      any[Long])).thenReturn(0)
-    when(adminManager.alterConfigs(any(), ArgumentMatchers.eq(false)))
-      .thenReturn(Map(authorizedResource -> ApiError.NONE))
-    kafkaApis = createKafkaApis(authorizer = Some(authorizer))
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => 
KRaftVersion.LATEST_PRODUCTION)
+    kafkaApis = createKafkaApis(raftSupport = true)
     kafkaApis.handleAlterConfigsRequest(request)
-    val response = verifyNoThrottling[AlterConfigsResponse](request)
-    verifyAlterConfigResult(response, Map(subscriptionName -> Errors.NONE))
-    verify(authorizer, times(1)).authorize(any(), any())
-    verify(adminManager).alterConfigs(any(), anyBoolean())
+    testNewBodyForwardableApi(request)

Review Comment:
   In Kraft mode, this request is redirected to controller.



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -485,44 +412,6 @@ class KafkaApisTest extends Logging {
     }
   }
 
-  @Test
-  def testAlterConfigsWithAuthorizer(): Unit = {

Review Comment:
   ZK only test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to