http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 01a5a71..cebe4b1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -56,6 +56,8 @@ import 
org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
 import org.apache.ignite.internal.managers.discovery.CustomEventListener;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import 
org.apache.ignite.internal.managers.discovery.DiscoveryMessageResultsCollector;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
@@ -63,6 +65,7 @@ import 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
 import 
org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -147,6 +150,12 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
     /** Query sequence number for message topic. */
     private final AtomicLong seq = new AtomicLong();
 
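+    /** Info about started continuous routines; created in {@code start()} only when {@code discoProtoVer} is 2. */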
+    private ContinuousRoutinesInfo routinesInfo;
+
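+    /** Discovery protocol version: 1 if discovery reports mutable custom messages, 2 otherwise (set in {@code start()}). */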
+    private int discoProtoVer;
+
     /**
      * @param ctx Kernal context.
      */
@@ -156,6 +165,11 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Override public void start() throws IgniteCheckedException {
+        discoProtoVer = ctx.discovery().mutableCustomMessages() ? 1 : 2;
+
+        if (discoProtoVer == 2)
+            routinesInfo = new ContinuousRoutinesInfo();
+
         if (ctx.config().isDaemon())
             return;
 
@@ -177,6 +191,8 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                 @Override public void onCustomEvent(AffinityTopologyVersion 
topVer,
                     ClusterNode snd,
                     StartRoutineDiscoveryMessage msg) {
+                    assert discoProtoVer == 1 : discoProtoVer;
+
                     if (ctx.isStopping())
                         return;
 
@@ -184,6 +200,20 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                 }
             });
 
+        
ctx.discovery().setCustomEventListener(StartRoutineDiscoveryMessageV2.class,
+            new CustomEventListener<StartRoutineDiscoveryMessageV2>() {
+                @Override public void onCustomEvent(AffinityTopologyVersion 
topVer,
+                    ClusterNode snd,
+                    StartRoutineDiscoveryMessageV2 msg) {
+                    assert discoProtoVer == 2 : discoProtoVer;
+
+                    if (ctx.isStopping())
+                        return;
+
+                    processStartRequestV2(topVer, snd, msg);
+                }
+            });
+
         
ctx.discovery().setCustomEventListener(StartRoutineAckDiscoveryMessage.class,
             new CustomEventListener<StartRoutineAckDiscoveryMessage>() {
                 @Override public void onCustomEvent(AffinityTopologyVersion 
topVer,
@@ -201,6 +231,9 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                 @Override public void onCustomEvent(AffinityTopologyVersion 
topVer,
                     ClusterNode snd,
                     StopRoutineDiscoveryMessage msg) {
+                    if (discoProtoVer == 2)
+                        routinesInfo.removeRoutine(msg.routineId);
+
                     if (ctx.isStopping())
                         return;
 
@@ -222,32 +255,36 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
 
         ctx.io().addMessageListener(TOPIC_CONTINUOUS, new 
GridMessageListener() {
             @Override public void onMessage(UUID nodeId, Object obj, byte plc) 
{
-                GridContinuousMessage msg = (GridContinuousMessage)obj;
+                if (obj instanceof ContinuousRoutineStartResultMessage)
+                    processRoutineStartResultMessage(nodeId, 
(ContinuousRoutineStartResultMessage)obj);
+                else {
+                    GridContinuousMessage msg = (GridContinuousMessage)obj;
 
-                if (msg.data() == null && msg.dataBytes() != null) {
-                    try {
-                        msg.data(U.unmarshal(marsh, msg.dataBytes(), 
U.resolveClassLoader(ctx.config())));
-                    }
-                    catch (IgniteCheckedException e) {
-                        U.error(log, "Failed to process message (ignoring): " 
+ msg, e);
+                    if (msg.data() == null && msg.dataBytes() != null) {
+                        try {
+                            msg.data(U.unmarshal(marsh, msg.dataBytes(), 
U.resolveClassLoader(ctx.config())));
+                        }
+                        catch (IgniteCheckedException e) {
+                            U.error(log, "Failed to process message 
(ignoring): " + msg, e);
 
-                        return;
+                            return;
+                        }
                     }
-                }
 
-                switch (msg.type()) {
-                    case MSG_EVT_NOTIFICATION:
-                        processNotification(nodeId, msg);
+                    switch (msg.type()) {
+                        case MSG_EVT_NOTIFICATION:
+                            processNotification(nodeId, msg);
 
-                        break;
+                            break;
 
-                    case MSG_EVT_ACK:
-                        processMessageAck(msg);
+                        case MSG_EVT_ACK:
+                            processMessageAck(msg);
 
-                        break;
+                            break;
 
-                    default:
-                        assert false : "Unexpected message received: " + 
msg.type();
+                        default:
+                            assert false : "Unexpected message received: " + 
msg.type();
+                    }
                 }
             }
         });
@@ -341,6 +378,15 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
+        if (ctx.isDaemon())
+            return;
+
+        if (discoProtoVer == 2) {
+            routinesInfo.collectJoiningNodeData(dataBag);
+
+            return;
+        }
+
         Serializable data = getDiscoveryData(dataBag.joiningNodeId());
 
         if (data != null)
@@ -349,6 +395,15 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
 
     /** {@inheritDoc} */
     @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
+        if (ctx.isDaemon())
+            return;
+
+        if (discoProtoVer == 2) {
+            routinesInfo.collectGridNodeData(dataBag);
+
+            return;
+        }
+
         Serializable data = getDiscoveryData(dataBag.joiningNodeId());
 
         if (data != null)
@@ -393,6 +448,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
 
             return data;
         }
+
         return null;
     }
 
@@ -430,22 +486,118 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
     @Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData 
data) {
         if (log.isDebugEnabled()) {
             log.debug("onJoiningNodeDataReceived [joining=" + 
data.joiningNodeId() +
-                    ", loc=" + ctx.localNodeId() +
-                    ", data=" + data.joiningNodeData() +
-                    ']');
+                ", loc=" + ctx.localNodeId() +
+                ", data=" + data.joiningNodeData() +
+                ']');
         }
 
-        if (data.hasJoiningNodeData())
-            onDiscoDataReceived((DiscoveryData) data.joiningNodeData());
+        if (discoProtoVer == 2) {
+            if (data.hasJoiningNodeData())    {
+                ContinuousRoutinesJoiningNodeDiscoveryData nodeData = 
(ContinuousRoutinesJoiningNodeDiscoveryData)
+                    data.joiningNodeData();
+
+                for (ContinuousRoutineInfo routineInfo : 
nodeData.startedRoutines) {
+                    routinesInfo.addRoutineInfo(routineInfo);
+
+                    startDiscoveryDataRoutine(routineInfo);
+                }
+            }
+        }
+        else {
+            if (data.hasJoiningNodeData())
+                onDiscoDataReceived((DiscoveryData) data.joiningNodeData());
+        }
     }
 
     /** {@inheritDoc} */
     @Override public void onGridDataReceived(GridDiscoveryData data) {
-        Map<UUID, Serializable> nodeSpecData = data.nodeSpecificData();
+        if (discoProtoVer == 2) {
+            if (ctx.isDaemon())
+                return;
+
+            if (data.commonData() != null) {
+                ContinuousRoutinesCommonDiscoveryData commonData =
+                    (ContinuousRoutinesCommonDiscoveryData)data.commonData();
+
+                for (ContinuousRoutineInfo routineInfo : 
commonData.startedRoutines) {
+                    if (routinesInfo.routineExists(routineInfo.routineId))
+                        continue;
+
+                    routinesInfo.addRoutineInfo(routineInfo);
+
+                    startDiscoveryDataRoutine(routineInfo);
+                }
+            }
+        }
+        else {
+            Map<UUID, Serializable> nodeSpecData = data.nodeSpecificData();
+
+            if (nodeSpecData != null) {
+                for (Map.Entry<UUID, Serializable> e : nodeSpecData.entrySet())
+                    onDiscoDataReceived((DiscoveryData) e.getValue());
+            }
+        }
+    }
+
+    /**
+     * Starts on the local node a routine described by the given routine info: unmarshals the node filter
+     * (if any) and, when the filter is absent or accepts the local node, unmarshals the handler and
+     * registers it. Unmarshalling and registration failures are logged and the routine is ignored.
+     *
+     * @param routineInfo Routine info.
+     */
+    private void startDiscoveryDataRoutine(ContinuousRoutineInfo routineInfo) {
+        IgnitePredicate<ClusterNode> nodeFilter = null;
+
+        try {
+            if (routineInfo.nodeFilter != null) {
+                nodeFilter = U.unmarshal(marsh, routineInfo.nodeFilter, 
U.resolveClassLoader(ctx.config()));
+
+                ctx.resource().injectGeneric(nodeFilter);
+            }
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to unmarshal continuous routine filter, 
ignore routine [" +
+                "routineId=" + routineInfo.routineId +
+                ", srcNodeId=" + routineInfo.srcNodeId + ']', e);
+
+            return;
+        }
+
+        if (nodeFilter == null || 
nodeFilter.apply(ctx.discovery().localNode())) {
+            GridContinuousHandler hnd;
 
-        if (nodeSpecData != null) {
-            for (Map.Entry<UUID, Serializable> e : nodeSpecData.entrySet())
-                onDiscoDataReceived((DiscoveryData) e.getValue());
+            try {
+                hnd = U.unmarshal(marsh, routineInfo.hnd, 
U.resolveClassLoader(ctx.config()));
+
+                if (ctx.config().isPeerClassLoadingEnabled())
+                    hnd.p2pUnmarshal(routineInfo.srcNodeId, ctx);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to unmarshal continuous routine handler, 
ignore routine [" +
+                    "routineId=" + routineInfo.routineId +
+                    ", srcNodeId=" + routineInfo.srcNodeId + ']', e);
+
+                return;
+            }
+
+            try {
+                registerHandler(routineInfo.srcNodeId,
+                    routineInfo.routineId,
+                    hnd,
+                    routineInfo.bufSize,
+                    routineInfo.interval,
+                    routineInfo.autoUnsubscribe,
+                    false);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to register continuous routine handler, 
ignore routine [" +
+                    "routineId=" + routineInfo.routineId +
+                    ", srcNodeId=" + routineInfo.srcNodeId + ']', e);
+            }
+        }
+        else {
+            if (log.isDebugEnabled()) {
+                log.debug("Do not register continuous routine, rejected by 
node filter [" +
+                    "routineId=" + routineInfo.routineId +
+                    ", srcNodeId=" + routineInfo.srcNodeId + ']');
+            }
         }
     }
 
@@ -564,13 +716,14 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
      * @param rmtFilter Remote filter.
      * @param prjPred Projection predicate.
      * @return Routine ID.
+     * @throws IgniteCheckedException If failed.
      */
     @SuppressWarnings("unchecked")
     public UUID registerStaticRoutine(
         String cacheName,
         CacheEntryUpdatedListener<?, ?> locLsnr,
         CacheEntryEventSerializableFilter rmtFilter,
-        @Nullable IgnitePredicate<ClusterNode> prjPred) {
+        @Nullable IgnitePredicate<ClusterNode> prjPred) throws 
IgniteCheckedException {
         String topicPrefix = "CONTINUOUS_QUERY_STATIC" + "_" + cacheName;
 
         CacheContinuousQueryHandler hnd = new CacheContinuousQueryHandler(
@@ -589,6 +742,17 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
 
         LocalRoutineInfo routineInfo = new LocalRoutineInfo(prjPred, hnd, 1, 
0, true);
 
+        if (discoProtoVer == 2) {
+            routinesInfo.addRoutineInfo(createRoutineInfo(
+                ctx.localNodeId(),
+                routineId,
+                hnd,
+                prjPred,
+                routineInfo.bufSize,
+                routineInfo.interval,
+                routineInfo.autoUnsubscribe));
+        }
+
         locInfos.put(routineId, routineInfo);
 
         registerMessageListener(hnd);
@@ -597,6 +761,40 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
     }
 
     /**
+     * Creates a routine info instance that holds the marshalled handler and, if a node filter is given,
+     * the marshalled node filter.
+     *
+     * @param srcNodeId Source node ID.
+     * @param routineId Routine ID.
+     * @param hnd Handler.
+     * @param nodeFilter Node filter.
+     * @param bufSize Handler buffer size.
+     * @param interval Time interval.
+     * @param autoUnsubscribe Auto unsubscribe flag.
+     * @return Routine info instance.
+     * @throws IgniteCheckedException If failed.
+     */
+    private ContinuousRoutineInfo createRoutineInfo(
+        UUID srcNodeId,
+        UUID routineId,
+        GridContinuousHandler hnd,
+        @Nullable IgnitePredicate<ClusterNode> nodeFilter,
+        int bufSize,
+        long interval,
+        boolean autoUnsubscribe)
+        throws IgniteCheckedException {
+        byte[] hndBytes = marsh.marshal(hnd);
+
+        byte[] filterBytes = nodeFilter != null ? marsh.marshal(nodeFilter) : 
null;
+
+        return new ContinuousRoutineInfo(
+            srcNodeId,
+            routineId,
+            hndBytes,
+            filterBytes,
+            bufSize,
+            interval,
+            autoUnsubscribe);
+    }
+
+    /**
      * @param hnd Handler.
      * @param bufSize Buffer size.
      * @param interval Time interval.
@@ -638,30 +836,10 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
         // Whether local node is included in routine.
         boolean locIncluded = prjPred == null || 
prjPred.apply(ctx.discovery().localNode());
 
-        StartRequestData reqData = new StartRequestData(prjPred, hnd.clone(), 
bufSize, interval, autoUnsubscribe);
+        AbstractContinuousMessage msg;
 
         try {
-            if (ctx.config().isPeerClassLoadingEnabled()) {
-                // Handle peer deployment for projection predicate.
-                if (prjPred != null && !U.isGrid(prjPred.getClass())) {
-                    Class cls = U.detectClass(prjPred);
-
-                    String clsName = cls.getName();
-
-                    GridDeployment dep = ctx.deploy().deploy(cls, 
U.detectClassLoader(cls));
-
-                    if (dep == null)
-                        throw new IgniteDeploymentCheckedException("Failed to 
deploy projection predicate: " + prjPred);
-
-                    reqData.className(clsName);
-                    reqData.deploymentInfo(new GridDeploymentInfoBean(dep));
-
-                    reqData.p2pMarshal(marsh);
-                }
-
-                // Handle peer deployment for other handler-specific objects.
-                reqData.handler().p2pMarshal(ctx);
-            }
+            msg = createStartMessage(routineId, hnd, bufSize, interval, 
autoUnsubscribe, prjPred);
         }
         catch (IgniteCheckedException e) {
             return new GridFinishedFuture<>(e);
@@ -674,20 +852,26 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
             return new GridFinishedFuture<>(new NodeStoppingException("Failed 
to start continuous query (node is stopping)"));
 
         try {
-            StartFuture fut = new StartFuture(ctx, routineId);
+            StartFuture fut = new StartFuture(routineId);
 
             startFuts.put(routineId, fut);
 
             try {
-                if (locIncluded || hnd.isQuery())
-                    registerHandler(ctx.localNodeId(), routineId, hnd, 
bufSize, interval, autoUnsubscribe, true);
+                if (locIncluded || hnd.isQuery()) {
+                    registerHandler(ctx.localNodeId(),
+                        routineId,
+                        hnd,
+                        bufSize,
+                        interval,
+                        autoUnsubscribe,
+                        true);
+                }
 
-                ctx.discovery().sendCustomEvent(new 
StartRoutineDiscoveryMessage(routineId, reqData,
-                    reqData.handler().keepBinary()));
-            }
-            catch (IgniteCheckedException e) {
-                startFuts.remove(routineId);
-                locInfos.remove(routineId);
+            ctx.discovery().sendCustomEvent(msg);
+        }
+        catch (IgniteCheckedException e) {
+            startFuts.remove(routineId);
+            locInfos.remove(routineId);
 
                 unregisterHandler(routineId, hnd, true);
 
@@ -707,6 +891,92 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
     }
 
     /**
+     * Creates the discovery message that starts a routine. The handler is cloned first. With peer class
+     * loading enabled, the node filter class is deployed when {@code U.isGrid} is false for it, and the
+     * cloned handler is p2p-marshalled. For discovery protocol version 1 the result is a
+     * {@code StartRoutineDiscoveryMessage}; for version 2 it is a {@code StartRoutineDiscoveryMessageV2}
+     * carrying the marshalled node filter and handler.
+     *
+     * @param routineId Routine ID.
+     * @param hnd Handler.
+     * @param bufSize Buffer size.
+     * @param interval Interval.
+     * @param autoUnsubscribe Auto unsubscribe flag.
+     * @param nodeFilter Node filter.
+     * @return Routine start message.
+     * @throws IgniteCheckedException If failed.
+     */
+    private AbstractContinuousMessage createStartMessage(UUID routineId,
+        GridContinuousHandler hnd,
+        int bufSize,
+        long interval,
+        boolean autoUnsubscribe,
+        @Nullable IgnitePredicate<ClusterNode> nodeFilter)
+        throws IgniteCheckedException
+    {
+        hnd = hnd.clone();
+
+        String clsName = null;
+        GridDeploymentInfoBean dep = null;
+
+        if (ctx.config().isPeerClassLoadingEnabled()) {
+            // Handle peer deployment for projection predicate.
+            if (nodeFilter != null && !U.isGrid(nodeFilter.getClass())) {
+                Class cls = U.detectClass(nodeFilter);
+
+                clsName = cls.getName();
+
+                GridDeployment dep0 = ctx.deploy().deploy(cls, 
U.detectClassLoader(cls));
+
+                if (dep0 == null)
+                    throw new IgniteDeploymentCheckedException("Failed to 
deploy projection predicate: " + nodeFilter);
+
+                dep = new GridDeploymentInfoBean(dep0);
+            }
+
+            // Handle peer deployment for other handler-specific objects.
+            hnd.p2pMarshal(ctx);
+        }
+
+        if (discoProtoVer == 1) {
+            StartRequestData reqData = new StartRequestData(
+                nodeFilter,
+                hnd,
+                bufSize,
+                interval,
+                autoUnsubscribe);
+
+            if (clsName != null) {
+                reqData.className(clsName);
+                reqData.deploymentInfo(dep);
+
+                reqData.p2pMarshal(marsh);
+            }
+
+            return new StartRoutineDiscoveryMessage(
+                routineId,
+                reqData,
+                reqData.handler().keepBinary());
+        }
+        else {
+            assert discoProtoVer == 2 : discoProtoVer;
+
+            byte[] nodeFilterBytes = nodeFilter != null ? U.marshal(marsh, 
nodeFilter) : null;
+            byte[] hndBytes =  U.marshal(marsh, hnd);
+
+            StartRequestDataV2 reqData = new 
StartRequestDataV2(nodeFilterBytes,
+                hndBytes,
+                bufSize,
+                interval,
+                autoUnsubscribe);
+
+            if (clsName != null) {
+                reqData.className(clsName);
+                reqData.deploymentInfo(dep);
+            }
+
+            return new StartRoutineDiscoveryMessageV2(
+                routineId,
+                reqData,
+                hnd.keepBinary());
+        }
+    }
+
+    /**
      * @param hnd Handler.
      */
     private void registerMessageListener(GridContinuousHandler hnd) {
@@ -760,29 +1030,38 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
                     doStop = true;
             }
 
-            if (doStop) {
-                // Unregister routine locally.
-                LocalRoutineInfo routine = locInfos.remove(routineId);
-
-                // Finish if routine is not found (wrong ID is provided).
-                if (routine == null) {
-                    stopFuts.remove(routineId);
+        if (doStop) {
+            boolean stop = false;
 
-                    fut.onDone();
+            // Unregister routine locally.
+            LocalRoutineInfo routine = locInfos.remove(routineId);
 
-                    return fut;
-                }
+            if (routine != null) {
+                stop = true;
 
                 // Unregister handler locally.
                 unregisterHandler(routineId, routine.hnd, true);
+            }
 
-                try {
-                    ctx.discovery().sendCustomEvent(new 
StopRoutineDiscoveryMessage(routineId));
-                }
-                catch (IgniteCheckedException e) {
-                    fut.onDone(e);
+            if (!stop && discoProtoVer == 2)
+                stop = routinesInfo.routineExists(routineId);
+
+            // Finish if routine is not found (wrong ID is provided).
+            if (!stop) {
+                stopFuts.remove(routineId);
+
+                    fut.onDone();
+
+                    return fut;
                 }
 
+            try {
+                ctx.discovery().sendCustomEvent(new 
StopRoutineDiscoveryMessage(routineId));
+            }
+            catch (IgniteCheckedException e) {
+                fut.onDone(e);
+            }
+
                 if (ctx.isStopping())
                     fut.onDone();
             }
@@ -924,6 +1203,9 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
 
         clientInfos.clear();
 
+        if (discoProtoVer == 2)
+            routinesInfo.onClientDisconnected(locInfos.keySet());
+
         if (log.isDebugEnabled()) {
             log.debug("after onDisconnected [rmtInfos=" + rmtInfos +
                 ", locInfos=" + locInfos +
@@ -996,35 +1278,11 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
         StartFuture fut = startFuts.remove(msg.routineId());
 
         if (fut != null) {
-            if (msg.errs().isEmpty()) {
-                LocalRoutineInfo routine = locInfos.get(msg.routineId());
-
-                // Update partition counters.
-                if (routine != null && routine.handler().isQuery()) {
-                    Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode = 
msg.updateCountersPerNode();
-                    Map<Integer, T2<Long, Long>> cntrs = msg.updateCounters();
-
-                    GridCacheAdapter<Object, Object> interCache =
-                        
ctx.cache().internalCache(routine.handler().cacheName());
-
-                    GridCacheContext cctx = interCache != null ? 
interCache.context() : null;
-
-                    if (cctx != null && cntrsPerNode != null && 
!cctx.isLocal() && cctx.affinityNode())
-                        cntrsPerNode.put(ctx.localNodeId(),
-                            
toCountersMap(cctx.topology().localUpdateCounters(false)));
-
-                    routine.handler().updateCounters(topVer, cntrsPerNode, 
cntrs);
-                }
-
-                fut.onRemoteRegistered();
-            }
-            else {
-                IgniteCheckedException firstEx = F.first(msg.errs().values());
-
-                fut.onDone(firstEx);
-
-                stopRoutine(msg.routineId());
-            }
+            fut.onAllRemoteRegistered(
+                topVer,
+                msg.errs(),
+                msg.updateCountersPerNode(),
+                msg.updateCounters());
         }
     }
 
@@ -1138,6 +1396,199 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
     }
 
     /**
+     * Passes a routine start result to the start future registered for the routine; the message is
+     * ignored when no such future exists.
+     *
+     * @param sndId Sender node ID.
+     * @param msg Message.
+     */
+    private void processRoutineStartResultMessage(UUID sndId, 
ContinuousRoutineStartResultMessage msg) {
+        StartFuture fut = startFuts.get(msg.routineId());
+
+        if (fut != null)
+            fut.onResult(sndId, msg);
+    }
+
+    /**
+     * Processes a V2 routine start request. The routine is first recorded in {@code routinesInfo}. The
+     * rest runs on the system executor: if the local node is the sender, only the local start future
+     * (if any) is given the discovery cache; otherwise the node filter and handler are unmarshalled,
+     * the handler is registered when the filter is absent or accepts the local node, and the start
+     * result (marshalled update counters and/or the error) is sent back to the sender.
+     *
+     * @param topVer Current topology version.
+     * @param snd Sender.
+     * @param msg Start request.
+     */
+    private void processStartRequestV2(final AffinityTopologyVersion topVer,
+        final ClusterNode snd,
+        final StartRoutineDiscoveryMessageV2 msg) {
+        StartRequestDataV2 reqData = msg.startRequestData();
+
+        ContinuousRoutineInfo routineInfo = new ContinuousRoutineInfo(snd.id(),
+            msg.routineId(),
+            reqData.handlerBytes(),
+            reqData.nodeFilterBytes(),
+            reqData.bufferSize(),
+            reqData.interval(),
+            reqData.autoUnsubscribe());
+
+        routinesInfo.addRoutineInfo(routineInfo);
+
+        final DiscoCache discoCache = ctx.discovery().discoCache(topVer);
+
+        // Should not use marshaller and send messages from discovery thread.
+        ctx.getSystemExecutorService().execute(new Runnable() {
+            @Override public void run() {
+                if (snd.id().equals(ctx.localNodeId())) {
+                    StartFuture fut = startFuts.get(msg.routineId());
+
+                    if (fut != null)
+                        fut.initRemoteNodes(discoCache);
+
+                    return;
+                }
+
+                StartRequestDataV2 reqData = msg.startRequestData();
+
+                Exception err = null;
+
+                IgnitePredicate<ClusterNode> nodeFilter = null;
+
+                byte[] cntrs = null;
+
+                if (reqData.nodeFilterBytes() != null) {
+                    try {
+                        if (ctx.config().isPeerClassLoadingEnabled() && 
reqData.className() != null) {
+                            String clsName = reqData.className();
+                            GridDeploymentInfo depInfo = 
reqData.deploymentInfo();
+
+                            GridDeployment dep = 
ctx.deploy().getGlobalDeployment(depInfo.deployMode(),
+                                clsName,
+                                clsName,
+                                depInfo.userVersion(),
+                                snd.id(),
+                                depInfo.classLoaderId(),
+                                depInfo.participants(),
+                                null);
+
+                            if (dep == null) {
+                                throw new 
IgniteDeploymentCheckedException("Failed to obtain deployment " +
+                                    "for class: " + clsName);
+                            }
+
+                            nodeFilter = U.unmarshal(marsh,
+                                reqData.nodeFilterBytes(),
+                                U.resolveClassLoader(dep.classLoader(), 
ctx.config()));
+                        }
+                        else {
+                            nodeFilter = U.unmarshal(marsh,
+                                reqData.nodeFilterBytes(),
+                                U.resolveClassLoader(ctx.config()));
+                        }
+
+                        if (nodeFilter != null)
+                            ctx.resource().injectGeneric(nodeFilter);
+                    }
+                    catch (Exception e) {
+                        err = e;
+
+                        U.error(log, "Failed to unmarshal continuous routine 
filter [" +
+                            "routineId=" + msg.routineId +
+                            ", srcNodeId=" + snd.id() + ']', e);
+                    }
+                }
+
+                boolean register = err == null &&
+                    (nodeFilter == null || 
nodeFilter.apply(ctx.discovery().localNode()));
+
+                if (register) {
+                    try {
+                        GridContinuousHandler hnd = U.unmarshal(marsh,
+                            reqData.handlerBytes(),
+                            U.resolveClassLoader(ctx.config()));
+
+                        if (ctx.config().isPeerClassLoadingEnabled())
+                            hnd.p2pUnmarshal(snd.id(), ctx);
+
+                        if (msg.keepBinary()) {
+                            assert hnd instanceof CacheContinuousQueryHandler 
: hnd;
+
+                            
((CacheContinuousQueryHandler)hnd).keepBinary(true);
+                        }
+
+                        GridContinuousHandler hnd0 = hnd instanceof 
GridMessageListenHandler ?
+                            new 
GridMessageListenHandler((GridMessageListenHandler)hnd) :
+                            hnd;
+
+                        registerHandler(snd.id(),
+                            msg.routineId,
+                            hnd0,
+                            reqData.bufferSize(),
+                            reqData.interval(),
+                            reqData.autoUnsubscribe(),
+                            false);
+
+                        if (hnd0.isQuery()) {
+                            GridCacheProcessor proc = ctx.cache();
+
+                            if (proc != null) {
+                                GridCacheAdapter cache = 
ctx.cache().internalCache(hnd0.cacheName());
+
+                                if (cache != null && !cache.isLocal() && 
cache.context().userCache()) {
+                                    CachePartitionPartialCountersMap cntrsMap =
+                                        
cache.context().topology().localUpdateCounters(false);
+
+                                    cntrs = U.marshal(marsh, cntrsMap);
+                                }
+                            }
+                        }
+                    }
+                    catch (Exception e) {
+                        err = e;
+
+                        U.error(log, "Failed to register continuous routine 
handler [" +
+                            "routineId=" + msg.routineId +
+                            ", srcNodeId=" + snd.id() + ']', e);
+                    }
+                }
+
+                sendMessageStartResult(snd, msg.routineId(), cntrs, err);
+            }
+        });
+    }
+
+    /**
+     * Sends the routine start result to the given node over {@code TOPIC_CONTINUOUS}. The start error,
+     * if any, is marshalled into the message; if that marshalling fails, the failure is logged and the
+     * message is sent without error bytes. Send failures are logged and not rethrown.
+     *
+     * @param node Target node.
+     * @param routineId Routine ID.
+     * @param cntrsMapBytes Marshalled {@link 
CachePartitionPartialCountersMap}.
+     * @param err Start error if any.
+     */
+    private void sendMessageStartResult(final ClusterNode node,
+        final UUID routineId,
+        byte[] cntrsMapBytes,
+        final @Nullable Exception err)
+    {
+        byte[] errBytes = null;
+
+        if (err != null) {
+            try {
+                errBytes = U.marshal(marsh, err);
+            }
+            catch (Exception e) {
+                U.error(log, "Failed to marshal routine start error: " + e, e);
+            }
+        }
+
+        ContinuousRoutineStartResultMessage msg = new 
ContinuousRoutineStartResultMessage(routineId,
+            cntrsMapBytes,
+            errBytes,
+            err != null);
+
+        try {
+            ctx.io().sendToGridTopic(node, TOPIC_CONTINUOUS, msg, SYSTEM_POOL, 
null);
+        }
+        catch (ClusterTopologyCheckedException e) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to send routine start result, node failed: " 
+ e);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send routine start result: " + e, e);
+        }
+    }
+
+    /**
      * @param msg Message.
      */
     private void processMessageAck(GridContinuousMessage msg) {
@@ -1455,6 +1906,13 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
 
             UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
 
+            if (discoProtoVer == 2) {
+                routinesInfo.onNodeFail(nodeId);
+
+                for (StartFuture fut : startFuts.values())
+                    fut.onNodeFail(nodeId);
+            }
+
             clientInfos.remove(nodeId);
 
             // Unregister handlers created by left node.
@@ -1894,10 +2352,7 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
     /**
      * Future for start routine.
      */
-    private static class StartFuture extends GridFutureAdapter<UUID> {
-        /** */
-        private GridKernalContext ctx;
-
+    private class StartFuture extends GridFutureAdapter<UUID> {
         /** Consume ID. */
         private UUID routineId;
 
@@ -1907,56 +2362,170 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
         /** All remote listeners are registered. */
         private volatile boolean rmt;
 
-        /** Timeout object. */
-        private volatile GridTimeoutObject timeoutObj;
+        /** */
+        private final 
DiscoveryMessageResultsCollector<ContinuousRoutineStartResultMessage, 
RoutineRegisterResults>
+            resCollect;
 
         /**
-         * @param ctx Kernal context.
          * @param routineId Consume ID.
          */
-        StartFuture(GridKernalContext ctx, UUID routineId) {
-            this.ctx = ctx;
-
+        StartFuture(UUID routineId) {
             this.routineId = routineId;
+
+            resCollect = new 
DiscoveryMessageResultsCollector<ContinuousRoutineStartResultMessage, 
RoutineRegisterResults>(ctx) {
+                @Override protected RoutineRegisterResults 
createResult(Map<UUID, NodeMessage<ContinuousRoutineStartResultMessage>> rcvd) {
+                    Map<UUID, Exception> errs = null;
+                    Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode = 
null;
+
+                    for (Map.Entry<UUID, 
NodeMessage<ContinuousRoutineStartResultMessage>> entry : rcvd.entrySet()) {
+                        ContinuousRoutineStartResultMessage msg = 
entry.getValue().message();
+
+                        if (msg == null) // No message stored for this node, nothing to aggregate.
+                            continue;
+
+                        if (msg.error()) {
+                            byte[] errBytes = msg.errorBytes();
+
+                            Exception err = null;
+
+                            if (errBytes != null) {
+                                try {
+                                    err = U.unmarshal(marsh, errBytes, 
U.resolveClassLoader(ctx.config()));
+                                }
+                                catch (Exception e) {
+                                    U.warn(log, "Failed to unmarshal continuous 
routine start error: " + e);
+                                }
+                            }
+
+                            if (err == null) { // Error bytes absent or unreadable: report a generic failure.
+                                err = new IgniteCheckedException("Failed to 
start continuous " +
+                                    "routine on node: " + entry.getKey());
+                            }
+
+                            if (errs == null)
+                                errs = new HashMap<>();
+
+                            errs.put(entry.getKey(), err);
+                        }
+                        else {
+                            byte[] cntrsMapBytes = msg.countersMapBytes();
+
+                            if (cntrsMapBytes != null) {
+                                try {
+                                    CachePartitionPartialCountersMap cntrsMap 
= U.unmarshal(
+                                        marsh,
+                                        cntrsMapBytes,
+                                        U.resolveClassLoader(ctx.config()));
+
+                                    if (cntrsPerNode == null)
+                                        cntrsPerNode = new HashMap<>();
+
+                                    cntrsPerNode.put(entry.getKey(), 
CachePartitionPartialCountersMap.toCountersMap(cntrsMap));
+                                }
+                                catch (Exception e) {
+                                    U.warn(log, "Failed to unmarshal continuous 
query update counters: " + e);
+                                }
+                            }
+                        }
+                    }
+
+                    return new RoutineRegisterResults(discoCache.version(), 
errs, cntrsPerNode);
+                }
+
+                @Override protected void 
onResultsCollected(RoutineRegisterResults res0) {
+                    onAllRemoteRegistered(res0.topVer, res0.errs, 
res0.cntrsPerNode, null);
+                }
+
+                @Override protected boolean waitForNode(DiscoCache discoCache, 
ClusterNode node) {
+                    return !ctx.localNodeId().equals(node.id());
+                }
+            };
         }
 
         /**
-         * Called when local listener is registered.
+         * @param topVer Topology version passed to the handler along with the counters.
+         * @param errs Errors; if non-empty the future fails with the first one and the routine is stopped.
+         * @param cntrsPerNode Update counters per node; local node counters may be added to this map.
+         * @param cntrs Update counters.
          */
-        public void onLocalRegistered() {
-            loc = true;
+        private void onAllRemoteRegistered(
+            AffinityTopologyVersion topVer,
+            @Nullable Map<UUID, ? extends Exception> errs,
+            Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode,
+            Map<Integer, T2<Long, Long>> cntrs) {
+            try {
+                if (errs == null || errs.isEmpty()) {
+                    LocalRoutineInfo routine = locInfos.get(routineId);
 
-            if (rmt && !isDone())
-                onDone(routineId);
+                    // Update partition counters.
+                    if (routine != null && routine.handler().isQuery()) {
+                        GridCacheAdapter<Object, Object> interCache =
+                            
ctx.cache().internalCache(routine.handler().cacheName());
+
+                        GridCacheContext cctx = interCache != null ? 
interCache.context() : null;
+
+                        if (cctx != null && cntrsPerNode != null && 
!cctx.isLocal() && cctx.affinityNode())
+                            cntrsPerNode.put(ctx.localNodeId(),
+                                
toCountersMap(cctx.topology().localUpdateCounters(false)));
+
+                        routine.handler().updateCounters(topVer, cntrsPerNode, 
cntrs);
+                    }
+
+                    onRemoteRegistered();
+                }
+                else {
+                    Exception firstEx = F.first(errs.values());
+
+                    onDone(firstEx);
+
+                    stopRoutine(routineId);
+                }
+            }
+            finally {
+                startFuts.remove(routineId, this); // Always unregister this future, whatever the outcome.
+            }
         }
 
         /**
-         * Called when all remote listeners are registered.
+         * @param discoCache Discovery state.
          */
-        public void onRemoteRegistered() {
-            rmt = true;
+        void initRemoteNodes(DiscoCache discoCache) {
+            resCollect.init(discoCache);
+        }
 
-            if (loc && !isDone())
-                onDone(routineId);
+        /**
+         * @param nodeId Node ID.
+         * @param msg Message.
+         */
+        void onResult(UUID nodeId, ContinuousRoutineStartResultMessage msg) {
+            resCollect.onMessage(nodeId, msg);
         }
 
         /**
-         * @param timeoutObj Timeout object.
+         * @param nodeId Failed node ID.
          */
-        public void addTimeoutObject(GridTimeoutObject timeoutObj) {
-            assert timeoutObj != null;
+        void onNodeFail(UUID nodeId) {
+            resCollect.onNodeFail(nodeId);
+        }
 
-            this.timeoutObj = timeoutObj;
+        /**
+         * Called when local listener is registered.
+         */
+        void onLocalRegistered() {
+            loc = true;
 
-            ctx.timeout().addTimeoutObject(timeoutObj);
+            if (rmt && !isDone())
+                onDone(routineId);
         }
 
-        /** {@inheritDoc} */
-        @Override public boolean onDone(@Nullable UUID res, @Nullable 
Throwable err) {
-            if (timeoutObj != null)
-                ctx.timeout().removeTimeoutObject(timeoutObj);
+        /**
+         * Called when all remote listeners are registered.
+         */
+        void onRemoteRegistered() {
+            rmt = true;
 
-            return super.onDone(res, err);
+            if (loc && !isDone())
+                onDone(routineId);
         }
 
         /** {@inheritDoc} */
@@ -1966,6 +2535,33 @@ public class GridContinuousProcessor extends 
GridProcessorAdapter {
     }
 
     /**
+     * Immutable holder for the aggregated results of registering a routine on other nodes.
+     */
+    private static class RoutineRegisterResults {
+        /** Topology version. */
+        private final AffinityTopologyVersion topVer;
+
+        /** Start errors keyed by node ID. */
+        private final Map<UUID, ? extends Exception> errs;
+
+        /** Update counters keyed by node ID. */
+        private final Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode;
+
+        /**
+         * @param topVer Topology version.
+         * @param errs Errors keyed by node ID.
+         * @param cntrsPerNode Update counters keyed by node ID.
+         */
+        RoutineRegisterResults(AffinityTopologyVersion topVer,
+            Map<UUID, ? extends Exception> errs,
+            Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode) {
+            this.topVer = topVer;
+            this.errs = errs;
+            this.cntrsPerNode = cntrsPerNode;
+        }
+    }
+
+    /**
      * Future for stop routine.
      */
     private static class StopFuture extends GridFutureAdapter<Object> {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestDataV2.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestDataV2.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestDataV2.java
new file mode 100644
index 0000000..c001616
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestDataV2.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Serializable start request data: serialized node filter and handler, deployment info and buffering settings.
+ */
+class StartRequestDataV2 implements Serializable {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Serialized node filter. */
+    private byte[] nodeFilterBytes;
+
+    /** Deployment class name; not set by the constructor, assigned through {@link #className(String)}. */
+    private String clsName;
+
+    /** Deployment info; not set by the constructor, assigned through {@link #deploymentInfo(GridDeploymentInfo)}. */
+    private GridDeploymentInfo depInfo;
+
+    /** Serialized handler. */
+    private byte[] hndBytes;
+
+    /** Buffer size. */
+    private int bufSize;
+
+    /** Time interval. */
+    private long interval;
+
+    /** Automatic unsubscribe flag. */
+    private boolean autoUnsubscribe;
+
+    /**
+     * @param nodeFilterBytes Serialized node filter.
+     * @param hndBytes Serialized handler, asserted to be non-null.
+     * @param bufSize Buffer size, asserted to be positive.
+     * @param interval Time interval, asserted to be non-negative.
+     * @param autoUnsubscribe Automatic unsubscribe flag.
+     */
+    StartRequestDataV2(
+        byte[] nodeFilterBytes,
+        byte[] hndBytes,
+        int bufSize,
+        long interval,
+        boolean autoUnsubscribe) {
+        assert hndBytes != null;
+        assert bufSize > 0;
+        assert interval >= 0;
+
+        this.nodeFilterBytes = nodeFilterBytes;
+        this.hndBytes = hndBytes;
+        this.bufSize = bufSize;
+        this.interval = interval;
+        this.autoUnsubscribe = autoUnsubscribe;
+    }
+
+    /**
+     * @return Serialized node filter.
+     */
+    public byte[] nodeFilterBytes() {
+        return nodeFilterBytes;
+    }
+
+    /**
+     * @return Deployment class name.
+     */
+    public String className() {
+        return clsName;
+    }
+
+    /**
+     * @param clsName New deployment class name.
+     */
+    public void className(String clsName) {
+        this.clsName = clsName;
+    }
+
+    /**
+     * @return Deployment info.
+     */
+    public GridDeploymentInfo deploymentInfo() {
+        return depInfo;
+    }
+
+    /**
+     * @param depInfo New deployment info.
+     */
+    public void deploymentInfo(GridDeploymentInfo depInfo) {
+        this.depInfo = depInfo;
+    }
+
+    /**
+     * @return Serialized handler.
+     */
+    public byte[] handlerBytes() {
+        return hndBytes;
+    }
+
+    /**
+     * @return Buffer size.
+     */
+    public int bufferSize() {
+        return bufSize;
+    }
+
+    /**
+     * @param bufSize New buffer size (not validated here, unlike in the constructor).
+     */
+    public void bufferSize(int bufSize) {
+        this.bufSize = bufSize;
+    }
+
+    /**
+     * @return Time interval.
+     */
+    public long interval() {
+        return interval;
+    }
+
+    /**
+     * @param interval New time interval (not validated here, unlike in the constructor).
+     */
+    public void interval(long interval) {
+        this.interval = interval;
+    }
+
+    /**
+     * @return Automatic unsubscribe flag.
+     */
+    public boolean autoUnsubscribe() {
+        return autoUnsubscribe;
+    }
+
+    /**
+     * @param autoUnsubscribe New automatic unsubscribe flag.
+     */
+    public void autoUnsubscribe(boolean autoUnsubscribe) {
+        this.autoUnsubscribe = autoUnsubscribe;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(StartRequestDataV2.class, this);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java
new file mode 100644
index 0000000..275765d
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import java.util.UUID;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Continuous message that carries the start request data of a routine together with a keep binary flag.
+ */
+public class StartRoutineDiscoveryMessageV2 extends AbstractContinuousMessage {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Bit in {@link #flags} that marks the keep binary mode. */
+    private static final int KEEP_BINARY_FLAG = 0x01;
+
+    /** Start request data. */
+    private final StartRequestDataV2 startReqData;
+
+    /** Flags. */
+    private int flags;
+
+    /**
+     * @param routineId Routine id.
+     * @param startReqData Start request data.
+     * @param keepBinary Keep binary flag.
+     */
+    StartRoutineDiscoveryMessageV2(UUID routineId, StartRequestDataV2 
startReqData, boolean keepBinary) {
+        super(routineId);
+
+        this.startReqData = startReqData;
+
+        if (keepBinary)
+            flags |= KEEP_BINARY_FLAG;
+    }
+
+    /**
+     * @return Start request data.
+     */
+    public StartRequestDataV2 startRequestData() {
+        return startReqData;
+    }
+
+    /**
+     * @return {@code True} if keep binary flag was set on continuous handler.
+     */
+    public boolean keepBinary() {
+        return (flags & KEEP_BINARY_FLAG) != 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public DiscoveryCustomMessage ackMessage() {
+        return null; // This message does not produce an ack message.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(StartRoutineDiscoveryMessageV2.class, this, 
"routineId", routineId());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java
index 79d8b29..dfba0e7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java
@@ -42,6 +42,11 @@ public class StopRoutineAckDiscoveryMessage extends 
AbstractContinuousMessage {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(StopRoutineAckDiscoveryMessage.class, this, 
"routineId", routineId());
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 4a893f4..8cad342 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -47,6 +47,7 @@ import javax.cache.expiry.ExpiryPolicy;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteDataStreamerTimeoutException;
 import org.apache.ignite.IgniteException;
@@ -100,6 +101,7 @@ import 
org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.GPC;
@@ -1059,6 +1061,9 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
             return;
 
         while (true) {
+            if (disconnectErr != null)
+                throw disconnectErr;
+
             Queue<IgniteInternalFuture<?>> q = null;
 
             for (Buffer buf : bufMappings.values()) {
@@ -1826,15 +1831,19 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
                 catch (IgniteCheckedException e) {
                     GridFutureAdapter<Object> fut0 = 
((GridFutureAdapter<Object>)fut);
 
-                    try {
-                        if (ctx.discovery().alive(node) && 
ctx.discovery().pingNode(node.id()))
-                            fut0.onDone(e);
-                        else
-                            fut0.onDone(new 
ClusterTopologyCheckedException("Failed to send request (node has left): "
-                                + node.id()));
-                    }
-                    catch (IgniteClientDisconnectedCheckedException e0) {
-                        fut0.onDone(e0);
+                    if (X.hasCause(e, 
IgniteClientDisconnectedCheckedException.class, 
IgniteClientDisconnectedException.class))
+                        fut0.onDone(e);
+                    else {
+                        try {
+                            if (ctx.discovery().alive(node) && 
ctx.discovery().pingNode(node.id()))
+                                fut0.onDone(e);
+                            else
+                                fut0.onDone(new 
ClusterTopologyCheckedException("Failed to send request (node has left): "
+                                    + node.id()));
+                        }
+                        catch (IgniteClientDisconnectedCheckedException e0) {
+                            fut0.onDone(e0);
+                        }
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java
index 7af0559..80e3f7d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java
@@ -63,6 +63,11 @@ public class MappingAcceptedMessage implements 
DiscoveryCustomMessage {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public DiscoCache 
createDiscoCache(GridDiscoveryManager mgr,
         AffinityTopologyVersion topVer, DiscoCache discoCache) {
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java
index b4e13fb..9358585 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java
@@ -98,6 +98,11 @@ public class MappingProposedMessage implements 
DiscoveryCustomMessage {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public DiscoCache 
createDiscoCache(GridDiscoveryManager mgr,
         AffinityTopologyVersion topVer, DiscoCache discoCache) {
         throw new UnsupportedOperationException();

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java
index 2245b24..f802e09 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java
@@ -59,6 +59,11 @@ public class SchemaFinishDiscoveryMessage extends 
SchemaAbstractDiscoveryMessage
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean exchange() {
         return false;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java
index 0e1270b..62b6d6a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java
@@ -60,6 +60,11 @@ public class SchemaProposeDiscoveryMessage extends 
SchemaAbstractDiscoveryMessag
     }
 
     /** {@inheritDoc} */
+    @Override public boolean stopProcess() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean exchange() {
         return exchange;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index e0ec8d1..0fcde0e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -491,6 +491,17 @@ public class GridNioServer<T> {
 
     /**
      * @param ses Session.
+     */
+    public void closeFromWorkerThread(GridNioSession ses) {
+        assert ses instanceof GridSelectorNioSessionImpl : ses;
+
+        GridSelectorNioSessionImpl ses0 = (GridSelectorNioSessionImpl)ses;
+
+        ((AbstractNioClientWorker)ses0.worker()).close(ses0, null); // Reuse the cast session; close it through its own worker.
+    }
+
+    /**
+     * @param ses Session.
      * @param msg Message.
      * @param createFut {@code True} if future should be created.
      * @param ackC Closure invoked when message ACK is received.
@@ -2170,7 +2181,12 @@ public class GridNioServer<T> {
                 dumpSelectorInfo(sb, keys);
 
             for (SelectionKey key : keys) {
-                GridSelectorNioSessionImpl ses = 
(GridSelectorNioSessionImpl)key.attachment();
+                GridNioKeyAttachment attach = 
(GridNioKeyAttachment)key.attachment();
+
+                if (!attach.hasSession())
+                    continue;
+
+                GridSelectorNioSessionImpl ses = attach.session();
 
                 boolean sesInfo = p == null || p.apply(ses);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java 
b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 1754cc8..e8c27d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -963,5 +963,15 @@ public abstract class IgniteSpiAdapter implements 
IgniteSpi {
         @Override public Map<String, Object> nodeAttributes() {
             return Collections.emptyMap();
         }
+
+        /** {@inheritDoc} */
+        @Override public boolean communicationFailureResolveSupported() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void resolveCommunicationFailure(ClusterNode node, 
Exception err) {
+            throw new UnsupportedOperationException();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java 
b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
index 108c4d4..d4402f4 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java
@@ -365,4 +365,15 @@ public interface IgniteSpiContext {
      * @return Current node attributes.
      */
     public Map<String, Object> nodeAttributes();
+
+    /**
+     * @return {@code True} if cluster supports communication error resolving.
+     */
+    public boolean communicationFailureResolveSupported();
+
+    /**
+     * @param node Problem node.
+     * @param err Error.
+     */
+    public void resolveCommunicationFailure(ClusterNode node, Exception err);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index e1addd8..4a0710e 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -32,6 +32,7 @@ import java.nio.channels.SocketChannel;
 import java.nio.channels.spi.AbstractInterruptibleChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -66,12 +67,14 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
 import org.apache.ignite.internal.util.GridConcurrentFactory;
 import org.apache.ignite.internal.util.GridSpinReadWriteLock;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
 import org.apache.ignite.internal.util.ipc.IpcEndpoint;
 import org.apache.ignite.internal.util.ipc.IpcToNioAdapter;
 import 
org.apache.ignite.internal.util.ipc.shmem.IpcOutOfSystemResourcesException;
@@ -134,6 +137,9 @@ import org.apache.ignite.spi.IgniteSpiThread;
 import org.apache.ignite.spi.IgniteSpiTimeoutObject;
 import org.apache.ignite.spi.communication.CommunicationListener;
 import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.communication.tcp.internal.ConnectionKey;
+import 
org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnectionCheckFuture;
+import 
org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationNodeConnectionCheckFuture;
 import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage;
 import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage2;
 import org.apache.ignite.spi.communication.tcp.messages.NodeIdMessage;
@@ -146,6 +152,7 @@ import org.jetbrains.annotations.Nullable;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static 
org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META;
+import static 
org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnectionCheckFuture.SES_FUT_META;
 import static 
org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.ALREADY_CONNECTED;
 import static 
org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.NEED_WAIT;
 import static 
org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.NODE_STOPPING;
@@ -310,7 +317,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter 
implements Communicati
     private static final IgniteProductVersion 
VERSION_SINCE_CLIENT_COULD_WAIT_TO_CONNECT = 
IgniteProductVersion.fromString("2.1.4");
 
     /** Connection index meta for session. */
-    private static final int CONN_IDX_META = 
GridNioSessionMetaKey.nextUniqueKey();
+    public static final int CONN_IDX_META = 
GridNioSessionMetaKey.nextUniqueKey();
 
     /** Message tracker meta for session. */
     private static final int TRACKER_META = 
GridNioSessionMetaKey.nextUniqueKey();
@@ -408,6 +415,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter 
implements Communicati
                 ConnectionKey connId = ses.meta(CONN_IDX_META);
 
                 if (connId != null) {
+                    if (connId.dummy())
+                        return;
+
                     UUID id = connId.nodeId();
 
                     GridCommunicationClient[] nodeClients = clients.get(id);
@@ -481,20 +491,22 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter 
implements Communicati
                 if (rmtNode == null) {
                     DiscoverySpi discoverySpi = 
ignite().configuration().getDiscoverySpi();
 
-                    assert discoverySpi instanceof TcpDiscoverySpi;
-
-                    TcpDiscoverySpi tcpDiscoverySpi = (TcpDiscoverySpi) 
discoverySpi;
+                    boolean unknownNode = true;
 
-                    ClusterNode node0 = tcpDiscoverySpi.getNode0(sndId);
+                    if (discoverySpi instanceof TcpDiscoverySpi) {
+                        TcpDiscoverySpi tcpDiscoverySpi = (TcpDiscoverySpi) 
discoverySpi;
 
-                    boolean unknownNode = true;
+                        ClusterNode node0 = tcpDiscoverySpi.getNode0(sndId);
 
-                    if (node0 != null) {
-                        assert node0.isClient() : node0;
+                        if (node0 != null) {
+                            assert node0.isClient() : node0;
 
-                        if 
(node0.version().compareTo(VERSION_SINCE_CLIENT_COULD_WAIT_TO_CONNECT) >= 0)
-                            unknownNode = false;
+                            if 
(node0.version().compareTo(VERSION_SINCE_CLIENT_COULD_WAIT_TO_CONNECT) >= 0)
+                                unknownNode = false;
+                        }
                     }
+                    else if (discoverySpi instanceof IgniteDiscoverySpi)
+                        unknownNode = !((IgniteDiscoverySpi) 
discoverySpi).knownNode(sndId);
 
                     if (unknownNode) {
                         U.warn(log, "Close incoming connection, unknown node 
[nodeId=" + sndId + ", ses=" + ses + ']');
@@ -709,9 +721,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter 
implements Communicati
                     }
                 }
                 else {
-                    metricsLsnr.onMessageReceived(msg, connKey.nodeId());
-
                     if (msg instanceof RecoveryLastReceivedMessage) {
+                        metricsLsnr.onMessageReceived(msg, connKey.nodeId());
+
                         GridNioRecoveryDescriptor recovery = 
ses.outRecoveryDescriptor();
 
                         if (recovery != null) {
@@ -724,9 +736,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter 
implements Communicati
                             }
 
                             recovery.ackReceived(msg0.received());
-
-                            return;
                         }
+
+                        return;
                     }
                     else {
                         GridNioRecoveryDescriptor recovery = 
ses.inRecoveryDescriptor();
@@ -746,8 +758,23 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter 
implements Communicati
                                 recovery.lastAcknowledged(rcvCnt);
                             }
                         }
+                        else if (connKey.dummy()) {
+                            assert msg instanceof NodeIdMessage : msg;
+
+                            TcpCommunicationNodeConnectionCheckFuture fut = 
ses.meta(SES_FUT_META);
+
+                            assert fut != null : msg;
+
+                            
fut.onConnected(U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes(), 0));
+
+                            nioSrvr.closeFromWorkerThread(ses);
+
+                            return;
+                        }
                     }
 
+                    metricsLsnr.onMessageReceived(msg, connKey.nodeId());
+
                     IgniteRunnable c;
 
                     if (msgQueueLimit > 0) {
@@ -2112,6 +2139,13 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter implements Communicati
         }
     }
 
+    /**
+     * Returns the current value of {@code boundTcpPort}.
+     *
+     * @return Bound TCP server port.
+     */
+    public int boundPort() {
+        return boundTcpPort;
+    }
+
     /** {@inheritDoc} */
     @Override public void spiStart(String igniteInstanceName) throws 
IgniteSpiException {
         assert locHost != null;
@@ -2570,6 +2604,27 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter implements Communicati
     }
 
     /**
+     * Starts a connection check for the given nodes: creates a
+     * {@link TcpCommunicationConnectionCheckFuture}, initializes it with a timeout and
+     * returns it wrapped into {@link IgniteFutureImpl}.
+     * <p>
+     * The timeout is the failure detection timeout when it is enabled, {@code connTimeout} otherwise.
+     *
+     * @param nodes Nodes to check connection with.
+     * @return Result future (each bit in result BitSet contains connection status to corresponding node).
+     */
+    public IgniteFuture<BitSet> checkConnection(List<ClusterNode> nodes) {
+        TcpCommunicationConnectionCheckFuture fut = new 
TcpCommunicationConnectionCheckFuture(
+            this,
+            log.getLogger(TcpCommunicationConnectionCheckFuture.class),
+            nioSrvr,
+            nodes);
+
+        long timeout = failureDetectionTimeoutEnabled() ? 
failureDetectionTimeout() : connTimeout;
+
+        if (log.isInfoEnabled())
+            log.info("Start check connection process [nodeCnt=" + nodes.size() 
+ ", timeout=" + timeout + ']');
+
+        fut.init(timeout);
+
+        return new IgniteFutureImpl<>(fut);
+    }
+
+    /**
      * Sends given message to destination node. Note that characteristics of 
the
      * exchange such as durability, guaranteed delivery or error notification 
is
      * dependant on SPI implementation.
@@ -3010,7 +3065,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter 
implements Communicati
             ConnectionKey id = ses.meta(CONN_IDX_META);
 
             if (id != null) {
-                ClusterNode node = getSpiContext().node(id.nodeId);
+                ClusterNode node = getSpiContext().node(id.nodeId());
 
                 if (node != null && node.isClient()) {
                     String msg = "Client node outbound message queue size 
exceeded slowClientQueueLimit, " +
@@ -3031,9 +3086,20 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter implements Communicati
     /**
+     * Delegates to {@link #nodeAddresses(ClusterNode, boolean)}, passing
+     * {@code filterReachableAddresses} as the filter flag.
+     *
      * @param node Node.
      * @return Node addresses.
+     * @throws IgniteCheckedException If failed.
+     */
+    private Collection<InetSocketAddress> nodeAddresses(ClusterNode node) 
throws IgniteCheckedException {
+        return nodeAddresses(node, filterReachableAddresses);
+    }
+
+    /**
+     * @param node Node.
+     * @param filterReachableAddresses Filter addresses flag.
+     * @return Node addresses.
      * @throws IgniteCheckedException If node does not have addresses.
      */
-    private LinkedHashSet<InetSocketAddress> nodeAddresses(ClusterNode node) 
throws IgniteCheckedException {
+    public Collection<InetSocketAddress> nodeAddresses(ClusterNode node, 
boolean filterReachableAddresses)
+        throws IgniteCheckedException {
         Collection<String> rmtAddrs0 = 
node.attribute(createSpiAttributeName(ATTR_ADDRS));
         Collection<String> rmtHostNames0 = 
node.attribute(createSpiAttributeName(ATTR_HOST_NAMES));
         Integer boundPort = node.attribute(createSpiAttributeName(ATTR_PORT));
@@ -3114,7 +3180,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter 
implements Communicati
      * @throws IgniteCheckedException If failed.
      */
     protected GridCommunicationClient createTcpClient(ClusterNode node, int 
connIdx) throws IgniteCheckedException {
-        LinkedHashSet<InetSocketAddress> addrs = nodeAddresses(node);
+        Collection<InetSocketAddress> addrs = nodeAddresses(node);
 
         GridCommunicationClient client = null;
         IgniteCheckedException errs = null;
@@ -3132,6 +3198,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter 
implements Communicati
             int lastWaitingTimeout = 1;
 
             while (client == null) { // Reconnection on handshake timeout.
+                if (stopping)
+                    throw new IgniteSpiException("Node is stopping.");
+
                 if (addr.getAddress().isLoopbackAddress() && addr.getPort() == 
boundTcpPort) {
                     if (log.isDebugEnabled())
                         log.debug("Skipping local address [addr=" + addr +
@@ -3372,8 +3441,18 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter implements Communicati
                     "operating system firewall is disabled on local and remote 
hosts) " +
                     "[addrs=" + addrs + ']');
 
-            if (enableForcibleNodeKill) {
-                if (getSpiContext().node(node.id()) != null
+            boolean commErrResolve = false;
+
+            IgniteSpiContext ctx = getSpiContext();
+
+            if (connectionError(errs) && 
ctx.communicationFailureResolveSupported()) {
+                commErrResolve = true;
+
+                ctx.resolveCommunicationFailure(node, errs);
+            }
+
+            if (!commErrResolve && enableForcibleNodeKill) {
+                if (ctx.node(node.id()) != null
                     && (CU.clientNode(node) ||  
!CU.clientNode(getLocalNode())) &&
                     connectionError(errs)) {
                     String msg = "TcpCommunicationSpi failed to establish 
connection to node, node will be dropped from " +
@@ -3384,7 +3463,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter 
implements Communicati
                     else
                         U.warn(log, msg);
 
-                    getSpiContext().failNode(node.id(), "TcpCommunicationSpi 
failed to establish connection to node [" +
+                    ctx.failNode(node.id(), "TcpCommunicationSpi failed to 
establish connection to node [" +
                         "rmtNode=" + node +
                         ", errs=" + errs +
                         ", connectErrs=" + 
Arrays.toString(errs.getSuppressed()) + ']');
@@ -4590,77 +4669,6 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter implements Communicati
     /**
      *
      */
-    private static class ConnectionKey {
-        /** */
-        private final UUID nodeId;
-
-        /** */
-        private final int idx;
-
-        /** */
-        private final long connCnt;
-
-        /**
-         * @param nodeId Node ID.
-         * @param idx Connection index.
-         * @param connCnt Connection counter (set only for incoming 
connections).
-         */
-        ConnectionKey(UUID nodeId, int idx, long connCnt) {
-            this.nodeId = nodeId;
-            this.idx = idx;
-            this.connCnt = connCnt;
-        }
-
-        /**
-         * @return Connection counter.
-         */
-        long connectCount() {
-            return connCnt;
-        }
-
-        /**
-         * @return Node ID.
-         */
-        UUID nodeId() {
-            return nodeId;
-        }
-
-        /**
-         * @return Connection index.
-         */
-        int connectionIndex() {
-            return idx;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            if (this == o)
-                return true;
-
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            ConnectionKey key = (ConnectionKey) o;
-
-            return idx == key.idx && nodeId.equals(key.nodeId);
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            int res = nodeId.hashCode();
-            res = 31 * res + idx;
-            return res;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(ConnectionKey.class, this);
-        }
-    }
-
-    /**
-     *
-     */
     interface ConnectionPolicy {
         /**
          * @return Thread connection index.

http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionKey.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionKey.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionKey.java
new file mode 100644
index 0000000..0559df7
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionKey.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp.internal;
+
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Connection key: identifies a connection by node ID and connection index.
+ * <p>
+ * Only these two fields take part in {@link #equals(Object)} and {@link #hashCode()};
+ * the connection counter and the dummy flag are stored in the key but are not compared.
+ */
+public class ConnectionKey {
+    /** Node ID. */
+    private final UUID nodeId;
+
+    /** Connection index. */
+    private final int idx;
+
+    /** Connection counter (set only for incoming connections). */
+    private final long connCnt;
+
+    /** Flag indicating that session with this key is temporary. */
+    private final boolean dummy;
+
+    /**
+     * Creates ConnectionKey with false value of dummy flag.
+     *
+     * @param nodeId Node ID. Should be not null.
+     * @param idx Connection index.
+     * @param connCnt Connection counter (set only for incoming connections).
+     */
+    public ConnectionKey(@NotNull UUID nodeId, int idx, long connCnt) {
+        this(nodeId, idx, connCnt, false);
+    }
+
+    /**
+     * @param nodeId Node ID. Should be not null.
+     * @param idx Connection index.
+     * @param connCnt Connection counter (set only for incoming connections).
+     * @param dummy Indicates that session with this ConnectionKey is temporary
+     *              (for now dummy sessions are used only for Communication Failure Resolving process).
+     */
+    public ConnectionKey(@NotNull UUID nodeId, int idx, long connCnt, boolean 
dummy) {
+        this.nodeId = nodeId;
+        this.idx = idx;
+        this.connCnt = connCnt;
+        this.dummy = dummy;
+    }
+
+    /**
+     * @return Connection counter.
+     */
+    public long connectCount() {
+        return connCnt;
+    }
+
+    /**
+     * @return Node ID.
+     */
+    public UUID nodeId() {
+        return nodeId;
+    }
+
+    /**
+     * @return Connection index.
+     */
+    public int connectionIndex() {
+        return idx;
+    }
+
+    /**
+     * @return {@code True} if this ConnectionKey is dummy and serves temporary session.
+     */
+    public boolean dummy() {
+        return dummy;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Keys are equal when they are of the same class and have equal connection index and node ID.
+     */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        ConnectionKey key = (ConnectionKey) o;
+
+        return idx == key.idx && nodeId.equals(key.nodeId);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Computed from node ID and connection index, the same fields {@link #equals(Object)} compares.
+     */
+    @Override public int hashCode() {
+        int res = nodeId.hashCode();
+        res = 31 * res + idx;
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ConnectionKey.class, this);
+    }
+}

Reply via email to