JackieTien97 commented on code in PR #15151:
URL: https://github.com/apache/iotdb/pull/15151#discussion_r2081162263
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/FilterAndProjectOperator.java:
##########
@@ -170,6 +170,7 @@ private TsBlock getFilterTsBlock(TsBlock input) {
if (!hasNonMappableUDF) {
// get result of calculated common sub expressions
for (ColumnTransformer columnTransformer : commonTransformerList) {
+ columnTransformer.tryEvaluate();
Review Comment:
Why add this new line? Was there a bug before? If so, please add an
integration test (IT) for it.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/function/value/LagFunction.java:
##########
@@ -63,13 +74,44 @@ public void transform(
} else {
builder.appendNull();
}
- } else if (defaultVal != null) {
- builder.writeObject(defaultVal);
+ } else if (defaultValChannel >= 0) {
+ writeDefaultValue(partition, defaultValChannel, index, builder);
} else {
builder.appendNull();
}
}
+ private void writeDefaultValue(
+ Partition partition, int defaultValChannel, int index, ColumnBuilder
builder) {
+ TSDataType dataType = builder.getDataType();
+ switch (dataType) {
+ case INT32:
+ case DATE:
+ builder.writeInt(partition.getInt(defaultValChannel, index));
+ return;
+ case INT64:
+ case TIMESTAMP:
+ builder.writeLong(partition.getLong(defaultValChannel, index));
+ return;
+ case FLOAT:
+ builder.writeFloat(partition.getFloat(defaultValChannel, index));
+ return;
+ case DOUBLE:
+ builder.writeDouble(partition.getDouble(defaultValChannel, index));
+ return;
+ case BOOLEAN:
+ builder.writeBoolean(partition.getBoolean(defaultValChannel, index));
+ return;
+ case TEXT:
+ case STRING:
Review Comment:
What about `BLOB`?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/function/value/LeadFunction.java:
##########
@@ -65,13 +75,44 @@ public void transform(
} else {
builder.appendNull();
}
- } else if (defaultVal != null) {
- builder.writeObject(defaultVal);
+ } else if (defaultValChannel >= 0) {
+ writeDefaultValue(partition, defaultValChannel, index, builder);
} else {
builder.appendNull();
}
}
+ private void writeDefaultValue(
+ Partition partition, int defaultValChannel, int index, ColumnBuilder
builder) {
+ TSDataType dataType = builder.getDataType();
+ switch (dataType) {
+ case INT32:
+ case DATE:
+ builder.writeInt(partition.getInt(defaultValChannel, index));
+ return;
+ case INT64:
+ case TIMESTAMP:
+ builder.writeLong(partition.getLong(defaultValChannel, index));
+ return;
+ case FLOAT:
+ builder.writeFloat(partition.getFloat(defaultValChannel, index));
+ return;
+ case DOUBLE:
+ builder.writeDouble(partition.getDouble(defaultValChannel, index));
+ return;
+ case BOOLEAN:
+ builder.writeBoolean(partition.getBoolean(defaultValChannel, index));
+ return;
+ case TEXT:
+ case STRING:
Review Comment:
What about `BLOB`?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java:
##########
@@ -94,10 +103,19 @@
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.groupingSets;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.singleAggregation;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.singleGroupingSet;
+import static
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral.TRUE_LITERAL;
+import static
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression.Operator.GREATER_THAN_OR_EQUAL;
+import static
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WindowFrame.Type.GROUPS;
+import static
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WindowFrame.Type.RANGE;
+import static
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WindowFrame.Type.ROWS;
+import static
org.apache.iotdb.db.queryengine.plan.relational.type.TypeSignatureTranslator.toSqlType;
+import static
org.apache.iotdb.db.queryengine.plan.relational.utils.NodeUtils.getSortItemsFromOrderBy;
+import static org.apache.tsfile.read.common.type.BooleanType.BOOLEAN;
public class QueryPlanner {
private final Analysis analysis;
private final SymbolAllocator symbolAllocator;
+ private final QueryId idAllocator;
Review Comment:
Why not use `queryIdAllocator`?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java:
##########
@@ -275,6 +302,393 @@ public RelationPlan plan(QuerySpecification node) {
builder.getRoot(), analysis.getScope(node), computeOutputs(builder,
outputs), outerContext);
}
+ private PlanBuilder planWindowFunctions(
+ Node node, PlanBuilder subPlan, List<FunctionCall> windowFunctions) {
+ if (windowFunctions.isEmpty()) {
+ return subPlan;
+ }
+
+ Map<Analysis.ResolvedWindow, List<FunctionCall>> functions =
+ scopeAwareDistinct(subPlan, windowFunctions).stream()
+ .collect(Collectors.groupingBy(analysis::getWindow));
+
+ for (Map.Entry<Analysis.ResolvedWindow, List<FunctionCall>> entry :
functions.entrySet()) {
+ Analysis.ResolvedWindow window = entry.getKey();
+ List<FunctionCall> functionCalls = entry.getValue();
+
+ // Pre-project inputs.
+ // Predefined window parts (specified in WINDOW clause) can only use
source symbols, and no
+ // output symbols.
+ // It matters in case when this window planning takes place in ORDER BY
clause, where both
+ // source and output
+ // symbols are visible.
+ // This issue is solved by analyzing window definitions in the source
scope. After analysis,
+ // the expressions
+ // are recorded as belonging to the source scope, and consequentially
source symbols will be
+ // used to plan them.
+ ImmutableList.Builder<Expression> inputsBuilder =
+ ImmutableList.<Expression>builder()
+ .addAll(window.getPartitionBy())
+ .addAll(
+ getSortItemsFromOrderBy(window.getOrderBy()).stream()
+ .map(SortItem::getSortKey)
+ .iterator());
+
+ if (window.getFrame().isPresent()) {
+ WindowFrame frame = window.getFrame().get();
+ frame.getStart().getValue().ifPresent(inputsBuilder::add);
+
+ if (frame.getEnd().isPresent()) {
+ frame.getEnd().get().getValue().ifPresent(inputsBuilder::add);
+ }
+ }
+
+ for (FunctionCall windowFunction : functionCalls) {
+ inputsBuilder.addAll(new ArrayList<>(windowFunction.getArguments()));
+ }
+
+ List<Expression> inputs = inputsBuilder.build();
+
+ subPlan = subqueryPlanner.handleSubqueries(subPlan, inputs,
analysis.getSubqueries(node));
+ subPlan = subPlan.appendProjections(inputs, symbolAllocator,
queryContext);
+
+ // Add projection to coerce inputs to their site-specific types.
+ // This is important because the same lexical expression may need to be
coerced
+ // in different ways if it's referenced by multiple arguments to the
window function.
+ // For example, given v::integer,
+ // avg(v) OVER (ORDER BY v)
+ // Needs to be rewritten as
+ // avg(CAST(v AS double)) OVER (ORDER BY v)
+ PlanAndMappings coercions = coerce(subPlan, inputs, analysis,
idAllocator, symbolAllocator);
+ subPlan = coercions.getSubPlan();
+
+ // For frame of type RANGE, append casts and functions necessary for
frame bound calculations
+ Optional<Symbol> frameStart = Optional.empty();
+ Optional<Symbol> frameEnd = Optional.empty();
+ Optional<Symbol> sortKeyCoercedForFrameStartComparison =
Optional.empty();
+ Optional<Symbol> sortKeyCoercedForFrameEndComparison = Optional.empty();
+
+ if (window.getFrame().isPresent() && window.getFrame().get().getType()
== RANGE) {
+ Optional<Expression> startValue =
window.getFrame().get().getStart().getValue();
+ Optional<Expression> endValue =
+ window.getFrame().get().getEnd().flatMap(FrameBound::getValue);
+ // record sortKey coercions for reuse
+ Map<Type, Symbol> sortKeyCoercions = new HashMap<>();
+
+ // process frame start
+ FrameBoundPlanAndSymbols plan =
+ planFrameBound(subPlan, coercions, startValue, window,
sortKeyCoercions);
+ subPlan = plan.getSubPlan();
+ frameStart = plan.getFrameBoundSymbol();
+ sortKeyCoercedForFrameStartComparison =
plan.getSortKeyCoercedForFrameBoundComparison();
+
+ // process frame end
+ plan = planFrameBound(subPlan, coercions, endValue, window,
sortKeyCoercions);
+ subPlan = plan.getSubPlan();
+ frameEnd = plan.getFrameBoundSymbol();
+ sortKeyCoercedForFrameEndComparison =
plan.getSortKeyCoercedForFrameBoundComparison();
+ } else if (window.getFrame().isPresent()
+ && (window.getFrame().get().getType() == ROWS
+ || window.getFrame().get().getType() == GROUPS)) {
+ Optional<Expression> startValue =
window.getFrame().get().getStart().getValue();
+ Optional<Expression> endValue =
+ window.getFrame().get().getEnd().flatMap(FrameBound::getValue);
+
+ // process frame start
+ FrameOffsetPlanAndSymbol plan = planFrameOffset(subPlan,
startValue.map(coercions::get));
+ subPlan = plan.getSubPlan();
+ frameStart = plan.getFrameOffsetSymbol();
+
+ // process frame end
+ plan = planFrameOffset(subPlan, endValue.map(coercions::get));
+ subPlan = plan.getSubPlan();
+ frameEnd = plan.getFrameOffsetSymbol();
+ } else if (window.getFrame().isPresent()) {
+ throw new IllegalArgumentException(
+ "unexpected window frame type: " +
window.getFrame().get().getType());
+ }
+
+ subPlan =
+ planWindow(
+ subPlan,
+ functionCalls,
+ window,
+ coercions,
+ frameStart,
+ sortKeyCoercedForFrameStartComparison,
+ frameEnd,
+ sortKeyCoercedForFrameEndComparison);
+ }
+
+ return subPlan;
+ }
+
+ private PlanBuilder planWindow(
+ PlanBuilder subPlan,
+ List<FunctionCall> windowFunctions,
+ Analysis.ResolvedWindow window,
+ PlanAndMappings coercions,
+ Optional<Symbol> frameStartSymbol,
+ Optional<Symbol> sortKeyCoercedForFrameStartComparison,
+ Optional<Symbol> frameEndSymbol,
+ Optional<Symbol> sortKeyCoercedForFrameEndComparison) {
+ WindowFrame.Type frameType = WindowFrame.Type.RANGE;
+ FrameBound.Type frameStartType = FrameBound.Type.UNBOUNDED_PRECEDING;
+ FrameBound.Type frameEndType = FrameBound.Type.CURRENT_ROW;
+
+ Optional<Expression> frameStartExpression = Optional.empty();
+ Optional<Expression> frameEndExpression = Optional.empty();
+
+ if (window.getFrame().isPresent()) {
+ WindowFrame frame = window.getFrame().get();
+ frameType = frame.getType();
+
+ frameStartType = frame.getStart().getType();
+ frameStartExpression = frame.getStart().getValue();
+
+ if (frame.getEnd().isPresent()) {
+ frameEndType = frame.getEnd().get().getType();
+ frameEndExpression = frame.getEnd().get().getValue();
+ }
+ }
+
+ DataOrganizationSpecification specification =
+ planWindowSpecification(window.getPartitionBy(), window.getOrderBy(),
coercions::get);
+
+ // Rewrite frame bounds in terms of pre-projected inputs
+ WindowNode.Frame frame =
+ new WindowNode.Frame(
+ frameType,
+ frameStartType,
+ frameStartSymbol,
+ sortKeyCoercedForFrameStartComparison,
+ frameEndType,
+ frameEndSymbol,
+ sortKeyCoercedForFrameEndComparison,
+ frameStartExpression,
+ frameEndExpression);
+
+ ImmutableMap.Builder<ScopeAware<Expression>, Symbol> mappings =
ImmutableMap.builder();
+ ImmutableMap.Builder<Symbol, WindowNode.Function> functions =
ImmutableMap.builder();
+
+ for (FunctionCall windowFunction : windowFunctions) {
+ Symbol newSymbol =
+ symbolAllocator.newSymbol(windowFunction,
analysis.getType(windowFunction));
+
+ FunctionCall.NullTreatment nullTreatment =
+
windowFunction.getNullTreatment().orElse(FunctionCall.NullTreatment.RESPECT);
+
+ WindowNode.Function function =
+ new WindowNode.Function(
+ analysis.getResolvedFunction(windowFunction),
+ windowFunction.getArguments().stream()
+ .map(argument -> coercions.get(argument).toSymbolReference())
+ .collect(toImmutableList()),
+ frame,
+ // TODO: remove ignore null
Review Comment:
We should never leave a TODO in the code; please fix it.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/ExpressionAnalyzer.java:
##########
@@ -798,16 +887,41 @@ protected Type visitFunctionCall(
}
Type type = metadata.getFunctionReturnType(functionName, argumentTypes);
+ FunctionKind functionKind = FunctionKind.SCALAR;
+ if (isAggregation) {
+ functionKind = FunctionKind.AGGREGATE;
+ } else {
+ boolean isWindow = metadata.isWindowFunction(session, functionName,
accessControl);
Review Comment:
What if an aggregate function is used in a window clause? Is it
`FunctionKind.AGGREGATE` or `FunctionKind.WINDOW`?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/utils/RowComparator.java:
##########
@@ -133,6 +135,7 @@ private boolean equal(ColumnList column, TSDataType
dataType, int offset1, int o
}
break;
case INT64:
+ case TIMESTAMP:
Review Comment:
What about `DATE` and `BLOB`?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/ExpressionAnalyzer.java:
##########
@@ -138,6 +150,8 @@ public class ExpressionAnalyzer {
private final Map<NodeRef<Node>, ResolvedFunction> resolvedFunctions = new
LinkedHashMap<>();
private final Set<NodeRef<SubqueryExpression>> subqueries = new
LinkedHashSet<>();
private final Set<NodeRef<ExistsPredicate>> existsSubqueries = new
LinkedHashSet<>();
+ private final Map<NodeRef<Expression>, Type> expressionCoercions = new
LinkedHashMap<>();
+ private final Set<NodeRef<Expression>> typeOnlyCoercions = new
LinkedHashSet<>();
Review Comment:
Why add these two fields? They are never used.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/utils/RowComparator.java:
##########
@@ -66,6 +66,7 @@ private boolean equal(Column column, TSDataType dataType, int
offset1, int offse
}
break;
case INT64:
+ case TIMESTAMP:
Review Comment:
What about `DATE` and `BLOB`?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/utils/DataOrganizationSpecification.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.utils;
+
+import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+public class DataOrganizationSpecification {
Review Comment:
This duplicates
`org.apache.iotdb.db.queryengine.plan.relational.planner.DataOrganizationSpecification`.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/ExpressionAnalyzer.java:
##########
@@ -798,16 +887,41 @@ protected Type visitFunctionCall(
}
Type type = metadata.getFunctionReturnType(functionName, argumentTypes);
+ FunctionKind functionKind = FunctionKind.SCALAR;
+ if (isAggregation) {
+ functionKind = FunctionKind.AGGREGATE;
+ } else {
+ boolean isWindow = metadata.isWindowFunction(session, functionName,
accessControl);
+ if (isWindow) {
+ functionKind = FunctionKind.WINDOW;
+ }
+ }
+ FunctionNullability functionNullability = null;
+ switch (functionKind) {
+ case AGGREGATE:
+ functionNullability =
+
FunctionNullability.getAggregationFunctionNullability(argumentTypes.size());
+ break;
+ case SCALAR:
+ functionNullability =
+
FunctionNullability.getScalarFunctionNullability(argumentTypes.size());
+ break;
+ case WINDOW:
+ functionNullability =
+
FunctionNullability.getWindowFunctionNullability(argumentTypes.size());
+ break;
+ default:
+ // ignore
+ }
+
// now we only support scalar or agg functions
Review Comment:
delete this
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]