apoorvmittal10 commented on code in PR #16792:
URL: https://github.com/apache/kafka/pull/16792#discussion_r1710301991


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -4327,9 +4327,85 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleShareAcknowledgeRequest(request: RequestChannel.Request): Unit = {
     val shareAcknowledgeRequest = request.body[ShareAcknowledgeRequest]
-    // TODO: Implement the ShareAcknowledgeRequest handling
-    requestHelper.sendMaybeThrottle(request, 
shareAcknowledgeRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-    CompletableFuture.completedFuture[Unit](())
+
+    if (!isShareGroupProtocolEnabled) {
+      requestHelper.sendMaybeThrottle(request,
+        
shareAcknowledgeRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME,
 Errors.UNSUPPORTED_VERSION.exception))
+      return
+    }
+
+    val sharePartitionManagerInstance : SharePartitionManager = 
sharePartitionManager match {
+      case Some(manager) => manager
+      case None =>
+        // The API is not supported when the SharePartitionManager is not 
defined on the broker
+        info("Received share acknowledge request for zookeeper based cluster")
+        requestHelper.sendMaybeThrottle(request,
+          
shareAcknowledgeRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME,
 Errors.UNSUPPORTED_VERSION.exception))
+        return
+    }
+    val groupId = shareAcknowledgeRequest.data.groupId
+
+    // Share Acknowledge needs permission to perform READ action on the named 
group resource (groupId)
+    if (!authHelper.authorize(request.context, READ, GROUP, groupId)) {
+      requestHelper.sendMaybeThrottle(request,
+        
shareAcknowledgeRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME,
 Errors.GROUP_AUTHORIZATION_FAILED.exception))
+      return
+    }
+
+    val memberId = shareAcknowledgeRequest.data.memberId
+    val shareSessionEpoch = shareAcknowledgeRequest.data.shareSessionEpoch
+    val newReqMetadata : ShareFetchMetadata = new 
ShareFetchMetadata(Uuid.fromString(memberId), shareSessionEpoch)

Review Comment:
   I am fine either ways if we want to take up in follow up PR after discussing 
with @AndrewJSchofield or current PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to