denis-chudov commented on code in PR #1726:
URL: https://github.com/apache/ignite-3/pull/1726#discussion_r1130055745


##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/TopologyTracker.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver;
+
+import java.util.concurrent.atomic.AtomicReference;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.util.CollectionUtils;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * The class tracks a logical topology.
+ */
+public class TopologyTracker {
+    /** Ignite logger. */
+    private static final IgniteLogger LOG = 
Loggers.forClass(TopologyTracker.class);
+
+    /** Topology service. */
+    private final LogicalTopologyService topologyService;
+
+    /** Logical topology snapshot. */
+    private AtomicReference<LogicalTopologySnapshot> topologySnapRef;
+
+    /** Listener to track topology updates. */
+    private final TopologyListener topologyListener;
+
+    /**
+     * The constructor.
+     *
+     * @param topologyService Topology service.
+     */
+    public TopologyTracker(LogicalTopologyService topologyService) {
+        this.topologyService = topologyService;
+
+        this.topologySnapRef = new AtomicReference<>();
+        this.topologyListener = new TopologyListener();
+    }
+
+    /**
+     * Initializes topology on start.
+     */
+    public void startTrack() {
+        topologyService.addEventListener(topologyListener);
+
+        topologyService.logicalTopologyOnLeader().thenAccept(topologySnap -> {
+            LogicalTopologySnapshot logicalTopologySnap0;
+
+            do {
+                logicalTopologySnap0 = topologySnapRef.get();
+
+                if (logicalTopologySnap0 != null && 
logicalTopologySnap0.version() >= topologySnap.version()) {
+                    break;
+                }
+            } while (!topologySnapRef.compareAndSet(logicalTopologySnap0, 
topologySnap));
+
+            LOG.info("Logical topology initialized for placement driver 
[topologySnap={}]", topologySnap);
+        });
+    }
+
+    /**
+     * Stops the tracker.
+     */
+    public void stopTrack() {
+        topologyService.removeEventListener(topologyListener);
+    }
+
+    /**
+     * Gets a node by consistent id from the logical topology.
+     *
+     * @param consistentId Node consistent id.
+     * @return Cluster node or {@code null} if topology has no a node with the 
consistent id.
+     */
+    public ClusterNode localTopologyNodeByConsistentId(String consistentId) {

Review Comment:
   I would suggest `nodeByConsistentIdFromLocalTopologySnapshot`



##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java:
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.utils.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.configuration.notifications.ConfigurationListener;
+import 
org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.EntryEvent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import 
org.apache.ignite.internal.schema.configuration.ExtendedTableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.CollectionUtils;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+
+/**
+ * The class tracks assignment of all replication groups.
+ */
+public class AssignmentsTracker {
+    /** Ignite logger. */
+    private static final IgniteLogger LOG = 
Loggers.forClass(AssignmentsTracker.class);
+
+    /** Vault manager. */
+    private final VaultManager vaultManager;
+
+    /** Metastorage manager. */
+    private final MetaStorageManager msManager;
+
+    /** Tables configuration. */
+    private final TablesConfiguration tablesCfg;
+
+    /** Map replication group id to assignment nodes. */
+    private Map<ReplicationGroupId, Set<Assignment>> groupAssignments;
+
+    /** Assignment configuration listener. */
+    private final AssignmentsCfgListener assignmentsCfgListener;
+
+    /** Assignment Metastorage watch listener. */
+    private final AssignmentsListener assignmentsListener;
+
+    /**
+     * The constructor.
+     *
+     * @param vaultManager Vault manager.
+     * @param msManager Metastorage manager.
+     * @param tablesCfg Table configuration.
+     */
+    public AssignmentsTracker(VaultManager vaultManager, MetaStorageManager 
msManager, TablesConfiguration tablesCfg) {
+        this.vaultManager = vaultManager;
+        this.msManager = msManager;
+        this.tablesCfg = tablesCfg;
+
+        this.groupAssignments = new ConcurrentHashMap<>();
+        this.assignmentsCfgListener = new AssignmentsCfgListener();
+        this.assignmentsListener = new AssignmentsListener();
+    }
+
+    /**
+     * Restores assignments form Vault and subscribers on further updates.
+     */
+    public void startTrack() {
+        ((ExtendedTableConfiguration) 
tablesCfg.tables().any()).assignments().listen(assignmentsCfgListener);
+        
msManager.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX), 
assignmentsListener);
+
+        try (Cursor<VaultEntry> cursor = vaultManager.range(
+                ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX),
+                
ByteArray.fromString(incrementLastChar(STABLE_ASSIGNMENTS_PREFIX))
+        )) {
+            for (VaultEntry entry : cursor) {
+                String key = entry.key().toString();
+
+                key = key.replace(STABLE_ASSIGNMENTS_PREFIX, "");
+
+                TablePartitionId grpId = TablePartitionId.fromString(key);
+
+                Set<Assignment> assignments = 
ByteUtils.fromBytes(entry.value());
+
+                groupAssignments.put(grpId, assignments);
+            }
+        }
+
+        LOG.info("Assignment cache initialized for placement driver 
[groupAssignments={}]", groupAssignments);
+    }
+
+    private static String incrementLastChar(String str) {
+        char lastChar = str.charAt(str.length() - 1);
+
+        return str.substring(0, str.length() - 1) + (char) (lastChar + 1);
+    }
+
+    /**
+     * Stops the tracker.
+     */
+    public void stopTrack() {
+        msManager.unregisterWatch(assignmentsListener);
+        ((ExtendedTableConfiguration) 
tablesCfg.tables().any()).assignments().stopListen(assignmentsCfgListener);
+    }
+
+    /**
+     * Gets assignments.
+     *
+     * @return Map replication group id to its assignment.
+     */
+    public Map<ReplicationGroupId, Set<Assignment>> assignments() {
+        return groupAssignments;
+    }
+
+    /**
+     * Configuration assignments listener.
+     */
+    private class AssignmentsCfgListener implements 
ConfigurationListener<byte[]> {
+        @Override
+        public CompletableFuture<?> 
onUpdate(ConfigurationNotificationEvent<byte[]> assignmentsCtx) {
+            ExtendedTableConfiguration tblCfg = 
assignmentsCtx.config(ExtendedTableConfiguration.class);
+
+            UUID tblId = tblCfg.id().value();
+
+            LOG.debug("Table assignments configuration update for placement 
driver [revision={}, tblId={}]",
+                    assignmentsCtx.storageRevision(), tblId);
+
+            List<Set<Assignment>> tableAssignments =
+                    assignmentsCtx.newValue() == null ? null : 
ByteUtils.fromBytes(assignmentsCtx.newValue());
+
+            boolean leaseRenewalRequired = false;
+
+            for (int part = 0; part < tblCfg.partitions().value(); part++) {
+                var replicationGrpId = new TablePartitionId(tblId, part);
+
+                if (tableAssignments == null) {
+                    groupAssignments.remove(replicationGrpId);
+                } else {
+                    Set<Assignment> prevAssignment = 
groupAssignments.put(replicationGrpId, tableAssignments.get(part));
+
+                    if (CollectionUtils.nullOrEmpty(prevAssignment)) {
+                        leaseRenewalRequired = true;
+                    }
+                }
+            }
+
+            if (leaseRenewalRequired) {
+                triggerToRenewLeases();
+            }
+
+            return completedFuture(null);
+        }
+    }
+
+    /**
+     * Metastorage assignments watch.
+     */
+    private class AssignmentsListener implements WatchListener {
+        @Override
+        public void onUpdate(WatchEvent event) {
+            assert !event.entryEvent().newEntry().empty() : "New assignments 
are empty";
+
+            LOG.debug("Assignment update [revision={}, key={}]", 
event.revision(),
+                    new ByteArray(event.entryEvent().newEntry().key()));
+
+            boolean leaseRenewalRequired = false;
+
+            for (EntryEvent evt : event.entryEvents()) {
+                var replicationGrpId = TablePartitionId.fromString(
+                        new String(evt.newEntry().key(), 
StandardCharsets.UTF_8).replace(STABLE_ASSIGNMENTS_PREFIX, ""));
+
+                if (evt.newEntry().empty()) {
+                    groupAssignments.remove(replicationGrpId);
+                } else {
+                    Set<Assignment> prevAssignment = 
groupAssignments.put(replicationGrpId, 
ByteUtils.fromBytes(evt.newEntry().value()));
+
+                    if (CollectionUtils.nullOrEmpty(prevAssignment)) {
+                        leaseRenewalRequired = true;
+                    }
+                }
+            }
+
+            if (leaseRenewalRequired) {
+                triggerToRenewLeases();
+            }

Review Comment:
   same as for  `leaseRenewalRequired` flag in configuration listener



##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver;
+
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.or;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.value;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+import static 
org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_PREFIX;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.affinity.Assignment;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * A processor to manger leases. The process is started when placement driver 
activates and stopped when it deactivates.
+ */
+public class LeaseUpdater {
+    /** Ignite logger. */
+    private static final IgniteLogger LOG = 
Loggers.forClass(LeaseUpdater.class);
+
+    /**
+     * Cluster cLock skew.
+     * TODO: IGNITE-18978 Method to comparison timestamps with clock skew.
+     */
+    private static final long CLOCK_SKEW = 7L;
+
+    /** Update attempts interval in milliseconds. */
+    private static final long UPDATE_LEASE_MS = 200L;
+
+    /** Lease holding interval. */
+    private static final long LEASE_PERIOD = 10 * UPDATE_LEASE_MS;
+
+    /** Metastorage manager. */
+    private final MetaStorageManager msManager;
+
+    /** Assignments tracker. */
+    private final AssignmentsTracker assignmentsTracker;
+
+    /** Topology tracker. */
+    private final TopologyTracker topologyTracker;
+
+    /** Lease tracker. */
+    private final LeaseTracker leaseTracker;
+
+    /** Cluster clock. */
+    private final HybridClock clock;
+
+    /** Closure to update leases. */
+    private final Updater updater;
+
+    /** Dedicated thread to update leases. */
+    private volatile Thread updaterTread;
+
+    /** Node name. */
+    private String nodeName;
+
+    /**
+     * The constructor.
+     *
+     * @param vaultManager Vault manager.
+     * @param msManager Metastorage manager.
+     * @param topologyService Topology service.
+     * @param tablesConfiguration Tables configuration.
+     * @param leaseTracker Lease tracker.
+     * @param clock Cluster clock.
+     */
+    public LeaseUpdater(
+            VaultManager vaultManager,
+            MetaStorageManager msManager,
+            LogicalTopologyService topologyService,
+            TablesConfiguration tablesConfiguration,
+            LeaseTracker leaseTracker,
+            HybridClock clock
+    ) {
+        this.msManager = msManager;
+        this.leaseTracker = leaseTracker;
+        this.clock = clock;
+
+        this.assignmentsTracker = new AssignmentsTracker(vaultManager, 
msManager, tablesConfiguration);
+        this.topologyTracker = new TopologyTracker(topologyService);
+        this.updater = new Updater();
+    }
+
+    /**
+     * Initializes the class.
+     */
+    public void init(String nodeName) {
+        this.nodeName = nodeName;
+
+        topologyTracker.startTrack();
+        assignmentsTracker.startTrack();
+    }
+
+    /**
+     * De-initializes the class.
+     */
+    public void deInit() {
+        topologyTracker.stopTrack();
+        assignmentsTracker.stopTrack();
+    }
+
+    /**
+     * Activates a lease updater to renew leases.
+     */
+    public void activate() {
+        //TODO: IGNITE-18879 Implement lease maintenance.
+        updaterTread = new Thread(updater, 
NamedThreadFactory.threadPrefix(nodeName, "lease-updater"));
+
+        updaterTread.start();
+    }
+
+    /**
+     * Runnable to update lease in Metastorage.
+     */
+    private class Updater implements Runnable {
+        @Override
+        public void run() {
+            while (!updaterTread.isInterrupted()) {
+                for (Map.Entry<ReplicationGroupId, Set<Assignment>> entry : 
assignmentsTracker.assignments().entrySet()) {
+                    ReplicationGroupId grpId = entry.getKey();
+
+                    Lease lease = leaseTracker.getLease(grpId);
+
+                    HybridTimestamp now = clock.now();
+
+                    // Nothing holds the lease.
+                    if (lease.getLeaseExpirationTime() == null
+                            // The lease is near to expiration.
+                            || now.getPhysical() > 
(lease.getLeaseExpirationTime().getPhysical() - LEASE_PERIOD / 2)) {
+                        var leaseKey = 
ByteArray.fromString(PLACEMENTDRIVER_PREFIX + grpId);
+
+                        var newTs = new HybridTimestamp(now.getPhysical() + 
LEASE_PERIOD, 0);
+
+                        ClusterNode candidate = 
nextLeaseHolder(entry.getValue());
+
+                        // Now nothing is holding the lease, and new lease is 
being granted.
+                        if (lease.getLeaseholder() == null && candidate != null
+                                // The lease is prolongation
+                                || lease.getLeaseholder() != null && 
lease.getLeaseholder().equals(candidate)
+                                // New lease is being granted, and the 
previous lease is expired.
+                                || candidate != null && 
(lease.getLeaseExpirationTime() == null
+                                || compareWithClockSkew(now, 
lease.getLeaseExpirationTime()) > 0)) {
+                            byte[] leaseRaw = ByteUtils.toBytes(lease);
+
+                            Lease renewedLease = new Lease(candidate, newTs);
+
+                            msManager.invoke(
+                                    or(notExists(leaseKey), 
value(leaseKey).eq(leaseRaw)),
+                                    put(leaseKey, 
ByteUtils.toBytes(renewedLease)),
+                                    noop()
+                            );
+                        }
+                    }
+                }
+
+                try {
+                    Thread.sleep(UPDATE_LEASE_MS);
+                } catch (InterruptedException e) {
+                    LOG.warn("Lease updater is interrupted");
+                }
+            }
+        }
+    }
+
+    /**
+     * Compares two timestamps with the clock skew.
+     * TODO: IGNITE-18978 Method to comparison timestamps with clock skew.
+     *
+     * @param ts1 First timestamp.
+     * @param ts2 Second timestamp.
+     * @return Result of comparison.
+     */
+    private static int compareWithClockSkew(HybridTimestamp ts1, 
HybridTimestamp ts2) {
+        if (ts1.getPhysical() - CLOCK_SKEW < ts2.getPhysical() && 
ts1.getPhysical() + CLOCK_SKEW < ts2.getPhysical()) {

Review Comment:
   This condition is incorrect: if the second part is true, the first should be 
also true. Perhaps, you mean `>` in the second part.
   But actually, we can't claim the equality of two timestamps if this method 
returns `0`, in fact it means that we don't know did one happen before or after 
another or not. This this should be definitely mentioned because its 
counter-intuitive and contradicts with traditional comparator pattern.



##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/Lease.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * A lease representation in memory.
+ * The real lease is stored in Metastorage.

Review Comment:
   ```suggestion
    * The real lease is stored in Meta storage.
   ```



##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/Lease.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * A lease representation in memory.
+ * The real lease is stored in Metastorage.
+ */
+public class Lease implements Serializable {
+    /** A node that holds a lease until {@code stopLeas}. */
+    private ClusterNode leaseholder;
+
+    /** Timestamp to expiration the lease. */
+    private HybridTimestamp leaseExpirationTime;
+
+    /**
+     * Default constructor.
+     */
+    public Lease() {
+        this(null, null);
+    }
+
+    /**
+     * The constructor.
+     *
+     * @param leaseholder Lease holder.
+     * @param leaseExpirationTime Lease expiration timestamp.
+     */
+    public Lease(ClusterNode leaseholder, HybridTimestamp leaseExpirationTime) 
{
+        this.leaseholder = leaseholder;
+        this.leaseExpirationTime = leaseExpirationTime;
+    }
+
+    /**
+     * Get a leaseholder node.
+     *
+     * @return Leaseholder or {@code null} if nothing holds the lease.
+     */
+    public ClusterNode getLeaseholder() {
+        return leaseholder;
+    }
+
+    /**
+     * Sets a leaseholder node.
+     *
+     * @param leaseholder Leaseholder node.
+     */
+    public void setLeaseholder(ClusterNode leaseholder) {
+        this.leaseholder = leaseholder;
+    }
+
+    /**
+     * Gets a lease expiration timestamp.
+     *
+     * @return Lease expiration timestamp or {@code null} if nothing holds the 
lease.
+     */
+    public HybridTimestamp getLeaseExpirationTime() {
+        return leaseExpirationTime;
+    }
+
+    /**
+     * Sets a lease expiration timestamp.
+     *
+     * @param leaseExpirationTime Lease expiration timestamp.
+     */
+    public void setLeaseExpirationTime(HybridTimestamp leaseExpirationTime) {
+        this.leaseExpirationTime = leaseExpirationTime;
+    }

Review Comment:
   Why do we need setters for the fields? Besides, they are never used. Better 
make these fields final.



##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java:
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.utils.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.configuration.notifications.ConfigurationListener;
+import 
org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.EntryEvent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import 
org.apache.ignite.internal.schema.configuration.ExtendedTableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.CollectionUtils;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+
+/**
+ * The class tracks assignment of all replication groups.
+ */
+public class AssignmentsTracker {
+    /** Ignite logger. */
+    private static final IgniteLogger LOG = 
Loggers.forClass(AssignmentsTracker.class);
+
+    /** Vault manager. */
+    private final VaultManager vaultManager;
+
+    /** Metastorage manager. */
+    private final MetaStorageManager msManager;
+
+    /** Tables configuration. */
+    private final TablesConfiguration tablesCfg;
+
+    /** Map replication group id to assignment nodes. */
+    private Map<ReplicationGroupId, Set<Assignment>> groupAssignments;
+
+    /** Assignment configuration listener. */
+    private final AssignmentsCfgListener assignmentsCfgListener;
+
+    /** Assignment Metastorage watch listener. */
+    private final AssignmentsListener assignmentsListener;
+
+    /**
+     * The constructor.
+     *
+     * @param vaultManager Vault manager.
+     * @param msManager Metastorage manager.

Review Comment:
   ```suggestion
        * @param msManager Meta storage manager.
   ```



##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java:
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.utils.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.configuration.notifications.ConfigurationListener;
+import 
org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.EntryEvent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import 
org.apache.ignite.internal.schema.configuration.ExtendedTableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.CollectionUtils;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+
+/**
+ * The class tracks assignment of all replication groups.
+ */
+public class AssignmentsTracker {
+    /** Ignite logger. */
+    private static final IgniteLogger LOG = 
Loggers.forClass(AssignmentsTracker.class);
+
+    /** Vault manager. */
+    private final VaultManager vaultManager;
+
+    /** Metastorage manager. */
+    private final MetaStorageManager msManager;
+
+    /** Tables configuration. */
+    private final TablesConfiguration tablesCfg;
+
+    /** Map replication group id to assignment nodes. */
+    private Map<ReplicationGroupId, Set<Assignment>> groupAssignments;
+
+    /** Assignment configuration listener. */
+    private final AssignmentsCfgListener assignmentsCfgListener;
+
+    /** Assignment Metastorage watch listener. */
+    private final AssignmentsListener assignmentsListener;
+
+    /**
+     * The constructor.
+     *
+     * @param vaultManager Vault manager.
+     * @param msManager Metastorage manager.
+     * @param tablesCfg Table configuration.
+     */
+    public AssignmentsTracker(VaultManager vaultManager, MetaStorageManager 
msManager, TablesConfiguration tablesCfg) {
+        this.vaultManager = vaultManager;
+        this.msManager = msManager;
+        this.tablesCfg = tablesCfg;
+
+        this.groupAssignments = new ConcurrentHashMap<>();
+        this.assignmentsCfgListener = new AssignmentsCfgListener();
+        this.assignmentsListener = new AssignmentsListener();
+    }
+
+    /**
+     * Restores assignments form Vault and subscribers on further updates.
+     */
+    public void startTrack() {
+        ((ExtendedTableConfiguration) 
tablesCfg.tables().any()).assignments().listen(assignmentsCfgListener);
+        
msManager.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX), 
assignmentsListener);
+
+        try (Cursor<VaultEntry> cursor = vaultManager.range(
+                ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX),
+                
ByteArray.fromString(incrementLastChar(STABLE_ASSIGNMENTS_PREFIX))
+        )) {
+            for (VaultEntry entry : cursor) {
+                String key = entry.key().toString();
+
+                key = key.replace(STABLE_ASSIGNMENTS_PREFIX, "");
+
+                TablePartitionId grpId = TablePartitionId.fromString(key);
+
+                Set<Assignment> assignments = 
ByteUtils.fromBytes(entry.value());
+
+                groupAssignments.put(grpId, assignments);
+            }
+        }
+
+        LOG.info("Assignment cache initialized for placement driver 
[groupAssignments={}]", groupAssignments);
+    }
+
+    private static String incrementLastChar(String str) {
+        char lastChar = str.charAt(str.length() - 1);
+
+        return str.substring(0, str.length() - 1) + (char) (lastChar + 1);
+    }
+
+    /**
+     * Stops the tracker.
+     */
+    public void stopTrack() {
+        msManager.unregisterWatch(assignmentsListener);
+        ((ExtendedTableConfiguration) 
tablesCfg.tables().any()).assignments().stopListen(assignmentsCfgListener);
+    }
+
+    /**
+     * Gets assignments.
+     *
+     * @return Map replication group id to its assignment.
+     */
+    public Map<ReplicationGroupId, Set<Assignment>> assignments() {
+        return groupAssignments;
+    }
+
+    /**
+     * Configuration assignments listener.
+     */
+    private class AssignmentsCfgListener implements 
ConfigurationListener<byte[]> {
+        @Override
+        public CompletableFuture<?> 
onUpdate(ConfigurationNotificationEvent<byte[]> assignmentsCtx) {
+            ExtendedTableConfiguration tblCfg = 
assignmentsCtx.config(ExtendedTableConfiguration.class);
+
+            UUID tblId = tblCfg.id().value();
+
+            LOG.debug("Table assignments configuration update for placement 
driver [revision={}, tblId={}]",
+                    assignmentsCtx.storageRevision(), tblId);
+
+            List<Set<Assignment>> tableAssignments =
+                    assignmentsCtx.newValue() == null ? null : 
ByteUtils.fromBytes(assignmentsCtx.newValue());
+
+            boolean leaseRenewalRequired = false;
+
+            for (int part = 0; part < tblCfg.partitions().value(); part++) {
+                var replicationGrpId = new TablePartitionId(tblId, part);
+
+                if (tableAssignments == null) {
+                    groupAssignments.remove(replicationGrpId);
+                } else {
+                    Set<Assignment> prevAssignment = 
groupAssignments.put(replicationGrpId, tableAssignments.get(part));
+
+                    if (CollectionUtils.nullOrEmpty(prevAssignment)) {
+                        leaseRenewalRequired = true;
+                    }
+                }
+            }
+
+            if (leaseRenewalRequired) {
+                triggerToRenewLeases();
+            }

Review Comment:
   seems that `leaseRenewalRequired` flag should take into account assignments 
removal as well, if there were some.



##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java:
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.utils.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.configuration.notifications.ConfigurationListener;
+import 
org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.EntryEvent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import 
org.apache.ignite.internal.schema.configuration.ExtendedTableConfiguration;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.CollectionUtils;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+
+/**
+ * The class tracks assignment of all replication groups.
+ */
+public class AssignmentsTracker {
+    /** Ignite logger. */
+    private static final IgniteLogger LOG = 
Loggers.forClass(AssignmentsTracker.class);
+
+    /** Vault manager. */
+    private final VaultManager vaultManager;
+
+    /** Metastorage manager. */
+    private final MetaStorageManager msManager;
+
+    /** Tables configuration. */
+    private final TablesConfiguration tablesCfg;
+
+    /** Map replication group id to assignment nodes. */
+    private Map<ReplicationGroupId, Set<Assignment>> groupAssignments;
+
+    /** Assignment configuration listener. */
+    private final AssignmentsCfgListener assignmentsCfgListener;
+
+    /** Assignment Metastorage watch listener. */
+    private final AssignmentsListener assignmentsListener;
+
+    /**
+     * The constructor.
+     *
+     * @param vaultManager Vault manager.
+     * @param msManager Metastorage manager.
+     * @param tablesCfg Table configuration.
+     */
+    public AssignmentsTracker(VaultManager vaultManager, MetaStorageManager 
msManager, TablesConfiguration tablesCfg) {
+        this.vaultManager = vaultManager;
+        this.msManager = msManager;
+        this.tablesCfg = tablesCfg;
+
+        this.groupAssignments = new ConcurrentHashMap<>();
+        this.assignmentsCfgListener = new AssignmentsCfgListener();
+        this.assignmentsListener = new AssignmentsListener();
+    }
+
+    /**
+     * Restores assignments form Vault and subscribers on further updates.
+     */
+    public void startTrack() {
+        ((ExtendedTableConfiguration) 
tablesCfg.tables().any()).assignments().listen(assignmentsCfgListener);
+        
msManager.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX), 
assignmentsListener);
+
+        try (Cursor<VaultEntry> cursor = vaultManager.range(
+                ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX),
+                
ByteArray.fromString(incrementLastChar(STABLE_ASSIGNMENTS_PREFIX))
+        )) {
+            for (VaultEntry entry : cursor) {
+                String key = entry.key().toString();
+
+                key = key.replace(STABLE_ASSIGNMENTS_PREFIX, "");
+
+                TablePartitionId grpId = TablePartitionId.fromString(key);
+
+                Set<Assignment> assignments = 
ByteUtils.fromBytes(entry.value());
+
+                groupAssignments.put(grpId, assignments);
+            }
+        }
+
+        LOG.info("Assignment cache initialized for placement driver 
[groupAssignments={}]", groupAssignments);
+    }
+
+    private static String incrementLastChar(String str) {
+        char lastChar = str.charAt(str.length() - 1);
+
+        return str.substring(0, str.length() - 1) + (char) (lastChar + 1);
+    }
+
+    /**
+     * Stops the tracker.
+     */
+    public void stopTrack() {
+        msManager.unregisterWatch(assignmentsListener);
+        ((ExtendedTableConfiguration) 
tablesCfg.tables().any()).assignments().stopListen(assignmentsCfgListener);
+    }
+
+    /**
+     * Gets assignments.
+     *
+     * @return Map replication group id to its assignment.
+     */
+    public Map<ReplicationGroupId, Set<Assignment>> assignments() {
+        return groupAssignments;
+    }
+
+    /**
+     * Configuration assignments listener.
+     */
+    private class AssignmentsCfgListener implements 
ConfigurationListener<byte[]> {
+        @Override
+        public CompletableFuture<?> 
onUpdate(ConfigurationNotificationEvent<byte[]> assignmentsCtx) {
+            ExtendedTableConfiguration tblCfg = 
assignmentsCtx.config(ExtendedTableConfiguration.class);
+
+            UUID tblId = tblCfg.id().value();
+
+            LOG.debug("Table assignments configuration update for placement 
driver [revision={}, tblId={}]",
+                    assignmentsCtx.storageRevision(), tblId);
+
+            List<Set<Assignment>> tableAssignments =
+                    assignmentsCtx.newValue() == null ? null : 
ByteUtils.fromBytes(assignmentsCtx.newValue());
+
+            boolean leaseRenewalRequired = false;
+
+            for (int part = 0; part < tblCfg.partitions().value(); part++) {
+                var replicationGrpId = new TablePartitionId(tblId, part);
+
+                if (tableAssignments == null) {
+                    groupAssignments.remove(replicationGrpId);
+                } else {
+                    Set<Assignment> prevAssignment = 
groupAssignments.put(replicationGrpId, tableAssignments.get(part));
+
+                    if (CollectionUtils.nullOrEmpty(prevAssignment)) {
+                        leaseRenewalRequired = true;
+                    }
+                }
+            }
+
+            if (leaseRenewalRequired) {
+                triggerToRenewLeases();
+            }
+
+            return completedFuture(null);
+        }
+    }
+
+    /**
+     * Metastorage assignments watch.

Review Comment:
   ```suggestion
        * Meta storage assignments watch.
   ```



##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver;
+
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.or;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.value;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+import static 
org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_PREFIX;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.affinity.Assignment;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * A processor to manger leases. The process is started when placement driver 
activates and stopped when it deactivates.
+ */
+public class LeaseUpdater {
+    /** Ignite logger. */
+    private static final IgniteLogger LOG = 
Loggers.forClass(LeaseUpdater.class);
+
+    /**
+     * Cluster cLock skew.
+     * TODO: IGNITE-18978 Method to comparison timestamps with clock skew.
+     */
+    private static final long CLOCK_SKEW = 7L;
+
+    /** Update attempts interval in milliseconds. */
+    private static final long UPDATE_LEASE_MS = 200L;
+
+    /** Lease holding interval. */
+    private static final long LEASE_PERIOD = 10 * UPDATE_LEASE_MS;
+
+    /** Metastorage manager. */
+    private final MetaStorageManager msManager;
+
+    /** Assignments tracker. */
+    private final AssignmentsTracker assignmentsTracker;
+
+    /** Topology tracker. */
+    private final TopologyTracker topologyTracker;
+
+    /** Lease tracker. */
+    private final LeaseTracker leaseTracker;
+
+    /** Cluster clock. */
+    private final HybridClock clock;
+
+    /** Closure to update leases. */
+    private final Updater updater;
+
+    /** Dedicated thread to update leases. */
+    private volatile Thread updaterTread;
+
+    /** Node name. */
+    private String nodeName;
+
+    /**
+     * The constructor.
+     *
+     * @param vaultManager Vault manager.
+     * @param msManager Metastorage manager.
+     * @param topologyService Topology service.
+     * @param tablesConfiguration Tables configuration.
+     * @param leaseTracker Lease tracker.
+     * @param clock Cluster clock.
+     */
+    public LeaseUpdater(
+            VaultManager vaultManager,
+            MetaStorageManager msManager,
+            LogicalTopologyService topologyService,
+            TablesConfiguration tablesConfiguration,
+            LeaseTracker leaseTracker,
+            HybridClock clock
+    ) {
+        this.msManager = msManager;
+        this.leaseTracker = leaseTracker;
+        this.clock = clock;
+
+        this.assignmentsTracker = new AssignmentsTracker(vaultManager, 
msManager, tablesConfiguration);
+        this.topologyTracker = new TopologyTracker(topologyService);
+        this.updater = new Updater();
+    }
+
+    /**
+     * Initializes the class.
+     */
+    public void init(String nodeName) {
+        this.nodeName = nodeName;
+
+        topologyTracker.startTrack();
+        assignmentsTracker.startTrack();
+    }
+
+    /**
+     * De-initializes the class.
+     */
+    public void deInit() {
+        topologyTracker.stopTrack();
+        assignmentsTracker.stopTrack();
+    }
+
+    /**
+     * Activates a lease updater to renew leases.
+     */
+    public void activate() {
+        //TODO: IGNITE-18879 Implement lease maintenance.
+        updaterTread = new Thread(updater, 
NamedThreadFactory.threadPrefix(nodeName, "lease-updater"));
+
+        updaterTread.start();
+    }
+
+    /**
+     * Runnable to update lease in Metastorage.
+     */
+    private class Updater implements Runnable {
+        @Override
+        public void run() {
+            while (!updaterTread.isInterrupted()) {
+                for (Map.Entry<ReplicationGroupId, Set<Assignment>> entry : 
assignmentsTracker.assignments().entrySet()) {
+                    ReplicationGroupId grpId = entry.getKey();
+
+                    Lease lease = leaseTracker.getLease(grpId);
+
+                    HybridTimestamp now = clock.now();
+
+                    // Nothing holds the lease.
+                    if (lease.getLeaseExpirationTime() == null
+                            // The lease is near to expiration.
+                            || now.getPhysical() > 
(lease.getLeaseExpirationTime().getPhysical() - LEASE_PERIOD / 2)) {
+                        var leaseKey = 
ByteArray.fromString(PLACEMENTDRIVER_PREFIX + grpId);
+
+                        var newTs = new HybridTimestamp(now.getPhysical() + 
LEASE_PERIOD, 0);
+
+                        ClusterNode candidate = 
nextLeaseHolder(entry.getValue());
+
+                        // Now nothing is holding the lease, and new lease is 
being granted.
+                        if (lease.getLeaseholder() == null && candidate != null
+                                // The lease is prolongation
+                                || lease.getLeaseholder() != null && 
lease.getLeaseholder().equals(candidate)
+                                // New lease is being granted, and the 
previous lease is expired.
+                                || candidate != null && 
(lease.getLeaseExpirationTime() == null
+                                || compareWithClockSkew(now, 
lease.getLeaseExpirationTime()) > 0)) {
+                            byte[] leaseRaw = ByteUtils.toBytes(lease);
+
+                            Lease renewedLease = new Lease(candidate, newTs);
+
+                            msManager.invoke(
+                                    or(notExists(leaseKey), 
value(leaseKey).eq(leaseRaw)),
+                                    put(leaseKey, 
ByteUtils.toBytes(renewedLease)),
+                                    noop()
+                            );
+                        }
+                    }
+                }
+
+                try {
+                    Thread.sleep(UPDATE_LEASE_MS);
+                } catch (InterruptedException e) {
+                    LOG.warn("Lease updater is interrupted");
+                }
+            }
+        }
+    }
+
+    /**
+     * Compares two timestamps with the clock skew.
+     * TODO: IGNITE-18978 Method to comparison timestamps with clock skew.
+     *
+     * @param ts1 First timestamp.
+     * @param ts2 Second timestamp.
+     * @return Result of comparison.
+     */
+    private static int compareWithClockSkew(HybridTimestamp ts1, 
HybridTimestamp ts2) {
+        if (ts1.getPhysical() - CLOCK_SKEW < ts2.getPhysical() && 
ts1.getPhysical() + CLOCK_SKEW < ts2.getPhysical()) {
+            return 0;
+        }
+
+        return ts1.compareTo(ts2);
+    }
+
+    /**
+     * Finds a node that can be the leaseholder.
+     *
+     * @param assignments Replication group assignment.
+     * @return Cluster node, or {@code null} if no node in assignments can be 
the leaseholder.
+     */
+    private ClusterNode nextLeaseHolder(Set<Assignment> assignments) {
+        //TODO: IGNITE-18879 Implement more intellectual algorithm to choose a 
node.
+        for (Assignment assignment : assignments) {
+            ClusterNode candidate = 
topologyTracker.localTopologyNodeByConsistentId(assignment.consistentId());
+
+            if (candidate != null) {
+                return candidate;
+            }
+        }
+
+        return null;
+    }
+
+
+    /**
+     * Stops a dedicated thread to renew or assign leases.
+     */
+    public void deactivate() {

Review Comment:
   pls put this method after `activate` method.



##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver;
+
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.or;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.value;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+import static 
org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_PREFIX;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.affinity.Assignment;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * A processor to manger leases. The process is started when placement driver 
activates and stopped when it deactivates.
+ */
+public class LeaseUpdater {
+    /** Ignite logger. */
+    private static final IgniteLogger LOG = 
Loggers.forClass(LeaseUpdater.class);
+
+    /**
+     * Cluster cLock skew.
+     * TODO: IGNITE-18978 Method to comparison timestamps with clock skew.
+     */
+    private static final long CLOCK_SKEW = 7L;
+
+    /** Update attempts interval in milliseconds. */
+    private static final long UPDATE_LEASE_MS = 200L;
+
+    /** Lease holding interval. */
+    private static final long LEASE_PERIOD = 10 * UPDATE_LEASE_MS;
+
+    /** Metastorage manager. */
+    private final MetaStorageManager msManager;
+
+    /** Assignments tracker. */
+    private final AssignmentsTracker assignmentsTracker;
+
+    /** Topology tracker. */
+    private final TopologyTracker topologyTracker;
+
+    /** Lease tracker. */
+    private final LeaseTracker leaseTracker;
+
+    /** Cluster clock. */
+    private final HybridClock clock;
+
+    /** Closure to update leases. */
+    private final Updater updater;
+
+    /** Dedicated thread to update leases. */
+    private volatile Thread updaterTread;
+
+    /** Node name. */
+    private String nodeName;
+
+    /**
+     * The constructor.
+     *
+     * @param vaultManager Vault manager.
+     * @param msManager Metastorage manager.
+     * @param topologyService Topology service.
+     * @param tablesConfiguration Tables configuration.
+     * @param leaseTracker Lease tracker.
+     * @param clock Cluster clock.
+     */
+    public LeaseUpdater(
+            VaultManager vaultManager,
+            MetaStorageManager msManager,
+            LogicalTopologyService topologyService,
+            TablesConfiguration tablesConfiguration,
+            LeaseTracker leaseTracker,
+            HybridClock clock
+    ) {
+        this.msManager = msManager;
+        this.leaseTracker = leaseTracker;
+        this.clock = clock;
+
+        this.assignmentsTracker = new AssignmentsTracker(vaultManager, 
msManager, tablesConfiguration);
+        this.topologyTracker = new TopologyTracker(topologyService);
+        this.updater = new Updater();
+    }
+
+    /**
+     * Initializes the class.
+     */
+    public void init(String nodeName) {
+        this.nodeName = nodeName;
+
+        topologyTracker.startTrack();
+        assignmentsTracker.startTrack();
+    }
+
+    /**
+     * De-initializes the class.
+     */
+    public void deInit() {
+        topologyTracker.stopTrack();
+        assignmentsTracker.stopTrack();
+    }
+
+    /**
+     * Activates a lease updater to renew leases.
+     */
+    public void activate() {
+        //TODO: IGNITE-18879 Implement lease maintenance.
+        updaterTread = new Thread(updater, 
NamedThreadFactory.threadPrefix(nodeName, "lease-updater"));
+
+        updaterTread.start();
+    }
+
+    /**
+     * Runnable to update lease in Metastorage.
+     */
+    private class Updater implements Runnable {

Review Comment:
   pls move the `Updater` class to the bottom of the `LeaseUpdater` class.



##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver;
+
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.or;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.value;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+import static 
org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_PREFIX;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.affinity.Assignment;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * A processor to manger leases. The process is started when placement driver 
activates and stopped when it deactivates.
+ */
+public class LeaseUpdater {
+    /** Ignite logger. */
+    private static final IgniteLogger LOG = 
Loggers.forClass(LeaseUpdater.class);
+
+    /**
+     * Cluster cLock skew.
+     * TODO: IGNITE-18978 Method to comparison timestamps with clock skew.
+     */
+    private static final long CLOCK_SKEW = 7L;
+
+    /** Update attempts interval in milliseconds. */
+    private static final long UPDATE_LEASE_MS = 200L;
+
+    /** Lease holding interval. */
+    private static final long LEASE_PERIOD = 10 * UPDATE_LEASE_MS;
+
+    /** Metastorage manager. */
+    private final MetaStorageManager msManager;
+
+    /** Assignments tracker. */
+    private final AssignmentsTracker assignmentsTracker;
+
+    /** Topology tracker. */
+    private final TopologyTracker topologyTracker;
+
+    /** Lease tracker. */
+    private final LeaseTracker leaseTracker;
+
+    /** Cluster clock. */
+    private final HybridClock clock;
+
+    /** Closure to update leases. */
+    private final Updater updater;
+
+    /** Dedicated thread to update leases. */
+    private volatile Thread updaterTread;
+
+    /** Node name. */
+    private String nodeName;
+
+    /**
+     * The constructor.
+     *
+     * @param vaultManager Vault manager.
+     * @param msManager Metastorage manager.
+     * @param topologyService Topology service.
+     * @param tablesConfiguration Tables configuration.
+     * @param leaseTracker Lease tracker.
+     * @param clock Cluster clock.
+     */
+    public LeaseUpdater(
+            VaultManager vaultManager,
+            MetaStorageManager msManager,
+            LogicalTopologyService topologyService,
+            TablesConfiguration tablesConfiguration,
+            LeaseTracker leaseTracker,
+            HybridClock clock
+    ) {
+        this.msManager = msManager;
+        this.leaseTracker = leaseTracker;
+        this.clock = clock;
+
+        this.assignmentsTracker = new AssignmentsTracker(vaultManager, 
msManager, tablesConfiguration);
+        this.topologyTracker = new TopologyTracker(topologyService);
+        this.updater = new Updater();
+    }
+
+    /**
+     * Initializes the class.
+     */
+    public void init(String nodeName) {
+        this.nodeName = nodeName;
+
+        topologyTracker.startTrack();
+        assignmentsTracker.startTrack();
+    }
+
+    /**
+     * De-initializes the class.
+     */
+    public void deInit() {
+        topologyTracker.stopTrack();
+        assignmentsTracker.stopTrack();
+    }
+
+    /**
+     * Activates a lease updater to renew leases.
+     */
+    public void activate() {
+        //TODO: IGNITE-18879 Implement lease maintenance.
+        updaterTread = new Thread(updater, 
NamedThreadFactory.threadPrefix(nodeName, "lease-updater"));
+
+        updaterTread.start();
+    }
+
+    /**
+     * Runnable to update lease in Metastorage.
+     */
+    private class Updater implements Runnable {
+        @Override
+        public void run() {
+            while (!updaterTread.isInterrupted()) {
+                for (Map.Entry<ReplicationGroupId, Set<Assignment>> entry : 
assignmentsTracker.assignments().entrySet()) {
+                    ReplicationGroupId grpId = entry.getKey();
+
+                    Lease lease = leaseTracker.getLease(grpId);
+
+                    HybridTimestamp now = clock.now();
+
+                    // Nothing holds the lease.
+                    if (lease.getLeaseExpirationTime() == null
+                            // The lease is near to expiration.
+                            || now.getPhysical() > 
(lease.getLeaseExpirationTime().getPhysical() - LEASE_PERIOD / 2)) {
+                        var leaseKey = 
ByteArray.fromString(PLACEMENTDRIVER_PREFIX + grpId);
+
+                        var newTs = new HybridTimestamp(now.getPhysical() + 
LEASE_PERIOD, 0);
+
+                        ClusterNode candidate = 
nextLeaseHolder(entry.getValue());
+
+                        // Now nothing is holding the lease, and new lease is 
being granted.
+                        if (lease.getLeaseholder() == null && candidate != null
+                                // The lease is prolongation
+                                || lease.getLeaseholder() != null && 
lease.getLeaseholder().equals(candidate)
+                                // New lease is being granted, and the 
previous lease is expired.
+                                || candidate != null && 
(lease.getLeaseExpirationTime() == null
+                                || compareWithClockSkew(now, 
lease.getLeaseExpirationTime()) > 0)) {
+                            byte[] leaseRaw = ByteUtils.toBytes(lease);
+
+                            Lease renewedLease = new Lease(candidate, newTs);
+
+                            msManager.invoke(
+                                    or(notExists(leaseKey), 
value(leaseKey).eq(leaseRaw)),
+                                    put(leaseKey, 
ByteUtils.toBytes(renewedLease)),
+                                    noop()
+                            );
+                        }
+                    }
+                }
+
+                try {
+                    Thread.sleep(UPDATE_LEASE_MS);
+                } catch (InterruptedException e) {
+                    LOG.warn("Lease updater is interrupted");
+                }
+            }
+        }
+    }
+
+    /**
+     * Compares two timestamps with the clock skew.
+     * TODO: IGNITE-18978 Method to comparison timestamps with clock skew.
+     *

Review Comment:
   Javadoc should describe what exactly this method does.



##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseTracker.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver;
+
+import static 
org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_PREFIX;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.EntryEvent;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.WatchEvent;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import 
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.vault.VaultEntry;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+
+/**
+ * Class tracks cluster leases in memory.
+ * At first, the class state recoveries from Vault, then updates on watch's 
listener.
+ */
+public class LeaseTracker {
+    /** Ignite logger. */
+    private static final IgniteLogger LOG = 
Loggers.forClass(LeaseTracker.class);
+
+    /** Vault manager. */
+    private final VaultManager vaultManager;
+
+    /** Metastorage manager. */
+    private final MetaStorageManager msManager;
+
+    /** Leases cache. */
+    private final Map<ReplicationGroupId, Lease> leases;
+
+    /** Listener to update a leases cache. */
+    private final UpdateListener updateListener = new UpdateListener();
+
+    /**
+     * The constructor.
+     *
+     * @param vaultManager Vault manager.
+     * @param msManager Metastorage manager.
+     */
+    public LeaseTracker(VaultManager vaultManager, MetaStorageManager 
msManager) {
+        this.vaultManager = vaultManager;
+        this.msManager = msManager;
+
+        this.leases = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * Recoveries state from Vault and subscribers on further updates.
+     */
+    public void startTrack() {
+        
msManager.registerPrefixWatch(ByteArray.fromString(PLACEMENTDRIVER_PREFIX), 
updateListener);
+
+        try (Cursor<VaultEntry> cursor = vaultManager.range(
+                ByteArray.fromString(PLACEMENTDRIVER_PREFIX),
+                ByteArray.fromString(incrementLastChar(PLACEMENTDRIVER_PREFIX))
+        )) {
+            for (VaultEntry entry : cursor) {
+                String key = entry.key().toString();
+
+                key = key.replace(PLACEMENTDRIVER_PREFIX, "");
+
+                TablePartitionId grpId = TablePartitionId.fromString(key);
+                Lease lease = ByteUtils.fromBytes(entry.value());
+
+                leases.put(grpId, lease);
+            }
+        }
+
+        LOG.info("Leases cache recovered [leases={}]", leases);
+    }
+
+    /**
+     * Stops the tracker.
+     */
+    public void stopTrack() {
+        msManager.unregisterWatch(updateListener);
+    }
+
+    /**
+     * Get a lease for a particular group.
+     *
+     * @param grpId Replication group id.
+     * @return A lease is associated with the group.
+     */
+    public Lease getLease(ReplicationGroupId grpId) {
+        return leases.getOrDefault(grpId, new Lease());

Review Comment:
   this logic with getOrDefault is kind of unobvious.



##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java:
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver;
+
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.or;
+import static org.apache.ignite.internal.metastorage.dsl.Conditions.value;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+import static 
org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_PREFIX;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.affinity.Assignment;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * A processor to manger leases. The process is started when placement driver 
activates and stopped when it deactivates.
+ */
+public class LeaseUpdater {
+    /** Ignite logger. */
+    private static final IgniteLogger LOG = 
Loggers.forClass(LeaseUpdater.class);
+
+    /**
+     * Cluster cLock skew.
+     * TODO: IGNITE-18978 Method to comparison timestamps with clock skew.
+     */
+    private static final long CLOCK_SKEW = 7L;
+
+    /** Update attempts interval in milliseconds. */
+    private static final long UPDATE_LEASE_MS = 200L;
+
+    /** Lease holding interval. */
+    private static final long LEASE_PERIOD = 10 * UPDATE_LEASE_MS;
+
+    /** Metastorage manager. */
+    private final MetaStorageManager msManager;
+
+    /** Assignments tracker. */
+    private final AssignmentsTracker assignmentsTracker;
+
+    /** Topology tracker. */
+    private final TopologyTracker topologyTracker;
+
+    /** Lease tracker. */
+    private final LeaseTracker leaseTracker;
+
+    /** Cluster clock. */
+    private final HybridClock clock;
+
+    /** Closure to update leases. */
+    private final Updater updater;
+
+    /** Dedicated thread to update leases. */
+    private volatile Thread updaterTread;
+
+    /** Node name. */
+    private String nodeName;
+
+    /**
+     * The constructor.
+     *
+     * @param vaultManager Vault manager.
+     * @param msManager Metastorage manager.
+     * @param topologyService Topology service.
+     * @param tablesConfiguration Tables configuration.
+     * @param leaseTracker Lease tracker.
+     * @param clock Cluster clock.
+     */
+    public LeaseUpdater(
+            VaultManager vaultManager,
+            MetaStorageManager msManager,
+            LogicalTopologyService topologyService,
+            TablesConfiguration tablesConfiguration,
+            LeaseTracker leaseTracker,
+            HybridClock clock
+    ) {
+        this.msManager = msManager;
+        this.leaseTracker = leaseTracker;
+        this.clock = clock;
+
+        this.assignmentsTracker = new AssignmentsTracker(vaultManager, 
msManager, tablesConfiguration);
+        this.topologyTracker = new TopologyTracker(topologyService);
+        this.updater = new Updater();
+    }
+
+    /**
+     * Initializes the class.
+     */
+    public void init(String nodeName) {
+        this.nodeName = nodeName;
+
+        topologyTracker.startTrack();
+        assignmentsTracker.startTrack();
+    }
+
+    /**
+     * De-initializes the class.
+     */
+    public void deInit() {
+        topologyTracker.stopTrack();
+        assignmentsTracker.stopTrack();
+    }
+
+    /**
+     * Activates a lease updater to renew leases.
+     */
+    public void activate() {
+        //TODO: IGNITE-18879 Implement lease maintenance.
+        updaterTread = new Thread(updater, 
NamedThreadFactory.threadPrefix(nodeName, "lease-updater"));
+
+        updaterTread.start();
+    }
+
+    /**
+     * Runnable to update lease in Metastorage.
+     */
+    private class Updater implements Runnable {
+        @Override
+        public void run() {
+            while (!updaterTread.isInterrupted()) {
+                for (Map.Entry<ReplicationGroupId, Set<Assignment>> entry : 
assignmentsTracker.assignments().entrySet()) {
+                    ReplicationGroupId grpId = entry.getKey();
+
+                    Lease lease = leaseTracker.getLease(grpId);
+
+                    HybridTimestamp now = clock.now();
+
+                    // Nothing holds the lease.
+                    if (lease.getLeaseExpirationTime() == null
+                            // The lease is near to expiration.
+                            || now.getPhysical() > 
(lease.getLeaseExpirationTime().getPhysical() - LEASE_PERIOD / 2)) {
+                        var leaseKey = 
ByteArray.fromString(PLACEMENTDRIVER_PREFIX + grpId);
+
+                        var newTs = new HybridTimestamp(now.getPhysical() + 
LEASE_PERIOD, 0);
+
+                        ClusterNode candidate = 
nextLeaseHolder(entry.getValue());
+
+                        // Now nothing is holding the lease, and new lease is 
being granted.
+                        if (lease.getLeaseholder() == null && candidate != null
+                                // The lease is prolongation
+                                || lease.getLeaseholder() != null && 
lease.getLeaseholder().equals(candidate)
+                                // New lease is being granted, and the 
previous lease is expired.
+                                || candidate != null && 
(lease.getLeaseExpirationTime() == null
+                                || compareWithClockSkew(now, 
lease.getLeaseExpirationTime()) > 0)) {

Review Comment:
   This condition is too complicated and the part with comparing with clock 
skew is a final nail in the coffin.
   First, seems that the candidate should be not null in any case, why several 
comparisons?
   Does anything actually depend on the current leaseholder?
   Some parts, for example related to lease expiration time, could be assigned 
to separate variables.
   And are you sure that we need to implement this right now instead of putting 
a todo with some of maintenance tickets?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to