dajac commented on a change in pull request #11098:
URL: https://github.com/apache/kafka/pull/11098#discussion_r674676502
##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##########
@@ -140,79 +141,154 @@ class TransactionStateManager(brokerId: Int,
     }
   }
 
-  def enableTransactionalIdExpiration(): Unit = {
-    scheduler.schedule("transactionalId-expiration", () => {
-      val now = time.milliseconds()
-      inReadLock(stateLock) {
-        val transactionalIdByPartition: Map[Int, mutable.Iterable[TransactionalIdCoordinatorEpochAndMetadata]] =
-          transactionMetadataCache.flatMap { case (_, entry) =>
-            entry.metadataPerTransactionalId.filter { case (_, txnMetadata) => txnMetadata.state match {
-              case Empty | CompleteCommit | CompleteAbort => true
-              case _ => false
-            }
-            }.filter { case (_, txnMetadata) =>
-              txnMetadata.txnLastUpdateTimestamp <= now - config.transactionalIdExpirationMs
-            }.map { case (transactionalId, txnMetadata) =>
-              val txnMetadataTransition = txnMetadata.inLock {
-                txnMetadata.prepareDead()
+  private def collectExpiredTransactionalIds(
+    partitionId: Int,
+    partitionCacheEntry: TxnMetadataCacheEntry
+  ): (Iterable[TransactionalIdCoordinatorEpochAndMetadata], MemoryRecords) = {
+    val currentTimeMs = time.milliseconds()
+
+    inReadLock(stateLock) {
+      val transactionPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
+      replicaManager.getLogConfig(transactionPartition) match {
+        case Some(logConfig) =>
+          val maxBatchSize = logConfig.maxMessageSize
+          val expired = mutable.ListBuffer.empty[TransactionalIdCoordinatorEpochAndMetadata]
+
+          lazy val recordsBuilder = MemoryRecords.builder(
+            ByteBuffer.allocate(math.min(16384, maxBatchSize)),
Review comment:
I do agree that 16k seems quite reasonable for the common case. The downside is that we have to wait another hour to clean up the remaining ones if there are many transactions to be expired.
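
To put a rough number on that trade-off: a quick back-of-envelope sketch (mine, not from the PR; the per-record overhead and transactional.id length are assumed values) of how many tombstones fit into one 16 KiB batch:

```scala
// Hypothetical sizing estimate, not part of the PR. Tombstones carry a null
// value, so each record is roughly framing overhead plus the key bytes.
object TombstoneBatchEstimate extends App {
  val batchBytes = 16384           // proposed builder allocation
  val recordOverheadBytes = 20     // assumed per-record framing overhead
  val transactionalIdBytes = 50    // assumed average transactional.id length

  val perRecordBytes = recordOverheadBytes + transactionalIdBytes
  println(s"~${batchBytes / perRecordBytes} tombstones per batch") // ~234
}
```

So a single pass expires on the order of a few hundred ids per partition; a large backlog would take several runs of the cleanup interval (one hour by default) to drain.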
##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##########
@@ -140,79 +141,154 @@ class TransactionStateManager(brokerId: Int,
     }
   }
 
-  def enableTransactionalIdExpiration(): Unit = {
-    scheduler.schedule("transactionalId-expiration", () => {
-      val now = time.milliseconds()
-      inReadLock(stateLock) {
-        val transactionalIdByPartition: Map[Int, mutable.Iterable[TransactionalIdCoordinatorEpochAndMetadata]] =
-          transactionMetadataCache.flatMap { case (_, entry) =>
-            entry.metadataPerTransactionalId.filter { case (_, txnMetadata) => txnMetadata.state match {
-              case Empty | CompleteCommit | CompleteAbort => true
-              case _ => false
-            }
-            }.filter { case (_, txnMetadata) =>
-              txnMetadata.txnLastUpdateTimestamp <= now - config.transactionalIdExpirationMs
-            }.map { case (transactionalId, txnMetadata) =>
-              val txnMetadataTransition = txnMetadata.inLock {
-                txnMetadata.prepareDead()
+  private def collectExpiredTransactionalIds(
+    partitionId: Int,
+    partitionCacheEntry: TxnMetadataCacheEntry
+  ): (Iterable[TransactionalIdCoordinatorEpochAndMetadata], MemoryRecords) = {
+    val currentTimeMs = time.milliseconds()
+
+    inReadLock(stateLock) {
+      val transactionPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
+      replicaManager.getLogConfig(transactionPartition) match {
+        case Some(logConfig) =>
+          val maxBatchSize = logConfig.maxMessageSize
+          val expired = mutable.ListBuffer.empty[TransactionalIdCoordinatorEpochAndMetadata]
+
+          lazy val recordsBuilder = MemoryRecords.builder(
+            ByteBuffer.allocate(math.min(16384, maxBatchSize)),
+            TransactionLog.EnforcedCompressionType,
+            TimestampType.CREATE_TIME,
+            0L,
+            maxBatchSize
+          )
+
+          partitionCacheEntry.metadataPerTransactionalId.foreachWhile { (transactionalId, txnMetadata) =>
+            txnMetadata.inLock {
+              if (!shouldExpire(txnMetadata, currentTimeMs)) {
+                true
+              } else if (maybeAppendExpiration(txnMetadata, recordsBuilder, currentTimeMs, maxBatchSize)) {
+                val transitMetadata = txnMetadata.prepareDead()
+                expired += TransactionalIdCoordinatorEpochAndMetadata(
+                  transactionalId,
+                  partitionCacheEntry.coordinatorEpoch,
+                  transitMetadata
+                )
+                true
+              } else {
+                // If the batch is full, return false to end the search. Any remaining
+                // transactionalIds eligible for expiration can be picked next time.
+                false
               }
-              TransactionalIdCoordinatorEpochAndMetadata(transactionalId, entry.coordinatorEpoch, txnMetadataTransition)
             }
-          }.groupBy { transactionalIdCoordinatorEpochAndMetadata =>
-            partitionFor(transactionalIdCoordinatorEpochAndMetadata.transactionalId)
           }
 
-        val recordsPerPartition = transactionalIdByPartition
-          .map { case (partition, transactionalIdCoordinatorEpochAndMetadatas) =>
-            val deletes: Array[SimpleRecord] = transactionalIdCoordinatorEpochAndMetadatas.map { entry =>
-              new SimpleRecord(now, TransactionLog.keyToBytes(entry.transactionalId), null)
-            }.toArray
-            val records = MemoryRecords.withRecords(TransactionLog.EnforcedCompressionType, deletes: _*)
-            val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partition)
-            (topicPartition, records)
+          if (expired.isEmpty) {
+            (Seq.empty, MemoryRecords.EMPTY)
+          } else {
+            (expired, recordsBuilder.build())
           }
 
-        def removeFromCacheCallback(responses: collection.Map[TopicPartition, PartitionResponse]): Unit = {
-          responses.forKeyValue { (topicPartition, response) =>
-            inReadLock(stateLock) {
-              val toRemove = transactionalIdByPartition(topicPartition.partition)
-              transactionMetadataCache.get(topicPartition.partition).foreach { txnMetadataCacheEntry =>
-                toRemove.foreach { idCoordinatorEpochAndMetadata =>
-                  val transactionalId = idCoordinatorEpochAndMetadata.transactionalId
-                  val txnMetadata = txnMetadataCacheEntry.metadataPerTransactionalId.get(transactionalId)
-                  txnMetadata.inLock {
-                    if (txnMetadataCacheEntry.coordinatorEpoch == idCoordinatorEpochAndMetadata.coordinatorEpoch
-                      && txnMetadata.pendingState.contains(Dead)
-                      && txnMetadata.producerEpoch == idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch
-                      && response.error == Errors.NONE) {
-                      txnMetadataCacheEntry.metadataPerTransactionalId.remove(transactionalId)
-                    } else {
-                      warn(s"Failed to remove expired transactionalId: $transactionalId" +
-                        s" from cache. Tombstone append error code: ${response.error}," +
-                        s" pendingState: ${txnMetadata.pendingState}, producerEpoch: ${txnMetadata.producerEpoch}," +
-                        s" expected producerEpoch: ${idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch}," +
-                        s" coordinatorEpoch: ${txnMetadataCacheEntry.coordinatorEpoch}, expected coordinatorEpoch: " +
-                        s"${idCoordinatorEpochAndMetadata.coordinatorEpoch}")
-                      txnMetadata.pendingState = None
-                    }
+        case None =>
+          (Seq.empty, MemoryRecords.EMPTY)
+      }
+    }
+  }
+
+  private def shouldExpire(
+    txnMetadata: TransactionMetadata,
+    currentTimeMs: Long
+  ): Boolean = {
+    val isExpirableState = txnMetadata.state match {
+      case Empty | CompleteCommit | CompleteAbort => true
+      case _ => false
+    }
+
+    isExpirableState && txnMetadata.txnLastUpdateTimestamp <= currentTimeMs - config.transactionalIdExpirationMs
+  }
+
+  private def maybeAppendExpiration(
+    txnMetadata: TransactionMetadata,
+    recordsBuilder: MemoryRecordsBuilder,
+    currentTimeMs: Long,
+    maxBatchSize: Int
+  ): Boolean = {
+    val keyBytes = TransactionLog.keyToBytes(txnMetadata.transactionalId)
+    if (recordsBuilder.hasRoomFor(currentTimeMs, keyBytes, null, Record.EMPTY_HEADERS)) {
+      recordsBuilder.append(currentTimeMs, keyBytes, null, Record.EMPTY_HEADERS)
+      true
+    } else {
+      if (recordsBuilder.numRecords == 0) {
Review comment:
Yeah, why not.
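
The exact suggestion being agreed to is not quoted in this thread; from the surrounding diff it presumably concerns the case where even an empty builder has no room, i.e. a single tombstone larger than the max batch size. A hypothetical sketch of what that branch could look like (names follow the quoted diff; the final shape is up to the PR):

```scala
// Hypothetical sketch only. If the builder is empty and the record still does
// not fit, this tombstone exceeds maxBatchSize and can never be appended, so
// surface it rather than silently retrying on every expiration pass.
if (recordsBuilder.numRecords == 0) {
  warn(s"Tombstone for transactionalId ${txnMetadata.transactionalId} exceeds " +
    s"the max batch size ($maxBatchSize bytes) and cannot be expired")
}
false
```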
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]