rpuch commented on a change in pull request #723:
URL: https://github.com/apache/ignite-3/pull/723#discussion_r829731056



##########
File path: 
modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/snapshot/RocksSnapshotManager.java
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rocksdb.snapshot;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import org.apache.ignite.internal.rocksdb.RocksUtils;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.rocksdb.EnvOptions;
+import org.rocksdb.IngestExternalFileOptions;
+import org.rocksdb.Options;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+import org.rocksdb.Snapshot;
+import org.rocksdb.SstFileWriter;
+
+/**
+ * Class for creating and restoring RocksDB snapshots.
+ */
+public class RocksSnapshotManager {
+    /** Suffix for the temporary snapshot folder. */
+    private static final String TMP_SUFFIX = ".tmp";
+
+    private final RocksDB db;
+
+    private final Collection<ColumnFamilyRange> ranges;
+
+    private final Executor executor;
+
+    /**
+     * Creates a new instance of the snapshot manager.
+     * This instance <b>does not</b> own any of the provided resources and will not close them.
+     *
+     * @param db RocksDB instance which snapshots will be managed.
+     * @param ranges Key ranges of Column Families that exist in the provided {@code db} instance.
+     * @param executor Executor which will be used for creating snapshots.
+     */
+    public RocksSnapshotManager(RocksDB db, Collection<ColumnFamilyRange> ranges, Executor executor) {
+        assert !ranges.isEmpty();
+
+        this.db = db;
+        this.ranges = ranges;

Review comment:
       Let's make a defensive copy of `ranges` here, so that later changes to the 
caller's collection cannot affect the manager: better safe than sorry.
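
A minimal sketch of what such a defensive copy could look like. `DefensiveCopyExample` is a hypothetical stand-in for `RocksSnapshotManager`, and plain strings stand in for `ColumnFamilyRange`; only the handling of `ranges` is illustrated.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical stand-in for RocksSnapshotManager; only the ranges handling is shown. */
class DefensiveCopyExample {
    private final Collection<String> ranges;

    DefensiveCopyExample(Collection<String> ranges) {
        assert !ranges.isEmpty();

        // List.copyOf returns an unmodifiable copy, so later changes to the
        // caller's collection are not visible through this field.
        this.ranges = List.copyOf(ranges);
    }

    int rangeCount() {
        return ranges.size();
    }

    /** Returns how many ranges the manager still sees after the caller clears its own list. */
    static int countAfterCallerMutation() {
        List<String> callerRanges = new ArrayList<>(List.of("cf1", "cf2"));
        DefensiveCopyExample manager = new DefensiveCopyExample(callerRanges);

        callerRanges.clear();

        return manager.rangeCount();
    }
}
```

Without the copy, clearing the caller's list would also empty the manager's view of the ranges.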

##########
File path: 
modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
##########
@@ -218,78 +223,33 @@ private void destroyRocksDb() throws RocksDBException {
     public void close() throws Exception {
         IgniteUtils.shutdownAndAwaitTermination(snapshotExecutor, 10, TimeUnit.SECONDS);

Review comment:
       This is not something you touch in this PR, just a side note. The executor 
uses an unbounded work queue and a bounded number of threads, and snapshotting 
operations can probably take quite long. So, if there is an ongoing snapshot, 
the executor will be stopped forcibly in the middle of it. Two 
(possible?) problems:
   
   1. A snapshot might not be finished?
   2. Some tasks submitted to the executor could still be waiting in its work 
queue with no chance of being executed, so they will be dropped when the 
executor stops. This will make the corresponding futures hang forever.
   
   Should we file tickets about these problems?
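
Problem 2 can be reproduced in isolation. The sketch below uses only the JDK; `DroppedTaskExample` and its method are hypothetical names, and a single-threaded executor stands in for the snapshot executor.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class DroppedTaskExample {
    /** Returns true if a queued task's future is still incomplete after a forced shutdown. */
    static boolean queuedFutureHangsAfterShutdown() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch never = new CountDownLatch(1);

        // Occupies the only worker thread, like a long-running snapshot.
        CompletableFuture.runAsync(() -> {
            started.countDown();
            try {
                never.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, executor);

        try {
            started.await();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }

        // This task can only sit in the work queue: the single worker is busy.
        CompletableFuture<String> queued = CompletableFuture.supplyAsync(() -> "done", executor);

        // Interrupts the running task and hands back the tasks that never started.
        List<Runnable> dropped = executor.shutdownNow();

        // Nobody will ever complete `queued`, so waiting on it would block forever.
        return dropped.size() == 1 && !queued.isDone();
    }
}
```

The dropped `Runnable` is returned by `shutdownNow()`, but nothing ties it back to the future, which is why the future stays incomplete unless someone completes it explicitly.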

##########
File path: 
modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/ColumnFamily.java
##########
@@ -44,53 +43,53 @@
     /** Column family handle. */
     private final ColumnFamilyHandle cfHandle;
 
-    /** Column family options. */
-    private final ColumnFamilyOptions cfOptions;
-
-    /** Options for the column family options. */
-    private final Options options;
-
     /**
      * Constructor.
      *
-     * @param db        Db.
-     * @param handle    Column family handle.
-     * @param cfName    Column family name.
-     * @param cfOptions Column family options.
-     * @param options   Options for the column family options (for resource management purposes).
+     * @param db Db.
+     * @param handle Column family handle.
      */
-    public ColumnFamily(
-            RocksDB db,
-            ColumnFamilyHandle handle,
-            String cfName,
-            ColumnFamilyOptions cfOptions,
-            @Nullable Options options
-    ) {
+    private ColumnFamily(RocksDB db, ColumnFamilyHandle handle) throws RocksDBException {
         this.db = db;
-        this.cfName = cfName;
-        this.cfOptions = cfOptions;
-        this.options = options;
         this.cfHandle = handle;
+        this.cfName = new String(cfHandle.getName(), StandardCharsets.UTF_8);
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public void close() throws Exception {
-        // cfHandle is closed by the owning RocksDB instance
-        IgniteUtils.closeAll(cfOptions, options);
+    /**
+     * Creates a new Column Family in the provided RocksDB instance.
+     *
+     * @param db RocksDB instance.
+     * @param descriptor Column Family descriptor.
+     * @return new Column Family.
+     * @throws RocksDBException If an error has occurred during creation.
+     */
+    public static ColumnFamily createNew(RocksDB db, ColumnFamilyDescriptor descriptor) throws RocksDBException {
+        ColumnFamilyHandle cfHandle = db.createColumnFamily(descriptor);
+
+        return new ColumnFamily(db, cfHandle);
+    }
+
+    /**
+     * Creates a wrapper around an already created Column Family.
+     *
+     * @param db RocksDB instance.
+     * @param handle Column Family handle.
+     * @return Column Family wrapper.
+     * @throws RocksDBException If an error has occurred during creation.
+     */
+    public static ColumnFamily createExisting(RocksDB db, ColumnFamilyHandle handle) throws RocksDBException {

Review comment:
       How about renaming this to `wrap()`, as it actually just wraps a handle? 
Such a name seems more accurate than `createExisting()`.
   
   Then `createNew()` can be renamed to just `create()`, as it actually creates 
a column family.

##########
File path: 
modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/snapshot/RocksSnapshotManager.java
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rocksdb.snapshot;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import org.apache.ignite.internal.rocksdb.RocksUtils;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.rocksdb.EnvOptions;
+import org.rocksdb.IngestExternalFileOptions;
+import org.rocksdb.Options;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+import org.rocksdb.Snapshot;
+import org.rocksdb.SstFileWriter;
+
+/**
+ * Class for creating and restoring RocksDB snapshots.
+ */
+public class RocksSnapshotManager {
+    /** Suffix for the temporary snapshot folder. */
+    private static final String TMP_SUFFIX = ".tmp";
+
+    private final RocksDB db;
+
+    private final Collection<ColumnFamilyRange> ranges;
+
+    private final Executor executor;
+
+    /**
+     * Creates a new instance of the snapshot manager.
+     * This instance <b>does not</b> own any of the provided resources and will not close them.
+     *
+     * @param db RocksDB instance which snapshots will be managed.
+     * @param ranges Key ranges of Column Families that exist in the provided {@code db} instance.
+     * @param executor Executor which will be used for creating snapshots.
+     */
+    public RocksSnapshotManager(RocksDB db, Collection<ColumnFamilyRange> ranges, Executor executor) {
+        assert !ranges.isEmpty();
+
+        this.db = db;
+        this.ranges = ranges;
+        this.executor = executor;
+    }
+
+    /**
+     * Creates a snapshot of the enclosed RocksDB instance and saves it into a provided folder.
+     *
+     * @param snapshotDir Folder to save the snapshot into.
+     * @return Future that either completes successfully upon snapshot creation or signals a failure.
+     */
+    public CompletableFuture<Void> createSnapshot(Path snapshotDir) {
+        Path tmpPath = Paths.get(snapshotDir.toString() + TMP_SUFFIX);
+
+        return CompletableFuture.supplyAsync(db::getSnapshot, executor)
+                .thenComposeAsync(snapshot -> {
+                    createTmpSnapshotDir(tmpPath);
+
+                    // Create futures for capturing SST snapshots of the column families
+                    CompletableFuture<?>[] sstFutures = ranges.stream()
+                            .map(cf -> createSstFileAsync(cf, snapshot, tmpPath))
+                            .toArray(CompletableFuture[]::new);
+
+                    return CompletableFuture.allOf(sstFutures).thenApply(v -> snapshot);
+                }, executor)
+                .whenCompleteAsync((snapshot, e) -> {
+                    // Release the snapshot

Review comment:
       Do we need this comment at all? The following line speaks for itself.

##########
File path: 
modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorageTest.java
##########
@@ -206,4 +208,74 @@ void testRestart(
         assertThat(storage.getPartition(1), is(nullValue()));
         assertThat(storage.getPartition(0).read(testData), is(equalTo(testData)));
     }
+
+    /**
+     * Tests that restoring a snapshot clears all previous data.
+     */
+    @Test
+    void testRestoreSnapshot() {
+        PartitionStorage partitionStorage = storage.getOrCreatePartition(0);
+
+        var testData1 = new SimpleDataRow("foo".getBytes(StandardCharsets.UTF_8), "bar".getBytes(StandardCharsets.UTF_8));
+        var testData2 = new SimpleDataRow("baz".getBytes(StandardCharsets.UTF_8), "quux".getBytes(StandardCharsets.UTF_8));
+
+        Path snapshotDir = workDir.resolve("snapshot");
+
+        partitionStorage.write(testData1);
+
+        assertThat(partitionStorage.snapshot(snapshotDir), willBe(nullValue(Void.class)));
+
+        partitionStorage.write(testData2);
+
+        partitionStorage.restoreSnapshot(snapshotDir);
+
+        assertThat(partitionStorage.read(testData1), is(testData1));
+        assertThat(partitionStorage.read(testData2), is(nullValue()));
+    }
+
+    /**
+     * Tests that loading snapshots for one partition does not influence data in another.
+     */
+    @Test
+    void testSnapshotIndependence() {

Review comment:
       It looks like this test actually has three test scenarios:
   
   1. When we restore one partition from a snapshot, other partitions are not 
affected.
   2. When we restore a partition from a snapshot, the data is reverted to the 
values that were in the snapshot (we don't see a key that was added after 
snapshot creation).
   3. When we read by a row, only its key is actually used for addressing the 
value, and the `value` part is ignored.
   
   Item 3 does not seem to have anything to do with snapshotting, so my 
suggestion is to remove it (line 278).
   
   As for items 1 and 2, I suggest splitting the test scenario into two and 
testing them independently.
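
A rough sketch of the proposed split, with one check per scenario. `FakePartition` is a hypothetical in-memory stand-in, not the real `PartitionStorage`, and plain methods stand in for the JUnit test methods; only the shape of the two independent scenarios is meant to carry over.

```java
import java.util.HashMap;
import java.util.Map;

class SplitSnapshotTestsSketch {
    /** Hypothetical in-memory stand-in for a partition storage. */
    static class FakePartition {
        private Map<String, String> data = new HashMap<>();

        void write(String key, String value) {
            data.put(key, value);
        }

        String read(String key) {
            return data.get(key);
        }

        Map<String, String> snapshot() {
            return new HashMap<>(data);
        }

        void restoreSnapshot(Map<String, String> snapshot) {
            data = new HashMap<>(snapshot);
        }
    }

    /** Scenario 1: restoring one partition leaves another partition untouched. */
    static boolean restoreDoesNotAffectOtherPartitions() {
        FakePartition partition0 = new FakePartition();
        FakePartition partition1 = new FakePartition();

        partition0.write("foo", "bar");
        Map<String, String> snapshot = partition0.snapshot();

        partition1.write("baz", "quux");
        partition0.restoreSnapshot(snapshot);

        return "quux".equals(partition1.read("baz"));
    }

    /** Scenario 2: restoring reverts the partition to the snapshotted values. */
    static boolean restoreRevertsData() {
        FakePartition partition = new FakePartition();

        partition.write("foo", "bar");
        Map<String, String> snapshot = partition.snapshot();

        // Written after the snapshot, so it must disappear on restore.
        partition.write("baz", "quux");
        partition.restoreSnapshot(snapshot);

        return "bar".equals(partition.read("foo")) && partition.read("baz") == null;
    }
}
```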
   
   

##########
File path: 
modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/snapshot/RocksSnapshotManager.java
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rocksdb.snapshot;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import org.apache.ignite.internal.rocksdb.RocksUtils;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.rocksdb.EnvOptions;
+import org.rocksdb.IngestExternalFileOptions;
+import org.rocksdb.Options;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+import org.rocksdb.Snapshot;
+import org.rocksdb.SstFileWriter;
+
+/**
+ * Class for creating and restoring RocksDB snapshots.
+ */
+public class RocksSnapshotManager {
+    /** Suffix for the temporary snapshot folder. */
+    private static final String TMP_SUFFIX = ".tmp";
+
+    private final RocksDB db;
+
+    private final Collection<ColumnFamilyRange> ranges;
+
+    private final Executor executor;
+
+    /**
+     * Creates a new instance of the snapshot manager.
+     * This instance <b>does not</b> own any of the provided resources and will not close them.
+     *
+     * @param db RocksDB instance which snapshots will be managed.
+     * @param ranges Key ranges of Column Families that exist in the provided {@code db} instance.
+     * @param executor Executor which will be used for creating snapshots.
+     */
+    public RocksSnapshotManager(RocksDB db, Collection<ColumnFamilyRange> ranges, Executor executor) {
+        assert !ranges.isEmpty();
+
+        this.db = db;
+        this.ranges = ranges;
+        this.executor = executor;
+    }
+
+    /**
+     * Creates a snapshot of the enclosed RocksDB instance and saves it into a provided folder.
+     *
+     * @param snapshotDir Folder to save the snapshot into.
+     * @return Future that either completes successfully upon snapshot creation or signals a failure.
+     */
+    public CompletableFuture<Void> createSnapshot(Path snapshotDir) {
+        Path tmpPath = Paths.get(snapshotDir.toString() + TMP_SUFFIX);
+
+        return CompletableFuture.supplyAsync(db::getSnapshot, executor)
+                .thenComposeAsync(snapshot -> {
+                    createTmpSnapshotDir(tmpPath);
+
+                    // Create futures for capturing SST snapshots of the column families
+                    CompletableFuture<?>[] sstFutures = ranges.stream()
+                            .map(cf -> createSstFileAsync(cf, snapshot, tmpPath))
+                            .toArray(CompletableFuture[]::new);
+
+                    return CompletableFuture.allOf(sstFutures).thenApply(v -> snapshot);
+                }, executor)
+                .whenCompleteAsync((snapshot, e) -> {
+                    // Release the snapshot
+                    db.releaseSnapshot(snapshot);
+
+                    // Snapshot is not actually closed here, because a Snapshot instance doesn't own a pointer, the
+                    // database does. Calling close to maintain the AutoCloseable semantics
+                    snapshot.close();

Review comment:
       It seems that if `e` is not null, then `snapshot` is null, so this might 
throw an NPE.
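
A small JDK-only illustration of the concern, plus one possible guard. `FakeSnapshot` is a hypothetical stand-in for the RocksDB snapshot object; the point is only that the callback of `whenComplete` receives a `null` result when the previous stage failed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;

class NullSafeReleaseExample {
    /** Hypothetical stand-in for a RocksDB snapshot; not the real org.rocksdb.Snapshot. */
    static class FakeSnapshot implements AutoCloseable {
        final AtomicBoolean closed = new AtomicBoolean();

        @Override
        public void close() {
            closed.set(true);
        }
    }

    /** Cleanup action that tolerates the null result passed in by a failed stage. */
    static final BiConsumer<FakeSnapshot, Throwable> RELEASE = (snapshot, e) -> {
        if (snapshot != null) {
            snapshot.close();
        }
    };

    /** On success the callback gets the snapshot and closes it. */
    static boolean closesSnapshotOnSuccess() {
        FakeSnapshot snapshot = new FakeSnapshot();

        CompletableFuture.completedFuture(snapshot).whenComplete(RELEASE);

        return snapshot.closed.get();
    }

    /** On failure the callback gets a null snapshot together with a non-null error. */
    static boolean seesNullSnapshotOnFailure() {
        AtomicBoolean sawNull = new AtomicBoolean();

        CompletableFuture<FakeSnapshot> failed =
                CompletableFuture.failedFuture(new IllegalStateException("snapshot creation failed"));

        CompletableFuture<FakeSnapshot> result = failed.whenComplete((snapshot, e) -> {
            sawNull.set(snapshot == null && e != null);

            RELEASE.accept(snapshot, e);
        });

        return sawNull.get() && result.isCompletedExceptionally();
    }
}
```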

##########
File path: 
modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
##########
@@ -218,78 +223,33 @@ private void destroyRocksDb() throws RocksDBException {
     public void close() throws Exception {
         IgniteUtils.shutdownAndAwaitTermination(snapshotExecutor, 10, TimeUnit.SECONDS);

Review comment:
       So, in our case, this would mean that `MetaStorageManager` (the class 
that now stops `RocksDbKeyValueStorage`) would need to make sure that all the 
futures are resolved? Currently, its `stop()` method does not seem to take care 
of this explicitly, but maybe I'm missing something here.
   
   If we push this responsibility to the enclosing component, then it must be 
part of the contract, so the contract grows a bit, and it becomes a little 
less explicit (as the job of dealing with such hanging futures is not 
encapsulated close to the executor). I'm not sure whether this is bad or not, 
to be honest.
   
   Also, this still needs to be done on the enclosing component's side, and it 
does not seem to be done there yet (again, maybe I just don't see it). Not 
in this PR, of course.
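
For comparison, here is a sketch of the alternative where the handling of hanging futures is encapsulated next to the executor instead of being part of the enclosing component's contract. `TrackedSnapshotExecutor` and its methods are hypothetical and not part of Ignite; this is one possible shape under those assumptions, not a proposal for this PR.

```java
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

/** Hypothetical wrapper that remembers unfinished futures and fails them on stop. */
class TrackedSnapshotExecutor {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    private final Set<CompletableFuture<?>> pending = ConcurrentHashMap.newKeySet();

    <T> CompletableFuture<T> submit(Supplier<T> task) {
        CompletableFuture<T> future = CompletableFuture.supplyAsync(task, executor);

        // Register first, then attach the removal hook, so a fast task cannot leak an entry.
        pending.add(future);
        future.whenComplete((res, e) -> pending.remove(future));

        return future;
    }

    void stop() {
        executor.shutdownNow();

        // Anything still pending will never be completed by the executor, so fail it
        // explicitly. A task that was already running may still be winding down here.
        for (CompletableFuture<?> future : pending) {
            future.completeExceptionally(new CancellationException("Executor stopped"));
        }
    }

    /** Returns true if a task that was only queued gets a failed future after stop(). */
    static boolean queuedFutureFailsOnStop() {
        TrackedSnapshotExecutor tracked = new TrackedSnapshotExecutor();
        CountDownLatch started = new CountDownLatch(1);

        tracked.submit(() -> {
            started.countDown();
            try {
                new CountDownLatch(1).await(); // Blocks until interrupted by stop().
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "first";
        });

        try {
            started.await();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }

        CompletableFuture<String> queued = tracked.submit(() -> "second");

        tracked.stop();

        return queued.isCompletedExceptionally();
    }
}
```

With this shape the caller of `stop()` does not need to know about the futures at all, at the cost of an extra wrapper around the executor.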



