liuminghui233 commented on code in PR #7277:
URL: https://github.com/apache/iotdb/pull/7277#discussion_r966718785
##########
server/src/main/java/org/apache/iotdb/db/mpp/aggregation/CountAccumulator.java:
##########
@@ -36,28 +37,31 @@ public CountAccumulator() {}
// Column should be like: | Time | Value |
@Override
- public int addInput(Column[] column, TimeRange timeRange) {
+ public int addInput(Column[] column, IWindow curWindow) {
+ int windowControlColumnIndex = curWindow.getControlColumnIndex();
+ int curPositionCount = column[windowControlColumnIndex].getPositionCount();
+
TimeColumn timeColumn = (TimeColumn) column[0];
Column valueColumn = column[1];
long minTime = Math.min(timeColumn.getStartTime(),
timeColumn.getEndTime());
long maxTime = Math.max(timeColumn.getStartTime(),
timeColumn.getEndTime());
- if (!valueColumn.mayHaveNull() && timeRange.contains(minTime, maxTime)) {
+
+ if (curWindow.isTimeWindow()
+ && !valueColumn.mayHaveNull()
+ && ((TimeWindow) curWindow).getCurTimeRange().contains(minTime,
maxTime)) {
countValue += timeColumn.getPositionCount();
} else {
- int curPositionCount = timeColumn.getPositionCount();
- long curMinTime = timeRange.getMin();
- long curMaxTime = timeRange.getMax();
for (int i = 0; i < curPositionCount; i++) {
- long curTime = timeColumn.getLong(i);
- if (curTime > curMaxTime || curTime < curMinTime) {
+ if (!curWindow.satisfy(column[windowControlColumnIndex], i)) {
return i;
}
- if (!valueColumn.isNull(i)) {
+ curWindow.mergeOnePoint();
+ if (!column[1].isNull(i)) {
Review Comment:
```suggestion
if (!valueColumn.isNull(i)) {
```
##########
server/src/main/java/org/apache/iotdb/db/mpp/aggregation/Aggregator.java:
##########
@@ -128,20 +126,24 @@ public TSDataType[] getOutputType() {
}
public void reset() {
- curTimeRange = new TimeRange(0, Long.MAX_VALUE);
+ curWindow = null;
accumulator.reset();
}
public boolean hasFinalResult() {
+ if (!curWindow.isTimeWindow()) {
Review Comment:
add a comment to explain why we need this judge
##########
server/src/main/java/org/apache/iotdb/db/mpp/aggregation/LastValueDescAccumulator.java:
##########
@@ -39,87 +39,207 @@ public void reset() {
super.reset();
}
- protected int addIntInput(Column[] column, TimeRange timeRange) {
- for (int i = 0; i < column[0].getPositionCount(); i++) {
- long curTime = column[0].getLong(i);
- if (curTime > timeRange.getMax() || curTime < timeRange.getMin()) {
- return i;
+ protected int addIntInput(Column[] column, IWindow curWindow) {
+ int windowControlColumnIndex = curWindow.getControlColumnIndex();
+ int curPositionCount = column[windowControlColumnIndex].getPositionCount();
+
+ if (curWindow.isTimeWindow()) {
Review Comment:
If branch and else branch contain too much duplicate code, please optimize
it ~
##########
server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java:
##########
@@ -247,21 +264,38 @@ protected void updateIntFirstValue(int value, long
curTime) {
}
}
- protected int addLongInput(Column[] column, TimeRange timeRange) {
- int curPositionCount = column[0].getPositionCount();
- long curMinTime = timeRange.getMin();
- long curMaxTime = timeRange.getMax();
- for (int i = 0; i < curPositionCount; i++) {
- long curTime = column[0].getLong(i);
- if (curTime > curMaxTime || curTime < curMinTime) {
- return i;
+ protected int addLongInput(Column[] column, IWindow curWindow) {
+ int windowControlColumnIndex = curWindow.getControlColumnIndex();
+ int curPositionCount = column[windowControlColumnIndex].getPositionCount();
+
+ if (curWindow.isTimeWindow()) {
+ for (int i = 0; i < curPositionCount; i++) {
+ if (!curWindow.satisfy(column[windowControlColumnIndex], i)) {
+ return i;
+ }
+ curWindow.mergeOnePoint();
+ if (!column[1].isNull(i)) {
+ updateLongFirstValue(column[1].getLong(i), column[0].getLong(i));
Review Comment:
```suggestion
updateLongFirstValue(column[1].getLong(i), column[0].getLong(i));
hasCandidateResult = true;
```
##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindow.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+public interface IWindow {
+
+ int getControlColumnIndex();
+
+ boolean satisfy(Column column, int index);
+
+ boolean isTimeWindow();
+
+ void mergeOnePoint();
+
+ void update(TimeRange curTimeRange);
Review Comment:
This method seems to be used only for time windows and should not be present
in the interface.
##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindowManager.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+public interface IWindowManager {
+
+ boolean isCurWindowInit();
+
+ void initCurWindow(TsBlock tsBlock);
+
+ boolean hasNext();
+
+ void genNextWindow();
+
+ long currentOutputTime();
+
+ IWindow getCurWindow();
+
+ TsBlock skipPointsOutOfTimeRange(TsBlock inputTsBlock);
Review Comment:
```suggestion
TsBlock skipPointsOutOfCurWindow(TsBlock inputTsBlock);
```
##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java:
##########
@@ -50,6 +52,10 @@ public abstract class SingleInputAggregationOperator
implements ProcessOperator
// current interval of aggregation window [curStartTime, curEndTime)
protected TimeRange curTimeRange;
+ protected IWindowManager windowManager;
Review Comment:
only used in `RawDataAggregationOperator`, move it into
`RawDataAggregationOperator`
##########
server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java:
##########
@@ -247,21 +264,38 @@ protected void updateIntFirstValue(int value, long
curTime) {
}
}
- protected int addLongInput(Column[] column, TimeRange timeRange) {
- int curPositionCount = column[0].getPositionCount();
- long curMinTime = timeRange.getMin();
- long curMaxTime = timeRange.getMax();
- for (int i = 0; i < curPositionCount; i++) {
- long curTime = column[0].getLong(i);
- if (curTime > curMaxTime || curTime < curMinTime) {
- return i;
+ protected int addLongInput(Column[] column, IWindow curWindow) {
+ int windowControlColumnIndex = curWindow.getControlColumnIndex();
+ int curPositionCount = column[windowControlColumnIndex].getPositionCount();
+
+ if (curWindow.isTimeWindow()) {
+ for (int i = 0; i < curPositionCount; i++) {
+ if (!curWindow.satisfy(column[windowControlColumnIndex], i)) {
+ return i;
+ }
+ curWindow.mergeOnePoint();
+ if (!column[1].isNull(i)) {
+ updateLongFirstValue(column[1].getLong(i), column[0].getLong(i));
+ return i;
+ }
}
- if (!column[1].isNull(i)) {
- updateLongFirstValue(column[1].getLong(i), curTime);
- return i;
+ } else {
+ for (int i = 0; i < curPositionCount; i++) {
+ if (!curWindow.satisfy(column[windowControlColumnIndex], i)) {
+ return i;
+ }
+ curWindow.mergeOnePoint();
+ if (!column[1].isNull(i)) {
+ // do not assign true to hasCandidateResult
+ if (column[0].getLong(i) < minTime) {
+ minTime = column[0].getLong(i);
+ firstValue.setLong(column[1].getLong(i));
+ }
Review Comment:
```suggestion
if (!column[1].isNull(i)) {
// do not assign true to hasCandidateResult
updateLongFirstValue(column[1].getLong(i), column[0].getLong(i));
```
##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java:
##########
@@ -50,6 +52,10 @@ public abstract class SingleInputAggregationOperator
implements ProcessOperator
// current interval of aggregation window [curStartTime, curEndTime)
protected TimeRange curTimeRange;
Review Comment:
`timeRangeIterator` and `curTimeRange` only used in
`SlidingWindowAggregationOperator`, move them into
`SlidingWindowAggregationOperator`
##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindowManager.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+public interface IWindowManager {
Review Comment:
You should add a bit of comments of this interface and its methods.
##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleInputAggregationOperator.java:
##########
@@ -50,6 +52,10 @@ public abstract class SingleInputAggregationOperator
implements ProcessOperator
// current interval of aggregation window [curStartTime, curEndTime)
protected TimeRange curTimeRange;
+ protected IWindowManager windowManager;
+
+ protected boolean finish;
Review Comment:
seem as above
##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindow.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+public class TimeWindow implements IWindow {
+
+ private TimeRange curTimeRange;
+
+ private long curMinTime;
+ private long curMaxTime;
+
+ public TimeWindow() {}
+
+ public TimeWindow(TimeRange curTimeRange) {
+ this.curTimeRange = curTimeRange;
+ this.curMinTime = curTimeRange.getMin();
+ this.curMaxTime = curTimeRange.getMax();
+ }
+
+ public TimeRange getCurTimeRange() {
+ return curTimeRange;
+ }
+
+ public long getCurMinTime() {
+ return curMinTime;
+ }
+
+ public long getCurMaxTime() {
+ return curMaxTime;
+ }
+
+ @Override
+ public int getControlColumnIndex() {
+ return 0;
+ }
Review Comment:
We should reduce the use of magic numbers.
```suggestion
private static int TIME_COLUMN_INDEX = 0;
@Override
public int getControlColumnIndex() {
return TIME_COLUMN_INDEX ;
}
```
##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindow.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+public interface IWindow {
Review Comment:
You should add a bit of comments of this interface and its methods.
##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/RawDataAggregationOperator.java:
##########
@@ -65,20 +73,80 @@ protected boolean calculateNextAggregationResult() {
// if child still has next but can't be invoked now
return false;
} else {
+ if (!windowManager.isCurWindowInit()) {
+ initWindowManagerAndAggregators();
+ }
+ updateResultTsBlock();
break;
}
}
- // update result using aggregators
- updateResultTsBlock();
-
return true;
}
- private boolean calcFromRawData() {
- Pair<Boolean, TsBlock> calcResult =
- calculateAggregationFromRawData(inputTsBlock, aggregators,
curTimeRange, ascending);
- inputTsBlock = calcResult.getRight();
- return calcResult.getLeft();
+ private boolean calculateAndUpdateFromRawData() {
+ // 待使用的inputTsBlock为空就直接return false
Review Comment:
remove redundant comments ~
##########
server/src/main/java/org/apache/iotdb/db/mpp/aggregation/MaxTimeDescAccumulator.java:
##########
@@ -19,29 +19,42 @@
package org.apache.iotdb.db.mpp.aggregation;
-import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.db.mpp.execution.operator.window.IWindow;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
public class MaxTimeDescAccumulator extends MaxTimeAccumulator {
// Column should be like: | Time | Value |
// Value is used to judge isNull()
@Override
- public int addInput(Column[] column, TimeRange timeRange) {
- int curPositionCount = column[0].getPositionCount();
- long curMinTime = timeRange.getMin();
- long curMaxTime = timeRange.getMax();
- for (int i = 0; i < curPositionCount; i++) {
- long curTime = column[0].getLong(i);
- if (curTime > curMaxTime || curTime < curMinTime) {
- return i;
+ public int addInput(Column[] column, IWindow curWindow) {
+ int windowControlColumnIndex = curWindow.getControlColumnIndex();
+ int curPositionCount = column[windowControlColumnIndex].getPositionCount();
+
+ if (curWindow.isTimeWindow()) {
Review Comment:
seem as `LastValueDescAccumulator`
##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/IWindowManager.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+public interface IWindowManager {
+
+ boolean isCurWindowInit();
+
+ void initCurWindow(TsBlock tsBlock);
+
+ boolean hasNext();
+
+ void genNextWindow();
+
+ long currentOutputTime();
+
+ IWindow getCurWindow();
+
+ TsBlock skipPointsOutOfTimeRange(TsBlock inputTsBlock);
+
+ boolean satisfiedTimeRange(TsBlock inputTsBlock);
Review Comment:
```suggestion
boolean satisfiedCurWindow(TsBlock inputTsBlock);
```
##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/window/TimeWindow.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.window;
+
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+public class TimeWindow implements IWindow {
+
+ private TimeRange curTimeRange;
+
+ private long curMinTime;
+ private long curMaxTime;
+
+ public TimeWindow() {}
+
+ public TimeWindow(TimeRange curTimeRange) {
+ this.curTimeRange = curTimeRange;
+ this.curMinTime = curTimeRange.getMin();
+ this.curMaxTime = curTimeRange.getMax();
+ }
+
+ public TimeRange getCurTimeRange() {
+ return curTimeRange;
+ }
+
+ public long getCurMinTime() {
+ return curMinTime;
+ }
+
+ public long getCurMaxTime() {
+ return curMaxTime;
+ }
+
+ @Override
+ public int getControlColumnIndex() {
+ return 0;
+ }
+
+ @Override
+ public boolean satisfy(Column column, int index) {
+ long curTime = column.getLong(index);
+ return curTime <= this.curMaxTime && curTime >= this.curMinTime;
+ }
+
+ @Override
+ public boolean isTimeWindow() {
+ return true;
+ }
+
+ @Override
+ public void mergeOnePoint() {}
Review Comment:
```suggestion
public void mergeOnePoint() {
// do nothing
}
```
##########
server/src/main/java/org/apache/iotdb/db/mpp/aggregation/FirstValueAccumulator.java:
##########
@@ -247,21 +264,38 @@ protected void updateIntFirstValue(int value, long
curTime) {
}
}
- protected int addLongInput(Column[] column, TimeRange timeRange) {
- int curPositionCount = column[0].getPositionCount();
- long curMinTime = timeRange.getMin();
- long curMaxTime = timeRange.getMax();
- for (int i = 0; i < curPositionCount; i++) {
- long curTime = column[0].getLong(i);
- if (curTime > curMaxTime || curTime < curMinTime) {
- return i;
+ protected int addLongInput(Column[] column, IWindow curWindow) {
+ int windowControlColumnIndex = curWindow.getControlColumnIndex();
+ int curPositionCount = column[windowControlColumnIndex].getPositionCount();
+
+ if (curWindow.isTimeWindow()) {
+ for (int i = 0; i < curPositionCount; i++) {
+ if (!curWindow.satisfy(column[windowControlColumnIndex], i)) {
+ return i;
+ }
+ curWindow.mergeOnePoint();
+ if (!column[1].isNull(i)) {
+ updateLongFirstValue(column[1].getLong(i), column[0].getLong(i));
+ return i;
+ }
}
- if (!column[1].isNull(i)) {
- updateLongFirstValue(column[1].getLong(i), curTime);
- return i;
+ } else {
+ for (int i = 0; i < curPositionCount; i++) {
+ if (!curWindow.satisfy(column[windowControlColumnIndex], i)) {
+ return i;
+ }
+ curWindow.mergeOnePoint();
+ if (!column[1].isNull(i)) {
+ // do not assign true to hasCandidateResult
+ if (column[0].getLong(i) < minTime) {
+ minTime = column[0].getLong(i);
+ firstValue.setLong(column[1].getLong(i));
+ }
Review Comment:
I suggest remove `hasCandidateResult = true;` in `updateXXXFirstValue`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]