xtern commented on code in PR #4137:
URL: https://github.com/apache/ignite-3/pull/4137#discussion_r1708999187


##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -376,10 +406,110 @@ private static List<String> missingNodes(Set<String> requiredNodes, Collection<L
         return requiredNodes.stream().filter(not(logicalNodeIds::contains)).collect(Collectors.toList());
     }
 
+    CompletableFuture<Void> propagateTimeToReplicas(long minimumRequiredTime) {
+        Map<Integer, Integer> tablesWithPartitions = catalogManagerFacade.collectTablesWithPartitionsBetween(
+                minimumRequiredTime,
+                clockService.nowLong()
+        );
+
+        return invokeOnReplicas(
+                tablesWithPartitions.entrySet().iterator(),
+                minimumRequiredTime,
+                clockService.now()
+        );
+    }
+
+    private CompletableFuture<Void> invokeOnReplicas(

Review Comment:
   Thanks. I believe these optimizations will be done in https://issues.apache.org/jira/browse/IGNITE-22951



##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -376,10 +406,110 @@ private static List<String> missingNodes(Set<String> requiredNodes, Collection<L
         return requiredNodes.stream().filter(not(logicalNodeIds::contains)).collect(Collectors.toList());
     }
 
+    CompletableFuture<Void> propagateTimeToReplicas(long minimumRequiredTime) {
+        Map<Integer, Integer> tablesWithPartitions = catalogManagerFacade.collectTablesWithPartitionsBetween(
+                minimumRequiredTime,
+                clockService.nowLong()
+        );
+
+        return invokeOnReplicas(
+                tablesWithPartitions.entrySet().iterator(),
+                minimumRequiredTime,
+                clockService.now()
+        );
+    }
+
+    private CompletableFuture<Void> invokeOnReplicas(
+            Iterator<Map.Entry<Integer, Integer>> tabItr,
+            long txBeginTime,
+            HybridTimestamp nowTs
+    ) {
+        if (!tabItr.hasNext()) {
+            return CompletableFutures.nullCompletedFuture();
+        }
+
+        Entry<Integer, Integer> tableWithPartsCount = tabItr.next();
+        int parts = tableWithPartsCount.getValue();
+
+        List<CompletableFuture<?>> partitionFutures = new ArrayList<>();
+
+        for (int p = 0; p < parts; p++) {
+            TablePartitionId replicationGroupId = new TablePartitionId(tableWithPartsCount.getKey(), p);
+
+            CompletableFuture<Object> fut = placementDriver.getAssignments(replicationGroupId, nowTs)
+                    .thenCompose(tokenizedAssignments -> {
+                        if (tokenizedAssignments == null) {
+                            throwAssignmentsNotReadyException(replicationGroupId);
+                        }
+
+                        Assignment assignment = tokenizedAssignments.nodes().iterator().next();
+
+                        TablePartitionIdMessage partIdMessage = ReplicaMessageUtils.toTablePartitionIdMessage(
+                                REPLICA_MESSAGES_FACTORY,
+                                replicationGroupId
+                        );
+
+                        UpdateMinimumActiveTxBeginTimeReplicaRequest msg = REPLICATION_MESSAGES_FACTORY
+                                .updateMinimumActiveTxBeginTimeReplicaRequest()
+                                .groupId(partIdMessage)
+                                .timestamp(txBeginTime)
+                                .build();
+
+                        return replicaService.invoke(assignment.consistentId(), msg);

Review Comment:
   Fixed. The message is now sent to a node that is present in the logical topology (if the local node is among the assignments, it is sent to the local node).
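
The selection rule described in this comment could look roughly like the sketch below. It is only an illustration: `TargetNodeSketch`, `chooseTarget`, and the use of plain node-name strings are assumptions made for the example, not code from the PR, and the actual fix may differ.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** Hypothetical helper; the class and method names are illustrative, not from the PR. */
final class TargetNodeSketch {
    /**
     * Picks the node to send the request to.
     *
     * @param assignedNodes   consistent IDs of the nodes assigned to the partition (assumed input)
     * @param logicalTopology consistent IDs of the nodes currently in the logical topology (assumed input)
     * @param localNode       consistent ID of the local node
     */
    static Optional<String> chooseTarget(List<String> assignedNodes, Set<String> logicalTopology, String localNode) {
        // Prefer the local node when it is one of the assigned replicas.
        if (assignedNodes.contains(localNode)) {
            return Optional.of(localNode);
        }

        // Otherwise take the first assigned node that is present in the logical topology.
        return assignedNodes.stream().filter(logicalTopology::contains).findFirst();
    }
}
```

An empty result means that none of the assigned nodes is currently reachable through the logical topology; how that case is handled is left open here.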



##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -376,10 +406,110 @@ private static List<String> missingNodes(Set<String> requiredNodes, Collection<L
         return requiredNodes.stream().filter(not(logicalNodeIds::contains)).collect(Collectors.toList());
     }
 
+    CompletableFuture<Void> propagateTimeToReplicas(long minimumRequiredTime) {
+        Map<Integer, Integer> tablesWithPartitions = catalogManagerFacade.collectTablesWithPartitionsBetween(
+                minimumRequiredTime,
+                clockService.nowLong()
+        );
+
+        return invokeOnReplicas(
+                tablesWithPartitions.entrySet().iterator(),
+                minimumRequiredTime,
+                clockService.now()
+        );
+    }
+
+    private CompletableFuture<Void> invokeOnReplicas(
+            Iterator<Map.Entry<Integer, Integer>> tabItr,
+            long txBeginTime,
+            HybridTimestamp nowTs
+    ) {
+        if (!tabItr.hasNext()) {
+            return CompletableFutures.nullCompletedFuture();
+        }
+
+        Entry<Integer, Integer> tableWithPartsCount = tabItr.next();
+        int parts = tableWithPartsCount.getValue();
+
+        List<CompletableFuture<?>> partitionFutures = new ArrayList<>();
+
+        for (int p = 0; p < parts; p++) {
+            TablePartitionId replicationGroupId = new TablePartitionId(tableWithPartsCount.getKey(), p);
+
+            CompletableFuture<Object> fut = placementDriver.getAssignments(replicationGroupId, nowTs)
+                    .thenCompose(tokenizedAssignments -> {
+                        if (tokenizedAssignments == null) {
+                            throwAssignmentsNotReadyException(replicationGroupId);
+                        }
+
+                        Assignment assignment = tokenizedAssignments.nodes().iterator().next();
+
+                        TablePartitionIdMessage partIdMessage = ReplicaMessageUtils.toTablePartitionIdMessage(
+                                REPLICA_MESSAGES_FACTORY,
+                                replicationGroupId
+                        );
+
+                        UpdateMinimumActiveTxBeginTimeReplicaRequest msg = REPLICATION_MESSAGES_FACTORY
+                                .updateMinimumActiveTxBeginTimeReplicaRequest()
+                                .groupId(partIdMessage)
+                                .timestamp(txBeginTime)
+                                .build();
+
+                        return replicaService.invoke(assignment.consistentId(), msg);
+                    });
+
+            partitionFutures.add(fut);
+        }
+
+        return CompletableFutures.allOf(partitionFutures)
+                .thenCompose(ignore -> invokeOnReplicas(tabItr, txBeginTime, nowTs));

Review Comment:
   > How often are you going to trigger minBeginTime move?
   
   Mostly when the LWM changes, I suppose (every 5 minutes?).
   
   > What if compactionCoordinatorNodeName has changed? What will prevent further recursive calls?
   
   Nothing. We decided that this is OK.
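
For readers following the thread, the recursive chaining under discussion can be reduced to the sketch below: all partitions of one table are processed in parallel, and the next table starts only after they complete. `SequentialTablesSketch`, `processTables`, and the `sendToPartition` callback are hypothetical names used for illustration; the sketch uses only JDK classes and, like the reviewed code, has no check that would stop the chain early.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

/** Hypothetical reduction of the chaining pattern; names are illustrative, not from the PR. */
final class SequentialTablesSketch {
    static CompletableFuture<Void> processTables(
            Iterator<Map.Entry<Integer, Integer>> tables,
            BiFunction<Integer, Integer, CompletableFuture<?>> sendToPartition
    ) {
        // The recursion ends when there are no more tables.
        if (!tables.hasNext()) {
            return CompletableFuture.completedFuture(null);
        }

        Map.Entry<Integer, Integer> table = tables.next();
        List<CompletableFuture<?>> futures = new ArrayList<>();

        // Start one request per partition of the current table.
        for (int p = 0; p < table.getValue(); p++) {
            futures.add(sendToPartition.apply(table.getKey(), p));
        }

        // Move on to the next table only after every partition request has completed.
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenCompose(ignore -> processTables(tables, sendToPartition));
    }
}
```

If any partition future fails, the returned future completes exceptionally and the remaining tables are not processed.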



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
