http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 01a5a71..cebe4b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -56,6 +56,8 @@ import org.apache.ignite.internal.managers.deployment.GridDeployment; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; import org.apache.ignite.internal.managers.discovery.CustomEventListener; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageResultsCollector; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener; import org.apache.ignite.internal.processors.GridProcessorAdapter; @@ -63,6 +65,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheProcessor; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import 
org.apache.ignite.internal.util.future.GridFinishedFuture; @@ -147,6 +150,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** Query sequence number for message topic. */ private final AtomicLong seq = new AtomicLong(); + /** */ + private ContinuousRoutinesInfo routinesInfo; + + /** */ + private int discoProtoVer; + /** * @param ctx Kernal context. */ @@ -156,6 +165,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override public void start() throws IgniteCheckedException { + discoProtoVer = ctx.discovery().mutableCustomMessages() ? 1 : 2; + + if (discoProtoVer == 2) + routinesInfo = new ContinuousRoutinesInfo(); + if (ctx.config().isDaemon()) return; @@ -177,6 +191,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter { @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, StartRoutineDiscoveryMessage msg) { + assert discoProtoVer == 1 : discoProtoVer; + if (ctx.isStopping()) return; @@ -184,6 +200,20 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } }); + ctx.discovery().setCustomEventListener(StartRoutineDiscoveryMessageV2.class, + new CustomEventListener<StartRoutineDiscoveryMessageV2>() { + @Override public void onCustomEvent(AffinityTopologyVersion topVer, + ClusterNode snd, + StartRoutineDiscoveryMessageV2 msg) { + assert discoProtoVer == 2 : discoProtoVer; + + if (ctx.isStopping()) + return; + + processStartRequestV2(topVer, snd, msg); + } + }); + ctx.discovery().setCustomEventListener(StartRoutineAckDiscoveryMessage.class, new CustomEventListener<StartRoutineAckDiscoveryMessage>() { @Override public void onCustomEvent(AffinityTopologyVersion topVer, @@ -201,6 +231,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter { @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, StopRoutineDiscoveryMessage msg) { + if (discoProtoVer == 2) + 
routinesInfo.removeRoutine(msg.routineId); + if (ctx.isStopping()) return; @@ -222,32 +255,36 @@ public class GridContinuousProcessor extends GridProcessorAdapter { ctx.io().addMessageListener(TOPIC_CONTINUOUS, new GridMessageListener() { @Override public void onMessage(UUID nodeId, Object obj, byte plc) { - GridContinuousMessage msg = (GridContinuousMessage)obj; + if (obj instanceof ContinuousRoutineStartResultMessage) + processRoutineStartResultMessage(nodeId, (ContinuousRoutineStartResultMessage)obj); + else { + GridContinuousMessage msg = (GridContinuousMessage)obj; - if (msg.data() == null && msg.dataBytes() != null) { - try { - msg.data(U.unmarshal(marsh, msg.dataBytes(), U.resolveClassLoader(ctx.config()))); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to process message (ignoring): " + msg, e); + if (msg.data() == null && msg.dataBytes() != null) { + try { + msg.data(U.unmarshal(marsh, msg.dataBytes(), U.resolveClassLoader(ctx.config()))); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to process message (ignoring): " + msg, e); - return; + return; + } } - } - switch (msg.type()) { - case MSG_EVT_NOTIFICATION: - processNotification(nodeId, msg); + switch (msg.type()) { + case MSG_EVT_NOTIFICATION: + processNotification(nodeId, msg); - break; + break; - case MSG_EVT_ACK: - processMessageAck(msg); + case MSG_EVT_ACK: + processMessageAck(msg); - break; + break; - default: - assert false : "Unexpected message received: " + msg.type(); + default: + assert false : "Unexpected message received: " + msg.type(); + } } } }); @@ -341,6 +378,15 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) { + if (ctx.isDaemon()) + return; + + if (discoProtoVer == 2) { + routinesInfo.collectJoiningNodeData(dataBag); + + return; + } + Serializable data = getDiscoveryData(dataBag.joiningNodeId()); if (data != null) @@ -349,6 +395,15 @@ 
public class GridContinuousProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Override public void collectGridNodeData(DiscoveryDataBag dataBag) { + if (ctx.isDaemon()) + return; + + if (discoProtoVer == 2) { + routinesInfo.collectGridNodeData(dataBag); + + return; + } + Serializable data = getDiscoveryData(dataBag.joiningNodeId()); if (data != null) @@ -393,6 +448,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { return data; } + return null; } @@ -430,22 +486,118 @@ public class GridContinuousProcessor extends GridProcessorAdapter { @Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) { if (log.isDebugEnabled()) { log.debug("onJoiningNodeDataReceived [joining=" + data.joiningNodeId() + - ", loc=" + ctx.localNodeId() + - ", data=" + data.joiningNodeData() + - ']'); + ", loc=" + ctx.localNodeId() + + ", data=" + data.joiningNodeData() + + ']'); } - if (data.hasJoiningNodeData()) - onDiscoDataReceived((DiscoveryData) data.joiningNodeData()); + if (discoProtoVer == 2) { + if (data.hasJoiningNodeData()) { + ContinuousRoutinesJoiningNodeDiscoveryData nodeData = (ContinuousRoutinesJoiningNodeDiscoveryData) + data.joiningNodeData(); + + for (ContinuousRoutineInfo routineInfo : nodeData.startedRoutines) { + routinesInfo.addRoutineInfo(routineInfo); + + startDiscoveryDataRoutine(routineInfo); + } + } + } + else { + if (data.hasJoiningNodeData()) + onDiscoDataReceived((DiscoveryData) data.joiningNodeData()); + } } /** {@inheritDoc} */ @Override public void onGridDataReceived(GridDiscoveryData data) { - Map<UUID, Serializable> nodeSpecData = data.nodeSpecificData(); + if (discoProtoVer == 2) { + if (ctx.isDaemon()) + return; + + if (data.commonData() != null) { + ContinuousRoutinesCommonDiscoveryData commonData = + (ContinuousRoutinesCommonDiscoveryData)data.commonData(); + + for (ContinuousRoutineInfo routineInfo : commonData.startedRoutines) { + if (routinesInfo.routineExists(routineInfo.routineId)) + continue; + 
+ routinesInfo.addRoutineInfo(routineInfo); + + startDiscoveryDataRoutine(routineInfo); + } + } + } + else { + Map<UUID, Serializable> nodeSpecData = data.nodeSpecificData(); + + if (nodeSpecData != null) { + for (Map.Entry<UUID, Serializable> e : nodeSpecData.entrySet()) + onDiscoDataReceived((DiscoveryData) e.getValue()); + } + } + } + + /** + * @param routineInfo Routine info. + */ + private void startDiscoveryDataRoutine(ContinuousRoutineInfo routineInfo) { + IgnitePredicate<ClusterNode> nodeFilter = null; + + try { + if (routineInfo.nodeFilter != null) { + nodeFilter = U.unmarshal(marsh, routineInfo.nodeFilter, U.resolveClassLoader(ctx.config())); + + ctx.resource().injectGeneric(nodeFilter); + } + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to unmarshal continuous routine filter, ignore routine [" + + "routineId=" + routineInfo.routineId + + ", srcNodeId=" + routineInfo.srcNodeId + ']', e); + + return; + } + + if (nodeFilter == null || nodeFilter.apply(ctx.discovery().localNode())) { + GridContinuousHandler hnd; - if (nodeSpecData != null) { - for (Map.Entry<UUID, Serializable> e : nodeSpecData.entrySet()) - onDiscoDataReceived((DiscoveryData) e.getValue()); + try { + hnd = U.unmarshal(marsh, routineInfo.hnd, U.resolveClassLoader(ctx.config())); + + if (ctx.config().isPeerClassLoadingEnabled()) + hnd.p2pUnmarshal(routineInfo.srcNodeId, ctx); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to unmarshal continuous routine handler, ignore routine [" + + "routineId=" + routineInfo.routineId + + ", srcNodeId=" + routineInfo.srcNodeId + ']', e); + + return; + } + + try { + registerHandler(routineInfo.srcNodeId, + routineInfo.routineId, + hnd, + routineInfo.bufSize, + routineInfo.interval, + routineInfo.autoUnsubscribe, + false); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to register continuous routine handler, ignore routine [" + + "routineId=" + routineInfo.routineId + + ", srcNodeId=" + routineInfo.srcNodeId 
+ ']', e); + } + } + else { + if (log.isDebugEnabled()) { + log.debug("Do not register continuous routine, rejected by node filter [" + + "routineId=" + routineInfo.routineId + + ", srcNodeId=" + routineInfo.srcNodeId + ']'); + } } } @@ -564,13 +716,14 @@ public class GridContinuousProcessor extends GridProcessorAdapter { * @param rmtFilter Remote filter. * @param prjPred Projection predicate. * @return Routine ID. + * @throws IgniteCheckedException If failed. */ @SuppressWarnings("unchecked") public UUID registerStaticRoutine( String cacheName, CacheEntryUpdatedListener<?, ?> locLsnr, CacheEntryEventSerializableFilter rmtFilter, - @Nullable IgnitePredicate<ClusterNode> prjPred) { + @Nullable IgnitePredicate<ClusterNode> prjPred) throws IgniteCheckedException { String topicPrefix = "CONTINUOUS_QUERY_STATIC" + "_" + cacheName; CacheContinuousQueryHandler hnd = new CacheContinuousQueryHandler( @@ -589,6 +742,17 @@ public class GridContinuousProcessor extends GridProcessorAdapter { LocalRoutineInfo routineInfo = new LocalRoutineInfo(prjPred, hnd, 1, 0, true); + if (discoProtoVer == 2) { + routinesInfo.addRoutineInfo(createRoutineInfo( + ctx.localNodeId(), + routineId, + hnd, + prjPred, + routineInfo.bufSize, + routineInfo.interval, + routineInfo.autoUnsubscribe)); + } + locInfos.put(routineId, routineInfo); registerMessageListener(hnd); @@ -597,6 +761,40 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } /** + * @param srcNodeId Source node ID. + * @param routineId Routine ID. + * @param hnd Handler. + * @param nodeFilter Node filter. + * @param bufSize Handler buffer size. + * @param interval Time interval. + * @param autoUnsubscribe Auto unsubscribe flag. + * @return Routine info instance. + * @throws IgniteCheckedException If failed. 
+ */ + private ContinuousRoutineInfo createRoutineInfo( + UUID srcNodeId, + UUID routineId, + GridContinuousHandler hnd, + @Nullable IgnitePredicate<ClusterNode> nodeFilter, + int bufSize, + long interval, + boolean autoUnsubscribe) + throws IgniteCheckedException { + byte[] hndBytes = marsh.marshal(hnd); + + byte[] filterBytes = nodeFilter != null ? marsh.marshal(nodeFilter) : null; + + return new ContinuousRoutineInfo( + srcNodeId, + routineId, + hndBytes, + filterBytes, + bufSize, + interval, + autoUnsubscribe); + } + + /** * @param hnd Handler. * @param bufSize Buffer size. * @param interval Time interval. @@ -638,30 +836,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter { // Whether local node is included in routine. boolean locIncluded = prjPred == null || prjPred.apply(ctx.discovery().localNode()); - StartRequestData reqData = new StartRequestData(prjPred, hnd.clone(), bufSize, interval, autoUnsubscribe); + AbstractContinuousMessage msg; try { - if (ctx.config().isPeerClassLoadingEnabled()) { - // Handle peer deployment for projection predicate. - if (prjPred != null && !U.isGrid(prjPred.getClass())) { - Class cls = U.detectClass(prjPred); - - String clsName = cls.getName(); - - GridDeployment dep = ctx.deploy().deploy(cls, U.detectClassLoader(cls)); - - if (dep == null) - throw new IgniteDeploymentCheckedException("Failed to deploy projection predicate: " + prjPred); - - reqData.className(clsName); - reqData.deploymentInfo(new GridDeploymentInfoBean(dep)); - - reqData.p2pMarshal(marsh); - } - - // Handle peer deployment for other handler-specific objects. 
- reqData.handler().p2pMarshal(ctx); - } + msg = createStartMessage(routineId, hnd, bufSize, interval, autoUnsubscribe, prjPred); } catch (IgniteCheckedException e) { return new GridFinishedFuture<>(e); @@ -674,20 +852,26 @@ public class GridContinuousProcessor extends GridProcessorAdapter { return new GridFinishedFuture<>(new NodeStoppingException("Failed to start continuous query (node is stopping)")); try { - StartFuture fut = new StartFuture(ctx, routineId); + StartFuture fut = new StartFuture(routineId); startFuts.put(routineId, fut); try { - if (locIncluded || hnd.isQuery()) - registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true); + if (locIncluded || hnd.isQuery()) { + registerHandler(ctx.localNodeId(), + routineId, + hnd, + bufSize, + interval, + autoUnsubscribe, + true); + } - ctx.discovery().sendCustomEvent(new StartRoutineDiscoveryMessage(routineId, reqData, - reqData.handler().keepBinary())); - } - catch (IgniteCheckedException e) { - startFuts.remove(routineId); - locInfos.remove(routineId); + ctx.discovery().sendCustomEvent(msg); + } + catch (IgniteCheckedException e) { + startFuts.remove(routineId); + locInfos.remove(routineId); unregisterHandler(routineId, hnd, true); @@ -707,6 +891,92 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } /** + * @param routineId Routine ID. + * @param hnd Handler. + * @param bufSize Buffer size. + * @param interval Interval. + * @param autoUnsubscribe Auto unsubscribe flag. + * @param nodeFilter Node filter. + * @return Routine start message. + * @throws IgniteCheckedException If failed. 
+ */ + private AbstractContinuousMessage createStartMessage(UUID routineId, + GridContinuousHandler hnd, + int bufSize, + long interval, + boolean autoUnsubscribe, + @Nullable IgnitePredicate<ClusterNode> nodeFilter) + throws IgniteCheckedException + { + hnd = hnd.clone(); + + String clsName = null; + GridDeploymentInfoBean dep = null; + + if (ctx.config().isPeerClassLoadingEnabled()) { + // Handle peer deployment for projection predicate. + if (nodeFilter != null && !U.isGrid(nodeFilter.getClass())) { + Class cls = U.detectClass(nodeFilter); + + clsName = cls.getName(); + + GridDeployment dep0 = ctx.deploy().deploy(cls, U.detectClassLoader(cls)); + + if (dep0 == null) + throw new IgniteDeploymentCheckedException("Failed to deploy projection predicate: " + nodeFilter); + + dep = new GridDeploymentInfoBean(dep0); + } + + // Handle peer deployment for other handler-specific objects. + hnd.p2pMarshal(ctx); + } + + if (discoProtoVer == 1) { + StartRequestData reqData = new StartRequestData( + nodeFilter, + hnd, + bufSize, + interval, + autoUnsubscribe); + + if (clsName != null) { + reqData.className(clsName); + reqData.deploymentInfo(dep); + + reqData.p2pMarshal(marsh); + } + + return new StartRoutineDiscoveryMessage( + routineId, + reqData, + reqData.handler().keepBinary()); + } + else { + assert discoProtoVer == 2 : discoProtoVer; + + byte[] nodeFilterBytes = nodeFilter != null ? U.marshal(marsh, nodeFilter) : null; + byte[] hndBytes = U.marshal(marsh, hnd); + + StartRequestDataV2 reqData = new StartRequestDataV2(nodeFilterBytes, + hndBytes, + bufSize, + interval, + autoUnsubscribe); + + if (clsName != null) { + reqData.className(clsName); + reqData.deploymentInfo(dep); + } + + return new StartRoutineDiscoveryMessageV2( + routineId, + reqData, + hnd.keepBinary()); + } + } + + /** * @param hnd Handler. 
*/ private void registerMessageListener(GridContinuousHandler hnd) { @@ -760,29 +1030,38 @@ public class GridContinuousProcessor extends GridProcessorAdapter { doStop = true; } - if (doStop) { - // Unregister routine locally. - LocalRoutineInfo routine = locInfos.remove(routineId); - - // Finish if routine is not found (wrong ID is provided). - if (routine == null) { - stopFuts.remove(routineId); + if (doStop) { + boolean stop = false; - fut.onDone(); + // Unregister routine locally. + LocalRoutineInfo routine = locInfos.remove(routineId); - return fut; - } + if (routine != null) { + stop = true; // Unregister handler locally. unregisterHandler(routineId, routine.hnd, true); + } - try { - ctx.discovery().sendCustomEvent(new StopRoutineDiscoveryMessage(routineId)); - } - catch (IgniteCheckedException e) { - fut.onDone(e); + if (!stop && discoProtoVer == 2) + stop = routinesInfo.routineExists(routineId); + + // Finish if routine is not found (wrong ID is provided). + if (!stop) { + stopFuts.remove(routineId); + + fut.onDone(); + + return fut; } + try { + ctx.discovery().sendCustomEvent(new StopRoutineDiscoveryMessage(routineId)); + } + catch (IgniteCheckedException e) { + fut.onDone(e); + } + if (ctx.isStopping()) fut.onDone(); } @@ -924,6 +1203,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter { clientInfos.clear(); + if (discoProtoVer == 2) + routinesInfo.onClientDisconnected(locInfos.keySet()); + if (log.isDebugEnabled()) { log.debug("after onDisconnected [rmtInfos=" + rmtInfos + ", locInfos=" + locInfos + @@ -996,35 +1278,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter { StartFuture fut = startFuts.remove(msg.routineId()); if (fut != null) { - if (msg.errs().isEmpty()) { - LocalRoutineInfo routine = locInfos.get(msg.routineId()); - - // Update partition counters. 
- if (routine != null && routine.handler().isQuery()) { - Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode = msg.updateCountersPerNode(); - Map<Integer, T2<Long, Long>> cntrs = msg.updateCounters(); - - GridCacheAdapter<Object, Object> interCache = - ctx.cache().internalCache(routine.handler().cacheName()); - - GridCacheContext cctx = interCache != null ? interCache.context() : null; - - if (cctx != null && cntrsPerNode != null && !cctx.isLocal() && cctx.affinityNode()) - cntrsPerNode.put(ctx.localNodeId(), - toCountersMap(cctx.topology().localUpdateCounters(false))); - - routine.handler().updateCounters(topVer, cntrsPerNode, cntrs); - } - - fut.onRemoteRegistered(); - } - else { - IgniteCheckedException firstEx = F.first(msg.errs().values()); - - fut.onDone(firstEx); - - stopRoutine(msg.routineId()); - } + fut.onAllRemoteRegistered( + topVer, + msg.errs(), + msg.updateCountersPerNode(), + msg.updateCounters()); } } @@ -1138,6 +1396,199 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } /** + * @param sndId Sender node ID. + * @param msg Message. + */ + private void processRoutineStartResultMessage(UUID sndId, ContinuousRoutineStartResultMessage msg) { + StartFuture fut = startFuts.get(msg.routineId()); + + if (fut != null) + fut.onResult(sndId, msg); + } + + /** + * @param topVer Current topology version. + * @param snd Sender. + * @param msg Start request. 
+ */ + private void processStartRequestV2(final AffinityTopologyVersion topVer, + final ClusterNode snd, + final StartRoutineDiscoveryMessageV2 msg) { + StartRequestDataV2 reqData = msg.startRequestData(); + + ContinuousRoutineInfo routineInfo = new ContinuousRoutineInfo(snd.id(), + msg.routineId(), + reqData.handlerBytes(), + reqData.nodeFilterBytes(), + reqData.bufferSize(), + reqData.interval(), + reqData.autoUnsubscribe()); + + routinesInfo.addRoutineInfo(routineInfo); + + final DiscoCache discoCache = ctx.discovery().discoCache(topVer); + + // Should not use marshaller and send messages from discovery thread. + ctx.getSystemExecutorService().execute(new Runnable() { + @Override public void run() { + if (snd.id().equals(ctx.localNodeId())) { + StartFuture fut = startFuts.get(msg.routineId()); + + if (fut != null) + fut.initRemoteNodes(discoCache); + + return; + } + + StartRequestDataV2 reqData = msg.startRequestData(); + + Exception err = null; + + IgnitePredicate<ClusterNode> nodeFilter = null; + + byte[] cntrs = null; + + if (reqData.nodeFilterBytes() != null) { + try { + if (ctx.config().isPeerClassLoadingEnabled() && reqData.className() != null) { + String clsName = reqData.className(); + GridDeploymentInfo depInfo = reqData.deploymentInfo(); + + GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), + clsName, + clsName, + depInfo.userVersion(), + snd.id(), + depInfo.classLoaderId(), + depInfo.participants(), + null); + + if (dep == null) { + throw new IgniteDeploymentCheckedException("Failed to obtain deployment " + + "for class: " + clsName); + } + + nodeFilter = U.unmarshal(marsh, + reqData.nodeFilterBytes(), + U.resolveClassLoader(dep.classLoader(), ctx.config())); + } + else { + nodeFilter = U.unmarshal(marsh, + reqData.nodeFilterBytes(), + U.resolveClassLoader(ctx.config())); + } + + if (nodeFilter != null) + ctx.resource().injectGeneric(nodeFilter); + } + catch (Exception e) { + err = e; + + U.error(log, "Failed to unmarshal 
continuous routine filter [" + + "routineId=" + msg.routineId + + ", srcNodeId=" + snd.id() + ']', e); + } + } + + boolean register = err == null && + (nodeFilter == null || nodeFilter.apply(ctx.discovery().localNode())); + + if (register) { + try { + GridContinuousHandler hnd = U.unmarshal(marsh, + reqData.handlerBytes(), + U.resolveClassLoader(ctx.config())); + + if (ctx.config().isPeerClassLoadingEnabled()) + hnd.p2pUnmarshal(snd.id(), ctx); + + if (msg.keepBinary()) { + assert hnd instanceof CacheContinuousQueryHandler : hnd; + + ((CacheContinuousQueryHandler)hnd).keepBinary(true); + } + + GridContinuousHandler hnd0 = hnd instanceof GridMessageListenHandler ? + new GridMessageListenHandler((GridMessageListenHandler)hnd) : + hnd; + + registerHandler(snd.id(), + msg.routineId, + hnd0, + reqData.bufferSize(), + reqData.interval(), + reqData.autoUnsubscribe(), + false); + + if (hnd0.isQuery()) { + GridCacheProcessor proc = ctx.cache(); + + if (proc != null) { + GridCacheAdapter cache = ctx.cache().internalCache(hnd0.cacheName()); + + if (cache != null && !cache.isLocal() && cache.context().userCache()) { + CachePartitionPartialCountersMap cntrsMap = + cache.context().topology().localUpdateCounters(false); + + cntrs = U.marshal(marsh, cntrsMap); + } + } + } + } + catch (Exception e) { + err = e; + + U.error(log, "Failed to register continuous routine handler [" + + "routineId=" + msg.routineId + + ", srcNodeId=" + snd.id() + ']', e); + } + } + + sendMessageStartResult(snd, msg.routineId(), cntrs, err); + } + }); + } + + /** + * @param node Target node. + * @param routineId Routine ID. + * @param cntrsMapBytes Marshalled {@link CachePartitionPartialCountersMap}. + * @param err Start error if any. 
+ */ + private void sendMessageStartResult(final ClusterNode node, + final UUID routineId, + byte[] cntrsMapBytes, + final @Nullable Exception err) + { + byte[] errBytes = null; + + if (err != null) { + try { + errBytes = U.marshal(marsh, err); + } + catch (Exception e) { + U.error(log, "Failed to marshal routine start error: " + e, e); + } + } + + ContinuousRoutineStartResultMessage msg = new ContinuousRoutineStartResultMessage(routineId, + cntrsMapBytes, + errBytes, + err != null); + + try { + ctx.io().sendToGridTopic(node, TOPIC_CONTINUOUS, msg, SYSTEM_POOL, null); + } + catch (ClusterTopologyCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send routine start result, node failed: " + e); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send routine start result: " + e, e); + } + } + + /** * @param msg Message. */ private void processMessageAck(GridContinuousMessage msg) { @@ -1455,6 +1906,13 @@ public class GridContinuousProcessor extends GridProcessorAdapter { UUID nodeId = ((DiscoveryEvent)evt).eventNode().id(); + if (discoProtoVer == 2) { + routinesInfo.onNodeFail(nodeId); + + for (StartFuture fut : startFuts.values()) + fut.onNodeFail(nodeId); + } + clientInfos.remove(nodeId); // Unregister handlers created by left node. @@ -1894,10 +2352,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** * Future for start routine. */ - private static class StartFuture extends GridFutureAdapter<UUID> { - /** */ - private GridKernalContext ctx; - + private class StartFuture extends GridFutureAdapter<UUID> { /** Consume ID. */ private UUID routineId; @@ -1907,56 +2362,170 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** All remote listeners are registered. */ private volatile boolean rmt; - /** Timeout object. 
*/ - private volatile GridTimeoutObject timeoutObj; + /** */ + private final DiscoveryMessageResultsCollector<ContinuousRoutineStartResultMessage, RoutineRegisterResults> + resCollect; /** - * @param ctx Kernal context. * @param routineId Consume ID. */ - StartFuture(GridKernalContext ctx, UUID routineId) { - this.ctx = ctx; - + StartFuture(UUID routineId) { this.routineId = routineId; + + resCollect = new DiscoveryMessageResultsCollector<ContinuousRoutineStartResultMessage, RoutineRegisterResults>(ctx) { + @Override protected RoutineRegisterResults createResult(Map<UUID, NodeMessage<ContinuousRoutineStartResultMessage>> rcvd) { + Map<UUID, Exception> errs = null; + Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode = null; + + for (Map.Entry<UUID, NodeMessage<ContinuousRoutineStartResultMessage>> entry : rcvd.entrySet()) { + ContinuousRoutineStartResultMessage msg = entry.getValue().message(); + + if (msg == null) + continue; + + if (msg.error()) { + byte[] errBytes = msg.errorBytes(); + + Exception err = null; + + if (errBytes != null) { + try { + err = U.unmarshal(marsh, errBytes, U.resolveClassLoader(ctx.config())); + } + catch (Exception e) { + U.warn(log, "Failed to unmarhal continuous routine start error: " + e); + } + } + + if (err == null) { + err = new IgniteCheckedException("Failed to start continuous " + + "routine on node: " + entry.getKey()); + } + + if (errs == null) + errs = new HashMap<>(); + + errs.put(entry.getKey(), err); + } + else { + byte[] cntrsMapBytes = msg.countersMapBytes(); + + if (cntrsMapBytes != null) { + try { + CachePartitionPartialCountersMap cntrsMap = U.unmarshal( + marsh, + cntrsMapBytes, + U.resolveClassLoader(ctx.config())); + + if (cntrsPerNode == null) + cntrsPerNode = new HashMap<>(); + + cntrsPerNode.put(entry.getKey(), CachePartitionPartialCountersMap.toCountersMap(cntrsMap)); + } + catch (Exception e) { + U.warn(log, "Failed to unmarhal continuous query update counters: " + e); + } + } + } + } + + return new 
RoutineRegisterResults(discoCache.version(), errs, cntrsPerNode); + } + + @Override protected void onResultsCollected(RoutineRegisterResults res0) { + onAllRemoteRegistered(res0.topVer, res0.errs, res0.cntrsPerNode, null); + } + + @Override protected boolean waitForNode(DiscoCache discoCache, ClusterNode node) { + return !ctx.localNodeId().equals(node.id()); + } + }; } /** - * Called when local listener is registered. + * @param topVer Topology version. + * @param errs Errors. + * @param cntrsPerNode Update counters. + * @param cntrs Update counters. */ - public void onLocalRegistered() { - loc = true; + private void onAllRemoteRegistered( + AffinityTopologyVersion topVer, + @Nullable Map<UUID, ? extends Exception> errs, + Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode, + Map<Integer, T2<Long, Long>> cntrs) { + try { + if (errs == null || errs.isEmpty()) { + LocalRoutineInfo routine = locInfos.get(routineId); - if (rmt && !isDone()) - onDone(routineId); + // Update partition counters. + if (routine != null && routine.handler().isQuery()) { + GridCacheAdapter<Object, Object> interCache = + ctx.cache().internalCache(routine.handler().cacheName()); + + GridCacheContext cctx = interCache != null ? interCache.context() : null; + + if (cctx != null && cntrsPerNode != null && !cctx.isLocal() && cctx.affinityNode()) + cntrsPerNode.put(ctx.localNodeId(), + toCountersMap(cctx.topology().localUpdateCounters(false))); + + routine.handler().updateCounters(topVer, cntrsPerNode, cntrs); + } + + onRemoteRegistered(); + } + else { + Exception firstEx = F.first(errs.values()); + + onDone(firstEx); + + stopRoutine(routineId); + } + } + finally { + startFuts.remove(routineId, this); + } } /** - * Called when all remote listeners are registered. + * @param discoCache Discovery state. 
*/ - public void onRemoteRegistered() { - rmt = true; + void initRemoteNodes(DiscoCache discoCache) { + resCollect.init(discoCache); + } - if (loc && !isDone()) - onDone(routineId); + /** + * @param nodeId Node ID. + * @param msg Message. + */ + void onResult(UUID nodeId, ContinuousRoutineStartResultMessage msg) { + resCollect.onMessage(nodeId, msg); } /** - * @param timeoutObj Timeout object. + * @param nodeId Failed node ID. */ - public void addTimeoutObject(GridTimeoutObject timeoutObj) { - assert timeoutObj != null; + void onNodeFail(UUID nodeId) { + resCollect.onNodeFail(nodeId); + } - this.timeoutObj = timeoutObj; + /** + * Called when local listener is registered. + */ + void onLocalRegistered() { + loc = true; - ctx.timeout().addTimeoutObject(timeoutObj); + if (rmt && !isDone()) + onDone(routineId); } - /** {@inheritDoc} */ - @Override public boolean onDone(@Nullable UUID res, @Nullable Throwable err) { - if (timeoutObj != null) - ctx.timeout().removeTimeoutObject(timeoutObj); + /** + * Called when all remote listeners are registered. + */ + void onRemoteRegistered() { + rmt = true; - return super.onDone(res, err); + if (loc && !isDone()) + onDone(routineId); } /** {@inheritDoc} */ @@ -1966,6 +2535,33 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } /** + * + */ + private static class RoutineRegisterResults { + /** */ + private final AffinityTopologyVersion topVer; + + /** */ + private final Map<UUID, ? extends Exception> errs; + + /** */ + private final Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode; + + /** + * @param topVer Topology version. + * @param errs Errors. + * @param cntrsPerNode Update counters. + */ + RoutineRegisterResults(AffinityTopologyVersion topVer, + Map<UUID, ? extends Exception> errs, + Map<UUID, Map<Integer, T2<Long, Long>>> cntrsPerNode) { + this.topVer = topVer; + this.errs = errs; + this.cntrsPerNode = cntrsPerNode; + } + } + + /** * Future for stop routine. 
*/ private static class StopFuture extends GridFutureAdapter<Object> {
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestDataV2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestDataV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestDataV2.java new file mode 100644 index 0000000..c001616 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRequestDataV2.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.continuous; + +import java.io.Serializable; +import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * Start request data that carries the node filter and the handler as serialized byte arrays. + */ +class StartRequestDataV2 implements Serializable { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** Serialized node filter. */ + private byte[] nodeFilterBytes; + + /** Deployment class name. */ + private String clsName; + + /** Deployment info.
*/ + private GridDeploymentInfo depInfo; + + /** Serialized handler. */ + private byte[] hndBytes; + + /** Buffer size. */ + private int bufSize; + + /** Time interval. */ + private long interval; + + /** Automatic unsubscribe flag. */ + private boolean autoUnsubscribe; + + /** + * @param nodeFilterBytes Serialized node filter. + * @param hndBytes Serialized handler (asserted non-null). + * @param bufSize Buffer size (asserted positive). + * @param interval Time interval (asserted non-negative). + * @param autoUnsubscribe Automatic unsubscribe flag. + */ + StartRequestDataV2( + byte[] nodeFilterBytes, + byte[] hndBytes, + int bufSize, + long interval, + boolean autoUnsubscribe) { + assert hndBytes != null; + assert bufSize > 0; + assert interval >= 0; + + this.nodeFilterBytes = nodeFilterBytes; + this.hndBytes = hndBytes; + this.bufSize = bufSize; + this.interval = interval; + this.autoUnsubscribe = autoUnsubscribe; + } + + /** + * @return Serialized node filter. + */ + public byte[] nodeFilterBytes() { + return nodeFilterBytes; + } + + /** + * @return Deployment class name. + */ + public String className() { + return clsName; + } + + /** + * @param clsName New deployment class name. + */ + public void className(String clsName) { + this.clsName = clsName; + } + + /** + * @return Deployment info. + */ + public GridDeploymentInfo deploymentInfo() { + return depInfo; + } + + /** + * @param depInfo New deployment info. + */ + public void deploymentInfo(GridDeploymentInfo depInfo) { + this.depInfo = depInfo; + } + + /** + * @return Serialized handler. + */ + public byte[] handlerBytes() { + return hndBytes; + } + + /** + * @return Buffer size. + */ + public int bufferSize() { + return bufSize; + } + + /** + * @param bufSize New buffer size. + */ + public void bufferSize(int bufSize) { + this.bufSize = bufSize; + } + + /** + * @return Time interval. + */ + public long interval() { + return interval; + } + + /** + * @param interval New time interval.
+ */ + public void interval(long interval) { + this.interval = interval; + } + + /** + * @return Automatic unsubscribe flag. + */ + public boolean autoUnsubscribe() { + return autoUnsubscribe; + } + + /** + * @param autoUnsubscribe New automatic unsubscribe flag. + */ + public void autoUnsubscribe(boolean autoUnsubscribe) { + this.autoUnsubscribe = autoUnsubscribe; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(StartRequestDataV2.class, this); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java new file mode 100644 index 0000000..275765d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.continuous; + +import java.util.UUID; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * + */ +public class StartRoutineDiscoveryMessageV2 extends AbstractContinuousMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private static final int KEEP_BINARY_FLAG = 0x01; + + /** */ + private final StartRequestDataV2 startReqData; + + /** Flags. */ + private int flags; + + /** + * @param routineId Routine id. + * @param startReqData Start request data. + * @param keepBinary Keep binary flag. + */ + StartRoutineDiscoveryMessageV2(UUID routineId, StartRequestDataV2 startReqData, boolean keepBinary) { + super(routineId); + + this.startReqData = startReqData; + + if (keepBinary) + flags |= KEEP_BINARY_FLAG; + } + + /** + * @return Start request data. + */ + public StartRequestDataV2 startRequestData() { + return startReqData; + } + + /** + * @return {@code True} if keep binary flag was set on continuous handler. 
+ */ + public boolean keepBinary() { + return (flags & KEEP_BINARY_FLAG) != 0; + } + + /** {@inheritDoc} */ + @Override public DiscoveryCustomMessage ackMessage() { + return null; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(StartRoutineDiscoveryMessageV2.class, this, "routineId", routineId()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java index 79d8b29..dfba0e7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java @@ -42,6 +42,11 @@ public class StopRoutineAckDiscoveryMessage extends AbstractContinuousMessage { } /** {@inheritDoc} */ + @Override public boolean stopProcess() { + return false; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(StopRoutineAckDiscoveryMessage.class, this, "routineId", routineId()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index 4a893f4..8cad342 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -47,6 +47,7 @@ import javax.cache.expiry.ExpiryPolicy; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteDataStreamerTimeoutException; import org.apache.ignite.IgniteException; @@ -100,6 +101,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.GPC; @@ -1059,6 +1061,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed return; while (true) { + if (disconnectErr != null) + throw disconnectErr; + Queue<IgniteInternalFuture<?>> q = null; for (Buffer buf : bufMappings.values()) { @@ -1826,15 +1831,19 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed catch (IgniteCheckedException e) { GridFutureAdapter<Object> fut0 = ((GridFutureAdapter<Object>)fut); - try { - if (ctx.discovery().alive(node) && ctx.discovery().pingNode(node.id())) - fut0.onDone(e); - else - fut0.onDone(new ClusterTopologyCheckedException("Failed to send request (node has left): " - + node.id())); - } - catch (IgniteClientDisconnectedCheckedException e0) { - fut0.onDone(e0); + if (X.hasCause(e, IgniteClientDisconnectedCheckedException.class, IgniteClientDisconnectedException.class)) + fut0.onDone(e); + else { + try { + 
if (ctx.discovery().alive(node) && ctx.discovery().pingNode(node.id())) + fut0.onDone(e); + else + fut0.onDone(new ClusterTopologyCheckedException("Failed to send request (node has left): " + + node.id())); + } + catch (IgniteClientDisconnectedCheckedException e0) { + fut0.onDone(e0); + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java index 7af0559..80e3f7d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingAcceptedMessage.java @@ -63,6 +63,11 @@ public class MappingAcceptedMessage implements DiscoveryCustomMessage { } /** {@inheritDoc} */ + @Override public boolean stopProcess() { + return false; + } + + /** {@inheritDoc} */ @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer, DiscoCache discoCache) { throw new UnsupportedOperationException(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java index b4e13fb..9358585 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappingProposedMessage.java @@ -98,6 +98,11 @@ public class MappingProposedMessage implements DiscoveryCustomMessage { } /** {@inheritDoc} */ + @Override public boolean stopProcess() { + return false; + } + + /** {@inheritDoc} */ @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, AffinityTopologyVersion topVer, DiscoCache discoCache) { throw new UnsupportedOperationException(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java index 2245b24..f802e09 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java @@ -59,6 +59,11 @@ public class SchemaFinishDiscoveryMessage extends SchemaAbstractDiscoveryMessage } /** {@inheritDoc} */ + @Override public boolean stopProcess() { + return false; + } + + /** {@inheritDoc} */ @Override public boolean exchange() { return false; } http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java index 0e1270b..62b6d6a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java @@ -60,6 +60,11 @@ public class SchemaProposeDiscoveryMessage extends SchemaAbstractDiscoveryMessag } /** {@inheritDoc} */ + @Override public boolean stopProcess() { + return false; + } + + /** {@inheritDoc} */ @Override public boolean exchange() { return exchange; } http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index e0ec8d1..0fcde0e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -491,6 +491,17 @@ public class GridNioServer<T> { /** * @param ses Session. + */ + public void closeFromWorkerThread(GridNioSession ses) { + assert ses instanceof GridSelectorNioSessionImpl : ses; + + GridSelectorNioSessionImpl ses0 = (GridSelectorNioSessionImpl)ses; + + ((AbstractNioClientWorker)ses0.worker()).close((GridSelectorNioSessionImpl)ses, null); + } + + /** + * @param ses Session. * @param msg Message. * @param createFut {@code True} if future should be created. * @param ackC Closure invoked when message ACK is received. 
@@ -2170,7 +2181,12 @@ public class GridNioServer<T> { dumpSelectorInfo(sb, keys); for (SelectionKey key : keys) { - GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment(); + GridNioKeyAttachment attach = (GridNioKeyAttachment)key.attachment(); + + if (!attach.hasSession()) + continue; + + GridSelectorNioSessionImpl ses = attach.session(); boolean sesInfo = p == null || p.apply(ses); http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java index 1754cc8..e8c27d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java @@ -963,5 +963,15 @@ public abstract class IgniteSpiAdapter implements IgniteSpi { @Override public Map<String, Object> nodeAttributes() { return Collections.emptyMap(); } + + /** {@inheritDoc} */ + @Override public boolean communicationFailureResolveSupported() { + return false; + } + + /** {@inheritDoc} */ + @Override public void resolveCommunicationFailure(ClusterNode node, Exception err) { + throw new UnsupportedOperationException(); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java index 108c4d4..d4402f4 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiContext.java @@ -365,4 +365,15 @@ public interface 
IgniteSpiContext { * @return Current node attributes. */ public Map<String, Object> nodeAttributes(); + + /** + * @return {@code True} if cluster supports communication error resolving. + */ + public boolean communicationFailureResolveSupported(); + + /** + * @param node Problem node. + * @param err Error. + */ + public void resolveCommunicationFailure(ClusterNode node, Exception err); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index e1addd8..4a0710e 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -32,6 +32,7 @@ import java.nio.channels.SocketChannel; import java.nio.channels.spi.AbstractInterruptibleChannel; import java.util.ArrayList; import java.util.Arrays; +import java.util.BitSet; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -66,12 +67,14 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener; import org.apache.ignite.internal.util.GridConcurrentFactory; import org.apache.ignite.internal.util.GridSpinReadWriteLock; import 
org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.internal.util.ipc.IpcEndpoint; import org.apache.ignite.internal.util.ipc.IpcToNioAdapter; import org.apache.ignite.internal.util.ipc.shmem.IpcOutOfSystemResourcesException; @@ -134,6 +137,9 @@ import org.apache.ignite.spi.IgniteSpiThread; import org.apache.ignite.spi.IgniteSpiTimeoutObject; import org.apache.ignite.spi.communication.CommunicationListener; import org.apache.ignite.spi.communication.CommunicationSpi; +import org.apache.ignite.spi.communication.tcp.internal.ConnectionKey; +import org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnectionCheckFuture; +import org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationNodeConnectionCheckFuture; import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage; import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage2; import org.apache.ignite.spi.communication.tcp.messages.NodeIdMessage; @@ -146,6 +152,7 @@ import org.jetbrains.annotations.Nullable; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META; +import static org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnectionCheckFuture.SES_FUT_META; import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.ALREADY_CONNECTED; import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.NEED_WAIT; import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.NODE_STOPPING; @@ -310,7 +317,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati private static final IgniteProductVersion 
VERSION_SINCE_CLIENT_COULD_WAIT_TO_CONNECT = IgniteProductVersion.fromString("2.1.4"); /** Connection index meta for session. */ - private static final int CONN_IDX_META = GridNioSessionMetaKey.nextUniqueKey(); + public static final int CONN_IDX_META = GridNioSessionMetaKey.nextUniqueKey(); /** Message tracker meta for session. */ private static final int TRACKER_META = GridNioSessionMetaKey.nextUniqueKey(); @@ -408,6 +415,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati ConnectionKey connId = ses.meta(CONN_IDX_META); if (connId != null) { + if (connId.dummy()) + return; + UUID id = connId.nodeId(); GridCommunicationClient[] nodeClients = clients.get(id); @@ -481,20 +491,22 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati if (rmtNode == null) { DiscoverySpi discoverySpi = ignite().configuration().getDiscoverySpi(); - assert discoverySpi instanceof TcpDiscoverySpi; - - TcpDiscoverySpi tcpDiscoverySpi = (TcpDiscoverySpi) discoverySpi; + boolean unknownNode = true; - ClusterNode node0 = tcpDiscoverySpi.getNode0(sndId); + if (discoverySpi instanceof TcpDiscoverySpi) { + TcpDiscoverySpi tcpDiscoverySpi = (TcpDiscoverySpi) discoverySpi; - boolean unknownNode = true; + ClusterNode node0 = tcpDiscoverySpi.getNode0(sndId); - if (node0 != null) { - assert node0.isClient() : node0; + if (node0 != null) { + assert node0.isClient() : node0; - if (node0.version().compareTo(VERSION_SINCE_CLIENT_COULD_WAIT_TO_CONNECT) >= 0) - unknownNode = false; + if (node0.version().compareTo(VERSION_SINCE_CLIENT_COULD_WAIT_TO_CONNECT) >= 0) + unknownNode = false; + } } + else if (discoverySpi instanceof IgniteDiscoverySpi) + unknownNode = !((IgniteDiscoverySpi) discoverySpi).knownNode(sndId); if (unknownNode) { U.warn(log, "Close incoming connection, unknown node [nodeId=" + sndId + ", ses=" + ses + ']'); @@ -709,9 +721,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati } } else { - 
metricsLsnr.onMessageReceived(msg, connKey.nodeId()); - if (msg instanceof RecoveryLastReceivedMessage) { + metricsLsnr.onMessageReceived(msg, connKey.nodeId()); + GridNioRecoveryDescriptor recovery = ses.outRecoveryDescriptor(); if (recovery != null) { @@ -724,9 +736,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati } recovery.ackReceived(msg0.received()); - - return; } + + return; } else { GridNioRecoveryDescriptor recovery = ses.inRecoveryDescriptor(); @@ -746,8 +758,23 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati recovery.lastAcknowledged(rcvCnt); } } + else if (connKey.dummy()) { + assert msg instanceof NodeIdMessage : msg; + + TcpCommunicationNodeConnectionCheckFuture fut = ses.meta(SES_FUT_META); + + assert fut != null : msg; + + fut.onConnected(U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes(), 0)); + + nioSrvr.closeFromWorkerThread(ses); + + return; + } } + metricsLsnr.onMessageReceived(msg, connKey.nodeId()); + IgniteRunnable c; if (msgQueueLimit > 0) { @@ -2112,6 +2139,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati } } + /** + * @return Bound TCP server port. + */ + public int boundPort() { + return boundTcpPort; + } + /** {@inheritDoc} */ @Override public void spiStart(String igniteInstanceName) throws IgniteSpiException { assert locHost != null; @@ -2570,6 +2604,27 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati } /** + * @param nodes Nodes to check connection with. + * @return Result future (each bit in result BitSet contains connection status to corresponding node). + */ + public IgniteFuture<BitSet> checkConnection(List<ClusterNode> nodes) { + TcpCommunicationConnectionCheckFuture fut = new TcpCommunicationConnectionCheckFuture( + this, + log.getLogger(TcpCommunicationConnectionCheckFuture.class), + nioSrvr, + nodes); + + long timeout = failureDetectionTimeoutEnabled() ? 
failureDetectionTimeout() : connTimeout; + + if (log.isInfoEnabled()) + log.info("Start check connection process [nodeCnt=" + nodes.size() + ", timeout=" + timeout + ']'); + + fut.init(timeout); + + return new IgniteFutureImpl<>(fut); + } + + /** * Sends given message to destination node. Note that characteristics of the * exchange such as durability, guaranteed delivery or error notification is * dependant on SPI implementation. @@ -3010,7 +3065,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati ConnectionKey id = ses.meta(CONN_IDX_META); if (id != null) { - ClusterNode node = getSpiContext().node(id.nodeId); + ClusterNode node = getSpiContext().node(id.nodeId()); if (node != null && node.isClient()) { String msg = "Client node outbound message queue size exceeded slowClientQueueLimit, " + @@ -3031,9 +3086,20 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati /** * @param node Node. * @return Node addresses. + * @throws IgniteCheckedException If failed. + */ + private Collection<InetSocketAddress> nodeAddresses(ClusterNode node) throws IgniteCheckedException { + return nodeAddresses(node, filterReachableAddresses); + } + + /** + * @param node Node. + * @param filterReachableAddresses Filter addresses flag. + * @return Node addresses. * @throws IgniteCheckedException If node does not have addresses. 
*/ - private LinkedHashSet<InetSocketAddress> nodeAddresses(ClusterNode node) throws IgniteCheckedException { + public Collection<InetSocketAddress> nodeAddresses(ClusterNode node, boolean filterReachableAddresses) + throws IgniteCheckedException { Collection<String> rmtAddrs0 = node.attribute(createSpiAttributeName(ATTR_ADDRS)); Collection<String> rmtHostNames0 = node.attribute(createSpiAttributeName(ATTR_HOST_NAMES)); Integer boundPort = node.attribute(createSpiAttributeName(ATTR_PORT)); @@ -3114,7 +3180,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati * @throws IgniteCheckedException If failed. */ protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException { - LinkedHashSet<InetSocketAddress> addrs = nodeAddresses(node); + Collection<InetSocketAddress> addrs = nodeAddresses(node); GridCommunicationClient client = null; IgniteCheckedException errs = null; @@ -3132,6 +3198,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati int lastWaitingTimeout = 1; while (client == null) { // Reconnection on handshake timeout. 
+ if (stopping) + throw new IgniteSpiException("Node is stopping."); + if (addr.getAddress().isLoopbackAddress() && addr.getPort() == boundTcpPort) { if (log.isDebugEnabled()) log.debug("Skipping local address [addr=" + addr + @@ -3372,8 +3441,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati "operating system firewall is disabled on local and remote hosts) " + "[addrs=" + addrs + ']'); - if (enableForcibleNodeKill) { - if (getSpiContext().node(node.id()) != null + boolean commErrResolve = false; + + IgniteSpiContext ctx = getSpiContext(); + + if (connectionError(errs) && ctx.communicationFailureResolveSupported()) { + commErrResolve = true; + + ctx.resolveCommunicationFailure(node, errs); + } + + if (!commErrResolve && enableForcibleNodeKill) { + if (ctx.node(node.id()) != null && (CU.clientNode(node) || !CU.clientNode(getLocalNode())) && connectionError(errs)) { String msg = "TcpCommunicationSpi failed to establish connection to node, node will be dropped from " + @@ -3384,7 +3463,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati else U.warn(log, msg); - getSpiContext().failNode(node.id(), "TcpCommunicationSpi failed to establish connection to node [" + + ctx.failNode(node.id(), "TcpCommunicationSpi failed to establish connection to node [" + "rmtNode=" + node + ", errs=" + errs + ", connectErrs=" + Arrays.toString(errs.getSuppressed()) + ']'); @@ -4590,77 +4669,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati /** * */ - private static class ConnectionKey { - /** */ - private final UUID nodeId; - - /** */ - private final int idx; - - /** */ - private final long connCnt; - - /** - * @param nodeId Node ID. - * @param idx Connection index. - * @param connCnt Connection counter (set only for incoming connections). 
- */ - ConnectionKey(UUID nodeId, int idx, long connCnt) { - this.nodeId = nodeId; - this.idx = idx; - this.connCnt = connCnt; - } - - /** - * @return Connection counter. - */ - long connectCount() { - return connCnt; - } - - /** - * @return Node ID. - */ - UUID nodeId() { - return nodeId; - } - - /** - * @return Connection index. - */ - int connectionIndex() { - return idx; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - - if (o == null || getClass() != o.getClass()) - return false; - - ConnectionKey key = (ConnectionKey) o; - - return idx == key.idx && nodeId.equals(key.nodeId); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = nodeId.hashCode(); - res = 31 * res + idx; - return res; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(ConnectionKey.class, this); - } - } - - /** - * - */ interface ConnectionPolicy { /** * @return Thread connection index. http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionKey.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionKey.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionKey.java new file mode 100644 index 0000000..0559df7 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/ConnectionKey.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.communication.tcp.internal; + +import java.util.UUID; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.NotNull; + +/** + * Connection Key. + */ +public class ConnectionKey { + /** */ + private final UUID nodeId; + + /** */ + private final int idx; + + /** */ + private final long connCnt; + + /** */ + private final boolean dummy; + + /** + * Creates ConnectionKey with false value of dummy flag. + * + * @param nodeId Node ID. Should be not null. + * @param idx Connection index. + * @param connCnt Connection counter (set only for incoming connections). + */ + public ConnectionKey(@NotNull UUID nodeId, int idx, long connCnt) { + this(nodeId, idx, connCnt, false); + } + + /** + * @param nodeId Node ID. Should be not null. + * @param idx Connection index. + * @param connCnt Connection counter (set only for incoming connections). + * @param dummy Indicates that session with this ConnectionKey is temporary + * (for now dummy sessions are used only for Communication Failure Resolving process). + */ + public ConnectionKey(@NotNull UUID nodeId, int idx, long connCnt, boolean dummy) { + this.nodeId = nodeId; + this.idx = idx; + this.connCnt = connCnt; + this.dummy = dummy; + } + + /** + * @return Connection counter. + */ + public long connectCount() { + return connCnt; + } + + /** + * @return Node ID. + */ + public UUID nodeId() { + return nodeId; + } + + /** + * @return Connection index. 
+ */ + public int connectionIndex() { + return idx; + } + + /** + * @return {@code True} if this ConnectionKey is dummy and serves temporary session. + */ + public boolean dummy() { + return dummy; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + ConnectionKey key = (ConnectionKey) o; + + return idx == key.idx && nodeId.equals(key.nodeId); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = nodeId.hashCode(); + res = 31 * res + idx; + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ConnectionKey.class, this); + } +}