chesnokoff commented on code in PR #13195:
URL: https://github.com/apache/ignite/pull/13195#discussion_r3341635111


##########
modules/core/src/main/java/org/apache/ignite/internal/processors/rollingupgrade/RollingUpgradeProcessor.java:
##########
@@ -17,334 +17,480 @@
 
 package org.apache.ignite.internal.processors.rollingupgrade;
 
-import java.util.Objects;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
+import java.util.function.Supplier;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteVersionUtils;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
-import 
org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
-import 
org.apache.ignite.internal.processors.metastorage.DistributedMetastorageLifecycleListener;
-import 
org.apache.ignite.internal.processors.metastorage.ReadableDistributedMetaStorage;
 import 
org.apache.ignite.internal.processors.nodevalidation.DiscoveryNodeValidationProcessor;
-import org.apache.ignite.internal.util.lang.IgnitePair;
+import 
org.apache.ignite.internal.processors.rollingupgrade.feature.IgniteFeatureManager;
+import 
org.apache.ignite.internal.processors.rollingupgrade.feature.IgniteFeatureSet;
+import 
org.apache.ignite.internal.processors.rollingupgrade.feature.IgniteProductFeatures;
+import 
org.apache.ignite.internal.processors.rollingupgrade.feature.IgniteReleaseFeatures;
+import org.apache.ignite.internal.util.distributed.DistributedProcess;
+import org.apache.ignite.internal.util.distributed.InitMessage;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteProductVersion;
-import org.apache.ignite.plugin.security.SecurityPermission;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.IgniteNodeValidationResult;
-import org.apache.ignite.spi.discovery.DiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNodesRing;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
-import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_BUILD_VER;
-import static 
org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage.IGNITE_INTERNAL_KEY_PREFIX;
-
-/** Rolling upgrade processor. Manages current and target versions of cluster. 
*/
+import static org.apache.ignite.events.EventType.EVT_NODE_VALIDATION_FAILED;
+import static 
org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.ROLLING_UPGRADE_PROC;
+import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_FEATURES;
+import static 
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RU_COMPLETE_VERSION_FINALIZATION;
+import static 
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RU_ENABLE;
+import static 
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RU_PREPARE_VERSION_FINALIZATION;
+import static 
org.apache.ignite.plugin.security.SecurityPermission.ADMIN_ROLLING_UPGRADE;
+
+/** */
 public class RollingUpgradeProcessor extends GridProcessorAdapter implements 
DiscoveryNodeValidationProcessor {
-    /** Key for the distributed property that holds current and target 
versions. */
-    private static final String ROLLING_UPGRADE_VERSIONS_KEY = 
IGNITE_INTERNAL_KEY_PREFIX + "rolling.upgrade.versions";
-
-    /** Metastorage with the write access. */
-    @Nullable private volatile DistributedMetaStorage metastorage;
+    /** */
+    private final IgniteFeatureManager featureMgr;
 
-    /** TCP discovery nodes ring. */
-    private TcpDiscoveryNodesRing ring;
+    /** */
+    private final ClusterVersionUpgradeEnableProcess enableProc;
 
-    /** Last joining node. */
-    private ClusterNode lastJoiningNode;
+    /** */
+    private final ClusterVersionFinalizationProcess finalizeProc;
 
-    /** Last joining node timestamp. */
-    private long lastJoiningNodeTimestamp;
+    /** */
+    private final Object topGuard = new Object();
 
-    /** Lock for synchronization between tcp-disco-msg-worker thread and 
management operations. */
-    private final Object lock = new Object();
+    /** */
+    private final Set<ClusterNode> joiningNodes = new HashSet<>();
 
     /** */
-    private final CountDownLatch startLatch = new CountDownLatch(1);
+    private volatile boolean isNodeFenceActive;
 
-    /** Pair with current and target versions. {@code null} when rolling 
upgrade is disabled. */
-    @Nullable private volatile IgnitePair<IgniteProductVersion> rollUpVers;
+    /** */
+    private volatile boolean isVersionUpgradeEnabled;
 
-    /**
-     * @param ctx Context.
-     */
+    /** */
     public RollingUpgradeProcessor(GridKernalContext ctx) {
+        this(ctx, () -> new IgniteProductFeatures(
+            IgniteVersionUtils.VER,
+            IgniteFeatureSet.buildFrom(IgniteReleaseFeatures.class))
+        );
+    }
+
+    /** */
+    protected RollingUpgradeProcessor(GridKernalContext ctx, 
Supplier<IgniteProductFeatures> locVerFeaturesProv) {
         super(ctx);
+
+        enableProc = new ClusterVersionUpgradeEnableProcess();
+        finalizeProc = new ClusterVersionFinalizationProcess();
+        featureMgr = new IgniteFeatureManager(locVerFeaturesProv);
     }
 
-    /** {@inheritDoc} */
-    @Override public void onKernalStart(boolean active) throws 
IgniteCheckedException {
-        DiscoverySpi spi = ctx.config().getDiscoverySpi();
+    /** */
+    public boolean isVersionUpgradeEnabled() {
+        return isVersionUpgradeEnabled;
+    }
+
+    /** */
+    public void enableVersionUpgrade() throws IgniteCheckedException {
+        ctx.security().authorize(ADMIN_ROLLING_UPGRADE);
+
+        if (isVersionUpgradeEnabled)
+            return;
+
+        enableProc.start().get();
+
+        if (log.isInfoEnabled())
+            log.info("Cluster version Rolling Upgrade was enabled");
+    }
+
+    /** */
+    public void finalizeClusterVersion() throws IgniteCheckedException {
+        ctx.security().authorize(ADMIN_ROLLING_UPGRADE);
+
+        if (!isVersionUpgradeEnabled)
+            return;
+
+        finalizeProc.start().get();
 
-        if (spi instanceof TcpDiscoverySpi)
-            ring = ((TcpDiscoverySpi)spi).discoveryRing();
+        if (log.isInfoEnabled())
+            log.info("Cluster version was successfully finalized 
[activeLogicalVer=" + clusterLogicalVersion() + ']');
+    }
 
-        startLatch.countDown();
+    /** */
+    public IgniteFeatureManager features() {
+        return featureMgr;
     }
 
     /** {@inheritDoc} */
     @Override public void start() throws IgniteCheckedException {
-        ctx.event().addLocalEventListener(new GridLocalEventListener() {
-            @Override public void onEvent(Event evt) {
-                UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
+        ctx.addNodeAttribute(ATTR_IGNITE_FEATURES, 
featureMgr.localVersionFeatures().features());
 
-                synchronized (lock) {
-                    if (lastJoiningNode != null && 
lastJoiningNode.id().equals(nodeId))
-                        lastJoiningNode = null;
+        ctx.event().addLocalEventListener(
+            evt -> {
+                synchronized (topGuard) {
+                    joiningNodes.remove(((DiscoveryEvent)evt).eventNode());
                 }
-            }
-        }, EVT_NODE_JOINED, EVT_NODE_FAILED, EVT_NODE_LEFT);
+            },
+            EVT_NODE_JOINED,
+            EVT_NODE_FAILED,
+            EVT_NODE_LEFT,
+            EVT_NODE_VALIDATION_FAILED
+        );
+    }
+
+    /** {@inheritDoc} */
+    @Override public DiscoveryDataExchangeType discoveryDataType() {
+        return ROLLING_UPGRADE_PROC;
+    }
 
-        
ctx.internalSubscriptionProcessor().registerDistributedMetastorageListener(new 
DistributedMetastorageLifecycleListener() {
-            @Override public void onReadyForWrite(DistributedMetaStorage 
metastorage) {
-                RollingUpgradeProcessor.this.metastorage = metastorage;
+    /** {@inheritDoc} */
+    @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
+        if (ctx.clientNode())
+            return;
+
+        int cmpId = discoveryDataType().ordinal();
+
+        if (!dataBag.commonDataCollectedFor(cmpId))
+            dataBag.addGridCommonData(cmpId, collectRollingUpgradeNodeData());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void 
onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
+        RollingUpgradeNodeData gridData = 
(RollingUpgradeNodeData)data.commonData();
+
+        isVersionUpgradeEnabled = gridData.isVersionUpgradeEnabled();
+        isNodeFenceActive = gridData.isNodeFenceActive();
+
+        featureMgr.onGridDataReceived(gridData.activeFeatures());
+    }
+
+    /** {@inheritDoc} */
+    @Override public @Nullable IgniteNodeValidationResult 
validateNode(ClusterNode joiningNode) {
+        synchronized (topGuard) {
+            if (isNodeFenceActive) {
+                return new IgniteNodeValidationResult(
+                    joiningNode.id(),
+                    "Node joins are not allowed during cluster version 
finalization [joiningNode=" + joiningNode + ']');
             }
 
-            @Override public void 
onReadyForRead(ReadableDistributedMetaStorage metastorage) {
-                try {
-                    rollUpVers = 
metastorage.read(ROLLING_UPGRADE_VERSIONS_KEY);
-                }
-                catch (IgniteCheckedException e) {
-                    throw new IgniteException(e);
+            if (isVersionUpgradeEnabled) {
+                RollingUpgradeState state = detectRollingUpgradeState();
+
+                if (!state.isCompatible(joiningNode)) {
+                    return new IgniteNodeValidationResult(
+                        joiningNode.id(),
+                        "The joining node is incompatible with the current 
state of the cluster version rolling upgrade being in progress" +
+                            " [rollingUpgradeState=" + state +
+                            ", joiningNodeVer=" + joiningNode.version() +
+                            ", joiningNode=" + joiningNode + ']');
                 }
 
-                // Keep the current and target version pair in sync with 
metastorage updates, e.g., to handle coordinator changes.
-                metastorage.listen(ROLLING_UPGRADE_VERSIONS_KEY::equals, (key, 
oldVal, newVal) -> {
-                    rollUpVers = (IgnitePair<IgniteProductVersion>)newVal;
-                });
+                IgniteProductFeatures locActiveFeatures = 
featureMgr.activeFeatures();
+
+                if 
(!locActiveFeatures.isUpgradableTo(extractProductFeatures(joiningNode))) {
+                    return new IgniteNodeValidationResult(
+                        joiningNode.id(),
+                        "Rolling Upgrade is not available between the current 
cluster logical version and the joining node" +
+                            " product version [clusterLogicalVer=" + 
locActiveFeatures.version() +
+                            ", joiningNodeVer=" + joiningNode.version() +
+                            ", joiningNode=" + joiningNode + ']');
+                }
+            }
+            else if (!joiningNode.version().equals(localProductVersion())) {
+                return new IgniteNodeValidationResult(
+                    joiningNode.id(),
+                    "The joining node version differs from the version of the 
cluster" +
+                        " [clusterVer=" + localProductVersion() +
+                        ", joiningNodeVer=" + joiningNode.version() +
+                        ", joiningNode=" + joiningNode + ']');
             }
-        });
-    }
 
-    /** {@inheritDoc} The joining node is stored to verify later whether it 
successfully connected to the ring or failed to join. */
-    @Override public @Nullable IgniteNodeValidationResult 
validateNode(ClusterNode node) {
-        synchronized (lock) {
-            lastJoiningNode = node;
+            joiningNodes.add(joiningNode);

Review Comment:
   The problem is that a joining node may abandon its join attempt without any discovery event ever being produced for it.
   
   See `org.apache.ignite.spi.discovery.tcp.ServerImpl.RingMessageWorker#processJoinRequestMessage`, around lines 4522-4595:
   ```java
   err = spi.getSpiContext().validateNode(node);
   ...
   if (!Objects.equals(locMarsh, rmtMarsh)) {
       ...
       // Send message "Local node's marshaller differs from remote node's marshaller"
       trySendMessageDirectly(node, new 
TcpDiscoveryCheckFailedMessage(locNodeId, sndMsg));
       return;
   }
   ```
   On this path the node has already passed component validation, but it is then rejected by the TCP discovery checks and never joins the topology. As a result, the coordinator receives no `NODE_JOINED`/`NODE_LEFT`/`NODE_FAILED` event for this node, so it cannot remove the node from `joiningNodes` by listening to discovery events alone.
   
   This is why the previous implementation tracked `lastJoiningNode` and `lastJoiningNodeTimestamp`: they were needed to eventually forget such abandoned join attempts.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to