gharris1727 commented on code in PR #15305:
URL: https://github.com/apache/kafka/pull/15305#discussion_r1596961414


##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##########
@@ -850,6 +855,46 @@ public void testRequestTimeouts() throws Exception {
         );
     }
 
+    @Test
+    public void testPollTimeoutExpiry() throws Exception {
+        // This is a fabricated test to ensure that a poll timeout expiry happens. The tick thread awaits on
+        // task#stop method which is blocked. The timeouts have been set accordingly
+        workerProps.put(REBALANCE_TIMEOUT_MS_CONFIG, Long.toString(TimeUnit.SECONDS.toMillis(20)));
+        workerProps.put(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Long.toString(TimeUnit.SECONDS.toMillis(40)));
+        connect = connectBuilder
+            .numBrokers(1)
+            .numWorkers(1)
+            .build();
+
+        connect.start();
+
+        connect.assertions().assertExactlyNumWorkersAreUp(1, "Worker not brought up in time");
+
+        Map<String, String> connectorWithBlockingTaskStopConfig = new HashMap<>();
+        connectorWithBlockingTaskStopConfig.put(CONNECTOR_CLASS_CONFIG, BlockingConnectorTest.BlockingSourceConnector.class.getName());
+        connectorWithBlockingTaskStopConfig.put(TASKS_MAX_CONFIG, "1");
+        connectorWithBlockingTaskStopConfig.put(BlockingConnectorTest.Block.BLOCK_CONFIG, Objects.requireNonNull(TASK_STOP));
+
+        connect.configureConnector(CONNECTOR_NAME, connectorWithBlockingTaskStopConfig);
+
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+            CONNECTOR_NAME, 1, "connector and tasks did not start in time"
+        );
+
+        try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(WorkerCoordinator.class)) {
+            connect.restartTask(CONNECTOR_NAME, 0);
+            TestUtils.waitForCondition(() -> logCaptureAppender.getEvents().stream().anyMatch(e -> e.getLevel().equals("WARN")) &&

Review Comment:
   Could this assertion be added to an existing BlockingConnectorTest? The 
blocking plugins are inherently slow to use so we should avoid adding more 
instances of them.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -1205,7 +1215,7 @@ public void handle(LeaveGroupResponse leaveResponse, RequestFuture<Void> future)
     }
 
     // visible for testing
-    synchronized RequestFuture<Void> sendHeartbeatRequest() {
+    public synchronized RequestFuture<Void> sendHeartbeatRequest() {

Review Comment:
   I would respect the "visible for testing" comment above, and leave this 
package-local.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java:
##########
@@ -267,6 +268,20 @@ public String memberId() {
         return JoinGroupRequest.UNKNOWN_MEMBER_ID;
     }
 
+    @Override
+    protected void handlePollTimeoutExpiry() {
+        Stage currentStage = listener.onPollTimeoutExpiry();
+        log.warn("worker poll timeout has expired. This means the time between subsequent calls to poll() " +
+            "in DistributedHerder tick() method was longer than the configured rebalance.timeout.ms. " +

Review Comment:
   From a user perspective, the class and method names are irrelevant, and 
bringing up irrelevant details in diagnostics can be misleading. "The last 
thing the worker was doing was: {} and may contribute to this timeout" is much 
more understandable and still gets the point across.
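
   A minimal sketch of what such a message could look like follows. `PollTimeoutMessage` and its `format` method are hypothetical names used only for illustration and are not part of Kafka; the wording comes from the suggestion above, and the stage description is assumed to be a plain string that may be absent.

```java
// Hypothetical helper, not part of Kafka: builds a user-facing warning that
// avoids class and method names and only describes what the worker was doing.
final class PollTimeoutMessage {
    private PollTimeoutMessage() { }

    // lastStageDescription is an assumed free-form description; it may be null or empty.
    static String format(String lastStageDescription) {
        String base = "Worker poll timeout has expired. The time between subsequent calls to poll() "
            + "was longer than the configured rebalance.timeout.ms.";
        if (lastStageDescription == null || lastStageDescription.isEmpty()) {
            // Nothing useful to report about the stage, so keep the message short.
            return base;
        }
        return base + " The last thing the worker was doing was: " + lastStageDescription
            + " and may contribute to this timeout.";
    }
}
```

   With a real logger, the same text would normally be passed as a format string with a `{}` placeholder rather than concatenated.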



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -2704,6 +2704,11 @@ public void onRevoked(String leader, Collection<String> connectors, Collection<C
             }
         }
 
+        @Override
+        public Stage onPollTimeoutExpiry() {

Review Comment:
   This method name does not make sense. The name would indicate that this 
method is used to inform the callee (the listener, this implementation) that 
the poll timeout happened. The actual behavior and signature are those of a 
getter for the current stage.
   
   I don't think it makes sense for the listener to require the caller to keep 
a stage, so I think the implementation should change to follow the method name.
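
   One possible reading of this suggestion, sketched with hypothetical stand-in types (`PollTimeoutListener`, `StageTrackingListener`, and `SketchCoordinator` are illustrative names, not the Kafka classes): the callback returns nothing, and the listener, which already knows its own stage, decides what to report.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for the listener interface.
interface PollTimeoutListener {
    // A notification: the caller reports the event and expects nothing back.
    void onPollTimeoutExpiry();
}

// Hypothetical stand-in for the herder side, which tracks its own stage.
final class StageTrackingListener implements PollTimeoutListener {
    private volatile String currentStage; // null when no stage is in progress
    // Warnings are collected in a list here instead of going to a real logger.
    private final List<String> warnings = new CopyOnWriteArrayList<>();

    void recordStage(String description) {
        currentStage = description;
    }

    @Override
    public void onPollTimeoutExpiry() {
        String stage = currentStage;
        if (stage == null) {
            warnings.add("Worker poll timeout has expired.");
        } else {
            warnings.add("Worker poll timeout has expired. The last thing the worker was doing was: "
                + stage + " and may contribute to this timeout.");
        }
    }

    List<String> warnings() {
        return warnings;
    }
}

// Hypothetical stand-in for the coordinator side: it only forwards the event.
final class SketchCoordinator {
    private final PollTimeoutListener listener;

    SketchCoordinator(PollTimeoutListener listener) {
        this.listener = listener;
    }

    void handlePollTimeoutExpiry() {
        listener.onPollTimeoutExpiry();
    }
}
```

   In this shape the coordinator never holds a stage object, and the method name matches what the method does.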



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerRebalanceListener.java:
##########
@@ -36,4 +37,15 @@ public interface WorkerRebalanceListener {
      * or tasks might refer to all or some of the connectors and tasks running on the worker.
      */
     void onRevoked(String leader, Collection<String> connectors, Collection<ConnectorTaskId> tasks);
+
+
+    /**
+     * Invoked when a worker experiences a poll timeout expiry. Invoking this method allows getting
+     * the stage which was currently being executed when the poll timeout happened. The default implementation
+     * returns null
+     * @return The current stage being executed. Could be null
+     */
+    default Stage onPollTimeoutExpiry() {

Review Comment:
   This is an internal interface; unless this default method actually makes 
sense on its own, I wouldn't add it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org