apoorvmittal10 commented on code in PR #16792:
URL: https://github.com/apache/kafka/pull/16792#discussion_r1707341091


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -4367,7 +4443,46 @@ class KafkaApis(val requestChannel: RequestChannel,
     CompletableFuture.completedFuture[Unit](())
   }
 
-  private def getAcknowledgeBatchesFromShareFetchRequest(shareFetchRequest: 
ShareFetchRequest,
+  // Visible for Testing
+  def getAcknowledgeBatchesFromShareAcknowledgeRequest(shareAcknowledgeRequest 
: ShareAcknowledgeRequest,
+                                                       topicIdNames : 
util.Map[Uuid, String],
+                                                       erroneous : 
mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]
+                                                      ) : 
mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]] = {
+    val acknowledgeBatchesMap = mutable.Map[TopicIdPartition, 
util.List[ShareAcknowledgementBatch]]()
+    shareAcknowledgeRequest.data().topics().forEach{ topic =>
+      if(!topicIdNames.containsKey(topic.topicId)) {
+        topic.partitions.forEach{ case partition: 
ShareAcknowledgeRequestData.AcknowledgePartition =>
+          val topicIdPartition = new TopicIdPartition(
+            topic.topicId,
+            new TopicPartition(null, partition.partitionIndex))
+          erroneous +=
+            topicIdPartition -> 
ShareAcknowledgeResponse.partitionResponse(topicIdPartition, 
Errors.UNKNOWN_TOPIC_ID)
+        }
+      } else {
+        topic.partitions().forEach{partition =>
+          val topicIdPartition = new TopicIdPartition(
+            topic.topicId(),
+            new TopicPartition(topicIdNames.get(topic.topicId()), 
partition.partitionIndex())
+          )
+          val acknowledgeBatches = new 
util.ArrayList[ShareAcknowledgementBatch]()
+          partition.acknowledgementBatches().forEach{ batch =>
+            acknowledgeBatches.add(new ShareAcknowledgementBatch(
+              batch.firstOffset(),
+              batch.lastOffset(),
+              batch.acknowledgeTypes()
+            ))
+          }
+          if (acknowledgeBatches.size() > 0) {

Review Comment:
   If `acknowledgeBatches.size()` could be 0, shouldn't that check happen 
before creating the `TopicIdPartition`, `TopicPartition`, and `new 
util.ArrayList[ShareAcknowledgementBatch]()` objects? Also, if 
`acknowledgeBatches.size()` is 0 for a topic partition, where are we filling 
in the response for that partition?



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -4327,9 +4327,85 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleShareAcknowledgeRequest(request: RequestChannel.Request): Unit = {
     val shareAcknowledgeRequest = request.body[ShareAcknowledgeRequest]
-    // TODO: Implement the ShareAcknowledgeRequest handling
-    requestHelper.sendMaybeThrottle(request, 
shareAcknowledgeRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-    CompletableFuture.completedFuture[Unit](())
+
+    if (!isShareGroupProtocolEnabled) {
+      requestHelper.sendMaybeThrottle(request,
+        
shareAcknowledgeRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME,
 Errors.UNSUPPORTED_VERSION.exception))
+      return
+    }
+
+    val sharePartitionManagerInstance : SharePartitionManager = 
sharePartitionManager match {
+      case Some(manager) => manager
+      case None =>
+        // The API is not supported when the SharePartitionManager is not 
defined on the broker
+        info("Received share acknowledge request for zookeeper based cluster")
+        requestHelper.sendMaybeThrottle(request,
+          
shareAcknowledgeRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME,
 Errors.UNSUPPORTED_VERSION.exception))
+        return
+    }
+    val groupId = shareAcknowledgeRequest.data.groupId
+
+    // Share Acknowledge needs permission to perform READ action on the named 
group resource (groupId)
+    if (!authHelper.authorize(request.context, READ, GROUP, groupId)) {
+      requestHelper.sendMaybeThrottle(request,
+        
shareAcknowledgeRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME,
 Errors.GROUP_AUTHORIZATION_FAILED.exception))
+      return
+    }
+
+    val memberId = shareAcknowledgeRequest.data.memberId
+    val shareSessionEpoch = shareAcknowledgeRequest.data.shareSessionEpoch
+    val newReqMetadata : ShareFetchMetadata = new 
ShareFetchMetadata(Uuid.fromString(memberId), shareSessionEpoch)

Review Comment:
   The name `ShareFetchMetadata` seems a bit inappropriate, as we are 
handling a ShareAcknowledge request here. I understand that we need the 
session and epoch related information from the class. Do you think 
`ShareRequestMetadata` would be a better name for `ShareFetchMetadata`?



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -4396,14 +4511,70 @@ class KafkaApis(val requestChannel: RequestChannel,
               batch.acknowledgeTypes()
             ))
           }
-          acknowledgeBatchesMap += topicIdPartition -> acknowledgeBatches
+          if (acknowledgeBatches.size() > 0) {

Review Comment:
   Same as above.



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -4430,16 +4430,16 @@ class KafkaApisTest extends Logging {
     when(sharePartitionManager.fetchMessages(any(), any(), any(), 
any())).thenReturn(
       CompletableFuture.completedFuture(Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData](
         new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)) ->
-                  new ShareFetchResponseData.PartitionData()
-                    .setErrorCode(Errors.NONE.code)
-                    .setAcknowledgeErrorCode(Errors.NONE.code)
-                    .setRecords(records)
-                    .setAcquiredRecords(new util.ArrayList(List(
-                      new ShareFetchResponseData.AcquiredRecords()
-                        .setFirstOffset(0)
-                        .setLastOffset(9)
-                        .setDeliveryCount(1)
-                      ).asJava))
+          new ShareFetchResponseData.PartitionData()
+            .setErrorCode(Errors.NONE.code)
+            .setAcknowledgeErrorCode(Errors.NONE.code)
+            .setRecords(records)
+            .setAcquiredRecords(new util.ArrayList(List(
+              new ShareFetchResponseData.AcquiredRecords()
+                .setFirstOffset(0)
+                .setLastOffset(9)
+                .setDeliveryCount(1)
+            ).asJava))

Review Comment:
   Is this an intended change?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to