sashapolo commented on code in PR #722:
URL: https://github.com/apache/ignite-3/pull/722#discussion_r845247661
##########
modules/cluster-management/src/main/java/org/apache/ignite/internal/cluster/management/raft/RocksDbRaftStorage.java:
##########
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster.management.raft;
+
+import static org.apache.ignite.internal.rocksdb.snapshot.ColumnFamilyRange.fullRange;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
+import org.apache.ignite.internal.rocksdb.snapshot.RocksSnapshotManager;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.jetbrains.annotations.Nullable;
+import org.rocksdb.Options;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+
+/**
+ * {@link RaftStorage} implementation based on RocksDB.
+ */
+public class RocksDbRaftStorage implements RaftStorage {
+    /** Thread-pool for snapshot operations execution. */
+    private final ExecutorService snapshotExecutor = Executors.newFixedThreadPool(2);
+
+    /** Path to the rocksdb database. */
+    private final Path dbPath;
+
+    /** RockDB options. */
+    private volatile Options options;
+
+    /** RocksDb instance. */
+    private volatile RocksDB db;
+
+    private volatile RocksSnapshotManager snapshotManager;
+
+    private final Object snapshotRestoreLock = new Object();
+
+    public RocksDbRaftStorage(Path dbPath) {
+        this.dbPath = dbPath;
+    }
+
+    @Override
+    public void start() {
+        options = new Options().setCreateIfMissing(true);
+
+        try {
+            db = RocksDB.open(options, dbPath.toString());
+
+            ColumnFamily defaultCf = ColumnFamily.wrap(db, db.getDefaultColumnFamily());
+
+            snapshotManager = new RocksSnapshotManager(db, List.of(fullRange(defaultCf)), snapshotExecutor);
+        } catch (RocksDBException e) {
+            throw new IgniteInternalException("Failed to start the storage", e);
+        }
+    }
+
+    @Override
+    public boolean isStarted() {
+        return db != null;
+    }
+
+    @Override
+    public byte @Nullable [] get(byte[] key) {
+        try {
+            return db.get(key);
+        } catch (RocksDBException e) {
+            throw new IgniteInternalException("Unable to get data from Rocks DB", e);
+        }
+    }
+
+    @Override
+    public void put(byte[] key, byte[] value) {
+        try {
+            db.put(key, value);
+        } catch (RocksDBException e) {
+            throw new IgniteInternalException("Unable to put data into Rocks DB", e);
+        }
+    }
+
+    @Override
+    public void remove(byte[] key) {
+        try {
+            db.delete(key);
+        } catch (RocksDBException e) {
+            throw new IgniteInternalException("Unable to remove data from Rocks DB", e);
+        }
+    }
+
+    @Override
+    public <T> Cursor<T> getWithPrefix(byte[] prefix, BiFunction<byte[], byte[], T> entryTransformer) {
+        byte[] upperBound = prefix.clone();
+
+        upperBound[upperBound.length - 1] += 1;
+
+        Slice upperBoundSlice = new Slice(upperBound);
+
+        ReadOptions options = new ReadOptions().setIterateUpperBound(upperBoundSlice);
+
+        RocksIterator it = db.newIterator(options);
+
+        it.seek(prefix);
+
+        return new RocksIteratorAdapter<>(it) {
+            @Override
+            protected T decodeEntry(byte[] key, byte[] value) {
+                return entryTransformer.apply(key, value);
+            }
+
+            @Override
+            public void close() throws Exception {
+                super.close();
+
+                IgniteUtils.closeAll(options, upperBoundSlice);

Review Comment:
   yeah, but it should be a rare case and GC should take care of the resources for us, so they will not be hanging around for long
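   To illustrate the trade-off being discussed, here is a minimal sketch (not part of the PR) of a getWithPrefix variant that closes the native ReadOptions and Slice eagerly if the iterator cannot be created, instead of leaving that rare failure path to the GC. It assumes the same imports, the db field and the RocksIteratorAdapter/IgniteUtils helpers shown in the hunk above; the local is renamed to readOptions only to avoid shadowing the options field:

    // Illustrative sketch only: same behaviour as above on the happy path,
    // but releases the native handles if iterator creation or seek fails.
    @Override
    public <T> Cursor<T> getWithPrefix(byte[] prefix, BiFunction<byte[], byte[], T> entryTransformer) {
        byte[] upperBound = prefix.clone();

        upperBound[upperBound.length - 1] += 1;

        Slice upperBoundSlice = new Slice(upperBound);

        ReadOptions readOptions = new ReadOptions().setIterateUpperBound(upperBoundSlice);

        try {
            RocksIterator it = db.newIterator(readOptions);

            it.seek(prefix);

            return new RocksIteratorAdapter<>(it) {
                @Override
                protected T decodeEntry(byte[] key, byte[] value) {
                    return entryTransformer.apply(key, value);
                }

                @Override
                public void close() throws Exception {
                    super.close();

                    // Normal path: native handles are released together with the iterator.
                    IgniteUtils.closeAll(readOptions, upperBoundSlice);
                }
            };
        } catch (RuntimeException | Error e) {
            // Rare failure path discussed above: close the handles instead of waiting for GC.
            readOptions.close();
            upperBoundSlice.close();

            throw e;
        }
    }

   Either way the happy path is identical; the only difference is whether that rare failure path relies on the GC or closes the two handles explicitly.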
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
