PARADOXST commented on code in PR #16604:
URL: https://github.com/apache/kafka/pull/16604#discussion_r1697060180
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java:
##########
@@ -77,53 +79,59 @@ private static <T> String versionFor(Class<? extends T>
pluginKlass) throws Refl
@Override
protected PluginScanResult scanPlugins(PluginSource source) {
- ConfigurationBuilder builder = new ConfigurationBuilder();
- builder.setClassLoaders(new ClassLoader[]{source.loader()});
- builder.addUrls(source.urls());
- builder.setScanners(Scanners.SubTypes);
- builder.setParallel(true);
- Reflections reflections = new Reflections(builder);
-
- return new PluginScanResult(
- getPluginDesc(reflections, PluginType.SINK, source),
- getPluginDesc(reflections, PluginType.SOURCE, source),
- getPluginDesc(reflections, PluginType.CONVERTER, source),
- getPluginDesc(reflections, PluginType.HEADER_CONVERTER,
source),
- getTransformationPluginDesc(source, reflections),
- getPredicatePluginDesc(source, reflections),
- getServiceLoaderPluginDesc(PluginType.CONFIGPROVIDER, source),
- getServiceLoaderPluginDesc(PluginType.REST_EXTENSION, source),
-
getServiceLoaderPluginDesc(PluginType.CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY,
source)
- );
+ Set<URL> urls = new HashSet<>();
Review Comment:
Hmm, I am not too sure how to fix that, any suggestions?
##########
gradle/dependencies.gradle:
##########
@@ -174,8 +175,10 @@ libs += [
argparse4j: "net.sourceforge.argparse4j:argparse4j:$versions.argparse4j",
bcpkix: "org.bouncycastle:bcpkix-jdk18on:$versions.bcpkix",
caffeine: "com.github.ben-manes.caffeine:caffeine:$versions.caffeine",
+ classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
commonsCli: "commons-cli:commons-cli:$versions.commonsCli",
commonsValidator:
"commons-validator:commons-validator:$versions.commonsValidator",
+ guava: "com.google.guava:guava:$versions.guava",
Review Comment:
Removed. This seems to be from a prior testing.
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java:
##########
@@ -77,53 +79,59 @@ private static <T> String versionFor(Class<? extends T>
pluginKlass) throws Refl
@Override
protected PluginScanResult scanPlugins(PluginSource source) {
- ConfigurationBuilder builder = new ConfigurationBuilder();
- builder.setClassLoaders(new ClassLoader[]{source.loader()});
- builder.addUrls(source.urls());
- builder.setScanners(Scanners.SubTypes);
- builder.setParallel(true);
- Reflections reflections = new Reflections(builder);
-
- return new PluginScanResult(
- getPluginDesc(reflections, PluginType.SINK, source),
- getPluginDesc(reflections, PluginType.SOURCE, source),
- getPluginDesc(reflections, PluginType.CONVERTER, source),
- getPluginDesc(reflections, PluginType.HEADER_CONVERTER,
source),
- getTransformationPluginDesc(source, reflections),
- getPredicatePluginDesc(source, reflections),
- getServiceLoaderPluginDesc(PluginType.CONFIGPROVIDER, source),
- getServiceLoaderPluginDesc(PluginType.REST_EXTENSION, source),
-
getServiceLoaderPluginDesc(PluginType.CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY,
source)
- );
+ Set<URL> urls = new HashSet<>();
+ Collections.addAll(urls, source.urls());
+ ClassGraph classGraphBuilder = new ClassGraph()
+ .addClassLoader(source.loader())
+ .filterClasspathElementsByURL(urls::contains)
+ .enableExternalClasses()
+ .enableAllInfo();
+ try (ScanResult classGraph = classGraphBuilder.scan()) {
+ return new PluginScanResult(
+ getPluginDesc(classGraph, PluginType.SINK, source),
+ getPluginDesc(classGraph, PluginType.SOURCE, source),
+ getPluginDesc(classGraph, PluginType.CONVERTER, source),
+ getPluginDesc(classGraph, PluginType.HEADER_CONVERTER,
source),
+ getTransformationPluginDesc(source, classGraph),
+ getPredicatePluginDesc(source, classGraph),
+ getServiceLoaderPluginDesc(PluginType.CONFIGPROVIDER,
source),
+ getServiceLoaderPluginDesc(PluginType.REST_EXTENSION,
source),
+
getServiceLoaderPluginDesc(PluginType.CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY,
source)
+ );
+ }
}
@SuppressWarnings({"unchecked"})
- private SortedSet<PluginDesc<Predicate<?>>>
getPredicatePluginDesc(PluginSource source, Reflections reflections) {
- return (SortedSet<PluginDesc<Predicate<?>>>) (SortedSet<?>)
getPluginDesc(reflections, PluginType.PREDICATE, source);
+ private SortedSet<PluginDesc<Predicate<?>>>
getPredicatePluginDesc(PluginSource source, ScanResult classGraph) {
+ return (SortedSet<PluginDesc<Predicate<?>>>) (SortedSet<?>)
getPluginDesc(classGraph, PluginType.PREDICATE, source);
}
@SuppressWarnings({"unchecked"})
- private SortedSet<PluginDesc<Transformation<?>>>
getTransformationPluginDesc(PluginSource source, Reflections reflections) {
- return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>)
getPluginDesc(reflections, PluginType.TRANSFORMATION, source);
+ private SortedSet<PluginDesc<Transformation<?>>>
getTransformationPluginDesc(PluginSource source, ScanResult classGraph) {
+ return (SortedSet<PluginDesc<Transformation<?>>>) (SortedSet<?>)
getPluginDesc(classGraph, PluginType.TRANSFORMATION, source);
}
@SuppressWarnings({"unchecked"})
private <T> SortedSet<PluginDesc<T>> getPluginDesc(
- Reflections reflections,
+ ScanResult classGraph,
PluginType type,
PluginSource source
) {
- Set<Class<? extends T>> plugins;
+ ClassInfoList plugins;
+ Class<T> kclass = (Class<T>) type.superClass();
try {
- plugins = reflections.getSubTypesOf((Class<T>) type.superClass());
- } catch (ReflectionsException e) {
+ plugins = classGraph.getSubclasses(kclass.getName());
+ if (plugins.isEmpty()) {
Review Comment:
Done.
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java:
##########
@@ -77,53 +79,59 @@ private static <T> String versionFor(Class<? extends T>
pluginKlass) throws Refl
@Override
protected PluginScanResult scanPlugins(PluginSource source) {
- ConfigurationBuilder builder = new ConfigurationBuilder();
- builder.setClassLoaders(new ClassLoader[]{source.loader()});
- builder.addUrls(source.urls());
- builder.setScanners(Scanners.SubTypes);
- builder.setParallel(true);
- Reflections reflections = new Reflections(builder);
-
- return new PluginScanResult(
- getPluginDesc(reflections, PluginType.SINK, source),
- getPluginDesc(reflections, PluginType.SOURCE, source),
- getPluginDesc(reflections, PluginType.CONVERTER, source),
- getPluginDesc(reflections, PluginType.HEADER_CONVERTER,
source),
- getTransformationPluginDesc(source, reflections),
- getPredicatePluginDesc(source, reflections),
- getServiceLoaderPluginDesc(PluginType.CONFIGPROVIDER, source),
- getServiceLoaderPluginDesc(PluginType.REST_EXTENSION, source),
-
getServiceLoaderPluginDesc(PluginType.CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY,
source)
- );
+ Set<URL> urls = new HashSet<>();
+ Collections.addAll(urls, source.urls());
+ ClassGraph classGraphBuilder = new ClassGraph()
+ .addClassLoader(source.loader())
+ .filterClasspathElementsByURL(urls::contains)
Review Comment:
From my reading on [this
function](https://github.com/classgraph/classgraph/blob/8bfaa5776f22ca45ff8cc1c552d55510e754b2dd/src/main/java/io/github/classgraph/ClassGraph.java#L581)
it seems to only scan the classes matching the URLs rather than filter them
after the effect.
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java:
##########
@@ -453,4 +457,58 @@ private static class DirectoryEntry {
}
}
+ private static Collection<URL> forJavaClassPath() {
+ Collection<URL> urls = new ArrayList<URL>();
+ String javaClassPath = System.getProperty("java.class.path");
+ if (javaClassPath != null) {
+ for (String path : javaClassPath.split(File.pathSeparator)) {
+ try {
+ urls.add(new File(path).toURI().toURL());
+ } catch (Exception e) {
+ log.debug("Could not get URL", e);
+ }
+ }
+ }
+ return distinctUrls(urls);
+ }
+
+ private static Collection<URL> forClassLoader(ClassLoader... classLoaders)
{
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]