Cyrill commented on code in PR #2751:
URL: https://github.com/apache/ignite-3/pull/2751#discussion_r1383154207
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java:
##########
@@ -295,139 +319,252 @@ public void finishFull(HybridTimestampTracker
timestampTracker, UUID txId, boole
updateTxMeta(txId, old -> new TxStateMeta(finalState,
old.txCoordinatorId(), old.commitTimestamp()));
}
+ private @Nullable HybridTimestamp getCommitTimestamp(boolean commit) {
+ return commit ? clock.now() : null;
+ }
+
+ private String coordinatorId() {
+ return localNodeId.get();
+ }
+
+ @Override
+ public CompletableFuture<Void> finishEmpty(boolean commit, UUID txId) {
+ // If there are no enlisted groups, just update local state - we
already marked the tx as finished.
+ updateTxMeta(txId, old -> coordinatorFinalTxStateMeta(commit,
getCommitTimestamp(commit)));
+
+ return completedFuture(null);
+ }
+
@Override
public CompletableFuture<Void> finish(
HybridTimestampTracker observableTimestampTracker,
TablePartitionId commitPartition,
- ClusterNode recipientNode,
- Long term,
boolean commit,
Map<TablePartitionId, Long> enlistedGroups,
UUID txId
) {
assert enlistedGroups != null;
+ assert !enlistedGroups.isEmpty() : "No enlisted partitions found";
// Here we put finishing state meta into the local map, so that all
concurrent operations trying to read tx state
// with using read timestamp could see that this transaction is
finishing, see #transactionMetaReadTimestampAware(txId, timestamp).
// None of them now are able to update node's clock with read
timestamp and we can create the commit timestamp that is greater
// than all the read timestamps processed before.
// Every concurrent operation will now use a finish future from the
finishing state meta and get only final transaction
// state after the transaction is finished.
- TxStateMetaFinishing finishingStateMeta = new
TxStateMetaFinishing(localNodeId.get());
- updateTxMeta(txId, old -> finishingStateMeta);
- HybridTimestamp commitTimestamp = commit ? clock.now() : null;
-
- // If there are no enlisted groups, just return - we already marked
the tx as finished.
- boolean finishRequestNeeded = !enlistedGroups.isEmpty();
-
- if (!finishRequestNeeded) {
- updateTxMeta(txId, old -> {
- TxStateMeta finalStateMeta =
coordinatorFinalTxStateMeta(commit, commitTimestamp);
-
- finishingStateMeta.txFinishFuture().complete(finalStateMeta);
-
- return finalStateMeta;
- });
+ TxStateMetaFinishing finishingStateMeta = new
TxStateMetaFinishing(coordinatorId());
- return completedFuture(null);
- }
+ updateTxMeta(txId, old -> finishingStateMeta);
- Function<Void, CompletableFuture<Void>> clo = ignored -> {
- // In case of commit it's required to check whether current
primaries are still the same that were enlisted and whether
- // given primaries are not expired or, in other words, whether
commitTimestamp is less or equal to the enlisted primaries
- // expiration timestamps.
- CompletableFuture<Void> verificationFuture =
- commit ? verifyCommitTimestamp(enlistedGroups,
commitTimestamp) : completedFuture(null);
-
- return verificationFuture.handle(
- (unused, throwable) -> {
- Collection<ReplicationGroupId> replicationGroupIds =
new HashSet<>(enlistedGroups.keySet());
-
- boolean verifiedCommit = throwable == null && commit;
-
- TxFinishReplicaRequest req =
FACTORY.txFinishReplicaRequest()
- .txId(txId)
- .timestampLong(clock.nowLong())
- .groupId(commitPartition)
- .groups(replicationGroupIds)
- // In case of verification future failure
transaction will be rolled back.
- .commit(verifiedCommit)
-
.commitTimestampLong(hybridTimestampToLong(commitTimestamp))
- .term(term)
- .build();
-
- return replicaService.invoke(recipientNode,
req).thenRun(
- () -> {
- updateTxMeta(txId, old -> {
- if (isFinalState(old.txState())) {
-
finishingStateMeta.txFinishFuture().complete(old);
-
- return old;
- }
-
- assert old instanceof
TxStateMetaFinishing;
-
- TxStateMeta finalTxStateMeta =
coordinatorFinalTxStateMeta(verifiedCommit, commitTimestamp);
-
-
finishingStateMeta.txFinishFuture().complete(finalTxStateMeta);
-
- return finalTxStateMeta;
- });
-
- if (verifiedCommit) {
-
observableTimestampTracker.update(commitTimestamp);
- }
- });
- })
- .thenCompose(Function.identity())
- // verification future is added in order to share proper
exception with the client
- .thenCompose(r -> verificationFuture);
- };
-
- AtomicReference<CompletableFuture<Void>> ref = new AtomicReference<>();
+ AtomicBoolean performingFinish = new AtomicBoolean();
TxContext tuple = txCtxMap.compute(txId, (uuid, tuple0) -> {
if (tuple0 == null) {
tuple0 = new TxContext(); // No writes enlisted.
}
- if (tuple0.finishFut == null) {
- tuple0.finishFut = new CompletableFuture<>();
- ref.set(tuple0.finishFut);
+ if (!tuple0.isTxFinishing()) {
+ tuple0.finishTx();
+
+ performingFinish.set(true);
}
return tuple0;
});
- if (ref.get() != null) { // This is a finishing thread.
- if (!commit) {
- clo.apply(null).handle((ignored, err) -> {
+ // This is a finishing thread.
+ if (performingFinish.get()) {
+ Function<Void, CompletableFuture<Void>> finishAction = ignored ->
+ makeFinishRequest(
+ observableTimestampTracker,
+ commitPartition,
+ commit,
+ enlistedGroups,
+ txId,
+ finishingStateMeta.txFinishFuture()
+ );
+
+ runFinish(commit, tuple, finishAction);
+ }
+
+ // The method `runFinish` has a side effect on
`finishInProgressFuture` future, it kicks off another future that will complete
it.
+ return tuple.finishInProgressFuture;
+ }
+
+ private static void runFinish(boolean commit, TxContext tuple,
Function<Void, CompletableFuture<Void>> finishAction) {
Review Comment:
I did some refactoring here. Please check the new version.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org