JAkutenshi commented on code in PR #3422:
URL: https://github.com/apache/ignite-3/pull/3422#discussion_r1560045650
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java:
##########
@@ -138,8 +140,8 @@ public static UpdateStatus valueOf(int ordinal) {
* @return Future representing result of updating keys in {@code
metaStorageMgr}
*/
public static CompletableFuture<Void> updatePendingAssignmentsKeys(
- CatalogTableDescriptor tableDescriptor,
- TablePartitionId partId,
+ CatalogZoneDescriptor zoneDescriptor,
+ ZonePartitionId partId,
Review Comment:
The `partId` name clashes everywhere in RebalanceUtil. Also, the Javadoc here
and below was not updated; I suggest something like `@param zonePartitionId
Unique aggregate identifier of a partition of a zone.`
I'm ready to commit the fix in the next commit too, if that is appropriate.
##########
modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java:
##########
@@ -136,6 +138,25 @@ public HybridTimestamp getStartTime() {
public HybridTimestamp getExpirationTime() {
return HybridTimestamp.MAX_VALUE;
}
+
+ @Override
+ public Set<ReplicationGroupId> subgroups() {
+ return null;
Review Comment:
Is returning `null` fine here? I see that this is a fake implementation, but it
may lead to unnecessary test failures: `subgroups()` is called 9 times.
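
A minimal sketch of the alternative, assuming callers only iterate over or
query the result: returning an empty set keeps the fake safe to call without
null checks. `FakeLeaseMeta` and the `String` element type are hypothetical
stand-ins for the real fake and for `ReplicationGroupId`.

```java
import java.util.Set;

/** Hypothetical stand-in for the fake lease meta; String replaces ReplicationGroupId. */
class FakeLeaseMeta {
    /** Returns an immutable empty set instead of null, so callers can iterate safely. */
    public Set<String> subgroups() {
        return Set.of();
    }
}
```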
##########
modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngineTest.java:
##########
@@ -295,7 +294,7 @@ void dataNodesTriggersAssignmentsChanging() {
checkAssignments(zoneNodes, RebalanceUtil::pendingPartAssignmentsKey);
- verify(keyValueStorage, timeout(1000).times(8)).invoke(any(), any());
+ verify(keyValueStorage, timeout(1000).times(2)).invoke(any(), any());
Review Comment:
Why was the expected invocation count decreased from 8 to 2?
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/DistributionZoneRebalanceEngine.java:
##########
@@ -375,72 +362,61 @@ private CompletableFuture<Void>
recalculateAssignmentsAndScheduleRebalance(
return nullCompletedFuture();
}
- List<CatalogTableDescriptor> tableDescriptors =
findTablesByZoneId(zoneDescriptor.id(), catalogVersion, catalogService);
-
- return triggerPartitionsRebalanceForAllTables(
+ return triggerPartitionsRebalanceForZone(
causalityToken,
zoneDescriptor,
- dataNodes,
- tableDescriptors
+ dataNodes
);
});
}
- private CompletableFuture<Void> triggerPartitionsRebalanceForAllTables(
+ private CompletableFuture<Void> triggerPartitionsRebalanceForZone(
long revision,
CatalogZoneDescriptor zoneDescriptor,
- Set<String> dataNodes,
- List<CatalogTableDescriptor> tableDescriptors
+ Set<String> dataNodes
) {
- List<CompletableFuture<?>> tableFutures = new
ArrayList<>(tableDescriptors.size());
-
- for (CatalogTableDescriptor tableDescriptor : tableDescriptors) {
- CompletableFuture<?>[] partitionFutures =
RebalanceUtil.triggerAllTablePartitionsRebalance(
- tableDescriptor,
- zoneDescriptor,
- dataNodes,
- revision,
- metaStorageManager
- );
-
- // This set is used to deduplicate exceptions (if there is an
exception from upstream, for instance,
- // when reading from MetaStorage, it will be encountered by every
partition future) to avoid noise
- // in the logs.
- Set<Throwable> unwrappedCauses = ConcurrentHashMap.newKeySet();
-
- for (int partId = 0; partId < partitionFutures.length; partId++) {
- int finalPartId = partId;
-
- partitionFutures[partId].exceptionally(e -> {
- Throwable cause = ExceptionUtils.unwrapCause(e);
-
- if (unwrappedCauses.add(cause)) {
- // The exception is specific to this partition.
- LOG.error(
- "Exception on updating assignments for
[table={}, partition={}]",
- e,
- tableInfo(tableDescriptor), finalPartId
- );
- } else {
- // The exception is from upstream and not specific for
this partition, so don't log the partition index.
- LOG.error(
- "Exception on updating assignments for
[table={}]",
- e,
- tableInfo(tableDescriptor)
- );
- }
+ CompletableFuture<?>[] partitionFutures =
RebalanceUtil.triggerZonePartitionsRebalance(
+ zoneDescriptor,
+ dataNodes,
+ revision,
+ metaStorageManager
+ );
- return null;
- });
- }
+ // This set is used to deduplicate exceptions (if there is an
exception from upstream, for instance,
+ // when reading from MetaStorage, it will be encountered by every
partition future) to avoid noise
+ // in the logs.
+ Set<Throwable> unwrappedCauses = ConcurrentHashMap.newKeySet();
+
+ for (int partId = 0; partId < partitionFutures.length; partId++) {
+ int finalPartId = partId;
+
Review Comment:
This looks redundant: `finalPartId` and `partId` are only read inside the loop
(apart from the `partId++` statement) and are never modified.
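
For context, a sketch of why such a copy usually appears in Java: a lambda may
capture only final or effectively final locals, and a counter updated by
`partId++` is not effectively final. The class and method names below are
hypothetical and only illustrate the language rule.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical demo of capturing a loop counter in a lambda. */
class LoopCaptureDemo {
    static List<Supplier<String>> messages(int partitions) {
        List<Supplier<String>> result = new ArrayList<>();

        for (int partId = 0; partId < partitions; partId++) {
            // partId is modified by partId++, so it is not effectively final and
            // referencing it directly inside the lambda would not compile.
            int finalPartId = partId;

            result.add(() -> "partition=" + finalPartId);
        }

        return result;
    }
}
```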
##########
modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/LeaseTrackerTest.java:
##########
@@ -73,7 +75,8 @@ public void testLeaseCleanup() {
LeaseTracker leaseTracker = new LeaseTracker(
msManager,
mock(ClusterNodeResolver.class),
- new TestClockService(new HybridClockImpl())
+ new TestClockService(new HybridClockImpl()),
+ tablePartitionId -> new
ZonePartitionId(tablePartitionId.tableId(), tablePartitionId.partitionId())
Review Comment:
`zonePartitionId` ?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1459,12 +1467,6 @@ private CompletableFuture<Void> destroyTableLocally(int
tableId) {
InternalTable internalTable = table.internalTable();
int partitions = internalTable.partitions();
- // TODO https://issues.apache.org/jira/browse/IGNITE-18991 Move
assigment manipulations to Distribution zones.
- Set<ByteArray> assignmentKeys = IntStream.range(0, partitions)
- .mapToObj(p -> stablePartAssignmentsKey(new
TablePartitionId(tableId, p)))
- .collect(toSet());
- metaStorageMgr.removeAll(assignmentKeys);
Review Comment:
Why don't we remove the assignment keys now?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1759,75 +1761,91 @@ private CompletableFuture<Void>
handleChangePendingAssignmentEvent(
}
int partId = extractPartitionNumber(pendingAssignmentsEntry.key());
- int tblId = extractTableId(pendingAssignmentsEntry.key(),
PENDING_ASSIGNMENTS_PREFIX);
+ int zoneId = extractZoneId(pendingAssignmentsEntry.key(),
PENDING_ASSIGNMENTS_PREFIX);
- var replicaGrpId = new TablePartitionId(tblId, partId);
+ var zonePartitionId = new ZonePartitionId(zoneId, partId);
// Stable assignments from the meta store, which revision is bounded
by the current pending event.
- Entry stableAssignmentsEntry =
metaStorageMgr.getLocally(stablePartAssignmentsKey(replicaGrpId), revision);
+ Entry stableAssignmentsEntry =
metaStorageMgr.getLocally(stablePartAssignmentsKey(zonePartitionId), revision);
Review Comment:
Off-topic, but wouldn't it be a good idea to move all `get*Assignments*`
methods into, say, an `AssignmentsManager` that wraps the meta storage?
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/negotiation/LeaseAgreement.java:
##########
@@ -116,6 +117,19 @@ public String getRedirectTo() {
return resp != null ? resp.redirectProposal() : null;
}
+ /**
+ * The lease was considered by the set of replication subgroups.
+ *
+ * @return A set of applied groups.
+ */
+ public Set<ReplicationGroupId> applicableFor() {
+ assert ready() : "The method should be invoked only after the
agreement is ready";
Review Comment:
Why not wait on `responseFut.get()` instead of throwing an exception immediately?
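
A rough sketch of the two options, under the assumption that the agreement
wraps a `CompletableFuture` with the response. `AgreementSketch`, both method
names, and the `String` element type are hypothetical; an explicit exception
stands in for the `assert` from the diff, since assertions can be disabled at
run time.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified agreement holder; String replaces ReplicationGroupId. */
class AgreementSketch {
    private final CompletableFuture<Set<String>> responseFut;

    AgreementSketch(CompletableFuture<Set<String>> responseFut) {
        this.responseFut = responseFut;
    }

    /** Fail-fast variant: rejects the call if the response has not arrived yet. */
    Set<String> applicableForNow() {
        if (!responseFut.isDone()) {
            throw new IllegalStateException("The agreement is not ready");
        }

        return responseFut.join();
    }

    /** Waiting variant: blocks the calling thread until the response arrives. */
    Set<String> applicableForBlocking() {
        return responseFut.join();
    }
}
```

The waiting variant never fails on an early call, but it can stall the calling
thread for as long as the response is outstanding.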
##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -630,6 +674,57 @@ public CompletableFuture<Void> start() {
TimeUnit.MILLISECONDS
);
+ scheduledTableLeaseUpdateExecutor.scheduleAtFixedRate(() -> {
+ for (Map.Entry<ZonePartitionId, Set<ReplicationGroupId>>
entry : zonePartIdToTablePartId.entrySet()) {
+ ZonePartitionId repGrp = entry.getKey();
+
+ ReplicaMeta meta =
placementDriver.getLeaseMeta(repGrp);
Review Comment:
Is `ReplicaMeta` in fact the same as `Lease`? What is the difference between a
lease and lease meta? It sounds ambiguous.
##########
modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceRaftGroupEventsListener.java:
##########
@@ -131,6 +133,7 @@ public class RebalanceRaftGroupEventsListener implements
RaftGroupEventsListener
private final ScheduledExecutorService rebalanceScheduler;
/** Zone id. */
+ // TODO: remove this
Review Comment:
Should it be removed now? There are 5 usages: 3 of them are
`RebalanceUtil#tablesCounterKey(zoneId, partId)` calls, which could take
`zonePartId` as a single argument, and the other 2 use `zoneId` in log
messages. In fact, the constructor call already passes `zonePartId.zoneId()`
as the `zoneId` argument.
I'm ready to commit this fix and close this TODO item.
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java:
##########
@@ -210,11 +223,15 @@ public byte[] bytes() {
byte[] leaseholderIdBytes = stringToBytes(leaseholderId);
byte[] proposedCandidateBytes = stringToBytes(proposedCandidate);
byte[] groupIdBytes = toBytes(replicationGroupId);
+ byte[] patsBytes = toBytes(parts);
Review Comment:
Parts?
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java:
##########
@@ -259,6 +376,7 @@ private void awaitPrimaryReplica(
resultFuture.complete(replicaMeta);
}
})
+
Review Comment:
Redundant blank line?
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java:
##########
@@ -1832,8 +1850,10 @@ private Set<Assignment>
getAssignmentsFromMetaStorage(MetaStorageManager metaSto
: Assignments.fromBytes(e.value()).nodes();
}
- private int tableId(Ignite node, String tableName) {
- return (unwrapTableImpl(node.tables().table(tableName))).tableId();
+ private int zoneId(IgniteImpl node, String zoneName) {
+ int zoneId = getZoneIdStrict(node.catalogManager(),
zoneName.toUpperCase(), node.clock().nowLong());
Review Comment:
Wouldn't a plain `return` statement be better? The name `getZoneIdStrict`
already makes it explicit that the result is a zone ID.
##########
modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java:
##########
@@ -354,7 +356,7 @@ public class IgniteImpl implements Ignite {
private final IgniteDeployment deploymentManager;
- private final DistributionZoneManager distributionZoneManager;
+ private DistributionZoneManager distributionZoneManager = null;
Review Comment:
The same as above
##########
modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java:
##########
@@ -596,10 +611,18 @@ public class IgniteImpl implements Ignite {
logicalTopologyService,
raftMgr,
topologyAwareRaftGroupServiceFactory,
- clockService
- );
+ clockService,
+ tablePartId -> {
+ CatalogTableDescriptor tbl =
catalogManager.table(tablePartId.tableId(),
catalogManager.latestCatalogVersion());
- ReplicationConfiguration replicationConfig =
clusterConfigRegistry.getConfiguration(ReplicationConfiguration.KEY);
+ int zoneId = tbl == null ? 2 : tbl.zoneId();
Review Comment:
Why `2`? Shouldn't there be a named constant at least?
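
Something along these lines, as a sketch only: the constant name, the helper,
and the reading of `2` as a fallback zone ID are assumptions, not taken from
the PR.

```java
/** Hypothetical resolver; the constant name and the meaning of 2 are assumptions. */
class ZoneIdResolver {
    /** Assumed meaning of the literal 2 from the diff: a fallback zone ID. */
    static final int FALLBACK_ZONE_ID = 2;

    /** Returns the table's zone ID, or the fallback when the table is unknown (null). */
    static int zoneIdOrFallback(Integer tableZoneId) {
        return tableZoneId == null ? FALLBACK_ZONE_ID : tableZoneId;
    }
}
```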
##########
modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java:
##########
@@ -309,7 +311,7 @@ public class IgniteImpl implements Ignite {
private final ClusterInitializer clusterInitializer;
/** Replica manager. */
- private final ReplicaManager replicaMgr;
+ private ReplicaManager replicaMgr = null;
Review Comment:
Why? It still isn't reassigned anywhere below.
##########
modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java:
##########
@@ -76,6 +76,9 @@ public void testPrimaryReplicaDirectUpdateForExplicitTxn()
throws InterruptedExc
JraftServerImpl server = (JraftServerImpl)
txTestCluster.raftServers.get(leader.consistentId()).server();
var groupId = new TablePartitionId(accounts.tableId(), 0);
+ // TODO:IGNITE-XXXX It need to be don before the message blocking to
update lease subgroups.
+ accounts.recordView().insert(null, makeValue(1, 500.));
Review Comment:
Use `500.0` to avoid mistakes in the future: the tiny trailing dot is easy to
miss or erase.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.