artemlivshits commented on code in PR #15142: URL: https://github.com/apache/kafka/pull/15142#discussion_r1448111130
########## core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala: ########## @@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T]( )) } + /** + * Verify the transaction. + * + * @param tp The partition to write records to. + * @param transactionalId The transactional id. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception. + * @throws KafkaException Any KafkaException caught during the operation. + */ + override def maybeStartTransactionVerification( + tp: TopicPartition, + transactionalId: String, + producerId: Long, + producerEpoch: Short + ): CompletableFuture[VerificationGuard] = { + val future = new CompletableFuture[VerificationGuard]() + replicaManager.maybeStartTransactionVerificationForPartition( + topicPartition = tp, + transactionalId = transactionalId, + producerId = producerId, + producerEpoch = producerEpoch, + baseSequence = RecordBatch.NO_SEQUENCE, + requestLocal = RequestLocal.NoCaching, + callback = (error, _, verificationGuard) => { + if (error != Errors.NONE) { + future.completeExceptionally(error.exception) + } else { + future.complete(verificationGuard) + } + } + ) + future + } + private def internalAppend( tp: TopicPartition, - memoryRecords: MemoryRecords + memoryRecords: MemoryRecords, + verificationGuard: VerificationGuard = VerificationGuard.SENTINEL ): Long = { var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty - replicaManager.appendRecords( + replicaManager.appendForGroup( timeout = 0L, requiredAcks = 1, - internalTopicsAllowed = true, - origin = AppendOrigin.COORDINATOR, entriesPerPartition = Map(tp -> memoryRecords), responseCallback = results => appendResults = results, + requestLocal = RequestLocal.NoCaching, Review Comment: My understanding is that by the time we come to this function it already runs on a GC thread (it has to happen because that's how we guarantee the atomicity), so there is no request local anyway here and we must use NoCaching (it is always safe to use NoCaching, just not as optimal as thread local). The future's callback will be called in the thread that completed it, but like I said, it doesn't matter for this function as it gets rescheduled on the GC thread pool. I gave a suggestion to hoist wrapping to the caller, so new GC doesn't have to do double-schedule https://github.com/apache/kafka/pull/14774#discussion_r1420931474. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org