JackieTien97 commented on code in PR #15291:
URL: https://github.com/apache/iotdb/pull/15291#discussion_r2043386795
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheStats.java:
##########
@@ -30,26 +31,19 @@ class CacheStats implements IDualKeyCacheStats {
private final long memoryThreshold;
- private final AtomicLong memoryUsage = new AtomicLong(0);
+ private final Supplier<Long> memoryComputation;
Review Comment:
I think we still need a AtomicLong for total memory usage instead of
computing total memory each time
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheStats.java:
##########
@@ -116,13 +110,12 @@ public long entriesCount() {
}
void reset() {
Review Comment:
don't need to reset memory?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheStats.java:
##########
@@ -102,7 +96,7 @@ public double hitRate() {
@Override
public long memoryUsage() {
- return memoryUsage.get();
+ return memoryComputation.get();
Review Comment:
for tree model, its computation is too large, metric may fetch it each 5
seconds
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java:
##########
@@ -79,415 +77,240 @@ public V get(final FK firstKey, final SK secondKey) {
}
}
- @Override
- public void compute(final IDualKeyCacheComputation<FK, SK, V> computation) {
- final FK firstKey = computation.getFirstKey();
- final ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup =
firstKeyMap.get(firstKey);
- final SK[] secondKeyList = computation.getSecondKeyList();
- if (cacheEntryGroup == null) {
- for (int i = 0; i < secondKeyList.length; i++) {
- computation.computeValue(i, null);
- }
- cacheStats.recordMiss(secondKeyList.length);
- } else {
- T cacheEntry;
- int hitCount = 0;
- for (int i = 0; i < secondKeyList.length; i++) {
- cacheEntry = cacheEntryGroup.getCacheEntry(secondKeyList[i]);
- if (cacheEntry == null) {
- computation.computeValue(i, null);
- } else {
- computation.computeValue(i, cacheEntry.getValue());
- cacheEntryManager.access(cacheEntry);
- hitCount++;
- }
- }
- cacheStats.recordHit(hitCount);
- cacheStats.recordMiss(secondKeyList.length - hitCount);
- }
- }
-
- @Override
- public void updateWithLock(final IDualKeyCacheUpdating<FK, SK, V> updating) {
- final FK firstKey = updating.getFirstKey();
- final ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup =
firstKeyMap.get(firstKey);
- final SK[] secondKeyList = updating.getSecondKeyList();
- if (cacheEntryGroup == null) {
- for (int i = 0; i < secondKeyList.length; i++) {
- updating.updateValue(i, null);
- }
- cacheStats.recordMiss(secondKeyList.length);
- } else {
- T cacheEntry;
- for (int i = 0; i < secondKeyList.length; i++) {
- cacheEntry = cacheEntryGroup.getCacheEntry(secondKeyList[i]);
- if (cacheEntry == null) {
- updating.updateValue(i, null);
- } else {
- int changeSize = 0;
- synchronized (cacheEntry) {
- if (cacheEntry.getBelongedGroup() != null) {
- // Only update the value when the cache entry is not evicted.
- // If the cache entry is evicted, getBelongedGroup is null.
- // Synchronized is to guarantee the cache entry is not evicted
during the update.
- changeSize = updating.updateValue(i, cacheEntry.getValue());
- cacheEntryManager.access(cacheEntry);
- }
- }
- if (changeSize > 0) {
- increaseMemoryUsageAndMayEvict(changeSize);
- }
- }
- }
- }
- }
-
- @Override
- public void put(final FK firstKey, final SK secondKey, final V value) {
- final AtomicInteger usedMemorySize = new AtomicInteger(0);
- firstKeyMap.compute(
- firstKey,
- (k, cacheEntryGroup) -> {
- if (cacheEntryGroup == null) {
- cacheEntryGroup = new CacheEntryGroupImpl<>(firstKey);
-
usedMemorySize.getAndAdd(sizeComputer.computeFirstKeySize(firstKey));
- }
- final ICacheEntryGroup<FK, SK, V, T> finalCacheEntryGroup =
cacheEntryGroup;
- cacheEntryGroup.computeCacheEntry(
- secondKey,
- (sk, cacheEntry) -> {
- if (cacheEntry == null) {
- cacheEntry =
- cacheEntryManager.createCacheEntry(secondKey, value,
finalCacheEntryGroup);
- cacheEntryManager.put(cacheEntry);
- cacheStats.increaseEntryCount();
-
usedMemorySize.getAndAdd(sizeComputer.computeSecondKeySize(sk));
- } else {
- final V existingValue = cacheEntry.getValue();
- if (existingValue != value && !existingValue.equals(value)) {
- cacheEntry.replaceValue(value);
-
usedMemorySize.getAndAdd(-sizeComputer.computeValueSize(existingValue));
- }
- // update the cache status
- cacheEntryManager.access(cacheEntry);
- }
- usedMemorySize.getAndAdd(sizeComputer.computeValueSize(value));
- return cacheEntry;
- });
- return cacheEntryGroup;
- });
- increaseMemoryUsageAndMayEvict(usedMemorySize.get());
- }
-
@Override
public void update(
final FK firstKey,
final @Nonnull SK secondKey,
final V value,
final ToIntFunction<V> updater,
final boolean createIfNotExists) {
- final AtomicInteger usedMemorySize = new AtomicInteger(0);
-
- firstKeyMap.compute(
- firstKey,
- (k, cacheEntryGroup) -> {
- if (cacheEntryGroup == null) {
- if (!createIfNotExists) {
- return null;
- }
- cacheEntryGroup = new CacheEntryGroupImpl<>(firstKey);
-
usedMemorySize.getAndAdd(sizeComputer.computeFirstKeySize(firstKey));
- }
- final ICacheEntryGroup<FK, SK, V, T> finalCacheEntryGroup =
cacheEntryGroup;
-
- final T cacheEntry =
- createIfNotExists
- ? cacheEntryGroup.computeCacheEntryIfAbsent(
- secondKey,
- sk -> {
- final T entry =
- cacheEntryManager.createCacheEntry(
- secondKey, value, finalCacheEntryGroup);
- cacheEntryManager.put(entry);
- cacheStats.increaseEntryCount();
- usedMemorySize.getAndAdd(
- sizeComputer.computeSecondKeySize(sk)
- +
sizeComputer.computeValueSize(entry.getValue()));
- return entry;
- })
- : cacheEntryGroup.getCacheEntry(secondKey);
-
- if (Objects.nonNull(cacheEntry)) {
- final int result = updater.applyAsInt(cacheEntry.getValue());
- if (Objects.nonNull(cacheEntryGroup.getCacheEntry(secondKey))) {
- usedMemorySize.getAndAdd(result);
- }
- }
- return cacheEntryGroup;
- });
- increaseMemoryUsageAndMayEvict(usedMemorySize.get());
+
+ ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup = firstKeyMap.get(firstKey);
+ if (Objects.isNull(cacheEntryGroup)) {
+ if (createIfNotExists) {
+ cacheEntryGroup = new CacheEntryGroupImpl<>(firstKey, sizeComputer);
+ firstKeyMap.put(firstKey, cacheEntryGroup);
+ } else {
+ return;
+ }
+ }
+
+ final ICacheEntryGroup<FK, SK, V, T> finalCacheEntryGroup =
cacheEntryGroup;
+ cacheEntryGroup.computeCacheEntry(
+ secondKey,
+ memory ->
+ (sk, cacheEntry) -> {
+ if (Objects.isNull(cacheEntry)) {
+ if (!createIfNotExists) {
+ return null;
+ }
+ cacheEntry =
+ cacheEntryManager.createCacheEntry(secondKey, value,
finalCacheEntryGroup);
+ cacheEntryManager.put(cacheEntry);
+ cacheStats.increaseEntryCount();
+ memory.getAndAdd(
+ sizeComputer.computeSecondKeySize(sk)
+ + sizeComputer.computeValueSize(cacheEntry.getValue())
+ + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY);
+ }
+ memory.getAndAdd(updater.applyAsInt(cacheEntry.getValue()));
+ return cacheEntry;
+ });
+
+ mayEvict();
}
@Override
public void update(
final FK firstKey, final Predicate<SK> secondKeyChecker, final
ToIntFunction<V> updater) {
- final AtomicInteger usedMemorySize = new AtomicInteger(0);
-
- firstKeyMap.compute(
- firstKey,
- (k, cacheEntryGroup) -> {
- if (cacheEntryGroup == null) {
- return null;
- }
- final ICacheEntryGroup<FK, SK, V, T> finalCacheEntryGroup =
cacheEntryGroup;
-
- cacheEntryGroup
- .getAllCacheEntries()
- .forEachRemaining(
- entry -> {
- if (!secondKeyChecker.test(entry.getKey())) {
- return;
- }
- final int result =
updater.applyAsInt(entry.getValue().getValue());
- if
(Objects.nonNull(finalCacheEntryGroup.getCacheEntry(entry.getKey()))) {
- usedMemorySize.getAndAdd(result);
- }
- });
- return cacheEntryGroup;
- });
- increaseMemoryUsageAndMayEvict(usedMemorySize.get());
+ final ICacheEntryGroup<FK, SK, V, T> entryGroup =
firstKeyMap.get(firstKey);
+ if (Objects.nonNull(entryGroup)) {
+ entryGroup
+ .getAllCacheEntries()
+ .forEachRemaining(
+ entry -> {
+ if (!secondKeyChecker.test(entry.getKey())) {
+ return;
+ }
+ entryGroup.computeCacheEntry(
+ entry.getKey(),
+ memory ->
+ (secondKey, cacheEntry) -> {
+ if (Objects.nonNull(cacheEntry)) {
+
memory.getAndAdd(updater.applyAsInt(cacheEntry.getValue()));
+ }
+ return cacheEntry;
+ });
+ });
+ }
+ mayEvict();
}
@Override
public void update(
final Predicate<FK> firstKeyChecker,
final Predicate<SK> secondKeyChecker,
final ToIntFunction<V> updater) {
- final AtomicInteger usedMemorySize = new AtomicInteger(0);
for (final FK firstKey : firstKeyMap.getAllKeys()) {
if (!firstKeyChecker.test(firstKey)) {
continue;
}
- firstKeyMap.compute(
- firstKey,
- (fk, entryGroup) -> {
- if (Objects.nonNull(entryGroup)) {
- entryGroup
- .getAllCacheEntries()
- .forEachRemaining(
- entry -> {
- if (!secondKeyChecker.test(entry.getKey())) {
- return;
- }
- final int result =
updater.applyAsInt(entry.getValue().getValue());
- if
(Objects.nonNull(entryGroup.getCacheEntry(entry.getKey()))) {
- usedMemorySize.getAndAdd(result);
- }
- });
- }
- return entryGroup;
- });
+ final ICacheEntryGroup<FK, SK, V, T> entryGroup =
firstKeyMap.get(firstKey);
+ if (Objects.nonNull(entryGroup)) {
+ entryGroup
+ .getAllCacheEntries()
+ .forEachRemaining(
+ entry -> {
+ if (!secondKeyChecker.test(entry.getKey())) {
+ return;
+ }
+ entryGroup.computeCacheEntry(
+ entry.getKey(),
+ memory ->
+ (secondKey, cacheEntry) -> {
+
memory.getAndAdd(updater.applyAsInt(cacheEntry.getValue()));
+ return cacheEntry;
+ });
+ });
+ }
+ mayEvict();
}
- increaseMemoryUsageAndMayEvict(usedMemorySize.get());
}
- private void increaseMemoryUsageAndMayEvict(final int memorySize) {
- cacheStats.increaseMemoryUsage(memorySize);
- while (cacheStats.isExceedMemoryCapacity()) {
- cacheStats.decreaseMemoryUsage(evictOneCacheEntry());
+ private void mayEvict() {
+ long exceedMemory;
+ while ((exceedMemory = cacheStats.getExceedMemory()) > 0) {
+ // Not compute each time to save time when FK is too many
+ // The hard-coded size is 100
+ do {
+ exceedMemory -= evictOneCacheEntry();
+ } while (exceedMemory > 0 && firstKeyMap.size() > 100);
}
}
- private int evictOneCacheEntry() {
+ // The returned delta may have some error, but it's OK
+ // Because the delta is only for loop round estimation
+ private long evictOneCacheEntry() {
final ICacheEntry<SK, V> evictCacheEntry = cacheEntryManager.evict();
if (evictCacheEntry == null) {
return 0;
}
- final AtomicInteger evictedSize = new AtomicInteger(0);
-
final ICacheEntryGroup<FK, SK, V, T> belongedGroup =
evictCacheEntry.getBelongedGroup();
evictCacheEntry.setBelongedGroup(null);
- firstKeyMap.compute(
- belongedGroup.getFirstKey(),
- (firstKey, cacheEntryGroup) -> {
- belongedGroup.removeCacheEntry(evictCacheEntry.getSecondKey());
- cacheStats.decreaseEntryCount();
- evictedSize.getAndAdd(
- sizeComputer.computeValueSize(evictCacheEntry.getValue())
- +
sizeComputer.computeSecondKeySize(evictCacheEntry.getSecondKey()));
-
- if (cacheEntryGroup == null) {
- // has been removed by other threads
- return null;
- }
-
- if (cacheEntryGroup.isEmpty()) {
- evictedSize.getAndAdd(sizeComputer.computeFirstKeySize(firstKey));
- return null;
- }
+ long memory =
belongedGroup.removeCacheEntry(evictCacheEntry.getSecondKey());
Review Comment:
```suggestion
if (memory != 0) {
cacheStats.decreaseEntryCount();
}
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java:
##########
@@ -79,415 +77,240 @@ public V get(final FK firstKey, final SK secondKey) {
}
}
- @Override
- public void compute(final IDualKeyCacheComputation<FK, SK, V> computation) {
- final FK firstKey = computation.getFirstKey();
- final ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup =
firstKeyMap.get(firstKey);
- final SK[] secondKeyList = computation.getSecondKeyList();
- if (cacheEntryGroup == null) {
- for (int i = 0; i < secondKeyList.length; i++) {
- computation.computeValue(i, null);
- }
- cacheStats.recordMiss(secondKeyList.length);
- } else {
- T cacheEntry;
- int hitCount = 0;
- for (int i = 0; i < secondKeyList.length; i++) {
- cacheEntry = cacheEntryGroup.getCacheEntry(secondKeyList[i]);
- if (cacheEntry == null) {
- computation.computeValue(i, null);
- } else {
- computation.computeValue(i, cacheEntry.getValue());
- cacheEntryManager.access(cacheEntry);
- hitCount++;
- }
- }
- cacheStats.recordHit(hitCount);
- cacheStats.recordMiss(secondKeyList.length - hitCount);
- }
- }
-
- @Override
- public void updateWithLock(final IDualKeyCacheUpdating<FK, SK, V> updating) {
- final FK firstKey = updating.getFirstKey();
- final ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup =
firstKeyMap.get(firstKey);
- final SK[] secondKeyList = updating.getSecondKeyList();
- if (cacheEntryGroup == null) {
- for (int i = 0; i < secondKeyList.length; i++) {
- updating.updateValue(i, null);
- }
- cacheStats.recordMiss(secondKeyList.length);
- } else {
- T cacheEntry;
- for (int i = 0; i < secondKeyList.length; i++) {
- cacheEntry = cacheEntryGroup.getCacheEntry(secondKeyList[i]);
- if (cacheEntry == null) {
- updating.updateValue(i, null);
- } else {
- int changeSize = 0;
- synchronized (cacheEntry) {
- if (cacheEntry.getBelongedGroup() != null) {
- // Only update the value when the cache entry is not evicted.
- // If the cache entry is evicted, getBelongedGroup is null.
- // Synchronized is to guarantee the cache entry is not evicted
during the update.
- changeSize = updating.updateValue(i, cacheEntry.getValue());
- cacheEntryManager.access(cacheEntry);
- }
- }
- if (changeSize > 0) {
- increaseMemoryUsageAndMayEvict(changeSize);
- }
- }
- }
- }
- }
-
- @Override
- public void put(final FK firstKey, final SK secondKey, final V value) {
- final AtomicInteger usedMemorySize = new AtomicInteger(0);
- firstKeyMap.compute(
- firstKey,
- (k, cacheEntryGroup) -> {
- if (cacheEntryGroup == null) {
- cacheEntryGroup = new CacheEntryGroupImpl<>(firstKey);
-
usedMemorySize.getAndAdd(sizeComputer.computeFirstKeySize(firstKey));
- }
- final ICacheEntryGroup<FK, SK, V, T> finalCacheEntryGroup =
cacheEntryGroup;
- cacheEntryGroup.computeCacheEntry(
- secondKey,
- (sk, cacheEntry) -> {
- if (cacheEntry == null) {
- cacheEntry =
- cacheEntryManager.createCacheEntry(secondKey, value,
finalCacheEntryGroup);
- cacheEntryManager.put(cacheEntry);
- cacheStats.increaseEntryCount();
-
usedMemorySize.getAndAdd(sizeComputer.computeSecondKeySize(sk));
- } else {
- final V existingValue = cacheEntry.getValue();
- if (existingValue != value && !existingValue.equals(value)) {
- cacheEntry.replaceValue(value);
-
usedMemorySize.getAndAdd(-sizeComputer.computeValueSize(existingValue));
- }
- // update the cache status
- cacheEntryManager.access(cacheEntry);
- }
- usedMemorySize.getAndAdd(sizeComputer.computeValueSize(value));
- return cacheEntry;
- });
- return cacheEntryGroup;
- });
- increaseMemoryUsageAndMayEvict(usedMemorySize.get());
- }
-
@Override
public void update(
final FK firstKey,
final @Nonnull SK secondKey,
final V value,
final ToIntFunction<V> updater,
final boolean createIfNotExists) {
- final AtomicInteger usedMemorySize = new AtomicInteger(0);
-
- firstKeyMap.compute(
- firstKey,
- (k, cacheEntryGroup) -> {
- if (cacheEntryGroup == null) {
- if (!createIfNotExists) {
- return null;
- }
- cacheEntryGroup = new CacheEntryGroupImpl<>(firstKey);
-
usedMemorySize.getAndAdd(sizeComputer.computeFirstKeySize(firstKey));
- }
- final ICacheEntryGroup<FK, SK, V, T> finalCacheEntryGroup =
cacheEntryGroup;
-
- final T cacheEntry =
- createIfNotExists
- ? cacheEntryGroup.computeCacheEntryIfAbsent(
- secondKey,
- sk -> {
- final T entry =
- cacheEntryManager.createCacheEntry(
- secondKey, value, finalCacheEntryGroup);
- cacheEntryManager.put(entry);
- cacheStats.increaseEntryCount();
- usedMemorySize.getAndAdd(
- sizeComputer.computeSecondKeySize(sk)
- +
sizeComputer.computeValueSize(entry.getValue()));
- return entry;
- })
- : cacheEntryGroup.getCacheEntry(secondKey);
-
- if (Objects.nonNull(cacheEntry)) {
- final int result = updater.applyAsInt(cacheEntry.getValue());
- if (Objects.nonNull(cacheEntryGroup.getCacheEntry(secondKey))) {
- usedMemorySize.getAndAdd(result);
- }
- }
- return cacheEntryGroup;
- });
- increaseMemoryUsageAndMayEvict(usedMemorySize.get());
+
+ ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup = firstKeyMap.get(firstKey);
+ if (Objects.isNull(cacheEntryGroup)) {
+ if (createIfNotExists) {
+ cacheEntryGroup = new CacheEntryGroupImpl<>(firstKey, sizeComputer);
+ firstKeyMap.put(firstKey, cacheEntryGroup);
+ } else {
+ return;
+ }
+ }
+
+ final ICacheEntryGroup<FK, SK, V, T> finalCacheEntryGroup =
cacheEntryGroup;
+ cacheEntryGroup.computeCacheEntry(
+ secondKey,
+ memory ->
+ (sk, cacheEntry) -> {
+ if (Objects.isNull(cacheEntry)) {
+ if (!createIfNotExists) {
+ return null;
+ }
+ cacheEntry =
+ cacheEntryManager.createCacheEntry(secondKey, value,
finalCacheEntryGroup);
+ cacheEntryManager.put(cacheEntry);
+ cacheStats.increaseEntryCount();
+ memory.getAndAdd(
+ sizeComputer.computeSecondKeySize(sk)
+ + sizeComputer.computeValueSize(cacheEntry.getValue())
+ + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY);
+ }
+ memory.getAndAdd(updater.applyAsInt(cacheEntry.getValue()));
+ return cacheEntry;
+ });
+
+ mayEvict();
}
@Override
public void update(
final FK firstKey, final Predicate<SK> secondKeyChecker, final
ToIntFunction<V> updater) {
- final AtomicInteger usedMemorySize = new AtomicInteger(0);
-
- firstKeyMap.compute(
- firstKey,
- (k, cacheEntryGroup) -> {
- if (cacheEntryGroup == null) {
- return null;
- }
- final ICacheEntryGroup<FK, SK, V, T> finalCacheEntryGroup =
cacheEntryGroup;
-
- cacheEntryGroup
- .getAllCacheEntries()
- .forEachRemaining(
- entry -> {
- if (!secondKeyChecker.test(entry.getKey())) {
- return;
- }
- final int result =
updater.applyAsInt(entry.getValue().getValue());
- if
(Objects.nonNull(finalCacheEntryGroup.getCacheEntry(entry.getKey()))) {
- usedMemorySize.getAndAdd(result);
- }
- });
- return cacheEntryGroup;
- });
- increaseMemoryUsageAndMayEvict(usedMemorySize.get());
+ final ICacheEntryGroup<FK, SK, V, T> entryGroup =
firstKeyMap.get(firstKey);
+ if (Objects.nonNull(entryGroup)) {
+ entryGroup
+ .getAllCacheEntries()
+ .forEachRemaining(
+ entry -> {
+ if (!secondKeyChecker.test(entry.getKey())) {
+ return;
+ }
+ entryGroup.computeCacheEntry(
+ entry.getKey(),
+ memory ->
+ (secondKey, cacheEntry) -> {
+ if (Objects.nonNull(cacheEntry)) {
+
memory.getAndAdd(updater.applyAsInt(cacheEntry.getValue()));
+ }
+ return cacheEntry;
+ });
+ });
+ }
+ mayEvict();
}
@Override
public void update(
final Predicate<FK> firstKeyChecker,
final Predicate<SK> secondKeyChecker,
final ToIntFunction<V> updater) {
- final AtomicInteger usedMemorySize = new AtomicInteger(0);
for (final FK firstKey : firstKeyMap.getAllKeys()) {
if (!firstKeyChecker.test(firstKey)) {
continue;
}
- firstKeyMap.compute(
- firstKey,
- (fk, entryGroup) -> {
- if (Objects.nonNull(entryGroup)) {
- entryGroup
- .getAllCacheEntries()
- .forEachRemaining(
- entry -> {
- if (!secondKeyChecker.test(entry.getKey())) {
- return;
- }
- final int result =
updater.applyAsInt(entry.getValue().getValue());
- if
(Objects.nonNull(entryGroup.getCacheEntry(entry.getKey()))) {
- usedMemorySize.getAndAdd(result);
- }
- });
- }
- return entryGroup;
- });
+ final ICacheEntryGroup<FK, SK, V, T> entryGroup =
firstKeyMap.get(firstKey);
+ if (Objects.nonNull(entryGroup)) {
+ entryGroup
+ .getAllCacheEntries()
+ .forEachRemaining(
+ entry -> {
+ if (!secondKeyChecker.test(entry.getKey())) {
+ return;
+ }
+ entryGroup.computeCacheEntry(
+ entry.getKey(),
+ memory ->
+ (secondKey, cacheEntry) -> {
+
memory.getAndAdd(updater.applyAsInt(cacheEntry.getValue()));
+ return cacheEntry;
+ });
+ });
+ }
+ mayEvict();
}
- increaseMemoryUsageAndMayEvict(usedMemorySize.get());
}
- private void increaseMemoryUsageAndMayEvict(final int memorySize) {
- cacheStats.increaseMemoryUsage(memorySize);
- while (cacheStats.isExceedMemoryCapacity()) {
- cacheStats.decreaseMemoryUsage(evictOneCacheEntry());
+ private void mayEvict() {
+ long exceedMemory;
+ while ((exceedMemory = cacheStats.getExceedMemory()) > 0) {
+ // Not compute each time to save time when FK is too many
+ // The hard-coded size is 100
+ do {
+ exceedMemory -= evictOneCacheEntry();
+ } while (exceedMemory > 0 && firstKeyMap.size() > 100);
}
}
- private int evictOneCacheEntry() {
+ // The returned delta may have some error, but it's OK
+ // Because the delta is only for loop round estimation
+ private long evictOneCacheEntry() {
final ICacheEntry<SK, V> evictCacheEntry = cacheEntryManager.evict();
if (evictCacheEntry == null) {
return 0;
}
- final AtomicInteger evictedSize = new AtomicInteger(0);
-
final ICacheEntryGroup<FK, SK, V, T> belongedGroup =
evictCacheEntry.getBelongedGroup();
evictCacheEntry.setBelongedGroup(null);
- firstKeyMap.compute(
- belongedGroup.getFirstKey(),
- (firstKey, cacheEntryGroup) -> {
- belongedGroup.removeCacheEntry(evictCacheEntry.getSecondKey());
- cacheStats.decreaseEntryCount();
- evictedSize.getAndAdd(
- sizeComputer.computeValueSize(evictCacheEntry.getValue())
- +
sizeComputer.computeSecondKeySize(evictCacheEntry.getSecondKey()));
-
- if (cacheEntryGroup == null) {
- // has been removed by other threads
- return null;
- }
-
- if (cacheEntryGroup.isEmpty()) {
- evictedSize.getAndAdd(sizeComputer.computeFirstKeySize(firstKey));
- return null;
- }
+ long memory =
belongedGroup.removeCacheEntry(evictCacheEntry.getSecondKey());
- // some other thread has put value to it
- return cacheEntryGroup;
- });
-
- return evictedSize.get();
+ final ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup =
+ firstKeyMap.get(belongedGroup.getFirstKey());
+ if (Objects.nonNull(cacheEntryGroup) && cacheEntryGroup.isEmpty()) {
+ firstKeyMap.remove(belongedGroup.getFirstKey());
+ memory +=
+ sizeComputer.computeFirstKeySize(belongedGroup.getFirstKey())
+ + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY;
Review Comment:
```suggestion
if (firstKeyMap.remove(belongedGroup.getFirstKey()) != null) {
memory +=
sizeComputer.computeFirstKeySize(belongedGroup.getFirstKey())
+ RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY;
}
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java:
##########
@@ -79,415 +77,240 @@ public V get(final FK firstKey, final SK secondKey) {
}
}
- @Override
- public void compute(final IDualKeyCacheComputation<FK, SK, V> computation) {
- final FK firstKey = computation.getFirstKey();
- final ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup =
firstKeyMap.get(firstKey);
- final SK[] secondKeyList = computation.getSecondKeyList();
- if (cacheEntryGroup == null) {
- for (int i = 0; i < secondKeyList.length; i++) {
- computation.computeValue(i, null);
- }
- cacheStats.recordMiss(secondKeyList.length);
- } else {
- T cacheEntry;
- int hitCount = 0;
- for (int i = 0; i < secondKeyList.length; i++) {
- cacheEntry = cacheEntryGroup.getCacheEntry(secondKeyList[i]);
- if (cacheEntry == null) {
- computation.computeValue(i, null);
- } else {
- computation.computeValue(i, cacheEntry.getValue());
- cacheEntryManager.access(cacheEntry);
- hitCount++;
- }
- }
- cacheStats.recordHit(hitCount);
- cacheStats.recordMiss(secondKeyList.length - hitCount);
- }
- }
-
- @Override
- public void updateWithLock(final IDualKeyCacheUpdating<FK, SK, V> updating) {
- final FK firstKey = updating.getFirstKey();
- final ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup =
firstKeyMap.get(firstKey);
- final SK[] secondKeyList = updating.getSecondKeyList();
- if (cacheEntryGroup == null) {
- for (int i = 0; i < secondKeyList.length; i++) {
- updating.updateValue(i, null);
- }
- cacheStats.recordMiss(secondKeyList.length);
- } else {
- T cacheEntry;
- for (int i = 0; i < secondKeyList.length; i++) {
- cacheEntry = cacheEntryGroup.getCacheEntry(secondKeyList[i]);
- if (cacheEntry == null) {
- updating.updateValue(i, null);
- } else {
- int changeSize = 0;
- synchronized (cacheEntry) {
- if (cacheEntry.getBelongedGroup() != null) {
- // Only update the value when the cache entry is not evicted.
- // If the cache entry is evicted, getBelongedGroup is null.
- // Synchronized is to guarantee the cache entry is not evicted
during the update.
- changeSize = updating.updateValue(i, cacheEntry.getValue());
- cacheEntryManager.access(cacheEntry);
- }
- }
- if (changeSize > 0) {
- increaseMemoryUsageAndMayEvict(changeSize);
- }
- }
- }
- }
- }
-
- @Override
- public void put(final FK firstKey, final SK secondKey, final V value) {
- final AtomicInteger usedMemorySize = new AtomicInteger(0);
- firstKeyMap.compute(
- firstKey,
- (k, cacheEntryGroup) -> {
- if (cacheEntryGroup == null) {
- cacheEntryGroup = new CacheEntryGroupImpl<>(firstKey);
-
usedMemorySize.getAndAdd(sizeComputer.computeFirstKeySize(firstKey));
- }
- final ICacheEntryGroup<FK, SK, V, T> finalCacheEntryGroup =
cacheEntryGroup;
- cacheEntryGroup.computeCacheEntry(
- secondKey,
- (sk, cacheEntry) -> {
- if (cacheEntry == null) {
- cacheEntry =
- cacheEntryManager.createCacheEntry(secondKey, value,
finalCacheEntryGroup);
- cacheEntryManager.put(cacheEntry);
- cacheStats.increaseEntryCount();
-
usedMemorySize.getAndAdd(sizeComputer.computeSecondKeySize(sk));
- } else {
- final V existingValue = cacheEntry.getValue();
- if (existingValue != value && !existingValue.equals(value)) {
- cacheEntry.replaceValue(value);
-
usedMemorySize.getAndAdd(-sizeComputer.computeValueSize(existingValue));
- }
- // update the cache status
- cacheEntryManager.access(cacheEntry);
- }
- usedMemorySize.getAndAdd(sizeComputer.computeValueSize(value));
- return cacheEntry;
- });
- return cacheEntryGroup;
- });
- increaseMemoryUsageAndMayEvict(usedMemorySize.get());
- }
-
@Override
public void update(
final FK firstKey,
final @Nonnull SK secondKey,
final V value,
final ToIntFunction<V> updater,
final boolean createIfNotExists) {
- final AtomicInteger usedMemorySize = new AtomicInteger(0);
-
- firstKeyMap.compute(
- firstKey,
- (k, cacheEntryGroup) -> {
- if (cacheEntryGroup == null) {
- if (!createIfNotExists) {
- return null;
- }
- cacheEntryGroup = new CacheEntryGroupImpl<>(firstKey);
-
usedMemorySize.getAndAdd(sizeComputer.computeFirstKeySize(firstKey));
- }
- final ICacheEntryGroup<FK, SK, V, T> finalCacheEntryGroup =
cacheEntryGroup;
-
- final T cacheEntry =
- createIfNotExists
- ? cacheEntryGroup.computeCacheEntryIfAbsent(
- secondKey,
- sk -> {
- final T entry =
- cacheEntryManager.createCacheEntry(
- secondKey, value, finalCacheEntryGroup);
- cacheEntryManager.put(entry);
- cacheStats.increaseEntryCount();
- usedMemorySize.getAndAdd(
- sizeComputer.computeSecondKeySize(sk)
- +
sizeComputer.computeValueSize(entry.getValue()));
- return entry;
- })
- : cacheEntryGroup.getCacheEntry(secondKey);
-
- if (Objects.nonNull(cacheEntry)) {
- final int result = updater.applyAsInt(cacheEntry.getValue());
- if (Objects.nonNull(cacheEntryGroup.getCacheEntry(secondKey))) {
- usedMemorySize.getAndAdd(result);
- }
- }
- return cacheEntryGroup;
- });
- increaseMemoryUsageAndMayEvict(usedMemorySize.get());
+
+ ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup = firstKeyMap.get(firstKey);
+ if (Objects.isNull(cacheEntryGroup)) {
+ if (createIfNotExists) {
+ cacheEntryGroup = new CacheEntryGroupImpl<>(firstKey, sizeComputer);
+ firstKeyMap.put(firstKey, cacheEntryGroup);
+ } else {
+ return;
+ }
+ }
+
+ final ICacheEntryGroup<FK, SK, V, T> finalCacheEntryGroup =
cacheEntryGroup;
+ cacheEntryGroup.computeCacheEntry(
+ secondKey,
+ memory ->
+ (sk, cacheEntry) -> {
+ if (Objects.isNull(cacheEntry)) {
+ if (!createIfNotExists) {
+ return null;
+ }
+ cacheEntry =
+ cacheEntryManager.createCacheEntry(secondKey, value,
finalCacheEntryGroup);
+ cacheEntryManager.put(cacheEntry);
+ cacheStats.increaseEntryCount();
+ memory.getAndAdd(
+ sizeComputer.computeSecondKeySize(sk)
+ + sizeComputer.computeValueSize(cacheEntry.getValue())
+ + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY);
+ }
+ memory.getAndAdd(updater.applyAsInt(cacheEntry.getValue()));
+ return cacheEntry;
+ });
+
+ mayEvict();
}
@Override
public void update(
final FK firstKey, final Predicate<SK> secondKeyChecker, final
ToIntFunction<V> updater) {
- final AtomicInteger usedMemorySize = new AtomicInteger(0);
-
- firstKeyMap.compute(
- firstKey,
- (k, cacheEntryGroup) -> {
- if (cacheEntryGroup == null) {
- return null;
- }
- final ICacheEntryGroup<FK, SK, V, T> finalCacheEntryGroup =
cacheEntryGroup;
-
- cacheEntryGroup
- .getAllCacheEntries()
- .forEachRemaining(
- entry -> {
- if (!secondKeyChecker.test(entry.getKey())) {
- return;
- }
- final int result =
updater.applyAsInt(entry.getValue().getValue());
- if
(Objects.nonNull(finalCacheEntryGroup.getCacheEntry(entry.getKey()))) {
- usedMemorySize.getAndAdd(result);
- }
- });
- return cacheEntryGroup;
- });
- increaseMemoryUsageAndMayEvict(usedMemorySize.get());
+ final ICacheEntryGroup<FK, SK, V, T> entryGroup =
firstKeyMap.get(firstKey);
+ if (Objects.nonNull(entryGroup)) {
+ entryGroup
+ .getAllCacheEntries()
+ .forEachRemaining(
+ entry -> {
+ if (!secondKeyChecker.test(entry.getKey())) {
+ return;
+ }
+ entryGroup.computeCacheEntry(
+ entry.getKey(),
+ memory ->
+ (secondKey, cacheEntry) -> {
+ if (Objects.nonNull(cacheEntry)) {
+
memory.getAndAdd(updater.applyAsInt(cacheEntry.getValue()));
+ }
+ return cacheEntry;
+ });
+ });
+ }
+ mayEvict();
}
@Override
public void update(
final Predicate<FK> firstKeyChecker,
final Predicate<SK> secondKeyChecker,
final ToIntFunction<V> updater) {
- final AtomicInteger usedMemorySize = new AtomicInteger(0);
for (final FK firstKey : firstKeyMap.getAllKeys()) {
if (!firstKeyChecker.test(firstKey)) {
continue;
}
- firstKeyMap.compute(
- firstKey,
- (fk, entryGroup) -> {
- if (Objects.nonNull(entryGroup)) {
- entryGroup
- .getAllCacheEntries()
- .forEachRemaining(
- entry -> {
- if (!secondKeyChecker.test(entry.getKey())) {
- return;
- }
- final int result =
updater.applyAsInt(entry.getValue().getValue());
- if
(Objects.nonNull(entryGroup.getCacheEntry(entry.getKey()))) {
- usedMemorySize.getAndAdd(result);
- }
- });
- }
- return entryGroup;
- });
+ final ICacheEntryGroup<FK, SK, V, T> entryGroup =
firstKeyMap.get(firstKey);
+ if (Objects.nonNull(entryGroup)) {
+ entryGroup
+ .getAllCacheEntries()
+ .forEachRemaining(
+ entry -> {
+ if (!secondKeyChecker.test(entry.getKey())) {
+ return;
+ }
+ entryGroup.computeCacheEntry(
+ entry.getKey(),
+ memory ->
+ (secondKey, cacheEntry) -> {
+
memory.getAndAdd(updater.applyAsInt(cacheEntry.getValue()));
+ return cacheEntry;
+ });
+ });
+ }
+ mayEvict();
}
- increaseMemoryUsageAndMayEvict(usedMemorySize.get());
}
- private void increaseMemoryUsageAndMayEvict(final int memorySize) {
- cacheStats.increaseMemoryUsage(memorySize);
- while (cacheStats.isExceedMemoryCapacity()) {
- cacheStats.decreaseMemoryUsage(evictOneCacheEntry());
+ private void mayEvict() {
+ long exceedMemory;
+ while ((exceedMemory = cacheStats.getExceedMemory()) > 0) {
+ // Not compute each time to save time when FK is too many
+ // The hard-coded size is 100
+ do {
+ exceedMemory -= evictOneCacheEntry();
+ } while (exceedMemory > 0 && firstKeyMap.size() > 100);
}
}
- private int evictOneCacheEntry() {
+ // The returned delta may have some error, but it's OK
+ // Because the delta is only for loop round estimation
+ private long evictOneCacheEntry() {
final ICacheEntry<SK, V> evictCacheEntry = cacheEntryManager.evict();
if (evictCacheEntry == null) {
return 0;
}
- final AtomicInteger evictedSize = new AtomicInteger(0);
-
final ICacheEntryGroup<FK, SK, V, T> belongedGroup =
evictCacheEntry.getBelongedGroup();
evictCacheEntry.setBelongedGroup(null);
- firstKeyMap.compute(
- belongedGroup.getFirstKey(),
- (firstKey, cacheEntryGroup) -> {
- belongedGroup.removeCacheEntry(evictCacheEntry.getSecondKey());
- cacheStats.decreaseEntryCount();
- evictedSize.getAndAdd(
- sizeComputer.computeValueSize(evictCacheEntry.getValue())
- +
sizeComputer.computeSecondKeySize(evictCacheEntry.getSecondKey()));
-
- if (cacheEntryGroup == null) {
- // has been removed by other threads
- return null;
- }
-
- if (cacheEntryGroup.isEmpty()) {
- evictedSize.getAndAdd(sizeComputer.computeFirstKeySize(firstKey));
- return null;
- }
+ long memory =
belongedGroup.removeCacheEntry(evictCacheEntry.getSecondKey());
- // some other thread has put value to it
- return cacheEntryGroup;
- });
-
- return evictedSize.get();
+ final ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup =
+ firstKeyMap.get(belongedGroup.getFirstKey());
+ if (Objects.nonNull(cacheEntryGroup) && cacheEntryGroup.isEmpty()) {
+ firstKeyMap.remove(belongedGroup.getFirstKey());
+ memory +=
+ sizeComputer.computeFirstKeySize(belongedGroup.getFirstKey())
+ + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY;
+ }
+ return memory;
}
@Override
public void invalidateAll() {
- executeInvalidateAll();
- }
-
- private void executeInvalidateAll() {
firstKeyMap.clear();
cacheEntryManager.cleanUp();
- cacheStats.resetMemoryUsageAndEntriesCount();
- }
-
- @Override
- public void cleanUp() {
- executeInvalidateAll();
- cacheStats.reset();
+ cacheStats.resetEntriesCount();
}
@Override
public IDualKeyCacheStats stats() {
return cacheStats;
}
- @Override
- @TestOnly
- public void evictOneEntry() {
- cacheStats.decreaseMemoryUsage(evictOneCacheEntry());
- }
-
@Override
public void invalidate(final FK firstKey) {
- int estimateSize = 0;
final ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup =
firstKeyMap.remove(firstKey);
if (cacheEntryGroup != null) {
- estimateSize += sizeComputer.computeFirstKeySize(firstKey);
for (final Iterator<Map.Entry<SK, T>> it =
cacheEntryGroup.getAllCacheEntries();
it.hasNext(); ) {
final Map.Entry<SK, T> entry = it.next();
if (cacheEntryManager.invalidate(entry.getValue())) {
cacheStats.decreaseEntryCount();
- estimateSize +=
- sizeComputer.computeSecondKeySize(entry.getKey())
- + sizeComputer.computeValueSize(entry.getValue().getValue());
}
}
- cacheStats.decreaseMemoryUsage(estimateSize);
}
}
@Override
public void invalidate(final FK firstKey, final SK secondKey) {
- final AtomicInteger usedMemorySize = new AtomicInteger(0);
-
- firstKeyMap.compute(
- firstKey,
- (key, cacheEntryGroup) -> {
- if (cacheEntryGroup == null) {
- // has been removed by other threads
- return null;
- }
-
- final T entry = cacheEntryGroup.getCacheEntry(secondKey);
- if (Objects.nonNull(entry) && cacheEntryManager.invalidate(entry)) {
- cacheStats.decreaseEntryCount();
- usedMemorySize.getAndAdd(
- sizeComputer.computeSecondKeySize(entry.getSecondKey())
- + sizeComputer.computeValueSize(entry.getValue()));
- cacheEntryGroup.removeCacheEntry(entry.getSecondKey());
- }
+ final ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup =
firstKeyMap.get(firstKey);
+ if (Objects.isNull(cacheEntryGroup)) {
+ return;
+ }
- if (cacheEntryGroup.isEmpty()) {
-
usedMemorySize.getAndAdd(sizeComputer.computeFirstKeySize(firstKey));
- return null;
- }
+ final T entry = cacheEntryGroup.getCacheEntry(secondKey);
+ if (Objects.nonNull(entry) && cacheEntryManager.invalidate(entry)) {
+ cacheStats.decreaseEntryCount();
+ cacheEntryGroup.removeCacheEntry(entry.getSecondKey());
+ }
- return cacheEntryGroup;
- });
- cacheStats.decreaseMemoryUsage(usedMemorySize.get());
+ if (cacheEntryGroup.isEmpty()) {
+ firstKeyMap.remove(firstKey);
+ }
}
@Override
public void invalidate(final FK firstKey, final Predicate<SK>
secondKeyChecker) {
- final AtomicInteger estimateSize = new AtomicInteger(0);
- firstKeyMap.compute(
- firstKey,
- (key, cacheEntryGroup) -> {
- if (cacheEntryGroup == null) {
- // has been removed by other threads
- return null;
- }
-
- for (final Iterator<Map.Entry<SK, T>> it =
cacheEntryGroup.getAllCacheEntries();
- it.hasNext(); ) {
- final Map.Entry<SK, T> entry = it.next();
- if (secondKeyChecker.test(entry.getKey())
- && cacheEntryManager.invalidate(entry.getValue())) {
- cacheStats.decreaseEntryCount();
- cacheEntryGroup.removeCacheEntry(entry.getKey());
- estimateSize.addAndGet(
- sizeComputer.computeSecondKeySize(entry.getKey())
- +
sizeComputer.computeValueSize(entry.getValue().getValue()));
- }
- }
-
- if (cacheEntryGroup.isEmpty()) {
- estimateSize.getAndAdd(sizeComputer.computeFirstKeySize(firstKey));
- return null;
- }
-
- return cacheEntryGroup;
- });
+ final ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup =
firstKeyMap.get(firstKey);
+ if (Objects.isNull(cacheEntryGroup)) {
+ return;
+ }
+ for (final Iterator<Map.Entry<SK, T>> it =
cacheEntryGroup.getAllCacheEntries();
+ it.hasNext(); ) {
+ final Map.Entry<SK, T> entry = it.next();
+ if (secondKeyChecker.test(entry.getKey()) &&
cacheEntryManager.invalidate(entry.getValue())) {
+ cacheStats.decreaseEntryCount();
+ cacheEntryGroup.removeCacheEntry(entry.getKey());
+ }
+ }
- cacheStats.decreaseMemoryUsage(estimateSize.get());
+ if (cacheEntryGroup.isEmpty()) {
+ firstKeyMap.remove(firstKey);
+ }
}
@Override
public void invalidate(
final Predicate<FK> firstKeyChecker, final Predicate<SK>
secondKeyChecker) {
- final AtomicInteger estimateSize = new AtomicInteger(0);
for (final FK firstKey : firstKeyMap.getAllKeys()) {
if (!firstKeyChecker.test(firstKey)) {
continue;
}
- firstKeyMap.compute(
- firstKey,
- (fk, cacheEntryGroup) -> {
- if (cacheEntryGroup == null) {
- // has been removed by other threads
- return null;
- }
-
- for (final Iterator<Map.Entry<SK, T>> it =
cacheEntryGroup.getAllCacheEntries();
- it.hasNext(); ) {
- final Map.Entry<SK, T> entry = it.next();
-
- if (secondKeyChecker.test(entry.getKey())
- && cacheEntryManager.invalidate(entry.getValue())) {
- cacheStats.decreaseEntryCount();
- cacheEntryGroup.removeCacheEntry(entry.getKey());
- estimateSize.addAndGet(
- sizeComputer.computeSecondKeySize(entry.getKey())
- +
sizeComputer.computeValueSize(entry.getValue().getValue()));
- }
- }
-
- if (cacheEntryGroup.isEmpty()) {
-
estimateSize.getAndAdd(sizeComputer.computeFirstKeySize(firstKey));
- return null;
- }
- return cacheEntryGroup;
- });
+ final ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup =
firstKeyMap.get(firstKey);
+ if (Objects.isNull(cacheEntryGroup)) {
+ return;
+ }
+
+ for (final Iterator<Map.Entry<SK, T>> it =
cacheEntryGroup.getAllCacheEntries();
+ it.hasNext(); ) {
+ final Map.Entry<SK, T> entry = it.next();
+
+ if (secondKeyChecker.test(entry.getKey())
+ && cacheEntryManager.invalidate(entry.getValue())) {
+ cacheStats.decreaseEntryCount();
+ cacheEntryGroup.removeCacheEntry(entry.getKey());
+ }
+ }
+
+ if (cacheEntryGroup.isEmpty()) {
+ firstKeyMap.remove(firstKey);
+ }
}
- cacheStats.decreaseMemoryUsage(estimateSize.get());
+ }
+
+ private long getMemory() {
+ return Arrays.stream(firstKeyMap.maps)
+ .flatMap(map -> Objects.nonNull(map) ? map.values().stream() :
Stream.empty())
+ .map(ICacheEntryGroup::getMemory)
+ .reduce(0L, Long::sum);
Review Comment:
use for-each loop instead of using stream, stream has overhead.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java:
##########
@@ -549,5 +370,11 @@ List<K> getAllKeys() {
});
return res;
}
+
+ int size() {
+ return Arrays.stream(maps)
+ .map(map -> Objects.nonNull(map) ? map.size() : 0)
+ .reduce(0, Integer::sum);
+ }
Review Comment:
use for-each loop instead of using stream, stream has overhead.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]