showuon commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1556799227


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java:
##########
@@ -533,6 +536,47 @@ public void testSkippingAssignmentFails() {
         verify(configStorage).snapshot();
     }
 
+    @Test
+    public void testPollTimeoutExpiry() throws InterruptedException {
+
+        when(configStorage.snapshot()).thenReturn(configState1);
+
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(),
+            Collections.singletonList(taskId1x0), Errors.NONE));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node));
+        client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node));
+
+        client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(),
+            Collections.singletonList(taskId1x0), Errors.NONE));
+
+        try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
+            coordinator.ensureActiveGroup();
+            coordinator.poll(0, () -> {
+                return null;
+            });
+
+            long now = time.milliseconds();
+            // We keep the heartbeat thread running behind the scenes and poll frequently so that eventually
+            // the time goes past now + rebalanceTimeoutMs which triggers poll timeout expiry.
+            TestUtils.waitForCondition(() -> {
+                time.sleep(heartbeatIntervalMs - 1);
+                return time.milliseconds() > now + rebalanceTimeoutMs;
+            }, Duration.ofMinutes(1).toMillis(), "Coordinator did not poll for rebalance.timeout.ms");
+            coordinator.poll(0, () -> {

Review Comment:
   1. You didn't prepare any `HeartbeatResponse`, so the session will time out.
   2. `heartbeatIntervalMs` is the interval at which heartbeats are sent, but the timeout that actually matters for the heartbeat is `sessionTimeoutMs`. So we can sleep `sessionTimeoutMs - 1` per iteration and reach `rebalanceTimeoutMs` faster.
   3. The last poll doesn't make sense, because the poll timeout should already have been triggered by then. Why do we need it?
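
   To make the arithmetic in point 2 concrete, here is a small standalone sketch (not part of the test) that counts how many `time.sleep(step)` calls are needed before the mock clock advances strictly past `rebalanceTimeoutMs`, i.e. the smallest `n` with `n * step > rebalanceTimeoutMs`. It assumes one sleep per loop iteration, as in the test. The timeout values in `main` are hypothetical, not the fixtures defined in `WorkerCoordinatorTest`, and the class and method names are made up for illustration.

   ```java
   final class SleepIterations {

       // Smallest n such that n * stepMs > targetMs, i.e. how many
       // time.sleep(stepMs) calls advance the mock clock strictly past targetMs.
       static long iterationsToExceed(long targetMs, long stepMs) {
           if (stepMs <= 0) {
               throw new IllegalArgumentException("stepMs must be positive");
           }
           return targetMs / stepMs + 1;
       }

       public static void main(String[] args) {
           // Hypothetical values, NOT the ones used in WorkerCoordinatorTest.
           long sessionTimeoutMs = 25;
           long heartbeatIntervalMs = 3;
           long rebalanceTimeoutMs = 60;

           // Stepping by sessionTimeoutMs - 1 (suggested in point 2).
           System.out.println(iterationsToExceed(rebalanceTimeoutMs, sessionTimeoutMs - 1)); // prints 3
           // Stepping by heartbeatIntervalMs - 1 (as in the current PR).
           System.out.println(iterationsToExceed(rebalanceTimeoutMs, heartbeatIntervalMs - 1)); // prints 31
       }
   }
   ```

   With these made-up values, stepping by `sessionTimeoutMs - 1` needs only 3 iterations (and, assuming one heartbeat per iteration, 3 prepared heartbeat responses), versus 31 iterations when stepping by `heartbeatIntervalMs - 1`.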
   
   What I would write is something like this, FYR:
   ```
       @Test
       public void testPollTimeoutExpiry() throws InterruptedException {
           when(configStorage.snapshot()).thenReturn(configState1);

           client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node));
           coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

           client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE));
           client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(),
               Collections.singletonList(taskId1x0), Errors.NONE));

           // Prepare 3 heartbeat responses, since we expect 3 heartbeat requests before the rebalance timeout
           // is reached, i.e. (sessionTimeoutMs - 1) * 3 > rebalanceTimeoutMs.
           client.prepareResponse(new HeartbeatResponse(new HeartbeatResponseData()));
           client.prepareResponse(new HeartbeatResponse(new HeartbeatResponseData()));
           client.prepareResponse(new HeartbeatResponse(new HeartbeatResponseData()));

           try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
               coordinator.ensureActiveGroup();
               coordinator.poll(0, () -> null);

               // Keep the heartbeat thread running in the background while advancing the mock clock, so that
               // eventually more than rebalanceTimeoutMs passes since the last poll and the poll timeout expires.
               TestUtils.waitForCondition(() -> {
                   // Sleep just under sessionTimeoutMs so that a heartbeat request goes out before the session times out.
                   // Not sure if this will be flaky in CI, because the heartbeat thread might not send the request in time.
                   time.sleep(sessionTimeoutMs - 1);
                   return logCaptureAppender.getEvents().stream().anyMatch(e -> e.getLevel().equals("WARN")) &&
                           logCaptureAppender.getEvents().stream().anyMatch(e -> e.getMessage().startsWith("worker poll timeout has expired"));
               }, "Coordinator did not poll for rebalance.timeout.ms");
           }
       }
   ```
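
   One possible refinement of the wait condition: the two separate `anyMatch` calls can be satisfied by two different events (any WARN event plus any event carrying the expected message). The sketch below requires both on the same event. `CapturedEvent` is a hypothetical stand-in for the event type returned by `logCaptureAppender.getEvents()`, assuming, as the snippet above does, that its level and message are strings; the log prefix is copied from the snippet. It uses a record, so it needs Java 16+.

   ```java
   import java.util.List;

   final class PollTimeoutLogCheck {

       // Hypothetical stand-in for the captured log event (level + message only).
       record CapturedEvent(String level, String message) { }

       // True only if a single event is at WARN level AND carries the poll-timeout
       // message, rather than matching the two conditions on independent events.
       static boolean hasPollTimeoutWarning(List<CapturedEvent> events) {
           return events.stream().anyMatch(e ->
                   "WARN".equals(e.level())
                           && e.message() != null
                           && e.message().startsWith("worker poll timeout has expired"));
       }

       public static void main(String[] args) {
           // WARN and the message are on different events: two separate anyMatch calls would accept this.
           List<CapturedEvent> split = List.of(
                   new CapturedEvent("WARN", "some unrelated warning"),
                   new CapturedEvent("INFO", "worker poll timeout has expired"));
           List<CapturedEvent> matching = List.of(
                   new CapturedEvent("WARN", "worker poll timeout has expired"));

           System.out.println(hasPollTimeoutWarning(split));    // prints false
           System.out.println(hasPollTimeoutWarning(matching)); // prints true
       }
   }
   ```

   In the test itself this amounts to a single `anyMatch` whose lambda checks both `e.getLevel()` and `e.getMessage()`.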



-- 
This is an automated message from the Apache Git Service.