petrov-mg commented on code in PR #13195:
URL: https://github.com/apache/ignite/pull/13195#discussion_r3442922876


##########
modules/core/src/main/java/org/apache/ignite/internal/processors/rollingupgrade/RollingUpgradeProcessor.java:
##########
@@ -17,331 +17,564 @@
 
 package org.apache.ignite.internal.processors.rollingupgrade;
 
-import java.util.Objects;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
+import java.util.function.Supplier;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteVersionUtils;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
-import 
org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
-import 
org.apache.ignite.internal.processors.metastorage.DistributedMetastorageLifecycleListener;
-import 
org.apache.ignite.internal.processors.metastorage.ReadableDistributedMetaStorage;
 import 
org.apache.ignite.internal.processors.nodevalidation.DiscoveryNodeValidationProcessor;
-import org.apache.ignite.internal.util.lang.IgnitePair;
+import 
org.apache.ignite.internal.processors.rollingupgrade.feature.IgniteFeature;
+import 
org.apache.ignite.internal.processors.rollingupgrade.feature.IgniteFeatureManager;
+import 
org.apache.ignite.internal.processors.rollingupgrade.feature.IgniteFeatureSet;
+import 
org.apache.ignite.internal.processors.rollingupgrade.feature.IgniteProductFeatures;
+import 
org.apache.ignite.internal.processors.rollingupgrade.feature.SupportedFeaturesRegistry;
+import org.apache.ignite.internal.util.distributed.DistributedProcess;
+import org.apache.ignite.internal.util.distributed.InitMessage;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteProductVersion;
-import org.apache.ignite.plugin.security.SecurityPermission;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.IgniteNodeValidationResult;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.events.EventType.EVT_NODE_VALIDATION_FAILED;
-import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_BUILD_VER;
-import static 
org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage.IGNITE_INTERNAL_KEY_PREFIX;
-
-/** Rolling upgrade processor. Manages current and target versions of cluster. 
*/
+import static 
org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.ROLLING_UPGRADE_PROC;
+import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_FEATURES;
+import static 
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RU_COMPLETE_VERSION_FINALIZATION;
+import static 
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RU_ENABLE;
+import static 
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RU_PREPARE_VERSION_FINALIZATION;
+import static 
org.apache.ignite.plugin.security.SecurityPermission.ADMIN_ROLLING_UPGRADE;
+
+/** */
 public class RollingUpgradeProcessor extends GridProcessorAdapter implements 
DiscoveryNodeValidationProcessor {
-    /** Key for the distributed property that holds current and target 
versions. */
-    private static final String ROLLING_UPGRADE_VERSIONS_KEY = 
IGNITE_INTERNAL_KEY_PREFIX + "rolling.upgrade.versions";
+    /** */
+    private final IgniteFeatureManager featureMgr;
 
-    /** Metastorage with the write access. */
-    @Nullable private volatile DistributedMetaStorage metastorage;
+    /** */
+    private final ClusterVersionUpgradeEnableProcess enableProc;
+
+    /** */
+    private final ClusterVersionFinalizationProcess finalizeProc;
 
-    /** Last joining node. */
-    private ClusterNode lastJoiningNode;
+    /** */
+    private final Object topGuard = new Object();
 
-    /** Lock for synchronization between tcp-disco-msg-worker thread and 
management operations. */
-    private final Object lock = new Object();
+    /** */
+    private final Set<ClusterNode> joiningNodes = new HashSet<>();
 
     /** */
-    private final CountDownLatch startLatch = new CountDownLatch(1);
+    private volatile boolean isNodeFenceActive;
 
-    /** Pair with current and target versions. {@code null} when rolling 
upgrade is disabled. */
-    @Nullable private volatile IgnitePair<IgniteProductVersion> rollUpVers;
+    /** */
+    private volatile boolean isVerUpgradeEnabled;
 
-    /**
-     * @param ctx Context.
-     */
+    /** */
     public RollingUpgradeProcessor(GridKernalContext ctx) {
+        this(ctx, () -> new IgniteProductFeatures(
+            IgniteVersionUtils.VER,
+            IgniteFeatureSet.buildFrom(SupportedFeaturesRegistry.class))
+        );
+    }
+
+    /** */
+    protected RollingUpgradeProcessor(GridKernalContext ctx, 
Supplier<IgniteProductFeatures> locVerFeaturesProv) {
         super(ctx);
+
+        enableProc = new ClusterVersionUpgradeEnableProcess();
+        finalizeProc = new ClusterVersionFinalizationProcess();
+        featureMgr = new IgniteFeatureManager(ctx, locVerFeaturesProv);
     }
 
-    /** {@inheritDoc} */
-    @Override public void onKernalStart(boolean active) throws 
IgniteCheckedException {
-        startLatch.countDown();
+    /** @return Whether nodes running a higher Ignite version are allowed to 
join the cluster. */
+    public boolean isVersionUpgradeEnabled() {
+        return isVerUpgradeEnabled;
+    }
+
+    /**
+     * Allows nodes running a higher Ignite version to join the cluster.
+     * Until the cluster version is finalized, nodes running a higher version 
operate in
+     * a compatibility mode that emulates the behavior of the current cluster 
version.
+     */
+    public void enableVersionUpgrade() throws IgniteCheckedException {
+        ctx.security().authorize(ADMIN_ROLLING_UPGRADE);
+
+        if (isVerUpgradeEnabled)
+            return;
+
+        enableProc.start().get();
+
+        if (log.isInfoEnabled())
+            log.info("Cluster version Rolling Upgrade was successfully 
enabled");
+    }
+
+    /**
+     * Tries to finalize the cluster version.
+     *
+     * <p>If all cluster nodes are running the same Ignite version, this 
method:</p>
+     * <ol>
+     *     <li>Prevents nodes running a higher Ignite version from joining the 
cluster.</li>
+     *     <li>Activates all {@link IgniteFeature}s supported by the finalized 
cluster version.</li>
+     * </ol>
+     *
+     * <p>If the cluster contains nodes running different Ignite versions, the 
operation fails.</p>
+     */
+    public void finalizeClusterVersion() throws IgniteCheckedException {
+        ctx.security().authorize(ADMIN_ROLLING_UPGRADE);
+
+        if (!isVerUpgradeEnabled)
+            return;
+
+        finalizeProc.start().get();
+
+        if (log.isInfoEnabled())
+            log.info("Cluster version was successfully finalized 
[activeLogicalVer=" + clusterLogicalVersion() + ']');
+    }
+
+    /** */
+    public IgniteFeatureManager features() {
+        return featureMgr;
+    }
+
+    /** */
+    RollingUpgradeState state() {
+        synchronized (topGuard) {
+            return detectRollingUpgradeState();
+        }
     }
 
     /** {@inheritDoc} */
     @Override public void start() throws IgniteCheckedException {
+        ctx.addNodeAttribute(
+            ATTR_IGNITE_FEATURES,
+            U.marshal(ctx.marshallerContext().jdkMarshaller(), 
featureMgr.localVersionFeatures().features()));
+
         ctx.event().addLocalEventListener(
             evt -> {
-                UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
-
-                synchronized (lock) {
-                    if (lastJoiningNode != null && 
lastJoiningNode.id().equals(nodeId))
-                        lastJoiningNode = null;
+                synchronized (topGuard) {
+                    joiningNodes.remove(((DiscoveryEvent)evt).eventNode());
                 }
             },
             EVT_NODE_JOINED,
             EVT_NODE_FAILED,
             EVT_NODE_LEFT,
             EVT_NODE_VALIDATION_FAILED
         );
+    }
+
+    /** {@inheritDoc} */
+    @Override public DiscoveryDataExchangeType discoveryDataType() {
+        return ROLLING_UPGRADE_PROC;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
+        if (ctx.clientNode())
+            return;
+
+        int cmpId = discoveryDataType().ordinal();
+
+        if (!dataBag.commonDataCollectedFor(cmpId))
+            dataBag.addGridCommonData(cmpId, collectRollingUpgradeNodeData());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void 
onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
+        RollingUpgradeNodeData gridData = 
(RollingUpgradeNodeData)data.commonData();
+
+        isVerUpgradeEnabled = gridData.isVersionUpgradeEnabled();
+        isNodeFenceActive = gridData.isNodeFenceActive();
 
-        
ctx.internalSubscriptionProcessor().registerDistributedMetastorageListener(new 
DistributedMetastorageLifecycleListener() {
-            @Override public void onReadyForWrite(DistributedMetaStorage 
metastorage) {
-                RollingUpgradeProcessor.this.metastorage = metastorage;
+        featureMgr.onGridDataReceived(gridData.activeFeatures());
+    }
+
+    /** {@inheritDoc} */
+    @Override public @Nullable IgniteNodeValidationResult 
validateNode(ClusterNode joiningNode) {
+        synchronized (topGuard) {
+            if (isNodeFenceActive) {
+                return new IgniteNodeValidationResult(
+                    joiningNode.id(),
+                    "Node joins are not allowed during cluster version 
finalization [joiningNode=" + joiningNode + ']');
             }
 
-            @Override public void 
onReadyForRead(ReadableDistributedMetaStorage metastorage) {
+            if (isVerUpgradeEnabled) {
+                RollingUpgradeState state = detectRollingUpgradeState();
+
+                if (!state.isCompatible(joiningNode)) {
+                    return new IgniteNodeValidationResult(
+                        joiningNode.id(),
+                        "The joining node is incompatible with the current 
state of the cluster version rolling upgrade being in progress" +
+                            " [rollingUpgradeState=" + state +
+                            ", joiningNodeVer=" + joiningNode.version() +
+                            ", joiningNode=" + joiningNode + ']');
+                }
+
+                IgniteProductFeatures locActiveFeatures = 
featureMgr.activeFeatures();
+
+                IgniteProductFeatures joiningNodeProductFeatures;
+
                 try {
-                    rollUpVers = 
metastorage.read(ROLLING_UPGRADE_VERSIONS_KEY);
+                    joiningNodeProductFeatures = 
extractProductFeatures(joiningNode);
                 }
                 catch (IgniteCheckedException e) {
-                    throw new IgniteException(e);
+                    return new IgniteNodeValidationResult(
+                        joiningNode.id(),
+                        "Failed to resolve joining node product features" +
+                            " [joiningNode=" + joiningNode + ", errMsg=" + 
e.getMessage() + ']');
                 }
 
-                // Keep the current and target version pair in sync with 
metastorage updates, e.g., to handle coordinator changes.
-                metastorage.listen(ROLLING_UPGRADE_VERSIONS_KEY::equals, (key, 
oldVal, newVal) -> {
-                    rollUpVers = (IgnitePair<IgniteProductVersion>)newVal;
-                });
+                if 
(!locActiveFeatures.isUpgradableTo(joiningNodeProductFeatures)) {
+                    return new IgniteNodeValidationResult(
+                        joiningNode.id(),
+                        "Rolling Upgrade is not available between the current 
cluster logical version and the joining node" +
+                            " product version [clusterLogicalVer=" + 
locActiveFeatures.version() +
+                            ", joiningNodeVer=" + joiningNode.version() +
+                            ", joiningNode=" + joiningNode + ']');
+                }
+            }
+            else if (!joiningNode.version().equals(localProductVersion())) {
+                return new IgniteNodeValidationResult(
+                    joiningNode.id(),
+                    "The joining node version differs from the version of the 
cluster" +
+                        " [clusterVer=" + localProductVersion() +
+                        ", joiningNodeVer=" + joiningNode.version() +
+                        ", joiningNode=" + joiningNode + ']');
             }
-        });
-    }
 
-    /** {@inheritDoc} The joining node is stored to verify later whether it 
successfully connected to the ring or failed to join. */
-    @Override public @Nullable IgniteNodeValidationResult 
validateNode(ClusterNode node) {
-        synchronized (lock) {
-            lastJoiningNode = node;
-        }
+            joiningNodes.add(joiningNode);
 
-        ClusterNode locNode = ctx.discovery().localNode();
+            return null;
+        }
+    }
 
-        String locBuildVer = locNode.attribute(ATTR_BUILD_VER);
-        String rmtBuildVer = node.attribute(ATTR_BUILD_VER);
+    /** {@inheritDoc} */
+    @Override public void onDisconnected(IgniteFuture<?> reconnectFut) {
+        enableProc.onDisconnected();
+        finalizeProc.onDisconnected();
+    }
 
-        IgniteProductVersion rmtVer = 
IgniteProductVersion.fromString(rmtBuildVer);
+    /** */
+    private IgniteProductVersion localProductVersion() {
+        return featureMgr.localVersionFeatures().version();
+    }
 
-        IgnitePair<IgniteProductVersion> pair = rollUpVers;
+    /** */
+    private IgniteProductVersion clusterLogicalVersion() {
+        return featureMgr.activeFeatures().version();
+    }
 
-        IgniteProductVersion curVer = pair == null ? 
IgniteProductVersion.fromString(locBuildVer) : pair.get1();
-        IgniteProductVersion targetVer = pair == null ? null : pair.get2();
+    /** */
+    private RollingUpgradeState detectRollingUpgradeState() {
+        SortedSet<IgniteProductVersion> clusterVers = 
distinctClusterProductVersions();
 
-        if (Objects.equals(rmtVer, curVer) || Objects.equals(rmtVer, 
targetVer))
-            return null;
+        assert !clusterVers.isEmpty() && clusterVers.size() <= 2;
 
-        String errMsg = "Remote node rejected due to incompatible version for 
cluster join.\n"
-            + "Remote node info:\n"
-            + "  - Version     : " + rmtBuildVer + "\n"
-            + "  - Addresses   : " + U.addressesAsString(node) + "\n"
-            + "  - Node ID     : " + node.id() + "\n"
-            + "Local node info:\n"
-            + "  - Version     : " + locBuildVer + "\n"
-            + "  - Addresses   : " + U.addressesAsString(locNode) + "\n"
-            + "  - Node ID     : " + locNode.id() + "\n"
-            + "Allowed versions for joining: " + curVer + (targetVer == null ? 
"" : ", " + targetVer);
+        IgniteProductVersion minClusterVer = clusterVers.first();
+        IgniteProductVersion maxClusterVer = clusterVers.last();
 
-        LT.warn(log, errMsg);
+        if (!minClusterVer.equals(maxClusterVer))
+            return new RollingUpgradeState(minClusterVer, maxClusterVer, 
false);
 
-        if (log.isDebugEnabled())
-            log.debug(errMsg);
+        IgniteProductVersion logicalVer = clusterLogicalVersion();
 
-        return new IgniteNodeValidationResult(node.id(), errMsg);
+        return new RollingUpgradeState(
+            logicalVer,
+            logicalVer.equals(maxClusterVer) ? null : maxClusterVer,
+            true);
     }
 
-    /**
-     * Enables rolling upgrade with specified target version.
-     * This method can only be called on coordinator node with {@link 
TcpDiscoverySpi}.
-     *
-     * @param target Target version.
-     * @param force If {@code true}, skips target version compatibility checks 
and forcibly enables rolling upgrade.
-     *              This flag does not override an already active upgrade 
configuration.
-     * @throws IgniteCheckedException If:
-     *     <ul>
-     *         <li>The current and target versions are incompatible;</li>
-     *         <li>The local node is not a coordinator;</li>
-     *         <li>The discovery SPI is not {@link TcpDiscoverySpi};</li>
-     *         <li>The distributed metastorage is not ready;</li>
-     *     </ul>
-     */
-    public void enable(IgniteProductVersion target, boolean force) throws 
IgniteCheckedException {
-        ctx.security().authorize(SecurityPermission.ADMIN_ROLLING_UPGRADE);
+    /** */
+    private SortedSet<IgniteProductVersion> distinctClusterProductVersions() {
+        assert Thread.holdsLock(topGuard);
 
-        if (startLatch.getCount() > 0)
-            throw new IgniteCheckedException("Cannot enable rolling upgrade: 
processor has not been started yet");
+        TreeSet<IgniteProductVersion> res = new TreeSet<>();
 
-        if (!U.isLocalNodeCoordinator(ctx.discovery()))
-            throw new IgniteCheckedException("Rolling upgrade can be enabled 
only on coordinator node");
+        for (ClusterNode node : ctx.discovery().discoverySpiRemoteNodes())
+            res.add(node.version());
 
-        if (metastorage == null)
-            throw new IgniteCheckedException("Metastorage is not ready yet. 
Try again later");
+        res.add(ctx.discovery().localNode().version());
 
-        if (!(ctx.config().getDiscoverySpi() instanceof TcpDiscoverySpi))
-            throw new IgniteCheckedException("Rolling upgrade is supported 
only with TCP discovery SPI");
+        for (ClusterNode node : joiningNodes)
+            res.add(node.version());
 
-        String curBuildVer = 
ctx.discovery().localNode().attribute(ATTR_BUILD_VER);
-        IgniteProductVersion curVer = 
IgniteProductVersion.fromString(curBuildVer);
+        return res;
+    }
 
-        if (!checkVersionsForEnabling(curVer, target, force))
-            return;
+    /** */
+    private RollingUpgradeNodeData collectRollingUpgradeNodeData() {
+        return new RollingUpgradeNodeData(isVerUpgradeEnabled, 
isNodeFenceActive, featureMgr.activeFeatures());
+    }
 
-        IgnitePair<IgniteProductVersion> newPair = F.pair(curVer, target);
+    /** */
+    private IgniteProductFeatures extractProductFeatures(ClusterNode node) 
throws IgniteCheckedException {
+        byte[] attrVal = node.attribute(ATTR_IGNITE_FEATURES);
 
-        if (!metastorage.compareAndSet(ROLLING_UPGRADE_VERSIONS_KEY, null, 
newPair)) {
-            IgnitePair<IgniteProductVersion> oldVerPair = 
metastorage.read(ROLLING_UPGRADE_VERSIONS_KEY);
+        IgniteFeatureSet features = 
U.unmarshal(ctx.marshallerContext().jdkMarshaller(), attrVal, 
U.resolveClassLoader(ctx.config()));
 
-            if (newPair.equals(oldVerPair))
-                return;
+        return new IgniteProductFeatures(node.version(), features);
+    }
 
-            if (oldVerPair == null)
-                throw new IgniteCheckedException("Could not enable rolling 
upgrade. Try again");
+    /** */
+    private static Throwable firstError(Map<UUID, Throwable> errors) {
+        return F.isEmpty(errors) ? null : F.firstValue(errors);
+    }
 
-            throw new IgniteCheckedException("Rolling upgrade is already 
enabled with a different current and target version: " +
-                oldVerPair.get1() + " , " + oldVerPair.get2());
+    /** */
+    private class ClusterVersionUpgradeEnableProcess extends AbstractProcess {
+        /** */
+        private final DistributedProcess<Message, Message> distributedProc;
+
+        /** */
+        public ClusterVersionUpgradeEnableProcess() {
+            distributedProc = new DistributedProcess<>(
+                ctx,
+                RU_ENABLE,
+                this::execute,
+                this::finish,
+                (reqId, req) -> new InitMessage<>(reqId, RU_ENABLE, req, 
true));
         }
 
-        rollUpVers = newPair;
+        /** {@inheritDoc} */
+        @Override protected UUID startInternal() {
+            UUID reqId = UUID.randomUUID();
 
-        if (log.isInfoEnabled())
-            log.info("Rolling upgrade enabled [current=" + curVer + ", 
target=" + target + ']');
+            distributedProc.start(reqId, null);
+
+            if (log.isInfoEnabled())
+                log.info("Cluster version upgrade enable process has been 
started [procId=" + reqId + "]");
+
+            return reqId;
+        }
+
+        /** */
+        private IgniteInternalFuture<Message> execute(UUID ignored, Message 
req) {
+            isVerUpgradeEnabled = true;
+
+            return new GridFinishedFuture<>();
+        }
+
+        /** */
+        private void finish(UUID reqId, Map<UUID, Message> responses, 
Map<UUID, Throwable> errors) {
+            finishProcess(reqId, firstError(errors));
+        }
     }
 
-    /**
-     * Disables rolling upgrade.
-     * This method can only be called on coordinator node.
-     *
-     * <p>May be blocked while a node with a different version is still 
joining or during metastorage operations.</p>
-     *
-     * @throws IgniteCheckedException If cluster has two or more nodes with 
different versions or if node is not coordinator
-     * or metastorage is not ready.
-     */
-    public void disable() throws IgniteCheckedException {
-        ctx.security().authorize(SecurityPermission.ADMIN_ROLLING_UPGRADE);
+    /** */
+    private class ClusterVersionFinalizationProcess extends AbstractProcess {
+        /** */
+        private final DistributedProcess<Message, Message> preparePhase;
+
+        /** */
+        private final DistributedProcess<Message, Message> completePhase;
+
+        /** */
+        @Nullable private volatile UUID activeProcId;
+
+        /** */
+        public ClusterVersionFinalizationProcess() {
+            preparePhase = new DistributedProcess<>(
+                ctx,
+                RU_PREPARE_VERSION_FINALIZATION,
+                this::executePreparePhase,
+                this::finishPreparePhase,
+                (reqId, req) -> new InitMessage<>(reqId, 
RU_PREPARE_VERSION_FINALIZATION, req, true));
+
+            completePhase = new DistributedProcess<>(
+                ctx,
+                RU_COMPLETE_VERSION_FINALIZATION,
+                this::executeCompletePhase,
+                this::finishCompletePhase,
+                (reqId, req) -> new InitMessage<>(reqId, 
RU_COMPLETE_VERSION_FINALIZATION, req, true));
+        }
 
-        if (!U.isLocalNodeCoordinator(ctx.discovery()))
-            throw new IgniteCheckedException("Rolling upgrade can be disabled 
only on coordinator node");
+        /** {@inheritDoc} */
+        @Override protected UUID startInternal() {
+            UUID reqId = UUID.randomUUID();
 
-        if (metastorage == null)
-            throw new IgniteCheckedException("Meta storage is not ready. Try 
again");
+            preparePhase.start(reqId, null);
 
-        if (rollUpVers == null)
-            return;
+            if (log.isInfoEnabled())
+                log.info("Cluster version finalization process has been 
started [procId=" + reqId + ']');
+
+            return reqId;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void finishProcess(UUID reqId, @Nullable Throwable 
err) {
+            if (err != null)
+                U.error(log, "Cluster version finalization process failed 
[procId=" + reqId + ']', err);
 
-        IgnitePair<IgniteProductVersion> minMaxVerPair;
+            // We rely on the guarantee that {@code activeProcId} is only 
accessed from the Discovery thread, just like
+            // the current method. Therefore, synchronization is not required.
+            if (reqId.equals(activeProcId))
+                activeProcId = null;
 
-        synchronized (lock) {
-            minMaxVerPair = resolveMinMaxNodeVersions();
+            super.finishProcess(reqId, err);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void onDisconnected() {
+            activeProcId = null;
 
-            if (!minMaxVerPair.get1().equals(minMaxVerPair.get2()))
-                throw new IgniteCheckedException("Can't disable rolling 
upgrade with different versions in cluster: "
-                    + minMaxVerPair.get1() + ", " + minMaxVerPair.get2());
+            super.onDisconnected();
+        }
 
-            if (lastJoiningNode != null) {
-                IgniteProductVersion lastJoiningNodeVer = 
IgniteProductVersion.fromString(lastJoiningNode.attribute(ATTR_BUILD_VER));
+        /** */
+        private IgniteInternalFuture<Message> executePreparePhase(UUID reqId, 
Message req) {
+            if (activeProcId != null) {
+                U.error(log, "Failed to handle cluster version finalization 
request. Another process is " +
+                    "already in progress [curProcId=" + reqId + ", 
activeProcId=" + activeProcId + ']');
 
-                if (!minMaxVerPair.get1().equals(lastJoiningNodeVer))
-                    throw new IgniteCheckedException("Can't disable rolling 
upgrade with different versions in cluster: "
-                        + minMaxVerPair.get1() + ", " + lastJoiningNodeVer);
+                return new GridFinishedFuture<>(new IgniteCheckedException(
+                    "Cluster version finalization process is already in 
progress"));
             }
 
-            rollUpVers = null;
+            activeProcId = reqId;
+
+            synchronized (topGuard) {
+                Set<IgniteProductVersion> distinctNodeVersions = 
distinctClusterProductVersions();
+
+                if (distinctNodeVersions.size() > 1) {
+                    return new GridFinishedFuture<>(new IgniteCheckedException(
+                        "Cluster version finalization failed. The topology 
contains nodes running multiple different" +
+                            " versions [distinctNodeVersions=" + 
distinctNodeVersions + "]"
+                    ));
+                }
+
+                isNodeFenceActive = true;
+
+                return new GridFinishedFuture<>();
+            }
         }
 
-        metastorage.remove(ROLLING_UPGRADE_VERSIONS_KEY);
+        /** */
+        private void finishPreparePhase(UUID reqId, Map<UUID, Message> 
responses, Map<UUID, Throwable> errors) {
+            if (!F.isEmpty(errors)) {
+                if (reqId.equals(activeProcId))
+                    isNodeFenceActive = false;
 
-        if (log.isInfoEnabled())
-            log.info("Rolling upgrade disabled. Current version of nodes in 
cluster: " + minMaxVerPair.get1());
+                finishProcess(reqId, firstError(errors));
+            }
+            else if (U.isLocalNodeCoordinator(ctx.discovery()))
+                completePhase.start(reqId, null);

Review Comment:
   There are two possible scenarios:
   
   1. The coordinator fails before sending the FullMessage.
   In this case, a new coordinator is "elected", and all nodes resend their 
SingleMessages to it. As a result, the FullMessage will eventually be 
propagated across the cluster.
   2. The coordinator sends the FullMessage and fails afterward.
   The message should not be processed until it has been validated by the newly 
"elected" coordinator.
   
   I do not see a reason to distrust the Distributed Process implementation. As 
you mentioned, a number of Ignite components rely on Distributed Process in the 
same manner. If a problem does exist in some scenarios, it should be fixed in 
the Distributed Process implementation itself rather than in the Rolling 
Upgrade (RU) code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to