jolshan commented on code in PR #14774:
URL: https://github.com/apache/kafka/pull/14774#discussion_r1418093193
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -708,23 +708,40 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}
- if (authorizedRequestInfo.isEmpty)
- sendResponseCallback(Map.empty)
- else {
- val internalTopicsAllowed = request.header.clientId ==
AdminUtils.ADMIN_CLIENT_ID
+ val internalTopicsAllowed = request.header.clientId ==
AdminUtils.ADMIN_CLIENT_ID
+ val transactionVerificationEntries = new
ReplicaManager.TransactionVerificationEntries
- // call the replica manager to append messages to the replicas
+ def postVerificationCallback(newRequestLocal: RequestLocal)
Review Comment:
The new code fixes this.
##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -941,39 +857,129 @@ class ReplicaManager(val config: KafkaConfig,
}
}
- private def partitionEntriesForVerification(verificationGuards:
mutable.Map[TopicPartition, VerificationGuard],
- entriesPerPartition:
Map[TopicPartition, MemoryRecords],
- verifiedEntries:
mutable.Map[TopicPartition, MemoryRecords],
- unverifiedEntries:
mutable.Map[TopicPartition, MemoryRecords],
- errorEntries:
mutable.Map[TopicPartition, Errors]): Unit= {
+ private def sendInvalidRequiredAcksResponse(entries: Map[TopicPartition,
MemoryRecords],
+ responseCallback:
Map[TopicPartition, PartitionResponse] => Unit): Unit = {
+ // If required.acks is outside accepted range, something is wrong with the
client
+ // Just return an error and don't handle the request at all
+ val responseStatus = entries.map { case (topicPartition, _) =>
+ topicPartition -> new PartitionResponse(
+ Errors.INVALID_REQUIRED_ACKS,
+ LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.firstOffset,
+ RecordBatch.NO_TIMESTAMP,
+ LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.logStartOffset
+ )
+ }
+ responseCallback(responseStatus)
+ }
+
+ /**
+ * Apply the postVerificationCallback asynchronously only after verifying
the partitions have been added to the transaction.
+ * The postVerificationCallback takes the arguments of the requestLocal for
the thread that will be doing the append as
+ * well as a mapping of topic partitions to LogAppendResult for the
partitions that saw errors when verifying.
+ *
+ * This method will start the verification process for all the
topicPartitions in entriesPerPartition and supply the
+ * postVerificationCallback to be run on a request handler thread when the
response is received.
+ *
+ * @param entriesPerPartition the records per partition to be
appended and therefore need verification
+ * @param transactionVerificationEntries the object that will store the
entries to verify, the errors, and the verification guards
+ * @param transactionalId the id for the transaction
+ * @param requestLocal container for the stateful
instances scoped to this request -- this must correspond to the
+ * thread calling this method
+ * @param postVerificationCallback the method to be called when
verification completes and the verification errors
+ * and the thread's RequestLocal are
supplied
+ */
+ def appendRecordsWithTransactionVerification(entriesPerPartition:
Map[TopicPartition, MemoryRecords],
+ transactionVerificationEntries:
TransactionVerificationEntries,
+ transactionalId: String,
+ requestLocal: RequestLocal,
+ postVerificationCallback:
RequestLocal => Map[TopicPartition, LogAppendResult] => Unit): Unit = {
+ if (transactionalId != null &&
config.transactionPartitionVerificationEnable &&
addPartitionsToTxnManager.isDefined)
+ partitionEntriesForVerification(transactionVerificationEntries,
entriesPerPartition)
+
+ val onVerificationComplete: (RequestLocal, Map[TopicPartition, Errors]) =>
Unit =
+ executePostVerificationCallback(
+ transactionVerificationEntries,
+ postVerificationCallback,
+ )
+
+ if (transactionVerificationEntries.unverified.isEmpty) {
+ onVerificationComplete(requestLocal,
transactionVerificationEntries.errors.toMap)
+ } else {
+ // For unverified entries, send a request to verify. When verified, the
append process will proceed via the callback.
+ // We verify above that all partitions use the same producer ID.
+ val batchInfo =
transactionVerificationEntries.unverified.head._2.firstBatch()
+ addPartitionsToTxnManager.foreach(_.verifyTransaction(
+ transactionalId = transactionalId,
+ producerId = batchInfo.producerId,
+ producerEpoch = batchInfo.producerEpoch,
+ topicPartitions =
transactionVerificationEntries.unverified.keySet.toSeq,
+ callback =
KafkaRequestHandler.wrapAsyncCallback(onVerificationComplete, requestLocal)
+ ))
+ }
+ }
+
+ /**
+ * A helper method to compile the results from the transaction verification
and call the postVerificationCallback.
+ *
+ * @param transactionVerificationEntries the object that will store the
entries to verify, the errors, and the verification guards
+ * @param postVerificationCallback the method to be called when
verification completes and the verification errors
+ * and the thread's RequestLocal are
supplied
+ * @param requestLocal container for the stateful
instances scoped to this request -- this must correspond to the
+ * thread calling this method
+ *
+ */
+ private def executePostVerificationCallback(transactionVerificationEntries:
TransactionVerificationEntries,
+
postVerificationCallback: RequestLocal => Map[TopicPartition, LogAppendResult]
=> Unit)
+ (requestLocal:
RequestLocal, unverifiedEntries: Map[TopicPartition, Errors]): Unit = {
+ val errorResults = (unverifiedEntries ++
transactionVerificationEntries.errors).map {
+ case (topicPartition, error) =>
+ // translate transaction coordinator errors to known producer response
errors
+ val customException =
+ error match {
+ case Errors.INVALID_TXN_STATE => Some(error.exception("Partition
was not added to the transaction"))
+ case Errors.CONCURRENT_TRANSACTIONS |
+ Errors.COORDINATOR_LOAD_IN_PROGRESS |
+ Errors.COORDINATOR_NOT_AVAILABLE |
+ Errors.NOT_COORDINATOR => Some(new NotEnoughReplicasException(
+ s"Unable to verify the partition has been added to the
transaction. Underlying error: ${error.toString}"))
+ case _ => None
+ }
+ topicPartition -> LogAppendResult(
+ LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
+ Some(customException.getOrElse(error.exception)),
+ hasCustomErrorMessage = customException.isDefined
+ )
+ }
+ postVerificationCallback(requestLocal)(errorResults)
+ }
+
+ private def partitionEntriesForVerification(transactionVerificationEntries:
TransactionVerificationEntries, entriesPerPartition: Map[TopicPartition,
MemoryRecords]): TransactionVerificationEntries = {
Review Comment:
The new code also removes this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]