tkalkirill commented on code in PR #1478:
URL: https://github.com/apache/ignite-3/pull/1478#discussion_r1058175467
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -445,32 +479,210 @@ private UUID keyToTxId(byte[] bytes) {
@Override
public void close() {
- if (!closeGuard.compareAndSet(false, true)) {
- return;
+ close0();
+ }
+
+ @Override
+ public CompletableFuture<Void> startRebalance() {
+ if (!STATE.compareAndSet(this, StorageState.RUNNABLE,
StorageState.REBALANCE)) {
+ throwExceptionIfStorageClosedOrRebalance();
}
busyLock.block();
- List<AbstractNativeReference> resources = new ArrayList<>(iterators);
+ try (WriteBatch writeBatch = new WriteBatch()) {
+ writeBatch.deleteRange(partitionStartPrefix(),
partitionEndPrefix());
+ writeBatch.put(lastAppliedIndexAndTermKey,
indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
- RocksUtils.closeAll(resources);
+ db.write(writeOptions, writeBatch);
+
+ lastAppliedIndex = REBALANCE_IN_PROGRESS;
+ lastAppliedTerm = REBALANCE_IN_PROGRESS;
+ persistedIndex = REBALANCE_IN_PROGRESS;
+
+ CompletableFuture<Void> rebalanceFuture = completedFuture(null);
+
+ this.rebalanceFuture = rebalanceFuture;
+
+ return rebalanceFuture;
+ } catch (Exception e) {
+ throw new IgniteInternalException(
+ TX_STATE_STORAGE_REBALANCE_ERR,
+ IgniteStringFormatter.format("Failed to clear storage for
partition {} of table {}", partitionId, getTableName()),
+ e
+ );
+ } finally {
+ busyLock.unblock();
+ }
}
@Override
- public CompletableFuture<Void> startFullRebalance() {
- // TODO: IGNITE-18024 Implement
- throw new UnsupportedOperationException();
+ public CompletableFuture<Void> abortRebalance() {
+ CompletableFuture<Void> rebalanceFuture = (CompletableFuture<Void>)
REBALANCE_FUTURE.getAndSet(this, null);
+
+ if (rebalanceFuture == null) {
+ return completedFuture(null);
+ }
+
+ return rebalanceFuture
+ .thenAccept(unused -> {
+ try (WriteBatch writeBatch = new WriteBatch()) {
+ writeBatch.deleteRange(partitionStartPrefix(),
partitionEndPrefix());
+ writeBatch.delete(lastAppliedIndexAndTermKey);
+
+ db.write(writeOptions, writeBatch);
+
+ lastAppliedIndex = 0;
+ lastAppliedTerm = 0;
+ persistedIndex = 0;
+
+ state = StorageState.RUNNABLE;
+ } catch (Exception e) {
+ throw new IgniteInternalException(
+ TX_STATE_STORAGE_REBALANCE_ERR,
+ IgniteStringFormatter.format(
+ "Failed to clear storage for partition
{} of table {}",
+ partitionId,
+ getTableName()
+ ),
+ e
+ );
+ }
+ });
}
@Override
- public CompletableFuture<Void> abortFullRebalance() {
- // TODO: IGNITE-18024 Implement
- throw new UnsupportedOperationException();
+ public CompletableFuture<Void> finishRebalance(long lastAppliedIndex, long
lastAppliedTerm) {
+ CompletableFuture<Void> rebalanceFuture = (CompletableFuture<Void>)
REBALANCE_FUTURE.getAndSet(this, null);
+
+ if (rebalanceFuture == null) {
+ throw new IgniteInternalException(TX_STATE_STORAGE_REBALANCE_ERR,
"Rebalancing has not started");
+ }
+
+ return rebalanceFuture
+ .thenAccept(unused -> {
+ try (WriteBatch writeBatch = new WriteBatch()) {
+ writeBatch.put(lastAppliedIndexAndTermKey,
indexAndTermToBytes(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS));
+
+ db.write(writeOptions, writeBatch);
+
+ this.lastAppliedIndex = lastAppliedIndex;
+ this.lastAppliedTerm = lastAppliedTerm;
+ this.persistedIndex = lastAppliedIndex;
+
+ state = StorageState.RUNNABLE;
+ } catch (Exception e) {
+ throw new IgniteInternalException(
+ TX_STATE_STORAGE_REBALANCE_ERR,
+ IgniteStringFormatter.format(
+ "Failed to finish rebalance for
partition {} of table {}",
+ partitionId,
+ getTableName()
+ ),
+ e
+ );
+ }
+ });
}
- @Override
- public CompletableFuture<Void> finishFullRebalance(long lastAppliedIndex,
long lastAppliedTerm) {
- // TODO: IGNITE-18024 Implement
- throw new UnsupportedOperationException();
+ private void initLastApplied() {
+ byte[] indexAndTermBytes = readLastAppliedIndexAndTerm(readOptions);
+
+ if (indexAndTermBytes != null) {
+ long lastAppliedIndex = bytesToLong(indexAndTermBytes);
+
+ if (lastAppliedIndex == REBALANCE_IN_PROGRESS) {
+ try (WriteBatch writeBatch = new WriteBatch()) {
+ writeBatch.deleteRange(partitionStartPrefix(),
partitionEndPrefix());
+ writeBatch.delete(lastAppliedIndexAndTermKey);
+
+ db.write(writeOptions, writeBatch);
+ } catch (Exception e) {
+ throw new IgniteInternalException(
+ TX_STATE_STORAGE_REBALANCE_ERR,
+ IgniteStringFormatter.format(
+ "Failed to clear storage for partition {}
of table {}",
+ partitionId,
+ getTableName()
+ ),
+ e
+ );
+ }
+ } else {
+ this.lastAppliedIndex = lastAppliedIndex;
+ persistedIndex = lastAppliedIndex;
+
+ lastAppliedTerm = bytesToLong(indexAndTermBytes, Long.BYTES);
+ }
+ }
+ }
+
+ private boolean close0() {
Review Comment:
I tried to rename it and add documentation.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]