kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r602071574
##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -836,178 +581,15 @@ class Log(@volatile private var _dir: File,
       updateHighWatermark(offset)
     }
 
-    if (this.recoveryPoint < offset) {
-      this.recoveryPoint = offset
-    }
-  }
-
-  /**
-   * Recover the log segments and return the next offset after recovery.
-   * This method does not need to convert IOException to KafkaStorageException because it is only called before all
-   * logs are loaded.
-   * @throws LogSegmentOffsetOverflowException if we encountered a legacy segment with offset overflow
-   */
-  private[log] def recoverLog(): Long = {
-    /** return the log end offset if valid */
-    def deleteSegmentsIfLogStartGreaterThanLogEnd(): Option[Long] = {
-      if (logSegments.nonEmpty) {
-        val logEndOffset = activeSegment.readNextOffset
-        if (logEndOffset >= logStartOffset)
-          Some(logEndOffset)
-        else {
-          warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +
-            "This could happen if segment files were deleted from the file system.")
-          removeAndDeleteSegments(logSegments, asyncDelete = true, LogRecovery)
-          leaderEpochCache.foreach(_.clearAndFlush())
-          producerStateManager.truncateFullyAndStartAt(logStartOffset)
-          None
-        }
-      } else None
-    }
-
-    // if we have the clean shutdown marker, skip recovery
-    if (!hadCleanShutdown) {
-      val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator
-      var truncated = false
-
-      while (unflushed.hasNext && !truncated) {
-        val segment = unflushed.next()
-        info(s"Recovering unflushed segment ${segment.baseOffset}")
-        val truncatedBytes =
-          try {
-            recoverSegment(segment, leaderEpochCache)
-          } catch {
-            case _: InvalidOffsetException =>
-              val startOffset = segment.baseOffset
-              warn("Found invalid offset during recovery. Deleting the corrupt segment and " +
-                s"creating an empty one with starting offset $startOffset")
-              segment.truncateTo(startOffset)
-          }
-        if (truncatedBytes > 0) {
-          // we had an invalid message, delete all remaining log
-          warn(s"Corruption found in segment ${segment.baseOffset}, truncating to offset ${segment.readNextOffset}")
-          removeAndDeleteSegments(unflushed.toList,
-            asyncDelete = true,
-            reason = LogRecovery)
-          truncated = true
-        }
-      }
-    }
-
-    val logEndOffsetOption = deleteSegmentsIfLogStartGreaterThanLogEnd()
-
-    if (logSegments.isEmpty) {
-      // no existing segments, create a new mutable segment beginning at logStartOffset
-      addSegment(LogSegment.open(dir = dir,
-        baseOffset = logStartOffset,
-        config,
-        time = time,
-        initFileSize = this.initFileSize,
-        preallocate = config.preallocate))
-    }
-
-    // Update the recovery point if there was a clean shutdown and did not perform any changes to
-    // the segment. Otherwise, we just ensure that the recovery point is not ahead of the log end
-    // offset. To ensure correctness and to make it easier to reason about, it's best to only advance
-    // the recovery point in flush(Long). If we advanced the recovery point here, we could skip recovery for
-    // unflushed segments if the broker crashed after we checkpoint the recovery point and before we flush the
-    // segment.
-    (hadCleanShutdown, logEndOffsetOption) match {
-      case (true, Some(logEndOffset)) =>
-        recoveryPoint = logEndOffset
-        logEndOffset
-      case _ =>
-        val logEndOffset = logEndOffsetOption.getOrElse(activeSegment.readNextOffset)
-        recoveryPoint = Math.min(recoveryPoint, logEndOffset)
-        logEndOffset
-    }
-  }
-
-  // Rebuild producer state until lastOffset. This method may be called from the recovery code path, and thus must be
-  // free of all side-effects, i.e. it must not update any log-specific state.
-  private def rebuildProducerState(lastOffset: Long,
-                                   reloadFromCleanShutdown: Boolean,
-                                   producerStateManager: ProducerStateManager): Unit = lock synchronized {
-    checkIfMemoryMappedBufferClosed()
-    val segments = logSegments
-    val offsetsToSnapshot =
-      if (segments.nonEmpty) {
-        val nextLatestSegmentBaseOffset = lowerSegment(segments.last.baseOffset).map(_.baseOffset)
-        Seq(nextLatestSegmentBaseOffset, Some(segments.last.baseOffset), Some(lastOffset))
-      } else {
-        Seq(Some(lastOffset))
-      }
-    info(s"Loading producer state till offset $lastOffset with message format version ${recordVersion.value}")
-
-    // We want to avoid unnecessary scanning of the log to build the producer state when the broker is being
-    // upgraded. The basic idea is to use the absence of producer snapshot files to detect the upgrade case,
-    // but we have to be careful not to assume too much in the presence of broker failures. The two most common
-    // upgrade cases in which we expect to find no snapshots are the following:
-    //
-    // 1. The broker has been upgraded, but the topic is still on the old message format.
-    // 2. The broker has been upgraded, the topic is on the new message format, and we had a clean shutdown.
-    //
-    // If we hit either of these cases, we skip producer state loading and write a new snapshot at the log end
-    // offset (see below). The next time the log is reloaded, we will load producer state using this snapshot
-    // (or later snapshots). Otherwise, if there is no snapshot file, then we have to rebuild producer state
-    // from the first segment.
-    if (recordVersion.value < RecordBatch.MAGIC_VALUE_V2 ||
-        (producerStateManager.latestSnapshotOffset.isEmpty && reloadFromCleanShutdown)) {
-      // To avoid an expensive scan through all of the segments, we take empty snapshots from the start of the
-      // last two segments and the last offset. This should avoid the full scan in the case that the log needs
-      // truncation.
-      offsetsToSnapshot.flatten.foreach { offset =>
-        producerStateManager.updateMapEndOffset(offset)
-        producerStateManager.takeSnapshot()
-      }
-    } else {
-      info(s"Reloading from producer snapshot and rebuilding producer state from offset $lastOffset")
-      val isEmptyBeforeTruncation = producerStateManager.isEmpty && producerStateManager.mapEndOffset >= lastOffset
-      val producerStateLoadStart = time.milliseconds()
-      producerStateManager.truncateAndReload(logStartOffset, lastOffset, time.milliseconds())
-      val segmentRecoveryStart = time.milliseconds()
-
-      // Only do the potentially expensive reloading if the last snapshot offset is lower than the log end
-      // offset (which would be the case on first startup) and there were active producers prior to truncation
-      // (which could be the case if truncating after initial loading). If there weren't, then truncating
-      // shouldn't change that fact (although it could cause a producerId to expire earlier than expected),
-      // and we can skip the loading. This is an optimization for users which are not yet using
-      // idempotent/transactional features yet.
-      if (lastOffset > producerStateManager.mapEndOffset && !isEmptyBeforeTruncation) {
-        val segmentOfLastOffset = floorLogSegment(lastOffset)
-
-        logSegments(producerStateManager.mapEndOffset, lastOffset).foreach { segment =>
-          val startOffset = Utils.max(segment.baseOffset, producerStateManager.mapEndOffset, logStartOffset)
-          producerStateManager.updateMapEndOffset(startOffset)
-
-          if (offsetsToSnapshot.contains(Some(segment.baseOffset)))
-            producerStateManager.takeSnapshot()
-
-          val maxPosition = if (segmentOfLastOffset.contains(segment)) {
-            Option(segment.translateOffset(lastOffset))
-              .map(_.position)
-              .getOrElse(segment.size)
-          } else {
-            segment.size
-          }
-
-          val fetchDataInfo = segment.read(startOffset,
-            maxSize = Int.MaxValue,
-            maxPosition = maxPosition,
-            minOneMessage = false)
-          if (fetchDataInfo != null)
-            loadProducersFromRecords(producerStateManager, fetchDataInfo.records)
-        }
-      }
-      producerStateManager.updateMapEndOffset(lastOffset)
-      producerStateManager.takeSnapshot()
-      info(s"Producer state recovery took ${producerStateLoadStart - segmentRecoveryStart}ms for snapshot load " +
-        s"and ${time.milliseconds() - segmentRecoveryStart}ms for segment recovery from offset $lastOffset")
+    if (localLog.recoveryPoint < offset) {
+      localLog.updateRecoveryPoint(offset)
     }
   }
 
   private def loadProducerState(lastOffset: Long, reloadFromCleanShutdown: Boolean): Unit = lock synchronized {
-    rebuildProducerState(lastOffset, reloadFromCleanShutdown, producerStateManager)
+    lock synchronized {

Review comment:
       Done. Good catch.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
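
For readers following the hunk above: the added lines replace direct mutation of Log's recoveryPoint with delegation to a local-log object via localLog.recoveryPoint and localLog.updateRecoveryPoint(offset). Below is a minimal, self-contained Scala sketch of that delegation pattern only; the recoveryPoint and updateRecoveryPoint names come from the diff, while SketchLocalLog, SketchLog, and markFlushed are illustrative placeholders, not the actual Kafka implementation.

    // Sketch only: mirrors the "advance the recovery point via the local log"
    // pattern from the hunk. SketchLocalLog and SketchLog are hypothetical names.
    class SketchLocalLog {
      @volatile private var _recoveryPoint: Long = 0L

      def recoveryPoint: Long = _recoveryPoint

      // Records the new recovery point; callers decide when an advance is warranted.
      def updateRecoveryPoint(offset: Long): Unit =
        _recoveryPoint = offset
    }

    class SketchLog(localLog: SketchLocalLog) {
      // Mirrors the added lines: only ever move the recovery point forward.
      def markFlushed(offset: Long): Unit =
        if (localLog.recoveryPoint < offset)
          localLog.updateRecoveryPoint(offset)
    }

    object RecoveryPointSketch extends App {
      val localLog = new SketchLocalLog
      val log = new SketchLog(localLog)
      log.markFlushed(42L)
      println(s"recovery point is now ${localLog.recoveryPoint}") // prints 42
    }

The point of the delegation is that the outer log no longer owns the mutable recovery point; it only asks the local-log layer to advance it, and the advance stays monotonic because the caller checks localLog.recoveryPoint < offset before updating.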