sanpwc commented on code in PR #3591:
URL: https://github.com/apache/ignite-3/pull/3591#discussion_r1562041825


##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceVacuumManager.java:
##########
@@ -50,7 +50,7 @@ public class ResourceVacuumManager implements IgniteComponent 
{
     public static final String RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY 
= "RESOURCE_VACUUM_INTERVAL_MILLISECONDS";
 
     private final int resourceVacuumIntervalMilliseconds = 
IgniteSystemProperties
-            .getInteger(RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY, 
30_000);
+            .getInteger(RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY, 1_000);

Review Comment:
   Why?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -77,14 +78,15 @@
 import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.TxStateMeta;
 import org.apache.ignite.internal.tx.UpdateCommandResult;
+import org.apache.ignite.internal.tx.message.VacuumTxStatesCommand;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.internal.util.TrackerClosedException;
 import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
 /**
- * Partition command handler.
+ * Partition cmd handler.

Review Comment:
   Why?



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java:
##########
@@ -155,14 +187,62 @@ public void vacuum(long vacuumObservationTimestamp, long 
txnResourceTtl) {
             });
         });
 
-        LOG.info("Vacuum finished [vacuumObservationTimestamp={}, 
txnResourceTtl={}, vacuumizedTxnsCount={},"
-                + " markedAsInitiallyDetectedTxnsCount={}, 
alreadyMarkedTxnsCount={}, skippedFotFurtherProcessingUnfinishedTxnsCount={}].",
-                vacuumObservationTimestamp,
-                txnResourceTtl,
-                vacuumizedTxnsCount,
-                markedAsInitiallyDetectedTxnsCount,
-                alreadyMarkedTxnsCount,
-                skippedFotFurtherProcessingUnfinishedTxnsCount
+        beforeVacuum.apply(txIds)
+                .thenAccept(tuple -> {
+                    Set<UUID> successful = tuple.get1();
+
+                    for (UUID txId : successful) {
+                        txStateMap.compute(txId, (k, v) -> {
+                            if (v == null) {
+                                return null;
+                            } else {
+                                Long cleanupCompletionTs = 
timestamps.get(txId);
+
+                                return (cleanupCompletionTs != null && 
Objects.equals(cleanupCompletionTs, v.cleanupCompletionTimestamp()))

Review Comment:
   It seems that `cleanupCompletionTs != null` is not needed here; we could just do
`v.cleanupCompletionTimestamp().equals(cleanupCompletionTs)`, am I right?



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import static org.apache.ignite.internal.util.CompletableFutures.allOf;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.apache.ignite.internal.tx.message.VacuumTxStateReplicaRequest;
+import org.apache.ignite.network.ClusterNode;
+
+public class PersistentTxStateVacuumizer {
+    private static final IgniteLogger LOG = 
Loggers.forClass(PersistentTxStateVacuumizer.class);
+
+    private static final TxMessagesFactory TX_MESSAGES_FACTORY = new 
TxMessagesFactory();
+
+    private final ReplicaService replicaService;
+
+    private final ClusterNode localNode;
+
+    public PersistentTxStateVacuumizer(
+            ReplicaService replicaService,
+            ClusterNode localNode) {
+        this.replicaService = replicaService;
+        this.localNode = localNode;
+    }
+
+    public CompletableFuture<IgniteBiTuple<Set<UUID>, Integer>> 
vacuumPersistentTxStates(Map<TablePartitionId, Set<UUID>> txIds) {
+        Set<UUID> successful = ConcurrentHashMap.newKeySet();
+        AtomicInteger unsuccessfulCount = new AtomicInteger(0);
+        List<CompletableFuture<?>> futures = new ArrayList<>();
+
+        txIds.forEach((commitPartitionId, txs) -> {
+            VacuumTxStateReplicaRequest request = 
TX_MESSAGES_FACTORY.vacuumTxStateReplicaRequest()
+                    .groupId(commitPartitionId)
+                    .transactionIds(txs)
+                    .build();
+
+            CompletableFuture<?> future = replicaService.invoke(localNode, 
request).whenComplete((v, e) -> {
+                if (e == null) {
+                    successful.addAll(txs);
+                } else {
+                    LOG.warn("Failed to vacuum tx states from the persistent 
storage.", e);

Review Comment:
   It seems that such a general approach won't actually work with replica-miss
exceptions. Are you going to clean up the volatile state in that case?
   All in all, I believe we should use getPrimaryReplica instead of `localNode`
to determine the recipient node. That looks more robust to me.



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java:
##########
@@ -155,14 +187,62 @@ public void vacuum(long vacuumObservationTimestamp, long 
txnResourceTtl) {
             });
         });
 
-        LOG.info("Vacuum finished [vacuumObservationTimestamp={}, 
txnResourceTtl={}, vacuumizedTxnsCount={},"
-                + " markedAsInitiallyDetectedTxnsCount={}, 
alreadyMarkedTxnsCount={}, skippedFotFurtherProcessingUnfinishedTxnsCount={}].",
-                vacuumObservationTimestamp,
-                txnResourceTtl,
-                vacuumizedTxnsCount,
-                markedAsInitiallyDetectedTxnsCount,
-                alreadyMarkedTxnsCount,
-                skippedFotFurtherProcessingUnfinishedTxnsCount
+        beforeVacuum.apply(txIds)
+                .thenAccept(tuple -> {

Review Comment:
   It seems that we could even skip the future and just update the volatile txn state
after a successful persistent txn state removal, by setting
cleanupCompletionTimestamp to null if it's the same as the one propagated to the
function (currently there's no such propagation). In that case, the next vacuum
iteration will simply remove the state from the volatile map. What do you think?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -406,18 +413,18 @@ private UpdateCommandResult 
handleUpdateAllCommand(UpdateAllCommand cmd, long co
      * Handler for the {@link WriteIntentSwitchCommand}.
      *
      * @param cmd Command.
-     * @param commandIndex Index of the RAFT command.
-     * @param commandTerm Term of the RAFT command.
+     * @param commandIndex Index of the RAFT cmd.
+     * @param commandTerm Term of the RAFT cmd.
      */
     private void handleWriteIntentSwitchCommand(WriteIntentSwitchCommand cmd, 
long commandIndex, long commandTerm) {
-        // Skips the write command because the storage has already executed it.
+        // Skips the write cmd because the storage has already executed it.
         if (commandIndex <= storage.lastAppliedIndex()) {
             return;
         }
 
         UUID txId = cmd.txId();
 
-        markFinished(txId, cmd.commit(), cmd.commitTimestamp());
+        markFinished(txId, cmd.commit(), cmd.commitTimestamp(), null);

Review Comment:
   Why did you add the commitPartitionId parameter here?



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java:
##########
@@ -118,35 +129,56 @@ public Collection<TxStateMeta> states() {
      * @param vacuumObservationTimestamp Timestamp of the vacuum attempt.
      * @param txnResourceTtl Transactional resource time to live in 
milliseconds.
      */
-    public void vacuum(long vacuumObservationTimestamp, long txnResourceTtl) {
+    public void vacuum(
+            long vacuumObservationTimestamp,
+            long txnResourceTtl,
+            Function<Map<TablePartitionId, Set<UUID>>, 
CompletableFuture<IgniteBiTuple<Set<UUID>, Integer>>> beforeVacuum
+    ) {
         LOG.info("Vacuum started [vacuumObservationTimestamp={}, 
txnResourceTtl={}].", vacuumObservationTimestamp, txnResourceTtl);
 
         AtomicInteger vacuumizedTxnsCount = new AtomicInteger(0);
         AtomicInteger markedAsInitiallyDetectedTxnsCount = new 
AtomicInteger(0);
         AtomicInteger alreadyMarkedTxnsCount = new AtomicInteger(0);
         AtomicInteger skippedFotFurtherProcessingUnfinishedTxnsCount = new 
AtomicInteger(0);
 
+        Map<TablePartitionId, Set<UUID>> txIds = new HashMap<>();
+        Map<UUID, Long> timestamps = new HashMap<>();
+
         txStateMap.forEach((txId, meta) -> {
             txStateMap.computeIfPresent(txId, (txId0, meta0) -> {
                 if (TxState.isFinalState(meta0.txState())) {
-                    if (txnResourceTtl == 0) {

Review Comment:
   Do you intentionally skip fast removal in case of txnResourceTtl == 0?



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/storage/state/rocksdb/TxStateRocksDbStorage.java:
##########
@@ -231,12 +231,21 @@ public boolean compareAndSet(UUID txId, @Nullable TxState 
txStateExpected, TxMet
     }
 
     @Override
-    public void remove(UUID txId) {
+    public void remove(UUID txId, long commandIndex, long commandTerm) {

Review Comment:
   As mentioned above, let's either add a `Set<UUID> txIds` counterpart or even
replace the given one.



##########
modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/storage/state/test/TestTxStateStorage.java:
##########
@@ -108,10 +108,15 @@ public boolean compareAndSet(UUID txId, @Nullable TxState 
txStateExpected, TxMet
     }
 
     @Override
-    public void remove(UUID txId) {
+    public void remove(UUID txId, long commandIndex, long commandTerm) {
         checkStorageClosedOrInProgressOfRebalance();
 
         storage.remove(txId);
+
+        if (rebalanceFutureReference.get() == null) {
+            lastAppliedIndex = commandIndex;

Review Comment:
   It seems that we use neither (long commandIndex, long commandTerm) nor
rebalanceFutureReference in `public void put(UUID txId, TxMeta txMeta)`; at
first glance that doesn't look correct. What's also important is that we don't
use (long commandIndex, long commandTerm) in TxStateRocksDbStorage either.



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java:
##########
@@ -118,35 +129,56 @@ public Collection<TxStateMeta> states() {
      * @param vacuumObservationTimestamp Timestamp of the vacuum attempt.
      * @param txnResourceTtl Transactional resource time to live in 
milliseconds.
      */
-    public void vacuum(long vacuumObservationTimestamp, long txnResourceTtl) {
+    public void vacuum(
+            long vacuumObservationTimestamp,
+            long txnResourceTtl,
+            Function<Map<TablePartitionId, Set<UUID>>, 
CompletableFuture<IgniteBiTuple<Set<UUID>, Integer>>> beforeVacuum
+    ) {
         LOG.info("Vacuum started [vacuumObservationTimestamp={}, 
txnResourceTtl={}].", vacuumObservationTimestamp, txnResourceTtl);
 
         AtomicInteger vacuumizedTxnsCount = new AtomicInteger(0);
         AtomicInteger markedAsInitiallyDetectedTxnsCount = new 
AtomicInteger(0);
         AtomicInteger alreadyMarkedTxnsCount = new AtomicInteger(0);
         AtomicInteger skippedFotFurtherProcessingUnfinishedTxnsCount = new 
AtomicInteger(0);
 
+        Map<TablePartitionId, Set<UUID>> txIds = new HashMap<>();
+        Map<UUID, Long> timestamps = new HashMap<>();
+
         txStateMap.forEach((txId, meta) -> {
             txStateMap.computeIfPresent(txId, (txId0, meta0) -> {
                 if (TxState.isFinalState(meta0.txState())) {
-                    if (txnResourceTtl == 0) {
-                        vacuumizedTxnsCount.incrementAndGet();
-                        return null;
-                    } else if (meta0.initialVacuumObservationTimestamp() == 
null) {
+                    Long initialVacuumObservationTimestamp = 
meta0.initialVacuumObservationTimestamp();
+
+                    if (initialVacuumObservationTimestamp == null) {
                         markedAsInitiallyDetectedTxnsCount.incrementAndGet();
-                        return new TxStateMeta(
-                                meta0.txState(),
-                                meta0.txCoordinatorId(),
-                                meta0.commitPartitionId(),
-                                meta0.commitTimestamp(),
-                                vacuumObservationTimestamp
-                        );
-                    } else if (meta0.initialVacuumObservationTimestamp() + 
txnResourceTtl < vacuumObservationTimestamp) {
-                        vacuumizedTxnsCount.incrementAndGet();
-                        return null;
+
+                        return markInitialVacuumObservationTimestamp(meta0, 
vacuumObservationTimestamp);
                     } else {
-                        alreadyMarkedTxnsCount.incrementAndGet();
-                        return meta0;
+                        Long cleanupCompletionTimestamp = 
meta0.cleanupCompletionTimestamp();
+
+                        boolean shouldBeVacuumized = 
shouldBeVacuumized(requireNonNull(initialVacuumObservationTimestamp),
+                                cleanupCompletionTimestamp, txnResourceTtl, 
vacuumObservationTimestamp);
+
+                        if (shouldBeVacuumized) {
+                            if (meta0.commitPartitionId() == null) {

Review Comment:
   I believe that a non-null cleanupCompletionTimestamp should be enough
here. It won't work with the exact current logic, but it will with a slight adjustment.



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/VacuumTxStateReplicaRequest.java:
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.message;
+
+import static 
org.apache.ignite.internal.tx.message.TxMessageGroup.VACUUM_TX_STATE_REPLICA_REQUEST;
+
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.network.annotations.Transferable;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+
+@Transferable(VACUUM_TX_STATE_REPLICA_REQUEST)
+public interface VacuumTxStateReplicaRequest extends ReplicaRequest {

Review Comment:
   I'm curious: is ReplicaRequest intended here, or should it actually be
PrimaryReplicaRequest?



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import static org.apache.ignite.internal.util.CompletableFutures.allOf;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.tx.message.TxMessagesFactory;
+import org.apache.ignite.internal.tx.message.VacuumTxStateReplicaRequest;
+import org.apache.ignite.network.ClusterNode;
+
+public class PersistentTxStateVacuumizer {
+    private static final IgniteLogger LOG = 
Loggers.forClass(PersistentTxStateVacuumizer.class);
+
+    private static final TxMessagesFactory TX_MESSAGES_FACTORY = new 
TxMessagesFactory();
+
+    private final ReplicaService replicaService;
+
+    private final ClusterNode localNode;
+
+    public PersistentTxStateVacuumizer(
+            ReplicaService replicaService,
+            ClusterNode localNode) {
+        this.replicaService = replicaService;
+        this.localNode = localNode;
+    }
+
+    public CompletableFuture<IgniteBiTuple<Set<UUID>, Integer>> 
vacuumPersistentTxStates(Map<TablePartitionId, Set<UUID>> txIds) {
+        Set<UUID> successful = ConcurrentHashMap.newKeySet();
+        AtomicInteger unsuccessfulCount = new AtomicInteger(0);
+        List<CompletableFuture<?>> futures = new ArrayList<>();
+
+        txIds.forEach((commitPartitionId, txs) -> {
+            VacuumTxStateReplicaRequest request = 
TX_MESSAGES_FACTORY.vacuumTxStateReplicaRequest()
+                    .groupId(commitPartitionId)
+                    .transactionIds(txs)
+                    .build();
+
+            CompletableFuture<?> future = replicaService.invoke(localNode, 
request).whenComplete((v, e) -> {
+                if (e == null) {
+                    successful.addAll(txs);
+                } else {
+                    LOG.warn("Failed to vacuum tx states from the persistent 
storage.", e);
+
+                    unsuccessfulCount.incrementAndGet();
+                }
+            });
+
+            futures.add(future);

Review Comment:
   So, do you really want allOf to fail because of a *single* failed future?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -609,14 +616,32 @@ void handleBuildIndexCommand(BuildIndexCommand cmd, long 
commandIndex, long comm
      * @param commandTerm Command term.
      */
     private void handlePrimaryReplicaChangeCommand(PrimaryReplicaChangeCommand 
cmd, long commandIndex, long commandTerm) {
-        // Skips the write command because the storage has already executed it.
+        // Skips the write cmd because the storage has already executed it.
         if (commandIndex <= storage.lastAppliedIndex()) {
             return;
         }
 
         txStateStorage.updateLease(cmd.leaseStartTime(), commandIndex, 
commandTerm);
     }
 
+    /**
+     * Handler for {@link VacuumTxStatesCommand}.
+     *
+     * @param cmd Command.
+     * @param commandIndex Command index.
+     * @param commandTerm Command term.
+     */
+    private void handleVacuumTxStatesCommand(VacuumTxStatesCommand cmd, long 
commandIndex, long commandTerm) {
+        // Skips the write cmd because the storage has already executed it.
+        if (commandIndex <= storage.lastAppliedIndex()) {
+            return;
+        }
+
+        for (UUID txId : cmd.txIds()) {
+            txStateStorage.remove(txId, commandIndex, commandTerm);

Review Comment:
   It's way too inefficient. Internally, TxStateRocksDbStorage already uses
batching (`writeBatch.delete(txIdToKey(txId));`). Please use it by adding `public
void remove(Set<UUID> txIds, long commandIndex, long commandTerm)` or similar.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to