junrao commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1610310799


##########
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala:
##########
@@ -143,6 +143,39 @@ class LogSegmentTest {
     checkEquals(ms2.records.iterator, read.records.records.iterator)
   }
 
+  @Test
+  def testReadWhenNoMaxPosition(): Unit = {
+    val maxPosition = -1
+    val maxSize = 1
+    val seg = createSegment(40)
+    val ms = records(50, "hello", "there")
+    seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
+    for (minOneMessage <- Array(true, false)) {
+      // read before first offset
+      var read = seg.read(48, maxSize, maxPosition, minOneMessage)

Review Comment:
   Hmm, maxPosition=-1 is an invalid input, right?
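
   For illustration, a guard along these lines (a hypothetical sketch, not code from this PR) would reject negative values up front:

     // Hypothetical validation mirroring the parameter names in LogSegment.read;
     // paste into a Scala REPL to try it.
     def validateReadArgs(maxSize: Int, maxPosition: Long): Unit = {
       require(maxSize >= 0, s"Invalid maxSize $maxSize: must be non-negative")
       require(maxPosition >= 0, s"Invalid maxPosition $maxPosition: must be non-negative")
     }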



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
     * Given a message offset, find its corresponding offset metadata in the log.
-    * If the message offset is out of range, throw an OffsetOutOfRangeException
+    * 1. If the message offset is less than the log-start-offset, then throw an OffsetOutOfRangeException
+    * 2. If the message offset is less than the local-log-start-offset, then it returns the message-only metadata
+    * 3. If the message offset is greater than the log-end-offset, then it returns the message-only metadata
     */
-  private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
+  private[log] def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {

Review Comment:
   Thanks for the explanation. Yes, it makes sense to me now.
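
   For readers following the thread, the three documented cases can be summarized in a compact sketch (simplified, hypothetical types: OffsetMeta stands in for LogOffsetMetadata, and IllegalArgumentException for OffsetOutOfRangeException):

     final case class OffsetMeta(messageOffset: Long, messageOffsetOnly: Boolean)

     def convert(offset: Long, logStart: Long, localLogStart: Long, logEnd: Long): OffsetMeta =
       if (offset < logStart)
         // case 1: below the log-start-offset
         throw new IllegalArgumentException(s"offset $offset is below log start offset $logStart")
       else if (offset < localLogStart || offset > logEnd)
         // cases 2 and 3: the byte position is unknown, so return message-only metadata
         OffsetMeta(offset, messageOffsetOnly = true)
       else
         // otherwise the full metadata can be resolved from the segment
         OffsetMeta(offset, messageOffsetOnly = false)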



##########
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala:
##########
@@ -143,6 +143,39 @@ class LogSegmentTest {
     checkEquals(ms2.records.iterator, read.records.records.iterator)
   }
 
+  @Test
+  def testReadWhenNoMaxPosition(): Unit = {
+    val maxPosition = -1
+    val maxSize = 1
+    val seg = createSegment(40)
+    val ms = records(50, "hello", "there")
+    seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms)
+    for (minOneMessage <- Array(true, false)) {
+      // read before first offset
+      var read = seg.read(48, maxSize, maxPosition, minOneMessage)
+      assertEquals(new LogOffsetMetadata(48, 40, 0), read.fetchOffsetMetadata)
+      assertTrue(read.records.records().iterator().asScala.isEmpty)
+
+      // read at first offset
+      read = seg.read(50, maxSize, maxPosition, minOneMessage)
+      assertEquals(new LogOffsetMetadata(50, 40, 0), read.fetchOffsetMetadata)
+      assertTrue(read.records.records().iterator().asScala.isEmpty)
+
+      // read beyond first offset
+      read = seg.read(51, maxSize, maxPosition, minOneMessage)
+      assertEquals(new LogOffsetMetadata(51, 40, 39), read.fetchOffsetMetadata)
+      assertTrue(read.records.records().iterator().asScala.isEmpty)
+
+      // read at last offset

Review Comment:
   Hmm, 51 is the last offset, right?
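
   For context, the offsets in the appended batch work out as follows (a worked check using the values from the test):

     // records(50, "hello", "there") creates two records, so they occupy
     // offsets 50 and 51; the append above passes 51 as the largest offset.
     val firstOffset = 50L
     val recordCount = 2
     val lastOffset = firstOffset + recordCount - 1
     assert(lastOffset == 51L) // reading at 51 is therefore a read at the last offset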



##########
core/src/main/scala/kafka/server/DelayedFetch.scala:
##########
@@ -91,19 +91,24 @@ class DelayedFetch(
            // Go directly to the check for Case G if the message offsets are the same. If the log segment
            // has just rolled, then the high watermark offset will remain the same but be on the old segment,
            // which would incorrectly be seen as an instance of Case F.
-            if (endOffset.messageOffset != fetchOffset.messageOffset) {
-              if (endOffset.onOlderSegment(fetchOffset)) {
-                // Case F, this can happen when the new fetch operation is on a truncated leader
-                debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
-                return forceComplete()
+            if (fetchOffset.messageOffset > endOffset.messageOffset) {
+              // Case F, this can happen when the new fetch operation is on a truncated leader
+              debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.")
+              return forceComplete()
+            } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
+              if (endOffset.messageOffsetOnly() || fetchOffset.messageOffsetOnly()) {
+                // If we don't know the position of the offset on log segments, just pessimistically assume that we
+                // only gained 1 byte when fetchOffset < endOffset, otherwise do nothing. This can happen when the
+                // high-watermark is stale, but should be rare.
+                accumulatedSize += 1

Review Comment:
   If endOffset or fetchOffset is message-only, we return empty records when reading from the log. So, to be consistent, we want to avoid accumulating new bytes in this case.
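
   In other words, the estimated gain should collapse to zero whenever either end of the range is message-only, matching the empty-records result of the actual read. A hypothetical helper (not code from this PR) capturing that rule:

     // If either offset lacks a byte position, report zero gained bytes so the
     // delayed fetch stays consistent with the empty read; otherwise use the
     // real position difference.
     def estimatedBytesGained(fetchOffsetOnly: Boolean, endOffsetOnly: Boolean, positionDiff: => Int): Int =
       if (fetchOffsetOnly || endOffsetOnly) 0 else positionDiff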



##########
core/src/main/scala/kafka/log/LocalLog.scala:
##########
@@ -370,11 +370,11 @@ class LocalLog(@volatile private var _dir: File,
        throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
          s"but we only have log segments upto $endOffset.")
 
-      if (startOffset == maxOffsetMetadata.messageOffset)
+      if (startOffset == maxOffsetMetadata.messageOffset) {

Review Comment:
   We don't use {} for single-line statements elsewhere in this file. So, we want to make this consistent here.



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##########
@@ -445,7 +445,7 @@ public FetchDataInfo read(long startOffset, int maxSize, long maxPosition, boole
             adjustedMaxSize = Math.max(maxSize, startOffsetAndSize.size);
 
         // return a log segment but with zero size in the case below
-        if (adjustedMaxSize == 0)
+        if (adjustedMaxSize == 0 || maxPosition == -1)

Review Comment:
   1. It's better to use Option[Long] for maxPosition. This also makes it consistent with maxSize, for which negative values are invalid.
   2. Could we adjust the comment above?
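
   As a sketch of the Option[Long] idea (hypothetical signature, not the PR's code), callers would pass None instead of a -1 sentinel when there is no position bound:

     // Hypothetical read(...) shape: None means "no position bound", and any
     // concrete bound must be non-negative, mirroring the maxSize contract.
     def read(startOffset: Long, maxSize: Int, maxPosition: Option[Long], minOneMessage: Boolean): Unit = {
       require(maxSize >= 0, s"Invalid maxSize $maxSize")
       maxPosition.foreach(pos => require(pos >= 0, s"Invalid maxPosition $pos"))
       // ... perform the read; with None, return an empty result as the patched branch does for -1
     }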


