ibessonov commented on code in PR #960:
URL: https://github.com/apache/ignite-3/pull/960#discussion_r935320136
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java:
##########
@@ -173,23 +231,121 @@ public void start() throws StorageException {
partitions = new AtomicReferenceArray<>(tableCfg.value().partitions());
for (int partId : meta.getPartitionIds()) {
- partitions.set(partId, new RocksDbPartitionStorage(db, partitionCf, partId, threadPool));
+ partitions.set(partId, new RocksDbMvPartitionStorage(this, partId));
}
}
+ /**
+ * Schedules a flush of the table. If run several times within a small amount of time, only the last scheduled flush will be executed.
+ */
+ public void scheduleFlush() {
+ Runnable newClosure = new Runnable() {
+ @Override
+ public void run() {
+ if (latestFlushClosure != this) {
+ return;
+ }
+
+ try (FlushOptions flushOptions = new FlushOptions().setWaitForFlush(false)) {
+ db.flush(flushOptions);
+ } catch (RocksDBException e) {
+ LOG.error("Error occurred during the explicit flush for table '{}'", e, tableCfg.name());
+ }
+ }
+ };
+
+ latestFlushClosure = newClosure;
+
+ int delay = engine.engineConfiguration().flushDelay().value();
+
+ engine.scheduledPool().schedule(newClosure, delay, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Returns a listener of RocksDB flush events. This listener is responsible for updating persisted index of partitions.
+ *
+ * @see RocksDbMvPartitionStorage#persistedIndex()
+ * @see RocksDbMvPartitionStorage#refreshPersistedIndex()
+ */
+ private AbstractEventListener flushListener() {
Review Comment:
Yes
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]