C0urante commented on code in PR #16001:
URL: https://github.com/apache/kafka/pull/16001#discussion_r1608664359
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ClusterConfigState.java:
##########
@@ -187,6 +193,17 @@ public Map<String, String> rawTaskConfig(ConnectorTaskId task) {
return taskConfigs.get(task);
}
+ /**
+ * Get the hash of the connector config that was used to generate the
+ * latest set of task configs for the connector
+ * @param connectorName name of the connector
+ * @return the config hash, or null if the connector does not exist or
+ * no config hash for its latest set of tasks has been stored
+ */
+ public Integer taskConfigHash(String connectorName) {
Review Comment:
True, but that would imply that it's the hash of the latest connector
config, which is not always the case (and this feature is only valuable when it
is not the case). I juggled several names, including `tasksConnectorConfigHash`
and `connectorConfigHash`, and tried to settle on something that was brief and
least inaccurate.
##########
connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java:
##########
@@ -243,4 +249,29 @@ public static Map<String, String> patchConfig(
});
return result;
}
+
+ /**
+ * Generate a deterministic hash of the supplied config. For configurations
+ * with identical key-value pairs, this hash will always be the same.
+ * @param config the config to hash; may be null
+ * @return a hash of the config
+ */
+ public static int configHash(Map<String, String> config) {
+ if (config == null)
+ return 0;
+
+ Map<String, String> toHash = new TreeMap<>(config);
+
+ byte[] serialized;
+ try {
+ serialized = OBJECT_MAPPER.writeValueAsBytes(toHash);
Review Comment:
I didn't want to rely on hash code implementations being identical across
JVM restarts or even different JVMs. If there's an official spec somewhere that
applies to all of our supported Java versions that guarantees that
`AbstractMap::hashCode` or some other hash implementation will be the same
everywhere, then I agree it'd be better to use that.
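
To illustrate the concern, here is a minimal sketch of a hash that depends only on the sorted key-value pairs and their UTF-8 bytes, not on any `hashCode` implementation. It is not the code in this PR: the class name `ConfigHashSketch`, the choice of SHA-256, and the length-prefix encoding are all assumptions made for the example, and it uses only the JDK rather than `OBJECT_MAPPER`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration only; not part of ConnectUtils.
public class ConfigHashSketch {

    public static int configHash(Map<String, String> config) {
        if (config == null)
            return 0;

        MessageDigest digest;
        try {
            // SHA-256 must be available on every Java platform implementation
            digest = MessageDigest.getInstance("SHA-256");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }

        // TreeMap iterates in sorted key order, so insertion order is irrelevant.
        // Assumes keys are non-null (TreeMap rejects null keys).
        for (Map.Entry<String, String> entry : new TreeMap<>(config).entrySet()) {
            update(digest, entry.getKey());
            update(digest, entry.getValue());
        }

        // Fold the digest down to an int by taking its first four bytes
        return ByteBuffer.wrap(digest.digest()).getInt();
    }

    // Length-prefix each string so ("ab", "c") and ("a", "bc") hash differently
    private static void update(MessageDigest digest, String s) {
        if (s == null) {
            digest.update(ByteBuffer.allocate(4).putInt(-1).array());
            return;
        }
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        digest.update(ByteBuffer.allocate(4).putInt(bytes.length).array());
        digest.update(bytes);
    }
}
```

With this approach, two maps holding the same entries produce the same value regardless of map implementation or insertion order, at the cost of truncating the digest to 32 bits.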
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -1039,21 +1039,42 @@ public static List<Map<String, String>> reverseTransform(String connName,
return result;
}
- public boolean taskConfigsChanged(ClusterConfigState configState, String connName, List<Map<String, String>> taskProps) {
+ public boolean taskConfigsChanged(
+ ClusterConfigState configState,
+ String connName,
+ List<Map<String, String>> taskProps,
+ int connectorConfigHash
+ ) {
int currentNumTasks = configState.taskCount(connName);
boolean result = false;
if (taskProps.size() != currentNumTasks) {
log.debug("Connector {} task count changed from {} to {}",
connName, currentNumTasks, taskProps.size());
result = true;
} else {
- for (int index = 0; index < currentNumTasks; index++) {
+ for (int index = 0; index < currentNumTasks && !result; index++) {
ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
if (!taskProps.get(index).equals(configState.taskConfig(taskId))) {
log.debug("Connector {} has change in configuration for task {}-{}", connName, connName, index);
result = true;
}
}
+ // Do a final check to see if runtime-controlled properties that affect tasks but may
+ // not be included in the connector-generated configs for them (such as converter overrides)
+ // have changed
+ if (!result) {
+ Integer storedConnectorConfigHash = configState.taskConfigHash(connName);
+ if (storedConnectorConfigHash == null) {
+ log.debug("Connector {} has no config hash stored for its existing tasks", connName);
+ } else if (storedConnectorConfigHash != connectorConfigHash) {
+ log.debug(
+ "Connector {} has change in config hash ({}) for tasks ({})",
Review Comment:
I'd like to treat it as sensitive information and not log above `TRACE`
level if possible (we allow record contents and other potentially-sensitive
details to be logged at this level). I've added a note to a few Javadocs making
this clear. Does this seem sufficient? If not, I can look into alternative hash
functions.
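
As a rough sketch of what that could look like, the message below includes the hash values only when trace logging is enabled and otherwise reports just that a change was detected. The class `HashLogSketch`, the method `describeHashChange`, and the message wording are hypothetical; in the herder the flag would presumably come from something like `log.isTraceEnabled()`.

```java
// Hypothetical helper for illustration only; not part of AbstractHerder.
public class HashLogSketch {

    public static String describeHashChange(
            String connName,
            int storedHash,
            int currentHash,
            boolean traceEnabled
    ) {
        if (traceEnabled) {
            // Hash values are treated as sensitive, so they only appear at TRACE
            return String.format(
                    "Connector %s config hash changed (stored: %d, current: %d)",
                    connName, storedHash, currentHash);
        }
        // At DEBUG and above, report the change without the hash values
        return String.format("Connector %s config hash changed", connName);
    }
}
```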
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]