denis-chudov commented on code in PR #2095:
URL: https://github.com/apache/ignite-3/pull/2095#discussion_r1279105584
##########
modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java:
##########
@@ -465,23 +468,30 @@ public static <T> void assertValueInStorage(
*/
public static void assertDataNodesFromManager(
DistributionZoneManager distributionZoneManager,
+ Supplier<Long> causalityToken,
int zoneId,
@Nullable Set<LogicalNode> expectedValue,
long timeoutMillis
- ) throws InterruptedException {
+ ) throws InterruptedException, ExecutionException, TimeoutException {
Set<String> expectedValueNames =
expectedValue == null ? null :
expectedValue.stream().map(ClusterNode::name).collect(Collectors.toSet());
boolean success = waitForCondition(() -> {
- // TODO: https://issues.apache.org/jira/browse/IGNITE-19506 change
this to the causality versioned call to dataNodes.
- Set<String> dataNodes = distributionZoneManager.dataNodes(zoneId);
+ Set<String> dataNodes = null;
+ try {
+ dataNodes =
distributionZoneManager.dataNodes(causalityToken.get(), zoneId).get(5,
TimeUnit.SECONDS);
+ } catch (Exception e) {
+ // Ignore
+ }
return Objects.equals(dataNodes, expectedValueNames);
}, timeoutMillis);
// We do a second check simply to print a nice error message in case
the condition above is not achieved.
if (!success) {
- Set<String> dataNodes = distributionZoneManager.dataNodes(zoneId);
+ Set<String> dataNodes = null;
+
+ dataNodes =
distributionZoneManager.dataNodes(causalityToken.get(), zoneId).get(5,
TimeUnit.SECONDS);
Review Comment:
can be merged into one line
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -598,69 +598,69 @@ private CompletableFuture<?>
onTableCreate(ConfigurationNotificationEvent<TableV
CatalogTableDescriptor tableDescriptor =
toTableDescriptor(ctx.newValue());
CatalogZoneDescriptor zoneDescriptor =
getZoneDescriptor(tableDescriptor.zoneId());
- List<Set<Assignment>> assignments;
+ CompletableFuture<List<Set<Assignment>>> assignmentsFuture;
int tableId = tableDescriptor.id();
// Check if the table already has assignments in the vault.
// So, it means, that it is a recovery process and we should use
the vault assignments instead of calculation for the new ones.
if (partitionAssignments(vaultManager, tableId, 0) != null) {
- assignments = tableAssignments(vaultManager, tableId,
zoneDescriptor.partitions());
+ assignmentsFuture =
completedFuture(tableAssignments(vaultManager, tableId,
zoneDescriptor.partitions()));
} else {
- assignments = AffinityUtils.calculateAssignments(
- // TODO:
https://issues.apache.org/jira/browse/IGNITE-19425 use data nodes from
DistributionZoneManager instead.
-
baselineMgr.nodes().stream().map(ClusterNode::name).collect(toList()),
- zoneDescriptor.partitions(),
- zoneDescriptor.replicas()
- );
+ assignmentsFuture =
distributionZoneManager.dataNodes(ctx.storageRevision(),
tableDescriptor.zoneId())
+ .thenApply(dataNodes ->
AffinityUtils.calculateAssignments(
+ dataNodes,
+ zoneDescriptor.partitions(),
+ zoneDescriptor.replicas()
+ ));
}
- assert !assignments.isEmpty() : "Couldn't create the table with
empty assignments.";
-
CompletableFuture<?> createTableFut = createTableLocally(
ctx.storageRevision(),
tableDescriptor,
zoneDescriptor,
- assignments
+ assignmentsFuture
).whenComplete((v, e) -> {
if (e == null) {
for (var listener : assignmentsChangeListeners) {
listener.accept(this);
}
}
- });
-
- // TODO: https://issues.apache.org/jira/browse/IGNITE-19506
Probably should be reworked so that
- // the future is returned along with createTableFut. Right now it
will break some tests.
- writeTableAssignmentsToMetastore(tableId, assignments);
+ }).thenCompose(ignored ->
writeTableAssignmentsToMetastore(tableId, assignmentsFuture));
return createTableFut;
} finally {
busyLock.leaveBusy();
}
}
- private CompletableFuture<Boolean> writeTableAssignmentsToMetastore(int
tableId, List<Set<Assignment>> assignments) {
- assert !assignments.isEmpty();
+ private CompletableFuture<Boolean> writeTableAssignmentsToMetastore(
+ int tableId,
+ CompletableFuture<List<Set<Assignment>>> assignmentsFuture
+ ) {
+ return assignmentsFuture.thenCompose(newAssignments -> {
+ assert !newAssignments.isEmpty();
- List<Operation> partitionAssignments = new
ArrayList<>(assignments.size());
+ List<Operation> partitionAssignments = new
ArrayList<>(newAssignments.size());
- for (int i = 0; i < assignments.size(); i++) {
- partitionAssignments.add(put(
- stablePartAssignmentsKey(
- new TablePartitionId(tableId, i)),
- ByteUtils.toBytes(assignments.get(i))));
- }
+ for (int i = 0; i < newAssignments.size(); i++) {
+ partitionAssignments.add(put(
+ stablePartAssignmentsKey(
+ new TablePartitionId(tableId, i)),
+ ByteUtils.toBytes(newAssignments.get(i))));
+ }
- Condition condition = Conditions.notExists(new
ByteArray(partitionAssignments.get(0).key()));
+ Condition condition = Conditions.notExists(new
ByteArray(partitionAssignments.get(0).key()));
- return metaStorageMgr
- .invoke(condition, partitionAssignments,
Collections.emptyList())
- .exceptionally(e -> {
- LOG.error("Couldn't write assignments to metastore", e);
+ return metaStorageMgr
+ .invoke(condition, partitionAssignments,
Collections.emptyList())
+ .exceptionally(e -> {
+ LOG.error("Couldn't write assignments to metastore",
e);
+
+ return null;
+ });
+ });
Review Comment:
pls remove empty line
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]