junrao commented on a change in pull request #10534:
URL: https://github.com/apache/kafka/pull/10534#discussion_r620704283
##########
File path: core/src/main/scala/kafka/log/LogSegment.scala
##########
@@ -162,15 +162,43 @@ class LogSegment private[log] (val log: FileRecords,
// append the messages
val appendedBytes = log.append(records)
trace(s"Appended $appendedBytes to ${log.file} at end offset
$largestOffset")
+
+
+ def appendIndex(): Unit = {
+ var validBytes = 0
+ var maxTimestampSoFarTmp = RecordBatch.NO_TIMESTAMP
+ var offsetOfMaxTimestampSoFarTmp = 0L
+ var lastIndexEntry = 0
+ val originalLastOffset = offsetIndex.lastOffset
+
+ for (batch <- log.batches.asScala) {
+ batch.ensureValid()
+ ensureOffsetInRange(batch.lastOffset)
+
+ if (batch.maxTimestamp > maxTimestampSoFarTmp) {
+ maxTimestampSoFarTmp = batch.maxTimestamp
+ offsetOfMaxTimestampSoFarTmp = batch.lastOffset
+ }
+
+ if (validBytes - lastIndexEntry > indexIntervalBytes) {
+ if (batch.lastOffset > originalLastOffset) {
+ offsetIndex.append(batch.lastOffset, validBytes)
+ timeIndex.maybeAppend(maxTimestampSoFarTmp,
offsetOfMaxTimestampSoFarTmp)
+ }
+ lastIndexEntry = validBytes
Review comment:
Could we just have a single variable `accumulatedBytes` and reset it to 0
after each index insertion?
##########
File path: core/src/main/scala/kafka/log/LogSegment.scala
##########
@@ -162,15 +162,43 @@ class LogSegment private[log] (val log: FileRecords,
// append the messages
val appendedBytes = log.append(records)
trace(s"Appended $appendedBytes to ${log.file} at end offset
$largestOffset")
+
+
+ def appendIndex(): Unit = {
+ var validBytes = 0
+ var maxTimestampSoFarTmp = RecordBatch.NO_TIMESTAMP
+ var offsetOfMaxTimestampSoFarTmp = 0L
+ var lastIndexEntry = 0
+ val originalLastOffset = offsetIndex.lastOffset
+
+ for (batch <- log.batches.asScala) {
+ batch.ensureValid()
+ ensureOffsetInRange(batch.lastOffset)
+
+ if (batch.maxTimestamp > maxTimestampSoFarTmp) {
+ maxTimestampSoFarTmp = batch.maxTimestamp
+ offsetOfMaxTimestampSoFarTmp = batch.lastOffset
+ }
+
+ if (validBytes - lastIndexEntry > indexIntervalBytes) {
Review comment:
It seems we need to take `bytesSinceLastIndexEntry` into account here, and
we should also update `bytesSinceLastIndexEntry` in this method rather than
in the caller.
##########
File path: core/src/main/scala/kafka/log/LogSegment.scala
##########
@@ -162,15 +162,43 @@ class LogSegment private[log] (val log: FileRecords,
// append the messages
val appendedBytes = log.append(records)
trace(s"Appended $appendedBytes to ${log.file} at end offset
$largestOffset")
+
+
+ def appendIndex(): Unit = {
+ var validBytes = 0
+ var maxTimestampSoFarTmp = RecordBatch.NO_TIMESTAMP
+ var offsetOfMaxTimestampSoFarTmp = 0L
+ var lastIndexEntry = 0
+ val originalLastOffset = offsetIndex.lastOffset
+
+ for (batch <- log.batches.asScala) {
+ batch.ensureValid()
Review comment:
We already validate the batches in `Log.analyzeAndValidateRecords()`, so we
don't need to do it again here.
##########
File path: core/src/main/scala/kafka/log/LogSegment.scala
##########
@@ -162,15 +162,43 @@ class LogSegment private[log] (val log: FileRecords,
// append the messages
val appendedBytes = log.append(records)
trace(s"Appended $appendedBytes to ${log.file} at end offset
$largestOffset")
+
+
+ def appendIndex(): Unit = {
+ var validBytes = 0
+ var maxTimestampSoFarTmp = RecordBatch.NO_TIMESTAMP
+ var offsetOfMaxTimestampSoFarTmp = 0L
+ var lastIndexEntry = 0
+ val originalLastOffset = offsetIndex.lastOffset
+
+ for (batch <- log.batches.asScala) {
+ batch.ensureValid()
+ ensureOffsetInRange(batch.lastOffset)
+
+ if (batch.maxTimestamp > maxTimestampSoFarTmp) {
Review comment:
Since this is done in the caller for all batches, we don't need to do it
here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org