kkonstantine commented on a change in pull request #8069:
URL: https://github.com/apache/kafka/pull/8069#discussion_r417687040



##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
##########
@@ -191,32 +192,71 @@ public synchronized void putConnectorConfig(String 
connName,
                                                 boolean allowReplace,
                                                 final 
Callback<Created<ConnectorInfo>> callback) {
         try {
-            if (maybeAddConfigErrors(validateConnectorConfig(config), 
callback)) {
+            validateConnectorConfig(config, new Callback<ConfigInfos>() {
+                @Override
+                public void onCompletion(Throwable error, ConfigInfos 
configInfos) {
+                    if (error != null) {
+                        callback.onCompletion(error, null);
+                        return;
+                    }
+
+                    requestExecutorService.submit(
+                        () -> putConnectorConfig(connName, config, 
allowReplace, callback, configInfos)
+                    );
+                }
+            });
+        } catch (Throwable t) {
+            callback.onCompletion(t, null);
+        }
+    }
+
+    private synchronized void putConnectorConfig(String connName,
+                                                 final Map<String, String> 
config,
+                                                 boolean allowReplace,
+                                                 final 
Callback<Created<ConnectorInfo>> callback,
+                                                 ConfigInfos configInfos) {
+        try {
+            if (maybeAddConfigErrors(configInfos, callback)) {
                 return;
             }
 
-            boolean created = false;
+            final boolean created;
             if (configState.contains(connName)) {
                 if (!allowReplace) {
                     callback.onCompletion(new 
AlreadyExistsException("Connector " + connName + " already exists"), null);
                     return;
                 }
-                worker.stopConnector(connName);
+                worker.stopAndAwaitConnector(connName);
+                created = false;
             } else {
                 created = true;
             }
 
             configBackingStore.putConnectorConfig(connName, config);
 
-            if (!startConnector(connName)) {
-                callback.onCompletion(new ConnectException("Failed to start 
connector: " + connName), null);
-                return;
-            }
+            Callback<TargetState> onStart = new Callback<TargetState>() {
+                @Override
+                public void onCompletion(Throwable error, TargetState result) {
+                    if (error != null) {
+                        callback.onCompletion(error, null);
+                        return;
+                    }
+
+                    requestExecutorService.submit(new Runnable() {

Review comment:
       I don't think this is worth backporting to releases before AK 2.0, and 
maybe not even that far back. Given that, I'd suggest using lambda notation 
whenever a new Runnable is needed, to avoid the Runnable declaration boilerplate.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to