vldpyatkov commented on code in PR #2697:
URL: https://github.com/apache/ignite-3/pull/2697#discussion_r1362612138


##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java:
##########
@@ -35,39 +35,65 @@ public class TxMeta implements TransactionMeta {
     private final TxState txState;
 
     /** The list of enlisted partitions. */
-    private final List<TablePartitionId> enlistedPartitions;
+    private final Collection<TablePartitionId> enlistedPartitions;
 
     /** Commit timestamp. */
     @Nullable
     private final HybridTimestamp commitTimestamp;
 
+    /** Whether the locks are released. */
+    private final boolean locksReleased;

Review Comment:
   I do not think this name is semantically correct: the locks may already 
have been released (for example, because the primary role moved) while the 
flag is still false.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -1227,9 +1311,30 @@ private CompletableFuture<Void> finishAndCleanup(
     ) {
         CompletableFuture<?> changeStateFuture = finishTransaction(enlistedPartitions, txId, commit, commitTimestamp, txCoordinatorId);
 
+        return cleanup(changeStateFuture, enlistedPartitions, commit, commitTimestamp, txId, ATTEMPTS_TO_CLEANUP_REPLICA)
+                .thenRun(() -> markLocksReleased(
+                        txId,
+                        enlistedPartitions,
+                        commit ? COMMITED : ABORTED,
+                        commitTimestamp)
+                );
+    }
+
+    private CompletableFuture<Void> cleanup(UUID txId, TxMeta txMeta) {
+        return cleanup(completedFuture(null), txMeta.enlistedPartitions(), txMeta.txState() == COMMITED, txMeta.commitTimestamp(), txId, 1);

Review Comment:
   Why is this cleanup limited by attempts, but the durable cleanup is not? 
Could we use the durable version here?
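
   To make the contrast concrete, here is a minimal sketch of the two retry
policies. It is not the code in this PR: `CleanupRetrySketch`, `boundedRetry`,
`durableRetry`, and `failingFirst` are hypothetical names, and the real durable
cleanup may well differ (for example, by adding backoff or stopping on node
shutdown).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical sketch; none of these names exist in Ignite. */
class CleanupRetrySketch {
    /** Runs the action up to {@code attempts} times, then fails with the last error. */
    static <T> CompletableFuture<T> boundedRetry(Supplier<CompletableFuture<T>> action, int attempts) {
        return action.get().<CompletableFuture<T>>handle((res, err) -> {
            if (err == null) {
                return CompletableFuture.completedFuture(res);
            }
            if (attempts <= 1) {
                // Attempts exhausted: the caller sees the failure and the cleanup is lost.
                return CompletableFuture.failedFuture(err);
            }
            return boundedRetry(action, attempts - 1);
        }).thenCompose(f -> f);
    }

    /** Re-runs the action until it succeeds. No backoff or stop condition in this sketch. */
    static <T> CompletableFuture<T> durableRetry(Supplier<CompletableFuture<T>> action) {
        return action.get().<CompletableFuture<T>>handle((res, err) -> {
            if (err == null) {
                return CompletableFuture.completedFuture(res);
            }
            return durableRetry(action);
        }).thenCompose(f -> f);
    }

    /** Stand-in for a cleanup call: fails the first {@code failures} times, then succeeds. */
    static Supplier<CompletableFuture<String>> failingFirst(int failures) {
        AtomicInteger calls = new AtomicInteger();
        return () -> calls.incrementAndGet() <= failures
                ? CompletableFuture.failedFuture(new IllegalStateException("replica unavailable"))
                : CompletableFuture.completedFuture("cleaned");
    }

    public static void main(String[] args) {
        // One attempt, as in cleanup(..., txId, 1): the first failure is final.
        System.out.println(boundedRetry(failingFirst(3), 1).isCompletedExceptionally());
        // Durable variant: keeps retrying past the three failures.
        System.out.println(durableRetry(failingFirst(3)).join());
    }
}
```

   With a single attempt, one transient failure leaves the transaction
uncleaned until something else triggers cleanup again, which is the concern
behind the question above.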



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -320,35 +324,115 @@ public PartitionReplicaListener(
 
         schemaCompatValidator = new SchemaCompatValidator(schemas, catalogService);
 
-        placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, (evt, e) -> {
-            if (!localNode.name().equals(evt.leaseholder())) {
-                return completedFuture(false);
-            }
+        placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, this::onPrimaryElected);
+        placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, this::onPrimaryExpired);
+    }
+
+    private CompletableFuture<Boolean> onPrimaryElected(PrimaryReplicaEventParameters evt, @Nullable Throwable exception) {
+        if (!localNode.name().equals(evt.leaseholder())) {
+            return completedFuture(false);
+        }
+
+        List<CompletableFuture<?>> cleanupFutures = new ArrayList<>();
+
+        Cursor<IgniteBiTuple<UUID, TxMeta>> txs;
+
+        try {
+            txs = txStateStorage.scan();
+        } catch (IgniteInternalException e) {
+            return completedFuture(false);
+        }
 
-            LOG.info("Primary replica expired [grp={}]", replicationGroupId);
+        for (IgniteBiTuple<UUID, TxMeta> tx : txs) {
+            UUID txId = tx.getKey();
+            TxMeta txMeta = tx.getValue();
 
-            ArrayList<CompletableFuture<?>> futs = new ArrayList<>();
+            assert !txMeta.enlistedPartitions().isEmpty();
 
-            for (UUID txId : txCleanupReadyFutures.keySet()) {
-                txCleanupReadyFutures.compute(txId, (id, txOps) -> {
-                    if (txOps == null || isFinalState(txOps.state)) {
-                        return null;
+            if (isFinalState(txMeta.txState()) && !txMeta.locksReleased()) {
+                CompletableFuture<?> cleanupFuture = txManager.executeCleanupAsync(() -> durableCleanup(txId, txMeta));
+
+                cleanupFutures.add(cleanupFuture);
+            }
+        }
+
+        allOf(cleanupFutures.toArray(new CompletableFuture<?>[0]))
+                .whenComplete((v, e) -> {
+                    if (e != null) {
+                        LOG.error("Failure occurred while triggering cleanup on commit partition primary replica election "
+                                + "[commitPartition=" + replicationGroupId + ']', e);

Review Comment:
   LOG.warn("Failed to execute cleanup on commit partition primary replica switch [txId={}"
           + ", commitPartition={}]", e, txId, replicationGroupId);



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
