vldpyatkov commented on code in PR #2697:
URL: https://github.com/apache/ignite-3/pull/2697#discussion_r1362612138
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxMeta.java:
##########
@@ -35,39 +35,65 @@ public class TxMeta implements TransactionMeta {
private final TxState txState;
/** The list of enlisted partitions. */
- private final List<TablePartitionId> enlistedPartitions;
+ private final Collection<TablePartitionId> enlistedPartitions;
/** Commit timestamp. */
@Nullable
private final HybridTimestamp commitTimestamp;
+ /** Whether the locks are released. */
+ private final boolean locksReleased;
Review Comment:
I do not think this name is semantically correct: the locks may already
have been released (for example, because the primary role moved) while
the flag is still false.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -1227,9 +1311,30 @@ private CompletableFuture<Void> finishAndCleanup(
) {
CompletableFuture<?> changeStateFuture =
finishTransaction(enlistedPartitions, txId, commit, commitTimestamp,
txCoordinatorId);
+ return cleanup(changeStateFuture, enlistedPartitions, commit,
commitTimestamp, txId, ATTEMPTS_TO_CLEANUP_REPLICA)
+ .thenRun(() -> markLocksReleased(
+ txId,
+ enlistedPartitions,
+ commit ? COMMITED : ABORTED,
+ commitTimestamp)
+ );
+ }
+
+ private CompletableFuture<Void> cleanup(UUID txId, TxMeta txMeta) {
+ return cleanup(completedFuture(null), txMeta.enlistedPartitions(),
txMeta.txState() == COMMITED, txMeta.commitTimestamp(), txId, 1);
Review Comment:
Why is this cleanup limited by a number of attempts, while the durable
cleanup is not? Could we use the durable version here?
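To make the distinction in this question concrete, here is a minimal sketch of the two retry policies. It is not the Ignite implementation: the class `CleanupRetrySketch` and the methods `cleanupWithAttempts`, `cleanupUntilSuccess` and `flaky` are hypothetical names, and the real cleanup would presumably add delays between retries and run asynchronously.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical illustration of bounded vs. unbounded ("durable") retries; not Ignite code. */
public class CleanupRetrySketch {
    /** Runs the action at most {@code attempts} times, then fails with the last error. */
    public static CompletableFuture<Void> cleanupWithAttempts(
            Supplier<CompletableFuture<Void>> action, int attempts) {
        return action.get().handle((v, err) -> {
            if (err == null) {
                return CompletableFuture.<Void>completedFuture(null);
            }
            if (attempts <= 1) {
                // Attempts exhausted: the failure is propagated to the caller.
                return CompletableFuture.<Void>failedFuture(err);
            }
            return cleanupWithAttempts(action, attempts - 1);
        }).thenCompose(f -> f);
    }

    /** Retries until the action succeeds; a real version would back off between retries. */
    public static CompletableFuture<Void> cleanupUntilSuccess(
            Supplier<CompletableFuture<Void>> action) {
        return action.get().handle((v, err) -> {
            if (err == null) {
                return CompletableFuture.<Void>completedFuture(null);
            }
            return cleanupUntilSuccess(action);
        }).thenCompose(f -> f);
    }

    /** Test helper: an action that fails the first {@code failures} calls, then succeeds. */
    public static Supplier<CompletableFuture<Void>> flaky(AtomicInteger calls, int failures) {
        return () -> {
            if (calls.incrementAndGet() <= failures) {
                return CompletableFuture.<Void>failedFuture(
                        new IllegalStateException("replica unavailable"));
            }
            return CompletableFuture.<Void>completedFuture(null);
        };
    }

    public static void main(String[] args) {
        AtomicInteger boundedCalls = new AtomicInteger();
        CompletableFuture<Void> bounded = cleanupWithAttempts(flaky(boundedCalls, 2), 1);
        System.out.println("bounded failed: " + bounded.isCompletedExceptionally());

        AtomicInteger durableCalls = new AtomicInteger();
        CompletableFuture<Void> durable = cleanupUntilSuccess(flaky(durableCalls, 2));
        System.out.println("durable failed: " + durable.isCompletedExceptionally()
                + ", calls: " + durableCalls.get());
    }
}
```

With a single attempt, as in the `cleanup(UUID txId, TxMeta txMeta)` overload above, one transient failure leaves the cleanup unfinished. The unbounded variant keeps retrying until it succeeds, which is the behaviour the question suggests reusing.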
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -320,35 +324,115 @@ public PartitionReplicaListener(
schemaCompatValidator = new SchemaCompatValidator(schemas,
catalogService);
- placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED,
(evt, e) -> {
- if (!localNode.name().equals(evt.leaseholder())) {
- return completedFuture(false);
- }
+ placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED,
this::onPrimaryElected);
+ placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED,
this::onPrimaryExpired);
+ }
+
+ private CompletableFuture<Boolean>
onPrimaryElected(PrimaryReplicaEventParameters evt, @Nullable Throwable
exception) {
+ if (!localNode.name().equals(evt.leaseholder())) {
+ return completedFuture(false);
+ }
+
+ List<CompletableFuture<?>> cleanupFutures = new ArrayList<>();
+
+ Cursor<IgniteBiTuple<UUID, TxMeta>> txs;
+
+ try {
+ txs = txStateStorage.scan();
+ } catch (IgniteInternalException e) {
+ return completedFuture(false);
+ }
- LOG.info("Primary replica expired [grp={}]", replicationGroupId);
+ for (IgniteBiTuple<UUID, TxMeta> tx : txs) {
+ UUID txId = tx.getKey();
+ TxMeta txMeta = tx.getValue();
- ArrayList<CompletableFuture<?>> futs = new ArrayList<>();
+ assert !txMeta.enlistedPartitions().isEmpty();
- for (UUID txId : txCleanupReadyFutures.keySet()) {
- txCleanupReadyFutures.compute(txId, (id, txOps) -> {
- if (txOps == null || isFinalState(txOps.state)) {
- return null;
+ if (isFinalState(txMeta.txState()) && !txMeta.locksReleased()) {
+ CompletableFuture<?> cleanupFuture =
txManager.executeCleanupAsync(() -> durableCleanup(txId, txMeta));
+
+ cleanupFutures.add(cleanupFuture);
+ }
+ }
+
+ allOf(cleanupFutures.toArray(new CompletableFuture<?>[0]))
+ .whenComplete((v, e) -> {
+ if (e != null) {
+ LOG.error("Failure occurred while triggering cleanup
on commit partition primary replica election "
+ + "[commitPartition=" + replicationGroupId +
']', e);
Review Comment:
LOG.warn("Failed to execute cleanup on commit partition primary replica
switch [txId={}"
+ ", commitPartition={}]", e, txId,
replicationGroupId);