Sh-Zh-7 commented on code in PR #15151:
URL: https://github.com/apache/iotdb/pull/15151#discussion_r2086473231


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java:
##########
@@ -275,6 +302,393 @@ public RelationPlan plan(QuerySpecification node) {
         builder.getRoot(), analysis.getScope(node), computeOutputs(builder, 
outputs), outerContext);
   }
 
+  private PlanBuilder planWindowFunctions(
+      Node node, PlanBuilder subPlan, List<FunctionCall> windowFunctions) {
+    if (windowFunctions.isEmpty()) {
+      return subPlan;
+    }
+
+    Map<Analysis.ResolvedWindow, List<FunctionCall>> functions =
+        scopeAwareDistinct(subPlan, windowFunctions).stream()
+            .collect(Collectors.groupingBy(analysis::getWindow));
+
+    for (Map.Entry<Analysis.ResolvedWindow, List<FunctionCall>> entry : 
functions.entrySet()) {
+      Analysis.ResolvedWindow window = entry.getKey();
+      List<FunctionCall> functionCalls = entry.getValue();
+
+      // Pre-project inputs.
+      // Predefined window parts (specified in WINDOW clause) can only use 
source symbols, and no
+      // output symbols.
+      // It matters in case when this window planning takes place in ORDER BY 
clause, where both
+      // source and output
+      // symbols are visible.
+      // This issue is solved by analyzing window definitions in the source 
scope. After analysis,
+      // the expressions
+      // are recorded as belonging to the source scope, and consequentially 
source symbols will be
+      // used to plan them.
+      ImmutableList.Builder<Expression> inputsBuilder =
+          ImmutableList.<Expression>builder()
+              .addAll(window.getPartitionBy())
+              .addAll(
+                  getSortItemsFromOrderBy(window.getOrderBy()).stream()
+                      .map(SortItem::getSortKey)
+                      .iterator());
+
+      if (window.getFrame().isPresent()) {
+        WindowFrame frame = window.getFrame().get();
+        frame.getStart().getValue().ifPresent(inputsBuilder::add);
+
+        if (frame.getEnd().isPresent()) {
+          frame.getEnd().get().getValue().ifPresent(inputsBuilder::add);
+        }
+      }
+
+      for (FunctionCall windowFunction : functionCalls) {
+        inputsBuilder.addAll(new ArrayList<>(windowFunction.getArguments()));
+      }
+
+      List<Expression> inputs = inputsBuilder.build();
+
+      subPlan = subqueryPlanner.handleSubqueries(subPlan, inputs, 
analysis.getSubqueries(node));
+      subPlan = subPlan.appendProjections(inputs, symbolAllocator, 
queryContext);
+
+      // Add projection to coerce inputs to their site-specific types.
+      // This is important because the same lexical expression may need to be 
coerced
+      // in different ways if it's referenced by multiple arguments to the 
window function.
+      // For example, given v::integer,
+      //    avg(v) OVER (ORDER BY v)
+      // Needs to be rewritten as
+      //    avg(CAST(v AS double)) OVER (ORDER BY v)
+      PlanAndMappings coercions = coerce(subPlan, inputs, analysis, 
idAllocator, symbolAllocator);
+      subPlan = coercions.getSubPlan();
+
+      // For frame of type RANGE, append casts and functions necessary for 
frame bound calculations
+      Optional<Symbol> frameStart = Optional.empty();
+      Optional<Symbol> frameEnd = Optional.empty();
+      Optional<Symbol> sortKeyCoercedForFrameStartComparison = 
Optional.empty();
+      Optional<Symbol> sortKeyCoercedForFrameEndComparison = Optional.empty();
+
+      if (window.getFrame().isPresent() && window.getFrame().get().getType() 
== RANGE) {
+        Optional<Expression> startValue = 
window.getFrame().get().getStart().getValue();
+        Optional<Expression> endValue =
+            window.getFrame().get().getEnd().flatMap(FrameBound::getValue);
+        // record sortKey coercions for reuse
+        Map<Type, Symbol> sortKeyCoercions = new HashMap<>();
+
+        // process frame start
+        FrameBoundPlanAndSymbols plan =
+            planFrameBound(subPlan, coercions, startValue, window, 
sortKeyCoercions);
+        subPlan = plan.getSubPlan();
+        frameStart = plan.getFrameBoundSymbol();
+        sortKeyCoercedForFrameStartComparison = 
plan.getSortKeyCoercedForFrameBoundComparison();
+
+        // process frame end
+        plan = planFrameBound(subPlan, coercions, endValue, window, 
sortKeyCoercions);
+        subPlan = plan.getSubPlan();
+        frameEnd = plan.getFrameBoundSymbol();
+        sortKeyCoercedForFrameEndComparison = 
plan.getSortKeyCoercedForFrameBoundComparison();
+      } else if (window.getFrame().isPresent()
+          && (window.getFrame().get().getType() == ROWS
+              || window.getFrame().get().getType() == GROUPS)) {
+        Optional<Expression> startValue = 
window.getFrame().get().getStart().getValue();
+        Optional<Expression> endValue =
+            window.getFrame().get().getEnd().flatMap(FrameBound::getValue);
+
+        // process frame start
+        FrameOffsetPlanAndSymbol plan = planFrameOffset(subPlan, 
startValue.map(coercions::get));
+        subPlan = plan.getSubPlan();
+        frameStart = plan.getFrameOffsetSymbol();
+
+        // process frame end
+        plan = planFrameOffset(subPlan, endValue.map(coercions::get));
+        subPlan = plan.getSubPlan();
+        frameEnd = plan.getFrameOffsetSymbol();
+      } else if (window.getFrame().isPresent()) {
+        throw new IllegalArgumentException(
+            "unexpected window frame type: " + 
window.getFrame().get().getType());
+      }
+
+      subPlan =
+          planWindow(
+              subPlan,
+              functionCalls,
+              window,
+              coercions,
+              frameStart,
+              sortKeyCoercedForFrameStartComparison,
+              frameEnd,
+              sortKeyCoercedForFrameEndComparison);
+    }
+
+    return subPlan;
+  }
+
+  private PlanBuilder planWindow(
+      PlanBuilder subPlan,
+      List<FunctionCall> windowFunctions,
+      Analysis.ResolvedWindow window,
+      PlanAndMappings coercions,
+      Optional<Symbol> frameStartSymbol,
+      Optional<Symbol> sortKeyCoercedForFrameStartComparison,
+      Optional<Symbol> frameEndSymbol,
+      Optional<Symbol> sortKeyCoercedForFrameEndComparison) {
+    WindowFrame.Type frameType = WindowFrame.Type.RANGE;
+    FrameBound.Type frameStartType = FrameBound.Type.UNBOUNDED_PRECEDING;
+    FrameBound.Type frameEndType = FrameBound.Type.CURRENT_ROW;
+
+    Optional<Expression> frameStartExpression = Optional.empty();
+    Optional<Expression> frameEndExpression = Optional.empty();
+
+    if (window.getFrame().isPresent()) {
+      WindowFrame frame = window.getFrame().get();
+      frameType = frame.getType();
+
+      frameStartType = frame.getStart().getType();
+      frameStartExpression = frame.getStart().getValue();
+
+      if (frame.getEnd().isPresent()) {
+        frameEndType = frame.getEnd().get().getType();
+        frameEndExpression = frame.getEnd().get().getValue();
+      }
+    }
+
+    DataOrganizationSpecification specification =
+        planWindowSpecification(window.getPartitionBy(), window.getOrderBy(), 
coercions::get);
+
+    // Rewrite frame bounds in terms of pre-projected inputs
+    WindowNode.Frame frame =
+        new WindowNode.Frame(
+            frameType,
+            frameStartType,
+            frameStartSymbol,
+            sortKeyCoercedForFrameStartComparison,
+            frameEndType,
+            frameEndSymbol,
+            sortKeyCoercedForFrameEndComparison,
+            frameStartExpression,
+            frameEndExpression);
+
+    ImmutableMap.Builder<ScopeAware<Expression>, Symbol> mappings = 
ImmutableMap.builder();
+    ImmutableMap.Builder<Symbol, WindowNode.Function> functions = 
ImmutableMap.builder();
+
+    for (FunctionCall windowFunction : windowFunctions) {
+      Symbol newSymbol =
+          symbolAllocator.newSymbol(windowFunction, 
analysis.getType(windowFunction));
+
+      FunctionCall.NullTreatment nullTreatment =
+          
windowFunction.getNullTreatment().orElse(FunctionCall.NullTreatment.RESPECT);
+
+      WindowNode.Function function =
+          new WindowNode.Function(
+              analysis.getResolvedFunction(windowFunction),
+              windowFunction.getArguments().stream()
+                  .map(argument -> coercions.get(argument).toSymbolReference())
+                  .collect(toImmutableList()),
+              frame,
+              // TODO: remove ignore null

Review Comment:
   It looks like we already support the `IGNORE NULLS` keyword, so this TODO is 
meaningless.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to