JackieTien97 commented on code in PR #14148: URL: https://github.com/apache/iotdb/pull/14148#discussion_r1866939696
########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/JoinKeyComparator.java: ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.comparator; + +import org.apache.tsfile.read.common.block.TsBlock; + +public interface JoinKeyComparator { + + boolean lessThan( + TsBlock left, + int leftColumnIndex, + int leftRowIndex, + TsBlock right, + int rightColumnIndex, + int rightRowIndex); + + boolean equalsTo( + TsBlock left, + int leftColumnIndex, + int leftRowIndex, + TsBlock right, + int rightColumnIndex, + int rightRowIndex); + + boolean lessThanOrEqual( Review Comment: add some java doc for these methods, for example, is lessThan `left < right`or `right < left`? That's very important, intuitively we will take the `left < right` one, bu your implementation is on the contrary. ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/SimpleNestedLoopCrossJoinOperator.java: ########## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.join; + +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; +import org.apache.iotdb.db.queryengine.execution.operator.AbstractOperator; +import org.apache.iotdb.db.queryengine.execution.operator.Operator; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableInnerJoinOperator.buildResultTsBlock; + +/** + * This Operator is used to implement the simple nested loop join algorithm for Cartesian product. + * It is used to join two tables, one is the probe table and the other is the build table. For now, + * the build table is assumed to be small enough to be cached in memory.(Produced by a scalar + * subquery.) Scalar subquery is always the right child of PlanNode, so we can use right child of + * JoinNode as the build table. + */ +public class SimpleNestedLoopCrossJoinOperator extends AbstractOperator { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(SimpleNestedLoopCrossJoinOperator.class); + + private final Operator probeSource; + + private final Operator buildSource; + + // cache the result of buildSource, for now, we assume that the buildChild produces a small number + // of TsBlocks + private final List<TsBlock> buildBlocks; + + private final TsBlockBuilder resultBuilder; + + private final MemoryReservationManager memoryReservationManager; + + private final int[] leftOutputSymbolIdx; + + private final int[] rightOutputSymbolIdx; + + private TsBlock cachedProbeBlock; + + private int probeIndex; + + private boolean buildFinished = false; + + public SimpleNestedLoopCrossJoinOperator( + OperatorContext operatorContext, + Operator probeSource, + Operator buildSource, + int[] leftOutputSymbolIdx, + int[] rightOutputSymbolIdx, + List<TSDataType> dataTypes) { + this.operatorContext = operatorContext; + this.probeSource = probeSource; + this.buildSource = buildSource; + this.leftOutputSymbolIdx = leftOutputSymbolIdx; + this.rightOutputSymbolIdx = rightOutputSymbolIdx; + this.buildBlocks = new ArrayList<>(); + this.resultBuilder = new TsBlockBuilder(dataTypes); + this.memoryReservationManager = + operatorContext + .getDriverContext() + .getFragmentInstanceContext() + .getMemoryReservationContext(); + } + + @Override + public TsBlock next() throws Exception { + if (retainedTsBlock != null) { + getResultFromRetainedTsBlock(); + } + resultBuilder.reset(); + // start stopwatch + long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); + long start = System.nanoTime(); + if (!buildFinished) { + TsBlock block = buildSource.next(); + if (block != null && !block.isEmpty()) { + buildBlocks.add(block); + memoryReservationManager.reserveMemoryCumulatively(block.getRetainedSizeInBytes()); + } + if (!buildSource.hasNext()) { + buildFinished = true; + } + // probeSource could still be blocked by now, so we need to check it again + return null; + } + cachedProbeBlock = cachedProbeBlock == null ? probeSource.next() : cachedProbeBlock; + if (cachedProbeBlock == null || cachedProbeBlock.isEmpty()) { + // TsBlock returned by probeSource is null or empty, we need to wait for another round + cachedProbeBlock = null; + return null; + } + while (probeIndex < cachedProbeBlock.getPositionCount() + && System.nanoTime() - start < maxRuntime) { + for (TsBlock buildBlock : buildBlocks) { + appendValueToResult(probeIndex, buildBlock); + } + probeIndex++; + } + if (probeIndex == cachedProbeBlock.getPositionCount()) { + probeIndex = 0; + cachedProbeBlock = null; + } + if (resultBuilder.isEmpty()) { + return null; + } + + resultTsBlock = buildResultTsBlock(resultBuilder); + return checkTsBlockSizeAndGetResult(); + } + + private void appendValueToResult(int probeIndex, TsBlock buildBlock) { + for (int i = 0; i < leftOutputSymbolIdx.length; i++) { + ColumnBuilder columnBuilder = resultBuilder.getColumnBuilder(i); + for (int j = 0; j < buildBlock.getPositionCount(); j++) { + if (cachedProbeBlock.getColumn(leftOutputSymbolIdx[i]).isNull(probeIndex)) { + columnBuilder.appendNull(); + } else { + columnBuilder.write(cachedProbeBlock.getColumn(leftOutputSymbolIdx[i]), probeIndex); + } + } + } + for (int i = 0; i < rightOutputSymbolIdx.length; i++) { + ColumnBuilder columnBuilder = resultBuilder.getColumnBuilder(i + leftOutputSymbolIdx.length); + for (int j = 0; j < buildBlock.getPositionCount(); j++) { + if (buildBlock.getColumn(rightOutputSymbolIdx[i]).isNull(j)) { + columnBuilder.appendNull(); + } else { + columnBuilder.write(buildBlock.getColumn(rightOutputSymbolIdx[i]), j); + } + } + } + resultBuilder.declarePositions(buildBlock.getPositionCount()); + } + + @Override + public boolean hasNext() throws Exception { + if (retainedTsBlock != null) { + return true; + } + if (!buildFinished) { + return buildSource.hasNext(); Review Comment: ```suggestion return true; ``` ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/AscBinaryTypeJoinKeyComparator.java: ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.comparator; + +import org.apache.tsfile.read.common.block.TsBlock; + +public class AscBinaryTypeJoinKeyComparator implements JoinKeyComparator { + + private static final AscBinaryTypeJoinKeyComparator INSTANCE = + new AscBinaryTypeJoinKeyComparator(); + + private AscBinaryTypeJoinKeyComparator() { + // hide constructor + } + + public static AscBinaryTypeJoinKeyComparator getInstance() { + return INSTANCE; + } + + @Override + public boolean lessThan( + TsBlock left, + int leftColumnIndex, + int leftRowIndex, + TsBlock right, + int rightColumnIndex, + int rightRowIndex) { + return left.getColumn(leftColumnIndex) + .getBinary(leftRowIndex) + .compareTo(right.getColumn(rightColumnIndex).getBinary(rightRowIndex)) + < 0; + } + + @Override + public boolean equalsTo( + TsBlock left, + int leftColumnIndex, + int leftRowIndex, + TsBlock right, + int rightColumnIndex, + int rightRowIndex) { + return left.getColumn(leftColumnIndex) + .getBinary(leftRowIndex) + .compareTo(right.getColumn(rightColumnIndex).getBinary(rightRowIndex)) + == 0; Review Comment: same as below, using Binary.equals instead ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/SimpleNestedLoopCrossJoinOperator.java: ########## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.join; + +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; +import org.apache.iotdb.db.queryengine.execution.operator.AbstractOperator; +import org.apache.iotdb.db.queryengine.execution.operator.Operator; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableInnerJoinOperator.buildResultTsBlock; + +/** + * This Operator is used to implement the simple nested loop join algorithm for Cartesian product. + * It is used to join two tables, one is the probe table and the other is the build table. For now, + * the build table is assumed to be small enough to be cached in memory.(Produced by a scalar + * subquery.) Scalar subquery is always the right child of PlanNode, so we can use right child of + * JoinNode as the build table. + */ +public class SimpleNestedLoopCrossJoinOperator extends AbstractOperator { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(SimpleNestedLoopCrossJoinOperator.class); + + private final Operator probeSource; + + private final Operator buildSource; + + // cache the result of buildSource, for now, we assume that the buildChild produces a small number + // of TsBlocks + private final List<TsBlock> buildBlocks; + + private final TsBlockBuilder resultBuilder; + + private final MemoryReservationManager memoryReservationManager; + + private final int[] leftOutputSymbolIdx; + + private final int[] rightOutputSymbolIdx; + + private TsBlock cachedProbeBlock; + + private int probeIndex; + + private boolean buildFinished = false; + + public SimpleNestedLoopCrossJoinOperator( + OperatorContext operatorContext, + Operator probeSource, + Operator buildSource, + int[] leftOutputSymbolIdx, + int[] rightOutputSymbolIdx, + List<TSDataType> dataTypes) { + this.operatorContext = operatorContext; + this.probeSource = probeSource; + this.buildSource = buildSource; + this.leftOutputSymbolIdx = leftOutputSymbolIdx; + this.rightOutputSymbolIdx = rightOutputSymbolIdx; + this.buildBlocks = new ArrayList<>(); + this.resultBuilder = new TsBlockBuilder(dataTypes); + this.memoryReservationManager = + operatorContext + .getDriverContext() + .getFragmentInstanceContext() + .getMemoryReservationContext(); + } + + @Override + public TsBlock next() throws Exception { + if (retainedTsBlock != null) { + getResultFromRetainedTsBlock(); + } + resultBuilder.reset(); + // start stopwatch + long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); + long start = System.nanoTime(); + if (!buildFinished) { + TsBlock block = buildSource.next(); + if (block != null && !block.isEmpty()) { + buildBlocks.add(block); + memoryReservationManager.reserveMemoryCumulatively(block.getRetainedSizeInBytes()); + } + if (!buildSource.hasNext()) { + buildFinished = true; + } + // probeSource could still be blocked by now, so we need to check it again + return null; + } + cachedProbeBlock = cachedProbeBlock == null ? probeSource.next() : cachedProbeBlock; + if (cachedProbeBlock == null || cachedProbeBlock.isEmpty()) { + // TsBlock returned by probeSource is null or empty, we need to wait for another round + cachedProbeBlock = null; + return null; + } + while (probeIndex < cachedProbeBlock.getPositionCount() + && System.nanoTime() - start < maxRuntime) { + for (TsBlock buildBlock : buildBlocks) { + appendValueToResult(probeIndex, buildBlock); + } + probeIndex++; + } + if (probeIndex == cachedProbeBlock.getPositionCount()) { + probeIndex = 0; + cachedProbeBlock = null; + } + if (resultBuilder.isEmpty()) { + return null; + } + + resultTsBlock = buildResultTsBlock(resultBuilder); + return checkTsBlockSizeAndGetResult(); + } + + private void appendValueToResult(int probeIndex, TsBlock buildBlock) { + for (int i = 0; i < leftOutputSymbolIdx.length; i++) { + ColumnBuilder columnBuilder = resultBuilder.getColumnBuilder(i); + for (int j = 0; j < buildBlock.getPositionCount(); j++) { + if (cachedProbeBlock.getColumn(leftOutputSymbolIdx[i]).isNull(probeIndex)) { + columnBuilder.appendNull(); + } else { + columnBuilder.write(cachedProbeBlock.getColumn(leftOutputSymbolIdx[i]), probeIndex); + } + } + } + for (int i = 0; i < rightOutputSymbolIdx.length; i++) { + ColumnBuilder columnBuilder = resultBuilder.getColumnBuilder(i + leftOutputSymbolIdx.length); + for (int j = 0; j < buildBlock.getPositionCount(); j++) { + if (buildBlock.getColumn(rightOutputSymbolIdx[i]).isNull(j)) { + columnBuilder.appendNull(); + } else { + columnBuilder.write(buildBlock.getColumn(rightOutputSymbolIdx[i]), j); + } + } + } + resultBuilder.declarePositions(buildBlock.getPositionCount()); + } + + @Override + public boolean hasNext() throws Exception { + if (retainedTsBlock != null) { + return true; + } + if (!buildFinished) { + return buildSource.hasNext(); + } + return probeSource.hasNext(); Review Comment: ```suggestion return !buildBlocks.isEmpty() && probeSource.hasNextWithTimer(); ``` ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableInnerJoinOperator.java: ########## @@ -148,38 +153,40 @@ public TsBlock next() throws Exception { } // all the rightTsBlock is less than leftTsBlock, just skip right - if (comparator.lessThan(getRightEndTime(), getCurrentLeftTime())) { - // releaseMemory(); + if (allRightLessThanLeft()) { + // release memory; resetRightBlockList(); return null; } // all the leftTsBlock is less than rightTsBlock, just skip left - else if (comparator.lessThan(getLeftEndTime(), getRightTime(rightBlockListIdx, rightIndex))) { + else if (allLeftLessThanRight()) { leftBlock = null; leftIndex = 0; return null; } - long leftProbeTime = getCurrentLeftTime(); while (!resultBuilder.isFull()) { - - // all right block time is not matched - if (!comparator.canContinueInclusive(leftProbeTime, getRightEndTime())) { - // releaseMemory(); + TsBlock lastRightTsBlock = rightBlockList.get(rightBlockList.size() - 1); + // all right block value is not matched + if (!comparator.lessThanOrEqual( + leftBlock, + leftJoinKeyPosition, + leftIndex, + lastRightTsBlock, + rightJoinKeyPosition, + lastRightTsBlock.getPositionCount() - 1)) { Review Comment: allRightLessThanLeft? ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableInnerJoinOperator.java: ########## @@ -73,22 +76,24 @@ public class TableInnerJoinOperator extends AbstractOperator { public TableInnerJoinOperator( Review Comment: why `resetRightBlockList` just release from `1`, close release right from `0` @Beyyes ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableInnerJoinOperator.java: ########## @@ -226,7 +293,13 @@ protected void tryCachedNextRightTsBlock() throws Exception { if (rightChild.hasNextWithTimer()) { TsBlock block = rightChild.nextWithTimer(); if (block != null) { Review Comment: what if block is empty, check all the places in this class. ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/comparator/DescBinaryTypeJoinKeyComparator.java: ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.comparator; + +import org.apache.tsfile.read.common.block.TsBlock; + +public class DescBinaryTypeJoinKeyComparator implements JoinKeyComparator { + + private static final DescBinaryTypeJoinKeyComparator INSTANCE = + new DescBinaryTypeJoinKeyComparator(); + + private DescBinaryTypeJoinKeyComparator() { + // hide constructor + } + + public static DescBinaryTypeJoinKeyComparator getInstance() { + return INSTANCE; + } + + @Override + public boolean lessThan( + TsBlock left, + int leftColumnIndex, + int leftRowIndex, + TsBlock right, + int rightColumnIndex, + int rightRowIndex) { + return left.getColumn(leftColumnIndex) + .getBinary(leftRowIndex) + .compareTo(right.getColumn(rightColumnIndex).getBinary(rightRowIndex)) + > 0; + } + + @Override + public boolean equalsTo( + TsBlock left, + int leftColumnIndex, + int leftRowIndex, + TsBlock right, + int rightColumnIndex, + int rightRowIndex) { + return left.getColumn(leftColumnIndex) + .getBinary(leftRowIndex) + .compareTo(right.getColumn(rightColumnIndex).getBinary(rightRowIndex)) + == 0; Review Comment: directly using Binary.equals may be more efficient ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/SimpleNestedLoopCrossJoinOperator.java: ########## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.join; + +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; +import org.apache.iotdb.db.queryengine.execution.operator.AbstractOperator; +import org.apache.iotdb.db.queryengine.execution.operator.Operator; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableInnerJoinOperator.buildResultTsBlock; + +/** + * This Operator is used to implement the simple nested loop join algorithm for Cartesian product. + * It is used to join two tables, one is the probe table and the other is the build table. For now, + * the build table is assumed to be small enough to be cached in memory.(Produced by a scalar + * subquery.) Scalar subquery is always the right child of PlanNode, so we can use right child of + * JoinNode as the build table. + */ +public class SimpleNestedLoopCrossJoinOperator extends AbstractOperator { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(SimpleNestedLoopCrossJoinOperator.class); + + private final Operator probeSource; + + private final Operator buildSource; + + // cache the result of buildSource, for now, we assume that the buildChild produces a small number + // of TsBlocks + private final List<TsBlock> buildBlocks; + + private final TsBlockBuilder resultBuilder; + + private final MemoryReservationManager memoryReservationManager; + + private final int[] leftOutputSymbolIdx; + + private final int[] rightOutputSymbolIdx; + + private TsBlock cachedProbeBlock; + + private int probeIndex; + + private boolean buildFinished = false; + + public SimpleNestedLoopCrossJoinOperator( + OperatorContext operatorContext, + Operator probeSource, + Operator buildSource, + int[] leftOutputSymbolIdx, + int[] rightOutputSymbolIdx, + List<TSDataType> dataTypes) { + this.operatorContext = operatorContext; + this.probeSource = probeSource; + this.buildSource = buildSource; + this.leftOutputSymbolIdx = leftOutputSymbolIdx; + this.rightOutputSymbolIdx = rightOutputSymbolIdx; + this.buildBlocks = new ArrayList<>(); + this.resultBuilder = new TsBlockBuilder(dataTypes); + this.memoryReservationManager = + operatorContext + .getDriverContext() + .getFragmentInstanceContext() + .getMemoryReservationContext(); + } + + @Override + public TsBlock next() throws Exception { + if (retainedTsBlock != null) { + getResultFromRetainedTsBlock(); + } + resultBuilder.reset(); + // start stopwatch + long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); + long start = System.nanoTime(); + if (!buildFinished) { + TsBlock block = buildSource.next(); + if (block != null && !block.isEmpty()) { + buildBlocks.add(block); + memoryReservationManager.reserveMemoryCumulatively(block.getRetainedSizeInBytes()); + } + if (!buildSource.hasNext()) { + buildFinished = true; + } + // probeSource could still be blocked by now, so we need to check it again + return null; + } + cachedProbeBlock = cachedProbeBlock == null ? probeSource.next() : cachedProbeBlock; + if (cachedProbeBlock == null || cachedProbeBlock.isEmpty()) { + // TsBlock returned by probeSource is null or empty, we need to wait for another round + cachedProbeBlock = null; + return null; + } + while (probeIndex < cachedProbeBlock.getPositionCount() + && System.nanoTime() - start < maxRuntime) { + for (TsBlock buildBlock : buildBlocks) { + appendValueToResult(probeIndex, buildBlock); + } + probeIndex++; + } + if (probeIndex == cachedProbeBlock.getPositionCount()) { + probeIndex = 0; + cachedProbeBlock = null; + } + if (resultBuilder.isEmpty()) { + return null; + } + + resultTsBlock = buildResultTsBlock(resultBuilder); Review Comment: resultBuilder.build(new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE,resultBuilder.getPositionCount())) ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableInnerJoinOperator.java: ########## @@ -190,6 +197,66 @@ else if (comparator.lessThan(getLeftEndTime(), getRightTime(rightBlockListIdx, r return checkTsBlockSizeAndGetResult(); } + protected boolean allRightLessThanLeft() { + // check if the last value of the right is less than left + return comparator.lessThan( + rightBlockList.get(rightBlockList.size() - 1), + rightJoinKeyPosition, + rightBlockList.get(rightBlockList.size() - 1).getPositionCount() - 1, + leftBlock, + leftJoinKeyPosition, + leftIndex); + } + + protected boolean allLeftLessThanRight() { + return comparator.lessThan( + leftBlock, + leftJoinKeyPosition, + leftBlock.getPositionCount() - 1, + rightBlockList.get(rightBlockListIdx), + rightJoinKeyPosition, + rightIndex); + } + + private void appendResult() { + while (comparator.lessThan( + rightBlockList.get(rightBlockListIdx), + rightJoinKeyPosition, + rightIndex, + leftBlock, + leftJoinKeyPosition, + leftIndex)) { + if (rightBlockFinish()) { + return; + } + } + + int tmpBlockIdx = rightBlockListIdx, tmpIdx = rightIndex; + while (comparator.equalsTo( Review Comment: should think about time slice ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableFullOuterJoinOperator.java: ########## @@ -22,44 +22,60 @@ import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.execution.operator.Operator; import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; -import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.TimeComparator; +import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.comparator.JoinKeyComparator; import org.apache.tsfile.block.column.ColumnBuilder; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.column.BinaryColumn; +import org.apache.tsfile.read.common.block.column.BooleanColumn; +import org.apache.tsfile.read.common.block.column.DoubleColumn; +import org.apache.tsfile.read.common.block.column.FloatColumn; +import org.apache.tsfile.read.common.block.column.IntColumn; +import org.apache.tsfile.read.common.block.column.LongColumn; +import org.apache.tsfile.read.common.type.Type; +import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.RamUsageEstimator; import java.util.List; +import java.util.Optional; import java.util.concurrent.TimeUnit; +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE; + public class TableFullOuterJoinOperator extends TableInnerJoinOperator { private static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(TableFullOuterJoinOperator.class); private boolean leftFinished; private boolean rightFinished; - private long lastMatchedRightTime = Long.MIN_VALUE; + + private TsBlock lastMatchedRightBlock; + + private boolean lastMatchedRightBlockIsNull = true; public TableFullOuterJoinOperator( Review Comment: change it name to MergeSortFullOuterJoinOperator ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/SimpleNestedLoopCrossJoinOperator.java: ########## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.join; + +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; +import org.apache.iotdb.db.queryengine.execution.operator.AbstractOperator; +import org.apache.iotdb.db.queryengine.execution.operator.Operator; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableInnerJoinOperator.buildResultTsBlock; + +/** + * This Operator is used to implement the simple nested loop join algorithm for Cartesian product. + * It is used to join two tables, one is the probe table and the other is the build table. For now, + * the build table is assumed to be small enough to be cached in memory.(Produced by a scalar + * subquery.) Scalar subquery is always the right child of PlanNode, so we can use right child of + * JoinNode as the build table. + */ +public class SimpleNestedLoopCrossJoinOperator extends AbstractOperator { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(SimpleNestedLoopCrossJoinOperator.class); + + private final Operator probeSource; + + private final Operator buildSource; + + // cache the result of buildSource, for now, we assume that the buildChild produces a small number + // of TsBlocks + private final List<TsBlock> buildBlocks; + + private final TsBlockBuilder resultBuilder; + + private final MemoryReservationManager memoryReservationManager; + + private final int[] leftOutputSymbolIdx; + + private final int[] rightOutputSymbolIdx; + + private TsBlock cachedProbeBlock; + + private int probeIndex; + + private boolean buildFinished = false; + + public SimpleNestedLoopCrossJoinOperator( + OperatorContext operatorContext, + Operator probeSource, + Operator buildSource, + int[] leftOutputSymbolIdx, + int[] rightOutputSymbolIdx, + List<TSDataType> dataTypes) { + this.operatorContext = operatorContext; + this.probeSource = probeSource; + this.buildSource = buildSource; + this.leftOutputSymbolIdx = leftOutputSymbolIdx; + this.rightOutputSymbolIdx = rightOutputSymbolIdx; + this.buildBlocks = new ArrayList<>(); + this.resultBuilder = new TsBlockBuilder(dataTypes); + this.memoryReservationManager = + operatorContext + .getDriverContext() + .getFragmentInstanceContext() + .getMemoryReservationContext(); + } + + @Override + public TsBlock next() throws Exception { + if (retainedTsBlock != null) { + getResultFromRetainedTsBlock(); + } + resultBuilder.reset(); + // start stopwatch + long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); + long start = System.nanoTime(); + if (!buildFinished) { + TsBlock block = buildSource.next(); + if (block != null && !block.isEmpty()) { + buildBlocks.add(block); + memoryReservationManager.reserveMemoryCumulatively(block.getRetainedSizeInBytes()); + } + if (!buildSource.hasNext()) { + buildFinished = true; + } + // probeSource could still be blocked by now, so we need to check it again + return null; + } + cachedProbeBlock = cachedProbeBlock == null ? probeSource.next() : cachedProbeBlock; Review Comment: ```suggestion cachedProbeBlock = cachedProbeBlock == null ? probeSource.nextWithTimer() : cachedProbeBlock; ``` ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/SimpleNestedLoopCrossJoinOperator.java: ########## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.join; + +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; +import org.apache.iotdb.db.queryengine.execution.operator.AbstractOperator; +import org.apache.iotdb.db.queryengine.execution.operator.Operator; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableInnerJoinOperator.buildResultTsBlock; + +/** + * This Operator is used to implement the simple nested loop join algorithm for Cartesian product. + * It is used to join two tables, one is the probe table and the other is the build table. For now, + * the build table is assumed to be small enough to be cached in memory.(Produced by a scalar + * subquery.) Scalar subquery is always the right child of PlanNode, so we can use right child of + * JoinNode as the build table. + */ +public class SimpleNestedLoopCrossJoinOperator extends AbstractOperator { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(SimpleNestedLoopCrossJoinOperator.class); + + private final Operator probeSource; + + private final Operator buildSource; + + // cache the result of buildSource, for now, we assume that the buildChild produces a small number + // of TsBlocks + private final List<TsBlock> buildBlocks; + + private final TsBlockBuilder resultBuilder; + + private final MemoryReservationManager memoryReservationManager; + + private final int[] leftOutputSymbolIdx; + + private final int[] rightOutputSymbolIdx; + + private TsBlock cachedProbeBlock; + + private int probeIndex; + + private boolean buildFinished = false; + + public SimpleNestedLoopCrossJoinOperator( + OperatorContext operatorContext, + Operator probeSource, + Operator buildSource, + int[] leftOutputSymbolIdx, + int[] rightOutputSymbolIdx, + List<TSDataType> dataTypes) { + this.operatorContext = operatorContext; + this.probeSource = probeSource; + this.buildSource = buildSource; + this.leftOutputSymbolIdx = leftOutputSymbolIdx; + this.rightOutputSymbolIdx = rightOutputSymbolIdx; + this.buildBlocks = new ArrayList<>(); + this.resultBuilder = new TsBlockBuilder(dataTypes); + this.memoryReservationManager = + operatorContext + .getDriverContext() + .getFragmentInstanceContext() + .getMemoryReservationContext(); + } + + @Override + public TsBlock next() throws Exception { + if (retainedTsBlock != null) { + getResultFromRetainedTsBlock(); + } + resultBuilder.reset(); + // start stopwatch + long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); + long start = System.nanoTime(); + if (!buildFinished) { + TsBlock block = buildSource.next(); + if (block != null && !block.isEmpty()) { + buildBlocks.add(block); + memoryReservationManager.reserveMemoryCumulatively(block.getRetainedSizeInBytes()); + } + if (!buildSource.hasNext()) { + buildFinished = true; + } Review Comment: ```suggestion if (!buildSource.hasNextWithTimer()) { buildFinished = true; } else { TsBlock block = buildSource.nextWithTimer(); if (block != null && !block.isEmpty()) { buildBlocks.add(block); memoryReservationManager.reserveMemoryCumulatively(block.getRetainedSizeInBytes()); } } ``` ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableInnerJoinOperator.java: ########## @@ -73,22 +76,24 @@ public class TableInnerJoinOperator extends AbstractOperator { public TableInnerJoinOperator( Review Comment: Change the class name to MergeSortInnerJoinOperator ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableInnerJoinOperator.java: ########## @@ -73,22 +76,24 @@ public class TableInnerJoinOperator extends AbstractOperator { public TableInnerJoinOperator( Review Comment: don't need to pass time slice related paramater to `prepareInput` ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/SimpleNestedLoopCrossJoinOperator.java: ########## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.join; + +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; +import org.apache.iotdb.db.queryengine.execution.operator.AbstractOperator; +import org.apache.iotdb.db.queryengine.execution.operator.Operator; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager; + +import com.google.common.util.concurrent.ListenableFuture; +import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableInnerJoinOperator.buildResultTsBlock; + +/** + * This Operator is used to implement the simple nested loop join algorithm for Cartesian product. + * It is used to join two tables, one is the probe table and the other is the build table. For now, + * the build table is assumed to be small enough to be cached in memory.(Produced by a scalar + * subquery.) Scalar subquery is always the right child of PlanNode, so we can use right child of + * JoinNode as the build table. + */ +public class SimpleNestedLoopCrossJoinOperator extends AbstractOperator { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(SimpleNestedLoopCrossJoinOperator.class); + + private final Operator probeSource; + + private final Operator buildSource; + + // cache the result of buildSource, for now, we assume that the buildChild produces a small number + // of TsBlocks + private final List<TsBlock> buildBlocks; + + private final TsBlockBuilder resultBuilder; + + private final MemoryReservationManager memoryReservationManager; + + private final int[] leftOutputSymbolIdx; + + private final int[] rightOutputSymbolIdx; + + private TsBlock cachedProbeBlock; + + private int probeIndex; + + private boolean buildFinished = false; + + public SimpleNestedLoopCrossJoinOperator( + OperatorContext operatorContext, + Operator probeSource, + Operator buildSource, + int[] leftOutputSymbolIdx, + int[] rightOutputSymbolIdx, + List<TSDataType> dataTypes) { + this.operatorContext = operatorContext; + this.probeSource = probeSource; + this.buildSource = buildSource; + this.leftOutputSymbolIdx = leftOutputSymbolIdx; + this.rightOutputSymbolIdx = rightOutputSymbolIdx; + this.buildBlocks = new ArrayList<>(); + this.resultBuilder = new TsBlockBuilder(dataTypes); + this.memoryReservationManager = + operatorContext + .getDriverContext() + .getFragmentInstanceContext() + .getMemoryReservationContext(); + } + + @Override + public TsBlock next() throws Exception { + if (retainedTsBlock != null) { + getResultFromRetainedTsBlock(); + } + resultBuilder.reset(); + // start stopwatch + long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS); + long start = System.nanoTime(); + if (!buildFinished) { + TsBlock block = buildSource.next(); + if (block != null && !block.isEmpty()) { + buildBlocks.add(block); + memoryReservationManager.reserveMemoryCumulatively(block.getRetainedSizeInBytes()); + } + if (!buildSource.hasNext()) { + buildFinished = true; + } + // probeSource could still be blocked by now, so we need to check it again + return null; + } + cachedProbeBlock = cachedProbeBlock == null ? probeSource.next() : cachedProbeBlock; + if (cachedProbeBlock == null || cachedProbeBlock.isEmpty()) { + // TsBlock returned by probeSource is null or empty, we need to wait for another round + cachedProbeBlock = null; + return null; + } + while (probeIndex < cachedProbeBlock.getPositionCount() + && System.nanoTime() - start < maxRuntime) { + for (TsBlock buildBlock : buildBlocks) { + appendValueToResult(probeIndex, buildBlock); + } + probeIndex++; + } + if (probeIndex == cachedProbeBlock.getPositionCount()) { + probeIndex = 0; + cachedProbeBlock = null; + } + if (resultBuilder.isEmpty()) { + return null; + } + + resultTsBlock = buildResultTsBlock(resultBuilder); + return checkTsBlockSizeAndGetResult(); + } + + private void appendValueToResult(int probeIndex, TsBlock buildBlock) { + for (int i = 0; i < leftOutputSymbolIdx.length; i++) { + ColumnBuilder columnBuilder = resultBuilder.getColumnBuilder(i); + for (int j = 0; j < buildBlock.getPositionCount(); j++) { + if (cachedProbeBlock.getColumn(leftOutputSymbolIdx[i]).isNull(probeIndex)) { + columnBuilder.appendNull(); + } else { + columnBuilder.write(cachedProbeBlock.getColumn(leftOutputSymbolIdx[i]), probeIndex); + } + } + } + for (int i = 0; i < rightOutputSymbolIdx.length; i++) { + ColumnBuilder columnBuilder = resultBuilder.getColumnBuilder(i + leftOutputSymbolIdx.length); + for (int j = 0; j < buildBlock.getPositionCount(); j++) { + if (buildBlock.getColumn(rightOutputSymbolIdx[i]).isNull(j)) { + columnBuilder.appendNull(); + } else { + columnBuilder.write(buildBlock.getColumn(rightOutputSymbolIdx[i]), j); + } + } + } + resultBuilder.declarePositions(buildBlock.getPositionCount()); + } + + @Override + public boolean hasNext() throws Exception { + if (retainedTsBlock != null) { + return true; + } + if (!buildFinished) { + return buildSource.hasNext(); + } + return probeSource.hasNext(); + } + + @Override + public ListenableFuture<?> isBlocked() { + if (buildFinished) { + return probeSource.isBlocked(); + } + return buildSource.isBlocked(); + } + + @Override + public void close() throws Exception { + if (probeSource != null) { + probeSource.close(); + } + if (buildSource != null) { + buildSource.close(); + } + for (TsBlock block : buildBlocks) { + memoryReservationManager.releaseMemoryCumulatively(block.getRetainedSizeInBytes()); + } + buildBlocks.clear(); + cachedProbeBlock = null; + resultTsBlock = null; + retainedTsBlock = null; + } + + @Override + public boolean isFinished() throws Exception { + if (retainedTsBlock != null) { + return false; + } + + return (cachedProbeBlock == null || cachedProbeBlock.isEmpty()) + && probeSource.isFinished() + && buildFinished; Review Comment: ```suggestion if (buildFinished) { // build side is finished if (buildBlocks.isEmpty()) { // no rows in build side return true; } else if ((cachedProbeBlock == null || cachedProbeBlock.isEmpty()) && probeSource.isFinished()) { // no remaining rows in probe side is finished return true; } else { return false; } } else { // build side is not finished return false; } ``` ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableInnerJoinOperator.java: ########## @@ -73,22 +76,24 @@ public class TableInnerJoinOperator extends AbstractOperator { public TableInnerJoinOperator( Review Comment: `prepareInput` is a little bit confusing. `rightBlockList`, `cachedNextRightBlock` and `hasCachedNextRightBlock` need to be added comments, why we need an extra `cachedNextRightBlock` ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableInnerJoinOperator.java: ########## @@ -73,22 +76,24 @@ public class TableInnerJoinOperator extends AbstractOperator { public TableInnerJoinOperator( Review Comment: if we already cached one leftTsBlock, we won't need to judge left child is blocked. ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableInnerJoinOperator.java: ########## @@ -73,22 +76,24 @@ public class TableInnerJoinOperator extends AbstractOperator { public TableInnerJoinOperator( Review Comment: delete `buildResultTsBlock` method -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
