chia7712 commented on code in PR #17979:
URL: https://github.com/apache/kafka/pull/17979#discussion_r1868291841


##########
core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java:
##########
@@ -429,4 +432,41 @@ public void testProcessFetchResponseWithMaxFetchRecords() {
         assertEquals(100, 
resultData1.get(tp1).acquiredRecords().get(0).firstOffset());
         assertEquals(103, 
resultData1.get(tp1).acquiredRecords().get(0).lastOffset());
     }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testProcessFetchResponseWithOffsetFetchException() {
+        SharePartition sp0 = Mockito.mock(SharePartition.class);
+        when(sp0.leaderEpoch()).thenReturn(1);
+
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+
+        ShareFetch shareFetch = mock(ShareFetch.class);
+        when(shareFetch.groupId()).thenReturn("grp");
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+
+        // Mock the replicaManager.fetchOffsetForTimestamp method to throw 
exception.
+        Throwable exception = new FencedLeaderEpochException("Fenced 
exception");
+        
doThrow(exception).when(replicaManager).fetchOffsetForTimestamp(any(TopicPartition.class),
 anyLong(), any(), any(), anyBoolean());
+        when(sp0.acquire(anyString(), anyInt(), 
any(FetchPartitionData.class))).thenReturn(ShareAcquiredRecords.empty());
+        doNothing().when(sp0).updateCacheAndOffsets(any(Long.class));

Review Comment:
   Since `sp0` is a mock object, there's no need to stub 
`updateCacheAndOffsets` as it already does nothing.



##########
core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java:
##########
@@ -429,4 +432,41 @@ public void testProcessFetchResponseWithMaxFetchRecords() {
         assertEquals(100, 
resultData1.get(tp1).acquiredRecords().get(0).firstOffset());
         assertEquals(103, 
resultData1.get(tp1).acquiredRecords().get(0).lastOffset());
     }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testProcessFetchResponseWithOffsetFetchException() {
+        SharePartition sp0 = Mockito.mock(SharePartition.class);
+        when(sp0.leaderEpoch()).thenReturn(1);
+
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+
+        ShareFetch shareFetch = mock(ShareFetch.class);
+        when(shareFetch.groupId()).thenReturn("grp");
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+
+        // Mock the replicaManager.fetchOffsetForTimestamp method to throw 
exception.
+        Throwable exception = new FencedLeaderEpochException("Fenced 
exception");
+        
doThrow(exception).when(replicaManager).fetchOffsetForTimestamp(any(TopicPartition.class),
 anyLong(), any(), any(), anyBoolean());
+        when(sp0.acquire(anyString(), anyInt(), 
any(FetchPartitionData.class))).thenReturn(ShareAcquiredRecords.empty());
+        doNothing().when(sp0).updateCacheAndOffsets(any(Long.class));
+
+        // When no records are acquired from share partition.
+        Map<TopicIdPartition, FetchPartitionData> responseData = 
Collections.singletonMap(
+            tp0, new FetchPartitionData(Errors.OFFSET_OUT_OF_RANGE, 0L, 0L,
+                MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), 
Optional.empty(),
+                OptionalInt.empty(), false));
+
+        BiConsumer<SharePartitionKey, Throwable> exceptionHandler = 
mock(BiConsumer.class);
+        Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData 
=
+            ShareFetchUtils.processFetchResponse(shareFetch, responseData, 
sharePartitions,
+                replicaManager, exceptionHandler);
+
+        assertTrue(resultData.isEmpty());
+        Mockito.verify(shareFetch, times(1)).addErroneous(tp0, exception);
+        Mockito.verify(exceptionHandler, 
times(1)).accept(any(SharePartitionKey.class), any(Throwable.class));
+        Mockito.verify(sp0, times(0)).updateCacheAndOffsets(1L);

Review Comment:
   Using `any(Long.class)` instead of `1L` is more appropriate since we expect 
that `updateCacheAndOffsets` should not be called, regardless of the input.



##########
core/src/main/java/kafka/server/share/ShareFetchUtils.java:
##########
@@ -84,7 +86,16 @@ static Map<TopicIdPartition, 
ShareFetchResponseData.PartitionData> processFetchR
                 // response and let the client retry the fetch. This way we do 
not lose out on the data that
                 // would be returned for other share partitions in the fetch 
request.
                 if (fetchPartitionData.error.code() == 
Errors.OFFSET_OUT_OF_RANGE.code()) {
-                    
sharePartition.updateCacheAndOffsets(offsetForEarliestTimestamp(topicIdPartition,
 replicaManager));
+                    try {
+                        
sharePartition.updateCacheAndOffsets(offsetForEarliestTimestamp(topicIdPartition,
+                            replicaManager, sharePartition.leaderEpoch()));
+                    } catch (Exception e) {
+                        log.error("Error while fetching offset for earliest 
timestamp for topicIdPartition: {}", topicIdPartition, e);
+                        shareFetch.addErroneous(topicIdPartition, e);
+                        exceptionHandler.accept(new 
SharePartitionKey(shareFetch.groupId(), topicIdPartition), e);
+                        // Do not fill the response for this partition and 
continue.

Review Comment:
   Excuse me, could you explain the difference between returning empty records 
and skipping partition data in the response?



##########
core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java:
##########
@@ -429,4 +432,41 @@ public void testProcessFetchResponseWithMaxFetchRecords() {
         assertEquals(100, 
resultData1.get(tp1).acquiredRecords().get(0).firstOffset());
         assertEquals(103, 
resultData1.get(tp1).acquiredRecords().get(0).lastOffset());
     }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testProcessFetchResponseWithOffsetFetchException() {
+        SharePartition sp0 = Mockito.mock(SharePartition.class);
+        when(sp0.leaderEpoch()).thenReturn(1);
+
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new 
TopicPartition("foo", 0));
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new 
LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+
+        ShareFetch shareFetch = mock(ShareFetch.class);
+        when(shareFetch.groupId()).thenReturn("grp");
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+
+        // Mock the replicaManager.fetchOffsetForTimestamp method to throw 
exception.
+        Throwable exception = new FencedLeaderEpochException("Fenced 
exception");
+        
doThrow(exception).when(replicaManager).fetchOffsetForTimestamp(any(TopicPartition.class),
 anyLong(), any(), any(), anyBoolean());
+        when(sp0.acquire(anyString(), anyInt(), 
any(FetchPartitionData.class))).thenReturn(ShareAcquiredRecords.empty());
+        doNothing().when(sp0).updateCacheAndOffsets(any(Long.class));
+
+        // When no records are acquired from share partition.
+        Map<TopicIdPartition, FetchPartitionData> responseData = 
Collections.singletonMap(
+            tp0, new FetchPartitionData(Errors.OFFSET_OUT_OF_RANGE, 0L, 0L,
+                MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), 
Optional.empty(),
+                OptionalInt.empty(), false));
+
+        BiConsumer<SharePartitionKey, Throwable> exceptionHandler = 
mock(BiConsumer.class);
+        Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData 
=
+            ShareFetchUtils.processFetchResponse(shareFetch, responseData, 
sharePartitions,
+                replicaManager, exceptionHandler);
+
+        assertTrue(resultData.isEmpty());
+        Mockito.verify(shareFetch, times(1)).addErroneous(tp0, exception);
+        Mockito.verify(exceptionHandler, 
times(1)).accept(any(SharePartitionKey.class), any(Throwable.class));

Review Comment:
   Could you please replace `any(SharePartitionKey.class), 
any(Throwable.class)` by `new SharePartitionKey("grp", tp0), exception`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to