korlov42 commented on code in PR #4137:
URL: https://github.com/apache/ignite-3/pull/4137#discussion_r1698158496


##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/ActiveTxMinimumStartTimeProvider.java:
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Provides the minimum starting time among all locally started active RW 
transactions.

Review Comment:
   ```suggestion
    * Provides the minimum begin time among all active RW transactions started 
locally.
   ```



##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java:
##########
@@ -342,17 +342,26 @@ public CompletableFuture<Integer> 
execute(List<CatalogCommand> commands) {
         return saveUpdateAndWaitForActivation(new 
BulkUpdateProducer(List.copyOf(commands)));
     }
 
+    /**
+     * Trim all catalog versions up to the given catalog (exclusively).
+     *
+     * @param catalog Earliest observable catalog.
+     * @return Operation future, which is completing with {@code true} if a 
new snapshot has been successfully written, {@code false}
+     *         otherwise if a snapshot with the same or greater version 
already exists.
+     */
+    public CompletableFuture<Boolean> compactCatalog(Catalog catalog) {

Review Comment:
   it's better not to accept catalog object from outside, otherwise we may end 
up in broken cluster. Let's pass new minimal catalog version here



##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -198,115 +225,127 @@ public CompletableFuture<Boolean> 
onLowWatermarkChanged(HybridTimestamp newLowWa
     }
 
     @TestOnly
-    CompletableFuture<Boolean> lastRunFuture() {
+    synchronized CompletableFuture<Void> lastRunFuture() {
         return lastRunFuture;
     }
 
     /** Starts the catalog compaction routine. */
-    CompletableFuture<Boolean> triggerCompaction(@Nullable HybridTimestamp 
lwm) {
+    void triggerCompaction(@Nullable HybridTimestamp lwm) {
         if (lwm == null || 
!localNodeName.equals(compactionCoordinatorNodeName)) {
-            return CompletableFutures.falseCompletedFuture();
+            return;
         }
 
-        return inBusyLock(busyLock, () -> {
-            CompletableFuture<Boolean> fut = lastRunFuture;
+        inBusyLock(busyLock, () -> {
+            synchronized (this) {
+                CompletableFuture<Void> fut = lastRunFuture;
 
-            if (!fut.isDone()) {
-                LOG.info("Catalog compaction is already in progress, skipping 
(timestamp={})", lwm.longValue());
+                if (!fut.isDone()) {
+                    LOG.info("Catalog compaction is already in progress, 
skipping (timestamp={})", lwm.longValue());

Review Comment:
   looks like `return` statement is missing



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/ActiveTxMinimumStartTimeProvider.java:
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Provides the minimum starting time among all locally started active RW 
transactions.

Review Comment:
   the same for javadoc of the method



##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogManagerCompactionHelper.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.compaction;
+
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.CatalogManagerImpl;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Class contains utility methods for interacting with the catalog manager.
+ * These methods are only needed for catalog compaction routine.
+ */
+class CatalogManagerCompactionHelper {

Review Comment:
   nitpicking: it's, probably, better to use `Facade` suffix rather than 
`Helper`. Helper is usually utility class/set of utility methods



##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -198,115 +225,127 @@ public CompletableFuture<Boolean> 
onLowWatermarkChanged(HybridTimestamp newLowWa
     }
 
     @TestOnly
-    CompletableFuture<Boolean> lastRunFuture() {
+    synchronized CompletableFuture<Void> lastRunFuture() {
         return lastRunFuture;
     }
 
     /** Starts the catalog compaction routine. */
-    CompletableFuture<Boolean> triggerCompaction(@Nullable HybridTimestamp 
lwm) {
+    void triggerCompaction(@Nullable HybridTimestamp lwm) {
         if (lwm == null || 
!localNodeName.equals(compactionCoordinatorNodeName)) {
-            return CompletableFutures.falseCompletedFuture();
+            return;
         }
 
-        return inBusyLock(busyLock, () -> {
-            CompletableFuture<Boolean> fut = lastRunFuture;
+        inBusyLock(busyLock, () -> {
+            synchronized (this) {
+                CompletableFuture<Void> fut = lastRunFuture;
 
-            if (!fut.isDone()) {
-                LOG.info("Catalog compaction is already in progress, skipping 
(timestamp={})", lwm.longValue());
+                if (!fut.isDone()) {
+                    LOG.info("Catalog compaction is already in progress, 
skipping (timestamp={})", lwm.longValue());
+                }
 
-                return CompletableFutures.falseCompletedFuture();
+                lastRunFuture = 
startCompaction(logicalTopologyService.localLogicalTopology());
             }
-
-            fut = 
startCompaction(logicalTopologyService.localLogicalTopology())
-                    .whenComplete((res, ex) -> {
-                        if (ex != null) {
-                            LOG.warn("Catalog compaction has failed 
(timestamp={})", ex, lwm.longValue());
-                        } else if (LOG.isDebugEnabled()) {
-                            if (res) {
-                                LOG.debug("Catalog compaction completed 
successfully (timestamp={})", lwm.longValue());
-                            } else {
-                                LOG.debug("Catalog compaction skipped 
(timestamp={})", lwm.longValue());
-                            }
-                        }
-                    });
-
-            lastRunFuture = fut;
-
-            return fut;
         });
     }
 
-    private CompletableFuture<Boolean> startCompaction(LogicalTopologySnapshot 
topologySnapshot) {
+    private CompletableFuture<Void> startCompaction(LogicalTopologySnapshot 
topologySnapshot) {
         long localMinimum = localMinTimeProvider.time();
 
-        if (catalogByTsNullable(localMinimum) == null) {
-            return CompletableFutures.falseCompletedFuture();
+        if (catalogManagerHelper.catalogByTsNullable(localMinimum) == null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Catalog compaction skipped, nothing to compact 
(ts={})", localMinimum);
+            }
+
+            return CompletableFutures.nullCompletedFuture();
         }
 
         return determineGlobalMinimumRequiredTime(topologySnapshot.nodes(), 
localMinimum)
-                .thenComposeAsync(ts -> {
-                    Catalog catalog = catalogByTsNullable(ts);
+                .thenComposeAsync(timeHolder -> {
+                    long minRequiredTime = timeHolder.minRequiredTime;
+                    HybridTimestamp minActiveTxStartTime = 
timeHolder.minActiveTxStartTime;
+                    Catalog catalog = 
catalogManagerHelper.catalogByTsNullable(minRequiredTime);
+
+                    CompletableFuture<Boolean> catalogCompactionFut;
 
                     if (catalog == null) {
-                        return CompletableFutures.falseCompletedFuture();
-                    }
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Catalog compaction skipped, nothing to 
compact (ts={})", minRequiredTime);
+                        }
 
-                    return requiredNodes(catalog)
-                            .thenCompose(requiredNodes -> {
-                                List<String> missingNodes = 
missingNodes(requiredNodes, topologySnapshot.nodes());
+                        catalogCompactionFut = 
CompletableFutures.falseCompletedFuture();
+                    } else {
+                        catalogCompactionFut = tryCompactCatalog(catalog, 
topologySnapshot).whenComplete((res, ex) -> {
+                            if (ex != null) {
+                                LOG.warn("Catalog compaction has failed 
(timestamp={})", ex, minRequiredTime);
+                            } else {
+                                if (res && LOG.isDebugEnabled()) {
+                                    LOG.debug("Catalog compaction completed 
successfully (timestamp={})", minRequiredTime);
+                                } else {
+                                    LOG.debug("Catalog compaction skipped 
(timestamp={})", minRequiredTime);
+                                }
+                            }
+                        });
+                    }
 
-                                if (!missingNodes.isEmpty()) {
-                                    if (LOG.isDebugEnabled()) {
-                                        LOG.debug("Catalog compaction aborted 
due to missing cluster members (nodes={})", missingNodes);
-                                    }
+                    if (minActiveTxStartTime == null) {

Review Comment:
   I think, we always need to propagate some time. Given that initially 
partition time is set to min possible clock value, we still must be able to do 
compaction, even if there is no transactional load. The safest possible option 
here is, if current node doesn't have active transactions, send current value 
of low watermark acquired just _before_ collecting time of active transactions



##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -376,10 +414,101 @@ private static List<String> missingNodes(Set<String> 
requiredNodes, Collection<L
         return 
requiredNodes.stream().filter(not(logicalNodeIds::contains)).collect(Collectors.toList());
     }
 
+    CompletableFuture<Void> propagateTimeToReplicas(HybridTimestamp 
minimumRequiredTime) {
+        Map<Integer, Integer> tablesWithPartitions = 
catalogManagerHelper.collectTablesWithPartitionsBetween(
+                minimumRequiredTime.longValue(),
+                clockService.nowLong()
+        );
+
+        return invokeOnReplicas(
+                tablesWithPartitions.entrySet().iterator(),
+                minimumRequiredTime.longValue(),
+                clockService.now()
+        );
+    }
+
+    private CompletableFuture<Void> invokeOnReplicas(
+            Iterator<Map.Entry<Integer, Integer>> tabItr,
+            long txBeginTime,
+            HybridTimestamp nowTs
+    ) {
+        if (!tabItr.hasNext()) {
+            return CompletableFutures.nullCompletedFuture();
+        }
+
+        Entry<Integer, Integer> tableWithPartsCount = tabItr.next();
+        int parts = tableWithPartsCount.getValue();
+
+        List<CompletableFuture<?>> partitionFutures = new ArrayList<>();
+
+        for (int p = 0; p < parts; p++) {
+            TablePartitionId replicationGroupId = new 
TablePartitionId(tableWithPartsCount.getKey(), p);
+
+            CompletableFuture<Object> fut = 
placementDriver.getAssignments(replicationGroupId, nowTs)
+                    .thenCompose(tokenizedAssignments -> {
+                        if (tokenizedAssignments == null) {
+                            
throwAssignmentsNotReadyException(replicationGroupId);
+                        }
+
+                        Assignment assignment = 
tokenizedAssignments.nodes().iterator().next();
+
+                        TablePartitionIdMessage partIdMessage = 
ReplicaMessageUtils.toTablePartitionIdMessage(
+                                REPLICA_MESSAGES_FACTORY,
+                                replicationGroupId
+                        );
+
+                        UpdateMinimumActiveTxStartTimeReplicaRequest msg = 
REPLICATION_MESSAGES_FACTORY
+                                .updateMinimumActiveTxStartTimeReplicaRequest()
+                                .groupId(partIdMessage)
+                                .timestamp(txBeginTime)
+                                .build();
+
+                        return 
replicaService.invoke(assignment.consistentId(), msg);
+                    });
+
+            partitionFutures.add(fut);
+        }
+
+        return CompletableFutures.allOf(partitionFutures)
+                .thenCompose(ignore -> invokeOnReplicas(tabItr, txBeginTime, 
nowTs));
+    }
+
+    private CompletableFuture<Boolean> tryCompactCatalog(Catalog catalog, 
LogicalTopologySnapshot topologySnapshot) {
+        return requiredNodes(catalog)
+                .thenCompose(requiredNodes -> {
+                    List<String> missingNodes = missingNodes(requiredNodes, 
topologySnapshot.nodes());
+
+                    if (!missingNodes.isEmpty()) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Catalog compaction aborted due to 
missing cluster members (nodes={})", missingNodes);
+                        }

Review Comment:
   first, you don't need to wrap such methods in `if` statement. Second, 
compaction seems to be really infrequent process, does it make sense to log 
main stages of compaction with INFO severity?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to