hachikuji commented on code in PR #14774:
URL: https://github.com/apache/kafka/pull/14774#discussion_r1419739785


##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -339,156 +340,183 @@ class GroupMetadataManager(brokerId: Int,
       delayedProduceLock = Some(group.lock),
       responseCallback = callback,
       requestLocal = requestLocal,
-      transactionalId = transactionalId)
+      verificationGuards = verificationGuards,
+      preAppendErrors = preAppendErrors)
+  }
+
+  def generateOffsetRecords(magicValue: Byte,

Review Comment:
   nit: could these two methods be private?
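   For illustration, a minimal self-contained sketch of the visibility pattern being suggested (the class and names below are stand-ins, not the Kafka code): helpers that are only invoked from a single public entry point can usually be marked `private`, or package-private (e.g. `private[group]`) if same-package tests still need to call them directly.
   ```scala
   // Stand-in example, not GroupMetadataManager itself.
   class OffsetWriterSketch {
     def storeOffsets(groupId: String, offsets: Map[Int, Long]): String = {
       val records = generateRecords(groupId, offsets) // still visible from here
       appendRecords(records)
     }

     // Only called from storeOffsets, so it need not be part of the public API.
     private def generateRecords(groupId: String, offsets: Map[Int, Long]): Seq[String] =
       offsets.toSeq.map { case (partition, offset) => s"$groupId-$partition:$offset" }

     // Likewise private; outside callers get a compile error.
     private def appendRecords(records: Seq[String]): String =
       records.mkString(",")
   }
   ```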



##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -339,156 +340,183 @@ class GroupMetadataManager(brokerId: Int,
       delayedProduceLock = Some(group.lock),
       responseCallback = callback,
       requestLocal = requestLocal,
-      transactionalId = transactionalId)
+      verificationGuards = verificationGuards,
+      preAppendErrors = preAppendErrors)
+  }
+
+  def generateOffsetRecords(magicValue: Byte,
+                            isTxnOffsetCommit: Boolean,
+                            groupId: String,
+                            offsetTopicPartition: TopicPartition,
+                            filteredOffsetMetadata: Map[TopicIdPartition, 
OffsetAndMetadata],
+                            producerId: Long,
+                            producerEpoch: Short): Map[TopicPartition, 
MemoryRecords] = {
+      // We always use CREATE_TIME, like the producer. The conversion to 
LOG_APPEND_TIME (if necessary) happens automatically.
+      val timestampType = TimestampType.CREATE_TIME
+      val timestamp = time.milliseconds()
+
+      val records = filteredOffsetMetadata.map { case (topicIdPartition, 
offsetAndMetadata) =>
+        val key = GroupMetadataManager.offsetCommitKey(groupId, 
topicIdPartition.topicPartition)
+        val value = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, 
interBrokerProtocolVersion)
+        new SimpleRecord(timestamp, key, value)
+      }
+      val buffer = 
ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, 
compressionType, records.asJava))
+
+      if (isTxnOffsetCommit && magicValue < RecordBatch.MAGIC_VALUE_V2)
+        throw Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT.exception("Attempting to 
make a transaction offset commit with an invalid magic: " + magicValue)
+
+      val builder = MemoryRecords.builder(buffer, magicValue, compressionType, 
timestampType, 0L, time.milliseconds(),
+        producerId, producerEpoch, 0, isTxnOffsetCommit, 
RecordBatch.NO_PARTITION_LEADER_EPOCH)
+
+      records.foreach(builder.append)
+      Map(offsetTopicPartition -> builder.build())
+  }
+
+  def createPutCacheCallback(isTxnOffsetCommit: Boolean,
+                             group: GroupMetadata,
+                             consumerId: String,
+                             offsetMetadata: immutable.Map[TopicIdPartition, 
OffsetAndMetadata],
+                             filteredOffsetMetadata: Map[TopicIdPartition, 
OffsetAndMetadata],
+                             responseCallback: immutable.Map[TopicIdPartition, 
Errors] => Unit,
+                             producerId: Long = RecordBatch.NO_PRODUCER_ID,
+                             records: Map[TopicPartition, MemoryRecords],
+                             preAppendErrors: Map[TopicPartition, 
LogAppendResult] = Map.empty): Map[TopicPartition, PartitionResponse] => Unit = 
{
+    val offsetTopicPartition = new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
+    // set the callback function to insert offsets into cache after log append 
completed
+    def putCacheCallback(responseStatus: Map[TopicPartition, 
PartitionResponse]): Unit = {
+      // the append response should only contain the topics partition
+      if (responseStatus.size != 1 || 
!responseStatus.contains(offsetTopicPartition))
+        throw new IllegalStateException("Append status %s should only have one 
partition %s"
+          .format(responseStatus, offsetTopicPartition))
+
+      // construct the commit response status and insert
+      // the offset and metadata to cache if the append status has no error
+      val status = responseStatus(offsetTopicPartition)
+
+      val responseError = group.inLock {
+        if (status.error == Errors.NONE) {
+          if (!group.is(Dead)) {
+            filteredOffsetMetadata.forKeyValue { (topicIdPartition, 
offsetAndMetadata) =>
+              if (isTxnOffsetCommit)
+                group.onTxnOffsetCommitAppend(producerId, topicIdPartition, 
CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
+              else
+                group.onOffsetCommitAppend(topicIdPartition, 
CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
+            }
+          }
+
+          // Record the number of offsets committed to the log
+          offsetCommitsSensor.record(records.size)
+
+          Errors.NONE
+        } else {
+          if (!group.is(Dead)) {
+            if (!group.hasPendingOffsetCommitsFromProducer(producerId))
+              removeProducerGroup(producerId, group.groupId)
+            filteredOffsetMetadata.forKeyValue { (topicIdPartition, 
offsetAndMetadata) =>
+              if (isTxnOffsetCommit)
+                group.failPendingTxnOffsetCommit(producerId, topicIdPartition)
+              else
+                group.failPendingOffsetWrite(topicIdPartition, 
offsetAndMetadata)
+            }
+          }
+
+          debug(s"Offset commit $filteredOffsetMetadata from group 
${group.groupId}, consumer $consumerId " +
+            s"with generation ${group.generationId} failed when appending to 
log due to ${status.error.exceptionName}")
+
+          // transform the log append error code to the corresponding the 
commit status error code
+          status.error match {
+            case Errors.UNKNOWN_TOPIC_OR_PARTITION
+                 | Errors.NOT_ENOUGH_REPLICAS
+                 | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
+              Errors.COORDINATOR_NOT_AVAILABLE
+
+            case Errors.NOT_LEADER_OR_FOLLOWER
+                 | Errors.KAFKA_STORAGE_ERROR =>
+              Errors.NOT_COORDINATOR
+
+            case Errors.MESSAGE_TOO_LARGE
+                 | Errors.RECORD_LIST_TOO_LARGE
+                 | Errors.INVALID_FETCH_SIZE =>
+              Errors.INVALID_COMMIT_OFFSET_SIZE
+
+            case other => other
+          }
+        }
+      }
+
+      // compute the final error codes for the commit response
+      val commitStatus = offsetMetadata.map { case (topicIdPartition, 
offsetAndMetadata) =>
+        if (!validateOffsetMetadataLength(offsetAndMetadata.metadata))
+          (topicIdPartition, Errors.OFFSET_METADATA_TOO_LARGE)
+        else if (preAppendErrors.contains(topicIdPartition.topicPartition))
+          (topicIdPartition, 
preAppendErrors(topicIdPartition.topicPartition).error)
+        else
+          (topicIdPartition, responseError)
+      }
+
+      // finally trigger the callback logic passed from the API layer
+      responseCallback(commitStatus)
+    }
+    putCacheCallback
   }
 
   /**
    * Store offsets by appending it to the replicated log and then inserting to 
cache
    */
   def storeOffsets(group: GroupMetadata,
                    consumerId: String,
+                   offsetTopicPartition: TopicPartition,
                    offsetMetadata: immutable.Map[TopicIdPartition, 
OffsetAndMetadata],
                    responseCallback: immutable.Map[TopicIdPartition, Errors] 
=> Unit,
-                   transactionalId: String = null,
                    producerId: Long = RecordBatch.NO_PRODUCER_ID,
                    producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH,
-                   requestLocal: RequestLocal = RequestLocal.NoCaching): Unit 
= {
-    // first filter out partitions with offset metadata size exceeding limit
-    val filteredOffsetMetadata = offsetMetadata.filter { case (_, 
offsetAndMetadata) =>
-      validateOffsetMetadataLength(offsetAndMetadata.metadata)
-    }
-
+                   requestLocal: RequestLocal = RequestLocal.NoCaching,
+                   verificationGuard: Option[VerificationGuard]): Unit = {
     group.inLock {
       if (!group.hasReceivedConsistentOffsetCommits)
         warn(s"group: ${group.groupId} with leader: ${group.leaderOrNull} has 
received offset commits from consumers as well " +
           s"as transactional producers. Mixing both types of offset commits 
will generally result in surprises and " +
           s"should be avoided.")
     }
 
-    val isTxnOffsetCommit = producerId != RecordBatch.NO_PRODUCER_ID
-    // construct the message set to append
+    val filteredOffsetMetadata = offsetMetadata.filter { case (_, 
offsetAndMetadata) =>
+      validateOffsetMetadataLength(offsetAndMetadata.metadata)
+    }
     if (filteredOffsetMetadata.isEmpty) {
       // compute the final error codes for the commit response
       val commitStatus = offsetMetadata.map { case (k, _) => k -> 
Errors.OFFSET_METADATA_TOO_LARGE }
       responseCallback(commitStatus)
-    } else {
-      getMagic(partitionFor(group.groupId)) match {
-        case Some(magicValue) =>
-          // We always use CREATE_TIME, like the producer. The conversion to 
LOG_APPEND_TIME (if necessary) happens automatically.
-          val timestampType = TimestampType.CREATE_TIME
-          val timestamp = time.milliseconds()
-
-          val records = filteredOffsetMetadata.map { case (topicIdPartition, 
offsetAndMetadata) =>
-            val key = GroupMetadataManager.offsetCommitKey(group.groupId, 
topicIdPartition.topicPartition)
-            val value = 
GroupMetadataManager.offsetCommitValue(offsetAndMetadata, 
interBrokerProtocolVersion)
-            new SimpleRecord(timestamp, key, value)
-          }
-          val offsetTopicPartition = new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
-          val buffer = 
ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, 
compressionType, records.asJava))
-
-          if (isTxnOffsetCommit && magicValue < RecordBatch.MAGIC_VALUE_V2)
-            throw Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT.exception("Attempting 
to make a transaction offset commit with an invalid magic: " + magicValue)
-
-          val builder = MemoryRecords.builder(buffer, magicValue, 
compressionType, timestampType, 0L, time.milliseconds(),
-            producerId, producerEpoch, 0, isTxnOffsetCommit, 
RecordBatch.NO_PARTITION_LEADER_EPOCH)
-
-          records.foreach(builder.append)
-          val entries = Map(offsetTopicPartition -> builder.build())
-
-          // set the callback function to insert offsets into cache after log 
append completed
-          def putCacheCallback(responseStatus: Map[TopicPartition, 
PartitionResponse]): Unit = {
-            // the append response should only contain the topics partition
-            if (responseStatus.size != 1 || 
!responseStatus.contains(offsetTopicPartition))
-              throw new IllegalStateException("Append status %s should only 
have one partition %s"
-                .format(responseStatus, offsetTopicPartition))
-
-            // construct the commit response status and insert
-            // the offset and metadata to cache if the append status has no 
error
-            val status = responseStatus(offsetTopicPartition)
-
-            val responseError = group.inLock {
-              if (status.error == Errors.NONE) {
-                if (!group.is(Dead)) {
-                  filteredOffsetMetadata.forKeyValue { (topicIdPartition, 
offsetAndMetadata) =>
-                    if (isTxnOffsetCommit)
-                      group.onTxnOffsetCommitAppend(producerId, 
topicIdPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), 
offsetAndMetadata))
-                    else
-                      group.onOffsetCommitAppend(topicIdPartition, 
CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
-                  }
-                }
-
-                // Record the number of offsets committed to the log
-                offsetCommitsSensor.record(records.size)
-
-                Errors.NONE
-              } else {
-                if (!group.is(Dead)) {
-                  if (!group.hasPendingOffsetCommitsFromProducer(producerId))
-                    removeProducerGroup(producerId, group.groupId)
-                  filteredOffsetMetadata.forKeyValue { (topicIdPartition, 
offsetAndMetadata) =>
-                    if (isTxnOffsetCommit)
-                      group.failPendingTxnOffsetCommit(producerId, 
topicIdPartition)
-                    else
-                      group.failPendingOffsetWrite(topicIdPartition, 
offsetAndMetadata)
-                  }
-                }
-
-                debug(s"Offset commit $filteredOffsetMetadata from group 
${group.groupId}, consumer $consumerId " +
-                  s"with generation ${group.generationId} failed when 
appending to log due to ${status.error.exceptionName}")
-
-                // transform the log append error code to the corresponding 
the commit status error code
-                status.error match {
-                  case Errors.UNKNOWN_TOPIC_OR_PARTITION
-                       | Errors.NOT_ENOUGH_REPLICAS
-                       | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
-                    Errors.COORDINATOR_NOT_AVAILABLE
-
-                  case Errors.NOT_LEADER_OR_FOLLOWER
-                       | Errors.KAFKA_STORAGE_ERROR =>
-                    Errors.NOT_COORDINATOR
-
-                  case Errors.MESSAGE_TOO_LARGE
-                       | Errors.RECORD_LIST_TOO_LARGE
-                       | Errors.INVALID_FETCH_SIZE =>
-                    Errors.INVALID_COMMIT_OFFSET_SIZE
-
-                  case other => other
-                }
-              }
-            }
-
-            // compute the final error codes for the commit response
-            val commitStatus = offsetMetadata.map { case (topicIdPartition, 
offsetAndMetadata) =>
-              if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
-                (topicIdPartition, responseError)
-              else
-                (topicIdPartition, Errors.OFFSET_METADATA_TOO_LARGE)
-            }
+      return
+    }
 
-            // finally trigger the callback logic passed from the API layer
-            responseCallback(commitStatus)
-          }
+    val magicOpt = getMagic(partitionFor(group.groupId))
+    if (magicOpt.isEmpty) {
+      val commitStatus = offsetMetadata.map { case (topicIdPartition, _) =>
+        (topicIdPartition, Errors.NOT_COORDINATOR)
+      }
+      responseCallback(commitStatus)
+      return
+    }
 
-          if (isTxnOffsetCommit) {
-            group.inLock {
-              addProducerGroup(producerId, group.groupId)
-              group.prepareTxnOffsetCommit(producerId, offsetMetadata)
-            }
-          } else {
-            group.inLock {
-              group.prepareOffsetCommit(offsetMetadata)
-            }
-          }
+    val isTxnOffsetCommit = producerId != RecordBatch.NO_PRODUCER_ID
+    val records = generateOffsetRecords(magicOpt.get, isTxnOffsetCommit, 
group.groupId, offsetTopicPartition, filteredOffsetMetadata, producerId, 
producerEpoch)
+    val putCacheCallback = createPutCacheCallback(isTxnOffsetCommit, group, 
consumerId, offsetMetadata, filteredOffsetMetadata, responseCallback, 
producerId, records)
 
-          appendForGroup(group, entries, requestLocal, putCacheCallback, 
transactionalId)
+    val verificationGuards = verificationGuard match {
+      case Some(guard) => Map(offsetTopicPartition -> guard)
+      case None => Map.empty[TopicPartition, VerificationGuard]
+    }
 
-        case None =>
-          val commitStatus = offsetMetadata.map { case (topicIdPartition, _) =>
-            (topicIdPartition, Errors.NOT_COORDINATOR)
-          }
-          responseCallback(commitStatus)
+    group.inLock {
+      if (isTxnOffsetCommit) {
+        addProducerGroup(producerId, group.groupId)
+        group.prepareTxnOffsetCommit(producerId, offsetMetadata)
+      } else {
+        group.prepareOffsetCommit(offsetMetadata)
       }
     }
+
+    appendForGroup(group, records, requestLocal, putCacheCallback, 
verificationGuards)

Review Comment:
   What error do we expect if the guard check fails during write? 



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -976,6 +975,231 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  /**
+   * Append messages to offsets topic, and wait for them to be replicated to 
other replicas;
+   * the callback function will be triggered either when timeout or the 
required acks are satisfied;
+   * if the callback function itself is already synchronized on some object 
then pass this object to avoid deadlock.
+   *
+   * Noted that all pending delayed check operations are stored in a queue. 
All callers to ReplicaManager.appendRecords()
+   * are expected to call ActionQueue.tryCompleteActions for all affected 
partitions, without holding any conflicting
+   * locks.
+   *
+   * @param timeout                       maximum time we will wait to append 
before returning
+   * @param requiredAcks                  number of replicas who must 
acknowledge the append before sending the response
+   * @param internalTopicsAllowed         boolean indicating whether internal 
topics can be appended to
+   * @param origin                        source of the append request (ie, 
client, replication, coordinator)
+   * @param entriesPerPartition           the records per partition to be 
appended
+   * @param responseCallback              callback for sending the response
+   * @param delayedProduceLock            lock for the delayed actions
+   * @param recordValidationStatsCallback callback for updating stats on 
record conversions
+   * @param requestLocal                  container for the stateful instances 
scoped to this request -- this must correspond to the
+   *                                      thread calling this method
+   * @param actionQueue                   the action queue to use. 
ReplicaManager#defaultActionQueue is used by default.
+   * @param verificationGuards            the mapping from topic partition to 
verification guards if transaction verification is used
+   * @param preAppendErrors               the mapping from topic partition to 
LogAppendResult for errors that occurred before appending
+   */
+  def appendForGroup(timeout: Long,
+                     requiredAcks: Short,
+                     internalTopicsAllowed: Boolean,
+                     origin: AppendOrigin,
+                     entriesPerPartition: Map[TopicPartition, MemoryRecords],
+                     responseCallback: Map[TopicPartition, PartitionResponse] 
=> Unit,
+                     delayedProduceLock: Option[Lock] = None,
+                     recordValidationStatsCallback: Map[TopicPartition, 
RecordValidationStats] => Unit = _ => (),
+                     requestLocal: RequestLocal = RequestLocal.NoCaching,
+                     actionQueue: ActionQueue = this.defaultActionQueue,
+                     verificationGuards: Map[TopicPartition, 
VerificationGuard] = Map.empty,
+                     preAppendErrors: Map[TopicPartition, LogAppendResult] = 
Map.empty): Unit = {
+    if (!isValidRequiredAcks(requiredAcks)) {
+      sendInvalidRequiredAcksResponse(entriesPerPartition, responseCallback)
+      return
+    }
+
+    val nonErrorEntriesPerPartition = entriesPerPartition.filter {
+      case (tp, _) => !preAppendErrors.contains(tp)
+    }
+
+    val sTime = time.milliseconds
+    val localProduceResults = appendToLocalLog(internalTopicsAllowed = 
internalTopicsAllowed,
+      origin, nonErrorEntriesPerPartition, requiredAcks, requestLocal, 
verificationGuards.toMap)
+    debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
+
+    val allResults = localProduceResults ++ preAppendErrors
+    val produceStatus = allResults.map { case (topicPartition, result) =>
+      topicPartition -> ProducePartitionStatus(
+        result.info.lastOffset + 1, // required offset
+        new PartitionResponse(
+          result.error,
+          result.info.firstOffset,
+          result.info.lastOffset,
+          result.info.logAppendTime,
+          result.info.logStartOffset,
+          result.info.recordErrors,
+          result.errorMessage
+        )
+      ) // response status
+    }
+
+    actionQueue.add {
+      () => allResults.foreach { case (topicPartition, result) =>
+        val requestKey = TopicPartitionOperationKey(topicPartition)
+        result.info.leaderHwChange match {
+          case LeaderHwChange.INCREASED =>
+            // some delayed operations may be unblocked after HW changed
+            delayedProducePurgatory.checkAndComplete(requestKey)
+            delayedFetchPurgatory.checkAndComplete(requestKey)
+            delayedDeleteRecordsPurgatory.checkAndComplete(requestKey)
+          case LeaderHwChange.SAME =>
+            // probably unblock some follower fetch requests since log end 
offset has been updated
+            delayedFetchPurgatory.checkAndComplete(requestKey)
+          case LeaderHwChange.NONE =>
+            // nothing
+        }
+      }
+    }
+
+    recordValidationStatsCallback(localProduceResults.map { case (k, v) => k 
-> v.info.recordValidationStats })
+
+    if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, 
allResults)) {
+      // create delayed produce operation
+      val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
+      val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, 
responseCallback, delayedProduceLock)
+
+      // create a list of (topic, partition) pairs to use as keys for this 
delayed produce operation
+      val producerRequestKeys = 
entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
+
+      // try to complete the request immediately, otherwise put it into the 
purgatory
+      // this is because while the delayed produce operation is being created, 
new
+      // requests may arrive and hence make this operation completable.
+      delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, 
producerRequestKeys)
+    } else {
+      // we can respond immediately
+      val produceResponseStatus = produceStatus.map { case (k, status) => k -> 
status.responseStatus }
+      responseCallback(produceResponseStatus)
+    }
+  }
+
+  private def sendInvalidRequiredAcksResponse(entries: Map[TopicPartition, 
MemoryRecords],
+                                             responseCallback: 
Map[TopicPartition, PartitionResponse] => Unit): Unit = {
+    // If required.acks is outside accepted range, something is wrong with the 
client
+    // Just return an error and don't handle the request at all
+    val responseStatus = entries.map { case (topicPartition, _) =>
+      topicPartition -> new PartitionResponse(
+        Errors.INVALID_REQUIRED_ACKS,
+        LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.firstOffset,
+        RecordBatch.NO_TIMESTAMP,
+        LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.logStartOffset
+      )
+    }
+    responseCallback(responseStatus)
+  }
+
+  def maybeStartTransactionVerificationForPartition(
+    topicPartition: TopicPartition,
+    transactionalId: String,
+    producerId: Long,
+    producerEpoch: Short,
+    baseSequence: Int,
+    requestLocal: RequestLocal,
+    callback: (Errors, RequestLocal, VerificationGuard) => Unit
+  ): Unit = {
+    def generalizedCallback(preAppendErrors: Map[TopicPartition, Errors],
+                            newRequestLocal: RequestLocal,
+                            verificationGuards: Map[TopicPartition, 
VerificationGuard]): Unit = {
+      callback(preAppendErrors.getOrElse(topicPartition, Errors.NONE), 
newRequestLocal, verificationGuards.getOrElse(topicPartition, 
VerificationGuard.SENTINEL))

Review Comment:
   nit: it's a long argument list. Could we put the arguments on separate lines?
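   For reference, a sketch of how that call could read with one argument per line (the same code as in the hunk above, only re-wrapped):
   ```scala
   def generalizedCallback(preAppendErrors: Map[TopicPartition, Errors],
                           newRequestLocal: RequestLocal,
                           verificationGuards: Map[TopicPartition, VerificationGuard]): Unit = {
     callback(
       preAppendErrors.getOrElse(topicPartition, Errors.NONE),
       newRequestLocal,
       verificationGuards.getOrElse(topicPartition, VerificationGuard.SENTINEL)
     )
   }
   ```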



##########
core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala:
##########
@@ -168,8 +168,37 @@ object AbstractCoordinatorConcurrencyTest {
       watchKeys = Collections.newSetFromMap(new 
ConcurrentHashMap[TopicPartitionOperationKey, java.lang.Boolean]()).asScala
     }
 
+    override def maybeStartTransactionVerificationForPartition(
+      topicPartition: TopicPartition,
+      transactionalId: String,
+      producerId: Long,
+      producerEpoch: Short,
+      baseSequence: Int,
+      requestLocal: RequestLocal,
+      callback: (Errors, RequestLocal, VerificationGuard) => Unit
+    ): Unit = {
+      // Skip verification
+      callback(Errors.NONE, requestLocal, VerificationGuard.SENTINEL)
+    }
+
     override def tryCompleteActions(): Unit = 
watchKeys.map(producePurgatory.checkAndComplete)
 
+    override def appendForGroup(timeout: Long,
+                                requiredAcks: Short,
+                                internalTopicsAllowed: Boolean,
+                                origin: AppendOrigin,
+                                entriesPerPartition: Map[TopicPartition, 
MemoryRecords],
+                                responseCallback: Map[TopicPartition, 
PartitionResponse] => Unit,
+                                delayedProduceLock: Option[Lock] = None,
+                                recordValidationStatsCallback: 
Map[TopicPartition, RecordValidationStats] => Unit = _ => (),
+                                requestLocal: RequestLocal = 
RequestLocal.NoCaching,
+                                actionQueue: ActionQueue = null,
+                                verificationGuards: Map[TopicPartition, 
VerificationGuard] = Map.empty,

Review Comment:
   What is the difference between passing no verification guard and passing 
`VerificationGuard.SENTINEL`?
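   For what it's worth, a sketch of the two shapes being compared, under the assumption (not verified against the final patch) that the append path falls back to `VerificationGuard.SENTINEL` when a partition has no entry in the map:
   ```scala
   // Shape 1: partition simply absent from the map.
   val withNoGuard: Map[TopicPartition, VerificationGuard] = Map.empty

   // Shape 2: partition present, but mapped to the explicit "no verification" marker.
   val withSentinel: Map[TopicPartition, VerificationGuard] =
     Map(topicPartition -> VerificationGuard.SENTINEL)

   // If the lookup on the append side is effectively
   //   verificationGuards.getOrElse(tp, VerificationGuard.SENTINEL)
   // then the two shapes behave the same; the question is whether that holds everywhere.
   ```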



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -941,39 +851,209 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  private def partitionEntriesForVerification(verificationGuards: 
mutable.Map[TopicPartition, VerificationGuard],
-                                              entriesPerPartition: 
Map[TopicPartition, MemoryRecords],
-                                              verifiedEntries: 
mutable.Map[TopicPartition, MemoryRecords],
-                                              unverifiedEntries: 
mutable.Map[TopicPartition, MemoryRecords],
-                                              errorEntries: 
mutable.Map[TopicPartition, Errors]): Unit= {
-    val transactionalProducerIds = mutable.HashSet[Long]()
+  /**
+   * Handles the produce request by starting any transactional verification 
before appending.
+   *
+   * @param timeout                       maximum time we will wait to append 
before returning
+   * @param requiredAcks                  number of replicas who must 
acknowledge the append before sending the response
+   * @param internalTopicsAllowed         boolean indicating whether internal 
topics can be appended to
+   * @param origin                        source of the append request (ie, 
client, replication, coordinator)
+   * @param entriesPerPartition           the records per partition to be 
appended
+   * @param responseCallback              callback for sending the response
+   * @param delayedProduceLock            lock for the delayed actions
+   * @param recordValidationStatsCallback callback for updating stats on 
record conversions
+   * @param requestLocal                  container for the stateful instances 
scoped to this request -- this must correspond to the
+   *                                      thread calling this method
+   * @param actionQueue                   the action queue to use. 
ReplicaManager#defaultActionQueue is used by default.
+   * @param verificationGuards            the mapping from topic partition to 
verification guards if transaction verification is used
+   */
+  def handleProduceAppend(timeout: Long,
+                          requiredAcks: Short,
+                          internalTopicsAllowed: Boolean,
+                          origin: AppendOrigin,
+                          transactionalId: String,
+                          entriesPerPartition: Map[TopicPartition, 
MemoryRecords],
+                          responseCallback: Map[TopicPartition, 
PartitionResponse] => Unit,
+                          recordValidationStatsCallback: Map[TopicPartition, 
RecordValidationStats] => Unit = _ => (),
+                          requestLocal: RequestLocal = RequestLocal.NoCaching,
+                          actionQueue: ActionQueue = this.defaultActionQueue): 
Unit = {
+
+    val transactionalProducerInfo = mutable.HashSet[(Long, Short)]()
+    val topicPartitionBatchInfo = mutable.Map[TopicPartition, Int]()
     entriesPerPartition.foreach { case (topicPartition, records) =>
-      try {
-        // Produce requests (only requests that require verification) should 
only have one batch per partition in "batches" but check all just to be safe.
-        val transactionalBatches = records.batches.asScala.filter(batch => 
batch.hasProducerId && batch.isTransactional)
-        transactionalBatches.foreach(batch => 
transactionalProducerIds.add(batch.producerId))
-
-        if (transactionalBatches.nonEmpty) {
-          // We return VerificationGuard if the partition needs to be 
verified. If no state is present, no need to verify.
-          val firstBatch = records.firstBatch
-          val verificationGuard = 
getPartitionOrException(topicPartition).maybeStartTransactionVerification(firstBatch.producerId,
 firstBatch.baseSequence, firstBatch.producerEpoch)
-          if (verificationGuard != VerificationGuard.SENTINEL) {
-            verificationGuards.put(topicPartition, verificationGuard)
-            unverifiedEntries.put(topicPartition, records)
-          } else
-            verifiedEntries.put(topicPartition, records)
-        } else {
-          // If there is no producer ID or transactional records in the 
batches, no need to verify.
-          verifiedEntries.put(topicPartition, records)
-        }
-      } catch {
-        case e: Exception => errorEntries.put(topicPartition, 
Errors.forException(e))
-      }
+      // Produce requests (only requests that require verification) should 
only have one batch per partition in "batches" but check all just to be safe.
+      val transactionalBatches = records.batches.asScala.filter(batch => 
batch.hasProducerId && batch.isTransactional)
+      transactionalBatches.foreach(batch => 
transactionalProducerInfo.add(batch.producerId, batch.producerEpoch))
+      if (!transactionalBatches.isEmpty) 
topicPartitionBatchInfo.put(topicPartition, records.firstBatch.baseSequence)
     }
-    // We should have exactly one producer ID for transactional records
-    if (transactionalProducerIds.size > 1) {
+    if (transactionalProducerInfo.size > 1) {
       throw new InvalidPidMappingException("Transactional records contained 
more than one producer ID")
     }
+
+    def postVerificationCallback(preAppendErrors: Map[TopicPartition, Errors],
+                                 newRequestLocal: RequestLocal,
+                                 verificationGuards: Map[TopicPartition, 
VerificationGuard]): Unit = {
+      val errorResults = preAppendErrors.map {
+        case (topicPartition, error) =>
+          // translate transaction coordinator errors to known producer 
response errors
+          val customException =
+            error match {
+              case Errors.INVALID_TXN_STATE => Some(error.exception("Partition 
was not added to the transaction"))
+              case Errors.CONCURRENT_TRANSACTIONS |
+                   Errors.COORDINATOR_LOAD_IN_PROGRESS |
+                   Errors.COORDINATOR_NOT_AVAILABLE |
+                   Errors.NOT_COORDINATOR => Some(new 
NotEnoughReplicasException(
+                s"Unable to verify the partition has been added to the 
transaction. Underlying error: ${error.toString}"))
+              case _ => None
+            }
+          topicPartition -> LogAppendResult(
+            LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
+            Some(customException.getOrElse(error.exception)),
+            hasCustomErrorMessage = customException.isDefined
+          )
+      }
+
+      appendRecords(
+        timeout = timeout,
+        requiredAcks = requiredAcks,
+        internalTopicsAllowed = internalTopicsAllowed,
+        origin = origin,
+        entriesPerPartition = entriesPerPartition,
+        responseCallback = responseCallback,
+        recordValidationStatsCallback = recordValidationStatsCallback,
+        requestLocal = newRequestLocal,
+        verificationGuards = verificationGuards,
+        preAppendErrors = errorResults
+      )
+    }
+
+    if (transactionalProducerInfo.size < 1) {
+      postVerificationCallback(Map.empty[TopicPartition, Errors], 
requestLocal, Map.empty[TopicPartition, VerificationGuard])
+      return
+    }
+    maybeStartTransactionVerificationForPartitions(topicPartitionBatchInfo, 
transactionalId,
+      transactionalProducerInfo.head._1, transactionalProducerInfo.head._2, 
requestLocal, postVerificationCallback)
+  }
+
+  private def sendInvalidRequiredAcksResponse(entries: Map[TopicPartition, 
MemoryRecords],
+                                             responseCallback: 
Map[TopicPartition, PartitionResponse] => Unit): Unit = {
+    // If required.acks is outside accepted range, something is wrong with the 
client
+    // Just return an error and don't handle the request at all
+    val responseStatus = entries.map { case (topicPartition, _) =>
+      topicPartition -> new PartitionResponse(
+        Errors.INVALID_REQUIRED_ACKS,
+        LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.firstOffset,
+        RecordBatch.NO_TIMESTAMP,
+        LogAppendInfo.UNKNOWN_LOG_APPEND_INFO.logStartOffset
+      )
+    }
+    responseCallback(responseStatus)
+  }
+
+  def maybeStartTransactionVerificationForPartition(
+    topicPartition: TopicPartition,
+    transactionalId: String,
+    producerId: Long,
+    producerEpoch: Short,
+    baseSequence: Int,
+    requestLocal: RequestLocal,
+    callback: (Errors, RequestLocal, VerificationGuard) => Unit
+  ): Unit = {
+    def generalizedCallback(preAppendErrors: Map[TopicPartition, Errors],
+                            newRequestLocal: RequestLocal,
+                            verificationGuards: Map[TopicPartition, 
VerificationGuard]): Unit = {
+      callback(preAppendErrors.getOrElse(topicPartition, Errors.NONE), 
newRequestLocal, verificationGuards.getOrElse(topicPartition, 
VerificationGuard.SENTINEL))
+    }
+
+    maybeStartTransactionVerificationForPartitions(
+      Map(topicPartition -> baseSequence),
+      transactionalId,
+      producerId,
+      producerEpoch,
+      requestLocal,
+      generalizedCallback
+    )
+  }
+
+  def maybeStartTransactionVerificationForPartitions(
+    topicPartitionBatchInfo: Map[TopicPartition, Int],
+    transactionalId: String,
+    producerId: Long,
+    producerEpoch: Short,
+    requestLocal: RequestLocal,
+    callback: (Map[TopicPartition, Errors], RequestLocal, Map[TopicPartition, 
VerificationGuard]) => Unit

Review Comment:
   I don't feel too strongly. Kind of a nasty signature either way. I guess it 
makes it clear that a partition should not be included in both maps.
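   For comparison, a sketch of the single-map alternative presumably being weighed here (an assumption about the other option, not code from the patch): folding the error and the guard into one value per partition also rules out a partition appearing in both maps, at the cost of an `Either` in the signature.
   ```scala
   // Stand-in sketch: each partition carries either its pre-append error or its guard.
   def alternativeCallback(outcomes: Map[TopicPartition, Either[Errors, VerificationGuard]],
                           requestLocal: RequestLocal): Unit = {
     outcomes.foreach {
       case (tp, Left(error))  => println(s"$tp failed verification with $error")
       case (tp, Right(guard)) => println(s"$tp verified, guard: $guard")
     }
   }
   ```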



##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -339,156 +340,183 @@ class GroupMetadataManager(brokerId: Int,
       delayedProduceLock = Some(group.lock),
       responseCallback = callback,
       requestLocal = requestLocal,
-      transactionalId = transactionalId)
+      verificationGuards = verificationGuards,
+      preAppendErrors = preAppendErrors)
+  }
+
+  def generateOffsetRecords(magicValue: Byte,
+                            isTxnOffsetCommit: Boolean,
+                            groupId: String,
+                            offsetTopicPartition: TopicPartition,
+                            filteredOffsetMetadata: Map[TopicIdPartition, 
OffsetAndMetadata],
+                            producerId: Long,
+                            producerEpoch: Short): Map[TopicPartition, 
MemoryRecords] = {
+      // We always use CREATE_TIME, like the producer. The conversion to 
LOG_APPEND_TIME (if necessary) happens automatically.
+      val timestampType = TimestampType.CREATE_TIME
+      val timestamp = time.milliseconds()
+
+      val records = filteredOffsetMetadata.map { case (topicIdPartition, 
offsetAndMetadata) =>
+        val key = GroupMetadataManager.offsetCommitKey(groupId, 
topicIdPartition.topicPartition)
+        val value = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, 
interBrokerProtocolVersion)
+        new SimpleRecord(timestamp, key, value)
+      }
+      val buffer = 
ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, 
compressionType, records.asJava))
+
+      if (isTxnOffsetCommit && magicValue < RecordBatch.MAGIC_VALUE_V2)
+        throw Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT.exception("Attempting to 
make a transaction offset commit with an invalid magic: " + magicValue)
+
+      val builder = MemoryRecords.builder(buffer, magicValue, compressionType, 
timestampType, 0L, time.milliseconds(),
+        producerId, producerEpoch, 0, isTxnOffsetCommit, 
RecordBatch.NO_PARTITION_LEADER_EPOCH)
+
+      records.foreach(builder.append)
+      Map(offsetTopicPartition -> builder.build())
+  }
+
+  def createPutCacheCallback(isTxnOffsetCommit: Boolean,
+                             group: GroupMetadata,
+                             consumerId: String,
+                             offsetMetadata: immutable.Map[TopicIdPartition, 
OffsetAndMetadata],
+                             filteredOffsetMetadata: Map[TopicIdPartition, 
OffsetAndMetadata],
+                             responseCallback: immutable.Map[TopicIdPartition, 
Errors] => Unit,
+                             producerId: Long = RecordBatch.NO_PRODUCER_ID,
+                             records: Map[TopicPartition, MemoryRecords],
+                             preAppendErrors: Map[TopicPartition, 
LogAppendResult] = Map.empty): Map[TopicPartition, PartitionResponse] => Unit = 
{
+    val offsetTopicPartition = new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
+    // set the callback function to insert offsets into cache after log append 
completed
+    def putCacheCallback(responseStatus: Map[TopicPartition, 
PartitionResponse]): Unit = {
+      // the append response should only contain the topics partition
+      if (responseStatus.size != 1 || 
!responseStatus.contains(offsetTopicPartition))
+        throw new IllegalStateException("Append status %s should only have one 
partition %s"
+          .format(responseStatus, offsetTopicPartition))
+
+      // construct the commit response status and insert
+      // the offset and metadata to cache if the append status has no error
+      val status = responseStatus(offsetTopicPartition)
+
+      val responseError = group.inLock {
+        if (status.error == Errors.NONE) {
+          if (!group.is(Dead)) {
+            filteredOffsetMetadata.forKeyValue { (topicIdPartition, 
offsetAndMetadata) =>
+              if (isTxnOffsetCommit)
+                group.onTxnOffsetCommitAppend(producerId, topicIdPartition, 
CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
+              else
+                group.onOffsetCommitAppend(topicIdPartition, 
CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
+            }
+          }
+
+          // Record the number of offsets committed to the log
+          offsetCommitsSensor.record(records.size)
+
+          Errors.NONE
+        } else {
+          if (!group.is(Dead)) {
+            if (!group.hasPendingOffsetCommitsFromProducer(producerId))
+              removeProducerGroup(producerId, group.groupId)
+            filteredOffsetMetadata.forKeyValue { (topicIdPartition, 
offsetAndMetadata) =>
+              if (isTxnOffsetCommit)
+                group.failPendingTxnOffsetCommit(producerId, topicIdPartition)
+              else
+                group.failPendingOffsetWrite(topicIdPartition, 
offsetAndMetadata)
+            }
+          }
+
+          debug(s"Offset commit $filteredOffsetMetadata from group 
${group.groupId}, consumer $consumerId " +
+            s"with generation ${group.generationId} failed when appending to 
log due to ${status.error.exceptionName}")
+
+          // transform the log append error code to the corresponding the 
commit status error code
+          status.error match {
+            case Errors.UNKNOWN_TOPIC_OR_PARTITION
+                 | Errors.NOT_ENOUGH_REPLICAS
+                 | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
+              Errors.COORDINATOR_NOT_AVAILABLE
+
+            case Errors.NOT_LEADER_OR_FOLLOWER
+                 | Errors.KAFKA_STORAGE_ERROR =>
+              Errors.NOT_COORDINATOR
+
+            case Errors.MESSAGE_TOO_LARGE
+                 | Errors.RECORD_LIST_TOO_LARGE
+                 | Errors.INVALID_FETCH_SIZE =>
+              Errors.INVALID_COMMIT_OFFSET_SIZE
+
+            case other => other
+          }
+        }
+      }
+
+      // compute the final error codes for the commit response
+      val commitStatus = offsetMetadata.map { case (topicIdPartition, 
offsetAndMetadata) =>
+        if (!validateOffsetMetadataLength(offsetAndMetadata.metadata))
+          (topicIdPartition, Errors.OFFSET_METADATA_TOO_LARGE)
+        else if (preAppendErrors.contains(topicIdPartition.topicPartition))
+          (topicIdPartition, 
preAppendErrors(topicIdPartition.topicPartition).error)
+        else
+          (topicIdPartition, responseError)
+      }
+
+      // finally trigger the callback logic passed from the API layer
+      responseCallback(commitStatus)
+    }
+    putCacheCallback
   }
 
   /**
    * Store offsets by appending it to the replicated log and then inserting to 
cache
    */
   def storeOffsets(group: GroupMetadata,
                    consumerId: String,
+                   offsetTopicPartition: TopicPartition,
                    offsetMetadata: immutable.Map[TopicIdPartition, 
OffsetAndMetadata],
                    responseCallback: immutable.Map[TopicIdPartition, Errors] 
=> Unit,
-                   transactionalId: String = null,
                    producerId: Long = RecordBatch.NO_PRODUCER_ID,
                    producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH,
-                   requestLocal: RequestLocal = RequestLocal.NoCaching): Unit 
= {
-    // first filter out partitions with offset metadata size exceeding limit
-    val filteredOffsetMetadata = offsetMetadata.filter { case (_, 
offsetAndMetadata) =>
-      validateOffsetMetadataLength(offsetAndMetadata.metadata)
-    }
-
+                   requestLocal: RequestLocal = RequestLocal.NoCaching,
+                   verificationGuard: Option[VerificationGuard]): Unit = {
     group.inLock {
       if (!group.hasReceivedConsistentOffsetCommits)
         warn(s"group: ${group.groupId} with leader: ${group.leaderOrNull} has 
received offset commits from consumers as well " +
           s"as transactional producers. Mixing both types of offset commits 
will generally result in surprises and " +
           s"should be avoided.")
     }
 
-    val isTxnOffsetCommit = producerId != RecordBatch.NO_PRODUCER_ID
-    // construct the message set to append
+    val filteredOffsetMetadata = offsetMetadata.filter { case (_, 
offsetAndMetadata) =>
+      validateOffsetMetadataLength(offsetAndMetadata.metadata)
+    }
     if (filteredOffsetMetadata.isEmpty) {
       // compute the final error codes for the commit response
       val commitStatus = offsetMetadata.map { case (k, _) => k -> 
Errors.OFFSET_METADATA_TOO_LARGE }
       responseCallback(commitStatus)
-    } else {
-      getMagic(partitionFor(group.groupId)) match {
-        case Some(magicValue) =>
-          // We always use CREATE_TIME, like the producer. The conversion to 
LOG_APPEND_TIME (if necessary) happens automatically.
-          val timestampType = TimestampType.CREATE_TIME
-          val timestamp = time.milliseconds()
-
-          val records = filteredOffsetMetadata.map { case (topicIdPartition, 
offsetAndMetadata) =>
-            val key = GroupMetadataManager.offsetCommitKey(group.groupId, 
topicIdPartition.topicPartition)
-            val value = 
GroupMetadataManager.offsetCommitValue(offsetAndMetadata, 
interBrokerProtocolVersion)
-            new SimpleRecord(timestamp, key, value)
-          }
-          val offsetTopicPartition = new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
-          val buffer = 
ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, 
compressionType, records.asJava))
-
-          if (isTxnOffsetCommit && magicValue < RecordBatch.MAGIC_VALUE_V2)
-            throw Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT.exception("Attempting 
to make a transaction offset commit with an invalid magic: " + magicValue)
-
-          val builder = MemoryRecords.builder(buffer, magicValue, 
compressionType, timestampType, 0L, time.milliseconds(),
-            producerId, producerEpoch, 0, isTxnOffsetCommit, 
RecordBatch.NO_PARTITION_LEADER_EPOCH)
-
-          records.foreach(builder.append)
-          val entries = Map(offsetTopicPartition -> builder.build())
-
-          // set the callback function to insert offsets into cache after log 
append completed
-          def putCacheCallback(responseStatus: Map[TopicPartition, 
PartitionResponse]): Unit = {
-            // the append response should only contain the topics partition
-            if (responseStatus.size != 1 || 
!responseStatus.contains(offsetTopicPartition))
-              throw new IllegalStateException("Append status %s should only 
have one partition %s"
-                .format(responseStatus, offsetTopicPartition))
-
-            // construct the commit response status and insert
-            // the offset and metadata to cache if the append status has no 
error
-            val status = responseStatus(offsetTopicPartition)
-
-            val responseError = group.inLock {
-              if (status.error == Errors.NONE) {
-                if (!group.is(Dead)) {
-                  filteredOffsetMetadata.forKeyValue { (topicIdPartition, 
offsetAndMetadata) =>
-                    if (isTxnOffsetCommit)
-                      group.onTxnOffsetCommitAppend(producerId, 
topicIdPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), 
offsetAndMetadata))
-                    else
-                      group.onOffsetCommitAppend(topicIdPartition, 
CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
-                  }
-                }
-
-                // Record the number of offsets committed to the log
-                offsetCommitsSensor.record(records.size)
-
-                Errors.NONE
-              } else {
-                if (!group.is(Dead)) {
-                  if (!group.hasPendingOffsetCommitsFromProducer(producerId))
-                    removeProducerGroup(producerId, group.groupId)
-                  filteredOffsetMetadata.forKeyValue { (topicIdPartition, 
offsetAndMetadata) =>
-                    if (isTxnOffsetCommit)
-                      group.failPendingTxnOffsetCommit(producerId, 
topicIdPartition)
-                    else
-                      group.failPendingOffsetWrite(topicIdPartition, 
offsetAndMetadata)
-                  }
-                }
-
-                debug(s"Offset commit $filteredOffsetMetadata from group 
${group.groupId}, consumer $consumerId " +
-                  s"with generation ${group.generationId} failed when 
appending to log due to ${status.error.exceptionName}")
-
-                // transform the log append error code to the corresponding 
the commit status error code
-                status.error match {
-                  case Errors.UNKNOWN_TOPIC_OR_PARTITION
-                       | Errors.NOT_ENOUGH_REPLICAS
-                       | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
-                    Errors.COORDINATOR_NOT_AVAILABLE
-
-                  case Errors.NOT_LEADER_OR_FOLLOWER
-                       | Errors.KAFKA_STORAGE_ERROR =>
-                    Errors.NOT_COORDINATOR
-
-                  case Errors.MESSAGE_TOO_LARGE
-                       | Errors.RECORD_LIST_TOO_LARGE
-                       | Errors.INVALID_FETCH_SIZE =>
-                    Errors.INVALID_COMMIT_OFFSET_SIZE
-
-                  case other => other
-                }
-              }
-            }
-
-            // compute the final error codes for the commit response
-            val commitStatus = offsetMetadata.map { case (topicIdPartition, 
offsetAndMetadata) =>
-              if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
-                (topicIdPartition, responseError)
-              else
-                (topicIdPartition, Errors.OFFSET_METADATA_TOO_LARGE)
-            }
+      return
+    }
 
-            // finally trigger the callback logic passed from the API layer
-            responseCallback(commitStatus)
-          }
+    val magicOpt = getMagic(partitionFor(group.groupId))
+    if (magicOpt.isEmpty) {
+      val commitStatus = offsetMetadata.map { case (topicIdPartition, _) =>
+        (topicIdPartition, Errors.NOT_COORDINATOR)
+      }
+      responseCallback(commitStatus)
+      return
+    }
 
-          if (isTxnOffsetCommit) {
-            group.inLock {
-              addProducerGroup(producerId, group.groupId)
-              group.prepareTxnOffsetCommit(producerId, offsetMetadata)
-            }
-          } else {
-            group.inLock {
-              group.prepareOffsetCommit(offsetMetadata)
-            }
-          }
+    val isTxnOffsetCommit = producerId != RecordBatch.NO_PRODUCER_ID
+    val records = generateOffsetRecords(magicOpt.get, isTxnOffsetCommit, 
group.groupId, offsetTopicPartition, filteredOffsetMetadata, producerId, 
producerEpoch)
+    val putCacheCallback = createPutCacheCallback(isTxnOffsetCommit, group, 
consumerId, offsetMetadata, filteredOffsetMetadata, responseCallback, 
producerId, records)
 
-          appendForGroup(group, entries, requestLocal, putCacheCallback, 
transactionalId)
+    val verificationGuards = verificationGuard match {

Review Comment:
   nit: might be able to simplify this? something like this:
   ```scala
   verificationGuard.map(guard => offsetTopicPartition -> guard).toMap
   ```
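   A quick standalone check (with stand-in types, not the Kafka classes) that the one-liner and the original match expression produce the same map in both the `Some` and `None` cases; the `.toMap` works via the implicit Option-to-Iterable conversion:
   ```scala
   object VerificationGuardMapSketch extends App {
     case class TP(name: String)
     case class Guard(id: Int)

     def viaMatch(tp: TP, guard: Option[Guard]): Map[TP, Guard] = guard match {
       case Some(g) => Map(tp -> g)
       case None    => Map.empty[TP, Guard]
     }

     def viaMap(tp: TP, guard: Option[Guard]): Map[TP, Guard] =
       guard.map(g => tp -> g).toMap // Option -> Iterable -> Map

     val tp = TP("__consumer_offsets-0")
     assert(viaMatch(tp, Some(Guard(1))) == viaMap(tp, Some(Guard(1))))
     assert(viaMatch(tp, None) == viaMap(tp, None))
     println("one-liner and match are equivalent for both cases")
   }
   ```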



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
