Beyyes commented on code in PR #14213: URL: https://github.com/apache/iotdb/pull/14213#discussion_r1909143706
########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/exception/FrameTypeException.java: ########## @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.window.exception; + +public class FrameTypeException extends WindowFunctionException { Review Comment: Put exception in package `org.apache.iotdb.db.exception`, you can reference the `SemanticException`  ########## iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/function/value/LeadFunctionTest.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.window.function.value; + +import org.apache.iotdb.db.queryengine.execution.operator.process.window.TableWindowOperatorTestUtils; +import org.apache.iotdb.db.queryengine.execution.operator.process.window.function.FunctionTestUtils; +import org.apache.iotdb.db.queryengine.execution.operator.process.window.partition.PartitionExecutor; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE; + +public class LeadFunctionTest { + private final List<TSDataType> inputDataTypes = Collections.singletonList(TSDataType.INT32); + // Inputs element less than 0 means this pos is null + private final int[] inputs = {0, -1, -1, 1, 2, -1, 3, 4, -1, 5, 6, -1, -1, -1, -1, -1}; + + private final List<TSDataType> outputDataTypes = + Arrays.asList(TSDataType.INT32, TSDataType.INT32); + + @Test + public void testLeadFunctionIgnoreNullWithoutDefault() { + int[] expected = {2, 2, 2, 3, 4, 4, 5, 6, 6, -1, -1, -1, -1, -1, -1, -1}; + + TsBlock tsBlock = TableWindowOperatorTestUtils.createIntsTsBlockWithoutNulls(inputs); + LeadFunction function = new LeadFunction(0, 2, null, true); + PartitionExecutor partitionExecutor = + FunctionTestUtils.createPartitionExecutor(tsBlock, inputDataTypes, function); + + TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(expected.length, outputDataTypes); + while (partitionExecutor.hasNext()) { + partitionExecutor.processNextRow(tsBlockBuilder); + } + + TsBlock result = + tsBlockBuilder.build( + new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, tsBlockBuilder.getPositionCount())); + Column column = result.getColumn(1); + + Assert.assertEquals(column.getPositionCount(), expected.length); + for (int i = 0; i < expected.length; i++) { + if (expected[i] < 0) { + Assert.assertTrue(column.isNull(i)); + } else { + Assert.assertEquals(expected[i], column.getInt(i)); + } + } + } + + @Test + public void testLagFunctionIgnoreNullWithDefault() { + int[] expected = {2, 2, 2, 3, 4, 4, 5, 6, 6, 10, 10, 10, 10, 10, 10, 10}; + + TsBlock tsBlock = TableWindowOperatorTestUtils.createIntsTsBlockWithoutNulls(inputs); + LeadFunction function = new LeadFunction(0, 2, 10, true); + PartitionExecutor partitionExecutor = + FunctionTestUtils.createPartitionExecutor(tsBlock, inputDataTypes, function); + + TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(expected.length, outputDataTypes); + while (partitionExecutor.hasNext()) { + partitionExecutor.processNextRow(tsBlockBuilder); + } + + TsBlock result = + tsBlockBuilder.build( + new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, tsBlockBuilder.getPositionCount())); + Column column = result.getColumn(1); + + Assert.assertEquals(column.getPositionCount(), expected.length); + for (int i = 0; i < expected.length; i++) { + Assert.assertEquals(expected[i], column.getInt(i)); + } + } + + @Test + public void testLagFunctionNotIgnoreNullWithoutDefault() { Review Comment: testLeadFunctionNotIgnoreNullWithoutDefault ########## iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/function/value/LeadFunctionTest.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.window.function.value; + +import org.apache.iotdb.db.queryengine.execution.operator.process.window.TableWindowOperatorTestUtils; +import org.apache.iotdb.db.queryengine.execution.operator.process.window.function.FunctionTestUtils; +import org.apache.iotdb.db.queryengine.execution.operator.process.window.partition.PartitionExecutor; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE; + +public class LeadFunctionTest { + private final List<TSDataType> inputDataTypes = Collections.singletonList(TSDataType.INT32); + // Inputs element less than 0 means this pos is null + private final int[] inputs = {0, -1, -1, 1, 2, -1, 3, 4, -1, 5, 6, -1, -1, -1, -1, -1}; + + private final List<TSDataType> outputDataTypes = + Arrays.asList(TSDataType.INT32, TSDataType.INT32); + + @Test + public void testLeadFunctionIgnoreNullWithoutDefault() { + int[] expected = {2, 2, 2, 3, 4, 4, 5, 6, 6, -1, -1, -1, -1, -1, -1, -1}; + + TsBlock tsBlock = TableWindowOperatorTestUtils.createIntsTsBlockWithoutNulls(inputs); + LeadFunction function = new LeadFunction(0, 2, null, true); + PartitionExecutor partitionExecutor = + FunctionTestUtils.createPartitionExecutor(tsBlock, inputDataTypes, function); + + TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(expected.length, outputDataTypes); + while (partitionExecutor.hasNext()) { + partitionExecutor.processNextRow(tsBlockBuilder); + } + + TsBlock result = + tsBlockBuilder.build( + new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, tsBlockBuilder.getPositionCount())); + Column column = result.getColumn(1); + + Assert.assertEquals(column.getPositionCount(), expected.length); + for (int i = 0; i < expected.length; i++) { + if (expected[i] < 0) { + Assert.assertTrue(column.isNull(i)); + } else { + Assert.assertEquals(expected[i], column.getInt(i)); + } + } + } + + @Test + public void testLagFunctionIgnoreNullWithDefault() { Review Comment: testLeadFunctionIgnoreNullWithDefault ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableAccumulator.java: ########## @@ -30,6 +30,10 @@ public interface TableAccumulator { void addInput(Column[] arguments); + default void removeInput(Column[] arguments) { Review Comment: If `removeIntermediate` exist? ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/SumAccumulator.java: ########## @@ -77,6 +77,34 @@ public void addInput(Column[] arguments) { } } + @Override + public void removeInput(Column[] arguments) { + checkArgument(arguments.length == 1, "argument of Sum should be one column"); + switch (argumentDataType) { + case INT32: + removeIntInput(arguments[0]); + return; + case INT64: + removeLongInput(arguments[0]); + return; + case FLOAT: + removeFloatInput(arguments[0]); + return; + case DOUBLE: + removeDoubleInput(arguments[0]); + return; + case TEXT: + case BLOB: + case STRING: + case BOOLEAN: + case DATE: Review Comment: DATE (which is same INT32), TIMESTAMP (which is same as INT64) also can be removed? ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AvgAccumulator.java: ########## @@ -204,4 +237,44 @@ private void addDoubleInput(Column column) { } } } + + private void removeIntInput(Column column) { + int count = column.getPositionCount(); + for (int i = 0; i < count; i++) { + if (!column.isNull(i)) { + countValue--; + sumValue -= column.getInt(i); + } + } + } + + private void removeLongInput(Column column) { + int count = column.getPositionCount(); + for (int i = 0; i < count; i++) { + if (!column.isNull(i)) { + countValue--; + sumValue -= column.getLong(i); + } + } + } + + private void removeFloatInput(Column column) { + int count = column.getPositionCount(); + for (int i = 0; i < count; i++) { + if (!column.isNull(i)) { + countValue--; + sumValue += column.getFloat(i); Review Comment: ```suggestion sumValue -= column.getFloat(i); ``` ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/function/value/FirstValueFunction.java: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.window.function.value; + +import org.apache.iotdb.db.queryengine.execution.operator.process.window.function.WindowFunction; +import org.apache.iotdb.db.queryengine.execution.operator.process.window.partition.Partition; + +import org.apache.tsfile.block.column.ColumnBuilder; + +public class FirstValueFunction implements WindowFunction { + private final int channel; + private final boolean ignoreNull; + + public FirstValueFunction(int channel, boolean ignoreNull) { + this.channel = channel; + this.ignoreNull = ignoreNull; + } + + @Override + public void reset() {} Review Comment: Add a nested comment explaining why this method is empty, throw an UnsupportedOperationException or complete the implementation. Same of other function. ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/function/value/LagFunction.java: ########## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.window.function.value; + +import org.apache.iotdb.db.queryengine.execution.operator.process.window.function.WindowFunction; +import org.apache.iotdb.db.queryengine.execution.operator.process.window.partition.Partition; + +import org.apache.tsfile.block.column.ColumnBuilder; + +public class LagFunction implements WindowFunction { + private final int channel; + private final Integer offset; + private final Object defaultVal; + private final boolean ignoreNull; + + public LagFunction(int channel, Integer offset, Object defaultVal, boolean ignoreNull) { + this.channel = channel; + this.offset = offset == null ? 1 : offset; + this.defaultVal = defaultVal; + this.ignoreNull = ignoreNull; + } + + @Override + public void reset() {} + + @Override + public void transform( + Partition partition, + ColumnBuilder builder, + int index, + int frameStart, + int frameEnd, + int peerGroupStart, + int peerGroupEnd) { + int pos; + if (ignoreNull) { + int nonNullCount = 0; + pos = index - 1; + while (pos >= 0) { + if (!partition.isNull(channel, pos)) { + nonNullCount++; + if (nonNullCount == offset) { + break; + } + } + + pos--; + } + } else { + pos = index - offset; + } + + if (pos >= 0) { + if (!partition.isNull(channel, pos)) { + partition.writeTo(builder, channel, pos); + } else { + builder.appendNull(); + } + } else if (defaultVal != null) { + // TODO: Replace write object Review Comment: `TODO` comment is not allowed ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/function/rank/RowNumberFunction.java: ########## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.window.function.rank; + +import org.apache.iotdb.db.queryengine.execution.operator.process.window.function.WindowFunction; +import org.apache.iotdb.db.queryengine.execution.operator.process.window.partition.Partition; + +import org.apache.tsfile.block.column.ColumnBuilder; + +public class RowNumberFunction implements WindowFunction { + public RowNumberFunction() {} Review Comment: Remove this empty constructor ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/utils/ColumnList.java: ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.window.utils; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.block.column.ColumnEncoding; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.Binary; + +import java.util.ArrayList; +import java.util.List; + +public class ColumnList { + private final List<Column> columns; + private final List<Integer> positionCounts; + + public ColumnList(List<Column> columns) { + this.columns = columns; + + positionCounts = new ArrayList<>(); + for (Column column : columns) { + positionCounts.add(column.getPositionCount()); + } + } + + public TSDataType getDataType() { + return columns.get(0).getDataType(); + } + + public ColumnEncoding getEncoding() { + return columns.get(0).getEncoding(); + } + + public static class ColumnListIndex { + private final int columnIndex; + private final int offsetInColumn; + + ColumnListIndex(int columnIndex, int offsetInColumn) { + this.columnIndex = columnIndex; + this.offsetInColumn = offsetInColumn; + } + + public int getColumnIndex() { + return columnIndex; + } + + public int getOffsetInColumn() { + return offsetInColumn; + } + } + + public ColumnListIndex getColumnIndex(int rowIndex) { + int columnIndex = 0; + while (columnIndex < columns.size() && rowIndex >= positionCounts.get(columnIndex)) { + rowIndex -= positionCounts.get(columnIndex); + // Enter next Column + columnIndex++; + } + + if (columnIndex != columns.size()) { + return new ColumnListIndex(columnIndex, rowIndex); + } else { + // Unlikely + throw new IndexOutOfBoundsException("Index out of Partition's bounds!"); + } + } + + public boolean getBoolean(int position) { + ColumnListIndex columnListIndex = getColumnIndex(position); Review Comment: ```suggestion ColumnListIndex columnListIndex = getColumnIndex(position); int columnIndex = columnListIndex.getColumnIndex(); ``` These two lines are duplicated in getXXX method ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/function/value/LagFunction.java: ########## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.window.function.value; + +import org.apache.iotdb.db.queryengine.execution.operator.process.window.function.WindowFunction; +import org.apache.iotdb.db.queryengine.execution.operator.process.window.partition.Partition; + +import org.apache.tsfile.block.column.ColumnBuilder; + +public class LagFunction implements WindowFunction { + private final int channel; + private final Integer offset; + private final Object defaultVal; + private final boolean ignoreNull; + + public LagFunction(int channel, Integer offset, Object defaultVal, boolean ignoreNull) { + this.channel = channel; + this.offset = offset == null ? 1 : offset; + this.defaultVal = defaultVal; + this.ignoreNull = ignoreNull; + } + + @Override + public void reset() {} + + @Override + public void transform( + Partition partition, + ColumnBuilder builder, + int index, + int frameStart, + int frameEnd, + int peerGroupStart, + int peerGroupEnd) { + int pos; + if (ignoreNull) { + int nonNullCount = 0; + pos = index - 1; + while (pos >= 0) { + if (!partition.isNull(channel, pos)) { + nonNullCount++; + if (nonNullCount == offset) { + break; + } + } + + pos--; + } + } else { + pos = index - offset; + } + + if (pos >= 0) { + if (!partition.isNull(channel, pos)) { + partition.writeTo(builder, channel, pos); + } else { + builder.appendNull(); + } + } else if (defaultVal != null) { + // TODO: Replace write object Review Comment: Impl this TODO, default value can be any data types? ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/utils/ColumnList.java: ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.window.utils; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.block.column.ColumnEncoding; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.Binary; + +import java.util.ArrayList; +import java.util.List; + +public class ColumnList { + private final List<Column> columns; + private final List<Integer> positionCounts; + + public ColumnList(List<Column> columns) { + this.columns = columns; + + positionCounts = new ArrayList<>(); Review Comment: ```suggestion positionCounts = new ArrayList<>(columns.size()); ``` ########## iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/function/value/LeadFunctionTest.java: ########## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.window.function.value; + +import org.apache.iotdb.db.queryengine.execution.operator.process.window.TableWindowOperatorTestUtils; +import org.apache.iotdb.db.queryengine.execution.operator.process.window.function.FunctionTestUtils; +import org.apache.iotdb.db.queryengine.execution.operator.process.window.partition.PartitionExecutor; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE; + +public class LeadFunctionTest { + private final List<TSDataType> inputDataTypes = Collections.singletonList(TSDataType.INT32); + // Inputs element less than 0 means this pos is null + private final int[] inputs = {0, -1, -1, 1, 2, -1, 3, 4, -1, 5, 6, -1, -1, -1, -1, -1}; + + private final List<TSDataType> outputDataTypes = + Arrays.asList(TSDataType.INT32, TSDataType.INT32); + + @Test + public void testLeadFunctionIgnoreNullWithoutDefault() { + int[] expected = {2, 2, 2, 3, 4, 4, 5, 6, 6, -1, -1, -1, -1, -1, -1, -1}; + + TsBlock tsBlock = TableWindowOperatorTestUtils.createIntsTsBlockWithoutNulls(inputs); + LeadFunction function = new LeadFunction(0, 2, null, true); + PartitionExecutor partitionExecutor = + FunctionTestUtils.createPartitionExecutor(tsBlock, inputDataTypes, function); + + TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(expected.length, outputDataTypes); + while (partitionExecutor.hasNext()) { + partitionExecutor.processNextRow(tsBlockBuilder); + } + + TsBlock result = + tsBlockBuilder.build( + new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, tsBlockBuilder.getPositionCount())); + Column column = result.getColumn(1); + + Assert.assertEquals(column.getPositionCount(), expected.length); + for (int i = 0; i < expected.length; i++) { + if (expected[i] < 0) { + Assert.assertTrue(column.isNull(i)); + } else { + Assert.assertEquals(expected[i], column.getInt(i)); + } + } + } + + @Test + public void testLagFunctionIgnoreNullWithDefault() { + int[] expected = {2, 2, 2, 3, 4, 4, 5, 6, 6, 10, 10, 10, 10, 10, 10, 10}; + + TsBlock tsBlock = TableWindowOperatorTestUtils.createIntsTsBlockWithoutNulls(inputs); + LeadFunction function = new LeadFunction(0, 2, 10, true); + PartitionExecutor partitionExecutor = + FunctionTestUtils.createPartitionExecutor(tsBlock, inputDataTypes, function); + + TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(expected.length, outputDataTypes); + while (partitionExecutor.hasNext()) { + partitionExecutor.processNextRow(tsBlockBuilder); + } + + TsBlock result = + tsBlockBuilder.build( + new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, tsBlockBuilder.getPositionCount())); + Column column = result.getColumn(1); + + Assert.assertEquals(column.getPositionCount(), expected.length); + for (int i = 0; i < expected.length; i++) { + Assert.assertEquals(expected[i], column.getInt(i)); + } + } + + @Test + public void testLagFunctionNotIgnoreNullWithoutDefault() { + int[] expected = {-1, 1, 2, -1, 3, 4, -1, 5, 6, -1, -1, -1, -1, -1, -1, -1}; + + TsBlock tsBlock = TableWindowOperatorTestUtils.createIntsTsBlockWithoutNulls(inputs); + LeadFunction function = new LeadFunction(0, 2, null, false); + PartitionExecutor partitionExecutor = + FunctionTestUtils.createPartitionExecutor(tsBlock, inputDataTypes, function); + + TsBlockBuilder tsBlockBuilder = new TsBlockBuilder(expected.length, outputDataTypes); + while (partitionExecutor.hasNext()) { + partitionExecutor.processNextRow(tsBlockBuilder); + } + + TsBlock result = + tsBlockBuilder.build( + new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, tsBlockBuilder.getPositionCount())); + Column column = result.getColumn(1); + + Assert.assertEquals(column.getPositionCount(), expected.length); + for (int i = 0; i < expected.length; i++) { + if (expected[i] < 0) { + Assert.assertTrue(column.isNull(i)); + } else { + Assert.assertEquals(expected[i], column.getInt(i)); + } + } + } + + @Test + public void testLagFunctionNotIgnoreNullWithDefault() { Review Comment: testLeadFunctionNotIgnoreNullWithDefault ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/function/WindowFunction.java: ########## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.window.function; + +import org.apache.iotdb.db.queryengine.execution.operator.process.window.partition.Partition; + +import org.apache.tsfile.block.column.ColumnBuilder; + +public interface WindowFunction { Review Comment: I think add abstract subclass `RankingWindowFunction` and `ValueWindowFunction` like trino is better. ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/function/aggregate/WindowAggregator.java: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.window.function.aggregate; + +import org.apache.iotdb.db.queryengine.execution.operator.process.window.partition.Partition; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.TableAccumulator; + +import com.google.common.primitives.Ints; +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.statistics.Statistics; +import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn; + +import java.util.List; + +import static java.util.Objects.requireNonNull; +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE; + +public class WindowAggregator { + private final TableAccumulator accumulator; + private final TSDataType outputType; + private final int[] inputChannels; + + public WindowAggregator( + TableAccumulator accumulator, TSDataType outputType, List<Integer> inputChannels) { + this.accumulator = requireNonNull(accumulator, "accumulator is null"); + this.outputType = requireNonNull(outputType, "intermediateType is null"); + this.inputChannels = Ints.toArray(requireNonNull(inputChannels, "inputChannels is null")); + } + + public TSDataType getType() { + return outputType; + } + + public void addInput(Partition partition) { + List<Column[]> allColumns = partition.getAllColumns(); + for (Column[] columns : allColumns) { + addInput(columns); + } + } + + public void addInput(Column[] columns) { + Column[] arguments = new Column[inputChannels.length]; + for (int i = 0; i < inputChannels.length; i++) { + arguments[i] = columns[inputChannels[i]]; + } + + // Process count(*) + int count = columns[0].getPositionCount(); + if (arguments.length == 0) { + arguments = new Column[] {new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, count)}; + } + + accumulator.addInput(arguments); + } + + public void removeInput(Partition partition) { + List<Column[]> allColumns = partition.getAllColumns(); + for (Column[] columns : allColumns) { + removeInput(columns); + } + } + + private void removeInput(Column[] columns) { + Column[] arguments = new Column[inputChannels.length]; + for (int i = 0; i < inputChannels.length; i++) { + arguments[i] = columns[inputChannels[i]]; + } + + // Process count(*) + int count = columns[0].getPositionCount(); + if (arguments.length == 0) { + arguments = new Column[] {new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, count)}; + } + + accumulator.removeInput(arguments); + } + + public void evaluate(ColumnBuilder columnBuilder) { + accumulator.evaluateFinal(columnBuilder); + } + + public void processStatistics(Statistics[] statistics) { Review Comment: Can WindowAggregator use statistics? ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/exception/WindowFunctionException.java: ########## @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.window.exception; + +public class WindowFunctionException extends RuntimeException { Review Comment: Put into exception package ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/partition/frame/RowsFrame.java: ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.window.partition.frame; + +import org.apache.iotdb.db.queryengine.execution.operator.process.window.exception.FrameTypeException; +import org.apache.iotdb.db.queryengine.execution.operator.process.window.utils.Range; + +public class RowsFrame implements Frame { + private final FrameInfo frameInfo; + private final int partitionStart; + private final int partitionSize; + + public RowsFrame(FrameInfo frameInfo, int partitionStart, int partitionEnd) { + assert frameInfo.getFrameType() == FrameInfo.FrameType.ROWS; Review Comment: Use checkArgument ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/function/rank/RowNumberFunction.java: ########## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.window.function.rank; + +import org.apache.iotdb.db.queryengine.execution.operator.process.window.function.WindowFunction; +import org.apache.iotdb.db.queryengine.execution.operator.process.window.partition.Partition; + +import org.apache.tsfile.block.column.ColumnBuilder; + +public class RowNumberFunction implements WindowFunction { + public RowNumberFunction() {} + + @Override + public void reset() {} + + @Override + public void transform( + Partition partition, + ColumnBuilder builder, + int index, + int frameStart, + int frameEnd, + int peerGroupStart, + int peerGroupEnd) { + builder.writeLong(index + 1); Review Comment: ```suggestion builder.writeLong((long)index + 1); ``` ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/utils/RowComparator.java: ########## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.window.utils; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.write.UnSupportedDataTypeException; + +import java.util.List; + +public class RowComparator { + private final List<TSDataType> dataTypes; + + public RowComparator(List<TSDataType> dataTypes) { + this.dataTypes = dataTypes; + } + + public boolean equalColumns(List<Column> columns, int offset1, int offset2) { + for (int i = 0; i < dataTypes.size(); i++) { + Column column = columns.get(i); + TSDataType dataType = dataTypes.get(i); + if (!equal(column, dataType, offset1, offset2)) { + return false; + } + } + return true; + } + + public boolean equal(Column column, int offset1, int offset2) { + assert dataTypes.size() == 1; + return equal(column, dataTypes.get(0), offset1, offset2); + } + + private boolean equal(Column column, TSDataType dataType, int offset1, int offset2) { + switch (dataType) { + case BOOLEAN: + boolean bool1 = column.getBoolean(offset1); + boolean bool2 = column.getBoolean(offset2); + if (bool1 != bool2) { + return false; + } + break; + case INT32: + int int1 = column.getInt(offset1); + int int2 = column.getInt(offset2); + if (int1 != int2) { + return false; + } + break; + case INT64: + long long1 = column.getLong(offset1); + long long2 = column.getLong(offset2); + if (long1 != long2) { + return false; + } + break; + case FLOAT: + float float1 = column.getFloat(offset1); + float float2 = column.getFloat(offset2); + if (float1 != float2) { + return false; + } + break; + case DOUBLE: + double double1 = column.getDouble(offset1); + double double2 = column.getDouble(offset2); + if (double1 != double2) { + return false; + } + break; + case TEXT: + Binary bin1 = column.getBinary(offset1); + Binary bin2 = column.getBinary(offset2); + if (!bin1.equals(bin2)) { + return false; + } + break; + default: + // Would throw at the first run + throw new UnSupportedDataTypeException(dataType.toString()); + } + return true; + } + + public boolean equalColumnLists(List<ColumnList> columns, int offset1, int offset2) { + for (int i = 0; i < dataTypes.size(); i++) { + ColumnList column = columns.get(i); + TSDataType dataType = dataTypes.get(i); + if (!equal(column, dataType, offset1, offset2)) { + return false; + } + } + return true; + } + + public boolean equal(ColumnList column, int offset1, int offset2) { + assert dataTypes.size() == 1; + return equal(column, dataTypes.get(0), offset1, offset2); + } + + private boolean equal(ColumnList column, TSDataType dataType, int offset1, int offset2) { + switch (dataType) { Review Comment:  These methods are a bit duplicated -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
