denis-chudov commented on code in PR #1765:
URL: https://github.com/apache/ignite-3/pull/1765#discussion_r1143587373


##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java:
##########
@@ -69,4 +121,96 @@ public CompletableFuture<?> processRequest(ReplicaRequest 
request) {
     public ReplicationGroupId groupId() {
         return replicaGrpId;
     }
+
+    private void onLeaderElected(ClusterNode clusterNode, Long term) {
+        leaderRef.set(clusterNode);
+
+        if (!leaderFuture.isDone()) {
+            leaderFuture.complete(leaderRef);
+        }
+    }
+
+    private CompletableFuture<ClusterNode> leaderFuture() {
+        return leaderFuture.thenApply(AtomicReference::get);
+    }
+
+    /**
+     * Process placement driver message.
+     *
+     * @param msg Message to process.
+     * @return Future that contains a result.
+     */
+    public CompletableFuture<? extends NetworkMessage> 
processPlacementDriverMessage(PlacementDriverReplicaMessage msg) {
+        if (msg instanceof LeaseGrantedMessage) {
+            return processLeaseGrantedMessage((LeaseGrantedMessage) msg);
+        }
+
+        return failedFuture(new AssertionError("Unknown message type, msg=" + 
msg));
+    }
+
+    /**
+     * Process lease granted message. Can either accept lease or decline with 
redirection proposal. In the case of lease acceptance,
+     * initiates the leadership transfer, if this replica is not a group 
leader.
+     *
+     * @param msg Message to process.
+     * @return Future that contains a result.
+     */
+    public CompletableFuture<LeaseGrantedMessageResponse> 
processLeaseGrantedMessage(LeaseGrantedMessage msg) {
+        return leaderFuture().thenCompose(leader -> {
+            if (hasAcceptedLease(msg.leaseStartTime(), 
msg.leaseExpirationTime())) {
+                return acceptLease(msg.leaseStartTime(), 
msg.leaseExpirationTime());
+            } else if (msg.force()) {
+                if (!leader.equals(localNodeSupplier.get())) {
+                    // Replica must wait till safe time reaches the lease 
start timestamp to make sure that all updates made on the
+                    // group leader are received.
+                    return 
safeTime.waitFor(msg.leaseStartTime()).thenCompose(v -> {

Review Comment:
   We will need to change safe time propagation when we move to PD-based 
primaries in corresponding ticket. There still are things to think about, as we 
discussed. I will append the ticket description



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to