hachikuji commented on a change in pull request #9850: URL: https://github.com/apache/kafka/pull/9850#discussion_r555236682
########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -3242,6 +3209,133 @@ class KafkaApis(val requestChannel: RequestChannel, } } + def handleEnvelope(request: RequestChannel.Request): Unit = { + val envelope = request.body[EnvelopeRequest] + + if (!config.metadataQuorumEnabled) { + // If forwarding is not yet enabled, we treat the request as unparsable and close the connection + closeConnection(request, Collections.emptyMap()) + return + } else if (!request.context.fromPrivilegedListener) { + sendErrorResponseMaybeThrottle(request, new ClusterAuthorizationException( + s"Invalid envelope request on unprivileged listener ${request.context.listenerName}")) + return + } else if (!authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) { + sendErrorResponseMaybeThrottle(request, new ClusterAuthorizationException( + s"Principal ${request.context.principal} does not have required CLUSTER_ACTION for envelope")) + return + } else if (!controller.isActive) { + sendErrorResponseMaybeThrottle(request, new NotControllerException( + s"Broker $brokerId is not the active controller")) + return + } + + try { + val forwardedPrincipal = parseForwardedPrincipal(request).getOrElse { + throw new PrincipalDeserializationException("Failed to parse client principal from envelope") + } + + val forwardedClientAddress = parseForwardedClientAddress(envelope.clientAddress).getOrElse { + throw new InvalidRequestException("Failed to parse client address from envelope") + } + + // Note that any failure to parse the embedded request or its header is treated as + // an UNSUPPORTED_VERSION error rather than INVALID_REQUEST. The purpose + // is to disambiguate structural errors in the envelope request itself, such + // as an invalid client address. + + val forwardedRequestBuffer = envelope.requestData.duplicate() + val forwardedRequestHeader = parseForwardedRequestHeader(forwardedRequestBuffer).getOrElse { + throw new UnsupportedVersionException("Failed to parse request header from envelope") + } + + val forwardedApi = forwardedRequestHeader.apiKey + if (!forwardedApi.forwardable || !forwardedApi.isEnabled) { + throw new InvalidRequestException(s"API $forwardedApi is not enabled or is not eligible for forwarding") + } + + val forwardedContext = new RequestContext( + forwardedRequestHeader, + request.context.connectionId, + forwardedClientAddress, + forwardedPrincipal, + request.context.listenerName, + request.context.securityProtocol, + ClientInformation.EMPTY, + request.context.fromPrivilegedListener + ) + + val forwardedRequest = parseForwardedRequest(request, forwardedContext, forwardedRequestBuffer).getOrElse { + throw new UnsupportedVersionException(s"Failed to parse forwarded request with header $forwardedRequestHeader") + } + + handle(forwardedRequest) + } catch { + case e: KafkaException => + debug(s"Failed to handle envelope request $request", e) + sendErrorResponseMaybeThrottle(request, e) + } + } + + private def parseForwardedClientAddress( + address: Array[Byte] + ): Option[InetAddress] = { + try { + Some(InetAddress.getByAddress(address)) + } catch { + case e: UnknownHostException => + None + } + } + + private def parseForwardedRequest( + envelope: RequestChannel.Request, + forwardedContext: RequestContext, + buffer: ByteBuffer + ): Option[RequestChannel.Request] = { + try { + Some(new RequestChannel.Request( + processor = envelope.processor, + context = forwardedContext, + startTimeNanos = envelope.startTimeNanos, + envelope.memoryPool, + buffer, + requestChannel.metrics, + Some(envelope) + )) + } catch { + case e: InvalidRequestException => + None + } + } + + private def parseForwardedRequestHeader( + buffer: ByteBuffer + ): Option[RequestHeader] = { + try { + Some(RequestHeader.parse(buffer)) + } catch { + case e: InvalidRequestException => + None + } + } + + private def parseForwardedPrincipal( + envelopeRequest: RequestChannel.Request + ): Option[KafkaPrincipal] = { + val envelope = envelopeRequest.body[EnvelopeRequest] + try { + val principalSerde = envelopeRequest.context.principalSerde.asScala + principalSerde.map { serde => + serde.deserialize(envelope.requestPrincipal) + } + } catch { + case e: Exception => + warn(s"Failed to deserialize principal from envelope request $envelope", e) Review comment: Thanks, makes sense and simplifies the code a bit. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org