xtern commented on code in PR #4256:
URL: https://github.com/apache/ignite-3/pull/4256#discussion_r1732717034


##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -247,30 +238,62 @@ void triggerCompaction(@Nullable HybridTimestamp lwm) {
                     return;
                 }
 
-                lastRunFuture = 
startCompaction(logicalTopologyService.localLogicalTopology());
+                lastRunFuture = startCompaction(lwm, 
logicalTopologyService.localLogicalTopology());
             }
         });
     }
 
-    private CompletableFuture<Void> startCompaction(LogicalTopologySnapshot 
topologySnapshot) {
-        long localMinimum = localMinTimeProvider.time();
+    private @Nullable Long getMinLocalTime(HybridTimestamp lwm) {
+        Map<TablePartitionId, @Nullable Long> partitionStates = 
localMinTimeProvider.minTimePerPartition();
 
-        if (catalogManagerFacade.catalogByTsNullable(localMinimum) == null) {
-            LOG.info("Catalog compaction skipped, nothing to compact 
[timestamp={}].", localMinimum);
+        // Find the minimum time among all partitions.
+        long partitionMinTime = Long.MAX_VALUE;
 
-            return CompletableFutures.nullCompletedFuture();
+        for (Map.Entry<TablePartitionId, Long> e : partitionStates.entrySet()) 
{
+            Long state = e.getValue();
+
+            if (state == null) {
+                LOG.debug("Partition state is missing [partition={}, all={}]", 
e.getKey(), partitionStates);
+                return null;
+            }
+
+            partitionMinTime = Math.min(partitionMinTime, state);
         }
 
-        return determineGlobalMinimumRequiredTime(topologySnapshot.nodes(), 
localMinimum)
+        // Choose the minimum time between the low watermark and the minimum 
time among all partitions.
+        long chosenMinTime = Math.min(lwm.longValue(), partitionMinTime);
+
+        LOG.debug("Local minimum required time: [partitionMinTime={}, 
lowWatermark={}, chosen={}]",
+                partitionMinTime,
+                lwm,
+                chosenMinTime
+        );
+
+        return chosenMinTime;
+    }
+
+    private CompletableFuture<Void> startCompaction(HybridTimestamp lwm, 
LogicalTopologySnapshot topologySnapshot) {
+        LOG.info("Catalog compaction started at [lowWaterMark={}]", lwm);
+
+        Long localMinRequiredTime = getMinLocalTime(lwm);
+
+        if (localMinRequiredTime == null) {
+            // If do not have local time yet, Use a placeholder value that is 
going to be overwritten.
+            localMinRequiredTime = Long.MAX_VALUE;
+        }
+
+        return determineGlobalMinimumRequiredTime(topologySnapshot.nodes(), 
localMinRequiredTime)
                 .thenComposeAsync(timeHolder -> {
                     long minRequiredTime = timeHolder.minRequiredTime;
                     long minActiveTxBeginTime = 
timeHolder.minActiveTxBeginTime;
+
                     Catalog catalog = 
catalogManagerFacade.catalogByTsNullable(minRequiredTime);
 
                     CompletableFuture<Boolean> catalogCompactionFut;
 
                     if (catalog == null) {
-                        LOG.info("Catalog compaction skipped, nothing to 
compact [timestamp={}].", minRequiredTime);
+                        LOG.info("Catalog compaction skipped, nothing to 
compact [timestamp={}]. No catalog at minRequiredTime",

Review Comment:
   pay attention to the @korlov42 comment
   
   > Besides, every log message [must end with a 
period](https://cwiki.apache.org/confluence/display/IGNITE/Java+Code+Style+Guide#JavaCodeStyleGuide-1.4Punctuation).



##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -247,30 +238,62 @@ void triggerCompaction(@Nullable HybridTimestamp lwm) {
                     return;
                 }
 
-                lastRunFuture = 
startCompaction(logicalTopologyService.localLogicalTopology());
+                lastRunFuture = startCompaction(lwm, 
logicalTopologyService.localLogicalTopology());
             }
         });
     }
 
-    private CompletableFuture<Void> startCompaction(LogicalTopologySnapshot 
topologySnapshot) {
-        long localMinimum = localMinTimeProvider.time();
+    private @Nullable Long getMinLocalTime(HybridTimestamp lwm) {
+        Map<TablePartitionId, @Nullable Long> partitionStates = 
localMinTimeProvider.minTimePerPartition();
 
-        if (catalogManagerFacade.catalogByTsNullable(localMinimum) == null) {
-            LOG.info("Catalog compaction skipped, nothing to compact 
[timestamp={}].", localMinimum);
+        // Find the minimum time among all partitions.
+        long partitionMinTime = Long.MAX_VALUE;
 
-            return CompletableFutures.nullCompletedFuture();
+        for (Map.Entry<TablePartitionId, Long> e : partitionStates.entrySet()) 
{
+            Long state = e.getValue();
+
+            if (state == null) {
+                LOG.debug("Partition state is missing [partition={}, all={}]", 
e.getKey(), partitionStates);

Review Comment:
   Also if this map is huge enough toString() may take some time even if debug 
is not enabled.
   Also you missing period at the end of the sentence.
   I suggest not to log ALL partitions and add period to the end of message.



##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -247,30 +238,62 @@ void triggerCompaction(@Nullable HybridTimestamp lwm) {
                     return;
                 }
 
-                lastRunFuture = 
startCompaction(logicalTopologyService.localLogicalTopology());
+                lastRunFuture = startCompaction(lwm, 
logicalTopologyService.localLogicalTopology());
             }
         });
     }
 
-    private CompletableFuture<Void> startCompaction(LogicalTopologySnapshot 
topologySnapshot) {
-        long localMinimum = localMinTimeProvider.time();
+    private @Nullable Long getMinLocalTime(HybridTimestamp lwm) {
+        Map<TablePartitionId, @Nullable Long> partitionStates = 
localMinTimeProvider.minTimePerPartition();
 
-        if (catalogManagerFacade.catalogByTsNullable(localMinimum) == null) {
-            LOG.info("Catalog compaction skipped, nothing to compact 
[timestamp={}].", localMinimum);
+        // Find the minimum time among all partitions.
+        long partitionMinTime = Long.MAX_VALUE;
 
-            return CompletableFutures.nullCompletedFuture();
+        for (Map.Entry<TablePartitionId, Long> e : partitionStates.entrySet()) 
{
+            Long state = e.getValue();
+
+            if (state == null) {
+                LOG.debug("Partition state is missing [partition={}, all={}]", 
e.getKey(), partitionStates);

Review Comment:
   I'm not sure we need dump ALL partitions even on debug level :thinking: 



##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -299,7 +334,11 @@ private CompletableFuture<Void> 
startCompaction(LogicalTopologySnapshot topology
                             catalogCompactionFut,
                             propagateToReplicasFut
                     );
-                }, executor);
+                }, executor).whenComplete((r, t) -> {

Review Comment:
   My main point was that this handler should be removed. Because we already 
have such logging in other places.



##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -247,30 +238,70 @@ void triggerCompaction(@Nullable HybridTimestamp lwm) {
                     return;
                 }
 
-                lastRunFuture = 
startCompaction(logicalTopologyService.localLogicalTopology());
+                lastRunFuture = startCompaction(lwm, 
logicalTopologyService.localLogicalTopology());
             }
         });
     }
 
-    private CompletableFuture<Void> startCompaction(LogicalTopologySnapshot 
topologySnapshot) {
-        long localMinimum = localMinTimeProvider.time();
+    private @Nullable Long getMinLocalTime(HybridTimestamp lwm) {
+        Map<TablePartitionId, @Nullable Long> partitionStates = 
localMinTimeProvider.minTimePerPartition();
 
-        if (catalogManagerFacade.catalogByTsNullable(localMinimum) == null) {
-            LOG.info("Catalog compaction skipped, nothing to compact 
[timestamp={}].", localMinimum);
+        // Find the minimum time among all partitions.
+        long partitionMinTime = Long.MAX_VALUE;
 
-            return CompletableFutures.nullCompletedFuture();
+        for (Map.Entry<TablePartitionId, Long> e : partitionStates.entrySet()) 
{
+            Long state = e.getValue();
+
+            if (state == null) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Partition state is missing [partition={}, 
all={}]", e.getKey(), partitionStates);
+                }
+                return null;
+            }
+
+            partitionMinTime = Math.min(partitionMinTime, state);
         }
 
-        return determineGlobalMinimumRequiredTime(topologySnapshot.nodes(), 
localMinimum)
+        // Choose the minimum time between the low watermark and the minimum 
time among all partitions.
+        long chosenMinTime = Math.min(lwm.longValue(), partitionMinTime);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Local minimum required time: [partitionMinTime={}, 
lowWatermark={}, chosen={}]",
+                    partitionMinTime,
+                    lwm,
+                    chosenMinTime
+            );
+        }
+
+        return chosenMinTime;
+    }
+
+    private CompletableFuture<Void> startCompaction(HybridTimestamp lwm, 
LogicalTopologySnapshot topologySnapshot) {
+        LOG.info("Catalog compaction started at [lowWaterMark={}]", lwm);
+
+        Long localMinRequiredTime = getMinLocalTime(lwm);
+
+        if (localMinRequiredTime == null) {
+            // If do not have local time yet, Use a placeholder value that is 
going to be overwritten.
+            localMinRequiredTime = Long.MAX_VALUE;

Review Comment:
   How this was fixed?
   Why not to return MIN_VALUE instead of null from `getMinLocalTime`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to