C0urante commented on a change in pull request #11369: URL: https://github.com/apache/kafka/pull/11369#discussion_r718551225
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java ########## @@ -344,12 +354,73 @@ public StatusBackingStore statusBackingStore() { status.workerId(), status.trace()); } - protected Map<String, ConfigValue> validateBasicConnectorConfig(Connector connector, - ConfigDef configDef, - Map<String, String> config) { + protected Map<String, ConfigValue> validateSinkConnectorConfig(ConfigDef configDef, Map<String, String> config) { + return SinkConnectorConfig.validate(configDef.validateAll(config), config); + } + + protected Map<String, ConfigValue> validateSourceConnectorConfig(ConfigDef configDef, Map<String, String> config) { return configDef.validateAll(config); } + private ConfigInfos validateHeaderConverterConfig(Map<String, String> connectorConfig, ConfigValue headerConverterConfigValue) { + String headerConverterClass = connectorConfig.get(HEADER_CONVERTER_CLASS_CONFIG); + + if (headerConverterClass == null + || headerConverterConfigValue == null + || !headerConverterConfigValue.errorMessages().isEmpty() + ) { + // Either no custom header converter was specified, or one was specified but there's a problem with it. + // No need to proceed any further. + return null; + } Review comment: The short-circuiting pattern here where we conditionally exit the method and otherwise proceed normally doesn't seem to be playing very nicely with Checkstyle's NPath complexity metric. I kept this style because it reduces indentation and seems more readable than the alternative (where the portions of code after an `if` statement such as this one would be put inside an `else` block), but it comes at the cost that we have to add a Checkstyle suppression. 
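To make the trade-off in the `validateHeaderConverterConfig` comment concrete, here is a minimal sketch of the two shapes side by side. The class and method names (`GuardClauseSketch`, `earlyReturn`, `ifElse`) are hypothetical and not part of the PR. Both methods return the same results and differ only in layout; how either one scores against a given NPath threshold depends on the rest of the real method.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class GuardClauseSketch {
    // Early-return style: exit as soon as there is nothing left to validate.
    // The main logic stays at the top indentation level.
    static String earlyReturn(String converterClass, List<String> errors) {
        if (converterClass == null || errors == null || !errors.isEmpty()) {
            return null;
        }
        return "validate " + converterClass;
    }

    // Alternative style: the code that followed the guard moves into an else block.
    static String ifElse(String converterClass, List<String> errors) {
        if (converterClass == null || errors == null || !errors.isEmpty()) {
            return null;
        } else {
            return "validate " + converterClass;
        }
    }

    public static void main(String[] args) {
        List<String> noErrors = Collections.emptyList();
        List<String> oneError = Arrays.asList("class not found");
        System.out.println(earlyReturn("MyConverter", noErrors));
        System.out.println(ifElse("MyConverter", oneError));
    }
}
```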
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java ########## @@ -425,7 +439,10 @@ void enrich(ConfigDef newDef) { final ConfigDef.Validator typeValidator = ConfigDef.LambdaValidator.with( (String name, Object value) -> { validateProps(prefix); - getConfigDefFromConfigProvidingClass(typeConfig, (Class<?>) value); + // The value will be null if the class couldn't be found; no point in trying to load a ConfigDef for it + if (value != null) { + getConfigDefFromConfigProvidingClass(typeConfig, (Class<?>) value); + } Review comment: This fixes an issue present in our [unit tests](https://github.com/apache/kafka/blob/0888953e0e834bf849751a36ae5adf92b20ec0a5/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java#L495) where multiple error messages are added for a single invalid transform/predicate property when the class cannot be found. It's best if a single error message is returned stating that the class cannot be found; the second message (something along the lines of "invalid value null for property ...") did not add any value. ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java ########## @@ -499,13 +516,15 @@ ConfigDef getConfigDefFromConfigProvidingClass(String key, Class<?> cls) { aliasKind + " is abstract and cannot be created. 
Did you mean " + childClassNames + "?"; throw new ConfigException(key, String.valueOf(cls), message); } - T transformation; + T plugin; try { - transformation = Utils.newInstance(cls, baseClass); + plugin = Utils.newInstance(cls, baseClass); } catch (Exception e) { - throw new ConfigException(key, String.valueOf(cls), "Error getting config definition from " + baseClass.getSimpleName() + ": " + e.getMessage()); + // Log the entire exception here in order to provide a stack trace that can be useful for debugging classloading issues + log.error("Failed to instantiate {} '{}'", baseClass.getSimpleName(), cls, e); Review comment: Added this error message based on discussion of a similar error-handling situation [here](https://github.com/apache/kafka/pull/11349#discussion_r715452885). ########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java ########## @@ -499,13 +516,15 @@ ConfigDef getConfigDefFromConfigProvidingClass(String key, Class<?> cls) { aliasKind + " is abstract and cannot be created. Did you mean " + childClassNames + "?"; throw new ConfigException(key, String.valueOf(cls), message); } - T transformation; Review comment: Some basic cleanup to highlight the fact that this class is now agnostic with regards to plugin type and is not specific to SMTs. 
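The logging change in `getConfigDefFromConfigProvidingClass` follows a familiar pattern: log the whole exception so the stack trace reaches the worker log, and give the caller only a short message. The sketch below is an illustration under assumptions, not the PR's code. It uses `java.util.logging` instead of the SLF4J logger so that it runs without dependencies, and it rethrows an `IllegalArgumentException` because the excerpt above does not show what the PR does after logging. `PluginInstantiationSketch` and `newPlugin` are hypothetical names.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

final class PluginInstantiationSketch {
    private static final Logger LOG =
            Logger.getLogger(PluginInstantiationSketch.class.getName());

    // Instantiates cls reflectively and checks that it is a baseClass.
    // On failure, the full exception (with stack trace) goes to the log,
    // while the thrown exception carries only a one-line summary.
    static <T> T newPlugin(Class<?> cls, Class<T> baseClass) {
        try {
            return baseClass.cast(cls.getDeclaredConstructor().newInstance());
        } catch (Exception e) {
            LOG.log(Level.SEVERE,
                    "Failed to instantiate " + baseClass.getSimpleName() + " '" + cls + "'", e);
            // Assumption: the real code raises a ConfigException here.
            throw new IllegalArgumentException(
                    "Error getting config definition from "
                            + baseClass.getSimpleName() + ": " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        CharSequence ok = newPlugin(StringBuilder.class, CharSequence.class);
        System.out.println("instantiated: " + ok.getClass().getSimpleName());
        try {
            newPlugin(Number.class, Number.class); // abstract, cannot be instantiated
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```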
########## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java ########## @@ -70,60 +73,105 @@ "keys, all error context header keys will start with <code>__connect.errors.</code>"; private static final String DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY = "Enable Error Context Headers"; - static ConfigDef config = ConnectorConfig.configDef() - .define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY) - .define(TOPICS_REGEX_CONFIG, ConfigDef.Type.STRING, TOPICS_REGEX_DEFAULT, new RegexValidator(), ConfigDef.Importance.HIGH, TOPICS_REGEX_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_REGEX_DISPLAY) - .define(DLQ_TOPIC_NAME_CONFIG, ConfigDef.Type.STRING, DLQ_TOPIC_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_NAME_DOC, ERROR_GROUP, 6, ConfigDef.Width.MEDIUM, DLQ_TOPIC_DISPLAY) - .define(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC, ERROR_GROUP, 7, ConfigDef.Width.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY) - .define(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, ConfigDef.Type.BOOLEAN, DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT, Importance.MEDIUM, DLQ_CONTEXT_HEADERS_ENABLE_DOC, ERROR_GROUP, 8, ConfigDef.Width.MEDIUM, DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY); - public static ConfigDef configDef() { - return config; + return ConnectorConfig.configDef() + .define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY) + .define(TOPICS_REGEX_CONFIG, ConfigDef.Type.STRING, TOPICS_REGEX_DEFAULT, new RegexValidator(), ConfigDef.Importance.HIGH, TOPICS_REGEX_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_REGEX_DISPLAY) + .define(DLQ_TOPIC_NAME_CONFIG, ConfigDef.Type.STRING, DLQ_TOPIC_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_NAME_DOC, ERROR_GROUP, 6, 
ConfigDef.Width.MEDIUM, DLQ_TOPIC_DISPLAY) + .define(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC, ERROR_GROUP, 7, ConfigDef.Width.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY) + .define(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, ConfigDef.Type.BOOLEAN, DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT, Importance.MEDIUM, DLQ_CONTEXT_HEADERS_ENABLE_DOC, ERROR_GROUP, 8, ConfigDef.Width.MEDIUM, DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY); Review comment: It's safer to use a new `ConfigDef` object every time in order to avoid issues like [KAFKA-9950](https://issues.apache.org/jira/browse/KAFKA-9950), which can be caused by reusing a `ConfigDef` that is then accidentally mutated by some downstream logic. ########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java ########## @@ -114,8 +114,8 @@ public void start(Map<String, String> props) { taskId = props.get("task.id"); connectorName = props.get("connector.name"); topicName = props.getOrDefault(TOPIC_CONFIG, "sequential-topic"); - throughput = Long.valueOf(props.getOrDefault("throughput", "-1")); - batchSize = Integer.valueOf(props.getOrDefault("messages.per.poll", "1")); + throughput = Long.parseLong(props.getOrDefault("throughput", "-1")); + batchSize = Integer.parseInt(props.getOrDefault("messages.per.poll", "1")); Review comment: My IDE was nagging me about unnecessary boxing. 
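On the `SinkConnectorConfig.configDef()` change: the hazard of handing out one shared, mutable definition object can be sketched without Kafka on the classpath. `Def` below is a hypothetical stand-in for a mutable definition and is not the real `ConfigDef` API; `sharedDef` and `freshDef` are likewise illustrative names for the old and new approaches.

```java
import java.util.LinkedHashSet;
import java.util.Set;

final class FreshDefinitionSketch {
    /** Hypothetical stand-in for a mutable config definition. */
    static final class Def {
        final Set<String> keys = new LinkedHashSet<>();

        Def define(String key) {
            keys.add(key);
            return this;
        }
    }

    // Old shape: one static instance handed to every caller.
    private static final Def SHARED = new Def().define("topics");

    static Def sharedDef() {
        return SHARED;
    }

    // New shape: every call builds its own instance.
    static Def freshDef() {
        return new Def().define("topics");
    }

    public static void main(String[] args) {
        // A caller enriches the definition it was given.
        sharedDef().define("transforms.foo.type");
        // The extra key leaks to every later caller.
        System.out.println(sharedDef().keys); // [topics, transforms.foo.type]

        freshDef().define("transforms.foo.type");
        // Later callers still see a clean definition.
        System.out.println(freshDef().keys); // [topics]
    }
}
```

Building a new object on each call costs a few allocations, in exchange for removing a whole class of bugs caused by mutation at a distance.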
########## File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java ########## @@ -133,7 +133,7 @@ private void assertFailCreateConnector(String policy, Map<String, String> props) connect.configureConnector(CONNECTOR_NAME, props); fail("Shouldn't be able to create connector"); } catch (ConnectRestException e) { - assertEquals(e.statusCode(), 400); + assertEquals(400, e.statusCode()); Review comment: The expected value should go first, as it's used in error messages that come up with test failures ("expected <first argument>, got <second argument>"). This test did fail once locally for me; I believe it was just some flakiness induced by my laptop being overworked.
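On the `assertEquals` argument order in `ConnectorClientPolicyIntegrationTest`: the effect of swapping the arguments only shows up when the assertion fails. The sketch below uses a hand-written `assertEquals` whose message is modeled loosely on JUnit's, so that it runs without a test framework. The helper and the `failureMessage` wrapper are hypothetical and exist only for this illustration.

```java
import java.util.Objects;

final class AssertOrderSketch {
    // Stand-in for an assertEquals(expected, actual) helper: the first
    // argument is always reported as "expected", the second as the actual value.
    static void assertEquals(Object expected, Object actual) {
        if (!Objects.equals(expected, actual)) {
            throw new AssertionError("expected:<" + expected + "> but was:<" + actual + ">");
        }
    }

    // Returns the failure message, or null if the assertion passes.
    static String failureMessage(Object first, Object second) {
        try {
            assertEquals(first, second);
            return null;
        } catch (AssertionError e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        int statusCode = 500; // pretend the REST call returned 500 instead of 400

        // Correct order: the message matches what actually happened.
        System.out.println(failureMessage(400, statusCode)); // expected:<400> but was:<500>

        // Swapped order: the message claims the test wanted a 500.
        System.out.println(failureMessage(statusCode, 400)); // expected:<500> but was:<400>
    }
}
```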