hachikuji commented on a change in pull request #11098:
URL: https://github.com/apache/kafka/pull/11098#discussion_r674117013



##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##########
@@ -140,79 +141,154 @@ class TransactionStateManager(brokerId: Int,
     }
   }
 
-  def enableTransactionalIdExpiration(): Unit = {
-    scheduler.schedule("transactionalId-expiration", () => {
-      val now = time.milliseconds()
-      inReadLock(stateLock) {
-        val transactionalIdByPartition: Map[Int, mutable.Iterable[TransactionalIdCoordinatorEpochAndMetadata]] =
-          transactionMetadataCache.flatMap { case (_, entry) =>
-            entry.metadataPerTransactionalId.filter { case (_, txnMetadata) => txnMetadata.state match {
-              case Empty | CompleteCommit | CompleteAbort => true
-              case _ => false
-            }
-            }.filter { case (_, txnMetadata) =>
-              txnMetadata.txnLastUpdateTimestamp <= now - config.transactionalIdExpirationMs
-            }.map { case (transactionalId, txnMetadata) =>
-              val txnMetadataTransition = txnMetadata.inLock {
-                txnMetadata.prepareDead()
+  private def collectExpiredTransactionalIds(
+    partitionId: Int,
+    partitionCacheEntry: TxnMetadataCacheEntry
+  ): (Iterable[TransactionalIdCoordinatorEpochAndMetadata], MemoryRecords) = {
+    val currentTimeMs = time.milliseconds()
+
+    inReadLock(stateLock) {
+      val transactionPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
+      replicaManager.getLogConfig(transactionPartition) match {
+        case Some(logConfig) =>
+          val maxBatchSize = logConfig.maxMessageSize
+          val expired = mutable.ListBuffer.empty[TransactionalIdCoordinatorEpochAndMetadata]
+
+          lazy val recordsBuilder = MemoryRecords.builder(
+            ByteBuffer.allocate(math.min(16384, maxBatchSize)),

Review comment:
       Filling a batch is an uncommon scenario, so allocating a full 1MB (the default max.message.bytes) buffer each time the task runs seemed excessive. 16K seemed more reasonable for the common case. Another option I considered was a statically allocated buffer, but that seemed like overkill.
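
       For illustration only (not part of the PR), here is a minimal Scala sketch of the allocation strategy described above, assuming nothing beyond java.nio.ByteBuffer; the object and method names are hypothetical, while 16384 and maxBatchSize mirror the values in the diff:

           import java.nio.ByteBuffer

           // Sketch only: not the broker code, just the capped-allocation idea.
           object ExpirationBufferSketch {
             // 16K comfortably fits the common, mostly-empty batch of expired ids.
             private val InitialBatchBufferBytes = 16384

             // Cap the allocation at the topic's max batch size, but avoid paying
             // for the full (default 1MB) max.message.bytes limit on every run
             // of the expiration task.
             def allocateBatchBuffer(maxBatchSize: Int): ByteBuffer =
               ByteBuffer.allocate(math.min(InitialBatchBufferBytes, maxBatchSize))
           }

       Since the recordsBuilder in the diff above is declared as a lazy val, the allocation is also skipped entirely when a partition has no expired transactional ids.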



