mjsax commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1631768784
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##########
@@ -148,35 +169,32 @@ public void testStartupAndTearDown() throws
InterruptedException {
}
@Test
- public void testApplicationEvent() {
- ApplicationEvent e = new PollEvent(100);
- applicationEventsQueue.add(e);
+ public void testRequestsTransferFromManagersToClientOnThreadRun() {
+ List<Optional<? extends RequestManager>> list = new ArrayList<>();
+ list.add(Optional.of(coordinatorRequestManager));
+ list.add(Optional.of(heartbeatRequestManager));
+ list.add(Optional.of(offsetsRequestManager));
+
+ when(requestManagers.entries()).thenReturn(list);
+
when(coordinatorRequestManager.poll(anyLong())).thenReturn(mock(NetworkClientDelegate.PollResult.class));
consumerNetworkThread.runOnce();
- verify(applicationEventProcessor, times(1)).process(e);
+ requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm ->
verify(rm).poll(anyLong())));
+ requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm ->
verify(rm).maximumTimeToWait(anyLong())));
+
verify(networkClientDelegate).addAll(any(NetworkClientDelegate.PollResult.class));
+ verify(networkClientDelegate).poll(anyLong(), anyLong());
}
- @Test
- public void testMetadataUpdateEvent() {
Review Comment:
Seems this test was effectively removed? Why?
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##########
@@ -190,15 +208,6 @@ public void testListOffsetsEventIsProcessed(boolean
requireTimestamp) {
assertTrue(applicationEventsQueue.isEmpty());
}
- @Test
- public void testResetPositionsEventIsProcessed() {
Review Comment:
Seems this test was effectively removed? Why?
Same for more tests below.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##########
@@ -72,68 +64,97 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class ConsumerNetworkThreadTest {
-
- private ConsumerTestBuilder testBuilder;
- private Time time;
- private ConsumerMetadata metadata;
- private NetworkClientDelegate networkClient;
- private BlockingQueue<ApplicationEvent> applicationEventsQueue;
- private ApplicationEventProcessor applicationEventProcessor;
- private OffsetsRequestManager offsetsRequestManager;
- private CommitRequestManager commitRequestManager;
- private CoordinatorRequestManager coordinatorRequestManager;
- private ConsumerNetworkThread consumerNetworkThread;
- private final CompletableEventReaper applicationEventReaper =
mock(CompletableEventReaper.class);
- private MockClient client;
-
- @BeforeEach
- public void setup() {
- testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation());
- time = testBuilder.time;
- metadata = testBuilder.metadata;
- networkClient = testBuilder.networkClientDelegate;
- client = testBuilder.client;
- applicationEventsQueue = testBuilder.applicationEventQueue;
- applicationEventProcessor = testBuilder.applicationEventProcessor;
- commitRequestManager =
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
- offsetsRequestManager = testBuilder.offsetsRequestManager;
- coordinatorRequestManager =
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
- consumerNetworkThread = new ConsumerNetworkThread(
- testBuilder.logContext,
+ private final Time time;
+ private final BlockingQueue<ApplicationEvent> applicationEventsQueue;
+ private final ApplicationEventProcessor applicationEventProcessor;
+ private final OffsetsRequestManager offsetsRequestManager;
+ private final HeartbeatRequestManager heartbeatRequestManager;
+ private final CoordinatorRequestManager coordinatorRequestManager;
+ private final ConsumerNetworkThread consumerNetworkThread;
+ private final NetworkClientDelegate networkClientDelegate;
+ private final RequestManagers requestManagers;
+ private final CompletableEventReaper applicationEventReaper;
+
+ ConsumerNetworkThreadTest() {
+ this.networkClientDelegate = mock(NetworkClientDelegate.class);
+ this.requestManagers = mock(RequestManagers.class);
+ this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+ this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+ this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+ this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
+ this.applicationEventReaper = mock(CompletableEventReaper.class);
+ this.time = new MockTime();
+ this.applicationEventsQueue = new LinkedBlockingQueue<>();
+ LogContext logContext = new LogContext();
+
+ this.consumerNetworkThread = new ConsumerNetworkThread(
+ logContext,
time,
- testBuilder.applicationEventQueue,
+ applicationEventsQueue,
applicationEventReaper,
() -> applicationEventProcessor,
- () -> testBuilder.networkClientDelegate,
- () -> testBuilder.requestManagers
+ () -> networkClientDelegate,
+ () -> requestManagers
);
+ }
+
+ @BeforeEach
+ public void setup() {
consumerNetworkThread.initializeResources();
}
@AfterEach
public void tearDown() {
- if (testBuilder != null) {
- testBuilder.close();
- consumerNetworkThread.close(Duration.ZERO);
- }
+ if (consumerNetworkThread != null)
+ consumerNetworkThread.close();
+ }
+
+ @Test
+ public void testEnsureCloseStopsRunningThread() {
+ assertTrue(consumerNetworkThread.isRunning(),
+ "ConsumerNetworkThread should start running when created");
+
+ consumerNetworkThread.close();
+ assertFalse(consumerNetworkThread.isRunning(),
+ "close() should make consumerNetworkThread.running false by
calling closeInternal(Duration timeout)");
+ }
+
+ @ParameterizedTest
+ @ValueSource(longs = {ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS - 1,
ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS,
ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS + 1})
+ public void testConsumerNetworkThreadPollTimeComputations(long
exampleTime) {
+ List<Optional<? extends RequestManager>> list = new ArrayList<>();
+ list.add(Optional.of(coordinatorRequestManager));
+ list.add(Optional.of(heartbeatRequestManager));
Review Comment:
Why do we only add two of the three request managers to this list?
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##########
@@ -148,35 +169,32 @@ public void testStartupAndTearDown() throws
InterruptedException {
}
@Test
- public void testApplicationEvent() {
Review Comment:
It seems this test is rewritten as `testApplicationEventIsProcessed` further
below? And `testRequestsTransferFromManagersToClientOnThreadRun` is another
newly added test?
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##########
@@ -148,35 +169,32 @@ public void testStartupAndTearDown() throws
InterruptedException {
}
@Test
- public void testApplicationEvent() {
- ApplicationEvent e = new PollEvent(100);
- applicationEventsQueue.add(e);
+ public void testRequestsTransferFromManagersToClientOnThreadRun() {
+ List<Optional<? extends RequestManager>> list = new ArrayList<>();
+ list.add(Optional.of(coordinatorRequestManager));
+ list.add(Optional.of(heartbeatRequestManager));
+ list.add(Optional.of(offsetsRequestManager));
+
+ when(requestManagers.entries()).thenReturn(list);
+
when(coordinatorRequestManager.poll(anyLong())).thenReturn(mock(NetworkClientDelegate.PollResult.class));
consumerNetworkThread.runOnce();
- verify(applicationEventProcessor, times(1)).process(e);
+ requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm ->
verify(rm).poll(anyLong())));
+ requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm ->
verify(rm).maximumTimeToWait(anyLong())));
+
verify(networkClientDelegate).addAll(any(NetworkClientDelegate.PollResult.class));
+ verify(networkClientDelegate).poll(anyLong(), anyLong());
}
- @Test
- public void testMetadataUpdateEvent() {
- ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent();
+ @ParameterizedTest
+ @MethodSource("applicationEvents")
+ public void testApplicationEventIsProcessed(ApplicationEvent e) {
applicationEventsQueue.add(e);
consumerNetworkThread.runOnce();
- verify(metadata).requestUpdateForNewTopics();
- }
- @Test
- public void testAsyncCommitEvent() {
Review Comment:
Seems this test was effectively removed? Why?
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##########
@@ -72,68 +64,97 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class ConsumerNetworkThreadTest {
-
- private ConsumerTestBuilder testBuilder;
- private Time time;
- private ConsumerMetadata metadata;
- private NetworkClientDelegate networkClient;
- private BlockingQueue<ApplicationEvent> applicationEventsQueue;
- private ApplicationEventProcessor applicationEventProcessor;
- private OffsetsRequestManager offsetsRequestManager;
- private CommitRequestManager commitRequestManager;
- private CoordinatorRequestManager coordinatorRequestManager;
- private ConsumerNetworkThread consumerNetworkThread;
- private final CompletableEventReaper applicationEventReaper =
mock(CompletableEventReaper.class);
- private MockClient client;
-
- @BeforeEach
- public void setup() {
- testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation());
- time = testBuilder.time;
- metadata = testBuilder.metadata;
- networkClient = testBuilder.networkClientDelegate;
- client = testBuilder.client;
- applicationEventsQueue = testBuilder.applicationEventQueue;
- applicationEventProcessor = testBuilder.applicationEventProcessor;
- commitRequestManager =
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
- offsetsRequestManager = testBuilder.offsetsRequestManager;
- coordinatorRequestManager =
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
- consumerNetworkThread = new ConsumerNetworkThread(
- testBuilder.logContext,
+ private final Time time;
+ private final BlockingQueue<ApplicationEvent> applicationEventsQueue;
+ private final ApplicationEventProcessor applicationEventProcessor;
+ private final OffsetsRequestManager offsetsRequestManager;
+ private final HeartbeatRequestManager heartbeatRequestManager;
+ private final CoordinatorRequestManager coordinatorRequestManager;
+ private final ConsumerNetworkThread consumerNetworkThread;
+ private final NetworkClientDelegate networkClientDelegate;
+ private final RequestManagers requestManagers;
+ private final CompletableEventReaper applicationEventReaper;
+
+ ConsumerNetworkThreadTest() {
+ this.networkClientDelegate = mock(NetworkClientDelegate.class);
+ this.requestManagers = mock(RequestManagers.class);
+ this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+ this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+ this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+ this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
+ this.applicationEventReaper = mock(CompletableEventReaper.class);
+ this.time = new MockTime();
+ this.applicationEventsQueue = new LinkedBlockingQueue<>();
+ LogContext logContext = new LogContext();
+
+ this.consumerNetworkThread = new ConsumerNetworkThread(
+ logContext,
time,
- testBuilder.applicationEventQueue,
+ applicationEventsQueue,
applicationEventReaper,
() -> applicationEventProcessor,
- () -> testBuilder.networkClientDelegate,
- () -> testBuilder.requestManagers
+ () -> networkClientDelegate,
+ () -> requestManagers
);
+ }
+
+ @BeforeEach
+ public void setup() {
consumerNetworkThread.initializeResources();
}
@AfterEach
public void tearDown() {
- if (testBuilder != null) {
- testBuilder.close();
- consumerNetworkThread.close(Duration.ZERO);
- }
+ if (consumerNetworkThread != null)
+ consumerNetworkThread.close();
+ }
+
+ @Test
+ public void testEnsureCloseStopsRunningThread() {
Review Comment:
Seems this is one of the newly added tests?
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##########
@@ -148,35 +169,32 @@ public void testStartupAndTearDown() throws
InterruptedException {
}
@Test
- public void testApplicationEvent() {
- ApplicationEvent e = new PollEvent(100);
- applicationEventsQueue.add(e);
+ public void testRequestsTransferFromManagersToClientOnThreadRun() {
+ List<Optional<? extends RequestManager>> list = new ArrayList<>();
+ list.add(Optional.of(coordinatorRequestManager));
+ list.add(Optional.of(heartbeatRequestManager));
+ list.add(Optional.of(offsetsRequestManager));
+
+ when(requestManagers.entries()).thenReturn(list);
+
when(coordinatorRequestManager.poll(anyLong())).thenReturn(mock(NetworkClientDelegate.PollResult.class));
consumerNetworkThread.runOnce();
- verify(applicationEventProcessor, times(1)).process(e);
+ requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm ->
verify(rm).poll(anyLong())));
+ requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm ->
verify(rm).maximumTimeToWait(anyLong())));
+
verify(networkClientDelegate).addAll(any(NetworkClientDelegate.PollResult.class));
+ verify(networkClientDelegate).poll(anyLong(), anyLong());
}
- @Test
- public void testMetadataUpdateEvent() {
- ApplicationEvent e = new NewTopicsMetadataUpdateRequestEvent();
+ @ParameterizedTest
+ @MethodSource("applicationEvents")
+ public void testApplicationEventIsProcessed(ApplicationEvent e) {
applicationEventsQueue.add(e);
consumerNetworkThread.runOnce();
- verify(metadata).requestUpdateForNewTopics();
- }
- @Test
- public void testAsyncCommitEvent() {
- ApplicationEvent e = new AsyncCommitEvent(new HashMap<>());
- applicationEventsQueue.add(e);
- consumerNetworkThread.runOnce();
- verify(applicationEventProcessor).process(any(AsyncCommitEvent.class));
- }
+ if (e instanceof CompletableEvent)
+ verify(applicationEventReaper).add((CompletableEvent<?>) e);
- @Test
- public void testSyncCommitEvent() {
Review Comment:
Seems this test was effectively removed? Why?
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##########
@@ -72,68 +64,97 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class ConsumerNetworkThreadTest {
-
- private ConsumerTestBuilder testBuilder;
- private Time time;
- private ConsumerMetadata metadata;
- private NetworkClientDelegate networkClient;
- private BlockingQueue<ApplicationEvent> applicationEventsQueue;
- private ApplicationEventProcessor applicationEventProcessor;
- private OffsetsRequestManager offsetsRequestManager;
- private CommitRequestManager commitRequestManager;
- private CoordinatorRequestManager coordinatorRequestManager;
- private ConsumerNetworkThread consumerNetworkThread;
- private final CompletableEventReaper applicationEventReaper =
mock(CompletableEventReaper.class);
- private MockClient client;
-
- @BeforeEach
- public void setup() {
- testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation());
- time = testBuilder.time;
- metadata = testBuilder.metadata;
- networkClient = testBuilder.networkClientDelegate;
- client = testBuilder.client;
- applicationEventsQueue = testBuilder.applicationEventQueue;
- applicationEventProcessor = testBuilder.applicationEventProcessor;
- commitRequestManager =
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
- offsetsRequestManager = testBuilder.offsetsRequestManager;
- coordinatorRequestManager =
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
- consumerNetworkThread = new ConsumerNetworkThread(
- testBuilder.logContext,
+ private final Time time;
+ private final BlockingQueue<ApplicationEvent> applicationEventsQueue;
+ private final ApplicationEventProcessor applicationEventProcessor;
+ private final OffsetsRequestManager offsetsRequestManager;
+ private final HeartbeatRequestManager heartbeatRequestManager;
+ private final CoordinatorRequestManager coordinatorRequestManager;
+ private final ConsumerNetworkThread consumerNetworkThread;
+ private final NetworkClientDelegate networkClientDelegate;
+ private final RequestManagers requestManagers;
+ private final CompletableEventReaper applicationEventReaper;
+
+ ConsumerNetworkThreadTest() {
+ this.networkClientDelegate = mock(NetworkClientDelegate.class);
+ this.requestManagers = mock(RequestManagers.class);
+ this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+ this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+ this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+ this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
+ this.applicationEventReaper = mock(CompletableEventReaper.class);
+ this.time = new MockTime();
+ this.applicationEventsQueue = new LinkedBlockingQueue<>();
+ LogContext logContext = new LogContext();
+
+ this.consumerNetworkThread = new ConsumerNetworkThread(
+ logContext,
time,
- testBuilder.applicationEventQueue,
+ applicationEventsQueue,
applicationEventReaper,
() -> applicationEventProcessor,
- () -> testBuilder.networkClientDelegate,
- () -> testBuilder.requestManagers
+ () -> networkClientDelegate,
+ () -> requestManagers
);
+ }
+
+ @BeforeEach
+ public void setup() {
consumerNetworkThread.initializeResources();
}
@AfterEach
public void tearDown() {
- if (testBuilder != null) {
- testBuilder.close();
- consumerNetworkThread.close(Duration.ZERO);
- }
+ if (consumerNetworkThread != null)
+ consumerNetworkThread.close();
+ }
+
+ @Test
+ public void testEnsureCloseStopsRunningThread() {
+ assertTrue(consumerNetworkThread.isRunning(),
+ "ConsumerNetworkThread should start running when created");
+
+ consumerNetworkThread.close();
+ assertFalse(consumerNetworkThread.isRunning(),
+ "close() should make consumerNetworkThread.running false by
calling closeInternal(Duration timeout)");
+ }
+
+ @ParameterizedTest
+ @ValueSource(longs = {ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS - 1,
ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS,
ConsumerNetworkThread.MAX_POLL_TIMEOUT_MS + 1})
+ public void testConsumerNetworkThreadPollTimeComputations(long
exampleTime) {
Review Comment:
Seems this is the second newly added test?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]