kamalcph commented on code in PR #15825:
URL: https://github.com/apache/kafka/pull/15825#discussion_r1598841362


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
     * Given a message offset, find its corresponding offset metadata in the 
log.
-    * If the message offset is out of range, throw an OffsetOutOfRangeException
+    * 1. If the message offset is less than the log-start-offset, then throw 
an OffsetOutOfRangeException
+    * 2. If the message offset is less than the local-log-start-offset, then 
it returns the message-only metadata
+    * 3. If the message offset is greater than the log-end-offset, then it 
returns the message-only metadata
     */
-  private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata 
= {
+  private[log] def convertToOffsetMetadataOrThrow(offset: Long): 
LogOffsetMetadata = {

Review Comment:
   To solve the infinite loop without returning the message-only 
`LogOffsetMetadata`, can we retain the same behavior while avoiding the loop? 
Something like below:
   
   ```java
   def read(startOffset: Long,
              maxLength: Int,
              minOneMessage: Boolean,
              maxOffsetMetadata: LogOffsetMetadata,
              includeAbortedTxns: Boolean): FetchDataInfo = {
       maybeHandleIOException(s"Exception while reading from $topicPartition in 
dir ${dir.getParent}") {
         trace(s"Reading maximum $maxLength bytes at offset $startOffset from 
log with " +
           s"total length ${segments.sizeInBytes} bytes")
   
         val endOffsetMetadata = nextOffsetMetadata
         val endOffset = endOffsetMetadata.messageOffset
         val segmentOpt = segments.floorSegment(startOffset)
   
         // return error on attempt to read beyond the log end offset
         if (startOffset > endOffset || !segmentOpt.isPresent)
           throw new OffsetOutOfRangeException(s"Received request for offset 
$startOffset for partition $topicPartition, " +
             s"but we only have log segments upto $endOffset.")
   
         if (startOffset == maxOffsetMetadata.messageOffset) {
           emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
         } else if (startOffset > maxOffsetMetadata.messageOffset) {
          // Updated code to avoid the loop:
           val tmpFetchDataInfo = readFetchDataInfo(segmentOpt.get, 
startOffset, maxLength = 1, minOneMessage = false, nextOffsetMetadata, 
includeAbortedTxns = false)
           emptyFetchDataInfo(tmpFetchDataInfo.fetchOffsetMetadata, 
includeAbortedTxns)
         } else {
           readFetchDataInfo(segmentOpt.get, startOffset, maxLength, 
minOneMessage, maxOffsetMetadata, includeAbortedTxns)
         }
       }
     }
   
     private def readFetchDataInfo(segment: LogSegment,
                                   startOffset: Long,
                                   maxLength: Int,
                                   minOneMessage: Boolean,
                                   maxOffsetMetadata: LogOffsetMetadata,
                                   includeAbortedTxns: Boolean): FetchDataInfo 
= {
       // Do the read on the segment with a base offset less than the target 
offset
       // but if that segment doesn't contain any messages with an offset 
greater than that
       // continue to read from successive segments until we get some messages 
or we reach the end of the log
       var fetchDataInfo: FetchDataInfo = null
       var segmentOpt: Optional[LogSegment] = Optional.of(segment)
       while (fetchDataInfo == null && segmentOpt.isPresent) {
         val segment = segmentOpt.get
         val baseOffset = segment.baseOffset
   
         val maxPosition =
           // Use the max offset position if it is on this segment; otherwise, 
the segment size is the limit.
           if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) 
maxOffsetMetadata.relativePositionInSegment
           else segment.size
   
         fetchDataInfo = segment.read(startOffset, maxLength, maxPosition, 
minOneMessage)
         if (fetchDataInfo != null) {
           if (includeAbortedTxns)
             fetchDataInfo = addAbortedTransactions(startOffset, segment, 
fetchDataInfo)
         } else segmentOpt = segments.higherSegment(baseOffset)
       }
   
       if (fetchDataInfo != null) fetchDataInfo
       else {
         // okay we are beyond the end of the last segment with no data fetched 
although the start offset is in range,
         // this can happen when all messages with offset larger than start 
offsets have been deleted.
         // In this case, we will return the empty set with log end offset 
metadata
         new FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
       }
     }
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to