xtern commented on code in PR #4256:
URL: https://github.com/apache/ignite-3/pull/4256#discussion_r1732278323


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -601,6 +603,20 @@ public MvPartitionStorage getMvStorage() {
         return minActiveTxBeginTime0;
     }
 
+    /**
+     * Returns minimum starting time among all active RW transactions in the 
cluster,
+     * or {@code null} if the value has not yet been set.
+     */
+    public @Nullable Long snapshottedMinActiveTxBeginTime() {

Review Comment:
   Not sure but may be it worth somehow note the difference between 
`snapshottedMinActiveTxBeginTime` and `minimumActiveTxBeginTime` in javadocs.
   Currently javadocs are identical :thinking: 
   
   Also I suggest to move non-test method upper then methods marked TestOnly.



##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -299,7 +334,11 @@ private CompletableFuture<Void> 
startCompaction(LogicalTopologySnapshot topology
                             catalogCompactionFut,
                             propagateToReplicasFut
                     );
-                }, executor);
+                }, executor).whenComplete((r, t) -> {

Review Comment:
   We have 2 separated processes:
   1. catalogCompactionFut  already has error handler with logging
   2. propagateToReplicasFut - already has error handler with logging
   
   Why we need this additional logging?
   And what "startCompaction launched (time) has failed" means? :thinking: 
   
   This "combined" future was left for testing purposes



##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -247,30 +238,70 @@ void triggerCompaction(@Nullable HybridTimestamp lwm) {
                     return;
                 }
 
-                lastRunFuture = 
startCompaction(logicalTopologyService.localLogicalTopology());
+                lastRunFuture = startCompaction(lwm, 
logicalTopologyService.localLogicalTopology());
             }
         });
     }
 
-    private CompletableFuture<Void> startCompaction(LogicalTopologySnapshot 
topologySnapshot) {
-        long localMinimum = localMinTimeProvider.time();
+    private @Nullable Long getMinLocalTime(HybridTimestamp lwm) {
+        Map<TablePartitionId, @Nullable Long> partitionStates = 
localMinTimeProvider.minTimePerPartition();
 
-        if (catalogManagerFacade.catalogByTsNullable(localMinimum) == null) {
-            LOG.info("Catalog compaction skipped, nothing to compact 
[timestamp={}].", localMinimum);
+        // Find the minimum time among all partitions.
+        long partitionMinTime = Long.MAX_VALUE;
 
-            return CompletableFutures.nullCompletedFuture();
+        for (Map.Entry<TablePartitionId, Long> e : partitionStates.entrySet()) 
{
+            Long state = e.getValue();
+
+            if (state == null) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Partition state is missing [partition={}, 
all={}]", e.getKey(), partitionStates);
+                }
+                return null;
+            }
+
+            partitionMinTime = Math.min(partitionMinTime, state);
         }
 
-        return determineGlobalMinimumRequiredTime(topologySnapshot.nodes(), 
localMinimum)
+        // Choose the minimum time between the low watermark and the minimum 
time among all partitions.
+        long chosenMinTime = Math.min(lwm.longValue(), partitionMinTime);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Local minimum required time: [partitionMinTime={}, 
lowWatermark={}, chosen={}]",
+                    partitionMinTime,
+                    lwm,
+                    chosenMinTime
+            );
+        }
+
+        return chosenMinTime;
+    }
+
+    private CompletableFuture<Void> startCompaction(HybridTimestamp lwm, 
LogicalTopologySnapshot topologySnapshot) {
+        LOG.info("Catalog compaction started at [lowWaterMark={}]", lwm);
+
+        Long localMinRequiredTime = getMinLocalTime(lwm);
+
+        if (localMinRequiredTime == null) {
+            // If do not have local time yet, Use a placeholder value that is 
going to be overwritten.
+            localMinRequiredTime = Long.MAX_VALUE;

Review Comment:
   As I see the behavior on coordinator must be consistent with remotes.
   
   Remotes report MIN_VALUE to prevent compaction. Coordinator should do the 
same.
   
   Otherwise, this can lead to issue with single node cluster, for example.
   
   May be it's better to move this to `getMinLocalTime`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to