denis-chudov commented on code in PR #7999:
URL: https://github.com/apache/ignite-3/pull/7999#discussion_r3130093666


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/DefaultTablePartitionReplicaProcessor.java:
##########
@@ -581,6 +597,80 @@ private void replicaTouch(UUID txId, UUID coordinatorId, 
ZonePartitionId commitP
                 .build());
     }
 
+    private <T> CompletableFuture<T> resolvePendingTransactions(
+            HybridTimestamp requestTime,
+            HybridTimestamp currentSafeTime,
+            Function<HybridTimestamp, CompletableFuture<T>> action
+    ) {
+        // Wait for currentSafeTime >= requestTime to avoid out-of-order 
transactions arrival.
+        if (currentSafeTime.compareTo(requestTime) < 0) {
+            return safeTime.waitFor(requestTime)
+                    .thenComposeAsync(
+                            unused -> resolvePendingTransactions(requestTime, 
safeTime.current(), action),
+                            partitionOperationsExecutor);
+        }
+
+        assert currentSafeTime.compareTo(requestTime) >= 0 : "currentSafeTime 
< lowerBoundTimestamp";
+        assert currentSafeTime.compareTo(safeTime.current()) <= 0 : 
"currentSafeTime > safeTime";
+
+        // Stable committed snapshot is ensured after resolving pending 
transactions state.
+        UUID upperBoundTxId = TransactionIds.transactionId(currentSafeTime, 
Integer.MAX_VALUE, TxPriority.NORMAL);
+        ConcurrentNavigableMap<UUID, PendingTxContext> txToWait = 
pendingTransactions.headMap(upperBoundTxId, true);
+
+        if (!txToWait.isEmpty()) {
+            List<CompletableFuture<?>> futs = null;
+
+            for (Map.Entry<UUID, PendingTxContext> entry : 
txToWait.entrySet()) {
+                if (!entry.getValue().cleanupFut.isDone()) {
+                    futs = futs == null ? new ArrayList<>() : futs;
+                    futs.add(resolveTransactionState(entry.getKey(), 
entry.getValue(), currentSafeTime));
+                }
+            }
+
+            if (futs != null) {
+                return allOf(futs.toArray(CompletableFuture[]::new))
+                        .thenComposeAsync(unused -> 
action.apply(currentSafeTime), partitionOperationsExecutor);
+            }
+        }
+
+        return action.apply(currentSafeTime);
+    }
+
+    private CompletableFuture<Void> resolveTransactionState(UUID txId, 
PendingTxContext txCtx, HybridTimestamp observableTimestamp) {
+        CompletableFuture<Void> resFut = new CompletableFuture<>();
+
+        txCtx.cleanupFut.whenComplete(copyStateTo(resFut));
+
+        return resFut.orTimeout(WAIT_FOR_CLEANUP_TIMEOUT_MILLIS, 
TimeUnit.MILLISECONDS).handle((ignored, e) -> {
+            if (e == null) {
+                return CompletableFutures.<Void>nullCompletedFuture();
+            }
+
+            if (e instanceof TimeoutException) {
+                // Transaction was not cleaned up in time.
+                // Send tx state request to coordinator to bump commit 
timestamp for active txn beyond safe timestamp.
+                return 
transactionStateResolver.resolveTxState(txStateResolutionParameters()
+                                .txId(txId)
+                                .commitGroupId(txCtx.commitPartId)
+                                .readTimestamp(observableTimestamp)
+                                .build())

Review Comment:
   We discussed it privately. There is a risk in case of endless waiting on 
cleanupFut but it should be resolved in separate tickets that were already 
created:
   https://issues.apache.org/jira/browse/IGNITE-27610
   https://issues.apache.org/jira/browse/IGNITE-27904



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to