agura commented on a change in pull request #203: URL: https://github.com/apache/ignite-3/pull/203#discussion_r671255075
########## File path: modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDBKeyValueStorage.java ########## @@ -0,0 +1,985 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.metastorage.server.persistence; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.ignite.internal.metastorage.server.Condition; +import org.apache.ignite.internal.metastorage.server.Entry; +import org.apache.ignite.internal.metastorage.server.KeyValueStorage; +import org.apache.ignite.internal.metastorage.server.Operation; 
+import org.apache.ignite.internal.metastorage.server.Value; +import org.apache.ignite.internal.metastorage.server.WatchEvent; +import org.apache.ignite.internal.util.ByteUtils; +import org.apache.ignite.internal.util.Cursor; +import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.lang.IgniteInternalException; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.TestOnly; +import org.rocksdb.EnvOptions; +import org.rocksdb.IngestExternalFileOptions; +import org.rocksdb.Options; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.SstFileWriter; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; + +import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE; +import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageByteUtils.bytesToValue; +import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageByteUtils.forEach; +import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageByteUtils.keyToRocksKey; +import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageByteUtils.rocksKeyToBytes; +import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageByteUtils.valueToBytes; + +/** + * Key-value storage based on RocksDB. + * Keys are stored with revision. + * Values are stored with an update counter and a boolean flag which represents whether this record is a tombstone. + * <br> + * Key: [8 bytes revision, N bytes key itself]. + * <br> + * Value: [8 bytes update counter, 1 byte tombstone flag, N bytes value]. + */ +public class RocksDBKeyValueStorage implements KeyValueStorage { + /** Database snapshot file name. 
*/ + private static final String SNAPSHOT_FILE_NAME = "db.snapshot"; + + /** Suffix for the temporary snapshot folder */ + private static final String TMP_SUFFIX = ".tmp"; + + /** A revision to store with system entries. */ + private static final long SYSTEM_REVISION_MARKER_VALUE = -1; + + /** Revision key. */ + private static final byte[] REVISION_KEY = keyToRocksKey( + SYSTEM_REVISION_MARKER_VALUE, + "SYSTEM_REVISION_KEY".getBytes(StandardCharsets.UTF_8) + ); + + /** Update counter key. */ + private static final byte[] UPDATE_COUNTER_KEY = keyToRocksKey( + SYSTEM_REVISION_MARKER_VALUE, + "SYSTEM_UPDATE_COUNTER_KEY".getBytes(StandardCharsets.UTF_8) + ); + + static { + RocksDB.loadLibrary(); + } + + /** RockDB options. */ + private final Options options; + + /** RocksDb instance. */ + private final RocksDB db; + + /** RW lock. */ + private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); + + /** Thread-pool for snapshot operations execution. */ + private final Executor snapshotExecutor = Executors.newSingleThreadExecutor(); + + /** + * Special value for the revision number which means that operation should be applied + * to the latest revision of an entry. + */ + private static final long LATEST_REV = -1; + + /** Lexicographic order comparator. */ + static final Comparator<byte[]> CMP = Arrays::compare; + + /** Path to the rocksdb database. */ + private final Path dbPath; + + /** Keys index. Value is the list of all revisions under which the corresponding entry has ever been modified. */ + private NavigableMap<byte[], List<Long>> keysIdx = new TreeMap<>(CMP); + + /** Revision. Will be incremented for each single-entry or multi-entry update operation. */ + private long rev; + + /** Update counter. Will be incremented for each update of any particular entry. */ + private long updCntr; + + /** + * Constructor. + * + * @param dbPath RocksDB path. 
+ */ + public RocksDBKeyValueStorage(Path dbPath) { + try { + options = new Options() + .setCreateIfMissing(true) + // The prefix is the revision of an entry, so prefix length is the size of a long + .useFixedLengthPrefixExtractor(Long.BYTES); + + this.dbPath = dbPath; + + this.db = RocksDB.open(options, dbPath.toAbsolutePath().toString()); + } + catch (Exception e) { + try { + close(); + } + catch (Exception exception) { + e.addSuppressed(exception); + } + + throw new IgniteInternalException("Failed to start the storage", e); + } + } + + /** {@inheritDoc} */ + @Override public void close() throws Exception { + IgniteUtils.closeAll(options, db); + } + + /** {@inheritDoc} */ + @NotNull + @Override public CompletableFuture<Void> snapshot(Path snapshotPath) { + return createSstFile(snapshotPath); + } + + /** {@inheritDoc} */ + @Override public void restoreSnapshot(Path path) { + Path snapshotPath = path.resolve(SNAPSHOT_FILE_NAME); + + if (!Files.exists(snapshotPath)) + throw new IgniteInternalException("Snapshot not found: " + snapshotPath); + + rwLock.writeLock().lock(); + + try (IngestExternalFileOptions ingestOptions = new IngestExternalFileOptions()) { + this.db.ingestExternalFile(Collections.singletonList(snapshotPath.toString()), ingestOptions); + buildKeyIndex(); + + rev = ByteUtils.bytesToLong(this.db.get(REVISION_KEY)); + + updCntr = ByteUtils.bytesToLong(this.db.get(UPDATE_COUNTER_KEY)); + } + catch (RocksDBException e) { + throw new IgniteInternalException("Fail to ingest sst file at path: " + path, e); + } + finally { + rwLock.writeLock().unlock(); + } + } + + /** + * Builds an index of this storage. + * + * @throws RocksDBException If failed. 
+ */ + private void buildKeyIndex() throws RocksDBException { + try (RocksIterator iterator = this.db.newIterator()) { + iterator.seekToFirst(); + + forEach(iterator, (rocksKey, value) -> { + byte[] key = rocksKeyToBytes(rocksKey); + + long revision = ByteUtils.bytesToLong(rocksKey); + + if (revision == SYSTEM_REVISION_MARKER_VALUE) + // It's a system entry like REVISION_KEY, ignore it while building the key index. + return; + + updateKeysIndex(key, revision); + }); + } + } + + /** + * Creates a SST file from {@link #db}. + * + * @param path Path to store SST file at. Review comment: Rename method parameter or javadoc param. ########## File path: modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDBKeyValueStorage.java ########## @@ -0,0 +1,985 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.metastorage.server.persistence; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.ignite.internal.metastorage.server.Condition; +import org.apache.ignite.internal.metastorage.server.Entry; +import org.apache.ignite.internal.metastorage.server.KeyValueStorage; +import org.apache.ignite.internal.metastorage.server.Operation; +import org.apache.ignite.internal.metastorage.server.Value; +import org.apache.ignite.internal.metastorage.server.WatchEvent; +import org.apache.ignite.internal.util.ByteUtils; +import org.apache.ignite.internal.util.Cursor; +import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.lang.IgniteInternalException; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.TestOnly; +import org.rocksdb.EnvOptions; +import org.rocksdb.IngestExternalFileOptions; +import org.rocksdb.Options; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.SstFileWriter; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; + +import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE; +import static 
org.apache.ignite.internal.metastorage.server.persistence.RocksStorageByteUtils.bytesToValue; +import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageByteUtils.forEach; +import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageByteUtils.keyToRocksKey; +import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageByteUtils.rocksKeyToBytes; +import static org.apache.ignite.internal.metastorage.server.persistence.RocksStorageByteUtils.valueToBytes; + +/** + * Key-value storage based on RocksDB. + * Keys are stored with revision. + * Values are stored with an update counter and a boolean flag which represents whether this record is a tombstone. + * <br> + * Key: [8 bytes revision, N bytes key itself]. + * <br> + * Value: [8 bytes update counter, 1 byte tombstone flag, N bytes value]. + */ +public class RocksDBKeyValueStorage implements KeyValueStorage { + /** Database snapshot file name. */ + private static final String SNAPSHOT_FILE_NAME = "db.snapshot"; + + /** Suffix for the temporary snapshot folder */ + private static final String TMP_SUFFIX = ".tmp"; + + /** A revision to store with system entries. */ + private static final long SYSTEM_REVISION_MARKER_VALUE = -1; + + /** Revision key. */ + private static final byte[] REVISION_KEY = keyToRocksKey( + SYSTEM_REVISION_MARKER_VALUE, + "SYSTEM_REVISION_KEY".getBytes(StandardCharsets.UTF_8) + ); + + /** Update counter key. */ + private static final byte[] UPDATE_COUNTER_KEY = keyToRocksKey( + SYSTEM_REVISION_MARKER_VALUE, + "SYSTEM_UPDATE_COUNTER_KEY".getBytes(StandardCharsets.UTF_8) + ); + + static { + RocksDB.loadLibrary(); + } + + /** RockDB options. */ + private final Options options; + + /** RocksDb instance. */ + private final RocksDB db; + + /** RW lock. */ + private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); + + /** Thread-pool for snapshot operations execution. 
*/ + private final Executor snapshotExecutor = Executors.newSingleThreadExecutor(); + + /** + * Special value for the revision number which means that operation should be applied + * to the latest revision of an entry. + */ + private static final long LATEST_REV = -1; + + /** Lexicographic order comparator. */ + static final Comparator<byte[]> CMP = Arrays::compare; + + /** Path to the rocksdb database. */ + private final Path dbPath; + + /** Keys index. Value is the list of all revisions under which the corresponding entry has ever been modified. */ + private NavigableMap<byte[], List<Long>> keysIdx = new TreeMap<>(CMP); Review comment: It's a bad idea to store `keysIdx` in memory and rebuild it on snapshot restore. This index can be maintained as an additional column family. Several column families can be updated together using an atomic cross-column-family write. ########## File path: modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ########## @@ -469,4 +472,16 @@ public static void closeAll(Collection<? extends AutoCloseable> closeables) thro if (ex != null) throw ex; } + + /** + * Closes all provided objects. + * + * @param closeables Array of closeable objects to close. + * @throws Exception If failed to close. + * + * @see #closeAll(Collection) + */ + public static void closeAll(AutoCloseable... closeables) throws Exception { + closeAll(Arrays.asList(closeables)); Review comment: Please avoid redundant object creation. Just rewrite this as a loop over the array elements. ########## File path: modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/WatchCursor.java ########## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.metastorage.server.persistence; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.function.Predicate; +import org.apache.ignite.internal.metastorage.server.Entry; +import org.apache.ignite.internal.metastorage.server.EntryEvent; +import org.apache.ignite.internal.metastorage.server.Value; +import org.apache.ignite.internal.metastorage.server.WatchEvent; +import org.apache.ignite.internal.util.ByteUtils; +import org.apache.ignite.internal.util.Cursor; +import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.lang.IgniteInternalException; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; + +/** + * Subscription on updates of entries corresponding to the given keys range (where an upper bound is unlimited) + * and starting from the given revision number. + */ +class WatchCursor implements Cursor<WatchEvent> { + /** Storage. */ + private final RocksDBKeyValueStorage storage; + + /** Key predicate. */ + private final Predicate<byte[]> p; + + /** Iterator for this cursor. */ + private final Iterator<WatchEvent> it; + + /** Options for {@link #nativeIterator}. 
*/ + private final ReadOptions options = new ReadOptions().setPrefixSameAsStart(true); + + /** RocksDB iterator. */ + @Nullable + private final RocksIterator nativeIterator; + + /** + * Last matching revision. + */ + private long lastRetRev; + + /** + * Next matching revision. {@code -1} means it's not found yet or does not exist. + */ + private long nextRetRev = -1; + + /** + * Constructor. + * + * @param storage Storage. + * @param rev Starting revision. + * @param p Key predicate. + */ + WatchCursor(RocksDBKeyValueStorage storage, long rev, Predicate<byte[]> p) { + this.storage = storage; + this.p = p; + this.lastRetRev = rev - 1; + this.nativeIterator = storage.db().newIterator(options); + this.it = createIterator(); + } + + /** + * {@inheritDoc} + */ + @Override public boolean hasNext() { + return it.hasNext(); + } + + /** + * {@inheritDoc} + */ + @Nullable + @Override public WatchEvent next() { + return it.next(); + } + + /** + * {@inheritDoc} + */ + @Override public void close() throws Exception { + IgniteUtils.closeAll(options, nativeIterator); + } + + /** + * {@inheritDoc} + */ + @NotNull + @Override public Iterator<WatchEvent> iterator() { + return it; + } + + /** + * Creates an iterator for this cursor. + * + * @return Iterator. + */ + @NotNull + private Iterator<WatchEvent> createIterator() { + return new Iterator<>() { + /** {@inheritDoc} */ + @Override public boolean hasNext() { + storage.lock().readLock().lock(); + + try { + if (nextRetRev != -1) + // Next revision is already calculated and is not -1, meaning that there is set of keys + // matching the revision and the predicate. + return true; + + while (true) { + long curRev = lastRetRev + 1; + + byte[] revisionPrefix = ByteUtils.longToBytes(curRev); + + boolean empty = true; + + if (!nativeIterator.isValid()) Review comment: Please, wrap `if` body to braces. 
########## File path: modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java ########## @@ -353,7 +372,7 @@ private void compactForKey( Value lastVal = kv.get(key); - if (!lastVal.tombstone()) { + if (!lastVal.isTombstone()) { Review comment: Please restore the previous method name `tombstone`, in accordance with https://cwiki.apache.org/confluence/display/IGNITE/Coding+Guidelines#CodingGuidelines-Gettersandsetters ########## File path: modules/core/src/test/java/org/apache/ignite/internal/testframework/WorkDirectoryExtension.java ########## @@ -110,7 +110,11 @@ private static Path createWorkDir(ExtensionContext extensionContext) throws IOEx if (shouldRemoveDir()) IgniteUtils.deleteIfExists(BASE_PATH); - Path workDir = BASE_PATH.resolve(extensionContext.getRequiredTestMethod().getName()); + String testClassDir = extensionContext.getRequiredTestClass().getSimpleName(); + + String testMethodDir = extensionContext.getRequiredTestMethod().getName() + "_" + System.currentTimeMillis(); Review comment: Please always use a single-character literal for concatenation where possible (`'_'`). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
