artemlivshits commented on code in PR #20882:
URL: https://github.com/apache/kafka/pull/20882#discussion_r2529300443
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerAppendInfo.java:
##########
@@ -87,24 +87,57 @@ public long producerId() {
}
private void maybeValidateDataBatch(short producerEpoch, int firstSeq, long offset) {
- checkProducerEpoch(producerEpoch, offset);
+ // Default transaction version 0 is passed for data batches.
+ checkProducerEpoch(producerEpoch, offset, (short) 0);
if (origin == AppendOrigin.CLIENT) {
checkSequence(producerEpoch, firstSeq, offset);
}
}
- private void checkProducerEpoch(short producerEpoch, long offset) {
- if (producerEpoch < updatedEntry.producerEpoch()) {
- String message = "Epoch of producer " + producerId + " at offset " + offset + " in " + topicPartition +
- " is " + producerEpoch + ", " + "which is smaller than the last seen epoch " + updatedEntry.producerEpoch();
+ /**
+ * Validates the producer epoch for transaction markers based on the transaction version.
+ *
+ * <p>For Transaction Version 2 (TV2) and above (KIP-1228), the coordinator always increments
+ * the producer epoch by one before writing the final transaction marker. This establishes a
+ * clear invariant: a valid TV2 marker must have an epoch strictly greater than the producer's
+ * current epoch at the leader. Any marker with markerEpoch <= currentEpoch is a late or duplicate
+ * marker and must be rejected to prevent conflating multiple transactions under the same epoch,
+ * which would threaten exactly-once semantics (EOS) guarantees.
+ *
+ * <p>For legacy transaction versions (TV0/TV1), markers were written with the same epoch as
+ * the transactional records, so we accept markers when markerEpoch >= currentEpoch. This
+ * preserves backward compatibility but cannot distinguish between active and stale markers.
+ *
+ * @param producerEpoch the epoch from the transaction marker
+ * @param offset the offset where the marker will be written
+ * @param transactionVersion the transaction version (0/1 = legacy, 2 = TV2)
+ */
+ private void checkProducerEpoch(short producerEpoch, long offset, short transactionVersion) {
+ short current = updatedEntry.producerEpoch();
- if (origin == AppendOrigin.REPLICATION) {
- log.warn(message);
- } else {
- // Starting from 2.7, we replaced ProducerFenced error with InvalidProducerEpoch in the
- // producer send response callback to differentiate from the former fatal exception,
- // letting client abort the ongoing transaction and retry.
- throw new InvalidProducerEpochException(message);
+ if (transactionVersion >= 2) {
+ if (producerEpoch <= current) {
+ String message = "Reject late/dup TV2 marker: markerEpoch=" + producerEpoch + " <= currentEpoch=" + current +
Review Comment:
This code is redundant; we could just compute `boolean invalidEpoch = (transactionVersion >= 2) ? (producerEpoch <= current) : (producerEpoch < current);` and then branch once on `if (invalidEpoch)`.
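A minimal sketch of the single-expression check, assuming the rule stated in the Javadoc of this hunk (TV2 and above rejects `producerEpoch <= current`; TV0/TV1 rejects only `producerEpoch < current`). `MarkerEpochCheck` and `isInvalidEpoch` are hypothetical names; in the real method the boolean would still feed one shared message and the existing warn-or-throw branch on `origin`.

```java
/**
 * Hypothetical helper illustrating the suggested consolidation.
 * Only the comparison rule comes from the PR's Javadoc.
 */
final class MarkerEpochCheck {

    private MarkerEpochCheck() { }

    /**
     * Returns true when the epoch must be rejected.
     * TV2+: the coordinator bumps the epoch before writing the marker,
     * so an epoch <= the current one is late or a duplicate.
     * TV0/TV1: markers reuse the transaction's epoch, so only a
     * strictly smaller epoch is rejected.
     */
    static boolean isInvalidEpoch(short producerEpoch, short currentEpoch, short transactionVersion) {
        return (transactionVersion >= 2)
            ? (producerEpoch <= currentEpoch)
            : (producerEpoch < currentEpoch);
    }

    public static void main(String[] args) {
        short current = 5;
        // TV2: an equal epoch is treated as a late or duplicate marker.
        System.out.println(isInvalidEpoch((short) 5, current, (short) 2)); // true
        // Legacy: an equal epoch is still accepted.
        System.out.println(isInvalidEpoch((short) 5, current, (short) 0)); // false
    }
}
```

With one boolean, the message construction and the `REPLICATION` vs. client handling no longer need to be duplicated per version.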
##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -1215,8 +1215,14 @@ class Partition(val topicPartition: TopicPartition,
}
}
- def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int,
- requestLocal: RequestLocal, verificationGuard: VerificationGuard = VerificationGuard.SENTINEL): LogAppendInfo = {
+ def appendRecordsToLeader(
+ records: MemoryRecords,
+ origin: AppendOrigin,
+ requiredAcks: Int,
+ requestLocal: RequestLocal,
+ verificationGuard: VerificationGuard = VerificationGuard.SENTINEL,
+ transactionVersion: Short = 0
Review Comment:
Can we make all the default arguments pass -1 (add a constant to `TransactionVersion`, say `static final short TV_UNKNOWN = -1`) and then throw an invalid argument exception in `appendEndTxnMarker`? Otherwise it's too easy to accidentally call a method with the default and not pass the proper version, which would then be treated as TV_0.
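A rough sketch of the sentinel idea, assuming `TV_UNKNOWN = -1` as proposed and `IllegalArgumentException` as the "invalid argument" exception. `TxnVersionDefaults`, `appendToLeader`, and the string return values are hypothetical stand-ins, and a Java overload plays the role of the Scala default argument on `appendRecordsToLeader`. The point is that a caller who forgets the version fails loudly on the marker path instead of silently getting TV_0 semantics.

```java
/**
 * Hypothetical stand-in for the leader append path; not Kafka's API.
 */
final class TxnVersionDefaults {

    /** Proposed sentinel meaning "the caller did not specify a version". */
    static final short TV_UNKNOWN = -1;

    private TxnVersionDefaults() { }

    /** Overload standing in for `transactionVersion: Short = TV_UNKNOWN`. */
    static String appendToLeader(boolean isTxnMarker) {
        return appendToLeader(isTxnMarker, TV_UNKNOWN);
    }

    static String appendToLeader(boolean isTxnMarker, short transactionVersion) {
        if (isTxnMarker) {
            return appendEndTxnMarker(transactionVersion);
        }
        // Data batches do not depend on the version, so the sentinel is harmless here.
        return "data";
    }

    /** Marker path: an unspecified version is a programming error, not TV_0. */
    static String appendEndTxnMarker(short transactionVersion) {
        if (transactionVersion == TV_UNKNOWN) {
            throw new IllegalArgumentException(
                "transactionVersion must be passed explicitly for end-txn markers");
        }
        return "marker tv=" + transactionVersion;
    }
}
```

Because the check lives at the one place where the version actually matters, data-path callers can keep relying on the default while marker callers cannot.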