lianetm commented on code in PR #15742:
URL: https://github.com/apache/kafka/pull/15742#discussion_r1568972567


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -1333,32 +1342,41 @@ public void testListenerCallbacksInvoke(List<ConsumerRebalanceListenerMethodName
     private static Stream<Arguments> listenerCallbacksInvokeSource() {
         Optional<RuntimeException> empty = Optional.empty();
         Optional<RuntimeException> error = Optional.of(new RuntimeException("Intentional error"));
+        Optional<RuntimeException> kafkaException = Optional.of(new KafkaException("Intentional error"));
+        Optional<RuntimeException> wrappedException = Optional.of(new KafkaException("User rebalance callback throws an error", error.get()));
 
         return Stream.of(
             // Tests if we don't have an event, the listener doesn't get called.
-            Arguments.of(Collections.emptyList(), empty, empty, empty, 0, 0, 0),
+            Arguments.of(Collections.emptyList(), empty, empty, empty, 0, 0, 0, empty),
 
             // Tests if we get an event for a revocation, that we invoke our listener.
-            Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED), empty, empty, empty, 1, 0, 0),
+            Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED), empty, empty, empty, 1, 0, 0, empty),
 
             // Tests if we get an event for an assignment, that we invoke our listener.
-            Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED), empty, empty, empty, 0, 1, 0),
+            Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED), empty, empty, empty, 0, 1, 0, empty),
 
             // Tests that we invoke our listener even if it encounters an exception.
-            Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, empty, empty, 0, 0, 1),
+            Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, empty, empty, 0, 0, 1, empty),
 
             // Tests that we invoke our listener even if it encounters an exception.
-            Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED), error, empty, empty, 1, 0, 0),
+            Arguments.of(Collections.singletonList(ON_PARTITIONS_REVOKED), error, empty, empty, 1, 0, 0, wrappedException),
 
             // Tests that we invoke our listener even if it encounters an exception.
-            Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED), empty, error, empty, 0, 1, 0),
+            Arguments.of(Collections.singletonList(ON_PARTITIONS_ASSIGNED), empty, error, empty, 0, 1, 0, wrappedException),
 
             // Tests that we invoke our listener even if it encounters an exception.
-            Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, empty, error, 0, 0, 1),
+            Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, empty, error, 0, 0, 1, wrappedException),
+
+            // Tests that we invoke our listener even if it encounters an exception. Special case to test that a kafka exception is not wrapped.
+            Arguments.of(Collections.singletonList(ON_PARTITIONS_LOST), empty, empty, kafkaException, 0, 0, 1, kafkaException),

Review Comment:
   Nice addition. This is indeed part of what the legacy logic does: it does 
not wrap a KafkaException.
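
   For reference, a minimal sketch of the behaviour these test cases expect, not the actual consumer code. The class `RebalanceErrorWrapping` and the helper `toCallbackError` are hypothetical names, and the nested `KafkaException` is only a stand-in for `org.apache.kafka.common.KafkaException` so that the snippet compiles on its own. The wrapper message is the one used by `wrappedException` in the diff.

```java
public class RebalanceErrorWrapping {

    // Hypothetical stand-in for org.apache.kafka.common.KafkaException,
    // declared here only so the sketch is self-contained.
    static class KafkaException extends RuntimeException {
        KafkaException(String message) {
            super(message);
        }

        KafkaException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical helper: maps an error thrown by a user rebalance callback
    // to the error that would be surfaced to the caller.
    static RuntimeException toCallbackError(RuntimeException e) {
        if (e instanceof KafkaException) {
            // Already a KafkaException: pass it through unwrapped.
            return e;
        }
        // Any other runtime error is wrapped, keeping the original as the cause.
        return new KafkaException("User rebalance callback throws an error", e);
    }

    public static void main(String[] args) {
        RuntimeException plain = new RuntimeException("Intentional error");
        RuntimeException kafka = new KafkaException("Intentional error");

        // Checks that the plain error was wrapped with itself as the cause.
        System.out.println(toCallbackError(plain).getCause() == plain);
        // Checks that the KafkaException instance was returned as-is.
        System.out.println(toCallbackError(kafka) == kafka);
    }
}
```

   The last two rows of the parameter list correspond to these two branches: a plain `RuntimeException` is expected as `wrappedException`, while `kafkaException` is expected back unchanged.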


