Copilot commented on code in PR #16041: URL: https://github.com/apache/iotdb/pull/16041#discussion_r2259582011
########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedApproxPercentileAccumulator.java: ########## @@ -0,0 +1,144 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped; + +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AggregationMask; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.TDigest; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.enums.TSDataType; + +public class GroupedApproxPercentileAccumulator extends AbstractGroupedApproxPercentileAccumulator { + + public GroupedApproxPercentileAccumulator(TSDataType seriesDataType) { + super(seriesDataType); + } + + @Override + public void addIntInput(int[] groupIds, Column[] arguments, AggregationMask mask) { + Column valueColumn = arguments[0]; + + int positionCount = mask.getPositionCount(); + + if (mask.isSelectAll()) { + for (int i = 0; i < positionCount; i++) { + int groupId = groupIds[i]; + TDigest tDigest = array.get(groupId); + if (!valueColumn.isNull(i)) { + tDigest.add(valueColumn.getInt(i)); + } + } + } else { + int[] selectedPositions = mask.getSelectedPositions(); + int position; + int groupId; + for (int i = 0; i < positionCount; i++) { + position = selectedPositions[i]; + groupId = 
groupIds[position]; + TDigest tDigest = array.get(groupId); + if (!valueColumn.isNull(position)) { + tDigest.add(valueColumn.getInt(i)); Review Comment: In this selected-positions (else) branch, valueColumn is read with the loop index 'i', but 'i' only indexes selectedPositions; the row to read is 'position', which the isNull(position) check on the preceding line already uses. ```suggestion tDigest.add(valueColumn.getInt(position)); ``` ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedApproxPercentileWithWeightAccumulator.java: ########## @@ -0,0 +1,149 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped; + +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AggregationMask; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.TDigest; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.enums.TSDataType; + +public class GroupedApproxPercentileWithWeightAccumulator + extends AbstractGroupedApproxPercentileAccumulator { + + public GroupedApproxPercentileWithWeightAccumulator(TSDataType seriesDataType) { + super(seriesDataType); + } + + @Override + public void addIntInput(int[] groupIds, Column[] arguments, AggregationMask mask) { + Column valueColumn = arguments[0]; + Column weightColumn = arguments[1]; + + int positionCount = mask.getPositionCount(); + + if (mask.isSelectAll()) { + for (int i = 0; i < positionCount; i++) { + int groupId = groupIds[i]; + TDigest tDigest = array.get(groupId); + if (!valueColumn.isNull(i)) { + tDigest.add(valueColumn.getInt(i), weightColumn.getInt(i)); + } + } + } else { + int[] selectedPositions = mask.getSelectedPositions(); + int position; + int groupId; + for (int i = 0; i < positionCount; i++) { + position = selectedPositions[i]; + groupId = groupIds[position]; + TDigest tDigest = array.get(groupId); + if (!valueColumn.isNull(position)) { + tDigest.add(valueColumn.getInt(i), weightColumn.getInt(i)); + } + } + } + } + + @Override + public void addLongInput(int[] groupIds, Column[] arguments, AggregationMask mask) { + Column valueColumn = arguments[0]; + Column weightColumn = arguments[1]; + + int positionCount = mask.getPositionCount(); + + if (mask.isSelectAll()) { + for (int i = 0; i < positionCount; i++) { + int groupId = groupIds[i]; + TDigest tDigest = array.get(groupId); + if (!valueColumn.isNull(i)) { + tDigest.add(valueColumn.getLong(i), weightColumn.getInt(i)); + } + } + } else { + int[] selectedPositions = 
mask.getSelectedPositions(); + int position; + int groupId; + for (int i = 0; i < positionCount; i++) { + position = selectedPositions[i]; + groupId = groupIds[position]; + TDigest tDigest = array.get(groupId); + if (!valueColumn.isNull(position)) { + tDigest.add(valueColumn.getLong(i), weightColumn.getInt(i)); + } + } + } + } + + @Override + public void addFloatInput(int[] groupIds, Column[] arguments, AggregationMask mask) { + Column valueColumn = arguments[0]; + Column weightColumn = arguments[1]; + + int positionCount = mask.getPositionCount(); + + if (mask.isSelectAll()) { + for (int i = 0; i < positionCount; i++) { + int groupId = groupIds[i]; + TDigest tDigest = array.get(groupId); + if (!valueColumn.isNull(i)) { + tDigest.add(valueColumn.getFloat(i), weightColumn.getInt(i)); + } + } + } else { + int[] selectedPositions = mask.getSelectedPositions(); + int position; + int groupId; + for (int i = 0; i < positionCount; i++) { + position = selectedPositions[i]; + groupId = groupIds[position]; + TDigest tDigest = array.get(groupId); + if (!valueColumn.isNull(position)) { + tDigest.add(valueColumn.getFloat(i), weightColumn.getInt(i)); Review Comment: In this selected-positions (else) branch, both valueColumn and weightColumn are read with the loop index 'i', but 'i' only indexes selectedPositions; the row to read is 'position', which the isNull(position) check on the preceding line already uses. ```suggestion tDigest.add(valueColumn.getFloat(position), weightColumn.getInt(position)); ``` ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedApproxPercentileWithWeightAccumulator.java: ########## @@ -0,0 +1,149 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped; + +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AggregationMask; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.TDigest; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.enums.TSDataType; + +public class GroupedApproxPercentileWithWeightAccumulator + extends AbstractGroupedApproxPercentileAccumulator { + + public GroupedApproxPercentileWithWeightAccumulator(TSDataType seriesDataType) { + super(seriesDataType); + } + + @Override + public void addIntInput(int[] groupIds, Column[] arguments, AggregationMask mask) { + Column valueColumn = arguments[0]; + Column weightColumn = arguments[1]; + + int positionCount = mask.getPositionCount(); + + if (mask.isSelectAll()) { + for (int i = 0; i < positionCount; i++) { + int groupId = groupIds[i]; + TDigest tDigest = array.get(groupId); + if (!valueColumn.isNull(i)) { + tDigest.add(valueColumn.getInt(i), weightColumn.getInt(i)); + } + } + } else { + int[] selectedPositions = mask.getSelectedPositions(); + int position; + int groupId; + for (int i = 0; i < positionCount; i++) { + position = selectedPositions[i]; + groupId = groupIds[position]; + TDigest tDigest = array.get(groupId); + if (!valueColumn.isNull(position)) { + tDigest.add(valueColumn.getInt(i), weightColumn.getInt(i)); + } + } + } + } + + @Override + public void addLongInput(int[] groupIds, Column[] arguments, 
AggregationMask mask) { + Column valueColumn = arguments[0]; + Column weightColumn = arguments[1]; + + int positionCount = mask.getPositionCount(); + + if (mask.isSelectAll()) { + for (int i = 0; i < positionCount; i++) { + int groupId = groupIds[i]; + TDigest tDigest = array.get(groupId); + if (!valueColumn.isNull(i)) { + tDigest.add(valueColumn.getLong(i), weightColumn.getInt(i)); + } + } + } else { + int[] selectedPositions = mask.getSelectedPositions(); + int position; + int groupId; + for (int i = 0; i < positionCount; i++) { + position = selectedPositions[i]; + groupId = groupIds[position]; + TDigest tDigest = array.get(groupId); + if (!valueColumn.isNull(position)) { + tDigest.add(valueColumn.getLong(i), weightColumn.getInt(i)); Review Comment: In this selected-positions (else) branch, both valueColumn and weightColumn are read with the loop index 'i', but 'i' only indexes selectedPositions; the row to read is 'position', which the isNull(position) check on the preceding line already uses. ```suggestion tDigest.add(valueColumn.getLong(position), weightColumn.getInt(position)); ``` ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedApproxPercentileWithWeightAccumulator.java: ########## @@ -0,0 +1,149 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped; + +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AggregationMask; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.TDigest; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.enums.TSDataType; + +public class GroupedApproxPercentileWithWeightAccumulator + extends AbstractGroupedApproxPercentileAccumulator { + + public GroupedApproxPercentileWithWeightAccumulator(TSDataType seriesDataType) { + super(seriesDataType); + } + + @Override + public void addIntInput(int[] groupIds, Column[] arguments, AggregationMask mask) { + Column valueColumn = arguments[0]; + Column weightColumn = arguments[1]; + + int positionCount = mask.getPositionCount(); + + if (mask.isSelectAll()) { + for (int i = 0; i < positionCount; i++) { + int groupId = groupIds[i]; + TDigest tDigest = array.get(groupId); + if (!valueColumn.isNull(i)) { + tDigest.add(valueColumn.getInt(i), weightColumn.getInt(i)); + } + } + } else { + int[] selectedPositions = mask.getSelectedPositions(); + int position; + int groupId; + for (int i = 0; i < positionCount; i++) { + position = selectedPositions[i]; + groupId = groupIds[position]; + TDigest tDigest = array.get(groupId); + if (!valueColumn.isNull(position)) { + tDigest.add(valueColumn.getInt(i), weightColumn.getInt(i)); Review Comment: In this selected-positions (else) branch, both valueColumn and weightColumn are read with the loop index 'i', but 'i' only indexes selectedPositions; the row to read is 'position', which the isNull(position) check on the preceding line already uses. 
```suggestion tDigest.add(valueColumn.getInt(position), weightColumn.getInt(position)); ``` ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/AbstractGroupedApproxPercentileAccumulator.java: ########## @@ -0,0 +1,151 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped; + +import org.apache.iotdb.db.exception.sql.SemanticException; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AggregationMask; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.TDigest; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.TDigestBigArray; + +import org.apache.tsfile.block.column.Column; +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.RamUsageEstimator; +import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.apache.tsfile.write.UnSupportedDataTypeException; + +import java.nio.ByteBuffer; + +public abstract class AbstractGroupedApproxPercentileAccumulator implements GroupedAccumulator { + + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(GroupedApproxPercentileAccumulator.class); + protected 
final TSDataType seriesDataType; + protected double percentage; + protected final TDigestBigArray array = new TDigestBigArray(); + + public AbstractGroupedApproxPercentileAccumulator(TSDataType seriesDataType) { + this.seriesDataType = seriesDataType; + } + + @Override + public long getEstimatedSize() { + return INSTANCE_SIZE + array.sizeOf(); + } + + @Override + public void setGroupCount(long groupCount) { + array.ensureCapacity(groupCount); + } + + @Override + public void addInput(int[] groupIds, Column[] arguments, AggregationMask mask) { + if (arguments.length == 2) { + percentage = arguments[1].getDouble(0); + } else if (arguments.length == 3) { + percentage = arguments[2].getDouble(0); + } else { + throw new SemanticException( + String.format( + "APPROX_PERCENTILE requires 2 or 3 arguments, but got %d", arguments.length)); + } + + switch (seriesDataType) { + case INT32: + addIntInput(groupIds, arguments, mask); + break; + case INT64: + case TIMESTAMP: + addLongInput(groupIds, arguments, mask); + break; + case FLOAT: + addFloatInput(groupIds, arguments, mask); + break; + case DOUBLE: + addDoubleInput(groupIds, arguments, mask); + break; + default: + throw new UnSupportedDataTypeException( + String.format( + "Unsupported data type in APPROX_PERCENTILE Aggregation: %s", seriesDataType)); + } + } + + @Override + public void addIntermediate(int[] groupIds, Column argument) { + for (int i = 0; i < groupIds.length; i++) { + int groupId = groupIds[i]; + if (!argument.isNull(i)) { + byte[] data = argument.getBinary(i).getValues(); + ByteBuffer buffer = ByteBuffer.wrap(data); + this.percentage = ReadWriteIOUtils.readDouble(buffer); + byte[] tDigestData = new byte[data.length - 8]; + buffer.get(tDigestData); + TDigest other = TDigest.fromByteArray(tDigestData); + array.get(groupId).add(other); + } + } + } + + @Override + public void evaluateIntermediate(int groupId, ColumnBuilder columnBuilder) { + byte[] tDigestData = array.get(groupId).toByteArray(); + ByteBuffer 
buffer = ByteBuffer.allocate(8 + tDigestData.length); + ReadWriteIOUtils.write(percentage, buffer); + buffer.put(tDigestData); + columnBuilder.writeBinary(new Binary(buffer.array())); + } + + @Override + public void evaluateFinal(int groupId, ColumnBuilder columnBuilder) { + TDigest tDigest = array.get(groupId); + switch (seriesDataType) { + case INT32: + columnBuilder.writeInt((int) tDigest.quantile(percentage)); + break; + case INT64: + case TIMESTAMP: + columnBuilder.writeLong((long) tDigest.quantile(percentage)); + break; + case FLOAT: + columnBuilder.writeFloat((float) tDigest.quantile(percentage)); + break; + case DOUBLE: + columnBuilder.writeDouble(tDigest.quantile(percentage)); + break; + default: + throw new UnSupportedDataTypeException( + String.format( + "Unsupported data type in APPROX_COUNT_DISTINCT Aggregation: %s", seriesDataType)); Review Comment: The error message incorrectly refers to 'APPROX_COUNT_DISTINCT' but should refer to 'APPROX_PERCENTILE' since this is for the percentile aggregation function. ```suggestion "Unsupported data type in APPROX_PERCENTILE Aggregation: %s", seriesDataType)); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
