chirag-wadhwa5 commented on code in PR #19701:
URL: https://github.com/apache/kafka/pull/19701#discussion_r2088357326


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -4824,6 +4824,42 @@ class KafkaApisTest extends Logging {
     assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code, responseData.errorCode)
   }
 
+  @Test
+  def testHandleShareFetchRequestWhenShareSessionCacheIsFull(): Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    metadataCache = initializeMetadataCacheWithShareGroupsEnabled()
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+    val memberId: Uuid = Uuid.ZERO_UUID
+
+    val groupId = "group"
+
+    when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), any()))
+      .thenThrow(Errors.SHARE_SESSION_LIMIT_REACHED.exception)
+
+    when(sharePartitionManager.createIdleShareFetchTimerTask(any()))
+      .thenAnswer(_ => CompletableFuture.runAsync(() => {}))
+
+    val shareFetchRequestData = new ShareFetchRequestData().
+      setGroupId(groupId).
+      setMemberId(memberId.toString).
+      setShareSessionEpoch(0).
+      setTopics(util.List.of(new ShareFetchRequestData.FetchTopic().
+        setTopicId(topicId).
+        setPartitions(util.List.of(
+          new ShareFetchRequestData.FetchPartition()
+            .setPartitionIndex(0)))))
+
+    val shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion)
+    val request = buildRequest(shareFetchRequest)
+    kafkaApis = createKafkaApis()
+    kafkaApis.handleShareFetchRequest(request)
+    val response = verifyNoThrottling[ShareFetchResponse](request)
+    val responseData = response.data()
+
+    assertEquals(Errors.SHARE_SESSION_LIMIT_REACHED.code, responseData.errorCode)

Review Comment:
   Thanks for the review. I have edited the test so that it now verifies that 
the response is not returned immediately. The waiting behaviour is also tested 
in SharePartitionManagerTest.testCreateIdleShareFetchTask.



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -4824,6 +4824,42 @@ class KafkaApisTest extends Logging {
     assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code, responseData.errorCode)
   }
 
+  @Test
+  def testHandleShareFetchRequestWhenShareSessionCacheIsFull(): Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    metadataCache = initializeMetadataCacheWithShareGroupsEnabled()
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+    val memberId: Uuid = Uuid.ZERO_UUID
+
+    val groupId = "group"
+
+    when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), any()))
+      .thenThrow(Errors.SHARE_SESSION_LIMIT_REACHED.exception)
+
+    when(sharePartitionManager.createIdleShareFetchTimerTask(any()))
+      .thenAnswer(_ => CompletableFuture.runAsync(() => {}))

Review Comment:
   Thanks for the review. I have edited the test to remove runAsync altogether.


