ololo3000 commented on a change in pull request #86:
URL: https://github.com/apache/ignite-extensions/pull/86#discussion_r786564992



##########
File path: 
modules/topology-validator-ext/src/test/java/com/sbt/ignite/cache/IgniteCacheTopologyValidatorTest.java
##########
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.sbt.ignite.cache;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.stream.Collectors;
+import com.sbt.ignite.plugin.cache.CacheTopologyValidatorPluginProvider;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheTopologySplitAbstractTest;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils.RunnableX;
+import org.junit.Test;
+
+import static 
com.sbt.ignite.plugin.cache.CacheTopologyValidatorPluginProvider.TOP_VALIDATOR_ENABLED_PROP_NAME;
+import static java.util.Collections.singletonMap;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE_READ_ONLY;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static 
org.apache.ignite.internal.processors.cache.distributed.GridCacheModuloAffinityFunction.IDX_ATTR;
+import static org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.DFLT_PORT;
+import static 
org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.DFLT_PORT_RANGE;
+import static 
org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/** */
+public class IgniteCacheTopologyValidatorTest extends 
IgniteCacheTopologySplitAbstractTest {
+    /** */
+    private static final String LOCAL_HOST = "localhost";
+
+    /** */
+    private static final int CACHE_KEY_CNT = 1000;
+
+    /** */
+    public static final int CACHE_CNT = 2;
+
+    /** */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        return getConfiguration(igniteInstanceName, true);
+    }
+
+    /** */
+    private IgniteConfiguration getConfiguration(

Review comment:
       Done.

##########
File path: modules/topology-validator-ext/pom.xml
##########
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!--
+    POM file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.ignite</groupId>
+        <artifactId>ignite-extensions-parent</artifactId>
+        <version>1</version>
+        <relativePath>../../parent</relativePath>
+    </parent>
+
+    <artifactId>ignite-topology-validator-ext</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+    <url>http://ignite.apache.org</url>
+
+    <properties>
+        <ignite.groupId>org.apache.ignite</ignite.groupId>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>${ignite.groupId}</groupId>
+            <artifactId>ignite-core</artifactId>
+            <version>${ignite.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>${ignite.groupId}</groupId>
+            <artifactId>ignite-core</artifactId>
+            <version>${ignite.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>${ignite.groupId}</groupId>
+            <artifactId>ignite-log4j</artifactId>
+            <version>${ignite.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>${ignite.groupId}</groupId>
+            <artifactId>ignite-spring</artifactId>
+            <version>${ignite.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>

Review comment:
       Done.

##########
File path: 
modules/topology-validator-ext/src/main/java/com/sbt/ignite/plugin/cache/CacheTopologyValidatorPluginProvider.java
##########
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.sbt.ignite.plugin.cache;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.BaselineNode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.TopologyValidator;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.cluster.DetachedClusterNode;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
+import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import 
org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
+import 
org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationLifecycleListener;
+import 
org.apache.ignite.internal.processors.configuration.distributed.DistributedPropertyDispatcher;
+import 
org.apache.ignite.internal.processors.configuration.distributed.SimpleDistributedProperty;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.CachePluginContext;
+import org.apache.ignite.plugin.CachePluginProvider;
+import org.apache.ignite.plugin.CacheTopologyValidatorProvider;
+import org.apache.ignite.plugin.ExtensionRegistry;
+import org.apache.ignite.plugin.IgnitePlugin;
+import org.apache.ignite.plugin.PluginConfiguration;
+import org.apache.ignite.plugin.PluginContext;
+import org.apache.ignite.plugin.PluginProvider;
+import org.apache.ignite.plugin.PluginValidationException;
+import org.jetbrains.annotations.Nullable;
+
+import static java.lang.Boolean.FALSE;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE_READ_ONLY;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static 
org.apache.ignite.internal.cluster.DistributedConfigurationUtils.setDefaultValue;
+import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
+
+/** */
+public class CacheTopologyValidatorPluginProvider implements 
PluginProvider<PluginConfiguration>, TopologyValidator {
+    /** */
+    public static final String TOP_VALIDATOR_ENABLED_PROP_NAME = 
"org.apache.ignite.topology.validator.enabled";
+
+    /** */
+    private static final int[] TOP_CHANGED_EVTS = new int[] {
+        EVT_NODE_LEFT,
+        EVT_NODE_JOINED,
+        EVT_NODE_FAILED
+    };
+
+    /** */
+    private final SimpleDistributedProperty<Boolean> topValidatorEnabledProp = 
new SimpleDistributedProperty<>(
+        TOP_VALIDATOR_ENABLED_PROP_NAME,
+        Boolean::parseBoolean
+    );
+
+    /** Ignite kernel context. */
+    private GridKernalContext ctx;
+
+    /** Ignite logger. */
+    private IgniteLogger log;
+
+    /** */
+    private long lastCheckedTopVer;
+
+    /** */
+    private volatile State state;
+
+    /** {@inheritDoc} */
+    @Override public String name() {
+        return "Topology Validator";
+    }
+
+    /** {@inheritDoc} */
+    @Override public String version() {
+        return "1.0.0";
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T extends IgnitePlugin> T plugin() {
+        return (T) new IgnitePlugin() {
+            // No-op.
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override public String copyright() {
+        return "";
+    }
+
+    /** {@inheritDoc} */
+    @Override public void initExtensions(PluginContext pluginCtx, 
ExtensionRegistry registry) {
+        ctx = ((IgniteEx)pluginCtx.grid()).context();
+
+        if (!ctx.clientNode()) {
+            registry.registerExtension(CacheTopologyValidatorProvider.class, 
new CacheTopologyValidatorProvider() {
+                /** {@inheritDoc} */
+                @Override public TopologyValidator topologyValidator(String 
cacheName) {
+                    return CacheTopologyValidatorPluginProvider.this;
+                }
+            });
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public <T> T createComponent(PluginContext ctx, 
Class<T> cls) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public CachePluginProvider 
createCacheProvider(CachePluginContext ctx) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start(PluginContext pluginCtx) {
+        if (ctx.clientNode())
+            return;
+
+        log = ctx.log(getClass());
+
+        onGlobalClusterStateChanged(ctx.state().clusterState().state());
+
+        ctx.event().addDiscoveryEventListener(new 
TopologyChangedEventListener(), TOP_CHANGED_EVTS);
+
+        ctx.discovery().setCustomEventListener(
+            ChangeGlobalStateFinishMessage.class,
+            (topVer, snd, msg) -> onGlobalClusterStateChanged(msg.state())
+        );
+
+        ctx.state().baselineConfiguration().listenAutoAdjustTimeout((name, 
oldVal, newVal) -> {
+            if (newVal != null)
+                
checkBaselineAutoAdjustConfiguration(ctx.state().isBaselineAutoAdjustEnabled(), 
newVal);
+        });
+
+        ctx.state().baselineConfiguration().listenAutoAdjustEnabled((name, 
oldVal, newVal) -> {
+            if (newVal != null)
+                checkBaselineAutoAdjustConfiguration(newVal, 
ctx.state().baselineAutoAdjustTimeout());
+        });
+
+        
ctx.internalSubscriptionProcessor().registerDistributedConfigurationListener(
+            new DistributedConfigurationLifecycleListener() {
+                /** {@inheritDoc} */
+                @Override public void 
onReadyToRegister(DistributedPropertyDispatcher dispatcher) {
+                    dispatcher.registerProperty(topValidatorEnabledProp);
+                }
+
+                /** {@inheritDoc} */
+                @Override public void onReadyToWrite() {
+                    boolean isLocNodeCrd = 
U.isLocalNodeCoordinator(ctx.discovery());
+
+                    Boolean topValidatorEnabled = 
topValidatorEnabledProp.get();
+
+                    if (topValidatorEnabled == null && !isLocNodeCrd || 
FALSE.equals(topValidatorEnabled)) {
+                        U.warn(log, "Topology Validator will be disabled 
because it is not configured for the" +
+                            " cluster the current node joined. Make sure the 
Topology Validator plugin is" +
+                            " configured on all cluster nodes.");
+                    }
+
+                    setDefaultValue(topValidatorEnabledProp, isLocNodeCrd, 
log);
+                }
+            });
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop(boolean cancel) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onIgniteStart() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onIgniteStop(boolean cancel) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Serializable provideDiscoveryData(UUID nodeId) {
+        return state;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void receiveDiscoveryData(UUID nodeId, Serializable data) 
{
+        if (ctx.localNodeId().equals(nodeId))
+            state = (State)data;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void validateNewNode(ClusterNode node) throws 
PluginValidationException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void validateNewNode(ClusterNode node, Serializable data) 
throws PluginValidationException {
+        if (node.isClient())
+            return;
+
+        if (data == null) {
+            String msg = "The Topology Validator plugin is not configured for 
the server node that is" +
+                " trying to join the cluster. Since the Topology Validator is 
only applicable if all server nodes" +
+                " in the cluster have one, node join request will be rejected 
[rejectedNodeId=" + node.id() + ']';
+
+            throw new PluginValidationException(msg, msg, node.id());
+        }
+
+        // If the new node is joining but some node failed/left events has not 
been handled by
+        // {@linkTopologyChangedEventListener} yet, we cannot guarantee that 
the {@link state} on the joining node will
+        // be consistent with one that on the cluster nodes.
+        if (state == State.VALID) {
+            DiscoCache discoCache = ctx.discovery().discoCache(new 
AffinityTopologyVersion(lastCheckedTopVer, 0));
+
+            if (discoCache != null) {
+                for (ClusterNode srv : discoCache.serverNodes()) {
+                    if (!ctx.discovery().alive(srv)) {
+                        String msg =  "Node join request was rejected due to 
concurrent node left" +
+                            " process handling [rejectedNodeId=" + node.id() + 
']';
+
+                        throw new PluginValidationException(msg, msg, 
node.id());
+                    }
+                }
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean validate(Collection<ClusterNode> nodes) {
+        assert state != null;
+
+        return isDisabled() || state != State.INVALID;
+    }
+
+    /** */
+    private boolean isDisabled() {
+        return !topValidatorEnabledProp.getOrDefault(false);
+    }
+
+    /** */
+    private void onGlobalClusterStateChanged(ClusterState clusterState) {
+        state = clusterState == ACTIVE ? State.VALID : 
State.CLUSTER_WRITE_BLOCKED;
+    }
+
+    /**
+     * Current implementation of segmentation detection compares node of each 
topology with configured baseline nodes.
+     * If baseline auto adjustment is configured with zero timeout - baseline 
is updated on each topology change
+     * and the comparison described above makes no sense.
+     */
+    private boolean checkBaselineAutoAdjustConfiguration(boolean enabled, long 
timeout) {
+        if (isDisabled())
+            return false;
+
+        if (enabled && timeout == 0L) {
+            U.warn(log, "Topology Validator is currently skipping validation 
of topology changes because" +
+                " Baseline Auto Adjustment with zero timeout is configured for 
the cluster. Configure" +
+                " Baseline Nodes explicitly or set Baseline Auto Adjustment 
Timeout to greater than zero.");
+
+            return false;
+        }
+
+        return true;
+    }
+
+    /** */
+    private class TopologyChangedEventListener implements 
DiscoveryEventListener, HighPriorityListener {
+        /** {@inheritDoc} */
+        @Override public void onEvent(DiscoveryEvent evt, DiscoCache 
discoCache) {
+            State locStateCopy = state;
+
+            lastCheckedTopVer = evt.topologyVersion();
+
+            boolean skipTopChangeCheck = !checkBaselineAutoAdjustConfiguration(
+                ctx.state().isBaselineAutoAdjustEnabled(),
+                ctx.state().baselineAutoAdjustTimeout()
+            );
+
+            if (!skipTopChangeCheck && state == State.VALID && evt.type() == 
EVT_NODE_FAILED) {
+                List<? extends BaselineNode> baselineNodes = 
discoCache.baselineNodes();
+
+                // Actually Ignite considers segmentation as the sequential 
node failures. So we detect segmentation
+                // even if the single node fails and less than half of 
baseline nodes are alive.
+                if (baselineNodes != null && aliveBaselineNodes(baselineNodes) 
< baselineNodes.size() / 2 + 1) {
+                    locStateCopy = State.INVALID;
+
+                    try {
+                        ctx.closure().runLocal(new GridPlainRunnable() {
+                            @Override public void run() {
+                                try {
+                                    
ctx.cluster().get().state(ACTIVE_READ_ONLY);
+                                }
+                                catch (Throwable e) {
+                                    U.error(log,
+                                        "Failed to automatically switch state 
of the segmented cluster to the READ-ONLY" +
+                                        " mode. Cache writes were already 
restricted for all configured caches, but this" +
+                                        " step is still required in order to 
be able to unlock cache writes in the future." +
+                                        " Retry this operation manually, if 
possible [segmentedNodes=" +
+                                        
formatTopologyNodes(discoCache.allNodes()) + "]", e);
+                                }
+                            }
+                        }, PUBLIC_POOL);
+                    } catch (Throwable e) {
+                        U.error(log, "Failed to schedule cluster state change 
to the READ-ONLY mode.", e);
+                    }
+
+                    U.warn(log, "Cluster segmentation was detected. Write to 
all user caches were blocked" +
+                        " [segmentedNodes=" + 
formatTopologyNodes(discoCache.allNodes()) + ']');
+                }
+            }
+
+            state = locStateCopy;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int order() {
+            return 0;
+        }
+
+        /**
+         * @return Count of alive baseline nodes.
+         * Note that the following implementation is tied to how {@link 
DiscoCache#baselineNodes()} collection is
+         * populated.
+         */
+        private int aliveBaselineNodes(Collection<? extends BaselineNode> 
baselineNodes) {

Review comment:
       Done.

##########
File path: 
modules/topology-validator-ext/src/main/java/com/sbt/ignite/plugin/cache/CacheTopologyValidatorPluginProvider.java
##########
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.sbt.ignite.plugin.cache;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.BaselineNode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.TopologyValidator;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.cluster.DetachedClusterNode;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
+import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import 
org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
+import 
org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationLifecycleListener;
+import 
org.apache.ignite.internal.processors.configuration.distributed.DistributedPropertyDispatcher;
+import 
org.apache.ignite.internal.processors.configuration.distributed.SimpleDistributedProperty;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.CachePluginContext;
+import org.apache.ignite.plugin.CachePluginProvider;
+import org.apache.ignite.plugin.CacheTopologyValidatorProvider;
+import org.apache.ignite.plugin.ExtensionRegistry;
+import org.apache.ignite.plugin.IgnitePlugin;
+import org.apache.ignite.plugin.PluginConfiguration;
+import org.apache.ignite.plugin.PluginContext;
+import org.apache.ignite.plugin.PluginProvider;
+import org.apache.ignite.plugin.PluginValidationException;
+import org.jetbrains.annotations.Nullable;
+
+import static java.lang.Boolean.FALSE;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE_READ_ONLY;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static 
org.apache.ignite.internal.cluster.DistributedConfigurationUtils.setDefaultValue;
+import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
+
+/** */
+public class CacheTopologyValidatorPluginProvider implements 
PluginProvider<PluginConfiguration>, TopologyValidator {
+    /** */
+    public static final String TOP_VALIDATOR_ENABLED_PROP_NAME = 
"org.apache.ignite.topology.validator.enabled";
+
+    /** */
+    private static final int[] TOP_CHANGED_EVTS = new int[] {
+        EVT_NODE_LEFT,
+        EVT_NODE_JOINED,
+        EVT_NODE_FAILED
+    };
+
+    /** */
+    private final SimpleDistributedProperty<Boolean> topValidatorEnabledProp = 
new SimpleDistributedProperty<>(
+        TOP_VALIDATOR_ENABLED_PROP_NAME,
+        Boolean::parseBoolean
+    );
+
+    /** Ignite kernel context. */
+    private GridKernalContext ctx;
+
+    /** Ignite logger. */
+    private IgniteLogger log;
+
+    /** */
+    private long lastCheckedTopVer;
+
+    /** */
+    private volatile State state;
+
+    /** {@inheritDoc} */
+    @Override public String name() {
+        return "Topology Validator";
+    }
+
+    /** {@inheritDoc} */
+    @Override public String version() {
+        return "1.0.0";
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T extends IgnitePlugin> T plugin() {
+        return (T) new IgnitePlugin() {
+            // No-op.
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override public String copyright() {
+        return "";
+    }
+
+    /** {@inheritDoc} */
+    @Override public void initExtensions(PluginContext pluginCtx, 
ExtensionRegistry registry) {
+        ctx = ((IgniteEx)pluginCtx.grid()).context();
+
+        if (!ctx.clientNode()) {
+            registry.registerExtension(CacheTopologyValidatorProvider.class, 
new CacheTopologyValidatorProvider() {
+                /** {@inheritDoc} */
+                @Override public TopologyValidator topologyValidator(String 
cacheName) {
+                    return CacheTopologyValidatorPluginProvider.this;
+                }
+            });
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public <T> T createComponent(PluginContext ctx, 
Class<T> cls) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public CachePluginProvider 
createCacheProvider(CachePluginContext ctx) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start(PluginContext pluginCtx) {
+        if (ctx.clientNode())
+            return;
+
+        log = ctx.log(getClass());
+
+        onGlobalClusterStateChanged(ctx.state().clusterState().state());
+
+        ctx.event().addDiscoveryEventListener(new 
TopologyChangedEventListener(), TOP_CHANGED_EVTS);
+
+        ctx.discovery().setCustomEventListener(
+            ChangeGlobalStateFinishMessage.class,
+            (topVer, snd, msg) -> onGlobalClusterStateChanged(msg.state())
+        );
+
+        ctx.state().baselineConfiguration().listenAutoAdjustTimeout((name, 
oldVal, newVal) -> {
+            if (newVal != null)
+                
checkBaselineAutoAdjustConfiguration(ctx.state().isBaselineAutoAdjustEnabled(), 
newVal);
+        });
+
+        ctx.state().baselineConfiguration().listenAutoAdjustEnabled((name, 
oldVal, newVal) -> {
+            if (newVal != null)
+                checkBaselineAutoAdjustConfiguration(newVal, 
ctx.state().baselineAutoAdjustTimeout());
+        });
+
+        
ctx.internalSubscriptionProcessor().registerDistributedConfigurationListener(
+            new DistributedConfigurationLifecycleListener() {
+                /** {@inheritDoc} */
+                @Override public void 
onReadyToRegister(DistributedPropertyDispatcher dispatcher) {
+                    dispatcher.registerProperty(topValidatorEnabledProp);
+                }
+
+                /** {@inheritDoc} */
+                @Override public void onReadyToWrite() {
+                    boolean isLocNodeCrd = 
U.isLocalNodeCoordinator(ctx.discovery());
+
+                    Boolean topValidatorEnabled = 
topValidatorEnabledProp.get();
+
+                    if (topValidatorEnabled == null && !isLocNodeCrd || 
FALSE.equals(topValidatorEnabled)) {
+                        U.warn(log, "Topology Validator will be disabled 
because it is not configured for the" +
+                            " cluster the current node joined. Make sure the 
Topology Validator plugin is" +
+                            " configured on all cluster nodes.");
+                    }
+
+                    setDefaultValue(topValidatorEnabledProp, isLocNodeCrd, 
log);
+                }
+            });
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop(boolean cancel) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onIgniteStart() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onIgniteStop(boolean cancel) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Serializable provideDiscoveryData(UUID nodeId) {
+        return state;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void receiveDiscoveryData(UUID nodeId, Serializable data) 
{
+        if (ctx.localNodeId().equals(nodeId))
+            state = (State)data;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void validateNewNode(ClusterNode node) throws 
PluginValidationException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void validateNewNode(ClusterNode node, Serializable data) 
throws PluginValidationException {
+        if (node.isClient())
+            return;
+
+        if (data == null) {
+            String msg = "The Topology Validator plugin is not configured for 
the server node that is" +
+                " trying to join the cluster. Since the Topology Validator is 
only applicable if all server nodes" +
+                " in the cluster have one, node join request will be rejected 
[rejectedNodeId=" + node.id() + ']';
+
+            throw new PluginValidationException(msg, msg, node.id());
+        }
+
+        // If the new node is joining but some node failed/left events has not 
been handled by
+        // {@linkTopologyChangedEventListener} yet, we cannot guarantee that 
the {@link state} on the joining node will
+        // be consistent with one that on the cluster nodes.
+        if (state == State.VALID) {
+            DiscoCache discoCache = ctx.discovery().discoCache(new 
AffinityTopologyVersion(lastCheckedTopVer, 0));
+
+            if (discoCache != null) {
+                for (ClusterNode srv : discoCache.serverNodes()) {
+                    if (!ctx.discovery().alive(srv)) {
+                        String msg =  "Node join request was rejected due to 
concurrent node left" +
+                            " process handling [rejectedNodeId=" + node.id() + 
']';
+
+                        throw new PluginValidationException(msg, msg, 
node.id());
+                    }
+                }
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean validate(Collection<ClusterNode> nodes) {
+        assert state != null;
+
+        return isDisabled() || state != State.INVALID;
+    }
+
+    /** */
+    private boolean isDisabled() {
+        return !topValidatorEnabledProp.getOrDefault(false);
+    }
+
+    /** */
+    private void onGlobalClusterStateChanged(ClusterState clusterState) {
+        state = clusterState == ACTIVE ? State.VALID : 
State.CLUSTER_WRITE_BLOCKED;
+    }
+
+    /**
+     * Current implementation of segmentation detection compares node of each 
topology with configured baseline nodes.
+     * If baseline auto adjustment is configured with zero timeout - baseline 
is updated on each topology change
+     * and the comparison described above makes no sense.
+     */
+    private boolean checkBaselineAutoAdjustConfiguration(boolean enabled, long 
timeout) {

Review comment:
       Done.

##########
File path: 
modules/topology-validator-ext/src/main/java/com/sbt/ignite/plugin/cache/CacheTopologyValidatorPluginProvider.java
##########
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.sbt.ignite.plugin.cache;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.BaselineNode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.TopologyValidator;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.cluster.DetachedClusterNode;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
+import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import 
org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
+import 
org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationLifecycleListener;
+import 
org.apache.ignite.internal.processors.configuration.distributed.DistributedPropertyDispatcher;
+import 
org.apache.ignite.internal.processors.configuration.distributed.SimpleDistributedProperty;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.CachePluginContext;
+import org.apache.ignite.plugin.CachePluginProvider;
+import org.apache.ignite.plugin.CacheTopologyValidatorProvider;
+import org.apache.ignite.plugin.ExtensionRegistry;
+import org.apache.ignite.plugin.IgnitePlugin;
+import org.apache.ignite.plugin.PluginConfiguration;
+import org.apache.ignite.plugin.PluginContext;
+import org.apache.ignite.plugin.PluginProvider;
+import org.apache.ignite.plugin.PluginValidationException;
+import org.jetbrains.annotations.Nullable;
+
+import static java.lang.Boolean.FALSE;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE_READ_ONLY;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static 
org.apache.ignite.internal.cluster.DistributedConfigurationUtils.setDefaultValue;
+import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
+
+/** */
+public class CacheTopologyValidatorPluginProvider implements 
PluginProvider<PluginConfiguration>, TopologyValidator {
+    /** */
+    public static final String TOP_VALIDATOR_ENABLED_PROP_NAME = 
"org.apache.ignite.topology.validator.enabled";
+
+    /** */
+    private static final int[] TOP_CHANGED_EVTS = new int[] {
+        EVT_NODE_LEFT,
+        EVT_NODE_JOINED,
+        EVT_NODE_FAILED
+    };
+
+    /** */
+    private final SimpleDistributedProperty<Boolean> topValidatorEnabledProp = 
new SimpleDistributedProperty<>(
+        TOP_VALIDATOR_ENABLED_PROP_NAME,
+        Boolean::parseBoolean
+    );
+
+    /** Ignite kernel context. */
+    private GridKernalContext ctx;
+
+    /** Ignite logger. */
+    private IgniteLogger log;
+
+    /** */
+    private long lastCheckedTopVer;
+
+    /** */
+    private volatile State state;
+
+    /** {@inheritDoc} */
+    @Override public String name() {
+        return "Topology Validator";
+    }
+
+    /** {@inheritDoc} */
+    @Override public String version() {
+        return "1.0.0";
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T extends IgnitePlugin> T plugin() {
+        return (T) new IgnitePlugin() {
+            // No-op.
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override public String copyright() {
+        return "";
+    }
+
+    /** {@inheritDoc} */
+    @Override public void initExtensions(PluginContext pluginCtx, 
ExtensionRegistry registry) {
+        ctx = ((IgniteEx)pluginCtx.grid()).context();
+
+        if (!ctx.clientNode()) {
+            registry.registerExtension(CacheTopologyValidatorProvider.class, 
new CacheTopologyValidatorProvider() {
+                /** {@inheritDoc} */
+                @Override public TopologyValidator topologyValidator(String 
cacheName) {
+                    return CacheTopologyValidatorPluginProvider.this;
+                }
+            });
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public <T> T createComponent(PluginContext ctx, 
Class<T> cls) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public CachePluginProvider 
createCacheProvider(CachePluginContext ctx) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start(PluginContext pluginCtx) {
+        if (ctx.clientNode())
+            return;
+
+        log = ctx.log(getClass());
+
+        onGlobalClusterStateChanged(ctx.state().clusterState().state());
+
+        ctx.event().addDiscoveryEventListener(new 
TopologyChangedEventListener(), TOP_CHANGED_EVTS);
+
+        ctx.discovery().setCustomEventListener(
+            ChangeGlobalStateFinishMessage.class,
+            (topVer, snd, msg) -> onGlobalClusterStateChanged(msg.state())
+        );
+
+        ctx.state().baselineConfiguration().listenAutoAdjustTimeout((name, 
oldVal, newVal) -> {
+            if (newVal != null)
+                
checkBaselineAutoAdjustConfiguration(ctx.state().isBaselineAutoAdjustEnabled(), 
newVal);
+        });
+
+        ctx.state().baselineConfiguration().listenAutoAdjustEnabled((name, 
oldVal, newVal) -> {
+            if (newVal != null)
+                checkBaselineAutoAdjustConfiguration(newVal, 
ctx.state().baselineAutoAdjustTimeout());
+        });
+
+        
ctx.internalSubscriptionProcessor().registerDistributedConfigurationListener(
+            new DistributedConfigurationLifecycleListener() {
+                /** {@inheritDoc} */
+                @Override public void 
onReadyToRegister(DistributedPropertyDispatcher dispatcher) {
+                    dispatcher.registerProperty(topValidatorEnabledProp);
+                }
+
+                /** {@inheritDoc} */
+                @Override public void onReadyToWrite() {
+                    boolean isLocNodeCrd = 
U.isLocalNodeCoordinator(ctx.discovery());
+
+                    Boolean topValidatorEnabled = 
topValidatorEnabledProp.get();
+
+                    if (topValidatorEnabled == null && !isLocNodeCrd || 
FALSE.equals(topValidatorEnabled)) {
+                        U.warn(log, "Topology Validator will be disabled 
because it is not configured for the" +
+                            " cluster the current node joined. Make sure the 
Topology Validator plugin is" +
+                            " configured on all cluster nodes.");
+                    }
+
+                    setDefaultValue(topValidatorEnabledProp, isLocNodeCrd, 
log);
+                }
+            });
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop(boolean cancel) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onIgniteStart() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onIgniteStop(boolean cancel) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Serializable provideDiscoveryData(UUID nodeId) {
+        return state;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void receiveDiscoveryData(UUID nodeId, Serializable data) 
{
+        if (ctx.localNodeId().equals(nodeId))
+            state = (State)data;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void validateNewNode(ClusterNode node) throws 
PluginValidationException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void validateNewNode(ClusterNode node, Serializable data) 
throws PluginValidationException {
+        if (node.isClient())
+            return;
+
+        if (data == null) {
+            String msg = "The Topology Validator plugin is not configured for 
the server node that is" +
+                " trying to join the cluster. Since the Topology Validator is 
only applicable if all server nodes" +
+                " in the cluster have one, node join request will be rejected 
[rejectedNodeId=" + node.id() + ']';
+
+            throw new PluginValidationException(msg, msg, node.id());
+        }
+
+        // If the new node is joining but some node failed/left events has not 
been handled by
+        // {@linkTopologyChangedEventListener} yet, we cannot guarantee that 
the {@link state} on the joining node will
+        // be consistent with one that on the cluster nodes.
+        if (state == State.VALID) {
+            DiscoCache discoCache = ctx.discovery().discoCache(new 
AffinityTopologyVersion(lastCheckedTopVer, 0));
+
+            if (discoCache != null) {
+                for (ClusterNode srv : discoCache.serverNodes()) {
+                    if (!ctx.discovery().alive(srv)) {
+                        String msg =  "Node join request was rejected due to 
concurrent node left" +
+                            " process handling [rejectedNodeId=" + node.id() + 
']';
+
+                        throw new PluginValidationException(msg, msg, 
node.id());
+                    }
+                }
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean validate(Collection<ClusterNode> nodes) {
+        assert state != null;
+
+        return isDisabled() || state != State.INVALID;
+    }
+
+    /** */
+    private boolean isDisabled() {
+        return !topValidatorEnabledProp.getOrDefault(false);
+    }
+
+    /** */
+    private void onGlobalClusterStateChanged(ClusterState clusterState) {
+        state = clusterState == ACTIVE ? State.VALID : 
State.CLUSTER_WRITE_BLOCKED;
+    }
+
+    /**
+     * Current implementation of segmentation detection compares node of each 
topology with configured baseline nodes.
+     * If baseline auto adjustment is configured with zero timeout - baseline 
is updated on each topology change
+     * and the comparison described above makes no sense.
+     */
+    private boolean checkBaselineAutoAdjustConfiguration(boolean enabled, long 
timeout) {
+        if (isDisabled())
+            return false;
+
+        if (enabled && timeout == 0L) {
+            U.warn(log, "Topology Validator is currently skipping validation 
of topology changes because" +
+                " Baseline Auto Adjustment with zero timeout is configured for 
the cluster. Configure" +
+                " Baseline Nodes explicitly or set Baseline Auto Adjustment 
Timeout to greater than zero.");
+
+            return false;
+        }
+
+        return true;
+    }
+
+    /** */
+    private class TopologyChangedEventListener implements 
DiscoveryEventListener, HighPriorityListener {
+        /** {@inheritDoc} */
+        @Override public void onEvent(DiscoveryEvent evt, DiscoCache 
discoCache) {
+            State locStateCopy = state;
+
+            lastCheckedTopVer = evt.topologyVersion();
+
+            boolean skipTopChangeCheck = !checkBaselineAutoAdjustConfiguration(
+                ctx.state().isBaselineAutoAdjustEnabled(),
+                ctx.state().baselineAutoAdjustTimeout()
+            );
+
+            if (!skipTopChangeCheck && state == State.VALID && evt.type() == 
EVT_NODE_FAILED) {
+                List<? extends BaselineNode> baselineNodes = 
discoCache.baselineNodes();
+
+                // Actually Ignite considers segmentation as the sequential 
node failures. So we detect segmentation
+                // even if the single node fails and less than half of 
baseline nodes are alive.
+                if (baselineNodes != null && aliveBaselineNodes(baselineNodes) 
< baselineNodes.size() / 2 + 1) {
+                    locStateCopy = State.INVALID;
+
+                    try {
+                        ctx.closure().runLocal(new GridPlainRunnable() {
+                            @Override public void run() {
+                                try {
+                                    
ctx.cluster().get().state(ACTIVE_READ_ONLY);
+                                }
+                                catch (Throwable e) {
+                                    U.error(log,
+                                        "Failed to automatically switch state 
of the segmented cluster to the READ-ONLY" +
+                                        " mode. Cache writes were already 
restricted for all configured caches, but this" +
+                                        " step is still required in order to 
be able to unlock cache writes in the future." +
+                                        " Retry this operation manually, if 
possible [segmentedNodes=" +
+                                        
formatTopologyNodes(discoCache.allNodes()) + "]", e);
+                                }
+                            }
+                        }, PUBLIC_POOL);
+                    } catch (Throwable e) {
+                        U.error(log, "Failed to schedule cluster state change 
to the READ-ONLY mode.", e);
+                    }
+
+                    U.warn(log, "Cluster segmentation was detected. Write to 
all user caches were blocked" +
+                        " [segmentedNodes=" + 
formatTopologyNodes(discoCache.allNodes()) + ']');
+                }
+            }
+
+            state = locStateCopy;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int order() {
+            return 0;
+        }
+
+        /**
+         * @return Count of alive baseline nodes.
+         * Note that the following implementation is tied to how {@link 
DiscoCache#baselineNodes()} collection is
+         * populated.
+         */
+        private int aliveBaselineNodes(Collection<? extends BaselineNode> 
baselineNodes) {
+            int res = 0;
+
+            for (BaselineNode node : baselineNodes) {
+                if (!(node instanceof DetachedClusterNode))
+                    ++res;
+            }
+
+            return res;
+        }
+
+        /** @return String representation of the specified cluster node 
collection. */
+        private String formatTopologyNodes(Collection<ClusterNode> nodes) {
+            return nodes.stream().map(n -> 
n.id().toString()).collect(Collectors.joining(", "));
+        }
+    }
+
+    /** Represents possible states of the current segment. */
+    private enum State {

Review comment:
       Done.

##########
File path: 
modules/topology-validator-ext/src/main/java/com/sbt/ignite/plugin/cache/CacheTopologyValidatorPluginProvider.java
##########
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.sbt.ignite.plugin.cache;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.BaselineNode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.TopologyValidator;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.cluster.DetachedClusterNode;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
+import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import 
org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
+import 
org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationLifecycleListener;
+import 
org.apache.ignite.internal.processors.configuration.distributed.DistributedPropertyDispatcher;
+import 
org.apache.ignite.internal.processors.configuration.distributed.SimpleDistributedProperty;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.CachePluginContext;
+import org.apache.ignite.plugin.CachePluginProvider;
+import org.apache.ignite.plugin.CacheTopologyValidatorProvider;
+import org.apache.ignite.plugin.ExtensionRegistry;
+import org.apache.ignite.plugin.IgnitePlugin;
+import org.apache.ignite.plugin.PluginConfiguration;
+import org.apache.ignite.plugin.PluginContext;
+import org.apache.ignite.plugin.PluginProvider;
+import org.apache.ignite.plugin.PluginValidationException;
+import org.jetbrains.annotations.Nullable;
+
+import static java.lang.Boolean.FALSE;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE_READ_ONLY;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static 
org.apache.ignite.internal.cluster.DistributedConfigurationUtils.setDefaultValue;
+import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
+
+/** */
+public class CacheTopologyValidatorPluginProvider implements 
PluginProvider<PluginConfiguration>, TopologyValidator {
+    /** */
+    public static final String TOP_VALIDATOR_ENABLED_PROP_NAME = 
"org.apache.ignite.topology.validator.enabled";
+
+    /** */
+    private static final int[] TOP_CHANGED_EVTS = new int[] {
+        EVT_NODE_LEFT,
+        EVT_NODE_JOINED,
+        EVT_NODE_FAILED
+    };
+
+    /** */
+    private final SimpleDistributedProperty<Boolean> topValidatorEnabledProp = 
new SimpleDistributedProperty<>(
+        TOP_VALIDATOR_ENABLED_PROP_NAME,
+        Boolean::parseBoolean
+    );
+
+    /** Ignite kernel context. */
+    private GridKernalContext ctx;
+
+    /** Ignite logger. */
+    private IgniteLogger log;
+
+    /** */
+    private long lastCheckedTopVer;
+
+    /** */
+    private volatile State state;
+
+    /** {@inheritDoc} */
+    @Override public String name() {
+        return "Topology Validator";
+    }
+
+    /** {@inheritDoc} */
+    @Override public String version() {
+        return "1.0.0";
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T extends IgnitePlugin> T plugin() {
+        return (T) new IgnitePlugin() {
+            // No-op.
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override public String copyright() {
+        return "";
+    }
+
+    /** {@inheritDoc} */
+    @Override public void initExtensions(PluginContext pluginCtx, 
ExtensionRegistry registry) {
+        ctx = ((IgniteEx)pluginCtx.grid()).context();
+
+        if (!ctx.clientNode()) {
+            registry.registerExtension(CacheTopologyValidatorProvider.class, 
new CacheTopologyValidatorProvider() {
+                /** {@inheritDoc} */
+                @Override public TopologyValidator topologyValidator(String 
cacheName) {
+                    return CacheTopologyValidatorPluginProvider.this;
+                }
+            });
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public <T> T createComponent(PluginContext ctx, 
Class<T> cls) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public CachePluginProvider 
createCacheProvider(CachePluginContext ctx) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start(PluginContext pluginCtx) {
+        if (ctx.clientNode())
+            return;
+
+        log = ctx.log(getClass());
+
+        onGlobalClusterStateChanged(ctx.state().clusterState().state());
+
+        ctx.event().addDiscoveryEventListener(new 
TopologyChangedEventListener(), TOP_CHANGED_EVTS);
+
+        ctx.discovery().setCustomEventListener(
+            ChangeGlobalStateFinishMessage.class,
+            (topVer, snd, msg) -> onGlobalClusterStateChanged(msg.state())
+        );
+
+        ctx.state().baselineConfiguration().listenAutoAdjustTimeout((name, 
oldVal, newVal) -> {
+            if (newVal != null)
+                
checkBaselineAutoAdjustConfiguration(ctx.state().isBaselineAutoAdjustEnabled(), 
newVal);
+        });
+
+        ctx.state().baselineConfiguration().listenAutoAdjustEnabled((name, 
oldVal, newVal) -> {
+            if (newVal != null)
+                checkBaselineAutoAdjustConfiguration(newVal, 
ctx.state().baselineAutoAdjustTimeout());
+        });
+
+        
ctx.internalSubscriptionProcessor().registerDistributedConfigurationListener(
+            new DistributedConfigurationLifecycleListener() {
+                /** {@inheritDoc} */
+                @Override public void 
onReadyToRegister(DistributedPropertyDispatcher dispatcher) {
+                    dispatcher.registerProperty(topValidatorEnabledProp);
+                }
+
+                /** {@inheritDoc} */
+                @Override public void onReadyToWrite() {
+                    boolean isLocNodeCrd = 
U.isLocalNodeCoordinator(ctx.discovery());
+
+                    Boolean topValidatorEnabled = 
topValidatorEnabledProp.get();
+
+                    if (topValidatorEnabled == null && !isLocNodeCrd || 
FALSE.equals(topValidatorEnabled)) {
+                        U.warn(log, "Topology Validator will be disabled 
because it is not configured for the" +
+                            " cluster the current node joined. Make sure the 
Topology Validator plugin is" +
+                            " configured on all cluster nodes.");
+                    }
+
+                    setDefaultValue(topValidatorEnabledProp, isLocNodeCrd, 
log);
+                }
+            });
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop(boolean cancel) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onIgniteStart() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onIgniteStop(boolean cancel) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Serializable provideDiscoveryData(UUID nodeId) {
+        return state;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void receiveDiscoveryData(UUID nodeId, Serializable data) 
{
+        if (ctx.localNodeId().equals(nodeId))
+            state = (State)data;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void validateNewNode(ClusterNode node) throws 
PluginValidationException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void validateNewNode(ClusterNode node, Serializable data) 
throws PluginValidationException {
+        if (node.isClient())
+            return;
+
+        if (data == null) {
+            String msg = "The Topology Validator plugin is not configured for 
the server node that is" +
+                " trying to join the cluster. Since the Topology Validator is 
only applicable if all server nodes" +
+                " in the cluster have one, node join request will be rejected 
[rejectedNodeId=" + node.id() + ']';
+
+            throw new PluginValidationException(msg, msg, node.id());
+        }
+
+        // If the new node is joining but some node failed/left events has not 
been handled by
+        // {@linkTopologyChangedEventListener} yet, we cannot guarantee that 
the {@link state} on the joining node will
+        // be consistent with one that on the cluster nodes.
+        if (state == State.VALID) {
+            DiscoCache discoCache = ctx.discovery().discoCache(new 
AffinityTopologyVersion(lastCheckedTopVer, 0));
+
+            if (discoCache != null) {
+                for (ClusterNode srv : discoCache.serverNodes()) {
+                    if (!ctx.discovery().alive(srv)) {
+                        String msg =  "Node join request was rejected due to 
concurrent node left" +
+                            " process handling [rejectedNodeId=" + node.id() + 
']';
+
+                        throw new PluginValidationException(msg, msg, 
node.id());
+                    }
+                }
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean validate(Collection<ClusterNode> nodes) {
+        assert state != null;
+
+        return isDisabled() || state != State.INVALID;
+    }
+
+    /** */
+    private boolean isDisabled() {
+        return !topValidatorEnabledProp.getOrDefault(false);
+    }
+
+    /** */
+    private void onGlobalClusterStateChanged(ClusterState clusterState) {
+        state = clusterState == ACTIVE ? State.VALID : 
State.CLUSTER_WRITE_BLOCKED;
+    }
+
+    /**
+     * Current implementation of segmentation detection compares node of each 
topology with configured baseline nodes.
+     * If baseline auto adjustment is configured with zero timeout - baseline 
is updated on each topology change
+     * and the comparison described above makes no sense.
+     */
+    private boolean checkBaselineAutoAdjustConfiguration(boolean enabled, long 
timeout) {
+        if (isDisabled())
+            return false;
+
+        if (enabled && timeout == 0L) {
+            U.warn(log, "Topology Validator is currently skipping validation 
of topology changes because" +

Review comment:
       Done.

##########
File path: 
modules/topology-validator-ext/src/main/java/com/sbt/ignite/plugin/cache/CacheTopologyValidatorPluginProvider.java
##########
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.sbt.ignite.plugin.cache;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.BaselineNode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.TopologyValidator;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.cluster.DetachedClusterNode;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
+import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import 
org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
+import 
org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationLifecycleListener;
+import 
org.apache.ignite.internal.processors.configuration.distributed.DistributedPropertyDispatcher;
+import 
org.apache.ignite.internal.processors.configuration.distributed.SimpleDistributedProperty;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.CachePluginContext;
+import org.apache.ignite.plugin.CachePluginProvider;
+import org.apache.ignite.plugin.CacheTopologyValidatorProvider;
+import org.apache.ignite.plugin.ExtensionRegistry;
+import org.apache.ignite.plugin.IgnitePlugin;
+import org.apache.ignite.plugin.PluginConfiguration;
+import org.apache.ignite.plugin.PluginContext;
+import org.apache.ignite.plugin.PluginProvider;
+import org.apache.ignite.plugin.PluginValidationException;
+import org.jetbrains.annotations.Nullable;
+
+import static java.lang.Boolean.FALSE;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE_READ_ONLY;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static 
org.apache.ignite.internal.cluster.DistributedConfigurationUtils.setDefaultValue;
+import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
+
+/** */
+public class CacheTopologyValidatorPluginProvider implements 
PluginProvider<PluginConfiguration>, TopologyValidator {
+    /** */
+    public static final String TOP_VALIDATOR_ENABLED_PROP_NAME = 
"org.apache.ignite.topology.validator.enabled";
+
+    /** */
+    private static final int[] TOP_CHANGED_EVTS = new int[] {
+        EVT_NODE_LEFT,
+        EVT_NODE_JOINED,
+        EVT_NODE_FAILED
+    };
+
+    /** */
+    private final SimpleDistributedProperty<Boolean> topValidatorEnabledProp = 
new SimpleDistributedProperty<>(
+        TOP_VALIDATOR_ENABLED_PROP_NAME,
+        Boolean::parseBoolean
+    );
+
+    /** Ignite kernel context. */
+    private GridKernalContext ctx;
+
+    /** Ignite logger. */
+    private IgniteLogger log;
+
+    /** */
+    private long lastCheckedTopVer;
+
+    /** */
+    private volatile State state;
+
+    /** {@inheritDoc} */
+    @Override public String name() {
+        return "Topology Validator";
+    }
+
+    /** {@inheritDoc} */
+    @Override public String version() {
+        return "1.0.0";
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T extends IgnitePlugin> T plugin() {
+        return (T) new IgnitePlugin() {
+            // No-op.
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override public String copyright() {
+        return "";
+    }
+
+    /** {@inheritDoc} */
+    @Override public void initExtensions(PluginContext pluginCtx, 
ExtensionRegistry registry) {
+        ctx = ((IgniteEx)pluginCtx.grid()).context();
+
+        if (!ctx.clientNode()) {
+            registry.registerExtension(CacheTopologyValidatorProvider.class, 
new CacheTopologyValidatorProvider() {
+                /** {@inheritDoc} */
+                @Override public TopologyValidator topologyValidator(String 
cacheName) {
+                    return CacheTopologyValidatorPluginProvider.this;
+                }
+            });
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public <T> T createComponent(PluginContext ctx, 
Class<T> cls) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public CachePluginProvider 
createCacheProvider(CachePluginContext ctx) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start(PluginContext pluginCtx) {
+        if (ctx.clientNode())
+            return;
+
+        log = ctx.log(getClass());
+
+        onGlobalClusterStateChanged(ctx.state().clusterState().state());
+
+        ctx.event().addDiscoveryEventListener(new 
TopologyChangedEventListener(), TOP_CHANGED_EVTS);
+
+        ctx.discovery().setCustomEventListener(
+            ChangeGlobalStateFinishMessage.class,
+            (topVer, snd, msg) -> onGlobalClusterStateChanged(msg.state())
+        );
+
+        ctx.state().baselineConfiguration().listenAutoAdjustTimeout((name, 
oldVal, newVal) -> {
+            if (newVal != null)
+                
checkBaselineAutoAdjustConfiguration(ctx.state().isBaselineAutoAdjustEnabled(), 
newVal);
+        });
+
+        ctx.state().baselineConfiguration().listenAutoAdjustEnabled((name, 
oldVal, newVal) -> {
+            if (newVal != null)
+                checkBaselineAutoAdjustConfiguration(newVal, 
ctx.state().baselineAutoAdjustTimeout());
+        });
+
+        
ctx.internalSubscriptionProcessor().registerDistributedConfigurationListener(
+            new DistributedConfigurationLifecycleListener() {
+                /** {@inheritDoc} */
+                @Override public void 
onReadyToRegister(DistributedPropertyDispatcher dispatcher) {
+                    dispatcher.registerProperty(topValidatorEnabledProp);
+                }
+
+                /** {@inheritDoc} */
+                @Override public void onReadyToWrite() {
+                    boolean isLocNodeCrd = 
U.isLocalNodeCoordinator(ctx.discovery());
+
+                    Boolean topValidatorEnabled = 
topValidatorEnabledProp.get();
+
+                    if (topValidatorEnabled == null && !isLocNodeCrd || 
FALSE.equals(topValidatorEnabled)) {
+                        U.warn(log, "Topology Validator will be disabled 
because it is not configured for the" +
+                            " cluster the current node joined. Make sure the 
Topology Validator plugin is" +
+                            " configured on all cluster nodes.");
+                    }
+
+                    setDefaultValue(topValidatorEnabledProp, isLocNodeCrd, 
log);
+                }
+            });
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop(boolean cancel) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onIgniteStart() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onIgniteStop(boolean cancel) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Serializable provideDiscoveryData(UUID nodeId) {
+        return state;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void receiveDiscoveryData(UUID nodeId, Serializable data) 
{
+        if (ctx.localNodeId().equals(nodeId))
+            state = (State)data;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void validateNewNode(ClusterNode node) throws 
PluginValidationException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void validateNewNode(ClusterNode node, Serializable data) 
throws PluginValidationException {
+        if (node.isClient())
+            return;
+
+        if (data == null) {
+            String msg = "The Topology Validator plugin is not configured for 
the server node that is" +
+                " trying to join the cluster. Since the Topology Validator is 
only applicable if all server nodes" +
+                " in the cluster have one, node join request will be rejected 
[rejectedNodeId=" + node.id() + ']';
+
+            throw new PluginValidationException(msg, msg, node.id());
+        }
+
+        // If the new node is joining but some node failed/left events has not 
been handled by
+        // {@linkTopologyChangedEventListener} yet, we cannot guarantee that 
the {@link state} on the joining node will
+        // be consistent with one that on the cluster nodes.
+        if (state == State.VALID) {
+            DiscoCache discoCache = ctx.discovery().discoCache(new 
AffinityTopologyVersion(lastCheckedTopVer, 0));
+
+            if (discoCache != null) {
+                for (ClusterNode srv : discoCache.serverNodes()) {
+                    if (!ctx.discovery().alive(srv)) {
+                        String msg =  "Node join request was rejected due to 
concurrent node left" +
+                            " process handling [rejectedNodeId=" + node.id() + 
']';
+
+                        throw new PluginValidationException(msg, msg, 
node.id());
+                    }
+                }
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean validate(Collection<ClusterNode> nodes) {
+        assert state != null;
+
+        return isDisabled() || state != State.INVALID;
+    }
+
+    /** */
+    private boolean isDisabled() {
+        return !topValidatorEnabledProp.getOrDefault(false);
+    }
+
+    /** */
+    private void onGlobalClusterStateChanged(ClusterState clusterState) {
+        state = clusterState == ACTIVE ? State.VALID : 
State.CLUSTER_WRITE_BLOCKED;
+    }
+
+    /**
+     * Current implementation of segmentation detection compares node of each 
topology with configured baseline nodes.
+     * If baseline auto adjustment is configured with zero timeout - baseline 
is updated on each topology change
+     * and the comparison described above makes no sense.
+     */
+    private boolean checkBaselineAutoAdjustConfiguration(boolean enabled, long 
timeout) {
+        if (isDisabled())
+            return false;
+
+        if (enabled && timeout == 0L) {
+            U.warn(log, "Topology Validator is currently skipping validation 
of topology changes because" +
+                " Baseline Auto Adjustment with zero timeout is configured for 
the cluster. Configure" +
+                " Baseline Nodes explicitly or set Baseline Auto Adjustment 
Timeout to greater than zero.");
+
+            return false;
+        }
+
+        return true;
+    }
+
+    /** */
+    private class TopologyChangedEventListener implements 
DiscoveryEventListener, HighPriorityListener {
+        /** {@inheritDoc} */
+        @Override public void onEvent(DiscoveryEvent evt, DiscoCache 
discoCache) {
+            State locStateCopy = state;
+
+            lastCheckedTopVer = evt.topologyVersion();
+
+            boolean skipTopChangeCheck = !checkBaselineAutoAdjustConfiguration(
+                ctx.state().isBaselineAutoAdjustEnabled(),
+                ctx.state().baselineAutoAdjustTimeout()
+            );
+
+            if (!skipTopChangeCheck && state == State.VALID && evt.type() == 
EVT_NODE_FAILED) {
+                List<? extends BaselineNode> baselineNodes = 
discoCache.baselineNodes();
+
+                // Actually Ignite considers segmentation as the sequential 
node failures. So we detect segmentation
+                // even if the single node fails and less than half of 
baseline nodes are alive.
+                if (baselineNodes != null && aliveBaselineNodes(baselineNodes) 
< baselineNodes.size() / 2 + 1) {

Review comment:
       Done.

##########
File path: 
modules/topology-validator-ext/src/main/java/com/sbt/ignite/plugin/cache/CacheTopologyValidatorPluginProvider.java
##########
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.sbt.ignite.plugin.cache;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.BaselineNode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.TopologyValidator;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.cluster.DetachedClusterNode;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
+import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import 
org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
+import 
org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationLifecycleListener;
+import 
org.apache.ignite.internal.processors.configuration.distributed.DistributedPropertyDispatcher;
+import 
org.apache.ignite.internal.processors.configuration.distributed.SimpleDistributedProperty;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.CachePluginContext;
+import org.apache.ignite.plugin.CachePluginProvider;
+import org.apache.ignite.plugin.CacheTopologyValidatorProvider;
+import org.apache.ignite.plugin.ExtensionRegistry;
+import org.apache.ignite.plugin.IgnitePlugin;
+import org.apache.ignite.plugin.PluginConfiguration;
+import org.apache.ignite.plugin.PluginContext;
+import org.apache.ignite.plugin.PluginProvider;
+import org.apache.ignite.plugin.PluginValidationException;
+import org.jetbrains.annotations.Nullable;
+
+import static java.lang.Boolean.FALSE;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE_READ_ONLY;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static 
org.apache.ignite.internal.cluster.DistributedConfigurationUtils.setDefaultValue;
+import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
+
+/** */
+public class CacheTopologyValidatorPluginProvider implements 
PluginProvider<PluginConfiguration>, TopologyValidator {
+    /** */
+    public static final String TOP_VALIDATOR_ENABLED_PROP_NAME = 
"org.apache.ignite.topology.validator.enabled";
+
+    /** */
+    private static final int[] TOP_CHANGED_EVTS = new int[] {
+        EVT_NODE_LEFT,
+        EVT_NODE_JOINED,
+        EVT_NODE_FAILED
+    };
+
+    /** */
+    private final SimpleDistributedProperty<Boolean> topValidatorEnabledProp = 
new SimpleDistributedProperty<>(
+        TOP_VALIDATOR_ENABLED_PROP_NAME,
+        Boolean::parseBoolean
+    );
+
+    /** Ignite kernel context. */
+    private GridKernalContext ctx;
+
+    /** Ignite logger. */
+    private IgniteLogger log;
+
+    /** */
+    private long lastCheckedTopVer;
+
+    /** */
+    private volatile State state;
+
+    /** {@inheritDoc} */
+    @Override public String name() {
+        return "Topology Validator";
+    }
+
+    /** {@inheritDoc} */
+    @Override public String version() {
+        return "1.0.0";
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T extends IgnitePlugin> T plugin() {
+        return (T) new IgnitePlugin() {
+            // No-op.
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override public String copyright() {
+        return "";
+    }
+
+    /** {@inheritDoc} */
+    @Override public void initExtensions(PluginContext pluginCtx, 
ExtensionRegistry registry) {
+        ctx = ((IgniteEx)pluginCtx.grid()).context();
+
+        if (!ctx.clientNode()) {
+            registry.registerExtension(CacheTopologyValidatorProvider.class, 
new CacheTopologyValidatorProvider() {
+                /** {@inheritDoc} */
+                @Override public TopologyValidator topologyValidator(String 
cacheName) {
+                    return CacheTopologyValidatorPluginProvider.this;
+                }
+            });
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public <T> T createComponent(PluginContext ctx, 
Class<T> cls) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public CachePluginProvider 
createCacheProvider(CachePluginContext ctx) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start(PluginContext pluginCtx) {
+        if (ctx.clientNode())
+            return;
+
+        log = ctx.log(getClass());
+
+        onGlobalClusterStateChanged(ctx.state().clusterState().state());
+
+        ctx.event().addDiscoveryEventListener(new 
TopologyChangedEventListener(), TOP_CHANGED_EVTS);
+
+        ctx.discovery().setCustomEventListener(
+            ChangeGlobalStateFinishMessage.class,
+            (topVer, snd, msg) -> onGlobalClusterStateChanged(msg.state())
+        );
+
+        ctx.state().baselineConfiguration().listenAutoAdjustTimeout((name, 
oldVal, newVal) -> {
+            if (newVal != null)
+                
checkBaselineAutoAdjustConfiguration(ctx.state().isBaselineAutoAdjustEnabled(), 
newVal);
+        });
+
+        ctx.state().baselineConfiguration().listenAutoAdjustEnabled((name, 
oldVal, newVal) -> {
+            if (newVal != null)
+                checkBaselineAutoAdjustConfiguration(newVal, 
ctx.state().baselineAutoAdjustTimeout());
+        });
+
+        
ctx.internalSubscriptionProcessor().registerDistributedConfigurationListener(
+            new DistributedConfigurationLifecycleListener() {
+                /** {@inheritDoc} */
+                @Override public void 
onReadyToRegister(DistributedPropertyDispatcher dispatcher) {
+                    dispatcher.registerProperty(topValidatorEnabledProp);
+                }
+
+                /** {@inheritDoc} */
+                @Override public void onReadyToWrite() {
+                    boolean isLocNodeCrd = 
U.isLocalNodeCoordinator(ctx.discovery());
+
+                    Boolean topValidatorEnabled = 
topValidatorEnabledProp.get();
+
+                    if (topValidatorEnabled == null && !isLocNodeCrd || 
FALSE.equals(topValidatorEnabled)) {
+                        U.warn(log, "Topology Validator will be disabled 
because it is not configured for the" +
+                            " cluster the current node joined. Make sure the 
Topology Validator plugin is" +
+                            " configured on all cluster nodes.");
+                    }
+
+                    setDefaultValue(topValidatorEnabledProp, isLocNodeCrd, 
log);
+                }
+            });
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop(boolean cancel) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onIgniteStart() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onIgniteStop(boolean cancel) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Serializable provideDiscoveryData(UUID nodeId) {
+        return state;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void receiveDiscoveryData(UUID nodeId, Serializable data) 
{
+        if (ctx.localNodeId().equals(nodeId))
+            state = (State)data;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void validateNewNode(ClusterNode node) throws 
PluginValidationException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void validateNewNode(ClusterNode node, Serializable data) 
throws PluginValidationException {
+        if (node.isClient())
+            return;
+
+        if (data == null) {
+            String msg = "The Topology Validator plugin is not configured for 
the server node that is" +
+                " trying to join the cluster. Since the Topology Validator is 
only applicable if all server nodes" +
+                " in the cluster have one, node join request will be rejected 
[rejectedNodeId=" + node.id() + ']';
+
+            throw new PluginValidationException(msg, msg, node.id());
+        }
+
+        // If the new node is joining but some node failed/left events has not 
been handled by
+        // {@linkTopologyChangedEventListener} yet, we cannot guarantee that 
the {@link state} on the joining node will
+        // be consistent with one that on the cluster nodes.
+        if (state == State.VALID) {
+            DiscoCache discoCache = ctx.discovery().discoCache(new 
AffinityTopologyVersion(lastCheckedTopVer, 0));
+
+            if (discoCache != null) {
+                for (ClusterNode srv : discoCache.serverNodes()) {
+                    if (!ctx.discovery().alive(srv)) {
+                        String msg =  "Node join request was rejected due to 
concurrent node left" +
+                            " process handling [rejectedNodeId=" + node.id() + 
']';
+
+                        throw new PluginValidationException(msg, msg, 
node.id());
+                    }
+                }
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean validate(Collection<ClusterNode> nodes) {
+        assert state != null;
+
+        return isDisabled() || state != State.INVALID;
+    }
+
+    /** */
+    private boolean isDisabled() {
+        return !topValidatorEnabledProp.getOrDefault(false);
+    }
+
+    /** */
+    private void onGlobalClusterStateChanged(ClusterState clusterState) {
+        state = clusterState == ACTIVE ? State.VALID : 
State.CLUSTER_WRITE_BLOCKED;
+    }
+
+    /**
+     * Current implementation of segmentation detection compares node of each 
topology with configured baseline nodes.
+     * If baseline auto adjustment is configured with zero timeout - baseline 
is updated on each topology change
+     * and the comparison described above makes no sense.
+     */
+    private boolean checkBaselineAutoAdjustConfiguration(boolean enabled, long 
timeout) {
+        if (isDisabled())
+            return false;
+
+        if (enabled && timeout == 0L) {
+            U.warn(log, "Topology Validator is currently skipping validation 
of topology changes because" +
+                " Baseline Auto Adjustment with zero timeout is configured for 
the cluster. Configure" +
+                " Baseline Nodes explicitly or set Baseline Auto Adjustment 
Timeout to greater than zero.");
+
+            return false;
+        }
+
+        return true;
+    }
+
+    /** */
+    private class TopologyChangedEventListener implements 
DiscoveryEventListener, HighPriorityListener {
+        /** {@inheritDoc} */
+        @Override public void onEvent(DiscoveryEvent evt, DiscoCache 
discoCache) {
+            State locStateCopy = state;
+
+            lastCheckedTopVer = evt.topologyVersion();
+
+            boolean skipTopChangeCheck = !checkBaselineAutoAdjustConfiguration(
+                ctx.state().isBaselineAutoAdjustEnabled(),
+                ctx.state().baselineAutoAdjustTimeout()
+            );
+
+            if (!skipTopChangeCheck && state == State.VALID && evt.type() == 
EVT_NODE_FAILED) {

Review comment:
       No, we should not. EVT_NODE_LEFT is raised on graceful shutdown and has 
nothing to do with segmentation.

##########
File path: 
modules/topology-validator-ext/src/main/java/com/sbt/ignite/plugin/cache/CacheTopologyValidatorPluginProvider.java
##########
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.sbt.ignite.plugin.cache;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.BaselineNode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.TopologyValidator;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.cluster.DetachedClusterNode;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
+import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import 
org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
+import 
org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationLifecycleListener;
+import 
org.apache.ignite.internal.processors.configuration.distributed.DistributedPropertyDispatcher;
+import 
org.apache.ignite.internal.processors.configuration.distributed.SimpleDistributedProperty;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.CachePluginContext;
+import org.apache.ignite.plugin.CachePluginProvider;
+import org.apache.ignite.plugin.CacheTopologyValidatorProvider;
+import org.apache.ignite.plugin.ExtensionRegistry;
+import org.apache.ignite.plugin.IgnitePlugin;
+import org.apache.ignite.plugin.PluginConfiguration;
+import org.apache.ignite.plugin.PluginContext;
+import org.apache.ignite.plugin.PluginProvider;
+import org.apache.ignite.plugin.PluginValidationException;
+import org.jetbrains.annotations.Nullable;
+
+import static java.lang.Boolean.FALSE;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE_READ_ONLY;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static 
org.apache.ignite.internal.cluster.DistributedConfigurationUtils.setDefaultValue;
+import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
+
+/** */
+public class CacheTopologyValidatorPluginProvider implements 
PluginProvider<PluginConfiguration>, TopologyValidator {
+    /** */
+    public static final String TOP_VALIDATOR_ENABLED_PROP_NAME = 
"org.apache.ignite.topology.validator.enabled";
+
+    /** */
+    private static final int[] TOP_CHANGED_EVTS = new int[] {
+        EVT_NODE_LEFT,
+        EVT_NODE_JOINED,
+        EVT_NODE_FAILED
+    };
+
+    /** */
+    private final SimpleDistributedProperty<Boolean> topValidatorEnabledProp = 
new SimpleDistributedProperty<>(
+        TOP_VALIDATOR_ENABLED_PROP_NAME,
+        Boolean::parseBoolean
+    );
+
+    /** Ignite kernel context. */
+    private GridKernalContext ctx;
+
+    /** Ignite logger. */
+    private IgniteLogger log;
+
+    /** */
+    private long lastCheckedTopVer;
+
+    /** */
+    private volatile State state;
+
+    /** {@inheritDoc} */
+    @Override public String name() {
+        return "Topology Validator";
+    }
+
+    /** {@inheritDoc} */
+    @Override public String version() {
+        return "1.0.0";
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T extends IgnitePlugin> T plugin() {
+        return (T) new IgnitePlugin() {
+            // No-op.
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override public String copyright() {
+        return "";
+    }
+
+    /** {@inheritDoc} */
+    @Override public void initExtensions(PluginContext pluginCtx, 
ExtensionRegistry registry) {
+        ctx = ((IgniteEx)pluginCtx.grid()).context();
+
+        if (!ctx.clientNode()) {
+            registry.registerExtension(CacheTopologyValidatorProvider.class, 
new CacheTopologyValidatorProvider() {
+                /** {@inheritDoc} */
+                @Override public TopologyValidator topologyValidator(String 
cacheName) {
+                    return CacheTopologyValidatorPluginProvider.this;
+                }
+            });
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public <T> T createComponent(PluginContext ctx, 
Class<T> cls) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public CachePluginProvider 
createCacheProvider(CachePluginContext ctx) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start(PluginContext pluginCtx) {
+        if (ctx.clientNode())
+            return;
+
+        log = ctx.log(getClass());
+
+        onGlobalClusterStateChanged(ctx.state().clusterState().state());
+
+        ctx.event().addDiscoveryEventListener(new 
TopologyChangedEventListener(), TOP_CHANGED_EVTS);
+
+        ctx.discovery().setCustomEventListener(
+            ChangeGlobalStateFinishMessage.class,
+            (topVer, snd, msg) -> onGlobalClusterStateChanged(msg.state())
+        );
+
+        ctx.state().baselineConfiguration().listenAutoAdjustTimeout((name, 
oldVal, newVal) -> {
+            if (newVal != null)
+                
checkBaselineAutoAdjustConfiguration(ctx.state().isBaselineAutoAdjustEnabled(), 
newVal);
+        });
+
+        ctx.state().baselineConfiguration().listenAutoAdjustEnabled((name, 
oldVal, newVal) -> {
+            if (newVal != null)
+                checkBaselineAutoAdjustConfiguration(newVal, 
ctx.state().baselineAutoAdjustTimeout());
+        });
+
+        
ctx.internalSubscriptionProcessor().registerDistributedConfigurationListener(
+            new DistributedConfigurationLifecycleListener() {
+                /** {@inheritDoc} */
+                @Override public void 
onReadyToRegister(DistributedPropertyDispatcher dispatcher) {
+                    dispatcher.registerProperty(topValidatorEnabledProp);
+                }
+
+                /** {@inheritDoc} */
+                @Override public void onReadyToWrite() {
+                    boolean isLocNodeCrd = 
U.isLocalNodeCoordinator(ctx.discovery());
+
+                    Boolean topValidatorEnabled = 
topValidatorEnabledProp.get();
+
+                    if (topValidatorEnabled == null && !isLocNodeCrd || 
FALSE.equals(topValidatorEnabled)) {
+                        U.warn(log, "Topology Validator will be disabled 
because it is not configured for the" +
+                            " cluster the current node joined. Make sure the 
Topology Validator plugin is" +
+                            " configured on all cluster nodes.");
+                    }
+
+                    setDefaultValue(topValidatorEnabledProp, isLocNodeCrd, 
log);
+                }
+            });
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop(boolean cancel) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onIgniteStart() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onIgniteStop(boolean cancel) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Serializable provideDiscoveryData(UUID nodeId) {
+        return state;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void receiveDiscoveryData(UUID nodeId, Serializable data) 
{
+        if (ctx.localNodeId().equals(nodeId))
+            state = (State)data;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void validateNewNode(ClusterNode node) throws 
PluginValidationException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void validateNewNode(ClusterNode node, Serializable data) 
throws PluginValidationException {
+        if (node.isClient())
+            return;
+
+        if (data == null) {
+            String msg = "The Topology Validator plugin is not configured for 
the server node that is" +
+                " trying to join the cluster. Since the Topology Validator is 
only applicable if all server nodes" +
+                " in the cluster have one, node join request will be rejected 
[rejectedNodeId=" + node.id() + ']';
+
+            throw new PluginValidationException(msg, msg, node.id());
+        }
+
+        // If the new node is joining but some node failed/left events has not 
been handled by
+        // {@linkTopologyChangedEventListener} yet, we cannot guarantee that 
the {@link state} on the joining node will

Review comment:
       Done.

##########
File path: 
modules/topology-validator-ext/src/main/java/com/sbt/ignite/plugin/cache/CacheTopologyValidatorPluginProvider.java
##########
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.sbt.ignite.plugin.cache;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.BaselineNode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.TopologyValidator;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.cluster.DetachedClusterNode;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
+import org.apache.ignite.internal.managers.eventstorage.HighPriorityListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import 
org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
+import 
org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationLifecycleListener;
+import 
org.apache.ignite.internal.processors.configuration.distributed.DistributedPropertyDispatcher;
+import 
org.apache.ignite.internal.processors.configuration.distributed.SimpleDistributedProperty;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.CachePluginContext;
+import org.apache.ignite.plugin.CachePluginProvider;
+import org.apache.ignite.plugin.CacheTopologyValidatorProvider;
+import org.apache.ignite.plugin.ExtensionRegistry;
+import org.apache.ignite.plugin.IgnitePlugin;
+import org.apache.ignite.plugin.PluginConfiguration;
+import org.apache.ignite.plugin.PluginContext;
+import org.apache.ignite.plugin.PluginProvider;
+import org.apache.ignite.plugin.PluginValidationException;
+import org.jetbrains.annotations.Nullable;
+
+import static java.lang.Boolean.FALSE;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE_READ_ONLY;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static 
org.apache.ignite.internal.cluster.DistributedConfigurationUtils.setDefaultValue;
+import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
+
+/** */
+public class CacheTopologyValidatorPluginProvider implements 
PluginProvider<PluginConfiguration>, TopologyValidator {
+    /** */
+    public static final String TOP_VALIDATOR_ENABLED_PROP_NAME = 
"org.apache.ignite.topology.validator.enabled";
+
+    /** */
+    private static final int[] TOP_CHANGED_EVTS = new int[] {
+        EVT_NODE_LEFT,
+        EVT_NODE_JOINED,
+        EVT_NODE_FAILED
+    };
+
+    /** */
+    private final SimpleDistributedProperty<Boolean> topValidatorEnabledProp = 
new SimpleDistributedProperty<>(
+        TOP_VALIDATOR_ENABLED_PROP_NAME,
+        Boolean::parseBoolean
+    );
+
+    /** Ignite kernel context. */
+    private GridKernalContext ctx;
+
+    /** Ignite logger. */
+    private IgniteLogger log;
+
+    /** */
+    private long lastCheckedTopVer;
+
+    /** */
+    private volatile State state;
+
+    /** {@inheritDoc} */
+    @Override public String name() {
+        return "Topology Validator";
+    }
+
+    /** {@inheritDoc} */
+    @Override public String version() {
+        return "1.0.0";
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T extends IgnitePlugin> T plugin() {
+        return (T) new IgnitePlugin() {
+            // No-op.
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override public String copyright() {
+        return "";
+    }
+
+    /** {@inheritDoc} */
+    @Override public void initExtensions(PluginContext pluginCtx, 
ExtensionRegistry registry) {
+        ctx = ((IgniteEx)pluginCtx.grid()).context();
+
+        if (!ctx.clientNode()) {
+            registry.registerExtension(CacheTopologyValidatorProvider.class, 
new CacheTopologyValidatorProvider() {
+                /** {@inheritDoc} */
+                @Override public TopologyValidator topologyValidator(String 
cacheName) {
+                    return CacheTopologyValidatorPluginProvider.this;
+                }
+            });
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public <T> T createComponent(PluginContext ctx, 
Class<T> cls) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public CachePluginProvider 
createCacheProvider(CachePluginContext ctx) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start(PluginContext pluginCtx) {
+        if (ctx.clientNode())
+            return;
+
+        log = ctx.log(getClass());
+
+        onGlobalClusterStateChanged(ctx.state().clusterState().state());
+
+        ctx.event().addDiscoveryEventListener(new 
TopologyChangedEventListener(), TOP_CHANGED_EVTS);
+
+        ctx.discovery().setCustomEventListener(
+            ChangeGlobalStateFinishMessage.class,
+            (topVer, snd, msg) -> onGlobalClusterStateChanged(msg.state())
+        );
+
+        ctx.state().baselineConfiguration().listenAutoAdjustTimeout((name, 
oldVal, newVal) -> {
+            if (newVal != null)
+                
checkBaselineAutoAdjustConfiguration(ctx.state().isBaselineAutoAdjustEnabled(), 
newVal);
+        });
+
+        ctx.state().baselineConfiguration().listenAutoAdjustEnabled((name, 
oldVal, newVal) -> {
+            if (newVal != null)
+                checkBaselineAutoAdjustConfiguration(newVal, 
ctx.state().baselineAutoAdjustTimeout());
+        });
+
+        
ctx.internalSubscriptionProcessor().registerDistributedConfigurationListener(
+            new DistributedConfigurationLifecycleListener() {
+                /** {@inheritDoc} */
+                @Override public void 
onReadyToRegister(DistributedPropertyDispatcher dispatcher) {
+                    dispatcher.registerProperty(topValidatorEnabledProp);
+                }
+
+                /** {@inheritDoc} */
+                @Override public void onReadyToWrite() {
+                    boolean isLocNodeCrd = 
U.isLocalNodeCoordinator(ctx.discovery());
+
+                    Boolean topValidatorEnabled = 
topValidatorEnabledProp.get();
+
+                    if (topValidatorEnabled == null && !isLocNodeCrd || 
FALSE.equals(topValidatorEnabled)) {
+                        U.warn(log, "Topology Validator will be disabled 
because it is not configured for the" +
+                            " cluster the current node joined. Make sure the 
Topology Validator plugin is" +
+                            " configured on all cluster nodes.");
+                    }
+
+                    setDefaultValue(topValidatorEnabledProp, isLocNodeCrd, 
log);
+                }
+            });
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop(boolean cancel) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onIgniteStart() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onIgniteStop(boolean cancel) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Serializable provideDiscoveryData(UUID nodeId) {
+        return state;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void receiveDiscoveryData(UUID nodeId, Serializable data) 
{
+        if (ctx.localNodeId().equals(nodeId))
+            state = (State)data;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void validateNewNode(ClusterNode node) throws 
PluginValidationException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void validateNewNode(ClusterNode node, Serializable data) 
throws PluginValidationException {
+        if (node.isClient())
+            return;
+
+        if (data == null) {
+            String msg = "The Topology Validator plugin is not configured for 
the server node that is" +
+                " trying to join the cluster. Since the Topology Validator is 
only applicable if all server nodes" +
+                " in the cluster have one, node join request will be rejected 
[rejectedNodeId=" + node.id() + ']';
+
+            throw new PluginValidationException(msg, msg, node.id());
+        }
+
+        // If the new node is joining but some node failed/left events has not 
been handled by
+        // {@linkTopologyChangedEventListener} yet, we cannot guarantee that 
the {@link state} on the joining node will
+        // be consistent with one that on the cluster nodes.
+        if (state == State.VALID) {
+            DiscoCache discoCache = ctx.discovery().discoCache(new 
AffinityTopologyVersion(lastCheckedTopVer, 0));
+
+            if (discoCache != null) {
+                for (ClusterNode srv : discoCache.serverNodes()) {
+                    if (!ctx.discovery().alive(srv)) {
+                        String msg =  "Node join request was rejected due to 
concurrent node left" +
+                            " process handling [rejectedNodeId=" + node.id() + 
']';
+
+                        throw new PluginValidationException(msg, msg, 
node.id());
+                    }
+                }
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean validate(Collection<ClusterNode> nodes) {
+        assert state != null;
+
+        return isDisabled() || state != State.INVALID;
+    }
+
+    /** */
+    private boolean isDisabled() {
+        return !topValidatorEnabledProp.getOrDefault(false);
+    }
+
+    /** */
+    private void onGlobalClusterStateChanged(ClusterState clusterState) {
+        state = clusterState == ACTIVE ? State.VALID : 
State.CLUSTER_WRITE_BLOCKED;
+    }
+
+    /**
+     * Current implementation of segmentation detection compares node of each 
topology with configured baseline nodes.
+     * If baseline auto adjustment is configured with zero timeout - baseline 
is updated on each topology change
+     * and the comparison described above makes no sense.
+     */
+    private boolean checkBaselineAutoAdjustConfiguration(boolean enabled, long 
timeout) {
+        if (isDisabled())
+            return false;
+
+        if (enabled && timeout == 0L) {
+            U.warn(log, "Topology Validator is currently skipping validation 
of topology changes because" +
+                " Baseline Auto Adjustment with zero timeout is configured for 
the cluster. Configure" +
+                " Baseline Nodes explicitly or set Baseline Auto Adjustment 
Timeout to greater than zero.");
+
+            return false;
+        }
+
+        return true;
+    }
+
+    /** */
+    private class TopologyChangedEventListener implements 
DiscoveryEventListener, HighPriorityListener {
+        /** {@inheritDoc} */
+        @Override public void onEvent(DiscoveryEvent evt, DiscoCache 
discoCache) {
+            State locStateCopy = state;
+
+            lastCheckedTopVer = evt.topologyVersion();
+
+            boolean skipTopChangeCheck = !checkBaselineAutoAdjustConfiguration(
+                ctx.state().isBaselineAutoAdjustEnabled(),
+                ctx.state().baselineAutoAdjustTimeout()
+            );
+
+            if (!skipTopChangeCheck && state == State.VALID && evt.type() == 
EVT_NODE_FAILED) {
+                List<? extends BaselineNode> baselineNodes = 
discoCache.baselineNodes();
+
+                // Actually Ignite considers segmentation as the sequential 
node failures. So we detect segmentation
+                // even if the single node fails and less than half of 
baseline nodes are alive.
+                if (baselineNodes != null && aliveBaselineNodes(baselineNodes) 
< baselineNodes.size() / 2 + 1) {
+                    locStateCopy = State.INVALID;
+
+                    try {
+                        ctx.closure().runLocal(new GridPlainRunnable() {
+                            @Override public void run() {
+                                try {
+                                    
ctx.cluster().get().state(ACTIVE_READ_ONLY);
+                                }
+                                catch (Throwable e) {
+                                    U.error(log,
+                                        "Failed to automatically switch state 
of the segmented cluster to the READ-ONLY" +
+                                        " mode. Cache writes were already 
restricted for all configured caches, but this" +
+                                        " step is still required in order to 
be able to unlock cache writes in the future." +
+                                        " Retry this operation manually, if 
possible [segmentedNodes=" +
+                                        
formatTopologyNodes(discoCache.allNodes()) + "]", e);
+                                }
+                            }
+                        }, PUBLIC_POOL);
+                    } catch (Throwable e) {
+                        U.error(log, "Failed to schedule cluster state change 
to the READ-ONLY mode.", e);
+                    }
+
+                    U.warn(log, "Cluster segmentation was detected. Write to 
all user caches were blocked" +
+                        " [segmentedNodes=" + 
formatTopologyNodes(discoCache.allNodes()) + ']');
+                }
+            }
+
+            state = locStateCopy;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int order() {
+            return 0;
+        }
+
+        /**
+         * @return Count of alive baseline nodes.
+         * Note that the following implementation is tied to how {@link 
DiscoCache#baselineNodes()} collection is
+         * populated.
+         */
+        private int aliveBaselineNodes(Collection<? extends BaselineNode> 
baselineNodes) {
+            int res = 0;
+
+            for (BaselineNode node : baselineNodes) {
+                if (!(node instanceof DetachedClusterNode))
+                    ++res;
+            }
+
+            return res;
+        }
+
+        /** @return String representation of the specified cluster node 
collection. */
+        private String formatTopologyNodes(Collection<ClusterNode> nodes) {

Review comment:
       Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to