vldpyatkov commented on code in PR #2585:
URL: https://github.com/apache/ignite-3/pull/2585#discussion_r1326501046


##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxStateReplicaRequest.java:
##########
@@ -27,7 +27,7 @@
 /**
  * Transaction state request.
  */
-@Transferable(TxMessageGroup.TX_STATE_REQUEST)
+@Transferable(TxMessageGroup.TX_STATE_REPLICA_REQUEST)
 public interface TxStateReplicaRequest extends ReplicaRequest {

Review Comment:
   Would it be better to give this a more specific name, e.g. `TxStateCommitPartitionRequest`?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java:
##########
@@ -115,4 +308,64 @@ private void sendAndRetry(CompletableFuture<TxMeta> 
resFut, ReplicationGroupId r
             }
         });
     }
+
+    /**
+     * Tries to send a request to the given node.
+     *
+     * @param resFut Response future.
+     * @param node Node to send to.
+     * @param request Request.
+     */
+    private void sendAndRetry(CompletableFuture<TransactionMeta> resFut, 
ClusterNode node, TxStateRequest request) {
+        messagingService.invoke(node, request, RPC_TIMEOUT).thenAccept(resp -> 
{
+            assert resp instanceof TxStateResponse : "Unsupported response 
type [type=" + resp.getClass().getSimpleName() + ']';
+
+            TxStateResponse response = (TxStateResponse) resp;
+
+            resFut.complete(response.txStateMeta());
+        });
+    }
+
+    /**
+     * Processes the transaction state requests that are used for coordinator 
path based write intent resolution. Can't return
+     * {@link org.apache.ignite.internal.tx.TxState#FINISHING}, it waits for 
actual completion instead.
+     *
+     * @param request Request.
+     * @return Future that should be completed with transaction state meta.
+     */
+    private CompletableFuture<TransactionMeta> 
processTxStateRequest(TxStateRequest request) {
+        clock.update(request.readTimestamp());
+
+        UUID txId = request.txId();
+
+        TxStateMeta txStateMeta = txManager.stateMeta(txId);
+
+        if (txStateMeta != null && txStateMeta.txState() == FINISHING) {
+            assert txStateMeta instanceof TxStateMetaFinishing;
+
+            TxStateMetaFinishing txStateMetaFinishing = (TxStateMetaFinishing) 
txStateMeta;
+
+            AtomicReference<CompletableFuture<TransactionMeta>> futRef = new 
AtomicReference<>();
+
+            txStateFutures.computeIfAbsent(txId, k -> {
+                TxStateMeta meta = txManager.stateMeta(txId);
+
+                if (meta != null && meta.txState() != FINISHING) {
+                    futRef.set(completedFuture(meta));
+
+                    return null;
+                }
+
+                futRef.set(txStateMetaFinishing.future());
+
+                return futRef.get();
+            });
+
+            futRef.get().whenComplete((v, e) -> txStateFutures.remove(txId));
+
+            return futRef.get();

Review Comment:
   Why don't you just return `txStateMetaFinishing.future()`?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java:
##########
@@ -44,29 +72,194 @@ public class TransactionStateResolver {
     /** Function that resolves a node consistent ID to a cluster node. */
     private final Function<String, ClusterNode> clusterNodeResolver;
 
+    // TODO https://issues.apache.org/jira/browse/IGNITE-20408 after this 
ticket this resolver will be no longer needed, as
+    // TODO we will store coordinator as ClusterNode in local tx state map.
+    /** Function that resolves a node non-consistent ID to a cluster node. */
+    private final Function<String, ClusterNode> clusterNodeResolverById;
+
+    private final Map<UUID, CompletableFuture<TransactionMeta>> txStateFutures 
= new ConcurrentHashMap<>();
+
+    private final Lazy<String> localNodeId;
+
+    private final TxManager txManager;
+
+    private final HybridClock clock;
+
+    private final MessagingService messagingService;
+
     /**
      * The constructor.
      *
      * @param replicaService Replication service.
+     * @param txManager Transaction manager.
+     * @param clock Node clock.
+     * @param clusterNodeResolver Cluster node resolver.
+     * @param localNodeIdSupplier Local node id supplier.
      */
-    public TransactionStateResolver(ReplicaService replicaService, 
Function<String, ClusterNode> clusterNodeResolver) {
+    public TransactionStateResolver(
+            ReplicaService replicaService,
+            TxManager txManager,
+            HybridClock clock,
+            Function<String, ClusterNode> clusterNodeResolver,
+            Function<String, ClusterNode> clusterNodeResolverById,
+            Supplier<String> localNodeIdSupplier,
+            MessagingService messagingService
+    ) {
         this.replicaService = replicaService;
+        this.txManager = txManager;
+        this.clock = clock;
         this.clusterNodeResolver = clusterNodeResolver;
+        this.clusterNodeResolverById = clusterNodeResolverById;
+        this.localNodeId = new Lazy<>(localNodeIdSupplier);
+        this.messagingService = messagingService;
     }
 
     /**
-     * Sends a transaction state request to the primary replica.
+     * This should be called in order to allow the transaction state resolver 
to listen to {@link TxStateRequest} messages.
+     */
+    public void start() {
+        messagingService.addMessageHandler(TxMessageGroup.class, (msg, sender, 
correlationId) -> {
+            if (msg instanceof TxStateRequest) {
+                TxStateRequest req = (TxStateRequest) msg;
+
+                processTxStateRequest(req)
+                        .thenAccept(txStateMeta -> {
+                            NetworkMessage response = FACTORY.txStateResponse()
+                                    .txStateMeta(txStateMeta)
+                                    .timestampLong(clock.nowLong())
+                                    .build();
+
+                            messagingService.respond(sender, response, 
correlationId);
+                        });
+            }
+        });
+    }
+
+    /**
+     * Resolves transaction state locally, if possible, or distributively, if 
needed.
      *
-     * @param replicaGrp Replication group id.
-     * @param request Status request.
-     * @return Result future.
+     * @param txId Transaction id.
+     * @param commitGrpId Commit partition group id.
+     * @param timestamp Timestamp.
+     * @return Future with the transaction state meta as a result.
      */
-    public CompletableFuture<TxMeta> sendMetaRequest(ReplicationGroupId 
replicaGrp, TxStateReplicaRequest request) {
-        CompletableFuture<TxMeta> resFut = new CompletableFuture<>();
+    public CompletableFuture<TransactionMeta> resolveTxState(
+            UUID txId,
+            ReplicationGroupId commitGrpId,
+            HybridTimestamp timestamp
+    ) {
+        TxStateMeta localMeta = txManager.stateMeta(txId);
+
+        if (localMeta != null) {
+            if (isFinalState(localMeta.txState())) {
+                return completedFuture(localMeta);
+            }
+
+            // If the local node is a tx coordinator:
+            // if tx state is FINISHING, we will have to wait for the actual 
finish, otherwise we can return the current state.
+            if (localNodeId.get().equals(localMeta.txCoordinatorId()) && 
localMeta.txState() != FINISHING) {

Review Comment:
   It is not worth handling this separately from the common case, because if we have to send a 
message to the local node, the message bypasses the network layer.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java:
##########
@@ -115,4 +308,64 @@ private void sendAndRetry(CompletableFuture<TxMeta> 
resFut, ReplicationGroupId r
             }
         });
     }
+
+    /**
+     * Tries to send a request to the given node.
+     *
+     * @param resFut Response future.
+     * @param node Node to send to.
+     * @param request Request.
+     */
+    private void sendAndRetry(CompletableFuture<TransactionMeta> resFut, 
ClusterNode node, TxStateRequest request) {
+        messagingService.invoke(node, request, RPC_TIMEOUT).thenAccept(resp -> 
{
+            assert resp instanceof TxStateResponse : "Unsupported response 
type [type=" + resp.getClass().getSimpleName() + ']';
+
+            TxStateResponse response = (TxStateResponse) resp;
+
+            resFut.complete(response.txStateMeta());
+        });
+    }
+
+    /**
+     * Processes the transaction state requests that are used for coordinator 
path based write intent resolution. Can't return
+     * {@link org.apache.ignite.internal.tx.TxState#FINISHING}, it waits for 
actual completion instead.
+     *
+     * @param request Request.
+     * @return Future that should be completed with transaction state meta.
+     */
+    private CompletableFuture<TransactionMeta> 
processTxStateRequest(TxStateRequest request) {
+        clock.update(request.readTimestamp());

Review Comment:
   You should sync it with the chosen commit timestamp, because the timestamp can be 
generated before the clock is updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to