JackieTien97 commented on code in PR #17292:
URL: https://github.com/apache/iotdb/pull/17292#discussion_r3339912911


##########
iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/aggregation/CentralMomentAccumulator.java:
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.calc.execution.aggregation;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BitMap;
+
+import java.nio.ByteBuffer;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class CentralMomentAccumulator implements Accumulator {
+
+  /** Size of a serialized partial state: one long count followed by four doubles. */
+  private static final int INTERMEDIATE_SIZE = Long.BYTES + Double.BYTES * 4;
+
+  /** Selects which standardized moment {@code outputFinal} reports. */
+  public enum MomentType {
+    SKEWNESS,
+    KURTOSIS
+  }
+
+  private final TSDataType seriesDataType;
+  private final MomentType momentType;
+
+  // Running state: number of non-null inputs, their mean, and the sums of the 2nd, 3rd and 4th
+  // powers of the deviations from that mean.
+  private long count;
+  private double mean;
+  private double m2;
+  private double m3;
+  private double m4;
+
+  /**
+   * Creates an empty accumulator.
+   *
+   * @param seriesDataType type of the input series; only INT32, DATE, INT64, TIMESTAMP, FLOAT and
+   *     DOUBLE values can be read
+   * @param momentType moment reported by the final output
+   */
+  public CentralMomentAccumulator(TSDataType seriesDataType, MomentType momentType) {
+    this.momentType = momentType;
+    this.seriesDataType = seriesDataType;
+  }
+
+  @Override
+  public void addInput(Column[] columns, BitMap bitMap) {
+
+    int size = columns[1].getPositionCount();
+    for (int i = 0; i < size; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
+        continue;
+      }
+      if (columns[1].isNull(i)) {
+        continue;
+      }
+      update(getDoubleValue(columns[1], i));
+    }
+  }
+
+  /**
+   * Reads the value at {@code position} widened to {@code double}, according to the series type.
+   *
+   * @throws UnsupportedOperationException for any type other than INT32, DATE, INT64, TIMESTAMP,
+   *     FLOAT or DOUBLE
+   */
+  private double getDoubleValue(Column column, int position) {
+    double value;
+    switch (seriesDataType) {
+      case DOUBLE:
+        value = column.getDouble(position);
+        break;
+      case FLOAT:
+        value = column.getFloat(position);
+        break;
+      case INT64:
+      case TIMESTAMP:
+        value = column.getLong(position);
+        break;
+      case INT32:
+      case DATE:
+        value = column.getInt(position);
+        break;
+      default:
+        throw new UnsupportedOperationException(
+            "Unsupported data type in CentralMoment Aggregation: " + seriesDataType);
+    }
+    return value;
+  }
+
+  /**
+   * Folds one value into the running count, mean and central-moment sums in a single pass.
+   *
+   * <p>The locals {@code m2} and {@code m3} deliberately shadow the fields, so the M3 and M4
+   * updates below use the sums from before this call. Keep the statement order as is.
+   */
+  private void update(double value) {
+    long n1 = count;
+    long n = n1 + 1;
+    double m1 = mean;
+    double m2 = this.m2;
+    double m3 = this.m3;
+    // delta: distance of the new value from the old mean; deltaN: resulting shift of the mean.
+    double delta = value - m1;
+    double deltaN = delta / n;
+    double deltaN2 = deltaN * deltaN;
+    // Increment of M2 caused by this value.
+    double dm2 = delta * deltaN * n1;
+
+    count = n;
+    mean = m1 + deltaN;
+    this.m2 = m2 + dm2;
+    this.m3 = m3 + dm2 * deltaN * (n - 2) - 3 * deltaN * m2;
+    m4 += dm2 * deltaN2 * (n * (double) n - 3 * n + 3) + 6 * deltaN2 * m2 - 4 
* deltaN * m3;
+  }
+
+  /**
+   * Merges the single serialized partial state held in {@code partialResult[0]}.
+   *
+   * <p>The state is a count (long) followed by mean, M2, M3 and M4 (doubles). A null entry, which
+   * {@code outputIntermediate} emits when it saw no values, is ignored.
+   */
+  @Override
+  public void addIntermediate(Column[] partialResult) {
+    checkArgument(partialResult.length == 1, "partialResult of CentralMoment should be 1");
+    Column stateColumn = partialResult[0];
+    if (!stateColumn.isNull(0)) {
+      ByteBuffer state = ByteBuffer.wrap(stateColumn.getBinary(0).getValues());
+      long partialCount = state.getLong();
+      double partialMean = state.getDouble();
+      double partialM2 = state.getDouble();
+      double partialM3 = state.getDouble();
+      double partialM4 = state.getDouble();
+      merge(partialCount, partialMean, partialM2, partialM3, partialM4);
+    }
+  }
+
+  /**
+   * Combines another partial state into this one using the pairwise combination formulas for
+   * count, mean and the central-moment sums M2, M3 and M4.
+   *
+   * <p>All count products are computed in {@code double}. Expressions such as {@code nA * nA}
+   * would otherwise be evaluated in {@code long} and silently overflow once a partial count
+   * exceeds roughly 3 billion rows.
+   *
+   * @param nB number of values in the other partial state; 0 makes this a no-op
+   * @param meanB mean of the other partial state
+   * @param m2B sum of squared deviations from {@code meanB}
+   * @param m3B sum of cubed deviations from {@code meanB}
+   * @param m4B sum of fourth-power deviations from {@code meanB}
+   */
+  private void merge(long nB, double meanB, double m2B, double m3B, double m4B) {
+    if (nB == 0) {
+      return;
+    }
+    if (count == 0) {
+      count = nB;
+      mean = meanB;
+      m2 = m2B;
+      m3 = m3B;
+      m4 = m4B;
+      return;
+    }
+    long nA = count;
+    double m1A = mean;
+    double m2A = m2;
+    double m3A = m3;
+    // Promote the counts before multiplying them together so no intermediate is a long product.
+    double a = nA;
+    double b = nB;
+    long n = nA + nB;
+    double nDouble = n;
+    double delta = meanB - m1A;
+    double delta2 = delta * delta;
+    double delta3 = delta * delta2;
+    double delta4 = delta2 * delta2;
+
+    count = n;
+    mean = (a * m1A + b * meanB) / nDouble;
+    m2 = m2A + m2B + delta2 * a * b / nDouble;
+    m3 =
+        m3A
+            + m3B
+            + delta3 * a * b * (a - b) / (nDouble * nDouble)
+            + 3 * delta * (a * m2B - b * m2A) / nDouble;
+    m4 +=
+        m4B
+            + delta4 * a * b * (a * a - a * b + b * b) / (nDouble * nDouble * nDouble)
+            + 6 * delta2 * (a * a * m2B + b * b * m2A) / (nDouble * nDouble)
+            + 4 * delta * (a * m3B - b * m3A) / nDouble;
+  }
+
+  /**
+   * Serializes the running state as count (long) followed by mean, M2, M3 and M4 (doubles), or
+   * appends null when no value has been seen.
+   */
+  @Override
+  public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+    checkArgument(columnBuilders.length == 1, "partialResult should be 1");
+    ColumnBuilder builder = columnBuilders[0];
+    if (count == 0) {
+      builder.appendNull();
+      return;
+    }
+    ByteBuffer state =
+        ByteBuffer.allocate(INTERMEDIATE_SIZE)
+            .putLong(count)
+            .putDouble(mean)
+            .putDouble(m2)
+            .putDouble(m3)
+            .putDouble(m4);
+    builder.writeBinary(new Binary(state.array()));
+  }
+
+  @Override
+  public void outputFinal(ColumnBuilder columnBuilder) {
+    if (count == 0 || m2 == 0) {
+      columnBuilder.appendNull();
+      return;
+    }
+
+    if (momentType == MomentType.SKEWNESS) {
+      if (count < 3) {
+        columnBuilder.appendNull();
+        return;
+      }
+      double result = Math.sqrt((double) count) * m3 / Math.pow(m2, 1.5);
+      columnBuilder.writeDouble(result);
+    } else {
+      if (count < 4) {
+        columnBuilder.appendNull();
+      } else {
+
+        double variance = m2 / (count - 1);
+        double term1 =
+            (count * (count + 1) * m4)
+                / ((count - 1) * (count - 2) * (count - 3) * variance * 
variance);

Review Comment:
   **Likely `long` overflow in the kurtosis denominator.**
   
   `(count - 1) * (count - 2) * (count - 3)` is evaluated as `long` (all three 
operands are `long`) *before* it is promoted to `double`. For `count` greater 
than ~2.1M this product exceeds `Long.MAX_VALUE` and silently wraps around. It 
immediately **flips sign**, so kurtosis comes out wrong (typically negative). 
Past ~2.64M it wraps back to a positive but still wrong value. Million-row 
groups are routine in a time-series DB, so this is reachable in practice.
   
   The numerator `count * (count + 1)` and `term2`'s `(count - 2) * (count - 
3)` have the same problem at larger thresholds.
   
   Suggest forcing `double` early:
   ```java
   double n = count;
   double term1 = (n * (n + 1) * m4) / ((n - 1) * (n - 2) * (n - 3) * variance 
* variance);
   double term2 = (3 * (n - 1) * (n - 1)) / ((n - 2) * (n - 3));
   ```
   The same fix is needed in `TableCentralMomentAccumulator` and 
`GroupedCentralMomentAccumulator`. A test over ~3M non-constant rows would guard 
against this regression.



##########
iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/TableCentralMomentAccumulator.java:
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.calc.execution.operator.source.relational.aggregation;
+
+import org.apache.iotdb.calc.execution.aggregation.CentralMomentAccumulator;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
+import org.apache.tsfile.read.common.block.column.BinaryColumn;
+import org.apache.tsfile.read.common.block.column.BinaryColumnBuilder;
+import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+
+import java.nio.ByteBuffer;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class TableCentralMomentAccumulator implements TableAccumulator {
+
+  /** Shallow size of one instance, as reported by {@code RamUsageEstimator}. */
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(TableCentralMomentAccumulator.class);
+  /** Size of a serialized partial state: one long count followed by four doubles. */
+  private static final int INTERMEDIATE_SIZE = Long.BYTES + Double.BYTES * 4;
+  // Near-zero tolerance. It is not referenced in the lines quoted here; presumably it is used by
+  // a zero-normalization helper further down the class — confirm.
+  private static final double EPSILON = 1e-12;
+
+  private final TSDataType seriesDataType;
+  private final CentralMomentAccumulator.MomentType momentType;
+
+  // Running state: number of non-null inputs, their mean, and the sums of the 2nd, 3rd and 4th
+  // powers of the deviations from that mean.
+  private long count;
+  private double mean;
+  private double m2;
+  private double m3;
+  private double m4;
+
+  /**
+   * Creates an empty accumulator.
+   *
+   * @param seriesDataType type of the input series; only INT32, DATE, INT64, TIMESTAMP, FLOAT and
+   *     DOUBLE values can be read
+   * @param momentType moment reported by the final output
+   */
+  public TableCentralMomentAccumulator(
+      TSDataType seriesDataType, CentralMomentAccumulator.MomentType momentType) {
+    this.momentType = momentType;
+    this.seriesDataType = seriesDataType;
+  }
+
+  @Override
+  public void addInput(Column[] arguments, AggregationMask mask) {
+    int positionCount = mask.getSelectedPositionCount();
+    if (mask.isSelectAll()) {
+      for (int i = 0; i < positionCount; i++) {
+        if (!arguments[0].isNull(i)) {
+          update(getDoubleValue(arguments[0], i));
+        }
+      }
+    } else {
+      int[] selectedPositions = mask.getSelectedPositions();
+      for (int i = 0; i < positionCount; i++) {
+        int position = selectedPositions[i];
+        if (!arguments[0].isNull(position)) {
+          update(getDoubleValue(arguments[0], position));
+        }
+      }
+    }
+  }
+
+  /**
+   * Reads the value at {@code position} widened to {@code double}, according to the series type.
+   *
+   * @throws UnSupportedDataTypeException for any type other than INT32, DATE, INT64, TIMESTAMP,
+   *     FLOAT or DOUBLE
+   */
+  private double getDoubleValue(Column column, int position) {
+    double value;
+    switch (seriesDataType) {
+      case DOUBLE:
+        value = column.getDouble(position);
+        break;
+      case FLOAT:
+        value = column.getFloat(position);
+        break;
+      case INT64:
+      case TIMESTAMP:
+        value = column.getLong(position);
+        break;
+      case INT32:
+      case DATE:
+        value = column.getInt(position);
+        break;
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format(
+                "Unsupported data type in CentralMoment Aggregation: %s", seriesDataType));
+    }
+    return value;
+  }
+
+  /**
+   * Folds one value into the running count, mean and central-moment sums in a single pass.
+   *
+   * <p>The locals {@code m2} and {@code m3} deliberately shadow the fields, so the M3 and M4
+   * updates below use the sums from before this call. Keep the statement order as is.
+   */
+  private void update(double value) {
+    long n1 = count;
+    long n = n1 + 1;
+    double m1 = mean;
+    double m2 = this.m2;
+    double m3 = this.m3;
+    // delta: distance of the new value from the old mean; deltaN: resulting shift of the mean.
+    double delta = value - m1;
+    double deltaN = delta / n;
+    double deltaN2 = deltaN * deltaN;
+    // Increment of M2 caused by this value.
+    double dm2 = delta * deltaN * n1;
+
+    count = n;
+    mean = m1 + deltaN;
+    this.m2 = m2 + dm2;
+    this.m3 = m3 + dm2 * deltaN * (n - 2) - 3 * deltaN * m2;
+    m4 += dm2 * deltaN2 * (n * (double) n - 3 * n + 3) + 6 * deltaN2 * m2 - 4 
* deltaN * m3;
+  }
+
+  /**
+   * Merges every non-null serialized partial state in {@code argument}. Each state is a count
+   * (long) followed by mean, M2, M3 and M4 (doubles).
+   */
+  @Override
+  public void addIntermediate(Column argument) {
+    boolean binaryInput =
+        argument instanceof BinaryColumn
+            || (argument instanceof RunLengthEncodedColumn
+                && ((RunLengthEncodedColumn) argument).getValue() instanceof BinaryColumn);
+    checkArgument(binaryInput, "intermediate input and output should be BinaryColumn");
+
+    for (int position = 0; position < argument.getPositionCount(); position++) {
+      if (argument.isNull(position)) {
+        continue;
+      }
+      ByteBuffer state = ByteBuffer.wrap(argument.getBinary(position).getValues());
+      long partialCount = state.getLong();
+      double partialMean = state.getDouble();
+      double partialM2 = state.getDouble();
+      double partialM3 = state.getDouble();
+      double partialM4 = state.getDouble();
+      merge(partialCount, partialMean, partialM2, partialM3, partialM4);
+    }
+  }
+
+  /**
+   * Combines another partial state into this one using the pairwise combination formulas for
+   * count, mean and the central-moment sums M2, M3 and M4.
+   *
+   * <p>All count products are computed in {@code double}. Expressions such as {@code nA * nA}
+   * would otherwise be evaluated in {@code long} and silently overflow once a partial count
+   * exceeds roughly 3 billion rows.
+   *
+   * @param nB number of values in the other partial state; 0 makes this a no-op
+   * @param meanB mean of the other partial state
+   * @param m2B sum of squared deviations from {@code meanB}
+   * @param m3B sum of cubed deviations from {@code meanB}
+   * @param m4B sum of fourth-power deviations from {@code meanB}
+   */
+  private void merge(long nB, double meanB, double m2B, double m3B, double m4B) {
+    if (nB == 0) {
+      return;
+    }
+    if (count == 0) {
+      count = nB;
+      mean = meanB;
+      m2 = m2B;
+      m3 = m3B;
+      m4 = m4B;
+      return;
+    }
+    long nA = count;
+    double m1A = mean;
+    double m2A = m2;
+    double m3A = m3;
+    // Promote the counts before multiplying them together so no intermediate is a long product.
+    double a = nA;
+    double b = nB;
+    long n = nA + nB;
+    double nDouble = n;
+    double delta = meanB - m1A;
+    double delta2 = delta * delta;
+    double delta3 = delta * delta2;
+    double delta4 = delta2 * delta2;
+
+    count = n;
+    mean = (a * m1A + b * meanB) / nDouble;
+    m2 = m2A + m2B + delta2 * a * b / nDouble;
+    m3 =
+        m3A
+            + m3B
+            + delta3 * a * b * (a - b) / (nDouble * nDouble)
+            + 3 * delta * (a * m2B - b * m2A) / nDouble;
+    m4 +=
+        m4B
+            + delta4 * a * b * (a * a - a * b + b * b) / (nDouble * nDouble * nDouble)
+            + 6 * delta2 * (a * a * m2B + b * b * m2A) / (nDouble * nDouble)
+            + 4 * delta * (a * m3B - b * m3A) / nDouble;
+  }
+
+  @Override
+  public void evaluateIntermediate(ColumnBuilder columnBuilder) {
+    checkArgument(
+        columnBuilder instanceof BinaryColumnBuilder,
+        "intermediate input and output should be BinaryColumn");
+
+    if (count == 0) {
+      columnBuilder.appendNull();
+    } else {
+
+      ByteBuffer buffer = ByteBuffer.allocate(INTERMEDIATE_SIZE);
+      buffer.putLong(count);
+      buffer.putDouble(mean);
+      buffer.putDouble(m2);
+      buffer.putDouble(m3);
+      buffer.putDouble(m4);
+      columnBuilder.writeBinary(new Binary(buffer.array()));
+    }
+  }
+
+  @Override
+  public void evaluateFinal(ColumnBuilder columnBuilder) {
+    if (count == 0 || m2 == 0) {
+      columnBuilder.appendNull();
+      return;
+    }
+
+    if (momentType == CentralMomentAccumulator.MomentType.SKEWNESS) {
+      if (count < 3) {
+        columnBuilder.appendNull();
+        return;
+      }
+      double result = Math.sqrt((double) count) * m3 / Math.pow(m2, 1.5);
+      columnBuilder.writeDouble(result);
+    } else {
+      if (count < 4) {
+        columnBuilder.appendNull();
+      } else {
+        double variance = m2 / (count - 1);
+        double term1 =
+            (count * (count + 1) * m4)
+                / ((count - 1) * (count - 2) * (count - 3) * variance * 
variance);

Review Comment:
   Same `long` overflow in the kurtosis denominator as in 
`CentralMomentAccumulator` (see that comment): `(count - 1) * (count - 2) * 
(count - 3)` overflows `long` for `count` greater than ~2.1M. Please compute 
these count products in `double`.



##########
iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/aggregation/CentralMomentAccumulator.java:
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.calc.execution.aggregation;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BitMap;
+
+import java.nio.ByteBuffer;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class CentralMomentAccumulator implements Accumulator {
+
+  /** Size of a serialized partial state: one long count followed by four doubles. */
+  private static final int INTERMEDIATE_SIZE = Long.BYTES + Double.BYTES * 4;
+
+  /** Selects which standardized moment {@code outputFinal} reports. */
+  public enum MomentType {
+    SKEWNESS,
+    KURTOSIS
+  }
+
+  private final TSDataType seriesDataType;
+  private final MomentType momentType;
+
+  // Running state: number of non-null inputs, their mean, and the sums of the 2nd, 3rd and 4th
+  // powers of the deviations from that mean.
+  private long count;
+  private double mean;
+  private double m2;
+  private double m3;
+  private double m4;
+
+  /**
+   * Creates an empty accumulator.
+   *
+   * @param seriesDataType type of the input series; only INT32, DATE, INT64, TIMESTAMP, FLOAT and
+   *     DOUBLE values can be read
+   * @param momentType moment reported by the final output
+   */
+  public CentralMomentAccumulator(TSDataType seriesDataType, MomentType momentType) {
+    this.momentType = momentType;
+    this.seriesDataType = seriesDataType;
+  }
+
+  @Override
+  public void addInput(Column[] columns, BitMap bitMap) {
+
+    int size = columns[1].getPositionCount();
+    for (int i = 0; i < size; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
+        continue;
+      }
+      if (columns[1].isNull(i)) {
+        continue;
+      }
+      update(getDoubleValue(columns[1], i));
+    }
+  }
+
+  /**
+   * Reads the value at {@code position} widened to {@code double}, according to the series type.
+   *
+   * @throws UnsupportedOperationException for any type other than INT32, DATE, INT64, TIMESTAMP,
+   *     FLOAT or DOUBLE
+   */
+  private double getDoubleValue(Column column, int position) {
+    double value;
+    switch (seriesDataType) {
+      case DOUBLE:
+        value = column.getDouble(position);
+        break;
+      case FLOAT:
+        value = column.getFloat(position);
+        break;
+      case INT64:
+      case TIMESTAMP:
+        value = column.getLong(position);
+        break;
+      case INT32:
+      case DATE:
+        value = column.getInt(position);
+        break;
+      default:
+        throw new UnsupportedOperationException(
+            "Unsupported data type in CentralMoment Aggregation: " + seriesDataType);
+    }
+    return value;
+  }
+
+  /**
+   * Folds one value into the running count, mean and central-moment sums in a single pass.
+   *
+   * <p>The locals {@code m2} and {@code m3} deliberately shadow the fields, so the M3 and M4
+   * updates below use the sums from before this call. Keep the statement order as is.
+   */
+  private void update(double value) {
+    long n1 = count;
+    long n = n1 + 1;
+    double m1 = mean;
+    double m2 = this.m2;
+    double m3 = this.m3;
+    // delta: distance of the new value from the old mean; deltaN: resulting shift of the mean.
+    double delta = value - m1;
+    double deltaN = delta / n;
+    double deltaN2 = deltaN * deltaN;
+    // Increment of M2 caused by this value.
+    double dm2 = delta * deltaN * n1;
+
+    count = n;
+    mean = m1 + deltaN;
+    this.m2 = m2 + dm2;
+    this.m3 = m3 + dm2 * deltaN * (n - 2) - 3 * deltaN * m2;
+    m4 += dm2 * deltaN2 * (n * (double) n - 3 * n + 3) + 6 * deltaN2 * m2 - 4 
* deltaN * m3;
+  }
+
+  /**
+   * Merges the single serialized partial state held in {@code partialResult[0]}.
+   *
+   * <p>The state is a count (long) followed by mean, M2, M3 and M4 (doubles). A null entry, which
+   * {@code outputIntermediate} emits when it saw no values, is ignored.
+   */
+  @Override
+  public void addIntermediate(Column[] partialResult) {
+    checkArgument(partialResult.length == 1, "partialResult of CentralMoment should be 1");
+    Column stateColumn = partialResult[0];
+    if (!stateColumn.isNull(0)) {
+      ByteBuffer state = ByteBuffer.wrap(stateColumn.getBinary(0).getValues());
+      long partialCount = state.getLong();
+      double partialMean = state.getDouble();
+      double partialM2 = state.getDouble();
+      double partialM3 = state.getDouble();
+      double partialM4 = state.getDouble();
+      merge(partialCount, partialMean, partialM2, partialM3, partialM4);
+    }
+  }
+
+  /**
+   * Combines another partial state into this one using the pairwise combination formulas for
+   * count, mean and the central-moment sums M2, M3 and M4.
+   *
+   * <p>All count products are computed in {@code double}. Expressions such as {@code nA * nA}
+   * would otherwise be evaluated in {@code long} and silently overflow once a partial count
+   * exceeds roughly 3 billion rows.
+   *
+   * @param nB number of values in the other partial state; 0 makes this a no-op
+   * @param meanB mean of the other partial state
+   * @param m2B sum of squared deviations from {@code meanB}
+   * @param m3B sum of cubed deviations from {@code meanB}
+   * @param m4B sum of fourth-power deviations from {@code meanB}
+   */
+  private void merge(long nB, double meanB, double m2B, double m3B, double m4B) {
+    if (nB == 0) {
+      return;
+    }
+    if (count == 0) {
+      count = nB;
+      mean = meanB;
+      m2 = m2B;
+      m3 = m3B;
+      m4 = m4B;
+      return;
+    }
+    long nA = count;
+    double m1A = mean;
+    double m2A = m2;
+    double m3A = m3;
+    // Promote the counts before multiplying them together so no intermediate is a long product.
+    double a = nA;
+    double b = nB;
+    long n = nA + nB;
+    double nDouble = n;
+    double delta = meanB - m1A;
+    double delta2 = delta * delta;
+    double delta3 = delta * delta2;
+    double delta4 = delta2 * delta2;
+
+    count = n;
+    mean = (a * m1A + b * meanB) / nDouble;
+    m2 = m2A + m2B + delta2 * a * b / nDouble;
+    m3 =
+        m3A
+            + m3B
+            + delta3 * a * b * (a - b) / (nDouble * nDouble)
+            + 3 * delta * (a * m2B - b * m2A) / nDouble;
+    m4 +=
+        m4B
+            + delta4 * a * b * (a * a - a * b + b * b) / (nDouble * nDouble * nDouble)
+            + 6 * delta2 * (a * a * m2B + b * b * m2A) / (nDouble * nDouble)
+            + 4 * delta * (a * m3B - b * m3A) / nDouble;
+  }
+
+  /**
+   * Serializes the running state as count (long) followed by mean, M2, M3 and M4 (doubles), or
+   * appends null when no value has been seen.
+   */
+  @Override
+  public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+    checkArgument(columnBuilders.length == 1, "partialResult should be 1");
+    ColumnBuilder builder = columnBuilders[0];
+    if (count == 0) {
+      builder.appendNull();
+      return;
+    }
+    ByteBuffer state =
+        ByteBuffer.allocate(INTERMEDIATE_SIZE)
+            .putLong(count)
+            .putDouble(mean)
+            .putDouble(m2)
+            .putDouble(m3)
+            .putDouble(m4);
+    builder.writeBinary(new Binary(state.array()));
+  }
+
+  @Override
+  public void outputFinal(ColumnBuilder columnBuilder) {
+    if (count == 0 || m2 == 0) {

Review Comment:
   The exact `m2 == 0` test is fragile here. Unlike the table variants 
(`TableCentralMomentAccumulator` / `TableCorrelationAccumulator`), which apply 
`normalizeZero` with `EPSILON = 1e-12` after `removeInput`, this tree-model 
accumulator subtracts partial state in `removeIntermediate` without clamping. 
After removal in a sliding window over constant data, `m2` can land at a tiny 
residual (e.g. `1e-13`) rather than exactly `0`, so this guard is skipped and 
`skewness = sqrt(n) * m3 / m2^1.5` blows up to a spurious huge value instead of 
returning null. Consider the same epsilon clamp for consistency.



##########
iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/source/relational/aggregation/grouped/GroupedCentralMomentAccumulator.java:
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.grouped;
+
+import org.apache.iotdb.calc.execution.aggregation.CentralMomentAccumulator;
+import 
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.AggregationMask;
+import 
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.grouped.array.DoubleBigArray;
+import 
org.apache.iotdb.calc.execution.operator.source.relational.aggregation.grouped.array.LongBigArray;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.column.BinaryColumn;
+import org.apache.tsfile.read.common.block.column.BinaryColumnBuilder;
+import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+
+import java.nio.ByteBuffer;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class GroupedCentralMomentAccumulator implements GroupedAccumulator {
+
+  /** Shallow size of one instance, as reported by {@code RamUsageEstimator}. */
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(GroupedCentralMomentAccumulator.class);
+  /** Size of a serialized partial state: one long count followed by four doubles. */
+  private static final int INTERMEDIATE_SIZE = Long.BYTES + Double.BYTES * 4;
+
+  private final TSDataType seriesDataType;
+  private final CentralMomentAccumulator.MomentType momentType;
+
+  // Per-group running state, indexed by group id: number of non-null inputs, their mean, and the
+  // sums of the 2nd, 3rd and 4th powers of the deviations from that mean.
+  private final LongBigArray counts = new LongBigArray();
+  private final DoubleBigArray means = new DoubleBigArray();
+  private final DoubleBigArray m2s = new DoubleBigArray();
+  private final DoubleBigArray m3s = new DoubleBigArray();
+  private final DoubleBigArray m4s = new DoubleBigArray();
+
+  /**
+   * Creates an accumulator with no groups yet.
+   *
+   * @param seriesDataType type of the input series; only INT32, DATE, INT64, TIMESTAMP, FLOAT and
+   *     DOUBLE values can be read
+   * @param momentType moment reported by the final output
+   */
+  public GroupedCentralMomentAccumulator(
+      TSDataType seriesDataType, CentralMomentAccumulator.MomentType momentType) {
+    this.momentType = momentType;
+    this.seriesDataType = seriesDataType;
+  }
+
+  @Override
+  public long getEstimatedSize() {
+    return INSTANCE_SIZE
+        + counts.sizeOf()
+        + means.sizeOf()
+        + m2s.sizeOf()
+        + m3s.sizeOf()
+        + m4s.sizeOf();
+  }
+
+  /** Ensures every per-group state array has capacity for {@code groupCount} groups. */
+  @Override
+  public void setGroupCount(long groupCount) {
+    counts.ensureCapacity(groupCount);
+    for (DoubleBigArray moments : new DoubleBigArray[] {means, m2s, m3s, m4s}) {
+      moments.ensureCapacity(groupCount);
+    }
+  }
+
+  @Override
+  public void addInput(int[] groupIds, Column[] arguments, AggregationMask 
mask) {
+    int positionCount = mask.getSelectedPositionCount();
+    if (mask.isSelectAll()) {
+      for (int i = 0; i < positionCount; i++) {
+        if (!arguments[0].isNull(i)) {
+          update(groupIds[i], getDoubleValue(arguments[0], i));
+        }
+      }
+    } else {
+      int[] selectedPositions = mask.getSelectedPositions();
+      for (int i = 0; i < positionCount; i++) {
+        int position = selectedPositions[i];
+        if (!arguments[0].isNull(position)) {
+          update(groupIds[position], getDoubleValue(arguments[0], position));
+        }
+      }
+    }
+  }
+
+  /**
+   * Reads the value at {@code position} widened to {@code double}, according to the series type.
+   *
+   * @throws UnSupportedDataTypeException for any type other than INT32, DATE, INT64, TIMESTAMP,
+   *     FLOAT or DOUBLE
+   */
+  private double getDoubleValue(Column column, int position) {
+    double value;
+    switch (seriesDataType) {
+      case DOUBLE:
+        value = column.getDouble(position);
+        break;
+      case FLOAT:
+        value = column.getFloat(position);
+        break;
+      case INT64:
+      case TIMESTAMP:
+        value = column.getLong(position);
+        break;
+      case INT32:
+      case DATE:
+        value = column.getInt(position);
+        break;
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format(
+                "Unsupported data type in CentralMoment Aggregation: %s", seriesDataType));
+    }
+    return value;
+  }
+
+  /**
+   * Folds one value into the running count, mean and central-moment sums of {@code groupId} in a
+   * single pass.
+   *
+   * <p>All previous sums are read into locals before any array is written, so the M3 and M4
+   * updates use the values from before this call.
+   */
+  private void update(int groupId, double value) {
+    long n1 = counts.get(groupId);
+    long n = n1 + 1;
+    double m1 = means.get(groupId);
+    double m2 = m2s.get(groupId);
+    double m3 = m3s.get(groupId);
+    double m4 = m4s.get(groupId);
+
+    // delta: distance of the new value from the old mean; deltaN: resulting shift of the mean.
+    double delta = value - m1;
+    double deltaN = delta / n;
+    double deltaN2 = deltaN * deltaN;
+    // Increment of M2 caused by this value.
+    double dm2 = delta * deltaN * n1;
+
+    counts.set(groupId, n);
+    means.set(groupId, m1 + deltaN);
+    m2s.set(groupId, m2 + dm2);
+    m3s.set(groupId, m3 + dm2 * deltaN * (n - 2) - 3 * deltaN * m2);
+    m4s.set(
+        groupId,
+        m4 + dm2 * deltaN2 * (n * (double) n - 3 * n + 3) + 6 * deltaN2 * m2 - 
4 * deltaN * m3);
+  }
+
+  /**
+   * Merges each non-null serialized partial state in {@code argument} into the group given by the
+   * matching entry of {@code groupIds}. Each state is a count (long) followed by mean, M2, M3 and
+   * M4 (doubles).
+   */
+  @Override
+  public void addIntermediate(int[] groupIds, Column argument) {
+    boolean binaryInput =
+        argument instanceof BinaryColumn
+            || (argument instanceof RunLengthEncodedColumn
+                && ((RunLengthEncodedColumn) argument).getValue() instanceof BinaryColumn);
+    checkArgument(binaryInput, "intermediate input and output should be BinaryColumn");
+
+    for (int position = 0; position < argument.getPositionCount(); position++) {
+      if (argument.isNull(position)) {
+        continue;
+      }
+      ByteBuffer state = ByteBuffer.wrap(argument.getBinary(position).getValues());
+      long partialCount = state.getLong();
+      double partialMean = state.getDouble();
+      double partialM2 = state.getDouble();
+      double partialM3 = state.getDouble();
+      double partialM4 = state.getDouble();
+      merge(groupIds[position], partialCount, partialMean, partialM2, partialM3, partialM4);
+    }
+  }
+
+  /**
+   * Combines another partial state into the state of {@code groupId} using the pairwise
+   * combination formulas for count, mean and the central-moment sums M2, M3 and M4.
+   *
+   * <p>All count products are computed in {@code double}. Expressions such as {@code nA * nA}
+   * would otherwise be evaluated in {@code long} and silently overflow once a partial count
+   * exceeds roughly 3 billion rows.
+   *
+   * @param groupId group whose state is updated
+   * @param nB number of values in the other partial state; 0 makes this a no-op
+   * @param meanB mean of the other partial state
+   * @param m2B sum of squared deviations from {@code meanB}
+   * @param m3B sum of cubed deviations from {@code meanB}
+   * @param m4B sum of fourth-power deviations from {@code meanB}
+   */
+  private void merge(int groupId, long nB, double meanB, double m2B, double m3B, double m4B) {
+    if (nB == 0) {
+      return;
+    }
+    long nA = counts.get(groupId);
+    if (nA == 0) {
+      counts.set(groupId, nB);
+      means.set(groupId, meanB);
+      m2s.set(groupId, m2B);
+      m3s.set(groupId, m3B);
+      m4s.set(groupId, m4B);
+      return;
+    }
+    double m1A = means.get(groupId);
+    double m2A = m2s.get(groupId);
+    double m3A = m3s.get(groupId);
+    double m4A = m4s.get(groupId);
+    // Promote the counts before multiplying them together so no intermediate is a long product.
+    double a = nA;
+    double b = nB;
+    long n = nA + nB;
+    double nDouble = n;
+    double delta = meanB - m1A;
+    double delta2 = delta * delta;
+    double delta3 = delta * delta2;
+    double delta4 = delta2 * delta2;
+
+    counts.set(groupId, n);
+    means.set(groupId, (a * m1A + b * meanB) / nDouble);
+    m2s.set(groupId, m2A + m2B + delta2 * a * b / nDouble);
+    m3s.set(
+        groupId,
+        m3A
+            + m3B
+            + delta3 * a * b * (a - b) / (nDouble * nDouble)
+            + 3 * delta * (a * m2B - b * m2A) / nDouble);
+    m4s.set(
+        groupId,
+        m4A
+            + m4B
+            + delta4 * a * b * (a * a - a * b + b * b) / (nDouble * nDouble * nDouble)
+            + 6 * delta2 * (a * a * m2B + b * b * m2A) / (nDouble * nDouble)
+            + 4 * delta * (a * m3B - b * m3A) / nDouble);
+  }
+
+  @Override
+  public void evaluateIntermediate(int groupId, ColumnBuilder columnBuilder) {
+    checkArgument(
+        columnBuilder instanceof BinaryColumnBuilder,
+        "intermediate input and output should be BinaryColumn");
+
+    if (counts.get(groupId) == 0) {
+      columnBuilder.appendNull();
+    } else {
+      ByteBuffer buffer = ByteBuffer.allocate(INTERMEDIATE_SIZE);
+      buffer.putLong(counts.get(groupId));
+      buffer.putDouble(means.get(groupId));
+      buffer.putDouble(m2s.get(groupId));
+      buffer.putDouble(m3s.get(groupId));
+      buffer.putDouble(m4s.get(groupId));
+      columnBuilder.writeBinary(new Binary(buffer.array()));
+    }
+  }
+
+  @Override
+  public void evaluateFinal(int groupId, ColumnBuilder columnBuilder) {
+    long count = counts.get(groupId);
+    double m2 = m2s.get(groupId);
+
+    if (count == 0 || m2 == 0) {
+      columnBuilder.appendNull();
+      return;
+    }
+
+    if (momentType == CentralMomentAccumulator.MomentType.SKEWNESS) {
+      if (count < 3) {
+        columnBuilder.appendNull();
+        return;
+      }
+      double m3 = m3s.get(groupId);
+      double result = Math.sqrt((double) count) * m3 / Math.pow(m2, 1.5);
+      columnBuilder.writeDouble(result);
+    } else {
+      if (count < 4) {
+        columnBuilder.appendNull();
+      } else {
+        double m4 = m4s.get(groupId);
+        double variance = m2 / (count - 1);
+        double term1 =
+            (count * (count + 1) * m4)
+                / ((count - 1) * (count - 2) * (count - 3) * variance * 
variance);

Review Comment:
   Same `long`-overflow in the kurtosis denominator here too — `(count - 1) * 
(count - 2) * (count - 3)` is a `long` product that overflows for `count` 
greater than ~2.1M. Compute in `double`.



##########
iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/aggregation/CentralMomentAccumulator.java:
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.calc.execution.aggregation;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BitMap;
+
+import java.nio.ByteBuffer;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class CentralMomentAccumulator implements Accumulator {
+
+  private static final int INTERMEDIATE_SIZE = Long.BYTES + 4 * Double.BYTES;
+
+  public enum MomentType {
+    SKEWNESS,
+    KURTOSIS
+  }
+
+  private final TSDataType seriesDataType;
+  private final MomentType momentType;
+
+  private long count;
+  private double mean;
+  private double m2;
+  private double m3;
+  private double m4;
+
+  public CentralMomentAccumulator(TSDataType seriesDataType, MomentType 
momentType) {
+    this.seriesDataType = seriesDataType;
+    this.momentType = momentType;
+  }
+
+  @Override
+  public void addInput(Column[] columns, BitMap bitMap) {
+
+    int size = columns[1].getPositionCount();
+    for (int i = 0; i < size; i++) {
+      if (bitMap != null && !bitMap.isMarked(i)) {
+        continue;
+      }
+      if (columns[1].isNull(i)) {
+        continue;
+      }
+      update(getDoubleValue(columns[1], i));
+    }
+  }
+
+  private double getDoubleValue(Column column, int position) {
+    switch (seriesDataType) {
+      case INT32:
+      case DATE:
+        return column.getInt(position);
+      case INT64:
+      case TIMESTAMP:
+        return column.getLong(position);
+      case FLOAT:
+        return column.getFloat(position);
+      case DOUBLE:
+        return column.getDouble(position);
+      default:
+        throw new UnsupportedOperationException(
+            "Unsupported data type in CentralMoment Aggregation: " + 
seriesDataType);
+    }
+  }
+
+  private void update(double value) {
+    long n1 = count;
+    long n = n1 + 1;
+    double m1 = mean;
+    double m2 = this.m2;
+    double m3 = this.m3;
+    double delta = value - m1;
+    double deltaN = delta / n;
+    double deltaN2 = deltaN * deltaN;
+    double dm2 = delta * deltaN * n1;
+
+    count = n;
+    mean = m1 + deltaN;
+    this.m2 = m2 + dm2;
+    this.m3 = m3 + dm2 * deltaN * (n - 2) - 3 * deltaN * m2;
+    m4 += dm2 * deltaN2 * (n * (double) n - 3 * n + 3) + 6 * deltaN2 * m2 - 4 
* deltaN * m3;
+  }
+
+  @Override
+  public void addIntermediate(Column[] partialResult) {
+    checkArgument(partialResult.length == 1, "partialResult of CentralMoment 
should be 1");
+    if (partialResult[0].isNull(0)) {
+      return;
+    }
+    byte[] bytes = partialResult[0].getBinary(0).getValues();
+    ByteBuffer buffer = ByteBuffer.wrap(bytes);
+
+    long otherCount = buffer.getLong();
+    double otherMean = buffer.getDouble();
+    double otherM2 = buffer.getDouble();
+    double otherM3 = buffer.getDouble();
+    double otherM4 = buffer.getDouble();
+
+    merge(otherCount, otherMean, otherM2, otherM3, otherM4);
+  }
+
+  private void merge(long nB, double meanB, double m2B, double m3B, double 
m4B) {
+    if (nB == 0) return;
+    if (count == 0) {
+      count = nB;
+      mean = meanB;
+      m2 = m2B;
+      m3 = m3B;
+      m4 = m4B;
+    } else {
+      long nA = count;
+      double m1A = mean;
+      double m2A = m2;
+      double m3A = m3;
+      long n = nA + nB;
+      double nDouble = n;
+      double delta = meanB - m1A;
+      double delta2 = delta * delta;
+      double delta3 = delta * delta2;
+      double delta4 = delta2 * delta2;
+
+      count = n;
+      mean = (nA * m1A + nB * meanB) / nDouble;
+      m2 = m2A + m2B + delta2 * nA * nB / nDouble;
+      m3 =
+          m3A
+              + m3B
+              + delta3 * nA * nB * (nA - nB) / (nDouble * nDouble)
+              + 3 * delta * (nA * m2B - nB * m2A) / nDouble;
+      m4 +=
+          m4B
+              + delta4 * nA * nB * (nA * nA - nA * nB + nB * nB) / (nDouble * 
nDouble * nDouble)
+              + 6 * delta2 * (nA * nA * m2B + nB * nB * m2A) / (nDouble * 
nDouble)
+              + 4 * delta * (nA * m3B - nB * m3A) / nDouble;
+    }
+  }
+
+  @Override
+  public void outputIntermediate(ColumnBuilder[] columnBuilders) {
+    checkArgument(columnBuilders.length == 1, "partialResult should be 1");
+    if (count == 0) {
+      columnBuilders[0].appendNull();
+    } else {
+
+      byte[] bytes = new byte[INTERMEDIATE_SIZE];
+      ByteBuffer buffer = ByteBuffer.wrap(bytes);
+      buffer.putLong(count);
+      buffer.putDouble(mean);
+      buffer.putDouble(m2);
+      buffer.putDouble(m3);
+      buffer.putDouble(m4);
+      columnBuilders[0].writeBinary(new Binary(bytes));
+    }
+  }
+
+  @Override
+  public void outputFinal(ColumnBuilder columnBuilder) {
+    if (count == 0 || m2 == 0) {
+      columnBuilder.appendNull();
+      return;
+    }
+
+    if (momentType == MomentType.SKEWNESS) {
+      if (count < 3) {
+        columnBuilder.appendNull();
+        return;
+      }
+      double result = Math.sqrt((double) count) * m3 / Math.pow(m2, 1.5);

Review Comment:
   **Skewness and kurtosis use inconsistent estimators.**
   
   This is the *population / biased* skewness g1 (`sqrt(n) * m3 / m2^1.5`, 
where `m2`/`m3` are the sums of squared/cubed deviations), while `KURTOSIS` 
below uses the *sample / bias-corrected* excess kurtosis (the Excel `KURT` 
formula with the `n(n+1)/((n-1)(n-2)(n-3))` factor). Mixing biased skewness 
with bias-corrected kurtosis is unusual — most engines are internally 
consistent (Spark: both biased; Excel/SAS: both sample-corrected).
   
   Could you confirm which semantics are intended and make the two consistent? 
Either way it would help to document the exact definition (and how it compares 
to other DBs) so users aren't surprised.



##########
iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/aggregation/CorrelationAccumulator.java:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.calc.execution.aggregation;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BitMap;
+
+import java.nio.ByteBuffer;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class CorrelationAccumulator implements Accumulator {
+
+  private static final int INTERMEDIATE_SIZE = Long.BYTES + 5 * Double.BYTES;
+
+  public enum CorrelationType {

Review Comment:
   `CorrelationType` has a single value `CORR`, and `outputFinal` then does `if 
(correlationType != CorrelationType.CORR) throw ...`, a branch that can never 
fire. Since covariance lives in its own `CovarianceAccumulator`, this enum + 
the constructor parameter + the dead check are unnecessary indirection and can 
be removed. (The same single-value-enum pattern is repeated in 
`TableCorrelationAccumulator` and `GroupedCorrelationAccumulator`.)



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionTypeAnalyzer.java:
##########
@@ -361,6 +361,7 @@ public TSDataType visitFunctionExpression(
       }
 
       if (functionExpression.isBuiltInAggregationFunctionExpression()) {
+

Review Comment:
   Stray blank line — this is the only change in this file and appears 
accidental. Please remove it to keep the diff clean.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java:
##########
@@ -190,7 +197,27 @@ private static void 
verifyIsAggregationDataTypeMatched(String aggrFuncName, TSDa
           return;
         }
         throw new SemanticException(
-            "Aggregate functions [AVG, SUM, EXTREME, STDDEV, STDDEV_POP, 
STDDEV_SAMP, VARIANCE, VAR_POP, VAR_SAMP] only support numeric data types 
[INT32, INT64, FLOAT, DOUBLE]");
+            "Aggregate functions [AVG, SUM, EXTREME, STDDEV, STDDEV_POP, 
STDDEV_SAMP, "
+                + "VARIANCE, VAR_POP, VAR_SAMP] only support "
+                + "numeric data types [INT32, INT64, FLOAT, DOUBLE]");
+      case SqlConstant.SKEWNESS:
+      case SqlConstant.KURTOSIS:
+        if (dataType.isNumeric() || TSDataType.TIMESTAMP.equals(dataType)) {
+          return;
+        }
+        throw new SemanticException(
+            "Aggregate functions [SKEWNESS, KURTOSIS] only support "
+                + "numeric data types [INT32, INT64, FLOAT, DOUBLE, 
TIMESTAMP]");
+      /**

Review Comment:
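   Use a normal comment (`//` or `/* ... */`) instead of a Javadoc `/** ... */` 
block here. A Javadoc comment attached to a `case` label inside a method body is 
dangling: the `javadoc` tool ignores it, so it documents nothing. Linters such 
as Checkstyle's `InvalidJavadocPosition` check and IntelliJ's "Dangling Javadoc 
comment" inspection flag it, as does `javac -Xlint:dangling-doc-comments` 
(JDK 23+).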



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to