jeffkbkim commented on code in PR #14985:
URL: https://github.com/apache/kafka/pull/14985#discussion_r1426320341
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -892,6 +895,43 @@ public void replay(
}
}
+ /**
+ * Applies the given transaction marker.
+ *
+ * @param producerId The producer id.
+ * @param result The result of the transaction.
+ * @throws RuntimeException if the transaction can not be completed.
+ */
+ public void completeTransaction(
+ long producerId,
+ TransactionResult result
+ ) throws RuntimeException {
+ Offsets pendingOffsets =
pendingTransactionalOffsets.remove(producerId);
+
+ if (result == TransactionResult.COMMIT) {
+ log.debug("Committed transactional offset commits for producer id
{}.", producerId);
+ if (pendingOffsets == null) return;
+
+ pendingOffsets.offsetsByGroup.forEach((groupId, topicOffsets) -> {
+ topicOffsets.forEach((topicName, partitionOffsets) -> {
+ partitionOffsets.forEach((partitionId, offsetAndMetadata)
-> {
+ log.debug("Committed transaction offset commit for
producer id {} in group {} " +
+ "with topic {}, partition {}, and offset {}.",
+ producerId, groupId, topicName, partitionId,
offsetAndMetadata);
+ offsets.put(
+ groupId,
+ topicName,
+ partitionId,
+ offsetAndMetadata
+ );
Review Comment:
Some points i would like to confirm:
1. During load, let's say there is an uncommitted end txn marker and we
update the `offsets` map. Until the high watermark advances, we shouldn't
consider the transaction as committed.
2. During normal txn write operation, we append the end txn marker and
update the last written offset to the end txn marker offset + 1. Once it's
committed we can advance the HWM / last committed offset.
Is this right? i think i'm confused on actually how we differentiate a
replicated end txn marker from an end txn marker that was only appended to the
leader.
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2416,14 +2418,49 @@ class KafkaApis(val requestChannel: RequestChannel,
numAppends.decrementAndGet()
skippedMarkers += 1
} else {
- val controlRecords = partitionsWithCompatibleMessageFormat.map {
partition =>
- val controlRecordType = marker.transactionResult match {
- case TransactionResult.COMMIT => ControlRecordType.COMMIT
- case TransactionResult.ABORT => ControlRecordType.ABORT
+ val controlRecordType = marker.transactionResult match {
+ case TransactionResult.COMMIT => ControlRecordType.COMMIT
+ case TransactionResult.ABORT => ControlRecordType.ABORT
+ }
+
+ val markerResults = new ConcurrentHashMap[TopicPartition, Errors]()
+ def maybeComplete(): Unit = {
+ if (partitionsWithCompatibleMessageFormat.size ==
markerResults.size) {
+ maybeSendResponseCallback(producerId, marker.transactionResult,
markerResults)
+ }
+ }
+
+ val controlRecords = mutable.Map.empty[TopicPartition, MemoryRecords]
+ partitionsWithCompatibleMessageFormat.foreach { partition =>
+ if (config.isNewGroupCoordinatorEnabled && partition.topic ==
GROUP_METADATA_TOPIC_NAME) {
+ // When the new group coordinator is used, writing the end marker
is fully delegated
+ // the group coordinator.
Review Comment:
nit: "to the"
##########
core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala:
##########
@@ -135,7 +136,22 @@ class CoordinatorLoaderImpl[T](
memoryRecords.batches.forEach { batch =>
if (batch.isControlBatch) {
- throw new IllegalStateException("Control batches are not
supported yet.")
+ batch.asScala.foreach { record =>
Review Comment:
+1, both writing and loading the end txn marker are referred to as
completeTransaction which is confusing
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]