JackieTien97 commented on code in PR #15136:
URL: https://github.com/apache/iotdb/pull/15136#discussion_r2020469456


##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunction.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.udf.builtin.relational.tvf;
+
+import org.apache.iotdb.udf.api.exception.UDFException;
+import org.apache.iotdb.udf.api.relational.TableFunction;
+import org.apache.iotdb.udf.api.relational.access.Record;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
+import 
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
+import org.apache.iotdb.udf.api.relational.table.argument.Argument;
+import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
+import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument;
+import 
org.apache.iotdb.udf.api.relational.table.processor.TableFunctionDataProcessor;
+import 
org.apache.iotdb.udf.api.relational.table.specification.ParameterSpecification;
+import 
org.apache.iotdb.udf.api.relational.table.specification.ScalarParameterSpecification;
+import 
org.apache.iotdb.udf.api.relational.table.specification.TableParameterSpecification;
+import org.apache.iotdb.udf.api.type.Type;
+
+import org.apache.tsfile.block.column.ColumnBuilder;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class CapacityTableFunction implements TableFunction {
+  private static final String DATA_PARAMETER_NAME = "DATA";
+  private static final String SIZE_PARAMETER_NAME = "SIZE";
+
+  @Override
+  public List<ParameterSpecification> getArgumentsSpecifications() {
+    return Arrays.asList(
+        TableParameterSpecification.builder()
+            .name(DATA_PARAMETER_NAME)
+            .passThroughColumns()
+            .build(),
+        
ScalarParameterSpecification.builder().name(SIZE_PARAMETER_NAME).type(Type.INT64).build());
+  }
+
+  @Override
+  public TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws 
UDFException {
+    long size = (long) ((ScalarArgument) 
arguments.get(SIZE_PARAMETER_NAME)).getValue();
+    if (size <= 0) {
+      throw new UDFException("Size must be greater than 0");
+    }
+    return TableFunctionAnalysis.builder()
+        .properColumnSchema(
+            new DescribedSchema.Builder().addField("window_index", 
Type.INT64).build())
+        .requireRecordSnapshot(false)
+        .requiredColumns(DATA_PARAMETER_NAME, Collections.singletonList(0))
+        .build();
+  }
+
+  @Override
+  public TableFunctionProcessorProvider getProcessorProvider(Map<String, 
Argument> arguments) {
+    long sz = (long) ((ScalarArgument) 
arguments.get(SIZE_PARAMETER_NAME)).getValue();
+    return new TableFunctionProcessorProvider() {
+      @Override
+      public TableFunctionDataProcessor getDataProcessor() {
+        return new CountDataProcessor(sz);
+      }
+    };
+  }
+
+  private static class CountDataProcessor implements 
TableFunctionDataProcessor {
+
+    private final long size;
+    private final List<Long> currentRowIndexes = new ArrayList<>();

Review Comment:
   There is no need to save this list. We can simply record a curStartIndex, 
and then [curStartIndex, curIndex] is the range for passThroughIndexBuilder.



##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/WindowTVFUtils.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.udf.builtin.relational.tvf;
+
+import org.apache.iotdb.udf.api.exception.UDFException;
+import org.apache.iotdb.udf.api.relational.table.argument.TableArgument;
+import org.apache.iotdb.udf.api.type.Type;
+
+import java.util.Optional;
+import java.util.Set;
+
+public class WindowTVFUtils {
+  /**
+   * Find the index of the column in the table argument.
+   *
+   * @param tableArgument the table argument
+   * @param expectedFieldName the expected field name
+   * @param expectedTypes the expected types
+   * @return the index of the time column, -1 if not found
+   */
+  public static int findColumnIndex(
+      TableArgument tableArgument, String expectedFieldName, Set<Type> 
expectedTypes)
+      throws UDFException {
+    for (int i = 0; i < tableArgument.getFieldTypes().size(); i++) {
+      Optional<String> fieldName = tableArgument.getFieldNames().get(i);
+      if (fieldName.isPresent() && 
expectedFieldName.equalsIgnoreCase(fieldName.get())) {
+        if (!expectedTypes.contains(tableArgument.getFieldTypes().get(i))) {
+          throw new UDFException(
+              String.format("The type of the column [%s] is not as expected.", 
expectedFieldName));
+        }
+        return i;
+      }
+    }
+    throw new UDFException(
+        String.format(
+            "Required column [%s] not found in the source table argument.", 
expectedFieldName));

Review Comment:
   ```suggestion
       throw new IoTDBRuntimeException(
           String.format(
               "Required column [%s] not found in the source table argument.", 
expectedFieldName), TSStatusCode.SEMANTIC_ERROR.getStatusCode());
   ```



##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/SessionTableFunction.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.udf.builtin.relational.tvf;
+
+import org.apache.iotdb.udf.api.exception.UDFException;
+import org.apache.iotdb.udf.api.relational.TableFunction;
+import org.apache.iotdb.udf.api.relational.access.Record;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
+import 
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
+import org.apache.iotdb.udf.api.relational.table.argument.Argument;
+import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
+import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument;
+import org.apache.iotdb.udf.api.relational.table.argument.TableArgument;
+import 
org.apache.iotdb.udf.api.relational.table.processor.TableFunctionDataProcessor;
+import 
org.apache.iotdb.udf.api.relational.table.specification.ParameterSpecification;
+import 
org.apache.iotdb.udf.api.relational.table.specification.ScalarParameterSpecification;
+import 
org.apache.iotdb.udf.api.relational.table.specification.TableParameterSpecification;
+import org.apache.iotdb.udf.api.type.Type;
+
+import org.apache.tsfile.block.column.ColumnBuilder;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.iotdb.commons.udf.builtin.relational.tvf.WindowTVFUtils.findColumnIndex;
+
+public class SessionTableFunction implements TableFunction {
+  private static final String DATA_PARAMETER_NAME = "DATA";
+  private static final String TIMECOL_PARAMETER_NAME = "TIMECOL";
+  private static final String GAP_PARAMETER_NAME = "GAP";
+
+  @Override
+  public List<ParameterSpecification> getArgumentsSpecifications() {
+    return Arrays.asList(
+        TableParameterSpecification.builder()
+            .name(DATA_PARAMETER_NAME)
+            .passThroughColumns()
+            .build(),
+        ScalarParameterSpecification.builder()
+            .name(TIMECOL_PARAMETER_NAME)
+            .type(Type.STRING)
+            .build(),
+        
ScalarParameterSpecification.builder().name(GAP_PARAMETER_NAME).type(Type.INT64).build());
+  }
+
+  @Override
+  public TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws 
UDFException {
+    TableArgument tableArgument = (TableArgument) 
arguments.get(DATA_PARAMETER_NAME);
+    String expectedFieldName =
+        (String) ((ScalarArgument) 
arguments.get(TIMECOL_PARAMETER_NAME)).getValue();
+    int requiredIndex =
+        findColumnIndex(tableArgument, expectedFieldName, 
Collections.singleton(Type.TIMESTAMP));
+    DescribedSchema properColumnSchema =
+        new DescribedSchema.Builder()
+            .addField("window_start", Type.TIMESTAMP)
+            .addField("window_end", Type.TIMESTAMP)
+            .build();
+
+    // outputColumnSchema
+    return TableFunctionAnalysis.builder()
+        .properColumnSchema(properColumnSchema)
+        .requireRecordSnapshot(false)
+        .requiredColumns(DATA_PARAMETER_NAME, 
Collections.singletonList(requiredIndex))
+        .build();
+  }
+
+  @Override
+  public TableFunctionProcessorProvider getProcessorProvider(Map<String, 
Argument> arguments) {
+    long gap = (long) ((ScalarArgument) 
arguments.get(GAP_PARAMETER_NAME)).getValue();
+    return new TableFunctionProcessorProvider() {
+      @Override
+      public TableFunctionDataProcessor getDataProcessor() {
+        return new SessionDataProcessor(gap);
+      }
+    };
+  }
+
+  private static class SessionDataProcessor implements 
TableFunctionDataProcessor {
+
+    private final long gap;
+    private final List<Long> currentRowIndexes = new ArrayList<>();

Review Comment:
   There is no need to use a list to save all of the row indexes.



##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/WindowTVFUtils.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.udf.builtin.relational.tvf;
+
+import org.apache.iotdb.udf.api.exception.UDFException;
+import org.apache.iotdb.udf.api.relational.table.argument.TableArgument;
+import org.apache.iotdb.udf.api.type.Type;
+
+import java.util.Optional;
+import java.util.Set;
+
+public class WindowTVFUtils {
+  /**
+   * Find the index of the column in the table argument.
+   *
+   * @param tableArgument the table argument
+   * @param expectedFieldName the expected field name
+   * @param expectedTypes the expected types
+   * @return the index of the time column, -1 if not found
+   */
+  public static int findColumnIndex(
+      TableArgument tableArgument, String expectedFieldName, Set<Type> 
expectedTypes)
+      throws UDFException {
+    for (int i = 0; i < tableArgument.getFieldTypes().size(); i++) {
+      Optional<String> fieldName = tableArgument.getFieldNames().get(i);
+      if (fieldName.isPresent() && 
expectedFieldName.equalsIgnoreCase(fieldName.get())) {
+        if (!expectedTypes.contains(tableArgument.getFieldTypes().get(i))) {
+          throw new UDFException(
+              String.format("The type of the column [%s] is not as expected.", 
expectedFieldName));

Review Comment:
   ```suggestion
   throw new IoTDBRuntimeException(String.format("The type of the column [%s] 
is not as expected.", expectedFieldName), 
TSStatusCode.SEMANTIC_ERROR.getStatusCode());
   ```



##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/SessionTableFunction.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.udf.builtin.relational.tvf;
+
+import org.apache.iotdb.udf.api.exception.UDFException;
+import org.apache.iotdb.udf.api.relational.TableFunction;
+import org.apache.iotdb.udf.api.relational.access.Record;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
+import 
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
+import org.apache.iotdb.udf.api.relational.table.argument.Argument;
+import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
+import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument;
+import org.apache.iotdb.udf.api.relational.table.argument.TableArgument;
+import 
org.apache.iotdb.udf.api.relational.table.processor.TableFunctionDataProcessor;
+import 
org.apache.iotdb.udf.api.relational.table.specification.ParameterSpecification;
+import 
org.apache.iotdb.udf.api.relational.table.specification.ScalarParameterSpecification;
+import 
org.apache.iotdb.udf.api.relational.table.specification.TableParameterSpecification;
+import org.apache.iotdb.udf.api.type.Type;
+
+import org.apache.tsfile.block.column.ColumnBuilder;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.iotdb.commons.udf.builtin.relational.tvf.WindowTVFUtils.findColumnIndex;
+
+public class SessionTableFunction implements TableFunction {
+  private static final String DATA_PARAMETER_NAME = "DATA";
+  private static final String TIMECOL_PARAMETER_NAME = "TIMECOL";
+  private static final String GAP_PARAMETER_NAME = "GAP";
+
+  @Override
+  public List<ParameterSpecification> getArgumentsSpecifications() {
+    return Arrays.asList(
+        TableParameterSpecification.builder()
+            .name(DATA_PARAMETER_NAME)
+            .passThroughColumns()
+            .build(),
+        ScalarParameterSpecification.builder()
+            .name(TIMECOL_PARAMETER_NAME)
+            .type(Type.STRING)
+            .build(),
+        
ScalarParameterSpecification.builder().name(GAP_PARAMETER_NAME).type(Type.INT64).build());
+  }
+
+  @Override
+  public TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws 
UDFException {
+    TableArgument tableArgument = (TableArgument) 
arguments.get(DATA_PARAMETER_NAME);
+    String expectedFieldName =
+        (String) ((ScalarArgument) 
arguments.get(TIMECOL_PARAMETER_NAME)).getValue();
+    int requiredIndex =
+        findColumnIndex(tableArgument, expectedFieldName, 
Collections.singleton(Type.TIMESTAMP));
+    DescribedSchema properColumnSchema =
+        new DescribedSchema.Builder()
+            .addField("window_start", Type.TIMESTAMP)
+            .addField("window_end", Type.TIMESTAMP)
+            .build();
+
+    // outputColumnSchema
+    return TableFunctionAnalysis.builder()
+        .properColumnSchema(properColumnSchema)
+        .requireRecordSnapshot(false)
+        .requiredColumns(DATA_PARAMETER_NAME, 
Collections.singletonList(requiredIndex))
+        .build();
+  }
+
+  @Override
+  public TableFunctionProcessorProvider getProcessorProvider(Map<String, 
Argument> arguments) {
+    long gap = (long) ((ScalarArgument) 
arguments.get(GAP_PARAMETER_NAME)).getValue();
+    return new TableFunctionProcessorProvider() {
+      @Override
+      public TableFunctionDataProcessor getDataProcessor() {
+        return new SessionDataProcessor(gap);
+      }
+    };
+  }
+
+  private static class SessionDataProcessor implements 
TableFunctionDataProcessor {
+
+    private final long gap;
+    private final List<Long> currentRowIndexes = new ArrayList<>();
+    private long curIndex = 0;
+    private long windowStart = -1;
+    private long windowEnd = -1;

Review Comment:
   What if the time is negative? It would be better to use Long.MIN_VALUE.



##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.udf.builtin.relational.tvf;
+
+import org.apache.iotdb.udf.api.exception.UDFException;
+import org.apache.iotdb.udf.api.relational.TableFunction;
+import org.apache.iotdb.udf.api.relational.access.Record;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
+import 
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
+import org.apache.iotdb.udf.api.relational.table.argument.Argument;
+import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
+import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument;
+import org.apache.iotdb.udf.api.relational.table.argument.TableArgument;
+import 
org.apache.iotdb.udf.api.relational.table.processor.TableFunctionDataProcessor;
+import 
org.apache.iotdb.udf.api.relational.table.specification.ParameterSpecification;
+import 
org.apache.iotdb.udf.api.relational.table.specification.ScalarParameterSpecification;
+import 
org.apache.iotdb.udf.api.relational.table.specification.TableParameterSpecification;
+import org.apache.iotdb.udf.api.type.Type;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.tsfile.block.column.ColumnBuilder;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.iotdb.commons.udf.builtin.relational.tvf.WindowTVFUtils.findColumnIndex;
+
+public class VariationTableFunction implements TableFunction {
+  private static final String DATA_PARAMETER_NAME = "DATA";
+  private static final String COL_PARAMETER_NAME = "COL";
+  private static final String DELTA_PARAMETER_NAME = "DELTA";
+
+  @Override
+  public List<ParameterSpecification> getArgumentsSpecifications() {
+    return Arrays.asList(
+        TableParameterSpecification.builder()
+            .name(DATA_PARAMETER_NAME)
+            .passThroughColumns()
+            .build(),
+        
ScalarParameterSpecification.builder().name(COL_PARAMETER_NAME).type(Type.STRING).build(),
+        ScalarParameterSpecification.builder()
+            .name(DELTA_PARAMETER_NAME)
+            .type(Type.DOUBLE)
+            .build());
+  }
+
+  @Override
+  public TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws 
UDFException {
+    TableArgument tableArgument = (TableArgument) 
arguments.get(DATA_PARAMETER_NAME);
+    String expectedFieldName =
+        (String) ((ScalarArgument) 
arguments.get(COL_PARAMETER_NAME)).getValue();
+    int requiredIndex =
+        findColumnIndex(
+            tableArgument,
+            expectedFieldName,
+            ImmutableSet.of(Type.INT32, Type.INT64, Type.FLOAT, Type.DOUBLE));
+    DescribedSchema properColumnSchema =
+        new DescribedSchema.Builder().addField("window_index", 
Type.INT64).build();
+    // outputColumnSchema
+    return TableFunctionAnalysis.builder()
+        .properColumnSchema(properColumnSchema)
+        .requireRecordSnapshot(false)
+        .requiredColumns(DATA_PARAMETER_NAME, 
Collections.singletonList(requiredIndex))
+        .build();
+  }
+
+  @Override
+  public TableFunctionProcessorProvider getProcessorProvider(Map<String, 
Argument> arguments) {
+    double delta = (double) ((ScalarArgument) 
arguments.get(DELTA_PARAMETER_NAME)).getValue();
+    return new TableFunctionProcessorProvider() {
+      @Override
+      public TableFunctionDataProcessor getDataProcessor() {
+        return new VariationDataProcessor(delta);
+      }
+    };
+  }
+
+  private static class VariationDataProcessor implements 
TableFunctionDataProcessor {
+
+    private final double gap;
+    private final List<Long> currentRowIndexes = new ArrayList<>();

Review Comment:
   This does not need to be a list.



##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/SessionTableFunction.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.udf.builtin.relational.tvf;
+
+import org.apache.iotdb.udf.api.exception.UDFException;
+import org.apache.iotdb.udf.api.relational.TableFunction;
+import org.apache.iotdb.udf.api.relational.access.Record;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
+import 
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
+import org.apache.iotdb.udf.api.relational.table.argument.Argument;
+import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
+import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument;
+import org.apache.iotdb.udf.api.relational.table.argument.TableArgument;
+import 
org.apache.iotdb.udf.api.relational.table.processor.TableFunctionDataProcessor;
+import 
org.apache.iotdb.udf.api.relational.table.specification.ParameterSpecification;
+import 
org.apache.iotdb.udf.api.relational.table.specification.ScalarParameterSpecification;
+import 
org.apache.iotdb.udf.api.relational.table.specification.TableParameterSpecification;
+import org.apache.iotdb.udf.api.type.Type;
+
+import org.apache.tsfile.block.column.ColumnBuilder;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.iotdb.commons.udf.builtin.relational.tvf.WindowTVFUtils.findColumnIndex;
+
+public class SessionTableFunction implements TableFunction {
+  private static final String DATA_PARAMETER_NAME = "DATA";
+  private static final String TIMECOL_PARAMETER_NAME = "TIMECOL";
+  private static final String GAP_PARAMETER_NAME = "GAP";
+
+  @Override
+  public List<ParameterSpecification> getArgumentsSpecifications() {
+    return Arrays.asList(
+        TableParameterSpecification.builder()
+            .name(DATA_PARAMETER_NAME)
+            .passThroughColumns()
+            .build(),
+        ScalarParameterSpecification.builder()
+            .name(TIMECOL_PARAMETER_NAME)
+            .type(Type.STRING)
+            .build(),
+        
ScalarParameterSpecification.builder().name(GAP_PARAMETER_NAME).type(Type.INT64).build());
+  }
+
+  @Override
+  public TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws 
UDFException {
+    TableArgument tableArgument = (TableArgument) 
arguments.get(DATA_PARAMETER_NAME);
+    String expectedFieldName =
+        (String) ((ScalarArgument) 
arguments.get(TIMECOL_PARAMETER_NAME)).getValue();
+    int requiredIndex =
+        findColumnIndex(tableArgument, expectedFieldName, 
Collections.singleton(Type.TIMESTAMP));
+    DescribedSchema properColumnSchema =
+        new DescribedSchema.Builder()
+            .addField("window_start", Type.TIMESTAMP)
+            .addField("window_end", Type.TIMESTAMP)
+            .build();
+
+    // outputColumnSchema
+    return TableFunctionAnalysis.builder()
+        .properColumnSchema(properColumnSchema)
+        .requireRecordSnapshot(false)
+        .requiredColumns(DATA_PARAMETER_NAME, 
Collections.singletonList(requiredIndex))
+        .build();
+  }
+
+  @Override
+  public TableFunctionProcessorProvider getProcessorProvider(Map<String, 
Argument> arguments) {
+    long gap = (long) ((ScalarArgument) 
arguments.get(GAP_PARAMETER_NAME)).getValue();
+    return new TableFunctionProcessorProvider() {
+      @Override
+      public TableFunctionDataProcessor getDataProcessor() {
+        return new SessionDataProcessor(gap);
+      }
+    };
+  }
+
+  private static class SessionDataProcessor implements 
TableFunctionDataProcessor {
+
+    private final long gap;
+    private final List<Long> currentRowIndexes = new ArrayList<>();
+    private long curIndex = 0;
+    private long windowStart = -1;
+    private long windowEnd = -1;
+
+    public SessionDataProcessor(long gap) {
+      this.gap = gap;
+    }
+
+    @Override
+    public void process(
+        Record input,
+        List<ColumnBuilder> properColumnBuilders,
+        ColumnBuilder passThroughIndexBuilder) {
+      long timeValue = input.getLong(0);
+      if (timeValue > windowEnd) {
+        outputWindow(properColumnBuilders, passThroughIndexBuilder);
+      }
+      if (currentRowIndexes.isEmpty()) {
+        windowStart = timeValue;
+      }
+      currentRowIndexes.add(curIndex);
+      windowEnd = timeValue + gap;
+      curIndex++;
+    }
+
+    @Override
+    public void finish(List<ColumnBuilder> columnBuilders, ColumnBuilder 
passThroughIndexBuilder) {
+      if (!currentRowIndexes.isEmpty()) {
+        outputWindow(columnBuilders, passThroughIndexBuilder);
+      }
+    }
+
+    private void outputWindow(
+        List<ColumnBuilder> properColumnBuilders, ColumnBuilder 
passThroughIndexBuilder) {
+      for (Long currentRowIndex : currentRowIndexes) {
+        properColumnBuilders.get(0).writeLong(windowStart);
+        properColumnBuilders.get(1).writeLong(windowEnd - gap);
+        passThroughIndexBuilder.writeLong(currentRowIndex);
+      }

Review Comment:
   ```suggestion
         long currentWindowEnd = windowEnd - gap;
         for (Long currentRowIndex : currentRowIndexes) {
           properColumnBuilders.get(0).writeLong(windowStart);
           properColumnBuilders.get(1).writeLong(currentWindowEnd);
           passThroughIndexBuilder.writeLong(currentRowIndex);
         }
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeGrouping.java:
##########
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.DataOrganizationSpecification;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CorrelatedJoinNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GroupNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.TAG;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.ParallelizeGrouping.CanOptimized.ENABLE_PARALLEL;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.ParallelizeGrouping.CanOptimized.KEEP_GROUP;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.ParallelizeGrouping.CanOptimized.PENDING;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.ParallelizeGrouping.CanOptimized.TO_SORT;
+
+/**
+ * This rule is used to determine whether the GroupNode can be executed 
parallel.
+ *
+ * <p>Optimization phase: Logical plan planning.
+ *
+ * <p>
+ *
+ * <ul>
+ *   The GroupNode can be paralleled if the lasted offspring that guarantees 
the data is grouped by
+ *   PartitionKey or is sorted by PartitionKey. For example:
+ *   <ul>
+ *     <li>GroupNode[PK={device_id}, OK={time}] -> ... -> TableDeviceScanNode
+ *     <li>GroupNode[PK={device_id}, OK={s1}] -> ... -> TableDeviceScanNode
+ *     <li>GroupNode[PK={device_id,attr}, OK={time}] -> ... -> 
TableDeviceScanNode
+ *     <li>GroupNode[PK={tag1,tag2}, OK={tag3}] -> 
SortNode[sort={tag1,tag2,tag3}]
+ *     <li>GroupNode[PK={tag1,tag2}, OK={s1}] -> 
TopKNode[sort={tag1,tag2,tag3}]
+ *     <li>GroupNode[PK={tag1,tag2}, OK={}] -> 
AggregationNode[group={tag1,tag2}]
+ *     <li>GroupNode[PK={tag1,tag2}, OK={s1}] -> 
AggregationNode[group={tag1,tag2,s1}]
+ *     <li>GroupNode[PK={tag1}, OK={}] -> 
TableFunctionNode[partition={tag1,tag2}]
+ *   </ul>
+ * </ul>
+ *
+ * <p>Otherwise, the GroupNode should be transformed into a SortNode.
+ */
+public class ParallelizeGrouping implements PlanOptimizer {
+  @Override
+  public PlanNode optimize(PlanNode plan, PlanOptimizer.Context context) {
+    if (!(context.getAnalysis().isQuery())) {

Review Comment:
   Only query plans that contain a `GroupNode` should continue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to