junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1599161815


##########
core/src/main/scala/kafka/log/LocalLog.scala:
##########
@@ -370,11 +370,12 @@ class LocalLog(@volatile private var _dir: File,
         throw new OffsetOutOfRangeException(s"Received request for offset 
$startOffset for partition $topicPartition, " +
           s"but we only have log segments upto $endOffset.")
 
-      if (startOffset == maxOffsetMetadata.messageOffset)
+      if (startOffset == maxOffsetMetadata.messageOffset) {
         emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
-      else if (startOffset > maxOffsetMetadata.messageOffset)
-        emptyFetchDataInfo(convertToOffsetMetadataOrThrow(startOffset), 
includeAbortedTxns)
-      else {
+      } else if (startOffset > maxOffsetMetadata.messageOffset) {
+        // Instead of converting the `startOffset` to metadata, returning 
message-only metadata to avoid potential loop
+        emptyFetchDataInfo(new LogOffsetMetadata(startOffset), 
includeAbortedTxns)

Review Comment:
   Hmm, I thought the design of this PR is to allow maxOffsetMetadata to be 
message-only in some of the rare cases, right?



##########
core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala:
##########
@@ -164,18 +169,71 @@ class DelayedFetchTest {
     assertTrue(delayedFetch.tryComplete())
     assertTrue(delayedFetch.isCompleted)
     assertTrue(fetchResultOpt.isDefined)
+
+    val fetchResult = fetchResultOpt.get
+    assertEquals(Errors.NONE, fetchResult.error)
+  }
+
+  @ParameterizedTest(name = "testDelayedFetchWithInvalidHighWatermark 
minBytes={0}")
+  @ValueSource(ints = Array(1, 2))
+  def testDelayedFetchWithInvalidHighWatermark(minBytes: Int): Unit = {
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+    val fetchOffset = 450L
+    val logStartOffset = 5L
+    val currentLeaderEpoch = Optional.of[Integer](10)
+    val replicaId = 1
+
+    val fetchStatus = FetchPartitionStatus(
+      startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+      fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, 
fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+    val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500, 
minBytes = minBytes)
+
+    var fetchResultOpt: Option[FetchPartitionData] = None
+    def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit 
= {
+      fetchResultOpt = Some(responses.head._2)
+    }
+
+    val delayedFetch = new DelayedFetch(
+      params = fetchParams,
+      fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+      replicaManager = replicaManager,
+      quota = replicaQuota,
+      responseCallback = callback
+    )
+
+    val partition: Partition = mock(classOf[Partition])
+    
when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)).thenReturn(partition)
+    // high-watermark is lesser than the log-start-offset
+    val endOffsetMetadata = new LogOffsetMetadata(0L, 0L, 0)
+    when(partition.fetchOffsetSnapshot(
+      currentLeaderEpoch,
+      fetchOnlyFromLeader = true))
+      .thenReturn(new LogOffsetSnapshot(0L, endOffsetMetadata, 
endOffsetMetadata, endOffsetMetadata))
+    when(replicaManager.isAddingReplica(any(), anyInt())).thenReturn(false)
+    expectReadFromReplica(fetchParams, topicIdPartition, 
fetchStatus.fetchInfo, Errors.NONE)
+
+    val expected = minBytes == 1
+    assertEquals(expected, delayedFetch.tryComplete())
+    assertEquals(expected, delayedFetch.isCompleted)

Review Comment:
   Yes, I understand that the test passes as it is. I am just saying that the 
logic in DelayedFetch is not consistent.
   
   If the offset metadata is available, we accumulate bytes only `if 
(fetchOffset.messageOffset < endOffset.messageOffset)`. To be consistent, we 
need to do the same `if` test if the offset metadata is not available.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to