junrao commented on code in PR #16792:
URL: https://github.com/apache/kafka/pull/16792#discussion_r1710001661


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -4367,10 +4443,49 @@ class KafkaApis(val requestChannel: RequestChannel,
     CompletableFuture.completedFuture[Unit](())
   }
 
-  private def getAcknowledgeBatchesFromShareFetchRequest(shareFetchRequest: 
ShareFetchRequest,
-                                                         topicIdNames: 
util.Map[Uuid, String],
-                                                         erroneous: 
mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]
-                                                        ): 
mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]] = {
+  // Visible for Testing
+  def 
getAcknowledgeBatchesFromShareAcknowledgeRequest(shareAcknowledgeRequest: 
ShareAcknowledgeRequest,
+                                                       topicIdNames: 
util.Map[Uuid, String],
+                                                       erroneous: 
mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]
+                                                      ): 
mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]] = {
+    val acknowledgeBatchesMap = mutable.Map[TopicIdPartition, 
util.List[ShareAcknowledgementBatch]]()
+    shareAcknowledgeRequest.data().topics().forEach{ topic =>
+      if (!topicIdNames.containsKey(topic.topicId)) {
+        topic.partitions.forEach{ case partition: 
ShareAcknowledgeRequestData.AcknowledgePartition =>
+          val topicIdPartition = new TopicIdPartition(
+            topic.topicId,
+            new TopicPartition(null, partition.partitionIndex))
+          erroneous +=
+            topicIdPartition -> 
ShareAcknowledgeResponse.partitionResponse(topicIdPartition, 
Errors.UNKNOWN_TOPIC_ID)
+        }
+      } else {
+        topic.partitions().forEach{ partition =>
+          if (partition.acknowledgementBatches().size() > 0) {

Review Comment:
   As Apoorv mentioned earlier, by excluding partitions with empty ack batches, 
we won't include that partition in the response, which is incorrect, right?



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -4384,26 +4499,58 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
       } else {
         topic.partitions().forEach { partition =>
-          val topicIdPartition = new TopicIdPartition(
-            topic.topicId(),
-            new TopicPartition(topicIdNames.get(topic.topicId()), 
partition.partitionIndex())
-          )
-          val acknowledgeBatches = new 
util.ArrayList[ShareAcknowledgementBatch]()
-          partition.acknowledgementBatches().forEach{ batch =>
-            acknowledgeBatches.add(new ShareAcknowledgementBatch(
-              batch.firstOffset(),
-              batch.lastOffset(),
-              batch.acknowledgeTypes()
-            ))
+          if (partition.acknowledgementBatches().size() > 0) {  
+            val topicIdPartition = new TopicIdPartition(
+              topic.topicId(),
+              new TopicPartition(topicIdNames.get(topic.topicId()), 
partition.partitionIndex())
+            )
+            val acknowledgeBatches = new 
util.ArrayList[ShareAcknowledgementBatch]()
+            partition.acknowledgementBatches().forEach{ batch =>
+              acknowledgeBatches.add(new ShareAcknowledgementBatch(
+                batch.firstOffset(),
+                batch.lastOffset(),
+                batch.acknowledgeTypes()
+              ))
+            }
+            acknowledgeBatchesMap += topicIdPartition -> acknowledgeBatches
           }
-          acknowledgeBatchesMap += topicIdPartition -> acknowledgeBatches
         }
       }
     }
     acknowledgeBatchesMap
   }
 
-  private def validateAcknowledgementBatches(acknowledgementDataFromRequest: 
mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]],
+  // the callback for processing a share acknowledge response, invoked before 
throttling
+  def processShareAcknowledgeResponse(responseAcknowledgeData: 
Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData],
+                                      request: RequestChannel.Request,
+                                      topicNames: util.Map[Uuid, String]): 
ShareAcknowledgeResponse = {

Review Comment:
   topicNames is unused.



##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -430,6 +430,41 @@ public ShareFetchContext newContext(String groupId, 
Map<TopicIdPartition, ShareF
         return context;
     }
 
+    /**
+     * The acknowledgeSessionUpdate method is used to update the request epoch 
and lastUsed time of the share session.
+     * @param groupId The group id in the share fetch request.
+     * @param reqMetadata The metadata in the share acknowledge request.
+     */
+    public void acknowledgeSessionUpdate(String groupId, ShareFetchMetadata 
reqMetadata) {
+        if (reqMetadata.epoch() == ShareFetchMetadata.INITIAL_EPOCH) {
+            // ShareAcknowledge Request cannot have epoch as INITIAL_EPOCH (0)
+            throw Errors.INVALID_SHARE_SESSION_EPOCH.exception();
+        } else if (reqMetadata.epoch() == ShareFetchMetadata.FINAL_EPOCH) {
+            ShareSessionKey key = shareSessionKey(groupId, 
reqMetadata.memberId());
+            if (cache.remove(key) != null) {
+                log.debug("Removed share session with key " + key);
+            }

Review Comment:
   Could we simplify the above code as the following?
   
   ```
               if (cache.remove(key) == null) {
                   log.error("Share session error for {}: no such share session 
found", key);
                   throw Errors.SHARE_SESSION_NOT_FOUND.exception();
               } else {
                   log.debug("Removed share session with key " + key);
               }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to