adixitconfluent commented on code in PR #16969: URL: https://github.com/apache/kafka/pull/16969#discussion_r1753171734
########## core/src/test/java/kafka/server/share/SharePartitionManagerTest.java: ########## @@ -1750,159 +1647,439 @@ public void testAcknowledgeEmptyPartitionCacheMap() { } @Test - public void testProcessFetchResponseWithLsoMovementForTopicPartition() { + public void testFetchQueueProcessingWhenFrontItemIsEmpty() { String groupId = "grp"; - Uuid fooId = Uuid.randomUuid(); - TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0)); - TopicIdPartition tp1 = new TopicIdPartition(fooId, new TopicPartition("foo", 1)); - + String memberId = Uuid.randomUuid().toString(); + FetchParams fetchParams = new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, DELAYED_SHARE_FETCH_MAX_WAIT_MS, + 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>(); partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES); + + final Time time = new MockTime(); + ReplicaManager replicaManager = mock(ReplicaManager.class); + + SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData1 = new SharePartitionManager.ShareFetchPartitionData( + fetchParams, groupId, memberId, new CompletableFuture<>(), Collections.emptyMap()); + SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData2 = new SharePartitionManager.ShareFetchPartitionData( + fetchParams, groupId, memberId, new CompletableFuture<>(), partitionMaxBytes); + + ConcurrentLinkedQueue<SharePartitionManager.ShareFetchPartitionData> fetchQueue = new ConcurrentLinkedQueue<>(); + // First request added to fetch queue is empty i.e. no topic partitions to fetch. + fetchQueue.add(shareFetchPartitionData1); + // Second request added to fetch queue has a topic partition to fetch. 
+ fetchQueue.add(shareFetchPartitionData2); + + DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( + "TestShareFetch", mockTimer, replicaManager.localBrokerId(), + DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); + + SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() + .withReplicaManager(replicaManager) + .withTime(time) + .withTimer(mockTimer) + .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory) + .withFetchQueue(fetchQueue).build(); + + doAnswer(invocation -> { + sharePartitionManager.releaseFetchQueueAndPartitionsLock(groupId, partitionMaxBytes.keySet()); + return buildLogReadResult(partitionMaxBytes.keySet()); + }).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + sharePartitionManager.maybeProcessFetchQueue(); + + // Verifying that the second item in the fetchQueue is processed, even though the first item is empty. + verify(replicaManager, times(1)).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + } + + @Test + public void testAcknowledgeCompletesDelayedShareFetchRequest() { + String groupId = "grp"; + String memberId = Uuid.randomUuid().toString(); + + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0)); + TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 0)); + + Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>(); partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES); + partitionMaxBytes.put(tp2, PARTITION_MAX_BYTES); + + SharePartition sp1 = mock(SharePartition.class); + SharePartition sp2 = mock(SharePartition.class); - ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class); - SharePartition sp0 = Mockito.mock(SharePartition.class); - SharePartition sp1 = Mockito.mock(SharePartition.class); + // mocked share partitions sp1 and sp2 can be acquired once there is an acknowledgement for it. 
+ doAnswer(invocation -> { + when(sp1.canAcquireRecords()).thenReturn(true); + return CompletableFuture.completedFuture(Optional.empty()); + }).when(sp1).acknowledge(ArgumentMatchers.eq(memberId), any()); + doAnswer(invocation -> { + when(sp2.canAcquireRecords()).thenReturn(true); + return CompletableFuture.completedFuture(Optional.empty()); + }).when(sp2).acknowledge(ArgumentMatchers.eq(memberId), any()); - Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new ConcurrentHashMap<>(); - partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp0), sp0); + Map<SharePartitionManager.SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>(); partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp1), sp1); + partitionCacheMap.put(new SharePartitionManager.SharePartitionKey(groupId, tp2), sp2); + + SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData = new SharePartitionManager.ShareFetchPartitionData( + new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, DELAYED_SHARE_FETCH_MAX_WAIT_MS, + 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), + groupId, + Uuid.randomUuid().toString(), + new CompletableFuture<>(), + partitionMaxBytes); + ReplicaManager replicaManager = mock(ReplicaManager.class); + + DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( + "TestShareFetch", mockTimer, replicaManager.localBrokerId(), + DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true); + + // Initially you cannot acquire records for both sp1 and sp2. 
+ when(sp1.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(false); + when(sp2.maybeAcquireFetchLock()).thenReturn(true); + when(sp2.canAcquireRecords()).thenReturn(false); + + Set<Object> delayedShareFetchWatchKeys = new HashSet<>(); + partitionMaxBytes.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchKey(groupId, topicIdPartition))); + + DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder() + .withShareFetchPartitionData(shareFetchPartitionData) + .withReplicaManager(replicaManager) + .withPartitionCacheMap(partitionCacheMap) + .build(); + + delayedShareFetchPurgatory.tryCompleteElseWatch( + delayedShareFetch, CollectionConverters.asScala(delayedShareFetchWatchKeys).toSeq()); + + // Since acquisition lock for sp1 and sp2 cannot be acquired, we should have 2 watched keys. + assertEquals(2, delayedShareFetchPurgatory.watched()); SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder() - .withPartitionCacheMap(partitionCacheMap).withReplicaManager(replicaManager).build(); + .withPartitionCacheMap(partitionCacheMap) + .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory) + .withReplicaManager(replicaManager) + .withTimer(mockTimer) + .build(); + + doAnswer(invocation -> { + sharePartitionManager.releaseFetchQueueAndPartitionsLock(groupId, partitionMaxBytes.keySet()); Review Comment: Yes, thanks for pointing it out. Earlier, we had not put mocks in some test cases, which is why this call was required. But now we are using mocks in all the required tests. Removed all instances of `sharePartitionManager.releaseFetchQueueAndPartitionsLock` in the tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org