maropu commented on a change in pull request #29484: URL: https://github.com/apache/spark/pull/29484#discussion_r474694854
########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeJoinToEmptyRelation.scala ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin +import org.apache.spark.sql.catalyst.plans.{Inner, LeftSemi} +import org.apache.spark.sql.catalyst.plans.logical.{Join, LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.joins.{EmptyHashedRelation, EmptyHashedRelationWithAllNullKeys, HashedRelation} + +/** + * This optimization rule detects and converts a `Join` to an empty `LocalRelation`: Review comment: nit: `[[LocalRelation]]` for shortcuts. ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala ########## @@ -559,12 +567,16 @@ trait HashJoin extends BaseJoinExec with CodegenSupport { * Generates the code for left semi join. */ protected def codegenSemi(ctx: CodegenContext, input: Seq[ExprCode]): String = { - val HashedRelationInfo(relationTerm, keyIsUnique, _) = prepareRelation(ctx) + val HashedRelationInfo(relationTerm, keyIsUnique, isEmptyHashedRelation) = prepareRelation(ctx) val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input) val (matched, checkCondition, _) = getJoinCondition(ctx, input) val numOutput = metricTerm(ctx, "numOutputRows") - if (keyIsUnique) { + if (isEmptyHashedRelation) { + s""" Review comment: ditto: remove `s`. ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeJoinToEmptyRelation.scala ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin +import org.apache.spark.sql.catalyst.plans.{Inner, LeftSemi} +import org.apache.spark.sql.catalyst.plans.logical.{Join, LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.joins.{EmptyHashedRelation, EmptyHashedRelationWithAllNullKeys, HashedRelation} + +/** + * This optimization rule detects and converts a `Join` to an empty `LocalRelation`: + * 1. `Join` is single column NULL-aware anti join (NAAJ), and broadcasted `HashedRelation` + * is `EmptyHashedRelationWithAllNullKeys`. + * + * 2. `Join` is inner or left semi join, and broadcasted `HashedRelation` is `EmptyHashedRelation`. + */ +object OptimizeJoinToEmptyRelation extends Rule[LogicalPlan] { Review comment: I think the prefix `Optimize` is a general term, so how about keeping `Eliminate` in the prefix? e.g., `EliminateJoinHavingEmptyRelation` ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeJoinToEmptyRelation.scala ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin +import org.apache.spark.sql.catalyst.plans.{Inner, LeftSemi} +import org.apache.spark.sql.catalyst.plans.logical.{Join, LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.joins.{EmptyHashedRelation, EmptyHashedRelationWithAllNullKeys, HashedRelation} + +/** + * This optimization rule detects and converts a `Join` to an empty `LocalRelation`: + * 1. `Join` is single column NULL-aware anti join (NAAJ), and broadcasted `HashedRelation` + * is `EmptyHashedRelationWithAllNullKeys`. + * + * 2. `Join` is inner or left semi join, and broadcasted `HashedRelation` is `EmptyHashedRelation`. + */ +object OptimizeJoinToEmptyRelation extends Rule[LogicalPlan] { + + private def canEliminate( + plan: LogicalPlan, + expectedRelation: HashedRelation): Boolean = plan match { Review comment: How about reformatting it like this? ``` private def canEliminate(plan: LogicalPlan, emptyRelation: HashedRelation): Boolean = plan match { case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if stage.resultOption.get().isDefined && stage.broadcast.relationFuture.get().value == emptyRelation => true case _ => false } ``` ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeJoinToEmptyRelation.scala ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin +import org.apache.spark.sql.catalyst.plans.{Inner, LeftSemi} +import org.apache.spark.sql.catalyst.plans.logical.{Join, LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.joins.{EmptyHashedRelation, EmptyHashedRelationWithAllNullKeys, HashedRelation} + +/** + * This optimization rule detects and converts a `Join` to an empty `LocalRelation`: + * 1. `Join` is single column NULL-aware anti join (NAAJ), and broadcasted `HashedRelation` + * is `EmptyHashedRelationWithAllNullKeys`. Review comment: ditto: `[[EmptyHashedRelationWithAllNullKeys]]` ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeJoinToEmptyRelation.scala ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin +import org.apache.spark.sql.catalyst.plans.{Inner, LeftSemi} +import org.apache.spark.sql.catalyst.plans.logical.{Join, LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.joins.{EmptyHashedRelation, EmptyHashedRelationWithAllNullKeys, HashedRelation} + +/** + * This optimization rule detects and converts a `Join` to an empty `LocalRelation`: + * 1. `Join` is single column NULL-aware anti join (NAAJ), and broadcasted `HashedRelation` Review comment: ditto: `[[HashedRelation]]` ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala ########## @@ -442,7 +446,11 @@ trait HashJoin extends BaseJoinExec with CodegenSupport { case BuildRight => input ++ buildVars } - if (keyIsUnique) { + if (isEmptyHashedRelation) { + s""" Review comment: nit: remove `s` ########## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala ########## @@ -1168,17 +1170,35 @@ class AdaptiveQueryExecSuite } } - test("SPARK-32573: Eliminate NAAJ when BuildSide is EmptyHashedRelationWithAllNullKeys") { + test("SPARK-32573, SPARK-32649: optimize join to empty relation") { withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", - SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Long.MaxValue.toString) { + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") { + // Test NULL-aware anti join val (plan, adaptivePlan) = runAdaptiveAndVerifyResult( "SELECT * FROM testData2 t1 WHERE t1.b NOT IN (SELECT b FROM testData3)") val bhj = findTopLevelBroadcastHashJoin(plan) assert(bhj.size == 1) val join = findTopLevelBaseJoin(adaptivePlan) assert(join.isEmpty) checkNumLocalShuffleReaders(adaptivePlan) + + // Test inner and left semi join + Seq( + // inner join (small table at right side) + "SELECT * FROM testData t1 join testData3 t2 ON t1.key = t2.a WHERE t2.b = 1", + // inner join (small table at left side) + "SELECT * FROM testData3 t1 join testData t2 ON t1.a = t2.key WHERE t1.b = 1", + // left semi join + "SELECT * FROM testData t1 left semi join testData3 t2 ON t1.key = t2.a AND t2.b = 1" + ).foreach(query => { + val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(query) + val smj = findTopLevelSortMergeJoin(plan) + assert(smj.size == 1) + val join = findTopLevelBaseJoin(adaptivePlan) + assert(join.isEmpty) + checkNumLocalShuffleReaders(adaptivePlan) + }) Review comment: Could you separate these new tests from this test unit? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
