apoorvmittal10 commented on code in PR #20280:
URL: https://github.com/apache/kafka/pull/20280#discussion_r2261175802


##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -765,7 +774,10 @@ private boolean maybeCompletePendingRemoteFetch() {
 
         for (TopicIdPartition topicIdPartition : pendingRemoteFetchesOpt.get().fetchOffsetMetadataMap().keySet()) {
             try {
-                replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+                Partition partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+                if (!partition.isLeader()) {
+                    throw new NotLeaderException("Broker is no longer the leader of topicPartition: " + topicIdPartition);

Review Comment:
   Don't you need to handle the exception below?



##########
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##########
@@ -1288,6 +1357,62 @@ public void testRemoteStorageFetchTryCompleteReturnsFalse() {
         delayedShareFetch.lock().unlock();
     }
 
+    @Test
+    public void testRemoteStorageFetchPartitionLeaderChanged() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+
+        SharePartition sp0 = mock(SharePartition.class);
+
+        when(sp0.canAcquireRecords()).thenReturn(true);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(),
+            new CompletableFuture<>(), List.of(tp0), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+
+        // Fetch offset does not match with the cached entry for sp0, hence, a replica manager fetch will happen for sp0.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        // Mocking remote storage read result for tp0.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), Set.of(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Remote fetch object does not complete within tryComplete in this mock.
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        when(remoteLogManager.asyncRead(any(), any())).thenReturn(mock(Future.class));
+        when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        Partition p0 = mock(Partition.class);
+        when(p0.isLeader()).thenReturn(false);
+
+        when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0);
+
+        Uuid fetchId = Uuid.randomUuid();
+        DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0)))
+            .withFetchId(fetchId)
+            .build());
+
+        // All the topic partitions are acquirable.
+        when(sp0.maybeAcquireFetchLock(fetchId)).thenReturn(true);
+
+        assertFalse(delayedShareFetch.isCompleted());

Review Comment:
   Shouldn't the delayed share fetch complete? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.