Copilot commented on code in PR #54135: URL: https://github.com/apache/spark/pull/54135#discussion_r2762946181
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CrossJoinArrayContainsToInnerJoin.scala: ########## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules._ +import org.apache.spark.sql.catalyst.trees.TreePattern.{FILTER, JOIN} +import org.apache.spark.sql.types._ + +/** + * Converts cross joins with array_contains filter into inner joins using explode. + * + * This optimization transforms queries of the form: + * {{{ + * SELECT * FROM left, right WHERE array_contains(left.arr, right.elem) + * }}} + * + * Into a more efficient form: + * {{{ + * SELECT * FROM ( + * SELECT *, explode(array_distinct(arr)) AS unnested FROM left + * ) l + * INNER JOIN right ON l.unnested = right.elem + * }}} + * + * This avoids the O(N*M) cross join by using unnesting and equi-join. + * + * Ported from Presto's CrossJoinWithArrayContainsToInnerJoin optimizer rule. 
+ */ +object CrossJoinArrayContainsToInnerJoin extends Rule[LogicalPlan] with PredicateHelper { + + // Supported element types for the optimization (matching Presto's supported types) + private val supportedTypes: Set[DataType] = Set( + IntegerType, LongType, StringType, DateType + ) Review Comment: The PR description states that this optimization is added as an optimizer rule that converts applicable queries automatically, but as far as I can tell this rule is not registered in `Optimizer.scala` (no references to `CrossJoinArrayContainsToInnerJoin` in the optimizer batches), so it will never run in normal query planning. Please wire this rule into the main optimizer (or adjust the description/tests accordingly) so that the behavior change actually takes effect. ########## sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/CrossJoinArrayContainsToInnerJoinBenchmark.scala: ########## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.benchmark + +import org.apache.spark.benchmark.Benchmark +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf + +/** + * Benchmark to measure performance improvement of CrossJoinArrayContainsToInnerJoin optimization. + * + * This benchmark compares: + * 1. Cross join with array_contains filter (unoptimized) + * 2. Inner join with explode (manually optimized / what the rule produces) + * + * To run this benchmark: + * {{{ + * 1. without sbt: + * bin/spark-submit --class <this class> + * --jars <spark core test jar>,<spark catalyst test jar> <spark sql test jar> + * 2. build/sbt "sql/Test/runMain <this class>" + * 3. generate result: + * SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/Test/runMain <this class>" + * Results will be written to + * "benchmarks/CrossJoinArrayContainsToInnerJoinBenchmark-results.txt". + * }}} + */ +object CrossJoinArrayContainsToInnerJoinBenchmark extends SqlBasedBenchmark { + + import spark.implicits._ + + private def crossJoinWithArrayContains(numOrders: Int, numItems: Int, arraySize: Int): Unit = { + val benchmark = new Benchmark( + s"Cross join with array_contains ($numOrders orders, $numItems items, array size $arraySize)", + numOrders.toLong * numItems, + output = output + ) + + // Create orders table with array of item IDs + val orders = spark.range(numOrders) + .selectExpr( + "id as order_id", + s"array_repeat(cast((id % $numItems) as int), $arraySize) as item_ids" + ) + .cache() + + // Create items table + val items = spark.range(numItems) + .selectExpr("cast(id as int) as item_id", "concat('item_', id) as item_name") + .cache() + + // Force caching + orders.count() + items.count() + + // Register as temp views for SQL queries + orders.createOrReplaceTempView("orders") + items.createOrReplaceTempView("items") + + benchmark.addCase("Cross join + array_contains filter (unoptimized)", numIters = 3) { _ => + // Disable the optimization to simulate 
unoptimized behavior
+ withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {

Review Comment:
This comment says we "Disable the optimization to simulate unoptimized behavior", but the code only toggles `CROSS_JOINS_ENABLED` and does not actually exclude `CrossJoinArrayContainsToInnerJoin` from the optimizer (for example via `SQLConf.OPTIMIZER_EXCLUDED_RULES`). Once the rule is registered in the optimizer, this benchmark case will no longer measure the true cross-join+filter baseline; consider explicitly disabling this rule for the "unoptimized" case so the reported speedups remain valid.

```suggestion
withSQLConf(
  SQLConf.CROSS_JOINS_ENABLED.key -> "true",
  SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> "org.apache.spark.sql.catalyst.optimizer.CrossJoinArrayContainsToInnerJoin") {
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
