chia7712 commented on code in PR #17979: URL: https://github.com/apache/kafka/pull/17979#discussion_r1868291841
########## core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java: ########## @@ -429,4 +432,41 @@ public void testProcessFetchResponseWithMaxFetchRecords() { assertEquals(100, resultData1.get(tp1).acquiredRecords().get(0).firstOffset()); assertEquals(103, resultData1.get(tp1).acquiredRecords().get(0).lastOffset()); } + + @Test + @SuppressWarnings("unchecked") + public void testProcessFetchResponseWithOffsetFetchException() { + SharePartition sp0 = Mockito.mock(SharePartition.class); + when(sp0.leaderEpoch()).thenReturn(1); + + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + + ShareFetch shareFetch = mock(ShareFetch.class); + when(shareFetch.groupId()).thenReturn("grp"); + ReplicaManager replicaManager = mock(ReplicaManager.class); + + // Mock the replicaManager.fetchOffsetForTimestamp method to throw exception. + Throwable exception = new FencedLeaderEpochException("Fenced exception"); + doThrow(exception).when(replicaManager).fetchOffsetForTimestamp(any(TopicPartition.class), anyLong(), any(), any(), anyBoolean()); + when(sp0.acquire(anyString(), anyInt(), any(FetchPartitionData.class))).thenReturn(ShareAcquiredRecords.empty()); + doNothing().when(sp0).updateCacheAndOffsets(any(Long.class)); Review Comment: Since `sp0` is a mock object, there's no need to stub `updateCacheAndOffsets` as it already does nothing. 
########## core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java: ########## @@ -429,4 +432,41 @@ public void testProcessFetchResponseWithMaxFetchRecords() { assertEquals(100, resultData1.get(tp1).acquiredRecords().get(0).firstOffset()); assertEquals(103, resultData1.get(tp1).acquiredRecords().get(0).lastOffset()); } + + @Test + @SuppressWarnings("unchecked") + public void testProcessFetchResponseWithOffsetFetchException() { + SharePartition sp0 = Mockito.mock(SharePartition.class); + when(sp0.leaderEpoch()).thenReturn(1); + + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + + ShareFetch shareFetch = mock(ShareFetch.class); + when(shareFetch.groupId()).thenReturn("grp"); + ReplicaManager replicaManager = mock(ReplicaManager.class); + + // Mock the replicaManager.fetchOffsetForTimestamp method to throw exception. + Throwable exception = new FencedLeaderEpochException("Fenced exception"); + doThrow(exception).when(replicaManager).fetchOffsetForTimestamp(any(TopicPartition.class), anyLong(), any(), any(), anyBoolean()); + when(sp0.acquire(anyString(), anyInt(), any(FetchPartitionData.class))).thenReturn(ShareAcquiredRecords.empty()); + doNothing().when(sp0).updateCacheAndOffsets(any(Long.class)); + + // When no records are acquired from share partition. 
+ Map<TopicIdPartition, FetchPartitionData> responseData = Collections.singletonMap( + tp0, new FetchPartitionData(Errors.OFFSET_OUT_OF_RANGE, 0L, 0L, + MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false)); + + BiConsumer<SharePartitionKey, Throwable> exceptionHandler = mock(BiConsumer.class); + Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData = + ShareFetchUtils.processFetchResponse(shareFetch, responseData, sharePartitions, + replicaManager, exceptionHandler); + + assertTrue(resultData.isEmpty()); + Mockito.verify(shareFetch, times(1)).addErroneous(tp0, exception); + Mockito.verify(exceptionHandler, times(1)).accept(any(SharePartitionKey.class), any(Throwable.class)); + Mockito.verify(sp0, times(0)).updateCacheAndOffsets(1L); Review Comment: Using `any(Long.class)` instead of `1L` is more appropriate since we expect that `updateCacheAndOffsets` should not be called, regardless of the input. ########## core/src/main/java/kafka/server/share/ShareFetchUtils.java: ########## @@ -84,7 +86,16 @@ static Map<TopicIdPartition, ShareFetchResponseData.PartitionData> processFetchR // response and let the client retry the fetch. This way we do not lose out on the data that // would be returned for other share partitions in the fetch request. if (fetchPartitionData.error.code() == Errors.OFFSET_OUT_OF_RANGE.code()) { - sharePartition.updateCacheAndOffsets(offsetForEarliestTimestamp(topicIdPartition, replicaManager)); + try { + sharePartition.updateCacheAndOffsets(offsetForEarliestTimestamp(topicIdPartition, + replicaManager, sharePartition.leaderEpoch())); + } catch (Exception e) { + log.error("Error while fetching offset for earliest timestamp for topicIdPartition: {}", topicIdPartition, e); + shareFetch.addErroneous(topicIdPartition, e); + exceptionHandler.accept(new SharePartitionKey(shareFetch.groupId(), topicIdPartition), e); + // Do not fill the response for this partition and continue. 
Review Comment: Excuse me, could you explain the difference between returning empty records and skipping partition data in the response? ########## core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java: ########## @@ -429,4 +432,41 @@ public void testProcessFetchResponseWithMaxFetchRecords() { assertEquals(100, resultData1.get(tp1).acquiredRecords().get(0).firstOffset()); assertEquals(103, resultData1.get(tp1).acquiredRecords().get(0).lastOffset()); } + + @Test + @SuppressWarnings("unchecked") + public void testProcessFetchResponseWithOffsetFetchException() { + SharePartition sp0 = Mockito.mock(SharePartition.class); + when(sp0.leaderEpoch()).thenReturn(1); + + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + + ShareFetch shareFetch = mock(ShareFetch.class); + when(shareFetch.groupId()).thenReturn("grp"); + ReplicaManager replicaManager = mock(ReplicaManager.class); + + // Mock the replicaManager.fetchOffsetForTimestamp method to throw exception. + Throwable exception = new FencedLeaderEpochException("Fenced exception"); + doThrow(exception).when(replicaManager).fetchOffsetForTimestamp(any(TopicPartition.class), anyLong(), any(), any(), anyBoolean()); + when(sp0.acquire(anyString(), anyInt(), any(FetchPartitionData.class))).thenReturn(ShareAcquiredRecords.empty()); + doNothing().when(sp0).updateCacheAndOffsets(any(Long.class)); + + // When no records are acquired from share partition. 
+ Map<TopicIdPartition, FetchPartitionData> responseData = Collections.singletonMap( + tp0, new FetchPartitionData(Errors.OFFSET_OUT_OF_RANGE, 0L, 0L, + MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(), + OptionalInt.empty(), false)); + + BiConsumer<SharePartitionKey, Throwable> exceptionHandler = mock(BiConsumer.class); + Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData = + ShareFetchUtils.processFetchResponse(shareFetch, responseData, sharePartitions, + replicaManager, exceptionHandler); + + assertTrue(resultData.isEmpty()); + Mockito.verify(shareFetch, times(1)).addErroneous(tp0, exception); + Mockito.verify(exceptionHandler, times(1)).accept(any(SharePartitionKey.class), any(Throwable.class)); Review Comment: Could you please replace `any(SharePartitionKey.class), any(Throwable.class)` with `new SharePartitionKey("grp", tp0), exception`, so that the test verifies the exact arguments passed to the handler? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org