alievmirza commented on code in PR #3422:
URL: https://github.com/apache/ignite-3/pull/3422#discussion_r1565972714


##########
modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java:
##########
@@ -88,4 +90,24 @@ CompletableFuture<ReplicaMeta> awaitPrimaryReplica(
      */
     @Nullable
     ReplicaMeta currentLease(ReplicationGroupId groupId);
+
+    /**
+     * Gets a cached lease by a zone replication group.
+     *
+     * @param grpId Replication group id.
+     * @return Lease or {@code null}.
+     */
+    ReplicaMeta getLeaseMeta(ReplicationGroupId grpId);
+
+    /**
+     * Tries to update the lease in order to include the new subgroup. The set 
of groups will be added to the set of lease subgroups
+     * ({@link ReplicaMeta#subgroups()}) for the specific lease determined by 
the zone id.
+     * TODO: When replicas are started by zone, the method is removed.

Review Comment:
   Ticket?



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -416,17 +416,44 @@ private void 
onPlacementDriverMessageReceived(NetworkMessage msg0, ClusterNode s
         }
 
         try {
-            CompletableFuture<Replica> replicaFut = 
replicas.computeIfAbsent(msg.groupId(), k -> new CompletableFuture<>());
-
-            replicaFut
-                    .thenCompose(replica -> 
replica.processPlacementDriverMessage(msg))
-                    .whenComplete((response, ex) -> {
-                        if (ex == null) {
-                            
clusterNetSvc.messagingService().respond(senderConsistentId, response, 
correlationId);
-                        } else if (!(unwrapCause(ex) instanceof 
NodeStoppingException)) {
-                            LOG.error("Failed to process placement driver 
message [msg={}].", ex, msg);
-                        }
-                    });
+            Set<ReplicationGroupId> replicationGroupIds = 
zonePartIdToTablePartId.get((ZonePartitionId) msg.groupId());
+
+            CompletableFuture<LeaseGrantedMessageResponse>[] futures = new 
CompletableFuture[replicationGroupIds.size()];
+
+            int i = 0;
+
+            for (ReplicationGroupId grpId : replicationGroupIds) {
+                CompletableFuture<Replica> replicaFut = 
replicas.computeIfAbsent(grpId, k -> new CompletableFuture<>());
+                futures[i++] = replicaFut.thenCompose(replica -> 
replica.processPlacementDriverMessage(msg));
+            }
+
+            // 1) PD -> Zones instead of Tables
+            // 2) TX flow from TableId to ZoneId
+
+            // 3) Refactoring for encapsulating raft into Replica entity
+            // 4) One Replica many rafts
+            // 5) One Replica one raft group
+            allOf(futures).whenComplete((responses, ex) -> {
+                // TODO allOf all replicas of the zone from msg.groupId() 
(zoneId, partId) from {@code replicas}

Review Comment:
   seems like outdated todo



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -630,6 +674,57 @@ public CompletableFuture<Void> start() {
                 TimeUnit.MILLISECONDS
         );
 
+        scheduledTableLeaseUpdateExecutor.scheduleAtFixedRate(() -> {
+                    for (Map.Entry<ZonePartitionId, Set<ReplicationGroupId>> 
entry : zonePartIdToTablePartId.entrySet()) {

Review Comment:
   something wrong with the formatting 



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -416,17 +416,44 @@ private void 
onPlacementDriverMessageReceived(NetworkMessage msg0, ClusterNode s
         }
 
         try {
-            CompletableFuture<Replica> replicaFut = 
replicas.computeIfAbsent(msg.groupId(), k -> new CompletableFuture<>());
-
-            replicaFut
-                    .thenCompose(replica -> 
replica.processPlacementDriverMessage(msg))
-                    .whenComplete((response, ex) -> {
-                        if (ex == null) {
-                            
clusterNetSvc.messagingService().respond(senderConsistentId, response, 
correlationId);
-                        } else if (!(unwrapCause(ex) instanceof 
NodeStoppingException)) {
-                            LOG.error("Failed to process placement driver 
message [msg={}].", ex, msg);
-                        }
-                    });
+            Set<ReplicationGroupId> replicationGroupIds = 
zonePartIdToTablePartId.get((ZonePartitionId) msg.groupId());
+
+            CompletableFuture<LeaseGrantedMessageResponse>[] futures = new 
CompletableFuture[replicationGroupIds.size()];
+
+            int i = 0;
+
+            for (ReplicationGroupId grpId : replicationGroupIds) {
+                CompletableFuture<Replica> replicaFut = 
replicas.computeIfAbsent(grpId, k -> new CompletableFuture<>());
+                futures[i++] = replicaFut.thenCompose(replica -> 
replica.processPlacementDriverMessage(msg));
+            }
+
+            // 1) PD -> Zones instead of Tables

Review Comment:
   let's remove this comment 



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -630,6 +674,57 @@ public CompletableFuture<Void> start() {
                 TimeUnit.MILLISECONDS
         );
 
+        scheduledTableLeaseUpdateExecutor.scheduleAtFixedRate(() -> {
+                    for (Map.Entry<ZonePartitionId, Set<ReplicationGroupId>> 
entry : zonePartIdToTablePartId.entrySet()) {
+                        ZonePartitionId repGrp = entry.getKey();
+
+                        ReplicaMeta meta = 
placementDriver.getLeaseMeta(repGrp);
+
+                        if (meta != null) {
+                            HashSet<ReplicationGroupId> diff = new 
HashSet<>(entry.getValue());
+                            diff.removeAll(meta.subgroups());
+
+                            if (meta.getLeaseholderId().equals(localNodeId) && 
!diff.isEmpty()) {
+                                LOG.info("New subgroups are found for existing 
lease [repGrp={}, subGroups={}]", repGrp, diff);
+
+                                try {
+                                    placementDriver.addSubgroups(repGrp, 
meta.getStartTime().longValue(), diff)
+                                            .thenCompose(unused -> {
+                                                
ArrayList<CompletableFuture<?>> requestToReplicas = new ArrayList<>();
+
+                                                for (ReplicationGroupId partId 
: diff) {
+                                                    EmptyPrimaryReplicaRequest 
req = REPLICA_MESSAGES_FACTORY.emptyPrimaryReplicaRequest()
+                                                            
.enlistmentConsistencyToken(meta.getStartTime().longValue())
+                                                            .groupId(partId)
+                                                            .build();
+
+                                                    CompletableFuture<Replica> 
replicaFut = replicas.get(repGrp);
+
+                                                    if (replicaFut != null) {
+                                                        
requestToReplicas.add(replicaFut.thenCompose(
+                                                                replica -> 
replica.processRequest(req, localNodeId)));
+                                                    }
+                                                }
+
+                                                return 
allOf(requestToReplicas.toArray(CompletableFuture[]::new));
+                                            }).join();

Review Comment:
   Is join here required? why not get with timeout? 



##########
modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java:
##########
@@ -310,6 +310,6 @@ private int createTestTable(String tableName, String 
zoneName) throws Interrupte
 
         return 
catalogManager.catalog(catalogManager.latestCatalogVersion()).tables().stream()
                 .filter(t -> t.name().equals(tableName))
-                .findFirst().get().id();
+                .findFirst().get().zoneId();

Review Comment:
   `createTestTable` returns `zoneId`, something wrong



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1300,7 +1306,7 @@ private CompletableFuture<Void> createTableLocally(
         String tableName = tableDescriptor.name();
         int tableId = tableDescriptor.id();
 
-        LOG.trace("Creating local table: name={}, id={}, token={}", 
tableDescriptor.name(), tableDescriptor.id(), causalityToken);
+        LOG.info("Creating local table: name={}, id={}, token={}", 
tableDescriptor.name(), tableDescriptor.id(), causalityToken);

Review Comment:
   let's return to trace



##########
modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestThreeNodesThreeReplicas.java:
##########
@@ -76,6 +76,9 @@ public void testPrimaryReplicaDirectUpdateForExplicitTxn() 
throws InterruptedExc
         JraftServerImpl server = (JraftServerImpl) 
txTestCluster.raftServers.get(leader.consistentId()).server();
         var groupId = new TablePartitionId(accounts.tableId(), 0);
 
+        // TODO:IGNITE-XXXX It need to be don before the message blocking to 
update lease subgroups.

Review Comment:
   please add details 



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1374,6 +1380,8 @@ private CompletableFuture<Void> createTableLocally(
             }
 
             return allOf(localPartsUpdateFuture, 
tablesByIdFuture).thenComposeAsync(ignore -> inBusyLock(busyLock, () -> {
+                        LOG.info("Table storage created " + tableId);

Review Comment:
   log formatting 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to