xtern commented on code in PR #4137:
URL: https://github.com/apache/ignite-3/pull/4137#discussion_r1708999187
##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -376,10 +406,110 @@ private static List<String> missingNodes(Set<String>
requiredNodes, Collection<L
return
requiredNodes.stream().filter(not(logicalNodeIds::contains)).collect(Collectors.toList());
}
+ CompletableFuture<Void> propagateTimeToReplicas(long minimumRequiredTime) {
+ Map<Integer, Integer> tablesWithPartitions =
catalogManagerFacade.collectTablesWithPartitionsBetween(
+ minimumRequiredTime,
+ clockService.nowLong()
+ );
+
+ return invokeOnReplicas(
+ tablesWithPartitions.entrySet().iterator(),
+ minimumRequiredTime,
+ clockService.now()
+ );
+ }
+
+ private CompletableFuture<Void> invokeOnReplicas(
Review Comment:
Thanks, I believe that these optimizations will be done in
https://issues.apache.org/jira/browse/IGNITE-22951
##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -376,10 +406,110 @@ private static List<String> missingNodes(Set<String>
requiredNodes, Collection<L
return
requiredNodes.stream().filter(not(logicalNodeIds::contains)).collect(Collectors.toList());
}
+ CompletableFuture<Void> propagateTimeToReplicas(long minimumRequiredTime) {
+ Map<Integer, Integer> tablesWithPartitions =
catalogManagerFacade.collectTablesWithPartitionsBetween(
+ minimumRequiredTime,
+ clockService.nowLong()
+ );
+
+ return invokeOnReplicas(
+ tablesWithPartitions.entrySet().iterator(),
+ minimumRequiredTime,
+ clockService.now()
+ );
+ }
+
+ private CompletableFuture<Void> invokeOnReplicas(
+ Iterator<Map.Entry<Integer, Integer>> tabItr,
+ long txBeginTime,
+ HybridTimestamp nowTs
+ ) {
+ if (!tabItr.hasNext()) {
+ return CompletableFutures.nullCompletedFuture();
+ }
+
+ Entry<Integer, Integer> tableWithPartsCount = tabItr.next();
+ int parts = tableWithPartsCount.getValue();
+
+ List<CompletableFuture<?>> partitionFutures = new ArrayList<>();
+
+ for (int p = 0; p < parts; p++) {
+ TablePartitionId replicationGroupId = new
TablePartitionId(tableWithPartsCount.getKey(), p);
+
+ CompletableFuture<Object> fut =
placementDriver.getAssignments(replicationGroupId, nowTs)
+ .thenCompose(tokenizedAssignments -> {
+ if (tokenizedAssignments == null) {
+
throwAssignmentsNotReadyException(replicationGroupId);
+ }
+
+ Assignment assignment =
tokenizedAssignments.nodes().iterator().next();
+
+ TablePartitionIdMessage partIdMessage =
ReplicaMessageUtils.toTablePartitionIdMessage(
+ REPLICA_MESSAGES_FACTORY,
+ replicationGroupId
+ );
+
+ UpdateMinimumActiveTxBeginTimeReplicaRequest msg =
REPLICATION_MESSAGES_FACTORY
+ .updateMinimumActiveTxBeginTimeReplicaRequest()
+ .groupId(partIdMessage)
+ .timestamp(txBeginTime)
+ .build();
+
+ return
replicaService.invoke(assignment.consistentId(), msg);
Review Comment:
Fixed. The message is now sent to a node that is present in the logical
topology; if the local node is among the assignments, it is sent to the local node.
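A minimal sketch of that selection rule, for illustration only. The class and method names (`TargetNodeSelector`, `selectTarget`) and the use of plain strings for node names are hypothetical and are not taken from the PR; the real code works with `Assignment` objects and the logical topology service.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** Hypothetical helper illustrating the target-node choice described above. */
public class TargetNodeSelector {
    /**
     * Picks the node that should receive the request for one partition.
     *
     * @param assignedNodes   consistent IDs of nodes assigned to the partition (assumed input shape)
     * @param logicalTopology consistent IDs of nodes currently in the logical topology
     * @param localNode       consistent ID of the local node
     * @return the chosen node, or empty if no assigned node is in the logical topology
     */
    static Optional<String> selectTarget(
            List<String> assignedNodes,
            Set<String> logicalTopology,
            String localNode
    ) {
        // Prefer the local node when it hosts a replica of the partition.
        if (assignedNodes.contains(localNode)) {
            return Optional.of(localNode);
        }

        // Otherwise take the first assigned node that is still in the logical topology.
        return assignedNodes.stream()
                .filter(logicalTopology::contains)
                .findFirst();
    }
}
```

With assignments `[a, b]`, logical topology `{b}` and local node `c`, this sketch picks `b`; with local node `a`, it picks `a`.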
##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -376,10 +406,110 @@ private static List<String> missingNodes(Set<String>
requiredNodes, Collection<L
return
requiredNodes.stream().filter(not(logicalNodeIds::contains)).collect(Collectors.toList());
}
+ CompletableFuture<Void> propagateTimeToReplicas(long minimumRequiredTime) {
+ Map<Integer, Integer> tablesWithPartitions =
catalogManagerFacade.collectTablesWithPartitionsBetween(
+ minimumRequiredTime,
+ clockService.nowLong()
+ );
+
+ return invokeOnReplicas(
+ tablesWithPartitions.entrySet().iterator(),
+ minimumRequiredTime,
+ clockService.now()
+ );
+ }
+
+ private CompletableFuture<Void> invokeOnReplicas(
+ Iterator<Map.Entry<Integer, Integer>> tabItr,
+ long txBeginTime,
+ HybridTimestamp nowTs
+ ) {
+ if (!tabItr.hasNext()) {
+ return CompletableFutures.nullCompletedFuture();
+ }
+
+ Entry<Integer, Integer> tableWithPartsCount = tabItr.next();
+ int parts = tableWithPartsCount.getValue();
+
+ List<CompletableFuture<?>> partitionFutures = new ArrayList<>();
+
+ for (int p = 0; p < parts; p++) {
+ TablePartitionId replicationGroupId = new
TablePartitionId(tableWithPartsCount.getKey(), p);
+
+ CompletableFuture<Object> fut =
placementDriver.getAssignments(replicationGroupId, nowTs)
+ .thenCompose(tokenizedAssignments -> {
+ if (tokenizedAssignments == null) {
+
throwAssignmentsNotReadyException(replicationGroupId);
+ }
+
+ Assignment assignment =
tokenizedAssignments.nodes().iterator().next();
+
+ TablePartitionIdMessage partIdMessage =
ReplicaMessageUtils.toTablePartitionIdMessage(
+ REPLICA_MESSAGES_FACTORY,
+ replicationGroupId
+ );
+
+ UpdateMinimumActiveTxBeginTimeReplicaRequest msg =
REPLICATION_MESSAGES_FACTORY
+ .updateMinimumActiveTxBeginTimeReplicaRequest()
+ .groupId(partIdMessage)
+ .timestamp(txBeginTime)
+ .build();
+
+ return
replicaService.invoke(assignment.consistentId(), msg);
+ });
+
+ partitionFutures.add(fut);
+ }
+
+ return CompletableFutures.allOf(partitionFutures)
+ .thenCompose(ignore -> invokeOnReplicas(tabItr, txBeginTime,
nowTs));
Review Comment:
> How often are you going to trigger minBeginTime move?

I suppose mostly when the LWM changes (every 5 min?).

> What if compactionCoordinatorNodeName has changed? What will prevent
> further recursive calls?

Nothing. We decided that this is OK.