ibessonov commented on code in PR #1478:
URL: https://github.com/apache/ignite-3/pull/1478#discussion_r1057730028
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -93,8 +115,12 @@ public class TxStateRocksDbStorage implements
TxStateStorage {
/** The value of {@link #lastAppliedIndex} persisted to the device at this
moment. */
private volatile long persistedIndex;
- /** Database key for the last applied index+term. */
- private final byte[] lastAppliedIndexAndTermKey;
+ /** Current state of the storage. */
+ private volatile StorageState state = StorageState.RUNNABLE;
+
+ @Nullable
Review Comment:
Usually you put `@Nullable` next to the type. Why is it different this time?
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -121,30 +147,28 @@ public TxStateRocksDbStorage(
.putShort((short) partitionId)
.array();
- byte[] indexAndTermBytes = readLastAppliedIndexAndTerm(readOptions);
- lastAppliedIndex = indexAndTermBytes == null ? 0 :
bytesToLong(indexAndTermBytes);
- lastAppliedTerm = indexAndTermBytes == null ? 0 :
bytesToLong(indexAndTermBytes, Long.BYTES);
-
- persistedIndex = lastAppliedIndex;
+ initLastApplied();
}
@Override
@Nullable
public TxMeta get(UUID txId) {
if (!busyLock.enterBusy()) {
- throwStorageStoppedException();
+ throwExceptionIfStorageClosedOrRebalance();
Review Comment:
Why does the exception mention rebalance if the storage is simply closed?
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -397,12 +429,18 @@ private long readLastAppliedIndex(ReadOptions
readOptions) {
@Override
public void destroy() {
- try (WriteBatch writeBatch = new WriteBatch()) {
- close();
+ if (!close0()) {
+ return;
+ }
- writeBatch.deleteRange(partitionStartPrefix(),
partitionEndPrefix());
+ try {
+ try (WriteBatch writeBatch = new WriteBatch()) {
Review Comment:
Isn't a single try enough?
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -445,32 +479,210 @@ private UUID keyToTxId(byte[] bytes) {
@Override
public void close() {
- if (!closeGuard.compareAndSet(false, true)) {
- return;
+ close0();
+ }
+
+ @Override
+ public CompletableFuture<Void> startRebalance() {
+ if (!STATE.compareAndSet(this, StorageState.RUNNABLE,
StorageState.REBALANCE)) {
+ throwExceptionIfStorageClosedOrRebalance();
}
busyLock.block();
- List<AbstractNativeReference> resources = new ArrayList<>(iterators);
+ try (WriteBatch writeBatch = new WriteBatch()) {
+ writeBatch.deleteRange(partitionStartPrefix(),
partitionEndPrefix());
+ writeBatch.put(lastAppliedIndexAndTermKey,
indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
- RocksUtils.closeAll(resources);
+ db.write(writeOptions, writeBatch);
+
+ lastAppliedIndex = REBALANCE_IN_PROGRESS;
+ lastAppliedTerm = REBALANCE_IN_PROGRESS;
+ persistedIndex = REBALANCE_IN_PROGRESS;
+
+ CompletableFuture<Void> rebalanceFuture = completedFuture(null);
+
+ this.rebalanceFuture = rebalanceFuture;
+
+ return rebalanceFuture;
Review Comment:
What is happening here? If this future is always completed, why do you need a field for the future at all?
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -445,32 +479,210 @@ private UUID keyToTxId(byte[] bytes) {
@Override
public void close() {
- if (!closeGuard.compareAndSet(false, true)) {
- return;
+ close0();
+ }
+
+ @Override
+ public CompletableFuture<Void> startRebalance() {
+ if (!STATE.compareAndSet(this, StorageState.RUNNABLE,
StorageState.REBALANCE)) {
+ throwExceptionIfStorageClosedOrRebalance();
}
busyLock.block();
- List<AbstractNativeReference> resources = new ArrayList<>(iterators);
+ try (WriteBatch writeBatch = new WriteBatch()) {
+ writeBatch.deleteRange(partitionStartPrefix(),
partitionEndPrefix());
+ writeBatch.put(lastAppliedIndexAndTermKey,
indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
- RocksUtils.closeAll(resources);
+ db.write(writeOptions, writeBatch);
+
+ lastAppliedIndex = REBALANCE_IN_PROGRESS;
+ lastAppliedTerm = REBALANCE_IN_PROGRESS;
+ persistedIndex = REBALANCE_IN_PROGRESS;
+
+ CompletableFuture<Void> rebalanceFuture = completedFuture(null);
+
+ this.rebalanceFuture = rebalanceFuture;
+
+ return rebalanceFuture;
+ } catch (Exception e) {
+ throw new IgniteInternalException(
+ TX_STATE_STORAGE_REBALANCE_ERR,
+ IgniteStringFormatter.format("Failed to clear storage for
partition {} of table {}", partitionId, getTableName()),
+ e
+ );
+ } finally {
+ busyLock.unblock();
+ }
}
@Override
- public CompletableFuture<Void> startFullRebalance() {
- // TODO: IGNITE-18024 Implement
- throw new UnsupportedOperationException();
+ public CompletableFuture<Void> abortRebalance() {
+ CompletableFuture<Void> rebalanceFuture = (CompletableFuture<Void>)
REBALANCE_FUTURE.getAndSet(this, null);
+
+ if (rebalanceFuture == null) {
+ return completedFuture(null);
+ }
+
+ return rebalanceFuture
+ .thenAccept(unused -> {
+ try (WriteBatch writeBatch = new WriteBatch()) {
+ writeBatch.deleteRange(partitionStartPrefix(),
partitionEndPrefix());
+ writeBatch.delete(lastAppliedIndexAndTermKey);
+
+ db.write(writeOptions, writeBatch);
+
+ lastAppliedIndex = 0;
+ lastAppliedTerm = 0;
+ persistedIndex = 0;
+
+ state = StorageState.RUNNABLE;
+ } catch (Exception e) {
+ throw new IgniteInternalException(
+ TX_STATE_STORAGE_REBALANCE_ERR,
+ IgniteStringFormatter.format(
+ "Failed to clear storage for partition
{} of table {}",
+ partitionId,
+ getTableName()
+ ),
+ e
+ );
+ }
+ });
}
@Override
- public CompletableFuture<Void> abortFullRebalance() {
- // TODO: IGNITE-18024 Implement
- throw new UnsupportedOperationException();
+ public CompletableFuture<Void> finishRebalance(long lastAppliedIndex, long
lastAppliedTerm) {
+ CompletableFuture<Void> rebalanceFuture = (CompletableFuture<Void>)
REBALANCE_FUTURE.getAndSet(this, null);
+
+ if (rebalanceFuture == null) {
+ throw new IgniteInternalException(TX_STATE_STORAGE_REBALANCE_ERR,
"Rebalancing has not started");
+ }
+
+ return rebalanceFuture
+ .thenAccept(unused -> {
+ try (WriteBatch writeBatch = new WriteBatch()) {
+ writeBatch.put(lastAppliedIndexAndTermKey,
indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
+
+ db.write(writeOptions, writeBatch);
+
+ this.lastAppliedIndex = lastAppliedIndex;
+ this.lastAppliedTerm = lastAppliedTerm;
+ this.persistedIndex = lastAppliedIndex;
+
+ state = StorageState.RUNNABLE;
+ } catch (Exception e) {
+ throw new IgniteInternalException(
+ TX_STATE_STORAGE_REBALANCE_ERR,
+ IgniteStringFormatter.format(
+ "Failed to finish rebalance for
partition {} of table {}",
+ partitionId,
+ getTableName()
+ ),
+ e
+ );
+ }
+ });
}
- @Override
- public CompletableFuture<Void> finishFullRebalance(long lastAppliedIndex,
long lastAppliedTerm) {
- // TODO: IGNITE-18024 Implement
- throw new UnsupportedOperationException();
+ private void initLastApplied() {
+ byte[] indexAndTermBytes = readLastAppliedIndexAndTerm(readOptions);
+
+ if (indexAndTermBytes != null) {
+ long lastAppliedIndex = bytesToLong(indexAndTermBytes);
+
+ if (lastAppliedIndex == REBALANCE_IN_PROGRESS) {
+ try (WriteBatch writeBatch = new WriteBatch()) {
Review Comment:
I wonder if there's a designated "start" method in the storage.
Doing a recovery in the constructor?... That's a bad idea. Reading data in the
constructor is also a bad idea, but I believe that was my fault.
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -199,21 +221,22 @@ public boolean compareAndSet(UUID txId, @Nullable TxState
txStateExpected, TxMet
result = true;
} else {
- result = txMetaExisting.txState() == txMeta.txState()
&& (
- (txMetaExisting.commitTimestamp() == null &&
txMeta.commitTimestamp() == null)
- ||
txMetaExisting.commitTimestamp().equals(txMeta.commitTimestamp()));
+ result = txMetaExisting.txState() == txMeta.txState()
+ &&
Objects.equals(txMetaExisting.commitTimestamp(), txMeta.commitTimestamp());
}
} else {
result = false;
}
}
- writeBatch.put(lastAppliedIndexAndTermKey,
indexAndTermToBytes(commandIndex, commandTerm));
+ if (state != StorageState.REBALANCE) {
Review Comment:
I think places like this deserve comments.
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -445,32 +479,210 @@ private UUID keyToTxId(byte[] bytes) {
@Override
public void close() {
- if (!closeGuard.compareAndSet(false, true)) {
- return;
+ close0();
+ }
+
+ @Override
+ public CompletableFuture<Void> startRebalance() {
+ if (!STATE.compareAndSet(this, StorageState.RUNNABLE,
StorageState.REBALANCE)) {
+ throwExceptionIfStorageClosedOrRebalance();
}
busyLock.block();
- List<AbstractNativeReference> resources = new ArrayList<>(iterators);
+ try (WriteBatch writeBatch = new WriteBatch()) {
+ writeBatch.deleteRange(partitionStartPrefix(),
partitionEndPrefix());
+ writeBatch.put(lastAppliedIndexAndTermKey,
indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
- RocksUtils.closeAll(resources);
+ db.write(writeOptions, writeBatch);
+
+ lastAppliedIndex = REBALANCE_IN_PROGRESS;
+ lastAppliedTerm = REBALANCE_IN_PROGRESS;
+ persistedIndex = REBALANCE_IN_PROGRESS;
+
+ CompletableFuture<Void> rebalanceFuture = completedFuture(null);
+
+ this.rebalanceFuture = rebalanceFuture;
+
+ return rebalanceFuture;
+ } catch (Exception e) {
+ throw new IgniteInternalException(
+ TX_STATE_STORAGE_REBALANCE_ERR,
+ IgniteStringFormatter.format("Failed to clear storage for
partition {} of table {}", partitionId, getTableName()),
+ e
+ );
+ } finally {
+ busyLock.unblock();
+ }
}
@Override
- public CompletableFuture<Void> startFullRebalance() {
- // TODO: IGNITE-18024 Implement
- throw new UnsupportedOperationException();
+ public CompletableFuture<Void> abortRebalance() {
+ CompletableFuture<Void> rebalanceFuture = (CompletableFuture<Void>)
REBALANCE_FUTURE.getAndSet(this, null);
+
+ if (rebalanceFuture == null) {
+ return completedFuture(null);
+ }
+
+ return rebalanceFuture
+ .thenAccept(unused -> {
+ try (WriteBatch writeBatch = new WriteBatch()) {
+ writeBatch.deleteRange(partitionStartPrefix(),
partitionEndPrefix());
+ writeBatch.delete(lastAppliedIndexAndTermKey);
+
+ db.write(writeOptions, writeBatch);
+
+ lastAppliedIndex = 0;
+ lastAppliedTerm = 0;
+ persistedIndex = 0;
+
+ state = StorageState.RUNNABLE;
+ } catch (Exception e) {
+ throw new IgniteInternalException(
+ TX_STATE_STORAGE_REBALANCE_ERR,
+ IgniteStringFormatter.format(
+ "Failed to clear storage for partition
{} of table {}",
+ partitionId,
+ getTableName()
+ ),
+ e
+ );
+ }
+ });
}
@Override
- public CompletableFuture<Void> abortFullRebalance() {
- // TODO: IGNITE-18024 Implement
- throw new UnsupportedOperationException();
+ public CompletableFuture<Void> finishRebalance(long lastAppliedIndex, long
lastAppliedTerm) {
+ CompletableFuture<Void> rebalanceFuture = (CompletableFuture<Void>)
REBALANCE_FUTURE.getAndSet(this, null);
+
+ if (rebalanceFuture == null) {
+ throw new IgniteInternalException(TX_STATE_STORAGE_REBALANCE_ERR,
"Rebalancing has not started");
+ }
+
+ return rebalanceFuture
+ .thenAccept(unused -> {
+ try (WriteBatch writeBatch = new WriteBatch()) {
+ writeBatch.put(lastAppliedIndexAndTermKey,
indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
+
+ db.write(writeOptions, writeBatch);
+
+ this.lastAppliedIndex = lastAppliedIndex;
+ this.lastAppliedTerm = lastAppliedTerm;
+ this.persistedIndex = lastAppliedIndex;
+
+ state = StorageState.RUNNABLE;
+ } catch (Exception e) {
+ throw new IgniteInternalException(
+ TX_STATE_STORAGE_REBALANCE_ERR,
+ IgniteStringFormatter.format(
+ "Failed to finish rebalance for
partition {} of table {}",
+ partitionId,
+ getTableName()
+ ),
+ e
+ );
+ }
+ });
}
- @Override
- public CompletableFuture<Void> finishFullRebalance(long lastAppliedIndex,
long lastAppliedTerm) {
- // TODO: IGNITE-18024 Implement
- throw new UnsupportedOperationException();
+ private void initLastApplied() {
+ byte[] indexAndTermBytes = readLastAppliedIndexAndTerm(readOptions);
+
+ if (indexAndTermBytes != null) {
+ long lastAppliedIndex = bytesToLong(indexAndTermBytes);
+
+ if (lastAppliedIndex == REBALANCE_IN_PROGRESS) {
+ try (WriteBatch writeBatch = new WriteBatch()) {
+ writeBatch.deleteRange(partitionStartPrefix(),
partitionEndPrefix());
+ writeBatch.delete(lastAppliedIndexAndTermKey);
+
+ db.write(writeOptions, writeBatch);
+ } catch (Exception e) {
+ throw new IgniteInternalException(
+ TX_STATE_STORAGE_REBALANCE_ERR,
+ IgniteStringFormatter.format(
+ "Failed to clear storage for partition {}
of table {}",
+ partitionId,
+ getTableName()
+ ),
+ e
+ );
+ }
+ } else {
+ this.lastAppliedIndex = lastAppliedIndex;
+ persistedIndex = lastAppliedIndex;
+
+ lastAppliedTerm = bytesToLong(indexAndTermBytes, Long.BYTES);
+ }
+ }
+ }
+
+ private boolean close0() {
Review Comment:
Not the best name for a method that performs so many complex actions.
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -57,6 +61,24 @@
* Tx state storage implementation based on RocksDB.
*/
public class TxStateRocksDbStorage implements TxStateStorage {
+ private static final VarHandle STATE;
+
+ private static final VarHandle REBALANCE_FUTURE;
+
+ static {
+ try {
+ STATE =
MethodHandles.lookup().findVarHandle(TxStateRocksDbStorage.class, "state",
StorageState.class);
Review Comment:
What's the problem with using atomics?
Are you sure that placing these values inside the object is beneficial?
Do you have proof?
I'd prefer code simplicity if the advantage is not clear.
##########
modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/BusyRocksIteratorAdapter.java:
##########
@@ -42,21 +42,30 @@ protected BusyRocksIteratorAdapter(IgniteSpinBusyLock
busyLock, RocksIterator it
/**
* Handles busy lock acquiring failure. This means that db has been
stopped and cursor can't proceed. Must throw an exception.
*/
- protected abstract void handleBusy();
+ protected abstract void handleBusyFail();
- private void handleBusy0() {
- handleBusy();
+ /**
+ * Handles busy lock acquiring success.
+ */
+ protected void handeBusySuccess() {
Review Comment:
Why did you introduce this method? No one uses it. Can you please explain it?
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -445,32 +479,210 @@ private UUID keyToTxId(byte[] bytes) {
@Override
public void close() {
- if (!closeGuard.compareAndSet(false, true)) {
- return;
+ close0();
+ }
+
+ @Override
+ public CompletableFuture<Void> startRebalance() {
+ if (!STATE.compareAndSet(this, StorageState.RUNNABLE,
StorageState.REBALANCE)) {
+ throwExceptionIfStorageClosedOrRebalance();
}
busyLock.block();
- List<AbstractNativeReference> resources = new ArrayList<>(iterators);
+ try (WriteBatch writeBatch = new WriteBatch()) {
+ writeBatch.deleteRange(partitionStartPrefix(),
partitionEndPrefix());
+ writeBatch.put(lastAppliedIndexAndTermKey,
indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
- RocksUtils.closeAll(resources);
+ db.write(writeOptions, writeBatch);
+
+ lastAppliedIndex = REBALANCE_IN_PROGRESS;
+ lastAppliedTerm = REBALANCE_IN_PROGRESS;
+ persistedIndex = REBALANCE_IN_PROGRESS;
+
+ CompletableFuture<Void> rebalanceFuture = completedFuture(null);
+
+ this.rebalanceFuture = rebalanceFuture;
+
+ return rebalanceFuture;
+ } catch (Exception e) {
+ throw new IgniteInternalException(
+ TX_STATE_STORAGE_REBALANCE_ERR,
+ IgniteStringFormatter.format("Failed to clear storage for
partition {} of table {}", partitionId, getTableName()),
+ e
+ );
+ } finally {
+ busyLock.unblock();
Review Comment:
Why do you need a busy lock in this method?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]