sanpwc commented on code in PR #4240:
URL: https://github.com/apache/ignite-3/pull/4240#discussion_r1727148569
##########
modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java:
##########
@@ -401,74 +433,35 @@ private static List<String> missingNodes(Set<String>
requiredNodes, Collection<L
return
requiredNodes.stream().filter(not(logicalNodeIds::contains)).collect(Collectors.toList());
}
- CompletableFuture<Void> propagateTimeToReplicas(long timestamp,
Collection<? extends ClusterNode> topologyNodes) {
- HybridTimestamp nowTs = clockService.now();
-
- return schemaSyncService.waitForMetadataCompleteness(nowTs)
- .thenComposeAsync(ignore -> {
- Map<Integer, Integer> tablesWithPartitions =
-
catalogManagerFacade.collectTablesWithPartitionsBetween(timestamp,
nowTs.longValue());
-
- Set<String> topologyNodeNames = topologyNodes.stream()
- .map(ClusterNode::name)
- .collect(Collectors.toSet());
-
- // TODO https://issues.apache.org/jira/browse/IGNITE-22951
Minimize the number of network requests
- return
CompletableFutures.allOf(tablesWithPartitions.entrySet().stream()
- .map(e -> invokeOnReplicas(e.getKey(),
e.getValue(), timestamp, nowTs, topologyNodeNames))
- .collect(Collectors.toList())
- );
- }, executor);
- }
-
- private CompletableFuture<Void> invokeOnReplicas(
- int tableId,
- int partitions,
- long txBeginTime,
- HybridTimestamp nowTs,
- Set<String> logicalTopologyNodes
- ) {
- List<TablePartitionId> replicationGroupIds = new
ArrayList<>(partitions);
-
- for (int p = 0; p < partitions; p++) {
- replicationGroupIds.add(new TablePartitionId(tableId, p));
+ private CompletableFuture<Void> invokeOnLocalReplicas(long txBeginTime,
ObjectIterator<Entry> tabTtr) {
+ if (!tabTtr.hasNext()) {
+ return CompletableFutures.nullCompletedFuture();
}
- return placementDriver.getAssignments(replicationGroupIds, nowTs)
- .thenComposeAsync(tokenizedAssignments -> {
- assert tokenizedAssignments.size() ==
replicationGroupIds.size();
-
- List<CompletableFuture<?>> replicaInvokeFutures = new
ArrayList<>(partitions);
-
- for (int p = 0; p < partitions; p++) {
- TablePartitionId replicationGroupId =
replicationGroupIds.get(p);
- TokenizedAssignments tokenizedAssignment =
tokenizedAssignments.get(p);
+ Entry tableWithPartitions = tabTtr.next();
+ int tableId = tableWithPartitions.getIntKey();
+ int partitions = tableWithPartitions.getIntValue();
+ List<CompletableFuture<?>> partFutures = new ArrayList<>(partitions);
+ HybridTimestamp nowTs = clockService.now();
- if (tokenizedAssignment == null) {
-
throwAssignmentsNotReadyException(replicationGroupId);
+ for (int p = 0; p < partitions; p++) {
+ TablePartitionId tablePartitionId = new TablePartitionId(tableId,
p);
+
+ CompletableFuture<?> fut = placementDriver
+ .getPrimaryReplica(tablePartitionId, nowTs)
+ .thenCompose(meta -> {
+ // If primary is not elected yet - we'll update
replication groups on next iteration.
+ if (meta == null || meta.getLeaseholder() == null) {
+ return CompletableFutures.nullCompletedFuture();
}
- Set<String> assignments =
tokenizedAssignment.nodes().stream()
-
.map(Assignment::consistentId).collect(Collectors.toSet());
-
- String targetNodeName;
-
- if (assignments.contains(localNodeName)) {
- targetNodeName = localNodeName;
- } else {
- targetNodeName = assignments.stream()
- .filter(logicalTopologyNodes::contains)
- .findAny()
- .orElseThrow(() -> new
IllegalStateException("Current topology doesn't include assignment nodes "
- + "(assignments=" +
tokenizedAssignment.nodes()
- + ", topology=" +
logicalTopologyNodes
- + ", replication group=" +
replicationGroupId + ").")
- );
+ if (!localNodeName.equals(meta.getLeaseholder())) {
Review Comment:
Please check nodeId for equality instead of a node name here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]