[
https://issues.apache.org/jira/browse/KAFKA-7076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16525657#comment-16525657
]
ASF GitHub Bot commented on KAFKA-7076:
---------------------------------------
hachikuji closed pull request #5254: KAFKA-7076: Prevent reading through log
data for constructing producer state when using old message format
URL: https://github.com/apache/kafka/pull/5254
This pull request was merged from a forked repository. Because GitHub hides
the original diff once such a pull request is merged, the diff is reproduced
below for the sake of provenance:
diff --git a/core/src/main/scala/kafka/log/Log.scala
b/core/src/main/scala/kafka/log/Log.scala
index 3036018dbda..e4be8fcc43d 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -246,6 +246,10 @@ class Log(@volatile var dir: File,
// The earliest leader epoch may not be flushed during a hard failure.
Recover it here.
_leaderEpochCache.clearAndFlushEarliest(logStartOffset)
+ // Any segment loading or recovery code must not use producerStateManager,
so that we can build the full state here
+ // from scratch.
+ if (!producerStateManager.isEmpty)
+ throw new IllegalStateException("Producer state must be empty during log
initialization")
loadProducerState(logEndOffset, reloadFromCleanShutdown =
hasCleanShutdownFile)
info(s"Completed load of log with ${segments.size} segments, log start
offset $logStartOffset and " +
@@ -417,25 +421,14 @@ class Log(@volatile var dir: File,
* @return The number of bytes truncated from the segment
* @throws LogSegmentOffsetOverflowException if the segment contains
messages that cause index offset overflow
*/
- private def recoverSegment(segment: LogSegment, leaderEpochCache:
Option[LeaderEpochCache] = None): Int = lock synchronized {
- val stateManager = new ProducerStateManager(topicPartition, dir,
maxProducerIdExpirationMs)
- stateManager.truncateAndReload(logStartOffset, segment.baseOffset,
time.milliseconds)
- logSegments(stateManager.mapEndOffset, segment.baseOffset).foreach {
segment =>
- val startOffset = math.max(segment.baseOffset, stateManager.mapEndOffset)
- val fetchDataInfo = segment.read(startOffset, None, Int.MaxValue)
- if (fetchDataInfo != null)
- loadProducersFromLog(stateManager, fetchDataInfo.records)
- }
- stateManager.updateMapEndOffset(segment.baseOffset)
-
- // take a snapshot for the first recovered segment to avoid reloading all
the segments if we shutdown before we
- // checkpoint the recovery point
- stateManager.takeSnapshot()
- val bytesTruncated = segment.recover(stateManager, leaderEpochCache)
-
+ private def recoverSegment(segment: LogSegment,
+ leaderEpochCache: Option[LeaderEpochCache] =
None): Int = lock synchronized {
+ val producerStateManager = new ProducerStateManager(topicPartition, dir,
maxProducerIdExpirationMs)
+ rebuildProducerState(segment.baseOffset, reloadFromCleanShutdown = false,
producerStateManager)
+ val bytesTruncated = segment.recover(producerStateManager,
leaderEpochCache)
// once we have recovered the segment's data, take a snapshot to ensure
that we won't
// need to reload the same segment again while recovering another segment.
- stateManager.takeSnapshot()
+ producerStateManager.takeSnapshot()
bytesTruncated
}
@@ -565,10 +558,22 @@ class Log(@volatile var dir: File,
recoveryPoint
}
- private def loadProducerState(lastOffset: Long, reloadFromCleanShutdown:
Boolean): Unit = lock synchronized {
+ // Rebuild producer state until lastOffset. This method may be called from
the recovery code path, and thus must be
+ // free of all side-effects, i.e. it must not update any log-specific state.
+ private def rebuildProducerState(lastOffset: Long,
+ reloadFromCleanShutdown: Boolean,
+ producerStateManager:
ProducerStateManager): Unit = lock synchronized {
checkIfMemoryMappedBufferClosed()
val messageFormatVersion = config.messageFormatVersion.recordVersion.value
- info(s"Loading producer state from offset $lastOffset with message format
version $messageFormatVersion")
+ val segments = logSegments
+ val offsetsToSnapshot =
+ if (segments.nonEmpty) {
+ val nextLatestSegmentBaseOffset =
lowerSegment(segments.last.baseOffset).map(_.baseOffset)
+ Seq(nextLatestSegmentBaseOffset, Some(segments.last.baseOffset),
Some(lastOffset))
+ } else {
+ Seq(Some(lastOffset))
+ }
+ info(s"Loading producer state till offset $lastOffset with message format
version $messageFormatVersion")
// We want to avoid unnecessary scanning of the log to build the producer
state when the broker is being
// upgraded. The basic idea is to use the absence of producer snapshot
files to detect the upgrade case,
@@ -582,13 +587,11 @@ class Log(@volatile var dir: File,
// offset (see below). The next time the log is reloaded, we will load
producer state using this snapshot
// (or later snapshots). Otherwise, if there is no snapshot file, then we
have to rebuild producer state
// from the first segment.
-
- if (producerStateManager.latestSnapshotOffset.isEmpty &&
(messageFormatVersion < RecordBatch.MAGIC_VALUE_V2 || reloadFromCleanShutdown))
{
+ if (messageFormatVersion < RecordBatch.MAGIC_VALUE_V2 ||
+ (producerStateManager.latestSnapshotOffset.isEmpty &&
reloadFromCleanShutdown)) {
// To avoid an expensive scan through all of the segments, we take empty
snapshots from the start of the
// last two segments and the last offset. This should avoid the full
scan in the case that the log needs
// truncation.
- val nextLatestSegmentBaseOffset =
lowerSegment(activeSegment.baseOffset).map(_.baseOffset)
- val offsetsToSnapshot = Seq(nextLatestSegmentBaseOffset,
Some(activeSegment.baseOffset), Some(lastOffset))
offsetsToSnapshot.flatten.foreach { offset =>
producerStateManager.updateMapEndOffset(offset)
producerStateManager.takeSnapshot()
@@ -607,19 +610,25 @@ class Log(@volatile var dir: File,
logSegments(producerStateManager.mapEndOffset, lastOffset).foreach {
segment =>
val startOffset = Utils.max(segment.baseOffset,
producerStateManager.mapEndOffset, logStartOffset)
producerStateManager.updateMapEndOffset(startOffset)
- producerStateManager.takeSnapshot()
+
+ if (offsetsToSnapshot.contains(Some(segment.baseOffset)))
+ producerStateManager.takeSnapshot()
val fetchDataInfo = segment.read(startOffset, Some(lastOffset),
Int.MaxValue)
if (fetchDataInfo != null)
loadProducersFromLog(producerStateManager, fetchDataInfo.records)
}
}
-
producerStateManager.updateMapEndOffset(lastOffset)
- updateFirstUnstableOffset()
+ producerStateManager.takeSnapshot()
}
}
+ private def loadProducerState(lastOffset: Long, reloadFromCleanShutdown:
Boolean): Unit = lock synchronized {
+ rebuildProducerState(lastOffset, reloadFromCleanShutdown,
producerStateManager)
+ updateFirstUnstableOffset()
+ }
+
private def loadProducersFromLog(producerStateManager: ProducerStateManager,
records: Records): Unit = {
val loadedProducers = mutable.Map.empty[Long, ProducerAppendInfo]
val completedTxns = ListBuffer.empty[CompletedTxn]
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala
b/core/src/main/scala/kafka/log/OffsetIndex.scala
index 2babd007a5a..6f246eedf1f 100755
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -188,9 +188,9 @@ class OffsetIndex(_file: File, baseOffset: Long,
maxIndexSize: Int = -1, writabl
}
override def sanityCheck() {
- if (_entries != 0 && _lastOffset <= baseOffset)
+ if (_entries != 0 && _lastOffset < baseOffset)
throw new CorruptIndexException(s"Corrupt index found, index file
(${file.getAbsolutePath}) has non-zero size " +
- s"but the last offset is ${_lastOffset} which is no greater than the
base offset $baseOffset.")
+ s"but the last offset is ${_lastOffset} which is less than the base
offset $baseOffset.")
if (length % entrySize != 0)
throw new CorruptIndexException(s"Index file ${file.getAbsolutePath} is
corrupt, found $length bytes which is " +
s"neither positive nor a multiple of $entrySize.")
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala
b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 3b5b2fa7875..9a9bc613585 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer
import java.nio.file.{Files, Paths}
import java.util.Properties
+import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0}
import kafka.common.{OffsetsOutOfOrderException,
UnexpectedAppendOffsetException}
import kafka.log.Log.DeleteDirSuffix
import kafka.server.epoch.{EpochEntry, LeaderEpochCache, LeaderEpochFileCache}
@@ -206,8 +207,6 @@ class LogTest {
// Reload after unclean shutdown with recoveryPoint set to log end offset
log = createLog(logDir, logConfig, recoveryPoint = logEndOffset)
- // Note that we don't maintain the guarantee of having a snapshot for the
2 most recent segments in this case
- expectedSnapshotOffsets = Vector(log.logSegments.last.baseOffset,
log.logEndOffset)
assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets)
log.close()
@@ -215,15 +214,24 @@ class LogTest {
// Reload after unclean shutdown with recoveryPoint set to 0
log = createLog(logDir, logConfig, recoveryPoint = 0L)
- // Is this working as intended?
+ // We progressively create a snapshot for each segment after the recovery
point
expectedSnapshotOffsets = log.logSegments.map(_.baseOffset).tail.toVector
:+ log.logEndOffset
assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets)
log.close()
}
@Test
- def testProducerSnapshotsRecoveryAfterUncleanShutdown(): Unit = {
- val logConfig = LogTest.createLogConfig(segmentBytes = 64 * 10)
+ def testProducerSnapshotsRecoveryAfterUncleanShutdownV1(): Unit = {
+
testProducerSnapshotsRecoveryAfterUncleanShutdown(ApiVersion.minSupportedFor(RecordVersion.V1).version)
+ }
+
+ @Test
+ def testProducerSnapshotsRecoveryAfterUncleanShutdownCurrentMessageFormat():
Unit = {
+
testProducerSnapshotsRecoveryAfterUncleanShutdown(ApiVersion.latestVersion.version)
+ }
+
+ private def
testProducerSnapshotsRecoveryAfterUncleanShutdown(messageFormatVersion:
String): Unit = {
+ val logConfig = LogTest.createLogConfig(segmentBytes = 64 * 10,
messageFormatVersion = messageFormatVersion)
var log = createLog(logDir, logConfig)
assertEquals(None, log.oldestProducerSnapshotOffset)
@@ -247,6 +255,16 @@ class LogTest {
val segmentsWithReads = ArrayBuffer[LogSegment]()
val recoveredSegments = ArrayBuffer[LogSegment]()
+ val expectedSegmentsWithReads = ArrayBuffer[Long]()
+ val expectedSnapshotOffsets = ArrayBuffer[Long]()
+
+ if (logConfig.messageFormatVersion < KAFKA_0_11_0_IV0) {
+ expectedSegmentsWithReads += activeSegmentOffset
+ expectedSnapshotOffsets ++=
log.logSegments.map(_.baseOffset).toVector.takeRight(2) :+ log.logEndOffset
+ } else {
+ expectedSegmentsWithReads ++= segOffsetsBeforeRecovery ++
Seq(activeSegmentOffset)
+ expectedSnapshotOffsets ++=
log.logSegments.map(_.baseOffset).toVector.takeRight(4) :+ log.logEndOffset
+ }
def createLogWithInterceptedReads(recoveryPoint: Long) = {
val maxProducerIdExpirationMs = 60 * 60 * 1000
@@ -283,9 +301,8 @@ class LogTest {
ProducerStateManager.deleteSnapshotsBefore(logDir,
segmentOffsets(segmentOffsets.size - 2))
log = createLogWithInterceptedReads(offsetForRecoveryPointSegment)
// We will reload all segments because the recovery point is behind the
producer snapshot files (pre KAFKA-5829 behaviour)
- assertEquals(segOffsetsBeforeRecovery, segmentsWithReads.map(_.baseOffset)
-- Seq(activeSegmentOffset))
+ assertEquals(expectedSegmentsWithReads,
segmentsWithReads.map(_.baseOffset))
assertEquals(segOffsetsAfterRecovery, recoveredSegments.map(_.baseOffset))
- var expectedSnapshotOffsets = segmentOffsets.takeRight(4) :+
log.logEndOffset
assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets)
log.close()
segmentsWithReads.clear()
@@ -297,13 +314,12 @@ class LogTest {
log = createLogWithInterceptedReads(recoveryPoint = recoveryPoint)
assertEquals(Seq(activeSegmentOffset), segmentsWithReads.map(_.baseOffset))
assertEquals(segOffsetsAfterRecovery, recoveredSegments.map(_.baseOffset))
- expectedSnapshotOffsets =
log.logSegments.map(_.baseOffset).toVector.takeRight(4) :+ log.logEndOffset
assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets)
// Verify that we keep 2 snapshot files if we checkpoint the log end offset
log.deleteSnapshotsAfterRecoveryPointCheckpoint()
- expectedSnapshotOffsets =
log.logSegments.map(_.baseOffset).toVector.takeRight(2) :+ log.logEndOffset
- assertEquals(expectedSnapshotOffsets, listProducerSnapshotOffsets)
+ val expectedSnapshotsAfterDelete =
log.logSegments.map(_.baseOffset).toVector.takeRight(2) :+ log.logEndOffset
+ assertEquals(expectedSnapshotsAfterDelete, listProducerSnapshotOffsets)
log.close()
}
@@ -398,6 +414,9 @@ class LogTest {
// We skip directly to updating the map end offset
stateManager.updateMapEndOffset(1L)
EasyMock.expectLastCall()
+ // Finally, we take a snapshot
+ stateManager.takeSnapshot()
+ EasyMock.expectLastCall().once()
EasyMock.replay(stateManager)
@@ -410,14 +429,18 @@ class LogTest {
def testSkipTruncateAndReloadIfOldMessageFormatAndNoCleanShutdown(): Unit = {
val stateManager = EasyMock.mock(classOf[ProducerStateManager])
- EasyMock.expect(stateManager.latestSnapshotOffset).andReturn(None)
-
stateManager.updateMapEndOffset(0L)
EasyMock.expectLastCall().anyTimes()
stateManager.takeSnapshot()
EasyMock.expectLastCall().anyTimes()
+ EasyMock.expect(stateManager.isEmpty).andReturn(true)
+ EasyMock.expectLastCall().once()
+
+ EasyMock.expect(stateManager.firstUnstableOffset).andReturn(None)
+ EasyMock.expectLastCall().once()
+
EasyMock.replay(stateManager)
val logProps = new Properties()
@@ -443,14 +466,18 @@ class LogTest {
def testSkipTruncateAndReloadIfOldMessageFormatAndCleanShutdown(): Unit = {
val stateManager = EasyMock.mock(classOf[ProducerStateManager])
- EasyMock.expect(stateManager.latestSnapshotOffset).andReturn(None)
-
stateManager.updateMapEndOffset(0L)
EasyMock.expectLastCall().anyTimes()
stateManager.takeSnapshot()
EasyMock.expectLastCall().anyTimes()
+ EasyMock.expect(stateManager.isEmpty).andReturn(true)
+ EasyMock.expectLastCall().once()
+
+ EasyMock.expect(stateManager.firstUnstableOffset).andReturn(None)
+ EasyMock.expectLastCall().once()
+
EasyMock.replay(stateManager)
val cleanShutdownFile = createCleanShutdownFile()
@@ -487,6 +514,12 @@ class LogTest {
stateManager.takeSnapshot()
EasyMock.expectLastCall().anyTimes()
+ EasyMock.expect(stateManager.isEmpty).andReturn(true)
+ EasyMock.expectLastCall().once()
+
+ EasyMock.expect(stateManager.firstUnstableOffset).andReturn(None)
+ EasyMock.expectLastCall().once()
+
EasyMock.replay(stateManager)
val cleanShutdownFile = createCleanShutdownFile()
@@ -644,8 +677,12 @@ class LogTest {
assertEquals(2, log.latestProducerStateEndOffset)
log.truncateTo(1)
- assertEquals(None, log.latestProducerSnapshotOffset)
+ assertEquals(Some(1), log.latestProducerSnapshotOffset)
assertEquals(1, log.latestProducerStateEndOffset)
+
+ log.truncateTo(0)
+ assertEquals(None, log.latestProducerSnapshotOffset)
+ assertEquals(0, log.latestProducerStateEndOffset)
}
@Test
diff --git a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
index 1529597cf41..f47da995dd5 100644
--- a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
+++ b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
@@ -178,6 +178,15 @@ class OffsetIndexTest extends JUnitSuite {
// mmap should be null after unmap causing lookup to throw a NPE
intercept[NullPointerException](idx.lookup(1))
}
+
+ @Test
+ def testSanityLastOffsetEqualToBaseOffset(): Unit = {
+ // Test index sanity for the case where the last offset appended to the
index is equal to the base offset
+ val baseOffset = 20L
+ val idx = new OffsetIndex(nonExistentTempFile(), baseOffset = baseOffset,
maxIndexSize = 10 * 8)
+ idx.append(baseOffset, 0)
+ idx.sanityCheck()
+ }
def assertWriteFails[T](message: String, idx: OffsetIndex, offset: Int,
klass: Class[T]) {
try {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Broker startup could be inefficient when using old message format
> -----------------------------------------------------------------
>
> Key: KAFKA-7076
> URL: https://issues.apache.org/jira/browse/KAFKA-7076
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 0.11.0.0, 0.11.0.1, 1.0.0, 0.11.0.2, 1.1.0, 1.0.1
> Reporter: Dhruvil Shah
> Assignee: Dhruvil Shah
> Priority: Major
>
> During broker startup, we call `Log#recoverSegment` when we find corrupted
> indexes, for segments beyond the last check-pointed recovery point, and for
> any ".swap" segments created by log cleaner. One of the things
> `Log#recoverSegment` does is to build up the producer state, starting from
> any previous snapshot file that is available. For logs using message formats
> older than V2, we could skip building up this producer state, which would
> essentially speed up recovery.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)