JackieTien97 commented on code in PR #14623:
URL: https://github.com/apache/iotdb/pull/14623#discussion_r1903771542


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java:
##########
@@ -366,4 +412,169 @@ private static TableAccumulator 
createBuiltinSingleInputAccumulator(
   public interface KeepEvaluator {
     boolean apply(long keep);
   }
+
+  private static class DistinctAccumulator implements TableAccumulator {
+    private final TableAccumulator accumulator;
+    private MarkDistinctHash hash;
+
+    private final List<Type> inputTypes;
+
+    private DistinctAccumulator(TableAccumulator accumulator, List<Type> 
inputTypes) {
+      this.accumulator = requireNonNull(accumulator, "accumulator is null");
+      this.hash = new MarkDistinctHash(inputTypes, false, UpdateMemory.NOOP);
+      this.inputTypes = inputTypes;
+    }
+
+    @Override
+    public long getEstimatedSize() {
+      return hash.getEstimatedSize() + accumulator.getEstimatedSize();
+    }
+
+    @Override
+    public TableAccumulator copy() {
+      throw new UnsupportedOperationException(
+          "Distinct aggregation function state can not be copied");
+    }
+
+    @Override
+    public void addInput(Column[] arguments, AggregationMask mask) {
+      // 1. filter out positions based on mask, if present
+      Column[] filtered = mask.filterBlock(arguments);
+
+      // 2. compute a distinct mask column
+      Column distinctMask = hash.markDistinctRows(filtered);
+
+      // 3. update original mask to the new distinct mask
+      mask.reset(filtered[0].getPositionCount());
+      mask.applyMaskBlock(distinctMask);
+      if (mask.isSelectNone()) {
+        return;
+      }
+
+      // 4. feed a TsBlock with a new mask to the underlying aggregation
+      accumulator.addInput(filtered, mask);
+    }
+
+    @Override
+    public void addIntermediate(Column argument) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void evaluateIntermediate(ColumnBuilder columnBuilder) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void evaluateFinal(ColumnBuilder columnBuilder) {
+      accumulator.evaluateFinal(columnBuilder);
+    }
+
+    @Override
+    public boolean hasFinalResult() {
+      return accumulator.hasFinalResult();
+    }
+
+    @Override
+    public void addStatistics(Statistics[] statistics) {
+      throw new UnsupportedOperationException("Distinct aggregation function 
can not be push down");
+    }
+
+    @Override
+    public void reset() {
+      accumulator.reset();
+      hash = new MarkDistinctHash(inputTypes, false, UpdateMemory.NOOP);
+    }
+  }
+
+  private static class DistinctGroupedAccumulator implements 
GroupedAccumulator {
+    private final GroupedAccumulator accumulator;
+    private MarkDistinctHash hash;
+
+    private final List<Type> inputTypes;
+
+    private DistinctGroupedAccumulator(GroupedAccumulator accumulator, 
List<Type> inputTypes) {
+      this.accumulator = requireNonNull(accumulator, "accumulator is null");
+      this.inputTypes =
+          ImmutableList.<Type>builder()
+              .add(INT32) // group id column
+              .addAll(inputTypes)
+              .build();
+      this.hash = new MarkDistinctHash(this.inputTypes, false, 
UpdateMemory.NOOP);
+    }
+
+    @Override
+    public long getEstimatedSize() {
+      return hash.getEstimatedSize() + accumulator.getEstimatedSize();
+    }
+
+    @Override
+    public void setGroupCount(long groupCount) {
+      accumulator.setGroupCount(groupCount);
+    }
+
+    @Override
+    public void addInput(int[] groupIds, Column[] arguments, AggregationMask 
mask) {
+      // 1. filter out positions based on mask
+      groupIds = maskGroupIds(groupIds, mask);
+      Column[] filtered = mask.filterBlock(arguments);
+
+      // 2. compute a distinct mask column (including the group id)
+      Column distinctMask =
+          hash.markDistinctRows(
+              Stream.concat(
+                      Stream.of(new IntColumn(groupIds.length, 
Optional.empty(), groupIds)),
+                      Arrays.stream(filtered))
+                  .toArray(Column[]::new));
+
+      // 3. update original mask to the new distinct mask
+      mask.reset(filtered[0].getPositionCount());
+      mask.applyMaskBlock(distinctMask);
+      if (mask.isSelectNone()) {
+        return;
+      }
+
+      // 4. feed a TsBlock with a new mask to the underlying aggregation
+      accumulator.addInput(groupIds, filtered, mask);
+    }
+
+    private static int[] maskGroupIds(int[] groupIds, AggregationMask mask) {
+      if (mask.isSelectAll() || mask.isSelectNone()) {
+        return groupIds;
+      }
+
+      int[] newGroupIds = new int[mask.getSelectedPositionCount()];
+      int[] selectedPositions = mask.getSelectedPositions();
+      for (int i = 0; i < newGroupIds.length; i++) {
+        newGroupIds[i] = groupIds[selectedPositions[i]];
+      }
+      return newGroupIds;
+    }
+
+    @Override
+    public void addIntermediate(int[] groupIds, Column argument) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void evaluateIntermediate(int groupId, ColumnBuilder columnBuilder) 
{
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void evaluateFinal(int groupId, ColumnBuilder columnBuilder) {
+      accumulator.evaluateFinal(groupId, columnBuilder);
+    }
+
+    @Override
+    public void prepareFinal() {
+      accumulator.prepareFinal();
+    }

Review Comment:
   It seems that in `Trino` this is an empty (no-op) method implementation?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/MaskedRecordIterator.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
+
+import org.apache.iotdb.commons.udf.access.RecordIterator;
+import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer;
+import org.apache.iotdb.udf.api.relational.access.Record;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.read.common.type.Type;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.DateUtils;
+
+import java.time.LocalDate;
+import java.util.List;
+
+public class MaskedRecordIterator extends RecordIterator {
+  private final int[] selectedPositions;
+
+  public MaskedRecordIterator(
+      List<Column> childrenColumns, List<Type> dataTypes, AggregationMask 
mask) {
+    super(childrenColumns, dataTypes, mask.getSelectedPositionCount());
+    this.selectedPositions = mask.getSelectedPositions();
+  }
+
+  @Override
+  public Record next() {
+    final int index = selectedPositions[currentIndex++];

Review Comment:
   Extract a protected method `getCurrentIndex` in `RecordIterator`, and then 
override only that method in the subclass, instead of duplicating so much code in the subclass.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AggregationMask.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.read.common.block.column.BinaryColumnBuilder;
+import org.apache.tsfile.read.common.block.column.BooleanColumnBuilder;
+import org.apache.tsfile.read.common.block.column.DoubleColumnBuilder;
+import org.apache.tsfile.read.common.block.column.FloatColumnBuilder;
+import org.apache.tsfile.read.common.block.column.IntColumnBuilder;
+import org.apache.tsfile.read.common.block.column.LongColumnBuilder;
+import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+
+public final class AggregationMask {
+  private static final int[] NO_SELECTED_POSITIONS = new int[0];
+
+  private int positionCount;
+  private int[] selectedPositions = NO_SELECTED_POSITIONS;
+  private int selectedPositionCount;
+
+  public static AggregationMask createSelectNone(int positionCount) {
+    return createSelectedPositions(positionCount, NO_SELECTED_POSITIONS, 0);
+  }
+
+  public static AggregationMask createSelectAll(int positionCount) {
+    return new AggregationMask(positionCount);
+  }
+
+  /**
+   * Creates a mask with the given selected positions. Selected positions must 
be sorted in
+   * ascending order.
+   */
+  public static AggregationMask createSelectedPositions(
+      int positionCount, int[] selectedPositions, int selectedPositionCount) {
+    return new AggregationMask(positionCount, selectedPositions, 
selectedPositionCount);
+  }
+
+  private AggregationMask(int positionCount) {
+    reset(positionCount);
+  }
+
+  private AggregationMask(int positionCount, int[] selectedPositions, int 
selectedPositionCount) {
+    checkArgument(positionCount >= 0, "positionCount is negative");
+    checkArgument(selectedPositionCount >= 0, "selectedPositionCount is 
negative");
+    checkArgument(
+        selectedPositionCount <= positionCount,
+        "selectedPositionCount cannot be greater than positionCount");
+    requireNonNull(selectedPositions, "selectedPositions is null");
+    checkArgument(
+        selectedPositions.length >= selectedPositionCount,
+        "selectedPosition is smaller than selectedPositionCount");
+
+    reset(positionCount);
+    this.selectedPositions = selectedPositions;
+    this.selectedPositionCount = selectedPositionCount;
+  }
+
+  public void reset(int positionCount) {
+    checkArgument(positionCount >= 0, "positionCount is negative");
+    this.positionCount = positionCount;
+    this.selectedPositionCount = positionCount;
+  }
+
+  public int getPositionCount() {
+    return positionCount;
+  }
+
+  public boolean isSelectAll() {
+    return positionCount == selectedPositionCount;
+  }
+
+  public boolean isSelectNone() {
+    return selectedPositionCount == 0;
+  }
+
+  public Column[] filterBlock(Column[] columns) {
+    if (isSelectAll()) {
+      return columns;
+    }
+    if (isSelectNone()) {
+      return Arrays.stream(columns).map(column -> column.getRegion(0, 
0)).toArray(Column[]::new);
+    }
+    return getPositions(
+        columns, Arrays.copyOf(selectedPositions, selectedPositionCount), 0, 
selectedPositionCount);
+  }
+
+  private Column[] getPositions(
+      Column[] originalColumns, int[] retainedPositions, int offset, int 
length) {
+    requireNonNull(retainedPositions, "retainedPositions is null");
+
+    Column[] columns = new Column[originalColumns.length];
+    for (int i = 0; i < originalColumns.length; i++) {
+      columns[i] = getPositions(originalColumns[i], retainedPositions, offset, 
length);
+    }
+    return columns;
+  }
+
+  private Column getPositions(
+      Column originalColumn, int[] retainedPositions, int offset, int length) {
+    requireNonNull(retainedPositions, "retainedPositions is null");
+    if (offset < 0 || length < 0 || offset + length > 
retainedPositions.length) {
+      throw new IndexOutOfBoundsException(
+          format(
+              "Invalid offset %s and length %s in array with %s elements",
+              offset, length, retainedPositions.length));
+    }
+
+    if (length == 0) {
+      return originalColumn.getRegionCopy(0, 0);
+    }
+    if (length == 1) {
+      return originalColumn.getRegion(retainedPositions[offset], 1);
+    }
+
+    if (originalColumn instanceof RunLengthEncodedColumn) {
+      return new RunLengthEncodedColumn(
+          ((RunLengthEncodedColumn) originalColumn).getValue(), positionCount);

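Review Comment:
   The filtered RLE column should contain `length` positions (the retained ones), not the mask's full `positionCount`:
   ```suggestion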
             ((RunLengthEncodedColumn) originalColumn).getValue(), length);
   ```



##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/access/RecordIterator.java:
##########
@@ -34,10 +34,10 @@
 
 public class RecordIterator implements Iterator<Record> {
 
-  private final List<Column> childrenColumns;
-  private final List<org.apache.tsfile.read.common.type.Type> dataTypes;
-  private final int positionCount;
-  private int currentIndex;
+  protected final List<Column> childrenColumns;
+  protected final List<org.apache.tsfile.read.common.type.Type> dataTypes;
+  protected final int positionCount;
+  protected int currentIndex;

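Review Comment:
   Keep these fields private; a subclass can override a protected accessor instead of touching the iterator's state directly:
   ```suggestion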
     private final List<Column> childrenColumns;
     private final List<org.apache.tsfile.read.common.type.Type> dataTypes;
     private final int positionCount;
     private int currentIndex;
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AggregationMask.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.read.common.block.column.BinaryColumnBuilder;
+import org.apache.tsfile.read.common.block.column.BooleanColumnBuilder;
+import org.apache.tsfile.read.common.block.column.DoubleColumnBuilder;
+import org.apache.tsfile.read.common.block.column.FloatColumnBuilder;
+import org.apache.tsfile.read.common.block.column.IntColumnBuilder;
+import org.apache.tsfile.read.common.block.column.LongColumnBuilder;
+import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+
+public final class AggregationMask {
+  private static final int[] NO_SELECTED_POSITIONS = new int[0];
+
+  private int positionCount;
+  private int[] selectedPositions = NO_SELECTED_POSITIONS;
+  private int selectedPositionCount;
+
+  public static AggregationMask createSelectNone(int positionCount) {
+    return createSelectedPositions(positionCount, NO_SELECTED_POSITIONS, 0);
+  }
+
+  public static AggregationMask createSelectAll(int positionCount) {
+    return new AggregationMask(positionCount);
+  }
+
+  /**
+   * Creates a mask with the given selected positions. Selected positions must 
be sorted in
+   * ascending order.
+   */
+  public static AggregationMask createSelectedPositions(
+      int positionCount, int[] selectedPositions, int selectedPositionCount) {
+    return new AggregationMask(positionCount, selectedPositions, 
selectedPositionCount);
+  }
+
+  private AggregationMask(int positionCount) {
+    reset(positionCount);
+  }
+
+  private AggregationMask(int positionCount, int[] selectedPositions, int 
selectedPositionCount) {
+    checkArgument(positionCount >= 0, "positionCount is negative");
+    checkArgument(selectedPositionCount >= 0, "selectedPositionCount is 
negative");
+    checkArgument(
+        selectedPositionCount <= positionCount,
+        "selectedPositionCount cannot be greater than positionCount");
+    requireNonNull(selectedPositions, "selectedPositions is null");
+    checkArgument(
+        selectedPositions.length >= selectedPositionCount,
+        "selectedPosition is smaller than selectedPositionCount");
+
+    reset(positionCount);
+    this.selectedPositions = selectedPositions;
+    this.selectedPositionCount = selectedPositionCount;
+  }
+
+  public void reset(int positionCount) {
+    checkArgument(positionCount >= 0, "positionCount is negative");
+    this.positionCount = positionCount;
+    this.selectedPositionCount = positionCount;
+  }
+
+  public int getPositionCount() {
+    return positionCount;
+  }
+
+  public boolean isSelectAll() {
+    return positionCount == selectedPositionCount;
+  }
+
+  public boolean isSelectNone() {
+    return selectedPositionCount == 0;
+  }
+
+  public Column[] filterBlock(Column[] columns) {
+    if (isSelectAll()) {
+      return columns;
+    }
+    if (isSelectNone()) {
+      return Arrays.stream(columns).map(column -> column.getRegion(0, 
0)).toArray(Column[]::new);
+    }
+    return getPositions(
+        columns, Arrays.copyOf(selectedPositions, selectedPositionCount), 0, 
selectedPositionCount);
+  }
+
+  private Column[] getPositions(
+      Column[] originalColumns, int[] retainedPositions, int offset, int 
length) {
+    requireNonNull(retainedPositions, "retainedPositions is null");
+
+    Column[] columns = new Column[originalColumns.length];
+    for (int i = 0; i < originalColumns.length; i++) {
+      columns[i] = getPositions(originalColumns[i], retainedPositions, offset, 
length);
+    }
+    return columns;
+  }
+
+  private Column getPositions(
+      Column originalColumn, int[] retainedPositions, int offset, int length) {
+    requireNonNull(retainedPositions, "retainedPositions is null");
+    if (offset < 0 || length < 0 || offset + length > 
retainedPositions.length) {
+      throw new IndexOutOfBoundsException(
+          format(
+              "Invalid offset %s and length %s in array with %s elements",
+              offset, length, retainedPositions.length));
+    }
+
+    if (length == 0) {
+      return originalColumn.getRegionCopy(0, 0);
+    }
+    if (length == 1) {
+      return originalColumn.getRegion(retainedPositions[offset], 1);
+    }
+
+    if (originalColumn instanceof RunLengthEncodedColumn) {
+      return new RunLengthEncodedColumn(
+          ((RunLengthEncodedColumn) originalColumn).getValue(), positionCount);
+    }
+
+    ColumnBuilder builder;
+    switch (originalColumn.getDataType()) {

Review Comment:
   ```
       ColumnBuilder builder = originalColumn.constructNewColumnBuilder(length);
       for (int i = 0; i < length; i++) {
         int index = retainedPositions[offset + i];
         if (originalColumn.isNull(index)) {
           builder.appendNull();
         } else {
           builder.write(originalColumn, index);
         }
       }
   ```
   Add a new method `constructNewColumnBuilder` to the `Column` interface, so the per-type `switch` over `getDataType()` is no longer needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to