Shekharrajak commented on code in PR #22357:
URL: https://github.com/apache/kafka/pull/22357#discussion_r3293885741


##########
server/src/main/java/org/apache/kafka/server/share/fetch/InFlightState.java:
##########
@@ -243,6 +251,83 @@ public void completeStateTransition(boolean commit) {
         rollbackState = null;
     }
 
+    /**
+     * @return The producer id staged for this transactional acknowledgment, 
or -1 if not in TX_PENDING.
+     */
+    public long stagedProducerId() {
+        return stagedProducerId;
+    }
+
+    /**
+     * @return The producer epoch staged for this transactional 
acknowledgment, or -1 if not in TX_PENDING.
+     */
+    public short stagedProducerEpoch() {
+        return stagedProducerEpoch;
+    }
+
+    /**
+     * @return The acknowledge type staged for this transactional 
acknowledgment, or -1 if not in TX_PENDING.
+     */
+    public byte stagedAckType() {
+        return stagedAckType;
+    }
+
+    /**
+     * Stage this record into an open producer transaction. Transitions state 
from ACQUIRED to TX_PENDING,
+     * cancels the acquisition lock timer (transaction timeout governs the 
hold instead), and records the
+     * producer identity and ack type for later resolution by {@link 
#applyTxnMarker}.
+     * Only ACCEPT and REJECT are valid ack types inside a transaction.
+     */
+    public InFlightState stageTxnAcknowledge(long producerId, short 
producerEpoch, AcknowledgeType ackType) {
+        if (ackType != AcknowledgeType.ACCEPT && ackType != 
AcknowledgeType.REJECT) {
+            throw new IllegalArgumentException("Only ACCEPT or REJECT are 
valid inside a transaction, got: " + ackType);
+        }
+        try {
+            state = state.validateTransition(RecordState.TX_PENDING);
+            this.stagedProducerId = producerId;
+            this.stagedProducerEpoch = producerEpoch;
+            this.stagedAckType = ackType.id;
+            cancelAndClearAcquisitionLockTimeoutTask();
+            return this;
+        } catch (IllegalStateException e) {
+            log.error("Failed to stage transactional acknowledgment", e);
+            return null;
+        }
+    }
+
+    /**
+     * Apply a transaction commit or abort marker to a TX_PENDING record.
+     * On COMMIT: ACCEPT resolves to ACKNOWLEDGED, REJECT resolves to 
ARCHIVING.
+     * On ABORT: reverts to AVAILABLE so the record can be redelivered.
+     * Returns null if the state is not TX_PENDING or the producer identity 
does not match.
+     */
+    public InFlightState applyTxnMarker(long producerId, short producerEpoch, 
TransactionResult result) {
+        if (state != RecordState.TX_PENDING) {
+            return null;
+        }
+        if (this.stagedProducerId != producerId || this.stagedProducerEpoch != 
producerEpoch) {

Review Comment:
    we need fencing to ensure only the right producer can confirm.
   The confirmation can come from different RPC call from a different 
broker/server. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to