jolshan commented on code in PR #21176:
URL: https://github.com/apache/kafka/pull/21176#discussion_r2646222276
##########
storage/src/test/java/org/apache/kafka/storage/internals/log/ProducerStateManagerTest.java:
##########
@@ -1090,6 +1090,27 @@ public void testVerificationStateEntryExpiration() {
assertNull(stateManager.verificationStateEntry(producerId));
}
+ @Test
+ public void testIdempotentTransactionMarkerExceptionThrownTV2() {
+ short transactionVersion = 2;
+ appendClientEntry(stateManager, producerId, epoch, defaultSequence, 99, true);
+ assertEquals(OptionalLong.of(99L), stateManager.firstUndecidedOffset());
+
+ short markerEpoch = (short) (epoch + 1);
+ appendEndTxnMarker(stateManager, producerId, markerEpoch, ControlRecordType.COMMIT, 100, transactionVersion);
Review Comment:
Small question: why does this test append the end-transaction marker in two
different ways? The helper called here seems to behave mostly the same as the
code at line 1106. The only difference is that the code at 1106 skips the
three steps at the end of the helper:
```
private void appendEndTxnMarker(ProducerStateManager stateManager,
                                long producerId,
                                short producerEpoch,
                                ControlRecordType controlType,
                                long offset,
                                int coordinatorEpoch,
                                long timestamp,
                                short transactionVersion) {
    ProducerAppendInfo producerAppendInfo = stateManager.prepareUpdate(producerId, AppendOrigin.COORDINATOR, time.milliseconds());
    EndTransactionMarker endTxnMarker = new EndTransactionMarker(controlType, coordinatorEpoch);
    Optional<CompletedTxn> completedTxn = producerAppendInfo.appendEndTxnMarker(endTxnMarker, producerEpoch, offset, timestamp, transactionVersion);
    stateManager.update(producerAppendInfo);
    completedTxn.ifPresent(stateManager::completeTxn);
    stateManager.updateMapEndOffset(offset + 1);
}
```