rpuch commented on code in PR #3659:
URL: https://github.com/apache/ignite-3/pull/3659#discussion_r1579306454


##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -350,52 +352,66 @@ private ClusterState createClusterState(CmgInitMessage 
msg) {
      * </ol>
      */
     private void onElectedAsLeader(long term) {
-        LOG.info("CMG leader has been elected, executing onLeaderElected 
callback");
-
-        // The cluster state is broadcast via the messaging service; hence, 
the future must be completed here on the leader node.
-        // TODO: This needs to be reworked following the implementation of 
IGNITE-18275.
-        raftServiceAfterJoin()
-                .thenCompose(CmgRaftService::readClusterState)
-                .thenAccept(state -> 
initialClusterConfigurationFuture.complete(state.initialClusterConfiguration()));
-
-        raftServiceAfterJoin()
-                .thenCompose(this::updateLogicalTopology)
-                .thenCompose(service -> 
service.updateLearners(term).thenApply(unused -> service))
-                .thenAccept(service -> {
-                    // Register a listener to send ClusterState messages to 
new nodes.
-                    TopologyService topologyService = 
clusterService.topologyService();
-
-                    // TODO: remove listeners if leadership is lost, see 
https://issues.apache.org/jira/browse/IGNITE-16842
-                    
topologyService.addEventHandler(cmgLeaderTopologyEventHandler(service));
-
-                    // Send the ClusterStateMessage to all members of the 
physical topology. We do not wait for the send operation
-                    // because being unable to send ClusterState messages 
should not fail the CMG service startup.
-                    // TODO: IGNITE-18275 - use RAFT replication instead of 
message sending
-                    ClusterNode thisNode = topologyService.localMember();
-
-                    Collection<ClusterNode> otherNodes = 
topologyService.allMembers().stream()
-                            .filter(node -> !thisNode.equals(node))
-                            .collect(toList());
+        if (!busyLock.enterBusy()) {
+            LOG.info("Skipping onLeaderElected callback, because the node is 
stopping");
 
-                    sendClusterState(service, otherNodes);
-                })
-                .whenComplete((v, e) -> {
-                    if (e != null) {
-                        LOG.warn("Error when executing onLeaderElected 
callback", e);
-                    } else {
-                        LOG.info("onLeaderElected callback executed 
successfully");
-                    }
-                });
+            return;
+        }
+
+        try {
+            LOG.info("CMG leader has been elected, executing onLeaderElected 
callback");
+
+            // The cluster state is broadcast via the messaging service; 
hence, the future must be completed here on the leader node.
+            // TODO: This needs to be reworked following the implementation of 
IGNITE-18275.
+            raftServiceAfterJoin().thenAccept(service -> inBusyLock(busyLock, 
() -> {
+                service.readClusterState()
+                        .thenAccept(state -> 
initialClusterConfigurationFuture.complete(state.initialClusterConfiguration()));
+
+                updateLogicalTopology(service)
+                        .thenCompose(v -> inBusyLock(busyLock, () -> 
service.updateLearners(term)))
+                        .thenAccept(v -> inBusyLock(busyLock, () -> {
+                            // Register a listener to send ClusterState 
messages to new nodes.
+                            TopologyService topologyService = 
clusterService.topologyService();
+
+                            // TODO: remove listeners if leadership is lost, 
see https://issues.apache.org/jira/browse/IGNITE-16842
+                            
topologyService.addEventHandler(cmgLeaderTopologyEventHandler(service));
+
+                            // Send the ClusterStateMessage to all members of 
the physical topology. We do not wait for the send operation
+                            // because being unable to send ClusterState 
messages should not fail the CMG service startup.
+                            // TODO: IGNITE-18275 - use RAFT replication 
instead of message sending
+                            ClusterNode thisNode = 
topologyService.localMember();
+
+                            Collection<ClusterNode> otherNodes = 
topologyService.allMembers().stream()
+                                    .filter(node -> !thisNode.equals(node))
+                                    .collect(toList());
+
+                            sendClusterState(service, otherNodes);
+                        }))
+                        .whenComplete((v, e) -> {
+                            if (e != null) {
+                                if (unwrapCause(e) instanceof 
NodeStoppingException) {
+                                    LOG.info("Unable to execute 
onLeaderElected callback, because the node is stopping", e);
+                                } else {
+                                    LOG.error("Error when executing 
onLeaderElected callback", e);
+                                }
+                            } else {
+                                LOG.info("onLeaderElected callback executed 
successfully");
+                            }
+                        });
+            }));
+        } finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
      * This method must be executed upon CMG leader election in order to 
regain logical topology consistency in case some nodes left the
      * physical topology during the election. Newly appeared nodes will be 
added automatically after the new leader broadcasts the current
      * cluster state.
      */
-    private CompletableFuture<CmgRaftService> 
updateLogicalTopology(CmgRaftService service) {
+    private CompletableFuture<Void> updateLogicalTopology(CmgRaftService 
service) {
         return service.logicalTopology()
-                .thenCompose(logicalTopology -> {
+                .thenCompose(logicalTopology -> inBusyLock(busyLock, () -> {

Review Comment:
   How about introducing an instance method called `inBusyLock()` that accepts 
   just a closure, so that `busyLock` does not have to be passed at every call 
   site? That would make the code a bit cleaner.



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RocksDbClusterStateStorage.java:
##########
@@ -117,133 +127,132 @@ private void init() {
 
     @Override
     public byte @Nullable [] get(byte[] key) {
-        try {
-            return db.get(key);
-        } catch (RocksDBException e) {
-            throw new CmgStorageException("Unable to get data from Rocks DB", 
e);
-        }
+        return inBusyLock(busyLock, () -> {
+            try {
+                return db.get(key);
+            } catch (RocksDBException e) {
+                throw new CmgStorageException("Unable to get data from Rocks 
DB", e);
+            }
+        });
     }
 
     @Override
     public void put(byte[] key, byte[] value) {
-        try {
-            db.put(key, value);
-        } catch (RocksDBException e) {
-            throw new CmgStorageException("Unable to put data into Rocks DB", 
e);
-        }
+        inBusyLock(busyLock, () -> {
+            try {
+                db.put(defaultWriteOptions, key, value);
+            } catch (RocksDBException e) {
+                throw new CmgStorageException("Unable to put data into Rocks 
DB", e);
+            }
+        });
     }
 
     @Override
     public void replaceAll(byte[] prefix, byte[] key, byte[] value) {
-        try (
-                var batch = new WriteBatch();
-                var options = new WriteOptions();
-        ) {
-            byte[] endKey = RocksUtils.incrementPrefix(prefix);
+        inBusyLock(busyLock, () -> {
+            try (var batch = new WriteBatch()) {
+                byte[] endKey = RocksUtils.incrementPrefix(prefix);
 
-            assert endKey != null : Arrays.toString(prefix);
+                assert endKey != null : Arrays.toString(prefix);
 
-            batch.deleteRange(prefix, endKey);
+                batch.deleteRange(prefix, endKey);
 
-            batch.put(key, value);
+                batch.put(key, value);
 
-            db.write(options, batch);
-        } catch (RocksDBException e) {
-            throw new CmgStorageException("Unable to replace data in Rocks 
DB", e);
-        }
+                db.write(defaultWriteOptions, batch);
+            } catch (RocksDBException e) {
+                throw new CmgStorageException("Unable to replace data in Rocks 
DB", e);
+            }
+        });
     }
 
     @Override
     public void remove(byte[] key) {
-        try {
-            db.delete(key);
-        } catch (RocksDBException e) {
-            throw new CmgStorageException("Unable to remove data from Rocks 
DB", e);
-        }
+        inBusyLock(busyLock, () -> {
+            try {
+                db.delete(defaultWriteOptions, key);
+            } catch (RocksDBException e) {
+                throw new CmgStorageException("Unable to remove data from 
Rocks DB", e);
+            }
+        });
     }
 
     @Override
     public void removeAll(Collection<byte[]> keys) {
-        try (
-                var batch = new WriteBatch();
-                var options = new WriteOptions();
-        ) {
-            for (byte[] key : keys) {
-                batch.delete(key);
+        inBusyLock(busyLock, () -> {

Review Comment:
   Same suggestion here: add an instance method called `inBusyLock()` that 
   accepts just a closure as its parameter.



##########
modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractClusterStateStorageTest.java:
##########
@@ -371,11 +330,26 @@ void snapshotShouldNotContainWritesAddedAfterItsStart() 
throws Exception {
         assertThat(storage.get(keyAddedAfterSnapshotStart), is(nullValue()));
     }
 
+    @Test
+    void throwsNodeStoppingException() throws Exception {
+        storage.stop();
+
+        assertThrowsWithCause(() -> storage.get(BYTE_EMPTY_ARRAY), 
NodeStoppingException.class);
+        assertThrowsWithCause(() -> storage.put(BYTE_EMPTY_ARRAY, 
BYTE_EMPTY_ARRAY), NodeStoppingException.class);
+        assertThrowsWithCause(() -> storage.remove(BYTE_EMPTY_ARRAY), 
NodeStoppingException.class);
+        assertThrowsWithCause(() -> 
storage.removeAll(List.of(BYTE_EMPTY_ARRAY)), NodeStoppingException.class);
+        assertThrowsWithCause(() -> storage.replaceAll(BYTE_EMPTY_ARRAY, 
BYTE_EMPTY_ARRAY, BYTE_EMPTY_ARRAY), NodeStoppingException.class);
+        assertThrowsWithCause(() -> storage.getWithPrefix(BYTE_EMPTY_ARRAY, 
(k, v) -> null), NodeStoppingException.class);
+        assertThrowsWithCause(() -> storage.restoreSnapshot(workDir), 
NodeStoppingException.class);
+
+        assertThat(storage.snapshot(workDir), 
willThrow(NodeStoppingException.class));
+    }
+

Review Comment:
   How about adding a test for a double stop (calling `stop()` twice)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to