sashapolo commented on code in PR #5187:
URL: https://github.com/apache/ignite-3/pull/5187#discussion_r1949280970
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -505,6 +518,9 @@ private CompletableFuture<?> createZonePartitionReplicationNode(
rebalanceRetryDelayConfiguration
);
+ var safeTimeTracker = new SafeTimeValuesTracker(HybridTimestamp.MIN_VALUE);
Review Comment:
These values are only needed for the `raftGroupListener`. Shall we move them into the same lambda below? Also, I think the lambda itself should be extracted into a method.
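A minimal sketch of the suggested refactoring, assuming the listener factory is a `Supplier<RaftGroupListener>` (as in the `ReplicaManager.startReplica` change in this PR) that is invoked once per replica. `SafeTimeTrackerStub`, `ListenerStub` and `ReplicaStarterSketch` are hypothetical stand-ins rather than Ignite classes, and `Long.MIN_VALUE` stands in for `HybridTimestamp.MIN_VALUE`:

```java
import java.util.function.Supplier;

// Hypothetical stand-in for SafeTimeValuesTracker; the real class differs.
final class SafeTimeTrackerStub {
    private final long initial;

    SafeTimeTrackerStub(long initial) {
        this.initial = initial;
    }

    long initial() {
        return initial;
    }
}

// Hypothetical stand-in for the zone partition RAFT listener.
final class ListenerStub {
    final SafeTimeTrackerStub safeTimeTracker;

    ListenerStub(SafeTimeTrackerStub safeTimeTracker) {
        this.safeTimeTracker = safeTimeTracker;
    }
}

final class ReplicaStarterSketch {
    // Extracted from the inline lambda: everything the listener needs,
    // including the safe time tracker, is created here and nowhere else.
    static ListenerStub createRaftGroupListener() {
        var safeTimeTracker = new SafeTimeTrackerStub(Long.MIN_VALUE); // stand-in for HybridTimestamp.MIN_VALUE
        return new ListenerStub(safeTimeTracker);
    }

    // Call site: a method reference replaces the multi-line lambda.
    static Supplier<ListenerStub> raftGroupListenerFactory() {
        return ReplicaStarterSketch::createRaftGroupListener;
    }
}
```

Because the tracker is created inside `createRaftGroupListener`, it is no longer visible to the rest of the enclosing method, which makes it obvious that only the listener depends on it.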
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java:
##########
@@ -103,18 +134,23 @@ public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
private void processWriteCommand(CommandClosure<WriteCommand> clo) {
Command command = clo.command();
- if (command instanceof FinishTxCommand) {
- FinishTxCommand cmd = (FinishTxCommand) command;
+ long commandIndex = clo.index();
+ long commandTerm = clo.term();
+ @Nullable HybridTimestamp safeTimestamp = clo.safeTimestamp();
Review Comment:
I don't think the `@Nullable` annotation is needed here; it should be deduced automatically.
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java:
##########
@@ -124,6 +160,23 @@ private void processWriteCommand(CommandClosure<WriteCommand> clo) {
clo.result(null);
}
+
+ // result == null means that the command either was not handled by anyone (and clo.result() is called) or
+ // that it was delegated to a table processor (which called clo.result()).
+ if (result != null) {
+ if (Boolean.TRUE.equals(result.get2())) {
Review Comment:
This is weird: why did you write the comparison this way?
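Whether `result.get2()` can return `null` is not visible in this hunk. A minimal sketch of the difference, assuming it returns a boxed `Boolean`: `Boolean.TRUE.equals(flag)` treats `null` as `false`, whereas a plain `if (flag)` unboxes and throws `NullPointerException`. `BooleanCheckDemo` is a hypothetical helper:

```java
final class BooleanCheckDemo {
    // Null-safe: a null flag is treated as false.
    static boolean viaEquals(Boolean flag) {
        return Boolean.TRUE.equals(flag);
    }

    // Auto-unboxing: a null flag throws NullPointerException.
    static boolean viaUnboxing(Boolean flag) {
        return flag;
    }
}
```

If `get2()` can never be `null`, the plain form (or a primitive `boolean` in the tuple type) states the intent more directly.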
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ReplicaTxFinishMarker.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static org.apache.ignite.internal.tx.TxState.COMMITTED;
+import static org.apache.ignite.internal.tx.TxState.isFinalState;
+
+import java.util.UUID;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.listener.ReplicaListener;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.tx.TxStateMeta;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Contains transaction finishing logic for partitions' {@link ReplicaListener} implementations.
+ */
+public class ReplicaTxFinishMarker {
+ private final TxManager txManager;
+
+ public ReplicaTxFinishMarker(TxManager txManager) {
+ this.txManager = txManager;
+ }
+
+ /**
+ * Marks the transaction as finished in local tx state map.
+ *
+ * @param txId Transaction id.
+ * @param txState Transaction state, must be either {@link TxState#COMMITTED} or {@link TxState#ABORTED}.
+ * @param commitTimestamp Commit timestamp ({@code null} when aborting).
+ */
+ public void markFinished(UUID txId, TxState txState, @Nullable HybridTimestamp commitTimestamp) {
+ assert isFinalState(txState) : "Unexpected state [txId=" + txId + ", txState=" + txState + ']';
+
+ txManager.updateTxMeta(txId, old -> new TxStateMeta(
Review Comment:
Please see my comments in the other similar class
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxFinishReplicaRequestHandler.java:
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage;
+import static org.apache.ignite.internal.tx.TxState.ABORTED;
+import static org.apache.ignite.internal.tx.TxState.COMMITTED;
+import static org.apache.ignite.internal.tx.TxState.isFinalState;
+import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_COMMIT_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ROLLBACK_ERR;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
+import org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommandBuilder;
+import org.apache.ignite.internal.partition.replicator.raft.UnexpectedTransactionStateException;
+import org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
+import org.apache.ignite.internal.partition.replicator.schemacompat.CompatValidationResult;
+import org.apache.ignite.internal.partition.replicator.schemacompat.SchemaCompatibilityValidator;
+import org.apache.ignite.internal.raft.service.RaftCommandRunner;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
+import org.apache.ignite.internal.schema.SchemaSyncService;
+import org.apache.ignite.internal.tx.IncompatibleSchemaAbortException;
+import org.apache.ignite.internal.tx.MismatchingTransactionOutcomeInternalException;
+import org.apache.ignite.internal.tx.TransactionResult;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
+import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
+import org.apache.ignite.tx.TransactionException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Handles {@link TxFinishReplicaRequest}.
+ */
+public class TxFinishReplicaRequestHandler {
+ private static final IgniteLogger LOG = Loggers.forClass(TxFinishReplicaRequestHandler.class);
+
+ /** Factory to create RAFT command messages. */
+ private static final PartitionReplicationMessagesFactory PARTITION_REPLICATION_MESSAGES_FACTORY =
+ new PartitionReplicationMessagesFactory();
+
+ /** Factory for creating replica command messages. */
+ private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();
+
+ private final TxStatePartitionStorage txStatePartitionStorage;
+ private final ClockService clockService;
+ private final TxManager txManager;
+ private final ReplicationGroupId replicationGroupId;
+
+ private final SchemaCompatibilityValidator schemaCompatValidator;
+ private final ReliableCatalogVersions reliableCatalogVersions;
+ private final ReplicationRaftCommandApplicator raftCommandApplicator;
+ private final ReplicaTxFinishMarker replicaTxFinishMarker;
+
+
+ /** Constructor. */
+ public TxFinishReplicaRequestHandler(
+ TxStatePartitionStorage txStatePartitionStorage,
+ ClockService clockService,
+ TxManager txManager,
+ ValidationSchemasSource validationSchemasSource,
+ SchemaSyncService schemaSyncService,
+ CatalogService catalogService,
+ RaftCommandRunner raftCommandRunner,
+ ReplicationGroupId replicationGroupId
+ ) {
+ this.txStatePartitionStorage = txStatePartitionStorage;
+ this.clockService = clockService;
+ this.txManager = txManager;
+ this.replicationGroupId = replicationGroupId;
+
+ schemaCompatValidator = new SchemaCompatibilityValidator(validationSchemasSource, catalogService, schemaSyncService);
+ reliableCatalogVersions = new ReliableCatalogVersions(schemaSyncService, catalogService);
+ raftCommandApplicator = new ReplicationRaftCommandApplicator(raftCommandRunner, replicationGroupId);
+ replicaTxFinishMarker = new ReplicaTxFinishMarker(txManager);
+ }
+
+ /**
+ * Processes transaction finish request.
+ * <ol>
+ * <li>Get commit timestamp from finish replica request.</li>
+ * <li>If attempting a commit, validate commit (and, if not valid, switch to abort)</li>
+ * <li>Run specific raft {@code FinishTxCommand} command, that will apply txn state to corresponding txStateStorage.</li>
+ * <li>Send cleanup requests to all enlisted primary replicas.</li>
+ * </ol>
+ *
+ * @param request Transaction finish request.
+ * @return future result of the operation.
+ */
+ public CompletableFuture<TransactionResult> handle(TxFinishReplicaRequest request) {
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-19170 Use ZonePartitionIdMessage and remove cast
+ Map<TablePartitionId, String> enlistedGroups = asTablePartitionIdStringMap(request.groups());
+
+ UUID txId = request.txId();
+
+ if (request.commit()) {
+ HybridTimestamp commitTimestamp = request.commitTimestamp();
+
+ return schemaCompatValidator.validateCommit(txId, enlistedGroups.keySet(), commitTimestamp)
+ .thenCompose(validationResult ->
+ finishAndCleanup(
+ enlistedGroups,
+ validationResult.isSuccessful(),
+ validationResult.isSuccessful() ? commitTimestamp : null,
+ txId
+ ).thenApply(txResult -> {
+ throwIfSchemaValidationOnCommitFailed(validationResult, txResult);
+ return txResult;
+ }));
+ } else {
+ // Aborting.
+ return finishAndCleanup(enlistedGroups, false, null, txId);
+ }
+ }
+
+ private static Map<TablePartitionId, String> asTablePartitionIdStringMap(Map<TablePartitionIdMessage, String> messages) {
+ var result = new HashMap<TablePartitionId, String>(messages.size());
+
+ for (Entry<TablePartitionIdMessage, String> e : messages.entrySet()) {
+ result.put(e.getKey().asTablePartitionId(), e.getValue());
+ }
+
+ return result;
+ }
+
+ private CompletableFuture<TransactionResult> finishAndCleanup(
+ Map<TablePartitionId, String> enlistedPartitions,
+ boolean commit,
+ @Nullable HybridTimestamp commitTimestamp,
+ UUID txId
+ ) {
+ // Read TX state from the storage; we will need this state to check if the locks are released.
+ // Since this state is written only on the transaction finish (see PartitionListener.handleFinishTxCommand),
+ // the value of txMeta can be either null or COMMITTED/ABORTED. No other values are expected.
Review Comment:
I don't understand. We don't expect any other values, yet we later explicitly check for them in `transactionAlreadyFinished`. What am I missing?
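The question above boils down to two consistent readings of the invariant. A sketch of both, using hypothetical stand-ins `TxStateSketch` and `TxMetaSketch` (not the real `TxState`/`TxMeta`, whose members differ):

```java
// Hypothetical stand-ins; only the members needed for the check are modelled.
enum TxStateSketch {
    PENDING, ABORTED, COMMITTED;

    static boolean isFinalState(TxStateSketch state) {
        return state == ABORTED || state == COMMITTED;
    }
}

final class TxMetaSketch {
    final TxStateSketch txState;

    TxMetaSketch(TxStateSketch txState) {
        this.txState = txState;
    }
}

final class FinishedCheck {
    // Reading 1: non-final states can be stored, so the isFinalState check is meaningful.
    static boolean alreadyFinishedDefensive(TxMetaSketch txMeta) {
        return txMeta != null && TxStateSketch.isFinalState(txMeta.txState);
    }

    // Reading 2: only null/COMMITTED/ABORTED can be stored, so a non-null value
    // is enough and the invariant is documented with an assertion instead.
    static boolean alreadyFinishedStrict(TxMetaSketch txMeta) {
        if (txMeta == null) {
            return false;
        }
        assert TxStateSketch.isFinalState(txMeta.txState) : "Unexpected tx state: " + txMeta.txState;
        return true;
    }
}
```

If the comment's claim holds, the second form documents it; if non-final states can in fact reach the storage, the comment should say so instead.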
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxFinishReplicaRequestHandler.java:
##########
@@ -0,0 +1,337 @@
+ private CompletableFuture<TransactionResult> finishAndCleanup(
+ Map<TablePartitionId, String> enlistedPartitions,
+ boolean commit,
+ @Nullable HybridTimestamp commitTimestamp,
+ UUID txId
+ ) {
+ // Read TX state from the storage; we will need this state to check if the locks are released.
+ // Since this state is written only on the transaction finish (see PartitionListener.handleFinishTxCommand),
+ // the value of txMeta can be either null or COMMITTED/ABORTED. No other values are expected.
+ TxMeta txMeta = txStatePartitionStorage.get(txId);
+
+ // Check whether a transaction has already been finished.
+ boolean transactionAlreadyFinished = txMeta != null && isFinalState(txMeta.txState());
+
+ if (transactionAlreadyFinished) {
+ // - The Coordinator calls use the same tx state over retries; both abort and commit are possible.
+ // - Server side recovery may only change tx state to aborted.
+ // - The Coordinator itself should prevent user calls with a proposed state different from the one
+ // that was already triggered (e.g. the client side -> txCoordinator.commitAsync(); txCoordinator.rollbackAsync()).
+ // - A coordinator might send a commit, then die, but the commit message might still arrive at the commit partition primary.
+ // If it arrived with a delay, another node might come across a write intent/lock from that tx
+ // and realize that the coordinator is no longer available and start tx recovery.
+ // The original commit message might arrive later than the recovery one,
+ // hence a 'commit over rollback' case.
+ // The possible states that a 'commit' is allowed to see:
+ // - null (if it's the first change state attempt)
+ // - committed (if it was already updated in the previous attempt)
+ // - aborted (if it was aborted by the initiate recovery logic,
+ // though this is a very unlikely case because initiate recovery will only roll back the tx if coordinator is dead).
+ //
+ // Within 'roll back' it's allowed to see:
+ // - null (if it's the first change state attempt)
+ // - aborted (if it was already updated in the previous attempt or the result of a concurrent recovery)
+ // - committed (if initiate recovery has started, but a delayed message from the coordinator finally arrived and executed earlier).
+
+ // Let the client know a transaction has finished with a different outcome.
+ if (commit != (txMeta.txState() == COMMITTED)) {
+ LOG.error("Failed to finish a transaction that is already finished [txId={}, expectedState={}, actualState={}].",
+ txId,
+ commit ? COMMITTED : ABORTED,
+ txMeta.txState()
+ );
+
+ throw new MismatchingTransactionOutcomeInternalException(
+ "Failed to change the outcome of a finished transaction [txId=" + txId + ", txState=" + txMeta.txState() + "].",
+ new TransactionResult(txMeta.txState(), txMeta.commitTimestamp())
+ );
+ }
+
+ return completedFuture(new TransactionResult(txMeta.txState(), txMeta.commitTimestamp()));
+ }
+
+ return finishTransaction(enlistedPartitions.keySet(), txId, commit, commitTimestamp)
+ .thenCompose(txResult ->
+ txManager.cleanup(replicationGroupId, enlistedPartitions, commit, commitTimestamp, txId)
+ .thenApply(v -> txResult)
+ );
+ }
+
+ private static void throwIfSchemaValidationOnCommitFailed(CompatValidationResult validationResult, TransactionResult txResult) {
+ if (!validationResult.isSuccessful()) {
+ if (validationResult.isTableDropped()) {
+ throw new IncompatibleSchemaAbortException(
+ format("Commit failed because a table was already dropped [table={}]", validationResult.failedTableName()),
+ txResult
+ );
+ } else {
+ throw new IncompatibleSchemaAbortException(
+ format(
+ "Commit failed because schema is not forward-compatible "
+ + "[fromSchemaVersion={}, toSchemaVersion={}, table={}, details={}]",
+ validationResult.fromSchemaVersion(),
+ validationResult.toSchemaVersion(),
+ validationResult.failedTableName(),
+ validationResult.details()
+ ),
+ txResult
+ );
+ }
+ }
+ }
+
+ /**
+ * Finishes a transaction. This operation is idempotent.
+ *
+ * @param partitionIds Collection of enlisted partition groups.
+ * @param txId Transaction id.
+ * @param commit True if the transaction is committed, false otherwise.
+ * @param commitTimestamp Commit timestamp, if applicable.
+ * @return Future that completes when the transaction is finished.
+ */
+ private CompletableFuture<TransactionResult> finishTransaction(
+ Collection<TablePartitionId> partitionIds,
+ UUID txId,
+ boolean commit,
+ @Nullable HybridTimestamp commitTimestamp
+ ) {
+ assert !(commit && commitTimestamp == null) : "Cannot commit without the timestamp.";
+
+ HybridTimestamp tsForCatalogVersion = commit ? commitTimestamp : clockService.now();
+
+ return reliableCatalogVersionFor(tsForCatalogVersion)
+ .thenCompose(catalogVersion -> applyFinishCommand(
+ txId,
+ commit,
+ commitTimestamp,
+ catalogVersion,
+ toPartitionIdMessage(partitionIds)
+ )
+ )
+ .handle((txOutcome, ex) -> {
+ if (ex != null) {
+ // RAFT 'finish' command failed because the state has already been written by someone else.
+ // In that case we throw a corresponding exception.
+ if (ex instanceof UnexpectedTransactionStateException) {
+ UnexpectedTransactionStateException utse = (UnexpectedTransactionStateException) ex;
+ TransactionResult result = utse.transactionResult();
+
+ replicaTxFinishMarker.markFinished(txId, result.transactionState(), result.commitTimestamp());
+
+ throw new MismatchingTransactionOutcomeInternalException(utse.getMessage(), utse.transactionResult());
+ }
+ // Otherwise we convert from the internal exception to the client one.
+ throw new TransactionException(commit ? TX_COMMIT_ERR : TX_ROLLBACK_ERR, ex);
+ }
+
+ TransactionResult result = (TransactionResult) txOutcome;
+
+ replicaTxFinishMarker.markFinished(txId, result.transactionState(), result.commitTimestamp());
+
+ return result;
+ });
+ }
+
+ private CompletableFuture<Integer> reliableCatalogVersionFor(HybridTimestamp ts) {
+ return reliableCatalogVersions.reliableCatalogVersionFor(ts);
+ }
+
+ private CompletableFuture<Object> applyFinishCommand(
+ UUID transactionId,
+ boolean commit,
+ HybridTimestamp commitTimestamp,
Review Comment:
```suggestion
@Nullable HybridTimestamp commitTimestamp,
```
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ResultWrapper.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import org.apache.ignite.internal.raft.Command;
+
+/**
+ * Wrapper for the update(All)Command processing result that, besides the result itself, stores the actual command that was processed.
+ */
+public class ResultWrapper<T> {
+ private final Command command;
+ private final T result;
+
+ public ResultWrapper(Command command, T result) {
+ this.command = command;
+ this.result = result;
+ }
+
+ public Command getCommand() {
Review Comment:
Why is this needed? Did you extract it from somewhere, and will it be used in some upcoming code?
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/RaftTxFinishMarker.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator.raft;
+
+import static org.apache.ignite.internal.tx.TxState.ABORTED;
+import static org.apache.ignite.internal.tx.TxState.COMMITTED;
+
+import java.util.UUID;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.TxStateMeta;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Contains transaction finishing logic for partitions' Raft listener implementations.
+ */
+public class RaftTxFinishMarker {
+ private final TxManager txManager;
+
+ public RaftTxFinishMarker(TxManager txManager) {
+ this.txManager = txManager;
+ }
+
+ /**
+ * Marks the transaction as finished in local tx state map.
+ *
+ * @param txId Transaction id.
+ * @param commit Whether this is a commit.
+ * @param commitTimestamp Commit timestamp ({@code null} when aborting).
+ * @param commitPartitionId Commit partition ID.
+ */
+ public void markFinished(
+ UUID txId,
+ boolean commit,
+ @Nullable HybridTimestamp commitTimestamp,
+ @Nullable TablePartitionId commitPartitionId
+ ) {
+ txManager.updateTxMeta(txId, old -> new TxStateMeta(
+ commit ? COMMITTED : ABORTED,
Review Comment:
Shall we pass this state directly, instead of the flag?
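A sketch of the suggested signature, which would also match `ReplicaTxFinishMarker.markFinished(UUID, TxState, HybridTimestamp)` from this PR. `TxStateStub` and `RaftTxFinishMarkerSketch` are hypothetical stand-ins; a `Long` replaces `HybridTimestamp`, and a map replaces `TxManager.updateTxMeta`. Unlike the `assert` in `ReplicaTxFinishMarker`, this sketch throws `IllegalArgumentException` so the check also runs without `-ea`:

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for org.apache.ignite.internal.tx.TxState.
enum TxStateStub {
    PENDING, ABORTED, COMMITTED;

    static boolean isFinalState(TxStateStub state) {
        return state == ABORTED || state == COMMITTED;
    }
}

final class RaftTxFinishMarkerSketch {
    // Stand-in for the local tx state map maintained through TxManager.updateTxMeta.
    private final Map<UUID, TxStateStub> localStates = new ConcurrentHashMap<>();

    // Takes the final state itself instead of a boolean flag.
    void markFinished(UUID txId, TxStateStub txState, Long commitTimestamp) {
        if (!TxStateStub.isFinalState(txState)) {
            throw new IllegalArgumentException("Unexpected state [txId=" + txId + ", txState=" + txState + ']');
        }
        // The flag used to guarantee this implicitly: only commits carry a timestamp.
        if (txState == TxStateStub.ABORTED && commitTimestamp != null) {
            throw new IllegalArgumentException("Aborted transaction cannot have a commit timestamp [txId=" + txId + ']');
        }

        localStates.put(txId, txState);
    }

    TxStateStub stateOf(UUID txId) {
        return localStates.get(txId);
    }
}
```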
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ResultWrapper.java:
##########
@@ -0,0 +1,41 @@
+/**
+ * Wrapper for the update(All)Command processing result that besides result itself stores actual command that was processed.
Review Comment:
If this is only for update commands, should we call it `UpdateResultWrapper`?
##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -735,7 +735,7 @@ public CompletableFuture<Replica> startReplica(
Function<RaftGroupService, ReplicaListener> listenerFactory,
SnapshotStorageFactory snapshotStorageFactory,
PeersAndLearners newConfiguration,
- RaftGroupListener raftGroupListener,
+ Supplier<RaftGroupListener> raftGroupListenerFactory,
Review Comment:
Why is this change needed? You immediately call `.get` on it.
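A minimal illustration of the point above: a `Supplier` that is invoked unconditionally and immediately behaves exactly like passing the already-built object, and only pays off when creation is deferred or skipped. `ListenerFactoryDemo` and its `alreadyStarted` branch are hypothetical; nothing in this hunk shows whether `startReplica` has such a path.

```java
import java.util.function.Supplier;

final class ListenerFactoryDemo {
    // Counts how many listeners were actually built.
    static int created = 0;

    static Object newListener() {
        created++;
        return new Object();
    }

    // Calls get() unconditionally and immediately: equivalent to
    // accepting a ready listener, so the Supplier adds nothing.
    static Object startEager(Supplier<Object> listenerFactory) {
        return listenerFactory.get();
    }

    // A Supplier is justified when creation can be skipped,
    // e.g. when an existing listener is reused.
    static Object startLazy(boolean alreadyStarted, Object existing, Supplier<Object> listenerFactory) {
        return alreadyStarted ? existing : listenerFactory.get();
    }
}
```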
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/RaftTxFinishMarker.java:
##########
@@ -0,0 +1,64 @@
+package org.apache.ignite.internal.partition.replicator.raft;
+
+import static org.apache.ignite.internal.tx.TxState.ABORTED;
+import static org.apache.ignite.internal.tx.TxState.COMMITTED;
+
+import java.util.UUID;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.TxStateMeta;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Contains transaction finishing logic for partitions Raft listener implementations.
+ */
+public class RaftTxFinishMarker {
+ private final TxManager txManager;
+
+ public RaftTxFinishMarker(TxManager txManager) {
+ this.txManager = txManager;
+ }
+
+ /**
+ * Marks the transaction as finished in local tx state map.
+ *
+ * @param txId Transaction id.
+ * @param commit Whether this is a commit.
+ * @param commitTimestamp Commit timestamp ({@code null} when aborting).
+ * @param commitPartitionId Commit partition ID.
+ */
+ public void markFinished(
+ UUID txId,
+ boolean commit,
+ @Nullable HybridTimestamp commitTimestamp,
+ @Nullable TablePartitionId commitPartitionId
+ ) {
+ txManager.updateTxMeta(txId, old -> new TxStateMeta(
+ commit ? COMMITTED : ABORTED,
+ old == null ? null : old.txCoordinatorId(),
+ old == null ? commitPartitionId : old.commitPartitionId(),
+ commit ? commitTimestamp : null,
Review Comment:
This check is redundant
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/RaftTxFinishMarker.java:
##########
@@ -0,0 +1,64 @@
+public class RaftTxFinishMarker {
+ public void markFinished(
+ UUID txId,
+ boolean commit,
+ @Nullable HybridTimestamp commitTimestamp,
+ @Nullable TablePartitionId commitPartitionId
+ ) {
+ txManager.updateTxMeta(txId, old -> new TxStateMeta(
+ commit ? COMMITTED : ABORTED,
+ old == null ? null : old.txCoordinatorId(),
Review Comment:
I would suggest extracting a single `if (old == null)` check instead of repeating `old == null ? ... : ...` for each argument.
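A minimal sketch of that shape for the `markFinished` updater lambda. `TxState`, `TxStateMeta` (reduced to four fields, with `String` ids and a `Long` timestamp) and `finishUpdater` are hypothetical stand-ins, not Ignite's real classes or constructor signatures. The sketch also passes `commitTimestamp` through unchanged, relying on the javadoc's contract that it is `null` when aborting.

```java
import java.util.function.UnaryOperator;

// Hypothetical, simplified stand-ins for Ignite's TxState and TxStateMeta.
enum TxState { PENDING, COMMITTED, ABORTED }

final class TxStateMeta {
    final TxState state;
    final String txCoordinatorId;   // null if unknown
    final String commitPartitionId; // null if unknown
    final Long commitTimestamp;     // null for aborted transactions

    TxStateMeta(TxState state, String txCoordinatorId, String commitPartitionId, Long commitTimestamp) {
        this.state = state;
        this.txCoordinatorId = txCoordinatorId;
        this.commitPartitionId = commitPartitionId;
        this.commitTimestamp = commitTimestamp;
    }
}

final class TxFinishMarkerSketch {
    /**
     * Builds the updater that would be handed to something like updateTxMeta(txId, updater).
     * Assumes commitTimestamp is already null when commit == false.
     */
    static UnaryOperator<TxStateMeta> finishUpdater(boolean commit, Long commitTimestamp, String commitPartitionId) {
        TxState finalState = commit ? TxState.COMMITTED : TxState.ABORTED;

        return old -> {
            // One null check instead of an "old == null ? ... : ..." ternary per constructor argument.
            if (old == null) {
                return new TxStateMeta(finalState, null, commitPartitionId, commitTimestamp);
            }

            return new TxStateMeta(finalState, old.txCoordinatorId, old.commitPartitionId, commitTimestamp);
        };
    }
}
```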
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java:
##########
@@ -103,18 +134,23 @@ public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
private void processWriteCommand(CommandClosure<WriteCommand> clo) {
Command command = clo.command();
- if (command instanceof FinishTxCommand) {
- FinishTxCommand cmd = (FinishTxCommand) command;
+ long commandIndex = clo.index();
+ long commandTerm = clo.term();
+ @Nullable HybridTimestamp safeTimestamp = clo.safeTimestamp();
+ assert safeTimestamp == null || command instanceof SafeTimePropagatingCommand : command;
+
+ IgniteBiTuple<Serializable, Boolean> result = null;
Review Comment:
Can we do something with this ugly `IgniteBiTuple`?
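One option is a small named result type instead of the tuple. The sketch below assumes the `Boolean` marks whether the command was actually applied; that meaning is not visible in this hunk, so `CommandResult`, `applied` and `wasApplied` are placeholder names. A primitive `boolean` also removes the need for a `Boolean.TRUE.equals(result.get2())` style comparison.

```java
import java.io.Serializable;

/** Hypothetical replacement for IgniteBiTuple<Serializable, Boolean>: an immutable, named pair. */
final class CommandResult {
    private static final CommandResult NOT_APPLIED = new CommandResult(null, false);

    private final Serializable result; // command result value, may be null
    private final boolean applied;     // placeholder meaning: the command changed local state

    private CommandResult(Serializable result, boolean applied) {
        this.result = result;
        this.applied = applied;
    }

    static CommandResult applied(Serializable result) {
        return new CommandResult(result, true);
    }

    static CommandResult notApplied() {
        return NOT_APPLIED;
    }

    Serializable result() {
        return result;
    }

    boolean wasApplied() {
        return applied;
    }
}
```

Call sites would then read `if (result != null && result.wasApplied())`.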
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxFinishReplicaRequestHandler.java:
##########
@@ -0,0 +1,337 @@
+package org.apache.ignite.internal.partition.replicator;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage;
+import static org.apache.ignite.internal.tx.TxState.ABORTED;
+import static org.apache.ignite.internal.tx.TxState.COMMITTED;
+import static org.apache.ignite.internal.tx.TxState.isFinalState;
+import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_COMMIT_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ROLLBACK_ERR;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
+import org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommandBuilder;
+import org.apache.ignite.internal.partition.replicator.raft.UnexpectedTransactionStateException;
+import org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
+import org.apache.ignite.internal.partition.replicator.schemacompat.CompatValidationResult;
+import org.apache.ignite.internal.partition.replicator.schemacompat.SchemaCompatibilityValidator;
+import org.apache.ignite.internal.raft.service.RaftCommandRunner;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
+import org.apache.ignite.internal.schema.SchemaSyncService;
+import org.apache.ignite.internal.tx.IncompatibleSchemaAbortException;
+import org.apache.ignite.internal.tx.MismatchingTransactionOutcomeInternalException;
+import org.apache.ignite.internal.tx.TransactionResult;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
+import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
+import org.apache.ignite.tx.TransactionException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Handles {@link TxFinishReplicaRequest}.
+ */
+public class TxFinishReplicaRequestHandler {
+ private static final IgniteLogger LOG = Loggers.forClass(TxFinishReplicaRequestHandler.class);
+
+ /** Factory to create RAFT command messages. */
+ private static final PartitionReplicationMessagesFactory PARTITION_REPLICATION_MESSAGES_FACTORY =
+ new PartitionReplicationMessagesFactory();
+
+ /** Factory for creating replica command messages. */
+ private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();
+
+ private final TxStatePartitionStorage txStatePartitionStorage;
+ private final ClockService clockService;
+ private final TxManager txManager;
+ private final ReplicationGroupId replicationGroupId;
+
+ private final SchemaCompatibilityValidator schemaCompatValidator;
+ private final ReliableCatalogVersions reliableCatalogVersions;
+ private final ReplicationRaftCommandApplicator raftCommandApplicator;
+ private final ReplicaTxFinishMarker replicaTxFinishMarker;
+
+
+ /** Constructor. */
+ public TxFinishReplicaRequestHandler(
+ TxStatePartitionStorage txStatePartitionStorage,
+ ClockService clockService,
+ TxManager txManager,
+ ValidationSchemasSource validationSchemasSource,
+ SchemaSyncService schemaSyncService,
+ CatalogService catalogService,
+ RaftCommandRunner raftCommandRunner,
+ ReplicationGroupId replicationGroupId
+ ) {
+ this.txStatePartitionStorage = txStatePartitionStorage;
+ this.clockService = clockService;
+ this.txManager = txManager;
+ this.replicationGroupId = replicationGroupId;
+
+ schemaCompatValidator = new SchemaCompatibilityValidator(validationSchemasSource, catalogService, schemaSyncService);
+ reliableCatalogVersions = new ReliableCatalogVersions(schemaSyncService, catalogService);
+ raftCommandApplicator = new ReplicationRaftCommandApplicator(raftCommandRunner, replicationGroupId);
+ replicaTxFinishMarker = new ReplicaTxFinishMarker(txManager);
+ }
+
+ /**
+ * Processes transaction finish request.
+ * <ol>
+ * <li>Get commit timestamp from finish replica request.</li>
+ * <li>If attempting a commit, validate commit (and, if not valid, switch to abort)</li>
+ * <li>Run specific raft {@code FinishTxCommand} command, that will apply txn state to corresponding txStateStorage.</li>
+ * <li>Send cleanup requests to all enlisted primary replicas.</li>
+ * </ol>
+ *
+ * @param request Transaction finish request.
+ * @return future result of the operation.
+ */
+ public CompletableFuture<TransactionResult> handle(TxFinishReplicaRequest request) {
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-19170 Use ZonePartitionIdMessage and remove cast
+ Map<TablePartitionId, String> enlistedGroups = asTablePartitionIdStringMap(request.groups());
+
+ UUID txId = request.txId();
+
+ if (request.commit()) {
+ HybridTimestamp commitTimestamp = request.commitTimestamp();
+
+ return schemaCompatValidator.validateCommit(txId, enlistedGroups.keySet(), commitTimestamp)
+ .thenCompose(validationResult ->
+ finishAndCleanup(
+ enlistedGroups,
+ validationResult.isSuccessful(),
+ validationResult.isSuccessful() ? commitTimestamp : null,
+ txId
+ ).thenApply(txResult -> {
+ throwIfSchemaValidationOnCommitFailed(validationResult, txResult);
+ return txResult;
+ }));
+ } else {
+ // Aborting.
+ return finishAndCleanup(enlistedGroups, false, null, txId);
+ }
+ }
+
+ private static Map<TablePartitionId, String> asTablePartitionIdStringMap(Map<TablePartitionIdMessage, String> messages) {
+ var result = new HashMap<TablePartitionId, String>(messages.size());
+
+ for (Entry<TablePartitionIdMessage, String> e : messages.entrySet()) {
+ result.put(e.getKey().asTablePartitionId(), e.getValue());
+ }
+
+ return result;
+ }
+
+ private CompletableFuture<TransactionResult> finishAndCleanup(
+ Map<TablePartitionId, String> enlistedPartitions,
+ boolean commit,
+ @Nullable HybridTimestamp commitTimestamp,
+ UUID txId
+ ) {
+ // Read TX state from the storage, we will need this state to check if the locks are released.
+ // Since this state is written only on the transaction finish (see PartitionListener.handleFinishTxCommand),
+ // the value of txMeta can be either null or COMMITTED/ABORTED. No other values is expected.
Review Comment:
```suggestion
// the value of txMeta can be either null or COMMITTED/ABORTED. No other values are expected.
```
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxFinishReplicaRequestHandler.java:
##########
@@ -0,0 +1,337 @@
+public class TxFinishReplicaRequestHandler {
+ private static Map<TablePartitionId, String> asTablePartitionIdStringMap(Map<TablePartitionIdMessage, String> messages) {
+ var result = new HashMap<TablePartitionId, String>(messages.size());
Review Comment:
you need to use `capacity` here
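For context: `new HashMap<>(n)` treats `n` as the table capacity, not the expected number of entries, so with the default 0.75 load factor the map may resize while it is being filled (for example, 13 entries in a map created with `new HashMap<>(13)`). The reviewer's wording suggests Ignite already has a `capacity(...)` helper for this; the sketch below is a stand-alone equivalent under that assumption, not Ignite's code, and `mapKeys` is a hypothetical generic version of `asTablePartitionIdStringMap`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class MapCapacitySketch {
    private static final float DEFAULT_LOAD_FACTOR = 0.75f;

    /** Initial capacity that lets expectedSize entries fit without a resize at the default load factor. */
    static int capacity(int expectedSize) {
        if (expectedSize < 0) {
            throw new IllegalArgumentException("expectedSize must be non-negative: " + expectedSize);
        }

        // The double -> int cast saturates at Integer.MAX_VALUE for very large sizes.
        return (int) Math.ceil(expectedSize / (double) DEFAULT_LOAD_FACTOR);
    }

    /** Re-keys a map into a pre-sized HashMap, the way asTablePartitionIdStringMap converts partition ids. */
    static <K, V, R> Map<R, V> mapKeys(Map<K, V> source, Function<? super K, ? extends R> keyMapper) {
        Map<R, V> result = new HashMap<>(capacity(source.size()));

        for (Map.Entry<K, V> e : source.entrySet()) {
            result.put(keyMapper.apply(e.getKey()), e.getValue());
        }

        return result;
    }
}
```

On Java 19 and later, `HashMap.newHashMap(int numMappings)` performs the same sizing without a helper.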
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxFinishReplicaRequestHandler.java:
##########
@@ -0,0 +1,337 @@
+public class TxFinishReplicaRequestHandler {
+ /**
+ * Processes transaction finish request.
+ * <ol>
+ * <li>Get commit timestamp from finish replica request.</li>
+ * <li>If attempting a commit, validate commit (and, if not valid, switch to abort)</li>
+ * <li>Run specific raft {@code FinishTxCommand} command, that will apply txn state to corresponding txStateStorage.</li>
Review Comment:
What do you mean by `specific`?
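For reference, the commit path described by the steps listed in this javadoc (validate, downgrade to an abort on failure, record the outcome, then report the validation failure) can be sketched as a `CompletableFuture` chain. `validateCommit`, `finishAndCleanup`, `Outcome` and the `Long` timestamp are simplified, hypothetical stand-ins for the real validator, Raft command and `TransactionResult`, and the cleanup step is folded into `finishAndCleanup`.

```java
import java.util.concurrent.CompletableFuture;

final class TxFinishFlowSketch {
    /** Stand-in for TransactionResult: the final state and its commit timestamp. */
    static final class Outcome {
        final boolean committed;
        final Long commitTimestamp; // null when aborted

        Outcome(boolean committed, Long commitTimestamp) {
            this.committed = committed;
            this.commitTimestamp = commitTimestamp;
        }
    }

    /** Hypothetical validator; the real one checks schema compatibility for the enlisted partitions. */
    static CompletableFuture<Boolean> validateCommit(boolean schemasCompatible) {
        return CompletableFuture.completedFuture(schemasCompatible);
    }

    /** Stand-in for replicating the finish command through Raft and cleaning up enlisted partitions. */
    static CompletableFuture<Outcome> finishAndCleanup(boolean commit, Long commitTimestamp) {
        return CompletableFuture.completedFuture(new Outcome(commit, commitTimestamp));
    }

    static CompletableFuture<Outcome> handle(boolean commitRequested, long commitTimestamp, boolean schemasCompatible) {
        if (!commitRequested) {
            // Aborting: no validation and no commit timestamp.
            return finishAndCleanup(false, null);
        }

        return validateCommit(schemasCompatible).thenCompose(valid ->
                // A commit that fails validation is finished as an abort instead.
                finishAndCleanup(valid, valid ? commitTimestamp : null)
                        .thenApply(outcome -> {
                            // The abort is already recorded; now tell the caller why the commit did not happen.
                            if (!valid) {
                                throw new IllegalStateException("Commit failed schema validation, transaction was aborted");
                            }
                            return outcome;
                        }));
    }
}
```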
##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java:
##########
@@ -404,6 +420,99 @@ void testDataRebalance(boolean truncateRaftLog) throws Exception {
assertThat(kvView2.getAll(null, data2.keySet()), is(data2));
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void txFinishCommandGetsReplicated(boolean commit) throws Exception {
+ startCluster(3);
+
+ // Create a zone with a single partition on every node.
+ int zoneId = createZone(TEST_ZONE_NAME, 1, cluster.size());
+
+ int tableId1 = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME1);
+ int tableId2 = createTable(TEST_ZONE_NAME, TEST_TABLE_NAME2);
+
+ var zonePartitionId = new ZonePartitionId(zoneId, 0);
+
+ setupTableIdToZoneIdConverter(zonePartitionId, new TablePartitionId(tableId1, 0), new TablePartitionId(tableId2, 0));
+
+ cluster.forEach(Node::waitForMetadataCompletenessAtNow);
+
+ Node node = cluster.get(0);
+
+ setPrimaryReplica(node, zonePartitionId);
+
+ KeyValueView<Integer, Integer> kvView1 = node.tableManager.table(TEST_TABLE_NAME1).keyValueView(Integer.class, Integer.class);
+ KeyValueView<Integer, Integer> kvView2 = node.tableManager.table(TEST_TABLE_NAME2).keyValueView(Integer.class, Integer.class);
+
+ InternalTransaction transaction = unwrapInternalTransaction(node.transactions().begin());
+ kvView1.put(transaction, 42, 69);
+ kvView2.put(transaction, 142, 169);
+ if (commit) {
+ transaction.commit();
+ } else {
+ transaction.rollback();
+ }
+
+ for (Node currentNode : cluster) {
+ assertTrue(waitForCondition(
+ () -> !txStatesInPartitionStorage(currentNode.txStatePartitionStorage(zoneId, 0)).isEmpty(),
+ SECONDS.toMillis(10)
+ ));
+ }
+
+ List<CountExpectation> expectations = new ArrayList<>();
+ for (int i = 0; i < cluster.size(); i++) {
+ Node currentNode = cluster.get(i);
+ expectations.add(new CountExpectation(
Review Comment:
Why do we need this `CountExpectation` class? Can we just add a bunch of
`Executables` directly?
##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java:
##########
@@ -404,6 +420,99 @@ void testDataRebalance(boolean truncateRaftLog) throws Exception {
assertThat(kvView2.getAll(null, data2.keySet()), is(data2));
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void txFinishCommandGetsReplicated(boolean commit) throws Exception {
+ InternalTransaction transaction = unwrapInternalTransaction(node.transactions().begin());
Review Comment:
Why do you need this unwrapping?
##########
modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItZoneDataReplicationTest.java:
##########
@@ -404,6 +420,99 @@ void testDataRebalance(boolean truncateRaftLog) throws Exception {
assertThat(kvView2.getAll(null, data2.keySet()), is(data2));
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void txFinishCommandGetsReplicated(boolean commit) throws Exception {
Review Comment:
I would suggest extracting this test into a separate class, because it will become super huge if we add all commands here. But I don't insist.
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxFinishReplicaRequestHandler.java:
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.partition.replicator;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage;
+import static org.apache.ignite.internal.tx.TxState.ABORTED;
+import static org.apache.ignite.internal.tx.TxState.COMMITTED;
+import static org.apache.ignite.internal.tx.TxState.isFinalState;
+import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_COMMIT_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ROLLBACK_ERR;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.hlc.ClockService;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
+import org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommandBuilder;
+import org.apache.ignite.internal.partition.replicator.raft.UnexpectedTransactionStateException;
+import org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
+import org.apache.ignite.internal.partition.replicator.schemacompat.CompatValidationResult;
+import org.apache.ignite.internal.partition.replicator.schemacompat.SchemaCompatibilityValidator;
+import org.apache.ignite.internal.raft.service.RaftCommandRunner;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage;
+import org.apache.ignite.internal.schema.SchemaSyncService;
+import org.apache.ignite.internal.tx.IncompatibleSchemaAbortException;
+import org.apache.ignite.internal.tx.MismatchingTransactionOutcomeInternalException;
+import org.apache.ignite.internal.tx.TransactionResult;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.TxMeta;
+import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
+import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
+import org.apache.ignite.tx.TransactionException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Handles {@link TxFinishReplicaRequest}.
+ */
+public class TxFinishReplicaRequestHandler {
+ private static final IgniteLogger LOG = Loggers.forClass(TxFinishReplicaRequestHandler.class);
+
+ /** Factory to create RAFT command messages. */
+ private static final PartitionReplicationMessagesFactory PARTITION_REPLICATION_MESSAGES_FACTORY =
+ new PartitionReplicationMessagesFactory();
+
+ /** Factory for creating replica command messages. */
+ private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();
+
+ private final TxStatePartitionStorage txStatePartitionStorage;
+ private final ClockService clockService;
+ private final TxManager txManager;
+ private final ReplicationGroupId replicationGroupId;
+
+ private final SchemaCompatibilityValidator schemaCompatValidator;
+ private final ReliableCatalogVersions reliableCatalogVersions;
+ private final ReplicationRaftCommandApplicator raftCommandApplicator;
+ private final ReplicaTxFinishMarker replicaTxFinishMarker;
+
+
+ /** Constructor. */
+ public TxFinishReplicaRequestHandler(
+ TxStatePartitionStorage txStatePartitionStorage,
+ ClockService clockService,
+ TxManager txManager,
+ ValidationSchemasSource validationSchemasSource,
+ SchemaSyncService schemaSyncService,
+ CatalogService catalogService,
+ RaftCommandRunner raftCommandRunner,
+ ReplicationGroupId replicationGroupId
+ ) {
+ this.txStatePartitionStorage = txStatePartitionStorage;
+ this.clockService = clockService;
+ this.txManager = txManager;
+ this.replicationGroupId = replicationGroupId;
+
+ schemaCompatValidator = new SchemaCompatibilityValidator(validationSchemasSource, catalogService, schemaSyncService);
+ reliableCatalogVersions = new ReliableCatalogVersions(schemaSyncService, catalogService);
+ raftCommandApplicator = new ReplicationRaftCommandApplicator(raftCommandRunner, replicationGroupId);
+ replicaTxFinishMarker = new ReplicaTxFinishMarker(txManager);
+ }
+
+ /**
+ * Processes transaction finish request.
+ * <ol>
+ * <li>Get commit timestamp from finish replica request.</li>
+ * <li>If attempting a commit, validate commit (and, if not valid, switch to abort)</li>
+ * <li>Run specific raft {@code FinishTxCommand} command, that will apply txn state to corresponding txStateStorage.</li>
+ * <li>Send cleanup requests to all enlisted primary replicas.</li>
+ * </ol>
+ *
+ * @param request Transaction finish request.
+ * @return future result of the operation.
+ */
+ public CompletableFuture<TransactionResult> handle(TxFinishReplicaRequest request) {
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-19170 Use ZonePartitionIdMessage and remove cast
+ Map<TablePartitionId, String> enlistedGroups = asTablePartitionIdStringMap(request.groups());
+
+ UUID txId = request.txId();
+
+ if (request.commit()) {
+ HybridTimestamp commitTimestamp = request.commitTimestamp();
+
+ return schemaCompatValidator.validateCommit(txId, enlistedGroups.keySet(), commitTimestamp)
+ .thenCompose(validationResult ->
+ finishAndCleanup(
+ enlistedGroups,
+ validationResult.isSuccessful(),
+ validationResult.isSuccessful() ? commitTimestamp : null,
+ txId
+ ).thenApply(txResult -> {
+ throwIfSchemaValidationOnCommitFailed(validationResult, txResult);
+ return txResult;
+ }));
+ } else {
+ // Aborting.
+ return finishAndCleanup(enlistedGroups, false, null, txId);
+ }
+ }
+
+ private static Map<TablePartitionId, String> asTablePartitionIdStringMap(Map<TablePartitionIdMessage, String> messages) {
+ var result = new HashMap<TablePartitionId, String>(messages.size());
+
+ for (Entry<TablePartitionIdMessage, String> e : messages.entrySet()) {
+ result.put(e.getKey().asTablePartitionId(), e.getValue());
+ }
+
+ return result;
+ }
+
+ private CompletableFuture<TransactionResult> finishAndCleanup(
+ Map<TablePartitionId, String> enlistedPartitions,
+ boolean commit,
+ @Nullable HybridTimestamp commitTimestamp,
+ UUID txId
+ ) {
+ // Read TX state from the storage, we will need this state to check if the locks are released.
+ // Since this state is written only on the transaction finish (see PartitionListener.handleFinishTxCommand),
+ // the value of txMeta can be either null or COMMITTED/ABORTED. No other values are expected.
+ TxMeta txMeta = txStatePartitionStorage.get(txId);
+
+ // Check whether a transaction has already been finished.
+ boolean transactionAlreadyFinished = txMeta != null && isFinalState(txMeta.txState());
+
+ if (transactionAlreadyFinished) {
+ // - The Coordinator calls use same tx state over retries, both abort and commit are possible.
+ // - Server side recovery may only change tx state to aborted.
+ // - The Coordinator itself should prevent user calls with different proposed state to the one,
+ // that was already triggered (e.g. the client side -> txCoordinator.commitAsync(); txCoordinator.rollbackAsync()).
+ // - A coordinator might send a commit, then die, but the commit message might still arrive at the commit partition primary.
+ // If it arrived with a delay, another node might come across a write intent/lock from that tx
+ // and realize that the coordinator is no longer available and start tx recovery.
+ // The original commit message might arrive later than the recovery one,
+ // hence a 'commit over rollback' case.
+ // The possible states that a 'commit' is allowed to see:
+ // - null (if it's the first change state attempt)
+ // - committed (if it was already updated in the previous attempt)
+ // - aborted (if it was aborted by the initiate recovery logic,
+ // though this is a very unlikely case because initiate recovery will only roll back the tx if coordinator is dead).
+ //
+ // Within 'roll back' it's allowed to see:
+ // - null (if it's the first change state attempt)
+ // - aborted (if it was already updated in the previous attempt or the result of a concurrent recovery)
+ // - commit (if initiate recovery has started, but a delayed message from the coordinator finally arrived and executed earlier).
+
+ // Let the client know a transaction has finished with a different outcome.
+ if (commit != (txMeta.txState() == COMMITTED)) {
+ LOG.error("Failed to finish a transaction that is already finished [txId={}, expectedState={}, actualState={}].",
+ txId,
+ commit ? COMMITTED : ABORTED,
+ txMeta.txState()
+ );
+
+ throw new MismatchingTransactionOutcomeInternalException(
+ "Failed to change the outcome of a finished transaction [txId=" + txId + ", txState=" + txMeta.txState() + "].",
+ new TransactionResult(txMeta.txState(), txMeta.commitTimestamp())
+ );
+ }
+
+ return completedFuture(new TransactionResult(txMeta.txState(), txMeta.commitTimestamp()));
+ }
+
+ return finishTransaction(enlistedPartitions.keySet(), txId, commit, commitTimestamp)
+ .thenCompose(txResult ->
+ txManager.cleanup(replicationGroupId, enlistedPartitions, commit, commitTimestamp, txId)
+ .thenApply(v -> txResult)
+ );
+ }
+
+ private static void throwIfSchemaValidationOnCommitFailed(CompatValidationResult validationResult, TransactionResult txResult) {
+ if (!validationResult.isSuccessful()) {
+ if (validationResult.isTableDropped()) {
+ throw new IncompatibleSchemaAbortException(
+ format("Commit failed because a table was already dropped [table={}]", validationResult.failedTableName()),
+ txResult
+ );
+ } else {
+ throw new IncompatibleSchemaAbortException(
+ format(
+ "Commit failed because schema is not forward-compatible "
+ + "[fromSchemaVersion={}, toSchemaVersion={}, table={}, details={}]",
+ validationResult.fromSchemaVersion(),
+ validationResult.toSchemaVersion(),
+ validationResult.failedTableName(),
+ validationResult.details()
+ ),
+ txResult
+ );
+ }
+ }
+ }
+
+ /**
+ * Finishes a transaction. This operation is idempotent.
+ *
+ * @param partitionIds Collection of enlisted partition groups.
+ * @param txId Transaction id.
+ * @param commit True if the transaction is committed, false otherwise.
+ * @param commitTimestamp Commit timestamp, if applicable.
+ * @return Future to wait for the finish.
+ */
+ private CompletableFuture<TransactionResult> finishTransaction(
+ Collection<TablePartitionId> partitionIds,
+ UUID txId,
+ boolean commit,
+ @Nullable HybridTimestamp commitTimestamp
+ ) {
+ assert !(commit && commitTimestamp == null) : "Cannot commit without the timestamp.";
+
+ HybridTimestamp tsForCatalogVersion = commit ? commitTimestamp : clockService.now();
+
+ return reliableCatalogVersionFor(tsForCatalogVersion)
+ .thenCompose(catalogVersion -> applyFinishCommand(
+ txId,
+ commit,
+ commitTimestamp,
+ catalogVersion,
+ toPartitionIdMessage(partitionIds)
+ )
Review Comment:
We can save some indentation here:
```
.thenCompose(catalogVersion -> applyFinishCommand(
txId,
commit,
commitTimestamp,
catalogVersion,
toPartitionIdMessage(partitionIds)
))
```
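For reference, the already-finished branch of `finishAndCleanup` quoted above reduces to a small decision over the stored state and the requested outcome. A minimal sketch, assuming a simplified `TxState` enum and a hypothetical `MismatchingOutcomeException` in place of `MismatchingTransactionOutcomeInternalException` (neither is the real Ignite type):

```java
/**
 * Sketch of the "already finished" check in finishAndCleanup.
 * TxState and MismatchingOutcomeException are simplified, hypothetical
 * stand-ins for the real Ignite types.
 */
class FinishOutcomeCheck {
    enum TxState { PENDING, COMMITTED, ABORTED }

    /** Hypothetical stand-in for MismatchingTransactionOutcomeInternalException. */
    static class MismatchingOutcomeException extends RuntimeException {
        final TxState actual;

        MismatchingOutcomeException(String message, TxState actual) {
            super(message);
            this.actual = actual;
        }
    }

    static boolean isFinalState(TxState state) {
        return state == TxState.COMMITTED || state == TxState.ABORTED;
    }

    /**
     * Returns null if the transaction is not finished yet (the caller should run the finish command),
     * returns the stored state if it already finished with the requested outcome (idempotent retry),
     * and throws if it already finished with the opposite outcome.
     */
    static TxState checkAlreadyFinished(TxState stored, boolean commit) {
        if (stored == null || !isFinalState(stored)) {
            return null;
        }

        // Same comparison as in the quoted diff: a commit request must see COMMITTED,
        // a rollback request must see ABORTED.
        if (commit != (stored == TxState.COMMITTED)) {
            throw new MismatchingOutcomeException(
                    "Failed to change the outcome of a finished transaction [txState=" + stored + "].", stored);
        }

        return stored;
    }

    public static void main(String[] args) {
        System.out.println(checkAlreadyFinished(null, true));              // null
        System.out.println(checkAlreadyFinished(TxState.COMMITTED, true)); // COMMITTED

        try {
            checkAlreadyFinished(TxState.ABORTED, true); // "commit over rollback"
        } catch (MismatchingOutcomeException e) {
            System.out.println("mismatch: " + e.actual); // mismatch: ABORTED
        }
    }
}
```

Both retries with the same outcome return the stored state, which is what makes the handler idempotent; only a flipped outcome surfaces as an error to the client.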
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java:
##########
@@ -124,6 +160,23 @@ private void processWriteCommand(CommandClosure<WriteCommand> clo) {
clo.result(null);
}
+
+ // result == null means that the command either was not handled by anyone (and clo.result() is called) or
+ // that it was delegated to a table processor (which called clo.result()).
+ if (result != null) {
+ if (Boolean.TRUE.equals(result.get2())) {
Review Comment:
Also, this condition and the previous one can be merged into one.
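A sketch of the merged form, assuming `result` is a pair-like object whose `get2()` returns a possibly-null `Boolean` (the quoted diff only shows `result.get2()`; the `Pair` class below is hypothetical, and since the inner `if` body is not shown, only the condition is modeled). `&&` short-circuits, so `get2()` is never called on a null `result`:

```java
/** Sketch: collapsing two nested null/flag checks into one condition. */
class MergedConditionSketch {
    /** Hypothetical pair type with get1()/get2() accessors, standing in for the real result type. */
    static final class Pair<A, B> {
        private final A first;
        private final B second;

        Pair(A first, B second) {
            this.first = first;
            this.second = second;
        }

        A get1() {
            return first;
        }

        B get2() {
            return second;
        }
    }

    /** Original shape: two nested conditions. */
    static boolean nested(Pair<Object, Boolean> result) {
        if (result != null) {
            if (Boolean.TRUE.equals(result.get2())) {
                return true;
            }
        }
        return false;
    }

    /** Merged shape: && short-circuits, so get2() is only evaluated when result != null. */
    static boolean merged(Pair<Object, Boolean> result) {
        return result != null && Boolean.TRUE.equals(result.get2());
    }

    public static void main(String[] args) {
        Pair<Object, Boolean> flagged = new Pair<>("value", Boolean.TRUE);
        Pair<Object, Boolean> unflagged = new Pair<>("value", null);

        System.out.println(merged(flagged));   // true
        System.out.println(merged(unflagged)); // false
        System.out.println(merged(null));      // false
    }
}
```

`Boolean.TRUE.equals(...)` only earns its place if `get2()` can actually return null; if it never does, `result != null && result.get2()` reads more naturally.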