dajac commented on a change in pull request #11098:
URL: https://github.com/apache/kafka/pull/11098#discussion_r673818846



##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##########
@@ -140,79 +141,154 @@ class TransactionStateManager(brokerId: Int,
     }
   }
 
-  def enableTransactionalIdExpiration(): Unit = {
-    scheduler.schedule("transactionalId-expiration", () => {
-      val now = time.milliseconds()
-      inReadLock(stateLock) {
-        val transactionalIdByPartition: Map[Int, mutable.Iterable[TransactionalIdCoordinatorEpochAndMetadata]] =
-          transactionMetadataCache.flatMap { case (_, entry) =>
-            entry.metadataPerTransactionalId.filter { case (_, txnMetadata) => txnMetadata.state match {
-              case Empty | CompleteCommit | CompleteAbort => true
-              case _ => false
-            }
-            }.filter { case (_, txnMetadata) =>
-              txnMetadata.txnLastUpdateTimestamp <= now - config.transactionalIdExpirationMs
-            }.map { case (transactionalId, txnMetadata) =>
-              val txnMetadataTransition = txnMetadata.inLock {
-                txnMetadata.prepareDead()
+  private def collectExpiredTransactionalIds(
+    partitionId: Int,
+    partitionCacheEntry: TxnMetadataCacheEntry
+  ): (Iterable[TransactionalIdCoordinatorEpochAndMetadata], MemoryRecords) = {
+    val currentTimeMs = time.milliseconds()
+
+    inReadLock(stateLock) {
+      val transactionPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
+      replicaManager.getLogConfig(transactionPartition) match {
+        case Some(logConfig) =>
+          val maxBatchSize = logConfig.maxMessageSize
+          val expired = mutable.ListBuffer.empty[TransactionalIdCoordinatorEpochAndMetadata]
+
+          lazy val recordsBuilder = MemoryRecords.builder(
+            ByteBuffer.allocate(math.min(16384, maxBatchSize)),

Review comment:
       For my own education, why do we cap the initial buffer allocation at `16384` here?
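
A hedged note on the likely intent (my reading, not confirmed in this thread): `math.min(16384, maxBatchSize)` only bounds the *initial* allocation, while `maxBatchSize` bounds the finished batch, since the builder's underlying buffer grows on demand. A minimal sketch of the sizing arithmetic, with hypothetical values:

```scala
// Hypothetical values for illustration only.
val maxBatchSize = 1048576                          // e.g. max.message.bytes = 1 MiB
val initialCapacity = math.min(16384, maxBatchSize) // allocate at most 16 KiB upfront
assert(initialCapacity == 16384)                    // large limits still start small

val smallMaxBatchSize = 512
assert(math.min(16384, smallMaxBatchSize) == 512)   // never over-allocate past the limit
```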

##########
File path: core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
##########
@@ -629,6 +631,62 @@ class TransactionStateManagerTest {
     verifyMetadataDoesExistAndIsUsable(transactionalId2)
   }
 
+  @Test
+  def testTransactionExpirationShouldRespectBatchSize(): Unit = {
+    val partitionIds = 0 until numPartitions
+    val maxBatchSize = 512
+
+    loadTransactionsForPartitions(partitionIds)
+
+    val allTransactionalIds = mutable.Set.empty[String]
+    for (i <- 0 to 1000) {
+      val txnlId = s"id_$i"
+      val producerId = i
+      val txnMetadata = transactionMetadata(txnlId, producerId)
+      txnMetadata.txnLastUpdateTimestamp = time.milliseconds() - txnConfig.transactionalIdExpirationMs
+      transactionManager.putTransactionStateIfNotExists(txnMetadata)
+      allTransactionalIds += txnlId
+    }
+
+    def removeExpiredTransactionalIds(): Map[TopicPartition, MemoryRecords] = {
+      EasyMock.reset(replicaManager)
+      expectLogConfig(partitionIds, maxBatchSize)
+
+      val appendedRecordsCapture = expectTransactionalIdExpiration(Errors.NONE)
+      EasyMock.replay(replicaManager)
+
+      transactionManager.removeExpiredTransactionalIds()
+      EasyMock.verify(replicaManager)
+
+      assertTrue(appendedRecordsCapture.hasCaptured)
+      appendedRecordsCapture.getValue
+    }
+
+    def hasUnexpiredTransactionalIds: Boolean = {
+      val unexpiredTransactions = transactionManager.listTransactionStates(Set.empty, Set.empty)
+        .transactionStates.asScala
+      assertTrue(unexpiredTransactions.forall(txn => txn.transactionState == Empty.name))
+      unexpiredTransactions.nonEmpty
+    }
+
+    var iterations = 0
+    val expiredTransactionalIds = mutable.Set.empty[String]
+    while (hasUnexpiredTransactionalIds) {
+      removeExpiredTransactionalIds().forKeyValue { (_, records) =>
+        assertTrue(records.sizeInBytes() < maxBatchSize)

Review comment:
       nit: We might be able to drop the `()` of `sizeInBytes` and `records` below.
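
For context, this is the usual Scala convention rather than anything PR-specific: parameterless Java accessors with no side effects may be called without the empty parens. A sketch of the assertion above rewritten that way:

```scala
// Same assertion, with the empty parens dropped per Scala accessor style.
assertTrue(records.sizeInBytes < maxBatchSize)
```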

##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##########
@@ -140,79 +141,154 @@ class TransactionStateManager(brokerId: Int,
     }
   }
 
-  def enableTransactionalIdExpiration(): Unit = {
-    scheduler.schedule("transactionalId-expiration", () => {
-      val now = time.milliseconds()
-      inReadLock(stateLock) {
-        val transactionalIdByPartition: Map[Int, mutable.Iterable[TransactionalIdCoordinatorEpochAndMetadata]] =
-          transactionMetadataCache.flatMap { case (_, entry) =>
-            entry.metadataPerTransactionalId.filter { case (_, txnMetadata) => txnMetadata.state match {
-              case Empty | CompleteCommit | CompleteAbort => true
-              case _ => false
-            }
-            }.filter { case (_, txnMetadata) =>
-              txnMetadata.txnLastUpdateTimestamp <= now - config.transactionalIdExpirationMs
-            }.map { case (transactionalId, txnMetadata) =>
-              val txnMetadataTransition = txnMetadata.inLock {
-                txnMetadata.prepareDead()
+  private def collectExpiredTransactionalIds(
+    partitionId: Int,
+    partitionCacheEntry: TxnMetadataCacheEntry
+  ): (Iterable[TransactionalIdCoordinatorEpochAndMetadata], MemoryRecords) = {
+    val currentTimeMs = time.milliseconds()
+
+    inReadLock(stateLock) {
+      val transactionPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
+      replicaManager.getLogConfig(transactionPartition) match {
+        case Some(logConfig) =>
+          val maxBatchSize = logConfig.maxMessageSize
+          val expired = mutable.ListBuffer.empty[TransactionalIdCoordinatorEpochAndMetadata]
+
+          lazy val recordsBuilder = MemoryRecords.builder(
+            ByteBuffer.allocate(math.min(16384, maxBatchSize)),
+            TransactionLog.EnforcedCompressionType,
+            TimestampType.CREATE_TIME,
+            0L,
+            maxBatchSize
+          )
+
+          partitionCacheEntry.metadataPerTransactionalId.foreachWhile { (transactionalId, txnMetadata) =>
+            txnMetadata.inLock {
+              if (!shouldExpire(txnMetadata, currentTimeMs)) {
+                true
+              } else if (maybeAppendExpiration(txnMetadata, recordsBuilder, currentTimeMs, maxBatchSize)) {
+                val transitMetadata = txnMetadata.prepareDead()
+                expired += TransactionalIdCoordinatorEpochAndMetadata(
+                  transactionalId,
+                  partitionCacheEntry.coordinatorEpoch,
+                  transitMetadata
+                )
+                true
+              } else {
+                // If the batch is full, return false to end the search. Any remaining
+                // transactionalIds eligible for expiration can be picked next time.
+                false
              }
-              TransactionalIdCoordinatorEpochAndMetadata(transactionalId, entry.coordinatorEpoch, txnMetadataTransition)
            }
-          }.groupBy { transactionalIdCoordinatorEpochAndMetadata =>
-            partitionFor(transactionalIdCoordinatorEpochAndMetadata.transactionalId)
          }
 
-        val recordsPerPartition = transactionalIdByPartition
-          .map { case (partition, transactionalIdCoordinatorEpochAndMetadatas) =>
-            val deletes: Array[SimpleRecord] = transactionalIdCoordinatorEpochAndMetadatas.map { entry =>
-              new SimpleRecord(now, TransactionLog.keyToBytes(entry.transactionalId), null)
-            }.toArray
-            val records = MemoryRecords.withRecords(TransactionLog.EnforcedCompressionType, deletes: _*)
-            val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partition)
-            (topicPartition, records)
+          if (expired.isEmpty) {
+            (Seq.empty, MemoryRecords.EMPTY)
+          } else {
+            (expired, recordsBuilder.build())
          }
 
-        def removeFromCacheCallback(responses: collection.Map[TopicPartition, PartitionResponse]): Unit = {
-          responses.forKeyValue { (topicPartition, response) =>
-            inReadLock(stateLock) {
-              val toRemove = transactionalIdByPartition(topicPartition.partition)
-              transactionMetadataCache.get(topicPartition.partition).foreach { txnMetadataCacheEntry =>
-                toRemove.foreach { idCoordinatorEpochAndMetadata =>
-                  val transactionalId = idCoordinatorEpochAndMetadata.transactionalId
-                  val txnMetadata = txnMetadataCacheEntry.metadataPerTransactionalId.get(transactionalId)
-                  txnMetadata.inLock {
-                    if (txnMetadataCacheEntry.coordinatorEpoch == idCoordinatorEpochAndMetadata.coordinatorEpoch
-                      && txnMetadata.pendingState.contains(Dead)
-                      && txnMetadata.producerEpoch == idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch
-                      && response.error == Errors.NONE) {
-                      txnMetadataCacheEntry.metadataPerTransactionalId.remove(transactionalId)
-                    } else {
-                      warn(s"Failed to remove expired transactionalId: $transactionalId" +
-                        s" from cache. Tombstone append error code: ${response.error}," +
-                        s" pendingState: ${txnMetadata.pendingState}, producerEpoch: ${txnMetadata.producerEpoch}," +
-                        s" expected producerEpoch: ${idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch}," +
-                        s" coordinatorEpoch: ${txnMetadataCacheEntry.coordinatorEpoch}, expected coordinatorEpoch: " +
-                        s"${idCoordinatorEpochAndMetadata.coordinatorEpoch}")
-                      txnMetadata.pendingState = None
-                    }
+        case None =>
+          (Seq.empty, MemoryRecords.EMPTY)
+      }
+    }
+  }
+
+  private def shouldExpire(
+    txnMetadata: TransactionMetadata,
+    currentTimeMs: Long
+  ): Boolean = {
+    val isExpirableState = txnMetadata.state match {
+      case Empty | CompleteCommit | CompleteAbort => true
+      case _ => false
+    }
+
+    isExpirableState && txnMetadata.txnLastUpdateTimestamp <= currentTimeMs - config.transactionalIdExpirationMs
+  }
+
+  private def maybeAppendExpiration(
+    txnMetadata: TransactionMetadata,
+    recordsBuilder: MemoryRecordsBuilder,
+    currentTimeMs: Long,
+    maxBatchSize: Int
+  ): Boolean = {
+    val keyBytes = TransactionLog.keyToBytes(txnMetadata.transactionalId)
+    if (recordsBuilder.hasRoomFor(currentTimeMs, keyBytes, null, Record.EMPTY_HEADERS)) {
+      recordsBuilder.append(currentTimeMs, keyBytes, null, Record.EMPTY_HEADERS)
+      true
+    } else {
+      if (recordsBuilder.numRecords == 0) {

Review comment:
       It seems to me that this condition will never be true because `hasRoomFor` returns `true` if `numRecords == 0`, so the first record will always be appended. Were you trying to warn if we can't even append the first record to the batch?
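
For reference, the capacity check in `MemoryRecordsBuilder` behaves roughly like the sketch below (a paraphrase from memory, not the exact implementation); an empty builder always reports room because the underlying buffer can grow on demand:

```scala
// Rough paraphrase of MemoryRecordsBuilder's room check, for discussion only.
def hasRoomForSketch(
  numRecords: Int,          // records already in the builder
  writtenBytes: Int,        // estimated bytes written so far
  estimatedRecordSize: Int, // estimated size of the candidate record
  writeLimit: Int           // e.g. maxBatchSize
): Boolean = {
  if (numRecords == 0) true                              // always admit the first record
  else writtenBytes + estimatedRecordSize <= writeLimit  // afterwards, respect the limit
}
```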

##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##########
@@ -140,79 +141,154 @@ class TransactionStateManager(brokerId: Int,
     }
   }
 
-  def enableTransactionalIdExpiration(): Unit = {
-    scheduler.schedule("transactionalId-expiration", () => {
-      val now = time.milliseconds()
-      inReadLock(stateLock) {
-        val transactionalIdByPartition: Map[Int, mutable.Iterable[TransactionalIdCoordinatorEpochAndMetadata]] =
-          transactionMetadataCache.flatMap { case (_, entry) =>
-            entry.metadataPerTransactionalId.filter { case (_, txnMetadata) => txnMetadata.state match {
-              case Empty | CompleteCommit | CompleteAbort => true
-              case _ => false
-            }
-            }.filter { case (_, txnMetadata) =>
-              txnMetadata.txnLastUpdateTimestamp <= now - config.transactionalIdExpirationMs
-            }.map { case (transactionalId, txnMetadata) =>
-              val txnMetadataTransition = txnMetadata.inLock {
-                txnMetadata.prepareDead()
+  private def collectExpiredTransactionalIds(
+    partitionId: Int,
+    partitionCacheEntry: TxnMetadataCacheEntry
+  ): (Iterable[TransactionalIdCoordinatorEpochAndMetadata], MemoryRecords) = {
+    val currentTimeMs = time.milliseconds()
+
+    inReadLock(stateLock) {
+      val transactionPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
+      replicaManager.getLogConfig(transactionPartition) match {
+        case Some(logConfig) =>
+          val maxBatchSize = logConfig.maxMessageSize
+          val expired = mutable.ListBuffer.empty[TransactionalIdCoordinatorEpochAndMetadata]
+
+          lazy val recordsBuilder = MemoryRecords.builder(
+            ByteBuffer.allocate(math.min(16384, maxBatchSize)),
+            TransactionLog.EnforcedCompressionType,
+            TimestampType.CREATE_TIME,
+            0L,
+            maxBatchSize
+          )
+
+          partitionCacheEntry.metadataPerTransactionalId.foreachWhile { (transactionalId, txnMetadata) =>
+            txnMetadata.inLock {
+              if (!shouldExpire(txnMetadata, currentTimeMs)) {
+                true
+              } else if (maybeAppendExpiration(txnMetadata, recordsBuilder, currentTimeMs, maxBatchSize)) {
+                val transitMetadata = txnMetadata.prepareDead()
+                expired += TransactionalIdCoordinatorEpochAndMetadata(
+                  transactionalId,
+                  partitionCacheEntry.coordinatorEpoch,
+                  transitMetadata
+                )
+                true
+              } else {
+                // If the batch is full, return false to end the search. Any remaining
+                // transactionalIds eligible for expiration can be picked next time.
+                false
              }
-              TransactionalIdCoordinatorEpochAndMetadata(transactionalId, entry.coordinatorEpoch, txnMetadataTransition)
            }
-          }.groupBy { transactionalIdCoordinatorEpochAndMetadata =>
-            partitionFor(transactionalIdCoordinatorEpochAndMetadata.transactionalId)
          }
 
-        val recordsPerPartition = transactionalIdByPartition
-          .map { case (partition, transactionalIdCoordinatorEpochAndMetadatas) =>
-            val deletes: Array[SimpleRecord] = transactionalIdCoordinatorEpochAndMetadatas.map { entry =>
-              new SimpleRecord(now, TransactionLog.keyToBytes(entry.transactionalId), null)
-            }.toArray
-            val records = MemoryRecords.withRecords(TransactionLog.EnforcedCompressionType, deletes: _*)
-            val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partition)
-            (topicPartition, records)
+          if (expired.isEmpty) {
+            (Seq.empty, MemoryRecords.EMPTY)
+          } else {
+            (expired, recordsBuilder.build())
          }
 
-        def removeFromCacheCallback(responses: collection.Map[TopicPartition, PartitionResponse]): Unit = {
-          responses.forKeyValue { (topicPartition, response) =>
-            inReadLock(stateLock) {
-              val toRemove = transactionalIdByPartition(topicPartition.partition)
-              transactionMetadataCache.get(topicPartition.partition).foreach { txnMetadataCacheEntry =>
-                toRemove.foreach { idCoordinatorEpochAndMetadata =>
-                  val transactionalId = idCoordinatorEpochAndMetadata.transactionalId
-                  val txnMetadata = txnMetadataCacheEntry.metadataPerTransactionalId.get(transactionalId)
-                  txnMetadata.inLock {
-                    if (txnMetadataCacheEntry.coordinatorEpoch == idCoordinatorEpochAndMetadata.coordinatorEpoch
-                      && txnMetadata.pendingState.contains(Dead)
-                      && txnMetadata.producerEpoch == idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch
-                      && response.error == Errors.NONE) {
-                      txnMetadataCacheEntry.metadataPerTransactionalId.remove(transactionalId)
-                    } else {
-                      warn(s"Failed to remove expired transactionalId: $transactionalId" +
-                        s" from cache. Tombstone append error code: ${response.error}," +
-                        s" pendingState: ${txnMetadata.pendingState}, producerEpoch: ${txnMetadata.producerEpoch}," +
-                        s" expected producerEpoch: ${idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch}," +
-                        s" coordinatorEpoch: ${txnMetadataCacheEntry.coordinatorEpoch}, expected coordinatorEpoch: " +
-                        s"${idCoordinatorEpochAndMetadata.coordinatorEpoch}")
-                      txnMetadata.pendingState = None
-                    }
+        case None =>
+          (Seq.empty, MemoryRecords.EMPTY)
+      }
+    }
+  }
+
+  private def shouldExpire(
+    txnMetadata: TransactionMetadata,
+    currentTimeMs: Long
+  ): Boolean = {
+    val isExpirableState = txnMetadata.state match {
+      case Empty | CompleteCommit | CompleteAbort => true
+      case _ => false
+    }
+
+    isExpirableState && txnMetadata.txnLastUpdateTimestamp <= currentTimeMs - config.transactionalIdExpirationMs
+  }
+
+  private def maybeAppendExpiration(
+    txnMetadata: TransactionMetadata,
+    recordsBuilder: MemoryRecordsBuilder,
+    currentTimeMs: Long,
+    maxBatchSize: Int
+  ): Boolean = {
+    val keyBytes = TransactionLog.keyToBytes(txnMetadata.transactionalId)
+    if (recordsBuilder.hasRoomFor(currentTimeMs, keyBytes, null, Record.EMPTY_HEADERS)) {
+      recordsBuilder.append(currentTimeMs, keyBytes, null, Record.EMPTY_HEADERS)
+      true
+    } else {
+      if (recordsBuilder.numRecords == 0) {
+        warn(s"Failed to write expiration record for transactionalId ${txnMetadata.transactionalId} " +
+          s"because the tombstone record exceeds the max allowed batch size of $maxBatchSize")
+      }
+      false
+    }
+  }
+
+  private[transaction] def removeExpiredTransactionalIds(): Unit = {
+    inReadLock(stateLock) {
+      val expirationRecords = mutable.Map.empty[TopicPartition, MemoryRecords]
+      val expiredTransactionalIds = mutable.Map.empty[TopicPartition, Iterable[TransactionalIdCoordinatorEpochAndMetadata]]
+
+      transactionMetadataCache.forKeyValue { (partitionId, partitionCacheEntry) =>
+        val (expiredForPartition, partitionRecords) = collectExpiredTransactionalIds(partitionId, partitionCacheEntry)
+        if (expiredForPartition.nonEmpty) {
+          val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)

Review comment:
       nit: It might be worth instantiating this one earlier and passing it to `collectExpiredTransactionalIds` instead of passing the `partitionId`. This would avoid having to instantiate it twice.
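
A minimal sketch of the suggested shape (hypothetical, not from the PR):

```scala
// Hypothetical refactor: build the TopicPartition once per partition and pass it down.
transactionMetadataCache.forKeyValue { (partitionId, partitionCacheEntry) =>
  val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
  val (expiredForPartition, partitionRecords) =
    collectExpiredTransactionalIds(topicPartition, partitionCacheEntry)
  // ... rest unchanged ...
}
```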

##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##########
@@ -140,79 +141,154 @@ class TransactionStateManager(brokerId: Int,
     }
   }
 
-  def enableTransactionalIdExpiration(): Unit = {
-    scheduler.schedule("transactionalId-expiration", () => {
-      val now = time.milliseconds()
-      inReadLock(stateLock) {
-        val transactionalIdByPartition: Map[Int, mutable.Iterable[TransactionalIdCoordinatorEpochAndMetadata]] =
-          transactionMetadataCache.flatMap { case (_, entry) =>
-            entry.metadataPerTransactionalId.filter { case (_, txnMetadata) => txnMetadata.state match {
-              case Empty | CompleteCommit | CompleteAbort => true
-              case _ => false
-            }
-            }.filter { case (_, txnMetadata) =>
-              txnMetadata.txnLastUpdateTimestamp <= now - config.transactionalIdExpirationMs
-            }.map { case (transactionalId, txnMetadata) =>
-              val txnMetadataTransition = txnMetadata.inLock {
-                txnMetadata.prepareDead()
+  private def collectExpiredTransactionalIds(
+    partitionId: Int,
+    partitionCacheEntry: TxnMetadataCacheEntry
+  ): (Iterable[TransactionalIdCoordinatorEpochAndMetadata], MemoryRecords) = {
+    val currentTimeMs = time.milliseconds()
+
+    inReadLock(stateLock) {
+      val transactionPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
+      replicaManager.getLogConfig(transactionPartition) match {
+        case Some(logConfig) =>
+          val maxBatchSize = logConfig.maxMessageSize
+          val expired = mutable.ListBuffer.empty[TransactionalIdCoordinatorEpochAndMetadata]
+
+          lazy val recordsBuilder = MemoryRecords.builder(
+            ByteBuffer.allocate(math.min(16384, maxBatchSize)),
+            TransactionLog.EnforcedCompressionType,
+            TimestampType.CREATE_TIME,
+            0L,
+            maxBatchSize
+          )
+
+          partitionCacheEntry.metadataPerTransactionalId.foreachWhile { (transactionalId, txnMetadata) =>
+            txnMetadata.inLock {
+              if (!shouldExpire(txnMetadata, currentTimeMs)) {
+                true
+              } else if (maybeAppendExpiration(txnMetadata, recordsBuilder, currentTimeMs, maxBatchSize)) {
+                val transitMetadata = txnMetadata.prepareDead()
+                expired += TransactionalIdCoordinatorEpochAndMetadata(
+                  transactionalId,
+                  partitionCacheEntry.coordinatorEpoch,
+                  transitMetadata
+                )
+                true
+              } else {
+                // If the batch is full, return false to end the search. Any remaining
+                // transactionalIds eligible for expiration can be picked next time.
+                false
              }
-              TransactionalIdCoordinatorEpochAndMetadata(transactionalId, entry.coordinatorEpoch, txnMetadataTransition)
            }
-          }.groupBy { transactionalIdCoordinatorEpochAndMetadata =>
-            partitionFor(transactionalIdCoordinatorEpochAndMetadata.transactionalId)
          }
 
-        val recordsPerPartition = transactionalIdByPartition
-          .map { case (partition, transactionalIdCoordinatorEpochAndMetadatas) =>
-            val deletes: Array[SimpleRecord] = transactionalIdCoordinatorEpochAndMetadatas.map { entry =>
-              new SimpleRecord(now, TransactionLog.keyToBytes(entry.transactionalId), null)
-            }.toArray
-            val records = MemoryRecords.withRecords(TransactionLog.EnforcedCompressionType, deletes: _*)
-            val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partition)
-            (topicPartition, records)
+          if (expired.isEmpty) {
+            (Seq.empty, MemoryRecords.EMPTY)
+          } else {
+            (expired, recordsBuilder.build())
          }
 
-        def removeFromCacheCallback(responses: collection.Map[TopicPartition, PartitionResponse]): Unit = {
-          responses.forKeyValue { (topicPartition, response) =>
-            inReadLock(stateLock) {
-              val toRemove = transactionalIdByPartition(topicPartition.partition)
-              transactionMetadataCache.get(topicPartition.partition).foreach { txnMetadataCacheEntry =>
-                toRemove.foreach { idCoordinatorEpochAndMetadata =>
-                  val transactionalId = idCoordinatorEpochAndMetadata.transactionalId
-                  val txnMetadata = txnMetadataCacheEntry.metadataPerTransactionalId.get(transactionalId)
-                  txnMetadata.inLock {
-                    if (txnMetadataCacheEntry.coordinatorEpoch == idCoordinatorEpochAndMetadata.coordinatorEpoch
-                      && txnMetadata.pendingState.contains(Dead)
-                      && txnMetadata.producerEpoch == idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch
-                      && response.error == Errors.NONE) {
-                      txnMetadataCacheEntry.metadataPerTransactionalId.remove(transactionalId)
-                    } else {
-                      warn(s"Failed to remove expired transactionalId: $transactionalId" +
-                        s" from cache. Tombstone append error code: ${response.error}," +
-                        s" pendingState: ${txnMetadata.pendingState}, producerEpoch: ${txnMetadata.producerEpoch}," +
-                        s" expected producerEpoch: ${idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch}," +
-                        s" coordinatorEpoch: ${txnMetadataCacheEntry.coordinatorEpoch}, expected coordinatorEpoch: " +
-                        s"${idCoordinatorEpochAndMetadata.coordinatorEpoch}")
-                      txnMetadata.pendingState = None
-                    }
+        case None =>
+          (Seq.empty, MemoryRecords.EMPTY)

Review comment:
       Have you considered logging a warning here? Not being able to look up the config means that the partition is not online. I suppose that previously, we would have tried to write to the log and the write would have failed, thus warning us at L257.
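
A minimal sketch of what that warning could look like (hypothetical wording, not from the PR):

```scala
// Hypothetical: log the offline partition instead of skipping it silently.
case None =>
  warn(s"Failed to fetch the log config for $transactionPartition when expiring " +
    "transactionalIds; the partition is likely offline. Skipping it for this round.")
  (Seq.empty, MemoryRecords.EMPTY)
```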



