C0urante commented on a change in pull request #11369:
URL: https://github.com/apache/kafka/pull/11369#discussion_r718551225



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
##########
@@ -344,12 +354,73 @@ public StatusBackingStore statusBackingStore() {
                 status.workerId(), status.trace());
     }
 
-    protected Map<String, ConfigValue> validateBasicConnectorConfig(Connector connector,
-                                                                    ConfigDef configDef,
-                                                                    Map<String, String> config) {
+    protected Map<String, ConfigValue> validateSinkConnectorConfig(ConfigDef configDef, Map<String, String> config) {
+        return SinkConnectorConfig.validate(configDef.validateAll(config), config);
+    }
+
+    protected Map<String, ConfigValue> validateSourceConnectorConfig(ConfigDef configDef, Map<String, String> config) {
         return configDef.validateAll(config);
     }
 
+    private ConfigInfos validateHeaderConverterConfig(Map<String, String> connectorConfig, ConfigValue headerConverterConfigValue) {
+        String headerConverterClass = connectorConfig.get(HEADER_CONVERTER_CLASS_CONFIG);
+
+        if (headerConverterClass == null
+            || headerConverterConfigValue == null
+            || !headerConverterConfigValue.errorMessages().isEmpty()
+        ) {
+            // Either no custom header converter was specified, or one was specified but there's a problem with it.
+            // No need to proceed any further.
+            return null;
+        }

Review comment:
       The short-circuiting pattern here where we conditionally exit the method 
and otherwise proceed normally doesn't seem to be playing very nicely with 
Checkstyle's NPath complexity metric. I kept this style because it reduces 
indentation and seems more readable than the alternative (where the portions of 
code after an `if` statement such as this one would be put inside an `else` 
block), but it comes at the cost that we have to add a Checkstyle suppression.
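
   For illustration, here's a standalone sketch of the two styles being weighed here; the names are hypothetical and not taken from the Kafka codebase:

   ```java
   class EarlyReturnDemo {
       // Early-return style: guard clauses exit up front, keeping the happy path flat.
       static String describe(String value) {
           if (value == null || value.isEmpty()) {
               // Nothing to validate; no need to proceed any further.
               return null;
           }
           return "valid: " + value;
       }

       // Equivalent nested-else style: same behavior, one extra indentation level
       // for every guard, which is what the early returns avoid.
       static String describeNested(String value) {
           if (value == null || value.isEmpty()) {
               return null;
           } else {
               return "valid: " + value;
           }
       }
   }
   ```

   Both compile to the same behavior; the disagreement with Checkstyle is purely about how the NPath metric counts the branches.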

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
##########
@@ -425,7 +439,10 @@ void enrich(ConfigDef newDef) {
                 final ConfigDef.Validator typeValidator = ConfigDef.LambdaValidator.with(
                     (String name, Object value) -> {
                         validateProps(prefix);
-                        getConfigDefFromConfigProvidingClass(typeConfig, (Class<?>) value);
+                        // The value will be null if the class couldn't be found; no point in trying to load a ConfigDef for it
+                        if (value != null) {
+                            getConfigDefFromConfigProvidingClass(typeConfig, (Class<?>) value);
+                        }

Review comment:
       This fixes an issue present in our [unit 
tests](https://github.com/apache/kafka/blob/0888953e0e834bf849751a36ae5adf92b20ec0a5/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java#L495)
 where multiple error messages are added for a single invalid 
transform/predicate property when the class cannot be found. It's best if a 
single error message is returned stating that the class cannot be found; the 
second message (something along the lines of "invalid value null for property 
...") did not add any value.
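
   A generic sketch of the failure mode (hypothetical names, not the actual Kafka code): when the class reference resolves to null, running the type validator anyway just produces a second, redundant error for the same property.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.BiConsumer;

   class NullGuardDemo {
       static List<String> validate(Class<?> resolved, BiConsumer<List<String>, Class<?>> typeValidator) {
           List<String> errors = new ArrayList<>();
           if (resolved == null) {
               // First (and only useful) error: the class couldn't be found.
               errors.add("Class could not be found");
           }
           // Without this guard, the validator would also report something like
           // "invalid value null", duplicating the failure for one bad property.
           if (resolved != null) {
               typeValidator.accept(errors, resolved);
           }
           return errors;
       }
   }
   ```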

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
##########
@@ -499,13 +516,15 @@ ConfigDef getConfigDefFromConfigProvidingClass(String key, Class<?> cls) {
                         aliasKind + " is abstract and cannot be created. Did you mean " + childClassNames + "?";
                 throw new ConfigException(key, String.valueOf(cls), message);
             }
-            T transformation;
+            T plugin;
             try {
-                transformation = Utils.newInstance(cls, baseClass);
+                plugin = Utils.newInstance(cls, baseClass);
             } catch (Exception e) {
-                throw new ConfigException(key, String.valueOf(cls), "Error getting config definition from " + baseClass.getSimpleName() + ": " + e.getMessage());
+                // Log the entire exception here in order to provide a stack trace that can be useful for debugging classloading issues
+                log.error("Failed to instantiate {} '{}'", baseClass.getSimpleName(), cls, e);

Review comment:
       Added this error message based on discussion of a similar error-handling 
situation 
[here](https://github.com/apache/kafka/pull/11349#discussion_r715452885).

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
##########
@@ -499,13 +516,15 @@ ConfigDef getConfigDefFromConfigProvidingClass(String key, Class<?> cls) {
                         aliasKind + " is abstract and cannot be created. Did you mean " + childClassNames + "?";
                 throw new ConfigException(key, String.valueOf(cls), message);
             }
-            T transformation;
Review comment:
       Some basic cleanup to highlight the fact that this class is now agnostic with regard to plugin type and no longer specific to SMTs.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java
##########
@@ -70,60 +73,105 @@
             "keys, all error context header keys will start with <code>__connect.errors.</code>";
     private static final String DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY = "Enable Error Context Headers";
 
-    static ConfigDef config = ConnectorConfig.configDef()
-        .define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY)
-        .define(TOPICS_REGEX_CONFIG, ConfigDef.Type.STRING, TOPICS_REGEX_DEFAULT, new RegexValidator(), ConfigDef.Importance.HIGH, TOPICS_REGEX_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_REGEX_DISPLAY)
-        .define(DLQ_TOPIC_NAME_CONFIG, ConfigDef.Type.STRING, DLQ_TOPIC_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_NAME_DOC, ERROR_GROUP, 6, ConfigDef.Width.MEDIUM, DLQ_TOPIC_DISPLAY)
-        .define(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC, ERROR_GROUP, 7, ConfigDef.Width.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY)
-        .define(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, ConfigDef.Type.BOOLEAN, DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT, Importance.MEDIUM, DLQ_CONTEXT_HEADERS_ENABLE_DOC, ERROR_GROUP, 8, ConfigDef.Width.MEDIUM, DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY);
-
     public static ConfigDef configDef() {
-        return config;
+        return ConnectorConfig.configDef()
+            .define(TOPICS_CONFIG, ConfigDef.Type.LIST, TOPICS_DEFAULT, ConfigDef.Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_DISPLAY)
+            .define(TOPICS_REGEX_CONFIG, ConfigDef.Type.STRING, TOPICS_REGEX_DEFAULT, new RegexValidator(), ConfigDef.Importance.HIGH, TOPICS_REGEX_DOC, COMMON_GROUP, 4, ConfigDef.Width.LONG, TOPICS_REGEX_DISPLAY)
+            .define(DLQ_TOPIC_NAME_CONFIG, ConfigDef.Type.STRING, DLQ_TOPIC_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_NAME_DOC, ERROR_GROUP, 6, ConfigDef.Width.MEDIUM, DLQ_TOPIC_DISPLAY)
+            .define(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, ConfigDef.Type.SHORT, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DEFAULT, Importance.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DOC, ERROR_GROUP, 7, ConfigDef.Width.MEDIUM, DLQ_TOPIC_REPLICATION_FACTOR_CONFIG_DISPLAY)
+            .define(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, ConfigDef.Type.BOOLEAN, DLQ_CONTEXT_HEADERS_ENABLE_DEFAULT, Importance.MEDIUM, DLQ_CONTEXT_HEADERS_ENABLE_DOC, ERROR_GROUP, 8, ConfigDef.Width.MEDIUM, DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY);

Review comment:
       It's safer to return a new `ConfigDef` object every time in order to avoid issues like [KAFKA-9950](https://issues.apache.org/jira/browse/KAFKA-9950), which can be caused by reusing a `ConfigDef` that is then accidentally mutated by some downstream logic.
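
   The hazard can be sketched with a stand-in class (hypothetical, not the real `ConfigDef`): a shared static instance leaks mutations to every later caller, while a fresh instance per call keeps them isolated.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   class SharedDefDemo {
       // Stand-in for ConfigDef: a mutable builder-style object.
       static class Def {
           final List<String> keys = new ArrayList<>();
           Def define(String key) { keys.add(key); return this; }
       }

       // Anti-pattern: every caller receives the same shared instance.
       private static final Def SHARED = new Def().define("topics");
       static Def sharedDef() { return SHARED; }

       // Safer: build a fresh instance per call, so downstream mutation is isolated.
       static Def freshDef() { return new Def().define("topics"); }
   }
   ```

   A caller that calls `define` on the shared instance pollutes it for everyone else; the same call on a fresh instance affects only that copy.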

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
##########
@@ -114,8 +114,8 @@ public void start(Map<String, String> props) {
             taskId = props.get("task.id");
             connectorName = props.get("connector.name");
             topicName = props.getOrDefault(TOPIC_CONFIG, "sequential-topic");
-            throughput = Long.valueOf(props.getOrDefault("throughput", "-1"));
-            batchSize = Integer.valueOf(props.getOrDefault("messages.per.poll", "1"));
+            throughput = Long.parseLong(props.getOrDefault("throughput", "-1"));
+            batchSize = Integer.parseInt(props.getOrDefault("messages.per.poll", "1"));

Review comment:
       My IDE was nagging me about unnecessary boxing.
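
   The distinction, as a standalone sketch: `Long.valueOf` returns a boxed `Long` that is immediately auto-unboxed when assigned to a primitive, while `Long.parseLong` yields the primitive directly, with no intermediate object.

   ```java
   class ParseDemo {
       // Boxed path: valueOf creates a Long object, then auto-unboxing extracts the primitive.
       static long parseBoxed(String s) { return Long.valueOf(s); }

       // Primitive path: parseLong returns a long directly, no boxing involved.
       static long parsePrimitive(String s) { return Long.parseLong(s); }
   }
   ```

   Both produce the same value; the second just skips the unnecessary allocation.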

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java
##########
@@ -133,7 +133,7 @@ private void assertFailCreateConnector(String policy, Map<String, String> props)
             connect.configureConnector(CONNECTOR_NAME, props);
             fail("Shouldn't be able to create connector");
         } catch (ConnectRestException e) {
-            assertEquals(e.statusCode(), 400);
+            assertEquals(400, e.statusCode());

Review comment:
       The expected value should go first, as it's used in error messages that 
come up with test failures ("expected <first argument>, got <second argument>").
   
   This test did fail once locally for me; I believe it was just some flakiness 
induced by my laptop being overworked.
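
   A minimal stand-in for the failure-message formatting (not JUnit itself, but the real `assertEquals` builds its message the same way from its first and second arguments) shows why swapping the arguments produces a misleading message:

   ```java
   class AssertOrderDemo {
       // Mirrors the shape of JUnit's failure message: the first argument is
       // reported as "expected", the second as the actual value.
       static String failureMessage(Object expected, Object actual) {
           return "expected:<" + expected + "> but was:<" + actual + ">";
       }
   }
   ```

   With the arguments reversed, a failing `assertEquals(e.statusCode(), 400)` would claim the test "expected" whatever status the server actually returned.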



