sashapolo commented on a change in pull request #723:
URL: https://github.com/apache/ignite-3/pull/723#discussion_r829838204



##########
File path: 
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
##########
@@ -405,7 +369,7 @@ private ColumnFamilyDescriptor cfDescriptorFromName(String cfName) {
      * Creates a descriptor of the "partition" Column Family.
      */
     private static ColumnFamilyDescriptor partitionCfDescriptor() {
-        return new ColumnFamilyDescriptor(PARTITION_CF_NAME.getBytes(StandardCharsets.UTF_8), new ColumnFamilyOptions());
+        return new ColumnFamilyDescriptor(PARTITION_CF_NAME.getBytes(UTF_8), new ColumnFamilyOptions());

Review comment:
       Indeed, I wonder why my IDEA didn't complain about that.

##########
File path: 
modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
##########
@@ -179,25 +179,30 @@ public void start() {
 
         ColumnFamilyOptions indexFamilyOptions = new ColumnFamilyOptions(indexOptions);
 
-        List<ColumnFamilyDescriptor> descriptors = Arrays.asList(
+        return List.of(
                 new ColumnFamilyDescriptor(DATA.nameAsBytes(), dataFamilyOptions),

Review comment:
       I had a thought about that. I think it is OK if the options are closed by the GC, for the following reasons:
   1. It is quite cumbersome to pass these options everywhere just so they can be closed.
   2. We create only a couple of these objects during the entire lifetime of an Ignite node, so it's fine if they stay in the heap for a while.
   3. In practice, we very rarely get rid of these objects.
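   To make point 1 concrete, here is a minimal sketch of the bookkeeping that explicit closing would require. It uses a hypothetical `FakeNativeOptions` class in place of RocksDB's `ColumnFamilyOptions` so that it runs without RocksDB; `OptionsRegistry` is likewise invented for illustration. Every place that creates options would need access to such an owner so that they get closed on shutdown.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for a native-backed options object (e.g. RocksDB's ColumnFamilyOptions).
final class FakeNativeOptions implements AutoCloseable {
    private boolean closed;

    boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        closed = true;
    }
}

// Hypothetical owner that must be handed every options object so it can close them on shutdown.
final class OptionsRegistry implements AutoCloseable {
    private final Deque<AutoCloseable> owned = new ArrayDeque<>();

    /** Registers a resource and returns it, so creation and registration fit in one expression. */
    <T extends AutoCloseable> T register(T resource) {
        owned.push(resource);
        return resource;
    }

    /** Closes everything in reverse registration order, collecting failures instead of stopping early. */
    @Override
    public void close() {
        RuntimeException failure = null;
        while (!owned.isEmpty()) {
            try {
                owned.pop().close();
            } catch (Exception e) {
                if (failure == null) {
                    failure = new IllegalStateException("Failed to close options", e);
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        if (failure != null) {
            throw failure;
        }
    }
}
```

   Whether skipping this bookkeeping is acceptable depends on how the RocksJava version in use handles native memory of unclosed objects; the sketch only shows the shape of the alternative being weighed.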

##########
File path: 
modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/ColumnFamily.java
##########
@@ -44,53 +43,53 @@
     /** Column family handle. */
     private final ColumnFamilyHandle cfHandle;
 
-    /** Column family options. */
-    private final ColumnFamilyOptions cfOptions;
-
-    /** Options for the column family options. */
-    private final Options options;
-
     /**
      * Constructor.
      *
-     * @param db        Db.
-     * @param handle    Column family handle.
-     * @param cfName    Column family name.
-     * @param cfOptions Column family options.
-     * @param options   Options for the column family options (for resource management purposes).
+     * @param db Db.
+     * @param handle Column family handle.
      */
-    public ColumnFamily(
-            RocksDB db,
-            ColumnFamilyHandle handle,
-            String cfName,
-            ColumnFamilyOptions cfOptions,
-            @Nullable Options options
-    ) {
+    private ColumnFamily(RocksDB db, ColumnFamilyHandle handle) throws RocksDBException {
         this.db = db;
-        this.cfName = cfName;
-        this.cfOptions = cfOptions;
-        this.options = options;
         this.cfHandle = handle;
+        this.cfName = new String(cfHandle.getName(), StandardCharsets.UTF_8);
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public void close() throws Exception {
-        // cfHandle is closed by the owning RocksDB instance
-        IgniteUtils.closeAll(cfOptions, options);
+    /**
+     * Creates a new Column Family in the provided RocksDB instance.
+     *
+     * @param db RocksDB instance.
+     * @param descriptor Column Family descriptor.
+     * @return new Column Family.
+     * @throws RocksDBException If an error has occurred during creation.
+     */
+    public static ColumnFamily createNew(RocksDB db, ColumnFamilyDescriptor descriptor) throws RocksDBException {
+        ColumnFamilyHandle cfHandle = db.createColumnFamily(descriptor);
+
+        return new ColumnFamily(db, cfHandle);
+    }
+
+    /**
+     * Creates a wrapper around an already created Column Family.
+     *
+     * @param db RocksDB instance.
+     * @param handle Column Family handle.
+     * @return Column Family wrapper.
+     * @throws RocksDBException If an error has occurred during creation.
+     */
+    public static ColumnFamily createExisting(RocksDB db, ColumnFamilyHandle handle) throws RocksDBException {

Review comment:
       cool, I like this approach
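   The approach in this diff is the static factory pattern: the constructor becomes private, callers choose between `createNew` (creates the Column Family) and `createExisting` (wraps a handle that already exists), and the name is derived from the handle instead of being passed in separately. A minimal stand-alone sketch of that shape follows; `FakeDb`, `FakeHandle` and `CfWrapper` are hypothetical stand-ins for `RocksDB`, `ColumnFamilyHandle` and `ColumnFamily`, so the example runs without RocksDB.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ColumnFamilyHandle: only exposes the raw name bytes.
final class FakeHandle {
    private final byte[] name;

    FakeHandle(byte[] name) {
        this.name = name.clone();
    }

    byte[] getName() {
        return name.clone();
    }
}

// Hypothetical stand-in for a RocksDB instance that can create column families.
final class FakeDb {
    private final Map<String, FakeHandle> families = new HashMap<>();

    FakeHandle createColumnFamily(String name) {
        if (families.containsKey(name)) {
            throw new IllegalStateException("Column family already exists: " + name);
        }
        FakeHandle handle = new FakeHandle(name.getBytes(StandardCharsets.UTF_8));
        families.put(name, handle);
        return handle;
    }
}

// Mirrors the refactored ColumnFamily: a private constructor plus two named factory methods.
final class CfWrapper {
    private final FakeDb db;
    private final FakeHandle handle;
    private final String name;

    private CfWrapper(FakeDb db, FakeHandle handle) {
        this.db = db;
        this.handle = handle;
        // The name comes from the handle, so it can never disagree with it.
        this.name = new String(handle.getName(), StandardCharsets.UTF_8);
    }

    /** Creates a brand new column family and wraps it. */
    static CfWrapper createNew(FakeDb db, String cfName) {
        return new CfWrapper(db, db.createColumnFamily(cfName));
    }

    /** Wraps a column family that already exists, e.g. one opened together with the database. */
    static CfWrapper createExisting(FakeDb db, FakeHandle handle) {
        return new CfWrapper(db, handle);
    }

    String name() {
        return name;
    }
}
```

   The factory names document intent at the call site, which a single public constructor taking a handle could not do.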

##########
File path: 
modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/snapshot/RocksSnapshotManager.java
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rocksdb.snapshot;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import org.apache.ignite.internal.rocksdb.RocksUtils;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.rocksdb.EnvOptions;
+import org.rocksdb.IngestExternalFileOptions;
+import org.rocksdb.Options;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+import org.rocksdb.Snapshot;
+import org.rocksdb.SstFileWriter;
+
+/**
+ * Class for creating and restoring RocksDB snapshots.
+ */
+public class RocksSnapshotManager {
+    /** Suffix for the temporary snapshot folder. */
+    private static final String TMP_SUFFIX = ".tmp";
+
+    private final RocksDB db;
+
+    private final Collection<ColumnFamilyRange> ranges;
+
+    private final Executor executor;
+
+    /**
+     * Creates a new instance of the snapshot manager.
+     * This instance <b>does not</b> own any of the provided resources and will not close them.
+     *
+     * @param db RocksDB instance which snapshots will be managed.
+     * @param ranges Key ranges of Column Families that exist in the provided {@code db} instance.
+     * @param executor Executor which will be used for creating snapshots.
+     */
+    public RocksSnapshotManager(RocksDB db, Collection<ColumnFamilyRange> ranges, Executor executor) {
+        assert !ranges.isEmpty();
+
+        this.db = db;
+        this.ranges = ranges;
+        this.executor = executor;
+    }
+
+    /**
+     * Creates a snapshot of the enclosed RocksDB instance and saves it into a provided folder.
+     *
+     * @param snapshotDir Folder to save the snapshot into.
+     * @return Future that either completes successfully upon snapshot creation or signals a failure.
+     */
+    public CompletableFuture<Void> createSnapshot(Path snapshotDir) {
+        Path tmpPath = Paths.get(snapshotDir.toString() + TMP_SUFFIX);
+
+        return CompletableFuture.supplyAsync(db::getSnapshot, executor)
+                .thenComposeAsync(snapshot -> {
+                    createTmpSnapshotDir(tmpPath);
+
+                    // Create futures for capturing SST snapshots of the column families
+                    CompletableFuture<?>[] sstFutures = ranges.stream()
+                            .map(cf -> createSstFileAsync(cf, snapshot, tmpPath))
+                            .toArray(CompletableFuture[]::new);
+
+                    return CompletableFuture.allOf(sstFutures).thenApply(v -> snapshot);
+                }, executor)
+                .whenCompleteAsync((snapshot, e) -> {
+                    // Release the snapshot
+                    db.releaseSnapshot(snapshot);
+
+                    // Snapshot is not actually closed here, because a Snapshot instance doesn't own a pointer, the
+                    // database does. Calling close to maintain the AutoCloseable semantics
+                    snapshot.close();

Review comment:
       nice catch!
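   One detail worth checking in the excerpt above: when a stage completes exceptionally, the action passed to `whenComplete`/`whenCompleteAsync` receives `null` as the result, so `db.releaseSnapshot(snapshot)` would see `null` if SST file creation fails, and the snapshot taken earlier would not be released through that path. A minimal sketch of one way to avoid this is to attach the release to a stage that still holds the acquired reference. `SnapshotSource` and `SafeSnapshotting` are hypothetical stand-ins for `RocksDB#getSnapshot`/`RocksDB#releaseSnapshot` and the manager, not the PR's code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical stand-in for RocksDB#getSnapshot / RocksDB#releaseSnapshot that counts open snapshots.
final class SnapshotSource {
    final AtomicInteger open = new AtomicInteger();

    Object acquire() {
        open.incrementAndGet();
        return new Object();
    }

    void release(Object snapshot) {
        open.decrementAndGet();
    }
}

final class SafeSnapshotting {
    /**
     * Acquires a snapshot, runs {@code work} against it and releases the snapshot exactly once,
     * whether {@code work} completes normally, completes exceptionally, or throws synchronously.
     */
    static CompletableFuture<Void> withSnapshot(
            SnapshotSource source,
            Executor executor,
            Function<Object, CompletableFuture<Void>> work
    ) {
        return CompletableFuture.supplyAsync(source::acquire, executor)
                .thenCompose(snapshot -> {
                    CompletableFuture<Void> result;
                    try {
                        result = work.apply(snapshot);
                    } catch (RuntimeException e) {
                        result = CompletableFuture.failedFuture(e);
                    }
                    // This lambda still holds the non-null snapshot, even if `result` fails.
                    return result.whenComplete((v, e) -> source.release(snapshot));
                });
    }
}
```

   Because `whenComplete` returns a stage that completes only after its action has run, the returned future is never observed as done before the snapshot has been released.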

##########
File path: 
modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
##########
@@ -218,78 +223,33 @@ private void destroyRocksDb() throws RocksDBException {
     public void close() throws Exception {
         IgniteUtils.shutdownAndAwaitTermination(snapshotExecutor, 10, TimeUnit.SECONDS);

Review comment:
       Let's imagine the scenario in which this method is actually called: we are stopping an Ignite node. In order to stop the node, we first need to stop the Meta Storage Raft group. In that case, it's the enclosing component's job to verify that all necessary futures have been resolved before allowing itself to be closed. What do you think?
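   A minimal sketch of the responsibility split described in this comment: the enclosing component keeps track of the futures it hands out and, on stop, waits for them before closing the storage it owns. `TrackingComponent` and its methods are hypothetical and not part of the Ignite code base. Note that this sketch does not prevent new operations from being started while `stop()` is waiting.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical enclosing component (think MetaStorageManager owning a key-value storage).
final class TrackingComponent {
    private final Set<CompletableFuture<?>> inFlight = ConcurrentHashMap.newKeySet();

    private final AutoCloseable storage;

    TrackingComponent(AutoCloseable storage) {
        this.storage = storage;
    }

    /** Starts an operation and remembers its future until it completes. */
    <T> CompletableFuture<T> track(Supplier<CompletableFuture<T>> operation) {
        CompletableFuture<T> future = operation.get();
        inFlight.add(future);
        // Forget the future once it is done, successfully or not.
        future.whenComplete((r, e) -> inFlight.remove(future));
        return future;
    }

    /** Waits (up to the timeout) for outstanding operations, then closes the owned storage. */
    void stop(long timeout, TimeUnit unit) throws Exception {
        CompletableFuture<?>[] pending = inFlight.toArray(new CompletableFuture<?>[0]);
        try {
            // Individual failures are swallowed here; only completion matters before closing.
            CompletableFuture.allOf(pending).handle((v, e) -> null).get(timeout, unit);
        } finally {
            storage.close();
        }
    }
}
```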

##########
File path: 
modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorageTest.java
##########
@@ -206,4 +208,74 @@ void testRestart(
         assertThat(storage.getPartition(1), is(nullValue()));
         assertThat(storage.getPartition(0).read(testData), is(equalTo(testData)));
     }
+
+    /**
+     * Tests that restoring a snapshot clears all previous data.
+     */
+    @Test
+    void testRestoreSnapshot() {
+        PartitionStorage partitionStorage = storage.getOrCreatePartition(0);
+
+        var testData1 = new SimpleDataRow("foo".getBytes(StandardCharsets.UTF_8), "bar".getBytes(StandardCharsets.UTF_8));
+        var testData2 = new SimpleDataRow("baz".getBytes(StandardCharsets.UTF_8), "quux".getBytes(StandardCharsets.UTF_8));
+
+        Path snapshotDir = workDir.resolve("snapshot");
+
+        partitionStorage.write(testData1);
+
+        assertThat(partitionStorage.snapshot(snapshotDir), willBe(nullValue(Void.class)));
+
+        partitionStorage.write(testData2);
+
+        partitionStorage.restoreSnapshot(snapshotDir);
+
+        assertThat(partitionStorage.read(testData1), is(testData1));
+        assertThat(partitionStorage.read(testData2), is(nullValue()));
+    }
+
+    /**
+     * Tests that loading snapshots for one partition does not influence data in another.
+     */
+    @Test
+    void testSnapshotIndependence() {

Review comment:
       > Item 3 does not seem to have anything to do with snapshotting, so my suggestion is to remove it (line 278).
   
   I don't agree: it checks that loading a snapshot for one partition does not overwrite existing keys in another partition.
   
   I'll split the test.
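   The independence property being tested can be illustrated with a small in-memory model. The sketch assumes, purely for illustration, that several partitions share one sorted key space (as they would in a single RocksDB Column Family) and that each key is prefixed with a 4-byte big-endian partition id. Restoring a snapshot then has to clear only that partition's key range, `[prefix(p), prefix(p + 1))`, before loading the saved entries; clearing anything wider would wipe keys of other partitions. `SharedKeySpace` is hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical model: all partitions share one sorted key space, keys are "partitionId ++ userKey".
final class SharedKeySpace {
    private final NavigableMap<byte[], byte[]> data = new TreeMap<byte[], byte[]>(Arrays::compareUnsigned);

    static byte[] prefix(int partitionId) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(partitionId).array();
    }

    static byte[] key(int partitionId, byte[] userKey) {
        return ByteBuffer.allocate(Integer.BYTES + userKey.length).putInt(partitionId).put(userKey).array();
    }

    void put(int partitionId, byte[] userKey, byte[] value) {
        data.put(key(partitionId, userKey), value);
    }

    byte[] get(int partitionId, byte[] userKey) {
        return data.get(key(partitionId, userKey));
    }

    /** Copies only the entries of the given partition (the copy keeps the unsigned ordering). */
    NavigableMap<byte[], byte[]> snapshot(int partitionId) {
        return new TreeMap<byte[], byte[]>(partitionRange(partitionId));
    }

    /** Clears the partition's own range, then loads the snapshot; other partitions are untouched. */
    void restore(int partitionId, NavigableMap<byte[], byte[]> snapshot) {
        partitionRange(partitionId).clear();
        data.putAll(snapshot);
    }

    private NavigableMap<byte[], byte[]> partitionRange(int partitionId) {
        // Assumes 0 <= partitionId < Integer.MAX_VALUE, so p + 1 does not overflow and
        // unsigned byte order of the prefixes matches numeric order.
        return data.subMap(prefix(partitionId), true, prefix(partitionId + 1), false);
    }
}
```

   In this model, item 3 corresponds to checking that partition 1 still has its own keys after partition 0 is restored.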

##########
File path: 
modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
##########
@@ -218,78 +223,33 @@ private void destroyRocksDb() throws RocksDBException {
     public void close() throws Exception {
         IgniteUtils.shutdownAndAwaitTermination(snapshotExecutor, 10, TimeUnit.SECONDS);

Review comment:
       > So, in our case, this would mean that MetaStorageManager (the class that now stops RocksDbKeyValueStorage) would need to make sure that all the futures are resolved? 
   
   Yes.
   
   > Currently, its stop() method does not seem to bother about it explicitly, but maybe I'm missing something here.
   
   That's because it's not entirely correct =)
   
   I think I will create a ticket to implement the component's stop() properly.
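   For that follow-up, one common way to make a component's stop safe is a "busy lock": regular operations hold a shared lock while running, and `stop()` takes the exclusive lock, so it waits for in-progress operations, and operations that start after `stop()` has acquired the exclusive lock are rejected. A minimal sketch using `ReentrantReadWriteLock`; `StopGuard` is a hypothetical name, and the sketch only guards synchronous work (an asynchronous operation would have to keep the shared lock until its future completes).

```java
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

// Hypothetical stop guard: operations share the read lock, stop() takes the write lock.
final class StopGuard {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    private volatile boolean stopped;

    /** Runs the operation unless the component is stopping or already stopped. */
    <T> T runIfNotStopped(Supplier<T> operation) {
        // tryLock fails while stop() holds the write lock, so callers fail fast instead of blocking.
        if (!lock.readLock().tryLock()) {
            throw new IllegalStateException("Component is stopping");
        }
        try {
            if (stopped) {
                throw new IllegalStateException("Component is stopped");
            }
            return operation.get();
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Waits for running operations, marks the component stopped and releases its resources once. */
    void stop(Runnable closeResources) {
        lock.writeLock().lock();
        try {
            if (stopped) {
                return;
            }
            stopped = true;
            closeResources.run();
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```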




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
