http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
deleted file mode 100644
index 4515131..0000000
--- 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java
+++ /dev/null
@@ -1,450 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Method;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Queue;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobContextImpl;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapreduce.JobSubmissionFiles;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.split.JobSplit;
-import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader;
-import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
-import org.apache.ignite.internal.processors.hadoop.HadoopHelper;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
-import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
-import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils;
-import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap;
-import org.apache.ignite.internal.processors.hadoop.v1.HadoopV1Splitter;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentHashMap8;
-
-import static 
org.apache.ignite.internal.processors.hadoop.HadoopUtils.jobLocalDir;
-import static 
org.apache.ignite.internal.processors.hadoop.HadoopUtils.taskLocalDir;
-import static 
org.apache.ignite.internal.processors.hadoop.HadoopUtils.transformException;
-import static 
org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtils.FsCacheKey;
-import static 
org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtils.createHadoopLazyConcurrentMap;
-import static 
org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtils.fileSystemForMrUserWithCaching;
-
-/**
- * Hadoop job implementation for v2 API.
- */
-public class HadoopV2Job implements HadoopJob {
-    /** */
-    private final JobConf jobConf;
-
-    /** */
-    private final JobContextImpl jobCtx;
-
-    /** */
-    private final HadoopHelper helper;
-
-    /** Hadoop job ID. */
-    private final HadoopJobId jobId;
-
-    /** Job info. */
-    protected final HadoopJobInfo jobInfo;
-
-    /** Native library names. */
-    private final String[] libNames;
-
-    /** */
-    private final JobID hadoopJobID;
-
-    /** */
-    private final HadoopV2JobResourceManager rsrcMgr;
-
-    /** */
-    private final ConcurrentMap<T2<HadoopTaskType, Integer>, 
GridFutureAdapter<HadoopTaskContext>> ctxs =
-        new ConcurrentHashMap8<>();
-
-    /** Pooling task context class and thus class loading environment. */
-    private final Queue<Class<? extends HadoopTaskContext>> taskCtxClsPool = 
new ConcurrentLinkedQueue<>();
-
-    /** All created contexts. */
-    private final Queue<Class<? extends HadoopTaskContext>> fullCtxClsQueue = 
new ConcurrentLinkedDeque<>();
-
-    /** File system cache map. */
-    private final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fsMap = 
createHadoopLazyConcurrentMap();
-
-    /** Local node ID */
-    private volatile UUID locNodeId;
-
-    /** Serialized JobConf. */
-    private volatile byte[] jobConfData;
-
-    /**
-     * Constructor.
-     *
-     * @param jobId Job ID.
-     * @param jobInfo Job info.
-     * @param log Logger.
-     * @param libNames Optional additional native library names.
-     */
-    public HadoopV2Job(HadoopJobId jobId, final HadoopDefaultJobInfo jobInfo, 
IgniteLogger log,
-        @Nullable String[] libNames, HadoopHelper helper) {
-        assert jobId != null;
-        assert jobInfo != null;
-
-        this.jobId = jobId;
-        this.jobInfo = jobInfo;
-        this.libNames = libNames;
-        this.helper = helper;
-
-        ClassLoader oldLdr = 
HadoopUtils.setContextClassLoader(getClass().getClassLoader());
-
-        try {
-            hadoopJobID = new JobID(jobId.globalId().toString(), 
jobId.localId());
-
-            jobConf = new JobConf();
-
-            HadoopFileSystemsUtils.setupFileSystems(jobConf);
-
-            for (Map.Entry<String,String> e : jobInfo.properties().entrySet())
-                jobConf.set(e.getKey(), e.getValue());
-
-            jobCtx = new JobContextImpl(jobConf, hadoopJobID);
-
-            rsrcMgr = new HadoopV2JobResourceManager(jobId, jobCtx, log, this);
-        }
-        finally {
-            HadoopUtils.setContextClassLoader(oldLdr);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public HadoopJobId id() {
-        return jobId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public HadoopJobInfo info() {
-        return jobInfo;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<HadoopInputSplit> input() throws 
IgniteCheckedException {
-        ClassLoader oldLdr = 
HadoopUtils.setContextClassLoader(jobConf.getClassLoader());
-
-        try {
-            String jobDirPath = jobConf.get(MRJobConfig.MAPREDUCE_JOB_DIR);
-
-            if (jobDirPath == null) { // Probably job was submitted not by 
hadoop client.
-                // Assume that we have needed classes and try to generate 
input splits ourself.
-                if (jobConf.getUseNewMapper())
-                    return HadoopV2Splitter.splitJob(jobCtx);
-                else
-                    return HadoopV1Splitter.splitJob(jobConf);
-            }
-
-            Path jobDir = new Path(jobDirPath);
-
-            try {
-                FileSystem fs = fileSystem(jobDir.toUri(), jobConf);
-
-                JobSplit.TaskSplitMetaInfo[] metaInfos = 
SplitMetaInfoReader.readSplitMetaInfo(hadoopJobID, fs, jobConf,
-                    jobDir);
-
-                if (F.isEmpty(metaInfos))
-                    throw new IgniteCheckedException("No input splits found.");
-
-                Path splitsFile = JobSubmissionFiles.getJobSplitFile(jobDir);
-
-                try (FSDataInputStream in = fs.open(splitsFile)) {
-                    Collection<HadoopInputSplit> res = new 
ArrayList<>(metaInfos.length);
-
-                    for (JobSplit.TaskSplitMetaInfo metaInfo : metaInfos) {
-                        long off = metaInfo.getStartOffset();
-
-                        String[] hosts = metaInfo.getLocations();
-
-                        in.seek(off);
-
-                        String clsName = Text.readString(in);
-
-                        HadoopFileBlock block = 
HadoopV1Splitter.readFileBlock(clsName, in, hosts);
-
-                        if (block == null)
-                            block = HadoopV2Splitter.readFileBlock(clsName, 
in, hosts);
-
-                        res.add(block != null ? block : new 
HadoopExternalSplit(hosts, off));
-                    }
-
-                    return res;
-                }
-            }
-            catch (Throwable e) {
-                if (e instanceof Error)
-                    throw (Error)e;
-                else
-                    throw transformException(e);
-            }
-        }
-        finally {
-            HadoopUtils.restoreContextClassLoader(oldLdr);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings({"unchecked", "MismatchedQueryAndUpdateOfCollection" })
-    @Override public HadoopTaskContext getTaskContext(HadoopTaskInfo info) 
throws IgniteCheckedException {
-        T2<HadoopTaskType, Integer> locTaskId = new T2<>(info.type(),  
info.taskNumber());
-
-        GridFutureAdapter<HadoopTaskContext> fut = ctxs.get(locTaskId);
-
-        if (fut != null)
-            return fut.get();
-
-        GridFutureAdapter<HadoopTaskContext> old = ctxs.putIfAbsent(locTaskId, 
fut = new GridFutureAdapter<>());
-
-        if (old != null)
-            return old.get();
-
-        Class<? extends HadoopTaskContext> cls = taskCtxClsPool.poll();
-
-        try {
-            if (cls == null) {
-                // If there is no pooled class, then load new one.
-                // Note that the classloader identified by the task it was 
initially created for,
-                // but later it may be reused for other tasks.
-                HadoopClassLoader ldr = new 
HadoopClassLoader(rsrcMgr.classPath(),
-                    HadoopClassLoader.nameForTask(info, false), libNames, 
helper);
-
-                cls = (Class<? extends 
HadoopTaskContext>)ldr.loadClass(HadoopV2TaskContext.class.getName());
-
-                fullCtxClsQueue.add(cls);
-            }
-
-            Constructor<?> ctr = cls.getConstructor(HadoopTaskInfo.class, 
HadoopJob.class,
-                HadoopJobId.class, UUID.class, DataInput.class);
-
-            if (jobConfData == null)
-                synchronized(jobConf) {
-                    if (jobConfData == null) {
-                        ByteArrayOutputStream buf = new 
ByteArrayOutputStream();
-
-                        jobConf.write(new DataOutputStream(buf));
-
-                        jobConfData = buf.toByteArray();
-                    }
-                }
-
-            HadoopTaskContext res = (HadoopTaskContext)ctr.newInstance(info, 
this, jobId, locNodeId,
-                new DataInputStream(new ByteArrayInputStream(jobConfData)));
-
-            fut.onDone(res);
-
-            return res;
-        }
-        catch (Throwable e) {
-            IgniteCheckedException te = transformException(e);
-
-            fut.onDone(te);
-
-            if (e instanceof Error)
-                throw (Error)e;
-
-            throw te;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void initialize(boolean external, UUID locNodeId) throws 
IgniteCheckedException {
-        assert locNodeId != null;
-
-        this.locNodeId = locNodeId;
-
-        ClassLoader oldLdr = 
HadoopUtils.setContextClassLoader(getClass().getClassLoader());
-
-        try {
-            rsrcMgr.prepareJobEnvironment(!external, jobLocalDir(locNodeId, 
jobId));
-        }
-        finally {
-            HadoopUtils.restoreContextClassLoader(oldLdr);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("ThrowFromFinallyBlock")
-    @Override public void dispose(boolean external) throws 
IgniteCheckedException {
-        try {
-            if (rsrcMgr != null && !external) {
-                File jobLocDir = jobLocalDir(locNodeId, jobId);
-
-                if (jobLocDir.exists())
-                    U.delete(jobLocDir);
-            }
-        }
-        finally {
-            taskCtxClsPool.clear();
-
-            Throwable err = null;
-
-            // Stop the daemon threads that have been created
-            // with the task class loaders:
-            while (true) {
-                Class<? extends HadoopTaskContext> cls = 
fullCtxClsQueue.poll();
-
-                if (cls == null)
-                    break;
-
-                try {
-                    final ClassLoader ldr = cls.getClassLoader();
-
-                    try {
-                        // Stop Hadoop daemons for this *task*:
-                        stopHadoopFsDaemons(ldr);
-                    }
-                    catch (Exception e) {
-                        if (err == null)
-                            err = e;
-                    }
-
-                    // Also close all the FileSystems cached in
-                    // HadoopLazyConcurrentMap for this *task* class loader:
-                    closeCachedTaskFileSystems(ldr);
-                }
-                catch (Throwable e) {
-                    if (err == null)
-                        err = e;
-
-                    if (e instanceof Error)
-                        throw (Error)e;
-                }
-            }
-
-            assert fullCtxClsQueue.isEmpty();
-
-            try {
-                // Close all cached file systems for this *Job*:
-                fsMap.close();
-            }
-            catch (Exception e) {
-                if (err == null)
-                    err = e;
-            }
-
-            if (err != null)
-                throw U.cast(err);
-        }
-    }
-
-    /**
-     * Stops Hadoop Fs daemon threads.
-     * @param ldr The task ClassLoader to stop the daemons for.
-     * @throws Exception On error.
-     */
-    private void stopHadoopFsDaemons(ClassLoader ldr) throws Exception {
-        Class<?> daemonCls = ldr.loadClass(HadoopClassLoader.CLS_DAEMON);
-
-        Method m = daemonCls.getMethod("dequeueAndStopAll");
-
-        m.invoke(null);
-    }
-
-    /**
-     * Closes all the file systems user by task
-     * @param ldr The task class loader.
-     * @throws Exception On error.
-     */
-    private void closeCachedTaskFileSystems(ClassLoader ldr) throws Exception {
-        Class<?> clazz = ldr.loadClass(HadoopV2TaskContext.class.getName());
-
-        Method m = clazz.getMethod("close");
-
-        m.invoke(null);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void prepareTaskEnvironment(HadoopTaskInfo info) throws 
IgniteCheckedException {
-        rsrcMgr.prepareTaskWorkDir(taskLocalDir(locNodeId, info));
-    }
-
-    /** {@inheritDoc} */
-    @Override public void cleanupTaskEnvironment(HadoopTaskInfo info) throws 
IgniteCheckedException {
-        HadoopTaskContext ctx = ctxs.remove(new T2<>(info.type(), 
info.taskNumber())).get();
-
-        taskCtxClsPool.add(ctx.getClass());
-
-        File locDir = taskLocalDir(locNodeId, info);
-
-        if (locDir.exists())
-            U.delete(locDir);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void cleanupStagingDirectory() {
-        rsrcMgr.cleanupStagingDirectory();
-    }
-
-    /**
-     * Getter for job configuration.
-     * @return The job configuration.
-     */
-    public JobConf jobConf() {
-        return jobConf;
-    }
-
-    /**
-     * Gets file system for this job.
-     * @param uri The uri.
-     * @param cfg The configuration.
-     * @return The file system.
-     * @throws IOException On error.
-     */
-    public FileSystem fileSystem(@Nullable URI uri, Configuration cfg) throws 
IOException {
-        return fileSystemForMrUserWithCaching(uri, cfg, fsMap);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
deleted file mode 100644
index 33aef60..0000000
--- 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.nio.file.FileSystemException;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobContextImpl;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.util.RunJar;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
-import org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemsUtils;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Provides all resources are needed to the job execution. Downloads the main 
jar, the configuration and additional
- * files are needed to be placed on local files system.
- */
-class HadoopV2JobResourceManager {
-    /** File type Fs disable caching property name. */
-    private static final String FILE_DISABLE_CACHING_PROPERTY_NAME =
-        HadoopFileSystemsUtils.disableFsCachePropertyName("file");
-
-    /** Hadoop job context. */
-    private final JobContextImpl ctx;
-
-    /** Logger. */
-    private final IgniteLogger log;
-
-    /** Job ID. */
-    private final HadoopJobId jobId;
-
-    /** Class path list. */
-    private URL[] clsPath;
-
-    /** Set of local resources. */
-    private final Collection<File> rsrcSet = new HashSet<>();
-
-    /** Staging directory to delivery job jar and config to the work nodes. */
-    private Path stagingDir;
-
-    /** The job. */
-    private final HadoopV2Job job;
-
-    /**
-     * Creates new instance.
-     * @param jobId Job ID.
-     * @param ctx Hadoop job context.
-     * @param log Logger.
-     */
-    public HadoopV2JobResourceManager(HadoopJobId jobId, JobContextImpl ctx, 
IgniteLogger log, HadoopV2Job job) {
-        this.jobId = jobId;
-        this.ctx = ctx;
-        this.log = log.getLogger(HadoopV2JobResourceManager.class);
-        this.job = job;
-    }
-
-    /**
-     * Set working directory in local file system.
-     *
-     * @param dir Working directory.
-     * @throws IOException If fails.
-     */
-    private void setLocalFSWorkingDirectory(File dir) throws IOException {
-        JobConf cfg = ctx.getJobConf();
-
-        ClassLoader oldLdr = 
HadoopUtils.setContextClassLoader(cfg.getClassLoader());
-
-        try {
-            cfg.set(HadoopFileSystemsUtils.LOC_FS_WORK_DIR_PROP, 
dir.getAbsolutePath());
-
-            if (!cfg.getBoolean(FILE_DISABLE_CACHING_PROPERTY_NAME, false))
-                FileSystem.getLocal(cfg).setWorkingDirectory(new 
Path(dir.getAbsolutePath()));
-        }
-        finally {
-            HadoopUtils.restoreContextClassLoader(oldLdr);
-        }
-    }
-
-    /**
-     * Prepare job resources. Resolve the classpath list and download it if 
needed.
-     *
-     * @param download {@code true} If need to download resources.
-     * @param jobLocDir Work directory for the job.
-     * @throws IgniteCheckedException If failed.
-     */
-    public void prepareJobEnvironment(boolean download, File jobLocDir) throws 
IgniteCheckedException {
-        try {
-            if (jobLocDir.exists())
-                throw new IgniteCheckedException("Local job directory already 
exists: " + jobLocDir.getAbsolutePath());
-
-            JobConf cfg = ctx.getJobConf();
-
-            String mrDir = cfg.get("mapreduce.job.dir");
-
-            if (mrDir != null) {
-                stagingDir = new Path(new URI(mrDir));
-
-                if (download) {
-                    FileSystem fs = job.fileSystem(stagingDir.toUri(), cfg);
-
-                    if (!fs.exists(stagingDir))
-                        throw new IgniteCheckedException("Failed to find 
map-reduce submission " +
-                            "directory (does not exist): " + stagingDir);
-
-                    if (!FileUtil.copy(fs, stagingDir, jobLocDir, false, cfg))
-                        throw new IgniteCheckedException("Failed to copy job 
submission directory "
-                            + "contents to local file system "
-                            + "[path=" + stagingDir + ", locDir=" + 
jobLocDir.getAbsolutePath()
-                            + ", jobId=" + jobId + ']');
-                }
-
-                File jarJobFile = new File(jobLocDir, "job.jar");
-
-                Collection<URL> clsPathUrls = new ArrayList<>();
-
-                clsPathUrls.add(jarJobFile.toURI().toURL());
-
-                rsrcSet.add(jarJobFile);
-                rsrcSet.add(new File(jobLocDir, "job.xml"));
-
-                processFiles(jobLocDir, ctx.getCacheFiles(), download, false, 
null, MRJobConfig.CACHE_LOCALFILES);
-                processFiles(jobLocDir, ctx.getCacheArchives(), download, 
true, null, MRJobConfig.CACHE_LOCALARCHIVES);
-                processFiles(jobLocDir, ctx.getFileClassPaths(), download, 
false, clsPathUrls, null);
-                processFiles(jobLocDir, ctx.getArchiveClassPaths(), download, 
true, clsPathUrls, null);
-
-                if (!clsPathUrls.isEmpty()) {
-                    clsPath = new URL[clsPathUrls.size()];
-
-                    clsPathUrls.toArray(clsPath);
-                }
-            }
-            else if (!jobLocDir.mkdirs())
-                throw new IgniteCheckedException("Failed to create local job 
directory: "
-                    + jobLocDir.getAbsolutePath());
-
-            setLocalFSWorkingDirectory(jobLocDir);
-        }
-        catch (URISyntaxException | IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-    }
-
-    /**
-     * Process list of resources.
-     *
-     * @param jobLocDir Job working directory.
-     * @param files Array of {@link java.net.URI} or {@link 
org.apache.hadoop.fs.Path} to process resources.
-     * @param download {@code true}, if need to download. Process class path 
only else.
-     * @param extract {@code true}, if need to extract archive.
-     * @param clsPathUrls Collection to add resource as classpath resource.
-     * @param rsrcNameProp Property for resource name array setting.
-     * @throws IOException If failed.
-     */
-    private void processFiles(File jobLocDir, @Nullable Object[] files, 
boolean download, boolean extract,
-        @Nullable Collection<URL> clsPathUrls, @Nullable String rsrcNameProp) 
throws IOException {
-        if (F.isEmptyOrNulls(files))
-            return;
-
-        Collection<String> res = new ArrayList<>();
-
-        for (Object pathObj : files) {
-            Path srcPath;
-
-            if (pathObj instanceof URI) {
-                URI uri = (URI)pathObj;
-
-                srcPath = new Path(uri);
-            }
-            else
-                srcPath = (Path)pathObj;
-
-            String locName = srcPath.getName();
-
-            File dstPath = new File(jobLocDir.getAbsolutePath(), locName);
-
-            res.add(locName);
-
-            rsrcSet.add(dstPath);
-
-            if (clsPathUrls != null)
-                clsPathUrls.add(dstPath.toURI().toURL());
-
-            if (!download)
-                continue;
-
-            JobConf cfg = ctx.getJobConf();
-
-            FileSystem dstFs = FileSystem.getLocal(cfg);
-
-            FileSystem srcFs = job.fileSystem(srcPath.toUri(), cfg);
-
-            if (extract) {
-                File archivesPath = new File(jobLocDir.getAbsolutePath(), 
".cached-archives");
-
-                if (!archivesPath.exists() && !archivesPath.mkdir())
-                    throw new IOException("Failed to create directory " +
-                        "[path=" + archivesPath + ", jobId=" + jobId + ']');
-
-                File archiveFile = new File(archivesPath, locName);
-
-                FileUtil.copy(srcFs, srcPath, dstFs, new 
Path(archiveFile.toString()), false, cfg);
-
-                String archiveNameLC = archiveFile.getName().toLowerCase();
-
-                if (archiveNameLC.endsWith(".jar"))
-                    RunJar.unJar(archiveFile, dstPath);
-                else if (archiveNameLC.endsWith(".zip"))
-                    FileUtil.unZip(archiveFile, dstPath);
-                else if (archiveNameLC.endsWith(".tar.gz") ||
-                    archiveNameLC.endsWith(".tgz") ||
-                    archiveNameLC.endsWith(".tar"))
-                    FileUtil.unTar(archiveFile, dstPath);
-                else
-                    throw new IOException("Cannot unpack archive [path=" + 
srcPath + ", jobId=" + jobId + ']');
-            }
-            else
-                FileUtil.copy(srcFs, srcPath, dstFs, new 
Path(dstPath.toString()), false, cfg);
-        }
-
-        if (!res.isEmpty() && rsrcNameProp != null)
-            ctx.getJobConf().setStrings(rsrcNameProp, res.toArray(new 
String[res.size()]));
-    }
-
-    /**
-     * Prepares working directory for the task.
-     *
-     * <ul>
-     *     <li>Creates working directory.</li>
-     *     <li>Creates symbolic links to all job resources in working 
directory.</li>
-     * </ul>
-     *
-     * @param path Path to working directory of the task.
-     * @throws IgniteCheckedException If fails.
-     */
-    public void prepareTaskWorkDir(File path) throws IgniteCheckedException {
-        try {
-            if (path.exists())
-                throw new IOException("Task local directory already exists: " 
+ path);
-
-            if (!path.mkdir())
-                throw new IOException("Failed to create directory: " + path);
-
-            for (File resource : rsrcSet) {
-                File symLink = new File(path, resource.getName());
-
-                try {
-                    Files.createSymbolicLink(symLink.toPath(), 
resource.toPath());
-                }
-                catch (IOException e) {
-                    String msg = "Unable to create symlink \"" + symLink + "\" 
to \"" + resource + "\".";
-
-                    if (U.isWindows() && e instanceof FileSystemException)
-                        msg += "\n\nAbility to create symbolic links is 
required!\n" +
-                                "On Windows platform you have to grant 
permission 'Create symbolic links'\n" +
-                                "to your user or run the Accelerator as 
Administrator.\n";
-
-                    throw new IOException(msg, e);
-                }
-            }
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException("Unable to prepare local working 
directory for the task " +
-                 "[jobId=" + jobId + ", path=" + path+ ']', e);
-        }
-    }
-
-    /**
-     * Cleans up job staging directory.
-     */
-    public void cleanupStagingDirectory() {
-        try {
-            if (stagingDir != null) {
-                FileSystem fs = job.fileSystem(stagingDir.toUri(), 
ctx.getJobConf());
-
-                fs.delete(stagingDir, true);
-            }
-        }
-        catch (Exception e) {
-            log.error("Failed to remove job staging directory [path=" + 
stagingDir + ", jobId=" + jobId + ']' , e);
-        }
-    }
-
-    /**
-     * Returns array of class path for current job.
-     *
-     * @return Class path collection.
-     */
-    @Nullable public URL[] classPath() {
-        return clsPath;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java
deleted file mode 100644
index fafa79b..0000000
--- 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2MapTask.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.hadoop.mapred.JobContextImpl;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-
-/**
- * Hadoop map task implementation for v2 API.
- */
-public class HadoopV2MapTask extends HadoopV2Task {
-    /**
-     * @param taskInfo Task info.
-     */
-    public HadoopV2MapTask(HadoopTaskInfo taskInfo) {
-        super(taskInfo);
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings({"ConstantConditions", "unchecked"})
-    @Override public void run0(HadoopV2TaskContext taskCtx) throws 
IgniteCheckedException {
-        OutputFormat outputFormat = null;
-        Exception err = null;
-
-        JobContextImpl jobCtx = taskCtx.jobContext();
-
-        try {
-            InputSplit nativeSplit = hadoopContext().getInputSplit();
-
-            if (nativeSplit == null)
-                throw new IgniteCheckedException("Input split cannot be 
null.");
-
-            InputFormat inFormat = 
ReflectionUtils.newInstance(jobCtx.getInputFormatClass(),
-                hadoopContext().getConfiguration());
-
-            RecordReader reader = inFormat.createRecordReader(nativeSplit, 
hadoopContext());
-
-            reader.initialize(nativeSplit, hadoopContext());
-
-            hadoopContext().reader(reader);
-
-            HadoopJobInfo jobInfo = taskCtx.job().info();
-
-            outputFormat = jobInfo.hasCombiner() || jobInfo.hasReducer() ? 
null : prepareWriter(jobCtx);
-
-            Mapper mapper = 
ReflectionUtils.newInstance(jobCtx.getMapperClass(), 
hadoopContext().getConfiguration());
-
-            try {
-                mapper.run(new WrappedMapper().getMapContext(hadoopContext()));
-            }
-            finally {
-                closeWriter();
-            }
-
-            commit(outputFormat);
-        }
-        catch (InterruptedException e) {
-            err = e;
-
-            Thread.currentThread().interrupt();
-
-            throw new IgniteInterruptedCheckedException(e);
-        }
-        catch (Exception e) {
-            err = e;
-
-            throw new IgniteCheckedException(e);
-        }
-        finally {
-            if (err != null)
-                abort(outputFormat);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Partitioner.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Partitioner.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Partitioner.java
deleted file mode 100644
index e199ede..0000000
--- 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Partitioner.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Partitioner;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
-
-/**
- * Hadoop partitioner adapter for v2 API.
- */
-public class HadoopV2Partitioner implements HadoopPartitioner {
-    /** Partitioner instance. */
-    private Partitioner<Object, Object> part;
-
-    /**
-     * @param cls Hadoop partitioner class.
-     * @param conf Job configuration.
-     */
-    public HadoopV2Partitioner(Class<? extends Partitioner<?, ?>> cls, 
Configuration conf) {
-        part = (Partitioner<Object, Object>) ReflectionUtils.newInstance(cls, 
conf);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int partition(Object key, Object val, int parts) {
-        return part.getPartition(key, val, parts);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2ReduceTask.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2ReduceTask.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2ReduceTask.java
deleted file mode 100644
index e5c2ed2..0000000
--- 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2ReduceTask.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import org.apache.hadoop.mapred.JobContextImpl;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-
-/**
- * Hadoop reduce task implementation for v2 API.
- */
-public class HadoopV2ReduceTask extends HadoopV2Task {
-    /** {@code True} if reduce, {@code false} if combine. */
-    private final boolean reduce;
-
-    /**
-     * Constructor.
-     *
-     * @param taskInfo Task info.
-     * @param reduce {@code True} if reduce, {@code false} if combine.
-     */
-    public HadoopV2ReduceTask(HadoopTaskInfo taskInfo, boolean reduce) {
-        super(taskInfo);
-
-        this.reduce = reduce;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings({"ConstantConditions", "unchecked"})
-    @Override public void run0(HadoopV2TaskContext taskCtx) throws 
IgniteCheckedException {
-        OutputFormat outputFormat = null;
-        Exception err = null;
-
-        JobContextImpl jobCtx = taskCtx.jobContext();
-
-        try {
-            outputFormat = reduce || !taskCtx.job().info().hasReducer() ? 
prepareWriter(jobCtx) : null;
-
-            Reducer reducer;
-            if (reduce) reducer = 
ReflectionUtils.newInstance(jobCtx.getReducerClass(),
-                jobCtx.getConfiguration());
-            else reducer = 
ReflectionUtils.newInstance(jobCtx.getCombinerClass(),
-                jobCtx.getConfiguration());
-
-            try {
-                reducer.run(new 
WrappedReducer().getReducerContext(hadoopContext()));
-            }
-            finally {
-                closeWriter();
-            }
-
-            commit(outputFormat);
-        }
-        catch (InterruptedException e) {
-            err = e;
-
-            Thread.currentThread().interrupt();
-
-            throw new IgniteInterruptedCheckedException(e);
-        }
-        catch (Exception e) {
-            err = e;
-
-            throw new IgniteCheckedException(e);
-        }
-        finally {
-            if (err != null)
-                abort(outputFormat);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2SetupTask.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2SetupTask.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2SetupTask.java
deleted file mode 100644
index 49b5ee7..0000000
--- 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2SetupTask.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import java.io.IOException;
-import org.apache.hadoop.mapred.JobContextImpl;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-
-/**
- * Hadoop setup task (prepares job).
- */
-public class HadoopV2SetupTask extends HadoopV2Task {
-    /**
-     * Constructor.
-     *
-     * @param taskInfo task info.
-     */
-    public HadoopV2SetupTask(HadoopTaskInfo taskInfo) {
-        super(taskInfo);
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("ConstantConditions")
-    @Override protected void run0(HadoopV2TaskContext taskCtx) throws 
IgniteCheckedException {
-        try {
-            JobContextImpl jobCtx = taskCtx.jobContext();
-
-            OutputFormat outputFormat = getOutputFormat(jobCtx);
-
-            outputFormat.checkOutputSpecs(jobCtx);
-
-            OutputCommitter committer = 
outputFormat.getOutputCommitter(hadoopContext());
-
-            if (committer != null)
-                committer.setupJob(jobCtx);
-        }
-        catch (ClassNotFoundException | IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-        catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-
-            throw new IgniteInterruptedCheckedException(e);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Splitter.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Splitter.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Splitter.java
deleted file mode 100644
index f4ed668..0000000
--- 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Splitter.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
-import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Hadoop API v2 splitter.
- */
-public class HadoopV2Splitter {
-    /** */
-    private static final String[] EMPTY_HOSTS = {};
-
-    /**
-     * @param ctx Job context.
-     * @return Collection of mapped splits.
-     * @throws IgniteCheckedException If mapping failed.
-     */
-    public static Collection<HadoopInputSplit> splitJob(JobContext ctx) throws 
IgniteCheckedException {
-        try {
-            InputFormat<?, ?> format = 
ReflectionUtils.newInstance(ctx.getInputFormatClass(), ctx.getConfiguration());
-
-            assert format != null;
-
-            List<InputSplit> splits = format.getSplits(ctx);
-
-            Collection<HadoopInputSplit> res = new ArrayList<>(splits.size());
-
-            int id = 0;
-
-            for (InputSplit nativeSplit : splits) {
-                if (nativeSplit instanceof FileSplit) {
-                    FileSplit s = (FileSplit)nativeSplit;
-
-                    res.add(new HadoopFileBlock(s.getLocations(), 
s.getPath().toUri(), s.getStart(), s.getLength()));
-                }
-                else
-                    res.add(HadoopUtils.wrapSplit(id, nativeSplit, 
nativeSplit.getLocations()));
-
-                id++;
-            }
-
-            return res;
-        }
-        catch (IOException | ClassNotFoundException e) {
-            throw new IgniteCheckedException(e);
-        }
-        catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-
-            throw new IgniteInterruptedCheckedException(e);
-        }
-    }
-
-    /**
-     * @param clsName Input split class name.
-     * @param in Input stream.
-     * @param hosts Optional hosts.
-     * @return File block or {@code null} if it is not a {@link FileSplit} 
instance.
-     * @throws IgniteCheckedException If failed.
-     */
-    public static HadoopFileBlock readFileBlock(String clsName, DataInput in, 
@Nullable String[] hosts)
-        throws IgniteCheckedException {
-        if (!FileSplit.class.getName().equals(clsName))
-            return null;
-
-        FileSplit split = new FileSplit();
-
-        try {
-            split.readFields(in);
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-
-        if (hosts == null)
-            hosts = EMPTY_HOSTS;
-
-        return new HadoopFileBlock(hosts, split.getPath().toUri(), 
split.getStart(), split.getLength());
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Task.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Task.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Task.java
deleted file mode 100644
index 1383a61..0000000
--- 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Task.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import java.io.IOException;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopTask;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Extended Hadoop v2 task.
- */
-public abstract class HadoopV2Task extends HadoopTask {
-    /** Hadoop context. */
-    private HadoopV2Context hadoopCtx;
-
-    /**
-     * Constructor.
-     *
-     * @param taskInfo Task info.
-     */
-    protected HadoopV2Task(HadoopTaskInfo taskInfo) {
-        super(taskInfo);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void run(HadoopTaskContext taskCtx) throws 
IgniteCheckedException {
-        HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
-
-        hadoopCtx = new HadoopV2Context(ctx);
-
-        run0(ctx);
-    }
-
-    /**
-     * Internal task routine.
-     *
-     * @param taskCtx Task context.
-     * @throws IgniteCheckedException
-     */
-    protected abstract void run0(HadoopV2TaskContext taskCtx) throws 
IgniteCheckedException;
-
-    /**
-     * @return hadoop context.
-     */
-    protected HadoopV2Context hadoopContext() {
-        return hadoopCtx;
-    }
-
-    /**
-     * Create and configure an OutputFormat instance.
-     *
-     * @param jobCtx Job context.
-     * @return Instance of OutputFormat is specified in job configuration.
-     * @throws ClassNotFoundException If specified class not found.
-     */
-    protected OutputFormat getOutputFormat(JobContext jobCtx) throws 
ClassNotFoundException {
-        return ReflectionUtils.newInstance(jobCtx.getOutputFormatClass(), 
hadoopContext().getConfiguration());
-    }
-
-    /**
-     * Put write into Hadoop context and return associated output format 
instance.
-     *
-     * @param jobCtx Job context.
-     * @return Output format.
-     * @throws IgniteCheckedException In case of Grid exception.
-     * @throws InterruptedException In case of interrupt.
-     */
-    protected OutputFormat prepareWriter(JobContext jobCtx)
-        throws IgniteCheckedException, InterruptedException {
-        try {
-            OutputFormat outputFormat = getOutputFormat(jobCtx);
-
-            assert outputFormat != null;
-
-            OutputCommitter outCommitter = 
outputFormat.getOutputCommitter(hadoopCtx);
-
-            if (outCommitter != null)
-                outCommitter.setupTask(hadoopCtx);
-
-            RecordWriter writer = outputFormat.getRecordWriter(hadoopCtx);
-
-            hadoopCtx.writer(writer);
-
-            return outputFormat;
-        }
-        catch (IOException | ClassNotFoundException e) {
-            throw new IgniteCheckedException(e);
-        }
-    }
-
-    /**
-     * Closes writer.
-     *
-     * @throws Exception If fails and logger hasn't been specified.
-     */
-    protected void closeWriter() throws Exception {
-        RecordWriter writer = hadoopCtx.writer();
-
-        if (writer != null)
-            writer.close(hadoopCtx);
-    }
-
-    /**
-     * Setup task.
-     *
-     * @param outputFormat Output format.
-     * @throws IOException In case of IO exception.
-     * @throws InterruptedException In case of interrupt.
-     */
-    protected void setup(@Nullable OutputFormat outputFormat) throws 
IOException, InterruptedException {
-        if (hadoopCtx.writer() != null) {
-            assert outputFormat != null;
-
-            outputFormat.getOutputCommitter(hadoopCtx).setupTask(hadoopCtx);
-        }
-    }
-
-    /**
-     * Commit task.
-     *
-     * @param outputFormat Output format.
-     * @throws IgniteCheckedException In case of Grid exception.
-     * @throws IOException In case of IO exception.
-     * @throws InterruptedException In case of interrupt.
-     */
-    protected void commit(@Nullable OutputFormat outputFormat) throws 
IgniteCheckedException, IOException, InterruptedException {
-        if (hadoopCtx.writer() != null) {
-            assert outputFormat != null;
-
-            OutputCommitter outputCommitter = 
outputFormat.getOutputCommitter(hadoopCtx);
-
-            if (outputCommitter.needsTaskCommit(hadoopCtx))
-                outputCommitter.commitTask(hadoopCtx);
-        }
-    }
-
-    /**
-     * Abort task.
-     *
-     * @param outputFormat Output format.
-     */
-    protected void abort(@Nullable OutputFormat outputFormat) {
-        if (hadoopCtx.writer() != null) {
-            assert outputFormat != null;
-
-            try {
-                
outputFormat.getOutputCommitter(hadoopCtx).abortTask(hadoopCtx);
-            }
-            catch (IOException ignore) {
-                // Ignore.
-            }
-            catch (InterruptedException ignore) {
-                Thread.currentThread().interrupt();
-            }
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void cancel() {
-        hadoopCtx.cancel();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
deleted file mode 100644
index 4b1121c..0000000
--- 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java
+++ /dev/null
@@ -1,560 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import java.io.DataInput;
-import java.io.File;
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.Comparator;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.Serialization;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.WritableSerialization;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobContextImpl;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.mapred.TaskID;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobSubmissionFiles;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
-import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
-import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
-import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
-import org.apache.ignite.internal.processors.hadoop.HadoopTask;
-import 
org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
-import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
-import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounter;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
-import org.apache.ignite.internal.processors.hadoop.counter.HadoopCountersImpl;
-import org.apache.ignite.internal.processors.hadoop.fs.HadoopLazyConcurrentMap;
-import org.apache.ignite.internal.processors.hadoop.v1.HadoopV1CleanupTask;
-import org.apache.ignite.internal.processors.hadoop.v1.HadoopV1MapTask;
-import org.apache.ignite.internal.processors.hadoop.v1.HadoopV1Partitioner;
-import org.apache.ignite.internal.processors.hadoop.v1.HadoopV1ReduceTask;
-import org.apache.ignite.internal.processors.hadoop.v1.HadoopV1SetupTask;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.A;
-import org.jetbrains.annotations.Nullable;
-
-import static 
org.apache.ignite.internal.processors.hadoop.HadoopUtils.jobLocalDir;
-import static 
org.apache.ignite.internal.processors.hadoop.HadoopUtils.taskLocalDir;
-import static 
org.apache.ignite.internal.processors.hadoop.HadoopUtils.transformException;
-import static 
org.apache.ignite.internal.processors.hadoop.HadoopUtils.unwrapSplit;
-import static 
org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtils.FsCacheKey;
-import static 
org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtils.createHadoopLazyConcurrentMap;
-import static 
org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtils.fileSystemForMrUserWithCaching;
-import static 
org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.PARAM_IGFS_PREFER_LOCAL_WRITES;
-
-/**
- * Context for task execution.
- */
-public class HadoopV2TaskContext extends HadoopTaskContext {
-    /** */
-    private static final boolean COMBINE_KEY_GROUPING_SUPPORTED;
-
-    /** Lazy per-user file system cache used by the Hadoop task. */
-    private static final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fsMap
-        = createHadoopLazyConcurrentMap();
-
-    /**
-     * This method is called with reflection upon Job finish with class loader 
of each task.
-     * This will clean up all the Fs created for specific task.
-     * Each class loader sees uses its own instance of <code>fsMap<code/> 
since the class loaders
-     * are different.
-     *
-     * @throws IgniteCheckedException On error.
-     */
-    public static void close() throws IgniteCheckedException {
-        fsMap.close();
-    }
-
-    /**
-     * Check for combiner grouping support (available since Hadoop 2.3).
-     */
-    static {
-        boolean ok;
-
-        try {
-            
JobContext.class.getDeclaredMethod("getCombinerKeyGroupingComparator");
-
-            ok = true;
-        }
-        catch (NoSuchMethodException ignore) {
-            ok = false;
-        }
-
-        COMBINE_KEY_GROUPING_SUPPORTED = ok;
-    }
-
-    /** Flag is set if new context-object code is used for running the mapper. 
*/
-    private final boolean useNewMapper;
-
-    /** Flag is set if new context-object code is used for running the 
reducer. */
-    private final boolean useNewReducer;
-
-    /** Flag is set if new context-object code is used for running the 
combiner. */
-    private final boolean useNewCombiner;
-
-    /** */
-    private final JobContextImpl jobCtx;
-
-    /** Set if task is to cancelling. */
-    private volatile boolean cancelled;
-
-    /** Current task. */
-    private volatile HadoopTask task;
-
-    /** Local node ID */
-    private final UUID locNodeId;
-
-    /** Counters for task. */
-    private final HadoopCounters cntrs = new HadoopCountersImpl();
-
-    /**
-     * @param taskInfo Task info.
-     * @param job Job.
-     * @param jobId Job ID.
-     * @param locNodeId Local node ID.
-     * @param jobConfDataInput DataInput for read JobConf.
-     */
-    public HadoopV2TaskContext(HadoopTaskInfo taskInfo, HadoopJob job, 
HadoopJobId jobId,
-        @Nullable UUID locNodeId, DataInput jobConfDataInput) throws 
IgniteCheckedException {
-        super(taskInfo, job);
-        this.locNodeId = locNodeId;
-
-        // Before create JobConf instance we should set new context class 
loader.
-        ClassLoader oldLdr = 
HadoopUtils.setContextClassLoader(getClass().getClassLoader());
-
-        try {
-            JobConf jobConf = new JobConf();
-
-            try {
-                jobConf.readFields(jobConfDataInput);
-            }
-            catch (IOException e) {
-                throw new IgniteCheckedException(e);
-            }
-
-            // For map-reduce jobs prefer local writes.
-            jobConf.setBooleanIfUnset(PARAM_IGFS_PREFER_LOCAL_WRITES, true);
-
-            jobCtx = new JobContextImpl(jobConf, new 
JobID(jobId.globalId().toString(), jobId.localId()));
-
-            useNewMapper = jobConf.getUseNewMapper();
-            useNewReducer = jobConf.getUseNewReducer();
-            useNewCombiner = jobConf.getCombinerClass() == null;
-        }
-        finally {
-            HadoopUtils.restoreContextClassLoader(oldLdr);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public <T extends HadoopCounter> T counter(String grp, String 
name, Class<T> cls) {
-        return cntrs.counter(grp, name, cls);
-    }
-
-    /** {@inheritDoc} */
-    @Override public HadoopCounters counters() {
-        return cntrs;
-    }
-
-    /**
-     * Creates appropriate task from current task info.
-     *
-     * @return Task.
-     */
-    private HadoopTask createTask() {
-        boolean isAbort = taskInfo().type() == HadoopTaskType.ABORT;
-
-        switch (taskInfo().type()) {
-            case SETUP:
-                return useNewMapper ? new HadoopV2SetupTask(taskInfo()) : new 
HadoopV1SetupTask(taskInfo());
-
-            case MAP:
-                return useNewMapper ? new HadoopV2MapTask(taskInfo()) : new 
HadoopV1MapTask(taskInfo());
-
-            case REDUCE:
-                return useNewReducer ? new HadoopV2ReduceTask(taskInfo(), 
true) :
-                    new HadoopV1ReduceTask(taskInfo(), true);
-
-            case COMBINE:
-                return useNewCombiner ? new HadoopV2ReduceTask(taskInfo(), 
false) :
-                    new HadoopV1ReduceTask(taskInfo(), false);
-
-            case COMMIT:
-            case ABORT:
-                return useNewReducer ? new HadoopV2CleanupTask(taskInfo(), 
isAbort) :
-                    new HadoopV1CleanupTask(taskInfo(), isAbort);
-
-            default:
-                return null;
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void run() throws IgniteCheckedException {
-        ClassLoader oldLdr = 
HadoopUtils.setContextClassLoader(jobConf().getClassLoader());
-
-        try {
-            try {
-                task = createTask();
-            }
-            catch (Throwable e) {
-                if (e instanceof Error)
-                    throw e;
-
-                throw transformException(e);
-            }
-
-            if (cancelled)
-                throw new HadoopTaskCancelledException("Task cancelled.");
-
-            try {
-                task.run(this);
-            }
-            catch (Throwable e) {
-                if (e instanceof Error)
-                    throw e;
-
-                throw transformException(e);
-            }
-        }
-        finally {
-            task = null;
-
-            HadoopUtils.restoreContextClassLoader(oldLdr);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void cancel() {
-        cancelled = true;
-
-        HadoopTask t = task;
-
-        if (t != null)
-            t.cancel();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void prepareTaskEnvironment() throws 
IgniteCheckedException {
-        File locDir;
-
-        switch(taskInfo().type()) {
-            case MAP:
-            case REDUCE:
-                job().prepareTaskEnvironment(taskInfo());
-
-                locDir = taskLocalDir(locNodeId, taskInfo());
-
-                break;
-
-            default:
-                locDir = jobLocalDir(locNodeId, taskInfo().jobId());
-        }
-
-        ClassLoader oldLdr = 
HadoopUtils.setContextClassLoader(jobConf().getClassLoader());
-
-        try {
-            FileSystem.get(jobConf());
-
-            LocalFileSystem locFs = FileSystem.getLocal(jobConf());
-
-            locFs.setWorkingDirectory(new Path(locDir.getAbsolutePath()));
-        }
-        catch (Throwable e) {
-            if (e instanceof Error)
-                throw (Error)e;
-
-            throw transformException(e);
-        }
-        finally {
-            HadoopUtils.restoreContextClassLoader(oldLdr);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void cleanupTaskEnvironment() throws 
IgniteCheckedException {
-        job().cleanupTaskEnvironment(taskInfo());
-    }
-
-    /**
-     * Creates Hadoop attempt ID.
-     *
-     * @return Attempt ID.
-     */
-    public TaskAttemptID attemptId() {
-        TaskID tid = new TaskID(jobCtx.getJobID(), 
taskType(taskInfo().type()), taskInfo().taskNumber());
-
-        return new TaskAttemptID(tid, taskInfo().attempt());
-    }
-
-    /**
-     * @param type Task type.
-     * @return Hadoop task type.
-     */
-    private TaskType taskType(HadoopTaskType type) {
-        switch (type) {
-            case SETUP:
-                return TaskType.JOB_SETUP;
-            case MAP:
-            case COMBINE:
-                return TaskType.MAP;
-
-            case REDUCE:
-                return TaskType.REDUCE;
-
-            case COMMIT:
-            case ABORT:
-                return TaskType.JOB_CLEANUP;
-
-            default:
-                return null;
-        }
-    }
-
-    /**
-     * Gets job configuration of the task.
-     *
-     * @return Job configuration.
-     */
-    public JobConf jobConf() {
-        return jobCtx.getJobConf();
-    }
-
-    /**
-     * Gets job context of the task.
-     *
-     * @return Job context.
-     */
-    public JobContextImpl jobContext() {
-        return jobCtx;
-    }
-
-    /** {@inheritDoc} */
-    @Override public HadoopPartitioner partitioner() throws 
IgniteCheckedException {
-        Class<?> partClsOld = jobConf().getClass("mapred.partitioner.class", 
null);
-
-        if (partClsOld != null)
-            return new HadoopV1Partitioner(jobConf().getPartitionerClass(), 
jobConf());
-
-        try {
-            return new HadoopV2Partitioner(jobCtx.getPartitionerClass(), 
jobConf());
-        }
-        catch (ClassNotFoundException e) {
-            throw new IgniteCheckedException(e);
-        }
-    }
-
-    /**
-     * Gets serializer for specified class.
-     *
-     * @param cls Class.
-     * @param jobConf Job configuration.
-     * @return Appropriate serializer.
-     */
-    @SuppressWarnings("unchecked")
-    private HadoopSerialization getSerialization(Class<?> cls, Configuration 
jobConf) throws IgniteCheckedException {
-        A.notNull(cls, "cls");
-
-        SerializationFactory factory = new SerializationFactory(jobConf);
-
-        Serialization<?> serialization = factory.getSerialization(cls);
-
-        if (serialization == null)
-            throw new IgniteCheckedException("Failed to find serialization 
for: " + cls.getName());
-
-        if (serialization.getClass() == WritableSerialization.class)
-            return new HadoopWritableSerialization((Class<? extends 
Writable>)cls);
-
-        return new HadoopSerializationWrapper(serialization, cls);
-    }
-
-    /** {@inheritDoc} */
-    @Override public HadoopSerialization keySerialization() throws 
IgniteCheckedException {
-        return getSerialization(jobCtx.getMapOutputKeyClass(), jobConf());
-    }
-
-    /** {@inheritDoc} */
-    @Override public HadoopSerialization valueSerialization() throws 
IgniteCheckedException {
-        return getSerialization(jobCtx.getMapOutputValueClass(), jobConf());
-    }
-
-    /** {@inheritDoc} */
-    @Override public Comparator<Object> sortComparator() {
-        return (Comparator<Object>)jobCtx.getSortComparator();
-    }
-
-    /** {@inheritDoc} */
-    @Override public Comparator<Object> groupComparator() {
-        Comparator<?> res;
-
-        switch (taskInfo().type()) {
-            case COMBINE:
-                res = COMBINE_KEY_GROUPING_SUPPORTED ?
-                    jobContext().getCombinerKeyGroupingComparator() : 
jobContext().getGroupingComparator();
-
-                break;
-
-            case REDUCE:
-                res = jobContext().getGroupingComparator();
-
-                break;
-
-            default:
-                return null;
-        }
-
-        if (res != null && res.getClass() != sortComparator().getClass())
-            return (Comparator<Object>)res;
-
-        return null;
-    }
-
-    /**
-     * @param split Split.
-     * @return Native Hadoop split.
-     * @throws IgniteCheckedException if failed.
-     */
-    @SuppressWarnings("unchecked")
-    public Object getNativeSplit(HadoopInputSplit split) throws 
IgniteCheckedException {
-        if (split instanceof HadoopExternalSplit)
-            return readExternalSplit((HadoopExternalSplit)split);
-
-        if (split instanceof HadoopSplitWrapper)
-            return unwrapSplit((HadoopSplitWrapper)split);
-
-        throw new IllegalStateException("Unknown split: " + split);
-    }
-
-    /**
-     * @param split External split.
-     * @return Native input split.
-     * @throws IgniteCheckedException If failed.
-     */
-    @SuppressWarnings("unchecked")
-    private Object readExternalSplit(HadoopExternalSplit split) throws 
IgniteCheckedException {
-        Path jobDir = new Path(jobConf().get(MRJobConfig.MAPREDUCE_JOB_DIR));
-
-        FileSystem fs;
-
-        try {
-            // This assertion uses .startsWith() instead of .equals() because 
task class loaders may
-            // be reused between tasks of the same job.
-            assert ((HadoopClassLoader)getClass().getClassLoader()).name()
-                .startsWith(HadoopClassLoader.nameForTask(taskInfo(), true));
-
-            // We also cache Fs there, all them will be cleared explicitly 
upon the Job end.
-            fs = fileSystemForMrUserWithCaching(jobDir.toUri(), jobConf(), 
fsMap);
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-
-        try (
-            FSDataInputStream in = 
fs.open(JobSubmissionFiles.getJobSplitFile(jobDir))) {
-
-            in.seek(split.offset());
-
-            String clsName = Text.readString(in);
-
-            Class<?> cls = jobConf().getClassByName(clsName);
-
-            assert cls != null;
-
-            Serialization serialization = new 
SerializationFactory(jobConf()).getSerialization(cls);
-
-            Deserializer deserializer = serialization.getDeserializer(cls);
-
-            deserializer.open(in);
-
-            Object res = deserializer.deserialize(null);
-
-            deserializer.close();
-
-            assert res != null;
-
-            return res;
-        }
-        catch (IOException | ClassNotFoundException e) {
-            throw new IgniteCheckedException(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public <T> T runAsJobOwner(final Callable<T> c) throws 
IgniteCheckedException {
-        String user = job.info().user();
-
-        user = IgfsUtils.fixUserName(user);
-
-        assert user != null;
-
-        String ugiUser;
-
-        try {
-            UserGroupInformation currUser = 
UserGroupInformation.getCurrentUser();
-
-            assert currUser != null;
-
-            ugiUser = currUser.getShortUserName();
-        }
-        catch (IOException ioe) {
-            throw new IgniteCheckedException(ioe);
-        }
-
-        try {
-            if (F.eq(user, ugiUser))
-                // if current UGI context user is the same, do direct call:
-                return c.call();
-            else {
-                UserGroupInformation ugi = 
UserGroupInformation.getBestUGI(null, user);
-
-                return ugi.doAs(new PrivilegedExceptionAction<T>() {
-                    @Override public T run() throws Exception {
-                        return c.call();
-                    }
-                });
-            }
-        }
-        catch (Exception e) {
-            throw new IgniteCheckedException(e);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/67b4da70/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopWritableSerialization.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopWritableSerialization.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopWritableSerialization.java
deleted file mode 100644
index f46f068..0000000
--- 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopWritableSerialization.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.v2;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import org.apache.hadoop.io.Writable;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Optimized serialization for Hadoop {@link Writable} types.
- */
-public class HadoopWritableSerialization implements HadoopSerialization {
-    /** */
-    private final Class<? extends Writable> cls;
-
-    /**
-     * @param cls Class.
-     */
-    public HadoopWritableSerialization(Class<? extends Writable> cls) {
-        assert cls != null;
-
-        this.cls = cls;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void write(DataOutput out, Object obj) throws 
IgniteCheckedException {
-        assert cls.isAssignableFrom(obj.getClass()) : cls + " " + 
obj.getClass();
-
-        try {
-            ((Writable)obj).write(out);
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Object read(DataInput in, @Nullable Object obj) throws 
IgniteCheckedException {
-        Writable w = obj == null ? U.newInstance(cls) : cls.cast(obj);
-
-        try {
-            w.readFields(in);
-        }
-        catch (IOException e) {
-            throw new IgniteCheckedException(e);
-        }
-
-        return w;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void close() {
-        // No-op.
-    }
-}
\ No newline at end of file

Reply via email to