chirag-wadhwa5 commented on code in PR #19701: URL: https://github.com/apache/kafka/pull/19701#discussion_r2088357326
########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -4824,6 +4824,42 @@ class KafkaApisTest extends Logging { assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code, responseData.errorCode) } + @Test + def testHandleShareFetchRequestWhenShareSessionCacheIsFull(): Unit = { + val topicName = "foo" + val topicId = Uuid.randomUuid() + metadataCache = initializeMetadataCacheWithShareGroupsEnabled() + addTopicToMetadataCache(topicName, 1, topicId = topicId) + val memberId: Uuid = Uuid.ZERO_UUID + + val groupId = "group" + + when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), any())) + .thenThrow(Errors.SHARE_SESSION_LIMIT_REACHED.exception) + + when(sharePartitionManager.createIdleShareFetchTimerTask(any())) + .thenAnswer(_ => CompletableFuture.runAsync(() => {})) + + val shareFetchRequestData = new ShareFetchRequestData(). + setGroupId(groupId). + setMemberId(memberId.toString). + setShareSessionEpoch(0). + setTopics(util.List.of(new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). + setPartitions(util.List.of( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(0))))) + + val shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + val request = buildRequest(shareFetchRequest) + kafkaApis = createKafkaApis() + kafkaApis.handleShareFetchRequest(request) + val response = verifyNoThrottling[ShareFetchResponse](request) + val responseData = response.data() + + assertEquals(Errors.SHARE_SESSION_LIMIT_REACHED.code, responseData.errorCode) Review Comment: Thanks for the review. I have made some edits to the test, which actually make sure that the response is not returned immediately. 
Also, the waiting behaviour is tested in SharePartitionManagerTest.testCreateIdleShareFetchTask. ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -4824,6 +4824,42 @@ class KafkaApisTest extends Logging { assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.code, responseData.errorCode) } + @Test + def testHandleShareFetchRequestWhenShareSessionCacheIsFull(): Unit = { + val topicName = "foo" + val topicId = Uuid.randomUuid() + metadataCache = initializeMetadataCacheWithShareGroupsEnabled() + addTopicToMetadataCache(topicName, 1, topicId = topicId) + val memberId: Uuid = Uuid.ZERO_UUID + + val groupId = "group" + + when(sharePartitionManager.newContext(any(), any(), any(), any(), any(), any())) + .thenThrow(Errors.SHARE_SESSION_LIMIT_REACHED.exception) + + when(sharePartitionManager.createIdleShareFetchTimerTask(any())) + .thenAnswer(_ => CompletableFuture.runAsync(() => {})) Review Comment: Thanks for the review. I have made edits to the test, removing runAsync altogether. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org