jt2594838 commented on code in PR #16425:
URL: https://github.com/apache/iotdb/pull/16425#discussion_r2354033621


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java:
##########
@@ -163,177 +199,250 @@ public List<Integer> 
computeWithoutTemplate(ISchemaComputation schemaComputation
    * @return The indexes of missed views and full paths of their source paths 
will be returned.
    */
   public Pair<List<Integer>, List<String>> computeSourceOfLogicalView(
-      ISchemaComputation schemaComputation) {
+      final ISchemaComputation schemaComputation) {
     if (!schemaComputation.hasLogicalViewNeedProcess()) {
       return new Pair<>(new ArrayList<>(), new ArrayList<>());
     }
-    return timeSeriesSchemaCache.computeSourceOfLogicalView(schemaComputation);
-  }
 
-  public List<Integer> computeWithTemplate(ISchemaComputation 
schemaComputation) {
-    return deviceUsingTemplateSchemaCache.compute(schemaComputation);
+    final List<Integer> indexOfMissingMeasurements = new ArrayList<>();
+    final Pair<Integer, Integer> beginToEnd =
+        schemaComputation.getRangeOfLogicalViewSchemaListRecorded();
+    final List<LogicalViewSchema> logicalViewSchemaList =
+        schemaComputation.getLogicalViewSchemaList();
+    final List<Integer> indexListOfLogicalViewPaths =
+        schemaComputation.getIndexListOfLogicalViewPaths();
+
+    for (int i = beginToEnd.left; i < beginToEnd.right; i++) {
+      final LogicalViewSchema logicalViewSchema = logicalViewSchemaList.get(i);
+      final int realIndex = indexListOfLogicalViewPaths.get(i);
+      if (!logicalViewSchema.isWritable()) {
+        throw new RuntimeException(
+            new InsertNonWritableViewException(
+                schemaComputation
+                    .getDevicePath()
+                    
.concatAsMeasurementPath(schemaComputation.getMeasurements()[realIndex])
+                    .getFullPath()));
+      }
+
+      final PartialPath fullPath = logicalViewSchema.getSourcePathIfWritable();
+      final IDeviceSchema schema = 
deviceSchemaCache.getDeviceSchema(fullPath.getDevicePath());
+      if (!(schema instanceof DeviceNormalSchema)) {
+        indexOfMissingMeasurements.add(i);
+        continue;
+      }
+
+      final DeviceNormalSchema treeSchema = (DeviceNormalSchema) schema;
+      final SchemaCacheEntry value = 
treeSchema.getSchemaCacheEntry(fullPath.getMeasurement());
+      if (Objects.isNull(value)) {
+        indexOfMissingMeasurements.add(i);
+        continue;
+      }
+
+      if (value.isLogicalView()) {
+        throw new RuntimeException(
+            new UnsupportedOperationException(
+                String.format(
+                    "The source of view [%s] is also a view! Nested view is 
unsupported! "
+                        + "Please check it.",
+                    logicalViewSchema.getSourcePathIfWritable())));
+      }
+
+      schemaComputation.computeMeasurementOfView(realIndex, value, 
treeSchema.isAligned());
+    }
+
+    return new Pair<>(
+        indexOfMissingMeasurements,
+        indexOfMissingMeasurements.stream()
+            .map(index -> 
logicalViewSchemaList.get(index).getSourcePathStringIfWritable())
+            .collect(Collectors.toList()));
   }
 
-  /**
-   * Store the fetched schema in either the schemaCache or 
templateSchemaCache, depending on its
-   * associated device.
-   */
-  public void put(ClusterSchemaTree tree) {
-    PartialPath devicePath;
-    for (DeviceSchemaInfo deviceSchemaInfo : tree.getAllDevices()) {
-      devicePath = deviceSchemaInfo.getDevicePath();
-      if (deviceSchemaInfo.getTemplateId() != NON_TEMPLATE) {
-        deviceUsingTemplateSchemaCache.put(
-            devicePath, tree.getBelongedDatabase(devicePath), 
deviceSchemaInfo.getTemplateId());
-      } else {
-        for (MeasurementPath measurementPath : 
deviceSchemaInfo.getMeasurementSchemaPathList()) {
-          timeSeriesSchemaCache.putSingleMeasurementPath(
-              tree.getBelongedDatabase(devicePath), measurementPath);
-        }
+  public List<Integer> computeWithTemplate(final ISchemaComputation 
computation) {
+    final List<Integer> indexOfMissingMeasurements = new ArrayList<>();
+    final String[] measurements = computation.getMeasurements();
+    final IDeviceSchema deviceSchema =
+        deviceSchemaCache.getDeviceSchema(computation.getDevicePath());
+
+    if (!(deviceSchema instanceof DeviceTemplateSchema)) {
+      return IntStream.range(0, 
measurements.length).boxed().collect(Collectors.toList());
+    }
+
+    final DeviceTemplateSchema deviceTemplateSchema = (DeviceTemplateSchema) 
deviceSchema;
+
+    computation.computeDevice(
+        
templateManager.getTemplate(deviceTemplateSchema.getTemplateId()).isDirectAligned());
+    final Map<String, IMeasurementSchema> templateSchema =
+        
templateManager.getTemplate(deviceTemplateSchema.getTemplateId()).getSchemaMap();

Review Comment:
   Save the template as a local var?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceSchemaCache.java:
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.analyze.cache.schema;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathPatternUtil;
+import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.queryengine.common.schematree.DeviceSchemaInfo;
+import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.IDualKeyCache;
+import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.impl.DualKeyCacheBuilder;
+import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.impl.DualKeyCachePolicy;
+
+import org.apache.tsfile.common.constant.TsFileConstant;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.read.TimeValuePair;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+import java.util.function.ToIntFunction;
+
+/**
+ * The {@link DeviceSchemaCache} caches some of the devices and their: 
measurement info / template
+ * info. The last value of one device is also cached here.
+ */
+@ThreadSafe
+public class DeviceSchemaCache {
+
+  private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
+  private static final Logger logger = 
LoggerFactory.getLogger(DeviceSchemaCache.class);
+
+  /**
+   * Leading_Segment, {@link IDeviceID}, Map{@literal <}Measurement, 
Schema{@literal
+   * >}/templateInfo{@literal >}
+   *
+   * <p>The segment is used to:
+   *
+   * <p>1. Keep abreast of the newer versions.
+   *
+   * <p>2. Optimize the speed in invalidation by databases for most scenarios.
+   */
+  private final IDualKeyCache<String, PartialPath, DeviceCacheEntry> 
dualKeyCache;
+
+  private final Map<String, String> databasePool = new ConcurrentHashMap<>();
+
+  private final ReentrantReadWriteLock readWriteLock = new 
ReentrantReadWriteLock(false);
+
+  DeviceSchemaCache() {
+    dualKeyCache =
+        new DualKeyCacheBuilder<String, PartialPath, DeviceCacheEntry>()
+            .cacheEvictionPolicy(
+                
DualKeyCachePolicy.valueOf(config.getDataNodeSchemaCacheEvictionPolicy()))
+            .memoryCapacity(config.getAllocateMemoryForSchemaCache())
+            .firstKeySizeComputer(segment -> (int) 
RamUsageEstimator.sizeOf(segment))
+            .secondKeySizeComputer(PartialPath::estimateSize)
+            .valueSizeComputer(DeviceCacheEntry::estimateSize)
+            .build();
+    MetricService.getInstance().addMetricSet(new 
DataNodeSchemaCacheMetrics(this));
+  }
+
+  /////////////////////////////// Last Cache ///////////////////////////////
+
+  /**
+   * Get the last {@link TimeValuePair} of a measurement, the measurement 
shall never be "time".
+   *
+   * @param device {@link IDeviceID}
+   * @param measurement the measurement to get
+   * @return {@code null} iff cache miss, {@link 
DeviceLastCache#EMPTY_TIME_VALUE_PAIR} iff cache
+   *     hit but result is {@code null}, and the result value otherwise.
+   */
+  public TimeValuePair getLastEntry(final PartialPath device, final String 
measurement) {
+    final DeviceCacheEntry entry = dualKeyCache.get(getLeadingSegment(device), 
device);
+    return Objects.nonNull(entry) ? entry.getTimeValuePair(measurement) : null;
+  }
+
+  /**
+   * Invalidate the last cache of one device.
+   *
+   * @param device IDeviceID
+   */
+  public void invalidateDeviceLastCache(final PartialPath device) {
+    dualKeyCache.update(
+        getLeadingSegment(device), device, null, entry -> 
-entry.invalidateLastCache(), false);
+  }
+
+  /////////////////////////////// Tree model ///////////////////////////////
+
+  public void putDeviceSchema(final String database, final DeviceSchemaInfo 
deviceSchemaInfo) {
+    final PartialPath devicePath = deviceSchemaInfo.getDevicePath();
+    final String previousDatabase = databasePool.putIfAbsent(database, 
database);
+
+    dualKeyCache.update(
+        getLeadingSegment(devicePath),
+        devicePath,
+        new DeviceCacheEntry(),
+        entry ->
+            entry.setDeviceSchema(
+                Objects.nonNull(previousDatabase) ? previousDatabase : 
database, deviceSchemaInfo),
+        true);
+  }
+
+  public IDeviceSchema getDeviceSchema(final PartialPath device) {
+    final DeviceCacheEntry entry = dualKeyCache.get(getLeadingSegment(device), 
device);
+    return Objects.nonNull(entry) ? entry.getDeviceSchema() : null;
+  }
+
+  void updateLastCache(
+      final String database,
+      final PartialPath device,
+      final String[] measurements,
+      final @Nullable TimeValuePair[] timeValuePairs,
+      final boolean isAligned,
+      final IMeasurementSchema[] measurementSchemas,
+      final boolean initOrInvalidate) {
+    final String previousDatabase = databasePool.putIfAbsent(database, 
database);
+    final String database2Use = Objects.nonNull(previousDatabase) ? 
previousDatabase : database;
+

Review Comment:
   How about computeIfAbsent?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java:
##########
@@ -163,177 +199,250 @@ public List<Integer> 
computeWithoutTemplate(ISchemaComputation schemaComputation
    * @return The indexes of missed views and full paths of their source paths 
will be returned.
    */
   public Pair<List<Integer>, List<String>> computeSourceOfLogicalView(
-      ISchemaComputation schemaComputation) {
+      final ISchemaComputation schemaComputation) {
     if (!schemaComputation.hasLogicalViewNeedProcess()) {
       return new Pair<>(new ArrayList<>(), new ArrayList<>());
     }
-    return timeSeriesSchemaCache.computeSourceOfLogicalView(schemaComputation);
-  }
 
-  public List<Integer> computeWithTemplate(ISchemaComputation 
schemaComputation) {
-    return deviceUsingTemplateSchemaCache.compute(schemaComputation);
+    final List<Integer> indexOfMissingMeasurements = new ArrayList<>();
+    final Pair<Integer, Integer> beginToEnd =
+        schemaComputation.getRangeOfLogicalViewSchemaListRecorded();
+    final List<LogicalViewSchema> logicalViewSchemaList =
+        schemaComputation.getLogicalViewSchemaList();
+    final List<Integer> indexListOfLogicalViewPaths =
+        schemaComputation.getIndexListOfLogicalViewPaths();
+
+    for (int i = beginToEnd.left; i < beginToEnd.right; i++) {
+      final LogicalViewSchema logicalViewSchema = logicalViewSchemaList.get(i);
+      final int realIndex = indexListOfLogicalViewPaths.get(i);
+      if (!logicalViewSchema.isWritable()) {
+        throw new RuntimeException(
+            new InsertNonWritableViewException(
+                schemaComputation
+                    .getDevicePath()
+                    
.concatAsMeasurementPath(schemaComputation.getMeasurements()[realIndex])
+                    .getFullPath()));
+      }
+
+      final PartialPath fullPath = logicalViewSchema.getSourcePathIfWritable();
+      final IDeviceSchema schema = 
deviceSchemaCache.getDeviceSchema(fullPath.getDevicePath());
+      if (!(schema instanceof DeviceNormalSchema)) {
+        indexOfMissingMeasurements.add(i);
+        continue;
+      }
+
+      final DeviceNormalSchema treeSchema = (DeviceNormalSchema) schema;
+      final SchemaCacheEntry value = 
treeSchema.getSchemaCacheEntry(fullPath.getMeasurement());
+      if (Objects.isNull(value)) {
+        indexOfMissingMeasurements.add(i);
+        continue;
+      }
+
+      if (value.isLogicalView()) {
+        throw new RuntimeException(
+            new UnsupportedOperationException(
+                String.format(
+                    "The source of view [%s] is also a view! Nested view is 
unsupported! "
+                        + "Please check it.",
+                    logicalViewSchema.getSourcePathIfWritable())));
+      }
+
+      schemaComputation.computeMeasurementOfView(realIndex, value, 
treeSchema.isAligned());
+    }
+
+    return new Pair<>(
+        indexOfMissingMeasurements,
+        indexOfMissingMeasurements.stream()
+            .map(index -> 
logicalViewSchemaList.get(index).getSourcePathStringIfWritable())
+            .collect(Collectors.toList()));
   }
 
-  /**
-   * Store the fetched schema in either the schemaCache or 
templateSchemaCache, depending on its
-   * associated device.
-   */
-  public void put(ClusterSchemaTree tree) {
-    PartialPath devicePath;
-    for (DeviceSchemaInfo deviceSchemaInfo : tree.getAllDevices()) {
-      devicePath = deviceSchemaInfo.getDevicePath();
-      if (deviceSchemaInfo.getTemplateId() != NON_TEMPLATE) {
-        deviceUsingTemplateSchemaCache.put(
-            devicePath, tree.getBelongedDatabase(devicePath), 
deviceSchemaInfo.getTemplateId());
-      } else {
-        for (MeasurementPath measurementPath : 
deviceSchemaInfo.getMeasurementSchemaPathList()) {
-          timeSeriesSchemaCache.putSingleMeasurementPath(
-              tree.getBelongedDatabase(devicePath), measurementPath);
-        }
+  public List<Integer> computeWithTemplate(final ISchemaComputation 
computation) {
+    final List<Integer> indexOfMissingMeasurements = new ArrayList<>();
+    final String[] measurements = computation.getMeasurements();
+    final IDeviceSchema deviceSchema =
+        deviceSchemaCache.getDeviceSchema(computation.getDevicePath());
+
+    if (!(deviceSchema instanceof DeviceTemplateSchema)) {
+      return IntStream.range(0, 
measurements.length).boxed().collect(Collectors.toList());
+    }
+
+    final DeviceTemplateSchema deviceTemplateSchema = (DeviceTemplateSchema) 
deviceSchema;
+
+    computation.computeDevice(
+        
templateManager.getTemplate(deviceTemplateSchema.getTemplateId()).isDirectAligned());
+    final Map<String, IMeasurementSchema> templateSchema =
+        
templateManager.getTemplate(deviceTemplateSchema.getTemplateId()).getSchemaMap();
+    for (int i = 0; i < measurements.length; i++) {
+      if (!templateSchema.containsKey(measurements[i])) {
+        indexOfMissingMeasurements.add(i);
+        continue;
       }
+      final IMeasurementSchema schema = templateSchema.get(measurements[i]);
+      computation.computeMeasurement(
+          i,
+          new IMeasurementSchemaInfo() {

Review Comment:
   Better to use an inner class or extract as another method.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceLastCache.java:
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.analyze.cache.schema;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.TimeValuePair;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.apache.tsfile.utils.TsPrimitiveType;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+@ThreadSafe
+public class DeviceLastCache {
+  static final int INSTANCE_SIZE =
+      (int) RamUsageEstimator.shallowSizeOfInstance(DeviceLastCache.class)
+          + (int) 
RamUsageEstimator.shallowSizeOfInstance(ConcurrentHashMap.class);
+
+  public static final TsPrimitiveType EMPTY_PRIMITIVE_TYPE =
+      new TsPrimitiveType() {
+        @Override
+        public void setObject(Object o) {
+          // Do nothing
+        }
+
+        @Override
+        public void reset() {
+          // Do nothing
+        }
+
+        @Override
+        public int getSize() {
+          return 0;
+        }
+
+        @Override
+        public Object getValue() {
+          return null;
+        }
+
+        @Override
+        public String getStringValue() {
+          return null;
+        }
+
+        @Override
+        public TSDataType getDataType() {
+          return null;
+        }
+      };
+
+  public static final TimeValuePair EMPTY_TIME_VALUE_PAIR =
+      new TimeValuePair(Long.MIN_VALUE, EMPTY_PRIMITIVE_TYPE);
+  private static final TimeValuePair PLACEHOLDER_TIME_VALUE_PAIR =
+      new TimeValuePair(Long.MIN_VALUE, EMPTY_PRIMITIVE_TYPE);

Review Comment:
   Add some comments to explain the difference between EMPTY and PLACEHOLDER.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceNormalSchema.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.analyze.cache.schema;
+
+import 
org.apache.iotdb.db.queryengine.common.schematree.IMeasurementSchemaInfo;
+
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class DeviceNormalSchema implements IDeviceSchema {
+
+  static final int INSTANCE_SIZE =
+      (int) RamUsageEstimator.shallowSizeOfInstance(DeviceNormalSchema.class)
+          + (int) 
RamUsageEstimator.shallowSizeOfInstance(ConcurrentHashMap.class);
+  private final String database;
+  private final boolean isAligned;
+
+  private final ConcurrentMap<String, SchemaCacheEntry> measurementMap = new 
ConcurrentHashMap<>();
+
+  public DeviceNormalSchema(final String database, final boolean isAligned) {
+    this.database = database;
+    this.isAligned = isAligned;
+  }
+
+  public String getDatabase() {
+    return database;
+  }
+
+  public boolean isAligned() {
+    return isAligned;
+  }
+
+  public SchemaCacheEntry getSchemaCacheEntry(final String measurement) {
+    return measurementMap.get(measurement);
+  }
+
+  public int update(final String[] measurements, final IMeasurementSchema[] 
schemas) {
+    int diff = 0;
+    if (schemas == null) {
+      return diff;
+    }
+
+    final int length = measurements.length;
+
+    for (int i = 0; i < length; ++i) {
+      // Skip this to avoid instance creation/gc for writing performance
+      if (measurements[i] == null || 
measurementMap.containsKey(measurements[i])) {
+        continue;
+      }

Review Comment:
   Why can a measurement be skipped when it is contained?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java:
##########
@@ -163,177 +199,250 @@ public List<Integer> 
computeWithoutTemplate(ISchemaComputation schemaComputation
    * @return The indexes of missed views and full paths of their source paths 
will be returned.
    */
   public Pair<List<Integer>, List<String>> computeSourceOfLogicalView(
-      ISchemaComputation schemaComputation) {
+      final ISchemaComputation schemaComputation) {
     if (!schemaComputation.hasLogicalViewNeedProcess()) {
       return new Pair<>(new ArrayList<>(), new ArrayList<>());
     }
-    return timeSeriesSchemaCache.computeSourceOfLogicalView(schemaComputation);
-  }
 
-  public List<Integer> computeWithTemplate(ISchemaComputation 
schemaComputation) {
-    return deviceUsingTemplateSchemaCache.compute(schemaComputation);
+    final List<Integer> indexOfMissingMeasurements = new ArrayList<>();
+    final Pair<Integer, Integer> beginToEnd =
+        schemaComputation.getRangeOfLogicalViewSchemaListRecorded();
+    final List<LogicalViewSchema> logicalViewSchemaList =
+        schemaComputation.getLogicalViewSchemaList();
+    final List<Integer> indexListOfLogicalViewPaths =
+        schemaComputation.getIndexListOfLogicalViewPaths();
+
+    for (int i = beginToEnd.left; i < beginToEnd.right; i++) {
+      final LogicalViewSchema logicalViewSchema = logicalViewSchemaList.get(i);
+      final int realIndex = indexListOfLogicalViewPaths.get(i);
+      if (!logicalViewSchema.isWritable()) {
+        throw new RuntimeException(
+            new InsertNonWritableViewException(
+                schemaComputation
+                    .getDevicePath()
+                    
.concatAsMeasurementPath(schemaComputation.getMeasurements()[realIndex])
+                    .getFullPath()));
+      }
+
+      final PartialPath fullPath = logicalViewSchema.getSourcePathIfWritable();
+      final IDeviceSchema schema = 
deviceSchemaCache.getDeviceSchema(fullPath.getDevicePath());
+      if (!(schema instanceof DeviceNormalSchema)) {
+        indexOfMissingMeasurements.add(i);
+        continue;
+      }
+
+      final DeviceNormalSchema treeSchema = (DeviceNormalSchema) schema;
+      final SchemaCacheEntry value = 
treeSchema.getSchemaCacheEntry(fullPath.getMeasurement());
+      if (Objects.isNull(value)) {
+        indexOfMissingMeasurements.add(i);
+        continue;
+      }
+
+      if (value.isLogicalView()) {
+        throw new RuntimeException(
+            new UnsupportedOperationException(
+                String.format(
+                    "The source of view [%s] is also a view! Nested view is 
unsupported! "
+                        + "Please check it.",
+                    logicalViewSchema.getSourcePathIfWritable())));
+      }
+
+      schemaComputation.computeMeasurementOfView(realIndex, value, 
treeSchema.isAligned());
+    }
+
+    return new Pair<>(
+        indexOfMissingMeasurements,
+        indexOfMissingMeasurements.stream()
+            .map(index -> 
logicalViewSchemaList.get(index).getSourcePathStringIfWritable())
+            .collect(Collectors.toList()));
   }
 
-  /**
-   * Store the fetched schema in either the schemaCache or 
templateSchemaCache, depending on its
-   * associated device.
-   */
-  public void put(ClusterSchemaTree tree) {
-    PartialPath devicePath;
-    for (DeviceSchemaInfo deviceSchemaInfo : tree.getAllDevices()) {
-      devicePath = deviceSchemaInfo.getDevicePath();
-      if (deviceSchemaInfo.getTemplateId() != NON_TEMPLATE) {
-        deviceUsingTemplateSchemaCache.put(
-            devicePath, tree.getBelongedDatabase(devicePath), 
deviceSchemaInfo.getTemplateId());
-      } else {
-        for (MeasurementPath measurementPath : 
deviceSchemaInfo.getMeasurementSchemaPathList()) {
-          timeSeriesSchemaCache.putSingleMeasurementPath(
-              tree.getBelongedDatabase(devicePath), measurementPath);
-        }
+  public List<Integer> computeWithTemplate(final ISchemaComputation 
computation) {
+    final List<Integer> indexOfMissingMeasurements = new ArrayList<>();
+    final String[] measurements = computation.getMeasurements();
+    final IDeviceSchema deviceSchema =
+        deviceSchemaCache.getDeviceSchema(computation.getDevicePath());
+
+    if (!(deviceSchema instanceof DeviceTemplateSchema)) {
+      return IntStream.range(0, 
measurements.length).boxed().collect(Collectors.toList());
+    }
+
+    final DeviceTemplateSchema deviceTemplateSchema = (DeviceTemplateSchema) 
deviceSchema;
+
+    computation.computeDevice(
+        
templateManager.getTemplate(deviceTemplateSchema.getTemplateId()).isDirectAligned());
+    final Map<String, IMeasurementSchema> templateSchema =
+        
templateManager.getTemplate(deviceTemplateSchema.getTemplateId()).getSchemaMap();
+    for (int i = 0; i < measurements.length; i++) {
+      if (!templateSchema.containsKey(measurements[i])) {
+        indexOfMissingMeasurements.add(i);
+        continue;
       }
+      final IMeasurementSchema schema = templateSchema.get(measurements[i]);
+      computation.computeMeasurement(
+          i,
+          new IMeasurementSchemaInfo() {
+            @Override
+            public String getName() {
+              return schema.getMeasurementId();
+            }
+
+            @Override
+            public IMeasurementSchema getSchema() {
+              if (isLogicalView()) {
+                return new LogicalViewSchema(
+                    schema.getMeasurementId(), ((LogicalViewSchema) 
schema).getExpression());
+              } else {
+                return this.getSchemaAsMeasurementSchema();
+              }
+            }
+
+            @Override
+            public MeasurementSchema getSchemaAsMeasurementSchema() {
+              return new MeasurementSchema(
+                  schema.getMeasurementId(),
+                  schema.getType(),
+                  schema.getEncodingType(),
+                  schema.getCompressor());
+            }
+
+            @Override
+            public LogicalViewSchema getSchemaAsLogicalViewSchema() {
+              throw new RuntimeException(
+                  new UnsupportedOperationException(
+                      "Function getSchemaAsLogicalViewSchema is not supported 
in DeviceUsingTemplateSchemaCache."));
+            }
+
+            @Override
+            public Map<String, String> getTagMap() {
+              return null;
+            }
+
+            @Override
+            public Map<String, String> getAttributeMap() {
+              return null;
+            }
+
+            @Override
+            public String getAlias() {
+              return null;
+            }
+
+            @Override
+            public boolean isLogicalView() {
+              return schema.isLogicalView();
+            }
+          });
     }
+    return indexOfMissingMeasurements;
   }
 
-  public TimeValuePair getLastCache(PartialPath seriesPath) {
-    return timeSeriesSchemaCache.getLastCache(seriesPath);
+  /**
+   * Store the fetched schema in either the {@link DeviceNormalSchema} or 
{@link
+   * DeviceTemplateSchema}, depending on its associated device.
+   */
+  public void put(final ClusterSchemaTree tree) {
+    tree.getAllDevices()
+        .forEach(
+            deviceSchemaInfo ->
+                deviceSchemaCache.putDeviceSchema(
+                    
tree.getBelongedDatabase(deviceSchemaInfo.getDevicePath()), deviceSchemaInfo));
   }
 
-  public boolean getLastCache(final Map<PartialPath, Map<String, 
TimeValuePair>> inputMap) {
-    return timeSeriesSchemaCache.getLastCache(inputMap);
+  public TimeValuePair getLastCache(final PartialPath seriesPath) {
+    return deviceSchemaCache.getLastEntry(seriesPath.getDevicePath(), 
seriesPath.getMeasurement());
   }
 
-  public void invalidateLastCache(PartialPath path) {
+  public void invalidateLastCache(final PartialPath path) {
     if (!CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
       return;
     }
-    takeReadLock();
-    try {
-      timeSeriesSchemaCache.invalidateLastCache(path);
-    } finally {
-      releaseReadLock();
-    }
+    deviceSchemaCache.invalidateLastCache(path.getDevicePath(), 
path.getMeasurement());
   }
 
-  public void invalidateLastCacheInDataRegion(String database) {
+  public void invalidateDatabaseLastCache(final String database) {
     if (!CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
       return;
     }
-    takeReadLock();
-    try {
-      timeSeriesSchemaCache.invalidateDataRegionLastCache(database);
-    } finally {
-      releaseReadLock();
-    }
-  }
-
-  /** get SchemaCacheEntry and update last cache */
-  @TestOnly
-  public void updateLastCache(
-      PartialPath devicePath,
-      String measurement,
-      TimeValuePair timeValuePair,
-      boolean highPriorityUpdate,
-      Long latestFlushedTime) {
-    timeSeriesSchemaCache.updateLastCache(
-        devicePath, measurement, timeValuePair, highPriorityUpdate, 
latestFlushedTime);
-  }
-
-  public void updateLastCache(
-      String database,
-      PartialPath devicePath,
-      String[] measurements,
-      MeasurementSchema[] measurementSchemas,
-      boolean isAligned,
-      IntFunction<TimeValuePair> timeValuePairProvider,
-      IntPredicate shouldUpdateProvider,
-      boolean highPriorityUpdate,
-      Long latestFlushedTime) {
-    takeReadLock();
-    try {
-      timeSeriesSchemaCache.updateLastCache(
-          database,
-          devicePath,
-          measurements,
-          measurementSchemas,
-          isAligned,
-          timeValuePairProvider,
-          shouldUpdateProvider,
-          highPriorityUpdate,
-          latestFlushedTime);
-    } finally {
-      releaseReadLock();
-    }
+    deviceSchemaCache.invalidateLastCache(database);
   }
 
-  public void updateLastCacheWithoutLock(
-      String database,
-      PartialPath devicePath,
-      String[] measurements,
-      MeasurementSchema[] measurementSchemas,
-      boolean isAligned,
-      IntFunction<TimeValuePair> timeValuePairProvider,
-      IntPredicate shouldUpdateProvider,
-      boolean highPriorityUpdate,
-      Long latestFlushedTime) {
-    timeSeriesSchemaCache.updateLastCache(
-        database,
-        devicePath,
-        measurements,
-        measurementSchemas,
-        isAligned,
-        timeValuePairProvider,
-        shouldUpdateProvider,
-        highPriorityUpdate,
-        latestFlushedTime);
+  /**
+   * Update the {@link DeviceLastCache} in writing for tree model. If a 
measurement is with all
+   * {@code null}s or is an id/attribute column, its {@link TimeValuePair}[] 
shall be {@code null}.
+   * For correctness, this will put the {@link DeviceCacheEntry} lazily and 
only update the existing
+   * {@link DeviceLastCache}s of measurements.
+   *
+   * @param database the device's database, WITH "root"
+   * @param deviceID {@link IDeviceID}
+   * @param measurements the fetched measurements
+   * @param timeValuePairs the {@link TimeValuePair}s with indexes 
corresponding to the measurements
+   */
+  public void updateLastCacheIfExists(
+      final String database,
+      final PartialPath deviceID,
+      final String[] measurements,
+      final @Nonnull TimeValuePair[] timeValuePairs,
+      final boolean isAligned,
+      final IMeasurementSchema[] measurementSchemas) {
+    deviceSchemaCache.updateLastCache(
+        database, deviceID, measurements, timeValuePairs, isAligned, 
measurementSchemas, false);
   }
 
   /**
-   * get or create SchemaCacheEntry and update last cache, only support 
non-aligned sensor or
-   * aligned sensor without only one sub sensor
+   * Update the {@link DeviceLastCache} on query in tree model.
+   *
+   * <p>Note: The query shall put the {@link DeviceLastCache} twice:
+   *
+   * <p>- First time set the "isCommit" to {@code false} before the query 
accesses data. It is just
+   * to allow the writing to update the cache, then avoid that the query put a 
stale value to cache
+   * and break the consistency. WARNING: The writing may temporarily put a 
stale value in cache if a
+   * stale value is written, but it won't affect the eventual consistency.
+   *
+   * <p>- Second time put the calculated {@link TimeValuePair}, and use {@link
+   * #updateLastCacheIfExists(String, PartialPath, String[], TimeValuePair[], 
boolean,
+   * IMeasurementSchema[])}. The input {@link TimeValuePair} shall never be or 
contain {@code null},
+   * if the measurement is with all {@code null}s, its {@link TimeValuePair} 
shall be {@link
+   * DeviceLastCache#EMPTY_TIME_VALUE_PAIR}. This method is not supposed to 
update time column.
+   *
+   * <p>If the query has ended abnormally, it shall call this to invalidate 
the entry it has pushed
+   * in the first time, to avoid the stale writing damaging the eventual 
consistency. In this case
+   * and the "isInvalidate" shall be {@code true}.
+   *
+   * @param database the device's database, WITH "root"
+   * @param measurementPath the fetched {@link MeasurementPath}
+   * @param isInvalidate {@code true} if invalidate the first pushed cache, or 
{@code null} for the
+   *     first fetch.

Review Comment:
   How could a boolean be null? And why call it "fetch"?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java:
##########
@@ -295,6 +295,21 @@ public boolean allMeasurementFailed() {
     return true;
   }
 
+  public String[] getRawMeasurements() {
+    String[] measurements = getMeasurements();
+    MeasurementSchema[] measurementSchemas = getMeasurementSchemas();
+    String[] rawMeasurements = new String[measurements.length];
+    for (int i = 0; i < measurements.length; i++) {
+      if (measurementSchemas[i] != null) {
+        // get raw measurement rather than alias
+        rawMeasurements[i] = measurementSchemas[i].getMeasurementId();
+      } else {
+        rawMeasurements[i] = measurements[i];
+      }
+    }
+    return rawMeasurements;
+  }

Review Comment:
   May check if there is any alias first, or even use a field to cache the 
result.
   In most cases, there is no alias, and the copy is unnecessary.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntryManager.java:
##########
@@ -30,14 +30,19 @@
  */
 interface ICacheEntryManager<FK, SK, V, T extends ICacheEntry<SK, V>> {
 
-  T createCacheEntry(SK secondKey, V value, ICacheEntryGroup<FK, SK, V, T> 
cacheEntryGroup);
+  T createCacheEntry(
+      final SK secondKey, final V value, final ICacheEntryGroup<FK, SK, V, T> 
cacheEntryGroup);
 
-  void access(T cacheEntry);
+  void access(final T cacheEntry);
 
-  void put(T cacheEntry);
+  void put(final T cacheEntry);
 
-  void invalid(T cacheEntry);
+  // A cacheEntry is removed iff the caller has called "invalid" and it 
returns "true"
+  // Shall never remove a cacheEntry directly or when the "invalid" returns 
false
+  boolean invalidate(final T cacheEntry);
 
+  // The "evict" is allowed to be synchronized called
+  // Inner implementation guarantees that an entry won't be concurrently 
evicted

Review Comment:
   synchronized -> concurrently



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java:
##########
@@ -163,177 +199,250 @@ public List<Integer> 
computeWithoutTemplate(ISchemaComputation schemaComputation
    * @return The indexes of missed views and full paths of their source paths 
will be returned.
    */
   public Pair<List<Integer>, List<String>> computeSourceOfLogicalView(
-      ISchemaComputation schemaComputation) {
+      final ISchemaComputation schemaComputation) {
     if (!schemaComputation.hasLogicalViewNeedProcess()) {
       return new Pair<>(new ArrayList<>(), new ArrayList<>());
     }
-    return timeSeriesSchemaCache.computeSourceOfLogicalView(schemaComputation);
-  }
 
-  public List<Integer> computeWithTemplate(ISchemaComputation 
schemaComputation) {
-    return deviceUsingTemplateSchemaCache.compute(schemaComputation);
+    final List<Integer> indexOfMissingMeasurements = new ArrayList<>();
+    final Pair<Integer, Integer> beginToEnd =
+        schemaComputation.getRangeOfLogicalViewSchemaListRecorded();
+    final List<LogicalViewSchema> logicalViewSchemaList =
+        schemaComputation.getLogicalViewSchemaList();
+    final List<Integer> indexListOfLogicalViewPaths =
+        schemaComputation.getIndexListOfLogicalViewPaths();
+
+    for (int i = beginToEnd.left; i < beginToEnd.right; i++) {
+      final LogicalViewSchema logicalViewSchema = logicalViewSchemaList.get(i);
+      final int realIndex = indexListOfLogicalViewPaths.get(i);
+      if (!logicalViewSchema.isWritable()) {
+        throw new RuntimeException(
+            new InsertNonWritableViewException(
+                schemaComputation
+                    .getDevicePath()
+                    
.concatAsMeasurementPath(schemaComputation.getMeasurements()[realIndex])
+                    .getFullPath()));
+      }
+
+      final PartialPath fullPath = logicalViewSchema.getSourcePathIfWritable();
+      final IDeviceSchema schema = 
deviceSchemaCache.getDeviceSchema(fullPath.getDevicePath());
+      if (!(schema instanceof DeviceNormalSchema)) {
+        indexOfMissingMeasurements.add(i);
+        continue;
+      }
+
+      final DeviceNormalSchema treeSchema = (DeviceNormalSchema) schema;
+      final SchemaCacheEntry value = 
treeSchema.getSchemaCacheEntry(fullPath.getMeasurement());
+      if (Objects.isNull(value)) {
+        indexOfMissingMeasurements.add(i);
+        continue;
+      }
+
+      if (value.isLogicalView()) {
+        throw new RuntimeException(
+            new UnsupportedOperationException(
+                String.format(
+                    "The source of view [%s] is also a view! Nested view is 
unsupported! "
+                        + "Please check it.",
+                    logicalViewSchema.getSourcePathIfWritable())));
+      }
+
+      schemaComputation.computeMeasurementOfView(realIndex, value, 
treeSchema.isAligned());
+    }
+
+    return new Pair<>(
+        indexOfMissingMeasurements,
+        indexOfMissingMeasurements.stream()
+            .map(index -> 
logicalViewSchemaList.get(index).getSourcePathStringIfWritable())
+            .collect(Collectors.toList()));
   }
 
-  /**
-   * Store the fetched schema in either the schemaCache or 
templateSchemaCache, depending on its
-   * associated device.
-   */
-  public void put(ClusterSchemaTree tree) {
-    PartialPath devicePath;
-    for (DeviceSchemaInfo deviceSchemaInfo : tree.getAllDevices()) {
-      devicePath = deviceSchemaInfo.getDevicePath();
-      if (deviceSchemaInfo.getTemplateId() != NON_TEMPLATE) {
-        deviceUsingTemplateSchemaCache.put(
-            devicePath, tree.getBelongedDatabase(devicePath), 
deviceSchemaInfo.getTemplateId());
-      } else {
-        for (MeasurementPath measurementPath : 
deviceSchemaInfo.getMeasurementSchemaPathList()) {
-          timeSeriesSchemaCache.putSingleMeasurementPath(
-              tree.getBelongedDatabase(devicePath), measurementPath);
-        }
+  public List<Integer> computeWithTemplate(final ISchemaComputation 
computation) {
+    final List<Integer> indexOfMissingMeasurements = new ArrayList<>();
+    final String[] measurements = computation.getMeasurements();
+    final IDeviceSchema deviceSchema =
+        deviceSchemaCache.getDeviceSchema(computation.getDevicePath());
+
+    if (!(deviceSchema instanceof DeviceTemplateSchema)) {
+      return IntStream.range(0, 
measurements.length).boxed().collect(Collectors.toList());
+    }
+
+    final DeviceTemplateSchema deviceTemplateSchema = (DeviceTemplateSchema) 
deviceSchema;
+
+    computation.computeDevice(
+        
templateManager.getTemplate(deviceTemplateSchema.getTemplateId()).isDirectAligned());
+    final Map<String, IMeasurementSchema> templateSchema =
+        
templateManager.getTemplate(deviceTemplateSchema.getTemplateId()).getSchemaMap();
+    for (int i = 0; i < measurements.length; i++) {
+      if (!templateSchema.containsKey(measurements[i])) {
+        indexOfMissingMeasurements.add(i);
+        continue;
       }
+      final IMeasurementSchema schema = templateSchema.get(measurements[i]);
+      computation.computeMeasurement(
+          i,
+          new IMeasurementSchemaInfo() {
+            @Override
+            public String getName() {
+              return schema.getMeasurementId();
+            }
+
+            @Override
+            public IMeasurementSchema getSchema() {
+              if (isLogicalView()) {
+                return new LogicalViewSchema(
+                    schema.getMeasurementId(), ((LogicalViewSchema) 
schema).getExpression());
+              } else {
+                return this.getSchemaAsMeasurementSchema();
+              }
+            }
+
+            @Override
+            public MeasurementSchema getSchemaAsMeasurementSchema() {
+              return new MeasurementSchema(
+                  schema.getMeasurementId(),
+                  schema.getType(),
+                  schema.getEncodingType(),
+                  schema.getCompressor());
+            }
+
+            @Override
+            public LogicalViewSchema getSchemaAsLogicalViewSchema() {
+              throw new RuntimeException(
+                  new UnsupportedOperationException(
+                      "Function getSchemaAsLogicalViewSchema is not supported 
in DeviceUsingTemplateSchemaCache."));
+            }
+
+            @Override
+            public Map<String, String> getTagMap() {
+              return null;
+            }
+
+            @Override
+            public Map<String, String> getAttributeMap() {
+              return null;
+            }
+
+            @Override
+            public String getAlias() {
+              return null;
+            }
+
+            @Override
+            public boolean isLogicalView() {
+              return schema.isLogicalView();
+            }
+          });
     }
+    return indexOfMissingMeasurements;
   }
 
-  public TimeValuePair getLastCache(PartialPath seriesPath) {
-    return timeSeriesSchemaCache.getLastCache(seriesPath);
+  /**
+   * Store the fetched schema in either the {@link DeviceNormalSchema} or 
{@link
+   * DeviceTemplateSchema}, depending on its associated device.
+   */
+  public void put(final ClusterSchemaTree tree) {
+    tree.getAllDevices()
+        .forEach(
+            deviceSchemaInfo ->
+                deviceSchemaCache.putDeviceSchema(
+                    
tree.getBelongedDatabase(deviceSchemaInfo.getDevicePath()), deviceSchemaInfo));
   }
 
-  public boolean getLastCache(final Map<PartialPath, Map<String, 
TimeValuePair>> inputMap) {
-    return timeSeriesSchemaCache.getLastCache(inputMap);
+  public TimeValuePair getLastCache(final PartialPath seriesPath) {
+    return deviceSchemaCache.getLastEntry(seriesPath.getDevicePath(), 
seriesPath.getMeasurement());
   }
 
-  public void invalidateLastCache(PartialPath path) {
+  public void invalidateLastCache(final PartialPath path) {
     if (!CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
       return;
     }
-    takeReadLock();
-    try {
-      timeSeriesSchemaCache.invalidateLastCache(path);
-    } finally {
-      releaseReadLock();
-    }
+    deviceSchemaCache.invalidateLastCache(path.getDevicePath(), 
path.getMeasurement());
   }
 
-  public void invalidateLastCacheInDataRegion(String database) {
+  public void invalidateDatabaseLastCache(final String database) {
     if (!CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
       return;
     }
-    takeReadLock();
-    try {
-      timeSeriesSchemaCache.invalidateDataRegionLastCache(database);
-    } finally {
-      releaseReadLock();
-    }
-  }
-
-  /** get SchemaCacheEntry and update last cache */
-  @TestOnly
-  public void updateLastCache(
-      PartialPath devicePath,
-      String measurement,
-      TimeValuePair timeValuePair,
-      boolean highPriorityUpdate,
-      Long latestFlushedTime) {
-    timeSeriesSchemaCache.updateLastCache(
-        devicePath, measurement, timeValuePair, highPriorityUpdate, 
latestFlushedTime);
-  }
-
-  public void updateLastCache(
-      String database,
-      PartialPath devicePath,
-      String[] measurements,
-      MeasurementSchema[] measurementSchemas,
-      boolean isAligned,
-      IntFunction<TimeValuePair> timeValuePairProvider,
-      IntPredicate shouldUpdateProvider,
-      boolean highPriorityUpdate,
-      Long latestFlushedTime) {
-    takeReadLock();
-    try {
-      timeSeriesSchemaCache.updateLastCache(
-          database,
-          devicePath,
-          measurements,
-          measurementSchemas,
-          isAligned,
-          timeValuePairProvider,
-          shouldUpdateProvider,
-          highPriorityUpdate,
-          latestFlushedTime);
-    } finally {
-      releaseReadLock();
-    }
+    deviceSchemaCache.invalidateLastCache(database);
   }
 
-  public void updateLastCacheWithoutLock(
-      String database,
-      PartialPath devicePath,
-      String[] measurements,
-      MeasurementSchema[] measurementSchemas,
-      boolean isAligned,
-      IntFunction<TimeValuePair> timeValuePairProvider,
-      IntPredicate shouldUpdateProvider,
-      boolean highPriorityUpdate,
-      Long latestFlushedTime) {
-    timeSeriesSchemaCache.updateLastCache(
-        database,
-        devicePath,
-        measurements,
-        measurementSchemas,
-        isAligned,
-        timeValuePairProvider,
-        shouldUpdateProvider,
-        highPriorityUpdate,
-        latestFlushedTime);
+  /**
+   * Update the {@link DeviceLastCache} in writing for tree model. If a 
measurement is with all

Review Comment:
   Remove "for tree model" and search other places. The same for other concepts 
like "id/attribute column" .



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java:
##########
@@ -113,18 +98,27 @@ public void releaseWriteLock() {
    * @param measurements
    * @return timeseries partialPath and its SchemaEntity
    */
-  public ClusterSchemaTree get(PartialPath devicePath, String[] measurements) {
-    return timeSeriesSchemaCache.get(devicePath, measurements);
-  }
-
-  public ClusterSchemaTree get(PartialPath fullPath) {
-    ClusterSchemaTree clusterSchemaTree =
-        deviceUsingTemplateSchemaCache.get(fullPath.getDevicePath());
-    if (clusterSchemaTree == null || clusterSchemaTree.isEmpty()) {
-      return timeSeriesSchemaCache.get(fullPath);
-    } else {
-      return clusterSchemaTree;
+  public ClusterSchemaTree get(final PartialPath devicePath, final String[] 
measurements) {
+    final ClusterSchemaTree tree = new ClusterSchemaTree();
+    final IDeviceSchema schema = deviceSchemaCache.getDeviceSchema(devicePath);
+    if (!(schema instanceof DeviceNormalSchema)) {
+      return tree;
+    }

Review Comment:
   May print a warn log?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceCacheEntry.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.analyze.cache.schema;
+
+import org.apache.iotdb.db.queryengine.common.schematree.DeviceSchemaInfo;
+
+import org.apache.tsfile.read.TimeValuePair;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.iotdb.commons.schema.SchemaConstant.NON_TEMPLATE;
+
+@ThreadSafe
+public class DeviceCacheEntry {
+
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(DeviceCacheEntry.class)
+          + 2 * RamUsageEstimator.shallowSizeOfInstance(AtomicReference.class);
+
+  // the cached attributeMap may not be the latest, but there won't be any 
correctness problems
+  // because when missing getting the key-value from this attributeMap, caller 
will try to get or
+  // create from remote
+  // there may exist key is not null, but value is null in this map, which 
means that the key's
+  // corresponding value is null, doesn't mean that the key doesn't exist
+  private final AtomicReference<IDeviceSchema> deviceSchema = new 
AtomicReference<>();
+  private final AtomicReference<DeviceLastCache> lastCache = new 
AtomicReference<>();
+
+  int setDeviceSchema(final String database, final DeviceSchemaInfo 
deviceSchemaInfo) {
+    // Safe here because tree schema is invalidated by the whole entry
+    if (deviceSchemaInfo.getTemplateId() == NON_TEMPLATE) {
+      final int result =
+          (deviceSchema.compareAndSet(
+                  null, new DeviceNormalSchema(database, 
deviceSchemaInfo.isAligned()))
+              ? DeviceNormalSchema.INSTANCE_SIZE
+              : 0);
+      return deviceSchema.get() instanceof DeviceNormalSchema
+          ? result
+              + ((DeviceNormalSchema) deviceSchema.get())
+                  .update(deviceSchemaInfo.getMeasurementSchemaInfoList())
+          : 0;
+    } else {
+      return deviceSchema.compareAndSet(
+              null, new DeviceTemplateSchema(database, 
deviceSchemaInfo.getTemplateId()))
+          ? DeviceTemplateSchema.INSTANCE_SIZE
+          : 0;
+    }
+  }
+
+  int setMeasurementSchema(
+      final String database,
+      final boolean isAligned,
+      final String[] measurements,
+      final IMeasurementSchema[] schemas) {
+    if (schemas == null) {
+      return 0;
+    }
+    // Safe here because schema is invalidated by the whole entry
+    final int result =
+        (deviceSchema.compareAndSet(null, new DeviceNormalSchema(database, 
isAligned))
+            ? DeviceNormalSchema.INSTANCE_SIZE
+            : 0);
+    return deviceSchema.get() instanceof DeviceNormalSchema
+        ? result + ((DeviceNormalSchema) 
deviceSchema.get()).update(measurements, schemas)
+        : 0;
+  }
+
+  IDeviceSchema getDeviceSchema() {
+    return deviceSchema.get();
+  }
+
+  int invalidateSchema() {
+    final AtomicInteger size = new AtomicInteger(0);
+    deviceSchema.updateAndGet(
+        schema -> {
+          size.set(schema.estimateSize());
+          return schema;
+        });
+    return size.get();
+  }
+
+  /////////////////////////////// Last Cache ///////////////////////////////
+
+  int initOrInvalidateLastCache(final String[] measurements, final boolean 
isInvalidate) {
+    int result =
+        lastCache.compareAndSet(null, new DeviceLastCache()) ? 
DeviceLastCache.INSTANCE_SIZE : 0;
+    final DeviceLastCache cache = lastCache.get();
+    result += Objects.nonNull(cache) ? cache.initOrInvalidate(measurements, 
isInvalidate) : 0;
+    return Objects.nonNull(lastCache.get()) ? result : 0;
+  }
+
+  int tryUpdateLastCache(
+      final String[] measurements, final TimeValuePair[] timeValuePairs, 
boolean invalidateNull) {
+    final DeviceLastCache cache = lastCache.get();
+    final int result =
+        Objects.nonNull(cache) ? cache.tryUpdate(measurements, timeValuePairs, 
invalidateNull) : 0;
+    return Objects.nonNull(lastCache.get()) ? result : 0;
+  }
+
+  int tryUpdateLastCache(final String[] measurements, final TimeValuePair[] 
timeValuePairs) {
+    return tryUpdateLastCache(measurements, timeValuePairs, false);
+  }
+
+  int invalidateLastCache(final String measurement) {
+    final DeviceLastCache cache = lastCache.get();
+    final int result = Objects.nonNull(cache) ? cache.invalidate(measurement) 
: 0;
+    return Objects.nonNull(lastCache.get()) ? result : 0;
+  }
+
+  TimeValuePair getTimeValuePair(final String measurement) {
+    final DeviceLastCache cache = lastCache.get();
+    return Objects.nonNull(cache) ? cache.getTimeValuePair(measurement) : null;
+  }
+
+  boolean updateInputMap(final @Nonnull Map<String, TimeValuePair> updateMap) {
+    for (final String measurement : updateMap.keySet()) {
+      final TimeValuePair result = getTimeValuePair(measurement);
+      if (result == null) {
+        return false;
+      }
+      updateMap.put(measurement, result);
+    }
+    return true;
+  }

Review Comment:
   Why return false immediately if any measurement is not found?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntryManager.java:
##########
@@ -30,14 +30,19 @@
  */
 interface ICacheEntryManager<FK, SK, V, T extends ICacheEntry<SK, V>> {
 
-  T createCacheEntry(SK secondKey, V value, ICacheEntryGroup<FK, SK, V, T> 
cacheEntryGroup);
+  T createCacheEntry(
+      final SK secondKey, final V value, final ICacheEntryGroup<FK, SK, V, T> 
cacheEntryGroup);
 
-  void access(T cacheEntry);
+  void access(final T cacheEntry);
 
-  void put(T cacheEntry);
+  void put(final T cacheEntry);
 
-  void invalid(T cacheEntry);
+  // A cacheEntry is removed iff the caller has called "invalid" and it 
returns "true"

Review Comment:
   invalid -> invalidate



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java:
##########
@@ -163,177 +199,250 @@ public List<Integer> 
computeWithoutTemplate(ISchemaComputation schemaComputation
    * @return The indexes of missed views and full paths of their source paths 
will be returned.
    */
   public Pair<List<Integer>, List<String>> computeSourceOfLogicalView(
-      ISchemaComputation schemaComputation) {
+      final ISchemaComputation schemaComputation) {
     if (!schemaComputation.hasLogicalViewNeedProcess()) {
       return new Pair<>(new ArrayList<>(), new ArrayList<>());
     }
-    return timeSeriesSchemaCache.computeSourceOfLogicalView(schemaComputation);
-  }
 
-  public List<Integer> computeWithTemplate(ISchemaComputation 
schemaComputation) {
-    return deviceUsingTemplateSchemaCache.compute(schemaComputation);
+    final List<Integer> indexOfMissingMeasurements = new ArrayList<>();
+    final Pair<Integer, Integer> beginToEnd =
+        schemaComputation.getRangeOfLogicalViewSchemaListRecorded();
+    final List<LogicalViewSchema> logicalViewSchemaList =
+        schemaComputation.getLogicalViewSchemaList();
+    final List<Integer> indexListOfLogicalViewPaths =
+        schemaComputation.getIndexListOfLogicalViewPaths();
+
+    for (int i = beginToEnd.left; i < beginToEnd.right; i++) {
+      final LogicalViewSchema logicalViewSchema = logicalViewSchemaList.get(i);
+      final int realIndex = indexListOfLogicalViewPaths.get(i);
+      if (!logicalViewSchema.isWritable()) {
+        throw new RuntimeException(
+            new InsertNonWritableViewException(
+                schemaComputation
+                    .getDevicePath()
+                    
.concatAsMeasurementPath(schemaComputation.getMeasurements()[realIndex])
+                    .getFullPath()));
+      }
+
+      final PartialPath fullPath = logicalViewSchema.getSourcePathIfWritable();
+      final IDeviceSchema schema = 
deviceSchemaCache.getDeviceSchema(fullPath.getDevicePath());
+      if (!(schema instanceof DeviceNormalSchema)) {
+        indexOfMissingMeasurements.add(i);
+        continue;
+      }
+
+      final DeviceNormalSchema treeSchema = (DeviceNormalSchema) schema;
+      final SchemaCacheEntry value = 
treeSchema.getSchemaCacheEntry(fullPath.getMeasurement());
+      if (Objects.isNull(value)) {
+        indexOfMissingMeasurements.add(i);
+        continue;
+      }
+
+      if (value.isLogicalView()) {
+        throw new RuntimeException(
+            new UnsupportedOperationException(
+                String.format(
+                    "The source of view [%s] is also a view! Nested view is 
unsupported! "
+                        + "Please check it.",
+                    logicalViewSchema.getSourcePathIfWritable())));
+      }
+
+      schemaComputation.computeMeasurementOfView(realIndex, value, 
treeSchema.isAligned());
+    }
+
+    return new Pair<>(
+        indexOfMissingMeasurements,
+        indexOfMissingMeasurements.stream()
+            .map(index -> 
logicalViewSchemaList.get(index).getSourcePathStringIfWritable())
+            .collect(Collectors.toList()));
   }
 
-  /**
-   * Store the fetched schema in either the schemaCache or 
templateSchemaCache, depending on its
-   * associated device.
-   */
-  public void put(ClusterSchemaTree tree) {
-    PartialPath devicePath;
-    for (DeviceSchemaInfo deviceSchemaInfo : tree.getAllDevices()) {
-      devicePath = deviceSchemaInfo.getDevicePath();
-      if (deviceSchemaInfo.getTemplateId() != NON_TEMPLATE) {
-        deviceUsingTemplateSchemaCache.put(
-            devicePath, tree.getBelongedDatabase(devicePath), 
deviceSchemaInfo.getTemplateId());
-      } else {
-        for (MeasurementPath measurementPath : 
deviceSchemaInfo.getMeasurementSchemaPathList()) {
-          timeSeriesSchemaCache.putSingleMeasurementPath(
-              tree.getBelongedDatabase(devicePath), measurementPath);
-        }
+  public List<Integer> computeWithTemplate(final ISchemaComputation 
computation) {
+    final List<Integer> indexOfMissingMeasurements = new ArrayList<>();
+    final String[] measurements = computation.getMeasurements();
+    final IDeviceSchema deviceSchema =
+        deviceSchemaCache.getDeviceSchema(computation.getDevicePath());
+
+    if (!(deviceSchema instanceof DeviceTemplateSchema)) {
+      return IntStream.range(0, 
measurements.length).boxed().collect(Collectors.toList());
+    }
+
+    final DeviceTemplateSchema deviceTemplateSchema = (DeviceTemplateSchema) 
deviceSchema;
+
+    computation.computeDevice(
+        
templateManager.getTemplate(deviceTemplateSchema.getTemplateId()).isDirectAligned());
+    final Map<String, IMeasurementSchema> templateSchema =
+        
templateManager.getTemplate(deviceTemplateSchema.getTemplateId()).getSchemaMap();
+    for (int i = 0; i < measurements.length; i++) {
+      if (!templateSchema.containsKey(measurements[i])) {
+        indexOfMissingMeasurements.add(i);
+        continue;
       }
+      final IMeasurementSchema schema = templateSchema.get(measurements[i]);
+      computation.computeMeasurement(
+          i,
+          new IMeasurementSchemaInfo() {
+            @Override
+            public String getName() {
+              return schema.getMeasurementId();
+            }
+
+            @Override
+            public IMeasurementSchema getSchema() {
+              if (isLogicalView()) {
+                return new LogicalViewSchema(
+                    schema.getMeasurementId(), ((LogicalViewSchema) 
schema).getExpression());
+              } else {
+                return this.getSchemaAsMeasurementSchema();
+              }
+            }
+
+            @Override
+            public MeasurementSchema getSchemaAsMeasurementSchema() {
+              return new MeasurementSchema(
+                  schema.getMeasurementId(),
+                  schema.getType(),
+                  schema.getEncodingType(),
+                  schema.getCompressor());
+            }
+
+            @Override
+            public LogicalViewSchema getSchemaAsLogicalViewSchema() {
+              throw new RuntimeException(
+                  new UnsupportedOperationException(
+                      "Function getSchemaAsLogicalViewSchema is not supported 
in DeviceUsingTemplateSchemaCache."));
+            }
+
+            @Override
+            public Map<String, String> getTagMap() {
+              return null;
+            }
+
+            @Override
+            public Map<String, String> getAttributeMap() {
+              return null;
+            }
+
+            @Override
+            public String getAlias() {
+              return null;
+            }
+
+            @Override
+            public boolean isLogicalView() {
+              return schema.isLogicalView();
+            }
+          });
     }
+    return indexOfMissingMeasurements;
   }
 
-  public TimeValuePair getLastCache(PartialPath seriesPath) {
-    return timeSeriesSchemaCache.getLastCache(seriesPath);
+  /**
+   * Store the fetched schema in either the {@link DeviceNormalSchema} or 
{@link
+   * DeviceTemplateSchema}, depending on its associated device.
+   */
+  public void put(final ClusterSchemaTree tree) {
+    tree.getAllDevices()
+        .forEach(
+            deviceSchemaInfo ->
+                deviceSchemaCache.putDeviceSchema(
+                    
tree.getBelongedDatabase(deviceSchemaInfo.getDevicePath()), deviceSchemaInfo));
   }
 
-  public boolean getLastCache(final Map<PartialPath, Map<String, 
TimeValuePair>> inputMap) {
-    return timeSeriesSchemaCache.getLastCache(inputMap);
+  public TimeValuePair getLastCache(final PartialPath seriesPath) {
+    return deviceSchemaCache.getLastEntry(seriesPath.getDevicePath(), 
seriesPath.getMeasurement());
   }
 
-  public void invalidateLastCache(PartialPath path) {
+  public void invalidateLastCache(final PartialPath path) {
     if (!CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
       return;
     }
-    takeReadLock();
-    try {
-      timeSeriesSchemaCache.invalidateLastCache(path);
-    } finally {
-      releaseReadLock();
-    }
+    deviceSchemaCache.invalidateLastCache(path.getDevicePath(), 
path.getMeasurement());
   }
 
-  public void invalidateLastCacheInDataRegion(String database) {
+  public void invalidateDatabaseLastCache(final String database) {
     if (!CommonDescriptor.getInstance().getConfig().isLastCacheEnable()) {
       return;
     }
-    takeReadLock();
-    try {
-      timeSeriesSchemaCache.invalidateDataRegionLastCache(database);
-    } finally {
-      releaseReadLock();
-    }
-  }
-
-  /** get SchemaCacheEntry and update last cache */
-  @TestOnly
-  public void updateLastCache(
-      PartialPath devicePath,
-      String measurement,
-      TimeValuePair timeValuePair,
-      boolean highPriorityUpdate,
-      Long latestFlushedTime) {
-    timeSeriesSchemaCache.updateLastCache(
-        devicePath, measurement, timeValuePair, highPriorityUpdate, 
latestFlushedTime);
-  }
-
-  public void updateLastCache(
-      String database,
-      PartialPath devicePath,
-      String[] measurements,
-      MeasurementSchema[] measurementSchemas,
-      boolean isAligned,
-      IntFunction<TimeValuePair> timeValuePairProvider,
-      IntPredicate shouldUpdateProvider,
-      boolean highPriorityUpdate,
-      Long latestFlushedTime) {
-    takeReadLock();
-    try {
-      timeSeriesSchemaCache.updateLastCache(
-          database,
-          devicePath,
-          measurements,
-          measurementSchemas,
-          isAligned,
-          timeValuePairProvider,
-          shouldUpdateProvider,
-          highPriorityUpdate,
-          latestFlushedTime);
-    } finally {
-      releaseReadLock();
-    }
+    deviceSchemaCache.invalidateLastCache(database);
   }
 
-  public void updateLastCacheWithoutLock(
-      String database,
-      PartialPath devicePath,
-      String[] measurements,
-      MeasurementSchema[] measurementSchemas,
-      boolean isAligned,
-      IntFunction<TimeValuePair> timeValuePairProvider,
-      IntPredicate shouldUpdateProvider,
-      boolean highPriorityUpdate,
-      Long latestFlushedTime) {
-    timeSeriesSchemaCache.updateLastCache(
-        database,
-        devicePath,
-        measurements,
-        measurementSchemas,
-        isAligned,
-        timeValuePairProvider,
-        shouldUpdateProvider,
-        highPriorityUpdate,
-        latestFlushedTime);
+  /**
+   * Update the {@link DeviceLastCache} in writing for tree model. If a 
measurement is with all
+   * {@code null}s or is an id/attribute column, its {@link TimeValuePair}[] 
shall be {@code null}.
+   * For correctness, this will put the {@link DeviceCacheEntry} lazily and 
only update the existing
+   * {@link DeviceLastCache}s of measurements.
+   *
+   * @param database the device's database, WITH "root"
+   * @param deviceID {@link IDeviceID}
+   * @param measurements the fetched measurements
+   * @param timeValuePairs the {@link TimeValuePair}s with indexes 
corresponding to the measurements
+   */
+  public void updateLastCacheIfExists(
+      final String database,
+      final PartialPath deviceID,
+      final String[] measurements,
+      final @Nonnull TimeValuePair[] timeValuePairs,
+      final boolean isAligned,
+      final IMeasurementSchema[] measurementSchemas) {
+    deviceSchemaCache.updateLastCache(
+        database, deviceID, measurements, timeValuePairs, isAligned, 
measurementSchemas, false);
   }
 
   /**
-   * get or create SchemaCacheEntry and update last cache, only support 
non-aligned sensor or
-   * aligned sensor without only one sub sensor
+   * Update the {@link DeviceLastCache} on query in tree model.
+   *
+   * <p>Note: The query shall put the {@link DeviceLastCache} twice:
+   *
+   * <p>- First time set the "isCommit" to {@code false} before the query 
accesses data. It is just
+   * to allow the writing to update the cache, then avoid that the query put a 
stale value to cache
+   * and break the consistency. WARNING: The writing may temporarily put a 
stale value in cache if a
+   * stale value is written, but it won't affect the eventual consistency.
+   *
+   * <p>- Second time put the calculated {@link TimeValuePair}, and use {@link
+   * #updateLastCacheIfExists(String, PartialPath, String[], TimeValuePair[], 
boolean,
+   * IMeasurementSchema[])}. The input {@link TimeValuePair} shall never be or 
contain {@code null},
+   * if the measurement is with all {@code null}s, its {@link TimeValuePair} 
shall be {@link
+   * DeviceLastCache#EMPTY_TIME_VALUE_PAIR}. This method is not supposed to 
update time column.
+   *
+   * <p>If the query has ended abnormally, it shall call this to invalidate 
the entry it has pushed
+   * in the first time, to avoid the stale writing damaging the eventual 
consistency. In this case
+   * and the "isInvalidate" shall be {@code true}.

Review Comment:
   Suggestion: breaking the first paragraph and the third paragraph into two 
methods, like:
   preUpdateLastCacheForQuery
   postUpdateLastCacheForQuery
   
   which shall be much clearer.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DeviceSchemaCache.java:
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.analyze.cache.schema;
+
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.path.PathPatternUtil;
+import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.queryengine.common.schematree.DeviceSchemaInfo;
+import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.IDualKeyCache;
+import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.impl.DualKeyCacheBuilder;
+import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.dualkeycache.impl.DualKeyCachePolicy;
+
+import org.apache.tsfile.common.constant.TsFileConstant;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.read.TimeValuePair;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+import java.util.function.ToIntFunction;
+
+/**
+ * The {@link DeviceSchemaCache} caches some of the devices and their: 
measurement info / template
+ * info. The last value of one device is also cached here.
+ */
+@ThreadSafe
+public class DeviceSchemaCache {
+
+  private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
+  private static final Logger logger = 
LoggerFactory.getLogger(DeviceSchemaCache.class);
+
+  /**
+   * Leading_Segment, {@link IDeviceID}, Map{@literal <}Measurement, 
Schema{@literal
+   * >}/templateInfo{@literal >}
+   *
+   * <p>The segment is used to:
+   *
+   * <p>1. Keep abreast of the newer versions.
+   *
+   * <p>2. Optimize the speed in invalidation by databases for most scenarios.
+   */
+  private final IDualKeyCache<String, PartialPath, DeviceCacheEntry> 
dualKeyCache;
+
+  private final Map<String, String> databasePool = new ConcurrentHashMap<>();
+
+  private final ReentrantReadWriteLock readWriteLock = new 
ReentrantReadWriteLock(false);

Review Comment:
   Anywhere its readLock used?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/FIFOCacheEntryManager.java:
##########
@@ -171,28 +186,40 @@ public int hashCode() {
   private static class FIFOLinkedList<SK, V> {
 
     // head.next is the newest
-    private final FIFOCacheEntry head;
-    private final FIFOCacheEntry tail;
+    private final FIFOCacheEntry<SK, V> head;
+    private final FIFOCacheEntry<SK, V> tail;
 
     public FIFOLinkedList() {
-      head = new FIFOCacheEntry(null, null, null);
-      tail = new FIFOCacheEntry(null, null, null);
+      head = new FIFOCacheEntry<>(null, null, null);
+      tail = new FIFOCacheEntry<>(null, null, null);
       head.next = tail;
       tail.pre = head;
     }
 
-    synchronized void add(FIFOCacheEntry cacheEntry) {
-      cacheEntry.next = head.next;
+    synchronized void add(final FIFOCacheEntry<SK, V> cacheEntry) {
+      FIFOCacheEntry<SK, V> nextEntry;
+
+      do {
+        nextEntry = head.next;
+      } while (nextEntry.isInvalidated.get());

Review Comment:
   `head` and `nextEntry` do not seem to be changed?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/FIFOCacheEntryManager.java:
##########
@@ -171,28 +186,40 @@ public int hashCode() {
   private static class FIFOLinkedList<SK, V> {
 
     // head.next is the newest
-    private final FIFOCacheEntry head;
-    private final FIFOCacheEntry tail;
+    private final FIFOCacheEntry<SK, V> head;
+    private final FIFOCacheEntry<SK, V> tail;
 
     public FIFOLinkedList() {
-      head = new FIFOCacheEntry(null, null, null);
-      tail = new FIFOCacheEntry(null, null, null);
+      head = new FIFOCacheEntry<>(null, null, null);
+      tail = new FIFOCacheEntry<>(null, null, null);
       head.next = tail;
       tail.pre = head;
     }
 
-    synchronized void add(FIFOCacheEntry cacheEntry) {
-      cacheEntry.next = head.next;
+    synchronized void add(final FIFOCacheEntry<SK, V> cacheEntry) {
+      FIFOCacheEntry<SK, V> nextEntry;
+
+      do {
+        nextEntry = head.next;
+      } while (nextEntry.isInvalidated.get());
+
+      cacheEntry.next = nextEntry;
       cacheEntry.pre = head;
-      head.next.pre = cacheEntry;
+      nextEntry.pre = cacheEntry;
       head.next = cacheEntry;
     }
 
-    synchronized FIFOCacheEntry evict() {
-      if (tail.pre == head) {
-        return null;
-      }
-      FIFOCacheEntry cacheEntry = tail.pre;
+    synchronized FIFOCacheEntry<SK, V> evict() {
+      FIFOCacheEntry<SK, V> cacheEntry;
+
+      do {
+        cacheEntry = tail.pre;
+        if (cacheEntry == head) {
+          return null;
+        }
+
+      } while (cacheEntry.isInvalidated.compareAndSet(false, true));

Review Comment:
   Also check this



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java:
##########
@@ -169,14 +169,19 @@ public Response executeFastLastQueryStatement(
       List<Object> timeseries = new ArrayList<>();
       List<Object> valueList = new ArrayList<>();
       List<Object> dataTypeList = new ArrayList<>();
-      for (Map.Entry<PartialPath, Map<String, TimeValuePair>> entry : 
resultMap.entrySet()) {
-        final String deviceWithPrefix = entry.getKey() + 
TsFileConstant.PATH_SEPARATOR;
-        for (Map.Entry<String, TimeValuePair> measurementEntry : 
entry.getValue().entrySet()) {
-          final TimeValuePair tvPair = measurementEntry.getValue();
-          valueList.add(tvPair.getValue().getStringValue());
-          dataTypeList.add(tvPair.getValue().getDataType().name());
-          targetDataSet.addTimestampsItem(tvPair.getTimestamp());
-          timeseries.add(deviceWithPrefix + measurementEntry.getKey());
+      for (Map.Entry<String, Map<PartialPath, Map<String, TimeValuePair>>> 
entry :
+          resultMap.entrySet()) {
+        for (final Map.Entry<PartialPath, Map<String, TimeValuePair>> 
device2MeasurementLastEntry :
+            entry.getValue().entrySet()) {
+          final String deviceWithPrefix = entry.getKey() + 
TsFileConstant.PATH_SEPARATOR;

Review Comment:
   entry or device2MeasurementLastEntry?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java:
##########
@@ -92,317 +90,258 @@ public <R> boolean batchGet(
         if (cacheEntry == null) {
           return false;
         }
-        skrEntry.setValue(mappingFunction.apply(cacheEntry.getValue()));
+        if (!mappingFunction.apply(cacheEntry.getValue(), 
skrEntry.getValue())) {
+          return false;
+        }
       }
     }
     return true;
   }
 
   @Override
-  public void compute(IDualKeyCacheComputation<FK, SK, V> computation) {
-    FK firstKey = computation.getFirstKey();
+  public void update(
+      final FK firstKey,
+      final @Nonnull SK secondKey,
+      final V value,
+      final ToIntFunction<V> updater,
+      final boolean createIfNotExists) {
+
     ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup = firstKeyMap.get(firstKey);
-    SK[] secondKeyList = computation.getSecondKeyList();
-    if (cacheEntryGroup == null) {
-      for (int i = 0; i < secondKeyList.length; i++) {
-        computation.computeValue(i, null);
-      }
-      cacheStats.recordMiss(secondKeyList.length);
-    } else {
-      T cacheEntry;
-      int hitCount = 0;
-      for (int i = 0; i < secondKeyList.length; i++) {
-        cacheEntry = cacheEntryGroup.getCacheEntry(secondKeyList[i]);
-        if (cacheEntry == null) {
-          computation.computeValue(i, null);
-        } else {
-          computation.computeValue(i, cacheEntry.getValue());
-          cacheEntryManager.access(cacheEntry);
-          hitCount++;
-        }
+    if (Objects.isNull(cacheEntryGroup)) {
+      if (createIfNotExists) {
+        cacheEntryGroup = new CacheEntryGroupImpl<>(firstKey, sizeComputer);
+        firstKeyMap.put(firstKey, cacheEntryGroup);
+      } else {
+        return;
       }
-      cacheStats.recordHit(hitCount);
-      cacheStats.recordMiss(secondKeyList.length - hitCount);
     }
-  }
 
-  @Override
-  public void update(IDualKeyCacheUpdating<FK, SK, V> updating) {
-    FK firstKey = updating.getFirstKey();
-    ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup = firstKeyMap.get(firstKey);
-    SK[] secondKeyList = updating.getSecondKeyList();
-    if (cacheEntryGroup == null) {
-      for (int i = 0; i < secondKeyList.length; i++) {
-        updating.updateValue(i, null);
-      }
-      cacheStats.recordMiss(secondKeyList.length);
-    } else {
-      T cacheEntry;
-      int hitCount = 0;
-      for (int i = 0; i < secondKeyList.length; i++) {
-        cacheEntry = cacheEntryGroup.getCacheEntry(secondKeyList[i]);
-        if (cacheEntry == null) {
-          updating.updateValue(i, null);
-        } else {
-          int changeSize = 0;
-          synchronized (cacheEntry) {
-            if (cacheEntry.getBelongedGroup() != null) {
-              // Only update the value when the cache entry is not evicted.
-              // If the cache entry is evicted, getBelongedGroup is null.
-              // Synchronized is to guarantee the cache entry is not evicted 
during the update.
-              changeSize = updating.updateValue(i, cacheEntry.getValue());
-              cacheEntryManager.access(cacheEntry);
-              if (changeSize != 0) {
-                cacheStats.increaseMemoryUsage(changeSize);
+    final ICacheEntryGroup<FK, SK, V, T> finalCacheEntryGroup = 
cacheEntryGroup;
+    cacheEntryGroup.computeCacheEntry(
+        secondKey,
+        memory ->
+            (sk, cacheEntry) -> {
+              if (Objects.isNull(cacheEntry)) {
+                if (!createIfNotExists) {
+                  return null;
+                }
+                cacheEntry =
+                    cacheEntryManager.createCacheEntry(secondKey, value, 
finalCacheEntryGroup);
+                cacheEntryManager.put(cacheEntry);
+                memory.getAndAdd(
+                    sizeComputer.computeSecondKeySize(sk)
+                        + sizeComputer.computeValueSize(cacheEntry.getValue())
+                        + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY);
               }
-            }
-          }
-          if (changeSize != 0 && cacheStats.isExceedMemoryCapacity()) {
-            executeCacheEviction(changeSize);
-          }
-          hitCount++;
-        }
-      }
-      cacheStats.recordHit(hitCount);
-      cacheStats.recordMiss(secondKeyList.length - hitCount);
-    }
+              memory.getAndAdd(updater.applyAsInt(cacheEntry.getValue()));
+              return cacheEntry;
+            });
+
+    mayEvict();
   }
 
   @Override
-  public void put(FK firstKey, SK secondKey, V value) {
-    int usedMemorySize = putToCache(firstKey, secondKey, value);
-    cacheStats.increaseMemoryUsage(usedMemorySize);
-    if (cacheStats.isExceedMemoryCapacity()) {
-      executeCacheEviction(usedMemorySize);
+  public void update(
+      final FK firstKey, final Predicate<SK> secondKeyChecker, final 
ToIntFunction<V> updater) {
+    final ICacheEntryGroup<FK, SK, V, T> entryGroup = 
firstKeyMap.get(firstKey);
+    if (Objects.nonNull(entryGroup)) {
+      entryGroup
+          .getAllCacheEntries()
+          .forEachRemaining(
+              entry -> {
+                if (!secondKeyChecker.test(entry.getKey())) {
+                  return;
+                }
+                entryGroup.computeCacheEntry(
+                    entry.getKey(),
+                    memory ->
+                        (secondKey, cacheEntry) -> {
+                          if (Objects.nonNull(cacheEntry)) {
+                            
memory.getAndAdd(updater.applyAsInt(cacheEntry.getValue()));
+                          }
+                          return cacheEntry;
+                        });
+              });
     }
+    mayEvict();
   }
 
-  private int putToCache(FK firstKey, SK secondKey, V value) {
-    AtomicInteger usedMemorySize = new AtomicInteger(0);
-    firstKeyMap.compute(
-        firstKey,
-        (k, cacheEntryGroup) -> {
-          if (cacheEntryGroup == null) {
-            cacheEntryGroup = new CacheEntryGroupImpl<>(firstKey);
-            
usedMemorySize.getAndAdd(sizeComputer.computeFirstKeySize(firstKey));
-          }
-          ICacheEntryGroup<FK, SK, V, T> finalCacheEntryGroup = 
cacheEntryGroup;
-          cacheEntryGroup.computeCacheEntry(
-              secondKey,
-              (sk, cacheEntry) -> {
-                if (cacheEntry == null) {
-                  cacheEntry =
-                      cacheEntryManager.createCacheEntry(secondKey, value, 
finalCacheEntryGroup);
-                  cacheEntryManager.put(cacheEntry);
-                  
usedMemorySize.getAndAdd(sizeComputer.computeSecondKeySize(sk));
-                } else {
-                  V existingValue = cacheEntry.getValue();
-                  if (existingValue != value && !existingValue.equals(value)) {
-                    cacheEntry.replaceValue(value);
-                    
usedMemorySize.getAndAdd(-sizeComputer.computeValueSize(existingValue));
+  @Override
+  public void update(
+      final Predicate<FK> firstKeyChecker,
+      final Predicate<SK> secondKeyChecker,
+      final ToIntFunction<V> updater) {
+    for (final FK firstKey : firstKeyMap.getAllKeys()) {
+      if (!firstKeyChecker.test(firstKey)) {
+        continue;
+      }
+      final ICacheEntryGroup<FK, SK, V, T> entryGroup = 
firstKeyMap.get(firstKey);
+      if (Objects.nonNull(entryGroup)) {
+        entryGroup
+            .getAllCacheEntries()
+            .forEachRemaining(
+                entry -> {
+                  if (!secondKeyChecker.test(entry.getKey())) {
+                    return;
                   }
-                  // update the cache status
-                  cacheEntryManager.access(cacheEntry);
-                }
-                usedMemorySize.getAndAdd(sizeComputer.computeValueSize(value));
-                return cacheEntry;
-              });
-          return cacheEntryGroup;
-        });
-    return usedMemorySize.get();
+                  entryGroup.computeCacheEntry(
+                      entry.getKey(),
+                      memory ->
+                          (secondKey, cacheEntry) -> {
+                            
memory.getAndAdd(updater.applyAsInt(cacheEntry.getValue()));
+                            return cacheEntry;
+                          });
+                });
+      }
+      mayEvict();
+    }
   }
 
-  /**
-   * Each thread putting new cache value only needs to evict cache values, 
total memory equals that
-   * the new cache value occupied.
-   */
-  private void executeCacheEviction(int targetSize) {
-    int evictedSize;
-    while (targetSize > 0 && cacheStats.memoryUsage() > 0) {
-      evictedSize = evictOneCacheEntry();
-      cacheStats.decreaseMemoryUsage(evictedSize);
-      targetSize -= evictedSize;
+  private void mayEvict() {
+    long exceedMemory;
+    while ((exceedMemory = cacheStats.getExceedMemory()) > 0) {
+      // Not compute each time to save time when FK is too many
+      // The hard-coded size is 100
+      do {
+        exceedMemory -= evictOneCacheEntry();
+      } while (exceedMemory > 0 && firstKeyMap.size() > 100);
     }
   }
 
-  private int evictOneCacheEntry() {
-
-    ICacheEntry<SK, V> evictCacheEntry = cacheEntryManager.evict();
+  // The returned delta may have some error, but it's OK
+  // Because the delta is only for loop round estimation
+  private long evictOneCacheEntry() {
+    final ICacheEntry<SK, V> evictCacheEntry = cacheEntryManager.evict();
     if (evictCacheEntry == null) {
       return 0;
     }
-    synchronized (evictCacheEntry) {
-      AtomicInteger evictedSize = new AtomicInteger(0);
-      
evictedSize.getAndAdd(sizeComputer.computeValueSize(evictCacheEntry.getValue()));
-
-      ICacheEntryGroup<FK, SK, V, T> belongedGroup = 
evictCacheEntry.getBelongedGroup();
-      evictCacheEntry.setBelongedGroup(null);
-      belongedGroup.removeCacheEntry(evictCacheEntry.getSecondKey());
-      
evictedSize.getAndAdd(sizeComputer.computeSecondKeySize(evictCacheEntry.getSecondKey()));
-
-      if (belongedGroup.isEmpty()) {
-        firstKeyMap.compute(
-            belongedGroup.getFirstKey(),
-            (firstKey, cacheEntryGroup) -> {
-              if (cacheEntryGroup == null) {
-                // has been removed by other threads
-                return null;
-              }
-              if (cacheEntryGroup.isEmpty()) {
-                
evictedSize.getAndAdd(sizeComputer.computeFirstKeySize(firstKey));
-                return null;
-              }
 
-              // some other thread has put value to it
-              return cacheEntryGroup;
-            });
+    final ICacheEntryGroup<FK, SK, V, T> belongedGroup = 
evictCacheEntry.getBelongedGroup();
+    evictCacheEntry.setBelongedGroup(null);
+
+    long memory = 
belongedGroup.removeCacheEntry(evictCacheEntry.getSecondKey());
+
+    final ICacheEntryGroup<FK, SK, V, T> cacheEntryGroup =
+        firstKeyMap.get(belongedGroup.getFirstKey());

Review Comment:
   Can belongedGroup and cacheEntryGroup be different?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to