apoorvmittal10 commented on code in PR #20280: URL: https://github.com/apache/kafka/pull/20280#discussion_r2261175802
########## core/src/main/java/kafka/server/share/DelayedShareFetch.java: ########## @@ -765,7 +774,10 @@ private boolean maybeCompletePendingRemoteFetch() { for (TopicIdPartition topicIdPartition : pendingRemoteFetchesOpt.get().fetchOffsetMetadataMap().keySet()) { try { - replicaManager.getPartitionOrException(topicIdPartition.topicPartition()); + Partition partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition()); + if (!partition.isLeader()) { + throw new NotLeaderException("Broker is no longer the leader of topicPartition: " + topicIdPartition); Review Comment: Don't you need to handle the exception below? ########## core/src/test/java/kafka/server/share/DelayedShareFetchTest.java: ########## @@ -1288,6 +1357,62 @@ public void testRemoteStorageFetchTryCompleteReturnsFalse() { delayedShareFetch.lock().unlock(); } + @Test + public void testRemoteStorageFetchPartitionLeaderChanged() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + + SharePartition sp0 = mock(SharePartition.class); + + when(sp0.canAcquireRecords()).thenReturn(true); + + LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), + new CompletableFuture<>(), List.of(tp0), BATCH_SIZE, MAX_FETCH_RECORDS, + BROKER_TOPIC_STATS); + + when(sp0.nextFetchOffset()).thenReturn(10L); + + // Fetch offset does not match with the cached entry for sp0, hence, a replica manager fetch will happen for sp0. + when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + + // Mocking remote storage read result for tp0. + doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), Set.of(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + // Remote fetch related mocks. Remote fetch object does not complete within tryComplete in this mock. + RemoteLogManager remoteLogManager = mock(RemoteLogManager.class); + when(remoteLogManager.asyncRead(any(), any())).thenReturn(mock(Future.class)); + when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager)); + + Partition p0 = mock(Partition.class); + when(p0.isLeader()).thenReturn(false); + + when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0); + + Uuid fetchId = Uuid.randomUuid(); + DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch) + .withSharePartitions(sharePartitions) + .withReplicaManager(replicaManager) + .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0))) + .withFetchId(fetchId) + .build()); + + // All the topic partitions are acquirable. + when(sp0.maybeAcquireFetchLock(fetchId)).thenReturn(true); + + assertFalse(delayedShareFetch.isCompleted()); Review Comment: Shouldn't the delayed share fetch complete? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org