kamalcph commented on code in PR #15825: URL: https://github.com/apache/kafka/pull/15825#discussion_r1598841362
########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -1424,11 +1424,18 @@ class UnifiedLog(@volatile var logStartOffset: Long, /** * Given a message offset, find its corresponding offset metadata in the log. - * If the message offset is out of range, throw an OffsetOutOfRangeException + * 1. If the message offset is lesser than the log-start-offset, then throw an OffsetOutOfRangeException + * 2. If the message offset is lesser than the local-log-start-offset, then it returns the message-only metadata + * 3. If the message offset is greater than the log-end-offset, then it returns the message-only metadata */ - private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = { + private[log] def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = { Review Comment: To solve the infinite loop, instead of returning the message-only `LogOffsetMetadata`, can we retain the same behavior without the loop? Something like below: ```scala def read(startOffset: Long, maxLength: Int, minOneMessage: Boolean, maxOffsetMetadata: LogOffsetMetadata, includeAbortedTxns: Boolean): FetchDataInfo = { maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") { trace(s"Reading maximum $maxLength bytes at offset $startOffset from log with " + s"total length ${segments.sizeInBytes} bytes") val endOffsetMetadata = nextOffsetMetadata val endOffset = endOffsetMetadata.messageOffset val segmentOpt = segments.floorSegment(startOffset) // return error on attempt to read beyond the log end offset if (startOffset > endOffset || !segmentOpt.isPresent) throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " + s"but we only have log segments upto $endOffset.") if (startOffset == maxOffsetMetadata.messageOffset) { emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns) } else if (startOffset > maxOffsetMetadata.messageOffset) { // Updated code to avoid the loop: 
val tmpFetchDataInfo = readFetchDataInfo(segmentOpt.get, startOffset, maxLength = 1, minOneMessage = false, nextOffsetMetadata, includeAbortedTxns = false) emptyFetchDataInfo(tmpFetchDataInfo.fetchOffsetMetadata, includeAbortedTxns) } else { readFetchDataInfo(segmentOpt.get, startOffset, maxLength, minOneMessage, maxOffsetMetadata, includeAbortedTxns) } } } private def readFetchDataInfo(segment: LogSegment, startOffset: Long, maxLength: Int, minOneMessage: Boolean, maxOffsetMetadata: LogOffsetMetadata, includeAbortedTxns: Boolean): FetchDataInfo = { // Do the read on the segment with a base offset less than the target offset // but if that segment doesn't contain any messages with an offset greater than that // continue to read from successive segments until we get some messages or we reach the end of the log var fetchDataInfo: FetchDataInfo = null var segmentOpt: Optional[LogSegment] = Optional.of(segment) while (fetchDataInfo == null && segmentOpt.isPresent) { val segment = segmentOpt.get val baseOffset = segment.baseOffset val maxPosition = // Use the max offset position if it is on this segment; otherwise, the segment size is the limit. if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) maxOffsetMetadata.relativePositionInSegment else segment.size fetchDataInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage) if (fetchDataInfo != null) { if (includeAbortedTxns) fetchDataInfo = addAbortedTransactions(startOffset, segment, fetchDataInfo) } else segmentOpt = segments.higherSegment(baseOffset) } if (fetchDataInfo != null) fetchDataInfo else { // okay we are beyond the end of the last segment with no data fetched although the start offset is in range, // this can happen when all messages with offset larger than start offsets have been deleted. 
// In this case, we will return the empty set with log end offset metadata new FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY) } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org