SammyVimes commented on code in PR #1016:
URL: https://github.com/apache/ignite-3/pull/1016#discussion_r957408631


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/RebalanceRaftGroupEventsListener.java:
##########
@@ -252,92 +294,189 @@ private void scheduleChangePeers(List<PeerId> peers, 
long term) {
      */
     private void doOnNewPeersConfigurationApplied(List<PeerId> peers) {
         try {
-            Map<ByteArray, Entry> keys = metaStorageMgr.getAll(
+            ByteArray pendingPartAssignmentsKey = 
pendingPartAssignmentsKey(partId);
+            ByteArray stablePartAssignmentsKey = 
stablePartAssignmentsKey(partId);
+            ByteArray plannedPartAssignmentsKey = 
plannedPartAssignmentsKey(partId);
+            ByteArray switchReduceKey = switchReduceKey(partId);
+            ByteArray switchAppendKey = switchAppendKey(partId);
+
+            Map<ByteArray, Entry> values = metaStorageMgr.getAll(
                     Set.of(
-                            plannedPartAssignmentsKey(partId),
-                            pendingPartAssignmentsKey(partId),
-                            stablePartAssignmentsKey(partId))).get();
+                            plannedPartAssignmentsKey,
+                            pendingPartAssignmentsKey,
+                            stablePartAssignmentsKey,
+                            switchReduceKey,
+                            switchAppendKey
+                    )
+            ).get();
 
-            Entry plannedEntry = keys.get(plannedPartAssignmentsKey(partId));
+            Entry stableEntry = values.get(stablePartAssignmentsKey);
+            Entry pendingEntry = values.get(pendingPartAssignmentsKey);
+            Entry plannedEntry = values.get(plannedPartAssignmentsKey);
+            Entry switchReduceEntry = values.get(switchReduceKey);
+            Entry switchAppendEntry = values.get(switchAppendKey);
 
-            List<ClusterNode> appliedPeers = resolveClusterNodes(peers,
-                    keys.get(pendingPartAssignmentsKey(partId)).value(), 
keys.get(stablePartAssignmentsKey(partId)).value());
+            List<ClusterNode> calculatedAssignments = 
calculateAssignmentsFn.apply(tblConfiguration, partNum);
 
-            tblConfiguration.change(ch -> {
-                List<List<ClusterNode>> assignments =
-                        (List<List<ClusterNode>>) 
ByteUtils.fromBytes(((ExtendedTableChange) ch).assignments());
-                assignments.set(partNum, appliedPeers);
-                ((ExtendedTableChange) 
ch).changeAssignments(ByteUtils.toBytes(assignments));
-            }).get();
-
-            if (plannedEntry.value() != null) {
-                if (!metaStorageMgr.invoke(If.iif(
-                        
revision(plannedPartAssignmentsKey(partId)).eq(plannedEntry.revision()),
-                        ops(
-                                put(stablePartAssignmentsKey(partId), 
ByteUtils.toBytes(appliedPeers)),
-                                put(pendingPartAssignmentsKey(partId), 
plannedEntry.value()),
-                                remove(plannedPartAssignmentsKey(partId)))
-                                .yield(true),
-                        ops().yield(false))).get().getAsBoolean()) {
-                    LOG.info("Planned key changed while trying to update 
rebalance information. Going to retry"
-                                    + " [key={}, partition={}, table={}, 
appliedPeers={}]",
-                            plannedPartAssignmentsKey(partId), partNum, 
tblConfiguration.name(), appliedPeers);
+            List<ClusterNode> stable = resolveClusterNodes(peers, 
pendingEntry.value(), stableEntry.value());
 
-                    doOnNewPeersConfigurationApplied(peers);
-                }
+            List<ClusterNode> retrievedSwitchReduce = 
readClusterNodes(switchReduceEntry);
+            List<ClusterNode> retrievedSwitchAppend = 
readClusterNodes(switchAppendEntry);
+            List<ClusterNode> retrievedStable = readClusterNodes(stableEntry);
 
-                LOG.info("Rebalance finished. Going to schedule next rebalance 
[partition={}, table={}, appliedPeers={}, plannedPeers={}]",
-                        partNum, tblConfiguration.name().value(), 
appliedPeers, ByteUtils.fromBytes(plannedEntry.value()));
-            } else {
-                if (!metaStorageMgr.invoke(If.iif(
-                        notExists(plannedPartAssignmentsKey(partId)),
-                        ops(put(stablePartAssignmentsKey(partId), 
ByteUtils.toBytes(appliedPeers)),
-                                
remove(pendingPartAssignmentsKey(partId))).yield(true),
-                        ops().yield(false))).get().getAsBoolean()) {
-                    LOG.info("Planned key changed while trying to update 
rebalance information. Going to retry"
-                                    + " [key={}, partition={}, table={}, 
appliedPeers={}]",
-                            plannedPartAssignmentsKey(partId), partNum, 
tblConfiguration.name(), appliedPeers);
+            // Were reduced
+            List<ClusterNode> reducedNodes = subtract(retrievedSwitchReduce, 
stable);
 
-                    doOnNewPeersConfigurationApplied(peers);
-                }
+            // Were added
+            List<ClusterNode> addedNodes = subtract(stable, retrievedStable);
 
-                LOG.info("Rebalance finished [partition={}, table={}, 
appliedPeers={}]",
-                        partNum, tblConfiguration.name().value(), 
appliedPeers);
-            }
+            // For further reduction
+            List<ClusterNode> calculatedSwitchReduce = 
subtract(retrievedSwitchReduce, reducedNodes);
 
-            rebalanceAttempts.set(0);
-        } catch (InterruptedException | ExecutionException e) {
-            // TODO: IGNITE-14693
-            LOG.warn("Unable to commit partition configuration to metastore 
[table = {}, partition = {}]",
-                    e, tblConfiguration.name(), partNum);
-        }
-    }
+            // For further addition
+            List<ClusterNode> calculatedSwitchAppend = 
union(retrievedSwitchAppend, reducedNodes);
+            calculatedSwitchAppend = subtract(calculatedSwitchAppend, 
addedNodes);
+            calculatedSwitchAppend = intersect(calculatedAssignments, 
calculatedSwitchAppend);
 
-    private static List<ClusterNode> resolveClusterNodes(
-            List<PeerId> peers, byte[] pendingAssignments, byte[] 
stableAssignments) {
-        Map<NetworkAddress, ClusterNode> resolveRegistry = new HashMap<>();
+            var calculatedPendingReduction = subtract(stable, 
retrievedSwitchReduce);

Review Comment:
   Right, forgot to clean them up



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to