Copilot commented on code in PR #16700:
URL: https://github.com/apache/iotdb/pull/16700#discussion_r2501523778


##########
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/IntersectTest.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.analyzer;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.PlanTester;
+
+import org.junit.Test;
+
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanAssert.assertPlan;
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.aggregation;
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.filter;
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.output;
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.project;
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.sort;
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.tableScan;
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.union;
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.window;
+
+/** test to intersect (distinct) and intersect all */

Review Comment:
   The Javadoc comment is ungrammatical: "test to intersect" should read "test for intersect" or "tests for intersect".
   
   Corrected: `/** tests for intersect (distinct) and intersect all */`
   ```suggestion
   /** tests for intersect (distinct) and intersect all */
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/SetOperationNodeTranslator.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
+
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.function.BoundSignature;
+import org.apache.iotdb.db.queryengine.plan.relational.function.FunctionId;
+import org.apache.iotdb.db.queryengine.plan.relational.function.FunctionKind;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.FunctionNullability;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.ResolvedFunction;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.DataOrganizationSpecification;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GroupNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SetOperationNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.UnionNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Cast;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FrameBound;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NullLiteral;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WindowFrame;
+import org.apache.iotdb.db.utils.constant.SqlConstant;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.tsfile.read.common.type.Type;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.Iterables.concat;
+import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.singleGroupingSet;
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.Util.getResolvedBuiltInAggregateFunction;
+import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral.TRUE_LITERAL;
+import static org.apache.iotdb.db.queryengine.plan.relational.type.TypeSignatureTranslator.toSqlType;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT;
+import static org.apache.tsfile.read.common.type.BooleanType.BOOLEAN;
+import static org.apache.tsfile.read.common.type.LongType.INT64;
+
+public class SetOperationNodeTranslator {
+
+  private static final String MARKER = "marker";
+  private static final String COUNT_MARKER = "count";
+  private static final String ROW_NUMBER_SYMBOL = "row_number";
+  private final SymbolAllocator symbolAllocator;
+  private final QueryId idAllocator;
+  private final Metadata metadata;
+
+  public SetOperationNodeTranslator(
+      Metadata metadata, SymbolAllocator symbolAllocator, QueryId idAllocator) {
+
+    this.metadata = requireNonNull(metadata, "metadata is null");
+    this.symbolAllocator = requireNonNull(symbolAllocator, "symbolAllocator is null");
+    this.idAllocator = requireNonNull(idAllocator, "idAllocator is null");
+  }
+
+  /** for intersect distinct and except distinct , use true and false for markers */
+  public TranslationResult makeSetContainmentPlanForDistinct(SetOperationNode node) {
+
+    checkArgument(!(node instanceof UnionNode), "Cannot simplify a UnionNode");
+    List<Symbol> markers = allocateSymbols(node.getChildren().size(), MARKER, BOOLEAN);
+
+    // 1. add the marker column to the origin planNode
+    List<PlanNode> projectNodesWithMarkers = appendMarkers(markers, node.getChildren(), node);
+
+    // 2. add the union node over all new projection nodes.
+    // The outputs of the union must have the same name as the original intersect node
+    UnionNode union =
+        union(
+            projectNodesWithMarkers,
+            ImmutableList.copyOf(concat(node.getOutputSymbols(), markers)));
+
+    // 3. add the aggregation node above the union node
+    List<Symbol> aggregationOutputs = allocateSymbols(markers.size(), COUNT, INT64);
+    AggregationNode aggregation =
+        computeCounts(union, node.getOutputSymbols(), markers, aggregationOutputs);
+
+    return new TranslationResult(aggregation, aggregationOutputs);
+  }
+
+  /** for intersect all and except all, use true and false for markers */
+  public TranslationResult makeSetContainmentPlanForAll(SetOperationNode node) {
+
+    checkArgument(!(node instanceof UnionNode), "Cannot simplify a UnionNode");
+    List<Symbol> markers = allocateSymbols(node.getChildren().size(), MARKER, BOOLEAN);
+
+    // for every child of SetOperation node, add the marker column for the child
+    List<PlanNode> projectNodesWithMarkers = appendMarkers(markers, node.getChildren(), node);
+
+    UnionNode union =
+        union(
+            projectNodesWithMarkers,
+            ImmutableList.copyOf(concat(node.getOutputSymbols(), markers)));
+    List<Symbol> countOutputs = allocateSymbols(markers.size(), COUNT_MARKER, INT64);
+    Symbol rowNumberSymbol = symbolAllocator.newSymbol(ROW_NUMBER_SYMBOL, INT64);
+    WindowNode windowNode =
+        appendCounts(union, node.getOutputSymbols(), markers, countOutputs, rowNumberSymbol);
+
+    ProjectNode projectNode =
+        new ProjectNode(
+            idAllocator.genPlanNodeId(),
+            windowNode,
+            Assignments.identity(
+                ImmutableList.copyOf(
+                    concat(
+                        node.getOutputSymbols(),
+                        countOutputs,
+                        ImmutableList.of(rowNumberSymbol)))));
+
+    return new TranslationResult(projectNode, countOutputs, Optional.of(rowNumberSymbol));
+  }
+
+  /***
+   * only for transforming the intersection (all) node, add the window node and group node above the union node
+   */
+  private WindowNode appendCounts(
+      UnionNode union,
+      List<Symbol> originOutputSymbols,
+      List<Symbol> markers,
+      List<Symbol> countOutputs,
+      Symbol rowNumberSymbol) {
+
+    checkArgument(
+        markers.size() == countOutputs.size(),
+        "the size of markers should be same as the size of count output symbols");
+
+    // Add group node above the union node to assist partitioning, preparing for the window node
+    ImmutableMap.Builder<Symbol, SortOrder> sortOrderings = ImmutableMap.builder();
+    ImmutableList.Builder<Symbol> sortSymbolBuilder = ImmutableList.builder();
+    for (Symbol originalColumn : originOutputSymbols) {
+      sortSymbolBuilder.add(originalColumn);
+      sortOrderings.put(originalColumn, SortOrder.ASC_NULLS_LAST);
+    }
+    ImmutableList<Symbol> sortSymbols = sortSymbolBuilder.build();
+    GroupNode groupNode =
+        new GroupNode(
+            idAllocator.genPlanNodeId(),
+            union,
+            new OrderingScheme(sortSymbols, sortOrderings.build()),
+            sortSymbols.size());
+
+    // build the windowFunctions for count(marker) and row_number
+    ImmutableMap.Builder<Symbol, WindowNode.Function> windowFunctions = ImmutableMap.builder();
+    WindowNode.Frame windowFunctionFrame =
+        new WindowNode.Frame(
+            WindowFrame.Type.ROWS,
+            FrameBound.Type.UNBOUNDED_PRECEDING,
+            Optional.empty(),
+            Optional.empty(),
+            FrameBound.Type.UNBOUNDED_FOLLOWING,
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty());
+
+    ResolvedFunction countFunction =
+        getResolvedBuiltInAggregateFunction(
+            metadata, SqlConstant.COUNT, Collections.singletonList(BOOLEAN));
+    for (int i = 0; i < markers.size(); i++) {
+      windowFunctions.put(
+          countOutputs.get(i),
+          new WindowNode.Function(
+              countFunction,
+              ImmutableList.of(markers.get(i).toSymbolReference()),
+              windowFunctionFrame,
+              false));
+    }
+
+    List<Type> argumentTypes = ImmutableList.of();
+    ResolvedFunction rowNumberFunction =
+        new ResolvedFunction(
+            new BoundSignature(
+                SqlConstant.ROW_NUMBER,
+                metadata.getFunctionReturnType(SqlConstant.ROW_NUMBER, argumentTypes),
+                argumentTypes),
+            FunctionId.NOOP_FUNCTION_ID,
+            FunctionKind.WINDOW,
+            true,
+            FunctionNullability.getAggregationFunctionNullability(argumentTypes.size()));
+
+    windowFunctions.put(
+        rowNumberSymbol,
+        new WindowNode.Function(rowNumberFunction, ImmutableList.of(), windowFunctionFrame, false));
+
+    // add the windowNode above the group node
+    return new WindowNode(
+        idAllocator.genPlanNodeId(),
+        groupNode,
+        new DataOrganizationSpecification(originOutputSymbols, Optional.empty()),
+        windowFunctions.buildOrThrow(),
+        Optional.empty(),
+        ImmutableSet.of(),
+        0);
+  }
+
+  /** get an array of markers, used for the new columns */
+  private List<Symbol> allocateSymbols(int count, String nameHint, Type type) {
+    ImmutableList.Builder<Symbol> symbolsBuilder = ImmutableList.builder();
+    for (int i = 0; i < count; i++) {
+      symbolsBuilder.add(symbolAllocator.newSymbol(nameHint, type));
+    }
+    return symbolsBuilder.build();
+  }
+
+  /** build new projection node with markers */
+  private List<PlanNode> appendMarkers(
+      List<Symbol> markers, List<PlanNode> children, SetOperationNode node) {
+    ImmutableList.Builder<PlanNode> projectionsWithMarker = ImmutableList.builder();
+
+    Map<Symbol, Collection<Symbol>> symbolMapping = node.getSymbolMapping().asMap();
+    for (int i = 0; i < children.size(); i++) {
+
+      // add the original symbols to projection node
+      Assignments.Builder assignments = Assignments.builder();
+      for (Symbol outputSymbol : node.getOutputSymbols()) {
+        Collection<Symbol> inputSymbols = symbolMapping.get(outputSymbol);
+        Symbol sourceSymbol = ((ImmutableList<Symbol>) inputSymbols).get(i);

Review Comment:
   Unchecked downcast with no validation or explanation. The code casts `Collection<Symbol>` to `ImmutableList<Symbol>` without verifying the runtime type, so it throws a `ClassCastException` if `asMap()` ever returns values of a different collection type.
   
   Consider one of the following:
   1. Use `ImmutableList.copyOf(inputSymbols).get(i)`, which does not depend on the runtime type of the collection
   2. Copy into a `List` first: `new ArrayList<>(inputSymbols).get(i)`
   3. Index through the iterable: `Iterables.get(inputSymbols, i)`
   4. Keep the cast, but add a comment explaining why it is safe in this context
   ```suggestion
           Symbol sourceSymbol = ImmutableList.copyOf(inputSymbols).get(i);
   ```
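   
   To illustrate the risk, here is a minimal stand-alone sketch that uses only JDK collections. It is not the PR's code: `CollectionIndexDemo` and its methods are hypothetical names, and the non-`List` value is modelled by a `LinkedHashSet` purely as an assumption. It contrasts the downcast with option 2 above.
   
   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Collection;
   import java.util.LinkedHashSet;
   import java.util.List;

   // Hypothetical demo class; names are illustrative and not taken from the PR.
   final class CollectionIndexDemo {

     // Mirrors the pattern under review: assumes the Collection is really a List.
     static String elementByCast(Collection<String> values, int index) {
       return ((List<String>) values).get(index);
     }

     // Option 2 from the review: copy into a List, then index.
     static String elementByCopy(Collection<String> values, int index) {
       return new ArrayList<>(values).get(index);
     }

     public static void main(String[] args) {
       Collection<String> asList = Arrays.asList("a", "b", "c");
       // Assumption for illustration: an ordered collection that is not a List.
       Collection<String> asSet = new LinkedHashSet<>(asList);

       System.out.println(elementByCast(asList, 1));
       System.out.println(elementByCopy(asSet, 1));
       try {
         elementByCast(asSet, 1);
       } catch (ClassCastException e) {
         System.out.println("cast failed for " + asSet.getClass().getSimpleName());
       }
     }
   }
   ```
   
   Whether the cast can actually fail in the PR depends on the concrete type behind `node.getSymbolMapping()`, which this sketch does not model.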



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/SetOperationNodeTranslator.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
+
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.function.BoundSignature;
+import org.apache.iotdb.db.queryengine.plan.relational.function.FunctionId;
+import org.apache.iotdb.db.queryengine.plan.relational.function.FunctionKind;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.FunctionNullability;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.ResolvedFunction;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.DataOrganizationSpecification;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GroupNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SetOperationNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.UnionNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Cast;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FrameBound;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NullLiteral;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WindowFrame;
+import org.apache.iotdb.db.utils.constant.SqlConstant;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.tsfile.read.common.type.Type;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.Iterables.concat;
+import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.singleGroupingSet;
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.Util.getResolvedBuiltInAggregateFunction;
+import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral.TRUE_LITERAL;
+import static org.apache.iotdb.db.queryengine.plan.relational.type.TypeSignatureTranslator.toSqlType;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT;
+import static org.apache.tsfile.read.common.type.BooleanType.BOOLEAN;
+import static org.apache.tsfile.read.common.type.LongType.INT64;
+
+public class SetOperationNodeTranslator {
+
+  private static final String MARKER = "marker";
+  private static final String COUNT_MARKER = "count";
+  private static final String ROW_NUMBER_SYMBOL = "row_number";
+  private final SymbolAllocator symbolAllocator;
+  private final QueryId idAllocator;
+  private final Metadata metadata;
+
+  public SetOperationNodeTranslator(
+      Metadata metadata, SymbolAllocator symbolAllocator, QueryId idAllocator) {
+
+    this.metadata = requireNonNull(metadata, "metadata is null");
+    this.symbolAllocator = requireNonNull(symbolAllocator, "symbolAllocator is null");
+    this.idAllocator = requireNonNull(idAllocator, "idAllocator is null");
+  }
+
+  /** for intersect distinct and except distinct , use true and false for markers */
+  public TranslationResult makeSetContainmentPlanForDistinct(SetOperationNode node) {
+
+    checkArgument(!(node instanceof UnionNode), "Cannot simplify a UnionNode");
+    List<Symbol> markers = allocateSymbols(node.getChildren().size(), MARKER, BOOLEAN);
+
+    // 1. add the marker column to the origin planNode
+    List<PlanNode> projectNodesWithMarkers = appendMarkers(markers, node.getChildren(), node);
+
+    // 2. add the union node over all new projection nodes.
+    // The outputs of the union must have the same name as the original intersect node
+    UnionNode union =
+        union(
+            projectNodesWithMarkers,
+            ImmutableList.copyOf(concat(node.getOutputSymbols(), markers)));
+
+    // 3. add the aggregation node above the union node
+    List<Symbol> aggregationOutputs = allocateSymbols(markers.size(), COUNT, INT64);
+    AggregationNode aggregation =
+        computeCounts(union, node.getOutputSymbols(), markers, aggregationOutputs);
+
+    return new TranslationResult(aggregation, aggregationOutputs);
+  }
+
+  /** for intersect all and except all, use true and false for markers */
+  public TranslationResult makeSetContainmentPlanForAll(SetOperationNode node) {
+
+    checkArgument(!(node instanceof UnionNode), "Cannot simplify a UnionNode");
+    List<Symbol> markers = allocateSymbols(node.getChildren().size(), MARKER, BOOLEAN);
+
+    // for every child of SetOperation node, add the marker column for the child
+    List<PlanNode> projectNodesWithMarkers = appendMarkers(markers, node.getChildren(), node);
+
+    UnionNode union =
+        union(
+            projectNodesWithMarkers,
+            ImmutableList.copyOf(concat(node.getOutputSymbols(), markers)));
+    List<Symbol> countOutputs = allocateSymbols(markers.size(), COUNT_MARKER, INT64);
+    Symbol rowNumberSymbol = symbolAllocator.newSymbol(ROW_NUMBER_SYMBOL, INT64);
+    WindowNode windowNode =
+        appendCounts(union, node.getOutputSymbols(), markers, countOutputs, rowNumberSymbol);
+
+    ProjectNode projectNode =
+        new ProjectNode(
+            idAllocator.genPlanNodeId(),
+            windowNode,
+            Assignments.identity(
+                ImmutableList.copyOf(
+                    concat(
+                        node.getOutputSymbols(),
+                        countOutputs,
+                        ImmutableList.of(rowNumberSymbol)))));
+
+    return new TranslationResult(projectNode, countOutputs, Optional.of(rowNumberSymbol));
+  }
+
+  /***
+   * only for transforming the intersection (all) node, add the window node and group node above the union node
+   */
+  private WindowNode appendCounts(
+      UnionNode union,
+      List<Symbol> originOutputSymbols,
+      List<Symbol> markers,
+      List<Symbol> countOutputs,
+      Symbol rowNumberSymbol) {
+
+    checkArgument(
+        markers.size() == countOutputs.size(),
+        "the size of markers should be same as the size of count output symbols");
+
+    // Add group node above the union node to assist partitioning, preparing for the window node
+    ImmutableMap.Builder<Symbol, SortOrder> sortOrderings = ImmutableMap.builder();
+    ImmutableList.Builder<Symbol> sortSymbolBuilder = ImmutableList.builder();
+    for (Symbol originalColumn : originOutputSymbols) {
+      sortSymbolBuilder.add(originalColumn);
+      sortOrderings.put(originalColumn, SortOrder.ASC_NULLS_LAST);
+    }
+    ImmutableList<Symbol> sortSymbols = sortSymbolBuilder.build();
+    GroupNode groupNode =
+        new GroupNode(
+            idAllocator.genPlanNodeId(),
+            union,
+            new OrderingScheme(sortSymbols, sortOrderings.build()),
+            sortSymbols.size());
+
+    // build the windowFunctions for count(marker) and row_number
+    ImmutableMap.Builder<Symbol, WindowNode.Function> windowFunctions = ImmutableMap.builder();
+    WindowNode.Frame windowFunctionFrame =
+        new WindowNode.Frame(
+            WindowFrame.Type.ROWS,
+            FrameBound.Type.UNBOUNDED_PRECEDING,
+            Optional.empty(),
+            Optional.empty(),
+            FrameBound.Type.UNBOUNDED_FOLLOWING,
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty());
+
+    ResolvedFunction countFunction =
+        getResolvedBuiltInAggregateFunction(
+            metadata, SqlConstant.COUNT, Collections.singletonList(BOOLEAN));
+    for (int i = 0; i < markers.size(); i++) {
+      windowFunctions.put(
+          countOutputs.get(i),
+          new WindowNode.Function(
+              countFunction,
+              ImmutableList.of(markers.get(i).toSymbolReference()),
+              windowFunctionFrame,
+              false));
+    }
+
+    List<Type> argumentTypes = ImmutableList.of();
+    ResolvedFunction rowNumberFunction =
+        new ResolvedFunction(
+            new BoundSignature(
+                SqlConstant.ROW_NUMBER,
+                metadata.getFunctionReturnType(SqlConstant.ROW_NUMBER, argumentTypes),
+                argumentTypes),
+            FunctionId.NOOP_FUNCTION_ID,
+            FunctionKind.WINDOW,
+            true,
+            FunctionNullability.getAggregationFunctionNullability(argumentTypes.size()));
+
+    windowFunctions.put(
+        rowNumberSymbol,
+        new WindowNode.Function(rowNumberFunction, ImmutableList.of(), windowFunctionFrame, false));
+
+    // add the windowNode above the group node
+    return new WindowNode(
+        idAllocator.genPlanNodeId(),
+        groupNode,
+        new DataOrganizationSpecification(originOutputSymbols, Optional.empty()),
+        windowFunctions.buildOrThrow(),
+        Optional.empty(),
+        ImmutableSet.of(),
+        0);
+  }
+
+  /** get an array of markers, used for the new columns */

Review Comment:
   Inaccurate Javadoc. The comment says "get an array of markers", but the method returns a `List<Symbol>`, not an array.
   
   Suggested correction: `/** Allocates a list of marker symbols for new columns */`
   ```suggestion
     /** Allocates a list of marker symbols for new columns */
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/SetOperationNodeTranslator.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
+
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.function.BoundSignature;
+import org.apache.iotdb.db.queryengine.plan.relational.function.FunctionId;
+import org.apache.iotdb.db.queryengine.plan.relational.function.FunctionKind;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.FunctionNullability;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.ResolvedFunction;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.DataOrganizationSpecification;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GroupNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SetOperationNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.UnionNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Cast;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FrameBound;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NullLiteral;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WindowFrame;
+import org.apache.iotdb.db.utils.constant.SqlConstant;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.tsfile.read.common.type.Type;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.Iterables.concat;
+import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.singleGroupingSet;
+import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.Util.getResolvedBuiltInAggregateFunction;
+import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral.TRUE_LITERAL;
+import static org.apache.iotdb.db.queryengine.plan.relational.type.TypeSignatureTranslator.toSqlType;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT;
+import static org.apache.tsfile.read.common.type.BooleanType.BOOLEAN;
+import static org.apache.tsfile.read.common.type.LongType.INT64;
+
+public class SetOperationNodeTranslator {
+
+  private static final String MARKER = "marker";
+  private static final String COUNT_MARKER = "count";
+  private static final String ROW_NUMBER_SYMBOL = "row_number";
+  private final SymbolAllocator symbolAllocator;
+  private final QueryId idAllocator;
+  private final Metadata metadata;
+
+  public SetOperationNodeTranslator(
+      Metadata metadata, SymbolAllocator symbolAllocator, QueryId idAllocator) {
+
+    this.metadata = requireNonNull(metadata, "metadata is null");
+    this.symbolAllocator = requireNonNull(symbolAllocator, "symbolAllocator is null");
+    this.idAllocator = requireNonNull(idAllocator, "idAllocator is null");
+  }
+
+  /** for intersect distinct and except distinct , use true and false for markers */
+  public TranslationResult makeSetContainmentPlanForDistinct(SetOperationNode node) {
+
+    checkArgument(!(node instanceof UnionNode), "Cannot simplify a UnionNode");
+    List<Symbol> markers = allocateSymbols(node.getChildren().size(), MARKER, 
BOOLEAN);
+
+    // 1. add the marker column to the origin planNode
+    List<PlanNode> projectNodesWithMarkers = appendMarkers(markers, 
node.getChildren(), node);
+
+    // 2. add the union node over all new projection nodes.
+    // The outputs of the union must have the same name as the original 
intersect node
+    UnionNode union =
+        union(
+            projectNodesWithMarkers,
+            ImmutableList.copyOf(concat(node.getOutputSymbols(), markers)));
+
+    // 3. add the aggregation node above the union node
+    List<Symbol> aggregationOutputs = allocateSymbols(markers.size(), COUNT, 
INT64);
+    AggregationNode aggregation =
+        computeCounts(union, node.getOutputSymbols(), markers, 
aggregationOutputs);
+
+    return new TranslationResult(aggregation, aggregationOutputs);
+  }
+
+  /** for intersect all and except all, use true and false for markers */
+  public TranslationResult makeSetContainmentPlanForAll(SetOperationNode node) 
{
+
+    checkArgument(!(node instanceof UnionNode), "Cannot simplify a UnionNode");
+    List<Symbol> markers = allocateSymbols(node.getChildren().size(), MARKER, 
BOOLEAN);
+
+    // for every child of SetOperation node, add the marker column for the 
child
+    List<PlanNode> projectNodesWithMarkers = appendMarkers(markers, 
node.getChildren(), node);
+
+    UnionNode union =
+        union(
+            projectNodesWithMarkers,
+            ImmutableList.copyOf(concat(node.getOutputSymbols(), markers)));
+    List<Symbol> countOutputs = allocateSymbols(markers.size(), COUNT_MARKER, 
INT64);
+    Symbol rowNumberSymbol = symbolAllocator.newSymbol(ROW_NUMBER_SYMBOL, 
INT64);
+    WindowNode windowNode =
+        appendCounts(union, node.getOutputSymbols(), markers, countOutputs, 
rowNumberSymbol);
+
+    ProjectNode projectNode =
+        new ProjectNode(
+            idAllocator.genPlanNodeId(),
+            windowNode,
+            Assignments.identity(
+                ImmutableList.copyOf(
+                    concat(
+                        node.getOutputSymbols(),
+                        countOutputs,
+                        ImmutableList.of(rowNumberSymbol)))));
+
+    return new TranslationResult(projectNode, countOutputs, 
Optional.of(rowNumberSymbol));
+  }
+
+  /***

Review Comment:
   Non-standard Javadoc comment. Use standard Javadoc `/**` instead of `/***` 
(three asterisks). The extra asterisk is not standard and may cause 
documentation generation tools to misinterpret the comment.
   ```suggestion
     /**
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/IntersectNode.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.node;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+
+import com.google.common.collect.ListMultimap;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class IntersectNode extends SetOperationNode {
+
+  private final boolean distinct;
+
+  public IntersectNode(
+      PlanNodeId id,
+      List<PlanNode> children,
+      ListMultimap<Symbol, Symbol> outputToInputs,
+      List<Symbol> outputs,
+      boolean distinct) {
+    super(id, children, outputToInputs, outputs);
+    this.distinct = distinct;
+  }
+
+  private IntersectNode(
+      PlanNodeId id,
+      ListMultimap<Symbol, Symbol> outputToInputs,
+      List<Symbol> outputs,
+      boolean distinct) {
+    super(id, outputToInputs, outputs);
+    this.distinct = distinct;
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitIntersect(this, context);
+  }
+
+  public boolean isDistinct() {
+    return distinct;
+  }
+
+  @Override
+  public PlanNode clone() {
+    return new IntersectNode(getPlanNodeId(), getSymbolMapping(), 
getOutputSymbols(), distinct);
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    throw new UnsupportedOperationException(
+        "IntersectNode should never be serialized in current version");
+  }
+
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws 
IOException {
+    throw new UnsupportedOperationException(
+        "IntersectNode should never be serialized in current version");
+  }
+
+  public static IntersectNode deserialize(ByteBuffer byteBuffer) {
+    throw new UnsupportedOperationException(
+        "IntersectNode should never be deserialized in current version");
+  }
+

Review Comment:
   This method overrides `PlanNode.replaceChildren`; add an `@Override` annotation so the compiler checks the override.
   ```suggestion
   
     @Override
   ```
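
To illustrate why the annotation matters, here is a minimal sketch with hypothetical classes (`Node`, `Annotated` and `Misspelled` are not from the PR). Without `@Override`, a misspelled method name compiles as a new, unrelated method and the parent implementation keeps running. With the annotation, the same typo would be rejected at compile time.

```java
import java.util.List;

public class OverrideSketch {

  // Hypothetical stand-in for a plan node base class.
  static class Node {
    Node replaceChildren(List<Node> newChildren) {
      return this;
    }
  }

  static class Annotated extends Node {
    // @Override makes the compiler check that a parent method with this signature exists.
    @Override
    Node replaceChildren(List<Node> newChildren) {
      return new Annotated();
    }
  }

  static class Misspelled extends Node {
    // Typo in the name and no @Override: this compiles, but overrides nothing.
    Node replaceChildern(List<Node> newChildren) {
      return new Misspelled();
    }
  }

  public static void main(String[] args) {
    Node annotated = new Annotated();
    Node misspelled = new Misspelled();
    // The annotated subclass returns a fresh node; the misspelled one falls back to Node.
    System.out.println(annotated.replaceChildren(List.of()) != annotated); // prints true
    System.out.println(misspelled.replaceChildren(List.of()) == misspelled); // prints true
  }
}
```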



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/SetOperationNodeTranslator.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
+
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.function.BoundSignature;
+import org.apache.iotdb.db.queryengine.plan.relational.function.FunctionId;
+import org.apache.iotdb.db.queryengine.plan.relational.function.FunctionKind;
+import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.FunctionNullability;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.ResolvedFunction;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.DataOrganizationSpecification;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GroupNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SetOperationNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.UnionNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Cast;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FrameBound;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NullLiteral;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WindowFrame;
+import org.apache.iotdb.db.utils.constant.SqlConstant;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.tsfile.read.common.type.Type;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.Iterables.concat;
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.singleGroupingSet;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.Util.getResolvedBuiltInAggregateFunction;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral.TRUE_LITERAL;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.type.TypeSignatureTranslator.toSqlType;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT;
+import static org.apache.tsfile.read.common.type.BooleanType.BOOLEAN;
+import static org.apache.tsfile.read.common.type.LongType.INT64;
+
+public class SetOperationNodeTranslator {
+
+  private static final String MARKER = "marker";
+  private static final String COUNT_MARKER = "count";
+  private static final String ROW_NUMBER_SYMBOL = "row_number";
+  private final SymbolAllocator symbolAllocator;
+  private final QueryId idAllocator;
+  private final Metadata metadata;
+
+  public SetOperationNodeTranslator(
+      Metadata metadata, SymbolAllocator symbolAllocator, QueryId idAllocator) 
{
+
+    this.metadata = requireNonNull(metadata, "metadata is null");
+    this.symbolAllocator = requireNonNull(symbolAllocator, "symbolAllocator is 
null");
+    this.idAllocator = requireNonNull(idAllocator, "idAllocator is null");
+  }
+
+  /** for intersect distinct and except distinct , use true and false for 
markers */
+  public TranslationResult makeSetContainmentPlanForDistinct(SetOperationNode 
node) {
+
+    checkArgument(!(node instanceof UnionNode), "Cannot simplify a UnionNode");
+    List<Symbol> markers = allocateSymbols(node.getChildren().size(), MARKER, 
BOOLEAN);
+
+    // 1. add the marker column to the origin planNode
+    List<PlanNode> projectNodesWithMarkers = appendMarkers(markers, 
node.getChildren(), node);
+
+    // 2. add the union node over all new projection nodes.
+    // The outputs of the union must have the same name as the original 
intersect node
+    UnionNode union =
+        union(
+            projectNodesWithMarkers,
+            ImmutableList.copyOf(concat(node.getOutputSymbols(), markers)));
+
+    // 3. add the aggregation node above the union node
+    List<Symbol> aggregationOutputs = allocateSymbols(markers.size(), COUNT, 
INT64);
+    AggregationNode aggregation =
+        computeCounts(union, node.getOutputSymbols(), markers, 
aggregationOutputs);
+
+    return new TranslationResult(aggregation, aggregationOutputs);
+  }
+
+  /** for intersect all and except all, use true and false for markers */
+  public TranslationResult makeSetContainmentPlanForAll(SetOperationNode node) 
{
+
+    checkArgument(!(node instanceof UnionNode), "Cannot simplify a UnionNode");
+    List<Symbol> markers = allocateSymbols(node.getChildren().size(), MARKER, 
BOOLEAN);
+
+    // for every child of SetOperation node, add the marker column for the 
child
+    List<PlanNode> projectNodesWithMarkers = appendMarkers(markers, 
node.getChildren(), node);
+
+    UnionNode union =
+        union(
+            projectNodesWithMarkers,
+            ImmutableList.copyOf(concat(node.getOutputSymbols(), markers)));
+    List<Symbol> countOutputs = allocateSymbols(markers.size(), COUNT_MARKER, 
INT64);
+    Symbol rowNumberSymbol = symbolAllocator.newSymbol(ROW_NUMBER_SYMBOL, 
INT64);
+    WindowNode windowNode =
+        appendCounts(union, node.getOutputSymbols(), markers, countOutputs, 
rowNumberSymbol);
+
+    ProjectNode projectNode =
+        new ProjectNode(
+            idAllocator.genPlanNodeId(),
+            windowNode,
+            Assignments.identity(
+                ImmutableList.copyOf(
+                    concat(
+                        node.getOutputSymbols(),
+                        countOutputs,
+                        ImmutableList.of(rowNumberSymbol)))));
+
+    return new TranslationResult(projectNode, countOutputs, 
Optional.of(rowNumberSymbol));
+  }
+
+  /***
+   * only for transforming the intersection (all) node, add the window node 
and group node above the union node
+   */
+  private WindowNode appendCounts(
+      UnionNode union,
+      List<Symbol> originOutputSymbols,
+      List<Symbol> markers,
+      List<Symbol> countOutputs,
+      Symbol rowNumberSymbol) {
+
+    checkArgument(
+        markers.size() == countOutputs.size(),
+        "the size of markers should be same as the size of count output 
symbols");
+
+    // Add group node above the union node to assist partitioning, preparing 
for the window node
+    ImmutableMap.Builder<Symbol, SortOrder> sortOrderings = 
ImmutableMap.builder();
+    ImmutableList.Builder<Symbol> sortSymbolBuilder = ImmutableList.builder();
+    for (Symbol originalColumn : originOutputSymbols) {
+      sortSymbolBuilder.add(originalColumn);
+      sortOrderings.put(originalColumn, SortOrder.ASC_NULLS_LAST);
+    }
+    ImmutableList<Symbol> sortSymbols = sortSymbolBuilder.build();
+    GroupNode groupNode =
+        new GroupNode(
+            idAllocator.genPlanNodeId(),
+            union,
+            new OrderingScheme(sortSymbols, sortOrderings.build()),
+            sortSymbols.size());
+
+    // build the windowFunctions for count(marker) and row_number
+    ImmutableMap.Builder<Symbol, WindowNode.Function> windowFunctions = 
ImmutableMap.builder();
+    WindowNode.Frame windowFunctionFrame =
+        new WindowNode.Frame(
+            WindowFrame.Type.ROWS,
+            FrameBound.Type.UNBOUNDED_PRECEDING,
+            Optional.empty(),
+            Optional.empty(),
+            FrameBound.Type.UNBOUNDED_FOLLOWING,
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty());
+
+    ResolvedFunction countFunction =
+        getResolvedBuiltInAggregateFunction(
+            metadata, SqlConstant.COUNT, Collections.singletonList(BOOLEAN));
+    for (int i = 0; i < markers.size(); i++) {
+      windowFunctions.put(
+          countOutputs.get(i),
+          new WindowNode.Function(
+              countFunction,
+              ImmutableList.of(markers.get(i).toSymbolReference()),
+              windowFunctionFrame,
+              false));
+    }
+
+    List<Type> argumentTypes = ImmutableList.of();
+    ResolvedFunction rowNumberFunction =
+        new ResolvedFunction(
+            new BoundSignature(
+                SqlConstant.ROW_NUMBER,
+                metadata.getFunctionReturnType(SqlConstant.ROW_NUMBER, 
argumentTypes),
+                argumentTypes),
+            FunctionId.NOOP_FUNCTION_ID,
+            FunctionKind.WINDOW,
+            true,
+            
FunctionNullability.getAggregationFunctionNullability(argumentTypes.size()));
+
+    windowFunctions.put(
+        rowNumberSymbol,
+        new WindowNode.Function(rowNumberFunction, ImmutableList.of(), 
windowFunctionFrame, false));
+
+    // add the windowNode above the group node
+    return new WindowNode(
+        idAllocator.genPlanNodeId(),
+        groupNode,
+        new DataOrganizationSpecification(originOutputSymbols, 
Optional.empty()),
+        windowFunctions.buildOrThrow(),
+        Optional.empty(),
+        ImmutableSet.of(),
+        0);
+  }
+
+  /** get an array of markers, used for the new columns */
+  private List<Symbol> allocateSymbols(int count, String nameHint, Type type) {
+    ImmutableList.Builder<Symbol> symbolsBuilder = ImmutableList.builder();
+    for (int i = 0; i < count; i++) {
+      symbolsBuilder.add(symbolAllocator.newSymbol(nameHint, type));
+    }
+    return symbolsBuilder.build();
+  }
+
+  /** build new projection node with markers */

Review Comment:
   Documentation could be more descriptive. The comment "build new projection 
node with markers" should be more specific about what this method does and its 
purpose in the INTERSECT/EXCEPT implementation.
   
   Suggested: `/** Builds projection nodes with marker columns for each child 
of the set operation. Each child gets TRUE for its own marker and NULL (cast to 
BOOLEAN) for others. */`
   ```suggestion
     /**
      * Builds projection nodes with marker columns for each child of the set 
operation.
      * Each child gets TRUE for its own marker and NULL (cast to BOOLEAN) for 
others.
      * This is used in the implementation of INTERSECT and EXCEPT set 
operations.
      */
   ```
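
The marker technique described above can be sketched outside the planner. This is a plain-Java illustration and does not use the IoTDB plan node API. Each child row contributes to the count of its own marker only, rows are grouped by their column values, and a row survives INTERSECT DISTINCT when every marker count is at least 1. The final `count >= 1` filter is an assumption about the rule that consumes the translator's output, and `MarkerIntersectSketch`, `row` and `intersectDistinct` are hypothetical names. Rows are shown after coercion to a common type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MarkerIntersectSketch {

  static List<Object> row(Object... values) {
    return Arrays.asList(values);
  }

  /** Returns the distinct rows present in every child, using per-child marker counts. */
  static List<List<Object>> intersectDistinct(List<List<List<Object>>> children) {
    int n = children.size();
    // Group key = the row's column values; value = count(marker_i) for each child i.
    Map<List<Object>, long[]> counts = new LinkedHashMap<>();
    for (int child = 0; child < n; child++) {
      for (List<Object> r : children.get(child)) {
        // marker[child] is TRUE and every other marker is NULL for this row,
        // so only count(marker[child]) grows.
        counts.computeIfAbsent(r, k -> new long[n])[child]++;
      }
    }
    List<List<Object>> result = new ArrayList<>();
    for (Map.Entry<List<Object>, long[]> entry : counts.entrySet()) {
      boolean inEveryChild = true;
      for (long c : entry.getValue()) {
        if (c < 1) {
          inEveryChild = false;
        }
      }
      if (inEveryChild) {
        result.add(entry.getKey());
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<List<Object>> table1 =
        List.of(row("d1", 1L, 1.0), row("d1", 1L, 1.0), row("d1", 2L, 2.0));
    List<List<Object>> table2 =
        List.of(row("d1", 1L, 1.0), row("d1", 1L, 1.0), row("d1", 1L, 1.0), row("d1", 3L, 3.0));
    System.out.println(intersectDistinct(List.of(table1, table2))); // prints [[d1, 1, 1.0]]
  }
}
```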



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java:
##########
@@ -275,6 +277,17 @@ public LogicalOptimizeFactory(PlannerContext 
plannerContext) {
                         // new MergeExcept
                         new PruneDistinctAggregation()))
                 .build()),
+        new IterativeOptimizer(
+            plannerContext,
+            ruleStats,
+            ImmutableSet.<Rule<?>>builder()
+                .add(
+                    new ImplementIntersectDistinctAsUnion(metadata),
+                    // new ImplementExceptDistinctAsUnion(metadata)
+                    new ImplementIntersectAll(metadata)
+                    // new ImplementExceptAll(metadata))),
+                    )

Review Comment:
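   Misleading parentheses in the `.add()` call. The call opens on line 284 with `.add(` and is closed by the standalone `)` on line 289, but the commented-out line 288 ends in `))),`, which makes the call look unbalanced and invites a compilation error if the comment is ever re-enabled as written. Clean up the comment so the structure is unambiguous.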
   
   Should be:
   ```java
   .add(
       new ImplementIntersectDistinctAsUnion(metadata),
       // new ImplementExceptDistinctAsUnion(metadata)
       new ImplementIntersectAll(metadata)
       // new ImplementExceptAll(metadata)
   )
   ```
   ```suggestion
                       // new ImplementExceptAll(metadata)
                   )
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/SetOperationNodeTranslator.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule;
+
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.function.BoundSignature;
+import org.apache.iotdb.db.queryengine.plan.relational.function.FunctionId;
+import org.apache.iotdb.db.queryengine.plan.relational.function.FunctionKind;
+import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.FunctionNullability;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.ResolvedFunction;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Assignments;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.DataOrganizationSpecification;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GroupNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SetOperationNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.UnionNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Cast;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.FrameBound;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NullLiteral;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WindowFrame;
+import org.apache.iotdb.db.utils.constant.SqlConstant;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.tsfile.read.common.type.Type;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.Iterables.concat;
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.singleGroupingSet;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.Util.getResolvedBuiltInAggregateFunction;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral.TRUE_LITERAL;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.type.TypeSignatureTranslator.toSqlType;
+import static org.apache.iotdb.db.utils.constant.SqlConstant.COUNT;
+import static org.apache.tsfile.read.common.type.BooleanType.BOOLEAN;
+import static org.apache.tsfile.read.common.type.LongType.INT64;
+
+public class SetOperationNodeTranslator {
+
+  private static final String MARKER = "marker";
+  private static final String COUNT_MARKER = "count";
+  private static final String ROW_NUMBER_SYMBOL = "row_number";
+  private final SymbolAllocator symbolAllocator;
+  private final QueryId idAllocator;
+  private final Metadata metadata;
+
+  public SetOperationNodeTranslator(
+      Metadata metadata, SymbolAllocator symbolAllocator, QueryId idAllocator) 
{
+
+    this.metadata = requireNonNull(metadata, "metadata is null");
+    this.symbolAllocator = requireNonNull(symbolAllocator, "symbolAllocator is 
null");
+    this.idAllocator = requireNonNull(idAllocator, "idAllocator is null");
+  }
+
+  /** for intersect distinct and except distinct , use true and false for 
markers */
+  public TranslationResult makeSetContainmentPlanForDistinct(SetOperationNode 
node) {
+
+    checkArgument(!(node instanceof UnionNode), "Cannot simplify a UnionNode");
+    List<Symbol> markers = allocateSymbols(node.getChildren().size(), MARKER, 
BOOLEAN);
+
+    // 1. add the marker column to the origin planNode
+    List<PlanNode> projectNodesWithMarkers = appendMarkers(markers, 
node.getChildren(), node);
+
+    // 2. add the union node over all new projection nodes.
+    // The outputs of the union must have the same name as the original 
intersect node
+    UnionNode union =
+        union(
+            projectNodesWithMarkers,
+            ImmutableList.copyOf(concat(node.getOutputSymbols(), markers)));
+
+    // 3. add the aggregation node above the union node
+    List<Symbol> aggregationOutputs = allocateSymbols(markers.size(), COUNT, 
INT64);
+    AggregationNode aggregation =
+        computeCounts(union, node.getOutputSymbols(), markers, 
aggregationOutputs);
+
+    return new TranslationResult(aggregation, aggregationOutputs);
+  }
+
+  /** for intersect all and except all, use true and false for markers */

Review Comment:
   Inconsistent comment formatting. The Javadoc comments on lines 90 and 114 describe parallel methods but are not formatted alike: line 90 has a stray space before the comma, and both start with a lowercase letter. Use the same capitalization and punctuation in both.
   
   Consider:
   - Line 90: `/** For intersect distinct and except distinct, use true and 
false for markers */`
   - Line 114: `/** For intersect all and except all, use true and false for 
markers */`
   
   Or be more specific about what "use true and false for markers" means in 
each context.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java:
##########
@@ -275,6 +277,17 @@ public LogicalOptimizeFactory(PlannerContext 
plannerContext) {
                         // new MergeExcept
                         new PruneDistinctAggregation()))
                 .build()),
+        new IterativeOptimizer(
+            plannerContext,
+            ruleStats,
+            ImmutableSet.<Rule<?>>builder()
+                .add(
+                    new ImplementIntersectDistinctAsUnion(metadata),
+                    // new ImplementExceptDistinctAsUnion(metadata)
+                    new ImplementIntersectAll(metadata)
+                    // new ImplementExceptAll(metadata))),
+                    )
+                .build()),

Review Comment:
   Inconsistent commenting style. Lines 286 and 288 have commented-out code 
with single-line comments, but they're part of an active `.add()` call. This 
creates confusion about whether the comments are placeholders for future work 
or if the closing structure is intentional.
   
   Consider:
   1. Moving the commented code outside the `.add()` call for clarity
   2. Adding a TODO comment explaining these are planned features
   3. Ensuring the structure is clear after fixing the missing closing 
parenthesis issue
   ```suggestion
                       new ImplementIntersectAll(metadata)
                   )
                   .build()),
           // TODO: Add support for ImplementExceptDistinctAsUnion(metadata) 
and ImplementExceptAll(metadata) in the future.
   ```



##########
integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBIntersectTableIT.java:
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.relational.it.query.recent;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.TableClusterIT;
+import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.prepareTableData;
+import static org.apache.iotdb.db.it.utils.TestUtils.tableAssertTestFail;
+import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
+public class IoTDBIntersectTableIT {
+  protected static final String DATABASE_NAME = "test";
+  protected static final String[] createSqls =
+      new String[] {
+        "CREATE DATABASE " + DATABASE_NAME,
+        "USE " + DATABASE_NAME,
+        // table1:  ('d1', 1, 1) * 2, ('d1', 2, 2) *1
+        "create table table1(device STRING TAG, s1 INT32 FIELD, s2 INT32 
FIELD)",
+        "insert into table1 values (1, 'd1', 1, 1)",
+        "insert into table1 values (2, 'd1', 1, 1)",
+        "insert into table1 values (3, 'd1', 2, 2)",
+        // table2: ('d1', 1, 1.0) * 3, ('d1', 3, 3.0) *1
+        "create table table2(device STRING TAG, s1 INT64 FIELD, s2 DOUBLE 
FIELD)",
+        "insert into table2 values (1, 'd1', 1, 1.0)",
+        "insert into table2 values (2, 'd1', 1, 1.0)",
+        "insert into table2 values (3, 'd1', 1, 1.0)",
+        "insert into table2 values (4, 'd1', 3, 3.0)",
+        // table3: use for testing alias
+        "create table table3(device STRING TAG, s1_testName INT64 FIELD, 
s2_testName DOUBLE FIELD)",
+        "insert into table3 values (1, 'd1', 1, 1.0)",
+        "insert into table3 values (2, 'd1', 1, 1.0)",
+        "insert into table3 values (3, 'd1', 1, 1.0)",
+        "insert into table3 values (4, 'd1', 3, 3.0)",
+        // table4: test type compatible
+        "create table table4(device STRING TAG, s1 TEXT FIELD, s2 DOUBLE 
FIELD)"
+      };
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv().initClusterEnvironment();
+    prepareTableData(createSqls);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  @Test
+  public void normalTest() {
+    String[] expectedHeader = new String[] {"device", "s1", "s2"};
+
+    // --- INTERSECT (DISTINCT) ---
+    // table1 and table2, expected one tuple : ('d1', 1, 1.0)
+    String[] retArray =
+        new String[] {
+          "d1,1,1.0,",
+        };
+    tableResultSetEqualTest(
+        "select device, s1, s2 from table1 intersect select device, s1, s2 
from table2",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+    tableResultSetEqualTest(
+        "select device, s1, s2 from table1 intersect distinct select device, s1, s2 from table2",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    // --- INTERSECT ALL ---
+    // (1, 1.0) show twice in table1, show three times in table2

Review Comment:
   Grammatical error in the comment: "show twice" and "show three times" should use the third-person singular "shows" or, more naturally, "appears".
   
   Corrected: `// (1, 1.0) shows twice in table1, shows three times in table2`
   or: `// (1, 1.0) appears twice in table1, appears three times in table2`
   ```suggestion
       // (1, 1.0) shows twice in table1, shows three times in table2
   ```
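
   For context, the comment describes the multiset semantics being tested: under standard SQL, a row that occurs m times on the left and n times on the right is returned min(m, n) times by INTERSECT ALL and once by INTERSECT (DISTINCT). A minimal sketch of that rule follows. The class and method names are hypothetical and not part of IoTDB, and the rows are assumed to be already coerced to a common string form such as `d1,1,1.0`:

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   // Hypothetical illustration only; this is not how IoTDB executes INTERSECT.
   class IntersectSketch {

     // Counts how often each row occurs, preserving first-seen order.
     static Map<String, Integer> countRows(List<String> rows) {
       Map<String, Integer> counts = new LinkedHashMap<>();
       for (String row : rows) {
         counts.merge(row, 1, Integer::sum);
       }
       return counts;
     }

     // INTERSECT ALL: emit each common row min(leftCount, rightCount) times.
     static List<String> intersectAll(List<String> left, List<String> right) {
       Map<String, Integer> rightCounts = countRows(right);
       List<String> result = new ArrayList<>();
       for (Map.Entry<String, Integer> e : countRows(left).entrySet()) {
         int copies = Math.min(e.getValue(), rightCounts.getOrDefault(e.getKey(), 0));
         for (int i = 0; i < copies; i++) {
           result.add(e.getKey());
         }
       }
       return result;
     }

     // INTERSECT (DISTINCT): emit each common row exactly once.
     static List<String> intersectDistinct(List<String> left, List<String> right) {
       Map<String, Integer> rightCounts = countRows(right);
       List<String> result = new ArrayList<>();
       for (String row : countRows(left).keySet()) {
         if (rightCounts.containsKey(row)) {
           result.add(row);
         }
       }
       return result;
     }

     public static void main(String[] args) {
       // Rows mirror table1 and table2 from the test, assumed coerced to DOUBLE for s2.
       List<String> table1 = Arrays.asList("d1,1,1.0", "d1,1,1.0", "d1,2,2.0");
       List<String> table2 = Arrays.asList("d1,1,1.0", "d1,1,1.0", "d1,1,1.0", "d1,3,3.0");
       System.out.println(intersectDistinct(table1, table2));
       System.out.println(intersectAll(table1, table2));
     }
   }
   ```

   With the test data above, the row `d1,1,1.0` occurs twice in table1 and three times in table2, so the sketch yields one copy for INTERSECT and two copies for INTERSECT ALL.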



##########
iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4:
##########
@@ -930,8 +930,9 @@ rowCount
     ;
 
 queryTerm
-    : queryPrimary                                                                                #queryTermDefault
-    | left=queryTerm operator=(INTERSECT | UNION | EXCEPT) setQuantifier? right=queryTerm         #setOperation
+    : queryPrimary                                                             #queryTermDefault
+    | left=queryTerm operator=INTERSECT setQuantifier? right=queryTerm         #setOperation
+    | left=queryTerm operator=(UNION | EXCEPT) setQuantifier? right=queryTerm  #setOperation

Review Comment:
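   The grammar may not implement INTERSECT precedence as intended, so this is worth verifying. In standard SQL, INTERSECT binds more tightly than UNION and EXCEPT, so `A UNION B INTERSECT C` must parse as `A UNION (B INTERSECT C)`, not as `(A UNION B) INTERSECT C`. The previous single alternative `(INTERSECT | UNION | EXCEPT)` gave all three operators the same precedence and would produce the second form. In ANTLR 4, alternatives of a left-recursive rule that are listed earlier bind more tightly, so placing the INTERSECT alternative before `(UNION | EXCEPT)` should yield the standard form, but the precedence then depends on alternative order alone.
   
   To make the precedence explicit, consider restructuring the grammar with a separate rule level: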
   ```
   queryTerm
       : intersectTerm ((UNION | EXCEPT) setQuantifier? intersectTerm)*
       ;
   
   intersectTerm
       : queryPrimary (INTERSECT setQuantifier? queryPrimary)*
       ;
   ```
   
   Or ensure the test expectations in `setOperationPriority()` match the actual behavior.
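
   To illustrate the grouping that the two-level structure produces, here is a minimal hand-written recursive-descent sketch in Java. It is not the ANTLR-generated parser: the class and method names are hypothetical, operands are assumed to be single whitespace-separated tokens, and `setQuantifier` is omitted for brevity.

   ```java
   import java.util.Arrays;
   import java.util.List;

   // Hypothetical sketch mirroring the queryTerm / intersectTerm rule levels.
   class SetOpPrecedenceSketch {
     private final List<String> tokens;
     private int pos = 0;

     private SetOpPrecedenceSketch(String query) {
       this.tokens = Arrays.asList(query.trim().split("\\s+"));
     }

     // Returns the query with explicit parentheses showing how it was grouped.
     static String parse(String query) {
       SetOpPrecedenceSketch parser = new SetOpPrecedenceSketch(query);
       String tree = parser.queryTerm();
       if (parser.pos != parser.tokens.size()) {
         throw new IllegalArgumentException("unexpected token: " + parser.tokens.get(parser.pos));
       }
       return tree;
     }

     // queryTerm : intersectTerm ((UNION | EXCEPT) intersectTerm)*
     private String queryTerm() {
       String left = intersectTerm();
       while (peek("UNION") || peek("EXCEPT")) {
         String operator = tokens.get(pos++).toUpperCase();
         left = "(" + left + " " + operator + " " + intersectTerm() + ")";
       }
       return left;
     }

     // intersectTerm : queryPrimary (INTERSECT queryPrimary)*
     private String intersectTerm() {
       String left = queryPrimary();
       while (peek("INTERSECT")) {
         pos++;
         left = "(" + left + " INTERSECT " + queryPrimary() + ")";
       }
       return left;
     }

     // queryPrimary is reduced to a single operand token in this sketch.
     private String queryPrimary() {
       if (pos >= tokens.size()) {
         throw new IllegalArgumentException("unexpected end of input");
       }
       return tokens.get(pos++);
     }

     private boolean peek(String keyword) {
       return pos < tokens.size() && tokens.get(pos).equalsIgnoreCase(keyword);
     }

     public static void main(String[] args) {
       System.out.println(parse("A UNION B INTERSECT C"));
       System.out.println(parse("A INTERSECT B UNION C"));
     }
   }
   ```

   Because `intersectTerm` is consumed in full before the UNION/EXCEPT loop continues, `A UNION B INTERSECT C` groups as `(A UNION (B INTERSECT C))`, while operators on the same level stay left-associative.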



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
