korlov42 commented on code in PR #4137: URL: https://github.com/apache/ignite-3/pull/4137#discussion_r1698158496
########## modules/transactions/src/main/java/org/apache/ignite/internal/tx/ActiveTxMinimumStartTimeProvider.java: ########## @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.tx; + +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.jetbrains.annotations.Nullable; + +/** + * Provides the minimum starting time among all locally started active RW transactions. Review Comment: ```suggestion * Provides the minimum begin time among all active RW transactions started locally. ``` ########## modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java: ########## @@ -342,17 +342,26 @@ public CompletableFuture<Integer> execute(List<CatalogCommand> commands) { return saveUpdateAndWaitForActivation(new BulkUpdateProducer(List.copyOf(commands))); } + /** + * Trim all catalog versions up to the given catalog (exclusively). + * + * @param catalog Earliest observable catalog. + * @return Operation future, which is completing with {@code true} if a new snapshot has been successfully written, {@code false} + * otherwise if a snapshot with the same or greater version already exists. 
+ */ + public CompletableFuture<Boolean> compactCatalog(Catalog catalog) { Review Comment: It's better not to accept a catalog object from outside; otherwise we may end up with a broken cluster. Let's pass the new minimal catalog version here instead. ########## modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java: ########## @@ -198,115 +225,127 @@ public CompletableFuture<Boolean> onLowWatermarkChanged(HybridTimestamp newLowWa } @TestOnly - CompletableFuture<Boolean> lastRunFuture() { + synchronized CompletableFuture<Void> lastRunFuture() { return lastRunFuture; } /** Starts the catalog compaction routine. */ - CompletableFuture<Boolean> triggerCompaction(@Nullable HybridTimestamp lwm) { + void triggerCompaction(@Nullable HybridTimestamp lwm) { if (lwm == null || !localNodeName.equals(compactionCoordinatorNodeName)) { - return CompletableFutures.falseCompletedFuture(); + return; } - return inBusyLock(busyLock, () -> { - CompletableFuture<Boolean> fut = lastRunFuture; + inBusyLock(busyLock, () -> { + synchronized (this) { + CompletableFuture<Void> fut = lastRunFuture; - if (!fut.isDone()) { - LOG.info("Catalog compaction is already in progress, skipping (timestamp={})", lwm.longValue()); + if (!fut.isDone()) { + LOG.info("Catalog compaction is already in progress, skipping (timestamp={})", lwm.longValue()); Review Comment: Looks like a `return` statement is missing. ########## modules/transactions/src/main/java/org/apache/ignite/internal/tx/ActiveTxMinimumStartTimeProvider.java: ########## @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.tx; + +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.jetbrains.annotations.Nullable; + +/** + * Provides the minimum starting time among all locally started active RW transactions. Review Comment: The same applies to the method's javadoc. ########## modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogManagerCompactionHelper.java: ########## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.ignite.internal.catalog.compaction; + +import it.unimi.dsi.fastutil.ints.Int2IntMap; +import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.catalog.Catalog; +import org.apache.ignite.internal.catalog.CatalogManagerImpl; +import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; +import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; +import org.jetbrains.annotations.Nullable; + +/** + * Class contains utility methods for interacting with the catalog manager. + * These methods are only needed for catalog compaction routine. + */ +class CatalogManagerCompactionHelper { Review Comment: Nitpick: it's probably better to use the `Facade` suffix rather than `Helper`. A helper is usually a utility class or a set of utility methods. ########## modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java: ########## @@ -198,115 +225,127 @@ public CompletableFuture<Boolean> onLowWatermarkChanged(HybridTimestamp newLowWa } @TestOnly - CompletableFuture<Boolean> lastRunFuture() { + synchronized CompletableFuture<Void> lastRunFuture() { return lastRunFuture; } /** Starts the catalog compaction routine.
*/ - CompletableFuture<Boolean> triggerCompaction(@Nullable HybridTimestamp lwm) { + void triggerCompaction(@Nullable HybridTimestamp lwm) { if (lwm == null || !localNodeName.equals(compactionCoordinatorNodeName)) { - return CompletableFutures.falseCompletedFuture(); + return; } - return inBusyLock(busyLock, () -> { - CompletableFuture<Boolean> fut = lastRunFuture; + inBusyLock(busyLock, () -> { + synchronized (this) { + CompletableFuture<Void> fut = lastRunFuture; - if (!fut.isDone()) { - LOG.info("Catalog compaction is already in progress, skipping (timestamp={})", lwm.longValue()); + if (!fut.isDone()) { + LOG.info("Catalog compaction is already in progress, skipping (timestamp={})", lwm.longValue()); + } - return CompletableFutures.falseCompletedFuture(); + lastRunFuture = startCompaction(logicalTopologyService.localLogicalTopology()); } - - fut = startCompaction(logicalTopologyService.localLogicalTopology()) - .whenComplete((res, ex) -> { - if (ex != null) { - LOG.warn("Catalog compaction has failed (timestamp={})", ex, lwm.longValue()); - } else if (LOG.isDebugEnabled()) { - if (res) { - LOG.debug("Catalog compaction completed successfully (timestamp={})", lwm.longValue()); - } else { - LOG.debug("Catalog compaction skipped (timestamp={})", lwm.longValue()); - } - } - }); - - lastRunFuture = fut; - - return fut; }); } - private CompletableFuture<Boolean> startCompaction(LogicalTopologySnapshot topologySnapshot) { + private CompletableFuture<Void> startCompaction(LogicalTopologySnapshot topologySnapshot) { long localMinimum = localMinTimeProvider.time(); - if (catalogByTsNullable(localMinimum) == null) { - return CompletableFutures.falseCompletedFuture(); + if (catalogManagerHelper.catalogByTsNullable(localMinimum) == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Catalog compaction skipped, nothing to compact (ts={})", localMinimum); + } + + return CompletableFutures.nullCompletedFuture(); } return 
determineGlobalMinimumRequiredTime(topologySnapshot.nodes(), localMinimum) - .thenComposeAsync(ts -> { - Catalog catalog = catalogByTsNullable(ts); + .thenComposeAsync(timeHolder -> { + long minRequiredTime = timeHolder.minRequiredTime; + HybridTimestamp minActiveTxStartTime = timeHolder.minActiveTxStartTime; + Catalog catalog = catalogManagerHelper.catalogByTsNullable(minRequiredTime); + + CompletableFuture<Boolean> catalogCompactionFut; if (catalog == null) { - return CompletableFutures.falseCompletedFuture(); - } + if (LOG.isDebugEnabled()) { + LOG.debug("Catalog compaction skipped, nothing to compact (ts={})", minRequiredTime); + } - return requiredNodes(catalog) - .thenCompose(requiredNodes -> { - List<String> missingNodes = missingNodes(requiredNodes, topologySnapshot.nodes()); + catalogCompactionFut = CompletableFutures.falseCompletedFuture(); + } else { + catalogCompactionFut = tryCompactCatalog(catalog, topologySnapshot).whenComplete((res, ex) -> { + if (ex != null) { + LOG.warn("Catalog compaction has failed (timestamp={})", ex, minRequiredTime); + } else { + if (res && LOG.isDebugEnabled()) { + LOG.debug("Catalog compaction completed successfully (timestamp={})", minRequiredTime); + } else { + LOG.debug("Catalog compaction skipped (timestamp={})", minRequiredTime); + } + } + }); + } - if (!missingNodes.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Catalog compaction aborted due to missing cluster members (nodes={})", missingNodes); - } + if (minActiveTxStartTime == null) { Review Comment: I think we always need to propagate some time. Given that the partition time is initially set to the minimum possible clock value, we must still be able to perform compaction even if there is no transactional load.
The safest option here is: if the current node doesn't have any active transactions, send the current value of the low watermark, acquired just _before_ collecting the start times of active transactions. ########## modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java: ########## @@ -376,10 +414,101 @@ private static List<String> missingNodes(Set<String> requiredNodes, Collection<L return requiredNodes.stream().filter(not(logicalNodeIds::contains)).collect(Collectors.toList()); } + CompletableFuture<Void> propagateTimeToReplicas(HybridTimestamp minimumRequiredTime) { + Map<Integer, Integer> tablesWithPartitions = catalogManagerHelper.collectTablesWithPartitionsBetween( + minimumRequiredTime.longValue(), + clockService.nowLong() + ); + + return invokeOnReplicas( + tablesWithPartitions.entrySet().iterator(), + minimumRequiredTime.longValue(), + clockService.now() + ); + } + + private CompletableFuture<Void> invokeOnReplicas( + Iterator<Map.Entry<Integer, Integer>> tabItr, + long txBeginTime, + HybridTimestamp nowTs + ) { + if (!tabItr.hasNext()) { + return CompletableFutures.nullCompletedFuture(); + } + + Entry<Integer, Integer> tableWithPartsCount = tabItr.next(); + int parts = tableWithPartsCount.getValue(); + + List<CompletableFuture<?>> partitionFutures = new ArrayList<>(); + + for (int p = 0; p < parts; p++) { + TablePartitionId replicationGroupId = new TablePartitionId(tableWithPartsCount.getKey(), p); + + CompletableFuture<Object> fut = placementDriver.getAssignments(replicationGroupId, nowTs) + .thenCompose(tokenizedAssignments -> { + if (tokenizedAssignments == null) { + throwAssignmentsNotReadyException(replicationGroupId); + } + + Assignment assignment = tokenizedAssignments.nodes().iterator().next(); + + TablePartitionIdMessage partIdMessage = ReplicaMessageUtils.toTablePartitionIdMessage( + REPLICA_MESSAGES_FACTORY, + replicationGroupId + ); + + UpdateMinimumActiveTxStartTimeReplicaRequest msg =
REPLICATION_MESSAGES_FACTORY + .updateMinimumActiveTxStartTimeReplicaRequest() + .groupId(partIdMessage) + .timestamp(txBeginTime) + .build(); + + return replicaService.invoke(assignment.consistentId(), msg); + }); + + partitionFutures.add(fut); + } + + return CompletableFutures.allOf(partitionFutures) + .thenCompose(ignore -> invokeOnReplicas(tabItr, txBeginTime, nowTs)); + } + + private CompletableFuture<Boolean> tryCompactCatalog(Catalog catalog, LogicalTopologySnapshot topologySnapshot) { + return requiredNodes(catalog) + .thenCompose(requiredNodes -> { + List<String> missingNodes = missingNodes(requiredNodes, topologySnapshot.nodes()); + + if (!missingNodes.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Catalog compaction aborted due to missing cluster members (nodes={})", missingNodes); + } Review Comment: First, you don't need to wrap such calls in an `if` statement. Second, compaction seems to be a really infrequent process; does it make sense to log the main stages of compaction with INFO severity? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
