brenden20 commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1628008988


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##########
@@ -78,64 +81,120 @@
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkThreadTest {
+    static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
+    static final int DEFAULT_REQUEST_TIMEOUT_MS = 500;
+
+    private final Time time;
+    private final ConsumerMetadata metadata;
+    private final BlockingQueue<ApplicationEvent> applicationEventsQueue;
+    private final ApplicationEventProcessor applicationEventProcessor;
+    private final OffsetsRequestManager offsetsRequestManager;
+    private final HeartbeatRequestManager heartbeatRequestManager;
+    private final CoordinatorRequestManager coordinatorRequestManager;
+    private final ConsumerNetworkThread consumerNetworkThread;
+    private final MockClient client;
+    private final NetworkClientDelegate networkClientDelegate;
+    private final NetworkClientDelegate networkClient;
+    private final RequestManagers requestManagers;
+    private final CompletableEventReaper applicationEventReaper;
+
+    ConsumerNetworkThreadTest() {
+        ConsumerConfig config = mock(ConsumerConfig.class);
+        this.time = new MockTime();
+        this.networkClientDelegate = mock(NetworkClientDelegate.class);
+        this.requestManagers = mock(RequestManagers.class);
+        this.offsetsRequestManager = mock(OffsetsRequestManager.class);
+        this.heartbeatRequestManager = mock(HeartbeatRequestManager.class);
+        this.coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+        this.metadata = mock(ConsumerMetadata.class);
+        this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
+        this.applicationEventReaper = mock(CompletableEventReaper.class);
+        this.client = mock(MockClient.class);
+        this.applicationEventsQueue = new LinkedBlockingQueue<>();
+        LogContext logContext = new LogContext();
+
+        this.networkClient = new NetworkClientDelegate(
+                time,
+                config,
+                logContext,
+                client
+        );
 
-    private ConsumerTestBuilder testBuilder;
-    private Time time;
-    private ConsumerMetadata metadata;
-    private NetworkClientDelegate networkClient;
-    private BlockingQueue<ApplicationEvent> applicationEventsQueue;
-    private ApplicationEventProcessor applicationEventProcessor;
-    private OffsetsRequestManager offsetsRequestManager;
-    private CommitRequestManager commitRequestManager;
-    private CoordinatorRequestManager coordinatorRequestManager;
-    private ConsumerNetworkThread consumerNetworkThread;
-    private final CompletableEventReaper applicationEventReaper = 
mock(CompletableEventReaper.class);
-    private MockClient client;
-
-    @BeforeEach
-    public void setup() {
-        testBuilder = new ConsumerTestBuilder(createDefaultGroupInformation());
-        time = testBuilder.time;
-        metadata = testBuilder.metadata;
-        networkClient = testBuilder.networkClientDelegate;
-        client = testBuilder.client;
-        applicationEventsQueue = testBuilder.applicationEventQueue;
-        applicationEventProcessor = testBuilder.applicationEventProcessor;
-        commitRequestManager = 
testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
-        offsetsRequestManager = testBuilder.offsetsRequestManager;
-        coordinatorRequestManager = 
testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
-        consumerNetworkThread = new ConsumerNetworkThread(
-                testBuilder.logContext,
+        this.consumerNetworkThread = new ConsumerNetworkThread(
+                logContext,
                 time,
-                testBuilder.applicationEventQueue,
+                applicationEventsQueue,
                 applicationEventReaper,
                 () -> applicationEventProcessor,
-                () -> testBuilder.networkClientDelegate,
-                () -> testBuilder.requestManagers
+                () -> networkClientDelegate,
+                () -> requestManagers
         );
+    }
+
+    @BeforeEach
+    public void setup() {
         consumerNetworkThread.initializeResources();
     }
 
     @AfterEach
     public void tearDown() {
-        if (testBuilder != null) {
-            testBuilder.close();
-            consumerNetworkThread.close(Duration.ZERO);
-        }
+        if (consumerNetworkThread != null)
+            consumerNetworkThread.close();
+    }
+
+    @Test
+    public void testEnsureCloseStopsRunningThread() {
+        // consumerNetworkThread.running is set to true in its constructor
+        assertTrue(consumerNetworkThread.isRunning());
+
+        // close() should make consumerNetworkThread.running false by calling 
closeInternal(Duration timeout)
+        consumerNetworkThread.close();
+        assertFalse(consumerNetworkThread.isRunning());
+    }
+
+    @Test
+    public void testEnsureSendUnsentRequestPollWIthZeroRunsOnce() {

Review Comment:
   Looking into this, I am not sure there is a way to test this 
functionality without changing the methods in 
`ConsumerNetworkThread`. To poll with a timeout of 0, the timer being used 
must be instantiated with `timeoutMs` set to 0. However, the timer that 
`ConsumerNetworkThread` uses comes from `closeTimeout`, a private field 
that is only modified by a single private method unrelated to 
`cleanup()`. 
   
   Let me know what you think.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to