http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java new file mode 100644 index 0000000..207fde2 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Job.java @@ -0,0 +1,450 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.Method; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContextImpl; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapreduce.JobSubmissionFiles; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.split.JobSplit; +import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopDefaultJobInfo; +import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock; +import org.apache.ignite.internal.processors.hadoop.HadoopHelper; +import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; +import org.apache.ignite.internal.processors.hadoop.HadoopJob; +import org.apache.ignite.internal.processors.hadoop.HadoopJobId; +import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskType; +import 
org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils; +import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils; +import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap; +import org.apache.ignite.internal.processors.hadoop.v1.HadoopV1Splitter; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; +import org.jsr166.ConcurrentHashMap8; + +import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.jobLocalDir; +import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.taskLocalDir; +import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.transformException; +import static org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtils.FsCacheKey; +import static org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtils.createHadoopLazyConcurrentMap; +import static org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtils.fileSystemForMrUserWithCaching; + +/** + * Hadoop job implementation for v2 API. + */ +public class HadoopV2Job implements HadoopJob { + /** */ + private final JobConf jobConf; + + /** */ + private final JobContextImpl jobCtx; + + /** */ + private final HadoopHelper helper; + + /** Hadoop job ID. */ + private final HadoopJobId jobId; + + /** Job info. */ + protected final HadoopJobInfo jobInfo; + + /** Native library names. */ + private final String[] libNames; + + /** */ + private final JobID hadoopJobID; + + /** */ + private final HadoopV2JobResourceManager rsrcMgr; + + /** */ + private final ConcurrentMap<T2<HadoopTaskType, Integer>, GridFutureAdapter<HadoopTaskContext>> ctxs = + new ConcurrentHashMap8<>(); + + /** Pool of task context classes, and thus of class loading environments. */ + private final Queue<Class<? extends HadoopTaskContext>> taskCtxClsPool = new ConcurrentLinkedQueue<>(); + + /** All created task context classes. */ + private final Queue<Class<? extends HadoopTaskContext>> fullCtxClsQueue = new ConcurrentLinkedDeque<>(); + + /** File system cache map. */ + private final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fsMap = createHadoopLazyConcurrentMap(); + + /** Local node ID. */ + private volatile UUID locNodeId; + + /** Serialized JobConf. */ + private volatile byte[] jobConfData; + + /** + * Constructor. + * + * @param jobId Job ID. + * @param jobInfo Job info. + * @param log Logger. + * @param libNames Optional additional native library names.
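+ * @param helper Hadoop helper.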
+ */ + public HadoopV2Job(HadoopJobId jobId, final HadoopDefaultJobInfo jobInfo, IgniteLogger log, + @Nullable String[] libNames, HadoopHelper helper) { + assert jobId != null; + assert jobInfo != null; + + this.jobId = jobId; + this.jobInfo = jobInfo; + this.libNames = libNames; + this.helper = helper; + + ClassLoader oldLdr = HadoopUtils.setContextClassLoader(getClass().getClassLoader()); + + try { + hadoopJobID = new JobID(jobId.globalId().toString(), jobId.localId()); + + jobConf = new JobConf(); + + HadoopFileSystemsUtils.setupFileSystems(jobConf); + + for (Map.Entry<String,String> e : jobInfo.properties().entrySet()) + jobConf.set(e.getKey(), e.getValue()); + + jobCtx = new JobContextImpl(jobConf, hadoopJobID); + + rsrcMgr = new HadoopV2JobResourceManager(jobId, jobCtx, log, this); + } + finally { + HadoopUtils.setContextClassLoader(oldLdr); + } + } + + /** {@inheritDoc} */ + @Override public HadoopJobId id() { + return jobId; + } + + /** {@inheritDoc} */ + @Override public HadoopJobInfo info() { + return jobInfo; + } + + /** {@inheritDoc} */ + @Override public Collection<HadoopInputSplit> input() throws IgniteCheckedException { + ClassLoader oldLdr = HadoopUtils.setContextClassLoader(jobConf.getClassLoader()); + + try { + String jobDirPath = jobConf.get(MRJobConfig.MAPREDUCE_JOB_DIR); + + if (jobDirPath == null) { // Probably the job was not submitted by a Hadoop client. + // Assume that we have the needed classes and try to generate input splits ourselves. + if (jobConf.getUseNewMapper()) + return HadoopV2Splitter.splitJob(jobCtx); + else + return HadoopV1Splitter.splitJob(jobConf); + } + + Path jobDir = new Path(jobDirPath); + + try { + FileSystem fs = fileSystem(jobDir.toUri(), jobConf); + + JobSplit.TaskSplitMetaInfo[] metaInfos = SplitMetaInfoReader.readSplitMetaInfo(hadoopJobID, fs, jobConf, + jobDir); + + if (F.isEmpty(metaInfos)) + throw new IgniteCheckedException("No input splits found."); + + Path splitsFile = JobSubmissionFiles.getJobSplitFile(jobDir); + + try (FSDataInputStream in = fs.open(splitsFile)) { + Collection<HadoopInputSplit> res = new ArrayList<>(metaInfos.length); + + for (JobSplit.TaskSplitMetaInfo metaInfo : metaInfos) { + long off = metaInfo.getStartOffset(); + + String[] hosts = metaInfo.getLocations(); + + in.seek(off); + + String clsName = Text.readString(in); + + HadoopFileBlock block = HadoopV1Splitter.readFileBlock(clsName, in, hosts); + + if (block == null) + block = HadoopV2Splitter.readFileBlock(clsName, in, hosts); + + res.add(block != null ? block : new HadoopExternalSplit(hosts, off)); + } + + return res; + } + } + catch (Throwable e) { + if (e instanceof Error) + throw (Error)e; + else + throw transformException(e); + } + } + finally { + HadoopUtils.restoreContextClassLoader(oldLdr); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings({"unchecked", "MismatchedQueryAndUpdateOfCollection" }) + @Override public HadoopTaskContext getTaskContext(HadoopTaskInfo info) throws IgniteCheckedException { + T2<HadoopTaskType, Integer> locTaskId = new T2<>(info.type(), info.taskNumber()); + + GridFutureAdapter<HadoopTaskContext> fut = ctxs.get(locTaskId); + + if (fut != null) + return fut.get(); + + GridFutureAdapter<HadoopTaskContext> old = ctxs.putIfAbsent(locTaskId, fut = new GridFutureAdapter<>()); + + if (old != null) + return old.get(); + + Class<? extends HadoopTaskContext> cls = taskCtxClsPool.poll(); + + try { + if (cls == null) { + // If there is no pooled class, then load a new one.
+ // Note that the class loader is identified by the task it was initially created for, + // but later it may be reused for other tasks. + HadoopClassLoader ldr = new HadoopClassLoader(rsrcMgr.classPath(), + HadoopClassLoader.nameForTask(info, false), libNames, helper); + + cls = (Class<? extends HadoopTaskContext>)ldr.loadClass(HadoopV2TaskContext.class.getName()); + + fullCtxClsQueue.add(cls); + } + + Constructor<?> ctr = cls.getConstructor(HadoopTaskInfo.class, HadoopJob.class, + HadoopJobId.class, UUID.class, DataInput.class); + + if (jobConfData == null) + synchronized(jobConf) { + if (jobConfData == null) { + ByteArrayOutputStream buf = new ByteArrayOutputStream(); + + jobConf.write(new DataOutputStream(buf)); + + jobConfData = buf.toByteArray(); + } + } + + HadoopTaskContext res = (HadoopTaskContext)ctr.newInstance(info, this, jobId, locNodeId, + new DataInputStream(new ByteArrayInputStream(jobConfData))); + + fut.onDone(res); + + return res; + } + catch (Throwable e) { + IgniteCheckedException te = transformException(e); + + fut.onDone(te); + + if (e instanceof Error) + throw (Error)e; + + throw te; + } + } + + /** {@inheritDoc} */ + @Override public void initialize(boolean external, UUID locNodeId) throws IgniteCheckedException { + assert locNodeId != null; + + this.locNodeId = locNodeId; + + ClassLoader oldLdr = HadoopUtils.setContextClassLoader(getClass().getClassLoader()); + + try { + rsrcMgr.prepareJobEnvironment(!external, jobLocalDir(locNodeId, jobId)); + } + finally { + HadoopUtils.restoreContextClassLoader(oldLdr); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("ThrowFromFinallyBlock") + @Override public void dispose(boolean external) throws IgniteCheckedException { + try { + if (rsrcMgr != null && !external) { + File jobLocDir = jobLocalDir(locNodeId, jobId); + + if (jobLocDir.exists()) + U.delete(jobLocDir); + } + } + finally { + taskCtxClsPool.clear(); + + Throwable err = null; + + // Stop the daemon threads that have been created + // with the task class loaders: + while (true) { + Class<? extends HadoopTaskContext> cls = fullCtxClsQueue.poll(); + + if (cls == null) + break; + + try { + final ClassLoader ldr = cls.getClassLoader(); + + try { + // Stop Hadoop daemons for this *task*: + stopHadoopFsDaemons(ldr); + } + catch (Exception e) { + if (err == null) + err = e; + } + + // Also close all the FileSystems cached in + // HadoopLazyConcurrentMap for this *task* class loader: + closeCachedTaskFileSystems(ldr); + } + catch (Throwable e) { + if (err == null) + err = e; + + if (e instanceof Error) + throw (Error)e; + } + } + + assert fullCtxClsQueue.isEmpty(); + + try { + // Close all cached file systems for this *Job*: + fsMap.close(); + } + catch (Exception e) { + if (err == null) + err = e; + } + + if (err != null) + throw U.cast(err); + } + } + + /** + * Stops Hadoop Fs daemon threads. + * @param ldr The task ClassLoader to stop the daemons for. + * @throws Exception On error. + */ + private void stopHadoopFsDaemons(ClassLoader ldr) throws Exception { + Class<?> daemonCls = ldr.loadClass(HadoopClassLoader.CLS_DAEMON); + + Method m = daemonCls.getMethod("dequeueAndStopAll"); + + m.invoke(null); + } + + /** + * Closes all the file systems used by the task. + * @param ldr The task class loader. + * @throws Exception On error.
+ */ + private void closeCachedTaskFileSystems(ClassLoader ldr) throws Exception { + Class<?> clazz = ldr.loadClass(HadoopV2TaskContext.class.getName()); + + Method m = clazz.getMethod("close"); + + m.invoke(null); + } + + /** {@inheritDoc} */ + @Override public void prepareTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException { + rsrcMgr.prepareTaskWorkDir(taskLocalDir(locNodeId, info)); + } + + /** {@inheritDoc} */ + @Override public void cleanupTaskEnvironment(HadoopTaskInfo info) throws IgniteCheckedException { + HadoopTaskContext ctx = ctxs.remove(new T2<>(info.type(), info.taskNumber())).get(); + + taskCtxClsPool.add(ctx.getClass()); + + File locDir = taskLocalDir(locNodeId, info); + + if (locDir.exists()) + U.delete(locDir); + } + + /** {@inheritDoc} */ + @Override public void cleanupStagingDirectory() { + rsrcMgr.cleanupStagingDirectory(); + } + + /** + * Getter for job configuration. + * @return The job configuration. + */ + public JobConf jobConf() { + return jobConf; + } + + /** + * Gets file system for this job. + * @param uri The uri. + * @param cfg The configuration. + * @return The file system. + * @throws IOException On error. + */ + public FileSystem fileSystem(@Nullable URI uri, Configuration cfg) throws IOException { + return fileSystemForMrUserWithCaching(uri, cfg, fsMap); + } +} \ No newline at end of file
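A note on the concurrency pattern in HadoopV2Job.getTaskContext() above: at most one HadoopTaskContext is created per (task type, task number) key, because the loser of the putIfAbsent() race returns the winner's future and blocks on it, while the winner completes the future with either the result or the exception. Below is a minimal, self-contained sketch of the same pattern, not part of the commit; it uses plain JDK CompletableFuture and ConcurrentHashMap instead of Ignite's GridFutureAdapter and ConcurrentHashMap8, and the TaskCtxCache and factory names are illustrative only:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/** Minimal sketch of the per-key single-initialization pattern used by getTaskContext(). */
class TaskCtxCache<K, V> {
    /** One future per key; losers of the creation race wait on the winner's future. */
    private final ConcurrentMap<K, CompletableFuture<V>> ctxs = new ConcurrentHashMap<>();

    V get(K key, Function<K, V> factory) throws Exception {
        CompletableFuture<V> fut = ctxs.get(key);

        if (fut == null) {
            CompletableFuture<V> created = new CompletableFuture<>();

            fut = ctxs.putIfAbsent(key, created);

            if (fut == null) {
                // This thread won the race and performs the expensive creation,
                // analogous to loading HadoopV2TaskContext in a fresh class loader.
                try {
                    created.complete(factory.apply(key));
                }
                catch (Throwable e) {
                    created.completeExceptionally(e); // Mirrors fut.onDone(te) above.
                }

                fut = created;
            }
        }

        return fut.get(); // Every caller gets the same instance (or the same failure).
    }
}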
http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2JobResourceManager.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2JobResourceManager.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2JobResourceManager.java new file mode 100644 index 0000000..7e105e3 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2JobResourceManager.java @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.FileSystemException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContextImpl; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.util.RunJar; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.processors.hadoop.HadoopJobId; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils; +import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +/** + * Provides all resources needed for job execution. Downloads the main jar, the configuration and additional + * files that need to be placed on the local file system. + */ +class HadoopV2JobResourceManager { + /** Name of the property that disables caching of the 'file' file system. */ + private static final String FILE_DISABLE_CACHING_PROPERTY_NAME = + HadoopFileSystemsUtils.disableFsCachePropertyName("file"); + + /** Hadoop job context. */ + private final JobContextImpl ctx; + + /** Logger. */ + private final IgniteLogger log; + + /** Job ID. */ + private final HadoopJobId jobId; + + /** Class path list. */ + private URL[] clsPath; + + /** Set of local resources. */ + private final Collection<File> rsrcSet = new HashSet<>(); + + /** Staging directory used to deliver the job jar and config to the worker nodes. */ + private Path stagingDir; + + /** The job. */ + private final HadoopV2Job job; + + /** + * Creates new instance.
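+ * @param job The job.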
+ * @param jobId Job ID. + * @param ctx Hadoop job context. + * @param log Logger. + */ + public HadoopV2JobResourceManager(HadoopJobId jobId, JobContextImpl ctx, IgniteLogger log, HadoopV2Job job) { + this.jobId = jobId; + this.ctx = ctx; + this.log = log.getLogger(HadoopV2JobResourceManager.class); + this.job = job; + } + + /** + * Sets the working directory in the local file system. + * + * @param dir Working directory. + * @throws IOException If fails. + */ + private void setLocalFSWorkingDirectory(File dir) throws IOException { + JobConf cfg = ctx.getJobConf(); + + ClassLoader oldLdr = HadoopUtils.setContextClassLoader(cfg.getClassLoader()); + + try { + cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, dir.getAbsolutePath()); + + if (!cfg.getBoolean(FILE_DISABLE_CACHING_PROPERTY_NAME, false)) + FileSystem.getLocal(cfg).setWorkingDirectory(new Path(dir.getAbsolutePath())); + } + finally { + HadoopUtils.restoreContextClassLoader(oldLdr); + } + } + + /** + * Prepares job resources: resolves the class path list and downloads resources if needed. + * + * @param download {@code true} if resources need to be downloaded. + * @param jobLocDir Work directory for the job. + * @throws IgniteCheckedException If failed. + */ + public void prepareJobEnvironment(boolean download, File jobLocDir) throws IgniteCheckedException { + try { + if (jobLocDir.exists()) + throw new IgniteCheckedException("Local job directory already exists: " + jobLocDir.getAbsolutePath()); + + JobConf cfg = ctx.getJobConf(); + + String mrDir = cfg.get("mapreduce.job.dir"); + + if (mrDir != null) { + stagingDir = new Path(new URI(mrDir)); + + if (download) { + FileSystem fs = job.fileSystem(stagingDir.toUri(), cfg); + + if (!fs.exists(stagingDir)) + throw new IgniteCheckedException("Failed to find map-reduce submission " + + "directory (does not exist): " + stagingDir); + + if (!FileUtil.copy(fs, stagingDir, jobLocDir, false, cfg)) + throw new IgniteCheckedException("Failed to copy job submission directory " + + "contents to local file system " + + "[path=" + stagingDir + ", locDir=" + jobLocDir.getAbsolutePath() + + ", jobId=" + jobId + ']'); + } + + File jarJobFile = new File(jobLocDir, "job.jar"); + + Collection<URL> clsPathUrls = new ArrayList<>(); + + clsPathUrls.add(jarJobFile.toURI().toURL()); + + rsrcSet.add(jarJobFile); + rsrcSet.add(new File(jobLocDir, "job.xml")); + + processFiles(jobLocDir, ctx.getCacheFiles(), download, false, null, MRJobConfig.CACHE_LOCALFILES); + processFiles(jobLocDir, ctx.getCacheArchives(), download, true, null, MRJobConfig.CACHE_LOCALARCHIVES); + processFiles(jobLocDir, ctx.getFileClassPaths(), download, false, clsPathUrls, null); + processFiles(jobLocDir, ctx.getArchiveClassPaths(), download, true, clsPathUrls, null); + + if (!clsPathUrls.isEmpty()) { + clsPath = new URL[clsPathUrls.size()]; + + clsPathUrls.toArray(clsPath); + } + } + else if (!jobLocDir.mkdirs()) + throw new IgniteCheckedException("Failed to create local job directory: " + + jobLocDir.getAbsolutePath()); + + setLocalFSWorkingDirectory(jobLocDir); + } + catch (URISyntaxException | IOException e) { + throw new IgniteCheckedException(e); + } + } + + /** + * Process list of resources. + * + * @param jobLocDir Job working directory. + * @param files Array of {@link java.net.URI} or {@link org.apache.hadoop.fs.Path} resources to process. + * @param download {@code true} if the resources need to be downloaded; otherwise only the class path is processed. + * @param extract {@code true} if archives need to be extracted.
+ * @param clsPathUrls Collection to add resource as classpath resource. + * @param rsrcNameProp Property for resource name array setting. + * @throws IOException If failed. + */ + private void processFiles(File jobLocDir, @Nullable Object[] files, boolean download, boolean extract, + @Nullable Collection<URL> clsPathUrls, @Nullable String rsrcNameProp) throws IOException { + if (F.isEmptyOrNulls(files)) + return; + + Collection<String> res = new ArrayList<>(); + + for (Object pathObj : files) { + Path srcPath; + + if (pathObj instanceof URI) { + URI uri = (URI)pathObj; + + srcPath = new Path(uri); + } + else + srcPath = (Path)pathObj; + + String locName = srcPath.getName(); + + File dstPath = new File(jobLocDir.getAbsolutePath(), locName); + + res.add(locName); + + rsrcSet.add(dstPath); + + if (clsPathUrls != null) + clsPathUrls.add(dstPath.toURI().toURL()); + + if (!download) + continue; + + JobConf cfg = ctx.getJobConf(); + + FileSystem dstFs = FileSystem.getLocal(cfg); + + FileSystem srcFs = job.fileSystem(srcPath.toUri(), cfg); + + if (extract) { + File archivesPath = new File(jobLocDir.getAbsolutePath(), ".cached-archives"); + + if (!archivesPath.exists() && !archivesPath.mkdir()) + throw new IOException("Failed to create directory " + + "[path=" + archivesPath + ", jobId=" + jobId + ']'); + + File archiveFile = new File(archivesPath, locName); + + FileUtil.copy(srcFs, srcPath, dstFs, new Path(archiveFile.toString()), false, cfg); + + String archiveNameLC = archiveFile.getName().toLowerCase(); + + if (archiveNameLC.endsWith(".jar")) + RunJar.unJar(archiveFile, dstPath); + else if (archiveNameLC.endsWith(".zip")) + FileUtil.unZip(archiveFile, dstPath); + else if (archiveNameLC.endsWith(".tar.gz") || + archiveNameLC.endsWith(".tgz") || + archiveNameLC.endsWith(".tar")) + FileUtil.unTar(archiveFile, dstPath); + else + throw new IOException("Cannot unpack archive [path=" + srcPath + ", jobId=" + jobId + ']'); + } + else + FileUtil.copy(srcFs, srcPath, dstFs, new Path(dstPath.toString()), false, cfg); + } + + if (!res.isEmpty() && rsrcNameProp != null) + ctx.getJobConf().setStrings(rsrcNameProp, res.toArray(new String[res.size()])); + } + + /** + * Prepares working directory for the task. + * + * <ul> + * <li>Creates working directory.</li> + * <li>Creates symbolic links to all job resources in working directory.</li> + * </ul> + * + * @param path Path to working directory of the task. + * @throws IgniteCheckedException If fails. 
+ */ + public void prepareTaskWorkDir(File path) throws IgniteCheckedException { + try { + if (path.exists()) + throw new IOException("Task local directory already exists: " + path); + + if (!path.mkdir()) + throw new IOException("Failed to create directory: " + path); + + for (File resource : rsrcSet) { + File symLink = new File(path, resource.getName()); + + try { + Files.createSymbolicLink(symLink.toPath(), resource.toPath()); + } + catch (IOException e) { + String msg = "Unable to create symlink \"" + symLink + "\" to \"" + resource + "\"."; + + if (U.isWindows() && e instanceof FileSystemException) + msg += "\n\nAbility to create symbolic links is required!\n" + + "On Windows platform you have to grant permission 'Create symbolic links'\n" + + "to your user or run the Accelerator as Administrator.\n"; + + throw new IOException(msg, e); + } + } + } + catch (IOException e) { + throw new IgniteCheckedException("Unable to prepare local working directory for the task " + + "[jobId=" + jobId + ", path=" + path+ ']', e); + } + } + + /** + * Cleans up job staging directory. + */ + public void cleanupStagingDirectory() { + try { + if (stagingDir != null) { + FileSystem fs = job.fileSystem(stagingDir.toUri(), ctx.getJobConf()); + + fs.delete(stagingDir, true); + } + } + catch (Exception e) { + log.error("Failed to remove job staging directory [path=" + stagingDir + ", jobId=" + jobId + ']' , e); + } + } + + /** + * Returns array of class path for current job. + * + * @return Class path collection. + */ + @Nullable public URL[] classPath() { + return clsPath; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java new file mode 100644 index 0000000..fafa79b --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2MapTask.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import org.apache.hadoop.mapred.JobContextImpl; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.lib.map.WrappedMapper; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; + +/** + * Hadoop map task implementation for v2 API. + */ +public class HadoopV2MapTask extends HadoopV2Task { + /** + * @param taskInfo Task info. + */ + public HadoopV2MapTask(HadoopTaskInfo taskInfo) { + super(taskInfo); + } + + /** {@inheritDoc} */ + @SuppressWarnings({"ConstantConditions", "unchecked"}) + @Override public void run0(HadoopV2TaskContext taskCtx) throws IgniteCheckedException { + OutputFormat outputFormat = null; + Exception err = null; + + JobContextImpl jobCtx = taskCtx.jobContext(); + + try { + InputSplit nativeSplit = hadoopContext().getInputSplit(); + + if (nativeSplit == null) + throw new IgniteCheckedException("Input split cannot be null."); + + InputFormat inFormat = ReflectionUtils.newInstance(jobCtx.getInputFormatClass(), + hadoopContext().getConfiguration()); + + RecordReader reader = inFormat.createRecordReader(nativeSplit, hadoopContext()); + + reader.initialize(nativeSplit, hadoopContext()); + + hadoopContext().reader(reader); + + HadoopJobInfo jobInfo = taskCtx.job().info(); + + outputFormat = jobInfo.hasCombiner() || jobInfo.hasReducer() ? null : prepareWriter(jobCtx); + + Mapper mapper = ReflectionUtils.newInstance(jobCtx.getMapperClass(), hadoopContext().getConfiguration()); + + try { + mapper.run(new WrappedMapper().getMapContext(hadoopContext())); + } + finally { + closeWriter(); + } + + commit(outputFormat); + } + catch (InterruptedException e) { + err = e; + + Thread.currentThread().interrupt(); + + throw new IgniteInterruptedCheckedException(e); + } + catch (Exception e) { + err = e; + + throw new IgniteCheckedException(e); + } + finally { + if (err != null) + abort(outputFormat); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Partitioner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Partitioner.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Partitioner.java new file mode 100644 index 0000000..e199ede --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Partitioner.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.Partitioner; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner; + +/** + * Hadoop partitioner adapter for v2 API. + */ +public class HadoopV2Partitioner implements HadoopPartitioner { + /** Partitioner instance. */ + private Partitioner<Object, Object> part; + + /** + * @param cls Hadoop partitioner class. + * @param conf Job configuration. + */ + public HadoopV2Partitioner(Class<? extends Partitioner<?, ?>> cls, Configuration conf) { + part = (Partitioner<Object, Object>) ReflectionUtils.newInstance(cls, conf); + } + + /** {@inheritDoc} */ + @Override public int partition(Object key, Object val, int parts) { + return part.getPartition(key, val, parts); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2ReduceTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2ReduceTask.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2ReduceTask.java new file mode 100644 index 0000000..e5c2ed2 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2ReduceTask.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import org.apache.hadoop.mapred.JobContextImpl; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; + +/** + * Hadoop reduce task implementation for v2 API. + */ +public class HadoopV2ReduceTask extends HadoopV2Task { + /** {@code True} if reduce, {@code false} if combine. 
*/ + private final boolean reduce; + + /** + * Constructor. + * + * @param taskInfo Task info. + * @param reduce {@code True} if reduce, {@code false} if combine. + */ + public HadoopV2ReduceTask(HadoopTaskInfo taskInfo, boolean reduce) { + super(taskInfo); + + this.reduce = reduce; + } + + /** {@inheritDoc} */ + @SuppressWarnings({"ConstantConditions", "unchecked"}) + @Override public void run0(HadoopV2TaskContext taskCtx) throws IgniteCheckedException { + OutputFormat outputFormat = null; + Exception err = null; + + JobContextImpl jobCtx = taskCtx.jobContext(); + + try { + outputFormat = reduce || !taskCtx.job().info().hasReducer() ? prepareWriter(jobCtx) : null; + + Reducer reducer; + if (reduce) reducer = ReflectionUtils.newInstance(jobCtx.getReducerClass(), + jobCtx.getConfiguration()); + else reducer = ReflectionUtils.newInstance(jobCtx.getCombinerClass(), + jobCtx.getConfiguration()); + + try { + reducer.run(new WrappedReducer().getReducerContext(hadoopContext())); + } + finally { + closeWriter(); + } + + commit(outputFormat); + } + catch (InterruptedException e) { + err = e; + + Thread.currentThread().interrupt(); + + throw new IgniteInterruptedCheckedException(e); + } + catch (Exception e) { + err = e; + + throw new IgniteCheckedException(e); + } + finally { + if (err != null) + abort(outputFormat); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2SetupTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2SetupTask.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2SetupTask.java new file mode 100644 index 0000000..49b5ee7 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2SetupTask.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import java.io.IOException; +import org.apache.hadoop.mapred.JobContextImpl; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; + +/** + * Hadoop setup task (prepares job). + */ +public class HadoopV2SetupTask extends HadoopV2Task { + /** + * Constructor. + * + * @param taskInfo task info. 
+ */ + public HadoopV2SetupTask(HadoopTaskInfo taskInfo) { + super(taskInfo); + } + + /** {@inheritDoc} */ + @SuppressWarnings("ConstantConditions") + @Override protected void run0(HadoopV2TaskContext taskCtx) throws IgniteCheckedException { + try { + JobContextImpl jobCtx = taskCtx.jobContext(); + + OutputFormat outputFormat = getOutputFormat(jobCtx); + + outputFormat.checkOutputSpecs(jobCtx); + + OutputCommitter committer = outputFormat.getOutputCommitter(hadoopContext()); + + if (committer != null) + committer.setupJob(jobCtx); + } + catch (ClassNotFoundException | IOException e) { + throw new IgniteCheckedException(e); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IgniteInterruptedCheckedException(e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Splitter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Splitter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Splitter.java new file mode 100644 index 0000000..eaa5101 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Splitter.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import java.io.DataInput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock; +import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils; +import org.jetbrains.annotations.Nullable; + +/** + * Hadoop API v2 splitter. + */ +public class HadoopV2Splitter { + /** */ + private static final String[] EMPTY_HOSTS = {}; + + /** + * @param ctx Job context. + * @return Collection of mapped splits. + * @throws IgniteCheckedException If mapping failed. 
+ */ + public static Collection<HadoopInputSplit> splitJob(JobContext ctx) throws IgniteCheckedException { + try { + InputFormat<?, ?> format = ReflectionUtils.newInstance(ctx.getInputFormatClass(), ctx.getConfiguration()); + + assert format != null; + + List<InputSplit> splits = format.getSplits(ctx); + + Collection<HadoopInputSplit> res = new ArrayList<>(splits.size()); + + int id = 0; + + for (InputSplit nativeSplit : splits) { + if (nativeSplit instanceof FileSplit) { + FileSplit s = (FileSplit)nativeSplit; + + res.add(new HadoopFileBlock(s.getLocations(), s.getPath().toUri(), s.getStart(), s.getLength())); + } + else + res.add(HadoopUtils.wrapSplit(id, nativeSplit, nativeSplit.getLocations())); + + id++; + } + + return res; + } + catch (IOException | ClassNotFoundException e) { + throw new IgniteCheckedException(e); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IgniteInterruptedCheckedException(e); + } + } + + /** + * @param clsName Input split class name. + * @param in Input stream. + * @param hosts Optional hosts. + * @return File block or {@code null} if it is not a {@link FileSplit} instance. + * @throws IgniteCheckedException If failed. + */ + public static HadoopFileBlock readFileBlock(String clsName, DataInput in, @Nullable String[] hosts) + throws IgniteCheckedException { + if (!FileSplit.class.getName().equals(clsName)) + return null; + + FileSplit split = new FileSplit(); + + try { + split.readFields(in); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + + if (hosts == null) + hosts = EMPTY_HOSTS; + + return new HadoopFileBlock(hosts, split.getPath().toUri(), split.getStart(), split.getLength()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Task.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Task.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Task.java new file mode 100644 index 0000000..1383a61 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2Task.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import java.io.IOException; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.hadoop.HadoopTask; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; +import org.jetbrains.annotations.Nullable; + +/** + * Extended Hadoop v2 task. + */ +public abstract class HadoopV2Task extends HadoopTask { + /** Hadoop context. */ + private HadoopV2Context hadoopCtx; + + /** + * Constructor. + * + * @param taskInfo Task info. + */ + protected HadoopV2Task(HadoopTaskInfo taskInfo) { + super(taskInfo); + } + + /** {@inheritDoc} */ + @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException { + HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx; + + hadoopCtx = new HadoopV2Context(ctx); + + run0(ctx); + } + + /** + * Internal task routine. + * + * @param taskCtx Task context. + * @throws IgniteCheckedException If failed. + */ + protected abstract void run0(HadoopV2TaskContext taskCtx) throws IgniteCheckedException; + + /** + * @return Hadoop context. + */ + protected HadoopV2Context hadoopContext() { + return hadoopCtx; + } + + /** + * Create and configure an OutputFormat instance. + * + * @param jobCtx Job context. + * @return Instance of the OutputFormat specified in the job configuration. + * @throws ClassNotFoundException If specified class not found. + */ + protected OutputFormat getOutputFormat(JobContext jobCtx) throws ClassNotFoundException { + return ReflectionUtils.newInstance(jobCtx.getOutputFormatClass(), hadoopContext().getConfiguration()); + } + + /** + * Puts the writer into the Hadoop context and returns the associated output format instance. + * + * @param jobCtx Job context. + * @return Output format. + * @throws IgniteCheckedException In case of Grid exception. + * @throws InterruptedException In case of interrupt. + */ + protected OutputFormat prepareWriter(JobContext jobCtx) + throws IgniteCheckedException, InterruptedException { + try { + OutputFormat outputFormat = getOutputFormat(jobCtx); + + assert outputFormat != null; + + OutputCommitter outCommitter = outputFormat.getOutputCommitter(hadoopCtx); + + if (outCommitter != null) + outCommitter.setupTask(hadoopCtx); + + RecordWriter writer = outputFormat.getRecordWriter(hadoopCtx); + + hadoopCtx.writer(writer); + + return outputFormat; + } + catch (IOException | ClassNotFoundException e) { + throw new IgniteCheckedException(e); + } + } + + /** + * Closes the writer. + * + * @throws Exception If closing fails and a logger hasn't been specified. + */ + protected void closeWriter() throws Exception { + RecordWriter writer = hadoopCtx.writer(); + + if (writer != null) + writer.close(hadoopCtx); + } + + /** + * Setup task. + * + * @param outputFormat Output format. + * @throws IOException In case of IO exception. + * @throws InterruptedException In case of interrupt. + */ + protected void setup(@Nullable OutputFormat outputFormat) throws IOException, InterruptedException { + if (hadoopCtx.writer() != null) { + assert outputFormat != null; + + outputFormat.getOutputCommitter(hadoopCtx).setupTask(hadoopCtx); + } + } + + /** + * Commit task. + * + * @param outputFormat Output format.
+ * @throws IgniteCheckedException In case of Grid exception. + * @throws IOException In case of IO exception. + * @throws InterruptedException In case of interrupt. + */ + protected void commit(@Nullable OutputFormat outputFormat) throws IgniteCheckedException, IOException, InterruptedException { + if (hadoopCtx.writer() != null) { + assert outputFormat != null; + + OutputCommitter outputCommitter = outputFormat.getOutputCommitter(hadoopCtx); + + if (outputCommitter.needsTaskCommit(hadoopCtx)) + outputCommitter.commitTask(hadoopCtx); + } + } + + /** + * Abort task. + * + * @param outputFormat Output format. + */ + protected void abort(@Nullable OutputFormat outputFormat) { + if (hadoopCtx.writer() != null) { + assert outputFormat != null; + + try { + outputFormat.getOutputCommitter(hadoopCtx).abortTask(hadoopCtx); + } + catch (IOException ignore) { + // Ignore. + } + catch (InterruptedException ignore) { + Thread.currentThread().interrupt(); + } + } + } + + /** {@inheritDoc} */ + @Override public void cancel() { + hadoopCtx.cancel(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java new file mode 100644 index 0000000..8eb0006 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopV2TaskContext.java @@ -0,0 +1,560 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import java.io.DataInput; +import java.io.File; +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.Comparator; +import java.util.UUID; +import java.util.concurrent.Callable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.serializer.Deserializer; +import org.apache.hadoop.io.serializer.Serialization; +import org.apache.hadoop.io.serializer.SerializationFactory; +import org.apache.hadoop.io.serializer.WritableSerialization; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContextImpl; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.hadoop.mapred.TaskID; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobSubmissionFiles; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader; +import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; +import org.apache.ignite.internal.processors.hadoop.HadoopJob; +import org.apache.ignite.internal.processors.hadoop.HadoopJobId; +import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner; +import org.apache.ignite.internal.processors.hadoop.HadoopSerialization; +import org.apache.ignite.internal.processors.hadoop.HadoopTask; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopTaskCancelledException; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo; +import org.apache.ignite.internal.processors.hadoop.HadoopTaskType; +import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounter; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; +import org.apache.ignite.internal.processors.hadoop.counter.HadoopCountersImpl; +import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap; +import org.apache.ignite.internal.processors.hadoop.v1.HadoopV1CleanupTask; +import org.apache.ignite.internal.processors.hadoop.v1.HadoopV1MapTask; +import org.apache.ignite.internal.processors.hadoop.v1.HadoopV1Partitioner; +import org.apache.ignite.internal.processors.hadoop.v1.HadoopV1ReduceTask; +import org.apache.ignite.internal.processors.hadoop.v1.HadoopV1SetupTask; +import org.apache.ignite.internal.processors.igfs.IgfsUtils; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.jobLocalDir; +import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.taskLocalDir; +import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.transformException; +import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.unwrapSplit; +import static 
org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtils.FsCacheKey; +import static org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtils.createHadoopLazyConcurrentMap; +import static org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtils.fileSystemForMrUserWithCaching; +import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_PREFER_LOCAL_WRITES; + +/** + * Context for task execution. + */ +public class HadoopV2TaskContext extends HadoopTaskContext { + /** */ + private static final boolean COMBINE_KEY_GROUPING_SUPPORTED; + + /** Lazy per-user file system cache used by the Hadoop task. */ + private static final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fsMap + = createHadoopLazyConcurrentMap(); + + /** + * This method is invoked via reflection upon job finish with the class loader of each task. + * It cleans up all the file systems created for that specific task. + * Each class loader uses its own instance of <code>fsMap</code> since the class loaders + * are different. + * + * @throws IgniteCheckedException On error. + */ + public static void close() throws IgniteCheckedException { + fsMap.close(); + } + + /** + * Check for combiner grouping support (available since Hadoop 2.3). + */ + static { + boolean ok; + + try { + JobContext.class.getDeclaredMethod("getCombinerKeyGroupingComparator"); + + ok = true; + } + catch (NoSuchMethodException ignore) { + ok = false; + } + + COMBINE_KEY_GROUPING_SUPPORTED = ok; + } + + /** Flag is set if new context-object code is used for running the mapper. */ + private final boolean useNewMapper; + + /** Flag is set if new context-object code is used for running the reducer. */ + private final boolean useNewReducer; + + /** Flag is set if new context-object code is used for running the combiner. */ + private final boolean useNewCombiner; + + /** */ + private final JobContextImpl jobCtx; + + /** Set if the task is being cancelled. */ + private volatile boolean cancelled; + + /** Current task. */ + private volatile HadoopTask task; + + /** Local node ID. */ + private final UUID locNodeId; + + /** Counters for task. */ + private final HadoopCounters cntrs = new HadoopCountersImpl(); + + /** + * @param taskInfo Task info. + * @param job Job. + * @param jobId Job ID. + * @param locNodeId Local node ID. + * @param jobConfDataInput DataInput to read the JobConf from. + */ + public HadoopV2TaskContext(HadoopTaskInfo taskInfo, HadoopJob job, HadoopJobId jobId, + @Nullable UUID locNodeId, DataInput jobConfDataInput) throws IgniteCheckedException { + super(taskInfo, job); + this.locNodeId = locNodeId; + + // Before creating the JobConf instance we should set the new context class loader. + ClassLoader oldLdr = HadoopUtils.setContextClassLoader(getClass().getClassLoader()); + + try { + JobConf jobConf = new JobConf(); + + try { + jobConf.readFields(jobConfDataInput); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + + // For map-reduce jobs prefer local writes.
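+ // (setBooleanIfUnset() applies the default only when the property was not already set by the job submitter.)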
+ jobConf.setBooleanIfUnset(PARAM_IGFS_PREFER_LOCAL_WRITES, true); + + jobCtx = new JobContextImpl(jobConf, new JobID(jobId.globalId().toString(), jobId.localId())); + + useNewMapper = jobConf.getUseNewMapper(); + useNewReducer = jobConf.getUseNewReducer(); + useNewCombiner = jobConf.getCombinerClass() == null; + } + finally { + HadoopUtils.restoreContextClassLoader(oldLdr); + } + } + + /** {@inheritDoc} */ + @Override public <T extends HadoopCounter> T counter(String grp, String name, Class<T> cls) { + return cntrs.counter(grp, name, cls); + } + + /** {@inheritDoc} */ + @Override public HadoopCounters counters() { + return cntrs; + } + + /** + * Creates appropriate task from current task info. + * + * @return Task. + */ + private HadoopTask createTask() { + boolean isAbort = taskInfo().type() == HadoopTaskType.ABORT; + + switch (taskInfo().type()) { + case SETUP: + return useNewMapper ? new HadoopV2SetupTask(taskInfo()) : new HadoopV1SetupTask(taskInfo()); + + case MAP: + return useNewMapper ? new HadoopV2MapTask(taskInfo()) : new HadoopV1MapTask(taskInfo()); + + case REDUCE: + return useNewReducer ? new HadoopV2ReduceTask(taskInfo(), true) : + new HadoopV1ReduceTask(taskInfo(), true); + + case COMBINE: + return useNewCombiner ? new HadoopV2ReduceTask(taskInfo(), false) : + new HadoopV1ReduceTask(taskInfo(), false); + + case COMMIT: + case ABORT: + return useNewReducer ? new HadoopV2CleanupTask(taskInfo(), isAbort) : + new HadoopV1CleanupTask(taskInfo(), isAbort); + + default: + return null; + } + } + + /** {@inheritDoc} */ + @Override public void run() throws IgniteCheckedException { + ClassLoader oldLdr = HadoopUtils.setContextClassLoader(jobConf().getClassLoader()); + + try { + try { + task = createTask(); + } + catch (Throwable e) { + if (e instanceof Error) + throw e; + + throw transformException(e); + } + + if (cancelled) + throw new HadoopTaskCancelledException("Task cancelled."); + + try { + task.run(this); + } + catch (Throwable e) { + if (e instanceof Error) + throw e; + + throw transformException(e); + } + } + finally { + task = null; + + HadoopUtils.restoreContextClassLoader(oldLdr); + } + } + + /** {@inheritDoc} */ + @Override public void cancel() { + cancelled = true; + + HadoopTask t = task; + + if (t != null) + t.cancel(); + } + + /** {@inheritDoc} */ + @Override public void prepareTaskEnvironment() throws IgniteCheckedException { + File locDir; + + switch(taskInfo().type()) { + case MAP: + case REDUCE: + job().prepareTaskEnvironment(taskInfo()); + + locDir = taskLocalDir(locNodeId, taskInfo()); + + break; + + default: + locDir = jobLocalDir(locNodeId, taskInfo().jobId()); + } + + ClassLoader oldLdr = HadoopUtils.setContextClassLoader(jobConf().getClassLoader()); + + try { + FileSystem.get(jobConf()); + + LocalFileSystem locFs = FileSystem.getLocal(jobConf()); + + locFs.setWorkingDirectory(new Path(locDir.getAbsolutePath())); + } + catch (Throwable e) { + if (e instanceof Error) + throw (Error)e; + + throw transformException(e); + } + finally { + HadoopUtils.restoreContextClassLoader(oldLdr); + } + } + + /** {@inheritDoc} */ + @Override public void cleanupTaskEnvironment() throws IgniteCheckedException { + job().cleanupTaskEnvironment(taskInfo()); + } + + /** + * Creates Hadoop attempt ID. + * + * @return Attempt ID. + */ + public TaskAttemptID attemptId() { + TaskID tid = new TaskID(jobCtx.getJobID(), taskType(taskInfo().type()), taskInfo().taskNumber()); + + return new TaskAttemptID(tid, taskInfo().attempt()); + } + + /** + * @param type Task type. 
+ * @return Hadoop task type. + */ + private TaskType taskType(HadoopTaskType type) { + switch (type) { + case SETUP: + return TaskType.JOB_SETUP; + case MAP: + case COMBINE: + return TaskType.MAP; + + case REDUCE: + return TaskType.REDUCE; + + case COMMIT: + case ABORT: + return TaskType.JOB_CLEANUP; + + default: + return null; + } + } + + /** + * Gets job configuration of the task. + * + * @return Job configuration. + */ + public JobConf jobConf() { + return jobCtx.getJobConf(); + } + + /** + * Gets job context of the task. + * + * @return Job context. + */ + public JobContextImpl jobContext() { + return jobCtx; + } + + /** {@inheritDoc} */ + @Override public HadoopPartitioner partitioner() throws IgniteCheckedException { + Class<?> partClsOld = jobConf().getClass("mapred.partitioner.class", null); + + if (partClsOld != null) + return new HadoopV1Partitioner(jobConf().getPartitionerClass(), jobConf()); + + try { + return new HadoopV2Partitioner(jobCtx.getPartitionerClass(), jobConf()); + } + catch (ClassNotFoundException e) { + throw new IgniteCheckedException(e); + } + } + + /** + * Gets serializer for specified class. + * + * @param cls Class. + * @param jobConf Job configuration. + * @return Appropriate serializer. + */ + @SuppressWarnings("unchecked") + private HadoopSerialization getSerialization(Class<?> cls, Configuration jobConf) throws IgniteCheckedException { + A.notNull(cls, "cls"); + + SerializationFactory factory = new SerializationFactory(jobConf); + + Serialization<?> serialization = factory.getSerialization(cls); + + if (serialization == null) + throw new IgniteCheckedException("Failed to find serialization for: " + cls.getName()); + + if (serialization.getClass() == WritableSerialization.class) + return new HadoopWritableSerialization((Class<? extends Writable>)cls); + + return new HadoopSerializationWrapper(serialization, cls); + } + + /** {@inheritDoc} */ + @Override public HadoopSerialization keySerialization() throws IgniteCheckedException { + return getSerialization(jobCtx.getMapOutputKeyClass(), jobConf()); + } + + /** {@inheritDoc} */ + @Override public HadoopSerialization valueSerialization() throws IgniteCheckedException { + return getSerialization(jobCtx.getMapOutputValueClass(), jobConf()); + } + + /** {@inheritDoc} */ + @Override public Comparator<Object> sortComparator() { + return (Comparator<Object>)jobCtx.getSortComparator(); + } + + /** {@inheritDoc} */ + @Override public Comparator<Object> groupComparator() { + Comparator<?> res; + + switch (taskInfo().type()) { + case COMBINE: + res = COMBINE_KEY_GROUPING_SUPPORTED ? + jobContext().getCombinerKeyGroupingComparator() : jobContext().getGroupingComparator(); + + break; + + case REDUCE: + res = jobContext().getGroupingComparator(); + + break; + + default: + return null; + } + + if (res != null && res.getClass() != sortComparator().getClass()) + return (Comparator<Object>)res; + + return null; + } + + /** + * @param split Split. + * @return Native Hadoop split. + * @throws IgniteCheckedException if failed. + */ + @SuppressWarnings("unchecked") + public Object getNativeSplit(HadoopInputSplit split) throws IgniteCheckedException { + if (split instanceof HadoopExternalSplit) + return readExternalSplit((HadoopExternalSplit)split); + + if (split instanceof HadoopSplitWrapper) + return unwrapSplit((HadoopSplitWrapper)split); + + throw new IllegalStateException("Unknown split: " + split); + } + + /** + * @param split External split. + * @return Native input split. + * @throws IgniteCheckedException If failed. 
+ */ + @SuppressWarnings("unchecked") + private Object readExternalSplit(HadoopExternalSplit split) throws IgniteCheckedException { + Path jobDir = new Path(jobConf().get(MRJobConfig.MAPREDUCE_JOB_DIR)); + + FileSystem fs; + + try { + // This assertion uses .startsWith() instead of .equals() because task class loaders may + // be reused between tasks of the same job. + assert ((HadoopClassLoader)getClass().getClassLoader()).name() + .startsWith(HadoopClassLoader.nameForTask(taskInfo(), true)); + + // We also cache file systems here; all of them will be cleared explicitly upon job end. + fs = fileSystemForMrUserWithCaching(jobDir.toUri(), jobConf(), fsMap); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + + try ( + FSDataInputStream in = fs.open(JobSubmissionFiles.getJobSplitFile(jobDir))) { + + in.seek(split.offset()); + + String clsName = Text.readString(in); + + Class<?> cls = jobConf().getClassByName(clsName); + + assert cls != null; + + Serialization serialization = new SerializationFactory(jobConf()).getSerialization(cls); + + Deserializer deserializer = serialization.getDeserializer(cls); + + deserializer.open(in); + + Object res = deserializer.deserialize(null); + + deserializer.close(); + + assert res != null; + + return res; + } + catch (IOException | ClassNotFoundException e) { + throw new IgniteCheckedException(e); + } + } + + /** {@inheritDoc} */ + @Override public <T> T runAsJobOwner(final Callable<T> c) throws IgniteCheckedException { + String user = job.info().user(); + + user = IgfsUtils.fixUserName(user); + + assert user != null; + + String ugiUser; + + try { + UserGroupInformation currUser = UserGroupInformation.getCurrentUser(); + + assert currUser != null; + + ugiUser = currUser.getShortUserName(); + } + catch (IOException ioe) { + throw new IgniteCheckedException(ioe); + } + + try { + if (F.eq(user, ugiUser)) + // If the current UGI context user is the same, do a direct call: + return c.call(); + else { + UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, user); + + return ugi.doAs(new PrivilegedExceptionAction<T>() { + @Override public T run() throws Exception { + return c.call(); + } + }); + } + } + catch (Exception e) { + throw new IgniteCheckedException(e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopWritableSerialization.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopWritableSerialization.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopWritableSerialization.java new file mode 100644 index 0000000..f46f068 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/v2/HadoopWritableSerialization.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.v2; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import org.apache.hadoop.io.Writable; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.hadoop.HadoopSerialization; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +/** + * Optimized serialization for Hadoop {@link Writable} types. + */ +public class HadoopWritableSerialization implements HadoopSerialization { + /** */ + private final Class<? extends Writable> cls; + + /** + * @param cls Class. + */ + public HadoopWritableSerialization(Class<? extends Writable> cls) { + assert cls != null; + + this.cls = cls; + } + + /** {@inheritDoc} */ + @Override public void write(DataOutput out, Object obj) throws IgniteCheckedException { + assert cls.isAssignableFrom(obj.getClass()) : cls + " " + obj.getClass(); + + try { + ((Writable)obj).write(out); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + } + + /** {@inheritDoc} */ + @Override public Object read(DataInput in, @Nullable Object obj) throws IgniteCheckedException { + Writable w = obj == null ? U.newInstance(cls) : cls.cast(obj); + + try { + w.readFields(in); + } + catch (IOException e) { + throw new IgniteCheckedException(e); + } + + return w; + } + + /** {@inheritDoc} */ + @Override public void close() { + // No-op. + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java deleted file mode 100644 index 090b336..0000000 --- a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/jobtracker/HadoopJobMetadata.java +++ /dev/null @@ -1,316 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop.jobtracker; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.Collection; -import java.util.Map; -import java.util.UUID; -import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit; -import org.apache.ignite.internal.processors.hadoop.HadoopJobId; -import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo; -import org.apache.ignite.internal.processors.hadoop.HadoopJobPhase; -import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; -import org.apache.ignite.internal.processors.hadoop.counter.HadoopCountersImpl; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessDescriptor; -import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; - -import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.PHASE_SETUP; - -/** - * Hadoop job metadata. Internal object used for distributed job state tracking. - */ -public class HadoopJobMetadata implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Job ID. */ - private HadoopJobId jobId; - - /** Job info. */ - private HadoopJobInfo jobInfo; - - /** Node submitted job. */ - private UUID submitNodeId; - - /** Map-reduce plan. */ - private HadoopMapReducePlan mrPlan; - - /** Pending splits for which mapper should be executed. */ - private Map<HadoopInputSplit, Integer> pendingSplits; - - /** Pending reducers. */ - private Collection<Integer> pendingReducers; - - /** Reducers addresses. */ - @GridToStringInclude - private Map<Integer, HadoopProcessDescriptor> reducersAddrs; - - /** Job phase. */ - private HadoopJobPhase phase = PHASE_SETUP; - - /** Fail cause. */ - @GridToStringExclude - private Throwable failCause; - - /** Version. */ - private long ver; - - /** Job counters */ - private HadoopCounters counters = new HadoopCountersImpl(); - - /** - * Empty constructor required by {@link Externalizable}. - */ - public HadoopJobMetadata() { - // No-op. - } - - /** - * Constructor. - * - * @param submitNodeId Submit node ID. - * @param jobId Job ID. - * @param jobInfo Job info. - */ - public HadoopJobMetadata(UUID submitNodeId, HadoopJobId jobId, HadoopJobInfo jobInfo) { - this.jobId = jobId; - this.jobInfo = jobInfo; - this.submitNodeId = submitNodeId; - } - - /** - * Copy constructor. - * - * @param src Metadata to copy. - */ - public HadoopJobMetadata(HadoopJobMetadata src) { - // Make sure to preserve alphabetic order. - counters = src.counters; - failCause = src.failCause; - jobId = src.jobId; - jobInfo = src.jobInfo; - mrPlan = src.mrPlan; - pendingSplits = src.pendingSplits; - pendingReducers = src.pendingReducers; - phase = src.phase; - reducersAddrs = src.reducersAddrs; - submitNodeId = src.submitNodeId; - ver = src.ver + 1; - } - - /** - * @return Submit node ID. - */ - public UUID submitNodeId() { - return submitNodeId; - } - - /** - * @param phase Job phase. - */ - public void phase(HadoopJobPhase phase) { - this.phase = phase; - } - - /** - * @return Job phase. - */ - public HadoopJobPhase phase() { - return phase; - } - - /** - * Gets reducers addresses for external execution. 
- * - * @return Reducers addresses. - */ - public Map<Integer, HadoopProcessDescriptor> reducersAddresses() { - return reducersAddrs; - } - - /** - * Sets reducers addresses for external execution. - * - * @param reducersAddrs Map of addresses. - */ - public void reducersAddresses(Map<Integer, HadoopProcessDescriptor> reducersAddrs) { - this.reducersAddrs = reducersAddrs; - } - - /** - * Sets collection of pending splits. - * - * @param pendingSplits Collection of pending splits. - */ - public void pendingSplits(Map<HadoopInputSplit, Integer> pendingSplits) { - this.pendingSplits = pendingSplits; - } - - /** - * Gets collection of pending splits. - * - * @return Collection of pending splits. - */ - public Map<HadoopInputSplit, Integer> pendingSplits() { - return pendingSplits; - } - - /** - * Sets collection of pending reducers. - * - * @param pendingReducers Collection of pending reducers. - */ - public void pendingReducers(Collection<Integer> pendingReducers) { - this.pendingReducers = pendingReducers; - } - - /** - * Gets collection of pending reducers. - * - * @return Collection of pending reducers. - */ - public Collection<Integer> pendingReducers() { - return pendingReducers; - } - - /** - * @return Job ID. - */ - public HadoopJobId jobId() { - return jobId; - } - - /** - * @param mrPlan Map-reduce plan. - */ - public void mapReducePlan(HadoopMapReducePlan mrPlan) { - assert this.mrPlan == null : "Map-reduce plan can only be initialized once."; - - this.mrPlan = mrPlan; - } - - /** - * @return Map-reduce plan. - */ - public HadoopMapReducePlan mapReducePlan() { - return mrPlan; - } - - /** - * @return Job info. - */ - public HadoopJobInfo jobInfo() { - return jobInfo; - } - - /** - * Returns job counters. - * - * @return Collection of counters. - */ - public HadoopCounters counters() { - return counters; - } - - /** - * Sets counters. - * - * @param counters Collection of counters. - */ - public void counters(HadoopCounters counters) { - this.counters = counters; - } - - /** - * @param failCause Fail cause. - */ - public void failCause(Throwable failCause) { - assert failCause != null; - - if (this.failCause == null) // Keep the first error. - this.failCause = failCause; - } - - /** - * @return Fail cause. - */ - public Throwable failCause() { - return failCause; - } - - /** - * @return Version. - */ - public long version() { - return ver; - } - - /** - * @param split Split. - * @return Task number. 
- */ - public int taskNumber(HadoopInputSplit split) { - return pendingSplits.get(split); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeUuid(out, submitNodeId); - out.writeObject(jobId); - out.writeObject(jobInfo); - out.writeObject(mrPlan); - out.writeObject(pendingSplits); - out.writeObject(pendingReducers); - out.writeObject(phase); - out.writeObject(failCause); - out.writeLong(ver); - out.writeObject(reducersAddrs); - out.writeObject(counters); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - submitNodeId = U.readUuid(in); - jobId = (HadoopJobId)in.readObject(); - jobInfo = (HadoopJobInfo)in.readObject(); - mrPlan = (HadoopMapReducePlan)in.readObject(); - pendingSplits = (Map<HadoopInputSplit,Integer>)in.readObject(); - pendingReducers = (Collection<Integer>)in.readObject(); - phase = (HadoopJobPhase)in.readObject(); - failCause = (Throwable)in.readObject(); - ver = in.readLong(); - reducersAddrs = (Map<Integer, HadoopProcessDescriptor>)in.readObject(); - counters = (HadoopCounters)in.readObject(); - } - - /** {@inheritDoc} */ - public String toString() { - return S.toString(HadoopJobMetadata.class, this, "pendingMaps", pendingSplits.size(), - "pendingReduces", pendingReducers.size(), "failCause", failCause == null ? null : - failCause.getClass().getName()); - } -} \ No newline at end of file
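Editor's sketches (not part of the commit) follow, illustrating three patterns used in the diffs above.

First, the Javadoc of the static HadoopV2TaskContext.close() says it is invoked via reflection with the class loader of each task: because every task class loader defines its own copy of the class, each copy holds its own static fsMap, so close() must be called once per loader. A minimal sketch of such an invocation, assuming a caller that holds a task class loader; the class and method names PerLoaderCloseSketch/closeTaskFileSystems are illustrative, while the loaded class name and the static close() signature come from the diff:

import java.lang.reflect.Method;

public class PerLoaderCloseSketch {
    public static void closeTaskFileSystems(ClassLoader taskLdr) throws Exception {
        // Load HadoopV2TaskContext through the task's own loader, not the caller's:
        // each loader defines its own copy of the class and hence its own static fsMap.
        Class<?> cls = taskLdr.loadClass(
            "org.apache.ignite.internal.processors.hadoop.v2.HadoopV2TaskContext");

        Method close = cls.getMethod("close");

        // Static method, so the receiver is null; this clears that loader's cached file systems.
        close.invoke(null);
    }
}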
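Second, runAsJobOwner() above follows the standard Hadoop proxy-user pattern: call directly when the current UGI user already matches the job owner, otherwise resolve a UserGroupInformation for the owner and delegate through doAs(). A self-contained sketch under that assumption; the class and method names are hypothetical, and the Ignite-specific user fix-up and exception wrapping are omitted:

import java.security.PrivilegedExceptionAction;
import java.util.concurrent.Callable;
import org.apache.hadoop.security.UserGroupInformation;

public class RunAsOwnerSketch {
    public static <T> T runAs(String user, final Callable<T> c) throws Exception {
        // Direct call when the current login user already matches the job owner.
        if (user.equals(UserGroupInformation.getCurrentUser().getShortUserName()))
            return c.call();

        // Otherwise resolve a UGI for the owner and execute within its security context.
        UserGroupInformation ugi = UserGroupInformation.getBestUGI(null, user);

        return ugi.doAs(new PrivilegedExceptionAction<T>() {
            @Override public T run() throws Exception {
                return c.call();
            }
        });
    }
}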
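Finally, a hypothetical round-trip through the new HadoopWritableSerialization class: write() delegates to Writable.write(DataOutput), and read() fills a reused or freshly allocated instance via readFields(DataInput). Everything except HadoopWritableSerialization itself (the class name WritableRoundTripSketch, the IntWritable payload) is an illustrative assumption:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import org.apache.hadoop.io.IntWritable;
import org.apache.ignite.internal.processors.hadoop.v2.HadoopWritableSerialization;

public class WritableRoundTripSketch {
    public static void main(String[] args) throws Exception {
        HadoopWritableSerialization ser = new HadoopWritableSerialization(IntWritable.class);

        ByteArrayOutputStream bos = new ByteArrayOutputStream();

        // write() delegates to IntWritable.write(DataOutput).
        ser.write(new DataOutputStream(bos), new IntWritable(42));

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));

        // Passing null makes read() allocate a fresh IntWritable before readFields().
        IntWritable res = (IntWritable)ser.read(in, null);

        System.out.println(res.get()); // Prints 42.
    }
}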