This is an automated email from the ASF dual-hosted git repository. ewencp pushed a commit to branch 1.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push: new 7723293 KAFKA-6288: Broken symlink interrupts scanning of the plugin path 7723293 is described below commit 772329358216434368db1e92bd8849f7ed728838 Author: Konstantine Karantasis <konstant...@confluent.io> AuthorDate: Sun Feb 4 14:57:25 2018 -0800 KAFKA-6288: Broken symlink interrupts scanning of the plugin path Submitting a fail safe fix for rare IOExceptions on symbolic links. The fix is submitted without a test case since it does seem easy to reproduce such type of failures (just having a broken symbolic link does not reproduce the issue) and it's considered pretty low risk. If accepted, needs to be ported at least to 1.0, if not 0.11 Author: Konstantine Karantasis <konstant...@confluent.io> Reviewers: Randall Hauch <rha...@gmail.com>, Ewen Cheslack-Postava <e...@confluent.io> Closes #4481 from kkonstantine/KAFKA-6288-Broken-symlink-interrupts-scanning-the-plugin-path (cherry picked from commit 17aaff3606393b42d4e8ef5299141b5bb21300b0) Signed-off-by: Ewen Cheslack-Postava <m...@ewencp.org> --- checkstyle/suppressions.xml | 2 +- .../runtime/isolation/DelegatingClassLoader.java | 29 +++++++++------- .../connect/runtime/isolation/PluginUtils.java | 39 ++++++++++++++-------- 3 files changed, 44 insertions(+), 26 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index f06155f..de1bdfd 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -57,7 +57,7 @@ files="AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java"/> <suppress checks="NPathComplexity" - files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|Agent|Values).java"/> + files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|Agent|Values|PluginUtils).java"/> <!-- clients tests --> <suppress checks="ClassDataAbstractionCoupling" diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java index 0c133b9..8a44d4d 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java @@ -52,6 +52,7 @@ import java.util.TreeSet; public class DelegatingClassLoader extends URLClassLoader { private static final Logger log = LoggerFactory.getLogger(DelegatingClassLoader.class); + private static final String CLASSPATH_NAME = "classpath"; private final Map<String, SortedMap<PluginDesc<?>, ClassLoader>> pluginLoaders; private final Map<String, String> aliases; @@ -139,10 +140,23 @@ public class DelegatingClassLoader extends URLClassLoader { } protected void initLoaders() { - String path = null; + for (String configPath : pluginPaths) { + initPluginLoader(configPath); + } + // Finally add parent/system loader. + initPluginLoader(CLASSPATH_NAME); + addAllAliases(); + } + + private void initPluginLoader(String path) { try { - for (String configPath : pluginPaths) { - path = configPath; + if (CLASSPATH_NAME.equals(path)) { + scanUrlsAndAddPlugins( + getParent(), + ClasspathHelper.forJavaClassPath().toArray(new URL[0]), + null + ); + } else { Path pluginPath = Paths.get(path).toAbsolutePath(); // Update for exception handling path = pluginPath.toString(); @@ -156,14 +170,6 @@ public class DelegatingClassLoader extends URLClassLoader { registerPlugin(pluginPath); } } - - path = "classpath"; - // Finally add parent/system loader. - scanUrlsAndAddPlugins( - getParent(), - ClasspathHelper.forJavaClassPath().toArray(new URL[0]), - null - ); } catch (InvalidPathException | MalformedURLException e) { log.error("Invalid path in plugin path: {}. Ignoring.", path, e); } catch (IOException e) { @@ -171,7 +177,6 @@ public class DelegatingClassLoader extends URLClassLoader { } catch (InstantiationException | IllegalAccessException e) { log.error("Could not instantiate plugins in: {}. Ignoring: {}", path, e); } - addAllAliases(); } private void registerPlugin(Path pluginLocation) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java index d85986e..d490bde 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java @@ -133,7 +133,7 @@ public class PluginUtils { private static final DirectoryStream.Filter<Path> PLUGIN_PATH_FILTER = new DirectoryStream .Filter<Path>() { @Override - public boolean accept(Path path) throws IOException { + public boolean accept(Path path) { return Files.isDirectory(path) || isArchive(path) || isClassFile(path); } }; @@ -232,16 +232,29 @@ public class PluginUtils { Path adjacent = neighbors.next(); if (Files.isSymbolicLink(adjacent)) { - Path symlink = Files.readSymbolicLink(adjacent); - // if symlink is absolute resolve() returns the absolute symlink itself - Path parent = adjacent.getParent(); - if (parent == null) { - continue; - } - Path absolute = parent.resolve(symlink).toRealPath(); - if (Files.exists(absolute)) { - adjacent = absolute; - } else { + try { + Path symlink = Files.readSymbolicLink(adjacent); + // if symlink is absolute resolve() returns the absolute symlink itself + Path parent = adjacent.getParent(); + if (parent == null) { + continue; + } + Path absolute = parent.resolve(symlink).toRealPath(); + if (Files.exists(absolute)) { + adjacent = absolute; + } else { + continue; + } + } catch (IOException e) { + // See https://issues.apache.org/jira/browse/KAFKA-6288 for a reported + // failure. Such a failure at this stage is not easily reproducible and + // therefore an exception is caught and ignored after issuing a + // warning. This allows class scanning to continue for non-broken plugins. + log.warn( + "Resolving symbolic link '{}' failed. Ignoring this path.", + adjacent, + e + ); continue; } } @@ -341,8 +354,8 @@ public class PluginUtils { } private static class DirectoryEntry { - DirectoryStream<Path> stream; - Iterator<Path> iterator; + final DirectoryStream<Path> stream; + final Iterator<Path> iterator; DirectoryEntry(DirectoryStream<Path> stream) { this.stream = stream; -- To stop receiving notification emails like this one, please contact ewe...@apache.org.