C0urante commented on a change in pull request #11369: URL: https://github.com/apache/kafka/pull/11369#discussion_r720238529
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java ########## @@ -70,60 +73,105 @@ "keys, all error context header keys will start with <code>__connect.errors.</code>"; private static final String DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY = "Enable Error Context Headers"; - static ConfigDef config = ConnectorConfig.configDef() - .define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY) - .define(TOPICS_REGEX_CONFIG, ConfigDef.Type.STRING, TOPICS_REGEX_DEFAULT, new RegexValidator(), ConfigDef.Importance.HIGH, TOPICS_REGEX_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_REGEX_DISPLAY) - .define(DLQ_TOPIC_NAME_CONFIG, ConfigDef.Type.STRING, DLQ_TOPIC_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_NAME_DOC, ERROR_GROUP, 6, ConfigDef.Width.MEDIUM, DLQ_TOPIC_DISPLAY) - .define(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC, ERROR_GROUP, 7, ConfigDef.Width.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY) - .define(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, ConfigDef.Type.BOOLEAN, DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT, Importance.MEDIUM, DLQ_CONTEXT_HEADERS_ENABLE_DOC, ERROR_GROUP, 8, ConfigDef.Width.MEDIUM, DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY); - public static ConfigDef configDef() { - return config; + return ConnectorConfig.configDef() + .define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY) + .define(TOPICS_REGEX_CONFIG, ConfigDef.Type.STRING, TOPICS_REGEX_DEFAULT, new RegexValidator(), ConfigDef.Importance.HIGH, TOPICS_REGEX_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_REGEX_DISPLAY) + .define(DLQ_TOPIC_NAME_CONFIG, ConfigDef.Type.STRING, DLQ_TOPIC_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_NAME_DOC, ERROR_GROUP, 6, 
ConfigDef.Width.MEDIUM, DLQ_TOPIC_DISPLAY) + .define(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC, ERROR_GROUP, 7, ConfigDef.Width.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY) + .define(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, ConfigDef.Type.BOOLEAN, DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT, Importance.MEDIUM, DLQ_CONTEXT_HEADERS_ENABLE_DOC, ERROR_GROUP, 8, ConfigDef.Width.MEDIUM, DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY); } public SinkConnectorConfig(Plugins plugins, Map<String, String> props) { - super(plugins, config, props); + super(plugins, configDef(), props); } /** * Throw an exception if the passed-in properties do not constitute a valid sink. * @param props sink configuration properties */ public static void validate(Map<String, String> props) { - final boolean hasTopicsConfig = hasTopicsConfig(props); - final boolean hasTopicsRegexConfig = hasTopicsRegexConfig(props); - final boolean hasDlqTopicConfig = hasDlqTopicConfig(props); + validate( + props, + error -> { + throw new ConfigException(error.property, error.value, error.errorMessage); + } + ); + } + + /** + * Perform preflight validation for the sink-specific properties for this connector. 
+ */ + public static Map<String, ConfigValue> validate(Map<String, ConfigValue> validatedConfig, Map<String, String> props) { + validate(props, error -> addErrorMessage(validatedConfig, error)); + return validatedConfig; + } + + private static void validate(Map<String, String> props, Consumer<ConfigError> onError) { + final String topicsList = props.get(TOPICS_CONFIG); + final String topicsRegex = props.get(TOPICS_REGEX_CONFIG); + final String dlqTopic = props.getOrDefault(DLQ_TOPIC_NAME_CONFIG, "").trim(); + final boolean hasTopicsConfig = !Utils.isBlank(topicsList); + final boolean hasTopicsRegexConfig = !Utils.isBlank(topicsRegex); + final boolean hasDlqTopicConfig = !Utils.isBlank(dlqTopic); if (hasTopicsConfig && hasTopicsRegexConfig) { - throw new ConfigException(SinkTask.TOPICS_CONFIG + " and " + SinkTask.TOPICS_REGEX_CONFIG + - " are mutually exclusive options, but both are set."); + String errorMessage = TOPICS_CONFIG + " and " + TOPICS_REGEX_CONFIG + " are mutually exclusive options, but both are set."; + onError.accept(new ConfigError(TOPICS_CONFIG, topicsList, errorMessage)); + onError.accept(new ConfigError(TOPICS_REGEX_CONFIG, topicsRegex, errorMessage)); } if (!hasTopicsConfig && !hasTopicsRegexConfig) { - throw new ConfigException("Must configure one of " + - SinkTask.TOPICS_CONFIG + " or " + SinkTask.TOPICS_REGEX_CONFIG); + String errorMessage = "Must configure one of " + TOPICS_CONFIG + " or " + TOPICS_REGEX_CONFIG; + onError.accept(new ConfigError(TOPICS_CONFIG, topicsList, errorMessage)); + onError.accept(new ConfigError(TOPICS_REGEX_CONFIG, topicsRegex, errorMessage)); } if (hasDlqTopicConfig) { - String dlqTopic = props.get(DLQ_TOPIC_NAME_CONFIG).trim(); if (hasTopicsConfig) { List<String> topics = parseTopicsList(props); if (topics.contains(dlqTopic)) { - throw new ConfigException(String.format("The DLQ topic '%s' may not be included in the list of " - + "topics ('%s=%s') consumed by the connector", dlqTopic, SinkTask.TOPICS_REGEX_CONFIG, 
topics)); + String errorMessage = String.format( + "The DLQ topic '%s' may not be included in the list of topics ('%s=%s') consumed by the connector", + dlqTopic, TOPICS_CONFIG, topics + ); + onError.accept(new ConfigError(TOPICS_CONFIG, topicsList, errorMessage)); } } if (hasTopicsRegexConfig) { - String topicsRegexStr = props.get(SinkTask.TOPICS_REGEX_CONFIG); - Pattern pattern = Pattern.compile(topicsRegexStr); + Pattern pattern = Pattern.compile(topicsRegex); if (pattern.matcher(dlqTopic).matches()) { - throw new ConfigException(String.format("The DLQ topic '%s' may not be included in the regex matching the " - + "topics ('%s=%s') consumed by the connector", dlqTopic, SinkTask.TOPICS_REGEX_CONFIG, topicsRegexStr)); + String errorMessage = String.format( + "The DLQ topic '%s' may not be included in the regex matching the topics ('%s=%s') consumed by the connector", + dlqTopic, TOPICS_REGEX_CONFIG, topicsRegex + ); + onError.accept(new ConfigError(TOPICS_REGEX_CONFIG, topicsRegex, errorMessage)); } } } } + private static class ConfigError { Review comment: > I think the reason you're using this class instead of the plain exception to avoid the cost of instantiating the exception and filling the stacktrace. Not exactly; it's because the `ConfigException` class isn't really an effective POJO for a property, its value, and an error message. There are no getters for any of those fields, and two of the three constructors available would permit null values for some of them. I chose not to change the `ConfigException` class to add these getters since I'm not sure it's the best idea for an exception class to be doing double duty as a POJO as well. > I think passing in Map<String, ConfigValue> and calling the addErrorMessage function as a utility function directly avoids the need for this POJO, while getting you the ability to return multiple errors. 
This is true, but I'm not sure how we could make that work cleanly with the other current use for `SinkConnectorConfig::validate`, which is to do a [preliminary validation step](https://github.com/apache/kafka/blob/db42afd6e24ef4291390b4d1c1f10758beedefed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295) during `WorkerSinkTask` initialization. > Also, is there a reason that we can't have this validation return a ConfigInfos object, and merge it in late like the rest of the validations taking place in AbstractHerder::validateConnectorConfig? That seems like the standard convention. It'd make it a little more difficult for subclasses (i.e., `DistributedHerder`) to override behavior and add their own logic on top. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org