korlov42 commented on code in PR #4256:
URL: https://github.com/apache/ignite-3/pull/4256#discussion_r1732263598


##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java:
##########
@@ -225,6 +225,8 @@ public CompletableFuture<Boolean> 
saveSnapshot(SnapshotEntry update) {
 
             Iif iif = iif(versionIsRecent, saveSnapshotAndDropOutdatedUpdates, 
ops().yield(false));
 
+            LOG.info("Snapshot version is updated. [Old version: {}, new 
version: {}]", oldSnapshotVersion, snapshotVersion);

Review Comment:
   ```suggestion
               LOG.info("Snapshot version is updated [oldVersion={}, newVersion={}].", oldSnapshotVersion, snapshotVersion);
   ```



##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java:
##########
@@ -225,6 +225,8 @@ public CompletableFuture<Boolean> 
saveSnapshot(SnapshotEntry update) {
 
             Iif iif = iif(versionIsRecent, saveSnapshotAndDropOutdatedUpdates, 
ops().yield(false));
 
+            LOG.info("Snapshot version is updated. [Old version: {}, new 
version: {}]", oldSnapshotVersion, snapshotVersion);

Review Comment:
   Btw, this message is not quite accurate. The next invoke is conditional, so
   there is a chance that the update won't be applied.
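
A minimal sketch of the point in the comment above: report the version change only once the conditional operation has completed and confirmed that the update was applied. The class `SnapshotLogSketch`, its `messages` list (used here in place of a real logger), and the wording of the "not applied" message are hypothetical and are not taken from `UpdateLogImpl`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for the snapshot-saving code; not the actual UpdateLogImpl. */
final class SnapshotLogSketch {
    /** Collects messages instead of calling a real logger, to keep the sketch self-contained. */
    final List<String> messages = new ArrayList<>();

    /**
     * Reports the outcome only after the conditional operation has completed.
     *
     * @param conditionalInvoke Completes with true if the update was actually applied.
     */
    CompletableFuture<Boolean> saveSnapshot(CompletableFuture<Boolean> conditionalInvoke, int oldVersion, int newVersion) {
        return conditionalInvoke.thenApply(applied -> {
            if (applied) {
                messages.add(String.format(
                        "Snapshot version is updated [oldVersion=%d, newVersion=%d].", oldVersion, newVersion));
            } else {
                // Assumed wording: the condition did not hold, so nothing changed.
                messages.add(String.format(
                        "Snapshot version is not updated [currentVersion=%d, rejectedVersion=%d].", oldVersion, newVersion));
            }

            return applied;
        });
    }
}
```

With this shape, a rejected conditional update can no longer produce a message that claims the version has changed.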



##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -247,30 +238,70 @@ void triggerCompaction(@Nullable HybridTimestamp lwm) {
                     return;
                 }
 
-                lastRunFuture = 
startCompaction(logicalTopologyService.localLogicalTopology());
+                lastRunFuture = startCompaction(lwm, 
logicalTopologyService.localLogicalTopology());
             }
         });
     }
 
-    private CompletableFuture<Void> startCompaction(LogicalTopologySnapshot 
topologySnapshot) {
-        long localMinimum = localMinTimeProvider.time();
+    private @Nullable Long getMinLocalTime(HybridTimestamp lwm) {
+        Map<TablePartitionId, @Nullable Long> partitionStates = 
localMinTimeProvider.minTimePerPartition();
 
-        if (catalogManagerFacade.catalogByTsNullable(localMinimum) == null) {
-            LOG.info("Catalog compaction skipped, nothing to compact 
[timestamp={}].", localMinimum);
+        // Find the minimum time among all partitions.
+        long partitionMinTime = Long.MAX_VALUE;
 
-            return CompletableFutures.nullCompletedFuture();
+        for (Map.Entry<TablePartitionId, Long> e : partitionStates.entrySet()) 
{
+            Long state = e.getValue();
+
+            if (state == null) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Partition state is missing [partition={}, 
all={}]", e.getKey(), partitionStates);
+                }
+                return null;
+            }
+
+            partitionMinTime = Math.min(partitionMinTime, state);
         }
 
-        return determineGlobalMinimumRequiredTime(topologySnapshot.nodes(), 
localMinimum)
+        // Choose the minimum time between the low watermark and the minimum 
time among all partitions.
+        long chosenMinTime = Math.min(lwm.longValue(), partitionMinTime);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Local minimum required time: [partitionMinTime={}, 
lowWatermark={}, chosen={}]",
+                    partitionMinTime,
+                    lwm,
+                    chosenMinTime
+            );
+        }
+
+        return chosenMinTime;
+    }
+
+    private CompletableFuture<Void> startCompaction(HybridTimestamp lwm, 
LogicalTopologySnapshot topologySnapshot) {
+        LOG.info("Catalog compaction started at [lowWaterMark={}]", lwm);
+
+        Long localMinRequiredTime = getMinLocalTime(lwm);
+
+        if (localMinRequiredTime == null) {
+            // If do not have local time yet, Use a placeholder value that is 
going to be overwritten.
+            localMinRequiredTime = Long.MAX_VALUE;
+        }
+
+        return determineGlobalMinimumRequiredTime(topologySnapshot.nodes(), 
localMinRequiredTime)
                 .thenComposeAsync(timeHolder -> {
                     long minRequiredTime = timeHolder.minRequiredTime;
                     long minActiveTxBeginTime = 
timeHolder.minActiveTxBeginTime;
+
+                    assert minRequiredTime != Long.MAX_VALUE : "Unexpected 
minRequiredTime is returned from determineGlobalMin call";
+
                     Catalog catalog = 
catalogManagerFacade.catalogByTsNullable(minRequiredTime);
 
                     CompletableFuture<Boolean> catalogCompactionFut;
 
                     if (catalog == null) {
-                        LOG.info("Catalog compaction skipped, nothing to 
compact [timestamp={}].", minRequiredTime);
+                        if (LOG.isInfoEnabled()) {
+                            LOG.info("Catalog compaction skipped, nothing to 
compact (timestamp={}). No catalog at minRequiredTime",

Review Comment:
   Please format all messages according to the
   [guideline](https://cwiki.apache.org/confluence/display/IGNITE/Java+Logging+Guidelines).
   Also, every log message [must end with a
   period](https://cwiki.apache.org/confluence/display/IGNITE/Java+Code+Style+Guide#JavaCodeStyleGuide-1.4Punctuation).
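
As an illustration of the message shape requested above, here is a small sketch that builds a template of the form `Text [key1={}, key2={}].`, which is the shape used by the suggestion earlier in this review and by the original "nothing to compact" message. The helper `LogMessageSketch.template` is hypothetical and only demonstrates the convention; it is not part of Ignite, and the linked guideline remains the authoritative source.

```java
import java.util.StringJoiner;

/** Hypothetical helper that demonstrates the message shape; not an Ignite API. */
final class LogMessageSketch {
    /**
     * Builds a log template: free text, then named parameters in square brackets, then a period.
     *
     * @param text Message text without trailing punctuation.
     * @param paramNames Parameter names, each rendered as name={}.
     */
    static String template(String text, String... paramNames) {
        if (paramNames.length == 0) {
            return text + ".";
        }

        StringJoiner params = new StringJoiner(", ", " [", "]");

        for (String name : paramNames) {
            params.add(name + "={}");
        }

        // The period closes the whole message, after the bracketed parameters.
        return text + params + ".";
    }
}
```

For example, `template("Catalog compaction skipped, nothing to compact", "timestamp")` yields the same template as the line removed in the diff, so the new message could keep that shape instead of using parentheses and a trailing sentence without a period.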



##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -247,30 +238,70 @@ void triggerCompaction(@Nullable HybridTimestamp lwm) {
                     return;
                 }
 
-                lastRunFuture = 
startCompaction(logicalTopologyService.localLogicalTopology());
+                lastRunFuture = startCompaction(lwm, 
logicalTopologyService.localLogicalTopology());
             }
         });
     }
 
-    private CompletableFuture<Void> startCompaction(LogicalTopologySnapshot 
topologySnapshot) {
-        long localMinimum = localMinTimeProvider.time();
+    private @Nullable Long getMinLocalTime(HybridTimestamp lwm) {
+        Map<TablePartitionId, @Nullable Long> partitionStates = 
localMinTimeProvider.minTimePerPartition();
 
-        if (catalogManagerFacade.catalogByTsNullable(localMinimum) == null) {
-            LOG.info("Catalog compaction skipped, nothing to compact 
[timestamp={}].", localMinimum);
+        // Find the minimum time among all partitions.
+        long partitionMinTime = Long.MAX_VALUE;
 
-            return CompletableFutures.nullCompletedFuture();
+        for (Map.Entry<TablePartitionId, Long> e : partitionStates.entrySet()) 
{
+            Long state = e.getValue();
+
+            if (state == null) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Partition state is missing [partition={}, 
all={}]", e.getKey(), partitionStates);
+                }
+                return null;
+            }
+
+            partitionMinTime = Math.min(partitionMinTime, state);
         }
 
-        return determineGlobalMinimumRequiredTime(topologySnapshot.nodes(), 
localMinimum)
+        // Choose the minimum time between the low watermark and the minimum 
time among all partitions.
+        long chosenMinTime = Math.min(lwm.longValue(), partitionMinTime);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Local minimum required time: [partitionMinTime={}, 
lowWatermark={}, chosen={}]",
+                    partitionMinTime,
+                    lwm,
+                    chosenMinTime
+            );
+        }
+
+        return chosenMinTime;
+    }
+
+    private CompletableFuture<Void> startCompaction(HybridTimestamp lwm, 
LogicalTopologySnapshot topologySnapshot) {
+        LOG.info("Catalog compaction started at [lowWaterMark={}]", lwm);
+
+        Long localMinRequiredTime = getMinLocalTime(lwm);
+
+        if (localMinRequiredTime == null) {
+            // If do not have local time yet, Use a placeholder value that is 
going to be overwritten.
+            localMinRequiredTime = Long.MAX_VALUE;
+        }
+
+        return determineGlobalMinimumRequiredTime(topologySnapshot.nodes(), 
localMinRequiredTime)
                 .thenComposeAsync(timeHolder -> {
                     long minRequiredTime = timeHolder.minRequiredTime;
                     long minActiveTxBeginTime = 
timeHolder.minActiveTxBeginTime;
+
+                    assert minRequiredTime != Long.MAX_VALUE : "Unexpected 
minRequiredTime is returned from determineGlobalMin call";
+
                     Catalog catalog = 
catalogManagerFacade.catalogByTsNullable(minRequiredTime);
 
                     CompletableFuture<Boolean> catalogCompactionFut;
 
                     if (catalog == null) {
-                        LOG.info("Catalog compaction skipped, nothing to 
compact [timestamp={}].", minRequiredTime);
+                        if (LOG.isInfoEnabled()) {

Review Comment:
   No need to make this conditional, since the parameters are already computed.
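
A rough sketch of the distinction behind the comment above: a level guard pays off only when building the arguments is itself costly, because a parameterized logger already checks the level before formatting. `TinyLogger`, `expensiveDump`, and the counters are hypothetical and exist only to make the difference observable; they are not the Ignite logger API.

```java
/** Hypothetical illustration of when a log-level guard is worthwhile. */
final class GuardSketch {
    /** Minimal stand-in logger that checks the level before formatting. */
    static final class TinyLogger {
        private final boolean infoEnabled;

        /** Number of messages that were actually formatted. */
        int formatted;

        TinyLogger(boolean infoEnabled) {
            this.infoEnabled = infoEnabled;
        }

        boolean isInfoEnabled() {
            return infoEnabled;
        }

        void info(String template, Object... params) {
            if (!infoEnabled) {
                return; // Level check happens here, before any formatting work.
            }

            formatted++;
        }
    }

    /** Counts how often the costly argument was built. */
    static int expensiveCalls;

    static String expensiveDump() {
        expensiveCalls++;

        return "large state dump";
    }

    /** The argument is an already computed local value, so no guard is needed. */
    static void logSkipped(TinyLogger log, long minRequiredTime) {
        log.info("Catalog compaction skipped, nothing to compact [timestamp={}].", minRequiredTime);
    }

    /** The argument is costly to build, so the guard avoids that work when INFO is disabled. */
    static void logState(TinyLogger log) {
        if (log.isInfoEnabled()) {
            log.info("Current state [dump={}].", expensiveDump());
        }
    }
}
```

In the diff above, `minRequiredTime` is already a local value, so the unguarded form in `logSkipped` matches what the reviewer is asking for.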



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
