C0urante commented on code in PR #16001: URL: https://github.com/apache/kafka/pull/16001#discussion_r1612176138
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java: ########## @@ -1039,21 +1039,42 @@ public static List<Map<String, String>> reverseTransform(String connName, return result; } - public boolean taskConfigsChanged(ClusterConfigState configState, String connName, List<Map<String, String>> taskProps) { + public boolean taskConfigsChanged( + ClusterConfigState configState, + String connName, + List<Map<String, String>> taskProps, + int connectorConfigHash + ) { int currentNumTasks = configState.taskCount(connName); boolean result = false; if (taskProps.size() != currentNumTasks) { log.debug("Connector {} task count changed from {} to {}", connName, currentNumTasks, taskProps.size()); result = true; } else { - for (int index = 0; index < currentNumTasks; index++) { + for (int index = 0; index < currentNumTasks && !result; index++) { ConnectorTaskId taskId = new ConnectorTaskId(connName, index); if (!taskProps.get(index).equals(configState.taskConfig(taskId))) { log.debug("Connector {} has change in configuration for task {}-{}", connName, connName, index); result = true; } } + // Do a final check to see if runtime-controlled properties that affect tasks but may + // not be included in the connector-generated configs for them (such as converter overrides) + // have changed + if (!result) { + Integer storedConnectorConfigHash = configState.taskConfigHash(connName); + if (storedConnectorConfigHash == null) { + log.debug("Connector {} has no config hash stored for its existing tasks", connName); Review Comment: I don't love the idea of warn logs here. They'll be emitted unconditionally during upgrade even on completely unaffected clusters, and if users follow instructions to reconfigure their connectors, it'll have the same disruptive effect of forcing task restarts and extra churn. Honestly though, I suspect what's more likely is that people will ignore them. I think the underlying problem we're running into is that tightly coupling the storage of config hashes with the storage of task configs is forcing us into an uncomfortable spot where we can't publish a new config hash without forcing task restarts. I've published a new PR with a different, hopefully simpler approach [here](https://github.com/apache/kafka/pull/16053). Let me know what you think! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org