gharris1727 commented on code in PR #16001:
URL: https://github.com/apache/kafka/pull/16001#discussion_r1610727814


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -1039,7 +1039,12 @@ public static List<Map<String, String>> 
reverseTransform(String connName,
         return result;
     }
 
-    public boolean taskConfigsChanged(ClusterConfigState configState, String 
connName, List<Map<String, String>> taskProps) {
+    public boolean taskConfigsChanged(
+            ClusterConfigState configState,
+            String connName,
+            List<Map<String, String>> taskProps,
+            ConfigHash connectorConfigHash

Review Comment:
   optional: You could use the old signature, and compute ConfigHash from the 
configState variable.
   
   This would switch the AbstractHerderTest cases to test the real hash 
algorithm, rather than using mocked hashes, and less mocking seems better to me.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConfigHash.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.util.ConnectUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.HttpHeaders;
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * A deterministic hash of a connector configuration. This can be used to 
detect changes
+ * in connector configurations across worker lifetimes, which is sometimes 
necessary when
+ * connectors are reconfigured in a way that affects their tasks' runtime 
behavior but does
+ * not affect their tasks' configurations (for example, changing the key 
converter class).
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/KAFKA-9228">KAFKA-9228</a>
+ */
+public class ConfigHash {
+
+    private static final Logger log = 
LoggerFactory.getLogger(ConfigHash.class);
+
+    public static final ConfigHash NO_HASH = new ConfigHash(null);
+    public static final String CONNECTOR_CONFIG_HASH_HEADER = 
"X-Connect-Connector-Config-Hash";
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private final Integer hash;
+
+    // Visible for testing
+    ConfigHash(Integer hash) {
+        this.hash = hash;
+    }
+
+    /**
+     * Read and parse a hash from the headers of a REST request.
+     *
+     * @param connector the name of the connector; only used for error logging
+     *                  purposes and may be null
+     * @param headers the headers from which to read and parse the hash;
+     *                may be null
+     *
+     * @return the parsed hash; never null, but may be {@link #NO_HASH} if
+     * no hash header is present
+     *
+     * @throws ConnectException if the expected header is present for the hash,
+     * but it cannot be parsed as a 32-bit signed integer
+     */
+    public static ConfigHash fromHeaders(String connector, HttpHeaders 
headers) {
+        if (headers == null)
+            return NO_HASH;
+
+        String header = headers.getHeaderString(CONNECTOR_CONFIG_HASH_HEADER);
+        if (header == null)
+            return NO_HASH;
+
+        int hash;
+        try {
+            hash = Integer.parseInt(header);
+        } catch (NumberFormatException e) {
+            if (connector == null)
+                connector = "<unknown>";
+
+            if (log.isTraceEnabled()) {
+                log.error("Invalid connector config hash header for connector 
{}", connector);

Review Comment:
   nit: there is an ERROR-level log statement inside the `log.isTraceEnabled()` branch; it duplicates the TRACE message on the next line (minus the invalid value), which looks unintended.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConfigHash.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.util.ConnectUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.HttpHeaders;
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * A deterministic hash of a connector configuration. This can be used to 
detect changes
+ * in connector configurations across worker lifetimes, which is sometimes 
necessary when
+ * connectors are reconfigured in a way that affects their tasks' runtime 
behavior but does
+ * not affect their tasks' configurations (for example, changing the key 
converter class).
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/KAFKA-9228">KAFKA-9228</a>
+ */
+public class ConfigHash {
+
+    private static final Logger log = 
LoggerFactory.getLogger(ConfigHash.class);
+
+    public static final ConfigHash NO_HASH = new ConfigHash(null);
+    public static final String CONNECTOR_CONFIG_HASH_HEADER = 
"X-Connect-Connector-Config-Hash";
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private final Integer hash;
+
+    // Visible for testing
+    ConfigHash(Integer hash) {
+        this.hash = hash;
+    }
+
+    /**
+     * Read and parse a hash from the headers of a REST request.
+     *
+     * @param connector the name of the connector; only used for error logging
+     *                  purposes and may be null
+     * @param headers the headers from which to read and parse the hash;
+     *                may be null
+     *
+     * @return the parsed hash; never null, but may be {@link #NO_HASH} if
+     * no hash header is present
+     *
+     * @throws ConnectException if the expected header is present for the hash,
+     * but it cannot be parsed as a 32-bit signed integer
+     */
+    public static ConfigHash fromHeaders(String connector, HttpHeaders 
headers) {
+        if (headers == null)
+            return NO_HASH;
+
+        String header = headers.getHeaderString(CONNECTOR_CONFIG_HASH_HEADER);
+        if (header == null)
+            return NO_HASH;
+
+        int hash;
+        try {
+            hash = Integer.parseInt(header);
+        } catch (NumberFormatException e) {
+            if (connector == null)
+                connector = "<unknown>";
+
+            if (log.isTraceEnabled()) {
+                log.error("Invalid connector config hash header for connector 
{}", connector);
+                log.trace("Invalid connector config hash header for connector 
{}: '{}'", connector, header);
+            } else {
+                log.error(
+                        "Invalid connector config hash header for connector 
{}. "
+                                + "Please enable TRACE logging to see the 
invalid value",
+                        connector
+                );
+            }
+            throw new ConnectException("Invalid hash header; expected a 32-bit 
signed integer");
+        }
+        return new ConfigHash(hash);
+    }
+
+    /**
+     * Generate a deterministic hash from the config. For configurations
+     * with identical key-value pairs, this hash will always be the same, and
+     * {@link #shouldUpdateTasks(ConfigHash, ConfigHash)} will return {@code 
false}
+     * for any two such configurations. Note that, for security reasons, those
+     * {@link ConfigHash} instances will still not {@link #equals(Object) 
equal}
+     * each other.
+     *
+     * @param config the configuration to hash; may be null
+     *
+     * @return the resulting hash; may be {@link #NO_HASH} if the configuration
+     * was null
+     *
+     * @throws ConnectException if the configuration cannot be serialized to 
JSON
+     * for the purposes of hashing
+     */
+    public static ConfigHash fromConfig(Map<String, String> config) {
+        if (config == null)
+            return NO_HASH;
+
+        Map<String, String> toHash = new TreeMap<>(config);
+
+        byte[] serialized;
+        try {
+            serialized = OBJECT_MAPPER.writeValueAsBytes(toHash);
+        } catch (IOException e) {
+            throw new ConnectException(
+                    "Unable to serialize connector config contents for 
hashing",
+                    e
+            );
+        }
+
+        int hash = Utils.murmur2(serialized);
+        return new ConfigHash(hash);
+    }
+
+    /**
+     * Read and parse the hash from the headers of a REST request.
+     *
+     * @param map the map to read the hash field from; may be null
+     * @param field the field to read and parse; may be null
+     *
+     * @return the parsed hash; never null, but may be {@link #NO_HASH} if
+     * the map is null or the field is not present in the map
+     *
+     * @throws ConnectException if the expected field is present for the hash,
+     * but it cannot be parsed as a 32-bit signed integer
+     */
+    public static ConfigHash fromMap(Map<String, ?> map, String field) {
+        if (map == null)
+            return NO_HASH;
+
+        Object rawHash = map.get(field);
+        if (rawHash == null)
+            return NO_HASH;
+
+        int hash = ConnectUtils.intValue(rawHash);
+        return new ConfigHash(hash);
+    }
+
+    /**
+     * Determine whether tasks should be restarted based on a previously-stored
+     * hash, and the hash for a connector config that was used to generate new 
task configs
+     *
+     * @param previous the previously-stored config hash for the connector
+     * @param current the hash of the connector config which led to 
newly-generated
+     *                task configs
+     *
+     * @return whether a restart of the connector's tasks should be forced, 
possibly to
+     * pick up runtime-controlled configuration changes that would otherwise 
be dropped
+     *
+ * @see <a href="https://issues.apache.org/jira/browse/KAFKA-9228">KAFKA-9228</a>
+     */
+    public static boolean shouldUpdateTasks(ConfigHash previous, ConfigHash 
current) {
+        if (previous == null || current == null)
+            return false;
+
+        return previous.exists() && !previous.matches(current);
+    }
+
+    /**
+     * Insert this hash (if it {@link #exists() exists}) into a {@link Struct} 
with the desired field name.
+     *
+     * @param struct the struct to add the hash to; may be null, in which case
+     *               this method becomes a no-op
+     * @param field the name of the field to add the hash under; may be null,
+     *              in which case this method becomes a no-op
+     */
+    public void addToStruct(Struct struct, String field) {
+        if (hash == null || struct == null || field == null)
+            return;
+        struct.put(field, hash);
+    }
+
+    /**
+     * Add this hash (if it {@link #exists() exists}) to a map of HTTP headers
+     *
+     * @param headers the headers map to add this hash to; may be null, in 
which case
+     *                this method becomes a no-op
+     */
+    public void addToHeaders(Map<String, String> headers) {
+        if (headers == null || !exists())
+            return;
+
+        headers.put(CONNECTOR_CONFIG_HASH_HEADER, Integer.toString(hash));
+    }
+
+    /**
+     * @return whether a hash for this config was found
+     */
+    public boolean exists() {
+        return hash != null;
+    }
+
+    @Override
+    public String toString() {
+        // DO NOT OVERRIDE THIS METHOD; config hashes should not be logged

Review Comment:
   You can mark the class as `final` or just the methods as `final` if you want 
the compiler to enforce this.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java:
##########
@@ -70,40 +70,45 @@ HttpClient httpClient(SslContextFactory.Client 
sslContextFactory) {
      *
      * @param url             HTTP connection will be established with this 
url, non-null.
      * @param method          HTTP method ("GET", "POST", "PUT", etc.), 
non-null
-     * @param headers         HTTP headers from REST endpoint
+     * @param inboundHeaders            HTTP headers received from an inbound 
REST request, which

Review Comment:
   nit: the indentation of this Javadoc `@param` description is misaligned relative to the surrounding parameter docs.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConfigHash.java:
##########
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.util.ConnectUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.HttpHeaders;
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * A deterministic hash of a connector configuration. This can be used to 
detect changes
+ * in connector configurations across worker lifetimes, which is sometimes 
necessary when
+ * connectors are reconfigured in a way that affects their tasks' runtime 
behavior but does
+ * not affect their tasks' configurations (for example, changing the key 
converter class).
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/KAFKA-9228">KAFKA-9228</a>
+ */
+public class ConfigHash {
+
+    private static final Logger log = 
LoggerFactory.getLogger(ConfigHash.class);
+
+    public static final ConfigHash NO_HASH = new ConfigHash(null);
+    public static final String CONNECTOR_CONFIG_HASH_HEADER = 
"X-Connect-Connector-Config-Hash";
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private final Integer hash;
+
+    // Visible for testing
+    ConfigHash(Integer hash) {
+        this.hash = hash;
+    }
+
+    /**
+     * Read and parse a hash from the headers of a REST request.
+     *
+     * @param connector the name of the connector; only used for error logging
+     *                  purposes and may be null
+     * @param headers the headers from which to read and parse the hash;
+     *                may be null
+     *
+     * @return the parsed hash; never null, but may be {@link #NO_HASH} if
+     * no hash header is present
+     *
+     * @throws ConnectException if the expected header is present for the hash,
+     * but it cannot be parsed as a 32-bit signed integer
+     */
+    public static ConfigHash fromHeaders(String connector, HttpHeaders 
headers) {
+        if (headers == null)
+            return NO_HASH;
+
+        String header = headers.getHeaderString(CONNECTOR_CONFIG_HASH_HEADER);
+        if (header == null)
+            return NO_HASH;
+
+        int hash;
+        try {
+            hash = Integer.parseInt(header);
+        } catch (NumberFormatException e) {
+            if (connector == null)
+                connector = "<unknown>";
+
+            if (log.isTraceEnabled()) {
+                log.error("Invalid connector config hash header for connector 
{}", connector);
+                log.trace("Invalid connector config hash header for connector 
{}: '{}'", connector, header);
+            } else {
+                log.error(
+                        "Invalid connector config hash header for connector 
{}. "
+                                + "Please enable TRACE logging to see the 
invalid value",
+                        connector
+                );
+            }
+            throw new ConnectException("Invalid hash header; expected a 32-bit 
signed integer");
+        }
+        return new ConfigHash(hash);
+    }
+
+    /**
+     * Generate a deterministic hash from the config. For configurations
+     * with identical key-value pairs, this hash will always be the same, and
+     * {@link #shouldUpdateTasks(ConfigHash, ConfigHash)} will return {@code 
false}
+     * for any two such configurations. Note that, for security reasons, those
+     * {@link ConfigHash} instances will still not {@link #equals(Object) 
equal}
+     * each other.
+     *
+     * @param config the configuration to hash; may be null
+     *
+     * @return the resulting hash; may be {@link #NO_HASH} if the configuration
+     * was null
+     *
+     * @throws ConnectException if the configuration cannot be serialized to 
JSON
+     * for the purposes of hashing
+     */
+    public static ConfigHash fromConfig(Map<String, String> config) {
+        if (config == null)
+            return NO_HASH;
+
+        Map<String, String> toHash = new TreeMap<>(config);
+
+        byte[] serialized;
+        try {
+            serialized = OBJECT_MAPPER.writeValueAsBytes(toHash);
+        } catch (IOException e) {
+            throw new ConnectException(
+                    "Unable to serialize connector config contents for 
hashing",
+                    e
+            );
+        }
+
+        int hash = Utils.murmur2(serialized);
+        return new ConfigHash(hash);
+    }
+
+    /**
+     * Read and parse the hash from the headers of a REST request.
+     *
+     * @param map the map to read the hash field from; may be null
+     * @param field the field to read and parse; may be null
+     *
+     * @return the parsed hash; never null, but may be {@link #NO_HASH} if
+     * the map is null or the field is not present in the map
+     *
+     * @throws ConnectException if the expected field is present for the hash,
+     * but it cannot be parsed as a 32-bit signed integer
+     */
+    public static ConfigHash fromMap(Map<String, ?> map, String field) {
+        if (map == null)
+            return NO_HASH;
+
+        Object rawHash = map.get(field);
+        if (rawHash == null)
+            return NO_HASH;
+
+        int hash = ConnectUtils.intValue(rawHash);
+        return new ConfigHash(hash);
+    }
+
+    /**
+     * Determine whether tasks should be restarted based on a previously-stored
+     * hash, and the hash for a connector config that was used to generate new 
task configs
+     *
+     * @param previous the previously-stored config hash for the connector
+     * @param current the hash of the connector config which led to 
newly-generated
+     *                task configs
+     *
+     * @return whether a restart of the connector's tasks should be forced, 
possibly to
+     * pick up runtime-controlled configuration changes that would otherwise 
be dropped
+     *
+ * @see <a href="https://issues.apache.org/jira/browse/KAFKA-9228">KAFKA-9228</a>
+     */
+    public static boolean shouldUpdateTasks(ConfigHash previous, ConfigHash 
current) {
+        if (previous == null || current == null)
+            return false;
+
+        return previous.exists() && !previous.matches(current);
+    }
+
+    /**
+     * Insert this hash (if it {@link #exists() exists}) into a {@link Struct} 
with the desired field name.
+     *
+     * @param struct the struct to add the hash to; may be null, in which case
+     *               this method becomes a no-op
+     * @param field the name of the field to add the hash under; may be null,
+     *              in which case this method becomes a no-op
+     */
+    public void addToStruct(Struct struct, String field) {
+        if (hash == null || struct == null || field == null)
+            return;
+        struct.put(field, hash);
+    }
+
+    /**
+     * Add this hash (if it {@link #exists() exists}) to a map of HTTP headers
+     *
+     * @param headers the headers map to add this hash to; may be null, in 
which case
+     *                this method becomes a no-op
+     */
+    public void addToHeaders(Map<String, String> headers) {
+        if (headers == null || !exists())
+            return;
+
+        headers.put(CONNECTOR_CONFIG_HASH_HEADER, Integer.toString(hash));
+    }

Review Comment:
   I think in service of not having a hash getter, these methods are 
reasonable. I agree that these are the most leak prone, but in order to have an 
effect elsewhere, the data has to get out somehow. Having this have a 
dependency on Map and Struct is fine, the alternative of making this class 
depend on a REST client and the config topic would be insane.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java:
##########
@@ -243,4 +249,29 @@ public static Map<String, String> patchConfig(
         });
         return result;
     }
+
+    /**
+     * Generate a deterministic hash of the supplied config. For configurations
+     * with identical key-value pairs, this hash will always be the same.
+     * @param config the config to hash; may be null
+     * @return a hash of the config
+     */
+    public static int configHash(Map<String, String> config) {
+        if (config == null)
+            return 0;
+
+        Map<String, String> toHash = new TreeMap<>(config);
+
+        byte[] serialized;
+        try {
+            serialized = OBJECT_MAPPER.writeValueAsBytes(toHash);

Review Comment:
   They do document the hashCode implementation very precisely: 
https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/AbstractMap.html#hashCode()
 
   
   > Returns the hash code value for this map. The hash code of a map is 
defined to be the sum of the hash codes of each entry in the map's entrySet() 
view. This ensures that m1.equals(m2) implies that m1.hashCode()==m2.hashCode() 
for any two maps m1 and m2, as required by the general contract of 
[Object.hashCode()](https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/lang/Object.html#hashCode()).
   
   
https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/Map.Entry.html#hashCode()
   
   > Returns the hash code value for this map entry. The hash code of a map 
entry e is defined to be:
   > ```
   >     (e.getKey()==null   ? 0 : e.getKey().hashCode()) ^
   >     (e.getValue()==null ? 0 : e.getValue().hashCode())
   > ```
   
   
https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/lang/String.html#hashCode()
   > Returns a hash code for this string. The hash code for a String object is 
computed as
   > ```
   > s[0]*31^(n-1) + s[1]*31^(n-2) + ... + s[n-1]
   > ```
   
   I sincerely doubt that a future JDK version would change the hash algorithm 
of the String, but I don't know that for sure.
   
   I am more confident about the AbstractMap implementation staying the same 
though: because it's an interface that can be extended, the equals and hashCode 
implementations are expected to hold across distinct implementations. If the 
AbstractMap implementation changed, I think that would break some existing 
implementations.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -1039,21 +1039,42 @@ public static List<Map<String, String>> 
reverseTransform(String connName,
         return result;
     }
 
-    public boolean taskConfigsChanged(ClusterConfigState configState, String 
connName, List<Map<String, String>> taskProps) {
+    public boolean taskConfigsChanged(
+            ClusterConfigState configState,
+            String connName,
+            List<Map<String, String>> taskProps,
+            int connectorConfigHash
+    ) {
         int currentNumTasks = configState.taskCount(connName);
         boolean result = false;
         if (taskProps.size() != currentNumTasks) {
             log.debug("Connector {} task count changed from {} to {}", 
connName, currentNumTasks, taskProps.size());
             result = true;
         } else {
-            for (int index = 0; index < currentNumTasks; index++) {
+            for (int index = 0; index < currentNumTasks && !result; index++) {
                 ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
                 if 
(!taskProps.get(index).equals(configState.taskConfig(taskId))) {
                     log.debug("Connector {} has change in configuration for 
task {}-{}", connName, connName, index);
                     result = true;
                 }
             }
+            // Do a final check to see if runtime-controlled properties that 
affect tasks but may
+            // not be included in the connector-generated configs for them 
(such as converter overrides)
+            // have changed
+            if (!result) {
+                Integer storedConnectorConfigHash = 
configState.taskConfigHash(connName);
+                if (storedConnectorConfigHash == null) {
+                    log.debug("Connector {} has no config hash stored for its 
existing tasks", connName);

Review Comment:
   Thank you for the detailed description of that failure mode. I was not 
thinking about the cluster including workers without this patch, and infinitely 
updating the task configs is definitely undesirable.
   
   I'm still just a little uncomfortable letting this un-upgraded state linger, 
but I can't really come up with a viable alternative. We'll just need to 
recommend applying the workaround once whenever this log message is shown.
   
   Do you think that this could be WARN, and/or contain some more actionable 
information, like "please update one of these configurations" and list the 
config keys in the task configs? While unusual, I think that would greatly 
improve the time-to-resolve this issue if it appears in practice.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to