chirag-wadhwa5 commented on code in PR #16456: URL: https://github.com/apache/kafka/pull/16456#discussion_r1696558877
########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -3955,11 +3948,484 @@ class KafkaApis(val requestChannel: RequestChannel, } } + /** + * Handle a shareFetch request + */ def handleShareFetchRequest(request: RequestChannel.Request): Unit = { val shareFetchRequest = request.body[ShareFetchRequest] - // TODO: Implement the ShareFetchRequest handling - requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) - CompletableFuture.completedFuture[Unit](()) + + if (!config.isNewGroupCoordinatorEnabled) { + // The API is not supported by the "old" group coordinator (the default). If the + // new one is not enabled, we fail directly here. + info("new Group Coordinator is not enabled on the broker") + requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + return + } else if (!config.isShareGroupEnabled) { + // The API is not supported when the "share" rebalance protocol has not been set explicitly. 
+ info("Share Group is not enabled on the broker") + requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + return + } + val sharePartitionManager : SharePartitionManager = this.sharePartitionManager match { + case Some(manager) => manager + case None => + // The API is not supported when the SharePartitionManager is not defined on the broker + info("SharePartitionManager not defined on the broker") + requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + return + } + + val groupId = shareFetchRequest.data.groupId + val memberId = shareFetchRequest.data.memberId + val shareSessionEpoch = shareFetchRequest.data.shareSessionEpoch + + def isAcknowledgeDataPresentInFetchRequest() : Boolean = { + var isAcknowledgeDataPresent = false + shareFetchRequest.data.topics.forEach ( topic => { + breakable{ + topic.partitions.forEach ( partition => { + if (partition.acknowledgementBatches != null && !partition.acknowledgementBatches.isEmpty) { + isAcknowledgeDataPresent = true + break + } else { + isAcknowledgeDataPresent = false + } + }) + } + }) + isAcknowledgeDataPresent + } + + val isAcknowledgeDataPresent = isAcknowledgeDataPresentInFetchRequest() + + def isInvalidShareFetchRequest() : Boolean = { + // The Initial Share Fetch Request should not Acknowledge any data. + if (shareSessionEpoch == ShareFetchMetadata.INITIAL_EPOCH && isAcknowledgeDataPresent) { + return true + } + false + } + + val topicIdNames = metadataCache.topicIdsToNames() + val shareFetchData = shareFetchRequest.shareFetchData(topicIdNames) + val forgottenTopics = shareFetchRequest.forgottenTopics(topicIdNames) + + val newReqMetadata : ShareFetchMetadata = new ShareFetchMetadata(Uuid.fromString(memberId), shareSessionEpoch) + var shareFetchContext : ShareFetchContext = null + + var shareFetchResponse : ShareFetchResponse = null + + // check if the Request is Invalid. 
If it is, the request is failed directly here. + if(isInvalidShareFetchRequest()) { + requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.INVALID_REQUEST.exception)) + return + } + + try { + // Creating the shareFetchContext for Share Session Handling. if context creation fails, the request is failed directly here. + shareFetchContext = sharePartitionManager.newContext(groupId, shareFetchData, forgottenTopics, newReqMetadata) + } catch { + case e: Exception => requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e)) + return + } + + // Variable to store any error thrown while the handling piggybacked acknowledgements. + var acknowledgeError : Errors = Errors.NONE + // Variable to store the topic partition wise result of piggybacked acknowledgements. + var acknowledgeResult = mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]() + + val erroneousAndValidPartitionData : ErroneousAndValidPartitionData = shareFetchContext.getErroneousAndValidTopicIdPartitions + val topicIdPartitionSeq : mutable.Set[TopicIdPartition] = mutable.Set() + erroneousAndValidPartitionData.erroneous.forEach { + case(tp, _) => if (!topicIdPartitionSeq.contains(tp)) topicIdPartitionSeq += tp + } + erroneousAndValidPartitionData.validTopicIdPartitions.forEach { + case(tp, _) => if (!topicIdPartitionSeq.contains(tp)) topicIdPartitionSeq += tp + } + shareFetchData.forEach { + case(tp, _) => if (!topicIdPartitionSeq.contains(tp)) topicIdPartitionSeq += tp + } + + val authorizedTopics = authHelper.filterByAuthorized( + request.context, + READ, + TOPIC, + topicIdPartitionSeq + )(_.topicPartition.topic) + + // Handling the Acknowledgements from the ShareFetchRequest If this check is true, we are sure that this is not an + // Initial ShareFetch Request, otherwise the request would have been invalid. 
+ if(isAcknowledgeDataPresent) { + if (!authHelper.authorize(request.context, READ, GROUP, groupId)) { + acknowledgeError = Errors.GROUP_AUTHORIZATION_FAILED + } else { + val erroneous = mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]() + val acknowledgementDataFromRequest = getAcknowledgeBatchesFromShareFetchRequest(request.body[ShareFetchRequest], topicIdNames, erroneous) + try { + acknowledgeResult = handleAcknowledgements( + acknowledgementDataFromRequest, + erroneous, + topicIdNames, + sharePartitionManager, + authorizedTopics, + groupId, + memberId, + request.header.clientId + ) + } catch { + case throwable: Throwable => + debug(s"Acknowledgements for Share fetch request with correlation from client ${request.header.clientId} " + + s"failed with error ${throwable.getMessage}") + requestHelper.handleError(request, throwable) + return + } + } + } + + // Handling the Fetch from the ShareFetchRequest. + try { + shareFetchResponse = handleFetchFromShareFetchRequest( + request, + erroneousAndValidPartitionData, + topicIdNames, + sharePartitionManager, + shareFetchContext, + authorizedTopics + ) + } catch { + case throwable : Throwable => + debug(s"Share fetch request with correlation from client ${request.header.clientId} " + + s"failed with error ${throwable.getMessage}") + requestHelper.handleError(request, throwable) + return + } + + def combineShareFetchAndShareAcknowledgeResponses( + shareFetchResponse: ShareFetchResponse, + acknowledgeResult : mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData], + acknowledgeError : Errors + ) : ShareFetchResponse = { + + // The outer map has topicId as the key and the inner map has partitionIndex as the key. 
+ val topicPartitionAcknowledgements : mutable.Map[Uuid, mutable.Map[Int, Short]] = mutable.Map() + if(acknowledgeResult != null && acknowledgeResult.nonEmpty) { + acknowledgeResult.foreach { case (tp, partitionData) => + topicPartitionAcknowledgements.get(tp.topicId) match { + case Some(subMap) => + subMap += tp.partition -> partitionData.errorCode + case None => + val partitionAcknowledgementsMap : mutable.Map[Int, Short] = mutable.Map() + partitionAcknowledgementsMap += tp.partition -> partitionData.errorCode + topicPartitionAcknowledgements += tp.topicId -> partitionAcknowledgementsMap + } + } + } + + shareFetchResponse.data.responses.forEach(topic => { + val topicId = topic.topicId + topicPartitionAcknowledgements.get(topicId) match { + case Some(subMap) => + topic.partitions.forEach { partition => + subMap.get(partition.partitionIndex) match { + case Some(value) => + val ackErrorCode = if(acknowledgeError.code != Errors.NONE.code) acknowledgeError.code else value + partition.setAcknowledgeErrorCode(ackErrorCode) + // Delete the element. + subMap.remove(partition.partitionIndex) + case None => + } + } + // Add the remaining acknowledgements. + subMap.foreach { case (partitionIndex, value) => + val ackErrorCode = if(acknowledgeError.code != Errors.NONE.code) acknowledgeError.code else value + val fetchPartitionData = new ShareFetchResponseData.PartitionData() + .setPartitionIndex(partitionIndex) + .setErrorCode(Errors.NONE.code) + .setAcknowledgeErrorCode(ackErrorCode) + topic.partitions.add(fetchPartitionData) + } + topicPartitionAcknowledgements.remove(topicId) + case None => + } + }) + // Add the remaining acknowledgements. 
+ topicPartitionAcknowledgements.foreach{ case(topicId, subMap) => + val topicData = new ShareFetchResponseData.ShareFetchableTopicResponse() + .setTopicId(topicId) + subMap.foreach { case (partitionIndex, value) => + val ackErrorCode = if(acknowledgeError.code != Errors.NONE.code) acknowledgeError.code else value + val fetchPartitionData = new ShareFetchResponseData.PartitionData() + .setPartitionIndex(partitionIndex) + .setErrorCode(Errors.NONE.code) + .setAcknowledgeErrorCode(ackErrorCode) + topicData.partitions.add(fetchPartitionData) + } + shareFetchResponse.data.responses.add(topicData) + } + + if (shareSessionEpoch == ShareFetchMetadata.FINAL_EPOCH) { + sharePartitionManager.releaseAcquiredRecords(groupId, memberId). + whenComplete((releaseAcquiredRecordsData, throwable) => { + if (throwable != null) { + throw throwable + } else { + info(s"Release acquired records on share session close $releaseAcquiredRecordsData succeeded") + } + }).get() + } + shareFetchResponse + } + + // Send the response immediately. + try { + requestChannel.sendResponse(request, combineShareFetchAndShareAcknowledgeResponses(shareFetchResponse, acknowledgeResult, acknowledgeError), fetchOnComplete(request)) Review Comment: Hi, thanks a lot for the review. I believe this comment refers to the code as it was before the last commit; all the issues with the asynchronous code should have been resolved by that commit. Let me know if you find any other gaps. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org