jolshan commented on code in PR #14774:
URL: https://github.com/apache/kafka/pull/14774#discussion_r1419770188
##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -941,39 +851,209 @@ class ReplicaManager(val config: KafkaConfig,
}
}
- private def partitionEntriesForVerification(verificationGuards:
mutable.Map[TopicPartition, VerificationGuard],
- entriesPerPartition:
Map[TopicPartition, MemoryRecords],
- verifiedEntries:
mutable.Map[TopicPartition, MemoryRecords],
- unverifiedEntries:
mutable.Map[TopicPartition, MemoryRecords],
- errorEntries:
mutable.Map[TopicPartition, Errors]): Unit= {
- val transactionalProducerIds = mutable.HashSet[Long]()
+ /**
+ * Handles the produce request by starting any transactional verification
before appending.
+ *
+ * @param timeout maximum time we will wait to append
before returning
+ * @param requiredAcks number of replicas who must
acknowledge the append before sending the response
+ * @param internalTopicsAllowed boolean indicating whether internal
topics can be appended to
+ * @param origin source of the append request (ie,
client, replication, coordinator)
+ * @param entriesPerPartition the records per partition to be
appended
+ * @param responseCallback callback for sending the response
+ * @param delayedProduceLock lock for the delayed actions
+ * @param recordValidationStatsCallback callback for updating stats on
record conversions
+ * @param requestLocal container for the stateful instances
scoped to this request -- this must correspond to the
+ * thread calling this method
+ * @param actionQueue the action queue to use.
ReplicaManager#defaultActionQueue is used by default.
+ * @param verificationGuards the mapping from topic partition to
verification guards if transaction verification is used
+ */
+ def handleProduceAppend(timeout: Long,
+ requiredAcks: Short,
+ internalTopicsAllowed: Boolean,
+ origin: AppendOrigin,
+ transactionalId: String,
+ entriesPerPartition: Map[TopicPartition,
MemoryRecords],
+ responseCallback: Map[TopicPartition,
PartitionResponse] => Unit,
+ recordValidationStatsCallback: Map[TopicPartition,
RecordValidationStats] => Unit = _ => (),
+ requestLocal: RequestLocal = RequestLocal.NoCaching,
+ actionQueue: ActionQueue = this.defaultActionQueue):
Unit = {
+
+ val transactionalProducerInfo = mutable.HashSet[(Long, Short)]()
+ val topicPartitionBatchInfo = mutable.Map[TopicPartition, Int]()
entriesPerPartition.foreach { case (topicPartition, records) =>
- try {
- // Produce requests (only requests that require verification) should
only have one batch per partition in "batches" but check all just to be safe.
- val transactionalBatches = records.batches.asScala.filter(batch =>
batch.hasProducerId && batch.isTransactional)
- transactionalBatches.foreach(batch =>
transactionalProducerIds.add(batch.producerId))
-
- if (transactionalBatches.nonEmpty) {
- // We return VerificationGuard if the partition needs to be
verified. If no state is present, no need to verify.
- val firstBatch = records.firstBatch
- val verificationGuard =
getPartitionOrException(topicPartition).maybeStartTransactionVerification(firstBatch.producerId,
firstBatch.baseSequence, firstBatch.producerEpoch)
- if (verificationGuard != VerificationGuard.SENTINEL) {
- verificationGuards.put(topicPartition, verificationGuard)
- unverifiedEntries.put(topicPartition, records)
- } else
- verifiedEntries.put(topicPartition, records)
- } else {
- // If there is no producer ID or transactional records in the
batches, no need to verify.
- verifiedEntries.put(topicPartition, records)
- }
- } catch {
- case e: Exception => errorEntries.put(topicPartition,
Errors.forException(e))
- }
+ // Produce requests (only requests that require verification) should
only have one batch per partition in "batches" but check all just to be safe.
+ val transactionalBatches = records.batches.asScala.filter(batch =>
batch.hasProducerId && batch.isTransactional)
+ transactionalBatches.foreach(batch =>
transactionalProducerInfo.add(batch.producerId, batch.producerEpoch))
+ if (!transactionalBatches.isEmpty)
topicPartitionBatchInfo.put(topicPartition, records.firstBatch.baseSequence)
}
- // We should have exactly one producer ID for transactional records
- if (transactionalProducerIds.size > 1) {
+ if (transactionalProducerInfo.size > 1) {
throw new InvalidPidMappingException("Transactional records contained
more than one producer ID")
}
+
+ def postVerificationCallback(preAppendErrors: Map[TopicPartition, Errors],
+ newRequestLocal: RequestLocal,
+ verificationGuards: Map[TopicPartition,
VerificationGuard]): Unit = {
+ val errorResults = preAppendErrors.map {
+ case (topicPartition, error) =>
+ // translate transaction coordinator errors to known producer
response errors
+ val customException =
+ error match {
+ case Errors.INVALID_TXN_STATE => Some(error.exception("Partition
was not added to the transaction"))
+ case Errors.CONCURRENT_TRANSACTIONS |
+ Errors.COORDINATOR_LOAD_IN_PROGRESS |
+ Errors.COORDINATOR_NOT_AVAILABLE |
+ Errors.NOT_COORDINATOR => Some(new
NotEnoughReplicasException(
+ s"Unable to verify the partition has been added to the
transaction. Underlying error: ${error.toString}"))
+ case _ => None
+ }
+ topicPartition -> LogAppendResult(
+ LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
+ Some(customException.getOrElse(error.exception)),
+ hasCustomErrorMessage = customException.isDefined
+ )
+ }
+
+ appendRecords(
+ timeout = timeout,
+ requiredAcks = requiredAcks,
+ internalTopicsAllowed = internalTopicsAllowed,
+ origin = origin,
+ entriesPerPartition = entriesPerPartition,
+ responseCallback = responseCallback,
+ recordValidationStatsCallback = recordValidationStatsCallback,
+ requestLocal = newRequestLocal,
+ verificationGuards = verificationGuards,
+ preAppendErrors = errorResults
+ )
+ }
+
+ if (transactionalProducerInfo.size < 1) {
+ postVerificationCallback(Map.empty[TopicPartition, Errors],
requestLocal, Map.empty[TopicPartition, VerificationGuard])
+ return
+ }
+ maybeStartTransactionVerificationForPartitions(topicPartitionBatchInfo,
transactionalId,
+ transactionalProducerInfo.head._1, transactionalProducerInfo.head._2,
requestLocal, postVerificationCallback)
+ }
+
+ private def sendInvalidRequiredAcksResponse(entries: Map[TopicPartition,
MemoryRecords],
+ responseCallback:
Map[TopicPartition, PartitionResponse] => Unit): Unit = {
+ // If required.acks is outside accepted range, something is wrong with the
client
+ // Just return an error and don't handle the request at all
+ val responseStatus = entries.map { case (topicPartition, _) =>
+ topicPartition -> new PartitionResponse(
+ Errors.INVALID_REQUIRED_ACKS,
+ LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.firstOffset,
+ RecordBatch.NO_TIMESTAMP,
+ LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.logStartOffset
+ )
+ }
+ responseCallback(responseStatus)
+ }
+
+ def maybeStartTransactionVerificationForPartition(
+ topicPartition: TopicPartition,
+ transactionalId: String,
+ producerId: Long,
+ producerEpoch: Short,
+ baseSequence: Int,
+ requestLocal: RequestLocal,
+ callback: (Errors, RequestLocal, VerificationGuard) => Unit
+ ): Unit = {
+ def generalizedCallback(preAppendErrors: Map[TopicPartition, Errors],
+ newRequestLocal: RequestLocal,
+ verificationGuards: Map[TopicPartition,
VerificationGuard]): Unit = {
+ callback(preAppendErrors.getOrElse(topicPartition, Errors.NONE),
newRequestLocal, verificationGuards.getOrElse(topicPartition,
VerificationGuard.SENTINEL))
+ }
+
+ maybeStartTransactionVerificationForPartitions(
+ Map(topicPartition -> baseSequence),
+ transactionalId,
+ producerId,
+ producerEpoch,
+ requestLocal,
+ generalizedCallback
+ )
+ }
+
+ def maybeStartTransactionVerificationForPartitions(
+ topicPartitionBatchInfo: Map[TopicPartition, Int],
+ transactionalId: String,
+ producerId: Long,
+ producerEpoch: Short,
+ requestLocal: RequestLocal,
+ callback: (Map[TopicPartition, Errors], RequestLocal, Map[TopicPartition,
VerificationGuard]) => Unit
Review Comment:
I think it is also possible that we have a guard for a partition we tried to
verify but whose verification failed. In that case it is OK for the partition
to be in the verification guard map and also have an error.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]