C0urante commented on a change in pull request #8069:
URL: https://github.com/apache/kafka/pull/8069#discussion_r420963881



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
##########
@@ -191,32 +192,61 @@ public synchronized void putConnectorConfig(String 
connName,
                                                 boolean allowReplace,
                                                 final 
Callback<Created<ConnectorInfo>> callback) {
         try {
-            if (maybeAddConfigErrors(validateConnectorConfig(config), 
callback)) {
+            validateConnectorConfig(config, (error, configInfos) -> {
+                if (error != null) {
+                    callback.onCompletion(error, null);
+                    return;
+                }
+
+                requestExecutorService.submit(
+                    () -> putConnectorConfig(connName, config, allowReplace, 
callback, configInfos)
+                );
+            });
+        } catch (Throwable t) {
+            callback.onCompletion(t, null);
+        }
+    }
+
+    private synchronized void putConnectorConfig(String connName,
+                                                 final Map<String, String> 
config,
+                                                 boolean allowReplace,
+                                                 final 
Callback<Created<ConnectorInfo>> callback,
+                                                 ConfigInfos configInfos) {
+        try {
+            if (maybeAddConfigErrors(configInfos, callback)) {
                 return;
             }
 
-            boolean created = false;
+            final boolean created;
             if (configState.contains(connName)) {
                 if (!allowReplace) {
                     callback.onCompletion(new 
AlreadyExistsException("Connector " + connName + " already exists"), null);
                     return;
                 }
-                worker.stopConnector(connName);
+                worker.stopAndAwaitConnector(connName);
+                created = false;
             } else {
                 created = true;
             }
 
             configBackingStore.putConnectorConfig(connName, config);
 
-            if (!startConnector(connName)) {
-                callback.onCompletion(new ConnectException("Failed to start 
connector: " + connName), null);
-                return;
-            }
+            // startConnector(connName, onStart);
+            startConnector(connName, (error, result) -> {
+                if (error != null) {
+                    callback.onCompletion(error, null);
+                    return;
+                }
 
-            updateConnectorTasks(connName);
-            callback.onCompletion(null, new Created<>(created, 
createConnectorInfo(connName)));
-        } catch (ConnectException e) {
-            callback.onCompletion(e, null);
+                requestExecutorService.submit(() -> {
+                    synchronized (this) {

Review comment:
       Ah yeah, done. I could have sworn I was getting SpotBugs complaints at one
point when I tried that, but it seems to work now.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to