junrao commented on code in PR #18012:
URL: https://github.com/apache/kafka/pull/18012#discussion_r1907970044


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##########
@@ -257,13 +257,21 @@ public void append(long largestOffset,
             if (largestTimestampMs > maxTimestampSoFar()) {
                 maxTimestampAndOffsetSoFar = new TimestampOffset(largestTimestampMs, shallowOffsetOfMaxTimestamp);
             }
-            // append an entry to the index (if needed)
+            // append an entry to the timestamp index at MemoryRecords level (if needed)
             if (bytesSinceLastIndexEntry > indexIntervalBytes) {
-                offsetIndex().append(largestOffset, physicalPosition);
                 timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar());
-                bytesSinceLastIndexEntry = 0;
             }
-            bytesSinceLastIndexEntry += records.sizeInBytes();
+
+            // append an entry to the offset index at batches level (if needed)
+            for (RecordBatch batch : records.batches()) {
+                if (bytesSinceLastIndexEntry > indexIntervalBytes &&
+                    batch.lastOffset() >= offsetIndex().lastOffset()) {
+                    offsetIndex().append(batch.lastOffset(), physicalPosition);

Review Comment:
   It's true that the time index is used less frequently. But I am not sure 
it's worth optimizing since its size is small.



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##########
@@ -253,17 +253,33 @@ public void append(long largestOffset,
             // append the messages
             long appendedBytes = log.append(records);
             LOGGER.trace("Appended {} to {} at end offset {}", appendedBytes, log.file(), largestOffset);
-            // Update the in memory max timestamp and corresponding offset.
-            if (largestTimestampMs > maxTimestampSoFar()) {
-                maxTimestampAndOffsetSoFar = new TimestampOffset(largestTimestampMs, shallowOffsetOfMaxTimestamp);
-            }
-            // append an entry to the index (if needed)
-            if (bytesSinceLastIndexEntry > indexIntervalBytes) {
-                offsetIndex().append(largestOffset, physicalPosition);
-                timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar());
-                bytesSinceLastIndexEntry = 0;
+
+            long batchMaxTimestamp = RecordBatch.NO_TIMESTAMP;
+            long batchShallowOffsetOfMaxTimestamp = -1L;
+            // append an entry to the index at batches level (if needed)
+            for (RecordBatch batch : records.batches()) {
+                if (batch.maxTimestamp() > batchMaxTimestamp) {
+                    batchMaxTimestamp = batch.maxTimestamp();
+                    batchShallowOffsetOfMaxTimestamp = batch.lastOffset();
+                }
+
+                // Update the in memory max timestamp and corresponding offset.
+                if (batchMaxTimestamp > maxTimestampSoFar()) {
+                    maxTimestampAndOffsetSoFar = new TimestampOffset(batchMaxTimestamp, batchShallowOffsetOfMaxTimestamp);
+                }
+
+                if (bytesSinceLastIndexEntry > indexIntervalBytes) {
+                    offsetIndex().append(batch.lastOffset(), physicalPosition);
+
+                    // max timestamp may not be monotonic, so we need to check it to avoid the time index append error
+                    if (batchMaxTimestamp >= timeIndex().lastEntry().timestamp)
+                        timeIndex().maybeAppend(batchMaxTimestamp, shallowOffsetOfMaxTimestampSoFar());

Review Comment:
   Hmm, we should use `maxTimestampSoFar()` instead of `batchMaxTimestamp`, right?
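
To illustrate the point above, here is a minimal toy model (not Kafka code) of a per-batch loop that indexes the segment-wide maximum timestamp together with the offset of the batch that set it, so the two values in each time index entry always describe the same batch. Every name in it (`TimeIndexPairingSketch`, the `long[]` batch encoding, the simplified `maybeAppend`) is a hypothetical stand-in, and the sketch deliberately ignores `indexIntervalBytes` and tries to index after every batch.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, not Kafka code: all names here are hypothetical stand-ins.
class TimeIndexPairingSketch {
    // Running maximum timestamp for the segment and the offset of the batch that set it.
    private long maxTimestampSoFar = -1L;
    private long offsetOfMaxTimestampSoFar = -1L;
    // Each entry is {timestamp, offset}.
    final List<long[]> timeIndex = new ArrayList<>();

    // batches[i] = {maxTimestamp, lastOffset}
    void append(long[][] batches) {
        for (long[] batch : batches) {
            long batchMaxTimestamp = batch[0];
            long batchLastOffset = batch[1];
            if (batchMaxTimestamp > maxTimestampSoFar) {
                maxTimestampSoFar = batchMaxTimestamp;
                offsetOfMaxTimestampSoFar = batchLastOffset;
            }
            // Index the segment-wide pair, not the current batch's timestamp,
            // so the timestamp and the offset always belong together.
            maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar);
        }
    }

    // Simplified guard: only strictly larger timestamps are recorded.
    private void maybeAppend(long timestamp, long offset) {
        long last = timeIndex.isEmpty() ? -1L : timeIndex.get(timeIndex.size() - 1)[0];
        if (timestamp > last) {
            timeIndex.add(new long[] {timestamp, offset});
        }
    }

    public static void main(String[] args) {
        TimeIndexPairingSketch segment = new TimeIndexPairingSketch();
        // The second batch has an older timestamp than the first.
        segment.append(new long[][] {{100L, 9L}, {90L, 19L}, {120L, 29L}});
        for (long[] entry : segment.timeIndex) {
            System.out.println(entry[0] + " -> " + entry[1]);
        }
    }
}
```

In this model the out-of-order batch (timestamp 90) adds no entry, because the value handed to the index never decreases.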



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##########
@@ -253,17 +253,33 @@ public void append(long largestOffset,
             // append the messages
             long appendedBytes = log.append(records);
             LOGGER.trace("Appended {} to {} at end offset {}", appendedBytes, log.file(), largestOffset);
-            // Update the in memory max timestamp and corresponding offset.
-            if (largestTimestampMs > maxTimestampSoFar()) {
-                maxTimestampAndOffsetSoFar = new TimestampOffset(largestTimestampMs, shallowOffsetOfMaxTimestamp);
-            }
-            // append an entry to the index (if needed)
-            if (bytesSinceLastIndexEntry > indexIntervalBytes) {
-                offsetIndex().append(largestOffset, physicalPosition);
-                timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar());
-                bytesSinceLastIndexEntry = 0;
+
+            long batchMaxTimestamp = RecordBatch.NO_TIMESTAMP;
+            long batchShallowOffsetOfMaxTimestamp = -1L;
+            // append an entry to the index at batches level (if needed)
+            for (RecordBatch batch : records.batches()) {
+                if (batch.maxTimestamp() > batchMaxTimestamp) {

Review Comment:
   Could we get rid of this condition and make `batchMaxTimestamp` and `batchShallowOffsetOfMaxTimestamp` local variables inside the loop?
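
A small sketch of why the suggestion above should be behavior-preserving: because the segment-wide maximum is already a running maximum, a second accumulator kept across batches does not change the outcome. This is a toy model rather than Kafka code; `PerBatchLocalsSketch`, the `long[]` batch encoding, and both method names are hypothetical, and `-1L` merely stands in for `RecordBatch.NO_TIMESTAMP`.

```java
// Toy model, not Kafka code: names are hypothetical.
// batches[i] = {maxTimestamp, lastOffset}; start = {maxTimestampSoFar, offsetOfMaxTimestamp}.
class PerBatchLocalsSketch {

    // Shape from the diff: an accumulator declared outside the loop.
    static long[] withAccumulator(long[] start, long[][] batches) {
        long maxSoFar = start[0];
        long offsetOfMax = start[1];
        long accTimestamp = -1L; // stands in for RecordBatch.NO_TIMESTAMP
        long accOffset = -1L;
        for (long[] batch : batches) {
            if (batch[0] > accTimestamp) {
                accTimestamp = batch[0];
                accOffset = batch[1];
            }
            if (accTimestamp > maxSoFar) {
                maxSoFar = accTimestamp;
                offsetOfMax = accOffset;
            }
        }
        return new long[] {maxSoFar, offsetOfMax};
    }

    // Suggested shape: per-batch values are plain locals inside the loop.
    static long[] withLocals(long[] start, long[][] batches) {
        long maxSoFar = start[0];
        long offsetOfMax = start[1];
        for (long[] batch : batches) {
            final long batchMaxTimestamp = batch[0];
            final long batchLastOffset = batch[1];
            if (batchMaxTimestamp > maxSoFar) {
                maxSoFar = batchMaxTimestamp;
                offsetOfMax = batchLastOffset;
            }
        }
        return new long[] {maxSoFar, offsetOfMax};
    }

    public static void main(String[] args) {
        long[] start = {50L, 5L};
        long[][] batches = {{40L, 10L}, {70L, 20L}, {60L, 30L}, {70L, 40L}};
        long[] a = withAccumulator(start, batches);
        long[] b = withLocals(start, batches);
        System.out.println(a[0] + "@" + a[1] + " vs " + b[0] + "@" + b[1]);
    }
}
```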



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##########
@@ -253,17 +253,33 @@ public void append(long largestOffset,
             // append the messages
             long appendedBytes = log.append(records);
             LOGGER.trace("Appended {} to {} at end offset {}", appendedBytes, log.file(), largestOffset);
-            // Update the in memory max timestamp and corresponding offset.
-            if (largestTimestampMs > maxTimestampSoFar()) {
-                maxTimestampAndOffsetSoFar = new TimestampOffset(largestTimestampMs, shallowOffsetOfMaxTimestamp);
-            }
-            // append an entry to the index (if needed)
-            if (bytesSinceLastIndexEntry > indexIntervalBytes) {
-                offsetIndex().append(largestOffset, physicalPosition);
-                timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar());
-                bytesSinceLastIndexEntry = 0;
+
+            long batchMaxTimestamp = RecordBatch.NO_TIMESTAMP;
+            long batchShallowOffsetOfMaxTimestamp = -1L;
+            // append an entry to the index at batches level (if needed)
+            for (RecordBatch batch : records.batches()) {
+                if (batch.maxTimestamp() > batchMaxTimestamp) {
+                    batchMaxTimestamp = batch.maxTimestamp();
+                    batchShallowOffsetOfMaxTimestamp = batch.lastOffset();
+                }
+
+                // Update the in memory max timestamp and corresponding offset.
+                if (batchMaxTimestamp > maxTimestampSoFar()) {
+                    maxTimestampAndOffsetSoFar = new TimestampOffset(batchMaxTimestamp, batchShallowOffsetOfMaxTimestamp);
+                }
+
+                if (bytesSinceLastIndexEntry > indexIntervalBytes) {
+                    offsetIndex().append(batch.lastOffset(), physicalPosition);
+
+                    // max timestamp may not be monotonic, so we need to check it to avoid the time index append error
+                    if (batchMaxTimestamp >= timeIndex().lastEntry().timestamp)

Review Comment:
   This is unnecessary since `TimeIndex` has the same check.

   ```
               // We only append to the time index when the timestamp is greater than the last inserted timestamp.
               // If all the messages are in message format v0, the timestamp will always be NoTimestamp. In that case, the time
               // index will be empty.
               if (timestamp > lastEntry.timestamp) {
   ```
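
As a rough illustration of the redundancy, the toy index below reproduces only the quoted `timestamp > lastEntry.timestamp` guard and compares a caller that pre-checks with one that does not. It is a sketch under assumptions: `GuardedTimeIndexSketch` and `feed` are hypothetical names, the real `TimeIndex` may perform additional validation that is not modeled here, and the caller is assumed to pass a non-decreasing value such as `maxTimestampSoFar()`.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, not Kafka's TimeIndex: it mirrors only the guard quoted in the review.
class GuardedTimeIndexSketch {
    // Each entry is {timestamp, offset}.
    final List<long[]> entries = new ArrayList<>();

    long lastTimestamp() {
        return entries.isEmpty() ? -1L : entries.get(entries.size() - 1)[0];
    }

    // Appends only when the timestamp is strictly greater than the last one.
    void maybeAppend(long timestamp, long offset) {
        if (timestamp > lastTimestamp()) {
            entries.add(new long[] {timestamp, offset});
        }
    }

    // Feeds a non-decreasing sequence (a running maximum) into a fresh index.
    // The array position is used as a stand-in offset.
    static List<long[]> feed(long[] runningMaxes, boolean callerChecks) {
        GuardedTimeIndexSketch index = new GuardedTimeIndexSketch();
        for (int i = 0; i < runningMaxes.length; i++) {
            // For non-decreasing input this caller-side check is always true.
            if (!callerChecks || runningMaxes[i] >= index.lastTimestamp()) {
                index.maybeAppend(runningMaxes[i], i);
            }
        }
        return index.entries;
    }

    public static void main(String[] args) {
        long[] runningMaxes = {100L, 100L, 120L, 120L, 150L};
        System.out.println(feed(runningMaxes, true).size());
        System.out.println(feed(runningMaxes, false).size());
    }
}
```

With a running maximum as input, both variants record the same entries, which is why the extra condition at the call site adds nothing in this model.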



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##########
@@ -253,17 +253,33 @@ public void append(long largestOffset,
             // append the messages
             long appendedBytes = log.append(records);
             LOGGER.trace("Appended {} to {} at end offset {}", appendedBytes, log.file(), largestOffset);
-            // Update the in memory max timestamp and corresponding offset.
-            if (largestTimestampMs > maxTimestampSoFar()) {

Review Comment:
   With this change, we can remove both `largestTimestampMs` and 
`shallowOffsetOfMaxTimestamp`. We probably can also remove 
`LogAppendInfo.shallowOffsetOfMaxTimestamp`.


