ijuma commented on a change in pull request #9912:
URL: https://github.com/apache/kafka/pull/9912#discussion_r561002429



##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java
##########
@@ -175,4 +175,19 @@ public String clientId() {
     public int correlationId() {
         return header.correlationId();
     }
+
+    @Override
+    public String toString() {
+        return "RequestContext(" +
+            "header=" + header +
+            ", connectionId='" + connectionId + '\'' +
+            ", clientAddress=" + clientAddress +
+            ", principal=" + principal +
+            ", listenerName=" + listenerName +
+            ", securityProtocol=" + securityProtocol +
+            ", clientInformation=" + clientInformation +
+            ", fromPrivilegedListener=" + fromPrivilegedListener +
+            ", principalSerde=" + principalSerde +

Review comment:
       Have we checked that no privileged information (i.e. credentials) are 
logged by any of these fields? Seems OK to me, but worth double checking.

##########
File path: 
core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala
##########
@@ -44,28 +33,13 @@ class ThrottledChannelExpirationTest {
   private val metrics = new org.apache.kafka.common.metrics.Metrics(new 
MetricConfig(),
                                                                     
Collections.emptyList(),
                                                                     time)
-  private val request = buildRequest(FetchRequest.Builder.forConsumer(0, 1000, 
new util.HashMap[TopicPartition, PartitionData]))._2
-
-  private def buildRequest[T <: AbstractRequest](builder: 
AbstractRequest.Builder[T],
-                                                 listenerName: ListenerName = 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)): (T, 
RequestChannel.Request) = {
-
-    val request = builder.build()
-    val buffer = RequestTestUtils.serializeRequestWithHeader(
-      new RequestHeader(builder.apiKey, request.version, "", 0), request)
-    val requestChannelMetrics: RequestChannel.Metrics = 
EasyMock.createNiceMock(classOf[RequestChannel.Metrics])
-
-    // read the header from the buffer first so that the body can be read next 
from the Request constructor
-    val header = RequestHeader.parse(buffer)
-    val context = new RequestContext(header, "1", InetAddress.getLocalHost, 
KafkaPrincipal.ANONYMOUS,
-      listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false)
-    (request, new RequestChannel.Request(processor = 1, context = context, 
startTimeNanos =  0, MemoryPool.NONE, buffer,
-      requestChannelMetrics))
-  }

Review comment:
       Nice that we don't need this anymore.

##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -371,9 +372,44 @@ class RequestChannel(val queueSize: Int,
     requestQueue.put(request)
   }
 
-  /** Send a response back to the socket server to be sent over the network */
-  def sendResponse(response: RequestChannel.Response): Unit = {
+  def closeConnection(
+    request: RequestChannel.Request,
+    errorCounts: java.util.Map[Errors, Integer]
+  ): Unit = {
+    // This case is used when the request handler has encountered an error, 
but the client
+    // does not expect a response (e.g. when produce request has acks set to 0)
+    updateErrorMetrics(request.header.apiKey, errorCounts.asScala)
+    sendResponse(new RequestChannel.CloseConnectionResponse(request))
+  }
+
+  def sendResponse(
+    request: RequestChannel.Request,
+    response: AbstractResponse,
+    onComplete: Option[Send => Unit]

Review comment:
       I wonder if this should be `Send => Unit` instead. It doesn't seem like 
there is much value in the `Option` here. A function that does nothing is 
equivalent to `None`. I know this is an existing issue, so ok if you prefer not 
to address now.

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -227,37 +225,33 @@ class KafkaApisTest {
 
     authorizeResource(authorizer, operation, ResourceType.TOPIC, resourceName, 
AuthorizationResult.ALLOWED)
 
-    val capturedResponse = expectNoThrottling()
-
     val configResource = new ConfigResource(ConfigResource.Type.TOPIC, 
resourceName)
     EasyMock.expect(adminManager.alterConfigs(anyObject(), EasyMock.eq(false)))
       .andAnswer(() => {
         Map(configResource -> alterConfigHandler.apply())
       })
 
-    EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, 
authorizer,
-      adminManager, controller)
-
     val configs = Map(
       configResource -> new AlterConfigsRequest.Config(
         Seq(new AlterConfigsRequest.ConfigEntry("foo", "bar")).asJava))
     val alterConfigsRequest = new AlterConfigsRequest.Builder(configs.asJava, 
false).build(requestHeader.apiVersion)
 
     val request = buildRequestWithEnvelope(alterConfigsRequest, 
fromPrivilegedListener = true)
+    val capturedResponse = EasyMock.newCapture[AbstractResponse]()
+    val capturedRequest = EasyMock.newCapture[RequestChannel.Request]()
 
-    createKafkaApis(authorizer = Some(authorizer), enableForwarding = 
true).handle(request)
-
-    val envelopeRequest = request.body[EnvelopeRequest]
-    val response = readResponse(envelopeRequest, capturedResponse)
-      .asInstanceOf[EnvelopeResponse]
-
-    assertEquals(Errors.NONE, response.error)

Review comment:
       We don't need this assert anymore? Can you explain a bit more what we're 
trying to do with the changes in this method? That will probably help with some 
other envelope related test changes in this class too.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to