hachikuji commented on a change in pull request #9850:
URL: https://github.com/apache/kafka/pull/9850#discussion_r555985207



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -3242,6 +3209,125 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  def handleEnvelope(request: RequestChannel.Request): Unit = {
+    val envelope = request.body[EnvelopeRequest]
+
+    if (!config.metadataQuorumEnabled || 
!request.context.fromPrivilegedListener) {
+      // If forwarding is not yet enabled or this request has been received on 
an invalid endpoint,
+      // then we treat the request as unparsable and close the connection.
+      closeConnection(request, Collections.emptyMap())
+      return
+    } else if (!authorize(request.context, CLUSTER_ACTION, CLUSTER, 
CLUSTER_NAME)) {
+      sendErrorResponseMaybeThrottle(request, new 
ClusterAuthorizationException(
+        s"Principal ${request.context.principal} does not have required 
CLUSTER_ACTION for envelope"))
+      return
+    } else if (!controller.isActive) {
+      sendErrorResponseMaybeThrottle(request, new NotControllerException(
+        s"Broker $brokerId is not the active controller"))
+      return
+    }
+
+    try {
+      val forwardedPrincipal = parseForwardedPrincipal(request)
+      val forwardedClientAddress = 
parseForwardedClientAddress(envelope.clientAddress)
+
+      val forwardedRequestBuffer = envelope.requestData.duplicate()
+      val forwardedRequestHeader = 
parseForwardedRequestHeader(forwardedRequestBuffer)
+
+      val forwardedApi = forwardedRequestHeader.apiKey
+      if (!forwardedApi.forwardable || !forwardedApi.isEnabled) {
+        throw new InvalidRequestException(s"API $forwardedApi is not enabled 
or is not eligible for forwarding")
+      }
+
+      val forwardedContext = new RequestContext(
+        forwardedRequestHeader,
+        request.context.connectionId,
+        forwardedClientAddress,
+        forwardedPrincipal,
+        request.context.listenerName,
+        request.context.securityProtocol,
+        ClientInformation.EMPTY,
+        request.context.fromPrivilegedListener
+      )
+
+      val forwardedRequest = parseForwardedRequest(request, forwardedContext, 
forwardedRequestBuffer)
+      handle(forwardedRequest)
+    } catch {
+      case e: KafkaException =>
+        debug(s"Failed to handle envelope request $request", e)
+        sendErrorResponseMaybeThrottle(request, e)
+    }
+  }
+
+  private def parseForwardedClientAddress(
+    address: Array[Byte]
+  ): InetAddress = {
+    try {
+      InetAddress.getByAddress(address)
+    } catch {
+      case e: UnknownHostException =>
+        throw new InvalidRequestException("Failed to parse client address from 
envelope", e)
+    }
+  }
+
+  private def parseForwardedRequest(
+    envelope: RequestChannel.Request,
+    forwardedContext: RequestContext,
+    buffer: ByteBuffer
+  ): RequestChannel.Request = {
+    try {
+      new RequestChannel.Request(
+        processor = envelope.processor,
+        context = forwardedContext,
+        startTimeNanos = envelope.startTimeNanos,
+        envelope.memoryPool,
+        buffer,
+        requestChannel.metrics,
+        Some(envelope)
+      )
+    } catch {
+      case e: InvalidRequestException =>
+        // We use UNSUPPORTED_VERSION if the embedded request cannot be parsed.
+        // The purpose is to disambiguate structural errors in the envelope 
request
+        // itself, such as an invalid client address.
+        throw new UnsupportedVersionException(s"Failed to parse forwarded 
request " +
+          s"with header ${forwardedContext.header}")
+    }
+  }
+
+  private def parseForwardedRequestHeader(
+    buffer: ByteBuffer
+  ): RequestHeader = {
+    try {
+      RequestHeader.parse(buffer)
+    } catch {
+      case e: InvalidRequestException =>
+        // We use UNSUPPORTED_VERSION if the embedded request cannot be parsed.
+        // The purpose is to disambiguate structural errors in the envelope 
request
+        // itself, such as an invalid client address.
+        throw new UnsupportedVersionException("Failed to parse request header 
from envelope", e)
+    }
+  }
+
+  private def parseForwardedPrincipal(
+    envelopeRequest: RequestChannel.Request

Review comment:
       Thanks, I modified the arguments so that we pass down the envelope
request context and the principal bytes.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to