JackieTien97 commented on code in PR #7277:
URL: https://github.com/apache/iotdb/pull/7277#discussion_r969130950
##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java:
##########
@@ -65,20 +81,78 @@ protected boolean calculateNextAggregationResult() {
// if child still has next but can't be invoked now
return false;
} else {
+ if (!windowManager.isCurWindowInit()) {
+ initWindowManagerAndAggregators();
+ }
+ updateResultTsBlock();
break;
}
}
- // update result using aggregators
- updateResultTsBlock();
-
return true;
}
- private boolean calcFromRawData() {
- Pair<Boolean, TsBlock> calcResult =
- calculateAggregationFromRawData(inputTsBlock, aggregators,
curTimeRange, ascending);
- inputTsBlock = calcResult.getRight();
- return calcResult.getLeft();
+ private boolean calculateAndUpdateFromRawData() {
+ if (inputTsBlock == null || inputTsBlock.isEmpty()) {
+ return false;
+ }
+
+ // if window is not initialized, we should init window status and reset
aggregators
+ if (!windowManager.isCurWindowInit()) {
+ initWindowManagerAndAggregators();
+ }
+
+ if (windowManager.satisfiedCurWindow(inputTsBlock)) {
+ inputTsBlock = windowManager.skipPointsOutOfCurWindow(inputTsBlock);
+
+ int lastReadRowIndex = 0;
+ for (Aggregator aggregator : aggregators) {
+ // current agg method has been calculated
+ if (aggregator.hasFinalResult()) {
+ continue;
+ }
+
+ lastReadRowIndex = Math.max(lastReadRowIndex,
aggregator.processTsBlock(inputTsBlock));
+ }
+ if (lastReadRowIndex >= inputTsBlock.getPositionCount()) {
+ inputTsBlock = null;
+ // for the last index of TsBlock, if we can know the aggregation
calculation is over
+ // we can directly updateResultTsBlock and return true
+ if (isAllAggregatorsHasFinalResult(aggregators)) {
+ updateResultTsBlock();
+ return true;
+ }
+ return false;
+ } else {
+ inputTsBlock = inputTsBlock.subTsBlock(lastReadRowIndex);
+ updateResultTsBlock();
+ return true;
+ }
+ }
+ boolean isTsBlockOutOfBound =
windowManager.isTsBlockOutOfBound(inputTsBlock);
+ boolean canMakeWindow = isAllAggregatorsHasFinalResult(aggregators) ||
isTsBlockOutOfBound;
+ if (canMakeWindow) {
+ updateResultTsBlock();
+ }
+ return canMakeWindow;
+ }
+
+ @Override
+ protected void updateResultTsBlock() {
+ appendAggregationResult(resultTsBlockBuilder, aggregators,
windowManager.currentOutputTime());
+ if (windowManager.hasNext()) {
+ // reset window init status to false
+ windowManager.genNextWindow();
+ } else {
+ finish = true;
+ }
+ }
+
+ private void initWindowManagerAndAggregators() {
+ windowManager.initCurWindow(inputTsBlock);
+ IWindow curWindow = windowManager.getCurWindow();
+ for (Aggregator aggregator : aggregators) {
+ aggregator.updateWindow(curWindow);
+ }
Review Comment:
I think we only need to init the IWindow once and then call a method like
`moveToNextWindow` to advance it to the next window.
##########
server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java:
##########
@@ -128,20 +126,27 @@ public TSDataType[] getOutputType() {
}
public void reset() {
- curTimeRange = new TimeRange(0, Long.MAX_VALUE);
+ curWindow = null;
accumulator.reset();
}
public boolean hasFinalResult() {
+ // For other window (SessionWindow, CountWindow and StateWindow), we cannot
+ // precompute where the window ends, so we cannot judge whether the
aggregation method
+ // has been calculated.
+ if (!curWindow.isTimeWindow()) {
+ return false;
+ }
return accumulator.hasFinalResult();
Review Comment:
Delete `isTimeWindow()` from the IWindow interface; it is effectively an
`instanceof` check, which we should avoid.
##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java:
##########
@@ -46,10 +43,6 @@ public abstract class SingleInputAggregationOperator
implements ProcessOperator
protected TsBlock inputTsBlock;
protected boolean canCallNext;
- protected final ITimeRangeIterator timeRangeIterator;
Review Comment:
Also delete the corresponding parameter in the constructor.
##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java:
##########
@@ -42,6 +44,10 @@
*/
public class RawDataAggregationOperator extends SingleInputAggregationOperator
{
+ private final IWindowManager windowManager;
Review Comment:
I think IWindowManager should be replaced with IWindow; we can talk about it
later this afternoon.
##########
server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountAccumulator.java:
##########
@@ -36,28 +37,31 @@ public CountAccumulator() {}
// Column should be like: | Time | Value |
@Override
- public int addInput(Column[] column, TimeRange timeRange) {
+ public int addInput(Column[] column, IWindow curWindow) {
+ int windowControlColumnIndex = curWindow.getControlColumnIndex();
+ int curPositionCount = column[windowControlColumnIndex].getPositionCount();
+
TimeColumn timeColumn = (TimeColumn) column[0];
Column valueColumn = column[1];
long minTime = Math.min(timeColumn.getStartTime(),
timeColumn.getEndTime());
long maxTime = Math.max(timeColumn.getStartTime(),
timeColumn.getEndTime());
- if (!valueColumn.mayHaveNull() && timeRange.contains(minTime, maxTime)) {
+
+ if (curWindow.isTimeWindow()
+ && !valueColumn.mayHaveNull()
+ && ((TimeWindow) curWindow).getCurTimeRange().contains(minTime,
maxTime)) {
Review Comment:
```suggestion
&& curWindow.contains(column[0])) {
```
add a new method in IWindow.
##########
server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java:
##########
@@ -128,20 +126,27 @@ public TSDataType[] getOutputType() {
}
public void reset() {
- curTimeRange = new TimeRange(0, Long.MAX_VALUE);
+ curWindow = null;
accumulator.reset();
}
public boolean hasFinalResult() {
+ // For other window (SessionWindow, CountWindow and StateWindow), we cannot
+ // precompute where the window ends, so we cannot judge whether the
aggregation method
+ // has been calculated.
+ if (!curWindow.isTimeWindow()) {
+ return false;
+ }
return accumulator.hasFinalResult();
Review Comment:
```suggestion
return curWindow.hasFinalResult(accumulator);
```
##########
server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountAccumulator.java:
##########
@@ -36,28 +37,31 @@ public CountAccumulator() {}
// Column should be like: | Time | Value |
@Override
- public int addInput(Column[] column, TimeRange timeRange) {
+ public int addInput(Column[] column, IWindow curWindow) {
+ int windowControlColumnIndex = curWindow.getControlColumnIndex();
+ int curPositionCount = column[windowControlColumnIndex].getPositionCount();
+
TimeColumn timeColumn = (TimeColumn) column[0];
Column valueColumn = column[1];
long minTime = Math.min(timeColumn.getStartTime(),
timeColumn.getEndTime());
long maxTime = Math.max(timeColumn.getStartTime(),
timeColumn.getEndTime());
- if (!valueColumn.mayHaveNull() && timeRange.contains(minTime, maxTime)) {
+
+ if (curWindow.isTimeWindow()
+ && !valueColumn.mayHaveNull()
+ && ((TimeWindow) curWindow).getCurTimeRange().contains(minTime,
maxTime)) {
Review Comment:
The same applies to all the other Accumulators.
##########
server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountAccumulator.java:
##########
@@ -36,28 +37,31 @@ public CountAccumulator() {}
// Column should be like: | Time | Value |
Review Comment:
```suggestion
// Column should be like: | ControlColumn | Value |
```
##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java:
##########
@@ -65,20 +81,78 @@ protected boolean calculateNextAggregationResult() {
// if child still has next but can't be invoked now
return false;
} else {
+ if (!windowManager.isCurWindowInit()) {
+ initWindowManagerAndAggregators();
+ }
+ updateResultTsBlock();
break;
}
}
- // update result using aggregators
- updateResultTsBlock();
-
return true;
}
- private boolean calcFromRawData() {
- Pair<Boolean, TsBlock> calcResult =
- calculateAggregationFromRawData(inputTsBlock, aggregators,
curTimeRange, ascending);
- inputTsBlock = calcResult.getRight();
- return calcResult.getLeft();
+ private boolean calculateAndUpdateFromRawData() {
Review Comment:
This method looks similar to `calculateAggregationFromRawData` in
`AggregateUtil`; you can refactor that method instead of adding a new one.
##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java:
##########
@@ -50,11 +56,21 @@ public RawDataAggregationOperator(
boolean ascending,
long maxReturnSize) {
super(operatorContext, aggregators, child, ascending, timeRangeIterator,
maxReturnSize);
+ this.windowManager = new TimeWindowManager(timeRangeIterator);
+ this.finish = false;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (finish) {
+ return false;
+ }
+ return inputTsBlock != null || child.hasNext() || windowManager.hasNext();
Review Comment:
```suggestion
return IWindow.hasNext(boolean hasMoreData);
```
You should add a new method in `IWindow` and move this window-specific
judgement logic into each IWindow implementation.
##########
server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountAccumulator.java:
##########
@@ -36,28 +37,31 @@ public CountAccumulator() {}
// Column should be like: | Time | Value |
@Override
- public int addInput(Column[] column, TimeRange timeRange) {
+ public int addInput(Column[] column, IWindow curWindow) {
+ int windowControlColumnIndex = curWindow.getControlColumnIndex();
Review Comment:
No need to get controlColumnIndex again here; you can assume that the
ControlColumn has already been put at index 0 by the Aggregator.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]