This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 1a5971ebdc77f976627da8af30a9dfab596dfcb1
Author: Bob Barrett <bob.barr...@confluent.io>
AuthorDate: Tue Oct 8 14:06:36 2019 -0700

    KAFKA-7190; Retain producer state until transactionalIdExpiration time passes (#7388)
    
    As described in KIP-360, this patch changes producer state retention
    so that producer state remains cached even after it is removed from
    the log. Producer state is now removed only when the transactional id
    expiration time has passed. This is intended to reduce the incidence
    of UNKNOWN_PRODUCER_ID errors for producers when records are deleted
    or when a topic has a short retention time. Tested with unit tests.
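
    For illustration, a minimal sketch of the retention rule this patch
    leaves in place (a condensed sketch only: `ProducerEntry`, `isExpired`,
    and `expirationMs` are stand-in names for illustration, not the exact
    ProducerStateManager API):

        // Stand-in for the cached per-producer state (illustration only).
        case class ProducerEntry(lastTimestamp: Long, hasOngoingTxn: Boolean)

        // After this patch, producer state is evicted on time alone: deleting
        // records or advancing the log start offset no longer removes it.
        // Entries with an ongoing transaction are kept regardless of age.
        def isExpired(nowMs: Long, entry: ProducerEntry, expirationMs: Long): Boolean =
          !entry.hasOngoingTxn && nowMs - entry.lastTimestamp >= expirationMs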
    
    Reviewers: Jason Gustafson <ja...@confluent.io>, Guozhang Wang <wangg...@gmail.com>
---
 core/src/main/scala/kafka/log/Log.scala            |  1 -
 .../scala/kafka/log/ProducerStateManager.scala     | 58 +++-----------------
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 13 +++--
 .../unit/kafka/log/ProducerStateManagerTest.scala  | 63 ++++------------------
 4 files changed, 27 insertions(+), 108 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 8068c9b..b0af105 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1241,7 +1241,6 @@ class Log(@volatile var dir: File,
           info(s"Incrementing log start offset to $newLogStartOffset")
           logStartOffset = newLogStartOffset
           leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
-          producerStateManager.truncateHead(logStartOffset)
           maybeIncrementFirstUnstableOffset()
         }
       }
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 995bf85..ae5b77a 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -113,7 +113,7 @@ private[log] class ProducerStateEntry(val producerId: Long,
 
   def lastDataOffset: Long = if (isEmpty) -1L else batchMetadata.last.lastOffset
 
-  def lastTimestamp = if (isEmpty) RecordBatch.NO_TIMESTAMP else batchMetadata.last.timestamp
+  def lastTimestamp: Long = if (isEmpty) RecordBatch.NO_TIMESTAMP else batchMetadata.last.timestamp
 
   def lastOffsetDelta : Int = if (isEmpty) 0 else batchMetadata.last.offsetDelta
 
@@ -148,8 +148,6 @@ private[log] class ProducerStateEntry(val producerId: Long,
     this.currentTxnFirstOffset = nextEntry.currentTxnFirstOffset
   }
 
-  def removeBatchesOlderThan(offset: Long): Unit = batchMetadata.dropWhile(_.lastOffset < offset)
-
   def findDuplicateBatch(batch: RecordBatch): Option[BatchMetadata] = {
     if (batch.producerEpoch != producerEpoch)
        None
@@ -542,7 +540,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
   /**
    * Returns the last offset of this map
    */
-  def mapEndOffset = lastMapOffset
+  def mapEndOffset: Long = lastMapOffset
 
   /**
    * Get a copy of the active producers
@@ -557,9 +555,7 @@ class ProducerStateManager(val topicPartition: TopicPartition,
         case Some(file) =>
           try {
             info(s"Loading producer state from snapshot file '$file'")
-            val loadedProducers = readSnapshot(file).filter { producerEntry =>
-              isProducerRetained(producerEntry, logStartOffset) && !isProducerExpired(currentTime, producerEntry)
-            }
+            val loadedProducers = readSnapshot(file).filter { producerEntry => !isProducerExpired(currentTime, producerEntry) }
             loadedProducers.foreach(loadProducerEntry)
             lastSnapOffset = offsetFromFile(file)
             lastMapOffset = lastSnapOffset
@@ -600,8 +596,10 @@ class ProducerStateManager(val topicPartition: TopicPartition,
 
   /**
    * Truncate the producer id mapping to the given offset range and reload the entries from the most recent
-   * snapshot in range (if there is one). Note that the log end offset is assumed to be less than
-   * or equal to the high watermark.
+   * snapshot in range (if there is one). We delete snapshot files prior to the logStartOffset but do not remove
+   * producer state from the map. This means that in-memory and on-disk state can diverge, and in the case of
+   * broker failover or unclean shutdown, any in-memory state not persisted in the snapshots will be lost.
+   * Note that the log end offset is assumed to be less than or equal to the high watermark.
    */
   def truncateAndReload(logStartOffset: Long, logEndOffset: Long, currentTimeMs: Long): Unit = {
     // remove all out of range snapshots
@@ -617,8 +615,6 @@ class ProducerStateManager(val topicPartition: TopicPartition,
       // safe to clear the unreplicated transactions
       unreplicatedTxns.clear()
       loadFromSnapshot(logStartOffset, currentTimeMs)
-    } else {
-      truncateHead(logStartOffset)
     }
   }
 
@@ -692,46 +688,6 @@ class ProducerStateManager(val topicPartition: TopicPartition,
    */
   def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(file => offsetFromFile(file))
 
-  private def isProducerRetained(producerStateEntry: ProducerStateEntry, logStartOffset: Long): Boolean = {
-    producerStateEntry.removeBatchesOlderThan(logStartOffset)
-    producerStateEntry.lastDataOffset >= logStartOffset
-  }
-
-  /**
-   * When we remove the head of the log due to retention, we need to clean up the id map. This method takes
-   * the new start offset and removes all producerIds which have a smaller last written offset. Additionally,
-   * we remove snapshots older than the new log start offset.
-   *
-   * Note that snapshots from offsets greater than the log start offset may have producers included which
-   * should no longer be retained: these producers will be removed if and when we need to load state from
-   * the snapshot.
-   */
-  def truncateHead(logStartOffset: Long): Unit = {
-    val evictedProducerEntries = producers.filter { case (_, producerState) =>
-      !isProducerRetained(producerState, logStartOffset)
-    }
-    val evictedProducerIds = evictedProducerEntries.keySet
-
-    producers --= evictedProducerIds
-    removeEvictedOngoingTransactions(evictedProducerIds)
-    removeUnreplicatedTransactions(logStartOffset)
-
-    if (lastMapOffset < logStartOffset)
-      lastMapOffset = logStartOffset
-
-    deleteSnapshotsBefore(logStartOffset)
-    lastSnapOffset = latestSnapshotOffset.getOrElse(logStartOffset)
-  }
-
-  private def removeEvictedOngoingTransactions(expiredProducerIds: collection.Set[Long]): Unit = {
-    val iterator = ongoingTxns.entrySet.iterator
-    while (iterator.hasNext) {
-      val txnEntry = iterator.next()
-      if (expiredProducerIds.contains(txnEntry.getValue.producerId))
-        iterator.remove()
-    }
-  }
-
   private def removeUnreplicatedTransactions(offset: Long): Unit = {
     val iterator = unreplicatedTxns.entrySet.iterator
     while (iterator.hasNext) {
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index c446563..29b564e 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -1070,15 +1070,17 @@ class LogTest {
     log.updateHighWatermark(log.logEndOffset)
     log.maybeIncrementLogStartOffset(1L)
 
-    assertEquals(1, log.activeProducersWithLastSequence.size)
+    // Deleting records should not remove producer state
+    assertEquals(2, log.activeProducersWithLastSequence.size)
     val retainedLastSeqOpt = log.activeProducersWithLastSequence.get(pid2)
     assertTrue(retainedLastSeqOpt.isDefined)
     assertEquals(0, retainedLastSeqOpt.get)
 
     log.close()
 
+    // Because the log start offset did not advance, producer snapshots will still be present and the state will be rebuilt
     val reloadedLog = createLog(logDir, logConfig, logStartOffset = 1L)
-    assertEquals(1, reloadedLog.activeProducersWithLastSequence.size)
+    assertEquals(2, reloadedLog.activeProducersWithLastSequence.size)
     val reloadedLastSeqOpt = log.activeProducersWithLastSequence.get(pid2)
     assertEquals(retainedLastSeqOpt, reloadedLastSeqOpt)
   }
@@ -1104,14 +1106,16 @@ class LogTest {
     log.maybeIncrementLogStartOffset(1L)
     log.deleteOldSegments()
 
+    // Deleting records should not remove producer state
     assertEquals(1, log.logSegments.size)
-    assertEquals(1, log.activeProducersWithLastSequence.size)
+    assertEquals(2, log.activeProducersWithLastSequence.size)
     val retainedLastSeqOpt = log.activeProducersWithLastSequence.get(pid2)
     assertTrue(retainedLastSeqOpt.isDefined)
     assertEquals(0, retainedLastSeqOpt.get)
 
     log.close()
 
+    // After reloading the log, producer state should not be regenerated
     val reloadedLog = createLog(logDir, logConfig, logStartOffset = 1L)
     assertEquals(1, reloadedLog.activeProducersWithLastSequence.size)
     val reloadedEntryOpt = log.activeProducersWithLastSequence.get(pid2)
@@ -1162,8 +1166,9 @@ class LogTest {
     log.updateHighWatermark(log.logEndOffset)
     log.deleteOldSegments()
 
+    // Producer state should not be removed when deleting log segment
     assertEquals(2, log.logSegments.size)
-    assertEquals(Set(pid2), log.activeProducersWithLastSequence.keySet)
+    assertEquals(Set(pid1, pid2), log.activeProducersWithLastSequence.keySet)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index e98e59e..8500c94 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -561,52 +561,7 @@ class ProducerStateManagerTest {
   }
 
   @Test
-  def testFirstUnstableOffsetAfterEviction(): Unit = {
-    val epoch = 0.toShort
-    val sequence = 0
-    append(stateManager, producerId, epoch, sequence, offset = 99, isTransactional = true)
-    assertEquals(Some(99), stateManager.firstUnstableOffset.map(_.messageOffset))
-    append(stateManager, 2L, epoch, 0, offset = 106, isTransactional = true)
-    stateManager.truncateHead(100)
-    assertEquals(Some(106), stateManager.firstUnstableOffset.map(_.messageOffset))
-  }
-
-  @Test
-  def testTruncateHead(): Unit = {
-    val epoch = 0.toShort
-
-    append(stateManager, producerId, epoch, 0, 0L)
-    append(stateManager, producerId, epoch, 1, 1L)
-    stateManager.takeSnapshot()
-
-    val anotherPid = 2L
-    append(stateManager, anotherPid, epoch, 0, 2L)
-    append(stateManager, anotherPid, epoch, 1, 3L)
-    stateManager.takeSnapshot()
-    assertEquals(Set(2, 4), currentSnapshotOffsets)
-
-    stateManager.truncateHead(2)
-    assertEquals(Set(2, 4), currentSnapshotOffsets)
-    assertEquals(Set(anotherPid), stateManager.activeProducers.keySet)
-    assertEquals(None, stateManager.lastEntry(producerId))
-
-    val maybeEntry = stateManager.lastEntry(anotherPid)
-    assertTrue(maybeEntry.isDefined)
-    assertEquals(3L, maybeEntry.get.lastDataOffset)
-
-    stateManager.truncateHead(3)
-    assertEquals(Set(anotherPid), stateManager.activeProducers.keySet)
-    assertEquals(Set(4), currentSnapshotOffsets)
-    assertEquals(4, stateManager.mapEndOffset)
-
-    stateManager.truncateHead(5)
-    assertEquals(Set(), stateManager.activeProducers.keySet)
-    assertEquals(Set(), currentSnapshotOffsets)
-    assertEquals(5, stateManager.mapEndOffset)
-  }
-
-  @Test
-  def testLoadFromSnapshotRemovesNonRetainedProducers(): Unit = {
+  def testLoadFromSnapshotRetainsNonExpiredProducers(): Unit = {
     val epoch = 0.toShort
     val pid1 = 1L
     val pid2 = 2L
@@ -617,13 +572,17 @@ class ProducerStateManagerTest {
     assertEquals(2, stateManager.activeProducers.size)
 
     stateManager.truncateAndReload(1L, 2L, time.milliseconds())
-    assertEquals(1, stateManager.activeProducers.size)
-    assertEquals(None, stateManager.lastEntry(pid1))
+    assertEquals(2, stateManager.activeProducers.size)
+
+    val entry1 = stateManager.lastEntry(pid1)
+    assertTrue(entry1.isDefined)
+    assertEquals(0, entry1.get.lastSeq)
+    assertEquals(0L, entry1.get.lastDataOffset)
 
-    val entry = stateManager.lastEntry(pid2)
-    assertTrue(entry.isDefined)
-    assertEquals(0, entry.get.lastSeq)
-    assertEquals(1L, entry.get.lastDataOffset)
+    val entry2 = stateManager.lastEntry(pid2)
+    assertTrue(entry2.isDefined)
+    assertEquals(0, entry2.get.lastSeq)
+    assertEquals(1L, entry2.get.lastDataOffset)
   }
 
   @Test
