alex-plekhanov commented on code in PR #11770: URL: https://github.com/apache/ignite/pull/11770#discussion_r2697247966
########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteHashJoin.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.rel; + +import java.util.List; +import java.util.Set; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptCost; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelInput; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.CorrelationId; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.calcite.util.Util; +import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost; +import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory; +import org.apache.ignite.internal.processors.query.calcite.util.Commons; + +/** */ +public class IgniteHashJoin extends AbstractIgniteJoin { + /** */ + 
public IgniteHashJoin( + RelOptCluster cluster, + RelTraitSet traitSet, + RelNode left, + RelNode right, + RexNode condition, + Set<CorrelationId> variablesSet, + JoinRelType joinType + ) { + super(cluster, traitSet, left, right, condition, variablesSet, joinType); + } + + /** Constructor. */ + public IgniteHashJoin(RelInput input) { + this( + input.getCluster(), + input.getTraitSet().replace(IgniteConvention.INSTANCE), + input.getInputs().get(0), + input.getInputs().get(1), + input.getExpression("condition"), + Set.copyOf(Commons.transform(input.getIntegerList("variablesSet"), CorrelationId::new)), + input.getEnum("joinType", JoinRelType.class) + ); + } + + /** {@inheritDoc} */ + @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { + IgniteCostFactory costFactory = (IgniteCostFactory)planner.getCostFactory(); + + double leftRowCnt = mq.getRowCount(getLeft()); + + double rightRowCnt = mq.getRowCount(getRight()); + + if (Double.isInfinite(leftRowCnt) || Double.isInfinite(rightRowCnt)) + return planner.getCostFactory().makeInfiniteCost(); + + double rowCnt = leftRowCnt + rightRowCnt; + + int rightKeysSize = joinInfo.rightKeys.size(); + + double rightSize = rightRowCnt * IgniteCost.AVERAGE_FIELD_SIZE * getRight().getRowType().getFieldCount(); + + double distRightRows = Util.first(mq.getDistinctRowCount(right, ImmutableBitSet.of(joinInfo.rightKeys), null), 0.9 * rightRowCnt); Review Comment: Exceeds line length limit. Let's introduce some constant for 0.9 ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashJoinNode.java: ########## @@ -0,0 +1,724 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exec.rel; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.function.BiPredicate; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; +import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler; +import org.apache.ignite.internal.processors.query.calcite.exec.RuntimeHashIndex; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoinInfo; +import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory; +import org.apache.ignite.internal.util.typedef.F; +import org.jetbrains.annotations.Nullable; + +/** Hash join implementor. */ +public abstract class HashJoinNode<Row> extends AbstractRightMaterializedJoinNode<Row> { + /** */ + private static final int INITIAL_CAPACITY = 128; + + /** */ + private final int[] leftKeys; + + /** */ + private final int[] rightKeys; + + /** Output row handler. */ + protected final RowHandler<Row> outRowHnd; + + /** Right rows storage. */ + protected final RuntimeHashIndex<Row> rightHashStore; + + /** Uses keys of right hand to find matching left rows. 
*/ + protected final RuntimeHashIndex<Row> remappedLeftSearcher; + + /** */ + protected Iterator<Row> rightIt = Collections.emptyIterator(); + + /** */ + @Nullable protected final BiPredicate<Row, Row> nonEqCond; + + /** + * Creates hash join node. + * + * @param ctx Execution context. + * @param rowType Out row type. + * @param info Join info. + * @param outRowHnd Output row handler. + * @param keepRowsWithNull {@code True} if we need to store the row from right shoulder even if it contains NULL in + * any of join key position. This is required for joins which emit unmatched part + * of the right shoulder, such as RIGHT JOIN and FULL OUTER JOIN. + * @param nonEqCond If provided, only rows matching the predicate will be emitted as matched rows. + */ + protected HashJoinNode( + ExecutionContext<Row> ctx, + RelDataType rowType, + IgniteJoinInfo info, + RowHandler<Row> outRowHnd, + boolean keepRowsWithNull, + @Nullable BiPredicate<Row, Row> nonEqCond + ) { + super(ctx, rowType); + + // For IS NOT DISTINCT we have to keep rows with null values. + if (info.hasMatchingNulls()) + keepRowsWithNull = true; + + leftKeys = info.leftKeys.toIntArray(); + rightKeys = info.rightKeys.toIntArray(); + + assert leftKeys.length == rightKeys.length; + + this.outRowHnd = outRowHnd; + + this.nonEqCond = nonEqCond; + + rightHashStore = new RuntimeHashIndex<>(ctx, info.rightKeys, keepRowsWithNull, info.hasMatchingNulls(), + INITIAL_CAPACITY, TouchedArrayList::new); + + remappedLeftSearcher = rightHashStore.remappedSearcher(leftKeys); + } + + /** {@inheritDoc} */ + @Override protected void rewindInternal() { + super.rewindInternal(); + + rightIt = Collections.emptyIterator(); + + rightHashStore.close(); + } + + /** Creates certain join node. 
*/ + public static <RowT> HashJoinNode<RowT> create( + ExecutionContext<RowT> ctx, + RelDataType rowType, + RelDataType leftRowType, + RelDataType rightRowType, + JoinRelType type, + IgniteJoinInfo info, + @Nullable BiPredicate<RowT, RowT> nonEqCond + ) { + assert !info.pairs().isEmpty() && (info.isEqui() || type == JoinRelType.INNER || type == JoinRelType.SEMI); + assert !info.hasMatchingNulls() || info.matchingNullsCnt() == info.leftKeys.size(); + assert nonEqCond == null || type == JoinRelType.INNER || type == JoinRelType.SEMI; + + IgniteTypeFactory typeFactory = ctx.getTypeFactory(); + RowHandler<RowT> rowHnd = ctx.rowHandler(); + + switch (type) { + case INNER: + return new InnerHashJoin<>(ctx, rowType, info, rowHnd, nonEqCond); + + case LEFT: + return new LeftHashJoin<>(ctx, rowType, info, rowHnd, rowHnd.factory(typeFactory, rightRowType)); + + case RIGHT: + return new RightHashJoin<>(ctx, rowType, info, rowHnd, rowHnd.factory(typeFactory, leftRowType)); + + case FULL: { + return new FullOuterHashJoin<>(ctx, rowType, info, rowHnd, rowHnd.factory(typeFactory, leftRowType), + rowHnd.factory(typeFactory, rightRowType), nonEqCond); + } + + case SEMI: + return new SemiHashJoin<>(ctx, rowType, info, rowHnd, nonEqCond); + + case ANTI: + return new AntiHashJoin<>(ctx, rowType, info, rowHnd, nonEqCond); + + default: + throw new IllegalArgumentException("Join of type '" + type + "' isn't supported."); + } + } + + /** */ + protected Collection<Row> lookup(Row row) { + Collection<Row> res = remappedLeftSearcher.scan(() -> row).get(); + + if (res == null) + return Collections.emptyList(); + + assert res instanceof TouchedArrayList; + + ((TouchedArrayList<?>)res).touched = true; + + return res; + } + + /** */ + protected Iterator<Row> untouched() { + return F.flat(F.iterator(rightHashStore.rowSets(), c0 -> c0, true, c1 -> !((TouchedArrayList<Row>)c1).touched)); + } + + /** {@inheritDoc} */ + @Override protected void pushRight(Row row) throws Exception { + assert 
downstream() != null; + assert waitingRight > 0; + + checkState(); + + nodeMemoryTracker.onRowAdded(row); + + waitingRight--; + + rightHashStore.push(row); + + if (waitingRight == 0) + rightSource().request(waitingRight = IN_BUFFER_SIZE); + } + + /** */ + protected void requestMoreOrEnd() throws Exception { Review Comment: Perhaps reuse checkJoinFinished()/tryToRequestInputs() pattern from other join types? ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashJoinNode.java: ########## @@ -0,0 +1,724 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.query.calcite.exec.rel; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.function.BiPredicate; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; +import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler; +import org.apache.ignite.internal.processors.query.calcite.exec.RuntimeHashIndex; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoinInfo; +import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory; +import org.apache.ignite.internal.util.typedef.F; +import org.jetbrains.annotations.Nullable; + +/** Hash join implementor. */ +public abstract class HashJoinNode<Row> extends AbstractRightMaterializedJoinNode<Row> { + /** */ + private static final int INITIAL_CAPACITY = 128; + + /** */ + private final int[] leftKeys; + + /** */ + private final int[] rightKeys; + + /** Output row handler. */ + protected final RowHandler<Row> outRowHnd; + + /** Right rows storage. */ + protected final RuntimeHashIndex<Row> rightHashStore; + + /** Uses keys of right hand to find matching left rows. */ + protected final RuntimeHashIndex<Row> remappedLeftSearcher; + + /** */ + protected Iterator<Row> rightIt = Collections.emptyIterator(); + + /** */ + @Nullable protected final BiPredicate<Row, Row> nonEqCond; + + /** + * Creates hash join node. + * + * @param ctx Execution context. + * @param rowType Out row type. + * @param info Join info. + * @param outRowHnd Output row handler. + * @param keepRowsWithNull {@code True} if we need to store the row from right shoulder even if it contains NULL in + * any of join key position. This is required for joins which emit unmatched part + * of the right shoulder, such as RIGHT JOIN and FULL OUTER JOIN. 
+ * @param nonEqCond If provided, only rows matching the predicate will be emitted as matched rows. + */ + protected HashJoinNode( + ExecutionContext<Row> ctx, + RelDataType rowType, + IgniteJoinInfo info, + RowHandler<Row> outRowHnd, + boolean keepRowsWithNull, + @Nullable BiPredicate<Row, Row> nonEqCond + ) { + super(ctx, rowType); + + // For IS NOT DISTINCT we have to keep rows with null values. + if (info.hasMatchingNulls()) + keepRowsWithNull = true; + + leftKeys = info.leftKeys.toIntArray(); + rightKeys = info.rightKeys.toIntArray(); + + assert leftKeys.length == rightKeys.length; + + this.outRowHnd = outRowHnd; + + this.nonEqCond = nonEqCond; + + rightHashStore = new RuntimeHashIndex<>(ctx, info.rightKeys, keepRowsWithNull, info.hasMatchingNulls(), + INITIAL_CAPACITY, TouchedArrayList::new); + + remappedLeftSearcher = rightHashStore.remappedSearcher(leftKeys); + } + + /** {@inheritDoc} */ + @Override protected void rewindInternal() { + super.rewindInternal(); + + rightIt = Collections.emptyIterator(); + + rightHashStore.close(); + } + + /** Creates certain join node. 
*/ + public static <RowT> HashJoinNode<RowT> create( + ExecutionContext<RowT> ctx, + RelDataType rowType, + RelDataType leftRowType, + RelDataType rightRowType, + JoinRelType type, + IgniteJoinInfo info, + @Nullable BiPredicate<RowT, RowT> nonEqCond + ) { + assert !info.pairs().isEmpty() && (info.isEqui() || type == JoinRelType.INNER || type == JoinRelType.SEMI); + assert !info.hasMatchingNulls() || info.matchingNullsCnt() == info.leftKeys.size(); + assert nonEqCond == null || type == JoinRelType.INNER || type == JoinRelType.SEMI; + + IgniteTypeFactory typeFactory = ctx.getTypeFactory(); + RowHandler<RowT> rowHnd = ctx.rowHandler(); + + switch (type) { + case INNER: + return new InnerHashJoin<>(ctx, rowType, info, rowHnd, nonEqCond); + + case LEFT: + return new LeftHashJoin<>(ctx, rowType, info, rowHnd, rowHnd.factory(typeFactory, rightRowType)); + + case RIGHT: + return new RightHashJoin<>(ctx, rowType, info, rowHnd, rowHnd.factory(typeFactory, leftRowType)); + + case FULL: { + return new FullOuterHashJoin<>(ctx, rowType, info, rowHnd, rowHnd.factory(typeFactory, leftRowType), + rowHnd.factory(typeFactory, rightRowType), nonEqCond); + } + + case SEMI: + return new SemiHashJoin<>(ctx, rowType, info, rowHnd, nonEqCond); + + case ANTI: + return new AntiHashJoin<>(ctx, rowType, info, rowHnd, nonEqCond); + + default: + throw new IllegalArgumentException("Join of type '" + type + "' isn't supported."); + } + } + + /** */ + protected Collection<Row> lookup(Row row) { + Collection<Row> res = remappedLeftSearcher.scan(() -> row).get(); + + if (res == null) + return Collections.emptyList(); + + assert res instanceof TouchedArrayList; + + ((TouchedArrayList<?>)res).touched = true; + + return res; + } + + /** */ + protected Iterator<Row> untouched() { + return F.flat(F.iterator(rightHashStore.rowSets(), c0 -> c0, true, c1 -> !((TouchedArrayList<Row>)c1).touched)); + } + + /** {@inheritDoc} */ + @Override protected void pushRight(Row row) throws Exception { + assert 
downstream() != null; + assert waitingRight > 0; + + checkState(); + + nodeMemoryTracker.onRowAdded(row); + + waitingRight--; + + rightHashStore.push(row); + + if (waitingRight == 0) + rightSource().request(waitingRight = IN_BUFFER_SIZE); + } + + /** */ + protected void requestMoreOrEnd() throws Exception { + if (requested > 0 && waitingLeft == NOT_WAITING && waitingRight == NOT_WAITING && leftInBuf.isEmpty() && left == null + && !rightIt.hasNext()) { + requested = 0; + + rightHashStore.close(); + + downstream().end(); + + return; + } + + if (waitingRight == 0 && requested > 0) + rightSource().request(waitingRight = IN_BUFFER_SIZE); + + if (waitingLeft == 0 && leftInBuf.size() <= HALF_BUF_SIZE) + leftSource().request(waitingLeft = IN_BUFFER_SIZE - leftInBuf.size()); + } + + /** */ + private static final class InnerHashJoin<RowT> extends HashJoinNode<RowT> { + /** + * Creates node for INNER JOIN. + * + * @param ctx Execution context. + * @param rowType Out row type. + * @param info Join info. + * @param outRowHnd Output row handler. + * @param nonEqCond If provided, only rows matching the predicate will be emitted as matched rows. + */ + private InnerHashJoin(ExecutionContext<RowT> ctx, + RelDataType rowType, + IgniteJoinInfo info, + RowHandler<RowT> outRowHnd, + @Nullable BiPredicate<RowT, RowT> nonEqCond + ) { + super(ctx, rowType, info, outRowHnd, false, nonEqCond); + } + + /** {@inheritDoc} */ + @Override protected void join() throws Exception { + if (waitingRight == NOT_WAITING) { + inLoop = true; + + try { + while (requested > 0 && (left != null || !leftInBuf.isEmpty())) { + // Proceed with next left row, if previous was fully processed. + if (!rightIt.hasNext()) { + left = leftInBuf.remove(); + + rightIt = lookup(left).iterator(); + } + + if (rightIt.hasNext()) { + // Emits matched rows. 
+ while (requested > 0 && rightIt.hasNext()) { + checkState(); + + RowT right = rightIt.next(); + + if (nonEqCond != null && !nonEqCond.test(left, right)) + continue; Review Comment: We can block the thread for a long time here, not allowing other queries to proceed. We should leave after some count of rows and schedule the next task (the same for other join types). ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashJoinNode.java: ########## @@ -0,0 +1,724 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.query.calcite.exec.rel; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.function.BiPredicate; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; +import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler; +import org.apache.ignite.internal.processors.query.calcite.exec.RuntimeHashIndex; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoinInfo; +import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory; +import org.apache.ignite.internal.util.typedef.F; +import org.jetbrains.annotations.Nullable; + +/** Hash join implementor. */ +public abstract class HashJoinNode<Row> extends AbstractRightMaterializedJoinNode<Row> { + /** */ + private static final int INITIAL_CAPACITY = 128; + + /** */ + private final int[] leftKeys; + + /** */ + private final int[] rightKeys; + + /** Output row handler. */ + protected final RowHandler<Row> outRowHnd; + + /** Right rows storage. */ + protected final RuntimeHashIndex<Row> rightHashStore; + + /** Uses keys of right hand to find matching left rows. */ + protected final RuntimeHashIndex<Row> remappedLeftSearcher; + + /** */ + protected Iterator<Row> rightIt = Collections.emptyIterator(); + + /** */ + @Nullable protected final BiPredicate<Row, Row> nonEqCond; + + /** + * Creates hash join node. + * + * @param ctx Execution context. + * @param rowType Out row type. + * @param info Join info. + * @param outRowHnd Output row handler. + * @param keepRowsWithNull {@code True} if we need to store the row from right shoulder even if it contains NULL in + * any of join key position. This is required for joins which emit unmatched part + * of the right shoulder, such as RIGHT JOIN and FULL OUTER JOIN. 
+ * @param nonEqCond If provided, only rows matching the predicate will be emitted as matched rows. + */ + protected HashJoinNode( + ExecutionContext<Row> ctx, + RelDataType rowType, + IgniteJoinInfo info, + RowHandler<Row> outRowHnd, + boolean keepRowsWithNull, + @Nullable BiPredicate<Row, Row> nonEqCond + ) { + super(ctx, rowType); + + // For IS NOT DISTINCT we have to keep rows with null values. + if (info.hasMatchingNulls()) + keepRowsWithNull = true; + + leftKeys = info.leftKeys.toIntArray(); + rightKeys = info.rightKeys.toIntArray(); + + assert leftKeys.length == rightKeys.length; + + this.outRowHnd = outRowHnd; + + this.nonEqCond = nonEqCond; + + rightHashStore = new RuntimeHashIndex<>(ctx, info.rightKeys, keepRowsWithNull, info.hasMatchingNulls(), + INITIAL_CAPACITY, TouchedArrayList::new); + + remappedLeftSearcher = rightHashStore.remappedSearcher(leftKeys); + } + + /** {@inheritDoc} */ + @Override protected void rewindInternal() { + super.rewindInternal(); + + rightIt = Collections.emptyIterator(); + + rightHashStore.close(); + } + + /** Creates certain join node. 
*/ + public static <RowT> HashJoinNode<RowT> create( + ExecutionContext<RowT> ctx, + RelDataType rowType, + RelDataType leftRowType, + RelDataType rightRowType, + JoinRelType type, + IgniteJoinInfo info, + @Nullable BiPredicate<RowT, RowT> nonEqCond + ) { + assert !info.pairs().isEmpty() && (info.isEqui() || type == JoinRelType.INNER || type == JoinRelType.SEMI); + assert !info.hasMatchingNulls() || info.matchingNullsCnt() == info.leftKeys.size(); + assert nonEqCond == null || type == JoinRelType.INNER || type == JoinRelType.SEMI; + + IgniteTypeFactory typeFactory = ctx.getTypeFactory(); + RowHandler<RowT> rowHnd = ctx.rowHandler(); + + switch (type) { + case INNER: + return new InnerHashJoin<>(ctx, rowType, info, rowHnd, nonEqCond); + + case LEFT: + return new LeftHashJoin<>(ctx, rowType, info, rowHnd, rowHnd.factory(typeFactory, rightRowType)); + + case RIGHT: + return new RightHashJoin<>(ctx, rowType, info, rowHnd, rowHnd.factory(typeFactory, leftRowType)); + + case FULL: { + return new FullOuterHashJoin<>(ctx, rowType, info, rowHnd, rowHnd.factory(typeFactory, leftRowType), + rowHnd.factory(typeFactory, rightRowType), nonEqCond); + } + + case SEMI: + return new SemiHashJoin<>(ctx, rowType, info, rowHnd, nonEqCond); + + case ANTI: + return new AntiHashJoin<>(ctx, rowType, info, rowHnd, nonEqCond); + + default: + throw new IllegalArgumentException("Join of type '" + type + "' isn't supported."); + } + } + + /** */ + protected Collection<Row> lookup(Row row) { + Collection<Row> res = remappedLeftSearcher.scan(() -> row).get(); + + if (res == null) + return Collections.emptyList(); + + assert res instanceof TouchedArrayList; + + ((TouchedArrayList<?>)res).touched = true; + + return res; + } + + /** */ + protected Iterator<Row> untouched() { + return F.flat(F.iterator(rightHashStore.rowSets(), c0 -> c0, true, c1 -> !((TouchedArrayList<Row>)c1).touched)); + } + + /** {@inheritDoc} */ + @Override protected void pushRight(Row row) throws Exception { + assert 
downstream() != null; + assert waitingRight > 0; + + checkState(); + + nodeMemoryTracker.onRowAdded(row); + + waitingRight--; + + rightHashStore.push(row); + + if (waitingRight == 0) + rightSource().request(waitingRight = IN_BUFFER_SIZE); + } + + /** */ + protected void requestMoreOrEnd() throws Exception { + if (requested > 0 && waitingLeft == NOT_WAITING && waitingRight == NOT_WAITING && leftInBuf.isEmpty() && left == null + && !rightIt.hasNext()) { + requested = 0; + + rightHashStore.close(); + + downstream().end(); + + return; + } + + if (waitingRight == 0 && requested > 0) + rightSource().request(waitingRight = IN_BUFFER_SIZE); + + if (waitingLeft == 0 && leftInBuf.size() <= HALF_BUF_SIZE) + leftSource().request(waitingLeft = IN_BUFFER_SIZE - leftInBuf.size()); Review Comment: Let's move NestedLoopJoinNode#tryToRequestInputs to abstract class and reuse it ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/HashJoinConverterRule.java: ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.query.calcite.rule; + +import java.util.EnumSet; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptPlanner; +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.PhysicalNode; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.logical.LogicalJoin; +import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.ignite.internal.processors.query.calcite.hint.HintDefinition; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashJoin; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoinInfo; + +/** Hash join converter rule. */ +public class HashJoinConverterRule extends AbstractIgniteJoinConverterRule { + /** */ + public static final RelOptRule INSTANCE = new HashJoinConverterRule(); + + /** */ + private static final EnumSet<JoinRelType> NON_EQ_CONDITIONS_SUPPORT = EnumSet.of(JoinRelType.INNER, JoinRelType.SEMI); + + /** Ctor. */ + private HashJoinConverterRule() { + super("HashJoinConverter", HintDefinition.HASH_JOIN); + } + + /** {@inheritDoc} */ + @Override public boolean matchesJoin(RelOptRuleCall call) { + LogicalJoin join = call.rel(0); + + IgniteJoinInfo joinInfo = IgniteJoinInfo.of(join); + + if (joinInfo.pairs().isEmpty()) + return false; + + // IS NOT DISTINCT is currently not supported simultaneously with equi conditions. + if (joinInfo.hasMatchingNulls() && joinInfo.matchingNullsCnt() != joinInfo.pairs().size()) Review Comment: I can't understand how this worked, for example, for `JoinIntegrationTest#testIsNotDistinctWithEquiConditionFrom` (and others): there are two key pairs in the test (IS NOT DISTINCT FROM and EQUALS), and the other join converter rules are disabled.
########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MappingRowHandler.java: ########## @@ -28,22 +29,28 @@ public class MappingRowHandler<Row> implements RowHandler<Row> { private final RowHandler<Row> delegate; /** */ - private final int[] mapping; + private final ImmutableIntList mapping; Review Comment: `ImmutableIntList` - it's a redundant wrapper here. We can leave the int array and use `toIntArray()` in the second constructor, or use int[] as the second constructor's argument ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MappingRowHandler.java: ########## @@ -28,22 +29,28 @@ public class MappingRowHandler<Row> implements RowHandler<Row> { private final RowHandler<Row> delegate; /** */ - private final int[] mapping; + private final ImmutableIntList mapping; - /** */ + /** Creates a mapping handler based on a bit set mapping. Keeps columns unique, but may change required colums order. */ public MappingRowHandler(RowHandler<Row> delegate, ImmutableBitSet requiredColumns) { this.delegate = delegate; - mapping = requiredColumns.toArray(); + mapping = ImmutableIntList.of(requiredColumns.toArray()); + } + + /** Creates a mapping handler based on a array mapping as is. */ Review Comment: `on a array` -> `on an array` ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashJoinNode.java: ########## @@ -0,0 +1,724 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exec.rel; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.function.BiPredicate; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; +import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler; +import org.apache.ignite.internal.processors.query.calcite.exec.RuntimeHashIndex; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoinInfo; +import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory; +import org.apache.ignite.internal.util.typedef.F; +import org.jetbrains.annotations.Nullable; + +/** Hash join implementor. */ +public abstract class HashJoinNode<Row> extends AbstractRightMaterializedJoinNode<Row> { + /** */ + private static final int INITIAL_CAPACITY = 128; + + /** */ + private final int[] leftKeys; + + /** */ + private final int[] rightKeys; + + /** Output row handler. */ + protected final RowHandler<Row> outRowHnd; + + /** Right rows storage. */ + protected final RuntimeHashIndex<Row> rightHashStore; + + /** Uses keys of right hand to find matching left rows. 
*/ + protected final RuntimeHashIndex<Row> remappedLeftSearcher; + + /** */ + protected Iterator<Row> rightIt = Collections.emptyIterator(); + + /** */ + @Nullable protected final BiPredicate<Row, Row> nonEqCond; + + /** + * Creates hash join node. + * + * @param ctx Execution context. + * @param rowType Out row type. + * @param info Join info. + * @param outRowHnd Output row handler. + * @param keepRowsWithNull {@code True} if we need to store the row from right shoulder even if it contains NULL in + * any of join key position. This is required for joins which emit unmatched part + * of the right shoulder, such as RIGHT JOIN and FULL OUTER JOIN. + * @param nonEqCond If provided, only rows matching the predicate will be emitted as matched rows. + */ + protected HashJoinNode( + ExecutionContext<Row> ctx, + RelDataType rowType, + IgniteJoinInfo info, + RowHandler<Row> outRowHnd, + boolean keepRowsWithNull, + @Nullable BiPredicate<Row, Row> nonEqCond + ) { + super(ctx, rowType); + + // For IS NOT DISTINCT we have to keep rows with null values. + if (info.hasMatchingNulls()) + keepRowsWithNull = true; + + leftKeys = info.leftKeys.toIntArray(); + rightKeys = info.rightKeys.toIntArray(); + + assert leftKeys.length == rightKeys.length; + + this.outRowHnd = outRowHnd; + + this.nonEqCond = nonEqCond; + + rightHashStore = new RuntimeHashIndex<>(ctx, info.rightKeys, keepRowsWithNull, info.hasMatchingNulls(), + INITIAL_CAPACITY, TouchedArrayList::new); + + remappedLeftSearcher = rightHashStore.remappedSearcher(leftKeys); + } + + /** {@inheritDoc} */ + @Override protected void rewindInternal() { + super.rewindInternal(); + + rightIt = Collections.emptyIterator(); + + rightHashStore.close(); + } + + /** Creates certain join node. 
*/ + public static <RowT> HashJoinNode<RowT> create( + ExecutionContext<RowT> ctx, + RelDataType rowType, + RelDataType leftRowType, + RelDataType rightRowType, + JoinRelType type, + IgniteJoinInfo info, + @Nullable BiPredicate<RowT, RowT> nonEqCond + ) { + assert !info.pairs().isEmpty() && (info.isEqui() || type == JoinRelType.INNER || type == JoinRelType.SEMI); + assert !info.hasMatchingNulls() || info.matchingNullsCnt() == info.leftKeys.size(); + assert nonEqCond == null || type == JoinRelType.INNER || type == JoinRelType.SEMI; + + IgniteTypeFactory typeFactory = ctx.getTypeFactory(); + RowHandler<RowT> rowHnd = ctx.rowHandler(); + + switch (type) { + case INNER: + return new InnerHashJoin<>(ctx, rowType, info, rowHnd, nonEqCond); + + case LEFT: + return new LeftHashJoin<>(ctx, rowType, info, rowHnd, rowHnd.factory(typeFactory, rightRowType)); + + case RIGHT: + return new RightHashJoin<>(ctx, rowType, info, rowHnd, rowHnd.factory(typeFactory, leftRowType)); + + case FULL: { + return new FullOuterHashJoin<>(ctx, rowType, info, rowHnd, rowHnd.factory(typeFactory, leftRowType), + rowHnd.factory(typeFactory, rightRowType), nonEqCond); + } + + case SEMI: + return new SemiHashJoin<>(ctx, rowType, info, rowHnd, nonEqCond); + + case ANTI: + return new AntiHashJoin<>(ctx, rowType, info, rowHnd, nonEqCond); + + default: + throw new IllegalArgumentException("Join of type '" + type + "' isn't supported."); + } + } + + /** */ + protected Collection<Row> lookup(Row row) { + Collection<Row> res = remappedLeftSearcher.scan(() -> row).get(); + + if (res == null) + return Collections.emptyList(); + + assert res instanceof TouchedArrayList; + + ((TouchedArrayList<?>)res).touched = true; + + return res; + } + + /** */ + protected Iterator<Row> untouched() { + return F.flat(F.iterator(rightHashStore.rowSets(), c0 -> c0, true, c1 -> !((TouchedArrayList<Row>)c1).touched)); + } + + /** {@inheritDoc} */ + @Override protected void pushRight(Row row) throws Exception { + assert 
downstream() != null; + assert waitingRight > 0; + + checkState(); + + nodeMemoryTracker.onRowAdded(row); + + waitingRight--; + + rightHashStore.push(row); + + if (waitingRight == 0) + rightSource().request(waitingRight = IN_BUFFER_SIZE); + } + + /** */ + protected void requestMoreOrEnd() throws Exception { + if (requested > 0 && waitingLeft == NOT_WAITING && waitingRight == NOT_WAITING && leftInBuf.isEmpty() && left == null + && !rightIt.hasNext()) { + requested = 0; + + rightHashStore.close(); + + downstream().end(); + + return; + } + + if (waitingRight == 0 && requested > 0) + rightSource().request(waitingRight = IN_BUFFER_SIZE); + + if (waitingLeft == 0 && leftInBuf.size() <= HALF_BUF_SIZE) + leftSource().request(waitingLeft = IN_BUFFER_SIZE - leftInBuf.size()); + } + + /** */ + private static final class InnerHashJoin<RowT> extends HashJoinNode<RowT> { + /** + * Creates node for INNER JOIN. + * + * @param ctx Execution context. + * @param rowType Out row type. + * @param info Join info. + * @param outRowHnd Output row handler. + * @param nonEqCond If provided, only rows matching the predicate will be emitted as matched rows. + */ + private InnerHashJoin(ExecutionContext<RowT> ctx, + RelDataType rowType, + IgniteJoinInfo info, + RowHandler<RowT> outRowHnd, + @Nullable BiPredicate<RowT, RowT> nonEqCond + ) { + super(ctx, rowType, info, outRowHnd, false, nonEqCond); + } + + /** {@inheritDoc} */ + @Override protected void join() throws Exception { + if (waitingRight == NOT_WAITING) { + inLoop = true; + + try { + while (requested > 0 && (left != null || !leftInBuf.isEmpty())) { + // Proceed with next left row, if previous was fully processed. 
+ if (!rightIt.hasNext()) { Review Comment: Looks like `left == null` is a more correct condition here ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeHashIndex.java: ########## @@ -17,48 +17,91 @@ package org.apache.ignite.internal.processors.query.calcite.exec; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; -import java.util.List; +import java.util.Map; import java.util.function.Supplier; import org.apache.calcite.util.ImmutableBitSet; +import org.apache.calcite.util.ImmutableIntList; import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey; -import org.apache.ignite.internal.util.typedef.F; -import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; /** * Runtime hash index based on on-heap hash map. */ public class RuntimeHashIndex<Row> implements RuntimeIndex<Row> { + /** Allowed key for null values but matching no any other key. 
*/ + private static final GroupKey NON_MATCHING_NULLS_KEY = new GroupKey<>(null, null) { + @Override public boolean equals(Object o) { + throw new UnsupportedOperationException("Rows with null values must not be compared at all."); Review Comment: It is still used as a key and can be compared; see the reproducer: ``` sql("create table tbl1 (id int primary key, val int)"); sql("create table tbl2 (id int primary key, val int)"); sql("insert into tbl1 values (0, 0), (1, null)"); sql("insert into tbl2 values (0, 0), (1, null)"); sql("SELECT /*+ HASH_JOIN */ * FROM tbl1 RIGHT JOIN tbl2 ON tbl1.val = tbl2.val"); ``` ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MappingRowHandler.java: ########## @@ -28,22 +29,28 @@ public class MappingRowHandler<Row> implements RowHandler<Row> { private final RowHandler<Row> delegate; /** */ - private final int[] mapping; + private final ImmutableIntList mapping; - /** */ + /** Creates a mapping handler based on a bit set mapping. Keeps columns unique, but may change required colums order. */ Review Comment: Column order can't be changed. Perhaps this part of the comment relates to the second constructor. ########## modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashJoinNode.java: ########## @@ -0,0 +1,724 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exec.rel; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.function.BiPredicate; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; +import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler; +import org.apache.ignite.internal.processors.query.calcite.exec.RuntimeHashIndex; +import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoinInfo; +import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory; +import org.apache.ignite.internal.util.typedef.F; +import org.jetbrains.annotations.Nullable; + +/** Hash join implementor. */ +public abstract class HashJoinNode<Row> extends AbstractRightMaterializedJoinNode<Row> { + /** */ + private static final int INITIAL_CAPACITY = 128; + + /** */ + private final int[] leftKeys; + + /** */ + private final int[] rightKeys; + + /** Output row handler. */ + protected final RowHandler<Row> outRowHnd; + + /** Right rows storage. */ + protected final RuntimeHashIndex<Row> rightHashStore; + + /** Uses keys of right hand to find matching left rows. 
*/ + protected final RuntimeHashIndex<Row> remappedLeftSearcher; + + /** */ + protected Iterator<Row> rightIt = Collections.emptyIterator(); + + /** */ + @Nullable protected final BiPredicate<Row, Row> nonEqCond; + + /** + * Creates hash join node. + * + * @param ctx Execution context. + * @param rowType Out row type. + * @param info Join info. + * @param outRowHnd Output row handler. + * @param keepRowsWithNull {@code True} if we need to store the row from right shoulder even if it contains NULL in + * any of join key position. This is required for joins which emit unmatched part + * of the right shoulder, such as RIGHT JOIN and FULL OUTER JOIN. + * @param nonEqCond If provided, only rows matching the predicate will be emitted as matched rows. + */ + protected HashJoinNode( + ExecutionContext<Row> ctx, + RelDataType rowType, + IgniteJoinInfo info, + RowHandler<Row> outRowHnd, + boolean keepRowsWithNull, + @Nullable BiPredicate<Row, Row> nonEqCond + ) { + super(ctx, rowType); + + // For IS NOT DISTINCT we have to keep rows with null values. + if (info.hasMatchingNulls()) + keepRowsWithNull = true; + + leftKeys = info.leftKeys.toIntArray(); + rightKeys = info.rightKeys.toIntArray(); + + assert leftKeys.length == rightKeys.length; + + this.outRowHnd = outRowHnd; + + this.nonEqCond = nonEqCond; + + rightHashStore = new RuntimeHashIndex<>(ctx, info.rightKeys, keepRowsWithNull, info.hasMatchingNulls(), Review Comment: Since we use the filter only for non-eq columns, we should correctly process each "IS NOT DISTINCT FROM" condition. It's not enough to just set `allowNulls = true`; each column should be checked separately for a `null` value. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
