gharris1727 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1596961414
##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##########
@@ -850,6 +855,46 @@ public void testRequestTimeouts() throws Exception {
             );
         }
+    @Test
+    public void testPollTimeoutExpiry() throws Exception {
+        // This is a fabricated test to ensure that a poll timeout expiry happens. The tick thread awaits on
+        // task#stop method which is blocked. The timeouts have been set accordingly
+        workerProps.put(REBALANCE_TIMEOUT_MS_CONFIG, Long.toString(TimeUnit.SECONDS.toMillis(20)));
+        workerProps.put(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Long.toString(TimeUnit.SECONDS.toMillis(40)));
+        connect = connectBuilder
+                .numBrokers(1)
+                .numWorkers(1)
+                .build();
+
+        connect.start();
+
+        connect.assertions().assertExactlyNumWorkersAreUp(1, "Worker not brought up in time");
+
+        Map<String, String> connectorWithBlockingTaskStopConfig = new HashMap<>();
+        connectorWithBlockingTaskStopConfig.put(CONNECTOR_CLASS_CONFIG, BlockingConnectorTest.BlockingSourceConnector.class.getName());
+        connectorWithBlockingTaskStopConfig.put(TASKS_MAX_CONFIG, "1");
+        connectorWithBlockingTaskStopConfig.put(BlockingConnectorTest.Block.BLOCK_CONFIG, Objects.requireNonNull(TASK_STOP));
+
+        connect.configureConnector(CONNECTOR_NAME, connectorWithBlockingTaskStopConfig);
+
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME, 1, "connector and tasks did not start in time"
+        );
+
+        try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
+            connect.restartTask(CONNECTOR_NAME, 0);
+            TestUtils.waitForCondition(() -> logCaptureAppender.getEvents().stream().anyMatch(e -> e.getLevel().equals("WARN")) &&

Review Comment:
   Could this assertion be added to an existing BlockingConnectorTest? The blocking plugins are inherently slow to use, so we should avoid adding more instances of them.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -1205,7 +1215,7 @@ public void handle(LeaveGroupResponse leaveResponse, RequestFuture<Void> future)
         }
     // visible for testing
-    synchronized RequestFuture<Void> sendHeartbeatRequest() {
+    public synchronized RequestFuture<Void> sendHeartbeatRequest() {

Review Comment:
   I would respect the "visible for testing" comment above and leave this package-private.

##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java:
##########
@@ -267,6 +268,20 @@ public String memberId() {
         return JoinGroupRequest.UNKNOWN_MEMBER_ID;
     }
+    @Override
+    protected void handlePollTimeoutExpiry() {
+        Stage currentStage = listener.onPollTimeoutExpiry();
+        log.warn("worker poll timeout has expired. This means the time between subsequent calls to poll() " + "in DistributedHerder tick() method was longer than the configured rebalance.timeout.ms. " +

Review Comment:
   From a user perspective, the class and method names are irrelevant, and bringing up irrelevant details in diagnostics can be misleading. A message such as "The last thing the worker was doing was: {} and may contribute to this timeout" is much more understandable and still gets the point across.

##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -2704,6 +2704,11 @@ public void onRevoked(String leader, Collection<String> connectors, Collection<C
             }
         }
+    @Override
+    public Stage onPollTimeoutExpiry() {

Review Comment:
   This method name does not make sense. The name suggests that this method is used to inform the callee (the listener, i.e. this implementation) that the poll timeout happened, but the actual behavior and signature are those of a getter for the current stage. I don't think it makes sense for the listener to require the caller to keep a stage, so I think the implementation should change to follow the method name.
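A minimal sketch of the shape the last two comments point toward, assuming the listener is simply notified (a `void` callback) and keeps track of its own current activity, then logs it in user-facing terms. `PollTimeoutSketch`, `RebalanceListenerSketch`, `HerderSketch`, `recordStage`, and the use of a plain `String` for the stage are hypothetical stand-ins, not Kafka's actual `WorkerRebalanceListener`, `DistributedHerder`, or `Stage` types; the message text follows the wording suggested for the `WorkerCoordinator` log line.

```java
import java.util.ArrayList;
import java.util.List;

public class PollTimeoutSketch {

    // Hypothetical stand-in for WorkerRebalanceListener: the callback only
    // informs the listener that the poll timeout expired and returns nothing.
    interface RebalanceListenerSketch {
        void onPollTimeoutExpiry();
    }

    // Hypothetical stand-in for the herder. It tracks what it is doing itself,
    // so the coordinator never has to ask for (or hold on to) a stage.
    static class HerderSketch implements RebalanceListenerSketch {
        // volatile: assumed to be set by one thread and read by the thread
        // that detects the poll timeout. null while idle.
        private volatile String currentStage;
        final List<String> warnings = new ArrayList<>(); // stands in for log.warn(...)

        void recordStage(String description) {
            currentStage = description;
        }

        @Override
        public void onPollTimeoutExpiry() {
            String stage = currentStage;
            if (stage != null) {
                // User-facing wording suggested in the review: no class or method names.
                warnings.add("The last thing the worker was doing was: " + stage
                        + " and may contribute to this timeout");
            }
        }
    }

    public static void main(String[] args) {
        HerderSketch herder = new HerderSketch();
        herder.recordStage("stopping task my-connector-0");
        // The coordinator would call this once it detects the poll timeout expiry.
        herder.onPollTimeoutExpiry();
        System.out.println(herder.warnings.get(0));
    }
}
```

With this split, the method name describes what actually happens (the listener is told about the timeout), and the coordinator no longer needs a return value from the listener.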
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerRebalanceListener.java:
##########
@@ -36,4 +37,15 @@ public interface WorkerRebalanceListener {
      * or tasks might refer to all or some of the connectors and tasks running on the worker.
      */
     void onRevoked(String leader, Collection<String> connectors, Collection<ConnectorTaskId> tasks);
+
+
+    /**
+     * Invoked when a worker experiences a poll timeout expiry. Invoking this method allows getting
+     * the stage which was currently being executed when the poll timeout happened. The default implementation
+     * returns null
+     * @return The current stage being executed. Could be null
+     */
+    default Stage onPollTimeoutExpiry() {

Review Comment:
   This is an internal interface; unless this default method actually makes sense on its own, I wouldn't add it.
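To illustrate the point about the internal interface: declaring the callback as an abstract method, rather than a `default` that returns `null`, makes the compiler flag every implementation that has not handled it, including test doubles. This is a sketch under simplified, hypothetical signatures (`ListenerContractSketch`, `ListenerSketch`, and `CountingListener` are illustrative names, and `String`/`Collection<String>` replace Kafka's types); it is not the actual `WorkerRebalanceListener`.

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class ListenerContractSketch {

    // Hypothetical, simplified version of the internal listener interface.
    // No default body: every implementor must decide what the callback does.
    interface ListenerSketch {
        void onRevoked(String leader, Collection<String> connectors);

        void onPollTimeoutExpiry();
    }

    // A test double now has to implement the callback explicitly; this one counts calls.
    static class CountingListener implements ListenerSketch {
        final AtomicInteger revocations = new AtomicInteger();
        final AtomicInteger pollTimeouts = new AtomicInteger();

        @Override
        public void onRevoked(String leader, Collection<String> connectors) {
            revocations.incrementAndGet();
        }

        @Override
        public void onPollTimeoutExpiry() {
            pollTimeouts.incrementAndGet();
        }
    }

    public static void main(String[] args) {
        CountingListener listener = new CountingListener();
        listener.onRevoked("leader-1", List.of("connector-a"));
        listener.onPollTimeoutExpiry();
        System.out.println(listener.revocations.get() + " " + listener.pollTimeouts.get());
    }
}
```

With a `default` body returning `null`, an implementation that never handles the callback would still compile. For an internal interface with a small, known set of implementors, making the method abstract is usually cheap.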