http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java new file mode 100644 index 0000000..c42fa57 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationConnectionCheckFuture.java @@ -0,0 +1,519 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.spi.communication.tcp.internal; + +import java.net.InetSocketAddress; +import java.nio.channels.SocketChannel; +import java.util.BitSet; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.events.Event; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; +import org.apache.ignite.internal.util.GridLeanMap; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.nio.GridNioServer; +import org.apache.ignite.internal.util.nio.GridNioSession; +import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.spi.IgniteSpiTimeoutObject; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; + +/** + * Tcp Communication Connection Check Future. + */ +public class TcpCommunicationConnectionCheckFuture extends GridFutureAdapter<BitSet> implements IgniteSpiTimeoutObject, GridLocalEventListener { + /** Session future. 
*/ + public static final int SES_FUT_META = GridNioSessionMetaKey.nextUniqueKey(); + + /** */ + private static final AtomicIntegerFieldUpdater<SingleAddressConnectFuture> connFutDoneUpdater = + AtomicIntegerFieldUpdater.newUpdater(SingleAddressConnectFuture.class, "done"); + + /** */ + private static final AtomicIntegerFieldUpdater<MultipleAddressesConnectFuture> connResCntUpdater = + AtomicIntegerFieldUpdater.newUpdater(MultipleAddressesConnectFuture.class, "resCnt"); + + /** */ + private final AtomicInteger resCntr = new AtomicInteger(); + + /** */ + private final List<ClusterNode> nodes; + + /** */ + private volatile ConnectFuture[] futs; + + /** */ + private final GridNioServer nioSrvr; + + /** */ + private final TcpCommunicationSpi spi; + + /** */ + private final IgniteUuid timeoutObjId = IgniteUuid.randomUuid(); + + /** */ + private final BitSet resBitSet; + + /** */ + private long endTime; + + /** */ + private final IgniteLogger log; + + /** + * @param spi SPI instance. + * @param log Logger. + * @param nioSrvr NIO server. + * @param nodes Nodes to check. + */ + public TcpCommunicationConnectionCheckFuture(TcpCommunicationSpi spi, + IgniteLogger log, + GridNioServer nioSrvr, + List<ClusterNode> nodes) + { + this.spi = spi; + this.log = log; + this.nioSrvr = nioSrvr; + this.nodes = nodes; + + resBitSet = new BitSet(nodes.size()); + } + + /** + * @param timeout Connect timeout. 
+ */ + public void init(long timeout) { + ConnectFuture[] futs = new ConnectFuture[nodes.size()]; + + UUID locId = spi.getSpiContext().localNode().id(); + + for (int i = 0; i < nodes.size(); i++) { + ClusterNode node = nodes.get(i); + + if (!node.id().equals(locId)) { + if (spi.getSpiContext().node(node.id()) == null) { + receivedConnectionStatus(i, false); + + continue; + } + + Collection<InetSocketAddress> addrs; + + try { + addrs = spi.nodeAddresses(node, false); + } + catch (Exception e) { + U.error(log, "Failed to get node addresses: " + node, e); + + receivedConnectionStatus(i, false); + + continue; + } + + if (addrs.size() == 1) { + SingleAddressConnectFuture fut = new SingleAddressConnectFuture(i); + + fut.init(addrs.iterator().next(), node.id()); + + futs[i] = fut; + } + else { + MultipleAddressesConnectFuture fut = new MultipleAddressesConnectFuture(i); + + fut.init(addrs, node.id()); + + futs[i] = fut; + } + } + else + receivedConnectionStatus(i, true); + } + + this.futs = futs; + + spi.getSpiContext().addLocalEventListener(this, EVT_NODE_LEFT, EVT_NODE_FAILED); + + if (!isDone()) { + endTime = System.currentTimeMillis() + timeout; + + spi.getSpiContext().addTimeoutObject(this); + } + } + + /** + * @param idx Node index. + * @param res Success flag. + */ + private void receivedConnectionStatus(int idx, boolean res) { + assert resCntr.get() < nodes.size(); + + synchronized (resBitSet) { + resBitSet.set(idx, res); + } + + if (resCntr.incrementAndGet() == nodes.size()) + onDone(resBitSet); + } + + /** + * @param nodeIdx Node index. + * @return Node ID. 
+ */ + private UUID nodeId(int nodeIdx) { + return nodes.get(nodeIdx).id(); + } + + /** {@inheritDoc} */ + @Override public IgniteUuid id() { + return timeoutObjId; + } + + /** {@inheritDoc} */ + @Override public long endTime() { + return endTime; + } + + /** {@inheritDoc} */ + @Override public void onEvent(Event evt) { + if (isDone()) + return; + + assert evt instanceof DiscoveryEvent : evt; + assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ; + + UUID nodeId = ((DiscoveryEvent)evt).eventNode().id(); + + for (int i = 0; i < nodes.size(); i++) { + if (nodes.get(i).id().equals(nodeId)) { + ConnectFuture fut = futs[i]; + + if (fut != null) + fut.onNodeFailed(); + + return; + } + } + } + + /** {@inheritDoc} */ + @Override public void onTimeout() { + if (isDone()) + return; + + ConnectFuture[] futs = this.futs; + + for (int i = 0; i < futs.length; i++) { + ConnectFuture fut = futs[i]; + + if (fut != null) + fut.onTimeout(); + } + } + + /** {@inheritDoc} */ + @Override public boolean onDone(@Nullable BitSet res, @Nullable Throwable err) { + if (super.onDone(res, err)) { + spi.getSpiContext().removeTimeoutObject(this); + + spi.getSpiContext().removeLocalEventListener(this); + + return true; + } + + return false; + } + + /** + * + */ + private interface ConnectFuture { + /** + * + */ + void onTimeout(); + + /** + * + */ + void onNodeFailed(); + } + + /** + * + */ + private class SingleAddressConnectFuture implements TcpCommunicationNodeConnectionCheckFuture, ConnectFuture { + /** */ + final int nodeIdx; + + /** */ + volatile int done; + + /** */ + Map<Integer, Object> sesMeta; + + /** */ + private SocketChannel ch; + + /** + * @param nodeIdx Node index. + */ + SingleAddressConnectFuture(int nodeIdx) { + this.nodeIdx = nodeIdx; + } + + /** + * @param addr Node address. + * @param rmtNodeId Id of node to open connection check session with. 
+ */ + public void init(InetSocketAddress addr, UUID rmtNodeId) { + boolean connect; + + try { + ch = SocketChannel.open(); + + ch.configureBlocking(false); + + ch.socket().setTcpNoDelay(true); + ch.socket().setKeepAlive(false); + + connect = ch.connect(addr); + } + catch (Exception e) { + finish(false); + + return; + } + + if (!connect) { + sesMeta = new GridLeanMap<>(3); + + // Set dummy key to identify connection-check outgoing connection. + sesMeta.put(TcpCommunicationSpi.CONN_IDX_META, new ConnectionKey(rmtNodeId, -1, -1, true)); + sesMeta.put(SES_FUT_META, this); + + nioSrvr.createSession(ch, sesMeta, true, new IgniteInClosure<IgniteInternalFuture<GridNioSession>>() { + @Override public void apply(IgniteInternalFuture<GridNioSession> fut) { + if (fut.error() != null) + finish(false); + } + }); + } + } + + /** + * + */ + @SuppressWarnings("unchecked") + void cancel() { + if (finish(false)) + nioSrvr.cancelConnect(ch, sesMeta); + } + + /** {@inheritDoc} */ + public void onTimeout() { + cancel(); + } + + /** {@inheritDoc} */ + public void onConnected(UUID rmtNodeId) { + finish(nodeId(nodeIdx).equals(rmtNodeId)); + } + + /** {@inheritDoc} */ + @Override public void onNodeFailed() { + cancel(); + } + + /** + * @param res Result. + * @return {@code True} if result was set by this call. + */ + public boolean finish(boolean res) { + if (connFutDoneUpdater.compareAndSet(this, 0, 1)) { + onStatusReceived(res); + + return true; + } + + return false; + } + + /** + * @param res Result. + */ + void onStatusReceived(boolean res) { + receivedConnectionStatus(nodeIdx, res); + } + } + + /** + * + */ + private class MultipleAddressesConnectFuture implements ConnectFuture { + /** */ + volatile int resCnt; + + /** */ + volatile SingleAddressConnectFuture[] futs; + + /** */ + final int nodeIdx; + + /** + * @param nodeIdx Node index. 
+ */ + MultipleAddressesConnectFuture(int nodeIdx) { + this.nodeIdx = nodeIdx; + + } + + /** {@inheritDoc} */ + @Override public void onNodeFailed() { + SingleAddressConnectFuture[] futs = this.futs; + + for (int i = 0; i < futs.length; i++) { + ConnectFuture fut = futs[i]; + + if (fut != null) + fut.onNodeFailed(); + } + } + + /** {@inheritDoc} */ + @Override public void onTimeout() { + SingleAddressConnectFuture[] futs = this.futs; + + for (int i = 0; i < futs.length; i++) { + ConnectFuture fut = futs[i]; + + if (fut != null) + fut.onTimeout(); + } + } + + /** + * @param addrs Node addresses. + * @param rmtNodeId Id of node to open connection check session with. + */ + void init(Collection<InetSocketAddress> addrs, UUID rmtNodeId) { + SingleAddressConnectFuture[] futs = new SingleAddressConnectFuture[addrs.size()]; + + for (int i = 0; i < addrs.size(); i++) { + SingleAddressConnectFuture fut = new SingleAddressConnectFuture(nodeIdx) { + @Override void onStatusReceived(boolean res) { + receivedAddressStatus(res); + } + }; + + futs[i] = fut; + } + + this.futs = futs; + + int idx = 0; + + for (InetSocketAddress addr : addrs) { + futs[idx++].init(addr, rmtNodeId); + + if (resCnt == Integer.MAX_VALUE) + return; + } + + // Close race. + if (done()) + cancelFutures(); + } + + /** + * @return {@code True} + */ + private boolean done() { + int resCnt0 = resCnt; + + return resCnt0 == Integer.MAX_VALUE || resCnt0 == futs.length; + } + + /** + * + */ + private void cancelFutures() { + SingleAddressConnectFuture[] futs = this.futs; + + if (futs != null) { + for (int i = 0; i < futs.length; i++) { + SingleAddressConnectFuture fut = futs[i]; + + fut.cancel(); + } + } + } + + /** + * @param res Result. 
+ */ + void receivedAddressStatus(boolean res) { + if (res) { + for (;;) { + int resCnt0 = resCnt; + + if (resCnt0 == Integer.MAX_VALUE) + return; + + if (connResCntUpdater.compareAndSet(this, resCnt0, Integer.MAX_VALUE)) { + receivedConnectionStatus(nodeIdx, true); + + cancelFutures(); // Cancel others connects if they are still in progress. + + return; + } + } + } + else { + for (;;) { + int resCnt0 = resCnt; + + if (resCnt0 == Integer.MAX_VALUE) + return; + + int resCnt1 = resCnt0 + 1; + + if (connResCntUpdater.compareAndSet(this, resCnt0, resCnt1)) { + if (resCnt1 == futs.length) + receivedConnectionStatus(nodeIdx, false); + + return; + } + } + } + } + } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationNodeConnectionCheckFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationNodeConnectionCheckFuture.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationNodeConnectionCheckFuture.java new file mode 100644 index 0000000..cbf27b5 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpCommunicationNodeConnectionCheckFuture.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.communication.tcp.internal; + +import java.util.UUID; + +/** + * Tcp Communication Node Connection Check Future. + */ +public interface TcpCommunicationNodeConnectionCheckFuture { + /** + * @param nodeId Remote node ID. 
+ */ + public void onConnected(UUID nodeId); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java index a0f9b75..f26ad33 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiCustomMessage.java @@ -30,12 +30,23 @@ import org.jetbrains.annotations.Nullable; */ public interface DiscoverySpiCustomMessage extends Serializable { /** - * Called when message passed the ring. + * Called when custom message has been handled by all nodes. + * + * @return Ack message or {@code null} if ack is not required. */ @Nullable public DiscoverySpiCustomMessage ackMessage(); /** - * @return {@code true} if message can be modified during listener notification. Changes will be send to next nodes. + * @return {@code True} if message can be modified during listener notification. Changes will be sent to next nodes. */ public boolean isMutable(); + + /** + * Called on discovery coordinator node after listener is notified. If returns {@code true} + * then message is not passed to other nodes, if after this method {@link #ackMessage()} returns non-null ack + * message, it is sent to all nodes. + * + * @return {@code True} if message should not be sent to all nodes. 
+ */ + public boolean stopProcess(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiMutableCustomMessageSupport.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiMutableCustomMessageSupport.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiMutableCustomMessageSupport.java new file mode 100644 index 0000000..37aa323 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiMutableCustomMessageSupport.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * This annotation is for all implementations of {@link DiscoverySpi} that support + * topology mutable {@link DiscoverySpiCustomMessage}s. 
+ */ +@Documented +@Inherited +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE}) +public @interface DiscoverySpiMutableCustomMessageSupport { + /** + * @return Whether or not target SPI supports mutable {@link DiscoverySpiCustomMessage}s. + */ + public boolean value(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 2d9a314..f0a5186 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -55,6 +55,8 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.AddressResolver; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; +import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.F; @@ -88,6 +90,7 @@ import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.DiscoverySpiDataExchange; import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport; import org.apache.ignite.spi.discovery.DiscoverySpiListener; +import org.apache.ignite.spi.discovery.DiscoverySpiMutableCustomMessageSupport; import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator; import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport; 
import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket; @@ -103,6 +106,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAuthFailedMessag import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCheckFailedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDuplicateIdMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryEnsureDelivery; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessage; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.IgniteSystemProperties.IGNITE_CONSISTENT_ID_BY_HOST_WITHOUT_PORT; @@ -223,7 +227,8 @@ import static org.apache.ignite.IgniteSystemProperties.getBoolean; @IgniteSpiMultipleInstancesSupport(true) @DiscoverySpiOrderSupport(true) @DiscoverySpiHistorySupport(true) -public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi { +@DiscoverySpiMutableCustomMessageSupport(true) +public class TcpDiscoverySpi extends IgniteSpiAdapter implements IgniteDiscoverySpi { /** Node attribute that is mapped to node's external addresses (value is <tt>disc.tcp.ext-addrs</tt>). */ public static final String ATTR_EXT_ADDRS = "disc.tcp.ext-addrs"; @@ -409,6 +414,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi { /** */ protected IgniteSpiContext spiCtx; + /** */ + private IgniteDiscoverySpiInternalListener internalLsnr; + /** * Gets current SPI state. 
* @@ -473,6 +481,13 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi { /** {@inheritDoc} */ @Override public void sendCustomEvent(DiscoverySpiCustomMessage msg) throws IgniteException { + IgniteDiscoverySpiInternalListener internalLsnr = this.internalLsnr; + + if (internalLsnr != null) { + if (!internalLsnr.beforeSendCustomEvent(this, log, msg)) + return; + } + impl.sendCustomEvent(msg); } @@ -1559,6 +1574,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi { OutputStream out, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { + if (internalLsnr != null && msg instanceof TcpDiscoveryJoinRequestMessage) + internalLsnr.beforeJoin(locNode, log); + assert sock != null; assert msg != null; assert out != null; @@ -2118,15 +2136,31 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi { return ignite().configuration().getSslContextFactory() != null; } - /** - * Force reconnect to cluster. - * - * @throws IgniteSpiException If failed. 
- */ - public void reconnect() throws IgniteSpiException { + /** {@inheritDoc} */ + public void clientReconnect() throws IgniteSpiException { impl.reconnect(); } + /** {@inheritDoc} */ + @Override public boolean knownNode(UUID nodeId) { + return getNode0(nodeId) != null; + } + + /** {@inheritDoc} */ + @Override public boolean clientReconnectSupported() { + return !clientReconnectDisabled; + } + + /** {@inheritDoc} */ + @Override public boolean supportsCommunicationFailureResolve() { + return false; + } + + /** {@inheritDoc} */ + @Override public void resolveCommunicationFailure(ClusterNode node, Exception err) { + throw new UnsupportedOperationException(); + } + /** * <strong>FOR TEST ONLY!!!</strong> */ @@ -2148,6 +2182,11 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi { sndMsgLsnrs.add(lsnr); } + /** {@inheritDoc} */ + @Override public void setInternalListener(IgniteDiscoverySpiInternalListener lsnr) { + this.internalLsnr = lsnr; + } + /** * <strong>FOR TEST ONLY!!!</strong> */ @@ -2185,7 +2224,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi { * <p> * This method is intended for test purposes only. 
*/ - protected void simulateNodeFailure() { + public void simulateNodeFailure() { impl.simulateNodeFailure(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java index 01534f7..55fe4e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java @@ -33,9 +33,9 @@ import java.util.UUID; import org.apache.ignite.cache.CacheMetrics; import org.apache.ignite.cluster.ClusterMetrics; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.ClusterMetricsSnapshot; import org.apache.ignite.internal.IgniteNodeAttributes; +import org.apache.ignite.internal.managers.discovery.IgniteClusterNode; import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -58,7 +58,7 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_NODE_CONSISTE * <strong>This class is not intended for public use</strong> and has been made * <tt>public</tt> due to certain limitations of Java technology. 
*/ -public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements ClusterNode, +public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements IgniteClusterNode, Comparable<TcpDiscoveryNode>, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -291,26 +291,14 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste return metrics; } - /** - * Sets node metrics. - * - * @param metrics Node metrics. - */ + /** {@inheritDoc} */ public void setMetrics(ClusterMetrics metrics) { assert metrics != null; this.metrics = metrics; } - /** - * Gets collections of cache metrics for this node. Note that node cache metrics are constantly updated - * and provide up to date information about caches. - * <p> - * Cache metrics are updated with some delay which is directly related to metrics update - * frequency. For example, by default the update will happen every {@code 2} seconds. - * - * @return Runtime metrics snapshots for this node. - */ + /** {@inheritDoc} */ public Map<Integer, CacheMetrics> cacheMetrics() { if (metricsProvider != null) { Map<Integer, CacheMetrics> cacheMetrics0 = metricsProvider.cacheMetrics(); @@ -323,11 +311,7 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste return cacheMetrics; } - /** - * Sets node cache metrics. - * - * @param cacheMetrics Cache metrics. - */ + /** {@inheritDoc} */ public void setCacheMetrics(Map<Integer, CacheMetrics> cacheMetrics) { this.cacheMetrics = cacheMetrics != null ? cacheMetrics : Collections.<Integer, CacheMetrics>emptyMap(); } @@ -544,11 +528,7 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste return node; } - /** - * Whether this node is cache client (see {@link IgniteConfiguration#isClientMode()}). - * - * @return {@code True if client}. 
- */ + /** {@inheritDoc} */ public boolean isCacheClient() { if (!cacheCliInit) { cacheCli = CU.clientNodeDirect(this); http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/main/resources/META-INF/classnames.properties ---------------------------------------------------------------------- diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index f0f143d..6dc3d85 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -832,6 +832,7 @@ org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPar org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader$1 org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader$2 org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments +org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionCountersMap org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionCountersMap2 org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap @@ -1129,6 +1130,7 @@ org.apache.ignite.internal.processors.closure.GridClosureProcessor$T8 org.apache.ignite.internal.processors.closure.GridClosureProcessor$T9 org.apache.ignite.internal.processors.closure.GridClosureProcessor$TaskNoReduceAdapter org.apache.ignite.internal.processors.closure.GridPeerDeployAwareTaskAdapter +org.apache.ignite.internal.processors.cluster.ClusterNodeMetrics org.apache.ignite.internal.processors.cluster.BaselineTopology org.apache.ignite.internal.processors.cluster.BaselineTopologyHistory 
org.apache.ignite.internal.processors.cluster.BaselineTopologyHistoryItem http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionExcludeNeighborsAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionExcludeNeighborsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionExcludeNeighborsAbstractSelfTest.java index 900d4f5..eee47c7 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionExcludeNeighborsAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AffinityFunctionExcludeNeighborsAbstractSelfTest.java @@ -32,7 +32,6 @@ import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -124,12 +123,9 @@ public abstract class AffinityFunctionExcludeNeighborsAbstractSelfTest extends G Affinity<Object> aff = g.affinity(DEFAULT_CACHE_NAME); - List<TcpDiscoveryNode> top = new ArrayList<>(); + List<ClusterNode> top = new ArrayList<>(g.cluster().nodes()); - for (ClusterNode node : g.cluster().nodes()) - top.add((TcpDiscoveryNode) node); - - Collections.sort(top); + Collections.sort((List)top); assertEquals(grids, top.size()); http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java b/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java index 4e4d75a..5eca7d6 100644 --- a/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java +++ b/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java @@ -120,6 +120,10 @@ public class FailureHandlerTriggeredTest extends GridCommonAbstractTest { @Override public boolean isMutable() { return false; } + + @Override public boolean stopProcess() { + return false; + } }); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupHostsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupHostsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupHostsSelfTest.java index 2328c84..141f4af 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupHostsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupHostsSelfTest.java @@ -61,6 +61,9 @@ public class ClusterGroupHostsSelfTest extends GridCommonAbstractTest { * @throws Exception If failed. 
*/ public void testForHosts() throws Exception { + if (!tcpDiscovery()) + return; + Ignite ignite = grid(); assertEquals(1, ignite.cluster().forHost("h_1").nodes().size()); http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java index 9df561a..99006d1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java @@ -68,6 +68,8 @@ public class ClusterGroupSelfTest extends ClusterGroupAbstractTest { if (i == 0) ignite = g; } + + waitForTopology(NODES_CNT); } finally { Ignition.setClientMode(false); http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsUpdateTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsUpdateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsUpdateTest.java new file mode 100644 index 0000000..6e6b4a4 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/ClusterNodeMetricsUpdateTest.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import junit.framework.AssertionFailedError; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCompute; +import org.apache.ignite.Ignition; +import org.apache.ignite.cluster.ClusterGroup; +import org.apache.ignite.cluster.ClusterMetrics; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Checks the total node count and the executed-job counters reported by {@link ClusterMetrics} on every started node. + */ +public class ClusterNodeMetricsUpdateTest extends GridCommonAbstractTest { + /** IP finder set on the TCP discovery SPI of every node configured by this test. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Value passed to {@code setClientMode} for nodes configured after it is set. */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + cfg.setMetricsUpdateFrequency(500); + + cfg.setClientMode(client); + + return cfg; + } + + /** + * Starts {@code NODES / 2} nodes, sets the client flag and starts {@code NODES / 2} more, checks the metrics with + * zero expected jobs per node, then calls one {@code DummyCallable} on each node through the node returned by + * the first start call. + * <p> + * NOTE(review): the expected counts are set to 1 after each call but the metrics are not re-checked afterwards; + * confirm whether a final {@code checkMetrics} call was intended. + * + * @throws Exception If failed. 
+ */ + public void testMetrics() throws Exception { + int NODES = 6; + + Ignite srv0 = startGridsMultiThreaded(NODES / 2); + + client = true; + + startGridsMultiThreaded(NODES / 2, NODES / 2); + + Map<UUID, Integer> expJobs = new HashMap<>(); + + for (int i = 0; i < NODES; i++) + expJobs.put(nodeId(i), 0); + + checkMetrics(NODES, expJobs); + + for (int i = 0; i < NODES; i++) { + UUID nodeId = nodeId(i); + + IgniteCompute c = srv0.compute(srv0.cluster().forNodeId(nodeId(i))); + + c.call(new DummyCallable(null)); + + expJobs.put(nodeId, 1); + } + } + + /** + * Asserts, for every grid returned by {@code Ignition.allGrids()}, that the cluster metrics report the expected + * node count and the sum of the expected jobs, and that the metrics of each single-node group report that + * node's expected job count. + * + * @param expNodes Expected nodes. + * @param expJobs Expected jobs number per node. + */ + private void checkMetrics0(int expNodes, Map<UUID, Integer> expJobs) { + List<Ignite> nodes = Ignition.allGrids(); + + assertEquals(expNodes, nodes.size()); + assertEquals(expNodes, expJobs.size()); + + int totalJobs = 0; + + for (Integer c : expJobs.values()) + totalJobs += c; + + for (final Ignite ignite : nodes) { + ClusterMetrics m = ignite.cluster().metrics(); + + assertEquals(expNodes, m.getTotalNodes()); + assertEquals(totalJobs, m.getTotalExecutedJobs()); + + for (Map.Entry<UUID, Integer> e : expJobs.entrySet()) { + UUID nodeId = e.getKey(); + + ClusterGroup g = ignite.cluster().forNodeId(nodeId); + + ClusterMetrics nodeM = g.metrics(); + + assertEquals(e.getValue(), (Integer)nodeM.getTotalExecutedJobs()); + } + } + } + + /** + * Hands {@code GridTestUtils.waitForCondition} a predicate that runs {@code checkMetrics0} and maps an + * {@link AssertionFailedError} to {@code false}, with a timeout argument of 5000, and then runs + * {@code checkMetrics0} once more directly so that a remaining mismatch propagates to the caller. + * + * @param expNodes Expected nodes. + * @param expJobs Expected jobs number per node. + * @throws Exception If failed. 
+ */ + private void checkMetrics(final int expNodes, final Map<UUID, Integer> expJobs) throws Exception { + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + try { + checkMetrics0(expNodes, expJobs); + } + catch (AssertionFailedError e) { + return false; + } + + return true; + } + }, 5000); + + checkMetrics0(expNodes, expJobs); + } + + /** + * Callable that returns the byte array it was constructed with. + */ + private static class DummyCallable implements IgniteCallable<Object> { + /** Value returned from {@link #call()}. */ + private byte[] data; + + /** + * @param data Data. + */ + DummyCallable(byte[] data) { + this.data = data; + } + + /** {@inheritDoc} */ + @Override public Object call() throws Exception { + return data; + } + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java b/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java new file mode 100644 index 0000000..46d9edc --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/DiscoverySpiTestListener.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpiInternalListener; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.DiscoverySpi; +import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; +import org.apache.ignite.testframework.GridTestUtils; + +/** + * Test callback for discovery SPI. + * <p> + * Allows block/delay node join and custom event sending. 
+ */ +public class DiscoverySpiTestListener implements IgniteDiscoverySpiInternalListener { + /** Latch awaited in {@link #beforeJoin}; {@code null} until {@link #startBlockJoin()} is called. */ + private volatile CountDownLatch joinLatch; + + /** Message classes to block; {@code null} when blocking is off. Guarded by {@link #mux}. */ + private Set<Class<?>> blockCustomEvtCls; + + /** Monitor guarding {@link #blockCustomEvtCls} and {@link #blockedMsgs}. */ + private final Object mux = new Object(); + + /** Messages withheld so far. Guarded by {@link #mux}. */ + private List<DiscoverySpiCustomMessage> blockedMsgs = new ArrayList<>(); + + /** SPI passed to the most recent {@link #beforeSendCustomEvent} call. */ + private volatile DiscoverySpi spi; + + /** Logger passed to the most recent {@link #beforeSendCustomEvent} call. */ + private volatile IgniteLogger log; + + /** + * Installs a fresh latch so that subsequent {@link #beforeJoin} calls wait on it. + */ + public void startBlockJoin() { + joinLatch = new CountDownLatch(1); + } + + /** + * Releases the latch installed by {@link #startBlockJoin()}; must not be called before it. + */ + public void stopBlockJoin() { + joinLatch.countDown(); + } + + /** {@inheritDoc} */ + @Override public void beforeJoin(ClusterNode locNode, IgniteLogger log) { + try { + CountDownLatch writeLatch0 = joinLatch; + + if (writeLatch0 != null) { + log.info("Block join"); + + U.await(writeLatch0); + } + } + catch (Exception e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override public boolean beforeSendCustomEvent(DiscoverySpi spi, IgniteLogger log, DiscoverySpiCustomMessage msg) { + this.spi = spi; + this.log = log; + + synchronized (mux) { + if (blockCustomEvtCls != null) { + DiscoveryCustomMessage msg0 = GridTestUtils.getFieldValue(msg, "delegate"); + + if (blockCustomEvtCls.contains(msg0.getClass())) { + log.info("Block custom message: " + msg0); + + blockedMsgs.add(msg); + + mux.notifyAll(); + + return false; + } + } + } + + return true; + } + + /** + * Replaces the set of blocked message classes; asserts that no withheld message is pending. + * + * @param cls0 First message class to block. + * @param blockCustomEvtCls Further message classes to block. + */ + public void blockCustomEvent(Class<?> cls0, Class<?> ... blockCustomEvtCls) { + synchronized (mux) { + assert blockedMsgs.isEmpty() : blockedMsgs; + + this.blockCustomEvtCls = new HashSet<>(); + + this.blockCustomEvtCls.add(cls0); + + Collections.addAll(this.blockCustomEvtCls, blockCustomEvtCls); + } + } + + /** + * Waits until at least one custom message has been withheld. + * + * @throws InterruptedException If interrupted. 
+ */ + public void waitCustomEvent() throws InterruptedException { + synchronized (mux) { + while (blockedMsgs.isEmpty()) + mux.wait(); + } + } + + /** + * Stops blocking and resends every withheld message through the remembered SPI; does nothing if no custom + * event has been intercepted yet. The withheld list is drained under {@link #mux}, the same monitor that + * {@link #beforeSendCustomEvent} holds while adding to it, so a concurrent add cannot be lost. + */ + public void stopBlockCustomEvents() { + if (spi == null) + return; + + List<DiscoverySpiCustomMessage> msgs; + + synchronized (mux) { + msgs = new ArrayList<>(blockedMsgs); + + blockCustomEvtCls = null; + + blockedMsgs.clear(); + } + + for (DiscoverySpiCustomMessage msg : msgs) { + log.info("Resend blocked message: " + msg); + + spi.sendCustomEvent(msg); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java index e6b678b..883d677 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridDiscoverySelfTest.java @@ -49,6 +49,7 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.jetbrains.annotations.Nullable; import static java.util.concurrent.TimeUnit.MINUTES; +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.lang.IgniteProductVersion.fromString; @@ -158,10 +159,10 @@ public class GridDiscoverySelfTest extends GridCommonAbstractTest { final AtomicInteger cnt = new AtomicInteger(); - /** Joined nodes counter. */ + // Joined nodes counter. final CountDownLatch joinedCnt = new CountDownLatch(NODES_CNT); - /** Left nodes counter. */ + // Left nodes counter. 
final CountDownLatch leftCnt = new CountDownLatch(NODES_CNT); IgnitePredicate<Event> lsnr = new IgnitePredicate<Event>() { @@ -171,7 +172,7 @@ public class GridDiscoverySelfTest extends GridCommonAbstractTest { joinedCnt.countDown(); } - else if (EVT_NODE_LEFT == evt.type()) { + else if (EVT_NODE_LEFT == evt.type() || EVT_NODE_FAILED == evt.type()) { int i = cnt.decrementAndGet(); assert i >= 0; @@ -185,7 +186,10 @@ public class GridDiscoverySelfTest extends GridCommonAbstractTest { } }; - ignite.events().localListen(lsnr, EVT_NODE_LEFT, EVT_NODE_JOINED); + int[] evts = tcpDiscovery() ? new int[]{EVT_NODE_LEFT, EVT_NODE_JOINED} : + new int[]{EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_NODE_JOINED}; + + ignite.events().localListen(lsnr, evts); try { for (int i = 0; i < NODES_CNT; i++) @@ -242,6 +246,8 @@ public class GridDiscoverySelfTest extends GridCommonAbstractTest { for (int i = 0; i < NODES_CNT; i++) stopGrid(i); + waitForTopology(1); + final long topVer = discoMgr.topologyVersion(); assert topVer == topVer0 + NODES_CNT * 2 : "Unexpected topology version: " + topVer; http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java index cd6b2c0..a8be541 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridJobMasterLeaveAwareSelfTest.java @@ -259,6 +259,8 @@ public class GridJobMasterLeaveAwareSelfTest extends GridCommonAbstractTest { // Now we stop master grid. stopGrid(lastGridIdx, true); + waitForTopology(GRID_CNT - 1); + // Release communication SPI wait latches. 
As master node is stopped, job worker will receive and exception. for (int i = 0; i < lastGridIdx; i++) ((CommunicationSpi)grid(i).configuration().getCommunicationSpi()).releaseWaitLatch(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingSelfTest.java index f3a19aa..6824d51 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridJobStealingSelfTest.java @@ -187,6 +187,8 @@ public class GridJobStealingSelfTest extends GridCommonAbstractTest { public void testProjectionPredicateInternalStealing() throws Exception { final Ignite ignite3 = startGrid(3); + waitForTopology(3); + final UUID node1 = ignite1.cluster().localNode().id(); final UUID node3 = ignite3.cluster().localNode().id(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/GridSameVmStartupSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridSameVmStartupSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridSameVmStartupSelfTest.java index 66e9cf4..a04c38e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridSameVmStartupSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridSameVmStartupSelfTest.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal; import java.util.Collection; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.ignite.Ignite; import 
org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; @@ -75,8 +76,10 @@ public class GridSameVmStartupSelfTest extends GridCommonAbstractTest { ignite2.events().localListen(new IgnitePredicate<Event>() { @Override public boolean apply(Event evt) { - assert evt.type() != EVT_NODE_FAILED : - "Node1 did not exit gracefully."; + boolean tcpDiscovery = tcpDiscovery(); + + if (tcpDiscovery) + assert evt.type() != EVT_NODE_FAILED : "Node1 did not exit gracefully."; if (evt instanceof DiscoveryEvent) { // Local node can send METRICS_UPDATED event. @@ -86,8 +89,14 @@ public class GridSameVmStartupSelfTest extends GridCommonAbstractTest { ((DiscoveryEvent) evt).eventNode().id() + ", expected=" + grid1LocNodeId + ", type=" + evt.type() + ']'; - if (evt.type() == EVT_NODE_LEFT) - latch.countDown(); + if (tcpDiscovery) { + if (evt.type() == EVT_NODE_LEFT) + latch.countDown(); + } + else { + if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED) + latch.countDown(); + } } return true; @@ -96,7 +105,7 @@ public class GridSameVmStartupSelfTest extends GridCommonAbstractTest { stopGrid(1); - latch.await(); + assertTrue(latch.await(10, TimeUnit.SECONDS)); Collection<ClusterNode> top2 = ignite2.cluster().forRemotes().nodes(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java index 7e368cb..f71ffb0 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java @@ -45,6 +45,8 @@ public class GridSelfTest extends ClusterGroupAbstractTest { for (int i = 0; i < NODES_CNT; i++) startGrid(i); + + waitForTopology(NODES_CNT); } /** 
{@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java index fa9cc35..e68ea13 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal; import java.io.IOException; import java.io.OutputStream; import java.net.Socket; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -38,6 +39,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.Event; import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteFuture; @@ -143,6 +145,14 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra /** * @param ignite Node. + * @return Discovery SPI. + */ + protected static IgniteDiscoverySpi spi0(Ignite ignite) { + return ((IgniteDiscoverySpi)ignite.configuration().getDiscoverySpi()); + } + + /** + * @param ignite Node. * @return Communication SPI. */ protected BlockTcpCommunicationSpi commSpi(Ignite ignite) { @@ -185,16 +195,28 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra * @return Server node client connected to. 
*/ protected Ignite clientRouter(Ignite client) { - TcpDiscoveryNode node = (TcpDiscoveryNode)client.cluster().localNode(); + if (tcpDiscovery()) { + TcpDiscoveryNode node = (TcpDiscoveryNode)client.cluster().localNode(); + + assertTrue(node.isClient()); + assertNotNull(node.clientRouterNodeId()); - assertTrue(node.isClient()); - assertNotNull(node.clientRouterNodeId()); + Ignite srv = G.ignite(node.clientRouterNodeId()); - Ignite srv = G.ignite(node.clientRouterNodeId()); + assertNotNull(srv); + + return srv; + } + else { + for (Ignite node : G.allGrids()) { + if (!node.cluster().localNode().isClient()) + return node; + } - assertNotNull(srv); + fail(); - return srv; + return null; + } } /** @@ -251,15 +273,24 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra List<Ignite> clients, Ignite srv, @Nullable Runnable disconnectedC) throws Exception { - final TestTcpDiscoverySpi srvSpi = spi(srv); + final IgniteDiscoverySpi srvSpi = spi0(srv); final CountDownLatch disconnectLatch = new CountDownLatch(clients.size()); final CountDownLatch reconnectLatch = new CountDownLatch(clients.size()); log.info("Block reconnect."); - for (Ignite client : clients) - spi(client).writeLatch = new CountDownLatch(1); + List<DiscoverySpiTestListener> blockLsnrs = new ArrayList<>(); + + for (Ignite client : clients) { + DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener(); + + lsnr.startBlockJoin(); + + blockLsnrs.add(lsnr); + + spi0(client).setInternalListener(lsnr); + } IgnitePredicate<Event> p = new IgnitePredicate<Event>() { @Override public boolean apply(Event evt) { @@ -291,8 +322,8 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra log.info("Allow reconnect."); - for (Ignite client : clients) - spi(client).writeLatch.countDown(); + for (DiscoverySpiTestListener blockLsnr : blockLsnrs) + blockLsnr.stopBlockJoin(); waitReconnectEvent(log, reconnectLatch); 
http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java index 06bde99..43da2d1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectApiExceptionTest.java @@ -44,6 +44,7 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.CollectionConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.Event; +import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.U; @@ -51,6 +52,7 @@ import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.spi.discovery.DiscoverySpi; import org.apache.ignite.testframework.GridTestUtils; import static java.util.concurrent.TimeUnit.SECONDS; @@ -99,7 +101,7 @@ public class IgniteClientReconnectApiExceptionTest extends IgniteClientReconnect * @throws Exception If failed. 
*/ @SuppressWarnings("unchecked") - public void dataStructureOperationsTest() throws Exception { + private void dataStructureOperationsTest() throws Exception { clientMode = true; final Ignite client = startGrid(serverCount()); @@ -219,7 +221,7 @@ public class IgniteClientReconnectApiExceptionTest extends IgniteClientReconnect * @throws Exception If failed. */ @SuppressWarnings("unchecked") - public void cacheOperationsTest() throws Exception { + private void cacheOperationsTest() throws Exception { clientMode = true; final Ignite client = startGrid(serverCount()); @@ -537,7 +539,7 @@ public class IgniteClientReconnectApiExceptionTest extends IgniteClientReconnect * @throws Exception If failed. */ @SuppressWarnings("unchecked") - public void igniteOperationsTest() throws Exception { + private void igniteOperationsTest() throws Exception { clientMode = true; final Ignite client = startGrid(serverCount()); @@ -775,11 +777,11 @@ public class IgniteClientReconnectApiExceptionTest extends IgniteClientReconnect throws Exception { assertNotNull(client.cache(DEFAULT_CACHE_NAME)); - final TestTcpDiscoverySpi clientSpi = spi(client); + final IgniteDiscoverySpi clientSpi = spi0(client); Ignite srv = clientRouter(client); - TestTcpDiscoverySpi srvSpi = spi(srv); + DiscoverySpi srvSpi = spi0(srv); final CountDownLatch disconnectLatch = new CountDownLatch(1); @@ -787,7 +789,10 @@ public class IgniteClientReconnectApiExceptionTest extends IgniteClientReconnect log.info("Block reconnect."); - clientSpi.writeLatch = new CountDownLatch(1); + DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener(); + + clientSpi.setInternalListener(lsnr); + lsnr.startBlockJoin(); final List<IgniteInternalFuture> futs = new ArrayList<>(); @@ -832,7 +837,7 @@ public class IgniteClientReconnectApiExceptionTest extends IgniteClientReconnect log.info("Allow reconnect."); - clientSpi.writeLatch.countDown(); + lsnr.stopBlockJoin(); waitReconnectEvent(reconnectLatch); @@ -857,7 +862,7 @@ public class 
IgniteClientReconnectApiExceptionTest extends IgniteClientReconnect } } finally { - clientSpi.writeLatch.countDown(); + lsnr.stopBlockJoin(); for (IgniteInternalFuture fut : futs) fut.cancel(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java index 00daf5f..d1e3ade 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAtomicsTest.java @@ -111,7 +111,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr assertTrue(client.cluster().localNode().isClient()); - Ignite srv = clientRouter(client); + Ignite srv = ignite(0); IgniteAtomicSequence clientAtomicSeq = client.atomicSequence("atomicSeq", 0, true); @@ -144,7 +144,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr assertTrue(client.cluster().localNode().isClient()); - Ignite srv = clientRouter(client); + Ignite srv = ignite(0); final IgniteAtomicSequence clientAtomicSeq = client.atomicSequence("atomicSeqRmv", 0, true); @@ -192,7 +192,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr assertTrue(client.cluster().localNode().isClient()); - Ignite srv = clientRouter(client); + Ignite srv = ignite(0); BlockTcpCommunicationSpi commSpi = commSpi(srv); @@ -253,7 +253,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr assertTrue(client.cluster().localNode().isClient()); - Ignite srv = clientRouter(client); + Ignite srv = ignite(0); IgniteAtomicReference<String> clientAtomicRef = 
client.atomicReference("atomicRef", "1st value", true); @@ -294,7 +294,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr assertTrue(client.cluster().localNode().isClient()); - Ignite srv = clientRouter(client); + Ignite srv = ignite(0); final IgniteAtomicReference<String> clientAtomicRef = client.atomicReference("atomicRefRemoved", "1st value", true); @@ -347,7 +347,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr assertTrue(client.cluster().localNode().isClient()); - Ignite srv = clientRouter(client); + Ignite srv = ignite(0); final IgniteAtomicReference<String> clientAtomicRef = client.atomicReference("atomicRefInProg", "1st value", true); @@ -414,7 +414,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr assertTrue(client.cluster().localNode().isClient()); - Ignite srv = clientRouter(client); + Ignite srv = ignite(0); IgniteAtomicStamped clientAtomicStamped = client.atomicStamped("atomicStamped", 0, 0, true); @@ -455,7 +455,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr assertTrue(client.cluster().localNode().isClient()); - Ignite srv = clientRouter(client); + Ignite srv = ignite(0); final IgniteAtomicStamped clientAtomicStamped = client.atomicStamped("atomicStampedRemoved", 0, 0, true); @@ -506,7 +506,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr assertTrue(client.cluster().localNode().isClient()); - Ignite srv = clientRouter(client); + Ignite srv = ignite(0); final IgniteAtomicStamped clientAtomicStamped = client.atomicStamped("atomicStampedInProgress", 0, 0, true); @@ -574,7 +574,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr assertTrue(client.cluster().localNode().isClient()); - Ignite srv = clientRouter(client); + Ignite srv = ignite(0); IgniteAtomicLong clientAtomicLong = client.atomicLong("atomicLong", 0, true); @@ -605,7 +605,7 @@ 
public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr assertTrue(client.cluster().localNode().isClient()); - Ignite srv = clientRouter(client); + Ignite srv = ignite(0); final IgniteAtomicLong clientAtomicLong = client.atomicLong("atomicLongRmv", 0, true); @@ -646,7 +646,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr assertTrue(client.cluster().localNode().isClient()); - Ignite srv = clientRouter(client); + Ignite srv = ignite(0); BlockTcpCommunicationSpi commSpi = commSpi(srv); @@ -701,7 +701,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr assertTrue(client.cluster().localNode().isClient()); - Ignite srv = clientRouter(client); + Ignite srv = ignite(0); IgniteCountDownLatch clientLatch = client.countDownLatch("latch1", 3, false, true); @@ -742,7 +742,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr assertTrue(client.cluster().localNode().isClient()); - Ignite srv = clientRouter(client); + Ignite srv = ignite(0); IgniteSemaphore clientSemaphore = client.semaphore("semaphore1", 3, false, true); @@ -789,7 +789,7 @@ public class IgniteClientReconnectAtomicsTest extends IgniteClientReconnectAbstr assertTrue(client.cluster().localNode().isClient()); - Ignite srv = clientRouter(client); + Ignite srv = ignite(0); IgniteLock clientLock = client.reentrantLock("lock1", true, fair, true); http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java index 518e674..3cb82e0 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java +++ 
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java @@ -49,6 +49,7 @@ import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; +import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; @@ -67,6 +68,7 @@ import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.resources.LoggerResource; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.DiscoverySpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.transactions.Transaction; @@ -155,11 +157,11 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac IgniteEx client = startGrid(SRV_CNT); - final TestTcpDiscoverySpi clientSpi = spi(client); + final IgniteDiscoverySpi clientSpi = spi0(client); - Ignite srv = clientRouter(client); + Ignite srv = ignite(0); - TestTcpDiscoverySpi srvSpi = spi(srv); + DiscoverySpi srvSpi = ignite(0).configuration().getDiscoverySpi(); final IgniteCache<Object, Object> cache = client.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)); @@ -188,7 +190,11 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac log.info("Block reconnect."); - clientSpi.writeLatch = new CountDownLatch(1); + DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener(); + + 
clientSpi.setInternalListener(lsnr); + + lsnr.startBlockJoin(); final AtomicReference<IgniteInternalFuture> blockPutRef = new AtomicReference<>(); @@ -254,7 +260,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac log.info("Allow reconnect."); - clientSpi.writeLatch.countDown(); + lsnr.stopBlockJoin(); assertTrue(reconnectLatch.await(5000, MILLISECONDS)); @@ -319,7 +325,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac IgniteEx client = startGrid(SRV_CNT); - Ignite srv = clientRouter(client); + Ignite srv = ignite(0); CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME); @@ -412,17 +418,21 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac final TransactionConcurrency txConcurrency, final IgniteCache<Object, Object> cache) throws Exception { - Ignite srv = clientRouter(client); + Ignite srv = ignite(0); - final TestTcpDiscoverySpi clientSpi = spi(client); - final TestTcpDiscoverySpi srvSpi = spi(srv); + final IgniteDiscoverySpi clientSpi = spi0(client); + final DiscoverySpi srvSpi = spi0(srv); final CountDownLatch disconnectLatch = new CountDownLatch(1); final CountDownLatch reconnectLatch = new CountDownLatch(1); log.info("Block reconnect."); - clientSpi.writeLatch = new CountDownLatch(1); + DiscoverySpiTestListener lsnr = new DiscoverySpiTestListener(); + + clientSpi.setInternalListener(lsnr); + + lsnr.startBlockJoin(); client.events().localListen(new IgnitePredicate<Event>() { @Override public boolean apply(Event evt) { @@ -530,7 +540,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac assertTrue(putFailed.await(5000, MILLISECONDS)); - clientSpi.writeLatch.countDown(); + lsnr.stopBlockJoin(); waitReconnectEvent(reconnectLatch); @@ -604,9 +614,9 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac IgniteEx client = startGrid(SRV_CNT); - Ignite srv = 
clientRouter(client); + Ignite srv = ignite(0); - TestTcpDiscoverySpi srvSpi = spi(srv); + DiscoverySpi srvSpi = spi0(srv); TestCommunicationSpi coordCommSpi = (TestCommunicationSpi)grid(0).configuration().getCommunicationSpi(); @@ -691,7 +701,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac IgniteInternalFuture<Boolean> fut = GridTestUtils.runAsync(new Callable<Boolean>() { @Override public Boolean call() throws Exception { try { - Ignition.start(optimize(getConfiguration(getTestIgniteInstanceName(SRV_CNT)))); + startGrid(optimize(getConfiguration(getTestIgniteInstanceName(SRV_CNT)))); // Commented due to IGNITE-4473, because // IgniteClientDisconnectedException won't @@ -722,7 +732,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac } }); - TestTcpDiscoverySpi srvSpi = spi(srv); + DiscoverySpi srvSpi = spi0(srv); try { if (!joinLatch.await(10_000, MILLISECONDS)) { @@ -1256,30 +1266,35 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac * */ static class TestClass1 implements Serializable { + // No-op. } /** * */ static class TestClass2 implements Serializable { + // No-op. } /** * */ static class TestClass3 implements Serializable { + // No-op. } /** * */ static class TestClass4 implements Serializable { + // No-op. } /** * */ static class TestClass5 implements Serializable { + // No-op. 
} /** @@ -1294,11 +1309,11 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac Class<?> msgToBlock, final IgniteInClosure<IgniteCache<Object, Object>> c) throws Exception { - Ignite srv = clientRouter(client); + Ignite srv = ignite(0); final UUID id = client.localNode().id(); - TestTcpDiscoverySpi srvSpi = spi(srv); + DiscoverySpi srvSpi = spi0(srv); final IgniteCache<Object, Object> cache = client.getOrCreateCache(ccfg); http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java index 3f0e33d..5be59b0 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java @@ -180,7 +180,7 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA private void serverNodeReconnect(CollectionConfiguration colCfg) throws Exception { final Ignite client = grid(serverCount()); - final Ignite srv = clientRouter(client); + final Ignite srv = ignite(0); assertNotNull(srv.queue("q", 0, colCfg)); assertNotNull(srv.set("s", colCfg)); @@ -201,7 +201,7 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA assertTrue(client.cluster().localNode().isClient()); - Ignite srv = clientRouter(client); + Ignite srv = ignite(0); final String setName = "set-" + colCfg.getAtomicityMode(); @@ -235,7 +235,7 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA assertTrue(client.cluster().localNode().isClient()); - final Ignite srv = clientRouter(client); + final Ignite srv 
= ignite(0); final String setName = "set-rm-" + colCfg.getAtomicityMode(); @@ -281,7 +281,7 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA assertTrue(client.cluster().localNode().isClient()); - final Ignite srv = clientRouter(client); + final Ignite srv = ignite(0); final String setName = "set-in-progress-" + colCfg.getAtomicityMode(); @@ -347,7 +347,7 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA assertTrue(client.cluster().localNode().isClient()); - Ignite srv = clientRouter(client); + Ignite srv = ignite(0); final String setName = "queue-" + colCfg.getAtomicityMode(); @@ -379,7 +379,7 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA assertTrue(client.cluster().localNode().isClient()); - Ignite srv = clientRouter(client); + Ignite srv = ignite(0); final String setName = "queue-rmv" + colCfg.getAtomicityMode(); @@ -423,7 +423,7 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA assertTrue(client.cluster().localNode().isClient()); - Ignite srv = clientRouter(client); + Ignite srv = ignite(0); final String setName = "queue-rmv" + colCfg.getAtomicityMode(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java index cce0c7e..57d3188 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectComputeTest.java @@ -49,7 +49,7 @@ public class IgniteClientReconnectComputeTest extends IgniteClientReconnectAbstr 
assertTrue(client.cluster().localNode().isClient()); - Ignite srv = clientRouter(client); + Ignite srv = ignite(0); IgniteCache<Integer, Integer> cache = client.getOrCreateCache("test-cache"); @@ -103,7 +103,7 @@ public class IgniteClientReconnectComputeTest extends IgniteClientReconnectAbstr assertTrue(client.cluster().localNode().isClient()); - Ignite srv = clientRouter(client); + Ignite srv = ignite(0); BlockTcpCommunicationSpi commSpi = commSpi(srv); @@ -152,7 +152,7 @@ public class IgniteClientReconnectComputeTest extends IgniteClientReconnectAbstr assertTrue(client.cluster().localNode().isClient()); - Ignite srv = clientRouter(client); + Ignite srv = ignite(0); BlockTcpCommunicationSpi commSpi = commSpi(srv); http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java index ca0d889..d68fc1c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.java @@ -28,6 +28,7 @@ import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.events.Event; import org.apache.ignite.events.EventType; +import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi; import org.apache.ignite.internal.util.typedef.P2; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteRunnable; @@ -61,9 +62,9 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe 
assertTrue(client.cluster().localNode().isClient()); - Ignite srv = clientRouter(client); + Ignite srv = ignite(0); - TestTcpDiscoverySpi srvSpi = spi(srv); + IgniteDiscoverySpi srvSpi = spi0(srv); EventListener lsnr = new EventListener(); @@ -133,9 +134,9 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe assertTrue(client.cluster().localNode().isClient()); - Ignite srv = clientRouter(client); + Ignite srv = ignite(0); - TestTcpDiscoverySpi srvSpi = spi(srv); + IgniteDiscoverySpi srvSpi = spi0(srv); final String topic = "testTopic"; @@ -309,9 +310,9 @@ public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientRe CacheEventListener lsnr) throws Exception { - Ignite srv = clientRouter(client); + Ignite srv = ignite(0); - TestTcpDiscoverySpi srvSpi = spi(srv); + IgniteDiscoverySpi srvSpi = spi0(srv); final CountDownLatch reconnectLatch = new CountDownLatch(1); http://git-wip-us.apache.org/repos/asf/ignite/blob/a64b941d/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java index c071ee2..6e77742 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java @@ -27,6 +27,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.Event; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.spi.discovery.DiscoverySpi; import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED; import static 
org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED; @@ -64,20 +65,23 @@ public class IgniteClientReconnectDiscoveryStateTest extends IgniteClientReconne nodeCnt.put(1, 1); nodeCnt.put(2, 2); nodeCnt.put(3, 3); - nodeCnt.put(4, 4); - for (Map.Entry<Integer, Integer> e : nodeCnt.entrySet()) { - Collection<ClusterNode> nodes = cluster.topology(e.getKey()); + if (tcpDiscovery()) { + nodeCnt.put(4, 4); - assertNotNull("No nodes for topology: " + e.getKey(), nodes); - assertEquals((int)e.getValue(), nodes.size()); + for (Map.Entry<Integer, Integer> e : nodeCnt.entrySet()) { + Collection<ClusterNode> nodes = cluster.topology(e.getKey()); + + assertNotNull("No nodes for topology: " + e.getKey(), nodes); + assertEquals((int)e.getValue(), nodes.size()); + } } ClusterNode locNode = cluster.localNode(); assertEquals(topVer, locNode.order()); - TestTcpDiscoverySpi srvSpi = spi(clientRouter(client)); + DiscoverySpi srvSpi = ignite(0).configuration().getDiscoverySpi(); final CountDownLatch reconnectLatch = new CountDownLatch(1); @@ -112,7 +116,11 @@ public class IgniteClientReconnectDiscoveryStateTest extends IgniteClientReconne assertEquals(topVer, locNode.order()); assertEquals(topVer, cluster.topologyVersion()); - nodeCnt.put(5, 3); + if (tcpDiscovery()) + nodeCnt.put(5, 3); + else + nodeCnt.clear(); + nodeCnt.put(6, 4); for (Map.Entry<Integer, Integer> e : nodeCnt.entrySet()) {