apoorvmittal10 commented on code in PR #16792: URL: https://github.com/apache/kafka/pull/16792#discussion_r1707341091
########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -4367,7 +4443,46 @@ class KafkaApis(val requestChannel: RequestChannel, CompletableFuture.completedFuture[Unit](()) } - private def getAcknowledgeBatchesFromShareFetchRequest(shareFetchRequest: ShareFetchRequest, + // Visible for Testing + def getAcknowledgeBatchesFromShareAcknowledgeRequest(shareAcknowledgeRequest : ShareAcknowledgeRequest, + topicIdNames : util.Map[Uuid, String], + erroneous : mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData] + ) : mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]] = { + val acknowledgeBatchesMap = mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]]() + shareAcknowledgeRequest.data().topics().forEach{ topic => + if(!topicIdNames.containsKey(topic.topicId)) { + topic.partitions.forEach{ case partition: ShareAcknowledgeRequestData.AcknowledgePartition => + val topicIdPartition = new TopicIdPartition( + topic.topicId, + new TopicPartition(null, partition.partitionIndex)) + erroneous += + topicIdPartition -> ShareAcknowledgeResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID) + } + } else { + topic.partitions().forEach{partition => + val topicIdPartition = new TopicIdPartition( + topic.topicId(), + new TopicPartition(topicIdNames.get(topic.topicId()), partition.partitionIndex()) + ) + val acknowledgeBatches = new util.ArrayList[ShareAcknowledgementBatch]() + partition.acknowledgementBatches().forEach{ batch => + acknowledgeBatches.add(new ShareAcknowledgementBatch( + batch.firstOffset(), + batch.lastOffset(), + batch.acknowledgeTypes() + )) + } + if (acknowledgeBatches.size() > 0) { Review Comment: if `acknowledgeBatches.size()` could be 0 then shouldn't that check exist first prior creating objects for `TopicIdPartition`, `TopicPartition`, `new util.ArrayList[ShareAcknowledgementBatch]()`? Also if acknowledgeBatches.size() is 0 for topic partition then where we are filling response for same? ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -4327,9 +4327,85 @@ class KafkaApis(val requestChannel: RequestChannel, def handleShareAcknowledgeRequest(request: RequestChannel.Request): Unit = { val shareAcknowledgeRequest = request.body[ShareAcknowledgeRequest] - // TODO: Implement the ShareAcknowledgeRequest handling - requestHelper.sendMaybeThrottle(request, shareAcknowledgeRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) - CompletableFuture.completedFuture[Unit](()) + + if (!isShareGroupProtocolEnabled) { + requestHelper.sendMaybeThrottle(request, + shareAcknowledgeRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION.exception)) + return + } + + val sharePartitionManagerInstance : SharePartitionManager = sharePartitionManager match { + case Some(manager) => manager + case None => + // The API is not supported when the SharePartitionManager is not defined on the broker + info("Received share acknowledge request for zookeeper based cluster") + requestHelper.sendMaybeThrottle(request, + shareAcknowledgeRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION.exception)) + return + } + val groupId = shareAcknowledgeRequest.data.groupId + + // Share Acknowledge needs permission to perform READ action on the named group resource (groupId) + if (!authHelper.authorize(request.context, READ, GROUP, groupId)) { + requestHelper.sendMaybeThrottle(request, + shareAcknowledgeRequest.getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, Errors.GROUP_AUTHORIZATION_FAILED.exception)) + return + } + + val memberId = shareAcknowledgeRequest.data.memberId + val shareSessionEpoch = shareAcknowledgeRequest.data.shareSessionEpoch + val newReqMetadata : ShareFetchMetadata = new ShareFetchMetadata(Uuid.fromString(memberId), shareSessionEpoch) Review Comment: `ShareFetchMetadata` names seems to be a bit inappropriate as we are handling ShareAcknowledge request. I understand that we need session and epoch related information from the class. Do you think `ShareRequestMetadata` would be a better name for `ShareFetchMetadata`? ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -4396,14 +4511,70 @@ class KafkaApis(val requestChannel: RequestChannel, batch.acknowledgeTypes() )) } - acknowledgeBatchesMap += topicIdPartition -> acknowledgeBatches + if (acknowledgeBatches.size() > 0) { Review Comment: Same as above. ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -4430,16 +4430,16 @@ class KafkaApisTest extends Logging { when(sharePartitionManager.fetchMessages(any(), any(), any(), any())).thenReturn( CompletableFuture.completedFuture(Map[TopicIdPartition, ShareFetchResponseData.PartitionData]( new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> - new ShareFetchResponseData.PartitionData() - .setErrorCode(Errors.NONE.code) - .setAcknowledgeErrorCode(Errors.NONE.code) - .setRecords(records) - .setAcquiredRecords(new util.ArrayList(List( - new ShareFetchResponseData.AcquiredRecords() - .setFirstOffset(0) - .setLastOffset(9) - .setDeliveryCount(1) - ).asJava)) + new ShareFetchResponseData.PartitionData() + .setErrorCode(Errors.NONE.code) + .setAcknowledgeErrorCode(Errors.NONE.code) + .setRecords(records) + .setAcquiredRecords(new util.ArrayList(List( + new ShareFetchResponseData.AcquiredRecords() + .setFirstOffset(0) + .setLastOffset(9) + .setDeliveryCount(1) + ).asJava)) Review Comment: Is it an inteded change? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org