chirag-wadhwa5 commented on code in PR #16456:
URL: https://github.com/apache/kafka/pull/16456#discussion_r1697502447


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -4069,6 +4431,211 @@ class KafkaApis(val requestChannel: RequestChannel,
     CompletableFuture.completedFuture[Unit](())
   }
 
+  private def getAcknowledgeBatchesFromShareFetchRequest(
+    shareFetchRequest: ShareFetchRequest,
+    topicIdNames: util.Map[Uuid, String],
+    erroneous: mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData],
+  ): mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]] = {
+
+    val acknowledgeBatchesMap = mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]]()
+    shareFetchRequest.data().topics().forEach(topic => {
+      if (!topicIdNames.containsKey(topic.topicId)) {
+        topic.partitions.forEach((partition: ShareFetchRequestData.FetchPartition) => {
+          val topicIdPartition = new TopicIdPartition(
+            topic.topicId,
+            new TopicPartition(null, partition.partitionIndex))
+          erroneous +=
+            topicIdPartition -> ShareAcknowledgeResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID)
+        })
+      } else {
+        topic.partitions().forEach(partition => {
+          val topicIdPartition = new TopicIdPartition(
+            topic.topicId(),
+            new TopicPartition(topicIdNames.get(topic.topicId()), partition.partitionIndex())
+          )
+          val acknowledgeBatches = new util.ArrayList[ShareAcknowledgementBatch]()
+          partition.acknowledgementBatches().forEach(batch => {
+            acknowledgeBatches.add(new ShareAcknowledgementBatch(
+              batch.firstOffset(),
+              batch.lastOffset(),
+              batch.acknowledgeTypes()
+            ))
+          })
+          acknowledgeBatchesMap += topicIdPartition -> acknowledgeBatches
+        })
+      }
+    })
+    acknowledgeBatchesMap
+  }
+
+  def validateAcknowledgementBatches(
+    acknowledgementDataFromRequest: mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]],
+    erroneous: mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]
+  ): mutable.Set[TopicIdPartition] = {
+    val erroneousTopicIdPartitions: mutable.Set[TopicIdPartition] = mutable.Set.empty[TopicIdPartition]
+
+    acknowledgementDataFromRequest.foreach { case (tp: TopicIdPartition, acknowledgeBatches: util.List[ShareAcknowledgementBatch]) =>
+      var prevEndOffset = -1L
+      var isErroneous = false
+      acknowledgeBatches.forEach { batch =>
+        if (!isErroneous) {
+          if (batch.firstOffset > batch.lastOffset) {
+            erroneous += tp -> ShareAcknowledgeResponse.partitionResponse(tp, Errors.INVALID_REQUEST)
+            erroneousTopicIdPartitions.add(tp)
+            isErroneous = true
+          } else if (batch.firstOffset < prevEndOffset) {
+            erroneous += tp -> ShareAcknowledgeResponse.partitionResponse(tp, Errors.INVALID_REQUEST)
+            erroneousTopicIdPartitions.add(tp)
+            isErroneous = true
+          } else if (batch.acknowledgeTypes == null || batch.acknowledgeTypes.isEmpty) {
+            erroneous += tp -> ShareAcknowledgeResponse.partitionResponse(tp, Errors.INVALID_REQUEST)
+            erroneousTopicIdPartitions.add(tp)
+            isErroneous = true
+          } else if (batch.acknowledgeTypes.size() > 1 && batch.lastOffset - batch.firstOffset != batch.acknowledgeTypes.size() - 1) {
+            erroneous += tp -> ShareAcknowledgeResponse.partitionResponse(tp, Errors.INVALID_REQUEST)
+            erroneousTopicIdPartitions.add(tp)
+            isErroneous = true
+          } else if (batch.acknowledgeTypes.stream().anyMatch(ackType => ackType < 0 || ackType > 3)) {
+            erroneous += tp -> ShareAcknowledgeResponse.partitionResponse(tp, Errors.INVALID_REQUEST)
+            erroneousTopicIdPartitions.add(tp)
+            isErroneous = true
+          } else {
+            prevEndOffset = batch.lastOffset
+          }
+        }
+      }
+    }
+
+    erroneousTopicIdPartitions
+  }
+
+  // The callback for processing a share fetch response, invoked before throttling.
+  def processShareFetchResponse(
+    responsePartitionData: mutable.Map[TopicIdPartition, ShareFetchResponseData.PartitionData],
+    request: RequestChannel.Request,
+    topicIdNames: util.Map[Uuid, String],
+    shareFetchContext: ShareFetchContext
+  ): ShareFetchResponse = {
+
+    val clientId = request.header.clientId
+    val versionId = request.header.apiVersion
+    val shareFetchRequest = request.body[ShareFetchRequest]
+    val groupId = shareFetchRequest.data.groupId
+    val memberId = shareFetchRequest.data.memberId
+
+    val partitions = new util.LinkedHashMap[TopicIdPartition, ShareFetchResponseData.PartitionData]
+    val nodeEndpoints = new mutable.HashMap[Int, Node]
+    responsePartitionData.foreach { case (tp, partitionData) =>
+      partitionData.errorCode match {
+        case errCode if errCode == Errors.NOT_LEADER_OR_FOLLOWER.code || errCode == Errors.FENCED_LEADER_EPOCH.code =>
+          val leaderNode = getCurrentLeader(tp.topicPartition, request.context.listenerName)
+          leaderNode.node.foreach { node =>
+            nodeEndpoints.put(node.id, node)
+          }
+          partitionData.currentLeader
+            .setLeaderId(leaderNode.leaderId)
+            .setLeaderEpoch(leaderNode.leaderEpoch)
+        case _ =>
+      }
+
+      partitions.put(tp, partitionData)

Review Comment:
   Thanks for the review. Yes, we could use the same map, but some of the
   shareFetchContext methods are defined to take a util.LinkedHashMap, so we
   would need a new variable to hold the converted map anyway, since it is
   passed as an argument to multiple methods. As for why we need a
   util.LinkedHashMap at all: we could change those method signatures to take a
   Scala map instead, but I think that would be out of scope for this PR, since
   it would mean changing other people's code as well.
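
   For illustration, the conversion being discussed amounts to something like
   the sketch below (hypothetical helper and names, not the exact code in this
   PR). A plain Scala mutable.Map does not guarantee insertion order, which is
   presumably part of why the context methods take a util.LinkedHashMap:

       import java.util
       import scala.collection.mutable

       // Copy a Scala mutable map into a java.util.LinkedHashMap, preserving
       // the order in which the entries are iterated here.
       def toJavaLinkedHashMap[K, V](m: mutable.Map[K, V]): util.LinkedHashMap[K, V] = {
         val result = new util.LinkedHashMap[K, V]
         m.foreach { case (k, v) => result.put(k, v) }
         result
       }

       // Usage: convert once, then reuse the converted map for every
       // shareFetchContext method that needs it.
       val converted = toJavaLinkedHashMap(mutable.LinkedHashMap("a" -> 1, "b" -> 2))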


