sashapolo commented on code in PR #1384:
URL: https://github.com/apache/ignite-3/pull/1384#discussion_r1034422407
##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java:
##########
@@ -53,9 +57,15 @@ public class CmgRaftService {
private final ClusterService clusterService;
- public CmgRaftService(RaftGroupService raftService, ClusterService
clusterService) {
+ private final LogicalTopology logicalTopology;
+
+ /**
+ * Creates a new instance.
Review Comment:
Is this javadoc really necessary?
##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java:
##########
@@ -206,4 +216,33 @@ private ClusterNodeMessage nodeMessage(ClusterNode node) {
.port(node.address().port())
.build();
}
+
+ /**
+ * Issues changePeersAsync request with same peers; learners are
recalculated based on the current peers (which is same as
Review Comment:
```suggestion
* Issues {@code changePeersAsync} request with same peers; learners are
recalculated based on the current peers (which is same as
```
##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java:
##########
@@ -206,4 +216,33 @@ private ClusterNodeMessage nodeMessage(ClusterNode node) {
.port(node.address().port())
.build();
}
+
+ /**
+ * Issues changePeersAsync request with same peers; learners are
recalculated based on the current peers (which is same as
+ * CMG nodes) and known logical topology. Any node in the logical topology
that is not a CMG node constitutes a learner.
+ *
+ * @param term RAFT term in which we operate (used to avoid races when
changing peers/learners).
+ * @return Future that completes when the request is processed.
+ */
+ public CompletionStage<Void> updateLearners(long term) {
+ List<Peer> currentPeers = raftService.peers();
+
+ assert currentPeers != null : "Raft service is not yet initialized";
+
+ Set<String> peersConsistentIds = currentPeers.stream()
+ .map(Peer::consistentId)
+ .collect(toSet());
+
+ Set<String> consistentIds =
logicalTopology.getLogicalTopology().stream()
Review Comment:
I would suggest replacing this code with:
```
Set<Peer> newLearners = logicalTopology.getLogicalTopology().stream()
.map(ClusterNode::name)
.filter(name -> !peersConsistentIds.contains(name))
.map(Peer::new)
.collect(toSet());
```
##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/CmgRaftService.java:
##########
@@ -206,4 +216,33 @@ private ClusterNodeMessage nodeMessage(ClusterNode node) {
.port(node.address().port())
.build();
}
+
+ /**
+ * Issues changePeersAsync request with same peers; learners are
recalculated based on the current peers (which is same as
+ * CMG nodes) and known logical topology. Any node in the logical topology
that is not a CMG node constitutes a learner.
+ *
+ * @param term RAFT term in which we operate (used to avoid races when
changing peers/learners).
+ * @return Future that completes when the request is processed.
+ */
+ public CompletionStage<Void> updateLearners(long term) {
Review Comment:
I'd prefer using `CompletableFuture`
##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -466,27 +468,51 @@ private CompletableFuture<CmgRaftService>
joinCluster(CmgRaftService service, Cl
* Starts the CMG Raft service using the provided node names as its peers.
*/
private CompletableFuture<CmgRaftService>
startCmgRaftService(Collection<String> nodeNames) {
+ String thisNodeConsistentId =
clusterService.topologyService().localMember().name();
+
+ // If we are not in the CMG, we must be a lerner. List of learners
will be updated by a leader accordingly,
+ // but just to start a RAFT service we must include ourselves in the
initial learners list, that's why we
+ // pass List.of(we) as learners list if we are not in the CMG.
+ List<String> learnerConsistentIds =
nodeNames.contains(thisNodeConsistentId) ? List.of() :
List.of(thisNodeConsistentId);
+
try {
return raftManager
.prepareRaftGroup(
INSTANCE,
nodeNames,
- List.of(),
- () -> new
CmgRaftGroupListener(clusterStateStorage, logicalTopologyService),
+ learnerConsistentIds,
+ () -> new
CmgRaftGroupListener(clusterStateStorage, logicalTopology,
this::onLogicalTopologyChanged),
this::createCmgRaftGroupEventsListener,
RaftGroupOptions.defaults()
)
- .thenApply(service -> new CmgRaftService(service,
clusterService));
+ .thenApply(service -> new CmgRaftService(service,
clusterService, logicalTopology));
} catch (Exception e) {
return failedFuture(e);
}
}
+ private void onLogicalTopologyChanged(long term) {
+ CompletableFuture<CmgRaftService> serviceFuture = raftService;
Review Comment:
Hm, should we take the `raftServiceLock` here?
##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -466,27 +468,51 @@ private CompletableFuture<CmgRaftService>
joinCluster(CmgRaftService service, Cl
* Starts the CMG Raft service using the provided node names as its peers.
*/
private CompletableFuture<CmgRaftService>
startCmgRaftService(Collection<String> nodeNames) {
+ String thisNodeConsistentId =
clusterService.topologyService().localMember().name();
+
+ // If we are not in the CMG, we must be a lerner. List of learners
will be updated by a leader accordingly,
Review Comment:
```suggestion
// If we are not in the CMG, we must be a learner. List of learners
will be updated by a leader accordingly,
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]