kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r602071574



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -836,178 +581,15 @@ class Log(@volatile private var _dir: File,
       updateHighWatermark(offset)
     }
 
-    if (this.recoveryPoint < offset) {
-      this.recoveryPoint = offset
-    }
-  }
-
-  /**
-   * Recover the log segments and return the next offset after recovery.
-   * This method does not need to convert IOException to KafkaStorageException because it is only called before all
-   * logs are loaded.
-   * @throws LogSegmentOffsetOverflowException if we encountered a legacy segment with offset overflow
-   */
-  private[log] def recoverLog(): Long = {
-    /** return the log end offset if valid */
-    def deleteSegmentsIfLogStartGreaterThanLogEnd(): Option[Long] = {
-      if (logSegments.nonEmpty) {
-        val logEndOffset = activeSegment.readNextOffset
-        if (logEndOffset >= logStartOffset)
-          Some(logEndOffset)
-        else {
-          warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +
-            "This could happen if segment files were deleted from the file system.")
-          removeAndDeleteSegments(logSegments, asyncDelete = true, LogRecovery)
-          leaderEpochCache.foreach(_.clearAndFlush())
-          producerStateManager.truncateFullyAndStartAt(logStartOffset)
-          None
-        }
-      } else None
-    }
-
-    // if we have the clean shutdown marker, skip recovery
-    if (!hadCleanShutdown) {
-      val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator
-      var truncated = false
-
-      while (unflushed.hasNext && !truncated) {
-        val segment = unflushed.next()
-        info(s"Recovering unflushed segment ${segment.baseOffset}")
-        val truncatedBytes =
-          try {
-            recoverSegment(segment, leaderEpochCache)
-          } catch {
-            case _: InvalidOffsetException =>
-              val startOffset = segment.baseOffset
-              warn("Found invalid offset during recovery. Deleting the corrupt segment and " +
-                s"creating an empty one with starting offset $startOffset")
-              segment.truncateTo(startOffset)
-          }
-        if (truncatedBytes > 0) {
-          // we had an invalid message, delete all remaining log
-          warn(s"Corruption found in segment ${segment.baseOffset}, truncating to offset ${segment.readNextOffset}")
-          removeAndDeleteSegments(unflushed.toList,
-            asyncDelete = true,
-            reason = LogRecovery)
-          truncated = true
-        }
-      }
-    }
-
-    val logEndOffsetOption = deleteSegmentsIfLogStartGreaterThanLogEnd()
-
-    if (logSegments.isEmpty) {
-      // no existing segments, create a new mutable segment beginning at logStartOffset
-      addSegment(LogSegment.open(dir = dir,
-        baseOffset = logStartOffset,
-        config,
-        time = time,
-        initFileSize = this.initFileSize,
-        preallocate = config.preallocate))
-    }
-
-    // Update the recovery point if there was a clean shutdown and did not perform any changes to
-    // the segment. Otherwise, we just ensure that the recovery point is not ahead of the log end
-    // offset. To ensure correctness and to make it easier to reason about, it's best to only advance
-    // the recovery point in flush(Long). If we advanced the recovery point here, we could skip recovery for
-    // unflushed segments if the broker crashed after we checkpoint the recovery point and before we flush the
-    // segment.
-    (hadCleanShutdown, logEndOffsetOption) match {
-      case (true, Some(logEndOffset)) =>
-        recoveryPoint = logEndOffset
-        logEndOffset
-      case _ =>
-        val logEndOffset = logEndOffsetOption.getOrElse(activeSegment.readNextOffset)
-        recoveryPoint = Math.min(recoveryPoint, logEndOffset)
-        logEndOffset
-    }
-  }
-
-  // Rebuild producer state until lastOffset. This method may be called from the recovery code path, and thus must be
-  // free of all side-effects, i.e. it must not update any log-specific state.
-  private def rebuildProducerState(lastOffset: Long,
-                                   reloadFromCleanShutdown: Boolean,
-                                   producerStateManager: ProducerStateManager): Unit = lock synchronized {
-    checkIfMemoryMappedBufferClosed()
-    val segments = logSegments
-    val offsetsToSnapshot =
-      if (segments.nonEmpty) {
-        val nextLatestSegmentBaseOffset = lowerSegment(segments.last.baseOffset).map(_.baseOffset)
-        Seq(nextLatestSegmentBaseOffset, Some(segments.last.baseOffset), Some(lastOffset))
-      } else {
-        Seq(Some(lastOffset))
-      }
-    info(s"Loading producer state till offset $lastOffset with message format version ${recordVersion.value}")
-
-    // We want to avoid unnecessary scanning of the log to build the producer state when the broker is being
-    // upgraded. The basic idea is to use the absence of producer snapshot files to detect the upgrade case,
-    // but we have to be careful not to assume too much in the presence of broker failures. The two most common
-    // upgrade cases in which we expect to find no snapshots are the following:
-    //
-    // 1. The broker has been upgraded, but the topic is still on the old message format.
-    // 2. The broker has been upgraded, the topic is on the new message format, and we had a clean shutdown.
-    //
-    // If we hit either of these cases, we skip producer state loading and write a new snapshot at the log end
-    // offset (see below). The next time the log is reloaded, we will load producer state using this snapshot
-    // (or later snapshots). Otherwise, if there is no snapshot file, then we have to rebuild producer state
-    // from the first segment.
-    if (recordVersion.value < RecordBatch.MAGIC_VALUE_V2 ||
-        (producerStateManager.latestSnapshotOffset.isEmpty && reloadFromCleanShutdown)) {
-      // To avoid an expensive scan through all of the segments, we take empty snapshots from the start of the
-      // last two segments and the last offset. This should avoid the full scan in the case that the log needs
-      // truncation.
-      offsetsToSnapshot.flatten.foreach { offset =>
-        producerStateManager.updateMapEndOffset(offset)
-        producerStateManager.takeSnapshot()
-      }
-    } else {
-      info(s"Reloading from producer snapshot and rebuilding producer state from offset $lastOffset")
-      val isEmptyBeforeTruncation = producerStateManager.isEmpty && producerStateManager.mapEndOffset >= lastOffset
-      val producerStateLoadStart = time.milliseconds()
-      producerStateManager.truncateAndReload(logStartOffset, lastOffset, time.milliseconds())
-      val segmentRecoveryStart = time.milliseconds()
-
-      // Only do the potentially expensive reloading if the last snapshot offset is lower than the log end
-      // offset (which would be the case on first startup) and there were active producers prior to truncation
-      // (which could be the case if truncating after initial loading). If there weren't, then truncating
-      // shouldn't change that fact (although it could cause a producerId to expire earlier than expected),
-      // and we can skip the loading. This is an optimization for users which are not yet using
-      // idempotent/transactional features yet.
-      if (lastOffset > producerStateManager.mapEndOffset && !isEmptyBeforeTruncation) {
-        val segmentOfLastOffset = floorLogSegment(lastOffset)
-
-        logSegments(producerStateManager.mapEndOffset, lastOffset).foreach { segment =>
-          val startOffset = Utils.max(segment.baseOffset, producerStateManager.mapEndOffset, logStartOffset)
-          producerStateManager.updateMapEndOffset(startOffset)
-
-          if (offsetsToSnapshot.contains(Some(segment.baseOffset)))
-            producerStateManager.takeSnapshot()
-
-          val maxPosition = if (segmentOfLastOffset.contains(segment)) {
-            Option(segment.translateOffset(lastOffset))
-              .map(_.position)
-              .getOrElse(segment.size)
-          } else {
-            segment.size
-          }
-
-          val fetchDataInfo = segment.read(startOffset,
-            maxSize = Int.MaxValue,
-            maxPosition = maxPosition,
-            minOneMessage = false)
-          if (fetchDataInfo != null)
-            loadProducersFromRecords(producerStateManager, fetchDataInfo.records)
-        }
-      }
-      producerStateManager.updateMapEndOffset(lastOffset)
-      producerStateManager.takeSnapshot()
-      info(s"Producer state recovery took ${producerStateLoadStart - segmentRecoveryStart}ms for snapshot load " +
-        s"and ${time.milliseconds() - segmentRecoveryStart}ms for segment recovery from offset $lastOffset")
+    if (localLog.recoveryPoint < offset) {
+      localLog.updateRecoveryPoint(offset)
     }
   }
 
   private def loadProducerState(lastOffset: Long, reloadFromCleanShutdown: Boolean): Unit = lock synchronized {
-    rebuildProducerState(lastOffset, reloadFromCleanShutdown, producerStateManager)
+    lock synchronized {

Review comment:
       Done. Good catch.
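For context on the `+` lines in this hunk: the recovery point is no longer a field that `Log` assigns directly; it is read and advanced through the `localLog` field. Below is a minimal sketch of that delegation, assuming a `LocalLog`-style holder class. Only `recoveryPoint` and `updateRecoveryPoint` come from the diff; the class names, the `markFlushed` wrapper and the bodies are illustrative, not the PR's actual implementation.

```scala
// Illustrative only: a local-log holder that owns the recovery point, so the outer
// log delegates instead of assigning the field itself.
class SketchLocalLog(@volatile private var _recoveryPoint: Long) {
  def recoveryPoint: Long = _recoveryPoint

  // The caller decides when it is safe to advance (e.g. only after a flush).
  def updateRecoveryPoint(offset: Long): Unit = {
    _recoveryPoint = offset
  }
}

class SketchLog(val localLog: SketchLocalLog) {
  private val lock = new Object

  // Mirrors the "+" side of the hunk: advance the recovery point through localLog,
  // and only when the flushed offset is ahead of it.
  def markFlushed(offset: Long): Unit = lock synchronized {
    if (localLog.recoveryPoint < offset)
      localLog.updateRecoveryPoint(offset)
  }
}
```

Routing the mutation through `updateRecoveryPoint` gives the field a single owner, which keeps the locking in the outer log easier to audit.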


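The removed `recoverLog()` walks the unflushed segments in order and stops at the first one that reports corruption, deleting everything after it. Here is a condensed sketch of that control flow, with a hypothetical `Segment` case class and function parameters standing in for `LogSegment`, `recoverSegment` and `removeAndDeleteSegments`:

```scala
object RecoverySketch {
  final case class Segment(baseOffset: Long)

  // Condensed from the removed loop: recover each unflushed segment; as soon as one
  // reports truncated (corrupt) bytes, the remaining segments are deleted and we stop.
  def recoverUnflushed(unflushed: Iterator[Segment],
                       recoverSegment: Segment => Int,       // returns truncated bytes, 0 if clean
                       deleteSegments: Seq[Segment] => Unit): Unit = {
    var truncated = false
    while (unflushed.hasNext && !truncated) {
      val segment = unflushed.next()
      val truncatedBytes = recoverSegment(segment)
      if (truncatedBytes > 0) {
        // Everything after a corrupt segment is unreliable, so the rest of the iterator is dropped.
        deleteSegments(unflushed.toList)
        truncated = true
      }
    }
  }
}
```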


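The removed `rebuildProducerState()` avoids a full scan of the log on the upgrade / clean-shutdown path by taking empty producer snapshots at the base offsets of the last two segments and at the last offset. A simplified sketch of that offset selection, using hypothetical stand-in types rather than the real `ProducerStateManager` (only `updateMapEndOffset` and `takeSnapshot` are taken from the diff):

```scala
import scala.collection.mutable

object SnapshotSketch {
  final case class SketchSegment(baseOffset: Long)

  // Hypothetical stand-in that only records which offsets were snapshotted.
  class SketchSnapshotStore {
    private var mapEndOffset: Long = 0L
    private val snapshots = mutable.TreeSet.empty[Long]

    def updateMapEndOffset(offset: Long): Unit = mapEndOffset = offset
    def takeSnapshot(): Unit = snapshots += mapEndOffset
    def snapshotOffsets: Seq[Long] = snapshots.toSeq
  }

  // Same shape as the removed offsetsToSnapshot: the base offset of the next-to-last
  // segment (if there is one), the base offset of the last segment, and the last offset.
  def offsetsToSnapshot(segments: Seq[SketchSegment], lastOffset: Long): Seq[Long] =
    if (segments.nonEmpty) {
      val nextLatest = segments.dropRight(1).lastOption.map(_.baseOffset)
      (nextLatest.toSeq :+ segments.last.baseOffset :+ lastOffset).distinct
    } else Seq(lastOffset)

  // Taking empty snapshots at these offsets means a later truncate-and-reload can start
  // from a snapshot instead of rescanning every segment.
  def takeEmptySnapshots(store: SketchSnapshotStore, segments: Seq[SketchSegment], lastOffset: Long): Unit =
    offsetsToSnapshot(segments, lastOffset).foreach { offset =>
      store.updateMapEndOffset(offset)
      store.takeSnapshot()
    }
}
```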
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

