JackieTien97 commented on code in PR #17294:
URL: https://github.com/apache/iotdb/pull/17294#discussion_r2944019300


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeNonAlignedDeviceViewAggregationScanOperator.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
+
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+import 
org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITableTimeRangeIterator;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.AbstractDataSourceOperator;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
+import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
+import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.List;
+import java.util.Optional;
+
+import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.CURRENT_DEVICE_INDEX_STRING;
+
+public class TreeNonAlignedDeviceViewAggregationScanOperator
+    extends AbstractDefaultAggTableScanOperator {
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(
+          TreeNonAlignedDeviceViewAggregationScanOperator.class);
+
+  private final IDeviceID.TreeDeviceIdColumnValueExtractor extractor;
+  private final DeviceIteratorScanOperator.DeviceChildOperatorTreeGenerator 
childOperatorGenerator;
+
+  private Operator child;
+  private List<Operator> dataSourceOperators;
+
+  public TreeNonAlignedDeviceViewAggregationScanOperator(
+      AbstractAggTableScanOperatorParameter parameter,
+      IDeviceID.TreeDeviceIdColumnValueExtractor extractor,
+      DeviceIteratorScanOperator.DeviceChildOperatorTreeGenerator 
childOperatorGenerator) {
+    super(parameter);
+    this.extractor = extractor;
+    this.childOperatorGenerator = childOperatorGenerator;
+    constructCurrentDeviceOperatorTree();
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    return child.isBlocked();
+  }
+
+  @Override
+  String getNthIdColumnValue(DeviceEntry deviceEntry, int idColumnIndex) {
+    return (String) extractor.extract(deviceEntry.getDeviceID(), 
idColumnIndex);
+  }
+
+  @Override
+  protected Optional<Boolean> calculateAggregationResultForCurrentTimeRange() {
+    try {
+      // Try to calculate from cached data
+      if (calcFromCachedData()) {
+        updateResultTsBlock();
+        checkIfAllAggregatorHasFinalResult();
+        return Optional.of(true);
+      }
+
+      // Read from child operator
+      if (readAndCalcFromChild()) {
+        updateResultTsBlock();
+        checkIfAllAggregatorHasFinalResult();
+        return Optional.of(true);
+      }
+
+      // No more data from child, finish the current device
+      if (!child.hasNext()) {
+        updateResultTsBlock();
+        timeIterator.resetCurTimeRange();
+        nextDevice();
+
+        if (currentDeviceIndex >= deviceCount) {
+          // All devices consumed
+          timeIterator.setFinished();
+          return Optional.of(true);
+        } else {
+          // More devices to process, child should provide next device's data
+          return Optional.of(false);
+        }
+      }
+
+      return Optional.of(false);
+    } catch (Exception e) {
+      throw new RuntimeException("Error while processing aggregation from 
child operator", e);
+    }
+  }
+
+  /** Read data from child operator and calculate aggregation. */
+  private boolean readAndCalcFromChild() throws Exception {
+    long start = System.nanoTime();
+
+    while (System.nanoTime() - start < leftRuntimeOfOneNextCall && 
child.hasNext()) {
+      // Get next TsBlock from child
+      TsBlock tsBlock = child.nextWithTimer();
+      if (tsBlock == null || tsBlock.isEmpty()) {
+        continue;
+      }
+
+      // Calculate aggregation from raw data
+      if (calcUsingRawData(tsBlock)) {
+        return true;
+      }
+
+      // If not finished, continue reading from child
+    }
+
+    return false;

Review Comment:
   ```suggestion
       if (child.hasNext()) {
         // Get next TsBlock from child
         TsBlock tsBlock = child.nextWithTimer();
         if (tsBlock == null || tsBlock.isEmpty()) {
           return false;
         }
   
         // Calculate aggregation from raw data
         return calcUsingRawData(tsBlock);
       }
       return false;
   ```
   
   There is no need to do flow and time control here; what you need to do is process 
one batch and then give control back to the super class, where time-slice 
control and the check for whether the result builder is full happen.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeNonAlignedDeviceViewAggregationScanOperator.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
+
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+import 
org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITableTimeRangeIterator;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.AbstractDataSourceOperator;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
+import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
+import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.List;
+import java.util.Optional;
+
+import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.CURRENT_DEVICE_INDEX_STRING;
+
+public class TreeNonAlignedDeviceViewAggregationScanOperator
+    extends AbstractDefaultAggTableScanOperator {
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(
+          TreeNonAlignedDeviceViewAggregationScanOperator.class);
+
+  private final IDeviceID.TreeDeviceIdColumnValueExtractor extractor;
+  private final DeviceIteratorScanOperator.DeviceChildOperatorTreeGenerator 
childOperatorGenerator;
+
+  private Operator child;
+  private List<Operator> dataSourceOperators;
+
+  public TreeNonAlignedDeviceViewAggregationScanOperator(
+      AbstractAggTableScanOperatorParameter parameter,
+      IDeviceID.TreeDeviceIdColumnValueExtractor extractor,
+      DeviceIteratorScanOperator.DeviceChildOperatorTreeGenerator 
childOperatorGenerator) {
+    super(parameter);
+    this.extractor = extractor;
+    this.childOperatorGenerator = childOperatorGenerator;
+    constructCurrentDeviceOperatorTree();
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    return child.isBlocked();
+  }
+
+  @Override
+  String getNthIdColumnValue(DeviceEntry deviceEntry, int idColumnIndex) {
+    return (String) extractor.extract(deviceEntry.getDeviceID(), 
idColumnIndex);
+  }
+
+  @Override
+  protected Optional<Boolean> calculateAggregationResultForCurrentTimeRange() {

Review Comment:
   ```suggestion
     protected Optional<Boolean> 
calculateAggregationResultForCurrentTimeRange() throws Exception  {
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeNonAlignedDeviceViewAggregationScanOperator.java:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
+
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+import 
org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITableTimeRangeIterator;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.AbstractDataSourceOperator;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
+import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
+import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.List;
+import java.util.Optional;
+
+import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.CURRENT_DEVICE_INDEX_STRING;
+
+public class TreeNonAlignedDeviceViewAggregationScanOperator
+    extends AbstractDefaultAggTableScanOperator {
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(
+          TreeNonAlignedDeviceViewAggregationScanOperator.class);
+
+  private final IDeviceID.TreeDeviceIdColumnValueExtractor extractor;
+  private final DeviceIteratorScanOperator.DeviceChildOperatorTreeGenerator 
childOperatorGenerator;
+
+  private Operator child;
+  private List<Operator> dataSourceOperators;
+
+  public TreeNonAlignedDeviceViewAggregationScanOperator(
+      AbstractAggTableScanOperatorParameter parameter,
+      IDeviceID.TreeDeviceIdColumnValueExtractor extractor,
+      DeviceIteratorScanOperator.DeviceChildOperatorTreeGenerator 
childOperatorGenerator) {
+    super(parameter);
+    this.extractor = extractor;
+    this.childOperatorGenerator = childOperatorGenerator;
+    constructCurrentDeviceOperatorTree();
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    return child.isBlocked();
+  }
+
+  @Override
+  String getNthIdColumnValue(DeviceEntry deviceEntry, int idColumnIndex) {
+    return (String) extractor.extract(deviceEntry.getDeviceID(), 
idColumnIndex);
+  }
+
+  @Override
+  protected Optional<Boolean> calculateAggregationResultForCurrentTimeRange() {
+    try {
+      // Try to calculate from cached data
+      if (calcFromCachedData()) {
+        updateResultTsBlock();
+        checkIfAllAggregatorHasFinalResult();
+        return Optional.of(true);
+      }
+
+      // Read from child operator
+      if (readAndCalcFromChild()) {
+        updateResultTsBlock();
+        checkIfAllAggregatorHasFinalResult();
+        return Optional.of(true);
+      }
+
+      // No more data from child, finish the current device
+      if (!child.hasNext()) {
+        updateResultTsBlock();
+        timeIterator.resetCurTimeRange();
+        nextDevice();
+
+        if (currentDeviceIndex >= deviceCount) {
+          // All devices consumed
+          timeIterator.setFinished();
+          return Optional.of(true);
+        } else {
+          // More devices to process, child should provide next device's data
+          return Optional.of(false);
+        }
+      }
+
+      return Optional.of(false);
+    } catch (Exception e) {
+      throw new RuntimeException("Error while processing aggregation from 
child operator", e);
+    }

Review Comment:
   ```suggestion
   ```
   Exceptions from child operations are wrapped in `RuntimeException`. The parent 
class `AbstractDefaultAggTableScanOperator` declares this method with `throws 
Exception`, so you could propagate the checked exception directly instead of 
wrapping it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to