chirag-wadhwa5 commented on code in PR #16456: URL: https://github.com/apache/kafka/pull/16456#discussion_r1697422741
########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -4020,11 +4012,381 @@ class KafkaApis(val requestChannel: RequestChannel, } } + /** + * Handle a shareFetch request + */ def handleShareFetchRequest(request: RequestChannel.Request): Unit = { val shareFetchRequest = request.body[ShareFetchRequest] - // TODO: Implement the ShareFetchRequest handling - requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) - CompletableFuture.completedFuture[Unit](()) + + if (!isShareGroupProtocolEnabled) { + requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + return + } + val sharePartitionManager : SharePartitionManager = this.sharePartitionManager match { + case Some(manager) => manager + case None => + // The API is not supported when the SharePartitionManager is not defined on the broker + info("SharePartitionManager not defined on the broker") + requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + return + } + + val groupId = shareFetchRequest.data.groupId + + // Share Fetch needs permission to perform the READ action on the named group resource (groupId) + if(!authHelper.authorize(request.context, READ, GROUP, groupId)) { + requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.GROUP_AUTHORIZATION_FAILED.exception)) + return + } + + val memberId = shareFetchRequest.data.memberId + val shareSessionEpoch = shareFetchRequest.data.shareSessionEpoch + + def isAcknowledgeDataPresentInFetchRequest: Boolean = { + shareFetchRequest.data.topics.asScala + .flatMap(t => t.partitions().asScala) + .exists(partition => partition.acknowledgementBatches != null && !partition.acknowledgementBatches.isEmpty) + } + + val isAcknowledgeDataPresent = isAcknowledgeDataPresentInFetchRequest + val topicIdNames = 
metadataCache.topicIdsToNames() + + def isTopicPresent(topicName: String) : Boolean = { + metadataCache.contains(topicName) + } + + def isPartitionPresent(partition : Int, topicName : String) : Boolean = { + metadataCache.getTopicPartitions(topicName).foreach(tp => { + if(tp.partition() == partition) { + return true + } + }) + false + } + + def isInvalidShareFetchRequest : Boolean = { + // The Initial Share Fetch Request should not Acknowledge any data. + if (shareSessionEpoch == ShareFetchMetadata.INITIAL_EPOCH && isAcknowledgeDataPresent) { + return true + } + // validating the existence of all topics and partitions in the request. + shareFetchRequest.data().topics().forEach(topic => { + if(!topicIdNames.keySet().contains(topic.topicId)) { + return true + } + if(!isTopicPresent(topicIdNames.get(topic.topicId))) { + return true + } + topic.partitions().forEach(partition => { + if (!isPartitionPresent(partition.partitionIndex, topicIdNames.get(topic.topicId))) { + return true + } + }) + }) + false + } + + val shareFetchData = shareFetchRequest.shareFetchData(topicIdNames) + val forgottenTopics = shareFetchRequest.forgottenTopics(topicIdNames) + + val newReqMetadata : ShareFetchMetadata = new ShareFetchMetadata(Uuid.fromString(memberId), shareSessionEpoch) + var shareFetchContext : ShareFetchContext = null + + // check if the Request is Invalid. If it is, the request is failed directly here. + if(isInvalidShareFetchRequest) { + requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.INVALID_REQUEST.exception)) + return + } + + try { + // Creating the shareFetchContext for Share Session Handling. if context creation fails, the request is failed directly here. 
+ shareFetchContext = sharePartitionManager.newContext(groupId, shareFetchData, forgottenTopics, newReqMetadata) + } catch { + case e: Exception => requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e)) + return + } + + val erroneousAndValidPartitionData : ErroneousAndValidPartitionData = shareFetchContext.getErroneousAndValidTopicIdPartitions + val topicIdPartitionSeq : mutable.Set[TopicIdPartition] = mutable.Set() + erroneousAndValidPartitionData.erroneous.forEach { + case(tp, _) => if (!topicIdPartitionSeq.contains(tp)) topicIdPartitionSeq += tp + } + erroneousAndValidPartitionData.validTopicIdPartitions.forEach { + case(tp, _) => if (!topicIdPartitionSeq.contains(tp)) topicIdPartitionSeq += tp + } + shareFetchData.forEach { + case(tp, _) => if (!topicIdPartitionSeq.contains(tp)) topicIdPartitionSeq += tp + } + + val authorizedTopics = authHelper.filterByAuthorized( + request.context, + READ, + TOPIC, + topicIdPartitionSeq + )(_.topicPartition.topic) + + // Variable to store the topic partition wise result of fetching. + var fetchResult : CompletableFuture[mutable.Map[TopicIdPartition, ShareFetchResponseData.PartitionData]] = + CompletableFuture.completedFuture(mutable.Map.empty) + // Variable to store the topic partition wise result of piggybacked acknowledgements. + var acknowledgeResult : CompletableFuture[mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]] = + CompletableFuture.completedFuture(mutable.Map.empty) + + // Handling the Acknowledgements from the ShareFetchRequest If this check is true, we are sure that this is not an + // Initial ShareFetch Request, otherwise the request would have been invalid. 
+ if(isAcknowledgeDataPresent) { + val erroneous = mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]() + val acknowledgementDataFromRequest = getAcknowledgeBatchesFromShareFetchRequest(request.body[ShareFetchRequest], topicIdNames, erroneous) + acknowledgeResult = handleAcknowledgements( + acknowledgementDataFromRequest, + erroneous, Review Comment: Thanks for the review. Actually, the upcoming PR for shareAcknowledgeRequest would make it clear why it has been done this way. The acknowledgement data sent to `handleAcknowledgements` is retrieved using different methods in case of a fetch request and an acknowledge request. These methods can themselves identify some erroneous topic partitions, so that is why the map is being passed on to the method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org