ijuma commented on a change in pull request #9912:
URL: https://github.com/apache/kafka/pull/9912#discussion_r563455980



##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -371,9 +372,44 @@ class RequestChannel(val queueSize: Int,
     requestQueue.put(request)
   }
 
-  /** Send a response back to the socket server to be sent over the network */
-  def sendResponse(response: RequestChannel.Response): Unit = {
+  def closeConnection(
+    request: RequestChannel.Request,
+    errorCounts: java.util.Map[Errors, Integer]
+  ): Unit = {
+    // This case is used when the request handler has encountered an error, but the client
+    // does not expect a response (e.g. when produce request has acks set to 0)
+    updateErrorMetrics(request.header.apiKey, errorCounts.asScala)
+    sendResponse(new RequestChannel.CloseConnectionResponse(request))
+  }
+
+  def sendResponse(
+    request: RequestChannel.Request,
+    response: AbstractResponse,
+    onComplete: Option[Send => Unit]

Review comment:
       Sounds good.

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -227,37 +225,33 @@ class KafkaApisTest {
 
     authorizeResource(authorizer, operation, ResourceType.TOPIC, resourceName, 
AuthorizationResult.ALLOWED)
 
-    val capturedResponse = expectNoThrottling()
-
     val configResource = new ConfigResource(ConfigResource.Type.TOPIC, 
resourceName)
     EasyMock.expect(adminManager.alterConfigs(anyObject(), EasyMock.eq(false)))
       .andAnswer(() => {
         Map(configResource -> alterConfigHandler.apply())
       })
 
-    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, 
authorizer,
-      adminManager, controller)
-
     val configs = Map(
       configResource -> new AlterConfigsRequest.Config(
         Seq(new AlterConfigsRequest.ConfigEntry("foo", "bar")).asJava))
     val alterConfigsRequest = new AlterConfigsRequest.Builder(configs.asJava, 
false).build(requestHeader.apiVersion)
 
     val request = buildRequestWithEnvelope(alterConfigsRequest, 
fromPrivilegedListener = true)
+    val capturedResponse = EasyMock.newCapture[AbstractResponse]()
+    val capturedRequest = EasyMock.newCapture[RequestChannel.Request]()
 
-    createKafkaApis(authorizer = Some(authorizer), enableForwarding = 
true).handle(request)
-
-    val envelopeRequest = request.body[EnvelopeRequest]
-    val response = readResponse(envelopeRequest, capturedResponse)
-      .asInstanceOf[EnvelopeResponse]
-
-    assertEquals(Errors.NONE, response.error)

Review comment:
       Thanks for the explanation.

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -529,15 +518,14 @@ class KafkaApisTest {
     new IncrementalAlterConfigsRequest.Builder(resourceMap, false)
   }
 
+  // FIXME: Unused arg

Review comment:
       Does this still have to be addressed? Either way, the FIXME needs to be 
removed.

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -2127,21 +2118,15 @@ class KafkaApisTest {
       error = Errors.NONE
     ))
 
-    val response = readResponse(joinGroupRequest, capturedResponse)
-      .asInstanceOf[JoinGroupResponse]
+    val response = capturedResponse.getValue.asInstanceOf[JoinGroupResponse]
 
     assertEquals(Errors.NONE, response.error)
     assertEquals(0, response.data.members.size)
     assertEquals(memberId, response.data.memberId)
     assertEquals(0, response.data.generationId)
     assertEquals(memberId, response.data.leader)
     assertEquals(protocolName, response.data.protocolName)
-
-    if (version >= 7) {

Review comment:
       Why is this `version >= 7` check no longer needed?

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -579,11 +566,11 @@ class KafkaApisTest {
     testForwardableAPI(ApiKeys.ALTER_CLIENT_QUOTAS, requestBuilder)
   }
 
+  // FIXME: Unused arg

Review comment:
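       Same as the earlier `FIXME: Unused arg` comment: does this still have to be
addressed? Either way, the FIXME needs to be removed.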

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -2200,17 +2183,11 @@ class KafkaApisTest {
       error = Errors.NONE
     ))
 
-    val response = readResponse(syncGroupRequest, capturedResponse)
-      .asInstanceOf[SyncGroupResponse]
+    val response = capturedResponse.getValue.asInstanceOf[SyncGroupResponse]
 
     assertEquals(Errors.NONE, response.error)
     assertArrayEquals(Array.empty[Byte], response.data.assignment)
-
-    if (version >= 5) {

Review comment:
       Similar question: why is this `version >= 5` check no longer needed?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to