adixitconfluent commented on code in PR #16969:
URL: https://github.com/apache/kafka/pull/16969#discussion_r1753171734


##########
core/src/test/java/kafka/server/share/SharePartitionManagerTest.java:
##########
@@ -1750,159 +1647,439 @@ public void testAcknowledgeEmptyPartitionCacheMap() {
     }
 
     @Test
-    public void testProcessFetchResponseWithLsoMovementForTopicPartition() {
+    public void testFetchQueueProcessingWhenFrontItemIsEmpty() {
         String groupId = "grp";
-        Uuid fooId = Uuid.randomUuid();
-        TopicIdPartition tp0 = new TopicIdPartition(fooId, new 
TopicPartition("foo", 0));
-        TopicIdPartition tp1 = new TopicIdPartition(fooId, new 
TopicPartition("foo", 1));
-
+        String memberId = Uuid.randomUuid().toString();
+        FetchParams fetchParams = new 
FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), 
FetchRequest.ORDINARY_CONSUMER_ID, -1, DELAYED_SHARE_FETCH_MAX_WAIT_MS,
+            1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty());
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
         Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
         partitionMaxBytes.put(tp0, PARTITION_MAX_BYTES);
+
+        final Time time = new MockTime();
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+
+        SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData1 
= new SharePartitionManager.ShareFetchPartitionData(
+                fetchParams, groupId, memberId, new CompletableFuture<>(), 
Collections.emptyMap());
+        SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData2 
= new SharePartitionManager.ShareFetchPartitionData(
+            fetchParams, groupId, memberId, new CompletableFuture<>(), 
partitionMaxBytes);
+
+        ConcurrentLinkedQueue<SharePartitionManager.ShareFetchPartitionData> 
fetchQueue = new ConcurrentLinkedQueue<>();
+        // First request added to fetch queue is empty i.e. no topic 
partitions to fetch.
+        fetchQueue.add(shareFetchPartitionData1);
+        // Second request added to fetch queue has a topic partition to fetch.
+        fetchQueue.add(shareFetchPartitionData2);
+
+        DelayedOperationPurgatory<DelayedShareFetch> 
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
+                "TestShareFetch", mockTimer, replicaManager.localBrokerId(),
+                DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+
+        SharePartitionManager sharePartitionManager = 
SharePartitionManagerBuilder.builder()
+            .withReplicaManager(replicaManager)
+            .withTime(time)
+            .withTimer(mockTimer)
+            .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory)
+            .withFetchQueue(fetchQueue).build();
+
+        doAnswer(invocation -> {
+            sharePartitionManager.releaseFetchQueueAndPartitionsLock(groupId, 
partitionMaxBytes.keySet());
+            return buildLogReadResult(partitionMaxBytes.keySet());
+        }).when(replicaManager).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
+
+        sharePartitionManager.maybeProcessFetchQueue();
+
+        // Verifying that the second item in the fetchQueue is processed, even 
though the first item is empty.
+        verify(replicaManager, times(1)).readFromLog(any(), any(), 
any(ReplicaQuota.class), anyBoolean());
+    }
+
+    @Test
+    public void testAcknowledgeCompletesDelayedShareFetchRequest() {
+        String groupId = "grp";
+        String memberId = Uuid.randomUuid().toString();
+
+        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo1", 0));
+        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo2", 0));
+
+        Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
         partitionMaxBytes.put(tp1, PARTITION_MAX_BYTES);
+        partitionMaxBytes.put(tp2, PARTITION_MAX_BYTES);
+
+        SharePartition sp1 = mock(SharePartition.class);
+        SharePartition sp2 = mock(SharePartition.class);
 
-        ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
-        SharePartition sp0 = Mockito.mock(SharePartition.class);
-        SharePartition sp1 = Mockito.mock(SharePartition.class);
+        // mocked share partitions sp1 and sp2 can be acquired once there is 
an acknowledgement for it.
+        doAnswer(invocation -> {
+            when(sp1.canAcquireRecords()).thenReturn(true);
+            return CompletableFuture.completedFuture(Optional.empty());
+        }).when(sp1).acknowledge(ArgumentMatchers.eq(memberId), any());
+        doAnswer(invocation -> {
+            when(sp2.canAcquireRecords()).thenReturn(true);
+            return CompletableFuture.completedFuture(Optional.empty());
+        }).when(sp2).acknowledge(ArgumentMatchers.eq(memberId), any());
 
-        Map<SharePartitionManager.SharePartitionKey, SharePartition> 
partitionCacheMap = new ConcurrentHashMap<>();
-        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp0), sp0);
+        Map<SharePartitionManager.SharePartitionKey, SharePartition> 
partitionCacheMap = new HashMap<>();
         partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp1), sp1);
+        partitionCacheMap.put(new 
SharePartitionManager.SharePartitionKey(groupId, tp2), sp2);
+
+        SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData 
= new SharePartitionManager.ShareFetchPartitionData(
+                new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), 
FetchRequest.ORDINARY_CONSUMER_ID, -1, DELAYED_SHARE_FETCH_MAX_WAIT_MS,
+                        1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, 
Optional.empty()),
+                groupId,
+                Uuid.randomUuid().toString(),
+                new CompletableFuture<>(),
+                partitionMaxBytes);
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+
+        DelayedOperationPurgatory<DelayedShareFetch> 
delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
+                "TestShareFetch", mockTimer, replicaManager.localBrokerId(),
+                DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+
+        // Initially you cannot acquire records for both sp1 and sp2.
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(false);
+        when(sp2.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp2.canAcquireRecords()).thenReturn(false);
+
+        Set<Object> delayedShareFetchWatchKeys = new HashSet<>();
+        partitionMaxBytes.keySet().forEach(topicIdPartition -> 
delayedShareFetchWatchKeys.add(new DelayedShareFetchKey(groupId, 
topicIdPartition)));
+
+        DelayedShareFetch delayedShareFetch = 
DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
+                .withShareFetchPartitionData(shareFetchPartitionData)
+                .withReplicaManager(replicaManager)
+                .withPartitionCacheMap(partitionCacheMap)
+                .build();
+
+        delayedShareFetchPurgatory.tryCompleteElseWatch(
+            delayedShareFetch, 
CollectionConverters.asScala(delayedShareFetchWatchKeys).toSeq());
+
+        // Since acquisition lock for sp1 and sp2 cannot be acquired, we 
should have 2 watched keys.
+        assertEquals(2, delayedShareFetchPurgatory.watched());
 
         SharePartitionManager sharePartitionManager = 
SharePartitionManagerBuilder.builder()
-                
.withPartitionCacheMap(partitionCacheMap).withReplicaManager(replicaManager).build();
+            .withPartitionCacheMap(partitionCacheMap)
+            .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory)
+            .withReplicaManager(replicaManager)
+            .withTimer(mockTimer)
+            .build();
+
+        doAnswer(invocation -> {
+            sharePartitionManager.releaseFetchQueueAndPartitionsLock(groupId, 
partitionMaxBytes.keySet());

Review Comment:
   Yes, thanks for pointing it out. Earlier, some test cases did not use mocks, 
so the call was required. Now that all the relevant tests use mocks, I have 
removed every instance of 
`sharePartitionManager.releaseFetchQueueAndPartitionsLock` from the tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to