vldpyatkov commented on code in PR #3422:
URL: https://github.com/apache/ignite-3/pull/3422#discussion_r1562408439


##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -630,6 +674,57 @@ public CompletableFuture<Void> start() {
                 TimeUnit.MILLISECONDS
         );
 
+        scheduledTableLeaseUpdateExecutor.scheduleAtFixedRate(() -> {
+                    for (Map.Entry<ZonePartitionId, Set<ReplicationGroupId>> entry : zonePartIdToTablePartId.entrySet()) {
+                        ZonePartitionId repGrp = entry.getKey();
+
+                        ReplicaMeta meta = placementDriver.getLeaseMeta(repGrp);
+
+                        if (meta != null) {
+                            HashSet<ReplicationGroupId> diff = new HashSet<>(entry.getValue());
+                            diff.removeAll(meta.subgroups());
+
+                            if (meta.getLeaseholderId().equals(localNodeId) && !diff.isEmpty()) {
+                                LOG.info("New subgroups are found for existing lease [repGrp={}, subGroups={}]", repGrp, diff);
+
+                                try {
+                                    placementDriver.addSubgroups(repGrp, meta.getStartTime().longValue(), diff)
+                                            .thenCompose(unused -> {
+                                                ArrayList<CompletableFuture<?>> requestToReplicas = new ArrayList<>();
+
+                                                for (ReplicationGroupId partId : diff) {
+                                                    EmptyPrimaryReplicaRequest req = REPLICA_MESSAGES_FACTORY.emptyPrimaryReplicaRequest()
+                                                            .enlistmentConsistencyToken(meta.getStartTime().longValue())
+                                                            .groupId(partId)
+                                                            .build();
+
+                                                    CompletableFuture<Replica> replicaFut = replicas.get(repGrp);
+
+                                                    if (replicaFut != null) {
+                                                        requestToReplicas.add(replicaFut.thenCompose(
+                                                                replica -> replica.processRequest(req, localNodeId)));
+                                                    }
+                                                }
+
+                                                return allOf(requestToReplicas.toArray(CompletableFuture[]::new));
+                                            }).join();
+                                } catch (Exception ex) {
+                                    LOG.error(
+                                            "Failed to add new subgroups to the replication group [repGrp={}, subGroups={}]",
+                                            ex,
+                                            repGrp,
+                                            diff
+                                    );
+                                }
+                            }
+                        }
+                    }
+                },
+                0,
+                1,

Review Comment:
   I think the class is not available in the ignite-replicator package.
   If you mean that it would be theoretically more correct to add this code to 
the lease updater thread, I agree. Unfortunately, the placement driver 
currently does not know about replicas, and we are not going to change that.
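
   For reference, the check that the scheduled task in the hunk above performs 
before calling `addSubgroups` is a plain set difference: the subgroups known 
locally minus the subgroups already recorded in the lease. A minimal sketch of 
that step, assuming plain strings in place of `ReplicationGroupId` (the class 
name, method name, and identifiers below are hypothetical and not part of the 
PR):

```java
import java.util.HashSet;
import java.util.Set;

final class SubgroupDiff {
    /** Returns the locally known subgroups that the lease does not list yet. */
    static Set<String> missingSubgroups(Set<String> localSubgroups, Set<String> leaseSubgroups) {
        // Copy first so that removeAll does not mutate the caller's set.
        Set<String> diff = new HashSet<>(localSubgroups);
        diff.removeAll(leaseSubgroups);
        return diff;
    }

    public static void main(String[] args) {
        // Hypothetical identifiers, used only for illustration.
        Set<String> local = Set.of("table-1_part-0", "table-2_part-0");
        Set<String> lease = Set.of("table-1_part-0");

        // Only the subgroup that is missing from the lease remains.
        System.out.println(missingSubgroups(local, lease));
    }
}
```

   An empty result means the lease already covers every local subgroup, which 
is the case the `!diff.isEmpty()` condition in the hunk skips.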



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
