ibessonov commented on code in PR #1478:
URL: https://github.com/apache/ignite-3/pull/1478#discussion_r1057730028


##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -93,8 +115,12 @@ public class TxStateRocksDbStorage implements 
TxStateStorage {
     /** The value of {@link #lastAppliedIndex} persisted to the device at this 
moment. */
     private volatile long persistedIndex;
 
-    /** Database key for the last applied index+term. */
-    private final byte[] lastAppliedIndexAndTermKey;
+    /** Current state of the storage. */
+    private volatile StorageState state = StorageState.RUNNABLE;
+
+    @Nullable

Review Comment:
   Usually you put Nullable next to the type. Why is it different this time?



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -121,30 +147,28 @@ public TxStateRocksDbStorage(
                 .putShort((short) partitionId)
                 .array();
 
-        byte[] indexAndTermBytes = readLastAppliedIndexAndTerm(readOptions);
-        lastAppliedIndex = indexAndTermBytes == null ? 0 : 
bytesToLong(indexAndTermBytes);
-        lastAppliedTerm = indexAndTermBytes == null ? 0 : 
bytesToLong(indexAndTermBytes, Long.BYTES);
-
-        persistedIndex = lastAppliedIndex;
+        initLastApplied();
     }
 
     @Override
     @Nullable
     public TxMeta get(UUID txId) {
         if (!busyLock.enterBusy()) {
-            throwStorageStoppedException();
+            throwExceptionIfStorageClosedOrRebalance();

Review Comment:
   What's up with the rebalance in the exception if the storage is closed?



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -397,12 +429,18 @@ private long readLastAppliedIndex(ReadOptions 
readOptions) {
 
     @Override
     public void destroy() {
-        try (WriteBatch writeBatch = new WriteBatch()) {
-            close();
+        if (!close0()) {
+            return;
+        }
 
-            writeBatch.deleteRange(partitionStartPrefix(), 
partitionEndPrefix());
+        try {
+            try (WriteBatch writeBatch = new WriteBatch()) {

Review Comment:
   Isn't a single try enough?



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -445,32 +479,210 @@ private UUID keyToTxId(byte[] bytes) {
 
     @Override
     public void close() {
-        if (!closeGuard.compareAndSet(false, true)) {
-            return;
+        close0();
+    }
+
+    @Override
+    public CompletableFuture<Void> startRebalance() {
+        if (!STATE.compareAndSet(this, StorageState.RUNNABLE, 
StorageState.REBALANCE)) {
+            throwExceptionIfStorageClosedOrRebalance();
         }
 
         busyLock.block();
 
-        List<AbstractNativeReference> resources = new ArrayList<>(iterators);
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            writeBatch.deleteRange(partitionStartPrefix(), 
partitionEndPrefix());
+            writeBatch.put(lastAppliedIndexAndTermKey, 
indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
 
-        RocksUtils.closeAll(resources);
+            db.write(writeOptions, writeBatch);
+
+            lastAppliedIndex = REBALANCE_IN_PROGRESS;
+            lastAppliedTerm = REBALANCE_IN_PROGRESS;
+            persistedIndex = REBALANCE_IN_PROGRESS;
+
+            CompletableFuture<Void> rebalanceFuture = completedFuture(null);
+
+            this.rebalanceFuture = rebalanceFuture;
+
+            return rebalanceFuture;

Review Comment:
   What is happening? If this future is always completed, why do you need a 
field for the future at all?



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -445,32 +479,210 @@ private UUID keyToTxId(byte[] bytes) {
 
     @Override
     public void close() {
-        if (!closeGuard.compareAndSet(false, true)) {
-            return;
+        close0();
+    }
+
+    @Override
+    public CompletableFuture<Void> startRebalance() {
+        if (!STATE.compareAndSet(this, StorageState.RUNNABLE, 
StorageState.REBALANCE)) {
+            throwExceptionIfStorageClosedOrRebalance();
         }
 
         busyLock.block();
 
-        List<AbstractNativeReference> resources = new ArrayList<>(iterators);
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            writeBatch.deleteRange(partitionStartPrefix(), 
partitionEndPrefix());
+            writeBatch.put(lastAppliedIndexAndTermKey, 
indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
 
-        RocksUtils.closeAll(resources);
+            db.write(writeOptions, writeBatch);
+
+            lastAppliedIndex = REBALANCE_IN_PROGRESS;
+            lastAppliedTerm = REBALANCE_IN_PROGRESS;
+            persistedIndex = REBALANCE_IN_PROGRESS;
+
+            CompletableFuture<Void> rebalanceFuture = completedFuture(null);
+
+            this.rebalanceFuture = rebalanceFuture;
+
+            return rebalanceFuture;
+        } catch (Exception e) {
+            throw new IgniteInternalException(
+                    TX_STATE_STORAGE_REBALANCE_ERR,
+                    IgniteStringFormatter.format("Failed to clear storage for 
partition {} of table {}", partitionId, getTableName()),
+                    e
+            );
+        } finally {
+            busyLock.unblock();
+        }
     }
 
     @Override
-    public CompletableFuture<Void> startFullRebalance() {
-        // TODO: IGNITE-18024 Implement
-        throw new UnsupportedOperationException();
+    public CompletableFuture<Void> abortRebalance() {
+        CompletableFuture<Void> rebalanceFuture = (CompletableFuture<Void>) 
REBALANCE_FUTURE.getAndSet(this, null);
+
+        if (rebalanceFuture == null) {
+            return completedFuture(null);
+        }
+
+        return rebalanceFuture
+                .thenAccept(unused -> {
+                    try (WriteBatch writeBatch = new WriteBatch()) {
+                        writeBatch.deleteRange(partitionStartPrefix(), 
partitionEndPrefix());
+                        writeBatch.delete(lastAppliedIndexAndTermKey);
+
+                        db.write(writeOptions, writeBatch);
+
+                        lastAppliedIndex = 0;
+                        lastAppliedTerm = 0;
+                        persistedIndex = 0;
+
+                        state = StorageState.RUNNABLE;
+                    } catch (Exception e) {
+                        throw new IgniteInternalException(
+                                TX_STATE_STORAGE_REBALANCE_ERR,
+                                IgniteStringFormatter.format(
+                                        "Failed to clear storage for partition 
{} of table {}",
+                                        partitionId,
+                                        getTableName()
+                                ),
+                                e
+                        );
+                    }
+                });
     }
 
     @Override
-    public CompletableFuture<Void> abortFullRebalance() {
-        // TODO: IGNITE-18024 Implement
-        throw new UnsupportedOperationException();
+    public CompletableFuture<Void> finishRebalance(long lastAppliedIndex, long 
lastAppliedTerm) {
+        CompletableFuture<Void> rebalanceFuture = (CompletableFuture<Void>) 
REBALANCE_FUTURE.getAndSet(this, null);
+
+        if (rebalanceFuture == null) {
+            throw new IgniteInternalException(TX_STATE_STORAGE_REBALANCE_ERR, 
"Rebalancing has not started");
+        }
+
+        return rebalanceFuture
+                .thenAccept(unused -> {
+                    try (WriteBatch writeBatch = new WriteBatch()) {
+                        writeBatch.put(lastAppliedIndexAndTermKey, 
indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
+
+                        db.write(writeOptions, writeBatch);
+
+                        this.lastAppliedIndex = lastAppliedIndex;
+                        this.lastAppliedTerm = lastAppliedTerm;
+                        this.persistedIndex = lastAppliedIndex;
+
+                        state = StorageState.RUNNABLE;
+                    } catch (Exception e) {
+                        throw new IgniteInternalException(
+                                TX_STATE_STORAGE_REBALANCE_ERR,
+                                IgniteStringFormatter.format(
+                                        "Failed to finish rebalance for 
partition {} of table {}",
+                                        partitionId,
+                                        getTableName()
+                                ),
+                                e
+                        );
+                    }
+                });
     }
 
-    @Override
-    public CompletableFuture<Void> finishFullRebalance(long lastAppliedIndex, 
long lastAppliedTerm) {
-        // TODO: IGNITE-18024 Implement
-        throw new UnsupportedOperationException();
+    private void initLastApplied() {
+        byte[] indexAndTermBytes = readLastAppliedIndexAndTerm(readOptions);
+
+        if (indexAndTermBytes != null) {
+            long lastAppliedIndex = bytesToLong(indexAndTermBytes);
+
+            if (lastAppliedIndex == REBALANCE_IN_PROGRESS) {
+                try (WriteBatch writeBatch = new WriteBatch()) {

Review Comment:
   I wonder if there's a designated "start" method in the storage.
   Doing a recovery in the constructor?... That's a bad idea. Reading data in 
the constructor is also a bad idea, but that was my fault, I believe.



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -199,21 +221,22 @@ public boolean compareAndSet(UUID txId, @Nullable TxState 
txStateExpected, TxMet
 
                         result = true;
                     } else {
-                        result = txMetaExisting.txState() == txMeta.txState() 
&& (
-                                (txMetaExisting.commitTimestamp() == null && 
txMeta.commitTimestamp() == null)
-                                        || 
txMetaExisting.commitTimestamp().equals(txMeta.commitTimestamp()));
+                        result = txMetaExisting.txState() == txMeta.txState()
+                                && 
Objects.equals(txMetaExisting.commitTimestamp(), txMeta.commitTimestamp());
                     }
                 } else {
                     result = false;
                 }
             }
 
-            writeBatch.put(lastAppliedIndexAndTermKey, 
indexAndTermToBytes(commandIndex, commandTerm));
+            if (state != StorageState.REBALANCE) {

Review Comment:
   I think places like this deserve comments.



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -445,32 +479,210 @@ private UUID keyToTxId(byte[] bytes) {
 
     @Override
     public void close() {
-        if (!closeGuard.compareAndSet(false, true)) {
-            return;
+        close0();
+    }
+
+    @Override
+    public CompletableFuture<Void> startRebalance() {
+        if (!STATE.compareAndSet(this, StorageState.RUNNABLE, 
StorageState.REBALANCE)) {
+            throwExceptionIfStorageClosedOrRebalance();
         }
 
         busyLock.block();
 
-        List<AbstractNativeReference> resources = new ArrayList<>(iterators);
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            writeBatch.deleteRange(partitionStartPrefix(), 
partitionEndPrefix());
+            writeBatch.put(lastAppliedIndexAndTermKey, 
indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
 
-        RocksUtils.closeAll(resources);
+            db.write(writeOptions, writeBatch);
+
+            lastAppliedIndex = REBALANCE_IN_PROGRESS;
+            lastAppliedTerm = REBALANCE_IN_PROGRESS;
+            persistedIndex = REBALANCE_IN_PROGRESS;
+
+            CompletableFuture<Void> rebalanceFuture = completedFuture(null);
+
+            this.rebalanceFuture = rebalanceFuture;
+
+            return rebalanceFuture;
+        } catch (Exception e) {
+            throw new IgniteInternalException(
+                    TX_STATE_STORAGE_REBALANCE_ERR,
+                    IgniteStringFormatter.format("Failed to clear storage for 
partition {} of table {}", partitionId, getTableName()),
+                    e
+            );
+        } finally {
+            busyLock.unblock();
+        }
     }
 
     @Override
-    public CompletableFuture<Void> startFullRebalance() {
-        // TODO: IGNITE-18024 Implement
-        throw new UnsupportedOperationException();
+    public CompletableFuture<Void> abortRebalance() {
+        CompletableFuture<Void> rebalanceFuture = (CompletableFuture<Void>) 
REBALANCE_FUTURE.getAndSet(this, null);
+
+        if (rebalanceFuture == null) {
+            return completedFuture(null);
+        }
+
+        return rebalanceFuture
+                .thenAccept(unused -> {
+                    try (WriteBatch writeBatch = new WriteBatch()) {
+                        writeBatch.deleteRange(partitionStartPrefix(), 
partitionEndPrefix());
+                        writeBatch.delete(lastAppliedIndexAndTermKey);
+
+                        db.write(writeOptions, writeBatch);
+
+                        lastAppliedIndex = 0;
+                        lastAppliedTerm = 0;
+                        persistedIndex = 0;
+
+                        state = StorageState.RUNNABLE;
+                    } catch (Exception e) {
+                        throw new IgniteInternalException(
+                                TX_STATE_STORAGE_REBALANCE_ERR,
+                                IgniteStringFormatter.format(
+                                        "Failed to clear storage for partition 
{} of table {}",
+                                        partitionId,
+                                        getTableName()
+                                ),
+                                e
+                        );
+                    }
+                });
     }
 
     @Override
-    public CompletableFuture<Void> abortFullRebalance() {
-        // TODO: IGNITE-18024 Implement
-        throw new UnsupportedOperationException();
+    public CompletableFuture<Void> finishRebalance(long lastAppliedIndex, long 
lastAppliedTerm) {
+        CompletableFuture<Void> rebalanceFuture = (CompletableFuture<Void>) 
REBALANCE_FUTURE.getAndSet(this, null);
+
+        if (rebalanceFuture == null) {
+            throw new IgniteInternalException(TX_STATE_STORAGE_REBALANCE_ERR, 
"Rebalancing has not started");
+        }
+
+        return rebalanceFuture
+                .thenAccept(unused -> {
+                    try (WriteBatch writeBatch = new WriteBatch()) {
+                        writeBatch.put(lastAppliedIndexAndTermKey, 
indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
+
+                        db.write(writeOptions, writeBatch);
+
+                        this.lastAppliedIndex = lastAppliedIndex;
+                        this.lastAppliedTerm = lastAppliedTerm;
+                        this.persistedIndex = lastAppliedIndex;
+
+                        state = StorageState.RUNNABLE;
+                    } catch (Exception e) {
+                        throw new IgniteInternalException(
+                                TX_STATE_STORAGE_REBALANCE_ERR,
+                                IgniteStringFormatter.format(
+                                        "Failed to finish rebalance for 
partition {} of table {}",
+                                        partitionId,
+                                        getTableName()
+                                ),
+                                e
+                        );
+                    }
+                });
     }
 
-    @Override
-    public CompletableFuture<Void> finishFullRebalance(long lastAppliedIndex, 
long lastAppliedTerm) {
-        // TODO: IGNITE-18024 Implement
-        throw new UnsupportedOperationException();
+    private void initLastApplied() {
+        byte[] indexAndTermBytes = readLastAppliedIndexAndTerm(readOptions);
+
+        if (indexAndTermBytes != null) {
+            long lastAppliedIndex = bytesToLong(indexAndTermBytes);
+
+            if (lastAppliedIndex == REBALANCE_IN_PROGRESS) {
+                try (WriteBatch writeBatch = new WriteBatch()) {
+                    writeBatch.deleteRange(partitionStartPrefix(), 
partitionEndPrefix());
+                    writeBatch.delete(lastAppliedIndexAndTermKey);
+
+                    db.write(writeOptions, writeBatch);
+                } catch (Exception e) {
+                    throw new IgniteInternalException(
+                            TX_STATE_STORAGE_REBALANCE_ERR,
+                            IgniteStringFormatter.format(
+                                    "Failed to clear storage for partition {} 
of table {}",
+                                    partitionId,
+                                    getTableName()
+                            ),
+                            e
+                    );
+                }
+            } else {
+                this.lastAppliedIndex = lastAppliedIndex;
+                persistedIndex = lastAppliedIndex;
+
+                lastAppliedTerm = bytesToLong(indexAndTermBytes, Long.BYTES);
+            }
+        }
+    }
+
+    private boolean close0() {

Review Comment:
   Not the best name for a method that performs so many complex actions.



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -57,6 +61,24 @@
  * Tx state storage implementation based on RocksDB.
  */
 public class TxStateRocksDbStorage implements TxStateStorage {
+    private static final VarHandle STATE;
+
+    private static final VarHandle REBALANCE_FUTURE;
+
+    static {
+        try {
+            STATE = 
MethodHandles.lookup().findVarHandle(TxStateRocksDbStorage.class, "state", 
StorageState.class);

Review Comment:
   What's the problem with using atomics?
   Are you sure that placing these values inside of the object is beneficial? 
Do you have proof?
   I'd prefer code simplicity if the advantage is not clear.



##########
modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/BusyRocksIteratorAdapter.java:
##########
@@ -42,21 +42,30 @@ protected BusyRocksIteratorAdapter(IgniteSpinBusyLock 
busyLock, RocksIterator it
     /**
      * Handles busy lock acquiring failure. This means that db has been 
stopped and cursor can't proceed. Must throw an exception.
      */
-    protected abstract void handleBusy();
+    protected abstract void handleBusyFail();
 
-    private void handleBusy0() {
-        handleBusy();
+    /**
+     * Handles busy lock acquiring success.
+     */
+    protected void handeBusySuccess() {

Review Comment:
   Why did you introduce this method? No one uses it. Can you please explain it?



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -445,32 +479,210 @@ private UUID keyToTxId(byte[] bytes) {
 
     @Override
     public void close() {
-        if (!closeGuard.compareAndSet(false, true)) {
-            return;
+        close0();
+    }
+
+    @Override
+    public CompletableFuture<Void> startRebalance() {
+        if (!STATE.compareAndSet(this, StorageState.RUNNABLE, 
StorageState.REBALANCE)) {
+            throwExceptionIfStorageClosedOrRebalance();
         }
 
         busyLock.block();
 
-        List<AbstractNativeReference> resources = new ArrayList<>(iterators);
+        try (WriteBatch writeBatch = new WriteBatch()) {
+            writeBatch.deleteRange(partitionStartPrefix(), 
partitionEndPrefix());
+            writeBatch.put(lastAppliedIndexAndTermKey, 
indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
 
-        RocksUtils.closeAll(resources);
+            db.write(writeOptions, writeBatch);
+
+            lastAppliedIndex = REBALANCE_IN_PROGRESS;
+            lastAppliedTerm = REBALANCE_IN_PROGRESS;
+            persistedIndex = REBALANCE_IN_PROGRESS;
+
+            CompletableFuture<Void> rebalanceFuture = completedFuture(null);
+
+            this.rebalanceFuture = rebalanceFuture;
+
+            return rebalanceFuture;
+        } catch (Exception e) {
+            throw new IgniteInternalException(
+                    TX_STATE_STORAGE_REBALANCE_ERR,
+                    IgniteStringFormatter.format("Failed to clear storage for 
partition {} of table {}", partitionId, getTableName()),
+                    e
+            );
+        } finally {
+            busyLock.unblock();

Review Comment:
   Why do you need a busy lock in this method?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to