cadonna commented on code in PR #14930:
URL: https://github.com/apache/kafka/pull/14930#discussion_r1422458903


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -16,8 +16,43 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import static java.util.Arrays.asList;
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;

Review Comment:
   Static imports should be the last block of imports.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -42,192 +77,142 @@
 import 
org.apache.kafka.clients.consumer.internals.events.UnsubscribeApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.NetworkException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.message.OffsetCommitResponseData;
-import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.ListOffsetsRequest;
-import org.apache.kafka.common.requests.OffsetCommitResponse;
-import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
-import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.function.Executable;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatchers;
-import org.mockito.MockedConstruction;
 import org.mockito.Mockito;
-import org.mockito.stubbing.Answer;
-import org.opentest4j.AssertionFailedError;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static java.util.Arrays.asList;
-import static java.util.Collections.singleton;
-import static java.util.Collections.singletonList;
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED;
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertSame;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockConstruction;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
+@SuppressWarnings("unchecked")
 public class AsyncKafkaConsumerTest {
 
-    private AsyncKafkaConsumer<?, ?> consumer;
-    private FetchCollector<?, ?> fetchCollector;
-    private ConsumerTestBuilder.AsyncKafkaConsumerTestBuilder testBuilder;
-    private ApplicationEventHandler applicationEventHandler;
-    private SubscriptionState subscriptions;
+    private AsyncKafkaConsumer<String, String> consumer = null;
 
-    @BeforeEach
-    public void setup() {
-        // By default, the consumer is part of a group and autoCommit is 
enabled.
-        setup(ConsumerTestBuilder.createDefaultGroupInformation(), true);
-    }
-
-    private void setup(Optional<ConsumerTestBuilder.GroupInformation> 
groupInfo, boolean enableAutoCommit) {
-        testBuilder = new 
ConsumerTestBuilder.AsyncKafkaConsumerTestBuilder(groupInfo, enableAutoCommit, 
true);
-        applicationEventHandler = testBuilder.applicationEventHandler;
-        consumer = testBuilder.consumer;
-        fetchCollector = testBuilder.fetchCollector;
-        subscriptions = testBuilder.subscriptions;
-    }
+    private final Time time = new MockTime(1);
+    private final FetchCollector<String, String> fetchCollector = 
mock(FetchCollector.class);
+    private final ApplicationEventHandler applicationEventHandler = 
mock(ApplicationEventHandler.class);
+    private final ConsumerMetadata metadata = mock(ConsumerMetadata.class);
+    private final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue = 
new LinkedBlockingQueue<>();
 
     @AfterEach
-    public void cleanup() {
-        if (testBuilder != null) {
-            shutDown();
+    public void resetAll() {
+        backgroundEventQueue.clear();
+        if (consumer != null) {
+            consumer.close();
         }
+        consumer = null;
+        Mockito.framework().clearInlineMocks();
     }
 
-    private void shutDown() {
-        prepAutocommitOnClose();
-        testBuilder.close();
+    private AsyncKafkaConsumer<String, String> setup() {
+        final Properties props = requiredConsumerProperties();
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
+        final ConsumerConfig config = new ConsumerConfig(props);
+        return setup(config);
     }
 
-    private void resetWithEmptyGroupId() {
-        // Create a consumer that is not configured as part of a group.
-        cleanup();
-        setup(Optional.empty(), false);
+    private AsyncKafkaConsumer<String, String> setupWithoutGroupId() {
+        final Properties props = requiredConsumerProperties();
+        final ConsumerConfig config = new ConsumerConfig(props);
+        return setup(config);
     }
 
-    private void resetWithAutoCommitEnabled() {
-        cleanup();
-        setup(ConsumerTestBuilder.createDefaultGroupInformation(), true);
+    @SuppressWarnings("UnusedReturnValue")
+    private AsyncKafkaConsumer<String, String> setupWithEmptyGroupId() {
+        final Properties props = requiredConsumerPropertiesAndGroupId("");
+        final ConsumerConfig config = new ConsumerConfig(props);
+        return setup(config);
+    }
+
+    private AsyncKafkaConsumer<String, String> setup(ConsumerConfig config) {

Review Comment:
   nit: 
   Setup is quite generic. What about:
   ```suggestion
       private AsyncKafkaConsumer<String, String> newConsumer(ConsumerConfig 
config) {
   ```



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -1113,45 +1053,41 @@ private HashMap<TopicPartition, Long> 
mockTimestampToSearch() {
         return timestampToSearch;
     }
 
-    private void prepAutocommitOnClose() {
-        Node node = testBuilder.metadata.fetch().nodes().get(0);
-        
testBuilder.client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE,
 "group-id", node));
-        if (!testBuilder.subscriptions.allConsumed().isEmpty()) {
-            List<TopicPartition> topicPartitions = new 
ArrayList<>(testBuilder.subscriptions.assignedPartitionsList());
-            testBuilder.client.prepareResponse(mockAutocommitResponse(
-                topicPartitions,
-                (short) 1,
-                Errors.NONE).responseBody());
-        }
+    private void mockCommitApplicationEventException(Exception ex) {

Review Comment:
   I think calling this method (and its siblings) something like 
`completeCommitApplicationEventExceptionally()` 
(`completeCommitApplicationEventSuccessfully()` for the good case) would make 
the code better readable. 



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -241,65 +226,64 @@ private static Stream<Exception> 
commitExceptionSupplier() {
 
     @Test
     public void testFencedInstanceException() {
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        doReturn(future).when(consumer).commit(new HashMap<>(), false);
+        consumer = setup();
+        
mockCommitApplicationEventException(Errors.FENCED_INSTANCE_ID.exception());
+
         assertDoesNotThrow(() -> consumer.commitAsync());
-        future.completeExceptionally(Errors.FENCED_INSTANCE_ID.exception());
     }
 
     @Test
     public void testCommitted() {
-        Map<TopicPartition, OffsetAndMetadata> offsets = 
mockTopicPartitionOffset();
-        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
committedFuture = new CompletableFuture<>();
-        committedFuture.complete(offsets);
+        consumer = setup();
+        Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = 
mockTopicPartitionOffset();
+        
mockFetchedCommittedOffsetApplicationEventCompleted(topicPartitionOffsets);
 
-        try (MockedConstruction<FetchCommittedOffsetsApplicationEvent> ignored 
= offsetFetchEventMocker(committedFuture)) {
-            assertDoesNotThrow(() -> consumer.committed(offsets.keySet(), 
Duration.ofMillis(1000)));
-            
verify(applicationEventHandler).add(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class));
-        }
+        assertDoesNotThrow(() -> 
consumer.committed(topicPartitionOffsets.keySet(), Duration.ofMillis(1000)));
+        
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class),
 any());

Review Comment:
   I am missing the verification of the return value of `committed()`.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -42,192 +77,142 @@
 import 
org.apache.kafka.clients.consumer.internals.events.UnsubscribeApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.NetworkException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.message.OffsetCommitResponseData;
-import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.ListOffsetsRequest;
-import org.apache.kafka.common.requests.OffsetCommitResponse;
-import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
-import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.function.Executable;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatchers;
-import org.mockito.MockedConstruction;
 import org.mockito.Mockito;
-import org.mockito.stubbing.Answer;
-import org.opentest4j.AssertionFailedError;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static java.util.Arrays.asList;
-import static java.util.Collections.singleton;
-import static java.util.Collections.singletonList;
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED;
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertSame;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockConstruction;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
+@SuppressWarnings("unchecked")
 public class AsyncKafkaConsumerTest {
 
-    private AsyncKafkaConsumer<?, ?> consumer;
-    private FetchCollector<?, ?> fetchCollector;
-    private ConsumerTestBuilder.AsyncKafkaConsumerTestBuilder testBuilder;
-    private ApplicationEventHandler applicationEventHandler;
-    private SubscriptionState subscriptions;
+    private AsyncKafkaConsumer<String, String> consumer = null;
 
-    @BeforeEach
-    public void setup() {
-        // By default, the consumer is part of a group and autoCommit is 
enabled.
-        setup(ConsumerTestBuilder.createDefaultGroupInformation(), true);
-    }
-
-    private void setup(Optional<ConsumerTestBuilder.GroupInformation> 
groupInfo, boolean enableAutoCommit) {
-        testBuilder = new 
ConsumerTestBuilder.AsyncKafkaConsumerTestBuilder(groupInfo, enableAutoCommit, 
true);
-        applicationEventHandler = testBuilder.applicationEventHandler;
-        consumer = testBuilder.consumer;
-        fetchCollector = testBuilder.fetchCollector;
-        subscriptions = testBuilder.subscriptions;
-    }
+    private final Time time = new MockTime(1);
+    private final FetchCollector<String, String> fetchCollector = 
mock(FetchCollector.class);
+    private final ApplicationEventHandler applicationEventHandler = 
mock(ApplicationEventHandler.class);
+    private final ConsumerMetadata metadata = mock(ConsumerMetadata.class);
+    private final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue = 
new LinkedBlockingQueue<>();
 
     @AfterEach
-    public void cleanup() {
-        if (testBuilder != null) {
-            shutDown();
+    public void resetAll() {
+        backgroundEventQueue.clear();
+        if (consumer != null) {
+            consumer.close();
         }
+        consumer = null;
+        Mockito.framework().clearInlineMocks();
     }
 
-    private void shutDown() {
-        prepAutocommitOnClose();
-        testBuilder.close();
+    private AsyncKafkaConsumer<String, String> setup() {
+        final Properties props = requiredConsumerProperties();
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
+        final ConsumerConfig config = new ConsumerConfig(props);
+        return setup(config);
     }
 
-    private void resetWithEmptyGroupId() {
-        // Create a consumer that is not configured as part of a group.
-        cleanup();
-        setup(Optional.empty(), false);
+    private AsyncKafkaConsumer<String, String> setupWithoutGroupId() {
+        final Properties props = requiredConsumerProperties();
+        final ConsumerConfig config = new ConsumerConfig(props);
+        return setup(config);
     }
 
-    private void resetWithAutoCommitEnabled() {
-        cleanup();
-        setup(ConsumerTestBuilder.createDefaultGroupInformation(), true);
+    @SuppressWarnings("UnusedReturnValue")
+    private AsyncKafkaConsumer<String, String> setupWithEmptyGroupId() {
+        final Properties props = requiredConsumerPropertiesAndGroupId("");
+        final ConsumerConfig config = new ConsumerConfig(props);
+        return setup(config);
+    }
+
+    private AsyncKafkaConsumer<String, String> setup(ConsumerConfig config) {
+        return new AsyncKafkaConsumer<>(
+            config,
+            new StringDeserializer(),
+            new StringDeserializer(),
+            time,
+            (a, b, c, d, e, f) -> applicationEventHandler,
+            (a, b, c, d, e, f, g) -> fetchCollector,
+            (a, b, c, d) -> metadata,
+            backgroundEventQueue
+        );
     }
 
     @Test
     public void testSuccessfulStartupShutdown() {
+        consumer = setup();
         assertDoesNotThrow(() -> consumer.close());
     }
 
-    @Test
-    public void testSuccessfulStartupShutdownWithAutoCommit() {
-        resetWithAutoCommitEnabled();
-        TopicPartition tp = new TopicPartition("topic", 0);
-        consumer.assign(singleton(tp));
-        consumer.seek(tp, 100);
-        prepAutocommitOnClose();
-    }
-
     @Test
     public void testInvalidGroupId() {
-        // Create consumer without group id
-        resetWithEmptyGroupId();
-        assertThrows(InvalidGroupIdException.class, () -> 
consumer.committed(new HashSet<>()));
+        KafkaException e = assertThrows(KafkaException.class, 
this::setupWithEmptyGroupId);
+        assertTrue(e.getCause() instanceof InvalidGroupIdException);
     }
 
     @Test
     public void testFailOnClosedConsumer() {
+        consumer = setup();
         consumer.close();
         final IllegalStateException res = 
assertThrows(IllegalStateException.class, consumer::assignment);
         assertEquals("This consumer has already been closed.", 
res.getMessage());
     }
 
     @Test
-    public void testCommitAsync_NullCallback() throws InterruptedException {
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
-        offsets.put(new TopicPartition("my-topic", 0), new 
OffsetAndMetadata(100L));
-        offsets.put(new TopicPartition("my-topic", 1), new 
OffsetAndMetadata(200L));
+    public void testCommitAsyncWithNullCallback() {
+        consumer = setup();
+        final TopicPartition t0 = new TopicPartition("t0", 2);
+        final TopicPartition t1 = new TopicPartition("t0", 3);
+        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(t0, new OffsetAndMetadata(10L));
+        offsets.put(t1, new OffsetAndMetadata(20L));
 
-        doReturn(future).when(consumer).commit(offsets, false);
         consumer.commitAsync(offsets, null);
-        future.complete(null);
-        TestUtils.waitForCondition(future::isDone,
-                2000,
-                "commit future should complete");
 
-        assertFalse(future.isCompletedExceptionally());
+        final ArgumentCaptor<CommitApplicationEvent> commitEventCaptor = 
ArgumentCaptor.forClass(CommitApplicationEvent.class);
+        verify(applicationEventHandler).add(commitEventCaptor.capture());
+        final CommitApplicationEvent commitEvent = 
commitEventCaptor.getValue();
+        assertEquals(offsets, commitEvent.offsets());
+        assertDoesNotThrow(() -> commitEvent.future().complete(null));
+        assertDoesNotThrow(() -> consumer.commitAsync(offsets, null));
     }
 
     @ParameterizedTest
     @MethodSource("commitExceptionSupplier")
-    public void testCommitAsync_UserSuppliedCallback(Exception exception) {
-        CompletableFuture<Void> future = new CompletableFuture<>();
+    public void testCommitAsyncUserSuppliedCallback(Exception exception) {

Review Comment:
   I would separate the exceptional from the non-exceptional case. It would 
make the test more readable. 



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -241,65 +226,64 @@ private static Stream<Exception> 
commitExceptionSupplier() {
 
     @Test
     public void testFencedInstanceException() {
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        doReturn(future).when(consumer).commit(new HashMap<>(), false);
+        consumer = setup();
+        
mockCommitApplicationEventException(Errors.FENCED_INSTANCE_ID.exception());
+
         assertDoesNotThrow(() -> consumer.commitAsync());
-        future.completeExceptionally(Errors.FENCED_INSTANCE_ID.exception());
     }

Review Comment:
   Isn't here something missing? The next call to `consumer.commitAsync()` 
should throw, right? Shouldn't we verify that?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -482,6 +497,47 @@ private void process(final GroupMetadataUpdateEvent event) 
{
                 requestManagersSupplier);
     }
 
+    // auxiliary interface for testing
+    interface ApplicationEventHandlerSupplier {

Review Comment:
   It is nor clear to me why we need a supplier here. Could you elaborate?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -42,192 +77,142 @@
 import 
org.apache.kafka.clients.consumer.internals.events.UnsubscribeApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.NetworkException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.message.OffsetCommitResponseData;
-import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.ListOffsetsRequest;
-import org.apache.kafka.common.requests.OffsetCommitResponse;
-import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
-import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.function.Executable;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatchers;
-import org.mockito.MockedConstruction;
 import org.mockito.Mockito;
-import org.mockito.stubbing.Answer;
-import org.opentest4j.AssertionFailedError;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static java.util.Arrays.asList;
-import static java.util.Collections.singleton;
-import static java.util.Collections.singletonList;
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED;
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertSame;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockConstruction;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
+@SuppressWarnings("unchecked")
 public class AsyncKafkaConsumerTest {
 
-    private AsyncKafkaConsumer<?, ?> consumer;
-    private FetchCollector<?, ?> fetchCollector;
-    private ConsumerTestBuilder.AsyncKafkaConsumerTestBuilder testBuilder;
-    private ApplicationEventHandler applicationEventHandler;
-    private SubscriptionState subscriptions;
+    private AsyncKafkaConsumer<String, String> consumer = null;
 
-    @BeforeEach
-    public void setup() {
-        // By default, the consumer is part of a group and autoCommit is 
enabled.
-        setup(ConsumerTestBuilder.createDefaultGroupInformation(), true);
-    }
-
-    private void setup(Optional<ConsumerTestBuilder.GroupInformation> 
groupInfo, boolean enableAutoCommit) {
-        testBuilder = new 
ConsumerTestBuilder.AsyncKafkaConsumerTestBuilder(groupInfo, enableAutoCommit, 
true);
-        applicationEventHandler = testBuilder.applicationEventHandler;
-        consumer = testBuilder.consumer;
-        fetchCollector = testBuilder.fetchCollector;
-        subscriptions = testBuilder.subscriptions;
-    }
+    private final Time time = new MockTime(1);
+    private final FetchCollector<String, String> fetchCollector = 
mock(FetchCollector.class);
+    private final ApplicationEventHandler applicationEventHandler = 
mock(ApplicationEventHandler.class);
+    private final ConsumerMetadata metadata = mock(ConsumerMetadata.class);
+    private final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue = 
new LinkedBlockingQueue<>();
 
     @AfterEach
-    public void cleanup() {
-        if (testBuilder != null) {
-            shutDown();
+    public void resetAll() {
+        backgroundEventQueue.clear();
+        if (consumer != null) {
+            consumer.close();
         }
+        consumer = null;
+        Mockito.framework().clearInlineMocks();
     }
 
-    private void shutDown() {
-        prepAutocommitOnClose();
-        testBuilder.close();
+    private AsyncKafkaConsumer<String, String> setup() {
+        final Properties props = requiredConsumerProperties();
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
+        final ConsumerConfig config = new ConsumerConfig(props);
+        return setup(config);
     }
 
-    private void resetWithEmptyGroupId() {
-        // Create a consumer that is not configured as part of a group.
-        cleanup();
-        setup(Optional.empty(), false);
+    private AsyncKafkaConsumer<String, String> setupWithoutGroupId() {
+        final Properties props = requiredConsumerProperties();
+        final ConsumerConfig config = new ConsumerConfig(props);
+        return setup(config);
     }
 
-    private void resetWithAutoCommitEnabled() {
-        cleanup();
-        setup(ConsumerTestBuilder.createDefaultGroupInformation(), true);
+    @SuppressWarnings("UnusedReturnValue")
+    private AsyncKafkaConsumer<String, String> setupWithEmptyGroupId() {
+        final Properties props = requiredConsumerPropertiesAndGroupId("");
+        final ConsumerConfig config = new ConsumerConfig(props);
+        return setup(config);
+    }
+
+    private AsyncKafkaConsumer<String, String> setup(ConsumerConfig config) {
+        return new AsyncKafkaConsumer<>(
+            config,
+            new StringDeserializer(),
+            new StringDeserializer(),
+            time,
+            (a, b, c, d, e, f) -> applicationEventHandler,
+            (a, b, c, d, e, f, g) -> fetchCollector,
+            (a, b, c, d) -> metadata,
+            backgroundEventQueue
+        );
     }
 
     @Test
     public void testSuccessfulStartupShutdown() {
+        consumer = setup();
         assertDoesNotThrow(() -> consumer.close());
     }
 
-    @Test
-    public void testSuccessfulStartupShutdownWithAutoCommit() {
-        resetWithAutoCommitEnabled();
-        TopicPartition tp = new TopicPartition("topic", 0);
-        consumer.assign(singleton(tp));
-        consumer.seek(tp, 100);
-        prepAutocommitOnClose();
-    }
-
     @Test
     public void testInvalidGroupId() {
-        // Create consumer without group id
-        resetWithEmptyGroupId();
-        assertThrows(InvalidGroupIdException.class, () -> 
consumer.committed(new HashSet<>()));
+        KafkaException e = assertThrows(KafkaException.class, 
this::setupWithEmptyGroupId);
+        assertTrue(e.getCause() instanceof InvalidGroupIdException);
     }
 
     @Test
     public void testFailOnClosedConsumer() {
+        consumer = setup();
         consumer.close();
         final IllegalStateException res = 
assertThrows(IllegalStateException.class, consumer::assignment);
         assertEquals("This consumer has already been closed.", 
res.getMessage());
     }
 
     @Test
-    public void testCommitAsync_NullCallback() throws InterruptedException {
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
-        offsets.put(new TopicPartition("my-topic", 0), new 
OffsetAndMetadata(100L));
-        offsets.put(new TopicPartition("my-topic", 1), new 
OffsetAndMetadata(200L));
+    public void testCommitAsyncWithNullCallback() {
+        consumer = setup();
+        final TopicPartition t0 = new TopicPartition("t0", 2);
+        final TopicPartition t1 = new TopicPartition("t0", 3);
+        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(t0, new OffsetAndMetadata(10L));
+        offsets.put(t1, new OffsetAndMetadata(20L));
 
-        doReturn(future).when(consumer).commit(offsets, false);
         consumer.commitAsync(offsets, null);
-        future.complete(null);
-        TestUtils.waitForCondition(future::isDone,
-                2000,
-                "commit future should complete");
 
-        assertFalse(future.isCompletedExceptionally());
+        final ArgumentCaptor<CommitApplicationEvent> commitEventCaptor = 
ArgumentCaptor.forClass(CommitApplicationEvent.class);
+        verify(applicationEventHandler).add(commitEventCaptor.capture());
+        final CommitApplicationEvent commitEvent = 
commitEventCaptor.getValue();
+        assertEquals(offsets, commitEvent.offsets());
+        assertDoesNotThrow(() -> commitEvent.future().complete(null));
+        assertDoesNotThrow(() -> consumer.commitAsync(offsets, null));
     }
 
     @ParameterizedTest
     @MethodSource("commitExceptionSupplier")
-    public void testCommitAsync_UserSuppliedCallback(Exception exception) {
-        CompletableFuture<Void> future = new CompletableFuture<>();
+    public void testCommitAsyncUserSuppliedCallback(Exception exception) {
+        consumer = setup();
 
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
         offsets.put(new TopicPartition("my-topic", 1), new 
OffsetAndMetadata(200L));
+        if (exception == null) {
+            mockCommitApplicationEventCompleted();
+        } else {
+            mockCommitApplicationEventException(exception);
+        }
 
-        doReturn(future).when(consumer).commit(offsets, false);
         MockCommitCallback callback = new MockCommitCallback();
         assertDoesNotThrow(() -> consumer.commitAsync(offsets, callback));
+        consumer.maybeInvokeCommitCallbacks();

Review Comment:
   As far as, I understand the agreement is that the commit callback is called 
in `poll()`, `close()`, `commitSync()`, and `commitAsync()`. Why make 
`maybeInvokeCommitCallbacks()` visible instead of calling one of the mentioned 
methods?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -505,57 +484,10 @@ public void onComplete(Map<TopicPartition, 
OffsetAndMetadata> offsets, Exception
             this.exception = exception;
         }
     }
-    /**
-     * This is a rather ugly bit of code. Not my choice :(
-     *
-     * <p/>
-     *
-     * Inside the {@link 
org.apache.kafka.clients.consumer.Consumer#committed(Set, Duration)} call we 
create an
-     * instance of {@link FetchCommittedOffsetsApplicationEvent} that holds 
the partitions and internally holds a
-     * {@link CompletableFuture}. We want to test different behaviours of the 
{@link Future#get()}, such as
-     * returning normally, timing out, throwing an error, etc. By mocking the 
construction of the event object that
-     * is created, we can affect that behavior.
-     */
-    private static MockedConstruction<FetchCommittedOffsetsApplicationEvent> 
offsetFetchEventMocker(CompletableFuture<Map<TopicPartition, 
OffsetAndMetadata>> future) {
-        // This "answer" is where we pass the future to be invoked by the 
ConsumerUtils.getResult() method
-        Answer<Map<TopicPartition, OffsetAndMetadata>> getInvocationAnswer = 
invocation -> {
-            // This argument captures the actual argument value that was 
passed to the event's get() method, so we
-            // just "forward" that value to our mocked call
-            Timer timer = invocation.getArgument(0);
-            return ConsumerUtils.getResult(future, timer);
-        };
-
-        
MockedConstruction.MockInitializer<FetchCommittedOffsetsApplicationEvent> 
mockInitializer = (mock, ctx) -> {
-            // When the event's get() method is invoked, we call the "answer" 
method just above
-            when(mock.get(any())).thenAnswer(getInvocationAnswer);
-
-            // When the event's type() method is invoked, we have to return 
the type as it will be null in the mock
-            
when(mock.type()).thenReturn(ApplicationEvent.Type.FETCH_COMMITTED_OFFSETS);
-
-            // This is needed for the WakeupTrigger code that keeps track of 
the active task
-            when(mock.future()).thenReturn(future);
-        };
-
-        return mockConstruction(FetchCommittedOffsetsApplicationEvent.class, 
mockInitializer);
-    }
-
-    private static MockedConstruction<CommitApplicationEvent> 
commitEventMocker(CompletableFuture<Void> future) {
-        Answer<Void> getInvocationAnswer = invocation -> {
-            Timer timer = invocation.getArgument(0);
-            return ConsumerUtils.getResult(future, timer);
-        };
-
-        MockedConstruction.MockInitializer<CommitApplicationEvent> 
mockInitializer = (mock, ctx) -> {
-            when(mock.get(any())).thenAnswer(getInvocationAnswer);
-            when(mock.type()).thenReturn(ApplicationEvent.Type.COMMIT);
-            when(mock.future()).thenReturn(future);
-        };
-
-        return mockConstruction(CommitApplicationEvent.class, mockInitializer);
-    }

Review Comment:
   🥳 



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -241,65 +226,64 @@ private static Stream<Exception> 
commitExceptionSupplier() {
 
     @Test
     public void testFencedInstanceException() {

Review Comment:
   I would change the name to "testCommitAsyncWithFencedException".



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -505,57 +484,10 @@ public void onComplete(Map<TopicPartition, 
OffsetAndMetadata> offsets, Exception
             this.exception = exception;
         }
     }
-    /**
-     * This is a rather ugly bit of code. Not my choice :(
-     *
-     * <p/>
-     *
-     * Inside the {@link 
org.apache.kafka.clients.consumer.Consumer#committed(Set, Duration)} call we 
create an
-     * instance of {@link FetchCommittedOffsetsApplicationEvent} that holds 
the partitions and internally holds a
-     * {@link CompletableFuture}. We want to test different behaviours of the 
{@link Future#get()}, such as
-     * returning normally, timing out, throwing an error, etc. By mocking the 
construction of the event object that
-     * is created, we can affect that behavior.
-     */
-    private static MockedConstruction<FetchCommittedOffsetsApplicationEvent> 
offsetFetchEventMocker(CompletableFuture<Map<TopicPartition, 
OffsetAndMetadata>> future) {
-        // This "answer" is where we pass the future to be invoked by the 
ConsumerUtils.getResult() method
-        Answer<Map<TopicPartition, OffsetAndMetadata>> getInvocationAnswer = 
invocation -> {
-            // This argument captures the actual argument value that was 
passed to the event's get() method, so we
-            // just "forward" that value to our mocked call
-            Timer timer = invocation.getArgument(0);
-            return ConsumerUtils.getResult(future, timer);
-        };
-
-        
MockedConstruction.MockInitializer<FetchCommittedOffsetsApplicationEvent> 
mockInitializer = (mock, ctx) -> {
-            // When the event's get() method is invoked, we call the "answer" 
method just above
-            when(mock.get(any())).thenAnswer(getInvocationAnswer);
-
-            // When the event's type() method is invoked, we have to return 
the type as it will be null in the mock
-            
when(mock.type()).thenReturn(ApplicationEvent.Type.FETCH_COMMITTED_OFFSETS);
-
-            // This is needed for the WakeupTrigger code that keeps track of 
the active task
-            when(mock.future()).thenReturn(future);
-        };
-
-        return mockConstruction(FetchCommittedOffsetsApplicationEvent.class, 
mockInitializer);
-    }
-
-    private static MockedConstruction<CommitApplicationEvent> 
commitEventMocker(CompletableFuture<Void> future) {
-        Answer<Void> getInvocationAnswer = invocation -> {
-            Timer timer = invocation.getArgument(0);
-            return ConsumerUtils.getResult(future, timer);
-        };
-
-        MockedConstruction.MockInitializer<CommitApplicationEvent> 
mockInitializer = (mock, ctx) -> {
-            when(mock.get(any())).thenAnswer(getInvocationAnswer);
-            when(mock.type()).thenReturn(ApplicationEvent.Type.COMMIT);
-            when(mock.future()).thenReturn(future);
-        };
-
-        return mockConstruction(CommitApplicationEvent.class, mockInitializer);
-    }

Review Comment:
   🥳 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1535,6 +1591,11 @@ int callbacks() {
         return invoker.callbackQueue.size();
     }
 
+    // Visible for testing
+    SubscriptionState subscriptions() {
+        return subscriptions;
+    }

Review Comment:
   Is there a way to avoid this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to