gharris1727 commented on a change in pull request #8069:
URL: https://github.com/apache/kafka/pull/8069#discussion_r421079889



##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
##########
@@ -239,9 +236,10 @@ private synchronized void putConnectorConfig(String 
connName,
                 }
 
                 requestExecutorService.submit(() -> {
-                    synchronized (this) {
-                        updateConnectorTasks(connName);
-                    }
+                    updateConnectorTasks(connName);
+                    // synchronized (this) {
+                    //     updateConnectorTasks(connName);
+                    // }

Review comment:
       nit: leftover comments

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -536,20 +561,37 @@ private void processConnectorConfigUpdates(Set<String> 
connectorConfigUpdates) {
         // If we only have connector config updates, we can just bounce the 
updated connectors that are
         // currently assigned to this worker.
         Set<String> localConnectors = assignment == null ? 
Collections.<String>emptySet() : new HashSet<>(assignment.connectors());
+        log.trace(
+            "Processing connector config updates; "
+                + "currently-owned connectors are {}, and to-be-updated 
connectors are {}",
+            localConnectors,
+            connectorConfigUpdates
+        );
         for (String connectorName : connectorConfigUpdates) {
-            if (!localConnectors.contains(connectorName))
+            if (!localConnectors.contains(connectorName)) {
+                log.trace(
+                    "Skipping config update for connector {} as it is not 
owned by this worker",
+                    connectorName
+                );
                 continue;
+            }
             boolean remains = configState.contains(connectorName);
             log.info("Handling connector-only config update by {} connector 
{}",
                     remains ? "restarting" : "stopping", connectorName);
-            worker.stopConnector(connectorName);
+            worker.stopAndAwaitConnector(connectorName);
             // The update may be a deletion, so verify we actually need to 
restart the connector
             if (remains)
-                startConnector(connectorName);
+                startConnector(connectorName, (error, result) -> { });

Review comment:
       The caller of this function doesn't need to block on starting the 
connector, but I think it should log errors inside of the callback. Otherwise 
they're going to get swallowed.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -1182,31 +1245,49 @@ public Void call() throws Exception {
 
     // Helper for starting a connector with the given name, which will extract 
& parse the config, generate connector
     // context and add to the worker. This needs to be called from within the 
main worker thread for this herder.
-    private boolean startConnector(String connectorName) {
+    // The callback is invoked after the connector has finished startup and 
generated task configs, or failed in the process.
+    private void startConnector(String connectorName, Callback<Void> callback) 
{
         log.info("Starting connector {}", connectorName);
         final Map<String, String> configProps = 
configState.connectorConfig(connectorName);
-        final ConnectorContext ctx = new HerderConnectorContext(this, 
connectorName);
+        final CloseableConnectorContext ctx = new HerderConnectorContext(this, 
connectorName);
         final TargetState initialState = 
configState.targetState(connectorName);
-        boolean started = worker.startConnector(connectorName, configProps, 
ctx, this, initialState);
-
-        // Immediately request configuration since this could be a brand new 
connector. However, also only update those
-        // task configs if they are actually different from the existing ones 
to avoid unnecessary updates when this is
-        // just restoring an existing connector.
-        if (started && initialState == TargetState.STARTED)
-            reconfigureConnectorTasksWithRetry(time.milliseconds(), 
connectorName);
+        final Callback<TargetState> onInitialStateChange = (error, newState) 
-> {
+            if (error != null) {
+                callback.onCompletion(new ConnectException("Failed to start 
connector: " + connectorName), null);
+                return;
+            }
 
-        return started;
+            // Use newState here in case the connector has been paused right 
after being created
+            if (newState == TargetState.STARTED) {
+                addRequest(
+                    new Callable<Void>() {
+                        @Override
+                        public Void call() {
+                            // Request configuration since this could be a 
brand new connector. However, also only update those
+                            // task configs if they are actually different 
from the existing ones to avoid unnecessary updates when this is
+                            // just restoring an existing connector.
+                            
reconfigureConnectorTasksWithRetry(time.milliseconds(), connectorName);
+                            callback.onCompletion(null, null);
+                            return null;
+                        }
+                    },
+                    forwardErrorCallback(callback)
+                );
+            } else {
+                callback.onCompletion(null, null);
+            }
+        };
+        worker.startConnector(connectorName, configProps, ctx, this, 
initialState, onInitialStateChange);
     }
 
     private Callable<Void> getConnectorStartingCallable(final String 
connectorName) {
         return new Callable<Void>() {
             @Override
             public Void call() throws Exception {
                 try {
-                    startConnector(connectorName);
+                    startConnector(connectorName, (error, result) -> { });

Review comment:
       This needs a log statement to avoid swallowing the exception silently.

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
##########
@@ -412,9 +407,10 @@ public void onConnectorTargetStateChange(String connector) 
{
 
                     if (newState == TargetState.STARTED) {
                         requestExecutorService.submit(() -> {
-                            synchronized (StandaloneHerder.this) {
-                                updateConnectorTasks(connector);
-                            }
+                            updateConnectorTasks(connector);
+                            // synchronized (StandaloneHerder.this) {
+                            //     updateConnectorTasks(connector);
+                            // }

Review comment:
       nit: leftover comments

##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
##########
@@ -82,6 +82,9 @@
     // we need to consider all possible scenarios this could fail. It might be 
ok to fail with a timeout in rare cases,
     // but currently a worker simply leaving the group can take this long as 
well.
     public static final long REQUEST_TIMEOUT_MS = 90 * 1000;
+    // Mutable for integration testing; otherwise, some tests would take at 
least REQUEST_TIMEOUT_MS

Review comment:
       Yeah, I don't think that it's worth changing everything just to mock 
time, especially if it requires us to change the functionality.
   SGTM.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to