cadonna commented on code in PR #14930:
URL: https://github.com/apache/kafka/pull/14930#discussion_r1422458903
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -16,8 +16,43 @@
*/
package org.apache.kafka.clients.consumer.internals;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonList;
+import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
Review Comment:
Static imports should be the last block of imports.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -42,192 +77,142 @@
import
org.apache.kafka.clients.consumer.internals.events.UnsubscribeApplicationEvent;
import
org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent;
import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.message.OffsetCommitResponseData;
-import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.ListOffsetsRequest;
-import org.apache.kafka.common.requests.OffsetCommitResponse;
-import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
-import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
-import org.mockito.MockedConstruction;
import org.mockito.Mockito;
-import org.mockito.stubbing.Answer;
-import org.opentest4j.AssertionFailedError;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static java.util.Arrays.asList;
-import static java.util.Collections.singleton;
-import static java.util.Collections.singletonList;
-import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED;
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertSame;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockConstruction;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+@SuppressWarnings("unchecked")
public class AsyncKafkaConsumerTest {
- private AsyncKafkaConsumer<?, ?> consumer;
- private FetchCollector<?, ?> fetchCollector;
- private ConsumerTestBuilder.AsyncKafkaConsumerTestBuilder testBuilder;
- private ApplicationEventHandler applicationEventHandler;
- private SubscriptionState subscriptions;
+ private AsyncKafkaConsumer<String, String> consumer = null;
- @BeforeEach
- public void setup() {
- // By default, the consumer is part of a group and autoCommit is
enabled.
- setup(ConsumerTestBuilder.createDefaultGroupInformation(), true);
- }
-
- private void setup(Optional<ConsumerTestBuilder.GroupInformation>
groupInfo, boolean enableAutoCommit) {
- testBuilder = new
ConsumerTestBuilder.AsyncKafkaConsumerTestBuilder(groupInfo, enableAutoCommit,
true);
- applicationEventHandler = testBuilder.applicationEventHandler;
- consumer = testBuilder.consumer;
- fetchCollector = testBuilder.fetchCollector;
- subscriptions = testBuilder.subscriptions;
- }
+ private final Time time = new MockTime(1);
+ private final FetchCollector<String, String> fetchCollector =
mock(FetchCollector.class);
+ private final ApplicationEventHandler applicationEventHandler =
mock(ApplicationEventHandler.class);
+ private final ConsumerMetadata metadata = mock(ConsumerMetadata.class);
+ private final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue =
new LinkedBlockingQueue<>();
@AfterEach
- public void cleanup() {
- if (testBuilder != null) {
- shutDown();
+ public void resetAll() {
+ backgroundEventQueue.clear();
+ if (consumer != null) {
+ consumer.close();
}
+ consumer = null;
+ Mockito.framework().clearInlineMocks();
}
- private void shutDown() {
- prepAutocommitOnClose();
- testBuilder.close();
+ private AsyncKafkaConsumer<String, String> setup() {
+ final Properties props = requiredConsumerProperties();
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
+ final ConsumerConfig config = new ConsumerConfig(props);
+ return setup(config);
}
- private void resetWithEmptyGroupId() {
- // Create a consumer that is not configured as part of a group.
- cleanup();
- setup(Optional.empty(), false);
+ private AsyncKafkaConsumer<String, String> setupWithoutGroupId() {
+ final Properties props = requiredConsumerProperties();
+ final ConsumerConfig config = new ConsumerConfig(props);
+ return setup(config);
}
- private void resetWithAutoCommitEnabled() {
- cleanup();
- setup(ConsumerTestBuilder.createDefaultGroupInformation(), true);
+ @SuppressWarnings("UnusedReturnValue")
+ private AsyncKafkaConsumer<String, String> setupWithEmptyGroupId() {
+ final Properties props = requiredConsumerPropertiesAndGroupId("");
+ final ConsumerConfig config = new ConsumerConfig(props);
+ return setup(config);
+ }
+
+ private AsyncKafkaConsumer<String, String> setup(ConsumerConfig config) {
Review Comment:
nit:
The method name `setup` is quite generic. What about:
```suggestion
private AsyncKafkaConsumer<String, String> newConsumer(ConsumerConfig
config) {
```
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -1113,45 +1053,41 @@ private HashMap<TopicPartition, Long>
mockTimestampToSearch() {
return timestampToSearch;
}
- private void prepAutocommitOnClose() {
- Node node = testBuilder.metadata.fetch().nodes().get(0);
-
testBuilder.client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE,
"group-id", node));
- if (!testBuilder.subscriptions.allConsumed().isEmpty()) {
- List<TopicPartition> topicPartitions = new
ArrayList<>(testBuilder.subscriptions.assignedPartitionsList());
- testBuilder.client.prepareResponse(mockAutocommitResponse(
- topicPartitions,
- (short) 1,
- Errors.NONE).responseBody());
- }
+ private void mockCommitApplicationEventException(Exception ex) {
Review Comment:
I think calling this method (and its siblings) something like
`completeCommitApplicationEventExceptionally()`
(`completeCommitApplicationEventSuccessfully()` for the good case) would make
the code more readable.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -241,65 +226,64 @@ private static Stream<Exception>
commitExceptionSupplier() {
@Test
public void testFencedInstanceException() {
- CompletableFuture<Void> future = new CompletableFuture<>();
- doReturn(future).when(consumer).commit(new HashMap<>(), false);
+ consumer = setup();
+
mockCommitApplicationEventException(Errors.FENCED_INSTANCE_ID.exception());
+
assertDoesNotThrow(() -> consumer.commitAsync());
- future.completeExceptionally(Errors.FENCED_INSTANCE_ID.exception());
}
@Test
public void testCommitted() {
- Map<TopicPartition, OffsetAndMetadata> offsets =
mockTopicPartitionOffset();
- CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>
committedFuture = new CompletableFuture<>();
- committedFuture.complete(offsets);
+ consumer = setup();
+ Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsets =
mockTopicPartitionOffset();
+
mockFetchedCommittedOffsetApplicationEventCompleted(topicPartitionOffsets);
- try (MockedConstruction<FetchCommittedOffsetsApplicationEvent> ignored
= offsetFetchEventMocker(committedFuture)) {
- assertDoesNotThrow(() -> consumer.committed(offsets.keySet(),
Duration.ofMillis(1000)));
-
verify(applicationEventHandler).add(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class));
- }
+ assertDoesNotThrow(() ->
consumer.committed(topicPartitionOffsets.keySet(), Duration.ofMillis(1000)));
+
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class),
any());
Review Comment:
I am missing the verification of the return value of `committed()`.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -42,192 +77,142 @@
import
org.apache.kafka.clients.consumer.internals.events.UnsubscribeApplicationEvent;
import
org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent;
import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.message.OffsetCommitResponseData;
-import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.ListOffsetsRequest;
-import org.apache.kafka.common.requests.OffsetCommitResponse;
-import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
-import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
-import org.mockito.MockedConstruction;
import org.mockito.Mockito;
-import org.mockito.stubbing.Answer;
-import org.opentest4j.AssertionFailedError;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static java.util.Arrays.asList;
-import static java.util.Collections.singleton;
-import static java.util.Collections.singletonList;
-import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED;
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertSame;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockConstruction;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+@SuppressWarnings("unchecked")
public class AsyncKafkaConsumerTest {
- private AsyncKafkaConsumer<?, ?> consumer;
- private FetchCollector<?, ?> fetchCollector;
- private ConsumerTestBuilder.AsyncKafkaConsumerTestBuilder testBuilder;
- private ApplicationEventHandler applicationEventHandler;
- private SubscriptionState subscriptions;
+ private AsyncKafkaConsumer<String, String> consumer = null;
- @BeforeEach
- public void setup() {
- // By default, the consumer is part of a group and autoCommit is
enabled.
- setup(ConsumerTestBuilder.createDefaultGroupInformation(), true);
- }
-
- private void setup(Optional<ConsumerTestBuilder.GroupInformation>
groupInfo, boolean enableAutoCommit) {
- testBuilder = new
ConsumerTestBuilder.AsyncKafkaConsumerTestBuilder(groupInfo, enableAutoCommit,
true);
- applicationEventHandler = testBuilder.applicationEventHandler;
- consumer = testBuilder.consumer;
- fetchCollector = testBuilder.fetchCollector;
- subscriptions = testBuilder.subscriptions;
- }
+ private final Time time = new MockTime(1);
+ private final FetchCollector<String, String> fetchCollector =
mock(FetchCollector.class);
+ private final ApplicationEventHandler applicationEventHandler =
mock(ApplicationEventHandler.class);
+ private final ConsumerMetadata metadata = mock(ConsumerMetadata.class);
+ private final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue =
new LinkedBlockingQueue<>();
@AfterEach
- public void cleanup() {
- if (testBuilder != null) {
- shutDown();
+ public void resetAll() {
+ backgroundEventQueue.clear();
+ if (consumer != null) {
+ consumer.close();
}
+ consumer = null;
+ Mockito.framework().clearInlineMocks();
}
- private void shutDown() {
- prepAutocommitOnClose();
- testBuilder.close();
+ private AsyncKafkaConsumer<String, String> setup() {
+ final Properties props = requiredConsumerProperties();
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
+ final ConsumerConfig config = new ConsumerConfig(props);
+ return setup(config);
}
- private void resetWithEmptyGroupId() {
- // Create a consumer that is not configured as part of a group.
- cleanup();
- setup(Optional.empty(), false);
+ private AsyncKafkaConsumer<String, String> setupWithoutGroupId() {
+ final Properties props = requiredConsumerProperties();
+ final ConsumerConfig config = new ConsumerConfig(props);
+ return setup(config);
}
- private void resetWithAutoCommitEnabled() {
- cleanup();
- setup(ConsumerTestBuilder.createDefaultGroupInformation(), true);
+ @SuppressWarnings("UnusedReturnValue")
+ private AsyncKafkaConsumer<String, String> setupWithEmptyGroupId() {
+ final Properties props = requiredConsumerPropertiesAndGroupId("");
+ final ConsumerConfig config = new ConsumerConfig(props);
+ return setup(config);
+ }
+
+ private AsyncKafkaConsumer<String, String> setup(ConsumerConfig config) {
+ return new AsyncKafkaConsumer<>(
+ config,
+ new StringDeserializer(),
+ new StringDeserializer(),
+ time,
+ (a, b, c, d, e, f) -> applicationEventHandler,
+ (a, b, c, d, e, f, g) -> fetchCollector,
+ (a, b, c, d) -> metadata,
+ backgroundEventQueue
+ );
}
@Test
public void testSuccessfulStartupShutdown() {
+ consumer = setup();
assertDoesNotThrow(() -> consumer.close());
}
- @Test
- public void testSuccessfulStartupShutdownWithAutoCommit() {
- resetWithAutoCommitEnabled();
- TopicPartition tp = new TopicPartition("topic", 0);
- consumer.assign(singleton(tp));
- consumer.seek(tp, 100);
- prepAutocommitOnClose();
- }
-
@Test
public void testInvalidGroupId() {
- // Create consumer without group id
- resetWithEmptyGroupId();
- assertThrows(InvalidGroupIdException.class, () ->
consumer.committed(new HashSet<>()));
+ KafkaException e = assertThrows(KafkaException.class,
this::setupWithEmptyGroupId);
+ assertTrue(e.getCause() instanceof InvalidGroupIdException);
}
@Test
public void testFailOnClosedConsumer() {
+ consumer = setup();
consumer.close();
final IllegalStateException res =
assertThrows(IllegalStateException.class, consumer::assignment);
assertEquals("This consumer has already been closed.",
res.getMessage());
}
@Test
- public void testCommitAsync_NullCallback() throws InterruptedException {
- CompletableFuture<Void> future = new CompletableFuture<>();
- Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
- offsets.put(new TopicPartition("my-topic", 0), new
OffsetAndMetadata(100L));
- offsets.put(new TopicPartition("my-topic", 1), new
OffsetAndMetadata(200L));
+ public void testCommitAsyncWithNullCallback() {
+ consumer = setup();
+ final TopicPartition t0 = new TopicPartition("t0", 2);
+ final TopicPartition t1 = new TopicPartition("t0", 3);
+ HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+ offsets.put(t0, new OffsetAndMetadata(10L));
+ offsets.put(t1, new OffsetAndMetadata(20L));
- doReturn(future).when(consumer).commit(offsets, false);
consumer.commitAsync(offsets, null);
- future.complete(null);
- TestUtils.waitForCondition(future::isDone,
- 2000,
- "commit future should complete");
- assertFalse(future.isCompletedExceptionally());
+ final ArgumentCaptor<CommitApplicationEvent> commitEventCaptor =
ArgumentCaptor.forClass(CommitApplicationEvent.class);
+ verify(applicationEventHandler).add(commitEventCaptor.capture());
+ final CommitApplicationEvent commitEvent =
commitEventCaptor.getValue();
+ assertEquals(offsets, commitEvent.offsets());
+ assertDoesNotThrow(() -> commitEvent.future().complete(null));
+ assertDoesNotThrow(() -> consumer.commitAsync(offsets, null));
}
@ParameterizedTest
@MethodSource("commitExceptionSupplier")
- public void testCommitAsync_UserSuppliedCallback(Exception exception) {
- CompletableFuture<Void> future = new CompletableFuture<>();
+ public void testCommitAsyncUserSuppliedCallback(Exception exception) {
Review Comment:
I would separate the exceptional from the non-exceptional case. It would
make the test more readable.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -241,65 +226,64 @@ private static Stream<Exception>
commitExceptionSupplier() {
@Test
public void testFencedInstanceException() {
- CompletableFuture<Void> future = new CompletableFuture<>();
- doReturn(future).when(consumer).commit(new HashMap<>(), false);
+ consumer = setup();
+
mockCommitApplicationEventException(Errors.FENCED_INSTANCE_ID.exception());
+
assertDoesNotThrow(() -> consumer.commitAsync());
- future.completeExceptionally(Errors.FENCED_INSTANCE_ID.exception());
}
Review Comment:
Isn't something missing here? The next call to `consumer.commitAsync()`
should throw, right? Shouldn't we verify that?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -482,6 +497,47 @@ private void process(final GroupMetadataUpdateEvent event)
{
requestManagersSupplier);
}
+ // auxiliary interface for testing
+ interface ApplicationEventHandlerSupplier {
Review Comment:
It is not clear to me why we need a supplier here. Could you elaborate?
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -42,192 +77,142 @@
import
org.apache.kafka.clients.consumer.internals.events.UnsubscribeApplicationEvent;
import
org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent;
import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.message.OffsetCommitResponseData;
-import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.ListOffsetsRequest;
-import org.apache.kafka.common.requests.OffsetCommitResponse;
-import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
-import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
-import org.mockito.MockedConstruction;
import org.mockito.Mockito;
-import org.mockito.stubbing.Answer;
-import org.opentest4j.AssertionFailedError;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static java.util.Arrays.asList;
-import static java.util.Collections.singleton;
-import static java.util.Collections.singletonList;
-import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED;
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertSame;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockConstruction;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+@SuppressWarnings("unchecked")
public class AsyncKafkaConsumerTest {
- private AsyncKafkaConsumer<?, ?> consumer;
- private FetchCollector<?, ?> fetchCollector;
- private ConsumerTestBuilder.AsyncKafkaConsumerTestBuilder testBuilder;
- private ApplicationEventHandler applicationEventHandler;
- private SubscriptionState subscriptions;
+ private AsyncKafkaConsumer<String, String> consumer = null;
- @BeforeEach
- public void setup() {
- // By default, the consumer is part of a group and autoCommit is
enabled.
- setup(ConsumerTestBuilder.createDefaultGroupInformation(), true);
- }
-
- private void setup(Optional<ConsumerTestBuilder.GroupInformation>
groupInfo, boolean enableAutoCommit) {
- testBuilder = new
ConsumerTestBuilder.AsyncKafkaConsumerTestBuilder(groupInfo, enableAutoCommit,
true);
- applicationEventHandler = testBuilder.applicationEventHandler;
- consumer = testBuilder.consumer;
- fetchCollector = testBuilder.fetchCollector;
- subscriptions = testBuilder.subscriptions;
- }
+ private final Time time = new MockTime(1);
+ private final FetchCollector<String, String> fetchCollector =
mock(FetchCollector.class);
+ private final ApplicationEventHandler applicationEventHandler =
mock(ApplicationEventHandler.class);
+ private final ConsumerMetadata metadata = mock(ConsumerMetadata.class);
+ private final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue =
new LinkedBlockingQueue<>();
@AfterEach
- public void cleanup() {
- if (testBuilder != null) {
- shutDown();
+ public void resetAll() {
+ backgroundEventQueue.clear();
+ if (consumer != null) {
+ consumer.close();
}
+ consumer = null;
+ Mockito.framework().clearInlineMocks();
}
- private void shutDown() {
- prepAutocommitOnClose();
- testBuilder.close();
+ private AsyncKafkaConsumer<String, String> setup() {
+ final Properties props = requiredConsumerProperties();
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
+ final ConsumerConfig config = new ConsumerConfig(props);
+ return setup(config);
}
- private void resetWithEmptyGroupId() {
- // Create a consumer that is not configured as part of a group.
- cleanup();
- setup(Optional.empty(), false);
+ private AsyncKafkaConsumer<String, String> setupWithoutGroupId() {
+ final Properties props = requiredConsumerProperties();
+ final ConsumerConfig config = new ConsumerConfig(props);
+ return setup(config);
}
- private void resetWithAutoCommitEnabled() {
- cleanup();
- setup(ConsumerTestBuilder.createDefaultGroupInformation(), true);
+ @SuppressWarnings("UnusedReturnValue")
+ private AsyncKafkaConsumer<String, String> setupWithEmptyGroupId() {
+ final Properties props = requiredConsumerPropertiesAndGroupId("");
+ final ConsumerConfig config = new ConsumerConfig(props);
+ return setup(config);
+ }
+
+ private AsyncKafkaConsumer<String, String> setup(ConsumerConfig config) {
+ return new AsyncKafkaConsumer<>(
+ config,
+ new StringDeserializer(),
+ new StringDeserializer(),
+ time,
+ (a, b, c, d, e, f) -> applicationEventHandler,
+ (a, b, c, d, e, f, g) -> fetchCollector,
+ (a, b, c, d) -> metadata,
+ backgroundEventQueue
+ );
}
@Test
public void testSuccessfulStartupShutdown() {
+ consumer = setup();
assertDoesNotThrow(() -> consumer.close());
}
- @Test
- public void testSuccessfulStartupShutdownWithAutoCommit() {
- resetWithAutoCommitEnabled();
- TopicPartition tp = new TopicPartition("topic", 0);
- consumer.assign(singleton(tp));
- consumer.seek(tp, 100);
- prepAutocommitOnClose();
- }
-
@Test
public void testInvalidGroupId() {
- // Create consumer without group id
- resetWithEmptyGroupId();
- assertThrows(InvalidGroupIdException.class, () ->
consumer.committed(new HashSet<>()));
+ KafkaException e = assertThrows(KafkaException.class,
this::setupWithEmptyGroupId);
+ assertTrue(e.getCause() instanceof InvalidGroupIdException);
}
@Test
public void testFailOnClosedConsumer() {
+ consumer = setup();
consumer.close();
final IllegalStateException res =
assertThrows(IllegalStateException.class, consumer::assignment);
assertEquals("This consumer has already been closed.",
res.getMessage());
}
@Test
- public void testCommitAsync_NullCallback() throws InterruptedException {
- CompletableFuture<Void> future = new CompletableFuture<>();
- Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
- offsets.put(new TopicPartition("my-topic", 0), new
OffsetAndMetadata(100L));
- offsets.put(new TopicPartition("my-topic", 1), new
OffsetAndMetadata(200L));
+ public void testCommitAsyncWithNullCallback() {
+ consumer = setup();
+ final TopicPartition t0 = new TopicPartition("t0", 2);
+ final TopicPartition t1 = new TopicPartition("t0", 3);
+ HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+ offsets.put(t0, new OffsetAndMetadata(10L));
+ offsets.put(t1, new OffsetAndMetadata(20L));
- doReturn(future).when(consumer).commit(offsets, false);
consumer.commitAsync(offsets, null);
- future.complete(null);
- TestUtils.waitForCondition(future::isDone,
- 2000,
- "commit future should complete");
- assertFalse(future.isCompletedExceptionally());
+ final ArgumentCaptor<CommitApplicationEvent> commitEventCaptor =
ArgumentCaptor.forClass(CommitApplicationEvent.class);
+ verify(applicationEventHandler).add(commitEventCaptor.capture());
+ final CommitApplicationEvent commitEvent =
commitEventCaptor.getValue();
+ assertEquals(offsets, commitEvent.offsets());
+ assertDoesNotThrow(() -> commitEvent.future().complete(null));
+ assertDoesNotThrow(() -> consumer.commitAsync(offsets, null));
}
@ParameterizedTest
@MethodSource("commitExceptionSupplier")
- public void testCommitAsync_UserSuppliedCallback(Exception exception) {
- CompletableFuture<Void> future = new CompletableFuture<>();
+ public void testCommitAsyncUserSuppliedCallback(Exception exception) {
+ consumer = setup();
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition("my-topic", 1), new
OffsetAndMetadata(200L));
+ if (exception == null) {
+ mockCommitApplicationEventCompleted();
+ } else {
+ mockCommitApplicationEventException(exception);
+ }
- doReturn(future).when(consumer).commit(offsets, false);
MockCommitCallback callback = new MockCommitCallback();
assertDoesNotThrow(() -> consumer.commitAsync(offsets, callback));
+ consumer.maybeInvokeCommitCallbacks();
Review Comment:
As far as I understand, the agreement is that the commit callback is called
in `poll()`, `close()`, `commitSync()`, and `commitAsync()`. Why make
`maybeInvokeCommitCallbacks()` visible instead of calling one of the mentioned
methods?
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -505,57 +484,10 @@ public void onComplete(Map<TopicPartition,
OffsetAndMetadata> offsets, Exception
this.exception = exception;
}
}
- /**
- * This is a rather ugly bit of code. Not my choice :(
- *
- * <p/>
- *
- * Inside the {@link
org.apache.kafka.clients.consumer.Consumer#committed(Set, Duration)} call we
create an
- * instance of {@link FetchCommittedOffsetsApplicationEvent} that holds
the partitions and internally holds a
- * {@link CompletableFuture}. We want to test different behaviours of the
{@link Future#get()}, such as
- * returning normally, timing out, throwing an error, etc. By mocking the
construction of the event object that
- * is created, we can affect that behavior.
- */
- private static MockedConstruction<FetchCommittedOffsetsApplicationEvent>
offsetFetchEventMocker(CompletableFuture<Map<TopicPartition,
OffsetAndMetadata>> future) {
- // This "answer" is where we pass the future to be invoked by the
ConsumerUtils.getResult() method
- Answer<Map<TopicPartition, OffsetAndMetadata>> getInvocationAnswer =
invocation -> {
- // This argument captures the actual argument value that was
passed to the event's get() method, so we
- // just "forward" that value to our mocked call
- Timer timer = invocation.getArgument(0);
- return ConsumerUtils.getResult(future, timer);
- };
-
-
MockedConstruction.MockInitializer<FetchCommittedOffsetsApplicationEvent>
mockInitializer = (mock, ctx) -> {
- // When the event's get() method is invoked, we call the "answer"
method just above
- when(mock.get(any())).thenAnswer(getInvocationAnswer);
-
- // When the event's type() method is invoked, we have to return
the type as it will be null in the mock
-
when(mock.type()).thenReturn(ApplicationEvent.Type.FETCH_COMMITTED_OFFSETS);
-
- // This is needed for the WakeupTrigger code that keeps track of
the active task
- when(mock.future()).thenReturn(future);
- };
-
- return mockConstruction(FetchCommittedOffsetsApplicationEvent.class,
mockInitializer);
- }
-
- private static MockedConstruction<CommitApplicationEvent>
commitEventMocker(CompletableFuture<Void> future) {
- Answer<Void> getInvocationAnswer = invocation -> {
- Timer timer = invocation.getArgument(0);
- return ConsumerUtils.getResult(future, timer);
- };
-
- MockedConstruction.MockInitializer<CommitApplicationEvent>
mockInitializer = (mock, ctx) -> {
- when(mock.get(any())).thenAnswer(getInvocationAnswer);
- when(mock.type()).thenReturn(ApplicationEvent.Type.COMMIT);
- when(mock.future()).thenReturn(future);
- };
-
- return mockConstruction(CommitApplicationEvent.class, mockInitializer);
- }
Review Comment:
🥳
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -241,65 +226,64 @@ private static Stream<Exception>
commitExceptionSupplier() {
@Test
public void testFencedInstanceException() {
Review Comment:
I would change the name to "testCommitAsyncWithFencedException".
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -505,57 +484,10 @@ public void onComplete(Map<TopicPartition,
OffsetAndMetadata> offsets, Exception
this.exception = exception;
}
}
- /**
- * This is a rather ugly bit of code. Not my choice :(
- *
- * <p/>
- *
- * Inside the {@link
org.apache.kafka.clients.consumer.Consumer#committed(Set, Duration)} call we
create an
- * instance of {@link FetchCommittedOffsetsApplicationEvent} that holds
the partitions and internally holds a
- * {@link CompletableFuture}. We want to test different behaviours of the
{@link Future#get()}, such as
- * returning normally, timing out, throwing an error, etc. By mocking the
construction of the event object that
- * is created, we can affect that behavior.
- */
- private static MockedConstruction<FetchCommittedOffsetsApplicationEvent>
offsetFetchEventMocker(CompletableFuture<Map<TopicPartition,
OffsetAndMetadata>> future) {
- // This "answer" is where we pass the future to be invoked by the
ConsumerUtils.getResult() method
- Answer<Map<TopicPartition, OffsetAndMetadata>> getInvocationAnswer =
invocation -> {
- // This argument captures the actual argument value that was
passed to the event's get() method, so we
- // just "forward" that value to our mocked call
- Timer timer = invocation.getArgument(0);
- return ConsumerUtils.getResult(future, timer);
- };
-
-
MockedConstruction.MockInitializer<FetchCommittedOffsetsApplicationEvent>
mockInitializer = (mock, ctx) -> {
- // When the event's get() method is invoked, we call the "answer"
method just above
- when(mock.get(any())).thenAnswer(getInvocationAnswer);
-
- // When the event's type() method is invoked, we have to return
the type as it will be null in the mock
-
when(mock.type()).thenReturn(ApplicationEvent.Type.FETCH_COMMITTED_OFFSETS);
-
- // This is needed for the WakeupTrigger code that keeps track of
the active task
- when(mock.future()).thenReturn(future);
- };
-
- return mockConstruction(FetchCommittedOffsetsApplicationEvent.class,
mockInitializer);
- }
-
- private static MockedConstruction<CommitApplicationEvent>
commitEventMocker(CompletableFuture<Void> future) {
- Answer<Void> getInvocationAnswer = invocation -> {
- Timer timer = invocation.getArgument(0);
- return ConsumerUtils.getResult(future, timer);
- };
-
- MockedConstruction.MockInitializer<CommitApplicationEvent>
mockInitializer = (mock, ctx) -> {
- when(mock.get(any())).thenAnswer(getInvocationAnswer);
- when(mock.type()).thenReturn(ApplicationEvent.Type.COMMIT);
- when(mock.future()).thenReturn(future);
- };
-
- return mockConstruction(CommitApplicationEvent.class, mockInitializer);
- }
Review Comment:
🥳
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1535,6 +1591,11 @@ int callbacks() {
return invoker.callbackQueue.size();
}
+ // Visible for testing
+ SubscriptionState subscriptions() {
+ return subscriptions;
+ }
Review Comment:
Is there a way to avoid adding this test-only accessor?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]