gharris1727 commented on code in PR #16604:
URL: https://github.com/apache/kafka/pull/16604#discussion_r1696102323


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java:
##########
@@ -453,4 +457,58 @@ private static class DirectoryEntry {
         }
     }
 
+    private static Collection<URL> forJavaClassPath() {
+        Collection<URL> urls = new ArrayList<URL>();
+        String javaClassPath = System.getProperty("java.class.path");
+        if (javaClassPath != null) {
+            for (String path : javaClassPath.split(File.pathSeparator)) {
+                try {
+                    urls.add(new File(path).toURI().toURL());
+                } catch (Exception e) {
+                    log.debug("Could not get URL", e);
+                }
+            }
+        }
+        return distinctUrls(urls);
+    }
+    
+    private static Collection<URL> forClassLoader(ClassLoader... classLoaders) 
{

Review Comment:
   nit: Could you simplify this logic to be non-variadic, since we only ever 
call it with one classLoader?



##########
gradle/dependencies.gradle:
##########
@@ -174,8 +175,10 @@ libs += [
   argparse4j: "net.sourceforge.argparse4j:argparse4j:$versions.argparse4j",
   bcpkix: "org.bouncycastle:bcpkix-jdk18on:$versions.bcpkix",
   caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
+  classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
   commonsCli: "commons-cli:commons-cli:$versions.commonsCli",
   commonsValidator: 
"commons-validator:commons-validator:$versions.commonsValidator",
+  guava: "com.google.guava:guava:$versions.guava",

Review Comment:
   What is the guava dependency for? I don't see any relevant imports, and 
classgraph doesn't appear to depend on it.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java:
##########
@@ -453,4 +457,58 @@ private static class DirectoryEntry {
         }
     }
 
+    private static Collection<URL> forJavaClassPath() {
+        Collection<URL> urls = new ArrayList<URL>();

Review Comment:
   nit: The second `<URL>` is not required and can be replaced with the diamond operator `<>`.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java:
##########
@@ -77,53 +79,59 @@ private static <T> String versionFor(Class<? extends T> 
pluginKlass) throws Refl
 
     @Override
     protected PluginScanResult scanPlugins(PluginSource source) {
-        ConfigurationBuilder builder = new ConfigurationBuilder();
-        builder.setClassLoaders(new ClassLoader[]{source.loader()});
-        builder.addUrls(source.urls());
-        builder.setScanners(Scanners.SubTypes);
-        builder.setParallel(true);
-        Reflections reflections = new Reflections(builder);
-
-        return new PluginScanResult(
-                getPluginDesc(reflections, PluginType.SINK, source),
-                getPluginDesc(reflections, PluginType.SOURCE, source),
-                getPluginDesc(reflections, PluginType.CONVERTER, source),
-                getPluginDesc(reflections, PluginType.HEADER_CONVERTER, 
source),
-                getTransformationPluginDesc(source, reflections),
-                getPredicatePluginDesc(source, reflections),
-                getServiceLoaderPluginDesc(PluginType.CONFIGPROVIDER, source),
-                getServiceLoaderPluginDesc(PluginType.REST_EXTENSION, source),
-                
getServiceLoaderPluginDesc(PluginType.CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY, 
source)
-        );
+        Set<URL> urls = new HashSet<>();

Review Comment:
   My IDE warns me that `Set<URL>` may make network calls when equality is 
checked, because `URL.equals()` and `URL.hashCode()` can resolve host names.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java:
##########
@@ -77,53 +79,59 @@ private static <T> String versionFor(Class<? extends T> 
pluginKlass) throws Refl
 
     @Override
     protected PluginScanResult scanPlugins(PluginSource source) {
-        ConfigurationBuilder builder = new ConfigurationBuilder();
-        builder.setClassLoaders(new ClassLoader[]{source.loader()});
-        builder.addUrls(source.urls());
-        builder.setScanners(Scanners.SubTypes);
-        builder.setParallel(true);
-        Reflections reflections = new Reflections(builder);
-
-        return new PluginScanResult(
-                getPluginDesc(reflections, PluginType.SINK, source),
-                getPluginDesc(reflections, PluginType.SOURCE, source),
-                getPluginDesc(reflections, PluginType.CONVERTER, source),
-                getPluginDesc(reflections, PluginType.HEADER_CONVERTER, 
source),
-                getTransformationPluginDesc(source, reflections),
-                getPredicatePluginDesc(source, reflections),
-                getServiceLoaderPluginDesc(PluginType.CONFIGPROVIDER, source),
-                getServiceLoaderPluginDesc(PluginType.REST_EXTENSION, source),
-                
getServiceLoaderPluginDesc(PluginType.CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY, 
source)
-        );
+        Set<URL> urls = new HashSet<>();
+        Collections.addAll(urls, source.urls());
+        ClassGraph classGraphBuilder = new ClassGraph()
+                .addClassLoader(source.loader())
+                .filterClasspathElementsByURL(urls::contains)
+                .enableExternalClasses()
+                .enableAllInfo();
+        try (ScanResult classGraph = classGraphBuilder.scan()) {
+            return new PluginScanResult(
+                  getPluginDesc(classGraph, PluginType.SINK, source),
+                  getPluginDesc(classGraph, PluginType.SOURCE, source),
+                  getPluginDesc(classGraph, PluginType.CONVERTER, source),
+                  getPluginDesc(classGraph, PluginType.HEADER_CONVERTER, 
source),
+                  getTransformationPluginDesc(source, classGraph),
+                  getPredicatePluginDesc(source, classGraph),
+                  getServiceLoaderPluginDesc(PluginType.CONFIGPROVIDER, 
source),
+                  getServiceLoaderPluginDesc(PluginType.REST_EXTENSION, 
source),
+                  
getServiceLoaderPluginDesc(PluginType.CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY, 
source)
+          );
+        }
     }
 
     @SuppressWarnings({"unchecked"})
-    private SortedSet<PluginDesc<Predicate<?>>> 
getPredicatePluginDesc(PluginSource source, Reflections reflections) {
-        return (SortedSet<PluginDesc<Predicate<?>>>) (SortedSet<?>) 
getPluginDesc(reflections, PluginType.PREDICATE, source);
+    private SortedSet<PluginDesc<Predicate<?>>> 
getPredicatePluginDesc(PluginSource source, ScanResult classGraph) {
+        return (SortedSet<PluginDesc<Predicate<?>>>) (SortedSet<?>) 
getPluginDesc(classGraph, PluginType.PREDICATE, source);
     }
 
     @SuppressWarnings({"unchecked"})
-    private SortedSet<PluginDesc<Transformation<?>>> 
getTransformationPluginDesc(PluginSource source, Reflections reflections) {
-        return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>) 
getPluginDesc(reflections, PluginType.TRANSFORMATION, source);
+    private SortedSet<PluginDesc<Transformation<?>>> 
getTransformationPluginDesc(PluginSource source, ScanResult classGraph) {
+        return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>) 
getPluginDesc(classGraph, PluginType.TRANSFORMATION, source);
     }
 
     @SuppressWarnings({"unchecked"})
     private <T> SortedSet<PluginDesc<T>> getPluginDesc(
-            Reflections reflections,
+            ScanResult classGraph,
             PluginType type,
             PluginSource source
     ) {
-        Set<Class<? extends T>> plugins;
+        ClassInfoList plugins;
+        Class<T> kclass = (Class<T>) type.superClass();
         try {
-            plugins = reflections.getSubTypesOf((Class<T>) type.superClass());
-        } catch (ReflectionsException e) {
+            plugins = classGraph.getSubclasses(kclass.getName());
+            if (plugins.isEmpty()) {

Review Comment:
   nit: You could check `kclass.isInterface()` up front rather than calling the 
wrong method first.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java:
##########
@@ -77,53 +79,59 @@ private static <T> String versionFor(Class<? extends T> 
pluginKlass) throws Refl
 
     @Override
     protected PluginScanResult scanPlugins(PluginSource source) {
-        ConfigurationBuilder builder = new ConfigurationBuilder();
-        builder.setClassLoaders(new ClassLoader[]{source.loader()});
-        builder.addUrls(source.urls());
-        builder.setScanners(Scanners.SubTypes);
-        builder.setParallel(true);
-        Reflections reflections = new Reflections(builder);
-
-        return new PluginScanResult(
-                getPluginDesc(reflections, PluginType.SINK, source),
-                getPluginDesc(reflections, PluginType.SOURCE, source),
-                getPluginDesc(reflections, PluginType.CONVERTER, source),
-                getPluginDesc(reflections, PluginType.HEADER_CONVERTER, 
source),
-                getTransformationPluginDesc(source, reflections),
-                getPredicatePluginDesc(source, reflections),
-                getServiceLoaderPluginDesc(PluginType.CONFIGPROVIDER, source),
-                getServiceLoaderPluginDesc(PluginType.REST_EXTENSION, source),
-                
getServiceLoaderPluginDesc(PluginType.CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY, 
source)
-        );
+        Set<URL> urls = new HashSet<>();
+        Collections.addAll(urls, source.urls());
+        ClassGraph classGraphBuilder = new ClassGraph()
+                .addClassLoader(source.loader())
+                .filterClasspathElementsByURL(urls::contains)

Review Comment:
   Is ClassGraph scanning all of the classes in the parent/classpath loader 
too, requiring this filter? Is there a way to avoid that? Otherwise, classpath 
scanning would happen N times, once for each plugin location.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to