JackieTien97 commented on code in PR #17294: URL: https://github.com/apache/iotdb/pull/17294#discussion_r2944019300
########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeNonAlignedDeviceViewAggregationScanOperator.java: ########## @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.queryengine.execution.operator.source.relational; + +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; +import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITableTimeRangeIterator; +import org.apache.iotdb.db.queryengine.execution.operator.Operator; +import org.apache.iotdb.db.queryengine.execution.operator.source.AbstractDataSourceOperator; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; +import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource; +import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.util.List; +import java.util.Optional; + +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.CURRENT_DEVICE_INDEX_STRING; + +public class TreeNonAlignedDeviceViewAggregationScanOperator + extends AbstractDefaultAggTableScanOperator { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance( + TreeNonAlignedDeviceViewAggregationScanOperator.class); + + private final IDeviceID.TreeDeviceIdColumnValueExtractor extractor; + private final DeviceIteratorScanOperator.DeviceChildOperatorTreeGenerator childOperatorGenerator; + + private Operator child; + private List<Operator> dataSourceOperators; + + public TreeNonAlignedDeviceViewAggregationScanOperator( + AbstractAggTableScanOperatorParameter parameter, + IDeviceID.TreeDeviceIdColumnValueExtractor extractor, + DeviceIteratorScanOperator.DeviceChildOperatorTreeGenerator childOperatorGenerator) { + super(parameter); + this.extractor = extractor; + this.childOperatorGenerator = childOperatorGenerator; + 
constructCurrentDeviceOperatorTree(); + } + + @Override + public ListenableFuture<?> isBlocked() { + return child.isBlocked(); + } + + @Override + String getNthIdColumnValue(DeviceEntry deviceEntry, int idColumnIndex) { + return (String) extractor.extract(deviceEntry.getDeviceID(), idColumnIndex); + } + + @Override + protected Optional<Boolean> calculateAggregationResultForCurrentTimeRange() { + try { + // Try to calculate from cached data + if (calcFromCachedData()) { + updateResultTsBlock(); + checkIfAllAggregatorHasFinalResult(); + return Optional.of(true); + } + + // Read from child operator + if (readAndCalcFromChild()) { + updateResultTsBlock(); + checkIfAllAggregatorHasFinalResult(); + return Optional.of(true); + } + + // No more data from child, finish the current device + if (!child.hasNext()) { + updateResultTsBlock(); + timeIterator.resetCurTimeRange(); + nextDevice(); + + if (currentDeviceIndex >= deviceCount) { + // All devices consumed + timeIterator.setFinished(); + return Optional.of(true); + } else { + // More devices to process, child should provide next device's data + return Optional.of(false); + } + } + + return Optional.of(false); + } catch (Exception e) { + throw new RuntimeException("Error while processing aggregation from child operator", e); + } + } + + /** Read data from child operator and calculate aggregation. 
+ */ + private boolean readAndCalcFromChild() throws Exception { + long start = System.nanoTime(); + + while (System.nanoTime() - start < leftRuntimeOfOneNextCall && child.hasNext()) { + // Get next TsBlock from child + TsBlock tsBlock = child.nextWithTimer(); + if (tsBlock == null || tsBlock.isEmpty()) { + continue; + } + + // Calculate aggregation from raw data + if (calcUsingRawData(tsBlock)) { + return true; + } + + // If not finished, continue reading from child + } + + return false; Review Comment: ```suggestion if (child.hasNext()) { // Get next TsBlock from child TsBlock tsBlock = child.nextWithTimer(); if (tsBlock == null || tsBlock.isEmpty()) { return false; } // Calculate aggregation from raw data return calcUsingRawData(tsBlock); } return false; ``` There is no need to do flow and time control here. What you need to do is process one batch and then give control back to the superclass, where the time-slice control and the check for whether the result builder is full happen. ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeNonAlignedDeviceViewAggregationScanOperator.java: ########## @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.queryengine.execution.operator.source.relational; + +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; +import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITableTimeRangeIterator; +import org.apache.iotdb.db.queryengine.execution.operator.Operator; +import org.apache.iotdb.db.queryengine.execution.operator.source.AbstractDataSourceOperator; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; +import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource; +import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.util.List; +import java.util.Optional; + +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.CURRENT_DEVICE_INDEX_STRING; + +public class TreeNonAlignedDeviceViewAggregationScanOperator + extends AbstractDefaultAggTableScanOperator { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance( + TreeNonAlignedDeviceViewAggregationScanOperator.class); + + private final IDeviceID.TreeDeviceIdColumnValueExtractor extractor; + private final DeviceIteratorScanOperator.DeviceChildOperatorTreeGenerator childOperatorGenerator; + + private Operator child; + private List<Operator> dataSourceOperators; + + public TreeNonAlignedDeviceViewAggregationScanOperator( + AbstractAggTableScanOperatorParameter parameter, + IDeviceID.TreeDeviceIdColumnValueExtractor extractor, + DeviceIteratorScanOperator.DeviceChildOperatorTreeGenerator childOperatorGenerator) { + super(parameter); + this.extractor = extractor; + this.childOperatorGenerator = childOperatorGenerator; + 
constructCurrentDeviceOperatorTree(); + } + + @Override + public ListenableFuture<?> isBlocked() { + return child.isBlocked(); + } + + @Override + String getNthIdColumnValue(DeviceEntry deviceEntry, int idColumnIndex) { + return (String) extractor.extract(deviceEntry.getDeviceID(), idColumnIndex); + } + + @Override + protected Optional<Boolean> calculateAggregationResultForCurrentTimeRange() { Review Comment: ```suggestion protected Optional<Boolean> calculateAggregationResultForCurrentTimeRange() throws Exception { ``` ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeNonAlignedDeviceViewAggregationScanOperator.java: ########## @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.db.queryengine.execution.operator.source.relational; + +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; +import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITableTimeRangeIterator; +import org.apache.iotdb.db.queryengine.execution.operator.Operator; +import org.apache.iotdb.db.queryengine.execution.operator.source.AbstractDataSourceOperator; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry; +import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource; +import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.util.List; +import java.util.Optional; + +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.CURRENT_DEVICE_INDEX_STRING; + +public class TreeNonAlignedDeviceViewAggregationScanOperator + extends AbstractDefaultAggTableScanOperator { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance( + TreeNonAlignedDeviceViewAggregationScanOperator.class); + + private final IDeviceID.TreeDeviceIdColumnValueExtractor extractor; + private final DeviceIteratorScanOperator.DeviceChildOperatorTreeGenerator childOperatorGenerator; + + private Operator child; + private List<Operator> dataSourceOperators; + + public TreeNonAlignedDeviceViewAggregationScanOperator( + AbstractAggTableScanOperatorParameter parameter, + IDeviceID.TreeDeviceIdColumnValueExtractor extractor, + DeviceIteratorScanOperator.DeviceChildOperatorTreeGenerator childOperatorGenerator) { + super(parameter); + this.extractor = extractor; + this.childOperatorGenerator = childOperatorGenerator; + 
constructCurrentDeviceOperatorTree(); + } + + @Override + public ListenableFuture<?> isBlocked() { + return child.isBlocked(); + } + + @Override + String getNthIdColumnValue(DeviceEntry deviceEntry, int idColumnIndex) { + return (String) extractor.extract(deviceEntry.getDeviceID(), idColumnIndex); + } + + @Override + protected Optional<Boolean> calculateAggregationResultForCurrentTimeRange() { + try { + // Try to calculate from cached data + if (calcFromCachedData()) { + updateResultTsBlock(); + checkIfAllAggregatorHasFinalResult(); + return Optional.of(true); + } + + // Read from child operator + if (readAndCalcFromChild()) { + updateResultTsBlock(); + checkIfAllAggregatorHasFinalResult(); + return Optional.of(true); + } + + // No more data from child, finish the current device + if (!child.hasNext()) { + updateResultTsBlock(); + timeIterator.resetCurTimeRange(); + nextDevice(); + + if (currentDeviceIndex >= deviceCount) { + // All devices consumed + timeIterator.setFinished(); + return Optional.of(true); + } else { + // More devices to process, child should provide next device's data + return Optional.of(false); + } + } + + return Optional.of(false); + } catch (Exception e) { + throw new RuntimeException("Error while processing aggregation from child operator", e); + } Review Comment: ```suggestion ``` Exceptions from child operations are wrapped in RuntimeException. The parent class AbstractDefaultAggTableScanOperator declares this method as throws Exception, so you could propagate the checked exception directly instead of wrapping. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
