korlov42 commented on code in PR #4240:
URL: https://github.com/apache/ignite-3/pull/4240#discussion_r1724483369


##########
modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunnerSelfTest.java:
##########
@@ -545,4 +586,46 @@ private CatalogCompactionRunner createRunner(
 
         return runner;
     }
+
+    static class DummyTestPrimaryAffinity implements IntFunction<LogicalNode> {

Review Comment:
   nitpicking: usually it's either `Dummy-` or `Test-`, because both prefixes 
denotes this is not a production entity



##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -401,74 +441,94 @@ private static List<String> missingNodes(Set<String> 
requiredNodes, Collection<L
         return 
requiredNodes.stream().filter(not(logicalNodeIds::contains)).collect(Collectors.toList());
     }
 
-    CompletableFuture<Void> propagateTimeToReplicas(long timestamp, 
Collection<? extends ClusterNode> topologyNodes) {
+    CompletableFuture<Boolean> propagateTimeToReplicas(long timestamp, 
Collection<? extends ClusterNode> nodes) {
+        CatalogCompactionPrepareUpdateTxBeginTimeRequest request = 
COMPACTION_MESSAGES_FACTORY
+                .catalogCompactionPrepareUpdateTxBeginTimeRequest()
+                .timestamp(timestamp)
+                .build();
+
+        List<CompletableFuture<?>> responseFutures = new 
ArrayList<>(nodes.size());
+        AtomicBoolean successFlag = new AtomicBoolean(true);
+
+        for (ClusterNode node : nodes) {
+            CompletableFuture<?> fut =
+                    messagingService.invoke(node, request, ANSWER_TIMEOUT)
+                    .thenApply(message -> {
+                        CatalogCompactionPrepareUpdateTxBeginTimeResponse 
response =
+                                
(CatalogCompactionPrepareUpdateTxBeginTimeResponse) message;
+
+                        if (response.error() != null) {
+                            throw new IllegalStateException(format(
+                                    "Remote node responded with error 
[node={}, error={}]",
+                                    node.name(), response.error()));
+                        }
+
+                        if (!response.success()) {
+                            successFlag.set(false);
+                        }
+
+                        return null;
+                    });
+
+            responseFutures.add(fut);
+        }
+
+        return CompletableFutures.allOf(responseFutures).thenApply(ignore -> 
successFlag.get());
+    }
+
+    CompletableFuture<Boolean> propagateTimeToReplicasLocal(long txBeginTime) {
         HybridTimestamp nowTs = clockService.now();
 
         return schemaSyncService.waitForMetadataCompleteness(nowTs)
                 .thenComposeAsync(ignore -> {
-                    Map<Integer, Integer> tablesWithPartitions =
-                            
catalogManagerFacade.collectTablesWithPartitionsBetween(timestamp, 
nowTs.longValue());
+                    AtomicBoolean abortFlag = new AtomicBoolean();
+                    Int2IntMap tablesWithPartitions =
+                            
catalogManagerFacade.collectTablesWithPartitionsBetween(txBeginTime, 
nowTs.longValue());
 
-                    Set<String> topologyNodeNames = topologyNodes.stream()
-                            .map(ClusterNode::name)
-                            .collect(Collectors.toSet());
+                    ObjectIterator<Entry> itr = 
tablesWithPartitions.int2IntEntrySet().iterator();
 
-                    // TODO https://issues.apache.org/jira/browse/IGNITE-22951 
Minimize the number of network requests
-                    return 
CompletableFutures.allOf(tablesWithPartitions.entrySet().stream()
-                            .map(e -> invokeOnReplicas(e.getKey(), 
e.getValue(), timestamp, nowTs, topologyNodeNames))
-                            .collect(Collectors.toList())
-                    );
+                    return invokeOnLocalReplicas(txBeginTime, itr, abortFlag)
+                            .thenApply(v -> !abortFlag.get());
                 }, executor);
     }
 
-    private CompletableFuture<Void> invokeOnReplicas(
-            int tableId,
-            int partitions,
-            long txBeginTime,
-            HybridTimestamp nowTs,
-            Set<String> logicalTopologyNodes
-    ) {
-        List<TablePartitionId> replicationGroupIds = new 
ArrayList<>(partitions);
+    private CompletableFuture<Void> invokeOnLocalReplicas(long txBeginTime, 
ObjectIterator<Entry> tabTtr, AtomicBoolean abortFlag) {
+        if (!tabTtr.hasNext() || abortFlag.get()) {
+            return CompletableFutures.nullCompletedFuture();
+        }
+
+        Entry tableWithPartitions = tabTtr.next();
+        int tableId = tableWithPartitions.getIntKey();
+        int partitions = tableWithPartitions.getIntValue();
+        List<CompletableFuture<?>> partFutures = new ArrayList<>(partitions);
 
         for (int p = 0; p < partitions; p++) {
-            replicationGroupIds.add(new TablePartitionId(tableId, p));
-        }
+            TablePartitionId tablePartitionId = new TablePartitionId(tableId, 
p);
 
-        return placementDriver.getAssignments(replicationGroupIds, nowTs)
-                .thenComposeAsync(tokenizedAssignments -> {
-                    assert tokenizedAssignments.size() == 
replicationGroupIds.size();
+            CompletableFuture<?> fut = placementDriver
+                    .getPrimaryReplica(tablePartitionId, clockService.now())

Review Comment:
   does it make sense to acquire time once and use it for every partition?
   



##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -290,12 +321,20 @@ private CompletableFuture<Void> 
startCompaction(LogicalTopologySnapshot topology
                         });
                     }
 
-                    CompletableFuture<Void> propagateToReplicasFut =
+                    CompletableFuture<Boolean> propagateToReplicasFut =
                             propagateTimeToReplicas(minActiveTxBeginTime, 
topologySnapshot.nodes())
-                                    .whenComplete((res, ex) -> {
+                                    .whenComplete((success, ex) -> {
                                         if (ex != null) {
                                             LOG.warn("Failed to propagate 
minimum active tx begin time to replicas", ex);
                                         }
+
+                                        if (!success) {
+                                            LOG.info("Propagation of minimum 
required time to replicas "

Review Comment:
   Parameters must be in square brackets, sentence must end with a period `.`. 
Please check this everywhere within this patch.
   
   At line 328 log refers to timestamp as `minimum active tx begin time`, but 
in newly introduced lines the same value called `minimum required time`. Let's 
stick to either of this, but we have to pick one (personally, I prefer the 
latter). Also, it's better to check other places too



##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -194,6 +200,31 @@ public CompletableFuture<Void> startAsync(ComponentContext 
componentContext) {
                 return;
             }
 
+            if (message.messageType() == 
CatalogCompactionMessageGroup.PREPARE_TO_UPDATE_TIME_ON_REPLICAS_REQUEST) {
+                assert correlationId != null;
+
+                long txBeginTime = 
((CatalogCompactionPrepareUpdateTxBeginTimeRequest) message).timestamp();
+
+                propagateTimeToReplicasLocal(txBeginTime)
+                        .whenComplete((result, ex) -> {
+                                    
CatalogCompactionPrepareUpdateTxBeginTimeResponseBuilder responseBuilder =
+                                            
COMPACTION_MESSAGES_FACTORY.catalogCompactionPrepareUpdateTxBeginTimeResponse();
+
+                                    if (ex != null) {
+                                        responseBuilder.error(ex.toString());
+
+                                        LOG.warn("Failed to update minimum 
active tx begin time on replicas", ex);

Review Comment:
   ```suggestion
                                           LOG.warn("Failed to update minimum 
active tx begin time on replicas.", ex);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to