artemlivshits commented on code in PR #16840:
URL: https://github.com/apache/kafka/pull/16840#discussion_r1772387333


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -971,7 +987,7 @@ synchronized boolean isInitializing() {
         return isTransactional() && currentState == State.INITIALIZING;
     }
 
-    void handleCoordinatorReady() {
+    synchronized void handleCoordinatorReady() {

Review Comment:
   What's the reason to make this `synchronized`?



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -397,6 +399,7 @@ private long sendProducerData(long now) {
                 // Update both readyTimeMs and drainTimeMs, this would "reset" the node
                 // latency.
                 this.accumulator.updateNodeLatencyStats(node.id(), now, true);
+                if (transactionManager != null) this.transactionManager.handleCoordinatorReady();

Review Comment:
   Do we still need this line, or is it a left-over from a previous version of this PR?



##########
clients/src/main/java/org/apache/kafka/clients/ApiVersions.java:
##########
@@ -34,10 +34,27 @@ public class ApiVersions {
 
     private final Map<String, NodeApiVersions> nodeApiVersions = new HashMap<>();
     private byte maxUsableProduceMagic = RecordBatch.CURRENT_MAGIC_VALUE;
+    // The maximum finalized feature epoch of all the node api versions.
+    private long maxFinalizedFeaturesEpoch = -1;
+    private Map<String, Short> finalizedFeatures;
+
+    public static class FinalizedFeaturesInfo {
+        public final long finalizedFeaturesEpoch;
+        public final Map<String, Short> finalizedFeatures;
+        FinalizedFeaturesInfo(long finalizedFeaturesEpoch, Map<String, Short> finalizedFeatures) {
+            this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
+            this.finalizedFeatures = finalizedFeatures;
+        }
+    }
 
     public synchronized void update(String nodeId, NodeApiVersions nodeApiVersions) {
+        if (maxFinalizedFeaturesEpoch > nodeApiVersions.finalizedFeaturesEpoch()) {

Review Comment:
   I would expect that we still want to put the specific node's api versions into the per-node map. Then we can check the epoch and update the cluster-wide finalized features if the node has a newer epoch. Something like this:
   ```
           this.nodeApiVersions.put(nodeId, nodeApiVersions);
           this.maxUsableProduceMagic = computeMaxUsableProduceMagic();
           if (maxFinalizedFeaturesEpoch < nodeApiVersions.finalizedFeaturesEpoch()) {
               this.maxFinalizedFeaturesEpoch = nodeApiVersions.finalizedFeaturesEpoch();
               this.finalizedFeatures = nodeApiVersions.finalizedFeatures();
           }
   ```



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -425,6 +434,18 @@ public boolean isTransactional() {
         return transactionalId != null;
     }
 
+    // Check the finalized features from apiVersions to determine whether transaction V2 is enabled.
+    public synchronized boolean isTransactionV2Enabled() {

Review Comment:
   Looks like the way it's written, we can switch versions in the middle of a transaction. I think it would be more robust to switch the version when we begin the transaction, so that all calls made in the context of a single transaction use the same version.



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -904,16 +907,21 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo
         }
 
         String transactionalId = null;
+        short maxProduceRequestVersion = ApiKeys.PRODUCE.latestVersion();

Review Comment:
   I wonder if it would make more sense to push this logic into the produce request. Basically, we just pass the txnV2Enabled flag to the builder and it figures out which version of produce request to use. This is what we do for magic (just pass magic rather than figuring out the produce request version here).
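   A rough sketch of that approach (hypothetical class and version constants, mirroring how magic is passed to the builder today): the caller hands the builder a txnV2Enabled flag and the builder caps the request version itself.

   ```java
   // Hypothetical sketch: the builder derives the allowed produce-request
   // version from a txnV2Enabled flag, so Sender no longer computes it.
   // The numbers (11 = last pre-V2 version, 12 = first V2 version) follow
   // the versions discussed in this PR.
   class ProduceRequestBuilderSketch {
       static final short LAST_BEFORE_TRANSACTION_V2_VERSION = 11;
       static final short LATEST_VERSION = 12;

       private final boolean txnV2Enabled;

       ProduceRequestBuilderSketch(boolean txnV2Enabled) {
           this.txnV2Enabled = txnV2Enabled;
       }

       // The builder, not the caller, decides the maximum usable version.
       short maxAllowedVersion() {
           return txnV2Enabled ? LATEST_VERSION : LAST_BEFORE_TRANSACTION_V2_VERSION;
       }
   }

   public class Main {
       public static void main(String[] args) {
           System.out.println(new ProduceRequestBuilderSketch(false).maxAllowedVersion()); // prints 11
           System.out.println(new ProduceRequestBuilderSketch(true).maxAllowedVersion());  // prints 12
       }
   }
   ```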



##########
clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java:
##########
@@ -115,6 +134,17 @@ public NodeApiVersions(Collection<ApiVersion> nodeApiVersions, Collection<Suppor
         }
         this.supportedFeatures = Collections.unmodifiableMap(supportedFeaturesBuilder);
         this.zkMigrationEnabled = zkMigrationEnabled;
+
+        this.finalizedFeaturesEpoch = finalizedFeaturesEpoch;
+        if (finalizedFeaturesEpoch == -1) {
+            this.finalizedFeatures = Collections.emptyMap();
+            return;
+        }

Review Comment:
   nit: do we need this special case, or would the code below just work? I think it's good to avoid extra code.
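   A minimal illustration of the nit (a simplified stand-in, not the actual `NodeApiVersions` code): with a generic map-building path, an empty finalized-features input already yields an empty unmodifiable map, so no early return is needed.

   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical sketch: the generic path handles the empty case on its
   // own, so the epoch == -1 special case can be dropped.
   public class Main {
       static Map<String, Short> buildFinalizedFeatures(Map<String, Short> raw) {
           Map<String, Short> builder = new HashMap<>(raw); // empty input falls through naturally
           return Collections.unmodifiableMap(builder);
       }

       public static void main(String[] args) {
           // Empty input produces an empty map without a special case.
           System.out.println(buildFinalizedFeatures(Collections.emptyMap()));
           System.out.println(buildFinalizedFeatures(Map.of("transaction.version", (short) 2)));
       }
   }
   ```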



##########
clients/src/main/resources/common/message/ProduceResponse.json:
##########
@@ -36,7 +36,12 @@
  // Version 10 adds 'CurrentLeader' and 'NodeEndpoints' as tagged fields (KIP-951)
  //
  // Version 11 adds support for new error code TRANSACTION_ABORTABLE (KIP-890).
-  "validVersions": "0-11",
+  //
+  // Version 12 is the same as version 11 (KIP-890). Note when produce requests are used in transaction, if

Review Comment:
   Should the extended comment be part of the request instead of the response, so that it's consistent with TxnOffsetCommitRequest?



##########
clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java:
##########
@@ -216,6 +221,10 @@ public String transactionalId() {
         return transactionalId;
     }
 
+    public boolean isTransactionV2Requested() {
+        return version() > LAST_BEFORE_TRANSACTION_V2_VERSION;

Review Comment:
   Should we call the static function?
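   For illustration, the delegation could look like this (a simplified stand-in for `ProduceRequest`, not the actual class): the instance method calls the static helper so the version check lives in one place.

   ```java
   // Hypothetical sketch: instance method delegates to the static check.
   class ProduceRequestSketch {
       static final short LAST_BEFORE_TRANSACTION_V2_VERSION = 11;
       private final short version;

       ProduceRequestSketch(short version) { this.version = version; }

       // Single source of truth for the version comparison.
       static boolean isTransactionV2Requested(short version) {
           return version > LAST_BEFORE_TRANSACTION_V2_VERSION;
       }

       boolean isTransactionV2Requested() {
           return isTransactionV2Requested(version); // delegate to the static check
       }
   }

   public class Main {
       public static void main(String[] args) {
           System.out.println(new ProduceRequestSketch((short) 12).isTransactionV2Requested()); // prints true
           System.out.println(new ProduceRequestSketch((short) 11).isTransactionV2Requested()); // prints false
       }
   }
   ```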



##########
core/src/main/scala/kafka/server/AddPartitionsToTxnManager.scala:
##########
@@ -40,13 +40,33 @@ object AddPartitionsToTxnManager {
 
   val VerificationFailureRateMetricName = "VerificationFailureRate"
   val VerificationTimeMsMetricName = "VerificationTimeMs"
+
+  def produceRequestVersionToTransactionSupportedOperation(version: Short): TransactionSupportedOperation = {
+    if (version > 11) {
+      addPartition
+    } else if (version > 10) {
+      genericError
+    } else {
+      defaultError
+    }
+  }
+
+  def txnOffsetCommitRequestVersionToTransactionSupportedOperation(version: Short): TransactionSupportedOperation = {
+    if (version > 4) {
+      addPartition
+    } else if (version > 3) {
+      genericError
+    } else {
+      defaultError
+    }
+  }
 }
 
 /**
 * This is an enum which handles the Partition Response based on the Request Version and the exact operation
 *    defaultError:       This is the default workflow which maps to cases when the Produce Request Version or the Txn_offset_commit request was lower than the first version supporting the new Error Class
 *    genericError:       This maps to the case when the clients are updated to handle the TransactionAbortableException
- *    addPartition:       This is a WIP. To be updated as a part of KIP-890 Part 2
+ *    addPartition:       This allows the partition to be added to the transactions inflight with the Produce and TxnOffsetCommit requests.

Review Comment:
   Is this the right code model to represent client capabilities? The way it's coded, it looks like there are 3 mutually exclusive options, but the options are not mutually exclusive: clients that support txnV2 can also support the rollback error.
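   One way to model non-exclusive capabilities (a hypothetical sketch; the version thresholds follow the produce-request mapping above) is a set of independent flags derived from the request version, rather than a three-way enum.

   ```java
   import java.util.EnumSet;

   // Hypothetical capability names; not actual Kafka identifiers.
   enum ClientCapability { ABORTABLE_ERROR, IMPLICIT_ADD_PARTITION }

   public class Main {
       // Derive independent capabilities from the produce-request version.
       // A txnV2 client (version > 11) also supports the abortable error,
       // so both flags can be set at once.
       static EnumSet<ClientCapability> fromProduceVersion(short version) {
           EnumSet<ClientCapability> caps = EnumSet.noneOf(ClientCapability.class);
           if (version > 10) caps.add(ClientCapability.ABORTABLE_ERROR);
           if (version > 11) caps.add(ClientCapability.IMPLICIT_ADD_PARTITION);
           return caps;
       }

       public static void main(String[] args) {
           System.out.println(fromProduceVersion((short) 12)); // both capabilities
           System.out.println(fromProduceVersion((short) 11)); // abortable error only
           System.out.println(fromProduceVersion((short) 10)); // neither
       }
   }
   ```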



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##########
@@ -1607,7 +1635,7 @@ public void handleResponse(AbstractResponse response) {
                 log.debug("Successfully added partition for consumer group {} to transaction", builder.data.groupId());
 
                 // note the result is not completed until the TxnOffsetCommit returns
-                pendingRequests.add(txnOffsetCommitHandler(result, offsets, groupMetadata));
+                pendingRequests.add(txnOffsetCommitHandler(result, offsets, groupMetadata, TxnOffsetCommitRequest.LAST_BEFORE_TRANSACTION_V2_VERSION));

Review Comment:
   Maybe just pass the txnV2Enabled flag and encapsulate the actual version calculation in the builder?



##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1570,6 +1576,7 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
                 metadata = new RecordMetadata(topicPartition(), -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1);
             }
             this.interceptors.onAcknowledgement(metadata, exception);
+

Review Comment:
   nit: extra empty line?



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -848,6 +848,8 @@ class ReplicaManager(val config: KafkaConfig,
                   Errors.COORDINATOR_NOT_AVAILABLE |
                   Errors.NOT_COORDINATOR => Some(new NotEnoughReplicasException(
                s"Unable to verify the partition has been added to the transaction. Underlying error: ${error.toString}"))
+              case Errors.UNKNOWN_PRODUCER_ID => Some(new OutOfOrderSequenceException(

Review Comment:
   Can we add a comment explaining why we translate this error this way? Also, it looks like we don't return this error any more (at least not in the production code).



##########
clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java:
##########
@@ -216,6 +221,10 @@ public String transactionalId() {
         return transactionalId;
     }
 
+    public boolean isTransactionV2Requested() {

Review Comment:
   Is this used?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
