alievmirza commented on code in PR #3422:
URL: https://github.com/apache/ignite-3/pull/3422#discussion_r1565972714
##########
modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java:
##########
@@ -88,4 +90,24 @@ CompletableFuture<ReplicaMeta> awaitPrimaryReplica(
*/
@Nullable
ReplicaMeta currentLease(ReplicationGroupId groupId);
+
+ /**
+ * Gets a cached lease by a zone replication group.
+ *
+ * @param grpId Replication group id.
+ * @return Lease or {@code null}.
+ */
+ ReplicaMeta getLeaseMeta(ReplicationGroupId grpId);
+
+ /**
+ * Tries to update the lease in order to include the new subgroup. The set
of groups will be added to the set of lease subgroups
+ * ({@link ReplicaMeta#subgroups()}) for the specific lease determined by
the zone id.
+ * TODO: When replicas are started by zone, the method is removed.
Review Comment:
Ticket?
##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -416,17 +416,44 @@ private void
onPlacementDriverMessageReceived(NetworkMessage msg0, ClusterNode s
}
try {
- CompletableFuture<Replica> replicaFut =
replicas.computeIfAbsent(msg.groupId(), k -> new CompletableFuture<>());
-
- replicaFut
- .thenCompose(replica ->
replica.processPlacementDriverMessage(msg))
- .whenComplete((response, ex) -> {
- if (ex == null) {
-
clusterNetSvc.messagingService().respond(senderConsistentId, response,
correlationId);
- } else if (!(unwrapCause(ex) instanceof
NodeStoppingException)) {
- LOG.error("Failed to process placement driver
message [msg={}].", ex, msg);
- }
- });
+ Set<ReplicationGroupId> replicationGroupIds =
zonePartIdToTablePartId.get((ZonePartitionId) msg.groupId());
+
+ CompletableFuture<LeaseGrantedMessageResponse>[] futures = new
CompletableFuture[replicationGroupIds.size()];
+
+ int i = 0;
+
+ for (ReplicationGroupId grpId : replicationGroupIds) {
+ CompletableFuture<Replica> replicaFut =
replicas.computeIfAbsent(grpId, k -> new CompletableFuture<>());
+ futures[i++] = replicaFut.thenCompose(replica ->
replica.processPlacementDriverMessage(msg));
+ }
+
+ // 1) PD -> Zones instead of Tables
+ // 2) TX flow from TableId to ZoneId
+
+ // 3) Refactoring for encapsulating raft into Replica entity
+ // 4) One Replica many rafts
+ // 5) One Replica one raft group
+ allOf(futures).whenComplete((responses, ex) -> {
+ // TODO allOf all replicas of the zone from msg.groupId()
(zoneId, partId) from {@code replicas}
Review Comment:
seems like outdated todo
##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -630,6 +674,57 @@ public CompletableFuture<Void> start() {
TimeUnit.MILLISECONDS
);
+ scheduledTableLeaseUpdateExecutor.scheduleAtFixedRate(() -> {
+ for (Map.Entry<ZonePartitionId, Set<ReplicationGroupId>>
entry : zonePartIdToTablePartId.entrySet()) {
Review Comment:
something wrong with the formatting
##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -416,17 +416,44 @@ private void
onPlacementDriverMessageReceived(NetworkMessage msg0, ClusterNode s
}
try {
- CompletableFuture<Replica> replicaFut =
replicas.computeIfAbsent(msg.groupId(), k -> new CompletableFuture<>());
-
- replicaFut
- .thenCompose(replica ->
replica.processPlacementDriverMessage(msg))
- .whenComplete((response, ex) -> {
- if (ex == null) {
-
clusterNetSvc.messagingService().respond(senderConsistentId, response,
correlationId);
- } else if (!(unwrapCause(ex) instanceof
NodeStoppingException)) {
- LOG.error("Failed to process placement driver
message [msg={}].", ex, msg);
- }
- });
+ Set<ReplicationGroupId> replicationGroupIds =
zonePartIdToTablePartId.get((ZonePartitionId) msg.groupId());
+
+ CompletableFuture<LeaseGrantedMessageResponse>[] futures = new
CompletableFuture[replicationGroupIds.size()];
+
+ int i = 0;
+
+ for (ReplicationGroupId grpId : replicationGroupIds) {
+ CompletableFuture<Replica> replicaFut =
replicas.computeIfAbsent(grpId, k -> new CompletableFuture<>());
+ futures[i++] = replicaFut.thenCompose(replica ->
replica.processPlacementDriverMessage(msg));
+ }
+
+ // 1) PD -> Zones instead of Tables
Review Comment:
let's remove this comment
##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -630,6 +674,57 @@ public CompletableFuture<Void> start() {
TimeUnit.MILLISECONDS
);
+ scheduledTableLeaseUpdateExecutor.scheduleAtFixedRate(() -> {
+ for (Map.Entry<ZonePartitionId, Set<ReplicationGroupId>>
entry : zonePartIdToTablePartId.entrySet()) {
+ ZonePartitionId repGrp = entry.getKey();
+
+ ReplicaMeta meta =
placementDriver.getLeaseMeta(repGrp);
+
+ if (meta != null) {
+ HashSet<ReplicationGroupId> diff = new
HashSet<>(entry.getValue());
+ diff.removeAll(meta.subgroups());
+
+ if (meta.getLeaseholderId().equals(localNodeId) &&
!diff.isEmpty()) {
+ LOG.info("New subgroups are found for existing
lease [repGrp={}, subGroups={}]", repGrp, diff);
+
+ try {
+ placementDriver.addSubgroups(repGrp,
meta.getStartTime().longValue(), diff)
+ .thenCompose(unused -> {
+
ArrayList<CompletableFuture<?>> requestToReplicas = new ArrayList<>();
+
+ for (ReplicationGroupId partId
: diff) {
+ EmptyPrimaryReplicaRequest
req = REPLICA_MESSAGES_FACTORY.emptyPrimaryReplicaRequest()
+
.enlistmentConsistencyToken(meta.getStartTime().longValue())
+ .groupId(partId)
+ .build();
+
+ CompletableFuture<Replica>
replicaFut = replicas.get(repGrp);
+
+ if (replicaFut != null) {
+
requestToReplicas.add(replicaFut.thenCompose(
+ replica ->
replica.processRequest(req, localNodeId)));
+ }
+ }
+
+ return
allOf(requestToReplicas.toArray(CompletableFuture[]::new));
+ }).join();
Review Comment:
Is join here required? why not get with timeout?
##########
modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java:
##########
@@ -310,6 +310,6 @@ private int createTestTable(String tableName, String
zoneName) throws Interrupte
return
catalogManager.catalog(catalogManager.latestCatalogVersion()).tables().stream()
.filter(t -> t.name().equals(tableName))
- .findFirst().get().id();
+ .findFirst().get().zoneId();
Review Comment:
`createTestTable` returns `zoneId`, something wrong
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1300,7 +1306,7 @@ private CompletableFuture<Void> createTableLocally(
String tableName = tableDescriptor.name();
int tableId = tableDescriptor.id();
- LOG.trace("Creating local table: name={}, id={}, token={}",
tableDescriptor.name(), tableDescriptor.id(), causalityToken);
+ LOG.info("Creating local table: name={}, id={}, token={}",
tableDescriptor.name(), tableDescriptor.id(), causalityToken);
Review Comment:
let's return to trace
##########
modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java:
##########
@@ -76,6 +76,9 @@ public void testPrimaryReplicaDirectUpdateForExplicitTxn()
throws InterruptedExc
JraftServerImpl server = (JraftServerImpl)
txTestCluster.raftServers.get(leader.consistentId()).server();
var groupId = new TablePartitionId(accounts.tableId(), 0);
+ // TODO:IGNITE-XXXX It need to be don before the message blocking to
update lease subgroups.
Review Comment:
please add details
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1374,6 +1380,8 @@ private CompletableFuture<Void> createTableLocally(
}
return allOf(localPartsUpdateFuture,
tablesByIdFuture).thenComposeAsync(ignore -> inBusyLock(busyLock, () -> {
+ LOG.info("Table storage created " + tableId);
Review Comment:
log formatting
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]