chirag-wadhwa5 commented on code in PR #16792:
URL: https://github.com/apache/kafka/pull/16792#discussion_r1708899048


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -6548,14 +6522,1796 @@ class KafkaApisTest extends Logging {
   }
 
   private def compareResponsePartitionsFetchError(
-                                           expPartitionIndex: Int,
-                                           expErrorCode: Short,
-                                           partitionData: PartitionData
-                                         ): Unit = {
+                                                   expPartitionIndex: Int,
+                                                   expErrorCode: Short,
+                                                   partitionData: PartitionData
+                                                 ): Unit = {
     assertEquals(expPartitionIndex, partitionData.partitionIndex)
     assertEquals(expErrorCode, partitionData.errorCode)
   }
 
+  @Test
+  def testHandleShareFetchRequestSuccessWithAcknowledgements(): Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    val partitionIndex = 0
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+    val memberId: Uuid = Uuid.randomUuid()
+
+    val records1 = memoryRecords(10, 0)
+    val records2 = memoryRecords(10, 10)
+
+    val groupId = "group"
+
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), any())).thenReturn(
+      CompletableFuture.completedFuture(Map[TopicIdPartition, ShareFetchResponseData.PartitionData](
+        new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) ->
+          new ShareFetchResponseData.PartitionData()
+            .setErrorCode(Errors.NONE.code)
+            .setAcknowledgeErrorCode(Errors.NONE.code)
+            .setRecords(records1)
+            .setAcquiredRecords(new util.ArrayList(List(
+              new ShareFetchResponseData.AcquiredRecords()
+                .setFirstOffset(0)
+                .setLastOffset(9)
+                .setDeliveryCount(1)
+            ).asJava))
+      ).asJava)
+    ).thenReturn(
+      CompletableFuture.completedFuture(Map[TopicIdPartition, ShareFetchResponseData.PartitionData](
+        new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) ->
+          new ShareFetchResponseData.PartitionData()
+            .setErrorCode(Errors.NONE.code)
+            .setAcknowledgeErrorCode(Errors.NONE.code)
+            .setRecords(records2)
+            .setAcquiredRecords(new util.ArrayList(List(
+              new ShareFetchResponseData.AcquiredRecords()
+                .setFirstOffset(10)
+                .setLastOffset(19)
+                .setDeliveryCount(1)
+            ).asJava))
+      ).asJava)
+    )
+
+    val cachedSharePartitions = new ImplicitLinkedHashCollection[CachedSharePartition]
+    cachedSharePartitions.mustAdd(new CachedSharePartition(
+      new TopicIdPartition(topicId, new TopicPartition(topicName, 0)), new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes), false
+    ))
+
+    when(sharePartitionManager.newContext(any(), any(), any(), any(), any())).thenReturn(
+      new ShareSessionContext(new ShareFetchMetadata(memberId, 0), Map(
+        new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) ->
+          new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes)
+      ).asJava)
+    ).thenReturn(new ShareSessionContext(new ShareFetchMetadata(memberId, 1), new ShareSession(
+      new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 0L, 2))
+    )
+
+    when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+      any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
+
+    when(sharePartitionManager.acknowledge(any(), any(), any())).thenReturn(
+      CompletableFuture.completedFuture(Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData](
+        new TopicIdPartition(topicId, new TopicPartition(topicName, 0)) ->
+          new ShareAcknowledgeResponseData.PartitionData()
+            .setPartitionIndex(0)
+            .setErrorCode(Errors.NONE.code),
+      ).asJava)
+    )
+
+    var shareFetchRequestData = new ShareFetchRequestData().
+      setGroupId(groupId).
+      setMemberId(memberId.toString).
+      setShareSessionEpoch(0).
+      setTopics(List(new ShareFetchRequestData.FetchTopic().
+        setTopicId(topicId).
+        setPartitions(List(
+          new ShareFetchRequestData.FetchPartition()
+            .setPartitionIndex(0)
+            .setPartitionMaxBytes(40000)).asJava)).asJava)
+
+    var shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
+    var request = buildRequest(shareFetchRequest)
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(
+        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
+        ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
+        ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
+      raftSupport = true)
+    kafkaApis.handleShareFetchRequest(request)
+    var response = verifyNoThrottling[ShareFetchResponse](request)
+    var responseData = response.data()
+    var topicResponses = responseData.responses()
+
+    assertEquals(Errors.NONE.code, responseData.errorCode)
+    assertEquals(1, topicResponses.size())
+    assertEquals(topicId, topicResponses.get(0).topicId)
+    assertEquals(1, topicResponses.get(0).partitions.size())
+    assertEquals(partitionIndex, topicResponses.get(0).partitions.get(0).partitionIndex)
+    assertEquals(Errors.NONE.code, topicResponses.get(0).partitions.get(0).errorCode)
+    assertEquals(records1, topicResponses.get(0).partitions.get(0).records)
+    assertArrayEquals(expectedAcquiredRecords(0, 9, 1).toArray(), topicResponses.get(0).partitions.get(0).acquiredRecords.toArray())
+
+    shareFetchRequestData = new ShareFetchRequestData().
+      setGroupId("group").
+      setMemberId(memberId.toString).
+      setShareSessionEpoch(1).
+      setTopics(List(new ShareFetchRequestData.FetchTopic().
+        setTopicId(topicId).
+        setPartitions(List(
+          new ShareFetchRequestData.FetchPartition()
+            .setPartitionIndex(0)
+            .setPartitionMaxBytes(40000)
+            .setAcknowledgementBatches(List(
+              new AcknowledgementBatch()
+                .setFirstOffset(0)
+                .setLastOffset(9)
+                .setAcknowledgeTypes(Collections.singletonList(1.toByte))
+            ).asJava)
+        ).asJava)
+      ).asJava)
+
+    shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
+    request = buildRequest(shareFetchRequest)
+    kafkaApis.handleShareFetchRequest(request)
+    response = verifyNoThrottling[ShareFetchResponse](request)
+    responseData = response.data()
+    topicResponses = responseData.responses()
+
+    assertEquals(Errors.NONE.code, responseData.errorCode)
+    assertEquals(1, topicResponses.size())
+    assertEquals(topicId, topicResponses.get(0).topicId)
+    assertEquals(1, topicResponses.get(0).partitions.size())
+    assertEquals(partitionIndex, topicResponses.get(0).partitions.get(0).partitionIndex)
+    assertEquals(Errors.NONE.code, topicResponses.get(0).partitions.get(0).errorCode)
+    assertEquals(Errors.NONE.code, topicResponses.get(0).partitions.get(0).acknowledgeErrorCode)
+    assertEquals(records2, topicResponses.get(0).partitions.get(0).records)
+    assertArrayEquals(expectedAcquiredRecords(10, 19, 1).toArray(), topicResponses.get(0).partitions.get(0).acquiredRecords.toArray())
+  }
+
+  @Test
+  def testHandleShareFetchNewGroupCoordinatorDisabled(): Unit = {
+    val topicId = Uuid.randomUuid()
+    val memberId: Uuid = Uuid.randomUuid()
+    val groupId = "group"
+
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
+
+    val shareFetchRequestData = new ShareFetchRequestData().
+      setGroupId(groupId).
+      setMemberId(memberId.toString).
+      setShareSessionEpoch(1).
+      setTopics(List(new ShareFetchRequestData.FetchTopic().
+        setTopicId(topicId).
+        setPartitions(List(
+          new ShareFetchRequestData.FetchPartition()
+            .setPartitionIndex(0)
+            .setPartitionMaxBytes(40000)
+            .setAcknowledgementBatches(List(
+              new AcknowledgementBatch()
+                .setFirstOffset(0)
+                .setLastOffset(9)
+                .setAcknowledgeTypes(Collections.singletonList(1.toByte))
+            ).asJava)
+        ).asJava)
+      ).asJava)
+
+    val shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
+    val request = buildRequest(shareFetchRequest)
+
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(
+        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "false",
+        ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
+        ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
+      raftSupport = true)
+    kafkaApis.handleShareFetchRequest(request)
+
+    val response = verifyNoThrottling[ShareFetchResponse](request)
+    val responseData = response.data()
+
+    assertEquals(Errors.UNSUPPORTED_VERSION.code, responseData.errorCode)
+  }
+
+  @Test
+  def testHandleShareFetchShareGroupDisabled(): Unit = {
+    val topicId = Uuid.randomUuid()
+    val memberId: Uuid = Uuid.randomUuid()
+    val groupId = "group"
+
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
+
+    val shareFetchRequestData = new ShareFetchRequestData().
+      setGroupId(groupId).
+      setMemberId(memberId.toString).
+      setShareSessionEpoch(1).
+      setTopics(List(new ShareFetchRequestData.FetchTopic().
+        setTopicId(topicId).
+        setPartitions(List(
+          new ShareFetchRequestData.FetchPartition()
+            .setPartitionIndex(0)
+            .setPartitionMaxBytes(40000)
+            .setAcknowledgementBatches(List(
+              new AcknowledgementBatch()
+                .setFirstOffset(0)
+                .setLastOffset(9)
+                .setAcknowledgeTypes(Collections.singletonList(1.toByte))
+            ).asJava)
+        ).asJava)
+      ).asJava)
+
+    val shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
+    val request = buildRequest(shareFetchRequest)
+
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(
+        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
+        ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
+        ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "false"),
+      raftSupport = true)
+    kafkaApis.handleShareFetchRequest(request)
+
+    val response = verifyNoThrottling[ShareFetchResponse](request)
+    val responseData = response.data()
+
+    assertEquals(Errors.UNSUPPORTED_VERSION.code, responseData.errorCode)
+  }
+
+  @Test
+  def testHandleShareFetchRequestGroupAuthorizationError(): Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    val partitionIndex = 0
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+    val memberId: Uuid = Uuid.ZERO_UUID
+
+    when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+      any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
+
+    val shareFetchRequestData = new ShareFetchRequestData().
+      setGroupId("group").
+      setMemberId(memberId.toString).
+      setShareSessionEpoch(1).
+      setTopics(List(new ShareFetchRequestData.FetchTopic().
+        setTopicId(topicId).
+        setPartitions(List(
+          new ShareFetchRequestData.FetchPartition()
+            .setPartitionIndex(partitionIndex)
+            .setPartitionMaxBytes(40000)
+            .setAcknowledgementBatches(List(
+              new ShareFetchRequestData.AcknowledgementBatch()
+                .setFirstOffset(0)
+                .setLastOffset(9)
+                .setAcknowledgeTypes(Collections.singletonList(1.toByte))
+            ).asJava)
+        ).asJava)
+      ).asJava)
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    when(authorizer.authorize(any(), any())).thenReturn(List[AuthorizationResult](
+      AuthorizationResult.DENIED
+    ).asJava)
+
+    val shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData)
+      .build(ApiKeys.SHARE_ACKNOWLEDGE.latestVersion)
+    val request = buildRequest(shareFetchRequest)
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(
+        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
+        ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
+        ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
+      authorizer = Option(authorizer),
+      raftSupport = true)
+    kafkaApis.handleShareFetchRequest(request)
+
+    val response = verifyNoThrottling[ShareFetchResponse](request)
+    val responseData = response.data()
+
+    assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, responseData.errorCode)
+  }
+
+  @Test
+  def testHandleShareFetchRequestReleaseAcquiredRecordsThrowError(): Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    val partitionIndex = 0
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+    val memberId: Uuid = Uuid.ZERO_UUID
+
+    val groupId = "group"
+
+    when(sharePartitionManager.acknowledge(any(), any(), any())).thenReturn(
+      CompletableFuture.completedFuture(Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData](
+        new TopicIdPartition(topicId, new TopicPartition(topicName, 0)) ->
+          new ShareAcknowledgeResponseData.PartitionData()
+            .setPartitionIndex(0)
+            .setErrorCode(Errors.NONE.code),
+      ).asJava)
+    )
+
+    when(sharePartitionManager.newContext(any(), any(), any(), any(), any())).thenReturn(
+      new FinalContext()
+    )
+
+    when(sharePartitionManager.releaseAcquiredRecords(any(), any())).thenReturn(
+      FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception())
+    )
+
+    when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+      any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
+
+    val shareFetchRequestData = new ShareFetchRequestData().
+      setGroupId(groupId).
+      setMemberId(memberId.toString).
+      setShareSessionEpoch(ShareFetchMetadata.FINAL_EPOCH).
+      setTopics(List(new ShareFetchRequestData.FetchTopic().
+        setTopicId(topicId).
+        setPartitions(List(
+          new ShareFetchRequestData.FetchPartition()
+            .setPartitionIndex(0)
+            .setAcknowledgementBatches(List(
+              new AcknowledgementBatch()
+                .setFirstOffset(0)
+                .setLastOffset(9)
+                .setAcknowledgeTypes(Collections.singletonList(1.toByte))
+            ).asJava)
+        ).asJava)).asJava)
+
+    val shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
+    val request = buildRequest(shareFetchRequest)
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(
+        GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true",
+        ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true",
+        ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
+      raftSupport = true)
+    kafkaApis.handleShareFetchRequest(request)
+    val response = verifyNoThrottling[ShareFetchResponse](request)
+    val responseData = response.data()
+    val topicResponses = responseData.responses()
+
+    assertEquals(Errors.NONE.code, responseData.errorCode)
+    assertEquals(1, topicResponses.size())
+    assertEquals(topicId, topicResponses.get(0).topicId)
+    assertEquals(1, topicResponses.get(0).partitions.size())
+    assertEquals(partitionIndex, topicResponses.get(0).partitions.get(0).partitionIndex)
+    assertEquals(Errors.NONE.code, topicResponses.get(0).partitions.get(0).errorCode)
+    assertEquals(Errors.NONE.code, topicResponses.get(0).partitions.get(0).acknowledgeErrorCode)
+    assertNull(topicResponses.get(0).partitions.get(0).records)
+    assertEquals(0, topicResponses.get(0).partitions.get(0).acquiredRecords.toArray().size)
+  }
+
+  @Test
+  def testHandleShareAcknowledgeRequestSuccess(): Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    val partitionIndex = 0
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+    val memberId: Uuid = Uuid.randomUuid()
+
+    val groupId = "group"
+
+    when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+      any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0)
+
+    when(sharePartitionManager.acknowledge(any(), any(), any())).thenReturn(
+      CompletableFuture.completedFuture(Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData](
+        new TopicIdPartition(topicId, new TopicPartition(topicName, 0)) ->
+          new ShareAcknowledgeResponseData.PartitionData()
+            .setPartitionIndex(0)
+            .setErrorCode(Errors.NONE.code)
+      ).asJava)
+    )
+
+    doNothing().when(sharePartitionManager).acknowledgeSessionUpdate(any(), any())
+
+    when(sharePartitionManager.releaseAcquiredRecords(any(), any())).thenReturn(

Review Comment:
   Thanks for the review. We don't need this here; I have removed this piece of code now.


