chirag-wadhwa5 commented on code in PR #16456:
URL: https://github.com/apache/kafka/pull/16456#discussion_r1697988104


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -4406,6 +4414,2341 @@ class KafkaApisTest extends Logging {
     assertEquals("broker2", node.host)
   }
 
+  @Test
+  def testHandleShareFetchRequestSuccessWithoutAcknowledgements(): Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    val partitionIndex = 0
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+    val memberId : Uuid = Uuid.ZERO_UUID
+
+    val shareSessionEpoch = 0
+
+    val records = memoryRecords(10, 0)
+
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), any())).thenReturn(
+      CompletableFuture.completedFuture(Map[TopicIdPartition, ShareFetchResponseData.PartitionData](
+        new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) ->
+                  new ShareFetchResponseData.PartitionData()
+                    .setErrorCode(Errors.NONE.code)
+                    .setAcknowledgeErrorCode(Errors.NONE.code)
+                    .setRecords(records)
+                    .setAcquiredRecords(new util.ArrayList(List(
+                      new ShareFetchResponseData.AcquiredRecords()
+                        .setFirstOffset(0)
+                        .setLastOffset(9)
+                        .setDeliveryCount(1)
+                      ).asJava))
+      ).asJava)
+    )
+
+    when(sharePartitionManager.newContext(any(), any(), any(), any())).thenReturn(
+      new ShareSessionContext(new ShareFetchMetadata(memberId, shareSessionEpoch), Map(
+        new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) ->
+          new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes)
+      ).asJava)
+    )
+
+    when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+      any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
+
+    val shareFetchRequestData = new ShareFetchRequestData().
+      setGroupId("group").
+      setMemberId(memberId.toString).
+      setShareSessionEpoch(shareSessionEpoch).
+      setTopics(List(new ShareFetchRequestData.FetchTopic().
+        setTopicId(topicId).
+        setPartitions(List(
+          new ShareFetchRequestData.FetchPartition()
+            .setPartitionIndex(partitionIndex)
+            .setPartitionMaxBytes(partitionMaxBytes)).asJava)).asJava)
+
+    val shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
+    val request = buildRequest(shareFetchRequest)
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(
+        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
+        ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
+        ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
+      raftSupport = true)
+    kafkaApis.handleShareFetchRequest(request)
+    val response = verifyNoThrottling[ShareFetchResponse](request)
+    val responseData = response.data()
+    val topicResponses = responseData.responses()
+
+    assertEquals(Errors.NONE.code, responseData.errorCode)
+    assertEquals(1, topicResponses.size())
+    assertEquals(topicId, topicResponses.get(0).topicId)
+    assertEquals(1, topicResponses.get(0).partitions.size())
+    assertEquals(partitionIndex, topicResponses.get(0).partitions.get(0).partitionIndex)
+    assertEquals(Errors.NONE.code, topicResponses.get(0).partitions.get(0).errorCode)
+    assertEquals(records, topicResponses.get(0).partitions.get(0).records)
+    assertArrayEquals(expectedAcquiredRecords(0, 9, 1).toArray(), topicResponses.get(0).partitions.get(0).acquiredRecords.toArray())
+  }
+
+  @Test
+  def testHandleShareFetchRequestInvalidRequestOnInitialEpoch() : Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+    val memberId : Uuid = Uuid.ZERO_UUID
+
+    val groupId = "group"
+    val partitionIndex = 0
+
+    val records = memoryRecords(10, 0)
+
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), any())).thenReturn(
+      CompletableFuture.completedFuture(Map[TopicIdPartition, ShareFetchResponseData.PartitionData](
+        new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) ->
+          new ShareFetchResponseData.PartitionData()
+            .setErrorCode(Errors.NONE.code)
+            .setAcknowledgeErrorCode(Errors.NONE.code)
+            .setRecords(records)
+            .setAcquiredRecords(new util.ArrayList(List(
+              new ShareFetchResponseData.AcquiredRecords()
+                .setFirstOffset(0)
+                .setLastOffset(9)
+                .setDeliveryCount(1)
+            ).asJava))
+      ).asJava)
+    )
+
+    val cachedSharePartitions = new ImplicitLinkedHashCollection[CachedSharePartition]
+    cachedSharePartitions.mustAdd(new CachedSharePartition(
+      new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)), new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes), false))
+
+    when(sharePartitionManager.newContext(any(), any(), any(), any())).thenReturn(
+      new ShareSessionContext(new ShareFetchMetadata(memberId, 0), Map(
+        new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) ->
+          new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes)
+      ).asJava)
+    ).thenReturn(new ShareSessionContext(new ShareFetchMetadata(memberId, 0), new ShareSession(
+      new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 0L, 1
+    )))
+
+    when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+      any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
+
+    var shareFetchRequestData = new ShareFetchRequestData().
+      setGroupId(groupId).
+      setMemberId(memberId.toString).
+      setShareSessionEpoch(0).
+      setTopics(List(new ShareFetchRequestData.FetchTopic().
+        setTopicId(topicId).
+        setPartitions(List(
+          new ShareFetchRequestData.FetchPartition()
+            .setPartitionIndex(partitionIndex)
+            .setPartitionMaxBytes(partitionMaxBytes)
+            .setAcknowledgementBatches(List(
+            new AcknowledgementBatch()
+              .setFirstOffset(0)
+              .setLastOffset(9)
+              .setAcknowledgeTypes(Collections.singletonList(1.toByte))
+          ).asJava)
+        ).asJava)
+      ).asJava)
+
+    var shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
+    var request = buildRequest(shareFetchRequest)
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(
+        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
+        ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
+        ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
+      raftSupport = true)
+    kafkaApis.handleShareFetchRequest(request)
+    var response = verifyNoThrottling[ShareFetchResponse](request)
+    var responseData = response.data()
+
+    assertEquals(Errors.INVALID_REQUEST.code, responseData.errorCode)
+
+    // Testing whether the subsequent request with the incremented share session epoch works or not.
+    shareFetchRequestData = new ShareFetchRequestData().
+      setGroupId(groupId).
+      setMemberId(memberId.toString).
+      setShareSessionEpoch(1).
+      setTopics(List(new ShareFetchRequestData.FetchTopic().
+        setTopicId(topicId).
+        setPartitions(List(
+          new ShareFetchRequestData.FetchPartition()
+            .setPartitionIndex(0)
+            .setPartitionMaxBytes(partitionMaxBytes)
+        ).asJava)
+      ).asJava)
+
+    shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
+    request = buildRequest(shareFetchRequest)
+    kafkaApis.handleShareFetchRequest(request)
+    response = verifyNoThrottling[ShareFetchResponse](request)
+    responseData = response.data()
+    val topicResponses = responseData.responses()
+
+    assertEquals(Errors.NONE.code, responseData.errorCode)
+    assertEquals(1, topicResponses.size())
+    assertEquals(topicId, topicResponses.get(0).topicId)
+    assertEquals(1, topicResponses.get(0).partitions.size())
+    assertEquals(partitionIndex, topicResponses.get(0).partitions.get(0).partitionIndex)
+    assertEquals(Errors.NONE.code, topicResponses.get(0).partitions.get(0).errorCode)
+    assertEquals(records, topicResponses.get(0).partitions.get(0).records)
+    assertArrayEquals(expectedAcquiredRecords(0, 9, 1).toArray(), topicResponses.get(0).partitions.get(0).acquiredRecords.toArray())
+  }
+
+  @Test
+  def testHandleShareFetchRequestInvalidRequestOnFinalEpoch() : Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    val partitionIndex = 0
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+    val memberId : Uuid = Uuid.ZERO_UUID
+
+    val groupId = "group"
+
+    val records = memoryRecords(10, 0)
+
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), any())).thenReturn(
+      CompletableFuture.completedFuture(Map[TopicIdPartition, ShareFetchResponseData.PartitionData](
+        new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) ->
+          new ShareFetchResponseData.PartitionData()
+            .setErrorCode(Errors.NONE.code)
+            .setAcknowledgeErrorCode(Errors.NONE.code)
+            .setRecords(records)
+            .setAcquiredRecords(new util.ArrayList(List(
+              new ShareFetchResponseData.AcquiredRecords()
+                .setFirstOffset(0)
+                .setLastOffset(9)
+                .setDeliveryCount(1)
+            ).asJava))
+      ).asJava)
+    )
+
+    when(sharePartitionManager.newContext(any(), any(), any(), any())).thenReturn(
+      new ShareSessionContext(new ShareFetchMetadata(memberId, 0), Map(
+        new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) ->
+          new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes)
+      ).asJava)
+    ).thenThrow(Errors.INVALID_REQUEST.exception)
+
+    when(sharePartitionManager.releaseAcquiredRecords(any(), any())).thenReturn(
+      CompletableFuture.completedFuture(Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData](
+        new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) ->
+          new ShareAcknowledgeResponseData.PartitionData()
+            .setPartitionIndex(partitionIndex)
+            .setErrorCode(Errors.NONE.code)
+      ).asJava)
+    )
+
+    when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+      any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
+
+    var shareFetchRequestData = new ShareFetchRequestData().
+      setGroupId(groupId).
+      setMemberId(memberId.toString).
+      setShareSessionEpoch(0).
+      setTopics(List(new ShareFetchRequestData.FetchTopic().
+        setTopicId(topicId).
+        setPartitions(List(
+          new ShareFetchRequestData.FetchPartition()
+            .setPartitionIndex(0)
+            .setPartitionMaxBytes(partitionMaxBytes)).asJava)).asJava)
+
+    var shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
+    var request = buildRequest(shareFetchRequest)
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(
+        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
+        ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
+        ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
+      raftSupport = true)
+    kafkaApis.handleShareFetchRequest(request)
+    var response = verifyNoThrottling[ShareFetchResponse](request)
+    var responseData = response.data()
+    val topicResponses = responseData.responses()
+
+    assertEquals(Errors.NONE.code, responseData.errorCode)
+    assertEquals(1, topicResponses.size())
+    assertEquals(topicId, topicResponses.get(0).topicId)
+    assertEquals(1, topicResponses.get(0).partitions.size())
+    assertEquals(partitionIndex, topicResponses.get(0).partitions.get(0).partitionIndex)
+    assertEquals(Errors.NONE.code, topicResponses.get(0).partitions.get(0).errorCode)
+    assertEquals(records, topicResponses.get(0).partitions.get(0).records)
+    assertArrayEquals(expectedAcquiredRecords(0, 9, 1).toArray(), topicResponses.get(0).partitions.get(0).acquiredRecords.toArray())
+
+    shareFetchRequestData = new ShareFetchRequestData().
+      setGroupId(groupId).
+      setMemberId(memberId.toString).
+      setShareSessionEpoch(-1).
+      setTopics(List(new ShareFetchRequestData.FetchTopic().
+        setTopicId(topicId).
+        setPartitions(List(
+          new ShareFetchRequestData.FetchPartition()
+            .setPartitionIndex(0)
+            .setPartitionMaxBytes(partitionMaxBytes)
+            .setAcknowledgementBatches(List(
+              new AcknowledgementBatch()
+                .setFirstOffset(0)
+                .setLastOffset(9)
+                .setAcknowledgeTypes(Collections.singletonList(1.toByte))
+            ).asJava)
+        ).asJava)
+      ).asJava)

Review Comment:
   Thanks for the review. By design, a final share fetch request should only be sent to
   acknowledge previously fetched records, not to fetch new ones (newly fetched records
   could never be acknowledged, since this is the final request). So if the partitionMaxBytes
   field is non-zero for any share partition in the request, the client is expecting records
   in return, which should not happen on a final fetch request. Hence this is an invalid
   request.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
