IGNITE-7639 Fixed NPE
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c917327b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c917327b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c917327b Branch: refs/heads/ignite-7485-2 Commit: c917327b74b3dbcb65b961277f5d2c99a71b7a8a Parents: ce7c147 Author: Alexey Goncharuk <alexey.goncha...@gmail.com> Authored: Wed Feb 7 21:10:32 2018 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Wed Feb 7 21:10:32 2018 +0300 ---------------------------------------------------------------------- .../cluster/DiscoveryDataClusterState.java | 41 +++- .../IgniteClusterActivateDeactivateTest.java | 222 ++++++++----------- 2 files changed, 128 insertions(+), 135 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c917327b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java index dea2ce7..b022754 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/DiscoveryDataClusterState.java @@ -28,22 +28,36 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.Nullable; /** - * Discovery data related to cluster state. + * A pojo-object representing current cluster global state. The state includes cluster active flag and cluster + * baseline topology. + * <p> + * This object also captures a transitional cluster state, when one or more fields are changing. 
In this case, + * a {@code transitionReqId} field is set to a non-null value and {@code prevState} captures previous cluster state. + * A joining node catching the cluster in an intermediate state will observe {@code transitionReqId} field to be + * non-null, however the {@code prevState} will not be sent to the joining node. + * + * TODO https://issues.apache.org/jira/browse/IGNITE-7640 This class must be immutable, transitionRes must be set by calling finish(). */ public class DiscoveryDataClusterState implements Serializable { /** */ private static final long serialVersionUID = 0L; - /** */ + /** Flag indicating if the cluster is in active state. */ private final boolean active; - /** */ + /** Current cluster baseline topology. */ @Nullable private final BaselineTopology baselineTopology; - /** */ + /** + * Transition request ID. Set to a non-null value if the cluster is changing its state. + * The ID is assigned on the initiating node. + */ private final UUID transitionReqId; - /** Topology version for state change exchange. */ + /** + * Topology version in the cluster when state change request was received by the coordinator. + * The exchange fired for the cluster state change will be on version {@code transitionTopVer.nextMinorVersion()}. + */ @GridToStringInclude private final AffinityTopologyVersion transitionTopVer; @@ -51,13 +65,18 @@ public class DiscoveryDataClusterState implements Serializable { @GridToStringExclude private final Set<UUID> transitionNodes; - /** Local flag for state transition result (global state is updated asynchronously by custom message). */ + /** + * Local flag for state transition active state result (global state is updated asynchronously by custom message), + * {@code null} means that state change is not completed yet. + */ private transient volatile Boolean transitionRes; - /** */ + /** + * Previous cluster state if this state is a transition state and it was not received by a joining node. 
+ */ private transient DiscoveryDataClusterState prevState; - /** */ + /** Transition result error. */ private transient volatile Exception transitionError; /** @@ -86,6 +105,7 @@ public class DiscoveryDataClusterState implements Serializable { assert transitionReqId != null; assert transitionTopVer != null; assert !F.isEmpty(transitionNodes) : transitionNodes; + assert prevState != null; return new DiscoveryDataClusterState( prevState, @@ -156,7 +176,7 @@ public class DiscoveryDataClusterState implements Serializable { * @return {@code True} if cluster active state change is in progress, {@code false} otherwise. */ public boolean activeStateChanging() { - return transition() && active != prevState.active; + return transition() && (prevState == null || (prevState.active != active)); } /** @@ -202,6 +222,9 @@ public class DiscoveryDataClusterState implements Serializable { } /** + * Creates a non-transitional cluster state. This method effectively cleans all fields identifying the + * state as transitional and creates a new state with the state transition result. + * * @param success Transition success status. * @return Cluster state that finished transition. 
*/ http://git-wip-us.apache.org/repos/asf/ignite/blob/c917327b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java index 71718c9..2337329 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java @@ -37,7 +37,6 @@ import org.apache.ignite.configuration.WALMode; import org.apache.ignite.internal.IgniteClientReconnectAbstractTest; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; @@ -49,7 +48,6 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.TestTcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridTestUtils; @@ -85,9 +83,6 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest private boolean testSpi; /** */ - private boolean testDiscoSpi; - - /** */ private boolean 
testReconnectSpi; /** */ @@ -104,8 +99,6 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest spi.setJoinTimeout(2 * 60_000); } - else if (testDiscoSpi) - cfg.setDiscoverySpi(new TestTcpDiscoverySpi()); ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); @@ -220,14 +213,14 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest } for (int i = 0; i < srvs + clients; i++) - assertFalse(ignite(i).active()); + assertFalse(ignite(i).cluster().active()); - ignite(activateFrom).active(false); // Should be no-op. + ignite(activateFrom).cluster().active(false); // Should be no-op. - ignite(activateFrom).active(true); + ignite(activateFrom).cluster().active(true); for (int i = 0; i < srvs + clients; i++) - assertTrue(ignite(i).active()); + assertTrue(ignite(i).cluster().active()); for (int i = 0; i < srvs + clients; i++) { for (int c = 0; c < 2; c++) @@ -308,16 +301,14 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest private void joinWhileActivate1(final boolean startClient, final boolean withNewCache) throws Exception { IgniteInternalFuture<?> activeFut = startNodesAndBlockStatusChange(2, 0, 0, false); - IgniteInternalFuture<?> startFut = GridTestUtils.runAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - client = startClient; + IgniteInternalFuture<?> startFut = GridTestUtils.runAsync((Callable<Void>)() -> { + client = startClient; - ccfgs = withNewCache ? cacheConfigurations2() : cacheConfigurations1(); + ccfgs = withNewCache ? 
cacheConfigurations2() : cacheConfigurations1(); - startGrid(2); + startGrid(2); - return null; - } + return null; }); TestRecordingCommunicationSpi spi1 = TestRecordingCommunicationSpi.spi(ignite(1)); @@ -376,7 +367,7 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest int minorVer = 1; if (initiallyActive && persistenceEnabled()) { - ignite(0).active(true); + ignite(0).cluster().active(true); minorVer++; } @@ -396,11 +387,9 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest blockExchangeSingleMessage(spi, STATE_CHANGE_TOP_VER); } - IgniteInternalFuture<?> stateChangeFut = GridTestUtils.runAsync(new Runnable() { - @Override public void run() { - ignite(stateChangeFrom).active(!initiallyActive); - } - }); + IgniteInternalFuture<?> stateChangeFut = GridTestUtils.runAsync(() -> + ignite(stateChangeFrom).cluster().active(!initiallyActive) + ); for (TestRecordingCommunicationSpi spi : spis) spi.waitForBlocked(); @@ -417,17 +406,15 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest * @param topVer Exchange topology version. 
*/ private void blockExchangeSingleMessage(TestRecordingCommunicationSpi spi, final AffinityTopologyVersion topVer) { - spi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { - @Override public boolean apply(ClusterNode clusterNode, Message msg) { - if (msg instanceof GridDhtPartitionsSingleMessage) { - GridDhtPartitionsSingleMessage pMsg = (GridDhtPartitionsSingleMessage)msg; - - if (pMsg.exchangeId() != null && pMsg.exchangeId().topologyVersion().equals(topVer)) - return true; - } + spi.blockMessages((IgniteBiPredicate<ClusterNode, Message>)(clusterNode, msg) -> { + if (msg instanceof GridDhtPartitionsSingleMessage) { + GridDhtPartitionsSingleMessage pMsg = (GridDhtPartitionsSingleMessage)msg; - return false; + if (pMsg.exchangeId() != null && pMsg.exchangeId().topologyVersion().equals(topVer)) + return true; } + + return false; }); } @@ -460,16 +447,14 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest private void joinWhileDeactivate1(final boolean startClient, final boolean withNewCache) throws Exception { IgniteInternalFuture<?> activeFut = startNodesAndBlockStatusChange(2, 0, 0, true); - IgniteInternalFuture<?> startFut = GridTestUtils.runAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - client = startClient; + IgniteInternalFuture<?> startFut = GridTestUtils.runAsync((Callable<Void>)() -> { + client = startClient; - ccfgs = withNewCache ? cacheConfigurations2() : cacheConfigurations1(); + ccfgs = withNewCache ? 
cacheConfigurations2() : cacheConfigurations1(); - startGrid(2); + startGrid(2); - return null; - } + return null; }); TestRecordingCommunicationSpi spi1 = TestRecordingCommunicationSpi.spi(ignite(1)); @@ -481,7 +466,7 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest checkNoCaches(3); - ignite(2).active(true); + ignite(2).cluster().active(true); for (int c = 0; c < 2; c++) checkCache(ignite(2), CACHE_NAME_PREFIX + c, true); @@ -529,30 +514,26 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest final CyclicBarrier b = new CyclicBarrier(START_NODES + 1); - IgniteInternalFuture<?> fut1 = GridTestUtils.runAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - b.await(); + IgniteInternalFuture<Void> fut1 = GridTestUtils.runAsync(() -> { + b.await(); - Thread.sleep(ThreadLocalRandom.current().nextLong(100) + 1); + U.sleep(ThreadLocalRandom.current().nextLong(100) + 1); - ignite(0).active(true); + ignite(0).cluster().active(true); - return null; - } + return null; }); final AtomicInteger nodeIdx = new AtomicInteger(3); - IgniteInternalFuture<?> fut2 = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - int idx = nodeIdx.getAndIncrement(); + IgniteInternalFuture<Long> fut2 = GridTestUtils.runMultiThreadedAsync((Callable<Void>)() -> { + int idx = nodeIdx.getAndIncrement(); - b.await(); + b.await(); - startGrid(idx); + startGrid(idx); - return null; - } + return null; }, START_NODES, "start-node"); fut1.get(); @@ -619,19 +600,19 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest } if (persistenceEnabled()) - ignite(deactivateFrom).active(true); + ignite(deactivateFrom).cluster().active(true); - ignite(deactivateFrom).active(true); // Should be no-op. + ignite(deactivateFrom).cluster().active(true); // Should be no-op. 
checkCaches(srvs + clients, CACHES); for (int i = 0; i < srvs + clients; i++) - assertTrue(ignite(i).active()); + assertTrue(ignite(i).cluster().active()); - ignite(deactivateFrom).active(false); + ignite(deactivateFrom).cluster().active(false); for (int i = 0; i < srvs + clients; i++) - assertFalse(ignite(i).active()); + assertFalse(ignite(i).cluster().active()); checkNoCaches(srvs + clients); @@ -648,12 +629,12 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest checkNoCaches(srvs + clients + 2); for (int i = 0; i < srvs + clients + 2; i++) - assertFalse(ignite(i).active()); + assertFalse(ignite(i).cluster().active()); - ignite(deactivateFrom).active(true); + ignite(deactivateFrom).cluster().active(true); for (int i = 0; i < srvs + clients + 2; i++) { - assertTrue(ignite(i).active()); + assertTrue(ignite(i).cluster().active()); checkCache(ignite(i), CU.UTILITY_CACHE_NAME, true); } @@ -695,7 +676,7 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest startWithCaches1(SRVS, CLIENTS); if (persistenceEnabled()) - ignite(0).active(true); + ignite(0).cluster().active(true); Ignite srv = ignite(0); Ignite client = ignite(SRVS); @@ -741,7 +722,7 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest checkNoCaches(SRVS + CLIENTS); - ignite(0).active(true); + ignite(0).cluster().active(true); checkCache(client, CU.UTILITY_CACHE_NAME, true); @@ -789,39 +770,38 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest IgniteEx client = grid(SRVS); if (persistenceEnabled()) - ignite(0).active(true); + ignite(0).cluster().active(true); checkCache(client, CU.UTILITY_CACHE_NAME, true); checkCaches1(SRVS + CLIENTS); + // Wait for late affinity assignment to finish. 
+ grid(0).context().cache().context().exchange().affinityReadyFuture( + new AffinityTopologyVersion(SRVS + CLIENTS, 1)).get(); + final AffinityTopologyVersion STATE_CHANGE_TOP_VER = new AffinityTopologyVersion(SRVS + CLIENTS + 1, 1); final TestRecordingCommunicationSpi spi1 = transition ? TestRecordingCommunicationSpi.spi(ignite(1)) : null; final AtomicReference<IgniteInternalFuture> stateFut = new AtomicReference<>(); - IgniteClientReconnectAbstractTest.reconnectClientNode(log, client, srv, new Runnable() { - @Override public void run() { - if (transition) { - blockExchangeSingleMessage(spi1, STATE_CHANGE_TOP_VER); - - stateFut.set(GridTestUtils.runAsync(new Runnable() { - @Override public void run() { - srv.active(false); - } - }, "deactivate")); - - try { - U.sleep(500); - } - catch (Exception e) { - e.printStackTrace(); - } + IgniteClientReconnectAbstractTest.reconnectClientNode(log, client, srv, () -> { + if (transition) { + blockExchangeSingleMessage(spi1, STATE_CHANGE_TOP_VER); + + stateFut.set(GridTestUtils.runAsync(() -> srv.cluster().active(false), + "deactivate")); + + try { + U.sleep(500); + } + catch (Exception e) { + e.printStackTrace(); } - else - srv.active(false); } + else + srv.cluster().active(false); }); if (transition) { @@ -839,11 +819,11 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest checkNoCaches(SRVS + CLIENTS); - ignite(0).active(true); + ignite(0).cluster().active(true); checkCache(client, CU.UTILITY_CACHE_NAME, true); - assertTrue(client.active()); + assertTrue(client.cluster().active()); checkCaches1(SRVS + CLIENTS); @@ -900,27 +880,22 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest final AtomicReference<IgniteInternalFuture> stateFut = new AtomicReference<>(); - IgniteClientReconnectAbstractTest.reconnectClientNode(log, client, srv, new Runnable() { - @Override public void run() { - if (transition) { - blockExchangeSingleMessage(spi1, STATE_CHANGE_TOP_VER); - - 
stateFut.set(GridTestUtils.runAsync(new Runnable() { - @Override public void run() { - srv.active(true); - } - }, "activate")); - - try { - U.sleep(500); - } - catch (Exception e) { - e.printStackTrace(); - } + IgniteClientReconnectAbstractTest.reconnectClientNode(log, client, srv, () -> { + if (transition) { + blockExchangeSingleMessage(spi1, STATE_CHANGE_TOP_VER); + + stateFut.set(GridTestUtils.runAsync(() -> srv.cluster().active(true), + "activate")); + + try { + U.sleep(500); + } + catch (Exception e) { + e.printStackTrace(); } - else - srv.active(true); } + else + srv.cluster().active(true); }); if (transition) { @@ -989,7 +964,7 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest checkRecordedMessages(false); - ignite(0).active(true); + ignite(0).cluster().active(true); checkCaches1(SRVS + CLIENTS); @@ -1033,12 +1008,10 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest client = false; // Start one more node while transition is in progress. - IgniteInternalFuture startFut = GridTestUtils.runAsync(new Callable() { - @Override public Object call() throws Exception { - startGrid(8); + IgniteInternalFuture<Void> startFut = GridTestUtils.runAsync(() -> { + startGrid(8); - return null; - } + return null; }, "start-node"); U.sleep(500); @@ -1061,7 +1034,7 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest if (!activate) { checkNoCaches(9); - ignite(0).active(true); + ignite(0).cluster().active(true); } checkCaches1(9); @@ -1092,19 +1065,16 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest client = false; // Start more nodes while transition is in progress. 
- IgniteInternalFuture startFut1 = GridTestUtils.runAsync(new Callable() { - @Override public Object call() throws Exception { - startGrid(8); + IgniteInternalFuture<Void> startFut1 = GridTestUtils.runAsync(() -> { + startGrid(8); - return null; - } + return null; }, "start-node1"); - IgniteInternalFuture startFut2 = GridTestUtils.runAsync(new Callable() { - @Override public Object call() throws Exception { - startGrid(9); - return null; - } + IgniteInternalFuture<Void> startFut2 = GridTestUtils.runAsync(() -> { + startGrid(9); + + return null; }, "start-node2"); U.sleep(500); @@ -1132,7 +1102,7 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest if (!activate) { checkNoCaches(10); - ignite(0).active(true); + ignite(0).cluster().active(true); } checkCaches1(10); @@ -1214,7 +1184,7 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest ((IgniteEx)node).context().state().publicApiActiveState(true); - GridCacheAdapter cache = ((IgniteKernal)node).context().cache().internalCache(cacheName); + GridCacheAdapter cache = ((IgniteEx)node).context().cache().internalCache(cacheName); if (exp) assertNotNull("Cache not found [cache=" + cacheName + ", node=" + node.name() + ']', cache); @@ -1229,7 +1199,7 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest for (int i = 0; i < nodes; i++) { grid(i).context().state().publicApiActiveState(true); - GridCacheProcessor cache = ((IgniteKernal)ignite(i)).context().cache(); + GridCacheProcessor cache = ((IgniteEx)ignite(i)).context().cache(); assertTrue(cache.caches().isEmpty()); assertTrue(cache.internalCaches().isEmpty());