IGNITE-3902: Hadoop: fixed incorrect context classloader management.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3a94f27e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3a94f27e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3a94f27e Branch: refs/heads/ignite-comm-balance Commit: 3a94f27e6bdee1672a2a94faf0b5b63ab218b476 Parents: 3a28545 Author: vozerov-gridgain <voze...@gridgain.com> Authored: Thu Sep 15 13:58:38 2016 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Thu Sep 15 13:58:38 2016 +0300 ---------------------------------------------------------------------- .../hadoop/fs/BasicHadoopFileSystemFactory.java | 17 ++----- .../processors/hadoop/HadoopDefaultJobInfo.java | 1 - .../internal/processors/hadoop/HadoopUtils.java | 53 +++++++++++--------- .../processors/hadoop/v2/HadoopV2Job.java | 32 ++++++------ .../hadoop/v2/HadoopV2JobResourceManager.java | 5 +- .../hadoop/v2/HadoopV2TaskContext.java | 15 +++--- 6 files changed, 63 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3a94f27e/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java index 06f76c3..a01bfaf 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/BasicHadoopFileSystemFactory.java @@ -93,20 +93,13 @@ public class BasicHadoopFileSystemFactory implements HadoopFileSystemFactory, Ex // FileSystem.get() might delegate to ServiceLoader to get the list of file system implementation. // And ServiceLoader is known to be sensitive to context classloader. Therefore, we change context // classloader to classloader of current class to avoid strange class-cast-exceptions. - ClassLoader ctxClsLdr = Thread.currentThread().getContextClassLoader(); - ClassLoader clsLdr = getClass().getClassLoader(); + ClassLoader oldLdr = HadoopUtils.setContextClassLoader(getClass().getClassLoader()); - if (ctxClsLdr == clsLdr) + try { return create(usrName); - else { - Thread.currentThread().setContextClassLoader(clsLdr); - - try { - return create(usrName); - } - finally { - Thread.currentThread().setContextClassLoader(ctxClsLdr); - } + } + finally { + HadoopUtils.restoreContextClassLoader(oldLdr); } } catch (InterruptedException e) { http://git-wip-us.apache.org/repos/asf/ignite/blob/3a94f27e/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java index be2d9ca..1382c1f 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java @@ -92,7 +92,6 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable { return constructor.newInstance(jobId, this, log, libNames); } - // NB: java.lang.NoClassDefFoundError may be thrown from Class#getConstructor() call. catch (Throwable t) { if (t instanceof Error) throw (Error)t; http://git-wip-us.apache.org/repos/asf/ignite/blob/3a94f27e/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java index 44d871a..65d9810 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java @@ -43,6 +43,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.hadoop.v2.HadoopSplitWrapper; import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; /** * Hadoop utility methods. @@ -326,33 +327,13 @@ public class HadoopUtils { * @return New instance of {@link Configuration}. */ public static Configuration safeCreateConfiguration() { - final ClassLoader cl0 = Thread.currentThread().getContextClassLoader(); - - Thread.currentThread().setContextClassLoader(Configuration.class.getClassLoader()); + final ClassLoader oldLdr = setContextClassLoader(Configuration.class.getClassLoader()); try { return new Configuration(); } finally { - Thread.currentThread().setContextClassLoader(cl0); - } - } - - /** - * Creates {@link JobConf} in a correct class loader context to avoid caching - * of inappropriate class loader in the Configuration object. - * @return New instance of {@link JobConf}. - */ - public static JobConf safeCreateJobConf() { - final ClassLoader cl0 = Thread.currentThread().getContextClassLoader(); - - Thread.currentThread().setContextClassLoader(JobConf.class.getClassLoader()); - - try { - return new JobConf(); - } - finally { - Thread.currentThread().setContextClassLoader(cl0); + restoreContextClassLoader(oldLdr); } } @@ -382,6 +363,33 @@ public class HadoopUtils { } /** + * Set context class loader. + * + * @param newLdr New class loader. + * @return Old class loader. + */ + @Nullable public static ClassLoader setContextClassLoader(@Nullable ClassLoader newLdr) { + ClassLoader oldLdr = Thread.currentThread().getContextClassLoader(); + + if (newLdr != oldLdr) + Thread.currentThread().setContextClassLoader(newLdr); + + return oldLdr; + } + + /** + * Restore context class loader. + * + * @param oldLdr Original class loader. + */ + public static void restoreContextClassLoader(@Nullable ClassLoader oldLdr) { + ClassLoader newLdr = Thread.currentThread().getContextClassLoader(); + + if (newLdr != oldLdr) + Thread.currentThread().setContextClassLoader(oldLdr); + } + + /** * Split wrapper for sorting. */ private static class SplitSortWrapper implements Comparable<SplitSortWrapper> { @@ -432,5 +440,4 @@ public class HadoopUtils { return obj instanceof SplitSortWrapper && id == ((SplitSortWrapper)obj).id; } } - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/3a94f27e/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java index 8804e29..a0f30eb 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java @@ -138,20 +138,25 @@ public class HadoopV2Job implements HadoopJob { this.jobInfo = jobInfo; this.libNames = libNames; - hadoopJobID = new JobID(jobId.globalId().toString(), jobId.localId()); + ClassLoader oldLdr = HadoopUtils.setContextClassLoader(getClass().getClassLoader()); - jobConf = HadoopUtils.safeCreateJobConf(); + try { + hadoopJobID = new JobID(jobId.globalId().toString(), jobId.localId()); - HadoopFileSystemsUtils.setupFileSystems(jobConf); + jobConf = new JobConf(); - Thread.currentThread().setContextClassLoader(null); + HadoopFileSystemsUtils.setupFileSystems(jobConf); - for (Map.Entry<String,String> e : jobInfo.properties().entrySet()) - jobConf.set(e.getKey(), e.getValue()); + for (Map.Entry<String,String> e : jobInfo.properties().entrySet()) + jobConf.set(e.getKey(), e.getValue()); - jobCtx = new JobContextImpl(jobConf, hadoopJobID); + jobCtx = new JobContextImpl(jobConf, hadoopJobID); - rsrcMgr = new HadoopV2JobResourceManager(jobId, jobCtx, log, this); + rsrcMgr = new HadoopV2JobResourceManager(jobId, jobCtx, log, this); + } + finally { + HadoopUtils.setContextClassLoader(oldLdr); + } } /** {@inheritDoc} */ @@ -166,7 +171,7 @@ public class HadoopV2Job implements HadoopJob { /** {@inheritDoc} */ @Override public Collection<HadoopInputSplit> input() throws IgniteCheckedException { - Thread.currentThread().setContextClassLoader(jobConf.getClassLoader()); + ClassLoader oldLdr = HadoopUtils.setContextClassLoader(jobConf.getClassLoader()); try { String jobDirPath = jobConf.get(MRJobConfig.MAPREDUCE_JOB_DIR); @@ -223,7 +228,7 @@ public class HadoopV2Job implements HadoopJob { } } finally { - Thread.currentThread().setContextClassLoader(null); + HadoopUtils.restoreContextClassLoader(oldLdr); } } @@ -296,16 +301,13 @@ public class HadoopV2Job implements HadoopJob { this.locNodeId = locNodeId; - assert ((HadoopClassLoader)getClass().getClassLoader()).name() - .equals(HadoopClassLoader.nameForJob(this.locNodeId)); - - Thread.currentThread().setContextClassLoader(jobConf.getClassLoader()); + ClassLoader oldLdr = HadoopUtils.setContextClassLoader(getClass().getClassLoader()); try { rsrcMgr.prepareJobEnvironment(!external, jobLocalDir(locNodeId, jobId)); } finally { - Thread.currentThread().setContextClassLoader(null); + HadoopUtils.restoreContextClassLoader(oldLdr); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/3a94f27e/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java index 67ab600..33aef60 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java @@ -37,6 +37,7 @@ import org.apache.hadoop.util.RunJar; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.processors.hadoop.HadoopJobId; +import org.apache.ignite.internal.processors.hadoop.HadoopUtils; import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; @@ -94,7 +95,7 @@ class HadoopV2JobResourceManager { private void setLocalFSWorkingDirectory(File dir) throws IOException { JobConf cfg = ctx.getJobConf(); - Thread.currentThread().setContextClassLoader(cfg.getClassLoader()); + ClassLoader oldLdr = HadoopUtils.setContextClassLoader(cfg.getClassLoader()); try { cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, dir.getAbsolutePath()); @@ -103,7 +104,7 @@ class HadoopV2JobResourceManager { FileSystem.getLocal(cfg).setWorkingDirectory(new Path(dir.getAbsolutePath())); } finally { - Thread.currentThread().setContextClassLoader(null); + HadoopUtils.restoreContextClassLoader(oldLdr); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/3a94f27e/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java index 68c9ff8..4b1121c 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java @@ -57,6 +57,7 @@ import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; import org.apache.ignite.internal.processors.hadoop.HadoopTaskType; +import org.apache.ignite.internal.processors.hadoop.HadoopUtils; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounter; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCountersImpl; @@ -158,7 +159,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext { this.locNodeId = locNodeId; // Before create JobConf instance we should set new context class loader. - Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); + ClassLoader oldLdr = HadoopUtils.setContextClassLoader(getClass().getClassLoader()); try { JobConf jobConf = new JobConf(); @@ -180,7 +181,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext { useNewCombiner = jobConf.getCombinerClass() == null; } finally { - Thread.currentThread().setContextClassLoader(null); + HadoopUtils.restoreContextClassLoader(oldLdr); } } @@ -229,9 +230,9 @@ public class HadoopV2TaskContext extends HadoopTaskContext { /** {@inheritDoc} */ @Override public void run() throws IgniteCheckedException { - try { - Thread.currentThread().setContextClassLoader(jobConf().getClassLoader()); + ClassLoader oldLdr = HadoopUtils.setContextClassLoader(jobConf().getClassLoader()); + try { try { task = createTask(); } @@ -258,7 +259,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext { finally { task = null; - Thread.currentThread().setContextClassLoader(null); + HadoopUtils.restoreContextClassLoader(oldLdr); } } @@ -289,7 +290,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext { locDir = jobLocalDir(locNodeId, taskInfo().jobId()); } - Thread.currentThread().setContextClassLoader(jobConf().getClassLoader()); + ClassLoader oldLdr = HadoopUtils.setContextClassLoader(jobConf().getClassLoader()); try { FileSystem.get(jobConf()); @@ -305,7 +306,7 @@ public class HadoopV2TaskContext extends HadoopTaskContext { throw transformException(e); } finally { - Thread.currentThread().setContextClassLoader(null); + HadoopUtils.restoreContextClassLoader(oldLdr); } }