gharris1727 commented on code in PR #17741:
URL: https://github.com/apache/kafka/pull/17741#discussion_r1868543459


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -661,25 +682,35 @@ ConfigInfos validateConnectorConfig(
             }
         }
         String connType = 
connectorProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        VersionRange connVersion = null;
+        try {
+            connVersion = 
PluginVersionUtils.connectorVersionRequirement(connectorProps.get(CONNECTOR_VERSION));
+        } catch (Exception e) {
+            throw new BadRequestException(e.getMessage());
+        }
+
         if (connType == null)
             throw new BadRequestException("Connector config " + connectorProps 
+ " contains no connector type");
 
-        Connector connector = getConnector(connType);
-        ClassLoader connectorLoader = plugins().connectorLoader(connType);
+        Connector connector = getConnector(connType, connVersion);
+        ClassLoader connectorLoader = connector.getClass().getClassLoader();

Review Comment:
   This gets the incorrect loader; see 
https://github.com/apache/kafka/pull/16984#discussion_r1868461401



##########
connect/runtime/src/main/java/org/apache/kafka/connect/util/PluginVersionUtils.java:
##########
@@ -0,0 +1,186 @@
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.isolation.PluginDesc;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
+import 
org.apache.maven.artifact.versioning.InvalidVersionSpecificationException;
+import org.apache.maven.artifact.versioning.VersionRange;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class PluginVersionUtils {
+
+    private static Plugins plugins = null;

Review Comment:
   You need to find a way to make this non-static; there are routinely multiple 
Plugins instances in use within the same JVM, which would immediately collide 
on this field.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
##########
@@ -199,33 +234,37 @@ public Object get(String key) {
         }
     }
 
+    public static ConfigDef BASE_CONFIGS = new ConfigDef()

Review Comment:
   Don't make this a static field; it leaves opportunities for it to be 
mutated accidentally.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/util/PluginVersionUtils.java:
##########
@@ -0,0 +1,186 @@
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.isolation.PluginDesc;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
+import 
org.apache.maven.artifact.versioning.InvalidVersionSpecificationException;
+import org.apache.maven.artifact.versioning.VersionRange;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class PluginVersionUtils {
+
+    private static Plugins plugins = null;
+
+    public static void setPlugins(Plugins plugins) {
+        PluginVersionUtils.plugins = plugins;
+    }
+
+    public static VersionRange connectorVersionRequirement(String version) 
throws InvalidVersionSpecificationException {
+        if (version == null || version.equals("latest")) {
+            return null;
+        }
+        version = version.trim();
+
+        // check first if the given version is valid
+        VersionRange.createFromVersionSpec(version);
+
+        // now if the version is not enclosed we consider it as a hard 
requirement and enclose it in []
+        if (!version.startsWith("[") && !version.startsWith("(")) {
+            version = "[" + version + "]";
+        }

Review Comment:
   I think a more robust check for this would be to verify that the 
recommendedVersion is non-null (or, I think equivalently, that hasRestrictions 
is true).



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java:
##########
@@ -411,13 +449,149 @@ protected ConfigDef config(Predicate<?> predicate) {
         return newDef;
     }
 
+    public static void updateDefaults(ConfigDef configDef, Plugins plugins, 
Map<String, String> connProps, Map<String, String> workerProps) {
+        updateAllConverterDefaults(configDef, plugins, connProps, workerProps);
+        updateConnectorVersionDefaults(configDef, plugins, 
connProps.get(CONNECTOR_CLASS_CONFIG));
+    }
+
+    public static void updateConnectorVersionDefaults(ConfigDef configDef, 
Plugins plugins, String connectorClass) {
+        // if provided connector version is null, the latest version is used
+        updateKeyDefault(configDef, ConnectorConfig.CONNECTOR_VERSION, 
plugins.latestVersion(connectorClass));
+    }
+
+    public static void updateAllConverterDefaults(ConfigDef configDef, Plugins 
plugins,
+                                                        Map<String, String> 
connProps, Map<String, String> workerProps) {
+        updateKeyConverterDefault(configDef, plugins, connProps, workerProps);
+        updateValueConverterDefault(configDef, plugins, connProps, 
workerProps);
+        updateHeaderConverterDefault(configDef, plugins, connProps, 
workerProps);
+    }
+
+    public static void updateKeyConverterDefault(ConfigDef configDef, Plugins 
plugins,
+                                                        Map<String, String> 
connProps, Map<String, String> workerProps) {
+        updateConverterDefaults(
+                configDef, plugins,
+                KEY_CONVERTER_CLASS_CONFIG, 
WorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
+                KEY_CONVERTER_VERSION_CONFIG, 
WorkerConfig.KEY_CONVERTER_VERSION, connProps, workerProps
+        );
+    }
+
+    public static void updateValueConverterDefault(ConfigDef configDef, 
Plugins plugins,
+                                                          Map<String, String> 
connProps, Map<String, String> workerProps) {
+        updateConverterDefaults(
+                configDef, plugins,
+                VALUE_CONVERTER_CLASS_CONFIG, 
WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG,
+                VALUE_CONVERTER_VERSION_CONFIG, 
WorkerConfig.VALUE_CONVERTER_VERSION, connProps, workerProps
+        );
+    }
+
+    public static void updateHeaderConverterDefault(ConfigDef configDef, 
Plugins plugins,
+                                                           Map<String, String> 
connProps, Map<String, String> workerProps) {
+        updateConverterDefaults(
+                configDef, plugins,
+                HEADER_CONVERTER_CLASS_CONFIG, 
WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
+                HEADER_CONVERTER_VERSION_CONFIG, 
WorkerConfig.HEADER_CONVERTER_VERSION, connProps, workerProps
+        );
+    }
+
+    private static void updateConverterDefaults(

Review Comment:
   I think this "updating defaults" pattern is messy.
   I would much rather have the ConnectorConfig ConfigDef depend on Plugins 
(like EnrichedConnectorConfig) and define the no-default and with-default 
variants separately.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -462,11 +459,26 @@ private <T> ConfigInfos validateConverterConfig(
         T pluginInstance;
         String stageDescription = "instantiating the connector's " + 
pluginName + " for validation";
         try (TemporaryStage stage = reportStage.apply(stageDescription)) {
-            pluginInstance = Utils.newInstance(pluginClass, pluginInterface);
+            VersionRange range = 
PluginVersionUtils.connectorVersionRequirement(pluginVersion);
+            // utils.newInstance is done when no version is provided to 
preserve older behaviour prior to multi-versioning support
+            // utils.newInstance will try and load the class from the current 
classloader (which is the connector classloader) before
+            // delegating plugin loading mechanism of the worker, while 
plugins().newPlugin will always use the worker's plugin loading mechanism
+            // in cases where the converter is packaged with the connector 
package and no version is provided , utils.newInstance
+            // will choose the converter from the connector, while 
plugins().newPlugin will choose the latest converter version found while plugin 
loading.
+            pluginInstance = range == null ? Utils.newInstance(pluginClass, 
pluginInterface): (T) plugins().newPlugin(pluginClass, range);

Review Comment:
   I think this detail is inappropriate in the AbstractHerder; the 
classloader-choosing logic, etc., is more appropriately handled in the Plugins 
class.
   
   You can make range == null request the CURRENT_CLASSLOADER for legacy 
compatibility.
   
   Also, I've been trying to eliminate Utils.newInstance in Connect for a 
while, so don't be afraid of removing it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to