hachikuji commented on a change in pull request #11098:
URL: https://github.com/apache/kafka/pull/11098#discussion_r675005086
##########
File path:
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##########
@@ -140,79 +141,154 @@ class TransactionStateManager(brokerId: Int,
}
}
- def enableTransactionalIdExpiration(): Unit = {
- scheduler.schedule("transactionalId-expiration", () => {
- val now = time.milliseconds()
- inReadLock(stateLock) {
- val transactionalIdByPartition: Map[Int,
mutable.Iterable[TransactionalIdCoordinatorEpochAndMetadata]] =
- transactionMetadataCache.flatMap { case (_, entry) =>
- entry.metadataPerTransactionalId.filter { case (_, txnMetadata) =>
txnMetadata.state match {
- case Empty | CompleteCommit | CompleteAbort => true
- case _ => false
- }
- }.filter { case (_, txnMetadata) =>
- txnMetadata.txnLastUpdateTimestamp <= now -
config.transactionalIdExpirationMs
- }.map { case (transactionalId, txnMetadata) =>
- val txnMetadataTransition = txnMetadata.inLock {
- txnMetadata.prepareDead()
+ private def collectExpiredTransactionalIds(
+ partitionId: Int,
+ partitionCacheEntry: TxnMetadataCacheEntry
+ ): (Iterable[TransactionalIdCoordinatorEpochAndMetadata], MemoryRecords) = {
+ val currentTimeMs = time.milliseconds()
+
+ inReadLock(stateLock) {
+ val transactionPartition = new
TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
+ replicaManager.getLogConfig(transactionPartition) match {
+ case Some(logConfig) =>
+ val maxBatchSize = logConfig.maxMessageSize
+ val expired =
mutable.ListBuffer.empty[TransactionalIdCoordinatorEpochAndMetadata]
+
+ lazy val recordsBuilder = MemoryRecords.builder(
+ ByteBuffer.allocate(math.min(16384, maxBatchSize)),
Review comment:
Note that the buffer will still grow to reach the limit of
max.message.bytes. I agree, however, that one hour is a long time to wait. Let
me look into triggering the next run right away.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]