rpuch commented on a change in pull request #723:
URL: https://github.com/apache/ignite-3/pull/723#discussion_r829731056

##########
File path: modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/snapshot/RocksSnapshotManager.java
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.rocksdb.snapshot;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import org.apache.ignite.internal.rocksdb.RocksUtils;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.rocksdb.EnvOptions;
+import org.rocksdb.IngestExternalFileOptions;
+import org.rocksdb.Options;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+import org.rocksdb.Snapshot;
+import org.rocksdb.SstFileWriter;
+
+/**
+ * Class for creating and restoring RocksDB snapshots.
+ */
+public class RocksSnapshotManager {
+    /** Suffix for the temporary snapshot folder. */
+    private static final String TMP_SUFFIX = ".tmp";
+
+    private final RocksDB db;
+
+    private final Collection<ColumnFamilyRange> ranges;
+
+    private final Executor executor;
+
+    /**
+     * Creates a new instance of the snapshot manager.
+     * This instance <b>does not</b> own any of the provided resources and will not close them.
+     *
+     * @param db RocksDB instance which snapshots will be managed.
+     * @param ranges Key ranges of Column Families that exist in the provided {@code db} instance.
+     * @param executor Executor which will be used for creating snapshots.
+     */
+    public RocksSnapshotManager(RocksDB db, Collection<ColumnFamilyRange> ranges, Executor executor) {
+        assert !ranges.isEmpty();
+
+        this.db = db;
+        this.ranges = ranges;

Review comment:
Let's make a defensive copy of `ranges`: better safe than sorry.

##########
File path: modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
##########
@@ -218,78 +223,33 @@ private void destroyRocksDb() throws RocksDBException {
     public void close() throws Exception {
         IgniteUtils.shutdownAndAwaitTermination(snapshotExecutor, 10, TimeUnit.SECONDS);

Review comment:
This is not something you touch in this PR, just a side note.

The executor uses an unbounded work queue and a bounded number of threads, and snapshotting operations can probably take quite long. So, if a snapshot is still running when `close()` is called and does not finish within the 10-second timeout, the executor will be stopped forcibly in the middle of that snapshot. Two (possible?) problems:

1. A snapshot might not be finished.
2. Some tasks submitted to the executor could still be waiting in its work queue and have no chance to be executed, so they will be thrown on the floor when the executor is stopped. This will make the corresponding futures hang forever.

Should we file tickets about these problems?

##########
File path: modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/ColumnFamily.java
##########
@@ -44,53 +43,53 @@
     /** Column family handle. */
     private final ColumnFamilyHandle cfHandle;
 
-    /** Column family options. */
-    private final ColumnFamilyOptions cfOptions;
-
-    /** Options for the column family options. */
-    private final Options options;
-
     /**
      * Constructor.
      *
-     * @param db Db.
-     * @param handle Column family handle.
-     * @param cfName Column family name.
-     * @param cfOptions Column family options.
-     * @param options Options for the column family options (for resource management purposes).
+     * @param db Db.
+     * @param handle Column family handle.
      */
-    public ColumnFamily(
-            RocksDB db,
-            ColumnFamilyHandle handle,
-            String cfName,
-            ColumnFamilyOptions cfOptions,
-            @Nullable Options options
-    ) {
+    private ColumnFamily(RocksDB db, ColumnFamilyHandle handle) throws RocksDBException {
         this.db = db;
-        this.cfName = cfName;
-        this.cfOptions = cfOptions;
-        this.options = options;
         this.cfHandle = handle;
+        this.cfName = new String(cfHandle.getName(), StandardCharsets.UTF_8);
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public void close() throws Exception {
-        // cfHandle is closed by the owning RocksDB instance
-        IgniteUtils.closeAll(cfOptions, options);
+    /**
+     * Creates a new Column Family in the provided RocksDB instance.
+     *
+     * @param db RocksDB instance.
+     * @param descriptor Column Family descriptor.
+     * @return new Column Family.
+     * @throws RocksDBException If an error has occurred during creation.
+     */
+    public static ColumnFamily createNew(RocksDB db, ColumnFamilyDescriptor descriptor) throws RocksDBException {
+        ColumnFamilyHandle cfHandle = db.createColumnFamily(descriptor);
+
+        return new ColumnFamily(db, cfHandle);
+    }
+
+    /**
+     * Creates a wrapper around an already created Column Family.
+     *
+     * @param db RocksDB instance.
+     * @param handle Column Family handle.
+     * @return Column Family wrapper.
+     * @throws RocksDBException If an error has occurred during creation.
+     */
+    public static ColumnFamily createExisting(RocksDB db, ColumnFamilyHandle handle) throws RocksDBException {

Review comment:
How about renaming this to `wrap()`, as it actually just wraps a handle? Such a name seems more accurate than `createExisting()`. Then `createNew()` can be renamed to just `create()`, as it actually creates a column family.

##########
File path: modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/snapshot/RocksSnapshotManager.java
##########
@@ -0,0 +1,212 @@
+    /**
+     * Creates a snapshot of the enclosed RocksDB instance and saves it into a provided folder.
+     *
+     * @param snapshotDir Folder to save the snapshot into.
+     * @return Future that either completes successfully upon snapshot creation or signals a failure.
+     */
+    public CompletableFuture<Void> createSnapshot(Path snapshotDir) {
+        Path tmpPath = Paths.get(snapshotDir.toString() + TMP_SUFFIX);
+
+        return CompletableFuture.supplyAsync(db::getSnapshot, executor)
+                .thenComposeAsync(snapshot -> {
+                    createTmpSnapshotDir(tmpPath);
+
+                    // Create futures for capturing SST snapshots of the column families
+                    CompletableFuture<?>[] sstFutures = ranges.stream()
+                            .map(cf -> createSstFileAsync(cf, snapshot, tmpPath))
+                            .toArray(CompletableFuture[]::new);
+
+                    return CompletableFuture.allOf(sstFutures).thenApply(v -> snapshot);
+                }, executor)
+                .whenCompleteAsync((snapshot, e) -> {
+                    // Release the snapshot

Review comment:
Do we need this comment at all? The following line speaks for itself.

##########
File path: modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorageTest.java
##########
@@ -206,4 +208,74 @@ void testRestart(
         assertThat(storage.getPartition(1), is(nullValue()));
         assertThat(storage.getPartition(0).read(testData), is(equalTo(testData)));
     }
+
+    /**
+     * Tests that restoring a snapshot clears all previous data.
+ */ + @Test + void testRestoreSnapshot() { + PartitionStorage partitionStorage = storage.getOrCreatePartition(0); + + var testData1 = new SimpleDataRow("foo".getBytes(StandardCharsets.UTF_8), "bar".getBytes(StandardCharsets.UTF_8)); + var testData2 = new SimpleDataRow("baz".getBytes(StandardCharsets.UTF_8), "quux".getBytes(StandardCharsets.UTF_8)); + + Path snapshotDir = workDir.resolve("snapshot"); + + partitionStorage.write(testData1); + + assertThat(partitionStorage.snapshot(snapshotDir), willBe(nullValue(Void.class))); + + partitionStorage.write(testData2); + + partitionStorage.restoreSnapshot(snapshotDir); + + assertThat(partitionStorage.read(testData1), is(testData1)); + assertThat(partitionStorage.read(testData2), is(nullValue())); + } + + /** + * Tests that loading snapshots for one partition does not influence data in another. + */ + @Test + void testSnapshotIndependence() { Review comment: It looks like this test actually has three test scenarios: 1. When we restore one partition from a snapshot, other partitions are not affected 2. When we restore a partition from a snapshot, the data is reverted to the values that were in the snapshot (we don't see a key that was added after snapshot creation) 3. When we read by a row, actually its key gets used for addressing the value, and `value` part is ignored. Item 3 does not seem to have anything with snapshotting, so my suggestion is to remove it (line 278). As for items 1 and 2, I suggest to split the test scenario to two and test them independently. ########## File path: modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/snapshot/RocksSnapshotManager.java ########## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.rocksdb.snapshot; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import org.apache.ignite.internal.rocksdb.RocksUtils; +import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.lang.IgniteInternalException; +import org.rocksdb.EnvOptions; +import org.rocksdb.IngestExternalFileOptions; +import org.rocksdb.Options; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.Slice; +import org.rocksdb.Snapshot; +import org.rocksdb.SstFileWriter; + +/** + * Class for creating and restoring RocksDB snapshots. + */ +public class RocksSnapshotManager { + /** Suffix for the temporary snapshot folder. */ + private static final String TMP_SUFFIX = ".tmp"; + + private final RocksDB db; + + private final Collection<ColumnFamilyRange> ranges; + + private final Executor executor; + + /** + * Creates a new instance of the snapshot manager. + * This instance <b>does not</b> own any of the provided resources and will not close them. + * + * @param db RocksDB instance which snapshots will be managed. 
+ * @param ranges Key ranges of Column Families that exist in the provided {@code db} instance. + * @param executor Executor which will be used for creating snapshots. + */ + public RocksSnapshotManager(RocksDB db, Collection<ColumnFamilyRange> ranges, Executor executor) { + assert !ranges.isEmpty(); + + this.db = db; + this.ranges = ranges; + this.executor = executor; + } + + /** + * Creates a snapshot of the enclosed RocksDB instance and saves it into a provided folder. + * + * @param snapshotDir Folder to save the snapshot into. + * @return Future that either completes successfully upon snapshot creation or signals a failure. + */ + public CompletableFuture<Void> createSnapshot(Path snapshotDir) { + Path tmpPath = Paths.get(snapshotDir.toString() + TMP_SUFFIX); + + return CompletableFuture.supplyAsync(db::getSnapshot, executor) + .thenComposeAsync(snapshot -> { + createTmpSnapshotDir(tmpPath); + + // Create futures for capturing SST snapshots of the column families + CompletableFuture<?>[] sstFutures = ranges.stream() + .map(cf -> createSstFileAsync(cf, snapshot, tmpPath)) + .toArray(CompletableFuture[]::new); + + return CompletableFuture.allOf(sstFutures).thenApply(v -> snapshot); + }, executor) + .whenCompleteAsync((snapshot, e) -> { + // Release the snapshot + db.releaseSnapshot(snapshot); + + // Snapshot is not actually closed here, because a Snapshot instance doesn't own a pointer, the + // database does. Calling close to maintain the AutoCloseable semantics + snapshot.close(); Review comment: It seems that if `e` is not null, then `snapshot` is null, so this might throw an NPE. 
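
A minimal sketch of one possible fix, assuming the release is moved into the stage that still holds the snapshot reference, so it never sees `null` and also runs when a step fails. (In the quoted code it appears that a failed stage additionally leaves the snapshot obtained by `db::getSnapshot` unreleased, since its reference is lost.) `Snap`, `FakeDb` and the `work` function are hypothetical stand-ins for `org.rocksdb.Snapshot`, `RocksDB` and the SST-writing step, used only so the control flow can run without the native library; this is not the code from the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Sketch only: FakeDb and Snap are hypothetical stand-ins for RocksDB and org.rocksdb.Snapshot. */
class SnapshotReleaseSketch {
    /** Hypothetical snapshot handle. */
    static final class Snap {
        private boolean released;

        void markReleased() {
            released = true;
        }
    }

    /** Hypothetical database that counts snapshots which have not been released yet. */
    static final class FakeDb {
        final AtomicInteger liveSnapshots = new AtomicInteger();

        Snap getSnapshot() {
            liveSnapshots.incrementAndGet();
            return new Snap();
        }

        void releaseSnapshot(Snap snapshot) {
            // Dereferences the handle, so a null argument throws NullPointerException,
            // just as snapshot.close() would on a null reference.
            snapshot.markReleased();
            liveSnapshots.decrementAndGet();
        }
    }

    /**
     * Acquires a snapshot, runs the work step and releases the snapshot on success,
     * on asynchronous failure and on a synchronous throw from the work step.
     */
    static CompletableFuture<Void> createSnapshot(
            FakeDb db, Executor executor, Function<Snap, CompletableFuture<Void>> work) {
        return CompletableFuture.supplyAsync(db::getSnapshot, executor)
                .thenComposeAsync(snapshot -> {
                    CompletableFuture<Void> result;
                    try {
                        result = work.apply(snapshot);
                    } catch (Throwable t) {
                        // E.g. a failure while creating the temporary directory.
                        result = CompletableFuture.failedFuture(t);
                    }
                    // 'snapshot' is still in scope here, so it is non-null whatever the outcome.
                    return result.whenComplete((ignored, e) -> db.releaseSnapshot(snapshot));
                }, executor);
    }
}
```

Because `whenComplete` passes the original outcome through, callers still observe the SST-writing failure rather than an NPE from the cleanup code.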

##########
File path: modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
##########
@@ -218,78 +223,33 @@ private void destroyRocksDb() throws RocksDBException {
     public void close() throws Exception {
         IgniteUtils.shutdownAndAwaitTermination(snapshotExecutor, 10, TimeUnit.SECONDS);

Review comment:
So, in our case, this would mean that `MetaStorageManager` (the class that now stops `RocksDbKeyValueStorage`) would need to make sure that all the futures are resolved? Currently, its `stop()` method does not seem to handle this explicitly, but maybe I'm missing something here.

If we push this responsibility to the enclosing component, then it must become part of the contract, so the contract grows a bit and becomes a little less explicit (as the job of dealing with such hanging futures is not encapsulated close to the executor). I'm not sure whether this is bad or not, to be honest. Also, this still needs to be done on the enclosing component's side, and it does not seem to be done there yet (again, maybe I just don't see it).

Not in this PR, of course.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
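
One way to keep the handling of hanging futures close to the executor, as discussed in the comments above, is sketched below under assumptions: `PendingFutureTracker` is a hypothetical wrapper (not an Ignite API) that records every future it hands out and, after a bounded wait followed by `shutdownNow()`, completes whatever is still pending exceptionally, so callers get an error instead of waiting forever.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical helper: fails futures whose tasks never ran because the executor was stopped. */
class PendingFutureTracker implements AutoCloseable {
    private final ExecutorService executor;

    private final long timeoutMillis;

    /** Futures handed out by submit() that have not completed yet. */
    private final Set<CompletableFuture<?>> pending = ConcurrentHashMap.newKeySet();

    PendingFutureTracker(ExecutorService executor, long timeoutMillis) {
        this.executor = executor;
        this.timeoutMillis = timeoutMillis;
    }

    /** Runs the task on the executor and tracks its future until it completes. */
    <T> CompletableFuture<T> submit(Supplier<T> task) {
        CompletableFuture<T> future = CompletableFuture.supplyAsync(task, executor);

        pending.add(future);
        // Runs immediately if the future is already done, so nothing is left behind in the set.
        future.whenComplete((res, err) -> pending.remove(future));

        return future;
    }

    /** Waits for running work, then stops the executor and fails every future left pending. */
    @Override
    public void close() {
        executor.shutdown();

        try {
            if (!executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                // Drops queued tasks and interrupts running ones.
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }

        // completeExceptionally() is a no-op for futures that have already completed.
        for (CompletableFuture<?> future : pending) {
            future.completeExceptionally(new IllegalStateException("Executor is stopped"));
        }
    }
}
```

This only addresses the hanging futures (problem 2 from the earlier comment); a snapshot interrupted after the timeout is still left unfinished.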
