sanpwc commented on code in PR #4137:
URL: https://github.com/apache/ignite-3/pull/4137#discussion_r1705617351


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -127,6 +128,12 @@ public class PartitionListener implements RaftGroupListener, BeforeApplyHandler
 
     private final IndexMetaStorage indexMetaStorage;
 
+    /**
+     * Timestamp with minimum starting time among all pending RW transactions in the cluster.

Review Comment:
   Minor. You are not consistent with definitions. Sometimes you call the transactions pending and sometimes active; sometimes you call the tx start time "starting time" and sometimes "begin time".
   E.g.
   /**
    * Command to store the minimum **starting time** among all **active** RW transactions
    * into transient state of each replication group.
    */
   @Transferable(UPDATE_MINIMUM_ACTIVE_TX_TIME_COMMAND)
   public interface UpdateMinimumActiveTx**BeginTime**Command extends WriteCommand {
   
   Up to you whether to fix it or not.



##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -376,10 +406,110 @@ private static List<String> missingNodes(Set<String> requiredNodes, Collection<L
         return requiredNodes.stream().filter(not(logicalNodeIds::contains)).collect(Collectors.toList());
     }
 
+    CompletableFuture<Void> propagateTimeToReplicas(long minimumRequiredTime) {
+        Map<Integer, Integer> tablesWithPartitions = catalogManagerFacade.collectTablesWithPartitionsBetween(
+                minimumRequiredTime,
+                clockService.nowLong()
+        );
+
+        return invokeOnReplicas(
+                tablesWithPartitions.entrySet().iterator(),
+                minimumRequiredTime,
+                clockService.now()
+        );
+    }
+
+    private CompletableFuture<Void> invokeOnReplicas(
+            Iterator<Map.Entry<Integer, Integer>> tabItr,
+            long txBeginTime,
+            HybridTimestamp nowTs
+    ) {
+        if (!tabItr.hasNext()) {
+            return CompletableFutures.nullCompletedFuture();
+        }
+
+        Entry<Integer, Integer> tableWithPartsCount = tabItr.next();
+        int parts = tableWithPartsCount.getValue();
+
+        List<CompletableFuture<?>> partitionFutures = new ArrayList<>();
+
+        for (int p = 0; p < parts; p++) {
+            TablePartitionId replicationGroupId = new TablePartitionId(tableWithPartsCount.getKey(), p);
+
+            CompletableFuture<Object> fut = placementDriver.getAssignments(replicationGroupId, nowTs)
+                    .thenCompose(tokenizedAssignments -> {
+                        if (tokenizedAssignments == null) {
+                            throwAssignmentsNotReadyException(replicationGroupId);
+                        }
+
+                        Assignment assignment = tokenizedAssignments.nodes().iterator().next();
+
+                        TablePartitionIdMessage partIdMessage = ReplicaMessageUtils.toTablePartitionIdMessage(
+                                REPLICA_MESSAGES_FACTORY,
+                                replicationGroupId
+                        );
+
+                        UpdateMinimumActiveTxBeginTimeReplicaRequest msg = REPLICATION_MESSAGES_FACTORY
+                                .updateMinimumActiveTxBeginTimeReplicaRequest()
+                                .groupId(partIdMessage)
+                                .timestamp(txBeginTime)
+                                .build();
+
+                        return replicaService.invoke(assignment.consistentId(), msg);

Review Comment:
   What if there's no such node in the topology?
   Moreover, what if it never appears?
   I mean the following scenario:
   zone.scaleDownAutoAdjust = Long.MAX_VALUE //INF.
   p1.assignments = [A,B,C]
   LogicalTopology = [B,C]



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -4136,6 +4139,23 @@ private CompletableFuture<?> processVacuumTxStateReplicaRequest(VacuumTxStateRep
         return raftClient.run(cmd);
     }
 
+    private CompletableFuture<?> processMinimumActiveTxTimeReplicaRequest(UpdateMinimumActiveTxBeginTimeReplicaRequest request) {
+        Command cmd = PARTITION_REPLICATION_MESSAGES_FACTORY.updateMinimumActiveTxBeginTimeCommand()
+                .timestamp(request.timestamp())
+                .build();
+
+        CompletableFuture<Object> resultFuture = new CompletableFuture<>();
+
+        // The timestamp must increase monotonically, otherwise it will have to be

Review Comment:
   In that case you will need to extend SafeTimePropagatingCommand instead of a plain WriteCommand and, of course, specify the safe time.



##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -376,10 +406,110 @@ private static List<String> missingNodes(Set<String> requiredNodes, Collection<L
         return requiredNodes.stream().filter(not(logicalNodeIds::contains)).collect(Collectors.toList());
     }
 
+    CompletableFuture<Void> propagateTimeToReplicas(long minimumRequiredTime) {
+        Map<Integer, Integer> tablesWithPartitions = catalogManagerFacade.collectTablesWithPartitionsBetween(

Review Comment:
   Could you please elaborate on what "partitions between (minimumRequiredTime, now())" means?
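One plausible reading of "between", inferred from the integration test's comment that replicas of dropped tables must also be updated: take every table that exists in at least one catalog version active at some point in `[minimumRequiredTime, now]`, i.e. the version active at `minimumRequiredTime` plus every later version up to `now`. The sketch models only that assumed reading; `CatalogSnapshot` and `TablesBetween` are hypothetical stand-ins, and whether `collectTablesWithPartitionsBetween` really behaves this way is exactly what the question asks the author to confirm.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified catalog version: activation time plus tableId -> partition count. */
final class CatalogSnapshot {
    final long activationTime;
    final Map<Integer, Integer> tablePartitions;

    CatalogSnapshot(long activationTime, Map<Integer, Integer> tablePartitions) {
        this.activationTime = activationTime;
        this.tablePartitions = tablePartitions;
    }
}

/** Hypothetical model of "tables with partitions between two timestamps". */
final class TablesBetween {
    private TablesBetween() {
    }

    /**
     * Collects every table present in at least one catalog version active during [from, to].
     * {@code versions} must be sorted by activation time.
     */
    static Map<Integer, Integer> collect(List<CatalogSnapshot> versions, long from, long to) {
        // Start from the latest version activated at or before 'from' (the one active at that moment).
        int start = 0;
        for (int i = 0; i < versions.size(); i++) {
            if (versions.get(i).activationTime <= from) {
                start = i;
            }
        }

        // Union the tables of that version and of every later version activated no later than 'to',
        // so tables dropped within the interval are still included.
        Map<Integer, Integer> result = new HashMap<>();
        for (int i = start; i < versions.size() && versions.get(i).activationTime <= to; i++) {
            result.putAll(versions.get(i).tablePartitions);
        }
        return result;
    }
}
```

With versions activated at 10 (table 1), 20 (tables 1 and 2) and 30 (both dropped), collecting over [25, 40] returns tables 1 and 2, even though neither exists at 40.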



##########
modules/catalog-compaction/src/integrationTest/java/org/apache/ignite/internal/catalog/compaction/ItCatalogCompactionTest.java:
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.compaction;
+
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.not;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntMap.Entry;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.CatalogManagerImpl;
+import org.apache.ignite.internal.catalog.compaction.CatalogCompactionRunner.TimeHolder;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl.DelegatingStateMachine;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.tx.TransactionOptions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests to verify catalog compaction.
+ */
+class ItCatalogCompactionTest extends ClusterPerClassIntegrationTest {
+    private static final int CLUSTER_SIZE = 3;
+
+    @Override
+    protected int initialNodes() {
+        return CLUSTER_SIZE;
+    }
+
+    @Test
+    void testRaftGroupsUpdate() throws InterruptedException {
+        IgniteImpl ignite = CLUSTER.aliveNode();
+        CatalogManagerImpl catalogManager = ((CatalogManagerImpl) CLUSTER.aliveNode().catalogManager());
+
+        sql(format("create zone if not exists test with partitions=16, replicas={}, storage_profiles='default'",
+                initialNodes()));
+        sql("alter zone test set default");
+
+        // Latest active catalog contains all required tables.
+        {
+            sql("create table a(a int primary key)");
+
+            Catalog minRequiredCatalog = catalogManager.catalog(catalogManager.latestCatalogVersion());
+            assertNotNull(minRequiredCatalog);
+
+            sql("create table b(a int primary key)");
+
+            HybridTimestamp expectedTime = HybridTimestamp.hybridTimestamp(minRequiredCatalog.time());
+
+            CompletableFuture<Void> fut = ignite.catalogCompactionRunner()
+                    .propagateTimeToReplicas(expectedTime.longValue());
+
+            assertThat(fut, willCompleteSuccessfully());
+
+            ensureTimestampStoredInAllReplicas(expectedTime, 2);
+        }
+
+        // Latest active catalog does not contain all required tables.
+        // Replicas of dropped tables must also be updated.
+        long requiredTime = CLUSTER.aliveNode().clockService().nowLong();
+
+        {
+            sql("drop table a");
+            sql("drop table b");
+
+            HybridTimestamp expectedTime = HybridTimestamp.hybridTimestamp(requiredTime);
+
+            CompletableFuture<Void> fut = ignite.catalogCompactionRunner()
+                    .propagateTimeToReplicas(expectedTime.longValue());
+
+            assertThat(fut, willCompleteSuccessfully());
+
+            ensureTimestampStoredInAllReplicas(expectedTime, 2);
+        }
+
+        // Update to lower timestamp should not succeed.
+        {
+            HybridTimestamp expectedTime = HybridTimestamp.hybridTimestamp(requiredTime - 1);
+
+            CompletableFuture<Void> fut = ignite.catalogCompactionRunner()
+                    .propagateTimeToReplicas(expectedTime.longValue());
+
+            assertThat(fut, willCompleteSuccessfully());
+
+            ensureTimestampStoredInAllReplicas(HybridTimestamp.hybridTimestamp(requiredTime), 2);
+        }
+    }
+
+    @Test
+    void testGlobalMinimumTxBeginTime() {
+        IgniteImpl node0 = CLUSTER.node(0);
+        IgniteImpl node1 = CLUSTER.node(1);
+        IgniteImpl node2 = CLUSTER.node(2);
+
+        List<CatalogCompactionRunner> compactors = List.of(
+                node0.catalogCompactionRunner(),
+                node1.catalogCompactionRunner(),
+                node2.catalogCompactionRunner()
+        );
+
+        Collection<ClusterNode> topologyNodes = node0.clusterNodes();
+
+        InternalTransaction tx1 = (InternalTransaction) node0.transactions().begin();
+        InternalTransaction tx2 = (InternalTransaction) node1.transactions().begin();
+        InternalTransaction readonlyTx = (InternalTransaction) node1.transactions().begin(new TransactionOptions().readOnly(true));
+        InternalTransaction tx3 = (InternalTransaction) node2.transactions().begin();
+
+        compactors.forEach(compactor -> {
+            TimeHolder timeHolder = await(compactor.determineGlobalMinimumRequiredTime(topologyNodes, 0L));
+            assertThat(timeHolder.minActiveTxBeginTime, is(tx1.startTimestamp().longValue()));
+        });
+
+        tx1.rollback();
+
+        compactors.forEach(compactor -> {
+            TimeHolder timeHolder = await(compactor.determineGlobalMinimumRequiredTime(topologyNodes, 0L));
+            assertThat(timeHolder.minActiveTxBeginTime, is(tx2.startTimestamp().longValue()));
+        });
+
+        tx2.commit();
+
+        compactors.forEach(compactor -> {
+            TimeHolder timeHolder = await(compactor.determineGlobalMinimumRequiredTime(topologyNodes, 0L));
+            assertThat(timeHolder.minActiveTxBeginTime, is(tx3.startTimestamp().longValue()));
+        });
+
+        tx3.rollback();
+
+        // Since there are no active RW transactions in the cluster, the minimum time will be min(now()) across all nodes.
+        compactors.forEach(compactor -> {
+            long minTime = Stream.of(node0, node1, node2).map(node -> node.clockService().nowLong()).min(Long::compareTo).orElseThrow();
+
+            TimeHolder timeHolder = await(compactor.determineGlobalMinimumRequiredTime(topologyNodes, 0L));
+
+            long maxTime = Stream.of(node0, node1, node2).map(node -> node.clockService().nowLong()).min(Long::compareTo).orElseThrow();
+
+            // Read-only transactions are not counted,
+            assertThat(timeHolder.minActiveTxBeginTime, greaterThan(readonlyTx.startTimestamp().longValue()));
+
+            assertThat(timeHolder.minActiveTxBeginTime, greaterThanOrEqualTo(minTime));
+            assertThat(timeHolder.minActiveTxBeginTime, lessThanOrEqualTo(maxTime));
+        });
+
+        readonlyTx.rollback();
+    }
+
+    private static void ensureTimestampStoredInAllReplicas(HybridTimestamp expTime, int expTablesCount) throws InterruptedException {
+        Int2IntMap tablesWithPartitions = catalogManagerFacade().collectTablesWithPartitionsBetween(

Review Comment:
   In general, it's not a good idea to test code that uses method X by means of method X itself.
   Here, `propagateTimeToReplicas` uses `collectTablesWithPartitionsBetween`, and so does this test when computing the expected state.
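A minimal sketch of deriving the expectation independently: the test itself knows which tables it created (their IDs could, for example, be read from the catalog right after each `create table`) and that the zone was created with `partitions=16`, so the expected replication groups can be built from those facts instead of calling `collectTablesWithPartitionsBetween`. `ExpectedGroups` is hypothetical, and the `tableId_part_N` strings stand in for `TablePartitionId` values.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical test helper; "tableId_part_N" strings stand in for TablePartitionId values. */
final class ExpectedGroups {
    private ExpectedGroups() {
    }

    /** Builds expected replication group IDs from facts the test controls: created tables and zone partition count. */
    static List<String> expectedGroupIds(List<Integer> tableIds, int partitions) {
        List<String> ids = new ArrayList<>();
        for (int tableId : tableIds) {
            for (int p = 0; p < partitions; p++) {
                ids.add(tableId + "_part_" + p);
            }
        }
        return ids;
    }
}
```

The test would then check the stored timestamp in exactly these groups, so a bug in `collectTablesWithPartitionsBetween` cannot hide itself.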



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -666,6 +683,16 @@ private void handleVacuumTxStatesCommand(VacuumTxStatesCommand cmd, long command
         txStateStorage.removeAll(cmd.txIds(), commandIndex, commandTerm);
     }
 
+    private void handleUpdateMinimalActiveTxTimeCommand(UpdateMinimumActiveTxBeginTimeCommand cmd, long commandIndex, long commandTerm) {
+        // Skips the write command because the storage has already executed it.
+        if (commandIndex <= storage.lastAppliedIndex()) {
+            return;
+        }
+
+        long minActiveTxStartTime0 = minActiveTxStartTime;
+        minActiveTxStartTime = Math.max(cmd.timestamp(), minActiveTxStartTime0);

Review Comment:
   Please add a javadoc explaining why you have `max` here.
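A sketch of what such a javadoc could say, based on the integration test's expectation that an update to a lower timestamp does not take effect. The stated rationale (a delayed or replayed command must not move the bound back) is an assumption for the author to confirm or correct; `MinActiveTxTimeHolder` is a hypothetical stand-in for the listener's transient field.

```java
/** Hypothetical stand-in for the transient state kept by the partition listener. */
final class MinActiveTxTimeHolder {
    /** Minimum begin time among active RW transactions, as last propagated to this group. Not persisted. */
    private volatile long minActiveTxBeginTime;

    /**
     * Applies an update command.
     *
     * <p>{@code Math.max} keeps the value monotonically non-decreasing. Assumed rationale: a command
     * carrying an older timestamp (for example, a delayed request sent before a newer one was applied)
     * must not move the bound back, because the newer value may already have been relied upon.
     */
    void apply(long reportedTime) {
        // Assumes a single applier thread (as with a Raft state machine), so this read-modify-write does not race.
        minActiveTxBeginTime = Math.max(minActiveTxBeginTime, reportedTime);
    }

    long get() {
        return minActiveTxBeginTime;
    }
}
```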



##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -376,10 +406,110 @@ private static List<String> missingNodes(Set<String> requiredNodes, Collection<L
         return requiredNodes.stream().filter(not(logicalNodeIds::contains)).collect(Collectors.toList());
     }
 
+    CompletableFuture<Void> propagateTimeToReplicas(long minimumRequiredTime) {
+        Map<Integer, Integer> tablesWithPartitions = catalogManagerFacade.collectTablesWithPartitionsBetween(
+                minimumRequiredTime,
+                clockService.nowLong()
+        );
+
+        return invokeOnReplicas(
+                tablesWithPartitions.entrySet().iterator(),
+                minimumRequiredTime,
+                clockService.now()
+        );
+    }
+
+    private CompletableFuture<Void> invokeOnReplicas(

Review Comment:
   This seems like an inefficient solution. Given dozens of (tables × partitions) replication groups, we should consider:
   
   - Message coalescing.
   - Depending on whether you actually need a compaction coordinator, or whether it's possible to trigger the minStartTime update on all nodes, sending requests to the local node only.
   - Adding some randomization when sending requests to different replication groups.
   
   All of the aforementioned issues are a matter of optimization rather than correctness, though rather important ones, and thus may be covered separately. However, we must ensure that the general catalog compaction design is extensible enough to accommodate these optimizations.
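A minimal sketch of the first point, message coalescing: once a target node has been chosen for each replication group, group the partitions by target so that one batched request per node replaces one request per table partition. `RequestCoalescer` and the idea of a request carrying many group IDs are hypothetical; the `UpdateMinimumActiveTxBeginTimeReplicaRequest` in this PR carries a single `groupId`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper that batches replication groups per target node. */
final class RequestCoalescer {
    private RequestCoalescer() {
    }

    /**
     * Groups replication group IDs by the node chosen to receive the request, so that one
     * batched message per node can replace one message per (table, partition) pair.
     */
    static Map<String, List<String>> groupByTarget(Map<String, String> targetByGroup) {
        Map<String, List<String>> batches = new TreeMap<>();
        for (Map.Entry<String, String> e : targetByGroup.entrySet()) {
            batches.computeIfAbsent(e.getValue(), node -> new ArrayList<>()).add(e.getKey());
        }
        return batches;
    }
}
```

With this shape, the number of messages per propagation round is bounded by the number of target nodes rather than by tables × partitions.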



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/ActiveTxMinimumBeginTimeProvider.java:
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Provides the minimum begin time among all active RW transactions started locally.
+ */
+@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
+public interface ActiveTxMinimumBeginTimeProvider {

Review Comment:
   I'd rather add "Local" to the class name, something like `ActiveLocalTxMinimumBeginTimeProvider`.



##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -376,10 +406,110 @@ private static List<String> missingNodes(Set<String> requiredNodes, Collection<L
         return requiredNodes.stream().filter(not(logicalNodeIds::contains)).collect(Collectors.toList());
     }
 
+    CompletableFuture<Void> propagateTimeToReplicas(long minimumRequiredTime) {
+        Map<Integer, Integer> tablesWithPartitions = catalogManagerFacade.collectTablesWithPartitionsBetween(
+                minimumRequiredTime,
+                clockService.nowLong()
+        );
+
+        return invokeOnReplicas(
+                tablesWithPartitions.entrySet().iterator(),
+                minimumRequiredTime,
+                clockService.now()
+        );
+    }
+
+    private CompletableFuture<Void> invokeOnReplicas(
+            Iterator<Map.Entry<Integer, Integer>> tabItr,
+            long txBeginTime,
+            HybridTimestamp nowTs
+    ) {
+        if (!tabItr.hasNext()) {
+            return CompletableFutures.nullCompletedFuture();
+        }
+
+        Entry<Integer, Integer> tableWithPartsCount = tabItr.next();
+        int parts = tableWithPartsCount.getValue();
+
+        List<CompletableFuture<?>> partitionFutures = new ArrayList<>();
+
+        for (int p = 0; p < parts; p++) {
+            TablePartitionId replicationGroupId = new TablePartitionId(tableWithPartsCount.getKey(), p);
+
+            CompletableFuture<Object> fut = placementDriver.getAssignments(replicationGroupId, nowTs)
+                    .thenCompose(tokenizedAssignments -> {
+                        if (tokenizedAssignments == null) {
+                            throwAssignmentsNotReadyException(replicationGroupId);
+                        }
+
+                        Assignment assignment = tokenizedAssignments.nodes().iterator().next();

Review Comment:
   Depending on whether we really need a best-effort single compaction coordinator, I'd prefer local calls to collocated primaries or, if that's not reasonable, choosing the assignment in a round-robin (or randomized) manner (1) combined with an in-topology check (2). (1) is a matter of both optimization and correctness; (2) is a matter of optimization.
   Please check my other comments for more details.
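A minimal sketch of options (1) and (2) combined: filter the assignments by the logical topology, then pick one at random so that repeated runs do not keep targeting the same replica. `RandomReplicaSelector` is hypothetical, node identities are plain strings, and a per-group round-robin counter would be an equally valid alternative to `ThreadLocalRandom`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical selector combining (1) randomized choice and (2) an in-topology check. */
final class RandomReplicaSelector {
    private RandomReplicaSelector() {
    }

    static Optional<String> select(Collection<String> assignments, Set<String> logicalTopology) {
        // (2) Keep only assigned nodes that are currently in the logical topology.
        List<String> present = new ArrayList<>();
        for (String node : assignments) {
            if (logicalTopology.contains(node)) {
                present.add(node);
            }
        }

        if (present.isEmpty()) {
            return Optional.empty();
        }

        // (1) Randomize the choice so successive runs do not always hit the same replica.
        return Optional.of(present.get(ThreadLocalRandom.current().nextInt(present.size())));
    }
}
```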



##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -376,10 +406,110 @@ private static List<String> missingNodes(Set<String> requiredNodes, Collection<L
         return requiredNodes.stream().filter(not(logicalNodeIds::contains)).collect(Collectors.toList());
     }
 
+    CompletableFuture<Void> propagateTimeToReplicas(long minimumRequiredTime) {
+        Map<Integer, Integer> tablesWithPartitions = catalogManagerFacade.collectTablesWithPartitionsBetween(
+                minimumRequiredTime,
+                clockService.nowLong()
+        );
+
+        return invokeOnReplicas(
+                tablesWithPartitions.entrySet().iterator(),
+                minimumRequiredTime,
+                clockService.now()
+        );
+    }
+
+    private CompletableFuture<Void> invokeOnReplicas(
+            Iterator<Map.Entry<Integer, Integer>> tabItr,
+            long txBeginTime,
+            HybridTimestamp nowTs
+    ) {
+        if (!tabItr.hasNext()) {
+            return CompletableFutures.nullCompletedFuture();
+        }
+
+        Entry<Integer, Integer> tableWithPartsCount = tabItr.next();
+        int parts = tableWithPartsCount.getValue();
+
+        List<CompletableFuture<?>> partitionFutures = new ArrayList<>();
+
+        for (int p = 0; p < parts; p++) {
+            TablePartitionId replicationGroupId = new TablePartitionId(tableWithPartsCount.getKey(), p);
+
+            CompletableFuture<Object> fut = placementDriver.getAssignments(replicationGroupId, nowTs)
+                    .thenCompose(tokenizedAssignments -> {
+                        if (tokenizedAssignments == null) {
+                            throwAssignmentsNotReadyException(replicationGroupId);
+                        }
+
+                        Assignment assignment = tokenizedAssignments.nodes().iterator().next();
+
+                        TablePartitionIdMessage partIdMessage = ReplicaMessageUtils.toTablePartitionIdMessage(
+                                REPLICA_MESSAGES_FACTORY,
+                                replicationGroupId
+                        );
+
+                        UpdateMinimumActiveTxBeginTimeReplicaRequest msg = REPLICATION_MESSAGES_FACTORY
+                                .updateMinimumActiveTxBeginTimeReplicaRequest()
+                                .groupId(partIdMessage)
+                                .timestamp(txBeginTime)
+                                .build();
+
+                        return replicaService.invoke(assignment.consistentId(), msg);
+                    });
+
+            partitionFutures.add(fut);
+        }
+
+        return CompletableFutures.allOf(partitionFutures)
+                .thenCompose(ignore -> invokeOnReplicas(tabItr, txBeginTime, nowTs));

Review Comment:
   Maybe I'm missing something.
   
   1. How often are you going to trigger the minBeginTime move?
   2. What if compactionCoordinatorNodeName has changed? What will prevent further recursive calls?
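Regarding question 2, a minimal sketch of one way to bound the recursion: re-check before each step that the local node is still the compaction coordinator, and stop the chain otherwise. `GuardedPropagation` and the `isCoordinator` supplier are hypothetical; how the coordinator is actually tracked, and how often the procedure is scheduled (question 1), are outside this sketch.

```java
import java.util.Iterator;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.function.BooleanSupplier;
import java.util.function.Function;

/** Hypothetical sequential processor that stops as soon as coordinatorship is lost. */
final class GuardedPropagation {
    private GuardedPropagation() {
    }

    /**
     * Processes items one by one, re-checking before each step that this node is still the
     * compaction coordinator; if it is not, the chain fails instead of recursing further.
     */
    static <T> CompletableFuture<Void> processWhileCoordinator(
            Iterator<T> items,
            BooleanSupplier isCoordinator,
            Function<T, CompletableFuture<Void>> step
    ) {
        if (!items.hasNext()) {
            return CompletableFuture.completedFuture(null);
        }

        if (!isCoordinator.getAsBoolean()) {
            return CompletableFuture.failedFuture(new CancellationException("Compaction coordinator has changed"));
        }

        T item = items.next();

        return step.apply(item).thenCompose(ignored -> processWhileCoordinator(items, isCoordinator, step));
    }
}
```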



##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -376,10 +406,110 @@ private static List<String> missingNodes(Set<String> requiredNodes, Collection<L
         return requiredNodes.stream().filter(not(logicalNodeIds::contains)).collect(Collectors.toList());
     }
 
+    CompletableFuture<Void> propagateTimeToReplicas(long minimumRequiredTime) {
+        Map<Integer, Integer> tablesWithPartitions = catalogManagerFacade.collectTablesWithPartitionsBetween(
+                minimumRequiredTime,
+                clockService.nowLong()
+        );
+
+        return invokeOnReplicas(
+                tablesWithPartitions.entrySet().iterator(),
+                minimumRequiredTime,
+                clockService.now()
+        );
+    }
+
+    private CompletableFuture<Void> invokeOnReplicas(
+            Iterator<Map.Entry<Integer, Integer>> tabItr,
+            long txBeginTime,
+            HybridTimestamp nowTs
+    ) {
+        if (!tabItr.hasNext()) {
+            return CompletableFutures.nullCompletedFuture();
+        }
+
+        Entry<Integer, Integer> tableWithPartsCount = tabItr.next();
+        int parts = tableWithPartsCount.getValue();
+
+        List<CompletableFuture<?>> partitionFutures = new ArrayList<>();
+
+        for (int p = 0; p < parts; p++) {
+            TablePartitionId replicationGroupId = new TablePartitionId(tableWithPartsCount.getKey(), p);
+
+            CompletableFuture<Object> fut = placementDriver.getAssignments(replicationGroupId, nowTs)
+                    .thenCompose(tokenizedAssignments -> {
+                        if (tokenizedAssignments == null) {
+                            throwAssignmentsNotReadyException(replicationGroupId);
+                        }
+
+                        Assignment assignment = tokenizedAssignments.nodes().iterator().next();
+
+                        TablePartitionIdMessage partIdMessage = ReplicaMessageUtils.toTablePartitionIdMessage(
+                                REPLICA_MESSAGES_FACTORY,
+                                replicationGroupId
+                        );
+
+                        UpdateMinimumActiveTxBeginTimeReplicaRequest msg = REPLICATION_MESSAGES_FACTORY
+                                .updateMinimumActiveTxBeginTimeReplicaRequest()
+                                .groupId(partIdMessage)
+                                .timestamp(txBeginTime)
+                                .build();
+
+                        return replicaService.invoke(assignment.consistentId(), msg);
+                    });
+
+            partitionFutures.add(fut);
+        }
+
+        return CompletableFutures.allOf(partitionFutures)

Review Comment:
   Please add a comment explaining why it's safe to silently ignore all types of exceptions here and why the recursive call is considered a proper failover (btw, it's not).
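A minimal sketch of an alternative to swallowing errors: wait for every partition future, then report which replication groups failed, so the caller can log them and decide whether to retry on the next compaction run. `PartitionResults` is hypothetical and uses `tableId_part_N` strings as group keys.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical aggregator that turns per-partition failures into an explicit result. */
final class PartitionResults {
    private PartitionResults() {
    }

    /** Completes with the groups whose futures failed, after all futures have finished. */
    static CompletableFuture<List<String>> failedGroups(Map<String, CompletableFuture<?>> futuresByGroup) {
        List<String> groups = new ArrayList<>(futuresByGroup.keySet());
        CompletableFuture<?>[] outcomes = new CompletableFuture<?>[groups.size()];

        for (int i = 0; i < groups.size(); i++) {
            // handle() maps every outcome to a normal completion (the error, or null on success),
            // so allOf waits for all partitions instead of short-circuiting on the first failure.
            outcomes[i] = futuresByGroup.get(groups.get(i)).handle((result, error) -> error);
        }

        return CompletableFuture.allOf(outcomes).thenApply(ignored -> {
            List<String> failed = new ArrayList<>();
            for (int i = 0; i < groups.size(); i++) {
                if (outcomes[i].join() != null) {
                    failed.add(groups.get(i));
                }
            }
            return failed;
        });
    }
}
```

A failed group then becomes an explicit outcome the caller has to handle, rather than a silently skipped one.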



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/ActiveTxMinimumBeginTimeProvider.java:
##########
@@ -0,0 +1,30 @@
+
+package org.apache.ignite.internal.tx;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Provides the minimum begin time among all active RW transactions started locally.
+ */
+@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
+public interface ActiveTxMinimumBeginTimeProvider {
+    /** Returns the minimum begin time among all active RW transactions started locally. */

Review Comment:
   ... or null if there are no pending transactions, right? 
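A sketch of the suggested contract, also applying the renaming proposed in the earlier comment. `Long` stands in for `HybridTimestamp` to keep the example self-contained, and `InMemoryTxRegistry` is a hypothetical implementation used only to show the `null` case.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Simplified contract; {@code Long} stands in for {@code HybridTimestamp}. */
interface ActiveLocalTxMinimumBeginTimeProvider {
    /**
     * Returns the minimum begin time among all active RW transactions started locally,
     * or {@code null} if there are no such transactions.
     */
    Long minimumBeginTime();
}

/** Hypothetical in-memory implementation used only to illustrate the {@code null} case. */
final class InMemoryTxRegistry implements ActiveLocalTxMinimumBeginTimeProvider {
    private final ConcurrentMap<UUID, Long> beginTimes = new ConcurrentHashMap<>();

    void begin(UUID txId, long beginTime) {
        beginTimes.put(txId, beginTime);
    }

    void finish(UUID txId) {
        beginTimes.remove(txId);
    }

    @Override
    public Long minimumBeginTime() {
        // An empty map yields an empty Optional, which is mapped to null.
        return beginTimes.values().stream().min(Long::compare).orElse(null);
    }
}
```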



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/ActiveTxMinimumBeginTimeProvider.java:
##########
@@ -0,0 +1,30 @@
+
+package org.apache.ignite.internal.tx;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Provides the minimum begin time among all active RW transactions started locally.
+ */
+@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
+public interface ActiveTxMinimumBeginTimeProvider {
+    /** Returns the minimum begin time among all active RW transactions started locally. */
+    @Nullable HybridTimestamp minimumBeginTime();

Review Comment:
   BTW, I'm curious: what time are you going to use as the global minimum when there are zero pending transactions cluster-wide?
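The integration test in this PR suggests an answer: with no active RW transactions, the minimum is expected to be `min(now())` across all nodes. A minimal sketch of that rule, assuming each node reports its local minimum begin time or, if it has no active RW transactions, its current clock time, and the coordinator takes the minimum of the reports. `GlobalMinimum` and its methods are hypothetical.

```java
import java.util.List;

/** Hypothetical model of how a global minimum could be derived from per-node reports. */
final class GlobalMinimum {
    private GlobalMinimum() {
    }

    /** Value one node reports: its local minimum RW tx begin time, or its clock time if it has none. */
    static long localReport(Long localMinBeginTime, long localNow) {
        return localMinBeginTime != null ? localMinBeginTime : localNow;
    }

    /** Global minimum computed by the coordinator over all node reports. */
    static long globalMinimum(List<Long> reports) {
        return reports.stream().mapToLong(Long::longValue).min().orElseThrow();
    }
}
```

Under this rule an idle cluster yields the smallest node clock, and a single active transaction anywhere pulls the result down to its begin time.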



##########
modules/catalog-compaction/src/integrationTest/java/org/apache/ignite/internal/catalog/compaction/ItCatalogCompactionTest.java:
##########
@@ -0,0 +1,226 @@
+
+package org.apache.ignite.internal.catalog.compaction;
+
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.not;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntMap.Entry;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.CatalogManagerImpl;
+import org.apache.ignite.internal.catalog.compaction.CatalogCompactionRunner.TimeHolder;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl.DelegatingStateMachine;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.tx.TransactionOptions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests to verify catalog compaction.
+ */
+class ItCatalogCompactionTest extends ClusterPerClassIntegrationTest {
+    private static final int CLUSTER_SIZE = 3;
+
+    @Override
+    protected int initialNodes() {
+        return CLUSTER_SIZE;
+    }
+
+    @Test
+    void testRaftGroupsUpdate() throws InterruptedException {
+        IgniteImpl ignite = CLUSTER.aliveNode();
+        CatalogManagerImpl catalogManager = ((CatalogManagerImpl) CLUSTER.aliveNode().catalogManager());
+
+        sql(format("create zone if not exists test with partitions=16, replicas={}, storage_profiles='default'",
+                initialNodes()));
+        sql("alter zone test set default");
+
+        // Latest active catalog contains all required tables.
+        {
+            sql("create table a(a int primary key)");
+
+            Catalog minRequiredCatalog = catalogManager.catalog(catalogManager.latestCatalogVersion());
+            assertNotNull(minRequiredCatalog);
+
+            sql("create table b(a int primary key)");
+
+            HybridTimestamp expectedTime = HybridTimestamp.hybridTimestamp(minRequiredCatalog.time());
+
+            CompletableFuture<Void> fut = ignite.catalogCompactionRunner()
+                    .propagateTimeToReplicas(expectedTime.longValue());
+
+            assertThat(fut, willCompleteSuccessfully());
+
+            ensureTimestampStoredInAllReplicas(expectedTime, 2);
+        }
+
+        // Latest active catalog does not contain all required tables.
+        // Replicas of dropped tables must also be updated.
+        long requiredTime = CLUSTER.aliveNode().clockService().nowLong();
+
+        {
+            sql("drop table a");
+            sql("drop table b");
+
+            HybridTimestamp expectedTime = HybridTimestamp.hybridTimestamp(requiredTime);
+
+            CompletableFuture<Void> fut = ignite.catalogCompactionRunner()
+                    .propagateTimeToReplicas(expectedTime.longValue());
+
+            assertThat(fut, willCompleteSuccessfully());
+
+            ensureTimestampStoredInAllReplicas(expectedTime, 2);
+        }
+
+        // Update to lower timestamp should not succeed.
+        {
+            HybridTimestamp expectedTime = HybridTimestamp.hybridTimestamp(requiredTime - 1);
+
+            CompletableFuture<Void> fut = ignite.catalogCompactionRunner()
+                    .propagateTimeToReplicas(expectedTime.longValue());
+
+            assertThat(fut, willCompleteSuccessfully());
+
+            ensureTimestampStoredInAllReplicas(HybridTimestamp.hybridTimestamp(requiredTime), 2);
+        }

Review Comment:
   1. The permanently dead assignment case is not covered: the one where the assignment returned by `Assignment assignment = tokenizedAssignments.nodes().iterator().next();` refers to a node that is not present in the topology and never will be.
   2. Table changes made during the recursive `invokeOnReplicas` calls are not covered.
   3. Failover cases are not covered, e.g. an assignment that is temporarily dead.
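For case 1, a test has to make sure that a permanently unreachable replica shows up as a failure rather than as a hang. A minimal sketch under stated assumptions: the call to the absent node is modeled by a hypothetical `callReplicaOnAbsentNode()` that returns a future nothing ever completes, and the wait is bounded with the standard `CompletableFuture.orTimeout` (Java 9+). Neither helper is part of this PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class DeadAssignmentSketch {
    // Hypothetical stand-in for a replica call whose target node is not in the
    // topology and never will be: nothing ever completes this future.
    static CompletableFuture<Void> callReplicaOnAbsentNode() {
        return new CompletableFuture<>();
    }

    // Bounds the wait so that a dead assignment surfaces as a TimeoutException
    // instead of blocking the caller forever.
    static CompletableFuture<Void> withDeadline(CompletableFuture<Void> fut, long millis) {
        return fut.orTimeout(millis, TimeUnit.MILLISECONDS);
    }

    // Returns true if the future completed exceptionally because the deadline expired.
    static boolean timedOut(CompletableFuture<Void> fut) {
        try {
            fut.join();
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof TimeoutException;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> fut = withDeadline(callReplicaOnAbsentNode(), 100);
        System.out.println(timedOut(fut) ? "timed out" : "completed"); // prints "timed out"
    }
}
```

Whether `propagateTimeToReplicas` itself should give up on such an assignment, or only the test should bound its wait, is a design decision for the PR; the sketch only shows that the bounded future completes exceptionally with a `TimeoutException` instead of hanging.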



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -4136,6 +4139,23 @@ private CompletableFuture<?> processVacuumTxStateReplicaRequest(VacuumTxStateRep
         return raftClient.run(cmd);
     }
 
+    private CompletableFuture<?> processMinimumActiveTxTimeReplicaRequest(UpdateMinimumActiveTxBeginTimeReplicaRequest request) {
+        Command cmd = PARTITION_REPLICATION_MESSAGES_FACTORY.updateMinimumActiveTxBeginTimeCommand()
+                .timestamp(request.timestamp())
+                .build();
+
+        CompletableFuture<Object> resultFuture = new CompletableFuture<>();
+
+        // The timestamp must increase monotonically, otherwise it will have to be
+        // stored on disk so that reordering does not occur after the node is restarted.
+        applyCmdWithRetryOnSafeTimeReorderException(
+                cmd,
+                resultFuture
+        );
+
+        return resultFuture.thenApply(res -> null);

Review Comment:
   `.thenApply(res -> null);` seems unnecessary here: `resultFuture` is already a `CompletableFuture<?>` and could be returned directly.
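A small standalone sketch of the point: `CompletableFuture<Object>` is assignable to the method's return type `CompletableFuture<?>`, so `thenApply(res -> null)` only adds a stage that replaces the command result with `null`. Assuming no caller relies on that result being `null` (the callers are not visible in this diff), returning `resultFuture` directly should be equivalent. `withMapping` and `direct` are hypothetical names used only for the comparison.

```java
import java.util.concurrent.CompletableFuture;

class ThenApplySketch {
    // Shape of the method under review: an extra stage that discards the result.
    static CompletableFuture<?> withMapping(CompletableFuture<Object> resultFuture) {
        return resultFuture.thenApply(res -> null);
    }

    // Alternative: a CompletableFuture<Object> is a CompletableFuture<?>, so it can be returned as is.
    static CompletableFuture<?> direct(CompletableFuture<Object> resultFuture) {
        return resultFuture;
    }

    public static void main(String[] args) {
        CompletableFuture<Object> raw = new CompletableFuture<>();
        CompletableFuture<?> mapped = withMapping(raw);
        CompletableFuture<?> same = direct(raw);

        // Both futures complete as soon as the underlying command does.
        raw.complete("command result");

        System.out.println(mapped.join()); // prints "null"
        System.out.println(same.join());   // prints "command result"
    }
}
```

The only observable difference is the value seen by callers; completion timing is the same, because the non-async `thenApply` stage runs in the thread that completes `raw`.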



-- 
This is an automated message from the Apache Git Service.