ijuma commented on a change in pull request #9912: URL: https://github.com/apache/kafka/pull/9912#discussion_r561002429
########## File path: clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java ########## @@ -175,4 +175,19 @@ public String clientId() { public int correlationId() { return header.correlationId(); } + + @Override + public String toString() { + return "RequestContext(" + + "header=" + header + + ", connectionId='" + connectionId + '\'' + + ", clientAddress=" + clientAddress + + ", principal=" + principal + + ", listenerName=" + listenerName + + ", securityProtocol=" + securityProtocol + + ", clientInformation=" + clientInformation + + ", fromPrivilegedListener=" + fromPrivilegedListener + + ", principalSerde=" + principalSerde + Review comment: Have we checked that no privileged information (i.e. credentials) is logged by any of these fields? Seems OK to me, but worth double checking. ########## File path: core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala ########## @@ -44,28 +33,13 @@ class ThrottledChannelExpirationTest { private val metrics = new org.apache.kafka.common.metrics.Metrics(new MetricConfig(), Collections.emptyList(), time) - private val request = buildRequest(FetchRequest.Builder.forConsumer(0, 1000, new util.HashMap[TopicPartition, PartitionData]))._2 - - private def buildRequest[T <: AbstractRequest](builder: AbstractRequest.Builder[T], - listenerName: ListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)): (T, RequestChannel.Request) = { - - val request = builder.build() - val buffer = RequestTestUtils.serializeRequestWithHeader( - new RequestHeader(builder.apiKey, request.version, "", 0), request) - val requestChannelMetrics: RequestChannel.Metrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics]) - - // read the header from the buffer first so that the body can be read next from the Request constructor - val header = RequestHeader.parse(buffer) - val context = new RequestContext(header, "1", InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS, - listenerName, 
SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false) - (request, new RequestChannel.Request(processor = 1, context = context, startTimeNanos = 0, MemoryPool.NONE, buffer, - requestChannelMetrics)) - } Review comment: Nice that we don't need this anymore. ########## File path: core/src/main/scala/kafka/network/RequestChannel.scala ########## @@ -371,9 +372,44 @@ class RequestChannel(val queueSize: Int, requestQueue.put(request) } - /** Send a response back to the socket server to be sent over the network */ - def sendResponse(response: RequestChannel.Response): Unit = { + def closeConnection( + request: RequestChannel.Request, + errorCounts: java.util.Map[Errors, Integer] + ): Unit = { + // This case is used when the request handler has encountered an error, but the client + // does not expect a response (e.g. when produce request has acks set to 0) + updateErrorMetrics(request.header.apiKey, errorCounts.asScala) + sendResponse(new RequestChannel.CloseConnectionResponse(request)) + } + + def sendResponse( + request: RequestChannel.Request, + response: AbstractResponse, + onComplete: Option[Send => Unit] Review comment: I wonder if this should be `Send => Unit` instead. It doesn't seem like there is much value in the `Option` here. A function that does nothing is equivalent to `None`. I know this is an existing issue, so it's OK if you prefer not to address it now. 
########## File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala ########## @@ -227,37 +225,33 @@ class KafkaApisTest { authorizeResource(authorizer, operation, ResourceType.TOPIC, resourceName, AuthorizationResult.ALLOWED) - val capturedResponse = expectNoThrottling() - val configResource = new ConfigResource(ConfigResource.Type.TOPIC, resourceName) EasyMock.expect(adminManager.alterConfigs(anyObject(), EasyMock.eq(false))) .andAnswer(() => { Map(configResource -> alterConfigHandler.apply()) }) - EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, authorizer, - adminManager, controller) - val configs = Map( configResource -> new AlterConfigsRequest.Config( Seq(new AlterConfigsRequest.ConfigEntry("foo", "bar")).asJava)) val alterConfigsRequest = new AlterConfigsRequest.Builder(configs.asJava, false).build(requestHeader.apiVersion) val request = buildRequestWithEnvelope(alterConfigsRequest, fromPrivilegedListener = true) + val capturedResponse = EasyMock.newCapture[AbstractResponse]() + val capturedRequest = EasyMock.newCapture[RequestChannel.Request]() - createKafkaApis(authorizer = Some(authorizer), enableForwarding = true).handle(request) - - val envelopeRequest = request.body[EnvelopeRequest] - val response = readResponse(envelopeRequest, capturedResponse) - .asInstanceOf[EnvelopeResponse] - - assertEquals(Errors.NONE, response.error) Review comment: We don't need this assert anymore? Can you explain a bit more what we're trying to do with the changes in this method? That will probably help with some other envelope related test changes in this class too. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org