tkalkirill commented on code in PR #1530:
URL: https://github.com/apache/ignite-3/pull/1530#discussion_r1081129036


##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1405,4 +1454,84 @@ private <V> V busy(Supplier<V> supplier) {
             busyLock.leaveBusy();
         }
     }
+
+    /**
+     * Creates a summary info of the storage in the format "table=user, partitionId=1".
+     */
+    String createStorageInfo() {
+        return IgniteStringFormatter.format("table={}, partitionId={}", 
tableStorage.getTableName(), partitionId);
+    }
+
+    /**
+     * Prepares the storage for rebalancing.
+     *
+     * @throws StorageRebalanceException If there was an error when starting the rebalance.
+     */
+    void startRebalance(WriteBatch writeBatch) {
+        if (!state.compareAndSet(StorageState.RUNNABLE, 
StorageState.REBALANCE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), 
createStorageInfo());
+        }
+
+        // Changed storage states and expect all storage operations to stop soon.
+        busyLock.block();
+
+        try {
+            clearStorageOnRebalance(writeBatch, REBALANCE_IN_PROGRESS, 
REBALANCE_IN_PROGRESS);
+        } catch (RocksDBException e) {
+            throw new StorageRebalanceException("Error when trying to start 
rebalancing storage: " + createStorageInfo(), e);
+        } finally {
+            busyLock.unblock();
+        }
+    }
+
+    /**
+     * Aborts storage rebalancing.
+     *
+     * @throws StorageRebalanceException If there was an error when aborting the rebalance.
+     */
+    void abortReblance(WriteBatch writeBatch) {
+        if (!state.compareAndSet(StorageState.REBALANCE, 
StorageState.RUNNABLE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), 
createStorageInfo());
+        }
+
+        try {
+            clearStorageOnRebalance(writeBatch, 0, 0);
+        } catch (RocksDBException e) {
+            throw new StorageRebalanceException("Error when trying to abort 
rebalancing storage: " + createStorageInfo(), e);
+        }
+    }
+
+    /**
+     * Completes storage rebalancing.
+     *
+     * @throws StorageRebalanceException If there was an error when finishing the rebalance.
+     */
+    void finishRebalance(WriteBatch writeBatch, long lastAppliedIndex, long 
lastAppliedTerm) {
+        if (!state.compareAndSet(StorageState.REBALANCE, 
StorageState.RUNNABLE)) {
+            throwExceptionDependingOnStorageStateOnRebalance(state.get(), 
createStorageInfo());
+        }
+
+        try {
+            lastAppliedOnRebalance(writeBatch, lastAppliedIndex, 
lastAppliedTerm);
+        } catch (RocksDBException e) {
+            throw new StorageRebalanceException("Error when trying to abort 
rebalancing storage: " + createStorageInfo(), e);
+        }
+    }
+
+    private void clearStorageOnRebalance(WriteBatch writeBatch, long 
lastAppliedIndex, long lastAppliedTerm) throws RocksDBException {
+        lastAppliedOnRebalance(writeBatch, lastAppliedIndex, lastAppliedTerm);
+
+        writeBatch.delete(meta, lastGroupConfigKey);
+        writeBatch.delete(meta, partitionIdKey(partitionId));
+        writeBatch.deleteRange(cf, partitionStartPrefix(), 
partitionEndPrefix());
+    }
+
+    private void lastAppliedOnRebalance(WriteBatch writeBatch, long 
lastAppliedIndex, long lastAppliedTerm) throws RocksDBException {

Review Comment:
   Why?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to