Copilot commented on code in PR #16041:
URL: https://github.com/apache/iotdb/pull/16041#discussion_r2259582011


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedApproxPercentileAccumulator.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped;
+
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AggregationMask;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.TDigest;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.enums.TSDataType;
+
+public class GroupedApproxPercentileAccumulator extends 
AbstractGroupedApproxPercentileAccumulator {
+
+  public GroupedApproxPercentileAccumulator(TSDataType seriesDataType) {
+    super(seriesDataType);
+  }
+
+  @Override
+  public void addIntInput(int[] groupIds, Column[] arguments, AggregationMask 
mask) {
+    Column valueColumn = arguments[0];
+
+    int positionCount = mask.getPositionCount();
+
+    if (mask.isSelectAll()) {
+      for (int i = 0; i < positionCount; i++) {
+        int groupId = groupIds[i];
+        TDigest tDigest = array.get(groupId);
+        if (!valueColumn.isNull(i)) {
+          tDigest.add(valueColumn.getInt(i));
+        }
+      }
+    } else {
+      int[] selectedPositions = mask.getSelectedPositions();
+      int position;
+      int groupId;
+      for (int i = 0; i < positionCount; i++) {
+        position = selectedPositions[i];
+        groupId = groupIds[position];
+        TDigest tDigest = array.get(groupId);
+        if (!valueColumn.isNull(position)) {
+          tDigest.add(valueColumn.getInt(i));

Review Comment:
   The code uses the loop index 'i' to read from valueColumn, but in this 
non-select-all branch it should use 'position' (the selected row index), 
matching the isNull(position) check on the line above.
   ```suggestion
             tDigest.add(valueColumn.getInt(position));
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedApproxPercentileWithWeightAccumulator.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped;
+
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AggregationMask;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.TDigest;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.enums.TSDataType;
+
+public class GroupedApproxPercentileWithWeightAccumulator
+    extends AbstractGroupedApproxPercentileAccumulator {
+
+  public GroupedApproxPercentileWithWeightAccumulator(TSDataType 
seriesDataType) {
+    super(seriesDataType);
+  }
+
+  @Override
+  public void addIntInput(int[] groupIds, Column[] arguments, AggregationMask 
mask) {
+    Column valueColumn = arguments[0];
+    Column weightColumn = arguments[1];
+
+    int positionCount = mask.getPositionCount();
+
+    if (mask.isSelectAll()) {
+      for (int i = 0; i < positionCount; i++) {
+        int groupId = groupIds[i];
+        TDigest tDigest = array.get(groupId);
+        if (!valueColumn.isNull(i)) {
+          tDigest.add(valueColumn.getInt(i), weightColumn.getInt(i));
+        }
+      }
+    } else {
+      int[] selectedPositions = mask.getSelectedPositions();
+      int position;
+      int groupId;
+      for (int i = 0; i < positionCount; i++) {
+        position = selectedPositions[i];
+        groupId = groupIds[position];
+        TDigest tDigest = array.get(groupId);
+        if (!valueColumn.isNull(position)) {
+          tDigest.add(valueColumn.getInt(i), weightColumn.getInt(i));
+        }
+      }
+    }
+  }
+
+  @Override
+  public void addLongInput(int[] groupIds, Column[] arguments, AggregationMask 
mask) {
+    Column valueColumn = arguments[0];
+    Column weightColumn = arguments[1];
+
+    int positionCount = mask.getPositionCount();
+
+    if (mask.isSelectAll()) {
+      for (int i = 0; i < positionCount; i++) {
+        int groupId = groupIds[i];
+        TDigest tDigest = array.get(groupId);
+        if (!valueColumn.isNull(i)) {
+          tDigest.add(valueColumn.getLong(i), weightColumn.getInt(i));
+        }
+      }
+    } else {
+      int[] selectedPositions = mask.getSelectedPositions();
+      int position;
+      int groupId;
+      for (int i = 0; i < positionCount; i++) {
+        position = selectedPositions[i];
+        groupId = groupIds[position];
+        TDigest tDigest = array.get(groupId);
+        if (!valueColumn.isNull(position)) {
+          tDigest.add(valueColumn.getLong(i), weightColumn.getInt(i));
+        }
+      }
+    }
+  }
+
+  @Override
+  public void addFloatInput(int[] groupIds, Column[] arguments, 
AggregationMask mask) {
+    Column valueColumn = arguments[0];
+    Column weightColumn = arguments[1];
+
+    int positionCount = mask.getPositionCount();
+
+    if (mask.isSelectAll()) {
+      for (int i = 0; i < positionCount; i++) {
+        int groupId = groupIds[i];
+        TDigest tDigest = array.get(groupId);
+        if (!valueColumn.isNull(i)) {
+          tDigest.add(valueColumn.getFloat(i), weightColumn.getInt(i));
+        }
+      }
+    } else {
+      int[] selectedPositions = mask.getSelectedPositions();
+      int position;
+      int groupId;
+      for (int i = 0; i < positionCount; i++) {
+        position = selectedPositions[i];
+        groupId = groupIds[position];
+        TDigest tDigest = array.get(groupId);
+        if (!valueColumn.isNull(position)) {
+          tDigest.add(valueColumn.getFloat(i), weightColumn.getInt(i));

Review Comment:
   The code uses the loop index 'i' to read from both valueColumn and 
weightColumn, but in this non-select-all branch it should use 'position' 
(the selected row index), matching the isNull(position) check on the line 
above.
   ```suggestion
             tDigest.add(valueColumn.getFloat(position), 
weightColumn.getInt(position));
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedApproxPercentileWithWeightAccumulator.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped;
+
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AggregationMask;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.TDigest;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.enums.TSDataType;
+
+public class GroupedApproxPercentileWithWeightAccumulator
+    extends AbstractGroupedApproxPercentileAccumulator {
+
+  public GroupedApproxPercentileWithWeightAccumulator(TSDataType 
seriesDataType) {
+    super(seriesDataType);
+  }
+
+  @Override
+  public void addIntInput(int[] groupIds, Column[] arguments, AggregationMask 
mask) {
+    Column valueColumn = arguments[0];
+    Column weightColumn = arguments[1];
+
+    int positionCount = mask.getPositionCount();
+
+    if (mask.isSelectAll()) {
+      for (int i = 0; i < positionCount; i++) {
+        int groupId = groupIds[i];
+        TDigest tDigest = array.get(groupId);
+        if (!valueColumn.isNull(i)) {
+          tDigest.add(valueColumn.getInt(i), weightColumn.getInt(i));
+        }
+      }
+    } else {
+      int[] selectedPositions = mask.getSelectedPositions();
+      int position;
+      int groupId;
+      for (int i = 0; i < positionCount; i++) {
+        position = selectedPositions[i];
+        groupId = groupIds[position];
+        TDigest tDigest = array.get(groupId);
+        if (!valueColumn.isNull(position)) {
+          tDigest.add(valueColumn.getInt(i), weightColumn.getInt(i));
+        }
+      }
+    }
+  }
+
+  @Override
+  public void addLongInput(int[] groupIds, Column[] arguments, AggregationMask 
mask) {
+    Column valueColumn = arguments[0];
+    Column weightColumn = arguments[1];
+
+    int positionCount = mask.getPositionCount();
+
+    if (mask.isSelectAll()) {
+      for (int i = 0; i < positionCount; i++) {
+        int groupId = groupIds[i];
+        TDigest tDigest = array.get(groupId);
+        if (!valueColumn.isNull(i)) {
+          tDigest.add(valueColumn.getLong(i), weightColumn.getInt(i));
+        }
+      }
+    } else {
+      int[] selectedPositions = mask.getSelectedPositions();
+      int position;
+      int groupId;
+      for (int i = 0; i < positionCount; i++) {
+        position = selectedPositions[i];
+        groupId = groupIds[position];
+        TDigest tDigest = array.get(groupId);
+        if (!valueColumn.isNull(position)) {
+          tDigest.add(valueColumn.getLong(i), weightColumn.getInt(i));

Review Comment:
   The code uses the loop index 'i' to read from both valueColumn and 
weightColumn, but in this non-select-all branch it should use 'position' 
(the selected row index), matching the isNull(position) check on the line 
above.
   ```suggestion
             tDigest.add(valueColumn.getLong(position), 
weightColumn.getInt(position));
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedApproxPercentileWithWeightAccumulator.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped;
+
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AggregationMask;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.TDigest;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.enums.TSDataType;
+
+public class GroupedApproxPercentileWithWeightAccumulator
+    extends AbstractGroupedApproxPercentileAccumulator {
+
+  public GroupedApproxPercentileWithWeightAccumulator(TSDataType 
seriesDataType) {
+    super(seriesDataType);
+  }
+
+  @Override
+  public void addIntInput(int[] groupIds, Column[] arguments, AggregationMask 
mask) {
+    Column valueColumn = arguments[0];
+    Column weightColumn = arguments[1];
+
+    int positionCount = mask.getPositionCount();
+
+    if (mask.isSelectAll()) {
+      for (int i = 0; i < positionCount; i++) {
+        int groupId = groupIds[i];
+        TDigest tDigest = array.get(groupId);
+        if (!valueColumn.isNull(i)) {
+          tDigest.add(valueColumn.getInt(i), weightColumn.getInt(i));
+        }
+      }
+    } else {
+      int[] selectedPositions = mask.getSelectedPositions();
+      int position;
+      int groupId;
+      for (int i = 0; i < positionCount; i++) {
+        position = selectedPositions[i];
+        groupId = groupIds[position];
+        TDigest tDigest = array.get(groupId);
+        if (!valueColumn.isNull(position)) {
+          tDigest.add(valueColumn.getInt(i), weightColumn.getInt(i));

Review Comment:
   The code uses the loop index 'i' to read from both valueColumn and 
weightColumn, but in this non-select-all branch it should use 'position' 
(the selected row index), matching the isNull(position) check on the line 
above.
   ```suggestion
             tDigest.add(valueColumn.getInt(position), 
weightColumn.getInt(position));
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/AbstractGroupedApproxPercentileAccumulator.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped;
+
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AggregationMask;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.TDigest;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.TDigestBigArray;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+
+import java.nio.ByteBuffer;
+
+public abstract class AbstractGroupedApproxPercentileAccumulator implements 
GroupedAccumulator {
+
+  private static final long INSTANCE_SIZE =
+      
RamUsageEstimator.shallowSizeOfInstance(GroupedApproxPercentileAccumulator.class);
+  protected final TSDataType seriesDataType;
+  protected double percentage;
+  protected final TDigestBigArray array = new TDigestBigArray();
+
+  public AbstractGroupedApproxPercentileAccumulator(TSDataType seriesDataType) 
{
+    this.seriesDataType = seriesDataType;
+  }
+
+  @Override
+  public long getEstimatedSize() {
+    return INSTANCE_SIZE + array.sizeOf();
+  }
+
+  @Override
+  public void setGroupCount(long groupCount) {
+    array.ensureCapacity(groupCount);
+  }
+
+  @Override
+  public void addInput(int[] groupIds, Column[] arguments, AggregationMask 
mask) {
+    if (arguments.length == 2) {
+      percentage = arguments[1].getDouble(0);
+    } else if (arguments.length == 3) {
+      percentage = arguments[2].getDouble(0);
+    } else {
+      throw new SemanticException(
+          String.format(
+              "APPROX_PERCENTILE requires 2 or 3 arguments, but got %d", 
arguments.length));
+    }
+
+    switch (seriesDataType) {
+      case INT32:
+        addIntInput(groupIds, arguments, mask);
+        break;
+      case INT64:
+      case TIMESTAMP:
+        addLongInput(groupIds, arguments, mask);
+        break;
+      case FLOAT:
+        addFloatInput(groupIds, arguments, mask);
+        break;
+      case DOUBLE:
+        addDoubleInput(groupIds, arguments, mask);
+        break;
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format(
+                "Unsupported data type in APPROX_PERCENTILE Aggregation: %s", 
seriesDataType));
+    }
+  }
+
+  @Override
+  public void addIntermediate(int[] groupIds, Column argument) {
+    for (int i = 0; i < groupIds.length; i++) {
+      int groupId = groupIds[i];
+      if (!argument.isNull(i)) {
+        byte[] data = argument.getBinary(i).getValues();
+        ByteBuffer buffer = ByteBuffer.wrap(data);
+        this.percentage = ReadWriteIOUtils.readDouble(buffer);
+        byte[] tDigestData = new byte[data.length - 8];
+        buffer.get(tDigestData);
+        TDigest other = TDigest.fromByteArray(tDigestData);
+        array.get(groupId).add(other);
+      }
+    }
+  }
+
+  @Override
+  public void evaluateIntermediate(int groupId, ColumnBuilder columnBuilder) {
+    byte[] tDigestData = array.get(groupId).toByteArray();
+    ByteBuffer buffer = ByteBuffer.allocate(8 + tDigestData.length);
+    ReadWriteIOUtils.write(percentage, buffer);
+    buffer.put(tDigestData);
+    columnBuilder.writeBinary(new Binary(buffer.array()));
+  }
+
+  @Override
+  public void evaluateFinal(int groupId, ColumnBuilder columnBuilder) {
+    TDigest tDigest = array.get(groupId);
+    switch (seriesDataType) {
+      case INT32:
+        columnBuilder.writeInt((int) tDigest.quantile(percentage));
+        break;
+      case INT64:
+      case TIMESTAMP:
+        columnBuilder.writeLong((long) tDigest.quantile(percentage));
+        break;
+      case FLOAT:
+        columnBuilder.writeFloat((float) tDigest.quantile(percentage));
+        break;
+      case DOUBLE:
+        columnBuilder.writeDouble(tDigest.quantile(percentage));
+        break;
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format(
+                "Unsupported data type in APPROX_COUNT_DISTINCT Aggregation: 
%s", seriesDataType));

Review Comment:
   The error message incorrectly refers to 'APPROX_COUNT_DISTINCT' but should 
refer to 'APPROX_PERCENTILE' since this is for the percentile aggregation 
function.
   ```suggestion
                   "Unsupported data type in APPROX_PERCENTILE Aggregation: 
%s", seriesDataType));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to