Mmuzaf commented on a change in pull request #86: URL: https://github.com/apache/ignite-extensions/pull/86#discussion_r783876321
########## File path: modules/topology-validator-ext/src/main/java/com/sbt/ignite/plugin/cache/CacheTopologyValidatorPluginProvider.java ########## @@ -0,0 +1,383 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.sbt.ignite.plugin.cache; + +import java.io.Serializable; +import java.util.Collection; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.BaselineNode; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.TopologyValidator; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.cluster.DetachedClusterNode; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener; +import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import 
org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; +import org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationLifecycleListener; +import org.apache.ignite.internal.processors.configuration.distributed.DistributedPropertyDispatcher; +import org.apache.ignite.internal.processors.configuration.distributed.SimpleDistributedProperty; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.CachePluginContext; +import org.apache.ignite.plugin.CachePluginProvider; +import org.apache.ignite.plugin.CacheTopologyValidatorProvider; +import org.apache.ignite.plugin.ExtensionRegistry; +import org.apache.ignite.plugin.IgnitePlugin; +import org.apache.ignite.plugin.PluginConfiguration; +import org.apache.ignite.plugin.PluginContext; +import org.apache.ignite.plugin.PluginProvider; +import org.apache.ignite.plugin.PluginValidationException; +import org.jetbrains.annotations.Nullable; + +import static java.lang.Boolean.FALSE; +import static org.apache.ignite.cluster.ClusterState.ACTIVE; +import static org.apache.ignite.cluster.ClusterState.ACTIVE_READ_ONLY; +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.ignite.internal.cluster.DistributedConfigurationUtils.setDefaultValue; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL; + +/** */ +public class CacheTopologyValidatorPluginProvider implements PluginProvider<PluginConfiguration>, TopologyValidator { + /** */ + public static final String TOP_VALIDATOR_ENABLED_PROP_NAME = "org.apache.ignite.topology.validator.enabled"; + + /** */ + private static final int[] TOP_CHANGED_EVTS = new int[] { + EVT_NODE_LEFT, + EVT_NODE_JOINED, + EVT_NODE_FAILED + }; + + /** */ + 
private final SimpleDistributedProperty<Boolean> topValidatorEnabledProp = new SimpleDistributedProperty<>( + TOP_VALIDATOR_ENABLED_PROP_NAME, + Boolean::parseBoolean + ); + + /** Ignite kernel context. */ + private GridKernalContext ctx; + + /** Ignite logger. */ + private IgniteLogger log; + + /** */ + private long lastCheckedTopVer; + + /** */ + private volatile State state; + + /** {@inheritDoc} */ + @Override public String name() { + return "Topology Validator"; + } + + /** {@inheritDoc} */ + @Override public String version() { + return "1.0.0"; + } + + /** {@inheritDoc} */ + @Override public <T extends IgnitePlugin> T plugin() { + return (T) new IgnitePlugin() { + // No-op. + }; + } + + /** {@inheritDoc} */ + @Override public String copyright() { + return ""; + } + + /** {@inheritDoc} */ + @Override public void initExtensions(PluginContext pluginCtx, ExtensionRegistry registry) { + ctx = ((IgniteEx)pluginCtx.grid()).context(); + + if (!ctx.clientNode()) { + registry.registerExtension(CacheTopologyValidatorProvider.class, new CacheTopologyValidatorProvider() { + /** {@inheritDoc} */ + @Override public TopologyValidator topologyValidator(String cacheName) { + return CacheTopologyValidatorPluginProvider.this; + } + }); + } + } + + /** {@inheritDoc} */ + @Nullable @Override public <T> T createComponent(PluginContext ctx, Class<T> cls) { + return null; + } + + /** {@inheritDoc} */ + @Override public CachePluginProvider createCacheProvider(CachePluginContext ctx) { + return null; + } + + /** {@inheritDoc} */ + @Override public void start(PluginContext pluginCtx) { + if (ctx.clientNode()) + return; + + log = ctx.log(getClass()); + + onGlobalClusterStateChanged(ctx.state().clusterState().state()); + + ctx.event().addDiscoveryEventListener(new TopologyChangedEventListener(), TOP_CHANGED_EVTS); + + ctx.discovery().setCustomEventListener( + ChangeGlobalStateFinishMessage.class, + (topVer, snd, msg) -> onGlobalClusterStateChanged(msg.state()) + ); + + 
ctx.state().baselineConfiguration().listenAutoAdjustTimeout((name, oldVal, newVal) -> { + if (newVal != null) + checkBaselineAutoAdjustConfiguration(ctx.state().isBaselineAutoAdjustEnabled(), newVal); + }); + + ctx.state().baselineConfiguration().listenAutoAdjustEnabled((name, oldVal, newVal) -> { + if (newVal != null) + checkBaselineAutoAdjustConfiguration(newVal, ctx.state().baselineAutoAdjustTimeout()); + }); + + ctx.internalSubscriptionProcessor().registerDistributedConfigurationListener( + new DistributedConfigurationLifecycleListener() { + /** {@inheritDoc} */ + @Override public void onReadyToRegister(DistributedPropertyDispatcher dispatcher) { + dispatcher.registerProperty(topValidatorEnabledProp); + } + + /** {@inheritDoc} */ + @Override public void onReadyToWrite() { + boolean isLocNodeCrd = U.isLocalNodeCoordinator(ctx.discovery()); + + Boolean topValidatorEnabled = topValidatorEnabledProp.get(); + + if (topValidatorEnabled == null && !isLocNodeCrd || FALSE.equals(topValidatorEnabled)) { + U.warn(log, "Topology Validator will be disabled because it is not configured for the" + + " cluster the current node joined. Make sure the Topology Validator plugin is" + + " configured on all cluster nodes."); + } + + setDefaultValue(topValidatorEnabledProp, isLocNodeCrd, log); + } + }); + } + + /** {@inheritDoc} */ + @Override public void stop(boolean cancel) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onIgniteStart() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onIgniteStop(boolean cancel) { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Nullable @Override public Serializable provideDiscoveryData(UUID nodeId) { + return state; + } + + /** {@inheritDoc} */ + @Override public void receiveDiscoveryData(UUID nodeId, Serializable data) { + if (ctx.localNodeId().equals(nodeId)) + state = (State)data; + } + + /** {@inheritDoc} */ + @Override public void validateNewNode(ClusterNode node) throws PluginValidationException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void validateNewNode(ClusterNode node, Serializable data) throws PluginValidationException { + if (node.isClient()) + return; + + if (data == null) { + String msg = "The Topology Validator plugin is not configured for the server node that is" + + " trying to join the cluster. Since the Topology Validator is only applicable if all server nodes" + + " in the cluster have one, node join request will be rejected [rejectedNodeId=" + node.id() + ']'; + + throw new PluginValidationException(msg, msg, node.id()); + } + + // If the new node is joining but some node failed/left events has not been handled by + // {@linkTopologyChangedEventListener} yet, we cannot guarantee that the {@link state} on the joining node will + // be consistent with one that on the cluster nodes. 
+ if (state == State.VALID) { + DiscoCache discoCache = ctx.discovery().discoCache(new AffinityTopologyVersion(lastCheckedTopVer, 0)); + + if (discoCache != null) { + for (ClusterNode srv : discoCache.serverNodes()) { + if (!ctx.discovery().alive(srv)) { + String msg = "Node join request was rejected due to concurrent node left" + + " process handling [rejectedNodeId=" + node.id() + ']'; + + throw new PluginValidationException(msg, msg, node.id()); + } + } + } + } + } + + /** {@inheritDoc} */ + @Override public boolean validate(Collection<ClusterNode> nodes) { + assert state != null; + + return isDisabled() || state != State.INVALID; + } + + /** */ + private boolean isDisabled() { + return !topValidatorEnabledProp.getOrDefault(false); + } + + /** */ + private void onGlobalClusterStateChanged(ClusterState clusterState) { + state = clusterState == ACTIVE ? State.VALID : State.CLUSTER_WRITE_BLOCKED; + } + + /** + * Current implementation of segmentation detection compares node of each topology with configured baseline nodes. + * If baseline auto adjustment is configured with zero timeout - baseline is updated on each topology change + * and the comparison described above makes no sense. + */ + private boolean checkBaselineAutoAdjustConfiguration(boolean enabled, long timeout) { + if (isDisabled()) + return false; + + if (enabled && timeout == 0L) { + U.warn(log, "Topology Validator is currently skipping validation of topology changes because" + + " Baseline Auto Adjustment with zero timeout is configured for the cluster. 
Configure" + + " Baseline Nodes explicitly or set Baseline Auto Adjustment Timeout to greater than zero."); + + return false; + } + + return true; + } + + /** */ + private class TopologyChangedEventListener implements DiscoveryEventListener, HighPriorityListener { + /** {@inheritDoc} */ + @Override public void onEvent(DiscoveryEvent evt, DiscoCache discoCache) { + State locStateCopy = state; + + lastCheckedTopVer = evt.topologyVersion(); + + boolean skipTopChangeCheck = !checkBaselineAutoAdjustConfiguration( + ctx.state().isBaselineAutoAdjustEnabled(), + ctx.state().baselineAutoAdjustTimeout() + ); + + if (!skipTopChangeCheck && state == State.VALID && evt.type() == EVT_NODE_FAILED) { + List<? extends BaselineNode> baselineNodes = discoCache.baselineNodes(); + + // Actually Ignite considers segmentation as the sequential node failures. So we detect segmentation + // even if the single node fails and less than half of baseline nodes are alive. + if (baselineNodes != null && aliveBaselineNodes(baselineNodes) < baselineNodes.size() / 2 + 1) { + locStateCopy = State.INVALID; + + try { + ctx.closure().runLocal(new GridPlainRunnable() { + @Override public void run() { + try { + ctx.cluster().get().state(ACTIVE_READ_ONLY); + } + catch (Throwable e) { + U.error(log, + "Failed to automatically switch state of the segmented cluster to the READ-ONLY" + + " mode. Cache writes were already restricted for all configured caches, but this" + + " step is still required in order to be able to unlock cache writes in the future." + + " Retry this operation manually, if possible [segmentedNodes=" + + formatTopologyNodes(discoCache.allNodes()) + "]", e); + } + } + }, PUBLIC_POOL); + } catch (Throwable e) { + U.error(log, "Failed to schedule cluster state change to the READ-ONLY mode.", e); + } + + U.warn(log, "Cluster segmentation was detected. 
Write to all user caches were blocked" + + " [segmentedNodes=" + formatTopologyNodes(discoCache.allNodes()) + ']'); + } + } + + state = locStateCopy; + } + + /** {@inheritDoc} */ + @Override public int order() { + return 0; + } + + /** + * @return Count of alive baseline nodes. + * Note that the following implementation is tied to how {@link DiscoCache#baselineNodes()} collection is + * populated. + */ + private int aliveBaselineNodes(Collection<? extends BaselineNode> baselineNodes) { + int res = 0; + + for (BaselineNode node : baselineNodes) { + if (!(node instanceof DetachedClusterNode)) + ++res; + } + + return res; + } + + /** @return String representation of the specified cluster node collection. */ + private String formatTopologyNodes(Collection<ClusterNode> nodes) { Review comment: I think it's better to use `F.viewReadOnly(nodes, F.node2id())` instead. ########## File path: modules/topology-validator-ext/src/main/java/com/sbt/ignite/plugin/cache/CacheTopologyValidatorPluginProvider.java ########## @@ -0,0 +1,383 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.sbt.ignite.plugin.cache; + +import java.io.Serializable; +import java.util.Collection; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.BaselineNode; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.TopologyValidator; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.cluster.DetachedClusterNode; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener; +import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; +import org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationLifecycleListener; +import org.apache.ignite.internal.processors.configuration.distributed.DistributedPropertyDispatcher; +import org.apache.ignite.internal.processors.configuration.distributed.SimpleDistributedProperty; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.CachePluginContext; +import org.apache.ignite.plugin.CachePluginProvider; +import org.apache.ignite.plugin.CacheTopologyValidatorProvider; +import org.apache.ignite.plugin.ExtensionRegistry; +import org.apache.ignite.plugin.IgnitePlugin; +import org.apache.ignite.plugin.PluginConfiguration; +import org.apache.ignite.plugin.PluginContext; +import org.apache.ignite.plugin.PluginProvider; +import org.apache.ignite.plugin.PluginValidationException; +import 
org.jetbrains.annotations.Nullable; + +import static java.lang.Boolean.FALSE; +import static org.apache.ignite.cluster.ClusterState.ACTIVE; +import static org.apache.ignite.cluster.ClusterState.ACTIVE_READ_ONLY; +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.ignite.internal.cluster.DistributedConfigurationUtils.setDefaultValue; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL; + +/** */ +public class CacheTopologyValidatorPluginProvider implements PluginProvider<PluginConfiguration>, TopologyValidator { + /** */ + public static final String TOP_VALIDATOR_ENABLED_PROP_NAME = "org.apache.ignite.topology.validator.enabled"; + + /** */ + private static final int[] TOP_CHANGED_EVTS = new int[] { + EVT_NODE_LEFT, + EVT_NODE_JOINED, + EVT_NODE_FAILED + }; + + /** */ + private final SimpleDistributedProperty<Boolean> topValidatorEnabledProp = new SimpleDistributedProperty<>( + TOP_VALIDATOR_ENABLED_PROP_NAME, + Boolean::parseBoolean + ); + + /** Ignite kernel context. */ + private GridKernalContext ctx; + + /** Ignite logger. */ + private IgniteLogger log; + + /** */ + private long lastCheckedTopVer; + + /** */ + private volatile State state; + + /** {@inheritDoc} */ + @Override public String name() { + return "Topology Validator"; + } + + /** {@inheritDoc} */ + @Override public String version() { + return "1.0.0"; + } + + /** {@inheritDoc} */ + @Override public <T extends IgnitePlugin> T plugin() { + return (T) new IgnitePlugin() { + // No-op. 
+ }; + } + + /** {@inheritDoc} */ + @Override public String copyright() { + return ""; + } + + /** {@inheritDoc} */ + @Override public void initExtensions(PluginContext pluginCtx, ExtensionRegistry registry) { + ctx = ((IgniteEx)pluginCtx.grid()).context(); + + if (!ctx.clientNode()) { + registry.registerExtension(CacheTopologyValidatorProvider.class, new CacheTopologyValidatorProvider() { + /** {@inheritDoc} */ + @Override public TopologyValidator topologyValidator(String cacheName) { + return CacheTopologyValidatorPluginProvider.this; + } + }); + } + } + + /** {@inheritDoc} */ + @Nullable @Override public <T> T createComponent(PluginContext ctx, Class<T> cls) { + return null; + } + + /** {@inheritDoc} */ + @Override public CachePluginProvider createCacheProvider(CachePluginContext ctx) { + return null; + } + + /** {@inheritDoc} */ + @Override public void start(PluginContext pluginCtx) { + if (ctx.clientNode()) + return; + + log = ctx.log(getClass()); + + onGlobalClusterStateChanged(ctx.state().clusterState().state()); + + ctx.event().addDiscoveryEventListener(new TopologyChangedEventListener(), TOP_CHANGED_EVTS); + + ctx.discovery().setCustomEventListener( + ChangeGlobalStateFinishMessage.class, + (topVer, snd, msg) -> onGlobalClusterStateChanged(msg.state()) + ); + + ctx.state().baselineConfiguration().listenAutoAdjustTimeout((name, oldVal, newVal) -> { + if (newVal != null) + checkBaselineAutoAdjustConfiguration(ctx.state().isBaselineAutoAdjustEnabled(), newVal); + }); + + ctx.state().baselineConfiguration().listenAutoAdjustEnabled((name, oldVal, newVal) -> { + if (newVal != null) + checkBaselineAutoAdjustConfiguration(newVal, ctx.state().baselineAutoAdjustTimeout()); + }); + + ctx.internalSubscriptionProcessor().registerDistributedConfigurationListener( + new DistributedConfigurationLifecycleListener() { + /** {@inheritDoc} */ + @Override public void onReadyToRegister(DistributedPropertyDispatcher dispatcher) { + 
dispatcher.registerProperty(topValidatorEnabledProp); + } + + /** {@inheritDoc} */ + @Override public void onReadyToWrite() { + boolean isLocNodeCrd = U.isLocalNodeCoordinator(ctx.discovery()); + + Boolean topValidatorEnabled = topValidatorEnabledProp.get(); + + if (topValidatorEnabled == null && !isLocNodeCrd || FALSE.equals(topValidatorEnabled)) { + U.warn(log, "Topology Validator will be disabled because it is not configured for the" + + " cluster the current node joined. Make sure the Topology Validator plugin is" + + " configured on all cluster nodes."); + } + + setDefaultValue(topValidatorEnabledProp, isLocNodeCrd, log); + } + }); + } + + /** {@inheritDoc} */ + @Override public void stop(boolean cancel) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onIgniteStart() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onIgniteStop(boolean cancel) { + // No-op. + } + + /** {@inheritDoc} */ + @Nullable @Override public Serializable provideDiscoveryData(UUID nodeId) { + return state; + } + + /** {@inheritDoc} */ + @Override public void receiveDiscoveryData(UUID nodeId, Serializable data) { + if (ctx.localNodeId().equals(nodeId)) + state = (State)data; + } + + /** {@inheritDoc} */ + @Override public void validateNewNode(ClusterNode node) throws PluginValidationException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void validateNewNode(ClusterNode node, Serializable data) throws PluginValidationException { + if (node.isClient()) + return; + + if (data == null) { + String msg = "The Topology Validator plugin is not configured for the server node that is" + + " trying to join the cluster. 
Since the Topology Validator is only applicable if all server nodes" + + " in the cluster have one, node join request will be rejected [rejectedNodeId=" + node.id() + ']'; + + throw new PluginValidationException(msg, msg, node.id()); + } + + // If the new node is joining but some node failed/left events has not been handled by + // {@linkTopologyChangedEventListener} yet, we cannot guarantee that the {@link state} on the joining node will Review comment: ```suggestion // {@link TopologyChangedEventListener} yet, we cannot guarantee that the {@code state} on the joining node will ``` ########## File path: modules/topology-validator-ext/src/test/java/com/sbt/ignite/cache/IgniteCacheTopologyValidatorTest.java ########## @@ -0,0 +1,538 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.sbt.ignite.cache; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.stream.Collectors; +import com.sbt.ignite.plugin.cache.CacheTopologyValidatorPluginProvider; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.CacheInvalidStateException; +import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheTopologySplitAbstractTest; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils.RunnableX; +import org.junit.Test; + +import static com.sbt.ignite.plugin.cache.CacheTopologyValidatorPluginProvider.TOP_VALIDATOR_ENABLED_PROP_NAME; +import static java.util.Collections.singletonMap; +import static java.util.stream.Collectors.toList; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cluster.ClusterState.ACTIVE; +import static org.apache.ignite.cluster.ClusterState.ACTIVE_READ_ONLY; +import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; +import static org.apache.ignite.internal.processors.cache.distributed.GridCacheModuloAffinityFunction.IDX_ATTR; +import static org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.DFLT_PORT; +import static org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.DFLT_PORT_RANGE; +import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause; +import static 
org.apache.ignite.testframework.GridTestUtils.waitForCondition; + +/** */ +public class IgniteCacheTopologyValidatorTest extends IgniteCacheTopologySplitAbstractTest { + /** */ + private static final String LOCAL_HOST = "localhost"; + + /** */ + private static final int CACHE_KEY_CNT = 1000; + + /** */ + public static final int CACHE_CNT = 2; + + /** */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + return getConfiguration(igniteInstanceName, true); + } + + /** */ + private IgniteConfiguration getConfiguration( + String igniteInstanceName, + boolean configureSegmentationResolverPlugin + ) throws Exception { + int idx = getTestIgniteInstanceIndex(igniteInstanceName); + + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName) + .setUserAttributes(singletonMap(IDX_ATTR, idx)); + + if (configureSegmentationResolverPlugin) + cfg.setPluginProviders(new CacheTopologyValidatorPluginProvider()); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()) + .setIpFinder(sharedStaticIpFinder) + .setLocalPortRange(1) + .setLocalPort(discoPort(idx)) + .setConnectionRecoveryTimeout(0); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected boolean isBlocked(int locPort, int rmtPort) { + return isDiscoPort(locPort) && isDiscoPort(rmtPort) && segment(locPort) != segment(rmtPort); + } + + /** */ + private int segment(int discoPort) { + return (discoPort - DFLT_PORT) % 2 == 0 ? 0 : 1; + } + + /** */ + @Override public int segment(ClusterNode node) { + return node.<Integer>attribute(IDX_ATTR) % 2 == 0 ? 
0 : 1; + } + + /** */ + @Test + public void testConnectionToIncompatibleCluster() throws Exception { + startGrid(getConfiguration(getTestIgniteInstanceName(0), false)); + + startGrid(1); + + assertTrue(waitForCondition( + () -> !(Boolean)grid(1).context().distributedConfiguration().property(TOP_VALIDATOR_ENABLED_PROP_NAME).get(), + getTestTimeout() + )); + + splitAndWait(); + + connectNodeToSegment(3, false, 1); + + assertTrue(waitForCondition( + () -> !(Boolean)grid(3).context().distributedConfiguration().property(TOP_VALIDATOR_ENABLED_PROP_NAME).get(), + getTestTimeout() + )); + } + + /** */ + @Test + public void testIncompatibleNodeConnection() throws Exception { + IgniteEx srv = startGrid(0); + + createCaches(); + + assertThrowsAnyCause( + log, + () -> startGrid(getConfiguration(getTestIgniteInstanceName(1), false)), + IgniteSpiException.class, + "The Topology Validator plugin is not configured for the server node that is trying to join the cluster." + ); + + startClientGrid(getConfiguration(getTestIgniteInstanceName(2), false)); + + assertEquals(2, srv.cluster().nodes().size()); + + checkPutGet(G.allGrids(), true); + } + + /** */ + @Test + public void testConnectionToSegmentedCluster() throws Exception { + startGridsMultiThreaded(6); + + grid(0).cluster().baselineAutoAdjustEnabled(false); + + createCaches(); + + stopGrid(4); + stopGrid(5); + + splitAndWait(); + + checkPutGet(G.allGrids(), false); + + connectNodeToSegment(4, false, 0); + connectNodeToSegment(6, true, 0); + checkPutGet(0, false); + + connectNodeToSegment(5, false, 1); + connectNodeToSegment(7, true, 1); + checkPutGet(1, false); + + stopSegmentNodes(1); + + unsplit(); + + startGrid(1); + + checkPutGet(G.allGrids(), false); + } + + /** */ + @Test + public void testRegularNodeStartStop() throws Exception { + startGrid(0); + + createCaches(); + + checkPutGetAfter(() -> startGrid(1)); + checkPutGetAfter(() -> stopGrid(1)); + + checkPutGetAfter(() -> startClientGrid(2)); + checkPutGetAfter(() -> 
stopGrid(2)); + + startGrid(1); + + grid(0).cluster().baselineAutoAdjustEnabled(false); + + checkPutGetAfter(() -> startGrid(3)); + checkPutGetAfter(() -> stopGrid(3)); + + checkPutGetAfter(() -> stopGrid(1)); + + checkPutGetAfter(() -> startClientGrid(2)); + checkPutGetAfter(() -> stopGrid(2)); + } + + /** */ + @Test + public void testClientNodeSegmentationIgnored() throws Exception { + IgniteEx srv = startGrid(0); + + startClientGrid(1); + + srv.cluster().baselineAutoAdjustEnabled(false); + + createCaches(); + + failNode(1, Collections.singleton(srv)); + + checkPutGet(Collections.singleton(srv), true); + } + + /** */ + @Test + public void testSplitWithoutBaseline() throws Exception { + startGridsMultiThreaded(4); + + createCaches(); + + splitAndWait(); + + checkPutGet(G.allGrids(), true); + } + + /** */ + @Test + public void testSplitWithBaseline() throws Exception { + startGridsMultiThreaded(3); + + grid(0).cluster().baselineAutoAdjustEnabled(false); + + createCaches(); + + startGrid(3); + + splitAndWait(); + + connectNodeToSegment(4, true, 0); + connectNodeToSegment(5, true, 1); + + checkPutGet(0, true); + checkPutGet(1, false); + + assertTrue(waitForCondition(() -> ACTIVE_READ_ONLY == grid(1).cluster().state(), getTestTimeout())); + + stopSegmentNodes(1); + stopGrid(4); + + unsplit(); + + startGrid(1); + startGrid(3); + + grid(0).cluster().setBaselineTopology(grid(0).cluster().topologyVersion()); + + splitAndWait(); + + checkPutGet(G.allGrids(), false); + + assertTrue(waitForCondition(() -> ACTIVE_READ_ONLY == grid(1).cluster().state(), getTestTimeout())); + assertTrue(waitForCondition(() -> ACTIVE_READ_ONLY == grid(0).cluster().state(), getTestTimeout())); + + grid(0).cluster().state(ACTIVE); + + checkPutGet(0, true); + checkPutGet(1, false); + } + + /** */ + @Test + public void testConsequentSegmentationResolving() throws Exception { + startGridsMultiThreaded(4); + + grid(0).cluster().baselineAutoAdjustEnabled(false); + + createCaches(); + + splitAndWait(); 
+ + checkPutGet(G.allGrids(), false); + + grid(1).cluster().state(ACTIVE); + + checkPutGet(0, false); + checkPutGet(1, true); + + stopSegmentNodes(0); + + unsplit(); + + failNode(1, Collections.singleton(grid(3))); + + checkPutGet(Collections.singleton(grid(3)), false); + + grid(3).cluster().state(ACTIVE); + + checkPutGet(Collections.singleton(grid(3)), true); + } + + /** */ + @Test + public void testEnableProperty() throws Exception { + startGridsMultiThreaded(4); + + grid(0).cluster().baselineAutoAdjustEnabled(false); + + createCaches(); + + grid(1).context().distributedConfiguration().property(TOP_VALIDATOR_ENABLED_PROP_NAME).propagate(false); + + splitAndWait(); + + connectNodeToSegment(4, true, 0); + connectNodeToSegment(5, true, 1); + + checkPutGet(G.allGrids(), true); + + stopSegmentNodes(0); + + unsplit(); + + grid(1).context().distributedConfiguration().property(TOP_VALIDATOR_ENABLED_PROP_NAME).propagate(true); + + failNode(1, Collections.singleton(grid(3))); + + checkPutGet(Collections.singleton(grid(3)), false); + } + + /** */ + @Test + public void testNodeJoinWithHalfBaselineNodesLeft() throws Exception { + startGridsMultiThreaded(4); + + grid(0).cluster().baselineAutoAdjustEnabled(false); + + createCaches(); + + stopGrid(0); + stopGrid(1); + stopGrid(2); + + checkPutGet(G.allGrids(), true); + + startGrid(0); + + checkPutGet(G.allGrids(), true); + } + + /** */ + @Test + public void testNodeJoinConcurrentWithLeftRejected() throws Exception { + IgniteEx srv = startGrids(2); + + grid(0).cluster().baselineAutoAdjustEnabled(false); + + createCaches(); + + CountDownLatch discoveryWorkerBlockedLatch = new CountDownLatch(1); + + try { + srv.events().localListen(evt -> { + try { + discoveryWorkerBlockedLatch.await(); + } + catch (InterruptedException e) { + U.error(log, e); Review comment: I think we should fail the test in this case. 
########## File path: modules/topology-validator-ext/src/main/java/com/sbt/ignite/plugin/cache/CacheTopologyValidatorPluginProvider.java ########## @@ -0,0 +1,383 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.sbt.ignite.plugin.cache; + +import java.io.Serializable; +import java.util.Collection; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.BaselineNode; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.TopologyValidator; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.cluster.DetachedClusterNode; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener; +import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import 
org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; +import org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationLifecycleListener; +import org.apache.ignite.internal.processors.configuration.distributed.DistributedPropertyDispatcher; +import org.apache.ignite.internal.processors.configuration.distributed.SimpleDistributedProperty; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.CachePluginContext; +import org.apache.ignite.plugin.CachePluginProvider; +import org.apache.ignite.plugin.CacheTopologyValidatorProvider; +import org.apache.ignite.plugin.ExtensionRegistry; +import org.apache.ignite.plugin.IgnitePlugin; +import org.apache.ignite.plugin.PluginConfiguration; +import org.apache.ignite.plugin.PluginContext; +import org.apache.ignite.plugin.PluginProvider; +import org.apache.ignite.plugin.PluginValidationException; +import org.jetbrains.annotations.Nullable; + +import static java.lang.Boolean.FALSE; +import static org.apache.ignite.cluster.ClusterState.ACTIVE; +import static org.apache.ignite.cluster.ClusterState.ACTIVE_READ_ONLY; +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.ignite.internal.cluster.DistributedConfigurationUtils.setDefaultValue; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL; + +/** */ +public class CacheTopologyValidatorPluginProvider implements PluginProvider<PluginConfiguration>, TopologyValidator { + /** */ + public static final String TOP_VALIDATOR_ENABLED_PROP_NAME = "org.apache.ignite.topology.validator.enabled"; + + /** */ + private static final int[] TOP_CHANGED_EVTS = new int[] { + EVT_NODE_LEFT, + EVT_NODE_JOINED, + EVT_NODE_FAILED + }; + + /** */ + 
private final SimpleDistributedProperty<Boolean> topValidatorEnabledProp = new SimpleDistributedProperty<>( + TOP_VALIDATOR_ENABLED_PROP_NAME, + Boolean::parseBoolean + ); + + /** Ignite kernel context. */ + private GridKernalContext ctx; + + /** Ignite logger. */ + private IgniteLogger log; + + /** */ + private long lastCheckedTopVer; + + /** */ + private volatile State state; + + /** {@inheritDoc} */ + @Override public String name() { + return "Topology Validator"; + } + + /** {@inheritDoc} */ + @Override public String version() { + return "1.0.0"; + } + + /** {@inheritDoc} */ + @Override public <T extends IgnitePlugin> T plugin() { + return (T) new IgnitePlugin() { + // No-op. + }; + } + + /** {@inheritDoc} */ + @Override public String copyright() { + return ""; + } + + /** {@inheritDoc} */ + @Override public void initExtensions(PluginContext pluginCtx, ExtensionRegistry registry) { + ctx = ((IgniteEx)pluginCtx.grid()).context(); + + if (!ctx.clientNode()) { + registry.registerExtension(CacheTopologyValidatorProvider.class, new CacheTopologyValidatorProvider() { + /** {@inheritDoc} */ + @Override public TopologyValidator topologyValidator(String cacheName) { + return CacheTopologyValidatorPluginProvider.this; + } + }); + } + } + + /** {@inheritDoc} */ + @Nullable @Override public <T> T createComponent(PluginContext ctx, Class<T> cls) { + return null; + } + + /** {@inheritDoc} */ + @Override public CachePluginProvider createCacheProvider(CachePluginContext ctx) { + return null; + } + + /** {@inheritDoc} */ + @Override public void start(PluginContext pluginCtx) { + if (ctx.clientNode()) + return; + + log = ctx.log(getClass()); + + onGlobalClusterStateChanged(ctx.state().clusterState().state()); + + ctx.event().addDiscoveryEventListener(new TopologyChangedEventListener(), TOP_CHANGED_EVTS); + + ctx.discovery().setCustomEventListener( + ChangeGlobalStateFinishMessage.class, + (topVer, snd, msg) -> onGlobalClusterStateChanged(msg.state()) + ); + + 
ctx.state().baselineConfiguration().listenAutoAdjustTimeout((name, oldVal, newVal) -> { + if (newVal != null) + checkBaselineAutoAdjustConfiguration(ctx.state().isBaselineAutoAdjustEnabled(), newVal); + }); + + ctx.state().baselineConfiguration().listenAutoAdjustEnabled((name, oldVal, newVal) -> { + if (newVal != null) + checkBaselineAutoAdjustConfiguration(newVal, ctx.state().baselineAutoAdjustTimeout()); + }); + + ctx.internalSubscriptionProcessor().registerDistributedConfigurationListener( + new DistributedConfigurationLifecycleListener() { + /** {@inheritDoc} */ + @Override public void onReadyToRegister(DistributedPropertyDispatcher dispatcher) { + dispatcher.registerProperty(topValidatorEnabledProp); + } + + /** {@inheritDoc} */ + @Override public void onReadyToWrite() { + boolean isLocNodeCrd = U.isLocalNodeCoordinator(ctx.discovery()); + + Boolean topValidatorEnabled = topValidatorEnabledProp.get(); + + if (topValidatorEnabled == null && !isLocNodeCrd || FALSE.equals(topValidatorEnabled)) { + U.warn(log, "Topology Validator will be disabled because it is not configured for the" + + " cluster the current node joined. Make sure the Topology Validator plugin is" + + " configured on all cluster nodes."); + } + + setDefaultValue(topValidatorEnabledProp, isLocNodeCrd, log); + } + }); + } + + /** {@inheritDoc} */ + @Override public void stop(boolean cancel) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onIgniteStart() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onIgniteStop(boolean cancel) { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Nullable @Override public Serializable provideDiscoveryData(UUID nodeId) { + return state; + } + + /** {@inheritDoc} */ + @Override public void receiveDiscoveryData(UUID nodeId, Serializable data) { + if (ctx.localNodeId().equals(nodeId)) + state = (State)data; + } + + /** {@inheritDoc} */ + @Override public void validateNewNode(ClusterNode node) throws PluginValidationException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void validateNewNode(ClusterNode node, Serializable data) throws PluginValidationException { + if (node.isClient()) + return; + + if (data == null) { + String msg = "The Topology Validator plugin is not configured for the server node that is" + + " trying to join the cluster. Since the Topology Validator is only applicable if all server nodes" + + " in the cluster have one, node join request will be rejected [rejectedNodeId=" + node.id() + ']'; + + throw new PluginValidationException(msg, msg, node.id()); + } + + // If the new node is joining but some node failed/left events has not been handled by + // {@linkTopologyChangedEventListener} yet, we cannot guarantee that the {@link state} on the joining node will + // be consistent with one that on the cluster nodes. 
+ if (state == State.VALID) { + DiscoCache discoCache = ctx.discovery().discoCache(new AffinityTopologyVersion(lastCheckedTopVer, 0)); + + if (discoCache != null) { + for (ClusterNode srv : discoCache.serverNodes()) { + if (!ctx.discovery().alive(srv)) { + String msg = "Node join request was rejected due to concurrent node left" + + " process handling [rejectedNodeId=" + node.id() + ']'; + + throw new PluginValidationException(msg, msg, node.id()); + } + } + } + } + } + + /** {@inheritDoc} */ + @Override public boolean validate(Collection<ClusterNode> nodes) { + assert state != null; + + return isDisabled() || state != State.INVALID; + } + + /** */ + private boolean isDisabled() { + return !topValidatorEnabledProp.getOrDefault(false); + } + + /** */ + private void onGlobalClusterStateChanged(ClusterState clusterState) { + state = clusterState == ACTIVE ? State.VALID : State.CLUSTER_WRITE_BLOCKED; + } + + /** + * Current implementation of segmentation detection compares node of each topology with configured baseline nodes. + * If baseline auto adjustment is configured with zero timeout - baseline is updated on each topology change + * and the comparison described above makes no sense. + */ + private boolean checkBaselineAutoAdjustConfiguration(boolean enabled, long timeout) { + if (isDisabled()) + return false; + + if (enabled && timeout == 0L) { + U.warn(log, "Topology Validator is currently skipping validation of topology changes because" + + " Baseline Auto Adjustment with zero timeout is configured for the cluster. 
Configure" + + " Baseline Nodes explicitly or set Baseline Auto Adjustment Timeout to greater than zero."); + + return false; + } + + return true; + } + + /** */ + private class TopologyChangedEventListener implements DiscoveryEventListener, HighPriorityListener { + /** {@inheritDoc} */ + @Override public void onEvent(DiscoveryEvent evt, DiscoCache discoCache) { + State locStateCopy = state; + + lastCheckedTopVer = evt.topologyVersion(); + + boolean skipTopChangeCheck = !checkBaselineAutoAdjustConfiguration( + ctx.state().isBaselineAutoAdjustEnabled(), + ctx.state().baselineAutoAdjustTimeout() + ); + + if (!skipTopChangeCheck && state == State.VALID && evt.type() == EVT_NODE_FAILED) { Review comment: Should we handle the `EVT_NODE_LEFT` too? ########## File path: modules/topology-validator-ext/src/main/java/com/sbt/ignite/plugin/cache/CacheTopologyValidatorPluginProvider.java ########## @@ -0,0 +1,383 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package com.sbt.ignite.plugin.cache; + +import java.io.Serializable; +import java.util.Collection; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.BaselineNode; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.TopologyValidator; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.cluster.DetachedClusterNode; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener; +import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; +import org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationLifecycleListener; +import org.apache.ignite.internal.processors.configuration.distributed.DistributedPropertyDispatcher; +import org.apache.ignite.internal.processors.configuration.distributed.SimpleDistributedProperty; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.CachePluginContext; +import org.apache.ignite.plugin.CachePluginProvider; +import org.apache.ignite.plugin.CacheTopologyValidatorProvider; +import org.apache.ignite.plugin.ExtensionRegistry; +import org.apache.ignite.plugin.IgnitePlugin; +import org.apache.ignite.plugin.PluginConfiguration; +import org.apache.ignite.plugin.PluginContext; +import org.apache.ignite.plugin.PluginProvider; +import org.apache.ignite.plugin.PluginValidationException; +import 
org.jetbrains.annotations.Nullable; + +import static java.lang.Boolean.FALSE; +import static org.apache.ignite.cluster.ClusterState.ACTIVE; +import static org.apache.ignite.cluster.ClusterState.ACTIVE_READ_ONLY; +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.ignite.internal.cluster.DistributedConfigurationUtils.setDefaultValue; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL; + +/** */ +public class CacheTopologyValidatorPluginProvider implements PluginProvider<PluginConfiguration>, TopologyValidator { + /** */ + public static final String TOP_VALIDATOR_ENABLED_PROP_NAME = "org.apache.ignite.topology.validator.enabled"; + + /** */ + private static final int[] TOP_CHANGED_EVTS = new int[] { + EVT_NODE_LEFT, + EVT_NODE_JOINED, + EVT_NODE_FAILED + }; + + /** */ + private final SimpleDistributedProperty<Boolean> topValidatorEnabledProp = new SimpleDistributedProperty<>( + TOP_VALIDATOR_ENABLED_PROP_NAME, + Boolean::parseBoolean + ); + + /** Ignite kernel context. */ + private GridKernalContext ctx; + + /** Ignite logger. */ + private IgniteLogger log; + + /** */ + private long lastCheckedTopVer; + + /** */ + private volatile State state; + + /** {@inheritDoc} */ + @Override public String name() { + return "Topology Validator"; + } + + /** {@inheritDoc} */ + @Override public String version() { + return "1.0.0"; + } + + /** {@inheritDoc} */ + @Override public <T extends IgnitePlugin> T plugin() { + return (T) new IgnitePlugin() { + // No-op. 
+ }; + } + + /** {@inheritDoc} */ + @Override public String copyright() { + return ""; + } + + /** {@inheritDoc} */ + @Override public void initExtensions(PluginContext pluginCtx, ExtensionRegistry registry) { + ctx = ((IgniteEx)pluginCtx.grid()).context(); + + if (!ctx.clientNode()) { + registry.registerExtension(CacheTopologyValidatorProvider.class, new CacheTopologyValidatorProvider() { + /** {@inheritDoc} */ + @Override public TopologyValidator topologyValidator(String cacheName) { + return CacheTopologyValidatorPluginProvider.this; + } + }); + } + } + + /** {@inheritDoc} */ + @Nullable @Override public <T> T createComponent(PluginContext ctx, Class<T> cls) { + return null; + } + + /** {@inheritDoc} */ + @Override public CachePluginProvider createCacheProvider(CachePluginContext ctx) { + return null; + } + + /** {@inheritDoc} */ + @Override public void start(PluginContext pluginCtx) { + if (ctx.clientNode()) + return; + + log = ctx.log(getClass()); + + onGlobalClusterStateChanged(ctx.state().clusterState().state()); + + ctx.event().addDiscoveryEventListener(new TopologyChangedEventListener(), TOP_CHANGED_EVTS); + + ctx.discovery().setCustomEventListener( + ChangeGlobalStateFinishMessage.class, + (topVer, snd, msg) -> onGlobalClusterStateChanged(msg.state()) + ); + + ctx.state().baselineConfiguration().listenAutoAdjustTimeout((name, oldVal, newVal) -> { + if (newVal != null) + checkBaselineAutoAdjustConfiguration(ctx.state().isBaselineAutoAdjustEnabled(), newVal); + }); + + ctx.state().baselineConfiguration().listenAutoAdjustEnabled((name, oldVal, newVal) -> { + if (newVal != null) + checkBaselineAutoAdjustConfiguration(newVal, ctx.state().baselineAutoAdjustTimeout()); + }); + + ctx.internalSubscriptionProcessor().registerDistributedConfigurationListener( + new DistributedConfigurationLifecycleListener() { + /** {@inheritDoc} */ + @Override public void onReadyToRegister(DistributedPropertyDispatcher dispatcher) { + 
dispatcher.registerProperty(topValidatorEnabledProp); + } + + /** {@inheritDoc} */ + @Override public void onReadyToWrite() { + boolean isLocNodeCrd = U.isLocalNodeCoordinator(ctx.discovery()); + + Boolean topValidatorEnabled = topValidatorEnabledProp.get(); + + if (topValidatorEnabled == null && !isLocNodeCrd || FALSE.equals(topValidatorEnabled)) { + U.warn(log, "Topology Validator will be disabled because it is not configured for the" + + " cluster the current node joined. Make sure the Topology Validator plugin is" + + " configured on all cluster nodes."); + } + + setDefaultValue(topValidatorEnabledProp, isLocNodeCrd, log); + } + }); + } + + /** {@inheritDoc} */ + @Override public void stop(boolean cancel) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onIgniteStart() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onIgniteStop(boolean cancel) { + // No-op. + } + + /** {@inheritDoc} */ + @Nullable @Override public Serializable provideDiscoveryData(UUID nodeId) { + return state; + } + + /** {@inheritDoc} */ + @Override public void receiveDiscoveryData(UUID nodeId, Serializable data) { + if (ctx.localNodeId().equals(nodeId)) + state = (State)data; + } + + /** {@inheritDoc} */ + @Override public void validateNewNode(ClusterNode node) throws PluginValidationException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void validateNewNode(ClusterNode node, Serializable data) throws PluginValidationException { + if (node.isClient()) + return; + + if (data == null) { + String msg = "The Topology Validator plugin is not configured for the server node that is" + + " trying to join the cluster. 
Since the Topology Validator is only applicable if all server nodes" + + " in the cluster have one, node join request will be rejected [rejectedNodeId=" + node.id() + ']'; + + throw new PluginValidationException(msg, msg, node.id()); + } + + // If the new node is joining but some node failed/left events has not been handled by + // {@linkTopologyChangedEventListener} yet, we cannot guarantee that the {@link state} on the joining node will + // be consistent with one that on the cluster nodes. + if (state == State.VALID) { + DiscoCache discoCache = ctx.discovery().discoCache(new AffinityTopologyVersion(lastCheckedTopVer, 0)); + + if (discoCache != null) { + for (ClusterNode srv : discoCache.serverNodes()) { + if (!ctx.discovery().alive(srv)) { + String msg = "Node join request was rejected due to concurrent node left" + + " process handling [rejectedNodeId=" + node.id() + ']'; + + throw new PluginValidationException(msg, msg, node.id()); + } + } + } + } + } + + /** {@inheritDoc} */ + @Override public boolean validate(Collection<ClusterNode> nodes) { + assert state != null; + + return isDisabled() || state != State.INVALID; + } + + /** */ + private boolean isDisabled() { + return !topValidatorEnabledProp.getOrDefault(false); + } + + /** */ + private void onGlobalClusterStateChanged(ClusterState clusterState) { + state = clusterState == ACTIVE ? State.VALID : State.CLUSTER_WRITE_BLOCKED; + } + + /** + * Current implementation of segmentation detection compares node of each topology with configured baseline nodes. + * If baseline auto adjustment is configured with zero timeout - baseline is updated on each topology change + * and the comparison described above makes no sense. 
+ */ + private boolean checkBaselineAutoAdjustConfiguration(boolean enabled, long timeout) { + if (isDisabled()) + return false; + + if (enabled && timeout == 0L) { + U.warn(log, "Topology Validator is currently skipping validation of topology changes because" + + " Baseline Auto Adjustment with zero timeout is configured for the cluster. Configure" + + " Baseline Nodes explicitly or set Baseline Auto Adjustment Timeout to greater than zero."); + + return false; + } + + return true; + } + + /** */ + private class TopologyChangedEventListener implements DiscoveryEventListener, HighPriorityListener { + /** {@inheritDoc} */ + @Override public void onEvent(DiscoveryEvent evt, DiscoCache discoCache) { + State locStateCopy = state; + + lastCheckedTopVer = evt.topologyVersion(); + + boolean skipTopChangeCheck = !checkBaselineAutoAdjustConfiguration( + ctx.state().isBaselineAutoAdjustEnabled(), + ctx.state().baselineAutoAdjustTimeout() + ); + + if (!skipTopChangeCheck && state == State.VALID && evt.type() == EVT_NODE_FAILED) { + List<? extends BaselineNode> baselineNodes = discoCache.baselineNodes(); + + // Actually Ignite considers segmentation as the sequential node failures. So we detect segmentation + // even if the single node fails and less than half of baseline nodes are alive. + if (baselineNodes != null && aliveBaselineNodes(baselineNodes) < baselineNodes.size() / 2 + 1) { Review comment: Can we move this one to the configurable system property with the default value e.g. `IGNITE_CLUSTER_INACTIVATION_THRESHOLD`? `aliveBaselineNodes(baselineNodes) < baselineNodes.size() / 2 + 1` ########## File path: modules/topology-validator-ext/src/main/java/com/sbt/ignite/plugin/cache/CacheTopologyValidatorPluginProvider.java ########## @@ -0,0 +1,383 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.sbt.ignite.plugin.cache; + +import java.io.Serializable; +import java.util.Collection; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.BaselineNode; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.TopologyValidator; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.cluster.DetachedClusterNode; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener; +import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; +import org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationLifecycleListener; +import org.apache.ignite.internal.processors.configuration.distributed.DistributedPropertyDispatcher; +import org.apache.ignite.internal.processors.configuration.distributed.SimpleDistributedProperty; +import 
org.apache.ignite.internal.util.lang.GridPlainRunnable; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.CachePluginContext; +import org.apache.ignite.plugin.CachePluginProvider; +import org.apache.ignite.plugin.CacheTopologyValidatorProvider; +import org.apache.ignite.plugin.ExtensionRegistry; +import org.apache.ignite.plugin.IgnitePlugin; +import org.apache.ignite.plugin.PluginConfiguration; +import org.apache.ignite.plugin.PluginContext; +import org.apache.ignite.plugin.PluginProvider; +import org.apache.ignite.plugin.PluginValidationException; +import org.jetbrains.annotations.Nullable; + +import static java.lang.Boolean.FALSE; +import static org.apache.ignite.cluster.ClusterState.ACTIVE; +import static org.apache.ignite.cluster.ClusterState.ACTIVE_READ_ONLY; +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.ignite.internal.cluster.DistributedConfigurationUtils.setDefaultValue; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL; + +/** */ +public class CacheTopologyValidatorPluginProvider implements PluginProvider<PluginConfiguration>, TopologyValidator { + /** */ + public static final String TOP_VALIDATOR_ENABLED_PROP_NAME = "org.apache.ignite.topology.validator.enabled"; + + /** */ + private static final int[] TOP_CHANGED_EVTS = new int[] { + EVT_NODE_LEFT, + EVT_NODE_JOINED, + EVT_NODE_FAILED + }; + + /** */ + private final SimpleDistributedProperty<Boolean> topValidatorEnabledProp = new SimpleDistributedProperty<>( + TOP_VALIDATOR_ENABLED_PROP_NAME, + Boolean::parseBoolean + ); + + /** Ignite kernel context. */ + private GridKernalContext ctx; + + /** Ignite logger. 
*/ + private IgniteLogger log; + + /** */ + private long lastCheckedTopVer; + + /** */ + private volatile State state; + + /** {@inheritDoc} */ + @Override public String name() { + return "Topology Validator"; + } + + /** {@inheritDoc} */ + @Override public String version() { + return "1.0.0"; + } + + /** {@inheritDoc} */ + @Override public <T extends IgnitePlugin> T plugin() { + return (T) new IgnitePlugin() { + // No-op. + }; + } + + /** {@inheritDoc} */ + @Override public String copyright() { + return ""; + } + + /** {@inheritDoc} */ + @Override public void initExtensions(PluginContext pluginCtx, ExtensionRegistry registry) { + ctx = ((IgniteEx)pluginCtx.grid()).context(); + + if (!ctx.clientNode()) { + registry.registerExtension(CacheTopologyValidatorProvider.class, new CacheTopologyValidatorProvider() { + /** {@inheritDoc} */ + @Override public TopologyValidator topologyValidator(String cacheName) { + return CacheTopologyValidatorPluginProvider.this; + } + }); + } + } + + /** {@inheritDoc} */ + @Nullable @Override public <T> T createComponent(PluginContext ctx, Class<T> cls) { + return null; + } + + /** {@inheritDoc} */ + @Override public CachePluginProvider createCacheProvider(CachePluginContext ctx) { + return null; + } + + /** {@inheritDoc} */ + @Override public void start(PluginContext pluginCtx) { + if (ctx.clientNode()) + return; + + log = ctx.log(getClass()); + + onGlobalClusterStateChanged(ctx.state().clusterState().state()); + + ctx.event().addDiscoveryEventListener(new TopologyChangedEventListener(), TOP_CHANGED_EVTS); + + ctx.discovery().setCustomEventListener( + ChangeGlobalStateFinishMessage.class, + (topVer, snd, msg) -> onGlobalClusterStateChanged(msg.state()) + ); + + ctx.state().baselineConfiguration().listenAutoAdjustTimeout((name, oldVal, newVal) -> { + if (newVal != null) + checkBaselineAutoAdjustConfiguration(ctx.state().isBaselineAutoAdjustEnabled(), newVal); + }); + + ctx.state().baselineConfiguration().listenAutoAdjustEnabled((name, 
oldVal, newVal) -> { + if (newVal != null) + checkBaselineAutoAdjustConfiguration(newVal, ctx.state().baselineAutoAdjustTimeout()); + }); + + ctx.internalSubscriptionProcessor().registerDistributedConfigurationListener( + new DistributedConfigurationLifecycleListener() { + /** {@inheritDoc} */ + @Override public void onReadyToRegister(DistributedPropertyDispatcher dispatcher) { + dispatcher.registerProperty(topValidatorEnabledProp); + } + + /** {@inheritDoc} */ + @Override public void onReadyToWrite() { + boolean isLocNodeCrd = U.isLocalNodeCoordinator(ctx.discovery()); + + Boolean topValidatorEnabled = topValidatorEnabledProp.get(); + + if (topValidatorEnabled == null && !isLocNodeCrd || FALSE.equals(topValidatorEnabled)) { + U.warn(log, "Topology Validator will be disabled because it is not configured for the" + + " cluster the current node joined. Make sure the Topology Validator plugin is" + + " configured on all cluster nodes."); + } + + setDefaultValue(topValidatorEnabledProp, isLocNodeCrd, log); + } + }); + } + + /** {@inheritDoc} */ + @Override public void stop(boolean cancel) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onIgniteStart() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onIgniteStop(boolean cancel) { + // No-op. + } + + /** {@inheritDoc} */ + @Nullable @Override public Serializable provideDiscoveryData(UUID nodeId) { + return state; + } + + /** {@inheritDoc} */ + @Override public void receiveDiscoveryData(UUID nodeId, Serializable data) { + if (ctx.localNodeId().equals(nodeId)) + state = (State)data; + } + + /** {@inheritDoc} */ + @Override public void validateNewNode(ClusterNode node) throws PluginValidationException { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public void validateNewNode(ClusterNode node, Serializable data) throws PluginValidationException { + if (node.isClient()) + return; + + if (data == null) { + String msg = "The Topology Validator plugin is not configured for the server node that is" + + " trying to join the cluster. Since the Topology Validator is only applicable if all server nodes" + + " in the cluster have one, node join request will be rejected [rejectedNodeId=" + node.id() + ']'; + + throw new PluginValidationException(msg, msg, node.id()); + } + + // If the new node is joining but some node failed/left events has not been handled by + // {@linkTopologyChangedEventListener} yet, we cannot guarantee that the {@link state} on the joining node will + // be consistent with one that on the cluster nodes. + if (state == State.VALID) { + DiscoCache discoCache = ctx.discovery().discoCache(new AffinityTopologyVersion(lastCheckedTopVer, 0)); + + if (discoCache != null) { + for (ClusterNode srv : discoCache.serverNodes()) { + if (!ctx.discovery().alive(srv)) { + String msg = "Node join request was rejected due to concurrent node left" + + " process handling [rejectedNodeId=" + node.id() + ']'; + + throw new PluginValidationException(msg, msg, node.id()); + } + } + } + } + } + + /** {@inheritDoc} */ + @Override public boolean validate(Collection<ClusterNode> nodes) { + assert state != null; + + return isDisabled() || state != State.INVALID; + } + + /** */ + private boolean isDisabled() { + return !topValidatorEnabledProp.getOrDefault(false); + } + + /** */ + private void onGlobalClusterStateChanged(ClusterState clusterState) { + state = clusterState == ACTIVE ? State.VALID : State.CLUSTER_WRITE_BLOCKED; + } + + /** + * Current implementation of segmentation detection compares node of each topology with configured baseline nodes. 
+ * If baseline auto adjustment is configured with zero timeout - baseline is updated on each topology change + * and the comparison described above makes no sense. + */ + private boolean checkBaselineAutoAdjustConfiguration(boolean enabled, long timeout) { Review comment: Let's be more precise here with the method naming e.g. `isTopologyValidationApplicable`. ########## File path: modules/topology-validator-ext/src/main/java/com/sbt/ignite/plugin/cache/CacheTopologyValidatorPluginProvider.java ########## @@ -0,0 +1,383 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.sbt.ignite.plugin.cache; + +import java.io.Serializable; +import java.util.Collection; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.BaselineNode; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.TopologyValidator; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.cluster.DetachedClusterNode; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener; +import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; +import org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationLifecycleListener; +import org.apache.ignite.internal.processors.configuration.distributed.DistributedPropertyDispatcher; +import org.apache.ignite.internal.processors.configuration.distributed.SimpleDistributedProperty; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.CachePluginContext; +import org.apache.ignite.plugin.CachePluginProvider; +import org.apache.ignite.plugin.CacheTopologyValidatorProvider; +import org.apache.ignite.plugin.ExtensionRegistry; +import org.apache.ignite.plugin.IgnitePlugin; +import org.apache.ignite.plugin.PluginConfiguration; +import org.apache.ignite.plugin.PluginContext; +import org.apache.ignite.plugin.PluginProvider; +import org.apache.ignite.plugin.PluginValidationException; +import 
org.jetbrains.annotations.Nullable; + +import static java.lang.Boolean.FALSE; +import static org.apache.ignite.cluster.ClusterState.ACTIVE; +import static org.apache.ignite.cluster.ClusterState.ACTIVE_READ_ONLY; +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.ignite.internal.cluster.DistributedConfigurationUtils.setDefaultValue; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL; + +/** */ +public class CacheTopologyValidatorPluginProvider implements PluginProvider<PluginConfiguration>, TopologyValidator { + /** */ + public static final String TOP_VALIDATOR_ENABLED_PROP_NAME = "org.apache.ignite.topology.validator.enabled"; + + /** */ + private static final int[] TOP_CHANGED_EVTS = new int[] { + EVT_NODE_LEFT, + EVT_NODE_JOINED, + EVT_NODE_FAILED + }; + + /** */ + private final SimpleDistributedProperty<Boolean> topValidatorEnabledProp = new SimpleDistributedProperty<>( + TOP_VALIDATOR_ENABLED_PROP_NAME, + Boolean::parseBoolean + ); + + /** Ignite kernel context. */ + private GridKernalContext ctx; + + /** Ignite logger. */ + private IgniteLogger log; + + /** */ + private long lastCheckedTopVer; + + /** */ + private volatile State state; + + /** {@inheritDoc} */ + @Override public String name() { + return "Topology Validator"; + } + + /** {@inheritDoc} */ + @Override public String version() { + return "1.0.0"; + } + + /** {@inheritDoc} */ + @Override public <T extends IgnitePlugin> T plugin() { + return (T) new IgnitePlugin() { + // No-op. 
+ }; + } + + /** {@inheritDoc} */ + @Override public String copyright() { + return ""; + } + + /** {@inheritDoc} */ + @Override public void initExtensions(PluginContext pluginCtx, ExtensionRegistry registry) { + ctx = ((IgniteEx)pluginCtx.grid()).context(); + + if (!ctx.clientNode()) { + registry.registerExtension(CacheTopologyValidatorProvider.class, new CacheTopologyValidatorProvider() { + /** {@inheritDoc} */ + @Override public TopologyValidator topologyValidator(String cacheName) { + return CacheTopologyValidatorPluginProvider.this; + } + }); + } + } + + /** {@inheritDoc} */ + @Nullable @Override public <T> T createComponent(PluginContext ctx, Class<T> cls) { + return null; + } + + /** {@inheritDoc} */ + @Override public CachePluginProvider createCacheProvider(CachePluginContext ctx) { + return null; + } + + /** {@inheritDoc} */ + @Override public void start(PluginContext pluginCtx) { + if (ctx.clientNode()) + return; + + log = ctx.log(getClass()); + + onGlobalClusterStateChanged(ctx.state().clusterState().state()); + + ctx.event().addDiscoveryEventListener(new TopologyChangedEventListener(), TOP_CHANGED_EVTS); + + ctx.discovery().setCustomEventListener( + ChangeGlobalStateFinishMessage.class, + (topVer, snd, msg) -> onGlobalClusterStateChanged(msg.state()) + ); + + ctx.state().baselineConfiguration().listenAutoAdjustTimeout((name, oldVal, newVal) -> { + if (newVal != null) + checkBaselineAutoAdjustConfiguration(ctx.state().isBaselineAutoAdjustEnabled(), newVal); + }); + + ctx.state().baselineConfiguration().listenAutoAdjustEnabled((name, oldVal, newVal) -> { + if (newVal != null) + checkBaselineAutoAdjustConfiguration(newVal, ctx.state().baselineAutoAdjustTimeout()); + }); + + ctx.internalSubscriptionProcessor().registerDistributedConfigurationListener( + new DistributedConfigurationLifecycleListener() { + /** {@inheritDoc} */ + @Override public void onReadyToRegister(DistributedPropertyDispatcher dispatcher) { + 
dispatcher.registerProperty(topValidatorEnabledProp); + } + + /** {@inheritDoc} */ + @Override public void onReadyToWrite() { + boolean isLocNodeCrd = U.isLocalNodeCoordinator(ctx.discovery()); + + Boolean topValidatorEnabled = topValidatorEnabledProp.get(); + + if (topValidatorEnabled == null && !isLocNodeCrd || FALSE.equals(topValidatorEnabled)) { + U.warn(log, "Topology Validator will be disabled because it is not configured for the" + + " cluster the current node joined. Make sure the Topology Validator plugin is" + + " configured on all cluster nodes."); + } + + setDefaultValue(topValidatorEnabledProp, isLocNodeCrd, log); + } + }); + } + + /** {@inheritDoc} */ + @Override public void stop(boolean cancel) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onIgniteStart() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onIgniteStop(boolean cancel) { + // No-op. + } + + /** {@inheritDoc} */ + @Nullable @Override public Serializable provideDiscoveryData(UUID nodeId) { + return state; + } + + /** {@inheritDoc} */ + @Override public void receiveDiscoveryData(UUID nodeId, Serializable data) { + if (ctx.localNodeId().equals(nodeId)) + state = (State)data; + } + + /** {@inheritDoc} */ + @Override public void validateNewNode(ClusterNode node) throws PluginValidationException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void validateNewNode(ClusterNode node, Serializable data) throws PluginValidationException { + if (node.isClient()) + return; + + if (data == null) { + String msg = "The Topology Validator plugin is not configured for the server node that is" + + " trying to join the cluster. 
Since the Topology Validator is only applicable if all server nodes" + + " in the cluster have one, node join request will be rejected [rejectedNodeId=" + node.id() + ']'; + + throw new PluginValidationException(msg, msg, node.id()); + } + + // If the new node is joining but some node failed/left events has not been handled by + // {@linkTopologyChangedEventListener} yet, we cannot guarantee that the {@link state} on the joining node will + // be consistent with one that on the cluster nodes. + if (state == State.VALID) { + DiscoCache discoCache = ctx.discovery().discoCache(new AffinityTopologyVersion(lastCheckedTopVer, 0)); + + if (discoCache != null) { + for (ClusterNode srv : discoCache.serverNodes()) { + if (!ctx.discovery().alive(srv)) { + String msg = "Node join request was rejected due to concurrent node left" + + " process handling [rejectedNodeId=" + node.id() + ']'; + + throw new PluginValidationException(msg, msg, node.id()); + } + } + } + } + } + + /** {@inheritDoc} */ + @Override public boolean validate(Collection<ClusterNode> nodes) { + assert state != null; + + return isDisabled() || state != State.INVALID; + } + + /** */ + private boolean isDisabled() { + return !topValidatorEnabledProp.getOrDefault(false); + } + + /** */ + private void onGlobalClusterStateChanged(ClusterState clusterState) { + state = clusterState == ACTIVE ? State.VALID : State.CLUSTER_WRITE_BLOCKED; + } + + /** + * Current implementation of segmentation detection compares node of each topology with configured baseline nodes. + * If baseline auto adjustment is configured with zero timeout - baseline is updated on each topology change + * and the comparison described above makes no sense. 
+ */ + private boolean checkBaselineAutoAdjustConfiguration(boolean enabled, long timeout) { + if (isDisabled()) + return false; + + if (enabled && timeout == 0L) { + U.warn(log, "Topology Validator is currently skipping validation of topology changes because" + + " Baseline Auto Adjustment with zero timeout is configured for the cluster. Configure" + + " Baseline Nodes explicitly or set Baseline Auto Adjustment Timeout to greater than zero."); + + return false; + } + + return true; + } + + /** */ + private class TopologyChangedEventListener implements DiscoveryEventListener, HighPriorityListener { + /** {@inheritDoc} */ + @Override public void onEvent(DiscoveryEvent evt, DiscoCache discoCache) { + State locStateCopy = state; + + lastCheckedTopVer = evt.topologyVersion(); + + boolean skipTopChangeCheck = !checkBaselineAutoAdjustConfiguration( + ctx.state().isBaselineAutoAdjustEnabled(), + ctx.state().baselineAutoAdjustTimeout() + ); + + if (!skipTopChangeCheck && state == State.VALID && evt.type() == EVT_NODE_FAILED) { + List<? extends BaselineNode> baselineNodes = discoCache.baselineNodes(); + + // Actually Ignite considers segmentation as the sequential node failures. So we detect segmentation + // even if the single node fails and less than half of baseline nodes are alive. + if (baselineNodes != null && aliveBaselineNodes(baselineNodes) < baselineNodes.size() / 2 + 1) { + locStateCopy = State.INVALID; + + try { + ctx.closure().runLocal(new GridPlainRunnable() { + @Override public void run() { + try { + ctx.cluster().get().state(ACTIVE_READ_ONLY); + } + catch (Throwable e) { + U.error(log, + "Failed to automatically switch state of the segmented cluster to the READ-ONLY" + + " mode. Cache writes were already restricted for all configured caches, but this" + + " step is still required in order to be able to unlock cache writes in the future." 
+ + " Retry this operation manually, if possible [segmentedNodes=" + + formatTopologyNodes(discoCache.allNodes()) + "]", e); + } + } + }, PUBLIC_POOL); + } catch (Throwable e) { + U.error(log, "Failed to schedule cluster state change to the READ-ONLY mode.", e); + } + + U.warn(log, "Cluster segmentation was detected. Write to all user caches were blocked" + + " [segmentedNodes=" + formatTopologyNodes(discoCache.allNodes()) + ']'); + } + } + + state = locStateCopy; + } + + /** {@inheritDoc} */ + @Override public int order() { + return 0; + } + + /** + * @return Count of alive baseline nodes. + * Note that the following implementation is tied to how {@link DiscoCache#baselineNodes()} collection is + * populated. + */ + private int aliveBaselineNodes(Collection<? extends BaselineNode> baselineNodes) { Review comment: I think the `F.size` can be used instead. ########## File path: modules/topology-validator-ext/README.md ########## @@ -0,0 +1,64 @@ +#What problem this module is intended to solve? + +Some network issues can cause the Ignite cluster to split into several isolated parts - segments. Nodes from different +segments cannot communicate with each other, while nodes from the same segment do not experience communication problems. +In this case, each segment marks the nodes with which the connection was lost as failed and considers itself as an +independent Ignite cluster. Let's call this scenario cluster segmentation. + +Cluster segmentation can lead to cache data inconsistency across different segments because each segment can continue +to handle cache update requests independently. + +Apache Ignite allows the user to provide custom validation logic during cache configuration that will be applied to +each topology change, and if the validation fails, writes to the corresponding cache will be blocked. The mentioned +validation logic can be passed to Ignite cache configuration as an Ignite TopologyValidation interface implementation. 
+ +This module represents an implementation of the Ignite TopologyValidator interface which provides the guarantee that +after cluster segmentation, no more than one segment can process write requests to the caches. + +The current implementation of TopologyValidator uses remaining Ignite baseline nodes in the topology to determine +segmentation. + +# In what cases will cache writes be blocked for the segment? + +The following rules are used to determine which segment can process cache write requests after segmentation and which +cannot: + +1. The segment is allowed to process cache write requests after segmentation if and only if more than half of the +baseline nodes remain in the segment, otherwise all writes to the cache will be blocked. +2. If the cluster is split into two equal segments, writing to both of them will be blocked. +3. Since Ignite treats segmentation as sequential node failures, even a single node failure in a cluster in which only +half of the baseline nodes are alive is considered as segmentation and results in write blocks for all caches. + +# Configuration + +1. Configure SegmentationResolverPluginProvider on each server node: + + ``` + new IgniteConfiguration() + ... + .setPluginProviders(new SegmentationResolverPluginProvider()); + ``` + +2. Configure IgniteCacheTopologyValidator for each cache: Review comment: It seems the notes must be updated. ########## File path: modules/topology-validator-ext/src/main/java/com/sbt/ignite/plugin/cache/CacheTopologyValidatorPluginProvider.java ########## @@ -0,0 +1,383 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.sbt.ignite.plugin.cache; + +import java.io.Serializable; +import java.util.Collection; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.BaselineNode; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.TopologyValidator; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.cluster.DetachedClusterNode; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener; +import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; +import org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationLifecycleListener; +import org.apache.ignite.internal.processors.configuration.distributed.DistributedPropertyDispatcher; +import org.apache.ignite.internal.processors.configuration.distributed.SimpleDistributedProperty; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.CachePluginContext; +import 
org.apache.ignite.plugin.CachePluginProvider; +import org.apache.ignite.plugin.CacheTopologyValidatorProvider; +import org.apache.ignite.plugin.ExtensionRegistry; +import org.apache.ignite.plugin.IgnitePlugin; +import org.apache.ignite.plugin.PluginConfiguration; +import org.apache.ignite.plugin.PluginContext; +import org.apache.ignite.plugin.PluginProvider; +import org.apache.ignite.plugin.PluginValidationException; +import org.jetbrains.annotations.Nullable; + +import static java.lang.Boolean.FALSE; +import static org.apache.ignite.cluster.ClusterState.ACTIVE; +import static org.apache.ignite.cluster.ClusterState.ACTIVE_READ_ONLY; +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.ignite.internal.cluster.DistributedConfigurationUtils.setDefaultValue; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL; + +/** */ +public class CacheTopologyValidatorPluginProvider implements PluginProvider<PluginConfiguration>, TopologyValidator { + /** */ + public static final String TOP_VALIDATOR_ENABLED_PROP_NAME = "org.apache.ignite.topology.validator.enabled"; + + /** */ + private static final int[] TOP_CHANGED_EVTS = new int[] { + EVT_NODE_LEFT, + EVT_NODE_JOINED, + EVT_NODE_FAILED + }; + + /** */ + private final SimpleDistributedProperty<Boolean> topValidatorEnabledProp = new SimpleDistributedProperty<>( + TOP_VALIDATOR_ENABLED_PROP_NAME, + Boolean::parseBoolean + ); + + /** Ignite kernel context. */ + private GridKernalContext ctx; + + /** Ignite logger. 
*/ + private IgniteLogger log; + + /** */ + private long lastCheckedTopVer; + + /** */ + private volatile State state; + + /** {@inheritDoc} */ + @Override public String name() { + return "Topology Validator"; + } + + /** {@inheritDoc} */ + @Override public String version() { + return "1.0.0"; + } + + /** {@inheritDoc} */ + @Override public <T extends IgnitePlugin> T plugin() { + return (T) new IgnitePlugin() { + // No-op. + }; + } + + /** {@inheritDoc} */ + @Override public String copyright() { + return ""; + } + + /** {@inheritDoc} */ + @Override public void initExtensions(PluginContext pluginCtx, ExtensionRegistry registry) { + ctx = ((IgniteEx)pluginCtx.grid()).context(); + + if (!ctx.clientNode()) { + registry.registerExtension(CacheTopologyValidatorProvider.class, new CacheTopologyValidatorProvider() { + /** {@inheritDoc} */ + @Override public TopologyValidator topologyValidator(String cacheName) { + return CacheTopologyValidatorPluginProvider.this; + } + }); + } + } + + /** {@inheritDoc} */ + @Nullable @Override public <T> T createComponent(PluginContext ctx, Class<T> cls) { + return null; + } + + /** {@inheritDoc} */ + @Override public CachePluginProvider createCacheProvider(CachePluginContext ctx) { + return null; + } + + /** {@inheritDoc} */ + @Override public void start(PluginContext pluginCtx) { + if (ctx.clientNode()) + return; + + log = ctx.log(getClass()); + + onGlobalClusterStateChanged(ctx.state().clusterState().state()); + + ctx.event().addDiscoveryEventListener(new TopologyChangedEventListener(), TOP_CHANGED_EVTS); + + ctx.discovery().setCustomEventListener( + ChangeGlobalStateFinishMessage.class, + (topVer, snd, msg) -> onGlobalClusterStateChanged(msg.state()) + ); + + ctx.state().baselineConfiguration().listenAutoAdjustTimeout((name, oldVal, newVal) -> { + if (newVal != null) + checkBaselineAutoAdjustConfiguration(ctx.state().isBaselineAutoAdjustEnabled(), newVal); + }); + + ctx.state().baselineConfiguration().listenAutoAdjustEnabled((name, 
oldVal, newVal) -> { + if (newVal != null) + checkBaselineAutoAdjustConfiguration(newVal, ctx.state().baselineAutoAdjustTimeout()); + }); + + ctx.internalSubscriptionProcessor().registerDistributedConfigurationListener( + new DistributedConfigurationLifecycleListener() { + /** {@inheritDoc} */ + @Override public void onReadyToRegister(DistributedPropertyDispatcher dispatcher) { + dispatcher.registerProperty(topValidatorEnabledProp); + } + + /** {@inheritDoc} */ + @Override public void onReadyToWrite() { + boolean isLocNodeCrd = U.isLocalNodeCoordinator(ctx.discovery()); + + Boolean topValidatorEnabled = topValidatorEnabledProp.get(); + + if (topValidatorEnabled == null && !isLocNodeCrd || FALSE.equals(topValidatorEnabled)) { + U.warn(log, "Topology Validator will be disabled because it is not configured for the" + + " cluster the current node joined. Make sure the Topology Validator plugin is" + + " configured on all cluster nodes."); + } + + setDefaultValue(topValidatorEnabledProp, isLocNodeCrd, log); + } + }); + } + + /** {@inheritDoc} */ + @Override public void stop(boolean cancel) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onIgniteStart() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onIgniteStop(boolean cancel) { + // No-op. + } + + /** {@inheritDoc} */ + @Nullable @Override public Serializable provideDiscoveryData(UUID nodeId) { + return state; + } + + /** {@inheritDoc} */ + @Override public void receiveDiscoveryData(UUID nodeId, Serializable data) { + if (ctx.localNodeId().equals(nodeId)) + state = (State)data; + } + + /** {@inheritDoc} */ + @Override public void validateNewNode(ClusterNode node) throws PluginValidationException { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public void validateNewNode(ClusterNode node, Serializable data) throws PluginValidationException { + if (node.isClient()) + return; + + if (data == null) { + String msg = "The Topology Validator plugin is not configured for the server node that is" + + " trying to join the cluster. Since the Topology Validator is only applicable if all server nodes" + + " in the cluster have one, node join request will be rejected [rejectedNodeId=" + node.id() + ']'; + + throw new PluginValidationException(msg, msg, node.id()); + } + + // If the new node is joining but some node failed/left events has not been handled by + // {@linkTopologyChangedEventListener} yet, we cannot guarantee that the {@link state} on the joining node will + // be consistent with one that on the cluster nodes. + if (state == State.VALID) { + DiscoCache discoCache = ctx.discovery().discoCache(new AffinityTopologyVersion(lastCheckedTopVer, 0)); + + if (discoCache != null) { + for (ClusterNode srv : discoCache.serverNodes()) { + if (!ctx.discovery().alive(srv)) { + String msg = "Node join request was rejected due to concurrent node left" + + " process handling [rejectedNodeId=" + node.id() + ']'; + + throw new PluginValidationException(msg, msg, node.id()); + } + } + } + } + } + + /** {@inheritDoc} */ + @Override public boolean validate(Collection<ClusterNode> nodes) { + assert state != null; + + return isDisabled() || state != State.INVALID; + } + + /** */ + private boolean isDisabled() { + return !topValidatorEnabledProp.getOrDefault(false); + } + + /** */ + private void onGlobalClusterStateChanged(ClusterState clusterState) { + state = clusterState == ACTIVE ? State.VALID : State.CLUSTER_WRITE_BLOCKED; + } + + /** + * Current implementation of segmentation detection compares node of each topology with configured baseline nodes. 
+ * If baseline auto adjustment is configured with zero timeout - baseline is updated on each topology change + * and the comparison described above makes no sense. + */ + private boolean checkBaselineAutoAdjustConfiguration(boolean enabled, long timeout) { + if (isDisabled()) + return false; + + if (enabled && timeout == 0L) { + U.warn(log, "Topology Validator is currently skipping validation of topology changes because" + + " Baseline Auto Adjustment with zero timeout is configured for the cluster. Configure" + + " Baseline Nodes explicitly or set Baseline Auto Adjustment Timeout to greater than zero."); + + return false; + } + + return true; + } + + /** */ + private class TopologyChangedEventListener implements DiscoveryEventListener, HighPriorityListener { + /** {@inheritDoc} */ + @Override public void onEvent(DiscoveryEvent evt, DiscoCache discoCache) { + State locStateCopy = state; + + lastCheckedTopVer = evt.topologyVersion(); + + boolean skipTopChangeCheck = !checkBaselineAutoAdjustConfiguration( + ctx.state().isBaselineAutoAdjustEnabled(), + ctx.state().baselineAutoAdjustTimeout() + ); + + if (!skipTopChangeCheck && state == State.VALID && evt.type() == EVT_NODE_FAILED) { + List<? extends BaselineNode> baselineNodes = discoCache.baselineNodes(); + + // Actually Ignite considers segmentation as the sequential node failures. So we detect segmentation + // even if the single node fails and less than half of baseline nodes are alive. + if (baselineNodes != null && aliveBaselineNodes(baselineNodes) < baselineNodes.size() / 2 + 1) { + locStateCopy = State.INVALID; + + try { + ctx.closure().runLocal(new GridPlainRunnable() { + @Override public void run() { + try { + ctx.cluster().get().state(ACTIVE_READ_ONLY); + } + catch (Throwable e) { + U.error(log, + "Failed to automatically switch state of the segmented cluster to the READ-ONLY" + + " mode. 
Cache writes were already restricted for all configured caches, but this" + + " step is still required in order to be able to unlock cache writes in the future." + + " Retry this operation manually, if possible [segmentedNodes=" + + formatTopologyNodes(discoCache.allNodes()) + "]", e); + } + } + }, PUBLIC_POOL); + } catch (Throwable e) { + U.error(log, "Failed to schedule cluster state change to the READ-ONLY mode.", e); + } + + U.warn(log, "Cluster segmentation was detected. Write to all user caches were blocked" + + " [segmentedNodes=" + formatTopologyNodes(discoCache.allNodes()) + ']'); + } + } + + state = locStateCopy; + } + + /** {@inheritDoc} */ + @Override public int order() { + return 0; + } + + /** + * @return Count of alive baseline nodes. + * Note that the following implementation is tied to how {@link DiscoCache#baselineNodes()} collection is + * populated. + */ + private int aliveBaselineNodes(Collection<? extends BaselineNode> baselineNodes) { + int res = 0; + + for (BaselineNode node : baselineNodes) { + if (!(node instanceof DetachedClusterNode)) + ++res; + } + + return res; + } + + /** @return String representation of the specified cluster node collection. */ + private String formatTopologyNodes(Collection<ClusterNode> nodes) { + return nodes.stream().map(n -> n.id().toString()).collect(Collectors.joining(", ")); + } + } + + /** Represents possible states of the current segment. */ + private enum State { Review comment: Can we use the states from the `ClusterState` enum describing the current segmentation process? For instance, when the cluster segmentation is detected we set the `nextState=READ_ONLY` variable showing the cluster being in the transition state. When the transition process ends we set `nextState=null`. I don't think we need additional enum for the cluster states since it seems it is mapped directly 1 by 1. 
########## File path: modules/topology-validator-ext/src/main/java/com/sbt/ignite/plugin/cache/CacheTopologyValidatorPluginProvider.java ########## @@ -0,0 +1,383 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.sbt.ignite.plugin.cache; + +import java.io.Serializable; +import java.util.Collection; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.BaselineNode; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.TopologyValidator; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.cluster.DetachedClusterNode; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener; +import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import 
org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; +import org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationLifecycleListener; +import org.apache.ignite.internal.processors.configuration.distributed.DistributedPropertyDispatcher; +import org.apache.ignite.internal.processors.configuration.distributed.SimpleDistributedProperty; +import org.apache.ignite.internal.util.lang.GridPlainRunnable; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.CachePluginContext; +import org.apache.ignite.plugin.CachePluginProvider; +import org.apache.ignite.plugin.CacheTopologyValidatorProvider; +import org.apache.ignite.plugin.ExtensionRegistry; +import org.apache.ignite.plugin.IgnitePlugin; +import org.apache.ignite.plugin.PluginConfiguration; +import org.apache.ignite.plugin.PluginContext; +import org.apache.ignite.plugin.PluginProvider; +import org.apache.ignite.plugin.PluginValidationException; +import org.jetbrains.annotations.Nullable; + +import static java.lang.Boolean.FALSE; +import static org.apache.ignite.cluster.ClusterState.ACTIVE; +import static org.apache.ignite.cluster.ClusterState.ACTIVE_READ_ONLY; +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.ignite.internal.cluster.DistributedConfigurationUtils.setDefaultValue; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL; + +/** */ +public class CacheTopologyValidatorPluginProvider implements PluginProvider<PluginConfiguration>, TopologyValidator { + /** */ + public static final String TOP_VALIDATOR_ENABLED_PROP_NAME = "org.apache.ignite.topology.validator.enabled"; + + /** */ + private static final int[] TOP_CHANGED_EVTS = new int[] { + EVT_NODE_LEFT, + EVT_NODE_JOINED, + EVT_NODE_FAILED + }; + + /** */ + 
private final SimpleDistributedProperty<Boolean> topValidatorEnabledProp = new SimpleDistributedProperty<>( + TOP_VALIDATOR_ENABLED_PROP_NAME, + Boolean::parseBoolean + ); + + /** Ignite kernel context. */ + private GridKernalContext ctx; + + /** Ignite logger. */ + private IgniteLogger log; + + /** */ + private long lastCheckedTopVer; + + /** */ + private volatile State state; + + /** {@inheritDoc} */ + @Override public String name() { + return "Topology Validator"; + } + + /** {@inheritDoc} */ + @Override public String version() { + return "1.0.0"; + } + + /** {@inheritDoc} */ + @Override public <T extends IgnitePlugin> T plugin() { + return (T) new IgnitePlugin() { + // No-op. + }; + } + + /** {@inheritDoc} */ + @Override public String copyright() { + return ""; + } + + /** {@inheritDoc} */ + @Override public void initExtensions(PluginContext pluginCtx, ExtensionRegistry registry) { + ctx = ((IgniteEx)pluginCtx.grid()).context(); + + if (!ctx.clientNode()) { + registry.registerExtension(CacheTopologyValidatorProvider.class, new CacheTopologyValidatorProvider() { + /** {@inheritDoc} */ + @Override public TopologyValidator topologyValidator(String cacheName) { + return CacheTopologyValidatorPluginProvider.this; + } + }); + } + } + + /** {@inheritDoc} */ + @Nullable @Override public <T> T createComponent(PluginContext ctx, Class<T> cls) { + return null; + } + + /** {@inheritDoc} */ + @Override public CachePluginProvider createCacheProvider(CachePluginContext ctx) { + return null; + } + + /** {@inheritDoc} */ + @Override public void start(PluginContext pluginCtx) { + if (ctx.clientNode()) + return; + + log = ctx.log(getClass()); + + onGlobalClusterStateChanged(ctx.state().clusterState().state()); + + ctx.event().addDiscoveryEventListener(new TopologyChangedEventListener(), TOP_CHANGED_EVTS); + + ctx.discovery().setCustomEventListener( + ChangeGlobalStateFinishMessage.class, + (topVer, snd, msg) -> onGlobalClusterStateChanged(msg.state()) + ); + + 
ctx.state().baselineConfiguration().listenAutoAdjustTimeout((name, oldVal, newVal) -> { + if (newVal != null) + checkBaselineAutoAdjustConfiguration(ctx.state().isBaselineAutoAdjustEnabled(), newVal); + }); + + ctx.state().baselineConfiguration().listenAutoAdjustEnabled((name, oldVal, newVal) -> { + if (newVal != null) + checkBaselineAutoAdjustConfiguration(newVal, ctx.state().baselineAutoAdjustTimeout()); + }); + + ctx.internalSubscriptionProcessor().registerDistributedConfigurationListener( + new DistributedConfigurationLifecycleListener() { + /** {@inheritDoc} */ + @Override public void onReadyToRegister(DistributedPropertyDispatcher dispatcher) { + dispatcher.registerProperty(topValidatorEnabledProp); + } + + /** {@inheritDoc} */ + @Override public void onReadyToWrite() { + boolean isLocNodeCrd = U.isLocalNodeCoordinator(ctx.discovery()); + + Boolean topValidatorEnabled = topValidatorEnabledProp.get(); + + if (topValidatorEnabled == null && !isLocNodeCrd || FALSE.equals(topValidatorEnabled)) { + U.warn(log, "Topology Validator will be disabled because it is not configured for the" + + " cluster the current node joined. Make sure the Topology Validator plugin is" + + " configured on all cluster nodes."); + } + + setDefaultValue(topValidatorEnabledProp, isLocNodeCrd, log); + } + }); + } + + /** {@inheritDoc} */ + @Override public void stop(boolean cancel) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onIgniteStart() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void onIgniteStop(boolean cancel) { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Nullable @Override public Serializable provideDiscoveryData(UUID nodeId) { + return state; + } + + /** {@inheritDoc} */ + @Override public void receiveDiscoveryData(UUID nodeId, Serializable data) { + if (ctx.localNodeId().equals(nodeId)) + state = (State)data; + } + + /** {@inheritDoc} */ + @Override public void validateNewNode(ClusterNode node) throws PluginValidationException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void validateNewNode(ClusterNode node, Serializable data) throws PluginValidationException { + if (node.isClient()) + return; + + if (data == null) { + String msg = "The Topology Validator plugin is not configured for the server node that is" + + " trying to join the cluster. Since the Topology Validator is only applicable if all server nodes" + + " in the cluster have one, node join request will be rejected [rejectedNodeId=" + node.id() + ']'; + + throw new PluginValidationException(msg, msg, node.id()); + } + + // If the new node is joining but some node failed/left events has not been handled by + // {@linkTopologyChangedEventListener} yet, we cannot guarantee that the {@link state} on the joining node will + // be consistent with one that on the cluster nodes. 
+ if (state == State.VALID) { + DiscoCache discoCache = ctx.discovery().discoCache(new AffinityTopologyVersion(lastCheckedTopVer, 0)); + + if (discoCache != null) { + for (ClusterNode srv : discoCache.serverNodes()) { + if (!ctx.discovery().alive(srv)) { + String msg = "Node join request was rejected due to concurrent node left" + + " process handling [rejectedNodeId=" + node.id() + ']'; + + throw new PluginValidationException(msg, msg, node.id()); + } + } + } + } + } + + /** {@inheritDoc} */ + @Override public boolean validate(Collection<ClusterNode> nodes) { + assert state != null; + + return isDisabled() || state != State.INVALID; + } + + /** */ + private boolean isDisabled() { + return !topValidatorEnabledProp.getOrDefault(false); + } + + /** */ + private void onGlobalClusterStateChanged(ClusterState clusterState) { + state = clusterState == ACTIVE ? State.VALID : State.CLUSTER_WRITE_BLOCKED; + } + + /** + * Current implementation of segmentation detection compares node of each topology with configured baseline nodes. + * If baseline auto adjustment is configured with zero timeout - baseline is updated on each topology change + * and the comparison described above makes no sense. + */ + private boolean checkBaselineAutoAdjustConfiguration(boolean enabled, long timeout) { + if (isDisabled()) + return false; + + if (enabled && timeout == 0L) { + U.warn(log, "Topology Validator is currently skipping validation of topology changes because" + Review comment: Let's use `LT.warn` here and move it to the exact place where the method is called, since having logging inside such checking methods is a bad practice. ########## File path: modules/topology-validator-ext/src/test/java/com/sbt/ignite/cache/IgniteCacheTopologyValidatorTest.java ########## @@ -0,0 +1,538 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.sbt.ignite.cache; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.stream.Collectors; +import com.sbt.ignite.plugin.cache.CacheTopologyValidatorPluginProvider; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.CacheInvalidStateException; +import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheTopologySplitAbstractTest; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils.RunnableX; +import org.junit.Test; + +import static com.sbt.ignite.plugin.cache.CacheTopologyValidatorPluginProvider.TOP_VALIDATOR_ENABLED_PROP_NAME; +import static 
java.util.Collections.singletonMap; +import static java.util.stream.Collectors.toList; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cluster.ClusterState.ACTIVE; +import static org.apache.ignite.cluster.ClusterState.ACTIVE_READ_ONLY; +import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; +import static org.apache.ignite.internal.processors.cache.distributed.GridCacheModuloAffinityFunction.IDX_ATTR; +import static org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.DFLT_PORT; +import static org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.DFLT_PORT_RANGE; +import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; + +/** */ +public class IgniteCacheTopologyValidatorTest extends IgniteCacheTopologySplitAbstractTest { + /** */ + private static final String LOCAL_HOST = "localhost"; + + /** */ + private static final int CACHE_KEY_CNT = 1000; + + /** */ + public static final int CACHE_CNT = 2; + + /** */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + return getConfiguration(igniteInstanceName, true); + } + + /** */ + private IgniteConfiguration getConfiguration( Review comment: Can we parametrize test to check both in-memory and persistence cases? ########## File path: modules/topology-validator-ext/pom.xml ########## @@ -0,0 +1,71 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- + POM file. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.ignite</groupId> + <artifactId>ignite-extensions-parent</artifactId> + <version>1</version> + <relativePath>../../parent</relativePath> + </parent> + + <artifactId>ignite-topology-validator-ext</artifactId> + <version>1.0.0-SNAPSHOT</version> + <url>http://ignite.apache.org</url> + + <properties> + <ignite.groupId>org.apache.ignite</ignite.groupId> + </properties> + + <dependencies> + <dependency> + <groupId>${ignite.groupId}</groupId> + <artifactId>ignite-core</artifactId> + <version>${ignite.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>${ignite.groupId}</groupId> + <artifactId>ignite-core</artifactId> + <version>${ignite.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>${ignite.groupId}</groupId> + <artifactId>ignite-log4j</artifactId> + <version>${ignite.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>${ignite.groupId}</groupId> + <artifactId>ignite-spring</artifactId> + <version>${ignite.version}</version> + <scope>test</scope> + </dependency> + </dependencies> +</project> Review comment: Empty line. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
