JackieTien97 commented on code in PR #17294:
URL: https://github.com/apache/iotdb/pull/17294#discussion_r2940100537
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java:
##########
@@ -2852,6 +2862,70 @@ public Operator visitAggregationTreeDeviceViewScan(
return treeAlignedDeviceViewAggregationScanOperator;
}
+ @Override
+ public Operator visitNonAlignedAggregationTreeDeviceViewScan(
+ NonAlignedAggregationTreeDeviceViewScanNode node,
LocalExecutionPlanContext context) {
+ QualifiedObjectName qualifiedObjectName = node.getQualifiedObjectName();
+ TsTable tsTable =
+ DataNodeTableCache.getInstance()
+ .getTable(qualifiedObjectName.getDatabaseName(),
qualifiedObjectName.getObjectName());
+ IDeviceID.TreeDeviceIdColumnValueExtractor idColumnValueExtractor =
+
createTreeDeviceIdColumnValueExtractor(DataNodeTreeViewSchemaUtils.getPrefixPath(tsTable));
+
+ AbstractAggTableScanOperator.AbstractAggTableScanOperatorParameter
parameter =
+ constructAbstractAggTableScanOperatorParameter(
+ node,
+ context,
+ TreeAlignedDeviceViewAggregationScanOperator.class.getSimpleName(),
Review Comment:
```suggestion
TreeNonAlignedDeviceViewAggregationScanOperator.class.getSimpleName(),
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java:
##########
@@ -2852,6 +2862,70 @@ public Operator visitAggregationTreeDeviceViewScan(
return treeAlignedDeviceViewAggregationScanOperator;
}
+ @Override
+ public Operator visitNonAlignedAggregationTreeDeviceViewScan(
+ NonAlignedAggregationTreeDeviceViewScanNode node,
LocalExecutionPlanContext context) {
+ QualifiedObjectName qualifiedObjectName = node.getQualifiedObjectName();
+ TsTable tsTable =
+ DataNodeTableCache.getInstance()
+ .getTable(qualifiedObjectName.getDatabaseName(),
qualifiedObjectName.getObjectName());
+ IDeviceID.TreeDeviceIdColumnValueExtractor idColumnValueExtractor =
+
createTreeDeviceIdColumnValueExtractor(DataNodeTreeViewSchemaUtils.getPrefixPath(tsTable));
+
+ AbstractAggTableScanOperator.AbstractAggTableScanOperatorParameter
parameter =
+ constructAbstractAggTableScanOperatorParameter(
+ node,
+ context,
+ TreeAlignedDeviceViewAggregationScanOperator.class.getSimpleName(),
+ node.getMeasurementColumnNameMap(),
+ tsTable.getCachedTableTTL());
+
+ // construct source operator (generator)
+ TreeNonAlignedDeviceViewScanNode scanNode =
+ new TreeNonAlignedDeviceViewScanNode(
+ node.getPlanNodeId(),
+ node.getQualifiedObjectName(),
+ // the outputSymbols of AggTableScanNode is not equals with
TableScanNode
+ parameter.getOutputSymbols(),
+ node.getAssignments(),
+ node.getDeviceEntries(),
+ node.getTagAndAttributeIndexMap(),
+ node.getScanOrder(),
+ node.getTimePredicate().orElse(null),
+ node.getPushDownPredicate(),
+ node.getPushDownLimit(),
+ node.getPushDownOffset(),
+ node.isPushLimitToEachDevice(),
+ true,
+ node.getTreeDBName(),
+ node.getMeasurementColumnNameMap());
+
+ Operator sourceOperator = visitTreeNonAlignedDeviceViewScan(scanNode,
context);
+ if (sourceOperator instanceof DeviceIteratorScanOperator) {
+ // Use deviceChildOperatorTreeGenerator directly, we will control switch
of devices in
+ // TreeNonAlignedDeviceViewAggregationScanOperator
+ TreeNonAlignedDeviceViewAggregationScanOperator aggTableScanOperator =
+ new TreeNonAlignedDeviceViewAggregationScanOperator(
+ parameter,
+ idColumnValueExtractor,
+ ((DeviceIteratorScanOperator)
sourceOperator).getDeviceChildOperatorTreeGenerator());
+
+ addSource(
+ aggTableScanOperator,
+ context,
+ node,
+ parameter.getMeasurementColumnNames(),
+ parameter.getMeasurementSchemas(),
+ parameter.getAllSensors(),
+ AggregationTableScanNode.class.getSimpleName());
Review Comment:
```suggestion
NonAlignedAggregationTreeDeviceViewScanNode.class.getSimpleName());
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java:
##########
@@ -1188,70 +1190,19 @@ private boolean prefixMatched(OrderingScheme
childOrdering, List<Symbol> preGrou
@Override
public List<PlanNode> visitAggregationTableScan(
AggregationTableScanNode node, PlanContext context) {
- String dbName =
- node instanceof AggregationTreeDeviceViewScanNode
- ? ((AggregationTreeDeviceViewScanNode) node).getTreeDBName()
- : node.getQualifiedObjectName().getDatabaseName();
+ String dbName = node.getQualifiedObjectName().getDatabaseName();
DataPartition dataPartition = analysis.getDataPartitionInfo();
if (dbName == null || dataPartition == null) {
node.setRegionReplicaSet(NOT_ASSIGNED);
return Collections.singletonList(node);
}
- boolean needSplit = false;
- List<List<TRegionReplicaSet>> regionReplicaSetsList = new ArrayList<>();
- if (dataPartition != null) {
- Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TRegionReplicaSet>>> seriesSlotMap =
- dataPartition.getDataPartitionMap().get(dbName);
- if (seriesSlotMap == null) {
- throw new SemanticException(
- String.format("Given queried database: %s is not exist!", dbName));
- }
- Map<Integer, List<TRegionReplicaSet>> cachedSeriesSlotWithRegions = new
HashMap<>();
- for (DeviceEntry deviceEntry : node.getDeviceEntries()) {
- List<TRegionReplicaSet> regionReplicaSets =
- getDeviceReplicaSets(
- dataPartition,
- seriesSlotMap,
- deviceEntry.getDeviceID(),
- node.getTimeFilter(),
- cachedSeriesSlotWithRegions);
- if (regionReplicaSets.size() > 1) {
- needSplit = true;
- context.deviceCrossRegion = true;
-
queryContext.setNeedUpdateScanNumForLastQuery(node.mayUseLastCache());
- }
- regionReplicaSetsList.add(regionReplicaSets);
- }
- }
-
- if (regionReplicaSetsList.isEmpty()) {
- regionReplicaSetsList =
Collections.singletonList(Collections.singletonList(NOT_ASSIGNED));
- }
+ AggregationDistributionInfo distributionInfo =
+ prepareAggregationDistribution(node, dbName, dataPartition, context);
Map<TRegionReplicaSet, AggregationTableScanNode> regionNodeMap = new
HashMap<>();
- // Step is SINGLE and device data in more than one region, we need to
final aggregate the result
- // from different region here, so split
- // this node into two-stage
- needSplit = needSplit && node.getStep() == SINGLE;
- AggregationNode finalAggregation = null;
- if (needSplit) {
- Pair<AggregationNode, AggregationTableScanNode> splitResult =
- split(node, symbolAllocator, queryId);
- finalAggregation = splitResult.left;
- AggregationTableScanNode partialAggregation = splitResult.right;
-
- // cover case: complete push-down + group by + streamable
- if (!context.hasSortProperty && finalAggregation.isStreamable()) {
- OrderingScheme expectedOrderingSchema =
- constructOrderingSchema(node.getPreGroupedSymbols());
- context.setExpectedOrderingScheme(expectedOrderingSchema);
- }
-
- buildRegionNodeMap(node, regionReplicaSetsList, regionNodeMap,
partialAggregation);
Review Comment:
The `buildRegionNodeMap` method also needs to be updated: the
`originalAggTableScanNode instanceof AggregationTreeDeviceViewScanNode`
check in that method is no longer needed.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java:
##########
@@ -2852,6 +2862,70 @@ public Operator visitAggregationTreeDeviceViewScan(
return treeAlignedDeviceViewAggregationScanOperator;
}
+ @Override
+ public Operator visitNonAlignedAggregationTreeDeviceViewScan(
+ NonAlignedAggregationTreeDeviceViewScanNode node,
LocalExecutionPlanContext context) {
+ QualifiedObjectName qualifiedObjectName = node.getQualifiedObjectName();
+ TsTable tsTable =
+ DataNodeTableCache.getInstance()
+ .getTable(qualifiedObjectName.getDatabaseName(),
qualifiedObjectName.getObjectName());
+ IDeviceID.TreeDeviceIdColumnValueExtractor idColumnValueExtractor =
+
createTreeDeviceIdColumnValueExtractor(DataNodeTreeViewSchemaUtils.getPrefixPath(tsTable));
+
+ AbstractAggTableScanOperator.AbstractAggTableScanOperatorParameter
parameter =
+ constructAbstractAggTableScanOperatorParameter(
+ node,
+ context,
+ TreeAlignedDeviceViewAggregationScanOperator.class.getSimpleName(),
+ node.getMeasurementColumnNameMap(),
+ tsTable.getCachedTableTTL());
+
+ // construct source operator (generator)
+ TreeNonAlignedDeviceViewScanNode scanNode =
+ new TreeNonAlignedDeviceViewScanNode(
+ node.getPlanNodeId(),
+ node.getQualifiedObjectName(),
+ // the outputSymbols of AggTableScanNode is not equals with
TableScanNode
Review Comment:
```suggestion
// the outputSymbols of
NonAlignedAggregationTreeDeviceViewScanNode is not equals with
TreeNonAlignedDeviceViewScanNode
```
It is better to use the specific subclass names here instead of the superclass names.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java:
##########
@@ -2852,6 +2862,70 @@ public Operator visitAggregationTreeDeviceViewScan(
return treeAlignedDeviceViewAggregationScanOperator;
}
+ @Override
+ public Operator visitNonAlignedAggregationTreeDeviceViewScan(
+ NonAlignedAggregationTreeDeviceViewScanNode node,
LocalExecutionPlanContext context) {
+ QualifiedObjectName qualifiedObjectName = node.getQualifiedObjectName();
+ TsTable tsTable =
+ DataNodeTableCache.getInstance()
+ .getTable(qualifiedObjectName.getDatabaseName(),
qualifiedObjectName.getObjectName());
+ IDeviceID.TreeDeviceIdColumnValueExtractor idColumnValueExtractor =
+
createTreeDeviceIdColumnValueExtractor(DataNodeTreeViewSchemaUtils.getPrefixPath(tsTable));
+
+ AbstractAggTableScanOperator.AbstractAggTableScanOperatorParameter
parameter =
+ constructAbstractAggTableScanOperatorParameter(
+ node,
+ context,
+ TreeAlignedDeviceViewAggregationScanOperator.class.getSimpleName(),
+ node.getMeasurementColumnNameMap(),
+ tsTable.getCachedTableTTL());
+
+ // construct source operator (generator)
+ TreeNonAlignedDeviceViewScanNode scanNode =
+ new TreeNonAlignedDeviceViewScanNode(
+ node.getPlanNodeId(),
+ node.getQualifiedObjectName(),
+ // the outputSymbols of AggTableScanNode is not equals with
TableScanNode
+ parameter.getOutputSymbols(),
Review Comment:
Is including only the fields enough here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]