tkalkirill commented on code in PR #1530:
URL: https://github.com/apache/ignite-3/pull/1530#discussion_r1081129036
##########
modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java:
##########
@@ -1405,4 +1454,84 @@ private <V> V busy(Supplier<V> supplier) {
busyLock.leaveBusy();
}
}
+
+ /**
+  * Builds a short description of this storage, for example "table=user, partitionId=1".
+  *
+  * @return Table name and partition ID rendered through the {@code table={}, partitionId={}} pattern.
+  */
+ String createStorageInfo() {
+     String pattern = "table={}, partitionId={}";
+
+     return IgniteStringFormatter.format(pattern, tableStorage.getTableName(), partitionId);
+ }
+
+ /**
+  * Prepares the storage for rebalancing: switches the state from {@code RUNNABLE} to {@code REBALANCE}, then, with the busy lock
+  * blocked, adds the storage cleanup to the write batch, passing {@code REBALANCE_IN_PROGRESS} as both the last applied index
+  * and the last applied term.
+  *
+  * @param writeBatch Write batch that receives the cleanup operations.
+  * @throws StorageRebalanceException If there was an error when starting the rebalance.
+  */
+ void startRebalance(WriteBatch writeBatch) {
+     boolean switchedToRebalance = state.compareAndSet(StorageState.RUNNABLE, StorageState.REBALANCE);
+
+     if (!switchedToRebalance) {
+         throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+     }
+
+     // The state has already been changed, so all storage operations are expected to stop soon.
+     busyLock.block();
+
+     try {
+         clearStorageOnRebalance(writeBatch, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
+     } catch (RocksDBException ex) {
+         String message = "Error when trying to start rebalancing storage: " + createStorageInfo();
+
+         throw new StorageRebalanceException(message, ex);
+     } finally {
+         // The busy lock is released whether or not the cleanup was added successfully.
+         busyLock.unblock();
+     }
+ }
+
+ /**
+  * Aborts storage rebalancing: switches the state from {@code REBALANCE} back to {@code RUNNABLE} and adds the storage cleanup to
+  * the write batch, passing {@code 0} as both the last applied index and the last applied term.
+  *
+  * <p>NOTE(review): the method name is misspelled ("Reblance"); it is kept as is because callers refer to it by this name.
+  *
+  * @param writeBatch Write batch that receives the cleanup operations.
+  * @throws StorageRebalanceException If there was an error when aborting the rebalance.
+  */
+ void abortReblance(WriteBatch writeBatch) {
+     boolean switchedToRunnable = state.compareAndSet(StorageState.REBALANCE, StorageState.RUNNABLE);
+
+     if (!switchedToRunnable) {
+         throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+     }
+
+     try {
+         clearStorageOnRebalance(writeBatch, 0, 0);
+     } catch (RocksDBException ex) {
+         String message = "Error when trying to abort rebalancing storage: " + createStorageInfo();
+
+         throw new StorageRebalanceException(message, ex);
+     }
+ }
+
+ /**
+  * Completes storage rebalancing: switches the state from {@code REBALANCE} back to {@code RUNNABLE} and hands the given last
+  * applied index and term, together with the write batch, to {@code lastAppliedOnRebalance}.
+  *
+  * @param writeBatch Write batch passed on to {@code lastAppliedOnRebalance}.
+  * @param lastAppliedIndex Last applied index to record for the rebalanced storage.
+  * @param lastAppliedTerm Last applied term to record for the rebalanced storage.
+  * @throws StorageRebalanceException If there was an error when finishing the rebalance.
+  */
+ void finishRebalance(WriteBatch writeBatch, long lastAppliedIndex, long lastAppliedTerm) {
+     if (!state.compareAndSet(StorageState.REBALANCE, StorageState.RUNNABLE)) {
+         throwExceptionDependingOnStorageStateOnRebalance(state.get(), createStorageInfo());
+     }
+
+     try {
+         lastAppliedOnRebalance(writeBatch, lastAppliedIndex, lastAppliedTerm);
+     } catch (RocksDBException e) {
+         // The message must name the operation that failed: this is the finish path, not the abort path.
+         throw new StorageRebalanceException("Error when trying to finish rebalancing storage: " + createStorageInfo(), e);
+     }
+ }
+
+ /**
+  * Adds the rebalance cleanup operations to the write batch: delegates the given last applied index and term to
+  * {@code lastAppliedOnRebalance}, then queues the deletion of two {@code meta} keys and of a key range in {@code cf}.
+  *
+  * @param writeBatch Write batch that receives the operations.
+  * @param lastAppliedIndex Last applied index forwarded to {@code lastAppliedOnRebalance}.
+  * @param lastAppliedTerm Last applied term forwarded to {@code lastAppliedOnRebalance}.
+  * @throws RocksDBException Declared by the signature; presumably raised by the write batch operations (to be confirmed).
+  */
+ private void clearStorageOnRebalance(WriteBatch writeBatch, long
lastAppliedIndex, long lastAppliedTerm) throws RocksDBException {
+ lastAppliedOnRebalance(writeBatch, lastAppliedIndex, lastAppliedTerm);
+
+ // Remove the entries stored in "meta" under the last group config key and under this partition's ID key.
+ writeBatch.delete(meta, lastGroupConfigKey);
+ writeBatch.delete(meta, partitionIdKey(partitionId));
+ // Remove every key of "cf" between the partition start prefix and the partition end prefix.
+ writeBatch.deleteRange(cf, partitionStartPrefix(),
partitionEndPrefix());
+ }
+
+ private void lastAppliedOnRebalance(WriteBatch writeBatch, long
lastAppliedIndex, long lastAppliedTerm) throws RocksDBException {
Review Comment:
Why?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]