JackieTien97 commented on code in PR #17656:
URL: https://github.com/apache/iotdb/pull/17656#discussion_r3379213505


##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/M4TableFunction.java:
##########
@@ -0,0 +1,729 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.udf.builtin.relational.tvf;
+
+import org.apache.iotdb.commons.exception.SemanticException;
+import org.apache.iotdb.udf.api.exception.UDFException;
+import org.apache.iotdb.udf.api.relational.TableFunction;
+import org.apache.iotdb.udf.api.relational.access.Record;
+import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle;
+import 
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
+import org.apache.iotdb.udf.api.relational.table.argument.Argument;
+import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
+import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument;
+import org.apache.iotdb.udf.api.relational.table.argument.TableArgument;
+import 
org.apache.iotdb.udf.api.relational.table.processor.TableFunctionDataProcessor;
+import 
org.apache.iotdb.udf.api.relational.table.specification.ParameterSpecification;
+import 
org.apache.iotdb.udf.api.relational.table.specification.ScalarParameterSpecification;
+import 
org.apache.iotdb.udf.api.relational.table.specification.TableParameterSpecification;
+import org.apache.iotdb.udf.api.type.Type;
+
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.utils.Binary;
+
+import java.time.LocalDate;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.iotdb.commons.udf.builtin.relational.tvf.WindowTVFUtils.findColumnIndex;
+import static 
org.apache.iotdb.udf.api.relational.table.argument.ScalarArgumentChecker.POSITIVE_LONG_CHECKER;
+
+public class M4TableFunction implements TableFunction {
+
+  public static final String DATA_PARAMETER_NAME = "DATA"; // table argument (set semantics) holding the input rows
+  public static final String TIMECOL_PARAMETER_NAME = "TIMECOL"; // scalar STRING argument naming the time column
+  public static final String SIZE_PARAMETER_NAME = "SIZE"; // scalar INT64 argument, checked by POSITIVE_LONG_CHECKER
+  public static final String SLIDE_PARAMETER_NAME = "SLIDE"; // scalar INT64 argument, defaults to UNSPECIFIED_SLIDE
+  public static final String ORIGIN_PARAMETER_NAME = "ORIGIN"; // scalar TIMESTAMP argument, defaults to 0
+  public static final String WINDOW_MODE_PARAMETER_NAME = "__M4_WINDOW_MODE"; // optional boolean argument; true selects the time-window processor
+
+  private static final String OUTPUT_WINDOW_START_COLUMN = "window_start"; // first output field (TIMESTAMP)
+  private static final String OUTPUT_WINDOW_END_COLUMN = "window_end"; // second output field (TIMESTAMP)
+  private static final String PARTICIPANT_TYPES_PROPERTY = 
"__M4_PARTICIPANT_TYPES"; // handle property carrying the comma-joined participant type names
+  private static final long UNSPECIFIED_SLIDE = Long.MIN_VALUE; // sentinel default for SLIDE; analyze() replaces it with SIZE
+  private static final long INVALID_INDEX = -1; // Candidate.index value before any row has been recorded
+
+  /**
+   * Declares the parameters accepted by this table function.
+   *
+   * <p>In order: the {@code DATA} table argument (set semantics), the {@code TIMECOL} string, the
+   * {@code SIZE} long validated by {@code POSITIVE_LONG_CHECKER}, the {@code SLIDE} long whose
+   * default is the {@code UNSPECIFIED_SLIDE} sentinel, and the {@code ORIGIN} timestamp whose
+   * default is {@code 0}.
+   *
+   * @return the fixed-size list of parameter specifications, in declaration order
+   */
+  @Override
+  public List<ParameterSpecification> getArgumentsSpecifications() {
+    ParameterSpecification dataSpec =
+        TableParameterSpecification.builder().name(DATA_PARAMETER_NAME).setSemantics().build();
+    ParameterSpecification timeColumnSpec =
+        ScalarParameterSpecification.builder()
+            .name(TIMECOL_PARAMETER_NAME)
+            .type(Type.STRING)
+            .build();
+    ParameterSpecification sizeSpec =
+        ScalarParameterSpecification.builder()
+            .name(SIZE_PARAMETER_NAME)
+            .type(Type.INT64)
+            .addChecker(POSITIVE_LONG_CHECKER)
+            .build();
+    // SLIDE carries no checker here: its sentinel default is negative and is resolved in analyze().
+    ParameterSpecification slideSpec =
+        ScalarParameterSpecification.builder()
+            .name(SLIDE_PARAMETER_NAME)
+            .type(Type.INT64)
+            .defaultValue(UNSPECIFIED_SLIDE)
+            .build();
+    ParameterSpecification originSpec =
+        ScalarParameterSpecification.builder()
+            .name(ORIGIN_PARAMETER_NAME)
+            .type(Type.TIMESTAMP)
+            .defaultValue(0L)
+            .build();
+    return Arrays.asList(dataSpec, timeColumnSpec, sizeSpec, slideSpec, originSpec);
+  }
+
+  /**
+   * Validates the arguments of one invocation and derives its output schema and handle.
+   *
+   * <p>The {@code DATA} argument must be ordered by exactly the column named by {@code TIMECOL};
+   * that column is resolved through {@code findColumnIndex} restricted to TIMESTAMP. Every input
+   * column other than the time column and the partition columns becomes a participant column and
+   * contributes two output fields: {@code <name>_time} (TIMESTAMP) followed by {@code <name>}
+   * (the column's own type). The output schema starts with {@code window_start} and {@code
+   * window_end}.
+   *
+   * @param arguments the bound arguments keyed by parameter name
+   * @return the analysis holding the output schema, the required input columns (time column
+   *     first, then the participant columns in input order) and the handle with the window
+   *     settings
+   * @throws SemanticException if ORDER BY is missing or is not exactly the time column, if a
+   *     participant column has no name or is of type BLOB or OBJECT, or if no participant column
+   *     remains
+   * @throws UDFException if SLIDE was given explicitly and is not positive
+   */
+  @Override
+  public TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws UDFException {
+    TableArgument tableArgument = (TableArgument) arguments.get(DATA_PARAMETER_NAME);
+    if (tableArgument.getOrderBy().isEmpty()) {
+      throw new SemanticException("Table argument with set semantics requires an ORDER BY clause.");
+    }
+
+    String timeColumn =
+        (String) ((ScalarArgument) arguments.get(TIMECOL_PARAMETER_NAME)).getValue();
+    int timeColumnIndex =
+        findColumnIndex(tableArgument, timeColumn, Collections.singleton(Type.TIMESTAMP));
+    validateOrderBy(tableArgument, timeColumn);
+
+    // The time column and the partition columns never take part in the M4 computation.
+    List<Integer> partitionIndexes = getPartitionIndexes(tableArgument);
+    Set<Integer> excludedIndexes = new HashSet<>(partitionIndexes);
+    excludedIndexes.add(timeColumnIndex);
+
+    List<Integer> participantIndexes = new ArrayList<>();
+    List<Type> participantTypes = new ArrayList<>();
+    DescribedSchema.Builder schemaBuilder =
+        new DescribedSchema.Builder()
+            .addField(OUTPUT_WINDOW_START_COLUMN, Type.TIMESTAMP)
+            .addField(OUTPUT_WINDOW_END_COLUMN, Type.TIMESTAMP);
+
+    for (int i = 0; i < tableArgument.getFieldTypes().size(); i++) {
+      if (excludedIndexes.contains(i)) {
+        continue;
+      }
+      Type type = tableArgument.getFieldTypes().get(i);
+      // The output field names are derived from the input name, so an unnamed column is reported
+      // as a semantic error instead of surfacing as a raw NoSuchElementException.
+      final int position = i + 1;
+      String columnName =
+          tableArgument
+              .getFieldNames()
+              .get(i)
+              .orElseThrow(
+                  () ->
+                      new SemanticException(
+                          String.format(
+                              "The column at position %d of the DATA argument has no name.",
+                              position)));
+      if (!isComparableType(type)) {
+        throw new SemanticException(
+            String.format("The type of the column [%s] is not comparable.", columnName));
+      }
+      participantIndexes.add(i);
+      participantTypes.add(type);
+      schemaBuilder.addField(columnName + "_time", Type.TIMESTAMP);
+      schemaBuilder.addField(columnName, type);
+    }
+
+    if (participantIndexes.isEmpty()) {
+      throw new SemanticException("No comparable columns found for M4 calculation.");
+    }
+
+    long size = (long) ((ScalarArgument) arguments.get(SIZE_PARAMETER_NAME)).getValue();
+    long slide = (long) ((ScalarArgument) arguments.get(SLIDE_PARAMETER_NAME)).getValue();
+    if (slide == UNSPECIFIED_SLIDE) {
+      // SLIDE was omitted: fall back to SIZE.
+      slide = size;
+    } else if (slide <= 0) {
+      throw new UDFException("Invalid scalar argument SLIDE, should be a positive value");
+    }
+
+    // The window-mode argument is not part of the declared specifications, so it may be absent;
+    // absence is treated as false.
+    boolean isTimeWindow =
+        arguments.containsKey(WINDOW_MODE_PARAMETER_NAME)
+            && (boolean) ((ScalarArgument) arguments.get(WINDOW_MODE_PARAMETER_NAME)).getValue();
+
+    MapTableFunctionHandle handle =
+        new MapTableFunctionHandle.Builder()
+            .addProperty(WINDOW_MODE_PARAMETER_NAME, isTimeWindow)
+            .addProperty(SIZE_PARAMETER_NAME, size)
+            .addProperty(SLIDE_PARAMETER_NAME, slide)
+            .addProperty(
+                ORIGIN_PARAMETER_NAME,
+                ((ScalarArgument) arguments.get(ORIGIN_PARAMETER_NAME)).getValue())
+            .addProperty(PARTICIPANT_TYPES_PROPERTY, joinTypes(participantTypes))
+            .build();
+
+    // Time column first, then the participants: createColumns() relies on this layout when it
+    // numbers the participant columns from 1.
+    List<Integer> requiredColumns = new ArrayList<>();
+    requiredColumns.add(timeColumnIndex);
+    requiredColumns.addAll(participantIndexes);
+
+    return TableFunctionAnalysis.builder()
+        .properColumnSchema(schemaBuilder.build())
+        .requireRecordSnapshot(false)
+        .requiredColumns(DATA_PARAMETER_NAME, requiredColumns)
+        .handle(handle)
+        .build();
+  }
+
+  @Override
+  public TableFunctionHandle createTableFunctionHandle() {
+    return new MapTableFunctionHandle();
+  }
+
+  /**
+   * Reads the window settings stored in the handle and returns a provider of data processors.
+   *
+   * <p>Each call to {@code getDataProcessor()} builds a fresh column array and returns a
+   * time-window processor when the window-mode property is {@code true}, otherwise a count-window
+   * processor.
+   *
+   * @param tableFunctionHandle the handle; it is cast to {@code MapTableFunctionHandle}
+   * @return the processor provider
+   */
+  @Override
+  public TableFunctionProcessorProvider getProcessorProvider(
+      TableFunctionHandle tableFunctionHandle) {
+    final MapTableFunctionHandle properties = (MapTableFunctionHandle) tableFunctionHandle;
+    final boolean timeWindowMode = (boolean) properties.getProperty(WINDOW_MODE_PARAMETER_NAME);
+    final long windowSize = (long) properties.getProperty(SIZE_PARAMETER_NAME);
+    final long windowSlide = (long) properties.getProperty(SLIDE_PARAMETER_NAME);
+    final long windowOrigin = (long) properties.getProperty(ORIGIN_PARAMETER_NAME);
+    final Type[] columnTypes =
+        parseTypes((String) properties.getProperty(PARTICIPANT_TYPES_PROPERTY));
+
+    return new TableFunctionProcessorProvider() {
+      @Override
+      public TableFunctionDataProcessor getDataProcessor() {
+        M4Column[] columns = createColumns(columnTypes);
+        if (timeWindowMode) {
+          return new TimeWindowM4DataProcessor(windowSize, windowSlide, windowOrigin, columns);
+        }
+        return new CountWindowM4DataProcessor(windowSize, windowSlide, columns);
+      }
+    };
+  }
+
+  /**
+   * Ensures the ORDER BY clause of the table argument consists of exactly one column and that it
+   * matches {@code timeColumn}, ignoring case.
+   *
+   * @throws SemanticException if the clause has a different size or names another column
+   */
+  private static void validateOrderBy(TableArgument tableArgument, String timeColumn) {
+    boolean ordersByTimeColumnOnly =
+        tableArgument.getOrderBy().size() == 1
+            && tableArgument.getOrderBy().get(0).equalsIgnoreCase(timeColumn);
+    if (ordersByTimeColumnOnly) {
+      return;
+    }
+    throw new SemanticException(
+        "The ORDER BY clause of the DATA argument must contain exactly the time column specified by the TIMECOL argument.");
+  }
+
+  /**
+   * Resolves every PARTITION BY column of the table argument to its column index, accepting any
+   * type returned by {@code Type.allTypes()}.
+   *
+   * @return the indexes in PARTITION BY order; empty when there is no partition column
+   */
+  private static List<Integer> getPartitionIndexes(TableArgument tableArgument) {
+    List<Integer> resolved = new ArrayList<>();
+    for (String columnName : tableArgument.getPartitionBy()) {
+      int columnIndex =
+          findColumnIndex(tableArgument, columnName, new LinkedHashSet<>(Type.allTypes()));
+      resolved.add(columnIndex);
+    }
+    return resolved;
+  }
+
+  /** Returns {@code true} for every type except BLOB and OBJECT. */
+  private static boolean isComparableType(Type type) {
+    boolean excluded = type == Type.BLOB || type == Type.OBJECT;
+    return !excluded;
+  }
+
+  /**
+   * Serializes the given types as their {@code name()} values separated by commas.
+   *
+   * @return the joined names, or an empty string for an empty list
+   */
+  private static String joinTypes(List<Type> types) {
+    List<String> names = new ArrayList<>(types.size());
+    for (Type type : types) {
+      names.add(type.name());
+    }
+    return String.join(",", names);
+  }
+
+  /**
+   * Parses a comma-separated list of type names, the inverse of {@code joinTypes}.
+   *
+   * @return one {@code Type} per name, or an empty array for an empty string
+   */
+  private static Type[] parseTypes(String value) {
+    if (value.isEmpty()) {
+      return new Type[0];
+    }
+    String[] names = value.split(",");
+    Type[] parsed = new Type[names.length];
+    int next = 0;
+    for (String name : names) {
+      parsed[next++] = Type.valueOf(name);
+    }
+    return parsed;
+  }
+
+  /**
+   * Builds one {@code M4Column} per participant type.
+   *
+   * <p>Input indexes start at 1; {@code process} reads the time from index 0 of the record.
+   */
+  private static M4Column[] createColumns(Type[] types) {
+    M4Column[] result = new M4Column[types.length];
+    int position = 0;
+    for (Type type : types) {
+      ValueOperator operator = ValueOperator.fromType(type);
+      result[position] = new M4Column(position + 1, operator);
+      position++;
+    }
+    return result;
+  }
+
+  /**
+   * Per-type strategy for reading a value from a record, ordering two values and writing a value
+   * to a column builder. Values travel between the three operations as boxed {@code Object}s.
+   */
+  private enum ValueOperator {
+    BOOLEAN(Type.BOOLEAN) {
+      @Override
+      Object read(Record row, int column) {
+        return row.getBoolean(column);
+      }
+
+      @Override
+      int compare(Object lhs, Object rhs) {
+        return Boolean.compare((Boolean) lhs, (Boolean) rhs);
+      }
+
+      @Override
+      void write(ColumnBuilder out, Object boxed) {
+        out.writeBoolean((Boolean) boxed);
+      }
+    },
+    INT32(Type.INT32) {
+      @Override
+      Object read(Record row, int column) {
+        return row.getInt(column);
+      }
+
+      @Override
+      int compare(Object lhs, Object rhs) {
+        return Integer.compare((Integer) lhs, (Integer) rhs);
+      }
+
+      @Override
+      void write(ColumnBuilder out, Object boxed) {
+        out.writeInt((Integer) boxed);
+      }
+    },
+    INT64(Type.INT64) {
+      @Override
+      Object read(Record row, int column) {
+        return row.getLong(column);
+      }
+
+      @Override
+      int compare(Object lhs, Object rhs) {
+        return Long.compare((Long) lhs, (Long) rhs);
+      }
+
+      @Override
+      void write(ColumnBuilder out, Object boxed) {
+        out.writeLong((Long) boxed);
+      }
+    },
+    FLOAT(Type.FLOAT) {
+      @Override
+      Object read(Record row, int column) {
+        return row.getFloat(column);
+      }
+
+      @Override
+      int compare(Object lhs, Object rhs) {
+        return Float.compare((Float) lhs, (Float) rhs);
+      }
+
+      @Override
+      void write(ColumnBuilder out, Object boxed) {
+        out.writeFloat((Float) boxed);
+      }
+    },
+    DOUBLE(Type.DOUBLE) {
+      @Override
+      Object read(Record row, int column) {
+        return row.getDouble(column);
+      }
+
+      @Override
+      int compare(Object lhs, Object rhs) {
+        return Double.compare((Double) lhs, (Double) rhs);
+      }
+
+      @Override
+      void write(ColumnBuilder out, Object boxed) {
+        out.writeDouble((Double) boxed);
+      }
+    },
+    TEXT(Type.TEXT) {
+      @Override
+      Object read(Record row, int column) {
+        return row.getBinary(column);
+      }
+
+      @Override
+      int compare(Object lhs, Object rhs) {
+        return ((Binary) lhs).compareTo((Binary) rhs);
+      }
+
+      @Override
+      void write(ColumnBuilder out, Object boxed) {
+        out.writeBinary((Binary) boxed);
+      }
+    },
+    TIMESTAMP(Type.TIMESTAMP) {
+      @Override
+      Object read(Record row, int column) {
+        return row.getLong(column);
+      }
+
+      @Override
+      int compare(Object lhs, Object rhs) {
+        return Long.compare((Long) lhs, (Long) rhs);
+      }
+
+      @Override
+      void write(ColumnBuilder out, Object boxed) {
+        out.writeLong((Long) boxed);
+      }
+    },
+    DATE(Type.DATE) {
+      @Override
+      Object read(Record row, int column) {
+        return row.getLocalDate(column);
+      }
+
+      @Override
+      int compare(Object lhs, Object rhs) {
+        return ((LocalDate) lhs).compareTo((LocalDate) rhs);
+      }
+
+      // NOTE(review): this is the only operator that goes through writeObject, and it hands over
+      // a LocalDate - confirm the column builder used for DATE accepts that representation.
+      @Override
+      void write(ColumnBuilder out, Object boxed) {
+        out.writeObject(boxed);
+      }
+    },
+    STRING(Type.STRING) {
+      @Override
+      Object read(Record row, int column) {
+        return row.getBinary(column);
+      }
+
+      @Override
+      int compare(Object lhs, Object rhs) {
+        return ((Binary) lhs).compareTo((Binary) rhs);
+      }
+
+      @Override
+      void write(ColumnBuilder out, Object boxed) {
+        out.writeBinary((Binary) boxed);
+      }
+    };
+
+    private final Type type;
+
+    ValueOperator(Type type) {
+      this.type = type;
+    }
+
+    /** Reads the value at {@code column} of {@code row} using the getter for this type. */
+    abstract Object read(Record row, int column);
+
+    /** Orders two values previously returned by {@link #read}. */
+    abstract int compare(Object lhs, Object rhs);
+
+    /** Appends {@code boxed} to {@code out}. */
+    abstract void write(ColumnBuilder out, Object boxed);
+
+    /**
+     * Looks up the operator registered for {@code type}.
+     *
+     * @throws IllegalArgumentException if no operator is declared for the type
+     */
+    static ValueOperator fromType(Type type) {
+      ValueOperator[] operators = values();
+      for (int i = 0; i < operators.length; i++) {
+        if (operators[i].type == type) {
+          return operators[i];
+        }
+      }
+      throw new IllegalArgumentException("Unsupported M4 value type: " + type);
+    }
+  }
+
+  /** Pairs a participant column's index in the input record with the operator for its type. */
+  private static class M4Column {
+    private final ValueOperator valueOperator;
+    private final int inputIndex;
+
+    private M4Column(int inputIndex, ValueOperator valueOperator) {
+      this.valueOperator = valueOperator;
+      this.inputIndex = inputIndex;
+    }
+
+    /** Orders two values of this column. */
+    private int compare(Object lhs, Object rhs) {
+      return valueOperator.compare(lhs, rhs);
+    }
+
+    /** Appends {@code boxed} to {@code out} using this column's operator. */
+    private void write(ColumnBuilder out, Object boxed) {
+      valueOperator.write(out, boxed);
+    }
+
+    /** Reads this column's value from {@code row}. */
+    private Object read(Record row) {
+      return valueOperator.read(row, inputIndex);
+    }
+
+    /** Tells whether this column is null in {@code row}. */
+    private boolean isNull(Record row) {
+      return row.isNull(inputIndex);
+    }
+  }
+
+  /**
+   * Mutable holder for one selected row: its row index, timestamp and value. The index stays at
+   * {@code INVALID_INDEX} until {@code set} is called.
+   */
+  private static class Candidate {
+    private Object value;
+    private long time;
+    private long index = INVALID_INDEX;
+
+    /** Overwrites all three fields with the given row. */
+    private void set(long rowIndex, long rowTime, Object rowValue) {
+      index = rowIndex;
+      time = rowTime;
+      value = rowValue;
+    }
+  }
+
+  private static class ColumnWindowState {
+    private final Candidate first = new Candidate();
+    private final Candidate last = new Candidate();
+    private final Candidate bottom = new Candidate();
+    private final Candidate top = new Candidate();
+
+    private void update(long rowIndex, long time, Object value, M4Column 
column) {
+      if (first.index == INVALID_INDEX) {
+        first.set(rowIndex, time, value);
+        last.set(rowIndex, time, value);
+        bottom.set(rowIndex, time, value);
+        top.set(rowIndex, time, value);
+        return;
+      }
+
+      last.set(rowIndex, time, value);
+      if (column.compare(value, bottom.value) < 0) {
+        bottom.set(rowIndex, time, value);
+      }
+      if (column.compare(value, top.value) > 0) {
+        top.set(rowIndex, time, value);
+      }
+    }
+
+    private boolean hasOutput() {
+      return first.index != INVALID_INDEX;
+    }
+
+    private List<Candidate> getSortedCandidates() {
+      if (!hasOutput()) {
+        return Collections.emptyList();
+      }
+      List<Candidate> candidates = new ArrayList<>(Arrays.asList(first, last, 
bottom, top));
+      Set<Long> emittedTimestamps = new HashSet<>();
+      candidates.removeIf(candidate -> !emittedTimestamps.add(candidate.time));
+      candidates.sort(Comparator.comparingLong(candidate -> candidate.time));
+      return candidates;
+    }
+  }
+
+  /**
+   * State of one window: one {@code ColumnWindowState} per participant column, plus the window
+   * bounds that subclasses report for the output.
+   */
+  private abstract static class WindowState {
+    protected final ColumnWindowState[] columnStates;
+
+    private WindowState(int columnCount) {
+      columnStates = new ColumnWindowState[columnCount];
+      Arrays.setAll(columnStates, ignored -> new ColumnWindowState());
+    }
+
+    /** Value written to the {@code window_start} output column. */
+    protected abstract long getOutputWindowStart();
+
+    /** Value written to the {@code window_end} output column. */
+    protected abstract long getOutputWindowEnd();
+  }
+
+  /** Window whose bounds are fixed at construction; it reports them unchanged as output bounds. */
+  private static class TimeWindowState extends WindowState {
+    private final long windowStart;
+    private final long endExclusive;
+
+    private TimeWindowState(long windowStart, long endExclusive, int columnCount) {
+      super(columnCount);
+      this.endExclusive = endExclusive;
+      this.windowStart = windowStart;
+    }
+
+    @Override
+    protected long getOutputWindowEnd() {
+      return this.endExclusive;
+    }
+
+    @Override
+    protected long getOutputWindowStart() {
+      return this.windowStart;
+    }
+  }
+
+  /**
+   * Window with a fixed {@code endExclusive} bound and mutable output bounds. Both output bounds
+   * start at {@code Long.MIN_VALUE}; nothing inside this class assigns them afterwards.
+   */
+  private static class CountWindowState extends WindowState {
+    private final long endExclusive;
+    private long windowStart = Long.MIN_VALUE;
+    private long windowEnd = Long.MIN_VALUE;
+
+    private CountWindowState(long endExclusive, int columnCount) {
+      super(columnCount);
+      this.endExclusive = endExclusive;
+    }
+
+    @Override
+    protected long getOutputWindowEnd() {
+      return this.windowEnd;
+    }
+
+    @Override
+    protected long getOutputWindowStart() {
+      return this.windowStart;
+    }
+  }
+
+  /**
+   * Shared driver of the M4 data processors: it reads the time of each incoming record, delegates
+   * window handling to the subclass and counts the processed rows. It also provides the helpers
+   * that fold a record into a window and that emit a finished window.
+   */
+  private abstract static class AbstractM4DataProcessor implements TableFunctionDataProcessor {
+    protected final long size;
+    protected final long slide;
+    protected final M4Column[] participantColumns;
+    protected long curIndex = 0;
+
+    protected AbstractM4DataProcessor(long size, long slide, M4Column[] participantColumns) {
+      this.participantColumns = participantColumns;
+      this.slide = slide;
+      this.size = size;
+    }
+
+    /**
+     * Handles one record: the time is read from index 0, the subclass processes the record, and
+     * the row counter advances afterwards. The pass-through builder is not used.
+     */
+    @Override
+    public final void process(
+        Record input,
+        List<ColumnBuilder> properColumnBuilders,
+        ColumnBuilder passThroughIndexBuilder) {
+      long time = input.getLong(0);
+      processRecord(input, time, properColumnBuilders);
+      curIndex++;
+    }
+
+    /** Subclass hook invoked once per record with the time already extracted. */
+    protected abstract void processRecord(
+        Record input, long time, List<ColumnBuilder> properColumnBuilders);
+
+    /** Folds every non-null participant value of {@code input} into {@code windowState}. */
+    protected final void updateWindow(WindowState windowState, Record input, long time) {
+      for (int col = 0; col < participantColumns.length; col++) {
+        M4Column column = participantColumns[col];
+        if (column.isNull(input)) {
+          continue;
+        }
+        windowState.columnStates[col].update(curIndex, time, column.read(input), column);
+      }
+    }
+
+    /**
+     * Emits the rows of one window. The number of rows is the largest candidate count over all
+     * columns; a column with fewer candidates is padded with nulls. Nothing is written when no
+     * column recorded a value.
+     */
+    protected final void outputWindow(
+        WindowState windowState, List<ColumnBuilder> properColumnBuilders) {
+      List<List<Candidate>> perColumn = new ArrayList<>();
+      int rows = 0;
+      for (ColumnWindowState state : windowState.columnStates) {
+        List<Candidate> sorted = state.getSortedCandidates();
+        perColumn.add(sorted);
+        if (sorted.size() > rows) {
+          rows = sorted.size();
+        }
+      }
+      if (rows == 0) {
+        return;
+      }
+
+      for (int row = 0; row < rows; row++) {
+        properColumnBuilders.get(0).writeLong(windowState.getOutputWindowStart());
+        properColumnBuilders.get(1).writeLong(windowState.getOutputWindowEnd());
+        for (int col = 0; col < participantColumns.length; col++) {
+          // Builders 0 and 1 hold the window bounds; each participant owns the next two builders.
+          int timeSlot = 2 + 2 * col;
+          int valueSlot = timeSlot + 1;
+          List<Candidate> sorted = perColumn.get(col);
+          if (row >= sorted.size()) {
+            properColumnBuilders.get(timeSlot).appendNull();
+            properColumnBuilders.get(valueSlot).appendNull();
+            continue;
+          }
+          Candidate candidate = sorted.get(row);
+          properColumnBuilders.get(timeSlot).writeLong(candidate.time);
+          participantColumns[col].write(properColumnBuilders.get(valueSlot), candidate.value);
+        }
+      }
+    }
+
+    /** Returns {@code windowStart + size}. */
+    protected final long getWindowEnd(long windowStart) {
+      return size + windowStart;
+    }
+  }
+
+  private static class TimeWindowM4DataProcessor extends 
AbstractM4DataProcessor {
+    private final Deque<TimeWindowState> activeWindows = new ArrayDeque<>();
+    private final long origin;
+    private boolean nextWindowStartInitialized = false;
+    private long nextWindowStart;
+
+    private TimeWindowM4DataProcessor(
+        long size, long slide, long origin, M4Column[] participantColumns) {
+      super(size, slide, participantColumns);
+      this.origin = origin;
+    }
+
+    @Override
+    protected void processRecord(
+        Record input, long time, List<ColumnBuilder> properColumnBuilders) {
+      if (time < origin) {
+        return;
+      }
+
+      while (!activeWindows.isEmpty() && 
activeWindows.peekFirst().endExclusive <= time) {
+        outputWindow(activeWindows.removeFirst(), properColumnBuilders);
+      }
+
+      long firstCandidateStart =
+          origin + Math.floorDiv(time - origin - size, slide) * slide + slide;
+      while (getWindowEnd(firstCandidateStart) <= time) {
+        firstCandidateStart += slide;
+      }
+      if (!nextWindowStartInitialized) {
+        nextWindowStart = firstCandidateStart;
+        nextWindowStartInitialized = true;
+      } else if (nextWindowStart < firstCandidateStart) {
+        nextWindowStart = firstCandidateStart;
+      }
+
+      while (nextWindowStart <= time && getWindowEnd(nextWindowStart) > time) {
+        activeWindows.addLast(
+            new TimeWindowState(
+                nextWindowStart, getWindowEnd(nextWindowStart), 
participantColumns.length));
+        nextWindowStart += slide;
+      }
+
+      for (TimeWindowState activeWindow : activeWindows) {

Review Comment:
   需求文档里 M4TableFunction 的目标空间复杂度是 O(1)，并且按讨论语义，每来一行数据应能直接根据 SIZE、SLIDE、ORIGIN 算出它所属分组。这里维护并遍历 `activeWindows`，在 `SLIDE < SIZE` 的重叠窗口场景下一行会更新多个窗口，状态量和单行处理成本都会随 `SIZE / SLIDE` 增长。建议改成每行只计算并更新一个窗口；如果要支持重叠窗口，则需要相应调整需求里的 O(1) 复杂度约束。



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to