sanpwc commented on code in PR #4137:
URL: https://github.com/apache/ignite-3/pull/4137#discussion_r1705617351
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -127,6 +128,12 @@ public class PartitionListener implements RaftGroupListener, BeforeApplyHandler
private final IndexMetaStorage indexMetaStorage;
+ /**
+ * Timestamp with minimum starting time among all pending RW transactions in the cluster.
Review Comment:
Minor. You are not consistent with terminology: sometimes you call the
transactions pending and sometimes active, and sometimes you call the tx start
time "starting time" and sometimes "begin time".
E.g.
/**
* Command to store the minimum **starting time** among all **active** RW transactions
* into transient state of each replication group.
*/
@Transferable(UPDATE_MINIMUM_ACTIVE_TX_TIME_COMMAND)
public interface UpdateMinimumActiveTx**BeginTime**Command extends WriteCommand {
Up to you whether to fix it or not.
##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -376,10 +406,110 @@ private static List<String> missingNodes(Set<String> requiredNodes, Collection<L
return requiredNodes.stream().filter(not(logicalNodeIds::contains)).collect(Collectors.toList());
}
+ CompletableFuture<Void> propagateTimeToReplicas(long minimumRequiredTime) {
+ Map<Integer, Integer> tablesWithPartitions = catalogManagerFacade.collectTablesWithPartitionsBetween(
+ minimumRequiredTime,
+ clockService.nowLong()
+ );
+
+ return invokeOnReplicas(
+ tablesWithPartitions.entrySet().iterator(),
+ minimumRequiredTime,
+ clockService.now()
+ );
+ }
+
+ private CompletableFuture<Void> invokeOnReplicas(
+ Iterator<Map.Entry<Integer, Integer>> tabItr,
+ long txBeginTime,
+ HybridTimestamp nowTs
+ ) {
+ if (!tabItr.hasNext()) {
+ return CompletableFutures.nullCompletedFuture();
+ }
+
+ Entry<Integer, Integer> tableWithPartsCount = tabItr.next();
+ int parts = tableWithPartsCount.getValue();
+
+ List<CompletableFuture<?>> partitionFutures = new ArrayList<>();
+
+ for (int p = 0; p < parts; p++) {
+ TablePartitionId replicationGroupId = new TablePartitionId(tableWithPartsCount.getKey(), p);
+
+ CompletableFuture<Object> fut = placementDriver.getAssignments(replicationGroupId, nowTs)
+ .thenCompose(tokenizedAssignments -> {
+ if (tokenizedAssignments == null) {
+ throwAssignmentsNotReadyException(replicationGroupId);
+ }
+
+ Assignment assignment = tokenizedAssignments.nodes().iterator().next();
+
+ TablePartitionIdMessage partIdMessage = ReplicaMessageUtils.toTablePartitionIdMessage(
+ REPLICA_MESSAGES_FACTORY,
+ replicationGroupId
+ );
+
+ UpdateMinimumActiveTxBeginTimeReplicaRequest msg = REPLICATION_MESSAGES_FACTORY
+ .updateMinimumActiveTxBeginTimeReplicaRequest()
+ .groupId(partIdMessage)
+ .timestamp(txBeginTime)
+ .build();
+
+ return replicaService.invoke(assignment.consistentId(), msg);
Review Comment:
What if there's no such node in the topology?
Moreover, what if it never appears?
I mean the following scenario:
zone.scaleDownAutoAdjust = Long.MAX_VALUE //INF.
p1.assignments = [A,B,C]
LogicalTopology = [B,C]
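One way to keep such a node from stalling the whole chain is to bound every per-replica call in time. A timeout alone does not pick another replica; it only turns an indefinite wait into an explicit failure that the caller then has to handle. The sketch assumes the call can be wrapped as a `Function<String, CompletableFuture<Object>>`; `BoundedInvoke` and `invokeWithDeadline` are hypothetical names, and whether `replicaService.invoke` already enforces its own timeout is not visible in this diff.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Hypothetical helper that puts an upper bound on a single replica invocation. */
final class BoundedInvoke {
    private BoundedInvoke() {
    }

    /**
     * Sends the request to the given node via the supplied invoker and fails the returned future
     * with a TimeoutException if no response arrives in time, e.g. because the node is
     * absent from the logical topology and may never come back.
     */
    static CompletableFuture<Object> invokeWithDeadline(
            String consistentId,
            Function<String, CompletableFuture<Object>> invoker,
            long timeoutMillis
    ) {
        // orTimeout (Java 9+) completes the future exceptionally once the deadline passes.
        return invoker.apply(consistentId).orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
    }
}
```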
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -4136,6 +4139,23 @@ private CompletableFuture<?> processVacuumTxStateReplicaRequest(VacuumTxStateRep
return raftClient.run(cmd);
}
+ private CompletableFuture<?> processMinimumActiveTxTimeReplicaRequest(UpdateMinimumActiveTxBeginTimeReplicaRequest request) {
+ Command cmd = PARTITION_REPLICATION_MESSAGES_FACTORY.updateMinimumActiveTxBeginTimeCommand()
+ .timestamp(request.timestamp())
+ .build();
+
+ CompletableFuture<Object> resultFuture = new CompletableFuture<>();
+
+ // The timestamp must increase monotonically, otherwise it will have to be
Review Comment:
In that case, you will need to extend SafeTimePropagatingCommand instead of a
plain WriteCommand and, of course, specify the safe time.
##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -376,10 +406,110 @@ private static List<String> missingNodes(Set<String> requiredNodes, Collection<L
return requiredNodes.stream().filter(not(logicalNodeIds::contains)).collect(Collectors.toList());
}
+ CompletableFuture<Void> propagateTimeToReplicas(long minimumRequiredTime) {
+ Map<Integer, Integer> tablesWithPartitions = catalogManagerFacade.collectTablesWithPartitionsBetween(
Review Comment:
Could you please elaborate on what "partitions between (minimumRequiredTime,
now())" means?
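For the record, one plausible reading, consistent with the test's note that replicas of dropped tables must also be updated: take every catalog version that was active at some point within `[from, to]` and union their tables. This is a hypothetical sketch meant only to pin down the semantics (`CatalogSnapshot` and `tablesWithPartitionsBetween` are illustrative, not the Ignite API):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified catalog version: activation time plus tableId -> partition count. */
final class CatalogSnapshot {
    final long activationTime;
    final Map<Integer, Integer> tablePartitions;

    CatalogSnapshot(long activationTime, Map<Integer, Integer> tablePartitions) {
        this.activationTime = activationTime;
        this.tablePartitions = tablePartitions;
    }
}

final class CatalogRange {
    private CatalogRange() {
    }

    /**
     * Unions the tables of every version whose activity interval
     * [activationTime, nextVersion.activationTime) intersects [from, to].
     * Versions must be sorted by activation time in ascending order.
     */
    static Map<Integer, Integer> tablesWithPartitionsBetween(List<CatalogSnapshot> versions, long from, long to) {
        Map<Integer, Integer> result = new HashMap<>();

        for (int i = 0; i < versions.size(); i++) {
            CatalogSnapshot version = versions.get(i);
            // The latest version stays active indefinitely.
            long activeUntil = i + 1 < versions.size() ? versions.get(i + 1).activationTime : Long.MAX_VALUE;

            if (version.activationTime <= to && activeUntil > from) {
                result.putAll(version.tablePartitions);
            }
        }

        return result;
    }
}
```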
##########
modules/catalog-compaction/src/integrationTest/java/org/apache/ignite/internal/catalog/compaction/ItCatalogCompactionTest.java:
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.compaction;
+
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.not;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntMap.Entry;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.CatalogManagerImpl;
+import org.apache.ignite.internal.catalog.compaction.CatalogCompactionRunner.TimeHolder;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl.DelegatingStateMachine;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.tx.TransactionOptions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests to verify catalog compaction.
+ */
+class ItCatalogCompactionTest extends ClusterPerClassIntegrationTest {
+ private static final int CLUSTER_SIZE = 3;
+
+ @Override
+ protected int initialNodes() {
+ return CLUSTER_SIZE;
+ }
+
+ @Test
+ void testRaftGroupsUpdate() throws InterruptedException {
+ IgniteImpl ignite = CLUSTER.aliveNode();
+ CatalogManagerImpl catalogManager = ((CatalogManagerImpl) CLUSTER.aliveNode().catalogManager());
+
+ sql(format("create zone if not exists test with partitions=16, replicas={}, storage_profiles='default'",
+ initialNodes()));
+ sql("alter zone test set default");
+
+ // Latest active catalog contains all required tables.
+ {
+ sql("create table a(a int primary key)");
+
+ Catalog minRequiredCatalog = catalogManager.catalog(catalogManager.latestCatalogVersion());
+ assertNotNull(minRequiredCatalog);
+
+ sql("create table b(a int primary key)");
+
+ HybridTimestamp expectedTime = HybridTimestamp.hybridTimestamp(minRequiredCatalog.time());
+
+ CompletableFuture<Void> fut = ignite.catalogCompactionRunner()
+ .propagateTimeToReplicas(expectedTime.longValue());
+
+ assertThat(fut, willCompleteSuccessfully());
+
+ ensureTimestampStoredInAllReplicas(expectedTime, 2);
+ }
+
+ // Latest active catalog does not contain all required tables.
+ // Replicas of dropped tables must also be updated.
+ long requiredTime = CLUSTER.aliveNode().clockService().nowLong();
+
+ {
+ sql("drop table a");
+ sql("drop table b");
+
+ HybridTimestamp expectedTime = HybridTimestamp.hybridTimestamp(requiredTime);
+
+ CompletableFuture<Void> fut = ignite.catalogCompactionRunner()
+ .propagateTimeToReplicas(expectedTime.longValue());
+
+ assertThat(fut, willCompleteSuccessfully());
+
+ ensureTimestampStoredInAllReplicas(expectedTime, 2);
+ }
+
+ // Update to lower timestamp should not succeed.
+ {
+ HybridTimestamp expectedTime = HybridTimestamp.hybridTimestamp(requiredTime - 1);
+
+ CompletableFuture<Void> fut = ignite.catalogCompactionRunner()
+ .propagateTimeToReplicas(expectedTime.longValue());
+
+ assertThat(fut, willCompleteSuccessfully());
+
+ ensureTimestampStoredInAllReplicas(HybridTimestamp.hybridTimestamp(requiredTime), 2);
+ }
+ }
+
+ @Test
+ void testGlobalMinimumTxBeginTime() {
+ IgniteImpl node0 = CLUSTER.node(0);
+ IgniteImpl node1 = CLUSTER.node(1);
+ IgniteImpl node2 = CLUSTER.node(2);
+
+ List<CatalogCompactionRunner> compactors = List.of(
+ node0.catalogCompactionRunner(),
+ node1.catalogCompactionRunner(),
+ node2.catalogCompactionRunner()
+ );
+
+ Collection<ClusterNode> topologyNodes = node0.clusterNodes();
+
+ InternalTransaction tx1 = (InternalTransaction) node0.transactions().begin();
+ InternalTransaction tx2 = (InternalTransaction) node1.transactions().begin();
+ InternalTransaction readonlyTx = (InternalTransaction) node1.transactions().begin(new TransactionOptions().readOnly(true));
+ InternalTransaction tx3 = (InternalTransaction) node2.transactions().begin();
+
+ compactors.forEach(compactor -> {
+ TimeHolder timeHolder = await(compactor.determineGlobalMinimumRequiredTime(topologyNodes, 0L));
+ assertThat(timeHolder.minActiveTxBeginTime, is(tx1.startTimestamp().longValue()));
+ });
+
+ tx1.rollback();
+
+ compactors.forEach(compactor -> {
+ TimeHolder timeHolder = await(compactor.determineGlobalMinimumRequiredTime(topologyNodes, 0L));
+ assertThat(timeHolder.minActiveTxBeginTime, is(tx2.startTimestamp().longValue()));
+ });
+
+ tx2.commit();
+
+ compactors.forEach(compactor -> {
+ TimeHolder timeHolder = await(compactor.determineGlobalMinimumRequiredTime(topologyNodes, 0L));
+ assertThat(timeHolder.minActiveTxBeginTime, is(tx3.startTimestamp().longValue()));
+ });
+
+ tx3.rollback();
+
+ // Since there are no active RW transactions in the cluster, the minimum time will be min(now()) across all nodes.
+ compactors.forEach(compactor -> {
+ long minTime = Stream.of(node0, node1, node2).map(node -> node.clockService().nowLong()).min(Long::compareTo).orElseThrow();
+
+ TimeHolder timeHolder = await(compactor.determineGlobalMinimumRequiredTime(topologyNodes, 0L));
+
+ long maxTime = Stream.of(node0, node1, node2).map(node -> node.clockService().nowLong()).min(Long::compareTo).orElseThrow();
+
+ // Read-only transactions are not counted,
+ assertThat(timeHolder.minActiveTxBeginTime, greaterThan(readonlyTx.startTimestamp().longValue()));
+
+ assertThat(timeHolder.minActiveTxBeginTime, greaterThanOrEqualTo(minTime));
+ assertThat(timeHolder.minActiveTxBeginTime, lessThanOrEqualTo(maxTime));
+ });
+
+ readonlyTx.rollback();
+ }
+
+ private static void ensureTimestampStoredInAllReplicas(HybridTimestamp expTime, int expTablesCount) throws InterruptedException {
+ Int2IntMap tablesWithPartitions = catalogManagerFacade().collectTablesWithPartitionsBetween(
Review Comment:
Generally, it's not a good idea to verify code that uses method X by means of
method X itself. Here, `propagateTimeToReplicas` uses
`collectTablesWithPartitionsBetween`, and so does this check.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -666,6 +683,16 @@ private void handleVacuumTxStatesCommand(VacuumTxStatesCommand cmd, long command
txStateStorage.removeAll(cmd.txIds(), commandIndex, commandTerm);
}
+ private void handleUpdateMinimalActiveTxTimeCommand(UpdateMinimumActiveTxBeginTimeCommand cmd, long commandIndex, long commandTerm) {
+ // Skips the write command because the storage has already executed it.
+ if (commandIndex <= storage.lastAppliedIndex()) {
+ return;
+ }
+
+ long minActiveTxStartTime0 = minActiveTxStartTime;
+ minActiveTxStartTime = Math.max(cmd.timestamp(), minActiveTxStartTime0);
Review Comment:
Please add a javadoc explaining why you have `max` here.
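For example, something along these lines, assuming (as the PR's own comments say) that the value must only move forward and that a lower timestamp must be ignored. `MinActiveTxTimeState` is a hypothetical stand-in for the field in `PartitionListener`; the single-writer note reflects the assumption that Raft commands are applied sequentially:

```java
/** Hypothetical stand-in for the transient state kept by the partition listener. */
final class MinActiveTxTimeState {
    /** Written only from the (single) command-applying thread, read from any thread. */
    private volatile long minActiveTxStartTime;

    /**
     * Applies the timestamp carried by an update command.
     *
     * <p>{@code max} is used because the value must never move backwards: a command carrying a
     * lower timestamp (for example, one sent by a stale coordinator or delivered out of order)
     * must not undo a newer update. The value is transient, so this only guarantees
     * monotonicity within the lifetime of the node.
     */
    void apply(long timestamp) {
        minActiveTxStartTime = Math.max(timestamp, minActiveTxStartTime);
    }

    long minActiveTxStartTime() {
        return minActiveTxStartTime;
    }
}
```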
##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -376,10 +406,110 @@ private static List<String> missingNodes(Set<String> requiredNodes, Collection<L
return requiredNodes.stream().filter(not(logicalNodeIds::contains)).collect(Collectors.toList());
}
+ CompletableFuture<Void> propagateTimeToReplicas(long minimumRequiredTime) {
+ Map<Integer, Integer> tablesWithPartitions = catalogManagerFacade.collectTablesWithPartitionsBetween(
+ minimumRequiredTime,
+ clockService.nowLong()
+ );
+
+ return invokeOnReplicas(
+ tablesWithPartitions.entrySet().iterator(),
+ minimumRequiredTime,
+ clockService.now()
+ );
+ }
+
+ private CompletableFuture<Void> invokeOnReplicas(
Review Comment:
This seems inefficient. Given the number of replication groups (dozens of
tables x partitions), we should consider:
- Message coalescing.
- Depending on whether you actually need a compaction coordinator, or whether
it's possible to trigger the minStartTime update on all nodes, sending requests
to the local node only.
- Adding some randomization when sending requests to replication groups.
All of the aforementioned issues are a matter of optimization rather than
correctness, though rather important ones, and thus may be addressed
separately. However, it's required to ensure that the general catalog
compaction design is extensible enough to accommodate these optimizations.
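To illustrate the first point: instead of one request per replication group, requests could be grouped by target node so that each node gets a single message listing all of its partitions. A hypothetical sketch (`Coalescing.groupByNode` and the per-partition target list are assumptions; a real coalesced request would also have to carry table IDs):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class Coalescing {
    private Coalescing() {
    }

    /**
     * Groups partition IDs by the node that should receive the update.
     * targetNodeByPartition.get(p) is the consistent ID of the node chosen for partition p.
     * The result has one entry per node, i.e. one coalesced request per node.
     */
    static Map<String, List<Integer>> groupByNode(List<String> targetNodeByPartition) {
        // TreeMap keeps the node order deterministic.
        Map<String, List<Integer>> partitionsByNode = new TreeMap<>();

        for (int p = 0; p < targetNodeByPartition.size(); p++) {
            partitionsByNode.computeIfAbsent(targetNodeByPartition.get(p), node -> new ArrayList<>()).add(p);
        }

        return partitionsByNode;
    }
}
```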
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/ActiveTxMinimumBeginTimeProvider.java:
##########
@@ -0,0 +1,30 @@
+package org.apache.ignite.internal.tx;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Provides the minimum begin time among all active RW transactions started locally.
+ */
+@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
+public interface ActiveTxMinimumBeginTimeProvider {
Review Comment:
I'd rather add "Local" to the class name, something like
`ActiveLocalTxMinimumBeginTimeProvider`.
##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -376,10 +406,110 @@ private static List<String> missingNodes(Set<String> requiredNodes, Collection<L
return requiredNodes.stream().filter(not(logicalNodeIds::contains)).collect(Collectors.toList());
}
+ CompletableFuture<Void> propagateTimeToReplicas(long minimumRequiredTime) {
+ Map<Integer, Integer> tablesWithPartitions = catalogManagerFacade.collectTablesWithPartitionsBetween(
+ minimumRequiredTime,
+ clockService.nowLong()
+ );
+
+ return invokeOnReplicas(
+ tablesWithPartitions.entrySet().iterator(),
+ minimumRequiredTime,
+ clockService.now()
+ );
+ }
+
+ private CompletableFuture<Void> invokeOnReplicas(
+ Iterator<Map.Entry<Integer, Integer>> tabItr,
+ long txBeginTime,
+ HybridTimestamp nowTs
+ ) {
+ if (!tabItr.hasNext()) {
+ return CompletableFutures.nullCompletedFuture();
+ }
+
+ Entry<Integer, Integer> tableWithPartsCount = tabItr.next();
+ int parts = tableWithPartsCount.getValue();
+
+ List<CompletableFuture<?>> partitionFutures = new ArrayList<>();
+
+ for (int p = 0; p < parts; p++) {
+ TablePartitionId replicationGroupId = new TablePartitionId(tableWithPartsCount.getKey(), p);
+
+ CompletableFuture<Object> fut = placementDriver.getAssignments(replicationGroupId, nowTs)
+ .thenCompose(tokenizedAssignments -> {
+ if (tokenizedAssignments == null) {
+ throwAssignmentsNotReadyException(replicationGroupId);
+ }
+
+ Assignment assignment = tokenizedAssignments.nodes().iterator().next();
Review Comment:
Depending on whether we really need a best-effort single compaction
coordinator, I'd prefer collocated local calls to the primary replica or, if
that's not reasonable, a round-robin (or randomized) assignment selection (1)
with an in-topology check (2). (1) is a matter of both optimization and
correctness; (2) is a matter of optimization.
Please check my other comments for more details.
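A rough sketch of (1) combined with (2): choose randomly, but only among assigned nodes that are currently in the logical topology. With assignments `[A, B, C]` and a logical topology of `[B, C]`, A can never be picked. `AssignmentChooser` is hypothetical and works on plain consistent IDs; an empty result is where the caller would have to decide whether to skip the group, retry later, or fail the round.

```java
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;

final class AssignmentChooser {
    private AssignmentChooser() {
    }

    /**
     * Picks a random assignment among those present in the logical topology.
     * Returns an empty Optional if no assigned node is currently in the topology.
     */
    static Optional<String> pickAlive(List<String> assignments, Set<String> logicalTopology, Random random) {
        List<String> alive = assignments.stream()
                .filter(logicalTopology::contains)
                .collect(Collectors.toList());

        if (alive.isEmpty()) {
            return Optional.empty();
        }

        return Optional.of(alive.get(random.nextInt(alive.size())));
    }
}
```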
##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -376,10 +406,110 @@ private static List<String> missingNodes(Set<String> requiredNodes, Collection<L
return requiredNodes.stream().filter(not(logicalNodeIds::contains)).collect(Collectors.toList());
}
+ CompletableFuture<Void> propagateTimeToReplicas(long minimumRequiredTime) {
+ Map<Integer, Integer> tablesWithPartitions = catalogManagerFacade.collectTablesWithPartitionsBetween(
+ minimumRequiredTime,
+ clockService.nowLong()
+ );
+
+ return invokeOnReplicas(
+ tablesWithPartitions.entrySet().iterator(),
+ minimumRequiredTime,
+ clockService.now()
+ );
+ }
+
+ private CompletableFuture<Void> invokeOnReplicas(
+ Iterator<Map.Entry<Integer, Integer>> tabItr,
+ long txBeginTime,
+ HybridTimestamp nowTs
+ ) {
+ if (!tabItr.hasNext()) {
+ return CompletableFutures.nullCompletedFuture();
+ }
+
+ Entry<Integer, Integer> tableWithPartsCount = tabItr.next();
+ int parts = tableWithPartsCount.getValue();
+
+ List<CompletableFuture<?>> partitionFutures = new ArrayList<>();
+
+ for (int p = 0; p < parts; p++) {
+ TablePartitionId replicationGroupId = new TablePartitionId(tableWithPartsCount.getKey(), p);
+
+ CompletableFuture<Object> fut = placementDriver.getAssignments(replicationGroupId, nowTs)
+ .thenCompose(tokenizedAssignments -> {
+ if (tokenizedAssignments == null) {
+ throwAssignmentsNotReadyException(replicationGroupId);
+ }
+
+ Assignment assignment = tokenizedAssignments.nodes().iterator().next();
+
+ TablePartitionIdMessage partIdMessage = ReplicaMessageUtils.toTablePartitionIdMessage(
+ REPLICA_MESSAGES_FACTORY,
+ replicationGroupId
+ );
+
+ UpdateMinimumActiveTxBeginTimeReplicaRequest msg = REPLICATION_MESSAGES_FACTORY
+ .updateMinimumActiveTxBeginTimeReplicaRequest()
+ .groupId(partIdMessage)
+ .timestamp(txBeginTime)
+ .build();
+
+ return replicaService.invoke(assignment.consistentId(), msg);
+ });
+
+ partitionFutures.add(fut);
+ }
+
+ return CompletableFutures.allOf(partitionFutures)
+ .thenCompose(ignore -> invokeOnReplicas(tabItr, txBeginTime, nowTs));
Review Comment:
Maybe I'm missing something:
1. How often are you going to move minBeginTime forward?
2. What if compactionCoordinatorNodeName has changed? What will prevent
further recursive calls?
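Regarding (2), one option is to re-check before each recursive step that the local node is still the compaction coordinator and to abandon the chain otherwise. A minimal sketch; `isStillCoordinator` and `processTable` are hypothetical hooks, not existing methods:

```java
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.function.BooleanSupplier;
import java.util.function.Function;

final class CoordinatedChain {
    private CoordinatedChain() {
    }

    /**
     * Processes tables one after another, but stops as soon as the local node is
     * no longer the coordinator, so a stale coordinator does not keep sending updates.
     */
    static CompletableFuture<Void> processTables(
            Iterator<Integer> tables,
            BooleanSupplier isStillCoordinator,
            Function<Integer, CompletableFuture<Void>> processTable
    ) {
        if (!tables.hasNext()) {
            return CompletableFuture.completedFuture(null);
        }

        // Checked before every step, not only once at the start of the chain.
        if (!isStillCoordinator.getAsBoolean()) {
            return CompletableFuture.failedFuture(new IllegalStateException("Compaction coordinator has changed"));
        }

        Integer tableId = tables.next();

        return processTable.apply(tableId)
                .thenCompose(ignored -> processTables(tables, isStillCoordinator, processTable));
    }
}
```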
##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -376,10 +406,110 @@ private static List<String> missingNodes(Set<String> requiredNodes, Collection<L
return requiredNodes.stream().filter(not(logicalNodeIds::contains)).collect(Collectors.toList());
}
+ CompletableFuture<Void> propagateTimeToReplicas(long minimumRequiredTime) {
+ Map<Integer, Integer> tablesWithPartitions = catalogManagerFacade.collectTablesWithPartitionsBetween(
+ minimumRequiredTime,
+ clockService.nowLong()
+ );
+
+ return invokeOnReplicas(
+ tablesWithPartitions.entrySet().iterator(),
+ minimumRequiredTime,
+ clockService.now()
+ );
+ }
+
+ private CompletableFuture<Void> invokeOnReplicas(
+ Iterator<Map.Entry<Integer, Integer>> tabItr,
+ long txBeginTime,
+ HybridTimestamp nowTs
+ ) {
+ if (!tabItr.hasNext()) {
+ return CompletableFutures.nullCompletedFuture();
+ }
+
+ Entry<Integer, Integer> tableWithPartsCount = tabItr.next();
+ int parts = tableWithPartsCount.getValue();
+
+ List<CompletableFuture<?>> partitionFutures = new ArrayList<>();
+
+ for (int p = 0; p < parts; p++) {
+ TablePartitionId replicationGroupId = new TablePartitionId(tableWithPartsCount.getKey(), p);
+
+ CompletableFuture<Object> fut = placementDriver.getAssignments(replicationGroupId, nowTs)
+ .thenCompose(tokenizedAssignments -> {
+ if (tokenizedAssignments == null) {
+ throwAssignmentsNotReadyException(replicationGroupId);
+ }
+
+ Assignment assignment = tokenizedAssignments.nodes().iterator().next();
+
+ TablePartitionIdMessage partIdMessage = ReplicaMessageUtils.toTablePartitionIdMessage(
+ REPLICA_MESSAGES_FACTORY,
+ replicationGroupId
+ );
+
+ UpdateMinimumActiveTxBeginTimeReplicaRequest msg = REPLICATION_MESSAGES_FACTORY
+ .updateMinimumActiveTxBeginTimeReplicaRequest()
+ .groupId(partIdMessage)
+ .timestamp(txBeginTime)
+ .build();
+
+ return replicaService.invoke(assignment.consistentId(), msg);
+ });
+
+ partitionFutures.add(fut);
+ }
+
+ return CompletableFutures.allOf(partitionFutures)
Review Comment:
Please add a comment explaining why it's safe to silently ignore all types
of exceptions here and why the recursive call is considered a proper failover
(by the way, it's not).
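If partial failure is acceptable, that could be made explicit: wait for every partition and return the ones that failed, so the caller can log them or retry only those groups. Unlike `CompletableFuture.allOf`, which surfaces only one of the exceptions, this keeps track of every failed partition. A hypothetical sketch (`PartitionOutcomes` is illustrative only):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

/** Hypothetical helper: waits for all partitions and reports which ones failed. */
final class PartitionOutcomes {
    private PartitionOutcomes() {
    }

    /** Completes, once every partition future has completed, with the indices of the failed ones. */
    static CompletableFuture<List<Integer>> failedPartitions(List<CompletableFuture<?>> partitionFutures) {
        List<CompletableFuture<Integer>> outcomes = new ArrayList<>();

        for (int p = 0; p < partitionFutures.size(); p++) {
            int partition = p;
            // Success maps to null, failure to the partition index; handle() itself never fails.
            CompletableFuture<Integer> outcome = partitionFutures.get(p)
                    .handle((res, err) -> err == null ? null : partition);
            outcomes.add(outcome);
        }

        return CompletableFuture.allOf(outcomes.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> outcomes.stream()
                        .map(CompletableFuture::join)
                        .filter(Objects::nonNull)
                        .collect(Collectors.toList()));
    }
}
```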
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/ActiveTxMinimumBeginTimeProvider.java:
##########
@@ -0,0 +1,30 @@
+package org.apache.ignite.internal.tx;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Provides the minimum begin time among all active RW transactions started locally.
+ */
+@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
+public interface ActiveTxMinimumBeginTimeProvider {
+ /** Returns the minimum begin time among all active RW transactions started locally. */
Review Comment:
... or null if there are no pending transactions, right?
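For illustration, a minimal local tracker whose contract spells out the `null` case. It uses `long` instead of `HybridTimestamp` to stay self-contained; the interface name follows the `ActiveLocalTxMinimumBeginTimeProvider` suggestion, and `InMemoryActiveTxTracker` with its `onBegin`/`onFinish` hooks is hypothetical:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

/** Provides the minimum begin time among all active RW transactions started locally. */
interface ActiveLocalTxMinimumBeginTimeProvider {
    /**
     * Returns the minimum begin time among all active RW transactions started locally,
     * or {@code null} if there are no such transactions.
     */
    Long minimumBeginTime();
}

/** Hypothetical tracker: counts active RW transactions per begin time. */
final class InMemoryActiveTxTracker implements ActiveLocalTxMinimumBeginTimeProvider {
    private final ConcurrentSkipListMap<Long, Integer> activeByBeginTime = new ConcurrentSkipListMap<>();

    void onBegin(long beginTime) {
        activeByBeginTime.merge(beginTime, 1, Integer::sum);
    }

    void onFinish(long beginTime) {
        // Returning null from the remapping function removes the entry.
        activeByBeginTime.computeIfPresent(beginTime, (time, count) -> count == 1 ? null : count - 1);
    }

    @Override
    public Long minimumBeginTime() {
        Map.Entry<Long, Integer> first = activeByBeginTime.firstEntry();
        return first == null ? null : first.getKey();
    }
}
```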
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/ActiveTxMinimumBeginTimeProvider.java:
##########
@@ -0,0 +1,30 @@
+package org.apache.ignite.internal.tx;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Provides the minimum begin time among all active RW transactions started locally.
+ */
+@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
+public interface ActiveTxMinimumBeginTimeProvider {
+ /** Returns the minimum begin time among all active RW transactions started locally. */
+ @Nullable HybridTimestamp minimumBeginTime();
Review Comment:
BTW, I'm curious: what time are you going to use as the global minimum when
there are no pending transactions cluster-wide?
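The integration test in this PR hints at the answer: with no active RW transactions, it expects `min(now())` across all nodes. As a sketch (names hypothetical), each node reports its local minimum or, if it has none, its current clock reading, and the coordinator takes the minimum of all reports:

```java
import java.util.List;

final class GlobalMinimum {
    private GlobalMinimum() {
    }

    /** What a single node reports: its local minimum, or its current time if it has no active RW transactions. */
    static long nodeReport(Long localMinimumBeginTime, long nodeNow) {
        return localMinimumBeginTime != null ? localMinimumBeginTime : nodeNow;
    }

    /** The global minimum is the smallest of all node reports. */
    static long globalMinimum(List<Long> nodeReports) {
        return nodeReports.stream()
                .mapToLong(Long::longValue)
                .min()
                .orElseThrow(() -> new IllegalArgumentException("No node reports"));
    }
}
```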
##########
modules/catalog-compaction/src/integrationTest/java/org/apache/ignite/internal/catalog/compaction/ItCatalogCompactionTest.java:
##########
@@ -0,0 +1,226 @@
+package org.apache.ignite.internal.catalog.compaction;
+
+import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.not;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntMap.Entry;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.CatalogManagerImpl;
+import org.apache.ignite.internal.catalog.compaction.CatalogCompactionRunner.TimeHolder;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.raft.Peer;
+import org.apache.ignite.internal.raft.RaftNodeId;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
+import org.apache.ignite.internal.raft.server.impl.JraftServerImpl.DelegatingStateMachine;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.raft.jraft.RaftGroupService;
+import org.apache.ignite.tx.TransactionOptions;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests to verify catalog compaction.
+ */
+class ItCatalogCompactionTest extends ClusterPerClassIntegrationTest {
+ private static final int CLUSTER_SIZE = 3;
+
+ @Override
+ protected int initialNodes() {
+ return CLUSTER_SIZE;
+ }
+
+ @Test
+ void testRaftGroupsUpdate() throws InterruptedException {
+ IgniteImpl ignite = CLUSTER.aliveNode();
+ CatalogManagerImpl catalogManager = ((CatalogManagerImpl) CLUSTER.aliveNode().catalogManager());
+
+ sql(format("create zone if not exists test with partitions=16, replicas={}, storage_profiles='default'",
+ initialNodes()));
+ sql("alter zone test set default");
+
+ // Latest active catalog contains all required tables.
+ {
+ sql("create table a(a int primary key)");
+
+ Catalog minRequiredCatalog = catalogManager.catalog(catalogManager.latestCatalogVersion());
+ assertNotNull(minRequiredCatalog);
+
+ sql("create table b(a int primary key)");
+
+ HybridTimestamp expectedTime = HybridTimestamp.hybridTimestamp(minRequiredCatalog.time());
+
+ CompletableFuture<Void> fut = ignite.catalogCompactionRunner()
+ .propagateTimeToReplicas(expectedTime.longValue());
+
+ assertThat(fut, willCompleteSuccessfully());
+
+ ensureTimestampStoredInAllReplicas(expectedTime, 2);
+ }
+
+ // Latest active catalog does not contain all required tables.
+ // Replicas of dropped tables must also be updated.
+ long requiredTime = CLUSTER.aliveNode().clockService().nowLong();
+
+ {
+ sql("drop table a");
+ sql("drop table b");
+
+ HybridTimestamp expectedTime = HybridTimestamp.hybridTimestamp(requiredTime);
+
+ CompletableFuture<Void> fut = ignite.catalogCompactionRunner()
+ .propagateTimeToReplicas(expectedTime.longValue());
+
+ assertThat(fut, willCompleteSuccessfully());
+
+ ensureTimestampStoredInAllReplicas(expectedTime, 2);
+ }
+
+ // Update to lower timestamp should not succeed.
+ {
+ HybridTimestamp expectedTime = HybridTimestamp.hybridTimestamp(requiredTime - 1);
+
+ CompletableFuture<Void> fut = ignite.catalogCompactionRunner()
+ .propagateTimeToReplicas(expectedTime.longValue());
+
+ assertThat(fut, willCompleteSuccessfully());
+
+ ensureTimestampStoredInAllReplicas(HybridTimestamp.hybridTimestamp(requiredTime), 2);
+ }
Review Comment:
1. The permanently dead assignment case is not covered, i.e. the case when the
node chosen by `Assignment assignment = tokenizedAssignments.nodes().iterator().next();`
is not present in the topology and never will be.
2. Changes to tables during the recursive invokeOnReplicas calls are not
covered.
3. Failover cases are not covered, e.g. temporary death of an assignment.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -4136,6 +4139,23 @@ private CompletableFuture<?> processVacuumTxStateReplicaRequest(VacuumTxStateRep
return raftClient.run(cmd);
}
+ private CompletableFuture<?> processMinimumActiveTxTimeReplicaRequest(UpdateMinimumActiveTxBeginTimeReplicaRequest request) {
+ Command cmd = PARTITION_REPLICATION_MESSAGES_FACTORY.updateMinimumActiveTxBeginTimeCommand()
+ .timestamp(request.timestamp())
+ .build();
+
+ CompletableFuture<Object> resultFuture = new CompletableFuture<>();
+
+ // The timestamp must increase monotonically, otherwise it will have to be
+ // stored on disk so that reordering does not occur after the node is restarted.
+ applyCmdWithRetryOnSafeTimeReorderException(
+ cmd,
+ resultFuture
+ );
+
+ return resultFuture.thenApply(res -> null);
Review Comment:
`.thenApply(res -> null);` seems useless here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.