SammyVimes commented on a change in pull request #203:
URL: https://github.com/apache/ignite-3/pull/203#discussion_r673129308



##########
File path: 
modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDBKeyValueStorage.java
##########
@@ -0,0 +1,1093 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server.persistence;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.metastorage.server.Condition;
+import org.apache.ignite.internal.metastorage.server.Entry;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.Operation;
+import org.apache.ignite.internal.metastorage.server.Value;
+import org.apache.ignite.internal.metastorage.server.WatchEvent;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.EnvOptions;
+import org.rocksdb.IngestExternalFileOptions;
+import org.rocksdb.Options;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Snapshot;
+import org.rocksdb.SstFileWriter;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE;
+import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.addLongToLongsByteArray;
+import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.bytesToLong;
+import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.bytesToValue;
+import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.find;
+import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.forEach;
+import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.getAsLongs;
+import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.keyToRocksKey;
+import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.longToBytes;
+import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.valueToBytes;
+import static org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.DATA;
+import static org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.INDEX;
+
+/**
+ * Key-value storage based on RocksDB.
+ * Keys are stored with revision.
+ * Values are stored with an update counter and a boolean flag which represents whether this record is a tombstone.
+ * <br>
+ * Key: [8 bytes revision, N bytes key itself].
+ * <br>
+ * Value: [8 bytes update counter, 1 byte tombstone flag, N bytes value].
+ */
+public class RocksDBKeyValueStorage implements KeyValueStorage {
+    /** Suffix for the temporary snapshot folder. */
+    private static final String TMP_SUFFIX = ".tmp";
+
+    /** A revision to store with system entries. */
+    private static final long SYSTEM_REVISION_MARKER_VALUE = -1;
+
+    /** Revision key. */
+    private static final byte[] REVISION_KEY = keyToRocksKey(
+        SYSTEM_REVISION_MARKER_VALUE,
+        "SYSTEM_REVISION_KEY".getBytes(StandardCharsets.UTF_8)
+    );
+
+    /** Update counter key. */
+    private static final byte[] UPDATE_COUNTER_KEY = keyToRocksKey(
+        SYSTEM_REVISION_MARKER_VALUE,
+        "SYSTEM_UPDATE_COUNTER_KEY".getBytes(StandardCharsets.UTF_8)
+    );
+
+    static {
+        RocksDB.loadLibrary();
+    }
+
+    /** RocksDB options. */
+    private final DBOptions options;
+
+    /** RocksDB instance. */
+    private final RocksDB db;
+
+    /** Data column family. */
+    private final ColumnFamily data;
+
+    /** Index column family. */
+    private final ColumnFamily index;
+
+    /** RW lock. */
+    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+
+    /** Thread-pool for snapshot operations execution. */
+    private final Executor snapshotExecutor = Executors.newFixedThreadPool(2);
+
+    /**
+     * Special value for the revision number which means that the operation should be applied
+     * to the latest revision of an entry.
+     */
+    private static final long LATEST_REV = -1;
+
+    /** Lexicographic order comparator. */
+    static final Comparator<byte[]> CMP = Arrays::compare;
+
+    /** Path to the rocksdb database. */
+    private final Path dbPath;
+
+    /** Revision. Will be incremented for each single-entry or multi-entry update operation. */
+    private long rev;
+
+    /** Update counter. Will be incremented for each update of any particular entry. */
+    private long updCntr;
+
+    /**
+     * Constructor.
+     *
+     * @param dbPath RocksDB path.
+     */
+    public RocksDBKeyValueStorage(Path dbPath) {
+        try {
+            options = new DBOptions()
+                .setCreateMissingColumnFamilies(true)
+                .setCreateIfMissing(true);
+
+            this.dbPath = dbPath;
+
+            Options dataOptions = new Options().setCreateIfMissing(true)
+                // The prefix is the revision of an entry, so prefix length is the size of a long
+                .useFixedLengthPrefixExtractor(Long.BYTES);
+
+            ColumnFamilyOptions dataFamilyOptions = new ColumnFamilyOptions(dataOptions);
+
+            Options indexOptions = new Options().setCreateIfMissing(true);
+
+            ColumnFamilyOptions indexFamilyOptions = new ColumnFamilyOptions(indexOptions);
+
+            List<ColumnFamilyDescriptor> descriptors = Arrays.asList(
+                new ColumnFamilyDescriptor(DATA.nameAsBytes(), dataFamilyOptions),
+                new ColumnFamilyDescriptor(INDEX.nameAsBytes(), indexFamilyOptions)
+            );
+
+            var handles = new ArrayList<ColumnFamilyHandle>();
+
+            // Delete existing data, relying on the raft's snapshot and log playback
+            destroyRocksDB();
+
+            this.db = RocksDB.open(options, dbPath.toAbsolutePath().toString(), descriptors, handles);
+
+            data = new ColumnFamily(db, handles.get(0), DATA, dataFamilyOptions, dataOptions);
+
+            index = new ColumnFamily(db, handles.get(1), INDEX, indexFamilyOptions, indexOptions);
+        }
+        catch (Exception e) {
+            try {
+                close();
+            }
+            catch (Exception exception) {
+                e.addSuppressed(exception);
+            }
+
+            throw new IgniteInternalException("Failed to start the storage", e);
+        }
+    }
+
+    /**
+     * Clear the RocksDB instance.
+     * The major difference from deleting the DB directory manually is that
+     * destroyDB() will take care of the case where the RocksDB database is stored
+     * in multiple directories. For instance, a single DB can be configured to store
+     * its data in multiple directories by specifying different paths to
+     * DBOptions::db_paths, DBOptions::db_log_dir, and DBOptions::wal_dir.
+     *
+     * @throws RocksDBException If failed.
+     */
+    private void destroyRocksDB() throws RocksDBException {
+        try (final Options opt = new Options()) {
+            RocksDB.destroyDB(dbPath.toString(), opt);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws Exception {
+        IgniteUtils.closeAll(options, data, index, db);
+    }
+
+    /** {@inheritDoc} */
+    @NotNull
+    @Override public CompletableFuture<Void> snapshot(Path snapshotPath) {

Review comment:
       Nothing bad at all, we'll just use the snapshot of the database taken at the time the snapshot was requested.
   I'm not sure, though, that this situation can't happen, but as far as I know it should not.
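
   For illustration, a minimal sketch of how a RocksDB snapshot pins the database state at the moment it is taken, so writes that arrive afterwards are not visible to the reader. The class and method names here are hypothetical, not the PR's actual implementation:

```java
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;
import org.rocksdb.Snapshot;

class SnapshotReadSketch {
    /** Iterates over the DB contents as they were when this method was called. */
    static void readPinnedState(RocksDB db) {
        // Pin the current state of the database.
        Snapshot snapshot = db.getSnapshot();

        try (ReadOptions readOpts = new ReadOptions().setSnapshot(snapshot);
             RocksIterator it = db.newIterator(readOpts)) {
            for (it.seekToFirst(); it.isValid(); it.next()) {
                // Keys and values written after getSnapshot() are not returned here,
                // so the produced snapshot stays consistent under concurrent updates.
                byte[] key = it.key();
                byte[] value = it.value();
                // ... copy key/value into the snapshot being built ...
            }
        }
        finally {
            // The snapshot must be released, otherwise compaction keeps old versions alive.
            db.releaseSnapshot(snapshot);
        }
    }
}
```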



