xtern commented on code in PR #4137:
URL: https://github.com/apache/ignite-3/pull/4137#discussion_r1709000167


##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -376,10 +406,110 @@ private static List<String> missingNodes(Set<String> requiredNodes, Collection<L
         return requiredNodes.stream().filter(not(logicalNodeIds::contains)).collect(Collectors.toList());
     }
 
+    CompletableFuture<Void> propagateTimeToReplicas(long minimumRequiredTime) {
+        Map<Integer, Integer> tablesWithPartitions = catalogManagerFacade.collectTablesWithPartitionsBetween(
+                minimumRequiredTime,
+                clockService.nowLong()
+        );
+
+        return invokeOnReplicas(
+                tablesWithPartitions.entrySet().iterator(),
+                minimumRequiredTime,
+                clockService.now()
+        );
+    }
+
+    private CompletableFuture<Void> invokeOnReplicas(
+            Iterator<Map.Entry<Integer, Integer>> tabItr,
+            long txBeginTime,
+            HybridTimestamp nowTs
+    ) {
+        if (!tabItr.hasNext()) {
+            return CompletableFutures.nullCompletedFuture();
+        }
+
+        Entry<Integer, Integer> tableWithPartsCount = tabItr.next();
+        int parts = tableWithPartsCount.getValue();
+
+        List<CompletableFuture<?>> partitionFutures = new ArrayList<>();
+
+        for (int p = 0; p < parts; p++) {
+            TablePartitionId replicationGroupId = new TablePartitionId(tableWithPartsCount.getKey(), p);
+
+            CompletableFuture<Object> fut = placementDriver.getAssignments(replicationGroupId, nowTs)
+                    .thenCompose(tokenizedAssignments -> {
+                        if (tokenizedAssignments == null) {
+                            throwAssignmentsNotReadyException(replicationGroupId);
+                        }
+
+                        Assignment assignment = tokenizedAssignments.nodes().iterator().next();
+
+                        TablePartitionIdMessage partIdMessage = ReplicaMessageUtils.toTablePartitionIdMessage(
+                                REPLICA_MESSAGES_FACTORY,
+                                replicationGroupId
+                        );
+
+                        UpdateMinimumActiveTxBeginTimeReplicaRequest msg = REPLICATION_MESSAGES_FACTORY
+                                .updateMinimumActiveTxBeginTimeReplicaRequest()
+                                .groupId(partIdMessage)
+                                .timestamp(txBeginTime)
+                                .build();
+
+                        return replicaService.invoke(assignment.consistentId(), msg);
+                    });
+
+            partitionFutures.add(fut);
+        }
+
+        return CompletableFutures.allOf(partitionFutures)

Review Comment:
   > why it's safe to silently ignore all types of exceptions
   
   We don't ignore exceptions here. Any exception is logged at the call site:
   
   ```
                       CompletableFuture<Void> propagateToReplicasFut = propagateTimeToReplicas(minActiveTxBeginTime)
                               .whenComplete((res, ex) -> {
                                   if (ex != null) {
                                       LOG.warn("Failed to propagate minimum active tx begin time to replicas", ex);
                                   }
                               });
   ```
   
   > why recursive call is considered as a proper failover
   
   We don't have any failover here :thinking: If any exception occurs, the process is aborted and the exception is logged.
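
To illustrate the point about logging, here is a minimal standalone sketch of how `whenComplete` behaves on a failed future: the callback observes the exception, and the returned future still completes exceptionally, so nothing is swallowed. This is not the Ignite code. `propagate`, `propagateAndLog`, and `WARNINGS` are hypothetical stand-ins for `propagateTimeToReplicas`, its call site, and `LOG.warn`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class WhenCompleteSketch {
    /** Hypothetical stand-in for LOG.warn: collects the warning messages. */
    static final List<String> WARNINGS = new ArrayList<>();

    /** Hypothetical stand-in for propagateTimeToReplicas: optionally fails. */
    static CompletableFuture<Void> propagate(boolean shouldFail) {
        return shouldFail
                ? CompletableFuture.failedFuture(new IllegalStateException("replica unavailable"))
                : CompletableFuture.completedFuture(null);
    }

    /** Mirrors the call-site pattern: log on failure, do not recover. */
    static CompletableFuture<Void> propagateAndLog(boolean shouldFail) {
        return propagate(shouldFail).whenComplete((res, ex) -> {
            if (ex != null) {
                WARNINGS.add("Failed to propagate minimum active tx begin time to replicas: " + ex);
            }
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Void> fut = propagateAndLog(true);

        // whenComplete does not handle the failure, it only observes it.
        System.out.println("completed exceptionally: " + fut.isCompletedExceptionally());
        System.out.println("warnings logged: " + WARNINGS.size());
    }
}
```

Unlike `handle` or `exceptionally`, `whenComplete` cannot replace the outcome, which is why the failure is still visible to anything chained after the logging step.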


