C0urante commented on a change in pull request #11369:
URL: https://github.com/apache/kafka/pull/11369#discussion_r720238529



##########
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
##########
@@ -70,60 +73,105 @@
             "keys, all error context header keys will start with 
<code>__connect.errors.</code>";
     private static final String DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY = "Enable 
Error Context Headers";
 
-    static ConfigDef config = ConnectorConfig.configDef()
-        .define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, 
ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, 
TOPICS_DISPLAY)
-        .define(TOPICS_REGEX_CONFIG, ConfigDef.Type.STRING, 
TOPICS_REGEX_DEFAULT, new RegexValidator(), ConfigDef.Importance.HIGH, 
TOPICS_REGEX_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_REGEX_DISPLAY)
-        .define(DLQ_TOPIC_NAME_CONFIG, ConfigDef.Type.STRING, 
DLQ_TOPIC_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_NAME_DOC, ERROR_GROUP, 6, 
ConfigDef.Width.MEDIUM, DLQ_TOPIC_DISPLAY)
-        .define(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, 
DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT, Importance.MEDIUM, 
DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC, ERROR_GROUP, 7, 
ConfigDef.Width.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY)
-        .define(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, ConfigDef.Type.BOOLEAN, 
DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT, Importance.MEDIUM, 
DLQ_CONTEXT_HEADERS_ENABLE_DOC, ERROR_GROUP, 8, ConfigDef.Width.MEDIUM, 
DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY);
-
     public static ConfigDef configDef() {
-        return config;
+        return ConnectorConfig.configDef()
+            .define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, 
ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, 
TOPICS_DISPLAY)
+            .define(TOPICS_REGEX_CONFIG, ConfigDef.Type.STRING, 
TOPICS_REGEX_DEFAULT, new RegexValidator(), ConfigDef.Importance.HIGH, 
TOPICS_REGEX_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_REGEX_DISPLAY)
+            .define(DLQ_TOPIC_NAME_CONFIG, ConfigDef.Type.STRING, 
DLQ_TOPIC_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_NAME_DOC, ERROR_GROUP, 6, 
ConfigDef.Width.MEDIUM, DLQ_TOPIC_DISPLAY)
+            .define(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, 
DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT, Importance.MEDIUM, 
DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC, ERROR_GROUP, 7, 
ConfigDef.Width.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY)
+            .define(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, ConfigDef.Type.BOOLEAN, 
DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT, Importance.MEDIUM, 
DLQ_CONTEXT_HEADERS_ENABLE_DOC, ERROR_GROUP, 8, ConfigDef.Width.MEDIUM, 
DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY);
     }
 
     public SinkConnectorConfig(Plugins plugins, Map<String, String> props) {
-        super(plugins, config, props);
+        super(plugins, configDef(), props);
     }
 
     /**
      * Throw an exception if the passed-in properties do not constitute a 
valid sink.
      * @param props sink configuration properties
      */
     public static void validate(Map<String, String> props) {
-        final boolean hasTopicsConfig = hasTopicsConfig(props);
-        final boolean hasTopicsRegexConfig = hasTopicsRegexConfig(props);
-        final boolean hasDlqTopicConfig = hasDlqTopicConfig(props);
+        validate(
+            props,
+            error -> {
+                throw new ConfigException(error.property, error.value, 
error.errorMessage);
+            }
+        );
+    }
+
+    /**
+     * Perform preflight validation for the sink-specific properties for this 
connector.
+     */
+    public static Map<String, ConfigValue> validate(Map<String, ConfigValue> 
validatedConfig, Map<String, String> props) {
+        validate(props, error -> addErrorMessage(validatedConfig, error));
+        return validatedConfig;
+    }
+
+    private static void validate(Map<String, String> props, 
Consumer<ConfigError> onError) {
+        final String topicsList = props.get(TOPICS_CONFIG);
+        final String topicsRegex = props.get(TOPICS_REGEX_CONFIG);
+        final String dlqTopic = props.getOrDefault(DLQ_TOPIC_NAME_CONFIG, 
"").trim();
+        final boolean hasTopicsConfig = !Utils.isBlank(topicsList);
+        final boolean hasTopicsRegexConfig = !Utils.isBlank(topicsRegex);
+        final boolean hasDlqTopicConfig = !Utils.isBlank(dlqTopic);
 
         if (hasTopicsConfig && hasTopicsRegexConfig) {
-            throw new ConfigException(SinkTask.TOPICS_CONFIG + " and " + 
SinkTask.TOPICS_REGEX_CONFIG +
-                " are mutually exclusive options, but both are set.");
+            String errorMessage = TOPICS_CONFIG + " and " + 
TOPICS_REGEX_CONFIG + " are mutually exclusive options, but both are set.";
+            onError.accept(new ConfigError(TOPICS_CONFIG, topicsList, 
errorMessage));
+            onError.accept(new ConfigError(TOPICS_REGEX_CONFIG, topicsRegex, 
errorMessage));
         }
 
         if (!hasTopicsConfig && !hasTopicsRegexConfig) {
-            throw new ConfigException("Must configure one of " +
-                SinkTask.TOPICS_CONFIG + " or " + 
SinkTask.TOPICS_REGEX_CONFIG);
+            String errorMessage = "Must configure one of " + TOPICS_CONFIG + " 
or " + TOPICS_REGEX_CONFIG;
+            onError.accept(new ConfigError(TOPICS_CONFIG, topicsList, 
errorMessage));
+            onError.accept(new ConfigError(TOPICS_REGEX_CONFIG, topicsRegex, 
errorMessage));
         }
 
         if (hasDlqTopicConfig) {
-            String dlqTopic = props.get(DLQ_TOPIC_NAME_CONFIG).trim();
             if (hasTopicsConfig) {
                 List<String> topics = parseTopicsList(props);
                 if (topics.contains(dlqTopic)) {
-                    throw new ConfigException(String.format("The DLQ topic 
'%s' may not be included in the list of "
-                            + "topics ('%s=%s') consumed by the connector", 
dlqTopic, SinkTask.TOPICS_REGEX_CONFIG, topics));
+                    String errorMessage = String.format(
+                        "The DLQ topic '%s' may not be included in the list of 
topics ('%s=%s') consumed by the connector",
+                        dlqTopic, TOPICS_CONFIG, topics
+                    );
+                    onError.accept(new ConfigError(TOPICS_CONFIG, topicsList, 
errorMessage));
                 }
             }
             if (hasTopicsRegexConfig) {
-                String topicsRegexStr = 
props.get(SinkTask.TOPICS_REGEX_CONFIG);
-                Pattern pattern = Pattern.compile(topicsRegexStr);
+                Pattern pattern = Pattern.compile(topicsRegex);
                 if (pattern.matcher(dlqTopic).matches()) {
-                    throw new ConfigException(String.format("The DLQ topic 
'%s' may not be included in the regex matching the "
-                            + "topics ('%s=%s') consumed by the connector", 
dlqTopic, SinkTask.TOPICS_REGEX_CONFIG, topicsRegexStr));
+                    String errorMessage = String.format(
+                        "The DLQ topic '%s' may not be included in the regex 
matching the topics ('%s=%s') consumed by the connector",
+                        dlqTopic, TOPICS_REGEX_CONFIG, topicsRegex
+                    );
+                    onError.accept(new ConfigError(TOPICS_REGEX_CONFIG, 
topicsRegex, errorMessage));
                 }
             }
         }
     }
 
+    private static class ConfigError {

Review comment:
       > I think the reason you're using this class instead of the plain 
exception to avoid the cost of instantiating the exception and filling the 
stacktrace.
   
   Not exactly; it's because the `ConfigException` class isn't really an 
effective POJO for a property, its value, and an error message. There are no 
getters for any of those fields, and two of the three constructors available 
would permit null values for some of them. I chose not to change the 
`ConfigException` class to add these getters since I'm not sure it's the best 
idea for an exception class to be doing double duty as a POJO as well.
   
   > I think passing in Map<String, ConfigValue> and calling the 
addErrorMessage function as a utility function directly avoids the need for 
this POJO, while getting you the ability to return multiple errors.
   
   This is true, but I'm not sure how we could make that work cleanly with the 
other current use for `SinkConnectorConfig::validate`, which is to do a 
[preliminary validation 
step](https://github.com/apache/kafka/blob/db42afd6e24ef4291390b4d1c1f10758beedefed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295)
 during `WorkerSinkTask` initialization.
   
   > Also, Is there a reason that we can't have this validation return a 
ConfigInfos object, and merge it in late like the rest of the validations 
taking place in AbstractHerder::validateConnnectorConfig? That seems like the 
standard convention.
   
   It'd make it a little more difficult for subclasses (i.e., 
`DistributedHerder`) to override behavior and add their own logic on top.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to