sashapolo commented on code in PR #960:
URL: https://github.com/apache/ignite-3/pull/960#discussion_r934509166
##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -31,6 +33,43 @@
* <p>Each MvPartitionStorage instance represents exactly one partition.
*/
public interface MvPartitionStorage extends AutoCloseable {
+ /**
+ * Closure for executing write operations on the storage.
+ *
+ * @param <E> Type of exception that could be thrown within a closure.
+ * @param <V> Type of the result returned from the closure.
+ */
+ @SuppressWarnings("PublicInnerClass")
+ @FunctionalInterface
+ interface DataAccessClosure<E extends Exception, V> {
+ V execute() throws E, StorageException;
+ }
+
+ /**
+ * Executes {@link DataAccessClosure} atomically, maening that partial
result of the incompleted closure will never be written to a
Review Comment:
```suggestion
* Executes a {@link DataAccessClosure} atomically, meaning that partial
result of an incomplete closure will never be written to the
```
##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -31,6 +33,43 @@
* <p>Each MvPartitionStorage instance represents exactly one partition.
*/
public interface MvPartitionStorage extends AutoCloseable {
+ /**
+ * Closure for executing write operations on the storage.
+ *
+ * @param <E> Type of exception that could be thrown within a closure.
+ * @param <V> Type of the result returned from the closure.
+ */
+ @SuppressWarnings("PublicInnerClass")
+ @FunctionalInterface
+ interface DataAccessClosure<E extends Exception, V> {
Review Comment:
If this closure represents a write operation, maybe it should be called
`WriteClosure`?
##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -31,6 +33,43 @@
* <p>Each MvPartitionStorage instance represents exactly one partition.
*/
public interface MvPartitionStorage extends AutoCloseable {
+ /**
+ * Closure for executing write operations on the storage.
+ *
+ * @param <E> Type of exception that could be thrown within a closure.
+ * @param <V> Type of the result returned from the closure.
+ */
+ @SuppressWarnings("PublicInnerClass")
+ @FunctionalInterface
+ interface DataAccessClosure<E extends Exception, V> {
Review Comment:
Why do you need to declare a generic exception here? It looks like none of the
implementations throw checked exceptions, so maybe we should get rid of it?
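For illustration only, a minimal sketch of what the closure could look like without the exception type parameter. It assumes `StorageException` is unchecked; `SketchStorageException` is a hypothetical stand-in for it, and `WriteClosure` is simply the name floated in the other comment on this interface, not something the PR defines.

```java
/** Hypothetical stand-in for StorageException; assumed to be unchecked. */
class SketchStorageException extends RuntimeException {
    SketchStorageException(String message) {
        super(message);
    }
}

/** Simplified closure: only the result type stays generic. */
@FunctionalInterface
interface WriteClosure<V> {
    // Unchecked exceptions need no throws clause, so the E parameter disappears.
    V execute();
}

class WriteClosureSketch {
    /** Mirrors the default runConsistently from the diff, minus the E parameter. */
    static <V> V runConsistently(WriteClosure<V> closure) {
        return closure.execute();
    }

    public static void main(String[] args) {
        Integer result = runConsistently(() -> 42);
        System.out.println(result);
    }
}
```

Callers that really do need a checked exception would have to wrap it, which is the trade-off of this simplification.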
##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -31,6 +33,43 @@
* <p>Each MvPartitionStorage instance represents exactly one partition.
*/
public interface MvPartitionStorage extends AutoCloseable {
+ /**
+ * Closure for executing write operations on the storage.
+ *
+ * @param <E> Type of exception that could be thrown within a closure.
+ * @param <V> Type of the result returned from the closure.
+ */
+ @SuppressWarnings("PublicInnerClass")
+ @FunctionalInterface
+ interface DataAccessClosure<E extends Exception, V> {
+ V execute() throws E, StorageException;
+ }
+
+ /**
+ * Executes {@link DataAccessClosure} atomically, maening that partial
result of the incompleted closure will never be written to a
+ * physical device, thus guaranteeing data consistency after restart.
Simply runs the closure in case of a volatile storage.
+ *
+ * @param closure Data access closure to be executed.
+ * @param <E> Type of exception that could be thrown within a closure.
+ * @param <V> Type of the result returned from the closure.
+ * @return Closure result.
+ * @throws E If closure thrown exception.
+ * @throws StorageException If failed to write data to the storage.
+ */
+ default <E extends Exception, V> V runConsistently(DataAccessClosure<E, V>
closure) throws E, StorageException {
+ return closure.execute();
+ }
+
+ /**
+ * Flushes current state of the data or <i>the state from the nearest
future</i> to the storage. It means that the future can be
+ * completed with the data that has not been written yet. This feature
allows implementing a batch flush for several partitions at once.
+ *
+ * @return Future that's completed when flushing of the data is completed.
+ */
+ default CompletionStage<Void> flush() {
Review Comment:
I would recommend using `CompletableFuture` here
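One possible motivation, sketched with hypothetical method names: `CompletionStage` has no `join()` and cannot be passed to `CompletableFuture.allOf` without a conversion, which matters if callers want to wait for several partition flushes at once.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

class FlushReturnTypeSketch {
    /** Hypothetical flush() variant returning the narrower interface. */
    static CompletionStage<Void> flushAsStage() {
        return CompletableFuture.completedFuture(null);
    }

    /** Hypothetical flush() variant returning CompletableFuture directly. */
    static CompletableFuture<Void> flushAsFuture() {
        return CompletableFuture.completedFuture(null);
    }

    /** Waits for two flushes; returns true once both have completed. */
    static boolean flushBoth() {
        CompletableFuture<Void> all = CompletableFuture.allOf(flushAsFuture(), flushAsFuture());
        all.join();
        return all.isDone();
    }

    public static void main(String[] args) {
        // With CompletionStage, blocking requires an explicit conversion first.
        flushAsStage().toCompletableFuture().join();

        // With CompletableFuture, results can be combined and awaited directly.
        System.out.println(flushBoth());
    }
}
```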
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -639,7 +768,21 @@ public void destroy() {
/** {@inheritDoc} */
@Override
public void close() throws Exception {
- IgniteUtils.closeAll(writeOpts, upperBound);
+ for (CompletableFuture<Void> future : flushFutures.values()) {
+ future.completeExceptionally(new StorageException("Can't complete
flush operation, partition is being stopped."));
Review Comment:
Should we cancel the future instead?
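To make the difference concrete, here is a small sketch that uses only `java.util.concurrent`. The class name is illustrative, and a plain `IllegalStateException` stands in for `StorageException`. A waiter that calls `join()` sees a `CancellationException` after `cancel`, and a `CompletionException` wrapping the cause after `completeExceptionally`.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class FlushFutureCloseSketch {
    /** Describes how a waiter calling join() observes the future's outcome. */
    static String observe(CompletableFuture<Void> future) {
        try {
            future.join();
            return "completed";
        } catch (CancellationException e) {
            return "cancelled";
        } catch (CompletionException e) {
            return "failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("partition is being stopped"));

        CompletableFuture<Void> cancelled = new CompletableFuture<>();
        cancelled.cancel(false);

        System.out.println(observe(failed));     // failed: partition is being stopped
        System.out.println(observe(cancelled));  // cancelled
    }
}
```

Cancellation makes `isCancelled()` return true but carries no message, so the explanatory text in the current code would be lost.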
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java:
##########
@@ -173,23 +231,121 @@ public void start() throws StorageException {
partitions = new AtomicReferenceArray<>(tableCfg.value().partitions());
for (int partId : meta.getPartitionIds()) {
- partitions.set(partId, new RocksDbPartitionStorage(db,
partitionCf, partId, threadPool));
+ partitions.set(partId, new RocksDbMvPartitionStorage(this,
partId));
}
}
+ /**
+ * Schedules a flush of the table. If run several times within a small
amount of time, only the last scheduled flush will be executed.
+ */
+ public void scheduleFlush() {
+ Runnable newClosure = new Runnable() {
+ @Override
+ public void run() {
+ if (latestFlushClosure != this) {
+ return;
+ }
+
+ try (FlushOptions flushOptions = new
FlushOptions().setWaitForFlush(false)) {
+ db.flush(flushOptions);
+ } catch (RocksDBException e) {
+ LOG.error("Error occurred during the explicit flush for
table '{}'", e, tableCfg.name());
+ }
+ }
+ };
+
+ latestFlushClosure = newClosure;
+
+ int delay = engine.engineConfiguration().flushDelay().value();
+
+ engine.scheduledPool().schedule(newClosure, delay,
TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Returns a listener of RocksDB flush events. This listener is
responsible for updating persisted index of partitions.
+ *
+ * @see RocksDbMvPartitionStorage#persistedIndex()
+ * @see RocksDbMvPartitionStorage#refreshPersistedIndex()
+ */
+ private AbstractEventListener flushListener() {
Review Comment:
Is it possible to extract this listener into a separate class?
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java:
##########
@@ -47,70 +54,118 @@
import org.apache.ignite.internal.storage.rocksdb.index.BinaryRowComparator;
import
org.apache.ignite.internal.storage.rocksdb.index.RocksDbSortedIndexStorage;
import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.jetbrains.annotations.Nullable;
+import org.rocksdb.AbstractEventListener;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
+import org.rocksdb.FlushJobInfo;
+import org.rocksdb.FlushOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
/**
* Table storage implementation based on {@link RocksDB} instance.
*/
class RocksDbTableStorage implements TableStorage, MvTableStorage {
+ /** Logger. */
+ private static final IgniteLogger LOG =
Loggers.forClass(RocksDbTableStorage.class);
+
+ /** RocksDB storage engine instance. */
+ private final RocksDbStorageEngine engine;
+
/** Path for the directory that stores table data. */
private final Path tablePath;
/** Table configuration. */
private final TableConfiguration tableCfg;
- /** Thread pool for async operations. */
- private final Executor threadPool;
-
/** Data region for the table. */
private final RocksDbDataRegion dataRegion;
/** Rocks DB instance. */
private volatile RocksDB db;
+ /** Write options for write operations. */
+ private final WriteOptions writeOptions = new
WriteOptions().setDisableWAL(true);
+
/** Meta information. */
private volatile RocksDbMetaStorage meta;
/** Column Family handle for partition data. */
private volatile ColumnFamily partitionCf;
/** Partition storages. */
- private volatile AtomicReferenceArray<PartitionStorage> partitions;
+ private volatile AtomicReferenceArray<RocksDbMvPartitionStorage>
partitions;
/** Column families for indexes by their names. */
private final Map<String, RocksDbSortedIndexStorage> sortedIndices = new
ConcurrentHashMap<>();
+ /**
+ * Instance of the latest scheduled flush closure.
+ *
+ * @see #scheduleFlush()
+ */
+ private volatile Runnable latestFlushClosure;
+
/** Flag indicating if the storage has been stopped. */
+ @Deprecated
private volatile boolean stopped = false;
+ //TODO Use it instead of the "stopped" flag.
Review Comment:
Why not do it now?
##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java:
##########
@@ -31,6 +33,43 @@
* <p>Each MvPartitionStorage instance represents exactly one partition.
*/
public interface MvPartitionStorage extends AutoCloseable {
+ /**
+ * Closure for executing write operations on the storage.
+ *
+ * @param <E> Type of exception that could be thrown within a closure.
+ * @param <V> Type of the result returned from the closure.
+ */
+ @SuppressWarnings("PublicInnerClass")
+ @FunctionalInterface
+ interface DataAccessClosure<E extends Exception, V> {
+ V execute() throws E, StorageException;
+ }
+
+ /**
+ * Executes {@link DataAccessClosure} atomically, maening that partial
result of the incompleted closure will never be written to a
+ * physical device, thus guaranteeing data consistency after restart.
Simply runs the closure in case of a volatile storage.
+ *
+ * @param closure Data access closure to be executed.
+ * @param <E> Type of exception that could be thrown within a closure.
+ * @param <V> Type of the result returned from the closure.
+ * @return Closure result.
+ * @throws E If closure thrown exception.
+ * @throws StorageException If failed to write data to the storage.
+ */
+ default <E extends Exception, V> V runConsistently(DataAccessClosure<E, V>
closure) throws E, StorageException {
+ return closure.execute();
+ }
+
+ /**
+ * Flushes current state of the data or <i>the state from the nearest
future</i> to the storage. It means that the future can be
Review Comment:
> It means that the future can be completed with the data that has not been written yet

What do you mean? This future does not hold any data.
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/configuration/schema/RocksDbStorageEngineConfigurationSchema.java:
##########
@@ -34,6 +35,9 @@ public class RocksDbStorageEngineConfigurationSchema {
/** Name of the default data region. */
public static final String DEFAULT_DATA_REGION_NAME = "default";
+ @Value(hasDefault = true)
+ public int flushDelay = 100;
Review Comment:
It is better to specify the unit as a part of the variable name
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -385,6 +509,7 @@ public Cursor<BinaryRow> scan(Predicate<BinaryRow>
keyFilter, Timestamp timestam
return scan(keyFilter, timestamp, null);
}
+ //TODO Integrate write batch? Idk
Review Comment:
???
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -627,10 +752,14 @@ public void forEach(BiConsumer<RowId, BinaryRow>
consumer) {
/**
* Deletes partition data from the storage.
+ *
+ * @param writeBatch Write batch to add delete requests to.
*/
- public void destroy() {
+ public void destroy(WriteBatch writeBatch) {
Review Comment:
I'm not sure, but wouldn't it be better to remove the partition ID from the
meta CF here as well? This way we would not have to accept a `WriteBatch`.
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java:
##########
@@ -69,6 +70,11 @@ public class RocksDbStorageEngine implements StorageEngine {
new NamedThreadFactory("rocksdb-storage-engine-pool", LOG)
);
+ private final ScheduledExecutorService scheduledPool =
Executors.newScheduledThreadPool(
Review Comment:
You can use `newSingleThreadScheduledExecutor` here.
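A rough sketch of how that could look, assuming the pool only serves the delayed flush tasks from `scheduleFlush()`. The class is hypothetical and a counter stands in for `db.flush(...)`; the "only the latest closure runs" check is copied from the diff.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SingleThreadFlushSchedulerSketch {
    private final ScheduledExecutorService scheduledPool = Executors.newSingleThreadScheduledExecutor();

    /** Counts flushes that actually ran; a stand-in for db.flush(...). */
    private final AtomicInteger flushCount = new AtomicInteger();

    private volatile Runnable latestFlushClosure;

    void scheduleFlush(long delayMillis) {
        Runnable newClosure = new Runnable() {
            @Override
            public void run() {
                // Skip if a newer flush has been scheduled in the meantime.
                if (latestFlushClosure != this) {
                    return;
                }
                flushCount.incrementAndGet();
            }
        };

        latestFlushClosure = newClosure;
        scheduledPool.schedule(newClosure, delayMillis, TimeUnit.MILLISECONDS);
    }

    /** Lets already scheduled tasks finish, then reports how many flushed. */
    int stopAndCountFlushes() {
        scheduledPool.shutdown();
        try {
            scheduledPool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return flushCount.get();
    }

    public static void main(String[] args) {
        SingleThreadFlushSchedulerSketch sketch = new SingleThreadFlushSchedulerSketch();
        sketch.scheduleFlush(200);
        sketch.scheduleFlush(200);
        sketch.scheduleFlush(200);
        System.out.println(sketch.stopAndCountFlushes());
    }
}
```

Since most scheduled closures return immediately, a single thread is likely enough here; that is an assumption about the workload, not something measured.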
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -102,61 +120,105 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
private final ColumnFamilyHandle meta;
/** Write options. */
- private final WriteOptions writeOpts = new WriteOptions();
+ private final WriteOptions writeOpts = new
WriteOptions().setDisableWAL(true);
+
+ /** Read options for regular reads. */
+ private final ReadOptions readOpts = new ReadOptions();
+
+ /** Read options for reading persisted data. */
+ private final ReadOptions persistedTierReadOpts = new
ReadOptions().setReadTier(PERSISTED_TIER);
/** Upper bound for scans and reads. */
private final Slice upperBound;
/** Key to store applied index value in meta. */
- private byte[] lastAppliedIndexKey;
+ private final byte[] lastAppliedIndexKey;
+
+ /** On-heap-cached last applied index value. */
+ private volatile long lastAppliedIndex;
+
+ /** The value of {@link #lastAppliedIndex} persisted to the device at this
moment. */
+ private volatile long persistedIndex;
+
+ /** Map with flush futures by applied index at the time of the {@link
#flush()} call. */
+ private final ConcurrentMap<Long, CompletableFuture<Void>> flushFutures =
new ConcurrentHashMap<>();
Review Comment:
I would recommend renaming this field to `flushFuturesByAppliedIndex`.
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java:
##########
@@ -82,6 +88,27 @@ public
RocksDbStorageEngine(RocksDbStorageEngineConfiguration engineConfig, Path
this.storagePath = storagePath;
}
+ /**
+ * Returns a RocksDB storage engine configuration.
+ */
+ public RocksDbStorageEngineConfiguration engineConfiguration() {
Review Comment:
I think this method can be called `configuration`
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -102,61 +120,105 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
private final ColumnFamilyHandle meta;
/** Write options. */
- private final WriteOptions writeOpts = new WriteOptions();
+ private final WriteOptions writeOpts = new
WriteOptions().setDisableWAL(true);
+
+ /** Read options for regular reads. */
+ private final ReadOptions readOpts = new ReadOptions();
+
+ /** Read options for reading persisted data. */
+ private final ReadOptions persistedTierReadOpts = new
ReadOptions().setReadTier(PERSISTED_TIER);
/** Upper bound for scans and reads. */
private final Slice upperBound;
/** Key to store applied index value in meta. */
- private byte[] lastAppliedIndexKey;
+ private final byte[] lastAppliedIndexKey;
+
+ /** On-heap-cached last applied index value. */
+ private volatile long lastAppliedIndex;
+
+ /** The value of {@link #lastAppliedIndex} persisted to the device at this
moment. */
+ private volatile long persistedIndex;
+
+ /** Map with flush futures by applied index at the time of the {@link
#flush()} call. */
+ private final ConcurrentMap<Long, CompletableFuture<Void>> flushFutures =
new ConcurrentHashMap<>();
+
Review Comment:
Unnecessary empty line.
##########
modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorageTest.java:
##########
@@ -127,55 +128,61 @@ public void tearDown() throws Exception {
*/
@Test
void testCreatePartition() {
- PartitionStorage partitionStorage = storage.getPartition(0);
+ MvPartitionStorage absentStorage = storage.getMvPartition(0);
- assertThat(partitionStorage, is(nullValue()));
+ assertThat(absentStorage, is(nullValue()));
- partitionStorage = storage.getOrCreatePartition(0);
+ final MvPartitionStorage partitionStorage =
storage.getOrCreateMvPartition(0);
Review Comment:
final?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.