denis-chudov commented on code in PR #3469:
URL: https://github.com/apache/ignite-3/pull/3469#discussion_r1539429074


##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java:
##########
@@ -100,4 +104,65 @@ public TxStateMeta state(UUID txId) {
     public Collection<TxStateMeta> states() {
         return txStateMap.values();
     }
+
+    /**
+     * Locally vacuums no longer needed transactional resource.
+     * For each finished (COMMITTED or ABORTED) transactions:
+     * <ol>
+     *     <li> Removes it from the volatile storage if txnResourcesTTL == 0 or if
+     *     txnState.initialVacuumObservationTimestamp + txnResourcesTTL < vacuumObservationTimestamp.</li>
+     *     <li>Updates txnState.initialVacuumObservationTimestamp by setting it to vacuumObservationTimestamp
+     *     if it's not already initialized.</li>
+     * </ol>
+     *
+     * @param vacuumObservationTimestamp Timestamp of the vacuum attempt.
+     * @param txnResourceTtl Transactional resource time to live in milliseconds.
+     */
+    public void vacuum(long vacuumObservationTimestamp, long txnResourceTtl) {
+        LOG.info("Vacuum started [vacuumObservationTimestamp={}, txnResourceTtl={}]", vacuumObservationTimestamp, txnResourceTtl);

Review Comment:
   ```suggestion
           LOG.info("Vacuum started [vacuumObservationTimestamp={}, txnResourceTtl={}].", vacuumObservationTimestamp, txnResourceTtl);
   ```



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/VolatileTxStateMetaStorage.java:
##########
@@ -100,4 +104,65 @@ public TxStateMeta state(UUID txId) {
     public Collection<TxStateMeta> states() {
         return txStateMap.values();
     }
+
+    /**
+     * Locally vacuums no longer needed transactional resource.
+     * For each finished (COMMITTED or ABORTED) transactions:
+     * <ol>
+     *     <li> Removes it from the volatile storage if txnResourcesTTL == 0 or if
+     *     txnState.initialVacuumObservationTimestamp + txnResourcesTTL < vacuumObservationTimestamp.</li>
+     *     <li>Updates txnState.initialVacuumObservationTimestamp by setting it to vacuumObservationTimestamp
+     *     if it's not already initialized.</li>
+     * </ol>
+     *
+     * @param vacuumObservationTimestamp Timestamp of the vacuum attempt.
+     * @param txnResourceTtl Transactional resource time to live in milliseconds.
+     */
+    public void vacuum(long vacuumObservationTimestamp, long txnResourceTtl) {
+        LOG.info("Vacuum started [vacuumObservationTimestamp={}, txnResourceTtl={}]", vacuumObservationTimestamp, txnResourceTtl);
+
+        AtomicInteger vacuumizedTxnsCount = new AtomicInteger(0);
+        AtomicInteger markedAsInitiallyDetectedTxnsCount = new AtomicInteger(0);
+        AtomicInteger alreadyMarkedTxnsCount = new AtomicInteger(0);
+        AtomicInteger skippedFotFurtherProcessingUnfinishedTxnsCount = new AtomicInteger(0);
+
+        txStateMap.forEach((txId, meta) -> {
+            txStateMap.computeIfPresent(txId, (txId0, meta0) -> {
+                if (TxState.isFinalState(meta0.txState())) {
+                    if (txnResourceTtl == 0) {
+                        vacuumizedTxnsCount.incrementAndGet();
+                        return null;
+                    } else if (meta0.initialVacuumObservationTimestamp() == null) {
+                        markedAsInitiallyDetectedTxnsCount.incrementAndGet();
+                        return new TxStateMeta(
+                                meta0.txState(),
+                                meta0.txCoordinatorId(),
+                                meta0.commitPartitionId(),
+                                meta0.commitTimestamp(),
+                                vacuumObservationTimestamp
+                        );
+                    } else if (meta0.initialVacuumObservationTimestamp() + txnResourceTtl < vacuumObservationTimestamp) {
+                        vacuumizedTxnsCount.incrementAndGet();
+                        return null;
+                    } else {
+                        alreadyMarkedTxnsCount.incrementAndGet();
+                        return meta0;
+                    }
+                } else {
+                    skippedFotFurtherProcessingUnfinishedTxnsCount.incrementAndGet();
+                    return meta0;
+                }
+            });
+        });
+
+        LOG.info("Vacuum finished [vacuumObservationTimestamp={}, txnResourceTtl={}, vacuumizedTxnsCount={},"
+                + " markedAsInitiallyDetectedTxnsCount={}, alreadyMarkedTxnsCount={}, skippedFotFurtherProcessingUnfinishedTxnsCount={}]",

Review Comment:
   ```suggestion
           LOG.info("Vacuum finished [vacuumObservationTimestamp={}, txnResourceTtl={}, vacuumizedTxnsCount={},"
                   + " markedAsInitiallyDetectedTxnsCount={}, alreadyMarkedTxnsCount={}, skippedFotFurtherProcessingUnfinishedTxnsCount={}].",
   ```
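
   For readers following the hunk above, the per-entry decision that `vacuum` makes can be sketched in isolation. This is a simplified illustration, not the code from the PR: `VacuumSketch`, its `Meta` class, and the returned removal count are hypothetical stand-ins for `VolatileTxStateMetaStorage`, `TxStateMeta`, and the logged counters, and only the fields the decision needs are modelled.

   ```java
   import java.util.Map;
   import java.util.UUID;
   import java.util.concurrent.ConcurrentHashMap;

   /** Hypothetical, simplified stand-in for the storage under review. */
   class VacuumSketch {
       /** Minimal stand-in for TxStateMeta: a finished flag and the first-observation timestamp. */
       static final class Meta {
           private final boolean finished;
           private final Long initialVacuumObservationTimestamp;

           Meta(boolean finished, Long initialVacuumObservationTimestamp) {
               this.finished = finished;
               this.initialVacuumObservationTimestamp = initialVacuumObservationTimestamp;
           }

           boolean finished() {
               return finished;
           }

           Long initialVacuumObservationTimestamp() {
               return initialVacuumObservationTimestamp;
           }
       }

       final Map<UUID, Meta> txStateMap = new ConcurrentHashMap<>();

       /** Runs one vacuum pass and returns how many entries were removed. */
       int vacuum(long vacuumObservationTimestamp, long txnResourceTtl) {
           int[] removed = {0};

           for (UUID txId : txStateMap.keySet()) {
               txStateMap.computeIfPresent(txId, (id, meta) -> {
                   if (!meta.finished()) {
                       return meta; // Unfinished transactions are never vacuumed.
                   }

                   Long firstSeen = meta.initialVacuumObservationTimestamp();

                   if (txnResourceTtl == 0) {
                       removed[0]++;
                       return null; // Zero TTL: remove immediately.
                   } else if (firstSeen == null) {
                       // First observation: only record the timestamp.
                       return new Meta(true, vacuumObservationTimestamp);
                   } else if (firstSeen + txnResourceTtl < vacuumObservationTimestamp) {
                       removed[0]++;
                       return null; // TTL elapsed since the first observation.
                   }

                   return meta; // Already marked, TTL not yet elapsed.
               });
           }

           return removed[0];
       }
   }
   ```

   With a non-zero TTL, a finished transaction therefore survives at least two passes: the first only stamps it, and a later pass removes it once the strict `<` comparison holds.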


