http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java
new file mode 100644
index 0000000..1d59a95
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java
@@ -0,0 +1,1460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.net.ConnectException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.SocketChannel;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
+import 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessDescriptor;
+import org.apache.ignite.internal.util.GridConcurrentFactory;
+import org.apache.ignite.internal.util.GridKeyLock;
+import org.apache.ignite.internal.util.ipc.IpcEndpoint;
+import 
org.apache.ignite.internal.util.ipc.shmem.IpcOutOfSystemResourcesException;
+import org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryClientEndpoint;
+import org.apache.ignite.internal.util.ipc.shmem.IpcSharedMemoryServerEndpoint;
+import org.apache.ignite.internal.util.nio.GridBufferedParser;
+import org.apache.ignite.internal.util.nio.GridNioAsyncNotifyFilter;
+import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
+import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioFilterAdapter;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
+import org.apache.ignite.internal.util.nio.GridNioMessageTracker;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioServerListener;
+import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.LT;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.thread.IgniteThread;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentLinkedDeque8;
+
+/**
+ * Hadoop external communication class.
+ */
+public class HadoopExternalCommunication {
+    /** IPC error message. */
+    public static final String OUT_OF_RESOURCES_TCP_MSG = "Failed to allocate 
shared memory segment " +
+        "(switching to TCP, may be slower).";
+
+    /** Default port which node sets listener to (value is <tt>27100</tt>). */
+    public static final int DFLT_PORT = 27100;
+
+    /** Default connection timeout (value is <tt>1000</tt>ms). */
+    public static final long DFLT_CONN_TIMEOUT = 1000;
+
+    /** Default maximum connection timeout (value is <tt>600,000</tt>ms). */
+    public static final long DFLT_MAX_CONN_TIMEOUT = 10 * 60 * 1000;
+
+    /** Default reconnect attempts count (value is <tt>10</tt>). */
+    public static final int DFLT_RECONNECT_CNT = 10;
+
+    /** Default message queue limit per connection (for incoming and outgoing). */
+    public static final int DFLT_MSG_QUEUE_LIMIT = 
GridNioServer.DFLT_SEND_QUEUE_LIMIT;
+
+    /**
+     * Default count of selectors for TCP server (value is <tt>1</tt>).
+     */
+    public static final int DFLT_SELECTORS_CNT = 1;
+
+    /** Process descriptor meta for session. */
+    private static final int PROCESS_META = 
GridNioSessionMetaKey.nextUniqueKey();
+
+    /** Handshake finish meta for session. */
+    private static final int HANDSHAKE_FINISH_META = 
GridNioSessionMetaKey.nextUniqueKey();
+
+    /** Message tracker meta for session. */
+    private static final int TRACKER_META = 
GridNioSessionMetaKey.nextUniqueKey();
+
+    /**
+     * Default local port range (value is <tt>100</tt>).
+     * See {@link #setLocalPortRange(int)} for details.
+     */
+    public static final int DFLT_PORT_RANGE = 100;
+
+    /** Default value for {@code TCP_NODELAY} socket option (value is 
<tt>true</tt>). */
+    public static final boolean DFLT_TCP_NODELAY = true;
+
+    /** Server listener. */
+    private final GridNioServerListener<HadoopMessage> srvLsnr =
+        new GridNioServerListenerAdapter<HadoopMessage>() {
+            @Override public void onConnected(GridNioSession ses) {
+                HadoopProcessDescriptor desc = ses.meta(PROCESS_META);
+
+                assert desc != null : "Received connected notification without 
finished handshake: " + ses;
+            }
+
+            /** {@inheritDoc} */
+            @Override public void onDisconnected(GridNioSession ses, @Nullable 
Exception e) {
+                if (log.isDebugEnabled())
+                    log.debug("Closed connection for session: " + ses);
+
+                if (e != null)
+                    U.error(log, "Session disconnected due to exception: " + 
ses, e);
+
+                HadoopProcessDescriptor desc = ses.meta(PROCESS_META);
+
+                if (desc != null) {
+                    HadoopCommunicationClient rmv = 
clients.remove(desc.processId());
+
+                    if (rmv != null)
+                        rmv.forceClose();
+                }
+
+                HadoopMessageListener lsnr0 = lsnr;
+
+                if (lsnr0 != null)
+                    // Notify listener about connection close.
+                    lsnr0.onConnectionLost(desc);
+            }
+
+            /** {@inheritDoc} */
+            @Override public void onMessage(GridNioSession ses, HadoopMessage 
msg) {
+                
notifyListener(ses.<HadoopProcessDescriptor>meta(PROCESS_META), msg);
+
+                if (msgQueueLimit > 0) {
+                    GridNioMessageTracker tracker = ses.meta(TRACKER_META);
+
+                    assert tracker != null : "Missing tracker for limited 
message queue: " + ses;
+
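+                    // Let the tracker know the message has been processed so that
+                    // reads paused by the queue limit can resume.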
+                    tracker.run();
+                }
+            }
+        };
+
+    /** Logger. */
+    private IgniteLogger log;
+
+    /** Local process descriptor. */
+    private HadoopProcessDescriptor locProcDesc;
+
+    /** Marshaller. */
+    private Marshaller marsh;
+
+    /** Message notification executor service. */
+    private ExecutorService execSvc;
+
+    /** Grid name. */
+    private String gridName;
+
+    /** Local host address of this node. */
+    private volatile InetAddress locHost;
+
+    /** Local port which node uses. */
+    private int locPort = DFLT_PORT;
+
+    /** Local port range. */
+    private int locPortRange = DFLT_PORT_RANGE;
+
+    /** Local port which node uses to accept shared memory connections. */
+    private int shmemPort = -1;
+
+    /** Allocate direct buffer or heap buffer. */
+    private boolean directBuf = true;
+
+    /** Connect timeout. */
+    private long connTimeout = DFLT_CONN_TIMEOUT;
+
+    /** Maximum connect timeout. */
+    private long maxConnTimeout = DFLT_MAX_CONN_TIMEOUT;
+
+    /** Reconnect attempts count. */
+    @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
+    private int reconCnt = DFLT_RECONNECT_CNT;
+
+    /** Socket send buffer. */
+    private int sockSndBuf;
+
+    /** Socket receive buffer. */
+    private int sockRcvBuf;
+
+    /** Message queue limit. */
+    private int msgQueueLimit = DFLT_MSG_QUEUE_LIMIT;
+
+    /** NIO server. */
+    private GridNioServer<HadoopMessage> nioSrvr;
+
+    /** Shared memory server. */
+    private IpcSharedMemoryServerEndpoint shmemSrv;
+
+    /** {@code TCP_NODELAY} option value for created sockets. */
+    private boolean tcpNoDelay = DFLT_TCP_NODELAY;
+
+    /** Shared memory accept worker. */
+    private ShmemAcceptWorker shmemAcceptWorker;
+
+    /** Shared memory workers. */
+    private final Collection<ShmemWorker> shmemWorkers = new 
ConcurrentLinkedDeque8<>();
+
+    /** Clients. */
+    private final ConcurrentMap<UUID, HadoopCommunicationClient> clients = 
GridConcurrentFactory.newMap();
+
+    /** Message listener. */
+    private volatile HadoopMessageListener lsnr;
+
+    /** Bound port. */
+    private int boundTcpPort = -1;
+
+    /** Bound port for shared memory server. */
+    private int boundTcpShmemPort = -1;
+
+    /** Count of selectors to use in TCP server. */
+    private int selectorsCnt = DFLT_SELECTORS_CNT;
+
+    /** Local node ID message. */
+    private ProcessHandshakeMessage locIdMsg;
+
+    /** Locks. */
+    private final GridKeyLock locks = new GridKeyLock();
+
+    /**
+     * @param parentNodeId Parent node ID.
+     * @param procId Process ID.
+     * @param marsh Marshaller to use.
+     * @param log Logger.
+     * @param execSvc Executor service for message notification.
+     * @param gridName Grid name.
+     */
+    public HadoopExternalCommunication(
+        UUID parentNodeId,
+        UUID procId,
+        Marshaller marsh,
+        IgniteLogger log,
+        ExecutorService execSvc,
+        String gridName
+    ) {
+        locProcDesc = new HadoopProcessDescriptor(parentNodeId, procId);
+
+        this.marsh = marsh;
+        this.log = log.getLogger(HadoopExternalCommunication.class);
+        this.execSvc = execSvc;
+        this.gridName = gridName;
+    }
+
+    /**
+     * Sets local port for socket binding.
+     * <p>
+     * If not provided, default value is {@link #DFLT_PORT}.
+     *
+     * @param locPort Port number.
+     */
+    public void setLocalPort(int locPort) {
+        this.locPort = locPort;
+    }
+
+    /**
+     * Gets local port for socket binding.
+     *
+     * @return Local port.
+     */
+    public int getLocalPort() {
+        return locPort;
+    }
+
+    /**
+     * Sets local port range for local host ports (value must be greater than or equal to <tt>0</tt>).
+     * If provided local port (see {@link #setLocalPort(int)}) is occupied,
+     * implementation will try to increment the port number for as long as it 
is less than
+     * initial value plus this range.
+     * <p>
+     * If port range value is <tt>0</tt>, then implementation will try to bind only to the port provided by
+     * {@link #setLocalPort(int)} method and fail if binding to this port did 
not succeed.
+     * <p>
+     * Local port range is very useful during development when more than one grid node needs to run
+     * on the same physical machine.
+     * <p>
+     * If not provided, default value is {@link #DFLT_PORT_RANGE}.
+     *
+     * @param locPortRange New local port range.
+     */
+    public void setLocalPortRange(int locPortRange) {
+        this.locPortRange = locPortRange;
+    }
+
+    /**
+     * @return Local port range.
+     */
+    public int getLocalPortRange() {
+        return locPortRange;
+    }
+
+    /**
+     * Sets local port to accept shared memory connections.
+     * <p>
+     * If set to {@code -1} shared memory communication will be disabled.
+     * <p>
+     * If not provided, shared memory is disabled.
+     *
+     * @param shmemPort Port number.
+     */
+    public void setSharedMemoryPort(int shmemPort) {
+        this.shmemPort = shmemPort;
+    }
+
+    /**
+     * Gets shared memory port to accept incoming connections.
+     *
+     * @return Shared memory port.
+     */
+    public int getSharedMemoryPort() {
+        return shmemPort;
+    }
+
+    /**
+     * Sets connect timeout used when establishing connection
+     * with remote nodes.
+     * <p>
+     * {@code 0} is interpreted as infinite timeout.
+     * <p>
+     * If not provided, default value is {@link #DFLT_CONN_TIMEOUT}.
+     *
+     * @param connTimeout Connect timeout.
+     */
+    public void setConnectTimeout(long connTimeout) {
+        this.connTimeout = connTimeout;
+    }
+
+    /**
+     * @return Connection timeout.
+     */
+    public long getConnectTimeout() {
+        return connTimeout;
+    }
+
+    /**
+     * Sets maximum connect timeout. If handshake is not established within 
connect timeout,
+     * then SPI tries to repeat handshake procedure with increased connect 
timeout.
+     * Connect timeout can grow till maximum timeout value,
+     * if maximum timeout value is reached then the handshake is considered as 
failed.
+     * <p>
+     * {@code 0} is interpreted as infinite timeout.
+     * <p>
+     * If not provided, default value is {@link #DFLT_MAX_CONN_TIMEOUT}.
+     *
+     * @param maxConnTimeout Maximum connect timeout.
+     */
+    public void setMaxConnectTimeout(long maxConnTimeout) {
+        this.maxConnTimeout = maxConnTimeout;
+    }
+
+    /**
+     * Gets maximum connection timeout.
+     *
+     * @return Maximum connection timeout.
+     */
+    public long getMaxConnectTimeout() {
+        return maxConnTimeout;
+    }
+
+    /**
+     * Sets maximum number of reconnect attempts used when establishing 
connection
+     * with remote nodes.
+     * <p>
+     * If not provided, default value is {@link #DFLT_RECONNECT_CNT}.
+     *
+     * @param reconCnt Maximum number of reconnection attempts.
+     */
+    public void setReconnectCount(int reconCnt) {
+        this.reconCnt = reconCnt;
+    }
+
+    /**
+     * @return Reconnect count.
+     */
+    public int getReconnectCount() {
+        return reconCnt;
+    }
+
+    /**
+     * Sets flag to allocate direct or heap buffer in SPI.
+     * If value is {@code true}, then SPI will use {@link 
ByteBuffer#allocateDirect(int)} call.
+     * Otherwise, SPI will use {@link ByteBuffer#allocate(int)} call.
+     * <p>
+     * If not provided, default value is {@code true}.
+     *
+     * @param directBuf Flag indicates to allocate direct or heap buffer in 
SPI.
+     */
+    public void setDirectBuffer(boolean directBuf) {
+        this.directBuf = directBuf;
+    }
+
+    /**
+     * @return Direct buffer flag.
+     */
+    public boolean isDirectBuffer() {
+        return directBuf;
+    }
+
+    /**
+     * Sets the count of selectors to be used in the TCP server.
+     * <p/>
+     * If not provided, default value is {@link #DFLT_SELECTORS_CNT}.
+     *
+     * @param selectorsCnt Selectors count.
+     */
+    public void setSelectorsCount(int selectorsCnt) {
+        this.selectorsCnt = selectorsCnt;
+    }
+
+    /**
+     * @return Number of selectors to use.
+     */
+    public int getSelectorsCount() {
+        return selectorsCnt;
+    }
+
+    /**
+     * Sets value for {@code TCP_NODELAY} socket option. Each
+     * socket will be opened using provided value.
+     * <p>
+     * Setting this option to {@code true} disables Nagle's algorithm
+     * on the socket, decreasing latency and delivery time for small messages.
+     * <p>
+     * For systems that work under heavy network load it is advisable to
+     * set this value to {@code false}.
+     * <p>
+     * If not provided, default value is {@link #DFLT_TCP_NODELAY}.
+     *
+     * @param tcpNoDelay {@code True} to disable TCP delay.
+     */
+    public void setTcpNoDelay(boolean tcpNoDelay) {
+        this.tcpNoDelay = tcpNoDelay;
+    }
+
+    /**
+     * @return {@code TCP_NO_DELAY} flag.
+     */
+    public boolean isTcpNoDelay() {
+        return tcpNoDelay;
+    }
+
+    /**
+     * Sets receive buffer size for sockets created or accepted by this SPI.
+     * <p>
+     * If not provided, default is {@code 0} which leaves the buffer unchanged
+     * after socket creation (OS defaults).
+     *
+     * @param sockRcvBuf Socket receive buffer size.
+     */
+    public void setSocketReceiveBuffer(int sockRcvBuf) {
+        this.sockRcvBuf = sockRcvBuf;
+    }
+
+    /**
+     * @return Socket receive buffer size.
+     */
+    public int getSocketReceiveBuffer() {
+        return sockRcvBuf;
+    }
+
+    /**
+     * Sets send buffer size for sockets created or accepted by this SPI.
+     * <p>
+     * If not provided, default is {@code 0} which leaves the buffer unchanged
+     * after socket creation (OS defaults).
+     *
+     * @param sockSndBuf Socket send buffer size.
+     */
+    public void setSocketSendBuffer(int sockSndBuf) {
+        this.sockSndBuf = sockSndBuf;
+    }
+
+    /**
+     * @return Socket send buffer size.
+     */
+    public int getSocketSendBuffer() {
+        return sockSndBuf;
+    }
+
+    /**
+     * Sets message queue limit for incoming and outgoing messages.
+     * <p>
+     * When set to a positive number, the send queue is limited to the configured value.
+     * {@code 0} disables the size limitation.
+     * <p>
+     * If not provided, default is {@link #DFLT_MSG_QUEUE_LIMIT}.
+     *
+     * @param msgQueueLimit Send queue size limit.
+     */
+    public void setMessageQueueLimit(int msgQueueLimit) {
+        this.msgQueueLimit = msgQueueLimit;
+    }
+
+    /**
+     * @return Message queue size limit.
+     */
+    public int getMessageQueueLimit() {
+        return msgQueueLimit;
+    }
+
+    /**
+     * Sets Hadoop communication message listener.
+     *
+     * @param lsnr Message listener.
+     */
+    public void setListener(HadoopMessageListener lsnr) {
+        this.lsnr = lsnr;
+    }
+
+    /**
+     * @return Outbound message queue size.
+     */
+    public int getOutboundMessagesQueueSize() {
+        return nioSrvr.outboundMessagesQueueSize();
+    }
+
+    /**
+     * Starts communication.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    public void start() throws IgniteCheckedException {
+        try {
+            locHost = U.getLocalHost();
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException("Failed to initialize local 
address.", e);
+        }
+
+        try {
+            shmemSrv = resetShmemServer();
+        }
+        catch (IgniteCheckedException e) {
+            U.warn(log, "Failed to start shared memory communication server.", 
e);
+        }
+
+        try {
+            // This method potentially resets local port to the value
+            // local node was bound to.
+            nioSrvr = resetNioServer();
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteCheckedException("Failed to initialize TCP server: 
" + locHost, e);
+        }
+
+        locProcDesc.address(locHost.getHostAddress());
+        locProcDesc.sharedMemoryPort(boundTcpShmemPort);
+        locProcDesc.tcpPort(boundTcpPort);
+
+        locIdMsg = new ProcessHandshakeMessage(locProcDesc);
+
+        if (shmemSrv != null) {
+            shmemAcceptWorker = new ShmemAcceptWorker(shmemSrv);
+
+            new IgniteThread(shmemAcceptWorker).start();
+        }
+
+        nioSrvr.start();
+    }
+
+    /**
+     * Gets local process descriptor.
+     *
+     * @return Local process descriptor.
+     */
+    public HadoopProcessDescriptor localProcessDescriptor() {
+        return locProcDesc;
+    }
+
+    /**
+     * Gets filters used by communication.
+     *
+     * @return Filters array.
+     */
+    private GridNioFilter[] filters() {
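+        // Note: the same filter chain is used both for the TCP NIO server and for shared memory workers.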
+        return new GridNioFilter[] {
+            new GridNioAsyncNotifyFilter(gridName, execSvc, log),
+            new HandshakeAndBackpressureFilter(),
+            new HadoopMarshallerFilter(marsh),
+            new GridNioCodecFilter(new GridBufferedParser(directBuf, 
ByteOrder.nativeOrder()), log, false)
+        };
+    }
+
+    /**
+     * Recreates the TCP server socket instance.
+     *
+     * @return Server instance.
+     * @throws IgniteCheckedException Thrown if it's not possible to create 
server.
+     */
+    private GridNioServer<HadoopMessage> resetNioServer() throws 
IgniteCheckedException {
+        if (boundTcpPort >= 0)
+            throw new IgniteCheckedException("Tcp NIO server was already 
created on port " + boundTcpPort);
+
+        IgniteCheckedException lastEx = null;
+
+        // If configured TCP port is busy, find first available in range.
+        for (int port = locPort; port < locPort + locPortRange; port++) {
+            try {
+                GridNioServer<HadoopMessage> srvr =
+                    GridNioServer.<HadoopMessage>builder()
+                        .address(locHost)
+                        .port(port)
+                        .listener(srvLsnr)
+                        .logger(log.getLogger(GridNioServer.class))
+                        .selectorCount(selectorsCnt)
+                        .gridName(gridName)
+                        .tcpNoDelay(tcpNoDelay)
+                        .directBuffer(directBuf)
+                        .byteOrder(ByteOrder.nativeOrder())
+                        .socketSendBufferSize(sockSndBuf)
+                        .socketReceiveBufferSize(sockRcvBuf)
+                        .sendQueueLimit(msgQueueLimit)
+                        .directMode(false)
+                        .filters(filters())
+                        .build();
+
+                boundTcpPort = port;
+
+                // Ack Port the TCP server was bound to.
+                if (log.isInfoEnabled())
+                    log.info("Successfully bound to TCP port [port=" + 
boundTcpPort +
+                        ", locHost=" + locHost + ']');
+
+                return srvr;
+            }
+            catch (IgniteCheckedException e) {
+                lastEx = e;
+
+                if (log.isDebugEnabled())
+                    log.debug("Failed to bind to local port (will try next 
port within range) [port=" + port +
+                        ", locHost=" + locHost + ']');
+            }
+        }
+
+        // If free port wasn't found.
+        throw new IgniteCheckedException("Failed to bind to any port within 
range [startPort=" + locPort +
+            ", portRange=" + locPortRange + ", locHost=" + locHost + ']', 
lastEx);
+    }
+
+    /**
+     * Creates new shared memory communication server.
+     * @return Server.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable private IpcSharedMemoryServerEndpoint resetShmemServer() throws 
IgniteCheckedException {
+        if (boundTcpShmemPort >= 0)
+            throw new IgniteCheckedException("Shared memory server was already 
created on port " + boundTcpShmemPort);
+
+        if (shmemPort == -1 || U.isWindows())
+            return null;
+
+        IgniteCheckedException lastEx = null;
+
+        // If configured TCP port is busy, find first available in range.
+        for (int port = shmemPort; port < shmemPort + locPortRange; port++) {
+            try {
+                IpcSharedMemoryServerEndpoint srv = new 
IpcSharedMemoryServerEndpoint(
+                    log.getLogger(IpcSharedMemoryServerEndpoint.class),
+                    locProcDesc.processId(), gridName);
+
+                srv.setPort(port);
+
+                srv.omitOutOfResourcesWarning(true);
+
+                srv.start();
+
+                boundTcpShmemPort = port;
+
+                // Ack Port the TCP server was bound to.
+                if (log.isInfoEnabled())
+                    log.info("Successfully bound shared memory communication 
to TCP port [port=" + boundTcpShmemPort +
+                        ", locHost=" + locHost + ']');
+
+                return srv;
+            }
+            catch (IgniteCheckedException e) {
+                lastEx = e;
+
+                if (log.isDebugEnabled())
+                    log.debug("Failed to bind to local port (will try next 
port within range) [port=" + port +
+                        ", locHost=" + locHost + ']');
+            }
+        }
+
+        // If free port wasn't found.
+        throw new IgniteCheckedException("Failed to bind shared memory 
communication to any port within range [startPort=" +
+            locPort + ", portRange=" + locPortRange + ", locHost=" + locHost + 
']', lastEx);
+    }
+
+    /**
+     * Stops the server.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    public void stop() throws IgniteCheckedException {
+        // Stop TCP server.
+        if (nioSrvr != null)
+            nioSrvr.stop();
+
+        U.cancel(shmemAcceptWorker);
+        U.join(shmemAcceptWorker, log);
+
+        U.cancel(shmemWorkers);
+        U.join(shmemWorkers, log);
+
+        shmemWorkers.clear();
+
+        // Force closing on stop (safety).
+        for (HadoopCommunicationClient client : clients.values())
+            client.forceClose();
+
+        // Clear resources.
+        nioSrvr = null;
+
+        boundTcpPort = -1;
+    }
+
+    /**
+     * Sends message to Hadoop process.
+     *
+     * @param desc Target process descriptor.
+     * @param msg Message to send.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void sendMessage(HadoopProcessDescriptor desc, HadoopMessage msg) 
throws
+        IgniteCheckedException {
+        assert desc != null;
+        assert msg != null;
+
+        if (log.isTraceEnabled())
+            log.trace("Sending message to Hadoop process [desc=" + desc + ", 
msg=" + msg + ']');
+
+        HadoopCommunicationClient client = null;
+
+        boolean closeOnRelease = true;
+
+        try {
+            client = reserveClient(desc);
+
+            client.sendMessage(desc, msg);
+
+            closeOnRelease = false;
+        }
+        finally {
+            if (client != null) {
+                if (closeOnRelease) {
+                    client.forceClose();
+
+                    clients.remove(desc.processId(), client);
+                }
+                else
+                    client.release();
+            }
+        }
+    }
+
+    /**
+     * Returns existing or just created client to node.
+     *
+     * @param desc Descriptor of the process to which the client should be opened.
+     * @return The existing or just created client.
+     * @throws IgniteCheckedException Thrown if any exception occurs.
+     */
+    private HadoopCommunicationClient reserveClient(HadoopProcessDescriptor 
desc) throws IgniteCheckedException {
+        assert desc != null;
+
+        UUID procId = desc.processId();
+
+        while (true) {
+            HadoopCommunicationClient client = clients.get(procId);
+
+            if (client == null) {
+                if (log.isDebugEnabled())
+                    log.debug("Did not find client for remote process 
[locProcDesc=" + locProcDesc + ", desc=" +
+                        desc + ']');
+
+                // Do not allow concurrent connects.
+                Object sync = locks.lock(procId);
+
+                try {
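+                    // Re-check under the lock: a concurrent thread may have already created the client.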
+                    client = clients.get(procId);
+
+                    if (client == null) {
+                        HadoopCommunicationClient old = clients.put(procId, 
client = createNioClient(desc));
+
+                        assert old == null;
+                    }
+                }
+                finally {
+                    locks.unlock(procId, sync);
+                }
+
+                assert client != null;
+            }
+
+            if (client.reserve())
+                return client;
+            else
+                // Client has just been closed by idle worker. Help it and try 
again.
+                clients.remove(procId, client);
+        }
+    }
+
+    /**
+     * @param desc Process descriptor.
+     * @return Client.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable protected HadoopCommunicationClient 
createNioClient(HadoopProcessDescriptor desc)
+        throws IgniteCheckedException {
+        assert desc != null;
+
+        int shmemPort = desc.sharedMemoryPort();
+
+        // If the remote process has shared memory server enabled and was started by the
+        // same parent node, both processes run on the same host and shared memory
+        // communication can be tried.
+        if (shmemPort != -1 && 
locProcDesc.parentNodeId().equals(desc.parentNodeId())) {
+            try {
+                return createShmemClient(desc, shmemPort);
+            }
+            catch (IgniteCheckedException e) {
+                if (e.hasCause(IpcOutOfSystemResourcesException.class))
+                    // Has cause or is itself the 
IpcOutOfSystemResourcesException.
+                    LT.warn(log, null, OUT_OF_RESOURCES_TCP_MSG);
+                else if (log.isDebugEnabled())
+                    log.debug("Failed to establish shared memory connection 
with local hadoop process: " +
+                        desc);
+            }
+        }
+
+        return createTcpClient(desc);
+    }
+
+    /**
+     * @param desc Process descriptor.
+     * @param port Port.
+     * @return Client.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable protected HadoopCommunicationClient 
createShmemClient(HadoopProcessDescriptor desc, int port)
+        throws IgniteCheckedException {
+        int attempt = 1;
+
+        int connectAttempts = 1;
+
+        long connTimeout0 = connTimeout;
+
+        while (true) {
+            IpcEndpoint clientEndpoint;
+
+            try {
+                clientEndpoint = new IpcSharedMemoryClientEndpoint(port, 
(int)connTimeout, log);
+            }
+            catch (IgniteCheckedException e) {
+                // Reconnect for the second time, if connection is not 
established.
+                if (connectAttempts < 2 && X.hasCause(e, 
ConnectException.class)) {
+                    connectAttempts++;
+
+                    continue;
+                }
+
+                throw e;
+            }
+
+            HadoopCommunicationClient client = null;
+
+            try {
+                ShmemWorker worker = new ShmemWorker(clientEndpoint, false);
+
+                shmemWorkers.add(worker);
+
+                GridNioSession ses = worker.session();
+
+                HandshakeFinish fin = new HandshakeFinish();
+
+                // We hold the lock, so it is safe to get the session and attach meta.
+                ses.addMeta(HANDSHAKE_FINISH_META, fin);
+
+                client = new HadoopTcpNioCommunicationClient(ses);
+
+                new IgniteThread(worker).start();
+
+                fin.await(connTimeout0);
+            }
+            catch (HadoopHandshakeTimeoutException e) {
+                if (log.isDebugEnabled())
+                    log.debug("Handshake timed out (will retry with increased 
timeout) [timeout=" + connTimeout0 +
+                        ", err=" + e.getMessage() + ", client=" + client + 
']');
+
+                if (client != null)
+                    client.forceClose();
+
+                if (attempt == reconCnt || connTimeout0 > maxConnTimeout) {
+                    if (log.isDebugEnabled())
+                        log.debug("Handshake timedout (will stop attempts to 
perform the handshake) " +
+                            "[timeout=" + connTimeout0 + ", maxConnTimeout=" + 
maxConnTimeout +
+                            ", attempt=" + attempt + ", reconCnt=" + reconCnt +
+                            ", err=" + e.getMessage() + ", client=" + client + 
']');
+
+                    throw e;
+                }
+                else {
+                    attempt++;
+
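+                    // Exponential backoff: double the handshake timeout for the next attempt.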
+                    connTimeout0 *= 2;
+
+                    continue;
+                }
+            }
+            catch (RuntimeException | Error e) {
+                if (log.isDebugEnabled())
+                    log.debug(
+                        "Caught exception (will close client) [err=" + 
e.getMessage() + ", client=" + client + ']');
+
+                if (client != null)
+                    client.forceClose();
+
+                throw e;
+            }
+
+            return client;
+        }
+    }
+
+    /**
+     * Establishes a TCP connection to the remote Hadoop process and returns a client.
+     *
+     * @param desc Process descriptor.
+     * @return Client.
+     * @throws IgniteCheckedException If failed.
+     */
+    protected HadoopCommunicationClient 
createTcpClient(HadoopProcessDescriptor desc) throws IgniteCheckedException {
+        String addr = desc.address();
+
+        int port = desc.tcpPort();
+
+        if (log.isDebugEnabled())
+            log.debug("Trying to connect to remote process [locProcDesc=" + 
locProcDesc + ", desc=" + desc + ']');
+
+        boolean conn = false;
+        HadoopTcpNioCommunicationClient client = null;
+        IgniteCheckedException errs = null;
+
+        int connectAttempts = 1;
+
+        long connTimeout0 = connTimeout;
+
+        int attempt = 1;
+
+        while (!conn) { // Reconnection on handshake timeout.
+            try {
+                SocketChannel ch = SocketChannel.open();
+
+                ch.configureBlocking(true);
+
+                ch.socket().setTcpNoDelay(tcpNoDelay);
+                ch.socket().setKeepAlive(true);
+
+                if (sockRcvBuf > 0)
+                    ch.socket().setReceiveBufferSize(sockRcvBuf);
+
+                if (sockSndBuf > 0)
+                    ch.socket().setSendBufferSize(sockSndBuf);
+
+                ch.socket().connect(new InetSocketAddress(addr, port), 
(int)connTimeout);
+
+                HandshakeFinish fin = new HandshakeFinish();
+
+                GridNioSession ses = nioSrvr.createSession(ch, 
F.asMap(HANDSHAKE_FINISH_META, fin)).get();
+
+                client = new HadoopTcpNioCommunicationClient(ses);
+
+                if (log.isDebugEnabled())
+                    log.debug("Waiting for handshake finish for client: " + 
client);
+
+                fin.await(connTimeout0);
+
+                conn = true;
+            }
+            catch (HadoopHandshakeTimeoutException e) {
+                if (client != null) {
+                    client.forceClose();
+
+                    client = null;
+                }
+
+                if (log.isDebugEnabled())
+                    log.debug(
+                        "Handshake timedout (will retry with increased 
timeout) [timeout=" + connTimeout0 +
+                            ", desc=" + desc + ", port=" + port + ", err=" + e 
+ ']');
+
+                if (attempt == reconCnt || connTimeout0 > maxConnTimeout) {
+                    if (log.isDebugEnabled())
+                        log.debug("Handshake timed out (will stop attempts to 
perform the handshake) " +
+                            "[timeout=" + connTimeout0 + ", maxConnTimeout=" + 
maxConnTimeout +
+                            ", attempt=" + attempt + ", reconCnt=" + reconCnt +
+                            ", err=" + e.getMessage() + ", addr=" + addr + 
']');
+
+                    if (errs == null)
+                        errs = new IgniteCheckedException("Failed to connect 
to remote Hadoop process " +
+                            "(is process still running?) [desc=" + desc + ", 
addrs=" + addr + ']');
+
+                    errs.addSuppressed(e);
+
+                    break;
+                }
+                else {
+                    attempt++;
+
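+                    // Exponential backoff: double the handshake timeout for the next attempt.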
+                    connTimeout0 *= 2;
+
+                    // Continue loop.
+                }
+            }
+            catch (Exception e) {
+                if (client != null) {
+                    client.forceClose();
+
+                    client = null;
+                }
+
+                if (log.isDebugEnabled())
+                    log.debug("Client creation failed [addr=" + addr + ", 
port=" + port +
+                        ", err=" + e + ']');
+
+                if (X.hasCause(e, SocketTimeoutException.class))
+                    LT.warn(log, null, "Connect timed out (consider increasing 
'connTimeout' " +
+                        "configuration property) [addr=" + addr + ", port=" + 
port + ']');
+
+                if (errs == null)
+                    errs = new IgniteCheckedException("Failed to connect to 
remote Hadoop process (is process still running?) " +
+                        "[desc=" + desc + ", addrs=" + addr + ']');
+
+                errs.addSuppressed(e);
+
+                // Reconnect for the second time, if connection is not 
established.
+                if (connectAttempts < 2 &&
+                    (e instanceof ConnectException || X.hasCause(e, 
ConnectException.class))) {
+                    connectAttempts++;
+
+                    continue;
+                }
+
+                break;
+            }
+        }
+
+        if (client == null) {
+            assert errs != null;
+
+            if (X.hasCause(errs, ConnectException.class))
+                LT.warn(log, null, "Failed to connect to a remote Hadoop 
process (is process still running?). " +
+                    "Make sure operating system firewall is disabled on local 
and remote host) " +
+                    "[addrs=" + addr + ", port=" + port + ']');
+
+            throw errs;
+        }
+
+        if (log.isDebugEnabled())
+            log.debug("Created client: " + client);
+
+        return client;
+    }
+
+    /**
+     * @param desc Sender process descriptor.
+     * @param msg Communication message.
+     */
+    protected void notifyListener(HadoopProcessDescriptor desc, HadoopMessage 
msg) {
+        HadoopMessageListener lsnr = this.lsnr;
+
+        if (lsnr != null)
+            // Notify listener of a new message.
+            lsnr.onMessageReceived(desc, msg);
+        else if (log.isDebugEnabled())
+            log.debug("Received communication message without any registered 
listeners (will ignore) " +
+                "[senderProcDesc=" + desc + ", msg=" + msg + ']');
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopExternalCommunication.class, this);
+    }
+
+    /**
+     * This worker takes responsibility for shutting the server down when stopping.
+     * No other thread shall stop the passed server.
+     */
+    private class ShmemAcceptWorker extends GridWorker {
+        /** */
+        private final IpcSharedMemoryServerEndpoint srv;
+
+        /**
+         * @param srv Server.
+         */
+        ShmemAcceptWorker(IpcSharedMemoryServerEndpoint srv) {
+            super(gridName, "shmem-communication-acceptor", 
HadoopExternalCommunication.this.log);
+
+            this.srv = srv;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException {
+            try {
+                while (!Thread.interrupted()) {
+                    ShmemWorker e = new ShmemWorker(srv.accept(), true);
+
+                    shmemWorkers.add(e);
+
+                    new IgniteThread(e).start();
+                }
+            }
+            catch (IgniteCheckedException e) {
+                if (!isCancelled())
+                    U.error(log, "Shmem server failed.", e);
+            }
+            finally {
+                srv.close();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void cancel() {
+            super.cancel();
+
+            srv.close();
+        }
+    }
+
+    /**
+     * Worker that serves a single shared memory endpoint.
+     */
+    private class ShmemWorker extends GridWorker {
+        /** */
+        private final IpcEndpoint endpoint;
+
+        /** Adapter. */
+        private HadoopIpcToNioAdapter<HadoopMessage> adapter;
+
+        /**
+         * @param endpoint Endpoint.
+         */
+        private ShmemWorker(IpcEndpoint endpoint, boolean accepted) {
+            super(gridName, "shmem-worker", 
HadoopExternalCommunication.this.log);
+
+            this.endpoint = endpoint;
+
+            adapter = new HadoopIpcToNioAdapter<>(
+                HadoopExternalCommunication.this.log,
+                endpoint,
+                accepted,
+                srvLsnr,
+                filters());
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException {
+            try {
+                adapter.serve();
+            }
+            finally {
+                shmemWorkers.remove(this);
+
+                endpoint.close();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void cancel() {
+            super.cancel();
+
+            endpoint.close();
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void cleanup() {
+            super.cleanup();
+
+            endpoint.close();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ShmemWorker.class, this);
+        }
+
+        /**
+         * @return NIO session for this worker.
+         */
+        public GridNioSession session() {
+            return adapter.session();
+        }
+    }
+
+    /**
+     * Handshake completion latch.
+     */
+    private static class HandshakeFinish {
+        /** Await latch. */
+        private CountDownLatch latch = new CountDownLatch(1);
+
+        /**
+         * Finishes handshake.
+         */
+        public void finish() {
+            latch.countDown();
+        }
+
+        /**
+         * @param time Time to wait.
+         * @throws HadoopHandshakeTimeoutException If failed to wait.
+         */
+        public void await(long time) throws HadoopHandshakeTimeoutException {
+            try {
+                if (!latch.await(time, TimeUnit.MILLISECONDS))
+                    throw new HadoopHandshakeTimeoutException("Failed to wait 
for handshake to finish [timeout=" +
+                        time + ']');
+            }
+            catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+
+                throw new HadoopHandshakeTimeoutException("Failed to wait for 
handshake to finish (thread was " +
+                    "interrupted) [timeout=" + time + ']', e);
+            }
+        }
+    }
+
+    /**
+     * Filter that performs the initial handshake and applies message-queue backpressure.
+     */
+    private class HandshakeAndBackpressureFilter extends GridNioFilterAdapter {
+        /**
+         * Assigns filter name to a filter.
+         */
+        protected HandshakeAndBackpressureFilter() {
+            super("HadoopHandshakeFilter");
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onSessionOpened(final GridNioSession ses) throws 
IgniteCheckedException {
+            if (ses.accepted()) {
+                if (log.isDebugEnabled())
+                    log.debug("Accepted connection, initiating handshake: " + 
ses);
+
+                // Server initiates handshake.
+                ses.send(locIdMsg).listen(new CI1<IgniteInternalFuture<?>>() {
+                    @Override public void apply(IgniteInternalFuture<?> fut) {
+                        try {
+                            // Make sure there were no errors.
+                            fut.get();
+                        }
+                        catch (IgniteCheckedException e) {
+                            log.warning("Failed to send handshake message, 
will close session: " + ses, e);
+
+                            ses.close();
+                        }
+                    }
+                });
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onSessionClosed(GridNioSession ses) throws 
IgniteCheckedException {
+            proceedSessionClosed(ses);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onExceptionCaught(GridNioSession ses, 
IgniteCheckedException ex) throws IgniteCheckedException {
+            proceedExceptionCaught(ses, ex);
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, 
Object msg) throws IgniteCheckedException {
+            if (ses.meta(PROCESS_META) == null && !(msg instanceof 
ProcessHandshakeMessage))
+                log.warning("Writing message before handshake has finished 
[ses=" + ses + ", msg=" + msg + ']');
+
+            return proceedSessionWrite(ses, msg);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMessageReceived(GridNioSession ses, Object 
msg) throws IgniteCheckedException {
+            HadoopProcessDescriptor desc = ses.meta(PROCESS_META);
+
+            UUID rmtProcId = desc == null ? null : desc.processId();
+
+            if (rmtProcId == null) {
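+                // Handshake is not finished yet: the first message on a session must be a handshake.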
+                if (!(msg instanceof ProcessHandshakeMessage)) {
+                    log.warning("Invalid handshake message received, will 
close connection [ses=" + ses +
+                        ", msg=" + msg + ']');
+
+                    ses.close();
+
+                    return;
+                }
+
+                ProcessHandshakeMessage nId = (ProcessHandshakeMessage)msg;
+
+                if (log.isDebugEnabled())
+                    log.debug("Received handshake message [ses=" + ses + ", 
msg=" + msg + ']');
+
+                ses.addMeta(PROCESS_META, nId.processDescriptor());
+
+                if (!ses.accepted())
+                    // Send handshake reply.
+                    ses.send(locIdMsg);
+                else {
+                    rmtProcId = nId.processDescriptor().processId();
+
+                    if (log.isDebugEnabled())
+                        log.debug("Finished handshake with remote client: " + 
ses);
+
+                    Object sync = locks.tryLock(rmtProcId);
+
+                    if (sync != null) {
+                        try {
+                            if (clients.get(rmtProcId) == null) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Will reuse session for 
descriptor: " + rmtProcId);
+
+                                // Handshake finished flag is true.
+                                clients.put(rmtProcId, new 
HadoopTcpNioCommunicationClient(ses));
+                            }
+                            else {
+                                if (log.isDebugEnabled())
+                                    log.debug("Will not reuse client as 
another already exists [locProcDesc=" +
+                                        locProcDesc + ", desc=" + desc + ']');
+                            }
+                        }
+                        finally {
+                            locks.unlock(rmtProcId, sync);
+                        }
+                    }
+                    else {
+                        if (log.isDebugEnabled())
+                            log.debug("Concurrent connection is being 
established, will not reuse client session [" +
+                                "locProcDesc=" + locProcDesc + ", desc=" + 
desc + ']');
+                    }
+                }
+
+                if (log.isDebugEnabled())
+                    log.debug("Handshake is finished for session [ses=" + ses 
+ ", locProcDesc=" + locProcDesc + ']');
+
+                HandshakeFinish to = ses.meta(HANDSHAKE_FINISH_META);
+
+                if (to != null)
+                    to.finish();
+
+                // Notify session opened (both parties).
+                proceedSessionOpened(ses);
+            }
+            else {
+                if (msgQueueLimit > 0) {
+                    GridNioMessageTracker tracker = ses.meta(TRACKER_META);
+
+                    if (tracker == null) {
+                        GridNioMessageTracker old = ses.addMeta(TRACKER_META, 
tracker =
+                            new GridNioMessageTracker(ses, msgQueueLimit));
+
+                        assert old == null;
+                    }
+
+                    tracker.onMessageReceived();
+                }
+
+                proceedMessageReceived(ses, msg);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridNioFuture<Boolean> onSessionClose(GridNioSession 
ses) throws IgniteCheckedException {
+            return proceedSessionClose(ses);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onSessionIdleTimeout(GridNioSession ses) throws 
IgniteCheckedException {
+            proceedSessionIdleTimeout(ses);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onSessionWriteTimeout(GridNioSession ses) throws 
IgniteCheckedException {
+            proceedSessionWriteTimeout(ses);
+        }
+    }
+
+    /**
+     * Process handshake message.
+     */
+    @SuppressWarnings("PublicInnerClass")
+    public static class ProcessHandshakeMessage implements HadoopMessage {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Process descriptor. */
+        private HadoopProcessDescriptor procDesc;
+
+        /** */
+        public ProcessHandshakeMessage() {
+            // No-op.
+        }
+
+        /**
+         * @param procDesc Process descriptor.
+         */
+        private ProcessHandshakeMessage(HadoopProcessDescriptor procDesc) {
+            this.procDesc = procDesc;
+        }
+
+        /**
+         * @return Process descriptor.
+         */
+        public HadoopProcessDescriptor processDescriptor() {
+            return procDesc;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
+            out.writeObject(procDesc);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+            procDesc = (HadoopProcessDescriptor)in.readObject();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ProcessHandshakeMessage.class, this);
+        }
+    }
+}
\ No newline at end of file
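
A minimal usage sketch for the class added above (not part of the commit; the
JdkMarshaller/JavaLogger wiring and the fixed thread pool are just one possible
setup, and the remote descriptor would normally be obtained from the parent
node). The listener callbacks are the two referenced in the file above:

    import java.util.UUID;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
    import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessDescriptor;
    import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.HadoopExternalCommunication;
    import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.HadoopMessageListener;
    import org.apache.ignite.logger.java.JavaLogger;
    import org.apache.ignite.marshaller.jdk.JdkMarshaller;

    public class CommunicationSketch {
        public static void main(String[] args) throws Exception {
            ExecutorService execSvc = Executors.newFixedThreadPool(2);

            HadoopExternalCommunication comm = new HadoopExternalCommunication(
                UUID.randomUUID(),    // parent node ID
                UUID.randomUUID(),    // this process ID
                new JdkMarshaller(),  // any Marshaller implementation
                new JavaLogger(),
                execSvc,
                "sketch-grid");

            comm.setListener(new HadoopMessageListener() {
                @Override public void onMessageReceived(HadoopProcessDescriptor desc, HadoopMessage msg) {
                    System.out.println("Received from " + desc.processId() + ": " + msg);
                }

                @Override public void onConnectionLost(HadoopProcessDescriptor desc) {
                    System.out.println("Connection lost: " + desc);
                }
            });

            // Binds TCP (and, if configured, shared memory) and fills the local descriptor.
            comm.start();

            // comm.sendMessage(remoteDesc, msg); // remoteDesc normally comes from the parent node.

            comm.stop();

            execSvc.shutdown();
        }
    }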

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopHandshakeTimeoutException.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopHandshakeTimeoutException.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopHandshakeTimeoutException.java
new file mode 100644
index 0000000..b2a85e1
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopHandshakeTimeoutException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.jetbrains.annotations.Nullable;
+
+/** Internal exception class for proper timeout handling. */
+class HadoopHandshakeTimeoutException extends IgniteCheckedException {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * @param msg Message.
+     */
+    HadoopHandshakeTimeoutException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * @param msg Message.
+     * @param cause Cause.
+     */
+    HadoopHandshakeTimeoutException(String msg, @Nullable Throwable cause) {
+        super(msg, cause);
+    }
+}
\ No newline at end of file
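
The exception above is thrown by HandshakeFinish.await() and lets
createTcpClient()/createShmemClient() in HadoopExternalCommunication tell a
handshake timeout (retry with a doubled timeout) apart from other failures
(rethrow). In isolation, the retry schedule those methods implement looks
roughly like this sketch (handshake() is a hypothetical stand-in for
connect-plus-await):

    long timeout = connTimeout;        // starts at DFLT_CONN_TIMEOUT (1 000 ms)

    for (int attempt = 1; ; attempt++) {
        try {
            handshake(timeout);        // connect and wait for HandshakeFinish

            break;
        }
        catch (HadoopHandshakeTimeoutException e) {
            if (attempt == reconCnt || timeout > maxConnTimeout)
                throw e;               // give up after reconCnt tries or once past the cap

            timeout *= 2;              // exponential backoff
        }
    }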

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopIpcToNioAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopIpcToNioAdapter.java
 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopIpcToNioAdapter.java
new file mode 100644
index 0000000..a8de999
--- /dev/null
+++ 
b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopIpcToNioAdapter.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.ipc.IpcEndpoint;
+import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioFilterAdapter;
+import org.apache.ignite.internal.util.nio.GridNioFilterChain;
+import org.apache.ignite.internal.util.nio.GridNioFinishedFuture;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
+import org.apache.ignite.internal.util.nio.GridNioServerListener;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.nio.GridNioSessionImpl;
+
+/**
+ * Allows re-using existing {@link GridNioFilter}s on IPC (specifically shared memory IPC)
+ * communications.
+ *
+ * Note that this class consumes an entire thread inside the {@link #serve()} method
+ * in order to serve one {@link 
org.apache.ignite.internal.util.ipc.IpcEndpoint}.
+ */
+public class HadoopIpcToNioAdapter<T> {
+    /** */
+    private final IpcEndpoint endp;
+
+    /** */
+    private final GridNioFilterChain<T> chain;
+
+    /** */
+    private final GridNioSessionImpl ses;
+
+    /** */
+    private final AtomicReference<CountDownLatch> latchRef = new 
AtomicReference<>();
+
+    /** */
+    private final ByteBuffer writeBuf;
+
+    /**
+     * @param log Log.
+     * @param endp Endpoint.
+     * @param accepted {@code True} if the endpoint was accepted on the server side.
+     * @param lsnr Listener.
+     * @param filters Filters.
+     */
+    public HadoopIpcToNioAdapter(IgniteLogger log, IpcEndpoint endp, boolean accepted,
+        GridNioServerListener<T> lsnr, GridNioFilter... filters) {
+        this.endp = endp;
+
+        chain = new GridNioFilterChain<>(log, lsnr, new HeadFilter(), filters);
+        ses = new GridNioSessionImpl(chain, null, null, accepted);
+
+        writeBuf = ByteBuffer.allocate(8 << 10);
+
+        writeBuf.order(ByteOrder.nativeOrder());
+    }
+
+    /**
+     * Serves the configured listener chain by repeatedly reading data from the endpoint.
+     *
+     * @throws InterruptedException If interrupted.
+     */
+    public void serve() throws InterruptedException {
+        try {
+            chain.onSessionOpened(ses);
+
+            InputStream in = endp.inputStream();
+
+            ByteBuffer readBuf = ByteBuffer.allocate(8 << 10);
+
+            readBuf.order(ByteOrder.nativeOrder());
+
+            assert readBuf.hasArray();
+
+            while (!Thread.interrupted()) {
+                int pos = readBuf.position();
+
+                int read = in.read(readBuf.array(), pos, readBuf.remaining());
+
+                if (read > 0) {
+                    readBuf.position(0);
+                    readBuf.limit(pos + read);
+
+                    chain.onMessageReceived(ses, readBuf);
+
+                    if (readBuf.hasRemaining())
+                        readBuf.compact();
+                    else
+                        readBuf.clear();
+
+                    CountDownLatch latch = latchRef.get();
+
+                    if (latch != null)
+                        latch.await();
+                }
+                else if (read < 0) {
+                    endp.close();
+
+                    break; // And close below.
+                }
+            }
+
+            // Assuming remote end closed connection - pushing event from head to tail.
+            chain.onSessionClosed(ses);
+        }
+        catch (Exception e) {
+            chain.onExceptionCaught(ses, new IgniteCheckedException("Failed to read from IPC endpoint.", e));
+        }
+    }
+
+    /**
+     * Gets dummy session for this adapter.
+     *
+     * @return Session.
+     */
+    public GridNioSession session() {
+        return ses;
+    }
+
+    /**
+     * Handles write events on chain.
+     *
+     * @param msg Buffer to send.
+     * @return Send result.
+     */
+    private GridNioFuture<?> send(ByteBuffer msg) {
+        assert writeBuf.hasArray();
+
+        try {
+            while (msg.hasRemaining()) {
+                writeBuf.clear();
+
+                // Copy no more than the buffer capacity per iteration; an unbounded
+                // put(msg) would overflow writeBuf for messages larger than 8K.
+                int oldLimit = msg.limit();
+
+                msg.limit(msg.position() + Math.min(msg.remaining(), writeBuf.capacity()));
+
+                writeBuf.put(msg);
+
+                msg.limit(oldLimit);
+
+                endp.outputStream().write(writeBuf.array(), 0, writeBuf.position());
+            }
+        }
+        catch (IOException | IgniteCheckedException e) {
+            return new GridNioFinishedFuture<Object>(e);
+        }
+
+        return new GridNioFinishedFuture<>((Object)null);
+    }
+
+    /**
+     * Filter forwarding messages from chain's head to this server.
+     */
+    private class HeadFilter extends GridNioFilterAdapter {
+        /**
+         * Assigns filter name.
+         */
+        protected HeadFilter() {
+            super("HeadFilter");
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onSessionOpened(GridNioSession ses) throws IgniteCheckedException {
+            proceedSessionOpened(ses);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onSessionClosed(GridNioSession ses) throws IgniteCheckedException {
+            proceedSessionClosed(ses);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onExceptionCaught(GridNioSession ses, IgniteCheckedException ex) throws IgniteCheckedException {
+            proceedExceptionCaught(ses, ex);
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) {
+            assert ses == HadoopIpcToNioAdapter.this.ses : "ses=" + ses +
+                ", this.ses=" + HadoopIpcToNioAdapter.this.ses;
+
+            return send((ByteBuffer)msg);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMessageReceived(GridNioSession ses, Object msg) throws IgniteCheckedException {
+            proceedMessageReceived(ses, msg);
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridNioFuture<?> onPauseReads(GridNioSession ses) throws IgniteCheckedException {
+            // This call should be synced externally to avoid races.
+            boolean b = latchRef.compareAndSet(null, new CountDownLatch(1));
+
+            assert b;
+
+            return new GridNioFinishedFuture<>(b);
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridNioFuture<?> onResumeReads(GridNioSession ses) throws IgniteCheckedException {
+            // This call should be synced externally to avoid races.
+            CountDownLatch latch = latchRef.getAndSet(null);
+
+            if (latch != null)
+                latch.countDown();
+
+            return new GridNioFinishedFuture<Object>(latch != null);
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridNioFuture<Boolean> onSessionClose(GridNioSession ses) {
+            assert ses == HadoopIpcToNioAdapter.this.ses;
+
+            boolean closed = HadoopIpcToNioAdapter.this.ses.setClosed();
+
+            if (closed)
+                endp.close();
+
+            return new GridNioFinishedFuture<>(closed);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onSessionIdleTimeout(GridNioSession ses) throws IgniteCheckedException {
+            proceedSessionIdleTimeout(ses);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onSessionWriteTimeout(GridNioSession ses) throws IgniteCheckedException {
+            proceedSessionWriteTimeout(ses);
+        }
+    }
+}
\ No newline at end of file
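
A minimal usage sketch for the adapter above (not part of the commit): the helper class, the
thread name, and the caller-supplied logger, endpoint, listener and marshaller are illustrative
assumptions. Since serve() blocks until the endpoint closes, it gets a dedicated thread.

    import org.apache.ignite.IgniteLogger;
    import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
    import org.apache.ignite.internal.util.ipc.IpcEndpoint;
    import org.apache.ignite.internal.util.nio.GridNioServerListener;
    import org.apache.ignite.marshaller.Marshaller;

    public final class IpcServeExample {
        /** Starts a dedicated thread pumping one IPC endpoint through the NIO filter chain. */
        public static Thread startServing(IgniteLogger log, IpcEndpoint endp, boolean accepted,
            GridNioServerListener<HadoopMessage> lsnr, Marshaller marsh) {
            HadoopIpcToNioAdapter<HadoopMessage> adapter = new HadoopIpcToNioAdapter<>(
                log, endp, accepted, lsnr, new HadoopMarshallerFilter(marsh));

            Thread t = new Thread(() -> {
                try {
                    adapter.serve(); // Blocks until the endpoint is closed.
                }
                catch (InterruptedException ignored) {
                    Thread.currentThread().interrupt(); // Restore interrupt status and exit.
                }
            }, "hadoop-ipc-serve");

            t.start();

            return t;
        }
    }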

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMarshallerFilter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMarshallerFilter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMarshallerFilter.java
new file mode 100644
index 0000000..3f79469
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMarshallerFilter.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
+import org.apache.ignite.internal.util.nio.GridNioFilterAdapter;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.marshaller.Marshaller;
+
+/**
+ * Serialization filter.
+ */
+public class HadoopMarshallerFilter extends GridNioFilterAdapter {
+    /** Marshaller. */
+    private final Marshaller marshaller;
+
+    /**
+     * @param marshaller Marshaller to use.
+     */
+    public HadoopMarshallerFilter(Marshaller marshaller) {
+        super("HadoopMarshallerFilter");
+
+        this.marshaller = marshaller;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onSessionOpened(GridNioSession ses) throws IgniteCheckedException {
+        proceedSessionOpened(ses);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onSessionClosed(GridNioSession ses) throws IgniteCheckedException {
+        proceedSessionClosed(ses);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onExceptionCaught(GridNioSession ses, IgniteCheckedException ex) throws IgniteCheckedException {
+        proceedExceptionCaught(ses, ex);
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException {
+        assert msg instanceof HadoopMessage : "Invalid message type: " + msg;
+
+        return proceedSessionWrite(ses, marshaller.marshal(msg));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onMessageReceived(GridNioSession ses, Object msg) throws IgniteCheckedException {
+        assert msg instanceof byte[];
+
+        // Always unmarshal with system classloader.
+        proceedMessageReceived(ses, marshaller.unmarshal((byte[])msg, null));
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridNioFuture<Boolean> onSessionClose(GridNioSession ses) throws IgniteCheckedException {
+        return proceedSessionClose(ses);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onSessionIdleTimeout(GridNioSession ses) throws IgniteCheckedException {
+        proceedSessionIdleTimeout(ses);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onSessionWriteTimeout(GridNioSession ses) throws IgniteCheckedException {
+        proceedSessionWriteTimeout(ses);
+    }
+}
\ No newline at end of file
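
The filter above only converts between HadoopMessage and byte[]; framing the bytes on the wire
is left to the codec/parser filters below it in the chain. A standalone sketch of that
conversion, using JdkMarshaller purely as an example Marshaller implementation:

    import org.apache.ignite.IgniteCheckedException;
    import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
    import org.apache.ignite.marshaller.Marshaller;
    import org.apache.ignite.marshaller.jdk.JdkMarshaller;

    public final class MarshalRoundTripExample {
        /** Mirrors what onSessionWrite() and onMessageReceived() do to each message. */
        public static HadoopMessage roundTrip(HadoopMessage msg) throws IgniteCheckedException {
            Marshaller marsh = new JdkMarshaller();

            byte[] bytes = marsh.marshal(msg);   // onSessionWrite(): object -> byte[].

            return marsh.unmarshal(bytes, null); // onMessageReceived(): byte[] -> object, system classloader.
        }
    }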

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMessageListener.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMessageListener.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMessageListener.java
new file mode 100644
index 0000000..6d50f43
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopMessageListener.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication;
+
+import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessDescriptor;
+
+/**
+ * Hadoop communication message listener.
+ */
+public interface HadoopMessageListener {
+    /**
+     * Called when a message is received from a remote process.
+     *
+     * @param desc Process descriptor.
+     * @param msg Hadoop message.
+     */
+    public void onMessageReceived(HadoopProcessDescriptor desc, HadoopMessage msg);
+
+    /**
+     * Called when the connection to a remote process is lost.
+     *
+     * @param desc Process descriptor.
+     */
+    public void onConnectionLost(HadoopProcessDescriptor desc);
+}
\ No newline at end of file
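
A minimal implementation sketch of the listener above (illustrative only, and assuming
HadoopProcessDescriptor exposes a processId() accessor). An onConnectionLost() implementation
would typically fail over or reschedule work owned by the lost process.

    import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
    import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessDescriptor;

    public class LoggingMessageListener implements HadoopMessageListener {
        /** {@inheritDoc} */
        @Override public void onMessageReceived(HadoopProcessDescriptor desc, HadoopMessage msg) {
            System.out.println("Received " + msg + " from process " + desc.processId());
        }

        /** {@inheritDoc} */
        @Override public void onConnectionLost(HadoopProcessDescriptor desc) {
            System.err.println("Connection lost to process " + desc.processId());
        }
    }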

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopTcpNioCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopTcpNioCommunicationClient.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopTcpNioCommunicationClient.java
new file mode 100644
index 0000000..17c2ff5
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopTcpNioCommunicationClient.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
+import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessDescriptor;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Grid client for NIO server.
+ */
+public class HadoopTcpNioCommunicationClient extends HadoopAbstractCommunicationClient {
+    /** Session. */
+    private final GridNioSession ses;
+
+    /**
+     * Constructor for test purposes only.
+     */
+    public HadoopTcpNioCommunicationClient() {
+        ses = null;
+    }
+
+    /**
+     * @param ses Session.
+     */
+    public HadoopTcpNioCommunicationClient(GridNioSession ses) {
+        assert ses != null;
+
+        this.ses = ses;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean close() {
+        boolean res = super.close();
+
+        if (res)
+            ses.close();
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void forceClose() {
+        super.forceClose();
+
+        ses.close();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void sendMessage(HadoopProcessDescriptor desc, HadoopMessage msg)
+        throws IgniteCheckedException {
+        if (closed())
+            throw new IgniteCheckedException("Client was closed: " + this);
+
+        GridNioFuture<?> fut = ses.send(msg);
+
+        if (fut.isDone())
+            fut.get();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getIdleTime() {
+        long now = U.currentTimeMillis();
+
+        // Session can be used for receiving and sending.
+        return Math.min(Math.min(now - ses.lastReceiveTime(), now - ses.lastSendScheduleTime()),
+            now - ses.lastSendTime());
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopTcpNioCommunicationClient.class, this, super.toString());
+    }
+}
\ No newline at end of file
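
A sketch of sending one message through the client above, assuming the GridNioSession and the
process descriptor come from an already established connection (the helper names are
illustrative):

    import org.apache.ignite.IgniteCheckedException;
    import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
    import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.HadoopProcessDescriptor;
    import org.apache.ignite.internal.util.nio.GridNioSession;

    public final class NioClientExample {
        /** Sends one message, then releases the client and its session. */
        public static void sendAndClose(GridNioSession ses, HadoopProcessDescriptor desc, HadoopMessage msg)
            throws IgniteCheckedException {
            HadoopTcpNioCommunicationClient client = new HadoopTcpNioCommunicationClient(ses);

            try {
                client.sendMessage(desc, msg); // Throws if the client was already closed.
            }
            finally {
                client.close(); // First successful close() also closes the NIO session.
            }
        }
    }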

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java
new file mode 100644
index 0000000..750b314
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1CleanupTask.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import java.io.IOException;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.mapred.OutputCommitter;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2TaskContext;
+
+/**
+ * Hadoop cleanup task implementation for v1 API.
+ */
+public class HadoopV1CleanupTask extends HadoopV1Task {
+    /** Abort flag. */
+    private final boolean abort;
+
+    /**
+     * @param taskInfo Task info.
+     * @param abort Abort flag.
+     */
+    public HadoopV1CleanupTask(HadoopTaskInfo taskInfo, boolean abort) {
+        super(taskInfo);
+
+        this.abort = abort;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+        HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+
+        JobContext jobCtx = ctx.jobContext();
+
+        try {
+            OutputCommitter committer = jobCtx.getJobConf().getOutputCommitter();
+
+            if (abort)
+                committer.abortJob(jobCtx, JobStatus.State.FAILED);
+            else
+                committer.commitJob(jobCtx);
+        }
+        catch (IOException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+}
\ No newline at end of file
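
A short sketch of how the task above might be wired from a job outcome; abort == true maps to
OutputCommitter.abortJob(FAILED), false to commitJob(). The helper names are illustrative, not
part of the commit:

    import org.apache.ignite.IgniteCheckedException;
    import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
    import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;

    public final class CleanupExample {
        /** Runs the v1 cleanup phase for a finished job. */
        public static void runCleanup(HadoopTaskInfo info, HadoopTaskContext ctx, boolean jobFailed)
            throws IgniteCheckedException {
            new HadoopV1CleanupTask(info, jobFailed).run(ctx);
        }
    }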

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Counter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Counter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Counter.java
new file mode 100644
index 0000000..c623eab
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1Counter.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopLongCounter;
+import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2Counter;
+
+import static org.apache.hadoop.mapreduce.util.CountersStrings.toEscapedCompactString;
+
+/**
+ * Hadoop counter implementation for v1 API.
+ */
+public class HadoopV1Counter extends Counters.Counter {
+    /** Delegate. */
+    private final HadoopLongCounter cntr;
+
+    /**
+     * Creates new instance.
+     *
+     * @param cntr Delegate counter.
+     */
+    public HadoopV1Counter(HadoopLongCounter cntr) {
+        this.cntr = cntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setDisplayName(String displayName) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getName() {
+        return cntr.name();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getDisplayName() {
+        return getName();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getValue() {
+        return cntr.value();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setValue(long val) {
+        cntr.value(val);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void increment(long incr) {
+        cntr.increment(incr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(DataOutput out) throws IOException {
+        throw new UnsupportedOperationException("not implemented");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readFields(DataInput in) throws IOException {
+        throw new UnsupportedOperationException("not implemented");
+    }
+
+    /** {@inheritDoc} */
+    @Override public String makeEscapedCompactString() {
+        return toEscapedCompactString(new HadoopV2Counter(cntr));
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("deprecation")
+    @Override public boolean contentEquals(Counters.Counter cntr) {
+        return getUnderlyingCounter().equals(cntr.getUnderlyingCounter());
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getCounter() {
+        return cntr.value();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Counter getUnderlyingCounter() {
+        return this;
+    }
+}
\ No newline at end of file
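
A sketch of the delegation above: the v1 counter is a thin view over the Ignite
HadoopLongCounter, so updates through either side observe the same state. Only methods already
used in the class above are assumed:

    import org.apache.hadoop.mapred.Counters;
    import org.apache.ignite.internal.processors.hadoop.counter.HadoopLongCounter;

    public final class CounterBridgeExample {
        /** Wraps an Ignite counter into the mapred v1 Counter API. */
        public static Counters.Counter expose(HadoopLongCounter delegate) {
            Counters.Counter cntr = new HadoopV1Counter(delegate);

            cntr.increment(1); // Forwards to delegate.increment(1).

            assert cntr.getValue() == delegate.value(); // Same underlying state.

            return cntr;
        }
    }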

http://git-wip-us.apache.org/repos/asf/ignite/blob/b7489457/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java
new file mode 100644
index 0000000..fb2266a
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/v1/HadoopV1MapTask.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.v1;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskCancelledException;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.v2.HadoopV2TaskContext;
+
+/**
+ * Hadoop map task implementation for v1 API.
+ */
+public class HadoopV1MapTask extends HadoopV1Task {
+    /** */
+    private static final String[] EMPTY_HOSTS = new String[0];
+
+    /**
+     * Constructor.
+     *
+     * @param taskInfo Task info.
+     */
+    public HadoopV1MapTask(HadoopTaskInfo taskInfo) {
+        super(taskInfo);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public void run(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+        HadoopJob job = taskCtx.job();
+
+        HadoopV2TaskContext ctx = (HadoopV2TaskContext)taskCtx;
+
+        JobConf jobConf = ctx.jobConf();
+
+        InputFormat inFormat = jobConf.getInputFormat();
+
+        HadoopInputSplit split = info().inputSplit();
+
+        InputSplit nativeSplit;
+
+        if (split instanceof HadoopFileBlock) {
+            HadoopFileBlock block = (HadoopFileBlock)split;
+
+            nativeSplit = new FileSplit(new Path(block.file().toString()), block.start(), block.length(), EMPTY_HOSTS);
+        }
+        else
+            nativeSplit = (InputSplit)ctx.getNativeSplit(split);
+
+        assert nativeSplit != null;
+
+        Reporter reporter = new HadoopV1Reporter(taskCtx);
+
+        HadoopV1OutputCollector collector = null;
+
+        try {
+            collector = collector(jobConf, ctx, !job.info().hasCombiner() && !job.info().hasReducer(),
+                fileName(), ctx.attemptId());
+
+            RecordReader reader = inFormat.getRecordReader(nativeSplit, jobConf, reporter);
+
+            Mapper mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(), jobConf);
+
+            Object key = reader.createKey();
+            Object val = reader.createValue();
+
+            assert mapper != null;
+
+            try {
+                try {
+                    while (reader.next(key, val)) {
+                        if (isCancelled())
+                            throw new HadoopTaskCancelledException("Map task cancelled.");
+
+                        mapper.map(key, val, collector, reporter);
+                    }
+                }
+                finally {
+                    mapper.close();
+                }
+            }
+            finally {
+                collector.closeWriter();
+            }
+
+            collector.commit();
+        }
+        catch (Exception e) {
+            if (collector != null)
+                collector.abort();
+
+            throw new IgniteCheckedException(e);
+        }
+    }
+}
\ No newline at end of file
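
For reference, a classic v1 mapper of the kind run() above drives: jobConf.getMapperClass()
instantiates it, and map() is then invoked once per record produced by the RecordReader. The
word-length logic here is purely illustrative:

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    public class WordLengthMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {
        /** Emits (word, length) for every whitespace-separated token in the line. */
        @Override public void map(LongWritable off, Text line, OutputCollector<Text, IntWritable> out,
            Reporter reporter) throws IOException {
            for (String w : line.toString().split("\\s+")) {
                if (!w.isEmpty())
                    out.collect(new Text(w), new IntWritable(w.length()));
            }
        }
    }

    // Registered on the job via: jobConf.setMapperClass(WordLengthMapper.class);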
