sanpwc commented on code in PR #4056:
URL: https://github.com/apache/ignite-3/pull/4056#discussion_r1678058683


##########
modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java:
##########
@@ -40,7 +40,7 @@ public class Assignments implements Serializable {
     private static final long serialVersionUID = -59553172012153869L;
 
     /** Empty assignments. */
-    public static final Assignments EMPTY = new Assignments(Collections.emptySet(), false);
+    public static final Assignments EMPTY = new Assignments(Collections.emptySet(), false, 0);

Review Comment:
   Is 0 a valid catalog version? It shouldn't be valid if we want to use it as 
a default value.
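
   One way to address this concern, sketched under stated assumptions: reserve 
a value outside the valid range as the "no version" marker, so that 0 stays 
usable as a real version. `AssignmentsSketch`, `NO_CATALOG_VERSION`, and the 
premise that valid catalog versions are non-negative are all hypothetical and 
are not taken from the PR.

```java
import java.util.Collections;
import java.util.Set;

/** Hypothetical, simplified stand-in for Assignments; not the actual Ignite class. */
final class AssignmentsSketch {
    /** Assumed sentinel: this sketch treats every non-negative value as a potentially valid version. */
    static final int NO_CATALOG_VERSION = -1;

    /** Empty assignments carry the sentinel instead of a value that could be a real version. */
    static final AssignmentsSketch EMPTY =
            new AssignmentsSketch(Collections.emptySet(), false, NO_CATALOG_VERSION);

    private final Set<String> nodes;
    private final boolean force;
    private final int catalogVersion;

    AssignmentsSketch(Set<String> nodes, boolean force, int catalogVersion) {
        this.nodes = Set.copyOf(nodes);
        this.force = force;
        this.catalogVersion = catalogVersion;
    }

    Set<String> nodes() {
        return nodes;
    }

    boolean force() {
        return force;
    }

    /** Returns true only when a real version was supplied. */
    boolean hasCatalogVersion() {
        return catalogVersion != NO_CATALOG_VERSION;
    }

    /** Fails fast instead of silently handing out the default. */
    int catalogVersion() {
        if (!hasCatalogVersion()) {
            throw new IllegalStateException("Catalog version is not set");
        }
        return catalogVersion;
    }
}
```

   With this shape, reading the version from `EMPTY` fails loudly rather than 
being mistaken for version 0.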



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java:
##########
@@ -271,7 +282,13 @@ private void countDownPartitionsFromZone(Set<Assignment> stable) {
 
             Condition condition = value(tablesCounterKey(zoneId, partId)).eq(counterEntry.value());
 
-            byte[] stableArray = Assignments.toBytes(stable);
+            Entry catalogVersionEntry = metaStorageMgr.get(catalogVersionKey(zoneId, partId)).get();
+
+            assert catalogVersionEntry.value() != null;
+
+            int catalogVersion = bytesToInt(catalogVersionEntry.value());
+
+            byte[] stableArray = Assignments.toBytes(catalogVersion, stable);

Review Comment:
   I agree with Misha: catalogVersion as an initial parameter is confusing.



##########
modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java:
##########
@@ -53,32 +53,38 @@ public class Assignments implements Serializable {
      */
     private final boolean force;
 
+    /**
+     * Version of Catalog that matches current assignments.

Review Comment:
   This definition is rather ambiguous. Could you please expand it a bit, 
explaining that it's the catalog version at the time the assignments were 
calculated?



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -828,7 +831,8 @@ private CompletableFuture<Void> handleChangePendingAssignmentEvent(
             return handleChangePendingAssignmentEvent(
                     zonePartitionId,
                     stableAssignments,
-                    pendingAssignments
+                    pendingAssignments,
+                    catalogVersion

Review Comment:
   Why should we pass catalogVersion explicitly if we already have one in the 
pendingAssignments parameter?



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -663,7 +663,8 @@ public CompletableFuture<Void> onUpdate(WatchEvent evt) {
                                     dataNodes,
                                     zoneDescriptor.replicas(),
                                     replicaGrpId,
-                                    evt
+                                    evt,
+                                    catalogVersion

Review Comment:
   It's error-prone to pass catalogVersion like that, first of all because 
neither the naming nor the semantics enforce its boundaries. As mentioned 
above, I'd consider adding the catalog version to zoneDescriptor and passing 
it as is.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java:
##########
@@ -429,14 +439,22 @@ static void doStableKeySwitch(
             Set<Assignment> retrievedStable = readAssignments(stableEntry).nodes();
             Set<Assignment> retrievedSwitchReduce = readAssignments(switchReduceEntry).nodes();
             Set<Assignment> retrievedSwitchAppend = readAssignments(switchAppendEntry).nodes();
-            Set<Assignment> retrievedPending = readAssignments(pendingEntry).nodes();
+
+            Assignments pendingAssignments = readAssignments(pendingEntry);
+
+            Set<Assignment> retrievedPending = pendingAssignments.nodes();
             long stableChangeTriggerValue = stableChangeTriggerEntry.value() == null
                     ? 0L : bytesToLongKeepingOrder(stableChangeTriggerEntry.value());
 
             if (!retrievedPending.equals(stableFromRaft)) {
                 return;
             }
 
+            int catalogVersion = pendingAssignments.catalogVersion();

Review Comment:
   Could you please add a comment explaining why it's safe to use the catalog 
version from the pending assignments? We obviously have a case where the node 
is stale, so it will read the new pending assignments rather than the ones 
it's currently processing within onChangePeersConfiguartionApplied.



##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -565,7 +565,7 @@ private CompletableFuture<List<Assignments>> getOrCreateAssignments(
                             dataNodes,
                             zoneDescriptor.partitions(),

Review Comment:
   I'm curious whether it would be more robust not to pass catalogVersion as 
an additional method param but to add it to Catalog<>Descriptor. I believe 
that should be easy to implement and would eliminate the possibility of 
passing non-matching catalog descriptors and versions.
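
   A minimal sketch of that suggestion, assuming a simplified descriptor: the 
version travels inside the descriptor, so callers cannot pair a descriptor 
with a version from a different catalog snapshot. `ZoneDescriptorSketch`, 
`AssignmentsRequestSketch`, and their members are hypothetical names, not the 
Ignite catalog API.

```java
/** Hypothetical, simplified descriptor; fields are assumptions, not the Ignite catalog API. */
final class ZoneDescriptorSketch {
    private final int partitions;
    private final int replicas;
    /** Version of the catalog snapshot this descriptor was read from. */
    private final int catalogVersion;

    ZoneDescriptorSketch(int partitions, int replicas, int catalogVersion) {
        this.partitions = partitions;
        this.replicas = replicas;
        this.catalogVersion = catalogVersion;
    }

    int partitions() {
        return partitions;
    }

    int replicas() {
        return replicas;
    }

    int catalogVersion() {
        return catalogVersion;
    }
}

/** Hypothetical caller: it accepts only the descriptor, with no separate version argument. */
final class AssignmentsRequestSketch {
    static String describe(ZoneDescriptorSketch zone) {
        // The version is read from the same object as partitions and replicas,
        // so the three values always belong to one catalog snapshot.
        return "partitions=" + zone.partitions()
                + ", replicas=" + zone.replicas()
                + ", catalogVersion=" + zone.catalogVersion();
    }
}
```

   The design choice here is to shrink the method signature rather than add a 
runtime check: a mismatch becomes impossible to express.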



##########
modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java:
##########
@@ -53,32 +53,38 @@ public class Assignments implements Serializable {
      */
     private final boolean force;
 
+    /**
+     * Version of Catalog that matches current assignments.
+     */
+    private final int catalogVersion;

Review Comment:
   I don't like how it looks. Formally, Assignments is part of the affinity 
module, which should not depend on catalog. Let's revisit it one more time and 
reconsider the idea of using a timestamp here.



##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java:
##########
@@ -429,14 +439,22 @@ static void doStableKeySwitch(
             Set<Assignment> retrievedStable = readAssignments(stableEntry).nodes();
             Set<Assignment> retrievedSwitchReduce = readAssignments(switchReduceEntry).nodes();
             Set<Assignment> retrievedSwitchAppend = readAssignments(switchAppendEntry).nodes();
-            Set<Assignment> retrievedPending = readAssignments(pendingEntry).nodes();
+
+            Assignments pendingAssignments = readAssignments(pendingEntry);
+
+            Set<Assignment> retrievedPending = pendingAssignments.nodes();
             long stableChangeTriggerValue = stableChangeTriggerEntry.value() == null
                     ? 0L : bytesToLongKeepingOrder(stableChangeTriggerEntry.value());
 
             if (!retrievedPending.equals(stableFromRaft)) {
                 return;
             }
 
+            int catalogVersion = pendingAssignments.catalogVersion();
+
+            Set<Assignment> calculatedAssignments =

Review Comment:
   Why not use the same approach in `ZoneRebalanceRaftGroupEventsListener`?


