vldpyatkov commented on code in PR #2141:
URL: https://github.com/apache/ignite-3/pull/2141#discussion_r1251068220
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java:
##########
@@ -89,32 +96,30 @@ public LeaseTracker(VaultManager vaultManager,
MetaStorageManager msManager) {
this.vaultManager = vaultManager;
this.msManager = msManager;
- this.leases = new ConcurrentHashMap<>();
+ this.leases = new ConcurrentSkipListMap<>(Comparator.comparing(Object::toString));
this.primaryReplicaWaiters = new ConcurrentHashMap<>();
}
/**
* Recoveries state from Vault and subscribers on further updates.
*/
public void startTrack() {
- msManager.registerPrefixWatch(ByteArray.fromString(PLACEMENTDRIVER_PREFIX), updateListener);
+ msManager.registerPrefixWatch(PLACEMENTDRIVER_LEASES_KEY, updateListener);
try (Cursor<VaultEntry> cursor = vaultManager.range(
- ByteArray.fromString(PLACEMENTDRIVER_PREFIX),
- ByteArray.fromString(incrementLastChar(PLACEMENTDRIVER_PREFIX))
+ PLACEMENTDRIVER_LEASES_KEY,
+ ByteArray.fromString(incrementLastChar(PLACEMENTDRIVER_LEASES_KEY_STRING))
)) {
for (VaultEntry entry : cursor) {
- String key = entry.key().toString();
-
- key = key.replace(PLACEMENTDRIVER_PREFIX, "");
-
- TablePartitionId grpId = TablePartitionId.fromString(key);
- Lease lease = fromBytes(entry.value());
-
- leases.put(grpId, lease);
-
- primaryReplicaWaiters.computeIfAbsent(grpId, groupId -> new PendingIndependentComparableValuesTracker<>(MIN_VALUE))
- .update(lease.getExpirationTime(), lease);
+ leases.clear();
Review Comment:
This code looks suspicious. If only a single entry is possible here, why use
a range scan?
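
To illustrate the question, here is a minimal, self-contained sketch of the difference between a range scan and a point lookup when all leases live under one key. It uses a `TreeMap` as a stand-in for the Vault; `LeaseLookupSketch`, `LEASES_KEY`, `countByRangeScan`, `readBatch`, and this version of `incrementLastChar` are hypothetical names for illustration only, not the Ignite API.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Stand-in sketch: a sorted map plays the role of the Vault (assumption, not Ignite code). */
class LeaseLookupSketch {
    /** Hypothetical single key under which the whole lease batch is stored. */
    static final String LEASES_KEY = "placementdriver.leases";

    /** Simplified stand-in for the helper used in the diff: bumps the last character. */
    static String incrementLastChar(String s) {
        char last = s.charAt(s.length() - 1);
        return s.substring(0, s.length() - 1) + (char) (last + 1);
    }

    /** Range scan over [from, to): needed only when several keys share a prefix. */
    static int countByRangeScan(NavigableMap<String, byte[]> vault, String from, String to) {
        return vault.subMap(from, true, to, false).size();
    }

    /** Point lookup: enough when exactly one key holds the serialized batch. */
    static byte[] readBatch(Map<String, byte[]> vault) {
        return vault.get(LEASES_KEY);
    }

    public static void main(String[] args) {
        NavigableMap<String, byte[]> vault = new TreeMap<>();
        vault.put(LEASES_KEY, new byte[] {1, 2, 3});

        int scanned = countByRangeScan(vault, LEASES_KEY, incrementLastChar(LEASES_KEY));
        byte[] batch = readBatch(vault);

        System.out.println("entries seen by range scan: " + scanned);
        System.out.println("batch size from point lookup: " + batch.length);
    }
}
```

If the range can never contain more than one entry, the point lookup expresses the intent more directly and removes the need for the loop and `leases.clear()`.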
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java:
##########
@@ -162,23 +176,28 @@ public CompletableFuture<Void> onUpdate(WatchEvent event) {
for (EntryEvent entry : event.entryEvents()) {
Entry msEntry = entry.newEntry();
- String key = new ByteArray(msEntry.key()).toString();
-
- key = key.replace(PLACEMENTDRIVER_PREFIX, "");
+ LeaseBatch leaseBatch = LeaseBatch.fromBytes(ByteBuffer.wrap(msEntry.value()).order(ByteOrder.LITTLE_ENDIAN));
- TablePartitionId grpId = TablePartitionId.fromString(key);
+ Set<ReplicationGroupId> actualGroups = new HashSet<>();
- if (msEntry.empty()) {
- leases.remove(grpId);
- tryRemoveTracker(grpId);
- } else {
- Lease lease = fromBytes(msEntry.value());
+ for (Lease lease : leaseBatch.leases()) {
+ ReplicationGroupId grpId = lease.replicationGroupId();
+ actualGroups.add(grpId);
leases.put(grpId, lease);
primaryReplicaWaiters.computeIfAbsent(grpId, groupId -> new PendingIndependentComparableValuesTracker<>(MIN_VALUE))
.update(lease.getExpirationTime(), lease);
Review Comment:
Because all leases in a batch now share the same expiration timestamp, we
should be able to get rid of these per-group waiters and replace them with a
single one.
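
A rough sketch of what a single shared waiter could look like, assuming the expiration timestamp really is identical for every lease in a batch. `SharedExpirationWaiter` and its methods are hypothetical and deliberately simplified; this is not `PendingIndependentComparableValuesTracker`, and it only tracks a timestamp, not the lease value.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical single waiter shared by all replication groups (illustration only). */
class SharedExpirationWaiter {
    /** Highest batch expiration timestamp observed so far. */
    private final AtomicLong current = new AtomicLong(Long.MIN_VALUE);

    /** Futures waiting for a given timestamp, ordered by that timestamp. */
    private final ConcurrentSkipListMap<Long, CompletableFuture<Void>> pending =
            new ConcurrentSkipListMap<>();

    /** Called once per batch instead of once per lease. */
    void update(long expirationTime) {
        current.accumulateAndGet(expirationTime, Math::max);
        completeReached();
    }

    /** Returns a future that completes once the tracked timestamp reaches the given one. */
    CompletableFuture<Void> waitFor(long timestamp) {
        if (current.get() >= timestamp) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> fut =
                pending.computeIfAbsent(timestamp, ts -> new CompletableFuture<>());
        // Re-check in case update() advanced the timestamp concurrently.
        completeReached();
        return fut;
    }

    /** Completes and removes every waiter whose timestamp has been reached. */
    private void completeReached() {
        Iterator<Map.Entry<Long, CompletableFuture<Void>>> it =
                pending.headMap(current.get(), true).entrySet().iterator();
        while (it.hasNext()) {
            it.next().getValue().complete(null);
            it.remove();
        }
    }

    public static void main(String[] args) {
        SharedExpirationWaiter waiter = new SharedExpirationWaiter();
        CompletableFuture<Void> fut = waiter.waitFor(100L);
        waiter.update(100L); // one update per batch serves every group
        System.out.println("waiter completed: " + fut.isDone());
    }
}
```

With this shape, `onUpdate` would call `update` once after reading the batch rather than inside the per-lease loop, and the per-group map of trackers would no longer be needed. Whether callers also need the lease object itself is something the real implementation would have to account for.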