This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3af1396  KAFKA-6503: Parallelize plugin scanning
3af1396 is described below

commit 3af13967db089b9a8320b539f5d5d218488ce467
Author: Robert Yokota <rayok...@gmail.com>
AuthorDate: Wed Feb 14 16:24:05 2018 -0800

    KAFKA-6503: Parallelize plugin scanning
    
    This is a small change to parallelize plugin scanning.  This may help in some environments where otherwise plugin scanning is slow.
    
    Author: Robert Yokota <rayok...@gmail.com>
    
    Reviewers: Konstantine Karantasis <konstant...@confluent.io>, Randall Hauch <rha...@gmail.com>, Ewen Cheslack-Postava <e...@confluent.io>
    
    Closes #4561 from rayokota/K6503-improve-plugin-scanning
---
 .../runtime/isolation/DelegatingClassLoader.java   | 29 +++++++++++++++++++++-
 1 file changed, 28 insertions(+), 1 deletion(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index 345d7ef..b21cdcb 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -20,7 +20,10 @@ import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.transforms.Transformation;
+import org.reflections.Configuration;
 import org.reflections.Reflections;
+import org.reflections.ReflectionsException;
+import org.reflections.scanners.SubTypesScanner;
 import org.reflections.util.ClasspathHelper;
 import org.reflections.util.ConfigurationBuilder;
 import org.slf4j.Logger;
@@ -269,7 +272,10 @@ public class DelegatingClassLoader extends URLClassLoader {
         ConfigurationBuilder builder = new ConfigurationBuilder();
         builder.setClassLoaders(new ClassLoader[]{loader});
         builder.addUrls(urls);
-        Reflections reflections = new Reflections(builder);
+        builder.setScanners(new SubTypesScanner());
+        builder.setExpandSuperTypes(false);
+        builder.useParallelExecutor();
+        Reflections reflections = new InternalReflections(builder);
 
         return new PluginScanResult(
                 getPluginDesc(reflections, Connector.class, loader),
@@ -353,4 +359,25 @@ public class DelegatingClassLoader extends URLClassLoader {
             }
         }
     }
+
+    private static class InternalReflections extends Reflections {
+
+        public InternalReflections(Configuration configuration) {
+            super(configuration);
+        }
+
+        // When Reflections is used for parallel scans, it has a bug where it propagates ReflectionsException
+        // as RuntimeException.  Override the scan behavior to emulate the singled-threaded logic.
+        @Override
+        protected void scan(URL url) {
+            try {
+                super.scan(url);
+            } catch (ReflectionsException e) {
+                Logger log = Reflections.log;
+                if (log != null && log.isWarnEnabled()) {
+                    log.warn("could not create Vfs.Dir from url. ignoring the exception and continuing", e);
+                }
+            }
+        }
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
ewe...@apache.org.

Reply via email to