ijuma commented on a change in pull request #11098: URL: https://github.com/apache/kafka/pull/11098#discussion_r676047197
########## File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala ########## @@ -140,79 +141,154 @@ class TransactionStateManager(brokerId: Int, } } - def enableTransactionalIdExpiration(): Unit = { - scheduler.schedule("transactionalId-expiration", () => { - val now = time.milliseconds() - inReadLock(stateLock) { - val transactionalIdByPartition: Map[Int, mutable.Iterable[TransactionalIdCoordinatorEpochAndMetadata]] = - transactionMetadataCache.flatMap { case (_, entry) => - entry.metadataPerTransactionalId.filter { case (_, txnMetadata) => txnMetadata.state match { - case Empty | CompleteCommit | CompleteAbort => true - case _ => false - } - }.filter { case (_, txnMetadata) => - txnMetadata.txnLastUpdateTimestamp <= now - config.transactionalIdExpirationMs - }.map { case (transactionalId, txnMetadata) => - val txnMetadataTransition = txnMetadata.inLock { - txnMetadata.prepareDead() + private def collectExpiredTransactionalIds( + partitionId: Int, + partitionCacheEntry: TxnMetadataCacheEntry + ): (Iterable[TransactionalIdCoordinatorEpochAndMetadata], MemoryRecords) = { + val currentTimeMs = time.milliseconds() + + inReadLock(stateLock) { + val transactionPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId) + replicaManager.getLogConfig(transactionPartition) match { + case Some(logConfig) => + val maxBatchSize = logConfig.maxMessageSize + val expired = mutable.ListBuffer.empty[TransactionalIdCoordinatorEpochAndMetadata] + + lazy val recordsBuilder = MemoryRecords.builder( + ByteBuffer.allocate(math.min(16384, maxBatchSize)), Review comment: 16k is also the default batch size for the producer. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org