sanpwc commented on code in PR #4138:
URL: https://github.com/apache/ignite-3/pull/4138#discussion_r1698840074
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java:
##########
@@ -324,15 +324,6 @@ public static CompletableFuture<?>[] triggerAllTablePartitionsRebalance(
/** Key prefix for switch append assignments. */
public static final String ASSIGNMENTS_SWITCH_APPEND_PREFIX = "assignments.switch.append.";
Review Comment:
Curious why we use APPEND_PREFIX only in RebalanceUtil itself (and
switchAppendKey is package-private), yet register a watch for
REDUCE_PREFIX in TableManager. That's suspicious. Not related to your PR, though.
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java:
##########
@@ -407,31 +310,22 @@ static void doStableKeySwitch(
pendingPartAssignmentsKey,
stablePartAssignmentsKey,
switchReduceKey,
- switchAppendKey,
- stableChangeTriggerKey
+ switchAppendKey
)
).get();
- // TODO: IGNITE-22661 Potentially unsafe to use the latest catalog version, as the tables might not already present
- // in the catalog. Better to take the version from Assignments.
- int catalogVersion = catalogService.latestCatalogVersion();
-
- Set<Assignment> calculatedAssignments =
- calculateAssignments(tablePartitionId, catalogService, distributionZoneManager, catalogVersion).get();
+ Set<Assignment> calculatedAssignments = calculateAssignmentsFn.apply(tablePartitionId).get();
Review Comment:
You still have a calculateAssignments method in
DistributionZoneRebalanceEngine that has zero usages.
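For context, the diff above suggests the listener now receives the calculation as an injected function instead of calling a static helper, which would explain why the old method is unused. A minimal sketch of that shape, under assumptions: the class name `StableSwitchSketch` is hypothetical, and `Integer`/`String` stand in for `TablePartitionId`/`Assignment`. This is not the actual Ignite code.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical, simplified stand-in for the listener; not the real Ignite class. */
final class StableSwitchSketch {
    /** Injected calculation: partition id -> future set of assignments (types simplified). */
    private final Function<Integer, CompletableFuture<Set<String>>> calculateAssignmentsFn;

    StableSwitchSketch(Function<Integer, CompletableFuture<Set<String>>> calculateAssignmentsFn) {
        this.calculateAssignmentsFn = calculateAssignmentsFn;
    }

    /** Same call shape as `calculateAssignmentsFn.apply(tablePartitionId).get()` in the diff. */
    Set<String> calculatedAssignments(int partitionId) throws Exception {
        return calculateAssignmentsFn.apply(partitionId).get();
    }

    public static void main(String[] args) throws Exception {
        // The caller (TableManager in the PR) decides how assignments are computed.
        StableSwitchSketch sketch = new StableSwitchSketch(
                partId -> CompletableFuture.completedFuture(Set.of("node-" + partId)));

        System.out.println(sketch.calculatedAssignments(1));
    }
}
```

With this shape the listener no longer needs the catalog service or the zone manager, so any helper that took those as parameters can go once nothing calls it.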
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1180,8 +1168,8 @@ private RaftGroupEventsListener createRaftGroupEventsListener(int zoneId, TableP
replicaGrpId,
busyLock,
partitionMover,
- rebalanceScheduler,
- zoneId
Review Comment:
Method parameter zoneId is no longer used.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1225,6 +1213,28 @@ private PartitionReplicaListener createReplicaListener(
);
}
+ private CompletableFuture<Set<Assignment>> calculateAssignments(TablePartitionId tablePartitionId) {
+ // TODO: IGNITE-22661 Potentially unsafe to use the latest catalog version, as the tables might not already present
Review Comment:
Nice that you've moved the TODO here. Initially I thought that you had just
removed it entirely.
##########
modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java:
##########
@@ -158,98 +148,6 @@ void assignmentsChangingOnNodeLeaveNodeJoin() throws Exception {
);
}
- /**
- * Tests functionality of tables counters for partition rebalances per zone. Every partition from zone has tables counter when pending
- * has changed, and we decrement it every time raft configuration for table's partition is rebalanced, and after this counter equals to
- * zero rewrite stable.
- *
- * @throws Exception If failed
- */
- @Test
- void testRebalanceTablesCounterForZone() throws Exception {
- cluster.startAndInit(3);
-
- String zoneName = "ZONE";
-
- createZone(zoneName, 3, 3);
-
- Set<Integer> tableIds = new HashSet<>();
-
- tableIds.add(createTestTable("TEST1", zoneName));
- tableIds.add(createTestTable("TEST2", zoneName));
- tableIds.add(createTestTable("TEST3", zoneName));
-
- Set<String> allNodes = cluster.runningNodes().map(IgniteImpl::name).collect(Collectors.toSet());
-
- for (Integer tableId : tableIds) {
- waitForStableAssignmentsInMetastore(allNodes, tableId);
- }
-
- AtomicBoolean dropMessages = new AtomicBoolean(true);
-
- cluster.runningNodes().forEach(
- n -> n.dropMessages((nodeName, msg) -> msg instanceof ChangePeersAsyncRequest && dropMessages.get())
- );
-
- alterZone(zoneName, 2);
-
- CatalogManager catalogManager = cluster.aliveNode().catalogManager();
-
- int zoneId = catalogManager.catalog(catalogManager.latestCatalogVersion()).zone(zoneName).id();
-
- waitForTablesCounterInMetastore(3, zoneId, 0);
- waitForTablesCounterInMetastore(3, zoneId, 1);
- waitForTablesCounterInMetastore(3, zoneId, 2);
-
- dropMessages.set(false);
-
- for (Integer tableId : tableIds) {
- waitForStableAssignmentsInMetastore(2, tableId);
- }
-
- waitForTablesCounterInMetastore(0, zoneId, 0);
- waitForTablesCounterInMetastore(0, zoneId, 1);
- waitForTablesCounterInMetastore(0, zoneId, 2);
- }
-
- @Test
- void testRebalanceTablesCounterForZonePrevCatalogVersion() throws Exception {
- cluster.startAndInit(3);
-
- String zoneName = "ZONE";
-
- createZone(zoneName, 3, 3);
-
- Set<Integer> tableIds = new HashSet<>();
-
- tableIds.add(createTestTable("TEST1", zoneName));
-
- Set<String> allNodes =
cluster.runningNodes().map(IgniteImpl::name).collect(Collectors.toSet());
-
- for (Integer tableId : tableIds) {
- waitForStableAssignmentsInMetastore(allNodes, tableId);
- }
-
- // Block low watermark change with an open ro tx.
- cluster.aliveNode().transactions().begin(new TransactionOptions().readOnly(true));
-
- alterZone(zoneName, 2);
-
- dropTestTable("TEST1");
Review Comment:
Same for dropTestTable. Zero usages.
##########
modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java:
##########
@@ -158,98 +148,6 @@ void assignmentsChangingOnNodeLeaveNodeJoin() throws Exception {
);
}
- alterZone(zoneName, 2);
Review Comment:
alterZone has zero usages. Let's also remove it.