C0urante commented on code in PR #16001:
URL: https://github.com/apache/kafka/pull/16001#discussion_r1612176138


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -1039,21 +1039,42 @@ public static List<Map<String, String>> reverseTransform(String connName,
         return result;
     }
 
-    public boolean taskConfigsChanged(ClusterConfigState configState, String connName, List<Map<String, String>> taskProps) {
+    public boolean taskConfigsChanged(
+            ClusterConfigState configState,
+            String connName,
+            List<Map<String, String>> taskProps,
+            int connectorConfigHash
+    ) {
         int currentNumTasks = configState.taskCount(connName);
         boolean result = false;
         if (taskProps.size() != currentNumTasks) {
             log.debug("Connector {} task count changed from {} to {}", connName, currentNumTasks, taskProps.size());
             result = true;
         } else {
-            for (int index = 0; index < currentNumTasks; index++) {
+            for (int index = 0; index < currentNumTasks && !result; index++) {
                 ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
                 if (!taskProps.get(index).equals(configState.taskConfig(taskId))) {
                     log.debug("Connector {} has change in configuration for task {}-{}", connName, connName, index);
                     result = true;
                 }
             }
+            // Do a final check to see if runtime-controlled properties that affect tasks but may
+            // not be included in the connector-generated configs for them (such as converter overrides)
+            // have changed
+            if (!result) {
+                Integer storedConnectorConfigHash = configState.taskConfigHash(connName);
+                if (storedConnectorConfigHash == null) {
+                    log.debug("Connector {} has no config hash stored for its existing tasks", connName);

Review Comment:
   I don't love the idea of warn logs here. They'll be emitted unconditionally 
during upgrade even on completely unaffected clusters, and if users follow 
instructions to reconfigure their connectors, it'll have the same disruptive 
effect of forcing task restarts and extra churn. Honestly though, I suspect 
what's more likely is that people will ignore them.
   
   I think the underlying problem we're running into is that tightly coupling 
the storage of config hashes with the storage of task configs is forcing us 
into an uncomfortable spot where we can't publish a new config hash without 
forcing task restarts.
   
   I've published a new PR with a different, hopefully simpler approach 
[here](https://github.com/apache/kafka/pull/16053). Let me know what you think!


