JackieTien97 commented on code in PR #17656:
URL: https://github.com/apache/iotdb/pull/17656#discussion_r3379213492
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java:
##########
@@ -5022,9 +5023,133 @@ private ArgumentsAnalysis analyzeArguments(
analyzeDefault(parameterSpecification, errorLocation));
}
}
+ tryAppendM4ModeArgument(functionName, arguments,
parameterSpecifications, passedArguments);
return new ArgumentsAnalysis(passedArguments.buildOrThrow(),
tableArgumentAnalyses.build());
}
+ private void tryAppendM4ModeArgument(
+ String functionName,
+ List<TableFunctionArgument> arguments,
+ List<ParameterSpecification> parameterSpecifications,
+ ImmutableMap.Builder<String, Argument> passedArguments) {
+ if
(!TableBuiltinTableFunction.M4.getFunctionName().equalsIgnoreCase(functionName))
{
+ return;
+ }
+
+ TableFunctionArgument sizeArgument =
+ findTableFunctionArgument(
+ arguments, parameterSpecifications,
M4TableFunction.SIZE_PARAMETER_NAME);
+ if (!(sizeArgument.getValue() instanceof Expression)) {
+ throw new SemanticException(
+ String.format(
+ "Invalid argument %s. Expected scalar argument, got table",
+ M4TableFunction.SIZE_PARAMETER_NAME));
+ }
+
+ boolean isTimeWindow = sizeArgument.getValue() instanceof
TimeDurationLiteral;
Review Comment:
这里只根据 SIZE 是否为 `TimeDurationLiteral` 推断窗口模式,但没有校验 SLIDE 的原始参数类型。按需求,时间窗口模式下
SLIDE 应为时间长度,点数窗口模式下 SLIDE 应为整数;当前 `SIZE => 10ms, SLIDE => 5` 或 `SIZE => 10,
SLIDE => 5ms` 都会被统一转成 long 后通过分析,语义会混用。建议在这里同时检查 SLIDE 的 AST 类型并拒绝和 SIZE
模式不一致的调用。
##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/M4TableFunction.java:
##########
@@ -0,0 +1,729 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.udf.builtin.relational.tvf;
+
+import org.apache.iotdb.commons.exception.SemanticException;
+import org.apache.iotdb.udf.api.exception.UDFException;
+import org.apache.iotdb.udf.api.relational.TableFunction;
+import org.apache.iotdb.udf.api.relational.access.Record;
+import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle;
+import
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
+import org.apache.iotdb.udf.api.relational.table.argument.Argument;
+import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
+import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument;
+import org.apache.iotdb.udf.api.relational.table.argument.TableArgument;
+import
org.apache.iotdb.udf.api.relational.table.processor.TableFunctionDataProcessor;
+import
org.apache.iotdb.udf.api.relational.table.specification.ParameterSpecification;
+import
org.apache.iotdb.udf.api.relational.table.specification.ScalarParameterSpecification;
+import
org.apache.iotdb.udf.api.relational.table.specification.TableParameterSpecification;
+import org.apache.iotdb.udf.api.type.Type;
+
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.utils.Binary;
+
+import java.time.LocalDate;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static
org.apache.iotdb.commons.udf.builtin.relational.tvf.WindowTVFUtils.findColumnIndex;
+import static
org.apache.iotdb.udf.api.relational.table.argument.ScalarArgumentChecker.POSITIVE_LONG_CHECKER;
+
+public class M4TableFunction implements TableFunction {
+
+ public static final String DATA_PARAMETER_NAME = "DATA";
+ public static final String TIMECOL_PARAMETER_NAME = "TIMECOL";
+ public static final String SIZE_PARAMETER_NAME = "SIZE";
+ public static final String SLIDE_PARAMETER_NAME = "SLIDE";
+ public static final String ORIGIN_PARAMETER_NAME = "ORIGIN";
+ public static final String WINDOW_MODE_PARAMETER_NAME = "__M4_WINDOW_MODE";
+
+ private static final String OUTPUT_WINDOW_START_COLUMN = "window_start";
+ private static final String OUTPUT_WINDOW_END_COLUMN = "window_end";
+ private static final String PARTICIPANT_TYPES_PROPERTY =
"__M4_PARTICIPANT_TYPES";
+ private static final long UNSPECIFIED_SLIDE = Long.MIN_VALUE;
+ private static final long INVALID_INDEX = -1;
+
+ @Override
+ public List<ParameterSpecification> getArgumentsSpecifications() {
+ return Arrays.asList(
+
TableParameterSpecification.builder().name(DATA_PARAMETER_NAME).setSemantics().build(),
+ ScalarParameterSpecification.builder()
+ .name(TIMECOL_PARAMETER_NAME)
+ .type(Type.STRING)
+ .build(),
+ ScalarParameterSpecification.builder()
+ .name(SIZE_PARAMETER_NAME)
+ .type(Type.INT64)
+ .addChecker(POSITIVE_LONG_CHECKER)
+ .build(),
+ ScalarParameterSpecification.builder()
+ .name(SLIDE_PARAMETER_NAME)
+ .type(Type.INT64)
+ .defaultValue(UNSPECIFIED_SLIDE)
+ .build(),
+ ScalarParameterSpecification.builder()
+ .name(ORIGIN_PARAMETER_NAME)
+ .type(Type.TIMESTAMP)
+ .defaultValue(0L)
+ .build());
+ }
+
+ @Override
+ public TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws
UDFException {
+ TableArgument tableArgument = (TableArgument)
arguments.get(DATA_PARAMETER_NAME);
+ if (tableArgument.getOrderBy().isEmpty()) {
+ throw new SemanticException("Table argument with set semantics requires
an ORDER BY clause.");
+ }
+
+ String timeColumn =
+ (String) ((ScalarArgument)
arguments.get(TIMECOL_PARAMETER_NAME)).getValue();
+ int timeColumnIndex =
+ findColumnIndex(tableArgument, timeColumn,
Collections.singleton(Type.TIMESTAMP));
+ validateOrderBy(tableArgument, timeColumn);
+
+ List<Integer> partitionIndexes = getPartitionIndexes(tableArgument);
+ Set<Integer> excludedIndexes = new HashSet<>(partitionIndexes);
+ excludedIndexes.add(timeColumnIndex);
+
+ List<Integer> participantIndexes = new ArrayList<>();
+ List<Type> participantTypes = new ArrayList<>();
+ DescribedSchema.Builder schemaBuilder =
Review Comment:
这里把所有动态 M4 输出列都放进 properColumnSchema,但 PTF 框架会在 proper columns 之后再追加 DATA 的
PARTITION BY pass-through 列。因此 `SELECT * FROM M4(...)` 的实际列顺序会变成 `window_start,
window_end, <dynamic columns>, <partition columns>`,和需求文档要求的 `window_start,
window_end, <partition columns>, <dynamic columns>` 不一致。现有 IT 都显式选择列名,建议补一个
`SELECT *` 的列顺序测试,并调整输出 schema/实现方式。
##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/M4TableFunction.java:
##########
@@ -0,0 +1,729 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.udf.builtin.relational.tvf;
+
+import org.apache.iotdb.commons.exception.SemanticException;
+import org.apache.iotdb.udf.api.exception.UDFException;
+import org.apache.iotdb.udf.api.relational.TableFunction;
+import org.apache.iotdb.udf.api.relational.access.Record;
+import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle;
+import
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
+import org.apache.iotdb.udf.api.relational.table.argument.Argument;
+import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
+import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument;
+import org.apache.iotdb.udf.api.relational.table.argument.TableArgument;
+import
org.apache.iotdb.udf.api.relational.table.processor.TableFunctionDataProcessor;
+import
org.apache.iotdb.udf.api.relational.table.specification.ParameterSpecification;
+import
org.apache.iotdb.udf.api.relational.table.specification.ScalarParameterSpecification;
+import
org.apache.iotdb.udf.api.relational.table.specification.TableParameterSpecification;
+import org.apache.iotdb.udf.api.type.Type;
+
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.utils.Binary;
+
+import java.time.LocalDate;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static
org.apache.iotdb.commons.udf.builtin.relational.tvf.WindowTVFUtils.findColumnIndex;
+import static
org.apache.iotdb.udf.api.relational.table.argument.ScalarArgumentChecker.POSITIVE_LONG_CHECKER;
+
+public class M4TableFunction implements TableFunction {
+
+ public static final String DATA_PARAMETER_NAME = "DATA";
+ public static final String TIMECOL_PARAMETER_NAME = "TIMECOL";
+ public static final String SIZE_PARAMETER_NAME = "SIZE";
+ public static final String SLIDE_PARAMETER_NAME = "SLIDE";
+ public static final String ORIGIN_PARAMETER_NAME = "ORIGIN";
+ public static final String WINDOW_MODE_PARAMETER_NAME = "__M4_WINDOW_MODE";
+
+ private static final String OUTPUT_WINDOW_START_COLUMN = "window_start";
+ private static final String OUTPUT_WINDOW_END_COLUMN = "window_end";
+ private static final String PARTICIPANT_TYPES_PROPERTY =
"__M4_PARTICIPANT_TYPES";
+ private static final long UNSPECIFIED_SLIDE = Long.MIN_VALUE;
+ private static final long INVALID_INDEX = -1;
+
+ @Override
+ public List<ParameterSpecification> getArgumentsSpecifications() {
+ return Arrays.asList(
+
TableParameterSpecification.builder().name(DATA_PARAMETER_NAME).setSemantics().build(),
+ ScalarParameterSpecification.builder()
+ .name(TIMECOL_PARAMETER_NAME)
+ .type(Type.STRING)
+ .build(),
+ ScalarParameterSpecification.builder()
+ .name(SIZE_PARAMETER_NAME)
+ .type(Type.INT64)
+ .addChecker(POSITIVE_LONG_CHECKER)
+ .build(),
+ ScalarParameterSpecification.builder()
+ .name(SLIDE_PARAMETER_NAME)
+ .type(Type.INT64)
+ .defaultValue(UNSPECIFIED_SLIDE)
+ .build(),
+ ScalarParameterSpecification.builder()
+ .name(ORIGIN_PARAMETER_NAME)
+ .type(Type.TIMESTAMP)
+ .defaultValue(0L)
+ .build());
+ }
+
+ @Override
+ public TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws
UDFException {
+ TableArgument tableArgument = (TableArgument)
arguments.get(DATA_PARAMETER_NAME);
+ if (tableArgument.getOrderBy().isEmpty()) {
+ throw new SemanticException("Table argument with set semantics requires
an ORDER BY clause.");
+ }
+
+ String timeColumn =
+ (String) ((ScalarArgument)
arguments.get(TIMECOL_PARAMETER_NAME)).getValue();
+ int timeColumnIndex =
+ findColumnIndex(tableArgument, timeColumn,
Collections.singleton(Type.TIMESTAMP));
+ validateOrderBy(tableArgument, timeColumn);
+
+ List<Integer> partitionIndexes = getPartitionIndexes(tableArgument);
+ Set<Integer> excludedIndexes = new HashSet<>(partitionIndexes);
+ excludedIndexes.add(timeColumnIndex);
+
+ List<Integer> participantIndexes = new ArrayList<>();
+ List<Type> participantTypes = new ArrayList<>();
+ DescribedSchema.Builder schemaBuilder =
+ new DescribedSchema.Builder()
+ .addField(OUTPUT_WINDOW_START_COLUMN, Type.TIMESTAMP)
+ .addField(OUTPUT_WINDOW_END_COLUMN, Type.TIMESTAMP);
+
+ for (int i = 0; i < tableArgument.getFieldTypes().size(); i++) {
+ if (excludedIndexes.contains(i)) {
+ continue;
+ }
+ Type type = tableArgument.getFieldTypes().get(i);
+ String columnName = tableArgument.getFieldNames().get(i).get();
+ if (!isComparableType(type)) {
+ throw new SemanticException(
+ String.format("The type of the column [%s] is not comparable.",
columnName));
+ }
+ participantIndexes.add(i);
+ participantTypes.add(type);
+ schemaBuilder.addField(columnName + "_time", Type.TIMESTAMP);
+ schemaBuilder.addField(columnName, type);
+ }
+
+ if (participantIndexes.isEmpty()) {
+ throw new SemanticException("No comparable columns found for M4
calculation.");
+ }
+
+ long size = (long) ((ScalarArgument)
arguments.get(SIZE_PARAMETER_NAME)).getValue();
+ long slide = (long) ((ScalarArgument)
arguments.get(SLIDE_PARAMETER_NAME)).getValue();
+ if (slide == UNSPECIFIED_SLIDE) {
+ slide = size;
+ } else if (slide <= 0) {
+ throw new UDFException("Invalid scalar argument SLIDE, should be a
positive value");
+ }
+
+ boolean isTimeWindow =
+ arguments.containsKey(WINDOW_MODE_PARAMETER_NAME)
+ && (boolean) ((ScalarArgument)
arguments.get(WINDOW_MODE_PARAMETER_NAME)).getValue();
+
+ MapTableFunctionHandle handle =
+ new MapTableFunctionHandle.Builder()
+ .addProperty(WINDOW_MODE_PARAMETER_NAME, isTimeWindow)
+ .addProperty(SIZE_PARAMETER_NAME, size)
+ .addProperty(SLIDE_PARAMETER_NAME, slide)
+ .addProperty(
+ ORIGIN_PARAMETER_NAME,
+ ((ScalarArgument)
arguments.get(ORIGIN_PARAMETER_NAME)).getValue())
+ .addProperty(PARTICIPANT_TYPES_PROPERTY,
joinTypes(participantTypes))
+ .build();
+
+ List<Integer> requiredColumns = new ArrayList<>();
+ requiredColumns.add(timeColumnIndex);
+ requiredColumns.addAll(participantIndexes);
+
+ return TableFunctionAnalysis.builder()
+ .properColumnSchema(schemaBuilder.build())
+ .requireRecordSnapshot(false)
+ .requiredColumns(DATA_PARAMETER_NAME, requiredColumns)
+ .handle(handle)
+ .build();
+ }
+
+ @Override
+ public TableFunctionHandle createTableFunctionHandle() {
+ return new MapTableFunctionHandle();
+ }
+
+ @Override
+ public TableFunctionProcessorProvider getProcessorProvider(
+ TableFunctionHandle tableFunctionHandle) {
+ MapTableFunctionHandle handle = (MapTableFunctionHandle)
tableFunctionHandle;
+ boolean isTimeWindow = (boolean)
handle.getProperty(WINDOW_MODE_PARAMETER_NAME);
+ long size = (long) handle.getProperty(SIZE_PARAMETER_NAME);
+ long slide = (long) handle.getProperty(SLIDE_PARAMETER_NAME);
+ long origin = (long) handle.getProperty(ORIGIN_PARAMETER_NAME);
+ Type[] participantTypes = parseTypes((String)
handle.getProperty(PARTICIPANT_TYPES_PROPERTY));
+
+ return new TableFunctionProcessorProvider() {
+ @Override
+ public TableFunctionDataProcessor getDataProcessor() {
+ M4Column[] participantColumns = createColumns(participantTypes);
+ return isTimeWindow
+ ? new TimeWindowM4DataProcessor(size, slide, origin,
participantColumns)
+ : new CountWindowM4DataProcessor(size, slide, participantColumns);
+ }
+ };
+ }
+
+ private static void validateOrderBy(TableArgument tableArgument, String
timeColumn) {
+ if (tableArgument.getOrderBy().size() != 1
+ || !tableArgument.getOrderBy().get(0).equalsIgnoreCase(timeColumn)) {
+ throw new SemanticException(
+ "The ORDER BY clause of the DATA argument must contain exactly the
time column specified by the TIMECOL argument.");
+ }
+ }
+
+ private static List<Integer> getPartitionIndexes(TableArgument
tableArgument) {
+ List<Integer> indexes = new ArrayList<>();
+ for (String partitionColumn : tableArgument.getPartitionBy()) {
+ indexes.add(
+ findColumnIndex(tableArgument, partitionColumn, new
LinkedHashSet<>(Type.allTypes())));
+ }
+ return indexes;
+ }
+
+ private static boolean isComparableType(Type type) {
+ return type != Type.BLOB && type != Type.OBJECT;
+ }
+
+ private static String joinTypes(List<Type> types) {
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < types.size(); i++) {
+ if (i > 0) {
+ builder.append(',');
+ }
+ builder.append(types.get(i).name());
+ }
+ return builder.toString();
+ }
+
+ private static Type[] parseTypes(String value) {
+ if (value.isEmpty()) {
+ return new Type[0];
+ }
+ String[] values = value.split(",");
+ Type[] types = new Type[values.length];
+ for (int i = 0; i < values.length; i++) {
+ types[i] = Type.valueOf(values[i]);
+ }
+ return types;
+ }
+
+ private static M4Column[] createColumns(Type[] types) {
+ M4Column[] columns = new M4Column[types.length];
+ for (int i = 0; i < types.length; i++) {
+ columns[i] = new M4Column(i + 1, ValueOperator.fromType(types[i]));
+ }
+ return columns;
+ }
+
+ private enum ValueOperator {
+ BOOLEAN(Type.BOOLEAN) {
+ @Override
+ Object read(Record record, int index) {
+ return record.getBoolean(index);
+ }
+
+ @Override
+ int compare(Object left, Object right) {
+ return Boolean.compare((Boolean) left, (Boolean) right);
+ }
+
+ @Override
+ void write(ColumnBuilder builder, Object value) {
+ builder.writeBoolean((Boolean) value);
+ }
+ },
+ INT32(Type.INT32) {
+ @Override
+ Object read(Record record, int index) {
+ return record.getInt(index);
+ }
+
+ @Override
+ int compare(Object left, Object right) {
+ return Integer.compare((Integer) left, (Integer) right);
+ }
+
+ @Override
+ void write(ColumnBuilder builder, Object value) {
+ builder.writeInt((Integer) value);
+ }
+ },
+ INT64(Type.INT64) {
+ @Override
+ Object read(Record record, int index) {
+ return record.getLong(index);
+ }
+
+ @Override
+ int compare(Object left, Object right) {
+ return Long.compare((Long) left, (Long) right);
+ }
+
+ @Override
+ void write(ColumnBuilder builder, Object value) {
+ builder.writeLong((Long) value);
+ }
+ },
+ FLOAT(Type.FLOAT) {
+ @Override
+ Object read(Record record, int index) {
+ return record.getFloat(index);
+ }
+
+ @Override
+ int compare(Object left, Object right) {
+ return Float.compare((Float) left, (Float) right);
+ }
+
+ @Override
+ void write(ColumnBuilder builder, Object value) {
+ builder.writeFloat((Float) value);
+ }
+ },
+ DOUBLE(Type.DOUBLE) {
+ @Override
+ Object read(Record record, int index) {
+ return record.getDouble(index);
+ }
+
+ @Override
+ int compare(Object left, Object right) {
+ return Double.compare((Double) left, (Double) right);
+ }
+
+ @Override
+ void write(ColumnBuilder builder, Object value) {
+ builder.writeDouble((Double) value);
+ }
+ },
+ TEXT(Type.TEXT) {
+ @Override
+ Object read(Record record, int index) {
+ return record.getBinary(index);
+ }
+
+ @Override
+ int compare(Object left, Object right) {
+ return ((Binary) left).compareTo((Binary) right);
+ }
+
+ @Override
+ void write(ColumnBuilder builder, Object value) {
+ builder.writeBinary((Binary) value);
+ }
+ },
+ TIMESTAMP(Type.TIMESTAMP) {
+ @Override
+ Object read(Record record, int index) {
+ return record.getLong(index);
+ }
+
+ @Override
+ int compare(Object left, Object right) {
+ return Long.compare((Long) left, (Long) right);
+ }
+
+ @Override
+ void write(ColumnBuilder builder, Object value) {
+ builder.writeLong((Long) value);
+ }
+ },
+ DATE(Type.DATE) {
+ @Override
+ Object read(Record record, int index) {
+ return record.getLocalDate(index);
+ }
+
+ @Override
+ int compare(Object left, Object right) {
+ return ((LocalDate) left).compareTo((LocalDate) right);
+ }
+
+ @Override
+ void write(ColumnBuilder builder, Object value) {
+ builder.writeObject(value);
+ }
+ },
+ STRING(Type.STRING) {
+ @Override
+ Object read(Record record, int index) {
+ return record.getBinary(index);
+ }
+
+ @Override
+ int compare(Object left, Object right) {
+ return ((Binary) left).compareTo((Binary) right);
+ }
+
+ @Override
+ void write(ColumnBuilder builder, Object value) {
+ builder.writeBinary((Binary) value);
+ }
+ };
+
+ private final Type type;
+
+ ValueOperator(Type type) {
+ this.type = type;
+ }
+
+ abstract Object read(Record record, int index);
+
+ abstract int compare(Object left, Object right);
+
+ abstract void write(ColumnBuilder builder, Object value);
+
+ static ValueOperator fromType(Type type) {
+ for (ValueOperator valueOperator : values()) {
+ if (valueOperator.type == type) {
+ return valueOperator;
+ }
+ }
+ throw new IllegalArgumentException("Unsupported M4 value type: " + type);
+ }
+ }
+
+ private static class M4Column {
+ private final int inputIndex;
+ private final ValueOperator valueOperator;
+
+ private M4Column(int inputIndex, ValueOperator valueOperator) {
+ this.inputIndex = inputIndex;
+ this.valueOperator = valueOperator;
+ }
+
+ private boolean isNull(Record record) {
+ return record.isNull(inputIndex);
+ }
+
+ private Object read(Record record) {
+ return valueOperator.read(record, inputIndex);
+ }
+
+ private void write(ColumnBuilder builder, Object value) {
+ valueOperator.write(builder, value);
+ }
+
+ private int compare(Object left, Object right) {
+ return valueOperator.compare(left, right);
+ }
+ }
+
+ private static class Candidate {
+ private long index = INVALID_INDEX;
+ private long time;
+ private Object value;
+
+ private void set(long index, long time, Object value) {
+ this.index = index;
+ this.time = time;
+ this.value = value;
+ }
+ }
+
+ private static class ColumnWindowState {
+ private final Candidate first = new Candidate();
+ private final Candidate last = new Candidate();
+ private final Candidate bottom = new Candidate();
+ private final Candidate top = new Candidate();
+
+ private void update(long rowIndex, long time, Object value, M4Column
column) {
+ if (first.index == INVALID_INDEX) {
+ first.set(rowIndex, time, value);
+ last.set(rowIndex, time, value);
+ bottom.set(rowIndex, time, value);
+ top.set(rowIndex, time, value);
+ return;
+ }
+
+ last.set(rowIndex, time, value);
+ if (column.compare(value, bottom.value) < 0) {
+ bottom.set(rowIndex, time, value);
+ }
+ if (column.compare(value, top.value) > 0) {
+ top.set(rowIndex, time, value);
+ }
+ }
+
+ private boolean hasOutput() {
+ return first.index != INVALID_INDEX;
+ }
+
+ private List<Candidate> getSortedCandidates() {
+ if (!hasOutput()) {
+ return Collections.emptyList();
+ }
+ List<Candidate> candidates = new ArrayList<>(Arrays.asList(first, last,
bottom, top));
+ Set<Long> emittedTimestamps = new HashSet<>();
+ candidates.removeIf(candidate -> !emittedTimestamps.add(candidate.time));
+ candidates.sort(Comparator.comparingLong(candidate -> candidate.time));
+ return candidates;
+ }
+ }
+
+ private abstract static class WindowState {
+ protected final ColumnWindowState[] columnStates;
+
+ private WindowState(int columnCount) {
+ columnStates = new ColumnWindowState[columnCount];
+ for (int i = 0; i < columnCount; i++) {
+ columnStates[i] = new ColumnWindowState();
+ }
+ }
+
+ protected abstract long getOutputWindowStart();
+
+ protected abstract long getOutputWindowEnd();
+ }
+
+ private static class TimeWindowState extends WindowState {
+ private final long windowStart;
+ private final long endExclusive;
+
+ private TimeWindowState(long windowStart, long endExclusive, int
columnCount) {
+ super(columnCount);
+ this.windowStart = windowStart;
+ this.endExclusive = endExclusive;
+ }
+
+ @Override
+ protected long getOutputWindowStart() {
+ return windowStart;
+ }
+
+ @Override
+ protected long getOutputWindowEnd() {
+ return endExclusive;
+ }
+ }
+
+ private static class CountWindowState extends WindowState {
+ private final long endExclusive;
+ private long windowStart = Long.MIN_VALUE;
+ private long windowEnd = Long.MIN_VALUE;
+
+ private CountWindowState(long endExclusive, int columnCount) {
+ super(columnCount);
+ this.endExclusive = endExclusive;
+ }
+
+ @Override
+ protected long getOutputWindowStart() {
+ return windowStart;
+ }
+
+ @Override
+ protected long getOutputWindowEnd() {
+ return windowEnd;
+ }
+ }
+
+ private abstract static class AbstractM4DataProcessor implements
TableFunctionDataProcessor {
+ protected final long size;
+ protected final long slide;
+ protected final M4Column[] participantColumns;
+ protected long curIndex = 0;
+
+ protected AbstractM4DataProcessor(long size, long slide, M4Column[]
participantColumns) {
+ this.size = size;
+ this.slide = slide;
+ this.participantColumns = participantColumns;
+ }
+
+ @Override
+ public final void process(
+ Record input,
+ List<ColumnBuilder> properColumnBuilders,
+ ColumnBuilder passThroughIndexBuilder) {
+ processRecord(input, input.getLong(0), properColumnBuilders);
+ curIndex++;
+ }
+
+ protected abstract void processRecord(
+ Record input, long time, List<ColumnBuilder> properColumnBuilders);
+
+ protected final void updateWindow(WindowState windowState, Record input,
long time) {
+ for (int i = 0; i < participantColumns.length; i++) {
+ if (!participantColumns[i].isNull(input)) {
+ windowState.columnStates[i].update(
+ curIndex, time, participantColumns[i].read(input),
participantColumns[i]);
+ }
+ }
+ }
+
+ protected final void outputWindow(
+ WindowState windowState, List<ColumnBuilder> properColumnBuilders) {
+ List<List<Candidate>> candidatesByColumn = new ArrayList<>();
+ int rowCount = 0;
+ for (ColumnWindowState columnState : windowState.columnStates) {
+ List<Candidate> candidates = columnState.getSortedCandidates();
+ candidatesByColumn.add(candidates);
+ rowCount = Math.max(rowCount, candidates.size());
+ }
+ if (rowCount == 0) {
+ return;
+ }
+
+ for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
+
properColumnBuilders.get(0).writeLong(windowState.getOutputWindowStart());
+
properColumnBuilders.get(1).writeLong(windowState.getOutputWindowEnd());
+ int outputColumnIndex = 2;
+ for (int columnIndex = 0; columnIndex < participantColumns.length;
columnIndex++) {
+ List<Candidate> candidates = candidatesByColumn.get(columnIndex);
+ if (rowIndex < candidates.size()) {
+ Candidate candidate = candidates.get(rowIndex);
+
properColumnBuilders.get(outputColumnIndex++).writeLong(candidate.time);
+ participantColumns[columnIndex].write(
+ properColumnBuilders.get(outputColumnIndex++),
candidate.value);
+ } else {
+ properColumnBuilders.get(outputColumnIndex++).appendNull();
+ properColumnBuilders.get(outputColumnIndex++).appendNull();
+ }
+ }
+ }
+ }
+
+ protected final long getWindowEnd(long windowStart) {
+ return windowStart + size;
+ }
+ }
+
+ private static class TimeWindowM4DataProcessor extends
AbstractM4DataProcessor {
+ private final Deque<TimeWindowState> activeWindows = new ArrayDeque<>();
+ private final long origin;
+ private boolean nextWindowStartInitialized = false;
+ private long nextWindowStart;
+
+ private TimeWindowM4DataProcessor(
+ long size, long slide, long origin, M4Column[] participantColumns) {
+ super(size, slide, participantColumns);
+ this.origin = origin;
+ }
+
+ @Override
+ protected void processRecord(
+ Record input, long time, List<ColumnBuilder> properColumnBuilders) {
+ if (time < origin) {
+ return;
+ }
+
+ while (!activeWindows.isEmpty() &&
activeWindows.peekFirst().endExclusive <= time) {
+ outputWindow(activeWindows.removeFirst(), properColumnBuilders);
+ }
+
+ long firstCandidateStart =
+ origin + Math.floorDiv(time - origin - size, slide) * slide + slide;
+ while (getWindowEnd(firstCandidateStart) <= time) {
+ firstCandidateStart += slide;
+ }
+ if (!nextWindowStartInitialized) {
+ nextWindowStart = firstCandidateStart;
+ nextWindowStartInitialized = true;
+ } else if (nextWindowStart < firstCandidateStart) {
+ nextWindowStart = firstCandidateStart;
+ }
+
+ while (nextWindowStart <= time && getWindowEnd(nextWindowStart) > time) {
+ activeWindows.addLast(
+ new TimeWindowState(
+ nextWindowStart, getWindowEnd(nextWindowStart),
participantColumns.length));
+ nextWindowStart += slide;
+ }
+
+ for (TimeWindowState activeWindow : activeWindows) {
Review Comment:
需求文档里 M4TableFunction 的目标空间复杂度是 O(1),并且按讨论语义,每来一行数据应能直接根据 SIZE、SLIDE、ORIGIN
算出它所属分组。这里维护并遍历 `activeWindows`,在 `SLIDE < SIZE`
的重叠窗口场景下一行会更新多个窗口,状态量和单行处理成本都会随 `SIZE / SLIDE`
增长。建议改成每行只计算并更新一个窗口;如果要支持重叠窗口,则需要相应调整需求里的 O(1) 复杂度约束。
##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/M4TableFunction.java:
##########
@@ -0,0 +1,729 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.udf.builtin.relational.tvf;
+
+import org.apache.iotdb.commons.exception.SemanticException;
+import org.apache.iotdb.udf.api.exception.UDFException;
+import org.apache.iotdb.udf.api.relational.TableFunction;
+import org.apache.iotdb.udf.api.relational.access.Record;
+import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle;
+import
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
+import org.apache.iotdb.udf.api.relational.table.argument.Argument;
+import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
+import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument;
+import org.apache.iotdb.udf.api.relational.table.argument.TableArgument;
+import
org.apache.iotdb.udf.api.relational.table.processor.TableFunctionDataProcessor;
+import
org.apache.iotdb.udf.api.relational.table.specification.ParameterSpecification;
+import
org.apache.iotdb.udf.api.relational.table.specification.ScalarParameterSpecification;
+import
org.apache.iotdb.udf.api.relational.table.specification.TableParameterSpecification;
+import org.apache.iotdb.udf.api.type.Type;
+
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.utils.Binary;
+
+import java.time.LocalDate;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static
org.apache.iotdb.commons.udf.builtin.relational.tvf.WindowTVFUtils.findColumnIndex;
+import static
org.apache.iotdb.udf.api.relational.table.argument.ScalarArgumentChecker.POSITIVE_LONG_CHECKER;
+
+public class M4TableFunction implements TableFunction {
+
+ public static final String DATA_PARAMETER_NAME = "DATA";
+ public static final String TIMECOL_PARAMETER_NAME = "TIMECOL";
+ public static final String SIZE_PARAMETER_NAME = "SIZE";
+ public static final String SLIDE_PARAMETER_NAME = "SLIDE";
+ public static final String ORIGIN_PARAMETER_NAME = "ORIGIN";
+ public static final String WINDOW_MODE_PARAMETER_NAME = "__M4_WINDOW_MODE";
+
+ private static final String OUTPUT_WINDOW_START_COLUMN = "window_start";
+ private static final String OUTPUT_WINDOW_END_COLUMN = "window_end";
+ private static final String PARTICIPANT_TYPES_PROPERTY =
"__M4_PARTICIPANT_TYPES";
+ private static final long UNSPECIFIED_SLIDE = Long.MIN_VALUE;
+ private static final long INVALID_INDEX = -1;
+
+ @Override
+ public List<ParameterSpecification> getArgumentsSpecifications() {
+ return Arrays.asList(
+
TableParameterSpecification.builder().name(DATA_PARAMETER_NAME).setSemantics().build(),
+ ScalarParameterSpecification.builder()
+ .name(TIMECOL_PARAMETER_NAME)
+ .type(Type.STRING)
+ .build(),
+ ScalarParameterSpecification.builder()
+ .name(SIZE_PARAMETER_NAME)
+ .type(Type.INT64)
+ .addChecker(POSITIVE_LONG_CHECKER)
+ .build(),
+ ScalarParameterSpecification.builder()
+ .name(SLIDE_PARAMETER_NAME)
+ .type(Type.INT64)
+ .defaultValue(UNSPECIFIED_SLIDE)
+ .build(),
+ ScalarParameterSpecification.builder()
+ .name(ORIGIN_PARAMETER_NAME)
+ .type(Type.TIMESTAMP)
+ .defaultValue(0L)
+ .build());
+ }
+
+ @Override
+ public TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws
UDFException {
+ TableArgument tableArgument = (TableArgument)
arguments.get(DATA_PARAMETER_NAME);
+ if (tableArgument.getOrderBy().isEmpty()) {
+ throw new SemanticException("Table argument with set semantics requires
an ORDER BY clause.");
+ }
+
+ String timeColumn =
+ (String) ((ScalarArgument)
arguments.get(TIMECOL_PARAMETER_NAME)).getValue();
+ int timeColumnIndex =
+ findColumnIndex(tableArgument, timeColumn,
Collections.singleton(Type.TIMESTAMP));
+ validateOrderBy(tableArgument, timeColumn);
+
+ List<Integer> partitionIndexes = getPartitionIndexes(tableArgument);
+ Set<Integer> excludedIndexes = new HashSet<>(partitionIndexes);
+ excludedIndexes.add(timeColumnIndex);
+
+ List<Integer> participantIndexes = new ArrayList<>();
+ List<Type> participantTypes = new ArrayList<>();
+ DescribedSchema.Builder schemaBuilder =
+ new DescribedSchema.Builder()
+ .addField(OUTPUT_WINDOW_START_COLUMN, Type.TIMESTAMP)
+ .addField(OUTPUT_WINDOW_END_COLUMN, Type.TIMESTAMP);
+
+ for (int i = 0; i < tableArgument.getFieldTypes().size(); i++) {
+ if (excludedIndexes.contains(i)) {
+ continue;
+ }
+ Type type = tableArgument.getFieldTypes().get(i);
+ String columnName = tableArgument.getFieldNames().get(i).get();
+ if (!isComparableType(type)) {
+ throw new SemanticException(
+ String.format("The type of the column [%s] is not comparable.",
columnName));
+ }
+ participantIndexes.add(i);
+ participantTypes.add(type);
+ schemaBuilder.addField(columnName + "_time", Type.TIMESTAMP);
+ schemaBuilder.addField(columnName, type);
+ }
+
+ if (participantIndexes.isEmpty()) {
+ throw new SemanticException("No comparable columns found for M4
calculation.");
+ }
+
+ long size = (long) ((ScalarArgument)
arguments.get(SIZE_PARAMETER_NAME)).getValue();
+ long slide = (long) ((ScalarArgument)
arguments.get(SLIDE_PARAMETER_NAME)).getValue();
+ if (slide == UNSPECIFIED_SLIDE) {
+ slide = size;
+ } else if (slide <= 0) {
+ throw new UDFException("Invalid scalar argument SLIDE, should be a
positive value");
+ }
+
+ boolean isTimeWindow =
+ arguments.containsKey(WINDOW_MODE_PARAMETER_NAME)
+ && (boolean) ((ScalarArgument)
arguments.get(WINDOW_MODE_PARAMETER_NAME)).getValue();
+
+ MapTableFunctionHandle handle =
+ new MapTableFunctionHandle.Builder()
+ .addProperty(WINDOW_MODE_PARAMETER_NAME, isTimeWindow)
+ .addProperty(SIZE_PARAMETER_NAME, size)
+ .addProperty(SLIDE_PARAMETER_NAME, slide)
+ .addProperty(
+ ORIGIN_PARAMETER_NAME,
+ ((ScalarArgument)
arguments.get(ORIGIN_PARAMETER_NAME)).getValue())
+ .addProperty(PARTICIPANT_TYPES_PROPERTY,
joinTypes(participantTypes))
+ .build();
+
+ List<Integer> requiredColumns = new ArrayList<>();
+ requiredColumns.add(timeColumnIndex);
+ requiredColumns.addAll(participantIndexes);
+
+ return TableFunctionAnalysis.builder()
+ .properColumnSchema(schemaBuilder.build())
+ .requireRecordSnapshot(false)
+ .requiredColumns(DATA_PARAMETER_NAME, requiredColumns)
+ .handle(handle)
+ .build();
+ }
+
+ @Override
+ public TableFunctionHandle createTableFunctionHandle() {
+ return new MapTableFunctionHandle();
+ }
+
+ @Override
+ public TableFunctionProcessorProvider getProcessorProvider(
+ TableFunctionHandle tableFunctionHandle) {
+ MapTableFunctionHandle handle = (MapTableFunctionHandle)
tableFunctionHandle;
+ boolean isTimeWindow = (boolean)
handle.getProperty(WINDOW_MODE_PARAMETER_NAME);
+ long size = (long) handle.getProperty(SIZE_PARAMETER_NAME);
+ long slide = (long) handle.getProperty(SLIDE_PARAMETER_NAME);
+ long origin = (long) handle.getProperty(ORIGIN_PARAMETER_NAME);
+ Type[] participantTypes = parseTypes((String)
handle.getProperty(PARTICIPANT_TYPES_PROPERTY));
+
+ return new TableFunctionProcessorProvider() {
+ @Override
+ public TableFunctionDataProcessor getDataProcessor() {
+ M4Column[] participantColumns = createColumns(participantTypes);
+ return isTimeWindow
+ ? new TimeWindowM4DataProcessor(size, slide, origin,
participantColumns)
+ : new CountWindowM4DataProcessor(size, slide, participantColumns);
+ }
+ };
+ }
+
+ private static void validateOrderBy(TableArgument tableArgument, String
timeColumn) {
+ if (tableArgument.getOrderBy().size() != 1
+ || !tableArgument.getOrderBy().get(0).equalsIgnoreCase(timeColumn)) {
+ throw new SemanticException(
+ "The ORDER BY clause of the DATA argument must contain exactly the
time column specified by the TIMECOL argument.");
+ }
+ }
+
+ private static List<Integer> getPartitionIndexes(TableArgument
tableArgument) {
+ List<Integer> indexes = new ArrayList<>();
+ for (String partitionColumn : tableArgument.getPartitionBy()) {
+ indexes.add(
+ findColumnIndex(tableArgument, partitionColumn, new
LinkedHashSet<>(Type.allTypes())));
+ }
+ return indexes;
+ }
+
+ private static boolean isComparableType(Type type) {
+ return type != Type.BLOB && type != Type.OBJECT;
+ }
+
+ private static String joinTypes(List<Type> types) {
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < types.size(); i++) {
+ if (i > 0) {
+ builder.append(',');
+ }
+ builder.append(types.get(i).name());
+ }
+ return builder.toString();
+ }
+
+ private static Type[] parseTypes(String value) {
+ if (value.isEmpty()) {
+ return new Type[0];
+ }
+ String[] values = value.split(",");
+ Type[] types = new Type[values.length];
+ for (int i = 0; i < values.length; i++) {
+ types[i] = Type.valueOf(values[i]);
+ }
+ return types;
+ }
+
+ private static M4Column[] createColumns(Type[] types) {
+ M4Column[] columns = new M4Column[types.length];
+ for (int i = 0; i < types.length; i++) {
+ columns[i] = new M4Column(i + 1, ValueOperator.fromType(types[i]));
+ }
+ return columns;
+ }
+
+ private enum ValueOperator {
+ BOOLEAN(Type.BOOLEAN) {
+ @Override
+ Object read(Record record, int index) {
+ return record.getBoolean(index);
+ }
+
+ @Override
+ int compare(Object left, Object right) {
+ return Boolean.compare((Boolean) left, (Boolean) right);
+ }
+
+ @Override
+ void write(ColumnBuilder builder, Object value) {
+ builder.writeBoolean((Boolean) value);
+ }
+ },
+ INT32(Type.INT32) {
+ @Override
+ Object read(Record record, int index) {
+ return record.getInt(index);
+ }
+
+ @Override
+ int compare(Object left, Object right) {
+ return Integer.compare((Integer) left, (Integer) right);
+ }
+
+ @Override
+ void write(ColumnBuilder builder, Object value) {
+ builder.writeInt((Integer) value);
+ }
+ },
+ INT64(Type.INT64) {
+ @Override
+ Object read(Record record, int index) {
+ return record.getLong(index);
+ }
+
+ @Override
+ int compare(Object left, Object right) {
+ return Long.compare((Long) left, (Long) right);
+ }
+
+ @Override
+ void write(ColumnBuilder builder, Object value) {
+ builder.writeLong((Long) value);
+ }
+ },
+ FLOAT(Type.FLOAT) {
+ @Override
+ Object read(Record record, int index) {
+ return record.getFloat(index);
+ }
+
+ @Override
+ int compare(Object left, Object right) {
+ return Float.compare((Float) left, (Float) right);
+ }
+
+ @Override
+ void write(ColumnBuilder builder, Object value) {
+ builder.writeFloat((Float) value);
+ }
+ },
+ DOUBLE(Type.DOUBLE) {
+ @Override
+ Object read(Record record, int index) {
+ return record.getDouble(index);
+ }
+
+ @Override
+ int compare(Object left, Object right) {
+ return Double.compare((Double) left, (Double) right);
+ }
+
+ @Override
+ void write(ColumnBuilder builder, Object value) {
+ builder.writeDouble((Double) value);
+ }
+ },
+ TEXT(Type.TEXT) {
+ @Override
+ Object read(Record record, int index) {
+ return record.getBinary(index);
+ }
+
+ @Override
+ int compare(Object left, Object right) {
+ return ((Binary) left).compareTo((Binary) right);
+ }
+
+ @Override
+ void write(ColumnBuilder builder, Object value) {
+ builder.writeBinary((Binary) value);
+ }
+ },
+ TIMESTAMP(Type.TIMESTAMP) {
+ @Override
+ Object read(Record record, int index) {
+ return record.getLong(index);
+ }
+
+ @Override
+ int compare(Object left, Object right) {
+ return Long.compare((Long) left, (Long) right);
+ }
+
+ @Override
+ void write(ColumnBuilder builder, Object value) {
+ builder.writeLong((Long) value);
+ }
+ },
+ DATE(Type.DATE) {
+ @Override
+ Object read(Record record, int index) {
+ return record.getLocalDate(index);
+ }
+
+ @Override
+ int compare(Object left, Object right) {
+ return ((LocalDate) left).compareTo((LocalDate) right);
+ }
+
+ @Override
+ void write(ColumnBuilder builder, Object value) {
+ builder.writeObject(value);
+ }
+ },
+ STRING(Type.STRING) {
+ @Override
+ Object read(Record record, int index) {
+ return record.getBinary(index);
+ }
+
+ @Override
+ int compare(Object left, Object right) {
+ return ((Binary) left).compareTo((Binary) right);
+ }
+
+ @Override
+ void write(ColumnBuilder builder, Object value) {
+ builder.writeBinary((Binary) value);
+ }
+ };
+
+ private final Type type;
+
+ ValueOperator(Type type) {
+ this.type = type;
+ }
+
+ abstract Object read(Record record, int index);
+
+ abstract int compare(Object left, Object right);
+
+ abstract void write(ColumnBuilder builder, Object value);
+
+ static ValueOperator fromType(Type type) {
+ for (ValueOperator valueOperator : values()) {
+ if (valueOperator.type == type) {
+ return valueOperator;
+ }
+ }
+ throw new IllegalArgumentException("Unsupported M4 value type: " + type);
+ }
+ }
+
+ private static class M4Column {
+ private final int inputIndex;
+ private final ValueOperator valueOperator;
+
+ private M4Column(int inputIndex, ValueOperator valueOperator) {
+ this.inputIndex = inputIndex;
+ this.valueOperator = valueOperator;
+ }
+
+ private boolean isNull(Record record) {
+ return record.isNull(inputIndex);
+ }
+
+ private Object read(Record record) {
+ return valueOperator.read(record, inputIndex);
+ }
+
+ private void write(ColumnBuilder builder, Object value) {
+ valueOperator.write(builder, value);
+ }
+
+ private int compare(Object left, Object right) {
+ return valueOperator.compare(left, right);
+ }
+ }
+
+ private static class Candidate {
+ private long index = INVALID_INDEX;
+ private long time;
+ private Object value;
+
+ private void set(long index, long time, Object value) {
+ this.index = index;
+ this.time = time;
+ this.value = value;
+ }
+ }
+
+ private static class ColumnWindowState {
+ private final Candidate first = new Candidate();
+ private final Candidate last = new Candidate();
+ private final Candidate bottom = new Candidate();
+ private final Candidate top = new Candidate();
+
+ private void update(long rowIndex, long time, Object value, M4Column
column) {
+ if (first.index == INVALID_INDEX) {
+ first.set(rowIndex, time, value);
+ last.set(rowIndex, time, value);
+ bottom.set(rowIndex, time, value);
+ top.set(rowIndex, time, value);
+ return;
+ }
+
+ last.set(rowIndex, time, value);
+ if (column.compare(value, bottom.value) < 0) {
+ bottom.set(rowIndex, time, value);
+ }
+ if (column.compare(value, top.value) > 0) {
+ top.set(rowIndex, time, value);
+ }
+ }
+
+ private boolean hasOutput() {
+ return first.index != INVALID_INDEX;
+ }
+
+ private List<Candidate> getSortedCandidates() {
+ if (!hasOutput()) {
+ return Collections.emptyList();
+ }
+ List<Candidate> candidates = new ArrayList<>(Arrays.asList(first, last,
bottom, top));
+ Set<Long> emittedTimestamps = new HashSet<>();
+ candidates.removeIf(candidate -> !emittedTimestamps.add(candidate.time));
+ candidates.sort(Comparator.comparingLong(candidate -> candidate.time));
+ return candidates;
+ }
+ }
+
+ private abstract static class WindowState {
+ protected final ColumnWindowState[] columnStates;
+
+ private WindowState(int columnCount) {
+ columnStates = new ColumnWindowState[columnCount];
+ for (int i = 0; i < columnCount; i++) {
+ columnStates[i] = new ColumnWindowState();
+ }
+ }
+
+ protected abstract long getOutputWindowStart();
+
+ protected abstract long getOutputWindowEnd();
+ }
+
+ private static class TimeWindowState extends WindowState {
+ private final long windowStart;
+ private final long endExclusive;
+
+ private TimeWindowState(long windowStart, long endExclusive, int
columnCount) {
+ super(columnCount);
+ this.windowStart = windowStart;
+ this.endExclusive = endExclusive;
+ }
+
+ @Override
+ protected long getOutputWindowStart() {
+ return windowStart;
+ }
+
+ @Override
+ protected long getOutputWindowEnd() {
+ return endExclusive;
+ }
+ }
+
+ private static class CountWindowState extends WindowState {
+ private final long endExclusive;
+ private long windowStart = Long.MIN_VALUE;
+ private long windowEnd = Long.MIN_VALUE;
+
+ private CountWindowState(long endExclusive, int columnCount) {
+ super(columnCount);
+ this.endExclusive = endExclusive;
+ }
+
+ @Override
+ protected long getOutputWindowStart() {
+ return windowStart;
+ }
+
+ @Override
+ protected long getOutputWindowEnd() {
+ return windowEnd;
+ }
+ }
+
+ private abstract static class AbstractM4DataProcessor implements
TableFunctionDataProcessor {
+ protected final long size;
+ protected final long slide;
+ protected final M4Column[] participantColumns;
+ protected long curIndex = 0;
+
+ protected AbstractM4DataProcessor(long size, long slide, M4Column[]
participantColumns) {
+ this.size = size;
+ this.slide = slide;
+ this.participantColumns = participantColumns;
+ }
+
+ @Override
+ public final void process(
+ Record input,
+ List<ColumnBuilder> properColumnBuilders,
+ ColumnBuilder passThroughIndexBuilder) {
+ processRecord(input, input.getLong(0), properColumnBuilders);
+ curIndex++;
+ }
+
+ protected abstract void processRecord(
+ Record input, long time, List<ColumnBuilder> properColumnBuilders);
+
+ protected final void updateWindow(WindowState windowState, Record input,
long time) {
+ for (int i = 0; i < participantColumns.length; i++) {
+ if (!participantColumns[i].isNull(input)) {
+ windowState.columnStates[i].update(
+ curIndex, time, participantColumns[i].read(input),
participantColumns[i]);
+ }
+ }
+ }
+
+ protected final void outputWindow(
+ WindowState windowState, List<ColumnBuilder> properColumnBuilders) {
+ List<List<Candidate>> candidatesByColumn = new ArrayList<>();
+ int rowCount = 0;
+ for (ColumnWindowState columnState : windowState.columnStates) {
+ List<Candidate> candidates = columnState.getSortedCandidates();
+ candidatesByColumn.add(candidates);
+ rowCount = Math.max(rowCount, candidates.size());
+ }
+ if (rowCount == 0) {
+ return;
+ }
+
+ for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
+
properColumnBuilders.get(0).writeLong(windowState.getOutputWindowStart());
+
properColumnBuilders.get(1).writeLong(windowState.getOutputWindowEnd());
+ int outputColumnIndex = 2;
+ for (int columnIndex = 0; columnIndex < participantColumns.length;
columnIndex++) {
+ List<Candidate> candidates = candidatesByColumn.get(columnIndex);
+ if (rowIndex < candidates.size()) {
+ Candidate candidate = candidates.get(rowIndex);
+
properColumnBuilders.get(outputColumnIndex++).writeLong(candidate.time);
+ participantColumns[columnIndex].write(
+ properColumnBuilders.get(outputColumnIndex++),
candidate.value);
+ } else {
+ properColumnBuilders.get(outputColumnIndex++).appendNull();
+ properColumnBuilders.get(outputColumnIndex++).appendNull();
+ }
+ }
+ }
+ }
+
+ protected final long getWindowEnd(long windowStart) {
+ return windowStart + size;
+ }
+ }
+
+ private static class TimeWindowM4DataProcessor extends
AbstractM4DataProcessor {
+ private final Deque<TimeWindowState> activeWindows = new ArrayDeque<>();
+ private final long origin;
+ private boolean nextWindowStartInitialized = false;
+ private long nextWindowStart;
+
+ private TimeWindowM4DataProcessor(
+ long size, long slide, long origin, M4Column[] participantColumns) {
+ super(size, slide, participantColumns);
+ this.origin = origin;
+ }
+
+ @Override
+ protected void processRecord(
+ Record input, long time, List<ColumnBuilder> properColumnBuilders) {
+ if (time < origin) {
Review Comment:
ORIGIN 应该只作为窗口对齐原点,不应隐式过滤输入数据。这里直接跳过 `time < origin` 的记录,会导致 ORIGIN
之前的数据无法落入按 origin 往前推导出的窗口;如果用户需要限定输入范围,应由外层 WHERE 明确过滤。建议删除这个 return,并补充
`ORIGIN` 大于部分输入时间时仍能输出前向窗口的测试。
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]