denis-chudov commented on code in PR #2751:
URL: https://github.com/apache/ignite-3/pull/2751#discussion_r1382092944
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java:
##########
@@ -116,22 +115,27 @@ public interface TxManager extends IgniteComponent {
* transaction execution. The tracker is also used to determine the read timestamp for read-only transactions. Each client
* should pass its own tracker to provide linearizability between read-write and read-only transactions started by this client.
* @param commitPartition Partition to store a transaction state.
- * @param recipientNode Recipient node.
- * @param term Raft term.
- * @param commit {@code True} if a commit requested.
+ * @param commit {@code true} if a commit requested.
* @param enlistedGroups Enlisted partition groups with consistency token.
* @param txId Transaction id.
*/
CompletableFuture<Void> finish(
HybridTimestampTracker timestampTracker,
TablePartitionId commitPartition,
- ClusterNode recipientNode,
- Long term,
boolean commit,
Map<TablePartitionId, Long> enlistedGroups,
UUID txId
);
+
+ /**
+ * Make sure the state of the provided transaction is updated correctly.
+ *
+ * @param commit {@code true} if a commit requested.
+ * @param txId Transaction id.
+ */
+ CompletableFuture<Void> finishEmpty(boolean commit, UUID txId);
Review Comment:
I am not sure it is worth extending the interface with a method for
one specific case. Why do we need it?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -1437,6 +1450,49 @@ private CompletableFuture<Void> finishAndCleanup(
UUID txId,
String txCoordinatorId
) {
+ TxMeta txMeta = txStateStorage.get(txId);
+
+ // Check that a transaction has already been finished.
+ boolean transactionAlreadyFinished = txMeta != null && isFinalState(txMeta.txState());
+
+ // Check locksReleased flag. If it is already set, do nothing and return a successful result.
+ // Even if the outcome is different (the transaction was aborted, but we want to commit it),
+ // we return 'success' to be in alignment with common transaction handling.
+ if (transactionAlreadyFinished) {
+ if (txMeta.locksReleased()) {
+ return completedFuture(null);
+ }
+
+ assert !(txMeta.txState() == COMMITED && !commit) : "Not allowed to abort an already committed transaction.";
Review Comment:
```suggestion
assert commit || txMeta.txState() == ABORTED : "Not allowed to abort an already committed transaction [txState=" + txMeta.txState() + "].";
```
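For reference, a minimal sketch of why the suggested condition should be a drop-in replacement for the original one. It assumes that the only final states are `COMMITED` and `ABORTED`, which is what the `isFinalState` guard above appears to imply; the `FinalState` enum and the `AbortAssertionForms` class are hypothetical names used only for this illustration.

```java
/** Hypothetical two-value stand-in for the final transaction states (assumption: no other final states exist). */
enum FinalState { COMMITED, ABORTED }

/** Compares the two forms of the assertion condition. */
final class AbortAssertionForms {
    /** Condition as written in the diff. */
    static boolean original(FinalState state, boolean commit) {
        return !(state == FinalState.COMMITED && !commit);
    }

    /** Condition as written in the suggestion. */
    static boolean suggested(FinalState state, boolean commit) {
        return commit || state == FinalState.ABORTED;
    }
}
```

Under that assumption both forms fail only for the combination "abort requested, transaction already committed". If further final states were added, the two conditions would no longer agree.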
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/SimpleFailHandler.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
+import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutException;
+import org.apache.ignite.internal.util.ExceptionUtils;
+
+/**
+ * A simple fail handler implementation.
+ */
+public class SimpleFailHandler implements FailHandler {
+ private static final Class<? extends Throwable>[] RECOVERABLE = new Class[]{
+ TimeoutException.class,
+ IOException.class,
+ ReplicationTimeoutException.class,
+ PrimaryReplicaMissException.class
+ };
Review Comment:
Why not `Set`?
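A minimal sketch of what the `Set`-based variant could look like. The class name `RecoverableExceptions` and the `isRecoverable` method body are assumptions made for illustration (the actual lookup in `SimpleFailHandler` is not shown in this hunk), and only the two JDK exception types are listed to keep the sketch self-contained.

```java
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.TimeoutException;

/** Hypothetical stand-in for SimpleFailHandler; the Ignite-specific exception types are omitted. */
final class RecoverableExceptions {
    /** Immutable set; no raw generic array creation is needed. */
    private static final Set<Class<? extends Throwable>> RECOVERABLE = Set.of(
            TimeoutException.class,
            IOException.class
    );

    /** Returns true if the throwable is an instance of any listed type, subclasses included. */
    static boolean isRecoverable(Throwable t) {
        for (Class<? extends Throwable> cls : RECOVERABLE) {
            if (cls.isInstance(t)) {
                return true;
            }
        }

        return false;
    }
}
```

Besides being immutable, `Set.of` avoids the unchecked `new Class[]{...}` array creation. Note that `Set.contains(t.getClass())` alone would not match subclasses, so the sketch still iterates.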
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FailHandler.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+/**
+ * A transaction fail handler.
+ */
+public interface FailHandler {
+ /**
+ * Check if the provided exception is recoverable.
Review Comment:
The Javadoc lacks a description of what "recoverable" means.
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FailHandler.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+/**
+ * A transaction fail handler.
+ */
+public interface FailHandler {
Review Comment:
Is it specific to transactions? The Javadoc says so. Let's name it
`TransactionFailureHandler`: that is more accurate and specific to transactions,
and we are going to have a `FailureHandler` that defines the node's behavior.
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java:
##########
@@ -295,139 +319,252 @@ public void finishFull(HybridTimestampTracker
timestampTracker, UUID txId, boole
updateTxMeta(txId, old -> new TxStateMeta(finalState,
old.txCoordinatorId(), old.commitTimestamp()));
}
+ private @Nullable HybridTimestamp getCommitTimestamp(boolean commit) {
+ return commit ? clock.now() : null;
+ }
+
+ private String coordinatorId() {
+ return localNodeId.get();
+ }
+
+ @Override
+ public CompletableFuture<Void> finishEmpty(boolean commit, UUID txId) {
+ // If there are no enlisted groups, just update local state - we
already marked the tx as finished.
+ updateTxMeta(txId, old -> coordinatorFinalTxStateMeta(commit,
getCommitTimestamp(commit)));
+
+ return completedFuture(null);
+ }
+
@Override
public CompletableFuture<Void> finish(
HybridTimestampTracker observableTimestampTracker,
TablePartitionId commitPartition,
- ClusterNode recipientNode,
- Long term,
boolean commit,
Map<TablePartitionId, Long> enlistedGroups,
UUID txId
) {
assert enlistedGroups != null;
+ assert !enlistedGroups.isEmpty() : "No enlisted partitions found";
// Here we put finishing state meta into the local map, so that all
concurrent operations trying to read tx state
// with using read timestamp could see that this transaction is
finishing, see #transactionMetaReadTimestampAware(txId, timestamp).
// None of them now are able to update node's clock with read
timestamp and we can create the commit timestamp that is greater
// than all the read timestamps processed before.
// Every concurrent operation will now use a finish future from the
finishing state meta and get only final transaction
// state after the transaction is finished.
- TxStateMetaFinishing finishingStateMeta = new
TxStateMetaFinishing(localNodeId.get());
- updateTxMeta(txId, old -> finishingStateMeta);
- HybridTimestamp commitTimestamp = commit ? clock.now() : null;
-
- // If there are no enlisted groups, just return - we already marked
the tx as finished.
- boolean finishRequestNeeded = !enlistedGroups.isEmpty();
-
- if (!finishRequestNeeded) {
- updateTxMeta(txId, old -> {
- TxStateMeta finalStateMeta =
coordinatorFinalTxStateMeta(commit, commitTimestamp);
-
- finishingStateMeta.txFinishFuture().complete(finalStateMeta);
-
- return finalStateMeta;
- });
+ TxStateMetaFinishing finishingStateMeta = new
TxStateMetaFinishing(coordinatorId());
- return completedFuture(null);
- }
+ updateTxMeta(txId, old -> finishingStateMeta);
- Function<Void, CompletableFuture<Void>> clo = ignored -> {
- // In case of commit it's required to check whether current
primaries are still the same that were enlisted and whether
- // given primaries are not expired or, in other words, whether
commitTimestamp is less or equal to the enlisted primaries
- // expiration timestamps.
- CompletableFuture<Void> verificationFuture =
- commit ? verifyCommitTimestamp(enlistedGroups,
commitTimestamp) : completedFuture(null);
-
- return verificationFuture.handle(
- (unused, throwable) -> {
- Collection<ReplicationGroupId> replicationGroupIds =
new HashSet<>(enlistedGroups.keySet());
-
- boolean verifiedCommit = throwable == null && commit;
-
- TxFinishReplicaRequest req =
FACTORY.txFinishReplicaRequest()
- .txId(txId)
- .timestampLong(clock.nowLong())
- .groupId(commitPartition)
- .groups(replicationGroupIds)
- // In case of verification future failure
transaction will be rolled back.
- .commit(verifiedCommit)
-
.commitTimestampLong(hybridTimestampToLong(commitTimestamp))
- .term(term)
- .build();
-
- return replicaService.invoke(recipientNode,
req).thenRun(
- () -> {
- updateTxMeta(txId, old -> {
- if (isFinalState(old.txState())) {
-
finishingStateMeta.txFinishFuture().complete(old);
-
- return old;
- }
-
- assert old instanceof
TxStateMetaFinishing;
-
- TxStateMeta finalTxStateMeta =
coordinatorFinalTxStateMeta(verifiedCommit, commitTimestamp);
-
-
finishingStateMeta.txFinishFuture().complete(finalTxStateMeta);
-
- return finalTxStateMeta;
- });
-
- if (verifiedCommit) {
-
observableTimestampTracker.update(commitTimestamp);
- }
- });
- })
- .thenCompose(Function.identity())
- // verification future is added in order to share proper
exception with the client
- .thenCompose(r -> verificationFuture);
- };
-
- AtomicReference<CompletableFuture<Void>> ref = new AtomicReference<>();
+ AtomicBoolean performingFinish = new AtomicBoolean();
TxContext tuple = txCtxMap.compute(txId, (uuid, tuple0) -> {
if (tuple0 == null) {
tuple0 = new TxContext(); // No writes enlisted.
}
- if (tuple0.finishFut == null) {
- tuple0.finishFut = new CompletableFuture<>();
- ref.set(tuple0.finishFut);
+ if (!tuple0.isTxFinishing()) {
+ tuple0.finishTx();
+
+ performingFinish.set(true);
}
return tuple0;
});
- if (ref.get() != null) { // This is a finishing thread.
- if (!commit) {
- clo.apply(null).handle((ignored, err) -> {
+ // This is a finishing thread.
+ if (performingFinish.get()) {
+ Function<Void, CompletableFuture<Void>> finishAction = ignored ->
+ makeFinishRequest(
+ observableTimestampTracker,
+ commitPartition,
+ commit,
+ enlistedGroups,
+ txId,
+ finishingStateMeta.txFinishFuture()
+ );
+
+ runFinish(commit, tuple, finishAction);
+ }
+
+ // The method `runFinish` has a side effect on
`finishInProgressFuture` future, it kicks off another future that will complete
it.
+ return tuple.finishInProgressFuture;
+ }
+
+ private static void runFinish(boolean commit, TxContext tuple,
Function<Void, CompletableFuture<Void>> finishAction) {
+ // Wait for commit acks first, then proceed with the finish request.
+ CompletableFuture<Void> finisher = commit ? tuple.waitNoInflights() :
completedFuture(null);
+
+ finisher
+ .thenCompose(finishAction)
+ .handle((ignored, err) -> {
if (err == null) {
- tuple.finishFut.complete(null);
+ tuple.finishInProgressFuture.complete(null);
} else {
- tuple.finishFut.completeExceptionally(err);
+
tuple.finishInProgressFuture.completeExceptionally(err);
}
return null;
});
- } else {
+ }
- // All inflights have been completed before the finish.
- if (tuple.inflights == 0) {
- tuple.waitRepFut.complete(null);
- }
+ private CompletableFuture<Void> makeFinishRequest(
+ HybridTimestampTracker observableTimestampTracker,
+ TablePartitionId commitPartition,
+ boolean commit,
+ Map<TablePartitionId, Long> enlistedGroups,
+ UUID txId,
+ CompletableFuture<TransactionMeta> txFinishFuture
+ ) {
+ HybridTimestamp commitTimestamp = getCommitTimestamp(commit);
+ // In case of commit it's required to check whether current primaries
are still the same that were enlisted and whether
+ // given primaries are not expired or, in other words, whether
commitTimestamp is less or equal to the enlisted primaries
+ // expiration timestamps.
+ CompletableFuture<Void> verificationFuture =
+ commit ? verifyCommitTimestamp(enlistedGroups,
commitTimestamp) : completedFuture(null);
+
+ return verificationFuture.handle(
+ (unused, throwable) -> {
+ boolean verifiedCommit = throwable == null &&
commit;
+
+ Collection<ReplicationGroupId> replicationGroupIds
= new HashSet<>(enlistedGroups.keySet());
+
+ return makeDurableFinishRequest(
+ observableTimestampTracker,
+ commitPartition,
+ verifiedCommit,
+ replicationGroupIds,
+ txId,
+ commitTimestamp,
+ txFinishFuture);
+ })
+ .thenCompose(Function.identity())
+ // verification future is added in order to share proper
exception with the client
+ .thenCompose(r -> verificationFuture);
+ }
- // Wait for commit acks first, then proceed with the finish
request.
- tuple.waitRepFut.thenCompose(clo).handle((ignored, err) -> {
- if (err == null) {
- tuple.finishFut.complete(null);
- } else {
- tuple.finishFut.completeExceptionally(err);
+ /**
+ * Durable finish request.
+ */
+ private CompletableFuture<Void> makeDurableFinishRequest(
+ HybridTimestampTracker observableTimestampTracker,
+ TablePartitionId commitPartition,
+ boolean commit,
+ Collection<ReplicationGroupId> replicationGroupIds,
+ UUID txId,
+ HybridTimestamp commitTimestamp,
+ CompletableFuture<TransactionMeta> txFinishFuture
+ ) {
+ return inBusyLockAsync(busyLock, () ->
findPrimaryReplica(commitPartition, clock.now())
+ .thenCompose(meta ->
+ finishOnPrimary(
+ observableTimestampTracker,
+ commitPartition,
+ meta.getLeaseholder(),
+ meta.getStartTime().longValue(),
+ commit,
+ replicationGroupIds,
+ txId,
+ commitTimestamp,
+ txFinishFuture
+ ))
+ .handle((res, ex) -> {
+ if (ex != null) {
+ Throwable cause = ExceptionUtils.unwrapCause(ex);
+
+ if (cause instanceof TransactionException) {
+ TransactionException transactionException =
(TransactionException) cause;
+
+ if (transactionException.code() ==
TX_WAS_ABORTED_ERR) {
+ updateTxMeta(txId, old -> new
TxStateMeta(ABORTED, old.txCoordinatorId(), null));
+
+ return
CompletableFuture.<Void>failedFuture(cause);
+ }
+ }
+
Review Comment:
extra empty line
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java:
##########
@@ -135,6 +147,14 @@ public class TxManagerImpl implements TxManager, NetworkMessageHandler {
private final LongSupplier idleSafeTimePropagationPeriodMsSupplier;
+ /** Prevents double stopping of the tracker. */
+ private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+ /** Busy lock to stop synchronously. */
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
Review Comment:
Why is it used in only one public method besides `stop`?
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java:
##########
@@ -198,88 +197,7 @@ private static void waitingForLeaderCache(TableImpl tbl, String primary) throws
* @throws InterruptedException If failed.
*/
private String transferPrimary(TableImpl tbl, @Nullable String preferablePrimary) throws InterruptedException {
Review Comment:
I am not sure this method is still needed here.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -1437,6 +1450,49 @@ private CompletableFuture<Void> finishAndCleanup(
UUID txId,
String txCoordinatorId
) {
+ TxMeta txMeta = txStateStorage.get(txId);
+
+ // Check that a transaction has already been finished.
+ boolean transactionAlreadyFinished = txMeta != null && isFinalState(txMeta.txState());
+
+ // Check locksReleased flag. If it is already set, do nothing and return a successful result.
+ // Even if the outcome is different (the transaction was aborted, but we want to commit it),
+ // we return 'success' to be in alignment with common transaction handling.
+ if (transactionAlreadyFinished) {
+ if (txMeta.locksReleased()) {
+ return completedFuture(null);
+ }
+
+ assert !(txMeta.txState() == COMMITED && !commit) : "Not allowed to abort an already committed transaction.";
+ // If the locks were not released, we are likely to be in a recovery mode and retrying the finish request.
+ // In this case we want to check the expected outcome and the actual one.
+ if (commit && txMeta.txState() == ABORTED) {
+ LOG.error("Failed to commit a transaction that is already aborted [txId={}].", txId);
+
+ throw new TransactionException(TX_WAS_ABORTED_ERR,
Review Comment:
Why is there an assertion error in the case of `!commit && txState == COMMITED`
but a transaction exception in the case of `commit && txState == ABORTED`?
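To make the asymmetry concrete: a Java `assert` is only evaluated when assertions are enabled (`-ea`), so the first mismatch may pass silently in production while the second always throws. One possible direction, sketched below under stated assumptions, is to treat both mismatches the same way. The names `Outcome` and `FinishOutcomeCheck` are hypothetical, and `IllegalStateException` stands in for whatever exception type the PR would actually use.

```java
/** Hypothetical stand-in for the stored final transaction state. */
enum Outcome { COMMITED, ABORTED }

/** Sketch: both "expected vs. actual outcome" mismatches fail the same way. */
final class FinishOutcomeCheck {
    /** Throws if the requested outcome contradicts the outcome that is already stored. */
    static void verify(Outcome stored, boolean commit) {
        boolean storedCommit = stored == Outcome.COMMITED;

        if (storedCommit != commit) {
            throw new IllegalStateException(
                    "Requested " + (commit ? "commit" : "abort") + " but the transaction is already " + stored);
        }
    }
}
```

This is only an illustration of symmetric handling, not a claim about which behavior is correct for the recovery path.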
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java:
##########
@@ -295,139 +319,252 @@ public void finishFull(HybridTimestampTracker
timestampTracker, UUID txId, boole
updateTxMeta(txId, old -> new TxStateMeta(finalState,
old.txCoordinatorId(), old.commitTimestamp()));
}
+ private @Nullable HybridTimestamp getCommitTimestamp(boolean commit) {
+ return commit ? clock.now() : null;
+ }
+
+ private String coordinatorId() {
+ return localNodeId.get();
+ }
+
+ @Override
+ public CompletableFuture<Void> finishEmpty(boolean commit, UUID txId) {
+ // If there are no enlisted groups, just update local state - we
already marked the tx as finished.
+ updateTxMeta(txId, old -> coordinatorFinalTxStateMeta(commit,
getCommitTimestamp(commit)));
+
+ return completedFuture(null);
+ }
+
@Override
public CompletableFuture<Void> finish(
HybridTimestampTracker observableTimestampTracker,
TablePartitionId commitPartition,
- ClusterNode recipientNode,
- Long term,
boolean commit,
Map<TablePartitionId, Long> enlistedGroups,
UUID txId
) {
assert enlistedGroups != null;
+ assert !enlistedGroups.isEmpty() : "No enlisted partitions found";
// Here we put finishing state meta into the local map, so that all
concurrent operations trying to read tx state
// with using read timestamp could see that this transaction is
finishing, see #transactionMetaReadTimestampAware(txId, timestamp).
// None of them now are able to update node's clock with read
timestamp and we can create the commit timestamp that is greater
// than all the read timestamps processed before.
// Every concurrent operation will now use a finish future from the
finishing state meta and get only final transaction
// state after the transaction is finished.
- TxStateMetaFinishing finishingStateMeta = new
TxStateMetaFinishing(localNodeId.get());
- updateTxMeta(txId, old -> finishingStateMeta);
- HybridTimestamp commitTimestamp = commit ? clock.now() : null;
-
- // If there are no enlisted groups, just return - we already marked
the tx as finished.
- boolean finishRequestNeeded = !enlistedGroups.isEmpty();
-
- if (!finishRequestNeeded) {
- updateTxMeta(txId, old -> {
- TxStateMeta finalStateMeta =
coordinatorFinalTxStateMeta(commit, commitTimestamp);
-
- finishingStateMeta.txFinishFuture().complete(finalStateMeta);
-
- return finalStateMeta;
- });
+ TxStateMetaFinishing finishingStateMeta = new
TxStateMetaFinishing(coordinatorId());
- return completedFuture(null);
- }
+ updateTxMeta(txId, old -> finishingStateMeta);
- Function<Void, CompletableFuture<Void>> clo = ignored -> {
- // In case of commit it's required to check whether current
primaries are still the same that were enlisted and whether
- // given primaries are not expired or, in other words, whether
commitTimestamp is less or equal to the enlisted primaries
- // expiration timestamps.
- CompletableFuture<Void> verificationFuture =
- commit ? verifyCommitTimestamp(enlistedGroups,
commitTimestamp) : completedFuture(null);
-
- return verificationFuture.handle(
- (unused, throwable) -> {
- Collection<ReplicationGroupId> replicationGroupIds =
new HashSet<>(enlistedGroups.keySet());
-
- boolean verifiedCommit = throwable == null && commit;
-
- TxFinishReplicaRequest req =
FACTORY.txFinishReplicaRequest()
- .txId(txId)
- .timestampLong(clock.nowLong())
- .groupId(commitPartition)
- .groups(replicationGroupIds)
- // In case of verification future failure
transaction will be rolled back.
- .commit(verifiedCommit)
-
.commitTimestampLong(hybridTimestampToLong(commitTimestamp))
- .term(term)
- .build();
-
- return replicaService.invoke(recipientNode,
req).thenRun(
- () -> {
- updateTxMeta(txId, old -> {
- if (isFinalState(old.txState())) {
-
finishingStateMeta.txFinishFuture().complete(old);
-
- return old;
- }
-
- assert old instanceof
TxStateMetaFinishing;
-
- TxStateMeta finalTxStateMeta =
coordinatorFinalTxStateMeta(verifiedCommit, commitTimestamp);
-
-
finishingStateMeta.txFinishFuture().complete(finalTxStateMeta);
-
- return finalTxStateMeta;
- });
-
- if (verifiedCommit) {
-
observableTimestampTracker.update(commitTimestamp);
- }
- });
- })
- .thenCompose(Function.identity())
- // verification future is added in order to share proper
exception with the client
- .thenCompose(r -> verificationFuture);
- };
-
- AtomicReference<CompletableFuture<Void>> ref = new AtomicReference<>();
+ AtomicBoolean performingFinish = new AtomicBoolean();
TxContext tuple = txCtxMap.compute(txId, (uuid, tuple0) -> {
if (tuple0 == null) {
tuple0 = new TxContext(); // No writes enlisted.
}
- if (tuple0.finishFut == null) {
- tuple0.finishFut = new CompletableFuture<>();
- ref.set(tuple0.finishFut);
+ if (!tuple0.isTxFinishing()) {
+ tuple0.finishTx();
+
+ performingFinish.set(true);
}
return tuple0;
});
- if (ref.get() != null) { // This is a finishing thread.
- if (!commit) {
- clo.apply(null).handle((ignored, err) -> {
+ // This is a finishing thread.
+ if (performingFinish.get()) {
+ Function<Void, CompletableFuture<Void>> finishAction = ignored ->
Review Comment:
What purpose does this function serve? It is passed to the method only
once and called inside it only once. Does it just capture some local context for
`runFinish` to reduce the number of parameters?
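One plausible reading, sketched here as an assumption rather than as the author's intent: the lambda defers the finish request until the prerequisite future (the in-flight acks) completes. If that is the only purpose, a `Supplier` would express it without the unused `Void` argument. `DeferredFinish` and its parameter names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Sketch: the finish action is invoked only after the prerequisite future completes. */
final class DeferredFinish {
    static CompletableFuture<Void> run(
            CompletableFuture<Void> prerequisite,
            Supplier<CompletableFuture<Void>> finishAction
    ) {
        // thenCompose does not call the supplier until the prerequisite is done.
        return prerequisite.thenCompose(ignored -> finishAction.get());
    }
}
```

If deferral is not needed, the same effect could be achieved by calling the private method directly inside `runFinish` and passing the captured values as parameters.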
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java:
##########
@@ -295,139 +319,252 @@ public void finishFull(HybridTimestampTracker
timestampTracker, UUID txId, boole
updateTxMeta(txId, old -> new TxStateMeta(finalState,
old.txCoordinatorId(), old.commitTimestamp()));
}
+ private @Nullable HybridTimestamp getCommitTimestamp(boolean commit) {
+ return commit ? clock.now() : null;
+ }
+
+ private String coordinatorId() {
+ return localNodeId.get();
+ }
+
+ @Override
+ public CompletableFuture<Void> finishEmpty(boolean commit, UUID txId) {
+ // If there are no enlisted groups, just update local state - we
already marked the tx as finished.
+ updateTxMeta(txId, old -> coordinatorFinalTxStateMeta(commit,
getCommitTimestamp(commit)));
+
+ return completedFuture(null);
+ }
+
@Override
public CompletableFuture<Void> finish(
HybridTimestampTracker observableTimestampTracker,
TablePartitionId commitPartition,
- ClusterNode recipientNode,
- Long term,
boolean commit,
Map<TablePartitionId, Long> enlistedGroups,
UUID txId
) {
assert enlistedGroups != null;
+ assert !enlistedGroups.isEmpty() : "No enlisted partitions found";
// Here we put finishing state meta into the local map, so that all
concurrent operations trying to read tx state
// with using read timestamp could see that this transaction is
finishing, see #transactionMetaReadTimestampAware(txId, timestamp).
// None of them now are able to update node's clock with read
timestamp and we can create the commit timestamp that is greater
// than all the read timestamps processed before.
// Every concurrent operation will now use a finish future from the
finishing state meta and get only final transaction
// state after the transaction is finished.
- TxStateMetaFinishing finishingStateMeta = new
TxStateMetaFinishing(localNodeId.get());
- updateTxMeta(txId, old -> finishingStateMeta);
- HybridTimestamp commitTimestamp = commit ? clock.now() : null;
-
- // If there are no enlisted groups, just return - we already marked
the tx as finished.
- boolean finishRequestNeeded = !enlistedGroups.isEmpty();
-
- if (!finishRequestNeeded) {
- updateTxMeta(txId, old -> {
- TxStateMeta finalStateMeta =
coordinatorFinalTxStateMeta(commit, commitTimestamp);
-
- finishingStateMeta.txFinishFuture().complete(finalStateMeta);
-
- return finalStateMeta;
- });
+ TxStateMetaFinishing finishingStateMeta = new
TxStateMetaFinishing(coordinatorId());
- return completedFuture(null);
- }
+ updateTxMeta(txId, old -> finishingStateMeta);
- Function<Void, CompletableFuture<Void>> clo = ignored -> {
- // In case of commit it's required to check whether current
primaries are still the same that were enlisted and whether
- // given primaries are not expired or, in other words, whether
commitTimestamp is less or equal to the enlisted primaries
- // expiration timestamps.
- CompletableFuture<Void> verificationFuture =
- commit ? verifyCommitTimestamp(enlistedGroups,
commitTimestamp) : completedFuture(null);
-
- return verificationFuture.handle(
- (unused, throwable) -> {
- Collection<ReplicationGroupId> replicationGroupIds =
new HashSet<>(enlistedGroups.keySet());
-
- boolean verifiedCommit = throwable == null && commit;
-
- TxFinishReplicaRequest req =
FACTORY.txFinishReplicaRequest()
- .txId(txId)
- .timestampLong(clock.nowLong())
- .groupId(commitPartition)
- .groups(replicationGroupIds)
- // In case of verification future failure
transaction will be rolled back.
- .commit(verifiedCommit)
-
.commitTimestampLong(hybridTimestampToLong(commitTimestamp))
- .term(term)
- .build();
-
- return replicaService.invoke(recipientNode,
req).thenRun(
- () -> {
- updateTxMeta(txId, old -> {
- if (isFinalState(old.txState())) {
-
finishingStateMeta.txFinishFuture().complete(old);
-
- return old;
- }
-
- assert old instanceof
TxStateMetaFinishing;
-
- TxStateMeta finalTxStateMeta =
coordinatorFinalTxStateMeta(verifiedCommit, commitTimestamp);
-
-
finishingStateMeta.txFinishFuture().complete(finalTxStateMeta);
-
- return finalTxStateMeta;
- });
-
- if (verifiedCommit) {
-
observableTimestampTracker.update(commitTimestamp);
- }
- });
- })
- .thenCompose(Function.identity())
- // verification future is added in order to share proper
exception with the client
- .thenCompose(r -> verificationFuture);
- };
-
- AtomicReference<CompletableFuture<Void>> ref = new AtomicReference<>();
+ AtomicBoolean performingFinish = new AtomicBoolean();
TxContext tuple = txCtxMap.compute(txId, (uuid, tuple0) -> {
if (tuple0 == null) {
tuple0 = new TxContext(); // No writes enlisted.
}
- if (tuple0.finishFut == null) {
- tuple0.finishFut = new CompletableFuture<>();
- ref.set(tuple0.finishFut);
+ if (!tuple0.isTxFinishing()) {
+ tuple0.finishTx();
+
+ performingFinish.set(true);
}
return tuple0;
});
- if (ref.get() != null) { // This is a finishing thread.
- if (!commit) {
- clo.apply(null).handle((ignored, err) -> {
+ // This is a finishing thread.
+ if (performingFinish.get()) {
+ Function<Void, CompletableFuture<Void>> finishAction = ignored ->
+ makeFinishRequest(
+ observableTimestampTracker,
+ commitPartition,
+ commit,
+ enlistedGroups,
+ txId,
+ finishingStateMeta.txFinishFuture()
+ );
+
+ runFinish(commit, tuple, finishAction);
+ }
+
+ // The method `runFinish` has a side effect on
`finishInProgressFuture` future, it kicks off another future that will complete
it.
+ return tuple.finishInProgressFuture;
+ }
+
+ private static void runFinish(boolean commit, TxContext tuple,
Function<Void, CompletableFuture<Void>> finishAction) {
Review Comment:
I like the idea of splitting the spaghetti code into smaller methods, but
their names should make it possible to guess which logic they
contain. In this PR we now have 5 "finish" methods:
- finish
- finishEmpty
- finishFull
- finishOnPrimary
- runFinish
and I am already lost...
The same goes for `makeFinishRequest` (making the request is not even its
main job) and `makeDurableFinishRequest`.
I suggest the following:
- remove `finishEmpty`, `runFinish` and `makeFinishRequest`;
- rename `finishOnPrimary` to `makeFinishRequest`;
- move the logic that computes `performingFinish` into a method
of `TxContext`;
- rename `makeDurableFinishRequest` to `durableFinish`.
WDYT?
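A rough sketch of the third point, moving the `performingFinish` computation into the context object. `TxContextSketch` and `tryStartFinish` are hypothetical names; the real `TxContext` has more state (in-flight tracking, for example), and in the PR atomicity comes from `txCtxMap.compute`, whereas this sketch uses `synchronized` to stay self-contained.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified stand-in for TxContext. */
final class TxContextSketch {
    /** Completed by whichever caller performs the finish. */
    final CompletableFuture<Void> finishInProgressFuture = new CompletableFuture<>();

    private boolean finishing;

    /**
     * Marks the transaction as finishing.
     *
     * @return true for exactly one caller, the one that should perform the finish.
     */
    synchronized boolean tryStartFinish() {
        if (finishing) {
            return false;
        }

        finishing = true;

        return true;
    }
}
```

With something like this, `finish` would no longer need the `AtomicBoolean` side channel: the caller that gets `true` performs the finish, and every caller returns `finishInProgressFuture`.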
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java:
##########
@@ -295,139 +319,252 @@ public void finishFull(HybridTimestampTracker
timestampTracker, UUID txId, boole
updateTxMeta(txId, old -> new TxStateMeta(finalState,
old.txCoordinatorId(), old.commitTimestamp()));
}
+ private @Nullable HybridTimestamp getCommitTimestamp(boolean commit) {
+ return commit ? clock.now() : null;
+ }
+
+ private String coordinatorId() {
+ return localNodeId.get();
+ }
+
+ @Override
+ public CompletableFuture<Void> finishEmpty(boolean commit, UUID txId) {
+ // If there are no enlisted groups, just update local state - we
already marked the tx as finished.
+ updateTxMeta(txId, old -> coordinatorFinalTxStateMeta(commit,
getCommitTimestamp(commit)));
+
+ return completedFuture(null);
+ }
+
@Override
public CompletableFuture<Void> finish(
HybridTimestampTracker observableTimestampTracker,
TablePartitionId commitPartition,
- ClusterNode recipientNode,
- Long term,
boolean commit,
Map<TablePartitionId, Long> enlistedGroups,
UUID txId
) {
assert enlistedGroups != null;
+ assert !enlistedGroups.isEmpty() : "No enlisted partitions found";
// Here we put finishing state meta into the local map, so that all
concurrent operations trying to read tx state
// with using read timestamp could see that this transaction is
finishing, see #transactionMetaReadTimestampAware(txId, timestamp).
// None of them now are able to update node's clock with read
timestamp and we can create the commit timestamp that is greater
// than all the read timestamps processed before.
// Every concurrent operation will now use a finish future from the
finishing state meta and get only final transaction
// state after the transaction is finished.
- TxStateMetaFinishing finishingStateMeta = new
TxStateMetaFinishing(localNodeId.get());
- updateTxMeta(txId, old -> finishingStateMeta);
- HybridTimestamp commitTimestamp = commit ? clock.now() : null;
-
- // If there are no enlisted groups, just return - we already marked
the tx as finished.
- boolean finishRequestNeeded = !enlistedGroups.isEmpty();
-
- if (!finishRequestNeeded) {
- updateTxMeta(txId, old -> {
- TxStateMeta finalStateMeta =
coordinatorFinalTxStateMeta(commit, commitTimestamp);
-
- finishingStateMeta.txFinishFuture().complete(finalStateMeta);
-
- return finalStateMeta;
- });
+ TxStateMetaFinishing finishingStateMeta = new
TxStateMetaFinishing(coordinatorId());
- return completedFuture(null);
- }
+ updateTxMeta(txId, old -> finishingStateMeta);
- Function<Void, CompletableFuture<Void>> clo = ignored -> {
- // In case of commit it's required to check whether current
primaries are still the same that were enlisted and whether
- // given primaries are not expired or, in other words, whether
commitTimestamp is less or equal to the enlisted primaries
- // expiration timestamps.
- CompletableFuture<Void> verificationFuture =
- commit ? verifyCommitTimestamp(enlistedGroups,
commitTimestamp) : completedFuture(null);
-
- return verificationFuture.handle(
- (unused, throwable) -> {
- Collection<ReplicationGroupId> replicationGroupIds =
new HashSet<>(enlistedGroups.keySet());
-
- boolean verifiedCommit = throwable == null && commit;
-
- TxFinishReplicaRequest req =
FACTORY.txFinishReplicaRequest()
- .txId(txId)
- .timestampLong(clock.nowLong())
- .groupId(commitPartition)
- .groups(replicationGroupIds)
- // In case of verification future failure
transaction will be rolled back.
- .commit(verifiedCommit)
-
.commitTimestampLong(hybridTimestampToLong(commitTimestamp))
- .term(term)
- .build();
-
- return replicaService.invoke(recipientNode,
req).thenRun(
- () -> {
- updateTxMeta(txId, old -> {
- if (isFinalState(old.txState())) {
-
finishingStateMeta.txFinishFuture().complete(old);
-
- return old;
- }
-
- assert old instanceof
TxStateMetaFinishing;
-
- TxStateMeta finalTxStateMeta =
coordinatorFinalTxStateMeta(verifiedCommit, commitTimestamp);
-
-
finishingStateMeta.txFinishFuture().complete(finalTxStateMeta);
-
- return finalTxStateMeta;
- });
-
- if (verifiedCommit) {
-
observableTimestampTracker.update(commitTimestamp);
- }
- });
- })
- .thenCompose(Function.identity())
- // verification future is added in order to share proper
exception with the client
- .thenCompose(r -> verificationFuture);
- };
-
- AtomicReference<CompletableFuture<Void>> ref = new AtomicReference<>();
+ AtomicBoolean performingFinish = new AtomicBoolean();
TxContext tuple = txCtxMap.compute(txId, (uuid, tuple0) -> {
if (tuple0 == null) {
tuple0 = new TxContext(); // No writes enlisted.
}
- if (tuple0.finishFut == null) {
- tuple0.finishFut = new CompletableFuture<>();
- ref.set(tuple0.finishFut);
+ if (!tuple0.isTxFinishing()) {
+ tuple0.finishTx();
+
+ performingFinish.set(true);
}
return tuple0;
});
- if (ref.get() != null) { // This is a finishing thread.
- if (!commit) {
- clo.apply(null).handle((ignored, err) -> {
+ // This is a finishing thread.
+ if (performingFinish.get()) {
+ Function<Void, CompletableFuture<Void>> finishAction = ignored ->
+ makeFinishRequest(
+ observableTimestampTracker,
+ commitPartition,
+ commit,
+ enlistedGroups,
+ txId,
+ finishingStateMeta.txFinishFuture()
+ );
+
+ runFinish(commit, tuple, finishAction);
+ }
+
+ // The method `runFinish` has a side effect on
`finishInProgressFuture` future, it kicks off another future that will complete
it.
+ return tuple.finishInProgressFuture;
+ }
+
+ private static void runFinish(boolean commit, TxContext tuple,
Function<Void, CompletableFuture<Void>> finishAction) {
+ // Wait for commit acks first, then proceed with the finish request.
+ CompletableFuture<Void> finisher = commit ? tuple.waitNoInflights() :
completedFuture(null);
+
+ finisher
+ .thenCompose(finishAction)
+ .handle((ignored, err) -> {
if (err == null) {
- tuple.finishFut.complete(null);
+ tuple.finishInProgressFuture.complete(null);
} else {
- tuple.finishFut.completeExceptionally(err);
+
tuple.finishInProgressFuture.completeExceptionally(err);
}
return null;
});
- } else {
+ }
- // All inflights have been completed before the finish.
- if (tuple.inflights == 0) {
- tuple.waitRepFut.complete(null);
- }
+ private CompletableFuture<Void> makeFinishRequest(
+ HybridTimestampTracker observableTimestampTracker,
+ TablePartitionId commitPartition,
+ boolean commit,
+ Map<TablePartitionId, Long> enlistedGroups,
+ UUID txId,
+ CompletableFuture<TransactionMeta> txFinishFuture
+ ) {
+ HybridTimestamp commitTimestamp = getCommitTimestamp(commit);
Review Comment:
I would suggest keeping the generation of the commit timestamp in the `finish`
method to emphasize that it must be generated only once. That is not obvious in
this method, and the guarantee could be lost in a future refactoring. I also
suggest refactoring it, as I said in another comment.
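
As a rough illustration of the point about generating the timestamp once, here
is a minimal sketch. It uses simplified stand-in types, not the actual Ignite
classes: `FinishSketch`, the `AtomicLong` clock, and the `Long` timestamps are
all hypothetical. The timestamp is created in `finish` and passed down, so the
request-building step never touches the clock.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical, simplified stand-in for the transaction manager; not the Ignite implementation. */
public class FinishSketch {
    /** Hypothetical monotonic counter standing in for the hybrid clock. */
    private final AtomicLong clock = new AtomicLong();

    /** Counts how many commit timestamps were generated; for illustration only. */
    private int timestampsGenerated;

    /** Returns a fresh timestamp for a commit, or null for a rollback. */
    private Long commitTimestamp(boolean commit) {
        if (!commit) {
            return null;
        }

        timestampsGenerated++;

        return clock.incrementAndGet();
    }

    /** The only place where the commit timestamp is generated. */
    public CompletableFuture<Long> finish(boolean commit) {
        Long commitTimestamp = commitTimestamp(commit);

        return makeFinishRequest(commitTimestamp);
    }

    /** Receives the timestamp as a parameter and never calls the clock itself. */
    private CompletableFuture<Long> makeFinishRequest(Long commitTimestamp) {
        return CompletableFuture.completedFuture(commitTimestamp);
    }

    /** Exposes the counter so a caller can check the "only once" property. */
    public int timestampsGenerated() {
        return timestampsGenerated;
    }
}
```

With this shape, a later refactoring of the request-building step cannot
accidentally introduce a second clock read, because the timestamp is an input
to that step rather than something it computes.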