C0urante commented on code in PR #14309:
URL: https://github.com/apache/kafka/pull/14309#discussion_r1586591136


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -392,6 +399,146 @@ protected Map<String, ConfigValue> 
validateSourceConnectorConfig(SourceConnector
         return configDef.validateAll(config);
     }
 
+    /**
+     * General-purpose validation logic for converters that are configured 
directly
+     * in a connector config (as opposed to inherited from the worker config).
+     * @param connectorConfig the configuration for the connector; may not be 
null
+     * @param pluginConfigValue the {@link ConfigValue} for the converter 
property in the connector config;
+     *                          may be null, in which case no validation will 
be performed under the assumption that the
+     *                          connector will use inherit the converter 
settings from the worker
+     * @param pluginInterface the interface for the plugin type
+     *                        (e.g., {@code 
org.apache.kafka.connect.storage.Converter.class});
+     *                        may not be null
+     * @param configDefAccessor an accessor that can be used to retrieve a 
{@link ConfigDef}
+     *                          from an instance of the plugin type (e.g., 
{@code Converter::config});
+     *                          may not be null
+     * @param pluginName a lowercase, human-readable name for the type of 
plugin (e.g., {@code "key converter"});
+     *                   may not be null
+     * @param pluginProperty the property used to define a custom class for 
the plugin type
+     *                       in a connector config (e.g., {@link 
ConnectorConfig#KEY_CONVERTER_CLASS_CONFIG});
+     *                       may not be null
+     * @param defaultProperties any default properties to include in the 
configuration that will be used for
+     *                          the plugin; may be null
+
+     * @return a {@link ConfigInfos} object containing validation results for 
the plugin in the connector config,
+     * or null if no custom validation was performed (possibly because no 
custom plugin was defined in the connector
+     * config)
+
+     * @param <T> the plugin class to perform validation for
+     */
+    private <T> ConfigInfos validateConverterConfig(
+            Map<String, String> connectorConfig,
+            ConfigValue pluginConfigValue,
+            Class<T> pluginInterface,
+            Function<T, ConfigDef> configDefAccessor,
+            String pluginName,
+            String pluginProperty,
+            Map<String, String> defaultProperties
+    ) {
+        Objects.requireNonNull(connectorConfig);
+        Objects.requireNonNull(pluginInterface);
+        Objects.requireNonNull(configDefAccessor);
+        Objects.requireNonNull(pluginName);
+        Objects.requireNonNull(pluginProperty);
+
+        String pluginClass = connectorConfig.get(pluginProperty);
+
+        if (pluginClass == null
+                || pluginConfigValue == null
+                || !pluginConfigValue.errorMessages().isEmpty()
+        ) {
+            // Either no custom converter was specified, or one was specified 
but there's a problem with it.
+            // No need to proceed any further.
+            return null;
+        }
+
+        T pluginInstance;
+        try {
+            pluginInstance = Utils.newInstance(pluginClass, pluginInterface);

Review Comment:
   I think this is actually correct. All calls to `validateConverterConfig` 
take place within a `LoaderSwap` that causes the connector's classloader to be 
used, which unless I'm mistaken matches the behavior when instantiating tasks 
(loader swap 
[here](https://github.com/apache/kafka/blob/4825c89d14e5f1b2da7e1f48dac97888602028d7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L654),
 converter instantiation 
[here](https://github.com/apache/kafka/blob/4825c89d14e5f1b2da7e1f48dac97888602028d7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L666-L674)).
 It's true that `Plugins::newConverter` and `Plugins::newHeaderConverter` are 
used instead of `Utils::newInstance` when starting tasks, but when invoking the 
`Plugins` methods with `classLoaderUsage` set to `CURRENT_CLASSLOADER`, no 
classloader swapping takes place, so the connector loader is still used.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -392,6 +399,146 @@ protected Map<String, ConfigValue> 
validateSourceConnectorConfig(SourceConnector
         return configDef.validateAll(config);
     }
 
+    /**
+     * General-purpose validation logic for converters that are configured 
directly
+     * in a connector config (as opposed to inherited from the worker config).
+     * @param connectorConfig the configuration for the connector; may not be 
null
+     * @param pluginConfigValue the {@link ConfigValue} for the converter 
property in the connector config;
+     *                          may be null, in which case no validation will 
be performed under the assumption that the
+     *                          connector will use inherit the converter 
settings from the worker
+     * @param pluginInterface the interface for the plugin type
+     *                        (e.g., {@code 
org.apache.kafka.connect.storage.Converter.class});
+     *                        may not be null
+     * @param configDefAccessor an accessor that can be used to retrieve a 
{@link ConfigDef}
+     *                          from an instance of the plugin type (e.g., 
{@code Converter::config});
+     *                          may not be null
+     * @param pluginName a lowercase, human-readable name for the type of 
plugin (e.g., {@code "key converter"});
+     *                   may not be null
+     * @param pluginProperty the property used to define a custom class for 
the plugin type
+     *                       in a connector config (e.g., {@link 
ConnectorConfig#KEY_CONVERTER_CLASS_CONFIG});
+     *                       may not be null
+     * @param defaultProperties any default properties to include in the 
configuration that will be used for
+     *                          the plugin; may be null
+
+     * @return a {@link ConfigInfos} object containing validation results for 
the plugin in the connector config,
+     * or null if no custom validation was performed (possibly because no 
custom plugin was defined in the connector
+     * config)
+
+     * @param <T> the plugin class to perform validation for
+     */
+    private <T> ConfigInfos validateConverterConfig(
+            Map<String, String> connectorConfig,
+            ConfigValue pluginConfigValue,
+            Class<T> pluginInterface,
+            Function<T, ConfigDef> configDefAccessor,
+            String pluginName,
+            String pluginProperty,
+            Map<String, String> defaultProperties
+    ) {
+        Objects.requireNonNull(connectorConfig);
+        Objects.requireNonNull(pluginInterface);
+        Objects.requireNonNull(configDefAccessor);
+        Objects.requireNonNull(pluginName);
+        Objects.requireNonNull(pluginProperty);
+
+        String pluginClass = connectorConfig.get(pluginProperty);
+
+        if (pluginClass == null
+                || pluginConfigValue == null
+                || !pluginConfigValue.errorMessages().isEmpty()
+        ) {
+            // Either no custom converter was specified, or one was specified 
but there's a problem with it.
+            // No need to proceed any further.
+            return null;
+        }
+
+        T pluginInstance;
+        try {
+            pluginInstance = Utils.newInstance(pluginClass, pluginInterface);
+        } catch (ClassNotFoundException | RuntimeException e) {
+            log.error("Failed to instantiate {} class {}; this should have 
been caught by prior validation logic", pluginName, pluginClass, e);
+            pluginConfigValue.addErrorMessage("Failed to load class " + 
pluginClass + (e.getMessage() != null ? ": " + e.getMessage() : ""));

Review Comment:
   Ack, added a note to the Javadoc.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -392,6 +399,146 @@ protected Map<String, ConfigValue> 
validateSourceConnectorConfig(SourceConnector
         return configDef.validateAll(config);
     }
 
+    /**
+     * General-purpose validation logic for converters that are configured 
directly
+     * in a connector config (as opposed to inherited from the worker config).
+     * @param connectorConfig the configuration for the connector; may not be 
null
+     * @param pluginConfigValue the {@link ConfigValue} for the converter 
property in the connector config;
+     *                          may be null, in which case no validation will 
be performed under the assumption that the
+     *                          connector will use inherit the converter 
settings from the worker
+     * @param pluginInterface the interface for the plugin type
+     *                        (e.g., {@code 
org.apache.kafka.connect.storage.Converter.class});
+     *                        may not be null
+     * @param configDefAccessor an accessor that can be used to retrieve a 
{@link ConfigDef}
+     *                          from an instance of the plugin type (e.g., 
{@code Converter::config});
+     *                          may not be null
+     * @param pluginName a lowercase, human-readable name for the type of 
plugin (e.g., {@code "key converter"});
+     *                   may not be null
+     * @param pluginProperty the property used to define a custom class for 
the plugin type
+     *                       in a connector config (e.g., {@link 
ConnectorConfig#KEY_CONVERTER_CLASS_CONFIG});
+     *                       may not be null
+     * @param defaultProperties any default properties to include in the 
configuration that will be used for
+     *                          the plugin; may be null
+
+     * @return a {@link ConfigInfos} object containing validation results for 
the plugin in the connector config,
+     * or null if no custom validation was performed (possibly because no 
custom plugin was defined in the connector
+     * config)
+
+     * @param <T> the plugin class to perform validation for
+     */
+    private <T> ConfigInfos validateConverterConfig(
+            Map<String, String> connectorConfig,
+            ConfigValue pluginConfigValue,
+            Class<T> pluginInterface,
+            Function<T, ConfigDef> configDefAccessor,
+            String pluginName,
+            String pluginProperty,
+            Map<String, String> defaultProperties
+    ) {
+        Objects.requireNonNull(connectorConfig);
+        Objects.requireNonNull(pluginInterface);
+        Objects.requireNonNull(configDefAccessor);
+        Objects.requireNonNull(pluginName);
+        Objects.requireNonNull(pluginProperty);
+
+        String pluginClass = connectorConfig.get(pluginProperty);
+
+        if (pluginClass == null
+                || pluginConfigValue == null
+                || !pluginConfigValue.errorMessages().isEmpty()
+        ) {
+            // Either no custom converter was specified, or one was specified 
but there's a problem with it.
+            // No need to proceed any further.
+            return null;
+        }
+
+        T pluginInstance;
+        try {
+            pluginInstance = Utils.newInstance(pluginClass, pluginInterface);
+        } catch (ClassNotFoundException | RuntimeException e) {
+            log.error("Failed to instantiate {} class {}; this should have 
been caught by prior validation logic", pluginName, pluginClass, e);
+            pluginConfigValue.addErrorMessage("Failed to load class " + 
pluginClass + (e.getMessage() != null ? ": " + e.getMessage() : ""));
+            return null;
+        }
+
+        try {
+            ConfigDef configDef;
+            try {
+                configDef = configDefAccessor.apply(pluginInstance);
+            } catch (RuntimeException e) {
+                log.error("Failed to load ConfigDef from {} of type {}", 
pluginName, pluginClass, e);
+                pluginConfigValue.addErrorMessage("Failed to load ConfigDef 
from " + pluginName + (e.getMessage() != null ? ": " + e.getMessage() : ""));
+                return null;
+            }
+            if (configDef == null) {
+                log.warn("{}.config() has returned a null ConfigDef; no 
further preflight config validation for this converter will be performed", 
pluginClass);
+                // Older versions of Connect didn't do any converter 
validation.
+                // Even though converters are technically required to return a 
non-null ConfigDef object from their config() method,
+                // we permit this case in order to avoid breaking existing 
converters that, despite not adhering to this requirement,
+                // can be used successfully with a connector.
+                return null;
+            }
+            final String pluginPrefix = pluginProperty + ".";
+            Map<String, String> pluginConfig = 
connectorConfig.entrySet().stream()

Review Comment:
   Good call, done 👍



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -392,6 +399,146 @@ protected Map<String, ConfigValue> 
validateSourceConnectorConfig(SourceConnector
         return configDef.validateAll(config);
     }
 
+    /**
+     * General-purpose validation logic for converters that are configured 
directly
+     * in a connector config (as opposed to inherited from the worker config).
+     * @param connectorConfig the configuration for the connector; may not be 
null
+     * @param pluginConfigValue the {@link ConfigValue} for the converter 
property in the connector config;
+     *                          may be null, in which case no validation will 
be performed under the assumption that the
+     *                          connector will use inherit the converter 
settings from the worker
+     * @param pluginInterface the interface for the plugin type
+     *                        (e.g., {@code 
org.apache.kafka.connect.storage.Converter.class});
+     *                        may not be null
+     * @param configDefAccessor an accessor that can be used to retrieve a 
{@link ConfigDef}
+     *                          from an instance of the plugin type (e.g., 
{@code Converter::config});
+     *                          may not be null
+     * @param pluginName a lowercase, human-readable name for the type of 
plugin (e.g., {@code "key converter"});
+     *                   may not be null
+     * @param pluginProperty the property used to define a custom class for 
the plugin type
+     *                       in a connector config (e.g., {@link 
ConnectorConfig#KEY_CONVERTER_CLASS_CONFIG});
+     *                       may not be null
+     * @param defaultProperties any default properties to include in the 
configuration that will be used for
+     *                          the plugin; may be null
+
+     * @return a {@link ConfigInfos} object containing validation results for 
the plugin in the connector config,
+     * or null if no custom validation was performed (possibly because no 
custom plugin was defined in the connector
+     * config)
+
+     * @param <T> the plugin class to perform validation for
+     */
+    private <T> ConfigInfos validateConverterConfig(
+            Map<String, String> connectorConfig,
+            ConfigValue pluginConfigValue,
+            Class<T> pluginInterface,
+            Function<T, ConfigDef> configDefAccessor,
+            String pluginName,
+            String pluginProperty,
+            Map<String, String> defaultProperties
+    ) {
+        Objects.requireNonNull(connectorConfig);
+        Objects.requireNonNull(pluginInterface);
+        Objects.requireNonNull(configDefAccessor);
+        Objects.requireNonNull(pluginName);
+        Objects.requireNonNull(pluginProperty);
+
+        String pluginClass = connectorConfig.get(pluginProperty);
+
+        if (pluginClass == null
+                || pluginConfigValue == null
+                || !pluginConfigValue.errorMessages().isEmpty()
+        ) {
+            // Either no custom converter was specified, or one was specified 
but there's a problem with it.
+            // No need to proceed any further.
+            return null;
+        }
+
+        T pluginInstance;
+        try {
+            pluginInstance = Utils.newInstance(pluginClass, pluginInterface);
+        } catch (ClassNotFoundException | RuntimeException e) {
+            log.error("Failed to instantiate {} class {}; this should have 
been caught by prior validation logic", pluginName, pluginClass, e);
+            pluginConfigValue.addErrorMessage("Failed to load class " + 
pluginClass + (e.getMessage() != null ? ": " + e.getMessage() : ""));
+            return null;
+        }
+
+        try {
+            ConfigDef configDef;
+            try {
+                configDef = configDefAccessor.apply(pluginInstance);
+            } catch (RuntimeException e) {
+                log.error("Failed to load ConfigDef from {} of type {}", 
pluginName, pluginClass, e);
+                pluginConfigValue.addErrorMessage("Failed to load ConfigDef 
from " + pluginName + (e.getMessage() != null ? ": " + e.getMessage() : ""));
+                return null;
+            }
+            if (configDef == null) {
+                log.warn("{}.config() has returned a null ConfigDef; no 
further preflight config validation for this converter will be performed", 
pluginClass);
+                // Older versions of Connect didn't do any converter 
validation.
+                // Even though converters are technically required to return a 
non-null ConfigDef object from their config() method,
+                // we permit this case in order to avoid breaking existing 
converters that, despite not adhering to this requirement,
+                // can be used successfully with a connector.
+                return null;
+            }
+            final String pluginPrefix = pluginProperty + ".";
+            Map<String, String> pluginConfig = 
connectorConfig.entrySet().stream()
+                    .filter(e -> e.getKey().startsWith(pluginPrefix))
+                    .collect(Collectors.toMap(
+                            e -> e.getKey().substring(pluginPrefix.length()),
+                            Map.Entry::getValue
+                    ));
+            if (defaultProperties != null)
+                defaultProperties.forEach(pluginConfig::putIfAbsent);
+
+            List<ConfigValue> configValues;
+            try {
+                configValues = configDef.validate(pluginConfig);
+            } catch (RuntimeException e) {

Review Comment:
   I don't understand this suggestion. When could a checked exception be thrown?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to