denis-chudov commented on code in PR #1663:
URL: https://github.com/apache/ignite-3/pull/1663#discussion_r1105182807
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -2177,14 +2238,19 @@ private CompletableFuture<BinaryTuple> extractKey(@Nullable BinaryRow row) {
}
/**
- * Class that stores list of futures for updates that can block tx cleanup, and finished flag.
+ * Class that stores a list of futures for operations that has happened in a specific transaction, and a state of the transaction relate
+ * of the replica.
Review Comment:
"a state of the transaction relate of the replica."
Honestly, I couldn't understand this. Could you rephrase it?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -1086,10 +1111,30 @@ private CompletableFuture<Void> processTxCleanupAction(TxCleanupReplicaRequest r
return raftClient
.run(txCleanupCmd)
- .thenRun(() -> releaseTxLocks(request.txId()));
+ .thenCompose(ignored -> allOffFuturesExceptionIgnored(txReadFutures, request)
+ .thenRun(() -> releaseTxLocks(request.txId())));
});
}
+ /**
+ * Creates a future that waits all transaction operations are completed.
+ *
+ * @param txFutures Transaction operation futures.
+ * @param request Cleanup request.
+ * @return The future completes when all futures in passed list are completed.
+ */
+ private static CompletableFuture<Void> allOffFuturesExceptionIgnored(List<CompletableFuture<?>> txFutures,
+ TxCleanupReplicaRequest request) {
+ return allOf(txFutures.toArray(new CompletableFuture<?>[0]))
+ .exceptionally(e -> {
+ assert !request.commit() :
+ "Transaction is committing, but an operation was completed with exception [txId=" + request.txId()
Review Comment:
```suggestion
"Transaction is committing, but an operation has completed with exception [txId=" + request.txId()
```
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -1086,10 +1111,30 @@ private CompletableFuture<Void> processTxCleanupAction(TxCleanupReplicaRequest r
return raftClient
.run(txCleanupCmd)
- .thenRun(() -> releaseTxLocks(request.txId()));
+ .thenCompose(ignored -> allOffFuturesExceptionIgnored(txReadFutures, request)
+ .thenRun(() -> releaseTxLocks(request.txId())));
Review Comment:
Just a note: is it still useful to wait for the read futures separately from
the update futures, given that we are going to move releaseTxLocks before the
cleanup raft command?
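To make the question concrete, here is a minimal, self-contained sketch of the pattern in the hunk above: wait for every operation future, swallow any failure, and only then run the lock release step. The class and method names (`CleanupOrderSketch`, `allOfIgnoringExceptions`) are hypothetical and this is not the PR's code; the print statement only stands in for `releaseTxLocks`.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class CleanupOrderSketch {
    /**
     * Returns a future that completes normally once every passed future has
     * completed, whether successfully or exceptionally (hypothetical helper).
     */
    static CompletableFuture<Void> allOfIgnoringExceptions(List<CompletableFuture<?>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                // allOf fails if any input failed; map that failure to a normal completion.
                .exceptionally(e -> null);
    }

    public static void main(String[] args) {
        // One read is still in flight, another has already failed.
        CompletableFuture<String> pendingRead = new CompletableFuture<>();
        CompletableFuture<String> failedRead =
                CompletableFuture.failedFuture(new IllegalStateException("read failed"));

        List<CompletableFuture<?>> reads = List.of(pendingRead, failedRead);

        // Stand-in for releaseTxLocks: runs only after all reads are done.
        CompletableFuture<Void> locksReleased = allOfIgnoringExceptions(reads)
                .thenRun(() -> System.out.println("locks released"));

        System.out.println("released before read completes: " + locksReleased.isDone());
        pendingRead.complete("row");
        System.out.println("released after read completes: " + locksReleased.isDone());
    }
}
```

If the lock release is moved ahead of the cleanup command, the composed wait in this sketch no longer gates it, which is the reason for asking whether the separate read-future list is still needed.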
##########
modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java:
##########
@@ -1898,4 +1900,72 @@ public void testReadOnlyPendingWriteIntentSkippedCombined() {
Collection<Tuple> retrievedKeys3 = accounts.recordView().getAll(readOnlyTx2, List.of(makeKey(1), makeKey(2)));
validateBalance(retrievedKeys3, 300.);
}
+
+ @Test
+ public void testTransactionAlreadyCommitted() {
+ testTransactionAlreadyFixed(true);
+ }
+
+ @Test
+ public void testTransactionAlreadyRolledback() {
+ testTransactionAlreadyFixed(false);
+ }
+
+ /**
+ * Checks operations that act after a transaction is committed, are finished with exception.
+ *
+ * @param commit True when transaction is committed, false the transaction is rolled back.
+ */
+ private void testTransactionAlreadyFixed(boolean commit) {
Review Comment:
Maybe rename this to `testTransactionAlreadyFinished`?