ijuma commented on a change in pull request #11098:
URL: https://github.com/apache/kafka/pull/11098#discussion_r676047197



##########
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##########
@@ -140,79 +141,154 @@ class TransactionStateManager(brokerId: Int,
     }
   }
 
-  def enableTransactionalIdExpiration(): Unit = {
-    scheduler.schedule("transactionalId-expiration", () => {
-      val now = time.milliseconds()
-      inReadLock(stateLock) {
-        val transactionalIdByPartition: Map[Int, 
mutable.Iterable[TransactionalIdCoordinatorEpochAndMetadata]] =
-          transactionMetadataCache.flatMap { case (_, entry) =>
-            entry.metadataPerTransactionalId.filter { case (_, txnMetadata) => 
txnMetadata.state match {
-              case Empty | CompleteCommit | CompleteAbort => true
-              case _ => false
-            }
-            }.filter { case (_, txnMetadata) =>
-              txnMetadata.txnLastUpdateTimestamp <= now - 
config.transactionalIdExpirationMs
-            }.map { case (transactionalId, txnMetadata) =>
-              val txnMetadataTransition = txnMetadata.inLock {
-                txnMetadata.prepareDead()
+  private def collectExpiredTransactionalIds(
+    partitionId: Int,
+    partitionCacheEntry: TxnMetadataCacheEntry
+  ): (Iterable[TransactionalIdCoordinatorEpochAndMetadata], MemoryRecords) = {
+    val currentTimeMs = time.milliseconds()
+
+    inReadLock(stateLock) {
+      val transactionPartition = new 
TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
+      replicaManager.getLogConfig(transactionPartition) match {
+        case Some(logConfig) =>
+          val maxBatchSize = logConfig.maxMessageSize
+          val expired = 
mutable.ListBuffer.empty[TransactionalIdCoordinatorEpochAndMetadata]
+
+          lazy val recordsBuilder = MemoryRecords.builder(
+            ByteBuffer.allocate(math.min(16384, maxBatchSize)),

Review comment:
       16k is also the default batch size for the producer.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to