denis-chudov commented on code in PR #2488: URL: https://github.com/apache/ignite-3/pull/2488#discussion_r1318227297
########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java: ########## @@ -2328,19 +2345,25 @@ private CompletableFuture<Boolean> ensureReplicaIsPrimary(ReplicaRequest request } if (expectedTerm != null) { - return raftClient.refreshAndGetLeaderWithTerm() - .thenCompose(replicaAndTerm -> { - long currentTerm = replicaAndTerm.term(); - - if (expectedTerm == currentTerm) { - return completedFuture(null); + return placementDriver.getPrimaryReplica(replicationGroupId, hybridClock.now().addPhysicalTime(HybridTimestamp.CLOCK_SKEW)) + .thenCompose(primaryReplica -> { + long currentEnlistmentConsistencyToken = primaryReplica.getStartTime().longValue(); + + if (expectedTerm.equals(currentEnlistmentConsistencyToken)) { + if (primaryReplica.getExpirationTime().before(hybridClock.now())) { + return failedFuture( + new PrimaryReplicaMissException(expectedTerm, currentEnlistmentConsistencyToken)); + } else { + return completedFuture(null); + } } else { - return failedFuture(new PrimaryReplicaMissException(expectedTerm, currentTerm)); + return failedFuture(new PrimaryReplicaMissException(expectedTerm, currentEnlistmentConsistencyToken)); } } ); } else if (request instanceof ReadOnlyReplicaRequest || request instanceof ReplicaSafeTimeSyncRequest) { - return raftClient.refreshAndGetLeaderWithTerm().thenApply(replicaAndTerm -> isLocalPeer(replicaAndTerm.leader())); + return placementDriver.getPrimaryReplica(replicationGroupId, hybridClock.now().addPhysicalTime(HybridTimestamp.CLOCK_SKEW)) Review Comment: Seems that there are no usages of `getPrimaryReplica` without adding `CLOCK_SKEW` to the argument, and this clock skew be always necessary. Shouldn't we add this clock skew to time comparison inside of this method? ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java: ########## @@ -1553,19 +1559,20 @@ public void close() { * @return Cluster node to evalute read-only request. */ protected CompletableFuture<ClusterNode> evaluateReadOnlyRecipientNode(int partId) { - RaftGroupService svc = raftGroupServiceByPartitionId.get(partId); + TablePartitionId tablePartitionId = new TablePartitionId(tableId, partId); - return svc.refreshAndGetLeaderWithTerm().handle((res, e) -> { - if (e != null) { - throw withCause(TransactionException::new, REPLICA_UNAVAILABLE_ERR, e); - } else { - if (res == null || res.leader() == null) { - throw withCause(TransactionException::new, REPLICA_UNAVAILABLE_ERR, e); - } else { - return clusterNodeResolver.apply(res.leader().consistentId()); - } - } - }); + return placementDriver.awaitPrimaryReplica(tablePartitionId, clock.now()) + .orTimeout(AWAIT_PRIMARY_REPLICA_TIMEOUT, TimeUnit.SECONDS).handle((res, e) -> { + if (e != null) { + throw withCause(TransactionException::new, REPLICA_UNAVAILABLE_ERR, e); + } else { + if (res == null || res.getLeaseholder() == null) { Review Comment: Do we need the check whether leaseholder is null for the primary replica lease? 'null' leaseholder is possible only for empty lease. And in the same time you removed `null` check from `enlist` method. ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java: ########## @@ -2328,19 +2345,25 @@ private CompletableFuture<Boolean> ensureReplicaIsPrimary(ReplicaRequest request } if (expectedTerm != null) { - return raftClient.refreshAndGetLeaderWithTerm() - .thenCompose(replicaAndTerm -> { - long currentTerm = replicaAndTerm.term(); - - if (expectedTerm == currentTerm) { - return completedFuture(null); + return placementDriver.getPrimaryReplica(replicationGroupId, hybridClock.now().addPhysicalTime(HybridTimestamp.CLOCK_SKEW)) + .thenCompose(primaryReplica -> { + long currentEnlistmentConsistencyToken = primaryReplica.getStartTime().longValue(); + + if (expectedTerm.equals(currentEnlistmentConsistencyToken)) { + if (primaryReplica.getExpirationTime().before(hybridClock.now())) { + return failedFuture( + new PrimaryReplicaMissException(expectedTerm, currentEnlistmentConsistencyToken)); Review Comment: are you going to add TODOs here to rename `term` variables? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org