vamossagar12 commented on code in PR #15305: URL: https://github.com/apache/kafka/pull/15305#discussion_r1579057875
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java: ########## @@ -533,6 +536,57 @@ public void testSkippingAssignmentFails() { verify(configStorage).snapshot(); } + @Test + public void testPollTimeoutExpiry() throws InterruptedException { + // We will create a new WorkerCoordinator object with a rebalance timeout smaller + // than session timeout. This might not happen in the real world but it makes testing + // easier and the test not flaky. + int smallRebalanceTimeout = 20; + this.rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs, + smallRebalanceTimeout, + heartbeatIntervalMs, + groupId, + Optional.empty(), + retryBackoffMs, + retryBackoffMaxMs, + true); + this.coordinator = new WorkerCoordinator(rebalanceConfig, + logContext, + consumerClient, + new Metrics(time), + "consumer" + groupId, + time, + LEADER_URL, + configStorage, + rebalanceListener, + compatibility, + 0); + + when(configStorage.snapshot()).thenReturn(configState1); + + client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node)); + coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); + + client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE)); + client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(), + Collections.singletonList(taskId1x0), Errors.NONE)); + + try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) { + coordinator.ensureActiveGroup(); + coordinator.poll(0, () -> null); + + // The heartbeat thread is running and keeps sending heartbeat requests. + TestUtils.waitForCondition(() -> { + // Rebalance timeout elapses while poll is never invoked causing a poll timeout expiry + coordinator.sendHeartbeatRequest(); + client.prepareResponse(new HeartbeatResponse(new HeartbeatResponseData())); + time.sleep(1); Review Comment: @showuon , actually I typed in the wrong message yesterday. I ran the tests multiple times and it passes as expected. The reason I tried doing the other approach was that in the other [comment](https://github.com/apache/kafka/pull/15305#discussion_r1570226078) you had mentioned that it is not normal for consumers to not send heartbeats (other than readability). So to be closer to the real case, I had kept it. Nonetheless I have reverted the code based on your suggestion and it works as well. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org