sashapolo commented on a change in pull request #203:
URL: https://github.com/apache/ignite-3/pull/203#discussion_r670401710
##########
File path:
modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Value.java
##########
@@ -66,7 +66,7 @@ public long updateCounter() {
*
* @return {@code True} if value is tombstone, otherwise - {@code false}.
*/
- boolean tombstone() {
+ public boolean tombstone() {
Review comment:
can we rename it to `isTombstone`?
##########
File path:
modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
##########
@@ -314,13 +316,33 @@ else if (clo.command() instanceof WatchExactKeysCommand) {
/** {@inheritDoc} */
@Override public void onSnapshotSave(String path, Consumer<Throwable>
doneClo) {
- // Not implemented yet.
+ storage.snapshot(Paths.get(path)).whenComplete((unused, throwable) -> {
Review comment:
can we change this method's signature to accept a `Path`?
##########
File path:
modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/RocksDBKeyValueStorage.java
##########
@@ -0,0 +1,1249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.EnvOptions;
+import org.rocksdb.IngestExternalFileOptions;
+import org.rocksdb.Options;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Snapshot;
+import org.rocksdb.SstFileWriter;
+import org.rocksdb.StringAppendOperator;
+
+import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE;
+
+/**
+ * Key-value storage based on RocksDB.
+ * Keys are stored with revision.
+ * Values are stored with update counter and a boolean flag which represenets
whether this record is a tombstone.
+ * <br>
+ * Key: [8 bytes revision, N bytes key itself].
+ * <br>
+ * Value: [8 bytes update counter, 1 byte tombstone flag, N bytes value].
+ */
+public class RocksDBKeyValueStorage implements KeyValueStorage {
+ /** Database snapshot file name. */
+ private static final String SNAPSHOT_FILE_NAME = "db.snapshot";
+
+ /** Suffix for temporal snapshot folder */
+ private static final String TMP_SUFFIX = ".tmp";
+
+ /** Revision key. */
+ private static final byte[] REVISION_KEY = keyToRocksKey(-1,
"SYSTEM_REVISION_KEY".getBytes());
+
+ /** Update counter key. */
+ private static final byte[] UPDATE_COUNTER_KEY = keyToRocksKey(-1,
"SYSTEM_UPDATE_COUNTER_KEY".getBytes());
+
+ static {
+ RocksDB.loadLibrary();
+ }
+
+ /** RockDB options. */
+ private final Options options;
+
+ /** RocksDb instance. */
+ private final RocksDB db;
+
+ /** RW lock. */
+ private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+
+ /** Thread-pool for a snapshot operation execution. */
+ private final Executor snapshotExecutor =
Executors.newSingleThreadExecutor();
+
+ /**
+ * Special value for revision number which means that operation should be
applied
+ * to the latest revision of an entry.
+ */
+ private static final long LATEST_REV = -1;
+
+ /** Lexicographic order comparator. */
+ private final Comparator<byte[]> CMP = Arrays::compare;
+
+ /** Keys index. Value is the list of all revisions under which entry
corresponding to the key was modified. */
+ private NavigableMap<byte[], List<Long>> keysIdx = new TreeMap<>(CMP);
+
+ /** Revision. Will be incremented for each single-entry or multi-entry
update operation. */
+ private long rev;
+
+ /** Update counter. Will be incremented for each update of any particular
entry. */
+ private long updCntr;
+
+ /**
+ * Constructor.
+ *
+ * @param dbPath RocksDB path.
+ */
+ public RocksDBKeyValueStorage(Path dbPath) {
+ try {
+ options = new Options();
+
+ options.setCreateIfMissing(true);
+
+ options.useFixedLengthPrefixExtractor(8);
+
+ this.db = RocksDB.open(options,
dbPath.toAbsolutePath().toString());
+ }
+ catch (Exception e) {
+ close();
+
+ throw new IgniteInternalException("Failed to start the storage",
e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ try {
+ IgniteUtils.closeAll(Set.of(options, db));
+ }
+ catch (Exception e) {
+ throw new IgniteInternalException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public CompletableFuture<Void> snapshot(Path snapshotPath) {
+ return createSnapshot(snapshotPath);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void restoreSnapshot(Path snapshotPath) {
+ readSnapshot(snapshotPath);
+ }
+
+ /**
+ * Reads a snapsot from path and ingest this key-value storage.
+ *
+ * @param path Snapshot path.
+ */
+ public void readSnapshot(Path path) {
+ Lock writeLock = this.rwLock.writeLock();
+ writeLock.lock();
+
+ Path snapshotPath = path.resolve(SNAPSHOT_FILE_NAME);
+
+ if (!Files.exists(snapshotPath))
+ throw new IgniteInternalException("Snapshot not found within path:
" + path);
+
+ try (IngestExternalFileOptions ingestOptions = new
IngestExternalFileOptions()) {
+
this.db.ingestExternalFile(Collections.singletonList(snapshotPath.toString()),
ingestOptions);
+ buildKeyIndex();
+
+ rev = ByteUtils.bytesToLong(this.db.get(REVISION_KEY));
+
+ updCntr = ByteUtils.bytesToLong(this.db.get(UPDATE_COUNTER_KEY));
+ }
+ catch (RocksDBException e) {
+ throw new IgniteInternalException("Fail to ingest sst file at
path: " + path, e);
+ }
+ finally {
+ writeLock.unlock();
+ }
+ }
+
+ /**
+ * Builds an index of this storage.
+ *
+ * @throws RocksDBException If failed.
+ */
+ private void buildKeyIndex() throws RocksDBException {
+ try (RocksIterator iterator = this.db.newIterator()) {
+ for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
+ byte[] rocksKey = iterator.key();
+ byte[] key = rocksKeyToBytes(rocksKey);
+ long revision = ByteUtils.bytesToLong(rocksKey);
+ if (revision == -1)
+ continue;
+
+ List<Long> revs = keysIdx.computeIfAbsent(key, k -> new
ArrayList<>());
+ revs.add(revision);
+ }
+ }
+ }
+
+ /**
+ * Captures a snapshot.
+ *
+ * @param snapsthotPath Snapshot path.
+ * @return Future that represents a state of the operation.
+ */
+ public CompletableFuture<Void> createSnapshot(Path snapsthotPath) {
+ Lock readLock = this.rwLock.readLock();
+ readLock.lock();
+
+ try {
+ Path tempPath = Paths.get(snapsthotPath.toString() + TMP_SUFFIX);
+
+ IgniteUtils.delete(tempPath);
+ Files.createDirectories(tempPath);
+
+ CompletableFuture<Void> snapshotFuture = new CompletableFuture<>();
+
+ CompletableFuture<Void> sstFuture = createSstFile(tempPath);
+
+ sstFuture.whenComplete((aVoid, throwable) -> {
+ if (throwable == null) {
+ try {
+ IgniteUtils.delete(snapsthotPath);
+
+ Files.move(tempPath, snapsthotPath);
+
+ snapshotFuture.complete(null);
+ }
+ catch (Throwable t) {
+ snapshotFuture.completeExceptionally(t);
+ }
+ }
+ else
+ snapshotFuture.completeExceptionally(throwable);
+ });
+
+ return snapshotFuture;
+ }
+ catch (Exception e) {
+ return CompletableFuture.failedFuture(e);
+ }
+ finally {
+ readLock.unlock();
+ }
+ }
+
+ /**
+ * Creates a SST file from {@link #db}.
+ *
+ * @param path Path to store SST file at.
+ * @return Future that represents a state of the operation.
+ */
+ CompletableFuture<Void> createSstFile(Path path) {
+ CompletableFuture<Void> sstFuture = new CompletableFuture<>();
+
+ snapshotExecutor.execute(() -> {
+ Lock readLock = this.rwLock.readLock();
+ readLock.lock();
+
+ Snapshot snapshot = this.db.getSnapshot();
+
+ try (
+ ReadOptions readOptions = new
ReadOptions().setSnapshot(snapshot);
+ EnvOptions envOptions = new EnvOptions();
+ Options options = new Options().setMergeOperator(new
StringAppendOperator());
+ RocksIterator it = this.db.newIterator(readOptions);
+ SstFileWriter sstFileWriter = new SstFileWriter(envOptions,
options)
+ ) {
+ Path sstFile = path.resolve(SNAPSHOT_FILE_NAME);
+
+ it.seekToFirst();
+
+ sstFileWriter.open(sstFile.toString());
+
+ long count = 0;
+
+ for (;;) {
+ if (!it.isValid()) {
+ break;
+ }
+ byte[] key = it.key();
+
+ sstFileWriter.put(key, it.value());
+
+ ++count;
+
+ it.next();
+ }
+
+ if (count == 0)
+ sstFileWriter.close();
+ else
+ sstFileWriter.finish();
+
+ sstFuture.complete(null);
+ }
+ catch (Throwable t) {
+ sstFuture.completeExceptionally(t);
+ }
+ finally {
+ readLock.unlock();
+
+ // Nothing to release, rocksDB never own the pointer for a
snapshot.
+ snapshot.close();
+ // The pointer to the snapshot is released by the database
instance.
+ this.db.releaseSnapshot(snapshot);
+ }
+ });
+
+ return sstFuture;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long revision() {
+ return rev;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long updateCounter() {
+ return updCntr;
+ }
+
+ /**
+ * Puts a value into {@link #db}.
+ *
+ * @param key Key.
+ * @param value Value.
+ */
+ private void setValue(byte[] key, byte[] value) {
+ try {
+ this.db.put(key, value);
+ }
+ catch (RocksDBException e) {
+ throw new IgniteInternalException(e);
+ }
+ }
+
+ /**
+ * Updates the revision of this storage.
+ *
+ * @param newRevision New revision.
+ */
+ private void updateRevision(long newRevision) {
+ setValue(REVISION_KEY, ByteUtils.longToBytes(newRevision));
+
+ rev = newRevision;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void put(byte[] key, byte[] value) {
+ Lock lock = rwLock.writeLock();
+ lock.lock();
+ try {
+ long curRev = rev + 1;
+
+ doPut(key, value, curRev);
+
+ updateRevision(curRev);
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @NotNull
+ @Override public Entry getAndPut(byte[] key, byte[] bytes) {
+ Lock lock = rwLock.writeLock();
+ lock.lock();
+ try {
+ long curRev = rev + 1;
+
+ long lastRev = doPut(key, bytes, curRev);
+
+ updateRevision(curRev);
+
+ // Return previous value.
+ return doGetValue(key, lastRev);
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void putAll(List<byte[]> keys, List<byte[]> values) {
+ Lock lock = rwLock.writeLock();
+ lock.lock();
+ try {
+ long curRev = rev + 1;
+
+ doPutAll(curRev, keys, values);
+
+ updateRevision(curRev);
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @NotNull
+ @Override public Collection<Entry> getAndPutAll(List<byte[]> keys,
List<byte[]> values) {
+ Collection<Entry> res;
+
+ Lock lock = rwLock.writeLock();
+ lock.lock();
+ try {
+ long curRev = rev + 1;
+
+ res = doGetAll(keys, curRev);
+
+ doPutAll(curRev, keys, values);
+
+ updateRevision(curRev);
+ }
+ finally {
+ lock.unlock();
+ }
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @NotNull
+ @Override public Entry get(byte[] key) {
+ Lock lock = rwLock.readLock();
+ lock.lock();
+ try {
+ return doGet(key, LATEST_REV, false);
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @NotNull
+ @Override public Entry get(byte[] key, long rev) {
+ Lock lock = rwLock.readLock();
+ lock.lock();
+ try {
+ return doGet(key, rev, true);
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @NotNull
+ @Override public Collection<Entry> getAll(List<byte[]> keys) {
+ return doGetAll(keys, LATEST_REV);
+ }
+
+ /** {@inheritDoc} */
+ @NotNull
+ @Override public Collection<Entry> getAll(List<byte[]> keys, long
revUpperBound) {
+ return doGetAll(keys, revUpperBound);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void remove(byte[] key) {
+ Lock lock = rwLock.writeLock();
+ lock.lock();
+ try {
+ long curRev = rev + 1;
+
+ if (doRemove(key, curRev))
+ updateRevision(curRev);
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @NotNull
+ @Override public Entry getAndRemove(byte[] key) {
+ Lock lock = rwLock.writeLock();
+ lock.lock();
+ try {
+ Entry e = doGet(key, LATEST_REV, false);
+
+ if (e.empty() || e.tombstone())
+ return e;
+
+ return getAndPut(key, TOMBSTONE);
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeAll(List<byte[]> keys) {
+ Lock lock = rwLock.writeLock();
+ lock.lock();
+ try {
+ long curRev = rev + 1;
+
+ List<byte[]> existingKeys = new ArrayList<>(keys.size());
+
+ List<byte[]> vals = new ArrayList<>(keys.size());
+
+ for (byte[] key : keys) {
+ Entry e = doGet(key, LATEST_REV, false);
+
+ if (e.empty() || e.tombstone())
+ continue;
+
+ existingKeys.add(key);
+
+ vals.add(TOMBSTONE);
+ }
+
+ doPutAll(curRev, existingKeys, vals);
+
+ updateRevision(curRev);
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @NotNull
+ @Override public Collection<Entry> getAndRemoveAll(List<byte[]> keys) {
+ Collection<Entry> res = new ArrayList<>(keys.size());
+
+ Lock lock = rwLock.writeLock();
+ lock.lock();
+ try {
+ long curRev = rev + 1;
+
+ List<byte[]> existingKeys = new ArrayList<>(keys.size());
+
+ List<byte[]> vals = new ArrayList<>(keys.size());
+
+ for (byte[] key : keys) {
+ Entry e = doGet(key, LATEST_REV, false);
+
+ res.add(e);
+
+ if (e.empty() || e.tombstone())
+ continue;
+
+ existingKeys.add(key);
+
+ vals.add(TOMBSTONE);
+ }
+
+ doPutAll(curRev, existingKeys, vals);
+
+ updateRevision(curRev);
+ }
+ finally {
+ lock.unlock();
+ }
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean invoke(Condition condition, Collection<Operation>
success, Collection<Operation> failure) {
+ Lock lock = rwLock.writeLock();
+ lock.lock();
+
+ try {
+ Entry e = get(condition.key());
+
+ boolean branch = condition.test(e);
+
+ Collection<Operation> ops = branch ? success : failure;
+
+ long curRev = rev + 1;
+
+ boolean modified = false;
+
+ for (Operation op : ops) {
+ switch (op.type()) {
+ case PUT:
+ doPut(op.key(), op.value(), curRev);
+
+ modified = true;
+
+ break;
+
+ case REMOVE:
+ modified |= doRemove(op.key(), curRev);
+
+ break;
+
+ case NO_OP:
+ break;
+
+ default:
+ throw new IllegalArgumentException("Unknown operation
type: " + op.type());
+ }
+ }
+
+ if (modified)
+ updateRevision(curRev);
+
+ return branch;
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo) {
+ return new RocksDBKeyValueStorage.RangeCursor(keyFrom, keyTo, rev);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo, long
revUpperBound) {
+ return new RocksDBKeyValueStorage.RangeCursor(keyFrom, keyTo,
revUpperBound);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Cursor<WatchEvent> watch(byte[] keyFrom, byte[] keyTo,
long rev) {
+ assert keyFrom != null : "keyFrom couldn't be null.";
+ assert rev > 0 : "rev must be positive.";
+
+ return new RocksDBKeyValueStorage.WatchCursor(rev, k ->
+ CMP.compare(keyFrom, k) <= 0 && (keyTo == null || CMP.compare(k,
keyTo) < 0)
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override public Cursor<WatchEvent> watch(byte[] key, long rev) {
+ assert key != null : "key couldn't be null.";
+ assert rev > 0 : "rev must be positive.";
+
+ return new RocksDBKeyValueStorage.WatchCursor(rev, k -> CMP.compare(k,
key) == 0);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Cursor<WatchEvent> watch(Collection<byte[]> keys, long
rev) {
+ assert keys != null && !keys.isEmpty() : "keys couldn't be null or
empty: " + keys;
+ assert rev > 0 : "rev must be positive.";
+
+ TreeSet<byte[]> keySet = new TreeSet<>(CMP);
+
+ keySet.addAll(keys);
+
+ return new RocksDBKeyValueStorage.WatchCursor(rev, keySet::contains);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void compact() {
+ Lock lock = rwLock.writeLock();
+ lock.lock();
+
+ try {
+ NavigableMap<byte[], List<Long>> compactedKeysIdx = new
TreeMap<>(CMP);
+
+ keysIdx.forEach((key, revs) -> {
+ try {
+ compactForKey(key, revs, compactedKeysIdx);
+ }
+ catch (RocksDBException e) {
+ throw new IgniteInternalException(e);
+ }
+ });
+
+ keysIdx = compactedKeysIdx;
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /** */
+ private boolean doRemove(byte[] key, long curRev) {
+ Entry e = doGet(key, LATEST_REV, false);
+
+ if (e.empty() || e.tombstone())
+ return false;
+
+ doPut(key, TOMBSTONE, curRev);
+
+ return true;
+ }
+
+ /** */
+ private void compactForKey(
+ byte[] key,
+ List<Long> revs,
+ NavigableMap<byte[], List<Long>> compactedKeysIdx
+ ) throws RocksDBException {
+ long lastRev = lastRevision(revs);
+
+ for (int i = 0; i < revs.size(); i++) {
+ Long revision = revs.get(i);
+
+ byte[] rocksKey = keyToRocksKey(revision, key);
+
+ if (i == revs.size() - 1) {
+ Value value = bytesToValue(db.get(rocksKey));
+
+ if (value.tombstone())
+ this.db.delete(rocksKey);
+ else
+ compactedKeysIdx.put(key, listOf(lastRev));
+ }
+ else
+ this.db.delete(rocksKey);
+ }
+ }
+
+ /** */
+ @NotNull
+ private Collection<Entry> doGetAll(Collection<byte[]> keys, long rev) {
+ assert keys != null : "keys list can't be null.";
+ assert !keys.isEmpty() : "keys list can't be empty.";
+ assert rev > 0 || rev == LATEST_REV : "Revision must be positive or "
+ LATEST_REV + '.';
+
+ Collection<Entry> res = new ArrayList<>(keys.size());
+
+ Lock lock = rwLock.readLock();
+ lock.lock();
+ try {
+ for (byte[] key : keys) {
+ res.add(doGet(key, rev, false));
+ }
+ }
+ finally {
+ lock.unlock();
+ }
+
+ return res;
+ }
+
+ /** */
+ @NotNull
+ private Entry doGet(byte[] key, long rev, boolean exactRev) {
Review comment:
I think the code will become simpler; I'll check.
##########
File path:
modules/core/src/test/java/org/apache/ignite/internal/testframework/WorkDirectoryExtension.java
##########
@@ -110,7 +110,11 @@ private static Path createWorkDir(ExtensionContext
extensionContext) throws IOEx
if (shouldRemoveDir())
IgniteUtils.deleteIfExists(BASE_PATH);
- Path workDir =
BASE_PATH.resolve(extensionContext.getRequiredTestMethod().getName());
+ String testClassDir =
extensionContext.getRequiredTestClass().getSimpleName();
+
+ String testMethodDir =
extensionContext.getRequiredTestMethod().getName() + "_" +
System.currentTimeMillis();
Review comment:
Please update this class's javadoc to describe how the generated
folders are now named.
##########
File path:
modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
##########
@@ -314,13 +316,33 @@ else if (clo.command() instanceof WatchExactKeysCommand) {
/** {@inheritDoc} */
@Override public void onSnapshotSave(String path, Consumer<Throwable>
doneClo) {
- // Not implemented yet.
+ storage.snapshot(Paths.get(path)).whenComplete((unused, throwable) -> {
+ doneClo.accept(throwable);
+ });
}
/** {@inheritDoc} */
@Override public boolean onSnapshotLoad(String path) {
Review comment:
Same here: can we change this method's signature to accept a `Path`?
##########
File path:
modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
##########
@@ -314,13 +316,33 @@ else if (clo.command() instanceof WatchExactKeysCommand) {
/** {@inheritDoc} */
@Override public void onSnapshotSave(String path, Consumer<Throwable>
doneClo) {
- // Not implemented yet.
+ storage.snapshot(Paths.get(path)).whenComplete((unused, throwable) -> {
+ doneClo.accept(throwable);
+ });
}
/** {@inheritDoc} */
@Override public boolean onSnapshotLoad(String path) {
- // Not implemented yet.
- return false;
+ storage.restoreSnapshot(Paths.get(path));
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onShutdown() {
+ try {
+ storage.close();
+ }
+ catch (Exception e) {
+ throw new IgniteInternalException("Failed to close storage:" +
e.getMessage(), e);
Review comment:
```suggestion
throw new IgniteInternalException("Failed to close storage: " +
e.getMessage(), e);
```
##########
File path:
modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
##########
@@ -203,4 +205,19 @@
* Compacts storage (removes tombstones).
*/
void compact();
+
+ /**
+ * Creates a snapshot of the storage.
+ *
+ * @param snapshotPath Path to a snapshot.
+ * @return Future that represents the state of a operation.
+ */
+ CompletableFuture<Void> snapshot(Path snapshotPath);
+
+ /**
+ * Restores a snapshot.
Review comment:
Why did you resolve this comment?
##########
File path:
modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
##########
@@ -203,4 +205,19 @@
* Compacts storage (removes tombstones).
*/
void compact();
+
+ /**
+ * Creates a snapshot of the storage.
+ *
+ * @param snapshotPath Path to a snapshot.
+ * @return Future that represents the state of a operation.
+ */
+ CompletableFuture<Void> snapshot(Path snapshotPath);
Review comment:
Why did you resolve this comment?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]