denis-chudov commented on code in PR #1871:
URL: https://github.com/apache/ignite-3/pull/1871#discussion_r1165518955
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java:
##########
@@ -99,10 +114,12 @@ public LeaseUpdater(
LeaseTracker leaseTracker,
HybridClock clock
) {
+ this.clusterService = clusterService;
this.msManager = msManager;
this.leaseTracker = leaseTracker;
this.clock = clock;
+ this.longLeasePeriod =
IgniteSystemProperties.getLong("IGNITE_LONG_LEASE", 3_600_000L);
Review Comment:
I thought we agreed on 2 minutes during the last discussion. It's not very
important, but 1 hour may be too long, don't you think?
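For a sense of scale, here is a minimal sketch of a 2-minute default that stays overridable through the same property. It uses the plain JDK `Long.getLong` as a stand-in for `IgniteSystemProperties.getLong`; the class name and the `DEFAULT_LONG_LEASE_MS` constant are hypothetical and not part of the PR.

```java
import java.util.concurrent.TimeUnit;

class LongLeaseDefaults {
    /** Hypothetical default: 2 minutes (120_000 ms) instead of 1 hour (3_600_000 ms). */
    static final long DEFAULT_LONG_LEASE_MS = TimeUnit.MINUTES.toMillis(2);

    /** Reads the property named in the diff, falling back to the shorter default. */
    static long longLeasePeriod() {
        // Long.getLong is a JDK stand-in for IgniteSystemProperties.getLong.
        return Long.getLong("IGNITE_LONG_LEASE", DEFAULT_LONG_LEASE_MS);
    }

    public static void main(String[] args) {
        System.out.println(longLeasePeriod());
    }
}
```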
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java:
##########
@@ -181,22 +210,20 @@ public void run() {
HybridTimestamp now = clock.now();
- // Nothing holds the lease.
- if (lease == EMPTY_LEASE
- // The lease is near to expiration.
- || now.getPhysical() >
(lease.getLeaseExpirationTime().getPhysical() - LEASE_PERIOD / 2)) {
+ // The lease is expired or near to the one.
+ if (now.getPhysical() >
(lease.getExpirationTime().getPhysical() - LEASE_PERIOD / 2)) {
ClusterNode candidate =
nextLeaseHolder(entry.getValue());
if (candidate == null) {
continue;
}
- if (isReplicationGroupUpdateLeaseholder(lease,
candidate)) {
- updateLeaseInMetaStorage(grpId, lease, candidate);
+ if (isLeaseIsOutdated(lease)) {
// New lease is granting.
- } else if (candidate.equals(lease.getLeaseholder())) {
- updateLeaseInMetaStorage(grpId, lease, candidate);
+ writeNewLeasInMetaStorage(grpId, lease, candidate);
+ } else if (lease.isAccepted() &&
candidate.equals(lease.getLeaseholder())) {
// Old lease is renewing.
+ prolongLeaseInMetaStorage(grpId, lease);
Review Comment:
These methods do more than just change the meta storage. Please rename them
to, let's say, `grantNewLease` and `prolongLease`.
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java:
##########
@@ -181,22 +210,20 @@ public void run() {
HybridTimestamp now = clock.now();
- // Nothing holds the lease.
- if (lease == EMPTY_LEASE
- // The lease is near to expiration.
- || now.getPhysical() >
(lease.getLeaseExpirationTime().getPhysical() - LEASE_PERIOD / 2)) {
+ // The lease is expired or near to the one.
Review Comment:
```suggestion
// The lease is expired or close to this.
```
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/conciliation/LeaseAgreement.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver.conciliation;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.placementdriver.leases.Lease;
+import
org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessageResponse;
+
+/**
+ * The agreement is formed from {@link
org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessageResponse}.
Review Comment:
It seems the full package name is not needed here.
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/conciliation/LeaseConciliator.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver.conciliation;
+
+import static
org.apache.ignite.internal.placementdriver.LeaseUpdater.LEASE_PERIOD;
+import static
org.apache.ignite.internal.placementdriver.conciliation.LeaseAgreement.UNDEFINED_AGREEMENT;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.placementdriver.LeaseUpdater;
+import org.apache.ignite.internal.placementdriver.leases.Lease;
+import
org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessageResponse;
+import
org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.network.ClusterService;
+
+/**
+ * This class conciliates a lease with leaseholder. If the lease is
conciliated, it is ready available to accept.
+ */
+public class LeaseConciliator {
Review Comment:
In fact, since the conciliator is not a third party but a part of the active
actor, it should be named after the function it performs on behalf of the
placement driver. I would call it a lease negotiator, because it
negotiates the leases with the candidates.
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java:
##########
@@ -54,7 +58,13 @@ public class LeaseUpdater {
private static final long UPDATE_LEASE_MS = 200L;
/** Lease holding interval. */
- private static final long LEASE_PERIOD = 10 * UPDATE_LEASE_MS;
+ public static final long LEASE_PERIOD = 10 * UPDATE_LEASE_MS;
Review Comment:
Maybe `LEASE_INTERVAL`, since we are used to this word?
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/conciliation/LeaseAgreement.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver.conciliation;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.placementdriver.leases.Lease;
+import
org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessageResponse;
+
+/**
+ * The agreement is formed from {@link
org.apache.ignite.internal.placementdriver.message.LeaseGrantedMessageResponse}.
+ */
+public class LeaseAgreement {
+ /** The agreement, which has not try conciliating yet. */
+ public static final LeaseAgreement UNDEFINED_AGREEMENT = new
LeaseAgreement(null, completedFuture(null));
+
+ /** Lease. */
+ private final Lease lease;
+
+ /** Future to {@link LeaseGrantedMessageResponse} response. */
+ private final CompletableFuture<LeaseGrantedMessageResponse> responseFut;
+
+ /**
+ * The constructor.
+ *
+ * @param lease Lease.
+ * @param remoteNodeResponseFuture The future of response from the remote
node which is conciliating the agreement.
+ */
+ public LeaseAgreement(Lease lease,
CompletableFuture<LeaseGrantedMessageResponse> remoteNodeResponseFuture) {
+ this.lease = lease;
+ this.responseFut = remoteNodeResponseFuture;
+ }
+
+ /**
+ * Gets a lease about which the leaseholder was notified.
+ *
+ * @return Lease.
+ */
+ public Lease getLease() {
+ return lease;
+ }
+
+ /**
+ * Gets a accepted flag. The flag is true, when the lease is accepted by
leaseholder.
+ *
+ * @return Accepted flag.
+ */
+ public boolean isAccepted() {
+ if (!responseFut.isDone()) {
+ return false;
+ }
+
+ LeaseGrantedMessageResponse resp = responseFut.join();
+
+ if (resp != null) {
+ return resp.accepted();
+ }
+
+ return false;
+ }
+
+ /**
+ * The property is {@code null} if the lease is accepted and not {@code
null} if the leaseholder does not apply the lease and proposes
+ * the other node.
+ *
+ * @return Node id to propose a lease.
+ */
+ public String getRedirectTo() {
Review Comment:
It seems this method is not used anywhere. Without use cases, I can't tell whether
we should return false when there has been no answer yet, or throw an exception.
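The two options can be sketched side by side. This is only an illustration: the response is simplified to a `CompletableFuture<String>` carrying the redirect node id, and the class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

class PendingResponseOptions {
    /** Option 1: report "nothing known yet" as null while the response is pending. */
    static String redirectOrNull(CompletableFuture<String> responseFut) {
        return responseFut.isDone() ? responseFut.join() : null;
    }

    /** Option 2: treat a read before the response arrives as a programming error. */
    static String redirectOrThrow(CompletableFuture<String> responseFut) {
        if (!responseFut.isDone()) {
            throw new IllegalStateException("No response from the leaseholder yet");
        }

        return responseFut.join();
    }
}
```

Which option is right depends on the callers, which is why a use case is needed before deciding.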
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver.leases;
+
+import static org.apache.ignite.internal.hlc.HybridTimestamp.MIN_VALUE;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * A lease representation in memory.
+ * The real lease is stored in Meta storage.
+ */
+public class Lease implements Serializable {
+ /** The object is used when nothing holds the lease. Empty lease is always
expired. */
+ public static Lease EMPTY_LEASE = new Lease(null, MIN_VALUE, MIN_VALUE,
false);
+
+ /** A node that holds a lease until {@code stopLeas}. */
+ private final ClusterNode leaseholder;
+
+ /** The lease is accepted, when the holder knows about it and applies all
related obligations. */
+ private final boolean accepted;
+
+ /** Lease start timestamp. The timestamp is assigned when the lease
created and is not changed when the lease is prolonged. */
+ private final HybridTimestamp startTime;
+
+ /** Timestamp to expiration the lease. */
+ private final HybridTimestamp expirationTime;
+
+ /**
+ * Creates a new lease.
+ *
+ * @param leaseholder Lease holder.
+ * @param startTime Start lease timestamp.
+ * @param leaseExpirationTime Lease expiration timestamp.
+ */
+ public Lease(ClusterNode leaseholder, HybridTimestamp startTime,
HybridTimestamp leaseExpirationTime) {
+ this(leaseholder, startTime, leaseExpirationTime, false);
+ }
+
+ /**
+ * The constructor.
+ *
+ * @param leaseholder Lease holder.
+ * @param startTime Start lease timestamp.
+ * @param leaseExpirationTime Lease expiration timestamp.
+ * @param accepted The flag is true when the holder accepted the lease,
the false otherwise.
+ */
+ private Lease(ClusterNode leaseholder, HybridTimestamp startTime,
HybridTimestamp leaseExpirationTime, boolean accepted) {
+ this.leaseholder = leaseholder;
+ this.expirationTime = leaseExpirationTime;
+ this.startTime = startTime;
+ this.accepted = accepted;
+ }
+
+ /**
+ * Prolongs a lease until new timestamp. Only an accepted lease can be
prolonged.
+ *
+ * @param to The new lease expiration timestamp.
+ * @return A new lease which will have the same properties except of
expiration timestamp.
+ */
+ public Lease prolongLease(HybridTimestamp to) {
+ assert accepted : "The lease should be accepted by leaseholder before
prolongation ["
Review Comment:
I'm not sure about this. Consider the following case:
- an unaccepted lease is saved to meta storage
- the active actor changes
- the new active actor begins negotiating this lease with the candidate.
To do this, it first needs to perform a successful meta storage invoke, to
minimize races on this lease with the old active actor. During this invoke, it
should prolong the lease to avoid the ABA problem.
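The scenario above can be sketched with an in-memory map standing in for meta storage. Everything here is an assumption for illustration: the string encoding of a lease, the `invoke` helper, and the node names. The point is only that prolonging during the takeover changes the stored value, so a conditional update by the old active actor against the stale value fails.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class LeaseCasSketch {
    /** In-memory stand-in for meta storage: group id -> serialized lease. */
    static final ConcurrentMap<String, String> STORAGE = new ConcurrentHashMap<>();

    /** Hypothetical serialized form of a lease. */
    static String lease(String holder, long expiration, boolean accepted) {
        return holder + "|" + expiration + "|" + accepted;
    }

    /** Conditional update: succeeds only if the stored lease still equals {@code expected}. */
    static boolean invoke(String grpId, String expected, String updated) {
        return STORAGE.replace(grpId, expected, updated);
    }

    public static void main(String[] args) {
        String stale = lease("nodeA", 1_000, false);
        STORAGE.put("grp", stale);

        // The new active actor takes over the unaccepted lease and prolongs it in the same invoke.
        boolean tookOver = invoke("grp", stale, lease("nodeA", 2_000, false));

        // The old active actor still holds the stale value, so its conditional update fails.
        boolean oldActorWon = invoke("grp", stale, lease("nodeB", 1_500, false));

        System.out.println(tookOver + " " + oldActorWon); // prints "true false"
    }
}
```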
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java:
##########
@@ -210,39 +237,88 @@ public void run() {
}
/**
- * Writes a lease in Meta storage.
+ * Writes a new lease in Meta storage.
*
* @param grpId Replication group id.
* @param lease Old lease to apply CAS in Meta storage.
* @param candidate Lease candidate.
*/
- private void updateLeaseInMetaStorage(ReplicationGroupId grpId, Lease
lease, ClusterNode candidate) {
+ private void writeNewLeasInMetaStorage(ReplicationGroupId grpId, Lease
lease, ClusterNode candidate) {
var leaseKey = ByteArray.fromString(PLACEMENTDRIVER_PREFIX +
grpId);
- var newTs = new HybridTimestamp(clock.now().getPhysical() +
LEASE_PERIOD, 0);
+
+ HybridTimestamp startTs = clock.now();
+
+ var expirationTs = new HybridTimestamp(startTs.getPhysical() +
longLeasePeriod, 0);
byte[] leaseRaw = ByteUtils.toBytes(lease);
- Lease renewedLease = new Lease(candidate, newTs);
+ Lease renewedLease = new Lease(candidate, startTs, expirationTs);
msManager.invoke(
or(notExists(leaseKey), value(leaseKey).eq(leaseRaw)),
put(leaseKey, ByteUtils.toBytes(renewedLease)),
noop()
+ ).thenAccept(isCreated -> {
+ if (isCreated) {
+ boolean force = candidate.equals(lease.getLeaseholder());
+
+ leaseConciliator.conciliate(grpId, renewedLease, force);
+ }
+ });
+ }
+
+ /**
+ * Writes a prolong lease in Meta storage.
+ *
+ * @param grpId Replication group id.
+ * @param lease Lease to prolong.
+ */
+ private void prolongLeaseInMetaStorage(ReplicationGroupId grpId, Lease
lease) {
+ var leaseKey = ByteArray.fromString(PLACEMENTDRIVER_PREFIX +
grpId);
+ var newTs = new HybridTimestamp(clock.now().getPhysical() +
LEASE_PERIOD, 0);
+
+ byte[] leaseRaw = ByteUtils.toBytes(lease);
+
+ Lease renewedLease = lease.prolongLease(newTs);
+
+ msManager.invoke(
+ value(leaseKey).eq(leaseRaw),
+ put(leaseKey, ByteUtils.toBytes(renewedLease)),
+ noop()
+ );
+ }
+
+ /**
+ * Writes an accepted lease in Meta storage.
+ *
+ * @param grpId Replication group id.
+ * @param lease Lease to accept.
+ */
+ private void acceptLeaseInMetaStorage(ReplicationGroupId grpId, Lease
lease) {
Review Comment:
Maybe just `publishLease`?
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java:
##########
@@ -195,10 +206,17 @@ private void
withRaftClientIfPresent(Consumer<TopologyAwareRaftGroupService> clo
}
private void onLeaderChange(ClusterNode leader, Long term) {
- if (leader.equals(clusterService.topologyService().localMember())) {
- takeOverActiveActor();
- } else {
- stepDownActiveActor();
+ if (!busyLock.enterBusy()) {
Review Comment:
It seems that `takeOverActiveActor` and `stepDownActiveActor` should also be
performed under the busy lock.
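A minimal sketch of the pattern, assuming a simplified busy lock built on `ReentrantReadWriteLock` rather than the project's own lock class. The class, the `role` field, and the `stop` method are hypothetical; the role assignment stands in for `takeOverActiveActor` and `stepDownActiveActor`.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

class BusyLockSketch {
    /** Simplified stand-in for a busy lock: busy sections are readers, stop() is the writer. */
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    /** Guarded by {@code lock}. */
    private boolean stopped;

    /** Stand-in for the effect of taking over or stepping down. */
    volatile String role = "none";

    boolean enterBusy() {
        lock.readLock().lock();

        if (stopped) {
            lock.readLock().unlock();

            return false;
        }

        return true;
    }

    void leaveBusy() {
        lock.readLock().unlock();
    }

    /** Waits for in-flight busy sections, then rejects all later ones. */
    void stop() {
        lock.writeLock().lock();

        try {
            stopped = true;
        } finally {
            lock.writeLock().unlock();
        }
    }

    void onLeaderChange(boolean localNodeIsLeader) {
        if (!enterBusy()) {
            return; // The component is stopping; ignore the event.
        }

        try {
            // Both the take-over and the step-down happen inside the busy section.
            role = localNodeIsLeader ? "active" : "passive";
        } finally {
            leaveBusy();
        }
    }
}
```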
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java:
##########
@@ -181,22 +210,20 @@ public void run() {
HybridTimestamp now = clock.now();
- // Nothing holds the lease.
- if (lease == EMPTY_LEASE
- // The lease is near to expiration.
- || now.getPhysical() >
(lease.getLeaseExpirationTime().getPhysical() - LEASE_PERIOD / 2)) {
+ // The lease is expired or near to the one.
+ if (now.getPhysical() >
(lease.getExpirationTime().getPhysical() - LEASE_PERIOD / 2)) {
Review Comment:
Please use the time comparison methods that account for clock skew. I would suggest
calculating a HybridTimestamp value before the loop, some
`outdatedLeaseThreshold`, and comparing all expiration times with this threshold
using `HybridTimestamp#after`.
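A sketch of the suggested shape, assuming a tiny stand-in timestamp class `Ts` with an `after` method; it is not the real `HybridTimestamp` and it does not model clock skew. The threshold is computed once before the loop, and `now > expiration - LEASE_PERIOD / 2` is rewritten as `threshold.after(expiration)` with `threshold = now + LEASE_PERIOD / 2`.

```java
import java.util.ArrayList;
import java.util.List;

class LeaseThresholdSketch {
    /** Same value as in the diff: 10 * UPDATE_LEASE_MS. */
    static final long LEASE_PERIOD = 2_000L;

    /** Hypothetical stand-in for a hybrid timestamp. */
    static final class Ts {
        final long physical;
        final int logical;

        Ts(long physical, int logical) {
            this.physical = physical;
            this.logical = logical;
        }

        /** True if this timestamp is strictly later than {@code other}. */
        boolean after(Ts other) {
            return physical > other.physical || (physical == other.physical && logical > other.logical);
        }
    }

    /** Returns indexes of leases that are expired or close to expiration. */
    static List<Integer> leasesToRenew(Ts now, List<Ts> expirationTimes) {
        // Computed once, before the loop.
        Ts outdatedLeaseThreshold = new Ts(now.physical + LEASE_PERIOD / 2, 0);

        List<Integer> result = new ArrayList<>();

        for (int i = 0; i < expirationTimes.size(); i++) {
            if (outdatedLeaseThreshold.after(expirationTimes.get(i))) {
                result.add(i);
            }
        }

        return result;
    }
}
```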
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseUpdateClosure.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver.leases;
+
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+
+/**
+ * Closure to notify about a lease update.
+ */
+public interface LeaseUpdateClosure {
Review Comment:
It is not used anymore.
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java:
##########
@@ -210,39 +237,88 @@ public void run() {
}
/**
- * Writes a lease in Meta storage.
+ * Writes a new lease in Meta storage.
*
* @param grpId Replication group id.
* @param lease Old lease to apply CAS in Meta storage.
* @param candidate Lease candidate.
*/
- private void updateLeaseInMetaStorage(ReplicationGroupId grpId, Lease
lease, ClusterNode candidate) {
+ private void writeNewLeasInMetaStorage(ReplicationGroupId grpId, Lease
lease, ClusterNode candidate) {
var leaseKey = ByteArray.fromString(PLACEMENTDRIVER_PREFIX +
grpId);
- var newTs = new HybridTimestamp(clock.now().getPhysical() +
LEASE_PERIOD, 0);
+
+ HybridTimestamp startTs = clock.now();
+
+ var expirationTs = new HybridTimestamp(startTs.getPhysical() +
longLeasePeriod, 0);
byte[] leaseRaw = ByteUtils.toBytes(lease);
- Lease renewedLease = new Lease(candidate, newTs);
+ Lease renewedLease = new Lease(candidate, startTs, expirationTs);
msManager.invoke(
or(notExists(leaseKey), value(leaseKey).eq(leaseRaw)),
put(leaseKey, ByteUtils.toBytes(renewedLease)),
noop()
+ ).thenAccept(isCreated -> {
+ if (isCreated) {
+ boolean force = candidate.equals(lease.getLeaseholder());
+
+ leaseConciliator.conciliate(grpId, renewedLease, force);
+ }
+ });
+ }
+
+ /**
+ * Writes a prolong lease in Meta storage.
+ *
+ * @param grpId Replication group id.
+ * @param lease Lease to prolong.
+ */
+ private void prolongLeaseInMetaStorage(ReplicationGroupId grpId, Lease
lease) {
+ var leaseKey = ByteArray.fromString(PLACEMENTDRIVER_PREFIX +
grpId);
+ var newTs = new HybridTimestamp(clock.now().getPhysical() +
LEASE_PERIOD, 0);
+
+ byte[] leaseRaw = ByteUtils.toBytes(lease);
+
+ Lease renewedLease = lease.prolongLease(newTs);
+
+ msManager.invoke(
+ value(leaseKey).eq(leaseRaw),
+ put(leaseKey, ByteUtils.toBytes(renewedLease)),
+ noop()
+ );
+ }
+
+ /**
+ * Writes an accepted lease in Meta storage.
+ *
+ * @param grpId Replication group id.
+ * @param lease Lease to accept.
+ */
+ private void acceptLeaseInMetaStorage(ReplicationGroupId grpId, Lease
lease) {
+ var leaseKey = ByteArray.fromString(PLACEMENTDRIVER_PREFIX +
grpId);
+ var newTs = new HybridTimestamp(clock.now().getPhysical() +
LEASE_PERIOD, 0);
+
+ byte[] leaseRaw = ByteUtils.toBytes(lease);
+
+ Lease renewedLease = lease.acceptLease(newTs);
+
+ msManager.invoke(
+ value(leaseKey).eq(leaseRaw),
+ put(leaseKey, ByteUtils.toBytes(renewedLease)),
+ noop()
);
}
/**
- * Checks that a leaseholder candidate can take a lease on the
replication group.
+ * Checks that the lease is outdated.
*
* @param lease Lease.
- * @param candidate The node is a leaseholder candidate.
* @return True when the candidate can be a leaseholder, otherwise
false.
*/
- private boolean isReplicationGroupUpdateLeaseholder(Lease lease,
ClusterNode candidate) {
+ private boolean isLeaseIsOutdated(Lease lease) {
Review Comment:
Actually, this returns true not only if the lease is outdated, but also when
there is no lease at all (due to EMPTY_LEASE). Please rename this to
`isLeaseIsOutdatedOrNonExistent` and explain in the javadoc why the logic for both
cases should be the same: we can't prolong an expired lease because there is already
an interval of time when the lease was not active, so we must start a new
negotiation round from the beginning; we do the same for the groups that don't
have leaseholders at all.
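One possible shape for the renamed method and its javadoc, as a sketch only. The `Lease` class here is a simplified stand-in with a millisecond expiration, `nowMs` is passed in for testability, and the comparison is an assumption about how the check could look rather than the PR's code.

```java
class OutdatedLeaseSketch {
    /** Simplified stand-in for the lease: holder name and expiration in milliseconds. */
    static final class Lease {
        final String leaseholder;
        final long expirationMs;

        Lease(String leaseholder, long expirationMs) {
            this.leaseholder = leaseholder;
            this.expirationMs = expirationMs;
        }
    }

    /** Used when nothing holds the lease. The empty lease is always expired. */
    static final Lease EMPTY_LEASE = new Lease(null, Long.MIN_VALUE);

    /**
     * Checks that the lease is outdated or does not exist.
     *
     * <p>Both cases are handled the same way. An expired lease cannot be prolonged, because there
     * is already an interval of time when the lease was not active, so a new negotiation round
     * must start from the beginning. The same is done for groups that have no leaseholder at all.
     *
     * @param lease Lease, or {@link #EMPTY_LEASE} if the group has no lease.
     * @param nowMs Current time in milliseconds.
     * @return True if a new lease must be granted, false otherwise.
     */
    static boolean isLeaseIsOutdatedOrNonExistent(Lease lease, long nowMs) {
        return nowMs > lease.expirationMs;
    }
}
```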