junrao commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1610310799
########## core/src/test/scala/unit/kafka/log/LogSegmentTest.scala: ########## @@ -143,6 +143,39 @@ class LogSegmentTest { checkEquals(ms2.records.iterator, read.records.records.iterator) } + @Test + def testReadWhenNoMaxPosition(): Unit = { + val maxPosition = -1 + val maxSize = 1 + val seg = createSegment(40) + val ms = records(50, "hello", "there") + seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms) + for (minOneMessage <- Array(true, false)) { + // read before first offset + var read = seg.read(48, maxSize, maxPosition, minOneMessage) Review Comment: Hmm, maxPosition=-1 is an invalid input, right? ########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long, /** * Given a message offset, find its corresponding offset metadata in the log. - * If the message offset is out of range, throw an OffsetOutOfRangeException + * 1. If the message offset is lesser than the log-start-offset, then throw an OffsetOutOfRangeException + * 2. If the message offset is lesser than the local-log-start-offset, then it returns the message-only metadata + * 3. If the message offset is greater than the log-end-offset, then it returns the message-only metadata */ - private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = { + private[log] def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = { Review Comment: Thanks for the explanation. Yes, it makes sense to me now. 
########## core/src/test/scala/unit/kafka/log/LogSegmentTest.scala: ########## @@ -143,6 +143,39 @@ class LogSegmentTest { checkEquals(ms2.records.iterator, read.records.records.iterator) } + @Test + def testReadWhenNoMaxPosition(): Unit = { + val maxPosition = -1 + val maxSize = 1 + val seg = createSegment(40) + val ms = records(50, "hello", "there") + seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms) + for (minOneMessage <- Array(true, false)) { + // read before first offset + var read = seg.read(48, maxSize, maxPosition, minOneMessage) + assertEquals(new LogOffsetMetadata(48, 40, 0), read.fetchOffsetMetadata) + assertTrue(read.records.records().iterator().asScala.isEmpty) + + // read at first offset + read = seg.read(50, maxSize, maxPosition, minOneMessage) + assertEquals(new LogOffsetMetadata(50, 40, 0), read.fetchOffsetMetadata) + assertTrue(read.records.records().iterator().asScala.isEmpty) + + // read beyond first offset + read = seg.read(51, maxSize, maxPosition, minOneMessage) + assertEquals(new LogOffsetMetadata(51, 40, 39), read.fetchOffsetMetadata) + assertTrue(read.records.records().iterator().asScala.isEmpty) + + // read at last offset Review Comment: Hmm, 51 is the last offset, right? ########## core/src/main/scala/kafka/server/DelayedFetch.scala: ########## @@ -91,19 +91,24 @@ class DelayedFetch( // Go directly to the check for Case G if the message offsets are the same. If the log segment // has just rolled, then the high watermark offset will remain the same but be on the old segment, // which would incorrectly be seen as an instance of Case F. 
- if (endOffset.messageOffset != fetchOffset.messageOffset) { - if (endOffset.onOlderSegment(fetchOffset)) { - // Case F, this can happen when the new fetch operation is on a truncated leader - debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.") - return forceComplete() + if (fetchOffset.messageOffset > endOffset.messageOffset) { + // Case F, this can happen when the new fetch operation is on a truncated leader + debug(s"Satisfying fetch $this since it is fetching later segments of partition $topicIdPartition.") + return forceComplete() + } else if (fetchOffset.messageOffset < endOffset.messageOffset) { + if (endOffset.messageOffsetOnly() || fetchOffset.messageOffsetOnly()) { + // If we don't know the position of the offset on log segments, just pessimistically assume that we + // only gained 1 byte when fetchOffset < endOffset, otherwise do nothing. This can happen when the + // high-watermark is stale, but should be rare. + accumulatedSize += 1 Review Comment: If endOffset or fetchOffset is message-only, we return empty records when reading from the log. So, to be consistent, we want to avoid accumulating new bytes in this case. ########## core/src/main/scala/kafka/log/LocalLog.scala: ########## @@ -370,11 +370,11 @@ class LocalLog(@volatile private var _dir: File, throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " + s"but we only have log segments upto $endOffset.") - if (startOffset == maxOffsetMetadata.messageOffset) + if (startOffset == maxOffsetMetadata.messageOffset) { Review Comment: We don't use {} for single-line statements elsewhere in this file. So, we want to make this consistent here. 
########## storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java: ########## @@ -445,7 +445,7 @@ public FetchDataInfo read(long startOffset, int maxSize, long maxPosition, boole adjustedMaxSize = Math.max(maxSize, startOffsetAndSize.size); // return a log segment but with zero size in the case below - if (adjustedMaxSize == 0) + if (adjustedMaxSize == 0 || maxPosition == -1) Review Comment: 1. It's better to use Option[Long] for maxPosition. This also makes it consistent with maxSize for which negative values are invalid. 2. Could we adjust the comment above? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org