denis-chudov commented on code in PR #2141:
URL: https://github.com/apache/ignite-3/pull/2141#discussion_r1252358663
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java:
##########
@@ -89,32 +96,30 @@ public LeaseTracker(VaultManager vaultManager,
MetaStorageManager msManager) {
this.vaultManager = vaultManager;
this.msManager = msManager;
- this.leases = new ConcurrentHashMap<>();
+ this.leases = new
ConcurrentSkipListMap<>(Comparator.comparing(Object::toString));
this.primaryReplicaWaiters = new ConcurrentHashMap<>();
}
/**
* Recoveries state from Vault and subscribers on further updates.
*/
public void startTrack() {
-
msManager.registerPrefixWatch(ByteArray.fromString(PLACEMENTDRIVER_PREFIX),
updateListener);
+ msManager.registerPrefixWatch(PLACEMENTDRIVER_LEASES_KEY,
updateListener);
try (Cursor<VaultEntry> cursor = vaultManager.range(
- ByteArray.fromString(PLACEMENTDRIVER_PREFIX),
- ByteArray.fromString(incrementLastChar(PLACEMENTDRIVER_PREFIX))
+ PLACEMENTDRIVER_LEASES_KEY,
+
ByteArray.fromString(incrementLastChar(PLACEMENTDRIVER_LEASES_KEY_STRING))
)) {
for (VaultEntry entry : cursor) {
- String key = entry.key().toString();
-
- key = key.replace(PLACEMENTDRIVER_PREFIX, "");
-
- TablePartitionId grpId = TablePartitionId.fromString(key);
- Lease lease = fromBytes(entry.value());
-
- leases.put(grpId, lease);
-
- primaryReplicaWaiters.computeIfAbsent(grpId, groupId -> new
PendingIndependentComparableValuesTracker<>(MIN_VALUE))
- .update(lease.getExpirationTime(), lease);
+ leases.clear();
Review Comment:
I have rewritten the leases cache.
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java:
##########
@@ -305,14 +328,33 @@ public void run() {
// leaseholders at all.
if (isLeaseOutdated(lease)) {
// New lease is granting.
- writeNewLeaseInMetaStorage(grpId, lease,
candidate);
+ writeNewLeaseInMetaStorage(grpId, lease,
candidate, renewedLeases, toBeNegotiated);
} else if (lease.isProlongable() &&
candidate.name().equals(lease.getLeaseholder())) {
// Old lease is renewing.
- prolongLeaseInMetaStorage(grpId, lease);
+ prolongLeaseInMetaStorage(grpId, lease,
renewedLeases);
}
}
}
+ byte[] renewedValue = new
LeaseBatch(renewedLeases.values()).bytes();
+
+ var key = PLACEMENTDRIVER_LEASES_KEY;
+
+ msManager.invoke(
+ or(notExists(key), value(key).eq(new
LeaseBatch(leaseTracker.leasesCurrent()).bytes())),
Review Comment:
I have rewritten the leases cache.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]