JackieTien97 commented on code in PR #13295:
URL: https://github.com/apache/iotdb/pull/13295#discussion_r1732217236
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java:
##########
@@ -194,6 +196,69 @@ private int putToCache(FK firstKey, SK secondKey, V value)
{
return usedMemorySize.get();
}
+ @Override
+ public void update(
+ final FK firstKey,
+ final SK secondKey,
+ final V value,
+ final ToIntFunction<V> updater,
+ final boolean createIfNotExists) {
+ final AtomicInteger usedMemorySize = new AtomicInteger(0);
+
+ firstKeyMap.compute(
+ firstKey,
+ (k, cacheEntryGroup) -> {
+ if (cacheEntryGroup == null) {
+ if (!createIfNotExists) {
+ return null;
+ }
+ cacheEntryGroup = new CacheEntryGroupImpl<>(firstKey);
+
usedMemorySize.getAndAdd(sizeComputer.computeFirstKeySize(firstKey));
+ }
+ final ICacheEntryGroup<FK, SK, V, T> finalCacheEntryGroup =
cacheEntryGroup;
+
+ if (Objects.nonNull(secondKey)) {
+ final T cacheEntry =
+ createIfNotExists
+ ? cacheEntryGroup.computeCacheEntryIfAbsent(
+ secondKey,
+ sk -> {
+ final T entry =
+ cacheEntryManager.createCacheEntry(
+ secondKey, value, finalCacheEntryGroup);
+ cacheEntryManager.put(entry);
+ usedMemorySize.getAndAdd(
+ sizeComputer.computeSecondKeySize(sk)
+ +
sizeComputer.computeValueSize(entry.getValue()));
+ return entry;
+ })
+ : cacheEntryGroup.getCacheEntry(secondKey);
+
+ if (Objects.nonNull(cacheEntry)) {
+ final int result = updater.applyAsInt(cacheEntry.getValue());
+ if (Objects.nonNull(cacheEntryGroup.getCacheEntry(secondKey))) {
+ usedMemorySize.getAndAdd(result);
+ }
+ }
+ } else {
+ cacheEntryGroup
+ .getAllCacheEntries()
+ .forEachRemaining(
+ entry -> {
+ final int result =
updater.applyAsInt(entry.getValue().getValue());
+ if
(Objects.nonNull(finalCacheEntryGroup.getCacheEntry(entry.getKey()))) {
+ usedMemorySize.getAndAdd(result);
+ }
+ });
+ }
+ return cacheEntryGroup;
+ });
+ cacheStats.increaseMemoryUsage(usedMemorySize.get());
Review Comment:
If `usedMemorySize` is negative, should we call `decreaseMemoryUsage` instead of `increaseMemoryUsage`?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCache.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache;
+
+import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.lastcache.LastCacheContainer;
+import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
+
+import org.apache.tsfile.read.TimeValuePair;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.apache.tsfile.utils.TsPrimitiveType;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+@ThreadSafe
+public class TableDeviceLastCache {
+ static final int INSTANCE_SIZE =
+ (int)
RamUsageEstimator.shallowSizeOfInstance(TableDeviceLastCache.class);
+
+ private final Map<String, TimeValuePair> measurement2CachedLastMap = new
ConcurrentHashMap<>();
Review Comment:
We need to distinguish a cache miss from an actual null value for this
measurement or device.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceCacheEntry.java:
##########
@@ -36,32 +46,144 @@ public class TableDeviceCacheEntry {
// create from remote
// there may exist key is not null, but value is null in this map, which
means that the key's
// corresponding value is null, doesn't mean that the key doesn't exist
- private final Map<String, String> attributeMap;
+ private final AtomicReference<ConcurrentHashMap<String, String>>
attributeMap =
+ new AtomicReference<>();
+ private final AtomicReference<TableDeviceLastCache> lastCache = new
AtomicReference<>();
+
+ /////////////////////////////// Attribute ///////////////////////////////
- public TableDeviceCacheEntry(final @Nonnull Map<String, String>
attributeMap) {
- this.attributeMap = attributeMap;
+ public int setAttribute(
+ final String database,
+ final String tableName,
+ final @Nonnull Map<String, String> attributeSetMap) {
+ return (attributeMap.compareAndSet(null, new ConcurrentHashMap<>())
+ ? (int) RamUsageEstimator.shallowSizeOf(attributeMap)
+ : 0)
+ + updateAttribute(database, tableName, attributeSetMap);
}
- public void update(final @Nonnull Map<String, String> updateMap) {
+ public int updateAttribute(
+ final String database, final String tableName, final @Nonnull
Map<String, String> updateMap) {
+ final Map<String, String> map = attributeMap.get();
+ if (Objects.isNull(map)) {
+ return 0;
+ }
+ final AtomicInteger diff = new AtomicInteger(0);
updateMap.forEach(
(k, v) -> {
if (Objects.nonNull(v)) {
- attributeMap.put(k, v);
+ if (!map.containsKey(k)) {
+ k =
DataNodeTableCache.getInstance().tryGetInternColumnName(database, tableName, k);
+ diff.addAndGet(RamUsageEstimator.NUM_BYTES_OBJECT_REF);
+ }
+ diff.addAndGet(
+ (int) (RamUsageEstimator.sizeOf(v) -
RamUsageEstimator.sizeOf(map.put(k, v))));
} else {
- attributeMap.remove(k);
+ map.remove(k);
+ diff.addAndGet((int) (-RamUsageEstimator.sizeOf(k) -
RamUsageEstimator.sizeOf(v)));
+ }
+ });
+ // Typically the "update" and "invalidate" won't be concurrently called
+ // Here we reserve the check for consistency and potential safety
+ return Objects.nonNull(attributeMap.get()) ? diff.get() : 0;
+ }
+
+ public int invalidateAttribute() {
+ final AtomicInteger size = new AtomicInteger(0);
+ attributeMap.updateAndGet(
+ map -> {
+ if (Objects.nonNull(map)) {
+ size.set(
+ (int)
+ (RamUsageEstimator.NUM_BYTES_OBJECT_REF * map.size()
+ + map.values().stream()
+ .mapToLong(RamUsageEstimator::sizeOf)
+ .reduce(0, Long::sum)));
}
+ return null;
});
+ return size.get();
}
public String getAttribute(final String key) {
- return attributeMap.get(key);
+ final Map<String, String> map = attributeMap.get();
+ return Objects.nonNull(map) ? map.get(key) : null;
}
public Map<String, String> getAttributeMap() {
- return attributeMap;
+ return attributeMap.get();
+ }
+
+ /////////////////////////////// Last Cache ///////////////////////////////
+
+ public int updateLastCache(
+ final String database,
+ final String tableName,
+ final Map<String, TimeValuePair> measurementUpdateMap) {
+ return (lastCache.compareAndSet(null, new TableDeviceLastCache())
+ ? TableDeviceLastCache.INSTANCE_SIZE
+ : 0)
+ + tryUpdate(database, tableName, measurementUpdateMap);
+ }
+
+ public int tryUpdate(
+ final String database,
+ final String tableName,
+ final Map<String, TimeValuePair> measurementUpdateMap) {
+ final TableDeviceLastCache cache = lastCache.get();
+ final int result =
+ Objects.nonNull(cache) ? cache.update(database, tableName,
measurementUpdateMap) : 0;
+ return Objects.nonNull(lastCache.get()) ? result : 0;
+ }
+
+ public TimeValuePair getTimeValuePair(final String measurement) {
+ final TableDeviceLastCache cache = lastCache.get();
+ return Objects.nonNull(cache) ? cache.getTimeValuePair(measurement) : null;
+ }
+
+ // Shall pass in "null" if last by time
Review Comment:
Use an empty string for the time column instead, which is consistent with
TsFile's handling of the time column of aligned timeseries.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCache.java:
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache;
+
+import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.lastcache.LastCacheContainer;
+import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
+
+import org.apache.tsfile.read.TimeValuePair;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.apache.tsfile.utils.TsPrimitiveType;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+@ThreadSafe
+public class TableDeviceLastCache {
+ static final int INSTANCE_SIZE =
+ (int)
RamUsageEstimator.shallowSizeOfInstance(TableDeviceLastCache.class);
+
+ private final Map<String, TimeValuePair> measurement2CachedLastMap = new
ConcurrentHashMap<>();
+ private long lastTime = Long.MIN_VALUE;
+
+ public int update(
+ final String database,
+ final String tableName,
+ final Map<String, TimeValuePair> measurementUpdateMap) {
+ final AtomicInteger diff = new AtomicInteger(0);
+ measurementUpdateMap.forEach(
+ (k, v) -> {
+ if (!measurement2CachedLastMap.containsKey(k)) {
+ k =
DataNodeTableCache.getInstance().tryGetInternColumnName(database, tableName, k);
+ }
+ if (lastTime < v.getTimestamp()) {
+ lastTime = v.getTimestamp();
+ }
+ measurement2CachedLastMap.compute(
+ k,
+ (measurement, tvPair) -> {
+ if (Objects.isNull(tvPair) || tvPair.getTimestamp() <=
v.getTimestamp()) {
+ diff.addAndGet(
+ Objects.nonNull(tvPair)
+ ? LastCacheContainer.getDiffSize(tvPair.getValue(),
v.getValue())
+ : RamUsageEstimator.NUM_BYTES_OBJECT_REF +
v.getSize());
Review Comment:
The size of the `Map.Node` should also be taken into account when the entry is first created.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java:
##########
@@ -84,39 +95,147 @@ public Map<String, String> getDeviceAttribute(
}
// The input deviceId shall have its tailing nulls trimmed
- public void put(
+ public void putAttributes(
final String database,
final String tableName,
final String[] deviceId,
final ConcurrentMap<String, String> attributeMap) {
readWriteLock.readLock().lock();
try {
- dualKeyCache.put(
+ dualKeyCache.update(
new TableId(database, tableName),
new TableDeviceId(deviceId),
- new TableDeviceCacheEntry(attributeMap));
+ new TableDeviceCacheEntry(),
+ entry -> entry.setAttribute(database, tableName, attributeMap),
+ true);
} finally {
readWriteLock.readLock().unlock();
}
}
- public void update(
+ public void updateAttributes(
final String database,
final String tableName,
final String[] deviceId,
final Map<String, String> attributeMap) {
+ dualKeyCache.update(
+ new TableId(database, tableName),
+ new TableDeviceId(deviceId),
+ new TableDeviceCacheEntry(),
+ entry -> entry.updateAttribute(database, tableName, attributeMap),
+ false);
+ }
+
+ // Shall pass in "null" for deviceId when invalidating attribute for a table
+ public void invalidateAttributes(
+ final String database, final String tableName, final String[] deviceId) {
+ dualKeyCache.update(
+ new TableId(database, tableName),
+ Objects.nonNull(deviceId) ? new TableDeviceId(deviceId) : null,
+ new TableDeviceCacheEntry(),
+ entry -> -entry.invalidateAttribute(),
+ false);
+ }
+
+ /////////////////////////////// Last Cache ///////////////////////////////
+
+ // The input "TimeValuePair" shall never contain null value
+ public void updateLastCache(
+ final String database,
+ final String tableName,
+ final String[] deviceId,
+ final Map<String, TimeValuePair> measurementUpdateMap) {
readWriteLock.readLock().lock();
try {
- final TableDeviceCacheEntry entry =
- dualKeyCache.get(new TableId(database, tableName), new
TableDeviceId(deviceId));
- if (Objects.nonNull(entry)) {
- entry.update(attributeMap);
- }
+ forceUpdateCache(database, tableName, deviceId, measurementUpdateMap);
} finally {
readWriteLock.readLock().unlock();
}
}
+ // The input "TimeValuePair" shall never contain null value
+ public void mayUpdateLastCacheWithoutLock(
+ final String database,
+ final String tableName,
+ final String[] deviceId,
+ final Map<String, TimeValuePair> measurementUpdateMap) {
+ if (putLastCacheWhenWriting) {
+ forceUpdateCache(database, tableName, deviceId, measurementUpdateMap);
+ } else {
+ dualKeyCache.update(
+ new TableId(database, tableName),
+ new TableDeviceId(deviceId),
+ new TableDeviceCacheEntry(),
+ entry -> entry.tryUpdate(database, tableName, measurementUpdateMap),
+ false);
+ }
+ }
+
+ private void forceUpdateCache(
+ final String database,
+ final String tableName,
+ final String[] deviceId,
+ final Map<String, TimeValuePair> measurementUpdateMap) {
+ dualKeyCache.update(
+ new TableId(database, tableName),
+ new TableDeviceId(deviceId),
+ new TableDeviceCacheEntry(),
+ entry -> entry.updateLastCache(database, tableName,
measurementUpdateMap),
+ true);
+ }
+
+ public TimeValuePair getLastEntry(
+ final String database,
+ final String tableName,
+ final String[] deviceId,
+ final String measurement) {
+ final TableDeviceCacheEntry entry =
Review Comment:
Add Javadoc for all these public methods in TableDeviceSchemaCache, covering
e.g. whether database contains `root.`, whether deviceId contains
database.tableName at its first index, and whether measurement can be `time`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]