Caideyipi commented on code in PR #16425:
URL: https://github.com/apache/iotdb/pull/16425#discussion_r2358299523
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java:
##########
@@ -92,317 +90,258 @@ public <R> boolean batchGet(
if (cacheEntry == null) {
return false;
}
- skrEntry.setValue(mappingFunction.apply(cacheEntry.getValue()));
+ if (!mappingFunction.apply(cacheEntry.getValue(),
skrEntry.getValue())) {
+ return false;
+ }
}
}
return true;
}
@Override
- public void compute(IDualKeyCacheComputation<FK, SK, V> computation) {
- FK firstKey = computation.getFirstKey();
+ public void update(
+ final FK firstKey,
+ final @Nonnull SK secondKey,
+ final V value,
+ final ToIntFunction<V> updater,
+ final boolean createIfNotExists) {
+
ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup = firstKeyMap.get(firstKey);
- SK[] secondKeyList = computation.getSecondKeyList();
- if (cacheEntryGroup == null) {
- for (int i = 0; i < secondKeyList.length; i++) {
- computation.computeValue(i, null);
- }
- cacheStats.recordMiss(secondKeyList.length);
- } else {
- T cacheEntry;
- int hitCount = 0;
- for (int i = 0; i < secondKeyList.length; i++) {
- cacheEntry = cacheEntryGroup.getCacheEntry(secondKeyList[i]);
- if (cacheEntry == null) {
- computation.computeValue(i, null);
- } else {
- computation.computeValue(i, cacheEntry.getValue());
- cacheEntryManager.access(cacheEntry);
- hitCount++;
- }
+ if (Objects.isNull(cacheEntryGroup)) {
+ if (createIfNotExists) {
+ cacheEntryGroup = new CacheEntryGroupImpl<>(firstKey, sizeComputer);
+ firstKeyMap.put(firstKey, cacheEntryGroup);
+ } else {
+ return;
}
- cacheStats.recordHit(hitCount);
- cacheStats.recordMiss(secondKeyList.length - hitCount);
}
- }
- @Override
- public void update(IDualKeyCacheUpdating<FK, SK, V> updating) {
- FK firstKey = updating.getFirstKey();
- ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup = firstKeyMap.get(firstKey);
- SK[] secondKeyList = updating.getSecondKeyList();
- if (cacheEntryGroup == null) {
- for (int i = 0; i < secondKeyList.length; i++) {
- updating.updateValue(i, null);
- }
- cacheStats.recordMiss(secondKeyList.length);
- } else {
- T cacheEntry;
- int hitCount = 0;
- for (int i = 0; i < secondKeyList.length; i++) {
- cacheEntry = cacheEntryGroup.getCacheEntry(secondKeyList[i]);
- if (cacheEntry == null) {
- updating.updateValue(i, null);
- } else {
- int changeSize = 0;
- synchronized (cacheEntry) {
- if (cacheEntry.getBelongedGroup() != null) {
- // Only update the value when the cache entry is not evicted.
- // If the cache entry is evicted, getBelongedGroup is null.
- // Synchronized is to guarantee the cache entry is not evicted
during the update.
- changeSize = updating.updateValue(i, cacheEntry.getValue());
- cacheEntryManager.access(cacheEntry);
- if (changeSize != 0) {
- cacheStats.increaseMemoryUsage(changeSize);
+ final ICacheEntryGroup<FK, SK, V, T> finalCacheEntryGroup =
cacheEntryGroup;
+ cacheEntryGroup.computeCacheEntry(
+ secondKey,
+ memory ->
+ (sk, cacheEntry) -> {
+ if (Objects.isNull(cacheEntry)) {
+ if (!createIfNotExists) {
+ return null;
+ }
+ cacheEntry =
+ cacheEntryManager.createCacheEntry(secondKey, value,
finalCacheEntryGroup);
+ cacheEntryManager.put(cacheEntry);
+ memory.getAndAdd(
+ sizeComputer.computeSecondKeySize(sk)
+ + sizeComputer.computeValueSize(cacheEntry.getValue())
+ + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY);
}
- }
- }
- if (changeSize != 0 && cacheStats.isExceedMemoryCapacity()) {
- executeCacheEviction(changeSize);
- }
- hitCount++;
- }
- }
- cacheStats.recordHit(hitCount);
- cacheStats.recordMiss(secondKeyList.length - hitCount);
- }
+ memory.getAndAdd(updater.applyAsInt(cacheEntry.getValue()));
+ return cacheEntry;
+ });
+
+ mayEvict();
}
@Override
- public void put(FK firstKey, SK secondKey, V value) {
- int usedMemorySize = putToCache(firstKey, secondKey, value);
- cacheStats.increaseMemoryUsage(usedMemorySize);
- if (cacheStats.isExceedMemoryCapacity()) {
- executeCacheEviction(usedMemorySize);
+ public void update(
+ final FK firstKey, final Predicate<SK> secondKeyChecker, final
ToIntFunction<V> updater) {
+ final ICacheEntryGroup<FK, SK, V, T> entryGroup =
firstKeyMap.get(firstKey);
+ if (Objects.nonNull(entryGroup)) {
+ entryGroup
+ .getAllCacheEntries()
+ .forEachRemaining(
+ entry -> {
+ if (!secondKeyChecker.test(entry.getKey())) {
+ return;
+ }
+ entryGroup.computeCacheEntry(
+ entry.getKey(),
+ memory ->
+ (secondKey, cacheEntry) -> {
+ if (Objects.nonNull(cacheEntry)) {
+
memory.getAndAdd(updater.applyAsInt(cacheEntry.getValue()));
+ }
+ return cacheEntry;
+ });
+ });
}
+ mayEvict();
}
- private int putToCache(FK firstKey, SK secondKey, V value) {
- AtomicInteger usedMemorySize = new AtomicInteger(0);
- firstKeyMap.compute(
- firstKey,
- (k, cacheEntryGroup) -> {
- if (cacheEntryGroup == null) {
- cacheEntryGroup = new CacheEntryGroupImpl<>(firstKey);
-
usedMemorySize.getAndAdd(sizeComputer.computeFirstKeySize(firstKey));
- }
- ICacheEntryGroup<FK, SK, V, T> finalCacheEntryGroup =
cacheEntryGroup;
- cacheEntryGroup.computeCacheEntry(
- secondKey,
- (sk, cacheEntry) -> {
- if (cacheEntry == null) {
- cacheEntry =
- cacheEntryManager.createCacheEntry(secondKey, value,
finalCacheEntryGroup);
- cacheEntryManager.put(cacheEntry);
-
usedMemorySize.getAndAdd(sizeComputer.computeSecondKeySize(sk));
- } else {
- V existingValue = cacheEntry.getValue();
- if (existingValue != value && !existingValue.equals(value)) {
- cacheEntry.replaceValue(value);
-
usedMemorySize.getAndAdd(-sizeComputer.computeValueSize(existingValue));
+ @Override
+ public void update(
+ final Predicate<FK> firstKeyChecker,
+ final Predicate<SK> secondKeyChecker,
+ final ToIntFunction<V> updater) {
+ for (final FK firstKey : firstKeyMap.getAllKeys()) {
+ if (!firstKeyChecker.test(firstKey)) {
+ continue;
+ }
+ final ICacheEntryGroup<FK, SK, V, T> entryGroup =
firstKeyMap.get(firstKey);
+ if (Objects.nonNull(entryGroup)) {
+ entryGroup
+ .getAllCacheEntries()
+ .forEachRemaining(
+ entry -> {
+ if (!secondKeyChecker.test(entry.getKey())) {
+ return;
}
- // update the cache status
- cacheEntryManager.access(cacheEntry);
- }
- usedMemorySize.getAndAdd(sizeComputer.computeValueSize(value));
- return cacheEntry;
- });
- return cacheEntryGroup;
- });
- return usedMemorySize.get();
+ entryGroup.computeCacheEntry(
+ entry.getKey(),
+ memory ->
+ (secondKey, cacheEntry) -> {
+
memory.getAndAdd(updater.applyAsInt(cacheEntry.getValue()));
+ return cacheEntry;
+ });
+ });
+ }
+ mayEvict();
+ }
}
- /**
- * Each thread putting new cache value only needs to evict cache values,
total memory equals that
- * the new cache value occupied.
- */
- private void executeCacheEviction(int targetSize) {
- int evictedSize;
- while (targetSize > 0 && cacheStats.memoryUsage() > 0) {
- evictedSize = evictOneCacheEntry();
- cacheStats.decreaseMemoryUsage(evictedSize);
- targetSize -= evictedSize;
+ private void mayEvict() {
+ long exceedMemory;
+ while ((exceedMemory = cacheStats.getExceedMemory()) > 0) {
+ // Not compute each time to save time when FK is too many
+ // The hard-coded size is 100
+ do {
+ exceedMemory -= evictOneCacheEntry();
+ } while (exceedMemory > 0 && firstKeyMap.size() > 100);
}
}
- private int evictOneCacheEntry() {
-
- ICacheEntry<SK, V> evictCacheEntry = cacheEntryManager.evict();
+ // The returned delta may have some error, but it's OK
+ // Because the delta is only for loop round estimation
+ private long evictOneCacheEntry() {
+ final ICacheEntry<SK, V> evictCacheEntry = cacheEntryManager.evict();
if (evictCacheEntry == null) {
return 0;
}
- synchronized (evictCacheEntry) {
- AtomicInteger evictedSize = new AtomicInteger(0);
-
evictedSize.getAndAdd(sizeComputer.computeValueSize(evictCacheEntry.getValue()));
-
- ICacheEntryGroup<FK, SK, V, T> belongedGroup =
evictCacheEntry.getBelongedGroup();
- evictCacheEntry.setBelongedGroup(null);
- belongedGroup.removeCacheEntry(evictCacheEntry.getSecondKey());
-
evictedSize.getAndAdd(sizeComputer.computeSecondKeySize(evictCacheEntry.getSecondKey()));
-
- if (belongedGroup.isEmpty()) {
- firstKeyMap.compute(
- belongedGroup.getFirstKey(),
- (firstKey, cacheEntryGroup) -> {
- if (cacheEntryGroup == null) {
- // has been removed by other threads
- return null;
- }
- if (cacheEntryGroup.isEmpty()) {
-
evictedSize.getAndAdd(sizeComputer.computeFirstKeySize(firstKey));
- return null;
- }
- // some other thread has put value to it
- return cacheEntryGroup;
- });
+ final ICacheEntryGroup<FK, SK, V, T> belongedGroup =
evictCacheEntry.getBelongedGroup();
+ evictCacheEntry.setBelongedGroup(null);
+
+ long memory =
belongedGroup.removeCacheEntry(evictCacheEntry.getSecondKey());
+
+ final ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup =
+ firstKeyMap.get(belongedGroup.getFirstKey());
Review Comment:
The fkMap's compute calls have been abandoned due to excessive lock usage...
I'd rather not remove the cacheEntryGroup.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]