JackieTien97 commented on code in PR #10263:
URL: https://github.com/apache/iotdb/pull/10263#discussion_r1263427535


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastSeriesSourceNode.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.source;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public abstract class LastSeriesSourceNode extends SeriesSourceNode {
+  // The num of same series LastQueryScanNode on dispatched DataNode,
+  // use reference value here to avoid iterate twice.
+  private final AtomicInteger dataNodeSeriesScanNum;
+
+  protected LastSeriesSourceNode(PlanNodeId id, AtomicInteger 
dataNodeSeriesScanNum) {
+    super(id);
+    this.dataNodeSeriesScanNum = dataNodeSeriesScanNum;
+  }
+
+  public AtomicInteger getDataNodeSeriesScanNum() {
+    return dataNodeSeriesScanNum;
+  }
+
+  public PartialPath getSeriesPath() {
+    throw new UnsupportedOperationException();
+  }

Review Comment:
   ```suggestion
     public abstract PartialPath getSeriesPath();
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java:
##########
@@ -77,6 +79,17 @@ public class FragmentInstanceContext extends QueryContext {
   // session info
   private SessionInfo sessionInfo;
 
+  private final Map<QueryId, DataNodeQueryContext> dataNodeQueryContextMap;
+  private DataNodeQueryContext dataNodeQueryContext;
+
+  //    private final GcMonitor gcMonitor;
+  //    private final AtomicLong startNanos = new AtomicLong();
+  //    private final AtomicLong startFullGcCount = new AtomicLong(-1);
+  //    private final AtomicLong startFullGcTimeNanos = new AtomicLong(-1);
+  //    private final AtomicLong endNanos = new AtomicLong();
+  //    private final AtomicLong endFullGcCount = new AtomicLong(-1);
+  //    private final AtomicLong endFullGcTimeNanos = new AtomicLong(-1);
+

Review Comment:
   ```suggestion
   ```



##########
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/distribution/Util.java:
##########
@@ -224,6 +227,9 @@ public static Analysis constructAnalysis() {
       analysis.setDataPartitionInfo(dataPartition);
       analysis.setSchemaPartitionInfo(schemaPartition);
       analysis.setSchemaTree(genSchemaTree());
+      // to avoid some special case which is not the point of test
+      analysis.setStatement(Mockito.mock(QueryStatement.class));
+      Mockito.when(analysis.getStatement().isQuery()).thenReturn(false);

Review Comment:
   Both last queries and aggregation queries use this method, and both are 
queries, so we should never simply return `false` here.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java:
##########
@@ -90,6 +97,40 @@ private void prepare() {
       recordPlanNodeRelation(fragment.getPlanNodeTree(), fragment.getId());
       produceFragmentInstance(fragment);
     }
+    fragmentInstanceList.forEach(
+        fragmentInstance ->
+            fragmentInstance.setDataNodeFINum(
+                dataNodeFIMap.get(fragmentInstance.getHostDataNode()).size()));
+
+    // compute dataNodeSeriesScanNum in LastQueryScanNode
+    if (analysis.getStatement().isQuery()
+        && ((QueryStatement) analysis.getStatement()).isLastQuery()) {
+      final Map<Path, AtomicInteger> pathSumMap = new HashMap<>();
+      dataNodeFIMap
+          .values()
+          .forEach(
+              fragmentInstances -> {
+                fragmentInstances.forEach(
+                    fragmentInstance ->
+                        updateScanNum(
+                            fragmentInstance.getFragment().getPlanNodeTree(), 
pathSumMap));
+                pathSumMap.clear();
+              });
+    }
+  }
+
+  private void updateScanNum(PlanNode planNode, Map<Path, AtomicInteger> 
pathSumMap) {
+    if (planNode instanceof LastSeriesSourceNode) {
+      LastSeriesSourceNode lastSeriesSourceNode = (LastSeriesSourceNode) 
planNode;
+      pathSumMap.merge(
+          lastSeriesSourceNode.getSeriesPath(),
+          lastSeriesSourceNode.getDataNodeSeriesScanNum(),
+          (k, v) -> {
+            v.incrementAndGet();
+            return v;
+          });

Review Comment:
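   Here you won't update the number in the previous LastSeriesSourceNode: 
`Map.merge` passes (oldValue, newValue) to the remapping function, so this 
lambda increments and stores the new node's counter, while the counter held by 
the previously seen node is left unchanged.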



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java:
##########
@@ -90,6 +97,40 @@ private void prepare() {
       recordPlanNodeRelation(fragment.getPlanNodeTree(), fragment.getId());
       produceFragmentInstance(fragment);
     }
+    fragmentInstanceList.forEach(
+        fragmentInstance ->
+            fragmentInstance.setDataNodeFINum(
+                dataNodeFIMap.get(fragmentInstance.getHostDataNode()).size()));
+
+    // compute dataNodeSeriesScanNum in LastQueryScanNode
+    if (analysis.getStatement().isQuery()
+        && ((QueryStatement) analysis.getStatement()).isLastQuery()) {
+      final Map<Path, AtomicInteger> pathSumMap = new HashMap<>();
+      dataNodeFIMap
+          .values()
+          .forEach(
+              fragmentInstances -> {
+                fragmentInstances.forEach(
+                    fragmentInstance ->
+                        updateScanNum(
+                            fragmentInstance.getFragment().getPlanNodeTree(), 
pathSumMap));
+                pathSumMap.clear();
+              });
+    }

Review Comment:
   Do this during distribution planner processing.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/LastSeriesSourceNode.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.source;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public abstract class LastSeriesSourceNode extends SeriesSourceNode {
+  // The num of same series LastQueryScanNode on dispatched DataNode,
+  // use reference value here to avoid iterate twice.
+  private final AtomicInteger dataNodeSeriesScanNum;

Review Comment:
   No need to be `AtomicInteger`.



##########
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/plan/PipelineBuilderTest.java:
##########
@@ -811,7 +812,8 @@ private LocalExecutionPlanContext 
createLocalExecutionPlanContext(TypeProvider t
         createFragmentInstanceContext(instanceId, stateMachine);
     fragmentInstanceContext.setDataRegion(dataRegion);
 
-    return new LocalExecutionPlanContext(typeProvider, 
fragmentInstanceContext);
+    return new LocalExecutionPlanContext(
+        typeProvider, fragmentInstanceContext, new DataNodeQueryContext(0));

Review Comment:
   ```suggestion
           typeProvider, fragmentInstanceContext, new DataNodeQueryContext(1));
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/last/AbstractUpdateLastCacheOperator.java:
##########
@@ -80,6 +97,35 @@ protected String getDatabaseName() {
     return databaseName;
   }
 
+  protected void mayUpdateLastCache(
+      long time, @Nullable TsPrimitiveType value, MeasurementPath fullPath) {
+    if (!needUpdateCache) {
+      return;
+    }
+    try {
+      dataNodeQueryContext.lock();
+      Pair<AtomicInteger, TimeValuePair> seriesScanInfo =
+          dataNodeQueryContext.getSeriesScanInfo(fullPath);
+
+      // may enter this case when use TTL
+      if (seriesScanInfo == null) {
+        return;
+      }
+
+      // update cache in DataNodeQueryContext
+      if (seriesScanInfo.right == null || time > 
seriesScanInfo.right.getTimestamp()) {
+        seriesScanInfo.right = new TimeValuePair(time, value);
+      }
+
+      if 
(dataNodeQueryContext.getDataNodeSeriesScanNum(fullPath).decrementAndGet() == 
0) {

Review Comment:
   ```suggestion
         if (seriesScanInfo.left.decrementAndGet() == 0) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to