squah-confluent commented on code in PR #22239:
URL: https://github.com/apache/kafka/pull/22239#discussion_r3214186104


##########
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##########
@@ -1260,6 +1260,32 @@ public void 
testIllegalGenerationInTxnOffsetCommitByGroupMetadata() {
         assertAbortableError(CommitFailedException.class);
     }
 
+    @ParameterizedTest
+    @EnumSource(value = Errors.class, names = {"GROUP_ID_NOT_FOUND", 
"STALE_MEMBER_EPOCH"})
+    public void testGroupMetadataMismatchErrorInTxnOffsetCommit(Errors error) {
+        // GROUP_ID_NOT_FOUND and STALE_MEMBER_EPOCH from TxnOffsetCommit (v6+)
+        // must abort the transaction with a CommitFailedException, matching 
the
+        // behavior for ILLEGAL_GENERATION returned by older broker versions.
+        final TopicPartition tp = new TopicPartition("foo", 0);
+
+        doInitTransactions();
+
+        transactionManager.beginTransaction();
+        TransactionalRequestResult sendOffsetsResult = 
transactionManager.sendOffsetsToTransaction(
+            Map.of(tp, new OffsetAndMetadata(39L)), new 
ConsumerGroupMetadata(consumerGroupId));
+
+        prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, 
producerId, epoch);
+        prepareFindCoordinatorResponse(Errors.NONE, false, 
CoordinatorType.GROUP, consumerGroupId);

Review Comment:
   very minor nit: In the other two TxnOffsetCommit error tests above, we 
`runUntil(() -> transactionManager.coordinator(CoordinatorType.GROUP) != 
null);` and the line breaks around the last `runUntil` are different



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to