gharris1727 commented on a change in pull request #8069: URL: https://github.com/apache/kafka/pull/8069#discussion_r421079889
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java ########## @@ -239,9 +236,10 @@ private synchronized void putConnectorConfig(String connName, } requestExecutorService.submit(() -> { - synchronized (this) { - updateConnectorTasks(connName); - } + updateConnectorTasks(connName); + // synchronized (this) { + // updateConnectorTasks(connName); + // } Review comment: nit: leftover comments ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ########## @@ -536,20 +561,37 @@ private void processConnectorConfigUpdates(Set<String> connectorConfigUpdates) { // If we only have connector config updates, we can just bounce the updated connectors that are // currently assigned to this worker. Set<String> localConnectors = assignment == null ? Collections.<String>emptySet() : new HashSet<>(assignment.connectors()); + log.trace( + "Processing connector config updates; " + + "currently-owned connectors are {}, and to-be-updated connectors are {}", + localConnectors, + connectorConfigUpdates + ); for (String connectorName : connectorConfigUpdates) { - if (!localConnectors.contains(connectorName)) + if (!localConnectors.contains(connectorName)) { + log.trace( + "Skipping config update for connector {} as it is not owned by this worker", + connectorName + ); continue; + } boolean remains = configState.contains(connectorName); log.info("Handling connector-only config update by {} connector {}", remains ? 
"restarting" : "stopping", connectorName); - worker.stopConnector(connectorName); + worker.stopAndAwaitConnector(connectorName); // The update may be a deletion, so verify we actually need to restart the connector if (remains) - startConnector(connectorName); + startConnector(connectorName, (error, result) -> { }); Review comment: The caller of this function doesn't need to block on starting the connector, but I think it should log errors inside of the callback. Otherwise they're going to get swallowed. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ########## @@ -1182,31 +1245,49 @@ public Void call() throws Exception { // Helper for starting a connector with the given name, which will extract & parse the config, generate connector // context and add to the worker. This needs to be called from within the main worker thread for this herder. - private boolean startConnector(String connectorName) { + // The callback is invoked after the connector has finished startup and generated task configs, or failed in the process. + private void startConnector(String connectorName, Callback<Void> callback) { log.info("Starting connector {}", connectorName); final Map<String, String> configProps = configState.connectorConfig(connectorName); - final ConnectorContext ctx = new HerderConnectorContext(this, connectorName); + final CloseableConnectorContext ctx = new HerderConnectorContext(this, connectorName); final TargetState initialState = configState.targetState(connectorName); - boolean started = worker.startConnector(connectorName, configProps, ctx, this, initialState); - - // Immediately request configuration since this could be a brand new connector. However, also only update those - // task configs if they are actually different from the existing ones to avoid unnecessary updates when this is - // just restoring an existing connector. 
- if (started && initialState == TargetState.STARTED) - reconfigureConnectorTasksWithRetry(time.milliseconds(), connectorName); + final Callback<TargetState> onInitialStateChange = (error, newState) -> { + if (error != null) { + callback.onCompletion(new ConnectException("Failed to start connector: " + connectorName), null); + return; + } - return started; + // Use newState here in case the connector has been paused right after being created + if (newState == TargetState.STARTED) { + addRequest( + new Callable<Void>() { + @Override + public Void call() { + // Request configuration since this could be a brand new connector. However, also only update those + // task configs if they are actually different from the existing ones to avoid unnecessary updates when this is + // just restoring an existing connector. + reconfigureConnectorTasksWithRetry(time.milliseconds(), connectorName); + callback.onCompletion(null, null); + return null; + } + }, + forwardErrorCallback(callback) + ); + } else { + callback.onCompletion(null, null); + } + }; + worker.startConnector(connectorName, configProps, ctx, this, initialState, onInitialStateChange); } private Callable<Void> getConnectorStartingCallable(final String connectorName) { return new Callable<Void>() { @Override public Void call() throws Exception { try { - startConnector(connectorName); + startConnector(connectorName, (error, result) -> { }); Review comment: This needs a log statement to avoid swallowing the exception silently. 
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java ########## @@ -412,9 +407,10 @@ public void onConnectorTargetStateChange(String connector) { if (newState == TargetState.STARTED) { requestExecutorService.submit(() -> { - synchronized (StandaloneHerder.this) { - updateConnectorTasks(connector); - } + updateConnectorTasks(connector); + // synchronized (StandaloneHerder.this) { + // updateConnectorTasks(connector); + // } Review comment: nit: leftover comments ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java ########## @@ -82,6 +82,9 @@ // we need to consider all possible scenarios this could fail. It might be ok to fail with a timeout in rare cases, // but currently a worker simply leaving the group can take this long as well. public static final long REQUEST_TIMEOUT_MS = 90 * 1000; + // Mutable for integration testing; otherwise, some tests would take at least REQUEST_TIMEOUT_MS Review comment: Yeah, I don't think that it's worth changing everything just to mock time, especially if it requires us to change the functionality. SGTM. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org