hachikuji commented on a change in pull request #11098:
URL: https://github.com/apache/kafka/pull/11098#discussion_r674140433
##########
File path: core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##########
@@ -140,79 +141,154 @@ class TransactionStateManager(brokerId: Int,
    }
  }
-  def enableTransactionalIdExpiration(): Unit = {
-    scheduler.schedule("transactionalId-expiration", () => {
-      val now = time.milliseconds()
-      inReadLock(stateLock) {
-        val transactionalIdByPartition: Map[Int, mutable.Iterable[TransactionalIdCoordinatorEpochAndMetadata]] =
-          transactionMetadataCache.flatMap { case (_, entry) =>
-            entry.metadataPerTransactionalId.filter { case (_, txnMetadata) => txnMetadata.state match {
-              case Empty | CompleteCommit | CompleteAbort => true
-              case _ => false
-            }
-            }.filter { case (_, txnMetadata) =>
-              txnMetadata.txnLastUpdateTimestamp <= now - config.transactionalIdExpirationMs
-            }.map { case (transactionalId, txnMetadata) =>
-              val txnMetadataTransition = txnMetadata.inLock {
-                txnMetadata.prepareDead()
+  private def collectExpiredTransactionalIds(
+    partitionId: Int,
+    partitionCacheEntry: TxnMetadataCacheEntry
+  ): (Iterable[TransactionalIdCoordinatorEpochAndMetadata], MemoryRecords) = {
+    val currentTimeMs = time.milliseconds()
+
+    inReadLock(stateLock) {
+      val transactionPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId)
+      replicaManager.getLogConfig(transactionPartition) match {
+        case Some(logConfig) =>
+          val maxBatchSize = logConfig.maxMessageSize
+          val expired = mutable.ListBuffer.empty[TransactionalIdCoordinatorEpochAndMetadata]
+
+          lazy val recordsBuilder = MemoryRecords.builder(
+            ByteBuffer.allocate(math.min(16384, maxBatchSize)),
+            TransactionLog.EnforcedCompressionType,
+            TimestampType.CREATE_TIME,
+            0L,
+            maxBatchSize
+          )
+
+          partitionCacheEntry.metadataPerTransactionalId.foreachWhile { (transactionalId, txnMetadata) =>
+            txnMetadata.inLock {
+              if (!shouldExpire(txnMetadata, currentTimeMs)) {
+                true
+              } else if (maybeAppendExpiration(txnMetadata, recordsBuilder, currentTimeMs, maxBatchSize)) {
+                val transitMetadata = txnMetadata.prepareDead()
+                expired += TransactionalIdCoordinatorEpochAndMetadata(
+                  transactionalId,
+                  partitionCacheEntry.coordinatorEpoch,
+                  transitMetadata
+                )
+                true
+              } else {
+                // If the batch is full, return false to end the search. Any remaining
+                // transactionalIds eligible for expiration can be picked next time.
+                false
              }
-              TransactionalIdCoordinatorEpochAndMetadata(transactionalId, entry.coordinatorEpoch, txnMetadataTransition)
            }
-          }.groupBy { transactionalIdCoordinatorEpochAndMetadata =>
-            partitionFor(transactionalIdCoordinatorEpochAndMetadata.transactionalId)
          }
-        val recordsPerPartition = transactionalIdByPartition
-          .map { case (partition, transactionalIdCoordinatorEpochAndMetadatas) =>
-            val deletes: Array[SimpleRecord] = transactionalIdCoordinatorEpochAndMetadatas.map { entry =>
-              new SimpleRecord(now, TransactionLog.keyToBytes(entry.transactionalId), null)
-            }.toArray
-            val records = MemoryRecords.withRecords(TransactionLog.EnforcedCompressionType, deletes: _*)
-            val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partition)
-            (topicPartition, records)
+          if (expired.isEmpty) {
+            (Seq.empty, MemoryRecords.EMPTY)
+          } else {
+            (expired, recordsBuilder.build())
          }
-        def removeFromCacheCallback(responses: collection.Map[TopicPartition, PartitionResponse]): Unit = {
-          responses.forKeyValue { (topicPartition, response) =>
-            inReadLock(stateLock) {
-              val toRemove = transactionalIdByPartition(topicPartition.partition)
-              transactionMetadataCache.get(topicPartition.partition).foreach { txnMetadataCacheEntry =>
-                toRemove.foreach { idCoordinatorEpochAndMetadata =>
-                  val transactionalId = idCoordinatorEpochAndMetadata.transactionalId
-                  val txnMetadata = txnMetadataCacheEntry.metadataPerTransactionalId.get(transactionalId)
-                  txnMetadata.inLock {
-                    if (txnMetadataCacheEntry.coordinatorEpoch == idCoordinatorEpochAndMetadata.coordinatorEpoch
-                      && txnMetadata.pendingState.contains(Dead)
-                      && txnMetadata.producerEpoch == idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch
-                      && response.error == Errors.NONE) {
-                      txnMetadataCacheEntry.metadataPerTransactionalId.remove(transactionalId)
-                    } else {
-                      warn(s"Failed to remove expired transactionalId: $transactionalId" +
-                        s" from cache. Tombstone append error code: ${response.error}," +
-                        s" pendingState: ${txnMetadata.pendingState}, producerEpoch: ${txnMetadata.producerEpoch}," +
-                        s" expected producerEpoch: ${idCoordinatorEpochAndMetadata.transitMetadata.producerEpoch}," +
-                        s" coordinatorEpoch: ${txnMetadataCacheEntry.coordinatorEpoch}, expected coordinatorEpoch: " +
-                        s"${idCoordinatorEpochAndMetadata.coordinatorEpoch}")
-                      txnMetadata.pendingState = None
-                    }
+        case None =>
+          (Seq.empty, MemoryRecords.EMPTY)
+      }
+    }
+  }
+
+  private def shouldExpire(
+    txnMetadata: TransactionMetadata,
+    currentTimeMs: Long
+  ): Boolean = {
+    val isExpirableState = txnMetadata.state match {
+      case Empty | CompleteCommit | CompleteAbort => true
+      case _ => false
+    }
+
+    isExpirableState && txnMetadata.txnLastUpdateTimestamp <= currentTimeMs - config.transactionalIdExpirationMs
+  }
+
+  private def maybeAppendExpiration(
+    txnMetadata: TransactionMetadata,
+    recordsBuilder: MemoryRecordsBuilder,
+    currentTimeMs: Long,
+    maxBatchSize: Int
+  ): Boolean = {
+    val keyBytes = TransactionLog.keyToBytes(txnMetadata.transactionalId)
+    if (recordsBuilder.hasRoomFor(currentTimeMs, keyBytes, null, Record.EMPTY_HEADERS)) {
+      recordsBuilder.append(currentTimeMs, keyBytes, null, Record.EMPTY_HEADERS)
+      true
+    } else {
+      if (recordsBuilder.numRecords == 0) {
##########
Review comment:
Yeah, I guess this is the downside of using something like
`MemoryRecordsBuilder` which is so tailored to the producer. Would it make
sense to add a stricter `hasRoomFor`?
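
For illustration, here is a rough caller-side sketch of what a stricter check could look like without touching `MemoryRecordsBuilder` itself. To be clear, this is only a sketch under assumptions: `hasStrictRoomFor` and the fixed overhead allowance are made up for the example, and `estimatedSizeInBytes()` stands in for an exact size computation.

```scala
import org.apache.kafka.common.record.MemoryRecordsBuilder

// Illustrative helper (would sit next to maybeAppendExpiration): reject the
// append once the builder's current estimated size plus the tombstone key and
// a rough per-record allowance would exceed the batch limit, even when the
// batch is still empty. The 64-byte allowance is an assumption, not a value
// derived from the record format.
private def hasStrictRoomFor(
  recordsBuilder: MemoryRecordsBuilder,
  keyBytes: Array[Byte],
  maxBatchSize: Int
): Boolean = {
  val assumedRecordOverheadBytes = 64
  recordsBuilder.estimatedSizeInBytes() + keyBytes.length + assumedRecordOverheadBytes <= maxBatchSize
}
```

That said, the cleaner fix is probably in `MemoryRecordsBuilder` itself: a `hasRoomFor` variant that applies the write limit even when the batch is empty, since IIRC the existing check always lets the first record through.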