apoorvmittal10 commented on code in PR #16456: URL: https://github.com/apache/kafka/pull/16456#discussion_r1695145736
########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -4020,11 +4012,381 @@ class KafkaApis(val requestChannel: RequestChannel, } } + /** + * Handle a shareFetch request + */ def handleShareFetchRequest(request: RequestChannel.Request): Unit = { val shareFetchRequest = request.body[ShareFetchRequest] - // TODO: Implement the ShareFetchRequest handling - requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) - CompletableFuture.completedFuture[Unit](()) + + if (!isShareGroupProtocolEnabled) { + requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + return + } + val sharePartitionManager : SharePartitionManager = this.sharePartitionManager match { + case Some(manager) => manager + case None => + // The API is not supported when the SharePartitionManager is not defined on the broker + info("SharePartitionManager not defined on the broker") + requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + return + } + + val groupId = shareFetchRequest.data.groupId + + // Share Fetch needs permission to perform the READ action on the named group resource (groupId) + if(!authHelper.authorize(request.context, READ, GROUP, groupId)) { + requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.GROUP_AUTHORIZATION_FAILED.exception)) + return + } + + val memberId = shareFetchRequest.data.memberId + val shareSessionEpoch = shareFetchRequest.data.shareSessionEpoch + + def isAcknowledgeDataPresentInFetchRequest: Boolean = { + shareFetchRequest.data.topics.asScala + .flatMap(t => t.partitions().asScala) + .exists(partition => partition.acknowledgementBatches != null && !partition.acknowledgementBatches.isEmpty) + } + + val isAcknowledgeDataPresent = isAcknowledgeDataPresentInFetchRequest + val topicIdNames = 
metadataCache.topicIdsToNames() + + def isTopicPresent(topicName: String) : Boolean = { + metadataCache.contains(topicName) + } + + def isPartitionPresent(partition : Int, topicName : String) : Boolean = { + metadataCache.getTopicPartitions(topicName).foreach(tp => { + if(tp.partition() == partition) { Review Comment: Is `()` required, or can we just write `if(tp.partition == partition) {`? ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -3955,11 +3948,484 @@ class KafkaApis(val requestChannel: RequestChannel, } } + /** + * Handle a shareFetch request + */ def handleShareFetchRequest(request: RequestChannel.Request): Unit = { val shareFetchRequest = request.body[ShareFetchRequest] - // TODO: Implement the ShareFetchRequest handling - requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) - CompletableFuture.completedFuture[Unit](()) + + if (!config.isNewGroupCoordinatorEnabled) { + // The API is not supported by the "old" group coordinator (the default). If the + // new one is not enabled, we fail directly here. + info("new Group Coordinator is not enabled on the broker") + requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + return + } else if (!config.isShareGroupEnabled) { + // The API is not supported when the "share" rebalance protocol has not been set explicitly. 
+ info("Share Group is not enabled on the broker") + requestHelper.sendMaybeThrottle(request, shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + return + } + val sharePartitionManager : SharePartitionManager = this.sharePartitionManager match { + case Some(manager) => manager + case None => + // The API is not supported when the SharePartitionManager is not defined on the broker + info("SharePartitionManager not defined on the broker") Review Comment: Yeah, as per the suggestion here: https://github.com/apache/kafka/pull/16456#discussion_r1665668978, it would be good to keep this message consistent with the other logs: ``` info("Received share fetch request for zookeeper based cluster") ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org