sanpwc commented on code in PR #4240:
URL: https://github.com/apache/ignite-3/pull/4240#discussion_r1725014890


##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogCompactionMessageGroup.java:
##########
@@ -31,4 +31,8 @@ public class CatalogCompactionMessageGroup {
 
     /** See {@link CatalogCompactionMinimumTimesResponse} for the details. */
     public static final short MINIMUM_TIMES_RESPONSE = 1;
+
+    public static final short PREPARE_TO_UPDATE_TIME_ON_REPLICAS_REQUEST = 2;

Review Comment:
   I like how you've explained the message by adding a link to the class, e.g. `/** See {@link CatalogCompactionMinimumTimesResponse} for the details. */`. I'd add similar comments for the newly added message codes.



##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -194,6 +200,31 @@ public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
                 return;
             }
 
+            if (message.messageType() == CatalogCompactionMessageGroup.PREPARE_TO_UPDATE_TIME_ON_REPLICAS_REQUEST) {
+                assert correlationId != null;
+
+                long txBeginTime = ((CatalogCompactionPrepareUpdateTxBeginTimeRequest) message).timestamp();
+
+                propagateTimeToReplicasLocal(txBeginTime)
+                        .whenComplete((result, ex) -> {
+                                    CatalogCompactionPrepareUpdateTxBeginTimeResponseBuilder responseBuilder =

Review Comment:
   Please check my comment in CatalogCompactionPrepareUpdateTxBeginTimeResponse. I believe this should be simplified.



##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -401,74 +441,95 @@ private static List<String> missingNodes(Set<String> requiredNodes, Collection<L
         return requiredNodes.stream().filter(not(logicalNodeIds::contains)).collect(Collectors.toList());
     }
 
-    CompletableFuture<Void> propagateTimeToReplicas(long timestamp, Collection<? extends ClusterNode> topologyNodes) {
+    CompletableFuture<Boolean> propagateTimeToReplicas(long timestamp, Collection<? extends ClusterNode> nodes) {
+        CatalogCompactionPrepareUpdateTxBeginTimeRequest request = COMPACTION_MESSAGES_FACTORY
+                .catalogCompactionPrepareUpdateTxBeginTimeRequest()
+                .timestamp(timestamp)
+                .build();
+
+        List<CompletableFuture<?>> responseFutures = new ArrayList<>(nodes.size());
+        AtomicBoolean successFlag = new AtomicBoolean(true);

Review Comment:
   The flag is used only in tests; is that intended? If so, the whole response.success() / successFlag machinery is too much just to check whether all replicas got the new timestamp. Please also check my comment in CatalogCompactionPrepareUpdateTxBeginTimeResponse.



##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/message/CatalogCompactionPrepareUpdateTxBeginTimeResponse.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.compaction.message;
+
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.annotations.Transferable;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Result of tx time propagation to replicas.
+ *
+ * @see CatalogCompactionPrepareUpdateTxBeginTimeRequest
+ */
+@Transferable(CatalogCompactionMessageGroup.PREPARE_TO_UPDATE_TIME_ON_REPLICAS_RESPONSE)
+public interface CatalogCompactionPrepareUpdateTxBeginTimeResponse extends NetworkMessage {

Review Comment:
   Curious, why do you need the response?
   
   - It seems that the boolean result of propagateToReplicasFut is ignored:
   ```
                       CompletableFuture<Boolean> propagateToReplicasFut =
   
                       return CompletableFuture.allOf(
                               catalogCompactionFut,
                               propagateToReplicasFut
                       );
   ```
   
   - It doesn't really matter whether you log a business-logic-related warning on the sender or the recipient side, I guess. You will, of course, still catch and log all messagingService-related exceptions on the sender side.
   - More importantly, the general approach of infinite, continuous `CatalogCompactionPrepareUpdateTxBeginTimeRequest` propagation is robust enough on its own. It basically assumes that some iterations will fail, e.g. because of the gap between physical and logical topology adjustments.
   - The only benefit I can think of is back pressure; however, that doesn't really apply here because these requests are sent rarely.
   
   Thus I believe you could use send (or even weakSend; personally I'd use send here), handle messagingService-related exceptions on the sender side, and handle business-logic-related exceptions on the recipient side. In that case you will not need CatalogCompactionPrepareUpdateTxBeginTimeResponse.
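
   A rough sketch of the fire-and-forget variant described above, under stated assumptions. `OneWaySender` is a hypothetical stand-in for a one-way `send` whose future completes exceptionally when delivery fails. Nodes are represented by their names, and `warn` stands in for the logger. Delivery failures are logged per node on the sender side. The returned future only signals that every send attempt has finished, so neither a response message nor a success flag is needed.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Hypothetical one-way transport: the future fails if the message could not be delivered. */
interface OneWaySender {
    CompletableFuture<Void> send(String nodeName, long timestamp);
}

final class TimePropagationSketch {
    private TimePropagationSketch() {
    }

    /**
     * Sends the timestamp to every node without waiting for a business-level reply.
     * Delivery errors are logged per node here; processing errors are expected to be
     * logged by the recipient itself.
     */
    static CompletableFuture<Void> propagateToNodes(
            long timestamp, List<String> nodeNames, OneWaySender sender, Consumer<String> warn) {
        CompletableFuture<?>[] sends = nodeNames.stream()
                .map(node -> sender.send(node, timestamp)
                        .handle((ignored, ex) -> {
                            if (ex != null) {
                                warn.accept("Failed to send minimum required time [node=" + node + "].");
                            }
                            // Swallow the error: the next periodic iteration simply tries again.
                            return null;
                        }))
                .toArray(CompletableFuture<?>[]::new);

        return CompletableFuture.allOf(sends);
    }
}
```

   One unreachable node therefore produces one warning and never fails the whole round, which matches the "some iterations will fail" assumption above.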



##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -194,6 +200,31 @@ public CompletableFuture<Void> startAsync(ComponentContext componentContext) {
                 return;
             }
 
+            if (message.messageType() == CatalogCompactionMessageGroup.PREPARE_TO_UPDATE_TIME_ON_REPLICAS_REQUEST) {

Review Comment:
   I'd refactor this a bit by adding at least a separate method for each message handler, e.g. handlePrepareToUpdateTimeOnReplicasRequest(), handleMinimumTimesRequest(), etc.
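
   Something along these lines, as a sketch only. `CompactionMessage` is a hypothetical simplified message with a type code and a timestamp. The value of `MINIMUM_TIMES_REQUEST` is made up for the example. The handlers just record that they ran instead of doing real work. The point is that the listener only routes by message type, while each handler owns its own logic.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simplified message: a type code plus a timestamp payload. */
final class CompactionMessage {
    final short type;
    final long timestamp;

    CompactionMessage(short type, long timestamp) {
        this.type = type;
        this.timestamp = timestamp;
    }
}

final class CompactionMessageDispatcher {
    static final short MINIMUM_TIMES_REQUEST = 0; // Hypothetical value, for illustration only.
    static final short PREPARE_TO_UPDATE_TIME_ON_REPLICAS_REQUEST = 2; // Code added in this PR.

    /** Records which handler ran; stands in for the real work. */
    final List<String> handled = new ArrayList<>();

    /** Single entry point registered as the message handler: routing only. */
    void onMessage(CompactionMessage message) {
        switch (message.type) {
            case MINIMUM_TIMES_REQUEST:
                handleMinimumTimesRequest(message);
                break;
            case PREPARE_TO_UPDATE_TIME_ON_REPLICAS_REQUEST:
                handlePrepareToUpdateTimeOnReplicasRequest(message);
                break;
            default:
                throw new IllegalArgumentException("Unexpected message type: " + message.type);
        }
    }

    private void handleMinimumTimesRequest(CompactionMessage message) {
        handled.add("minimumTimes");
    }

    private void handlePrepareToUpdateTimeOnReplicasRequest(CompactionMessage message) {
        handled.add("prepareToUpdate@" + message.timestamp);
    }
}
```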



##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -401,74 +441,95 @@ private static List<String> missingNodes(Set<String> requiredNodes, Collection<L
         return requiredNodes.stream().filter(not(logicalNodeIds::contains)).collect(Collectors.toList());
     }
 
-    CompletableFuture<Void> propagateTimeToReplicas(long timestamp, Collection<? extends ClusterNode> topologyNodes) {
+    CompletableFuture<Boolean> propagateTimeToReplicas(long timestamp, Collection<? extends ClusterNode> nodes) {
+        CatalogCompactionPrepareUpdateTxBeginTimeRequest request = COMPACTION_MESSAGES_FACTORY
+                .catalogCompactionPrepareUpdateTxBeginTimeRequest()
+                .timestamp(timestamp)
+                .build();
+
+        List<CompletableFuture<?>> responseFutures = new ArrayList<>(nodes.size());
+        AtomicBoolean successFlag = new AtomicBoolean(true);
+
+        for (ClusterNode node : nodes) {
+            CompletableFuture<?> fut =
+                    messagingService.invoke(node, request, ANSWER_TIMEOUT)
+                    .thenApply(message -> {
+                        CatalogCompactionPrepareUpdateTxBeginTimeResponse response =
+                                (CatalogCompactionPrepareUpdateTxBeginTimeResponse) message;
+
+                        if (response.error() != null) {
+                            throw new IllegalStateException(format(
+                                    "Remote node responded with error [node={}, error={}]",
+                                    node.name(), response.error()));
+                        }
+
+                        if (!response.success()) {
+                            successFlag.set(false);
+                        }
+
+                        return null;
+                    });
+
+            responseFutures.add(fut);
+        }
+
+        return CompletableFutures.allOf(responseFutures).thenApply(ignore -> successFlag.get());
+    }
+
+    CompletableFuture<Boolean> propagateTimeToReplicasLocal(long txBeginTime) {
         HybridTimestamp nowTs = clockService.now();
 
         return schemaSyncService.waitForMetadataCompleteness(nowTs)
                 .thenComposeAsync(ignore -> {
-                    Map<Integer, Integer> tablesWithPartitions =
-                            catalogManagerFacade.collectTablesWithPartitionsBetween(timestamp, nowTs.longValue());
+                    AtomicBoolean abortFlag = new AtomicBoolean();
+                    Int2IntMap tablesWithPartitions =
+                            catalogManagerFacade.collectTablesWithPartitionsBetween(txBeginTime, nowTs.longValue());
 
-                    Set<String> topologyNodeNames = topologyNodes.stream()
-                            .map(ClusterNode::name)
-                            .collect(Collectors.toSet());
+                    ObjectIterator<Entry> itr = tablesWithPartitions.int2IntEntrySet().iterator();
 
-                    // TODO https://issues.apache.org/jira/browse/IGNITE-22951 Minimize the number of network requests
-                    return CompletableFutures.allOf(tablesWithPartitions.entrySet().stream()
-                            .map(e -> invokeOnReplicas(e.getKey(), e.getValue(), timestamp, nowTs, topologyNodeNames))
-                            .collect(Collectors.toList())
-                    );
+                    return invokeOnLocalReplicas(txBeginTime, itr, abortFlag)
+                            .thenApply(v -> !abortFlag.get());
                 }, executor);
     }
 
-    private CompletableFuture<Void> invokeOnReplicas(
-            int tableId,
-            int partitions,
-            long txBeginTime,
-            HybridTimestamp nowTs,
-            Set<String> logicalTopologyNodes
-    ) {
-        List<TablePartitionId> replicationGroupIds = new ArrayList<>(partitions);
+    private CompletableFuture<Void> invokeOnLocalReplicas(long txBeginTime, ObjectIterator<Entry> tabTtr, AtomicBoolean abortFlag) {
+        if (!tabTtr.hasNext() || abortFlag.get()) {
+            return CompletableFutures.nullCompletedFuture();
+        }
+
+        Entry tableWithPartitions = tabTtr.next();
+        int tableId = tableWithPartitions.getIntKey();
+        int partitions = tableWithPartitions.getIntValue();
+        List<CompletableFuture<?>> partFutures = new ArrayList<>(partitions);
+        HybridTimestamp nowTs = clockService.now();
 
         for (int p = 0; p < partitions; p++) {
-            replicationGroupIds.add(new TablePartitionId(tableId, p));
-        }
+            TablePartitionId tablePartitionId = new TablePartitionId(tableId, p);
 
-        return placementDriver.getAssignments(replicationGroupIds, nowTs)
-                .thenComposeAsync(tokenizedAssignments -> {
-                    assert tokenizedAssignments.size() == replicationGroupIds.size();
+            CompletableFuture<?> fut = placementDriver
+                    .getPrimaryReplica(tablePartitionId, nowTs)
+                    .thenCompose(meta -> {
+                        if (abortFlag.get()) {
+                            return CompletableFutures.nullCompletedFuture();
+                        }
 
-                    List<CompletableFuture<?>> replicaInvokeFutures = new ArrayList<>(partitions);
+                        // If primary is not elected yet - we'll update replication groups on next iteration.
+                        if (meta == null || meta.getLeaseholder() == null) {
+                            LOG.info("Primary replica is not selected yet, aborting minimum required "
+                                    + "time propagation [tablePartitionId={}].", tablePartitionId);
 
-                    for (int p = 0; p < partitions; p++) {
-                        TablePartitionId replicationGroupId = replicationGroupIds.get(p);
-                        TokenizedAssignments tokenizedAssignment = tokenizedAssignments.get(p);
+                            abortFlag.set(true);

Review Comment:
   I don't think it's correct to abort the whole procedure. Let's consider the following scenario:

   | Iteration | Available primary | Timestamps | Comment |
   |---|---|---|---|
   | 1 | p1 | p1=[-1], p2=[-1] | We will not send the message because p2 is unavailable. |
   | 2 | p2 | p1=[-1], p2=[-1] | p2 is available, but p1 isn't. |
   | 3 | p1 | p1=[-1], p2=[-1] | |
   | 4 | p2 | p1=[-1], p2=[-1] | |

   However, if we send the timestamp on each iteration to all available partitions, we will generally make progress in the situation above:

   | Iteration | Available primary | Timestamps | Comment |
   |---|---|---|---|
   | 1 | p1 | p1=[t1], p2=[-1] | min = -1 |
   | 2 | p2 | p1=[t1], p2=[t2] | min = t1, compaction is possible. |
   | 3 | p1 | p1=[t3], p2=[t2] | min = t2 |
   | 4 | p2 | p1=[t3], p2=[t4] | |
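
   A toy model of the two tables, as a sketch only. Partitions are array slots holding the last timestamp they received (`-1` = none), `t1..t4` are encoded as `1..4`, and "compaction is possible up to" is approximated as the minimum over all slots. It reproduces both tables. Aborting whenever any primary is missing never moves the minimum, while sending to every available primary does.

```java
import java.util.Arrays;
import java.util.Set;

/** Toy model: each slot is the last timestamp a partition received, -1 meaning none yet. */
final class PropagationSimulation {
    private PropagationSimulation() {
    }

    /**
     * One iteration. With abortOnUnavailable, nothing is sent unless every primary is
     * available (all-or-nothing); otherwise every available primary gets the timestamp.
     */
    static void iterate(long[] partitionTs, Set<Integer> availablePrimaries, long ts, boolean abortOnUnavailable) {
        if (abortOnUnavailable && availablePrimaries.size() < partitionTs.length) {
            return;
        }
        for (int p : availablePrimaries) {
            partitionTs[p] = ts;
        }
    }

    /** Minimum over all partitions: the time compaction could safely use. */
    static long min(long[] partitionTs) {
        return Arrays.stream(partitionTs).min().orElse(-1);
    }

    /** Alternates availability p1, p2, p1, p2 and returns the minimum after each iteration. */
    static long[] run(boolean abortOnUnavailable) {
        long[] partitionTs = {-1, -1};
        long[] mins = new long[4];
        for (int i = 0; i < 4; i++) {
            Set<Integer> available = Set.of(i % 2); // Index 0 is p1, index 1 is p2.
            iterate(partitionTs, available, i + 1, abortOnUnavailable); // t1..t4 encoded as 1..4.
            mins[i] = min(partitionTs);
        }
        return mins;
    }
}
```

   With these assumptions, `run(true)` yields `[-1, -1, -1, -1]` and `run(false)` yields `[-1, 1, 2, 3]`, i.e. the minimum advances to t1, t2, t3 as in the second table.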



##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -273,28 +304,36 @@ private CompletableFuture<Void> startCompaction(LogicalTopologySnapshot topology
                     CompletableFuture<Boolean> catalogCompactionFut;
 
                     if (catalog == null) {
-                        LOG.info("Catalog compaction skipped, nothing to compact (ts={})", minRequiredTime);
+                        LOG.info("Catalog compaction skipped, nothing to compact [timestamp={}].", minRequiredTime);
 
                         catalogCompactionFut = CompletableFutures.falseCompletedFuture();
                     } else {
                         catalogCompactionFut = tryCompactCatalog(catalog, topologySnapshot).whenComplete((res, ex) -> {
                             if (ex != null) {
-                                LOG.warn("Catalog compaction has failed (timestamp={})", ex, minRequiredTime);
+                                LOG.warn("Catalog compaction has failed [timestamp={}].", ex, minRequiredTime);
                             } else {
                                 if (res) {
-                                    LOG.info("Catalog compaction completed successfully (timestamp={})", minRequiredTime);
+                                    LOG.info("Catalog compaction completed successfully [timestamp={}].", minRequiredTime);
                                 } else {
-                                    LOG.info("Catalog compaction skipped (timestamp={})", minRequiredTime);
+                                    LOG.info("Catalog compaction skipped [timestamp={}].", minRequiredTime);
                                 }
                             }
                         });
                     }
 
-                    CompletableFuture<Void> propagateToReplicasFut =
+                    CompletableFuture<Boolean> propagateToReplicasFut =
                             propagateTimeToReplicas(minActiveTxBeginTime, topologySnapshot.nodes())
-                                    .whenComplete((res, ex) -> {
+                                    .whenComplete((success, ex) -> {
                                         if (ex != null) {
-                                            LOG.warn("Failed to propagate minimum active tx begin time to replicas", ex);
+                                            LOG.warn("Failed to propagate minimum required time to replicas.", ex);
+                                        }
+
+                                        if (!success) {
+                                            LOG.info("Propagation of minimum required time to replicas "

Review Comment:
   So if it's `!success`, we will log `"completed successfully"`?
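
   For reference, a sketch of a completion handler that keeps the three outcomes apart. The messages and the `log` consumer are placeholders, not the PR's actual wording or logger. Using `else if` also matters for correctness. In `whenComplete`, the result is `null` whenever `ex != null`, so a separate `if (!success)` check would unbox `null` on the failure path.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class PropagationLogging {
    private PropagationLogging() {
    }

    /** Attaches logging that distinguishes failure, incomplete propagation and success. */
    static CompletableFuture<Boolean> withOutcomeLogging(CompletableFuture<Boolean> fut, Consumer<String> log) {
        return fut.whenComplete((success, ex) -> {
            if (ex != null) {
                // success is null here, so it must not be unboxed.
                log.accept("Failed to propagate minimum required time to replicas.");
            } else if (!success) {
                log.accept("Propagation of minimum required time to replicas is incomplete.");
            } else {
                log.accept("Propagation of minimum required time to replicas completed successfully.");
            }
        });
    }
}
```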



##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -401,74 +441,95 @@ private static List<String> missingNodes(Set<String> requiredNodes, Collection<L
         return requiredNodes.stream().filter(not(logicalNodeIds::contains)).collect(Collectors.toList());
     }
 
-    CompletableFuture<Void> propagateTimeToReplicas(long timestamp, Collection<? extends ClusterNode> topologyNodes) {
+    CompletableFuture<Boolean> propagateTimeToReplicas(long timestamp, Collection<? extends ClusterNode> nodes) {

Review Comment:
   The method name is a bit misleading: in reality, you spread information about the global minimum tx start time across cluster nodes, not replicas.



-- 
This is an automated message from the Apache Git Service.