junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1546692495

##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##########
@@ -232,17 +233,17 @@ private boolean canConvertToRelativeOffset(long offset) throws IOException {
      *
      * @param largestOffset The last offset in the message set
      * @param largestTimestampMs The largest timestamp in the message set.
-     * @param offsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append.
+     * @param shallowOffsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append.

Review Comment:
   Could we make the comment clear that the last offset is in the batch?

##########
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##########
@@ -240,25 +240,39 @@ public MemoryRecords build() {
         return builtRecords;
     }
+
     /**
-     * Get the max timestamp and its offset. The details of the offset returned are a bit subtle.
-     * Note: The semantic for the offset of max timestamp is the first offset with the max timestamp if there are multi-records having same timestamp.
-     *
-     * If the log append time is used, the offset will be the first offset of the record.
-     *
-     * If create time is used, the offset will always be the offset of the record with the max timestamp.
-     *
-     * If it's NO_TIMESTAMP (i.e. MAGIC_VALUE_V0), we'll return offset -1 since no timestamp info in records.
+     * We want to align the shallowOffsetOfMaxTimestamp for all paths (leader append, follower append, and index recovery)
+     * The definition of shallowOffsetOfMaxTimestamp is the last offset of the batch which having max timestamp.
+     * If there are many batches having same max timestamp, we pick up the earliest batch.
+     * <p>
+     * If the log append time is used, the offset will be the last offset unless no compression is used and
+     * the message format version is 0 or 1, in which case, it will be the first offset.
+     * <p>
+     * If create time is used, the offset will be the last offset unless no compression is used and the message
+     * format version is 0 or 1, in which case, it will be the offset of the record with the max timestamp.
      *
      * @return The max timestamp and its offset
      */
     public RecordsInfo info() {
         if (timestampType == TimestampType.LOG_APPEND_TIME) {
-            return new RecordsInfo(logAppendTime, baseOffset);
+            if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
+                // case 0: there is only one batch so use the last offset
+                return new RecordsInfo(logAppendTime, lastOffset);
+            else
+                // case 1: there are many single-record batches having same max timestamp, so the base offset is
+                // equal with the last offset of earliest batch
+                return new RecordsInfo(logAppendTime, baseOffset);
+        } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) {
+            return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset);
         } else {
-            // For create time, we always use offsetOfMaxTimestamp for the correct time -> offset mapping
-            // If it's MAGIC_VALUE_V0, the value will be the default value: [-1, -1]
-            return new RecordsInfo(maxTimestamp, offsetOfMaxTimestamp);
+            if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
+                // ditto to case 0
+                return new RecordsInfo(maxTimestamp, lastOffset);
+            else
+                // case 2: Each batch is composed of single record, and offsetOfMaxTimestamp points to the record having
+                // max timestamp. Hence, offsetOfMaxTimestamp is equal to the last offset of earliest batch (record)

Review Comment:
   of earliest batch => of earliest batch with max timestamp?

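For readers following the `info()` hunk, the branching in the proposed diff can be restated as a small standalone sketch. This is only an illustration under stated assumptions: `RecordsInfoSketch`, `Info`, and the flattened parameter list are hypothetical stand-ins, not Kafka's `MemoryRecordsBuilder` API; the two constants mirror the values of `RecordBatch.MAGIC_VALUE_V2` and `RecordBatch.NO_TIMESTAMP`. The `NO_TIMESTAMP` branch follows the diff as proposed, which a later comment in this review questions.

```java
final class RecordsInfoSketch {
    // Values mirror RecordBatch.MAGIC_VALUE_V2 and RecordBatch.NO_TIMESTAMP in Kafka.
    static final byte MAGIC_VALUE_V2 = 2;
    static final long NO_TIMESTAMP = -1L;

    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    /** Hypothetical stand-in for the (maxTimestamp, offset) pair returned by info(). */
    static final class Info {
        final long maxTimestamp;
        final long shallowOffsetOfMaxTimestamp;

        Info(long maxTimestamp, long shallowOffsetOfMaxTimestamp) {
            this.maxTimestamp = maxTimestamp;
            this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
        }
    }

    /** Restates the branches of the proposed info(); parameters replace builder fields (assumption). */
    static Info info(TimestampType timestampType, boolean compressed, byte magic,
                     long logAppendTime, long maxTimestamp,
                     long baseOffset, long lastOffset, long offsetOfMaxTimestamp) {
        // "case 0" in the diff: compressed data or magic >= v2 means there is only one batch.
        boolean singleBatch = compressed || magic >= MAGIC_VALUE_V2;
        if (timestampType == TimestampType.LOG_APPEND_TIME) {
            // "case 1": single-record batches all share the append time, so the earliest one wins.
            return new Info(logAppendTime, singleBatch ? lastOffset : baseOffset);
        } else if (maxTimestamp == NO_TIMESTAMP) {
            // Mirrors the diff as proposed; the review asks whether the offset should be -1 instead.
            return new Info(NO_TIMESTAMP, lastOffset);
        } else {
            // "case 2": each record is its own batch, so the record offset is the batch's last offset.
            return new Info(maxTimestamp, singleBatch ? lastOffset : offsetOfMaxTimestamp);
        }
    }

    public static void main(String[] args) {
        Info v2 = info(TimestampType.CREATE_TIME, false, (byte) 2, -1L, 500L, 10L, 14L, 12L);
        Info v1 = info(TimestampType.CREATE_TIME, false, (byte) 1, -1L, 500L, 10L, 14L, 12L);
        System.out.println("v2 uncompressed: " + v2.shallowOffsetOfMaxTimestamp);
        System.out.println("v1 uncompressed: " + v1.shallowOffsetOfMaxTimestamp);
    }
}
```

With offsets 10 through 14 and the max timestamp on offset 12, the v2 call resolves to the batch's last offset while the uncompressed v1 call resolves to the record's own offset, which is the distinction the Javadoc in the hunk is trying to describe.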
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##########
@@ -293,14 +292,13 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter,
         if (timestampType == TimestampType.LOG_APPEND_TIME) {
             maxTimestamp = now;
-            offsetOfMaxTimestamp = initialOffset;
         }
         return new ValidationResult(
             now,
             records,
             maxTimestamp,
-            offsetOfMaxTimestamp,
+            shallowOffsetOfMaxTimestamp,

Review Comment:
   We need to add the following code back, right?
   ```
   if (toMagic >= RecordBatch.MAGIC_VALUE_V2) {
       offsetOfMaxTimestamp = offsetCounter.value - 1;
   }
   ```

##########
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##########
@@ -240,25 +240,39 @@ public MemoryRecords build() {
         return builtRecords;
     }
+
     /**
-     * Get the max timestamp and its offset. The details of the offset returned are a bit subtle.
-     * Note: The semantic for the offset of max timestamp is the first offset with the max timestamp if there are multi-records having same timestamp.
-     *
-     * If the log append time is used, the offset will be the first offset of the record.
-     *
-     * If create time is used, the offset will always be the offset of the record with the max timestamp.
-     *
-     * If it's NO_TIMESTAMP (i.e. MAGIC_VALUE_V0), we'll return offset -1 since no timestamp info in records.
+     * We want to align the shallowOffsetOfMaxTimestamp for all paths (leader append, follower append, and index recovery)
+     * The definition of shallowOffsetOfMaxTimestamp is the last offset of the batch which having max timestamp.
+     * If there are many batches having same max timestamp, we pick up the earliest batch.
+     * <p>
+     * If the log append time is used, the offset will be the last offset unless no compression is used and
+     * the message format version is 0 or 1, in which case, it will be the first offset.
+     * <p>
+     * If create time is used, the offset will be the last offset unless no compression is used and the message
+     * format version is 0 or 1, in which case, it will be the offset of the record with the max timestamp.
      *
      * @return The max timestamp and its offset
      */
     public RecordsInfo info() {
         if (timestampType == TimestampType.LOG_APPEND_TIME) {
-            return new RecordsInfo(logAppendTime, baseOffset);
+            if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
+                // case 0: there is only one batch so use the last offset
+                return new RecordsInfo(logAppendTime, lastOffset);
+            else
+                // case 1: there are many single-record batches having same max timestamp, so the base offset is
+                // equal with the last offset of earliest batch
+                return new RecordsInfo(logAppendTime, baseOffset);
+        } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) {
+            return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset);

Review Comment:
   > Currently, when in assignOffsetsNonCompressed or validateMessagesAndAssignOffsetsCompressed, we'll always return [-1, -1] for maxTimestamp and offset if it's MAGIC_VALUE_V0 here. But in the case of re-build the records, we make it as [-1, lastOffset], which is inconsistent. Fixing it here.

   The above is the comment from https://github.com/apache/kafka/pull/15474/files#r1522447288. So, it seems that we should use -1 for the offset here?

##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -819,7 +819,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
               validRecords = validateAndOffsetAssignResult.validatedRecords
               appendInfo.setMaxTimestamp(validateAndOffsetAssignResult.maxTimestampMs)
-              appendInfo.setOffsetOfMaxTimestamp(validateAndOffsetAssignResult.offsetOfMaxTimestampMs)
+              appendInfo.setOffsetOfMaxTimestamp(validateAndOffsetAssignResult.shallowOffsetOfMaxTimestampMs)

Review Comment:
   For consistency, should we change `appendInfo.offsetOfMaxTimestamp` to `appendInfo.shallowOffsetOfMaxTimestamp`?

##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##########
@@ -68,17 +68,17 @@ public static class ValidationResult {
         public final long logAppendTimeMs;
         public final MemoryRecords validatedRecords;
         public final long maxTimestampMs;
-        public final long offsetOfMaxTimestampMs;
+        public final long shallowOffsetOfMaxTimestampMs;

Review Comment:
   Could we add a comment on why we could only maintain batch level offset for max timestamp?
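
To make the batch-level definition quoted in the `MemoryRecordsBuilder` Javadoc concrete ("the last offset of the batch which having max timestamp", earliest batch on ties), here is a minimal sketch of a scan that only looks at per-batch summaries. `ShallowOffsetScanSketch`, `BatchHeader`, and `scan` are hypothetical names for illustration, not Kafka classes, and returning `-1` for the offset when no batch carries a timestamp is an assumption of this sketch rather than something the PR settles.

```java
import java.util.Arrays;
import java.util.List;

final class ShallowOffsetScanSketch {
    // Value mirrors RecordBatch.NO_TIMESTAMP in Kafka.
    static final long NO_TIMESTAMP = -1L;

    /** Hypothetical per-batch summary: the batch's last offset and its max timestamp. */
    static final class BatchHeader {
        final long lastOffset;
        final long maxTimestamp;

        BatchHeader(long lastOffset, long maxTimestamp) {
            this.lastOffset = lastOffset;
            this.maxTimestamp = maxTimestamp;
        }
    }

    /**
     * Returns {maxTimestamp, shallowOffsetOfMaxTimestamp} over the batches in log order.
     * Assumption of this sketch: {-1, -1} when no batch has a timestamp.
     */
    static long[] scan(List<BatchHeader> batches) {
        long maxTimestamp = NO_TIMESTAMP;
        long shallowOffsetOfMaxTimestamp = -1L;
        for (BatchHeader batch : batches) {
            // Strict '>' keeps the earliest batch when several batches share the max timestamp.
            if (batch.maxTimestamp > maxTimestamp) {
                maxTimestamp = batch.maxTimestamp;
                shallowOffsetOfMaxTimestamp = batch.lastOffset;
            }
        }
        return new long[] {maxTimestamp, shallowOffsetOfMaxTimestamp};
    }

    public static void main(String[] args) {
        List<BatchHeader> batches = Arrays.asList(
            new BatchHeader(4L, 100L),
            new BatchHeader(9L, 300L),
            new BatchHeader(14L, 300L),
            new BatchHeader(19L, 200L));
        System.out.println(Arrays.toString(scan(batches)));
    }
}
```

The scan never inspects individual records, so the finest offset it can report is a batch's last offset; a code comment along those lines on `shallowOffsetOfMaxTimestampMs` may be what the review is asking for.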