gharris1727 commented on code in PR #14562: URL: https://github.com/apache/kafka/pull/14562#discussion_r1418003736
##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##########
@@ -771,6 +778,104 @@ private Map<String, String> defaultSinkConnectorProps(String topics) {
         return props;
     }
 
+    @Test
+    public void testRequestTimeouts() throws Exception {
+        final String configTopic = "test-request-timeout-configs";
+        workerProps.put(CONFIG_TOPIC_CONFIG, configTopic);
+        // Workaround for KAFKA-15676, which can cause the scheduled rebalance delay to
+        // be spuriously triggered after the group coordinator for a Connect cluster is bounced
+        // Set to 1 instead of 0 as another workaround for KAFKA-15693, which can cause
+        // connectors and tasks to be unassigned indefinitely if the scheduled rebalance delay
+        // is set to 0
+        workerProps.put(SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, "1");
+        connect = connectBuilder
+                .numBrokers(1)
+                .numWorkers(1)
+                .build();
+        connect.start();
+        connect.assertions().assertAtLeastNumWorkersAreUp(1,
+                "Worker did not start in time");
+
+        Map<String, String> connectorConfig1 = defaultSourceConnectorProps(TOPIC_NAME);
+        Map<String, String> connectorConfig2 = new HashMap<>(connectorConfig1);
+        connectorConfig2.put(TASKS_MAX_CONFIG, Integer.toString(NUM_TASKS + 1));
+
+        // Create a connector to ensure that the worker has completed startup
+        log.info("Creating initial connector");
+        connect.configureConnector(CONNECTOR_NAME, connectorConfig1);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME, NUM_TASKS, "connector and tasks did not start in time"
+        );
+
+        // Bring down Kafka, which should cause some REST requests to fail
+        log.info("Stopping Kafka cluster");
+        connect.kafka().stopOnlyKafka();
+        // Allow for the workers to discover that the coordinator is unavailable, wait is
+        // heartbeat timeout * 2 + 4sec
+        Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+
+        connect.requestTimeout(5_000);
+        // Try to reconfigure the connector, which should fail with a timeout error
+        log.info("Trying to reconfigure connector while Kafka cluster is down");
+        ConnectRestException e = assertThrows(
+                ConnectRestException.class,
+                () -> connect.configureConnector(CONNECTOR_NAME, connectorConfig2)
+        );
+        assertEquals(INTERNAL_SERVER_ERROR.getStatusCode(), e.statusCode());
+        assertNotNull(e.getMessage());
+        assertTrue(
+                "Message '" + e.getMessage() + "' does not match expected format",
+                e.getMessage().contains("Request timed out. The worker is currently flushing updates to the status topic")

Review Comment:
Ah, I see. So the `polling the group coordinator for up to ... or until interrupted` stage is only temporary and `flushing updates to the status topic` is permanent, so the worker will always eventually get stuck on that stage.

Do you think it's possible for the preceding `Thread.sleep()` to cause a flake here, if the worker stays in the "polling the group coordinator" stage for too long? Perhaps we could replace the sleep with a wait-until-condition that repeatedly makes the request until the "flushing updates to the status topic" error appears.
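
To make the wait-until-condition suggestion concrete, here is a minimal, self-contained sketch of the polling pattern. It is not the PR's code: the class `WaitForStageSketch`, the helper `waitForMessage`, and the simulated request in `main` are all hypothetical stand-ins. In the real test, the supplier would presumably wrap the `assertThrows(ConnectRestException.class, ...)` call and return `e.getMessage()`, and Kafka's own `TestUtils.waitForCondition` could likely replace the hand-rolled loop.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class WaitForStageSketch {

    // Substring taken from the assertion in the diff above.
    static final String FLUSHING =
            "The worker is currently flushing updates to the status topic";

    /**
     * Hypothetical helper: calls the request repeatedly until the returned
     * message contains the expected text, or fails once the timeout elapses.
     */
    static String waitForMessage(Supplier<String> request, String expected,
                                 long timeoutMs, long pollMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            String last = request.get();
            if (last != null && last.contains(expected)) {
                return last;
            }
            if (System.nanoTime() - deadline >= 0) {
                throw new AssertionError(
                        "Timed out waiting for '" + expected + "'; last message: " + last);
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", ie);
            }
        }
    }

    public static void main(String[] args) {
        // Simulated request: reports the temporary stage twice, then the
        // permanent one. A real test would issue the REST request instead.
        AtomicInteger calls = new AtomicInteger();
        Supplier<String> fakeRequest = () -> calls.incrementAndGet() < 3
                ? "Request timed out. The worker is currently polling the group coordinator"
                : "Request timed out. " + FLUSHING;

        String message = waitForMessage(fakeRequest, FLUSHING, 5_000, 10);
        System.out.println("Matched after " + calls.get() + " attempts: " + message);
    }
}
```

Compared with a fixed `Thread.sleep()`, this approach tolerates a worker that lingers in the temporary stage, at the cost of issuing several failing requests before the assertion runs.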