adixitconfluent commented on code in PR #20765:
URL: https://github.com/apache/kafka/pull/20765#discussion_r2464831012


##########
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##########
@@ -212,32 +210,30 @@ public void testTryCompleteWhenMinBytesNotSatisfiedOnFirstFetch() {
 
         when(sp0.canAcquireRecords()).thenReturn(true);
         when(sp1.canAcquireRecords()).thenReturn(false);
-        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(
-            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
 
         // We are testing the case when the share partition is getting fetched for the first time, so for the first time
-        // the fetchOffsetMetadata will return empty. Post the readFromLog call, the fetchOffsetMetadata will be
+        // the fetchOffsetMetadata will return empty. Post the isMinBytesSatisfied call, the fetchOffsetMetadata will be

Review Comment:
   Why does this comment need to change? I think the wording that makes more 
sense is `Post the first readFromLog call...`



##########
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##########
@@ -2149,14 +2176,6 @@ static void mockTopicIdPartitionToReturnDataEqualToMinBytes(ReplicaManager repli
         when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition())).thenReturn(partition);
     }
 
-    private void mockTopicIdPartitionFetchBytes(ReplicaManager replicaManager, TopicIdPartition topicIdPartition, LogOffsetMetadata hwmOffsetMetadata) {
-        LogOffsetSnapshot endOffsetSnapshot = new LogOffsetSnapshot(1, mock(LogOffsetMetadata.class),
-            hwmOffsetMetadata, mock(LogOffsetMetadata.class));
-        Partition partition = mock(Partition.class);
-        when(partition.fetchOffsetSnapshot(any(), anyBoolean())).thenReturn(endOffsetSnapshot);
-        when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition())).thenReturn(partition);
-    }
-

Review Comment:
   Removing this function doesn't make sense to me. I agree that the line 
`when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition())).thenReturn(partition);`
 inside it is problematic, but we can instead pass the mock `Partition` object 
   into this function and leave that stubbing to the caller. Example - 
   
   caller function - 
   ```
   Partition p0 = mock(Partition.class);
   mockTopicIdPartitionFetchBytes(hwmOffsetMetadata, p0);
   when(p0.isLeader()).thenReturn(true);
   when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0);
   ```
   
   function `mockTopicIdPartitionFetchBytes` - 
   
   ```
   private void mockTopicIdPartitionFetchBytes(LogOffsetMetadata hwmOffsetMetadata, Partition partition) {
       LogOffsetSnapshot endOffsetSnapshot = new LogOffsetSnapshot(1, mock(LogOffsetMetadata.class),
           hwmOffsetMetadata, mock(LogOffsetMetadata.class));
       when(partition.fetchOffsetSnapshot(any(), anyBoolean())).thenReturn(endOffsetSnapshot);
   }
   ```
   



##########
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##########
@@ -256,21 +252,26 @@ public void testTryCompleteWhenMinBytesNotSatisfiedOnFirstFetch() {
         when(sp0.maybeAcquireFetchLock(fetchId)).thenReturn(true);
         when(sp1.maybeAcquireFetchLock(fetchId)).thenReturn(true);
 
-        assertFalse(delayedShareFetch.isCompleted());
+        try (MockedStatic<ShareFetchUtils> mockedShareFetchUtils = Mockito.mockStatic(ShareFetchUtils.class, Mockito.CALLS_REAL_METHODS)) {
+            mockedShareFetchUtils.when(() -> ShareFetchUtils.processFetchResponse(any(), any(), any(), any(), any()))

Review Comment:
   I don't think we need to mock `processFetchResponse` here, since it is not 
called in this test.



##########
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##########
@@ -212,32 +210,30 @@ public void testTryCompleteWhenMinBytesNotSatisfiedOnFirstFetch() {
 
         when(sp0.canAcquireRecords()).thenReturn(true);
         when(sp1.canAcquireRecords()).thenReturn(false);
-        when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn(
-            createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1)));
 
         // We are testing the case when the share partition is getting fetched for the first time, so for the first time
-        // the fetchOffsetMetadata will return empty. Post the readFromLog call, the fetchOffsetMetadata will be
+        // the fetchOffsetMetadata will return empty. Post the isMinBytesSatisfied call, the fetchOffsetMetadata will be
         // populated for the share partition, which has 1 as the positional difference, so it doesn't satisfy the minBytes(2).
         when(sp0.fetchOffsetMetadata(anyLong()))
             .thenReturn(Optional.empty())
             .thenReturn(Optional.of(new LogOffsetMetadata(0, 1, 0)));
         LogOffsetMetadata hwmOffsetMetadata = new LogOffsetMetadata(1, 1, 1);
-        mockTopicIdPartitionFetchBytes(replicaManager, tp0, hwmOffsetMetadata);
-
-        doAnswer(invocation -> buildLogReadResult(List.of(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
-        BiConsumer<SharePartitionKey, Throwable> exceptionHandler = mockExceptionHandler();
-
-        PartitionMaxBytesStrategy partitionMaxBytesStrategy = mockPartitionMaxBytes(Set.of(tp0));

Review Comment:
   Is there a reason this piece of code has been moved further down? It looks 
like a plain cut and paste.



##########
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##########
@@ -1663,21 +1691,20 @@ public void testRemoteStorageFetchRequestCompletionOnFutureCompletionFailure() {
         when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0);
         when(replicaManager.getPartitionOrException(tp1.topicPartition())).thenReturn(p1);
 
+        BiConsumer<SharePartitionKey, Throwable> exceptionHandler = mockExceptionHandler();

Review Comment:
   For safety, should we be mocking `ShareFetchUtils.processFetchResponse` in 
the test `testRemoteStorageFetchRequestCompletionOnFutureCompletionFailure`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.