denis-chudov commented on code in PR #2697:
URL: https://github.com/apache/ignite-3/pull/2697#discussion_r1363654069
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -320,35 +324,115 @@ public PartitionReplicaListener(
schemaCompatValidator = new SchemaCompatValidator(schemas,
catalogService);
- placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED,
(evt, e) -> {
- if (!localNode.name().equals(evt.leaseholder())) {
- return completedFuture(false);
- }
+ placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED,
this::onPrimaryElected);
+ placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED,
this::onPrimaryExpired);
+ }
+
+ private CompletableFuture<Boolean>
onPrimaryElected(PrimaryReplicaEventParameters evt, @Nullable Throwable
exception) {
+ if (!localNode.name().equals(evt.leaseholder())) {
+ return completedFuture(false);
+ }
+
+ List<CompletableFuture<?>> cleanupFutures = new ArrayList<>();
+
+ Cursor<IgniteBiTuple<UUID, TxMeta>> txs;
+
+ try {
+ txs = txStateStorage.scan();
+ } catch (IgniteInternalException e) {
+ return completedFuture(false);
+ }
- LOG.info("Primary replica expired [grp={}]", replicationGroupId);
+ for (IgniteBiTuple<UUID, TxMeta> tx : txs) {
+ UUID txId = tx.getKey();
+ TxMeta txMeta = tx.getValue();
- ArrayList<CompletableFuture<?>> futs = new ArrayList<>();
+ assert !txMeta.enlistedPartitions().isEmpty();
- for (UUID txId : txCleanupReadyFutures.keySet()) {
- txCleanupReadyFutures.compute(txId, (id, txOps) -> {
- if (txOps == null || isFinalState(txOps.state)) {
- return null;
+ if (isFinalState(txMeta.txState()) && !txMeta.locksReleased()) {
+ CompletableFuture<?> cleanupFuture =
txManager.executeCleanupAsync(() -> durableCleanup(txId, txMeta));
+
+ cleanupFutures.add(cleanupFuture);
+ }
+ }
+
+ allOf(cleanupFutures.toArray(new CompletableFuture<?>[0]))
+ .whenComplete((v, e) -> {
+ if (e != null) {
+ LOG.error("Failure occurred while triggering cleanup on commit partition primary replica election "
+ + "[commitPartition=" + replicationGroupId + ']', e);
Review Comment:
fixed
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -320,35 +324,115 @@ public PartitionReplicaListener(
schemaCompatValidator = new SchemaCompatValidator(schemas,
catalogService);
- placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED,
(evt, e) -> {
- if (!localNode.name().equals(evt.leaseholder())) {
- return completedFuture(false);
- }
+ placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED,
this::onPrimaryElected);
+ placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED,
this::onPrimaryExpired);
+ }
+
+ private CompletableFuture<Boolean>
onPrimaryElected(PrimaryReplicaEventParameters evt, @Nullable Throwable
exception) {
+ if (!localNode.name().equals(evt.leaseholder())) {
+ return completedFuture(false);
+ }
+
+ List<CompletableFuture<?>> cleanupFutures = new ArrayList<>();
+
+ Cursor<IgniteBiTuple<UUID, TxMeta>> txs;
+
+ try {
+ txs = txStateStorage.scan();
+ } catch (IgniteInternalException e) {
+ return completedFuture(false);
+ }
- LOG.info("Primary replica expired [grp={}]", replicationGroupId);
+ for (IgniteBiTuple<UUID, TxMeta> tx : txs) {
+ UUID txId = tx.getKey();
+ TxMeta txMeta = tx.getValue();
- ArrayList<CompletableFuture<?>> futs = new ArrayList<>();
+ assert !txMeta.enlistedPartitions().isEmpty();
- for (UUID txId : txCleanupReadyFutures.keySet()) {
- txCleanupReadyFutures.compute(txId, (id, txOps) -> {
- if (txOps == null || isFinalState(txOps.state)) {
- return null;
+ if (isFinalState(txMeta.txState()) && !txMeta.locksReleased()) {
+ CompletableFuture<?> cleanupFuture =
txManager.executeCleanupAsync(() -> durableCleanup(txId, txMeta));
+
+ cleanupFutures.add(cleanupFuture);
+ }
+ }
+
+ allOf(cleanupFutures.toArray(new CompletableFuture<?>[0]))
+ .whenComplete((v, e) -> {
+ if (e != null) {
+ LOG.error("Failure occurred while triggering cleanup on commit partition primary replica election "
+ + "[commitPartition=" + replicationGroupId + ']', e);
}
+ });
- if (!txOps.futures.isEmpty()) {
- CompletableFuture<?>[] txFuts =
txOps.futures.values().stream()
- .flatMap(Collection::stream)
- .toArray(CompletableFuture[]::new);
+ // The future returned by this event handler can't wait for all cleanups because it's not necessary and it can block
+ // meta storage notification thread for a while, preventing it from delivering further updates (including leases) and therefore
+ // causing deadlock on primary replica waiting.
+ return completedFuture(false);
+ }
+
+ private CompletableFuture<?> durableCleanup(UUID txId, TxMeta txMeta) {
+ return cleanup(txId, txMeta)
+ .handle((v, e) -> {
+ if (e == null) {
+ return txManager.executeCleanupAsync(() ->
markLocksReleased(
+ txId,
+ txMeta.enlistedPartitions(),
+ txMeta.txState(),
+ txMeta.commitTimestamp())
+ );
+ } else {
+ LOG.warn("Failed to execute cleanup on commit partition primary replica switch [txId="
+ + txId + ", commitPartition=" + replicationGroupId + ']', e);
Review Comment:
fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]