denis-chudov commented on code in PR #1765:
URL: https://github.com/apache/ignite-3/pull/1765#discussion_r1143587373
##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java:
##########
@@ -69,4 +121,96 @@ public CompletableFuture<?> processRequest(ReplicaRequest
request) {
public ReplicationGroupId groupId() {
return replicaGrpId;
}
+
+ private void onLeaderElected(ClusterNode clusterNode, Long term) {
+ leaderRef.set(clusterNode);
+
+ if (!leaderFuture.isDone()) {
+ leaderFuture.complete(leaderRef);
+ }
+ }
+
+ private CompletableFuture<ClusterNode> leaderFuture() {
+ return leaderFuture.thenApply(AtomicReference::get);
+ }
+
+ /**
+ * Process placement driver message.
+ *
+ * @param msg Message to process.
+ * @return Future that contains a result.
+ */
+ public CompletableFuture<? extends NetworkMessage>
processPlacementDriverMessage(PlacementDriverReplicaMessage msg) {
+ if (msg instanceof LeaseGrantedMessage) {
+ return processLeaseGrantedMessage((LeaseGrantedMessage) msg);
+ }
+
+ return failedFuture(new AssertionError("Unknown message type, msg=" +
msg));
+ }
+
+ /**
+ * Process lease granted message. Can either accept lease or decline with
redirection proposal. In the case of lease acceptance,
+ * initiates the leadership transfer, if this replica is not a group
leader.
+ *
+ * @param msg Message to process.
+ * @return Future that contains a result.
+ */
+ public CompletableFuture<LeaseGrantedMessageResponse>
processLeaseGrantedMessage(LeaseGrantedMessage msg) {
+ return leaderFuture().thenCompose(leader -> {
+ if (hasAcceptedLease(msg.leaseStartTime(),
msg.leaseExpirationTime())) {
+ return acceptLease(msg.leaseStartTime(),
msg.leaseExpirationTime());
+ } else if (msg.force()) {
+ if (!leader.equals(localNodeSupplier.get())) {
+ // Replica must wait till safe time reaches the lease
start timestamp to make sure that all updates made on the
+ // group leader are received.
+ return
safeTime.waitFor(msg.leaseStartTime()).thenCompose(v -> {
Review Comment:
We will need to change safe time propagation when we will move not PD-based
primaries in corresponding ticket. There still are things to think about, as we
discussed. I will append the ticket description
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]