apoorvmittal10 commented on code in PR #16456:
URL: https://github.com/apache/kafka/pull/16456#discussion_r1695145736


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -4020,11 +4012,381 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  /**
+   * Handle a shareFetch request
+   */
   def handleShareFetchRequest(request: RequestChannel.Request): Unit = {
     val shareFetchRequest = request.body[ShareFetchRequest]
-    // TODO: Implement the ShareFetchRequest handling
-    requestHelper.sendMaybeThrottle(request, 
shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-    CompletableFuture.completedFuture[Unit](())
+
+    if (!isShareGroupProtocolEnabled) {
+      requestHelper.sendMaybeThrottle(request, 
shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      return
+    }
+    val sharePartitionManager : SharePartitionManager = 
this.sharePartitionManager match {
+      case Some(manager) => manager
+      case None =>
+        // The API is not supported when the SharePartitionManager is not 
defined on the broker
+        info("SharePartitionManager not defined on the broker")
+        requestHelper.sendMaybeThrottle(request, 
shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+        return
+    }
+
+    val groupId = shareFetchRequest.data.groupId
+
+    // Share Fetch needs permission to perform the READ action on the named 
group resource (groupId)
+    if(!authHelper.authorize(request.context, READ, GROUP, groupId)) {
+      requestHelper.sendMaybeThrottle(request, 
shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, 
Errors.GROUP_AUTHORIZATION_FAILED.exception))
+      return
+    }
+
+    val memberId = shareFetchRequest.data.memberId
+    val shareSessionEpoch = shareFetchRequest.data.shareSessionEpoch
+
+    def isAcknowledgeDataPresentInFetchRequest: Boolean = {
+      shareFetchRequest.data.topics.asScala
+        .flatMap(t => t.partitions().asScala)
+        .exists(partition => partition.acknowledgementBatches != null && 
!partition.acknowledgementBatches.isEmpty)
+    }
+
+    val isAcknowledgeDataPresent = isAcknowledgeDataPresentInFetchRequest
+    val topicIdNames = metadataCache.topicIdsToNames()
+
+    def isTopicPresent(topicName: String) : Boolean = {
+      metadataCache.contains(topicName)
+    }
+
+    def isPartitionPresent(partition : Int, topicName : String) : Boolean = {
+      metadataCache.getTopicPartitions(topicName).foreach(tp => {
+        if(tp.partition() == partition) {

Review Comment:
   Is `()` required, or can we just write `if(tp.partition == partition) {`?



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3955,11 +3948,484 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  /**
+   * Handle a shareFetch request
+   */
   def handleShareFetchRequest(request: RequestChannel.Request): Unit = {
     val shareFetchRequest = request.body[ShareFetchRequest]
-    // TODO: Implement the ShareFetchRequest handling
-    requestHelper.sendMaybeThrottle(request, 
shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-    CompletableFuture.completedFuture[Unit](())
+
+    if (!config.isNewGroupCoordinatorEnabled) {
+      // The API is not supported by the "old" group coordinator (the 
default). If the
+      // new one is not enabled, we fail directly here.
+      info("new Group Coordinator is not enabled on the broker")
+      requestHelper.sendMaybeThrottle(request, 
shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      return
+    } else if (!config.isShareGroupEnabled) {
+      // The API is not supported when the "share" rebalance protocol has not 
been set explicitly.
+      info("Share Group is not enabled on the broker")
+      requestHelper.sendMaybeThrottle(request, 
shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      return
+    }
+    val sharePartitionManager : SharePartitionManager = 
this.sharePartitionManager match {
+      case Some(manager) => manager
+      case None =>
+        // The API is not supported when the SharePartitionManager is not 
defined on the broker
+        info("SharePartitionManager not defined on the broker")

Review Comment:
   Yeah, as per the suggestion here: 
https://github.com/apache/kafka/pull/16456#discussion_r1665668978, it would be 
good to keep this log message consistent with the other logs:
   
   ```
   info("Received share fetch request for zookeeper based cluster")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to