vldpyatkov commented on code in PR #2585:
URL: https://github.com/apache/ignite-3/pull/2585#discussion_r1328696752


##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java:
##########
@@ -188,6 +191,36 @@ public TxStateMeta stateMeta(UUID txId) {
         return txStateMap.get(txId);
     }
 
+    @Override
+    public CompletableFuture<TransactionMeta> 
transactionMetaReadTimestampAware(
+            UUID txId,
+            HybridTimestamp readTimestamp,
+            TxStateStorage storage
+    ) {
+        AtomicReference<CompletableFuture<TransactionMeta>> txStateFutRef = 
new AtomicReference<>();
+
+        txStateMap.compute(txId, (k, v) -> {
+            TransactionMeta txMeta = v;
+
+            if (txMeta == null) {
+                txMeta = storage.get(txId);
+            }
+
+            if (txMeta != null && txMeta instanceof TxStateMetaFinishing) {
+                txStateFutRef.set(((TxStateMetaFinishing) 
txMeta).txFinishFuture());
+            } else {
+                // All future transactions will be committed after the 
resolution processed.
+                clock.update(readTimestamp);

Review Comment:
   If a commit partition never generates a commit timestamp now, there is 
no point in updating the clock on the commit partition.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -336,8 +328,8 @@ public PartitionReplicaListener(
 
     @Override
     public CompletableFuture<?> invoke(ReplicaRequest request, String 
senderId) {
-        if (request instanceof TxStateReplicaRequest) {
-            return processTxStateReplicaRequest((TxStateReplicaRequest) 
request);
+        if (request instanceof TxStateCommitPartitionRequest) {
+            return 
processTxStateReplicaRequest((TxStateCommitPartitionRequest) request);

Review Comment:
   Rename to `processTxStateCommitPartitionRequest(request)`.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java:
##########
@@ -44,29 +69,202 @@ public class TransactionStateResolver {
     /** Function that resolves a node consistent ID to a cluster node. */
     private final Function<String, ClusterNode> clusterNodeResolver;
 
+    // TODO https://issues.apache.org/jira/browse/IGNITE-20408 after this 
ticket this resolver will be no longer needed, as
+    // TODO we will store coordinator as ClusterNode in local tx state map.
+    /** Function that resolves a node non-consistent ID to a cluster node. */
+    private final Function<String, ClusterNode> clusterNodeResolverById;
+
+    private final Map<UUID, CompletableFuture<TransactionMeta>> txStateFutures 
= new ConcurrentHashMap<>();
+
+    private final TxManager txManager;
+
+    private final HybridClock clock;
+
+    private final MessagingService messagingService;
+
     /**
      * The constructor.
      *
      * @param replicaService Replication service.
+     * @param txManager Transaction manager.
+     * @param clock Node clock.
+     * @param clusterNodeResolver Cluster node resolver.
+     * @param clusterNodeResolverById Cluster node resolver using 
non-consistent id.
+     * @param messagingService Messaging service.
      */
-    public TransactionStateResolver(ReplicaService replicaService, 
Function<String, ClusterNode> clusterNodeResolver) {
+    public TransactionStateResolver(
+            ReplicaService replicaService,
+            TxManager txManager,
+            HybridClock clock,
+            Function<String, ClusterNode> clusterNodeResolver,
+            Function<String, ClusterNode> clusterNodeResolverById,
+            MessagingService messagingService
+    ) {
         this.replicaService = replicaService;
+        this.txManager = txManager;
+        this.clock = clock;
         this.clusterNodeResolver = clusterNodeResolver;
+        this.clusterNodeResolverById = clusterNodeResolverById;
+        this.messagingService = messagingService;
+    }
+
+    /**
+     * This should be called in order to allow the transaction state resolver 
to listen to {@link TxStateCoordinatorRequest} messages.
+     */
+    public void start() {
+        messagingService.addMessageHandler(TxMessageGroup.class, (msg, sender, 
correlationId) -> {
+            if (msg instanceof TxStateCoordinatorRequest) {
+                TxStateCoordinatorRequest req = (TxStateCoordinatorRequest) 
msg;
+
+                processTxStateRequest(req)
+                        .thenAccept(txStateMeta -> {
+                            NetworkMessage response = FACTORY.txStateResponse()
+                                    .txStateMeta(txStateMeta)
+                                    .timestampLong(clock.nowLong())
+                                    .build();
+
+                            messagingService.respond(sender, response, 
correlationId);
+                        });
+            }
+        });
     }
 
     /**
-     * Sends a transaction state request to the primary replica.
+     * Resolves transaction state locally, if possible, or distributively, if 
needed.
      *
-     * @param replicaGrp Replication group id.
-     * @param request Status request.
-     * @return Result future.
+     * @param txId Transaction id.
+     * @param commitGrpId Commit partition group id.
+     * @param timestamp Timestamp.
+     * @return Future with the transaction state meta as a result.
      */
-    public CompletableFuture<TxMeta> sendMetaRequest(ReplicationGroupId 
replicaGrp, TxStateReplicaRequest request) {
-        CompletableFuture<TxMeta> resFut = new CompletableFuture<>();
+    public CompletableFuture<TransactionMeta> resolveTxState(
+            UUID txId,
+            ReplicationGroupId commitGrpId,
+            HybridTimestamp timestamp
+    ) {
+        TxStateMeta localMeta = txManager.stateMeta(txId);
+
+        if (localMeta != null && isFinalState(localMeta.txState())) {
+            return completedFuture(localMeta);
+        }
 
-        sendAndRetry(resFut, replicaGrp, request);
+        CompletableFuture<TransactionMeta> future = 
txStateFutures.compute(txId, (k, v) -> {
+            if (v == null) {
+                v = new CompletableFuture<>();
 
-        return resFut;
+                resolveDistributiveTxState(txId, localMeta, commitGrpId, 
timestamp, v);
+            }
+
+            return v;
+        });
+
+        future.whenComplete((v, e) -> txStateFutures.remove(txId));
+
+        return future;
+    }
+
+    /**
+     * Resolve the transaction state distributively. This method doesn't 
process final tx states.
+     *
+     * @param txId Transaction id.
+     * @param localMeta Local tx meta.
+     * @param commitGrpId Commit partition group id.
+     * @param timestamp Timestamp to pass to target node.
+     * @param txMetaFuture Tx meta future to complete with the result.
+     */
+    private void resolveDistributiveTxState(
+            UUID txId,
+            @Nullable TxStateMeta localMeta,
+            ReplicationGroupId commitGrpId,
+            HybridTimestamp timestamp,
+            CompletableFuture<TransactionMeta> txMetaFuture
+    ) {
+        assert localMeta == null || !isFinalState(localMeta.txState()) : 
"Unexpected tx meta [txId" + txId + ", meta=" + localMeta + ']';
+
+        HybridTimestamp timestamp0 = timestamp == null ? 
HybridTimestamp.MIN_VALUE : timestamp;
+
+        if (localMeta == null) {
+            // Fallback to commit partition path, because we don't have 
coordinator id.
+            resolveTxStateFromCommitPartition(txId, commitGrpId, timestamp0, 
txMetaFuture);
+        } else if (localMeta.txState() == PENDING) {
+            resolveTxStateFromTxCoordinator(txId, localMeta.txCoordinatorId(), 
commitGrpId, timestamp0, txMetaFuture);
+        } else if (localMeta.txState() == FINISHING) {
+            assert localMeta instanceof TxStateMetaFinishing;
+
+            ((TxStateMetaFinishing) 
localMeta).txFinishFuture().whenComplete((v, e) -> {
+                if (e == null) {
+                    txMetaFuture.complete(v);
+                } else {
+                    txMetaFuture.completeExceptionally(e);
+                }
+            });
+        } else {
+            assert localMeta.txState() == ABANDONED : "Unexpected transaction 
state [txId=" + txId + ", txStateMeta=" + localMeta + ']';
+
+            txMetaFuture.complete(localMeta);
+        }
+    }
+
+    private void resolveTxStateFromTxCoordinator(
+            UUID txId,
+            String coordinatorId,
+            ReplicationGroupId commitGrpId,
+            HybridTimestamp timestamp,
+            CompletableFuture<TransactionMeta> txMetaFuture
+    ) {
+        updateLocalTxMapAfterDistributedStateResolved(txId, txMetaFuture);
+
+        ClusterNode coordinator = clusterNodeResolverById.apply(coordinatorId);
+
+        if (coordinator == null) {
+            // This means the coordinator node have either left the cluster or 
restarted.
+            resolveTxStateFromCommitPartition(txId, commitGrpId, timestamp, 
txMetaFuture);
+        } else {
+            CompletableFuture<TransactionMeta> coordinatorTxMetaFuture = new 
CompletableFuture<>();
+
+            coordinatorTxMetaFuture.whenComplete((v, e) -> {
+                assert v != null : "Unexpected result from transaction 
coordinator: unknown transaction state [txId=" + txId
+                        + ", transactionCoordinator=" + coordinator + ']';
+
+                if (e == null) {
+                    txMetaFuture.complete(v);
+                } else {
+                    txMetaFuture.completeExceptionally(e);
+                }
+            });
+
+            TxStateCoordinatorRequest request = 
FACTORY.txStateCoordinatorRequest()
+                    .readTimestampLong(timestamp.longValue())
+                    .txId(txId)
+                    .build();
+
+            sendAndRetry(coordinatorTxMetaFuture, coordinator, request);
+        }
+    }
+
+    private void resolveTxStateFromCommitPartition(
+            UUID txId,
+            ReplicationGroupId commitGrpId,
+            HybridTimestamp timestamp,
+            CompletableFuture<TransactionMeta> txMetaFuture
+    ) {
+        TxStateCommitPartitionRequest request = 
FACTORY.txStateCommitPartitionRequest()
+                .groupId(commitGrpId)
+                .readTimestampLong(timestamp.longValue())

Review Comment:
   Likely, the commit timestamp can be excluded from the commit partition state 
request.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to