hachikuji commented on a change in pull request #9850: URL: https://github.com/apache/kafka/pull/9850#discussion_r555979059
########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -3242,6 +3209,125 @@ class KafkaApis(val requestChannel: RequestChannel, } } + def handleEnvelope(request: RequestChannel.Request): Unit = { + val envelope = request.body[EnvelopeRequest] + + if (!config.metadataQuorumEnabled || !request.context.fromPrivilegedListener) { + // If forwarding is not yet enabled or this request has been received on an invalid endpoint, + // then we treat the request as unparsable and close the connection. + closeConnection(request, Collections.emptyMap()) + return + } else if (!authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) { + sendErrorResponseMaybeThrottle(request, new ClusterAuthorizationException( + s"Principal ${request.context.principal} does not have required CLUSTER_ACTION for envelope")) + return + } else if (!controller.isActive) { + sendErrorResponseMaybeThrottle(request, new NotControllerException( + s"Broker $brokerId is not the active controller")) + return + } + + try { + val forwardedPrincipal = parseForwardedPrincipal(request) + val forwardedClientAddress = parseForwardedClientAddress(envelope.clientAddress) + + val forwardedRequestBuffer = envelope.requestData.duplicate() + val forwardedRequestHeader = parseForwardedRequestHeader(forwardedRequestBuffer) + + val forwardedApi = forwardedRequestHeader.apiKey + if (!forwardedApi.forwardable || !forwardedApi.isEnabled) { + throw new InvalidRequestException(s"API $forwardedApi is not enabled or is not eligible for forwarding") + } + + val forwardedContext = new RequestContext( + forwardedRequestHeader, + request.context.connectionId, + forwardedClientAddress, + forwardedPrincipal, + request.context.listenerName, + request.context.securityProtocol, + ClientInformation.EMPTY, + request.context.fromPrivilegedListener + ) + + val forwardedRequest = parseForwardedRequest(request, forwardedContext, forwardedRequestBuffer) + handle(forwardedRequest) + } catch { + case e: 
KafkaException => Review comment: I think I added the `try/catch` initially because I thought the logging in `handleError` was too verbose for some of these cases. However, parsing errors should be rare, and if they do occur, they probably indicate a misconfiguration. So perhaps the more verbose logging in `handleError` is appropriate after all. I will go ahead and remove the `try/catch`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org