hachikuji commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r509540617
########## File path: core/src/main/scala/kafka/network/RequestChannel.scala ########## @@ -94,19 +104,60 @@ object RequestChannel extends Logging { @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None val session = Session(context.principal, context.clientAddress) + private val bodyAndSize: RequestAndSize = context.parseRequest(buffer) def header: RequestHeader = context.header def sizeOfBodyInBytes: Int = bodyAndSize.size - //most request types are parsed entirely into objects at this point. for those we can release the underlying buffer. - //some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference - //to the buffer. for those requests we cannot release the buffer early, but only when request processing is done. + def buildResponse(abstractResponse: AbstractResponse, + error: Errors): Send = + if (envelopeContext.isDefined) { Review comment: nit: use `match` ########## File path: core/src/main/scala/kafka/network/SocketServer.scala ########## @@ -974,8 +973,39 @@ private[kafka] class Processor(val id: Int, val context = new RequestContext(header, connectionId, channel.socketAddress, channel.principal, listenerName, securityProtocol, channel.channelMetadataRegistry.clientInformation, isPrivilegedListener) - val req = new RequestChannel.Request(processor = id, context = context, - startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics) + + val principalSerde = Option(channel.principalSerde.orElse(null)) + val req = + if (header.apiKey == ApiKeys.ENVELOPE) { Review comment: nit: this is misaligned. It might be better to pull the body here into a separate method (e.g. 
`parseEnvelopeRequest`) ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -126,6 +125,31 @@ class KafkaApis(val requestChannel: RequestChannel, info("Shutdown complete.") } + private def maybeForward(request: RequestChannel.Request, + handler: RequestChannel.Request => Unit): Unit = { + if (request.envelopeContext.isDefined && request.principalSerde.isEmpty) { + sendErrorResponseMaybeThrottle(request, Errors.PRINCIPAL_DESERIALIZATION_FAILURE.exception()) + } else if (request.envelopeContext.isDefined && + (!request.context.fromPrivilegedListener || + !authorize(request.envelopeContext.get.brokerContext, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) + ) { + // If the designated forwarding request is not coming from a privileged listener, or + // it fails CLUSTER_ACTION permission, we would fail the authorization. + sendErrorResponseMaybeThrottle(request, Errors.BROKER_AUTHORIZATION_FAILURE.exception()) Review comment: As mentioned above, you can see the rest of the cases in this class where we check CLUSTER_ACTION and they all return `CLUSTER_AUTHORIZATION_FAILURE`. ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -153,7 +177,7 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request) case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request) case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request) - case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request) + case ApiKeys.CREATE_TOPICS => maybeForward(request, handleCreateTopicsRequest) Review comment: We should have a check at the beginning of `handle` to restrict the "forwardable" APIs. 
########## File path: core/src/main/scala/kafka/network/RequestChannel.scala ########## @@ -94,19 +104,60 @@ object RequestChannel extends Logging { @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None val session = Session(context.principal, context.clientAddress) + private val bodyAndSize: RequestAndSize = context.parseRequest(buffer) def header: RequestHeader = context.header def sizeOfBodyInBytes: Int = bodyAndSize.size - //most request types are parsed entirely into objects at this point. for those we can release the underlying buffer. - //some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference - //to the buffer. for those requests we cannot release the buffer early, but only when request processing is done. + def buildResponse(abstractResponse: AbstractResponse, + error: Errors): Send = Review comment: nit: add braces to all of these methods. Even though they are not required, braces make it easier to see the scope ########## File path: core/src/main/scala/kafka/network/RequestChannel.scala ########## @@ -94,19 +104,60 @@ object RequestChannel extends Logging { @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None val session = Session(context.principal, context.clientAddress) + private val bodyAndSize: RequestAndSize = context.parseRequest(buffer) def header: RequestHeader = context.header def sizeOfBodyInBytes: Int = bodyAndSize.size - //most request types are parsed entirely into objects at this point. for those we can release the underlying buffer. - //some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference - //to the buffer. for those requests we cannot release the buffer early, but only when request processing is done. 
+ def buildResponse(abstractResponse: AbstractResponse, + error: Errors): Send = + if (envelopeContext.isDefined) { + // Right now only the NOT_CONTROLLER error gets handled by the forwarding + // broker retry. Other errors should be fatal and propagated to the client. + val envelopeError = if (error.equals(Errors.NOT_CONTROLLER)) + Errors.NOT_CONTROLLER + else + Errors.NONE + + val envelopeResponse = new EnvelopeResponse( + abstractResponse.throttleTimeMs(), + abstractResponse.serializeBody(context.header.apiVersion), + envelopeError + ) + envelopeContext.get.brokerContext.buildResponse(envelopeResponse) + } else + context.buildResponse(abstractResponse) + + def responseString(response: AbstractResponse): Option[String] = + if (RequestChannel.isRequestLoggingEnabled) { + Some(if (envelopeContext.isDefined) + response.toString(envelopeContext.get.brokerContext.apiVersion) + else + response.toString(context.apiVersion)) + } else + None + + def headerForLogging(): RequestHeader = + if (envelopeContext.isDefined) + envelopeContext.get.brokerContext.header + else + context.header + + // most request types are parsed entirely into objects at this point. for those we can release the underlying buffer. Review comment: nit: can we move this back to where the request parsing logic is? Otherwise it becomes a bit hidden.
########## File path: core/src/main/scala/kafka/network/RequestChannel.scala ########## @@ -250,7 +301,9 @@ object RequestChannel extends Logging { } def releaseBuffer(): Unit = { - if (buffer != null) { + if (envelopeContext.isDefined) Review comment: nit: use `match` ########## File path: clients/src/main/java/org/apache/kafka/common/protocol/Errors.java ########## @@ -338,7 +339,9 @@ INCONSISTENT_VOTER_SET(94, "Indicates that the either the sender or recipient of a " + "voter-only request is not one of the expected voters", InconsistentVoterSetException::new), INVALID_UPDATE_VERSION(95, "The given update version was invalid.", InvalidUpdateVersionException::new), - FEATURE_UPDATE_FAILED(96, "Unable to update finalized features due to an unexpected server error.", FeatureUpdateFailedException::new); + FEATURE_UPDATE_FAILED(96, "Unable to update finalized features due to an unexpected server error.", FeatureUpdateFailedException::new), + BROKER_AUTHORIZATION_FAILURE(97, "Authorization failed for the request during forwarding. " + Review comment: Not sure I follow. All current inter-broker APIs are gated by `ClusterAction` and will return `CLUSTER_AUTHORIZATION_FAILURE` if the principal does not have access. There is no distinction between clients and brokers. It's not clear to me why we need something different here. 
########## File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala ########## @@ -129,6 +137,27 @@ class BrokerToControllerChannelManagerImpl(metadataCache: kafka.server.MetadataC requestQueue.put(BrokerToControllerQueueItem(request, callback)) requestThread.wakeup() } + + override def forwardRequest(responseToOriginalClient: (RequestChannel.Request, Int => + AbstractResponse, Option[Send => Unit]) => Unit, + request: RequestChannel.Request, + callback: Option[Send => Unit] = Option.empty): Unit = { + val serializedRequestData = request.body[AbstractRequest].serialize(request.header) Review comment: We want to avoid this serialization since it introduces the possibility for the request to be altered by the forwarding broker. The `RequestChannel.Request` object retains the reference to the original buffer, which we can use here, but we need to tell the channel to delay releasing the buffer using `ApiKeys.requiresDelayedAllocation` for all of the "forwardable" APIs. ########## File path: core/src/main/scala/kafka/server/KafkaConfig.scala ########## @@ -352,6 +352,8 @@ object KafkaConfig { val RequestTimeoutMsProp = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG val ConnectionSetupTimeoutMsProp = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG val ConnectionSetupTimeoutMaxMsProp = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG + private[server] val quorumBasedControllerProp = "quorum.based.controller" Review comment: How about `enable.metadata.quorum`? 
########## File path: core/src/main/scala/kafka/network/SocketServer.scala ########## @@ -974,8 +973,39 @@ private[kafka] class Processor(val id: Int, val context = new RequestContext(header, connectionId, channel.socketAddress, channel.principal, listenerName, securityProtocol, channel.channelMetadataRegistry.clientInformation, isPrivilegedListener) - val req = new RequestChannel.Request(processor = id, context = context, - startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics) + + val principalSerde = Option(channel.principalSerde.orElse(null)) Review comment: nit: you can just use `channel.principalSerde.asScala` ########## File path: core/src/main/scala/kafka/server/KafkaConfig.scala ########## @@ -1025,6 +1027,7 @@ object KafkaConfig { .define(RequestTimeoutMsProp, INT, Defaults.RequestTimeoutMs, HIGH, RequestTimeoutMsDoc) .define(ConnectionSetupTimeoutMsProp, LONG, Defaults.ConnectionSetupTimeoutMs, MEDIUM, ConnectionSetupTimeoutMsDoc) .define(ConnectionSetupTimeoutMaxMsProp, LONG, Defaults.ConnectionSetupTimeoutMaxMs, MEDIUM, ConnectionSetupTimeoutMaxMsDoc) + .define(quorumBasedControllerProp, BOOLEAN, false, LOW, "Private configuration for turning on/off KIP-500 mode") Review comment: Use `defineInternal` ########## File path: clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java ########## @@ -167,4 +171,24 @@ public void close() { oldPrincipalBuilder.close(); } + @Override + public ByteBuffer serialize(KafkaPrincipal principal) { + DefaultPrincipalData data = new DefaultPrincipalData() + .setType(principal.getPrincipalType()) + .setName(principal.getName()) + .setTokenAuthenticated(principal.tokenAuthenticated()); + Struct dataStruct = data.toStruct(DefaultPrincipalData.HIGHEST_SUPPORTED_VERSION); + ByteBuffer buffer = ByteBuffer.allocate(dataStruct.sizeOf()); + dataStruct.writeTo(buffer); + buffer.flip(); + return buffer; + } + + @Override + public KafkaPrincipal 
deserialize(ByteBuffer bytes) { + DefaultPrincipalData data = new DefaultPrincipalData( + DefaultPrincipalData.SCHEMAS[DefaultPrincipalData.SCHEMAS.length - 1].read(bytes), + DefaultPrincipalData.HIGHEST_SUPPORTED_VERSION); Review comment: Rather than assuming highest supported version, we should include the version in the serialized data. The simple thing would be to write the version first, then write the payload. ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -126,6 +125,31 @@ class KafkaApis(val requestChannel: RequestChannel, info("Shutdown complete.") } + private def maybeForward(request: RequestChannel.Request, + handler: RequestChannel.Request => Unit): Unit = { + if (request.envelopeContext.isDefined && request.principalSerde.isEmpty) { + sendErrorResponseMaybeThrottle(request, Errors.PRINCIPAL_DESERIALIZATION_FAILURE.exception()) + } else if (request.envelopeContext.isDefined && + (!request.context.fromPrivilegedListener || + !authorize(request.envelopeContext.get.brokerContext, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) + ) { + // If the designated forwarding request is not coming from a privileged listener, or + // it fails CLUSTER_ACTION permission, we would fail the authorization. + sendErrorResponseMaybeThrottle(request, Errors.BROKER_AUTHORIZATION_FAILURE.exception()) + } else if (request.envelopeContext.isDefined && !controller.isActive) { + sendErrorResponseMaybeThrottle(request, Errors.NOT_CONTROLLER.exception()) + } else if (!controller.isActive && couldDoRedirection(request)) { + redirectionManager.forwardRequest(sendResponseMaybeThrottle, request) + } else { + // When IBP is smaller than 2.8 or the principal serde is undefined, forwarding is not supported, + // therefore requests are handled directly. 
+ handler(request) + } + } + + private def couldDoRedirection(request: RequestChannel.Request): Boolean = Review comment: We use 'forward' and 'redirect' interchangeably throughout the PR, but the names do suggest different behavior. In my mind 'redirection' suggests that we are telling the client to go somewhere else, while 'forward' suggests that the broker is passing the request through to its destination. So maybe we can stick with 'forward' consistently (e.g. `isForwardingEnabled`)? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org