junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1546692495


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##########
@@ -232,17 +233,17 @@ private boolean canConvertToRelativeOffset(long offset) throws IOException {
      *
      * @param largestOffset The last offset in the message set
      * @param largestTimestampMs The largest timestamp in the message set.
-     * @param offsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append.
+     * @param shallowOffsetOfMaxTimestamp The offset of the message that has the largest timestamp in the messages to append.

Review Comment:
   Could we make the comment clear that the last offset is in the batch?



##########
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##########
@@ -240,25 +240,39 @@ public MemoryRecords build() {
         return builtRecords;
     }
 
+
     /**
-     * Get the max timestamp and its offset. The details of the offset returned are a bit subtle.
-     * Note: The semantic for the offset of max timestamp is the first offset with the max timestamp if there are multi-records having same timestamp.
-     *
-     * If the log append time is used, the offset will be the first offset of the record.
-     *
-     * If create time is used, the offset will always be the offset of the record with the max timestamp.
-     *
-     * If it's NO_TIMESTAMP (i.e. MAGIC_VALUE_V0), we'll return offset -1 since no timestamp info in records.
+     * We want to align the shallowOffsetOfMaxTimestamp for all paths (leader append, follower append, and index recovery)
+     * The definition of shallowOffsetOfMaxTimestamp is the last offset of the batch which having max timestamp.
+     * If there are many batches having same max timestamp, we pick up the earliest batch.
+     * <p>
+     * If the log append time is used, the offset will be the last offset unless no compression is used and
+     * the message format version is 0 or 1, in which case, it will be the first offset.
+     * <p>
+     * If create time is used, the offset will be the last offset unless no compression is used and the message
+     * format version is 0 or 1, in which case, it will be the offset of the record with the max timestamp.
      *
      * @return The max timestamp and its offset
      */
     public RecordsInfo info() {
         if (timestampType == TimestampType.LOG_APPEND_TIME) {
-            return new RecordsInfo(logAppendTime, baseOffset);
+            if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
+                // case 0: there is only one batch so use the last offset
+                return new RecordsInfo(logAppendTime, lastOffset);
+            else
+                // case 1: there are many single-record batches having same max timestamp, so the base offset is
+                // equal with the last offset of earliest batch
+                return new RecordsInfo(logAppendTime, baseOffset);
+        } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) {
+            return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset);
         } else {
-            // For create time, we always use offsetOfMaxTimestamp for the correct time -> offset mapping
-            // If it's MAGIC_VALUE_V0, the value will be the default value: [-1, -1]
-            return new RecordsInfo(maxTimestamp, offsetOfMaxTimestamp);
+            if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
+                // ditto to case 0
+                return new RecordsInfo(maxTimestamp, lastOffset);
+            else
+                // case 2: Each batch is composed of single record, and offsetOfMaxTimestamp points to the record having
+                // max timestamp. Hence, offsetOfMaxTimestamp is equal to the last offset of earliest batch (record)

Review Comment:
   of earliest batch => of earliest batch with max timestamp?
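   To make the decision table in `info()` easier to follow, here is a minimal, self-contained sketch of the same branching. The flattened signature and class name are illustrative only; the real method reads `compressionType`, `magic`, and the builder's offsets from instance state:

   ```java
   // Standalone mirror of the offset-selection logic in MemoryRecordsBuilder.info().
   // Names and the flattened signature are illustrative, not the real API.
   public class ShallowOffsetDemo {

       static final long NO_TIMESTAMP = -1L;   // stands in for RecordBatch.NO_TIMESTAMP
       static final int MAGIC_VALUE_V2 = 2;    // stands in for RecordBatch.MAGIC_VALUE_V2

       static long shallowOffset(boolean logAppendTime, boolean compressed, int magic,
                                 long baseOffset, long lastOffset,
                                 long offsetOfMaxTimestamp, long maxTimestamp) {
           boolean singleBatch = compressed || magic >= MAGIC_VALUE_V2;
           if (logAppendTime)
               // case 0: one batch, use its last offset; case 1: many single-record
               // batches share the append time, so the earliest (base) offset wins.
               return singleBatch ? lastOffset : baseOffset;
           if (maxTimestamp == NO_TIMESTAMP)
               // magic v0: no timestamp info in the records.
               return lastOffset;
           // create time: case 0 again, or case 2 (single-record batches, so the
           // record-level offset of the max timestamp is also a batch boundary).
           return singleBatch ? lastOffset : offsetOfMaxTimestamp;
       }

       public static void main(String[] args) {
           // v2, log-append time: the batch's last offset.
           System.out.println(shallowOffset(true, false, 2, 0, 5, 3, 100));
           // v1 uncompressed, log-append time: the first offset.
           System.out.println(shallowOffset(true, false, 1, 0, 5, 3, 100));
           // v1 uncompressed, create time: offset of the max-timestamp record.
           System.out.println(shallowOffset(false, false, 1, 0, 5, 3, 100));
       }
   }
   ```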



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##########
@@ -293,14 +292,13 @@ public ValidationResult assignOffsetsNonCompressed(LongRef offsetCounter,
 
         if (timestampType == TimestampType.LOG_APPEND_TIME) {
             maxTimestamp = now;
-            offsetOfMaxTimestamp = initialOffset;
         }
 
         return new ValidationResult(
             now,
             records,
             maxTimestamp,
-            offsetOfMaxTimestamp,
+            shallowOffsetOfMaxTimestamp,

Review Comment:
   We need to add the following code back, right?
   
   ```
           if (toMagic >= RecordBatch.MAGIC_VALUE_V2) {
               offsetOfMaxTimestamp = offsetCounter.value - 1;
           }
   
   ```
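   
   For context, a rough sketch of why `offsetCounter.value - 1` is the right batch-level answer on the v2 non-compressed path: one offset is assigned per record, so after assignment the counter sits one past the last offset handed out. The names below are illustrative, not the real `LogValidator` code:
   
   ```java
   // Illustrative only: mimics assigning consecutive offsets to records and
   // recovering the batch-level offset for the max timestamp under magic v2.
   public class OffsetAssignDemo {

       // Tiny stand-in for the LongRef offset counter used by LogValidator.
       static class LongRef {
           long value;
           LongRef(long value) { this.value = value; }
       }

       static long assignAndGetShallowOffset(LongRef offsetCounter, int recordCount) {
           for (int i = 0; i < recordCount; i++)
               offsetCounter.value++;       // one offset consumed per record
           // Under v2 the offset reported for the max timestamp is the batch's
           // last offset, i.e. the last value handed out.
           return offsetCounter.value - 1;
       }
   }
   ```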



##########
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##########
@@ -240,25 +240,39 @@ public MemoryRecords build() {
         return builtRecords;
     }
 
+
     /**
-     * Get the max timestamp and its offset. The details of the offset returned are a bit subtle.
-     * Note: The semantic for the offset of max timestamp is the first offset with the max timestamp if there are multi-records having same timestamp.
-     *
-     * If the log append time is used, the offset will be the first offset of the record.
-     *
-     * If create time is used, the offset will always be the offset of the record with the max timestamp.
-     *
-     * If it's NO_TIMESTAMP (i.e. MAGIC_VALUE_V0), we'll return offset -1 since no timestamp info in records.
+     * We want to align the shallowOffsetOfMaxTimestamp for all paths (leader append, follower append, and index recovery)
+     * The definition of shallowOffsetOfMaxTimestamp is the last offset of the batch which having max timestamp.
+     * If there are many batches having same max timestamp, we pick up the earliest batch.
+     * <p>
+     * If the log append time is used, the offset will be the last offset unless no compression is used and
+     * the message format version is 0 or 1, in which case, it will be the first offset.
+     * <p>
+     * If create time is used, the offset will be the last offset unless no compression is used and the message
+     * format version is 0 or 1, in which case, it will be the offset of the record with the max timestamp.
      *
      * @return The max timestamp and its offset
      */
     public RecordsInfo info() {
         if (timestampType == TimestampType.LOG_APPEND_TIME) {
-            return new RecordsInfo(logAppendTime, baseOffset);
+            if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
+                // case 0: there is only one batch so use the last offset
+                return new RecordsInfo(logAppendTime, lastOffset);
+            else
+                // case 1: there are many single-record batches having same max timestamp, so the base offset is
+                // equal with the last offset of earliest batch
+                return new RecordsInfo(logAppendTime, baseOffset);
+        } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) {
+            return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset);

Review Comment:
   > Currently, when in assignOffsetsNonCompressed or validateMessagesAndAssignOffsetsCompressed, we'll always return [-1, -1] for maxTimestamp and offset if it's MAGIC_VALUE_V0 here. But in the case of re-build the records, we make it as [-1, lastOffset], which is inconsistent. Fixing it here.
   
   The above is the comment from https://github.com/apache/kafka/pull/15474/files#r1522447288. So, it seems that we should use -1 for the offset here?
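   
   The suggested consistency fix can be shown with a toy version of the branch in question: returning -1 for the offset whenever the max timestamp is NO_TIMESTAMP matches the [-1, -1] result of the validation paths. Names below are illustrative, not the real builder code:
   
   ```java
   // Illustrative only: which offset should accompany the reported max timestamp.
   public class NoTimestampDemo {

       static final long NO_TIMESTAMP = -1L;   // stands in for RecordBatch.NO_TIMESTAMP

       static long offsetForMaxTimestamp(long maxTimestamp, long lastOffset) {
           if (maxTimestamp == NO_TIMESTAMP)
               return -1L;        // consistent with the [-1, -1] validation result
           return lastOffset;     // otherwise a real batch-level offset applies
       }
   }
   ```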



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -819,7 +819,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
            validRecords = validateAndOffsetAssignResult.validatedRecords
            appendInfo.setMaxTimestamp(validateAndOffsetAssignResult.maxTimestampMs)
-            appendInfo.setOffsetOfMaxTimestamp(validateAndOffsetAssignResult.offsetOfMaxTimestampMs)
+            appendInfo.setOffsetOfMaxTimestamp(validateAndOffsetAssignResult.shallowOffsetOfMaxTimestampMs)

Review Comment:
   For consistency, should we change `appendInfo.offsetOfMaxTimestamp` to `appendInfo.shallowOffsetOfMaxTimestamp`?



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##########
@@ -68,17 +68,17 @@ public static class ValidationResult {
         public final long logAppendTimeMs;
         public final MemoryRecords validatedRecords;
         public final long maxTimestampMs;
-        public final long offsetOfMaxTimestampMs;
+        public final long shallowOffsetOfMaxTimestampMs;

Review Comment:
   Could we add a comment on why we can only maintain a batch-level offset for the max timestamp?
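   
   One way to phrase such a comment: for a compressed batch, only the header fields (max timestamp, last offset) are readable without decompressing and iterating every record, so only a batch-level offset can be maintained cheaply. A toy scan under that assumption, using hypothetical names rather than the real RecordBatch API:
   
   ```java
   // Illustrative only: picks the shallow offset of the max timestamp using
   // nothing but per-batch header fields, since record-level offsets inside a
   // compressed batch are not visible without decompression.
   public class BatchScanDemo {

       // Hypothetical batch header: the only fields that are cheap to read.
       record BatchHeader(long maxTimestamp, long lastOffset) {}

       static long shallowOffsetOfMaxTimestamp(BatchHeader[] batches) {
           long maxTs = Long.MIN_VALUE;
           long shallowOffset = -1L;
           for (BatchHeader b : batches) {
               if (b.maxTimestamp() > maxTs) { // strict '>' keeps the earliest batch
                   maxTs = b.maxTimestamp();
                   shallowOffset = b.lastOffset();
               }
           }
           return shallowOffset;
       }
   }
   ```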



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
