showuon commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1522447288


##########
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##########
@@ -242,34 +242,23 @@ public MemoryRecords build() {
 
     /**
      * Get the max timestamp and its offset. The details of the offset 
returned are a bit subtle.
+     * Note: The semantic for the offset of max timestamp is the first offset 
with the max timestamp if there are multi-records having same timestamp.
      *
-     * If the log append time is used, the offset will be the last offset 
unless no compression is used and
-     * the message format version is 0 or 1, in which case, it will be the 
first offset.
+     * If the log append time is used, the offset will be the first offset of 
the record.
      *
-     * If create time is used, the offset will be the last offset unless no 
compression is used and the message
-     * format version is 0 or 1, in which case, it will be the offset of the 
record with the max timestamp.
+     * If create time is used, the offset will always be the offset of the 
record with the max timestamp.
+     *
+     * If it's NO_TIMESTAMP (i.e. MAGIC_VALUE_V0), we'll return offset -1 
since no timestamp info in records.
      *
      * @return The max timestamp and its offset
      */
     public RecordsInfo info() {
         if (timestampType == TimestampType.LOG_APPEND_TIME) {
-            long shallowOffsetOfMaxTimestamp;
-            // Use the last offset when dealing with record batches
-            if (compressionType != CompressionType.NONE || magic >= 
RecordBatch.MAGIC_VALUE_V2)
-                shallowOffsetOfMaxTimestamp = lastOffset;
-            else
-                shallowOffsetOfMaxTimestamp = baseOffset;
-            return new RecordsInfo(logAppendTime, shallowOffsetOfMaxTimestamp);
-        } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) {
-            return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset);
+            return new RecordsInfo(logAppendTime, baseOffset);
         } else {
-            long shallowOffsetOfMaxTimestamp;
-            // Use the last offset when dealing with record batches
-            if (compressionType != CompressionType.NONE || magic >= 
RecordBatch.MAGIC_VALUE_V2)
-                shallowOffsetOfMaxTimestamp = lastOffset;
-            else
-                shallowOffsetOfMaxTimestamp = offsetOfMaxTimestamp;
-            return new RecordsInfo(maxTimestamp, shallowOffsetOfMaxTimestamp);
+            // For create time, we always use offsetOfMaxTimestamp for the 
correct time -> offset mapping
+            // If it's MAGIC_VALUE_V0, the value will be the default value: 
[-1, -1]
+            return new RecordsInfo(maxTimestamp, offsetOfMaxTimestamp);

Review Comment:
   Currently, when in `assignOffsetsNonCompressed` or 
`validateMessagesAndAssignOffsetsCompressed`, we'll always return `[-1, -1]` 
for maxTimestamp and offset if it's `MAGIC_VALUE_V0` 
[here](https://github.com/apache/kafka/pull/15474/files#diff-74aba980d40ab3c5c6fd66d7a27ffb4515181130e475b589834538d05aa408b9L263-L264).
 But in the case of re-build the records, we make it as `[-1, lastOffset]`, 
which is inconsistent. Fixing it here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to