JackieTien97 commented on code in PR #17294:
URL: https://github.com/apache/iotdb/pull/17294#discussion_r2944024785


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeNonAlignedDeviceViewAggregationScanOperator.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
+
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+import 
org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITableTimeRangeIterator;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.AbstractDataSourceOperator;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
+import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
+import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.List;
+import java.util.Optional;
+
+import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.CURRENT_DEVICE_INDEX_STRING;
+
+public class TreeNonAlignedDeviceViewAggregationScanOperator

Review Comment:
   you need to override close method to close `currentDeviceRootOperator` just 
like `DeviceIteratorScanOperator`.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java:
##########
@@ -2821,8 +2831,8 @@ private GroupedAggregator buildGroupByAggregator(
   }
 
   @Override
-  public Operator visitAggregationTreeDeviceViewScan(
-      AggregationTreeDeviceViewScanNode node, LocalExecutionPlanContext 
context) {
+  public Operator visitAlignedAggregationTreeDeviceViewScan(

Review Comment:
   add `visitAggregationTreeDeviceViewScanNode` method and directly throw 
UnsupportedException for that, avoiding `AggregationTreeDeviceViewScanNode` 
visit its super class `AggregationTableScanNode`



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeNonAlignedDeviceViewAggregationScanOperator.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
+
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+import 
org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITableTimeRangeIterator;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.AbstractDataSourceOperator;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
+import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
+import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.List;
+import java.util.Optional;
+
+import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.CURRENT_DEVICE_INDEX_STRING;
+
+public class TreeNonAlignedDeviceViewAggregationScanOperator
+    extends AbstractDefaultAggTableScanOperator {
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(
+          TreeNonAlignedDeviceViewAggregationScanOperator.class);
+
+  private final IDeviceID.TreeDeviceIdColumnValueExtractor extractor;
+  private final DeviceIteratorScanOperator.DeviceChildOperatorTreeGenerator 
childOperatorGenerator;
+
+  private Operator child;
+  private List<Operator> dataSourceOperators;
+
+  public TreeNonAlignedDeviceViewAggregationScanOperator(
+      AbstractAggTableScanOperatorParameter parameter,
+      IDeviceID.TreeDeviceIdColumnValueExtractor extractor,
+      DeviceIteratorScanOperator.DeviceChildOperatorTreeGenerator 
childOperatorGenerator) {
+    super(parameter);
+    this.extractor = extractor;
+    this.childOperatorGenerator = childOperatorGenerator;
+    constructCurrentDeviceOperatorTree();
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    return child.isBlocked();
+  }
+
+  @Override
+  String getNthIdColumnValue(DeviceEntry deviceEntry, int idColumnIndex) {
+    return (String) extractor.extract(deviceEntry.getDeviceID(), 
idColumnIndex);
+  }
+
+  @Override
+  protected Optional<Boolean> calculateAggregationResultForCurrentTimeRange() {
+    try {
+      // Try to calculate from cached data
+      if (calcFromCachedData()) {
+        updateResultTsBlock();
+        checkIfAllAggregatorHasFinalResult();
+        return Optional.of(true);
+      }
+
+      // Read from child operator
+      if (readAndCalcFromChild()) {
+        updateResultTsBlock();
+        checkIfAllAggregatorHasFinalResult();
+        return Optional.of(true);
+      }
+
+      // No more data from child, finish the current device
+      if (!child.hasNext()) {
+        updateResultTsBlock();
+        timeIterator.resetCurTimeRange();
+        nextDevice();
+
+        if (currentDeviceIndex >= deviceCount) {
+          // All devices consumed
+          timeIterator.setFinished();
+          return Optional.of(true);
+        } else {
+          // More devices to process, child should provide next device's data
+          return Optional.of(false);
+        }
+      }
+
+      return Optional.of(false);
+    } catch (Exception e) {
+      throw new RuntimeException("Error while processing aggregation from 
child operator", e);
+    }
+  }
+
+  /** Read data from child operator and calculate aggregation. */
+  private boolean readAndCalcFromChild() throws Exception {
+    long start = System.nanoTime();
+
+    while (System.nanoTime() - start < leftRuntimeOfOneNextCall && 
child.hasNext()) {
+      // Get next TsBlock from child
+      TsBlock tsBlock = child.nextWithTimer();
+      if (tsBlock == null || tsBlock.isEmpty()) {
+        continue;
+      }
+
+      // Calculate aggregation from raw data
+      if (calcUsingRawData(tsBlock)) {
+        return true;
+      }
+
+      // If not finished, continue reading from child
+    }
+
+    return false;
+  }
+
+  @Override
+  protected void nextDevice() throws Exception {
+    currentDeviceIndex++;
+    childOperatorGenerator.getCurrentDeviceStartCloseOperator().close();
+    if (currentDeviceIndex >= deviceEntries.size()) {
+      return;
+    }
+    constructCurrentDeviceOperatorTree();
+    queryDataSource.reset();
+    initQueryDataSource(queryDataSource);
+    this.operatorContext.recordSpecifiedInfo(
+        CURRENT_DEVICE_INDEX_STRING, Integer.toString(currentDeviceIndex));
+  }
+
+  private void constructCurrentDeviceOperatorTree() {
+    if (this.deviceEntries.isEmpty()) {
+      return;
+    }
+    if (this.deviceEntries.get(this.currentDeviceIndex) == null) {
+      throw new IllegalStateException(
+          "Device entries of index " + this.currentDeviceIndex + " is empty");
+    }
+    DeviceEntry deviceEntry = this.deviceEntries.get(this.currentDeviceIndex);
+
+    childOperatorGenerator.generateCurrentDeviceOperatorTree(deviceEntry, 
false);
+    child = childOperatorGenerator.getCurrentDeviceRootOperator();
+    dataSourceOperators = 
childOperatorGenerator.getCurrentDeviceDataSourceOperators();
+  }
+
+  /** same with {@link 
DeviceIteratorScanOperator#initQueryDataSource(IQueryDataSource)} */
+  @Override
+  public void initQueryDataSource(IQueryDataSource dataSource) {
+    if (resultTsBlockBuilder == null) {
+      // only need to do this when init firstly
+      this.queryDataSource = (QueryDataSource) dataSource;
+      this.resultTsBlockBuilder = new TsBlockBuilder(getResultDataTypes());
+    }
+
+    if (dataSourceOperators == null || dataSourceOperators.isEmpty()) {
+      return;
+    }
+
+    for (Operator operator : dataSourceOperators) {
+      ((AbstractDataSourceOperator) operator).initQueryDataSource(dataSource);
+    }
+  }
+
+  @Override
+  protected void checkIfAllAggregatorHasFinalResult() throws Exception {
+    if (allAggregatorsHasFinalResult
+        && (timeIterator.getType() == 
ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR
+            || tableAggregators.isEmpty())) {
+      nextDevice();
+      inputTsBlock = null;
+
+      if (currentDeviceIndex >= deviceCount) {
+        // all devices have been consumed
+        timeIterator.setFinished();
+      }
+
+      allAggregatorsHasFinalResult = false;
+    }
+  }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return child.calculateMaxPeekMemory();
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return child.calculateMaxReturnSize();
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return child.calculateRetainedSizeAfterCallingNext();
+  }
+
+  @Override
+  public long ramBytesUsed() {

Review Comment:
   The estimation includes child and dataSourceOperators but doesn't include 
the childOperatorGenerator, which may hold references to the operator tree 
templates. Minor memory accounting concern.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to