SammyVimes commented on code in PR #1016:
URL: https://github.com/apache/ignite-3/pull/1016#discussion_r957411508


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java:
##########
@@ -252,92 +294,189 @@ private void scheduleChangePeers(List<PeerId> peers, long term) {
      */
     private void doOnNewPeersConfigurationApplied(List<PeerId> peers) {
         try {
-            Map<ByteArray, Entry> keys = metaStorageMgr.getAll(
+            ByteArray pendingPartAssignmentsKey = pendingPartAssignmentsKey(partId);
+            ByteArray stablePartAssignmentsKey = stablePartAssignmentsKey(partId);
+            ByteArray plannedPartAssignmentsKey = plannedPartAssignmentsKey(partId);
+            ByteArray switchReduceKey = switchReduceKey(partId);
+            ByteArray switchAppendKey = switchAppendKey(partId);
+
+            Map<ByteArray, Entry> values = metaStorageMgr.getAll(
                     Set.of(
-                            plannedPartAssignmentsKey(partId),
-                            pendingPartAssignmentsKey(partId),
-                            stablePartAssignmentsKey(partId))).get();
+                            plannedPartAssignmentsKey,
+                            pendingPartAssignmentsKey,
+                            stablePartAssignmentsKey,
+                            switchReduceKey,
+                            switchAppendKey
+                    )
+            ).get();
 
-            Entry plannedEntry = keys.get(plannedPartAssignmentsKey(partId));
+            Entry stableEntry = values.get(stablePartAssignmentsKey);
+            Entry pendingEntry = values.get(pendingPartAssignmentsKey);
+            Entry plannedEntry = values.get(plannedPartAssignmentsKey);
+            Entry switchReduceEntry = values.get(switchReduceKey);
+            Entry switchAppendEntry = values.get(switchAppendKey);
 
-            List<ClusterNode> appliedPeers = resolveClusterNodes(peers,
-                    keys.get(pendingPartAssignmentsKey(partId)).value(), keys.get(stablePartAssignmentsKey(partId)).value());
+            List<ClusterNode> calculatedAssignments = calculateAssignmentsFn.apply(tblConfiguration, partNum);
 
-            tblConfiguration.change(ch -> {
-                List<List<ClusterNode>> assignments =
-                        (List<List<ClusterNode>>) ByteUtils.fromBytes(((ExtendedTableChange) ch).assignments());
-                assignments.set(partNum, appliedPeers);
-                ((ExtendedTableChange) ch).changeAssignments(ByteUtils.toBytes(assignments));
-            }).get();
-
-            if (plannedEntry.value() != null) {
-                if (!metaStorageMgr.invoke(If.iif(
-                        revision(plannedPartAssignmentsKey(partId)).eq(plannedEntry.revision()),
-                        ops(
-                                put(stablePartAssignmentsKey(partId), ByteUtils.toBytes(appliedPeers)),
-                                put(pendingPartAssignmentsKey(partId), plannedEntry.value()),
-                                remove(plannedPartAssignmentsKey(partId)))
-                                .yield(true),
-                        ops().yield(false))).get().getAsBoolean()) {
-                    LOG.info("Planned key changed while trying to update rebalance information. Going to retry"
-                                    + " [key={}, partition={}, table={}, appliedPeers={}]",
-                            plannedPartAssignmentsKey(partId), partNum, tblConfiguration.name(), appliedPeers);
+            List<ClusterNode> stable = resolveClusterNodes(peers, pendingEntry.value(), stableEntry.value());
 
-                    doOnNewPeersConfigurationApplied(peers);
-                }
+            List<ClusterNode> retrievedSwitchReduce = readClusterNodes(switchReduceEntry);
+            List<ClusterNode> retrievedSwitchAppend = readClusterNodes(switchAppendEntry);
+            List<ClusterNode> retrievedStable = readClusterNodes(stableEntry);
 
-                LOG.info("Rebalance finished. Going to schedule next rebalance [partition={}, table={}, appliedPeers={}, plannedPeers={}]",
-                        partNum, tblConfiguration.name().value(), appliedPeers, ByteUtils.fromBytes(plannedEntry.value()));
-            } else {
-                if (!metaStorageMgr.invoke(If.iif(
-                        notExists(plannedPartAssignmentsKey(partId)),
-                        ops(put(stablePartAssignmentsKey(partId), ByteUtils.toBytes(appliedPeers)),
-                                remove(pendingPartAssignmentsKey(partId))).yield(true),
-                        ops().yield(false))).get().getAsBoolean()) {
-                    LOG.info("Planned key changed while trying to update rebalance information. Going to retry"
-                                    + " [key={}, partition={}, table={}, appliedPeers={}]",
-                            plannedPartAssignmentsKey(partId), partNum, tblConfiguration.name(), appliedPeers);
+            // Were reduced
+            List<ClusterNode> reducedNodes = subtract(retrievedSwitchReduce, stable);
 
-                    doOnNewPeersConfigurationApplied(peers);
-                }
+            // Were added
+            List<ClusterNode> addedNodes = subtract(stable, retrievedStable);
 
-                LOG.info("Rebalance finished [partition={}, table={}, appliedPeers={}]",
-                        partNum, tblConfiguration.name().value(), appliedPeers);
-            }
+            // For further reduction
+            List<ClusterNode> calculatedSwitchReduce = subtract(retrievedSwitchReduce, reducedNodes);
 
-            rebalanceAttempts.set(0);
-        } catch (InterruptedException | ExecutionException e) {
-            // TODO: IGNITE-14693
-            LOG.warn("Unable to commit partition configuration to metastore [table = {}, partition = {}]",
-                    e, tblConfiguration.name(), partNum);
-        }
-    }
+            // For further addition
+            List<ClusterNode> calculatedSwitchAppend = union(retrievedSwitchAppend, reducedNodes);
+            calculatedSwitchAppend = subtract(calculatedSwitchAppend, addedNodes);
+            calculatedSwitchAppend = intersect(calculatedAssignments, calculatedSwitchAppend);
 
-    private static List<ClusterNode> resolveClusterNodes(
-            List<PeerId> peers, byte[] pendingAssignments, byte[] stableAssignments) {
-        Map<NetworkAddress, ClusterNode> resolveRegistry = new HashMap<>();
+            var calculatedPendingReduction = subtract(stable, retrievedSwitchReduce);
 
-        if (pendingAssignments != null) {
-            ((List<ClusterNode>) ByteUtils.fromBytes(pendingAssignments)).forEach(n -> resolveRegistry.put(n.address(), n));
-        }
+            var calculatedPendingAddition = union(stable, reducedNodes);
+            calculatedPendingAddition = intersect(calculatedAssignments, calculatedPendingAddition);
 
-        if (stableAssignments != null) {
-            ((List<ClusterNode>) ByteUtils.fromBytes(stableAssignments)).forEach(n -> resolveRegistry.put(n.address(), n));
-        }
+            // eq(revision(assignments.stable), retrievedAssignmentsStable.revision)
+            SimpleCondition con1 = stableEntry.empty()
+                    ? notExists(stablePartAssignmentsKey) :
+                    revision(stablePartAssignmentsKey).eq(stableEntry.revision());
+
+            // eq(revision(assignments.pending), retrievedAssignmentsPending.revision)
+            SimpleCondition con2 = revision(pendingPartAssignmentsKey).eq(pendingEntry.revision());
 
-        List<ClusterNode> resolvedNodes = new ArrayList<>(peers.size());
+            // eq(revision(assignments.switch.reduce), retrievedAssignmentsSwitchReduce.revision)
+            SimpleCondition con3 = switchReduceEntry.empty()
+                    ? notExists(switchReduceKey) : revision(switchReduceKey).eq(switchReduceEntry.revision());
 
-        for (PeerId p : peers) {
-            var addr = NetworkAddress.from(p.getEndpoint().getIp() + ":" + p.getEndpoint().getPort());
+            // eq(revision(assignments.switch.append), retrievedAssignmentsSwitchAppend.revision)
+            SimpleCondition con4 = switchAppendEntry.empty()
+                    ? notExists(switchAppendKey) : revision(switchAppendKey).eq(switchAppendEntry.revision());
 
-            if (resolveRegistry.containsKey(addr)) {
-                resolvedNodes.add(resolveRegistry.get(addr));
+            Condition retryPreconditions = and(con1, and(con2, and(con3, con4)));
+
+            tblConfiguration.change(ch -> {
+                List<List<ClusterNode>> assignments = ByteUtils.fromBytes(((ExtendedTableChange) ch).assignments());
+                assignments.set(partNum, stable);
+                ((ExtendedTableChange) ch).changeAssignments(ByteUtils.toBytes(assignments));
+            }).get(10, TimeUnit.SECONDS);
+
+            Update successCase;
+            Update failCase;
+
+            byte[] stableByteArray = ByteUtils.toBytes(stable);
+            byte[] additionByteArray = ByteUtils.toBytes(calculatedPendingAddition);
+            byte[] reductionByteArray = ByteUtils.toBytes(calculatedPendingReduction);
+            byte[] switchReduceByteArray = ByteUtils.toBytes(calculatedSwitchReduce);
+            byte[] switchAppendByteArray = ByteUtils.toBytes(calculatedSwitchAppend);
+
+            if (!calculatedSwitchAppend.isEmpty()) {
+                successCase = ops(
+                        put(stablePartAssignmentsKey, stableByteArray),
+                        put(pendingPartAssignmentsKey, additionByteArray),
+                        put(switchReduceKey, switchReduceByteArray),
+                        put(switchAppendKey, switchAppendByteArray)
+                ).yield(SWITCH_APPEND_SUCCESS);
+                failCase = ops().yield(SWITCH_APPEND_FAIL);
+            } else if (!calculatedSwitchReduce.isEmpty()) {
+                successCase = ops(
+                        put(stablePartAssignmentsKey, stableByteArray),
+                        put(pendingPartAssignmentsKey, reductionByteArray),
+                        put(switchReduceKey, switchReduceByteArray),
+                        put(switchAppendKey, switchAppendByteArray)
+                ).yield(SWITCH_REDUCE_SUCCESS);
+                failCase = ops().yield(SWITCH_REDUCE_FAIL);
             } else {
-                throw new IgniteInternalException("Can't find appropriate cluster node for raft group peer: " + p);
+                Condition con5;
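
For context on the hunk above: it relies on a few helpers that aren't shown here, namely `readClusterNodes` and the list set-operations `subtract`, `union`, and `intersect`. A minimal sketch of what they could look like, assuming plain list-based set semantics (the bodies are illustrative guesses, not the actual helpers; `ClusterNode`, `Entry`, and `ByteUtils` are the types already used in this file, so their imports are omitted):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Deserialize an assignments entry, treating an absent or empty entry as an empty list.
@SuppressWarnings("unchecked")
private static List<ClusterNode> readClusterNodes(Entry entry) {
    return (entry == null || entry.empty() || entry.value() == null)
            ? new ArrayList<>()
            : (List<ClusterNode>) ByteUtils.fromBytes(entry.value());
}

// Nodes of 'minuend' that are absent from 'subtrahend'.
private static List<ClusterNode> subtract(List<ClusterNode> minuend, List<ClusterNode> subtrahend) {
    return minuend.stream().filter(n -> !subtrahend.contains(n)).collect(Collectors.toList());
}

// All nodes of 'op1' plus the nodes of 'op2' that are not already present.
private static List<ClusterNode> union(List<ClusterNode> op1, List<ClusterNode> op2) {
    List<ClusterNode> result = new ArrayList<>(op1);
    op2.stream().filter(n -> !result.contains(n)).forEach(result::add);
    return result;
}

// Nodes present in both lists.
private static List<ClusterNode> intersect(List<ClusterNode> op1, List<ClusterNode> op2) {
    return op1.stream().filter(op2::contains).collect(Collectors.toList());
}
```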

Review Comment:
   Well, they're basically just parts of one big IF statement's condition, so I don't think they deserve proper names. Each of them has a comment that describes the condition.
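
To make the retry shape concrete: the four conditions fold into `retryPreconditions` and gate a single atomic meta-storage invoke; if any of the keys changed revision between the read and the invoke, the update yields a failure code and the listener recomputes from scratch. A rough sketch using the names from the diff (the hunk is cut off before the actual `invoke` call, so the status handling below is an assumption):

```java
// Sketch only: apply the computed assignments atomically if every key still
// has the revision we read; otherwise yield a failure code.
int result = metaStorageMgr.invoke(
        If.iif(
                retryPreconditions, // and(con1, and(con2, and(con3, con4)))
                successCase,        // e.g. ops(put(...), ...).yield(SWITCH_APPEND_SUCCESS)
                failCase            // e.g. ops().yield(SWITCH_APPEND_FAIL)
        )).get().getAsInt();        // assumed accessor; the old code used getAsBoolean()

if (result == SWITCH_APPEND_FAIL || result == SWITCH_REDUCE_FAIL) {
    // Another actor touched one of the keys between our read and the invoke: retry.
    doOnNewPeersConfigurationApplied(peers);
}
```

This is the same optimistic compare-and-set loop as the old code, just with the precondition widened from one key's revision to four.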



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
