denis-chudov commented on code in PR #7999:
URL: https://github.com/apache/ignite-3/pull/7999#discussion_r3130093666
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/DefaultTablePartitionReplicaProcessor.java:
##########
@@ -581,6 +597,80 @@ private void replicaTouch(UUID txId, UUID coordinatorId, ZonePartitionId commitP
                 .build());
     }
+    private <T> CompletableFuture<T> resolvePendingTransactions(
+            HybridTimestamp requestTime,
+            HybridTimestamp currentSafeTime,
+            Function<HybridTimestamp, CompletableFuture<T>> action
+    ) {
+        // Wait for currentSafeTime >= requestTime to avoid out-of-order transactions arrival.
+        if (currentSafeTime.compareTo(requestTime) < 0) {
+            return safeTime.waitFor(requestTime)
+                    .thenComposeAsync(
+                            unused -> resolvePendingTransactions(requestTime, safeTime.current(), action),
+                            partitionOperationsExecutor);
+        }
+
+        assert currentSafeTime.compareTo(requestTime) >= 0 : "currentSafeTime < lowerBoundTimestamp";
+        assert currentSafeTime.compareTo(safeTime.current()) <= 0 : "currentSafeTime > safeTime";
+
+        // Stable committed snapshot is ensured after resolving pending transactions state.
+        UUID upperBoundTxId = TransactionIds.transactionId(currentSafeTime, Integer.MAX_VALUE, TxPriority.NORMAL);
+        ConcurrentNavigableMap<UUID, PendingTxContext> txToWait = pendingTransactions.headMap(upperBoundTxId, true);
+
+        if (!txToWait.isEmpty()) {
+            List<CompletableFuture<?>> futs = null;
+
+            for (Map.Entry<UUID, PendingTxContext> entry : txToWait.entrySet()) {
+                if (!entry.getValue().cleanupFut.isDone()) {
+                    futs = futs == null ? new ArrayList<>() : futs;
+                    futs.add(resolveTransactionState(entry.getKey(), entry.getValue(), currentSafeTime));
+                }
+            }
+
+            if (futs != null) {
+                return allOf(futs.toArray(CompletableFuture[]::new))
+                        .thenComposeAsync(unused -> action.apply(currentSafeTime), partitionOperationsExecutor);
+            }
+        }
+
+        return action.apply(currentSafeTime);
+    }
+
+    private CompletableFuture<Void> resolveTransactionState(UUID txId, PendingTxContext txCtx, HybridTimestamp observableTimestamp) {
+        CompletableFuture<Void> resFut = new CompletableFuture<>();
+
+        txCtx.cleanupFut.whenComplete(copyStateTo(resFut));
+
+        return resFut.orTimeout(WAIT_FOR_CLEANUP_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS).handle((ignored, e) -> {
+            if (e == null) {
+                return CompletableFutures.<Void>nullCompletedFuture();
+            }
+
+            if (e instanceof TimeoutException) {
+                // Transaction was not cleaned up in time.
+                // Send tx state request to coordinator to bump commit timestamp for active txn beyond safe timestamp.
+                return transactionStateResolver.resolveTxState(txStateResolutionParameters()
+                        .txId(txId)
+                        .commitGroupId(txCtx.commitPartId)
+                        .readTimestamp(observableTimestamp)
+                        .build())
Review Comment:
We discussed this privately. There is a risk of waiting on cleanupFut
indefinitely, but that should be addressed in separate tickets that have
already been created:
https://issues.apache.org/jira/browse/IGNITE-27610
https://issues.apache.org/jira/browse/IGNITE-27904
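
For readers following along, here is a minimal standalone sketch of the
bounded-wait pattern the hunk above uses: copy the cleanup future's state into
a fresh future, put a timeout on the copy, and switch to a fallback action if
the timeout fires. The class name, method name, and `fallback` parameter are
hypothetical and exist only for illustration; this is not the Ignite API and
not the PR's code, and it does not address the risk tracked in the tickets
above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Hypothetical illustration only; names are not taken from Ignite. */
public class BoundedCleanupWait {
    /**
     * Waits for cleanupFut for at most timeoutMillis, then falls back.
     * The timeout is applied to a copy so that cleanupFut itself is never
     * completed exceptionally by the timer.
     */
    static <T> CompletableFuture<T> awaitOrFallback(
            CompletableFuture<T> cleanupFut,
            long timeoutMillis,
            Supplier<CompletableFuture<T>> fallback
    ) {
        CompletableFuture<T> copy = new CompletableFuture<>();

        // Mirror the outcome of cleanupFut into the copy.
        cleanupFut.whenComplete((v, e) -> {
            if (e == null) {
                copy.complete(v);
            } else {
                copy.completeExceptionally(e);
            }
        });

        return copy.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                .<CompletableFuture<T>>handle((v, e) -> {
                    if (e == null) {
                        return CompletableFuture.completedFuture(v);
                    }
                    if (e instanceof TimeoutException) {
                        // Cleanup did not finish in time: run the fallback instead.
                        return fallback.get();
                    }
                    // Any other failure is propagated unchanged.
                    return CompletableFuture.failedFuture(e);
                })
                .thenCompose(f -> f);
    }
}
```

Note that the timeout only bounds the wait on the copy. If the fallback's own
future never completes, the caller still waits indefinitely.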