alievmirza commented on code in PR #3422:
URL: https://github.com/apache/ignite-3/pull/3422#discussion_r1560850169


##########
modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java:
##########
@@ -88,4 +90,23 @@ CompletableFuture<ReplicaMeta> awaitPrimaryReplica(
      */
     @Nullable
     ReplicaMeta currentLease(ReplicationGroupId groupId);
+
+    /**
+     * Gets a cached lease by a zone replication group.
+     *
+     * @param grpId Replication group id.
+     * @return Lease or {@code null}.
+     */
+    ReplicaMeta getLeaseMeta(ReplicationGroupId grpId);
+
+    /**
+     * Tries to update the lease in order to include the new subgroup.
+     * TODO: When replicas are started by zone, the method is removed.
+     *
+     * @param zoneId Zone id.
+     * @param enlistmentConsistencyToken Lease token.
+     * @param subGrps Table ids.
+     * @return Future to complete.
+     */
+    CompletableFuture<Void> addSubgroups(ZonePartitionId zoneId, Long 
enlistmentConsistencyToken, Set<ReplicationGroupId> subGrps);

Review Comment:
   Please add explanation what subgroup is



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -630,6 +674,57 @@ public CompletableFuture<Void> start() {
                 TimeUnit.MILLISECONDS
         );
 
+        scheduledTableLeaseUpdateExecutor.scheduleAtFixedRate(() -> {
+                    for (Map.Entry<ZonePartitionId, Set<ReplicationGroupId>> 
entry : zonePartIdToTablePartId.entrySet()) {
+                        ZonePartitionId repGrp = entry.getKey();
+
+                        ReplicaMeta meta = 
placementDriver.getLeaseMeta(repGrp);
+
+                        if (meta != null) {
+                            HashSet<ReplicationGroupId> diff = new 
HashSet<>(entry.getValue());
+                            diff.removeAll(meta.subgroups());
+
+                            if (meta.getLeaseholderId().equals(localNodeId) && 
!diff.isEmpty()) {
+                                LOG.info("New subgroups are found for existing 
lease [repGrp={}, subGroups={}]", repGrp, diff);
+
+                                try {
+                                    placementDriver.addSubgroups(repGrp, 
meta.getStartTime().longValue(), diff)
+                                            .thenCompose(unused -> {
+                                                
ArrayList<CompletableFuture<?>> requestToReplicas = new ArrayList<>();
+
+                                                for (ReplicationGroupId partId 
: diff) {
+                                                    EmptyPrimaryReplicaRequest 
req = REPLICA_MESSAGES_FACTORY.emptyPrimaryReplicaRequest()
+                                                            
.enlistmentConsistencyToken(meta.getStartTime().longValue())
+                                                            .groupId(partId)
+                                                            .build();
+
+                                                    CompletableFuture<Replica> 
replicaFut = replicas.get(repGrp);
+
+                                                    if (replicaFut != null) {
+                                                        
requestToReplicas.add(replicaFut.thenCompose(
+                                                                replica -> 
replica.processRequest(req, localNodeId)));
+                                                    }
+                                                }
+
+                                                return 
allOf(requestToReplicas.toArray(CompletableFuture[]::new));
+                                            }).join();
+                                } catch (Exception ex) {
+                                    LOG.error(
+                                            "Failed to add new subgroups to 
the replication group [repGrp={}, subGroups={}]",
+                                            ex,
+                                            repGrp,
+                                            diff
+                                    );
+                                }
+                            }
+                        }
+                    }
+                },
+                0,
+                1,

Review Comment:
   why not `LeaseUpdater#UPDATE_LEASE_MS`? it seems to me that conceptually it 
is the same process 



##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java:
##########
@@ -141,6 +152,34 @@ public CompletableFuture<ReplicaResult> 
processRequest(ReplicaRequest request, S
                 request.groupId(),
                 replicaGrpId);
 
+        if (!waitForActualStateFuture.isDone() && request instanceof 
PrimaryReplicaRequest) {
+            var targetPrimaryReq = (PrimaryReplicaRequest) request;
+
+            if (request instanceof EmptyPrimaryReplicaRequest) {
+                return 
waitForActualState(FastTimestamps.coarseCurrentTimeMillis() + 10_000)
+                        .thenComposeAsync(
+                                v -> 
sendPrimaryReplicaChangeToReplicationGroup(targetPrimaryReq.enlistmentConsistencyToken()),
+                                executor
+                        )
+                        .thenComposeAsync(
+                                unused -> completedFuture(new 
ReplicaResult(null, null)),
+                                executor
+                        );
+            }
+
+            return placementDriver.addSubgroups(

Review Comment:
   As we discussed, this adding must be after the waitForActualState



##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java:
##########
@@ -444,7 +453,8 @@ private void updateLeaseBatchInternal() {
 
                 for (Map.Entry<ReplicationGroupId, Boolean> entry : 
toBeNegotiated.entrySet()) {
                     Lease lease = renewedLeases.get(entry.getKey());
-                    boolean force = entry.getValue();
+                    // TODO check if tests are failed?

Review Comment:
   ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to