IGNITE-3914: Introduced HadoopHelper.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cb304b14 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cb304b14 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cb304b14 Branch: refs/heads/ignite-1.6.8-hadoop Commit: cb304b14508011c9a3f3ba73d4a8ae3d51bd9045 Parents: 857cdcd Author: vozerov-gridgain <voze...@gridgain.com> Authored: Tue Sep 20 10:55:22 2016 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Tue Sep 20 10:55:22 2016 +0300 ---------------------------------------------------------------------- .../ignite/internal/GridKernalContext.java | 8 + .../ignite/internal/GridKernalContextImpl.java | 12 + .../ignite/internal/IgniteComponentType.java | 11 +- .../apache/ignite/internal/IgniteKernal.java | 5 +- .../processors/hadoop/HadoopClassLoader.java | 367 ++++++++++ .../processors/hadoop/HadoopHelper.java | 66 ++ .../processors/hadoop/HadoopJobInfo.java | 4 +- .../processors/hadoop/HadoopNoopHelper.java | 59 ++ .../processors/hadoop/HadoopDefaultJobInfo.java | 6 +- .../hadoop/jobtracker/HadoopJobTracker.java | 6 +- .../child/HadoopChildProcessRunner.java | 4 +- .../processors/hadoop/v2/HadoopV2Job.java | 9 +- .../hadoop/HadoopClassLoaderTest.java | 2 +- .../processors/hadoop/HadoopPlannerMockJob.java | 2 +- .../processors/hadoop/HadoopSnappyTest.java | 2 +- .../processors/hadoop/HadoopTasksV1Test.java | 2 +- .../processors/hadoop/HadoopTasksV2Test.java | 2 +- .../processors/hadoop/HadoopV2JobSelfTest.java | 2 +- .../collections/HadoopAbstractMapTest.java | 3 +- .../processors/hadoop/HadoopClassLoader.java | 363 ---------- .../hadoop/HadoopClassLoaderUtils.java | 684 ------------------- .../processors/hadoop/HadoopHelperImpl.java | 684 +++++++++++++++++++ 22 files changed, 1237 insertions(+), 1066 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/ignite/blob/cb304b14/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index 3eaef1e..b123a4a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -44,6 +44,7 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor; import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor; import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor; import org.apache.ignite.internal.processors.hadoop.HadoopProcessorAdapter; +import org.apache.ignite.internal.processors.hadoop.HadoopHelper; import org.apache.ignite.internal.processors.igfs.IgfsHelper; import org.apache.ignite.internal.processors.igfs.IgfsProcessorAdapter; import org.apache.ignite.internal.processors.job.GridJobProcessor; @@ -285,6 +286,13 @@ public interface GridKernalContext extends Iterable<GridComponent> { public HadoopProcessorAdapter hadoop(); /** + * Gets Hadoop helper. + * + * @return Hadoop helper. + */ + public HadoopHelper hadoopHelper(); + + /** * Gets utility cache pool. * * @return Utility cache pool. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/cb304b14/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 1ff4543..eb214e8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -60,6 +60,7 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor; import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor; import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor; import org.apache.ignite.internal.processors.hadoop.HadoopProcessorAdapter; +import org.apache.ignite.internal.processors.hadoop.HadoopHelper; import org.apache.ignite.internal.processors.igfs.IgfsHelper; import org.apache.ignite.internal.processors.igfs.IgfsProcessorAdapter; import org.apache.ignite.internal.processors.job.GridJobProcessor; @@ -238,6 +239,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** */ @GridToStringInclude + private HadoopHelper hadoopHelper; + + /** */ + @GridToStringInclude private GridSegmentationProcessor segProc; /** */ @@ -541,6 +546,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable if (helper instanceof IgfsHelper) igfsHelper = (IgfsHelper)helper; + else if (helper instanceof HadoopHelper) + hadoopHelper = (HadoopHelper)helper; else assert false : "Unknown helper class: " + helper.getClass(); } @@ -733,6 +740,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ + @Override public HadoopHelper hadoopHelper() { + return hadoopHelper; + } + + /** {@inheritDoc} */ 
@Override public GridContinuousProcessor continuous() { return contProc; } http://git-wip-us.apache.org/repos/asf/ignite/blob/cb304b14/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java index b182bd8..c39e9c3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java @@ -41,6 +41,13 @@ public enum IgniteComponentType { "ignite-hadoop-impl" ), + /** Hadoop Helper component. */ + HADOOP_HELPER( + "org.apache.ignite.internal.processors.hadoop.HadoopNoopHelper", + "org.apache.ignite.internal.processors.hadoop.HadoopHelperImpl", + "ignite-hadoop" + ), + /** IGFS helper component. */ IGFS_HELPER( "org.apache.ignite.internal.processors.igfs.IgfsNoopHelper", @@ -160,7 +167,7 @@ public enum IgniteComponentType { * @return Created component. * @throws IgniteCheckedException If failed. */ - public <T extends GridComponent> T create(GridKernalContext ctx, boolean noOp) throws IgniteCheckedException { + public <T> T create(GridKernalContext ctx, boolean noOp) throws IgniteCheckedException { return create0(ctx, noOp ? noOpClsName : clsName); } @@ -172,7 +179,7 @@ public enum IgniteComponentType { * @return Created component. * @throws IgniteCheckedException If failed. 
*/ - public <T extends GridComponent> T createIfInClassPath(GridKernalContext ctx, boolean mandatory) + public <T> T createIfInClassPath(GridKernalContext ctx, boolean mandatory) throws IgniteCheckedException { String cls = clsName; http://git-wip-us.apache.org/repos/asf/ignite/blob/cb304b14/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index b85692e..4f916be 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -188,6 +188,7 @@ import static org.apache.ignite.internal.GridKernalState.STARTED; import static org.apache.ignite.internal.GridKernalState.STARTING; import static org.apache.ignite.internal.GridKernalState.STOPPED; import static org.apache.ignite.internal.GridKernalState.STOPPING; +import static org.apache.ignite.internal.IgniteComponentType.HADOOP_HELPER; import static org.apache.ignite.internal.IgniteComponentType.IGFS; import static org.apache.ignite.internal.IgniteComponentType.IGFS_HELPER; import static org.apache.ignite.internal.IgniteComponentType.SCHEDULE; @@ -821,6 +822,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { addHelper(IGFS_HELPER.create(F.isEmpty(cfg.getFileSystemConfiguration()))); + addHelper(HADOOP_HELPER.createIfInClassPath(null, false)); + startProcessor(new IgnitePluginProcessor(ctx, cfg, plugins)); // Off-heap processor has no dependencies. 
@@ -881,7 +884,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { startProcessor(new DataStreamProcessor(ctx)); startProcessor((GridProcessor)IGFS.create(ctx, F.isEmpty(cfg.getFileSystemConfiguration()))); startProcessor(new GridContinuousProcessor(ctx)); - startProcessor((GridProcessor)createHadoopComponent()); + startProcessor(createHadoopComponent()); startProcessor(new DataStructuresProcessor(ctx)); startProcessor(createComponent(PlatformProcessor.class, ctx)); http://git-wip-us.apache.org/repos/asf/ignite/blob/cb304b14/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java new file mode 100644 index 0000000..5297cea --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.util.ClassCache; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; +import org.jsr166.ConcurrentHashMap8; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.UUID; +import java.util.Vector; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * Class loader allowing explicitly load classes without delegation to parent class loader. + * Also supports class parsing for finding dependencies which contain transitive dependencies + * unavailable for parent. + */ +public class HadoopClassLoader extends URLClassLoader implements ClassCache { + static { + // We are very parallel capable. + registerAsParallelCapable(); + } + + /** Hadoop class name: Daemon. */ + public static final String CLS_DAEMON = "org.apache.hadoop.util.Daemon"; + + /** Hadoop class name: ShutdownHookManager. */ + public static final String CLS_SHUTDOWN_HOOK_MANAGER = "org.apache.hadoop.util.ShutdownHookManager"; + + /** Hadoop class name: NativeCodeLoader. */ + public static final String CLS_NATIVE_CODE_LOADER = "org.apache.hadoop.util.NativeCodeLoader"; + + /** Hadoop class name: Daemon replacement. */ + public static final String CLS_DAEMON_REPLACE = "org.apache.ignite.internal.processors.hadoop.v2.HadoopDaemon"; + + /** Hadoop class name: ShutdownHookManager replacement. */ + public static final String CLS_SHUTDOWN_HOOK_MANAGER_REPLACE = + "org.apache.ignite.internal.processors.hadoop.v2.HadoopShutdownHookManager"; + + /** Name of libhadoop library. 
*/ + private static final String LIBHADOOP = "hadoop."; + + /** */ + private static final URLClassLoader APP_CLS_LDR = (URLClassLoader)HadoopClassLoader.class.getClassLoader(); + + /** */ + private static final Collection<URL> appJars = F.asList(APP_CLS_LDR.getURLs()); + + /** */ + private static volatile Collection<URL> hadoopJars; + + /** */ + private static final Map<String, byte[]> bytesCache = new ConcurrentHashMap8<>(); + + /** Class cache. */ + private final ConcurrentMap<String, Class> cacheMap = new ConcurrentHashMap<>(); + + /** Diagnostic name of this class loader. */ + @SuppressWarnings({"FieldCanBeLocal", "UnusedDeclaration"}) + private final String name; + + /** Native library names. */ + private final String[] libNames; + + /** Hadoop helper. */ + private final HadoopHelper helper; + + /** + * Gets name for Job class loader. The name is specific for local node id. + * @param locNodeId The local node id. + * @return The class loader name. + */ + public static String nameForJob(UUID locNodeId) { + return "hadoop-job-node-" + locNodeId.toString(); + } + + /** + * Gets name for the task class loader. The name is built from the job ID plus, unless only the prefix is requested, the task type and number. + * @param info The task info. + * @param prefix Get only prefix (without task type and number) + * @return The class loader name. + */ + public static String nameForTask(HadoopTaskInfo info, boolean prefix) { + if (prefix) + return "hadoop-task-" + info.jobId() + "-"; + else + return "hadoop-task-" + info.jobId() + "-" + info.type() + "-" + info.taskNumber(); + } + + /** + * Constructor. + * + * @param urls Urls. + * @param name Classloader name. + * @param libNames Optional additional native library names to be linked from parent classloader. 
+ */ + public HadoopClassLoader(URL[] urls, String name, @Nullable String[] libNames, HadoopHelper helper) { + super(addHadoopUrls(urls), APP_CLS_LDR); + + assert !(getParent() instanceof HadoopClassLoader); + + this.name = name; + this.libNames = libNames; + this.helper = helper; + + initializeNativeLibraries(); + } + + /** + * Workaround to load native Hadoop libraries. Java doesn't allow native libraries to be loaded from different + * classloaders. But we load Hadoop classes many times and one of these classes - {@code NativeCodeLoader} - tries + * to load the same native library over and over again. + * <p> + * To fix the problem, we force native library load in parent class loader and then "link" handle to this native + * library to our class loader. As a result, our class loader will think that the library is already loaded and will + * be able to link native methods. + * + * @see <a href="http://docs.oracle.com/javase/1.5.0/docs/guide/jni/spec/invocation.html#library_version"> + * JNI specification</a> + */ + private void initializeNativeLibraries() { + try { + // This must trigger native library load. 
+ // TODO: Do not delegate to APP LDR + Class.forName(CLS_NATIVE_CODE_LOADER, true, APP_CLS_LDR); + + final Vector<Object> curVector = U.field(this, "nativeLibraries"); + + // TODO: Do not delegate to APP LDR + ClassLoader ldr = APP_CLS_LDR; + + while (ldr != null) { + Vector vector = U.field(ldr, "nativeLibraries"); + + for (Object lib : vector) { + String name = U.field(lib, "name"); + + boolean add = name.contains(LIBHADOOP); + + if (!add && libNames != null) { + for (String libName : libNames) { + if (libName != null && name.contains(libName)) { + add = true; + + break; + } + } + } + + if (add) { + curVector.add(lib); + + return; + } + } + + ldr = ldr.getParent(); + } + } + catch (Exception e) { + U.quietAndWarn(null, "Failed to initialize Hadoop native library " + + "(native Hadoop methods might not work properly): " + e); + } + } + + /** {@inheritDoc} */ + @Override protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException { + try { + // Always load Hadoop classes explicitly, since Hadoop can be available in App classpath. + if (helper.isHadoop(name)) { + if (name.equals(CLS_SHUTDOWN_HOOK_MANAGER)) // Dirty hack to get rid of Hadoop shutdown hooks. + return loadReplace(name, CLS_SHUTDOWN_HOOK_MANAGER_REPLACE); + else if (name.equals(CLS_DAEMON)) + // We replace this in order to be able to forcibly stop some daemon threads + // that otherwise never stop (e.g. PeerCache runnables): + return loadReplace(name, CLS_DAEMON_REPLACE); + + return loadClassExplicitly(name, resolve); + } + + // For Ignite Hadoop and IGFS classes we have to check if they depend on Hadoop. + if (helper.isHadoopIgfs(name)) { + if (hasExternalDependencies(name)) + return loadClassExplicitly(name, resolve); + } + + return super.loadClass(name, resolve); + } + catch (NoClassDefFoundError | ClassNotFoundException e) { + throw new ClassNotFoundException("Failed to load class: " + name, e); + } + } + + /** + * Load a class replacing it with our own implementation. 
+ * + * @param originalName Name. + * @param replaceName Replacement. + * @return Class. + */ + private Class<?> loadReplace(final String originalName, final String replaceName) { + synchronized (getClassLoadingLock(originalName)) { + // First, check if the class has already been loaded + Class c = findLoadedClass(originalName); + + if (c != null) + return c; + + byte[] bytes = bytesCache.get(originalName); + + if (bytes == null) { + InputStream in = helper.loadClassBytes(getParent(), replaceName); + + bytes = helper.loadReplace(in, originalName, replaceName); + + bytesCache.put(originalName, bytes); + } + + return defineClass(originalName, bytes, 0, bytes.length); + } + } + + /** {@inheritDoc} */ + @Override public Class<?> getFromCache(String clsName) throws ClassNotFoundException { + Class<?> cls = cacheMap.get(clsName); + + if (cls == null) { + Class old = cacheMap.putIfAbsent(clsName, cls = Class.forName(clsName, true, this)); + + if (old != null) + cls = old; + } + + return cls; + } + + /** + * @param name Class name. + * @param resolve Resolve class. + * @return Class. + * @throws ClassNotFoundException If failed. + */ + private Class<?> loadClassExplicitly(String name, boolean resolve) throws ClassNotFoundException { + synchronized (getClassLoadingLock(name)) { + // First, check if the class has already been loaded + Class c = findLoadedClass(name); + + if (c == null) { + long t1 = System.nanoTime(); + + c = findClass(name); + + // this is the defining class loader; record the stats + sun.misc.PerfCounter.getFindClassTime().addElapsedTimeFrom(t1); + sun.misc.PerfCounter.getFindClasses().increment(); + } + + if (resolve) + resolveClass(c); + + return c; + } + } + + /** + * Check whether class has external dependencies on Hadoop. + * + * @param clsName Class name. + * @return {@code True} if class has external dependencies. 
+ */ + boolean hasExternalDependencies(String clsName) { + return helper.hasExternalDependencies(clsName, getParent()); + } + + /** + * @param urls URLs. + * @return URLs. + */ + private static URL[] addHadoopUrls(URL[] urls) { + Collection<URL> hadoopJars; + + try { + hadoopJars = hadoopUrls(); + } + catch (IgniteCheckedException e) { + throw new RuntimeException(e); + } + + ArrayList<URL> list = new ArrayList<>(hadoopJars.size() + appJars.size() + (urls == null ? 0 : urls.length)); + + list.addAll(appJars); + list.addAll(hadoopJars); + + if (!F.isEmpty(urls)) + list.addAll(F.asList(urls)); + + return list.toArray(new URL[list.size()]); + } + + /** + * @return Collection of jar URLs. + * @throws IgniteCheckedException If failed. + */ + public static Collection<URL> hadoopUrls() throws IgniteCheckedException { + Collection<URL> hadoopUrls = hadoopJars; + + if (hadoopUrls != null) + return hadoopUrls; + + synchronized (HadoopClassLoader.class) { + hadoopUrls = hadoopJars; + + if (hadoopUrls != null) + return hadoopUrls; + + try { + hadoopUrls = HadoopClasspathUtils.classpathForClassLoader(); + } + catch (IOException e) { + throw new IgniteCheckedException("Failed to resolve Hadoop JAR locations: " + e.getMessage(), e); + } + + hadoopJars = hadoopUrls; + + return hadoopUrls; + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopClassLoader.class, this); + } + + /** + * Getter for name field. 
+ */ + public String name() { + return name; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/cb304b14/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopHelper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopHelper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopHelper.java new file mode 100644 index 0000000..40852c5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopHelper.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import java.io.InputStream; +import org.jetbrains.annotations.Nullable; + +/** + * Hadoop helper: class-loading utilities used by the Hadoop class loader. + */ +public interface HadoopHelper { + /** + * Loads the replacement class from the stream and returns its bytes so that it can be defined under the original class name. + * + * @param in Input stream. + * @param originalName Original class name. + * @param replaceName Replacer class name. + * @return Result. 
+ */ + public byte[] loadReplace(InputStream in, final String originalName, final String replaceName); + + /** + * @param cls Class name. + * @return {@code true} If this is Hadoop class. + */ + public boolean isHadoop(String cls); + + /** + * Need to parse only Ignite Hadoop and IGFS classes. + * + * @param cls Class name. + * @return {@code true} if we need to check this class. + */ + public boolean isHadoopIgfs(String cls); + + /** + * @param ldr Loader. + * @param clsName Class. + * @return Input stream. + */ + @Nullable public InputStream loadClassBytes(ClassLoader ldr, String clsName); + + /** + * Check whether class has external dependencies on Hadoop. + * + * @param clsName Class name. + * @param parentClsLdr Parent class loader. + * @return {@code True} if class has external dependencies. + */ + public boolean hasExternalDependencies(String clsName, ClassLoader parentClsLdr); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/cb304b14/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java index a3b1bb6..853c63d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobInfo.java @@ -59,11 +59,13 @@ public interface HadoopJobInfo extends Serializable { * @param jobId Job ID. * @param log Logger. * @param libNames Optional additional native library names. + * @param helper HadoopHelper. * @return Job. * @throws IgniteCheckedException If failed. */ public HadoopJob createJob(Class<? 
extends HadoopJob> jobCls, - HadoopJobId jobId, IgniteLogger log, @Nullable String[] libNames) throws IgniteCheckedException; + HadoopJobId jobId, IgniteLogger log, @Nullable String[] libNames, HadoopHelper helper) + throws IgniteCheckedException; /** * @return Number of reducers configured for job. http://git-wip-us.apache.org/repos/asf/ignite/blob/cb304b14/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopNoopHelper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopNoopHelper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopNoopHelper.java new file mode 100644 index 0000000..d9ce857 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopNoopHelper.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop; + +import org.jetbrains.annotations.Nullable; + +import java.io.InputStream; + +/** + * Noop Hadoop Helper implementation. 
+ */ +public class HadoopNoopHelper implements HadoopHelper { + /** {@inheritDoc} */ + @Override public boolean hasExternalDependencies(String clsName, ClassLoader parentClsLdr) { + throw unsupported(); + } + + /** {@inheritDoc} */ + @Override public byte[] loadReplace(InputStream in, String originalName, String replaceName) { + throw unsupported(); + } + + /** {@inheritDoc} */ + @Override public boolean isHadoop(String cls) { + throw unsupported(); + } + + /** {@inheritDoc} */ + @Override public boolean isHadoopIgfs(String cls) { + throw unsupported(); + } + + /** {@inheritDoc} */ + @Nullable @Override public InputStream loadClassBytes(ClassLoader ldr, String clsName) { + throw unsupported(); + } + + /** + * @return Exception. + */ + private static UnsupportedOperationException unsupported() { + throw new UnsupportedOperationException("Operation is unsupported (Hadoop module is not in the classpath)."); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/cb304b14/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java index 1382c1f..ae17ac8 100644 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java @@ -83,14 +83,14 @@ public class HadoopDefaultJobInfo implements HadoopJobInfo, Externalizable { /** {@inheritDoc} */ @Override public HadoopJob createJob(Class<? 
extends HadoopJob> jobCls, HadoopJobId jobId, IgniteLogger log, - @Nullable String[] libNames) throws IgniteCheckedException { + @Nullable String[] libNames, HadoopHelper helper) throws IgniteCheckedException { assert jobCls != null; try { Constructor<? extends HadoopJob> constructor = jobCls.getConstructor(HadoopJobId.class, - HadoopDefaultJobInfo.class, IgniteLogger.class, String[].class); + HadoopDefaultJobInfo.class, IgniteLogger.class, String[].class, HadoopHelper.class); - return constructor.newInstance(jobId, this, log, libNames); + return constructor.newInstance(jobId, this, log, libNames, helper); } catch (Throwable t) { if (t instanceof Error) http://git-wip-us.apache.org/repos/asf/ignite/blob/cb304b14/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java index f3e17f3..a2c55a2 100644 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobTracker.java @@ -162,7 +162,8 @@ public class HadoopJobTracker extends HadoopComponent { if (ctx.configuration() != null) libNames = ctx.configuration().getNativeLibraryNames(); - HadoopClassLoader ldr = new HadoopClassLoader(null, HadoopClassLoader.nameForJob(nodeId), libNames); + HadoopClassLoader ldr = new HadoopClassLoader(null, HadoopClassLoader.nameForJob(nodeId), libNames, + ctx.kernalContext().hadoopHelper()); try { jobCls = (Class<HadoopV2Job>)ldr.loadClass(HadoopV2Job.class.getName()); @@ -1060,7 +1061,8 @@ public class HadoopJobTracker extends HadoopComponent { jobInfo = 
meta.jobInfo(); } - job = jobInfo.createJob(jobCls, jobId, log, ctx.configuration().getNativeLibraryNames()); + job = jobInfo.createJob(jobCls, jobId, log, ctx.configuration().getNativeLibraryNames(), + ctx.kernalContext().hadoopHelper()); job.initialize(false, ctx.localNodeId()); http://git-wip-us.apache.org/repos/asf/ignite/blob/cb304b14/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java index 4a946e9..35747bb 100644 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java @@ -23,6 +23,8 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.hadoop.HadoopHelper; +import org.apache.ignite.internal.processors.hadoop.HadoopHelperImpl; import org.apache.ignite.internal.processors.hadoop.HadoopJob; import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; @@ -134,7 +136,7 @@ public class HadoopChildProcessRunner { assert job == null; - job = req.jobInfo().createJob(HadoopV2Job.class, req.jobId(), log, null); + job = req.jobInfo().createJob(HadoopV2Job.class, req.jobId(), log, null, new HadoopHelperImpl()); job.initialize(true, 
nodeDesc.processId()); http://git-wip-us.apache.org/repos/asf/ignite/blob/cb304b14/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java index 595474c..4515131 100644 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java @@ -52,6 +52,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader; import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo; import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock; +import org.apache.ignite.internal.processors.hadoop.HadoopHelper; import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; import org.apache.ignite.internal.processors.hadoop.HadoopJob; import org.apache.ignite.internal.processors.hadoop.HadoopJobId; @@ -87,6 +88,9 @@ public class HadoopV2Job implements HadoopJob { /** */ private final JobContextImpl jobCtx; + /** */ + private final HadoopHelper helper; + /** Hadoop job ID. */ private final HadoopJobId jobId; @@ -130,13 +134,14 @@ public class HadoopV2Job implements HadoopJob { * @param libNames Optional additional native library names. 
*/ public HadoopV2Job(HadoopJobId jobId, final HadoopDefaultJobInfo jobInfo, IgniteLogger log, - @Nullable String[] libNames) { + @Nullable String[] libNames, HadoopHelper helper) { assert jobId != null; assert jobInfo != null; this.jobId = jobId; this.jobInfo = jobInfo; this.libNames = libNames; + this.helper = helper; ClassLoader oldLdr = HadoopUtils.setContextClassLoader(getClass().getClassLoader()); @@ -255,7 +260,7 @@ public class HadoopV2Job implements HadoopJob { // Note that the classloader identified by the task it was initially created for, // but later it may be reused for other tasks. HadoopClassLoader ldr = new HadoopClassLoader(rsrcMgr.classPath(), - HadoopClassLoader.nameForTask(info, false), libNames); + HadoopClassLoader.nameForTask(info, false), libNames, helper); cls = (Class<? extends HadoopTaskContext>)ldr.loadClass(HadoopV2TaskContext.class.getName()); http://git-wip-us.apache.org/repos/asf/ignite/blob/cb304b14/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java index 2fd7777..e202f48 100644 --- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderTest.java @@ -49,7 +49,7 @@ import org.apache.ignite.internal.processors.hadoop.deps.Without; */ public class HadoopClassLoaderTest extends TestCase { /** */ - final HadoopClassLoader ldr = new HadoopClassLoader(null, "test", null); + final HadoopClassLoader ldr = new HadoopClassLoader(null, "test", null, new HadoopHelperImpl()); /** * @throws Exception If failed. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/cb304b14/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPlannerMockJob.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPlannerMockJob.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPlannerMockJob.java index 88d0f80..1a87865d 100644 --- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPlannerMockJob.java +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopPlannerMockJob.java @@ -145,7 +145,7 @@ public class HadoopPlannerMockJob implements HadoopJob { /** {@inheritDoc} */ @Override public HadoopJob createJob(Class<? extends HadoopJob> jobCls, HadoopJobId jobId, IgniteLogger log, - @Nullable String[] libNames) throws IgniteCheckedException { + @Nullable String[] libNames, HadoopHelper helper) throws IgniteCheckedException { throwUnsupported(); return null; http://git-wip-us.apache.org/repos/asf/ignite/blob/cb304b14/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java index b4e3dc2..656ba66 100644 --- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopSnappyTest.java @@ -49,7 +49,7 @@ public class HadoopSnappyTest extends GridCommonAbstractTest { // Run the same in several more class loaders simulating jobs and tasks: for (int i = 0; i < 2; i++) { - ClassLoader 
hadoopClsLdr = new HadoopClassLoader(null, "cl-" + i, null); + ClassLoader hadoopClsLdr = new HadoopClassLoader(null, "cl-" + i, null, new HadoopHelperImpl()); Class<?> cls = (Class)Class.forName(HadoopSnappyTest.class.getName(), true, hadoopClsLdr); http://git-wip-us.apache.org/repos/asf/ignite/blob/cb304b14/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java index 27d7fc2..f914467 100644 --- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV1Test.java @@ -48,7 +48,7 @@ public class HadoopTasksV1Test extends HadoopTasksAllVersionsTest { HadoopJobId jobId = new HadoopJobId(uuid, 0); - return jobInfo.createJob(HadoopV2Job.class, jobId, log, null); + return jobInfo.createJob(HadoopV2Job.class, jobId, log, null, new HadoopHelperImpl()); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/cb304b14/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java index 30cf50c..faec383 100644 --- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTasksV2Test.java @@ -67,7 +67,7 @@ public class 
HadoopTasksV2Test extends HadoopTasksAllVersionsTest { HadoopJobId jobId = new HadoopJobId(uuid, 0); - return jobInfo.createJob(HadoopV2Job.class, jobId, log, null); + return jobInfo.createJob(HadoopV2Job.class, jobId, log, null, new HadoopHelperImpl()); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/cb304b14/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java index ae2c00d..6b974bd 100644 --- a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopV2JobSelfTest.java @@ -78,7 +78,7 @@ public class HadoopV2JobSelfTest extends HadoopAbstractSelfTest { HadoopJobId id = new HadoopJobId(uuid, 1); - HadoopJob job = info.createJob(HadoopV2Job.class, id, log, null); + HadoopJob job = info.createJob(HadoopV2Job.class, id, log, null, new HadoopHelperImpl()); HadoopTaskContext taskCtx = job.getTaskContext(new HadoopTaskInfo(HadoopTaskType.MAP, null, 0, 0, null)); http://git-wip-us.apache.org/repos/asf/ignite/blob/cb304b14/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java index 5266875..45c178a 100644 --- 
a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java +++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopAbstractMapTest.java @@ -23,6 +23,7 @@ import org.apache.commons.collections.comparators.ComparableComparator; import org.apache.hadoop.io.IntWritable; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.processors.hadoop.HadoopHelper; import org.apache.ignite.internal.processors.hadoop.HadoopJob; import org.apache.ignite.internal.processors.hadoop.HadoopJobId; import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; @@ -144,7 +145,7 @@ public abstract class HadoopAbstractMapTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override public HadoopJob createJob(Class<? extends HadoopJob> jobCls, HadoopJobId jobId, IgniteLogger log, - @Nullable String[] libNames) throws IgniteCheckedException { + @Nullable String[] libNames, HadoopHelper helper) throws IgniteCheckedException { assert false; return null; http://git-wip-us.apache.org/repos/asf/ignite/blob/cb304b14/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java deleted file mode 100644 index 30a6e72..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java +++ /dev/null @@ -1,363 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.util.ClassCache; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentHashMap8; - -import java.io.IOException; -import java.io.InputStream; -import java.net.URL; -import java.net.URLClassLoader; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Map; -import java.util.UUID; -import java.util.Vector; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -/** - * Class loader allowing explicitly load classes without delegation to parent class loader. - * Also supports class parsing for finding dependencies which contain transitive dependencies - * unavailable for parent. - */ -public class HadoopClassLoader extends URLClassLoader implements ClassCache { - static { - // We are very parallel capable. - registerAsParallelCapable(); - } - - /** Hadoop class name: Daemon. */ - public static final String CLS_DAEMON = "org.apache.hadoop.util.Daemon"; - - /** Hadoop class name: ShutdownHookManager. 
*/ - public static final String CLS_SHUTDOWN_HOOK_MANAGER = "org.apache.hadoop.util.ShutdownHookManager"; - - /** Hadoop class name: NativeCodeLoader. */ - public static final String CLS_NATIVE_CODE_LOADER = "org.apache.hadoop.util.NativeCodeLoader"; - - /** Hadoop class name: Daemon replacement. */ - public static final String CLS_DAEMON_REPLACE = "org.apache.ignite.internal.processors.hadoop.v2.HadoopDaemon"; - - /** Hadoop class name: ShutdownHookManager replacement. */ - public static final String CLS_SHUTDOWN_HOOK_MANAGER_REPLACE = - "org.apache.ignite.internal.processors.hadoop.v2.HadoopShutdownHookManager"; - - /** Name of libhadoop library. */ - private static final String LIBHADOOP = "hadoop."; - - /** */ - private static final URLClassLoader APP_CLS_LDR = (URLClassLoader)HadoopClassLoader.class.getClassLoader(); - - /** */ - private static final Collection<URL> appJars = F.asList(APP_CLS_LDR.getURLs()); - - /** */ - private static volatile Collection<URL> hadoopJars; - - /** */ - private static final Map<String, byte[]> bytesCache = new ConcurrentHashMap8<>(); - - /** Class cache. */ - private final ConcurrentMap<String, Class> cacheMap = new ConcurrentHashMap<>(); - - /** Diagnostic name of this class loader. */ - @SuppressWarnings({"FieldCanBeLocal", "UnusedDeclaration"}) - private final String name; - - /** Native library names. */ - private final String[] libNames; - - /** - * Gets name for Job class loader. The name is specific for local node id. - * @param locNodeId The local node id. - * @return The class loader name. - */ - public static String nameForJob(UUID locNodeId) { - return "hadoop-job-node-" + locNodeId.toString(); - } - - /** - * Gets name for the task class loader. Task class loader - * @param info The task info. - * @param prefix Get only prefix (without task type and number) - * @return The class loader name. 
- */ - public static String nameForTask(HadoopTaskInfo info, boolean prefix) { - if (prefix) - return "hadoop-task-" + info.jobId() + "-"; - else - return "hadoop-task-" + info.jobId() + "-" + info.type() + "-" + info.taskNumber(); - } - - /** - * Constructor. - * - * @param urls Urls. - * @param name Classloader name. - * @param libNames Optional additional native library names to be linked from parent classloader. - */ - public HadoopClassLoader(URL[] urls, String name, @Nullable String[] libNames) { - super(addHadoopUrls(urls), APP_CLS_LDR); - - assert !(getParent() instanceof HadoopClassLoader); - - this.name = name; - this.libNames = libNames; - - initializeNativeLibraries(); - } - - /** - * Workaround to load native Hadoop libraries. Java doesn't allow native libraries to be loaded from different - * classloaders. But we load Hadoop classes many times and one of these classes - {@code NativeCodeLoader} - tries - * to load the same native library over and over again. - * <p> - * To fix the problem, we force native library load in parent class loader and then "link" handle to this native - * library to our class loader. As a result, our class loader will think that the library is already loaded and will - * be able to link native methods. - * - * @see <a href="http://docs.oracle.com/javase/1.5.0/docs/guide/jni/spec/invocation.html#library_version"> - * JNI specification</a> - */ - private void initializeNativeLibraries() { - try { - // This must trigger native library load. 
- // TODO: Do not delegate to APP LDR - Class.forName(CLS_NATIVE_CODE_LOADER, true, APP_CLS_LDR); - - final Vector<Object> curVector = U.field(this, "nativeLibraries"); - - // TODO: Do not delegate to APP LDR - ClassLoader ldr = APP_CLS_LDR; - - while (ldr != null) { - Vector vector = U.field(ldr, "nativeLibraries"); - - for (Object lib : vector) { - String name = U.field(lib, "name"); - - boolean add = name.contains(LIBHADOOP); - - if (!add && libNames != null) { - for (String libName : libNames) { - if (libName != null && name.contains(libName)) { - add = true; - - break; - } - } - } - - if (add) { - curVector.add(lib); - - return; - } - } - - ldr = ldr.getParent(); - } - } - catch (Exception e) { - U.quietAndWarn(null, "Failed to initialize Hadoop native library " + - "(native Hadoop methods might not work properly): " + e); - } - } - - /** {@inheritDoc} */ - @Override protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException { - try { - // Always load Hadoop classes explicitly, since Hadoop can be available in App classpath. - if (HadoopClassLoaderUtils.isHadoop(name)) { - if (name.equals(CLS_SHUTDOWN_HOOK_MANAGER)) // Dirty hack to get rid of Hadoop shutdown hooks. - return loadReplace(name, CLS_SHUTDOWN_HOOK_MANAGER_REPLACE); - else if (name.equals(CLS_DAEMON)) - // We replace this in order to be able to forcibly stop some daemon threads - // that otherwise never stop (e.g. PeerCache runnables): - return loadReplace(name, CLS_DAEMON_REPLACE); - - return loadClassExplicitly(name, resolve); - } - - // For Ignite Hadoop and IGFS classes we have to check if they depend on Hadoop. 
- if (HadoopClassLoaderUtils.isHadoopIgfs(name)) { - if (hasExternalDependencies(name)) - return loadClassExplicitly(name, resolve); - } - - return super.loadClass(name, resolve); - } - catch (NoClassDefFoundError | ClassNotFoundException e) { - throw new ClassNotFoundException("Failed to load class: " + name, e); - } - } - - /** - * Load a class replacing it with our own implementation. - * - * @param originalName Name. - * @param replaceName Replacement. - * @return Class. - */ - private Class<?> loadReplace(final String originalName, final String replaceName) { - synchronized (getClassLoadingLock(originalName)) { - // First, check if the class has already been loaded - Class c = findLoadedClass(originalName); - - if (c != null) - return c; - - byte[] bytes = bytesCache.get(originalName); - - if (bytes == null) { - InputStream in = HadoopClassLoaderUtils.loadClassBytes(getParent(), replaceName); - - bytes = HadoopClassLoaderUtils.loadReplace(in, originalName, replaceName); - - bytesCache.put(originalName, bytes); - } - - return defineClass(originalName, bytes, 0, bytes.length); - } - } - - /** {@inheritDoc} */ - @Override public Class<?> getFromCache(String clsName) throws ClassNotFoundException { - Class<?> cls = cacheMap.get(clsName); - - if (cls == null) { - Class old = cacheMap.putIfAbsent(clsName, cls = Class.forName(clsName, true, this)); - - if (old != null) - cls = old; - } - - return cls; - } - - /** - * @param name Class name. - * @param resolve Resolve class. - * @return Class. - * @throws ClassNotFoundException If failed. 
- */ - private Class<?> loadClassExplicitly(String name, boolean resolve) throws ClassNotFoundException { - synchronized (getClassLoadingLock(name)) { - // First, check if the class has already been loaded - Class c = findLoadedClass(name); - - if (c == null) { - long t1 = System.nanoTime(); - - c = findClass(name); - - // this is the defining class loader; record the stats - sun.misc.PerfCounter.getFindClassTime().addElapsedTimeFrom(t1); - sun.misc.PerfCounter.getFindClasses().increment(); - } - - if (resolve) - resolveClass(c); - - return c; - } - } - - /** - * Check whether class has external dependencies on Hadoop. - * - * @param clsName Class name. - * @return {@code True} if class has external dependencies. - */ - boolean hasExternalDependencies(String clsName) { - return HadoopClassLoaderUtils.hasExternalDependencies(clsName, getParent()); - } - - /** - * @param urls URLs. - * @return URLs. - */ - private static URL[] addHadoopUrls(URL[] urls) { - Collection<URL> hadoopJars; - - try { - hadoopJars = hadoopUrls(); - } - catch (IgniteCheckedException e) { - throw new RuntimeException(e); - } - - ArrayList<URL> list = new ArrayList<>(hadoopJars.size() + appJars.size() + (urls == null ? 0 : urls.length)); - - list.addAll(appJars); - list.addAll(hadoopJars); - - if (!F.isEmpty(urls)) - list.addAll(F.asList(urls)); - - return list.toArray(new URL[list.size()]); - } - - /** - * @return Collection of jar URLs. - * @throws IgniteCheckedException If failed. 
- */ - public static Collection<URL> hadoopUrls() throws IgniteCheckedException { - Collection<URL> hadoopUrls = hadoopJars; - - if (hadoopUrls != null) - return hadoopUrls; - - synchronized (HadoopClassLoader.class) { - hadoopUrls = hadoopJars; - - if (hadoopUrls != null) - return hadoopUrls; - - try { - hadoopUrls = HadoopClasspathUtils.classpathForClassLoader(); - } - catch (IOException e) { - throw new IgniteCheckedException("Failed to resolve Hadoop JAR locations: " + e.getMessage(), e); - } - - hadoopJars = hadoopUrls; - - return hadoopUrls; - } - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HadoopClassLoader.class, this); - } - - /** - * Getter for name field. - */ - public String name() { - return name; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/cb304b14/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderUtils.java deleted file mode 100644 index 3415d6a..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoaderUtils.java +++ /dev/null @@ -1,684 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop; - -import org.apache.ignite.internal.util.typedef.F; -import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentHashMap8; -import org.objectweb.asm.AnnotationVisitor; -import org.objectweb.asm.Attribute; -import org.objectweb.asm.ClassReader; -import org.objectweb.asm.ClassVisitor; -import org.objectweb.asm.ClassWriter; -import org.objectweb.asm.FieldVisitor; -import org.objectweb.asm.Handle; -import org.objectweb.asm.Label; -import org.objectweb.asm.MethodVisitor; -import org.objectweb.asm.Opcodes; -import org.objectweb.asm.Type; -import org.objectweb.asm.commons.Remapper; -import org.objectweb.asm.commons.RemappingClassAdapter; - -import java.io.IOException; -import java.io.InputStream; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -/** - * Utility methods for Hadoop classloader required to avoid direct 3rd-party dependencies in class loader. - */ -public class HadoopClassLoaderUtils { - /** Cache for resolved dependency info. */ - private static final Map<String, Boolean> dependenciesCache = new ConcurrentHashMap8<>(); - - /** - * Load special replacement and impersonate - * - * @param in Input stream. - * @param originalName Original class name. - * @param replaceName Replacer class name. - * @return Result. 
- */ - public static byte[] loadReplace(InputStream in, final String originalName, final String replaceName) { - ClassReader rdr; - - try { - rdr = new ClassReader(in); - } - catch (IOException e) { - throw new RuntimeException(e); - } - - ClassWriter w = new ClassWriter(Opcodes.ASM4); - - rdr.accept(new RemappingClassAdapter(w, new Remapper() { - /** */ - String replaceType = replaceName.replace('.', '/'); - - /** */ - String nameType = originalName.replace('.', '/'); - - @Override public String map(String type) { - if (type.equals(replaceType)) - return nameType; - - return type; - } - }), ClassReader.EXPAND_FRAMES); - - return w.toByteArray(); - } - - /** - * @param cls Class name. - * @return {@code true} If this is Hadoop class. - */ - public static boolean isHadoop(String cls) { - return cls.startsWith("org.apache.hadoop."); - } - - /** - * Need to parse only Ignite Hadoop and IGFS classes. - * - * @param cls Class name. - * @return {@code true} if we need to check this class. - */ - public static boolean isHadoopIgfs(String cls) { - String ignitePkgPrefix = "org.apache.ignite"; - - int len = ignitePkgPrefix.length(); - - return cls.startsWith(ignitePkgPrefix) && ( - cls.indexOf("igfs.", len) != -1 || - cls.indexOf(".fs.", len) != -1 || - cls.indexOf("hadoop.", len) != -1); - } - - /** - * @param ldr Loader. - * @param clsName Class. - * @return Input stream. - */ - @Nullable public static InputStream loadClassBytes(ClassLoader ldr, String clsName) { - return ldr.getResourceAsStream(clsName.replace('.', '/') + ".class"); - } - - /** - * Check whether class has external dependencies on Hadoop. - * - * @param clsName Class name. - * @param parentClsLdr Parent class loader. - * @return {@code True} if class has external dependencies. 
- */ - static boolean hasExternalDependencies(String clsName, ClassLoader parentClsLdr) { - Boolean hasDeps = dependenciesCache.get(clsName); - - if (hasDeps == null) { - CollectingContext ctx = new CollectingContext(parentClsLdr); - - ctx.annVisitor = new CollectingAnnotationVisitor(ctx); - ctx.mthdVisitor = new CollectingMethodVisitor(ctx, ctx.annVisitor); - ctx.fldVisitor = new CollectingFieldVisitor(ctx, ctx.annVisitor); - ctx.clsVisitor = new CollectingClassVisitor(ctx, ctx.annVisitor, ctx.mthdVisitor, ctx.fldVisitor); - - hasDeps = hasExternalDependencies(clsName, parentClsLdr, ctx); - - dependenciesCache.put(clsName, hasDeps); - } - - return hasDeps; - } - - /** - * Check whether class has external dependencies on Hadoop. - * - * @param clsName Class name. - * @param parentClsLdr Parent class loader. - * @param ctx Context. - * @return {@code true} If the class has external dependencies. - */ - static boolean hasExternalDependencies(String clsName, ClassLoader parentClsLdr, CollectingContext ctx) { - if (isHadoop(clsName)) // Hadoop must not be in classpath but Idea sucks, so filtering explicitly as external. - return true; - - // Try to get from parent to check if the type accessible. - InputStream in = loadClassBytes(parentClsLdr, clsName); - - if (in == null) // The class is external itself, it must be loaded from this class loader. - return true; - - if (!isHadoopIgfs(clsName)) // Other classes should not have external dependencies. - return false; - - final ClassReader rdr; - - try { - rdr = new ClassReader(in); - } - catch (IOException e) { - throw new RuntimeException("Failed to read class: " + clsName, e); - } - - ctx.visited.add(clsName); - - rdr.accept(ctx.clsVisitor, 0); - - if (ctx.found) // We already know that we have dependencies, no need to check parent. - return true; - - // Here we are known to not have any dependencies but possibly we have a parent which has them. - int idx = clsName.lastIndexOf('$'); - - if (idx == -1) // No parent class. 
- return false; - - String parentCls = clsName.substring(0, idx); - - if (ctx.visited.contains(parentCls)) - return false; - - Boolean res = dependenciesCache.get(parentCls); - - if (res == null) - res = hasExternalDependencies(parentCls, parentClsLdr, ctx); - - return res; - } - - /** - * @param name Class name. - * @return {@code true} If this is a valid class name. - */ - private static boolean validateClassName(String name) { - int len = name.length(); - - if (len <= 1) - return false; - - if (!Character.isJavaIdentifierStart(name.charAt(0))) - return false; - - boolean hasDot = false; - - for (int i = 1; i < len; i++) { - char c = name.charAt(i); - - if (c == '.') - hasDot = true; - else if (!Character.isJavaIdentifierPart(c)) - return false; - } - - return hasDot; - } - - /** - * Context for dependencies collection. - */ - private static class CollectingContext { - /** Visited classes. */ - private final Set<String> visited = new HashSet<>(); - - /** Parent class loader. */ - private final ClassLoader parentClsLdr; - - /** Whether dependency found. */ - private boolean found; - - /** Annotation visitor. */ - private AnnotationVisitor annVisitor; - - /** Method visitor. */ - private MethodVisitor mthdVisitor; - - /** Field visitor. */ - private FieldVisitor fldVisitor; - - /** Class visitor. */ - private ClassVisitor clsVisitor; - - /** - * Constrcutor. - * - * @param parentClsLdr Parent class loader. - */ - private CollectingContext(ClassLoader parentClsLdr) { - this.parentClsLdr = parentClsLdr; - } - - /** - * Processes a method descriptor - * @param methDesc The method desc String. - */ - void onMethodsDesc(final String methDesc) { - // Process method return type: - onType(Type.getReturnType(methDesc)); - - if (found) - return; - - // Process method argument types: - for (Type t: Type.getArgumentTypes(methDesc)) { - onType(t); - - if (found) - return; - } - } - - /** - * Processes dependencies of a class. 
- * - * @param depCls The class name as dot-notated FQN. - */ - void onClass(final String depCls) { - assert depCls.indexOf('/') == -1 : depCls; // class name should be fully converted to dot notation. - assert depCls.charAt(0) != 'L' : depCls; - assert validateClassName(depCls) : depCls; - - if (depCls.startsWith("java.") || depCls.startsWith("javax.")) // Filter out platform classes. - return; - - if (visited.contains(depCls)) - return; - - Boolean res = dependenciesCache.get(depCls); - - if (res == Boolean.TRUE || (res == null && hasExternalDependencies(depCls, parentClsLdr, this))) - found = true; - } - - /** - * Analyses dependencies of given type. - * - * @param t The type to process. - */ - void onType(Type t) { - if (t == null) - return; - - int sort = t.getSort(); - - switch (sort) { - case Type.ARRAY: - onType(t.getElementType()); - - break; - - case Type.OBJECT: - onClass(t.getClassName()); - - break; - } - } - - /** - * Analyses dependencies of given object type. - * - * @param objType The object type to process. - */ - void onInternalTypeName(String objType) { - if (objType == null) - return; - - assert objType.length() > 1 : objType; - - if (objType.charAt(0) == '[') - // handle array. In this case this is a type descriptor notation, like "[Ljava/lang/Object;" - onType(objType); - else { - assert objType.indexOf('.') == -1 : objType; // Must be slash-separated FQN. - - String clsName = objType.replace('/', '.'); // Convert it to dot notation. - - onClass(clsName); // Process. - } - } - - /** - * Type description analyser. - * - * @param desc The description. - */ - void onType(String desc) { - if (!F.isEmpty(desc)) { - if (desc.length() <= 1) - return; // Optimization: filter out primitive types in early stage. - - Type t = Type.getType(desc); - - onType(t); - } - } - } - - /** - * Annotation visitor. 
- */ - private static class CollectingAnnotationVisitor extends AnnotationVisitor { - /** */ - final CollectingContext ctx; - - /** - * Annotation visitor. - * - * @param ctx The collector. - */ - CollectingAnnotationVisitor(CollectingContext ctx) { - super(Opcodes.ASM4); - - this.ctx = ctx; - } - - /** {@inheritDoc} */ - @Override public AnnotationVisitor visitAnnotation(String name, String desc) { - if (ctx.found) - return null; - - ctx.onType(desc); - - return this; - } - - /** {@inheritDoc} */ - @Override public void visitEnum(String name, String desc, String val) { - if (ctx.found) - return; - - ctx.onType(desc); - } - - /** {@inheritDoc} */ - @Override public AnnotationVisitor visitArray(String name) { - return ctx.found ? null : this; - } - - /** {@inheritDoc} */ - @Override public void visit(String name, Object val) { - if (ctx.found) - return; - - if (val instanceof Type) - ctx.onType((Type)val); - } - - /** {@inheritDoc} */ - @Override public void visitEnd() { - // No-op. - } - } - - /** - * Field visitor. - */ - private static class CollectingFieldVisitor extends FieldVisitor { - /** Collector. */ - private final CollectingContext ctx; - - /** Annotation visitor. */ - private final AnnotationVisitor av; - - /** - * Constructor. - */ - CollectingFieldVisitor(CollectingContext ctx, AnnotationVisitor av) { - super(Opcodes.ASM4); - - this.ctx = ctx; - this.av = av; - } - - /** {@inheritDoc} */ - @Override public AnnotationVisitor visitAnnotation(String desc, boolean visible) { - if (ctx.found) - return null; - - ctx.onType(desc); - - return ctx.found ? null : av; - } - - /** {@inheritDoc} */ - @Override public void visitAttribute(Attribute attr) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void visitEnd() { - // No-op. - } - } - - /** - * Class visitor. - */ - private static class CollectingClassVisitor extends ClassVisitor { - /** Collector. */ - private final CollectingContext ctx; - - /** Annotation visitor. 
*/ - private final AnnotationVisitor av; - - /** Method visitor. */ - private final MethodVisitor mv; - - /** Field visitor. */ - private final FieldVisitor fv; - - /** - * Constructor. - * - * @param ctx Collector. - * @param av Annotation visitor. - * @param mv Method visitor. - * @param fv Field visitor. - */ - CollectingClassVisitor(CollectingContext ctx, AnnotationVisitor av, MethodVisitor mv, FieldVisitor fv) { - super(Opcodes.ASM4); - - this.ctx = ctx; - this.av = av; - this.mv = mv; - this.fv = fv; - } - - /** {@inheritDoc} */ - @Override public void visit(int i, int i2, String name, String signature, String superName, String[] ifaces) { - if (ctx.found) - return; - - ctx.onInternalTypeName(superName); - - if (ctx.found) - return; - - if (ifaces != null) { - for (String iface : ifaces) { - ctx.onInternalTypeName(iface); - - if (ctx.found) - return; - } - } - } - - /** {@inheritDoc} */ - @Override public AnnotationVisitor visitAnnotation(String desc, boolean visible) { - if (ctx.found) - return null; - - ctx.onType(desc); - - return ctx.found ? null : av; - } - - /** {@inheritDoc} */ - @Override public void visitInnerClass(String name, String outerName, String innerName, int i) { - if (ctx.found) - return; - - ctx.onInternalTypeName(name); - } - - /** {@inheritDoc} */ - @Override public FieldVisitor visitField(int i, String name, String desc, String signature, Object val) { - if (ctx.found) - return null; - - ctx.onType(desc); - - return ctx.found ? null : fv; - } - - /** {@inheritDoc} */ - @Override public MethodVisitor visitMethod(int i, String name, String desc, String signature, - String[] exceptions) { - if (ctx.found) - return null; - - ctx.onMethodsDesc(desc); - - // Process declared method exceptions: - if (exceptions != null) { - for (String e : exceptions) - ctx.onInternalTypeName(e); - } - - return ctx.found ? null : mv; - } - } - - /** - * Method visitor. - */ - private static class CollectingMethodVisitor extends MethodVisitor { - /** Collector. 
*/ - private final CollectingContext ctx; - - /** Annotation visitor. */ - private final AnnotationVisitor av; - - /** - * Constructor. - * - * @param ctx Collector. - * @param av Annotation visitor. - */ - private CollectingMethodVisitor(CollectingContext ctx, AnnotationVisitor av) { - super(Opcodes.ASM4); - - this.ctx = ctx; - this.av = av; - } - - /** {@inheritDoc} */ - @Override public AnnotationVisitor visitAnnotation(String desc, boolean visible) { - if (ctx.found) - return null; - - ctx.onType(desc); - - return ctx.found ? null : av; - } - - /** {@inheritDoc} */ - @Override public AnnotationVisitor visitParameterAnnotation(int i, String desc, boolean b) { - if (ctx.found) - return null; - - ctx.onType(desc); - - return ctx.found ? null : av; - } - - /** {@inheritDoc} */ - @Override public AnnotationVisitor visitAnnotationDefault() { - return ctx.found ? null : av; - } - - /** {@inheritDoc} */ - @Override public void visitFieldInsn(int opcode, String owner, String name, String desc) { - if (ctx.found) - return; - - ctx.onInternalTypeName(owner); - - if (ctx.found) - return; - - ctx.onType(desc); - } - - /** {@inheritDoc} */ - @Override public void visitInvokeDynamicInsn(String name, String desc, Handle bsm, Object... bsmArgs) { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void visitFrame(int type, int nLoc, Object[] locTypes, int nStack, Object[] stackTypes) { - // No-op. 
- } - - /** {@inheritDoc} */ - @Override public void visitLocalVariable(String name, String desc, String signature, Label lb, - Label lb2, int i) { - if (ctx.found) - return; - - ctx.onType(desc); - } - - /** {@inheritDoc} */ - @Override public void visitMethodInsn(int i, String owner, String name, String desc) { - if (ctx.found) - return; - - ctx.onInternalTypeName(owner); - - if (ctx.found) - return; - - ctx.onMethodsDesc(desc); - } - - /** {@inheritDoc} */ - @Override public void visitMultiANewArrayInsn(String desc, int dim) { - if (ctx.found) - return; - - ctx.onType(desc); - } - - /** {@inheritDoc} */ - @Override public void visitTryCatchBlock(Label start, Label end, Label hndl, String typeStr) { - if (ctx.found) - return; - - ctx.onInternalTypeName(typeStr); - } - - /** {@inheritDoc} */ - @Override public void visitTypeInsn(int opcode, String type) { - if (ctx.found) - return; - - ctx.onInternalTypeName(type); - } - } -}