This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 2.4 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 1a5971ebdc77f976627da8af30a9dfab596dfcb1 Author: Bob Barrett <bob.barr...@confluent.io> AuthorDate: Tue Oct 8 14:06:36 2019 -0700 KAFKA-7190; Retain producer state until transactionalIdExpiration time passes (#7388) As described in KIP-360, this patch changes producer state retention so that producer state remains cached even after it is removed from the log. Producer state will only be removed now when the transactional id expiration time has passed. This is intended to reduce the incidence of UNKNOWN_PRODUCER_ID errors for producers when records are deleted or when a topic has a short retention time. Tested with unit tests. Reviewers: Jason Gustafson <ja...@confluent.io>, Guozhang Wang <wangg...@gmail.com> --- core/src/main/scala/kafka/log/Log.scala | 1 - .../scala/kafka/log/ProducerStateManager.scala | 58 +++----------------- core/src/test/scala/unit/kafka/log/LogTest.scala | 13 +++-- .../unit/kafka/log/ProducerStateManagerTest.scala | 63 ++++------------------ 4 files changed, 27 insertions(+), 108 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 8068c9b..b0af105 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -1241,7 +1241,6 @@ class Log(@volatile var dir: File, info(s"Incrementing log start offset to $newLogStartOffset") logStartOffset = newLogStartOffset leaderEpochCache.foreach(_.truncateFromStart(logStartOffset)) - producerStateManager.truncateHead(logStartOffset) maybeIncrementFirstUnstableOffset() } } diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala index 995bf85..ae5b77a 100644 --- a/core/src/main/scala/kafka/log/ProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala @@ -113,7 +113,7 @@ private[log] class ProducerStateEntry(val producerId: Long, def lastDataOffset: Long = if (isEmpty) -1L else batchMetadata.last.lastOffset - def lastTimestamp = if (isEmpty) RecordBatch.NO_TIMESTAMP else batchMetadata.last.timestamp + def lastTimestamp: Long = if (isEmpty) RecordBatch.NO_TIMESTAMP else batchMetadata.last.timestamp def lastOffsetDelta : Int = if (isEmpty) 0 else batchMetadata.last.offsetDelta @@ -148,8 +148,6 @@ private[log] class ProducerStateEntry(val producerId: Long, this.currentTxnFirstOffset = nextEntry.currentTxnFirstOffset } - def removeBatchesOlderThan(offset: Long): Unit = batchMetadata.dropWhile(_.lastOffset < offset) - def findDuplicateBatch(batch: RecordBatch): Option[BatchMetadata] = { if (batch.producerEpoch != producerEpoch) None @@ -542,7 +540,7 @@ class ProducerStateManager(val topicPartition: TopicPartition, /** * Returns the last offset of this map */ - def mapEndOffset = lastMapOffset + def mapEndOffset: Long = lastMapOffset /** * Get a copy of the active producers @@ -557,9 +555,7 @@ class ProducerStateManager(val topicPartition: TopicPartition, case Some(file) => try { info(s"Loading producer state from snapshot file '$file'") - val loadedProducers = readSnapshot(file).filter { producerEntry => - isProducerRetained(producerEntry, logStartOffset) && !isProducerExpired(currentTime, producerEntry) - } + val loadedProducers = readSnapshot(file).filter { producerEntry => !isProducerExpired(currentTime, producerEntry) } loadedProducers.foreach(loadProducerEntry) lastSnapOffset = offsetFromFile(file) lastMapOffset = lastSnapOffset @@ -600,8 +596,10 @@ class ProducerStateManager(val topicPartition: TopicPartition, /** * Truncate the producer id mapping to the given offset range and reload the entries from the most recent - * snapshot in range (if there is one). Note that the log end offset is assumed to be less than - * or equal to the high watermark. + * snapshot in range (if there is one). We delete snapshot files prior to the logStartOffset but do not remove + * producer state from the map. This means that in-memory and on-disk state can diverge, and in the case of + * broker failover or unclean shutdown, any in-memory state not persisted in the snapshots will be lost. + * Note that the log end offset is assumed to be less than or equal to the high watermark. */ def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long): Unit = { // remove all out of range snapshots @@ -617,8 +615,6 @@ class ProducerStateManager(val topicPartition: TopicPartition, // safe to clear the unreplicated transactions unreplicatedTxns.clear() loadFromSnapshot(logStartOffset, currentTimeMs) - } else { - truncateHead(logStartOffset) } } @@ -692,46 +688,6 @@ class ProducerStateManager(val topicPartition: TopicPartition, */ def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(file => offsetFromFile(file)) - private def isProducerRetained(producerStateEntry: ProducerStateEntry, logStartOffset: Long): Boolean = { - producerStateEntry.removeBatchesOlderThan(logStartOffset) - producerStateEntry.lastDataOffset >= logStartOffset - } - - /** - * When we remove the head of the log due to retention, we need to clean up the id map. This method takes - * the new start offset and removes all producerIds which have a smaller last written offset. Additionally, - * we remove snapshots older than the new log start offset. - * - * Note that snapshots from offsets greater than the log start offset may have producers included which - * should no longer be retained: these producers will be removed if and when we need to load state from - * the snapshot. - */ - def truncateHead(logStartOffset: Long): Unit = { - val evictedProducerEntries = producers.filter { case (_, producerState) => - !isProducerRetained(producerState, logStartOffset) - } - val evictedProducerIds = evictedProducerEntries.keySet - - producers --= evictedProducerIds - removeEvictedOngoingTransactions(evictedProducerIds) - removeUnreplicatedTransactions(logStartOffset) - - if (lastMapOffset < logStartOffset) - lastMapOffset = logStartOffset - - deleteSnapshotsBefore(logStartOffset) - lastSnapOffset = latestSnapshotOffset.getOrElse(logStartOffset) - } - - private def removeEvictedOngoingTransactions(expiredProducerIds: collection.Set[Long]): Unit = { - val iterator = ongoingTxns.entrySet.iterator - while (iterator.hasNext) { - val txnEntry = iterator.next() - if (expiredProducerIds.contains(txnEntry.getValue.producerId)) - iterator.remove() - } - } - private def removeUnreplicatedTransactions(offset: Long): Unit = { val iterator = unreplicatedTxns.entrySet.iterator while (iterator.hasNext) { diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index c446563..29b564e 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -1070,15 +1070,17 @@ class LogTest { log.updateHighWatermark(log.logEndOffset) log.maybeIncrementLogStartOffset(1L) - assertEquals(1, log.activeProducersWithLastSequence.size) + // Deleting records should not remove producer state + assertEquals(2, log.activeProducersWithLastSequence.size) val retainedLastSeqOpt = log.activeProducersWithLastSequence.get(pid2) assertTrue(retainedLastSeqOpt.isDefined) assertEquals(0, retainedLastSeqOpt.get) log.close() + // Because the log start offset did not advance, producer snapshots will still be present and the state will be rebuilt val reloadedLog = createLog(logDir, logConfig, logStartOffset = 1L) - assertEquals(1, reloadedLog.activeProducersWithLastSequence.size) + assertEquals(2, reloadedLog.activeProducersWithLastSequence.size) val reloadedLastSeqOpt = log.activeProducersWithLastSequence.get(pid2) assertEquals(retainedLastSeqOpt, reloadedLastSeqOpt) } @@ -1104,14 +1106,16 @@ class LogTest { log.maybeIncrementLogStartOffset(1L) log.deleteOldSegments() + // Deleting records should not remove producer state assertEquals(1, log.logSegments.size) - assertEquals(1, log.activeProducersWithLastSequence.size) + assertEquals(2, log.activeProducersWithLastSequence.size) val retainedLastSeqOpt = log.activeProducersWithLastSequence.get(pid2) assertTrue(retainedLastSeqOpt.isDefined) assertEquals(0, retainedLastSeqOpt.get) log.close() + // After reloading log, producer state should not be regenerated val reloadedLog = createLog(logDir, logConfig, logStartOffset = 1L) assertEquals(1, reloadedLog.activeProducersWithLastSequence.size) val reloadedEntryOpt = log.activeProducersWithLastSequence.get(pid2) @@ -1162,8 +1166,9 @@ class LogTest { log.updateHighWatermark(log.logEndOffset) log.deleteOldSegments() + // Producer state should not be removed when deleting log segment assertEquals(2, log.logSegments.size) - assertEquals(Set(pid2), log.activeProducersWithLastSequence.keySet) + assertEquals(Set(pid1, pid2), log.activeProducersWithLastSequence.keySet) } @Test diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala index e98e59e..8500c94 100644 --- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala @@ -561,52 +561,7 @@ class ProducerStateManagerTest { } @Test - def testFirstUnstableOffsetAfterEviction(): Unit = { - val epoch = 0.toShort - val sequence = 0 - append(stateManager, producerId, epoch, sequence, offset = 99, isTransactional = true) - assertEquals(Some(99), stateManager.firstUnstableOffset.map(_.messageOffset)) - append(stateManager, 2L, epoch, 0, offset = 106, isTransactional = true) - stateManager.truncateHead(100) - assertEquals(Some(106), stateManager.firstUnstableOffset.map(_.messageOffset)) - } - - @Test - def testTruncateHead(): Unit = { - val epoch = 0.toShort - - append(stateManager, producerId, epoch, 0, 0L) - append(stateManager, producerId, epoch, 1, 1L) - stateManager.takeSnapshot() - - val anotherPid = 2L - append(stateManager, anotherPid, epoch, 0, 2L) - append(stateManager, anotherPid, epoch, 1, 3L) - stateManager.takeSnapshot() - assertEquals(Set(2, 4), currentSnapshotOffsets) - - stateManager.truncateHead(2) - assertEquals(Set(2, 4), currentSnapshotOffsets) - assertEquals(Set(anotherPid), stateManager.activeProducers.keySet) - assertEquals(None, stateManager.lastEntry(producerId)) - - val maybeEntry = stateManager.lastEntry(anotherPid) - assertTrue(maybeEntry.isDefined) - assertEquals(3L, maybeEntry.get.lastDataOffset) - - stateManager.truncateHead(3) - assertEquals(Set(anotherPid), stateManager.activeProducers.keySet) - assertEquals(Set(4), currentSnapshotOffsets) - assertEquals(4, stateManager.mapEndOffset) - - stateManager.truncateHead(5) - assertEquals(Set(), stateManager.activeProducers.keySet) - assertEquals(Set(), currentSnapshotOffsets) - assertEquals(5, stateManager.mapEndOffset) - } - - @Test - def testLoadFromSnapshotRemovesNonRetainedProducers(): Unit = { + def testLoadFromSnapshotRetainsNonExpiredProducers(): Unit = { val epoch = 0.toShort val pid1 = 1L val pid2 = 2L @@ -617,13 +572,17 @@ class ProducerStateManagerTest { assertEquals(2, stateManager.activeProducers.size) stateManager.truncateAndReload(1L, 2L, time.milliseconds()) - assertEquals(1, stateManager.activeProducers.size) - assertEquals(None, stateManager.lastEntry(pid1)) + assertEquals(2, stateManager.activeProducers.size) + + val entry1 = stateManager.lastEntry(pid1) + assertTrue(entry1.isDefined) + assertEquals(0, entry1.get.lastSeq) + assertEquals(0L, entry1.get.lastDataOffset) - val entry = stateManager.lastEntry(pid2) - assertTrue(entry.isDefined) - assertEquals(0, entry.get.lastSeq) - assertEquals(1L, entry.get.lastDataOffset) + val entry2 = stateManager.lastEntry(pid2) + assertTrue(entry2.isDefined) + assertEquals(0, entry2.get.lastSeq) + assertEquals(1L, entry2.get.lastDataOffset) } @Test