artemlivshits commented on code in PR #16840:
URL: https://github.com/apache/kafka/pull/16840#discussion_r1724057107


##########
clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java:
##########
@@ -115,6 +134,18 @@ public NodeApiVersions(Collection<ApiVersion> nodeApiVersions, Collection<Suppor
         }
         this.supportedFeatures = Collections.unmodifiableMap(supportedFeaturesBuilder);
         this.zkMigrationEnabled = zkMigrationEnabled;
+
+        this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
+        if (finalizedFeaturesEpoch == -1) {
+            this.finalizedFeatures = Collections.emptyMap();
+            return;
+        }
+
+        Map<String, Short> finalizedFeaturesBuilder = new HashMap<>();

Review Comment:
   Do we need a separate map? I think we can put the entries directly into `finalizedFeatures`.
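
A minimal sketch of what that could look like, assuming the field may hold a plain mutable map. `FinalizedFeaturesHolder` and its constructor parameters are hypothetical stand-ins, not the PR's actual code:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for NodeApiVersions; only the finalized-features part is modeled.
final class FinalizedFeaturesHolder {
    private final long finalizedFeaturesEpoch;
    private final Map<String, Short> finalizedFeatures;

    FinalizedFeaturesHolder(Map<String, Short> rawFeatures, long finalizedFeaturesEpoch) {
        this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
        if (finalizedFeaturesEpoch == -1) {
            // No finalized features known yet.
            this.finalizedFeatures = Collections.emptyMap();
        } else {
            // Populate the field directly instead of going through a builder map.
            this.finalizedFeatures = new HashMap<>();
            for (Map.Entry<String, Short> entry : rawFeatures.entrySet()) {
                this.finalizedFeatures.put(entry.getKey(), entry.getValue());
            }
        }
    }

    long finalizedFeaturesEpoch() {
        return finalizedFeaturesEpoch;
    }

    Map<String, Short> finalizedFeatures() {
        return finalizedFeatures;
    }
}
```

One trade-off of this shape: unlike `supportedFeatures`, the map is not wrapped in `Collections.unmodifiableMap`, so callers could mutate it unless the accessor wraps it.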



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -368,15 +370,21 @@ public synchronized TransactionalRequestResult sendOffsetsToTransaction(final Ma
                 "(currentState= " + currentState + ")");
         }
 
-        log.debug("Begin adding offsets {} for consumer group {} to transaction", offsets, groupMetadata);
-        AddOffsetsToTxnRequest.Builder builder = new AddOffsetsToTxnRequest.Builder(
-            new AddOffsetsToTxnRequestData()
-                .setTransactionalId(transactionalId)
-                .setProducerId(producerIdAndEpoch.producerId)
-                .setProducerEpoch(producerIdAndEpoch.epoch)
-                .setGroupId(groupMetadata.groupId())
-        );
-        AddOffsetsToTxnHandler handler = new AddOffsetsToTxnHandler(builder, offsets, groupMetadata);
+        TxnRequestHandler handler;
+        if (coordinatorSupportsTransactionV2) {
+            log.debug("Begin adding offsets {} for consumer group {} to transaction with transaction protocol V2", offsets, groupMetadata);
+            handler = txnOffsetCommitHandler(null, offsets, groupMetadata, ApiKeys.TXN_OFFSET_COMMIT.latestVersion());
+        } else {
+            log.debug("Begin adding offsets {} for consumer group {} to transaction", offsets, groupMetadata);
+            AddOffsetsToTxnRequest.Builder builder = new AddOffsetsToTxnRequest.Builder(
+                new AddOffsetsToTxnRequestData()
+                    .setTransactionalId(transactionalId)
+                    .setProducerId(producerIdAndEpoch.producerId)
+                    .setProducerEpoch(producerIdAndEpoch.epoch)
+                    .setGroupId(groupMetadata.groupId())
+            );
+            handler = new AddOffsetsToTxnHandler(builder, offsets, groupMetadata);

Review Comment:
   I presume that once the txn v2 protocol is released, the latest version will be the default, but here we need to use a previous version because v2 is not enabled.



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -395,6 +403,8 @@ public synchronized void maybeAddPartition(TopicPartition topicPartition) {
                     " to transaction while in state  " + currentState);
             } else if (isPartitionAdded(topicPartition) || isPartitionPendingAdd(topicPartition)) {
                 return;
+            } else if (coordinatorSupportsTransactionV2) {
+                txnPartitionMap.getOrCreate(topicPartition);

Review Comment:
   Why not add directly to the `partitionsInTransaction` set here?



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -980,6 +1002,28 @@ void handleCoordinatorReady() {
                 null;
         this.coordinatorSupportsBumpingEpoch = initProducerIdVersion != null &&
                 initProducerIdVersion.maxVersion() >= 3;
+
+        if (nodeApiVersions == null) return;
+
+        if (nodeApiVersions.finalizedFeatures() != null) {
+            /*
+                To enable the transaction V2, it requires:
+                1. transaction.version finalized version >= 2

Review Comment:
   We should check every node's API versions (not just the coordinator's) and keep track of the latest transaction version, where "latest" means the value with the latest epoch. The logic would be something like this:
   
   1. When we get API versions from any node, compare the epoch stored in the transaction manager with the new epoch.
   2. If the stored epoch is less than the new epoch, store the new value and the new epoch in the transaction manager.
   3. Have a method `boolean coordinatorSupportsTransactionV2() { return this.transactionVersion >= 2; }`
   
   This way we learn the cluster-wide consensus on the transaction version instead of relying on individual brokers that may report different values, so our view doesn't jump back and forth (if the coordinator moves, for example).
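
The three steps above could be sketched roughly as follows. `TransactionVersionTracker`, `maybeUpdate`, and the default values are hypothetical names and assumptions for illustration; nothing here is taken from the existing `TransactionManager`:

```java
// Hypothetical helper illustrating epoch-based tracking of transaction.version.
final class TransactionVersionTracker {
    // Assumption: -1 means no finalized features have been observed yet.
    private long latestEpoch = -1L;
    // Assumption: 0 means transaction V2 is not enabled.
    private short transactionVersion = 0;

    // Intended to be called whenever an ApiVersions response arrives from any node.
    synchronized void maybeUpdate(long finalizedFeaturesEpoch, short newTransactionVersion) {
        // Only accept values from a strictly newer epoch, so a stale broker
        // cannot move the client's view backwards.
        if (finalizedFeaturesEpoch > latestEpoch) {
            latestEpoch = finalizedFeaturesEpoch;
            transactionVersion = newTransactionVersion;
        }
    }

    synchronized boolean coordinatorSupportsTransactionV2() {
        return transactionVersion >= 2;
    }
}
```

With this shape, a response from a broker that is still on an older epoch is ignored, while a genuinely newer epoch (including a downgrade) replaces the stored value.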



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -904,16 +906,21 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo
         }
 
         String transactionalId = null;
+        short maxProduceRequestVersion = ApiKeys.PRODUCE.latestVersion();
         if (transactionManager != null && transactionManager.isTransactional()) {
             transactionalId = transactionManager.transactionalId();
+            if (!transactionManager.isTransactionV2Enabled()) {
+                maxProduceRequestVersion = ProduceRequest.LAST_BEFORE_TRANSACTION_V2_VERSION;

Review Comment:
   What if `ApiKeys.PRODUCE.latestVersion()` is less than 
`ProduceRequest.LAST_BEFORE_TRANSACTION_V2_VERSION`?
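
One way to guard against that case is to clamp with a minimum rather than assign the constant outright. This is only a sketch: `ProduceVersionSelector` and its parameters are hypothetical, and the version numbers used when calling it would be placeholders, not real Kafka protocol versions:

```java
// Hypothetical helper; parameter names mirror the identifiers in the diff.
final class ProduceVersionSelector {
    static short maxProduceVersion(short latestVersion, short lastBeforeTxnV2, boolean txnV2Enabled) {
        if (txnV2Enabled) {
            return latestVersion;
        }
        // Never exceed the client's own latest version, even if the
        // pre-V2 cap happens to be higher than it.
        return (short) Math.min(latestVersion, lastBeforeTxnV2);
    }
}
```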



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -980,6 +1002,28 @@ void handleCoordinatorReady() {
                 null;
         this.coordinatorSupportsBumpingEpoch = initProducerIdVersion != null &&
                 initProducerIdVersion.maxVersion() >= 3;
+
+        if (nodeApiVersions == null) return;
+
+        if (nodeApiVersions.finalizedFeatures() != null) {
+            /*
+                To enable the transaction V2, it requires:
+                1. transaction.version finalized version >= 2
+                2. The ProduceRequest max version > ProducerRequest.LAST_BEFORE_TRANSACTION_V2_VERSION

Review Comment:
   Why would we check the produce request version on the coordinator?  We could 
be producing to a different broker that could have a different produce version 
than the coordinator.  Ditto for OffsetCommitRequest.
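
To illustrate the concern, a per-destination check might look something like the sketch below. `PerNodeProduceVersions`, `record`, and `canUseTxnV2Produce` are hypothetical names, and the node IDs and version numbers used with it are placeholders:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: tracks the max produce version per broker rather than
// assuming the coordinator's value applies to every destination.
final class PerNodeProduceVersions {
    private final Map<Integer, Short> maxProduceVersionByNode = new HashMap<>();

    // Record the max produce version advertised by a given node.
    void record(int nodeId, short maxProduceVersion) {
        maxProduceVersionByNode.put(nodeId, maxProduceVersion);
    }

    // True only if the destination broker itself advertises a produce version
    // newer than the last pre-V2 version; unknown nodes are treated as unsupported.
    boolean canUseTxnV2Produce(int destinationNodeId, short lastBeforeTxnV2) {
        Short maxVersion = maxProduceVersionByNode.get(destinationNodeId);
        return maxVersion != null && maxVersion > lastBeforeTxnV2;
    }
}
```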


