JackieTien97 commented on code in PR #16353:
URL: https://github.com/apache/iotdb/pull/16353#discussion_r2642961795


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java:
##########
@@ -141,17 +140,37 @@ public FragmentInstanceStateMachine getStateMachine() {
     return stateMachine;
   }
 
+  // Check if this fragment instance should be ignored for statistics
+  // (i.e., it contains ExplainAnalyzeOperator only)
+  private boolean shouldIgnoreForStatistics() {
+    if (drivers == null || drivers.isEmpty()) {
+      return false;
+    }
+    // Check if any driver contains ExplainAnalyzeOperator
+    return drivers.stream()
+        .anyMatch(
+            driver ->
+                driver.getDriverContext().getOperatorContexts().stream()
+                    .anyMatch(
+                        operatorContext ->
+                            ExplainAnalyzeOperator.class
+                                .getSimpleName()
+                                .equals(operatorContext.getOperatorType())));

Review Comment:
   This should be computed once and stored as a field when the FragmentInstanceExecution is created in LocalExecutionPlanner, instead of re-evaluating the check on every call.
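
   A minimal sketch of what I mean, assuming the operator types are already known when the instance is built. `FragmentInstanceSketch` and its constructor parameters are hypothetical stand-ins, not the real IoTDB classes:

```java
import java.util.List;

// Hypothetical stand-in for FragmentInstanceExecution; not the real IoTDB class.
final class FragmentInstanceSketch {
  // Computed once at construction instead of streaming over the drivers on every call.
  private final boolean ignoreForStatistics;

  FragmentInstanceSketch(List<List<String>> operatorTypesPerDriver, String ignoredOperatorType) {
    // Same "any driver contains the operator" check as in the diff, evaluated a single time.
    this.ignoreForStatistics =
        operatorTypesPerDriver != null
            && operatorTypesPerDriver.stream()
                .anyMatch(types -> types.contains(ignoredOperatorType));
  }

  boolean shouldIgnoreForStatistics() {
    return ignoreForStatistics;
  }
}
```

   With this shape the planner decides the flag while it still has the operator list at hand, and the getter becomes a plain field read.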



##########
iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template:
##########
@@ -1123,6 +1123,18 @@ batch_size=100000
 # Datatype: long
 sort_buffer_size_in_bytes=0
 
+# The buffer size for CTE materialization. If cte_buffer_size_in_bytes <= 0, a default value of 128 KB is used; otherwise the specified value
+# will be used.
+# effectiveMode: hot_reload
+# Datatype: long
+cte_buffer_size_in_bytes=0

Review Comment:
   ```suggestion
   # Privilege: SYSTEM
   # unit: byte
   cte_buffer_size_in_bytes=131072
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java:
##########
@@ -1057,6 +1057,24 @@ public void loadProperties(TrimProperties properties) throws BadNodeUrlException
     // The buffer for sort operator to calculate
     loadFixedSizeLimitForQuery(properties, "sort_buffer_size_in_bytes", conf::setSortBufferSize);
 
+    // The buffer for cte materialization.
+    long cteBufferSizeInBytes =
+        Long.parseLong(
+            properties.getProperty(
+                "cte_buffer_size_in_bytes", Long.toString(conf.getCteBufferSize())));
+    if (cteBufferSizeInBytes > 0) {
+      conf.setCteBufferSize(cteBufferSizeInBytes);
+    }
+
+    // max number of rows for cte materialization
+    int maxRowsInCteBuffer =
+        Integer.parseInt(
+            properties.getProperty(
+                "max_rows_in_cte_buffer", Integer.toString(conf.getMaxRowsInCteBuffer())));
+    if (maxRowsInCteBuffer > 0) {
+      conf.setMaxRowsInCteBuffer(maxRowsInCteBuffer);
+    }
+

Review Comment:
   Also add these to the `loadHotModifiedProps` function; both configurations should be hot-reloadable.
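
   One way to keep the two paths consistent is to share a helper. This is only a sketch: `CteConfigSketch`, `loadCteProps`, and the default row count are hypothetical, and the real settings live in the existing config classes:

```java
import java.util.Properties;

// Hypothetical holder used for illustration; not the real IoTDBConfig/IoTDBDescriptor.
final class CteConfigSketch {
  private volatile long cteBufferSize = 128 * 1024L; // 128 KB, per the template comment
  private volatile int maxRowsInCteBuffer = 1000; // illustrative default only

  // Called at startup.
  void loadProperties(Properties properties) {
    loadCteProps(properties);
  }

  // Called on hot reload; reuses the same parsing and validation rules.
  void loadHotModifiedProps(Properties properties) {
    loadCteProps(properties);
  }

  private void loadCteProps(Properties properties) {
    long size =
        Long.parseLong(
            properties
                .getProperty("cte_buffer_size_in_bytes", Long.toString(cteBufferSize))
                .trim());
    if (size > 0) {
      cteBufferSize = size; // non-positive values keep the current setting
    }
    int rows =
        Integer.parseInt(
            properties
                .getProperty("max_rows_in_cte_buffer", Integer.toString(maxRowsInCteBuffer))
                .trim());
    if (rows > 0) {
      maxRowsInCteBuffer = rows;
    }
  }

  long getCteBufferSize() {
    return cteBufferSize;
  }

  int getMaxRowsInCteBuffer() {
    return maxRowsInCteBuffer;
  }
}
```

   The fields are `volatile` in the sketch because a reload may happen on a different thread from the readers; whether the real config needs that depends on how it is already synchronized.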



##########
iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template:
##########
@@ -1123,6 +1123,18 @@ batch_size=100000
 # Datatype: long
 sort_buffer_size_in_bytes=0
 
+# The buffer size for CTE materialization. If cte_buffer_size_in_bytes <= 0, a default value of 128 KB is used; otherwise the specified value
+# will be used.
+# effectiveMode: hot_reload
+# Datatype: long
+cte_buffer_size_in_bytes=0
+
+
+# Max rows for CTE materialization
+# effectiveMode: hot_reload
+# Datatype: int

Review Comment:
   ```suggestion
   # Datatype: int
   # Privilege: SYSTEM
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/CteScanOperator.java:
##########
@@ -0,0 +1,134 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import org.apache.iotdb.db.queryengine.execution.operator.source.SourceOperator;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.utils.cte.CteDataReader;
+import org.apache.iotdb.db.utils.cte.CteDataStore;
+import org.apache.iotdb.db.utils.cte.MemoryReader;
+
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CteScanOperator implements SourceOperator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(CteScanOperator.class);
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(CteScanOperator.class);
+
+  private final OperatorContext operatorContext;
+  private final PlanNodeId sourceId;
+
+  private final CteDataStore dataStore;
+  private final CteDataReader dataReader;
+  private final int dataStoreRefCount;
+
+  public CteScanOperator(
+      OperatorContext operatorContext, PlanNodeId sourceId, CteDataStore dataStore) {
+    this.operatorContext = operatorContext;
+    this.sourceId = sourceId;
+    this.dataStore = dataStore;
+    this.dataReader = new MemoryReader(dataStore.getCachedData());
+    this.dataStoreRefCount = dataStore.increaseRefCount();
+  }
+
+  @Override
+  public TsBlock next() throws Exception {
+    if (dataReader == null) {
+      return null;
+    }
+    return dataReader.next();
+  }
+
+  @Override
+  public boolean hasNext() throws Exception {
+    if (dataReader == null) {
+      return false;
+    }
+    return dataReader.hasNext();
+  }
+
+  @Override
+  public void close() throws Exception {
+    try {
+      if (dataReader != null) {
+        dataReader.close();
+      }
+    } catch (Exception e) {
+      LOGGER.error("Fail to close fileChannel", e);

Review Comment:
   ```suggestion
         LOGGER.error("Fail to close CteDataReader", e);
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/CteScanOperator.java:
##########
@@ -0,0 +1,134 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import org.apache.iotdb.db.queryengine.execution.operator.source.SourceOperator;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.utils.cte.CteDataReader;
+import org.apache.iotdb.db.utils.cte.CteDataStore;
+import org.apache.iotdb.db.utils.cte.MemoryReader;
+
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CteScanOperator implements SourceOperator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(CteScanOperator.class);
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(CteScanOperator.class);
+
+  private final OperatorContext operatorContext;
+  private final PlanNodeId sourceId;
+
+  private final CteDataStore dataStore;
+  private final CteDataReader dataReader;
+  private final int dataStoreRefCount;
+
+  public CteScanOperator(
+      OperatorContext operatorContext, PlanNodeId sourceId, CteDataStore dataStore) {
+    this.operatorContext = operatorContext;
+    this.sourceId = sourceId;
+    this.dataStore = dataStore;
+    this.dataReader = new MemoryReader(dataStore.getCachedData());
+    this.dataStoreRefCount = dataStore.increaseRefCount();
+  }
+
+  @Override
+  public TsBlock next() throws Exception {
+    if (dataReader == null) {
+      return null;
+    }
+    return dataReader.next();
+  }
+
+  @Override
+  public boolean hasNext() throws Exception {
+    if (dataReader == null) {
+      return false;
+    }
+    return dataReader.hasNext();
+  }
+
+  @Override
+  public void close() throws Exception {
+    try {
+      if (dataReader != null) {
+        dataReader.close();
+      }
+    } catch (Exception e) {
+      LOGGER.error("Fail to close fileChannel", e);
+    }
+  }
+
+  @Override
+  public boolean isFinished() throws Exception {
+    return !hasNextWithTimer();
+  }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return calculateRetainedSizeAfterCallingNext() + calculateMaxReturnSize();
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    // The returned object is a reference to TsBlock in CteDataReader
+    return RamUsageEstimator.NUM_BYTES_OBJECT_REF;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0L;
+  }
+
+  @Override
+  public long ramBytesUsed() {
+    long bytes =
+        INSTANCE_SIZE
+            + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
+            + dataReader.bytesUsed();
+    if (dataStoreRefCount == 1) {
+      bytes += dataStore.getCachedBytes();
+    }

Review Comment:
   What if the FI that this CteScanOperator belongs to has finished, but another CteScanOperator sharing the same dataStore hasn't finished yet?
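
   To make the concern concrete, here is a stripped-down model of the accounting in the diff. `SharedStoreSketch` and `ScanSketch` are hypothetical stand-ins, and the instance and reader sizes are left out on purpose:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for CteDataStore: cached data shared by several scans.
final class SharedStoreSketch {
  private final long cachedBytes;
  private final AtomicInteger refCount = new AtomicInteger();

  SharedStoreSketch(long cachedBytes) {
    this.cachedBytes = cachedBytes;
  }

  int increaseRefCount() {
    return refCount.incrementAndGet();
  }

  long getCachedBytes() {
    return cachedBytes;
  }
}

// Hypothetical stand-in for CteScanOperator, modelling only the shared-bytes part.
final class ScanSketch {
  private final SharedStoreSketch store;
  private final int refCountAtCreation;

  ScanSketch(SharedStoreSketch store) {
    this.store = store;
    this.refCountAtCreation = store.increaseRefCount();
  }

  // Mirrors the diff: only the scan that saw refCount == 1 reports the cached bytes.
  long sharedBytesCharged() {
    return refCountAtCreation == 1 ? store.getCachedBytes() : 0L;
  }
}
```

   With two scans on one store, the first reports the cached bytes and the second reports zero. If the first scan's FI finishes and its memory accounting goes away with it (my assumption about the lifecycle), the cached data is still held by the second scan but no live operator accounts for it.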



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Scope.java:
##########
@@ -50,6 +54,7 @@ public class Scope {
   private final RelationId relationId;
   private final RelationType relation;
   private final Map<String, WithQuery> namedQueries;
+  private final List<Identifier> tables;

Review Comment:
   What is this field used for? Please add a comment explaining it.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java:
##########
@@ -123,6 +124,8 @@ public class Analysis implements IAnalysis {
 
   private final Map<NodeRef<Table>, Query> namedQueries = new LinkedHashMap<>();
 
+  private With with;

Review Comment:
   Please add a comment on this important field: what is it used for, and in which cases?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/CteScanOperator.java:
##########
@@ -0,0 +1,134 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import org.apache.iotdb.db.queryengine.execution.operator.source.SourceOperator;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.utils.cte.CteDataReader;
+import org.apache.iotdb.db.utils.cte.CteDataStore;
+import org.apache.iotdb.db.utils.cte.MemoryReader;
+
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CteScanOperator implements SourceOperator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(CteScanOperator.class);
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(CteScanOperator.class);
+
+  private final OperatorContext operatorContext;
+  private final PlanNodeId sourceId;
+
+  private final CteDataStore dataStore;
+  private final CteDataReader dataReader;
+  private final int dataStoreRefCount;
+
+  public CteScanOperator(
+      OperatorContext operatorContext, PlanNodeId sourceId, CteDataStore dataStore) {
+    this.operatorContext = operatorContext;
+    this.sourceId = sourceId;
+    this.dataStore = dataStore;
+    this.dataReader = new MemoryReader(dataStore.getCachedData());
+    this.dataStoreRefCount = dataStore.increaseRefCount();
+  }
+
+  @Override
+  public TsBlock next() throws Exception {
+    if (dataReader == null) {
+      return null;
+    }
+    return dataReader.next();
+  }
+
+  @Override
+  public boolean hasNext() throws Exception {
+    if (dataReader == null) {
+      return false;
+    }
+    return dataReader.hasNext();
+  }
+
+  @Override
+  public void close() throws Exception {
+    try {
+      if (dataReader != null) {
+        dataReader.close();
+      }
+    } catch (Exception e) {
+      LOGGER.error("Fail to close fileChannel", e);
+    }
+  }
+
+  @Override
+  public boolean isFinished() throws Exception {
+    return !hasNextWithTimer();
+  }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return calculateRetainedSizeAfterCallingNext() + calculateMaxReturnSize();

Review Comment:
   Same comment as for `calculateMaxReturnSize`.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java:
##########
@@ -763,9 +764,13 @@ private PlanBuilder filter(PlanBuilder subPlan, Expression predicate, Node node)
     }
 
     subPlan = subqueryPlanner.handleSubqueries(subPlan, predicate, analysis.getSubqueries(node));
-    return subPlan.withNewRoot(
-        new FilterNode(
-            queryIdAllocator.genPlanNodeId(), subPlan.getRoot(), subPlan.rewrite(predicate)));
+    PlanBuilder planBuilder =
+        subPlan.withNewRoot(
+            new FilterNode(
+                queryIdAllocator.genPlanNodeId(), subPlan.getRoot(), subPlan.rewrite(predicate)));
+    PredicateWithUncorrelatedScalarSubqueryReconstructor.getInstance()
+        .clearShadowExpression(predicate);

Review Comment:
   We shouldn't clear the shadow expression here; the predicate expression is still used afterwards. We can only clear them once the front end has finished.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java:
##########
@@ -3581,8 +3630,15 @@ protected Scope visitJoin(Join node, Optional<Scope> scope) {
 
       joinConditionCheck(criteria);
 
-      Scope left = process(node.getLeft(), scope);
-      Scope right = process(node.getRight(), scope);
+      Optional<Scope> leftScope = scope.map(Scope::copy);
+      Scope left = process(node.getLeft(), leftScope);
+      Optional<Scope> rightScope = scope.map(Scope::copy);
+      Scope right = process(node.getRight(), rightScope);
+
+      if (scope.isPresent()) {
+        leftScope.ifPresent(l -> scope.get().addTables(l.getTables()));
+        rightScope.ifPresent(l -> scope.get().addTables(l.getTables()));
+      }

Review Comment:
   Why do we need these changes? It seems that Trino just uses the scope directly.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/cte/MemoryReader.java:
##########
@@ -0,0 +1,64 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.iotdb.db.utils.cte;
+
+import org.apache.iotdb.commons.exception.IoTDBException;
+
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.List;
+
+public class MemoryReader implements CteDataReader {
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(MemoryReader.class);
+
+  // all the data in MemoryReader lies in memory
+  private final List<TsBlock> cachedData;
+  private int tsBlockIndex;
+
+  public MemoryReader(List<TsBlock> cachedTsBlock) {
+    this.cachedData = cachedTsBlock;
+    this.tsBlockIndex = 0;
+  }
+
+  @Override
+  public boolean hasNext() throws IoTDBException {
+    return cachedData != null && tsBlockIndex < cachedData.size();
+  }
+
+  @Override
+  public TsBlock next() throws IoTDBException {
+    if (cachedData == null || tsBlockIndex >= cachedData.size()) {
+      return null;
+    }
+    return cachedData.get(tsBlockIndex++);
+  }
+
+  @Override
+  public void close() throws IoTDBException {}
+
+  @Override
+  public long bytesUsed() {
+    return INSTANCE_SIZE;

Review Comment:
   `bytesUsed()` currently returns only `INSTANCE_SIZE`; please account for the memory footprint correctly.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ir/PredicateWithUncorrelatedScalarSubqueryReconstructor.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.ir;
+
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.protocol.session.SessionManager;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext.ExplainType;
+import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
+import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
+import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BinaryLiteral;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DoubleLiteral;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FunctionCall;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Identifier;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Literal;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LogicalExpression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LongLiteral;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NotExpression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Query;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.StringLiteral;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SubqueryExpression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.With;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WithQuery;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class PredicateWithUncorrelatedScalarSubqueryReconstructor {

Review Comment:
   Does this need to be a singleton? If you only want it for unit tests, you should pass the mock in from the top level.
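
   A rough sketch of the injection I have in mind, assuming the class needs some collaborator to evaluate the subquery. `SubqueryEvaluator`, `ReconstructorSketch`, and `fold` are hypothetical names and the string-based predicate is only for illustration:

```java
import java.util.Optional;

// Hypothetical collaborator: evaluates an uncorrelated scalar subquery to a value, if possible.
interface SubqueryEvaluator {
  Optional<Long> evaluate(String subquerySql);
}

// Hypothetical, simplified stand-in for the reconstructor; no static instance involved.
final class ReconstructorSketch {
  private final SubqueryEvaluator evaluator;

  // The caller (production code or a unit test) decides which evaluator is used.
  ReconstructorSketch(SubqueryEvaluator evaluator) {
    this.evaluator = evaluator;
  }

  // Replaces the subquery with its value when it can be evaluated, otherwise keeps it as-is.
  String fold(String column, String subquerySql) {
    return evaluator
        .evaluate(subquerySql)
        .map(value -> column + " > " + value)
        .orElse(column + " > (" + subquerySql + ")");
  }
}
```

   A unit test can then construct `new ReconstructorSketch(sql -> Optional.of(42L))` directly, without a `getInstance()` hook or a test-only setter.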



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java:
##########
@@ -103,6 +125,24 @@ public class MPPQueryContext implements IAuditEntity {
 
   private boolean userQuery = false;
 
+  private Map<NodeRef<Table>, Query> cteQueries = new HashMap<>();
+
+  // Stores the EXPLAIN/EXPLAIN ANALYZE results for Common Table Expressions (CTEs)
+  // Key: CTE table reference
+  // Value: Pair containing (max line length of the explain output, list of formatted explain lines)
+  // This ensures consistent formatting between the main query and its CTE sub-queries
+  private final Map<NodeRef<Table>, Pair<Integer, List<String>>> cteExplainResults =
+      new LinkedHashMap<>();
+  // Tracks the materialization time cost (in nanoseconds) for each CTE to help optimize query
+  // planning
+  private final Map<NodeRef<Table>, Long> cteMaterializationCosts = new HashMap<>();
+
+  // Never materialize CTE in a subquery.
+  private boolean subquery = false;

Review Comment:
   Not just a subquery? I think this field only means a materialized CTE query or a folded uncorrelated scalar subquery.
   
   If there is no better name, please state the meaning more clearly in the Javadoc.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/CteScanOperator.java:
##########
@@ -0,0 +1,134 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import org.apache.iotdb.db.queryengine.execution.operator.source.SourceOperator;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.utils.cte.CteDataReader;
+import org.apache.iotdb.db.utils.cte.CteDataStore;
+import org.apache.iotdb.db.utils.cte.MemoryReader;
+
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CteScanOperator implements SourceOperator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(CteScanOperator.class);
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(CteScanOperator.class);
+
+  private final OperatorContext operatorContext;
+  private final PlanNodeId sourceId;
+
+  private final CteDataStore dataStore;
+  private final CteDataReader dataReader;
+  private final int dataStoreRefCount;
+
+  public CteScanOperator(
+      OperatorContext operatorContext, PlanNodeId sourceId, CteDataStore dataStore) {
+    this.operatorContext = operatorContext;
+    this.sourceId = sourceId;
+    this.dataStore = dataStore;
+    this.dataReader = new MemoryReader(dataStore.getCachedData());
+    this.dataStoreRefCount = dataStore.increaseRefCount();
+  }
+
+  @Override
+  public TsBlock next() throws Exception {
+    if (dataReader == null) {
+      return null;
+    }
+    return dataReader.next();
+  }
+
+  @Override
+  public boolean hasNext() throws Exception {
+    if (dataReader == null) {
+      return false;
+    }
+    return dataReader.hasNext();
+  }
+
+  @Override
+  public void close() throws Exception {
+    try {
+      if (dataReader != null) {
+        dataReader.close();
+      }
+    } catch (Exception e) {
+      LOGGER.error("Fail to close fileChannel", e);
+    }
+  }
+
+  @Override
+  public boolean isFinished() throws Exception {
+    return !hasNextWithTimer();
+  }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return calculateRetainedSizeAfterCallingNext() + calculateMaxReturnSize();
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    // The returned object is a reference to TsBlock in CteDataReader
+    return RamUsageEstimator.NUM_BYTES_OBJECT_REF;

Review Comment:
   It should return the actual maximum return size among all the TsBlocks in dataReader.
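
   Something along these lines, as a sketch only. `SizedBlock` is a hypothetical stand-in for TsBlock, and which size accessor to use on the real class is left open:

```java
import java.util.List;

// Hypothetical stand-in for TsBlock, exposing only a size in bytes.
interface SizedBlock {
  long sizeInBytes();
}

final class MaxReturnSizeSketch {
  private MaxReturnSizeSketch() {}

  // Largest single block that next() could hand back; 0 when nothing is cached.
  static long maxReturnSize(List<? extends SizedBlock> cachedBlocks) {
    long max = 0L;
    if (cachedBlocks == null) {
      return max;
    }
    for (SizedBlock block : cachedBlocks) {
      max = Math.max(max, block.sizeInBytes());
    }
    return max;
  }
}
```

   Since the cached data is fixed once the CTE is materialized, this could be computed once when the reader is created rather than on every call.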



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/ConvertPredicateToTimeFilterVisitor.java:
##########
@@ -299,6 +300,13 @@ protected Filter visitBetweenPredicate(BetweenPredicate node, Void context) {
   }
 
   public static long getLongValue(Expression expression) {
-    return ((LongLiteral) expression).getParsedValue();
+    if (expression instanceof LongLiteral) {
+      return ((LongLiteral) expression).getParsedValue();
+    } else if (expression instanceof DoubleLiteral) {
+      return (long) ((DoubleLiteral) expression).getValue();
+    } else {
+      throw new IllegalArgumentException(
+          "Expression should be LongLiteral or DoubleLiteral, but got: " + expression.getClass());
+    }

Review Comment:
   You don't need to fix that; @alpass163 has already fixed it in https://github.com/apache/iotdb/pull/16917



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ir/PredicateWithUncorrelatedScalarSubqueryReconstructor.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.ir;
+
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.protocol.session.SessionManager;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext.ExplainType;
+import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
+import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
+import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BinaryLiteral;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DoubleLiteral;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FunctionCall;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Identifier;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Literal;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LogicalExpression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LongLiteral;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NotExpression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Query;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.StringLiteral;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SubqueryExpression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.With;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WithQuery;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class PredicateWithUncorrelatedScalarSubqueryReconstructor {
+
+  private static final SqlParser relationSqlParser = new SqlParser();

Review Comment:
   No need to create a new one here; you can get it from `TableModelPlanner`.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/CteMaterializer.java:
##########
@@ -0,0 +1,354 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.exception.mpp.FragmentInstanceFetchException;
+import org.apache.iotdb.db.protocol.session.SessionManager;
+import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
+import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
+import org.apache.iotdb.db.queryengine.plan.execution.QueryExecution;
+import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Identifier;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Query;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Table;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.With;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WithQuery;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
+import org.apache.iotdb.db.queryengine.statistics.FragmentInstanceStatisticsDrawer;
+import org.apache.iotdb.db.queryengine.statistics.QueryStatisticsFetcher;
+import org.apache.iotdb.db.queryengine.statistics.StatisticLine;
+import org.apache.iotdb.db.utils.cte.CteDataStore;
+import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStatisticsResp;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.type.TypeFactory;
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class CteMaterializer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(CteMaterializer.class);
+
+  private static final Coordinator coordinator = Coordinator.getInstance();
+  private static final SessionManager sessionManager = SessionManager.getInstance();
+
+  public void materializeCTE(Analysis analysis, MPPQueryContext context) {
+    analysis
+        .getNamedQueries()
+        .forEach(
+            (tableRef, query) -> {
+              Table table = tableRef.getNode();
+              if (query.isMaterialized()) {
+                if (!query.isDone()) {

Review Comment:
   We may need another `if` to check whether this CTE already failed last 
time. If so, we can just return even though this CTE is not done.
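
   A minimal sketch of the "already failed" check, using a hypothetical 
`CteStateSketch` class rather than the real query object in this PR (the 
`Status` enum and `materializeOnce` are assumptions made for illustration). 
A previous failure is remembered, so a later call returns early instead of 
running the materialization again:

   ```java
   import java.util.function.Supplier;

   // Hypothetical stand-in for the per-CTE state; not the PR's actual class.
   final class CteStateSketch {
     enum Status { PENDING, DONE, FAILED }

     private Status status = Status.PENDING;

     Status getStatus() {
       return status;
     }

     // Runs the materialization at most once. Returns true if the CTE is
     // materialized, false if it failed now or on an earlier attempt.
     boolean materializeOnce(Supplier<Boolean> materialization) {
       if (status == Status.DONE) {
         return true;
       }
       if (status == Status.FAILED) {
         // The extra check: do not retry a CTE that already failed.
         return false;
       }
       boolean ok;
       try {
         ok = materialization.get();
       } catch (RuntimeException e) {
         ok = false;
       }
       status = ok ? Status.DONE : Status.FAILED;
       return ok;
     }
   }
   ```

   With this shape, the not-done branch only runs for a CTE that is still 
pending; done and failed CTEs both short-circuit.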



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/cte/CteDataReader.java:
##########
@@ -0,0 +1,58 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.iotdb.db.utils.cte;
+
+import org.apache.iotdb.commons.exception.IoTDBException;
+
+import org.apache.tsfile.read.common.block.TsBlock;
+
+public interface CteDataReader {

Review Comment:
   ```suggestion
   public interface CteDataReader extends Accountable {
   ```
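
   A self-contained sketch of what extending an accountable interface buys. 
`AccountableSketch` is a local stand-in that assumes the real `Accountable` 
declares a single `long ramBytesUsed()` method; `InMemoryReaderSketch` and 
`MemoryAccountingSketch` are hypothetical, and `long[]` stands in for 
`TsBlock`:

   ```java
   import java.util.Iterator;
   import java.util.List;

   // Local stand-in; assumed to mirror Accountable's single method.
   interface AccountableSketch {
     long ramBytesUsed();
   }

   // Reader interface that inherits the memory-accounting method.
   interface CteDataReaderSketch extends AccountableSketch {
     boolean hasNext();

     long[] next();

     void close();
   }

   // Hypothetical in-memory reader over already cached blocks.
   final class InMemoryReaderSketch implements CteDataReaderSketch {
     private final List<long[]> blocks;
     private final Iterator<long[]> iterator;

     InMemoryReaderSketch(List<long[]> blocks) {
       this.blocks = blocks;
       this.iterator = blocks.iterator();
     }

     @Override
     public boolean hasNext() {
       return iterator.hasNext();
     }

     @Override
     public long[] next() {
       return iterator.next();
     }

     @Override
     public void close() {
       // Nothing to release in this sketch.
     }

     @Override
     public long ramBytesUsed() {
       // Counts payload bytes only; object headers are ignored here.
       long bytes = 0;
       for (long[] block : blocks) {
         bytes += (long) block.length * Long.BYTES;
       }
       return bytes;
     }
   }

   final class MemoryAccountingSketch {
     // Generic helpers can now treat readers like any other accountable part.
     static long totalBytes(List<? extends AccountableSketch> parts) {
       long total = 0;
       for (AccountableSketch part : parts) {
         total += part.ramBytesUsed();
       }
       return total;
     }
   }
   ```

   The point of the design is that callers sum memory through one method 
name instead of a reader-specific one.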



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/CteScanOperator.java:
##########
@@ -0,0 +1,134 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import org.apache.iotdb.db.queryengine.execution.operator.source.SourceOperator;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.utils.cte.CteDataReader;
+import org.apache.iotdb.db.utils.cte.CteDataStore;
+import org.apache.iotdb.db.utils.cte.MemoryReader;
+
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CteScanOperator implements SourceOperator {
+  private static final Logger LOGGER = LoggerFactory.getLogger(CteScanOperator.class);
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(CteScanOperator.class);
+
+  private final OperatorContext operatorContext;
+  private final PlanNodeId sourceId;
+
+  private final CteDataStore dataStore;
+  private final CteDataReader dataReader;
+  private final int dataStoreRefCount;
+
+  public CteScanOperator(
+      OperatorContext operatorContext, PlanNodeId sourceId, CteDataStore dataStore) {
+    this.operatorContext = operatorContext;
+    this.sourceId = sourceId;
+    this.dataStore = dataStore;
+    this.dataReader = new MemoryReader(dataStore.getCachedData());
+    this.dataStoreRefCount = dataStore.increaseRefCount();
+  }
+
+  @Override
+  public TsBlock next() throws Exception {
+    if (dataReader == null) {
+      return null;
+    }
+    return dataReader.next();
+  }
+
+  @Override
+  public boolean hasNext() throws Exception {
+    if (dataReader == null) {
+      return false;
+    }
+    return dataReader.hasNext();
+  }
+
+  @Override
+  public void close() throws Exception {
+    try {
+      if (dataReader != null) {
+        dataReader.close();
+      }
+    } catch (Exception e) {
+      LOGGER.error("Fail to close fileChannel", e);
+    }
+  }
+
+  @Override
+  public boolean isFinished() throws Exception {
+    return !hasNextWithTimer();
+  }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return calculateRetainedSizeAfterCallingNext() + calculateMaxReturnSize();
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    // The returned object is a reference to TsBlock in CteDataReader
+    return RamUsageEstimator.NUM_BYTES_OBJECT_REF;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0L;
+  }
+
+  @Override
+  public long ramBytesUsed() {
+    long bytes =
+        INSTANCE_SIZE
+            + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
+            + dataReader.bytesUsed();
+    if (dataStoreRefCount == 1) {
+      bytes += dataStore.getCachedBytes();
+    }

Review Comment:
   I think it's better to do the memory allocation and release in 
`CteDataReader.init()` and `close()`: the first `init()` call reserves memory 
via `LocalExecutionPlanner.reserveFromFreeMemoryForOperators`, and the last 
`close()` call releases it via 
`LocalExecutionPlanner.releaseToFreeMemoryForOperators`.
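
   A rough sketch of the reserve-on-first-init, release-on-last-close idea. 
`OperatorMemoryPoolSketch` is a hypothetical stand-in for the planner's 
free-memory accounting, and `SharedCteDataSketch` is not a class from this 
PR; the real reserve/release signatures on `LocalExecutionPlanner` are not 
reproduced here:

   ```java
   // Hypothetical stand-in for the operator free-memory pool.
   final class OperatorMemoryPoolSketch {
     private long freeBytes;

     OperatorMemoryPoolSketch(long capacityBytes) {
       this.freeBytes = capacityBytes;
     }

     synchronized boolean reserve(long bytes) {
       if (bytes > freeBytes) {
         return false;
       }
       freeBytes -= bytes;
       return true;
     }

     synchronized void release(long bytes) {
       freeBytes += bytes;
     }

     synchronized long getFreeBytes() {
       return freeBytes;
     }
   }

   // Cached CTE data shared by several readers.
   final class SharedCteDataSketch {
     private final OperatorMemoryPoolSketch pool;
     private final long cachedBytes;
     private int refCount = 0;

     SharedCteDataSketch(OperatorMemoryPoolSketch pool, long cachedBytes) {
       this.pool = pool;
       this.cachedBytes = cachedBytes;
     }

     // Only the first init() reserves memory for the cached data.
     synchronized void init() {
       if (refCount == 0 && !pool.reserve(cachedBytes)) {
         throw new IllegalStateException("Not enough free memory for CTE data");
       }
       refCount++;
     }

     // Only the last close() gives the memory back; extra calls are ignored.
     synchronized void close() {
       if (refCount == 0) {
         return;
       }
       refCount--;
       if (refCount == 0) {
         pool.release(cachedBytes);
       }
     }
   }
   ```

   Tying the accounting to the reader lifecycle this way means the operator 
no longer needs a `dataStoreRefCount == 1` special case when it reports its 
own memory.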



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/cte/CteDataReader.java:
##########
@@ -0,0 +1,58 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.iotdb.db.utils.cte;
+
+import org.apache.iotdb.commons.exception.IoTDBException;
+
+import org.apache.tsfile.read.common.block.TsBlock;
+
+public interface CteDataReader {
+  /**
+   * Check if there is more data in CteDataReader. DiskSpillerReader may run out of current TsBlocks
+   * , then it needs to read from file and cache more data. This method should be called before
+   * next() to ensure that there is data to read.
+   *
+   * @throws IoTDBException the error occurs when reading data from fileChannel
+   */
+  boolean hasNext() throws IoTDBException;
+
+  /**
+   * output the cached data in CteDataReader, it needs to be called after hasNext() returns true.
+   *
+   * @return next TsBlock
+   */
+  TsBlock next() throws IoTDBException;
+
+  /**
+   * Close the CteDataReader and release resources.
+   *
+   * @throws IoTDBException the error occurs when closing fileChannel
+   */
+  void close() throws IoTDBException;
+
+  /**
+   * Get the bytes used by this CteDataReader.
+   *
+   * @return the bytes used by this CteDataReader
+   */
+  long bytesUsed();

Review Comment:
   ```suggestion
     long ramBytesUsed();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

