ptupitsyn commented on code in PR #7588:
URL: https://github.com/apache/ignite-3/pull/7588#discussion_r2846115408


##########
modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java:
##########
@@ -206,6 +225,79 @@ public void commit() throws TransactionException {
         sync(commitAsync());
     }
 
+    /**
+     * Discards the directly mapped transaction fragments in case of coordinator side transaction invalidation
+     * (either kill or implicit rollback due to mapping failure, see postEnlist).
+     *
+     * @param killed Killed flag.
+     *
+     * @return The future.
+     */
+    public CompletableFuture<Void> discardDirectMappings(boolean killed) {
+        enlistPartitionLock.writeLock().lock();
+
+        try {
+            if (!finishFut.compareAndSet(null, new CompletableFuture<>())) {
+                return finishFut.get();
+            }
+        } finally {
+            enlistPartitionLock.writeLock().unlock();
+        }
+
+        return sendDiscardRequests().handle((r, e) -> {
+            setState(killed ? STATE_KILLED : STATE_ROLLED_BACK);
+            ch.inflights().erase(txId());
+            this.finishFut.get().complete(null);
+            return null;
+        });
+    }
+
+    private CompletableFuture<Void> sendDiscardRequests() {
+        assert finishFut != null;
+
+        if (!ch.protocolContext().isFeatureSupported(TX_DIRECT_MAPPING_SEND_DISCARD)) {
+            return nullCompletedFuture();
+        }
+
+        Map<String, List<TablePartitionId>> enlistments = new HashMap<>();
+
+        for (Entry<TablePartitionId, CompletableFuture<IgniteBiTuple<String, Long>>> entry : enlisted.entrySet()) {
+            IgniteBiTuple<String, Long> info = entry.getValue().getNow(null);
+
+            if (info == null) {
+                continue; // Ignore incomplete enlistments.
+            }
+
+            enlistments.computeIfAbsent(info.get1(), k -> new ArrayList<>()).add(entry.getKey());
+        }
+
+        List<CompletableFuture<Void>> futures = new ArrayList<>(enlistments.size());
+
+        for (Entry<String, List<TablePartitionId>> entry : enlistments.entrySet()) {
+            CompletableFuture<Void> discardFut = reliableChannel.getNodeChannelAsync(entry.getKey()).thenCompose(ch -> {

Review Comment:
   We should not establish new connections to discard enlistments. There are two ways to approach the problem:
   * Enlistment is tied to a client connection => track it on the server in `ClientResourceRegistry` and discard on disconnect.
   * Enlistment is not tied to a given connection => send **one request to any node** with the list of things to clean up, then the server re-routes as necessary.
   
   We should never assume that the client can connect to a certain node.
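
   To illustrate the second option, here is a minimal sketch, not the actual client code. `DiscardSketch`, `DiscardRequest`, and the `sendOnAnyChannel` function are hypothetical names, and partitions are represented as plain strings instead of `TablePartitionId`. It collects all completed enlistments into a single request and hands it to whatever connection is already available, leaving re-routing to the server:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch: one discard request for the whole transaction,
// sent over any existing connection. No per-node channels are opened.
final class DiscardSketch {
    /** Hypothetical request payload: the transaction and every partition to clean up. */
    static final class DiscardRequest {
        final UUID txId;
        final List<String> partitions;

        DiscardRequest(UUID txId, List<String> partitions) {
            this.txId = txId;
            this.partitions = List.copyOf(partitions);
        }
    }

    /** Collects only the enlistments that completed successfully; pending or failed ones are skipped. */
    static DiscardRequest buildRequest(UUID txId, Map<String, CompletableFuture<String>> enlisted) {
        List<String> completed = new ArrayList<>();

        for (Map.Entry<String, CompletableFuture<String>> e : enlisted.entrySet()) {
            CompletableFuture<String> fut = e.getValue();

            if (fut.isDone() && !fut.isCompletedExceptionally()) {
                completed.add(e.getKey());
            }
        }

        return new DiscardRequest(txId, completed);
    }

    /**
     * Sends at most one request. The sendOnAnyChannel function stands in for
     * "use any connection the client already has"; the server is assumed to
     * forward the cleanup to the nodes that own the partitions.
     */
    static CompletableFuture<Void> discard(
            UUID txId,
            Map<String, CompletableFuture<String>> enlisted,
            Function<DiscardRequest, CompletableFuture<Void>> sendOnAnyChannel) {
        DiscardRequest req = buildRequest(txId, enlisted);

        if (req.partitions.isEmpty()) {
            return CompletableFuture.completedFuture(null); // Nothing to discard.
        }

        return sendOnAnyChannel.apply(req);
    }
}
```

   With this shape the client never needs to resolve a node name to a channel, so a node that is unreachable from the client does not block the cleanup.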



##########
modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java:
##########
@@ -75,7 +89,7 @@ public class ClientTransaction implements Transaction {
 
     /** The future used on repeated commit/rollback. */
     @IgniteToStringExclude
-    private final AtomicReference<CompletableFuture<Void>> finishFut = new AtomicReference<>();
+    private final AtomicReference<CompletableFuture<Void>> finishFut = new AtomicReference<>(); // TODO use updater

Review Comment:
   This TODO has no ticket. Please create one and reference it in the comment, or remove the TODO.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.