hachikuji commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r493823776
########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -117,6 +117,89 @@ class KafkaApis(val requestChannel: RequestChannel, val adminZkClient = new AdminZkClient(zkClient) private val alterAclsPurgatory = new DelayedFuturePurgatory(purgatoryName = "AlterAcls", brokerId = config.brokerId) + /** + * The template to create a forward request handler. + * + * @tparam T request type + * @tparam R response type + * @tparam RK resource key + * @tparam RV resource value + */ + private[server] abstract class ForwardRequestHandler[T <: AbstractRequest, + R <: AbstractResponse, RK, RV](request: RequestChannel.Request) extends Logging { + + /** + * Split the given resource into authorized and unauthorized sets. + * + * @return authorized resources and unauthorized resources + */ + def resourceSplitByAuthorization(request: T): (Map[RK, RV], Map[RK, ApiError]) + + /** + * Controller handling logic of the request. + */ + def process(authorizedResources: Map[RK, RV], + unauthorizedResult: Map[RK, ApiError], + request: T): Unit + + /** + * Build a forward request to the controller. + * + * @param authorizedResources authorized resources by the forwarding broker + * @param request the original request + * @return forward request builder + */ + def createRequestBuilder(authorizedResources: Map[RK, RV], + request: T): AbstractRequest.Builder[T] + + /** + * Merge the forward response with the previously unauthorized results. + * + * @param forwardResponse the forward request's response + * @param unauthorizedResult original unauthorized results + * @return combined response to the original client + */ + def mergeResponse(forwardResponse: R, + unauthorizedResult: Map[RK, ApiError]): R + + def handle(): Unit = { + val requestBody = request.body[AbstractRequest].asInstanceOf[T] + val (authorizedResources, unauthorizedResources) = resourceSplitByAuthorization(requestBody) + if (isForwardingRequest(request)) { + if (!controller.isActive) { + sendErrorResponseMaybeThrottle(request, Errors.NOT_CONTROLLER.exception()) + } else { + // For forwarding requests, the authentication failure is not caused by + // the original client, but by the broker. + val unauthorizedResult = unauthorizedResources.keys.map { + resource => resource -> new ApiError(Errors.BROKER_AUTHORIZATION_FAILURE, null) + }.toMap + + process(authorizedResources, unauthorizedResult, requestBody) + } + } else if (!controller.isActive && config.redirectionEnabled && + authorizedResources.nonEmpty) { + redirectionManager.forwardRequest( + createRequestBuilder(authorizedResources, requestBody), Review comment: As discussed offline, we can pass the expected version down to the Builder. The abstract builder already supports an explicit range of versions. In any case, it doesn't seem like we have a choice. By the way, one potential edge case here is that the broker receiving the request has upgraded to a later version than the controller. This would be possible in the middle of a rolling upgrade. I don't think there's an easy way to handle this. We could return UNSUPPORTED_VERSION to the client, but that would be surprising since the client chose a supported API based on ApiVersions and is not aware of the controller redirection. One idea to address this problem is to gate version upgrades to redirectable APIs by the IBP. Basically all of these APIs have become inter-broker APIs through redirection so they need the safeguard of the IBP. Feels like we might have to do this. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org