rpuch commented on code in PR #722:
URL: https://github.com/apache/ignite-3/pull/722#discussion_r845984411


##########
modules/cluster-management/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterManagerTest.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management;
+
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.will;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.network.StaticNodeFinder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Integration tests for {@link ClusterManagementGroupManager}.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public class ItClusterManagerTest {
+    private static final int PORT_BASE = 10000;
+
+    private final List<MockNode> cluster = new ArrayList<>();
+
+    @WorkDirectory
+    private Path workDir;
+
+    @BeforeEach
+    void setUp(TestInfo testInfo) throws IOException {
+        var addr1 = new NetworkAddress("localhost", PORT_BASE);
+        var addr2 = new NetworkAddress("localhost", PORT_BASE + 1);
+
+        var nodeFinder = new StaticNodeFinder(List.of(addr1, addr2));
+
+        cluster.add(new MockNode(testInfo, addr1, nodeFinder, 
workDir.resolve("node0")));
+        cluster.add(new MockNode(testInfo, addr2, nodeFinder, 
workDir.resolve("node1")));
+
+        for (MockNode node : cluster) {
+            node.start();
+        }
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        for (MockNode node : cluster) {
+            node.beforeNodeStop();
+        }
+
+        for (MockNode node : cluster) {
+            node.stop();
+        }
+    }
+
+    /**
+     * Tests initial cluster setup.
+     */
+    @Test
+    void testInit() throws Exception {
+        String[] cmgNodes = { cluster.get(0).localMember().name() };
+
+        String[] metaStorageNodes = { cluster.get(1).localMember().name() };
+
+        initCluster(metaStorageNodes, cmgNodes);
+
+        assertThat(cluster.get(0).clusterManager().metaStorageNodes(), 
will(containsInAnyOrder(metaStorageNodes)));
+        assertThat(cluster.get(1).clusterManager().metaStorageNodes(), 
will(containsInAnyOrder(metaStorageNodes)));
+
+        ClusterNode[] expectedTopology = currentPhysicalTopology();
+
+        assertThat(cluster.get(0).clusterManager().logicalTopology(), 
will(containsInAnyOrder(expectedTopology)));
+        assertThat(cluster.get(1).clusterManager().logicalTopology(), 
will(containsInAnyOrder(expectedTopology)));
+    }
+
+    /**
+     * Tests cancelling an init command due to a node failure.
+     */
+    @Test
+    void testInitCancel() throws Exception {

Review Comment:
   Ok, it still tests for 2 different scenarios. The second is not that 
obvious, it is not mentioned anywhere (comments, test name).
   
   Hence the suggestion: split the method into 2 test cases (one per tested 
property) and name each test method accordingly, so that the reader understands 
the tested property just by reading the method name.



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/IllegalInitArgumentException.java:
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management;
+
+/**
+ * Internal exception used by the {@link ClusterManagementGroupManager} to 
respond to incorrect user commands.
+ */
+class IllegalInitArgumentException extends RuntimeException {

Review Comment:
   Shouldn't it extend `IgniteException`?



##########
modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java:
##########
@@ -164,20 +189,29 @@ private static Ignite doStart(String nodeName, @Nullable 
String cfgContent, Path
         ackBanner();
 
         try {
-            nodeToStart.start(cfgContent);
+            return nodeToStart.start(cfgContent)
+                    .handle((ignite, e) -> {
+                        if (e == null) {
+                            ackSuccessStart();
+
+                            return ignite;
+                        } else {
+                            throw handleException(nodeName, e);
+                        }
+                    });
         } catch (Exception e) {
-            nodes.remove(nodeName);
-
-            if (e instanceof IgniteException) {
-                throw e;
-            } else {
-                throw new IgniteException(e);
-            }
+            throw handleException(nodeName, e);
         }
+    }
 
-        ackSuccessStart();
+    private static IgniteException handleException(String nodeName, Throwable 
e) {

Review Comment:
   I suggest to rename the method to `handleStartException()` to make it clear 
that it's about handling start exceptions, not any exception at all (which 
could be tempting later for someone who is not attentive enough).



##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RocksDbRaftStorage.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.raft;
+
+import static 
org.apache.ignite.internal.rocksdb.snapshot.ColumnFamilyRange.fullRange;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
+import org.apache.ignite.internal.rocksdb.snapshot.RocksSnapshotManager;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.Options;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+
+/**
+ * {@link RaftStorage} implementation based on RocksDB.
+ */
+public class RocksDbRaftStorage implements RaftStorage {
+    /** Thread-pool for snapshot operations execution. */
+    private final ExecutorService snapshotExecutor = 
Executors.newFixedThreadPool(2);
+
+    /** Path to the rocksdb database. */
+    private final Path dbPath;
+
+    /** RockDB options. */
+    private volatile Options options;
+
+    /** RocksDb instance. */
+    private volatile RocksDB db;
+
+    private volatile RocksSnapshotManager snapshotManager;
+
+    private final Object snapshotRestoreLock = new Object();
+
+    public RocksDbRaftStorage(Path dbPath) {
+        this.dbPath = dbPath;
+    }
+
+    @Override
+    public void start() {
+        options = new Options().setCreateIfMissing(true);
+
+        try {
+            db = RocksDB.open(options, dbPath.toString());
+
+            ColumnFamily defaultCf = ColumnFamily.wrap(db, 
db.getDefaultColumnFamily());
+
+            snapshotManager = new RocksSnapshotManager(db, 
List.of(fullRange(defaultCf)), snapshotExecutor);
+        } catch (RocksDBException e) {
+            throw new IgniteInternalException("Failed to start the storage", 
e);
+        }
+    }
+
+    @Override
+    public boolean isStarted() {
+        return db != null;
+    }
+
+    @Override
+    public byte @Nullable [] get(byte[] key) {
+        try {
+            return db.get(key);
+        } catch (RocksDBException e) {
+            throw new IgniteInternalException("Unable to get data from Rocks 
DB", e);
+        }
+    }
+
+    @Override
+    public void put(byte[] key, byte[] value) {
+        try {
+            db.put(key, value);
+        } catch (RocksDBException e) {
+            throw new IgniteInternalException("Unable to put data into Rocks 
DB", e);
+        }
+    }
+
+    @Override
+    public void remove(byte[] key) {
+        try {
+            db.delete(key);
+        } catch (RocksDBException e) {
+            throw new IgniteInternalException("Unable to remove data from 
Rocks DB", e);
+        }
+    }
+
+    @Override
+    public <T> Cursor<T> getWithPrefix(byte[] prefix, BiFunction<byte[], 
byte[], T> entryTransformer) {
+        byte[] upperBound = prefix.clone();
+
+        upperBound[upperBound.length - 1] += 1;

Review Comment:
   Now I have 2 questions :)
   
   1. Why isn't `Byte.MAX_VALUE` a valid value for the last byte?
   2. Doesn't RocksDB interpret bytes as unsigned? In Java they are signed, so 
max(byte) for RocksDB is not the same as max(byte) for Java



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to