hachikuji commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r509540617



##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -94,19 +104,60 @@ object RequestChannel extends Logging {
     @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
 
     val session = Session(context.principal, context.clientAddress)
+
     private val bodyAndSize: RequestAndSize = context.parseRequest(buffer)
 
     def header: RequestHeader = context.header
     def sizeOfBodyInBytes: Int = bodyAndSize.size
 
-    //most request types are parsed entirely into objects at this point. for those we can release the underlying buffer.
-    //some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference
-    //to the buffer. for those requests we cannot release the buffer early, but only when request processing is done.
+    def buildResponse(abstractResponse: AbstractResponse,
+                      error: Errors): Send =
+      if (envelopeContext.isDefined) {

Review comment:
       nit: use `match` on `envelopeContext` instead of `isDefined`/`get`.

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -974,8 +973,39 @@ private[kafka] class Processor(val id: Int,
                 val context = new RequestContext(header, connectionId, 
channel.socketAddress,
                   channel.principal, listenerName, securityProtocol,
                   channel.channelMetadataRegistry.clientInformation, 
isPrivilegedListener)
-                val req = new RequestChannel.Request(processor = id, context = 
context,
-                  startTimeNanos = nowNanos, memoryPool, receive.payload, 
requestChannel.metrics)
+
+                val principalSerde = 
Option(channel.principalSerde.orElse(null))
+                val req =
+                if (header.apiKey == ApiKeys.ENVELOPE) {

Review comment:
       nit: this is misaligned. It might be better to pull the body here into a 
separate method (e.g. `parseEnvelopeRequest`).

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -126,6 +125,31 @@ class KafkaApis(val requestChannel: RequestChannel,
     info("Shutdown complete.")
   }
 
+  private def maybeForward(request: RequestChannel.Request,
+                           handler: RequestChannel.Request => Unit): Unit = {
+    if (request.envelopeContext.isDefined && request.principalSerde.isEmpty) {
+      sendErrorResponseMaybeThrottle(request, 
Errors.PRINCIPAL_DESERIALIZATION_FAILURE.exception())
+    } else if (request.envelopeContext.isDefined &&
+      (!request.context.fromPrivilegedListener ||
+      !authorize(request.envelopeContext.get.brokerContext, CLUSTER_ACTION, 
CLUSTER, CLUSTER_NAME))
+    ) {
+      // If the designated forwarding request is not coming from a privileged 
listener, or
+      // it fails CLUSTER_ACTION permission, we would fail the authorization.
+      sendErrorResponseMaybeThrottle(request, 
Errors.BROKER_AUTHORIZATION_FAILURE.exception())

Review comment:
       As mentioned above, every other place in this class that checks 
CLUSTER_ACTION returns `CLUSTER_AUTHORIZATION_FAILURE`; we should return the 
same error here.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -153,7 +177,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
         case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
         case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
-        case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
+        case ApiKeys.CREATE_TOPICS => maybeForward(request, 
handleCreateTopicsRequest)

Review comment:
       We should add a check at the beginning of `handle` that restricts 
forwarding to the set of "forwardable" APIs.

##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -94,19 +104,60 @@ object RequestChannel extends Logging {
     @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
 
     val session = Session(context.principal, context.clientAddress)
+
     private val bodyAndSize: RequestAndSize = context.parseRequest(buffer)
 
     def header: RequestHeader = context.header
     def sizeOfBodyInBytes: Int = bodyAndSize.size
 
-    //most request types are parsed entirely into objects at this point. for 
those we can release the underlying buffer.
-    //some (like produce, or any time the schema contains fields of types 
BYTES or NULLABLE_BYTES) retain a reference
-    //to the buffer. for those requests we cannot release the buffer early, 
but only when request processing is done.
+    def buildResponse(abstractResponse: AbstractResponse,
+                      error: Errors): Send =

Review comment:
       nit: add braces to all of these methods. Even though they are not 
required, braces make the scope easier to see.

##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -94,19 +104,60 @@ object RequestChannel extends Logging {
     @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
 
     val session = Session(context.principal, context.clientAddress)
+
     private val bodyAndSize: RequestAndSize = context.parseRequest(buffer)
 
     def header: RequestHeader = context.header
     def sizeOfBodyInBytes: Int = bodyAndSize.size
 
-    //most request types are parsed entirely into objects at this point. for 
those we can release the underlying buffer.
-    //some (like produce, or any time the schema contains fields of types 
BYTES or NULLABLE_BYTES) retain a reference
-    //to the buffer. for those requests we cannot release the buffer early, 
but only when request processing is done.
+    def buildResponse(abstractResponse: AbstractResponse,
+                      error: Errors): Send =
+      if (envelopeContext.isDefined) {
+        // Right now only the NOT_CONTROLLER error gets handled by the 
forwarding
+        // broker retry. Other errors should be fatal and propagated to the 
client.
+        val envelopeError = if (error.equals(Errors.NOT_CONTROLLER))
+          Errors.NOT_CONTROLLER
+        else
+          Errors.NONE
+
+        val envelopeResponse = new EnvelopeResponse(
+          abstractResponse.throttleTimeMs(),
+          abstractResponse.serializeBody(context.header.apiVersion),
+          envelopeError
+        )
+        envelopeContext.get.brokerContext.buildResponse(envelopeResponse)
+      } else
+        context.buildResponse(abstractResponse)
+
+    def responseString(response: AbstractResponse): Option[String] =
+      if (RequestChannel.isRequestLoggingEnabled) {
+        Some(if (envelopeContext.isDefined)
+          response.toString(envelopeContext.get.brokerContext.apiVersion)
+        else
+          response.toString(context.apiVersion))
+      } else
+        None
+
+    def headerForLogging(): RequestHeader =
+      if (envelopeContext.isDefined)
+        envelopeContext.get.brokerContext.header
+      else
+        context.header
+
+    // most request types are parsed entirely into objects at this point. for 
those we can release the underlying buffer.

Review comment:
       nit: can we move this comment back to where the request parsing logic is? 
Otherwise it becomes a bit hidden.

##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -250,7 +301,9 @@ object RequestChannel extends Logging {
     }
 
     def releaseBuffer(): Unit = {
-      if (buffer != null) {
+      if (envelopeContext.isDefined)

Review comment:
       nit: use `match` on `envelopeContext` instead of `isDefined`.

##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
##########
@@ -338,7 +339,9 @@
     INCONSISTENT_VOTER_SET(94, "Indicates that the either the sender or 
recipient of a " +
             "voter-only request is not one of the expected voters", 
InconsistentVoterSetException::new),
     INVALID_UPDATE_VERSION(95, "The given update version was invalid.", 
InvalidUpdateVersionException::new),
-    FEATURE_UPDATE_FAILED(96, "Unable to update finalized features due to an 
unexpected server error.", FeatureUpdateFailedException::new);
+    FEATURE_UPDATE_FAILED(96, "Unable to update finalized features due to an 
unexpected server error.", FeatureUpdateFailedException::new),
+    BROKER_AUTHORIZATION_FAILURE(97, "Authorization failed for the request 
during forwarding. " +

Review comment:
       Not sure I follow. All current inter-broker APIs are gated by 
`ClusterAction` and return `CLUSTER_AUTHORIZATION_FAILURE` if the principal 
does not have access; there is no distinction between clients and brokers. 
It's not clear to me why we need a different error here.

##########
File path: 
core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala
##########
@@ -129,6 +137,27 @@ class BrokerToControllerChannelManagerImpl(metadataCache: 
kafka.server.MetadataC
     requestQueue.put(BrokerToControllerQueueItem(request, callback))
     requestThread.wakeup()
   }
+
+  override def forwardRequest(responseToOriginalClient: 
(RequestChannel.Request, Int =>
+                                AbstractResponse, Option[Send => Unit]) => 
Unit,
+                              request: RequestChannel.Request,
+                              callback: Option[Send => Unit] = Option.empty): 
Unit = {
+    val serializedRequestData = 
request.body[AbstractRequest].serialize(request.header)

Review comment:
       We want to avoid this serialization, since it introduces the possibility 
for the request to be altered by the forwarding broker. The 
`RequestChannel.Request` object retains a reference to the original buffer, 
which we can use here, but we need to tell the channel to delay releasing the 
buffer by using `ApiKeys.requiresDelayedAllocation` for all of the "forwardable" 
APIs.

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -352,6 +352,8 @@ object KafkaConfig {
   val RequestTimeoutMsProp = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG
   val ConnectionSetupTimeoutMsProp = 
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG
   val ConnectionSetupTimeoutMaxMsProp = 
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG
+  private[server] val quorumBasedControllerProp = "quorum.based.controller"

Review comment:
       How about `enable.metadata.quorum`?

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -974,8 +973,39 @@ private[kafka] class Processor(val id: Int,
                 val context = new RequestContext(header, connectionId, 
channel.socketAddress,
                   channel.principal, listenerName, securityProtocol,
                   channel.channelMetadataRegistry.clientInformation, 
isPrivilegedListener)
-                val req = new RequestChannel.Request(processor = id, context = 
context,
-                  startTimeNanos = nowNanos, memoryPool, receive.payload, 
requestChannel.metrics)
+
+                val principalSerde = 
Option(channel.principalSerde.orElse(null))

Review comment:
       nit: you can just use `channel.principalSerde.asScala`.

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1025,6 +1027,7 @@ object KafkaConfig {
       .define(RequestTimeoutMsProp, INT, Defaults.RequestTimeoutMs, HIGH, 
RequestTimeoutMsDoc)
       .define(ConnectionSetupTimeoutMsProp, LONG, 
Defaults.ConnectionSetupTimeoutMs, MEDIUM, ConnectionSetupTimeoutMsDoc)
       .define(ConnectionSetupTimeoutMaxMsProp, LONG, 
Defaults.ConnectionSetupTimeoutMaxMs, MEDIUM, ConnectionSetupTimeoutMaxMsDoc)
+      .define(quorumBasedControllerProp, BOOLEAN, false, LOW, "Private 
configuration for turning on/off KIP-500 mode")

Review comment:
       Use `defineInternal`.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java
##########
@@ -167,4 +171,24 @@ public void close() {
             oldPrincipalBuilder.close();
     }
 
+    @Override
+    public ByteBuffer serialize(KafkaPrincipal principal) {
+        DefaultPrincipalData data = new DefaultPrincipalData()
+                                        .setType(principal.getPrincipalType())
+                                        .setName(principal.getName())
+                                        
.setTokenAuthenticated(principal.tokenAuthenticated());
+        Struct dataStruct = 
data.toStruct(DefaultPrincipalData.HIGHEST_SUPPORTED_VERSION);
+        ByteBuffer buffer = ByteBuffer.allocate(dataStruct.sizeOf());
+        dataStruct.writeTo(buffer);
+        buffer.flip();
+        return buffer;
+    }
+
+    @Override
+    public KafkaPrincipal deserialize(ByteBuffer bytes) {
+        DefaultPrincipalData data = new DefaultPrincipalData(
+            DefaultPrincipalData.SCHEMAS[DefaultPrincipalData.SCHEMAS.length - 
1].read(bytes),
+            DefaultPrincipalData.HIGHEST_SUPPORTED_VERSION);

Review comment:
       Rather than assuming the highest supported version, we should include the 
version in the serialized data. The simplest approach would be to write the 
version first, then the payload.

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -126,6 +125,31 @@ class KafkaApis(val requestChannel: RequestChannel,
     info("Shutdown complete.")
   }
 
+  private def maybeForward(request: RequestChannel.Request,
+                           handler: RequestChannel.Request => Unit): Unit = {
+    if (request.envelopeContext.isDefined && request.principalSerde.isEmpty) {
+      sendErrorResponseMaybeThrottle(request, 
Errors.PRINCIPAL_DESERIALIZATION_FAILURE.exception())
+    } else if (request.envelopeContext.isDefined &&
+      (!request.context.fromPrivilegedListener ||
+      !authorize(request.envelopeContext.get.brokerContext, CLUSTER_ACTION, 
CLUSTER, CLUSTER_NAME))
+    ) {
+      // If the designated forwarding request is not coming from a privileged 
listener, or
+      // it fails CLUSTER_ACTION permission, we would fail the authorization.
+      sendErrorResponseMaybeThrottle(request, 
Errors.BROKER_AUTHORIZATION_FAILURE.exception())
+    } else if (request.envelopeContext.isDefined && !controller.isActive) {
+      sendErrorResponseMaybeThrottle(request, 
Errors.NOT_CONTROLLER.exception())
+    } else if (!controller.isActive && couldDoRedirection(request)) {
+      redirectionManager.forwardRequest(sendResponseMaybeThrottle, request)
+    } else {
+      // When IBP is smaller than 2.8 or the principal serde is undefined, 
forwarding is not supported,
+      // therefore requests are handled directly.
+      handler(request)
+    }
+  }
+
+  private def couldDoRedirection(request: RequestChannel.Request): Boolean =

Review comment:
       We use 'forward' and 'redirect' interchangeably throughout the PR, but 
the names suggest different behavior. To me, 'redirection' suggests that 
we are telling the client to go somewhere else, while 'forward' suggests that 
the broker passes the request through to its destination. Could we use 
'forward' consistently (e.g. rename `couldDoRedirection` to `isForwardingEnabled`)?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to