C0urante commented on a change in pull request #8069: URL: https://github.com/apache/kafka/pull/8069#discussion_r421091305
##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##########
@@ -536,20 +561,37 @@ private void processConnectorConfigUpdates(Set<String> connectorConfigUpdates) {
         // If we only have connector config updates, we can just bounce the updated connectors that are
         // currently assigned to this worker.
         Set<String> localConnectors = assignment == null ? Collections.<String>emptySet() : new HashSet<>(assignment.connectors());
+        log.trace(
+            "Processing connector config updates; "
+                + "currently-owned connectors are {}, and to-be-updated connectors are {}",
+            localConnectors,
+            connectorConfigUpdates
+        );
         for (String connectorName : connectorConfigUpdates) {
-            if (!localConnectors.contains(connectorName))
+            if (!localConnectors.contains(connectorName)) {
+                log.trace(
+                    "Skipping config update for connector {} as it is not owned by this worker",
+                    connectorName
+                );
                 continue;
+            }
             boolean remains = configState.contains(connectorName);
             log.info("Handling connector-only config update by {} connector {}", remains ? "restarting" : "stopping", connectorName);
-            worker.stopConnector(connectorName);
+            worker.stopAndAwaitConnector(connectorName);
             // The update may be a deletion, so verify we actually need to restart the connector
             if (remains)
-                startConnector(connectorName);
+                startConnector(connectorName, (error, result) -> { });

Review comment:
Agh, you're right. Thought I'd committed that but apparently not. Will add.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org