vldpyatkov commented on code in PR #2697:
URL: https://github.com/apache/ignite-3/pull/2697#discussion_r1363558930


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -320,35 +324,115 @@ public PartitionReplicaListener(
 
         schemaCompatValidator = new SchemaCompatValidator(schemas, 
catalogService);
 
-        placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, 
(evt, e) -> {
-            if (!localNode.name().equals(evt.leaseholder())) {
-                return completedFuture(false);
-            }
+        placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, 
this::onPrimaryElected);
+        placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, 
this::onPrimaryExpired);
+    }
+
+    private CompletableFuture<Boolean> 
onPrimaryElected(PrimaryReplicaEventParameters evt, @Nullable Throwable 
exception) {
+        if (!localNode.name().equals(evt.leaseholder())) {
+            return completedFuture(false);
+        }
+
+        List<CompletableFuture<?>> cleanupFutures = new ArrayList<>();
+
+        Cursor<IgniteBiTuple<UUID, TxMeta>> txs;
+
+        try {
+            txs = txStateStorage.scan();
+        } catch (IgniteInternalException e) {
+            return completedFuture(false);
+        }
 
-            LOG.info("Primary replica expired [grp={}]", replicationGroupId);
+        for (IgniteBiTuple<UUID, TxMeta> tx : txs) {
+            UUID txId = tx.getKey();
+            TxMeta txMeta = tx.getValue();
 
-            ArrayList<CompletableFuture<?>> futs = new ArrayList<>();
+            assert !txMeta.enlistedPartitions().isEmpty();
 
-            for (UUID txId : txCleanupReadyFutures.keySet()) {
-                txCleanupReadyFutures.compute(txId, (id, txOps) -> {
-                    if (txOps == null || isFinalState(txOps.state)) {
-                        return null;
+            if (isFinalState(txMeta.txState()) && !txMeta.locksReleased()) {
+                CompletableFuture<?> cleanupFuture = 
txManager.executeCleanupAsync(() -> durableCleanup(txId, txMeta));
+
+                cleanupFutures.add(cleanupFuture);
+            }
+        }
+
+        allOf(cleanupFutures.toArray(new CompletableFuture<?>[0]))
+                .whenComplete((v, e) -> {
+                    if (e != null) {
+                        LOG.error("Failure occurred while triggering cleanup 
on commit partition primary replica election "
+                                + "[commitPartition=" + replicationGroupId + 
']', e);
                     }
+                });
 
-                    if (!txOps.futures.isEmpty()) {
-                        CompletableFuture<?>[] txFuts = 
txOps.futures.values().stream()
-                                .flatMap(Collection::stream)
-                                .toArray(CompletableFuture[]::new);
+        // The future returned by this event handler can't wait for all 
cleanups because it's not necessary and it can block
+        // meta storage notification thread for a while, preventing it from 
delivering further updates (including leases) and therefore
+        // causing deadlock on primary replica waiting.
+        return completedFuture(false);
+    }
+
+    private CompletableFuture<?> durableCleanup(UUID txId, TxMeta txMeta) {
+        return cleanup(txId, txMeta)
+                .handle((v, e) -> {
+                    if (e == null) {
+                        return txManager.executeCleanupAsync(() -> 
markLocksReleased(
+                                txId,
+                                txMeta.enlistedPartitions(),
+                                txMeta.txState(),
+                                txMeta.commitTimestamp())
+                        );
+                    } else {
+                        LOG.warn("Failed to execute cleanup on commit 
partition primary replica switch [txId="
+                                + txId + ", commitPartition=" + 
replicationGroupId + ']', e);

Review Comment:
   Please use parameterized logging here instead of string concatenation:

   LOG.warn("Failed to execute cleanup on commit partition primary replica switch [txId={}"
                                   + ", commitPartition={}]", e, txId, replicationGroupId);



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -320,35 +324,115 @@ public PartitionReplicaListener(
 
         schemaCompatValidator = new SchemaCompatValidator(schemas, 
catalogService);
 
-        placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, 
(evt, e) -> {
-            if (!localNode.name().equals(evt.leaseholder())) {
-                return completedFuture(false);
-            }
+        placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, 
this::onPrimaryElected);
+        placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, 
this::onPrimaryExpired);
+    }
+
+    private CompletableFuture<Boolean> 
onPrimaryElected(PrimaryReplicaEventParameters evt, @Nullable Throwable 
exception) {
+        if (!localNode.name().equals(evt.leaseholder())) {
+            return completedFuture(false);
+        }
+
+        List<CompletableFuture<?>> cleanupFutures = new ArrayList<>();
+
+        Cursor<IgniteBiTuple<UUID, TxMeta>> txs;
+
+        try {
+            txs = txStateStorage.scan();
+        } catch (IgniteInternalException e) {
+            return completedFuture(false);
+        }
 
-            LOG.info("Primary replica expired [grp={}]", replicationGroupId);
+        for (IgniteBiTuple<UUID, TxMeta> tx : txs) {
+            UUID txId = tx.getKey();
+            TxMeta txMeta = tx.getValue();
 
-            ArrayList<CompletableFuture<?>> futs = new ArrayList<>();
+            assert !txMeta.enlistedPartitions().isEmpty();
 
-            for (UUID txId : txCleanupReadyFutures.keySet()) {
-                txCleanupReadyFutures.compute(txId, (id, txOps) -> {
-                    if (txOps == null || isFinalState(txOps.state)) {
-                        return null;
+            if (isFinalState(txMeta.txState()) && !txMeta.locksReleased()) {
+                CompletableFuture<?> cleanupFuture = 
txManager.executeCleanupAsync(() -> durableCleanup(txId, txMeta));
+
+                cleanupFutures.add(cleanupFuture);
+            }
+        }
+
+        allOf(cleanupFutures.toArray(new CompletableFuture<?>[0]))
+                .whenComplete((v, e) -> {
+                    if (e != null) {
+                        LOG.error("Failure occurred while triggering cleanup 
on commit partition primary replica election "
+                                + "[commitPartition=" + replicationGroupId + 
']', e);

Review Comment:
   Same here, please use parameterized logging instead of string concatenation:

   LOG.error("Failure occurred while triggering cleanup on commit partition primary replica election "
                                   + "[commitPartition={}]", e, replicationGroupId);



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to