SammyVimes commented on code in PR #1016:
URL: https://github.com/apache/ignite-3/pull/1016#discussion_r957427737
##########
modules/table/src/main/java/org/apache/ignite/internal/utils/RebalanceUtil.java:
##########
@@ -235,4 +288,209 @@ public static boolean recoverable(Throwable t) {
// As long as we don't have a general failure handler, we assume that
all errors are recoverable.
return true;
}
+
+ /**
+ * Starts the process of removing peer from raft group if that peer has
in-memory storage or if its
+ * storage was cleared.
+ *
+ * <p>The method reads the entry stored under the key returned by
+ * {@code switchReduceKey(partId)}, deserializes its value into a list of nodes (or starts
+ * from an empty list when the entry has no value), appends {@code clusterNode} to that list
+ * and writes the serialized list back through a conditional {@code invoke}: the put is guarded
+ * by {@code notExists(key)} when no value was read, and by an equality check on the revision
+ * that was read otherwise. When the {@code invoke} yields {@code false}, the method calls
+ * itself again with the same arguments; there is no attempt limit in this method.
+ *
+ * @param partId Partition's raft group id.
+ * @param clusterNode Cluster node to be removed from peers.
+ * @param metaStorageMgr MetaStorage manager.
+ * @return Completable future that signifies the completion of this
operation.
+ */
+ public static CompletableFuture<Void> startPeerRemoval(String partId,
ClusterNode clusterNode, MetaStorageManager metaStorageMgr) {
+ ByteArray key = switchReduceKey(partId);
+
+ return metaStorageMgr.get(key)
+ .thenCompose(retrievedAssignmentsSwitchReduce -> {
+ byte[] prevValue =
retrievedAssignmentsSwitchReduce.value();
+
+ boolean prevValueEmpty = true; // Flipped to false below once a stored value is found.
+ List<ClusterNode> calculatedAssignmentsSwitchReduce;
+
+ if (prevValue != null) {
+ calculatedAssignmentsSwitchReduce =
(List<ClusterNode>) ByteUtils.fromBytes(prevValue); // Unchecked cast of the deserialized value.
+ prevValueEmpty = false;
+ } else {
+ calculatedAssignmentsSwitchReduce = new ArrayList<>();
+ }
+
+ calculatedAssignmentsSwitchReduce.add(clusterNode); // Appended without checking whether the node is already listed.
+
+ byte[] newValue =
ByteUtils.toBytes(calculatedAssignmentsSwitchReduce);
+
+ if (prevValueEmpty) { // No value was read: put only if the key still does not exist.
+ return metaStorageMgr.invoke(
+ Conditions.notExists(key),
+ Operations.put(key, newValue),
+ Operations.noop()
+ );
+ } else { // A value was read: put only if the key's revision equals the one that was read.
+ return metaStorageMgr.invoke(
+
revision(key).eq(retrievedAssignmentsSwitchReduce.revision()),
+ Operations.put(key, newValue),
+ Operations.noop()
+ );
+ }
+ }).thenCompose(res -> {
+ if (!res) { // The invoke yielded false: start over from the read.
+ return startPeerRemoval(partId, clusterNode,
metaStorageMgr);
+ }
+
+ return CompletableFuture.completedFuture(null);
+ });
+ }
+
+ /**
+ * Handles assignments switch reduce changed.
+ *
+ * @param metaStorageMgr MetaStorage manager.
+ * @param baselineNodes Baseline nodes.
+ * @param partitions Partitions count.
+ * @param replicas Replicas count.
+ * @param partNum Number of the partition.
+ * @param partId Partition's raft group id.
+ * @param event Assignments switch reduce change event.
+ * @return Completable future that signifies the completion of this
operation.
+ */
+ public static CompletableFuture<Void>
handleReduceChanged(MetaStorageManager metaStorageMgr, Collection<ClusterNode>
baselineNodes,
+ int partitions, int replicas, int partNum, String partId,
WatchEvent event) {
+ Entry entry = event.entryEvent().newEntry();
+ byte[] eventData = entry.value();
+
+ List<ClusterNode> assignments =
AffinityUtils.calculateAssignments(baselineNodes, partitions,
replicas).get(partNum);
+ List<ClusterNode> switchReduce = (List<ClusterNode>)
ByteUtils.fromBytes(eventData);
Review Comment:
What can I say, we must refactor the hell out of it
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]