vldpyatkov commented on code in PR #1765: URL: https://github.com/apache/ignite-3/pull/1765#discussion_r1143193384
########## modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/LeaseGrantedMessageResponse.java: ########## @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.placementdriver.message; + +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.network.annotations.Marshallable; +import org.apache.ignite.network.annotations.Transferable; + +/** + * Response for lease granted message. + */ +@Transferable(PlacementDriverMessageGroup.LEASE_GRANTED_MESSAGE_RESPONSE) +public interface LeaseGrantedMessageResponse extends PlacementDriverReplicaMessage { + boolean accepted(); + + @Marshallable + HybridTimestamp leaseExpirationTime(); Review Comment: What is the reason to return the timestamp? 
########## modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java: ########## @@ -144,24 +133,17 @@ public void start() { String thisNodeName = clusterService.topologyService().localMember().name(); if (placementDriverNodes.contains(thisNodeName)) { - leaseUpdater.init(thisNodeName); - - return TopologyAwareRaftGroupService.start( - replicationGroupId, - clusterService, - raftMessagesFactory, - raftConfiguration, - PeersAndLearners.fromConsistentIds(placementDriverNodes), - true, - raftClientExecutor, - logicalTopologyService, - raftGroupEventsClientListener, - true - ).thenCompose(client -> { - TopologyAwareRaftGroupService topologyAwareClient = (TopologyAwareRaftGroupService) client; - - return topologyAwareClient.subscribeLeader(this::onLeaderChange).thenApply(v -> topologyAwareClient); - }); + try { + leaseUpdater.init(thisNodeName); + + return raftManager.startRaftGroupService( + replicationGroupId, + PeersAndLearners.fromConsistentIds(placementDriverNodes), + topologyAwareRaftGroupServiceFactory Review Comment: The factory is kept in the class only to pass it to the method (the same as replicationGroupId). It looks confusing. Maybe it is better to use a wrapper over raftManager. ########## modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/PlacementDriverMessageGroup.java: ########## @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.placementdriver.message; + +import static org.apache.ignite.internal.placementdriver.message.PlacementDriverMessageGroup.GROUP_NAME; +import static org.apache.ignite.internal.placementdriver.message.PlacementDriverMessageGroup.GROUP_TYPE; + +import org.apache.ignite.network.annotations.MessageGroup; + +/** + * Message group for placement driver messages. + */ +@MessageGroup(groupType = GROUP_TYPE, groupName = GROUP_NAME) +public interface PlacementDriverMessageGroup { + /** Placement driver message group type. */ + short GROUP_TYPE = 11; + + String GROUP_NAME = "PlacementDriverMessages"; + + short LEASE_GRANTED_MESSAGE = 0; Review Comment: Maybe better to add a "request" suffix here. ########## modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java: ########## @@ -55,13 +58,13 @@ // see https://issues.apache.org/jira/browse/IGNITE-18273 public class Loza implements RaftManager { /** Factory. */ - private static final RaftMessagesFactory FACTORY = new RaftMessagesFactory(); + public static final RaftMessagesFactory FACTORY = new RaftMessagesFactory(); Review Comment: Does it really need to use this field for other classes? 
########## modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java: ########## @@ -69,4 +121,96 @@ public CompletableFuture<?> processRequest(ReplicaRequest request) { public ReplicationGroupId groupId() { return replicaGrpId; } + + private void onLeaderElected(ClusterNode clusterNode, Long term) { + leaderRef.set(clusterNode); + + if (!leaderFuture.isDone()) { + leaderFuture.complete(leaderRef); + } + } + + private CompletableFuture<ClusterNode> leaderFuture() { + return leaderFuture.thenApply(AtomicReference::get); + } + + /** + * Process placement driver message. + * + * @param msg Message to process. + * @return Future that contains a result. + */ + public CompletableFuture<? extends NetworkMessage> processPlacementDriverMessage(PlacementDriverReplicaMessage msg) { + if (msg instanceof LeaseGrantedMessage) { + return processLeaseGrantedMessage((LeaseGrantedMessage) msg); + } + + return failedFuture(new AssertionError("Unknown message type, msg=" + msg)); + } + + /** + * Process lease granted message. Can either accept lease or decline with redirection proposal. In the case of lease acceptance, + * initiates the leadership transfer, if this replica is not a group leader. + * + * @param msg Message to process. + * @return Future that contains a result. + */ + public CompletableFuture<LeaseGrantedMessageResponse> processLeaseGrantedMessage(LeaseGrantedMessage msg) { + return leaderFuture().thenCompose(leader -> { + if (hasAcceptedLease(msg.leaseStartTime(), msg.leaseExpirationTime())) { + return acceptLease(msg.leaseStartTime(), msg.leaseExpirationTime()); + } else if (msg.force()) { + if (!leader.equals(localNodeSupplier.get())) { Review Comment: This check gives no guarantees, because the result may lag behind the real situation (the leader may have already changed). 
########## modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java: ########## @@ -69,4 +121,96 @@ public CompletableFuture<?> processRequest(ReplicaRequest request) { public ReplicationGroupId groupId() { return replicaGrpId; } + + private void onLeaderElected(ClusterNode clusterNode, Long term) { + leaderRef.set(clusterNode); + + if (!leaderFuture.isDone()) { + leaderFuture.complete(leaderRef); + } + } + + private CompletableFuture<ClusterNode> leaderFuture() { + return leaderFuture.thenApply(AtomicReference::get); + } + + /** + * Process placement driver message. + * + * @param msg Message to process. + * @return Future that contains a result. + */ + public CompletableFuture<? extends NetworkMessage> processPlacementDriverMessage(PlacementDriverReplicaMessage msg) { + if (msg instanceof LeaseGrantedMessage) { + return processLeaseGrantedMessage((LeaseGrantedMessage) msg); + } + + return failedFuture(new AssertionError("Unknown message type, msg=" + msg)); + } + + /** + * Process lease granted message. Can either accept lease or decline with redirection proposal. In the case of lease acceptance, + * initiates the leadership transfer, if this replica is not a group leader. + * + * @param msg Message to process. + * @return Future that contains a result. + */ + public CompletableFuture<LeaseGrantedMessageResponse> processLeaseGrantedMessage(LeaseGrantedMessage msg) { + return leaderFuture().thenCompose(leader -> { + if (hasAcceptedLease(msg.leaseStartTime(), msg.leaseExpirationTime())) { + return acceptLease(msg.leaseStartTime(), msg.leaseExpirationTime()); + } else if (msg.force()) { + if (!leader.equals(localNodeSupplier.get())) { + // Replica must wait till safe time reaches the lease start timestamp to make sure that all updates made on the + // group leader are received. + return safeTime.waitFor(msg.leaseStartTime()).thenCompose(v -> { Review Comment: I remember we consider a safe time as a time on leader. 
But in the new reality it is possibly a time on the primary replica. If the safe time is replicated from the primary replica, we cannot use it for choosing the primary replica. ########## modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/message/LeaseGrantedMessage.java: ########## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.placementdriver.message; + +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.network.annotations.Marshallable; +import org.apache.ignite.network.annotations.Transferable; + +/** + * Lease granted message. + */ +@Transferable(PlacementDriverMessageGroup.LEASE_GRANTED_MESSAGE) +public interface LeaseGrantedMessage extends PlacementDriverReplicaMessage { + @Marshallable + HybridTimestamp leaseStartTime(); Review Comment: I am not sure that the timestamp is required. 
########## modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java: ########## @@ -106,101 +119,140 @@ public ReplicaManager( this.clusterNetSvc = clusterNetSvc; this.clock = clock; this.messageGroupsToHandle = messageGroupsToHandle; - this.handler = (message, senderConsistentId, correlationId) -> { - if (!busyLock.enterBusy()) { - throw new IgniteException(new NodeStoppingException()); + this.localNodeSupplier = () -> clusterNetSvc.topologyService().localMember(); + this.handler = this::onReplicaMessageReceived; + this.placementDriverMessageHandler = this::onPlacementDriverMessageReceived; Review Comment: Why do you save the link if it is just a reference to the local method? ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java: ########## @@ -871,7 +878,7 @@ private void updateAssignmentInternal(ConfigurationNotificationEvent<byte[]> ass .thenCompose(v -> storageUpdateHandlerFut) .thenComposeAsync(v -> { try { - return raftMgr.startRaftGroupService(replicaGrpId, newConfiguration); + return raftMgr.startRaftGroupService(replicaGrpId, newConfiguration, raftGroupServiceFactory); Review Comment: The same as before, too complicated to hold a factory only to pass it to method start. 
########## modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java: ########## @@ -17,33 +17,85 @@ package org.apache.ignite.internal.replicator; +import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.concurrent.CompletableFuture.failedFuture; + import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessage; +import org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessageResponse; +import org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory; +import org.apache.ignite.internal.placementdriver.message.PlacementDriverReplicaMessage; +import org.apache.ignite.internal.raft.Peer; +import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService; import org.apache.ignite.internal.replicator.listener.ReplicaListener; import org.apache.ignite.internal.replicator.message.ReplicaRequest; +import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.lang.IgniteStringFormatter; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.network.NetworkMessage; /** * Replica server. */ public class Replica { + private static final PlacementDriverMessagesFactory PLACEMENT_DRIVER_MESSAGES_FACTORY = new PlacementDriverMessagesFactory(); + /** Replica group identity, this id is the same as the considered partition's id. */ private final ReplicationGroupId replicaGrpId; /** Replica listener. */ private final ReplicaListener listener; + /** Safe time tracker. */ + private final PendingComparableValuesTracker<HybridTimestamp> safeTime; + + /** Topology aware Raft client. */ + private final TopologyAwareRaftGroupService raftClient; + + /** Supplier that returns a {@link ClusterNode} instance of the local node. 
*/ + private final Supplier<ClusterNode> localNodeSupplier; + + // TODO IGNITE-18960 after replica inoperability logic is introduced, this future should be replaced with something like + // VersionedValue (so that PlacementDriverMessages would wait for new leader election) + private CompletableFuture<AtomicReference<ClusterNode>> leaderFuture = new CompletableFuture<>(); + + private AtomicReference<ClusterNode> leaderRef = new AtomicReference<>(); + + /** Mutex to change {@link #leaseStartTime} and {@link #leaseExpirationTime} consistently. */ + private final Object leaseAcceptanceMutex = new Object(); + + /** Latest lease start time. */ + private volatile HybridTimestamp leaseStartTime = null; + + /** Latest lease expiration time. */ + private volatile HybridTimestamp leaseExpirationTime = null; + /** * The constructor of a replica server. * * @param replicaGrpId Replication group id. * @param listener Replica listener. + * @param safeTime Safe time tracker. + * @param raftClient Topology aware Raft client. + * @param localNodeSupplier Supplier that returns a {@link ClusterNode} instance of the local node. */ public Replica( ReplicationGroupId replicaGrpId, - ReplicaListener listener + ReplicaListener listener, + PendingComparableValuesTracker<HybridTimestamp> safeTime, + TopologyAwareRaftGroupService raftClient, + Supplier<ClusterNode> localNodeSupplier ) { this.replicaGrpId = replicaGrpId; this.listener = listener; + this.safeTime = safeTime; + this.raftClient = raftClient; + this.localNodeSupplier = localNodeSupplier; Review Comment: Is it really so complicated to use a supplier for the local node? 
########## modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java: ########## @@ -69,4 +121,96 @@ public CompletableFuture<?> processRequest(ReplicaRequest request) { public ReplicationGroupId groupId() { return replicaGrpId; } + + private void onLeaderElected(ClusterNode clusterNode, Long term) { + leaderRef.set(clusterNode); + + if (!leaderFuture.isDone()) { + leaderFuture.complete(leaderRef); + } + } + + private CompletableFuture<ClusterNode> leaderFuture() { + return leaderFuture.thenApply(AtomicReference::get); + } + + /** + * Process placement driver message. + * + * @param msg Message to process. + * @return Future that contains a result. + */ + public CompletableFuture<? extends NetworkMessage> processPlacementDriverMessage(PlacementDriverReplicaMessage msg) { + if (msg instanceof LeaseGrantedMessage) { + return processLeaseGrantedMessage((LeaseGrantedMessage) msg); + } + + return failedFuture(new AssertionError("Unknown message type, msg=" + msg)); + } + + /** + * Process lease granted message. Can either accept lease or decline with redirection proposal. In the case of lease acceptance, + * initiates the leadership transfer, if this replica is not a group leader. + * + * @param msg Message to process. + * @return Future that contains a result. + */ + public CompletableFuture<LeaseGrantedMessageResponse> processLeaseGrantedMessage(LeaseGrantedMessage msg) { + return leaderFuture().thenCompose(leader -> { + if (hasAcceptedLease(msg.leaseStartTime(), msg.leaseExpirationTime())) { + return acceptLease(msg.leaseStartTime(), msg.leaseExpirationTime()); + } else if (msg.force()) { + if (!leader.equals(localNodeSupplier.get())) { + // Replica must wait till safe time reaches the lease start timestamp to make sure that all updates made on the + // group leader are received. 
+ return safeTime.waitFor(msg.leaseStartTime()).thenCompose(v -> { + CompletableFuture<LeaseGrantedMessageResponse> respFut = + acceptLease(msg.leaseStartTime(), msg.leaseExpirationTime()); + + return raftClient.transferLeadership(new Peer(localNodeSupplier.get().name())) + .thenCompose(ignored -> respFut); + }); + } else { + return acceptLease(msg.leaseStartTime(), msg.leaseExpirationTime()); + } + } else { + if (leader.equals(localNodeSupplier.get())) { + return acceptLease(msg.leaseStartTime(), msg.leaseExpirationTime()); + } else { + return proposeLeaseRedirect(leader); + } + } + }); + } + + private CompletableFuture<LeaseGrantedMessageResponse> acceptLease( + HybridTimestamp leaseStartTime, + HybridTimestamp leaseExpirationTime + ) { + synchronized (leaseAcceptanceMutex) { + this.leaseStartTime = leaseStartTime; + this.leaseExpirationTime = leaseExpirationTime; + } + + LeaseGrantedMessageResponse resp = PLACEMENT_DRIVER_MESSAGES_FACTORY.leaseGrantedMessageResponse() + .accepted(true) + .build(); + + return completedFuture(resp); + } + + private CompletableFuture<LeaseGrantedMessageResponse> proposeLeaseRedirect(ClusterNode groupLeader) { + LeaseGrantedMessageResponse resp = PLACEMENT_DRIVER_MESSAGES_FACTORY.leaseGrantedMessageResponse() + .accepted(false) + .redirectProposal(groupLeader.name()) + .build(); + + return completedFuture(resp); + } + + private boolean hasAcceptedLease(HybridTimestamp leaseStartTime, HybridTimestamp leaseExpirationTime) { + synchronized (leaseAcceptanceMutex) { + return leaseStartTime.equals(this.leaseStartTime) && leaseExpirationTime.equals(this.leaseExpirationTime); Review Comment: I highly doubt that the replica can receive a message with the same timestamp. Also, you break up the locking on leaseAcceptanceMutex; I think it doesn't give thread-safety guarantees. 
########## modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java: ########## @@ -106,101 +119,140 @@ public ReplicaManager( this.clusterNetSvc = clusterNetSvc; this.clock = clock; this.messageGroupsToHandle = messageGroupsToHandle; - this.handler = (message, senderConsistentId, correlationId) -> { - if (!busyLock.enterBusy()) { - throw new IgniteException(new NodeStoppingException()); + this.localNodeSupplier = () -> clusterNetSvc.topologyService().localMember(); + this.handler = this::onReplicaMessageReceived; + this.placementDriverMessageHandler = this::onPlacementDriverMessageReceived; + } + + private void onReplicaMessageReceived(NetworkMessage message, String senderConsistentId, @Nullable Long correlationId) { + if (!busyLock.enterBusy()) { + throw new IgniteException(new NodeStoppingException()); + } + + try { + if (!(message instanceof ReplicaRequest)) { + return; } - try { - if (!(message instanceof ReplicaRequest)) { - return; - } + ReplicaRequest request = (ReplicaRequest) message; - ReplicaRequest request = (ReplicaRequest) message; + // Notify the sender that the Replica is created and ready to process requests. + if (request instanceof AwaitReplicaRequest) { + replicas.compute(request.groupId(), (replicationGroupId, replicaFut) -> { + if (replicaFut == null) { + replicaFut = new CompletableFuture<>(); + } - // Notify the sender that the Replica is created and ready to process requests. 
- if (request instanceof AwaitReplicaRequest) { - replicas.compute(request.groupId(), (replicationGroupId, replicaFut) -> { - if (replicaFut == null) { - replicaFut = new CompletableFuture<>(); - } + if (!replicaFut.isDone()) { + replicaFut.thenCompose( + ignore -> { + IgniteUtils.inBusyLock( + busyLock, + () -> sendAwaitReplicaResponse(senderConsistentId, correlationId) + ); - if (!replicaFut.isDone()) { - replicaFut.thenCompose( - ignore -> { - IgniteUtils.inBusyLock( - busyLock, - () -> sendAwaitReplicaResponse(senderConsistentId, correlationId) - ); + return null; + } + ); - return null; - } - ); + return replicaFut; + } else { + IgniteUtils.inBusyLock(busyLock, () -> sendAwaitReplicaResponse(senderConsistentId, correlationId)); - return replicaFut; - } else { - IgniteUtils.inBusyLock(busyLock, () -> sendAwaitReplicaResponse(senderConsistentId, correlationId)); + return replicaFut; + } + }); - return replicaFut; - } - }); + return; + } - return; - } + CompletableFuture<Replica> replicaFut = replicas.get(request.groupId()); - CompletableFuture<Replica> replicaFut = replicas.get(request.groupId()); + HybridTimestamp requestTimestamp = extractTimestamp(request); - HybridTimestamp requestTimestamp = extractTimestamp(request); + if (replicaFut == null || !replicaFut.isDone()) { + sendReplicaUnavailableErrorResponse(senderConsistentId, correlationId, request.groupId(), requestTimestamp); - if (replicaFut == null || !replicaFut.isDone()) { - sendReplicaUnavailableErrorResponse(senderConsistentId, correlationId, request, requestTimestamp); + return; + } - return; - } + // replicaFut is always completed here. + CompletableFuture<?> result = replicaFut.join().processRequest(request); - // replicaFut is always completed here. 
- CompletableFuture<?> result = replicaFut.join().processRequest(request); + result.handle((res, ex) -> { + NetworkMessage msg; - result.handle((res, ex) -> { - NetworkMessage msg; + if (ex == null) { + msg = prepareReplicaResponse(requestTimestamp, res); + } else { + LOG.warn("Failed to process replica request [request={}]", ex, request); - if (ex == null) { - msg = prepareReplicaResponse(requestTimestamp, res); - } else { - LOG.warn("Failed to process replica request [request={}]", ex, request); + msg = prepareReplicaErrorResponse(requestTimestamp, ex); + } - msg = prepareReplicaErrorResponse(requestTimestamp, ex); - } + clusterNetSvc.messagingService().respond(senderConsistentId, msg, correlationId); - clusterNetSvc.messagingService().respond(senderConsistentId, msg, correlationId); + return null; + }); + } finally { + busyLock.leaveBusy(); + } + } - return null; - }); - } finally { - busyLock.leaveBusy(); + private void onPlacementDriverMessageReceived(NetworkMessage msg0, String senderConsistentId, @Nullable Long correlationId) { + if (!busyLock.enterBusy()) { + throw new IgniteException(new NodeStoppingException()); + } + + try { + if (!(msg0 instanceof PlacementDriverReplicaMessage)) { Review Comment: Which other types of messages do you expect here? I doubt any other type is possible. In this case, an assert is preferable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
