sanpwc commented on code in PR #4240:
URL: https://github.com/apache/ignite-3/pull/4240#discussion_r1725014890
##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogCompactionMessageGroup.java:
##########
@@ -31,4 +31,8 @@ public class CatalogCompactionMessageGroup {
/** See {@link CatalogCompactionMinimumTimesResponse} for the details. */
public static final short MINIMUM_TIMES_RESPONSE = 1;
+
+ public static final short PREPARE_TO_UPDATE_TIME_ON_REPLICAS_REQUEST = 2;
Review Comment:
I like how you've documented the message by linking to its class, e.g.
`/** See {@link CatalogCompactionMinimumTimesResponse} for the details. */`.
I'd add similar comments for the newly added message codes.
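For illustration, a trimmed sketch of what that could look like. It keeps only the two constants visible in this hunk. The Javadoc target for the new request code is assumed to be `CatalogCompactionPrepareUpdateTxBeginTimeRequest`, the request class introduced in this PR.

```java
/** Trimmed sketch of the message group: each message code links to the message class it identifies. */
class CatalogCompactionMessageGroup {
    /** See {@link CatalogCompactionMinimumTimesResponse} for the details. */
    public static final short MINIMUM_TIMES_RESPONSE = 1;

    /** See {@link CatalogCompactionPrepareUpdateTxBeginTimeRequest} for the details. */
    public static final short PREPARE_TO_UPDATE_TIME_ON_REPLICAS_REQUEST = 2;
}
```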
##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -194,6 +200,31 @@ public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
return;
}
+ if (message.messageType() == CatalogCompactionMessageGroup.PREPARE_TO_UPDATE_TIME_ON_REPLICAS_REQUEST) {
+ assert correlationId != null;
+
+ long txBeginTime = ((CatalogCompactionPrepareUpdateTxBeginTimeRequest) message).timestamp();
+
+ propagateTimeToReplicasLocal(txBeginTime)
+ .whenComplete((result, ex) -> {
+ CatalogCompactionPrepareUpdateTxBeginTimeResponseBuilder responseBuilder =
Review Comment:
Please check my comment in CatalogCompactionPrepareUpdateTxBeginTimeResponse.
I believe this should be simplified.
##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -401,74 +441,95 @@ private static List<String> missingNodes(Set<String> requiredNodes, Collection<L
return requiredNodes.stream().filter(not(logicalNodeIds::contains)).collect(Collectors.toList());
}
- CompletableFuture<Void> propagateTimeToReplicas(long timestamp, Collection<? extends ClusterNode> topologyNodes) {
+ CompletableFuture<Boolean> propagateTimeToReplicas(long timestamp, Collection<? extends ClusterNode> nodes) {
+ CatalogCompactionPrepareUpdateTxBeginTimeRequest request = COMPACTION_MESSAGES_FACTORY
+ .catalogCompactionPrepareUpdateTxBeginTimeRequest()
+ .timestamp(timestamp)
+ .build();
+
+ List<CompletableFuture<?>> responseFutures = new ArrayList<>(nodes.size());
+ AtomicBoolean successFlag = new AtomicBoolean(true);
Review Comment:
The flag is only used in tests; is that intended? If so, all of the
response.success() / successFlag handling is too much just to check whether
all replicas received the new timestamp. Please also check my comment in
CatalogCompactionPrepareUpdateTxBeginTimeResponse.
##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogCompactionPrepareUpdateTxBeginTimeResponse.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.compaction.message;
+
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.annotations.Transferable;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Result of tx time propagation to replicas.
+ *
+ * @see CatalogCompactionPrepareUpdateTxBeginTimeRequest
+ */
+@Transferable(CatalogCompactionMessageGroup.PREPARE_TO_UPDATE_TIME_ON_REPLICAS_RESPONSE)
+public interface CatalogCompactionPrepareUpdateTxBeginTimeResponse extends NetworkMessage {
Review Comment:
I'm curious why you need the response at all.
- The boolean result of propagateToReplicasFut seems to be ignored:
```
CompletableFuture<Boolean> propagateToReplicasFut =
        propagateTimeToReplicas(minActiveTxBeginTime, topologySnapshot.nodes())
                .whenComplete((success, ex) -> { /* logging */ });

return CompletableFuture.allOf(
        catalogCompactionFut,
        propagateToReplicasFut
);
```
- It doesn't really matter whether a business-logic warning is logged on the
sender or on the recipient side. You would, of course, still catch and log all
messagingService-related exceptions on the sender side.
- More importantly, the general approach of continuously repeating
`CatalogCompactionPrepareUpdateTxBeginTimeRequest` propagation is robust enough
on its own. It already assumes that some iterations will fail, e.g. because of
the lag between physical and logical topology adjustments.
- The only benefit I can think of is backpressure, but that doesn't really
apply here because these requests are sent rarely.

Thus I believe you can use send (or even weakSend, although personally I'd use
send here), handle messagingService-related exceptions on the sender side, and
handle business-logic-related exceptions on the recipient side. In that case you
won't need CatalogCompactionPrepareUpdateTxBeginTimeResponse.
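A minimal sketch of the response-less variant, under stated assumptions: `Sender` is a hypothetical one-way send interface standing in for the messaging service (its signature is illustrative, not Ignite's API), node names stand in for `ClusterNode`, and `propagateTimeToNodes` is a hypothetical name. Transport failures are logged on the sender side and swallowed instead of failing the whole propagation.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Sketch of response-less time propagation (hypothetical types, not Ignite's API). */
class ResponselessPropagation {
    /** Hypothetical one-way send: completes once the message is handed over to the network layer. */
    interface Sender {
        CompletableFuture<Void> send(String nodeName, long timestamp);
    }

    /**
     * Sends the timestamp to every node without waiting for a business-level reply.
     * Transport failures are logged on the sender side and swallowed, because the
     * next compaction iteration sends the timestamp again anyway.
     */
    static CompletableFuture<Void> propagateTimeToNodes(
            Sender sender, long timestamp, Collection<String> nodeNames, Consumer<String> log) {
        List<CompletableFuture<Void>> futures = new ArrayList<>(nodeNames.size());

        for (String node : nodeNames) {
            futures.add(sender.send(node, timestamp)
                    .exceptionally(ex -> {
                        log.accept("Failed to send minimum required time [node=" + node + ", error=" + ex + "].");
                        return null;
                    }));
        }

        // Completes normally once every send has either succeeded or had its failure logged.
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }
}
```

Since every compaction iteration sends the timestamp again, a lost message only delays progress until the next iteration; this is why a reply message adds little here.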
##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -194,6 +200,31 @@ public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
return;
}
+ if (message.messageType() == CatalogCompactionMessageGroup.PREPARE_TO_UPDATE_TIME_ON_REPLICAS_REQUEST) {
Review Comment:
I'd refactor this a bit by adding at least a separate method for each message
handler, e.g. handlePrepareToUpdateTimeOnReplicasRequest(),
handleMinimumTimesRequest(), etc.
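One possible shape for this, as a sketch: the listener only dispatches on the message type, and each type gets its own method. `Message`, the handler bodies and their return values, and the value of `MINIMUM_TIMES_REQUEST` are hypothetical. Only `PREPARE_TO_UPDATE_TIME_ON_REPLICAS_REQUEST = 2` comes from this PR.

```java
/** Sketch of per-message handler methods behind a thin dispatcher (hypothetical types). */
class CompactionMessageDispatcher {
    // Illustrative codes; only PREPARE_TO_UPDATE_TIME_ON_REPLICAS_REQUEST = 2 is taken from this PR.
    static final short MINIMUM_TIMES_REQUEST = 0;
    static final short PREPARE_TO_UPDATE_TIME_ON_REPLICAS_REQUEST = 2;

    /** Minimal stand-in for a network message. */
    static final class Message {
        final short messageType;
        final long timestamp;

        Message(short messageType, long timestamp) {
            this.messageType = messageType;
            this.timestamp = timestamp;
        }
    }

    /** The registered message handler: it only dispatches, all logic lives in the dedicated methods. */
    static String onMessage(Message message) {
        switch (message.messageType) {
            case MINIMUM_TIMES_REQUEST:
                return handleMinimumTimesRequest(message);
            case PREPARE_TO_UPDATE_TIME_ON_REPLICAS_REQUEST:
                return handlePrepareToUpdateTimeOnReplicasRequest(message);
            default:
                throw new IllegalArgumentException("Unexpected message type: " + message.messageType);
        }
    }

    private static String handleMinimumTimesRequest(Message message) {
        // Placeholder: the real handler would collect local minimum times and reply.
        return "minimumTimes";
    }

    private static String handlePrepareToUpdateTimeOnReplicasRequest(Message message) {
        // Placeholder: the real handler would propagate message.timestamp to local replicas.
        return "prepareUpdate@" + message.timestamp;
    }
}
```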
##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -401,74 +441,95 @@ private static List<String> missingNodes(Set<String> requiredNodes, Collection<L
return requiredNodes.stream().filter(not(logicalNodeIds::contains)).collect(Collectors.toList());
}
- CompletableFuture<Void> propagateTimeToReplicas(long timestamp, Collection<? extends ClusterNode> topologyNodes) {
+ CompletableFuture<Boolean> propagateTimeToReplicas(long timestamp, Collection<? extends ClusterNode> nodes) {
+ CatalogCompactionPrepareUpdateTxBeginTimeRequest request = COMPACTION_MESSAGES_FACTORY
+ .catalogCompactionPrepareUpdateTxBeginTimeRequest()
+ .timestamp(timestamp)
+ .build();
+
+ List<CompletableFuture<?>> responseFutures = new ArrayList<>(nodes.size());
+ AtomicBoolean successFlag = new AtomicBoolean(true);
+
+ for (ClusterNode node : nodes) {
+ CompletableFuture<?> fut =
+ messagingService.invoke(node, request, ANSWER_TIMEOUT)
+ .thenApply(message -> {
+ CatalogCompactionPrepareUpdateTxBeginTimeResponse response =
+ (CatalogCompactionPrepareUpdateTxBeginTimeResponse) message;
+
+ if (response.error() != null) {
+ throw new IllegalStateException(format(
+ "Remote node responded with error [node={}, error={}]",
+ node.name(), response.error()));
+ }
+
+ if (!response.success()) {
+ successFlag.set(false);
+ }
+
+ return null;
+ });
+
+ responseFutures.add(fut);
+ }
+
+ return CompletableFutures.allOf(responseFutures).thenApply(ignore -> successFlag.get());
+ }
+
+ CompletableFuture<Boolean> propagateTimeToReplicasLocal(long txBeginTime) {
HybridTimestamp nowTs = clockService.now();
return schemaSyncService.waitForMetadataCompleteness(nowTs)
.thenComposeAsync(ignore -> {
- Map<Integer, Integer> tablesWithPartitions =
- catalogManagerFacade.collectTablesWithPartitionsBetween(timestamp, nowTs.longValue());
+ AtomicBoolean abortFlag = new AtomicBoolean();
+ Int2IntMap tablesWithPartitions =
+ catalogManagerFacade.collectTablesWithPartitionsBetween(txBeginTime, nowTs.longValue());
- Set<String> topologyNodeNames = topologyNodes.stream()
- .map(ClusterNode::name)
- .collect(Collectors.toSet());
+ ObjectIterator<Entry> itr = tablesWithPartitions.int2IntEntrySet().iterator();
- // TODO https://issues.apache.org/jira/browse/IGNITE-22951 Minimize the number of network requests
- return CompletableFutures.allOf(tablesWithPartitions.entrySet().stream()
- .map(e -> invokeOnReplicas(e.getKey(), e.getValue(), timestamp, nowTs, topologyNodeNames))
- .collect(Collectors.toList())
- );
+ return invokeOnLocalReplicas(txBeginTime, itr, abortFlag)
+ .thenApply(v -> !abortFlag.get());
}, executor);
}
- private CompletableFuture<Void> invokeOnReplicas(
- int tableId,
- int partitions,
- long txBeginTime,
- HybridTimestamp nowTs,
- Set<String> logicalTopologyNodes
- ) {
- List<TablePartitionId> replicationGroupIds = new ArrayList<>(partitions);
+ private CompletableFuture<Void> invokeOnLocalReplicas(long txBeginTime, ObjectIterator<Entry> tabTtr, AtomicBoolean abortFlag) {
+ if (!tabTtr.hasNext() || abortFlag.get()) {
+ return CompletableFutures.nullCompletedFuture();
+ }
+
+ Entry tableWithPartitions = tabTtr.next();
+ int tableId = tableWithPartitions.getIntKey();
+ int partitions = tableWithPartitions.getIntValue();
+ List<CompletableFuture<?>> partFutures = new ArrayList<>(partitions);
+ HybridTimestamp nowTs = clockService.now();
for (int p = 0; p < partitions; p++) {
- replicationGroupIds.add(new TablePartitionId(tableId, p));
- }
+ TablePartitionId tablePartitionId = new TablePartitionId(tableId, p);
- return placementDriver.getAssignments(replicationGroupIds, nowTs)
- .thenComposeAsync(tokenizedAssignments -> {
- assert tokenizedAssignments.size() == replicationGroupIds.size();
+ CompletableFuture<?> fut = placementDriver
+ .getPrimaryReplica(tablePartitionId, nowTs)
+ .thenCompose(meta -> {
+ if (abortFlag.get()) {
+ return CompletableFutures.nullCompletedFuture();
+ }
- List<CompletableFuture<?>> replicaInvokeFutures = new ArrayList<>(partitions);
+ // If primary is not elected yet - we'll update replication groups on next iteration.
+ if (meta == null || meta.getLeaseholder() == null) {
+ LOG.info("Primary replica is not selected yet, aborting minimum required "
+ + "time propagation [tablePartitionId={}].", tablePartitionId);
- for (int p = 0; p < partitions; p++) {
- TablePartitionId replicationGroupId = replicationGroupIds.get(p);
- TokenizedAssignments tokenizedAssignment = tokenizedAssignments.get(p);
+ abortFlag.set(true);
Review Comment:
I don't think aborting the whole procedure is the correct approach. Consider the
following scenario:
| Iteration | Available primary | Timestamps | Comment |
|---|---|---|---|
| 1 | p1 | p1=[-1], p2=[-1] | We will not send the message because p2 is unavailable. |
| 2 | p2 | p1=[-1], p2=[-1] | p2 is available, but p1 isn't. |
| 3 | p1 | p1=[-1], p2=[-1] | |
| 4 | p2 | p1=[-1], p2=[-1] | |
However, if we send the timestamp to all available partitions on each
iteration, we'll generally make progress in the situation above:
| Iteration | Available primary | Timestamps | Comment |
|---|---|---|---|
| 1 | p1 | p1=[t1], p2=[-1] | min = -1 |
| 2 | p2 | p1=[t1], p2=[t2] | min = t1, so compaction is possible. |
| 3 | p1 | p1=[t3], p2=[t2] | min = t2 |
| 4 | p2 | p1=[t3], p2=[t4] | |
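The two scenarios can be checked with a small simulation. This is hypothetical code, not part of the PR: two partitions, a primary that alternates between p1 and p2, and the timestamp sent on iteration i taken to be i. `run(true, ...)` models aborting the whole propagation when any primary is missing; `run(false, ...)` models a best-effort update of every available primary.

```java
/**
 * Simulation of the two-partition scenario (hypothetical code, not from the PR).
 * -1 means "no time received yet"; the timestamp sent on iteration i is i.
 */
class PropagationScenario {
    /**
     * @param abortOnMissingPrimary true: skip the whole iteration if any primary is missing;
     *                              false: update every partition whose primary is available
     * @param iterations number of compaction iterations to simulate
     * @return the minimum time over both partitions after each iteration
     */
    static long[] run(boolean abortOnMissingPrimary, int iterations) {
        long[] partitionTime = {-1, -1}; // index 0 = p1, index 1 = p2
        long[] minPerIteration = new long[iterations];

        for (int i = 1; i <= iterations; i++) {
            // Only one primary is available per iteration: p1 on odd iterations, p2 on even ones.
            int availablePrimary = (i % 2 == 1) ? 0 : 1;

            // The other primary is always missing, so the aborting strategy never sends anything.
            if (!abortOnMissingPrimary) {
                partitionTime[availablePrimary] = i;
            }

            minPerIteration[i - 1] = Math.min(partitionTime[0], partitionTime[1]);
        }

        return minPerIteration;
    }
}
```

`run(true, 4)` returns `[-1, -1, -1, -1]`, while `run(false, 4)` returns `[-1, 1, 2, 3]`: the same progression as the best-effort case (min = -1, then t1, t2, t3).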
##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -273,28 +304,36 @@ private CompletableFuture<Void> startCompaction(LogicalTopologySnapshot topology
CompletableFuture<Boolean> catalogCompactionFut;
if (catalog == null) {
- LOG.info("Catalog compaction skipped, nothing to compact (ts={})", minRequiredTime);
+ LOG.info("Catalog compaction skipped, nothing to compact [timestamp={}].", minRequiredTime);
catalogCompactionFut = CompletableFutures.falseCompletedFuture();
} else {
catalogCompactionFut = tryCompactCatalog(catalog, topologySnapshot).whenComplete((res, ex) -> {
if (ex != null) {
- LOG.warn("Catalog compaction has failed (timestamp={})", ex, minRequiredTime);
+ LOG.warn("Catalog compaction has failed [timestamp={}].", ex, minRequiredTime);
} else {
if (res) {
- LOG.info("Catalog compaction completed successfully (timestamp={})", minRequiredTime);
+ LOG.info("Catalog compaction completed successfully [timestamp={}].", minRequiredTime);
} else {
- LOG.info("Catalog compaction skipped (timestamp={})", minRequiredTime);
+ LOG.info("Catalog compaction skipped [timestamp={}].", minRequiredTime);
}
}
});
}
- CompletableFuture<Void> propagateToReplicasFut =
+ CompletableFuture<Boolean> propagateToReplicasFut =
propagateTimeToReplicas(minActiveTxBeginTime, topologySnapshot.nodes())
- .whenComplete((res, ex) -> {
+ .whenComplete((success, ex) -> {
if (ex != null) {
- LOG.warn("Failed to propagate minimum active tx begin time to replicas", ex);
+ LOG.warn("Failed to propagate minimum required time to replicas.", ex);
+ }
+
+ if (!success) {
+ LOG.info("Propagation of minimum required time to replicas "
Review Comment:
So if `!success`, we will log `"completed successfully"`?
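A sketch of a handler that keeps the three outcomes apart. One more detail about the diff: inside `whenComplete`, `success` is `null` whenever `ex != null`. Because the `!success` check is not in an `else` branch, unboxing it would throw a `NullPointerException` on the failure path. The log wording below is illustrative, and `PropagationLogging`/`logOutcome` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Sketch of three-way outcome logging for the propagation future (illustrative wording). */
class PropagationLogging {
    static CompletableFuture<Boolean> logOutcome(CompletableFuture<Boolean> propagationFut, Consumer<String> log) {
        return propagationFut.whenComplete((success, ex) -> {
            if (ex != null) {
                // On failure 'success' is null, so it must not be unboxed on this path.
                log.accept("WARN Failed to propagate minimum required time to replicas: " + ex);
            } else if (Boolean.TRUE.equals(success)) {
                log.accept("INFO Propagation of minimum required time to replicas completed successfully.");
            } else {
                log.accept("INFO Propagation of minimum required time to replicas skipped.");
            }
        });
    }
}
```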
##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -401,74 +441,95 @@ private static List<String> missingNodes(Set<String> requiredNodes, Collection<L
return requiredNodes.stream().filter(not(logicalNodeIds::contains)).collect(Collectors.toList());
}
- CompletableFuture<Void> propagateTimeToReplicas(long timestamp, Collection<? extends ClusterNode> topologyNodes) {
+ CompletableFuture<Boolean> propagateTimeToReplicas(long timestamp, Collection<? extends ClusterNode> nodes) {
Review Comment:
The method name is a bit misleading: in reality, you spread the global minimum
tx start time across cluster nodes, not across replicas.