rpuch commented on code in PR #3659:
URL: https://github.com/apache/ignite-3/pull/3659#discussion_r1579525030
##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -350,52 +352,66 @@ private ClusterState createClusterState(CmgInitMessage
msg) {
* </ol>
*/
private void onElectedAsLeader(long term) {
- LOG.info("CMG leader has been elected, executing onLeaderElected
callback");
-
- // The cluster state is broadcast via the messaging service; hence,
the future must be completed here on the leader node.
- // TODO: This needs to be reworked following the implementation of
IGNITE-18275.
- raftServiceAfterJoin()
- .thenCompose(CmgRaftService::readClusterState)
- .thenAccept(state ->
initialClusterConfigurationFuture.complete(state.initialClusterConfiguration()));
-
- raftServiceAfterJoin()
- .thenCompose(this::updateLogicalTopology)
- .thenCompose(service ->
service.updateLearners(term).thenApply(unused -> service))
- .thenAccept(service -> {
- // Register a listener to send ClusterState messages to
new nodes.
- TopologyService topologyService =
clusterService.topologyService();
-
- // TODO: remove listeners if leadership is lost, see
https://issues.apache.org/jira/browse/IGNITE-16842
-
topologyService.addEventHandler(cmgLeaderTopologyEventHandler(service));
-
- // Send the ClusterStateMessage to all members of the
physical topology. We do not wait for the send operation
- // because being unable to send ClusterState messages
should not fail the CMG service startup.
- // TODO: IGNITE-18275 - use RAFT replication instead of
message sending
- ClusterNode thisNode = topologyService.localMember();
-
- Collection<ClusterNode> otherNodes =
topologyService.allMembers().stream()
- .filter(node -> !thisNode.equals(node))
- .collect(toList());
+ if (!busyLock.enterBusy()) {
+ LOG.info("Skipping onLeaderElected callback, because the node is
stopping");
- sendClusterState(service, otherNodes);
- })
- .whenComplete((v, e) -> {
- if (e != null) {
- LOG.warn("Error when executing onLeaderElected
callback", e);
- } else {
- LOG.info("onLeaderElected callback executed
successfully");
- }
- });
+ return;
+ }
+
+ try {
+ LOG.info("CMG leader has been elected, executing onLeaderElected
callback");
+
+ // The cluster state is broadcast via the messaging service;
hence, the future must be completed here on the leader node.
+ // TODO: This needs to be reworked following the implementation of
IGNITE-18275.
+ raftServiceAfterJoin().thenAccept(service -> inBusyLock(busyLock,
() -> {
+ service.readClusterState()
+ .thenAccept(state ->
initialClusterConfigurationFuture.complete(state.initialClusterConfiguration()));
+
+ updateLogicalTopology(service)
+ .thenCompose(v -> inBusyLock(busyLock, () ->
service.updateLearners(term)))
+ .thenAccept(v -> inBusyLock(busyLock, () -> {
+ // Register a listener to send ClusterState
messages to new nodes.
+ TopologyService topologyService =
clusterService.topologyService();
+
+ // TODO: remove listeners if leadership is lost,
see https://issues.apache.org/jira/browse/IGNITE-16842
+
topologyService.addEventHandler(cmgLeaderTopologyEventHandler(service));
+
+ // Send the ClusterStateMessage to all members of
the physical topology. We do not wait for the send operation
+ // because being unable to send ClusterState
messages should not fail the CMG service startup.
+ // TODO: IGNITE-18275 - use RAFT replication
instead of message sending
+ ClusterNode thisNode =
topologyService.localMember();
+
+ Collection<ClusterNode> otherNodes =
topologyService.allMembers().stream()
+ .filter(node -> !thisNode.equals(node))
+ .collect(toList());
+
+ sendClusterState(service, otherNodes);
+ }))
+ .whenComplete((v, e) -> {
+ if (e != null) {
+ if (unwrapCause(e) instanceof
NodeStoppingException) {
+ LOG.info("Unable to execute
onLeaderElected callback, because the node is stopping", e);
+ } else {
+ LOG.error("Error when executing
onLeaderElected callback", e);
+ }
+ } else {
+ LOG.info("onLeaderElected callback executed
successfully");
+ }
+ });
+ }));
+ } finally {
+ busyLock.leaveBusy();
+ }
}
/**
* This method must be executed upon CMG leader election in order to
regain logical topology consistency in case some nodes left the
* physical topology during the election. Newly appeared nodes will be
added automatically after the new leader broadcasts the current
* cluster state.
*/
- private CompletableFuture<CmgRaftService>
updateLogicalTopology(CmgRaftService service) {
+ private CompletableFuture<Void> updateLogicalTopology(CmgRaftService
service) {
return service.logicalTopology()
- .thenCompose(logicalTopology -> {
+ .thenCompose(logicalTopology -> inBusyLock(busyLock, () -> {
Review Comment:
I think there are components that use this approach, but the current state
is fine.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]