C0urante commented on a change in pull request #8069: URL: https://github.com/apache/kafka/pull/8069#discussion_r420963881
##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
##########
@@ -191,32 +192,61 @@ public synchronized void putConnectorConfig(String connName,
                                                 boolean allowReplace,
                                                 final Callback<Created<ConnectorInfo>> callback) {
         try {
-            if (maybeAddConfigErrors(validateConnectorConfig(config), callback)) {
+            validateConnectorConfig(config, (error, configInfos) -> {
+                if (error != null) {
+                    callback.onCompletion(error, null);
+                    return;
+                }
+
+                requestExecutorService.submit(
+                    () -> putConnectorConfig(connName, config, allowReplace, callback, configInfos)
+                );
+            });
+        } catch (Throwable t) {
+            callback.onCompletion(t, null);
+        }
+    }
+
+    private synchronized void putConnectorConfig(String connName,
+                                                 final Map<String, String> config,
+                                                 boolean allowReplace,
+                                                 final Callback<Created<ConnectorInfo>> callback,
+                                                 ConfigInfos configInfos) {
+        try {
+            if (maybeAddConfigErrors(configInfos, callback)) {
                 return;
             }

-            boolean created = false;
+            final boolean created;
             if (configState.contains(connName)) {
                 if (!allowReplace) {
                     callback.onCompletion(new AlreadyExistsException("Connector " + connName + " already exists"), null);
                     return;
                 }
-                worker.stopConnector(connName);
+                worker.stopAndAwaitConnector(connName);
+                created = false;
             } else {
                 created = true;
             }

             configBackingStore.putConnectorConfig(connName, config);

-            if (!startConnector(connName)) {
-                callback.onCompletion(new ConnectException("Failed to start connector: " + connName), null);
-                return;
-            }
+            // startConnector(connName, onStart);
+            startConnector(connName, (error, result) -> {
+                if (error != null) {
+                    callback.onCompletion(error, null);
+                    return;
+                }

-            updateConnectorTasks(connName);
-            callback.onCompletion(null, new Created<>(created, createConnectorInfo(connName)));
-        } catch (ConnectException e) {
-            callback.onCompletion(e, null);
+                requestExecutorService.submit(() -> {
+                    synchronized (this) {

Review comment:
Ah yeah, done.
I could have sworn I was getting SpotBugs complaints at one point when I tried that, but it seems to work now.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org