chirag-wadhwa5 commented on code in PR #16456:
URL: https://github.com/apache/kafka/pull/16456#discussion_r1696558877


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3955,11 +3948,484 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  /**
+   * Handle a shareFetch request
+   */
   def handleShareFetchRequest(request: RequestChannel.Request): Unit = {
     val shareFetchRequest = request.body[ShareFetchRequest]
-    // TODO: Implement the ShareFetchRequest handling
-    requestHelper.sendMaybeThrottle(request, 
shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-    CompletableFuture.completedFuture[Unit](())
+
+    if (!config.isNewGroupCoordinatorEnabled) {
+      // The API is not supported by the "old" group coordinator (the 
default). If the
+      // new one is not enabled, we fail directly here.
+      info("new Group Coordinator is not enabled on the broker")
+      requestHelper.sendMaybeThrottle(request, 
shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      return
+    } else if (!config.isShareGroupEnabled) {
+      // The API is not supported when the "share" rebalance protocol has not 
been set explicitly.
+      info("Share Group is not enabled on the broker")
+      requestHelper.sendMaybeThrottle(request, 
shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      return
+    }
+    val sharePartitionManager : SharePartitionManager = 
this.sharePartitionManager match {
+      case Some(manager) => manager
+      case None =>
+        // The API is not supported when the SharePartitionManager is not 
defined on the broker
+        info("SharePartitionManager not defined on the broker")
+        requestHelper.sendMaybeThrottle(request, 
shareFetchRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+        return
+    }
+
+    val groupId = shareFetchRequest.data.groupId
+    val memberId = shareFetchRequest.data.memberId
+    val shareSessionEpoch = shareFetchRequest.data.shareSessionEpoch
+
+    def isAcknowledgeDataPresentInFetchRequest() : Boolean = {
+      var isAcknowledgeDataPresent = false
+      shareFetchRequest.data.topics.forEach ( topic => {
+        breakable{
+          topic.partitions.forEach ( partition => {
+            if (partition.acknowledgementBatches != null && 
!partition.acknowledgementBatches.isEmpty) {
+              isAcknowledgeDataPresent = true
+              break
+            } else {
+              isAcknowledgeDataPresent = false
+            }
+          })
+        }
+      })
+      isAcknowledgeDataPresent
+    }
+
+    val isAcknowledgeDataPresent = isAcknowledgeDataPresentInFetchRequest()
+
+    def isInvalidShareFetchRequest() : Boolean = {
+      // The Initial Share Fetch Request should not Acknowledge any data.
+      if (shareSessionEpoch == ShareFetchMetadata.INITIAL_EPOCH && 
isAcknowledgeDataPresent) {
+        return true
+      }
+      false
+    }
+
+    val topicIdNames = metadataCache.topicIdsToNames()
+    val shareFetchData = shareFetchRequest.shareFetchData(topicIdNames)
+    val forgottenTopics = shareFetchRequest.forgottenTopics(topicIdNames)
+
+    val newReqMetadata : ShareFetchMetadata = new 
ShareFetchMetadata(Uuid.fromString(memberId), shareSessionEpoch)
+    var shareFetchContext : ShareFetchContext = null
+
+    var shareFetchResponse : ShareFetchResponse = null
+
+    // check if the Request is Invalid. If it is, the request is failed 
directly here.
+    if(isInvalidShareFetchRequest()) {
+      requestHelper.sendMaybeThrottle(request, 
shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, 
Errors.INVALID_REQUEST.exception))
+      return
+    }
+
+    try {
+      // Creating the shareFetchContext for Share Session Handling. if context 
creation fails, the request is failed directly here.
+      shareFetchContext = sharePartitionManager.newContext(groupId, 
shareFetchData, forgottenTopics, newReqMetadata)
+    } catch {
+      case e: Exception => requestHelper.sendMaybeThrottle(request, 
shareFetchRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e))
+        return
+    }
+
+    // Variable to store any error thrown while the handling piggybacked 
acknowledgements.
+    var acknowledgeError : Errors = Errors.NONE
+    // Variable to store the topic partition wise result of piggybacked 
acknowledgements.
+    var acknowledgeResult = mutable.Map[TopicIdPartition, 
ShareAcknowledgeResponseData.PartitionData]()
+
+    val erroneousAndValidPartitionData : ErroneousAndValidPartitionData = 
shareFetchContext.getErroneousAndValidTopicIdPartitions
+    val topicIdPartitionSeq : mutable.Set[TopicIdPartition] = mutable.Set()
+    erroneousAndValidPartitionData.erroneous.forEach {
+      case(tp, _) => if (!topicIdPartitionSeq.contains(tp)) 
topicIdPartitionSeq += tp
+    }
+    erroneousAndValidPartitionData.validTopicIdPartitions.forEach {
+      case(tp, _) => if (!topicIdPartitionSeq.contains(tp)) 
topicIdPartitionSeq += tp
+    }
+    shareFetchData.forEach {
+      case(tp, _) => if (!topicIdPartitionSeq.contains(tp)) 
topicIdPartitionSeq += tp
+    }
+
+    val authorizedTopics = authHelper.filterByAuthorized(
+      request.context,
+      READ,
+      TOPIC,
+      topicIdPartitionSeq
+    )(_.topicPartition.topic)
+
+    // Handling the Acknowledgements from the ShareFetchRequest If this check 
is true, we are sure that this is not an
+    // Initial ShareFetch Request, otherwise the request would have been 
invalid.
+    if(isAcknowledgeDataPresent) {
+      if (!authHelper.authorize(request.context, READ, GROUP, groupId)) {
+        acknowledgeError = Errors.GROUP_AUTHORIZATION_FAILED
+      } else {
+        val erroneous = mutable.Map[TopicIdPartition, 
ShareAcknowledgeResponseData.PartitionData]()
+        val acknowledgementDataFromRequest = 
getAcknowledgeBatchesFromShareFetchRequest(request.body[ShareFetchRequest], 
topicIdNames, erroneous)
+        try {
+          acknowledgeResult = handleAcknowledgements(
+            acknowledgementDataFromRequest,
+            erroneous,
+            topicIdNames,
+            sharePartitionManager,
+            authorizedTopics,
+            groupId,
+            memberId,
+            request.header.clientId
+          )
+        } catch {
+          case throwable: Throwable =>
+            debug(s"Acknowledgements for Share fetch request with correlation 
from client ${request.header.clientId}  " +
+              s"failed with error ${throwable.getMessage}")
+            requestHelper.handleError(request, throwable)
+            return
+        }
+      }
+    }
+
+    // Handling the Fetch from the ShareFetchRequest.
+    try {
+      shareFetchResponse = handleFetchFromShareFetchRequest(
+        request,
+        erroneousAndValidPartitionData,
+        topicIdNames,
+        sharePartitionManager,
+        shareFetchContext,
+        authorizedTopics
+      )
+    } catch {
+      case throwable : Throwable =>
+        debug(s"Share fetch request with correlation from client 
${request.header.clientId}  " +
+          s"failed with error ${throwable.getMessage}")
+        requestHelper.handleError(request, throwable)
+        return
+    }
+
+    def combineShareFetchAndShareAcknowledgeResponses(
+                                                       shareFetchResponse: 
ShareFetchResponse,
+                                                       acknowledgeResult : 
mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData],
+                                                       acknowledgeError : 
Errors
+                                                     ) : ShareFetchResponse = {
+
+      // The outer map has topicId as the key and the inner map has 
partitionIndex as the key.
+      val topicPartitionAcknowledgements : mutable.Map[Uuid, mutable.Map[Int, 
Short]] = mutable.Map()
+      if(acknowledgeResult != null && acknowledgeResult.nonEmpty) {
+        acknowledgeResult.foreach { case (tp, partitionData) =>
+          topicPartitionAcknowledgements.get(tp.topicId) match {
+            case Some(subMap) =>
+              subMap += tp.partition -> partitionData.errorCode
+            case None =>
+              val partitionAcknowledgementsMap : mutable.Map[Int, Short] = 
mutable.Map()
+              partitionAcknowledgementsMap += tp.partition -> 
partitionData.errorCode
+              topicPartitionAcknowledgements += tp.topicId -> 
partitionAcknowledgementsMap
+          }
+        }
+      }
+
+      shareFetchResponse.data.responses.forEach(topic => {
+        val topicId = topic.topicId
+        topicPartitionAcknowledgements.get(topicId) match {
+          case Some(subMap) =>
+            topic.partitions.forEach { partition =>
+              subMap.get(partition.partitionIndex) match {
+                case Some(value) =>
+                  val ackErrorCode = if(acknowledgeError.code != 
Errors.NONE.code) acknowledgeError.code else value
+                  partition.setAcknowledgeErrorCode(ackErrorCode)
+                  // Delete the element.
+                  subMap.remove(partition.partitionIndex)
+                case None =>
+              }
+            }
+            // Add the remaining acknowledgements.
+            subMap.foreach { case (partitionIndex, value) =>
+              val ackErrorCode = if(acknowledgeError.code != Errors.NONE.code) 
acknowledgeError.code else value
+              val fetchPartitionData = new 
ShareFetchResponseData.PartitionData()
+                .setPartitionIndex(partitionIndex)
+                .setErrorCode(Errors.NONE.code)
+                .setAcknowledgeErrorCode(ackErrorCode)
+              topic.partitions.add(fetchPartitionData)
+            }
+            topicPartitionAcknowledgements.remove(topicId)
+          case None =>
+        }
+      })
+      // Add the remaining acknowledgements.
+      topicPartitionAcknowledgements.foreach{ case(topicId, subMap) =>
+        val topicData = new 
ShareFetchResponseData.ShareFetchableTopicResponse()
+          .setTopicId(topicId)
+        subMap.foreach { case (partitionIndex, value) =>
+          val ackErrorCode = if(acknowledgeError.code != Errors.NONE.code) 
acknowledgeError.code else value
+          val fetchPartitionData = new ShareFetchResponseData.PartitionData()
+            .setPartitionIndex(partitionIndex)
+            .setErrorCode(Errors.NONE.code)
+            .setAcknowledgeErrorCode(ackErrorCode)
+          topicData.partitions.add(fetchPartitionData)
+        }
+        shareFetchResponse.data.responses.add(topicData)
+      }
+
+      if (shareSessionEpoch == ShareFetchMetadata.FINAL_EPOCH) {
+        sharePartitionManager.releaseAcquiredRecords(groupId, memberId).
+          whenComplete((releaseAcquiredRecordsData, throwable) => {
+            if (throwable != null) {
+              throw throwable
+            } else {
+              info(s"Release acquired records on share session close 
$releaseAcquiredRecordsData succeeded")
+            }
+          }).get()
+      }
+      shareFetchResponse
+    }
+
+    // Send the response immediately.
+    try {
+      requestChannel.sendResponse(request, 
combineShareFetchAndShareAcknowledgeResponses(shareFetchResponse, 
acknowledgeResult, acknowledgeError), fetchOnComplete(request))

Review Comment:
   Hi, thanks a lot for the review. I think this comment refers to the code as 
it was before the last commit. I believe all the issues with the asynchronous 
code were resolved in that commit. Let me know if you find any other gaps. Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to