rpuch commented on code in PR #3659:
URL: https://github.com/apache/ignite-3/pull/3659#discussion_r1579306454
##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/ClusterManagementGroupManager.java:
##########
@@ -350,52 +352,66 @@ private ClusterState createClusterState(CmgInitMessage
msg) {
* </ol>
*/
private void onElectedAsLeader(long term) {
- LOG.info("CMG leader has been elected, executing onLeaderElected
callback");
-
- // The cluster state is broadcast via the messaging service; hence,
the future must be completed here on the leader node.
- // TODO: This needs to be reworked following the implementation of
IGNITE-18275.
- raftServiceAfterJoin()
- .thenCompose(CmgRaftService::readClusterState)
- .thenAccept(state ->
initialClusterConfigurationFuture.complete(state.initialClusterConfiguration()));
-
- raftServiceAfterJoin()
- .thenCompose(this::updateLogicalTopology)
- .thenCompose(service ->
service.updateLearners(term).thenApply(unused -> service))
- .thenAccept(service -> {
- // Register a listener to send ClusterState messages to
new nodes.
- TopologyService topologyService =
clusterService.topologyService();
-
- // TODO: remove listeners if leadership is lost, see
https://issues.apache.org/jira/browse/IGNITE-16842
-
topologyService.addEventHandler(cmgLeaderTopologyEventHandler(service));
-
- // Send the ClusterStateMessage to all members of the
physical topology. We do not wait for the send operation
- // because being unable to send ClusterState messages
should not fail the CMG service startup.
- // TODO: IGNITE-18275 - use RAFT replication instead of
message sending
- ClusterNode thisNode = topologyService.localMember();
-
- Collection<ClusterNode> otherNodes =
topologyService.allMembers().stream()
- .filter(node -> !thisNode.equals(node))
- .collect(toList());
+ if (!busyLock.enterBusy()) {
+ LOG.info("Skipping onLeaderElected callback, because the node is
stopping");
- sendClusterState(service, otherNodes);
- })
- .whenComplete((v, e) -> {
- if (e != null) {
- LOG.warn("Error when executing onLeaderElected
callback", e);
- } else {
- LOG.info("onLeaderElected callback executed
successfully");
- }
- });
+ return;
+ }
+
+ try {
+ LOG.info("CMG leader has been elected, executing onLeaderElected
callback");
+
+ // The cluster state is broadcast via the messaging service;
hence, the future must be completed here on the leader node.
+ // TODO: This needs to be reworked following the implementation of
IGNITE-18275.
+ raftServiceAfterJoin().thenAccept(service -> inBusyLock(busyLock,
() -> {
+ service.readClusterState()
+ .thenAccept(state ->
initialClusterConfigurationFuture.complete(state.initialClusterConfiguration()));
+
+ updateLogicalTopology(service)
+ .thenCompose(v -> inBusyLock(busyLock, () ->
service.updateLearners(term)))
+ .thenAccept(v -> inBusyLock(busyLock, () -> {
+ // Register a listener to send ClusterState
messages to new nodes.
+ TopologyService topologyService =
clusterService.topologyService();
+
+ // TODO: remove listeners if leadership is lost,
see https://issues.apache.org/jira/browse/IGNITE-16842
+
topologyService.addEventHandler(cmgLeaderTopologyEventHandler(service));
+
+ // Send the ClusterStateMessage to all members of
the physical topology. We do not wait for the send operation
+ // because being unable to send ClusterState
messages should not fail the CMG service startup.
+ // TODO: IGNITE-18275 - use RAFT replication
instead of message sending
+ ClusterNode thisNode =
topologyService.localMember();
+
+ Collection<ClusterNode> otherNodes =
topologyService.allMembers().stream()
+ .filter(node -> !thisNode.equals(node))
+ .collect(toList());
+
+ sendClusterState(service, otherNodes);
+ }))
+ .whenComplete((v, e) -> {
+ if (e != null) {
+ if (unwrapCause(e) instanceof
NodeStoppingException) {
+ LOG.info("Unable to execute
onLeaderElected callback, because the node is stopping", e);
+ } else {
+ LOG.error("Error when executing
onLeaderElected callback", e);
+ }
+ } else {
+ LOG.info("onLeaderElected callback executed
successfully");
+ }
+ });
+ }));
+ } finally {
+ busyLock.leaveBusy();
+ }
}
/**
* This method must be executed upon CMG leader election in order to
regain logical topology consistency in case some nodes left the
* physical topology during the election. Newly appeared nodes will be
added automatically after the new leader broadcasts the current
* cluster state.
*/
- private CompletableFuture<CmgRaftService>
updateLogicalTopology(CmgRaftService service) {
+ private CompletableFuture<Void> updateLogicalTopology(CmgRaftService
service) {
return service.logicalTopology()
- .thenCompose(logicalTopology -> {
+ .thenCompose(logicalTopology -> inBusyLock(busyLock, () -> {
Review Comment:
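How about introducing an instance method `inBusyLock()` that accepts just the
closure and uses this class's own `busyLock` internally? Every call site would then no longer need to pass `busyLock` explicitly, which would make the code a bit cleaner.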
##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RocksDbClusterStateStorage.java:
##########
@@ -117,133 +127,132 @@ private void init() {
@Override
public byte @Nullable [] get(byte[] key) {
- try {
- return db.get(key);
- } catch (RocksDBException e) {
- throw new CmgStorageException("Unable to get data from Rocks DB",
e);
- }
+ return inBusyLock(busyLock, () -> {
+ try {
+ return db.get(key);
+ } catch (RocksDBException e) {
+ throw new CmgStorageException("Unable to get data from Rocks
DB", e);
+ }
+ });
}
@Override
public void put(byte[] key, byte[] value) {
- try {
- db.put(key, value);
- } catch (RocksDBException e) {
- throw new CmgStorageException("Unable to put data into Rocks DB",
e);
- }
+ inBusyLock(busyLock, () -> {
+ try {
+ db.put(defaultWriteOptions, key, value);
+ } catch (RocksDBException e) {
+ throw new CmgStorageException("Unable to put data into Rocks
DB", e);
+ }
+ });
}
@Override
public void replaceAll(byte[] prefix, byte[] key, byte[] value) {
- try (
- var batch = new WriteBatch();
- var options = new WriteOptions();
- ) {
- byte[] endKey = RocksUtils.incrementPrefix(prefix);
+ inBusyLock(busyLock, () -> {
+ try (var batch = new WriteBatch()) {
+ byte[] endKey = RocksUtils.incrementPrefix(prefix);
- assert endKey != null : Arrays.toString(prefix);
+ assert endKey != null : Arrays.toString(prefix);
- batch.deleteRange(prefix, endKey);
+ batch.deleteRange(prefix, endKey);
- batch.put(key, value);
+ batch.put(key, value);
- db.write(options, batch);
- } catch (RocksDBException e) {
- throw new CmgStorageException("Unable to replace data in Rocks
DB", e);
- }
+ db.write(defaultWriteOptions, batch);
+ } catch (RocksDBException e) {
+ throw new CmgStorageException("Unable to replace data in Rocks
DB", e);
+ }
+ });
}
@Override
public void remove(byte[] key) {
- try {
- db.delete(key);
- } catch (RocksDBException e) {
- throw new CmgStorageException("Unable to remove data from Rocks
DB", e);
- }
+ inBusyLock(busyLock, () -> {
+ try {
+ db.delete(defaultWriteOptions, key);
+ } catch (RocksDBException e) {
+ throw new CmgStorageException("Unable to remove data from
Rocks DB", e);
+ }
+ });
}
@Override
public void removeAll(Collection<byte[]> keys) {
- try (
- var batch = new WriteBatch();
- var options = new WriteOptions();
- ) {
- for (byte[] key : keys) {
- batch.delete(key);
+ inBusyLock(busyLock, () -> {
Review Comment:
Same suggestion here: add an instance method `inBusyLock()` that accepts just a
closure as a parameter and uses this class's own `busyLock`.
##########
modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractClusterStateStorageTest.java:
##########
@@ -371,11 +330,26 @@ void snapshotShouldNotContainWritesAddedAfterItsStart()
throws Exception {
assertThat(storage.get(keyAddedAfterSnapshotStart), is(nullValue()));
}
+ @Test
+ void throwsNodeStoppingException() throws Exception {
+ storage.stop();
+
+ assertThrowsWithCause(() -> storage.get(BYTE_EMPTY_ARRAY),
NodeStoppingException.class);
+ assertThrowsWithCause(() -> storage.put(BYTE_EMPTY_ARRAY,
BYTE_EMPTY_ARRAY), NodeStoppingException.class);
+ assertThrowsWithCause(() -> storage.remove(BYTE_EMPTY_ARRAY),
NodeStoppingException.class);
+ assertThrowsWithCause(() ->
storage.removeAll(List.of(BYTE_EMPTY_ARRAY)), NodeStoppingException.class);
+ assertThrowsWithCause(() -> storage.replaceAll(BYTE_EMPTY_ARRAY,
BYTE_EMPTY_ARRAY, BYTE_EMPTY_ARRAY), NodeStoppingException.class);
+ assertThrowsWithCause(() -> storage.getWithPrefix(BYTE_EMPTY_ARRAY,
(k, v) -> null), NodeStoppingException.class);
+ assertThrowsWithCause(() -> storage.restoreSnapshot(workDir),
NodeStoppingException.class);
+
+ assertThat(storage.snapshot(workDir),
willThrow(NodeStoppingException.class));
+ }
+
Review Comment:
How about adding a test for a double stop, i.e. verifying that calling `storage.stop()` twice does not fail?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]