junrao commented on code in PR #16792: URL: https://github.com/apache/kafka/pull/16792#discussion_r1710001661
########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -4367,10 +4443,49 @@ class KafkaApis(val requestChannel: RequestChannel, CompletableFuture.completedFuture[Unit](()) } - private def getAcknowledgeBatchesFromShareFetchRequest(shareFetchRequest: ShareFetchRequest, - topicIdNames: util.Map[Uuid, String], - erroneous: mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData] - ): mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]] = { + // Visible for Testing + def getAcknowledgeBatchesFromShareAcknowledgeRequest(shareAcknowledgeRequest: ShareAcknowledgeRequest, + topicIdNames: util.Map[Uuid, String], + erroneous: mutable.Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData] + ): mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]] = { + val acknowledgeBatchesMap = mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]]() + shareAcknowledgeRequest.data().topics().forEach{ topic => + if (!topicIdNames.containsKey(topic.topicId)) { + topic.partitions.forEach{ case partition: ShareAcknowledgeRequestData.AcknowledgePartition => + val topicIdPartition = new TopicIdPartition( + topic.topicId, + new TopicPartition(null, partition.partitionIndex)) + erroneous += + topicIdPartition -> ShareAcknowledgeResponse.partitionResponse(topicIdPartition, Errors.UNKNOWN_TOPIC_ID) + } + } else { + topic.partitions().forEach{ partition => + if (partition.acknowledgementBatches().size() > 0) { Review Comment: As Apoorv mentioned earlier, by excluding partitions with empty ack batches, we won't include that partition in the response, which is incorrect, right? ########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -4384,26 +4499,58 @@ class KafkaApis(val requestChannel: RequestChannel, } } else { topic.partitions().forEach { partition => - val topicIdPartition = new TopicIdPartition( - topic.topicId(), - new TopicPartition(topicIdNames.get(topic.topicId()), partition.partitionIndex()) - ) - val acknowledgeBatches = new util.ArrayList[ShareAcknowledgementBatch]() - partition.acknowledgementBatches().forEach{ batch => - acknowledgeBatches.add(new ShareAcknowledgementBatch( - batch.firstOffset(), - batch.lastOffset(), - batch.acknowledgeTypes() - )) + if (partition.acknowledgementBatches().size() > 0) { + val topicIdPartition = new TopicIdPartition( + topic.topicId(), + new TopicPartition(topicIdNames.get(topic.topicId()), partition.partitionIndex()) + ) + val acknowledgeBatches = new util.ArrayList[ShareAcknowledgementBatch]() + partition.acknowledgementBatches().forEach{ batch => + acknowledgeBatches.add(new ShareAcknowledgementBatch( + batch.firstOffset(), + batch.lastOffset(), + batch.acknowledgeTypes() + )) + } + acknowledgeBatchesMap += topicIdPartition -> acknowledgeBatches } - acknowledgeBatchesMap += topicIdPartition -> acknowledgeBatches } } } acknowledgeBatchesMap } - private def validateAcknowledgementBatches(acknowledgementDataFromRequest: mutable.Map[TopicIdPartition, util.List[ShareAcknowledgementBatch]], + // the callback for processing a share acknowledge response, invoked before throttling + def processShareAcknowledgeResponse(responseAcknowledgeData: Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData], + request: RequestChannel.Request, + topicNames: util.Map[Uuid, String]): ShareAcknowledgeResponse = { Review Comment: topicNames is unused. ########## core/src/main/java/kafka/server/share/SharePartitionManager.java: ########## @@ -430,6 +430,41 @@ public ShareFetchContext newContext(String groupId, Map<TopicIdPartition, ShareF return context; } + /** + * The acknowledgeSessionUpdate method is used to update the request epoch and lastUsed time of the share session. + * @param groupId The group id in the share fetch request. + * @param reqMetadata The metadata in the share acknowledge request. + */ + public void acknowledgeSessionUpdate(String groupId, ShareFetchMetadata reqMetadata) { + if (reqMetadata.epoch() == ShareFetchMetadata.INITIAL_EPOCH) { + // ShareAcknowledge Request cannot have epoch as INITIAL_EPOCH (0) + throw Errors.INVALID_SHARE_SESSION_EPOCH.exception(); + } else if (reqMetadata.epoch() == ShareFetchMetadata.FINAL_EPOCH) { + ShareSessionKey key = shareSessionKey(groupId, reqMetadata.memberId()); + if (cache.remove(key) != null) { + log.debug("Removed share session with key " + key); + } Review Comment: Could we simplify the above code as the following? ``` if (cache.remove(key) == null) { log.error("Share session error for {}: no such share session found", key); throw Errors.SHARE_SESSION_NOT_FOUND.exception(); } else { log.debug("Removed share session with key " + key); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org