JackieTien97 commented on code in PR #16420:
URL: https://github.com/apache/iotdb/pull/16420#discussion_r2351630163


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java:
##########
@@ -643,16 +636,48 @@ private void updateLastCacheUseLastValuesIfPossible() {
       channel += tableAggregator.getChannelCount();
     }
 
+    checkIfUpdated(updateMeasurementList, updateTimeValuePairList);
+  }
+
+  private void checkIfUpdated(
+      List<String> updateMeasurementList, List<TimeValuePair> 
updateTimeValuePairList) {
     if (!updateMeasurementList.isEmpty()) {
-      String[] updateMeasurementArray = updateMeasurementList.toArray(new 
String[0]);
-      TimeValuePair[] updateTimeValuePairArray =
-          updateTimeValuePairList.toArray(new TimeValuePair[0]);
       currentDeviceEntry = deviceEntries.get(currentDeviceIndex);
-      TABLE_DEVICE_SCHEMA_CACHE.updateLastCacheIfExists(
-          dbName,
-          currentDeviceEntry.getDeviceID(),
-          updateMeasurementArray,
-          updateTimeValuePairArray);
+
+      boolean deviceInMultiRegion =
+          deviceScanSumMap != null && 
deviceScanSumMap.containsKey(currentDeviceEntry);
+      try {
+        dataNodeQueryContext.lock(deviceInMultiRegion);
+
+        if (!deviceInMultiRegion) {
+          TABLE_DEVICE_SCHEMA_CACHE.updateLastCacheIfExists(
+              dbName,
+              currentDeviceEntry.getDeviceID(),
+              updateMeasurementList.toArray(new String[0]),
+              updateTimeValuePairList.toArray(new TimeValuePair[0]));
+          return;
+        }
+
+        Pair<Integer, TimeValuePair[]> deviceValues =
+            dataNodeQueryContext.getDeviceValues(tableCompleteName, 
currentDeviceEntry);
+        TimeValuePair[] values = deviceValues.getRight();
+        // update cache in DataNodeQueryContext
+        if (values == null) {
+          values = updateTimeValuePairList.toArray(new TimeValuePair[0]);

Review Comment:
   need update the right of the pair?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java:
##########
@@ -643,16 +636,48 @@ private void updateLastCacheUseLastValuesIfPossible() {
       channel += tableAggregator.getChannelCount();
     }
 
+    checkIfUpdated(updateMeasurementList, updateTimeValuePairList);
+  }
+
+  private void checkIfUpdated(
+      List<String> updateMeasurementList, List<TimeValuePair> 
updateTimeValuePairList) {
     if (!updateMeasurementList.isEmpty()) {
-      String[] updateMeasurementArray = updateMeasurementList.toArray(new 
String[0]);
-      TimeValuePair[] updateTimeValuePairArray =
-          updateTimeValuePairList.toArray(new TimeValuePair[0]);
       currentDeviceEntry = deviceEntries.get(currentDeviceIndex);
-      TABLE_DEVICE_SCHEMA_CACHE.updateLastCacheIfExists(
-          dbName,
-          currentDeviceEntry.getDeviceID(),
-          updateMeasurementArray,
-          updateTimeValuePairArray);
+
+      boolean deviceInMultiRegion =
+          deviceScanSumMap != null && 
deviceScanSumMap.containsKey(currentDeviceEntry);
+      try {
+        dataNodeQueryContext.lock(deviceInMultiRegion);
+
+        if (!deviceInMultiRegion) {
+          TABLE_DEVICE_SCHEMA_CACHE.updateLastCacheIfExists(
+              dbName,
+              currentDeviceEntry.getDeviceID(),
+              updateMeasurementList.toArray(new String[0]),
+              updateTimeValuePairList.toArray(new TimeValuePair[0]));
+          return;
+        }
+
+        Pair<Integer, TimeValuePair[]> deviceValues =
+            dataNodeQueryContext.getDeviceValues(tableCompleteName, 
currentDeviceEntry);
+        TimeValuePair[] values = deviceValues.getRight();
+        // update cache in DataNodeQueryContext
+        if (values == null) {
+          values = updateTimeValuePairList.toArray(new TimeValuePair[0]);
+        } else {
+          updateTimeValuePairList.toArray(new TimeValuePair[0]);
+        }
+
+        if (deviceValues.left-- == 0) {

Review Comment:
   do `deviceValues.left--` out of the if condition



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java:
##########
@@ -643,16 +636,48 @@ private void updateLastCacheUseLastValuesIfPossible() {
       channel += tableAggregator.getChannelCount();
     }
 
+    checkIfUpdated(updateMeasurementList, updateTimeValuePairList);
+  }
+
+  private void checkIfUpdated(
+      List<String> updateMeasurementList, List<TimeValuePair> 
updateTimeValuePairList) {
     if (!updateMeasurementList.isEmpty()) {
-      String[] updateMeasurementArray = updateMeasurementList.toArray(new 
String[0]);
-      TimeValuePair[] updateTimeValuePairArray =
-          updateTimeValuePairList.toArray(new TimeValuePair[0]);
       currentDeviceEntry = deviceEntries.get(currentDeviceIndex);
-      TABLE_DEVICE_SCHEMA_CACHE.updateLastCacheIfExists(
-          dbName,
-          currentDeviceEntry.getDeviceID(),
-          updateMeasurementArray,
-          updateTimeValuePairArray);
+
+      boolean deviceInMultiRegion =
+          deviceScanSumMap != null && 
deviceScanSumMap.containsKey(currentDeviceEntry);
+      try {
+        dataNodeQueryContext.lock(deviceInMultiRegion);
+
+        if (!deviceInMultiRegion) {
+          TABLE_DEVICE_SCHEMA_CACHE.updateLastCacheIfExists(
+              dbName,
+              currentDeviceEntry.getDeviceID(),
+              updateMeasurementList.toArray(new String[0]),
+              updateTimeValuePairList.toArray(new TimeValuePair[0]));
+          return;
+        }
+
+        Pair<Integer, TimeValuePair[]> deviceValues =
+            dataNodeQueryContext.getDeviceValues(tableCompleteName, 
currentDeviceEntry);
+        TimeValuePair[] values = deviceValues.getRight();
+        // update cache in DataNodeQueryContext
+        if (values == null) {
+          values = updateTimeValuePairList.toArray(new TimeValuePair[0]);
+        } else {
+          updateTimeValuePairList.toArray(new TimeValuePair[0]);

Review Comment:
   we need to update each measurement's latest time value?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java:
##########
@@ -95,6 +98,48 @@ private void prepare() {
 
     fragmentInstanceList.forEach(
         fi -> 
fi.setDataNodeFINum(dataNodeFIMap.get(fi.getHostDataNode()).size()));
+
+    if (queryContext.needUpdateScanNumForLastQuery()) {
+      Map<QualifiedObjectName, Map<DeviceEntry, Integer>> 
deviceScanSumMapOfEachTable =
+          new HashMap<>();

Review Comment:
   the scope should be each DN?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTableScanNode.java:
##########
@@ -538,6 +582,17 @@ protected static void deserializeMemberVariables(
     node.groupIdSymbol = groupIdSymbol;
 
     node.outputSymbols = constructOutputSymbols(node.getGroupingSets(), 
node.getAggregations());
+
+    if (ReadWriteIOUtils.readBool(byteBuffer)) {
+      size = ReadWriteIOUtils.readInt(byteBuffer);
+      Map<DeviceEntry, Integer> deviceScanSumMap = new HashMap<>(size);
+      while (size-- > 0) {
+        deviceScanSumMap.put(
+            DeviceEntry.deserialize(byteBuffer), 
ReadWriteIOUtils.readInt(byteBuffer));
+      }
+      node.setDeviceScanSumMap(deviceScanSumMap);
+    }
+    node.projection = projection.build();

Review Comment:
   what's this line used for?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java:
##########
@@ -3227,6 +3235,23 @@ private LastQueryAggTableScanOperator 
constructLastQueryAggTableScanOperator(
     return lastQueryOperator;
   }
 
+  private void addUncachedDeviceToContext(
+      AggregationTableScanNode node, LocalExecutionPlanContext context, 
DeviceEntry deviceEntry) {
+    boolean deviceInMultiRegion =
+        node.getDeviceScanSumMap() != null && 
node.getDeviceScanSumMap().containsKey(deviceEntry);
+    try {
+      context.dataNodeQueryContext.lock(deviceInMultiRegion);
+      context.dataNodeQueryContext.addUnCachedDeviceIfAbsent(
+          node.getQualifiedObjectName(),
+          deviceEntry,
+          node.getDeviceScanSumMap() == null
+              ? 1
+              : node.getDeviceScanSumMap().getOrDefault(deviceEntry, 1));
+    } finally {
+      context.dataNodeQueryContext.unLock(deviceInMultiRegion);
+    }

Review Comment:
   only deviceInMultiRegion is true need to do so?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to