chirag-wadhwa5 commented on code in PR #16792: URL: https://github.com/apache/kafka/pull/16792#discussion_r1708899048
########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -6548,14 +6522,1796 @@ class KafkaApisTest extends Logging { } private def compareResponsePartitionsFetchError( - expPartitionIndex: Int, - expErrorCode: Short, - partitionData: PartitionData - ): Unit = { + expPartitionIndex: Int, + expErrorCode: Short, + partitionData: PartitionData + ): Unit = { assertEquals(expPartitionIndex, partitionData.partitionIndex) assertEquals(expErrorCode, partitionData.errorCode) } + @Test + def testHandleShareFetchRequestSuccessWithAcknowledgements(): Unit = { + val topicName = "foo" + val topicId = Uuid.randomUuid() + val partitionIndex = 0 + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) + addTopicToMetadataCache(topicName, 1, topicId = topicId) + val memberId: Uuid = Uuid.randomUuid() + + val records1 = memoryRecords(10, 0) + val records2 = memoryRecords(10, 10) + + val groupId = "group" + + when(sharePartitionManager.fetchMessages(any(), any(), any(), any())).thenReturn( + CompletableFuture.completedFuture(Map[TopicIdPartition, ShareFetchResponseData.PartitionData]( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchResponseData.PartitionData() + .setErrorCode(Errors.NONE.code) + .setAcknowledgeErrorCode(Errors.NONE.code) + .setRecords(records1) + .setAcquiredRecords(new util.ArrayList(List( + new ShareFetchResponseData.AcquiredRecords() + .setFirstOffset(0) + .setLastOffset(9) + .setDeliveryCount(1) + ).asJava)) + ).asJava) + ).thenReturn( + CompletableFuture.completedFuture(Map[TopicIdPartition, ShareFetchResponseData.PartitionData]( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchResponseData.PartitionData() + .setErrorCode(Errors.NONE.code) + .setAcknowledgeErrorCode(Errors.NONE.code) + .setRecords(records2) + .setAcquiredRecords(new util.ArrayList(List( + new ShareFetchResponseData.AcquiredRecords() + .setFirstOffset(10) + .setLastOffset(19) + .setDeliveryCount(1) + ).asJava)) + ).asJava) + ) + + val cachedSharePartitions = new ImplicitLinkedHashCollection[CachedSharePartition] + cachedSharePartitions.mustAdd(new CachedSharePartition( + new TopicIdPartition(topicId, new TopicPartition(topicName, 0)), new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes), false + )) + + when(sharePartitionManager.newContext(any(), any(), any(), any(), any())).thenReturn( + new ShareSessionContext(new ShareFetchMetadata(memberId, 0), Map( + new TopicIdPartition(topicId, new TopicPartition(topicName, partitionIndex)) -> + new ShareFetchRequest.SharePartitionData(topicId, partitionMaxBytes) + ).asJava) + ).thenReturn(new ShareSessionContext(new ShareFetchMetadata(memberId, 1), new ShareSession( + new ShareSessionKey(groupId, memberId), cachedSharePartitions, 0L, 0L, 2)) + ) + + when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs( + any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0) + + when(sharePartitionManager.acknowledge(any(), any(), any())).thenReturn( + CompletableFuture.completedFuture(Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]( + new TopicIdPartition(topicId, new TopicPartition(topicName, 0)) -> + new ShareAcknowledgeResponseData.PartitionData() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code), + ).asJava) + ) + + var shareFetchRequestData = new ShareFetchRequestData(). + setGroupId(groupId). + setMemberId(memberId.toString). + setShareSessionEpoch(0). + setTopics(List(new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). + setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(0) + .setPartitionMaxBytes(40000)).asJava)).asJava) + + var shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + var request = buildRequest(shareFetchRequest) + kafkaApis = createKafkaApis( + overrideProperties = Map( + GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", + ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", + ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), + raftSupport = true) + kafkaApis.handleShareFetchRequest(request) + var response = verifyNoThrottling[ShareFetchResponse](request) + var responseData = response.data() + var topicResponses = responseData.responses() + + assertEquals(Errors.NONE.code, responseData.errorCode) + assertEquals(1, topicResponses.size()) + assertEquals(topicId, topicResponses.get(0).topicId) + assertEquals(1, topicResponses.get(0).partitions.size()) + assertEquals(partitionIndex, topicResponses.get(0).partitions.get(0).partitionIndex) + assertEquals(Errors.NONE.code, topicResponses.get(0).partitions.get(0).errorCode) + assertEquals(records1, topicResponses.get(0).partitions.get(0).records) + assertArrayEquals(expectedAcquiredRecords(0, 9, 1).toArray(), topicResponses.get(0).partitions.get(0).acquiredRecords.toArray()) + + shareFetchRequestData = new ShareFetchRequestData(). + setGroupId("group"). + setMemberId(memberId.toString). + setShareSessionEpoch(1). + setTopics(List(new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). + setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(0) + .setPartitionMaxBytes(40000) + .setAcknowledgementBatches(List( + new AcknowledgementBatch() + .setFirstOffset(0) + .setLastOffset(9) + .setAcknowledgeTypes(Collections.singletonList(1.toByte)) + ).asJava) + ).asJava) + ).asJava) + + shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + request = buildRequest(shareFetchRequest) + kafkaApis.handleShareFetchRequest(request) + response = verifyNoThrottling[ShareFetchResponse](request) + responseData = response.data() + topicResponses = responseData.responses() + + assertEquals(Errors.NONE.code, responseData.errorCode) + assertEquals(1, topicResponses.size()) + assertEquals(topicId, topicResponses.get(0).topicId) + assertEquals(1, topicResponses.get(0).partitions.size()) + assertEquals(partitionIndex, topicResponses.get(0).partitions.get(0).partitionIndex) + assertEquals(Errors.NONE.code, topicResponses.get(0).partitions.get(0).errorCode) + assertEquals(Errors.NONE.code, topicResponses.get(0).partitions.get(0).acknowledgeErrorCode) + assertEquals(records2, topicResponses.get(0).partitions.get(0).records) + assertArrayEquals(expectedAcquiredRecords(10, 19, 1).toArray(), topicResponses.get(0).partitions.get(0).acquiredRecords.toArray()) + } + + @Test + def testHandleShareFetchNewGroupCoordinatorDisabled(): Unit = { + val topicId = Uuid.randomUuid() + val memberId: Uuid = Uuid.randomUuid() + val groupId = "group" + + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) + + val shareFetchRequestData = new ShareFetchRequestData(). + setGroupId(groupId). + setMemberId(memberId.toString). + setShareSessionEpoch(1). + setTopics(List(new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). + setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(0) + .setPartitionMaxBytes(40000) + .setAcknowledgementBatches(List( + new AcknowledgementBatch() + .setFirstOffset(0) + .setLastOffset(9) + .setAcknowledgeTypes(Collections.singletonList(1.toByte)) + ).asJava) + ).asJava) + ).asJava) + + val shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + val request = buildRequest(shareFetchRequest) + + kafkaApis = createKafkaApis( + overrideProperties = Map( + GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "false", + ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", + ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), + raftSupport = true) + kafkaApis.handleShareFetchRequest(request) + + val response = verifyNoThrottling[ShareFetchResponse](request) + val responseData = response.data() + + assertEquals(Errors.UNSUPPORTED_VERSION.code, responseData.errorCode) + } + + @Test + def testHandleShareFetchShareGroupDisabled(): Unit = { + val topicId = Uuid.randomUuid() + val memberId: Uuid = Uuid.randomUuid() + val groupId = "group" + + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) + + val shareFetchRequestData = new ShareFetchRequestData(). + setGroupId(groupId). + setMemberId(memberId.toString). + setShareSessionEpoch(1). + setTopics(List(new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). + setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(0) + .setPartitionMaxBytes(40000) + .setAcknowledgementBatches(List( + new AcknowledgementBatch() + .setFirstOffset(0) + .setLastOffset(9) + .setAcknowledgeTypes(Collections.singletonList(1.toByte)) + ).asJava) + ).asJava) + ).asJava) + + val shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + val request = buildRequest(shareFetchRequest) + + kafkaApis = createKafkaApis( + overrideProperties = Map( + GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", + ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", + ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "false"), + raftSupport = true) + kafkaApis.handleShareFetchRequest(request) + + val response = verifyNoThrottling[ShareFetchResponse](request) + val responseData = response.data() + + assertEquals(Errors.UNSUPPORTED_VERSION.code, responseData.errorCode) + } + + @Test + def testHandleShareFetchRequestGroupAuthorizationError(): Unit = { + val topicName = "foo" + val topicId = Uuid.randomUuid() + val partitionIndex = 0 + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) + addTopicToMetadataCache(topicName, 1, topicId = topicId) + val memberId: Uuid = Uuid.ZERO_UUID + + when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs( + any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0) + + val shareFetchRequestData = new ShareFetchRequestData(). + setGroupId("group"). + setMemberId(memberId.toString). + setShareSessionEpoch(1). + setTopics(List(new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). + setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(partitionIndex) + .setPartitionMaxBytes(40000) + .setAcknowledgementBatches(List( + new ShareFetchRequestData.AcknowledgementBatch() + .setFirstOffset(0) + .setLastOffset(9) + .setAcknowledgeTypes(Collections.singletonList(1.toByte)) + ).asJava) + ).asJava) + ).asJava) + + val authorizer: Authorizer = mock(classOf[Authorizer]) + when(authorizer.authorize(any(), any())).thenReturn(List[AuthorizationResult]( + AuthorizationResult.DENIED + ).asJava) + + val shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData) + .build(ApiKeys.SHARE_ACKNOWLEDGE.latestVersion) + val request = buildRequest(shareFetchRequest) + kafkaApis = createKafkaApis( + overrideProperties = Map( + GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", + ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", + ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), + authorizer = Option(authorizer), + raftSupport = true) + kafkaApis.handleShareFetchRequest(request) + + val response = verifyNoThrottling[ShareFetchResponse](request) + val responseData = response.data() + + assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, responseData.errorCode) + } + + @Test + def testHandleShareFetchRequestReleaseAcquiredRecordsThrowError(): Unit = { + val topicName = "foo" + val topicId = Uuid.randomUuid() + val partitionIndex = 0 + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) + addTopicToMetadataCache(topicName, 1, topicId = topicId) + val memberId: Uuid = Uuid.ZERO_UUID + + val groupId = "group" + + when(sharePartitionManager.acknowledge(any(), any(), any())).thenReturn( + CompletableFuture.completedFuture(Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]( + new TopicIdPartition(topicId, new TopicPartition(topicName, 0)) -> + new ShareAcknowledgeResponseData.PartitionData() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code), + ).asJava) + ) + + when(sharePartitionManager.newContext(any(), any(), any(), any(), any())).thenReturn( + new FinalContext() + ) + + when(sharePartitionManager.releaseAcquiredRecords(any(), any())).thenReturn( + FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()) + ) + + when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs( + any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0) + + val shareFetchRequestData = new ShareFetchRequestData(). + setGroupId(groupId). + setMemberId(memberId.toString). + setShareSessionEpoch(ShareFetchMetadata.FINAL_EPOCH). + setTopics(List(new ShareFetchRequestData.FetchTopic(). + setTopicId(topicId). + setPartitions(List( + new ShareFetchRequestData.FetchPartition() + .setPartitionIndex(0) + .setAcknowledgementBatches(List( + new AcknowledgementBatch() + .setFirstOffset(0) + .setLastOffset(9) + .setAcknowledgeTypes(Collections.singletonList(1.toByte)) + ).asJava) + ).asJava)).asJava) + + val shareFetchRequest = new ShareFetchRequest.Builder(shareFetchRequestData).build(ApiKeys.SHARE_FETCH.latestVersion) + val request = buildRequest(shareFetchRequest) + kafkaApis = createKafkaApis( + overrideProperties = Map( + GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "true", + ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG -> "true", + ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"), + raftSupport = true) + kafkaApis.handleShareFetchRequest(request) + val response = verifyNoThrottling[ShareFetchResponse](request) + val responseData = response.data() + val topicResponses = responseData.responses() + + assertEquals(Errors.NONE.code, responseData.errorCode) + assertEquals(1, topicResponses.size()) + assertEquals(topicId, topicResponses.get(0).topicId) + assertEquals(1, topicResponses.get(0).partitions.size()) + assertEquals(partitionIndex, topicResponses.get(0).partitions.get(0).partitionIndex) + assertEquals(Errors.NONE.code, topicResponses.get(0).partitions.get(0).errorCode) + assertEquals(Errors.NONE.code, topicResponses.get(0).partitions.get(0).acknowledgeErrorCode) + assertNull(topicResponses.get(0).partitions.get(0).records) + assertEquals(0, topicResponses.get(0).partitions.get(0).acquiredRecords.toArray().size) + } + + @Test + def testHandleShareAcknowledgeRequestSuccess(): Unit = { + val topicName = "foo" + val topicId = Uuid.randomUuid() + val partitionIndex = 0 + metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) + addTopicToMetadataCache(topicName, 1, topicId = topicId) + val memberId: Uuid = Uuid.randomUuid() + + val groupId = "group" + + when(clientQuotaManager.maybeRecordAndGetThrottleTimeMs( + any[RequestChannel.Request](), anyDouble, anyLong)).thenReturn(0) + + when(sharePartitionManager.acknowledge(any(), any(), any())).thenReturn( + CompletableFuture.completedFuture(Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]( + new TopicIdPartition(topicId, new TopicPartition(topicName, 0)) -> + new ShareAcknowledgeResponseData.PartitionData() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code) + ).asJava) + ) + + doNothing().when(sharePartitionManager).acknowledgeSessionUpdate(any(), any()) + + when(sharePartitionManager.releaseAcquiredRecords(any(), any())).thenReturn( Review Comment: Thanks for the review. We don't need this here. I have removed this piece of code now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org